/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.distribution.rehash;

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.remote.CacheRpcCommand;
import org.infinispan.commands.statetransfer.StateResponseCommand;
import org.infinispan.commands.statetransfer.StateTransferGetTransactionsCommand;
import org.infinispan.commands.statetransfer.StateTransferStartCommand;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.container.entries.ImmortalCacheEntry;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.MagicKey;
import org.infinispan.distribution.ch.ConsistentHashFactory;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.inboundhandler.PerCacheInboundInvocationHandler;
import org.infinispan.remoting.inboundhandler.Reply;
import org.infinispan.remoting.transport.Address;
import org.infinispan.statetransfer.StateChunk;
import org.infinispan.statetransfer.StateTransferManager;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestDataSCI;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.concurrent.CommandMatcher;
import org.infinispan.test.concurrent.StateSequencer;
import org.infinispan.test.concurrent.StateSequencerUtil;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.infinispan.util.ByteString;
import org.infinispan.util.ControlledConsistentHashFactory;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

/**
 * Functional tests that exercise how node 0 reacts to {@link StateResponseCommand}s that arrive
 * at awkward moments during a rebalance.
 *
 * <p>Each test runs on a four-node {@code DIST_SYNC} cluster with three owners and two segments.
 * Segment ownership is dictated by a {@link ControlledConsistentHashFactory}, so the tests can
 * decide exactly which nodes own which segment and when a rebalance is triggered. A
 * {@link StateSequencer} is used to order the test thread relative to intercepted RPCs.
 */
@CleanupAfterMethod
@Test(groups={"functional"}, testName="distribution.rehash.StateResponseOrderingTest")
public class StateResponseOrderingTest extends MultipleCacheManagersTest {
    /** Hash factory shared by all nodes; the tests change its owner indexes to drive rebalances. */
    private ControlledConsistentHashFactory consistentHashFactory;

    /**
     * Builds the four-node cluster. Both segments start out owned by nodes 1, 2 and 3, so node 0
     * owns nothing until a test reassigns the owners.
     */
    @Override
    protected void createCacheManagers() throws Throwable {
        consistentHashFactory = new ControlledConsistentHashFactory.Default(new int[][]{{1, 2, 3}, {1, 2, 3}});
        ConfigurationBuilder builder = TestCacheManagerFactory.getDefaultCacheConfiguration(true);
        builder.clustering().cacheMode(CacheMode.DIST_SYNC).hash().numOwners(3);
        builder.clustering().hash().numSegments(2).consistentHashFactory(consistentHashFactory);
        createCluster(TestDataSCI.INSTANCE, builder, 4);
        waitForClusterToForm();
    }

    /**
     * Delivers a hand-built {@link StateResponseCommand} tagged with the pre-rebalance topology id
     * to node 0 while node 0's own state request for the new topology is still held back, then
     * checks that the entry carried by that response ({@code k0}) does not end up in the cache
     * while the real entries ({@code k1}..{@code k3}) do.
     */
    public void testSimulatedOldStateResponse() throws Throwable {
        StateSequencer sequencer = new StateSequencer();
        sequencer.logicalThread("st", "st:block_state_request", "st:simulate_old_response", "st:resume_state_request");

        cache(1).put("k1", "v1");
        cache(2).put("k2", "v2");
        cache(3).put("k3", "v3");

        DistributionManager dm0 = advancedCache(0).getDistributionManager();
        int initialTopologyId = dm0.getCacheTopology().getTopologyId();
        AssertJUnit.assertEquals(Arrays.asList(address(1), address(2), address(3)),
                dm0.getCacheTopology().getDistribution("k1").readOwners());
        AssertJUnit.assertNull(dm0.getCacheTopology().getPendingCH());

        // Intercept node 0's outbound state transfer start command for the next topology.
        CommandMatcher segmentRequestMatcher = command -> command instanceof StateTransferStartCommand
                && ((StateTransferStartCommand) command).getTopologyId() == initialTopologyId + 1;
        StateSequencerUtil.advanceOnOutboundRpc(sequencer, cache(0), segmentRequestMatcher)
                .before("st:block_state_request", "st:resume_state_request");

        // Make node 0 an owner of both segments (replacing node 3) and start the rebalance.
        consistentHashFactory.setOwnerIndexes(new int[][]{{0, 1, 2}, {0, 1, 2}});
        consistentHashFactory.triggerRebalance(cache(0));

        sequencer.enter("st:simulate_old_response");

        // The rebalance is in progress: there is a pending CH and node 0 is already a write owner.
        Assert.assertNotNull(dm0.getCacheTopology().getPendingCH());
        AssertJUnit.assertEquals(Arrays.asList(address(0), address(1), address(2)),
                dm0.getCacheTopology().getPendingCH().locateOwnersForSegment(0));
        AssertJUnit.assertEquals(Arrays.asList(address(1), address(2), address(3), address(0)),
                dm0.getCacheTopology().getDistribution("k1").writeOwners());

        // Build a response for both segments that claims to belong to the initial topology and to
        // originate from node 3, and push it straight into node 0's inbound handler.
        PerCacheInboundInvocationHandler handler = TestingUtil.extractComponent(cache(0), PerCacheInboundInvocationHandler.class);
        StateChunk stateChunk0 = new StateChunk(0, Arrays.asList(new ImmortalCacheEntry("k0", "v0")), true);
        StateChunk stateChunk1 = new StateChunk(1, Arrays.asList(new ImmortalCacheEntry("k0", "v0")), true);
        String cacheName = manager(0).getCacheManagerConfiguration().defaultCacheName().get();
        StateResponseCommand stateResponseCommand = new StateResponseCommand(ByteString.fromString(cacheName),
                initialTopologyId, Arrays.asList(stateChunk0, stateChunk1), false);
        stateResponseCommand.setOrigin(address(3));
        handler.handle(stateResponseCommand, Reply.NO_OP, DeliverOrder.PER_SENDER);

        sequencer.exit("st:simulate_old_response");

        TestingUtil.waitForNoRebalance(cache(0), cache(1), cache(2), cache(3));

        // Node 0 now owns and can read the real entries...
        Assert.assertTrue(dm0.getCacheTopology().isReadOwner("k1"));
        Assert.assertTrue(dm0.getCacheTopology().isReadOwner("k2"));
        Assert.assertTrue(dm0.getCacheTopology().isReadOwner("k3"));
        AssertJUnit.assertEquals("v1", cache(0).get("k1"));
        AssertJUnit.assertEquals("v2", cache(0).get("k2"));
        AssertJUnit.assertEquals("v3", cache(0).get("k3"));
        // ...while the entry from the simulated response must not be visible.
        AssertJUnit.assertNull(cache(0).get("k0"));
    }

    /**
     * Holds back the first inbound {@link StateResponseCommand} on node 0, stops the other
     * candidate sender (node 1 or 2, whichever did not send the held response), and checks that
     * state transfer is still reported as in progress after the held response is released. Once
     * the rebalance settles, node 0 must own and return both keys.
     */
    public void testStateResponseWhileRestartingBrokenTransfers() throws Throwable {
        // Give the two segments different primary owners (node 1 and node 2 respectively).
        consistentHashFactory.setOwnerIndexes(new int[][]{{1, 2, 3}, {2, 1, 3}});
        consistentHashFactory.triggerRebalance(cache(0));
        eventuallyEquals(address(2),
                () -> advancedCache(0).getDistributionManager().getReadConsistentHash().locatePrimaryOwnerForSegment(1));

        StateSequencer sequencer = new StateSequencer();
        sequencer.logicalThread("st", "st:block_first_state_response", "st:kill_node", "st:block_second_state_request",
                "st:resume_first_state_response", "st:after_first_state_response", "st:check_incomplete",
                "st:resume_second_state_request");

        // Remembers which node sent the state response that gets held back.
        final AtomicReference<Address> firstResponseSender = new AtomicReference<>();
        CommandMatcher firstStateResponseMatcher = new CommandMatcher() {
            final CommandMatcher realMatcher = StateSequencerUtil.matchCommand(StateResponseCommand.class).matchCount(0).build();

            @Override
            public boolean accept(ReplicableCommand command) {
                if (!realMatcher.accept(command)) {
                    return false;
                }
                firstResponseSender.set(((StateResponseCommand) command).getOrigin());
                return true;
            }
        };
        StateSequencerUtil.advanceOnInboundRpc(sequencer, cache(0), firstStateResponseMatcher)
                .before("st:block_first_state_response", "st:resume_first_state_response")
                .after("st:after_first_state_response");

        // Accepts only the third StateTransferGetTransactionsCommand it sees; the others are logged.
        CommandMatcher secondStateRequestMatcher = new CommandMatcher() {
            private final AtomicInteger counter = new AtomicInteger();

            @Override
            public boolean accept(ReplicableCommand command) {
                if (command instanceof StateTransferGetTransactionsCommand) {
                    if (counter.getAndIncrement() == 2) {
                        return true;
                    }
                    log.debugf("Not blocking command %s", command);
                }
                return false;
            }
        };
        StateSequencerUtil.advanceOnOutboundRpc(sequencer, cache(0), secondStateRequestMatcher)
                .before("st:block_second_state_request", "st:resume_second_state_request");

        DistributionManager dm0 = advancedCache(0).getDistributionManager();
        StateTransferManager stm0 = TestingUtil.extractComponentRegistry(cache(0)).getStateTransferManager();

        MagicKey k1 = new MagicKey("k1", cache(1));
        AssertJUnit.assertEquals(Arrays.asList(address(1), address(2), address(3)),
                dm0.getCacheTopology().getDistribution(k1).readOwners());
        cache(0).put(k1, "v1");
        MagicKey k2 = new MagicKey("k2", cache(2));
        AssertJUnit.assertEquals(Arrays.asList(address(2), address(1), address(3)),
                dm0.getCacheTopology().getDistribution(k2).readOwners());
        cache(0).put(k2, "v2");

        // Make node 0 the primary owner of both segments and start the rebalance.
        consistentHashFactory.setOwnerIndexes(new int[][]{{0, 1, 2}, {0, 2, 1}});
        consistentHashFactory.triggerRebalance(cache(0));

        sequencer.enter("st:kill_node");

        Assert.assertNotNull(dm0.getCacheTopology().getPendingCH());

        // Keep the node whose response is being held; stop the other one of nodes 1 and 2.
        int nodeToKeep = managerIndex(firstResponseSender.get());
        int nodeToKill = nodeToKeep == 1 ? 2 : 1;
        log.debugf("Blocked state response from %s, killing %s", firstResponseSender.get(), manager(nodeToKill));
        cache(nodeToKill).stop();
        eventuallyEquals(3, () -> dm0.getCacheTopology().getMembers().size());

        sequencer.exit("st:kill_node");

        sequencer.enter("st:check_incomplete");
        Assert.assertTrue(stm0.isStateTransferInProgress());
        sequencer.exit("st:check_incomplete");

        TestingUtil.waitForNoRebalance(cache(0), cache(nodeToKeep), cache(3));

        Assert.assertTrue(dm0.getCacheTopology().isReadOwner(k1));
        Assert.assertTrue(dm0.getCacheTopology().isReadOwner(k2));
        AssertJUnit.assertEquals("v1", cache(0).get(k1));
        AssertJUnit.assertEquals("v2", cache(0).get(k2));
    }
}

