/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.stress;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.infinispan.Cache;
import org.infinispan.commons.io.ByteBufferFactory;
import org.infinispan.commons.io.ByteBufferFactoryImpl;
import org.infinispan.commons.marshall.Marshaller;
import org.infinispan.commons.test.CommonsTestingUtil;
import org.infinispan.commons.time.DefaultTimeService;
import org.infinispan.commons.time.TimeService;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.commons.util.ProcessorInfo;
import org.infinispan.commons.util.Util;
import org.infinispan.commons.util.concurrent.BlockingRejectedExecutionHandler;
import org.infinispan.commons.util.concurrent.CompletionStages;
import org.infinispan.commons.util.concurrent.NonBlockingRejectedExecutionHandler;
import org.infinispan.configuration.cache.SingleFileStoreConfiguration;
import org.infinispan.configuration.cache.SingleFileStoreConfigurationBuilder;
import org.infinispan.configuration.cache.StoreConfiguration;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.impl.InternalEntryFactory;
import org.infinispan.container.impl.InternalEntryFactoryImpl;
import org.infinispan.distribution.ch.KeyPartitioner;
import org.infinispan.distribution.ch.impl.SingleSegmentKeyPartitioner;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.factories.KnownComponentNames;
import org.infinispan.factories.impl.TestComponentAccessors;
import org.infinispan.factories.threads.NonBlockingThreadFactory;
import org.infinispan.marshall.TestObjectStreamMarshaller;
import org.infinispan.marshall.persistence.impl.MarshalledEntryFactoryImpl;
import org.infinispan.marshall.persistence.impl.MarshalledEntryUtil;
import org.infinispan.metadata.EmbeddedMetadata;
import org.infinispan.persistence.DummyInitializationContext;
import org.infinispan.persistence.PersistenceUtil;
import org.infinispan.persistence.async.AsyncNonBlockingStore;
import org.infinispan.persistence.dummy.DummyInMemoryStore;
import org.infinispan.persistence.dummy.DummyInMemoryStoreConfiguration;
import org.infinispan.persistence.dummy.DummyInMemoryStoreConfigurationBuilder;
import org.infinispan.persistence.file.SingleFileStore;
import org.infinispan.persistence.spi.InitializationContext;
import org.infinispan.persistence.spi.MarshallableEntry;
import org.infinispan.persistence.spi.MarshallableEntryFactory;
import org.infinispan.persistence.spi.NonBlockingStore;
import org.infinispan.persistence.spi.PersistenceException;
import org.infinispan.test.AbstractInfinispanTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.infinispan.util.concurrent.BlockingManager;
import org.infinispan.util.concurrent.BlockingManagerImpl;
import org.infinispan.util.concurrent.locks.impl.LockContainer;
import org.infinispan.util.concurrent.locks.impl.PerKeyLockContainer;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(testName="stress.AsyncStoreStressTest", groups={"stress"})
public class AsyncStoreStressTest
extends AbstractInfinispanTest {
    static final Log log = LogFactory.getLog(AsyncStoreStressTest.class);
    static final int CAPACITY = Integer.getInteger("size", 100000);
    static final int LOOP_FACTOR = 10;
    static final long RUNNING_TIME = Integer.getInteger("time", 1) * 60 * 1000;
    static final Random RANDOM = new Random(12345L);
    private volatile CountDownLatch latch;
    private List<String> keys = new ArrayList<String>();
    private final InternalEntryFactory entryFactory = new InternalEntryFactoryImpl();
    private final Map<Object, InternalCacheEntry> expectedState = new ConcurrentHashMap<Object, InternalCacheEntry>();
    private TestObjectStreamMarshaller marshaller;
    private ExecutorService nonBlockingExecutor;
    private ExecutorService blockingExecutor;
    private TimeService timeService;
    protected String location;
    private LockContainer locks;

    @BeforeClass(alwaysRun=true)
    void startMarshaller() {
        this.marshaller = new TestObjectStreamMarshaller();
        this.location = CommonsTestingUtil.tmpDirectory(this.getClass());
        this.nonBlockingExecutor = new ThreadPoolExecutor(0, ProcessorInfo.availableProcessors() * 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(KnownComponentNames.getDefaultQueueSize((String)"org.infinispan.executors.non-blocking")), (ThreadFactory)new NonBlockingThreadFactory(5, "%c-%n-p%f-t%t", "Test", "non-blocking"), NonBlockingRejectedExecutionHandler.getInstance());
        this.blockingExecutor = new ThreadPoolExecutor(0, 150, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(KnownComponentNames.getDefaultQueueSize((String)"org.infinispan.executors.blocking")), this.getTestThreadFactory("Blocking"), (RejectedExecutionHandler)BlockingRejectedExecutionHandler.getInstance());
        PerKeyLockContainer lockContainer = new PerKeyLockContainer();
        TestingUtil.inject(lockContainer, new DefaultTimeService());
        this.locks = lockContainer;
        this.timeService = new DefaultTimeService();
        TestingUtil.inject(this.locks, this.timeService, this.nonBlockingExecutor);
    }

    @AfterClass(alwaysRun=true)
    void stopMarshaller() throws InterruptedException {
        this.marshaller.stop();
        Util.recursiveFileRemove((String)this.location);
        if (this.nonBlockingExecutor != null) {
            this.nonBlockingExecutor.shutdown();
            this.nonBlockingExecutor.awaitTermination(10L, TimeUnit.SECONDS);
        }
        if (this.blockingExecutor != null) {
            this.blockingExecutor.shutdown();
            this.blockingExecutor.awaitTermination(10L, TimeUnit.SECONDS);
        }
    }

    private AsyncNonBlockingStore<Object, Object> createDummyAsyncStore() {
        DummyInMemoryStoreConfiguration dummyConfiguration = ((DummyInMemoryStoreConfigurationBuilder)((DummyInMemoryStoreConfigurationBuilder)TestCacheManagerFactory.getDefaultCacheConfiguration(false).persistence().addStore(DummyInMemoryStoreConfigurationBuilder.class)).segmented(false)).storeName("async2").create();
        return this.createAsyncStore(new DummyInMemoryStore(), (StoreConfiguration)dummyConfiguration);
    }

    private AsyncNonBlockingStore<Object, Object> createFileAsyncStore() {
        SingleFileStoreConfiguration singleFileStoreConfiguration = ((SingleFileStoreConfigurationBuilder)TestCacheManagerFactory.getDefaultCacheConfiguration(false).persistence().addSingleFileStore().location(this.location).segmented(false)).create();
        return this.createAsyncStore((NonBlockingStore)new SingleFileStore(), (StoreConfiguration)singleFileStoreConfiguration);
    }

    private AsyncNonBlockingStore<Object, Object> createAsyncStore(NonBlockingStore backendStore, StoreConfiguration storeConfiguration) throws PersistenceException {
        AsyncNonBlockingStore store = new AsyncNonBlockingStore(backendStore);
        BlockingManagerImpl blockingManager = new BlockingManagerImpl();
        TestingUtil.inject(blockingManager, new TestComponentAccessors.NamedComponent("org.infinispan.executors.non-blocking", this.nonBlockingExecutor), new TestComponentAccessors.NamedComponent("org.infinispan.executors.blocking", this.blockingExecutor));
        TestingUtil.startComponent(blockingManager);
        Cache cacheMock = (Cache)Mockito.mock(Cache.class, (Answer)Mockito.RETURNS_DEEP_STUBS);
        Mockito.when((Object)((KeyPartitioner)ComponentRegistry.componentOf((Cache)cacheMock, KeyPartitioner.class))).thenReturn((Object)SingleSegmentKeyPartitioner.getInstance());
        CompletionStages.join((CompletionStage)store.start((InitializationContext)new DummyInitializationContext(storeConfiguration, cacheMock, this.marshaller, (ByteBufferFactory)new ByteBufferFactoryImpl(), (MarshallableEntryFactory)new MarshalledEntryFactoryImpl((Marshaller)this.marshaller), this.nonBlockingExecutor, new GlobalConfigurationBuilder().globalState().persistentLocation(this.location).build(), (BlockingManager)blockingManager, null, (TimeService)new DefaultTimeService())));
        return store;
    }

    private Map<String, AsyncNonBlockingStore<Object, Object>> createAsyncStores() throws PersistenceException {
        TreeMap<String, AsyncNonBlockingStore<Object, Object>> stores = new TreeMap<String, AsyncNonBlockingStore<Object, Object>>();
        stores.put("Dummy-ASYNC", this.createDummyAsyncStore());
        stores.put("File-ASYNC", this.createFileAsyncStore());
        return stores;
    }

    @DataProvider(name="readWriteRemove")
    public Object[][] independentReadWriteRemoveParams() {
        return new Object[][]{{CAPACITY, 3 * CAPACITY, 9, 20, 1}, {CAPACITY, 3 * CAPACITY, 90, 1, 0}};
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(dataProvider="readWriteRemove")
    public void testReadWriteRemove(int capacity, int numKeys, int readerThreads, int writerThreads, int removerThreads) throws Exception {
        System.out.printf("Testing independent read/write/remove performance with capacity %d, keys %d, readers %d, writers %d, removers %d\n", capacity, numKeys, readerThreads, writerThreads, removerThreads);
        this.generateKeyList(numKeys);
        Map<String, AsyncNonBlockingStore<Object, Object>> stores = this.createAsyncStores();
        try {
            for (Map.Entry<String, AsyncNonBlockingStore<Object, Object>> e : stores.entrySet()) {
                this.mapTestReadWriteRemove(e.getKey(), e.getValue(), numKeys, readerThreads, writerThreads, removerThreads);
            }
        }
        finally {
            Iterator<AsyncNonBlockingStore<Object, Object>> it = stores.values().iterator();
            while (it.hasNext()) {
                AsyncNonBlockingStore<Object, Object> store = it.next();
                try {
                    CompletionStages.join((CompletionStage)store.stop());
                    it.remove();
                }
                catch (Exception ex) {
                    log.error((Object)"Failed to stop cache store", (Throwable)ex);
                }
            }
        }
        AssertJUnit.assertTrue((String)"Not all stores were properly shut down", (boolean)stores.isEmpty());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void mapTestReadWriteRemove(String name, AsyncNonBlockingStore<Object, Object> store, int numKeys, int readerThreads, int writerThreads, int removerThreads) throws Exception {
        NonBlockingStore delegate = store.delegate();
        try {
            System.out.printf("[store=%s] Warming up\n", name);
            this.runMapTestReadWriteRemove(name, (NonBlockingStore<Object, Object>)store, readerThreads, writerThreads, removerThreads, 1000L);
            System.out.printf("[store=%s] Testing...\n", name);
            TotalStats perf = this.runMapTestReadWriteRemove(name, (NonBlockingStore<Object, Object>)store, readerThreads, writerThreads, removerThreads, RUNNING_TIME);
            System.out.printf("[store=%s] Verify contents\n", name);
            AsyncStoreStressTest.eventually(() -> "Store contains: " + String.valueOf(PersistenceUtil.toKeySet((NonBlockingStore)delegate, (IntSet)IntSets.immutableSet((int)0), null)) + " but expected: " + String.valueOf(this.expectedState.keySet()), () -> PersistenceUtil.toKeySet((NonBlockingStore)delegate, (IntSet)IntSets.immutableSet((int)0), null).equals(this.expectedState.keySet()));
            System.out.printf("Container %-12s  ", name);
            System.out.printf("Ops/s %10.2f  ", perf.getTotalOpsPerSec());
            System.out.printf("Gets/s %10.2f  ", perf.getOpsPerSec("GET"));
            System.out.printf("Puts/s %10.2f  ", perf.getOpsPerSec("PUT"));
            System.out.printf("Removes/s %10.2f  ", perf.getOpsPerSec("REMOVE"));
            System.out.printf("HitRatio %10.2f  ", perf.getTotalHitRatio() * 100.0);
            System.out.printf("Size %10d  ", CompletionStages.join((CompletionStage)store.size(IntSets.immutableSet((int)0))), null);
            double stdDev = this.computeStdDev((NonBlockingStore)store, numKeys);
            System.out.printf("StdDev %10.2f\n", stdDev);
        }
        finally {
            this.expectedState.clear();
            CompletionStages.join((CompletionStage)delegate.clear());
        }
    }

    private TotalStats runMapTestReadWriteRemove(String name, NonBlockingStore<Object, Object> store, int numReaders, int numWriters, int numRemovers, long runningTimeout) throws Exception {
        int i;
        this.latch = new CountDownLatch(1);
        TotalStats perf = new TotalStats();
        LinkedList<WorkerThread> threads = new LinkedList<WorkerThread>();
        for (i = 0; i < numReaders; ++i) {
            WorkerThread workerThread = new WorkerThread("worker-" + name + "-get-" + i, runningTimeout, perf, this.readOperation(store));
            threads.add(workerThread);
        }
        for (i = 0; i < numWriters; ++i) {
            WorkerThread workerThread = new WorkerThread("worker-" + name + "-put-" + i, runningTimeout, perf, this.writeOperation(store));
            threads.add(workerThread);
        }
        for (i = 0; i < numRemovers; ++i) {
            WorkerThread workerThread = new WorkerThread("worker-" + name + "-remove-" + i, runningTimeout, perf, this.removeOperation(store));
            threads.add(workerThread);
        }
        for (Thread thread : threads) {
            thread.start();
        }
        this.latch.countDown();
        for (Thread thread : threads) {
            thread.join();
        }
        return perf;
    }

    private void generateKeyList(int numKeys) {
        this.keys = null;
        this.keys = new ArrayList<String>(numKeys * 10);
        for (int i = 0; i < numKeys * 10; ++i) {
            this.keys.add("key" + this.nextIntGaussian(numKeys));
        }
    }

    private int nextIntGaussian(int numKeys) {
        double gaussian = RANDOM.nextGaussian();
        if (gaussian < -3.0 || gaussian > 3.0) {
            return this.nextIntGaussian(numKeys);
        }
        return (int)Math.abs((gaussian + 3.0) * (double)numKeys / 6.0);
    }

    private void waitForStart() {
        try {
            this.latch.await();
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private Operation<String, Integer> readOperation(final NonBlockingStore<Object, Object> store) {
        return new Operation<String, Integer>(this, "GET"){
            final /* synthetic */ AsyncStoreStressTest this$0;
            {
                this.this$0 = this$0;
                super(name);
            }

            @Override
            public boolean call(String key, long run) {
                MarshallableEntry me = (MarshallableEntry)CompletionStages.join((CompletionStage)store.load(0, (Object)key));
                if (log.isTraceEnabled()) {
                    log.tracef("Loaded key=%s, value=%s", (Object)key, me != null ? me.getValue() : "null");
                }
                return me != null;
            }
        };
    }

    private Operation<String, Integer> writeOperation(final NonBlockingStore<Object, Object> store) {
        return new Operation<String, Integer>(this, "PUT"){
            final /* synthetic */ AsyncStoreStressTest this$0;
            {
                this.this$0 = this$0;
                super(name);
            }

            @Override
            public boolean call(String key, long run) {
                int value = (int)run;
                InternalCacheEntry entry = this.this$0.entryFactory.create((Object)key, (Object)value, new EmbeddedMetadata.Builder().build());
                return this.this$0.withKeyLock(key, () -> {
                    CompletionStages.join((CompletionStage)store.write(0, MarshalledEntryUtil.create(entry, (Marshaller)this.this$0.marshaller)));
                    this.this$0.expectedState.put(key, entry);
                    if (log.isTraceEnabled()) {
                        log.tracef("Expected state updated with key=%s, value=%s", (Object)key, (Object)value);
                    }
                    return true;
                });
            }
        };
    }

    private Operation<String, Integer> removeOperation(final NonBlockingStore<Object, Object> store) {
        return new Operation<String, Integer>(this, "REMOVE"){
            final /* synthetic */ AsyncStoreStressTest this$0;
            {
                this.this$0 = this$0;
                super(name);
            }

            @Override
            public boolean call(String key, long run) {
                return this.this$0.withKeyLock(key, () -> {
                    Boolean removed = (Boolean)CompletionStages.join((CompletionStage)store.delete(0, (Object)key));
                    AssertJUnit.assertNull((Object)removed);
                    this.this$0.expectedState.remove(key);
                    if (log.isTraceEnabled()) {
                        log.tracef("Expected state removed key=%s", (Object)key);
                    }
                    return true;
                });
            }
        };
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean withKeyLock(String key, Callable<Boolean> call) {
        boolean result = false;
        try {
            this.locks.acquire((Object)key, (Object)Thread.currentThread(), 5L, TimeUnit.SECONDS).lock();
            result = call.call();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        finally {
            this.locks.release((Object)key, (Object)Thread.currentThread());
        }
        return result;
    }

    private double computeStdDev(NonBlockingStore store, int numKeys) throws PersistenceException {
        double variance = 0.0;
        Set keys = PersistenceUtil.toKeySet((NonBlockingStore)store, (IntSet)IntSets.immutableSet((int)0), null);
        for (Object key : keys) {
            double value = Integer.parseInt(((String)key).substring(3));
            variance += (value - (double)(numKeys / 2)) * (value - (double)(numKeys / 2));
        }
        return Math.sqrt(variance / (double)keys.size());
    }

    @Test(enabled=false)
    public static void main(String[] args) throws Exception {
        AsyncStoreStressTest test = new AsyncStoreStressTest();
        test.testReadWriteRemove(100000, 300000, 90, 9, 1);
        test.testReadWriteRemove(10000, 30000, 9, 1, 0);
        System.exit(0);
    }

    private static class TotalStats {
        private final ConcurrentHashMap<String, OpStats> statsMap = new ConcurrentHashMap();

        private TotalStats() {
        }

        public void addStats(String opName, long opCount, long runningTime, long missCount) {
            boolean replaced;
            OpStats s = new OpStats(opName, opCount, runningTime, missCount);
            OpStats old = this.statsMap.putIfAbsent(opName, s);
            boolean bl = replaced = old == null;
            while (!replaced) {
                old = this.statsMap.get(opName);
                s = new OpStats(old, opCount, runningTime, missCount);
                replaced = this.statsMap.replace(opName, old, s);
            }
        }

        public double getOpsPerSec(String opName) {
            OpStats s = this.statsMap.get(opName);
            if (s == null) {
                return 0.0;
            }
            return (double)s.opCount * 1000.0 / (double)s.runningTime * (double)s.threadCount;
        }

        public double getTotalOpsPerSec() {
            long totalOpCount = 0L;
            long totalRunningTime = 0L;
            long totalThreadCount = 0L;
            for (Map.Entry<String, OpStats> e : this.statsMap.entrySet()) {
                OpStats s = e.getValue();
                totalOpCount += s.opCount;
                totalRunningTime += s.runningTime;
                totalThreadCount += (long)s.threadCount;
            }
            return (double)totalOpCount * 1000.0 / (double)totalRunningTime * (double)totalThreadCount;
        }

        public double getHitRatio(String opName) {
            OpStats s = this.statsMap.get(opName);
            if (s == null) {
                return 0.0;
            }
            return 1.0 - 1.0 * (double)s.missCount / (double)s.opCount;
        }

        public double getTotalHitRatio() {
            long totalOpCount = 0L;
            long totalMissCount = 0L;
            for (Map.Entry<String, OpStats> e : this.statsMap.entrySet()) {
                OpStats s = e.getValue();
                totalOpCount += s.opCount;
                totalMissCount += s.missCount;
            }
            return 1.0 - 1.0 * (double)totalMissCount / (double)totalOpCount;
        }
    }

    private class WorkerThread
    extends Thread {
        private final long runningTimeout;
        private final TotalStats perf;
        private final Operation<String, Integer> op;

        public WorkerThread(String name, long runningTimeout, TotalStats perf, Operation<String, Integer> op) {
            super(name);
            this.runningTimeout = runningTimeout;
            this.perf = perf;
            this.op = op;
        }

        @Override
        public void run() {
            AsyncStoreStressTest.this.waitForStart();
            long startMilis = System.currentTimeMillis();
            long endMillis = startMilis + this.runningTimeout;
            int keyIndex = RANDOM.nextInt(AsyncStoreStressTest.this.keys.size());
            long runs = 0L;
            long missCount = 0L;
            while ((runs & 0x3FFFL) != 0L || System.currentTimeMillis() < endMillis) {
                boolean hit = this.op.call(AsyncStoreStressTest.this.keys.get(keyIndex), runs);
                if (!hit) {
                    ++missCount;
                }
                ++runs;
                if (++keyIndex < AsyncStoreStressTest.this.keys.size()) continue;
                keyIndex = 0;
            }
            this.perf.addStats(this.op.getName(), runs, System.currentTimeMillis() - startMilis, missCount);
        }
    }

    private static abstract class Operation<K, V> {
        protected final String name;

        public Operation(String name) {
            this.name = name;
        }

        public abstract boolean call(K var1, long var2);

        public String getName() {
            return this.name;
        }
    }

    private static class OpStats {
        public final String opName;
        public final int threadCount;
        public final long opCount;
        public final long runningTime;
        public final long missCount;

        private OpStats(String opName, long opCount, long runningTime, long missCount) {
            this.opName = opName;
            this.threadCount = 1;
            this.opCount = opCount;
            this.runningTime = runningTime;
            this.missCount = missCount;
        }

        private OpStats(OpStats base, long opCount, long runningTime, long missCount) {
            this.opName = base.opName;
            this.threadCount = base.threadCount + 1;
            this.opCount = base.opCount + opCount;
            this.runningTime = base.runningTime + runningTime;
            this.missCount = base.missCount + missCount;
        }
    }
}

