/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.amqp.rabbitmq.client.listener;

import com.rabbitmq.client.amqp.Connection;
import com.rabbitmq.client.amqp.Consumer;
import com.rabbitmq.client.amqp.Resource;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.aopalliance.aop.Advice;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jspecify.annotations.Nullable;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.listener.support.ContainerUtils;
import org.springframework.amqp.rabbitmq.client.AmqpConnectionFactory;
import org.springframework.amqp.rabbitmq.client.RabbitAmqpUtils;
import org.springframework.amqp.rabbitmq.client.listener.RabbitAmqpMessageListener;
import org.springframework.amqp.rabbitmq.client.listener.RabbitAmqpMessageListenerAdapter;
import org.springframework.amqp.support.postprocessor.MessagePostProcessorUtils;
import org.springframework.aop.Advisor;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.aop.support.DefaultPointcutAdvisor;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.core.log.LogAccessor;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.ErrorHandler;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.util.ObjectUtils;

/**
 * A {@link MessageListenerContainer} that consumes from RabbitMQ queues through the
 * AMQP 1.0 client API ({@code com.rabbitmq.client.amqp}).
 * <p>
 * On {@link #start()} the container builds {@code consumersPerQueue} consumers for every
 * configured queue on the connection obtained from the {@link AmqpConnectionFactory}.
 * Each delivery is dispatched to the configured {@link MessageListener} (optionally wrapped
 * in an AOP proxy built from the advice chain). When {@code batchSize > 1}, deliveries are
 * collected per consumer and handed to {@link MessageListener#onMessageBatch(List)} once the
 * batch is full or the batch receive timeout elapses.
 * <p>
 * With {@code autoSettle} enabled (the default) the container accepts a delivery after the
 * listener returns normally; listener failures are routed through the {@link ErrorHandler}
 * and settled according to the outcome (see {@link #handleListenerError}).
 */
public class RabbitAmqpListenerContainer
implements MessageListenerContainer,
BeanNameAware,
DisposableBean {
    private static final LogAccessor LOG = new LogAccessor(LogFactory.getLog(RabbitAmqpListenerContainer.class));

    /** Serializes lifecycle transitions (start / stop / isRunning) on {@link #queueToConsumers}. */
    private final Lock lock = new ReentrantLock();

    private final AmqpConnectionFactory connectionFactory;

    /** Active consumers keyed by queue name; empty means "not running". */
    private final MultiValueMap<String, Consumer> queueToConsumers = new LinkedMultiValueMap<>();

    private String @Nullable [] queues;

    private Advice @Nullable [] adviceChain;

    private int initialCredits = 100;

    private int priority;

    private Resource.StateListener @Nullable [] stateListeners;

    private boolean autoSettle = true;

    private boolean defaultRequeue = true;

    private int consumersPerQueue = 1;

    /** The listener as supplied by the caller (never proxied). */
    private @Nullable MessageListener messageListener;

    /** The listener actually invoked: either {@link #messageListener} or an AOP proxy around it. */
    private @Nullable MessageListener proxy;

    private boolean asyncReplies;

    private ErrorHandler errorHandler = new ConditionalRejectingErrorHandler();

    private @Nullable Collection<MessagePostProcessor> afterReceivePostProcessors;

    private boolean autoStartup = true;

    private String beanName = "not.a.Spring.bean";

    private @Nullable String listenerId;

    private Duration gracefulShutdownPeriod = Duration.ofSeconds(30L);

    private int batchSize;

    private Duration batchReceiveDuration = Duration.ofSeconds(30L);

    private @Nullable TaskScheduler taskScheduler;

    /** {@code true} until an external scheduler is injected via {@link #setTaskScheduler}. */
    private boolean internalTaskScheduler = true;

    /**
     * Create a container that obtains its connection from the given factory.
     * @param connectionFactory the factory used in {@link #start()} to get the connection.
     */
    public RabbitAmqpListenerContainer(AmqpConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    /**
     * Set the queues to consume from. The array is copied defensively.
     * @param queueNames the queue names.
     */
    public void setQueueNames(String ... queueNames) {
        this.queues = Arrays.copyOf(queueNames, queueNames.length);
    }

    /**
     * Set the initial credits passed to every consumer builder. Default is 100.
     * @param initialCredits the initial credits.
     */
    public void setInitialCredits(int initialCredits) {
        this.initialCredits = initialCredits;
    }

    /**
     * Set the priority passed to every consumer builder.
     * @param priority the consumer priority.
     */
    public void setPriority(int priority) {
        this.priority = priority;
    }

    /**
     * Set the state listeners registered on every consumer. The array is copied defensively.
     * @param stateListeners the listeners.
     */
    public void setStateListeners(Resource.StateListener ... stateListeners) {
        this.stateListeners = Arrays.copyOf(stateListeners, stateListeners.length);
    }

    /**
     * Set the post processors handed to a {@link RabbitAmqpMessageListenerAdapter} listener
     * in {@link #afterPropertiesSet()}. They are sorted via {@link MessagePostProcessorUtils#sort}.
     * @param afterReceivePostProcessors the post processors.
     */
    public void setAfterReceivePostProcessors(MessagePostProcessor ... afterReceivePostProcessors) {
        this.afterReceivePostProcessors = MessagePostProcessorUtils.sort(Arrays.asList(afterReceivePostProcessors));
    }

    /**
     * Enable batch delivery: messages are collected until this many have arrived
     * (or the batch receive timeout fires) and then passed to the listener as a list.
     * @param batchSize the batch size; must be greater than 1.
     * @throws IllegalArgumentException if {@code batchSize <= 1}.
     */
    public void setBatchSize(int batchSize) {
        Assert.isTrue(batchSize > 1, "'batchSize' must be greater than 1");
        this.batchSize = batchSize;
    }

    /**
     * Set how long an incomplete batch may wait for more messages before it is released.
     * Default is 30 seconds.
     * @param batchReceiveTimeout the timeout in milliseconds.
     */
    public void setBatchReceiveTimeout(long batchReceiveTimeout) {
        this.batchReceiveDuration = Duration.ofMillis(batchReceiveTimeout);
    }

    /**
     * Provide the scheduler used to release incomplete batches on timeout.
     * An externally supplied scheduler is not shut down by {@link #destroy()}.
     * @param taskScheduler the scheduler.
     */
    public void setTaskScheduler(TaskScheduler taskScheduler) {
        this.taskScheduler = taskScheduler;
        this.internalTaskScheduler = false;
    }

    @Override
    public void setBeanName(String name) {
        this.beanName = name;
    }

    /**
     * Set whether this container should be started automatically. Default is {@code true}.
     * @param autoStart the auto-startup flag.
     */
    public void setAutoStartup(boolean autoStart) {
        this.autoStartup = autoStart;
    }

    @Override
    public boolean isAutoStartup() {
        return this.autoStartup;
    }

    /**
     * Set the advices wrapped around the listener. The proxy is built in
     * {@link #setupMessageListener(MessageListener)}, so this must be called before it
     * to have any effect. The array is copied defensively.
     * @param advices the advice chain; neither the array nor its elements may be null.
     */
    public void setAdviceChain(Advice ... advices) {
        Assert.notNull(advices, "'advices' cannot be null");
        Assert.noNullElements(advices, "'advices' cannot have null elements");
        this.adviceChain = Arrays.copyOf(advices, advices.length);
    }

    /**
     * Set whether the container accepts deliveries itself after a successful listener call.
     * Default is {@code true}; forced to {@code false} in {@link #afterPropertiesSet()} when
     * the listener reports async replies.
     * @param autoSettle {@code false} to leave settlement to the listener.
     */
    public void setAutoSettle(boolean autoSettle) {
        this.autoSettle = autoSettle;
    }

    /**
     * Set the fallback disposition for failed deliveries: requeue ({@code true}, the default)
     * or discard ({@code false}).
     * @param defaultRequeue the requeue flag.
     */
    public void setDefaultRequeue(boolean defaultRequeue) {
        this.defaultRequeue = defaultRequeue;
    }

    /**
     * Set how long {@link #stop(Runnable)} waits for unsettled messages before giving up.
     * Default is 30 seconds.
     * @param gracefulShutdownPeriod the period to wait.
     */
    public void setGracefulShutdownPeriod(Duration gracefulShutdownPeriod) {
        this.gracefulShutdownPeriod = gracefulShutdownPeriod;
    }

    /**
     * Set how many consumers are created for each queue. Default is 1.
     * @param consumersPerQueue the number of consumers per queue.
     */
    public void setConsumersPerQueue(int consumersPerQueue) {
        this.consumersPerQueue = consumersPerQueue;
    }

    /**
     * Set the handler invoked for exceptions thrown by the listener.
     * Default is a {@link ConditionalRejectingErrorHandler}.
     * @param errorHandler the error handler.
     */
    public void setErrorHandler(ErrorHandler errorHandler) {
        this.errorHandler = errorHandler;
    }

    /**
     * Set the listener id.
     * @param id the id.
     */
    public void setListenerId(String id) {
        this.listenerId = id;
    }

    /**
     * Return the listener id, falling back to the bean name when no id was set.
     * @return the id or the bean name.
     */
    public String getListenerId() {
        return this.listenerId != null ? this.listenerId : this.beanName;
    }

    /**
     * Register the listener. A {@link RabbitAmqpMessageListenerAdapter} additionally receives
     * this container's connection factory. If an advice chain is configured, the listener is
     * wrapped in a proxy exposing all interfaces of the listener's class.
     * @param messageListener the listener to invoke for deliveries.
     */
    @Override
    public void setupMessageListener(MessageListener messageListener) {
        this.messageListener = messageListener;
        this.asyncReplies = messageListener.isAsyncReplies();
        if (messageListener instanceof RabbitAmqpMessageListenerAdapter adapter) {
            adapter.setConnectionFactory(this.connectionFactory);
        }

        MessageListener listenerToInvoke = messageListener;
        Advice[] advices = this.adviceChain;
        if (advices != null && advices.length > 0) {
            ProxyFactory factory = new ProxyFactory(messageListener);
            for (Advice advice : advices) {
                factory.addAdvisor(new DefaultPointcutAdvisor(advice));
            }
            factory.setInterfaces(messageListener.getClass().getInterfaces());
            listenerToInvoke = (MessageListener) factory.getProxy(getClass().getClassLoader());
        }
        this.proxy = listenerToInvoke;
    }

    /**
     * Return the listener that is actually invoked (the proxy when an advice chain is in use).
     * @return the listener, or {@code null} if none has been set up yet.
     */
    @Override
    public @Nullable Object getMessageListener() {
        return this.proxy;
    }

    /**
     * Validate the configuration and finish initialization: enforce manual settlement for
     * async replies, tell the listener the effective acknowledge mode, pass the post
     * processors to an adapter listener, and create the internal scheduler for batch mode
     * when no external one was supplied.
     * @throws IllegalStateException if no queues or no listener were configured.
     */
    @Override
    public void afterPropertiesSet() {
        Assert.state(this.queues != null, "At least one queue has to be provided for consuming.");
        MessageListener listener = this.messageListener;
        Assert.state(listener != null, "The 'messageListener' must be provided.");

        if (this.asyncReplies && this.autoSettle) {
            LOG.info("Enforce MANUAL settlement for async replies.");
            this.autoSettle = false;
        }
        listener.containerAckMode(this.autoSettle ? AcknowledgeMode.AUTO : AcknowledgeMode.MANUAL);

        if (listener instanceof RabbitAmqpMessageListenerAdapter adapter
                && this.afterReceivePostProcessors != null) {
            adapter.setAfterReceivePostProcessors(this.afterReceivePostProcessors);
        }

        if (this.batchSize > 1 && this.internalTaskScheduler) {
            ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
            scheduler.setThreadNamePrefix(getListenerId() + "-consumerMonitor-");
            scheduler.afterPropertiesSet();
            this.taskScheduler = scheduler;
        }
    }

    /**
     * The container counts as running while it holds at least one consumer.
     */
    @Override
    public boolean isRunning() {
        this.lock.lock();
        try {
            return !this.queueToConsumers.isEmpty();
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Create the consumers if there are none yet; a no-op while consumers are registered.
     */
    @Override
    public void start() {
        this.lock.lock();
        try {
            if (!this.queueToConsumers.isEmpty()) {
                return;
            }
            Connection connection = this.connectionFactory.getConnection();
            String[] queuesToConsume =
                    Objects.requireNonNull(this.queues, "'queues' must be configured before start()");
            for (String queue : queuesToConsume) {
                for (int i = 0; i < this.consumersPerQueue; i++) {
                    Consumer consumer = connection.consumerBuilder()
                            .queue(queue)
                            .priority(this.priority)
                            .initialCredits(this.initialCredits)
                            .listeners(this.stateListeners)
                            .messageHandler(new ConsumerMessageHandler())
                            .build();
                    this.queueToConsumers.add(queue, consumer);
                }
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Invoke the listener for a single delivery, accept it when auto-settling,
     * and route any exception to {@link #handleListenerError}.
     */
    private void invokeListener(Consumer.Context context, com.rabbitmq.client.amqp.Message amqpMessage) {
        try {
            doInvokeListener(context, amqpMessage);
            if (this.autoSettle) {
                context.accept();
            }
        }
        catch (Exception ex) {
            handleListenerError(ex, context, amqpMessage);
        }
    }

    /**
     * Dispatch to the listener: native AMQP listeners receive the client message as is,
     * others receive a converted Spring AMQP {@link Message}. The delivery context is only
     * exposed to the listener when the container does not settle automatically.
     */
    private void doInvokeListener(Consumer.Context context, com.rabbitmq.client.amqp.Message amqpMessage) {
        Consumer.@Nullable Context contextToUse = this.autoSettle ? null : context;
        MessageListener listener = this.proxy;
        if (listener instanceof RabbitAmqpMessageListener amqpMessageListener) {
            amqpMessageListener.onAmqpMessage(amqpMessage, contextToUse);
        }
        else {
            Message message = RabbitAmqpUtils.fromAmqpMessage(amqpMessage, contextToUse);
            listener.onMessage(message);
        }
    }

    /**
     * Convert the collected client messages and invoke the listener with the whole batch.
     * Settlement and error handling mirror {@link #invokeListener}, applied to the batch context.
     */
    private void invokeBatchListener(Consumer.Context context, List<com.rabbitmq.client.amqp.Message> batch) {
        Consumer.@Nullable Context contextToUse = this.autoSettle ? null : context;
        List<Message> messages = batch.stream()
                .map(amqpMessage -> RabbitAmqpUtils.fromAmqpMessage(amqpMessage, contextToUse))
                .toList();
        try {
            doInvokeBatchListener(messages);
            if (this.autoSettle) {
                context.accept();
            }
        }
        catch (Exception ex) {
            handleListenerError(ex, context, batch);
        }
    }

    private void doInvokeBatchListener(List<Message> messages) {
        this.proxy.onMessageBatch(messages);
    }

    /**
     * Settle a delivery (or batch) whose listener invocation failed.
     * <ul>
     * <li>If the error handler returns normally, the original exception is checked for a
     * special disposition; if there is none, the delivery is accepted.</li>
     * <li>If the error handler throws, the thrown exception is checked for a special
     * disposition; if there is none, the delivery is requeued or discarded according to
     * {@code defaultRequeue} and the failure is logged.</li>
     * </ul>
     */
    private void handleListenerError(Exception ex, Consumer.Context context, Object messageOrBatch) {
        try {
            this.errorHandler.handleError(ex);
            if (!handleSpecialErrors(ex, context)) {
                context.accept();
            }
        }
        catch (Exception rethrow) {
            if (!handleSpecialErrors(rethrow, context)) {
                if (this.defaultRequeue) {
                    context.requeue();
                }
                else {
                    context.discard();
                }
                LOG.error(rethrow, () -> "The 'errorHandler' has thrown an exception. The '" + messageOrBatch + "' is " + (this.defaultRequeue ? "re-queued." : "discarded."));
            }
        }
    }

    /**
     * Apply the disposition implied by the exception, as classified by {@link ContainerUtils}:
     * requeue, discard (AMQP reject) or accept (immediate acknowledge).
     * @return {@code true} if the delivery was settled here, {@code false} otherwise.
     */
    private boolean handleSpecialErrors(Exception ex, Consumer.Context context) {
        if (ContainerUtils.shouldRequeue(this.defaultRequeue, ex, LOG.getLog())) {
            context.requeue();
            return true;
        }
        if (ContainerUtils.isAmqpReject(ex)) {
            context.discard();
            return true;
        }
        if (ContainerUtils.isImmediateAcknowledge(ex)) {
            context.accept();
            return true;
        }
        return false;
    }

    @Override
    public void stop() {
        stop(() -> { });
    }

    /**
     * Stop asynchronously: every consumer is paused, drained and closed on its own async
     * task. When all tasks finish, or the graceful shutdown period elapses, the consumer
     * registry is cleared and the callback is run.
     * <p>
     * NOTE: on timeout the drain tasks are not cancelled here; they keep polling until the
     * consumer reports no unsettled messages.
     * @param callback invoked once the container is considered stopped.
     */
    @Override
    public void stop(Runnable callback) {
        this.lock.lock();
        try {
            CompletableFuture<?>[] drainTasks = this.queueToConsumers.values().stream()
                    .flatMap(Collection::stream)
                    .map(consumer -> CompletableFuture.runAsync(() -> drainAndClose(consumer)))
                    .toArray(CompletableFuture[]::new);

            CompletableFuture.allOf(drainTasks)
                    .orTimeout(this.gracefulShutdownPeriod.toMillis(), TimeUnit.MILLISECONDS)
                    .whenComplete((unused, throwable) -> {
                        clearConsumers();
                        callback.run();
                    });
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Pause the consumer, wait (polling every 100 ms) until it has no unsettled messages,
     * then close it. The consumer is closed even when the wait is interrupted.
     */
    private static void drainAndClose(Consumer consumer) {
        consumer.pause();
        try (consumer) {
            while (consumer.unsettledMessageCount() > 0L) {
                Thread.sleep(100L);
            }
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(ex);
        }
    }

    /**
     * Empty the consumer registry under the lifecycle lock. This runs on whichever thread
     * completes the shutdown future, so the lock is needed for the change to be safely
     * visible to {@link #start()} and {@link #isRunning()}. The lock is reentrant, which
     * covers the case where the future completes synchronously inside {@link #stop(Runnable)}.
     */
    private void clearConsumers() {
        this.lock.lock();
        try {
            this.queueToConsumers.clear();
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Pause all consumers of all queues.
     */
    public void pause() {
        this.queueToConsumers.values().stream()
                .flatMap(Collection::stream)
                .forEach(Consumer::pause);
    }

    /**
     * Resume all consumers of all queues.
     */
    public void resume() {
        this.queueToConsumers.values().stream()
                .flatMap(Collection::stream)
                .forEach(Consumer::unpause);
    }

    /**
     * Pause the consumers of one queue; does nothing for an unknown queue.
     * @param queueName the queue whose consumers are paused.
     */
    public void pause(String queueName) {
        List<Consumer> consumers = this.queueToConsumers.get(queueName);
        if (consumers != null) {
            consumers.forEach(Consumer::pause);
        }
    }

    /**
     * Resume the consumers of one queue; does nothing for an unknown queue.
     * @param queueName the queue whose consumers are resumed.
     */
    public void resume(String queueName) {
        List<Consumer> consumers = this.queueToConsumers.get(queueName);
        if (consumers != null) {
            consumers.forEach(Consumer::unpause);
        }
    }

    /**
     * Shut down the batch-release scheduler, but only if this container created it.
     */
    @Override
    public void destroy() {
        if (this.internalTaskScheduler && this.taskScheduler instanceof ThreadPoolTaskScheduler scheduler) {
            scheduler.shutdown();
        }
    }

    /**
     * Per-consumer message handler: dispatches single deliveries directly, or collects
     * them into a {@link ConsumerBatch} when batch mode is enabled.
     */
    private class ConsumerMessageHandler
    implements Consumer.MessageHandler {
        private volatile @Nullable ConsumerBatch consumerBatch;

        ConsumerMessageHandler() {
        }

        @Override
        public void handle(Consumer.Context context, com.rabbitmq.client.amqp.Message message) {
            final int size = RabbitAmqpListenerContainer.this.batchSize;
            if (size <= 1) {
                RabbitAmqpListenerContainer.this.invokeListener(context, message);
                return;
            }

            ConsumerBatch currentBatch = this.consumerBatch;
            // No batch yet, or the current one was already released by its timer
            // (release resets the future to null): start a fresh batch.
            if (currentBatch == null || currentBatch.batchReleaseFuture == null) {
                currentBatch = new ConsumerBatch(size, context.batch(size));
                this.consumerBatch = currentBatch;
            }
            currentBatch.add(context, message);
            if (currentBatch.batchContext.size() == size) {
                currentBatch.release();
                this.consumerBatch = null;
            }
        }

        /**
         * Messages collected for one batch together with their batch settlement context
         * and the timer that releases the batch if it does not fill up in time.
         */
        private class ConsumerBatch {
            private final List<com.rabbitmq.client.amqp.Message> batch;

            private final Consumer.BatchContext batchContext;

            /** Non-null while the batch is open; reset to {@code null} once it is released. */
            private volatile @Nullable ScheduledFuture<?> batchReleaseFuture;

            ConsumerBatch(int batchSize, Consumer.BatchContext batchContext) {
                this.batchContext = batchContext;
                this.batch = new ArrayList<>(batchSize);
            }

            /**
             * Add a delivery to the batch; the first delivery arms the release timer.
             */
            void add(Consumer.Context context, com.rabbitmq.client.amqp.Message message) {
                this.batchContext.add(context);
                this.batch.add(message);
                if (this.batchReleaseFuture == null) {
                    TaskScheduler scheduler = Objects.requireNonNull(RabbitAmqpListenerContainer.this.taskScheduler);
                    Instant releaseAt = Instant.now().plus(RabbitAmqpListenerContainer.this.batchReceiveDuration);
                    this.batchReleaseFuture = scheduler.schedule(this::releaseInternal, releaseAt);
                }
            }

            /**
             * Release a full batch right away: cancel the pending timer and invoke the listener.
             */
            void release() {
                ScheduledFuture<?> pendingRelease = this.batchReleaseFuture;
                if (pendingRelease != null) {
                    pendingRelease.cancel(true);
                    releaseInternal();
                }
            }

            /**
             * Hand the batch to the listener unless it has been released already.
             */
            private void releaseInternal() {
                if (this.batchReleaseFuture != null) {
                    this.batchReleaseFuture = null;
                    RabbitAmqpListenerContainer.this.invokeBatchListener(this.batchContext, this.batch);
                }
            }
        }
    }
}

