/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.stream;

import java.io.Serializable;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.infinispan.Cache;
import org.infinispan.commons.util.EnumUtil;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.context.Flag;
import org.infinispan.distribution.MagicKey;
import org.infinispan.distribution.ch.ConsistentHashFactory;
import org.infinispan.reactive.publisher.impl.DeliveryGuarantee;
import org.infinispan.reactive.publisher.impl.LocalPublisherManager;
import org.infinispan.reactive.publisher.impl.SegmentAwarePublisherSupplier;
import org.infinispan.test.Mocks;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestDataSCI;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CheckPoint;
import org.infinispan.test.fwk.InCacheMode;
import org.infinispan.util.ControlledConsistentHashFactory;
import org.infinispan.util.function.SerializableSupplier;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@Test(groups={"functional"}, testName="streams.DistributedStreamRehashTest")
@InCacheMode(value={CacheMode.DIST_SYNC})
public class DistributedStreamRehashTest
extends MultipleCacheManagersTest {
    protected final String CACHE_NAME = "rehashStreamCache";
    private ControlledConsistentHashFactory consistentHashFactory;

    @Override
    protected void createCacheManagers() throws Throwable {
        this.consistentHashFactory = new ControlledConsistentHashFactory.Default(new int[][]{{0, 1}, {1, 2}, {2, 3}, {3, 0}});
        ConfigurationBuilder builderUsed = new ConfigurationBuilder();
        builderUsed.clustering().cacheMode(this.cacheMode);
        if (this.cacheMode == CacheMode.DIST_SYNC) {
            builderUsed.clustering().clustering().hash().numOwners(2).numSegments(4).consistentHashFactory((ConsistentHashFactory)this.consistentHashFactory);
        }
        this.createClusteredCaches(4, "rehashStreamCache", TestDataSCI.INSTANCE, builderUsed);
    }

    public void testNodeFailureDuringProcessingForCollect() throws InterruptedException, TimeoutException, ExecutionException {
        for (Cache cache : this.caches("rehashStreamCache")) {
            MagicKey key = new MagicKey(cache);
            cache.put((Object)key, (Object)key.toString());
        }
        Cache originator = this.cache(0, "rehashStreamCache");
        Cache nodeToBlockBeforeProcessing = this.cache(1, "rehashStreamCache");
        Cache nodeToStop = this.cache(3, "rehashStreamCache");
        CheckPoint checkPoint = new CheckPoint();
        checkPoint.triggerForever("before_release");
        LocalPublisherManager lpm = TestingUtil.extractComponent(nodeToBlockBeforeProcessing, LocalPublisherManager.class);
        LocalPublisherManager spy = (LocalPublisherManager)Mockito.spy((Object)lpm);
        Answer blockingLpmAnswer = invocation -> {
            SegmentAwarePublisherSupplier result = (SegmentAwarePublisherSupplier)invocation.callRealMethod();
            return Mocks.blockingPublisherAware(result, checkPoint);
        };
        ((LocalPublisherManager)Mockito.doAnswer((Answer)blockingLpmAnswer).when((Object)spy)).entryPublisher((IntSet)ArgumentMatchers.eq((Object)IntSets.immutableSet((int)1)), (Set)ArgumentMatchers.any(), (Set)ArgumentMatchers.any(), ArgumentMatchers.eq((long)EnumUtil.bitSetOf((Enum)Flag.STATE_TRANSFER_PROGRESS)), (DeliveryGuarantee)ArgumentMatchers.any(), (Function)ArgumentMatchers.any());
        TestingUtil.replaceComponent(nodeToBlockBeforeProcessing, LocalPublisherManager.class, spy, true);
        Future<List> future = this.fork(() -> (List)originator.entrySet().stream().collect((SerializableSupplier & Serializable)() -> Collectors.toList()));
        this.consistentHashFactory.setOwnerIndexes(new int[][]{{0, 1}, {0, 2}, {2, 1}, {1, 0}});
        this.cacheManagers.remove(this.cacheManagers.size() - 1);
        nodeToStop.getCacheManager().stop();
        TestingUtil.blockUntilViewsReceived((int)TimeUnit.SECONDS.toMillis(10L), false, this.caches("rehashStreamCache"));
        Future<Void> rebalanceFuture = this.fork(() -> TestingUtil.waitForNoRebalance(this.caches("rehashStreamCache")));
        checkPoint.awaitStrict("after_invocation", 10L, TimeUnit.SECONDS);
        checkPoint.triggerForever("after_release");
        rebalanceFuture.get(10L, TimeUnit.SECONDS);
        List list = future.get(10L, TimeUnit.SECONDS);
        AssertJUnit.assertEquals((int)(this.cacheManagers.size() + 1), (int)list.size());
    }
}

