/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.mutiny.vertx;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.operators.AbstractMulti;
import io.vertx.core.streams.ReadStream;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class MultiReadStream<T, U>
extends AbstractMulti<U>
implements Multi<U> {
    private final ReadStream<T> source;
    private final Function<T, U> transformation;
    private final AtomicReference<Subscription> upstream;

    public MultiReadStream(ReadStream<T> source, Function<T, U> transformation) {
        source.pause();
        this.source = source;
        this.transformation = Infrastructure.decorate(transformation);
        this.upstream = new AtomicReference();
    }

    private void release() {
        Subscription sub = this.upstream.get();
        if (sub != null && this.upstream.compareAndSet(sub, null)) {
            try {
                this.source.exceptionHandler(null);
                this.source.endHandler(null);
                this.source.handler(null);
            }
            catch (Exception exception) {
            }
            finally {
                try {
                    this.source.resume();
                }
                catch (Exception exception) {}
            }
        }
    }

    public void subscribe(Subscriber<? super U> downstream) {
        Subscription sub = new Subscription(){

            public void request(long req) {
                if (MultiReadStream.this.upstream.get() == this) {
                    MultiReadStream.this.source.fetch(req);
                }
            }

            public void cancel() {
                MultiReadStream.this.release();
            }
        };
        if (!this.upstream.compareAndSet(null, sub)) {
            Subscriptions.fail(downstream, (Throwable)new IllegalStateException("This processor allows only a single Subscriber"));
            return;
        }
        this.source.pause();
        this.source.endHandler(v -> {
            this.release();
            downstream.onComplete();
        });
        this.source.exceptionHandler(err -> {
            this.release();
            downstream.onError(err);
        });
        this.source.handler(item -> downstream.onNext(this.transformation.apply(item)));
        downstream.onSubscribe(sub);
    }
}

