/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.requestreply;

import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.SmartLifecycle;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.BatchMessageListener;
import org.springframework.kafka.listener.GenericMessageListenerContainer;
import org.springframework.kafka.requestreply.CorrelationKey;
import org.springframework.kafka.requestreply.ReplyingKafkaOperations;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.kafka.support.SendResult;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;

/**
 * A {@link KafkaTemplate} that adds request/reply semantics on top of plain sends.
 *
 * <p>Each request sent through {@link #sendAndReceive(ProducerRecord)} is tagged with a
 * correlation id carried in the {@code kafka_correlationId} record header, and a pending
 * {@link RequestReplyFuture} is registered under that id. This template registers itself
 * as the batch listener of the supplied reply container; when a reply carrying a known
 * correlation id arrives, the matching future is completed with the reply record. If no
 * reply arrives within the configured reply timeout, the future is completed exceptionally
 * with a {@link KafkaException}.
 *
 * @param <K> the key type of requests and replies
 * @param <V> the value type of outbound requests
 * @param <R> the value type of inbound replies
 */
public class ReplyingKafkaTemplate<K, V, R> extends KafkaTemplate<K, V>
        implements BatchMessageListener<K, R>, InitializingBean, SmartLifecycle, DisposableBean,
        ReplyingKafkaOperations<K, V, R> {

    /** Reply timeout, in milliseconds, used when {@link #setReplyTimeout(long)} is never called. */
    private static final long DEFAULT_REPLY_TIMEOUT = 5000L;

    /**
     * Name of the record header that carries the correlation id.
     * NOTE(review): presumably mirrors {@code KafkaHeaders.CORRELATION_ID} - confirm.
     */
    private static final String CORRELATION_ID_HEADER = "kafka_correlationId";

    /** Container that consumes replies; this template is installed as its listener. */
    private final GenericMessageListenerContainer<K, R> replyContainer;

    /** Requests still awaiting a reply (or a timeout), keyed by correlation id. */
    private final ConcurrentMap<CorrelationKey, RequestReplyFuture<K, V, R>> futures = new ConcurrentHashMap<>();

    /** Scheduler used to run reply timeouts; a private default unless one is injected. */
    private TaskScheduler scheduler = new ThreadPoolTaskScheduler();

    private int phase;

    private boolean autoStartup = true;

    private long replyTimeout = DEFAULT_REPLY_TIMEOUT;

    /** {@code true} once a caller-supplied scheduler replaced the default one. */
    private volatile boolean schedulerSet;

    /** {@code true} while the default scheduler is initialized and has not been destroyed. */
    private volatile boolean schedulerInitialized;

    private volatile boolean running;

    /**
     * Create a template that does not flush after each send.
     *
     * @param producerFactory the producer factory passed to {@link KafkaTemplate}
     * @param replyContainer the container that receives replies; must not be {@code null}
     */
    public ReplyingKafkaTemplate(ProducerFactory<K, V> producerFactory,
            GenericMessageListenerContainer<K, R> replyContainer) {
        this(producerFactory, replyContainer, false);
    }

    /**
     * Create a template and install it as the message listener of the reply container.
     *
     * @param producerFactory the producer factory passed to {@link KafkaTemplate}
     * @param replyContainer the container that receives replies; must not be {@code null}
     * @param autoFlush the auto-flush setting passed to {@link KafkaTemplate}
     * @throws IllegalArgumentException if {@code replyContainer} is {@code null}
     */
    public ReplyingKafkaTemplate(ProducerFactory<K, V> producerFactory,
            GenericMessageListenerContainer<K, R> replyContainer, boolean autoFlush) {
        super(producerFactory, autoFlush);
        Assert.notNull(replyContainer, "'replyContainer' cannot be null");
        this.replyContainer = replyContainer;
        this.replyContainer.setupMessageListener(this);
    }

    /**
     * Replace the default scheduler used for reply timeouts. A scheduler supplied here is
     * neither initialized nor destroyed by this template.
     *
     * @param scheduler the scheduler; must not be {@code null}
     * @throws IllegalArgumentException if {@code scheduler} is {@code null}
     */
    public void setTaskScheduler(TaskScheduler scheduler) {
        Assert.notNull(scheduler, "'scheduler' cannot be null");
        this.scheduler = scheduler;
        this.schedulerSet = true;
    }

    /**
     * Set how long to wait for a reply before the pending future is failed.
     *
     * @param replyTimeout the timeout in milliseconds; must be {@code >= 0}
     * @throws IllegalArgumentException if {@code replyTimeout} is negative
     */
    public void setReplyTimeout(long replyTimeout) {
        Assert.isTrue(replyTimeout >= 0L, "'replyTimeout' must be >= 0");
        this.replyTimeout = replyTimeout;
    }

    @Override
    public boolean isRunning() {
        return this.running;
    }

    @Override
    public int getPhase() {
        return this.phase;
    }

    public void setPhase(int phase) {
        this.phase = phase;
    }

    @Override
    public boolean isAutoStartup() {
        return this.autoStartup;
    }

    public void setAutoStartup(boolean autoStartup) {
        this.autoStartup = autoStartup;
    }

    /**
     * Return the partitions reported by the reply container as currently assigned.
     *
     * @return the result of {@code replyContainer.getAssignedPartitions()}
     */
    public Collection<TopicPartition> getAssignedReplyTopicPartitions() {
        return this.replyContainer.getAssignedPartitions();
    }

    /**
     * Initialize the default scheduler, at most once until it is destroyed.
     *
     * <p>This method is invoked from {@link #start()} as well as directly by callers, so it
     * is guarded: re-initializing a {@link ThreadPoolTaskScheduler} replaces its executor
     * without shutting the previous one down, which would leak the old executor on every
     * restart. A caller-supplied scheduler is left untouched.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        if (!this.schedulerSet && !this.schedulerInitialized) {
            ((ThreadPoolTaskScheduler) this.scheduler).initialize();
            this.schedulerInitialized = true;
        }
    }

    /**
     * Start the reply container and mark this template as running. Does nothing if the
     * template is already running.
     *
     * @throws KafkaException if initialization fails
     */
    @Override
    public synchronized void start() {
        if (this.running) {
            return;
        }
        try {
            afterPropertiesSet();
        }
        catch (Exception e) {
            throw new KafkaException("Failed to initialize", e);
        }
        this.replyContainer.start();
        this.running = true;
    }

    /**
     * Stop the reply container and discard all pending futures. Does nothing if the
     * template is not running. Note that discarded futures are removed from the pending
     * map without being completed.
     */
    @Override
    public synchronized void stop() {
        if (!this.running) {
            return;
        }
        this.running = false;
        this.replyContainer.stop();
        this.futures.clear();
    }

    /**
     * Stop this template and then run the callback.
     *
     * @param callback invoked after {@link #stop()} returns
     */
    @Override
    public void stop(Runnable callback) {
        stop();
        callback.run();
    }

    /**
     * Send a request and return a future for its reply.
     *
     * <p>A correlation id header is added to the record, the future is registered under
     * that id, the record is sent, and a timeout task is scheduled that fails the future
     * with a {@link KafkaException} if it is still pending after the reply timeout.
     *
     * @param record the request record; its headers are modified to add the correlation id
     * @return the future that is completed with the reply record
     * @throws IllegalStateException if the template is not running
     * @throws IllegalArgumentException if {@link #createCorrelationId} returns {@code null}
     * @throws KafkaException if the send itself throws
     */
    @Override
    public RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record) {
        Assert.state(this.running, "Template has not been start()ed");
        CorrelationKey correlationId = createCorrelationId(record);
        Assert.notNull(correlationId, "the created 'correlationId' cannot be null");
        record.headers().add(new RecordHeader(CORRELATION_ID_HEADER, correlationId.getCorrelationId()));
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Sending: " + record + " with correlationId: " + correlationId);
        }
        TemplateRequestReplyFuture<K, V, R> future = new TemplateRequestReplyFuture<>();
        // Register before sending so that a reply can never arrive ahead of the registration.
        this.futures.put(correlationId, future);
        try {
            future.setSendFuture(send(record));
        }
        catch (Exception e) {
            this.futures.remove(correlationId);
            throw new KafkaException("Send failed", e);
        }
        this.scheduler.schedule(() -> {
            // Whoever removes the entry (reply handler or this timeout) completes the future.
            RequestReplyFuture<K, V, R> removed = this.futures.remove(correlationId);
            if (removed != null) {
                if (this.logger.isWarnEnabled()) {
                    this.logger.warn("Reply timed out for: " + record + " with correlationId: " + correlationId);
                }
                removed.setException(new KafkaException("Reply timed out"));
            }
        }, Instant.now().plusMillis(this.replyTimeout));
        return future;
    }

    /**
     * Destroy the default scheduler. A caller-supplied scheduler is left untouched.
     */
    @Override
    public void destroy() throws Exception {
        if (!this.schedulerSet) {
            ((ThreadPoolTaskScheduler) this.scheduler).destroy();
            // Allow a later start() to initialize the scheduler again.
            this.schedulerInitialized = false;
        }
    }

    /**
     * Create the correlation id for a request. This implementation ignores the record and
     * returns the 16 bytes of a random {@link UUID} (most significant bits first).
     * Subclasses may override to derive the id differently.
     *
     * @param record the request about to be sent
     * @return the correlation key; must not be {@code null}
     */
    protected CorrelationKey createCorrelationId(ProducerRecord<K, V> record) {
        UUID uuid = UUID.randomUUID();
        byte[] bytes = new byte[16];
        ByteBuffer bb = ByteBuffer.wrap(bytes);
        bb.putLong(uuid.getMostSignificantBits());
        bb.putLong(uuid.getLeastSignificantBits());
        return new CorrelationKey(bytes);
    }

    /**
     * Handle a batch of reply records, completing the pending future that matches each
     * record's correlation id. Records without a correlation id header, or whose id has no
     * pending future, are logged at error level and otherwise ignored.
     *
     * @param data the reply records
     */
    @Override
    public void onMessage(List<ConsumerRecord<K, R>> data) {
        data.forEach(record -> {
            CorrelationKey correlationId = findCorrelationId(record);
            if (correlationId == null) {
                this.logger.error("No correlationId found in reply: " + record
                        + " - to use request/reply semantics, the responding server must return the correlation id "
                        + " in the '" + CORRELATION_ID_HEADER + "' header");
                return;
            }
            RequestReplyFuture<K, V, R> future = this.futures.remove(correlationId);
            if (future == null) {
                this.logger.error("No pending reply: " + record + " with correlationId: " + correlationId
                        + ", perhaps timed out");
                return;
            }
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Received: " + record + " with correlationId: " + correlationId);
            }
            future.set(record);
        });
    }

    /**
     * Return the correlation key built from the first correlation id header of the record,
     * or {@code null} if the record has no such header.
     */
    private CorrelationKey findCorrelationId(ConsumerRecord<K, R> record) {
        for (Header header : record.headers()) {
            if (CORRELATION_ID_HEADER.equals(header.key())) {
                return new CorrelationKey(header.value());
            }
        }
        return null;
    }

    /**
     * The {@link RequestReplyFuture} type created by this template for each request.
     *
     * @param <K> the key type
     * @param <V> the outbound value type
     * @param <R> the reply value type
     */
    public static class TemplateRequestReplyFuture<K, V, R> extends RequestReplyFuture<K, V, R> {

        TemplateRequestReplyFuture() {
        }

        /**
         * Delegates to the superclass; re-declared here so that the enclosing template can
         * set the send future on instances of this type.
         */
        @Override
        protected void setSendFuture(ListenableFuture<SendResult<K, V>> sendFuture) {
            super.setSendFuture(sendFuture);
        }
    }
}

