/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.statetransfer;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.infinispan.Cache;
import org.infinispan.commons.test.TestResourceTracker;
import org.infinispan.commons.util.Util;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.manager.CacheContainer;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.remoting.transport.jgroups.JGroupsTransport;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.infinispan.test.fwk.JGroupsConfigBuilder;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.infinispan.test.fwk.TransportFlags;
import org.jgroups.BytesMessage;
import org.jgroups.Header;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.blocks.RequestCorrelator;
import org.jgroups.conf.ClassConfigurator;
import org.jgroups.fork.ForkChannel;
import org.jgroups.fork.UnknownForkHandler;
import org.jgroups.protocols.FORK;
import org.jgroups.stack.Protocol;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(testName="statetransfer.ConcurrentStartForkChannelTest", groups={"functional"})
@CleanupAfterMethod
public class ConcurrentStartForkChannelTest
extends MultipleCacheManagersTest {
    public static final byte[] FORK_NOT_FOUND_BUFFER = Util.EMPTY_BYTE_ARRAY;
    public static final String CACHE_NAME = "repl";

    @Override
    protected void createCacheManagers() throws Throwable {
    }

    @DataProvider(name="startOrder")
    public Object[][] startOrder() {
        return new Object[][]{{0, 1}, {1, 0}};
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeOut=30000L, dataProvider="startOrder")
    public void testConcurrentStart(int eagerManager, int lazyManager) throws Exception {
        TestResourceTracker.testThreadStarted((String)this.getTestName());
        ConfigurationBuilder replCfg = new ConfigurationBuilder();
        replCfg.clustering().cacheMode(CacheMode.REPL_SYNC).stateTransfer().timeout(30L, TimeUnit.SECONDS);
        String name1 = TestResourceTracker.getNextNodeName();
        String name2 = TestResourceTracker.getNextNodeName();
        JChannel ch1 = this.createChannel(name1);
        JChannel ch2 = this.createChannel(name2);
        EmbeddedCacheManager cm1 = this.createCacheManager(replCfg, name1, ch1);
        EmbeddedCacheManager cm2 = this.createCacheManager(replCfg, name2, ch2);
        cm1.defineConfiguration(CACHE_NAME, replCfg.build());
        cm2.defineConfiguration(CACHE_NAME, replCfg.build());
        try {
            log.debugf("Cache managers created. Starting the caches", new Object[0]);
            Future<Cache> c1rFuture = this.fork(() -> {
                EmbeddedCacheManager m = this.manager(eagerManager);
                m.start();
                return m.getCache(CACHE_NAME);
            });
            Thread.sleep(1000L);
            EmbeddedCacheManager m = this.manager(lazyManager);
            m.start();
            Cache c2r = m.getCache(CACHE_NAME);
            Cache c1r = c1rFuture.get(10L, TimeUnit.SECONDS);
            TestingUtil.blockUntilViewsReceived(10000L, new CacheContainer[]{cm1, cm2});
            TestingUtil.waitForNoRebalance(c1r, c2r);
        }
        finally {
            cm1.stop();
            ch1.close();
            cm2.stop();
            ch2.close();
        }
    }

    private EmbeddedCacheManager createCacheManager(ConfigurationBuilder cacheCfg, String name, JChannel channel) throws Exception {
        final FORK fork = new FORK();
        fork.setUnknownForkHandler(new UnknownForkHandler(){
            final /* synthetic */ ConcurrentStartForkChannelTest this$0;
            {
                this.this$0 = this$0;
            }

            public Object handleUnknownForkStack(Message message, String forkStackId) {
                return this.handle(message);
            }

            public Object handleUnknownForkChannel(Message message, String forkChannelId) {
                return this.handle(message);
            }

            private Object handle(Message message) {
                short id = ClassConfigurator.getProtocolId(RequestCorrelator.class);
                RequestCorrelator.Header header = (RequestCorrelator.Header)message.getHeader(id);
                if (header != null) {
                    log.debugf("Sending CacheNotFoundResponse reply for %s", (Object)header);
                    short flags = JGroupsTransport.REPLY_FLAGS;
                    Message response = new BytesMessage(message.getSrc()).setFlag(flags, false);
                    response.putHeader(FORK.ID, message.getHeader(FORK.ID));
                    response.putHeader(id, (Header)new RequestCorrelator.Header(1, header.req_id, id));
                    response.setArray(FORK_NOT_FOUND_BUFFER);
                    fork.down(response);
                }
                return null;
            }
        });
        channel.getProtocolStack().addProtocol((Protocol)fork);
        ForkChannel fch = new ForkChannel(channel, "stack1", "channel1", new Protocol[0]);
        GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder();
        gcb.transport().transport((Transport)new JGroupsTransport((JChannel)fch));
        gcb.transport().nodeName(channel.getName());
        gcb.transport().distributedSyncTimeout(30L, TimeUnit.SECONDS);
        EmbeddedCacheManager cm = TestCacheManagerFactory.newDefaultCacheManager(false, gcb, cacheCfg);
        this.registerCacheManager(new CacheContainer[]{cm});
        return cm;
    }

    private JChannel createChannel(String name) throws Exception {
        String configString = JGroupsConfigBuilder.getJGroupsConfig(ConcurrentStartForkChannelTest.class.getName(), new TransportFlags());
        JChannel channel = new JChannel((InputStream)new ByteArrayInputStream(configString.getBytes()));
        channel.setName(name);
        channel.connect(ConcurrentStartForkChannelTest.class.getSimpleName());
        log.tracef("Channel %s connected: %s", (Object)channel, (Object)channel.getViewAsString());
        return channel;
    }
}

