/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.api;

import jakarta.transaction.TransactionManager;
import java.io.Serializable;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.infinispan.Cache;
import org.infinispan.LockedStream;
import org.infinispan.api.CacheAPITest;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.test.TestingUtil;
import org.infinispan.transaction.LockingMode;
import org.infinispan.util.concurrent.locks.LockManager;
import org.infinispan.util.function.SerializableBiConsumer;
import org.infinispan.util.function.SerializablePredicate;
import org.testng.AssertJUnit;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

public abstract class BaseCacheAPIPessimisticTest
extends CacheAPITest {
    @Override
    protected void amend(ConfigurationBuilder cb) {
        cb.transaction().lockingMode(LockingMode.PESSIMISTIC);
    }

    public void testLockedStreamBlocked() throws InterruptedException, TimeoutException, BrokenBarrierException, ExecutionException {
        for (int i = 0; i < 10; ++i) {
            this.cache.put((Object)i, (Object)("value" + i));
        }
        CyclicBarrier barrier = new CyclicBarrier(2);
        int key = 4;
        Future<Object> putFuture = this.fork(() -> TestingUtil.withTx(this.cache.getAdvancedCache().getTransactionManager(), () -> {
            Object prev = this.cache.put((Object)key, (Object)("value" + key + "-new"));
            barrier.await(10L, TimeUnit.SECONDS);
            barrier.await(10L, TimeUnit.SECONDS);
            return prev;
        }));
        barrier.await(10L, TimeUnit.SECONDS);
        LockedStream stream = this.cache.getAdvancedCache().lockedStream();
        Future<Void> forEachFuture = this.fork(() -> stream.filter((SerializablePredicate & Serializable)e -> e.getKey().equals(key)).forEach((SerializableBiConsumer & Serializable)(c, e) -> AssertJUnit.assertEquals((Object)("value" + key + "-new"), (Object)c.put(e.getKey(), (Object)String.valueOf(String.valueOf(e.getValue()) + "-other")))));
        TestingUtil.assertNotDone(forEachFuture);
        barrier.await(10L, TimeUnit.SECONDS);
        forEachFuture.get(10L, TimeUnit.MINUTES);
        AssertJUnit.assertEquals((Object)("value" + key), (Object)putFuture.get(10L, TimeUnit.SECONDS));
        AssertJUnit.assertEquals((Object)("value" + key + "-new-other"), (Object)this.cache.get((Object)key));
        LockManager lockManager = (LockManager)ComponentRegistry.componentOf((Cache)this.cache, LockManager.class);
        AssertJUnit.assertEquals((int)0, (int)lockManager.getNumberOfLocksHeld());
    }

    @DataProvider(name="testLockedStreamInTx")
    public Object[][] testLockedStreamInTxProvider() {
        return new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}};
    }

    @Test(dataProvider="testLockedStreamInTx")
    public void testLockedStreamInTxCommit(Boolean shouldCommit) throws Exception {
        for (int i = 0; i < 5; ++i) {
            this.cache.put((Object)i, (Object)("value" + i));
        }
        TransactionManager tm = this.cache.getAdvancedCache().getTransactionManager();
        TestingUtil.withTx(tm, () -> {
            this.cache.getAdvancedCache().lockedStream().forEach((SerializableBiConsumer & Serializable)(c, e) -> c.put(e.getKey(), (Object)(String.valueOf(e.getValue()) + "-changed")));
            if (!shouldCommit.booleanValue()) {
                tm.setRollbackOnly();
            }
            return null;
        });
        for (int i = 0; i < 5; ++i) {
            AssertJUnit.assertEquals((Object)("value" + i + "-changed"), (Object)this.cache.get((Object)i));
        }
    }

    public void testLockedStreamTxInsideConsumer() {
        int i;
        for (i = 0; i < 5; ++i) {
            this.cache.put((Object)i, (Object)("value" + i));
        }
        this.cache.getAdvancedCache().lockedStream().forEach((SerializableBiConsumer & Serializable)(c, e) -> {
            try {
                TestingUtil.withTx(c.getAdvancedCache().getTransactionManager(), () -> c.put(e.getKey(), (Object)(String.valueOf(e.getValue()) + "-changed")));
            }
            catch (Exception e1) {
                throw new RuntimeException(e1);
            }
        });
        for (i = 0; i < 5; ++i) {
            AssertJUnit.assertEquals((Object)("value" + i + "-changed"), (Object)this.cache.get((Object)i));
        }
    }

    @DataProvider(name="testLockedStreamInTxAndConsumer")
    public Object[][] testLockedStreamInTxAndConsumerProvider() {
        return new Object[][]{{Boolean.TRUE, Boolean.TRUE}, {Boolean.TRUE, Boolean.FALSE}, {Boolean.FALSE, Boolean.TRUE}, {Boolean.FALSE, Boolean.FALSE}};
    }

    @Test(dataProvider="testLockedStreamInTxAndConsumer")
    public void testLockedStreamInTxAndConsumer(Boolean outerCommit, Boolean innerCommit) throws Exception {
        for (int i = 0; i < 5; ++i) {
            this.cache.put((Object)i, (Object)("value" + i));
        }
        TransactionManager tm = this.cache.getAdvancedCache().getTransactionManager();
        TestingUtil.withTx(tm, () -> {
            this.cache.getAdvancedCache().lockedStream().forEach((SerializableBiConsumer & Serializable)(c, e) -> {
                try {
                    TransactionManager innerTm = c.getAdvancedCache().getTransactionManager();
                    TestingUtil.withTx(innerTm, () -> {
                        c.put(e.getKey(), (Object)(String.valueOf(e.getValue()) + "-changed"));
                        if (!innerCommit.booleanValue()) {
                            innerTm.setRollbackOnly();
                        }
                        return null;
                    });
                }
                catch (Exception e1) {
                    throw new RuntimeException(e1);
                }
            });
            if (!outerCommit.booleanValue()) {
                tm.setRollbackOnly();
            }
            return null;
        });
        for (int i = 0; i < 5; ++i) {
            AssertJUnit.assertEquals((Object)("value" + i + (innerCommit != false ? "-changed" : "")), (Object)this.cache.get((Object)i));
        }
    }
}

