package cn.ponfee.scheduler.supervisor;

import cn.ponfee.scheduler.common.date.Dates;
import cn.ponfee.scheduler.common.lock.DoInLocked;
import cn.ponfee.scheduler.core.base.AbstractHeartbeatThread;
import cn.ponfee.scheduler.core.enums.ExecuteState;
import cn.ponfee.scheduler.core.model.SchedJob;
import cn.ponfee.scheduler.core.model.SchedTask;
import cn.ponfee.scheduler.core.model.SchedTrack;
import cn.ponfee.scheduler.supervisor.manager.JobManager;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;

/* loaded from: input_file:cn/ponfee/scheduler/supervisor/ScanTrackHeartbeatThread.class */
public class ScanTrackHeartbeatThread extends AbstractHeartbeatThread {
    private static final int QUERY_BATCH_SIZE = 200;
    private static final long EXPIRE_WAITING_MILLISECONDS = 60000;
    private static final long EXPIRE_RUNNING_MILLISECONDS = 120000;
    private final DoInLocked doInLocked;
    private final JobManager jobManager;
    private long nextScanExpireRunningTimeMillis;

    public ScanTrackHeartbeatThread(int i, DoInLocked doInLocked, JobManager jobManager) {
        super(i);
        this.nextScanExpireRunningTimeMillis = 0L;
        this.doInLocked = doInLocked;
        this.jobManager = jobManager;
    }

    protected boolean heartbeat() {
        Boolean bool;
        return (this.jobManager.hasNotFoundWorkers() || (bool = (Boolean) this.doInLocked.apply(() -> {
            Date date = new Date();
            return Boolean.valueOf(processExpireWaiting(date) || processExpireRunning(date));
        })) == null || !bool.booleanValue()) ? false : true;
    }

    private boolean processExpireWaiting(Date date) {
        long time = date.getTime() - EXPIRE_WAITING_MILLISECONDS;
        List<SchedTrack> findExpireWaiting = this.jobManager.findExpireWaiting(time, new Date(time), QUERY_BATCH_SIZE);
        if (CollectionUtils.isEmpty(findExpireWaiting)) {
            return false;
        }
        Iterator<SchedTrack> it = findExpireWaiting.iterator();
        while (it.hasNext()) {
            processExpireWaiting(it.next(), date);
        }
        return findExpireWaiting.size() == QUERY_BATCH_SIZE;
    }

    private void processExpireWaiting(SchedTrack schedTrack, Date date) {
        List<SchedTask> findTasks = this.jobManager.findTasks(schedTrack.getTrackId().longValue());
        if (findTasks.stream().allMatch(schedTask -> {
            return ExecuteState.of(schedTask.getExecuteState()).isTerminal();
        })) {
            if (this.jobManager.renewUpdateTime(schedTrack, date)) {
                this.log.info("All task terminal, terminate the sched track: {}", schedTrack.getTrackId());
                this.jobManager.terminate(schedTrack.getTrackId().longValue());
                return;
            }
            return;
        }
        List<SchedTask> list = (List) findTasks.stream().filter(schedTask2 -> {
            return ExecuteState.WAITING.equals(schedTask2.getExecuteState());
        }).collect(Collectors.toList());
        if (CollectionUtils.isEmpty(list)) {
            this.log.info("Not has waiting tasks: {}", schedTrack);
            return;
        }
        SchedJob job = this.jobManager.getJob(schedTrack.getJobId().longValue());
        if (job == null) {
            this.log.error("Job not exists: {}, {}", schedTrack, findTasks);
            this.jobManager.updateState(ExecuteState.DATA_INCONSISTENT, findTasks, schedTrack);
        } else if (this.jobManager.hasNotFoundWorkers(job.getJobGroup())) {
            this.jobManager.renewUpdateTime(schedTrack, date);
            this.log.warn("Scan track not found available group '{}' workers.", job.getJobGroup());
        } else if (this.jobManager.renewUpdateTime(schedTrack, date)) {
            this.log.info("Redispatch sched track: {} - {}", schedTrack, Dates.format(date));
            this.jobManager.dispatch(job, schedTrack, list);
        }
    }

    private boolean processExpireRunning(Date date) {
        if (date.getTime() < this.nextScanExpireRunningTimeMillis) {
            return false;
        }
        long time = date.getTime() - EXPIRE_RUNNING_MILLISECONDS;
        List<SchedTrack> findExpireRunning = this.jobManager.findExpireRunning(time, new Date(time), QUERY_BATCH_SIZE);
        if (CollectionUtils.isEmpty(findExpireRunning) || findExpireRunning.size() < QUERY_BATCH_SIZE) {
            this.nextScanExpireRunningTimeMillis = date.getTime() + EXPIRE_RUNNING_MILLISECONDS;
        }
        if (CollectionUtils.isEmpty(findExpireRunning)) {
            return false;
        }
        Iterator<SchedTrack> it = findExpireRunning.iterator();
        while (it.hasNext()) {
            processExpireRunning(it.next(), date);
        }
        return findExpireRunning.size() == QUERY_BATCH_SIZE;
    }

    private void processExpireRunning(SchedTrack schedTrack, Date date) {
        List<SchedTask> findByTrackId = this.jobManager.findByTrackId(schedTrack.getTrackId().longValue());
        List<SchedTask> list = (List) findByTrackId.stream().filter(schedTask -> {
            return ExecuteState.WAITING.equals(schedTask.getExecuteState());
        }).collect(Collectors.toList());
        if (CollectionUtils.isNotEmpty(list)) {
            if (this.jobManager.renewUpdateTime(schedTrack, date)) {
                this.log.info("Redispatch sched track: {} - {}", schedTrack, Dates.format(date));
                this.jobManager.dispatch(this.jobManager.getJob(schedTrack.getJobId().longValue()), schedTrack, list);
                return;
            }
            return;
        }
        Stream filter = findByTrackId.stream().filter(schedTask2 -> {
            return ExecuteState.EXECUTING.equals(schedTask2.getExecuteState());
        }).map((v0) -> {
            return v0.getWorker();
        }).filter((v0) -> {
            return StringUtils.isNotBlank(v0);
        });
        JobManager jobManager = this.jobManager;
        jobManager.getClass();
        if (filter.anyMatch(jobManager::isAliveWorker)) {
            this.jobManager.renewUpdateTime(schedTrack, date);
        } else {
            this.log.info("Scan track, all worker dead, terminate the sched track: {}", schedTrack.getTrackId());
            this.jobManager.terminate(schedTrack.getTrackId().longValue());
        }
    }
}
