/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.amqp.rabbit.listener.adapter;

import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.WildcardType;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Address;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.listener.adapter.InvocationResult;
import org.springframework.amqp.rabbit.listener.adapter.MonoHandler;
import org.springframework.amqp.rabbit.listener.adapter.ReplyFailureException;
import org.springframework.amqp.rabbit.listener.adapter.ReplyPostProcessor;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.support.ContainerUtils;
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter;
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
import org.springframework.amqp.rabbit.support.RabbitExceptionTranslator;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.context.expression.MapAccessor;
import org.springframework.expression.BeanResolver;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.ParserContext;
import org.springframework.expression.PropertyAccessor;
import org.springframework.expression.TypeConverter;
import org.springframework.expression.common.TemplateParserContext;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.expression.spel.support.StandardTypeConverter;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

public abstract class AbstractAdaptableMessageListener
implements ChannelAwareMessageListener {
    private static final String DEFAULT_RESPONSE_ROUTING_KEY = "";
    private static final String DEFAULT_ENCODING = "UTF-8";
    private static final SpelExpressionParser PARSER = new SpelExpressionParser();
    private static final ParserContext PARSER_CONTEXT = new TemplateParserContext("!{", "}");
    static final boolean monoPresent = ClassUtils.isPresent((String)"reactor.core.publisher.Mono", (ClassLoader)ChannelAwareMessageListener.class.getClassLoader());
    protected final Log logger = LogFactory.getLog(this.getClass());
    private final StandardEvaluationContext evalContext = new StandardEvaluationContext();
    private String responseRoutingKey = "";
    private String responseExchange = null;
    private Address responseAddress = null;
    private Expression responseExpression;
    private boolean mandatoryPublish;
    private MessageConverter messageConverter = new SimpleMessageConverter();
    private volatile MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();
    private String encoding = "UTF-8";
    private MessagePostProcessor[] beforeSendReplyPostProcessors;
    private RetryTemplate retryTemplate;
    private RecoveryCallback<?> recoveryCallback;
    private boolean isManualAck;
    private boolean defaultRequeueRejected = true;
    private ReplyPostProcessor replyPostProcessor;
    private String replyContentType;
    private boolean converterWinsContentType = true;

    public void setResponseRoutingKey(String responseRoutingKey) {
        this.responseRoutingKey = responseRoutingKey;
    }

    public void setEncoding(String encoding) {
        this.encoding = encoding;
    }

    public String getEncoding() {
        return this.encoding;
    }

    public void setResponseExchange(String responseExchange) {
        this.responseExchange = responseExchange;
    }

    public void setResponseAddress(String defaultReplyTo) {
        if (defaultReplyTo.startsWith(PARSER_CONTEXT.getExpressionPrefix())) {
            this.responseExpression = PARSER.parseExpression(defaultReplyTo, PARSER_CONTEXT);
        } else {
            this.responseAddress = new Address(defaultReplyTo);
        }
    }

    public void setMandatoryPublish(boolean mandatoryPublish) {
        this.mandatoryPublish = mandatoryPublish;
    }

    public void setMessageConverter(MessageConverter messageConverter) {
        this.messageConverter = messageConverter;
    }

    public void setBeforeSendReplyPostProcessors(MessagePostProcessor ... beforeSendReplyPostProcessors) {
        Assert.noNullElements((Object[])beforeSendReplyPostProcessors, (String)"'replyPostProcessors' must not have any null elements");
        this.beforeSendReplyPostProcessors = Arrays.copyOf(beforeSendReplyPostProcessors, beforeSendReplyPostProcessors.length);
    }

    public void setRetryTemplate(RetryTemplate retryTemplate) {
        this.retryTemplate = retryTemplate;
    }

    public void setRecoveryCallback(RecoveryCallback<?> recoveryCallback) {
        this.recoveryCallback = recoveryCallback;
    }

    public void setBeanResolver(BeanResolver beanResolver) {
        this.evalContext.setBeanResolver(beanResolver);
        this.evalContext.setTypeConverter((TypeConverter)new StandardTypeConverter());
        this.evalContext.addPropertyAccessor((PropertyAccessor)new MapAccessor());
    }

    public void setReplyPostProcessor(ReplyPostProcessor replyPostProcessor) {
        this.replyPostProcessor = replyPostProcessor;
    }

    protected String getReplyContentType() {
        return this.replyContentType;
    }

    public void setReplyContentType(String replyContentType) {
        this.replyContentType = replyContentType;
    }

    protected boolean isConverterWinsContentType() {
        return this.converterWinsContentType;
    }

    public void setConverterWinsContentType(boolean converterWinsContentType) {
        this.converterWinsContentType = converterWinsContentType;
    }

    protected MessageConverter getMessageConverter() {
        return this.messageConverter;
    }

    public void setDefaultRequeueRejected(boolean defaultRequeueRejected) {
        this.defaultRequeueRejected = defaultRequeueRejected;
    }

    public void containerAckMode(AcknowledgeMode mode) {
        this.isManualAck = AcknowledgeMode.MANUAL.equals((Object)mode);
    }

    protected void handleListenerException(Throwable ex) {
        this.logger.error((Object)"Listener execution failed", ex);
    }

    protected Object extractMessage(Message message) {
        MessageConverter converter = this.getMessageConverter();
        if (converter != null) {
            return converter.fromMessage(message);
        }
        return message;
    }

    protected void handleResult(InvocationResult resultArg, Message request, Channel channel) {
        this.handleResult(resultArg, request, channel, null);
    }

    protected void handleResult(InvocationResult resultArg, Message request, Channel channel, Object source) {
        if (channel != null) {
            Object object = resultArg.getReturnValue();
            if (object instanceof CompletableFuture) {
                CompletableFuture completable = (CompletableFuture)object;
                if (!this.isManualAck) {
                    this.logger.warn((Object)"Container AcknowledgeMode must be MANUAL for a Future<?> return type; otherwise the container will ack the message immediately");
                }
                completable.whenComplete((r, t) -> {
                    if (t == null) {
                        this.asyncSuccess(resultArg, request, channel, source, r);
                        this.basicAck(request, channel);
                    } else {
                        this.asyncFailure(request, channel, (Throwable)t, source);
                    }
                });
            } else if (monoPresent && MonoHandler.isMono(resultArg.getReturnValue())) {
                if (!this.isManualAck) {
                    this.logger.warn((Object)"Container AcknowledgeMode must be MANUAL for a Mono<?> return type(or Kotlin suspend function); otherwise the container will ack the message immediately");
                }
                MonoHandler.subscribe(resultArg.getReturnValue(), r -> this.asyncSuccess(resultArg, request, channel, source, r), t -> this.asyncFailure(request, channel, (Throwable)t, source), () -> this.basicAck(request, channel));
            } else {
                this.doHandleResult(resultArg, request, channel, source);
            }
        } else if (this.logger.isWarnEnabled()) {
            this.logger.warn((Object)("Listener method returned result [" + resultArg + "]: not generating response message for it because no Rabbit Channel given"));
        }
    }

    private void asyncSuccess(InvocationResult resultArg, Message request, Channel channel, Object source, Object deferredResult) {
        if (deferredResult == null) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug((Object)"Async result is null, ignoring");
            }
        } else {
            Type[] actualTypeArguments;
            Type returnType = resultArg.getReturnType();
            if (returnType != null && !Object.class.getName().equals(returnType.getTypeName()) && (actualTypeArguments = ((ParameterizedType)returnType).getActualTypeArguments()).length > 0 && (returnType = actualTypeArguments[0]) instanceof WildcardType) {
                returnType = null;
            }
            this.doHandleResult(new InvocationResult(deferredResult, resultArg.getSendTo(), returnType, resultArg.getBean(), resultArg.getMethod()), request, channel, source);
        }
    }

    private void basicAck(Message request, Channel channel) {
        try {
            channel.basicAck(request.getMessageProperties().getDeliveryTag(), false);
        }
        catch (IOException e) {
            this.logger.error((Object)"Failed to ack message", (Throwable)e);
        }
    }

    protected void asyncFailure(Message request, Channel channel, Throwable t, Object source) {
        this.logger.error((Object)("Future, Mono, or suspend function was completed with an exception for " + request), t);
        try {
            channel.basicNack(request.getMessageProperties().getDeliveryTag(), false, ContainerUtils.shouldRequeue(this.defaultRequeueRejected, t, this.logger));
        }
        catch (IOException e) {
            this.logger.error((Object)"Failed to nack message", (Throwable)e);
        }
    }

    protected void doHandleResult(InvocationResult resultArg, Message request, Channel channel, Object source) {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug((Object)("Listener method returned result [" + resultArg + "] - generating response message for it"));
        }
        try {
            Message response = this.buildMessage(channel, resultArg.getReturnValue(), resultArg.getReturnType());
            MessageProperties props = response.getMessageProperties();
            props.setTargetBean(resultArg.getBean());
            props.setTargetMethod(resultArg.getMethod());
            this.postProcessResponse(request, response);
            if (this.replyPostProcessor != null) {
                response = (Message)this.replyPostProcessor.apply(request, response);
            }
            Address replyTo = this.getReplyToAddress(request, source, resultArg);
            this.sendResponse(channel, replyTo, response);
        }
        catch (Exception ex) {
            throw new ReplyFailureException("Failed to send reply with payload '" + resultArg + "'", ex);
        }
    }

    protected String getReceivedExchange(Message request) {
        return request.getMessageProperties().getReceivedExchange();
    }

    protected Message buildMessage(Channel channel, Object result, Type genericType) {
        MessageConverter converter = this.getMessageConverter();
        if (converter != null && !(result instanceof Message)) {
            return this.convert(result, genericType, converter);
        }
        if (result instanceof Message) {
            Message msg = (Message)result;
            return msg;
        }
        throw new MessageConversionException("No MessageConverter specified - cannot handle message [" + result + "]");
    }

    protected Message convert(Object result, Type genericType, MessageConverter converter) {
        MessageProperties messageProperties = new MessageProperties();
        if (this.replyContentType != null) {
            messageProperties.setContentType(this.replyContentType);
        }
        Message message = converter.toMessage(result, messageProperties, genericType);
        if (this.replyContentType != null && !this.converterWinsContentType) {
            message.getMessageProperties().setContentType(this.replyContentType);
        }
        return message;
    }

    protected void postProcessResponse(Message request, Message response) {
        String messageId;
        String correlation = request.getMessageProperties().getCorrelationId();
        if (correlation == null && (messageId = request.getMessageProperties().getMessageId()) != null) {
            correlation = messageId;
        }
        response.getMessageProperties().setCorrelationId(correlation);
    }

    protected Address getReplyToAddress(Message request, Object source, InvocationResult result) {
        Address replyTo = request.getMessageProperties().getReplyToAddress();
        if (replyTo == null) {
            if (this.responseAddress == null && this.responseExchange != null) {
                this.responseAddress = new Address(this.responseExchange, this.responseRoutingKey);
            }
            if (result.getSendTo() != null) {
                replyTo = this.evaluateReplyTo(request, source, result.getReturnValue(), result.getSendTo());
            } else if (this.responseExpression != null) {
                replyTo = this.evaluateReplyTo(request, source, result.getReturnValue(), this.responseExpression);
            } else {
                if (this.responseAddress == null) {
                    throw new AmqpException("Cannot determine ReplyTo message property value: Request message does not contain reply-to property, and no default response Exchange was set.");
                }
                replyTo = this.responseAddress;
            }
        }
        return replyTo;
    }

    private Address evaluateReplyTo(Message request, Object source, Object result, Expression expression) {
        Address replyTo;
        Object value = expression.getValue((EvaluationContext)this.evalContext, (Object)new ReplyExpressionRoot(request, source, result));
        Assert.state((value instanceof String || value instanceof Address ? 1 : 0) != 0, (String)"response expression must evaluate to a String or Address");
        if (value instanceof String) {
            String sValue = (String)value;
            replyTo = new Address(sValue);
        } else {
            replyTo = (Address)value;
        }
        return replyTo;
    }

    protected void sendResponse(Channel channel, Address replyTo, Message messageIn) {
        Message message = messageIn;
        if (this.beforeSendReplyPostProcessors != null) {
            for (MessagePostProcessor postProcessor : this.beforeSendReplyPostProcessors) {
                message = postProcessor.postProcessMessage(message);
            }
        }
        this.postProcessChannel(channel, message);
        try {
            this.logger.debug((Object)("Publishing response to exchange = [" + replyTo.getExchangeName() + "], routingKey = [" + replyTo.getRoutingKey() + "]"));
            if (this.retryTemplate == null) {
                this.doPublish(channel, replyTo, message);
            } else {
                Message messageToSend = message;
                this.retryTemplate.execute(ctx -> {
                    this.doPublish(channel, replyTo, messageToSend);
                    return null;
                }, ctx -> {
                    if (this.recoveryCallback != null) {
                        ctx.setAttribute("message", (Object)messageToSend);
                        ctx.setAttribute("address", (Object)replyTo);
                        this.recoveryCallback.recover(ctx);
                        return null;
                    }
                    throw RabbitExceptionTranslator.convertRabbitAccessException(ctx.getLastThrowable());
                });
            }
        }
        catch (Exception ex) {
            throw RabbitExceptionTranslator.convertRabbitAccessException(ex);
        }
    }

    protected void doPublish(Channel channel, Address replyTo, Message message) throws IOException {
        channel.basicPublish(replyTo.getExchangeName(), replyTo.getRoutingKey(), this.mandatoryPublish, this.messagePropertiesConverter.fromMessageProperties(message.getMessageProperties(), this.encoding), message.getBody());
    }

    protected void postProcessChannel(Channel channel, Message response) {
    }

    public static final class ReplyExpressionRoot {
        private final Message request;
        private final Object source;
        private final Object result;

        protected ReplyExpressionRoot(Message request, Object source, Object result) {
            this.request = request;
            this.source = source;
            this.result = result;
        }

        public Message getRequest() {
            return this.request;
        }

        public Object getSource() {
            return this.source;
        }

        public Object getResult() {
            return this.result;
        }
    }
}

