/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.util;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.infinispan.commons.TimeoutException;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.manager.CacheContainer;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.test.TestException;
import org.infinispan.test.TestingUtil;
import org.infinispan.topology.CacheTopology;
import org.infinispan.topology.LocalTopologyManager;
import org.infinispan.util.AbstractControlledLocalTopologyManager;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.testng.AssertJUnit;

public class BlockingLocalTopologyManager
extends AbstractControlledLocalTopologyManager {
    private static final Log log = LogFactory.getLog(BlockingLocalTopologyManager.class);
    private static final int TIMEOUT_SECONDS = 10;
    private final Address address;
    private final String expectedCacheName;
    private final BlockingQueue<Event> queuedTopologies = new LinkedBlockingQueue<Event>();
    private volatile boolean enabled = true;
    private volatile RuntimeException exception;

    private BlockingLocalTopologyManager(LocalTopologyManager delegate, Address address, String cacheName) {
        super(delegate);
        this.address = address;
        this.expectedCacheName = cacheName;
    }

    public static BlockingLocalTopologyManager replaceTopologyManager(EmbeddedCacheManager cacheContainer, String cacheName) {
        LocalTopologyManager manager = TestingUtil.extractGlobalComponent((CacheContainer)cacheContainer, LocalTopologyManager.class);
        BlockingLocalTopologyManager controlledLocalTopologyManager = new BlockingLocalTopologyManager(manager, cacheContainer.getAddress(), cacheName);
        TestingUtil.replaceComponent((CacheContainer)cacheContainer, LocalTopologyManager.class, controlledLocalTopologyManager, true);
        return controlledLocalTopologyManager;
    }

    public static BlockingLocalTopologyManager replaceTopologyManagerDefaultCache(EmbeddedCacheManager cacheContainer) {
        return BlockingLocalTopologyManager.replaceTopologyManager(cacheContainer, TestingUtil.getDefaultCacheName(cacheContainer));
    }

    public static void confirmTopologyUpdate(CacheTopology.Phase phase, BlockingLocalTopologyManager ... topologyManagers) throws InterruptedException {
        for (BlockingLocalTopologyManager topologyManager : topologyManagers) {
            topologyManager.expectTopologyUpdate(phase).unblock();
        }
        if (BlockingLocalTopologyManager.needConfirmation(phase)) {
            for (BlockingLocalTopologyManager topologyManager : topologyManagers) {
                topologyManager.expectPhaseConfirmation().unblock();
            }
        }
    }

    public static void finishRebalance(CacheTopology.Phase nextPhase, BlockingLocalTopologyManager ... topologyManagers) throws InterruptedException {
        switch (nextPhase) {
            case READ_OLD_WRITE_ALL: {
                BlockingLocalTopologyManager.confirmTopologyUpdate(CacheTopology.Phase.READ_OLD_WRITE_ALL, topologyManagers);
            }
            case READ_ALL_WRITE_ALL: {
                BlockingLocalTopologyManager.confirmTopologyUpdate(CacheTopology.Phase.READ_ALL_WRITE_ALL, topologyManagers);
            }
            case READ_NEW_WRITE_ALL: {
                BlockingLocalTopologyManager.confirmTopologyUpdate(CacheTopology.Phase.READ_NEW_WRITE_ALL, topologyManagers);
            }
            case NO_REBALANCE: {
                BlockingLocalTopologyManager.confirmTopologyUpdate(CacheTopology.Phase.NO_REBALANCE, topologyManagers);
            }
        }
    }

    public BlockedTopology expectTopologyUpdate(CacheTopology.Phase phase) throws InterruptedException {
        BlockedTopology blockedTopology = this.expectTopologyUpdate();
        AssertJUnit.assertNotSame((String)"Expected a CH_UPDATE or REBALANCE_START, but got a CONFIRMATION", (Object)((Object)blockedTopology.getType()), (Object)((Object)Type.CONFIRMATION));
        AssertJUnit.assertEquals((Object)phase, (Object)blockedTopology.getCacheTopology().getPhase());
        return blockedTopology;
    }

    public BlockedTopology expectTopologyUpdate(CacheTopology.Phase phase, int topologyId) throws InterruptedException {
        BlockedTopology blockedTopology = this.expectTopologyUpdate();
        AssertJUnit.assertEquals((int)topologyId, (int)blockedTopology.getCacheTopology().getTopologyId());
        AssertJUnit.assertEquals((Object)phase, (Object)blockedTopology.getCacheTopology().getPhase());
        return blockedTopology;
    }

    public BlockedTopology expectTopologyUpdate() throws InterruptedException {
        Event update = this.queuedTopologies.poll(10L, TimeUnit.SECONDS);
        if (update == null) {
            throw new TimeoutException("Timed out waiting for topology update on " + String.valueOf(this.address));
        }
        return new BlockedTopology(update);
    }

    public BlockedConfirmation expectPhaseConfirmation() throws InterruptedException {
        Event update = this.queuedTopologies.poll(10L, TimeUnit.SECONDS);
        if (update == null) {
            throw new TimeoutException("Timed out waiting for phase confirmation on " + String.valueOf(this.address));
        }
        AssertJUnit.assertEquals((Object)((Object)Type.CONFIRMATION), (Object)((Object)update.type));
        return new BlockedConfirmation(update);
    }

    public BlockedConfirmation expectPhaseConfirmation(int topologyId) throws InterruptedException {
        BlockedConfirmation blockedConfirmation = this.expectPhaseConfirmation();
        AssertJUnit.assertEquals((int)topologyId, (int)blockedConfirmation.getTopologyId());
        return blockedConfirmation;
    }

    public void confirmTopologyUpdate(CacheTopology.Phase phase) throws InterruptedException {
        this.expectTopologyUpdate(phase).unblock();
        if (BlockingLocalTopologyManager.needConfirmation(phase)) {
            this.expectPhaseConfirmation().unblock();
        }
    }

    public void expectNoTopologyUpdate(long timeout, TimeUnit timeUnit) throws InterruptedException {
        Event update = this.queuedTopologies.poll(timeout, timeUnit);
        if (update != null) {
            throw new TestException("Expected no topology update on " + String.valueOf(this.address) + ", but got " + String.valueOf((Object)update.type) + " " + update.topologyId);
        }
    }

    public BlockedTopology expectRebalanceStartAfterLeave() throws InterruptedException {
        BlockedTopology topology0 = this.expectTopologyUpdate();
        if (topology0.getType() == Type.REBALANCE_START) {
            this.expectTopologyUpdate(CacheTopology.Phase.NO_REBALANCE).unblock();
        } else {
            topology0.unblock();
            topology0 = this.expectTopologyUpdate(CacheTopology.Phase.READ_OLD_WRITE_ALL);
        }
        return topology0;
    }

    private static boolean needConfirmation(CacheTopology.Phase phase) {
        return phase == CacheTopology.Phase.READ_OLD_WRITE_ALL || phase == CacheTopology.Phase.READ_ALL_WRITE_ALL || phase == CacheTopology.Phase.READ_NEW_WRITE_ALL;
    }

    public void stopBlocking() {
        this.enabled = false;
        if (this.exception != null) {
            throw this.exception;
        }
        if (!this.queuedTopologies.isEmpty()) {
            log.error((Object)("Stopped blocking topology updates, but there are " + this.queuedTopologies.size() + " blocked updates in the queue: " + String.valueOf(this.queuedTopologies)));
        }
        log.debugf("Stopped blocking topology updates", new Object[0]);
    }

    @Override
    protected final CompletionStage<Void> beforeHandleTopologyUpdate(String cacheName, CacheTopology cacheTopology, int viewId) {
        if (!this.enabled || !this.expectedCacheName.equals(cacheName)) {
            return CompletableFutures.completedNull();
        }
        Event event = new Event(cacheTopology, cacheTopology.getTopologyId(), viewId, Type.CH_UPDATE);
        this.queuedTopologies.add(event);
        log.debugf("Blocking topology update for cache %s: %s", (Object)cacheName, (Object)cacheTopology);
        return event.whenUnblocked().thenRun(() -> log.debugf("Continue consistent hash update for cache %s: %s", (Object)cacheName, (Object)cacheTopology));
    }

    @Override
    protected final CompletionStage<Void> beforeHandleRebalance(String cacheName, CacheTopology cacheTopology, int viewId) {
        if (!this.enabled || !this.expectedCacheName.equals(cacheName)) {
            return CompletableFutures.completedNull();
        }
        Event event = new Event(cacheTopology, cacheTopology.getTopologyId(), viewId, Type.REBALANCE_START);
        this.queuedTopologies.add(event);
        log.debugf("Blocking rebalance start for cache %s: %s", (Object)cacheName, (Object)cacheTopology);
        return event.whenUnblocked().thenRun(() -> log.debugf("Continue rebalance start for cache %s: %s", (Object)cacheName, (Object)cacheTopology));
    }

    @Override
    protected final CompletionStage<Void> beforeConfirmRebalancePhase(String cacheName, int topologyId, Throwable throwable) {
        if (!this.enabled || !this.expectedCacheName.equals(cacheName)) {
            return CompletableFutures.completedNull();
        }
        Event event = new Event(null, topologyId, -1, Type.CONFIRMATION);
        this.queuedTopologies.add(event);
        log.debugf("Blocking rebalance confirmation for cache %s: %s", (Object)cacheName, (Object)topologyId);
        return event.whenUnblocked().thenRun(() -> log.debugf("Continue rebalance confirmation for cache %s: %s", (Object)cacheName, (Object)topologyId));
    }

    void failManager(Throwable e) {
        this.exception = e instanceof RuntimeException ? (RuntimeException)e : new TestException(e);
    }

    public class BlockedTopology {
        private final Event event;

        BlockedTopology(Event event) {
            this.event = event;
        }

        public CacheTopology getCacheTopology() {
            return this.event.cacheTopology;
        }

        public CacheTopology.Phase getPhase() {
            return this.event.cacheTopology.getPhase();
        }

        public int getViewId() {
            return this.event.viewId;
        }

        public Type getType() {
            return this.event.type;
        }

        public void unblock() {
            this.event.unblock();
        }
    }

    public class BlockedConfirmation {
        private final Event event;

        BlockedConfirmation(Event event) {
            this.event = event;
        }

        public int getTopologyId() {
            return this.event.topologyId;
        }

        public void unblock() {
            this.event.unblock();
        }
    }

    public static enum Type {
        CH_UPDATE,
        REBALANCE_START,
        CONFIRMATION;

    }

    class Event {
        final CacheTopology cacheTopology;
        final int topologyId;
        final int viewId;
        final Type type;
        private final CompletableFuture<Void> latch = new CompletableFuture();

        Event(CacheTopology cacheTopology, int topologyId, int viewId, Type type) {
            this.cacheTopology = cacheTopology;
            this.topologyId = topologyId;
            this.viewId = viewId;
            this.type = type;
        }

        void awaitUnblock() {
            try {
                this.latch.get(20L, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                this.fail(e);
            }
            catch (ExecutionException e) {
                this.fail(e.getCause());
            }
            catch (java.util.concurrent.TimeoutException e) {
                this.fail(e);
            }
        }

        CompletionStage<Void> whenUnblocked() {
            return this.latch;
        }

        void unblock() {
            if (this.latch.isCompletedExceptionally()) {
                this.latch.join();
            }
            log.tracef("Unblocking %s %d on %s", (Object)this.type, (Object)this.topologyId, (Object)BlockingLocalTopologyManager.this.address);
            this.latch.complete(null);
        }

        void fail(Throwable e) {
            if (this.latch.isCompletedExceptionally()) {
                this.latch.join();
            }
            log.errorf(e, "Failed waiting for test to unblock %s %d on %s", (Object)this.type, (Object)this.topologyId, (Object)BlockingLocalTopologyManager.this.address);
            BlockingLocalTopologyManager.this.failManager(e);
            this.latch.completeExceptionally(e);
        }

        public String toString() {
            return "Event{type=" + String.valueOf((Object)this.type) + ", topologyId=" + this.topologyId + ", viewId=" + this.viewId + "}";
        }
    }
}

