/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.statetransfer;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.infinispan.Cache;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commons.tx.lookup.TransactionManagerLookup;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.container.DataContainer;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.FlagBitSets;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.interceptors.AsyncInterceptor;
import org.infinispan.interceptors.BaseAsyncInterceptor;
import org.infinispan.interceptors.impl.InvocationContextInterceptor;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.infinispan.transaction.LockingMode;
import org.infinispan.transaction.TransactionMode;
import org.infinispan.transaction.lookup.EmbeddedTransactionManagerLookup;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@Test(groups={"functional"}, testName="statetransfer.InitialStateTransferCompletionTest")
@CleanupAfterMethod
public class InitialStateTransferCompletionTest
extends MultipleCacheManagersTest {
    private ConfigurationBuilder cacheConfigBuilder;

    @Override
    protected void createCacheManagers() throws Throwable {
        this.cacheConfigBuilder = InitialStateTransferCompletionTest.getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, true, true);
        this.cacheConfigBuilder.transaction().transactionMode(TransactionMode.TRANSACTIONAL).transactionManagerLookup((TransactionManagerLookup)new EmbeddedTransactionManagerLookup()).lockingMode(LockingMode.PESSIMISTIC).clustering().hash().numOwners(10).stateTransfer().fetchInMemoryState(true).awaitInitialTransfer(true);
        this.createCluster(this.cacheConfigBuilder, 2);
        this.waitForClusterToForm();
    }

    public void testStateTransferCompletion() throws Exception {
        int numKeys = 100;
        Cache cache0 = this.cache(0);
        for (int i = 0; i < 100; ++i) {
            cache0.put((Object)("k" + i), (Object)("v" + i));
        }
        AtomicBoolean ignoreFurtherStateTransfer = new AtomicBoolean();
        AtomicInteger transferredKeys = new AtomicInteger();
        GlobalConfigurationBuilder global = GlobalConfigurationBuilder.defaultClusteredBuilder();
        TestCacheManagerFactory.addInterceptor(global, "defaultcache"::equals, (AsyncInterceptor)new CountInterceptor(ignoreFurtherStateTransfer, transferredKeys), TestCacheManagerFactory.InterceptorPosition.BEFORE, InvocationContextInterceptor.class);
        log.trace((Object)"Adding new member ...");
        this.addClusterEnabledCacheManager(global, this.cacheConfigBuilder);
        Cache cache2 = this.cache(2);
        ignoreFurtherStateTransfer.set(true);
        log.trace((Object)"Successfully added a new member");
        int actualTransferredKeys = transferredKeys.get();
        AssertJUnit.assertEquals((int)100, (int)actualTransferredKeys);
        LocalizedCacheTopology cacheTopology = cache2.getAdvancedCache().getDistributionManager().getCacheTopology();
        AssertJUnit.assertNull((Object)cacheTopology.getPendingCH());
        ConsistentHash readCh = cacheTopology.getReadConsistentHash();
        AssertJUnit.assertTrue((boolean)readCh.getMembers().contains(this.address(2)));
        DataContainer dc2 = this.cache(2).getAdvancedCache().getDataContainer();
        AssertJUnit.assertEquals((int)100, (int)dc2.size());
        for (int i = 0; i < 100; ++i) {
            String key = "k" + i;
            String expectedValue = "v" + i;
            AssertJUnit.assertTrue((boolean)cacheTopology.isReadOwner((Object)key));
            InternalCacheEntry entry = dc2.get((Object)key);
            AssertJUnit.assertNotNull((Object)entry);
            AssertJUnit.assertEquals((Object)expectedValue, (Object)entry.getValue());
        }
    }

    static class CountInterceptor
    extends BaseAsyncInterceptor {
        private final AtomicBoolean ignoreFurtherStateTransfer;
        private final AtomicInteger transferredKeys;

        public CountInterceptor(AtomicBoolean ignoreFurtherStateTransfer, AtomicInteger transferredKeys) {
            this.ignoreFurtherStateTransfer = ignoreFurtherStateTransfer;
            this.transferredKeys = transferredKeys;
        }

        public Object visitCommand(InvocationContext ctx, VisitableCommand cmd) throws Throwable {
            if (cmd instanceof PutKeyValueCommand && ((PutKeyValueCommand)cmd).hasAnyFlag(FlagBitSets.PUT_FOR_STATE_TRANSFER)) {
                if (this.ignoreFurtherStateTransfer.get()) {
                    return null;
                }
                return this.invokeNextThenAccept(ctx, cmd, (rCtx, rCommand, rv) -> this.transferredKeys.incrementAndGet());
            }
            return this.invokeNext(ctx, cmd);
        }
    }
}

