/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.persistence.support;

import java.lang.reflect.Method;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.infinispan.Cache;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.marshall.Marshaller;
import org.infinispan.commons.test.Exceptions;
import org.infinispan.commons.test.TestResourceTracker;
import org.infinispan.commons.util.IntSets;
import org.infinispan.commons.util.concurrent.CompletionStages;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.marshall.TestObjectStreamMarshaller;
import org.infinispan.marshall.persistence.impl.MarshalledEntryUtil;
import org.infinispan.persistence.async.AsyncNonBlockingStore;
import org.infinispan.persistence.dummy.DummyInMemoryStore;
import org.infinispan.persistence.dummy.DummyInMemoryStoreConfigurationBuilder;
import org.infinispan.persistence.spi.InitializationContext;
import org.infinispan.persistence.spi.MarshallableEntry;
import org.infinispan.persistence.spi.NonBlockingStore;
import org.infinispan.persistence.spi.PersistenceException;
import org.infinispan.persistence.support.DelayStore;
import org.infinispan.persistence.support.DelegatingInitializationContext;
import org.infinispan.test.AbstractInfinispanTest;
import org.infinispan.test.CacheManagerCallable;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.infinispan.test.fwk.TestInternalCacheEntryFactory;
import org.infinispan.util.PersistenceMockUtil;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups={"unit"}, testName="persistence.support.AsyncStoreTest")
public class AsyncStoreTest
extends AbstractInfinispanTest {
    private static final Log log = LogFactory.getLog(AsyncStoreTest.class);

    /** Segment count used when mapping keys to segments; same value as the modulus in {@code segmentForKey}. */
    private static final int CACHE_SEGMENT_MAX = 256;

    /** Store under test; assigned by the individual test methods (e.g. via {@code createStore()}). */
    private AsyncNonBlockingStore<Object, Object> store;

    /** Marshaller recreated before every test method and stopped again in {@code tearDown()}. */
    private TestObjectStreamMarshaller marshaller;

    /**
     * Builds an {@link AsyncNonBlockingStore} on top of a fresh {@link DummyInMemoryStore},
     * starts it, and stores it in the {@code store} field.
     *
     * @return the initialization context the store was started with, so a test can
     *         restart the store with the same context later
     * @throws PersistenceException declared for callers; propagated from store start-up
     */
    private InitializationContext createStore() throws PersistenceException {
        ConfigurationBuilder builder = TestCacheManagerFactory.getDefaultCacheConfiguration(false);
        DummyInMemoryStoreConfigurationBuilder dummyCfg =
                (DummyInMemoryStoreConfigurationBuilder) ((DummyInMemoryStoreConfigurationBuilder) builder.persistence()
                        .addStore(DummyInMemoryStoreConfigurationBuilder.class))
                        .asyncOperation(true)
                        .storeName(AsyncStoreTest.class.getName())
                        .segmented(false);
        dummyCfg.async().enable();

        final InitializationContext testCtx =
                PersistenceMockUtil.createContext(this.getClass(), builder.build(), this.marshaller);

        // Wrap the mock context so that the store's non-blocking work runs on the test executor.
        DelegatingInitializationContext ctx = new DelegatingInitializationContext() {
            @Override
            public InitializationContext delegate() {
                return testCtx;
            }

            @Override
            public Executor getNonBlockingExecutor() {
                return AsyncStoreTest.this.testExecutor();
            }
        };

        DummyInMemoryStore underlying = new DummyInMemoryStore();
        this.store = new AsyncNonBlockingStore(underlying);
        CompletionStages.join(this.store.start(ctx));
        return ctx;
    }

    /**
     * Gives every test method its own {@link TestObjectStreamMarshaller} instance.
     */
    @BeforeMethod
    public void createMarshalledEntryFactory() {
        marshaller = new TestObjectStreamMarshaller();
    }

    /**
     * Stops the store created by the test (if any) and then the marshaller.
     *
     * <p>The marshaller is stopped in a {@code finally} block so that a failure while
     * stopping the store cannot leak it, and the store reference is cleared so a later
     * test method that does not create a store does not stop a stale one again.
     *
     * @throws PersistenceException declared for callers; propagated from store shutdown
     */
    @AfterMethod
    public void tearDown() throws PersistenceException {
        try {
            if (this.store != null) {
                CompletionStages.join(this.store.stop());
            }
        } finally {
            this.store = null;
            this.marshaller.stop();
        }
    }

    /**
     * Writes 1000 entries through the async store, verifies they can be loaded,
     * then deletes them and verifies they can no longer be loaded.
     */
    @Test(timeOut = 30000L)
    public void testPutRemove() throws Exception {
        TestResourceTracker.testThreadStarted(getTestName());
        createStore();

        final int entryCount = 1000;
        final String keyPrefix = "testPutRemove-k-";
        final String valuePrefix = "testPutRemove-v-";

        doTestPut(entryCount, keyPrefix, valuePrefix);
        doTestRemove(entryCount, keyPrefix);
    }

    /**
     * Repeats a small put/remove cycle 2000 times against the same store.
     *
     * <p>Every iteration is attempted even if an earlier one failed. If any iteration
     * raised an {@link Error}, the test fails with the number of failed iterations and
     * the first error attached as the cause, so the original stack trace is not lost.
     */
    @Test(timeOut = 30000L)
    public void testRepeatedPutRemove() throws Exception {
        TestResourceTracker.testThreadStarted(getTestName());
        createStore();

        final int number = 10;
        final int loops = 2000;
        final String key = "testRepeatedPutRemove-k-";
        final String value = "testRepeatedPutRemove-v-";

        int failures = 0;
        Error firstFailure = null;
        for (int i = 0; i < loops; ++i) {
            try {
                doTestPut(number, key, value);
                doTestRemove(number, key);
            } catch (Error e) {
                // Keep going so all iterations are counted, but remember the first cause.
                if (firstFailure == null) {
                    firstFailure = e;
                }
                ++failures;
            }
        }

        if (firstFailure != null) {
            throw new AssertionError(failures + " of " + loops + " put/remove iterations failed", firstFailure);
        }
    }

    /**
     * Writes 1000 entries, clears the store, writes the same keys again with different
     * values, and finally removes them, verifying the visible state after each step.
     */
    @Test(timeOut = 30000L)
    public void testPutClearPut() throws Exception {
        TestResourceTracker.testThreadStarted(getTestName());
        createStore();

        final int entryCount = 1000;
        final String keyPrefix = "testPutClearPut-k-";

        doTestPut(entryCount, keyPrefix, "testPutClearPut-v-");
        doTestClear(entryCount, keyPrefix);

        // Second round reuses the keys with a distinguishable value prefix.
        doTestPut(entryCount, keyPrefix, "testPutClearPut-v[2]-");
        doTestRemove(entryCount, keyPrefix);
    }

    /**
     * Repeats a small put / clear / put cycle 2000 times against the same store.
     *
     * <p>Every iteration is attempted even if an earlier one failed. If any iteration
     * raised an {@link Error}, the test fails with the number of failed iterations and
     * the first error attached as the cause, so the original stack trace is not lost.
     */
    @Test(timeOut = 30000L)
    public void testRepeatedPutClearPut() throws Exception {
        TestResourceTracker.testThreadStarted(getTestName());
        createStore();

        final int number = 10;
        final int loops = 2000;
        final String key = "testRepeatedPutClearPut-k-";
        final String value = "testRepeatedPutClearPut-v-";
        final String value2 = "testRepeatedPutClearPut-v[2]-";

        int failures = 0;
        Error firstFailure = null;
        for (int i = 0; i < loops; ++i) {
            try {
                doTestPut(number, key, value);
                doTestClear(number, key);
                doTestPut(number, key, value2);
            } catch (Error e) {
                // Keep going so all iterations are counted, but remember the first cause.
                if (firstFailure == null) {
                    firstFailure = e;
                }
                ++failures;
            }
        }

        if (firstFailure != null) {
            throw new AssertionError(failures + " of " + loops + " put/clear/put iterations failed", firstFailure);
        }
    }

    /**
     * Overwrites a single key 1000 times, checks that the last value wins,
     * then removes the key.
     */
    @Test(timeOut = 30000L)
    public void testMultiplePutsOnSameKey() throws Exception {
        TestResourceTracker.testThreadStarted(getTestName());
        createStore();

        final int writeCount = 1000;
        final String sharedKey = "testMultiplePutsOnSameKey-k";
        final String valuePrefix = "testMultiplePutsOnSameKey-v-";

        doTestSameKeyPut(writeCount, sharedKey, valuePrefix);
        doTestSameKeyRemove(sharedKey);
    }

    /**
     * Checks that a write issued after the store has been stopped is rejected with a
     * {@link CacheException}, and that the store works again once restarted with the
     * same context.
     */
    @Test(timeOut = 30000L)
    public void testRestrictionOnAddingToAsyncQueue() throws Exception {
        TestResourceTracker.testThreadStarted(getTestName());
        final InitializationContext context = createStore();
        store.delete(0, "blah");

        final int entryCount = 10;
        final String keyPrefix = "testRestrictionOnAddingToAsyncQueue-k";
        final String valuePrefix = "testRestrictionOnAddingToAsyncQueue-v-";
        doTestPut(entryCount, keyPrefix, valuePrefix);

        CompletionStages.join(store.stop());
        try {
            store.write(0, MarshalledEntryUtil.create("k", (Marshaller) marshaller));
            AssertJUnit.fail("Should have restricted this entry from being made");
        } catch (CacheException expected) {
            // Expected: the stopped store must refuse the write.
        }

        // Bring the store back so the remaining entries can be removed.
        CompletionStages.join(store.start(context));
        doTestRemove(entryCount, keyPrefix);
    }

    /**
     * Maps a key to a segment index derived from the hash of its string form.
     *
     * <p>{@link Math#floorMod(int, int)} is used instead of {@code %} because
     * {@code String.hashCode()} can be negative, and {@code %} would then yield a
     * negative (invalid) segment index.
     *
     * @param key the key to map; {@code null} is treated as the string {@code "null"}
     * @return a segment index in the range {@code [0, 256)}
     */
    public int segmentForKey(Object key) {
        // 256 is the same value as the CACHE_SEGMENT_MAX field of this class.
        return Math.floorMod(String.valueOf(key).hashCode(), 256);
    }

    /**
     * Writes {@code number} entries named {@code key + i} / {@code value + i} to the
     * store and then loads each of them back, asserting the loaded value.
     *
     * @param number how many entries to write and verify
     * @param key    key prefix; the loop index is appended
     * @param value  value prefix; the loop index is appended
     */
    private void doTestPut(int number, String key, String value) {
        // Phase 1: issue all writes without waiting for the returned stages.
        for (int idx = 0; idx < number; idx++) {
            InternalCacheEntry<Object, Object> entry = TestInternalCacheEntryFactory.create(key + idx, value + idx);
            int segment = segmentForKey(entry.getKey());
            store.write(segment, MarshalledEntryUtil.create(entry, (Marshaller) marshaller));
        }

        // Phase 2: every key must be readable with the value just written.
        for (int idx = 0; idx < number; idx++) {
            String fullKey = key + idx;
            MarshallableEntry<Object, Object> loaded = CompletionStages.join(store.load(segmentForKey(fullKey), fullKey));
            AssertJUnit.assertNotNull(loaded);
            AssertJUnit.assertEquals((Object) (value + idx), loaded.getValue());
        }
    }

    /**
     * Writes {@code number} successive values to one key and asserts that a load
     * returns the value from the final write.
     *
     * @param number how many writes to issue
     * @param key    the single key that is overwritten
     * @param value  value prefix; the write index is appended
     */
    private void doTestSameKeyPut(int number, String key, String value) {
        final int keySegment = segmentForKey(key);
        int written = 0;
        while (written < number) {
            store.write(keySegment, MarshalledEntryUtil.create(key, value + written, (Marshaller) marshaller));
            written++;
        }

        MarshallableEntry<Object, Object> latest = CompletionStages.join(store.load(keySegment, key));
        AssertJUnit.assertNotNull(latest);
        String expectedValue = value + (number - 1);
        AssertJUnit.assertEquals((Object) expectedValue, latest.getValue());
    }

    /**
     * Deletes the {@code number} entries named {@code key + i} and then asserts that
     * loading each of them yields {@code null}.
     *
     * @param number how many entries to delete and verify
     * @param key    key prefix; the loop index is appended
     */
    private void doTestRemove(int number, String key) throws Exception {
        // NOTE(review): segment 0 is used here although doTestPut writes with
        // segmentForKey(...) - confirm this is intended.
        final int segment = 0;

        for (int idx = 0; idx < number; idx++) {
            store.delete(segment, key + idx);
        }
        for (int idx = 0; idx < number; idx++) {
            Object loaded = CompletionStages.join(store.load(segment, key + idx));
            AssertJUnit.assertNull(loaded);
        }
    }

    /**
     * Deletes a single key (using segment 0) and asserts it can no longer be loaded.
     *
     * @param key the key to delete
     */
    private void doTestSameKeyRemove(String key) {
        final int segment = 0;
        store.delete(segment, key);
        Object loaded = CompletionStages.join(store.load(segment, key));
        AssertJUnit.assertNull(loaded);
    }

    /**
     * Clears the store, waits for the clear to complete, and asserts that none of the
     * {@code number} keys named {@code key + i} can be loaded (using segment 0).
     *
     * @param number how many keys to verify
     * @param key    key prefix; the loop index is appended
     */
    private void doTestClear(int number, String key) throws Exception {
        CompletionStages.join(store.clear());

        int idx = 0;
        while (idx < number) {
            Object loaded = CompletionStages.join(store.load(0, key + idx));
            AssertJUnit.assertNull(loaded);
            idx++;
        }
    }

    /**
     * Verifies how the async store behaves when its modification queue is full.
     *
     * <p>An {@link AsyncNonBlockingStore} is started on top of a {@link DelayStore} with a
     * configured modification queue size of 5. A forked task performs {@code queueSize}
     * writes (joining each one) followed by one extra write, and asserts that the extra
     * write's stage is not yet done. The main thread then checks that the forked task is
     * still blocked, releases the delay, and expects the task to finish and the
     * underlying store to hold {@code queueSize + 1} entries.
     *
     * <p>NOTE: the decompiler (CFR) emitted the warning "Removed try catching itself -
     * possible behaviour change" for this method, so the try/finally structure may differ
     * from the original source.
     *
     * @param m the test method; used to derive keys and values via {@code TestingUtil.k/v}
     */
    public void testModificationQueueSize(Method m) throws Exception {
        int queueSize = 5;
        DelayStore underlying = new DelayStore();
        ConfigurationBuilder builder = TestCacheManagerFactory.getDefaultCacheConfiguration(false);
        ((DelayStore.ConfigurationBuilder)builder.persistence().addStore(DelayStore.ConfigurationBuilder.class)).async().modificationQueueSize(queueSize);
        this.store = new AsyncNonBlockingStore((NonBlockingStore)underlying);
        InitializationContext ctx = PersistenceMockUtil.createContext(this.getClass(), builder.build(), this.marshaller);
        CompletionStages.join((CompletionStage)this.store.start(ctx));
        // NOTE(review): presumably arms the DelayStore to hold back the next
        // (queueSize + 2) modifications until endDelay() - confirm against DelayStore.
        underlying.delayAfterModification(queueSize + 2);
        try {
            // Counted down by the forked task once it has issued the extra write.
            CountDownLatch queueFullLatch = new CountDownLatch(1);
            Future<Void> f = this.fork(() -> {
                // These writes are each joined before the next one is issued.
                for (int i = 0; i < queueSize; ++i) {
                    CompletionStages.join((CompletionStage)this.store.write(0, MarshalledEntryUtil.create(TestingUtil.k(m, i), TestingUtil.v(m, i), (Marshaller)this.marshaller)));
                }
                // One more write than the queue size: its stage must not be complete yet.
                // (Index queueSize + 1 is used for the key/value, so index queueSize is never written.)
                CompletionStage blockedWrite = this.store.write(0, MarshalledEntryUtil.create(TestingUtil.k(m, queueSize + 1), TestingUtil.v(m, queueSize + 1), (Marshaller)this.marshaller));
                AssertJUnit.assertFalse((boolean)blockedWrite.toCompletableFuture().isDone());
                queueFullLatch.countDown();
                CompletionStages.join((CompletionStage)blockedWrite);
            });
            AssertJUnit.assertTrue((boolean)queueFullLatch.await(10L, TimeUnit.SECONDS));
            // Short pause so the forked task has time to (incorrectly) finish if the write were not blocked.
            Thread.sleep(50L);
            AssertJUnit.assertFalse((boolean)f.isDone());
            // While the delay is active, both the underlying store and the async store report one entry.
            AssertJUnit.assertEquals((long)1L, (long)underlying.size());
            AssertJUnit.assertEquals((long)1L, (long)((Long)CompletionStages.join((CompletionStage)this.store.size(IntSets.immutableSet((int)0)))));
            // Release the delay; the forked task's final join should now complete.
            underlying.endDelay();
            f.get(10L, TimeUnit.SECONDS);
            AssertJUnit.assertEquals((long)(queueSize + 1), (long)underlying.size());
        }
        finally {
            // endDelay() is called again here so the store can be stopped even if an assertion failed above.
            underlying.endDelay();
            CompletionStages.join((CompletionStage)this.store.stop());
        }
    }

    /** Runs the end-to-end put/put scenario with passivation enabled. */
    public void testEndToEndPutPutPassivation() throws Exception {
        final boolean passivation = true;
        doTestEndToEndPutPut(passivation);
    }

    /** Runs the end-to-end put/put scenario with passivation disabled. */
    public void testEndToEndPutPut() throws Exception {
        final boolean passivation = false;
        doTestEndToEndPutPut(passivation);
    }

    /**
     * End-to-end scenario: overwrites cache entries while the backing {@link DelayStore}
     * is delaying modifications, and checks that reads still return the latest values.
     *
     * <p>Two entries are written first and the test waits until {@code "X"} is visible in
     * the store. With the delay armed, {@code "X"} and {@code "Y"} are overwritten; without
     * passivation the {@code "Y"} write is joined and an extra {@code "Z"} entry is added.
     * The delay is always released in a {@code finally} block.
     *
     * @param passivation whether the cache is configured with passivation
     */
    private void doTestEndToEndPutPut(final boolean passivation) throws Exception {
        TestingUtil.withCacheManager(new OneEntryCacheManagerCallable(passivation) {
            @Override
            public void call() throws InterruptedException {
                // "cache" and "store" below are the fields inherited from OneEntryCacheManagerCallable.
                cache.put("X", "1");
                cache.put("Y", "1");
                AsyncStoreTest.eventually(() -> store.loadEntry("X") != null);

                store.delayAfterModification(3);
                try {
                    cache.put("X", "2");
                    CompletableFuture<String> stage = cache.putAsync("Y", "2");
                    if (!passivation) {
                        CompletionStages.join(stage);
                        cache.putAsync("Z", "1");
                    }

                    AssertJUnit.assertEquals("2", cache.get("X"));
                    if (!passivation) {
                        AssertJUnit.assertEquals("1", cache.get("Z"));
                    }
                } finally {
                    store.endDelay();
                }
            }
        });
    }

    /** Runs the end-to-end put/remove scenario with passivation enabled. */
    public void testEndToEndPutRemovePassivation() throws Exception {
        final boolean passivation = true;
        doTestEndToEndPutRemove(passivation);
    }

    /** Runs the end-to-end put/remove scenario with passivation disabled. */
    public void testEndToEndPutRemove() throws Exception {
        final boolean passivation = false;
        doTestEndToEndPutRemove(passivation);
    }

    /**
     * End-to-end scenario: checks that a {@code remove} issued while the backing
     * {@link DelayStore} is delaying modifications does not complete within 100 ms.
     *
     * <p>Two entries are written first and the test waits until {@code "X"} is visible in
     * the store. With the delay armed, one further entry is put (two without passivation),
     * then {@code "X"} is removed on a forked thread and the resulting future is expected
     * to time out. The delay is always released in a {@code finally} block.
     *
     * @param passivation whether the cache is configured with passivation
     */
    private void doTestEndToEndPutRemove(final boolean passivation) throws Exception {
        TestingUtil.withCacheManager(new OneEntryCacheManagerCallable(passivation) {
            @Override
            public void call() throws InterruptedException {
                // "cache" and "store" below are the fields inherited from OneEntryCacheManagerCallable.
                cache.put("X", "1");
                cache.put("Y", "1");
                AsyncStoreTest.eventually(() -> store.loadEntry("X") != null);

                store.delayAfterModification(3);
                try {
                    cache.put("replicating", "completes, but replication is stuck on delayed Future");
                    if (!passivation) {
                        cache.put("in-queue", "completes, but waiting on previous replication to complete before replicating");
                    }

                    Future<?> f = AsyncStoreTest.this.fork(() -> cache.remove("X"));
                    Exceptions.expectException(TimeoutException.class, () -> f.get(100L, TimeUnit.MILLISECONDS));
                } finally {
                    store.endDelay();
                }
            }
        });
    }

    /**
     * Base callable for the end-to-end tests. It creates a cache manager whose default
     * cache keeps at most one entry in memory ({@code maxCount(1)}) and is backed by an
     * async-enabled {@link DelayStore} with a modification queue size of 2, and exposes
     * both the cache and that store to subclasses.
     */
    private abstract static class OneEntryCacheManagerCallable extends CacheManagerCallable {
        protected final Cache<String, String> cache;
        protected final DelayStore store;

        /**
         * @param passivation whether the persistence configuration enables passivation
         */
        OneEntryCacheManagerCallable(boolean passivation) {
            super(TestCacheManagerFactory.createCacheManager(config(passivation)));
            cache = cm.getCache();
            store = (DelayStore) TestingUtil.getFirstStore(cache);
        }

        /** Builds the cache configuration described in the class comment. */
        private static ConfigurationBuilder config(boolean passivation) {
            ConfigurationBuilder builder = new ConfigurationBuilder();
            DelayStore.ConfigurationBuilder storeBuilder = (DelayStore.ConfigurationBuilder) builder
                    .memory().maxCount(1L)
                    .persistence().passivation(passivation)
                    .addStore(DelayStore.ConfigurationBuilder.class);
            storeBuilder.async().modificationQueueSize(2).enable();
            return builder;
        }
    }
}

