/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.persistence.sifs;

import io.reactivex.rxjava3.internal.subscriptions.AsyncSubscription;
import io.reactivex.rxjava3.processors.FlowableProcessor;
import io.reactivex.rxjava3.processors.UnicastProcessor;
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayDeque;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.assertj.core.api.Assertions;
import org.infinispan.commons.test.CommonsTestingUtil;
import org.infinispan.commons.test.Exceptions;
import org.infinispan.commons.util.IntSets;
import org.infinispan.commons.util.Util;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.persistence.BaseNonBlockingStoreTest;
import org.infinispan.persistence.sifs.Compactor;
import org.infinispan.persistence.sifs.EntryPosition;
import org.infinispan.persistence.sifs.EntryRecord;
import org.infinispan.persistence.sifs.Index;
import org.infinispan.persistence.sifs.LogAppender;
import org.infinispan.persistence.sifs.NonBlockingSoftIndexFileStore;
import org.infinispan.persistence.sifs.SoftIndexFileStoreTestUtils;
import org.infinispan.persistence.sifs.TemporaryTable;
import org.infinispan.persistence.spi.MarshallableEntry;
import org.infinispan.persistence.spi.NonBlockingStore;
import org.infinispan.test.Mocks;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CheckPoint;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.reactivestreams.Subscription;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups={"unit"}, testName="persistence.sifs.SoftIndexFileStoreTest")
public class SoftIndexFileStoreTest
extends BaseNonBlockingStoreTest {
    // Root temporary directory for this test class: assigned in setUpTempDir(), removed in
    // clearTempDir(), and used by buildConfig() as the parent of the "data" and "index" locations.
    protected String tmpDirectory;

    /**
     * Resolves the temporary directory for this test class and remembers it in
     * {@link #tmpDirectory}. Runs as a TestNG {@code @BeforeClass} hook.
     */
    @BeforeClass(alwaysRun=true)
    protected void setUpTempDir() {
        Class<?> testClass = this.getClass();
        this.tmpDirectory = CommonsTestingUtil.tmpDirectory(testClass);
    }

    /**
     * Recursively removes the directory stored in {@link #tmpDirectory}. Runs as a TestNG
     * {@code @AfterClass} hook.
     */
    @AfterClass(alwaysRun=true)
    protected void clearTempDir() {
        String directory = this.tmpDirectory;
        Util.recursiveFileRemove(directory);
    }

    /**
     * Creates the store instance exercised by this test class.
     *
     * @return a new, not yet started {@link NonBlockingSoftIndexFileStore}
     */
    protected NonBlockingStore createStore() {
        NonBlockingSoftIndexFileStore softIndexStore = new NonBlockingSoftIndexFileStore();
        return softIndexStore;
    }

    /**
     * Builds the cache configuration used by the tests: two hash segments and a soft-index file
     * store whose data and index live in the {@code data} and {@code index} sub-directories of
     * {@link #tmpDirectory}, with a maximum file size of 1000.
     *
     * @param configurationBuilder builder to configure; it is modified in place
     * @return the built configuration
     */
    @Override
    protected Configuration buildConfig(ConfigurationBuilder configurationBuilder) {
        String dataDir = Paths.get(this.tmpDirectory, "data").toString();
        String indexDir = Paths.get(this.tmpDirectory, "index").toString();

        configurationBuilder.clustering().hash().numSegments(2);
        return configurationBuilder.persistence()
                .addSoftIndexFileStore()
                .dataLocation(dataDir)
                .indexLocation(indexDir)
                .maxFileSize(1000)
                .build();
    }

    /**
     * Verifies that a key first written with lifespan {@code -1} and then overwritten with a
     * lifespan of {@code 1} can no longer be loaded once the clock has advanced past that lifespan
     * and the store has been restarted.
     * <p>
     * The store is stopped with {@code stopAndWait()} rather than {@code stop()}: {@code stop()}
     * hands back a stage that other tests in this class explicitly join, so restarting right after
     * a bare {@code stop()} could race with a stop that is still in progress.
     */
    public void testOverrideWithExpirableAndCompaction() {
        this.store.write(this.marshalledEntry(this.internalCacheEntry("key", "value1", -1L)));
        // Churn through write/delete pairs so the log grows past the first version of "key"
        // (presumably enough to trigger compaction with the 1000 max file size - TODO confirm).
        this.writeGibberish(-1L, true);
        this.store.write(this.marshalledEntry(this.internalCacheEntry("key", "value2", 1L)));
        // Move the clock beyond the lifespan of "value2".
        this.timeService.advance(2L);
        this.writeGibberish(-1L, true);

        // Wait for the stop to finish before starting the same store instance again.
        this.store.stopAndWait();
        this.startStore(this.store);

        MarshallableEntry<?, ?> entry = this.store.loadEntry("key");
        String unexpected = entry != null ? entry.getKey() + "=" + entry.getValue() : null;
        AssertJUnit.assertNull(unexpected, entry);
    }

    /**
     * Writes 100 filler entries with keys {@code foo0} .. {@code foo99} and value {@code bar}.
     *
     * @param lifespan lifespan passed to every filler entry
     * @param shouldDelete when {@code true}, each entry is deleted again right after it is written
     */
    private void writeGibberish(long lifespan, boolean shouldDelete) {
        int index = 0;
        while (index < 100) {
            String key = "foo" + index;
            this.store.write(this.marshalledEntry(this.internalCacheEntry(key, "bar", lifespan)));
            if (shouldDelete) {
                this.store.delete(key);
            }
            index++;
        }
    }

    /**
     * Checks that the store can be stopped and restarted after an expiration compaction whose
     * index requests were temporarily held back.
     * <p>
     * The first entry of the index's {@code flowableProcessors} array is swapped for a
     * {@link UnicastProcessor} that only parks incoming requests in a queue; while it is in place
     * the compaction is expected not to complete. The original processor is then restored, the
     * parked requests are replayed into it, and the compaction must finish before the store is
     * stopped and started again.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     * @throws ExecutionException if the forked compaction or the forked stop fails
     * @throws TimeoutException if the compaction or the stop does not finish within 10 seconds
     */
    public void testStopWithCompactorIndexNotComplete() throws InterruptedException, ExecutionException, TimeoutException {
        long lifespan = 10L;
        this.store.write(this.marshalledEntry(this.internalCacheEntry("never", "dies", -1L)));
        this.writeGibberish(lifespan, false);
        // Overwrite the first filler key with lifespan -1 after the expiring batch.
        this.store.write(this.marshalledEntry(this.internalCacheEntry("foo0", "bar", -1L)));
        this.timeService.advance(lifespan + 1L);

        Compactor compactor = (Compactor) TestingUtil.extractField(this.store.delegate(), "compactor");
        if (compactor.getFiles().isEmpty()) {
            AssertJUnit.fail("Compactor needs to have more than one file, had: " + compactor.getFileStats());
        }

        Index index = (Index) TestingUtil.extractField(compactor, "index");
        // The element type of the processors is not needed here; requests are only parked and replayed.
        @SuppressWarnings("unchecked")
        FlowableProcessor<Object>[] processors =
                (FlowableProcessor<Object>[]) TestingUtil.extractField(index, "flowableProcessors");
        AssertJUnit.assertEquals(2, processors.length);

        FlowableProcessor<Object> original = processors[0];
        // Filled by whichever thread emits index requests and drained by the test thread, so it
        // must be safe for concurrent use.
        ConcurrentLinkedQueue<Object> parked = new ConcurrentLinkedQueue<>();
        UnicastProcessor<Object> detour = UnicastProcessor.create();
        detour.serialize().subscribe(parked::add);

        final CountDownLatch latch = new CountDownLatch(1);
        Compactor.CompactionExpirationSubscriber expSub = new Compactor.CompactionExpirationSubscriber(){

            public void onEntryPosition(EntryPosition entryPosition) {
            }

            public void onEntryEntryRecord(EntryRecord entryRecord) {
            }

            public void onComplete() {
                latch.countDown();
            }

            public void onError(Throwable t) {
            }
        };

        Future<Void> compaction;
        processors[0] = detour;
        try {
            compaction = this.fork(() -> compactor.performExpirationCompaction(expSub));
            // With the requests parked, the compaction must not be able to report completion.
            AssertJUnit.assertFalse(latch.await(100L, TimeUnit.MILLISECONDS));
        } finally {
            // Always put the real processor back and hand it what was parked, even when the
            // assertion above fails, so the index is not left with a detached processor.
            processors[0] = original;
            parked.forEach(original::onNext);
        }

        // Surfaces any exception thrown by the forked compaction instead of silently dropping it.
        compaction.get(10L, TimeUnit.SECONDS);
        AssertJUnit.assertTrue(latch.await(10L, TimeUnit.SECONDS));

        // stop() is forked because the call itself may block; the stage it returns is awaited separately.
        Future<CompletionStage<Void>> stage = this.fork(() -> this.store.stop());
        CompletionStage<Void> innerStage = stage.get(10L, TimeUnit.SECONDS);
        innerStage.toCompletableFuture().get(10L, TimeUnit.SECONDS);
        this.startStore(this.store);
    }

    /**
     * Runs an expiration compaction while a write is held at the log appender's
     * {@code TemporaryTable.set} call, and expects the compaction to complete without errors
     * before the write is released.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting
     * @throws TimeoutException if a check point or the blocked write is not reached within 10 seconds
     * @throws ExecutionException if the forked write fails
     */
    public void testCompactLogFileNotInTemporaryTable() throws InterruptedException, TimeoutException, ExecutionException {
        Compactor compactor = (Compactor) TestingUtil.extractField(this.store.delegate(), "compactor");
        LogAppender appender = (LogAppender) TestingUtil.extractField(this.store.delegate(), "logAppender");

        CheckPoint gate = new CheckPoint();
        gate.triggerForever("after_release");
        // Replace the appender's temporary table with a mock whose set(..) waits on the check point;
        // the returned value is later written back into the same field.
        TemporaryTable realTable = Mocks.blockingFieldMock(
                gate,
                TemporaryTable.class,
                appender,
                LogAppender.class,
                "temporaryTable",
                (stubber, table) -> ((TemporaryTable) stubber.when(table)).set(Mockito.anyInt(), ArgumentMatchers.any(), Mockito.anyInt(), Mockito.anyInt()),
                new Class[0]);

        Future<Void> pendingWrite = this.fork(() -> this.store.write(this.marshalledEntry(this.internalCacheEntry("foo", "bar", -1L))));
        gate.awaitStrict("before_invocation", 10L, TimeUnit.SECONDS);
        // The write has to still be pending at this point.
        Exceptions.expectException(TimeoutException.class, () -> pendingWrite.get(10L, TimeUnit.MILLISECONDS));

        TestingUtil.replaceField(realTable, "temporaryTable", appender, LogAppender.class);

        final TestSubscriber<Object> outcome = TestSubscriber.create();
        outcome.onSubscribe(new AsyncSubscription());
        // Only completion and errors are forwarded; the per-entry callbacks are ignored.
        Compactor.CompactionExpirationSubscriber expSub = new Compactor.CompactionExpirationSubscriber(){

            public void onEntryPosition(EntryPosition entryPosition) {
            }

            public void onEntryEntryRecord(EntryRecord entryRecord) {
            }

            public void onComplete() {
                outcome.onComplete();
            }

            public void onError(Throwable t) {
                outcome.onError(t);
            }
        };
        compactor.performExpirationCompaction(expSub);

        outcome.awaitDone(10L, TimeUnit.SECONDS);
        outcome.assertComplete();
        outcome.assertNoErrors();

        // Let the held write proceed and make sure it finishes.
        gate.triggerForever("before_release");
        pendingWrite.get(10L, TimeUnit.SECONDS);
    }

    /**
     * Writes a new entry while an expiration compaction is held at the compactor's
     * {@code TemporaryTable.get} call, then lets the compaction continue and expects it to
     * complete without errors having reported exactly one entry record.
     *
     * @throws Exception if waiting on a check point, the compaction future or the subscriber fails
     */
    public void testWriteDuringCompaction() throws Exception {
        Compactor compactor = (Compactor) TestingUtil.extractField(this.store.delegate(), "compactor");

        CheckPoint gate = new CheckPoint();
        // The value returned by blockingFieldMock is not needed in this test.
        Mocks.blockingFieldMock(
                gate,
                TemporaryTable.class,
                compactor,
                Compactor.class,
                "temporaryTable",
                (stubber, table) -> ((TemporaryTable) stubber.when(table)).get(Mockito.anyInt(), ArgumentMatchers.any()),
                new Class[0]);

        this.store.write(this.marshalledEntry(this.internalCacheEntry("foo", "bar", 10L)));
        // Push the clock past the 10 lifespan of the entry just written.
        this.timeService.advance(11L);

        final AtomicInteger expiredRecords = new AtomicInteger(0);
        final TestSubscriber<Object> outcome = TestSubscriber.create();
        outcome.onSubscribe(new AsyncSubscription());
        Compactor.CompactionExpirationSubscriber sub = new Compactor.CompactionExpirationSubscriber(){

            public void onEntryPosition(EntryPosition entryPosition) {
            }

            public void onEntryEntryRecord(EntryRecord entryRecord) {
                expiredRecords.incrementAndGet();
            }

            public void onComplete() {
                outcome.onComplete();
            }

            public void onError(Throwable t) {
                outcome.onError(t);
            }
        };

        Future<Void> compaction = this.fork(() -> compactor.performExpirationCompaction(sub));
        gate.awaitStrict("before_invocation", 10L, TimeUnit.SECONDS);
        outcome.assertNotComplete();

        // Write while the compaction is parked on the mocked table.
        this.store.write(this.marshalledEntry(this.internalCacheEntry("newer", "entry", 10L)));

        gate.trigger("before_release");
        gate.awaitStrict("after_invocation", 10L, TimeUnit.SECONDS);
        gate.triggerForever("after_release");

        eventually(compaction::isDone);
        compaction.get(10L, TimeUnit.SECONDS);

        outcome.awaitDone(10L, TimeUnit.SECONDS);
        outcome.assertComplete();
        outcome.assertNoErrors();
        Assertions.assertThat(expiredRecords.get()).isEqualTo(1);
    }

    /**
     * Writes one entry into segment 0, removes that segment, and checks that the entry stays
     * unloadable after the removal, after the segment is added back, and after a store restart.
     * It also checks the compactor file stats along the way and that the data directory is empty
     * at the end.
     *
     * @throws ExecutionException declared for callers; not thrown directly by this body
     * @throws InterruptedException declared for callers; not thrown directly by this body
     * @throws TimeoutException declared for callers; not thrown directly by this body
     */
    public void testRemoveSegmentsCleansUpProperly() throws ExecutionException, InterruptedException, TimeoutException {
        // Net (total - free) figure expected from the file stats once the segment is gone.
        final long expectedNet = -77L;

        Compactor compactor = (Compactor) TestingUtil.extractField(this.store.delegate(), "compactor");
        ConcurrentMap fileStats = compactor.getFileStats();
        AssertJUnit.assertEquals(0, fileStats.size());

        TestingUtil.join(this.store.write(0, this.marshalledEntry(this.internalCacheEntry("foo", "bar", 10L))));
        TestingUtil.join(this.store.removeSegments(IntSets.immutableSet(0)));
        AssertJUnit.assertNull(TestingUtil.join(this.store.load(0, "foo")));
        this.verifyStatsHaveNoData(expectedNet, fileStats);

        // Adding the segment back must not bring the entry or its space accounting back.
        TestingUtil.join(this.store.addSegments(IntSets.immutableSet(0)));
        AssertJUnit.assertNull(TestingUtil.join(this.store.load(0, "foo")));
        this.verifyStatsHaveNoData(expectedNet, fileStats);

        this.store.stopAndWait();
        this.startStore(this.store);
        AssertJUnit.assertNull(TestingUtil.join(this.store.load(0, "foo")));

        // The compactor is looked up again after the restart.
        Compactor restartedCompactor = (Compactor) TestingUtil.extractField(this.store.delegate(), "compactor");
        ConcurrentMap restartedStats = restartedCompactor.getFileStats();
        AssertJUnit.assertTrue("fileStats were: " + restartedStats, restartedStats.isEmpty());
        AssertJUnit.assertEquals(0L, SoftIndexFileStoreTestUtils.dataDirectorySize(this.tmpDirectory, "mock-cache"));
    }

    /**
     * Sums, over all file stats, the total size (counted only when positive) minus the free size,
     * and asserts that the result equals {@code expected}.
     *
     * @param expected the expected net value
     * @param fileStats per-file stats to aggregate
     */
    private void verifyStatsHaveNoData(long expected, ConcurrentMap<Integer, Compactor.Stats> fileStats) {
        long netSize = 0L;
        for (Compactor.Stats fileStat : fileStats.values()) {
            long free = fileStat.getFree();
            long countedTotal = fileStat.getTotal() > 0 ? (long) fileStat.getTotal() : 0L;
            netSize += countedTotal - free;
        }
        AssertJUnit.assertEquals(expected, netSize);
    }

    /**
     * Writes into segment 0, removes segment 1, then writes into the removed segment 1 and checks
     * the resulting compactor file stats.
     *
     * @throws ExecutionException declared for callers; not thrown directly by this body
     * @throws InterruptedException declared for callers; not thrown directly by this body
     * @throws TimeoutException declared for callers; not thrown directly by this body
     */
    public void testFileStatsWriteNotOwnedSegment() throws ExecutionException, InterruptedException, TimeoutException {
        Compactor compactor = (Compactor) TestingUtil.extractField(this.store.delegate(), "compactor");
        ConcurrentMap fileStats = compactor.getFileStats();
        AssertJUnit.assertEquals(0, fileStats.size());

        // A plain write into segment 0 is expected to leave the stats map empty.
        TestingUtil.join(this.store.write(0, this.marshalledEntry(this.internalCacheEntry("foo-0", "bar-0", 10L))));
        AssertJUnit.assertTrue(fileStats.isEmpty());

        TestingUtil.join(this.store.removeSegments(IntSets.immutableSet(1)));
        TestingUtil.join(this.store.write(1, this.marshalledEntry(this.internalCacheEntry("foo-1", "bar-1", 10L))));

        long expectedNet = -81L;
        this.verifyStatsHaveNoData(expectedNet, fileStats);
    }

    /**
     * Writes one entry into segment 1, removes that segment and checks the resulting compactor
     * file stats.
     *
     * @throws ExecutionException declared for callers; not thrown directly by this body
     * @throws InterruptedException declared for callers; not thrown directly by this body
     * @throws TimeoutException declared for callers; not thrown directly by this body
     */
    public void testFileStatsAfterRemovingSegment() throws ExecutionException, InterruptedException, TimeoutException {
        Compactor compactor = (Compactor) TestingUtil.extractField(this.store.delegate(), "compactor");
        ConcurrentMap fileStats = compactor.getFileStats();
        AssertJUnit.assertEquals(0, fileStats.size());

        TestingUtil.join(this.store.write(1, this.marshalledEntry(this.internalCacheEntry("foo-1", "bar-1", 10L))));
        TestingUtil.join(this.store.removeSegments(IntSets.immutableSet(1)));

        long expectedNet = -81L;
        this.verifyStatsHaveNoData(expectedNet, fileStats);
    }

    /**
     * Writes and then deletes an entry in segment 1, checks the compactor file stats, removes the
     * segment and checks the stats again.
     *
     * @throws ExecutionException declared for callers; not thrown directly by this body
     * @throws InterruptedException declared for callers; not thrown directly by this body
     * @throws TimeoutException declared for callers; not thrown directly by this body
     */
    public void testFileStatsAfterRemovingWithRemovedEntry() throws ExecutionException, InterruptedException, TimeoutException {
        Compactor compactor = (Compactor) TestingUtil.extractField(this.store.delegate(), "compactor");
        ConcurrentMap fileStats = compactor.getFileStats();
        AssertJUnit.assertEquals(0, fileStats.size());

        TestingUtil.join(this.store.write(1, this.marshalledEntry(this.internalCacheEntry("foo-1", "bar-1", 10L))));
        TestingUtil.join(this.store.delete(1, "foo-1"));
        long expectedAfterDelete = -81L;
        this.verifyStatsHaveNoData(expectedAfterDelete, fileStats);

        TestingUtil.join(this.store.removeSegments(IntSets.immutableSet(1)));
        long expectedAfterSegmentRemoval = -123L;
        this.verifyStatsHaveNoData(expectedAfterSegmentRemoval, fileStats);
    }

    /**
     * TestNG data provider supplying both boolean values, one per invocation.
     *
     * @return two single-element rows: {@code true} and {@code false}
     */
    @DataProvider(name="booleans")
    Object[][] booleans() {
        Object[] enabled = {true};
        Object[] disabled = {false};
        return new Object[][]{enabled, disabled};
    }

    /**
     * Starts a write that is held at the log appender's {@code completionProcessor.onNext} call,
     * starts stopping the store while the write is held, then releases the write and checks that
     * the written entry can be loaded after the store is started again.
     *
     * @param deleteIndexes when {@code true}, the {@code index} directory is removed before the
     *        store is restarted
     * @throws InterruptedException if the test thread is interrupted while sleeping or waiting
     * @throws TimeoutException if a check point, the write or the stop is not reached within 10 seconds
     * @throws ExecutionException if the forked write or the forked stop fails
     */
    @Test(dataProvider="booleans")
    public void testWriteDuringStop(boolean deleteIndexes) throws InterruptedException, TimeoutException, ExecutionException {
        LogAppender logAppender = (LogAppender) TestingUtil.extractField(this.store.delegate(), "logAppender");
        CheckPoint putCheckPoint = new CheckPoint();
        putCheckPoint.triggerForever("after_release");
        Mocks.blockingFieldMock(putCheckPoint, FlowableProcessor.class, logAppender, LogAppender.class, "completionProcessor",
                (stub, processor) -> ((FlowableProcessor) stub.when(processor)).onNext(Mockito.any()),
                new Class[0]);

        Future<Void> putFuture = this.fork(() -> TestingUtil.join(this.store.write(1, this.marshalledEntry(this.internalCacheEntry("foo-231", "bar-231", -1L)))));
        putCheckPoint.awaitStrict("before_invocation", 10L, TimeUnit.SECONDS);

        Future<Void> stopFuture = this.fork(() -> TestingUtil.join(this.store.stop()));
        // NOTE(review): the short pause presumably gives the forked stop a chance to begin before
        // the write is released - confirm whether a deterministic hook is available.
        Thread.sleep(10L);

        putCheckPoint.triggerForever("before_release");
        putFuture.get(10L, TimeUnit.SECONDS);
        stopFuture.get(10L, TimeUnit.SECONDS);

        if (deleteIndexes) {
            Util.recursiveFileRemove(Paths.get(this.tmpDirectory, "index"));
        }
        this.startStore(this.store);

        // The first argument is the failure message; it previously held only the expected value
        // "bar-231", which gave no hint about what went wrong.
        AssertJUnit.assertNotNull("Entry foo-231 written during stop was not found after restart (deleteIndexes=" + deleteIndexes + ")",
                TestingUtil.join(this.store.load(1, "foo-231")));
    }
}

