/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.keepalive;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.rsocket.frame.KeepAliveFrameCodec;
import io.rsocket.keepalive.KeepAliveFramesAcceptor;
import io.rsocket.resume.ResumeStateHolder;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Consumer;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/**
 * Keep-alive bookkeeping for a connection.
 *
 * <p>An instance schedules periodic ticks on {@link Schedulers#parallel()}, records the time at
 * which the last keep-alive frame was received, replies to frames that carry the respond flag, and
 * invokes the registered timeout callback when no frame has been received for at least
 * {@code keepAliveTimeout} milliseconds.
 *
 * <p>The lifecycle is tracked in {@link #state} and is only changed through compare-and-set so
 * that {@link #start()}, {@link #stop()} and {@link #dispose()} may race with each other.
 */
public abstract class KeepAliveSupport implements KeepAliveFramesAcceptor {
    static final int STOPPED_STATE = 0;
    static final int STARTING_STATE = 1;
    static final int STARTED_STATE = 2;
    static final int DISPOSED_STATE = -1;

    static final AtomicIntegerFieldUpdater<KeepAliveSupport> STATE =
        AtomicIntegerFieldUpdater.newUpdater(KeepAliveSupport.class, "state");

    final ByteBufAllocator allocator;
    final Scheduler scheduler;
    final Duration keepAliveInterval;
    final Duration keepAliveTimeout;
    final long keepAliveTimeoutMillis;

    /** One of the {@code *_STATE} constants; updated only through {@link #STATE}. */
    volatile int state;

    volatile Consumer<KeepAlive> onTimeout;
    volatile Consumer<ByteBuf> onFrameSent;

    /** Subscription of the current (or most recent) tick interval; {@code null} until first start. */
    Disposable ticksDisposable;

    volatile ResumeStateHolder resumeStateHolder;
    volatile long lastReceivedMillis;

    /**
     * @param allocator allocator used to encode outgoing keep-alive frames
     * @param keepAliveInterval period between ticks, in milliseconds
     * @param keepAliveTimeout maximum time without a received keep-alive frame, in milliseconds
     */
    private KeepAliveSupport(ByteBufAllocator allocator, int keepAliveInterval, int keepAliveTimeout) {
        this.allocator = allocator;
        this.scheduler = Schedulers.parallel();
        this.keepAliveInterval = Duration.ofMillis(keepAliveInterval);
        this.keepAliveTimeout = Duration.ofMillis(keepAliveTimeout);
        this.keepAliveTimeoutMillis = keepAliveTimeout;
    }

    /**
     * Starts the periodic ticks if this instance is currently stopped; otherwise does nothing.
     *
     * @return this instance
     */
    public KeepAliveSupport start() {
        if (this.state == STOPPED_STATE && STATE.compareAndSet(this, STOPPED_STATE, STARTING_STATE)) {
            this.lastReceivedMillis = this.scheduler.now(TimeUnit.MILLISECONDS);

            final Disposable disposable =
                Flux.interval(this.keepAliveInterval, this.scheduler).subscribe(v -> this.onIntervalTick());
            this.ticksDisposable = disposable;

            // A concurrent stop()/dispose() may have moved the state away from STARTING while the
            // interval was being subscribed; in that case the new subscription must not survive.
            if (this.state != STARTING_STATE || !STATE.compareAndSet(this, STARTING_STATE, STARTED_STATE)) {
                disposable.dispose();
            }
        }
        return this;
    }

    /** Cancels the ticks and moves to the stopped state, from which {@link #start()} works again. */
    public void stop() {
        this.terminate(STOPPED_STATE);
    }

    /**
     * Handles an incoming keep-alive frame: refreshes the last-received timestamp, forwards the
     * remote's last received position to the resume state holder (if any) and, when the frame has
     * the respond flag set, sends back a keep-alive frame carrying the same data.
     *
     * @param keepAliveFrame the received keep-alive frame
     */
    @Override
    public void receive(ByteBuf keepAliveFrame) {
        this.lastReceivedMillis = this.scheduler.now(TimeUnit.MILLISECONDS);

        // Read the volatile field once so a concurrent resumeState(null) cannot cause an NPE.
        final ResumeStateHolder holder = this.resumeStateHolder;
        if (holder != null) {
            final long remoteLastReceivedPos = KeepAliveFrameCodec.lastPosition(keepAliveFrame);
            holder.onImpliedPosition(remoteLastReceivedPos);
        }

        if (KeepAliveFrameCodec.respondFlag(keepAliveFrame)) {
            final long localLastReceivedPos = this.localLastReceivedPosition();
            this.send(
                KeepAliveFrameCodec.encode(
                    this.allocator,
                    false,
                    localLastReceivedPos,
                    KeepAliveFrameCodec.data(keepAliveFrame).retain()));
        }
    }

    /**
     * Sets the holder used to exchange resume positions through keep-alive frames.
     *
     * @return this instance
     */
    public KeepAliveSupport resumeState(ResumeStateHolder resumeStateHolder) {
        this.resumeStateHolder = resumeStateHolder;
        return this;
    }

    /**
     * Registers the consumer that receives every keep-alive frame produced by this instance.
     *
     * @return this instance
     */
    public KeepAliveSupport onSendKeepAliveFrame(Consumer<ByteBuf> onFrameSent) {
        this.onFrameSent = onFrameSent;
        return this;
    }

    /**
     * Registers the callback invoked when the keep-alive timeout elapses.
     *
     * @return this instance
     */
    public KeepAliveSupport onTimeout(Consumer<KeepAlive> onTimeout) {
        this.onTimeout = onTimeout;
        return this;
    }

    /** Cancels the ticks and moves to the disposed state; no-op if already stopped or disposed. */
    public void dispose() {
        this.terminate(DISPOSED_STATE);
    }

    /**
     * Reports whether the tick subscription has been disposed.
     *
     * @return {@code false} if {@link #start()} has never created a subscription; otherwise the
     *     disposed status of the most recent tick subscription
     */
    public boolean isDisposed() {
        // ticksDisposable is only assigned by start(); guard against querying before that.
        final Disposable disposable = this.ticksDisposable;
        return disposable != null && disposable.isDisposed();
    }

    /** Invoked on every interval tick while started. */
    abstract void onIntervalTick();

    /**
     * Hands an encoded frame to the registered consumer. When no consumer is registered the frame
     * has no owner, so it is released here instead of being leaked.
     */
    void send(ByteBuf frame) {
        // Single volatile read: the consumer may be replaced concurrently.
        final Consumer<ByteBuf> consumer = this.onFrameSent;
        if (consumer != null) {
            consumer.accept(frame);
        } else {
            frame.release();
        }
    }

    /**
     * Fires the timeout callback (if registered) and stops the ticks when at least
     * {@code keepAliveTimeoutMillis} have elapsed since the last received keep-alive frame.
     */
    void tryTimeout() {
        final long now = this.scheduler.now(TimeUnit.MILLISECONDS);
        if (now - this.lastReceivedMillis >= this.keepAliveTimeoutMillis) {
            // Single volatile read: the callback may be replaced concurrently.
            final Consumer<KeepAlive> callback = this.onTimeout;
            if (callback != null) {
                callback.accept(new KeepAlive(this.keepAliveInterval, this.keepAliveTimeout));
            }
            this.stop();
        }
    }

    /**
     * Moves a starting or started instance to {@code terminationState} and disposes its ticks.
     * Returns without doing anything when the instance is already stopped or disposed.
     *
     * @param terminationState {@link #STOPPED_STATE} or {@link #DISPOSED_STATE}
     */
    void terminate(int terminationState) {
        for (;;) {
            final int current = this.state;
            if (current == STOPPED_STATE || current == DISPOSED_STATE) {
                return;
            }

            // Capture before the CAS so the subscription belonging to the observed state is the
            // one that gets disposed.
            final Disposable disposable = this.ticksDisposable;
            if (STATE.compareAndSet(this, current, terminationState)) {
                // While STARTING for the first time, start() has not yet assigned the
                // subscription; start() notices the state change and disposes it itself.
                if (disposable != null) {
                    disposable.dispose();
                }
                return;
            }
        }
    }

    /** Returns the implied position of the resume state holder, or {@code 0} when none is set. */
    long localLastReceivedPosition() {
        final ResumeStateHolder holder = this.resumeStateHolder;
        return holder != null ? holder.impliedPosition() : 0L;
    }

    /** Immutable pair of keep-alive settings passed to the timeout callback. */
    public static final class KeepAlive {
        private final Duration tickPeriod;
        private final Duration timeout;

        /**
         * @param tickPeriod interval between keep-alive ticks
         * @param timeoutMillis keep-alive timeout (a {@link Duration}, despite the parameter name)
         */
        public KeepAlive(Duration tickPeriod, Duration timeoutMillis) {
            this.tickPeriod = tickPeriod;
            this.timeout = timeoutMillis;
        }

        /** Returns the interval between keep-alive ticks. */
        public Duration getTickPeriod() {
            return this.tickPeriod;
        }

        /** Returns the keep-alive timeout. */
        public Duration getTimeout() {
            return this.timeout;
        }
    }

    /**
     * Client-side variant: on every tick it checks for a timeout and then sends a keep-alive frame
     * with the respond flag set and empty data.
     */
    public static final class ClientKeepAliveSupport extends KeepAliveSupport {
        public ClientKeepAliveSupport(ByteBufAllocator allocator, int keepAliveInterval, int keepAliveTimeout) {
            super(allocator, keepAliveInterval, keepAliveTimeout);
        }

        @Override
        void onIntervalTick() {
            this.tryTimeout();
            this.send(
                KeepAliveFrameCodec.encode(
                    this.allocator, true, this.localLastReceivedPosition(), Unpooled.EMPTY_BUFFER));
        }
    }
}

