package cn.ponfee.scheduler.registry.etcd;

import cn.ponfee.scheduler.common.base.exception.Throwables;
import cn.ponfee.scheduler.common.concurrent.NamedThreadFactory;
import cn.ponfee.scheduler.common.util.ObjectUtils;
import cn.ponfee.scheduler.core.base.Server;
import cn.ponfee.scheduler.registry.ConnectionStateListener;
import cn.ponfee.scheduler.registry.ServerRegistry;
import cn.ponfee.scheduler.registry.etcd.configuration.EtcdProperties;
import io.etcd.jetcd.common.exception.ErrorCode;
import io.etcd.jetcd.common.exception.EtcdException;
import io.etcd.jetcd.support.CloseableClient;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;

/* loaded from: input_file:cn/ponfee/scheduler/registry/etcd/EtcdServerRegistry.class */
public abstract class EtcdServerRegistry<R extends Server, D extends Server> extends ServerRegistry<R, D> {
    private static final String PLACEHOLDER_VALUE = "1";
    private final Object keepAliveLock;
    private final long ttl;
    private final EtcdClient client;
    private final ScheduledExecutorService keepAliveCheckScheduler;
    private volatile long leaseId;
    private volatile CloseableClient keepAlive;

    /* JADX INFO: Access modifiers changed from: protected */
    public EtcdServerRegistry(String str, EtcdProperties etcdProperties) {
        super(str, '/');
        this.keepAliveLock = new Object();
        this.ttl = etcdProperties.getSessionTimeoutMs() / 2000;
        CountDownLatch countDownLatch = new CountDownLatch(1);
        try {
            try {
                this.client = new EtcdClient(etcdProperties);
                this.keepAliveCheckScheduler = new ScheduledThreadPoolExecutor(1, (ThreadFactory) new NamedThreadFactory("keep_alive_check_scheduler", true));
                this.client.createPersistentKey(this.registryRootPath, PLACEHOLDER_VALUE);
                createLeaseIdAndKeepAlive();
                this.client.watchChildChanged(this.discoveryRootPath, countDownLatch, this::doRefreshDiscoveryServers);
                long max = Math.max(this.ttl / 4, 1L);
                this.keepAliveCheckScheduler.scheduleWithFixedDelay(this::keepAliveCheck, max, max, TimeUnit.SECONDS);
                this.client.addConnectionStateListener(ConnectionStateListener.builder().onConnected(etcdClient -> {
                    keepAliveRecover();
                }).build());
                doRefreshDiscoveryServers(this.client.getKeyChildren(this.discoveryRootPath));
                countDownLatch.countDown();
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        } catch (Throwable th) {
            countDownLatch.countDown();
            throw th;
        }
    }

    public final boolean isConnected() {
        return this.client.isConnected();
    }

    public final void register(R r) {
        if (this.closed) {
            return;
        }
        try {
            this.client.createEphemeralKey(buildRegistryServerId(r), PLACEHOLDER_VALUE, this.leaseId);
            this.registered.add(r);
            this.log.info("Etcd server registered: {} - {}", this.registryRole.name(), r);
        } catch (Throwable th) {
            throw new RuntimeException("Etcd server registered failed: " + r, th);
        }
    }

    public final void deregister(R r) {
        try {
            this.registered.remove(r);
            this.client.deleteKey(buildRegistryServerId(r));
            this.log.info("Etcd server deregister: {} - {}", this.registryRole.name(), r);
        } catch (Exception e) {
            this.log.error("Etcd server deregister error.", e);
        }
    }

    private String buildRegistryServerId(R r) {
        return this.registryRootPath + this.separator + r.serialize();
    }

    public void close() {
        this.closed = true;
        if (!this.close.compareAndSet(false, true)) {
            this.log.warn("Repeat call close method\n{}", ObjectUtils.getStackTrace());
            return;
        }
        ScheduledExecutorService scheduledExecutorService = this.keepAliveCheckScheduler;
        scheduledExecutorService.getClass();
        Throwables.caught(scheduledExecutorService::shutdownNow);
        Throwables.caught(() -> {
            this.keepAliveCheckScheduler.awaitTermination(1L, TimeUnit.SECONDS);
        });
        CloseableClient closeableClient = this.keepAlive;
        closeableClient.getClass();
        Throwables.caught(closeableClient::close);
        this.registered.forEach(this::deregister);
        this.registered.clear();
        Throwables.caught(() -> {
            this.client.revokeLease(this.leaseId);
        });
        EtcdClient etcdClient = this.client;
        etcdClient.getClass();
        Throwables.caught(etcdClient::close);
        Throwables.caught(() -> {
            super.close();
        });
    }

    private synchronized void doRefreshDiscoveryServers(List<String> list) {
        List list2;
        if (CollectionUtils.isEmpty(list)) {
            this.log.error("Not discovered available {} from etcd.", this.discoveryRole.name());
            list2 = Collections.emptyList();
        } else {
            list2 = (List) list.stream().filter((v0) -> {
                return Objects.nonNull(v0);
            }).map(str -> {
                return this.discoveryRole.deserialize(str);
            }).collect(Collectors.toList());
        }
        refreshDiscoveredServers(list2);
    }

    private void keepAliveCheck() {
        synchronized (this.keepAliveLock) {
            if (this.keepAlive == null) {
                this.log.warn("Keep alive is null, will be create.");
                try {
                    createLeaseIdAndKeepAlive();
                } catch (Exception e) {
                    this.log.error("keep alive check occur error.", e);
                }
            }
        }
    }

    private void keepAliveRecover() {
        synchronized (this.keepAliveLock) {
            try {
                if (this.keepAlive != null) {
                    this.keepAlive.close();
                    this.keepAlive = null;
                    this.client.revokeLease(this.leaseId);
                }
                createLeaseIdAndKeepAlive();
            } catch (Exception e) {
                this.log.error("Keep alive retry occur error.", e);
            }
        }
    }

    private void createLeaseIdAndKeepAlive() throws Exception {
        this.leaseId = this.client.createLease(this.ttl);
        this.keepAlive = this.client.keepAliveLease(this.leaseId, th -> {
            if (!(th instanceof EtcdException)) {
                this.log.error("Keep alive on fail.", th);
                return;
            }
            EtcdException etcdException = (EtcdException) th;
            this.log.error("Keep alive on error: " + etcdException.getErrorCode(), th);
            if (etcdException.getErrorCode() != ErrorCode.NOT_FOUND) {
                keepAliveRecover();
            }
        }, () -> {
            this.log.error("Keep alive on completed.");
            keepAliveRecover();
        });
        this.registered.forEach(this::register);
    }
}
