/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals.tasks;

import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.internals.ReadOnlyTask;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.tasks.TaskExecutor;
import org.apache.kafka.streams.processor.internals.tasks.TaskManager;
import org.slf4j.Logger;

/**
 * {@link TaskExecutor} implementation backed by one dedicated thread.
 *
 * <p>While running, the thread repeatedly asks the {@link TaskManager} for a task,
 * processes it for as long as {@code isProcessable} returns {@code true}, and hands
 * it back to the task manager otherwise or when {@link #unassign()} is requested.
 */
public class DefaultTaskExecutor implements TaskExecutor {
    private final Time time;
    private final String name;
    private final TaskManager taskManager;

    // Written by the executor thread and exposed through currentTask();
    // volatile so that readers on other threads observe the latest assignment.
    private volatile StreamTask currentTask = null;
    private TaskExecutorThread taskExecutorThread = null;
    // Counted down by the executor thread once it has finished its run loop.
    private CountDownLatch shutdownGate;

    /**
     * @param taskManager source of tasks to process and sink for unassigned tasks
     * @param name        name of this executor; also used as thread name and log prefix
     * @param time        clock used to obtain the current time for each loop iteration
     */
    public DefaultTaskExecutor(TaskManager taskManager, String name, Time time) {
        this.time = time;
        this.name = name;
        this.taskManager = taskManager;
    }

    /** @return the name given at construction time */
    @Override
    public String name() {
        return name;
    }

    /**
     * Starts the executor thread if none is currently registered; otherwise does nothing.
     */
    @Override
    public void start() {
        if (taskExecutorThread == null) {
            // The latch must exist before the thread starts: the thread counts it
            // down when it exits, which may happen before start() returns.
            shutdownGate = new CountDownLatch(1);
            taskExecutorThread = new TaskExecutorThread(name);
            taskExecutorThread.start();
        }
    }

    /**
     * Signals the executor thread to stop and waits for it to finish.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt flag is
     * restored and the method returns without clearing the thread reference.
     *
     * @param timeout maximum time to wait for the thread to finish
     * @throws StreamsException if the thread did not finish within {@code timeout}
     */
    @Override
    public void shutdown(Duration timeout) {
        if (taskExecutorThread == null) {
            return;
        }
        taskExecutorThread.isRunning.set(false);
        taskExecutorThread.interrupt();
        try {
            if (!shutdownGate.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
                throw new StreamsException("Task executor thread did not shutdown within the timeout");
            }
            taskExecutorThread = null;
        } catch (InterruptedException e) {
            // Do not swallow the interruption: restore the flag so callers can react to it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * @return a read-only view of the task currently held by this executor,
     *         or {@code null} if it holds none
     */
    @Override
    public ReadOnlyTask currentTask() {
        // Read the field once: the executor thread may clear it concurrently.
        final StreamTask task = currentTask;
        return task != null ? new ReadOnlyTask(task) : null;
    }

    /**
     * Requests that the executor thread give up its current task.
     *
     * <p>If no executor thread is registered, the returned future is completed
     * immediately with {@code null}. Otherwise the request is stored for the
     * executor thread, which completes the future with the unassigned task on
     * its next loop iteration.
     *
     * @return a future that is completed with the task that was unassigned
     */
    @Override
    public KafkaFuture<StreamTask> unassign() {
        final KafkaFutureImpl<StreamTask> future = new KafkaFutureImpl<>();
        if (taskExecutorThread != null) {
            taskExecutorThread.pauseRequested.set(future);
        } else {
            future.complete(null);
        }
        return future;
    }

    /**
     * The thread that runs the assign / process / unassign loop for the enclosing executor.
     */
    private class TaskExecutorThread extends Thread {
        private final AtomicBoolean isRunning;
        // Holds a pending unassign request, or null when there is none.
        private final AtomicReference<KafkaFutureImpl<StreamTask>> pauseRequested;
        private final Logger log;

        public TaskExecutorThread(String name) {
            super(name);
            this.isRunning = new AtomicBoolean(true);
            this.pauseRequested = new AtomicReference<>(null);
            final String logPrefix = String.format("%s ", name);
            final LogContext logContext = new LogContext(logPrefix);
            this.log = logContext.logger(DefaultTaskExecutor.class);
        }

        /**
         * Loops until {@code isRunning} is cleared, then hands back any task still
         * held and releases the shutdown latch.
         */
        @Override
        public void run() {
            log.info("Task executor thread started");
            try {
                while (isRunning.get()) {
                    runOnce(DefaultTaskExecutor.this.time.milliseconds());
                }
            } finally {
                try {
                    if (DefaultTaskExecutor.this.currentTask != null) {
                        unassignCurrentTask();
                    }
                } finally {
                    // Always release the latch, even if handing back the task failed,
                    // so that shutdown() does not wait for the full timeout.
                    DefaultTaskExecutor.this.shutdownGate.countDown();
                    log.info("Task executor thread shutdown");
                }
            }
        }

        /**
         * Performs one loop iteration: serves a pending unassign request, then either
         * acquires a task, processes the held task, or hands it back if it is not
         * processable at {@code nowMs}.
         */
        private void runOnce(long nowMs) {
            final KafkaFutureImpl<StreamTask> pauseFuture = pauseRequested.getAndSet(null);
            if (pauseFuture != null) {
                final StreamTask unassignedTask = unassignCurrentTask();
                pauseFuture.complete(unassignedTask);
            }

            if (DefaultTaskExecutor.this.currentTask == null) {
                DefaultTaskExecutor.this.currentTask =
                    DefaultTaskExecutor.this.taskManager.assignNextTask(DefaultTaskExecutor.this);
            } else if (DefaultTaskExecutor.this.currentTask.isProcessable(nowMs)) {
                DefaultTaskExecutor.this.currentTask.process(nowMs);
            } else {
                unassignCurrentTask();
            }
        }

        /**
         * Prepares the held task for commit, returns it to the task manager and
         * clears the executor's reference to it.
         *
         * @return the task that was handed back
         * @throws IllegalStateException if no task is currently held
         */
        private StreamTask unassignCurrentTask() {
            final StreamTask task = DefaultTaskExecutor.this.currentTask;
            if (task == null) {
                throw new IllegalStateException("Does not own any task while being ask to unassign from task manager");
            }
            task.prepareCommit();
            DefaultTaskExecutor.this.taskManager.unassignTask(task, DefaultTaskExecutor.this);
            DefaultTaskExecutor.this.currentTask = null;
            return task;
        }
    }
}

