/*
 * Decompiled with CFR 0.152.
 */
package net.spy.memcached;

import java.io.IOException;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import net.spy.memcached.BroadcastOpFactory;
import net.spy.memcached.ConnectionFactory;
import net.spy.memcached.ConnectionObserver;
import net.spy.memcached.FailureMode;
import net.spy.memcached.MemcachedNode;
import net.spy.memcached.NodeLocator;
import net.spy.memcached.OperationFactory;
import net.spy.memcached.compat.SpyThread;
import net.spy.memcached.compat.log.Logger;
import net.spy.memcached.compat.log.LoggerFactory;
import net.spy.memcached.config.ClusterConfiguration;
import net.spy.memcached.config.ClusterConfigurationObserver;
import net.spy.memcached.config.NodeEndPoint;
import net.spy.memcached.internal.OperationFuture;
import net.spy.memcached.metrics.MetricCollector;
import net.spy.memcached.metrics.MetricType;
import net.spy.memcached.ops.GetOperation;
import net.spy.memcached.ops.KeyedOperation;
import net.spy.memcached.ops.NoopOperation;
import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationCallback;
import net.spy.memcached.ops.OperationException;
import net.spy.memcached.ops.OperationState;
import net.spy.memcached.ops.OperationStatus;
import net.spy.memcached.ops.TapOperation;
import net.spy.memcached.ops.VBucketAware;
import net.spy.memcached.protocol.binary.BinaryOperationFactory;
import net.spy.memcached.protocol.binary.MultiGetOperationImpl;
import net.spy.memcached.protocol.binary.TapAckOperationImpl;
import net.spy.memcached.util.StringUtils;

public class MemcachedConnection
extends SpyThread
implements ClusterConfigurationObserver {
    private static final int DOUBLE_CHECK_EMPTY = 256;
    private static final int EXCESSIVE_EMPTY = 0x1000000;
    private static final int DEFAULT_WAKEUP_DELAY = 1000;
    private static final int DEFAULT_RETRY_QUEUE_SIZE = -1;
    private static final int MAX_CLONE_COUNT = 100;
    private static final String RECON_QUEUE_METRIC = "[MEM] Reconnecting Nodes (ReconnectQueue)";
    private static final String SHUTD_QUEUE_METRIC = "[MEM] Shutting Down Nodes (NodesToShutdown)";
    private static final String OVERALL_REQUEST_METRIC = "[MEM] Request Rate: All";
    private static final String OVERALL_AVG_BYTES_WRITE_METRIC = "[MEM] Average Bytes written to OS per write";
    private static final String OVERALL_AVG_BYTES_READ_METRIC = "[MEM] Average Bytes read from OS per read";
    private static final String OVERALL_AVG_TIME_ON_WIRE_METRIC = "[MEM] Average Time on wire for operations (\u00b5s)";
    private static final String OVERALL_RESPONSE_METRIC = "[MEM] Response Rate: All (Failure + Success + Retry)";
    private static final String OVERALL_RESPONSE_RETRY_METRIC = "[MEM] Response Rate: Retry";
    private static final String OVERALL_RESPONSE_FAIL_METRIC = "[MEM] Response Rate: Failure";
    private static final String OVERALL_RESPONSE_SUCC_METRIC = "[MEM] Response Rate: Success";
    protected volatile boolean shutDown = false;
    private final boolean shouldOptimize;
    protected Selector selector = null;
    protected NodeLocator locator;
    protected final FailureMode failureMode;
    private final long maxDelay;
    private int emptySelects = 0;
    private final int bufSize;
    private final ConnectionFactory connectionFactory;
    protected final ConcurrentLinkedQueue<MemcachedNode> addedQueue;
    private final SortedMap<Long, MemcachedNode> reconnectQueue;
    protected volatile boolean running = true;
    private final Collection<ConnectionObserver> connObservers = new ConcurrentLinkedQueue<ConnectionObserver>();
    private final OperationFactory opFact;
    private final int timeoutExceptionThreshold;
    private final List<Operation> retryOps;
    protected List<NodeEndPoint> nodesToAdd;
    protected List<MemcachedNode> nodesToDelete;
    protected final ReentrantLock lockForNodeUpdates;
    protected final ReentrantLock conditionLock;
    protected final Condition nodeUpdateCondition;
    protected boolean isInitialClusterConfigApplied;
    protected final ConcurrentLinkedQueue<MemcachedNode> nodesToShutdown;
    protected final List<NodeEndPoint> newEndPoints;
    private final boolean verifyAliveOnConnect;
    private final ExecutorService listenerExecutorService;
    protected final MetricCollector metrics;
    protected final MetricType metricType;
    private final int wakeupDelay;
    private final boolean isTlsMode;
    private final int retryQueueSize;

    public MemcachedConnection(int bufSize, ConnectionFactory f, List<InetSocketAddress> socketAddressList, Collection<ConnectionObserver> obs, FailureMode fm, OperationFactory opfactory) throws IOException {
        this.connObservers.addAll(obs);
        this.reconnectQueue = new TreeMap<Long, MemcachedNode>();
        this.addedQueue = new ConcurrentLinkedQueue();
        this.failureMode = fm;
        this.shouldOptimize = f.shouldOptimize();
        this.maxDelay = TimeUnit.SECONDS.toMillis(f.getMaxReconnectDelay());
        this.opFact = opfactory;
        this.timeoutExceptionThreshold = f.getTimeoutExceptionThreshold();
        this.selector = Selector.open();
        this.retryOps = Collections.synchronizedList(new ArrayList());
        this.lockForNodeUpdates = new ReentrantLock();
        this.conditionLock = new ReentrantLock();
        this.nodeUpdateCondition = this.conditionLock.newCondition();
        this.newEndPoints = new ArrayList<NodeEndPoint>();
        this.nodesToDelete = new ArrayList<MemcachedNode>();
        this.nodesToShutdown = new ConcurrentLinkedQueue();
        this.listenerExecutorService = f.getListenerExecutorService();
        this.bufSize = bufSize;
        this.connectionFactory = f;
        this.isInitialClusterConfigApplied = false;
        this.metrics = f.getMetricCollector();
        this.metricType = f.enableMetrics();
        this.registerMetrics();
        this.isTlsMode = f.getSSLContext() != null;
        String verifyAlive = System.getProperty("net.spy.verifyAliveOnConnect");
        this.verifyAliveOnConnect = verifyAlive != null && verifyAlive.equals("true");
        this.wakeupDelay = Integer.parseInt(System.getProperty("net.spy.wakeupDelay", Integer.toString(1000)));
        this.retryQueueSize = Integer.parseInt(System.getProperty("net.spy.retryQueueSize", Integer.toString(-1)));
        this.getLogger().info("Setting retryQueueSize to " + this.retryQueueSize);
        ArrayList<NodeEndPoint> endPoints = new ArrayList<NodeEndPoint>(socketAddressList.size());
        for (InetSocketAddress sa : socketAddressList) {
            InetAddress addr = sa.getAddress();
            String ipAddress = addr != null ? addr.getHostAddress() : null;
            NodeEndPoint endPoint = new NodeEndPoint(sa.getHostString(), ipAddress, sa.getPort());
            endPoints.add(endPoint);
        }
        List<MemcachedNode> connections = this.createConnections(endPoints);
        this.locator = f.createLocator(connections);
        this.setName("Memcached IO over " + this);
        this.setDaemon(f.isDaemon());
        this.start();
    }

    public void waitForInitialConfigApplied() {
        this.conditionLock.lock();
        try {
            if (!this.isInitialClusterConfigApplied) {
                this.nodeUpdateCondition.await();
            }
        }
        catch (InterruptedException interruptedException) {
        }
        finally {
            this.conditionLock.unlock();
        }
    }

    @Override
    public void waitForConfigChangeApplied() {
        this.conditionLock.lock();
        try {
            this.nodeUpdateCondition.await(50L, TimeUnit.SECONDS);
        }
        catch (InterruptedException interruptedException) {
        }
        finally {
            this.conditionLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void notifyUpdate(ClusterConfiguration clusterConfiguration) {
        if (this.shutDown) {
            this.getLogger().info("Ignoring config updates as the client is shutting down");
            return;
        }
        if (clusterConfiguration == null) {
            return;
        }
        this.lockForNodeUpdates.lock();
        try {
            this.newEndPoints.clear();
            for (NodeEndPoint endPoint : clusterConfiguration.getCacheNodeEndPoints()) {
                this.newEndPoints.add(endPoint);
            }
        }
        finally {
            this.lockForNodeUpdates.unlock();
        }
        Selector s = this.selector.wakeup();
        assert (s == this.selector) : "Wakeup returned the wrong selector.";
    }

    protected void registerMetrics() {
        if (this.metricType.equals((Object)MetricType.DEBUG) || this.metricType.equals((Object)MetricType.PERFORMANCE)) {
            this.metrics.addHistogram(OVERALL_AVG_BYTES_READ_METRIC);
            this.metrics.addHistogram(OVERALL_AVG_BYTES_WRITE_METRIC);
            this.metrics.addHistogram(OVERALL_AVG_TIME_ON_WIRE_METRIC);
            this.metrics.addMeter(OVERALL_RESPONSE_METRIC);
            this.metrics.addMeter(OVERALL_REQUEST_METRIC);
            if (this.metricType.equals((Object)MetricType.DEBUG)) {
                this.metrics.addCounter(RECON_QUEUE_METRIC);
                this.metrics.addCounter(SHUTD_QUEUE_METRIC);
                this.metrics.addMeter(OVERALL_RESPONSE_RETRY_METRIC);
                this.metrics.addMeter(OVERALL_RESPONSE_SUCC_METRIC);
                this.metrics.addMeter(OVERALL_RESPONSE_FAIL_METRIC);
            }
        }
    }

    protected MemcachedNode createConnection(NodeEndPoint endPoint) throws IOException {
        return this.createConnections(Collections.singletonList(endPoint)).get(0);
    }

    protected List<MemcachedNode> createConnections(Collection<NodeEndPoint> endPoints) throws IOException {
        ArrayList<MemcachedNode> connections = new ArrayList<MemcachedNode>(endPoints.size());
        for (NodeEndPoint endPoint : endPoints) {
            InetSocketAddress sa = endPoint.getInetSocketAddress();
            SocketChannel ch = SocketChannel.open();
            ch.configureBlocking(false);
            MemcachedNode qa = this.connectionFactory.createMemcachedNode(sa, ch, this.bufSize);
            qa.setNodeEndPoint(endPoint);
            int ops = 0;
            Socket socket = ch.socket();
            socket.setTcpNoDelay(!this.connectionFactory.useNagleAlgorithm());
            socket.setKeepAlive(this.connectionFactory.getKeepAlive());
            try {
                if (ch.connect(sa)) {
                    this.getLogger().info("Connected to %s immediately", qa);
                    this.connected(qa);
                } else {
                    this.getLogger().info("Added %s to connect queue", qa);
                    ops = 8;
                }
                this.selector.wakeup();
                qa.setSk(ch.register(this.selector, ops, qa));
                assert (ch.isConnected() || qa.getSk().interestOps() == 8) : "Not connected, and not wanting to connect";
            }
            catch (SocketException e) {
                this.getLogger().warn((Object)"Socket error on initial connect", e);
                this.queueReconnect(qa);
            }
            catch (UnresolvedAddressException e) {
                this.getLogger().warn((Object)"Unresolved Address error on initial connect", e);
                this.queueReconnect(qa);
            }
            connections.add(qa);
        }
        return connections;
    }

    private boolean selectorsMakeSense() {
        for (MemcachedNode qa : this.locator.getAll()) {
            int sops;
            if (qa.getSk() == null || !qa.getSk().isValid()) continue;
            if (qa.getChannel().isConnected()) {
                sops = qa.getSk().interestOps();
                int expected = 0;
                if (qa.hasReadOp()) {
                    expected |= 1;
                }
                if (qa.hasWriteOp()) {
                    expected |= 4;
                }
                if (qa.getBytesRemainingToWrite() > 0) {
                    expected |= 4;
                }
                assert (sops == expected) : "Invalid ops:  " + qa + ", expected " + expected + ", got " + sops;
                continue;
            }
            sops = qa.getSk().interestOps();
            assert (sops == 8) : "Not connected, and not watching for connect: " + sops;
        }
        this.getLogger().debug("Checked the selectors.");
        return true;
    }

    public void handleIO() throws IOException {
        if (this.shutDown) {
            this.getLogger().debug("No IO while shut down.");
            return;
        }
        this.handleInputQueue();
        this.getLogger().debug("Done dealing with queue.");
        long delay = this.wakeupDelay;
        if (!this.reconnectQueue.isEmpty()) {
            long now = System.currentTimeMillis();
            long then = this.reconnectQueue.firstKey();
            delay = Math.max(then - now, 1L);
        }
        this.getLogger().debug("Selecting with delay of %sms", delay);
        assert (this.selectorsMakeSense()) : "Selectors don't make sense.";
        int selected = this.selector.select(delay);
        if (this.shutDown) {
            return;
        }
        if (selected == 0 && this.addedQueue.isEmpty()) {
            this.handleWokenUpSelector();
        } else if (this.selector.selectedKeys().isEmpty()) {
            this.handleEmptySelects();
        } else {
            this.getLogger().debug("Selected %d, selected %d keys", selected, this.selector.selectedKeys().size());
            this.emptySelects = 0;
            Iterator<SelectionKey> iterator = this.selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey sk = iterator.next();
                this.handleIO(sk);
                iterator.remove();
            }
        }
        this.handleOperationalTasks();
    }

    protected void handleWokenUpSelector() {
    }

    private void handleOperationalTasks() throws IOException {
        this.checkPotentiallyTimedOutConnection();
        if (!this.shutDown && !this.reconnectQueue.isEmpty()) {
            this.attemptReconnects();
        }
        if (!this.retryOps.isEmpty()) {
            ArrayList<Operation> operations = new ArrayList<Operation>(this.retryOps);
            this.retryOps.clear();
            this.redistributeOperations(operations);
        }
        this.handleShutdownQueue();
    }

    private void handleEmptySelects() {
        this.getLogger().debug("No selectors ready, interrupted: %b", Thread.interrupted());
        if (++this.emptySelects > 256) {
            for (SelectionKey sk : this.selector.keys()) {
                this.getLogger().debug("%s has %s, interested in %s", sk, sk.readyOps(), sk.interestOps());
                if (sk.readyOps() != 0) {
                    this.getLogger().debug("%s has a ready op, handling IO", sk);
                    this.handleIO(sk);
                    continue;
                }
                this.lostConnection((MemcachedNode)sk.attachment());
            }
            assert (this.emptySelects < 0x1000000) : "Too many empty selects";
        }
    }

    private void handleShutdownQueue() throws IOException {
        for (MemcachedNode qa : this.nodesToShutdown) {
            if (this.addedQueue.contains(qa)) continue;
            this.nodesToShutdown.remove(qa);
            this.metrics.decrementCounter(SHUTD_QUEUE_METRIC);
            Collection<Operation> notCompletedOperations = this.shutdownNode(qa);
            this.redistributeOperations(notCompletedOperations);
        }
    }

    private void checkPotentiallyTimedOutConnection() {
        this.updateNodeList();
        Collection<MemcachedNode> nodes = this.locator.getAll();
        boolean stillCheckingTimeouts = true;
        while (stillCheckingTimeouts) {
            try {
                for (SelectionKey sk : this.selector.keys()) {
                    MemcachedNode mn = (MemcachedNode)sk.attachment();
                    if (!nodes.contains(mn) || mn.getContinuousTimeout() <= this.timeoutExceptionThreshold) continue;
                    this.getLogger().warn("%s exceeded continuous timeout threshold", sk);
                    this.lostConnection(mn);
                }
                stillCheckingTimeouts = false;
            }
            catch (ConcurrentModificationException e) {
                this.getLogger().warn((Object)"Retrying selector keys after ConcurrentModificationException caught", e);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void updateNodeList() {
        ArrayList<NodeEndPoint> endPoints = new ArrayList<NodeEndPoint>();
        try {
            this.lockForNodeUpdates.lock();
            if (this.newEndPoints.size() == 0) {
                return;
            }
            endPoints.addAll(this.newEndPoints);
            this.newEndPoints.clear();
        }
        finally {
            this.lockForNodeUpdates.unlock();
        }
        try {
            ArrayList<MemcachedNode> currentNodes = new ArrayList<MemcachedNode>(this.locator.getAll());
            ArrayList<MemcachedNode> newNodes = new ArrayList<MemcachedNode>();
            for (NodeEndPoint newEndPoint : endPoints) {
                Iterator curentNodesIterator = currentNodes.iterator();
                boolean foundMatch = false;
                while (curentNodesIterator.hasNext()) {
                    MemcachedNode currentNode = (MemcachedNode)curentNodesIterator.next();
                    NodeEndPoint endPointFromCurrentNode = currentNode.getNodeEndPoint();
                    if (!endPointFromCurrentNode.getHostName().equals(newEndPoint.getHostName()) || endPointFromCurrentNode.getPort() != newEndPoint.getPort()) continue;
                    if (endPointFromCurrentNode.getIpAddress() != null && !endPointFromCurrentNode.getIpAddress().equals(newEndPoint.getIpAddress()) || endPointFromCurrentNode.getIpAddress() == null && newEndPoint.getIpAddress() != null) {
                        currentNode.setNodeEndPoint(newEndPoint);
                        this.queueReconnect(currentNode);
                    }
                    newNodes.add(currentNode);
                    curentNodesIterator.remove();
                    foundMatch = true;
                    break;
                }
                if (foundMatch) continue;
                MemcachedNode node = this.createConnection(newEndPoint);
                newNodes.add(node);
            }
            if (currentNodes.size() > 0) {
                ArrayList<Operation> opsToRequeue = new ArrayList<Operation>();
                for (MemcachedNode qa : currentNodes) {
                    Collection<Operation> pendingOps = this.shutdownNode(qa);
                    opsToRequeue.addAll(pendingOps);
                }
                this.redistributeOperations(opsToRequeue);
                opsToRequeue.clear();
            }
            this.locator.updateLocator(newNodes);
        }
        catch (Exception e) {
            this.getLogger().error((Object)"Error encountered while updating the node list. Adding back to endpoint list for reattempt.", e);
            try {
                this.lockForNodeUpdates.lock();
                if (this.newEndPoints.size() == 0) {
                    this.newEndPoints.addAll(endPoints);
                }
            }
            finally {
                this.lockForNodeUpdates.unlock();
            }
        }
        this.conditionLock.lock();
        if (!this.isInitialClusterConfigApplied) {
            this.isInitialClusterConfigApplied = true;
        }
        try {
            this.nodeUpdateCondition.signal();
        }
        finally {
            this.conditionLock.unlock();
        }
    }

    private Collection<Operation> shutdownNode(MemcachedNode node) throws IOException {
        Collection<Operation> notCompletedOperations = node.destroyInputQueue();
        if (node.getChannel() != null) {
            node.getChannel().close();
            node.setSk(null);
            if (node.getBytesRemainingToWrite() > 0) {
                this.getLogger().warn("Shut down with %d bytes remaining to write", node.getBytesRemainingToWrite());
            }
            this.getLogger().debug("Shut down channel %s", node.getChannel());
        }
        return notCompletedOperations;
    }

    private void handleInputQueue() {
        if (!this.addedQueue.isEmpty()) {
            MemcachedNode qaNode;
            this.getLogger().debug("Handling queue");
            HashSet<MemcachedNode> toAdd = new HashSet<MemcachedNode>();
            HashSet<MemcachedNode> todo = new HashSet<MemcachedNode>();
            while ((qaNode = this.addedQueue.poll()) != null) {
                todo.add(qaNode);
            }
            Collection<MemcachedNode> nodeList = this.locator.getAll();
            for (MemcachedNode node : todo) {
                if (!nodeList.contains(node)) continue;
                boolean readyForIO = false;
                if (node.isActive()) {
                    if (node.getCurrentWriteOp() != null) {
                        readyForIO = true;
                        this.getLogger().debug("Handling queued write %s", node);
                    }
                } else {
                    toAdd.add(node);
                }
                node.copyInputQueue();
                if (readyForIO) {
                    try {
                        if (node.getWbuf().hasRemaining()) {
                            this.handleWrites(node);
                        }
                    }
                    catch (IOException e) {
                        this.getLogger().warn((Object)"Exception handling write", e);
                        this.lostConnection(node);
                    }
                }
                node.fixupOps();
            }
            this.addedQueue.addAll(toAdd);
        }
    }

    public boolean addObserver(ConnectionObserver obs) {
        return this.connObservers.add(obs);
    }

    public boolean removeObserver(ConnectionObserver obs) {
        return this.connObservers.remove(obs);
    }

    private void connected(MemcachedNode node) {
        assert (node.getChannel().isConnected()) : "Not connected.";
        int rt = node.getReconnectCount();
        node.connected();
        for (ConnectionObserver observer : this.connObservers) {
            observer.connectionEstablished(node.getSocketAddress(), rt);
        }
    }

    private void lostConnection(MemcachedNode node) {
        this.queueReconnect(node);
        for (ConnectionObserver observer : this.connObservers) {
            observer.connectionLost(node.getSocketAddress());
        }
    }

    boolean belongsToCluster(MemcachedNode node) {
        for (MemcachedNode n : this.locator.getAll()) {
            if (!n.getSocketAddress().equals(node.getSocketAddress())) continue;
            return true;
        }
        return false;
    }

    private void handleIO(SelectionKey sk) {
        MemcachedNode node = (MemcachedNode)sk.attachment();
        Collection<MemcachedNode> nodeList = this.locator.getAll();
        if (!nodeList.contains(node)) {
            return;
        }
        try {
            this.getLogger().debug("Handling IO for:  %s (r=%s, w=%s, c=%s, op=%s)", sk, sk.isReadable(), sk.isWritable(), sk.isConnectable(), sk.attachment());
            if (sk.isConnectable() && this.belongsToCluster(node)) {
                this.getLogger().debug("Connection state changed for %s", sk);
                SocketChannel channel = node.getChannel();
                if (channel.finishConnect()) {
                    this.finishConnect(sk, node);
                } else assert (!channel.isConnected()) : "connected";
            } else {
                this.handleReadsAndWrites(sk, node);
            }
        }
        catch (ClosedChannelException e) {
            if (!this.shutDown) {
                this.getLogger().info("Closed channel and not shutting down. Queueing reconnect on %s", node, e);
                this.lostConnection(node);
            }
        }
        catch (ConnectException e) {
            this.getLogger().info("Reconnecting due to failure to connect to %s", node, e);
            this.queueReconnect(node);
        }
        catch (OperationException e) {
            node.setupForAuth();
            this.getLogger().info("Reconnection due to exception handling a memcached operation on %s. This may be due to an authentication failure.", node, e);
            this.lostConnection(node);
        }
        catch (Exception e) {
            node.setupForAuth();
            this.getLogger().info("Reconnecting due to exception on %s", node, e);
            this.lostConnection(node);
        }
        node.fixupOps();
    }

    private void handleReadsAndWrites(SelectionKey sk, MemcachedNode node) throws IOException {
        if (sk.isValid() && sk.isReadable()) {
            this.handleReads(node);
        }
        if (sk.isValid() && sk.isWritable()) {
            this.handleWrites(node);
        }
    }

    private void finishConnect(SelectionKey sk, MemcachedNode node) throws IOException {
        boolean isHandShakeSuccess;
        if (this.isTlsMode && !(isHandShakeSuccess = node.doTlsHandshake(this.connectionFactory.getOperationTimeout()))) {
            throw new RuntimeException("The TLS connection can't be established due to TLS handshake failure.");
        }
        if (this.verifyAliveOnConnect) {
            final CountDownLatch latch = new CountDownLatch(1);
            final OperationFuture rv = new OperationFuture("noop", latch, 2500L, this.listenerExecutorService);
            NoopOperation testOp = this.opFact.noop(new OperationCallback(){

                @Override
                public void receivedStatus(OperationStatus status) {
                    rv.set(status.isSuccess(), status);
                }

                @Override
                public void complete() {
                    latch.countDown();
                }
            });
            testOp.setHandlingNode(node);
            testOp.initialize();
            this.checkState();
            this.insertOperation(node, testOp);
            node.copyInputQueue();
            boolean done = false;
            if (sk.isValid()) {
                long timeout = TimeUnit.MILLISECONDS.toNanos(this.connectionFactory.getOperationTimeout());
                long stop = System.nanoTime() + timeout;
                while (stop > System.nanoTime()) {
                    this.handleWrites(node);
                    this.handleReads(node);
                    done = latch.getCount() == 0L;
                    if (!done) continue;
                }
            }
            if (!done || testOp.isCancelled() || testOp.hasErrored() || testOp.isTimedOut()) {
                throw new ConnectException("Could not send noop upon connect! This may indicate a running, but not responding memcached instance.");
            }
        }
        this.connected(node);
        this.addedQueue.offer(node);
        if (node.getWbuf().hasRemaining()) {
            this.handleWrites(node);
        }
    }

    private void handleWrites(MemcachedNode node) throws IOException {
        boolean canWriteMore;
        node.fillWriteBuffer(this.shouldOptimize);
        boolean bl = canWriteMore = node.getBytesRemainingToWrite() > 0;
        while (canWriteMore) {
            int wrote = node.writeSome();
            this.metrics.updateHistogram(OVERALL_AVG_BYTES_WRITE_METRIC, wrote);
            node.fillWriteBuffer(this.shouldOptimize);
            canWriteMore = wrote > 0 && node.getBytesRemainingToWrite() > 0;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleReads(MemcachedNode node) throws IOException {
        Operation currentOp = node.getCurrentReadOp();
        if (currentOp instanceof TapAckOperationImpl) {
            node.removeCurrentReadOp();
            return;
        }
        ByteBuffer rbuf = node.getRbuf();
        SocketChannel channel = node.getChannel();
        int read = channel.read(rbuf);
        this.metrics.updateHistogram(OVERALL_AVG_BYTES_READ_METRIC, read);
        if (read < 0) {
            currentOp = this.handleReadsWhenChannelEndOfStream(currentOp, node, rbuf);
        }
        while (read > 0 || this.isTlsMode && rbuf.position() > 0) {
            this.getLogger().debug("Read %d bytes", read);
            rbuf.flip();
            while (rbuf.remaining() > 0) {
                if (currentOp == null) {
                    throw new IllegalStateException("No read operation.");
                }
                long timeOnWire = System.nanoTime() - currentOp.getWriteCompleteTimestamp();
                this.metrics.updateHistogram(OVERALL_AVG_TIME_ON_WIRE_METRIC, (int)(timeOnWire / 1000L));
                this.metrics.markMeter(OVERALL_RESPONSE_METRIC);
                if (!this.isTlsMode) {
                    Operation operation = currentOp;
                    synchronized (operation) {
                        this.readBufferAndLogMetrics(currentOp, rbuf, node);
                    }
                    currentOp = node.getCurrentReadOp();
                    continue;
                }
                ByteBuffer deBuf = node.decryptNextTLSDataRecord(rbuf);
                if (deBuf != null) {
                    Operation operation = currentOp;
                    synchronized (operation) {
                        this.readBufferAndLogMetrics(currentOp, deBuf, node);
                    }
                }
                currentOp = node.getCurrentReadOp();
                break;
            }
            if (this.isTlsMode) {
                rbuf.compact();
            } else {
                rbuf.clear();
            }
            read = channel.read(rbuf);
            node.completedRead();
        }
    }

    private void readBufferAndLogMetrics(Operation currentOp, ByteBuffer rbuf, MemcachedNode node) throws IOException {
        currentOp.readFromBuffer(rbuf);
        if (currentOp.getState() == OperationState.COMPLETE) {
            this.getLogger().debug("Completed read op: %s and giving the next %d bytes", currentOp, rbuf.remaining());
            Operation op = node.removeCurrentReadOp();
            assert (op == currentOp) : "Expected to pop " + currentOp + " got " + op;
            if (op.hasErrored()) {
                this.metrics.markMeter(OVERALL_RESPONSE_FAIL_METRIC);
            } else {
                this.metrics.markMeter(OVERALL_RESPONSE_SUCC_METRIC);
            }
        } else if (currentOp.getState() == OperationState.RETRY) {
            this.handleRetryInformation(currentOp.getErrorMsg());
            this.getLogger().debug("Reschedule read op due to NOT_MY_VBUCKET error: %s ", currentOp);
            ((VBucketAware)((Object)currentOp)).addNotMyVbucketNode(currentOp.getHandlingNode());
            Operation op = node.removeCurrentReadOp();
            assert (op == currentOp) : "Expected to pop " + currentOp + " got " + op;
            this.retryOperation(currentOp);
            this.metrics.markMeter(OVERALL_RESPONSE_RETRY_METRIC);
        }
    }

    private Operation handleReadsWhenChannelEndOfStream(Operation currentOp, MemcachedNode node, ByteBuffer rbuf) throws IOException {
        if (currentOp instanceof TapOperation) {
            currentOp.getCallback().complete();
            ((TapOperation)currentOp).streamClosed(OperationState.COMPLETE);
            this.getLogger().debug("Completed read op: %s and giving the next %d bytes", currentOp, rbuf.remaining());
            Operation op = node.removeCurrentReadOp();
            assert (op == currentOp) : "Expected to pop " + currentOp + " got " + op;
            return node.getCurrentReadOp();
        }
        throw new IOException("Disconnected unexpected, will reconnect.");
    }

    static String dbgBuffer(ByteBuffer b, int size) {
        StringBuilder sb = new StringBuilder();
        byte[] bytes = b.array();
        for (int i = 0; i < size; ++i) {
            char ch = (char)bytes[i];
            if (Character.isWhitespace(ch) || Character.isLetterOrDigit(ch)) {
                sb.append(ch);
                continue;
            }
            sb.append("\\x");
            sb.append(Integer.toHexString(bytes[i] & 0xFF));
        }
        return sb.toString();
    }

    protected void handleRetryInformation(byte[] retryMessage) {
        this.getLogger().debug("Got RETRY message: " + new String(retryMessage) + ", but not handled.");
    }

    protected void queueReconnect(MemcachedNode node) {
        if (this.shutDown) {
            return;
        }
        this.getLogger().warn("Closing, and reopening %s, attempt %d.", node, node.getReconnectCount());
        if (node.getSk() != null) {
            node.getSk().cancel();
            assert (!node.getSk().isValid()) : "Cancelled selection key is valid";
        }
        node.reconnecting();
        try {
            if (node.getChannel() != null && node.getChannel().socket() != null) {
                node.getChannel().socket().close();
            } else {
                this.getLogger().info("The channel or socket was null for %s", node);
            }
        }
        catch (IOException e) {
            this.getLogger().warn((Object)"IOException trying to close a socket", e);
        }
        node.setChannel(null);
        long delay = (long)Math.min((double)this.maxDelay, Math.pow(2.0, node.getReconnectCount()) * 1000.0);
        long reconnectTime = System.currentTimeMillis() + delay;
        while (this.reconnectQueue.containsKey(reconnectTime)) {
            ++reconnectTime;
        }
        this.reconnectQueue.put(reconnectTime, node);
        this.metrics.incrementCounter(RECON_QUEUE_METRIC);
        node.setupResend();
        if (this.failureMode == FailureMode.Redistribute) {
            this.redistributeOperations(node.destroyInputQueue());
        } else if (this.failureMode == FailureMode.Cancel) {
            this.cancelOperations(node.destroyInputQueue());
        }
    }

    private void cancelOperations(Collection<Operation> ops) {
        for (Operation op : ops) {
            op.cancel();
        }
    }

    public void redistributeOperations(Collection<Operation> ops) {
        for (Operation op : ops) {
            this.redistributeOperation(op);
        }
    }

    public void redistributeOperation(Operation op) {
        if (op.isCancelled() || op.isTimedOut()) {
            return;
        }
        if (op.getCloneCount() >= 100) {
            this.getLogger().warn("Cancelling operation " + op + "because it has been retried (cloned) more than " + 100 + "times.");
            op.cancel();
            return;
        }
        if (op.getState() == OperationState.WRITE_QUEUED && op.getHandlingNode() != null) {
            this.addOperation(op.getHandlingNode(), op);
            return;
        }
        if (op instanceof MultiGetOperationImpl) {
            for (String key : ((MultiGetOperationImpl)op).getRetryKeys()) {
                this.addOperation(key, (Operation)this.opFact.get(key, (GetOperation.Callback)op.getCallback()));
            }
        } else if (op instanceof KeyedOperation) {
            KeyedOperation ko = (KeyedOperation)op;
            int added = 0;
            for (Operation newop : this.opFact.clone(ko)) {
                if (newop instanceof KeyedOperation) {
                    KeyedOperation newKeyedOp = (KeyedOperation)newop;
                    for (String k : newKeyedOp.getKeys()) {
                        this.addOperation(k, newop);
                        op.addClone(newop);
                        newop.setCloneCount(op.getCloneCount() + 1);
                    }
                } else {
                    newop.cancel();
                    this.getLogger().warn("Could not redistribute cloned non-keyed operation", newop);
                }
                ++added;
            }
            assert (added > 0) : "Didn't add any new operations when redistributing";
        } else {
            op.cancel();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void attemptReconnects() {
        long now = System.currentTimeMillis();
        IdentityHashMap<MemcachedNode, Boolean> seen = new IdentityHashMap<MemcachedNode, Boolean>();
        ArrayList<MemcachedNode> rereQueue = new ArrayList<MemcachedNode>();
        SocketChannel ch = null;
        Iterator<MemcachedNode> i = this.reconnectQueue.headMap(now).values().iterator();
        while (i.hasNext()) {
            MemcachedNode node = i.next();
            i.remove();
            this.metrics.decrementCounter(RECON_QUEUE_METRIC);
            try {
                if (!this.belongsToCluster(node)) {
                    this.getLogger().debug("Node does not belong to cluster anymore, skipping reconnect: %s", node);
                    continue;
                }
                if (!seen.containsKey(node)) {
                    seen.put(node, Boolean.TRUE);
                    this.getLogger().info("Reconnecting %s", node);
                    ch = SocketChannel.open();
                    ch.configureBlocking(false);
                    Socket socket = ch.socket();
                    socket.setTcpNoDelay(!this.connectionFactory.useNagleAlgorithm());
                    socket.setKeepAlive(this.connectionFactory.getKeepAlive());
                    int ops = 0;
                    SocketAddress sa = node.getNodeEndPoint() != null ? node.getNodeEndPoint().getInetSocketAddress(true) : node.getSocketAddress();
                    if (ch.connect(sa)) {
                        this.connected(node);
                        this.addedQueue.offer(node);
                        this.getLogger().info("Immediately reconnected to %s", node);
                        assert (ch.isConnected());
                    } else {
                        ops = 8;
                    }
                    node.registerChannel(ch, ch.register(this.selector, ops, node));
                    assert (node.getChannel() == ch) : "Channel was lost.";
                    continue;
                }
                this.getLogger().debug("Skipping duplicate reconnect request for %s", node);
            }
            catch (SocketException e) {
                this.getLogger().warn((Object)"Error on reconnect", e);
                rereQueue.add(node);
            }
            catch (Exception e) {
                this.getLogger().error("Exception on reconnect, lost node %s", node, e);
            }
            finally {
                this.potentiallyCloseLeakingChannel(ch, node);
            }
        }
        for (MemcachedNode n : rereQueue) {
            this.queueReconnect(n);
        }
    }

    private void potentiallyCloseLeakingChannel(SocketChannel ch, MemcachedNode node) {
        if (ch != null && !ch.isConnected() && !ch.isConnectionPending()) {
            try {
                ch.close();
            }
            catch (IOException e) {
                this.getLogger().error("Exception closing channel: %s", node, e);
            }
        }
    }

    public NodeLocator getLocator() {
        return this.locator;
    }

    public void enqueueOperation(String key, Operation o) {
        this.checkState();
        StringUtils.validateKey(key, this.opFact instanceof BinaryOperationFactory);
        this.addOperation(key, o);
    }

    public void enqueueOperation(InetSocketAddress addr, Operation o) {
        this.checkState();
        this.addOperation(addr, o);
    }

    protected void addOperation(String key, Operation o) {
        MemcachedNode placeIn = null;
        MemcachedNode primary = this.locator.getPrimary(key);
        if (primary.isActive() || this.failureMode == FailureMode.Retry) {
            placeIn = primary;
        } else if (this.failureMode == FailureMode.Cancel) {
            o.cancel();
        } else {
            Iterator<MemcachedNode> i = this.locator.getSequence(key);
            while (placeIn == null && i.hasNext()) {
                MemcachedNode n = i.next();
                if (!n.isActive()) continue;
                placeIn = n;
            }
            if (placeIn == null) {
                placeIn = primary;
                this.getLogger().warn("Could not redistribute to another node, retrying primary node for %s.", key);
            }
        }
        assert (o.isCancelled() || placeIn != null) : "No node found for key " + key;
        if (placeIn != null) {
            this.addOperation(placeIn, o);
        } else assert (o.isCancelled()) : "No node found for " + key + " (and not immediately cancelled)";
    }

    protected void addOperation(InetSocketAddress addr, Operation o) {
        Collection<MemcachedNode> nodes = this.locator.getAll();
        boolean foundNode = false;
        for (MemcachedNode node : nodes) {
            NodeEndPoint endpoint = node.getNodeEndPoint();
            String hostName = addr.getHostName();
            String ipAddress = null;
            if (addr.getAddress() != null) {
                ipAddress = addr.getAddress().getHostAddress();
            }
            if ((hostName == null || !hostName.equals(endpoint.getHostName())) && (ipAddress == null || !ipAddress.equals(endpoint.getIpAddress()))) continue;
            this.addOperation(node, o);
            foundNode = true;
            break;
        }
        if (!foundNode) {
            throw new IllegalArgumentException("The specified address does not belong to the cluster");
        }
    }

    public void insertOperation(MemcachedNode node, Operation o) {
        o.setHandlingNode(node);
        o.initialize();
        node.insertOp(o);
        this.addedQueue.offer(node);
        this.metrics.markMeter(OVERALL_REQUEST_METRIC);
        Selector s = this.selector.wakeup();
        assert (s == this.selector) : "Wakeup returned the wrong selector.";
        this.getLogger().debug("Added %s to %s", o, node);
    }

    protected void addOperation(MemcachedNode node, Operation o) {
        if (!node.isAuthenticated()) {
            this.retryOperation(o);
            return;
        }
        o.setHandlingNode(node);
        o.initialize();
        node.addOp(o);
        this.addedQueue.offer(node);
        this.metrics.markMeter(OVERALL_REQUEST_METRIC);
        Selector s = this.selector.wakeup();
        assert (s == this.selector) : "Wakeup returned the wrong selector.";
        this.getLogger().debug("Added %s to %s", o, node);
    }

    public void addOperations(Map<MemcachedNode, Operation> ops) {
        for (Map.Entry<MemcachedNode, Operation> me : ops.entrySet()) {
            this.addOperation(me.getKey(), me.getValue());
        }
    }

    public CountDownLatch broadcastOperation(BroadcastOpFactory of) {
        return this.broadcastOperation(of, this.locator.getAll());
    }

    public CountDownLatch broadcastOperation(BroadcastOpFactory of, Collection<MemcachedNode> nodes) {
        CountDownLatch latch = new CountDownLatch(nodes.size());
        for (MemcachedNode node : nodes) {
            this.getLogger().debug("broadcast Operation: node = " + node);
            Operation op = of.newOp(node, latch);
            op.initialize();
            node.addOp(op);
            op.setHandlingNode(node);
            this.addedQueue.offer(node);
            this.metrics.markMeter(OVERALL_REQUEST_METRIC);
        }
        Selector s = this.selector.wakeup();
        assert (s == this.selector) : "Wakeup returned the wrong selector.";
        return latch;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void shutdown() throws IOException {
        this.shutDown = true;
        try {
            Selector s = this.selector.wakeup();
            assert (s == this.selector) : "Wakeup returned the wrong selector.";
            for (MemcachedNode node : this.locator.getAll()) {
                if (node.getChannel() == null) continue;
                node.getChannel().close();
                node.setSk(null);
                if (node.getBytesRemainingToWrite() > 0) {
                    this.getLogger().warn("Shut down with %d bytes remaining to write", node.getBytesRemainingToWrite());
                }
                this.getLogger().debug("Shut down channel %s", node.getChannel());
            }
            this.selector.close();
            this.getLogger().debug("Shut down selector %s", this.selector);
        }
        finally {
            this.running = false;
        }
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append("{MemcachedConnection to");
        for (MemcachedNode qa : this.locator.getAll()) {
            sb.append(" ").append(qa.getSocketAddress());
        }
        sb.append("}");
        return sb.toString();
    }

    public String connectionsStatus() {
        StringBuilder connStatus = new StringBuilder();
        connStatus.append("Connection Status {");
        for (MemcachedNode node : this.locator.getAll()) {
            connStatus.append(" ").append(node.getSocketAddress()).append(" active: ").append(node.isActive()).append(", authed: ").append(node.isAuthenticated()).append(MessageFormat.format(", last read: {0} ms ago", node.lastReadDelta()));
        }
        connStatus.append(" }");
        return connStatus.toString();
    }

    public static void opTimedOut(Operation op) {
        MemcachedConnection.setTimeout(op, true);
    }

    public static void opSucceeded(Operation op) {
        MemcachedConnection.setTimeout(op, false);
    }

    private static void setTimeout(Operation op, boolean isTimeout) {
        Logger logger = LoggerFactory.getLogger(MemcachedConnection.class);
        try {
            if (op == null || op.isTimedOutUnsent()) {
                return;
            }
            MemcachedNode node = op.getHandlingNode();
            if (node != null) {
                node.setContinuousTimeout(isTimeout);
            }
        }
        catch (Exception e) {
            logger.error(e.getMessage());
        }
    }

    protected void checkState() {
        if (this.shutDown) {
            throw new IllegalStateException("Shutting down");
        }
        assert (this.isAlive()) : "IO Thread is not running.";
    }

    @Override
    public void run() {
        while (this.running) {
            try {
                this.handleIO();
            }
            catch (IOException e) {
                this.logRunException(e);
            }
            catch (CancelledKeyException e) {
                this.logRunException(e);
            }
            catch (ClosedSelectorException e) {
                this.logRunException(e);
            }
            catch (IllegalStateException e) {
                this.logRunException(e);
            }
            catch (ConcurrentModificationException e) {
                this.logRunException(e);
            }
        }
        this.getLogger().info("Shut down memcached client");
    }

    private void logRunException(Exception e) {
        if (this.shutDown) {
            this.getLogger().debug((Object)"Exception occurred during shutdown", e);
        } else {
            this.getLogger().warn((Object)"Problem handling memcached IO", e);
        }
    }

    public boolean isShutDown() {
        return this.shutDown;
    }

    public void retryOperation(Operation op) {
        if (this.retryQueueSize >= 0 && this.retryOps.size() >= this.retryQueueSize && !op.isCancelled()) {
            op.cancel();
        }
        this.retryOps.add(op);
    }
}

