/*
 * Decompiled with CFR 0.152.
 */
package co.paralleluniverse.strands.channels.reactivestreams;

import co.paralleluniverse.fibers.Fiber;
import co.paralleluniverse.fibers.FiberFactory;
import co.paralleluniverse.strands.SuspendableCallable;
import co.paralleluniverse.strands.channels.ReceivePort;
import co.paralleluniverse.strands.channels.reactivestreams.ChannelSubscription;
import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

class ChannelPublisher<T>
implements Publisher<T> {
    private final FiberFactory ff;
    private final Object channel;
    private final AtomicBoolean subscribed;
    private static final FiberFactory defaultFiberFactory = new FiberFactory(){

        public <T> Fiber<T> newFiber(SuspendableCallable<T> target) {
            return new Fiber(target);
        }
    };

    public ChannelPublisher(FiberFactory ff, Object channel, boolean singleSubscriber) {
        this.ff = ff != null ? ff : defaultFiberFactory;
        this.channel = channel;
        this.subscribed = singleSubscriber ? new AtomicBoolean() : null;
    }

    public void subscribe(Subscriber<? super T> s) {
        if (s == null) {
            throw new NullPointerException();
        }
        try {
            if (this.subscribed != null && !this.subscribed.compareAndSet(false, true)) {
                throw new RuntimeException("already subscribed");
            }
            this.ff.newFiber(this.newChannelSubscription(s, this.channel)).start();
        }
        catch (Exception e) {
            this.failedSubscribe(s, e);
        }
    }

    protected void failedSubscribe(Subscriber<? super T> s, Throwable t) {
        s.onSubscribe(this.newChannelSubscription(s, this.channel));
        s.onError(t);
    }

    protected ChannelSubscription<T> newChannelSubscription(Subscriber<? super T> s, Object channel) {
        return new ChannelSubscription<T>(s, (ReceivePort)channel);
    }
}

