/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.kinesis.spout;

import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import java.math.BigInteger;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import org.apache.storm.kinesis.spout.KinesisConfig;
import org.apache.storm.kinesis.spout.KinesisConnection;
import org.apache.storm.kinesis.spout.KinesisMessageId;
import org.apache.storm.kinesis.spout.ZKConnection;
import org.apache.storm.spout.SpoutOutputCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class KinesisRecordsManager {
    private static final Logger LOG = LoggerFactory.getLogger(KinesisRecordsManager.class);
    // NOTE(review): the class as declared here does not implement Serializable, so the
    // transient modifiers below look like they have no effect - confirm before removing.
    // Used to read previously committed per-shard state and to commit new state.
    private transient ZKConnection zkConnection;
    // Used to list shards, obtain shard iterators and fetch records.
    private transient KinesisConnection kinesisConnection;
    // Configuration supplied by the caller (stream name, retry handler, tuple mapper, limits).
    private final transient KinesisConfig kinesisConfig;
    // shardId -> records fetched but not yet emitted. Its key set doubles as the set of
    // shards assigned to this task (populated in initialize()).
    private transient Map<String, LinkedList<Record>> toEmitPerShard = new HashMap<String, LinkedList<Record>>();
    // message id -> record that was re-fetched for a failed message and is kept until it is acked.
    private transient Map<KinesisMessageId, Record> failedandFetchedRecords = new HashMap<KinesisMessageId, Record>();
    // shardId -> sequence numbers that were emitted and have been neither acked nor failed yet.
    private transient Map<String, TreeSet<BigInteger>> emittedPerShard = new HashMap<String, TreeSet<BigInteger>>();
    // shardId -> acked sequence numbers that commit() has not yet consumed.
    private transient Map<String, TreeSet<BigInteger>> ackedPerShard = new HashMap<String, TreeSet<BigInteger>>();
    // shardId -> failed sequence numbers that are waiting to be re-emitted.
    private transient Map<String, TreeSet<BigInteger>> failedPerShard = new HashMap<String, TreeSet<BigInteger>>();
    // shardId -> shard iterator to use for the next fetch of new records.
    private transient Map<String, String> shardIteratorPerShard = new HashMap<String, String>();
    // shardId -> sequence number of the last record fetched (seeded from committed state in
    // initializeFetchedSequenceNumbers()).
    private transient Map<String, String> fetchedSequenceNumberPerShard = new HashMap<String, String>();
    // message id -> shard iterator used to re-fetch that failed message.
    private transient Map<KinesisMessageId, String> shardIteratorPerFailedMessage = new HashMap<KinesisMessageId, String>();
    // System.currentTimeMillis() value recorded at the end of the last commit() (or in initialize()).
    private transient long lastCommitTime;
    // Set by deactivate() and cleared by activate()/initialize(); while true, ack() and fail()
    // call commit() immediately.
    private transient boolean deactivated;

    /**
     * Builds a manager for the given configuration.
     *
     * <p>The ZooKeeper and Kinesis connection objects are only constructed here; their
     * {@code initialize()} methods are invoked later from {@link #initialize(int, int)}.
     *
     * @param kinesisConfig configuration providing the ZooKeeper and Kinesis connection info
     */
    KinesisRecordsManager(KinesisConfig kinesisConfig) {
        this.kinesisConfig = kinesisConfig;
        // Same construction order as before: ZooKeeper connection first, then Kinesis.
        zkConnection = new ZKConnection(this.kinesisConfig.getZkInfo());
        kinesisConnection = new KinesisConnection(this.kinesisConfig.getKinesisConnectionInfo());
    }

    /**
     * Opens the connections, assigns shards to this task and prepares shard iterators.
     *
     * <p>Shards are distributed round-robin: this task takes every {@code totalTasks}-th shard
     * of the stream, starting at index {@code myTaskIndex}.
     *
     * @param myTaskIndex zero-based index of this task among all tasks of the spout
     * @param totalTasks  total number of tasks of the spout; must be positive
     * @throws IllegalArgumentException if {@code totalTasks} is not positive or
     *                                  {@code myTaskIndex} is negative
     */
    void initialize(int myTaskIndex, int totalTasks) {
        // A step of zero would never terminate the assignment loop below, and a negative
        // step or start index would walk off the front of the shard list.
        if (totalTasks <= 0) {
            throw new IllegalArgumentException("totalTasks must be positive but was " + totalTasks);
        }
        if (myTaskIndex < 0) {
            throw new IllegalArgumentException("myTaskIndex must not be negative but was " + myTaskIndex);
        }
        this.deactivated = false;
        this.lastCommitTime = System.currentTimeMillis();
        this.kinesisConnection.initialize();
        this.zkConnection.initialize();
        List<Shard> shards = this.kinesisConnection.getShardsForStream(this.kinesisConfig.getStreamName());
        LOG.info("myTaskIndex is " + myTaskIndex);
        LOG.info("totalTasks is " + totalTasks);
        for (int i = myTaskIndex; i < shards.size(); i += totalTasks) {
            String shardId = shards.get(i).getShardId();
            LOG.info("Shard id " + shardId + " assigned to task " + myTaskIndex);
            this.toEmitPerShard.put(shardId, new LinkedList<Record>());
        }
        // Seed the per-shard positions from committed state before asking for iterators,
        // because the iterator type depends on whether a position is known.
        this.initializeFetchedSequenceNumbers();
        this.refreshShardIteratorsForNewRecords();
    }

    /**
     * Emits at most one tuple: a failed record that is due for retry if there is one,
     * otherwise the next new record.
     *
     * <p>Before emitting, state is committed if the commit interval has elapsed. New records
     * are not emitted while the number of uncommitted records has reached the configured
     * maximum, or when no shard is assigned to this task.
     *
     * @param collector collector to emit the tuple on
     */
    void next(SpoutOutputCollector collector) {
        if (this.shouldCommit()) {
            this.commit();
        }
        KinesisMessageId retryId = this.kinesisConfig.getFailedMessageRetryHandler().getNextFailedMessageToRetry();
        if (retryId != null) {
            BigInteger retrySequenceNumber = new BigInteger(retryId.getSequenceNumber());
            boolean trackedAsFailed = this.failedPerShard.containsKey(retryId.getShardId())
                    && this.failedPerShard.get(retryId.getShardId()).contains(retrySequenceNumber);
            if (!trackedAsFailed) {
                LOG.warn("failedPerShard does not contain " + retryId + ". This should never happen.");
            } else {
                // Re-fetch the record from Kinesis unless an earlier fetch already cached it.
                if (!this.failedandFetchedRecords.containsKey(retryId)) {
                    this.fetchFailedRecords(retryId);
                }
                if (this.emitFailedRecord(collector, retryId)) {
                    this.failedPerShard.get(retryId.getShardId()).remove(retrySequenceNumber);
                    this.kinesisConfig.getFailedMessageRetryHandler().failedMessageEmitted(retryId);
                    return;
                }
                LOG.warn("failedMessageEmitted not called on retrier for " + retryId + ". This can happen a few times but should not happen infinitely");
            }
        }
        LOG.debug("No failed record to emit for now. Hence will try to emit new records");
        if (this.getUncommittedRecordsCount() >= this.kinesisConfig.getMaxUncommittedRecords()) {
            LOG.warn("maximum uncommitted records count has reached. so not emitting any new records and returning");
            return;
        }
        if (this.toEmitPerShard.isEmpty()) {
            LOG.warn("No shard is assigned to this task. Hence not emitting any tuple.");
            return;
        }
        // Only go back to Kinesis once every per-shard queue has been drained.
        if (this.shouldFetchNewRecords()) {
            this.fetchNewRecords();
        }
        this.emitNewRecord(collector);
    }

    /**
     * Records that a message was fully processed.
     *
     * <p>The sequence number is added to the acked set of its shard and removed from the
     * emitted and failed sets. If the record had been re-fetched for a retry, the retry
     * handler is notified and the cached record is dropped. While the manager is deactivated
     * the new state is committed immediately.
     *
     * @param kinesisMessageId id of the acked message
     */
    void ack(KinesisMessageId kinesisMessageId) {
        String shardId = kinesisMessageId.getShardId();
        BigInteger sequenceNumber = new BigInteger(kinesisMessageId.getSequenceNumber());
        LOG.debug("Ack received for shardId: " + shardId + " sequenceNumber: " + sequenceNumber);

        TreeSet<BigInteger> ackedForShard = this.ackedPerShard.get(shardId);
        if (ackedForShard == null) {
            ackedForShard = new TreeSet<BigInteger>();
            this.ackedPerShard.put(shardId, ackedForShard);
        }
        ackedForShard.add(sequenceNumber);

        TreeSet<BigInteger> emittedForShard = this.emittedPerShard.get(shardId);
        if (emittedForShard != null) {
            emittedForShard.remove(sequenceNumber);
        }

        TreeSet<BigInteger> failedForShard = this.failedPerShard.get(shardId);
        if (failedForShard != null) {
            failedForShard.remove(sequenceNumber);
        }

        if (this.failedandFetchedRecords.containsKey(kinesisMessageId)) {
            // Notify the retry handler before dropping the cached record.
            this.kinesisConfig.getFailedMessageRetryHandler().acked(kinesisMessageId);
            this.failedandFetchedRecords.remove(kinesisMessageId);
        }

        if (this.deactivated) {
            this.commit();
        }
    }

    /**
     * Records that processing of a message failed.
     *
     * <p>The retry handler decides whether the message will be retried. If it will, the
     * sequence number moves from the emitted set to the failed set of its shard; otherwise
     * the message is treated as acked so that it no longer holds back commits. While the
     * manager is deactivated the new state is committed immediately.
     *
     * @param kinesisMessageId id of the failed message
     */
    void fail(KinesisMessageId kinesisMessageId) {
        String shardId = kinesisMessageId.getShardId();
        BigInteger sequenceNumber = new BigInteger(kinesisMessageId.getSequenceNumber());
        LOG.debug("Fail received for shardId: " + shardId + " sequenceNumber: " + sequenceNumber);
        if (this.kinesisConfig.getFailedMessageRetryHandler().failed(kinesisMessageId)) {
            TreeSet<BigInteger> failedForShard = this.failedPerShard.get(shardId);
            if (failedForShard == null) {
                failedForShard = new TreeSet<BigInteger>();
                this.failedPerShard.put(shardId, failedForShard);
            }
            failedForShard.add(sequenceNumber);
            // Nothing may have been emitted for this shard yet; guard the lookup the same
            // way ack() does instead of dereferencing a possibly missing entry.
            TreeSet<BigInteger> emittedForShard = this.emittedPerShard.get(shardId);
            if (emittedForShard != null) {
                emittedForShard.remove(sequenceNumber);
            }
        } else {
            this.ack(kinesisMessageId);
        }
        if (this.deactivated) {
            this.commit();
        }
    }

    /**
     * Commits, for every assigned shard, the highest acked sequence number that is smaller
     * than every sequence number still emitted-but-unresolved or failed for that shard.
     *
     * <p>Acked sequence numbers covered by the commit are removed from the acked set. Shards
     * with nothing committable are skipped. The last-commit timestamp is refreshed at the end
     * regardless of whether anything was written.
     */
    void commit() {
        for (String shardId : this.toEmitPerShard.keySet()) {
            TreeSet<BigInteger> ackedForShard = this.ackedPerShard.get(shardId);
            if (ackedForShard == null) {
                continue;
            }

            // Exclusive upper bound: the smallest sequence number that is still in flight
            // or waiting for a retry. null means there is no bound.
            BigInteger upperBound = null;
            TreeSet<BigInteger> failedForShard = this.failedPerShard.get(shardId);
            if (failedForShard != null && !failedForShard.isEmpty()) {
                upperBound = failedForShard.first();
            }
            TreeSet<BigInteger> emittedForShard = this.emittedPerShard.get(shardId);
            if (emittedForShard != null && !emittedForShard.isEmpty()) {
                BigInteger smallestEmitted = emittedForShard.first();
                if (upperBound == null || upperBound.compareTo(smallestEmitted) > 0) {
                    upperBound = smallestEmitted;
                }
            }

            // Consume acked sequence numbers in ascending order while they stay below the bound.
            BigInteger toCommit = null;
            while (!ackedForShard.isEmpty()
                    && (upperBound == null || ackedForShard.first().compareTo(upperBound) < 0)) {
                toCommit = ackedForShard.pollFirst();
            }

            if (toCommit != null) {
                HashMap<Object, Object> state = new HashMap<Object, Object>();
                state.put("committedSequenceNumber", toCommit.toString());
                LOG.debug("Committing sequence number " + toCommit.toString() + " for shardId " + shardId);
                this.zkConnection.commitState(this.kinesisConfig.getStreamName(), shardId, state);
            }
        }
        this.lastCommitTime = System.currentTimeMillis();
    }

    /**
     * Marks the manager as active again and re-initializes the Kinesis connection.
     */
    void activate() {
        LOG.info("Activate called");
        deactivated = false;
        kinesisConnection.initialize();
    }

    /**
     * Marks the manager as deactivated, commits the current state and shuts down the
     * Kinesis connection.
     *
     * <p>The connection is shut down even if the commit throws; the exception from the
     * commit still propagates to the caller.
     */
    void deactivate() {
        LOG.info("Deactivate called");
        this.deactivated = true;
        try {
            this.commit();
        } finally {
            this.kinesisConnection.shutdown();
        }
    }

    /**
     * Commits the current state and shuts down both connections.
     *
     * <p>Each shutdown is attempted even if an earlier step throws, so a failing commit or a
     * failing Kinesis shutdown does not leave the remaining connection open. The first
     * exception still propagates to the caller.
     */
    void close() {
        try {
            this.commit();
        } finally {
            try {
                this.kinesisConnection.shutdown();
            } finally {
                this.zkConnection.shutdown();
            }
        }
    }

    /**
     * Re-fetches records for a failed message and caches every returned record whose
     * sequence number is currently tracked as failed for the same shard.
     *
     * <p>The shard iterator for the message is created on demand (also when the stored
     * iterator is {@code null}) and advanced after each fetch. If no iterator can be
     * obtained the fetch is skipped. An expired iterator is refreshed so a later call can
     * retry. When Kinesis returns no records or throttles the request, the thread sleeps for
     * one second. If the thread is interrupted while sleeping, its interrupt status is
     * restored so the caller can observe it.
     *
     * @param kinesisMessageId id of the failed message to fetch again
     */
    private void fetchFailedRecords(KinesisMessageId kinesisMessageId) {
        String shardIterator = this.shardIteratorPerFailedMessage.get(kinesisMessageId);
        if (shardIterator == null) {
            // Either never requested, or the previous fetch stored a null next iterator.
            this.refreshShardIteratorForFailedRecord(kinesisMessageId);
            shardIterator = this.shardIteratorPerFailedMessage.get(kinesisMessageId);
        }
        if (shardIterator == null) {
            // NOTE(review): assumes fetchRecords cannot do anything useful with a null
            // iterator - confirm against KinesisConnection.
            LOG.warn("No shardIterator available for failedRecord " + kinesisMessageId + ". Skipping fetch for now");
            return;
        }
        LOG.debug("Fetching failed records for shard id :" + kinesisMessageId.getShardId() + " at sequence number " + kinesisMessageId.getSequenceNumber() + " using shardIterator " + shardIterator);
        try {
            GetRecordsResult getRecordsResult = this.kinesisConnection.fetchRecords(shardIterator);
            if (getRecordsResult == null) {
                return;
            }
            List<Record> records = getRecordsResult.getRecords();
            LOG.debug("Records size from fetchFailedRecords is " + records.size());
            this.shardIteratorPerFailedMessage.put(kinesisMessageId, getRecordsResult.getNextShardIterator());
            if (records.isEmpty()) {
                LOG.warn("No records returned from kinesis. Hence sleeping for 1 second");
                Thread.sleep(1000L);
                return;
            }
            TreeSet<BigInteger> failedForShard = this.failedPerShard.get(kinesisMessageId.getShardId());
            if (failedForShard == null) {
                return;
            }
            for (Record record : records) {
                KinesisMessageId current = new KinesisMessageId(kinesisMessageId.getStreamName(), kinesisMessageId.getShardId(), record.getSequenceNumber());
                if (!failedForShard.contains(new BigInteger(current.getSequenceNumber()))) {
                    continue;
                }
                // The record is cached now, so its dedicated iterator is no longer needed.
                this.failedandFetchedRecords.put(current, record);
                this.shardIteratorPerFailedMessage.remove(current);
            }
        } catch (InterruptedException ie) {
            LOG.warn("Thread interrupted while sleeping", ie);
            Thread.currentThread().interrupt();
        } catch (ExpiredIteratorException ex) {
            LOG.warn("shardIterator for failedRecord " + kinesisMessageId + " has expired. Refreshing shardIterator");
            this.refreshShardIteratorForFailedRecord(kinesisMessageId);
        } catch (ProvisionedThroughputExceededException pe) {
            LOG.warn("ProvisionedThroughputExceededException occured. Check your limits. Sleeping for 1 second.", pe);
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                LOG.warn("Thread interrupted exception", e);
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Fetches the next batch of new records for every assigned shard and appends it to the
     * shard's emit queue.
     *
     * <p>For each shard the stored iterator is used and then replaced by the next iterator
     * from the response; the sequence number of the last fetched record is remembered. A
     * missing iterator is requested on demand, and the shard is skipped if none can be
     * obtained. An expired iterator is refreshed. When Kinesis returns no records or
     * throttles the request, the thread sleeps for one second. If the thread is interrupted
     * while sleeping, its interrupt status is restored and the remaining shards are not
     * polled in this call.
     */
    private void fetchNewRecords() {
        for (Map.Entry<String, LinkedList<Record>> entry : this.toEmitPerShard.entrySet()) {
            String shardId = entry.getKey();
            try {
                String shardIterator = this.shardIteratorPerShard.get(shardId);
                if (shardIterator == null) {
                    // Either none was obtained so far, or the last response carried a null
                    // next iterator.
                    this.refreshShardIteratorForNewRecords(shardId);
                    shardIterator = this.shardIteratorPerShard.get(shardId);
                }
                if (shardIterator == null) {
                    // NOTE(review): assumes fetchRecords cannot do anything useful with a
                    // null iterator - confirm against KinesisConnection.
                    LOG.warn("No shardIterator available for shardId " + shardId + ". Skipping fetch for this shard for now");
                    continue;
                }
                LOG.debug("Fetching new records for shard id :" + shardId + " using shardIterator " + shardIterator + " after sequence number " + this.fetchedSequenceNumberPerShard.get(shardId));
                GetRecordsResult getRecordsResult = this.kinesisConnection.fetchRecords(shardIterator);
                if (getRecordsResult == null) {
                    continue;
                }
                List<Record> records = getRecordsResult.getRecords();
                LOG.debug("Records size from fetchNewRecords is " + records.size());
                this.shardIteratorPerShard.put(shardId, getRecordsResult.getNextShardIterator());
                if (records.isEmpty()) {
                    LOG.warn("No records returned from kinesis. Hence sleeping for 1 second");
                    Thread.sleep(1000L);
                    continue;
                }
                entry.getValue().addAll(records);
                this.fetchedSequenceNumberPerShard.put(shardId, records.get(records.size() - 1).getSequenceNumber());
            } catch (InterruptedException ie) {
                LOG.warn("Thread interrupted while sleeping", ie);
                // Restore the flag and stop: further sleeps in this loop would fail at once.
                Thread.currentThread().interrupt();
                return;
            } catch (ExpiredIteratorException ex) {
                LOG.warn("shardIterator for shardId " + shardId + " has expired. Refreshing shardIterator");
                this.refreshShardIteratorForNewRecords(shardId);
            } catch (ProvisionedThroughputExceededException pe) {
                LOG.warn("ProvisionedThroughputExceededException occured. Check your limits. Sleeping for 1 second.", pe);
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    LOG.warn("Thread interrupted exception", e);
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /**
     * Emits the first queued record that produces a tuple, visiting shards in the iteration
     * order of the emit-queue map.
     *
     * <p>Records are taken off the head of each shard's queue; records for which nothing was
     * emitted are consumed and the search continues. The method returns as soon as one tuple
     * has been emitted, or when all queues are empty.
     *
     * @param collector collector to emit the tuple on
     */
    private void emitNewRecord(SpoutOutputCollector collector) {
        String streamName;
        for (Map.Entry<String, LinkedList<Record>> shardQueue : this.toEmitPerShard.entrySet()) {
            String shardId = shardQueue.getKey();
            LinkedList<Record> pending = shardQueue.getValue();
            for (Record head = pending.pollFirst(); head != null; head = pending.pollFirst()) {
                streamName = this.kinesisConfig.getStreamName();
                KinesisMessageId messageId = new KinesisMessageId(streamName, shardId, head.getSequenceNumber());
                if (this.emitRecord(collector, head, messageId)) {
                    return;
                }
            }
        }
    }

    /**
     * Emits the cached record of a failed message, if it has been re-fetched.
     *
     * @param collector        collector to emit the tuple on
     * @param kinesisMessageId id of the failed message
     * @return {@code true} if a tuple was emitted; {@code false} if the record is not cached
     *         or no tuple was emitted for it
     */
    private boolean emitFailedRecord(SpoutOutputCollector collector, KinesisMessageId kinesisMessageId) {
        return this.failedandFetchedRecords.containsKey(kinesisMessageId)
                && this.emitRecord(collector, this.failedandFetchedRecords.get(kinesisMessageId), kinesisMessageId);
    }

    /**
     * Maps a record to a tuple and emits it anchored to the given message id.
     *
     * <p>If the mapper returns {@code null} or an empty tuple, nothing is emitted and the
     * message is acked right away. Otherwise the sequence number is added to the emitted set
     * of its shard.
     *
     * @param collector        collector to emit the tuple on
     * @param record           record to convert
     * @param kinesisMessageId message id to attach to the emitted tuple
     * @return {@code true} if a tuple was emitted, {@code false} if the record was acked instead
     */
    private boolean emitRecord(SpoutOutputCollector collector, Record record, KinesisMessageId kinesisMessageId) {
        List<Object> tuple = this.kinesisConfig.getRecordToTupleMapper().getTuple(record);
        if (tuple == null || tuple.size() <= 0) {
            LOG.warn("Record " + record + " did not return a tuple to emit. Hence acking it");
            this.ack(kinesisMessageId);
            return false;
        }
        collector.emit(tuple, (Object)kinesisMessageId);
        String shardId = kinesisMessageId.getShardId();
        TreeSet<BigInteger> emittedForShard = this.emittedPerShard.get(shardId);
        if (emittedForShard == null) {
            emittedForShard = new TreeSet<BigInteger>();
            this.emittedPerShard.put(shardId, emittedForShard);
        }
        emittedForShard.add(new BigInteger(record.getSequenceNumber()));
        return true;
    }

    /**
     * Tells whether the configured commit interval has elapsed since the last commit.
     *
     * @return {@code true} if at least the commit interval (in milliseconds) has passed
     */
    private boolean shouldCommit() {
        long elapsedMs = System.currentTimeMillis() - this.lastCommitTime;
        return elapsedMs >= this.kinesisConfig.getZkInfo().getCommitIntervalMs();
    }

    /**
     * Seeds the last-fetched sequence number of every assigned shard from the state
     * previously committed for that shard, when such state exists.
     *
     * <p>Shards without stored state, or whose state has no
     * {@code committedSequenceNumber} entry, are left without a starting position.
     */
    private void initializeFetchedSequenceNumbers() {
        String streamName;
        for (String shardId : this.toEmitPerShard.keySet()) {
            streamName = this.kinesisConfig.getStreamName();
            Map<Object, Object> storedState = this.zkConnection.readState(streamName, shardId);
            if (storedState != null) {
                Object committed = storedState.get("committedSequenceNumber");
                LOG.info("State read is committedSequenceNumber: " + committed + " shardId:" + shardId);
                if (committed != null) {
                    this.fetchedSequenceNumberPerShard.put(shardId, (String)committed);
                }
            }
        }
    }

    /**
     * Requests a fresh new-records shard iterator for every shard assigned to this task.
     */
    private void refreshShardIteratorsForNewRecords() {
        Iterator<String> shardIds = this.toEmitPerShard.keySet().iterator();
        while (shardIds.hasNext()) {
            this.refreshShardIteratorForNewRecords(shardIds.next());
        }
    }

    /**
     * Requests a shard iterator for reading new records from one shard and stores it.
     *
     * <p>If a last-fetched sequence number is known for the shard, the iterator starts right
     * after it; otherwise the iterator type from the configuration is used. A {@code null} or
     * empty iterator is not stored, leaving any previous entry untouched.
     *
     * @param shardId id of the shard to refresh the iterator for
     */
    private void refreshShardIteratorForNewRecords(String shardId) {
        String lastFetchedSequenceNumber = this.fetchedSequenceNumberPerShard.get(shardId);
        ShardIteratorType iteratorType;
        if (lastFetchedSequenceNumber != null) {
            iteratorType = ShardIteratorType.AFTER_SEQUENCE_NUMBER;
        } else {
            iteratorType = this.kinesisConfig.getShardIteratorType();
        }
        String freshIterator = this.kinesisConnection.getShardIterator(this.kinesisConfig.getStreamName(), shardId, iteratorType, lastFetchedSequenceNumber, this.kinesisConfig.getTimestamp());
        if (freshIterator == null || freshIterator.isEmpty()) {
            return;
        }
        LOG.warn("Refreshing shard iterator for new records for shardId " + shardId + " with shardIterator " + freshIterator);
        this.shardIteratorPerShard.put(shardId, freshIterator);
    }

    /**
     * Requests a shard iterator positioned at the sequence number of a failed message and
     * stores it for that message.
     *
     * <p>A {@code null} or empty iterator is not stored, leaving any previous entry untouched.
     *
     * @param kinesisMessageId id of the failed message
     */
    private void refreshShardIteratorForFailedRecord(KinesisMessageId kinesisMessageId) {
        String freshIterator = this.kinesisConnection.getShardIterator(this.kinesisConfig.getStreamName(), kinesisMessageId.getShardId(), ShardIteratorType.AT_SEQUENCE_NUMBER, kinesisMessageId.getSequenceNumber(), null);
        if (freshIterator == null || freshIterator.isEmpty()) {
            return;
        }
        LOG.warn("Refreshing shard iterator for failed records for message " + kinesisMessageId + " with shardIterator " + freshIterator);
        this.shardIteratorPerFailedMessage.put(kinesisMessageId, freshIterator);
    }

    /**
     * Counts the records that have been emitted, acked or failed but are not yet covered by
     * a commit, summed over all shards.
     *
     * <p>The sum is accumulated in a primitive {@code long} so that no {@code Long} is
     * allocated per shard; this method runs on every call to {@code next}.
     *
     * @return total size of the emitted, acked and failed sets of all shards
     */
    private Long getUncommittedRecordsCount() {
        long total = 0L;
        for (TreeSet<BigInteger> emitted : this.emittedPerShard.values()) {
            total += emitted.size();
        }
        for (TreeSet<BigInteger> acked : this.ackedPerShard.values()) {
            total += acked.size();
        }
        for (TreeSet<BigInteger> failed : this.failedPerShard.values()) {
            total += failed.size();
        }
        LOG.debug("Returning uncommittedRecordsCount as " + total);
        return total;
    }

    /**
     * Tells whether new records should be fetched from Kinesis.
     *
     * @return {@code true} only if the emit queue of every assigned shard is empty (which is
     *         also the case when no shard is assigned)
     */
    private boolean shouldFetchNewRecords() {
        for (LinkedList<Record> pending : this.toEmitPerShard.values()) {
            if (!pending.isEmpty()) {
                return false;
            }
        }
        return true;
    }
}

