/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.statetransfer;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.infinispan.Cache;
import org.infinispan.commons.test.TestResourceTracker;
import org.infinispan.commons.util.Util;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.manager.CacheContainer;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.partitionhandling.PartitionHandling;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.remoting.transport.jgroups.JGroupsTransport;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.infinispan.test.fwk.JGroupsConfigBuilder;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.infinispan.test.fwk.TransportFlags;
import org.jgroups.BytesMessage;
import org.jgroups.Header;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.blocks.RequestCorrelator;
import org.jgroups.conf.ClassConfigurator;
import org.jgroups.fork.ForkChannel;
import org.jgroups.fork.UnknownForkHandler;
import org.jgroups.protocols.FORK;
import org.jgroups.stack.Protocol;
import org.testng.annotations.Test;

@Test(testName="statetransfer.ForkChannelRestartTest", groups={"functional"})
@CleanupAfterMethod
public class ForkChannelRestartTest
extends MultipleCacheManagersTest {
    private static final byte[] FORK_NOT_FOUND_BUFFER = Util.EMPTY_BYTE_ARRAY;
    private static final String CACHE_NAME = "repl";
    private static final int CLUSTER_SIZE = 3;

    @Override
    protected void createCacheManagers() throws Throwable {
    }

    public void testRestart() throws Exception {
        int i;
        TestResourceTracker.testThreadStarted((String)this.getTestName());
        ConfigurationBuilder replCfg = new ConfigurationBuilder();
        replCfg.clustering().cacheMode(CacheMode.REPL_SYNC).stateTransfer().timeout(30L, TimeUnit.SECONDS);
        replCfg.clustering().partitionHandling().whenSplit(PartitionHandling.DENY_READ_WRITES);
        String[] names = new String[4];
        JChannel[] channels = new JChannel[4];
        EmbeddedCacheManager[] managers = new EmbeddedCacheManager[4];
        for (i = 0; i < 3; ++i) {
            this.configureManager(replCfg, names, channels, managers, i);
        }
        for (i = 0; i < 3; ++i) {
            managers[i].getCache(CACHE_NAME);
        }
        log.debugf("Cache managers created. Crashing manager %s but keeping the channel in the view", (Object)names[1]);
        TestingUtil.getDiscardForCache(managers[1]).discardAll(true);
        TestingUtil.installNewView(managers[1]);
        managers[1].stop();
        this.configureManager(replCfg, names, channels, managers, 3);
        Future<Cache> future = this.fork(() -> managers[3].getCache(CACHE_NAME));
        Thread.sleep(1000L);
        log.debugf("Stopping channel %s", (Object)names[1]);
        channels[1].close();
        ArrayList<EmbeddedCacheManager> liveManagers = new ArrayList<EmbeddedCacheManager>(Arrays.asList(managers));
        liveManagers.remove(1);
        TestingUtil.blockUntilViewsReceived(10000, false, liveManagers);
        TestingUtil.waitForNoRebalance(liveManagers.stream().map(cm -> cm.getCache(CACHE_NAME)).collect(Collectors.toList()));
        log.debug((Object)"Rebalance finished successfully");
        future.get(10L, TimeUnit.SECONDS);
    }

    private void configureManager(ConfigurationBuilder replCfg, String[] names, JChannel[] channels, EmbeddedCacheManager[] managers, int i) throws Exception {
        names[i] = TestResourceTracker.getNextNodeName();
        channels[i] = this.createChannel(names[i]);
        managers[i] = this.createCacheManager(replCfg, names[i], channels[i]);
        managers[i].defineConfiguration(CACHE_NAME, replCfg.build());
    }

    private EmbeddedCacheManager createCacheManager(ConfigurationBuilder cacheCfg, String name, JChannel channel) throws Exception {
        ForkChannel fch = new ForkChannel(channel, "stack1", "channel1", new Protocol[0]);
        GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder();
        gcb.transport().nodeName(name);
        gcb.transport().transport((Transport)new JGroupsTransport((JChannel)fch));
        gcb.transport().distributedSyncTimeout(40L, TimeUnit.SECONDS);
        EmbeddedCacheManager cm = TestCacheManagerFactory.newDefaultCacheManager(true, gcb, cacheCfg);
        this.registerCacheManager(new CacheContainer[]{cm});
        return cm;
    }

    private JChannel createChannel(final String name) throws Exception {
        String configString = JGroupsConfigBuilder.getJGroupsConfig(ForkChannelRestartTest.class.getName(), new TransportFlags().withFD(true));
        JChannel channel = new JChannel((InputStream)new ByteArrayInputStream(configString.getBytes()));
        TestResourceTracker.addResource((TestResourceTracker.Cleaner)new TestResourceTracker.Cleaner<JChannel>(channel){

            public void close() {
                ((JChannel)this.ref).close();
            }
        });
        channel.setName(name);
        final FORK fork = new FORK();
        fork.setUnknownForkHandler(new UnknownForkHandler(){
            final /* synthetic */ ForkChannelRestartTest this$0;
            {
                this.this$0 = this$0;
            }

            public Object handleUnknownForkStack(Message message, String forkStackId) {
                return this.handle(message);
            }

            public Object handleUnknownForkChannel(Message message, String forkChannelId) {
                return this.handle(message);
            }

            private Object handle(Message message) {
                short id = ClassConfigurator.getProtocolId(RequestCorrelator.class);
                RequestCorrelator.Header requestHeader = (RequestCorrelator.Header)message.getHeader(id);
                if (requestHeader != null) {
                    log.debugf("Sending CacheNotFoundResponse reply from %s for %s", (Object)name, (Object)requestHeader);
                    short flags = JGroupsTransport.REPLY_FLAGS;
                    Message response = new BytesMessage(message.getSrc()).setFlag(flags, false);
                    FORK.ForkHeader forkHeader = (FORK.ForkHeader)message.getHeader(FORK.ID);
                    response.putHeader(FORK.ID, (Header)forkHeader);
                    response.putHeader(id, (Header)new RequestCorrelator.Header(1, requestHeader.req_id, id));
                    response.setArray(FORK_NOT_FOUND_BUFFER);
                    fork.down(response);
                }
                return null;
            }
        });
        channel.getProtocolStack().addProtocol((Protocol)fork);
        channel.connect("FORKISPN");
        log.tracef("Channel %s connected: %s", (Object)channel, (Object)channel.getViewAsString());
        return channel;
    }
}

