/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.stress;

import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.infinispan.Cache;
import org.infinispan.commons.TimeoutException;
import org.infinispan.commons.executors.BlockingThreadPoolExecutorFactory;
import org.infinispan.commons.executors.ThreadPoolExecutorFactory;
import org.infinispan.commons.test.TestResourceTracker;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.distribution.ch.ConsistentHashFactory;
import org.infinispan.distribution.ch.impl.SyncConsistentHashFactory;
import org.infinispan.manager.CacheContainer;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CleanupAfterTest;
import org.jgroups.conf.ConfiguratorFactory;
import org.jgroups.conf.ProtocolConfiguration;
import org.jgroups.conf.ProtocolStackConfigurator;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;

@CleanupAfterTest
@Test(groups={"stress"}, testName="stress.LargeCluster2StressTest", timeOut=900000L)
public class LargeCluster2StressTest
extends MultipleCacheManagersTest {
    private static final int NUM_NODES = 10;
    private static final int NUM_CACHES = 100;
    private static final int NUM_THREADS = 200;
    private static final int NUM_SEGMENTS = 1000;
    private static final int TIMEOUT_SECONDS = 180;
    public static final int JGROUPS_MAX_THREADS = 50;
    public static final int TRANSPORT_MAX_THREADS = 10;
    public static final int TRANSPORT_QUEUE_SIZE = 1000;
    public static final int REMOTE_MAX_THREADS = 50;
    public static final int REMOTE_QUEUE_SIZE = 0;
    public static final int STATE_TRANSFER_MAX_THREADS = 10;
    public static final int STATE_TRANSFER_QUEUE_SIZE = 0;

    @Override
    protected void createCacheManagers() throws Throwable {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void testLargeClusterStart() throws Exception {
        final ProtocolStackConfigurator configurator = ConfiguratorFactory.getStackConfigurator((String)"default-configs/default-jgroups-udp.xml");
        ProtocolConfiguration udpConfiguration = (ProtocolConfiguration)configurator.getProtocolStack().get(0);
        AssertJUnit.assertEquals((String)"UDP", (String)udpConfiguration.getProtocolName());
        udpConfiguration.getProperties().put("mcast_addr", "239.0.0.15");
        udpConfiguration.getProperties().put("thread_pool.min_threads", "0");
        udpConfiguration.getProperties().put("thread_pool.max_threads", String.valueOf(50));
        ProtocolConfiguration gmsConfiguration = (ProtocolConfiguration)configurator.getProtocolStack().get(9);
        AssertJUnit.assertEquals((String)"pbcast.GMS", (String)gmsConfiguration.getProtocolName());
        gmsConfiguration.getProperties().put("join_timeout", "2000");
        final Configuration distConfig = new ConfigurationBuilder().clustering().cacheMode(CacheMode.DIST_SYNC).clustering().stateTransfer().awaitInitialTransfer(false).hash().consistentHashFactory((ConsistentHashFactory)new SyncConsistentHashFactory()).numSegments(1000).build();
        final Configuration replConfig = new ConfigurationBuilder().clustering().cacheMode(CacheMode.REPL_SYNC).clustering().hash().numSegments(1000).clustering().stateTransfer().awaitInitialTransfer(false).build();
        final CountDownLatch managersLatch = new CountDownLatch(10);
        ExecutorService executor = Executors.newFixedThreadPool(200, this.getTestThreadFactory("Worker"));
        ExecutorCompletionService<Void> managerCompletionService = new ExecutorCompletionService<Void>(executor);
        final ExecutorCompletionService cacheCompletionService = new ExecutorCompletionService(executor);
        try {
            Future future;
            int i;
            for (int nodeIndex = 0; nodeIndex < 10; ++nodeIndex) {
                final String nodeName = TestResourceTracker.getNameForIndex((int)nodeIndex);
                String machineId = "m" + nodeIndex / 2;
                managerCompletionService.submit(new Callable<Void>(){
                    final /* synthetic */ LargeCluster2StressTest this$0;
                    {
                        this.this$0 = this$0;
                    }

                    /*
                     * WARNING - Removed try catching itself - possible behaviour change.
                     */
                    @Override
                    public Void call() throws Exception {
                        GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder();
                        gcb.transport().defaultTransport().nodeName(nodeName).addProperty("configurationString", (Object)configurator.getProtocolStackString());
                        BlockingThreadPoolExecutorFactory transportExecutorFactory = new BlockingThreadPoolExecutorFactory(10, 10, 1000, 60000L);
                        gcb.transport().transportThreadPool().threadPoolFactory((ThreadPoolExecutorFactory)transportExecutorFactory);
                        BlockingThreadPoolExecutorFactory remoteExecutorFactory = new BlockingThreadPoolExecutorFactory(50, 50, 0, 60000L);
                        gcb.transport().remoteCommandThreadPool().threadPoolFactory((ThreadPoolExecutorFactory)remoteExecutorFactory);
                        DefaultCacheManager cm = new DefaultCacheManager(gcb.build());
                        try {
                            int i = 0;
                            while (i < 50) {
                                final int cacheIndex = i++;
                                cm.defineConfiguration("repl-cache-" + cacheIndex, replConfig);
                                cm.defineConfiguration("dist-cache-" + cacheIndex, distConfig);
                                cacheCompletionService.submit(new Callable<Void>(){
                                    final /* synthetic */ EmbeddedCacheManager val$cm;
                                    final /* synthetic */ 1 this$1;
                                    {
                                        this.val$cm = embeddedCacheManager;
                                        this.this$1 = this$1;
                                    }

                                    @Override
                                    public Void call() throws Exception {
                                        String cacheName = "repl-cache-" + cacheIndex;
                                        Thread.currentThread().setName(cacheName + "-start-thread," + nodeName);
                                        Cache replCache = this.val$cm.getCache(cacheName);
                                        return null;
                                    }
                                });
                                cacheCompletionService.submit(new Callable<Void>(){
                                    final /* synthetic */ EmbeddedCacheManager val$cm;
                                    final /* synthetic */ 1 this$1;
                                    {
                                        this.val$cm = embeddedCacheManager;
                                        this.this$1 = this$1;
                                    }

                                    @Override
                                    public Void call() throws Exception {
                                        String cacheName = "dist-cache-" + cacheIndex;
                                        Thread.currentThread().setName(cacheName + "-start-thread," + nodeName);
                                        Cache distCache = this.val$cm.getCache(cacheName);
                                        return null;
                                    }
                                });
                                managersLatch.countDown();
                            }
                        }
                        catch (Throwable throwable) {
                            this.this$0.registerCacheManager(new CacheContainer[]{cm});
                            throw throwable;
                        }
                        this.this$0.registerCacheManager(new CacheContainer[]{cm});
                        log.infof("Started cache manager %s", (Object)nodeName);
                        return null;
                    }
                });
            }
            long endTime = System.nanoTime() + TimeUnit.SECONDS.toNanos(180L);
            for (i = 0; i < 10; ++i) {
                future = managerCompletionService.poll(180L, TimeUnit.SECONDS);
                future.get(0L, TimeUnit.SECONDS);
                if (System.nanoTime() - endTime <= 0L) continue;
                throw new TimeoutException("Took too long to start the cluster");
            }
            i = 0;
            while (i < 1000) {
                future = cacheCompletionService.poll(1L, TimeUnit.SECONDS);
                if (future != null) {
                    future.get(0L, TimeUnit.SECONDS);
                    ++i;
                }
                if (System.nanoTime() - endTime <= 0L) continue;
                throw new TimeoutException("Took too long to start the cluster");
            }
        }
        finally {
            executor.shutdownNow();
        }
        log.infof("All %d cache managers started, waiting for state transfer to finish for each cache", (Object)10);
        for (int j = 0; j < 50; ++j) {
            this.waitForClusterToForm("repl-cache-" + j);
            this.waitForClusterToForm("dist-cache-" + j);
        }
    }

    @Test(dependsOnMethods={"testLargeClusterStart"})
    public void testLargeClusterStop() {
        for (int i = 0; i < 9; ++i) {
            int j;
            int killIndex = -1;
            for (j = 0; j < this.cacheManagers.size(); ++j) {
                if (!this.address(j).equals((Object)this.manager(0).getCoordinator())) continue;
                killIndex = j;
                break;
            }
            log.debugf("Killing coordinator %s", (Object)this.address(killIndex));
            this.manager(killIndex).stop();
            this.cacheManagers.remove(killIndex);
            if (this.cacheManagers.isEmpty()) continue;
            TestingUtil.blockUntilViewsReceived(60000, false, this.cacheManagers);
            for (j = 0; j < 50; ++j) {
                TestingUtil.waitForNoRebalance(this.caches("repl-cache-" + j));
                TestingUtil.waitForNoRebalance(this.caches("dist-cache-" + j));
            }
        }
    }

    @Override
    @AfterMethod
    protected void clearContent() throws Throwable {
    }
}

