/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.raft;

import java.nio.ByteBuffer;
import java.util.OptionalInt;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.protocol.types.Type;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.raft.AckMode;
import org.apache.kafka.raft.Isolation;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.raft.RaftClient;
import org.slf4j.Logger;

/**
 * A counter whose value is advanced by appending records through a {@link RaftClient}
 * and by applying the committed records that are read back from it.
 *
 * <p>Two values are tracked: {@code committed} is the last value applied from records read
 * with {@link Isolation#COMMITTED}; {@code uncommitted} is the value this instance has most
 * recently tried to append. The public methods are {@code synchronized} on this instance.
 */
public class ReplicatedCounter {
    /** Sentinel compared against the local broker id when no leader is known. */
    private static final int NO_LEADER = -1;

    private final int localBrokerId;
    private final Logger log;
    private final RaftClient client;
    private final AtomicInteger committed = new AtomicInteger(0);
    private final AtomicInteger uncommitted = new AtomicInteger(0);
    /** Position passed to the next {@code client.read} call; advanced past each batch in {@link #poll(long)}. */
    private OffsetAndEpoch position = new OffsetAndEpoch(0L, 0);
    /** Leader/epoch last observed by {@link #poll(long)}. */
    private LeaderAndEpoch currentLeaderAndEpoch = new LeaderAndEpoch(OptionalInt.empty(), 0);

    /**
     * Creates a counter starting at zero.
     *
     * @param localBrokerId id compared against the current leader id to decide writability
     * @param client        raft client used for reading and appending records
     * @param logContext    source of the logger used by this instance
     */
    public ReplicatedCounter(int localBrokerId, RaftClient client, LogContext logContext) {
        this.localBrokerId = localBrokerId;
        this.client = client;
        this.log = logContext.logger(ReplicatedCounter.class);
    }

    /**
     * Issues a committed read from the current position and blocks until the future completes.
     *
     * @param durationMs duration handed to {@code client.read}
     * @return the records produced by the read
     * @throws RuntimeException wrapping the {@link InterruptedException} or
     *                          {@link ExecutionException} raised while waiting; on interruption
     *                          the thread's interrupt flag is restored before throwing
     */
    private Records tryRead(long durationMs) {
        CompletableFuture<Records> future = this.client.read(this.position, Isolation.COMMITTED, durationMs);
        try {
            return future.get();
        } catch (InterruptedException e) {
            // Restore the interrupt status so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Applies a single record: the decoded value is accepted only if it is exactly one more
     * than the current committed value; anything else is logged and ignored.
     */
    private void apply(Record record) {
        int value = this.deserialize(record);
        int previous = this.committed.get();
        if (value != previous + 1) {
            this.log.debug("Ignoring non-sequential append at offset {}: {} -> {}", record.offset(), previous, value);
            return;
        }
        this.log.debug("Applied increment at offset {}: {} -> {}", record.offset(), previous, value);
        this.committed.set(value);
        // Keep the uncommitted value from lagging behind what has been committed.
        if (value > this.uncommitted.get()) {
            this.uncommitted.set(value);
        }
    }

    /**
     * Refreshes the observed leader/epoch, then reads records from the current position and
     * applies every record found in non-control batches.
     *
     * <p>When the leader/epoch has changed and this broker is the new leader, the uncommitted
     * value is reset to the committed value. The read position is advanced past every batch,
     * control batches included.
     *
     * @param durationMs duration handed to the underlying read
     * @throws RuntimeException if waiting for the read is interrupted or the read fails
     */
    public synchronized void poll(long durationMs) {
        LeaderAndEpoch latestLeaderAndEpoch = this.client.currentLeaderAndEpoch();
        if (!this.currentLeaderAndEpoch.equals(latestLeaderAndEpoch)) {
            if (this.localBrokerId == latestLeaderAndEpoch.leaderId.orElse(NO_LEADER)) {
                this.uncommitted.set(this.committed.get());
            }
            this.currentLeaderAndEpoch = latestLeaderAndEpoch;
        }
        Records records = this.tryRead(durationMs);
        for (RecordBatch batch : records.batches()) {
            if (!batch.isControlBatch()) {
                batch.forEach(this::apply);
            }
            this.position = new OffsetAndEpoch(batch.lastOffset() + 1L, batch.partitionLeaderEpoch());
        }
    }

    /**
     * @return {@code true} if the leader last observed by {@link #poll(long)} is this broker
     */
    public synchronized boolean isWritable() {
        return this.localBrokerId == this.currentLeaderAndEpoch.leaderId.orElse(NO_LEADER);
    }

    /**
     * Bumps the uncommitted value by one and appends the new value as a single record.
     * If the append completes without an offset, the uncommitted value is set back to the
     * value it had before this call.
     *
     * @throws KafkaException if {@link #isWritable()} is {@code false}
     */
    public synchronized void increment() {
        if (!this.isWritable()) {
            throw new KafkaException("Counter is not currently writable");
        }
        int initialValue = this.uncommitted.get();
        int incrementedValue = this.uncommitted.incrementAndGet();
        MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE, this.serialize(incrementedValue));
        this.client.append(records, AckMode.LEADER, Integer.MAX_VALUE).whenComplete((offsetAndEpoch, throwable) -> {
            if (offsetAndEpoch != null) {
                this.log.debug("Appended increment at offset {}: {} -> {}", offsetAndEpoch.offset, initialValue, incrementedValue);
            } else {
                // NOTE(review): this rollback does not take the instance lock and overwrites
                // whatever uncommitted holds when the callback fires - confirm that is intended
                // if the future can complete after a later increment().
                this.uncommitted.set(initialValue);
                // The trailing throwable has no placeholder, so SLF4J logs it as the exception.
                this.log.debug("Failed append of increment: {} -> {}", initialValue, incrementedValue, throwable);
            }
        });
    }

    /** Encodes {@code value} as a 4-byte INT32 record value. */
    private SimpleRecord serialize(int value) {
        ByteBuffer buffer = ByteBuffer.allocate(4);
        Type.INT32.write(buffer, value);
        buffer.flip();
        return new SimpleRecord(buffer);
    }

    /** Decodes the record's value buffer as an INT32. */
    private int deserialize(Record record) {
        return (Integer) Type.INT32.read(record.value());
    }
}

