/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.persistence.file;

import io.reactivex.rxjava3.core.Flowable;
import java.io.File;
import java.lang.invoke.CallSite;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.infinispan.Cache;
import org.infinispan.commons.test.CommonsTestingUtil;
import org.infinispan.commons.test.TestResourceTracker;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.commons.util.Util;
import org.infinispan.commons.util.concurrent.CompletionStages;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.cache.SingleFileStoreConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.marshall.persistence.impl.MarshalledEntryUtil;
import org.infinispan.persistence.file.SingleFileStore;
import org.infinispan.persistence.spi.MarshallableEntry;
import org.infinispan.test.SingleCacheManagerTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.reactivestreams.Publisher;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@Test(groups={"unit"}, testName="persistence.file.SingleFileStoreStressTest")
public class SingleFileStoreStressTest
extends SingleCacheManagerTest {
    /** Name under which the cache used by these tests is defined. */
    private static final String CACHE_NAME = "testCache";
    /** Ten-character unit that {@code times(int)} repeats to pad values. */
    private static final String TIMES_STRING = "123456789_";
    /** Number of hash segments configured for the cache. */
    public static final int NUM_SEGMENTS = 256;
    /** Immutable range set built from {@link #NUM_SEGMENTS}; passed when publishing entries. */
    public static final IntSet ALL_SEGMENTS = IntSets.immutableRangeSet(NUM_SEGMENTS);
    /** Temporary directory used as the global persistent location; assigned in {@code createCacheManager()}. */
    private String location;

    /**
     * Runs the superclass teardown and then deletes the directory stored in {@code location}.
     * <p>
     * The directory removal sits in a {@code finally} block so that on-disk state is cleaned
     * up even when the superclass teardown throws.
     */
    @Override
    protected void teardown() {
        try {
            super.teardown();
        } finally {
            // location stays null when createCacheManager() never got to assign it.
            if (this.location != null) {
                Util.recursiveFileRemove(this.location);
            }
        }
    }

    /**
     * Builds the cache manager shared by the tests in this class.
     * <p>
     * A temporary directory derived from this test class becomes the global persistent
     * location. The cache named {@code CACHE_NAME} is defined with {@code NUM_SEGMENTS} hash
     * segments and a single file store that has purge-on-startup enabled and segmentation
     * disabled.
     *
     * @return the configured cache manager
     * @throws Exception as declared by the overridden method
     */
    @Override
    protected EmbeddedCacheManager createCacheManager() throws Exception {
        this.location = CommonsTestingUtil.tmpDirectory(SingleFileStoreStressTest.class);

        GlobalConfigurationBuilder global = new GlobalConfigurationBuilder().nonClusteredDefault();
        global.globalState().enable().persistentLocation(this.location);

        ConfigurationBuilder config = new ConfigurationBuilder();
        config.clustering().hash().numSegments(NUM_SEGMENTS);
        SingleFileStoreConfigurationBuilder fileStore =
                (SingleFileStoreConfigurationBuilder) config.persistence().addSingleFileStore().purgeOnStartup(true);
        fileStore.segmented(false);

        EmbeddedCacheManager manager = TestCacheManagerFactory.createCacheManager(global, config);
        manager.defineConfiguration(CACHE_NAME, config.build());
        return manager;
    }

    /**
     * Returns a handle on the {@code testCache.dat} file inside the test's temporary directory.
     *
     * @return the file named after the cache with a {@code .dat} suffix, resolved against {@code location}
     */
    private File getFileStore() {
        String fileName = CACHE_NAME + ".dat";
        return new File(this.location, fileName);
    }

    /**
     * Runs writer and reader tasks concurrently against a store pre-populated with five keys.
     * <p>
     * The tasks run until the stop latch is released, which happens after at most two
     * seconds or earlier if one of the tasks fails. Waiting on the futures afterwards
     * surfaces any failure raised inside a task. Readers are created with nulls disallowed.
     *
     * @throws ExecutionException   if a forked task failed
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    public void testReadsAndWrites() throws ExecutionException, InterruptedException {
        final int writerThreads = 2;
        final int readerThreads = 2;

        Cache cache = this.cacheManager.getCache(CACHE_NAME);
        SingleFileStore store = (SingleFileStore) TestingUtil.getFirstStore(cache);
        long initialSize = (Long) CompletionStages.join((CompletionStage) store.size(IntSets.immutableSet(0)));
        AssertJUnit.assertEquals(0L, initialSize);

        List<String> keys = this.populateStore(5, 0, (SingleFileStore<String, String>) store, (Cache<String, String>) cache);
        CountDownLatch stopLatch = new CountDownLatch(1);

        List<Future<Object>> writers = new ArrayList<>(writerThreads);
        for (int w = 0; w < writerThreads; w++) {
            writers.add(this.fork(this.stopOnException(new WriteTask(store, cache, keys, stopLatch), stopLatch)));
        }
        List<Future<Object>> readers = new ArrayList<>(readerThreads);
        for (int r = 0; r < readerThreads; r++) {
            readers.add(this.fork(this.stopOnException(new ReadTask((SingleFileStore<String, String>) store, keys, false, stopLatch), stopLatch)));
        }

        // Let the workers run for up to two seconds (less if one of them fails), then stop them.
        stopLatch.await(2L, TimeUnit.SECONDS);
        stopLatch.countDown();

        for (Future<Object> writer : writers) {
            writer.get();
        }
        for (Future<Object> reader : readers) {
            reader.get();
        }
    }

    /**
     * Runs writer, reader and clear tasks concurrently against an initially empty store.
     * <p>
     * Five key names are generated but not written up front. Readers are created with nulls
     * allowed. The tasks run until the stop latch is released, after at most two seconds or
     * earlier if a task fails; waiting on the futures then surfaces any task failure.
     *
     * @throws ExecutionException   if a forked task failed
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    public void testWritesAndClear() throws ExecutionException, InterruptedException {
        final int writerThreads = 2;
        final int readerThreads = 2;
        final int numberOfKeys = 5;

        Cache cache = this.cacheManager.getCache(CACHE_NAME);
        SingleFileStore store = (SingleFileStore) TestingUtil.getFirstStore(cache);
        long initialSize = (Long) CompletionStages.join((CompletionStage) store.size(IntSets.immutableSet(0)));
        AssertJUnit.assertEquals(0L, initialSize);

        List<String> keys = new ArrayList<>(numberOfKeys);
        for (int k = 0; k < numberOfKeys; k++) {
            keys.add("key" + k);
        }

        CountDownLatch stopLatch = new CountDownLatch(1);
        List<Future<Object>> writers = new ArrayList<>(writerThreads);
        for (int w = 0; w < writerThreads; w++) {
            writers.add(this.fork(this.stopOnException(new WriteTask(store, cache, keys, stopLatch), stopLatch)));
        }
        List<Future<Object>> readers = new ArrayList<>(readerThreads);
        for (int r = 0; r < readerThreads; r++) {
            readers.add(this.fork(this.stopOnException(new ReadTask((SingleFileStore<String, String>) store, keys, true, stopLatch), stopLatch)));
        }
        Future<Object> clearer = this.fork(this.stopOnException(new ClearTask(store, stopLatch), stopLatch));

        // Let the workers run for up to two seconds (less if one of them fails), then stop them.
        stopLatch.await(2L, TimeUnit.SECONDS);
        stopLatch.countDown();

        for (Future<Object> writer : writers) {
            writer.get();
        }
        for (Future<Object> reader : readers) {
            reader.get();
        }
        clearer.get();
    }

    /**
     * Compares the store file size after repeated rewrites with and without purging.
     * <p>
     * The same 100 keys are written ten times with values that grow by one padding unit per
     * round. The file length is recorded after each round, first without purging and then,
     * after clearing the store, with a purge following every round. From the third round
     * onwards the purged file must be strictly smaller than the unpurged one.
     *
     * @throws InterruptedException kept in the signature for compatibility with existing callers
     */
    public void testSpaceOptimization() throws InterruptedException {
        final int numberOfKeys = 100;
        final int times = 10;

        Cache cache = this.cacheManager.getCache(CACHE_NAME);
        SingleFileStore store = (SingleFileStore) TestingUtil.getFirstStore(cache);
        AssertJUnit.assertEquals(0L, (long) ((Long) CompletionStages.join((CompletionStage) store.size(IntSets.immutableSet(0)))));

        long[] fileSizesWithoutPurge = new long[times];
        long[] fileSizesWithPurge = new long[times];
        File file = this.getFileStore();

        // Baseline: rewrite the keys with growing values and never purge.
        for (int i = 0; i < times; ++i) {
            this.populateStore(numberOfKeys, i, (SingleFileStore<String, String>) store, (Cache<String, String>) cache);
            fileSizesWithoutPurge[i] = file.length();
        }

        // NOTE(review): the result of clear() is not awaited, unlike the write calls that go
        // through CompletionStages.join - confirm whether clear() can complete asynchronously.
        store.clear();

        // Same workload, but purge after every round before measuring the file.
        for (int i = 0; i < times; ++i) {
            this.populateStore(numberOfKeys, i, (SingleFileStore<String, String>) store, (Cache<String, String>) cache);
            Flowable.fromPublisher((Publisher) store.purgeExpired()).blockingSubscribe();
            fileSizesWithPurge[i] = file.length();
        }

        // The first two rounds are deliberately not compared.
        // NOTE(review): presumably purging cannot yet shrink the file that early - confirm.
        for (int j = 2; j < times; ++j) {
            AssertJUnit.assertTrue(
                    String.format("Round %d: file size with purge %d should be smaller than without purge %d",
                            j, fileSizesWithPurge[j], fileSizesWithoutPurge[j]),
                    fileSizesWithPurge[j] < fileSizesWithoutPurge[j]);
        }
    }

    /**
     * Checks that purging never grows the store file and shrinks it after a large entry is deleted.
     * <p>
     * Phase one runs concurrent writers and readers over five pre-populated keys for up to
     * two seconds, then asserts that a purge does not increase the file length. Phase two
     * writes one extra entry whose value is as long as the store's reported file size,
     * deletes it, purges, and asserts that the file is strictly shorter than it was with
     * that entry present.
     *
     * @throws ExecutionException   if a forked task failed
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    public void testFileTruncation() throws ExecutionException, InterruptedException {
        final int writerThreads = 2;
        final int readerThreads = 2;
        final int numberOfKeys = 5;

        Cache cache = this.cacheManager.getCache(CACHE_NAME);
        SingleFileStore store = (SingleFileStore) TestingUtil.getFirstStore(cache);
        long initialSize = (Long) CompletionStages.join((CompletionStage) store.size(IntSets.immutableSet(0)));
        AssertJUnit.assertEquals(0L, initialSize);

        List<String> keys = this.populateStore(numberOfKeys, 0, (SingleFileStore<String, String>) store, (Cache<String, String>) cache);
        CountDownLatch stopLatch = new CountDownLatch(1);

        List<Future<Object>> writers = new ArrayList<>(writerThreads);
        for (int w = 0; w < writerThreads; w++) {
            writers.add(this.fork(this.stopOnException(new WriteTask(store, cache, keys, stopLatch), stopLatch)));
        }
        List<Future<Object>> readers = new ArrayList<>(readerThreads);
        for (int r = 0; r < readerThreads; r++) {
            readers.add(this.fork(this.stopOnException(new ReadTask((SingleFileStore<String, String>) store, keys, false, stopLatch), stopLatch)));
        }

        // Let the workers run for up to two seconds (less if one of them fails), then stop them.
        stopLatch.await(2L, TimeUnit.SECONDS);
        stopLatch.countDown();

        for (Future<Object> writer : writers) {
            writer.get();
        }
        for (Future<Object> reader : readers) {
            reader.get();
        }

        // Phase one: a purge after the concurrent load must not make the file longer.
        File file = this.getFileStore();
        long lengthBeforePurge = file.length();
        Flowable.fromPublisher((Publisher) store.purgeExpired()).blockingSubscribe();
        long lengthAfterPurge = file.length();
        AssertJUnit.assertTrue(
                String.format("Length1=%d, Length2=%d", lengthBeforePurge, lengthAfterPurge),
                lengthAfterPurge <= lengthBeforePurge);

        // Phase two: add one entry as large as the current store, remove it again, and purge.
        String extraKey = "key" + numberOfKeys;
        byte[] filler = new byte[(int) store.getFileSize()];
        Arrays.fill(filler, (byte) 97);
        CompletionStages.join((CompletionStage) store.write(0, MarshalledEntryUtil.create(extraKey, new String(filler), cache)));
        long lengthWithLargeEntry = file.length();
        CompletionStages.join((CompletionStage) store.delete(0, extraKey));
        Flowable.fromPublisher((Publisher) store.purgeExpired()).blockingSubscribe();
        long lengthAfterDelete = file.length();
        AssertJUnit.assertTrue(
                String.format("Length1=%d, Length2=%d", lengthWithLargeEntry, lengthAfterDelete),
                lengthAfterDelete < lengthWithLargeEntry);
    }

    /**
     * Writes {@code numKeys} entries to segment 0 of the store and returns the keys used.
     * <p>
     * Keys are {@code key0}, {@code key1}, ...; each value is the key followed by
     * {@code _value_}, the key index, and {@code numPadding} repetitions of the padding
     * unit. Every write is awaited before the next one starts.
     *
     * @param numKeys    number of entries to write
     * @param numPadding how many padding units to append to each value
     * @param store      store that receives the entries
     * @param cache      cache handed to {@code MarshalledEntryUtil.create} when building entries
     * @return the generated keys, in write order
     */
    public List<String> populateStore(int numKeys, int numPadding, SingleFileStore<String, String> store, Cache<String, String> cache) {
        List<String> written = new ArrayList<>(numKeys);
        int index = 0;
        while (index < numKeys) {
            String key = "key" + index;
            StringBuilder value = new StringBuilder(key)
                    .append("_value_")
                    .append(index)
                    .append(this.times(numPadding));
            written.add(key);
            CompletionStages.join((CompletionStage) store.write(0, MarshalledEntryUtil.create(key, value.toString(), cache)));
            index++;
        }
        return written;
    }

    /**
     * Iterates over all store entries, values included, while writer tasks keep updating them.
     * <p>
     * The store is first filled with 2000 keys holding randomly padded values. Two writers
     * are started, and a {@code ProcessTask} then walks every entry. Once that task has
     * finished, the writers are stopped and their futures checked for failures.
     *
     * @throws ExecutionException   if a forked task failed
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    public void testProcess() throws ExecutionException, InterruptedException {
        final int writerThreads = 2;
        final int numberOfKeys = 2000;

        Cache cache = this.cacheManager.getCache(CACHE_NAME);
        SingleFileStore store = (SingleFileStore) TestingUtil.getFirstStore(cache);
        long initialSize = (Long) CompletionStages.join((CompletionStage) store.size(IntSets.immutableSet(0)));
        AssertJUnit.assertEquals(0L, initialSize);

        List<String> keys = new ArrayList<>(numberOfKeys);
        this.populateStoreRandomValues(numberOfKeys, (SingleFileStore<String, String>) store, (Cache<String, String>) cache, keys);

        CountDownLatch stopLatch = new CountDownLatch(1);
        List<Future<Object>> writers = new ArrayList<>(writerThreads);
        for (int w = 0; w < writerThreads; w++) {
            writers.add(this.fork(this.stopOnException(new WriteTask(store, cache, keys, stopLatch), stopLatch)));
        }

        // The processing task decides how long the test runs; writers stop once it is done.
        Future<Object> processing = this.fork(this.stopOnException(new ProcessTask((SingleFileStore<String, String>) store), stopLatch));
        processing.get();
        stopLatch.countDown();

        for (Future<Object> writer : writers) {
            writer.get();
        }
    }

    /**
     * Iterates over all store entries without requesting values while writer tasks keep updating them.
     * <p>
     * The store is first filled with 2000 keys holding randomly padded values. Two writers
     * are started, and a {@code ProcessTaskNoDiskRead} then walks every entry. Once that task
     * has finished, the writers are stopped and their futures checked for failures.
     *
     * @throws ExecutionException   if a forked task failed
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    public void testProcessWithNoDiskAccess() throws ExecutionException, InterruptedException {
        final int writerThreads = 2;
        final int numberOfKeys = 2000;

        Cache cache = this.cacheManager.getCache(CACHE_NAME);
        SingleFileStore store = (SingleFileStore) TestingUtil.getFirstStore(cache);
        long initialSize = (Long) CompletionStages.join((CompletionStage) store.size(IntSets.immutableSet(0)));
        AssertJUnit.assertEquals(0L, initialSize);

        List<String> keys = new ArrayList<>(numberOfKeys);
        this.populateStoreRandomValues(numberOfKeys, (SingleFileStore<String, String>) store, (Cache<String, String>) cache, keys);

        CountDownLatch stopLatch = new CountDownLatch(1);
        List<Future<Object>> writers = new ArrayList<>(writerThreads);
        for (int w = 0; w < writerThreads; w++) {
            writers.add(this.fork(this.stopOnException(new WriteTask(store, cache, keys, stopLatch), stopLatch)));
        }

        // The processing task decides how long the test runs; writers stop once it is done.
        Future<Object> processing = this.fork(this.stopOnException(new ProcessTaskNoDiskRead(store), stopLatch));
        processing.get();
        stopLatch.countDown();

        for (Future<Object> writer : writers) {
            writer.get();
        }
    }

    /**
     * Writes {@code numberOfKeys} entries with randomly padded values to segment 0 of the store.
     * <p>
     * Keys are {@code key0}, {@code key1}, ...; each value is the key followed by
     * {@code _value_}, the key index, and between zero and nine padding units. Each key is
     * appended to {@code keys} and every write is awaited before the next one starts.
     *
     * @param numberOfKeys number of entries to write
     * @param store        store that receives the entries
     * @param cache        cache handed to {@code MarshalledEntryUtil.create} when building entries
     * @param keys         list that collects the generated keys, in write order
     */
    private void populateStoreRandomValues(int numberOfKeys, SingleFileStore<String, String> store, Cache<String, String> cache, List<String> keys) {
        // A single generator is shared by all iterations instead of allocating one per key.
        Random random = new Random();
        for (int j = 0; j < numberOfKeys; ++j) {
            String key = "key" + j;
            String value = key + "_value_" + j + this.times(random.nextInt(10));
            keys.add(key);
            CompletionStages.join((CompletionStage) store.write(0, MarshalledEntryUtil.create(key, value, cache)));
        }
    }

    /**
     * Wraps a task so that any failure it raises also releases the given latch.
     *
     * @param task      task to run
     * @param stopLatch latch to count down when {@code task} throws
     * @return a {@code StopOnExceptionTask} delegating to {@code task}
     */
    private Callable<Object> stopOnException(Callable<Object> task, CountDownLatch stopLatch) {
        Callable<Object> guarded = new StopOnExceptionTask(task, stopLatch);
        return guarded;
    }

    /**
     * Builds a string made of {@code count} repetitions of the padding unit.
     *
     * @param count number of repetitions; zero yields the empty string
     * @return the padding unit concatenated {@code count} times
     */
    private String times(int count) {
        if (count != 0) {
            // Size the builder up front for the final length.
            StringBuilder padding = new StringBuilder(TIMES_STRING.length() * count);
            int remaining = count;
            while (remaining > 0) {
                padding.append(TIMES_STRING);
                remaining--;
            }
            return padding.toString();
        }
        return "";
    }

    /**
     * Worker that repeatedly overwrites randomly chosen keys until the stop latch is released.
     * <p>
     * Each written value starts with the key, followed by {@code _value_}, a running
     * counter, an underscore and between 0 and 99 padding units, so value sizes vary from
     * one write to the next.
     */
    private class WriteTask implements Callable<Object> {
        final SingleFileStore<String, String> store;
        final Cache cache;
        final List<String> keys;
        final CountDownLatch stopLatch;

        /**
         * @param store     store that receives the writes
         * @param cache     cache handed to {@code MarshalledEntryUtil.create} when building entries
         * @param keys      keys to pick from at random
         * @param stopLatch latch whose release ends the write loop
         */
        WriteTask(SingleFileStore store, Cache cache, List<String> keys, CountDownLatch stopLatch) {
            this.store = store;
            this.cache = cache;
            this.keys = keys;
            this.stopLatch = stopLatch;
        }

        /**
         * Writes to segment 0 in a loop until the stop latch reaches zero, awaiting each write.
         *
         * @return always {@code null}
         * @throws Exception if a write fails
         */
        @Override
        public Object call() throws Exception {
            TestResourceTracker.testThreadStarted(SingleFileStoreStressTest.this.getTestName());
            Random random = new Random();
            int i = 0;
            while (this.stopLatch.getCount() != 0L) {
                String key = this.keys.get(random.nextInt(this.keys.size()));
                // nextInt(1000) / 10 gives a padding count between 0 and 99.
                String value = key + "_value_" + i + "_" + SingleFileStoreStressTest.this.times(random.nextInt(1000) / 10);
                // Key and value are both strings, so the entry is typed accordingly.
                MarshallableEntry<String, String> entry = MarshalledEntryUtil.create(key, value, this.cache);
                CompletionStages.join(this.store.write(0, entry));
                ++i;
            }
            return null;
        }
    }

    /**
     * Worker that repeatedly loads randomly chosen keys until the stop latch is released.
     * <p>
     * Every entry found must carry the requested key and a value that starts with that key.
     * A missing entry is accepted only when the task was created with {@code allowNulls}.
     */
    private static class ReadTask implements Callable<Object> {
        final boolean allowNulls;
        final CountDownLatch stopLatch;
        final List<String> keys;
        final SingleFileStore<String, String> store;

        /**
         * @param store      store to read from
         * @param keys       keys to pick from at random
         * @param allowNulls whether a load may legitimately find no entry
         * @param stopLatch  latch whose release ends the read loop
         */
        ReadTask(SingleFileStore<String, String> store, List<String> keys, boolean allowNulls, CountDownLatch stopLatch) {
            this.allowNulls = allowNulls;
            this.stopLatch = stopLatch;
            this.keys = keys;
            this.store = store;
        }

        /**
         * Loads from segment 0 in a loop until the stop latch reaches zero, checking each result.
         *
         * @return always {@code null}
         * @throws Exception if a load fails
         */
        @Override
        public Object call() throws Exception {
            Random random = new Random();
            while (this.stopLatch.getCount() != 0L) {
                String key = this.keys.get(random.nextInt(this.keys.size()));
                MarshallableEntry<String, String> entryFromStore = CompletionStages.join(this.store.load(0, key));
                if (entryFromStore == null) {
                    AssertJUnit.assertTrue("No entry found for " + key + " although nulls are not allowed", this.allowNulls);
                    continue;
                }
                String storeValue = entryFromStore.getValue();
                AssertJUnit.assertEquals(key, entryFromStore.getKey());
                AssertJUnit.assertTrue("Value '" + storeValue + "' does not start with key " + key, storeValue.startsWith(key));
            }
            return null;
        }
    }

    /**
     * Worker that repeatedly clears the store until the stop latch is released.
     * <p>
     * After each clear it compares the length of the file on disk with the size the store
     * reports, and fails if the file is the larger of the two.
     */
    private class ClearTask implements Callable<Object> {
        final CountDownLatch stopLatch;
        SingleFileStore<String, String> store;

        /**
         * @param store     store to clear
         * @param stopLatch latch whose release ends the clear loop
         */
        ClearTask(SingleFileStore store, CountDownLatch stopLatch) {
            this.store = store;
            this.stopLatch = stopLatch;
        }

        /**
         * Clears the store roughly every 100 ms until the stop latch reaches zero.
         *
         * @return always {@code null}
         * @throws Exception if the thread is interrupted while sleeping or a check fails
         */
        @Override
        public Object call() throws Exception {
            File dataFile = SingleFileStoreStressTest.this.getFileStore();
            AssertJUnit.assertTrue(dataFile.exists());

            // Initial pause before the first clear.
            TimeUnit.MILLISECONDS.sleep(100L);
            while (this.stopLatch.getCount() > 0L) {
                log.tracef("Clearing store, store size before = %d, file size before = %d", this.store.getFileSize(), dataFile.length());
                this.store.clear();
                TimeUnit.MILLISECONDS.sleep(1L);

                // Read the on-disk length first, then the size reported by the store.
                long fileSizeAfterClear = dataFile.length();
                long storeSizeAfterClear = this.store.getFileSize();
                log.tracef("Cleared store, store size after = %d, file size after = %d", storeSizeAfterClear, fileSizeAfterClear);

                boolean fileFitsInStoreSize = fileSizeAfterClear <= storeSizeAfterClear;
                AssertJUnit.assertTrue("Store size " + storeSizeAfterClear + " is smaller than the file size " + fileSizeAfterClear, fileFitsInStoreSize);
                TimeUnit.MILLISECONDS.sleep(100L);
            }
            return null;
        }
    }

    /**
     * Task that publishes every entry of the store, values included, and checks that each
     * value begins with its key.
     */
    private class ProcessTask implements Callable<Object> {
        final SingleFileStore<String, String> store;

        /**
         * @param store store whose entries are iterated
         */
        ProcessTask(SingleFileStore<String, String> store) {
            this.store = store;
        }

        /**
         * Iterates all segments once, validating each entry, and logs how many were seen.
         *
         * @return always {@code null}
         * @throws Exception if publishing fails or an entry does not pass validation
         */
        @Override
        public Object call() throws Exception {
            File file = SingleFileStoreStressTest.this.getFileStore();
            AssertJUnit.assertTrue(file.exists());
            // The publisher is passed without a raw cast so the element type is kept and the
            // callback can call getKey()/getValue() on each entry.
            Long processed = Flowable.fromPublisher(this.store.publishEntries(ALL_SEGMENTS, null, true))
                    .doOnNext(me -> {
                        String key = me.getKey();
                        String value = me.getValue();
                        AssertJUnit.assertEquals(key, value.substring(0, key.length()));
                    })
                    .count()
                    .blockingGet();
            log.tracef("Processed %d entries from the store", (Object) processed);
            return null;
        }
    }

    /**
     * Task that publishes every entry of the store without requesting values and checks that
     * each entry has a non-null key.
     */
    private class ProcessTaskNoDiskRead implements Callable<Object> {
        final SingleFileStore<?, ?> store;

        /**
         * @param store store whose entries are iterated
         */
        ProcessTaskNoDiskRead(SingleFileStore<?, ?> store) {
            this.store = store;
        }

        /**
         * Iterates all segments once with the include-values flag set to {@code false},
         * validating each key, and logs how many entries were seen.
         *
         * @return always {@code null}
         * @throws Exception if publishing fails or an entry has a null key
         */
        @Override
        public Object call() throws Exception {
            File file = SingleFileStoreStressTest.this.getFileStore();
            AssertJUnit.assertTrue(file.exists());
            // The publisher is passed without a raw cast so the element type is kept and the
            // callback can call getKey() on each entry.
            Long processed = Flowable.fromPublisher(this.store.publishEntries(ALL_SEGMENTS, null, false))
                    .doOnNext(me -> {
                        Object key = me.getKey();
                        AssertJUnit.assertNotNull(key);
                    })
                    .count()
                    .blockingGet();
            log.tracef("Processed %d in-memory keys from the store", (Object) processed);
            return null;
        }
    }

    /**
     * Decorator that releases a latch whenever the wrapped task fails.
     * <p>
     * On success the delegate's result is returned unchanged and the latch is left alone.
     * On any {@link Throwable} the latch is counted down and the failure is rethrown
     * wrapped in a plain {@link Exception}.
     */
    private static class StopOnExceptionTask implements Callable<Object> {
        final CountDownLatch stopLatch;
        final Callable<Object> delegate;

        /**
         * @param delegate  task to run
         * @param stopLatch latch to count down when {@code delegate} throws
         */
        StopOnExceptionTask(Callable<Object> delegate, CountDownLatch stopLatch) {
            this.delegate = delegate;
            this.stopLatch = stopLatch;
        }

        /**
         * Runs the delegate.
         *
         * @return whatever the delegate returns
         * @throws Exception wrapping the original failure as its cause
         */
        @Override
        public Object call() throws Exception {
            Object outcome;
            try {
                outcome = this.delegate.call();
            } catch (Throwable failure) {
                // Signal the other workers to stop before propagating the failure.
                this.stopLatch.countDown();
                throw new Exception(failure);
            }
            return outcome;
        }
    }
}

