/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.statetransfer;

import io.reactivex.rxjava3.core.Flowable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import org.infinispan.Cache;
import org.infinispan.commands.CommandInvocationId;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.statetransfer.StateTransferCancelCommand;
import org.infinispan.commands.statetransfer.StateTransferGetTransactionsCommand;
import org.infinispan.commands.statetransfer.StateTransferStartCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commons.util.IntSet;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.cache.IsolationLevel;
import org.infinispan.conflict.impl.InternalConflictManager;
import org.infinispan.container.entries.ImmortalCacheEntry;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.impl.InternalDataContainer;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.InvocationContextFactory;
import org.infinispan.context.impl.SingleKeyNonTxInvocationContext;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.TestAddress;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.distribution.ch.impl.DefaultConsistentHash;
import org.infinispan.distribution.ch.impl.DefaultConsistentHashFactory;
import org.infinispan.distribution.ch.impl.HashFunctionPartitioner;
import org.infinispan.interceptors.AsyncInterceptorChain;
import org.infinispan.lifecycle.ComponentStatus;
import org.infinispan.metadata.Metadata;
import org.infinispan.notifications.cachelistener.CacheNotifier;
import org.infinispan.persistence.manager.PersistenceManager;
import org.infinispan.reactive.publisher.impl.LocalPublisherManager;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.inboundhandler.PerCacheInboundInvocationHandler;
import org.infinispan.remoting.responses.SuccessfulResponse;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.ResponseCollector;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.statetransfer.CommitManager;
import org.infinispan.statetransfer.StateChunk;
import org.infinispan.statetransfer.StateConsumer;
import org.infinispan.statetransfer.StateConsumerImpl;
import org.infinispan.statetransfer.StateTransferLock;
import org.infinispan.test.AbstractInfinispanTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.topology.CacheTopology;
import org.infinispan.topology.PersistentUUID;
import org.infinispan.topology.PersistentUUIDManager;
import org.infinispan.topology.PersistentUUIDManagerImpl;
import org.infinispan.transaction.impl.TransactionTable;
import org.infinispan.util.ByteString;
import org.infinispan.util.concurrent.CommandAckCollector;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.infinispan.xsite.statetransfer.XSiteStateTransferManager;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups={"functional"}, testName="statetransfer.StateConsumerTest")
public class StateConsumerTest
extends AbstractInfinispanTest {
    private static final Log log = LogFactory.getLog(StateConsumerTest.class);
    private static final ByteString CACHE_NAME = ByteString.fromString((String)"test-cache");
    private ExecutorService pooledExecutorService;

    @BeforeMethod
    public void createExecutorService() {
        this.pooledExecutorService = new ThreadPoolExecutor(0, 20, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), this.getTestThreadFactory("Worker"), new ThreadPoolExecutor.CallerRunsPolicy());
    }

    @AfterMethod
    public void shutdownExecutorService() {
        if (this.pooledExecutorService != null) {
            this.pooledExecutorService.shutdownNow();
            this.pooledExecutorService = null;
        }
    }

    private static Address[] createMembers(PersistentUUIDManager persistentUUIDManager) {
        Address[] addresses = new Address[4];
        for (int i = 0; i < 4; ++i) {
            addresses[i] = new TestAddress(i);
            persistentUUIDManager.addPersistentAddressMapping(addresses[i], PersistentUUID.randomUUID());
        }
        return addresses;
    }

    private static XSiteStateTransferManager mockXSiteStateTransferManager() {
        XSiteStateTransferManager mock = (XSiteStateTransferManager)Mockito.mock(XSiteStateTransferManager.class);
        ((XSiteStateTransferManager)Mockito.doNothing().when((Object)mock)).onTopologyUpdated((CacheTopology)ArgumentMatchers.any(CacheTopology.class), ArgumentMatchers.anyBoolean());
        return mock;
    }

    private static CommandsFactory mockCommandsFactory() {
        CommandsFactory mock = (CommandsFactory)Mockito.mock(CommandsFactory.class);
        Mockito.when((Object)mock.buildStateTransferStartCommand(ArgumentMatchers.anyInt(), (IntSet)ArgumentMatchers.any(IntSet.class))).thenAnswer(invocation -> new StateTransferStartCommand(CACHE_NAME, ((Integer)invocation.getArgument(0)).intValue(), (IntSet)invocation.getArgument(1)));
        Mockito.when((Object)mock.buildStateTransferGetTransactionsCommand(ArgumentMatchers.anyInt(), (IntSet)ArgumentMatchers.any(IntSet.class))).thenAnswer(invocation -> new StateTransferGetTransactionsCommand(CACHE_NAME, ((Integer)invocation.getArgument(0)).intValue(), (IntSet)invocation.getArgument(1)));
        Mockito.when((Object)mock.buildStateTransferCancelCommand(ArgumentMatchers.anyInt(), (IntSet)ArgumentMatchers.any(IntSet.class))).thenAnswer(invocation -> new StateTransferCancelCommand(CACHE_NAME, ((Integer)invocation.getArgument(0)).intValue(), (IntSet)invocation.getArgument(1)));
        Mockito.when((Object)mock.buildPutKeyValueCommand(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.anyInt(), (Metadata)ArgumentMatchers.any(Metadata.class), ArgumentMatchers.anyLong())).thenAnswer(invocation -> new PutKeyValueCommand(invocation.getArgument(0), invocation.getArgument(1), false, false, (Metadata)invocation.getArgument(3), ((Integer)invocation.getArgument(2)).intValue(), ((Long)invocation.getArgument(4)).longValue(), CommandInvocationId.DUMMY_INVOCATION_ID));
        return mock;
    }

    private static Configuration createConfiguration() {
        ConfigurationBuilder cb = new ConfigurationBuilder();
        cb.invocationBatching().enable().clustering().cacheMode(CacheMode.DIST_SYNC).clustering().stateTransfer().timeout(30000L).locking().lockAcquisitionTimeout(TestingUtil.shortTimeoutMillis()).locking().isolationLevel(IsolationLevel.REPEATABLE_READ);
        return cb.build();
    }

    private static Cache<?, ?> mockCache() {
        Cache cache = (Cache)Mockito.mock(Cache.class);
        Mockito.when((Object)cache.getName()).thenReturn((Object)CACHE_NAME.toString());
        Mockito.when((Object)cache.getStatus()).thenReturn((Object)ComponentStatus.RUNNING);
        return cache;
    }

    private static RpcManager mockRpcManager(Map<Address, Set<Integer>> requestedSegments, Set<Integer> flatRequestedSegments, Address address) {
        Transport transport = (Transport)Mockito.mock(Transport.class);
        Mockito.when((Object)transport.getViewId()).thenReturn((Object)1);
        RpcManager rpcManager = (RpcManager)Mockito.mock(RpcManager.class);
        Answer successfulResponse = invocation -> CompletableFuture.completedFuture(SuccessfulResponse.SUCCESSFUL_EMPTY_RESPONSE);
        Mockito.when((Object)rpcManager.invokeCommand((Address)ArgumentMatchers.any(Address.class), (ReplicableCommand)ArgumentMatchers.any(StateTransferGetTransactionsCommand.class), (ResponseCollector)ArgumentMatchers.any(ResponseCollector.class), (RpcOptions)ArgumentMatchers.any(RpcOptions.class))).thenAnswer(invocation -> {
            Address recipient = (Address)invocation.getArgument(0);
            StateTransferGetTransactionsCommand cmd = (StateTransferGetTransactionsCommand)invocation.getArgument(1);
            IntSet segments = cmd.getSegments();
            requestedSegments.put(recipient, (Set<Integer>)segments);
            flatRequestedSegments.addAll((Collection<Integer>)segments);
            return CompletableFuture.completedFuture(SuccessfulResponse.create(new ArrayList()));
        });
        Mockito.when((Object)rpcManager.invokeCommand((Address)ArgumentMatchers.any(Address.class), (ReplicableCommand)ArgumentMatchers.any(StateTransferStartCommand.class), (ResponseCollector)ArgumentMatchers.any(ResponseCollector.class), (RpcOptions)ArgumentMatchers.any(RpcOptions.class))).thenAnswer(successfulResponse);
        Mockito.when((Object)rpcManager.invokeCommand((Address)ArgumentMatchers.any(Address.class), (ReplicableCommand)ArgumentMatchers.any(StateTransferCancelCommand.class), (ResponseCollector)ArgumentMatchers.any(ResponseCollector.class), (RpcOptions)ArgumentMatchers.any(RpcOptions.class))).thenAnswer(successfulResponse);
        Mockito.when((Object)rpcManager.getSyncRpcOptions()).thenReturn((Object)new RpcOptions(DeliverOrder.NONE, 10000L, TimeUnit.MILLISECONDS));
        Mockito.when((Object)rpcManager.blocking((CompletionStage)ArgumentMatchers.any())).thenAnswer(invocation -> ((CompletionStage)invocation.getArgument(0)).toCompletableFuture().join());
        Mockito.when((Object)rpcManager.getAddress()).thenReturn((Object)address);
        Mockito.when((Object)rpcManager.getTransport()).thenReturn((Object)transport);
        return rpcManager;
    }

    private static PersistenceManager mockPersistenceManager() {
        PersistenceManager persistenceManager = (PersistenceManager)Mockito.mock(PersistenceManager.class);
        Mockito.when((Object)persistenceManager.removeSegments((IntSet)ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(false));
        Mockito.when((Object)persistenceManager.addSegments((IntSet)ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(false));
        Mockito.when((Object)persistenceManager.publishKeys((Predicate)ArgumentMatchers.any(), (Predicate)ArgumentMatchers.any())).thenReturn((Object)Flowable.empty());
        return persistenceManager;
    }

    private static TransactionTable mockTransactionTable() {
        TransactionTable transactionTable = (TransactionTable)Mockito.mock(TransactionTable.class);
        Mockito.when((Object)transactionTable.getLocalTransactions()).thenReturn(Collections.emptyList());
        Mockito.when((Object)transactionTable.getRemoteTransactions()).thenReturn(Collections.emptyList());
        return transactionTable;
    }

    private static InvocationContextFactory mockInvocationContextFactory() {
        InvocationContextFactory icf = (InvocationContextFactory)Mockito.mock(InvocationContextFactory.class);
        Mockito.when((Object)icf.createSingleKeyNonTxInvocationContext()).thenAnswer(invocationOnMock -> new SingleKeyNonTxInvocationContext(null));
        return icf;
    }

    private static void noRebalance(StateConsumer stateConsumer, PersistentUUIDManager persistentUUIDManager, int topologyId, int rebalanceId, ConsistentHash ch) {
        stateConsumer.onTopologyUpdate(new CacheTopology(topologyId, rebalanceId, ch, null, CacheTopology.Phase.NO_REBALANCE, ch.getMembers(), persistentUUIDManager.mapAddresses(ch.getMembers())), false);
    }

    private static void rebalanceStart(StateConsumer stateConsumer, PersistentUUIDManager persistentUUIDManager, int topologyId, int rebalanceId, ConsistentHash current, ConsistentHash pending, ConsistentHash union) {
        stateConsumer.onTopologyUpdate(new CacheTopology(topologyId, rebalanceId, current, pending, union, CacheTopology.Phase.READ_OLD_WRITE_ALL, union.getMembers(), persistentUUIDManager.mapAddresses(union.getMembers())), true);
    }

    private static void assertRebalanceStart(StateConsumerImpl stateConsumer, ConsistentHash current, ConsistentHash pending, Address member, Set<Integer> flatRequestedSegments) {
        Set oldSegments = current.getSegmentsForOwner(member);
        Set newSegments = pending.getSegmentsForOwner(member);
        newSegments.removeAll(oldSegments);
        log.debugf("Rebalancing. Added segments=%s, old segments=%s", (Object)newSegments, (Object)oldSegments);
        AssertJUnit.assertTrue((boolean)stateConsumer.hasActiveTransfers());
        Assert.assertEquals(flatRequestedSegments, (Set)newSegments);
        Assert.assertEquals((long)stateConsumer.inflightRequestCount(), (long)newSegments.size());
    }

    private static void completeAndCheckRebalance(StateConsumerImpl stateConsumer, Map<Address, Set<Integer>> requestedSegments, int topologyId) throws ExecutionException, InterruptedException, TimeoutException {
        long inflightCounter = requestedSegments.values().stream().mapToLong(Collection::size).sum();
        Assert.assertEquals((long)stateConsumer.inflightRequestCount(), (long)inflightCounter);
        for (Map.Entry<Address, Set<Integer>> entry : requestedSegments.entrySet()) {
            for (Integer segment : entry.getValue()) {
                List<StateChunk> chunks = Collections.singletonList(new StateChunk(segment.intValue(), Collections.emptyList(), true));
                stateConsumer.applyState(entry.getKey(), topologyId, chunks).toCompletableFuture().get(10L, TimeUnit.SECONDS);
                Assert.assertEquals((long)stateConsumer.inflightRequestCount(), (long)(--inflightCounter));
            }
        }
        Assert.assertEquals((long)stateConsumer.inflightRequestCount(), (long)0L);
        StateConsumerTest.eventually(() -> !stateConsumer.hasActiveTransfers());
    }

    private static void applyState(StateConsumer stateConsumer, Map<Address, Set<Integer>> requestedSegments, Collection<InternalCacheEntry<?, ?>> cacheEntries) {
        Map.Entry<Address, Set<Integer>> entry = requestedSegments.entrySet().iterator().next();
        List<StateChunk> chunks = Collections.singletonList(new StateChunk(entry.getValue().iterator().next().intValue(), cacheEntries, true));
        stateConsumer.applyState(entry.getKey(), 22, chunks);
    }

    private void injectComponents(StateConsumer stateConsumer, AsyncInterceptorChain interceptorChain, RpcManager rpcManager) {
        TestingUtil.inject(stateConsumer, StateConsumerTest.mockCache(), TestingUtil.named("org.infinispan.executors.non-blocking", this.pooledExecutorService), interceptorChain, StateConsumerTest.mockInvocationContextFactory(), StateConsumerTest.createConfiguration(), rpcManager, StateConsumerTest.mockCommandsFactory(), StateConsumerTest.mockPersistenceManager(), Mockito.mock(InternalDataContainer.class), StateConsumerTest.mockTransactionTable(), Mockito.mock(StateTransferLock.class), Mockito.mock(CacheNotifier.class), new CommitManager(), new CommandAckCollector(), new HashFunctionPartitioner(), Mockito.mock(InternalConflictManager.class), Mockito.mock(DistributionManager.class), Mockito.mock(LocalPublisherManager.class), Mockito.mock(PerCacheInboundInvocationHandler.class), StateConsumerTest.mockXSiteStateTransferManager());
    }

    public void testClusterRecoverDuringStateTransfer() throws Exception {
        PersistentUUIDManagerImpl persistentUUIDManager = new PersistentUUIDManagerImpl();
        Address[] addresses = StateConsumerTest.createMembers((PersistentUUIDManager)persistentUUIDManager);
        List<Address> members1 = Arrays.asList(addresses[0], addresses[1], addresses[2], addresses[3]);
        List<Address> members2 = Arrays.asList(addresses[0], addresses[1], addresses[2]);
        DefaultConsistentHashFactory chf = new DefaultConsistentHashFactory();
        DefaultConsistentHash ch1 = chf.create(2, 40, members1, null);
        DefaultConsistentHash ch2 = chf.updateMembers(ch1, members2, null);
        DefaultConsistentHash ch3 = chf.rebalance(ch2);
        DefaultConsistentHash ch23 = chf.union(ch2, ch3);
        log.debug((Object)ch1);
        log.debug((Object)ch2);
        ConcurrentHashMap<Address, Set<Integer>> requestedSegments = new ConcurrentHashMap<Address, Set<Integer>>();
        ConcurrentSkipListSet<Integer> flatRequestedSegments = new ConcurrentSkipListSet<Integer>();
        StateConsumerImpl stateConsumer = new StateConsumerImpl();
        this.injectComponents((StateConsumer)stateConsumer, (AsyncInterceptorChain)Mockito.mock(AsyncInterceptorChain.class), StateConsumerTest.mockRpcManager(requestedSegments, flatRequestedSegments, addresses[0]));
        stateConsumer.start();
        AssertJUnit.assertFalse((boolean)stateConsumer.hasActiveTransfers());
        StateConsumerTest.noRebalance((StateConsumer)stateConsumer, (PersistentUUIDManager)persistentUUIDManager, 1, 1, (ConsistentHash)ch2);
        AssertJUnit.assertFalse((boolean)stateConsumer.hasActiveTransfers());
        StateConsumerTest.rebalanceStart((StateConsumer)stateConsumer, (PersistentUUIDManager)persistentUUIDManager, 2, 2, (ConsistentHash)ch2, (ConsistentHash)ch3, (ConsistentHash)ch23);
        StateConsumerTest.assertRebalanceStart(stateConsumer, (ConsistentHash)ch2, (ConsistentHash)ch3, addresses[0], flatRequestedSegments);
        Future<Object> future = this.fork(() -> StateConsumerTest.lambda$testClusterRecoverDuringStateTransfer$9(stateConsumer, (PersistentUUIDManager)persistentUUIDManager, ch2));
        StateConsumerTest.noRebalance((StateConsumer)stateConsumer, (PersistentUUIDManager)persistentUUIDManager, 3, 2, (ConsistentHash)ch2);
        future.get();
        AssertJUnit.assertFalse((boolean)stateConsumer.hasActiveTransfers());
        requestedSegments.clear();
        flatRequestedSegments.clear();
        StateConsumerTest.rebalanceStart((StateConsumer)stateConsumer, (PersistentUUIDManager)persistentUUIDManager, 4, 4, (ConsistentHash)ch2, (ConsistentHash)ch3, (ConsistentHash)ch23);
        StateConsumerTest.assertRebalanceStart(stateConsumer, (ConsistentHash)ch2, (ConsistentHash)ch3, addresses[0], flatRequestedSegments);
        StateConsumerTest.completeAndCheckRebalance(stateConsumer, requestedSegments, 4);
        stateConsumer.stop();
    }

    public void testJoinDuringStateTransfer() throws Exception {
        PersistentUUIDManagerImpl persistentUUIDManager = new PersistentUUIDManagerImpl();
        Address[] addresses = StateConsumerTest.createMembers((PersistentUUIDManager)persistentUUIDManager);
        List<Address> members1 = Arrays.asList(addresses[0], addresses[1], addresses[2]);
        List<Address> members2 = Arrays.asList(addresses[1], addresses[2]);
        DefaultConsistentHashFactory chf = new DefaultConsistentHashFactory();
        DefaultConsistentHash ch1 = chf.create(2, 40, members1, null);
        DefaultConsistentHash ch2 = chf.updateMembers(ch1, members2, null);
        DefaultConsistentHash ch3 = chf.rebalance(ch2);
        DefaultConsistentHash ch23 = chf.union(ch2, ch3);
        log.debug((Object)ch1);
        log.debug((Object)ch2);
        log.debug((Object)ch23);
        ConcurrentHashMap<Address, Set<Integer>> requestedSegments = new ConcurrentHashMap<Address, Set<Integer>>();
        ConcurrentSkipListSet<Integer> flatRequestedSegments = new ConcurrentSkipListSet<Integer>();
        CompletableFuture<Object> putFuture = new CompletableFuture<Object>();
        AsyncInterceptorChain interceptorChain = (AsyncInterceptorChain)Mockito.mock(AsyncInterceptorChain.class);
        Mockito.when((Object)interceptorChain.invokeAsync((InvocationContext)ArgumentMatchers.any(), (VisitableCommand)ArgumentMatchers.any())).thenReturn(putFuture);
        StateConsumerImpl stateConsumer = new StateConsumerImpl();
        this.injectComponents((StateConsumer)stateConsumer, interceptorChain, StateConsumerTest.mockRpcManager(requestedSegments, flatRequestedSegments, addresses[1]));
        stateConsumer.start();
        StateConsumerTest.noRebalance((StateConsumer)stateConsumer, (PersistentUUIDManager)persistentUUIDManager, 21, 7, (ConsistentHash)ch1);
        AssertJUnit.assertFalse((boolean)stateConsumer.hasActiveTransfers());
        StateConsumerTest.rebalanceStart((StateConsumer)stateConsumer, (PersistentUUIDManager)persistentUUIDManager, 22, 8, (ConsistentHash)ch2, (ConsistentHash)ch3, (ConsistentHash)ch23);
        StateConsumerTest.assertRebalanceStart(stateConsumer, (ConsistentHash)ch2, (ConsistentHash)ch3, addresses[1], flatRequestedSegments);
        StateConsumerTest.applyState((StateConsumer)stateConsumer, requestedSegments, Collections.singletonList(new ImmortalCacheEntry((Object)"a", (Object)"b")));
        StateConsumerTest.noRebalance((StateConsumer)stateConsumer, (PersistentUUIDManager)persistentUUIDManager, 23, 9, (ConsistentHash)ch2);
        requestedSegments.clear();
        flatRequestedSegments.clear();
        StateConsumerTest.rebalanceStart((StateConsumer)stateConsumer, (PersistentUUIDManager)persistentUUIDManager, 24, 10, (ConsistentHash)ch2, (ConsistentHash)ch3, (ConsistentHash)ch23);
        StateConsumerTest.assertRebalanceStart(stateConsumer, (ConsistentHash)ch2, (ConsistentHash)ch3, addresses[1], flatRequestedSegments);
        putFuture.complete(null);
        StateConsumerTest.completeAndCheckRebalance(stateConsumer, requestedSegments, 24);
        stateConsumer.stop();
    }

    private static /* synthetic */ Object lambda$testClusterRecoverDuringStateTransfer$9(StateConsumerImpl stateConsumer, PersistentUUIDManager persistentUUIDManager, DefaultConsistentHash ch2) throws Exception {
        StateConsumerTest.noRebalance((StateConsumer)stateConsumer, persistentUUIDManager, 3, 2, (ConsistentHash)ch2);
        return null;
    }
}

