/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.distribution.rehash;

import java.util.concurrent.CompletionStage;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.commands.remote.recovery.TxCompletionNotificationCommand;
import org.infinispan.commands.statetransfer.StateResponseCommand;
import org.infinispan.commands.triangle.BackupWriteCommand;
import org.infinispan.commands.tx.AbstractTransactionBoundaryCommand;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.cache.IsolationLevel;
import org.infinispan.distribution.BlockingInterceptor;
import org.infinispan.factories.GlobalComponentRegistry;
import org.infinispan.globalstate.NoOpGlobalConfigurationManager;
import org.infinispan.interceptors.AsyncInterceptorChain;
import org.infinispan.interceptors.impl.EntryWrappingInterceptor;
import org.infinispan.manager.CacheContainer;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.statetransfer.StateTransferLock;
import org.infinispan.test.AbstractCacheTest;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CheckPoint;
import org.infinispan.test.op.TestWriteOperation;
import org.infinispan.topology.ClusterTopologyManager;
import org.infinispan.transaction.LockingMode;
import org.infinispan.util.ControlledRpcManager;
import org.infinispan.util.concurrent.BlockingManager;
import org.mockito.AdditionalAnswers;
import org.mockito.ArgumentMatchers;
import org.mockito.MockSettings;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

/**
 * Functional test that performs a write on key {@code "key"} from the first cache while a second
 * cache manager is joining the cluster, and then asserts that both caches return
 * {@code op.getValue()} for that key once no rebalance is in progress.
 *
 * <p>The scenario is executed for every configuration returned by {@link #factory()}
 * (non-transactional, optimistic and pessimistic transactions, all {@code DIST_SYNC}) and for each
 * {@link TestWriteOperation} exercised by the public test methods.
 *
 * <p>Ordering between the joiner, the write and the rebalance confirmation is enforced with three
 * tools: a {@link ControlledRpcManager} on the first cache, a {@link BlockingInterceptor} on the
 * joiner, and a {@link CheckPoint} wired into a mocked {@link ClusterTopologyManager}.
 */
@Test(groups={"functional"}, testName="distribution.rehash.StateTransferOverwritingValueTest")
public class StateTransferOverwritingValueTest
extends MultipleCacheManagersTest {
    /**
     * Configures cleanup to run after each test method. Every test adds a second cache manager in
     * {@code doTest}; presumably that is why per-method cleanup is required.
     */
    public StateTransferOverwritingValueTest() {
        this.cleanup = AbstractCacheTest.CleanupPhase.AFTER_METHOD;
    }

    /**
     * Creates the test instances to run: one per transactional/locking configuration.
     *
     * @return three instances, all {@code DIST_SYNC}: non-transactional, transactional with
     *         optimistic locking, and transactional with pessimistic locking
     */
    @Override
    public Object[] factory() {
        return new Object[]{
            new StateTransferOverwritingValueTest()
                .cacheMode(CacheMode.DIST_SYNC)
                .transactional(false),
            new StateTransferOverwritingValueTest()
                .cacheMode(CacheMode.DIST_SYNC)
                .transactional(true)
                .lockingMode(LockingMode.OPTIMISTIC),
            new StateTransferOverwritingValueTest()
                .cacheMode(CacheMode.DIST_SYNC)
                .transactional(true)
                .lockingMode(LockingMode.PESSIMISTIC)
        };
    }

    /**
     * Starts a single clustered cache manager; the second one is added by each test itself.
     *
     * @throws Throwable if the cache manager cannot be started
     */
    @Override
    protected void createCacheManagers() throws Throwable {
        this.addClusterEnabledCacheManager(this.getConfigurationBuilder());
        this.waitForClusterToForm();
    }

    /** Installs the {@link NoOpGlobalConfigurationManager} on every cache manager before it starts. */
    @Override
    protected void amendCacheManagerBeforeStart(EmbeddedCacheManager cm) {
        NoOpGlobalConfigurationManager.amendCacheManager(cm);
    }

    /**
     * Builds the cache configuration for this test instance.
     *
     * @return a builder using this instance's cache mode and transaction mode, its locking mode
     *         when one was set, and {@code READ_COMMITTED} isolation
     */
    protected ConfigurationBuilder getConfigurationBuilder() {
        ConfigurationBuilder builder = new ConfigurationBuilder();
        builder.clustering().cacheMode(this.cacheMode);
        builder.transaction().transactionMode(this.transactionMode());
        if (this.lockingMode != null) {
            builder.transaction().lockingMode(this.lockingMode);
        }
        builder.locking().isolationLevel(IsolationLevel.READ_COMMITTED);
        return builder;
    }

    public void testBackupOwnerJoiningDuringPut() throws Exception {
        this.doTest(TestWriteOperation.PUT_CREATE);
    }

    public void testBackupOwnerJoiningDuringPutOverwrite() throws Exception {
        this.doTest(TestWriteOperation.PUT_OVERWRITE);
    }

    public void testBackupOwnerJoiningDuringPutIfAbsent() throws Exception {
        this.doTest(TestWriteOperation.PUT_IF_ABSENT);
    }

    public void testBackupOwnerJoiningDuringReplace() throws Exception {
        this.doTest(TestWriteOperation.REPLACE);
    }

    public void testBackupOwnerJoiningDuringReplaceWithPreviousValue() throws Exception {
        this.doTest(TestWriteOperation.REPLACE_EXACT);
    }

    public void testBackupOwnerJoiningDuringRemove() throws Exception {
        this.doTest(TestWriteOperation.REMOVE);
    }

    public void testBackupOwnerJoiningDuringRemoveWithPreviousValue() throws Exception {
        this.doTest(TestWriteOperation.REMOVE_EXACT);
    }

    /**
     * Runs the scenario for one write operation.
     *
     * <p>Sequence, as enforced by the statements below:
     * <ol>
     *   <li>optionally seed the key with {@code op.getPreviousValue()} on the first cache;</li>
     *   <li>take control of the first cache's RPCs and of the rebalance confirmations;</li>
     *   <li>start the joiner without waiting for its initial state transfer;</li>
     *   <li>fork the write and rendezvous with it at the barrier inside the joiner's
     *       interceptor chain;</li>
     *   <li>release the captured {@link StateResponseCommand}, wait until the joiner's rebalance
     *       confirmation reaches the coordinator, then let the write continue;</li>
     *   <li>check the write's return value, resume both rebalance confirmations and verify the
     *       final value on both caches.</li>
     * </ol>
     *
     * @param op the write operation to perform on key {@code "key"}
     * @throws Exception if any wait times out (10 seconds each) or the forked write fails
     */
    private void doTest(TestWriteOperation op) throws Exception {
        AdvancedCache<Object, Object> cache0 = this.advancedCache(0);
        String key = "key";

        // Operations that need an existing entry expose it through getPreviousValue().
        Object previousValue = op.getPreviousValue();
        if (previousValue != null) {
            cache0.put(key, previousValue);
            AssertJUnit.assertEquals(previousValue, cache0.get(key));
            log.tracef("Previous value inserted: %s = %s", key, previousValue);
        }

        int preJoinTopologyId = cache0.getDistributionManager().getCacheTopology().getTopologyId();

        // Put cache0's outgoing RPCs under test control, except for the write/transaction commands
        // listed here, which the operation under test itself needs.
        CheckPoint checkPoint = new CheckPoint();
        ControlledRpcManager blockingRpcManager0 = ControlledRpcManager.replaceRpcManager(cache0, new Class[0]);
        blockingRpcManager0.excludeCommands(
                WriteCommand.class,
                BackupWriteCommand.class,
                AbstractTransactionBoundaryCommand.class,
                TxCompletionNotificationCommand.class);

        // Assumes the join produces exactly the next topology id - TODO confirm if this test becomes flaky.
        int rebalanceTopologyId = preJoinTopologyId + 1;

        // Hold back the rebalance confirmations for that topology until the checkpoint resumes them.
        this.blockRebalanceConfirmation(this.manager(0), checkPoint, rebalanceTopologyId);

        log.tracef("Starting the cache on the joiner", new Object[0]);
        ConfigurationBuilder joinerConfig = this.getConfigurationBuilder();
        joinerConfig.clustering().stateTransfer().awaitInitialTransfer(false).timeout(30L, TimeUnit.SECONDS);
        this.addClusterEnabledCacheManager(joinerConfig);
        AdvancedCache<Object, Object> cache1 = this.advancedCache(1);

        // Do not go on until the joiner's transaction-data future for the rebalance topology completes.
        StateTransferLock stateTransferLock1 = TestingUtil.extractComponent(cache1, StateTransferLock.class);
        stateTransferLock1.transactionDataFuture(rebalanceTopologyId).toCompletableFuture().get(10L, TimeUnit.SECONDS);
        AssertJUnit.assertEquals(2, cache1.getRpcManager().getMembers().size());

        // Two-party barrier: this thread and the thread running the matching command on cache1.
        CyclicBarrier beforeCommitCache1Barrier = new CyclicBarrier(2);
        BlockingInterceptor blockingInterceptor1 = new BlockingInterceptor(beforeCommitCache1Barrier, true, false, t -> t.getClass() == op.getCommandClass());
        AsyncInterceptorChain interceptorChain1 = TestingUtil.extractInterceptorChain(cache1);
        // Keep the bounded type produced by getClass(); an unbounded Class<?> would lose the
        // interceptor bound when the class is handed to addInterceptorAfter.
        EntryWrappingInterceptor wrappingInterceptor1 =
                (EntryWrappingInterceptor) interceptorChain1.findInterceptorExtending(EntryWrappingInterceptor.class);
        Class<? extends EntryWrappingInterceptor> ewi = wrappingInterceptor1.getClass();
        AssertJUnit.assertTrue(interceptorChain1.addInterceptorAfter(blockingInterceptor1, ewi));

        // Capture the state response leaving cache0; it is only sent on after the write has
        // reached the barrier on cache1.
        ControlledRpcManager.BlockedRequest<StateResponseCommand> blockedStateResponse = blockingRpcManager0.expectCommand(StateResponseCommand.class);

        Future<Object> future = this.fork(() -> op.perform(cache0, key));

        // First rendezvous: the write has reached the blocking interceptor on cache1.
        beforeCommitCache1Barrier.await(10L, TimeUnit.SECONDS);

        // Suspend the interceptor before releasing state transfer so that no other matching
        // command is made to wait on the barrier.
        blockingInterceptor1.suspend(true);

        blockedStateResponse.send().receiveAll();

        // Wait until cache1's confirmation has arrived at the (mocked) topology manager; the mock
        // keeps it from being processed until the resume events below are triggered.
        checkPoint.awaitStrict(preConfirmationEvent(rebalanceTopologyId, this.address(1)), 10L, TimeUnit.SECONDS);

        // Second rendezvous: let the write proceed on cache1.
        beforeCommitCache1Barrier.await(10L, TimeUnit.SECONDS);

        Object result = future.get(10L, TimeUnit.SECONDS);
        AssertJUnit.assertEquals(op.getReturnValue(), result);
        log.tracef("%s operation is done", op);

        // Let both members' rebalance confirmations through and wait for the rebalance to end.
        checkPoint.trigger(resumeConfirmationEvent(rebalanceTopologyId, this.address(0)));
        checkPoint.trigger(resumeConfirmationEvent(rebalanceTopologyId, this.address(1)));
        TestingUtil.waitForNoRebalance(cache0, cache1);

        AssertJUnit.assertEquals(op.getValue(), cache0.get(key));
        AssertJUnit.assertEquals(op.getValue(), cache1.get(key));

        // NOTE(review): not reached when an assertion above fails; per-method cleanup is then the
        // only thing tearing the controlled RpcManager down.
        blockingRpcManager0.stopBlocking();
    }

    /**
     * Replaces the manager's {@link ClusterTopologyManager} with a Mockito mock that delegates to
     * the real instance but gates rebalance confirmations for one topology on a checkpoint.
     *
     * <p>For a {@code handleRebalancePhaseConfirm} call with a {@code null} throwable whose
     * topology id equals {@code rebalanceTopologyId}, the mock first triggers
     * {@code pre_rebalance_confirmation_<id>_from_<source>} and forwards the call to the real
     * manager only after {@code resume_rebalance_confirmation_<id>_from_<source>} has been awaited
     * (10 second limit, on the {@code "checkpoint"} executor of the manager's
     * {@link BlockingManager}). Every other call is forwarded immediately.
     *
     * @param manager the cache manager whose topology manager is replaced
     * @param checkPoint the checkpoint used to signal and resume confirmations
     * @param rebalanceTopologyId the topology id whose confirmations are held back
     * @throws Exception declared for the stubbed method's signature
     */
    private void blockRebalanceConfirmation(EmbeddedCacheManager manager, CheckPoint checkPoint, int rebalanceTopologyId) throws Exception {
        ClusterTopologyManager ctm = TestingUtil.extractGlobalComponent(manager, ClusterTopologyManager.class);
        Answer<?> forwardedAnswer = AdditionalAnswers.delegatesTo(ctm);
        ClusterTopologyManager mock = Mockito.mock(ClusterTopologyManager.class, Mockito.withSettings().defaultAnswer(forwardedAnswer));
        BlockingManager blockingManager = GlobalComponentRegistry.componentOf(manager, BlockingManager.class);

        Mockito.doAnswer(invocation -> {
            Object[] arguments = invocation.getArguments();
            Address source = (Address) arguments[1];
            int topologyId = (Integer) arguments[2];
            if (topologyId != rebalanceTopologyId) {
                return forwardedAnswer.answer(invocation);
            }
            checkPoint.trigger(preConfirmationEvent(topologyId, source));
            return checkPoint.awaitStrictAsync(resumeConfirmationEvent(topologyId, source), 10L, TimeUnit.SECONDS, blockingManager.asExecutor("checkpoint")).thenCompose(unused -> {
                try {
                    // Raw cast kept: the stage's element type is not visible from this class.
                    return (CompletionStage) forwardedAnswer.answer(invocation);
                }
                catch (Throwable e) {
                    // Answer.answer declares Throwable, which a Function cannot propagate as-is.
                    throw CompletableFutures.asCompletionException(e);
                }
            });
        }).when(mock).handleRebalancePhaseConfirm(ArgumentMatchers.anyString(), ArgumentMatchers.any(Address.class), ArgumentMatchers.anyInt(), ArgumentMatchers.isNull(), ArgumentMatchers.anyInt());

        TestingUtil.replaceComponent(manager, ClusterTopologyManager.class, mock, true);
    }

    /** Name of the checkpoint event triggered when a rebalance confirmation arrives. */
    private static String preConfirmationEvent(int topologyId, Object source) {
        return "pre_rebalance_confirmation_" + topologyId + "_from_" + source;
    }

    /** Name of the checkpoint event that allows a held rebalance confirmation to proceed. */
    private static String resumeConfirmationEvent(int topologyId, Object source) {
        return "resume_rebalance_confirmation_" + topologyId + "_from_" + source;
    }
}

