/*
 * Decompiled with CFR 0.152.
 */
package net.timewalker.ffmq4.remote.session;

import java.util.LinkedList;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageListener;
import net.timewalker.ffmq4.client.ClientEnvironment;
import net.timewalker.ffmq4.common.message.AbstractMessage;
import net.timewalker.ffmq4.common.session.AbstractMessageConsumer;
import net.timewalker.ffmq4.remote.session.RemoteSession;
import net.timewalker.ffmq4.transport.PacketTransportEndpoint;
import net.timewalker.ffmq4.transport.packet.query.CloseConsumerQuery;
import net.timewalker.ffmq4.transport.packet.query.CreateConsumerQuery;
import net.timewalker.ffmq4.transport.packet.query.PrefetchQuery;
import net.timewalker.ffmq4.utils.ErrorTools;
import net.timewalker.ffmq4.utils.Settings;
import net.timewalker.ffmq4.utils.async.AsyncTask;
import net.timewalker.ffmq4.utils.async.AsyncTaskManager;
import net.timewalker.ffmq4.utils.id.IntegerID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class RemoteMessageConsumer
extends AbstractMessageConsumer {
    private static final Log log = LogFactory.getLog(RemoteMessageConsumer.class);
    protected PacketTransportEndpoint transportEndpoint;
    private boolean traceEnabled;
    private LinkedList<AbstractMessage> prefetchQueue = new LinkedList();
    private Semaphore prefetchSemaphore = new Semaphore(0);
    protected boolean donePrefetching = false;
    private AsyncTaskManager asyncTaskManager;
    private boolean logListenersFailures;
    private final WakeUpTask wakeUpTask = new WakeUpTask();

    public RemoteMessageConsumer(IntegerID consumerId, RemoteSession session, Destination destination, String messageSelector, boolean noLocal) throws JMSException {
        super(session, destination, messageSelector, noLocal, consumerId);
        this.transportEndpoint = session.getTransportEndpoint();
        this.asyncTaskManager = ClientEnvironment.getAsyncTaskManager();
        this.traceEnabled = log.isTraceEnabled();
        this.logListenersFailures = this.getSettings().getBooleanProperty("delivery.logListenersFailures", false);
        log.debug((Object)("New remote consumer ID is " + consumerId));
    }

    protected void remoteInit() throws JMSException {
        CreateConsumerQuery query = new CreateConsumerQuery();
        query.setConsumerId(this.id);
        query.setSessionId(this.session.getId());
        query.setDestination(this.destination);
        query.setMessageSelector(this.messageSelector);
        query.setNoLocal(this.noLocal);
        this.transportEndpoint.blockingRequest(query);
    }

    @Override
    protected final boolean shouldLogListenersFailures() {
        return this.logListenersFailures;
    }

    private final Settings getSettings() {
        return ClientEnvironment.getSettings();
    }

    @Override
    public final void setMessageListener(MessageListener messageListener) throws JMSException {
        super.setMessageListener(messageListener);
        if (messageListener != null && this.session.getConnection().isStarted()) {
            this.wakeUpMessageListenerAsync();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected final void onConsumerClose() {
        super.onConsumerClose();
        this.prefetchSemaphore.release();
        try {
            CloseConsumerQuery query = new CloseConsumerQuery();
            query.setSessionId(this.session.getId());
            query.setConsumerId(this.id);
            LinkedList<AbstractMessage> linkedList = this.prefetchQueue;
            synchronized (linkedList) {
                while (!this.prefetchQueue.isEmpty()) {
                    AbstractMessage msg = this.prefetchQueue.removeFirst();
                    query.addUndeliveredMessageID(msg.getJMSMessageID());
                }
            }
            this.transportEndpoint.blockingRequest(query);
        }
        catch (JMSException e) {
            ErrorTools.log(e, log);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public final boolean addToPrefetchQueue(AbstractMessage prefetchedMessage, boolean donePrefetching) {
        this.externalAccessLock.readLock().lock();
        try {
            if (this.closed) {
                boolean bl = false;
                return bl;
            }
            LinkedList<AbstractMessage> linkedList = this.prefetchQueue;
            synchronized (linkedList) {
                if (this.traceEnabled) {
                    log.trace((Object)("#" + this.id + " [PREFETCHED] from " + this.destination + " - " + prefetchedMessage));
                }
                this.prefetchQueue.add(prefetchedMessage);
                this.donePrefetching = donePrefetching;
            }
        }
        finally {
            this.externalAccessLock.readLock().unlock();
        }
        this.prefetchSemaphore.release();
        if (this.messageListener != null) {
            this.wakeUpMessageListenerAsync();
        }
        return true;
    }

    private void wakeUpMessageListenerAsync() {
        try {
            this.asyncTaskManager.execute(this.wakeUpTask);
        }
        catch (JMSException e) {
            ErrorTools.log(e, log);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private AbstractMessage getFromPrefetchQueue(long timeout) {
        AbstractMessage message;
        boolean shouldPrefetchMore = false;
        LinkedList<AbstractMessage> linkedList = this.prefetchQueue;
        synchronized (linkedList) {
            if (this.donePrefetching && this.prefetchQueue.isEmpty()) {
                shouldPrefetchMore = true;
                this.donePrefetching = false;
            }
        }
        if (shouldPrefetchMore) {
            try {
                this.prefetchFromDestination();
            }
            catch (JMSException e) {
                log.error((Object)"Cannot prefetch more messages from remote server", (Throwable)e);
            }
        }
        try {
            if (timeout >= 0L) {
                if (!this.prefetchSemaphore.tryAcquire(timeout, TimeUnit.MILLISECONDS)) {
                    return null;
                }
            } else {
                this.prefetchSemaphore.acquire();
            }
        }
        catch (InterruptedException e) {
            return null;
        }
        this.externalAccessLock.readLock().lock();
        try {
            if (this.closed) {
                AbstractMessage abstractMessage = null;
                return abstractMessage;
            }
            LinkedList<AbstractMessage> linkedList2 = this.prefetchQueue;
            synchronized (linkedList2) {
                if (this.prefetchQueue.isEmpty()) {
                    throw new IllegalStateException("Prefetch queue is empty");
                }
                message = this.prefetchQueue.removeFirst();
            }
        }
        finally {
            this.externalAccessLock.readLock().unlock();
        }
        ((RemoteSession)this.session).notifyDeliveredMessage(message.getJMSMessageID());
        if (this.traceEnabled) {
            log.trace((Object)("#" + this.id + " [GET PREFETCHED] in " + this.destination + " - " + message));
        }
        message.ensureDeserializationLevel(3);
        message.markAsReadOnly();
        return message;
    }

    private void prefetchFromDestination() throws JMSException {
        if (this.closed) {
            return;
        }
        if (this.traceEnabled) {
            log.trace((Object)("#" + this.id + " Prefetching more from destination " + this.destination));
        }
        PrefetchQuery query = new PrefetchQuery();
        query.setSessionId(this.session.getId());
        query.setConsumerId(this.id);
        this.transportEndpoint.nonBlockingRequest(query);
    }

    @Override
    protected final AbstractMessage receiveFromDestination(long timeout, boolean duplicateRequired) throws JMSException {
        if (this.closed) {
            return null;
        }
        return this.getFromPrefetchQueue(timeout);
    }

    @Override
    protected final void wakeUp() {
        if (this.closed) {
            return;
        }
        if (!this.session.getConnection().isStarted()) {
            return;
        }
        if (this.messageListener == null) {
            throw new IllegalStateException("Unexpected message availability notification");
        }
        this.wakeUpMessageListener();
    }

    private final class WakeUpTask
    implements AsyncTask {
        @Override
        public final boolean isMergeable() {
            return true;
        }

        @Override
        public final void execute() {
            RemoteMessageConsumer.this.wakeUp();
        }
    }
}

