/*
 * Decompiled with CFR 0.152.
 */
package co.paralleluniverse.strands.channels.reactivestreams;

import co.paralleluniverse.fibers.FiberFactory;
import co.paralleluniverse.strands.SuspendableAction2;
import co.paralleluniverse.strands.channels.Channel;
import co.paralleluniverse.strands.channels.Channels;
import co.paralleluniverse.strands.channels.ReceivePort;
import co.paralleluniverse.strands.channels.SendPort;
import co.paralleluniverse.strands.channels.Topic;
import co.paralleluniverse.strands.channels.reactivestreams.ChannelProcessor;
import co.paralleluniverse.strands.channels.reactivestreams.ChannelPublisher;
import co.paralleluniverse.strands.channels.reactivestreams.ChannelSubscriber;
import co.paralleluniverse.strands.channels.reactivestreams.ChannelSubscription;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

public class ReactiveStreams {
    public static <T> ReceivePort<T> subscribe(int bufferSize, Channels.OverflowPolicy policy, Publisher<T> publisher) {
        Channel channel = Channels.newChannel((int)bufferSize, (Channels.OverflowPolicy)policy, (boolean)true, (boolean)true);
        ChannelSubscriber sub = new ChannelSubscriber(channel, false);
        publisher.subscribe(sub);
        return sub;
    }

    public static <T> Publisher<T> toPublisher(ReceivePort<T> channel, FiberFactory ff) {
        if (Channels.isTickerChannel(channel)) {
            return new ChannelPublisher<T>(ff, channel, false){

                @Override
                protected ChannelSubscription<T> newChannelSubscription(Subscriber<? super T> s, Object channel) {
                    return super.newChannelSubscription(s, Channels.newTickerConsumerFor((Channel)((Channel)channel)));
                }
            };
        }
        return new ChannelPublisher(ff, channel, true);
    }

    public static <T> Publisher<T> toPublisher(ReceivePort<T> channel) {
        return ReactiveStreams.toPublisher(channel, null);
    }

    public static <T> Publisher<T> toPublisher(Topic<T> topic, FiberFactory ff) {
        return new ChannelPublisher<T>(ff, topic, false){

            @Override
            protected ChannelSubscription<T> newChannelSubscription(Subscriber<? super T> s, Object channel) {
                final Topic topic = (Topic)channel;
                final Channel ch = Channels.newChannel((int)0);
                try {
                    topic.subscribe((SendPort)ch);
                    return new ChannelSubscription<T>(s, (ReceivePort)ch){

                        @Override
                        public void cancel() {
                            super.cancel();
                            topic.unsubscribe((SendPort)ch);
                        }
                    };
                }
                catch (Exception e) {
                    topic.unsubscribe((SendPort)ch);
                    throw e;
                }
            }
        };
    }

    public static <T> Publisher<T> toPublisher(Topic<T> topic) {
        return ReactiveStreams.toPublisher(topic, null);
    }

    public static <T, R> Processor<T, R> toProcessor(FiberFactory ff, int bufferSize, Channels.OverflowPolicy policy, SuspendableAction2<? extends ReceivePort<? super T>, ? extends SendPort<? extends R>> transformer) {
        Channel in = Channels.newChannel((int)bufferSize, (Channels.OverflowPolicy)policy, (boolean)true, (boolean)true);
        Channel out = Channels.newChannel((int)bufferSize, (Channels.OverflowPolicy)policy, (boolean)true, (boolean)true);
        return new ChannelProcessor(ff, false, in, out, transformer);
    }

    public static <T, R> Processor<T, R> toProcessor(int bufferSize, Channels.OverflowPolicy policy, SuspendableAction2<? extends ReceivePort<? super T>, ? extends SendPort<? extends R>> transformer) {
        return ReactiveStreams.toProcessor(null, bufferSize, policy, transformer);
    }
}

