/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.core.reactive;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;
import reactor.kafka.sender.TransactionManager;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

/**
 * Reactive facade over a {@link KafkaReceiver}.
 *
 * <p>Every operation delegates either to one of the receiver's {@code receive*} methods
 * or to {@link KafkaReceiver#doOnConsumer(Function)}, adapting the result to a
 * {@link Mono} or {@link Flux}.
 *
 * @param <K> the record key type
 * @param <V> the record value type
 */
public class ReactiveKafkaConsumerTemplate<K, V> {
    private final KafkaReceiver<K, V> kafkaReceiver;

    /**
     * Creates a template backed by a receiver built from the given options.
     *
     * @param receiverOptions the options passed to {@link KafkaReceiver#create(ReceiverOptions)};
     *        must not be {@code null}
     */
    public ReactiveKafkaConsumerTemplate(ReceiverOptions<K, V> receiverOptions) {
        Assert.notNull(receiverOptions, "Receiver options can not be null");
        this.kafkaReceiver = KafkaReceiver.create(receiverOptions);
    }

    /**
     * Creates a template backed by an existing receiver.
     *
     * @param kafkaReceiver the receiver to delegate to; must not be {@code null}
     */
    public ReactiveKafkaConsumerTemplate(KafkaReceiver<K, V> kafkaReceiver) {
        Assert.notNull(kafkaReceiver, "Kafka receiver can not be null");
        this.kafkaReceiver = kafkaReceiver;
    }

    /**
     * Delegates to {@link KafkaReceiver#receive()}.
     *
     * @return the flux of receiver records produced by the underlying receiver
     */
    public Flux<ReceiverRecord<K, V>> receive() {
        return this.kafkaReceiver.receive();
    }

    /**
     * Delegates to {@link KafkaReceiver#receiveAutoAck()} and flattens the resulting
     * batches, in order, into a single flux of records.
     *
     * @return the flattened flux of consumer records
     */
    public Flux<ConsumerRecord<K, V>> receiveAutoAck() {
        return this.kafkaReceiver.receiveAutoAck().concatMap(Function.identity());
    }

    /**
     * Delegates to {@link KafkaReceiver#receiveAtmostOnce()}.
     *
     * @return the flux of consumer records produced by the underlying receiver
     */
    public Flux<ConsumerRecord<K, V>> receiveAtMostOnce() {
        return this.kafkaReceiver.receiveAtmostOnce();
    }

    /**
     * Delegates to {@link KafkaReceiver#receiveExactlyOnce(TransactionManager)}.
     *
     * @param transactionManager the transaction manager handed to the underlying receiver
     * @return the flux of record batches produced by the underlying receiver
     */
    public Flux<Flux<ConsumerRecord<K, V>>> receiveExactlyOnce(TransactionManager transactionManager) {
        return this.kafkaReceiver.receiveExactlyOnce(transactionManager);
    }

    /**
     * Delegates to {@link KafkaReceiver#doOnConsumer(Function)}.
     *
     * @param function the callback applied to the Kafka {@link Consumer}
     * @param <T> the callback's result type
     * @return a mono supplied by the underlying receiver for the callback's result
     */
    public <T> Mono<T> doOnConsumer(Function<Consumer<K, V>, ? extends T> function) {
        return this.kafkaReceiver.doOnConsumer(function);
    }

    /**
     * Emits each element of {@link Consumer#assignment()}.
     *
     * @return a flux of the topic partitions reported by the consumer
     */
    public Flux<TopicPartition> assignment() {
        Mono<Set<TopicPartition>> assigned = doOnConsumer(Consumer::assignment);
        return assigned.flatMapIterable(Function.identity());
    }

    /**
     * Emits each element of {@link Consumer#subscription()}.
     *
     * @return a flux of the topic names reported by the consumer
     */
    public Flux<String> subscription() {
        Mono<Set<String>> subscribed = doOnConsumer(Consumer::subscription);
        return subscribed.flatMapIterable(Function.identity());
    }

    /**
     * Invokes {@link Consumer#seek(TopicPartition, long)} on the consumer.
     *
     * @param partition the partition to seek on
     * @param offset the offset to seek to
     * @return the mono returned by {@link #doOnConsumer(Function)}; the callback yields no value
     */
    public Mono<Void> seek(TopicPartition partition, long offset) {
        return doOnConsumer(consumer -> {
            consumer.seek(partition, offset);
            // Nothing to return: the seek call itself has no result.
            return null;
        });
    }

    /**
     * Invokes {@code seekToBeginning} on the consumer for the given partitions.
     *
     * @param partitions the partitions to seek on
     * @return the mono returned by {@link #doOnConsumer(Function)}; the callback yields no value
     */
    public Mono<Void> seekToBeginning(TopicPartition... partitions) {
        return doOnConsumer(consumer -> {
            consumer.seekToBeginning(Arrays.asList(partitions));
            return null;
        });
    }

    /**
     * Invokes {@code seekToEnd} on the consumer for the given partitions.
     *
     * @param partitions the partitions to seek on
     * @return the mono returned by {@link #doOnConsumer(Function)}; the callback yields no value
     */
    public Mono<Void> seekToEnd(TopicPartition... partitions) {
        return doOnConsumer(consumer -> {
            consumer.seekToEnd(Arrays.asList(partitions));
            return null;
        });
    }

    /**
     * Queries {@link Consumer#position(TopicPartition)}.
     *
     * @param partition the partition to query
     * @return a mono for the position reported by the consumer
     */
    public Mono<Long> position(TopicPartition partition) {
        return doOnConsumer(consumer -> consumer.position(partition));
    }

    /**
     * Queries {@code committed} on the consumer for the given partitions.
     *
     * @param partitions the partitions to query
     * @return a mono for the map returned by the consumer
     */
    public Mono<Map<TopicPartition, OffsetAndMetadata>> committed(Set<TopicPartition> partitions) {
        return doOnConsumer(consumer -> consumer.committed(partitions));
    }

    /**
     * Emits each element of {@link Consumer#partitionsFor(String)}.
     *
     * @param topic the topic to query
     * @return a flux of the partition infos reported by the consumer
     */
    public Flux<PartitionInfo> partitionsFromConsumerFor(String topic) {
        Mono<List<PartitionInfo>> infos = doOnConsumer(consumer -> consumer.partitionsFor(topic));
        return infos.flatMapIterable(Function.identity());
    }

    /**
     * Emits each element of {@link Consumer#paused()}.
     *
     * @return a flux of the topic partitions reported as paused by the consumer
     */
    public Flux<TopicPartition> paused() {
        Mono<Set<TopicPartition>> pausedPartitions = doOnConsumer(Consumer::paused);
        return pausedPartitions.flatMapIterable(Function.identity());
    }

    /**
     * Invokes {@code pause} on the consumer for the given partitions.
     *
     * @param partitions the partitions to pause
     * @return the mono returned by {@link #doOnConsumer(Function)}; the callback yields no value
     */
    public Mono<Void> pause(TopicPartition... partitions) {
        return doOnConsumer(consumer -> {
            consumer.pause(Arrays.asList(partitions));
            return null;
        });
    }

    /**
     * Invokes {@code resume} on the consumer for the given partitions.
     *
     * @param partitions the partitions to resume
     * @return the mono returned by {@link #doOnConsumer(Function)}; the callback yields no value
     */
    public Mono<Void> resume(TopicPartition... partitions) {
        return doOnConsumer(consumer -> {
            consumer.resume(Arrays.asList(partitions));
            return null;
        });
    }

    /**
     * Emits one tuple per entry of {@link Consumer#metrics()}.
     *
     * @return a flux of (metric name, metric) tuples
     */
    public Flux<Tuple2<MetricName, ? extends Metric>> metricsFromConsumer() {
        return doOnConsumer(Consumer::metrics)
                .flatMapIterable(Map::entrySet)
                .map(m -> Tuples.of(m.getKey(), m.getValue()));
    }

    /**
     * Emits one tuple per entry of {@link Consumer#listTopics()}.
     *
     * @return a flux of (topic name, partition infos) tuples
     */
    public Flux<Tuple2<String, List<PartitionInfo>>> listTopics() {
        return doOnConsumer(Consumer::listTopics)
                .flatMapIterable(Map::entrySet)
                .map(topicAndPartition -> Tuples.of(topicAndPartition.getKey(), topicAndPartition.getValue()));
    }

    /**
     * Emits one tuple per entry of {@link Consumer#offsetsForTimes(Map)}.
     *
     * @param timestampsToSearch the per-partition timestamps passed to the consumer
     * @return a flux of (partition, offset-and-timestamp) tuples
     */
    public Flux<Tuple2<TopicPartition, OffsetAndTimestamp>> offsetsForTimes(
            Map<TopicPartition, Long> timestampsToSearch) {
        // NOTE(review): the Kafka client documents that a partition may map to null here,
        // and Tuples.of presumably rejects null components - confirm the desired handling.
        return doOnConsumer(consumer -> consumer.offsetsForTimes(timestampsToSearch))
                .flatMapIterable(Map::entrySet)
                .map(partitionAndOffset -> Tuples.of(partitionAndOffset.getKey(), partitionAndOffset.getValue()));
    }

    /**
     * Emits one tuple per entry of the consumer's {@code beginningOffsets} result.
     *
     * @param partitions the partitions to query
     * @return a flux of (partition, offset) tuples
     */
    public Flux<Tuple2<TopicPartition, Long>> beginningOffsets(TopicPartition... partitions) {
        return doOnConsumer(consumer -> consumer.beginningOffsets(Arrays.asList(partitions)))
                .flatMapIterable(Map::entrySet)
                .map(partitionsOffsets -> Tuples.of(partitionsOffsets.getKey(), partitionsOffsets.getValue()));
    }

    /**
     * Emits one tuple per entry of the consumer's {@code endOffsets} result.
     *
     * @param partitions the partitions to query
     * @return a flux of (partition, offset) tuples
     */
    public Flux<Tuple2<TopicPartition, Long>> endOffsets(TopicPartition... partitions) {
        return doOnConsumer(consumer -> consumer.endOffsets(Arrays.asList(partitions)))
                .flatMapIterable(Map::entrySet)
                .map(partitionsOffsets -> Tuples.of(partitionsOffsets.getKey(), partitionsOffsets.getValue()));
    }
}

