/*
 * Decompiled with CFR 0.152.
 */
package net.timewalker.ffmq4.local.destination;

import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.JMSException;
import javax.jms.Queue;
import net.timewalker.ffmq4.FFMQException;
import net.timewalker.ffmq4.common.message.AbstractMessage;
import net.timewalker.ffmq4.common.message.MessageSelector;
import net.timewalker.ffmq4.local.FFMQEngine;
import net.timewalker.ffmq4.local.MessageLock;
import net.timewalker.ffmq4.local.MessageLockSet;
import net.timewalker.ffmq4.local.TransactionItem;
import net.timewalker.ffmq4.local.TransactionSet;
import net.timewalker.ffmq4.local.destination.AbstractLocalDestination;
import net.timewalker.ffmq4.local.destination.LocalQueueMBean;
import net.timewalker.ffmq4.local.session.LocalMessageConsumer;
import net.timewalker.ffmq4.local.session.LocalQueueBrowserCursor;
import net.timewalker.ffmq4.local.session.LocalSession;
import net.timewalker.ffmq4.management.destination.definition.QueueDefinition;
import net.timewalker.ffmq4.storage.data.DataStoreFullException;
import net.timewalker.ffmq4.storage.message.MessageStore;
import net.timewalker.ffmq4.storage.message.impl.BlockFileMessageStore;
import net.timewalker.ffmq4.storage.message.impl.InMemoryMessageStore;
import net.timewalker.ffmq4.utils.ErrorTools;
import net.timewalker.ffmq4.utils.async.AsyncTask;
import net.timewalker.ffmq4.utils.concurrent.BlockingBoundedFIFO;
import net.timewalker.ffmq4.utils.concurrent.SynchronizationBarrier;
import net.timewalker.ffmq4.utils.concurrent.WaitTimeoutException;
import net.timewalker.ffmq4.utils.watchdog.ActiveObject;
import net.timewalker.ffmq4.utils.watchdog.ActivityWatchdog;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public final class LocalQueue
extends AbstractLocalDestination
implements Queue,
LocalQueueMBean,
ActiveObject {
    private static final Log log = LogFactory.getLog(LocalQueue.class);
    private static final Timer redeliveryTimer = new Timer(true);
    private FFMQEngine engine;
    private QueueDefinition queueDef;
    private MessageStore volatileStore;
    private MessageStore persistentStore;
    private Object storeLock = new Object();
    private AtomicLong sentToQueueCount = new AtomicLong();
    private AtomicLong receivedFromQueueCount = new AtomicLong();
    private AtomicLong acknowledgedGetCount = new AtomicLong();
    private AtomicLong rollbackedGetCount = new AtomicLong();
    private AtomicLong expiredCount = new AtomicLong();
    private long inactivityTimeout;
    private long redeliveryDelay;
    private boolean traceEnabled = log.isTraceEnabled();
    private boolean pendingChanges;
    private long lastActivity;
    private volatile int consumerOffset = 0;
    private BlockingBoundedFIFO<AbstractMessage> notificationQueue;
    private final NotificationTask notificationTask = new NotificationTask();

    public LocalQueue(FFMQEngine engine, QueueDefinition queueDef) throws JMSException {
        super(queueDef);
        this.engine = engine;
        this.queueDef = queueDef;
        int notificationQueueMaxSize = Math.max(engine.getSetup().getNotificationAsyncTaskManagerThreadPoolMaxSize() + 1, engine.getSetup().getInternalNotificationQueueMaxSize());
        this.notificationQueue = new BlockingBoundedFIFO(notificationQueueMaxSize, 5000L);
        if (queueDef.getMaxNonPersistentMessages() > 0) {
            this.volatileStore = new InMemoryMessageStore(queueDef);
            this.volatileStore.init();
        }
        if (queueDef.hasPersistentStore()) {
            this.persistentStore = new BlockFileMessageStore(queueDef, engine.getDiskIOAsyncTaskManager());
            this.persistentStore.init();
        }
        this.inactivityTimeout = (long)engine.getSetup().getWatchdogConsumerInactivityTimeout() * 1000L;
        this.redeliveryDelay = engine.getSetup().getRedeliveryDelay();
        this.lastActivity = System.currentTimeMillis();
        ActivityWatchdog.getInstance().register(this);
    }

    public QueueDefinition getDefinition() {
        return this.queueDef;
    }

    public String getQueueName() {
        return this.getName();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean putLocked(AbstractMessage message, LocalSession session, MessageLockSet locks) throws JMSException {
        this.checkNotClosed();
        this.checkTransactionLock();
        if (!message.isInternalCopy()) {
            throw new FFMQException("Message instance is not an FFMQ internal copy !", "CONSISTENCY_ERROR");
        }
        MessageStore targetStore = message.getJMSDeliveryMode() == 1 ? (this.volatileStore != null ? this.volatileStore : this.persistentStore) : this.persistentStore;
        if (targetStore == null) {
            throw new FFMQException("Queue does not support this delivery mode : " + (message.getJMSDeliveryMode() == 1 ? "DeliveryMode.NON_PERSISTENT" : "DeliveryMode.PERSISTENT"), "INVALID_DELIVERY_MODE");
        }
        Object object = this.storeLock;
        synchronized (object) {
            int newHandle = targetStore.store(message);
            if (newHandle == -1) {
                if (targetStore == this.volatileStore && this.persistentStore != null && this.queueDef.isOverflowToPersistent()) {
                    targetStore = this.persistentStore;
                    newHandle = targetStore.store(message);
                }
                if (newHandle == -1) {
                    throw new DataStoreFullException("Cannot store message : queue is full : " + this.getName());
                }
            }
            targetStore.lock(newHandle);
            locks.add(newHandle, targetStore.getDeliveryMode(), this, message);
        }
        if (message.getJMSDeliveryMode() == 2 && this.requiresTransactionalUpdate()) {
            this.pendingChanges = true;
            return true;
        }
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void unlockAndDeliver(MessageLock lockRef) throws JMSException {
        MessageStore targetStore = lockRef.getDeliveryMode() == 1 ? this.volatileStore : this.persistentStore;
        int handle = lockRef.getHandle();
        AbstractMessage message = lockRef.getMessage();
        Object object = this.storeLock;
        synchronized (object) {
            targetStore.unlock(handle);
        }
        this.sentToQueueCount.incrementAndGet();
        this.sendAvailabilityNotification(message);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void removeLocked(MessageLock lockRef) throws JMSException {
        MessageStore targetStore;
        this.checkTransactionLock();
        if (lockRef.getDeliveryMode() == 1) {
            targetStore = this.volatileStore;
        } else {
            targetStore = this.persistentStore;
            if (this.requiresTransactionalUpdate()) {
                this.pendingChanges = true;
            }
        }
        Object object = this.storeLock;
        synchronized (object) {
            targetStore.delete(lockRef.getHandle());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean remove(LocalSession localSession, TransactionItem[] items) throws JMSException {
        this.checkNotClosed();
        this.checkTransactionLock();
        int volatileCommitted = 0;
        int persistentCommitted = 0;
        Object object = this.storeLock;
        synchronized (object) {
            for (int n = 0; n < items.length; ++n) {
                TransactionItem transactionItem = items[n];
                if (transactionItem.getDestination() != this) continue;
                if (this.traceEnabled) {
                    log.trace((Object)(localSession + " COMMIT " + transactionItem.getMessageId()));
                }
                if (transactionItem.getDeliveryMode() == 2) {
                    this.persistentStore.delete(transactionItem.getHandle());
                    ++persistentCommitted;
                    continue;
                }
                this.volatileStore.delete(transactionItem.getHandle());
                ++volatileCommitted;
            }
        }
        this.acknowledgedGetCount.addAndGet(volatileCommitted + persistentCommitted);
        if (persistentCommitted > 0 && this.requiresTransactionalUpdate()) {
            this.pendingChanges = true;
            return true;
        }
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean redeliverLocked(TransactionItem[] items, MessageLockSet locks) throws JMSException {
        this.checkNotClosed();
        this.checkTransactionLock();
        int volatileRollbacked = 0;
        int persistentRollbacked = 0;
        Object object = this.storeLock;
        synchronized (object) {
            for (int n = 0; n < items.length; ++n) {
                TransactionItem transactionItem = items[n];
                if (transactionItem.getDestination() != this) continue;
                MessageStore store = transactionItem.getDeliveryMode() == 2 ? this.persistentStore : this.volatileStore;
                int handle = transactionItem.getHandle();
                AbstractMessage msg = store.retrieve(handle);
                msg.setJMSRedelivered(true);
                handle = store.replace(handle, msg);
                if (this.redeliveryDelay > 0L) {
                    redeliveryTimer.schedule((TimerTask)new RedeliveryTask(msg, store, handle), this.redeliveryDelay);
                } else {
                    locks.add(handle, store.getDeliveryMode(), this, msg);
                }
                if (transactionItem.getDeliveryMode() == 2) {
                    ++persistentRollbacked;
                    continue;
                }
                ++volatileRollbacked;
            }
        }
        this.rollbackedGetCount.addAndGet(volatileRollbacked + persistentRollbacked);
        if (persistentRollbacked > 0 && this.requiresTransactionalUpdate()) {
            this.pendingChanges = true;
            return true;
        }
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void commitChanges(SynchronizationBarrier barrier) throws JMSException {
        this.checkNotClosed();
        this.checkTransactionLock();
        if (this.persistentStore != null) {
            long start = System.currentTimeMillis();
            Object object = this.storeLock;
            synchronized (object) {
                this.persistentStore.commitChanges(barrier);
            }
            long end = System.currentTimeMillis();
            this.notifyCommitTime(end - start);
            this.pendingChanges = false;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void redeliverMessage(AbstractMessage msg, MessageStore store, int handle) {
        try {
            Object object = this.storeLock;
            synchronized (object) {
                store.unlock(handle);
                if (this.traceEnabled) {
                    log.trace((Object)("(Deferred) UNLOCKED " + msg.getJMSMessageID()));
                }
            }
            this.sendAvailabilityNotification(msg);
        }
        catch (JMSException e) {
            ErrorTools.log(e, log);
        }
    }

    public AbstractMessage get(LocalSession localSession, TransactionSet transactionSet, MessageSelector selector) throws JMSException {
        if (this.closed) {
            return null;
        }
        this.lastActivity = System.currentTimeMillis();
        AbstractMessage msg = null;
        if (this.volatileStore != null) {
            msg = this.getFromStore(localSession, this.volatileStore, transactionSet, selector);
            if (msg == null && this.persistentStore != null) {
                msg = this.getFromStore(localSession, this.persistentStore, transactionSet, selector);
            }
        } else if (this.persistentStore != null) {
            msg = this.getFromStore(localSession, this.persistentStore, transactionSet, selector);
        }
        return msg;
    }

    public AbstractMessage browse(LocalQueueBrowserCursor cursor, MessageSelector selector) throws JMSException {
        AbstractMessage msg;
        cursor.reset();
        if (this.volatileStore != null && (msg = this.browseStore(this.volatileStore, cursor, selector)) != null) {
            cursor.move();
            return msg;
        }
        if (this.persistentStore != null && (msg = this.browseStore(this.persistentStore, cursor, selector)) != null) {
            cursor.move();
            return msg;
        }
        cursor.setEndOfQueueReached();
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private AbstractMessage browseStore(MessageStore store, LocalQueueBrowserCursor cursor, MessageSelector selector) throws JMSException {
        AbstractMessage result = null;
        ArrayList<Integer> expiredHandles = null;
        long now = System.currentTimeMillis();
        Object object = this.storeLock;
        synchronized (object) {
            int current = store.first();
            while (current != -1 && cursor.position() > cursor.skipped()) {
                cursor.skip();
                current = store.next(current);
            }
            while (current != -1) {
                if (!store.isLocked(current)) {
                    AbstractMessage msg = store.retrieve(current);
                    if (msg.getJMSExpiration() > 0L && msg.getJMSExpiration() < now) {
                        if (expiredHandles == null) {
                            expiredHandles = new ArrayList<Integer>();
                        }
                        store.lock(current);
                        expiredHandles.add(current);
                        current = store.next(current);
                        continue;
                    }
                    if (selector == null) {
                        result = msg;
                        break;
                    }
                    msg.ensureDeserializationLevel(2);
                    if (selector.matches(msg)) {
                        result = msg;
                        break;
                    }
                }
                cursor.skip();
                current = store.next(current);
            }
        }
        if (expiredHandles != null) {
            this.openTransaction();
            try {
                for (int i = 0; i < expiredHandles.size(); ++i) {
                    int expiredHandle = (Integer)expiredHandles.get(i);
                    Object object2 = this.storeLock;
                    synchronized (object2) {
                        store.delete(expiredHandle);
                    }
                    this.expiredCount.incrementAndGet();
                }
                this.commitChanges(null);
            }
            finally {
                this.closeTransaction();
            }
        }
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private AbstractMessage getFromStore(LocalSession localSession, MessageStore store, TransactionSet transactionSet, MessageSelector selector) throws JMSException {
        AbstractMessage result = null;
        ArrayList<Integer> expiredHandles = null;
        Object object = this.storeLock;
        synchronized (object) {
            int current = store.first();
            while (current != -1) {
                if (!store.isLocked(current)) {
                    boolean matchesSelector;
                    AbstractMessage msg = store.retrieve(current);
                    if (msg.getJMSExpiration() > 0L && msg.getJMSExpiration() < this.lastActivity) {
                        if (expiredHandles == null) {
                            expiredHandles = new ArrayList<Integer>();
                        }
                        store.lock(current);
                        expiredHandles.add(current);
                        current = store.next(current);
                        continue;
                    }
                    if (selector != null) {
                        msg.ensureDeserializationLevel(2);
                        matchesSelector = selector.matches(msg);
                    } else {
                        matchesSelector = true;
                    }
                    if (matchesSelector) {
                        store.lock(current);
                        if (this.traceEnabled) {
                            log.trace((Object)(localSession + " LOCKED " + msg.getJMSMessageID()));
                        }
                        transactionSet.add(current, msg.getJMSMessageID(), store.getDeliveryMode(), this);
                        this.receivedFromQueueCount.incrementAndGet();
                        result = msg;
                        break;
                    }
                }
                current = store.next(current);
            }
        }
        if (expiredHandles != null) {
            this.openTransaction();
            try {
                for (int i = 0; i < expiredHandles.size(); ++i) {
                    int expiredHandle = (Integer)expiredHandles.get(i);
                    Object object2 = this.storeLock;
                    synchronized (object2) {
                        store.delete(expiredHandle);
                    }
                    this.expiredCount.incrementAndGet();
                }
                this.commitChanges(null);
            }
            finally {
                this.closeTransaction();
            }
        }
        return result;
    }

    public void purge(MessageSelector selector) throws JMSException {
        if (this.volatileStore != null) {
            this.purgeStore(this.volatileStore, selector);
        }
        if (this.persistentStore != null) {
            this.openTransaction();
            try {
                this.purgeStore(this.persistentStore, selector);
                this.commitChanges();
            }
            finally {
                this.closeTransaction();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void purgeStore(MessageStore store, MessageSelector selector) throws JMSException {
        Object object = this.storeLock;
        synchronized (object) {
            int current = store.first();
            while (current != -1) {
                int next = store.next(current);
                if (!store.isLocked(current)) {
                    if (selector != null) {
                        AbstractMessage msg = store.retrieve(current);
                        msg.ensureDeserializationLevel(2);
                        if (selector.matches(msg)) {
                            store.delete(current);
                        }
                    } else {
                        store.delete(current);
                    }
                }
                current = next;
            }
        }
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    private void notifyConsumer(AbstractMessage message) {
        this.consumersLock.readLock().lock();
        try {
            switch (this.localConsumers.size()) {
                case 0: {
                    return;
                }
                case 1: {
                    this.notifySingleConsumer((LocalMessageConsumer)this.localConsumers.get(0), message);
                    return;
                }
                default: {
                    this.notifyNextConsumer(this.localConsumers, message);
                    return;
                }
            }
        }
        finally {
            this.consumersLock.readLock().unlock();
        }
    }

    private void notifySingleConsumer(LocalMessageConsumer consumer, AbstractMessage message) {
        try {
            MessageSelector consumerSelector;
            if (message != null && (consumerSelector = consumer.getReceiveSelector()) != null) {
                message.ensureDeserializationLevel(2);
                if (!consumerSelector.matches(message)) {
                    return;
                }
            }
            consumer.wakeUp();
        }
        catch (JMSException e) {
            ErrorTools.log(e, log);
        }
    }

    private void notifyNextConsumer(List<LocalMessageConsumer> allConsumers, AbstractMessage message) {
        int localConsumersCount = allConsumers.size();
        int currentOffset = this.consumerOffset++;
        for (int n = 0; n < localConsumersCount; ++n) {
            LocalMessageConsumer consumer;
            block7: {
                MessageSelector consumerSelector;
                int offset = (n + currentOffset) % localConsumersCount;
                consumer = allConsumers.get(offset);
                if (!consumer.getSession().getConnection().isStarted()) continue;
                if (message != null && (consumerSelector = consumer.getReceiveSelector()) != null) {
                    message.ensureDeserializationLevel(2);
                    try {
                        if (!consumerSelector.matches(message)) {
                        }
                        break block7;
                    }
                    catch (JMSException e) {
                        ErrorTools.log(e, log);
                    }
                    continue;
                }
            }
            try {
                consumer.wakeUp();
                break;
            }
            catch (JMSException e) {
                ErrorTools.log(e, log);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public int getSize() {
        int size = 0;
        Object object = this.storeLock;
        synchronized (object) {
            if (this.volatileStore != null) {
                size += this.volatileStore.size();
            }
            if (this.persistentStore != null) {
                size += this.persistentStore.size();
            }
        }
        return size;
    }

    @Override
    public int getMemoryStoreUsage() {
        return this.volatileStore != null ? this.volatileStore.getAbsoluteStoreUsage() : -1;
    }

    @Override
    public int getPersistentStoreUsage() {
        return this.persistentStore != null ? this.persistentStore.getAbsoluteStoreUsage() : -1;
    }

    @Override
    public void resetStats() {
        super.resetStats();
        this.sentToQueueCount.set(0L);
        this.receivedFromQueueCount.set(0L);
        this.acknowledgedGetCount.set(0L);
        this.rollbackedGetCount.set(0L);
        this.expiredCount.set(0L);
    }

    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append("Queue{");
        sb.append(this.getName());
        sb.append("}[size=");
        sb.append(this.getSize());
        sb.append(",consumers=");
        sb.append(this.localConsumers.size());
        sb.append(",in=");
        sb.append(this.sentToQueueCount);
        sb.append(",out=");
        sb.append(this.receivedFromQueueCount);
        sb.append(",ack=");
        sb.append(this.acknowledgedGetCount);
        sb.append(",rollback=");
        sb.append(this.rollbackedGetCount);
        sb.append(",expired=");
        sb.append(this.expiredCount);
        sb.append("]");
        return sb.toString();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public final void close() throws JMSException {
        ArrayList consumers;
        Object object = this.closeLock;
        synchronized (object) {
            if (this.closed) {
                return;
            }
            this.closed = true;
        }
        ActivityWatchdog.getInstance().unregister(this);
        object = this.storeLock;
        synchronized (object) {
            if (this.volatileStore != null) {
                this.volatileStore.close();
                if (this.queueDef.isTemporary()) {
                    this.volatileStore.delete();
                }
            }
            if (this.persistentStore != null) {
                this.persistentStore.close();
                if (this.queueDef.isTemporary()) {
                    this.persistentStore.delete();
                }
            }
        }
        this.consumersLock.readLock().lock();
        try {
            if (this.localConsumers.isEmpty()) {
                return;
            }
            consumers = new ArrayList(this.localConsumers);
        }
        finally {
            this.consumersLock.readLock().unlock();
        }
        for (int n = 0; n < consumers.size(); ++n) {
            LocalMessageConsumer consumer = (LocalMessageConsumer)consumers.get(n);
            try {
                consumer.close();
                continue;
            }
            catch (JMSException e) {
                ErrorTools.log(e, log);
            }
        }
    }

    @Override
    public long getSentToQueueCount() {
        return this.sentToQueueCount.get();
    }

    @Override
    public long getReceivedFromQueueCount() {
        return this.receivedFromQueueCount.get();
    }

    @Override
    public long getAcknowledgedGetCount() {
        return this.acknowledgedGetCount.get();
    }

    @Override
    public long getRollbackedGetCount() {
        return this.rollbackedGetCount.get();
    }

    @Override
    public long getExpiredCount() {
        return this.expiredCount.get();
    }

    @Override
    public long getLastActivity() {
        return this.lastActivity;
    }

    @Override
    public long getTimeoutDelay() {
        return this.inactivityTimeout;
    }

    @Override
    public boolean onActivityTimeout() throws Exception {
        if (this.closed) {
            return true;
        }
        if (this.getSize() == 0) {
            return false;
        }
        this.notifyConsumer(null);
        return false;
    }

    protected void processAvailabilityNotificationQueue() {
        while (!this.closed) {
            AbstractMessage message = this.notificationQueue.removeFirst();
            if (message == null) {
                return;
            }
            this.notifyConsumer(message);
        }
    }

    private void sendAvailabilityNotification(AbstractMessage message) throws JMSException {
        if (this.localConsumers.isEmpty()) {
            return;
        }
        try {
            this.notificationQueue.addLast(message);
        }
        catch (WaitTimeoutException e) {
            log.error((Object)("Cannot enqueue notification " + e));
            return;
        }
        this.engine.getNotificationAsyncTaskManager().execute(this.notificationTask);
    }

    @Override
    protected boolean hasPendingChanges() {
        return this.pendingChanges;
    }

    @Override
    protected boolean requiresTransactionalUpdate() {
        return this.persistentStore != null && this.persistentStore.isFailSafe();
    }

    private final class NotificationTask
    implements AsyncTask {
        @Override
        public boolean isMergeable() {
            return true;
        }

        @Override
        public void execute() {
            LocalQueue.this.processAvailabilityNotificationQueue();
        }
    }

    private final class RedeliveryTask
    extends TimerTask {
        private AbstractMessage msg;
        private MessageStore store;
        private int handle;

        public RedeliveryTask(AbstractMessage msg, MessageStore store, int handle) {
            this.msg = msg;
            this.store = store;
            this.handle = handle;
        }

        @Override
        public void run() {
            LocalQueue.this.redeliverMessage(this.msg, this.store, this.handle);
        }
    }
}

