/*
 * Decompiled with CFR 0.152.
 */
package io.lettuce.core.masterslave;

import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisException;
import io.lettuce.core.RedisURI;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.masterslave.MasterSlaveChannelWriter;
import io.lettuce.core.masterslave.MasterSlaveConnectionProvider;
import io.lettuce.core.masterslave.MasterSlaveConnector;
import io.lettuce.core.masterslave.MasterSlaveTopologyRefresh;
import io.lettuce.core.masterslave.ResumeAfter;
import io.lettuce.core.masterslave.SentinelTopologyProvider;
import io.lettuce.core.masterslave.SentinelTopologyRefresh;
import io.lettuce.core.masterslave.StatefulRedisMasterSlaveConnection;
import io.lettuce.core.masterslave.StatefulRedisMasterSlaveConnectionImpl;
import io.lettuce.core.models.role.RedisNodeDescription;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import reactor.core.publisher.Mono;

class SentinelConnector<K, V>
implements MasterSlaveConnector<K, V> {
    private static final InternalLogger LOG = InternalLoggerFactory.getInstance(SentinelConnector.class);
    private final RedisClient redisClient;
    private final RedisCodec<K, V> codec;
    private final RedisURI redisURI;

    SentinelConnector(RedisClient redisClient, RedisCodec<K, V> codec, RedisURI redisURI) {
        this.redisClient = redisClient;
        this.codec = codec;
        this.redisURI = redisURI;
    }

    @Override
    public CompletableFuture<StatefulRedisMasterSlaveConnection<K, V>> connectAsync() {
        SentinelTopologyProvider topologyProvider = new SentinelTopologyProvider(this.redisURI.getSentinelMasterId(), this.redisClient, this.redisURI);
        SentinelTopologyRefresh sentinelTopologyRefresh = new SentinelTopologyRefresh(this.redisClient, this.redisURI.getSentinelMasterId(), this.redisURI.getSentinels());
        MasterSlaveTopologyRefresh refresh = new MasterSlaveTopologyRefresh(this.redisClient, topologyProvider);
        MasterSlaveConnectionProvider<K, V> connectionProvider = new MasterSlaveConnectionProvider<K, V>(this.redisClient, this.codec, this.redisURI, Collections.emptyMap());
        Runnable runnable = this.getTopologyRefreshRunnable(refresh, connectionProvider);
        return refresh.getNodes(this.redisURI).flatMap(nodes -> {
            if (nodes.isEmpty()) {
                return Mono.error((Throwable)new RedisException(String.format("Cannot determine topology from %s", this.redisURI)));
            }
            return this.initializeConnection(this.codec, sentinelTopologyRefresh, connectionProvider, runnable, (List<RedisNodeDescription>)nodes);
        }).onErrorMap(ExecutionException.class, Throwable::getCause).toFuture();
    }

    private Mono<StatefulRedisMasterSlaveConnection<K, V>> initializeConnection(RedisCodec<K, V> codec, final SentinelTopologyRefresh sentinelTopologyRefresh, MasterSlaveConnectionProvider<K, V> connectionProvider, Runnable runnable, List<RedisNodeDescription> nodes) {
        connectionProvider.setKnownNodes(nodes);
        MasterSlaveChannelWriter channelWriter = new MasterSlaveChannelWriter(connectionProvider, this.redisClient.getResources()){

            @Override
            public CompletableFuture<Void> closeAsync() {
                return CompletableFuture.allOf(super.closeAsync(), sentinelTopologyRefresh.closeAsync());
            }
        };
        StatefulRedisMasterSlaveConnectionImpl connection = new StatefulRedisMasterSlaveConnectionImpl(channelWriter, codec, this.redisURI.getTimeout());
        connection.setOptions(this.redisClient.getOptions());
        CompletionStage<Void> bind = sentinelTopologyRefresh.bind(runnable);
        return Mono.fromCompletionStage(bind).onErrorResume(t -> ResumeAfter.close(connection).thenError((Throwable)t)).then(Mono.just(connection));
    }

    private Runnable getTopologyRefreshRunnable(MasterSlaveTopologyRefresh refresh, MasterSlaveConnectionProvider<K, V> connectionProvider) {
        return () -> {
            try {
                LOG.debug("Refreshing topology");
                refresh.getNodes(this.redisURI).subscribe(nodes -> {
                    if (nodes.isEmpty()) {
                        LOG.warn("Topology refresh returned no nodes from {}", (Object)this.redisURI);
                    }
                    LOG.debug("New topology: {}", nodes);
                    connectionProvider.setKnownNodes((Collection<RedisNodeDescription>)nodes);
                }, t -> LOG.error("Error during background refresh", t));
            }
            catch (Exception e) {
                LOG.error("Error during background refresh", (Throwable)e);
            }
        };
    }
}

