/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.processor.aggregate.hazelcast;

import com.hazelcast.config.Config;
import com.hazelcast.config.XmlConfigBuilder;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.cp.lock.FencedLock;
import com.hazelcast.transaction.TransactionContext;
import com.hazelcast.transaction.TransactionOptions;
import com.hazelcast.transaction.TransactionalMap;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.hazelcast.HazelcastAggregationRepository;
import org.apache.camel.spi.OptimisticLockingAggregationRepository;
import org.apache.camel.support.DefaultExchangeHolder;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.StringHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReplicatedHazelcastAggregationRepository
extends HazelcastAggregationRepository {
    private static final Logger LOG = LoggerFactory.getLogger((String)ReplicatedHazelcastAggregationRepository.class.getName());
    protected Map<String, DefaultExchangeHolder> replicatedCache;
    protected Map<String, DefaultExchangeHolder> replicatedPersistedCache;

    public ReplicatedHazelcastAggregationRepository(String repositoryName) {
        super(repositoryName);
    }

    public ReplicatedHazelcastAggregationRepository(String repositoryName, String persistentRepositoryName) {
        super(repositoryName, persistentRepositoryName);
    }

    public ReplicatedHazelcastAggregationRepository(String repositoryName, boolean optimistic) {
        super(repositoryName, optimistic);
    }

    public ReplicatedHazelcastAggregationRepository(String repositoryName, String persistentRepositoryName, boolean optimistic) {
        super(repositoryName, persistentRepositoryName, optimistic);
    }

    public ReplicatedHazelcastAggregationRepository(String repositoryName, HazelcastInstance hzInstanse) {
        super(repositoryName, hzInstanse);
    }

    public ReplicatedHazelcastAggregationRepository(String repositoryName, String persistentRepositoryName, HazelcastInstance hzInstanse) {
        super(repositoryName, persistentRepositoryName, hzInstanse);
    }

    public ReplicatedHazelcastAggregationRepository(String repositoryName, boolean optimistic, HazelcastInstance hzInstance) {
        super(repositoryName, optimistic, hzInstance);
    }

    public ReplicatedHazelcastAggregationRepository(String repositoryName, String persistentRepositoryName, boolean optimistic, HazelcastInstance hzInstance) {
        super(repositoryName, persistentRepositoryName, optimistic, hzInstance);
    }

    @Override
    public Exchange add(CamelContext camelContext, String key, Exchange oldExchange, Exchange newExchange) throws OptimisticLockingAggregationRepository.OptimisticLockingException {
        if (!this.optimistic) {
            throw new UnsupportedOperationException();
        }
        LOG.trace("Adding an Exchange with ID {} for key {} in an optimistic manner.", (Object)newExchange.getExchangeId(), (Object)key);
        if (oldExchange == null) {
            DefaultExchangeHolder holder = DefaultExchangeHolder.marshal((Exchange)newExchange, (boolean)true, (boolean)this.allowSerializedHeaders);
            DefaultExchangeHolder misbehaviorHolder = this.replicatedCache.putIfAbsent(key, holder);
            if (misbehaviorHolder != null) {
                Exchange misbehaviorEx = this.unmarshallExchange(camelContext, misbehaviorHolder);
                LOG.error("Optimistic locking failed for exchange with key {}: IMap#putIfAbsend returned Exchange with ID {}, while it's expected no exchanges to be returned", (Object)key, (Object)(misbehaviorEx != null ? misbehaviorEx.getExchangeId() : "<null>"));
                throw new OptimisticLockingAggregationRepository.OptimisticLockingException();
            }
        } else {
            DefaultExchangeHolder newHolder;
            DefaultExchangeHolder oldHolder = DefaultExchangeHolder.marshal((Exchange)oldExchange, (boolean)true, (boolean)this.allowSerializedHeaders);
            if (!this.replicatedCache.replace(key, oldHolder, newHolder = DefaultExchangeHolder.marshal((Exchange)newExchange, (boolean)true, (boolean)this.allowSerializedHeaders))) {
                LOG.error("Optimistic locking failed for exchange with key {}: IMap#replace returned no Exchanges, while it's expected to replace one", (Object)key);
                throw new OptimisticLockingAggregationRepository.OptimisticLockingException();
            }
        }
        LOG.trace("Added an Exchange with ID {} for key {} in optimistic manner.", (Object)newExchange.getExchangeId(), (Object)key);
        return oldExchange;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Exchange add(CamelContext camelContext, String key, Exchange exchange) {
        if (this.optimistic) {
            throw new UnsupportedOperationException();
        }
        LOG.trace("Adding an Exchange with ID {} for key {} in a thread-safe manner.", (Object)exchange.getExchangeId(), (Object)key);
        FencedLock l = this.hzInstance.getCPSubsystem().getLock(this.mapName);
        try {
            l.lock();
            DefaultExchangeHolder newHolder = DefaultExchangeHolder.marshal((Exchange)exchange, (boolean)true, (boolean)this.allowSerializedHeaders);
            DefaultExchangeHolder oldHolder = this.replicatedCache.put(key, newHolder);
            Exchange exchange2 = this.unmarshallExchange(camelContext, oldHolder);
            return exchange2;
        }
        finally {
            LOG.trace("Added an Exchange with ID {} for key {} in a thread-safe manner.", (Object)exchange.getExchangeId(), (Object)key);
            l.unlock();
        }
    }

    @Override
    public Set<String> scan(CamelContext camelContext) {
        if (this.useRecovery) {
            LOG.trace("Scanning for exchanges to recover in {} context", (Object)camelContext.getName());
            Set<String> scanned = Collections.unmodifiableSet(this.replicatedPersistedCache.keySet());
            LOG.trace("Found {} keys for exchanges to recover in {} context", (Object)scanned.size(), (Object)camelContext.getName());
            return scanned;
        }
        LOG.warn("What for to run recovery scans in {} context while repository {} is running in non-recoverable aggregation repository mode?!", (Object)camelContext.getName(), (Object)this.mapName);
        return Collections.emptySet();
    }

    @Override
    public Exchange recover(CamelContext camelContext, String exchangeId) {
        LOG.trace("Recovering an Exchange with ID {}.", (Object)exchangeId);
        return this.useRecovery ? this.unmarshallExchange(camelContext, this.replicatedPersistedCache.get(exchangeId)) : null;
    }

    @Override
    public Exchange get(CamelContext camelContext, String key) {
        return this.unmarshallExchange(camelContext, this.replicatedCache.get(key));
    }

    @Override
    public boolean containsKey(Object key) {
        if (this.replicatedCache != null) {
            return this.replicatedCache.containsKey(key);
        }
        return false;
    }

    @Override
    public void remove(CamelContext camelContext, String key, Exchange exchange) {
        DefaultExchangeHolder holder = DefaultExchangeHolder.marshal((Exchange)exchange, (boolean)true, (boolean)this.allowSerializedHeaders);
        if (this.optimistic) {
            LOG.trace("Removing an exchange with ID {} for key {} in an optimistic manner.", (Object)exchange.getExchangeId(), (Object)key);
            if (!this.replicatedCache.remove(key, holder)) {
                LOG.error("Optimistic locking failed for exchange with key {}: IMap#remove removed no Exchanges, while it's expected to remove one.", (Object)key);
                throw new OptimisticLockingAggregationRepository.OptimisticLockingException();
            }
            LOG.trace("Removed an exchange with ID {} for key {} in an optimistic manner.", (Object)exchange.getExchangeId(), (Object)key);
            if (this.useRecovery) {
                LOG.trace("Putting an exchange with ID {} for key {} into a recoverable storage in an optimistic manner.", (Object)exchange.getExchangeId(), (Object)key);
                this.replicatedPersistedCache.put(exchange.getExchangeId(), holder);
                LOG.trace("Put an exchange with ID {} for key {} into a recoverable storage in an optimistic manner.", (Object)exchange.getExchangeId(), (Object)key);
            }
        } else if (this.useRecovery) {
            LOG.trace("Removing an exchange with ID {} for key {} in a thread-safe manner.", (Object)exchange.getExchangeId(), (Object)key);
            TransactionOptions tOpts = new TransactionOptions();
            tOpts.setTransactionType(TransactionOptions.TransactionType.ONE_PHASE);
            TransactionContext tCtx = this.hzInstance.newTransactionContext(tOpts);
            try {
                tCtx.beginTransaction();
                TransactionalMap tCache = tCtx.getMap(this.mapName);
                TransactionalMap tPersistentCache = tCtx.getMap(this.persistenceMapName);
                DefaultExchangeHolder removedHolder = (DefaultExchangeHolder)tCache.remove((Object)key);
                LOG.trace("Putting an exchange with ID {} for key {} into a recoverable storage in a thread-safe manner.", (Object)exchange.getExchangeId(), (Object)key);
                tPersistentCache.put((Object)exchange.getExchangeId(), (Object)removedHolder);
                tCtx.commitTransaction();
                LOG.trace("Removed an exchange with ID {} for key {} in a thread-safe manner.", (Object)exchange.getExchangeId(), (Object)key);
                LOG.trace("Put an exchange with ID {} for key {} into a recoverable storage in a thread-safe manner.", (Object)exchange.getExchangeId(), (Object)key);
            }
            catch (Throwable throwable) {
                tCtx.rollbackTransaction();
                String msg = String.format("Transaction with ID %s was rolled back for remove operation with a key %s and an Exchange ID %s.", tCtx.getTxnId(), key, exchange.getExchangeId());
                LOG.warn(msg, throwable);
                throw new RuntimeException(msg, throwable);
            }
        } else {
            this.replicatedCache.remove(key);
        }
    }

    @Override
    public void confirm(CamelContext camelContext, String exchangeId) {
        LOG.trace("Confirming an exchange with ID {}.", (Object)exchangeId);
        if (this.useRecovery) {
            this.replicatedPersistedCache.remove(exchangeId);
        }
    }

    @Override
    public Set<String> getKeys() {
        return Collections.unmodifiableSet(this.replicatedCache.keySet());
    }

    @Override
    protected void doStart() throws Exception {
        if (this.maximumRedeliveries < 0) {
            throw new IllegalArgumentException("Maximum redelivery retries must be zero or a positive integer.");
        }
        if (this.recoveryInterval < 0L) {
            throw new IllegalArgumentException("Recovery interval must be zero or a positive integer.");
        }
        StringHelper.notEmpty((String)this.mapName, (String)"repositoryName");
        if (this.useLocalHzInstance) {
            Config cfg = new XmlConfigBuilder().build();
            cfg.setProperty("hazelcast.version.check.enabled", "false");
            this.hzInstance = Hazelcast.newHazelcastInstance((Config)cfg);
        } else {
            ObjectHelper.notNull((Object)this.hzInstance, (String)"hzInstanse");
        }
        this.replicatedCache = this.hzInstance.getReplicatedMap(this.mapName);
        if (this.useRecovery) {
            this.replicatedPersistedCache = this.hzInstance.getReplicatedMap(this.persistenceMapName);
        }
    }
}

