/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.xsite.irac;

import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.configuration.cache.BackupConfiguration;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.FlagBitSets;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.interceptors.AsyncInterceptor;
import org.infinispan.interceptors.AsyncInterceptorChain;
import org.infinispan.interceptors.DDAsyncInterceptor;
import org.infinispan.interceptors.impl.NonTxIracLocalSiteInterceptor;
import org.infinispan.test.TestingUtil;
import org.infinispan.util.ControlledConsistentHashFactory;
import org.infinispan.xsite.AbstractMultipleSitesTest;
import org.testng.AssertJUnit;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups={"functional"}, testName="xsite.irac.IracOwnershipChangeTest")
public class IracOwnershipChangeTest
extends AbstractMultipleSitesTest {
    // Consistent-hash factory wired into site 0's cache configuration (see defaultConfigurationForSite).
    // The tests change its owner indexes and trigger a rebalance to move ownership of the single segment.
    private final ControlledConsistentHashFactory<?> site0CHFactory = new ControlledConsistentHashFactory.Default(0, 1);
    // Consistent-hash factory wired into site 1's cache configuration. Within this class it is only
    // reset to owner indexes (0, 1) in createBeforeMethod; no test reassigns its owners.
    private final ControlledConsistentHashFactory<?> site1CHFactory = new ControlledConsistentHashFactory.Default(0, 1);

    /**
     * Returns the number of sites this test uses: two, each of which is configured as the
     * other's backup in {@link #defaultConfigurationForSite(int)}.
     */
    @Override
    protected int defaultNumberOfSites() {
        return 2;
    }

    /**
     * Returns 3. The tests in this class address nodes 0, 1 and 2 of site 0, which start out as
     * primary owner, backup owner and non-owner of the key respectively.
     */
    @Override
    protected int defaultNumberOfNodes() {
        return 3;
    }

    /**
     * Builds the cache configuration for the given site: a {@code DIST_SYNC} cache with a single
     * segment and two owners whose placement is dictated by that site's controlled
     * consistent-hash factory, plus an asynchronous backup to the other site.
     *
     * @param siteIndex index of the site being configured; {@code 0} selects site 0's factory and
     *                  backs up to site 1, any other value selects site 1's factory and backs up
     *                  to site 0
     * @return the configuration builder for that site
     */
    @Override
    protected ConfigurationBuilder defaultConfigurationForSite(int siteIndex) {
        ConfigurationBuilder config = IracOwnershipChangeTest.getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC);
        boolean isFirstSite = siteIndex == 0;

        // One segment, two owners: every key maps to the same (primary, backup) pair.
        ControlledConsistentHashFactory<?> chFactory = isFirstSite ? this.site0CHFactory : this.site1CHFactory;
        config.clustering().hash()
                .numSegments(1)
                .numOwners(2)
                .consistentHashFactory(chFactory);

        // Each site asynchronously backs up to the other one.
        int backupSiteIndex = isFirstSite ? 1 : 0;
        config.sites().addBackup()
                .site(this.siteName(backupSiteIndex))
                .strategy(BackupConfiguration.BackupStrategy.ASYNC);
        return config;
    }

    /**
     * Restores the initial ownership before every test method: both sites' factories are set
     * back to owner indexes (0, 1), a rebalance is triggered through node 0 of each site, and
     * the method waits for both clusters to form.
     */
    @Override
    @BeforeMethod(alwaysRun=true)
    public void createBeforeMethod() {
        super.createBeforeMethod();
        // Indexed by site: factories[i] controls the consistent hash of site i.
        ControlledConsistentHashFactory<?>[] factories = {this.site0CHFactory, this.site1CHFactory};
        for (ControlledConsistentHashFactory<?> factory : factories) {
            factory.setOwnerIndexes(0, 1);
        }
        for (int siteIndex = 0; siteIndex < factories.length; siteIndex++) {
            factories[siteIndex].triggerRebalance(this.cache(siteIndex, 0));
        }
        for (int siteIndex = 0; siteIndex < factories.length; siteIndex++) {
            this.site(siteIndex).waitForClusterToForm(null);
        }
    }

    /**
     * Verifies that a write reaches every cache in both sites when the node it was issued on
     * stops being an owner while the write is in flight.
     * <p>
     * The put is issued on node 0 of site 0 (initially the primary owner) and held by the
     * {@link BlockingInterceptor} installed on that node. Ownership is then moved to node 1
     * (primary) and node 2 (backup), leaving node 0 a non-owner, before the put is released.
     *
     * @throws InterruptedException if interrupted while waiting for the put to block or complete
     * @throws ExecutionException if the asynchronous put fails
     * @throws TimeoutException if the put does not complete within 10 seconds of being released
     */
    public void testPrimaryOwnerLosesOwnership() throws InterruptedException, ExecutionException, TimeoutException {
        String key = "key-1";
        String value = "primary-loses-ownership";
        this.assertOwnership(key, 0, 1, 2);
        CommandBlocker blocker = this.blockingInterceptor(0, 0).blockCommand();
        CompletableFuture<?> stage;
        try {
            stage = this.cache(0, 0).putAsync(key, value);
            AssertJUnit.assertTrue(blocker.blocked.await(10L, TimeUnit.SECONDS));
            this.site0CHFactory.setOwnerIndexes(1, 2);
            this.site0CHFactory.triggerRebalance(this.cache(0, 0));
            this.site(0).waitForClusterToForm(null);
            this.assertOwnership(key, 1, 2, 0);
        } finally {
            // Release even when an assertion above fails, so a failure here cannot leave the
            // interceptor armed (or a put parked in it) for the tests that run afterwards.
            blocker.release();
        }
        stage.get(10L, TimeUnit.SECONDS);
        this.eventuallyAssertInAllSitesAndCaches(cache -> value.equals(cache.get(key)));
    }

    /**
     * Verifies that a write reaches every cache in both sites when the backup owner it was
     * issued on stops being an owner while the write is in flight.
     * <p>
     * The put is issued on node 1 of site 0 (initially the backup owner) and held by the
     * {@link BlockingInterceptor} installed on that node. Ownership is then changed to node 0
     * (primary) and node 2 (backup), leaving node 1 a non-owner, before the put is released.
     *
     * @throws InterruptedException if interrupted while waiting for the put to block or complete
     * @throws ExecutionException if the asynchronous put fails
     * @throws TimeoutException if the put does not complete within 10 seconds of being released
     */
    public void testBackupOwnerLosesOwnership() throws InterruptedException, ExecutionException, TimeoutException {
        String key = "key-2";
        String value = "backup-loses-ownership";
        this.assertOwnership(key, 0, 1, 2);
        CommandBlocker blocker = this.blockingInterceptor(0, 1).blockCommand();
        CompletableFuture<?> stage;
        try {
            stage = this.cache(0, 1).putAsync(key, value);
            AssertJUnit.assertTrue(blocker.blocked.await(10L, TimeUnit.SECONDS));
            this.site0CHFactory.setOwnerIndexes(0, 2);
            this.site0CHFactory.triggerRebalance(this.cache(0, 0));
            this.site(0).waitForClusterToForm(null);
            this.assertOwnership(key, 0, 2, 1);
        } finally {
            // Release even when an assertion above fails, so a failure here cannot leave the
            // interceptor armed (or a put parked in it) for the tests that run afterwards.
            blocker.release();
        }
        stage.get(10L, TimeUnit.SECONDS);
        this.eventuallyAssertInAllSitesAndCaches(cache -> value.equals(cache.get(key)));
    }

    /**
     * Verifies that a write reaches every cache in both sites when the primary and backup
     * owners swap roles while the write is in flight.
     * <p>
     * The put is issued on node 0 of site 0 (initially the primary owner) and held by the
     * {@link BlockingInterceptor} installed on that node. Ownership is then changed to node 1
     * (primary) and node 0 (backup) before the put is released; node 2 stays a non-owner.
     *
     * @throws InterruptedException if interrupted while waiting for the put to block or complete
     * @throws ExecutionException if the asynchronous put fails
     * @throws TimeoutException if the put does not complete within 10 seconds of being released
     */
    public void testPrimaryChangesOwnershipWithBackup() throws InterruptedException, ExecutionException, TimeoutException {
        String key = "key-3";
        String value = "primary-backup-swap";
        this.assertOwnership(key, 0, 1, 2);
        CommandBlocker blocker = this.blockingInterceptor(0, 0).blockCommand();
        CompletableFuture<?> stage;
        try {
            stage = this.cache(0, 0).putAsync(key, value);
            AssertJUnit.assertTrue(blocker.blocked.await(10L, TimeUnit.SECONDS));
            this.site0CHFactory.setOwnerIndexes(1, 0);
            this.site0CHFactory.triggerRebalance(this.cache(0, 0));
            this.site(0).waitForClusterToForm(null);
            this.assertOwnership(key, 1, 0, 2);
        } finally {
            // Release even when an assertion above fails, so a failure here cannot leave the
            // interceptor armed (or a put parked in it) for the tests that run afterwards.
            blocker.release();
        }
        stage.get(10L, TimeUnit.SECONDS);
        this.eventuallyAssertInAllSitesAndCaches(cache -> value.equals(cache.get(key)));
    }

    /**
     * Verifies that a write reaches every cache in both sites when the non-owner it was issued
     * on becomes the backup owner while the write is in flight.
     * <p>
     * The put is issued on node 2 of site 0 (initially a non-owner) and held by the
     * {@link BlockingInterceptor} installed on that node. Ownership is then changed to node 0
     * (primary) and node 2 (backup), leaving node 1 a non-owner, before the put is released.
     *
     * @throws InterruptedException if interrupted while waiting for the put to block or complete
     * @throws ExecutionException if the asynchronous put fails
     * @throws TimeoutException if the put does not complete within 10 seconds of being released
     */
    public void testNonOwnerBecomesBackup() throws InterruptedException, ExecutionException, TimeoutException {
        String key = "key-4";
        String value = "non-owner-to-backup";
        this.assertOwnership(key, 0, 1, 2);
        CommandBlocker blocker = this.blockingInterceptor(0, 2).blockCommand();
        CompletableFuture<?> stage;
        try {
            stage = this.cache(0, 2).putAsync(key, value);
            AssertJUnit.assertTrue(blocker.blocked.await(10L, TimeUnit.SECONDS));
            this.site0CHFactory.setOwnerIndexes(0, 2);
            this.site0CHFactory.triggerRebalance(this.cache(0, 0));
            this.site(0).waitForClusterToForm(null);
            this.assertOwnership(key, 0, 2, 1);
        } finally {
            // Release even when an assertion above fails, so a failure here cannot leave the
            // interceptor armed (or a put parked in it) for the tests that run afterwards.
            blocker.release();
        }
        stage.get(10L, TimeUnit.SECONDS);
        this.eventuallyAssertInAllSitesAndCaches(cache -> value.equals(cache.get(key)));
    }

    /**
     * Verifies that a write reaches every cache in both sites when the non-owner it was issued
     * on becomes the primary owner while the write is in flight.
     * <p>
     * The put is issued on node 2 of site 0 (initially a non-owner) and held by the
     * {@link BlockingInterceptor} installed on that node. Ownership is then changed to node 2
     * (primary) and node 1 (backup), leaving node 0 a non-owner, before the put is released.
     *
     * @throws InterruptedException if interrupted while waiting for the put to block or complete
     * @throws ExecutionException if the asynchronous put fails
     * @throws TimeoutException if the put does not complete within 10 seconds of being released
     */
    public void testNonOwnerBecomesPrimary() throws InterruptedException, ExecutionException, TimeoutException {
        String key = "key-5";
        // Distinct from the value used by testNonOwnerBecomesBackup so the scenario is
        // identifiable from the stored data.
        String value = "non-owner-to-primary";
        this.assertOwnership(key, 0, 1, 2);
        CommandBlocker blocker = this.blockingInterceptor(0, 2).blockCommand();
        CompletableFuture<?> stage;
        try {
            stage = this.cache(0, 2).putAsync(key, value);
            AssertJUnit.assertTrue(blocker.blocked.await(10L, TimeUnit.SECONDS));
            this.site0CHFactory.setOwnerIndexes(2, 1);
            this.site0CHFactory.triggerRebalance(this.cache(0, 0));
            this.site(0).waitForClusterToForm(null);
            this.assertOwnership(key, 2, 1, 0);
        } finally {
            // Release even when an assertion above fails, so a failure here cannot leave the
            // interceptor armed (or a put parked in it) for the tests that run afterwards.
            blocker.release();
        }
        stage.get(10L, TimeUnit.SECONDS);
        this.eventuallyAssertInAllSitesAndCaches(cache -> value.equals(cache.get(key)));
    }

    /**
     * Returns the {@link LocalizedCacheTopology} that {@link TestingUtil#extractCacheTopology}
     * extracts from the cache at the given position.
     *
     * @param site  index of the site the cache belongs to
     * @param index index of the cache within that site
     */
    private LocalizedCacheTopology cacheTopology(int site, int index) {
        return TestingUtil.extractCacheTopology(this.cache(site, index));
    }

    /**
     * Returns the {@link BlockingInterceptor} of the cache at the given position, installing a
     * new one directly after {@link NonTxIracLocalSiteInterceptor} if the chain has none yet.
     *
     * @param site  index of the site the cache belongs to
     * @param index index of the cache within that site
     * @return the interceptor already present in the chain, or the newly added one
     */
    private BlockingInterceptor blockingInterceptor(int site, int index) {
        AsyncInterceptorChain chain = TestingUtil.extractInterceptorChain(this.cache(site, index));
        BlockingInterceptor installed = (BlockingInterceptor)chain.findInterceptorExtending(BlockingInterceptor.class);
        if (installed == null) {
            installed = new BlockingInterceptor();
            boolean added = chain.addInterceptorAfter((AsyncInterceptor)installed, NonTxIracLocalSiteInterceptor.class);
            AssertJUnit.assertTrue(added);
        }
        return installed;
    }

    /**
     * Asserts how the three nodes of site 0 relate to {@code key} in their current topology.
     *
     * @param key      the key whose distribution is checked
     * @param primary  index of the node expected to be the primary owner
     * @param backup   index of the node expected to be a write backup
     * @param nonOwner index of the node expected not to be a write owner
     */
    private void assertOwnership(String key, int primary, int backup, int nonOwner) {
        LocalizedCacheTopology primaryTopology = this.cacheTopology(0, primary);
        AssertJUnit.assertTrue(primaryTopology.getDistribution(key).isPrimary());

        LocalizedCacheTopology backupTopology = this.cacheTopology(0, backup);
        AssertJUnit.assertTrue(backupTopology.getDistribution(key).isWriteBackup());

        LocalizedCacheTopology nonOwnerTopology = this.cacheTopology(0, nonOwner);
        AssertJUnit.assertFalse(nonOwnerTopology.getDistribution(key).isWriteOwner());
    }

    public static class BlockingInterceptor
    extends DDAsyncInterceptor {
        volatile CommandBlocker afterCompleted;

        public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
            CommandBlocker blocker = this.afterCompleted;
            if (blocker == null || blocker.delay.isDone() || command.hasAnyFlag(FlagBitSets.PUT_FOR_STATE_TRANSFER)) {
                log.tracef("Skipping command %s", (Object)command);
                return this.invokeNext(ctx, (VisitableCommand)command);
            }
            return this.invokeNextAndHandle(ctx, (VisitableCommand)command, (rCtx, rCommand, rv, throwable) -> BlockingInterceptor.delayedValue(blocker.notifyBlocked(rCommand), (Object)rv, (Throwable)throwable));
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        CommandBlocker blockCommand() {
            CommandBlocker existing;
            CommandBlocker newBlocker = new CommandBlocker(new CountDownLatch(1), new CompletableFuture<Void>());
            BlockingInterceptor blockingInterceptor = this;
            synchronized (blockingInterceptor) {
                existing = this.afterCompleted;
                this.afterCompleted = newBlocker;
            }
            if (existing != null) {
                existing.release();
            }
            return newBlocker;
        }
    }

    /**
     * Handle shared between a test and the {@link BlockingInterceptor}: {@link #blocked} tells
     * the test that a command has reached the blocking point, and {@link #delay} is the future
     * the command's result waits on until {@link #release()} completes it.
     */
    private static class CommandBlocker {
        final CountDownLatch blocked;
        final CompletableFuture<Void> delay;

        /**
         * @param blocked latch counted down when a command is blocked or the blocker is released
         * @param delay   future a blocked command waits on
         * @throws NullPointerException if either argument is {@code null}
         */
        CommandBlocker(CountDownLatch blocked, CompletableFuture<Void> delay) {
            Objects.requireNonNull(blocked);
            Objects.requireNonNull(delay);
            this.blocked = blocked;
            this.delay = delay;
        }

        /**
         * Signals that {@code command} has reached the blocking point.
         *
         * @param command the command being held; only used for trace logging
         * @return a future that completes, after logging the unblock, once {@link #delay} completes
         */
        CompletableFuture<Void> notifyBlocked(Object command) {
            log.tracef("Blocking command %s", command);
            blocked.countDown();
            Runnable traceUnblock = () -> log.tracef("Unblocking command %s", command);
            return delay.thenRun(traceUnblock);
        }

        /**
         * Counts down {@link #blocked} and completes {@link #delay}. Calling it more than once is
         * harmless, since both operations are no-ops after the first call.
         */
        void release() {
            blocked.countDown();
            delay.complete(null);
        }
    }
}

