/*
 * Decompiled with CFR 0.152.
 */
package com.orientechnologies.orient.server.hazelcast;

import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.core.IMap;
import com.hazelcast.core.IQueue;
import com.hazelcast.spi.exception.DistributedObjectDestroyedException;
import com.orientechnologies.common.concur.lock.OModificationOperationProhibitedException;
import com.orientechnologies.common.log.OLogManager;
import com.orientechnologies.orient.core.config.OGlobalConfiguration;
import com.orientechnologies.orient.core.db.OScenarioThreadLocal;
import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx;
import com.orientechnologies.orient.core.metadata.security.OSecurityUser;
import com.orientechnologies.orient.core.metadata.security.OUser;
import com.orientechnologies.orient.core.record.ORecord;
import com.orientechnologies.orient.core.serialization.serializer.record.OSerializationSetThreadLocal;
import com.orientechnologies.orient.server.config.OServerUserConfiguration;
import com.orientechnologies.orient.server.distributed.ODiscardedResponse;
import com.orientechnologies.orient.server.distributed.ODistributedException;
import com.orientechnologies.orient.server.distributed.ODistributedRequest;
import com.orientechnologies.orient.server.distributed.ODistributedServerLog;
import com.orientechnologies.orient.server.distributed.task.OAbstractRemoteTask;
import com.orientechnologies.orient.server.distributed.task.OCreateRecordTask;
import com.orientechnologies.orient.server.distributed.task.ODeleteRecordTask;
import com.orientechnologies.orient.server.distributed.task.OFixTxTask;
import com.orientechnologies.orient.server.distributed.task.OResurrectRecordTask;
import com.orientechnologies.orient.server.distributed.task.OSQLCommandTask;
import com.orientechnologies.orient.server.distributed.task.OTxTask;
import com.orientechnologies.orient.server.distributed.task.OUpdateRecordTask;
import com.orientechnologies.orient.server.hazelcast.OHazelcastDistributedDatabase;
import com.orientechnologies.orient.server.hazelcast.OHazelcastDistributedMessageService;
import com.orientechnologies.orient.server.hazelcast.OHazelcastDistributedResponse;
import com.orientechnologies.orient.server.hazelcast.OHazelcastPlugin;
import com.orientechnologies.orient.server.hazelcast.OHotAlignmentNotPossibleExeption;
import java.io.Serializable;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ODistributedWorker
extends Thread {
    private static final int LOCAL_QUEUE_MAXSIZE = 1000;
    protected final OHazelcastDistributedDatabase distributed;
    protected final OHazelcastPlugin manager;
    protected final OHazelcastDistributedMessageService msgService;
    protected final String databaseName;
    protected final IQueue requestQueue;
    protected Queue<ODistributedRequest> localQueue = new ArrayBlockingQueue<ODistributedRequest>(1000);
    protected volatile ODatabaseDocumentTx database;
    protected volatile OUser lastUser;
    protected boolean restoringMessages;
    protected volatile boolean running = true;

    public ODistributedWorker(OHazelcastDistributedDatabase iDistributed, IQueue iRequestQueue, String iDatabaseName, int i, boolean iRestoringMessages) {
        this.setName("OrientDB DistributedWorker node=" + iDistributed.getLocalNodeName() + " db=" + iDatabaseName + " id=" + i);
        this.distributed = iDistributed;
        this.requestQueue = iRequestQueue;
        this.databaseName = iDatabaseName;
        this.manager = this.distributed.manager;
        this.msgService = this.distributed.msgService;
        this.restoringMessages = iRestoringMessages;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        int queuedMsg = this.requestQueue.size();
        long lastMessageId = -1L;
        long processedMessages = 0L;
        while (this.running) {
            if (this.restoringMessages && processedMessages >= (long)queuedMsg) {
                ODistributedServerLog.debug((Object)this, (String)this.getLocalNodeName(), null, (ODistributedServerLog.DIRECTION)ODistributedServerLog.DIRECTION.NONE, (String)"executed all pending tasks in queue (%d), set restoringMessages=false and database '%s' as online. Last req=%d", (Object[])new Object[]{queuedMsg, this.databaseName, lastMessageId});
                this.restoringMessages = false;
            }
            String senderNode = null;
            ODistributedRequest message = null;
            try {
                message = this.readRequest();
                if (message != null) {
                    lastMessageId = message.getId();
                    senderNode = message.getSenderNodeName();
                    this.onMessage(message);
                }
            }
            catch (InterruptedException e) {
                Thread.interrupted();
                break;
            }
            catch (DistributedObjectDestroyedException e) {
                Thread.interrupted();
                break;
            }
            catch (HazelcastInstanceNotActiveException e) {
                Thread.interrupted();
                break;
            }
            catch (Throwable e) {
                if (e.getCause() instanceof InterruptedException) {
                    Thread.interrupted();
                } else {
                    ODistributedServerLog.error((Object)this, (String)this.manager.getLocalNodeName(), (String)senderNode, (ODistributedServerLog.DIRECTION)ODistributedServerLog.DIRECTION.IN, (String)"error on executing distributed request %d: %s", (Throwable)e, (Object[])new Object[]{message != null ? message.getId() : -1L, message != null ? message.getTask() : "-"});
                }
            }
            finally {
                if (OSerializationSetThreadLocal.INSTANCE != null) {
                    OSerializationSetThreadLocal.clear();
                }
            }
            ++processedMessages;
        }
        ODistributedServerLog.debug((Object)this, (String)this.manager.getLocalNodeName(), null, (ODistributedServerLog.DIRECTION)ODistributedServerLog.DIRECTION.NONE, (String)"end of reading requests for database %s", (Object[])new Object[]{this.databaseName});
    }

    public void initDatabaseInstance() {
        if (this.database == null) {
            OServerUserConfiguration replicatorUser = this.manager.getServerInstance().getUser("replicator");
            this.database = (ODatabaseDocumentTx)this.manager.getServerInstance().openDatabase("document", this.databaseName, replicatorUser.name, replicatorUser.password);
        } else if (this.database.isClosed()) {
            OServerUserConfiguration replicatorUser = this.manager.getServerInstance().getUser("replicator");
            this.database.open(replicatorUser.name, replicatorUser.password);
        }
    }

    public void shutdown() {
        int pendingMsgs = this.localQueue.size();
        if (pendingMsgs > 0) {
            ODistributedServerLog.warn((Object)this, (String)this.getLocalNodeName(), null, (ODistributedServerLog.DIRECTION)ODistributedServerLog.DIRECTION.NONE, (String)"Received shutdown signal, waiting for distributed worker queue is empty (pending msgs=%d)...", (Object[])new Object[]{pendingMsgs});
        }
        try {
            this.running = false;
            this.interrupt();
            if (pendingMsgs > 0) {
                this.join();
            }
            ODistributedServerLog.warn((Object)this, (String)this.getLocalNodeName(), null, (ODistributedServerLog.DIRECTION)ODistributedServerLog.DIRECTION.NONE, (String)"Shutdown distributed worker completed", (Object[])new Object[0]);
            this.localQueue.clear();
            if (this.database != null) {
                this.database.activateOnCurrentThread();
                this.database.close();
            }
        }
        catch (Exception e) {
            ODistributedServerLog.warn((Object)this, (String)this.getLocalNodeName(), null, (ODistributedServerLog.DIRECTION)ODistributedServerLog.DIRECTION.NONE, (String)"Error on shutting down distributed worker", (Throwable)e, (Object[])new Object[0]);
        }
    }

    public ODatabaseDocumentTx getDatabase() {
        return this.database;
    }

    protected ODistributedRequest readRequest() throws InterruptedException {
        ODistributedRequest req = this.nextMessage();
        while (this.distributed.waitForMessageId.get() > -1L) {
            if (req == null) continue;
            if (req.getId() >= this.distributed.waitForMessageId.get()) {
                ODistributedServerLog.debug((Object)this, (String)this.manager.getLocalNodeName(), (String)req.getSenderNodeName(), (ODistributedServerLog.DIRECTION)ODistributedServerLog.DIRECTION.IN, (String)"reached waited request %d on request=%s sourceNode=%s", (Object[])new Object[]{this.distributed.waitForMessageId.get(), req, req.getSenderNodeName()});
                this.distributed.waitForMessageId.set(-1L);
                break;
            }
            ODistributedServerLog.debug((Object)this, (String)this.manager.getLocalNodeName(), (String)req.getSenderNodeName(), (ODistributedServerLog.DIRECTION)ODistributedServerLog.DIRECTION.IN, (String)"discarded request %d because waiting for %d request=%s sourceNode=%s", (Object[])new Object[]{req.getId(), this.distributed.waitForMessageId.get(), req, req.getSenderNodeName()});
            this.sendResponseBack(req, req.getTask(), (Serializable)new ODiscardedResponse());
            req = this.nextMessage();
        }
        if (ODistributedServerLog.isDebugEnabled()) {
            ODistributedServerLog.debug((Object)this, (String)this.manager.getLocalNodeName(), (String)req.getSenderNodeName(), (ODistributedServerLog.DIRECTION)ODistributedServerLog.DIRECTION.IN, (String)"processing request=%s sourceNode=%s", (Object[])new Object[]{req, req.getSenderNodeName()});
        }
        return req;
    }

    protected ODistributedRequest nextMessage() throws InterruptedException {
        while (this.localQueue.isEmpty()) {
            this.localQueue.offer((ODistributedRequest)this.requestQueue.take());
            this.requestQueue.drainTo(this.localQueue, 999);
        }
        return this.localQueue.poll();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void onMessage(ODistributedRequest iRequest) {
        OScenarioThreadLocal.INSTANCE.setRunMode(OScenarioThreadLocal.RUN_MODE.RUNNING_DISTRIBUTED);
        try {
            Serializable responsePayload;
            OAbstractRemoteTask task = iRequest.getTask();
            if (ODistributedServerLog.isDebugEnabled()) {
                ODistributedServerLog.debug((Object)this, (String)this.manager.getLocalNodeName(), (String)iRequest.getSenderNodeName(), (ODistributedServerLog.DIRECTION)ODistributedServerLog.DIRECTION.OUT, (String)"received request: %s", (Object[])new Object[]{iRequest});
            }
            OSecurityUser origin = null;
            try {
                int retry = 1;
                while (true) {
                    if (task.isRequiredOpenDatabase()) {
                        this.initDatabaseInstance();
                    }
                    this.database.activateOnCurrentThread();
                    task.setNodeSource(iRequest.getSenderNodeName());
                    if (this.database != null) {
                        origin = this.database.getUser();
                        try {
                            if (this.lastUser == null || !this.lastUser.getIdentity().equals(iRequest.getUserRID())) {
                                this.lastUser = this.database.getMetadata().getSecurity().getUser(iRequest.getUserRID());
                            }
                            this.database.setUser((OSecurityUser)this.lastUser);
                        }
                        catch (Throwable ex) {
                            OLogManager.instance().error((Object)this, "Failed on user switching " + ex.getMessage(), new Object[0]);
                        }
                    }
                    if (!((responsePayload = this.manager.executeOnLocalNode(iRequest, this.database)) instanceof OModificationOperationProhibitedException)) {
                        if (retry > 1) {
                            ODistributedServerLog.info((Object)this, (String)this.manager.getLocalNodeName(), (String)iRequest.getSenderNodeName(), (ODistributedServerLog.DIRECTION)ODistributedServerLog.DIRECTION.OUT, (String)"Request %s succeed after retry=%d", (Object[])new Object[]{iRequest, retry});
                        }
                        break;
                    }
                    try {
                        ODistributedServerLog.info((Object)this, (String)this.manager.getLocalNodeName(), (String)iRequest.getSenderNodeName(), (ODistributedServerLog.DIRECTION)ODistributedServerLog.DIRECTION.OUT, (String)"Database is frozen, waiting and retrying. Request %s (retry=%d)", (Object[])new Object[]{iRequest, retry});
                        Thread.sleep(1000L);
                    }
                    catch (InterruptedException interruptedException) {}
                    ++retry;
                }
            }
            finally {
                if (this.database != null) {
                    this.database.activateOnCurrentThread();
                    if (!this.database.isClosed()) {
                        this.database.rollback();
                        this.database.getLocalCache().clear();
                        this.database.setUser(origin);
                    }
                }
            }
            if (this.running) {
                this.sendResponseBack(iRequest, task, responsePayload);
            }
        }
        finally {
            OScenarioThreadLocal.INSTANCE.setRunMode(OScenarioThreadLocal.RUN_MODE.DEFAULT);
        }
    }

    protected String getPendingRequestMapName() {
        StringBuilder buffer = new StringBuilder(128);
        buffer.append("orientdb.node.");
        buffer.append(this.manager.getLocalNodeName());
        buffer.append(".pending");
        return buffer.toString();
    }

    protected String getLocalNodeName() {
        return this.manager.getLocalNodeName();
    }

    protected IMap<String, Object> restoreMessagesBeforeFailure(boolean iRestoreMessages) {
        ODistributedRequest lastPendingRequest;
        IMap lastPendingRequestMap = this.manager.getHazelcastInstance().getMap(this.getPendingRequestMapName());
        if (iRestoreMessages && (lastPendingRequest = (ODistributedRequest)lastPendingRequestMap.remove((Object)this.databaseName)) != null) {
            ODistributedServerLog.warn((Object)this, (String)this.getLocalNodeName(), null, (ODistributedServerLog.DIRECTION)ODistributedServerLog.DIRECTION.NONE, (String)"restore last replication message before the crash for database '%s': %s...", (Object[])new Object[]{this.databaseName, lastPendingRequest});
            try {
                this.initDatabaseInstance();
                boolean executeLastPendingRequest = this.checkIfOperationHasBeenExecuted(lastPendingRequest, lastPendingRequest.getTask());
                if (executeLastPendingRequest) {
                    this.onMessage(lastPendingRequest);
                }
            }
            catch (Throwable t) {
                ODistributedServerLog.error((Object)this, (String)this.getLocalNodeName(), null, (ODistributedServerLog.DIRECTION)ODistributedServerLog.DIRECTION.NONE, (String)"error on executing restored message for database %s", (Throwable)t, (Object[])new Object[]{this.databaseName});
            }
        }
        return lastPendingRequestMap;
    }

    protected void hotAlignmentError(ODistributedRequest iLastPendingRequest, String iMessage, Object ... iParams) throws OHotAlignmentNotPossibleExeption {
        String msg = String.format(iMessage, iParams);
        ODistributedServerLog.warn((Object)this, (String)this.getLocalNodeName(), (String)iLastPendingRequest.getSenderNodeName(), (ODistributedServerLog.DIRECTION)ODistributedServerLog.DIRECTION.IN, (String)("- " + msg), (Object[])new Object[0]);
        throw new OHotAlignmentNotPossibleExeption(msg);
    }

    protected boolean checkIfOperationHasBeenExecuted(ODistributedRequest lastPendingRequest, OAbstractRemoteTask task) {
        boolean executeLastPendingRequest = false;
        if (task instanceof ODeleteRecordTask) {
            executeLastPendingRequest = ((ODeleteRecordTask)task).getRid().getRecord() != null;
        } else if (task instanceof OUpdateRecordTask) {
            ORecord rec = ((OUpdateRecordTask)task).getRid().getRecord();
            if (rec == null) {
                ODistributedServerLog.warn((Object)this, (String)this.getLocalNodeName(), (String)lastPendingRequest.getSenderNodeName(), (ODistributedServerLog.DIRECTION)ODistributedServerLog.DIRECTION.IN, (String)"- cannot update deleted record %s, database could be not aligned", (Object[])new Object[]{((OUpdateRecordTask)task).getRid()});
            } else {
                executeLastPendingRequest = !rec.getRecordVersion().equals((Object)((OUpdateRecordTask)task).getVersion());
            }
        } else if (task instanceof OCreateRecordTask) {
            executeLastPendingRequest = ((OCreateRecordTask)task).getRid().getRecord() == null;
        } else if (task instanceof OSQLCommandTask) {
            if (!task.isIdempotent()) {
                this.hotAlignmentError(lastPendingRequest, "Not able to assure last command has been completed before last crash. Command='%s'", ((OSQLCommandTask)task).getPayload());
            }
        } else if (task instanceof OResurrectRecordTask) {
            if (((OResurrectRecordTask)task).getRid().getRecord() == null) {
                this.hotAlignmentError(lastPendingRequest, "Not able to resurrect deleted record '%s'", ((OResurrectRecordTask)task).getRid());
            }
        } else if (task instanceof OTxTask) {
            for (OAbstractRemoteTask t : ((OTxTask)task).getTasks()) {
                executeLastPendingRequest = this.checkIfOperationHasBeenExecuted(lastPendingRequest, t);
                if (!executeLastPendingRequest) continue;
                return true;
            }
        } else if (task instanceof OFixTxTask) {
            for (OAbstractRemoteTask t : ((OFixTxTask)task).getTasks()) {
                executeLastPendingRequest = this.checkIfOperationHasBeenExecuted(lastPendingRequest, t);
                if (!executeLastPendingRequest) continue;
                return true;
            }
        } else {
            this.hotAlignmentError(lastPendingRequest, "Not able to assure last operation has been completed before last crash. Task='%s'", task);
        }
        return executeLastPendingRequest;
    }

    private void sendResponseBack(ODistributedRequest iRequest, OAbstractRemoteTask task, Serializable responsePayload) {
        ODistributedServerLog.debug((Object)this, (String)this.manager.getLocalNodeName(), (String)iRequest.getSenderNodeName(), (ODistributedServerLog.DIRECTION)ODistributedServerLog.DIRECTION.OUT, (String)"sending back response '%s' to request %d (%s)", (Object[])new Object[]{responsePayload, iRequest.getId(), task});
        OHazelcastDistributedResponse response = new OHazelcastDistributedResponse(iRequest.getId(), this.manager.getLocalNodeName(), iRequest.getSenderNodeName(), responsePayload);
        try {
            IQueue queue = this.msgService.getQueue(OHazelcastDistributedMessageService.getResponseQueueName(iRequest.getSenderNodeName()));
            if (!queue.offer((Object)response, OGlobalConfiguration.DISTRIBUTED_QUEUE_TIMEOUT.getValueAsLong(), TimeUnit.MILLISECONDS)) {
                throw new ODistributedException("Timeout on dispatching response to the thread queue " + iRequest.getSenderNodeName());
            }
        }
        catch (Exception e) {
            throw new ODistributedException("Cannot dispatch response to the thread queue " + iRequest.getSenderNodeName(), (Throwable)e);
        }
    }

    private void createReplicatorUser(ODatabaseDocumentTx database, OServerUserConfiguration replicatorUser) {
        OUser replUser = database.getMetadata().getSecurity().getUser(replicatorUser.name);
        if (replUser == null) {
            database.getMetadata().getSecurity().createUser(replicatorUser.name, replicatorUser.password, new String[]{"admin"});
        }
    }
}

