package net.timewalker.ffmq4.local.destination;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.jms.JMSException;
import javax.jms.Topic;
import net.timewalker.ffmq4.FFMQException;
import net.timewalker.ffmq4.common.message.AbstractMessage;
import net.timewalker.ffmq4.common.message.MessageSelector;
import net.timewalker.ffmq4.common.message.selector.SelectorIndexKey;
import net.timewalker.ffmq4.local.MessageLockSet;
import net.timewalker.ffmq4.local.destination.subscription.LocalTopicSubscription;
import net.timewalker.ffmq4.local.session.LocalMessageConsumer;
import net.timewalker.ffmq4.local.session.LocalSession;
import net.timewalker.ffmq4.management.destination.definition.TopicDefinition;
import net.timewalker.ffmq4.storage.data.DataStoreFullException;
import net.timewalker.ffmq4.utils.Committable;
import net.timewalker.ffmq4.utils.ErrorTools;
import net.timewalker.ffmq4.utils.concurrent.SynchronizationBarrier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:net/timewalker/ffmq4/local/destination/LocalTopic.class */
/**
 * In-process implementation of a JMS {@link Topic}.
 *
 * <p>The topic itself stores no messages: every message put on it is fanned out
 * to the {@link LocalQueue} owned by each matching {@link LocalTopicSubscription}.
 * Subscriptions are kept in three structures:
 * <ul>
 *   <li>{@code subscriptionMap} - every subscription, keyed by subscriber id;</li>
 *   <li>{@code flatSubscriptions} - subscriptions that could not be indexed and
 *       must be evaluated for every message;</li>
 *   <li>{@code indexedSubscriptionMap} - subscriptions whose selector exposes an
 *       indexable key, keyed by header name and then by literal value.</li>
 * </ul>
 * Structural changes to these collections are made under the write lock of
 * {@code subscriptionsLock}; dispatching and summaries use the read lock.
 */
public final class LocalTopic extends AbstractLocalDestination implements Topic, LocalTopicMBean {
    private static final Log log = LogFactory.getLog(LocalTopic.class);

    /** Definition this topic was created from. */
    private final TopicDefinition topicDef;

    /** All subscriptions, keyed by subscriber id. */
    private final Map<String, LocalTopicSubscription> subscriptionMap =
            new HashMap<String, LocalTopicSubscription>();

    /** Subscriptions without an index key; scanned for every message. */
    private final List<LocalTopicSubscription> flatSubscriptions =
            new ArrayList<LocalTopicSubscription>();

    /** Indexed subscriptions: header name -> literal value -> subscriptions. */
    private final Map<String, Map<Object, List<LocalTopicSubscription>>> indexedSubscriptionMap =
            new HashMap<String, Map<Object, List<LocalTopicSubscription>>>();

    /** Guards the three subscription collections above. */
    private final ReentrantReadWriteLock subscriptionsLock = new ReentrantReadWriteLock();

    /** Number of messages put on this topic since the last stats reset. */
    private final AtomicLong sentToTopicCount = new AtomicLong();

    /** Number of successful per-subscriber deliveries since the last stats reset. */
    private final AtomicLong dispatchedFromTopicCount = new AtomicLong();

    /**
     * Subscriber queues that joined the current transaction.
     * NOTE(review): accessed without subscriptionsLock's write lock; presumably
     * protected by the destination transaction lock verified through
     * checkTransactionLock() - confirm against AbstractLocalDestination.
     */
    private final Set<Committable> committables = new HashSet<Committable>();

    /** True once a message was stored in a transactional subscriber queue and not yet committed. */
    private boolean pendingChanges;

    /**
     * Creates a topic backed by the given definition.
     *
     * @param topicDefinition the topic definition
     */
    public LocalTopic(TopicDefinition topicDefinition) {
        super(topicDefinition);
        this.topicDef = topicDefinition;
    }

    /**
     * @return the definition this topic was created from
     */
    public TopicDefinition getDefinition() {
        return topicDef;
    }

    /**
     * @return the topic name (same value as {@code getName()})
     */
    public String getTopicName() {
        return getName();
    }

    /**
     * Registers a consumer and creates its subscription. If a subscription
     * with the same subscriber id already exists it is dropped from the
     * dispatch structures and replaced by a fresh one.
     *
     * @param localMessageConsumer the consumer to register
     */
    @Override
    public void registerConsumer(LocalMessageConsumer localMessageConsumer) {
        super.registerConsumer(localMessageConsumer);
        subscriptionsLock.writeLock().lock();
        try {
            LocalTopicSubscription previous = subscriptionMap.remove(localMessageConsumer.getSubscriberId());
            if (previous != null) {
                removeSubscription(previous);
            }
            storeNewSubscription(localMessageConsumer);
        } finally {
            subscriptionsLock.writeLock().unlock();
        }
    }

    /**
     * Builds a subscription for the consumer and files it either in the index
     * (when the topic defines partition keys and the selector exposes a
     * matching indexable key) or in the flat list. Callers hold the write lock.
     */
    private void storeNewSubscription(LocalMessageConsumer localMessageConsumer) {
        LocalTopicSubscription subscription = new LocalTopicSubscription(localMessageConsumer);

        String[] partitionKeys = topicDef.getPartitionsKeysToIndex();
        if (partitionKeys != null) {
            MessageSelector selector = subscription.getMessageSelector();
            if (selector != null) {
                SelectorIndexKey bestKey = findBestMatch(partitionKeys, selector.getIndexableKeys());
                if (bestKey != null) {
                    subscription.setIndexKey(bestKey);
                    addToIndexMap(subscription);
                }
            }
        }

        if (!subscription.isIndexed()) {
            flatSubscriptions.add(subscription);
        }
        subscriptionMap.put(localMessageConsumer.getSubscriberId(), subscription);
    }

    /**
     * Adds the subscription to the index under every value of its index key.
     */
    private void addToIndexMap(LocalTopicSubscription localTopicSubscription) {
        SelectorIndexKey indexKey = localTopicSubscription.getIndexKey();
        String headerName = indexKey.getHeaderName();

        Map<Object, List<LocalTopicSubscription>> byValue = indexedSubscriptionMap.get(headerName);
        if (byValue == null) {
            byValue = new HashMap<Object, List<LocalTopicSubscription>>();
            indexedSubscriptionMap.put(headerName, byValue);
        }

        for (Object value : indexKey.getValues()) {
            List<LocalTopicSubscription> bucket = byValue.get(value);
            if (bucket == null) {
                bucket = new ArrayList<LocalTopicSubscription>(4);
                byValue.put(value, bucket);
            }
            bucket.add(localTopicSubscription);
        }
    }

    /**
     * Removes the subscription from the index, pruning buckets and header
     * entries that become empty.
     *
     * @return {@code true} if the subscription has no index key or was found
     *         under every value of its key; {@code false} otherwise
     */
    private boolean removeFromIndexMap(LocalTopicSubscription localTopicSubscription) {
        SelectorIndexKey indexKey = localTopicSubscription.getIndexKey();
        if (indexKey == null) {
            return true;
        }

        String headerName = indexKey.getHeaderName();
        Map<Object, List<LocalTopicSubscription>> byValue = indexedSubscriptionMap.get(headerName);
        if (byValue == null) {
            return false;
        }

        Object[] values = indexKey.getValues();
        int removedCount = 0;
        for (Object value : values) {
            List<LocalTopicSubscription> bucket = byValue.get(value);
            if (bucket == null || !bucket.remove(localTopicSubscription)) {
                continue;
            }
            removedCount++;
            if (bucket.isEmpty()) {
                byValue.remove(value);
                if (byValue.isEmpty()) {
                    indexedSubscriptionMap.remove(headerName);
                }
            }
        }
        return removedCount == values.length;
    }

    /**
     * Returns the first selector key whose header name equals one of the given
     * names. Names are tried in array order, so earlier names take precedence.
     *
     * @return the matching key, or {@code null} if none matches
     */
    private SelectorIndexKey findBestMatch(String[] strArr, List<SelectorIndexKey> list) {
        for (String headerName : strArr) {
            for (SelectorIndexKey candidate : list) {
                if (candidate.getHeaderName().equals(headerName)) {
                    return candidate;
                }
            }
        }
        return null;
    }

    /**
     * Unregisters a consumer. The subscription of a non-durable consumer is
     * removed as well; a durable consumer's subscription is kept.
     *
     * @param localMessageConsumer the consumer to unregister
     */
    @Override
    public void unregisterConsumer(LocalMessageConsumer localMessageConsumer) {
        super.unregisterConsumer(localMessageConsumer);
        if (localMessageConsumer.isDurable()) {
            return;
        }

        if (log.isDebugEnabled()) {
            log.debug("Removing non-durable subscription " + localMessageConsumer.getSubscriberId());
        }
        subscriptionsLock.writeLock().lock();
        try {
            LocalTopicSubscription removed = subscriptionMap.remove(localMessageConsumer.getSubscriberId());
            if (removed != null) {
                removeSubscription(removed);
            }
        } finally {
            subscriptionsLock.writeLock().unlock();
        }
    }

    /**
     * Removes the subscription identified by {@code str + "-" + str2}.
     * Does nothing if no such subscription exists.
     *
     * @param str  first part of the subscriber id (presumably the client id - confirm with callers)
     * @param str2 second part of the subscriber id (the subscription name used in the error message)
     * @throws JMSException if a consumer is still registered for that subscriber id
     */
    public void unsubscribe(String str, String str2) throws JMSException {
        String subscriberId = str + "-" + str2;
        subscriptionsLock.writeLock().lock();
        try {
            LocalTopicSubscription subscription = subscriptionMap.get(subscriberId);
            if (subscription == null) {
                return;
            }
            if (isConsumerRegistered(subscriberId)) {
                throw new FFMQException("Subscription " + str2 + " is still in use", "SUBSCRIPTION_STILL_IN_USE");
            }
            subscriptionMap.remove(subscriberId);
            removeSubscription(subscription);
        } finally {
            // Single release point: unlocking inside the try block as well would
            // make this second unlock throw IllegalMonitorStateException.
            subscriptionsLock.writeLock().unlock();
        }
    }

    /**
     * Removes the subscription from whichever dispatch structure holds it.
     *
     * @throws IllegalStateException if the subscription is not where it is expected
     */
    private void removeSubscription(LocalTopicSubscription localTopicSubscription) {
        if (localTopicSubscription.isIndexed()) {
            if (!removeFromIndexMap(localTopicSubscription)) {
                throw new IllegalStateException("Cannot find subscription in index map : " + localTopicSubscription);
            }
            return;
        }
        if (!flatSubscriptions.remove(localTopicSubscription)) {
            throw new IllegalStateException("Cannot find subscription in non-indexed list : " + localTopicSubscription);
        }
    }

    /**
     * Dispatches a message to every matching subscription.
     *
     * @param abstractMessage the message to dispatch
     * @param localSession    the session that sent the message
     * @param messageLockSet  lock set handed through to the subscriber queues
     * @return {@code true} if at least one subscriber queue was updated
     *         transactionally and therefore a commit is required
     * @throws JMSException if the topic is closed, the delivery mode is not
     *         supported, or a subscriber error is re-thrown by the configured policy
     */
    @Override
    public boolean putLocked(AbstractMessage abstractMessage, LocalSession localSession, MessageLockSet messageLockSet) throws JMSException {
        checkNotClosed();
        checkTransactionLock();
        if (!topicDef.supportDeliveryMode(abstractMessage.getJMSDeliveryMode())) {
            // 1 is the JMS DeliveryMode.NON_PERSISTENT constant
            throw new FFMQException("Topic does not support this delivery mode : " + (abstractMessage.getJMSDeliveryMode() == 1 ? "DeliveryMode.NON_PERSISTENT" : "DeliveryMode.PERSISTENT"), "INVALID_DELIVERY_MODE");
        }
        sentToTopicCount.incrementAndGet();

        boolean commitRequired = false;
        subscriptionsLock.readLock().lock();
        try {
            if (!indexedSubscriptionMap.isEmpty()) {
                // Header values are read below.
                // NOTE(review): level 2 is presumably the level at which headers and
                // properties are available - confirm against the serialization level constants.
                abstractMessage.ensureDeserializationLevel(2);
                for (Map.Entry<String, Map<Object, List<LocalTopicSubscription>>> entry : indexedSubscriptionMap.entrySet()) {
                    String headerName = entry.getKey();
                    Object headerValue = headerName.equals("JMSCorrelationID")
                            ? abstractMessage.getJMSCorrelationID()
                            : abstractMessage.getObjectProperty(headerName);
                    if (headerValue == null) {
                        continue;
                    }
                    List<LocalTopicSubscription> candidates = entry.getValue().get(normalizeLiteralValue(headerValue));
                    if (candidates != null && pushToSubscriptions(abstractMessage, localSession, messageLockSet, candidates)) {
                        commitRequired = true;
                    }
                }
            }
            if (!flatSubscriptions.isEmpty()
                    && pushToSubscriptions(abstractMessage, localSession, messageLockSet, flatSubscriptions)) {
                commitRequired = true;
            }
            return commitRequired;
        } finally {
            subscriptionsLock.readLock().unlock();
        }
    }

    /**
     * Widens small numeric types so that index lookups use a single key type:
     * {@code Byte}/{@code Short}/{@code Integer} become {@code Long} and
     * {@code Float} becomes {@code Double}. Any other value is returned unchanged.
     */
    private Object normalizeLiteralValue(Object obj) {
        if (obj instanceof Byte || obj instanceof Short || obj instanceof Integer) {
            return Long.valueOf(((Number) obj).longValue());
        }
        if (obj instanceof Float) {
            return Double.valueOf(((Number) obj).doubleValue());
        }
        return obj;
    }

    /**
     * Delivers the message to each subscription of the list that accepts it.
     * A subscription is skipped when it is no-local and belongs to the sending
     * connection, or when its selector does not match the message.
     *
     * @return {@code true} if at least one transactional subscriber queue was updated
     * @throws JMSException if a delivery error is re-thrown by the configured policy
     */
    private boolean pushToSubscriptions(AbstractMessage abstractMessage, LocalSession localSession, MessageLockSet messageLockSet, List<LocalTopicSubscription> list) throws JMSException {
        String senderConnectionId = localSession.getConnection().getId();
        boolean commitRequired = false;

        for (int i = 0; i < list.size(); i++) {
            LocalTopicSubscription subscription = list.get(i);

            // No-local filtering
            if (subscription.getNoLocal() && subscription.getConnectionID().equals(senderConnectionId)) {
                continue;
            }

            try {
                // Selector filtering
                MessageSelector selector = subscription.getMessageSelector();
                if (selector != null) {
                    abstractMessage.ensureDeserializationLevel(2);
                    if (!selector.matches(abstractMessage)) {
                        // The message must not reach a subscriber whose selector rejects it
                        continue;
                    }
                }

                LocalQueue subscriberQueue = subscription.getLocalQueue();
                if (subscriberQueue.requiresTransactionalUpdate() && subscription.isDurable()) {
                    // First use of this queue in the current transaction
                    if (committables.add(subscriberQueue)) {
                        subscriberQueue.openTransaction();
                    }
                    // 2 is the JMS DeliveryMode.PERSISTENT constant
                    if (!subscriberQueue.putLocked(abstractMessage, localSession, messageLockSet) && abstractMessage.getJMSDeliveryMode() == 2) {
                        throw new IllegalStateException("Should require a commit");
                    }
                    pendingChanges = true;
                    commitRequired = true;
                } else if (subscriberQueue.putLocked(abstractMessage, localSession, messageLockSet)) {
                    throw new IllegalStateException("Should not require a commit");
                }
                dispatchedFromTopicCount.incrementAndGet();
            } catch (DataStoreFullException e) {
                processPutError(subscription.getSubscriberId(), e, getDefinition().getSubscriberOverflowPolicy());
            } catch (JMSException e) {
                processPutError(subscription.getSubscriberId(), e, getDefinition().getSubscriberFailurePolicy());
            }
        }
        return commitRequired;
    }

    /**
     * Applies a subscriber error policy given as a bit mask: bit 0 logs the
     * error, bit 1 re-throws it. With neither bit set the error is ignored.
     */
    private void processPutError(String str, JMSException jMSException, int i) throws JMSException {
        if ((i & 1) > 0) {
            ErrorTools.log("subscriber=" + str, jMSException, log);
        }
        if ((i & 2) > 0) {
            throw jMSException;
        }
    }

    /**
     * @return the sum of the sizes of all subscriber queues
     */
    @Override
    public int getSize() {
        subscriptionsLock.readLock().lock();
        try {
            int total = 0;
            for (LocalTopicSubscription subscription : subscriptionMap.values()) {
                total += subscription.getLocalQueue().getSize();
            }
            return total;
        } finally {
            subscriptionsLock.readLock().unlock();
        }
    }

    /**
     * Resets the inherited statistics and both topic counters.
     */
    @Override
    public void resetStats() {
        super.resetStats();
        sentToTopicCount.set(0L);
        dispatchedFromTopicCount.set(0L);
    }

    /**
     * @return number of messages put on this topic since the last stats reset
     */
    @Override
    public long getSentToTopicCount() {
        return sentToTopicCount.get();
    }

    /**
     * @return number of per-subscriber deliveries since the last stats reset
     */
    @Override
    public long getDispatchedFromTopicCount() {
        return dispatchedFromTopicCount.get();
    }

    /**
     * @return number of subscriptions currently known to this topic
     *         (read without taking the subscriptions lock)
     */
    @Override
    public int getSubscriptionsCount() {
        return subscriptionMap.size();
    }

    /**
     * @return the number of distinct header names that currently have indexed
     *         subscriptions; note this is the size of the outer index map, not
     *         the number of indexed subscriptions
     *         (read without taking the subscriptions lock)
     */
    @Override
    public int getIndexedSubscriptionsCount() {
        return indexedSubscriptionMap.size();
    }

    @Override
    public String toString() {
        return "Topic{" + getName() + "}[size=" + getSize() + ",consumers=" + this.localConsumers.size() + ",in=" + this.sentToTopicCount + ",out=" + this.dispatchedFromTopicCount + "]";
    }

    /**
     * @return the string form of every subscription, one per line
     */
    public String getConsumersSummary() {
        StringBuilder summary = new StringBuilder();
        subscriptionsLock.readLock().lock();
        try {
            boolean first = true;
            for (LocalTopicSubscription subscription : subscriptionMap.values()) {
                if (!first) {
                    summary.append("\n");
                }
                first = false;
                summary.append(subscription);
            }
            return summary.toString();
        } finally {
            subscriptionsLock.readLock().unlock();
        }
    }

    /**
     * @return {@code true} if the topic definition has a persistent store
     */
    @Override
    protected boolean requiresTransactionalUpdate() {
        return topicDef.hasPersistentStore();
    }

    /**
     * @return {@code true} if a transactional subscriber queue was updated since the last commit
     */
    @Override
    protected boolean hasPendingChanges() {
        return pendingChanges;
    }

    /**
     * Marks the topic as closed. Calling it again has no effect.
     */
    @Override
    public final void close() throws JMSException {
        synchronized (this.closeLock) {
            if (this.closed) {
                return;
            }
            this.closed = true;
        }
    }

    /**
     * Commits every subscriber queue that joined the current transaction and
     * reports the elapsed time. Returns immediately if no queue joined.
     *
     * @param synchronizationBarrier barrier handed to each queue's commit
     * @throws JMSException if the topic is closed or a queue commit fails
     */
    @Override
    public void commitChanges(SynchronizationBarrier synchronizationBarrier) throws JMSException {
        checkNotClosed();
        checkTransactionLock();
        if (committables.isEmpty()) {
            return;
        }

        long startTime = System.currentTimeMillis();
        for (Committable committable : committables) {
            committable.commitChanges(synchronizationBarrier);
        }
        notifyCommitTime(System.currentTimeMillis() - startTime);
        pendingChanges = false;
    }

    /**
     * Closes the transaction on every participating subscriber queue, forgets
     * them, then closes the topic's own transaction.
     */
    @Override
    public void closeTransaction() {
        if (!committables.isEmpty()) {
            for (Committable committable : committables) {
                committable.closeTransaction();
            }
            committables.clear();
        }
        super.closeTransaction();
    }
}
