package cn.ponfee.scheduler.registry.etcd;

import cn.ponfee.scheduler.common.base.exception.Throwables;
import cn.ponfee.scheduler.common.concurrent.NamedThreadFactory;
import cn.ponfee.scheduler.common.concurrent.ThreadPoolExecutors;
import cn.ponfee.scheduler.common.util.ClassUtils;
import cn.ponfee.scheduler.common.util.Fields;
import cn.ponfee.scheduler.registry.ConnectionStateListener;
import cn.ponfee.scheduler.registry.etcd.configuration.EtcdProperties;
import com.google.common.collect.ImmutableList;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.Watch;
import io.etcd.jetcd.kv.GetResponse;
import io.etcd.jetcd.lease.LeaseGrantResponse;
import io.etcd.jetcd.lease.LeaseKeepAliveResponse;
import io.etcd.jetcd.lease.LeaseTimeToLiveResponse;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.LeaseOption;
import io.etcd.jetcd.options.PutOption;
import io.etcd.jetcd.options.WatchOption;
import io.etcd.jetcd.support.CloseableClient;
import io.etcd.jetcd.support.Observers;
import io.etcd.jetcd.watch.WatchEvent;
import io.etcd.jetcd.watch.WatchResponse;
import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cn/ponfee/scheduler/registry/etcd/EtcdClient.class */
public class EtcdClient implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(EtcdClient.class);
    private static final List<ConnectivityState> CONNECTED_STATUS_LIST = ImmutableList.of(ConnectivityState.READY, ConnectivityState.IDLE);
    private static final List<WatchEvent.EventType> CHANGED_EVENT_TYPES = ImmutableList.of(WatchEvent.EventType.PUT, WatchEvent.EventType.DELETE);
    private static final GetOption GET_PREFIX_OPTION = GetOption.newBuilder().isPrefix(true).build();
    private static final GetOption GET_COUNT_OPTION = GetOption.newBuilder().withCountOnly(true).build();
    private static final WatchOption WATCH_PREFIX_OPTION = WatchOption.newBuilder().isPrefix(true).build();
    private final EtcdProperties config;
    private final Client client;
    private final ScheduledExecutorService healthCheckScheduler = new ScheduledThreadPoolExecutor(1, (ThreadFactory) new NamedThreadFactory("health_check_scheduler", true));
    private final Map<String, Pair<Watch.Watcher, ChildChangedListener>> childWatchers = new HashMap();
    private final Set<ConnectionStateListener<EtcdClient>> connectionStateListeners = ConcurrentHashMap.newKeySet();
    private volatile boolean lastConnectState = isConnected();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:cn/ponfee/scheduler/registry/etcd/EtcdClient$ChildChangedListener.class */
    public class ChildChangedListener implements Consumer<WatchResponse>, AutoCloseable {
        private final String parentKey;
        private final CountDownLatch latch;
        private final Consumer<List<String>> processor;
        private final ThreadPoolExecutor asyncExecutor = ThreadPoolExecutors.create(1, 1, 600, 2, ThreadPoolExecutors.DISCARD);

        public ChildChangedListener(String str, CountDownLatch countDownLatch, Consumer<List<String>> consumer) {
            this.parentKey = str;
            this.latch = countDownLatch;
            this.processor = consumer;
        }

        @Override // java.lang.AutoCloseable
        public void close() {
            ThreadPoolExecutor threadPoolExecutor = this.asyncExecutor;
            threadPoolExecutor.getClass();
            Throwables.caught(threadPoolExecutor::shutdownNow);
        }

        @Override // java.util.function.Consumer
        public void accept(WatchResponse watchResponse) {
            CountDownLatch countDownLatch = this.latch;
            countDownLatch.getClass();
            Throwables.caught(countDownLatch::await);
            List events = watchResponse.getEvents();
            if (events.stream().allMatch(watchEvent -> {
                return !EtcdClient.CHANGED_EVENT_TYPES.contains(watchEvent.getEventType());
            }) || events.stream().allMatch(watchEvent2 -> {
                return this.parentKey.equals(watchEvent2.getKeyValue().getKey().toString(StandardCharsets.UTF_8));
            })) {
                return;
            }
            this.asyncExecutor.submit(() -> {
                try {
                    this.processor.accept(EtcdClient.this.getKeyChildren(this.parentKey));
                } catch (Exception e) {
                    EtcdClient.LOG.error("Get key '" + this.parentKey + "' children occur error.", e);
                }
            });
        }
    }

    public EtcdClient(EtcdProperties etcdProperties) {
        this.config = etcdProperties;
        this.client = Client.builder().endpoints(etcdProperties.endpoints()).maxInboundMessageSize(Integer.valueOf(etcdProperties.getMaxInboundMessageSize())).build();
        this.healthCheckScheduler.scheduleWithFixedDelay(() -> {
            boolean isConnected = isConnected();
            if (this.lastConnectState == isConnected) {
                return;
            }
            for (ConnectionStateListener<EtcdClient> connectionStateListener : this.connectionStateListeners) {
                if (isConnected) {
                    try {
                        connectionStateListener.onConnected(this);
                    } catch (Exception e) {
                        LOG.error("Notify connection state changed occur error: " + isConnected, e);
                    }
                } else {
                    connectionStateListener.onDisconnected(this);
                }
            }
            this.lastConnectState = isConnected;
        }, 3L, 3L, TimeUnit.SECONDS);
    }

    public void addConnectionStateListener(ConnectionStateListener<EtcdClient> connectionStateListener) {
        this.connectionStateListeners.add(connectionStateListener);
    }

    public void removeConnectionStateListener(ConnectionStateListener<EtcdClient> connectionStateListener) {
        this.connectionStateListeners.remove(connectionStateListener);
    }

    public long createLease(long j) throws Exception {
        return ((LeaseGrantResponse) this.client.getLeaseClient().grant(j).get(this.config.getRequestTimeoutMs(), TimeUnit.MILLISECONDS)).getID();
    }

    public long getLeaseTTL(long j) throws Exception {
        LeaseTimeToLiveResponse leaseTimeToLiveResponse = (LeaseTimeToLiveResponse) this.client.getLeaseClient().timeToLive(j, LeaseOption.DEFAULT).get(this.config.getRequestTimeoutMs(), TimeUnit.MILLISECONDS);
        if (leaseTimeToLiveResponse == null) {
            return 0L;
        }
        return leaseTimeToLiveResponse.getTTl();
    }

    public boolean keepAliveOnceLease(long j) throws Exception {
        LeaseKeepAliveResponse leaseKeepAliveResponse = (LeaseKeepAliveResponse) this.client.getLeaseClient().keepAliveOnce(j).get(this.config.getRequestTimeoutMs(), TimeUnit.MILLISECONDS);
        return leaseKeepAliveResponse != null && leaseKeepAliveResponse.getTTL() > 0;
    }

    public CloseableClient keepAliveLease(long j, Consumer<Throwable> consumer, Runnable runnable) {
        return keepAliveLease(j, null, consumer, runnable);
    }

    public CloseableClient keepAliveLease(long j, Consumer<LeaseKeepAliveResponse> consumer, Consumer<Throwable> consumer2, Runnable runnable) {
        return this.client.getLeaseClient().keepAlive(j, Observers.builder().onNext(consumer).onError(consumer2).onCompleted(runnable).build());
    }

    public void revokeLease(long j) throws Exception {
        this.client.getLeaseClient().revoke(j).get(this.config.getRequestTimeoutMs(), TimeUnit.MILLISECONDS);
    }

    public void createPersistentKey(String str, String str2) throws Exception {
        this.client.getKVClient().put(utf8(str), utf8(str2)).get(this.config.getRequestTimeoutMs(), TimeUnit.MILLISECONDS);
    }

    public void createEphemeralKey(String str, String str2, long j) throws Exception {
        this.client.getKVClient().put(utf8(str), utf8(str2), PutOption.newBuilder().withLeaseId(j).build()).get(this.config.getRequestTimeoutMs(), TimeUnit.MILLISECONDS);
    }

    public void deleteKey(String str) throws Exception {
        this.client.getKVClient().delete(utf8(str)).get(this.config.getRequestTimeoutMs(), TimeUnit.MILLISECONDS);
    }

    public boolean existsKey(String str) throws Exception {
        return ((GetResponse) this.client.getKVClient().get(utf8(str), GET_COUNT_OPTION).get((long) this.config.getRequestTimeoutMs(), TimeUnit.MILLISECONDS)).getCount() > 0;
    }

    public List<String> getKeyChildren(String str) throws Exception {
        return (List) ((GetResponse) this.client.getKVClient().get(utf8(str), GET_PREFIX_OPTION).get(this.config.getRequestTimeoutMs(), TimeUnit.MILLISECONDS)).getKvs().stream().map(keyValue -> {
            String byteSequence = keyValue.getKey().toString(StandardCharsets.UTF_8);
            int length = str.length() + 1;
            if (byteSequence.length() > length) {
                return byteSequence.substring(length);
            }
            return null;
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        }).collect(Collectors.toList());
    }

    public synchronized void watchChildChanged(String str, CountDownLatch countDownLatch, Consumer<List<String>> consumer) {
        if (this.childWatchers.containsKey(str)) {
            throw new IllegalStateException("Parent key already watched: " + str);
        }
        ChildChangedListener childChangedListener = new ChildChangedListener(str, countDownLatch, consumer);
        this.childWatchers.put(str, Pair.of(this.client.getWatchClient().watch(utf8(str), WATCH_PREFIX_OPTION, childChangedListener), childChangedListener));
    }

    public synchronized boolean unwatchChildChanged(String str) {
        Pair<Watch.Watcher, ChildChangedListener> remove = this.childWatchers.remove(str);
        if (remove == null) {
            return false;
        }
        ((Watch.Watcher) remove.getLeft()).close();
        ((ChildChangedListener) remove.getRight()).close();
        return true;
    }

    public boolean isConnected() {
        return CONNECTED_STATUS_LIST.contains(((ManagedChannel) ClassUtils.invoke(Fields.get(this.client, "connectionManager"), "getChannel")).getState(false));
    }

    @Override // java.lang.AutoCloseable
    public synchronized void close() {
        new ArrayList(this.childWatchers.keySet()).forEach(this::unwatchChildChanged);
        ScheduledExecutorService scheduledExecutorService = this.healthCheckScheduler;
        scheduledExecutorService.getClass();
        Throwables.caught(scheduledExecutorService::shutdownNow);
        this.client.close();
    }

    private static ByteSequence utf8(String str) {
        return ByteSequence.from(str, StandardCharsets.UTF_8);
    }
}
