/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.ra;

import java.lang.reflect.Method;
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.Topic;
import javax.resource.ResourceException;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkException;
import javax.resource.spi.work.WorkManager;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.ra.ActiveMQActivationSpec;
import org.apache.activemq.ra.ActiveMQEndpointActivationKey;
import org.apache.activemq.ra.ActiveMQResourceAdapter;
import org.apache.activemq.ra.ServerSessionPoolImpl;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class ActiveMQEndpointWorker {
    private static final Log log = LogFactory.getLog((Class)ActiveMQEndpointWorker.class);
    public static final Method ON_MESSAGE_METHOD;
    private static final long INITIAL_RECONNECT_DELAY = 1000L;
    private static final long MAX_RECONNECT_DELAY = 30000L;
    private static final ThreadLocal threadLocal;
    protected ActiveMQResourceAdapter adapter;
    protected ActiveMQEndpointActivationKey endpointActivationKey;
    protected MessageEndpointFactory endpointFactory;
    protected WorkManager workManager;
    protected boolean transacted;
    private ConnectionConsumer consumer;
    private ServerSessionPoolImpl serverSessionPool;
    private ActiveMQDestination dest;
    private boolean running;
    private Work connectWork;
    protected ActiveMQConnection connection;
    private long reconnectDelay = 1000L;

    public static void safeClose(Session s) {
        try {
            if (s != null) {
                s.close();
            }
        }
        catch (JMSException jMSException) {
            // empty catch block
        }
    }

    public static void safeClose(Connection c) {
        try {
            if (c != null) {
                c.close();
            }
        }
        catch (JMSException jMSException) {
            // empty catch block
        }
    }

    public static void safeClose(ConnectionConsumer cc) {
        try {
            if (cc != null) {
                cc.close();
            }
        }
        catch (JMSException jMSException) {
            // empty catch block
        }
    }

    public ActiveMQEndpointWorker(final ActiveMQResourceAdapter adapter, ActiveMQEndpointActivationKey key) throws ResourceException {
        this.endpointActivationKey = key;
        this.adapter = adapter;
        this.endpointFactory = this.endpointActivationKey.getMessageEndpointFactory();
        this.workManager = adapter.getBootstrapContext().getWorkManager();
        try {
            this.transacted = this.endpointFactory.isDeliveryTransacted(ON_MESSAGE_METHOD);
        }
        catch (NoSuchMethodException e) {
            throw new ResourceException("Endpoint does not implement the onMessage method.");
        }
        this.connectWork = new Work(){

            public void release() {
            }

            public synchronized void run() {
                if (!ActiveMQEndpointWorker.this.isRunning()) {
                    return;
                }
                if (ActiveMQEndpointWorker.this.connection != null) {
                    return;
                }
                ActiveMQActivationSpec activationSpec = ActiveMQEndpointWorker.this.endpointActivationKey.getActivationSpec();
                try {
                    ActiveMQEndpointWorker.this.connection = adapter.makeConnection(activationSpec);
                    ActiveMQEndpointWorker.this.connection.start();
                    ActiveMQEndpointWorker.this.connection.setExceptionListener(new ExceptionListener(){

                        public void onException(JMSException error) {
                            if (!ActiveMQEndpointWorker.this.serverSessionPool.isClosing()) {
                                ActiveMQEndpointWorker.this.reconnect(error);
                            }
                        }
                    });
                    if (activationSpec.isDurableSubscription()) {
                        ActiveMQEndpointWorker.this.consumer = ActiveMQEndpointWorker.this.connection.createDurableConnectionConsumer((Topic)ActiveMQEndpointWorker.this.dest, activationSpec.getSubscriptionName(), ActiveMQEndpointWorker.this.emptyToNull(activationSpec.getMessageSelector()), (ServerSessionPool)ActiveMQEndpointWorker.this.serverSessionPool, activationSpec.getMaxMessagesPerSessionsIntValue(), activationSpec.getNoLocalBooleanValue());
                    } else {
                        ActiveMQEndpointWorker.this.consumer = ActiveMQEndpointWorker.this.connection.createConnectionConsumer((Destination)ActiveMQEndpointWorker.this.dest, ActiveMQEndpointWorker.this.emptyToNull(activationSpec.getMessageSelector()), (ServerSessionPool)ActiveMQEndpointWorker.this.serverSessionPool, activationSpec.getMaxMessagesPerSessionsIntValue(), activationSpec.getNoLocalBooleanValue());
                    }
                }
                catch (JMSException error) {
                    log.debug((Object)("Fail to to connect: " + (Object)((Object)error)), (Throwable)error);
                    ActiveMQEndpointWorker.this.reconnect(error);
                }
            }
        };
        ActiveMQActivationSpec activationSpec = this.endpointActivationKey.getActivationSpec();
        if ("javax.jms.Queue".equals(activationSpec.getDestinationType())) {
            this.dest = new ActiveMQQueue(activationSpec.getDestination());
        } else if ("javax.jms.Topic".equals(activationSpec.getDestinationType())) {
            this.dest = new ActiveMQTopic(activationSpec.getDestination());
        } else {
            throw new ResourceException("Unknown destination type: " + activationSpec.getDestinationType());
        }
    }

    public synchronized void start() throws WorkException, ResourceException {
        if (this.running) {
            return;
        }
        this.running = true;
        log.debug((Object)"Starting");
        this.serverSessionPool = new ServerSessionPoolImpl(this, this.endpointActivationKey.getActivationSpec().getMaxSessionsIntValue());
        this.connect();
        log.debug((Object)"Started");
    }

    public synchronized void stop() throws InterruptedException {
        if (!this.running) {
            return;
        }
        this.running = false;
        this.serverSessionPool.close();
        this.disconnect();
    }

    private boolean isRunning() {
        return this.running;
    }

    private synchronized void connect() {
        if (!this.running) {
            return;
        }
        try {
            this.workManager.scheduleWork(this.connectWork, Long.MAX_VALUE, null, null);
        }
        catch (WorkException e) {
            this.running = false;
            log.error((Object)"Work Manager did not accept work: ", (Throwable)e);
        }
    }

    private synchronized void disconnect() {
        ActiveMQEndpointWorker.safeClose(this.consumer);
        this.consumer = null;
        ActiveMQEndpointWorker.safeClose((Connection)this.connection);
        this.connection = null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void reconnect(JMSException error) {
        long reconnectDelay;
        log.debug((Object)"Reconnect cause: ", (Throwable)error);
        ActiveMQEndpointWorker activeMQEndpointWorker = this;
        synchronized (activeMQEndpointWorker) {
            reconnectDelay = this.reconnectDelay;
            if (reconnectDelay == 30000L) {
                log.error((Object)("Endpoint connection to JMS broker failed: " + error.getMessage()));
                log.error((Object)"Endpoint will try to reconnect to the JMS broker in 30 seconds");
            }
        }
        try {
            this.disconnect();
            Thread.sleep(reconnectDelay);
            activeMQEndpointWorker = this;
            synchronized (activeMQEndpointWorker) {
                this.reconnectDelay *= 2L;
                if (this.reconnectDelay > 30000L) {
                    this.reconnectDelay = 30000L;
                }
            }
            this.connect();
        }
        catch (InterruptedException e) {
            // empty catch block
        }
    }

    protected void registerThreadSession(Session session) {
        threadLocal.set(session);
    }

    protected void unregisterThreadSession(Session session) {
        threadLocal.set(null);
    }

    private String emptyToNull(String value) {
        if (value == null || value.length() == 0) {
            return null;
        }
        return value;
    }

    static {
        threadLocal = new ThreadLocal();
        try {
            ON_MESSAGE_METHOD = MessageListener.class.getMethod("onMessage", Message.class);
        }
        catch (Exception e) {
            throw new ExceptionInInitializerError(e);
        }
    }
}

