/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.container.versioning;

import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.infinispan.Cache;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.remote.CacheRpcCommand;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.cache.IsolationLevel;
import org.infinispan.container.DataContainer;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.impl.InternalDataContainer;
import org.infinispan.container.versioning.EntryVersion;
import org.infinispan.container.versioning.IncrementableEntryVersion;
import org.infinispan.container.versioning.InequalVersionComparisonResult;
import org.infinispan.container.versioning.VersionGenerator;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.distribution.ch.ConsistentHashFactory;
import org.infinispan.interceptors.AsyncInterceptor;
import org.infinispan.interceptors.DDAsyncInterceptor;
import org.infinispan.remoting.inboundhandler.AbstractDelegatingHandler;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.inboundhandler.PerCacheInboundInvocationHandler;
import org.infinispan.remoting.inboundhandler.Reply;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.ResponseCollector;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestDataSCI;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.infinispan.test.fwk.InCacheMode;
import org.infinispan.transaction.impl.WriteSkewHelper;
import org.infinispan.util.AbstractDelegatingRpcManager;
import org.infinispan.util.ControlledConsistentHashFactory;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@Test(groups={"functional"}, testName="container.versioning.WriteSkewConsistencyTest")
@CleanupAfterMethod
@InCacheMode(value={CacheMode.DIST_SYNC, CacheMode.REPL_SYNC})
public class WriteSkewConsistencyTest
extends MultipleCacheManagersTest {
    /**
     * Runs two sequential transactions on the same key while the commit of the first one is
     * held back on {@code cache(0)}, then checks the entry versions stored on {@code cache(1)}
     * and {@code cache(0)}.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Write the initial value through {@code cache(1)} and verify every cache reads it and
     *       both containers hold the same version.</li>
     *   <li>Block commits and discard remote gets on {@code cache(0)}; run tx1 on {@code cache(2)}
     *       (1 -> 2). {@code cache(1)} must move to the next version while {@code cache(0)} still
     *       holds the initial one.</li>
     *   <li>Run tx2 on {@code cache(3)} (2 -> 3), wait until {@code cache(0)} has processed a
     *       prepare, then release the blocked commit and re-enable remote gets.</li>
     *   <li>Both transactions must finish and both containers must end at the second incremented
     *       version, with no leftover transactions or locks.</li>
     * </ol>
     *
     * @throws Exception if a transaction, a forked task, or a wait fails
     */
    public void testValidationOnlyInPrimaryOwner() throws Exception {
        final String key = "key";
        // cache(1) is used as the primary owner and cache(0) as the backup owner throughout.
        DataContainer<?, ?> primaryOwnerContainer = TestingUtil.extractComponent(this.cache(1), InternalDataContainer.class);
        DataContainer<?, ?> backupOwnerContainer = TestingUtil.extractComponent(this.cache(0), InternalDataContainer.class);
        VersionGenerator versions = TestingUtil.extractComponent(this.cache(1), VersionGenerator.class);
        this.injectReorderResponseRpcManager(this.cache(3), this.cache(0));

        this.cache(1).put(key, 1);
        for (Cache<?, ?> node : this.caches()) {
            String failure = "Wrong initial value for cache " + String.valueOf(this.address(node));
            AssertJUnit.assertEquals(failure, (Object)1, (Object)node.get(key));
        }

        InternalCacheEntry<?, ?> primaryEntry = primaryOwnerContainer.peek(key);
        InternalCacheEntry<?, ?> backupEntry = backupOwnerContainer.peek(key);
        WriteSkewConsistencyTest.assertSameVersion(
                "Wrong version for the same key",
                WriteSkewHelper.versionFromEntry(primaryEntry),
                WriteSkewHelper.versionFromEntry(backupEntry));

        // Expected version sequence: initial -> after tx1 -> after tx2.
        IncrementableEntryVersion initialVersion = WriteSkewHelper.versionFromEntry(primaryEntry);
        IncrementableEntryVersion afterFirstTx = versions.increment(initialVersion);
        IncrementableEntryVersion afterSecondTx = versions.increment(afterFirstTx);

        ControllerInboundInvocationHandler inboundHandler = TestingUtil.wrapInboundInvocationHandler(this.cache(0), ControllerInboundInvocationHandler::new);
        BackupOwnerInterceptor backupInterceptor = WriteSkewConsistencyTest.injectBackupOwnerInterceptor(this.cache(0));
        backupInterceptor.blockCommit();
        inboundHandler.discardRemoteGet = true;

        Future<Boolean> firstTx = this.fork(() -> {
            this.tm(2).begin();
            AssertJUnit.assertEquals("Wrong value for tx1.", (Object)1, (Object)this.cache(2).get(key));
            this.cache(2).put(key, 2);
            this.tm(2).commit();
            return Boolean.TRUE;
        });

        WriteSkewConsistencyTest.eventually(() -> Objects.equals(this.cache(1).get(key), 2));
        WriteSkewConsistencyTest.eventually(() -> Objects.equals(this.cache(3).get(key), 2));
        WriteSkewConsistencyTest.assertSameVersion(
                "Wrong version in the primary owner",
                WriteSkewHelper.versionFromEntry(primaryOwnerContainer.peek(key)),
                afterFirstTx);
        // The commit is still blocked on cache(0), so its entry must be unchanged.
        WriteSkewConsistencyTest.assertSameVersion(
                "Wrong version in the backup owner",
                WriteSkewHelper.versionFromEntry(backupOwnerContainer.peek(key)),
                initialVersion);

        backupInterceptor.resetPrepare();
        Future<Boolean> secondTx = this.fork(() -> {
            this.tm(3).begin();
            AssertJUnit.assertEquals("Wrong value for tx2.", (Object)2, (Object)this.cache(3).get(key));
            this.cache(3).put(key, 3);
            this.tm(3).commit();
            return Boolean.TRUE;
        });

        AssertJUnit.assertTrue("Prepare of tx2 was never received.", backupInterceptor.awaitPrepare());
        backupInterceptor.unblockCommit();
        inboundHandler.discardRemoteGet = false;

        AssertJUnit.assertTrue("Error in tx1.", firstTx.get(15L, TimeUnit.SECONDS));
        AssertJUnit.assertTrue("Error in tx2.", secondTx.get(15L, TimeUnit.SECONDS));
        WriteSkewConsistencyTest.assertSameVersion(
                "Wrong version in the primary owner",
                WriteSkewHelper.versionFromEntry(primaryOwnerContainer.peek(key)),
                afterSecondTx);
        WriteSkewConsistencyTest.assertSameVersion(
                "Wrong version in the backup owner",
                WriteSkewHelper.versionFromEntry(backupOwnerContainer.peek(key)),
                afterSecondTx);
        this.assertNoTransactions();
        this.assertNotLocked(key);
    }

    /**
     * Creates a four-node cluster using a transactional configuration with
     * {@code REPEATABLE_READ} isolation, a single segment, and a controlled consistent hash.
     *
     * <p>NOTE(review): the arguments {@code (1)} / {@code (1, 0)} presumably pin node 1 as the
     * primary owner (and node 0 as backup in distributed mode); the test method treats
     * {@code cache(1)} and {@code cache(0)} that way - confirm against
     * {@code ControlledConsistentHashFactory}.
     */
    @Override
    protected final void createCacheManagers() {
        ConfigurationBuilder config = WriteSkewConsistencyTest.getDefaultClusteredCacheConfig(this.cacheMode, true);
        config.locking().isolationLevel(IsolationLevel.REPEATABLE_READ);

        ControlledConsistentHashFactory hashFactory;
        if (this.cacheMode.isReplicated()) {
            hashFactory = new ControlledConsistentHashFactory.Replicated(1);
        } else {
            hashFactory = new ControlledConsistentHashFactory.Default(1, 0);
        }
        config.clustering()
                .hash()
                .numSegments(1)
                .consistentHashFactory((ConsistentHashFactory)hashFactory);

        this.createClusteredCaches(4, TestDataSCI.INSTANCE, config);
    }

    /**
     * Creates a new {@link BackupOwnerInterceptor} and adds it to the interceptor chain of the
     * given cache at position 1.
     *
     * @param cache the cache whose interceptor chain receives the interceptor
     * @return the interceptor that was added, so the test can control it
     */
    private static BackupOwnerInterceptor injectBackupOwnerInterceptor(Cache<?, ?> cache) {
        final BackupOwnerInterceptor injected = new BackupOwnerInterceptor();
        TestingUtil.extractInterceptorChain(cache).addInterceptor(injected, 1);
        return injected;
    }

    /**
     * Replaces the {@link RpcManager} of {@code toInject} with a
     * {@link ReorderResponsesRpcManager} that wraps the original one and moves the response of
     * {@code lastResponse}'s address to the end of multi-response maps.
     *
     * @param toInject     the cache whose RPC manager is replaced
     * @param lastResponse the cache whose response must be ordered last
     */
    private void injectReorderResponseRpcManager(Cache<?, ?> toInject, Cache<?, ?> lastResponse) {
        RpcManager original = TestingUtil.extractComponent(toInject, RpcManager.class);
        TestingUtil.replaceComponent(
                toInject,
                RpcManager.class,
                new ReorderResponsesRpcManager(this.address(lastResponse), original),
                true);
    }

    /**
     * Asserts that comparing {@code v0} with {@code v1} yields
     * {@link InequalVersionComparisonResult#EQUAL}.
     *
     * @param message failure message passed to the assertion
     * @param v0      the version on which {@code compareTo} is invoked
     * @param v1      the version it is compared against
     */
    private static void assertSameVersion(String message, EntryVersion v0, EntryVersion v1) {
        InequalVersionComparisonResult comparison = v0.compareTo(v1);
        AssertJUnit.assertEquals(message, InequalVersionComparisonResult.EQUAL, comparison);
    }

    /**
     * Inbound handler wrapper that can silently drop one kind of incoming command while
     * {@link #discardRemoteGet} is set; every other command is forwarded to the delegate.
     */
    private static class ControllerInboundInvocationHandler
    extends AbstractDelegatingHandler {
        /** When {@code true}, commands with id 16 are dropped instead of being delegated. */
        private volatile boolean discardRemoteGet;

        private ControllerInboundInvocationHandler(PerCacheInboundInvocationHandler delegate) {
            super(delegate);
        }

        /**
         * Forwards the command to the delegate unless it is currently being discarded.
         * A discarded command is dropped without sending any reply.
         */
        public void handle(CacheRpcCommand command, Reply reply, DeliverOrder order) {
            // NOTE(review): 16 is presumably the command id of the clustered (remote) get
            // command, given the flag name - confirm against the command's COMMAND_ID constant.
            if (!this.discardRemoteGet || command.getCommandId() != 16) {
                this.delegate.handle(command, reply, order);
            }
        }
    }

    /**
     * Test interceptor that lets the test hold back commit commands and wait until a prepare
     * command has been processed.
     *
     * <p>{@code prepareProcessed} is only read and written while holding
     * {@code prepareProcessedLock}; {@code commitBlocker} is a volatile reference swapped by
     * {@link #blockCommit()}.
     */
    static class BackupOwnerInterceptor
    extends DDAsyncInterceptor {
        private final Object prepareProcessedLock = new Object();
        private boolean prepareProcessed;
        private volatile CompletableFuture<Void> commitBlocker = CompletableFutures.completedNull();

        BackupOwnerInterceptor() {
        }

        /**
         * Hands the commit to the next interceptor via {@code asyncInvokeNext}, passing the
         * current {@code commitBlocker} future as the stage to wait on.
         */
        public Object visitCommitCommand(TxInvocationContext ctx, CommitCommand command) throws Throwable {
            CompletableFuture<Void> gate = this.commitBlocker;
            return this.asyncInvokeNext(ctx, command, gate);
        }

        /**
         * Invokes the next interceptor and, whether it succeeds or fails, flags that a prepare
         * has been processed.
         */
        public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand command) throws Throwable {
            return this.invokeNextAndFinally(ctx, command, (rCtx, rCommand, rv, throwable) -> {
                this.notifyPrepareProcessed();
            });
        }

        /** Installs a fresh, incomplete future as the commit blocker. */
        void blockCommit() {
            this.commitBlocker = new CompletableFuture<>();
        }

        /** Completes the current commit blocker. */
        void unblockCommit() {
            this.commitBlocker.complete(null);
        }

        /**
         * Waits up to 10 seconds for a prepare to be flagged as processed.
         *
         * @return {@code true} if a prepare was processed, {@code false} if the wait timed out
         * @throws InterruptedException if the waiting thread is interrupted
         */
        boolean awaitPrepare() throws InterruptedException {
            synchronized (this.prepareProcessedLock) {
                final long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(10000L);
                while (!this.prepareProcessed) {
                    long remainingMillis = TimeUnit.NANOSECONDS.toMillis(deadlineNanos - System.nanoTime());
                    if (remainingMillis <= 0L) {
                        // Never call wait(0): that would block without a timeout.
                        break;
                    }
                    this.prepareProcessedLock.wait(remainingMillis);
                }
                return this.prepareProcessed;
            }
        }

        /** Clears the "prepare processed" flag so a later prepare can be awaited. */
        void resetPrepare() {
            synchronized (this.prepareProcessedLock) {
                this.prepareProcessed = false;
            }
        }

        /** Sets the "prepare processed" flag and wakes every thread waiting in awaitPrepare. */
        private void notifyPrepareProcessed() {
            synchronized (this.prepareProcessedLock) {
                this.prepareProcessed = true;
                this.prepareProcessedLock.notifyAll();
            }
        }
    }

    /**
     * RPC manager wrapper that rewrites multi-response results so that the response coming from
     * one chosen address is iterated last.
     *
     * <p>Results that are not a {@link Map} are passed through unchanged. For map results, a new
     * {@link LinkedHashMap} is built with every other entry in its original iteration order,
     * followed by the entry for {@code lastResponse} if it was present.
     */
    private static class ReorderResponsesRpcManager
    extends AbstractDelegatingRpcManager {
        /** Address whose response is moved to the end of the response map. */
        private final Address lastResponse;

        ReorderResponsesRpcManager(Address lastResponse, RpcManager realOne) {
            super(realOne);
            this.lastResponse = lastResponse;
        }

        /**
         * Delegates the request and post-processes its result as described on the class.
         *
         * @return a stage completing with the original result, or with the reordered map when
         *         the result is a {@link Map}
         */
        @Override
        protected <T> CompletionStage<T> performRequest(Collection<Address> targets, ReplicableCommand command, ResponseCollector<T> collector, Function<ResponseCollector<T>, CompletionStage<T>> invoker, RpcOptions rpcOptions) {
            return super.performRequest(targets, command, collector, invoker, rpcOptions).thenApply(responseObject -> {
                if (!(responseObject instanceof Map)) {
                    log.debugf("Single response for command %s: %s", (Object)command, responseObject);
                    return responseObject;
                }
                // The runtime check above only proves "is a Map"; the key/value types are what
                // the response collectors are expected to produce, hence the unchecked cast.
                @SuppressWarnings("unchecked")
                Map<Address, Response> responses = (Map<Address, Response>)responseObject;
                Map<Address, Response> newResponseMap = new LinkedHashMap<>(targets.size());
                boolean containsLastResponseAddress = false;
                for (Map.Entry<Address, Response> entry : responses.entrySet()) {
                    if (this.lastResponse.equals(entry.getKey())) {
                        // Skip for now; re-added after the loop so it ends up last.
                        containsLastResponseAddress = true;
                        continue;
                    }
                    newResponseMap.put(entry.getKey(), entry.getValue());
                }
                if (containsLastResponseAddress) {
                    newResponseMap.put(this.lastResponse, responses.get(this.lastResponse));
                }
                log.debugf("Responses for command %s are %s", (Object)command, newResponseMap.values());
                // T is a Map type here (checked above), so handing back another Map is safe.
                @SuppressWarnings("unchecked")
                T reordered = (T)newResponseMap;
                return reordered;
            });
        }
    }
}

