/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.amqp.rabbit.listener.adapter;

import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.jspecify.annotations.Nullable;
import org.springframework.amqp.rabbit.batch.BatchingStrategy;
import org.springframework.amqp.rabbit.batch.SimpleBatchingStrategy;
import org.springframework.amqp.rabbit.listener.adapter.InvocationResult;
import org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter;
import org.springframework.amqp.rabbit.listener.adapter.MonoHandler;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareBatchMessageListener;
import org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler;
import org.springframework.amqp.rabbit.listener.support.ContainerUtils;
import org.springframework.amqp.rabbit.support.RabbitExceptionTranslator;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;

/**
 * Listener adapter for batch delivery: it receives a whole list of AMQP messages, wraps them
 * into a single {@link Message} whose payload is a list, and hands that message to the handler
 * through the inherited {@code invokeHandler} method.
 *
 * <p>The element type of the list payload is selected by two flags read from the inherited
 * {@code messagingMessageConverter}:
 * <ul>
 * <li>{@code isAmqpMessageList()} - the raw AMQP messages are passed through unchanged;</li>
 * <li>{@code isMessageList()} - each AMQP message is converted and the converted
 * {@link Message} objects are passed;</li>
 * <li>otherwise - only the payloads of the converted messages are passed.</li>
 * </ul>
 *
 * <p>A non-null handler return value is only accepted when it is a {@link CompletableFuture}
 * or is recognised by {@code MonoHandler.isMono}; anything else is rejected with an
 * {@link IllegalStateException} because replies are not supported in batch mode.
 */
public class BatchMessagingMessageListenerAdapter
extends MessagingMessageListenerAdapter
implements ChannelAwareBatchMessageListener {
    /** Strategy consulted by {@link #toMessagingMessage} to detect and split producer-side batches. */
    private final BatchingStrategy batchingStrategy;

    /**
     * Creates the adapter.
     *
     * @param bean forwarded unchanged to the superclass constructor
     * @param method forwarded unchanged to the superclass constructor
     * @param returnExceptions forwarded unchanged to the superclass constructor
     * @param errorHandler forwarded unchanged to the superclass constructor
     * @param batchingStrategy strategy used for debatching; when {@code null} a
     *        {@code SimpleBatchingStrategy(0, 0, 0L)} is used instead (this class only ever
     *        calls {@code canDebatch} and {@code deBatch} on it)
     */
    public BatchMessagingMessageListenerAdapter(@Nullable Object bean, @Nullable Method method, boolean returnExceptions, @Nullable RabbitListenerErrorHandler errorHandler, @Nullable BatchingStrategy batchingStrategy) {
        // NOTE(review): the trailing 'true' is presumably the superclass "batch" flag - confirm against the parent constructor.
        super(bean, method, returnExceptions, errorHandler, true);
        this.batchingStrategy = batchingStrategy == null ? new SimpleBatchingStrategy(0, 0, 0L) : batchingStrategy;
    }

    /**
     * Converts the delivered batch into one messaging message and invokes the handler with it.
     *
     * <p>Conversion happens before the handler call and outside the exception translation, so a
     * {@link MessageConversionException} rethrown during conversion propagates as is. Any
     * exception thrown while invoking the handler or processing its result is passed through
     * {@code RabbitExceptionTranslator.convertRabbitAccessException}.
     *
     * @param messages the AMQP messages that make up the batch
     * @param channel the channel the batch was received on; may be {@code null}
     */
    @Override
    public void onMessageBatch(List<org.springframework.amqp.core.Message> messages, @Nullable Channel channel) {
        Message<?> converted = convertBatch(messages, channel);
        try {
            invokeHandlerAndProcessResult(messages, channel, converted);
        } catch (Exception e) {
            throw RabbitExceptionTranslator.convertRabbitAccessException(e);
        }
    }

    /**
     * Builds the single list-payload message that is passed to the handler.
     *
     * <p>A message that fails conversion is logged and rejected on the channel without requeue,
     * then left out of the converted list while the remaining messages are still processed. If
     * the reject itself fails (including the case of a {@code null} channel), the original
     * conversion exception is rethrown.
     */
    private Message<?> convertBatch(List<org.springframework.amqp.core.Message> messages, @Nullable Channel channel) {
        if (this.messagingMessageConverter.isAmqpMessageList()) {
            // Handler wants the raw AMQP messages: no per-message conversion at all.
            return new GenericMessage<>(messages);
        }
        List<Message<?>> messagingMessages = new ArrayList<>(messages.size());
        for (org.springframework.amqp.core.Message message : messages) {
            try {
                messagingMessages.add(toMessagingMessage(message));
            } catch (MessageConversionException conversionFailure) {
                this.logger.error("Could not convert incoming message", conversionFailure);
                try {
                    Assert.notNull(channel, "'channel' cannot be null");
                    channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
                } catch (Exception ex) {
                    this.logger.error("Failed to reject message with conversion error", ex);
                    // Surface the root problem (the conversion failure), not the reject failure.
                    throw conversionFailure;
                }
            }
        }
        if (this.messagingMessageConverter.isMessageList()) {
            return new GenericMessage<>(messagingMessages);
        }
        List<Object> payloads = new ArrayList<>(messagingMessages.size());
        for (Message<?> messagingMessage : messagingMessages) {
            payloads.add(messagingMessage.getPayload());
        }
        return new GenericMessage<>(payloads);
    }

    /**
     * Invokes the handler with the converted batch and, when it returns a non-null value,
     * delegates to {@link #handleResult}.
     */
    private void invokeHandlerAndProcessResult(List<org.springframework.amqp.core.Message> amqpMessages, @Nullable Channel channel, Message<?> message) {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Processing [" + message + "]");
        }
        InvocationResult result = invokeHandler(channel, message, true, amqpMessages.toArray(new org.springframework.amqp.core.Message[0]));
        if (result.getReturnValue() != null) {
            handleResult(result, amqpMessages, channel);
        } else {
            this.logger.trace("No result object given - no result to handle");
        }
    }

    /**
     * Deals with a non-null handler return value.
     *
     * <p>With a channel available, a {@link CompletableFuture} or a value accepted by
     * {@code MonoHandler.isMono} is treated as an asynchronous completion signal: every message
     * of the batch is acked on success and nacked through {@link #asyncFailure} on failure. A
     * warning is logged when {@code isManualAck()} is false. Without a channel the result is
     * only reported in a warning.
     *
     * @throws IllegalStateException if a channel is present and the return value is neither of
     *         the two supported asynchronous types
     */
    private void handleResult(InvocationResult resultArg, List<org.springframework.amqp.core.Message> amqpMessages, @Nullable Channel channel) {
        if (channel == null) {
            if (this.logger.isWarnEnabled()) {
                this.logger.warn("Listener method returned result [" + resultArg + "]: not generating response message for it because no Rabbit Channel given");
            }
            return;
        }
        Object returnValue = resultArg.getReturnValue();
        if (returnValue instanceof CompletableFuture) {
            CompletableFuture<?> completable = (CompletableFuture<?>) returnValue;
            if (!isManualAck()) {
                this.logger.warn("Container AcknowledgeMode must be MANUAL for a Future<?> return type; otherwise the container will ack the message immediately");
            }
            completable.whenComplete((r, t) -> {
                if (t == null) {
                    ackAll(amqpMessages, channel);
                } else {
                    asyncFailure(amqpMessages, channel, t);
                }
            });
        } else if (monoPresent && MonoHandler.isMono(returnValue)) {
            // 'monoPresent' is declared outside this class; MonoHandler is only consulted when it is true.
            if (!isManualAck()) {
                this.logger.warn("Container AcknowledgeMode must be MANUAL for a Mono<?> return type(or Kotlin suspend function); otherwise the container will ack the message immediately");
            }
            MonoHandler.subscribe(returnValue, null, t -> asyncFailure(amqpMessages, channel, t), () -> ackAll(amqpMessages, channel));
        } else {
            throw new IllegalStateException("The listener in batch mode does not support replies.");
        }
    }

    /** Acknowledges every message of the batch individually through the inherited {@code basicAck}. */
    private void ackAll(List<org.springframework.amqp.core.Message> amqpMessages, Channel channel) {
        amqpMessages.forEach(request -> basicAck(request, channel));
    }

    /**
     * Logs an asynchronous failure and nacks each message of the batch individually.
     *
     * <p>The requeue flag is computed per message by {@code ContainerUtils.shouldRequeue}. An
     * {@link IOException} from one nack is logged and the loop carries on with the next message.
     */
    private void asyncFailure(List<org.springframework.amqp.core.Message> requests, Channel channel, Throwable t) {
        this.logger.error("Future, Mono, or suspend function was completed with an exception for " + requests, t);
        for (org.springframework.amqp.core.Message request : requests) {
            try {
                channel.basicNack(request.getMessageProperties().getDeliveryTag(), false, ContainerUtils.shouldRequeue(isDefaultRequeueRejected(), t, this.logger));
            } catch (IOException e) {
                this.logger.error("Failed to nack message", e);
            }
        }
    }

    /**
     * Converts one AMQP message, splitting it first when the batching strategy reports that it
     * can be debatched.
     *
     * <p>For a debatchable message the result carries a list payload: converted messages when
     * {@code isMessageList()} is true, otherwise the extracted payloads together with headers
     * mapped from the enclosing message's properties. A message that cannot be debatched is
     * converted by the superclass implementation.
     *
     * @param amqpMessage the incoming AMQP message
     * @return the messaging representation of {@code amqpMessage}
     */
    @Override
    protected Message<?> toMessagingMessage(org.springframework.amqp.core.Message amqpMessage) {
        if (!this.batchingStrategy.canDebatch(amqpMessage.getMessageProperties())) {
            return super.toMessagingMessage(amqpMessage);
        }
        if (this.messagingMessageConverter.isMessageList()) {
            List<Message<?>> messages = new ArrayList<>();
            this.batchingStrategy.deBatch(amqpMessage, fragment -> messages.add(super.toMessagingMessage(fragment)));
            return new GenericMessage<>(messages);
        }
        List<Object> list = new ArrayList<>();
        this.batchingStrategy.deBatch(amqpMessage, fragment -> list.add(this.messagingMessageConverter.extractPayload(fragment)));
        return MessageBuilder.withPayload(list)
                .copyHeaders(this.messagingMessageConverter.getHeaderMapper().toHeaders(amqpMessage.getMessageProperties()))
                .build();
    }
}

