/*
 * Decompiled with CFR 0.152.
 */
package cn.ponfee.scheduler.registry.etcd;

import cn.ponfee.scheduler.common.base.exception.Throwables;
import cn.ponfee.scheduler.common.concurrent.NamedThreadFactory;
import cn.ponfee.scheduler.common.concurrent.ThreadPoolExecutors;
import cn.ponfee.scheduler.common.util.ClassUtils;
import cn.ponfee.scheduler.common.util.Fields;
import cn.ponfee.scheduler.registry.ConnectionStateListener;
import cn.ponfee.scheduler.registry.etcd.configuration.EtcdProperties;
import com.google.common.collect.ImmutableList;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.Watch;
import io.etcd.jetcd.kv.GetResponse;
import io.etcd.jetcd.lease.LeaseGrantResponse;
import io.etcd.jetcd.lease.LeaseKeepAliveResponse;
import io.etcd.jetcd.lease.LeaseTimeToLiveResponse;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.LeaseOption;
import io.etcd.jetcd.options.PutOption;
import io.etcd.jetcd.options.WatchOption;
import io.etcd.jetcd.support.CloseableClient;
import io.etcd.jetcd.support.Observers;
import io.etcd.jetcd.watch.WatchEvent;
import io.etcd.jetcd.watch.WatchResponse;
import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EtcdClient
implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(EtcdClient.class);
    private static final List<ConnectivityState> CONNECTED_STATUS_LIST = ImmutableList.of((Object)ConnectivityState.READY, (Object)ConnectivityState.IDLE);
    private static final List<WatchEvent.EventType> CHANGED_EVENT_TYPES = ImmutableList.of((Object)WatchEvent.EventType.PUT, (Object)WatchEvent.EventType.DELETE);
    private static final GetOption GET_PREFIX_OPTION = GetOption.newBuilder().isPrefix(true).build();
    private static final GetOption GET_COUNT_OPTION = GetOption.newBuilder().withCountOnly(true).build();
    private static final WatchOption WATCH_PREFIX_OPTION = WatchOption.newBuilder().isPrefix(true).build();
    private final EtcdProperties config;
    private final Client client;
    private final ScheduledExecutorService healthCheckScheduler;
    private final Map<String, Pair<Watch.Watcher, ChildChangedListener>> childWatchers;
    private final Set<ConnectionStateListener<EtcdClient>> connectionStateListeners;
    private volatile boolean lastConnectState;

    public EtcdClient(EtcdProperties properties) {
        this.config = properties;
        this.client = Client.builder().endpoints(properties.endpoints()).maxInboundMessageSize(Integer.valueOf(properties.getMaxInboundMessageSize())).build();
        this.healthCheckScheduler = new ScheduledThreadPoolExecutor(1, (ThreadFactory)new NamedThreadFactory("health_check_scheduler", true));
        this.childWatchers = new HashMap<String, Pair<Watch.Watcher, ChildChangedListener>>();
        this.connectionStateListeners = ConcurrentHashMap.newKeySet();
        this.lastConnectState = this.isConnected();
        this.healthCheckScheduler.scheduleWithFixedDelay(() -> {
            boolean currConnectState = this.isConnected();
            if (this.lastConnectState == currConnectState) {
                return;
            }
            for (ConnectionStateListener<EtcdClient> listener : this.connectionStateListeners) {
                try {
                    if (currConnectState) {
                        listener.onConnected((Object)this);
                        continue;
                    }
                    listener.onDisconnected((Object)this);
                }
                catch (Exception e) {
                    LOG.error("Notify connection state changed occur error: " + currConnectState, (Throwable)e);
                }
            }
            this.lastConnectState = currConnectState;
        }, 3L, 3L, TimeUnit.SECONDS);
    }

    public void addConnectionStateListener(ConnectionStateListener<EtcdClient> listener) {
        this.connectionStateListeners.add(listener);
    }

    public void removeConnectionStateListener(ConnectionStateListener<EtcdClient> listener) {
        this.connectionStateListeners.remove(listener);
    }

    public long createLease(long ttl) throws Exception {
        return ((LeaseGrantResponse)this.client.getLeaseClient().grant(ttl).get(this.config.getRequestTimeoutMs(), TimeUnit.MILLISECONDS)).getID();
    }

    public long getLeaseTTL(long leaseId) throws Exception {
        LeaseTimeToLiveResponse resp = (LeaseTimeToLiveResponse)this.client.getLeaseClient().timeToLive(leaseId, LeaseOption.DEFAULT).get(this.config.getRequestTimeoutMs(), TimeUnit.MILLISECONDS);
        return resp == null ? 0L : resp.getTTl();
    }

    public boolean keepAliveOnceLease(long leaseId) throws Exception {
        LeaseKeepAliveResponse resp = (LeaseKeepAliveResponse)this.client.getLeaseClient().keepAliveOnce(leaseId).get(this.config.getRequestTimeoutMs(), TimeUnit.MILLISECONDS);
        return resp != null && resp.getTTL() > 0L;
    }

    public CloseableClient keepAliveLease(long leaseId, Consumer<Throwable> error, Runnable completed) {
        return this.keepAliveLease(leaseId, null, error, completed);
    }

    public CloseableClient keepAliveLease(long leaseId, Consumer<LeaseKeepAliveResponse> next, Consumer<Throwable> error, Runnable completed) {
        return this.client.getLeaseClient().keepAlive(leaseId, Observers.builder().onNext(next).onError(error).onCompleted(completed).build());
    }

    public void revokeLease(long leaseId) throws Exception {
        this.client.getLeaseClient().revoke(leaseId).get(this.config.getRequestTimeoutMs(), TimeUnit.MILLISECONDS);
    }

    public void createPersistentKey(String key, String value) throws Exception {
        this.client.getKVClient().put(EtcdClient.utf8(key), EtcdClient.utf8(value)).get(this.config.getRequestTimeoutMs(), TimeUnit.MILLISECONDS);
    }

    public void createEphemeralKey(String key, String value, long leaseId) throws Exception {
        this.client.getKVClient().put(EtcdClient.utf8(key), EtcdClient.utf8(value), PutOption.newBuilder().withLeaseId(leaseId).build()).get(this.config.getRequestTimeoutMs(), TimeUnit.MILLISECONDS);
    }

    public void deleteKey(String key) throws Exception {
        this.client.getKVClient().delete(EtcdClient.utf8(key)).get(this.config.getRequestTimeoutMs(), TimeUnit.MILLISECONDS);
    }

    public boolean existsKey(String key) throws Exception {
        long count = ((GetResponse)this.client.getKVClient().get(EtcdClient.utf8(key), GET_COUNT_OPTION).get(this.config.getRequestTimeoutMs(), TimeUnit.MILLISECONDS)).getCount();
        return count > 0L;
    }

    public List<String> getKeyChildren(String parentKey) throws Exception {
        return ((GetResponse)this.client.getKVClient().get(EtcdClient.utf8(parentKey), GET_PREFIX_OPTION).get(this.config.getRequestTimeoutMs(), TimeUnit.MILLISECONDS)).getKvs().stream().map(kv -> {
            String key = kv.getKey().toString(StandardCharsets.UTF_8);
            int childIndex = parentKey.length() + 1;
            return key.length() > childIndex ? key.substring(childIndex) : null;
        }).filter(Objects::nonNull).collect(Collectors.toList());
    }

    public synchronized void watchChildChanged(String parentKey, CountDownLatch latch, Consumer<List<String>> listener) {
        if (this.childWatchers.containsKey(parentKey)) {
            throw new IllegalStateException("Parent key already watched: " + parentKey);
        }
        ChildChangedListener innerListener = new ChildChangedListener(parentKey, latch, listener);
        Watch.Watcher watcher = this.client.getWatchClient().watch(EtcdClient.utf8(parentKey), WATCH_PREFIX_OPTION, (Consumer)innerListener);
        this.childWatchers.put(parentKey, (Pair<Watch.Watcher, ChildChangedListener>)Pair.of((Object)watcher, (Object)innerListener));
    }

    public synchronized boolean unwatchChildChanged(String parentKey) {
        Pair<Watch.Watcher, ChildChangedListener> pair = this.childWatchers.remove(parentKey);
        if (pair != null) {
            ((Watch.Watcher)pair.getLeft()).close();
            ((ChildChangedListener)pair.getRight()).close();
            return true;
        }
        return false;
    }

    public boolean isConnected() {
        Object connectionManager = Fields.get((Object)this.client, (String)"connectionManager");
        ManagedChannel managedChannel = (ManagedChannel)ClassUtils.invoke((Object)connectionManager, (String)"getChannel");
        ConnectivityState state = managedChannel.getState(false);
        return CONNECTED_STATUS_LIST.contains(state);
    }

    @Override
    public synchronized void close() {
        new ArrayList<String>(this.childWatchers.keySet()).forEach(this::unwatchChildChanged);
        Throwables.caught(this.healthCheckScheduler::shutdownNow);
        this.client.close();
    }

    private static ByteSequence utf8(String key) {
        return ByteSequence.from((String)key, (Charset)StandardCharsets.UTF_8);
    }

    private class ChildChangedListener
    implements Consumer<WatchResponse>,
    AutoCloseable {
        private final String parentKey;
        private final CountDownLatch latch;
        private final Consumer<List<String>> processor;
        private final ThreadPoolExecutor asyncExecutor = ThreadPoolExecutors.create((int)1, (int)1, (long)600L, (int)2, (RejectedExecutionHandler)ThreadPoolExecutors.DISCARD);

        public ChildChangedListener(String parentKey, CountDownLatch latch, Consumer<List<String>> processor) {
            this.parentKey = parentKey;
            this.latch = latch;
            this.processor = processor;
        }

        @Override
        public void close() {
            Throwables.caught(this.asyncExecutor::shutdownNow);
        }

        @Override
        public void accept(WatchResponse response) {
            Throwables.caught(this.latch::await);
            List events = response.getEvents();
            if (events.stream().allMatch(e -> !CHANGED_EVENT_TYPES.contains(e.getEventType()))) {
                return;
            }
            if (events.stream().allMatch(e -> this.parentKey.equals(e.getKeyValue().getKey().toString(StandardCharsets.UTF_8)))) {
                return;
            }
            this.asyncExecutor.submit(() -> {
                try {
                    List<String> children = EtcdClient.this.getKeyChildren(this.parentKey);
                    this.processor.accept(children);
                }
                catch (Exception e) {
                    LOG.error("Get key '" + this.parentKey + "' children occur error.", (Throwable)e);
                }
            });
        }
    }
}

