/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.remoting;

import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.infinispan.commands.GlobalRpcCommand;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.module.TestGlobalConfigurationBuilder;
import org.infinispan.commands.remote.CacheRpcCommand;
import org.infinispan.commands.remote.SingleRpcCommand;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.factories.GlobalComponentRegistry;
import org.infinispan.manager.CacheContainer;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.inboundhandler.InboundInvocationHandler;
import org.infinispan.remoting.inboundhandler.Reply;
import org.infinispan.remoting.responses.ExceptionResponse;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.test.AbstractInfinispanTest;
import org.infinispan.test.TestException;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.infinispan.util.ByteString;
import org.infinispan.util.concurrent.BlockingTaskAwareExecutorServiceImpl;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups={"functional"}, testName="remoting.AsynchronousInvocationTest")
public class AsynchronousInvocationTest
extends AbstractInfinispanTest {
    public static final String CACHE_NAME = "testCache";
    public static final ByteString CACHE_NAME_BYTES = ByteString.fromString((String)"testCache");
    private EmbeddedCacheManager cacheManager;
    private DummyTaskCountExecutorService nonBlockingExecutorService;
    private DummyTaskCountExecutorService blockingExecutorService;
    private InboundInvocationHandler invocationHandler;
    private Address address;

    private static CacheRpcCommand mockCacheRpcCommand(boolean blocking) throws Throwable {
        CacheRpcCommand mock = (CacheRpcCommand)Mockito.mock(CacheRpcCommand.class);
        Mockito.when((Object)mock.canBlock()).thenReturn((Object)blocking);
        Mockito.when((Object)mock.getCacheName()).thenReturn((Object)CACHE_NAME_BYTES);
        Mockito.when((Object)mock.invokeAsync((ComponentRegistry)ArgumentMatchers.any())).thenReturn((Object)CompletableFutures.completedNull());
        return mock;
    }

    private static GlobalRpcCommand mockGlobalRpcCommand(boolean blocking) throws Throwable {
        GlobalRpcCommand mock = (GlobalRpcCommand)Mockito.mock(GlobalRpcCommand.class);
        Mockito.when((Object)mock.canBlock()).thenReturn((Object)blocking);
        Mockito.when((Object)mock.invokeAsync((GlobalComponentRegistry)ArgumentMatchers.any())).thenReturn((Object)CompletableFutures.completedNull());
        return mock;
    }

    private static ReplicableCommand mockReplicableCommand(boolean blocking) throws Throwable {
        ReplicableCommand mock = (ReplicableCommand)Mockito.mock(ReplicableCommand.class);
        Mockito.when((Object)mock.canBlock()).thenReturn((Object)blocking);
        Mockito.when((Object)mock.invokeAsync()).thenReturn((Object)CompletableFutures.completedNull());
        return mock;
    }

    private static SingleRpcCommand mockSingleRpcCommand(boolean blocking) {
        VisitableCommand mock = (VisitableCommand)Mockito.mock(VisitableCommand.class);
        Mockito.when((Object)mock.canBlock()).thenReturn((Object)blocking);
        return new SingleRpcCommand(CACHE_NAME_BYTES, mock);
    }

    @BeforeClass
    public void setUp() throws Throwable {
        ExecutorService realExecutor = Executors.newSingleThreadExecutor();
        this.nonBlockingExecutorService = new DummyTaskCountExecutorService(realExecutor);
        this.blockingExecutorService = new DummyTaskCountExecutorService(realExecutor);
        BlockingTaskAwareExecutorServiceImpl nonBlockingExecutor = new BlockingTaskAwareExecutorServiceImpl((ExecutorService)this.nonBlockingExecutorService, TIME_SERVICE);
        BlockingTaskAwareExecutorServiceImpl blockingExecutor = new BlockingTaskAwareExecutorServiceImpl((ExecutorService)this.blockingExecutorService, TIME_SERVICE);
        GlobalConfigurationBuilder globalBuilder = GlobalConfigurationBuilder.defaultClusteredBuilder();
        globalBuilder.defaultCacheName(CACHE_NAME);
        ((TestGlobalConfigurationBuilder)globalBuilder.addModule(TestGlobalConfigurationBuilder.class)).testGlobalComponent("org.infinispan.executors.non-blocking", (Object)nonBlockingExecutor).testGlobalComponent("org.infinispan.executors.blocking", (Object)blockingExecutor);
        ConfigurationBuilder builder = TestCacheManagerFactory.getDefaultCacheConfiguration(false);
        builder.clustering().cacheMode(CacheMode.DIST_SYNC);
        this.cacheManager = TestCacheManagerFactory.createClusteredCacheManager(globalBuilder, builder);
        Transport transport = TestingUtil.extractGlobalComponent((CacheContainer)this.cacheManager, Transport.class);
        this.address = transport.getAddress();
        this.invocationHandler = TestingUtil.extractGlobalComponent((CacheContainer)this.cacheManager, InboundInvocationHandler.class);
        this.cacheManager.getCache();
    }

    @AfterClass
    public void tearDown() {
        if (this.cacheManager != null) {
            TestingUtil.extractGlobalComponent((CacheContainer)this.cacheManager, ExecutorService.class, "org.infinispan.executors.non-blocking").shutdownNow();
            TestingUtil.extractGlobalComponent((CacheContainer)this.cacheManager, ExecutorService.class, "org.infinispan.executors.blocking").shutdownNow();
            this.cacheManager.stop();
        }
    }

    public void testCacheRpcCommands() throws Throwable {
        CacheRpcCommand blockingCacheRpcCommand = AsynchronousInvocationTest.mockCacheRpcCommand(true);
        this.assertDispatchForCommand((ReplicableCommand)blockingCacheRpcCommand, true);
        CacheRpcCommand nonBlockingCacheRpcCommand = AsynchronousInvocationTest.mockCacheRpcCommand(false);
        this.assertDispatchForCommand((ReplicableCommand)nonBlockingCacheRpcCommand, false);
    }

    public void testGlobalRpcCommands() throws Throwable {
        GlobalRpcCommand blockingGlobalRpcCommand = AsynchronousInvocationTest.mockGlobalRpcCommand(true);
        this.assertDispatchForCommand((ReplicableCommand)blockingGlobalRpcCommand, true);
        GlobalRpcCommand nonBlockingGlobalRpcCommand = AsynchronousInvocationTest.mockGlobalRpcCommand(false);
        this.assertDispatchForCommand((ReplicableCommand)nonBlockingGlobalRpcCommand, false);
    }

    public void testReplicableCommands() throws Throwable {
        ReplicableCommand blockingReplicableCommand = AsynchronousInvocationTest.mockReplicableCommand(true);
        this.assertDispatchForCommand(blockingReplicableCommand, true);
        ReplicableCommand nonBlockingReplicableCommand = AsynchronousInvocationTest.mockReplicableCommand(false);
        this.assertDispatchForCommand(nonBlockingReplicableCommand, false);
    }

    public void testSingleRpcCommand() throws Throwable {
        SingleRpcCommand blockingSingleRpcCommand = AsynchronousInvocationTest.mockSingleRpcCommand(true);
        this.assertDispatchForCommand((ReplicableCommand)blockingSingleRpcCommand, true);
        SingleRpcCommand nonBlockingSingleRpcCommand = AsynchronousInvocationTest.mockSingleRpcCommand(false);
        this.assertDispatchForCommand((ReplicableCommand)nonBlockingSingleRpcCommand, false);
    }

    private void assertDispatchForCommand(ReplicableCommand command, boolean isBlocking) throws Exception {
        Assert.assertEquals((boolean)isBlocking, (boolean)command.canBlock());
        log.debugf("Testing " + command.getClass().getCanonicalName(), new Object[0]);
        DummyTaskCountExecutorService executorToUse = isBlocking ? this.blockingExecutorService : this.nonBlockingExecutorService;
        executorToUse.reset();
        CompletableFutureResponse response = new CompletableFutureResponse();
        this.invocationHandler.handleFromCluster(this.address, command, (Reply)response, DeliverOrder.NONE);
        response.await(30L, TimeUnit.SECONDS);
        Assert.assertEquals((boolean)executorToUse.hasExecutedCommand, (boolean)isBlocking, (String)("Command " + String.valueOf(command.getClass()) + " dispatched wrongly."));
        executorToUse.reset();
        response = new CompletableFutureResponse();
        this.invocationHandler.handleFromCluster(this.address, command, (Reply)response, DeliverOrder.PER_SENDER);
        response.await(30L, TimeUnit.SECONDS);
        Assert.assertFalse((boolean)executorToUse.hasExecutedCommand, (String)("Command " + String.valueOf(command.getClass()) + " dispatched wrongly."));
    }

    private static class DummyTaskCountExecutorService
    extends AbstractExecutorService {
        private final ExecutorService realExecutor;
        private volatile boolean hasExecutedCommand;

        private DummyTaskCountExecutorService(ExecutorService realExecutor) {
            this.realExecutor = realExecutor;
        }

        @Override
        public void execute(Runnable command) {
            this.hasExecutedCommand = true;
            this.realExecutor.execute(command);
        }

        public void reset() {
            this.hasExecutedCommand = false;
        }

        @Override
        public void shutdown() {
            this.realExecutor.shutdown();
        }

        @Override
        public List<Runnable> shutdownNow() {
            return this.realExecutor.shutdownNow();
        }

        @Override
        public boolean isShutdown() {
            return this.realExecutor.isShutdown();
        }

        @Override
        public boolean isTerminated() {
            return this.realExecutor.isTerminated();
        }

        @Override
        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
            return this.realExecutor.awaitTermination(timeout, unit);
        }
    }

    private static class CompletableFutureResponse
    implements Reply {
        private final CompletableFuture<Response> responseFuture = new CompletableFuture();

        private CompletableFutureResponse() {
        }

        public void await(long time, TimeUnit unit) throws Exception {
            Response response = this.responseFuture.get(time, unit);
            if (response instanceof ExceptionResponse) {
                throw new TestException(((ExceptionResponse)response).getException());
            }
        }

        public void reply(Response response) {
            this.responseFuture.complete(response);
        }
    }
}

