/*
 * Decompiled with CFR 0.152.
 */
package org.hibernate.search.engine.backend.orchestration.spi;

import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.hibernate.search.engine.backend.orchestration.spi.BatchedWork;
import org.hibernate.search.engine.backend.orchestration.spi.BatchedWorkProcessor;
import org.hibernate.search.engine.backend.orchestration.spi.SingletonTask;
import org.hibernate.search.engine.backend.work.execution.OperationSubmitter;
import org.hibernate.search.engine.common.execution.spi.SimpleScheduledExecutor;
import org.hibernate.search.engine.logging.impl.Log;
import org.hibernate.search.engine.reporting.FailureHandler;
import org.hibernate.search.util.common.AssertionFailure;
import org.hibernate.search.util.common.logging.impl.LoggerFactory;

public final class BatchingExecutor<P extends BatchedWorkProcessor, W extends BatchedWork<? super P>> {
    private static final Log log = (Log)LoggerFactory.make(Log.class, (MethodHandles.Lookup)MethodHandles.lookup());
    private static final BiConsumer<? super BatchedWork<?>, Throwable> ASYNC_FAILURE_REPORTER = BatchedWork::markAsFailed;
    private final String name;
    private final FailureHandler failureHandler;
    private final BlockingQueue<W> workQueue;
    private final BatchWorker<P, ? super W> worker;
    private final Consumer<? super W> blockingRetryProducer;
    private SingletonTask processingTask;

    public BatchingExecutor(String name, P processor, int maxTasksPerBatch, boolean fair, FailureHandler failureHandler, Consumer<? super W> blockingRetryProducer) {
        this.name = name;
        this.failureHandler = failureHandler;
        this.blockingRetryProducer = blockingRetryProducer;
        this.workQueue = new ArrayBlockingQueue<W>(maxTasksPerBatch, fair);
        this.worker = new BatchWorker<P, W>(name, processor, this.workQueue, maxTasksPerBatch);
    }

    public String toString() {
        return "BatchingExecutor[name=" + this.name + ", queue size=" + this.workQueue.size() + ", processing=" + String.valueOf(this.processingTask) + "]";
    }

    public synchronized void start(SimpleScheduledExecutor executorService) {
        log.startingExecutor(this.name);
        this.processingTask = new SingletonTask(this.name, this.worker, new BatchScheduler(executorService), this.failureHandler);
    }

    public synchronized void stop() {
        log.stoppingExecutor(this.name);
        this.workQueue.clear();
        this.processingTask.stop();
        this.processingTask = null;
    }

    @Deprecated
    public void submit(W work) throws InterruptedException {
        this.submit(work, OperationSubmitter.blocking());
    }

    public void submit(W work, OperationSubmitter operationSubmitter) throws InterruptedException {
        if (this.processingTask == null) {
            throw new AssertionFailure("Attempt to submit a work to executor '" + this.name + "', which is stopped.");
        }
        operationSubmitter.submitToQueue(this.workQueue, work, this.blockingRetryProducer, ASYNC_FAILURE_REPORTER);
        this.processingTask.ensureScheduled();
    }

    public CompletableFuture<?> completion() {
        if (this.processingTask == null) {
            return CompletableFuture.completedFuture(null);
        }
        return this.processingTask.completion();
    }

    private static final class BatchWorker<P extends BatchedWorkProcessor, W extends BatchedWork<? super P>>
    implements SingletonTask.Worker {
        private final CompletableFuture<?> completedFuture = CompletableFuture.completedFuture(null);
        private final String name;
        private final P processor;
        private final BlockingQueue<W> workQueue;
        private final int maxTasksPerBatch;
        private final List<W> workBuffer;

        private BatchWorker(String name, P processor, BlockingQueue<W> workQueue, int maxTasksPerBatch) {
            this.name = name;
            this.processor = processor;
            this.workQueue = workQueue;
            this.maxTasksPerBatch = maxTasksPerBatch;
            this.workBuffer = new ArrayList<W>(maxTasksPerBatch);
        }

        @Override
        public CompletableFuture<?> work() {
            this.workBuffer.clear();
            this.workQueue.drainTo(this.workBuffer, this.maxTasksPerBatch);
            if (this.workBuffer.isEmpty()) {
                return this.completedFuture;
            }
            int workCount = this.workBuffer.size();
            boolean traceEnabled = log.isTraceEnabled();
            if (traceEnabled) {
                log.tracef("Processing %d works in executor '%s'", workCount, this.name);
            }
            this.processor.beginBatch();
            for (BatchedWork work : this.workBuffer) {
                try {
                    work.submitTo(this.processor);
                }
                catch (Throwable e) {
                    work.markAsFailed(e);
                }
            }
            CompletableFuture<?> future = this.processor.endBatch();
            if (traceEnabled) {
                future.whenComplete((result, throwable) -> log.tracef("Processed %d works in executor '%s'", workCount, this.name));
            }
            return future;
        }

        @Override
        public void complete() {
            this.processor.complete();
        }
    }

    private static final class BatchScheduler
    implements SingletonTask.Scheduler {
        private final SimpleScheduledExecutor delegate;

        public BatchScheduler(SimpleScheduledExecutor delegate) {
            this.delegate = delegate;
        }

        @Override
        public Future<?> schedule(Runnable runnable) {
            return this.delegate.submit(runnable);
        }
    }
}

