/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager;
import org.apache.kafka.clients.consumer.internals.DefaultBackgroundThread;
import org.apache.kafka.clients.consumer.internals.ErrorEventHandler;
import org.apache.kafka.clients.consumer.internals.GroupState;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.clients.consumer.internals.RequestManager;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
import org.apache.kafka.clients.consumer.internals.events.NoopApplicationEvent;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

/**
 * Unit tests for {@link DefaultBackgroundThread}.
 *
 * <p>Each test builds the thread from the fixture fields populated in {@link #setup()}. All
 * collaborators are Mockito mocks by default; individual tests swap in real queues or a real
 * {@link ApplicationEventProcessor} before calling {@link #mockBackgroundThread()} when they need
 * events to actually flow through the thread.
 */
public class DefaultBackgroundThreadTest {
    private static final long RETRY_BACKOFF_MS = 100L;
    private static final int REQUEST_TIMEOUT_MS = 500;

    private final Properties properties = new Properties();
    private MockTime time;
    private ConsumerMetadata metadata;
    private NetworkClientDelegate networkClient;
    private BlockingQueue<BackgroundEvent> backgroundEventsQueue;
    private BlockingQueue<ApplicationEvent> applicationEventsQueue;
    private ApplicationEventProcessor processor;
    private CoordinatorRequestManager coordinatorManager;
    private ErrorEventHandler errorEventHandler;
    private SubscriptionState subscriptionState;
    private GroupState groupState;
    private CommitRequestManager commitManager;

    /**
     * Resets every fixture field to a fresh mock (or a fresh real {@link MockTime} /
     * {@link GroupState}) and fills in the consumer properties used to build the thread.
     */
    @BeforeEach
    @SuppressWarnings("unchecked") // Mockito can only hand back a raw BlockingQueue mock.
    public void setup() {
        this.time = new MockTime(0L);
        this.metadata = Mockito.mock(ConsumerMetadata.class);
        this.networkClient = Mockito.mock(NetworkClientDelegate.class);
        this.applicationEventsQueue = (BlockingQueue<ApplicationEvent>) Mockito.mock(BlockingQueue.class);
        this.backgroundEventsQueue = (BlockingQueue<BackgroundEvent>) Mockito.mock(BlockingQueue.class);
        this.processor = Mockito.mock(ApplicationEventProcessor.class);
        this.coordinatorManager = Mockito.mock(CoordinatorRequestManager.class);
        this.errorEventHandler = Mockito.mock(ErrorEventHandler.class);
        this.subscriptionState = Mockito.mock(SubscriptionState.class);
        GroupRebalanceConfig rebalanceConfig =
            new GroupRebalanceConfig(100, 100, 100, "group_id", Optional.empty(), 100L, true);
        this.groupState = new GroupState(rebalanceConfig);
        this.commitManager = Mockito.mock(CommitRequestManager.class);
        this.properties.put("key.deserializer", StringDeserializer.class);
        this.properties.put("value.deserializer", StringDeserializer.class);
        this.properties.put("retry.backoff.ms", RETRY_BACKOFF_MS);
    }

    /** Starts the thread, waits until it reports running, closes it and checks it stopped. */
    @Test
    public void testStartupAndTearDown() throws InterruptedException {
        stubRequestManagerPolls();
        DefaultBackgroundThread backgroundThread = mockBackgroundThread();
        backgroundThread.start();
        try {
            TestUtils.waitForCondition(
                backgroundThread::isRunning,
                "Failed awaiting for the background thread to be running");
        } finally {
            // Close even if the wait failed so a started thread is not left behind.
            backgroundThread.close();
        }
        Assertions.assertFalse(backgroundThread.isRunning());
    }

    /** A queued no-op event is handed to the processor exactly once by a single run. */
    @Test
    public void testApplicationEvent() {
        useRealEventQueues();
        stubRequestManagerPolls();
        DefaultBackgroundThread backgroundThread = mockBackgroundThread();
        try {
            ApplicationEvent event = new NoopApplicationEvent("noop event");
            this.applicationEventsQueue.add(event);
            backgroundThread.runOnce();
            Mockito.verify(this.processor, Mockito.times(1)).process(event);
        } finally {
            backgroundThread.close();
        }
    }

    /** With a real processor, a metadata-update event triggers a metadata update request. */
    @Test
    public void testMetadataUpdateEvent() {
        useRealEventQueues();
        this.processor = new ApplicationEventProcessor(
            this.backgroundEventsQueue, mockRequestManagerRegistry(), this.metadata);
        stubRequestManagerPolls();
        DefaultBackgroundThread backgroundThread = mockBackgroundThread();
        try {
            ApplicationEvent event = new NewTopicsMetadataUpdateRequestEvent();
            this.applicationEventsQueue.add(event);
            backgroundThread.runOnce();
            Mockito.verify(this.metadata).requestUpdateForNewTopics();
        } finally {
            backgroundThread.close();
        }
    }

    /** A queued commit event reaches the (mocked) processor. */
    @Test
    public void testCommitEvent() {
        useRealEventQueues();
        stubRequestManagerPolls();
        DefaultBackgroundThread backgroundThread = mockBackgroundThread();
        try {
            ApplicationEvent event = new CommitApplicationEvent(new HashMap<>());
            this.applicationEventsQueue.add(event);
            backgroundThread.runOnce();
            Mockito.verify(this.processor).process(ArgumentMatchers.any(CommitApplicationEvent.class));
        } finally {
            backgroundThread.close();
        }
    }

    /**
     * With a spied real processor, an assignment-change event is processed, the network client
     * is polled once, and the commit manager receives the event's timestamp and offsets.
     */
    @Test
    public void testAssignmentChangeEvent() {
        useRealEventQueues();
        this.processor = Mockito.spy(new ApplicationEventProcessor(
            this.backgroundEventsQueue, mockRequestManagerRegistry(), this.metadata));
        DefaultBackgroundThread backgroundThread = mockBackgroundThread();
        try {
            HashMap<TopicPartition, OffsetAndMetadata> offset = mockTopicPartitionOffset();
            long currentTimeMs = this.time.milliseconds();
            ApplicationEvent event = new AssignmentChangeApplicationEvent(offset, currentTimeMs);
            this.applicationEventsQueue.add(event);
            stubRequestManagerPolls();

            backgroundThread.runOnce();

            Mockito.verify(this.processor).process(ArgumentMatchers.any(AssignmentChangeApplicationEvent.class));
            Mockito.verify(this.networkClient, Mockito.times(1))
                .poll(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong());
            Mockito.verify(this.commitManager, Mockito.times(1)).updateAutoCommitTimer(currentTimeMs);
            Mockito.verify(this.commitManager, Mockito.times(1)).maybeAutoCommit(offset);
        } finally {
            backgroundThread.close();
        }
    }

    /** A single run polls the coordinator manager once and the network client once. */
    @Test
    public void testFindCoordinator() {
        DefaultBackgroundThread backgroundThread = mockBackgroundThread();
        try {
            stubRequestManagerPolls();
            backgroundThread.runOnce();
            Mockito.verify(this.coordinatorManager, Mockito.times(1)).poll(ArgumentMatchers.anyLong());
            Mockito.verify(this.networkClient, Mockito.times(1))
                .poll(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong());
        } finally {
            backgroundThread.close();
        }
    }

    /**
     * {@code handlePollResult} returns the poll result's timeout (10 here) both when the result
     * carries an unsent request and when its request list is empty.
     */
    @Test
    public void testPollResultTimer() {
        DefaultBackgroundThread backgroundThread = mockBackgroundThread();
        try {
            NetworkClientDelegate.PollResult withRequest = new NetworkClientDelegate.PollResult(
                10L,
                Collections.singletonList(findCoordinatorUnsentRequest(this.time, REQUEST_TIMEOUT_MS)));
            Assertions.assertEquals(10L, backgroundThread.handlePollResult(withRequest));

            NetworkClientDelegate.PollResult withoutRequests =
                new NetworkClientDelegate.PollResult(10L, new ArrayList<>());
            Assertions.assertEquals(10L, backgroundThread.handlePollResult(withoutRequests));
        } finally {
            // The other tests close the thread they build; do the same here.
            backgroundThread.close();
        }
    }

    /** Replaces the mocked event queues with real ones so queued events can be consumed. */
    private void useRealEventQueues() {
        this.applicationEventsQueue = new LinkedBlockingQueue<>();
        this.backgroundEventsQueue = new LinkedBlockingQueue<>();
    }

    /** Makes both mocked request managers return a canned poll result for any timestamp. */
    private void stubRequestManagerPolls() {
        Mockito.when(this.coordinatorManager.poll(ArgumentMatchers.anyLong()))
            .thenReturn(mockPollCoordinatorResult());
        Mockito.when(this.commitManager.poll(ArgumentMatchers.anyLong()))
            .thenReturn(mockPollCommitResult());
    }

    /** Builds two offsets for partitions 2 and 3 of topic {@code t0}. */
    private HashMap<TopicPartition, OffsetAndMetadata> mockTopicPartitionOffset() {
        HashMap<TopicPartition, OffsetAndMetadata> topicPartitionOffsets = new HashMap<>();
        topicPartitionOffsets.put(new TopicPartition("t0", 2), new OffsetAndMetadata(10L));
        topicPartitionOffsets.put(new TopicPartition("t0", 3), new OffsetAndMetadata(20L));
        return topicPartitionOffsets;
    }

    /** Registers the current coordinator and commit manager fixtures under their types. */
    private Map<RequestManager.Type, Optional<RequestManager>> mockRequestManagerRegistry() {
        Map<RequestManager.Type, Optional<RequestManager>> registry = new HashMap<>();
        registry.put(RequestManager.Type.COORDINATOR, Optional.of(this.coordinatorManager));
        registry.put(RequestManager.Type.COMMIT, Optional.of(this.commitManager));
        return registry;
    }

    /**
     * Creates an unsent FindCoordinator request (key {@code "foobar"}, no target node) whose
     * timer is set from the given clock and timeout.
     */
    private static NetworkClientDelegate.UnsentRequest findCoordinatorUnsentRequest(Time time, long timeout) {
        FindCoordinatorRequestData data = new FindCoordinatorRequestData()
            .setKeyType(FindCoordinatorRequest.CoordinatorType.TRANSACTION.id())
            .setKey("foobar");
        AbstractRequest.Builder<?> builder = new FindCoordinatorRequest.Builder(data);
        NetworkClientDelegate.UnsentRequest req =
            new NetworkClientDelegate.UnsentRequest(builder, Optional.empty());
        req.setTimer(time, timeout);
        return req;
    }

    /** Builds the thread under test from the current values of the fixture fields. */
    private DefaultBackgroundThread mockBackgroundThread() {
        return new DefaultBackgroundThread(
            this.time,
            new ConsumerConfig(this.properties),
            new LogContext(),
            this.applicationEventsQueue,
            this.backgroundEventsQueue,
            this.subscriptionState,
            this.errorEventHandler,
            this.processor,
            this.metadata,
            this.networkClient,
            this.groupState,
            this.coordinatorManager,
            this.commitManager);
    }

    /** Canned result returned by the mocked coordinator manager's {@code poll}. */
    private NetworkClientDelegate.PollResult mockPollCoordinatorResult() {
        return pollResultWithFindCoordinatorRequest();
    }

    /** Canned result returned by the mocked commit manager's {@code poll}. */
    private NetworkClientDelegate.PollResult mockPollCommitResult() {
        return pollResultWithFindCoordinatorRequest();
    }

    /** A poll result with a 100 ms timeout carrying one unsent FindCoordinator request. */
    private NetworkClientDelegate.PollResult pollResultWithFindCoordinatorRequest() {
        return new NetworkClientDelegate.PollResult(
            100L,
            Collections.singletonList(findCoordinatorUnsentRequest(this.time, REQUEST_TIMEOUT_MS)));
    }
}

