package org.apache.kafka.clients.consumer.internals;

import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.IllegalGenerationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Meter;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.HeartbeatRequest;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.JoinGroupResponse;
import org.apache.kafka.common.requests.LeaveGroupRequest;
import org.apache.kafka.common.requests.LeaveGroupResponse;
import org.apache.kafka.common.requests.SyncGroupRequest;
import org.apache.kafka.common.requests.SyncGroupResponse;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/clients/consumer/internals/AbstractCoordinator.class */
public abstract class AbstractCoordinator implements Closeable {
    /** Name prefix used for the heartbeat thread; the group id is appended when it is non-empty. */
    public static final String HEARTBEAT_THREAD_PREFIX = "kafka-coordinator-heartbeat-thread";
    private final Logger log;
    private final int sessionTimeoutMs;
    private final boolean leaveGroupOnClose;
    /** Sensors recording heartbeat, join and sync latencies. */
    private final GroupCoordinatorMetrics sensors;
    /** Heartbeat bookkeeping (timeouts, last send/receive); accessed under this object's monitor in the code visible here. */
    private final Heartbeat heartbeat;
    protected final int rebalanceTimeoutMs;
    protected final String groupId;
    protected final ConsumerNetworkClient client;
    protected final Time time;
    /** Back-off, in milliseconds, used between retries (coordinator lookup, heartbeat loop waits). */
    protected final long retryBackoffMs;
    /** Created lazily by startHeartbeatThreadIfNeeded(); reset to null when the thread is closed or its failure is reported. */
    private HeartbeatThread heartbeatThread = null;
    /** Value returned by needRejoin(). */
    private boolean rejoinNeeded = true;
    /** When true, joinGroupIfNeeded() invokes onJoinPrepare() before initiating the join and then clears the flag. */
    private boolean needsJoinPrepare = true;
    private MemberState state = MemberState.UNJOINED;
    /** Non-null while a rejoin is still incomplete (see rejoinIncomplete()). */
    private RequestFuture<ByteBuffer> joinFuture = null;
    /** Set when a FindCoordinator response is handled successfully. */
    private Node coordinator = null;
    /** Replaced when a JoinGroup response is accepted. */
    private Generation generation = Generation.NO_GENERATION;
    /** The in-flight coordinator lookup, or null when none is pending. */
    private RequestFuture<Void> findCoordinatorFuture = null;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/AbstractCoordinator$CoordinatorResponseHandler.class */
    /**
     * Base adapter for responses received from the group coordinator.
     *
     * <p>Subclasses implement {@link #handle(Object, RequestFuture)} to interpret the typed response
     * body and complete or fail the supplied future.
     *
     * @param <R> type of the response body passed to {@code handle}
     * @param <T> type of the value the adapted future completes with
     */
    public abstract class CoordinatorResponseHandler<R, T> extends RequestFutureAdapter<ClientResponse, T> {
        /** The raw client response currently being handled; set before {@code handle} is invoked. */
        protected ClientResponse response;

        public CoordinatorResponseHandler() {
        }

        /**
         * Interprets the response body and completes or fails {@code requestFuture}.
         *
         * @param r the response body
         * @param requestFuture the future to complete or raise
         */
        public abstract void handle(R r, RequestFuture<T> requestFuture);

        /**
         * Propagates a request failure to the future. A disconnect additionally marks the
         * coordinator as unknown first.
         */
        @Override // org.apache.kafka.clients.consumer.internals.RequestFutureAdapter
        public void onFailure(RuntimeException runtimeException, RequestFuture<T> requestFuture) {
            if (runtimeException instanceof DisconnectException) {
                AbstractCoordinator.this.markCoordinatorUnknown(true);
            }
            requestFuture.raise(runtimeException);
        }

        /**
         * Records the response and delegates to {@code handle}. A runtime exception thrown while
         * handling is raised on the future unless the future has already been completed.
         */
        @Override // org.apache.kafka.clients.consumer.internals.RequestFutureAdapter
        public void onSuccess(ClientResponse clientResponse, RequestFuture<T> requestFuture) {
            try {
                this.response = clientResponse;
                // The response body is not statically typed as R, so an explicit (unchecked) cast is
                // required for the call below to type-check. A mismatched body surfaces as a
                // ClassCastException inside the handler, which the catch below routes to the future.
                @SuppressWarnings("unchecked")
                R body = (R) clientResponse.responseBody();
                handle(body, requestFuture);
            } catch (RuntimeException e) {
                if (!requestFuture.isDone()) {
                    requestFuture.raise(e);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/AbstractCoordinator$FindCoordinatorResponseHandler.class */
    public class FindCoordinatorResponseHandler extends RequestFutureAdapter<ClientResponse, Void> {
        private FindCoordinatorResponseHandler() {
        }

        @Override // org.apache.kafka.clients.consumer.internals.RequestFutureAdapter
        public void onSuccess(ClientResponse clientResponse, RequestFuture<Void> requestFuture) {
            AbstractCoordinator.this.log.debug("Received FindCoordinator response {}", clientResponse);
            AbstractCoordinator.this.clearFindCoordinatorFuture();
            FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) clientResponse.responseBody();
            Errors error = findCoordinatorResponse.error();
            if (error != Errors.NONE) {
                if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                    requestFuture.raise(new GroupAuthorizationException(AbstractCoordinator.this.groupId));
                    return;
                } else {
                    AbstractCoordinator.this.log.debug("Group coordinator lookup failed: {}", error.message());
                    requestFuture.raise(error);
                    return;
                }
            }
            synchronized (AbstractCoordinator.this) {
                int id = Integer.MAX_VALUE - findCoordinatorResponse.node().id();
                AbstractCoordinator.this.coordinator = new Node(id, findCoordinatorResponse.node().host(), findCoordinatorResponse.node().port());
                AbstractCoordinator.this.log.info("Discovered group coordinator {}", AbstractCoordinator.this.coordinator);
                AbstractCoordinator.this.client.tryConnect(AbstractCoordinator.this.coordinator);
                AbstractCoordinator.this.heartbeat.resetTimeouts(AbstractCoordinator.this.time.milliseconds());
            }
            requestFuture.complete(null);
        }

        @Override // org.apache.kafka.clients.consumer.internals.RequestFutureAdapter
        public void onFailure(RuntimeException runtimeException, RequestFuture<Void> requestFuture) {
            AbstractCoordinator.this.clearFindCoordinatorFuture();
            super.onFailure(runtimeException, requestFuture);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/AbstractCoordinator$Generation.class */
    /**
     * Immutable snapshot of a group membership: generation id, member id and the protocol
     * reported for that generation.
     */
    public static class Generation {
        public final int generationId;
        public final String memberId;
        public final String protocol;

        /** Placeholder used while no generation is held: id -1, empty member id, no protocol. */
        public static final Generation NO_GENERATION = new Generation(-1, "", null);

        /**
         * @param i the generation id
         * @param str the member id
         * @param str2 the protocol, may be {@code null}
         */
        public Generation(int i, String str, String str2) {
            generationId = i;
            memberId = str;
            protocol = str2;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/AbstractCoordinator$GroupCoordinatorMetrics.class */
    /**
     * Registers and holds the sensors for heartbeat, join and sync latency, plus a gauge reporting
     * the time since the last heartbeat was sent.
     */
    public class GroupCoordinatorMetrics {
        public final String metricGrpName;
        public final Sensor heartbeatLatency;
        public final Sensor joinLatency;
        public final Sensor syncLatency;

        /**
         * @param metrics registry the sensors and the gauge are added to
         * @param str prefix of the metric group name; "-coordinator-metrics" is appended
         */
        public GroupCoordinatorMetrics(Metrics metrics, String str) {
            final String group = str + "-coordinator-metrics";
            this.metricGrpName = group;

            final Sensor heartbeats = metrics.sensor("heartbeat-latency");
            heartbeats.add(metrics.metricName("heartbeat-response-time-max", group, "The max time taken to receive a response to a heartbeat request"), new Max());
            heartbeats.add(createMeter(metrics, group, "heartbeat", "heartbeats"));
            this.heartbeatLatency = heartbeats;

            final Sensor joins = metrics.sensor("join-latency");
            joins.add(metrics.metricName("join-time-avg", group, "The average time taken for a group rejoin"), new Avg());
            joins.add(metrics.metricName("join-time-max", group, "The max time taken for a group rejoin"), new Max());
            joins.add(createMeter(metrics, group, "join", "group joins"));
            this.joinLatency = joins;

            final Sensor syncs = metrics.sensor("sync-latency");
            syncs.add(metrics.metricName("sync-time-avg", group, "The average time taken for a group sync"), new Avg());
            syncs.add(metrics.metricName("sync-time-max", group, "The max time taken for a group sync"), new Max());
            syncs.add(createMeter(metrics, group, "sync", "group syncs"));
            this.syncLatency = syncs;

            // Gauge: whole seconds elapsed between the supplied timestamp and the last heartbeat send.
            final Measurable secondsSinceLastHeartbeat = new Measurable() {
                @Override // org.apache.kafka.common.metrics.Measurable
                public double measure(MetricConfig metricConfig, long j) {
                    final long elapsedMs = j - heartbeat.lastHeartbeatSend();
                    return TimeUnit.MILLISECONDS.toSeconds(elapsedMs);
                }
            };
            metrics.addMetric(metrics.metricName("last-heartbeat-seconds-ago", group, "The number of seconds since the last controller heartbeat was sent"), secondsSinceLastHeartbeat);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/AbstractCoordinator$HeartbeatResponseHandler.class */
    public class HeartbeatResponseHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> {
        private HeartbeatResponseHandler() {
            super();
        }

        @Override // org.apache.kafka.clients.consumer.internals.AbstractCoordinator.CoordinatorResponseHandler
        public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> requestFuture) {
            AbstractCoordinator.this.sensors.heartbeatLatency.record(this.response.requestLatencyMs());
            Errors error = heartbeatResponse.error();
            if (error == Errors.NONE) {
                AbstractCoordinator.this.log.debug("Received successful Heartbeat response");
                requestFuture.complete(null);
                return;
            }
            if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
                AbstractCoordinator.this.log.debug("Attempt to heartbeat since coordinator {} is either not started or not valid.", AbstractCoordinator.this.coordinator());
                AbstractCoordinator.this.markCoordinatorUnknown();
                requestFuture.raise(error);
                return;
            }
            if (error == Errors.REBALANCE_IN_PROGRESS) {
                AbstractCoordinator.this.log.debug("Attempt to heartbeat failed since group is rebalancing");
                AbstractCoordinator.this.requestRejoin();
                requestFuture.raise(Errors.REBALANCE_IN_PROGRESS);
            } else if (error == Errors.ILLEGAL_GENERATION) {
                AbstractCoordinator.this.log.debug("Attempt to heartbeat failed since generation {} is not current", Integer.valueOf(AbstractCoordinator.this.generation.generationId));
                AbstractCoordinator.this.resetGeneration();
                requestFuture.raise(Errors.ILLEGAL_GENERATION);
            } else if (error == Errors.UNKNOWN_MEMBER_ID) {
                AbstractCoordinator.this.log.debug("Attempt to heartbeat failed for since member id {} is not valid.", AbstractCoordinator.this.generation.memberId);
                AbstractCoordinator.this.resetGeneration();
                requestFuture.raise(Errors.UNKNOWN_MEMBER_ID);
            } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                requestFuture.raise(new GroupAuthorizationException(AbstractCoordinator.this.groupId));
            } else {
                requestFuture.raise(new KafkaException("Unexpected error in heartbeat response: " + error.message()));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/AbstractCoordinator$HeartbeatThread.class */
    /**
     * Background thread that sends heartbeats while the member is in the STABLE state.
     *
     * <p>All state transitions and the main loop body run under the enclosing coordinator's monitor;
     * {@code wait}/{@code notify} on the coordinator are used to park and wake the thread. A failure
     * that terminates the thread is stored and can be retrieved through {@link #hasFailed()} and
     * {@link #failureCause()}.
     */
    public class HeartbeatThread extends KafkaThread {
        private boolean enabled;
        private boolean closed;
        private final AtomicReference<RuntimeException> failed;

        private HeartbeatThread() {
            // NOTE(review): the second argument is presumably KafkaThread's daemon flag - confirm.
            super(AbstractCoordinator.HEARTBEAT_THREAD_PREFIX + (AbstractCoordinator.this.groupId.isEmpty() ? "" : " | " + AbstractCoordinator.this.groupId), true);
            this.enabled = false;
            this.closed = false;
            this.failed = new AtomicReference<>(null);
        }

        /** Enables heartbeating, resets the heartbeat timeouts and wakes the thread. */
        public void enable() {
            synchronized (AbstractCoordinator.this) {
                AbstractCoordinator.this.log.debug("Enabling heartbeat thread");
                this.enabled = true;
                AbstractCoordinator.this.heartbeat.resetTimeouts(AbstractCoordinator.this.time.milliseconds());
                AbstractCoordinator.this.notify();
            }
        }

        /** Disables heartbeating; the thread parks on its next loop iteration. */
        public void disable() {
            synchronized (AbstractCoordinator.this) {
                AbstractCoordinator.this.log.debug("Disabling heartbeat thread");
                this.enabled = false;
            }
        }

        /** Marks the thread closed and wakes it so that it can observe the flag and exit. */
        public void close() {
            synchronized (AbstractCoordinator.this) {
                this.closed = true;
                AbstractCoordinator.this.notify();
            }
        }

        /** @return true once the thread has terminated because of an error */
        private boolean hasFailed() {
            return this.failed.get() != null;
        }

        /** @return the error that terminated the thread, or {@code null} if it has not failed */
        private RuntimeException failureCause() {
            return this.failed.get();
        }

        /**
         * Main loop. Each iteration, under the coordinator's monitor: exit if closed; park if
         * disabled; disable itself if the member is not STABLE; otherwise poll the client and then
         * look up the coordinator, handle expired session/poll timeouts, send a heartbeat, or wait
         * for the retry back-off.
         *
         * <p>The handlers are ordered from most to least specific in a single try statement so that
         * each is reachable: authentication and group-authorization errors are stored as-is, an
         * interrupt clears the interrupt flag and is stored wrapped, and anything else is stored
         * (wrapped if it is not a RuntimeException).
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                AbstractCoordinator.this.log.debug("Heartbeat thread started");
                while (true) {
                    synchronized (AbstractCoordinator.this) {
                        if (this.closed) {
                            return;
                        }
                        if (!this.enabled) {
                            AbstractCoordinator.this.wait();
                            continue;
                        }
                        if (AbstractCoordinator.this.state != MemberState.STABLE) {
                            // The member is no longer part of a stable group; stop heartbeating
                            // until re-enabled.
                            disable();
                            continue;
                        }

                        AbstractCoordinator.this.client.pollNoWakeup();
                        long now = AbstractCoordinator.this.time.milliseconds();

                        if (AbstractCoordinator.this.coordinatorUnknown()) {
                            // Back off when a lookup is already pending or the new lookup failed immediately.
                            if (AbstractCoordinator.this.findCoordinatorFuture != null || AbstractCoordinator.this.lookupCoordinator().failed()) {
                                AbstractCoordinator.this.wait(AbstractCoordinator.this.retryBackoffMs);
                            }
                        } else if (AbstractCoordinator.this.heartbeat.sessionTimeoutExpired(now)) {
                            AbstractCoordinator.this.markCoordinatorUnknown();
                        } else if (AbstractCoordinator.this.heartbeat.pollTimeoutExpired(now)) {
                            AbstractCoordinator.this.maybeLeaveGroup();
                        } else if (!AbstractCoordinator.this.heartbeat.shouldHeartbeat(now)) {
                            AbstractCoordinator.this.wait(AbstractCoordinator.this.retryBackoffMs);
                        } else {
                            AbstractCoordinator.this.heartbeat.sentHeartbeat(now);
                            AbstractCoordinator.this.sendHeartbeatRequest().addListener(new RequestFutureListener<Void>() {
                                @Override // org.apache.kafka.clients.consumer.internals.RequestFutureListener
                                public void onSuccess(Void value) {
                                    synchronized (AbstractCoordinator.this) {
                                        AbstractCoordinator.this.heartbeat.receiveHeartbeat(AbstractCoordinator.this.time.milliseconds());
                                    }
                                }

                                @Override // org.apache.kafka.clients.consumer.internals.RequestFutureListener
                                public void onFailure(RuntimeException runtimeException) {
                                    synchronized (AbstractCoordinator.this) {
                                        if (runtimeException instanceof RebalanceInProgressException) {
                                            // A rebalance-in-progress reply is still recorded as a received heartbeat.
                                            AbstractCoordinator.this.heartbeat.receiveHeartbeat(AbstractCoordinator.this.time.milliseconds());
                                        } else {
                                            AbstractCoordinator.this.heartbeat.failHeartbeat();
                                            // Wake the thread so it can react to the failure without waiting out the back-off.
                                            AbstractCoordinator.this.notify();
                                        }
                                    }
                                }
                            });
                        }
                    }
                }
            } catch (AuthenticationException e) {
                AbstractCoordinator.this.log.error("An authentication error occurred in the heartbeat thread", e);
                this.failed.set(e);
            } catch (GroupAuthorizationException e) {
                AbstractCoordinator.this.log.error("A group authorization error occurred in the heartbeat thread", e);
                this.failed.set(e);
            } catch (InterruptedException | InterruptException e) {
                // Clear the interrupt flag; the thread is terminating and reports the interrupt via `failed`.
                Thread.interrupted();
                AbstractCoordinator.this.log.error("Unexpected interrupt received in heartbeat thread", e);
                this.failed.set(new RuntimeException(e));
            } catch (Throwable e) {
                AbstractCoordinator.this.log.error("Heartbeat thread failed due to unexpected error", e);
                if (e instanceof RuntimeException) {
                    this.failed.set((RuntimeException) e);
                } else {
                    this.failed.set(new RuntimeException(e));
                }
            } finally {
                AbstractCoordinator.this.log.debug("Heartbeat thread has closed");
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/AbstractCoordinator$JoinGroupResponseHandler.class */
    public class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
        private JoinGroupResponseHandler() {
            super();
        }

        @Override // org.apache.kafka.clients.consumer.internals.AbstractCoordinator.CoordinatorResponseHandler
        public void handle(JoinGroupResponse joinGroupResponse, RequestFuture<ByteBuffer> requestFuture) {
            Errors error = joinGroupResponse.error();
            if (error == Errors.NONE) {
                AbstractCoordinator.this.log.debug("Received successful JoinGroup response: {}", joinGroupResponse);
                AbstractCoordinator.this.sensors.joinLatency.record(this.response.requestLatencyMs());
                synchronized (AbstractCoordinator.this) {
                    if (AbstractCoordinator.this.state != MemberState.REBALANCING) {
                        requestFuture.raise(new UnjoinedGroupException());
                    } else {
                        AbstractCoordinator.this.generation = new Generation(joinGroupResponse.generationId(), joinGroupResponse.memberId(), joinGroupResponse.groupProtocol());
                        if (joinGroupResponse.isLeader()) {
                            AbstractCoordinator.this.onJoinLeader(joinGroupResponse).chain(requestFuture);
                        } else {
                            AbstractCoordinator.this.onJoinFollower().chain(requestFuture);
                        }
                    }
                }
                return;
            }
            if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
                AbstractCoordinator.this.log.debug("Attempt to join group rejected since coordinator {} is loading the group.", AbstractCoordinator.this.coordinator());
                requestFuture.raise(error);
                return;
            }
            if (error == Errors.UNKNOWN_MEMBER_ID) {
                AbstractCoordinator.this.resetGeneration();
                AbstractCoordinator.this.log.debug("Attempt to join group failed due to unknown member id.");
                requestFuture.raise(Errors.UNKNOWN_MEMBER_ID);
                return;
            }
            if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
                AbstractCoordinator.this.markCoordinatorUnknown();
                AbstractCoordinator.this.log.debug("Attempt to join group failed due to obsolete coordinator information: {}", error.message());
                requestFuture.raise(error);
            } else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL || error == Errors.INVALID_SESSION_TIMEOUT || error == Errors.INVALID_GROUP_ID) {
                AbstractCoordinator.this.log.error("Attempt to join group failed due to fatal error: {}", error.message());
                requestFuture.raise(error);
            } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                requestFuture.raise(new GroupAuthorizationException(AbstractCoordinator.this.groupId));
            } else {
                requestFuture.raise(new KafkaException("Unexpected error in join group response: " + error.message()));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/AbstractCoordinator$LeaveGroupResponseHandler.class */
    public class LeaveGroupResponseHandler extends CoordinatorResponseHandler<LeaveGroupResponse, Void> {
        private LeaveGroupResponseHandler() {
            super();
        }

        @Override // org.apache.kafka.clients.consumer.internals.AbstractCoordinator.CoordinatorResponseHandler
        public void handle(LeaveGroupResponse leaveGroupResponse, RequestFuture<Void> requestFuture) {
            Errors error = leaveGroupResponse.error();
            if (error == Errors.NONE) {
                AbstractCoordinator.this.log.debug("LeaveGroup request returned successfully");
                requestFuture.complete(null);
            } else {
                AbstractCoordinator.this.log.debug("LeaveGroup request failed with error: {}", error.message());
                requestFuture.raise(error);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/AbstractCoordinator$MemberState.class */
    /** Membership state of this client with respect to the group. */
    public enum MemberState {
        /** Initial state; while in it, timeToNextHeartbeat() reports Long.MAX_VALUE. */
        UNJOINED,
        /** A JoinGroup response is only accepted while in this state. */
        REBALANCING,
        /** The heartbeat thread only keeps heartbeating while in this state. */
        STABLE
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/AbstractCoordinator$SyncGroupResponseHandler.class */
    public class SyncGroupResponseHandler extends CoordinatorResponseHandler<SyncGroupResponse, ByteBuffer> {
        private SyncGroupResponseHandler() {
            super();
        }

        @Override // org.apache.kafka.clients.consumer.internals.AbstractCoordinator.CoordinatorResponseHandler
        public void handle(SyncGroupResponse syncGroupResponse, RequestFuture<ByteBuffer> requestFuture) {
            Errors error = syncGroupResponse.error();
            if (error == Errors.NONE) {
                AbstractCoordinator.this.sensors.syncLatency.record(this.response.requestLatencyMs());
                requestFuture.complete(syncGroupResponse.memberAssignment());
                return;
            }
            AbstractCoordinator.this.requestRejoin();
            if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                requestFuture.raise(new GroupAuthorizationException(AbstractCoordinator.this.groupId));
                return;
            }
            if (error == Errors.REBALANCE_IN_PROGRESS) {
                AbstractCoordinator.this.log.debug("SyncGroup failed because the group began another rebalance");
                requestFuture.raise(error);
                return;
            }
            if (error == Errors.UNKNOWN_MEMBER_ID || error == Errors.ILLEGAL_GENERATION) {
                AbstractCoordinator.this.log.debug("SyncGroup failed: {}", error.message());
                AbstractCoordinator.this.resetGeneration();
                requestFuture.raise(error);
            } else {
                if (error != Errors.COORDINATOR_NOT_AVAILABLE && error != Errors.NOT_COORDINATOR) {
                    requestFuture.raise(new KafkaException("Unexpected error from SyncGroup: " + error.message()));
                    return;
                }
                AbstractCoordinator.this.log.debug("SyncGroup failed: {}", error.message());
                AbstractCoordinator.this.markCoordinatorUnknown();
                requestFuture.raise(error);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/AbstractCoordinator$UnjoinedGroupException.class */
    /**
     * Retriable error raised on the join future when a successful JoinGroup response arrives while
     * the member is not in the REBALANCING state.
     */
    public static class UnjoinedGroupException extends RetriableException {
        private UnjoinedGroupException() {
        }
    }

    /**
     * Creates the coordinator and its heartbeat bookkeeping and metrics.
     *
     * @param logContext source of the logger
     * @param consumerNetworkClient network client used for all coordinator requests
     * @param str the group id
     * @param i the rebalance timeout in milliseconds
     * @param i2 the session timeout in milliseconds
     * @param i3 second argument to the {@code Heartbeat} constructor (presumably the heartbeat
     *           interval in milliseconds - confirm against {@code Heartbeat})
     * @param metrics registry the coordinator metrics are added to
     * @param str2 prefix of the metric group name
     * @param time clock used throughout the coordinator
     * @param j the retry back-off in milliseconds
     * @param z stored as {@code leaveGroupOnClose}
     */
    public AbstractCoordinator(LogContext logContext, ConsumerNetworkClient consumerNetworkClient, String str, int i, int i2, int i3, Metrics metrics, String str2, Time time, long j, boolean z) {
        // Plain configuration values first.
        this.groupId = str;
        this.rebalanceTimeoutMs = i;
        this.sessionTimeoutMs = i2;
        this.retryBackoffMs = j;
        this.leaveGroupOnClose = z;

        // Collaborators.
        this.log = logContext.logger(AbstractCoordinator.class);
        this.client = consumerNetworkClient;
        this.time = time;

        // The heartbeat must exist before the metrics, whose gauge reads from it.
        this.heartbeat = new Heartbeat(i2, i3, i, j);
        this.sensors = new GroupCoordinatorMetrics(metrics, str2);
    }

    /**
     * Returns the protocol type of the concrete coordinator.
     * NOTE(review): no caller is visible in this part of the file - confirm how the value is used.
     */
    protected abstract String protocolType();

    /**
     * Returns the protocol metadata entries of this member.
     * NOTE(review): presumably sent with the JoinGroup request - confirm against the caller.
     */
    protected abstract List<JoinGroupRequest.ProtocolMetadata> metadata();

    /**
     * Invoked by joinGroupIfNeeded() before a join is initiated, while the join-prepare flag is set.
     *
     * @param i the current generation id
     * @param str the current member id
     */
    protected abstract void onJoinPrepare(int i, String str);

    /**
     * Computes an assignment, returned as a map of string keys to serialized buffers.
     * NOTE(review): parameter meanings (presumably leader id, protocol, and per-member metadata)
     * are not established by the visible code - confirm against the caller.
     */
    protected abstract Map<String, ByteBuffer> performAssignment(String str, String str2, Map<String, ByteBuffer> map);

    /**
     * Callback for a completed join.
     * NOTE(review): parameters are presumably generation id, member id, protocol and this member's
     * assignment - confirm against the caller.
     */
    protected abstract void onJoinComplete(int i, String str, String str2, ByteBuffer byteBuffer);

    /**
     * Blocks until the coordinator is known, using a start time of zero and the largest possible
     * timeout.
     */
    public synchronized void ensureCoordinatorReady() {
        final long startTimeMs = 0L;
        final long unboundedTimeoutMs = Long.MAX_VALUE;
        ensureCoordinatorReady(startTimeMs, unboundedTimeoutMs);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Repeatedly looks up the coordinator until it is known or the time budget is exhausted.
     *
     * @param j the time, in milliseconds, at which the overall operation started
     * @param j2 the total time budget in milliseconds, measured from {@code j}
     * @return true if the coordinator is known when the method returns
     * @throws RuntimeException the lookup's exception, if the lookup fails with a non-retriable error
     */
    public synchronized boolean ensureCoordinatorReady(long j, long j2) {
        final long startTimeMs = j;
        final long timeoutMs = j2;
        long remainingMs = timeoutMs;

        while (coordinatorUnknown()) {
            final RequestFuture<Void> lookup = lookupCoordinator();
            this.client.poll(lookup, remainingMs);

            if (!lookup.failed()) {
                if (this.coordinator != null && this.client.connectionFailed(this.coordinator)) {
                    // A coordinator was found but the connection to it failed: forget it and
                    // back off before the next lookup.
                    markCoordinatorUnknown();
                    this.time.sleep(this.retryBackoffMs);
                }
            } else if (lookup.isRetriable()) {
                final long metadataWaitMs = timeoutMs - (this.time.milliseconds() - startTimeMs);
                if (metadataWaitMs <= 0) {
                    break;
                }
                this.log.debug("Coordinator discovery failed, refreshing metadata");
                this.client.awaitMetadataUpdate(metadataWaitMs);
            } else {
                throw lookup.exception();
            }

            remainingMs = timeoutMs - (this.time.milliseconds() - startTimeMs);
            if (remainingMs <= 0) {
                break;
            }
        }
        return !coordinatorUnknown();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the pending coordinator lookup, starting a new one against the least loaded node
     * when none is in flight.
     *
     * @return the lookup future, or a "no brokers available" future when no node can be selected
     */
    public synchronized RequestFuture<Void> lookupCoordinator() {
        if (this.findCoordinatorFuture != null) {
            return this.findCoordinatorFuture;
        }
        final Node target = this.client.leastLoadedNode();
        if (target == null) {
            this.log.debug("No broker available to send FindCoordinator request");
            return RequestFuture.noBrokersAvailable();
        }
        this.findCoordinatorFuture = sendFindCoordinatorRequest(target);
        return this.findCoordinatorFuture;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /** Forgets the pending coordinator lookup so that the next lookupCoordinator() call starts a new one. */
    public synchronized void clearFindCoordinatorFuture() {
        this.findCoordinatorFuture = null;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /** @return the current value of the rejoin-needed flag */
    public synchronized boolean needRejoin() {
        return this.rejoinNeeded;
    }

    /** @return true while a join future is still held, i.e. a rejoin has not finished yet */
    private synchronized boolean rejoinIncomplete() {
        return this.joinFuture != null;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Checks the heartbeat thread and records a poll at time {@code j}. Does nothing when no
     * heartbeat thread exists.
     *
     * @param j the current time in milliseconds
     * @throws RuntimeException the heartbeat thread's failure cause, if the thread has failed; the
     *         thread reference is dropped before the exception is thrown
     */
    public synchronized void pollHeartbeat(long j) {
        final HeartbeatThread thread = this.heartbeatThread;
        if (thread == null) {
            return;
        }
        if (thread.hasFailed()) {
            final RuntimeException cause = thread.failureCause();
            this.heartbeatThread = null;
            throw cause;
        }
        // Wake the heartbeat thread when a heartbeat is due.
        if (this.heartbeat.shouldHeartbeat(j)) {
            notify();
        }
        this.heartbeat.poll(j);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Computes how long until the next heartbeat is due.
     *
     * @param j timestamp forwarded to the heartbeat state (presumably the current time in
     *          milliseconds; confirm against callers)
     * @return {@code Long.MAX_VALUE} while the member state is {@code UNJOINED}, otherwise
     *         whatever {@code heartbeat.timeToNextHeartbeat(j)} reports
     */
    public synchronized long timeToNextHeartbeat(long j) {
        return state == MemberState.UNJOINED
                ? Long.MAX_VALUE
                : heartbeat.timeToNextHeartbeat(j);
    }

    /**
     * Brings this member into an active group: makes sure the coordinator is ready, starts
     * the heartbeat thread if none exists yet, and then joins the group if a join is needed
     * or still in progress.
     */
    public void ensureActiveGroup() {
        // The coordinator check runs first, before the heartbeat thread is started or a join is attempted.
        ensureCoordinatorReady();
        startHeartbeatThreadIfNeeded();
        joinGroupIfNeeded();
    }

    /** Creates and starts the heartbeat thread unless one is already referenced. */
    private synchronized void startHeartbeatThreadIfNeeded() {
        if (heartbeatThread != null) {
            return;
        }
        heartbeatThread = new HeartbeatThread();
        heartbeatThread.start();
    }

    /** Calls {@code disable()} on the heartbeat thread, if one exists. */
    private synchronized void disableHeartbeatThread() {
        if (heartbeatThread == null) {
            return;
        }
        heartbeatThread.disable();
    }

    /**
     * Asks the heartbeat thread to close and waits for it to terminate.
     *
     * <p>The thread is closed and detached from this coordinator while holding the
     * coordinator monitor, but the {@code join()} happens only after the monitor has been
     * released. Other code in this class synchronizes on the coordinator (for example the
     * join listener callbacks), so waiting for another thread while still owning the monitor
     * could block that thread forever and hang the caller.
     * NOTE(review): the heartbeat thread itself presumably needs this monitor to observe
     * the close request; its implementation is not visible here, confirm.
     *
     * @throws InterruptException if the calling thread is interrupted while waiting for the
     *         heartbeat thread to finish
     */
    private void closeHeartbeatThread() {
        final HeartbeatThread thread;
        synchronized (this) {
            if (heartbeatThread == null) {
                return;
            }
            heartbeatThread.close();
            thread = heartbeatThread;
            heartbeatThread = null;
        }

        // Join outside the synchronized block so the monitor is free while we wait.
        try {
            thread.join();
        } catch (InterruptedException e) {
            log.warn("Interrupted while waiting for consumer heartbeat thread to close");
            throw new InterruptException(e);
        }
    }

    /**
     * Repeats join attempts until no rejoin is requested and no join future is outstanding.
     *
     * <p>Each iteration makes sure the coordinator is ready, runs {@code onJoinPrepare} once
     * per successful join cycle (guarded by {@code needsJoinPrepare}), and polls the client
     * on the join future. On success {@code onJoinComplete} is invoked before the join
     * future is reset. On failure the future is reset and then:
     * <ul>
     *   <li>{@code UnknownMemberIdException}, {@code RebalanceInProgressException} and
     *       {@code IllegalGenerationException} trigger an immediate retry;</li>
     *   <li>any other non-retriable failure is rethrown;</li>
     *   <li>retriable failures are retried after sleeping {@code retryBackoffMs}.</li>
     * </ul>
     *
     * @throws RuntimeException the join failure, when it is neither one of the exceptions
     *         listed above nor retriable
     */
    void joinGroupIfNeeded() {
        while (needRejoin() || rejoinIncomplete()) {
            ensureCoordinatorReady();

            if (needsJoinPrepare) {
                onJoinPrepare(generation.generationId, generation.memberId);
                needsJoinPrepare = false;
            }

            RequestFuture<ByteBuffer> joinAttempt = initiateJoinGroup();
            client.poll(joinAttempt);

            if (joinAttempt.succeeded()) {
                // The future is reset only after the completion callback has returned.
                onJoinComplete(generation.generationId, generation.memberId, generation.protocol, joinAttempt.value());
                resetJoinGroupFuture();
                needsJoinPrepare = true;
                continue;
            }

            resetJoinGroupFuture();
            RuntimeException failure = joinAttempt.exception();
            boolean retryWithoutBackoff = failure instanceof UnknownMemberIdException
                    || failure instanceof RebalanceInProgressException
                    || failure instanceof IllegalGenerationException;
            if (retryWithoutBackoff) {
                continue;
            }
            if (!joinAttempt.isRetriable()) {
                throw failure;
            }
            time.sleep(retryBackoffMs);
        }
    }

    /** Clears the cached join future so that the next join attempt sends a new request. */
    private synchronized void resetJoinGroupFuture() {
        joinFuture = null;
    }

    /**
     * Returns the outstanding join future, starting a new join attempt when none exists.
     *
     * <p>Starting an attempt disables the heartbeat thread, moves the member state to
     * {@code REBALANCING}, sends the JoinGroup request and registers a listener on the
     * resulting future. On success the listener marks the member {@code STABLE}, clears
     * {@code rejoinNeeded} and re-enables the heartbeat thread; on failure it moves the
     * state back to {@code UNJOINED}. Both callbacks run under the coordinator monitor.
     *
     * @return the existing or newly created join future
     */
    private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
        if (joinFuture != null) {
            return joinFuture;
        }

        disableHeartbeatThread();
        state = MemberState.REBALANCING;
        joinFuture = sendJoinGroupRequest();
        joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
            @Override
            public void onSuccess(ByteBuffer value) {
                synchronized (AbstractCoordinator.this) {
                    AbstractCoordinator.this.log.info("Successfully joined group with generation {}",
                            AbstractCoordinator.this.generation.generationId);
                    AbstractCoordinator.this.state = MemberState.STABLE;
                    AbstractCoordinator.this.rejoinNeeded = false;
                    if (AbstractCoordinator.this.heartbeatThread != null) {
                        AbstractCoordinator.this.heartbeatThread.enable();
                    }
                }
            }

            @Override
            public void onFailure(RuntimeException e) {
                synchronized (AbstractCoordinator.this) {
                    AbstractCoordinator.this.state = MemberState.UNJOINED;
                }
            }
        });
        return joinFuture;
    }

    /**
     * Builds a JoinGroup request for this member and sends it to the coordinator.
     *
     * @return the future composed with a {@code JoinGroupResponseHandler}, or the result of
     *         {@code RequestFuture.coordinatorNotAvailable()} when the coordinator is unknown
     */
    private RequestFuture<ByteBuffer> sendJoinGroupRequest() {
        if (coordinatorUnknown()) {
            return RequestFuture.coordinatorNotAvailable();
        }

        log.info("(Re-)joining group");
        JoinGroupRequest.Builder requestBuilder =
                new JoinGroupRequest.Builder(groupId, sessionTimeoutMs, generation.memberId, protocolType(), metadata())
                        .setRebalanceTimeout(rebalanceTimeoutMs);
        log.debug("Sending JoinGroup ({}) to coordinator {}", requestBuilder, coordinator);
        return client.send(coordinator, requestBuilder).compose(new JoinGroupResponseHandler());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Sends the SyncGroup request used by a follower: it carries the current generation id
     * and member id together with an empty assignment map.
     *
     * @return the future produced by {@code sendSyncGroupRequest}
     */
    public RequestFuture<ByteBuffer> onJoinFollower() {
        SyncGroupRequest.Builder requestBuilder = new SyncGroupRequest.Builder(
                groupId, generation.generationId, generation.memberId, Collections.emptyMap());
        log.debug("Sending follower SyncGroup to coordinator {}: {}", coordinator, requestBuilder);
        return sendSyncGroupRequest(requestBuilder);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Sends the SyncGroup request used by the leader: the assignment is computed by
     * {@code performAssignment} from the leader id, group protocol and member list found in
     * the join response.
     *
     * @param joinGroupResponse the JoinGroup response that supplies the assignment inputs
     * @return the future produced by {@code sendSyncGroupRequest}, or a failed future that
     *         wraps any {@code RuntimeException} raised while building or sending the request
     */
    public RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinGroupResponse) {
        try {
            SyncGroupRequest.Builder requestBuilder = new SyncGroupRequest.Builder(
                    groupId,
                    generation.generationId,
                    generation.memberId,
                    performAssignment(joinGroupResponse.leaderId(), joinGroupResponse.groupProtocol(), joinGroupResponse.members()));
            log.debug("Sending leader SyncGroup to coordinator {}: {}", coordinator, requestBuilder);
            return sendSyncGroupRequest(requestBuilder);
        } catch (RuntimeException failure) {
            // Report the problem through the future instead of throwing to the caller.
            return RequestFuture.failure(failure);
        }
    }

    /**
     * Sends the given SyncGroup request to the coordinator.
     *
     * @param builder the prepared SyncGroup request builder
     * @return the future composed with a {@code SyncGroupResponseHandler}, or the result of
     *         {@code RequestFuture.coordinatorNotAvailable()} when the coordinator is unknown
     */
    private RequestFuture<ByteBuffer> sendSyncGroupRequest(SyncGroupRequest.Builder builder) {
        if (coordinatorUnknown()) {
            return RequestFuture.coordinatorNotAvailable();
        }
        return client.send(coordinator, builder).compose(new SyncGroupResponseHandler());
    }

    /**
     * Asks the given broker which node coordinates this group.
     *
     * @param node the broker that receives the FindCoordinator request
     * @return the future composed with a {@code FindCoordinatorResponseHandler}
     */
    private RequestFuture<Void> sendFindCoordinatorRequest(Node node) {
        log.debug("Sending FindCoordinator request to broker {}", node);
        FindCoordinatorRequest.Builder requestBuilder =
                new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, groupId);
        return client.send(node, requestBuilder).compose(new FindCoordinatorResponseHandler());
    }

    /**
     * Reports whether a usable coordinator is currently known.
     *
     * @return {@code true} when {@link #checkAndGetCoordinator()} yields {@code null}, i.e. no
     *         coordinator is set or the connection to it has failed
     */
    public boolean coordinatorUnknown() {
        return checkAndGetCoordinator() == null;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the current coordinator after checking its connection.
     *
     * <p>If a coordinator is set but the client reports that the connection to it has
     * failed, the coordinator is marked unknown (without requesting another disconnect) and
     * {@code null} is returned.
     *
     * @return the coordinator node, or {@code null} when none is set or its connection failed
     */
    public synchronized Node checkAndGetCoordinator() {
        if (coordinator != null && client.connectionFailed(coordinator)) {
            markCoordinatorUnknown(true);
            return null;
        }
        return coordinator;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Returns the coordinator field as is, without checking the connection state.
     *
     * @return the current coordinator node, possibly {@code null}
     */
    public synchronized Node coordinator() {
        return coordinator;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Marks the coordinator as unknown and, because {@code false} is passed to the
     * one-argument overload, also requests an asynchronous disconnect from the old node.
     */
    public synchronized void markCoordinatorUnknown() {
        markCoordinatorUnknown(false);
    }

    /**
     * Forgets the current coordinator so that it gets rediscovered.
     *
     * <p>Does nothing when no coordinator is set. Otherwise the field is cleared first and
     * only then, unless {@code z} is set, is an asynchronous disconnect from the old node
     * requested.
     *
     * @param z {@code true} to skip the disconnect request; {@code checkAndGetCoordinator()}
     *          passes {@code true} after the client has reported a failed connection
     */
    protected synchronized void markCoordinatorUnknown(boolean z) {
        if (coordinator == null) {
            return;
        }

        log.info("Group coordinator {} is unavailable or invalid, will attempt rediscovery", coordinator);
        Node previous = coordinator;
        coordinator = null;
        if (!z) {
            client.disconnectAsync(previous);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the current generation, but only while the member is stable.
     *
     * @return the generation when the member state is {@code STABLE}, otherwise {@code null}
     */
    public synchronized Generation generation() {
        return state == MemberState.STABLE ? generation : null;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Resets the membership: the generation becomes {@code Generation.NO_GENERATION}, a
     * rejoin is flagged and the member state goes back to {@code UNJOINED}.
     */
    public synchronized void resetGeneration() {
        generation = Generation.NO_GENERATION;
        rejoinNeeded = true;
        state = MemberState.UNJOINED;
    }

    /** Sets the {@code rejoinNeeded} flag so that {@link #needRejoin()} reports {@code true}. */
    protected synchronized void requestRejoin() {
        rejoinNeeded = true;
    }

    /**
     * Closes the coordinator by delegating to {@link #close(long)} with an argument of
     * {@code 0} (presumably meaning "do not wait for pending requests"; confirm against
     * the client's {@code awaitPendingRequests} contract).
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public final void close() {
        close(0L);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Shuts the coordinator down: stops the heartbeat thread, optionally leaves the group
     * and waits for requests still pending towards the coordinator.
     *
     * <p>The group cleanup sits in a {@code finally} block, so it runs exactly once whether
     * or not stopping the heartbeat thread throws. An exception raised by the cleanup
     * itself propagates to the caller without the cleanup being attempted a second time.
     * The coordinator monitor is taken only after the heartbeat thread has been closed.
     *
     * @param j value passed to {@code client.awaitPendingRequests} (presumably a timeout in
     *          milliseconds; confirm against the client API)
     */
    public void close(long j) {
        try {
            closeHeartbeatThread();
        } finally {
            synchronized (this) {
                if (leaveGroupOnClose) {
                    maybeLeaveGroup();
                }

                Node node = checkAndGetCoordinator();
                if (node != null && !client.awaitPendingRequests(node, j)) {
                    log.warn("Close timed out with {} pending requests to coordinator, terminating client connections",
                            client.pendingRequestCount(node));
                }
            }
        }
    }

    /**
     * Sends a LeaveGroup request when this member currently belongs to a group, then resets
     * the generation in every case.
     *
     * <p>The request is sent only when the coordinator is known, the member state is not
     * {@code UNJOINED} and the generation is not {@code Generation.NO_GENERATION}. The
     * composed future is not kept; the client is polled once via {@code pollNoWakeup()}
     * and the outcome is not inspected here.
     */
    public synchronized void maybeLeaveGroup() {
        boolean activeMember = !coordinatorUnknown()
                && state != MemberState.UNJOINED
                && generation != Generation.NO_GENERATION;
        if (activeMember) {
            log.debug("Sending LeaveGroup request to coordinator {}", coordinator);
            LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder(groupId, generation.memberId);
            client.send(coordinator, request).compose(new LeaveGroupResponseHandler());
            client.pollNoWakeup();
        }

        resetGeneration();
    }

    /**
     * Sends a Heartbeat request carrying the group id, current generation id and member id
     * to the node held in the {@code coordinator} field.
     *
     * @return the future composed with a {@code HeartbeatResponseHandler}
     */
    synchronized RequestFuture<Void> sendHeartbeatRequest() {
        log.debug("Sending Heartbeat request to coordinator {}", coordinator);
        HeartbeatRequest.Builder requestBuilder =
                new HeartbeatRequest.Builder(groupId, generation.generationId, generation.memberId);
        return client.send(coordinator, requestBuilder).compose(new HeartbeatResponseHandler());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a count-based {@code Meter} with a rate metric and a total metric.
     *
     * @param metrics registry used to build the two metric names
     * @param str     second argument of both {@code metricName} calls (presumably the metric
     *                group name; confirm against the {@code Metrics} API)
     * @param str2    base name; the metrics are named {@code str2 + "-rate"} and
     *                {@code str2 + "-total"}
     * @param str3    noun inserted into the two metric descriptions
     * @return the new meter
     */
    public Meter createMeter(Metrics metrics, String str, String str2, String str3) {
        String rateDescription = String.format("The number of %s per second", str3);
        String totalDescription = String.format("The total number of %s", str3);
        return new Meter(
                new Count(),
                metrics.metricName(str2 + "-rate", str, rateDescription),
                metrics.metricName(str2 + "-total", str, totalDescription));
    }
}
