/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.functional.distribution.rehash;

import jakarta.transaction.Transaction;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import org.infinispan.AdvancedCache;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.distribution.DistributionInfo;
import org.infinispan.distribution.ch.ConsistentHashFactory;
import org.infinispan.functional.EntryView;
import org.infinispan.functional.FunctionalMap;
import org.infinispan.functional.MetaParam;
import org.infinispan.functional.impl.FunctionalMapImpl;
import org.infinispan.functional.impl.ReadWriteMapImpl;
import org.infinispan.manager.CacheContainer;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.statetransfer.DelegatingStateConsumer;
import org.infinispan.statetransfer.StateChunk;
import org.infinispan.statetransfer.StateConsumer;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.infinispan.test.fwk.TransportFlags;
import org.infinispan.transaction.TransactionMode;
import org.infinispan.util.ControlledConsistentHashFactory;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

/**
 * Exercises read-write functional operations executed inside a transaction on node 2 of a
 * three-node distributed cache while a fourth node joins and state transfer towards node 2 is
 * held back.
 *
 * <p>Each scenario stores {@code 1} under {@link #KEY}, runs an operation that returns the previous
 * value, and finally checks that node 2's data container holds {@code 1 + expectedIncrement}.
 * The "before topology" variants execute the operation before the joiner is started and commit
 * while state application is blocked; the "after topology" variants run the whole transaction
 * while state application is blocked.
 */
@Test(groups={"functional"}, testName="functional.distribution.rehash.FunctionalTxTest")
@CleanupAfterMethod
public class FunctionalTxTest
extends MultipleCacheManagersTest {
    /** The single key every scenario operates on. */
    private static final String KEY = "key";
    /** Upper bound, in seconds, for every latch wait and for the joiner to finish starting. */
    private static final long TIMEOUT_SECONDS = 10L;

    ConfigurationBuilder cb;
    ControlledConsistentHashFactory chf;

    /**
     * Builds a three-node transactional {@code DIST_SYNC} cluster with a single segment whose
     * ownership is driven by {@link #chf}.
     */
    @Override
    protected void createCacheManagers() throws Throwable {
        // NOTE(review): (0, 1) are presumably the initial owner indexes of the only segment;
        // the scenarios later switch them with setOwnerIndexes(0, 2).
        chf = new ControlledConsistentHashFactory.Default(0, 1);
        cb = new ConfigurationBuilder();
        cb.transaction().transactionMode(TransactionMode.TRANSACTIONAL).useSynchronization(false);
        cb.clustering().cacheMode(CacheMode.DIST_SYNC).hash().numSegments(1).consistentHashFactory((ConsistentHashFactory) chf);
        createCluster(cb, 3);
        waitForClusterToForm();
    }

    public void testDoubleIncrementBeforeTopology() throws Exception {
        testBeforeTopology(FunctionalTxTest::incrementTwice, 2);
    }

    public void testDoubleIncrementAfterTopology() throws Exception {
        testAfterTopology(FunctionalTxTest::incrementTwice, 2);
    }

    public void testReadWriteKeyBeforeTopology() throws Exception {
        testBeforeTopology(FunctionalTxTest::incrementKey, 1);
    }

    public void testReadWriteKeyAfterTopology() throws Exception {
        testAfterTopology(FunctionalTxTest::incrementKey, 1);
    }

    public void testReadWriteManyKeysBeforeTopology() throws Exception {
        testBeforeTopology(FunctionalTxTest::incrementManyKeys, 1);
    }

    public void testReadWriteManyKeysAfterTopology() throws Exception {
        testAfterTopology(FunctionalTxTest::incrementManyKeys, 1);
    }

    public void testReadWriteManyEntriesBeforeTopology() throws Exception {
        testBeforeTopology(FunctionalTxTest::addManyEntries, 1);
    }

    public void testReadWriteManyEntriesAfterTopology() throws Exception {
        testAfterTopology(FunctionalTxTest::addManyEntries, 1);
    }

    /**
     * Runs {@code op} in a transaction on node 2, suspends that transaction, starts the joiner,
     * and commits the transaction while state application on node 2 is blocked.
     *
     * @param op operation under test; must return the value stored before it ran
     * @param expectedIncrement amount by which {@code op} is expected to raise the stored value
     * @throws Exception if the transaction manager, the joiner or an assertion fails
     */
    private void testBeforeTopology(BiFunction<FunctionalMap.ReadWriteMap<String, Integer>, String, Integer> op, int expectedIncrement) throws Exception {
        cache(0).put(KEY, 1);
        BlockingStateConsumer bsc2 = TestingUtil.wrapComponent(cache(2), StateConsumer.class, BlockingStateConsumer::new);

        tm(2).begin();
        FunctionalMap.ReadWriteMap<String, Integer> rw = readWriteMapOnNode2();
        AssertJUnit.assertEquals(Integer.valueOf(1), op.apply(rw, KEY));
        Transaction tx = tm(2).suspend();

        Future<Void> joiner = startJoinerWithNewOwners();
        try {
            bsc2.await();
            assertNode2IsWriteBackupOnly();
            tm(2).resume(tx);
            tm(2).commit();
        } finally {
            // Always release the thread parked in applyState, even when an assertion above fails;
            // otherwise it would stay blocked until its own timeout expires.
            bsc2.unblock();
        }

        joiner.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
        assertValueStoredOnNode2(1 + expectedIncrement);
    }

    /**
     * Starts the joiner first and then runs {@code op} in a complete transaction on node 2 while
     * state application on node 2 is blocked.
     *
     * @param op operation under test; must return the value stored before it ran
     * @param expectedIncrement amount by which {@code op} is expected to raise the stored value
     * @throws Exception if the transaction, the joiner or an assertion fails
     */
    private void testAfterTopology(BiFunction<FunctionalMap.ReadWriteMap<String, Integer>, String, Integer> op, int expectedIncrement) throws Exception {
        cache(0).put(KEY, 1);
        BlockingStateConsumer bsc2 = TestingUtil.wrapComponent(cache(2), StateConsumer.class, BlockingStateConsumer::new);

        Future<Void> joiner = startJoinerWithNewOwners();
        try {
            bsc2.await();
            assertNode2IsWriteBackupOnly();
            TestingUtil.withTx(tm(2), () -> {
                AssertJUnit.assertEquals(Integer.valueOf(1), op.apply(readWriteMapOnNode2(), KEY));
                return null;
            });
        } finally {
            // Always release the thread parked in applyState, even when an assertion above fails.
            bsc2.unblock();
        }

        joiner.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
        assertValueStoredOnNode2(1 + expectedIncrement);
    }

    /** Creates a typed read-write functional map backed by the cache on node 2. */
    private FunctionalMap.ReadWriteMap<String, Integer> readWriteMapOnNode2() {
        AdvancedCache<String, Integer> advancedCache = this.<String, Integer>cache(2).getAdvancedCache();
        return ReadWriteMapImpl.create(FunctionalMapImpl.create(advancedCache));
    }

    /**
     * Switches the controlled owner indexes to (0, 2) and starts a fourth cache manager on a
     * forked thread.
     *
     * @return future that completes once the new manager has started and its cache was obtained
     */
    private Future<Void> startJoinerWithNewOwners() {
        chf.setOwnerIndexes(0, 2);
        EmbeddedCacheManager cm = TestCacheManagerFactory.createClusteredCacheManager(false, GlobalConfigurationBuilder.defaultClusteredBuilder(), cb, new TransportFlags());
        registerCacheManager(cm);
        return fork(() -> {
            cm.start();
            cache(3);
        });
    }

    /** Asserts that node 2 currently is a write backup, but not a read owner, of {@link #KEY}. */
    private void assertNode2IsWriteBackupOnly() {
        DistributionInfo distributionInfo = cache(2).getAdvancedCache().getDistributionManager().getCacheTopology().getDistribution(KEY);
        AssertJUnit.assertFalse(distributionInfo.isReadOwner());
        AssertJUnit.assertTrue(distributionInfo.isWriteBackup());
    }

    /** Asserts that node 2's data container stores {@code expected} under {@link #KEY}. */
    private void assertValueStoredOnNode2(int expected) {
        InternalCacheEntry<Object, Object> ice = cache(2).getAdvancedCache().getDataContainer().get(KEY);
        AssertJUnit.assertEquals("Current ICE: " + ice, Integer.valueOf(expected), ice.getValue());
    }

    /** Increments the entry twice via {@code eval} and returns the value seen by the first call. */
    private static Integer incrementTwice(FunctionalMap.ReadWriteMap<String, Integer> rw, String key) {
        Integer oldValue = rw.eval(key, FunctionalTxTest::increment).join();
        rw.eval(key, FunctionalTxTest::increment).join();
        return oldValue;
    }

    /** Increments the entry once via single-key {@code eval} and returns the previous value. */
    private static Integer incrementKey(FunctionalMap.ReadWriteMap<String, Integer> rw, String key) {
        return rw.eval(key, FunctionalTxTest::increment).join();
    }

    /** Increments the entry once via key-set {@code evalMany} and returns the previous value. */
    private static Integer incrementManyKeys(FunctionalMap.ReadWriteMap<String, Integer> rw, String key) {
        return rw.evalMany(Collections.singleton(key), FunctionalTxTest::increment).findAny().get();
    }

    /** Adds 1 to the entry via entry-map {@code evalMany} and returns the previous value. */
    private static Integer addManyEntries(FunctionalMap.ReadWriteMap<String, Integer> rw, String key) {
        return rw.evalMany(Collections.singletonMap(key, 1), FunctionalTxTest::add).findAny().get();
    }

    /**
     * Stores the current value plus one, treating a missing entry as 0.
     *
     * @return the value read before the update
     */
    private static Integer increment(EntryView.ReadWriteEntryView<String, Integer> view) {
        int value = view.find().orElse(0);
        view.set(value + 1, new MetaParam.Writable[0]);
        return value;
    }

    /**
     * Stores the current value plus {@code param}, treating a missing entry as 0.
     *
     * @return the value read before the update
     */
    private static Integer add(Integer param, EntryView.ReadWriteEntryView<String, Integer> view) {
        int value = view.find().orElse(0);
        view.set(value + param, new MetaParam.Writable[0]);
        return value;
    }

    /**
     * State consumer wrapper that signals when {@code applyState} is first entered and then holds
     * the calling thread until {@link #unblock()} is invoked (or the timeout elapses).
     */
    private static class BlockingStateConsumer
    extends DelegatingStateConsumer {
        /** Released as soon as {@code applyState} has been entered. */
        private final CountDownLatch expectLatch = new CountDownLatch(1);
        /** Released by the test to let {@code applyState} proceed to the delegate. */
        private final CountDownLatch blockLatch = new CountDownLatch(1);

        public BlockingStateConsumer(StateConsumer delegate) {
            super(delegate);
        }

        /**
         * Announces the incoming state, waits for {@link #unblock()}, then delegates.
         * Fails with an assertion error if not unblocked within the timeout.
         */
        @Override
        public CompletionStage<?> applyState(Address sender, int topologyId, Collection<StateChunk> stateChunks) {
            expectLatch.countDown();
            awaitOrFail(blockLatch);
            return super.applyState(sender, topologyId, stateChunks);
        }

        /** Waits until {@code applyState} has been entered; fails if that takes too long. */
        public void await() {
            awaitOrFail(expectLatch);
        }

        /** Lets a thread parked in {@code applyState} continue; safe to call more than once. */
        public void unblock() {
            blockLatch.countDown();
        }

        /** Waits for {@code latch} up to the timeout and asserts that it was released in time. */
        private static void awaitOrFail(CountDownLatch latch) {
            try {
                AssertJUnit.assertTrue(latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS));
            }
            catch (InterruptedException e) {
                // Restore the interrupt flag so callers further up can still observe it.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }
}

