/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.xsite.statetransfer.failures;

import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.infinispan.distribution.DistributionTestHelper;
import org.infinispan.manager.CacheContainer;
import org.infinispan.remoting.transport.AbstractDelegatingTransport;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.remoting.transport.XSiteResponse;
import org.infinispan.remoting.transport.impl.XSiteResponseImpl;
import org.infinispan.test.AbstractInfinispanTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CheckPoint;
import org.infinispan.util.BlockingLocalTopologyManager;
import org.infinispan.xsite.XSiteBackup;
import org.infinispan.xsite.commands.remote.XSiteRequest;
import org.infinispan.xsite.statetransfer.XSiteProviderDelegator;
import org.infinispan.xsite.statetransfer.XSiteStateProvider;
import org.infinispan.xsite.statetransfer.XSiteStatePushCommand;
import org.infinispan.xsite.statetransfer.failures.AbstractTopologyChangeTest;
import org.testng.annotations.Test;

@Test(groups={"xsite", "unstable"}, testName="xsite.statetransfer.failures.SiteProviderTopologyChangeTest")
public class SiteProviderTopologyChangeTest
extends AbstractTopologyChangeTest {
    public void testJoinAfterXSiteST() throws Exception {
        this.doTopologyChangeAfterXSiteStateTransfer(AbstractTopologyChangeTest.TopologyEvent.JOIN);
    }

    public void testLeaveAfterXSiteST() throws Exception {
        this.doTopologyChangeAfterXSiteStateTransfer(AbstractTopologyChangeTest.TopologyEvent.LEAVE);
    }

    public void testCoordinatorLeaveAfterXSiteST() throws Exception {
        this.doTopologyChangeAfterXSiteStateTransfer(AbstractTopologyChangeTest.TopologyEvent.COORDINATOR_LEAVE);
    }

    public void testSiteMasterLeaveAfterXSiteST() throws Exception {
        this.doTopologyChangeAfterXSiteStateTransfer(AbstractTopologyChangeTest.TopologyEvent.SITE_MASTER_LEAVE);
    }

    public void testJoinDuringXSiteST() throws Exception {
        this.doTopologyChangeDuringXSiteStateTransfer(AbstractTopologyChangeTest.TopologyEvent.JOIN);
    }

    public void testLeaveDuringXSiteST() throws Exception {
        this.doTopologyChangeDuringXSiteStateTransfer(AbstractTopologyChangeTest.TopologyEvent.LEAVE);
    }

    public void testCoordinatorLeaveDuringXSiteST() throws Exception {
        this.doTopologyChangeDuringXSiteStateTransfer(AbstractTopologyChangeTest.TopologyEvent.COORDINATOR_LEAVE);
    }

    public void testSiteMasterLeaveDuringXSiteST() throws Exception {
        this.doTopologyChangeDuringXSiteStateTransfer(AbstractTopologyChangeTest.TopologyEvent.SITE_MASTER_LEAVE);
    }

    public void testXSiteSTDuringJoin() throws Exception {
        this.doXSiteStateTransferDuringTopologyChange(AbstractTopologyChangeTest.TopologyEvent.JOIN);
    }

    public void testXSiteSTDuringLeave() throws Exception {
        this.doXSiteStateTransferDuringTopologyChange(AbstractTopologyChangeTest.TopologyEvent.LEAVE);
    }

    @Test(groups={"xsite", "unstable"}, description="See ISPN-6749")
    public void testXSiteSTDuringSiteMasterLeave() throws Exception {
        this.doXSiteStateTransferDuringTopologyChange(AbstractTopologyChangeTest.TopologyEvent.SITE_MASTER_LEAVE);
    }

    private void doTopologyChangeAfterXSiteStateTransfer(AbstractTopologyChangeTest.TopologyEvent event) throws Exception {
        log.debugf("Start topology change after x-site state transfer with %s", (Object)event);
        this.initBeforeTest();
        log.debug((Object)"Setting blocking conditions");
        AbstractTopologyChangeTest.TestCaches testCaches = this.createTestCache(event, "LON-1");
        final AtomicReference<Object> pendingRequest = new AtomicReference<Object>(null);
        log.debugf("Controlled cache=%s, Coordinator cache=%s, Cache to remove=%s", (Object)DistributionTestHelper.addressOf(testCaches.controllerCache), (Object)DistributionTestHelper.addressOf(testCaches.coordinator), (Object)(testCaches.removeIndex < 0 ? "NONE" : DistributionTestHelper.addressOf(this.cache("LON-1", testCaches.removeIndex))));
        if (testCaches.removeIndex >= 0) {
            log.debugf("Discard x-site state transfer start command in cache %s to remove", (Object)DistributionTestHelper.addressOf(this.cache("LON-1", testCaches.removeIndex)));
            TestingUtil.wrapComponent(this.cache("LON-1", testCaches.removeIndex), XSiteStateProvider.class, (wrapOn, current) -> new XSiteProviderDelegator((XSiteStateProvider)current){

                @Override
                public void startStateTransfer(String siteName, Address requestor, int minTopologyId) {
                    log.debugf("Discard state transfer request to %s from %s", (Object)siteName, (Object)requestor);
                }
            }, true);
        } else {
            log.debugf("Block x-site state transfer start command in cache %s", (Object)DistributionTestHelper.addressOf(this.cache("LON-1", 1)));
            TestingUtil.wrapComponent(this.cache("LON-1", 1), XSiteStateProvider.class, (wrapOn, current) -> new XSiteProviderDelegator((XSiteStateProvider)current){

                @Override
                public void startStateTransfer(String siteName, Address requestor, int minTopologyId) {
                    log.debugf("Blocking state transfer request to %s from %s", (Object)siteName, (Object)requestor);
                    pendingRequest.set(new StateTransferRequest(siteName, requestor, minTopologyId, this.xSiteStateProvider));
                }
            }, true);
        }
        log.debug((Object)"Start x-site state transfer");
        this.startStateTransfer(testCaches.coordinator, "NYC-2");
        this.assertOnline("LON-1", "NYC-2");
        log.debug((Object)"Await until X-Site state transfer is finished!");
        SiteProviderTopologyChangeTest.eventually(() -> TestingUtil.extractComponent(testCaches.controllerCache, XSiteStateProvider.class).getCurrentStateSending().isEmpty(), TimeUnit.SECONDS.toMillis(30L));
        Future<Void> topologyEventFuture = this.triggerTopologyChange("LON-1", testCaches.removeIndex);
        topologyEventFuture.get();
        this.awaitLocalStateTransfer("LON-1");
        if (pendingRequest.get() != null) {
            log.debug((Object)"Let the blocked x-site state transfer request to proceed");
            ((StateTransferRequest)pendingRequest.get()).execute();
        }
        this.awaitXSiteStateSent("LON-1");
        log.debug((Object)"Check data in both sites.");
        this.assertData();
    }

    private void doTopologyChangeDuringXSiteStateTransfer(AbstractTopologyChangeTest.TopologyEvent event) throws Exception {
        log.debugf("Start topology change during x-site state transfer with %s", (Object)event);
        this.initBeforeTest();
        AbstractTopologyChangeTest.TestCaches testCaches = this.createTestCache(event, "LON-1");
        log.debugf("Controlled cache=%s, Coordinator cache=%s, Cache to remove=%s", (Object)DistributionTestHelper.addressOf(testCaches.controllerCache), (Object)DistributionTestHelper.addressOf(testCaches.coordinator), (Object)(testCaches.removeIndex < 0 ? "NONE" : DistributionTestHelper.addressOf(this.cache("LON-1", testCaches.removeIndex))));
        final CheckPoint checkPoint = new CheckPoint();
        final AtomicBoolean firstChunk = new AtomicBoolean(false);
        TestingUtil.wrapGlobalComponent((CacheContainer)testCaches.controllerCache.getCacheManager(), Transport.class, new TestingUtil.WrapFactory<Transport, Transport, CacheContainer>(){

            @Override
            public Transport wrap(CacheContainer wrapOn, Transport current) {
                return new AbstractDelegatingTransport(current){

                    public void start() {
                    }

                    public <O> XSiteResponse<O> backupRemotely(XSiteBackup backup, XSiteRequest<O> rpcCommand) {
                        if (rpcCommand instanceof XSiteStatePushCommand && firstChunk.compareAndSet(false, true)) {
                            checkPoint.trigger("before-second-chunk");
                            try {
                                checkPoint.awaitStrict("second-chunk", 30L, TimeUnit.SECONDS);
                            }
                            catch (InterruptedException | TimeoutException e) {
                                XSiteResponseImpl rsp = new XSiteResponseImpl(AbstractInfinispanTest.TIME_SERVICE, backup);
                                rsp.completeExceptionally((Throwable)e);
                                return rsp;
                            }
                        }
                        return super.backupRemotely(backup, rpcCommand);
                    }
                };
            }
        }, true);
        log.debug((Object)"Start x-site state transfer");
        this.startStateTransfer(testCaches.coordinator, "NYC-2");
        this.assertOnline("LON-1", "NYC-2");
        checkPoint.awaitStrict("before-second-chunk", 30L, TimeUnit.SECONDS);
        Future<Void> topologyEventFuture = this.triggerTopologyChange("LON-1", testCaches.removeIndex);
        topologyEventFuture.get();
        checkPoint.triggerForever("second-chunk");
        this.awaitLocalStateTransfer("LON-1");
        this.awaitXSiteStateSent("LON-1");
        this.assertData();
    }

    private void doXSiteStateTransferDuringTopologyChange(AbstractTopologyChangeTest.TopologyEvent event) throws Exception {
        log.debugf("Start topology change during x-site state transfer with %s", (Object)event);
        this.initBeforeTest();
        AbstractTopologyChangeTest.TestCaches testCaches = this.createTestCache(event, "LON-1");
        log.debugf("Controlled cache=%s, Coordinator cache=%s, Cache to remove=%s", (Object)DistributionTestHelper.addressOf(testCaches.controllerCache), (Object)DistributionTestHelper.addressOf(testCaches.coordinator), (Object)(testCaches.removeIndex < 0 ? "NONE" : DistributionTestHelper.addressOf(this.cache("LON-1", testCaches.removeIndex))));
        BlockingLocalTopologyManager topologyManager = BlockingLocalTopologyManager.replaceTopologyManagerDefaultCache(testCaches.controllerCache.getCacheManager());
        Future<Void> topologyEventFuture = this.triggerTopologyChange("LON-1", testCaches.removeIndex);
        BlockingLocalTopologyManager.BlockedTopology blockedTopology = topologyManager.expectTopologyUpdate();
        log.debug((Object)"Start x-site state transfer");
        this.startStateTransfer(testCaches.coordinator, "NYC-2");
        this.assertOnline("LON-1", "NYC-2");
        blockedTopology.unblock();
        topologyEventFuture.get();
        this.awaitLocalStateTransfer("LON-1");
        this.awaitXSiteStateSent("LON-1");
        this.assertData();
    }

    private static class StateTransferRequest {
        private final String siteName;
        private final Address requestor;
        private final int minTopologyId;
        private final XSiteStateProvider provider;

        private StateTransferRequest(String siteName, Address requestor, int minTopologyId, XSiteStateProvider provider) {
            this.siteName = siteName;
            this.requestor = requestor;
            this.minTopologyId = minTopologyId;
            this.provider = provider;
        }

        public void execute() {
            this.provider.startStateTransfer(this.siteName, this.requestor, this.minTopologyId);
        }
    }
}

