/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.notifications.cachelistener;

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.infinispan.Cache;
import org.infinispan.commons.util.IntSet;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.context.InvocationContext;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;
import org.infinispan.notifications.cachelistener.event.Event;
import org.infinispan.reactive.publisher.impl.ClusterPublisherManager;
import org.infinispan.reactive.publisher.impl.DeliveryGuarantee;
import org.infinispan.reactive.publisher.impl.SegmentPublisherSupplier;
import org.infinispan.test.Mocks;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.fwk.CheckPoint;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

@Test(groups={"unit"}, testName="notifications.cachelistener.CacheNotifierImplInitialTransferDistTest")
public class CacheNotifierImplInitialTransferDistTest
extends MultipleCacheManagersTest {
    /** Name of the cache that every test in this class creates and operates on. */
    private static final String CACHE_NAME = "DistInitialTransferListener";

    /**
     * Creates the clustered caches for this test: {@code createClusteredCaches} is invoked with a
     * count of 3, the {@code DistInitialTransferListener} cache name and the default clustered
     * configuration for {@link CacheMode#DIST_SYNC}.
     */
    @Override
    protected void createCacheManagers() throws Throwable {
        createClusteredCaches(3, CACHE_NAME, getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC));
    }

    /** Runs the basic "listener added to an already populated cache" scenario with a clustered listener. */
    public void testSimpleCacheStartingClusterListener() {
        StateListener<String, String> clusteredListener = new StateListenerClustered();
        testSimpleCacheStarting(clusteredListener);
    }

    /**
     * Populates the cache obtained from index 0, registers {@code listener} on it and verifies the
     * events the listener collected against the populated entries.
     * The listener is removed again even when verification fails.
     *
     * @param listener listener to register; whether it is treated as clustered is read from its
     *                 {@code @Listener} annotation
     */
    private void testSimpleCacheStarting(StateListener<String, String> listener) {
        Map<String, String> expectedValues = new HashMap<>(10);
        Cache<String, String> cache = cache(0, CACHE_NAME);
        populateCache(cache, expectedValues);
        cache.addListener(listener);
        try {
            verifyEvents(isClustered(listener), listener, expectedValues);
        } finally {
            cache.removeListener(listener);
        }
    }

    /**
     * Writes ten entries ({@code key-0..key-9} mapped to {@code value-0..value-9}) into the cache
     * and records the same pairs in {@code expectedValues}.
     *
     * @param cache          cache to write to
     * @param expectedValues map that receives every key/value pair that was written
     */
    private void populateCache(Cache<String, String> cache, Map<String, String> expectedValues) {
        for (int i = 0; i < 10; i++) {
            String key = "key-" + i;
            String value = "value-" + i;
            expectedValues.put(key, value);
            // No (Object) casts: the cache is typed <String, String>, so put takes Strings directly.
            cache.put(key, value);
        }
    }

    /**
     * Asserts that the listener received only creation events for the expected entries.
     * <p>
     * A clustered listener must hold exactly one (post) event per entry. A non-clustered listener
     * must hold two per entry, alternating pre (null value) and post (expected value).
     *
     * @param isClustered whether the listener is a clustered listener
     * @param listener    listener whose collected events are checked
     * @param expected    key/value pairs that must have been reported
     */
    private void verifyEvents(boolean isClustered, StateListener<String, String> listener, Map<String, String> expected) {
        int expectedCount = isClustered ? expected.size() : expected.size() * 2;
        Assert.assertEquals(listener.events.size(), expectedCount);
        // Starts as "post" so that, for non-clustered listeners, the first flip yields a pre event.
        boolean isPost = true;
        for (CacheEntryEvent<String, String> event : listener.events) {
            if (!isClustered) {
                isPost = !isPost;
            }
            Assert.assertEquals(event.getType(), Event.Type.CACHE_ENTRY_CREATED);
            Assert.assertTrue(expected.containsKey(event.getKey()));
            Assert.assertEquals(event.isPre(), !isPost);
            if (isPost) {
                Assert.assertEquals(event.getValue(), expected.get(event.getKey()));
            } else {
                Assert.assertNull(event.getValue());
            }
        }
    }

    /** Clustered listener, CREATE on a key for which the local node is not the primary owner. */
    public void testCreateAfterIterationBeganButNotIteratedValueYetNonOwnerClustered() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException {
        StateListener<String, String> listener = new StateListenerClustered();
        testModificationAfterIterationBeganButNotIteratedValueYet(listener, Operation.CREATE, false);
    }

    /** Clustered listener, CREATE on a key for which the local node is the primary owner. */
    public void testCreateAfterIterationBeganButNotIteratedValueYetOwnerClustered() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException {
        StateListener<String, String> listener = new StateListenerClustered();
        testModificationAfterIterationBeganButNotIteratedValueYet(listener, Operation.CREATE, true);
    }

    /** Clustered listener, PUT on a key for which the local node is not the primary owner. */
    public void testModificationAfterIterationBeganButNotIteratedValueYetNonOwnerClustered() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException {
        StateListener<String, String> listener = new StateListenerClustered();
        testModificationAfterIterationBeganButNotIteratedValueYet(listener, Operation.PUT, false);
    }

    /** Clustered listener, PUT on a key for which the local node is the primary owner. */
    public void testModificationAfterIterationBeganButNotIteratedValueYetOwnerClustered() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException {
        StateListener<String, String> listener = new StateListenerClustered();
        testModificationAfterIterationBeganButNotIteratedValueYet(listener, Operation.PUT, true);
    }

    /** Clustered listener, REMOVE on a key for which the local node is not the primary owner. */
    public void testRemoveAfterIterationBeganButNotIteratedValueYetNonOwnerClustered() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException {
        StateListener<String, String> listener = new StateListenerClustered();
        testModificationAfterIterationBeganButNotIteratedValueYet(listener, Operation.REMOVE, false);
    }

    /** Clustered listener, REMOVE on a key for which the local node is the primary owner. */
    public void testRemoveAfterIterationBeganButNotIteratedValueYetOwnerClustered() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException {
        StateListener<String, String> listener = new StateListenerClustered();
        testModificationAfterIterationBeganButNotIteratedValueYet(listener, Operation.REMOVE, true);
    }

    /**
     * Applies {@code operation} while listener registration is held at the
     * {@code "before_invocation"} checkpoint, then verifies that the listener only reports
     * creation events matching the cache content after the operation.
     * <p>
     * Sequence: populate the cache, install the blocking publisher, fork the
     * {@code addListener} call, wait for {@code "before_invocation"}, perform the operation,
     * release {@code "before_release"} and wait for registration to finish.
     *
     * @param listener             listener to register; removed again in all cases
     * @param operation            operation performed while registration is blocked
     * @param shouldBePrimaryOwner whether the changed key must be primarily owned by the local node
     * @throws TimeoutException if a checkpoint or the forked registration does not complete in 10 seconds
     */
    private void testModificationAfterIterationBeganButNotIteratedValueYet(StateListener<String, String> listener, Operation operation, boolean shouldBePrimaryOwner) throws InterruptedException, TimeoutException, BrokenBarrierException, ExecutionException {
        Map<String, String> expectedValues = new HashMap<>(10);
        Cache<String, String> cache = cache(0, CACHE_NAME);
        populateCache(cache, expectedValues);
        CheckPoint checkPoint = new CheckPoint();
        registerBlockingPublisher(checkPoint, cache);
        // Only the "before" side should block; the "after" checkpoints are pre-released.
        checkPoint.triggerForever("after_invocation");
        checkPoint.triggerForever("after_release");
        try {
            String keyToChange = findKeyBasedOnOwnership("key-to-change", cache.getAdvancedCache().getDistributionManager().getCacheTopology(), shouldBePrimaryOwner);
            String value = prepareOperation(operation, cache, keyToChange);
            // A null value means the key will be removed, so it must not be expected.
            if (value != null) {
                expectedValues.put(keyToChange, value);
            }
            Future<Void> future = fork(() -> {
                cache.addListener(listener);
                return null;
            });
            checkPoint.awaitStrict("before_invocation", 10L, TimeUnit.SECONDS);
            operation.perform(cache, keyToChange, value);
            checkPoint.triggerForever("before_release");
            future.get(10L, TimeUnit.SECONDS);
            verifyEvents(isClustered(listener), listener, expectedValues);
        } finally {
            cache.removeListener(listener);
        }
    }

    /**
     * Puts the cache into the state required before {@code operation} can be performed on
     * {@code keyToChange} and returns the value to pass to {@link Operation#perform}.
     * <ul>
     *   <li>{@code CREATE}: nothing is written; returns {@code "new-value"}.</li>
     *   <li>{@code PUT}: writes {@code "initial-value"}; returns {@code "changed-value"}.</li>
     *   <li>{@code REMOVE}: writes {@code "initial-value"}; returns {@code null}.</li>
     * </ul>
     * The switch is on the enum constant rather than its ordinal, so reordering the constants
     * cannot change behaviour, and a newly added constant is a compile error here.
     *
     * @param operation   operation about to be performed
     * @param cache       cache to prepare
     * @param keyToChange key the operation will act on
     * @return value for the subsequent operation, or {@code null} for {@code REMOVE}
     */
    private String prepareOperation(Operation operation, Cache<String, String> cache, String keyToChange) {
        return switch (operation) {
            case CREATE -> "new-value";
            case PUT -> {
                cache.put(keyToChange, "initial-value");
                yield "changed-value";
            }
            case REMOVE -> {
                cache.put(keyToChange, "initial-value");
                yield null;
            }
        };
    }

    /**
     * Applies {@code operation} after listener registration has reached the
     * {@code "after_invocation"} checkpoint and verifies that the listener reports the initial
     * state first, followed by the event(s) for the operation.
     * <p>
     * Expected layout of {@code listener.events}:
     * <ul>
     *   <li>clustered: one post creation event per expected entry, then one post event for the operation;</li>
     *   <li>non-clustered: a pre/post creation pair per expected entry, then a pre/post pair for
     *       the operation (the pre event carries the old value).</li>
     * </ul>
     *
     * @param listener             listener to register; removed again in all cases
     * @param operation            operation performed while registration is blocked
     * @param shouldBePrimaryOwner whether the changed key must be primarily owned by the local node
     * @throws TimeoutException if a checkpoint or the forked registration does not complete in 30 seconds
     */
    private void testModificationAfterIterationBeganAndCompletedSegmentValueOwner(StateListener<String, String> listener, Operation operation, boolean shouldBePrimaryOwner) throws IOException, InterruptedException, TimeoutException, BrokenBarrierException, ExecutionException {
        Map<String, String> expectedValues = new HashMap<>(10);
        Cache<String, String> cache = cache(0, CACHE_NAME);
        populateCache(cache, expectedValues);
        CheckPoint checkPoint = new CheckPoint();
        registerBlockingPublisher(checkPoint, cache);
        // Only the "after" side should block; the "before" release is pre-triggered.
        checkPoint.triggerForever("before_release");
        try {
            String keyToChange = findKeyBasedOnOwnership("key-to-change", cache.getAdvancedCache().getDistributionManager().getCacheTopology(), shouldBePrimaryOwner);
            String value = prepareOperation(operation, cache, keyToChange);
            // If preparation stored a value, it is part of the initial state the listener must see.
            String currentValue = cache.get(keyToChange);
            if (currentValue != null) {
                expectedValues.put(keyToChange, currentValue);
            }
            Future<Void> future = fork(() -> {
                cache.addListener(listener);
                return null;
            });
            checkPoint.awaitStrict("after_invocation", 30L, TimeUnit.SECONDS);
            Object oldValue = operation.perform(cache, keyToChange, value);
            checkPoint.triggerForever("after_release");
            future.get(30L, TimeUnit.SECONDS);

            boolean isClustered = isClustered(listener);
            int eventsPerEntry = isClustered ? 1 : 2;
            int initialEventCount = expectedValues.size() * eventsPerEntry;
            Assert.assertEquals(listener.events.size(), initialEventCount + eventsPerEntry);

            // Starts as "post" so that, for non-clustered listeners, the first flip yields a pre event.
            boolean isPost = true;
            for (int position = 0; position < initialEventCount; position++) {
                if (!isClustered) {
                    isPost = !isPost;
                }
                CacheEntryEvent<String, String> event = listener.events.get(position);
                Assert.assertEquals(event.getType(), Event.Type.CACHE_ENTRY_CREATED);
                Assert.assertTrue(expectedValues.containsKey(event.getKey()));
                Assert.assertEquals(event.isPre(), !isPost);
                if (isPost) {
                    Assert.assertEquals(event.getValue(), expectedValues.get(event.getKey()));
                } else {
                    Assert.assertNull(event.getValue());
                }
            }

            int tail = initialEventCount;
            if (!isClustered) {
                // Non-clustered listeners additionally see the pre event carrying the old value.
                CacheEntryEvent<String, String> preEvent = listener.events.get(tail++);
                Assert.assertEquals(preEvent.getType(), operation.getType());
                Assert.assertEquals(preEvent.isPre(), true);
                Assert.assertEquals(preEvent.getKey(), keyToChange);
                Assert.assertEquals(preEvent.getValue(), oldValue);
            }
            CacheEntryEvent<String, String> postEvent = listener.events.get(tail);
            Assert.assertEquals(postEvent.getType(), operation.getType());
            Assert.assertEquals(postEvent.isPre(), false);
            Assert.assertEquals(postEvent.getKey(), keyToChange);
            Assert.assertEquals(postEvent.getValue(), value);
        } finally {
            cache.removeListener(listener);
        }
    }

    /**
     * Searches the keys {@code keyPrefix + 0 .. keyPrefix + 999} for the first one whose
     * "is primary" flag in {@code cacheTopology} equals {@code shouldBePrimaryOwner}.
     *
     * @param keyPrefix            prefix the candidate index is appended to
     * @param cacheTopology        topology used to look up the distribution of each candidate
     * @param shouldBePrimaryOwner ownership the returned key must have
     * @return the first matching key
     * @throws RuntimeException if none of the 1000 candidates matches
     */
    private String findKeyBasedOnOwnership(String keyPrefix, LocalizedCacheTopology cacheTopology, boolean shouldBePrimaryOwner) {
        for (int candidate = 0; candidate < 1000; candidate++) {
            String key = keyPrefix + candidate;
            if (cacheTopology.getDistribution(key).isPrimary() == shouldBePrimaryOwner) {
                String format = shouldBePrimaryOwner
                        ? "Found key %s with primary owner %s, segment %d"
                        : "Found key %s with primary owner != %s, segment %d";
                log.debugf(format, (Object) key, (Object) cacheTopology.getLocalAddress(), (Object) cacheTopology.getSegment(key));
                return key;
            }
        }
        throw new RuntimeException("No key could be found for owner, this may be a bug in test or really bad luck!");
    }

    /** Clustered listener, CREATE after the initial state was published, key not primarily owned locally. */
    public void testCreateAfterIterationBeganAndCompletedSegmentValueNonOwnerClustered() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
        StateListener<String, String> listener = new StateListenerClustered();
        testModificationAfterIterationBeganAndCompletedSegmentValueOwner(listener, Operation.CREATE, false);
    }

    /** Clustered listener, CREATE after the initial state was published, key primarily owned locally. */
    public void testCreateAfterIterationBeganAndCompletedSegmentValueOwnerClustered() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
        StateListener<String, String> listener = new StateListenerClustered();
        testModificationAfterIterationBeganAndCompletedSegmentValueOwner(listener, Operation.CREATE, true);
    }

    /** Clustered listener, PUT after the initial state was published, key not primarily owned locally. */
    public void testModificationAfterIterationBeganAndCompletedSegmentValueNonOwnerClustered() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
        StateListener<String, String> listener = new StateListenerClustered();
        testModificationAfterIterationBeganAndCompletedSegmentValueOwner(listener, Operation.PUT, false);
    }

    /** Clustered listener, PUT after the initial state was published, key primarily owned locally. */
    public void testModificationAfterIterationBeganAndCompletedSegmentValueOwnerClustered() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
        StateListener<String, String> listener = new StateListenerClustered();
        testModificationAfterIterationBeganAndCompletedSegmentValueOwner(listener, Operation.PUT, true);
    }

    /** Clustered listener, REMOVE after the initial state was published, key not primarily owned locally. */
    public void testRemoveAfterIterationBeganAndCompletedSegmentValueNonOwnerClustered() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
        StateListener<String, String> listener = new StateListenerClustered();
        testModificationAfterIterationBeganAndCompletedSegmentValueOwner(listener, Operation.REMOVE, false);
    }

    /** Clustered listener, REMOVE after the initial state was published, key primarily owned locally. */
    public void testRemoveAfterIterationBeganAndCompletedSegmentValueOwnerClustered() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
        StateListener<String, String> listener = new StateListenerClustered();
        testModificationAfterIterationBeganAndCompletedSegmentValueOwner(listener, Operation.REMOVE, true);
    }

    /**
     * Applies {@code operation} to a key while the publisher is blocked on the completion
     * notification of that key's segment (see {@link #waitUntilClosingSegment}), then verifies
     * the events the listener collected.
     * <p>
     * For a clustered listener the single event for the operation is located by scanning the
     * event list backwards and is removed from it. Unless the operation is itself a creation, a
     * creation event for the same key must precede it. The remaining events must all be creation
     * events for the expected entries. For a non-clustered listener the initial pre/post creation
     * pairs must be followed by a pre/post pair for the operation.
     *
     * @param listener             listener to register; removed again in all cases
     * @param operation            operation performed while the segment completion is blocked
     * @param shouldBePrimaryOwner whether the changed key must be primarily owned by the local node
     * @throws TimeoutException if a checkpoint or the forked registration does not complete in 10 seconds
     */
    protected void testIterationBeganAndSegmentNotComplete(StateListener<String, String> listener, Operation operation, boolean shouldBePrimaryOwner) throws TimeoutException, InterruptedException, ExecutionException {
        Map<String, String> expectedValues = new HashMap<>(10);
        Cache<String, String> cache = cache(0, CACHE_NAME);
        populateCache(cache, expectedValues);
        String keyToChange = findKeyBasedOnOwnership("key-to-change-", cache.getAdvancedCache().getDistributionManager().getCacheTopology(), shouldBePrimaryOwner);
        String value = prepareOperation(operation, cache, keyToChange);
        // If preparation stored a value, it is part of the initial state the listener must see.
        String currentValue = cache.get(keyToChange);
        if (currentValue != null) {
            expectedValues.put(keyToChange, currentValue);
        }
        CheckPoint checkPoint = new CheckPoint();
        checkPoint.triggerForever("after_release");
        int segmentToUse = cache.getAdvancedCache().getDistributionManager().getCacheTopology().getSegment(keyToChange);
        waitUntilClosingSegment(cache, segmentToUse, checkPoint);
        Future<Void> future = fork(() -> {
            cache.addListener(listener);
            return null;
        });
        try {
            checkPoint.awaitStrict("before_invocation", 10L, TimeUnit.SECONDS);
            Object oldValue = operation.perform(cache, keyToChange, value);
            checkPoint.triggerForever("before_release");
            future.get(10L, TimeUnit.SECONDS);

            boolean isClustered = isClustered(listener);
            int eventsPerEntry = isClustered ? 1 : 2;
            int initialEventCount = expectedValues.size() * eventsPerEntry;
            Assert.assertEquals(listener.events.size(), initialEventCount + eventsPerEntry);

            if (isClustered) {
                CacheEntryEvent<String, String> event = null;
                boolean foundEarlierCreate = false;
                // Walk backwards so that removing the matched event does not shift unvisited indices.
                for (int i = listener.events.size() - 1; i >= 0; --i) {
                    CacheEntryEvent<String, String> currentEvent = listener.events.get(i);
                    if (currentEvent.getKey().equals(keyToChange) && operation.getType() == currentEvent.getType()) {
                        if (event != null) {
                            Assert.fail("There should only be a single event in the event queue!");
                        }
                        event = currentEvent;
                        listener.events.remove(i);
                        // A creation has no earlier creation event to look for.
                        if (operation.getType() == Event.Type.CACHE_ENTRY_CREATED) {
                            foundEarlierCreate = true;
                            break;
                        }
                    } else if (event != null && (foundEarlierCreate = event.getKey().equals(currentEvent.getKey()))) {
                        break;
                    }
                }
                // Fail with a readable message instead of an NPE when the operation event is missing.
                Assert.assertNotNull(event, "No " + operation.getType() + " event was received for key " + keyToChange);
                Assert.assertTrue(foundEarlierCreate, "There was no matching create event for key " + event.getKey());
                Assert.assertEquals(event.getType(), operation.getType());
                Assert.assertEquals(event.isPre(), false);
                Assert.assertEquals(event.getValue(), value);
            }

            // Starts as "post" so that, for non-clustered listeners, the first flip yields a pre event.
            boolean isPost = true;
            for (int position = 0; position < initialEventCount; position++) {
                if (!isClustered) {
                    isPost = !isPost;
                }
                CacheEntryEvent<String, String> event = listener.events.get(position);
                Assert.assertEquals(event.getType(), Event.Type.CACHE_ENTRY_CREATED);
                Assert.assertTrue(expectedValues.containsKey(event.getKey()));
                Assert.assertEquals(event.isPre(), !isPost);
                if (isPost) {
                    Assert.assertEquals(event.getValue(), expectedValues.get(event.getKey()));
                } else {
                    Assert.assertNull(event.getValue());
                }
            }

            if (!isClustered) {
                CacheEntryEvent<String, String> preEvent = listener.events.get(initialEventCount);
                Assert.assertEquals(preEvent.getType(), operation.getType());
                Assert.assertEquals(preEvent.isPre(), true);
                Assert.assertEquals(preEvent.getKey(), keyToChange);
                Assert.assertEquals(preEvent.getValue(), oldValue);

                CacheEntryEvent<String, String> postEvent = listener.events.get(initialEventCount + 1);
                Assert.assertEquals(postEvent.getType(), operation.getType());
                Assert.assertEquals(postEvent.isPre(), false);
                Assert.assertEquals(postEvent.getKey(), keyToChange);
                Assert.assertEquals(postEvent.getValue(), value);
            }
        } finally {
            cache.removeListener(listener);
        }
    }

    /** Clustered listener, CREATE while the key's segment is not yet complete, key not primarily owned locally. */
    public void testCreateAfterIterationBeganAndSegmentNotCompleteValueNonOwnerClustered() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
        StateListener<String, String> listener = new StateListenerClustered();
        testIterationBeganAndSegmentNotComplete(listener, Operation.CREATE, false);
    }

    /** Clustered listener, CREATE while the key's segment is not yet complete, key primarily owned locally. */
    public void testCreateAfterIterationBeganAndSegmentNotCompleteValueOwnerClustered() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
        StateListener<String, String> listener = new StateListenerClustered();
        testIterationBeganAndSegmentNotComplete(listener, Operation.CREATE, true);
    }

    /** Clustered listener, PUT while the key's segment is not yet complete, key not primarily owned locally. */
    public void testModificationAfterIterationBeganAndSegmentNotCompleteValueNonOwnerClustered() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
        StateListener<String, String> listener = new StateListenerClustered();
        testIterationBeganAndSegmentNotComplete(listener, Operation.PUT, false);
    }

    /** Clustered listener, PUT while the key's segment is not yet complete, key primarily owned locally. */
    public void testModificationAfterIterationBeganAndSegmentNotCompleteValueOwnerClustered() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
        StateListener<String, String> listener = new StateListenerClustered();
        testIterationBeganAndSegmentNotComplete(listener, Operation.PUT, true);
    }

    /** Clustered listener, REMOVE while the key's segment is not yet complete, key not primarily owned locally. */
    public void testRemoveAfterIterationBeganAndSegmentNotCompleteValueNonOwnerClustered() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
        StateListener<String, String> listener = new StateListenerClustered();
        testIterationBeganAndSegmentNotComplete(listener, Operation.REMOVE, false);
    }

    /** Clustered listener, REMOVE while the key's segment is not yet complete, key primarily owned locally. */
    public void testRemoveAfterIterationBeganAndSegmentNotCompleteValueOwnerClustered() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
        StateListener<String, String> listener = new StateListenerClustered();
        testIterationBeganAndSegmentNotComplete(listener, Operation.REMOVE, true);
    }

    /**
     * Reads the {@code clustered} attribute of the {@link Listener} annotation on the listener's
     * runtime class.
     *
     * @param listener listener instance to inspect
     * @return the value of {@code @Listener(clustered = ...)}
     * @throws IllegalArgumentException if the listener's class carries no {@code @Listener} annotation
     */
    private boolean isClustered(StateListener<?, ?> listener) {
        Class<?> listenerClass = listener.getClass();
        Listener annotation = listenerClass.getAnnotation(Listener.class);
        if (annotation == null) {
            // Previously this surfaced as a bare NullPointerException with no hint about the cause.
            throw new IllegalArgumentException(listenerClass.getName() + " is not annotated with @Listener");
        }
        return annotation.clustered();
    }

    /**
     * Coordinates two callers through {@code shouldFire}: the caller that flips the flag from
     * {@code false} to {@code true} returns immediately; any other caller triggers
     * {@code "pre_complete_segment_invoked"} and then waits up to 10 seconds for
     * {@code "pre_complete_segment_released"}.
     *
     * @param shouldFire flag recording whether a caller has already passed through
     * @param checkPoint checkpoint used to signal and wait
     * @throws TimeoutException     if the release is not triggered within 10 seconds
     * @throws InterruptedException if the wait is interrupted
     */
    private void segmentCompletionWaiter(AtomicBoolean shouldFire, CheckPoint checkPoint) throws TimeoutException, InterruptedException {
        boolean firstToCheck = shouldFire.compareAndSet(false, true);
        if (firstToCheck) {
            log.tracef("We were first to check segment completion");
            return;
        }
        log.tracef("We were last to check segment completion, so notifying main thread");
        checkPoint.trigger("pre_complete_segment_invoked");
        checkPoint.awaitStrict("pre_complete_segment_released", 10L, TimeUnit.SECONDS);
    }

    /**
     * Replaces the cache's {@link ClusterPublisherManager} with a spy whose
     * {@code entryPublisher(...)} wraps the real publisher via
     * {@code Mocks.blockingSegmentPublisherOnElement}, using a predicate that matches the
     * segment-complete notification for {@code segment}.
     *
     * @param cache      cache whose publisher manager is replaced
     * @param segment    segment whose completion notification the predicate matches
     * @param checkPoint checkpoint handed to the blocking wrapper
     */
    protected void waitUntilClosingSegment(Cache<?, ?> cache, int segment, CheckPoint checkPoint) {
        ClusterPublisherManager publisherManagerSpy = Mocks.replaceComponentWithSpy(cache, ClusterPublisherManager.class);
        var stubbing = Mockito.doAnswer(invocation -> {
            SegmentPublisherSupplier realPublisher = (SegmentPublisherSupplier) invocation.callRealMethod();
            return Mocks.blockingSegmentPublisherOnElement(realPublisher, checkPoint,
                    notification -> notification.isSegmentComplete() && notification.completedSegment() == segment);
        });
        ClusterPublisherManager stubbed = (ClusterPublisherManager) stubbing.when((Object) publisherManagerSpy);
        // Matchers stay untyped any() so that null arguments are matched as well.
        stubbed.entryPublisher((IntSet) ArgumentMatchers.any(), (Set) ArgumentMatchers.any(), (InvocationContext) ArgumentMatchers.any(), ArgumentMatchers.anyLong(), (DeliveryGuarantee) ArgumentMatchers.any(), ArgumentMatchers.anyInt(), (Function) ArgumentMatchers.any());
    }

    /**
     * Replaces the cache's {@link ClusterPublisherManager} with a spy whose
     * {@code entryPublisher(...)} returns the real publisher wrapped by
     * {@code Mocks.blockingPublisher} and bound to {@code checkPoint}.
     *
     * @param checkPoint checkpoint handed to the blocking wrapper
     * @param cache      cache whose publisher manager is replaced
     */
    private static void registerBlockingPublisher(CheckPoint checkPoint, Cache<?, ?> cache) {
        ClusterPublisherManager publisherManagerSpy = Mocks.replaceComponentWithSpy(cache, ClusterPublisherManager.class);
        var stubbing = Mockito.doAnswer(invocation -> {
            SegmentPublisherSupplier realPublisher = (SegmentPublisherSupplier) invocation.callRealMethod();
            return Mocks.blockingPublisher(realPublisher, checkPoint);
        });
        ClusterPublisherManager stubbed = (ClusterPublisherManager) stubbing.when((Object) publisherManagerSpy);
        // Matchers stay untyped any() so that null arguments are matched as well.
        stubbed.entryPublisher((IntSet) ArgumentMatchers.any(), (Set) ArgumentMatchers.any(), (InvocationContext) ArgumentMatchers.any(), ArgumentMatchers.anyLong(), (DeliveryGuarantee) ArgumentMatchers.any(), ArgumentMatchers.anyInt(), (Function) ArgumentMatchers.any());
    }

    /** {@link StateListener} declared as {@code clustered} with {@code includeCurrentState} enabled. */
    @Listener(clustered = true, includeCurrentState = true)
    private static class StateListenerClustered extends StateListener<String, String> {
        // The implicit default constructor of a private nested class is private as well.
    }

    /**
     * Base listener that records every created, modified and removed cache entry event it
     * receives, in arrival order.
     *
     * @param <K> cache key type
     * @param <V> cache value type
     */
    protected abstract static class StateListener<K, V> {
        private static final Log log = LogFactory.getLog(MethodHandles.lookup().lookupClass());

        /** Received events, kept in a synchronized list. */
        final List<CacheEntryEvent<K, V>> events = Collections.synchronizedList(new ArrayList<>());

        protected StateListener() {
        }

        /**
         * Logs the event at trace level and appends it to {@link #events}.
         *
         * @param event the event delivered to this listener
         */
        @CacheEntryCreated
        @CacheEntryModified
        @CacheEntryRemoved
        public void onCacheNotification(CacheEntryEvent<K, V> event) {
            log.tracef("Received event: %s", event);
            events.add(event);
        }
    }

    /*
     * Uses 'sealed' constructs - enablewith --sealed true
     */
    private static enum Operation {
        PUT(Event.Type.CACHE_ENTRY_MODIFIED),
        CREATE(Event.Type.CACHE_ENTRY_CREATED),
        REMOVE(Event.Type.CACHE_ENTRY_REMOVED){

            @Override
            public <K, V> Object perform(Cache<K, V> cache, K key, V value) {
                return cache.remove(key);
            }
        };

        private final Event.Type type;

        private Operation(Event.Type type) {
            this.type = type;
        }

        public Event.Type getType() {
            return this.type;
        }

        public <K, V> Object perform(Cache<K, V> cache, K key, V value) {
            return cache.put(key, value);
        }
    }

    /** {@link StateListener} declared as non-clustered with {@code includeCurrentState} enabled. */
    @Listener(clustered = false, includeCurrentState = true)
    private static class StateListenerNotClustered extends StateListener<String, String> {
        // The implicit default constructor of a private nested class is private as well.
    }
}

