/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.kinesis.spout;

import java.io.Serializable;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.storm.kinesis.spout.FailedMessageRetryHandler;
import org.apache.storm.kinesis.spout.KinesisMessageId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link FailedMessageRetryHandler} that schedules failed messages for retry with an
 * exponentially growing delay.
 *
 * <p>The first retry of a message is delayed by {@code initialDelayMillis} milliseconds. The
 * n-th retry (n &gt; 1) is delayed by {@code baseSeconds ^ (n - 1)} seconds. Retry times are
 * absolute {@link System#nanoTime()} values and saturate at {@link Long#MAX_VALUE} when the
 * computation would overflow. A message is given up on once it has failed more than
 * {@code maxRetries} times.
 *
 * <p>Scheduled messages are kept in a sorted set ordered by retry time; messages that share the
 * same retry time are ordered by the sequence in which they were scheduled, so no two distinct
 * scheduled messages ever compare as equal.
 *
 * <p>This class performs no synchronization of its own.
 */
public class ExponentialBackoffRetrier implements FailedMessageRetryHandler, Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(ExponentialBackoffRetrier.class);
    /** Number of nanoseconds in one millisecond. */
    private static final long NANOS_PER_MILLI = 1000000L;

    private final Long initialDelayMillis;
    private final Long baseSeconds;
    private final Long maxRetries;
    /** Number of failures seen so far for each message that has not been acked or dropped. */
    private Map<KinesisMessageId, Long> failCounts = new HashMap<KinesisMessageId, Long>();
    /** Absolute {@link System#nanoTime()} value at which each scheduled message becomes due. */
    private Map<KinesisMessageId, Long> retryTimes = new HashMap<KinesisMessageId, Long>();
    /** Scheduling sequence number of each scheduled message; breaks ties between equal retry times. */
    private Map<KinesisMessageId, Long> retrySequences = new HashMap<KinesisMessageId, Long>();
    /** Next value to hand out from {@link #retrySequences}. */
    private long nextRetrySequence = 0L;
    /** Scheduled messages, earliest retry time first. Every element has an entry in both maps above. */
    private SortedSet<KinesisMessageId> retryMessageSet = new TreeSet<KinesisMessageId>(new RetryTimeComparator());

    /**
     * Creates a retrier with an initial delay of 100 ms, a base of 2 seconds and
     * {@link Long#MAX_VALUE} retries.
     */
    public ExponentialBackoffRetrier() {
        this(100L, 2L, Long.MAX_VALUE);
    }

    /**
     * Creates a retrier with the given backoff parameters.
     *
     * @param initialDelayMillis delay in milliseconds before the first retry; must not be negative
     * @param baseSeconds base (in seconds) of the exponential delay used from the second retry on;
     *     must not be negative
     * @param maxRetries maximum number of times a message is scheduled for retry; {@code 0}
     *     disables retrying; must not be negative
     * @throws IllegalArgumentException if any argument is negative
     * @throws NullPointerException if any argument is {@code null}
     */
    public ExponentialBackoffRetrier(Long initialDelayMillis, Long baseSeconds, Long maxRetries) {
        this.initialDelayMillis = initialDelayMillis;
        this.baseSeconds = baseSeconds;
        this.maxRetries = maxRetries;
        this.validate();
    }

    /** Rejects negative configuration values. */
    private void validate() {
        if (this.initialDelayMillis < 0L) {
            throw new IllegalArgumentException("initialDelayMillis cannot be negative.");
        }
        if (this.baseSeconds < 0L) {
            throw new IllegalArgumentException("baseSeconds cannot be negative.");
        }
        if (this.maxRetries < 0L) {
            throw new IllegalArgumentException("maxRetries cannot be negative.");
        }
    }

    /**
     * Records a failure of {@code messageId} and schedules it for retry if it has retries left.
     *
     * @param messageId the message that failed
     * @return {@code true} if the message was scheduled for retry; {@code false} if retries are
     *     disabled or the message has exceeded {@code maxRetries} and was dropped
     */
    @Override
    public boolean failed(KinesisMessageId messageId) {
        LOG.debug("Handling failed message {}", messageId);
        if (this.maxRetries == 0L) {
            LOG.warn("maxRetries set to 0. Hence not queueing {}", messageId);
            return false;
        }
        // If the message is still queued from an earlier failure, take it out of the sorted set
        // BEFORE its retry time changes; mutating the sort key of an element that is inside a
        // TreeSet corrupts the set's ordering.
        this.unschedule(messageId);

        Long previousFailures = this.failCounts.get(messageId);
        long failCount = previousFailures == null ? 1L : previousFailures + 1L;
        if (failCount > this.maxRetries) {
            LOG.warn("maxRetries reached so dropping {}", messageId);
            this.failCounts.remove(messageId);
            return false;
        }
        this.failCounts.put(messageId, failCount);

        Long retryTime = this.getRetryTime(failCount);
        // Both map entries must exist before the set insertion because the comparator reads them.
        this.retryTimes.put(messageId, retryTime);
        this.retrySequences.put(messageId, this.nextRetrySequence++);
        this.retryMessageSet.add(messageId);
        LOG.debug("Scheduled {} for retry at {} and retry attempt {}", messageId, retryTime, failCount);
        return true;
    }

    /**
     * Forgets all retry state kept for {@code messageId}.
     *
     * @param messageId the message that was acked
     */
    @Override
    public void acked(KinesisMessageId messageId) {
        LOG.debug("Ack received for {}. Hence cleaning state.", messageId);
        this.failCounts.remove(messageId);
        this.unschedule(messageId);
    }

    /**
     * Returns the scheduled message with the earliest retry time if that time has been reached.
     * The message stays scheduled until {@link #failedMessageEmitted(KinesisMessageId)} is called.
     *
     * @return the next message that is due for retry, or {@code null} if nothing is scheduled or
     *     the earliest scheduled message is not due yet
     */
    @Override
    public KinesisMessageId getNextFailedMessageToRetry() {
        KinesisMessageId result = null;
        if (!this.retryMessageSet.isEmpty()) {
            KinesisMessageId earliest = this.retryMessageSet.first();
            if (this.retryTimes.get(earliest) <= System.nanoTime()) {
                result = earliest;
            }
        }
        LOG.debug("Returning {} to spout for retrying.", result);
        return result;
    }

    /**
     * Removes {@code messageId} from the retry queue after it has been re-emitted. Its failure
     * count is kept so that a subsequent failure continues the backoff sequence. Does nothing
     * beyond logging if the message is not currently scheduled.
     *
     * @param messageId the message that was re-emitted
     */
    @Override
    public void failedMessageEmitted(KinesisMessageId messageId) {
        LOG.debug("Spout says {} emitted. Hence removing it from queue and wait for its ack or fail", messageId);
        this.unschedule(messageId);
    }

    /**
     * Removes {@code messageId} from the retry queue and its bookkeeping maps, if it is scheduled.
     * The set removal happens first because the comparator needs the map entries to locate it.
     */
    private void unschedule(KinesisMessageId messageId) {
        if (this.retryTimes.containsKey(messageId)) {
            this.retryMessageSet.remove(messageId);
            this.retryTimes.remove(messageId);
            this.retrySequences.remove(messageId);
        }
    }

    /**
     * Computes the absolute {@link System#nanoTime()} value at which the given retry is due.
     *
     * @param retryNum 1-based retry attempt number
     * @return now plus the backoff delay, or {@link Long#MAX_VALUE} if that would overflow
     */
    private Long getRetryTime(Long retryNum) {
        long now = System.nanoTime();
        long delayNanos;
        if (retryNum == 1L) {
            if (this.initialDelayMillis > Long.MAX_VALUE / NANOS_PER_MILLI) {
                // The millisecond-to-nanosecond conversion itself would overflow.
                return Long.MAX_VALUE;
            }
            delayNanos = this.initialDelayMillis * NANOS_PER_MILLI;
        } else {
            double nanos = Math.pow(this.baseSeconds.longValue(), retryNum - 1L) * 1000.0 * (double) NANOS_PER_MILLI;
            if (nanos >= (double) Long.MAX_VALUE) {
                return Long.MAX_VALUE;
            }
            delayNanos = (long) nanos;
        }
        long retryTime = now + delayNanos;
        // delayNanos is never negative, so a sum smaller than now means the addition wrapped around.
        return retryTime < now ? Long.MAX_VALUE : retryTime;
    }

    /**
     * Orders scheduled messages by retry time, then by scheduling sequence. The tie-break keeps
     * distinct messages with identical retry times from being treated as duplicates by the
     * sorted set. Both messages being compared must have entries in the enclosing instance's
     * {@code retryTimes} and {@code retrySequences} maps.
     */
    private class RetryTimeComparator implements Serializable, Comparator<KinesisMessageId> {
        @Override
        public int compare(KinesisMessageId o1, KinesisMessageId o2) {
            Long time1 = ExponentialBackoffRetrier.this.retryTimes.get(o1);
            Long time2 = ExponentialBackoffRetrier.this.retryTimes.get(o2);
            int byTime = time1.compareTo(time2);
            if (byTime != 0) {
                return byTime;
            }
            Long sequence1 = ExponentialBackoffRetrier.this.retrySequences.get(o1);
            Long sequence2 = ExponentialBackoffRetrier.this.retrySequences.get(o2);
            return sequence1.compareTo(sequence2);
        }
    }
}

