/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.functional;

import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.infinispan.AdvancedCache;
import org.infinispan.commands.functional.ReadWriteKeyCommand;
import org.infinispan.distribution.BlockingInterceptor;
import org.infinispan.functional.AbstractFunctionalTest;
import org.infinispan.functional.FunctionalMap;
import org.infinispan.functional.MetaParam;
import org.infinispan.functional.impl.FunctionalMapImpl;
import org.infinispan.functional.impl.ReadWriteMapImpl;
import org.infinispan.interceptors.impl.EntryWrappingInterceptor;
import org.infinispan.test.TestingUtil;
import org.infinispan.util.function.SerializableFunction;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Exercises a read-write functional command against the {@code "dist"} cache when the
 * command is issued from the key's primary owner, from a backup owner and from a
 * non-owner.
 *
 * <p>Each scenario runs an incrementing function twice through a
 * {@link FunctionalMap.ReadWriteMap} and, after each run, asserts that the data container
 * of every write owner of the key holds the expected value.
 */
@Test(groups = {"functional"}, testName = "functional.FunctionalDistributionTest")
public class FunctionalDistributionTest extends AbstractFunctionalTest {
    /** Name of the cache all scenarios in this class operate on. */
    private static final String DIST_CACHE_NAME = "dist";

    /** Upper bound, in seconds, for every barrier rendezvous and for the forked call. */
    private static final long TIMEOUT_SECONDS = 10L;

    /**
     * Configures the inherited test parameters.
     *
     * <p>NOTE(review): the fields are declared in the superclass; presumably four nodes
     * with two owners per key guarantee that a primary owner, a backup owner and a
     * non-owner all exist — confirm against {@code AbstractFunctionalTest}.
     */
    public FunctionalDistributionTest() {
        this.numNodes = 4;
        this.numDistOwners = 2;
        this.isSync = false;
    }

    /**
     * Delegates to the superclass set-up; re-declared here so that it carries the
     * {@link BeforeClass} annotation on this class.
     *
     * @throws Throwable whatever the superclass set-up throws
     */
    @Override
    @BeforeClass
    public void createBeforeClass() throws Throwable {
        super.createBeforeClass();
    }

    /**
     * Runs the scenario with the key's primary owner as originator.
     *
     * @throws Exception if the scenario times out or the forked call fails
     */
    public void testDistributionFromPrimaryOwner() throws Exception {
        String key = "testDistributionFromPrimaryOwner";
        AdvancedCache<Object, Integer> originator = distCaches().stream()
                .filter(cache -> cache.getDistributionManager().getCacheTopology().getDistribution(key).isPrimary())
                .findAny()
                .orElseThrow(() -> new AssertionError("No primary owner found for key " + key));
        doTestDistribution(key, originator);
    }

    /**
     * Runs the scenario with a write-backup owner of the key as originator.
     *
     * @throws Exception if the scenario times out or the forked call fails
     */
    public void testDistributionFromSecondaryOwner() throws Exception {
        String key = "testDistributionFromSecondaryOwner";
        AdvancedCache<Object, Integer> originator = distCaches().stream()
                .filter(cache -> cache.getDistributionManager().getCacheTopology().getDistribution(key).isWriteBackup())
                .findAny()
                .orElseThrow(() -> new AssertionError("No backup owner found for key " + key));
        doTestDistribution(key, originator);
    }

    /**
     * Runs the scenario with a node that is not a write owner of the key as originator.
     *
     * @throws Exception if the scenario times out or the forked call fails
     */
    public void testDistributionFromNonOwner() throws Exception {
        String key = "testDistributionFromNonOwner";
        AdvancedCache<Object, Integer> originator = distCaches().stream()
                .filter(cache -> !cache.getDistributionManager().getCacheTopology().isWriteOwner(key))
                .findAny()
                .orElseThrow(() -> new AssertionError("No non-owner found for key " + key));
        doTestDistribution(key, originator);
    }

    /** Returns the advanced view of the {@code "dist"} cache of every cache manager. */
    private List<AdvancedCache<Object, Integer>> distCaches() {
        return this.cacheManagers.stream()
                .map(cm -> cm.<Object, Integer>getCache(DIST_CACHE_NAME).getAdvancedCache())
                .collect(Collectors.toList());
    }

    /**
     * Increments the value stored under {@code key} twice through a read-write map backed
     * by {@code originator}, checking the owners after each increment.
     *
     * @param key        key to operate on
     * @param originator cache from which the functional commands are issued
     * @throws Exception if either iteration fails
     */
    private void doTestDistribution(Object key, AdvancedCache<Object, Integer> originator) throws Exception {
        FunctionalMap.ReadWriteMap<Object, Integer> rw = ReadWriteMapImpl.create(FunctionalMapImpl.create(originator));
        iterate(key, rw, 1);
        iterate(key, rw, 2);
    }

    /**
     * Performs one increment of {@code key} while a {@link BlockingInterceptor} is
     * installed on every write owner, then asserts that each owner's data container
     * holds {@code expectedValue}.
     *
     * @param key           key to increment
     * @param rw            read-write map used to issue the command
     * @param expectedValue value every owner must hold once the command has completed
     * @throws Exception if a barrier wait or the forked call times out or fails
     */
    private void iterate(Object key, FunctionalMap.ReadWriteMap<Object, Integer> rw, int expectedValue) throws Exception {
        List<AdvancedCache<Object, Integer>> owners = distCaches().stream()
                .filter(cache -> cache.getDistributionManager().getCacheTopology().isWriteOwner(key))
                .collect(Collectors.toList());

        // One party per interceptor plus this test thread; presumably the number of
        // write owners equals numDistOwners in a stable topology.
        CyclicBarrier barrier = new CyclicBarrier(this.numDistOwners + 1);
        for (AdvancedCache<Object, Integer> owner : owners) {
            // NOTE(review): the meaning of the two boolean flags is defined by
            // BlockingInterceptor (not visible here); they are passed through unchanged.
            BlockingInterceptor<ReadWriteKeyCommand> blocker =
                    new BlockingInterceptor<ReadWriteKeyCommand>(barrier, ReadWriteKeyCommand.class, true, false);
            TestingUtil.extractInterceptorChain(owner).addInterceptorBefore(blocker, EntryWrappingInterceptor.class);
        }

        Future<Void> pendingEval;
        try {
            pendingEval = this.fork(() -> rw.eval(key, entry -> entry.set(entry.find().orElse(0) + 1)).join());
            barrier.await(TIMEOUT_SECONDS, TimeUnit.SECONDS);
            for (AdvancedCache<Object, Integer> owner : owners) {
                ((BlockingInterceptor<?>) TestingUtil.extractInterceptorChain(owner)
                        .findInterceptorWithClass(BlockingInterceptor.class)).suspend(true);
            }
            barrier.await(TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } finally {
            // The same caches are reused by the next iteration and by the other test
            // methods, so the interceptors must not outlive this call even when a
            // barrier wait fails.
            for (AdvancedCache<Object, Integer> owner : owners) {
                TestingUtil.extractInterceptorChain(owner).removeInterceptor(BlockingInterceptor.class);
            }
        }

        pendingEval.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
        Collection<Integer> storedValues = owners.stream()
                .map(owner -> owner.getDataContainer().get(key).getValue())
                .collect(Collectors.toList());
        Assert.assertEquals(storedValues, Collections.nCopies(this.numDistOwners, expectedValue));
    }
}

