/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.astyanax.recipes.queue;

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import com.netflix.astyanax.recipes.locks.BusyLockException;
import com.netflix.astyanax.recipes.queue.Message;
import com.netflix.astyanax.recipes.queue.MessageConsumer;
import com.netflix.astyanax.recipes.queue.MessageContext;
import com.netflix.astyanax.recipes.queue.MessageHandlerFactory;
import com.netflix.astyanax.recipes.queue.MessageQueue;
import com.netflix.astyanax.recipes.queue.MessageQueueException;
import com.netflix.astyanax.recipes.queue.MessageStatus;
import com.netflix.astyanax.recipes.queue.SimpleMessageHandlerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Dispatches messages read from a {@link MessageQueue} to handler code.
 *
 * <p>Once {@link #start()} is called the dispatcher runs three kinds of worker
 * on a single executor:
 * <ul>
 *   <li><b>consumers</b> read batches of messages from the queue and place them
 *       on a bounded in-memory backlog;</li>
 *   <li><b>processors</b> take messages off the backlog and hand them either to
 *       a handler created from the message's task class, or to the configured
 *       callback;</li>
 *   <li>one <b>ack</b> worker periodically drains the messages that are ready to
 *       be acknowledged and acks them in bulk.</li>
 * </ul>
 *
 * <p>Instances are created through {@link Builder}.
 */
public class MessageQueueDispatcher {
    private static final Logger LOG = LoggerFactory.getLogger(MessageQueueDispatcher.class);
    public static final int DEFAULT_BATCH_SIZE = 5;
    public static final int DEFAULT_POLLING_INTERVAL = 1000;
    public static final int DEFAULT_THREAD_COUNT = 1;
    public static final int DEFAULT_CONSUMER_COUNT = 1;
    public static final int DEFAULT_ACK_SIZE = 100;
    public static final int DEFAULT_ACK_INTERVAL = 100;
    public static final int DEFAULT_BACKLOG_SIZE = 1000;

    /** Separator used between the parts of a worker thread name. */
    private static final String THREAD_NAME_SEPARATOR = ":";

    private int processorThreadCount = DEFAULT_THREAD_COUNT;
    private int batchSize = DEFAULT_BATCH_SIZE;
    private int consumerCount = DEFAULT_CONSUMER_COUNT;
    private int ackSize = DEFAULT_ACK_SIZE;
    /** Pause between two ack passes, in milliseconds. */
    private long ackInterval = DEFAULT_ACK_INTERVAL;
    private int backlogSize = DEFAULT_BACKLOG_SIZE;
    /** Pause after an empty read or a busy lock, in milliseconds. */
    private long pollingInterval = DEFAULT_POLLING_INTERVAL;
    /**
     * Set by {@link #stop()} and polled by every worker loop. Must be volatile:
     * it is written by the caller's thread and read by the worker threads.
     */
    private volatile boolean terminate = false;
    private MessageQueue messageQueue;
    private ExecutorService executor;
    private MessageConsumer ackConsumer;
    private Function<MessageContext, Boolean> callback;
    private MessageHandlerFactory handlerFactory;
    /** Messages waiting to be acknowledged by the ack worker. */
    private LinkedBlockingQueue<MessageContext> toAck = Queues.newLinkedBlockingQueue();
    /** Bounded backlog between consumers and processors; re-created in {@link #initialize()}. */
    private LinkedBlockingQueue<MessageContext> toProcess = Queues.newLinkedBlockingQueue(500);

    private MessageQueueDispatcher() {
    }

    /**
     * Validates the configuration and applies defaults. Called by {@link Builder#build()}.
     *
     * @throws NullPointerException if no message queue was configured
     */
    private void initialize() {
        Preconditions.checkNotNull(this.messageQueue, "Must specify message queue");
        if (this.handlerFactory == null) {
            this.handlerFactory = new SimpleMessageHandlerFactory();
        }
        // Size the backlog according to the configured limit.
        this.toProcess = Queues.newLinkedBlockingQueue(this.backlogSize);
    }

    /**
     * Creates the executor and submits the ack worker, the consumers and the
     * processors. The pool holds one thread per worker.
     */
    public void start() {
        this.executor = Executors.newScheduledThreadPool(this.processorThreadCount + this.consumerCount + 1);
        this.startAckThread();
        for (int i = 0; i < this.consumerCount; ++i) {
            this.startConsumer(i);
        }
        for (int i = 0; i < this.processorThreadCount; ++i) {
            this.startProcessor(i);
        }
    }

    /**
     * Signals all workers to terminate and interrupts them. Safe to call even
     * if {@link #start()} was never invoked.
     */
    public void stop() {
        this.terminate = true;
        if (this.executor != null) {
            this.executor.shutdownNow();
        }
    }

    /**
     * Builds a worker thread name of the form {@code <queue name>:<part>[:<part>...]}.
     *
     * @param parts name components appended after the queue name
     * @return the joined thread name
     */
    private String threadName(String... parts) {
        List<String> elements = Lists.newArrayList(this.messageQueue.getName());
        for (String part : parts) {
            elements.add(part);
        }
        return StringUtils.join(elements, THREAD_NAME_SEPARATOR);
    }

    /**
     * Submits the ack worker. Every {@code ackInterval} milliseconds it drains
     * {@code toAck} and acks the drained messages in one call. Messages are put
     * back on {@code toAck} when the ack fails with a {@link MessageQueueException}.
     */
    private void startAckThread() {
        this.ackConsumer = this.messageQueue.createConsumer();
        this.executor.submit(new Runnable() {

            @Override
            public void run() {
                Thread.currentThread().setName(threadName("Ack"));
                while (!terminate) {
                    try {
                        List<MessageContext> messages = Lists.newArrayList();
                        toAck.drainTo(messages);
                        if (!messages.isEmpty()) {
                            try {
                                ackConsumer.ackMessages(messages);
                            }
                            catch (MessageQueueException e) {
                                // Retry on the next pass.
                                toAck.addAll(messages);
                                LOG.warn("Failed to ack consumer", e);
                            }
                        }
                    }
                    catch (Throwable t) {
                        LOG.info("Error acking messages", t);
                    }
                    try {
                        Thread.sleep(ackInterval);
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        });
    }

    /**
     * Submits one consumer worker. It repeatedly reads up to {@code batchSize}
     * messages and puts them on the backlog, blocking while the backlog is full.
     * It sleeps for {@code pollingInterval} milliseconds after an empty read or
     * a {@link BusyLockException}, and exits when interrupted.
     *
     * @param id index of this consumer, used only in the thread name
     */
    private void startConsumer(final int id) {
        this.executor.submit(new Runnable() {

            @Override
            public void run() {
                Thread.currentThread().setName(threadName("Consumer", Integer.toString(id)));
                MessageConsumer consumer = messageQueue.createConsumer();
                while (!terminate) {
                    try {
                        List<MessageContext> messages = consumer.readMessages(batchSize);
                        if (messages.isEmpty()) {
                            Thread.sleep(pollingInterval);
                            continue;
                        }
                        for (MessageContext context : messages) {
                            toProcess.put(context);
                        }
                    }
                    catch (BusyLockException e) {
                        try {
                            Thread.sleep(pollingInterval);
                        }
                        catch (InterruptedException e1) {
                            // Restore the flag (do not clear it) so the pool sees the interrupt.
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    catch (InterruptedException e) {
                        // Interrupted while sleeping or waiting on a full backlog:
                        // this is the shutdown signal, so stop instead of logging and looping.
                        Thread.currentThread().interrupt();
                        return;
                    }
                    catch (Throwable t) {
                        LOG.warn("Error consuming messages ", t);
                    }
                }
            }
        });
    }

    /**
     * Submits one processor worker. It blocks on the backlog, hands each message
     * to {@link #processMessage(MessageContext)}, and exits when interrupted.
     *
     * @param id index of this processor, used only in the thread name
     */
    private void startProcessor(final int id) {
        this.executor.submit(new Runnable() {

            @Override
            public void run() {
                final String name = threadName("Processor", Integer.toString(id));
                Thread.currentThread().setName(name);
                LOG.info("Starting message processor : " + name);
                try {
                    while (!terminate) {
                        MessageContext context;
                        try {
                            // take() blocks until an element is available and never returns null.
                            context = toProcess.take();
                        }
                        catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                        }
                        processMessage(context);
                    }
                }
                catch (Throwable t) {
                    LOG.error("Error running producer : " + name, t);
                }
            }
        });
    }

    /**
     * Runs the handler for a single message and queues the message for ack when
     * the handler returns {@code true}.
     *
     * <p>If the message names a task class, a handler is created through the
     * {@link MessageHandlerFactory}; in that case the dispatcher does not set a
     * status itself. Otherwise the configured callback is used and the status is
     * set to {@link MessageStatus#DONE} on success. Any failure thrown by the
     * handler is recorded on the context, and the message is still queued for ack.
     *
     * @param context the message context taken from the backlog
     */
    private void processMessage(MessageContext context) {
        Message message = context.getMessage();
        try {
            if (message.getTaskClass() != null) {
                Function<MessageContext, Boolean> task = this.handlerFactory.createInstance(message.getTaskClass());
                if (task.apply(context)) {
                    this.toAck.add(context);
                }
                return;
            }
            if (this.callback.apply(context)) {
                context.setStatus(MessageStatus.DONE);
                this.toAck.add(context);
            }
        }
        catch (Throwable t) {
            context.setException(t);
            this.toAck.add(context);
            LOG.error("Error processing message " + message.getKey(), t);
        }
    }

    /**
     * Fluent builder for {@link MessageQueueDispatcher}. A builder configures a
     * single dispatcher instance, which {@link #build()} validates and returns.
     */
    public static class Builder {
        private final MessageQueueDispatcher dispatcher = new MessageQueueDispatcher();

        /**
         * @param messageQueue queue to read from; required
         * @return this builder
         */
        public Builder withMessageQueue(MessageQueue messageQueue) {
            this.dispatcher.messageQueue = messageQueue;
            return this;
        }

        /**
         * Alias for {@link #withProcessorThreadCount(int)}.
         *
         * @param threadCount number of processor workers
         * @return this builder
         */
        public Builder withThreadCount(int threadCount) {
            return this.withProcessorThreadCount(threadCount);
        }

        /**
         * @param threadCount number of processor workers
         * @return this builder
         */
        public Builder withProcessorThreadCount(int threadCount) {
            this.dispatcher.processorThreadCount = threadCount;
            return this;
        }

        /**
         * @param size capacity of the backlog between consumers and processors
         * @return this builder
         */
        public Builder withBacklogSize(int size) {
            this.dispatcher.backlogSize = size;
            return this;
        }

        /**
         * @param consumerCount number of consumer workers; must not exceed the
         *                      processor thread count when {@link #build()} is called
         * @return this builder
         */
        public Builder withConsumerCount(int consumerCount) {
            this.dispatcher.consumerCount = consumerCount;
            return this;
        }

        /**
         * @param batchSize maximum number of messages requested per read
         * @return this builder
         */
        public Builder withBatchSize(int batchSize) {
            this.dispatcher.batchSize = batchSize;
            return this;
        }

        /**
         * @param interval pause between two ack passes
         * @param units    unit of {@code interval}; stored as milliseconds
         * @return this builder
         */
        public Builder withAckInterval(long interval, TimeUnit units) {
            this.dispatcher.ackInterval = TimeUnit.MILLISECONDS.convert(interval, units);
            return this;
        }

        /**
         * @param interval pause after an empty read or a busy lock
         * @param units    unit of {@code interval}; stored as milliseconds
         * @return this builder
         */
        public Builder withPollingInterval(long interval, TimeUnit units) {
            this.dispatcher.pollingInterval = TimeUnit.MILLISECONDS.convert(interval, units);
            return this;
        }

        /**
         * @param callback handler applied to messages that carry no task class;
         *                 returning {@code true} marks the message done and acks it
         * @return this builder
         */
        public Builder withCallback(Function<MessageContext, Boolean> callback) {
            this.dispatcher.callback = callback;
            return this;
        }

        /**
         * @param factory factory used to create handlers for messages that name a
         *                task class; a {@link SimpleMessageHandlerFactory} is used
         *                when none is set
         * @return this builder
         */
        public Builder withMessageHandlerFactory(MessageHandlerFactory factory) {
            this.dispatcher.handlerFactory = factory;
            return this;
        }

        /**
         * Validates the configuration and returns the dispatcher.
         *
         * @return the configured dispatcher (not yet started)
         * @throws IllegalArgumentException if the consumer count exceeds the processor thread count
         * @throws NullPointerException     if no message queue was configured
         */
        public MessageQueueDispatcher build() {
            Preconditions.checkArgument(this.dispatcher.consumerCount <= this.dispatcher.processorThreadCount, "consumerCounter must be <= threadCount");
            this.dispatcher.initialize();
            return this.dispatcher;
        }
    }
}

