/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.notifications.cachelistener.cluster;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.infinispan.Cache;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.distribution.BlockingInterceptor;
import org.infinispan.distribution.MagicKey;
import org.infinispan.interceptors.distribution.TriangleDistributionInterceptor;
import org.infinispan.notifications.cachelistener.cluster.AbstractClusterListenerNonTxTest;
import org.infinispan.notifications.cachelistener.cluster.AbstractClusterListenerUtilTest;
import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;
import org.infinispan.remoting.transport.Address;
import org.infinispan.test.TestingUtil;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@Test(groups={"functional"}, testName="notifications.cachelistener.cluster.ClusterListenerDistTest")
public class ClusterListenerDistTest
extends AbstractClusterListenerNonTxTest {
    public ClusterListenerDistTest() {
        super(false, CacheMode.DIST_SYNC);
    }

    @Test
    public void testPrimaryOwnerGoesDownBeforeSendingEvent() throws InterruptedException, TimeoutException, ExecutionException, BrokenBarrierException {
        Cache cache0 = this.cache(0, "cluster-listener");
        Cache cache1 = this.cache(1, "cluster-listener");
        Cache cache2 = this.cache(2, "cluster-listener");
        AbstractClusterListenerUtilTest.ClusterListener clusterListener = new AbstractClusterListenerUtilTest.ClusterListener();
        cache0.addListener((Object)clusterListener);
        CyclicBarrier barrier = new CyclicBarrier(2);
        BlockingInterceptor<PutKeyValueCommand> blockingInterceptor = new BlockingInterceptor<PutKeyValueCommand>(barrier, PutKeyValueCommand.class, true, false);
        TestingUtil.extractInterceptorChain(cache1).addInterceptorBefore(blockingInterceptor, TriangleDistributionInterceptor.class);
        MagicKey key = new MagicKey(cache1, cache2);
        Future<String> future = this.fork(() -> (String)cache0.put((Object)key, (Object)"first-value"));
        barrier.await(10L, TimeUnit.SECONDS);
        this.awaitForBackups(cache0);
        TestingUtil.killCacheManagers(cache1.getCacheManager());
        AssertJUnit.assertEquals((String)future.get(10L, TimeUnit.SECONDS), (String)"first-value");
        TestingUtil.waitForNoRebalance(cache0, cache2);
        AssertJUnit.assertTrue((String)("Expected 1 - 5 events, but received " + String.valueOf(clusterListener.events)), (!clusterListener.events.isEmpty() && clusterListener.events.size() <= 5 ? 1 : 0) != 0);
        Address cache0primary = cache0.getAdvancedCache().getDistributionManager().getCacheTopology().getDistribution((Object)key).primary();
        Address cache2primary = cache2.getAdvancedCache().getDistributionManager().getCacheTopology().getDistribution((Object)key).primary();
        AssertJUnit.assertEquals((Object)cache0primary, (Object)cache2primary);
        clusterListener.events.forEach(e -> this.checkEvent((CacheEntryEvent<Object, String>)e, key, false, true));
    }
}

