/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.statetransfer;

import io.reactivex.rxjava3.core.Flowable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.PrimitiveIterator;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.statetransfer.StateResponseCommand;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.container.entries.ImmortalCacheEntry;
import org.infinispan.reactive.publisher.impl.Notifications;
import org.infinispan.reactive.publisher.impl.SegmentAwarePublisherSupplier;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.LocalModeAddress;
import org.infinispan.remoting.transport.ResponseCollector;
import org.infinispan.statetransfer.OutboundTransferTask;
import org.infinispan.statetransfer.StateChunk;
import org.infinispan.test.TestException;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * Test for {@link OutboundTransferTask} that runs the task against mocked
 * {@link RpcManager} and {@link CommandsFactory} collaborators.
 */
@Test(groups={"functional"}, testName="statetransfer.OutboundTransferTaskTest")
@CleanupAfterMethod
public class OutboundTransferTaskTest {
    /**
     * Feeds the task one value notification followed by one segment-complete
     * notification for each of 30 segments, then checks the state response
     * commands that were built: exactly two commands, each carrying 15 chunks,
     * whose segment ids together equal the requested segment set.
     *
     * @throws InterruptedException if the test thread is interrupted while
     *         waiting for the task to complete
     */
    public void shouldNotifyForAllSegments() throws InterruptedException {
        int numSegments = 30;
        IntSet segments = IntSets.from(IntStream.range(0, numSegments).iterator());
        RpcManager rpcManager = Mockito.mock(RpcManager.class);
        CommandsFactory commandsFactory = Mockito.mock(CommandsFactory.class);
        OutboundTransferTask task = new OutboundTransferTask(LocalModeAddress.INSTANCE, segments, numSegments, numSegments, 1, chunks -> {}, rpcManager, commandsFactory, 10000L, "mock-cache", true);

        // A class literal cannot carry the element type, so the captor's generic
        // type is asserted here; the elements are later read as StateChunk.
        @SuppressWarnings("unchecked")
        ArgumentCaptor<Collection<StateChunk>> cmdCaptor = ArgumentCaptor.forClass(Collection.class);
        Mockito.when(commandsFactory.buildStateResponseCommand(ArgumentMatchers.anyInt(), cmdCaptor.capture(), ArgumentMatchers.anyBoolean()))
                .thenReturn(Mockito.mock(StateResponseCommand.class));
        // Every RPC completes immediately with a null response.
        Mockito.when(rpcManager.invokeCommand(ArgumentMatchers.any(Address.class), ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any()))
                .thenAnswer(invocation -> CompletableFutures.completedNull());

        // NOTE(review): the element type is raw as found in this file; its type
        // argument is not recoverable from here. Confirm against the parameter
        // type of OutboundTransferTask.execute.
        ArrayList<SegmentAwarePublisherSupplier.NotificationWithLost> entries = new ArrayList<SegmentAwarePublisherSupplier.NotificationWithLost>();
        for (int segment = 0; segment < numSegments; ++segment) {
            ImmortalCacheEntry entry = new ImmortalCacheEntry("key", "value");
            entries.add(Notifications.value(entry, segment));
            entries.add(Notifications.segmentComplete(segment));
        }

        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<Throwable> failure = new AtomicReference<>();
        // Throwing from inside the callback would only fail the stage returned by
        // whenComplete, which nobody observes. Record the failure and always
        // release the latch so the real cause is rethrown on the test thread
        // instead of surfacing as a 15 second timeout.
        task.execute(Flowable.fromIterable(entries)).whenComplete((v, t) -> {
            failure.set((Throwable) t);
            latch.countDown();
        });
        if (!latch.await(15L, TimeUnit.SECONDS)) {
            throw new TestException("Did not receive all segment notifications");
        }
        Throwable error = failure.get();
        if (error != null) {
            throw new TestException(error);
        }

        IntSet transferred = IntSets.mutableEmptySet(numSegments);
        Assert.assertEquals(cmdCaptor.getAllValues().size(), 2);
        for (Collection<StateChunk> chunks : cmdCaptor.getAllValues()) {
            Assert.assertEquals(chunks.size(), 15);
            transferred.addAll(chunks.stream().map(StateChunk::getSegmentId).collect(Collectors.toList()));
        }
        Assert.assertEquals((Set) transferred, (Set) segments);
    }
}

