/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.statetransfer;

import jakarta.transaction.TransactionManager;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.infinispan.AdvancedCache;
import org.infinispan.commons.executors.BlockingThreadPoolExecutorFactory;
import org.infinispan.commons.executors.ThreadPoolExecutorFactory;
import org.infinispan.commons.util.IntSet;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.container.DataContainer;
import org.infinispan.container.impl.InternalDataContainer;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.statetransfer.StateConsumer;
import org.infinispan.statetransfer.StateProvider;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestException;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CheckPoint;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.infinispan.transaction.LockingMode;
import org.infinispan.transaction.impl.TransactionTable;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@Test(testName="lock.ManyTxsDuringStateTransferTest", groups={"functional"})
@CleanupAfterMethod
public class ManyTxsDuringStateTransferTest
extends MultipleCacheManagersTest {
    public static final String CACHE_NAME = "testCache";
    private static final int NUM_TXS = 20;

    @Override
    protected void createCacheManagers() throws Throwable {
        ConfigurationBuilder defaultBuilder = new ConfigurationBuilder();
        this.addClusterEnabledCacheManager(this.getGlobalConfigurationBuilder(), defaultBuilder);
        this.addClusterEnabledCacheManager(this.getGlobalConfigurationBuilder(), defaultBuilder);
        this.waitForClusterToForm();
    }

    private GlobalConfigurationBuilder getGlobalConfigurationBuilder() {
        GlobalConfigurationBuilder globalBuilder = GlobalConfigurationBuilder.defaultClusteredBuilder();
        BlockingThreadPoolExecutorFactory threadPoolFactory = new BlockingThreadPoolExecutorFactory(1, 1, 0, 5L);
        globalBuilder.transport().remoteCommandThreadPool().threadPoolFactory((ThreadPoolExecutorFactory)threadPoolFactory);
        return globalBuilder;
    }

    public void testManyTxs() throws Throwable {
        ConfigurationBuilder cfg = TestCacheManagerFactory.getDefaultCacheConfiguration(true);
        cfg.clustering().cacheMode(CacheMode.DIST_SYNC).stateTransfer().awaitInitialTransfer(false).transaction().lockingMode(LockingMode.OPTIMISTIC);
        this.manager(0).defineConfiguration(CACHE_NAME, cfg.build());
        this.manager(1).defineConfiguration(CACHE_NAME, cfg.build());
        CheckPoint checkpoint = new CheckPoint();
        AdvancedCache cache0 = this.advancedCache(0, CACHE_NAME);
        TransactionManager tm0 = cache0.getTransactionManager();
        StateProvider stateProvider = TestingUtil.extractComponent(cache0, StateProvider.class);
        StateProvider spyProvider = (StateProvider)Mockito.spy((Object)stateProvider);
        ((StateProvider)Mockito.doAnswer(invocation -> {
            Object[] arguments = invocation.getArguments();
            Address source = (Address)arguments[0];
            int topologyId = (Integer)arguments[1];
            CompletionStage result = (CompletionStage)invocation.callRealMethod();
            return result.thenApply(transactions -> {
                try {
                    checkpoint.trigger("post_get_transactions_" + topologyId + "_from_" + String.valueOf(source));
                    checkpoint.awaitStrict("resume_get_transactions_" + topologyId + "_from_" + String.valueOf(source), 10L, TimeUnit.SECONDS);
                    return transactions;
                }
                catch (InterruptedException | TimeoutException e) {
                    throw new TestException(e);
                }
            });
        }).when((Object)spyProvider)).getTransactionsForSegments((Address)ArgumentMatchers.any(Address.class), ArgumentMatchers.anyInt(), (IntSet)ArgumentMatchers.any());
        TestingUtil.replaceComponent(cache0, StateProvider.class, spyProvider, true);
        DistributionManager dm0 = cache0.getDistributionManager();
        int initialTopologyId = dm0.getCacheTopology().getTopologyId();
        int rebalanceTopologyId = initialTopologyId + 1;
        AdvancedCache cache1 = this.advancedCache(1, CACHE_NAME);
        checkpoint.awaitStrict("post_get_transactions_" + rebalanceTopologyId + "_from_" + String.valueOf(this.address(1)), 10L, TimeUnit.SECONDS);
        Future[] futures = new Future[20];
        for (int i = 0; i < 20; ++i) {
            int ii = i;
            futures[i] = this.fork(() -> {
                tm0.begin();
                cache0.put((Object)("testkey" + ii), (Object)("v" + ii));
                tm0.commit();
                return null;
            });
        }
        Thread.sleep(1000L);
        StateConsumer stateConsumer = TestingUtil.extractComponent(cache1, StateConsumer.class);
        AssertJUnit.assertTrue((boolean)stateConsumer.isStateTransferInProgress());
        AssertJUnit.assertTrue((stateConsumer.inflightTransactionSegmentCount() > 0L ? 1 : 0) != 0);
        checkpoint.trigger("resume_get_transactions_" + rebalanceTopologyId + "_from_" + String.valueOf(this.address(1)));
        TestingUtil.waitForNoRebalance(this.caches(CACHE_NAME));
        AssertJUnit.assertFalse((boolean)stateConsumer.isStateTransferInProgress());
        AssertJUnit.assertEquals((long)stateConsumer.inflightTransactionSegmentCount(), (long)0L);
        DataContainer dataContainer0 = (DataContainer)TestingUtil.extractComponent(cache0, InternalDataContainer.class);
        DataContainer dataContainer1 = (DataContainer)TestingUtil.extractComponent(cache1, InternalDataContainer.class);
        for (int i = 0; i < 20; ++i) {
            futures[i].get(10L, TimeUnit.SECONDS);
            AssertJUnit.assertEquals((Object)("v" + i), (Object)dataContainer0.get((Object)("testkey" + i)).getValue());
            AssertJUnit.assertEquals((Object)("v" + i), (Object)dataContainer1.get((Object)("testkey" + i)).getValue());
        }
        TransactionTable tt0 = TestingUtil.extractComponent(cache0, TransactionTable.class);
        TransactionTable tt1 = TestingUtil.extractComponent(cache1, TransactionTable.class);
        this.eventuallyEquals(0, () -> ((TransactionTable)tt0).getLocalTxCount());
        this.eventuallyEquals(0, () -> ((TransactionTable)tt1).getRemoteTxCount());
    }
}

