package org.apache.kafka.storage.internals.log;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Stream;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
import org.apache.kafka.server.util.ShutdownableThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/storage/internals/log/RemoteIndexCache.class */
public class RemoteIndexCache implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(RemoteIndexCache.class);
    // Suffix appended to an index file while it is being downloaded; files with this suffix are
    // candidates for deletion when the cache directory is scanned in init().
    private static final String TMP_FILE_SUFFIX = ".tmp";
    // Name given to the background thread that deletes expired index entries.
    public static final String REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD = "remote-log-index-cleaner";
    // Name of the sub-directory (under the directory passed to the constructor) holding cached index files.
    public static final String DIR_NAME = "remote-log-index-cache";
    // Directory on local disk where the cached index files live.
    private final File cacheDir;
    // Set to true exactly once by close(); checked by getIndexEntry() and by the cleaner thread.
    private final AtomicBoolean isRemoteIndexCacheClosed;
    // Entries that were removed/evicted from the cache and are waiting for the cleaner thread to delete their files.
    private final LinkedBlockingQueue<Entry> expiredIndexes;
    // Read lock is taken by lookups and removals; the write lock is taken by close().
    private final ReentrantReadWriteLock lock;
    // Size-bounded in-memory cache from remote log segment id to its index entry.
    private final Cache<Uuid, Entry> internalCache;
    // Source from which missing index files are fetched.
    private final RemoteStorageManager remoteStorageManager;
    // Background thread draining expiredIndexes; created and started by the constructor.
    private final ShutdownableThread cleanerThread;

    /* loaded from: input_file:org/apache/kafka/storage/internals/log/RemoteIndexCache$Entry.class */
    /**
     * The offset, time and transaction indexes of one segment, cached together as a unit.
     *
     * <p>Lookups hold this entry's read lock; {@link #markForCleanup()}, {@link #cleanup()} and
     * {@link #close()} hold its write lock. After the entry has been marked for cleanup, lookups
     * fail with {@link IllegalStateException}.
     */
    public static class Entry implements AutoCloseable {
        private final OffsetIndex offsetIndex;
        private final TimeIndex timeIndex;
        private final TransactionIndex txnIndex;
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        // Set once cleanup() has started deleting the index files.
        private boolean cleanStarted = false;
        // Set once markForCleanup() has begun renaming the index files.
        private boolean markedForCleanup = false;

        /**
         * @param offsetIndex      the offset index of the segment
         * @param timeIndex        the time index of the segment
         * @param transactionIndex the transaction index of the segment
         */
        public Entry(OffsetIndex offsetIndex, TimeIndex timeIndex, TransactionIndex transactionIndex) {
            this.offsetIndex = offsetIndex;
            this.timeIndex = timeIndex;
            this.txnIndex = transactionIndex;
        }

        /** @return the offset index held by this entry */
        public OffsetIndex offsetIndex() {
            return offsetIndex;
        }

        /** @return the time index held by this entry */
        public TimeIndex timeIndex() {
            return timeIndex;
        }

        /** @return the transaction index held by this entry */
        public TransactionIndex txnIndex() {
            return txnIndex;
        }

        /** @return whether {@link #cleanup()} has started deleting the index files (read without locking) */
        public boolean isCleanStarted() {
            return cleanStarted;
        }

        /** @return whether {@link #markForCleanup()} has been invoked on this entry (read without locking) */
        public boolean isMarkedForCleanup() {
            return markedForCleanup;
        }

        /**
         * Looks up {@code j} in the offset index while holding the read lock.
         *
         * @param j the value passed to the offset index lookup (presumably the target offset)
         * @return the result of the offset index lookup
         * @throws IllegalStateException if this entry is already marked for cleanup
         */
        public OffsetPosition lookupOffset(long j) {
            lock.readLock().lock();
            try {
                ensureNotMarkedForCleanup();
                return offsetIndex.lookup(j);
            } finally {
                lock.readLock().unlock();
            }
        }

        /**
         * Looks up {@code j} in the time index, takes the larger of the resulting offset and
         * {@code j2}, and resolves that offset through the offset index. Runs under the read lock.
         *
         * @param j  the value passed to the time index lookup (presumably the target timestamp)
         * @param j2 lower bound applied to the offset obtained from the time index
         * @return the result of the offset index lookup
         * @throws IllegalStateException if this entry is already marked for cleanup
         */
        public OffsetPosition lookupTimestamp(long j, long j2) throws IOException {
            lock.readLock().lock();
            try {
                ensureNotMarkedForCleanup();
                long offsetFromTimeIndex = timeIndex.lookup(j).offset;
                long effectiveOffset = Math.max(j2, offsetFromTimeIndex);
                return offsetIndex.lookup(effectiveOffset);
            } finally {
                lock.readLock().unlock();
            }
        }

        /**
         * Marks the entry as no longer usable and renames each index file by appending
         * {@code LogFileUtils.DELETED_FILE_SUFFIX}. Does nothing if the entry was already marked.
         */
        public void markForCleanup() throws IOException {
            lock.writeLock().lock();
            try {
                if (markedForCleanup) {
                    return;
                }
                // The flag is raised before the renames so that lookups are rejected from here on.
                markedForCleanup = true;
                offsetIndex.renameTo(withDeletedSuffix(offsetIndex.file()));
                timeIndex.renameTo(withDeletedSuffix(timeIndex.file()));
                txnIndex.renameTo(withDeletedSuffix(txnIndex.file()));
            } finally {
                lock.writeLock().unlock();
            }
        }

        /**
         * Marks the entry for cleanup (if not done yet) and then deletes all three index files.
         * The deletion is attempted only on the first call; every file is attempted even if an
         * earlier deletion fails.
         */
        public void cleanup() throws IOException {
            lock.writeLock().lock();
            try {
                markForCleanup();
                if (cleanStarted) {
                    return;
                }
                cleanStarted = true;
                List<StorageAction<Void, Exception>> deletions = Arrays.asList(() -> {
                    offsetIndex.deleteIfExists();
                    return null;
                }, () -> {
                    timeIndex.deleteIfExists();
                    return null;
                }, () -> {
                    txnIndex.deleteIfExists();
                    return null;
                });
                RemoteIndexCache.tryAll(deletions);
            } finally {
                lock.writeLock().unlock();
            }
        }

        /** Closes the three indexes quietly while holding the write lock. */
        @Override
        public void close() {
            lock.writeLock().lock();
            try {
                Utils.closeQuietly(offsetIndex, "OffsetIndex");
                Utils.closeQuietly(timeIndex, "TimeIndex");
                Utils.closeQuietly(txnIndex, "TransactionIndex");
            } finally {
                lock.writeLock().unlock();
            }
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("Entry{offsetIndex=");
            text.append(offsetIndex.file().getName());
            text.append(", timeIndex=").append(timeIndex.file().getName());
            text.append(", txnIndex=").append(txnIndex.file().getName());
            text.append('}');
            return text.toString();
        }

        /** Rejects use of an entry whose files are being (or have been) removed. */
        private void ensureNotMarkedForCleanup() {
            if (markedForCleanup) {
                throw new IllegalStateException("This entry is marked for cleanup");
            }
        }

        /** Builds the rename target for an index file: the same path with the deleted-file suffix appended. */
        private static File withDeletedSuffix(File indexFile) {
            return new File(Utils.replaceSuffix(indexFile.getPath(), "", LogFileUtils.DELETED_FILE_SUFFIX));
        }
    }

    /**
     * Creates a cache with a maximum size of 1024 entries by delegating to
     * {@link #RemoteIndexCache(int, RemoteStorageManager, String)}.
     *
     * @param remoteStorageManager source from which missing index files are fetched
     * @param str                  parent directory under which the {@link #DIR_NAME} cache directory is placed
     * @throws IOException if initialising the cache directory fails
     */
    public RemoteIndexCache(RemoteStorageManager remoteStorageManager, String str) throws IOException {
        this(1024, remoteStorageManager, str);
    }

    /**
     * Creates the cache, restores any index files already present in the cache directory and
     * starts the background cleaner thread.
     *
     * @param i                    maximum number of entries kept in the in-memory cache
     * @param remoteStorageManager source from which missing index files are fetched
     * @param str                  parent directory under which the {@link #DIR_NAME} cache directory is placed
     * @throws IOException if listing or cleaning the cache directory fails
     */
    public RemoteIndexCache(int i, RemoteStorageManager remoteStorageManager, String str) throws IOException {
        this.isRemoteIndexCacheClosed = new AtomicBoolean(false);
        this.expiredIndexes = new LinkedBlockingQueue<>();
        this.lock = new ReentrantReadWriteLock();
        this.remoteStorageManager = remoteStorageManager;
        this.cacheDir = new File(str, DIR_NAME);
        // The explicit <Uuid, Entry> witness is required: inside a builder chain the lambda's
        // parameter types would otherwise be inferred as Object and the call to
        // enqueueEntryForCleanup(Entry, Uuid) would not type-check.
        this.internalCache = Caffeine.newBuilder()
            .maximumSize(i)
            .<Uuid, Entry>evictionListener((uuid, entry, removalCause) -> {
                if (entry != null) {
                    enqueueEntryForCleanup(entry, uuid);
                } else {
                    log.error("Received entry as null for key {} when the it is removed from the cache.", uuid);
                }
            })
            .build();
        // init() populates internalCache, so it must run after the cache has been built.
        init();
        this.cleanerThread = createCleanerThread();
        this.cleanerThread.start();
    }

    /**
     * @return a read-only view of the entries currently queued for deletion by the cleaner thread
     */
    public Collection<Entry> expiredIndexes() {
        Collection<Entry> pendingDeletion = this.expiredIndexes;
        return Collections.unmodifiableCollection(pendingDeletion);
    }

    /**
     * @return the underlying cache instance itself (not a copy), keyed by remote log segment id
     */
    public Cache<Uuid, Entry> internalCache() {
        return internalCache;
    }

    /**
     * @return the local directory in which the cached index files are stored
     */
    public File cacheDir() {
        return cacheDir;
    }

    /**
     * Removes the entry cached under {@code uuid}, if any, and hands it to the cleaner queue.
     * Runs under the cache's read lock.
     *
     * @param uuid key of the entry to remove
     */
    public void remove(Uuid uuid) {
        lock.readLock().lock();
        try {
            // Returning null from the remapping function drops the mapping from the cache.
            internalCache.asMap().computeIfPresent(uuid, (key, cached) -> {
                enqueueEntryForCleanup(cached, key);
                return null;
            });
        } finally {
            lock.readLock().unlock();
        }
    }

    /**
     * Removes every entry whose key is contained in {@code collection} and hands each removed
     * entry to the cleaner queue. Keys that are not cached are ignored. Runs under the cache's
     * read lock.
     *
     * @param collection keys of the entries to remove
     */
    public void removeAll(Collection<Uuid> collection) {
        this.lock.readLock().lock();
        try {
            // Returning null from the remapping function drops the mapping from the cache.
            collection.forEach(uuid -> this.internalCache.asMap().computeIfPresent(uuid, (key, entry) -> {
                enqueueEntryForCleanup(entry, key);
                return null;
            }));
        } finally {
            this.lock.readLock().unlock();
        }
    }

    /**
     * Marks {@code entry} for cleanup and queues it for the cleaner thread.
     *
     * @param entry the entry that has left the cache
     * @param uuid  the key the entry was cached under (used for logging only)
     * @throws KafkaException wrapping any {@link IOException} raised while marking the entry
     */
    private void enqueueEntryForCleanup(Entry entry, Uuid uuid) {
        try {
            entry.markForCleanup();
        } catch (IOException ioe) {
            throw new KafkaException(ioe);
        }
        boolean queued = expiredIndexes.offer(entry);
        if (!queued) {
            log.error("Error while inserting entry {} for key {} into the cleaner queue because queue is full.", entry, uuid);
        }
    }

    /**
     * @return the background thread that deletes expired index entries
     */
    public ShutdownableThread cleanerThread() {
        return cleanerThread;
    }

    /**
     * Builds (but does not start) the daemon thread that takes entries from the expired-index
     * queue one at a time and cleans them up.
     *
     * @return the configured cleaner thread
     */
    private ShutdownableThread createCleanerThread() {
        ShutdownableThread cleaner = new ShutdownableThread(REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD) {
            @Override
            public void doWork() {
                try {
                    // Blocks until an expired entry becomes available.
                    Entry expired = RemoteIndexCache.this.expiredIndexes.take();
                    this.log.debug("Cleaning up index entry {}", expired);
                    expired.cleanup();
                } catch (InterruptedException interrupted) {
                    // An interrupt is only expected once the cache has been closed.
                    if (!RemoteIndexCache.this.isRemoteIndexCacheClosed.get()) {
                        this.log.error("Cleaner thread received interruption but remote index cache is not closed", interrupted);
                        throw new KafkaException(interrupted);
                    }
                    this.log.debug("Cleaner thread was interrupted on cache shutdown");
                } catch (Exception failure) {
                    // Any other failure is logged and swallowed here rather than rethrown.
                    this.log.error("Error occurred while cleaning up expired entry", failure);
                }
            }
        };
        cleaner.setDaemon(true);
        return cleaner;
    }

    /**
     * Prepares the cache directory and repopulates the in-memory cache from the index files
     * already stored in it.
     *
     * <ol>
     *   <li>Creates the cache directory, or re-uses it when it already exists.</li>
     *   <li>Deletes leftover files whose name ends with the deleted-file or temporary-file suffix.</li>
     *   <li>Loads every segment for which all three index files are present; incomplete sets are deleted.</li>
     * </ol>
     *
     * @throws IOException    if the directory cannot be listed or an incomplete index set cannot be deleted
     * @throws KafkaException if the directory cannot be created or a leftover file cannot be deleted
     */
    private void init() throws IOException {
        long startMs = Time.SYSTEM.hiResClockMs();
        try {
            Files.createDirectory(this.cacheDir.toPath());
            log.info("Created new file {} for RemoteIndexCache", this.cacheDir);
        } catch (FileAlreadyExistsException e) {
            log.info("RemoteIndexCache directory {} already exists. Re-using the same directory.", this.cacheDir);
        } catch (Exception e) {
            log.error("Unable to create directory {} for RemoteIndexCache.", this.cacheDir, e);
            throw new KafkaException(e);
        }
        deleteLeftoverFiles();
        restoreEntriesFromDisk();
        log.info("RemoteIndexCache starts up in {} ms.", Time.SYSTEM.hiResClockMs() - startMs);
    }

    /** Deletes files in the cache directory that were marked as deleted or were left behind as temporary files. */
    private void deleteLeftoverFiles() throws IOException {
        try (Stream<Path> paths = Files.list(this.cacheDir.toPath())) {
            paths.forEach(path -> {
                Path fileNamePath = path.getFileName();
                if (fileNamePath == null) {
                    return;
                }
                // Compare on the file-name string: Path.endsWith(String) matches whole path
                // elements, so it would never match a suffix such as ".tmp".
                String fileName = fileNamePath.toString();
                if (fileName.endsWith(LogFileUtils.DELETED_FILE_SUFFIX) || fileName.endsWith(TMP_FILE_SUFFIX)) {
                    try {
                        if (Files.deleteIfExists(path)) {
                            log.debug("Deleted file path {} on cache initialization", path);
                        }
                    } catch (IOException e) {
                        throw new KafkaException(e);
                    }
                }
            });
        }
    }

    /** Loads each complete (offset, time, transaction) index set found in the cache directory into the cache. */
    private void restoreEntriesFromDisk() throws IOException {
        try (Stream<Path> paths = Files.list(this.cacheDir.toPath())) {
            Iterator<Path> it = paths.iterator();
            while (it.hasNext()) {
                Path fileNamePath = it.next().getFileName();
                if (fileNamePath == null) {
                    throw new KafkaException("Empty file name in remote index cache directory: " + this.cacheDir);
                }
                String fileName = fileNamePath.toString();
                Uuid segmentId = remoteLogSegmentIdFromRemoteIndexFileName(fileName);
                // The three files of a segment share one id; skip the ones already handled.
                if (this.internalCache.asMap().containsKey(segmentId)) {
                    continue;
                }
                String prefix = fileName.substring(0, fileName.indexOf("."));
                File offsetIndexFile = new File(this.cacheDir, prefix + LogFileUtils.INDEX_FILE_SUFFIX);
                File timeIndexFile = new File(this.cacheDir, prefix + LogFileUtils.TIME_INDEX_FILE_SUFFIX);
                File txnIndexFile = new File(this.cacheDir, prefix + LogFileUtils.TXN_INDEX_FILE_SUFFIX);
                boolean allPresent = Files.exists(offsetIndexFile.toPath())
                    && Files.exists(timeIndexFile.toPath())
                    && Files.exists(txnIndexFile.toPath());
                if (allPresent) {
                    long baseOffset = offsetFromRemoteIndexFileName(fileName);
                    OffsetIndex offsetIndex = new OffsetIndex(offsetIndexFile, baseOffset, Integer.MAX_VALUE, false);
                    offsetIndex.sanityCheck();
                    TimeIndex timeIndex = new TimeIndex(timeIndexFile, baseOffset, Integer.MAX_VALUE, false);
                    timeIndex.sanityCheck();
                    TransactionIndex txnIndex = new TransactionIndex(baseOffset, txnIndexFile);
                    txnIndex.sanityCheck();
                    this.internalCache.put(segmentId, new Entry(offsetIndex, timeIndex, txnIndex));
                } else {
                    // An incomplete set cannot form an entry; remove whatever part of it exists.
                    List<StorageAction<Void, Exception>> deletions = Arrays.asList(() -> {
                        Files.deleteIfExists(offsetIndexFile.toPath());
                        return null;
                    }, () -> {
                        Files.deleteIfExists(timeIndexFile.toPath());
                        return null;
                    }, () -> {
                        Files.deleteIfExists(txnIndexFile.toPath());
                        return null;
                    });
                    tryAll(deletions);
                }
            }
        }
    }

    /**
     * Returns the index stored in the cache directory under {@code file}'s name, downloading it
     * first when it is missing or corrupt.
     *
     * <p>A download is written to a sibling file carrying the temporary-file suffix and then moved
     * into place, so the final file name never refers to a partially copied file.
     *
     * @param file                     file whose name identifies the index inside the cache directory
     * @param remoteLogSegmentMetadata metadata handed to {@code function} when a download is required
     * @param function                 opens the stream from which the index content is read
     * @param function2                loads an index object from a local file
     * @param <T>                      type of index produced by {@code function2}
     * @return the loaded index
     * @throws IOException if copying or moving the downloaded file fails
     */
    private <T> T loadIndexFile(File file, RemoteLogSegmentMetadata remoteLogSegmentMetadata, Function<RemoteLogSegmentMetadata, InputStream> function, Function<File, T> function2) throws IOException {
        File indexFile = new File(this.cacheDir, file.getName());
        T index = null;
        if (Files.exists(indexFile.toPath())) {
            try {
                index = function2.apply(indexFile);
            } catch (CorruptIndexException e) {
                // Fall through with index == null so the file is fetched again below.
                log.info("Error occurred while loading the stored index file {}", indexFile.getPath(), e);
            }
        }
        if (index == null) {
            File tmpIndexFile = new File(indexFile.getParentFile(), indexFile.getName() + TMP_FILE_SUFFIX);
            // The stream is closed exactly once, before the file is moved into place.
            try (InputStream remoteStream = function.apply(remoteLogSegmentMetadata)) {
                Files.copy(remoteStream, tmpIndexFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
            }
            Utils.atomicMoveWithFallback(tmpIndexFile.toPath(), indexFile.toPath(), false);
            index = function2.apply(indexFile);
        }
        return index;
    }

    /**
     * Returns the cached entry for the given segment, creating and caching it when absent.
     *
     * @param remoteLogSegmentMetadata metadata of the segment whose indexes are requested
     * @return the entry for the segment
     * @throws IllegalStateException if the cache has been closed
     */
    public Entry getIndexEntry(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
        // Fast path: fail without touching the lock when the cache is already closed.
        if (isRemoteIndexCacheClosed.get()) {
            throw new IllegalStateException("Unable to fetch index for segment id=" + remoteLogSegmentMetadata.remoteLogSegmentId().id() + ". Instance is already closed.");
        }
        lock.readLock().lock();
        try {
            // Re-check under the lock: close() may have run between the first check and here.
            if (isRemoteIndexCacheClosed.get()) {
                throw new IllegalStateException("Unable to fetch index for segment-id = " + remoteLogSegmentMetadata.remoteLogSegmentId().id() + ". Index instance is already closed.");
            }
            Uuid segmentId = remoteLogSegmentMetadata.remoteLogSegmentId().id();
            return internalCache.get(segmentId, key -> createCacheEntry(remoteLogSegmentMetadata));
        } finally {
            lock.readLock().unlock();
        }
    }

    /**
     * Builds a new entry for the given segment by loading its offset, time and transaction
     * indexes, in that order, fetching each from remote storage when it is not usable locally.
     *
     * @param remoteLogSegmentMetadata metadata of the segment whose indexes are loaded
     * @return an entry wrapping the three loaded indexes
     * @throws KafkaException wrapping any remote-storage or I/O failure
     */
    private Entry createCacheEntry(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
        long startOffset = remoteLogSegmentMetadata.startOffset();
        try {
            OffsetIndex offsetIndex = loadIndexFile(
                remoteOffsetIndexFile(this.cacheDir, remoteLogSegmentMetadata),
                remoteLogSegmentMetadata,
                metadata -> {
                    try {
                        return this.remoteStorageManager.fetchIndex(metadata, RemoteStorageManager.IndexType.OFFSET);
                    } catch (RemoteStorageException e) {
                        throw new KafkaException(e);
                    }
                },
                file -> {
                    try {
                        OffsetIndex index = new OffsetIndex(file, startOffset, Integer.MAX_VALUE, false);
                        index.sanityCheck();
                        return index;
                    } catch (IOException e) {
                        throw new KafkaException(e);
                    }
                });

            TimeIndex timeIndex = loadIndexFile(
                remoteTimeIndexFile(this.cacheDir, remoteLogSegmentMetadata),
                remoteLogSegmentMetadata,
                metadata -> {
                    try {
                        return this.remoteStorageManager.fetchIndex(metadata, RemoteStorageManager.IndexType.TIMESTAMP);
                    } catch (RemoteStorageException e) {
                        throw new KafkaException(e);
                    }
                },
                file -> {
                    try {
                        TimeIndex index = new TimeIndex(file, startOffset, Integer.MAX_VALUE, false);
                        index.sanityCheck();
                        return index;
                    } catch (IOException e) {
                        throw new KafkaException(e);
                    }
                });

            TransactionIndex txnIndex = loadIndexFile(
                remoteTransactionIndexFile(this.cacheDir, remoteLogSegmentMetadata),
                remoteLogSegmentMetadata,
                metadata -> {
                    try {
                        return this.remoteStorageManager.fetchIndex(metadata, RemoteStorageManager.IndexType.TRANSACTION);
                    } catch (RemoteResourceNotFoundException e) {
                        // The more specific exception must be caught first. A missing transaction
                        // index is not treated as an error: an empty stream is returned so that an
                        // empty index file is created locally.
                        return new ByteArrayInputStream(new byte[0]);
                    } catch (RemoteStorageException e) {
                        throw new KafkaException(e);
                    }
                },
                file -> {
                    try {
                        TransactionIndex index = new TransactionIndex(startOffset, file);
                        index.sanityCheck();
                        return index;
                    } catch (IOException e) {
                        throw new KafkaException(e);
                    }
                });

            return new Entry(offsetIndex, timeIndex, txnIndex);
        } catch (IOException e) {
            throw new KafkaException(e);
        }
    }

    /**
     * Resolves {@code j} through the offset index of the given segment.
     *
     * @param remoteLogSegmentMetadata metadata of the segment to search
     * @param j                        value passed to the entry's offset lookup (presumably the target offset)
     * @return the {@code position} field of the lookup result
     */
    public int lookupOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long j) {
        lock.readLock().lock();
        try {
            Entry entry = getIndexEntry(remoteLogSegmentMetadata);
            OffsetPosition found = entry.lookupOffset(j);
            return found.position;
        } finally {
            lock.readLock().unlock();
        }
    }

    /**
     * Resolves {@code j} through the time index of the given segment, bounded below by {@code j2}.
     *
     * @param remoteLogSegmentMetadata metadata of the segment to search
     * @param j                        value passed to the time index lookup (presumably the target timestamp)
     * @param j2                       lower bound for the offset obtained from the time index
     * @return the {@code position} field of the lookup result
     */
    public int lookupTimestamp(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long j, long j2) throws IOException {
        lock.readLock().lock();
        try {
            Entry entry = getIndexEntry(remoteLogSegmentMetadata);
            OffsetPosition found = entry.lookupTimestamp(j, j2);
            return found.position;
        } finally {
            lock.readLock().unlock();
        }
    }

    /**
     * Closes the cache: marks it closed, asks the cleaner thread to shut down, closes every
     * cached entry and waits for the cleaner thread to finish. Calls after the first are no-ops.
     *
     * @throws KafkaException if the calling thread is interrupted while waiting for the cleaner
     *                        thread; the thread's interrupt status is restored before throwing
     */
    @Override
    public void close() {
        // getAndSet makes close idempotent and stops getIndexEntry from serving new requests.
        if (this.isRemoteIndexCacheClosed.getAndSet(true)) {
            return;
        }
        this.lock.writeLock().lock();
        try {
            log.info("Close initiated for RemoteIndexCache. Cache stats={}. Cache entries pending delete={}", this.internalCache.stats(), this.expiredIndexes.size());
            boolean shutdownRequired = this.cleanerThread.initiateShutdown();
            this.internalCache.asMap().forEach((uuid, entry) -> entry.close());
            if (shutdownRequired) {
                this.cleanerThread.awaitShutdown();
            }
            log.info("Close completed for RemoteIndexCache");
        } catch (InterruptedException e) {
            // Keep the interrupt visible to callers further up the stack.
            Thread.currentThread().interrupt();
            throw new KafkaException(e);
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Executes every action in order, even when earlier ones fail, and reports the failures
     * afterwards as a single exception.
     *
     * <p>If at least one action threw an {@link IOException}, the first such exception is thrown
     * with all other failures attached as suppressed. Otherwise, if any action failed, a
     * {@link KafkaException} wrapping the first failure is thrown with the remaining failures
     * attached as suppressed.
     *
     * @param list the actions to execute
     * @throws IOException    the first {@link IOException} raised by an action
     * @throws KafkaException if actions failed but none of them with an {@link IOException}
     */
    public static void tryAll(List<StorageAction<Void, Exception>> list) throws IOException {
        IOException firstIOException = null;
        List<Exception> otherFailures = new ArrayList<>();
        for (StorageAction<Void, Exception> action : list) {
            try {
                action.execute();
            } catch (IOException e) {
                if (firstIOException == null) {
                    firstIOException = e;
                } else {
                    otherFailures.add(e);
                }
            } catch (Exception e) {
                otherFailures.add(e);
            }
        }
        if (firstIOException != null) {
            for (Exception failure : otherFailures) {
                firstIOException.addSuppressed(failure);
            }
            throw firstIOException;
        }
        if (otherFailures.isEmpty()) {
            return;
        }
        Iterator<Exception> remaining = otherFailures.iterator();
        KafkaException kafkaException = new KafkaException(remaining.next());
        while (remaining.hasNext()) {
            kafkaException.addSuppressed(remaining.next());
        }
        throw kafkaException;
    }

    /**
     * Extracts the segment id from an index file name: the text between the first {@code '_'}
     * and the first {@code '.'}.
     *
     * @param str an index file name of the form {@code <offset>_<id>.<suffix>}
     * @return the parsed segment id
     */
    private static Uuid remoteLogSegmentIdFromRemoteIndexFileName(String str) {
        int idStart = str.indexOf('_') + 1;
        int idEnd = str.indexOf('.');
        String idText = str.substring(idStart, idEnd);
        return Uuid.fromString(idText);
    }

    /**
     * Extracts the offset from an index file name: the text before the first {@code '_'},
     * parsed as a {@code long}.
     *
     * @param str an index file name of the form {@code <offset>_<id>.<suffix>}
     * @return the parsed offset
     */
    private static long offsetFromRemoteIndexFileName(String str) {
        int separator = str.indexOf('_');
        String offsetText = str.substring(0, separator);
        return Long.parseLong(offsetText);
    }

    /**
     * Builds the common file-name prefix of a segment's index files: {@code <startOffset>_<segmentId>}.
     *
     * @param remoteLogSegmentMetadata metadata of the segment
     * @return the prefix shared by the segment's index file names
     */
    private static String generateFileNamePrefixForIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
        long startOffset = remoteLogSegmentMetadata.startOffset();
        String segmentId = remoteLogSegmentMetadata.remoteLogSegmentId().id().toString();
        return startOffset + "_" + segmentId;
    }

    /**
     * @param file                     directory that contains the index file
     * @param remoteLogSegmentMetadata metadata of the segment
     * @return the offset index file of the segment inside {@code file}
     */
    public static File remoteOffsetIndexFile(File file, RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
        String fileName = remoteOffsetIndexFileName(remoteLogSegmentMetadata);
        return new File(file, fileName);
    }

    /**
     * @param remoteLogSegmentMetadata metadata of the segment
     * @return the segment's file-name prefix followed by {@code LogFileUtils.INDEX_FILE_SUFFIX}
     */
    public static String remoteOffsetIndexFileName(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
        String prefix = generateFileNamePrefixForIndex(remoteLogSegmentMetadata);
        return prefix + LogFileUtils.INDEX_FILE_SUFFIX;
    }

    /**
     * @param file                     directory that contains the index file
     * @param remoteLogSegmentMetadata metadata of the segment
     * @return the time index file of the segment inside {@code file}
     */
    public static File remoteTimeIndexFile(File file, RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
        String fileName = remoteTimeIndexFileName(remoteLogSegmentMetadata);
        return new File(file, fileName);
    }

    /**
     * @param remoteLogSegmentMetadata metadata of the segment
     * @return the segment's file-name prefix followed by {@code LogFileUtils.TIME_INDEX_FILE_SUFFIX}
     */
    public static String remoteTimeIndexFileName(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
        String prefix = generateFileNamePrefixForIndex(remoteLogSegmentMetadata);
        return prefix + LogFileUtils.TIME_INDEX_FILE_SUFFIX;
    }

    /**
     * @param file                     directory that contains the index file
     * @param remoteLogSegmentMetadata metadata of the segment
     * @return the transaction index file of the segment inside {@code file}
     */
    public static File remoteTransactionIndexFile(File file, RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
        String fileName = remoteTransactionIndexFileName(remoteLogSegmentMetadata);
        return new File(file, fileName);
    }

    /**
     * @param remoteLogSegmentMetadata metadata of the segment
     * @return the segment's file-name prefix followed by {@code LogFileUtils.TXN_INDEX_FILE_SUFFIX}
     */
    public static String remoteTransactionIndexFileName(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
        String prefix = generateFileNamePrefixForIndex(remoteLogSegmentMetadata);
        return prefix + LogFileUtils.TXN_INDEX_FILE_SUFFIX;
    }
}
