/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.conflict.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.statetransfer.ConflictResolutionStartCommand;
import org.infinispan.commands.statetransfer.StateTransferCancelCommand;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.test.Exceptions;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.conflict.impl.StateReceiverImpl;
import org.infinispan.container.entries.ImmortalCacheEntry;
import org.infinispan.container.impl.InternalDataContainer;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.distribution.TestAddress;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.distribution.ch.KeyPartitioner;
import org.infinispan.distribution.ch.impl.DefaultConsistentHashFactory;
import org.infinispan.distribution.ch.impl.HashFunctionPartitioner;
import org.infinispan.notifications.cachelistener.CacheNotifier;
import org.infinispan.notifications.cachelistener.event.DataRehashedEvent;
import org.infinispan.notifications.cachelistener.event.Event;
import org.infinispan.notifications.cachelistener.event.impl.EventImpl;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.responses.SuccessfulResponse;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.ResponseCollector;
import org.infinispan.statetransfer.InboundTransferTask;
import org.infinispan.statetransfer.StateChunk;
import org.infinispan.test.AbstractInfinispanTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.topology.CacheTopology;
import org.infinispan.topology.PersistentUUID;
import org.infinispan.topology.PersistentUUIDManagerImpl;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Tests for {@link StateReceiverImpl}: error propagation, cancellation on topology change,
 * filtering of unexpected state, and cancellation on {@code stop()}.
 *
 * <p>Each test runs against a Mockito spy of a freshly started {@link StateReceiverImpl} whose
 * {@code createTransferTask} method is stubbed to hand back a mocked {@link InboundTransferTask}.
 */
@Test(groups={"functional"}, testName="conflict.resolution.StateReceiverTest")
public class StateReceiverTest extends AbstractInfinispanTest {
    /** Timeout, in milliseconds, used for segment requests and the stubbed sync RPC options. */
    private static final long TIMEOUT_MILLIS = 10000L;
    /** Number of members in the consistent hash installed before every test. */
    private static final int INITIAL_NODES = 4;
    /** Topology id announced to the receiver before every test. */
    private static final int INITIAL_TOPOLOGY_ID = 2;

    private StateReceiverImpl<Object, Object> stateReceiver;
    private LocalizedCacheTopology localizedCacheTopology;
    private final ExecutorService stateTransferExecutor = Executors.newSingleThreadExecutor(this.getTestThreadFactory("StateTransfer"));

    /**
     * A transfer task whose future fails with a {@link CacheException} must fail the
     * future returned by {@code getAllReplicasForSegment} with the same exception type.
     */
    public void testGetReplicaException() {
        CompletableFuture<Void> failedTransfer = new CompletableFuture<>();
        failedTransfer.completeExceptionally(new CacheException("Problem encountered retrieving state"));
        initTransferTaskMock(failedTransfer);

        CompletableFuture<?> replicas = stateReceiver.getAllReplicasForSegment(0, localizedCacheTopology, TIMEOUT_MILLIS);
        Exceptions.expectExecutionException(CacheException.class, replicas);
    }

    /**
     * A rehash event arriving while a segment request is pending must fail that request
     * with a {@link CacheException}; a request issued afterwards must start out pending.
     */
    public void testTopologyChangeDuringSegmentRequest() {
        // The transfer future is never completed, so the request stays in flight.
        initTransferTaskMock(new CompletableFuture<>());

        CompletableFuture<?> replicas = stateReceiver.getAllReplicasForSegment(0, localizedCacheTopology, TIMEOUT_MILLIS);
        AssertJUnit.assertFalse(replicas.isCancelled());
        AssertJUnit.assertFalse(replicas.isCompletedExceptionally());

        // Rehash to topology 4 with a single-member hash while the request is pending.
        stateReceiver.onDataRehash(createEventImpl(4, 1, Event.Type.DATA_REHASHED));
        AssertJUnit.assertTrue(replicas.isCompletedExceptionally());
        Exceptions.expectExecutionException(CacheException.class, replicas);

        // Rehash again (same topology id, four members) and issue a new request for another segment.
        stateReceiver.onDataRehash(createEventImpl(4, INITIAL_NODES, Event.Type.DATA_REHASHED));
        replicas = stateReceiver.getAllReplicasForSegment(1, localizedCacheTopology, TIMEOUT_MILLIS);
        AssertJUnit.assertFalse(replicas.isCompletedExceptionally());
        AssertJUnit.assertFalse(replicas.isCancelled());
    }

    /**
     * Only state from a registered transfer source carrying the current topology id may
     * reach the key/replica map; anything else must leave the map untouched.
     */
    public void testOldAndInvalidStateIgnored() {
        initTransferTaskMock(new CompletableFuture<>());

        int segmentId = 0;
        stateReceiver.getAllReplicasForSegment(segmentId, localizedCacheTopology, TIMEOUT_MILLIS);
        ArrayList<Address> sourceAddresses = new ArrayList<>(stateReceiver.getTransferTaskMap(segmentId).keySet());
        Map<?, ?> receiverKeyMap = stateReceiver.getKeyReplicaMap(segmentId);
        AssertJUnit.assertEquals(0, receiverKeyMap.size());

        // Known source, topology id matching the one announced in setup: accepted.
        stateReceiver.receiveState(sourceAddresses.get(0), INITIAL_TOPOLOGY_ID, createStateChunks("Key1", "Value1"));
        AssertJUnit.assertEquals(1, receiverKeyMap.size());

        // Address 5 is not one of the four hash members (0..3): ignored.
        stateReceiver.receiveState(new TestAddress(5), INITIAL_TOPOLOGY_ID, createStateChunks("Key2", "Value2"));
        AssertJUnit.assertEquals(1, receiverKeyMap.size());

        // Known source but an older topology id (1): ignored.
        stateReceiver.receiveState(sourceAddresses.get(1), 1, new ArrayList<>());
        AssertJUnit.assertEquals(1, receiverKeyMap.size());
    }

    /**
     * Stopping the receiver while a transfer is still running must cancel the pending
     * request, so that {@code get} on its future throws {@link CancellationException}.
     *
     * <p>The outcome is checked through {@code expectedExceptions} on the test thread.
     * Assertions inside a {@code whenComplete} callback would be silently dropped: when the
     * source stage has already failed, the dependent stage keeps the source's exception and
     * nothing ever inspects that dependent stage.
     *
     * @throws Exception the expected {@link CancellationException}, or a
     *         {@link java.util.concurrent.TimeoutException} if the request is never cancelled
     */
    @Test(expectedExceptions={CancellationException.class})
    public void testRequestCanBeCancelledDuringTransfer() throws Exception {
        InboundTransferTask slowTask = Mockito.mock(InboundTransferTask.class);
        Mockito.when(slowTask.requestSegments()).thenAnswer(invocation -> {
            // Keep the transfer busy long enough for stop() to run while it is in progress.
            TestingUtil.sleepThread(1000L);
            return CompletableFuture.completedFuture(new HashMap<>());
        });
        stubTransferTask(slowTask);

        CompletableFuture<?> replicas = stateReceiver.getAllReplicasForSegment(0, localizedCacheTopology, TIMEOUT_MILLIS);
        stateReceiver.stop();

        // Bounded wait: a missing cancellation fails the test instead of hanging the suite.
        replicas.get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
    }

    /**
     * Builds a started {@link StateReceiverImpl} wired to mocked collaborators, announces
     * the initial topology to it, and stores a Mockito spy of it for the test to use.
     */
    @BeforeMethod
    private void createAndInitStateReceiver() {
        CommandsFactory commandsFactory = Mockito.mock(CommandsFactory.class);
        InternalDataContainer<?, ?> dataContainer = Mockito.mock(InternalDataContainer.class);
        RpcManager rpcManager = Mockito.mock(RpcManager.class);
        CacheNotifier<?, ?> cacheNotifier = Mockito.mock(CacheNotifier.class);

        // Answers with an empty successful response from the first recipient.
        // invokeCommand returns a CompletionStage, so the response map is wrapped in a
        // completed future; returning the bare map would throw ClassCastException if the
        // stub were ever invoked.
        Answer<Object> successfulResponse = invocation -> {
            Collection<?> recipients = (Collection<?>) invocation.getArguments()[0];
            Address recipient = (Address) recipients.iterator().next();
            HashMap<Address, SuccessfulResponse> results = new HashMap<>(1);
            results.put(recipient, SuccessfulResponse.SUCCESSFUL_EMPTY_RESPONSE);
            return CompletableFuture.completedFuture(results);
        };
        Mockito.when((Object) rpcManager.invokeCommand((Collection) ArgumentMatchers.any(Collection.class), (ReplicableCommand) ArgumentMatchers.any(ConflictResolutionStartCommand.class), (ResponseCollector) ArgumentMatchers.any(), (RpcOptions) ArgumentMatchers.any())).thenAnswer(successfulResponse);
        Mockito.when((Object) rpcManager.invokeCommand((Collection) ArgumentMatchers.any(Collection.class), (ReplicableCommand) ArgumentMatchers.any(StateTransferCancelCommand.class), (ResponseCollector) ArgumentMatchers.any(), (RpcOptions) ArgumentMatchers.any())).thenAnswer(successfulResponse);
        Mockito.when(rpcManager.getSyncRpcOptions()).thenAnswer(invocation -> new RpcOptions(DeliverOrder.PER_SENDER, TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));

        StateReceiverImpl<Object, Object> receiver = new StateReceiverImpl<>();
        TestingUtil.inject(receiver, cacheNotifier, commandsFactory, dataContainer, rpcManager, stateTransferExecutor);
        receiver.start();
        receiver.onDataRehash(createEventImpl(INITIAL_TOPOLOGY_ID, INITIAL_NODES, Event.Type.DATA_REHASHED));

        this.localizedCacheTopology = createLocalizedCacheTopology(INITIAL_NODES);
        // Spy so that individual tests can stub createTransferTask.
        this.stateReceiver = Mockito.spy(receiver);
    }

    /** Shuts down the executor shared by all tests of this class. */
    @AfterClass(alwaysRun=true)
    public void stopExecutor() {
        stateTransferExecutor.shutdownNow();
    }

    /**
     * Stubs the receiver so that every transfer task it creates is a mock whose
     * {@code requestSegments()} returns the given future.
     *
     * @param completableFuture the future the mocked task hands back
     */
    private void initTransferTaskMock(CompletableFuture<Void> completableFuture) {
        InboundTransferTask task = Mockito.mock(InboundTransferTask.class);
        Mockito.when(task.requestSegments()).thenReturn(completableFuture);
        stubTransferTask(task);
    }

    /**
     * Makes the spied receiver return {@code task} from {@code createTransferTask},
     * whatever arguments it is called with.
     *
     * @param task the transfer task to hand out
     */
    private void stubTransferTask(InboundTransferTask task) {
        Mockito.doReturn(task).when(stateReceiver).createTransferTask(
                ArgumentMatchers.anyInt(),
                ArgumentMatchers.any(Address.class),
                ArgumentMatchers.any(CacheTopology.class),
                ArgumentMatchers.anyLong());
    }

    /**
     * Creates a single final state chunk for segment 0 holding one immortal entry.
     *
     * @param key   key of the entry
     * @param value value of the entry
     * @return a singleton collection containing the chunk
     */
    private Collection<StateChunk> createStateChunks(Object key, Object value) {
        // Built inline so the element type of the entry collection is inferred from the
        // StateChunk constructor rather than pinned to ImmortalCacheEntry.
        return Collections.singleton(new StateChunk(0, Collections.singleton(new ImmortalCacheEntry(key, value)), true));
    }

    /**
     * Creates a consistent hash whose members are {@link TestAddress}es {@code 0..numberOfNodes-1}.
     *
     * @param numberOfNodes number of members to create
     * @return the hash produced by {@link DefaultConsistentHashFactory}
     */
    private ConsistentHash createConsistentHash(int numberOfNodes) {
        ArrayList<Address> members = new ArrayList<>(numberOfNodes);
        for (int i = 0; i < numberOfNodes; ++i) {
            members.add(new TestAddress(i));
        }
        // NOTE(review): the arguments 2 and 40 are presumably numOwners and numSegments - confirm
        // against ConsistentHashFactory.create.
        return new DefaultConsistentHashFactory().create(2, 40, members, null);
    }

    /**
     * Creates a {@code DIST_SYNC} localized topology over a fresh consistent hash, using
     * the first hash member as the local address.
     *
     * @param numberOfNodes number of members in the hash
     * @return the localized topology
     */
    private LocalizedCacheTopology createLocalizedCacheTopology(int numberOfNodes) {
        ConsistentHash hash = createConsistentHash(numberOfNodes);
        CacheTopology topology = new CacheTopology(-1, -1, hash, null, CacheTopology.Phase.NO_REBALANCE, hash.getMembers(), null);
        return new LocalizedCacheTopology(CacheMode.DIST_SYNC, topology, new HashFunctionPartitioner(), hash.getMembers().get(0), true);
    }

    /**
     * Creates a "pre" event of the given type whose end-state read and write hashes are
     * the same freshly created consistent hash.
     *
     * @param topologyId    new topology id carried by the event
     * @param numberOfNodes number of members in the event's consistent hash
     * @param type          event type
     * @return the populated event
     */
    private EventImpl<Object, Object> createEventImpl(int topologyId, int numberOfNodes, Event.Type type) {
        EventImpl<Object, Object> event = EventImpl.createEvent(null, type);
        ConsistentHash hash = createConsistentHash(numberOfNodes);
        event.setReadConsistentHashAtEnd(hash);
        event.setWriteConsistentHashAtEnd(hash);
        event.setNewTopologyId(topologyId);
        event.setPre(true);
        return event;
    }
}

