/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.partitionhandling;

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.infinispan.Cache;
import org.infinispan.commands.remote.CacheRpcCommand;
import org.infinispan.commons.TimeoutException;
import org.infinispan.distribution.MagicKey;
import org.infinispan.partitionhandling.AvailabilityMode;
import org.infinispan.partitionhandling.BasePartitionHandlingTest;
import org.infinispan.partitionhandling.impl.PartitionHandlingManager;
import org.infinispan.remoting.inboundhandler.AbstractDelegatingHandler;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.inboundhandler.PerCacheInboundInvocationHandler;
import org.infinispan.remoting.inboundhandler.Reply;
import org.infinispan.test.TestingUtil;
import org.infinispan.util.concurrent.ReclosableLatch;
import org.infinispan.util.concurrent.locks.LockManager;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.testng.AssertJUnit;

public abstract class BaseTxPartitionAndMergeTest
extends BasePartitionHandlingTest {
    // Class-level logger used by the static nested filters and the inbound handler wrapper in this file.
    private static final Log log = LogFactory.getLog(BaseTxPartitionAndMergeTest.class);
    // Value stored under both keys by createKeys(String).
    static final String INITIAL_VALUE = "init-value";
    // Value written under both keys by KeyInfo.putFinalValue(Cache).
    static final String FINAL_VALUE = "final-value";

    /**
     * Installs a {@link NotifierFilter} for the given command type on the cache's inbound handler.
     *
     * @param cache      cache whose inbound invocation handler is wrapped
     * @param blockClass command type the filter reacts to
     * @return the installed filter, so the caller can await the notification
     */
    private static NotifierFilter notifyCommandOn(Cache<?, ?> cache, Class<? extends CacheRpcCommand> blockClass) {
        NotifierFilter notifier = new NotifierFilter(blockClass);
        wrapAndApplyFilter(cache, notifier);
        return notifier;
    }

    /**
     * Installs a {@link BlockingFilter} for the given command type on the cache's inbound handler.
     *
     * @param cache      cache whose inbound invocation handler is wrapped
     * @param blockClass command type the filter holds back until unblocked
     * @return the installed filter, so the caller can await and later unblock it
     */
    private static BlockingFilter blockCommandOn(Cache<?, ?> cache, Class<? extends CacheRpcCommand> blockClass) {
        BlockingFilter blocking = new BlockingFilter(blockClass);
        wrapAndApplyFilter(cache, blocking);
        return blocking;
    }

    /**
     * Installs a {@link DiscardFilter} for the given command type on the cache's inbound handler.
     *
     * @param cache      cache whose inbound invocation handler is wrapped
     * @param blockClass command type the filter drops while discarding is enabled
     * @return the installed filter, so the caller can await it and stop discarding
     */
    private static DiscardFilter discardCommandOn(Cache<?, ?> cache, Class<? extends CacheRpcCommand> blockClass) {
        DiscardFilter discarding = new DiscardFilter(blockClass);
        wrapAndApplyFilter(cache, discarding);
        return discarding;
    }

    /**
     * Wraps the cache's inbound invocation handler in a {@link ControlledInboundHandler}
     * that consults {@code filter} before delegating each command.
     *
     * @param cache  cache whose inbound invocation handler is replaced by the wrapper
     * @param filter filter consulted for every incoming command
     */
    private static void wrapAndApplyFilter(Cache<?, ?> cache, Filter filter) {
        // The wrapper returned by TestingUtil is not needed here; only the installation side effect matters.
        TestingUtil.wrapInboundInvocationHandler(cache, delegate -> new ControlledInboundHandler(delegate, filter));
    }

    /**
     * Installs command filters on the caches at index 1 and 2 and returns them as one collection.
     *
     * <p>Cache 1 gets a notifier filter unless the split mode is
     * {@link SplitMode#ORIGINATOR_ISOLATED}, in which case it gets the same kind of filter as
     * cache 2. Cache 2 gets a discard filter when {@code discard} is set and a blocking filter
     * otherwise. The filter for cache 1 is always installed before the one for cache 2.
     *
     * @param cacheName    name of the cache whose members are instrumented
     * @param discard      {@code true} to drop matching commands, {@code false} to block them
     * @param commandClass command type the filters react to
     * @param splitMode    split scenario the filters are prepared for
     * @return the installed filters, in installation order
     */
    FilterCollection createFilters(String cacheName, boolean discard, Class<? extends CacheRpcCommand> commandClass, SplitMode splitMode) {
        Collection<AwaitAndUnblock> filters = new ArrayList<>(2);
        boolean originatorIsolated = splitMode == SplitMode.ORIGINATOR_ISOLATED;

        // Filter for the cache at index 1.
        if (!originatorIsolated) {
            filters.add(notifyCommandOn(cache(1, cacheName), commandClass));
        } else if (discard) {
            filters.add(discardCommandOn(cache(1, cacheName), commandClass));
        } else {
            filters.add(blockCommandOn(cache(1, cacheName), commandClass));
        }

        // Filter for the cache at index 2: discard or block regardless of the split mode.
        if (discard) {
            filters.add(discardCommandOn(cache(2, cacheName), commandClass));
        } else {
            filters.add(blockCommandOn(cache(2, cacheName), commandClass));
        }
        return new FilterCollection(filters);
    }

    /**
     * Supplies the logger used by the instance-level helpers of this class
     * (for example {@code mergeCluster} and {@code assertNoLocks}).
     *
     * @return the logger to write test progress messages to
     */
    protected abstract Log getLog();

    /**
     * Merges partition 1 into partition 0, waits until no rebalance is in progress, and then
     * waits for every cluster member to report {@link AvailabilityMode#AVAILABLE}.
     *
     * @param cacheName name of the cache to wait on
     */
    void mergeCluster(String cacheName) {
        getLog().debugf("Merging cluster");
        partition(0).merge(partition(1));
        TestingUtil.waitForNoRebalance(caches(cacheName));

        int member = 0;
        while (member < numMembersInCluster) {
            PartitionHandlingManager manager = partitionHandlingManager(cache(member, cacheName));
            eventuallyEquals(AvailabilityMode.AVAILABLE, () -> manager.getAvailabilityMode());
            member++;
        }
        getLog().debugf("Cluster merged");
    }

    /**
     * Runs the end-of-test checks: no transactions, no partial transactions in the partition
     * handler, no locks, and both keys holding {@code value} in every cache.
     *
     * @param cacheName name of the cache to check
     * @param keyInfo   the two keys whose values are verified
     * @param value     value expected under both keys
     */
    void finalAsserts(String cacheName, KeyInfo keyInfo, String value) {
        assertNoTransactions(cacheName);
        assertNoTransactionsInPartitionHandler(cacheName);
        assertNoLocks(cacheName);

        Object firstKey = keyInfo.getKey1();
        assertValue(firstKey, value, caches(cacheName));
        Object secondKey = keyInfo.getKey2();
        assertValue(secondKey, value, caches(cacheName));
    }

    /**
     * Waits up to 30000 milliseconds until no cache with the given name reports held locks.
     *
     * <p>Each attempt stops at the first cache whose lock manager still reports held locks.
     *
     * @param cacheName name of the cache to check on every node
     */
    protected void assertNoLocks(String cacheName) {
        eventually("Expected no locks acquired in all nodes", () -> {
            for (Cache<?, ?> member : caches(cacheName)) {
                LockManager locks = TestingUtil.extractLockManager(member);
                getLog().tracef("Locks info=%s", locks.printLockInfo());
                if (locks.getNumberOfLocksHeld() != 0) {
                    getLog().warnf("Locks acquired on cache '%s'", member);
                    return false;
                }
            }
            return true;
        }, 30000L, TimeUnit.MILLISECONDS);
    }

    /**
     * Asserts that every given cache returns {@code value} for {@code key}.
     *
     * @param key    key to read
     * @param value  expected value
     * @param caches caches to read the key from
     */
    protected void assertValue(Object key, String value, Collection<Cache<Object, String>> caches) {
        for (Cache<Object, String> member : caches) {
            String message = "Wrong value in cache " + address(member);
            String actual = member.get(key);
            AssertJUnit.assertEquals(message, value, actual);
        }
    }

    /**
     * Creates two {@link MagicKey}s over the caches at index 1 and 2 (in opposite order for
     * the second key) and stores {@link #INITIAL_VALUE} under each of them.
     *
     * <p>NOTE(review): the MagicKey constructor presumably ties each key to the listed caches
     * as owners; confirm against MagicKey.
     *
     * @param cacheName name of the cache the keys are created for
     * @return holder for both keys
     */
    KeyInfo createKeys(String cacheName) {
        MagicKey firstKey = new MagicKey("k1", cache(1, cacheName), cache(2, cacheName));
        MagicKey secondKey = new MagicKey("k2", cache(2, cacheName), cache(1, cacheName));

        // "k1" is written through cache 1 and "k2" through cache 2.
        cache(1, cacheName).put(firstKey, INITIAL_VALUE);
        cache(2, cacheName).put(secondKey, INITIAL_VALUE);
        return new KeyInfo(firstKey, secondKey);
    }

    /**
     * Waits until the {@link PartitionHandlingManager} of every cache with the given name
     * reports no partial transactions.
     *
     * <p>Each attempt stops at the first cache that still has partial transactions.
     *
     * @param cacheName name of the cache to check on every node
     */
    private void assertNoTransactionsInPartitionHandler(String cacheName) {
        eventually("Transactions pending in PartitionHandlingManager", () -> {
            for (Cache<?, ?> member : caches(cacheName)) {
                Collection<?> pending = TestingUtil.extractComponent(member, PartitionHandlingManager.class).getPartialTransactions();
                if (!pending.isEmpty()) {
                    getLog().debugf("transactions not finished in %s. %s", address(member), pending);
                    return false;
                }
            }
            return true;
        });
    }

    /**
     * Filter that never stops a command; it only counts down a latch the first time a command
     * of the configured type is seen, so a test can wait for its arrival.
     */
    private static class NotifierFilter
    implements Filter {
        private final Class<? extends CacheRpcCommand> commandType;
        private final CountDownLatch arrived = new CountDownLatch(1);

        private NotifierFilter(Class<? extends CacheRpcCommand> aClass) {
            this.commandType = aClass;
        }

        /**
         * Signals waiters when {@code command} is of the configured type.
         *
         * @return always {@code true}: the command is handed on to the delegate
         */
        @Override
        public boolean before(CacheRpcCommand command, Reply reply, DeliverOrder order) {
            log.tracef("[Notifier] Checking command %s.", command);
            if (!commandType.isAssignableFrom(command.getClass())) {
                return true;
            }
            log.tracef("[Notifier] Notifying command %s.", command);
            arrived.countDown();
            return true;
        }

        /**
         * Waits for a matching command to be seen.
         *
         * @throws TimeoutException if no matching command arrived within the timeout
         */
        @Override
        public void await(long timeout, TimeUnit timeUnit) throws InterruptedException {
            boolean seen = arrived.await(timeout, timeUnit);
            if (!seen) {
                throw new TimeoutException();
            }
        }

        /** Nothing to release: this filter never holds a command back. */
        @Override
        public void unblock() {
        }
    }

    /**
     * Hook consulted by {@link ControlledInboundHandler} before a command reaches the real
     * inbound handler.
     */
    private static interface Filter
    extends AwaitAndUnblock {
        /**
         * Inspects an incoming command.
         *
         * @param command the incoming command
         * @param reply   reply channel passed along with the command
         * @param order   delivery order passed along with the command
         * @return {@code true} to hand the command to the delegate, {@code false} to drop it
         */
        boolean before(CacheRpcCommand command, Reply reply, DeliverOrder order);
    }

    /**
     * Filter that holds a command of the configured type on the delivering thread until
     * {@link #unblock()} is called or 30 seconds have passed, then lets it through.
     */
    private static class BlockingFilter
    implements Filter {
        private final Class<? extends CacheRpcCommand> commandType;
        private final ReclosableLatch arrived;
        private final ReclosableLatch release;

        private BlockingFilter(Class<? extends CacheRpcCommand> aClass) {
            this.commandType = aClass;
            this.release = new ReclosableLatch(false);
            this.arrived = new ReclosableLatch(false);
        }

        /**
         * Opens the arrival latch for a matching command and then waits on the release latch.
         *
         * @return always {@code true}: the command is handed on once the wait ends
         */
        @Override
        public boolean before(CacheRpcCommand command, Reply reply, DeliverOrder order) {
            log.tracef("[Blocking] Checking command %s.", command);
            if (!commandType.isAssignableFrom(command.getClass())) {
                return true;
            }
            log.tracef("[Blocking] Blocking command %s", command);
            arrived.open();
            try {
                // The result is deliberately unused: the command proceeds whether or not the wait timed out.
                release.await(30L, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                // Restore the interrupt flag and let the command continue.
                Thread.currentThread().interrupt();
            }
            log.tracef("[Blocking] Unblocking command %s", command);
            return true;
        }

        /**
         * Waits until a matching command has reached the filter.
         *
         * @throws TimeoutException if no matching command arrived within the timeout
         */
        @Override
        public void await(long timeout, TimeUnit timeUnit) throws InterruptedException {
            boolean seen = arrived.await(timeout, timeUnit);
            if (!seen) {
                throw new TimeoutException();
            }
        }

        /** Opens the release latch so a held command can continue. */
        @Override
        public void unblock() {
            release.open();
        }
    }

    /**
     * Filter that drops commands of the configured type for as long as discarding is enabled,
     * and opens a latch each time it drops one.
     */
    private static class DiscardFilter
    implements Filter {
        private final Class<? extends CacheRpcCommand> commandType;
        private final ReclosableLatch dropped;
        // Volatile: written by stopDiscard() and read by before(), potentially on different threads.
        private volatile boolean discard;

        private DiscardFilter(Class<? extends CacheRpcCommand> aClass) {
            this.commandType = aClass;
            this.dropped = new ReclosableLatch(false);
            this.discard = true;
        }

        /**
         * Drops the command when discarding is enabled and the command is of the configured type.
         *
         * @return {@code false} when the command was dropped, {@code true} otherwise
         */
        @Override
        public boolean before(CacheRpcCommand command, Reply reply, DeliverOrder order) {
            log.tracef("[Discard] Checking command %s. (discard enabled=%s)", command, discard);
            boolean drop = discard && commandType.isAssignableFrom(command.getClass());
            if (!drop) {
                return true;
            }
            log.tracef("[Discard] Discarding command %s.", command);
            dropped.open();
            return false;
        }

        /**
         * Waits until a matching command has been dropped.
         *
         * @throws TimeoutException if nothing was dropped within the timeout
         */
        @Override
        public void await(long timeout, TimeUnit timeUnit) throws InterruptedException {
            boolean seen = dropped.await(timeout, timeUnit);
            if (!seen) {
                throw new TimeoutException();
            }
        }

        /** Nothing to release: dropped commands are not held anywhere. */
        @Override
        public void unblock() {
        }

        /** Disables discarding; subsequent commands pass through. */
        private void stopDiscard() {
            discard = false;
        }
    }

    private static class ControlledInboundHandler
    extends AbstractDelegatingHandler {
        private final Filter filter;

        private ControlledInboundHandler(PerCacheInboundInvocationHandler delegate, Filter filter) {
            super(delegate);
            this.filter = filter;
        }

        public void handle(CacheRpcCommand command, Reply reply, DeliverOrder order) {
            Filter currentFilter = this.filter;
            if (currentFilter != null && currentFilter.before(command, reply, order)) {
                this.delegate.handle(command, reply, order);
            } else {
                log.debugf("Ignoring command %s", (Object)command);
            }
        }
    }

    /*
     * Decompiler note: uses 'sealed' constructs - enable with --sealed true
     */
    protected static enum SplitMode {
        ORIGINATOR_ISOLATED{

            @Override
            public void split(BaseTxPartitionAndMergeTest test) {
                test.getLog().debug((Object)"Splitting cluster isolating the originator.");
                test.splitCluster({0}, {1, 2, 3});
                SplitMode.assertDegradedPartition(test, 0);
                TestingUtil.waitForNoRebalance(test.cache(1), test.cache(2), test.cache(3));
                test.getLog().debug((Object)"Cluster split.");
            }
        }
        ,
        BOTH_DEGRADED{

            @Override
            public void split(BaseTxPartitionAndMergeTest test) {
                test.getLog().debug((Object)"Splitting cluster in equal partition");
                test.splitCluster({0, 1}, {2, 3});
                SplitMode.assertDegradedPartition(test, 0, 1);
                test.getLog().debug((Object)"Cluster split.");
            }
        }
        ,
        PRIMARY_OWNER_ISOLATED{

            @Override
            public void split(BaseTxPartitionAndMergeTest test) {
                test.getLog().debug((Object)"Splitting cluster isolating a primary owner.");
                test.splitCluster({2}, {0, 1, 3});
                SplitMode.assertDegradedPartition(test, 0);
                TestingUtil.waitForNoRebalance(test.cache(0), test.cache(1), test.cache(3));
                test.getLog().debug((Object)"Cluster split.");
            }
        };


        public abstract void split(BaseTxPartitionAndMergeTest var1);

        private static void assertDegradedPartition(BaseTxPartitionAndMergeTest test, int ... partitionIndexes) {
            for (int i = 0; i < partitionIndexes.length; ++i) {
                test.partition(i).assertDegradedMode();
            }
        }
    }

    /**
     * Group of installed filters that can be awaited, unblocked and told to stop discarding
     * as a single unit.
     */
    protected static class FilterCollection
    implements AwaitAndUnblock {
        private final Collection<AwaitAndUnblock> collection;

        FilterCollection(Collection<AwaitAndUnblock> collection) {
            this.collection = collection;
        }

        /**
         * Awaits every member in iteration order, giving each one the full timeout.
         *
         * @param timeout  maximum time to wait per member
         * @param timeUnit unit of {@code timeout}
         * @throws InterruptedException if the waiting thread is interrupted
         */
        @Override
        public void await(long timeout, TimeUnit timeUnit) throws InterruptedException {
            for (AwaitAndUnblock member : collection) {
                member.await(timeout, timeUnit);
            }
        }

        /** Calls {@code unblock()} on every member. */
        @Override
        public void unblock() {
            for (AwaitAndUnblock member : collection) {
                member.unblock();
            }
        }

        /** Disables discarding on every member that is a {@link DiscardFilter}. */
        public void stopDiscard() {
            for (AwaitAndUnblock member : collection) {
                if (member instanceof DiscardFilter) {
                    ((DiscardFilter) member).stopDiscard();
                }
            }
        }
    }

    /**
     * Immutable holder for the two keys a test operates on.
     */
    protected static class KeyInfo {
        private final Object key1;
        private final Object key2;

        KeyInfo(Object key1, Object key2) {
            this.key1 = key1;
            this.key2 = key2;
        }

        /**
         * Writes {@link BaseTxPartitionAndMergeTest#FINAL_VALUE} under both keys, first key first.
         *
         * @param cache cache the values are written through
         */
        void putFinalValue(Cache<Object, String> cache) {
            // The value is passed as a String: the cache's value type is String, so an Object-typed argument would not compile.
            cache.put(key1, FINAL_VALUE);
            cache.put(key2, FINAL_VALUE);
        }

        /** @return the first key */
        public Object getKey1() {
            return key1;
        }

        /** @return the second key */
        public Object getKey2() {
            return key2;
        }
    }

    /**
     * Something a test can wait on and later release.
     */
    private static interface AwaitAndUnblock {
        /**
         * Waits for the awaited event.
         *
         * @param timeout  maximum time to wait
         * @param timeUnit unit of {@code timeout}
         * @throws InterruptedException if the waiting thread is interrupted
         */
        void await(long timeout, TimeUnit timeUnit) throws InterruptedException;

        /** Releases whatever this object is holding back, if anything. */
        void unblock();
    }
}

