/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.distribution.rehash;

import jakarta.transaction.Transaction;
import java.util.Arrays;
import java.util.HashSet;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.transaction.xa.XAResource;
import org.infinispan.Cache;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.remote.CacheRpcCommand;
import org.infinispan.commands.topology.RebalanceStartCommand;
import org.infinispan.commands.topology.TopologyUpdateCommand;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.distribution.rehash.XAResourceAdapter;
import org.infinispan.interceptors.AsyncInterceptor;
import org.infinispan.interceptors.AsyncInterceptorChain;
import org.infinispan.interceptors.DDAsyncInterceptor;
import org.infinispan.interceptors.impl.TxInterceptor;
import org.infinispan.remoting.inboundhandler.AbstractDelegatingHandler;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.inboundhandler.PerCacheInboundInvocationHandler;
import org.infinispan.remoting.inboundhandler.Reply;
import org.infinispan.remoting.responses.ExceptionResponse;
import org.infinispan.remoting.responses.Response;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.infinispan.transaction.TransactionMode;
import org.testng.annotations.Test;

@Test(groups={"unstable"}, testName="distribution.rehash.OngoingTransactionsAndJoinTest", description="See ISPN-4044 -- original group: functional")
@CleanupAfterMethod
public class OngoingTransactionsAndJoinTest
extends MultipleCacheManagersTest {
    ConfigurationBuilder configuration;
    ScheduledExecutorService delayedExecutor = Executors.newScheduledThreadPool(1, this.getTestThreadFactory("Timer"));

    @Override
    protected void createCacheManagers() throws Throwable {
        this.configuration = OngoingTransactionsAndJoinTest.getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC);
        this.configuration.transaction().transactionMode(TransactionMode.TRANSACTIONAL);
        this.configuration.locking().lockAcquisitionTimeout(60000L).useLockStriping(false);
        this.configuration.clustering().stateTransfer().timeout(30L, TimeUnit.SECONDS);
        this.addClusterEnabledCacheManager(this.configuration);
    }

    public void testRehashOnJoin() throws InterruptedException {
        Object value;
        Cache firstNode = this.cache(0);
        CountDownLatch txsStarted = new CountDownLatch(3);
        CountDownLatch txsReady = new CountDownLatch(3);
        CountDownLatch joinEnded = new CountDownLatch(1);
        final CountDownLatch rehashStarted = new CountDownLatch(1);
        TestingUtil.wrapInboundInvocationHandler(firstNode, original -> new ListeningHandler((PerCacheInboundInvocationHandler)original, txsReady, joinEnded, rehashStarted));
        for (int i = 0; i < 10; ++i) {
            firstNode.put((Object)("OLD" + i), (Object)"value");
        }
        UnpreparedDuringRehashTask ut = new UnpreparedDuringRehashTask(firstNode, txsStarted, txsReady, joinEnded, rehashStarted);
        PrepareDuringRehashTask pt = new PrepareDuringRehashTask(firstNode, txsStarted, txsReady, joinEnded, rehashStarted);
        CommitDuringRehashTask ct = new CommitDuringRehashTask(firstNode, txsStarted, txsReady, joinEnded, rehashStarted);
        AsyncInterceptorChain ic = TestingUtil.extractInterceptorChain(firstNode);
        ic.addInterceptorAfter((AsyncInterceptor)pt, TxInterceptor.class);
        ic.addInterceptorAfter((AsyncInterceptor)ct, TxInterceptor.class);
        HashSet<Thread> threads = new HashSet<Thread>();
        threads.add(new Thread((Runnable)ut, "Worker-UnpreparedDuringRehashTask"));
        threads.add(new Thread((Runnable)pt, "Worker-PrepareDuringRehashTask"));
        threads.add(new Thread((Runnable)ct, "Worker-CommitDuringRehashTask"));
        for (Thread t : threads) {
            t.start();
        }
        txsStarted.await(10L, TimeUnit.SECONDS);
        this.delayedExecutor.schedule(new Callable<Object>(){
            final /* synthetic */ OngoingTransactionsAndJoinTest this$0;
            {
                this.this$0 = this$0;
            }

            @Override
            public Object call() throws Exception {
                rehashStarted.countDown();
                return null;
            }
        }, 10L, TimeUnit.MILLISECONDS);
        this.addClusterEnabledCacheManager(this.configuration);
        ListeningHandler listeningHandler2 = new ListeningHandler(TestingUtil.extractComponent(firstNode, PerCacheInboundInvocationHandler.class), txsReady, joinEnded, rehashStarted);
        TestingUtil.replaceComponent(this.cache(1), PerCacheInboundInvocationHandler.class, listeningHandler2, true);
        Cache joiner = this.cache(1);
        for (Thread t : threads) {
            t.join();
        }
        TestingUtil.waitForNoRebalance(this.cache(0), this.cache(1));
        for (int i = 0; i < 10; ++i) {
            Object key = "OLD" + i;
            value = joiner.get(key);
            log.infof(" TEST: Key %s is %s", key, value);
            assert ("value".equals(value)) : "Couldn't see key " + String.valueOf(key) + " on joiner!";
        }
        for (Object key : Arrays.asList(ut.key(), pt.key(), ct.key())) {
            value = joiner.get(key);
            log.infof(" TEST: Key %s is %s", key, value);
            assert ("value".equals(value)) : "Couldn't see key " + String.valueOf(key) + " on joiner!";
        }
    }

    class UnpreparedDuringRehashTask
    extends TransactionalTask {
        UnpreparedDuringRehashTask(Cache<Object, Object> cache, CountDownLatch txsStarted, CountDownLatch txsReady, CountDownLatch joinEnded, CountDownLatch rehashStarted) {
            this.cache = cache;
            this.txsStarted = txsStarted;
            this.txsReady = txsReady;
            this.joinEnded = joinEnded;
            this.rehashStarted = rehashStarted;
        }

        @Override
        Object key() {
            return "unprepared_during_rehash";
        }

        @Override
        public void run() {
            try {
                this.startTx();
                this.txsReady.countDown();
                this.joinEnded.await(10L, TimeUnit.SECONDS);
                OngoingTransactionsAndJoinTest.this.tm(this.cache).commit();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    class PrepareDuringRehashTask
    extends TransactionalTask {
        PrepareDuringRehashTask(Cache<Object, Object> cache, CountDownLatch txsStarted, CountDownLatch txsReady, CountDownLatch joinEnded, CountDownLatch rehashStarted) {
            this.cache = cache;
            this.txsStarted = txsStarted;
            this.txsReady = txsReady;
            this.joinEnded = joinEnded;
            this.rehashStarted = rehashStarted;
        }

        @Override
        Object key() {
            return "prepare_during_rehash";
        }

        @Override
        public void run() {
            try {
                this.startTx();
                OngoingTransactionsAndJoinTest.this.tm(this.cache).commit();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        public Object visitPrepareCommand(TxInvocationContext tcx, PrepareCommand cc) throws Throwable {
            if (this.tx.equals((Object)tcx.getTransaction())) {
                this.txsReady.countDown();
                this.rehashStarted.await(10L, TimeUnit.SECONDS);
            }
            return super.visitPrepareCommand(tcx, cc);
        }

        public Object visitCommitCommand(TxInvocationContext tcx, CommitCommand cc) throws Throwable {
            if (this.tx.equals((Object)tcx.getTransaction())) {
                this.joinEnded.await(10L, TimeUnit.SECONDS);
            }
            return super.visitCommitCommand(tcx, cc);
        }
    }

    class CommitDuringRehashTask
    extends TransactionalTask {
        CommitDuringRehashTask(Cache<Object, Object> cache, CountDownLatch txsStarted, CountDownLatch txsReady, CountDownLatch joinEnded, CountDownLatch rehashStarted) {
            this.cache = cache;
            this.txsStarted = txsStarted;
            this.txsReady = txsReady;
            this.joinEnded = joinEnded;
            this.rehashStarted = rehashStarted;
        }

        @Override
        Object key() {
            return "commit_during_rehash";
        }

        @Override
        public void run() {
            try {
                this.startTx();
                OngoingTransactionsAndJoinTest.this.tm(this.cache).commit();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        public Object visitPrepareCommand(TxInvocationContext tcx, PrepareCommand cc) throws Throwable {
            return this.invokeNextThenAccept((InvocationContext)tcx, (VisitableCommand)cc, (rCtx, rCommand, rv) -> {
                if (this.tx.equals((Object)tcx.getTransaction())) {
                    this.txsReady.countDown();
                }
            });
        }

        public Object visitCommitCommand(TxInvocationContext tcx, CommitCommand cc) throws Throwable {
            if (this.tx.equals((Object)tcx.getTransaction())) {
                this.rehashStarted.await(10L, TimeUnit.SECONDS);
            }
            return super.visitCommitCommand(tcx, cc);
        }
    }

    static class ListeningHandler
    extends AbstractDelegatingHandler {
        final CountDownLatch txsReady;
        final CountDownLatch joinEnded;
        final CountDownLatch rehashStarted;

        public ListeningHandler(PerCacheInboundInvocationHandler delegate, CountDownLatch txsReady, CountDownLatch joinEnded, CountDownLatch rehashStarted) {
            super(delegate);
            this.txsReady = txsReady;
            this.joinEnded = joinEnded;
            this.rehashStarted = rehashStarted;
        }

        public void handle(CacheRpcCommand cmd, Reply reply, DeliverOrder order) {
            boolean notifyRehashStarted = false;
            if (cmd instanceof RebalanceStartCommand) {
                try {
                    this.txsReady.await(10L, TimeUnit.SECONDS);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    reply.reply((Response)new ExceptionResponse((Exception)e));
                    return;
                }
                notifyRehashStarted = true;
            } else if (cmd instanceof TopologyUpdateCommand) {
                this.joinEnded.countDown();
            }
            this.delegate.handle(cmd, reply, order);
            if (notifyRehashStarted) {
                this.rehashStarted.countDown();
            }
        }
    }

    abstract class TransactionalTask
    extends DDAsyncInterceptor
    implements Runnable {
        Cache<Object, Object> cache;
        CountDownLatch txsStarted;
        CountDownLatch txsReady;
        CountDownLatch joinEnded;
        CountDownLatch rehashStarted;
        volatile Transaction tx;

        TransactionalTask() {
        }

        protected void startTx() throws Exception {
            OngoingTransactionsAndJoinTest.this.tm(this.cache).begin();
            this.cache.put(this.key(), (Object)"value");
            this.tx = OngoingTransactionsAndJoinTest.this.tm(this.cache).getTransaction();
            this.tx.enlistResource((XAResource)new XAResourceAdapter());
            this.txsStarted.countDown();
        }

        abstract Object key();
    }
}

