/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.mutiny.operators.multi;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.helpers.queues.Queues;
import io.smallrye.mutiny.operators.MultiOperator;
import io.smallrye.mutiny.operators.multi.MultiOperatorProcessor;
import io.smallrye.mutiny.subscription.BackPressureStrategy;
import io.smallrye.mutiny.subscription.MultiSubscriber;
import io.smallrye.mutiny.subscription.PausableMulti;
import java.util.Queue;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class MultiDemandPausingOp<T>
extends MultiOperator<T, T>
implements PausableMulti {
    private volatile PausableProcessor processor;
    private final AtomicBoolean paused;
    private final AtomicBoolean subscribed = new AtomicBoolean();
    private final boolean lateSubscription;
    private final int bufferSize;
    private final boolean unbounded;
    private final BackPressureStrategy backPressureStrategy;

    public MultiDemandPausingOp(Multi<T> upstream, boolean initiallyPaused, boolean lateSubscription, int bufferSize, boolean unbounded, BackPressureStrategy backPressureStrategy) {
        super(upstream);
        this.paused = new AtomicBoolean(initiallyPaused);
        this.lateSubscription = lateSubscription;
        this.bufferSize = bufferSize;
        this.unbounded = unbounded;
        this.backPressureStrategy = backPressureStrategy;
    }

    @Override
    public void subscribe(MultiSubscriber<? super T> subscriber) {
        this.processor = new PausableProcessor(subscriber);
        if (!this.lateSubscription || !this.paused.get()) {
            this.subscribed.set(true);
            this.upstream().subscribe(this.processor);
        }
    }

    @Override
    public boolean isPaused() {
        return this.paused.get();
    }

    @Override
    public void pause() {
        this.paused.set(true);
    }

    @Override
    public void resume() {
        PausableProcessor p;
        if (this.paused.compareAndSet(true, false) && (p = this.processor) != null) {
            if (this.lateSubscription && this.subscribed.compareAndSet(false, true)) {
                this.upstream().subscribe(p);
            }
            p.resume();
        }
    }

    @Override
    public int bufferSize() {
        PausableProcessor p = this.processor;
        if (p != null) {
            return p.queueSize();
        }
        return 0;
    }

    @Override
    public boolean clearBuffer() {
        PausableProcessor p;
        if (this.paused.get() && (p = this.processor) != null) {
            p.clearQueue();
            return true;
        }
        return false;
    }

    private class PausableProcessor
    extends MultiOperatorProcessor<T, T> {
        private final AtomicLong demand;
        private final Queue<T> queue;
        private final AtomicInteger wip;
        private final AtomicInteger strictBoundCounter;
        private volatile boolean upstreamCompleted;
        private final AtomicBoolean clearQueue;

        PausableProcessor(MultiSubscriber<? super T> downstream) {
            super(downstream);
            this.demand = new AtomicLong();
            this.wip = new AtomicInteger();
            this.strictBoundCounter = new AtomicInteger(0);
            this.clearQueue = new AtomicBoolean();
            this.queue = MultiDemandPausingOp.this.backPressureStrategy == BackPressureStrategy.BUFFER ? (MultiDemandPausingOp.this.unbounded ? Queues.unbounded(MultiDemandPausingOp.this.bufferSize).get() : Queues.get(MultiDemandPausingOp.this.bufferSize).get()) : null;
        }

        void resume() {
            Flow.Subscription subscription = this.getUpstreamSubscription();
            if (subscription == Subscriptions.CANCELLED) {
                return;
            }
            this.drain();
            long currentDemand = this.demand.get();
            if (currentDemand > 0L) {
                Subscriptions.produced(this.demand, currentDemand);
                subscription.request(currentDemand);
            }
        }

        void drain() {
            if (this.queue == null) {
                if (this.upstreamCompleted) {
                    super.onCompletion();
                }
                return;
            }
            if (this.wip.getAndIncrement() > 0) {
                return;
            }
            do {
                Object item;
                Queue qe = this.queue;
                while (!MultiDemandPausingOp.this.paused.get() && (item = qe.poll()) != null) {
                    if (!MultiDemandPausingOp.this.unbounded) {
                        this.strictBoundCounter.decrementAndGet();
                    }
                    if (this.clearQueue.get()) break;
                    this.downstream.onItem(item);
                }
                if (!MultiDemandPausingOp.this.paused.get() && this.upstreamCompleted) {
                    super.onCompletion();
                }
                if (!this.clearQueue.compareAndSet(true, false)) continue;
                this.queue.clear();
                this.strictBoundCounter.set(0);
            } while (this.wip.decrementAndGet() != 0);
        }

        void clearQueue() {
            if (this.queue != null && this.clearQueue.compareAndSet(false, true) && this.wip.getAndIncrement() == 0) {
                this.queue.clear();
                this.clearQueue.set(false);
                this.strictBoundCounter.set(0);
                this.wip.decrementAndGet();
            }
        }

        int queueSize() {
            return this.queue != null ? this.queue.size() : 0;
        }

        @Override
        public void onItem(T item) {
            if (MultiDemandPausingOp.this.backPressureStrategy != BackPressureStrategy.IGNORE && MultiDemandPausingOp.this.paused.get()) {
                if (MultiDemandPausingOp.this.backPressureStrategy == BackPressureStrategy.DROP) {
                    return;
                }
                if (!MultiDemandPausingOp.this.unbounded && this.strictBoundCounter.getAndIncrement() >= MultiDemandPausingOp.this.bufferSize || !this.queue.offer(item)) {
                    this.onFailure(new IllegalStateException("Buffer overflow: cannot buffer more than " + MultiDemandPausingOp.this.bufferSize + " items"));
                }
            } else {
                super.onItem(item);
            }
        }

        @Override
        public void request(long numberOfItems) {
            if (numberOfItems <= 0L) {
                this.onFailure(Subscriptions.getInvalidRequestException());
                return;
            }
            Flow.Subscription subscription = this.getUpstreamSubscription();
            if (subscription == Subscriptions.CANCELLED) {
                return;
            }
            try {
                Subscriptions.add(this.demand, numberOfItems);
                if (MultiDemandPausingOp.this.paused.get()) {
                    return;
                }
                long currentDemand = this.demand.get();
                if (currentDemand > 0L) {
                    Subscriptions.produced(this.demand, currentDemand);
                    subscription.request(currentDemand);
                }
            }
            catch (Throwable failure) {
                this.onFailure(failure);
            }
        }

        @Override
        public void cancel() {
            this.clearQueue();
            MultiDemandPausingOp.this.processor = null;
            super.cancel();
        }

        @Override
        public void onFailure(Throwable failure) {
            this.clearQueue();
            super.onFailure(failure);
        }

        @Override
        public void onCompletion() {
            this.upstreamCompleted = true;
            this.drain();
        }
    }
}

