/*
 * Decompiled with CFR 0.152.
 */
package cn.ponfee.scheduler.registry.etcd;

import cn.ponfee.scheduler.common.base.exception.Throwables;
import cn.ponfee.scheduler.common.concurrent.NamedThreadFactory;
import cn.ponfee.scheduler.common.util.ObjectUtils;
import cn.ponfee.scheduler.core.base.Server;
import cn.ponfee.scheduler.registry.ConnectionStateListener;
import cn.ponfee.scheduler.registry.ServerRegistry;
import cn.ponfee.scheduler.registry.etcd.EtcdClient;
import cn.ponfee.scheduler.registry.etcd.configuration.EtcdProperties;
import io.etcd.jetcd.common.exception.ErrorCode;
import io.etcd.jetcd.common.exception.EtcdException;
import io.etcd.jetcd.support.CloseableClient;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;

public abstract class EtcdServerRegistry<R extends Server, D extends Server>
extends ServerRegistry<R, D> {
    private static final String PLACEHOLDER_VALUE = "1";
    private final Object keepAliveLock = new Object();
    private final long ttl;
    private final EtcdClient client;
    private final ScheduledExecutorService keepAliveCheckScheduler;
    private volatile long leaseId;
    private volatile CloseableClient keepAlive;

    protected EtcdServerRegistry(String namespace, EtcdProperties properties) {
        super(namespace, '/');
        this.ttl = properties.getSessionTimeoutMs() / 2000;
        CountDownLatch latch = new CountDownLatch(1);
        try {
            this.client = new EtcdClient(properties);
            this.keepAliveCheckScheduler = new ScheduledThreadPoolExecutor(1, (ThreadFactory)new NamedThreadFactory("keep_alive_check_scheduler", true));
            this.client.createPersistentKey(this.registryRootPath, PLACEHOLDER_VALUE);
            this.createLeaseIdAndKeepAlive();
            this.client.watchChildChanged(this.discoveryRootPath, latch, this::doRefreshDiscoveryServers);
            long period = Math.max(this.ttl / 4L, 1L);
            this.keepAliveCheckScheduler.scheduleWithFixedDelay(this::keepAliveCheck, period, period, TimeUnit.SECONDS);
            this.client.addConnectionStateListener((ConnectionStateListener<EtcdClient>)ConnectionStateListener.builder().onConnected(c -> this.keepAliveRecover()).build());
            this.doRefreshDiscoveryServers(this.client.getKeyChildren(this.discoveryRootPath));
        }
        catch (Exception e) {
            throw new IllegalStateException(e);
        }
        finally {
            latch.countDown();
        }
    }

    public final boolean isConnected() {
        return this.client.isConnected();
    }

    public final void register(R server) {
        if (this.closed) {
            return;
        }
        try {
            this.client.createEphemeralKey(this.buildRegistryServerId(server), PLACEHOLDER_VALUE, this.leaseId);
            this.registered.add(server);
            this.log.info("Etcd server registered: {} - {}", (Object)this.registryRole.name(), server);
        }
        catch (Throwable e) {
            throw new RuntimeException("Etcd server registered failed: " + server, e);
        }
    }

    public final void deregister(R server) {
        try {
            this.registered.remove(server);
            this.client.deleteKey(this.buildRegistryServerId(server));
            this.log.info("Etcd server deregister: {} - {}", (Object)this.registryRole.name(), server);
        }
        catch (Exception e) {
            this.log.error("Etcd server deregister error.", (Throwable)e);
        }
    }

    private String buildRegistryServerId(R server) {
        return this.registryRootPath + this.separator + server.serialize();
    }

    public void close() {
        this.closed = true;
        if (!this.close.compareAndSet(false, true)) {
            this.log.warn("Repeat call close method\n{}", (Object)ObjectUtils.getStackTrace());
            return;
        }
        Throwables.caught(this.keepAliveCheckScheduler::shutdownNow);
        Throwables.caught(() -> this.keepAliveCheckScheduler.awaitTermination(1L, TimeUnit.SECONDS));
        Throwables.caught(() -> ((CloseableClient)this.keepAlive).close());
        this.registered.forEach(this::deregister);
        this.registered.clear();
        Throwables.caught(() -> this.client.revokeLease(this.leaseId));
        Throwables.caught(this.client::close);
        Throwables.caught(() -> super.close());
    }

    private synchronized void doRefreshDiscoveryServers(List<String> list) {
        List servers;
        if (CollectionUtils.isEmpty(list)) {
            this.log.error("Not discovered available {} from etcd.", (Object)this.discoveryRole.name());
            servers = Collections.emptyList();
        } else {
            servers = list.stream().filter(Objects::nonNull).map(s -> this.discoveryRole.deserialize(s)).collect(Collectors.toList());
        }
        this.refreshDiscoveredServers(servers);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void keepAliveCheck() {
        Object object = this.keepAliveLock;
        synchronized (object) {
            if (this.keepAlive == null) {
                this.log.warn("Keep alive is null, will be create.");
                try {
                    this.createLeaseIdAndKeepAlive();
                }
                catch (Exception e) {
                    this.log.error("keep alive check occur error.", (Throwable)e);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void keepAliveRecover() {
        Object object = this.keepAliveLock;
        synchronized (object) {
            try {
                if (this.keepAlive != null) {
                    this.keepAlive.close();
                    this.keepAlive = null;
                    this.client.revokeLease(this.leaseId);
                }
                this.createLeaseIdAndKeepAlive();
            }
            catch (Exception e) {
                this.log.error("Keep alive retry occur error.", (Throwable)e);
            }
        }
    }

    private void createLeaseIdAndKeepAlive() throws Exception {
        this.leaseId = this.client.createLease(this.ttl);
        this.keepAlive = this.client.keepAliveLease(this.leaseId, t -> {
            if (t instanceof EtcdException) {
                EtcdException e = (EtcdException)t;
                this.log.error("Keep alive on error: " + e.getErrorCode(), t);
                if (e.getErrorCode() != ErrorCode.NOT_FOUND) {
                    this.keepAliveRecover();
                }
            } else {
                this.log.error("Keep alive on fail.", t);
            }
        }, () -> {
            this.log.error("Keep alive on completed.");
            this.keepAliveRecover();
        });
        this.registered.forEach(this::register);
    }
}

