/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.statetransfer;

import java.util.concurrent.CompletionStage;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.infinispan.Cache;
import org.infinispan.commons.util.IntSet;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.distribution.BaseDistFunctionalTest;
import org.infinispan.distribution.ch.ConsistentHashFactory;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.statetransfer.StateConsumer;
import org.infinispan.statetransfer.StateProvider;
import org.infinispan.statetransfer.StateTransferManager;
import org.infinispan.test.AbstractCacheTest;
import org.infinispan.test.TestException;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CheckPoint;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.infinispan.topology.CacheTopology;
import org.infinispan.transaction.LockingMode;
import org.infinispan.transaction.TransactionMode;
import org.infinispan.util.ControlledConsistentHashFactory;
import org.mockito.AdditionalAnswers;
import org.mockito.ArgumentMatchers;
import org.mockito.MockSettings;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;

@Test(groups={"functional"}, testName="statetransfer.StateTransferSegmentMetricsTest")
@CleanupAfterMethod
public class StateTransferSegmentMetricsTest
extends BaseDistFunctionalTest<String, String> {
    private final int MAX_NUM_SEGMENTS = 6;
    private final int[][] owners1And2 = new int[][]{{0, 1}, {0, 1}, {0, 1}, {0, 1}, {0, 1}, {0, 1}};
    private final int[][] owners1And3 = new int[][]{{0, 2}, {0, 2}, {0, 2}, {0, 2}, {0, 2}, {0, 2}};
    private final ControlledConsistentHashFactory factory = new ControlledConsistentHashFactory.Default(this.owners1And2);

    public StateTransferSegmentMetricsTest() {
        this.transactional = true;
        this.INIT_CLUSTER_SIZE = 3;
        this.numOwners = 2;
        this.performRehashing = true;
        this.cleanup = AbstractCacheTest.CleanupPhase.AFTER_METHOD;
    }

    @AfterMethod
    public void resetFactory() {
        this.factory.setOwnerIndexes(this.owners1And2);
    }

    @Override
    protected ConfigurationBuilder buildConfiguration() {
        ConfigurationBuilder builder = super.buildConfiguration();
        builder.clustering().hash().consistentHashFactory((ConsistentHashFactory)this.factory).numSegments(6).transaction().transactionMode(TransactionMode.TRANSACTIONAL).lockingMode(LockingMode.OPTIMISTIC);
        return builder;
    }

    @Test
    public void testSegmentCounterDuringStateTransfer() throws Exception {
        StateTransferManager manager = TestingUtil.extractComponent(this.c3, StateTransferManager.class);
        CheckPoint checkPoint = new CheckPoint();
        this.waitTransactionRequest(this.c1, checkPoint);
        this.waitRequestingSegments(this.c3, checkPoint);
        this.waitApplyingSegmentBatch(this.c3, checkPoint);
        this.factory.setOwnerIndexes(this.owners1And3);
        EmbeddedCacheManager cm = this.addClusterEnabledCacheManager();
        cm.defineConfiguration(this.cacheName, this.configuration.build());
        Future<Void> join = this.fork(() -> {
            this.waitForClusterToForm(this.cacheName);
            log.debug((Object)"4th has joined");
            return null;
        });
        checkPoint.awaitStrict("topology_update_notify_invoked_" + String.valueOf(this.c3), 10L, TimeUnit.SECONDS);
        checkPoint.awaitStrict("transactions_requested_invoked_" + String.valueOf(this.c1), 10L, TimeUnit.SECONDS);
        Assert.assertEquals((long)manager.getInflightTransactionalSegmentCount(), (long)6L);
        Assert.assertTrue((boolean)manager.isStateTransferInProgress());
        checkPoint.triggerForever("transactions_requested_released_" + String.valueOf(this.c1));
        checkPoint.awaitStrict("topology_update_notify_executed_" + String.valueOf(this.c3), 10L, TimeUnit.SECONDS);
        Assert.assertEquals((long)manager.getInflightTransactionalSegmentCount(), (long)0L);
        Assert.assertEquals((long)manager.getInflightSegmentTransferCount(), (long)6L);
        checkPoint.triggerForever("topology_update_notify_released_" + String.valueOf(this.c3));
        checkPoint.awaitStrict("state_installed_invoked_" + String.valueOf(this.c3), 10L, TimeUnit.SECONDS);
        Assert.assertEquals((long)manager.getInflightSegmentTransferCount(), (long)6L);
        checkPoint.triggerForever("state_installed_invoked_release_" + String.valueOf(this.c3));
        checkPoint.awaitStrict("state_applied_" + String.valueOf(this.c3), 10L, TimeUnit.SECONDS);
        Assert.assertEquals((long)manager.getInflightSegmentTransferCount(), (long)0L);
        join.cancel(true);
    }

    private void waitTransactionRequest(Cache<?, ?> cache, CheckPoint checkPoint) {
        StateProvider sp = TestingUtil.extractComponent(cache, StateProvider.class);
        Answer forwardedAnswer = AdditionalAnswers.delegatesTo((Object)sp);
        StateProvider mockProvider = (StateProvider)Mockito.mock(StateProvider.class, (MockSettings)Mockito.withSettings().defaultAnswer(forwardedAnswer));
        ((StateProvider)Mockito.doAnswer(invocation -> {
            checkPoint.trigger("transactions_requested_invoked_" + String.valueOf(cache));
            Object response = forwardedAnswer.answer(invocation);
            try {
                checkPoint.awaitStrict("transactions_requested_released_" + String.valueOf(cache), 10L, TimeUnit.SECONDS);
                return response;
            }
            catch (InterruptedException | TimeoutException e) {
                throw new TestException(e);
            }
        }).when((Object)mockProvider)).getTransactionsForSegments((Address)ArgumentMatchers.any(Address.class), ArgumentMatchers.anyInt(), (IntSet)ArgumentMatchers.any(IntSet.class));
        TestingUtil.replaceComponent(cache, StateProvider.class, mockProvider, true);
    }

    private void waitRequestingSegments(Cache<?, ?> cache, CheckPoint checkPoint) {
        StateConsumer sc = TestingUtil.extractComponent(cache, StateConsumer.class);
        Answer forwardedAnswer = AdditionalAnswers.delegatesTo((Object)sc);
        StateConsumer mockConsumer = (StateConsumer)Mockito.mock(StateConsumer.class, (MockSettings)Mockito.withSettings().defaultAnswer(forwardedAnswer));
        ((StateConsumer)Mockito.doAnswer(invocation -> {
            checkPoint.trigger("topology_update_notify_invoked_" + String.valueOf(cache));
            return ((CompletionStage)forwardedAnswer.answer(invocation)).thenRun(() -> {
                checkPoint.trigger("topology_update_notify_executed_" + String.valueOf(cache));
                try {
                    checkPoint.awaitStrict("topology_update_notify_released_" + String.valueOf(cache), 10L, TimeUnit.SECONDS);
                }
                catch (InterruptedException | TimeoutException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }).when((Object)mockConsumer)).onTopologyUpdate((CacheTopology)ArgumentMatchers.any(CacheTopology.class), ArgumentMatchers.anyBoolean());
        TestingUtil.replaceComponent(cache, StateConsumer.class, mockConsumer, true);
    }

    private void waitApplyingSegmentBatch(Cache<?, ?> cache, CheckPoint checkPoint) {
        StateConsumer sc = TestingUtil.extractComponent(cache, StateConsumer.class);
        Answer forwardedAnswer = AdditionalAnswers.delegatesTo((Object)sc);
        StateConsumer mockConsumer = (StateConsumer)Mockito.mock(StateConsumer.class, (MockSettings)Mockito.withSettings().defaultAnswer(forwardedAnswer));
        ((StateConsumer)Mockito.doAnswer(invocation -> {
            checkPoint.trigger("state_installed_invoked_" + String.valueOf(cache));
            checkPoint.awaitStrict("state_installed_invoked_release_" + String.valueOf(cache), 10L, TimeUnit.SECONDS);
            return ((CompletionStage)forwardedAnswer.answer(invocation)).thenRun(() -> checkPoint.trigger("state_applied_" + String.valueOf(cache)));
        }).when((Object)mockConsumer)).applyState((Address)ArgumentMatchers.any(Address.class), ArgumentMatchers.anyInt(), ArgumentMatchers.anyCollection());
        TestingUtil.replaceComponent(cache, StateConsumer.class, mockConsumer, true);
    }
}

