/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.stream;

import io.reactivex.rxjava3.core.Flowable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.infinispan.Cache;
import org.infinispan.CacheStream;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.statetransfer.StateTransferStartCommand;
import org.infinispan.commons.configuration.Combine;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.container.impl.InternalDataContainer;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.distribution.MagicKey;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.distribution.ch.KeyPartitioner;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.reactive.publisher.impl.ClusterPublisherManager;
import org.infinispan.reactive.publisher.impl.DeliveryGuarantee;
import org.infinispan.reactive.publisher.impl.LocalPublisherManager;
import org.infinispan.reactive.publisher.impl.PublisherHandler;
import org.infinispan.reactive.publisher.impl.SegmentPublisherSupplier;
import org.infinispan.reactive.publisher.impl.commands.batch.InitialPublisherCommand;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.ResponseCollector;
import org.infinispan.stream.BaseClusteredStreamIteratorTest;
import org.infinispan.test.AbstractCacheTest;
import org.infinispan.test.Mocks;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CheckPoint;
import org.infinispan.test.fwk.TransportFlags;
import org.mockito.AdditionalAnswers;
import org.mockito.ArgumentMatchers;
import org.mockito.MockSettings;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@Test(groups={"functional", "smoke"}, testName="iteration.DistributedStreamIteratorTest")
public class DistributedStreamIteratorTest
extends BaseClusteredStreamIteratorTest {
    /**
     * Delegates to the two-argument constructor with {@code tx = false} and
     * {@link CacheMode#DIST_SYNC}.
     */
    public DistributedStreamIteratorTest() {
        this(false, CacheMode.DIST_SYNC);
    }

    /**
     * Forwards both arguments to the superclass constructor and sets the cleanup phase to
     * {@code AFTER_METHOD}.
     *
     * @param tx        forwarded to the superclass (presumably whether the caches are transactional -- confirm)
     * @param cacheMode forwarded to the superclass
     */
    public DistributedStreamIteratorTest(boolean tx, CacheMode cacheMode) {
        super(tx, cacheMode);
        this.cleanup = AbstractCacheTest.CleanupPhase.AFTER_METHOD;
    }

    /**
     * Returns a new {@link MagicKey} constructed from the given cache.
     *
     * @param cache the cache handed to the {@link MagicKey} constructor
     *              (presumably the node that will own the key -- confirm against MagicKey)
     * @return the newly created key
     */
    @Override
    protected Object getKeyTiedToCache(Cache<?, ?> cache) {
        return new MagicKey(cache);
    }

    /**
     * Iterates over a joining node's entries while inbound {@code StateTransferStartCommand}s
     * on cache 0 are held back by a {@link CheckPoint}, and asserts that every value written by
     * {@code putValueInEachCache(3)} is seen by the iteration.
     *
     * <p>The joiner is configured with {@code awaitInitialTransfer(false)} so that
     * {@code getCache} returns before state transfer has completed. The {@code before_release}
     * event is triggered in a {@code finally} block so the blocked command is released even if
     * the iteration fails.
     *
     * @throws Exception if a checkpoint wait times out or the iteration fails
     */
    @Test
    public void testIterationDuringInitialTransfer() throws Exception {
        Map<Object, String> values = this.putValueInEachCache(3);
        this.killMember(2, "testCache");
        Cache<Object, String> cache0 = this.cache(0, "testCache");

        CheckPoint checkPoint = new CheckPoint();
        checkPoint.triggerForever("after_release");
        this.blockStateTransfer(cache0, checkPoint);

        EmbeddedCacheManager joinerManager = this.addClusterEnabledCacheManager(this.sci, new ConfigurationBuilder(), new TransportFlags().withFD(true));
        ConfigurationBuilder builderNoAwaitInitialTransfer = new ConfigurationBuilder();
        builderNoAwaitInitialTransfer.read(this.builderUsed.build(), Combine.DEFAULT);
        builderNoAwaitInitialTransfer.clustering().stateTransfer().awaitInitialTransfer(false);
        joinerManager.defineConfiguration("testCache", builderNoAwaitInitialTransfer.build());
        Cache<Object, String> joinerCache = joinerManager.getCache("testCache", true);

        checkPoint.awaitStrict("before_invocation", 10L, TimeUnit.SECONDS);

        Set<String> iteratorValues = new HashSet<>();
        try {
            // A Stream is not Iterable, so it cannot be the target of an enhanced for loop;
            // obtain the iterator explicitly.
            Iterator<String> iter = joinerCache.entrySet().stream().map(Map.Entry::getValue).iterator();
            while (iter.hasNext()) {
                iteratorValues.add(iter.next());
            }
        } finally {
            checkPoint.triggerForever("before_release");
        }

        for (Map.Entry<Object, String> entry : values.entrySet()) {
            AssertJUnit.assertTrue("Entry wasn't found:" + entry, iteratorValues.contains(entry.getValue()));
        }
    }

    /**
     * Starts iterating cache 0's values on a forked thread while cache 1's
     * {@code LocalPublisherManager.entryPublisher} is blocked, kills member 1 once the
     * {@code before_invocation} event has been observed, then releases the block and asserts
     * that every value written by {@code putValueInEachCache(3)} was still returned.
     *
     * @throws Exception if a checkpoint wait or the forked iteration times out or fails
     */
    @Test
    public void verifyNodeLeavesBeforeGettingData() throws Exception {
        Map<Object, String> values = this.putValueInEachCache(3);
        Cache<Object, String> cache0 = this.cache(0, "testCache");
        Cache<Object, String> cache1 = this.cache(1, "testCache");

        CheckPoint checkPoint = new CheckPoint();
        checkPoint.triggerForever("after_release");
        this.waitUntilSendingResponse(cache1, checkPoint);

        LinkedBlockingQueue<String> returnQueue = new LinkedBlockingQueue<>();
        Future<Void> future = this.fork(() -> {
            // A Stream is not Iterable; iterate through its explicit iterator.
            Iterator<String> iter = cache0.values().stream().iterator();
            while (iter.hasNext()) {
                returnQueue.add(iter.next());
            }
            return null;
        });

        checkPoint.awaitStrict("before_invocation", 10L, TimeUnit.SECONDS);
        this.killMember(1, "testCache");
        checkPoint.trigger("before_release");
        future.get(10L, TimeUnit.SECONDS);

        for (Map.Entry<Object, String> entry : values.entrySet()) {
            AssertJUnit.assertTrue("Entry wasn't found:" + entry, returnQueue.contains(entry.getValue()));
        }
    }

    /**
     * Writes {@code chunkSize + 2} entries keyed by {@code new MagicKey(cache1)}, starts
     * iterating cache 0's entry set on a forked thread, waits until the blocked
     * {@code entryPublisher} call on cache 1 reports {@code after_invocation}, polls one entry
     * from the result queue, kills member 1 and finally asserts that every written entry was
     * returned (either still in the queue or the one polled earlier).
     *
     * @throws TimeoutException     if a checkpoint wait or the forked iteration times out
     * @throws InterruptedException if the test thread is interrupted while waiting
     * @throws ExecutionException   if the forked iteration fails
     */
    @Test
    public void verifyNodeLeavesAfterSendingBackSomeData() throws TimeoutException, InterruptedException, ExecutionException {
        Cache<Object, String> cache0 = this.cache(0, "testCache");
        Cache<Object, String> cache1 = this.cache(1, "testCache");

        Map<Object, String> values = new HashMap<>();
        int chunkSize = cache0.getCacheConfiguration().clustering().stateTransfer().chunkSize();
        for (int i = 0; i < chunkSize + 2; ++i) {
            MagicKey key = new MagicKey(cache1);
            cache1.put(key, key.toString());
            values.put(key, key.toString());
        }

        CheckPoint checkPoint = new CheckPoint();
        checkPoint.trigger("before_release");
        this.waitUntilSendingResponse(cache1, checkPoint);

        LinkedBlockingQueue<Map.Entry<Object, String>> returnQueue = new LinkedBlockingQueue<>();
        Future<Void> future = this.fork(() -> {
            // A Stream is not Iterable; iterate through its explicit iterator.
            Iterator<Map.Entry<Object, String>> iter = cache0.entrySet().stream().iterator();
            while (iter.hasNext()) {
                returnQueue.add(iter.next());
            }
            return null;
        });

        checkPoint.awaitStrict("after_invocation", 10L, TimeUnit.SECONDS);
        checkPoint.trigger("after_release");

        // The polled entry leaves the queue, so it is checked separately below.
        Map.Entry<Object, String> value = returnQueue.poll(10L, TimeUnit.SECONDS);
        this.killMember(1, "testCache");
        future.get(10L, TimeUnit.SECONDS);

        for (Map.Entry<Object, String> entry : values.entrySet()) {
            AssertJUnit.assertTrue("Entry wasn't found:" + entry, returnQueue.contains(entry) || entry.equals(value));
        }
    }

    /**
     * Writes nine entries keyed by {@code new MagicKey(cache1)}, replaces cache 0's
     * {@link ClusterPublisherManager} with a spy whose {@code entryPublisher} result is wrapped
     * by {@code Mocks.blockingPublisher}, starts iterating cache 0's entry set on a forked
     * thread, kills member 1 after the {@code before_invocation} event, and then compares the
     * returned entries with the written ones segment by segment.
     *
     * <p>Missing and extra entries are logged per segment before the assertions run so that a
     * failure can be diagnosed from the log.
     *
     * @throws TimeoutException     if a checkpoint wait or the forked iteration times out
     * @throws InterruptedException if the test thread is interrupted while waiting
     * @throws ExecutionException   if the forked iteration fails
     */
    @Test
    public void waitUntilProcessingResults() throws TimeoutException, InterruptedException, ExecutionException {
        Cache<Object, String> cache0 = this.cache(0, "testCache");
        Cache<Object, String> cache1 = this.cache(1, "testCache");

        // Keyed by Object so the entry set has the same element type as the result queue below;
        // both are handed to generateEntriesPerSegment.
        Map<Object, String> values = new HashMap<>();
        for (int i = 0; i < 9; ++i) {
            MagicKey key = new MagicKey(cache1);
            cache1.put(key, key.toString());
            values.put(key, key.toString());
        }

        CheckPoint checkPoint = new CheckPoint();
        checkPoint.triggerForever("after_release");
        ClusterPublisherManager spy = Mocks.replaceComponentWithSpy(cache0, ClusterPublisherManager.class);
        ((ClusterPublisherManager)Mockito.doAnswer(invocation -> {
            SegmentPublisherSupplier result = (SegmentPublisherSupplier)invocation.callRealMethod();
            return Mocks.blockingPublisher(result, checkPoint);
        }).when((Object)spy)).entryPublisher((IntSet)ArgumentMatchers.any(), (Set)ArgumentMatchers.any(), (InvocationContext)ArgumentMatchers.any(), ArgumentMatchers.anyLong(), (DeliveryGuarantee)ArgumentMatchers.any(), ArgumentMatchers.anyInt(), (Function)ArgumentMatchers.any());

        LinkedBlockingQueue<Map.Entry<Object, String>> returnQueue = new LinkedBlockingQueue<>();
        Future<Void> future = this.fork(() -> {
            // A Stream is not Iterable; iterate through its explicit iterator.
            Iterator<Map.Entry<Object, String>> iter = cache0.entrySet().stream().iterator();
            while (iter.hasNext()) {
                returnQueue.add(iter.next());
            }
            return null;
        });

        checkPoint.awaitStrict("before_invocation", 10L, TimeUnit.SECONDS);
        checkPoint.triggerForever("before_release");
        this.killMember(1, "testCache");
        future.get(10L, TimeUnit.SECONDS);

        KeyPartitioner keyPartitioner = TestingUtil.extractComponent(cache0, KeyPartitioner.class);
        Map<Integer, Set<Map.Entry<Object, String>>> expected = this.generateEntriesPerSegment(keyPartitioner, values.entrySet());
        Map<Integer, Set<Map.Entry<Object, String>>> answer = this.generateEntriesPerSegment(keyPartitioner, returnQueue);

        for (Map.Entry<Integer, Set<Map.Entry<Object, String>>> entry : expected.entrySet()) {
            Integer segment = entry.getKey();
            Set<Map.Entry<Object, String>> expectedForSegment = entry.getValue();
            Set<Map.Entry<Object, String>> answerForSegment = answer.get(segment);
            if (answerForSegment != null) {
                for (Map.Entry<Object, String> exp : expectedForSegment) {
                    if (!answerForSegment.contains(exp)) {
                        log.errorf("Segment %d, missing %s", (Object)segment, exp);
                    }
                }
                for (Map.Entry<Object, String> ans : answerForSegment) {
                    if (!expectedForSegment.contains(ans)) {
                        log.errorf("Segment %d, extra %s", (Object)segment, ans);
                    }
                }
                AssertJUnit.assertEquals(expectedForSegment.size(), answerForSegment.size());
            }
            // Also fails (with the segment in the message) when the segment is absent from the answer.
            AssertJUnit.assertEquals("Segment " + segment + " had a mismatch", expectedForSegment, answerForSegment);
        }
    }

    /**
     * Writes three entries keyed by {@code new MagicKey(cache0)} and one keyed by
     * {@code new MagicKey(cache1)}, blocks cache 0's data container {@code publisher(int)} call,
     * starts iterating cache 2's entry set (batch size 2) on a forked thread, kills member 0
     * while that call is blocked, and then asserts that the returned entries match the written
     * ones for every segment.
     *
     * @throws Exception if a checkpoint wait or the forked iteration times out or fails
     */
    @Test
    public void testNodeLeavesWhileIteratingOverContainerCausingRehashToLoseValues() throws Exception {
        Cache<Object, String> cache0 = this.cache(0, "testCache");
        Cache<Object, String> cache1 = this.cache(1, "testCache");
        Cache<Object, String> cache2 = this.cache(2, "testCache");

        // Keyed by Object so the entry set has the same element type as the result queue below;
        // both are handed to generateEntriesPerSegment.
        Map<Object, String> values = new HashMap<>();
        values.put(new MagicKey(cache0), "ignore");
        values.put(new MagicKey(cache0), "ignore");
        values.put(new MagicKey(cache0), "ignore");
        values.put(new MagicKey(cache1), "ignore");
        cache1.putAll(values);

        CheckPoint checkPoint = new CheckPoint();
        checkPoint.triggerForever("post_iterator_released");
        this.waitUntilDataContainerWillBeIteratedOn(cache0, checkPoint);

        LinkedBlockingQueue<Map.Entry<Object, String>> returnQueue = new LinkedBlockingQueue<>();
        Future<Void> future = this.fork(() -> {
            // A Stream is not Iterable; iterate through its explicit iterator.
            Iterator<Map.Entry<Object, String>> iter = cache2.entrySet().stream().distributedBatchSize(2).iterator();
            while (iter.hasNext()) {
                returnQueue.add(iter.next());
            }
            return null;
        });

        checkPoint.awaitStrict("pre_iterator_invoked", 10L, TimeUnit.SECONDS);
        this.killMember(0, "testCache", false);
        checkPoint.triggerForever("pre_iterator_released");
        future.get(10L, TimeUnit.SECONDS);

        KeyPartitioner keyPartitioner = TestingUtil.extractComponent(cache1, KeyPartitioner.class);
        Map<Integer, Set<Map.Entry<Object, String>>> expected = this.generateEntriesPerSegment(keyPartitioner, values.entrySet());
        Map<Integer, Set<Map.Entry<Object, String>>> answer = this.generateEntriesPerSegment(keyPartitioner, returnQueue);

        for (Map.Entry<Integer, Set<Map.Entry<Object, String>>> entry : expected.entrySet()) {
            try {
                AssertJUnit.assertEquals("Segment " + entry.getKey() + " had a mismatch", entry.getValue(), answer.get(entry.getKey()));
            } catch (AssertionError e) {
                // Leave a marker in the log at the point of failure, then propagate.
                log.fatal((Object)"TEST ENDED");
                throw e;
            }
        }
    }

    /**
     * Iterates cache 0's entry set with {@link Flag#CACHE_MODE_LOCAL} and asserts that each
     * returned entry matches {@code cache0.get(key)} and that the number of returned entries
     * equals the number of entries recorded in {@code values}.
     *
     * <p>Only {@code key1} (created from cache 0) is recorded in {@code values}; {@code key2}
     * and {@code key3} are created from caches 1 and 2 and are written but not recorded.
     */
    @Test
    public void testLocallyForcedStream() {
        Cache<Object, String> cache0 = this.cache(0, "testCache");
        Cache<Object, String> cache1 = this.cache(1, "testCache");
        Cache<Object, String> cache2 = this.cache(2, "testCache");

        Map<Object, String> values = new HashMap<>();
        MagicKey key1 = new MagicKey(cache0);
        cache0.put(key1, key1.toString());
        values.put(key1, key1.toString());

        MagicKey key2 = this.magicKey(cache1, cache2);
        cache2.put(key2, key2.toString());
        MagicKey key3 = this.magicKey(cache2, cache1);
        cache1.put(key3, key3.toString());

        int count = 0;
        // A Stream is not Iterable; iterate through its explicit iterator.
        Iterator<Map.Entry<Object, String>> iter = cache0.getAdvancedCache().withFlags(Flag.CACHE_MODE_LOCAL).entrySet().stream().iterator();
        while (iter.hasNext()) {
            Map.Entry<Object, String> entry = iter.next();
            String cacheValue = cache0.get(entry.getKey());
            AssertJUnit.assertNotNull(cacheValue);
            AssertJUnit.assertEquals(cacheValue, entry.getValue());
            ++count;
        }
        AssertJUnit.assertEquals(values.size(), count);
    }

    /**
     * Writes {@code Flowable.bufferSize() + 2} entries through {@code dataOwnerCache}, opens an
     * entry-set stream on {@code iteratorCache} with a distributed batch size of 1, and checks
     * that the data owner's {@link PublisherHandler} reports one open publisher after the first
     * {@code hasNext()} and eventually zero once the stream has been closed.
     *
     * @param dataOwnerCache cache used to create the keys (via {@code magicKey}) and whose
     *                       {@link PublisherHandler} is inspected
     * @param iteratorCache  cache on which the stream is opened and partially iterated
     */
    private void testIteratorClosedProperlyOnClose(Cache<Object, String> dataOwnerCache, Cache<Object, String> iteratorCache) {
        Cache<Object, String> cache2 = this.cache(2, "testCache");
        for (int i = 0; i < Flowable.bufferSize() + 2; ++i) {
            // The value must be passed as a String: the cache's value type is String, so an
            // Object-typed argument is not accepted by put.
            dataOwnerCache.put(this.magicKey(dataOwnerCache, cache2), "value");
        }

        PublisherHandler handler = TestingUtil.extractComponent(dataOwnerCache, PublisherHandler.class);
        AssertJUnit.assertEquals(0, handler.openPublishers());

        try (CacheStream<Map.Entry<Object, String>> stream = iteratorCache.entrySet().stream()) {
            Iterator<Map.Entry<Object, String>> iter = stream.distributedBatchSize(1).iterator();
            AssertJUnit.assertTrue(iter.hasNext());
            AssertJUnit.assertEquals(1, handler.openPublishers());
        }

        // Checked with eventuallyEquals rather than a direct assertion after close.
        this.eventuallyEquals(0, () -> handler.openPublishers());
    }

    /**
     * Runs {@code testIteratorClosedProperlyOnClose} with cache 1 as both the data owner and
     * the cache that is iterated.
     */
    public void testIteratorClosedWhenPartiallyIteratedLocal() {
        this.testIteratorClosedProperlyOnClose(this.cache(1, "testCache"), this.cache(1, "testCache"));
    }

    /**
     * Runs {@code testIteratorClosedProperlyOnClose} with cache 1 as the data owner and
     * cache 0 as the cache that is iterated.
     */
    public void testIteratorClosedWhenPartiallyIteratedRemote() {
        this.testIteratorClosedProperlyOnClose(this.cache(1, "testCache"), this.cache(0, "testCache"));
    }

    /**
     * Writes {@code Flowable.bufferSize() + 2} entries through cache 0 using keys from
     * {@code magicKey(cache1, cache2)}, iterates cache 0's entry set to the end with a
     * distributed batch size of 1, and checks that cache 1's {@link PublisherHandler} reports
     * one open publisher after the first {@code hasNext()} and eventually zero after the
     * iterator has been drained.
     */
    public void testIteratorClosedWhenIteratedFully() {
        Cache<Object, String> iteratingCache = this.cache(0, "testCache");
        Cache<Object, String> ownerCache = this.cache(1, "testCache");
        Cache<Object, String> backupCache = this.cache(2, "testCache");

        int entryCount = Flowable.bufferSize() + 2;
        for (int written = 0; written < entryCount; written++) {
            iteratingCache.put(this.magicKey(ownerCache, backupCache), "not-local");
        }

        PublisherHandler ownerHandler = TestingUtil.extractComponent(ownerCache, PublisherHandler.class);
        AssertJUnit.assertEquals(0, ownerHandler.openPublishers());

        Iterator<Map.Entry<Object, String>> entries = iteratingCache.entrySet().stream().distributedBatchSize(1).iterator();
        AssertJUnit.assertTrue(entries.hasNext());
        AssertJUnit.assertEquals(1, ownerHandler.openPublishers());

        // Drain the rest of the iterator; the entries themselves are not inspected.
        entries.forEachRemaining(ignore -> {});
        this.eventuallyEquals(0, () -> ownerHandler.openPublishers());
    }

    /**
     * Creates a {@link MagicKey} from both caches when the first cache is configured with at
     * least two owners, and from the first cache alone otherwise.
     *
     * @param cache1 first cache passed to the {@link MagicKey} constructor; its configured
     *               {@code numOwners} selects which constructor is used
     * @param cache2 second cache passed to the {@link MagicKey} constructor; ignored when
     *               {@code numOwners} is below 2
     * @return the newly created key
     */
    protected MagicKey magicKey(Cache<Object, String> cache1, Cache<Object, String> cache2) {
        int owners = cache1.getCacheConfiguration().clustering().hash().numOwners();
        return owners >= 2 ? new MagicKey(cache1, cache2) : new MagicKey(cache1);
    }

    /**
     * Runs {@code testStayLocalIfAllSegmentsPresentLocally} with {@code rehashAware = true},
     * i.e. without calling {@code disableRehashAware()} on the stream.
     */
    @Test
    public void testStayLocalIfAllSegmentsPresentLocallyWithReHash() throws Exception {
        this.testStayLocalIfAllSegmentsPresentLocally(true);
    }

    /**
     * Runs {@code testStayLocalIfAllSegmentsPresentLocally} with {@code rehashAware = false},
     * i.e. with {@code disableRehashAware()} applied to the stream.
     */
    @Test
    public void testStayLocalIfAllSegmentsPresentLocallyWithoutRehash() throws Exception {
        this.testStayLocalIfAllSegmentsPresentLocally(false);
    }

    /**
     * Iterates cache 0's entry set restricted to the segments that the write consistent hash
     * assigns to {@code address(0)}, asserts that every returned entry falls in one of those
     * segments, and verifies on a spied {@link RpcManager} that {@code invokeCommand} was never
     * called with an {@link InitialPublisherCommand}.
     *
     * @param rehashAware when {@code false}, {@code disableRehashAware()} is applied to the
     *                    stream before the segment filter
     */
    private void testStayLocalIfAllSegmentsPresentLocally(boolean rehashAware) {
        Cache<Object, String> localCache = this.cache(0, "testCache");
        RpcManager rpcSpy = Mocks.replaceComponentWithSpy(localCache, RpcManager.class);
        this.putValueInEachCache(3);

        KeyPartitioner partitioner = TestingUtil.extractComponent(localCache, KeyPartitioner.class);
        ConsistentHash writeHash = localCache.getAdvancedCache().getDistributionManager().getWriteConsistentHash();
        IntSet localSegments = IntSets.from(writeHash.getSegmentsForOwner(this.address(0)));

        CacheStream<Map.Entry<Object, String>> entryStream = localCache.entrySet().stream();
        CacheStream<Map.Entry<Object, String>> filteredSource = rehashAware ? entryStream : entryStream.disableRehashAware();

        Map<Object, String> found = DistributedStreamIteratorTest.mapFromIterator(filteredSource.filterKeySegments(localSegments).iterator());
        Map<Integer, Set<Map.Entry<Object, String>>> foundPerSegment = this.generateEntriesPerSegment(partitioner, found.entrySet());
        AssertJUnit.assertTrue(localSegments.containsAll(foundPerSegment.keySet()));

        // No InitialPublisherCommand may have been sent through the spied RpcManager.
        Mockito.verify(rpcSpy, Mockito.never()).invokeCommand(ArgumentMatchers.any(Address.class), ArgumentMatchers.any(InitialPublisherCommand.class), ArgumentMatchers.any(), ArgumentMatchers.any());
    }

    /**
     * Installs a blocking mock of {@link LocalPublisherManager} on the given cache through
     * {@code Mocks.blockingMock}, stubbing {@code entryPublisher} for any arguments.
     *
     * <p>NOTE(review): callers in this class wait on and trigger the events
     * {@code before_invocation}, {@code before_release}, {@code after_invocation} and
     * {@code after_release}; presumably these are the events {@code Mocks.blockingMock}
     * uses -- confirm against {@code Mocks}.
     *
     * @param cache      cache whose {@link LocalPublisherManager} is replaced
     * @param checkPoint checkpoint handed to {@code Mocks.blockingMock}
     */
    protected void waitUntilSendingResponse(Cache<?, ?> cache, CheckPoint checkPoint) {
        Mocks.blockingMock(checkPoint, LocalPublisherManager.class, cache, (stub, m) -> ((LocalPublisherManager)stub.when(m)).entryPublisher((IntSet)ArgumentMatchers.any(), (Set)ArgumentMatchers.any(), (Set)ArgumentMatchers.any(), ArgumentMatchers.anyLong(), (DeliveryGuarantee)ArgumentMatchers.any(), (Function)ArgumentMatchers.any()), new Class[0]);
    }

    /**
     * Blocks inbound cache RPC commands on the given cache that are instances of
     * {@link StateTransferStartCommand}, using {@code Mocks.blockInboundCacheRpcCommand}
     * with the supplied checkpoint.
     *
     * <p>The type parameter {@code K} is not used by the signature or the body.
     *
     * @param cache      cache whose inbound commands are intercepted
     * @param checkPoint checkpoint handed to {@code Mocks.blockInboundCacheRpcCommand}
     */
    protected <K> void blockStateTransfer(Cache<?, ?> cache, CheckPoint checkPoint) {
        Mocks.blockInboundCacheRpcCommand(cache, checkPoint, command -> command instanceof StateTransferStartCommand);
    }

    /**
     * Replaces the cache's {@link InternalDataContainer} with a Mockito mock that delegates
     * every call to the real container, except that {@code publisher(int)} is coordinated
     * through the checkpoint.
     *
     * <p>A {@code publisher(int)} call that finds the in-flight counter at zero triggers
     * {@code pre_iterator_invoked} and waits up to 10 seconds for {@code pre_iterator_released}
     * before delegating; after delegating it triggers {@code post_iterator_invoked} and waits
     * up to 10 seconds for {@code post_iterator_released}. Calls that find the counter above
     * zero delegate without touching the checkpoint.
     *
     * @param cache      cache whose data container is replaced
     * @param checkPoint checkpoint used for the four events named above
     */
    protected void waitUntilDataContainerWillBeIteratedOn(Cache<?, ?> cache, CheckPoint checkPoint) {
        InternalDataContainer realContainer = TestingUtil.extractComponent(cache, InternalDataContainer.class);
        Answer<Object> delegate = AdditionalAnswers.delegatesTo(realContainer);
        InternalDataContainer mockContainer = Mockito.mock(InternalDataContainer.class, Mockito.withSettings().defaultAnswer(delegate));

        AtomicInteger inFlight = new AtomicInteger();
        Answer<Object> blockingAnswer = invocation -> {
            // Only a call that sees no other publisher(int) call in flight is held at the checkpoint.
            boolean shouldBlock = inFlight.getAndIncrement() == 0;
            if (shouldBlock) {
                checkPoint.trigger("pre_iterator_invoked");
                checkPoint.awaitStrict("pre_iterator_released", 10L, TimeUnit.SECONDS);
            }
            try {
                return delegate.answer(invocation);
            } finally {
                inFlight.getAndDecrement();
                if (shouldBlock) {
                    checkPoint.trigger("post_iterator_invoked");
                    checkPoint.awaitStrict("post_iterator_released", 10L, TimeUnit.SECONDS);
                }
            }
        };

        Mockito.doAnswer(blockingAnswer).when(mockContainer).publisher(ArgumentMatchers.anyInt());
        TestingUtil.replaceComponent(cache, InternalDataContainer.class, mockContainer, true);
    }
}

