/*
 * Decompiled with CFR 0.152.
 */
package io.lettuce.core;

import io.lettuce.core.KeyScanCursor;
import io.lettuce.core.KeyValue;
import io.lettuce.core.Operators;
import io.lettuce.core.ScanArgs;
import io.lettuce.core.ScanCursor;
import io.lettuce.core.ScoredValue;
import io.lettuce.core.ScoredValueScanCursor;
import io.lettuce.core.ValueScanCursor;
import io.lettuce.core.api.reactive.RedisHashReactiveCommands;
import io.lettuce.core.api.reactive.RedisKeyReactiveCommands;
import io.lettuce.core.api.reactive.RedisSetReactiveCommands;
import io.lettuce.core.api.reactive.RedisSortedSetReactiveCommands;
import io.lettuce.core.internal.LettuceAssert;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

public abstract class ScanStream {
    /**
     * Private constructor: the functionality of this class is exposed through its static methods only.
     */
    private ScanStream() {
        // intentionally empty
    }

    /**
     * Creates a {@link Flux} of keys obtained by scanning without explicit {@link ScanArgs}.
     * Delegates to the internal overload, passing an empty {@link Optional} for the scan arguments.
     *
     * @param commands reactive key commands used to issue the scan calls; must not be null
     * @param <K> key type
     * @param <V> value type
     * @return a {@link Flux} emitting the scanned keys
     */
    public static <K, V> Flux<K> scan(RedisKeyReactiveCommands<K, V> commands) {
        Optional<ScanArgs> noArgs = Optional.empty();
        return scan(commands, noArgs);
    }

    /**
     * Creates a {@link Flux} of keys obtained by scanning with the given {@link ScanArgs}.
     *
     * @param commands reactive key commands used to issue the scan calls; must not be null
     * @param scanArgs arguments applied to every scan call; must not be null
     * @param <K> key type
     * @param <V> value type
     * @return a {@link Flux} emitting the scanned keys
     */
    public static <K, V> Flux<K> scan(RedisKeyReactiveCommands<K, V> commands, ScanArgs scanArgs) {
        LettuceAssert.notNull(scanArgs, "ScanArgs must not be null");
        Optional<ScanArgs> args = Optional.of(scanArgs);
        return scan(commands, args);
    }

    /**
     * Shared implementation behind the public key-scan overloads.
     * <p>
     * Builds a {@link Flux} via {@link Flux#create} whose sink is wired to a {@link SubscriptionAdapter}.
     * The first page is requested without a cursor; every following page is requested with the cursor
     * handed to the continuation function. {@code scanArgs}, when present, is applied to every call.
     *
     * @param commands reactive key commands used to issue the scan calls; must not be null
     * @param scanArgs optional scan arguments; an empty {@link Optional} selects the argument-less overloads
     * @param <K> key type
     * @param <V> value type
     * @return a {@link Flux} emitting the keys of every {@link KeyScanCursor} page
     */
    private static <K, V> Flux<K> scan(RedisKeyReactiveCommands<K, V> commands, Optional<ScanArgs> scanArgs) {
        LettuceAssert.notNull(commands, "RedisKeyCommands must not be null");
        return Flux.create(sink -> {
            // Typed (non-raw) Mono so the cursor type is checked by the compiler instead of being erased.
            Mono<KeyScanCursor<K>> res = scanArgs.map(commands::scan).orElseGet(commands::scan);
            scan(sink, res,
                    cursor -> scanArgs.map(it -> commands.scan(cursor, it)).orElseGet(() -> commands.scan(cursor)),
                    KeyScanCursor::getKeys);
        });
    }

    /**
     * Creates a {@link Flux} of hash entries obtained by scanning {@code key} without explicit {@link ScanArgs}.
     * Delegates to the internal overload, passing an empty {@link Optional} for the scan arguments.
     *
     * @param commands reactive hash commands used to issue the scan calls; must not be null
     * @param key the hash key to scan; must not be null
     * @param <K> key type
     * @param <V> value type
     * @return a {@link Flux} emitting one {@link KeyValue} per scanned hash entry
     */
    public static <K, V> Flux<KeyValue<K, V>> hscan(RedisHashReactiveCommands<K, V> commands, K key) {
        Optional<ScanArgs> noArgs = Optional.empty();
        return hscan(commands, key, noArgs);
    }

    /**
     * Creates a {@link Flux} of hash entries obtained by scanning {@code key} with the given {@link ScanArgs}.
     *
     * @param commands reactive hash commands used to issue the scan calls; must not be null
     * @param key the hash key to scan; must not be null
     * @param scanArgs arguments applied to every scan call; must not be null
     * @param <K> key type
     * @param <V> value type
     * @return a {@link Flux} emitting one {@link KeyValue} per scanned hash entry
     */
    public static <K, V> Flux<KeyValue<K, V>> hscan(RedisHashReactiveCommands<K, V> commands, K key, ScanArgs scanArgs) {
        LettuceAssert.notNull(scanArgs, "ScanArgs must not be null");
        Optional<ScanArgs> args = Optional.of(scanArgs);
        return hscan(commands, key, args);
    }

    /**
     * Shared implementation behind the public hash-scan overloads.
     * <p>
     * Builds a {@link Flux} via {@link Flux#create} whose sink is wired to a {@link SubscriptionAdapter}.
     * Each cursor page exposes its content as a map; every entry of that map is converted to a
     * {@link KeyValue} through {@link KeyValue#fromNullable}.
     *
     * @param commands reactive hash commands used to issue the scan calls; must not be null
     * @param key the hash key to scan; must not be null
     * @param scanArgs optional scan arguments; an empty {@link Optional} selects the argument-less overloads
     * @param <K> key type
     * @param <V> value type
     * @return a {@link Flux} emitting one {@link KeyValue} per scanned hash entry
     */
    private static <K, V> Flux<KeyValue<K, V>> hscan(RedisHashReactiveCommands<K, V> commands, K key, Optional<ScanArgs> scanArgs) {
        LettuceAssert.notNull(commands, "RedisHashReactiveCommands must not be null");
        LettuceAssert.notNull(key, "Key must not be null");
        return Flux.create(sink -> {
            // Typed (non-raw) Mono so the cursor type is checked by the compiler instead of being erased.
            Mono<MapScanCursor<K, V>> res = scanArgs.map(it -> commands.hscan(key, it))
                    .orElseGet(() -> commands.hscan(key));
            scan(sink, res,
                    cursor -> scanArgs.map(it -> commands.hscan(key, cursor, it))
                            .orElseGet(() -> commands.hscan(key, cursor)),
                    page -> {
                        // Pre-size the result to the number of entries in this page.
                        Collection<KeyValue<K, V>> list = new ArrayList<>(page.getMap().size());
                        for (Map.Entry<K, V> kvEntry : page.getMap().entrySet()) {
                            list.add(KeyValue.fromNullable(kvEntry.getKey(), kvEntry.getValue()));
                        }
                        return list;
                    });
        });
    }

    /**
     * Creates a {@link Flux} of set members obtained by scanning {@code key} without explicit {@link ScanArgs}.
     * Delegates to the internal overload, passing an empty {@link Optional} for the scan arguments.
     *
     * @param commands reactive set commands used to issue the scan calls; must not be null
     * @param key the set key to scan; must not be null
     * @param <K> key type
     * @param <V> value type
     * @return a {@link Flux} emitting the scanned values
     */
    public static <K, V> Flux<V> sscan(RedisSetReactiveCommands<K, V> commands, K key) {
        Optional<ScanArgs> noArgs = Optional.empty();
        return sscan(commands, key, noArgs);
    }

    /**
     * Creates a {@link Flux} of set members obtained by scanning {@code key} with the given {@link ScanArgs}.
     *
     * @param commands reactive set commands used to issue the scan calls; must not be null
     * @param key the set key to scan; must not be null
     * @param scanArgs arguments applied to every scan call; must not be null
     * @param <K> key type
     * @param <V> value type
     * @return a {@link Flux} emitting the scanned values
     */
    public static <K, V> Flux<V> sscan(RedisSetReactiveCommands<K, V> commands, K key, ScanArgs scanArgs) {
        LettuceAssert.notNull(scanArgs, "ScanArgs must not be null");
        Optional<ScanArgs> args = Optional.of(scanArgs);
        return sscan(commands, key, args);
    }

    /**
     * Shared implementation behind the public set-scan overloads.
     * <p>
     * Builds a {@link Flux} via {@link Flux#create} whose sink is wired to a {@link SubscriptionAdapter}.
     * The first page is requested without a cursor; every following page is requested with the cursor
     * handed to the continuation function. {@code scanArgs}, when present, is applied to every call.
     *
     * @param commands reactive set commands used to issue the scan calls; must not be null
     * @param key the set key to scan; must not be null
     * @param scanArgs optional scan arguments; an empty {@link Optional} selects the argument-less overloads
     * @param <K> key type
     * @param <V> value type
     * @return a {@link Flux} emitting the values of every {@link ValueScanCursor} page
     */
    private static <K, V> Flux<V> sscan(RedisSetReactiveCommands<K, V> commands, K key, Optional<ScanArgs> scanArgs) {
        LettuceAssert.notNull(commands, "RedisSetReactiveCommands must not be null");
        LettuceAssert.notNull(key, "Key must not be null");
        return Flux.create(sink -> {
            // Typed (non-raw) Mono so the cursor type is checked by the compiler instead of being erased.
            Mono<ValueScanCursor<V>> res = scanArgs.map(it -> commands.sscan(key, it))
                    .orElseGet(() -> commands.sscan(key));
            scan(sink, res,
                    cursor -> scanArgs.map(it -> commands.sscan(key, cursor, it))
                            .orElseGet(() -> commands.sscan(key, cursor)),
                    ValueScanCursor::getValues);
        });
    }

    /**
     * Creates a {@link Flux} of scored values obtained by scanning {@code key} without explicit {@link ScanArgs}.
     * Delegates to the internal overload, passing an empty {@link Optional} for the scan arguments.
     *
     * @param commands reactive sorted-set commands used to issue the scan calls; must not be null
     * @param key the sorted-set key to scan; must not be null
     * @param <K> key type
     * @param <V> value type
     * @return a {@link Flux} emitting the scanned {@link ScoredValue}s
     */
    public static <K, V> Flux<ScoredValue<V>> zscan(RedisSortedSetReactiveCommands<K, V> commands, K key) {
        Optional<ScanArgs> noArgs = Optional.empty();
        return zscan(commands, key, noArgs);
    }

    /**
     * Creates a {@link Flux} of scored values obtained by scanning {@code key} with the given {@link ScanArgs}.
     *
     * @param commands reactive sorted-set commands used to issue the scan calls; must not be null
     * @param key the sorted-set key to scan; must not be null
     * @param scanArgs arguments applied to every scan call; must not be null
     * @param <K> key type
     * @param <V> value type
     * @return a {@link Flux} emitting the scanned {@link ScoredValue}s
     */
    public static <K, V> Flux<ScoredValue<V>> zscan(RedisSortedSetReactiveCommands<K, V> commands, K key, ScanArgs scanArgs) {
        LettuceAssert.notNull(scanArgs, "ScanArgs must not be null");
        Optional<ScanArgs> args = Optional.of(scanArgs);
        return zscan(commands, key, args);
    }

    /**
     * Shared implementation behind the public sorted-set-scan overloads.
     * <p>
     * Builds a {@link Flux} via {@link Flux#create} whose sink is wired to a {@link SubscriptionAdapter}.
     * The first page is requested without a cursor; every following page is requested with the cursor
     * handed to the continuation function. {@code scanArgs}, when present, is applied to every call.
     *
     * @param commands reactive sorted-set commands used to issue the scan calls; must not be null
     * @param key the sorted-set key to scan; must not be null
     * @param scanArgs optional scan arguments; an empty {@link Optional} selects the argument-less overloads
     * @param <K> key type
     * @param <V> value type
     * @return a {@link Flux} emitting the values of every {@link ScoredValueScanCursor} page
     */
    private static <K, V> Flux<ScoredValue<V>> zscan(RedisSortedSetReactiveCommands<K, V> commands, K key, Optional<ScanArgs> scanArgs) {
        LettuceAssert.notNull(commands, "RedisSortedSetReactiveCommands must not be null");
        LettuceAssert.notNull(key, "Key must not be null");
        return Flux.create(sink -> {
            // Typed (non-raw) Mono so the cursor type is checked by the compiler instead of being erased.
            Mono<ScoredValueScanCursor<V>> res = scanArgs.map(it -> commands.zscan(key, it))
                    .orElseGet(() -> commands.zscan(key));
            scan(sink, res,
                    cursor -> scanArgs.map(it -> commands.zscan(key, cursor, it))
                            .orElseGet(() -> commands.zscan(key, cursor)),
                    ScoredValueScanCursor::getValues);
        });
    }

    /**
     * Wires the given sink to a new {@link SubscriptionAdapter} and registers the adapter's
     * request and cancel callbacks on it.
     *
     * @param sink the sink that receives the scanned elements
     * @param initialCursor the publisher producing the first cursor page
     * @param scanFunction produces the publisher for the next page from a previously received cursor
     * @param manyMapper extracts the elements to emit from a cursor page
     * @param <V> element type emitted to the sink
     * @param <C> concrete cursor type
     */
    private static <V, C extends ScanCursor> void scan(FluxSink<V> sink, Mono<C> initialCursor, Function<ScanCursor, Mono<C>> scanFunction, Function<C, Collection<V>> manyMapper) {
        SubscriptionAdapter<V, C> adapter = new SubscriptionAdapter<>(sink, initialCursor, scanFunction, manyMapper);
        adapter.register();
    }

    /**
     * Callback through which a {@link ScanSubscriber} reports the outcome of a single scan call
     * back to its owner.
     */
    interface Completable {

        /**
         * Invoked by {@link ScanSubscriber} when its upstream publisher completes.
         */
        void chunkCompleted();

        /**
         * Invoked by {@link ScanSubscriber} when its upstream publisher signals an error.
         *
         * @param throwable the error signalled by the upstream publisher
         */
        void onError(Throwable throwable);
    }

    static class ScanSubscriber<T, C extends ScanCursor>
    extends BaseSubscriber<C> {
        private static final AtomicReferenceFieldUpdater<ScanSubscriber, ScanCursor> CURSOR = AtomicReferenceFieldUpdater.newUpdater(ScanSubscriber.class, ScanCursor.class, "cursor");
        private static final AtomicLongFieldUpdater<ScanSubscriber> EMITTED = AtomicLongFieldUpdater.newUpdater(ScanSubscriber.class, "emitted");
        private final Completable completable;
        private final FluxSink<T> sink;
        private final Queue<T> buffer = Operators.newQueue();
        private final Context context;
        private final Function<C, Collection<T>> manyMapper;
        volatile boolean canceled;
        private volatile C cursor;
        private volatile long emitted;
        private volatile long cursorSize;

        ScanSubscriber(Completable completable, FluxSink<T> sink, Context context, Function<C, Collection<T>> manyMapper) {
            this.completable = completable;
            this.sink = sink;
            this.context = context;
            this.manyMapper = manyMapper;
        }

        public Context currentContext() {
            return this.context;
        }

        protected void hookOnNext(C cursor) {
            if (!CURSOR.compareAndSet(this, (ScanCursor)null, (ScanCursor)cursor)) {
                Operators.onOperatorError((Subscription)this, new IllegalStateException("Cannot propagate Cursor"), cursor, this.context);
                return;
            }
            Collection<T> items = this.manyMapper.apply(cursor);
            this.cursorSize = items.size();
            this.emitDirect(items);
        }

        void emitDirect(Iterable<T> iterable) {
            long demand = this.sink.requestedFromDownstream();
            long sent = 0L;
            for (T value : iterable) {
                if (this.canceled) break;
                if (demand <= sent) {
                    this.buffer.add(value);
                    continue;
                }
                ++sent;
                this.next(value);
            }
        }

        void emitFromBuffer() {
            long demand = this.sink.requestedFromDownstream();
            long sent = 0L;
            if (demand > 0L) {
                T value;
                while ((value = this.buffer.poll()) != null && !this.canceled) {
                    this.next(value);
                    if (demand > ++sent) continue;
                    break;
                }
            }
        }

        private void next(T value) {
            EMITTED.incrementAndGet(this);
            this.sink.next(value);
        }

        protected void hookOnComplete() {
            this.completable.chunkCompleted();
        }

        protected void hookOnError(Throwable throwable) {
            this.completable.onError(throwable);
        }

        protected void hookOnCancel() {
            this.canceled = true;
        }

        public ScanCursor getCursor() {
            return CURSOR.get(this);
        }

        public boolean isExhausted() {
            return EMITTED.get(this) == this.cursorSize && this.getCursor() != null;
        }
    }

    /**
     * Bridges downstream demand on a {@link FluxSink} to a chain of scan calls.
     * <p>
     * On the first request it subscribes a {@link ScanSubscriber} to the initial publisher. Once the
     * current subscriber has emitted all elements of its page and there is still demand, it asks the
     * scan function for the next page using the last cursor and swaps in a new subscriber. When the
     * cursor reports {@code isFinished()} and all elements were emitted, the sink is completed.
     *
     * @param <T> element type emitted to the sink
     * @param <C> concrete cursor type
     */
    static class SubscriptionAdapter<T, C extends ScanCursor>
    implements Completable {
        // Field updaters address the volatile fields "currentSubscription" and "status" by name.
        private static final AtomicReferenceFieldUpdater<SubscriptionAdapter, ScanSubscriber> SUBSCRIBER = AtomicReferenceFieldUpdater.newUpdater(SubscriptionAdapter.class, ScanSubscriber.class, "currentSubscription");
        private static final AtomicIntegerFieldUpdater<SubscriptionAdapter> STATUS = AtomicIntegerFieldUpdater.newUpdater(SubscriptionAdapter.class, "status");
        private static final int STATUS_ACTIVE = 0;
        // Must differ from STATUS_ACTIVE: with both constants equal, the compare-and-set in
        // terminate() succeeds on every call and the terminal signal could be sent more than once.
        private static final int STATUS_TERMINATED = 1;
        private volatile ScanSubscriber<T, C> currentSubscription;
        private volatile boolean canceled;
        private volatile int status = STATUS_ACTIVE;
        private final FluxSink<T> sink;
        private final Context context;
        private final Mono<C> initial;
        private final Function<ScanCursor, Mono<C>> scanFunction;
        private final Function<C, Collection<T>> manyMapper;

        /**
         * @param sink the sink that receives elements and terminal signals
         * @param initial publisher producing the first cursor page
         * @param scanFunction produces the publisher for the next page from the last cursor
         * @param manyMapper extracts the elements to emit from a cursor page
         */
        SubscriptionAdapter(FluxSink<T> sink, Mono<C> initial, Function<ScanCursor, Mono<C>> scanFunction, Function<C, Collection<T>> manyMapper) {
            this.sink = sink;
            this.context = sink.currentContext();
            this.initial = initial;
            this.scanFunction = scanFunction;
            this.manyMapper = manyMapper;
        }

        /**
         * Registers this adapter's request and cancel handlers on the sink.
         */
        public void register() {
            this.sink.onRequest(this::onDemand);
            this.sink.onCancel(this::canceled);
        }

        /**
         * Reacts to downstream demand (and to chunk completion, see {@link #chunkCompleted()}).
         *
         * @param n the requested amount; not used here, the actual demand is read from the sink
         */
        void onDemand(long n) {
            if (this.canceled) {
                return;
            }
            ScanSubscriber<T, C> current = this.getCurrentSubscriber();
            if (current == null) {
                // First demand: start the initial scan. Only the caller winning the CAS subscribes.
                current = new ScanSubscriber<T, C>(this, this.sink, this.context, this.manyMapper);
                if (SUBSCRIBER.compareAndSet(this, null, current)) {
                    this.initial.subscribe(current);
                }
                return;
            }
            ScanCursor cursor = current.getCursor();
            if (cursor == null) {
                // The current scan call has not delivered its cursor yet.
                return;
            }
            current.emitFromBuffer();
            if (!current.isExhausted() || current.canceled || this.sink.requestedFromDownstream() == 0L) {
                return;
            }
            if (cursor.isFinished()) {
                this.chunkCompleted();
                return;
            }
            // Page fully emitted and more demand outstanding: continue from the last cursor.
            Mono<C> next = this.scanFunction.apply(cursor);
            ScanSubscriber<T, C> nextSubscriber = new ScanSubscriber<T, C>(this, this.sink, this.context, this.manyMapper);
            if (SUBSCRIBER.compareAndSet(this, current, nextSubscriber)) {
                next.subscribe(nextSubscriber);
            }
        }

        /**
         * Cancel handler: marks this adapter as canceled and cancels the current subscriber, if any.
         */
        private void canceled() {
            this.canceled = true;
            ScanSubscriber<T, C> current = this.getCurrentSubscriber();
            if (current != null) {
                current.cancel();
            }
        }

        /**
         * Completes the sink when the last cursor is finished and all of its elements were emitted;
         * otherwise re-evaluates demand through {@link #onDemand(long)}.
         */
        @Override
        public void chunkCompleted() {
            if (this.canceled) {
                return;
            }
            ScanSubscriber<T, C> current = this.getCurrentSubscriber();
            if (current == null) {
                return;
            }
            ScanCursor cursor = current.getCursor();
            if (cursor == null) {
                return;
            }
            if (cursor.isFinished() && current.isExhausted()) {
                if (this.terminate()) {
                    this.sink.complete();
                }
            } else {
                this.onDemand(0L);
            }
        }

        /**
         * @return the subscriber of the scan call currently in progress, or {@code null} before the first demand
         */
        @SuppressWarnings("unchecked")
        ScanSubscriber<T, C> getCurrentSubscriber() {
            return SUBSCRIBER.get(this);
        }

        /**
         * Forwards the error to the sink unless this adapter is canceled or already terminated.
         *
         * @param throwable the error reported by the current subscriber
         */
        @Override
        public void onError(Throwable throwable) {
            if (!this.canceled && this.terminate()) {
                this.sink.error(throwable);
            }
        }

        /**
         * Atomically moves the status from active to terminated.
         *
         * @return {@code true} only for the single caller that performs the transition
         */
        protected boolean terminate() {
            return STATUS.compareAndSet(this, STATUS_ACTIVE, STATUS_TERMINATED);
        }
    }
}

