/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.distribution.rehash;

import java.lang.invoke.StringConcatFactory;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.commands.FlagAffectedCommand;
import org.infinispan.commands.statetransfer.StateResponseCommand;
import org.infinispan.commands.triangle.BackupWriteCommand;
import org.infinispan.commands.write.BackupAckCommand;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.container.DataContainer;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.ClearCacheEntry;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.globalstate.NoOpGlobalConfigurationManager;
import org.infinispan.interceptors.locking.ClusteringDependentLogic;
import org.infinispan.manager.CacheContainer;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.test.AbstractCacheTest;
import org.infinispan.test.Mocks;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CacheEntryDelegator;
import org.infinispan.test.fwk.CheckPoint;
import org.infinispan.test.fwk.ClusteringDependentLogicDelegator;
import org.infinispan.test.op.TestWriteOperation;
import org.infinispan.topology.ClusterTopologyManager;
import org.infinispan.transaction.TransactionMode;
import org.infinispan.util.ControlledRpcManager;
import org.mockito.AdditionalAnswers;
import org.mockito.ArgumentMatchers;
import org.mockito.MockSettings;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

/**
 * Functional test for a non-transactional {@code DIST_SYNC} cache: a write is performed on the
 * existing node while a second node is joining, and the entry commits on the joiner are ordered
 * through a {@link CheckPoint}. At the end both nodes must hold the value expected for the
 * {@link TestWriteOperation} under test.
 *
 * <p>Checkpoint event names used by this test:
 * <ul>
 *   <li>{@code pre_commit_entry_<key>_from_<origin>}, {@code resume_commit_entry_<key>_from_<origin>}
 *       and {@code post_commit_entry_<key>_from_<origin>} &ndash; produced/consumed by
 *       {@link #blockEntryCommit}; {@code <origin>} is the textual form of
 *       {@code InvocationContext.getOrigin()}, i.e. the literal {@code null} when there is no origin.</li>
 *   <li>{@code pre_rebalance_confirmation_<topologyId>_from_<node>} and
 *       {@code resume_rebalance_confirmation_<topologyId>_from_<node>} &ndash; produced/consumed by
 *       {@link #blockRebalanceConfirmation}.</li>
 * </ul>
 */
@Test(groups={"functional"}, testName="distribution.rehash.NonTxStateTransferOverwritingValue2Test")
public class NonTxStateTransferOverwritingValue2Test
extends MultipleCacheManagersTest {
    /** Requests cleanup after every test method, since each test adds a second cache manager. */
    public NonTxStateTransferOverwritingValue2Test() {
        this.cleanup = AbstractCacheTest.CleanupPhase.AFTER_METHOD;
    }

    /** Starts a single clustered cache manager; the joiner is added later by {@link #doTest}. */
    @Override
    protected void createCacheManagers() throws Throwable {
        ConfigurationBuilder c = getConfigurationBuilder();
        addClusterEnabledCacheManager(c);
        waitForClusterToForm();
    }

    /** Installs the no-op global configuration manager on every cache manager before it starts. */
    @Override
    protected void amendCacheManagerBeforeStart(EmbeddedCacheManager cm) {
        NoOpGlobalConfigurationManager.amendCacheManager(cm);
    }

    /**
     * Builds the cache configuration shared by both nodes.
     *
     * @return a builder configured for a synchronous distributed, non-transactional cache
     */
    protected ConfigurationBuilder getConfigurationBuilder() {
        ConfigurationBuilder c = new ConfigurationBuilder();
        c.clustering().cacheMode(CacheMode.DIST_SYNC);
        c.transaction().transactionMode(TransactionMode.NON_TRANSACTIONAL);
        return c;
    }

    public void testBackupOwnerJoiningDuringPutOverwrite() throws Exception {
        doTest(TestWriteOperation.PUT_OVERWRITE);
    }

    public void testBackupOwnerJoiningDuringReplace() throws Exception {
        doTest(TestWriteOperation.REPLACE);
    }

    public void testBackupOwnerJoiningDuringReplaceWithPreviousValue() throws Exception {
        doTest(TestWriteOperation.REPLACE_EXACT);
    }

    public void testBackupOwnerJoiningDuringRemove() throws Exception {
        doTest(TestWriteOperation.REMOVE);
    }

    public void testBackupOwnerJoiningDuringRemoveWithPreviousValue() throws Exception {
        doTest(TestWriteOperation.REMOVE_EXACT);
    }

    /**
     * Runs the scenario for one write operation.
     *
     * @param op the write operation performed on node 0 while node 1 is joining
     * @throws Exception if a checkpoint is not reached in time or the forked operation fails
     */
    private void doTest(TestWriteOperation op) throws Exception {
        AdvancedCache<Object, Object> cache0 = advancedCache(0);
        String key = "key";

        // Seed the entry when the operation under test needs a previous value.
        Object previousValue = op.getPreviousValue();
        if (previousValue != null) {
            cache0.put(key, previousValue);
            AssertJUnit.assertEquals(previousValue, cache0.get(key));
            log.tracef("Previous value inserted: %s = %s", key, previousValue);
        }

        int preJoinTopologyId = cache0.getDistributionManager().getCacheTopology().getTopologyId();

        // Take control of the RPCs sent by node 0 (backup write/ack commands are let through) so the
        // state response can be held until the commit hook is installed on the joiner.
        CheckPoint checkPoint = new CheckPoint();
        ControlledRpcManager blockingRpcManager0 = ControlledRpcManager.replaceRpcManager(cache0);
        blockingRpcManager0.excludeCommands(BackupWriteCommand.class, BackupAckCommand.class);

        // Hold back the confirmations for the topology that follows the pre-join one.
        blockRebalanceConfirmation(manager(0), checkPoint, preJoinTopologyId + 1);

        // Start the joiner without waiting for its initial state transfer.
        log.tracef("Starting the cache on the joiner");
        ConfigurationBuilder c = getConfigurationBuilder();
        c.clustering().stateTransfer().awaitInitialTransfer(false);
        addClusterEnabledCacheManager(c);

        AdvancedCache<Object, Object> cache1 = advancedCache(1);

        // Wait until both nodes see a two-member cluster.
        eventually(() -> cache0.getRpcManager().getMembers().size() == 2
                && cache1.getRpcManager().getMembers().size() == 2);

        // From now on every entry commit on the joiner is gated by the checkpoint.
        blockEntryCommit(checkPoint, cache1);

        // Let the state response leave node 0, but do not deliver its reply yet.
        ControlledRpcManager.BlockedRequest<StateResponseCommand> blockedStateResponse =
                blockingRpcManager0.expectCommand(StateResponseCommand.class);
        ControlledRpcManager.SentRequest sentStateResponse = blockedStateResponse.send();

        // Wait for the commit whose invocation context has no origin ("_from_null").
        // NOTE(review): presumably this is the state-transfer write applied on the joiner - confirm.
        checkPoint.awaitStrict("pre_commit_entry_" + key + "_from_" + null, 5L, TimeUnit.SECONDS);

        // Run the user operation on node 0 in the background.
        Future<Object> future = fork(() -> op.perform(cache0, key));

        // The commit originating from node 0 must not reach the joiner's commit hook while the
        // origin-less commit is still pending.
        boolean blocked = checkPoint.peek(1L, TimeUnit.SECONDS, "pre_commit_entry_" + key + "_from_" + address(0)) == null;
        AssertJUnit.assertTrue(blocked);

        // Release the origin-less commit ...
        checkPoint.trigger("resume_commit_entry_" + key + "_from_" + null);

        // ... after which the commit originating from node 0 must show up and can be released too.
        checkPoint.awaitStrict("pre_commit_entry_" + key + "_from_" + address(0), 5L, TimeUnit.SECONDS);
        checkPoint.trigger("resume_commit_entry_" + key + "_from_" + address(0));

        // Both commits must complete.
        checkPoint.awaitStrict("post_commit_entry_" + key + "_from_" + null, 10L, TimeUnit.SECONDS);
        checkPoint.awaitStrict("post_commit_entry_" + key + "_from_" + address(0), 10L, TimeUnit.SECONDS);

        // The user operation must finish with the expected return value.
        Object result = future.get(10L, TimeUnit.SECONDS);
        AssertJUnit.assertEquals(op.getReturnValue(), result);
        log.tracef("%s operation is done", op);

        // Only now deliver the reply to the state response.
        sentStateResponse.receiveAll();

        // Release the rebalance confirmations from both nodes and wait for the rebalance to end.
        int rebalanceTopologyId = preJoinTopologyId + 1;
        checkPoint.trigger("resume_rebalance_confirmation_" + rebalanceTopologyId + "_from_" + address(0));
        checkPoint.trigger("resume_rebalance_confirmation_" + rebalanceTopologyId + "_from_" + address(1));
        TestingUtil.waitForNoRebalance(cache0, cache1);

        // Both nodes must expose the value expected for the operation.
        AssertJUnit.assertEquals(op.getValue(), cache0.get(key));
        AssertJUnit.assertEquals(op.getValue(), cache1.get(key));

        blockingRpcManager0.stopBlocking();
    }

    /**
     * Replaces the {@link ClusteringDependentLogic} of {@code cache} with a delegator that wraps
     * every committed entry (except {@link ClearCacheEntry}) so that its {@code commit} announces
     * {@code pre_commit_entry_...}, waits up to 10 seconds for {@code resume_commit_entry_...},
     * performs the real commit and finally announces {@code post_commit_entry_...}.
     *
     * @param checkPoint the checkpoint used to announce and gate the commits
     * @param cache the cache whose commits are intercepted
     */
    private void blockEntryCommit(final CheckPoint checkPoint, AdvancedCache<Object, Object> cache) {
        ClusteringDependentLogic cdl1 = TestingUtil.extractComponent(cache, ClusteringDependentLogic.class);
        ClusteringDependentLogicDelegator replaceCdl = new ClusteringDependentLogicDelegator(cdl1){

            @Override
            public CompletionStage<Void> commitEntry(CacheEntry entry, FlagAffectedCommand command, InvocationContext ctx, Flag trackFlag, boolean l1Invalidation) {
                // Clear entries are committed untouched.
                if (entry instanceof ClearCacheEntry) {
                    return super.commitEntry(entry, command, ctx, trackFlag, l1Invalidation);
                }
                // May be null; it is then rendered as "null" in the event names below.
                final Address source = ctx.getOrigin();
                CacheEntryDelegator newEntry = new CacheEntryDelegator(entry){

                    @Override
                    public void commit(DataContainer container) {
                        checkPoint.trigger("pre_commit_entry_" + getKey() + "_from_" + source);
                        try {
                            checkPoint.awaitStrict("resume_commit_entry_" + getKey() + "_from_" + source, 10L, TimeUnit.SECONDS);
                        }
                        catch (InterruptedException | TimeoutException e) {
                            throw new RuntimeException(e);
                        }
                        super.commit(container);
                        checkPoint.trigger("post_commit_entry_" + getKey() + "_from_" + source);
                    }
                };
                return super.commitEntry(newEntry, command, ctx, trackFlag, l1Invalidation);
            }
        };
        TestingUtil.replaceComponent(cache, ClusteringDependentLogic.class, replaceCdl, true);
    }

    /**
     * Replaces the {@link ClusterTopologyManager} of {@code manager} with a Mockito mock that
     * forwards every call to the real instance, except that
     * {@code handleRebalancePhaseConfirm} calls carrying {@code rebalanceTopologyId} (and a
     * {@code null} throwable) are announced as {@code pre_rebalance_confirmation_...} and only
     * forwarded once {@code resume_rebalance_confirmation_...} is triggered (10 second timeout).
     *
     * @param manager the cache manager whose topology manager is replaced
     * @param checkPoint the checkpoint used to announce and gate the confirmations
     * @param rebalanceTopologyId the topology id whose confirmations are held back
     * @throws Exception declared because the stubbed method is invoked on the mock while stubbing
     */
    private void blockRebalanceConfirmation(EmbeddedCacheManager manager, CheckPoint checkPoint, int rebalanceTopologyId) throws Exception {
        ClusterTopologyManager ctm = TestingUtil.extractGlobalComponent(manager, ClusterTopologyManager.class);
        Answer<Object> forwardedAnswer = AdditionalAnswers.delegatesTo(ctm);
        ClusterTopologyManager mock = Mockito.mock(ClusterTopologyManager.class, Mockito.withSettings().defaultAnswer(forwardedAnswer));
        Mockito.doAnswer(invocation -> {
            Object[] arguments = invocation.getArguments();
            Address source = (Address) arguments[1];
            int topologyId = (Integer) arguments[2];
            if (rebalanceTopologyId == topologyId) {
                checkPoint.trigger("pre_rebalance_confirmation_" + topologyId + "_from_" + source);
                // Forward to the real topology manager only after the test releases this confirmation.
                return checkPoint.future("resume_rebalance_confirmation_" + topologyId + "_from_" + source, 10L, TimeUnit.SECONDS, testExecutor())
                        .thenCompose(ignored -> Mocks.callAnotherAnswer(forwardedAnswer, invocation));
            }
            return forwardedAnswer.answer(invocation);
        }).when(mock).handleRebalancePhaseConfirm(ArgumentMatchers.anyString(), ArgumentMatchers.any(Address.class), ArgumentMatchers.anyInt(), ArgumentMatchers.isNull(), ArgumentMatchers.anyInt());
        TestingUtil.replaceComponent(manager, ClusterTopologyManager.class, mock, true);
    }
}

