/*
 * Decompiled with CFR 0.152.
 */
package net.timewalker.ffmq4.cluster.bridge;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.NamingException;
import net.timewalker.ffmq4.cluster.bridge.JMSBridgeMBean;
import net.timewalker.ffmq4.cluster.resolver.DestinationResolver;
import net.timewalker.ffmq4.cluster.resolver.SessionDestinationResolver;
import net.timewalker.ffmq4.management.bridge.BridgeDefinition;
import net.timewalker.ffmq4.management.peer.PeerDescriptor;
import net.timewalker.ffmq4.utils.JNDITools;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Forwards messages from a source destination to a target destination, as
 * described by a {@link BridgeDefinition}, using one dedicated background thread.
 *
 * <p>Lifecycle ({@link #start()}, {@link #stop()}, {@link #isStarted()}) is guarded by
 * this object's monitor. The statistics counters are {@code volatile} so that they can
 * be read from management threads while the bridge thread updates them.
 */
public final class JMSBridge implements JMSBridgeMBean {
    protected static final Log log = LogFactory.getLog(JMSBridge.class);

    /** Static configuration of this bridge (peers, destinations, transaction settings). */
    protected BridgeDefinition bridgeDefinition;

    /** Worker thread; non-null only between {@link #start()} and {@link #stop()}. */
    private JMSBridgeThread bridgeThread;

    /** Number of messages whose commit phase completed; incremented by the bridge thread. */
    protected volatile long forwardedMessages;

    /** Number of errors encountered; incremented by the bridge thread. */
    protected volatile long failures;

    /** Lifecycle flag, only accessed while holding this object's monitor. */
    private boolean started;

    /**
     * Creates a bridge in the stopped state.
     *
     * @param bridgeDefinition the configuration to use
     */
    public JMSBridge(BridgeDefinition bridgeDefinition) {
        this.bridgeDefinition = bridgeDefinition;
    }

    /** @return the name taken from the bridge definition */
    @Override
    public String getName() {
        return bridgeDefinition.getName();
    }

    /** @return the definition this bridge was created with */
    public BridgeDefinition getBridgeDefinition() {
        return bridgeDefinition;
    }

    /** @return the current value of the forwarded messages counter */
    @Override
    public long getForwardedMessages() {
        return forwardedMessages;
    }

    /** @return the current value of the failures counter */
    @Override
    public long getFailures() {
        return failures;
    }

    /** Resets both statistics counters to zero. */
    @Override
    public void resetStats() {
        forwardedMessages = 0L;
        failures = 0L;
    }

    /** Creates and starts the bridge thread; does nothing if the bridge is already started. */
    @Override
    public synchronized void start() {
        if (started) {
            return;
        }
        bridgeThread = new JMSBridgeThread();
        bridgeThread.start();
        log.info("[" + bridgeDefinition.getName() + "] JMS bridge started");
        started = true;
    }

    /**
     * Asks the bridge thread to stop and waits for it to terminate; does nothing if the
     * bridge is not started.
     *
     * <p>If the calling thread is interrupted while waiting, the bridge is still marked as
     * stopped and the caller's interrupt status is restored.
     */
    @Override
    public synchronized void stop() {
        if (!started) {
            return;
        }
        bridgeThread.pleaseStop();
        try {
            bridgeThread.join();
        }
        catch (InterruptedException e) {
            log.error("Wait for bridge thread completion was interrupted");
            // Do not hide the interruption from the caller
            Thread.currentThread().interrupt();
        }
        finally {
            bridgeThread = null;
        }
        log.info("[" + bridgeDefinition.getName() + "] JMS bridge stopped.");
        started = false;
    }

    /** @return {@code true} if {@link #start()} was called and {@link #stop()} was not called since */
    @Override
    public synchronized boolean isStarted() {
        return started;
    }

    /** @return the retry interval from the bridge definition (used as seconds by the bridge thread) */
    @Override
    public int getRetryInterval() {
        return bridgeDefinition.getRetryInterval();
    }

    /** @return whether the source side is committed before the target side */
    @Override
    public boolean isCommitSourceFirst() {
        return bridgeDefinition.isCommitSourceFirst();
    }

    /** @return whether the target (producer) session is transacted */
    @Override
    public boolean isProducerTransacted() {
        return bridgeDefinition.isProducerTransacted();
    }

    /** @return whether the source (consumer) session is transacted */
    @Override
    public boolean isConsumerTransacted() {
        return bridgeDefinition.isConsumerTransacted();
    }

    /** @return the acknowledge mode used for a non-transacted source session */
    @Override
    public int getConsumerAcknowledgeMode() {
        return bridgeDefinition.getConsumerAcknowledgeMode();
    }

    /** @return the delivery mode used when sending to the target */
    @Override
    public int getProducerDeliveryMode() {
        return bridgeDefinition.getProducerDeliveryMode();
    }

    /**
     * Worker thread: receives from the source, sends to the target, then commits or
     * acknowledges both sides. JMS resources are created lazily and dropped and re-created
     * after a failure, with a pause of the configured retry interval between attempts.
     *
     * <p>Resource accessors, {@link #retryWait()}, {@link #pleaseStop()} and the drop
     * methods synchronize on this thread object. The blocking receive and send calls are
     * made outside of that monitor.
     */
    private class JMSBridgeThread extends Thread {
        private final DestinationResolver destinationResolver;
        /** Debug level status, sampled once when the thread is created. */
        private final boolean debugEnabled;

        /**
         * Set by {@link #pleaseStop()}. Volatile because the main loops read it without
         * holding the monitor.
         */
        private volatile boolean stopRequired;

        private ConnectionFactory sourceConnectionFactory;
        private ConnectionFactory targetConnectionFactory;
        private Connection sourceConnection;
        private Session sourceSession;
        private MessageConsumer sourceConsumer;
        private Connection targetConnection;
        private Session targetSession;
        private MessageProducer targetProducer;

        public JMSBridgeThread() {
            super("JMSBridge[" + JMSBridge.this.bridgeDefinition.getName() + "]");
            this.destinationResolver = new SessionDestinationResolver();
            this.debugEnabled = log.isDebugEnabled();
        }

        /** @return the {@code [bridge name]} prefix used by the log messages of this thread */
        private String logPrefix() {
            return "[" + bridgeDefinition.getName() + "]";
        }

        /**
         * Main loop: look up both connection factories, then receive / forward / commit
         * until a stop is requested. Any unexpected error ends the thread after being
         * logged; all JMS resources are released on exit.
         */
        @Override
        public void run() {
            try {
                log.debug(logPrefix() + " JMS bridge thread starting");
                log.trace(bridgeDefinition);
                sourceConnectionFactory = getConnectionFactory(bridgeDefinition.getSource());
                targetConnectionFactory = getConnectionFactory(bridgeDefinition.getTarget());

                while (!stopRequired) {
                    Message msg = receiveFromSource();
                    if (msg == null) {
                        if (stopRequired) {
                            break;
                        }
                        ++failures;
                        log.error(logPrefix() + " Consumer was closed");
                        dropSourceResources();
                        dropTargetResources();
                        continue;
                    }

                    if (!forwardToTarget(msg)) {
                        break;
                    }

                    try {
                        if (!commitBothSides(msg)) {
                            // A session is gone because a stop was requested mid-flight
                            break;
                        }
                        ++forwardedMessages;
                    }
                    catch (JMSException e) {
                        ++failures;
                        log.error(logPrefix() + " Double phase commit failed", e);
                        dropSourceResources();
                        dropTargetResources();
                        // The message was not forwarded successfully, so skip the success trace
                        continue;
                    }

                    if (debugEnabled) {
                        log.debug(logPrefix() + " Forwarded message : " + msg);
                    }
                }
            }
            catch (Throwable e) {
                log.fatal(logPrefix() + " JMSBridge thread failed", e);
            }
            finally {
                dropSourceResources();
                dropTargetResources();
                log.debug(logPrefix() + " JMS bridge thread exiting");
            }
        }

        /**
         * Completes both sides of the transfer in the order selected by
         * {@code isCommitSourceFirst()}.
         *
         * @param msg the message that was just sent to the target
         * @return {@code false} if a required session was unavailable (which only happens
         *         once a stop was requested), {@code true} otherwise
         * @throws JMSException if a commit or acknowledge fails
         */
        private boolean commitBothSides(Message msg) throws JMSException {
            if (bridgeDefinition.isCommitSourceFirst()) {
                return commitSource(msg) && commitTarget();
            }
            return commitTarget() && commitSource(msg);
        }

        /**
         * Commits the source session if it is transacted, or acknowledges the message if
         * the source uses client acknowledge mode; otherwise does nothing.
         *
         * @return {@code false} if the source session was unavailable, {@code true} otherwise
         */
        private boolean commitSource(Message msg) throws JMSException {
            if (bridgeDefinition.isConsumerTransacted()) {
                Session session = getSourceSession();
                if (session == null) {
                    return false;
                }
                session.commit();
            } else if (bridgeDefinition.getConsumerAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) {
                msg.acknowledge();
            }
            return true;
        }

        /**
         * Commits the target session if it is transacted; otherwise does nothing.
         *
         * @return {@code false} if the target session was unavailable, {@code true} otherwise
         */
        private boolean commitTarget() throws JMSException {
            if (bridgeDefinition.isProducerTransacted()) {
                Session session = getTargetSession();
                if (session == null) {
                    return false;
                }
                session.commit();
            }
            return true;
        }

        /**
         * Blocks until a message is available on the source, re-creating the source
         * resources after each failure.
         *
         * @return the received message, or {@code null} once a stop was requested
         */
        private Message receiveFromSource() {
            while (!stopRequired) {
                try {
                    MessageConsumer consumer = getSourceConsumer();
                    if (consumer == null) {
                        return null;
                    }
                    Message received = consumer.receive();
                    if (received != null) {
                        return received;
                    }
                    // receive() gave nothing back: either we are stopping or the consumer went away
                    if (stopRequired) {
                        break;
                    }
                    log.error("Consumer was unexpectedly closed, restarting bridge.");
                    dropSourceResources();
                    retryWait();
                }
                catch (JMSException e) {
                    ++failures;
                    log.error(logPrefix() + " Receive failed", e);
                    dropSourceResources();
                    retryWait();
                }
            }
            return null;
        }

        /**
         * Sends the message to the target, retrying after each failure. The remaining
         * time-to-live is derived from the message expiration; a message that is already
         * expired is discarded and reported as handled.
         *
         * @param message the message to forward
         * @return {@code true} if the message was sent or discarded as expired,
         *         {@code false} if a stop request prevented the send
         */
        private boolean forwardToTarget(Message message) {
            while (!stopRequired) {
                try {
                    MessageProducer producer = getTargetProducer();
                    if (producer == null) {
                        break;
                    }
                    long timeToLive = 0L; // 0 is passed when the message carries no expiration
                    long expiration = message.getJMSExpiration();
                    if (expiration > 0L) {
                        long now = System.currentTimeMillis();
                        if (now >= expiration) {
                            log.warn("Message " + message.getJMSMessageID() + " has expired, discarding it.");
                            return true;
                        }
                        timeToLive = expiration - now;
                    }
                    producer.send(message, bridgeDefinition.getProducerDeliveryMode(), message.getJMSPriority(), timeToLive);
                    return true;
                }
                catch (JMSException e) {
                    ++failures;
                    log.error(logPrefix() + " Send failed", e);
                    dropTargetResources();
                    retryWait();
                }
            }
            return false;
        }

        /**
         * Requests termination: raises the stop flag, wakes up a pending
         * {@link #retryWait()} and closes the source resources.
         * NOTE(review): closing the consumer is presumably what unblocks a pending
         * {@code receive()} in the bridge thread - confirm against the JMS provider.
         */
        public synchronized void pleaseStop() {
            stopRequired = true;
            notifyAll();
            dropSourceResources();
        }

        /**
         * Looks up the connection factory of a peer through JNDI.
         *
         * @param peer the peer to connect to
         * @return the connection factory bound under the peer's factory name
         * @throws JMSException if the JNDI access fails; the original
         *         {@link NamingException} is attached as linked exception and cause
         */
        private ConnectionFactory getConnectionFactory(PeerDescriptor peer) throws JMSException {
            try {
                Context context = JNDITools.getContext(peer.getJdniInitialContextFactoryName(), peer.getProviderURL(), null);
                return (ConnectionFactory)context.lookup(peer.getJndiConnectionFactoryName());
            }
            catch (NamingException e) {
                JMSException wrapped = new JMSException("JNDI error : " + e.toString());
                // Keep the root cause so its stack trace is not lost
                wrapped.setLinkedException(e);
                wrapped.initCause(e);
                throw wrapped;
            }
        }

        /**
         * Returns the source consumer, creating it (and starting the source connection)
         * if needed; retries until it succeeds or a stop is requested.
         *
         * @return the consumer, or {@code null} if a stop was requested first
         */
        private synchronized MessageConsumer getSourceConsumer() {
            if (sourceConsumer != null) {
                return sourceConsumer;
            }
            while (!stopRequired) {
                try {
                    Session session = getSourceSession();
                    if (session == null) {
                        break;
                    }
                    Destination source = destinationResolver.getDestination(bridgeDefinition.getSource(), bridgeDefinition.getSourceDestination(), session);
                    sourceConsumer = session.createConsumer(source);
                    getSourceConnection().start();
                    break;
                }
                catch (JMSException e) {
                    ++failures;
                    log.error(logPrefix() + " Cannot create consumer on source queuer", e);
                    dropSourceResources();
                    retryWait();
                }
            }
            return sourceConsumer;
        }

        /**
         * Returns the target producer, creating it if needed; retries until it succeeds
         * or a stop is requested.
         *
         * @return the producer, or {@code null} if a stop was requested first
         */
        private synchronized MessageProducer getTargetProducer() {
            if (targetProducer != null) {
                return targetProducer;
            }
            while (!stopRequired) {
                try {
                    Session session = getTargetSession();
                    if (session == null) {
                        break;
                    }
                    Destination target = destinationResolver.getDestination(bridgeDefinition.getTarget(), bridgeDefinition.getTargetDestination(), session);
                    targetProducer = session.createProducer(target);
                    break;
                }
                catch (JMSException e) {
                    ++failures;
                    log.error(logPrefix() + " Cannot create producer on target queuer", e);
                    dropTargetResources();
                    retryWait();
                }
            }
            return targetProducer;
        }

        /**
         * Returns the target session, creating it if needed. The session is transacted
         * when the producer is configured as transacted, auto-acknowledge otherwise.
         *
         * @return the session, or {@code null} if a stop was requested first
         */
        private synchronized Session getTargetSession() {
            if (targetSession != null) {
                return targetSession;
            }
            while (!stopRequired) {
                try {
                    Connection connection = getTargetConnection();
                    if (connection == null) {
                        break;
                    }
                    boolean transacted = bridgeDefinition.isProducerTransacted();
                    int ackMode = bridgeDefinition.isProducerTransacted() ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE;
                    targetSession = connection.createSession(transacted, ackMode);
                    break;
                }
                catch (JMSException e) {
                    ++failures;
                    log.error(logPrefix() + " Cannot create session on target queuer", e);
                    dropTargetResources();
                    retryWait();
                }
            }
            return targetSession;
        }

        /**
         * Returns the source session, creating it if needed. The session is transacted
         * when the consumer is configured as transacted; otherwise the configured
         * consumer acknowledge mode is used.
         *
         * @return the session, or {@code null} if a stop was requested first
         */
        private synchronized Session getSourceSession() {
            if (sourceSession != null) {
                return sourceSession;
            }
            while (!stopRequired) {
                try {
                    Connection connection = getSourceConnection();
                    if (connection == null) {
                        break;
                    }
                    boolean transacted = bridgeDefinition.isConsumerTransacted();
                    int ackMode = bridgeDefinition.isConsumerTransacted() ? Session.SESSION_TRANSACTED : bridgeDefinition.getConsumerAcknowledgeMode();
                    sourceSession = connection.createSession(transacted, ackMode);
                    break;
                }
                catch (JMSException e) {
                    ++failures;
                    log.error(logPrefix() + " Cannot create session on source queuer", e);
                    dropSourceResources();
                    retryWait();
                }
            }
            return sourceSession;
        }

        /**
         * Returns the target connection, opening it with the target peer's credentials
         * if needed; retries until it succeeds or a stop is requested.
         *
         * @return the connection, or {@code null} if a stop was requested first
         */
        private synchronized Connection getTargetConnection() {
            if (targetConnection != null) {
                return targetConnection;
            }
            while (!stopRequired) {
                try {
                    targetConnection = targetConnectionFactory.createConnection(bridgeDefinition.getTarget().getUserName(), bridgeDefinition.getTarget().getPassword());
                    break;
                }
                catch (JMSException e) {
                    ++failures;
                    log.error(logPrefix() + " Cannot create connection to target queuer", e);
                    dropTargetResources();
                    retryWait();
                }
            }
            return targetConnection;
        }

        /**
         * Returns the source connection, opening it with the source peer's credentials
         * if needed; retries until it succeeds or a stop is requested.
         *
         * @return the connection, or {@code null} if a stop was requested first
         */
        private synchronized Connection getSourceConnection() {
            if (sourceConnection != null) {
                return sourceConnection;
            }
            while (!stopRequired) {
                try {
                    sourceConnection = sourceConnectionFactory.createConnection(bridgeDefinition.getSource().getUserName(), bridgeDefinition.getSource().getPassword());
                    break;
                }
                catch (JMSException e) {
                    ++failures;
                    log.error(logPrefix() + " Cannot create connection to source queuer", e);
                    dropSourceResources();
                    retryWait();
                }
            }
            return sourceConnection;
        }

        /**
         * Pauses for the configured retry interval (in seconds) unless a stop was
         * requested. The pause ends early when {@link #pleaseStop()} notifies this monitor.
         */
        private synchronized void retryWait() {
            if (stopRequired) {
                return;
            }
            log.error(logPrefix() + " Waiting " + bridgeDefinition.getRetryInterval() + " second(s) before retrying");
            try {
                wait((long)bridgeDefinition.getRetryInterval() * 1000L);
            }
            catch (InterruptedException e) {
                // Deliberately not re-interrupting: with the flag set, every later wait()
                // would throw immediately and the retry loops would spin without pausing.
                log.error(logPrefix() + " Retry wait was interrupted");
            }
        }

        /**
         * Closes the source consumer, session and connection, in that order. A failure to
         * close one resource is logged and does not prevent closing the others; every
         * reference is cleared so the resources get re-created on next use.
         */
        private synchronized void dropSourceResources() {
            try {
                if (sourceConsumer != null) {
                    sourceConsumer.close();
                }
            }
            catch (Exception e) {
                log.error(logPrefix() + " Could not close source consumer", e);
            }
            finally {
                sourceConsumer = null;
            }

            try {
                if (sourceSession != null) {
                    sourceSession.close();
                }
            }
            catch (Exception e) {
                log.error(logPrefix() + " Could not close source session", e);
            }
            finally {
                sourceSession = null;
            }

            try {
                if (sourceConnection != null) {
                    sourceConnection.close();
                }
            }
            catch (Exception e) {
                log.error(logPrefix() + " Could not close source connection", e);
            }
            finally {
                sourceConnection = null;
            }
        }

        /**
         * Closes the target producer, session and connection, in that order. A failure to
         * close one resource is logged and does not prevent closing the others; every
         * reference is cleared so the resources get re-created on next use.
         */
        private synchronized void dropTargetResources() {
            try {
                if (targetProducer != null) {
                    targetProducer.close();
                }
            }
            catch (Exception e) {
                log.error(logPrefix() + " Could not close target producer", e);
            }
            finally {
                targetProducer = null;
            }

            try {
                if (targetSession != null) {
                    targetSession.close();
                }
            }
            catch (Exception e) {
                log.error(logPrefix() + " Could not close target session", e);
            }
            finally {
                targetSession = null;
            }

            try {
                if (targetConnection != null) {
                    targetConnection.close();
                }
            }
            catch (Exception e) {
                log.error(logPrefix() + " Could not close target connection", e);
            }
            finally {
                targetConnection = null;
            }
        }
    }
}

