/*
 * Decompiled with CFR 0.152.
 */
package com.orientechnologies.orient.server.hazelcast;

import com.hazelcast.config.QueueConfig;
import com.hazelcast.core.DistributedObject;
import com.hazelcast.core.HazelcastException;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.core.IAtomicLong;
import com.hazelcast.core.IQueue;
import com.hazelcast.monitor.LocalQueueStats;
import com.hazelcast.spi.exception.DistributedObjectDestroyedException;
import com.orientechnologies.orient.core.Orient;
import com.orientechnologies.orient.core.config.OGlobalConfiguration;
import com.orientechnologies.orient.core.record.impl.ODocument;
import com.orientechnologies.orient.server.distributed.ODistributedMessageService;
import com.orientechnologies.orient.server.distributed.ODistributedResponse;
import com.orientechnologies.orient.server.distributed.ODistributedResponseManager;
import com.orientechnologies.orient.server.distributed.ODistributedServerLog;
import com.orientechnologies.orient.server.distributed.ODistributedServerManager;
import com.orientechnologies.orient.server.distributed.task.OAbstractRemoteTask;
import com.orientechnologies.orient.server.hazelcast.OHazelcastDistributedDatabase;
import com.orientechnologies.orient.server.hazelcast.OHazelcastPlugin;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;

public class OHazelcastDistributedMessageService
implements ODistributedMessageService {
    public static final int STATS_MAX_MESSAGES = 20;
    public static final String NODE_QUEUE_PREFIX = "orientdb.node.";
    public static final String NODE_QUEUE_REQUEST_POSTFIX = ".request";
    public static final String NODE_QUEUE_RESPONSE_POSTFIX = ".response";
    protected final OHazelcastPlugin manager;
    protected final IQueue nodeResponseQueue;
    protected final ConcurrentHashMap<Long, ODistributedResponseManager> responsesByRequestIds;
    protected final TimerTask asynchMessageManager;
    protected Map<String, OHazelcastDistributedDatabase> databases = new ConcurrentHashMap<String, OHazelcastDistributedDatabase>();
    protected Thread responseThread;
    protected long[] responseTimeMetrics = new long[10];
    protected int responseTimeMetricIndex = 0;
    protected volatile boolean running = true;

    public OHazelcastDistributedMessageService(final OHazelcastPlugin manager) {
        this.manager = manager;
        this.responsesByRequestIds = new ConcurrentHashMap();
        for (int i = 0; i < this.responseTimeMetrics.length; ++i) {
            this.responseTimeMetrics[i] = -1L;
        }
        final String queueName = OHazelcastDistributedMessageService.getResponseQueueName(manager.getLocalNodeName());
        this.nodeResponseQueue = this.getQueue(queueName);
        if (ODistributedServerLog.isDebugEnabled()) {
            ODistributedServerLog.debug((Object)this, (String)this.getLocalNodeNameAndThread(), null, (ODistributedServerLog.DIRECTION)ODistributedServerLog.DIRECTION.NONE, (String)"listening for incoming responses on queue: %s", (Object[])new Object[]{queueName});
        }
        this.checkForPendingMessages(this.nodeResponseQueue, queueName, false);
        this.asynchMessageManager = new TimerTask(){

            @Override
            public void run() {
                OHazelcastDistributedMessageService.this.purgePendingMessages();
            }
        };
        this.responseThread = new Thread(new Runnable(){

            @Override
            public void run() {
                Thread.currentThread().setName("OrientDB Node Response " + queueName);
                while (OHazelcastDistributedMessageService.this.running) {
                    String senderNode = null;
                    ODistributedResponse message = null;
                    try {
                        message = (ODistributedResponse)OHazelcastDistributedMessageService.this.nodeResponseQueue.take();
                        if (message == null) continue;
                        senderNode = message.getSenderNodeName();
                        long reqId = message.getRequestId();
                        if (reqId < 0L) {
                            OAbstractRemoteTask task = (OAbstractRemoteTask)message.getPayload();
                            task.execute(manager.getServerInstance(), (ODistributedServerManager)manager, null);
                            continue;
                        }
                        long responseTime = OHazelcastDistributedMessageService.this.dispatchResponseToThread(message);
                        if (responseTime <= -1L) continue;
                        OHazelcastDistributedMessageService.this.collectMetric(responseTime);
                    }
                    catch (InterruptedException e) {
                        Thread.interrupted();
                        break;
                    }
                    catch (DistributedObjectDestroyedException e) {
                        Thread.interrupted();
                        break;
                    }
                    catch (HazelcastInstanceNotActiveException e) {
                        Thread.interrupted();
                        break;
                    }
                    catch (HazelcastException e) {
                        if (e.getCause() instanceof InterruptedException) {
                            Thread.interrupted();
                            continue;
                        }
                        ODistributedServerLog.error((Object)this, (String)manager.getLocalNodeName(), (String)senderNode, (ODistributedServerLog.DIRECTION)ODistributedServerLog.DIRECTION.IN, (String)"error on reading distributed response", (Throwable)e, (Object[])new Object[]{message != null ? message.getPayload() : "-"});
                    }
                    catch (Throwable e) {
                        ODistributedServerLog.error((Object)this, (String)manager.getLocalNodeName(), (String)senderNode, (ODistributedServerLog.DIRECTION)ODistributedServerLog.DIRECTION.IN, (String)"error on reading distributed response", (Throwable)e, (Object[])new Object[]{message != null ? message.getPayload() : "-"});
                    }
                }
                ODistributedServerLog.debug((Object)this, (String)manager.getLocalNodeName(), null, (ODistributedServerLog.DIRECTION)ODistributedServerLog.DIRECTION.NONE, (String)"end of reading responses", (Object[])new Object[0]);
            }
        });
        this.responseThread.setDaemon(true);
        this.responseThread.start();
    }

    public static String getRequestQueueName(String iNodeName, String iDatabaseName) {
        StringBuilder buffer = new StringBuilder(128);
        buffer.append(NODE_QUEUE_PREFIX);
        buffer.append(iNodeName);
        if (iDatabaseName != null) {
            buffer.append('.');
            buffer.append(iDatabaseName);
        }
        buffer.append(NODE_QUEUE_REQUEST_POSTFIX);
        return buffer.toString();
    }

    protected static String getResponseQueueName(String iNodeName) {
        StringBuilder buffer = new StringBuilder(128);
        buffer.append(NODE_QUEUE_PREFIX);
        buffer.append(iNodeName);
        buffer.append(NODE_QUEUE_RESPONSE_POSTFIX);
        return buffer.toString();
    }

    public OHazelcastDistributedDatabase getDatabase(String iDatabaseName) {
        return this.databases.get(iDatabaseName);
    }

    public void shutdown() {
        this.running = false;
        if (this.responseThread != null) {
            this.responseThread.interrupt();
            if (!this.nodeResponseQueue.isEmpty()) {
                try {
                    this.responseThread.join();
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
            }
            this.responseThread = null;
        }
        for (Map.Entry<String, OHazelcastDistributedDatabase> m : this.databases.entrySet()) {
            m.getValue().shutdown();
        }
        this.asynchMessageManager.cancel();
        this.responsesByRequestIds.clear();
        if (this.nodeResponseQueue != null) {
            this.nodeResponseQueue.clear();
            this.nodeResponseQueue.destroy();
        }
    }

    public void registerRequest(long id, ODistributedResponseManager currentResponseMgr) {
        this.responsesByRequestIds.put(id, currentResponseMgr);
    }

    public void handleUnreachableNode(String nodeName) {
        Set<String> dbs = this.getDatabases();
        if (dbs != null) {
            for (String dbName : dbs) {
                this.getDatabase(dbName).removeNodeInConfiguration(nodeName, false);
            }
        }
        for (ODistributedResponseManager r : this.responsesByRequestIds.values()) {
            r.notifyWaiters();
        }
    }

    public List<String> getManagedQueueNames() {
        ArrayList<String> queueNames = new ArrayList<String>();
        for (DistributedObject d : this.manager.getHazelcastInstance().getDistributedObjects()) {
            if (!d.getServiceName().equals("hz:impl:queueService")) continue;
            queueNames.add(d.getName());
        }
        return queueNames;
    }

    public IAtomicLong getMessageIdCounter() {
        return this.manager.getHazelcastInstance().getAtomicLong("orientdb.requestId");
    }

    public ODocument getQueueStats(String iQueueName) {
        IQueue queue = this.manager.getHazelcastInstance().getQueue(iQueueName);
        if (queue == null) {
            throw new IllegalArgumentException("Queue '" + iQueueName + "' not found");
        }
        ODocument doc = new ODocument();
        doc.field("name", (Object)queue.getName());
        doc.field("partitionKey", (Object)queue.getPartitionKey());
        doc.field("serviceName", (Object)queue.getServiceName());
        doc.field("size", (Object)queue.size());
        LocalQueueStats stats = queue.getLocalQueueStats();
        doc.field("minAge", (Object)stats.getMinAge());
        doc.field("maxAge", (Object)stats.getMaxAge());
        doc.field("avgAge", (Object)stats.getAvgAge());
        doc.field("backupItemCount", (Object)stats.getBackupItemCount());
        doc.field("emptyPollOperationCount", (Object)stats.getEmptyPollOperationCount());
        doc.field("offerOperationCount", (Object)stats.getOfferOperationCount());
        doc.field("eventOperationCount", (Object)stats.getEventOperationCount());
        doc.field("otherOperationsCount", (Object)stats.getOtherOperationsCount());
        doc.field("pollOperationCount", (Object)stats.getPollOperationCount());
        doc.field("emptyPollOperationCount", (Object)stats.getEmptyPollOperationCount());
        doc.field("ownedItemCount", (Object)stats.getOwnedItemCount());
        doc.field("rejectedOfferOperationCount", (Object)stats.getRejectedOfferOperationCount());
        ArrayList<String> nextMessages = new ArrayList<String>(20);
        for (Object next : queue) {
            if (next != null) {
                nextMessages.add(next.toString());
            }
            if (nextMessages.size() < 20) continue;
            break;
        }
        doc.field("nextMessages", nextMessages);
        return doc;
    }

    public long getAverageResponseTime() {
        long total = 0L;
        int involved = 0;
        for (long metric : this.responseTimeMetrics) {
            if (metric <= -1L) continue;
            total += metric;
            ++involved;
        }
        return total > 0L ? total / (long)involved : 0L;
    }

    public OHazelcastDistributedDatabase registerDatabase(String iDatabaseName) {
        OHazelcastDistributedDatabase db = new OHazelcastDistributedDatabase(this.manager, this, iDatabaseName);
        this.databases.put(iDatabaseName, db);
        return db;
    }

    public OHazelcastDistributedDatabase unregisterDatabase(String iDatabaseName) {
        OHazelcastDistributedDatabase db = this.databases.remove(iDatabaseName);
        if (db != null) {
            db.shutdown();
        }
        return db;
    }

    public Set<String> getDatabases() {
        return this.databases.keySet();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected long dispatchResponseToThread(ODistributedResponse response) {
        long chrono = Orient.instance().getProfiler().startChrono();
        try {
            long reqId = response.getRequestId();
            ODistributedResponseManager asynchMgr = this.responsesByRequestIds.get(reqId);
            if (asynchMgr == null) {
                if (ODistributedServerLog.isDebugEnabled()) {
                    ODistributedServerLog.debug((Object)this, (String)this.manager.getLocalNodeName(), (String)response.getExecutorNodeName(), (ODistributedServerLog.DIRECTION)ODistributedServerLog.DIRECTION.IN, (String)"received response for message %d after the timeout (%dms)", (Object[])new Object[]{reqId, OGlobalConfiguration.DISTRIBUTED_ASYNCH_RESPONSES_TIMEOUT.getValueAsLong()});
                }
            } else if (asynchMgr.collectResponse(response)) {
                this.responsesByRequestIds.remove(reqId);
                long l = System.currentTimeMillis() - asynchMgr.getSentOn();
                return l;
            }
        }
        finally {
            Orient.instance().getProfiler().stopChrono("distributed.node." + response.getExecutorNodeName() + ".latency", "Latency in ms from current node", chrono);
            Orient.instance().getProfiler().updateCounter("distributed.node.msgReceived", "Number of replication messages received in current node", 1L, "distributed.node.msgReceived");
            Orient.instance().getProfiler().updateCounter("distributed.node." + response.getExecutorNodeName() + ".msgReceived", "Number of replication messages received in current node from a node", 1L, "distributed.node.*.msgReceived");
        }
        return -1L;
    }

    protected String getLocalNodeNameAndThread() {
        return this.manager.getLocalNodeName() + ":" + Thread.currentThread().getId();
    }

    protected void purgePendingMessages() {
        long now = System.currentTimeMillis();
        long timeout = OGlobalConfiguration.DISTRIBUTED_ASYNCH_RESPONSES_TIMEOUT.getValueAsLong();
        Iterator<Map.Entry<Long, ODistributedResponseManager>> it = this.responsesByRequestIds.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, ODistributedResponseManager> item = it.next();
            ODistributedResponseManager resp = item.getValue();
            long timeElapsed = now - resp.getSentOn();
            if (timeElapsed <= timeout) continue;
            List missingNodes = resp.getMissingNodes();
            ODistributedServerLog.warn((Object)this, (String)this.manager.getLocalNodeName(), (String)missingNodes.toString(), (ODistributedServerLog.DIRECTION)ODistributedServerLog.DIRECTION.IN, (String)"%d missed response(s) for message %d by nodes %s after %dms when timeout is %dms", (Object[])new Object[]{missingNodes.size(), resp.getMessageId(), missingNodes, timeElapsed, timeout});
            Orient.instance().getProfiler().updateCounter("distributed.db." + resp.getDatabaseName() + ".timeouts", "Number of messages in timeouts", 1L, "distributed.db.*.timeouts");
            Orient.instance().getProfiler().updateCounter("distributed.node.timeouts", "Number of messages in timeouts", 1L, "distributed.node.timeouts");
            resp.timeout();
            it.remove();
        }
    }

    /*
     * Enabled aggressive block sorting
     */
    protected boolean checkForPendingMessages(IQueue iQueue, String iQueueName, boolean iUnqueuePendingMessages) {
        int queueSize = iQueue.size();
        if (queueSize <= 0) {
            ODistributedServerLog.info((Object)this, (String)this.manager.getLocalNodeName(), null, (ODistributedServerLog.DIRECTION)ODistributedServerLog.DIRECTION.NONE, (String)"found no previous messages in queue %s", (Object[])new Object[]{iQueueName});
            return false;
        }
        if (!iUnqueuePendingMessages) {
            ODistributedServerLog.warn((Object)this, (String)this.manager.getLocalNodeName(), null, (ODistributedServerLog.DIRECTION)ODistributedServerLog.DIRECTION.NONE, (String)"found %d messages in queue %s, clearing them...", (Object[])new Object[]{queueSize, iQueueName});
            iQueue.clear();
            return false;
        }
        ODistributedServerLog.warn((Object)this, (String)this.manager.getLocalNodeName(), null, (ODistributedServerLog.DIRECTION)ODistributedServerLog.DIRECTION.NONE, (String)"found %d messages in queue %s, aligning the database...", (Object[])new Object[]{queueSize, iQueueName});
        return true;
    }

    public <T> IQueue<T> getQueue(String iQueueName) {
        return this.manager.getHazelcastInstance().getQueue(iQueueName);
    }

    protected void configureQueue(String iQueueName, int synchReplica, int asynchReplica) {
        QueueConfig queueCfg = this.manager.getHazelcastInstance().getConfig().getQueueConfig(iQueueName);
        queueCfg.setBackupCount(synchReplica);
        queueCfg.setAsyncBackupCount(asynchReplica);
    }

    protected void removeQueue(String iQueueName) {
        IQueue queue = this.manager.getHazelcastInstance().getQueue(iQueueName);
        if (queue != null) {
            ODistributedServerLog.info((Object)this, (String)this.manager.getLocalNodeName(), null, (ODistributedServerLog.DIRECTION)ODistributedServerLog.DIRECTION.NONE, (String)"removing queue '%s' containing %d messages", (Object[])new Object[]{iQueueName, queue.size()});
            queue.clear();
        }
    }

    protected void collectMetric(long iTime) {
        if (this.responseTimeMetricIndex >= this.responseTimeMetrics.length) {
            this.responseTimeMetricIndex = 0;
        }
        this.responseTimeMetrics[this.responseTimeMetricIndex++] = iTime;
    }
}

