/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.statetransfer;

import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.infinispan.Cache;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.remote.recovery.TxCompletionNotificationCommand;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.tx.VersionedPrepareCommand;
import org.infinispan.commons.marshall.Externalizer;
import org.infinispan.commons.marshall.SerializeWith;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.cache.IsolationLevel;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.container.DataContainer;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.impl.InternalDataContainer;
import org.infinispan.context.InvocationContext;
import org.infinispan.distribution.DistributionTestHelper;
import org.infinispan.distribution.ch.ConsistentHashFactory;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.interceptors.AsyncInterceptor;
import org.infinispan.interceptors.BaseAsyncInterceptor;
import org.infinispan.manager.CacheContainer;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.protostream.SerializationContextInitializer;
import org.infinispan.protostream.annotations.AutoProtoSchemaBuilder;
import org.infinispan.remoting.transport.Address;
import org.infinispan.statetransfer.WriteSkewDuringStateTransferSCIImpl;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.infinispan.test.fwk.TransportFlags;
import org.infinispan.topology.CacheTopology;
import org.infinispan.transaction.impl.WriteSkewHelper;
import org.infinispan.util.BaseControlledConsistentHashFactory;
import org.infinispan.util.BlockingLocalTopologyManager;
import org.infinispan.util.ControlledRpcManager;
import org.infinispan.util.logging.Log;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;

@Test(groups={"functional"}, testName="statetransfer.WriteSkewDuringStateTransferTest", singleThreaded=true)
public class WriteSkewDuringStateTransferTest
extends MultipleCacheManagersTest {
    /** Blocking topology managers that must be released after each test method; see {@link #unblockAll()}. */
    private final List<BlockingLocalTopologyManager> topologyManagerList = Collections.synchronizedList(new ArrayList<>(4));

    /**
     * Releases every registered {@link BlockingLocalTopologyManager} and empties the registry.
     *
     * <p>The registry is a {@code Collections.synchronizedList}, which must be traversed while
     * holding its monitor. The content is therefore snapshotted and cleared atomically under the
     * lock, and {@code stopBlocking()} is invoked on the snapshot outside of it so that no foreign
     * code runs while the lock is held.
     */
    @AfterMethod(alwaysRun=true)
    public final void unblockAll() {
        final List<BlockingLocalTopologyManager> toRelease;
        synchronized (this.topologyManagerList) {
            toRelease = new ArrayList<>(this.topologyManagerList);
            this.topologyManagerList.clear();
        }
        for (BlockingLocalTopologyManager topologyManager : toRelease) {
            topologyManager.stopBlocking();
        }
    }

    /**
     * Runs a transactional write from node A (cache 0) while a third node C joins, stepping every
     * node through the rebalance phases by hand, and finally asserts that the entry stored on
     * caches 1 and 2 carries a version.
     *
     * <p>The order of the statements below is the scenario itself; do not reorder them.
     *
     * @throws Exception if any step fails or one of the bounded waits times out
     */
    public void testVersionsAfterStateTransfer() throws Exception {
        this.assertClusterSize("Wrong cluster size", 2);
        String key = "key1";
        // With two members the key is expected on cache(1) as primary owner and cache(0) as backup.
        this.assertKeyOwnership(key, this.cache(1), this.cache(0));
        int currentTopologyId = this.currentTopologyId(this.cache(0));
        ControlledRpcManager nodeARpcManager = ControlledRpcManager.replaceRpcManager(this.cache(0), new Class[0]);
        NodeController nodeAController = WriteSkewDuringStateTransferTest.setNodeControllerIn(this.cache(0));
        // Register for @AfterMethod cleanup so a failure half-way through does not leave node A blocked.
        this.topologyManagerList.add(nodeAController.topologyManager);
        WriteSkewDuringStateTransferTest.setInitialPhaseForNodeA(nodeAController, currentTopologyId);
        NodeController nodeBController = WriteSkewDuringStateTransferTest.setNodeControllerIn(this.cache(1));
        // Same cleanup registration for node B.
        this.topologyManagerList.add(nodeBController.topologyManager);
        WriteSkewDuringStateTransferTest.setInitialPhaseForNodeB(nodeBController, currentTopologyId);
        // Start the joiner (node C) in the background; its topology manager is blocking as well.
        NewNode nodeC = this.addNode(currentTopologyId);
        // Confirm READ_OLD_WRITE_ALL on A, B and C.
        BlockingLocalTopologyManager.confirmTopologyUpdate(CacheTopology.Phase.READ_OLD_WRITE_ALL, nodeAController.topologyManager, nodeBController.topologyManager, nodeC.controller.topologyManager);
        // Only B moves on to READ_ALL_WRITE_ALL before the transaction starts.
        nodeBController.topologyManager.confirmTopologyUpdate(CacheTopology.Phase.READ_ALL_WRITE_ALL);
        Future<Object> tx = this.executeTransaction(this.cache(0), key);
        // Let the versioned prepare go out and hold back its responses until the topology has advanced.
        ControlledRpcManager.BlockedResponseMap blockedPrepare = nodeARpcManager.expectCommand(VersionedPrepareCommand.class).send().expectAllResponses();
        // C's interceptor counts the latch down after handling a remote prepare from node 0 (see addNode),
        // so a count of zero means C has already processed A's prepare.
        AssertJUnit.assertEquals((long)0L, (long)nodeC.commandLatch.getCount());
        nodeAController.topologyManager.confirmTopologyUpdate(CacheTopology.Phase.READ_ALL_WRITE_ALL);
        nodeC.controller.topologyManager.confirmTopologyUpdate(CacheTopology.Phase.READ_ALL_WRITE_ALL);
        BlockingLocalTopologyManager.confirmTopologyUpdate(CacheTopology.Phase.READ_NEW_WRITE_ALL, nodeAController.topologyManager, nodeBController.topologyManager, nodeC.controller.topologyManager);
        BlockingLocalTopologyManager.confirmTopologyUpdate(CacheTopology.Phase.NO_REBALANCE, nodeAController.topologyManager, nodeBController.topologyManager, nodeC.controller.topologyManager);
        this.awaitForTopology(currentTopologyId + 4, this.cache(0));
        // Deliver the held-back prepare responses only now, after A reports topology id + 4.
        blockedPrepare.receive();
        // NOTE(review): a second PrepareCommand is expected here, presumably a retry in the new topology - confirm.
        nodeARpcManager.expectCommand(PrepareCommand.class).send().receiveAll();
        nodeARpcManager.expectCommand(CommitCommand.class).send().receiveAll();
        nodeARpcManager.expectCommand(TxCompletionNotificationCommand.class).send();
        // Bounded wait, consistent with the joiner wait below, so a stuck transaction fails the test instead of hanging it.
        AssertJUnit.assertNull((String)"Wrong put() return value.", (Object)tx.get(30L, TimeUnit.SECONDS));
        nodeAController.topologyManager.stopBlocking();
        nodeBController.topologyManager.stopBlocking();
        nodeC.controller.topologyManager.stopBlocking();
        nodeC.joinerFuture.get(30L, TimeUnit.SECONDS);
        this.awaitForTopology(currentTopologyId + 4, this.cache(0));
        this.awaitForTopology(currentTopologyId + 4, this.cache(1));
        this.awaitForTopology(currentTopologyId + 4, this.cache(2));
        this.assertKeyVersionInDataContainer(key, this.cache(1), this.cache(2));
        nodeARpcManager.stopBlocking();
        // A further write to the same key must complete without throwing.
        this.cache(0).put((Object)key, (Object)"v2");
    }

    /**
     * Creates the initial two-member cluster using this test's serialization context
     * initializer and the cache configuration from {@link #configuration()}.
     */
    @Override
    protected void createCacheManagers() throws Throwable {
        final WriteSkewDuringStateTransferSCI contextInitializer = WriteSkewDuringStateTransferSCI.INSTANCE;
        final ConfigurationBuilder cacheConfig = configuration();
        createClusteredCaches(2, contextInitializer, cacheConfig);
    }

    /**
     * Asserts that each given cache holds an entry for {@code key} in its data container and
     * that {@link WriteSkewHelper#versionFromEntry} yields a non-null version for that entry.
     *
     * @param key    the key to look up
     * @param owners the caches whose data containers are inspected
     */
    private void assertKeyVersionInDataContainer(Object key, Cache<?, ?> ... owners) {
        for (Cache<?, ?> owner : owners) {
            DataContainer container = (DataContainer) TestingUtil.extractComponent(owner, InternalDataContainer.class);
            // peek() is used so the entry is inspected directly in the container.
            InternalCacheEntry stored = container.peek(key);
            String missingEntryMessage = "Entry cannot be null in " + address(owner) + ".";
            AssertJUnit.assertNotNull(missingEntryMessage, stored);
            AssertJUnit.assertNotNull("Version cannot be null.", WriteSkewHelper.versionFromEntry((CacheEntry) stored));
        }
    }

    /**
     * Polls (via {@code eventually}) until the given cache reports exactly the expected topology id.
     *
     * @param expectedTopologyId the topology id to wait for
     * @param cache              the cache whose topology id is observed
     */
    private void awaitForTopology(int expectedTopologyId, Cache<?, ?> cache) {
        eventually(() -> {
            int observedTopologyId = currentTopologyId(cache);
            return observedTopologyId == expectedTopologyId;
        });
    }

    /**
     * Reads the topology id currently exposed by the cache's distribution manager.
     *
     * @param cache the cache to query
     * @return the id of the cache topology the distribution manager reports right now
     */
    private int currentTopologyId(Cache<?, ?> cache) {
        final CacheTopology topology = cache.getAdvancedCache()
                .getDistributionManager()
                .getCacheTopology();
        return topology.getTopologyId();
    }

    /**
     * Forks a task that writes {@code "value"} under {@code key} inside a transaction obtained
     * from the cache's transaction manager.
     *
     * @param cache the cache to write to
     * @param key   the key to write
     * @return a future carrying the result of the forked {@code put}
     */
    private Future<Object> executeTransaction(Cache<Object, Object> cache, Object key) {
        return fork(() -> TestingUtil.withTx(
                cache.getAdvancedCache().getTransactionManager(),
                () -> cache.put(key, "value")));
    }

    /**
     * Prepares a new cluster member with a controlled interceptor and a blocking topology
     * manager, then starts its cache manager on a forked thread.
     *
     * <p>The installed action applies to remote {@link PrepareCommand}s only:
     * <ul>
     *   <li>before the command, if it originates from cache 1, it waits up to 10 seconds for
     *       topology {@code currentTopologyId + 2};</li>
     *   <li>after the command, if it originates from node 0, it counts down
     *       {@code commandLatch}.</li>
     * </ul>
     *
     * @param currentTopologyId the topology id observed before the join starts
     * @return the handle holding the joiner's controller, latch and start-up future
     */
    private NewNode addNode(final int currentTopologyId) {
        final NewNode joiner = new NewNode();
        ConfigurationBuilder cacheConfig = configuration();
        joiner.controller = new NodeController();
        joiner.controller.interceptor = new ControlledCommandInterceptor();

        GlobalConfigurationBuilder globalConfig = GlobalConfigurationBuilder.defaultClusteredBuilder();
        globalConfig.serialization().addContextInitializer(WriteSkewDuringStateTransferSCI.INSTANCE);
        // The interceptor goes first in the chain of the cache named "defaultcache".
        TestCacheManagerFactory.addInterceptor(globalConfig, "defaultcache"::equals, joiner.controller.interceptor, TestCacheManagerFactory.InterceptorPosition.FIRST, null);

        // The manager is created unstarted (first argument false); it is started by the forked task below.
        EmbeddedCacheManager joinerManager = TestCacheManagerFactory.createClusteredCacheManager(false, globalConfig, cacheConfig, new TransportFlags());
        registerCacheManager(joinerManager);
        joiner.controller.topologyManager = replaceTopologyManager(joinerManager);

        joiner.controller.interceptor.addAction(new Action() {

            @Override
            public boolean isApplicable(InvocationContext context, VisitableCommand command) {
                if (context.isOriginLocal()) {
                    return false;
                }
                return command instanceof PrepareCommand;
            }

            @Override
            public void before(InvocationContext context, VisitableCommand command, Cache<?, ?> cache) {
                log.tracef("Before: command=%s. origin=%s", command, context.getOrigin());
                if (!context.getOrigin().equals(address(cache(1)))) {
                    return;
                }
                final int awaitedTopologyId = currentTopologyId + 2;
                try {
                    ComponentRegistry.of(cache).getStateTransferLock().waitForTopology(awaitedTopologyId, 10L, TimeUnit.SECONDS);
                }
                catch (TimeoutException e) {
                    throw Log.CLUSTER.failedWaitingForTopology(awaitedTopologyId);
                }
                catch (InterruptedException e) {
                    // Keep the interrupt flag set and let the command continue.
                    Thread.currentThread().interrupt();
                }
            }

            @Override
            public void after(InvocationContext context, VisitableCommand command, Cache<?, ?> cache) {
                log.tracef("After: command=%s. origin=%s", command, context.getOrigin());
                if (context.getOrigin().equals(address(0))) {
                    joiner.commandLatch.countDown();
                }
            }
        });

        joiner.joinerFuture = fork(() -> {
            joinerManager.start();
            return null;
        });
        return joiner;
    }

    /**
     * Builds the cache configuration shared by all members: REPL_SYNC, in-memory state
     * fetching enabled, one segment, three owners, the test's controlled consistent hash
     * factory and REPEATABLE_READ isolation.
     *
     * @return a fresh configuration builder
     */
    private ConfigurationBuilder configuration() {
        ConfigurationBuilder config = getDefaultClusteredCacheConfig(CacheMode.REPL_SYNC, true);
        config.clustering().stateTransfer().fetchInMemoryState(true);
        config.clustering().hash()
                .numSegments(1)
                .numOwners(3)
                .consistentHashFactory(new ConsistentHashFactoryImpl());
        config.locking().isolationLevel(IsolationLevel.REPEATABLE_READ);
        return config;
    }

    /**
     * Asserts, through {@link DistributionTestHelper#hasOwners}, that the key has the given
     * primary and backup owners.
     *
     * @param key          the key to check
     * @param primaryOwner the cache expected to be the primary owner
     * @param backupOwners the caches expected to be backup owners
     */
    private void assertKeyOwnership(Object key, Cache<?, ?> primaryOwner, Cache<?, ?> ... backupOwners) {
        final String failureMessage = "Wrong ownership for " + key + ".";
        final boolean ownedAsExpected = DistributionTestHelper.hasOwners(key, primaryOwner, backupOwners);
        AssertJUnit.assertTrue(failureMessage, ownedAsExpected);
    }

    /**
     * Installs a {@link BlockingLocalTopologyManager} for the default cache of the given manager
     * and records it in {@code topologyManagerList}.
     *
     * @param cacheContainer the cache manager whose topology manager is replaced
     * @return the newly installed blocking topology manager
     */
    private BlockingLocalTopologyManager replaceTopologyManager(EmbeddedCacheManager cacheContainer) {
        final BlockingLocalTopologyManager blockingManager =
                BlockingLocalTopologyManager.replaceTopologyManagerDefaultCache(cacheContainer);
        topologyManagerList.add(blockingManager);
        return blockingManager;
    }

    /**
     * Wires a running cache with a {@link ControlledCommandInterceptor} and a
     * {@link BlockingLocalTopologyManager}, and returns both bundled in a controller.
     *
     * <p>NOTE(review): unlike {@code replaceTopologyManager}, this static helper does not record
     * the topology manager in {@code topologyManagerList}.
     *
     * @param cache the cache to instrument
     * @return the controller giving access to the installed interceptor and topology manager
     */
    private static NodeController setNodeControllerIn(Cache<Object, Object> cache) {
        final NodeController controller = new NodeController();
        // The interceptor constructor registers itself in the cache's interceptor chain.
        controller.interceptor = new ControlledCommandInterceptor(cache);
        final EmbeddedCacheManager manager = cache.getCacheManager();
        controller.topologyManager = BlockingLocalTopologyManager.replaceTopologyManagerDefaultCache(manager);
        return controller;
    }

    /**
     * Registers an action on node A that delays every locally originated {@link PrepareCommand}
     * until topology {@code currentTopology + 1} is reached, waiting at most 10 seconds.
     *
     * @param nodeA           the controller of node A
     * @param currentTopology the topology id observed before the join starts
     */
    private static void setInitialPhaseForNodeA(NodeController nodeA, final int currentTopology) {
        final int awaitedTopology = currentTopology + 1;
        nodeA.interceptor.addAction(new Action() {

            @Override
            public boolean isApplicable(InvocationContext context, VisitableCommand command) {
                if (!context.isOriginLocal()) {
                    return false;
                }
                return command instanceof PrepareCommand;
            }

            @Override
            public void before(InvocationContext context, VisitableCommand command, Cache<?, ?> cache) {
                try {
                    ComponentRegistry.of(cache).getStateTransferLock().waitForTopology(awaitedTopology, 10L, TimeUnit.SECONDS);
                }
                catch (TimeoutException e) {
                    throw Log.CLUSTER.failedWaitingForTopology(awaitedTopology);
                }
                catch (InterruptedException e) {
                    // Keep the interrupt flag set and let the command continue.
                    Thread.currentThread().interrupt();
                }
            }

            @Override
            public void after(InvocationContext context, VisitableCommand command, Cache<?, ?> cache) {
                // Nothing to do once the command has completed.
            }
        });
    }

    /**
     * Registers an action on node B that delays every remotely originated {@link PrepareCommand}
     * until topology {@code currentTopology + 2} is reached, waiting at most 10 seconds.
     *
     * @param nodeB           the controller of node B
     * @param currentTopology the topology id observed before the join starts
     */
    private static void setInitialPhaseForNodeB(NodeController nodeB, final int currentTopology) {
        final int awaitedTopology = currentTopology + 2;
        nodeB.interceptor.addAction(new Action() {

            @Override
            public boolean isApplicable(InvocationContext context, VisitableCommand command) {
                if (context.isOriginLocal()) {
                    return false;
                }
                return command instanceof PrepareCommand;
            }

            @Override
            public void before(InvocationContext context, VisitableCommand command, Cache<?, ?> cache) {
                try {
                    ComponentRegistry.of(cache).getStateTransferLock().waitForTopology(awaitedTopology, 10L, TimeUnit.SECONDS);
                }
                catch (TimeoutException e) {
                    throw Log.CLUSTER.failedWaitingForTopology(awaitedTopology);
                }
                catch (InterruptedException e) {
                    // Keep the interrupt flag set and let the command continue.
                    Thread.currentThread().interrupt();
                }
            }

            @Override
            public void after(InvocationContext context, VisitableCommand command, Cache<?, ?> cache) {
                // Nothing to do once the command has completed.
            }
        });
    }

    /** Bundles the two test hooks installed on a node: its command interceptor and its blocking topology manager. */
    private static class NodeController {
        /** Interceptor through which per-command actions are registered. */
        ControlledCommandInterceptor interceptor;

        /** Topology manager used to confirm topology updates step by step. */
        BlockingLocalTopologyManager topologyManager;
    }

    /** Handle for the joining node: its start-up future, a one-shot command latch and its controller. */
    private static class NewNode {
        /** Completes when the forked start of the joiner's cache manager returns. */
        Future<Void> joinerFuture;

        /** Starts at one; counted down by the action installed in {@code addNode}. */
        CountDownLatch commandLatch = new CountDownLatch(1);

        /** Test hooks installed on the joiner. */
        NodeController controller;
    }

    /**
     * Serialization context initializer for this test; its schema includes
     * {@link ConsistentHashFactoryImpl}.
     *
     * <p>NOTE(review): {@code WriteSkewDuringStateTransferSCIImpl} is presumably generated from
     * the annotation below - confirm against the build.
     */
    @AutoProtoSchemaBuilder(
            includeClasses = {ConsistentHashFactoryImpl.class},
            schemaFileName = "test.core.WriteSkewDuringStateTransferTest.proto",
            schemaFilePath = "proto/generated",
            schemaPackageName = "org.infinispan.test.core.WriteSkewDuringStateTransferTest",
            service = false)
    public interface WriteSkewDuringStateTransferSCI extends SerializationContextInitializer {
        /** Shared initializer instance used by every cache manager of this test. */
        WriteSkewDuringStateTransferSCI INSTANCE = new WriteSkewDuringStateTransferSCIImpl();
    }

    public static class ControlledCommandInterceptor
    extends BaseAsyncInterceptor {
        private final List<Action> actionList = new ArrayList<Action>(3);
        private Cache<Object, Object> cache;

        public ControlledCommandInterceptor(Cache<Object, Object> cache) {
            this.cache = cache;
            this.cacheConfiguration = cache.getCacheConfiguration();
            TestingUtil.extractInterceptorChain(cache).addInterceptor((AsyncInterceptor)this, 0);
        }

        public ControlledCommandInterceptor() {
        }

        public void addAction(Action action) {
            this.actionList.add(action);
        }

        public Object visitCommand(InvocationContext ctx, VisitableCommand command) throws Throwable {
            List<Action> actions = this.extractActions(ctx, command);
            if (actions.isEmpty()) {
                return this.invokeNext(ctx, command);
            }
            for (Action action : actions) {
                action.before(ctx, command, this.cache);
            }
            return this.invokeNextThenAccept(ctx, command, (rCtx, rCommand, rv) -> {
                for (Action action : actions) {
                    action.after(ctx, command, this.cache);
                }
            });
        }

        private List<Action> extractActions(InvocationContext context, VisitableCommand command) {
            if (this.actionList.isEmpty()) {
                return Collections.emptyList();
            }
            ArrayList<Action> actions = new ArrayList<Action>(this.actionList.size());
            for (Action action : this.actionList) {
                if (!action.isApplicable(context, command)) continue;
                actions.add(action);
            }
            return actions;
        }
    }

    /** Hook executed by {@link ControlledCommandInterceptor} around the commands it applies to. */
    public interface Action {
        /**
         * Decides whether this action should run for the given invocation.
         *
         * @param context the invocation context of the command
         * @param command the command being visited
         * @return {@code true} if {@code before} and {@code after} should be invoked
         */
        boolean isApplicable(InvocationContext context, VisitableCommand command);

        /**
         * Invoked before the command is passed to the next interceptor.
         *
         * @param context the invocation context of the command
         * @param command the command being visited
         * @param cache   the cache held by the interceptor
         */
        void before(InvocationContext context, VisitableCommand command, Cache<?, ?> cache);

        /**
         * Invoked from the interceptor's completion callback for the command.
         *
         * @param context the invocation context of the command
         * @param command the command being visited
         * @param cache   the cache held by the interceptor
         */
        void after(InvocationContext context, VisitableCommand command, Cache<?, ?> cache);
    }

    /**
     * Consistent hash factory with a fixed owner assignment that depends only on the number of
     * members.
     *
     * <p>The externalizer is referenced by its qualified name: the annotation sits outside this
     * class body, where the simple name {@code Externalizer} can resolve to the imported
     * {@code org.infinispan.commons.marshall.Externalizer} interface instead of the nested class.
     */
    @SerializeWith(value=ConsistentHashFactoryImpl.Externalizer.class)
    public static class ConsistentHashFactoryImpl
    extends BaseControlledConsistentHashFactory.Default {
        ConsistentHashFactoryImpl() {
            super(1);
        }

        /**
         * Returns the member indexes owning the single segment.
         *
         * <ul>
         *   <li>one member: {@code {0}}</li>
         *   <li>two members: {@code {1, 0}}</li>
         *   <li>otherwise: {@code {members.size() - 1, 0, 1}}</li>
         * </ul>
         *
         * <p>NOTE(review): the first index is presumably the primary owner - confirm against
         * {@code BaseControlledConsistentHashFactory}.
         *
         * @param numSegments unused by this implementation
         * @param members     the current cluster members
         * @return a one-row array holding the owner indexes for the segment
         */
        @Override
        protected final int[][] assignOwners(int numSegments, List<Address> members) {
            switch (members.size()) {
                case 1: {
                    return new int[][]{{0}};
                }
                case 2: {
                    return new int[][]{{1, 0}};
                }
            }
            return new int[][]{{members.size() - 1, 0, 1}};
        }

        /** Stateless externalizer: writes nothing and recreates the factory on read. */
        public static class Externalizer
        implements org.infinispan.commons.marshall.Externalizer<ConsistentHashFactoryImpl> {
            @Override
            public void writeObject(ObjectOutput output, ConsistentHashFactoryImpl object) throws IOException {
                // The factory carries no state to serialize.
            }

            @Override
            public ConsistentHashFactoryImpl readObject(ObjectInput input) throws IOException, ClassNotFoundException {
                return new ConsistentHashFactoryImpl();
            }
        }
    }
}

