/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.executors;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.infinispan.executors.LimitedExecutor;
import org.infinispan.test.AbstractInfinispanTest;
import org.infinispan.util.concurrent.WithinThreadExecutor;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;

/**
 * Tests for {@link LimitedExecutor}.
 *
 * <p>Every {@code LimitedExecutor} here is constructed with {@code 1} as its last argument and is
 * backed either by a two-thread {@link ThreadPoolExecutor} shared by the whole class or by a
 * {@link WithinThreadExecutor}. The tests check that a second task does not run while the first
 * one is still in progress.
 */
@Test(groups = "functional", testName = "executors.LimitedExecutorTest")
public class LimitedExecutorTest
extends AbstractInfinispanTest {
    public static final String NAME = "Test";

    /**
     * Pool of exactly two threads with a {@link SynchronousQueue}, i.e. no task queueing in the
     * pool itself. Shared by all tests; several tests wait for its active count to reach 0 first.
     */
    private final ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,
            new SynchronousQueue<>(), getTestThreadFactory(NAME));

    /** Shuts the shared thread pool down once all tests in the class have run. */
    @AfterClass(alwaysRun = true)
    public void stopExecutors() {
        executor.shutdownNow();
    }

    /**
     * Checks that, with a {@link WithinThreadExecutor}, the submitted task has already completed
     * by the time {@code execute()} returns.
     */
    public void testBasicWithinThread() throws Exception {
        LimitedExecutor limitedExecutor = new LimitedExecutor(NAME, new WithinThreadExecutor(), 1);
        CompletableFuture<String> cf = new CompletableFuture<>();
        limitedExecutor.execute(() -> cf.complete("value"));
        // getNow() yields the fallback text if the task has not completed yet, failing the assertion.
        AssertJUnit.assertEquals("value", cf.getNow("task did not run synchronously"));
    }

    /**
     * Checks that a second task does not run while a first task, submitted with
     * {@code execute()} on the thread pool, is still blocked.
     */
    public void testConcurrencyLimit() throws Exception {
        eventuallyEquals(0, executor::getActiveCount);
        LimitedExecutor limitedExecutor = new LimitedExecutor(NAME, executor, 1);
        CompletableFuture<String> blocker1 = new CompletableFuture<>();
        CompletableFuture<String> cf1 = new CompletableFuture<>();
        limitedExecutor.execute(() -> {
            try {
                cf1.complete(blocker1.get(10L, TimeUnit.SECONDS));
            }
            catch (Exception e) {
                cf1.completeExceptionally(e);
            }
        });
        verifyTaskIsBlocked(limitedExecutor, blocker1, cf1);
    }

    /**
     * Checks that a second task does not run while the stage returned by an
     * {@code executeAsync()} supplier, running on the thread pool, is still incomplete.
     */
    public void testConcurrencyLimitExecuteAsync() throws Exception {
        eventuallyEquals(0, executor::getActiveCount);
        LimitedExecutor limitedExecutor = new LimitedExecutor(NAME, executor, 1);
        CompletableFuture<String> blocker1 = new CompletableFuture<>();
        CompletableFuture<String> cf1 = new CompletableFuture<>();
        limitedExecutor.executeAsync(() -> blocker1.thenAccept(cf1::complete));
        verifyTaskIsBlocked(limitedExecutor, blocker1, cf1);
    }

    /**
     * Same as {@link #testConcurrencyLimit()}, but backed by a {@link WithinThreadExecutor}.
     * The first task is submitted from a forked thread because it blocks on {@code blocker1}.
     */
    public void testConcurrencyLimitWithinThread() throws Exception {
        LimitedExecutor limitedExecutor = new LimitedExecutor(NAME, new WithinThreadExecutor(), 1);
        CompletableFuture<String> blocker1 = new CompletableFuture<>();
        CompletableFuture<String> blocker2 = new CompletableFuture<>();
        CompletableFuture<String> cf1 = new CompletableFuture<>();
        Future<Void> fork1 = fork(() -> limitedExecutor.execute(() -> {
            // Signal that the first task has started before it blocks.
            blocker2.complete("blocking");
            try {
                cf1.complete(blocker1.get(10L, TimeUnit.SECONDS));
            }
            catch (Exception e) {
                cf1.completeExceptionally(e);
            }
        }));
        // Wait until the first task is actually running before submitting the second one.
        AssertJUnit.assertEquals("blocking", blocker2.get(10L, TimeUnit.SECONDS));
        verifyTaskIsBlocked(limitedExecutor, blocker1, cf1);
        fork1.get(10L, TimeUnit.SECONDS);
    }

    /**
     * Same as {@link #testConcurrencyLimitExecuteAsync()}, but backed by a
     * {@link WithinThreadExecutor}.
     */
    public void testConcurrencyLimitExecuteAsyncWithinThread() throws Exception {
        LimitedExecutor limitedExecutor = new LimitedExecutor(NAME, new WithinThreadExecutor(), 1);
        CompletableFuture<String> blocker1 = new CompletableFuture<>();
        CompletableFuture<String> cf1 = new CompletableFuture<>();
        limitedExecutor.executeAsync(() -> blocker1.thenAccept(cf1::complete));
        verifyTaskIsBlocked(limitedExecutor, blocker1, cf1);
    }

    /**
     * Checks that a later task still runs after {@code executeAsync()} was given a supplier
     * that returns {@code null}.
     */
    public void testExecuteAsyncSupplierReturnsNull() throws Exception {
        eventuallyEquals(0, executor::getActiveCount);
        LimitedExecutor limitedExecutor = new LimitedExecutor(NAME, executor, 1);
        limitedExecutor.executeAsync(() -> null);
        CompletableFuture<String> cf1 = new CompletableFuture<>();
        limitedExecutor.execute(() -> cf1.complete("a"));
        // Only completion within the timeout matters; the value itself is not checked.
        cf1.get(10L, TimeUnit.SECONDS);
    }

    /**
     * Submits a second task from a forked thread and asserts that it only runs after the first
     * task has finished.
     *
     * <p>Sequence: assert neither future is done, complete {@code blocker1}, assert {@code cf1}
     * becomes {@code "value1"} while the second task is still not done, then release the second
     * task and assert its result is {@code "value1 value2"}. Finally waits for the forked thread
     * and for the shared pool's active count to return to 0.
     *
     * @param limitedExecutor the executor under test, already running the first task
     * @param blocker1 future the first task is waiting on; completed here with {@code "value1"}
     * @param cf1 future the first task completes with the value of {@code blocker1}
     * @throws Exception if any wait times out or a task fails
     */
    private void verifyTaskIsBlocked(LimitedExecutor limitedExecutor, CompletableFuture<String> blocker1, CompletableFuture<String> cf1) throws Exception {
        CompletableFuture<String> blocker2 = new CompletableFuture<>();
        CompletableFuture<String> cf2 = new CompletableFuture<>();
        Future<Void> fork2 = fork(() -> limitedExecutor.execute(() -> {
            try {
                // If this task starts before cf1 is complete, the marker text ends up in cf2
                // and the final assertEquals below fails.
                cf2.complete(cf1.getNow("task 2 ran too early") + " " + blocker2.get(10L, TimeUnit.SECONDS));
            }
            catch (Exception e) {
                cf2.completeExceptionally(e);
            }
        }));
        AssertJUnit.assertFalse(cf1.isDone());
        AssertJUnit.assertFalse(cf2.isDone());
        blocker1.complete("value1");
        AssertJUnit.assertEquals("value1", cf1.get(10L, TimeUnit.SECONDS));
        AssertJUnit.assertFalse(cf2.isDone());
        blocker2.complete("value2");
        AssertJUnit.assertEquals("value1 value2", cf2.get(10L, TimeUnit.SECONDS));
        fork2.get(10L, TimeUnit.SECONDS);
        eventuallyEquals(0, executor::getActiveCount);
    }
}

