/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.distribution;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.function.Predicate;
import org.infinispan.Cache;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.remote.SingleRpcCommand;
import org.infinispan.commands.write.InvalidateL1Command;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.distribution.DistSyncFuncTest;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.ResponseCollector;
import org.infinispan.test.ReplListener;
import org.infinispan.test.TestingUtil;
import org.infinispan.util.AbstractDelegatingRpcManager;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;

@Test(groups={"functional"}, testName="distribution.DistAsyncFuncTest")
public class DistAsyncFuncTest
extends DistSyncFuncTest {
    ReplListener r1;
    ReplListener r2;
    ReplListener r3;
    ReplListener r4;
    ReplListener[] r;
    Map<Address, ReplListener> listenerLookup;
    ConcurrentMap<Address, List<InvalidateL1Command>> expectedL1Invalidations = new ConcurrentHashMap<Address, List<InvalidateL1Command>>();

    @Override
    public Object[] factory() {
        return new Object[]{new DistAsyncFuncTest(), new DistAsyncFuncTest().groupers(true)};
    }

    public DistAsyncFuncTest() {
        this.cacheMode = CacheMode.DIST_ASYNC;
        this.testRetVals = false;
    }

    @Override
    protected void createCacheManagers() throws Throwable {
        super.createCacheManagers();
        this.r1 = new ReplListener(this.c1, true, true);
        this.r2 = new ReplListener(this.c2, true, true);
        this.r3 = new ReplListener(this.c3, true, true);
        this.r4 = new ReplListener(this.c4, true, true);
        this.r = new ReplListener[]{this.r1, this.r2, this.r3, this.r4};
        this.listenerLookup = new HashMap<Address, ReplListener>();
        for (ReplListener rl : this.r) {
            this.listenerLookup.put(rl.getCache().getCacheManager().getAddress(), rl);
        }
        for (Cache c : this.caches) {
            TestingUtil.wrapComponent(c, RpcManager.class, original -> new AbstractDelegatingRpcManager((RpcManager)original){

                @Override
                protected <T> CompletionStage<T> performRequest(Collection<Address> targets, ReplicableCommand command, ResponseCollector<T> collector, Function<ResponseCollector<T>, CompletionStage<T>> invoker, RpcOptions rpcOptions) {
                    if (command instanceof SingleRpcCommand) {
                        command = ((SingleRpcCommand)command).getCommand();
                    }
                    if (command instanceof InvalidateL1Command) {
                        InvalidateL1Command invalidateL1Command = (InvalidateL1Command)command;
                        log.tracef("Sending invalidation %s to %s", (Object)command, targets);
                        List realTargets = targets != null ? targets : DistAsyncFuncTest.this.cacheAddresses;
                        for (Address target : realTargets) {
                            DistAsyncFuncTest.this.expectedL1Invalidations.computeIfAbsent(target, ignored -> Collections.synchronizedList(new ArrayList())).add(invalidateL1Command);
                        }
                    }
                    return super.performRequest(targets, command, collector, invoker, rpcOptions);
                }
            });
        }
    }

    @AfterMethod
    public void resetEagerCommands() {
        for (ReplListener rl : this.r) {
            rl.resetEager();
        }
        this.expectedL1Invalidations.clear();
    }

    @Override
    protected void asyncWait(Object key, Predicate<VisitableCommand> command) {
        if (key == null) {
            for (ReplListener rl : this.r) {
                rl.expect(command);
            }
            for (ReplListener rl : this.r) {
                rl.waitForRpc();
            }
        } else {
            for (Cache c : this.getOwners(key)) {
                this.listenerLookup.get(this.address(c)).expect(command);
                this.listenerLookup.get(this.address(c)).waitForRpc();
            }
        }
        this.waitForInvalidations();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void waitForInvalidations() {
        for (Map.Entry expected : this.expectedL1Invalidations.entrySet()) {
            Address address = (Address)expected.getKey();
            ReplListener replListener = this.listenerLookup.get(address);
            List list = (List)expected.getValue();
            if (list.isEmpty()) continue;
            log.tracef("Waiting for invalidations on %s: %s", (Object)address, (Object)list);
            List list2 = list;
            synchronized (list2) {
                for (InvalidateL1Command cmd : list) {
                    replListener.expect((Class<? extends VisitableCommand>)InvalidateL1Command.class);
                }
                list.clear();
            }
            replListener.waitForRpc();
        }
    }

    @Override
    protected void asyncWaitOnPrimary(Object key, Class<? extends VisitableCommand> command) {
        assert (key != null);
        Cache primary = this.getFirstOwner(key);
        this.listenerLookup.get(this.address(primary)).expect(command);
        this.listenerLookup.get(this.address(primary)).waitForRpc();
        this.waitForInvalidations();
    }
}

