/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryConfig;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.apache.kafka.streams.state.internals.BatchWritingStore;
import org.apache.kafka.streams.state.internals.BlockBasedTableConfigWithAccessibleCache;
import org.apache.kafka.streams.state.internals.KeyValueIterators;
import org.apache.kafka.streams.state.internals.ManagedKeyValueIterator;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter;
import org.apache.kafka.streams.state.internals.RocksDBRangeIterator;
import org.apache.kafka.streams.state.internals.RocksDbIterator;
import org.apache.kafka.streams.state.internals.StoreQueryUtils;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.Cache;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompactionStyle;
import org.rocksdb.CompressionType;
import org.rocksdb.DBOptions;
import org.rocksdb.Filter;
import org.rocksdb.FlushOptions;
import org.rocksdb.InfoLogLevel;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.Snapshot;
import org.rocksdb.Statistics;
import org.rocksdb.TableFormatConfig;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteBatchInterface;
import org.rocksdb.WriteBatchWithIndex;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RocksDBStore
implements KeyValueStore<Bytes, byte[]>,
BatchWritingStore {
    private static final Logger log = LoggerFactory.getLogger(RocksDBStore.class);
    // Compression and compaction defaults applied to the RocksDB options in openDB().
    private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION;
    private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL;
    // 0x1000000 = 16 MiB.
    private static final long WRITE_BUFFER_SIZE = 0x1000000L;
    // 0x3200000 = 50 MiB.
    private static final long BLOCK_CACHE_SIZE = 0x3200000L;
    private static final long BLOCK_SIZE = 4096L;
    private static final int MAX_WRITE_BUFFERS = 3;
    // Default parent directory (relative to the state directory) used by the public constructor.
    static final String DB_FILE_DIR = "rocksdb";
    // Store name; also the name of the database directory created below parentDir.
    final String name;
    private final String parentDir;
    // Iterators handed out by the auto-managed read methods; each removes itself when closed,
    // and any that remain are closed by close().
    final Set<KeyValueIterator<Bytes, byte[]>> openIterators = Collections.synchronizedSet(new HashSet());
    // Read from the "__iq.consistency.offset.vector.enabled__" internal config in init()
    // and forwarded to the changelog helper in restoreBatch().
    private boolean consistencyEnabled = false;
    protected File dbDir;
    RocksDB db;
    DBAccessor dbAccessor;
    ColumnFamilyAccessor cfAccessor;
    // Options adapter that receives the defaults from openDB() and is then handed to the
    // optional RocksDBConfigSetter.
    private RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter userSpecifiedOptions;
    WriteOptions wOptions;
    FlushOptions fOptions;
    private Cache cache;
    private BloomFilter filter;
    private Statistics statistics;
    private RocksDBConfigSetter configSetter;
    // True when the Statistics object came from the user-specified options rather than
    // being created by setupStatistics().
    private boolean userSpecifiedStatistics = false;
    private final RocksDBMetricsRecorder metricsRecorder;
    // Selects which overload family of the read methods is legal: when true, the methods
    // without an explicit iterator set must be used; when false, the ones taking a set.
    private final boolean autoManagedIterators;
    // Set to true at the end of openDB() and back to false at the start of close().
    protected volatile boolean open = false;
    protected StateStoreContext context;
    protected Position position;
    private OffsetCheckpoint positionCheckpoint;

    /**
     * Creates a store located under the default {@code "rocksdb"} parent directory with a
     * freshly created metrics recorder.
     *
     * @param name         the store name
     * @param metricsScope the scope passed to the new {@link RocksDBMetricsRecorder}
     */
    public RocksDBStore(String name, String metricsScope) {
        this(name, DB_FILE_DIR, new RocksDBMetricsRecorder(metricsScope, name));
    }

    /**
     * Creates a store with auto-managed iterators enabled.
     *
     * @param name            the store name
     * @param parentDir       directory (relative to the state directory) that holds the database directory
     * @param metricsRecorder recorder that receives this store's RocksDB metrics
     */
    RocksDBStore(String name, String parentDir, RocksDBMetricsRecorder metricsRecorder) {
        this(name, parentDir, metricsRecorder, true);
    }

    /**
     * Primary constructor; only records its arguments, the database itself is opened in
     * {@code init}.
     *
     * @param name                 the store name
     * @param parentDir            directory (relative to the state directory) that holds the database directory
     * @param metricsRecorder      recorder that receives this store's RocksDB metrics
     * @param autoManagedIterators whether the store tracks open iterators itself (see the read methods)
     */
    RocksDBStore(String name, String parentDir, RocksDBMetricsRecorder metricsRecorder, boolean autoManagedIterators) {
        this.autoManagedIterators = autoManagedIterators;
        this.metricsRecorder = metricsRecorder;
        this.parentDir = parentDir;
        this.name = name;
    }

    /**
     * Legacy initialization hook. Delegates to {@link #init(StateStoreContext, StateStore)}
     * when the supplied context also implements {@link StateStoreContext}.
     *
     * @throws UnsupportedOperationException if {@code context} is not a {@link StateStoreContext}
     */
    @Override
    @Deprecated
    public void init(ProcessorContext context, StateStore root) {
        if (context instanceof StateStoreContext) {
            init((StateStoreContext) context, root);
            return;
        }
        throw new UnsupportedOperationException("Use RocksDBStore#init(StateStoreContext, StateStore) instead.");
    }

    /**
     * Initializes the store: starts the metrics recorder, opens the database, loads the
     * persisted position from the {@code <name>.position} checkpoint file in the state
     * directory, and registers the restore and commit callbacks with the context.
     *
     * @param context the state store context supplying configs, task id and state directory
     * @param root    the root store to register with the context
     */
    @Override
    public void init(StateStoreContext context, StateStore root) {
        metricsRecorder.init(ProcessorContextUtils.getMetricsImpl(context), context.taskId());
        openDB(context.appConfigs(), context.stateDir());

        File positionCheckpointFile = new File(context.stateDir(), name() + ".position");
        this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
        this.position = StoreQueryUtils.readPositionFromCheckpoint(this.positionCheckpoint);
        this.context = context;

        // restoreBatch takes a single Collection of records, so it cannot bind to the
        // two-argument StateRestoreCallback#restore(byte[], byte[]). The explicit cast
        // selects the batching callback interface as the method reference's target type.
        context.register(
            root,
            (org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback) this::restoreBatch,
            () -> StoreQueryUtils.checkpointPosition(this.positionCheckpoint, this.position));

        this.consistencyEnabled = StreamsConfig.InternalConfig.getBoolean(context.appConfigs(), "__iq.consistency.offset.vector.enabled__", false);
    }

    /**
     * Builds the default RocksDB options, lets the optional user-supplied
     * {@link RocksDBConfigSetter} adjust them, creates the database directory and opens
     * the database. Marks the store as open once the database handle exists.
     *
     * @param configs  application configs; {@code "rocksdb.config.setter"} may name a config setter class
     * @param stateDir base state directory; the database lives at {@code stateDir/parentDir/name}
     * @throws ProcessorStateException if the directories cannot be created
     */
    void openDB(Map<String, Object> configs, File stateDir) {
        // Option holders shared between the adapter and the actual open call.
        DBOptions dbOptions = new DBOptions();
        ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions();
        userSpecifiedOptions = new RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(dbOptions, columnFamilyOptions);

        // Table format: block cache, block size and bloom filter.
        BlockBasedTableConfigWithAccessibleCache tableConfig = new BlockBasedTableConfigWithAccessibleCache();
        cache = new LRUCache(BLOCK_CACHE_SIZE);
        tableConfig.setBlockCache(cache);
        tableConfig.setBlockSize(BLOCK_SIZE);
        filter = new BloomFilter();
        tableConfig.setFilterPolicy(filter);

        // Defaults; a user-supplied config setter may override any of them below.
        userSpecifiedOptions.optimizeFiltersForHits();
        userSpecifiedOptions.setTableFormatConfig(tableConfig);
        userSpecifiedOptions.setWriteBufferSize(WRITE_BUFFER_SIZE);
        userSpecifiedOptions.setCompressionType(COMPRESSION_TYPE);
        userSpecifiedOptions.setCompactionStyle(COMPACTION_STYLE);
        userSpecifiedOptions.setMaxWriteBufferNumber(MAX_WRITE_BUFFERS);
        userSpecifiedOptions.setCreateIfMissing(true);
        userSpecifiedOptions.setErrorIfExists(false);
        userSpecifiedOptions.setInfoLogLevel(InfoLogLevel.ERROR_LEVEL);
        userSpecifiedOptions.setIncreaseParallelism(Math.max(Runtime.getRuntime().availableProcessors(), 2));

        wOptions = new WriteOptions();
        wOptions.setDisableWAL(true);
        fOptions = new FlushOptions();
        fOptions.setWaitForFlush(true);

        Class<?> configSetterClass = (Class<?>) configs.get("rocksdb.config.setter");
        if (configSetterClass != null) {
            configSetter = (RocksDBConfigSetter) Utils.newInstance(configSetterClass);
            configSetter.setConfig(name, userSpecifiedOptions, configs);
        }

        dbDir = new File(new File(stateDir, parentDir), name);
        try {
            Files.createDirectories(dbDir.getParentFile().toPath());
            Files.createDirectories(dbDir.getAbsoluteFile().toPath());
        } catch (IOException fatal) {
            throw new ProcessorStateException(fatal);
        }

        setupStatistics(configs, dbOptions);
        openRocksDB(dbOptions, columnFamilyOptions);
        dbAccessor = new DirectDBAccessor(db, fOptions, wOptions);
        open = true;
        addValueProvidersToMetricsRecorder();
    }

    /**
     * Decides which {@link Statistics} object the store uses. Statistics already present
     * on the user-specified options are kept and flagged as user-specified; otherwise a new
     * instance is created and installed on {@code dbOptions} only when the
     * {@code "metrics.recording.level"} config resolves to {@code DEBUG}.
     *
     * @param configs   application configs
     * @param dbOptions options that receive a newly created statistics object
     */
    private void setupStatistics(Map<String, Object> configs, DBOptions dbOptions) {
        statistics = userSpecifiedOptions.statistics();
        if (statistics != null) {
            userSpecifiedStatistics = true;
            return;
        }
        String recordingLevel = (String) configs.get("metrics.recording.level");
        if (Sensor.RecordingLevel.forName(recordingLevel) == Sensor.RecordingLevel.DEBUG) {
            statistics = new Statistics();
            dbOptions.setStatistics(statistics);
        }
        userSpecifiedStatistics = false;
    }

    /**
     * Registers the database, its block cache (when accessible) and the store-owned
     * statistics with the metrics recorder. User-specified statistics are not passed on.
     *
     * @throws ProcessorStateException if a plain {@link BlockBasedTableConfig} was installed,
     *         because its block cache cannot be read back
     */
    private void addValueProvidersToMetricsRecorder() {
        TableFormatConfig tableFormatConfig = userSpecifiedOptions.tableFormatConfig();
        Statistics ownedStatistics = userSpecifiedStatistics ? null : statistics;

        if (tableFormatConfig instanceof BlockBasedTableConfigWithAccessibleCache) {
            Cache blockCache = ((BlockBasedTableConfigWithAccessibleCache) tableFormatConfig).blockCache();
            metricsRecorder.addValueProviders(name, db, blockCache, ownedStatistics);
            return;
        }
        if (tableFormatConfig instanceof BlockBasedTableConfig) {
            throw new ProcessorStateException("The used block-based table format configuration does not expose the block cache. Use the BlockBasedTableConfig instance provided by Options#tableFormatConfig() to configure the block-based table format of RocksDB. Do not provide a new instance of BlockBasedTableConfig to the RocksDB options.");
        }
        metricsRecorder.addValueProviders(name, db, null, ownedStatistics);
    }

    /**
     * Opens the database with only the default column family and installs a
     * {@link SingleColumnFamilyAccessor} bound to it.
     *
     * @param dbOptions           database-level options
     * @param columnFamilyOptions options for the default column family
     */
    void openRocksDB(DBOptions dbOptions, ColumnFamilyOptions columnFamilyOptions) {
        ColumnFamilyDescriptor defaultDescriptor = new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions);
        List<ColumnFamilyHandle> handles = openRocksDB(dbOptions, defaultDescriptor, new ColumnFamilyDescriptor[0]);
        cfAccessor = new SingleColumnFamilyAccessor(handles.get(0));
    }

    /**
     * Opens the database at {@code dbDir} with the given column families, creating any of
     * the extra column families that do not exist on disk yet.
     *
     * <p>The default column family is always opened. Extra descriptors are split into
     * those already present in the database (opened together with it) and those missing
     * (created right after opening).
     *
     * @param dbOptions                     database-level options
     * @param defaultColumnFamilyDescriptor descriptor of the default column family
     * @param columnFamilyDescriptors       descriptors of additional column families
     * @return the column family handles in the order default first, then
     *         {@code columnFamilyDescriptors} order
     * @throws ProcessorStateException if RocksDB reports an error while listing, opening or creating
     */
    protected List<ColumnFamilyHandle> openRocksDB(DBOptions dbOptions, ColumnFamilyDescriptor defaultColumnFamilyDescriptor, ColumnFamilyDescriptor ... columnFamilyDescriptors) {
        String absolutePath = this.dbDir.getAbsolutePath();
        List<ColumnFamilyDescriptor> extraDescriptors = Arrays.asList(columnFamilyDescriptors);
        List<ColumnFamilyDescriptor> allDescriptors = new ArrayList<>(1 + columnFamilyDescriptors.length);
        allDescriptors.add(defaultColumnFamilyDescriptor);
        allDescriptors.addAll(extraDescriptors);
        try {
            // Names must be typed as byte[] so they can be compared with Arrays.equals.
            List<byte[]> allExisting = RocksDB.listColumnFamilies(this.userSpecifiedOptions, absolutePath);

            List<ColumnFamilyDescriptor> existingDescriptors = new ArrayList<>(allDescriptors.size());
            List<ColumnFamilyDescriptor> toCreate = new ArrayList<>(extraDescriptors.size());
            existingDescriptors.add(defaultColumnFamilyDescriptor);
            for (ColumnFamilyDescriptor descriptor : extraDescriptors) {
                boolean alreadyExists = allExisting.stream()
                    .anyMatch(existingName -> Arrays.equals(existingName, descriptor.getName()));
                if (alreadyExists) {
                    existingDescriptors.add(descriptor);
                } else {
                    toCreate.add(descriptor);
                }
            }

            List<ColumnFamilyHandle> existingColumnFamilies = new ArrayList<>(existingDescriptors.size());
            this.db = RocksDB.open(dbOptions, absolutePath, existingDescriptors, existingColumnFamilies);
            List<ColumnFamilyHandle> createdColumnFamilies = this.db.createColumnFamilies(toCreate);
            return this.mergeColumnFamilyHandleLists(existingColumnFamilies, createdColumnFamilies, allDescriptors);
        }
        catch (RocksDBException e) {
            throw new ProcessorStateException("Error opening store " + this.name + " at location " + this.dbDir.toString(), e);
        }
    }

    /**
     * Interleaves the handles of already-existing and newly created column families so
     * that the result follows the order of {@code allDescriptors}. At each position the
     * next unused existing handle is tried first, then the next unused created handle;
     * a handle is accepted when its name equals the descriptor's name.
     *
     * @return one handle per descriptor, in descriptor order
     * @throws IllegalStateException if neither candidate handle matches the descriptor at some position
     * @throws RocksDBException      if a handle's name cannot be read
     */
    private List<ColumnFamilyHandle> mergeColumnFamilyHandleLists(List<ColumnFamilyHandle> existingColumnFamilyHandles, List<ColumnFamilyHandle> createdColumnFamilyHandles, List<ColumnFamilyDescriptor> allDescriptors) throws RocksDBException {
        List<ColumnFamilyHandle> merged = new ArrayList<>(allDescriptors.size());
        int existingIdx = 0;
        int createdIdx = 0;
        while (existingIdx + createdIdx < allDescriptors.size()) {
            ColumnFamilyDescriptor expected = allDescriptors.get(existingIdx + createdIdx);
            ColumnFamilyHandle nextExisting = null;
            if (existingIdx < existingColumnFamilyHandles.size()) {
                nextExisting = existingColumnFamilyHandles.get(existingIdx);
            }
            ColumnFamilyHandle nextCreated = null;
            if (createdIdx < createdColumnFamilyHandles.size()) {
                nextCreated = createdColumnFamilyHandles.get(createdIdx);
            }

            if (nextExisting != null && Arrays.equals(nextExisting.getName(), expected.getName())) {
                merged.add(nextExisting);
                existingIdx++;
            } else if (nextCreated != null && Arrays.equals(nextCreated.getName(), expected.getName())) {
                merged.add(nextCreated);
                createdIdx++;
            } else {
                throw new IllegalStateException("Unable to match up column family handles with descriptors.");
            }
        }
        return merged;
    }

    /** Returns the name this store was constructed with. */
    @Override
    public String name() {
        return name;
    }

    /** Always reports {@code true}. */
    @Override
    public boolean persistent() {
        return true;
    }

    /** Returns the current value of the volatile {@code open} flag. */
    @Override
    public boolean isOpen() {
        return open;
    }

    /**
     * Guards operations that need an open database.
     *
     * @throws InvalidStateStoreException if the store is not open
     */
    private void validateStoreOpen() {
        if (open) {
            return;
        }
        throw new InvalidStateStoreException("Store " + name + " is currently closed");
    }

    /**
     * Obtains a snapshot from the underlying database. Hand it back through
     * {@link #releaseSnapshot(Snapshot)}.
     */
    public Snapshot getSnapshot() {
        return db.getSnapshot();
    }

    /**
     * Hands a snapshot back to the underlying database.
     *
     * @param snapshot the snapshot to release
     */
    public void releaseSnapshot(Snapshot snapshot) {
        db.releaseSnapshot(snapshot);
    }

    /**
     * Writes a key/value pair and then updates the store position, both while holding
     * the position's monitor.
     *
     * @param key   the key; must not be null
     * @param value the value to store
     * @throws NullPointerException       if {@code key} is null
     * @throws InvalidStateStoreException if the store is not open
     */
    @Override
    public synchronized void put(Bytes key, byte[] value) {
        Objects.requireNonNull(key, "key cannot be null");
        validateStoreOpen();
        synchronized (position) {
            cfAccessor.put(dbAccessor, key.get(), value);
            StoreQueryUtils.updatePosition(position, context);
        }
    }

    /**
     * Stores {@code value} under {@code key} only when the key currently has no value.
     *
     * @return the value that was already present, or {@code null} if the new value was written
     * @throws NullPointerException if {@code key} is null
     */
    @Override
    public synchronized byte[] putIfAbsent(Bytes key, byte[] value) {
        Objects.requireNonNull(key, "key cannot be null");
        byte[] existing = get(key);
        if (existing != null) {
            return existing;
        }
        put(key, value);
        return null;
    }

    /**
     * Writes all entries through a single {@link WriteBatch} and then updates the store
     * position, holding the position's monitor throughout.
     *
     * @param entries the key/value pairs to write
     * @throws ProcessorStateException if RocksDB rejects the batch
     */
    @Override
    public void putAll(List<KeyValue<Bytes, byte[]>> entries) {
        synchronized (position) {
            try (WriteBatch writeBatch = new WriteBatch()) {
                cfAccessor.prepareBatch(entries, writeBatch);
                write(writeBatch);
                StoreQueryUtils.updatePosition(position, context);
            } catch (RocksDBException e) {
                throw new ProcessorStateException("Error while batch writing to store " + name, e);
            }
        }
    }

    /**
     * Answers an interactive query by delegating to
     * {@code StoreQueryUtils.handleBasicQueries} with this store, its position and context.
     */
    @Override
    public <R> QueryResult<R> query(Query<R> query, PositionBound positionBound, QueryConfig config) {
        QueryResult<R> result = StoreQueryUtils.handleBasicQueries(query, positionBound, config, this, position, context);
        return result;
    }

    /**
     * Prefix scan for stores with auto-managed iterators; the returned iterator is
     * tracked in this store's own {@code openIterators} set.
     *
     * @throws IllegalStateException if the store was created without auto-managed iterators
     */
    @Override
    public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(P prefix, PS prefixKeySerializer) {
        if (autoManagedIterators) {
            return doPrefixScan(prefix, prefixKeySerializer, openIterators);
        }
        throw new IllegalStateException("Must specify openIterators in call to prefixScan()");
    }

    /**
     * Prefix scan for stores without auto-managed iterators; the returned iterator is
     * tracked in the caller-supplied set.
     *
     * @throws IllegalStateException if the store uses auto-managed iterators
     */
    <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(P prefix, PS prefixKeySerializer, Set<KeyValueIterator<Bytes, byte[]>> openIterators) {
        if (!autoManagedIterators) {
            return doPrefixScan(prefix, prefixKeySerializer, openIterators);
        }
        throw new IllegalStateException("Cannot specify openIterators when using auto-managed iterators");
    }

    /**
     * Shared prefix-scan implementation: serializes the prefix (with a {@code null}
     * topic), opens the iterator through the column family accessor, and registers it in
     * {@code openIterators} until it is closed.
     *
     * @throws InvalidStateStoreException if the store is not open
     * @throws NullPointerException       if {@code prefix} or {@code prefixKeySerializer} is null
     */
    <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> doPrefixScan(P prefix, PS prefixKeySerializer, Set<KeyValueIterator<Bytes, byte[]>> openIterators) {
        validateStoreOpen();
        Objects.requireNonNull(prefix, "prefix cannot be null");
        Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer cannot be null");

        byte[] serializedPrefix = prefixKeySerializer.serialize(null, prefix);
        ManagedKeyValueIterator<Bytes, byte[]> iterator = cfAccessor.prefixScan(dbAccessor, Bytes.wrap(serializedPrefix));
        openIterators.add(iterator);
        // The iterator deregisters itself when the caller closes it.
        iterator.onClose(() -> openIterators.remove(iterator));
        return iterator;
    }

    /**
     * Looks up the value for {@code key} without explicit read options.
     *
     * @return the stored value as returned by the column family accessor
     */
    @Override
    public synchronized byte[] get(Bytes key) {
        Optional<ReadOptions> noReadOptions = Optional.empty();
        return get(key, noReadOptions);
    }

    /**
     * Looks up the value for {@code key} using the given read options.
     *
     * @param readOptions the read options; must not be null
     * @throws NullPointerException if {@code readOptions} is null
     */
    public synchronized byte[] get(Bytes key, ReadOptions readOptions) {
        Optional<ReadOptions> explicitOptions = Optional.of(readOptions);
        return get(key, explicitOptions);
    }

    /**
     * Shared lookup implementation for both public {@code get} overloads.
     *
     * @throws InvalidStateStoreException if the store is not open
     * @throws ProcessorStateException    if RocksDB reports an error
     */
    private synchronized byte[] get(Bytes key, Optional<ReadOptions> readOptions) {
        validateStoreOpen();
        try {
            if (readOptions.isPresent()) {
                return cfAccessor.get(dbAccessor, key.get(), readOptions.get());
            }
            return cfAccessor.get(dbAccessor, key.get());
        } catch (RocksDBException e) {
            throw new ProcessorStateException("Error while getting value for key from store " + name, e);
        }
    }

    /**
     * Removes {@code key} from the store and returns the value it had.
     *
     * <p>The store-open check runs before the accessor is touched, so a closed store
     * reports {@link InvalidStateStoreException} (as {@code put} and {@code get} do)
     * instead of failing on the accessor that {@code close()} sets to null.
     *
     * @param key the key to remove; must not be null
     * @return the previous value as returned by the accessor's {@code getOnly}
     * @throws NullPointerException       if {@code key} is null
     * @throws InvalidStateStoreException if the store is not open
     * @throws ProcessorStateException    if RocksDB reports an error while reading the old value
     */
    @Override
    public synchronized byte[] delete(Bytes key) {
        byte[] oldValue;
        Objects.requireNonNull(key, "key cannot be null");
        this.validateStoreOpen();
        try {
            oldValue = this.cfAccessor.getOnly(this.dbAccessor, key.get());
        }
        catch (RocksDBException e) {
            throw new ProcessorStateException("Error while getting value for key from store " + this.name, e);
        }
        // A null value is how put() is asked to remove the key.
        this.put(key, null);
        return oldValue;
    }

    /**
     * Deletes a key range. The upper bound handed to the accessor is
     * {@code Bytes.increment(keyTo)}.
     *
     * @throws NullPointerException       if either bound is null
     * @throws InvalidStateStoreException if the store is not open
     */
    void deleteRange(Bytes keyFrom, Bytes keyTo) {
        Objects.requireNonNull(keyFrom, "keyFrom cannot be null");
        Objects.requireNonNull(keyTo, "keyTo cannot be null");
        validateStoreOpen();

        byte[] lowerBound = keyFrom.get();
        byte[] upperBound = Bytes.increment(keyTo).get();
        cfAccessor.deleteRange(dbAccessor, lowerBound, upperBound);
    }

    /**
     * Forward range scan for stores with auto-managed iterators.
     *
     * @throws IllegalStateException if the store was created without auto-managed iterators
     */
    @Override
    public synchronized KeyValueIterator<Bytes, byte[]> range(Bytes from, Bytes to) {
        if (autoManagedIterators) {
            return range(from, to, true, openIterators);
        }
        throw new IllegalStateException("Must specify openIterators in call to range()");
    }

    /**
     * Forward range scan that tracks the iterator in a caller-supplied set.
     *
     * @throws IllegalStateException if the store uses auto-managed iterators
     */
    synchronized KeyValueIterator<Bytes, byte[]> range(Bytes from, Bytes to, Set<KeyValueIterator<Bytes, byte[]>> openIterators) {
        if (!autoManagedIterators) {
            return range(from, to, true, openIterators);
        }
        throw new IllegalStateException("Cannot specify openIterators when using auto-managed iterators");
    }

    /**
     * Reverse range scan for stores with auto-managed iterators.
     *
     * @throws IllegalStateException if the store was created without auto-managed iterators
     */
    @Override
    public synchronized KeyValueIterator<Bytes, byte[]> reverseRange(Bytes from, Bytes to) {
        if (autoManagedIterators) {
            return range(from, to, false, openIterators);
        }
        throw new IllegalStateException("Must specify openIterators in call to reverseRange()");
    }

    /**
     * Reverse range scan that tracks the iterator in a caller-supplied set.
     *
     * @throws IllegalStateException if the store uses auto-managed iterators
     */
    synchronized KeyValueIterator<Bytes, byte[]> reverseRange(Bytes from, Bytes to, Set<KeyValueIterator<Bytes, byte[]>> openIterators) {
        if (!autoManagedIterators) {
            return range(from, to, false, openIterators);
        }
        throw new IllegalStateException("Cannot specify openIterators when using auto-managed iterators");
    }

    /**
     * Shared range-scan implementation. When both bounds are given and {@code from} is
     * greater than {@code to}, a warning is logged and an empty iterator is returned
     * without checking whether the store is open. Otherwise the iterator is opened
     * through the column family accessor and registered in {@code openIterators} until closed.
     *
     * @param forward {@code true} for ascending iteration, {@code false} for descending
     * @throws InvalidStateStoreException if the store is not open (valid ranges only)
     */
    private KeyValueIterator<Bytes, byte[]> range(Bytes from, Bytes to, boolean forward, Set<KeyValueIterator<Bytes, byte[]>> openIterators) {
        boolean bothBoundsGiven = from != null && to != null;
        if (bothBoundsGiven && from.compareTo(to) > 0) {
            log.warn("Returning empty iterator for fetch with invalid key range: from > to. This may be due to range arguments set in the wrong order, or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. Note that the built-in numerical serdes do not follow this for negative numbers");
            return KeyValueIterators.emptyIterator();
        }

        validateStoreOpen();
        ManagedKeyValueIterator<Bytes, byte[]> iterator = cfAccessor.range(dbAccessor, from, to, forward);
        openIterators.add(iterator);
        // The iterator deregisters itself when the caller closes it.
        iterator.onClose(() -> openIterators.remove(iterator));
        return iterator;
    }

    /**
     * Forward full scan for stores with auto-managed iterators.
     *
     * @throws IllegalStateException if the store was created without auto-managed iterators
     */
    @Override
    public synchronized KeyValueIterator<Bytes, byte[]> all() {
        if (autoManagedIterators) {
            return all(true, openIterators);
        }
        throw new IllegalStateException("Must specify openIterators in call to all()");
    }

    /**
     * Forward full scan that tracks the iterator in a caller-supplied set.
     *
     * @throws IllegalStateException if the store uses auto-managed iterators
     */
    synchronized KeyValueIterator<Bytes, byte[]> all(Set<KeyValueIterator<Bytes, byte[]>> openIterators) {
        if (!autoManagedIterators) {
            return all(true, openIterators);
        }
        throw new IllegalStateException("Cannot specify openIterators when using auto-managed iterators");
    }

    /**
     * Reverse full scan for stores with auto-managed iterators.
     *
     * @throws IllegalStateException if the store was created without auto-managed iterators
     */
    @Override
    public KeyValueIterator<Bytes, byte[]> reverseAll() {
        if (autoManagedIterators) {
            return all(false, openIterators);
        }
        throw new IllegalStateException("Must specify openIterators in call to reverseAll()");
    }

    /**
     * Reverse full scan that tracks the iterator in a caller-supplied set.
     *
     * @throws IllegalStateException if the store uses auto-managed iterators
     */
    KeyValueIterator<Bytes, byte[]> reverseAll(Set<KeyValueIterator<Bytes, byte[]>> openIterators) {
        if (!autoManagedIterators) {
            return all(false, openIterators);
        }
        throw new IllegalStateException("Cannot specify openIterators when using auto-managed iterators");
    }

    /**
     * Shared full-scan implementation: opens the iterator through the column family
     * accessor and registers it in {@code openIterators} until it is closed.
     *
     * @param forward {@code true} for ascending iteration, {@code false} for descending
     * @throws InvalidStateStoreException if the store is not open
     */
    private KeyValueIterator<Bytes, byte[]> all(boolean forward, Set<KeyValueIterator<Bytes, byte[]>> openIterators) {
        validateStoreOpen();
        ManagedKeyValueIterator<Bytes, byte[]> iterator = cfAccessor.all(dbAccessor, forward);
        openIterators.add(iterator);
        // The iterator deregisters itself when the caller closes it.
        iterator.onClose(() -> openIterators.remove(iterator));
        return iterator;
    }

    /**
     * Returns the entry-count estimate reported by the column family accessor. A negative
     * estimate is treated as an overflow and reported as {@link Long#MAX_VALUE}.
     *
     * @throws InvalidStateStoreException if the store is not open
     * @throws ProcessorStateException    if RocksDB reports an error
     */
    @Override
    public long approximateNumEntries() {
        validateStoreOpen();
        long estimate;
        try {
            estimate = cfAccessor.approximateNumEntries(dbAccessor);
        } catch (RocksDBException e) {
            throw new ProcessorStateException("Error fetching property from store " + name, e);
        }
        return isOverflowing(estimate) ? Long.MAX_VALUE : estimate;
    }

    /** Reports whether {@code value} is negative, which callers interpret as a wrapped-around count. */
    private boolean isOverflowing(long value) {
        return value < 0;
    }

    /**
     * Flushes the column family through the accessor. Does nothing when no database
     * handle is present.
     *
     * @throws ProcessorStateException if RocksDB reports an error
     */
    @Override
    public synchronized void flush() {
        if (db != null) {
            try {
                cfAccessor.flush(dbAccessor);
            } catch (RocksDBException e) {
                throw new ProcessorStateException("Error while executing flush from store " + name, e);
            }
        }
    }

    /**
     * Adds one record to {@code batch} via the column family accessor.
     *
     * @throws RocksDBException if the batch rejects the operation
     */
    @Override
    public void addToBatch(KeyValue<byte[], byte[]> record, WriteBatchInterface batch) throws RocksDBException {
        byte[] rawKey = record.key;
        byte[] rawValue = record.value;
        cfAccessor.addToBatch(rawKey, rawValue, batch);
    }

    /**
     * Writes a batch to the database using the store's write options. Only
     * {@link WriteBatch} and {@link WriteBatchWithIndex} are supported.
     *
     * @throws RocksDBException      if the write fails
     * @throws IllegalStateException if {@code batch} is of any other type
     */
    @Override
    public void write(WriteBatchInterface batch) throws RocksDBException {
        if (batch instanceof WriteBatch) {
            db.write(wOptions, (WriteBatch) batch);
            return;
        }
        if (batch instanceof WriteBatchWithIndex) {
            db.write(wOptions, (WriteBatchWithIndex) batch);
            return;
        }
        String batchType = batch.getClass().getCanonicalName();
        log.error("Unknown type of batch {}. This is a bug in Kafka Streams. Please file a bug report at https://issues.apache.org/jira/projects/KAFKA.", batchType);
        throw new IllegalStateException("Unknown type of batch " + batchType);
    }

    /**
     * Closes the store and releases every RocksDB object it created. Calling it on a
     * store that is not open is a no-op.
     */
    @Override
    public synchronized void close() {
        if (!this.open) {
            return;
        }
        // Flip the flag first so validateStoreOpen() rejects new operations from here on.
        this.open = false;
        this.closeOpenIterators();
        // Give the user-supplied config setter the chance to release what it allocated.
        if (this.configSetter != null) {
            this.configSetter.close(this.name, this.userSpecifiedOptions);
            this.configSetter = null;
        }
        this.metricsRecorder.removeValueProviders(this.name);
        // NOTE(review): the close order below (column family accessor, DB accessor, DB,
        // options, then auxiliary objects) looks deliberate; presumably the native handles
        // must be released in dependency order -- do not reorder without confirming.
        this.cfAccessor.close();
        this.dbAccessor.close();
        this.db.close();
        this.userSpecifiedOptions.close();
        this.wOptions.close();
        this.fOptions.close();
        this.filter.close();
        this.cache.close();
        // statistics stays null when none was configured (see setupStatistics).
        if (this.statistics != null) {
            this.statistics.close();
        }
        // Drop all references to the closed objects.
        this.cfAccessor = null;
        this.dbAccessor = null;
        this.userSpecifiedOptions = null;
        this.wOptions = null;
        this.fOptions = null;
        this.db = null;
        this.filter = null;
        this.cache = null;
        this.statistics = null;
    }

    /**
     * Closes every iterator still registered in {@code openIterators}. The set is copied
     * under its own monitor first, because closing an iterator removes it from the set.
     */
    private void closeOpenIterators() {
        Set<KeyValueIterator<Bytes, byte[]>> toClose;
        synchronized (openIterators) {
            toClose = new HashSet<>(openIterators);
        }
        if (toClose.isEmpty()) {
            return;
        }
        log.warn("Closing {} open iterators for store {}", toClose.size(), name);
        toClose.forEach(KeyValueIterator::close);
    }

    /**
     * Applies a batch of changelog records to the store. Each record is passed to
     * {@code ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition} and added
     * to one {@link WriteBatch}, which is written once all records are added. The whole
     * operation holds the position's monitor.
     *
     * @param records the changelog records to restore
     * @throws ProcessorStateException if RocksDB reports an error
     */
    void restoreBatch(Collection<ConsumerRecord<byte[], byte[]>> records) {
        synchronized (position) {
            try (WriteBatch writeBatch = new WriteBatch()) {
                for (ConsumerRecord<byte[], byte[]> changelogRecord : records) {
                    ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(changelogRecord, consistencyEnabled, position);
                    cfAccessor.addToBatch(changelogRecord.key(), changelogRecord.value(), writeBatch);
                }
                write(writeBatch);
            } catch (RocksDBException e) {
                throw new ProcessorStateException("Error restoring batch to store " + name, e);
            }
        }
    }

    /** Returns the options adapter built in {@code openDB}; {@code null} after {@code close()}. */
    public Options getOptions() {
        return userSpecifiedOptions;
    }

    /** Returns the position object this store updates on writes and restores. */
    @Override
    public Position getPosition() {
        return position;
    }

    /**
     * Increments {@code input} via {@code Bytes.increment}, translating the
     * {@link IndexOutOfBoundsException} it may throw into a {@code null} result.
     *
     * @return the incremented bytes, or {@code null} if the increment failed with
     *         {@link IndexOutOfBoundsException}
     */
    static Bytes incrementWithoutOverflow(Bytes input) {
        Bytes incremented;
        try {
            incremented = Bytes.increment(input);
        } catch (IndexOutOfBoundsException overflow) {
            incremented = null;
        }
        return incremented;
    }

    class SingleColumnFamilyAccessor
    implements ColumnFamilyAccessor {
        private final ColumnFamilyHandle columnFamily;

        SingleColumnFamilyAccessor(ColumnFamilyHandle columnFamily) {
            this.columnFamily = columnFamily;
        }

        @Override
        public void put(DBAccessor accessor, byte[] key, byte[] value) {
            if (value == null) {
                try {
                    accessor.delete(this.columnFamily, key);
                }
                catch (RocksDBException e) {
                    throw new ProcessorStateException("Error while removing key from store " + RocksDBStore.this.name, e);
                }
            }
            try {
                accessor.put(this.columnFamily, key, value);
            }
            catch (RocksDBException e) {
                throw new ProcessorStateException("Error while putting key/value into store " + RocksDBStore.this.name, e);
            }
        }

        @Override
        public void prepareBatch(List<KeyValue<Bytes, byte[]>> entries, WriteBatchInterface batch) throws RocksDBException {
            for (KeyValue<Bytes, byte[]> entry : entries) {
                Objects.requireNonNull(entry.key, "key cannot be null");
                this.addToBatch(((Bytes)entry.key).get(), (byte[])entry.value, batch);
            }
        }

        @Override
        public byte[] get(DBAccessor accessor, byte[] key) throws RocksDBException {
            return accessor.get(this.columnFamily, key);
        }

        @Override
        public byte[] get(DBAccessor accessor, byte[] key, ReadOptions readOptions) throws RocksDBException {
            return accessor.get(this.columnFamily, readOptions, key);
        }

        @Override
        public byte[] getOnly(DBAccessor accessor, byte[] key) throws RocksDBException {
            return this.get(accessor, key);
        }

        @Override
        public ManagedKeyValueIterator<Bytes, byte[]> range(DBAccessor accessor, Bytes from, Bytes to, boolean forward) {
            return new RocksDBRangeIterator(RocksDBStore.this.name, accessor.newIterator(this.columnFamily), from, to, forward, true);
        }

        @Override
        public void deleteRange(DBAccessor accessor, byte[] from, byte[] to) {
            try {
                accessor.deleteRange(this.columnFamily, from, to);
            }
            catch (RocksDBException e) {
                throw new ProcessorStateException("Error while removing key from store " + RocksDBStore.this.name, e);
            }
        }

        @Override
        public ManagedKeyValueIterator<Bytes, byte[]> all(DBAccessor accessor, boolean forward) {
            RocksIterator innerIterWithTimestamp = accessor.newIterator(this.columnFamily);
            if (forward) {
                innerIterWithTimestamp.seekToFirst();
            } else {
                innerIterWithTimestamp.seekToLast();
            }
            return new RocksDbIterator(RocksDBStore.this.name, innerIterWithTimestamp, forward);
        }

        /**
         * Opens a forward range iterator starting at {@code prefix}, bounded above by
         * {@code RocksDBStore.incrementWithoutOverflow(prefix)}.
         *
         * @param accessor the DB accessor used to create the underlying RocksDB iterator
         * @param prefix   key prefix to scan; also used as the lower bound of the range
         * @return a {@code RocksDBRangeIterator} covering the computed range
         */
        @Override
        public ManagedKeyValueIterator<Bytes, byte[]> prefixScan(DBAccessor accessor, Bytes prefix) {
            // Compute the bound before opening the iterator so a failure here opens nothing.
            final Bytes upperBound = RocksDBStore.incrementWithoutOverflow(prefix);
            final RocksIterator rawIterator = accessor.newIterator(columnFamily);
            // NOTE(review): the trailing 'false' presumably makes the upper bound exclusive;
            // confirm against the RocksDBRangeIterator constructor.
            return new RocksDBRangeIterator(RocksDBStore.this.name, rawIterator, prefix, upperBound, true, false);
        }

        /**
         * Asks the DB accessor for the approximate entry count of this accessor's column family.
         *
         * @param accessor the DB accessor the query is routed through
         * @return the value reported by {@code accessor.approximateNumEntries}
         * @throws RocksDBException propagated from the DB accessor
         */
        @Override
        public long approximateNumEntries(DBAccessor accessor) throws RocksDBException {
            final long estimate = accessor.approximateNumEntries(columnFamily);
            return estimate;
        }

        /**
         * Flushes this accessor's column family through the given DB accessor.
         *
         * @param accessor the DB accessor that performs the flush
         * @throws RocksDBException propagated from {@code accessor.flush}
         */
        @Override
        public void flush(DBAccessor accessor) throws RocksDBException {
            // Only this accessor's own column family is passed to the varargs flush.
            accessor.flush(columnFamily);
        }

        /**
         * Queues one write for this accessor's column family on {@code batch}.
         *
         * @param key   raw key bytes
         * @param value raw value bytes; {@code null} queues a delete of {@code key} instead of a put
         * @param batch the write batch the operation is appended to
         * @throws RocksDBException propagated from the batch operation
         */
        @Override
        public void addToBatch(byte[] key, byte[] value, WriteBatchInterface batch) throws RocksDBException {
            if (value != null) {
                batch.put(columnFamily, key, value);
            } else {
                // A null value is how callers request removal of the key.
                batch.delete(columnFamily, key);
            }
        }

        /**
         * Closes the column family handle held by this accessor.
         */
        @Override
        public void close() {
            columnFamily.close();
        }
    }

    /**
     * Column-family-level read and write operations of the store. Every operation that
     * touches the database receives the {@link DBAccessor} it must route the call through.
     */
    interface ColumnFamilyAccessor {
        /**
         * Writes an entry.
         *
         * @param accessor the DB accessor the write is routed through
         * @param key      raw key bytes
         * @param value    raw value bytes (NOTE(review): null handling is implementation-defined; confirm)
         */
        void put(DBAccessor accessor, byte[] key, byte[] value);

        /**
         * Adds the given entries to a write batch.
         *
         * @param entries key/value pairs to add
         * @param batch   the write batch to populate
         * @throws RocksDBException if adding to the batch fails
         */
        void prepareBatch(List<KeyValue<Bytes, byte[]>> entries, WriteBatchInterface batch) throws RocksDBException;

        /**
         * Reads the value for a key.
         *
         * @param accessor the DB accessor the read is routed through
         * @param key      raw key bytes
         * @return the stored bytes as reported by the DB accessor
         * @throws RocksDBException if the read fails
         */
        byte[] get(DBAccessor accessor, byte[] key) throws RocksDBException;

        /**
         * Reads the value for a key using explicit read options.
         *
         * @param accessor    the DB accessor the read is routed through
         * @param key         raw key bytes
         * @param readOptions read options to apply to the lookup
         * @return the stored bytes as reported by the DB accessor
         * @throws RocksDBException if the read fails
         */
        byte[] get(DBAccessor accessor, byte[] key, ReadOptions readOptions) throws RocksDBException;

        /**
         * Variant of {@link #get(DBAccessor, byte[])}.
         * NOTE(review): how it is meant to differ from {@code get} is implementation-specific; confirm.
         *
         * @param accessor the DB accessor the read is routed through
         * @param key      raw key bytes
         * @return the stored bytes as reported by the implementation
         * @throws RocksDBException if the read fails
         */
        byte[] getOnly(DBAccessor accessor, byte[] key) throws RocksDBException;

        /**
         * Opens an iterator over a key range.
         *
         * @param accessor the DB accessor used to create the underlying iterator
         * @param from     lower bound of the range
         * @param to       upper bound of the range
         * @param forward  iteration direction flag
         * @return an iterator over the requested range
         */
        ManagedKeyValueIterator<Bytes, byte[]> range(DBAccessor accessor, Bytes from, Bytes to, boolean forward);

        /**
         * Deletes a range of keys.
         *
         * @param accessor the DB accessor the delete is routed through
         * @param from     start-of-range key bytes
         * @param to       end-of-range key bytes
         */
        void deleteRange(DBAccessor accessor, byte[] from, byte[] to);

        /**
         * Opens an iterator over all entries.
         *
         * @param accessor the DB accessor used to create the underlying iterator
         * @param forward  iteration direction flag
         * @return an iterator over all entries
         */
        ManagedKeyValueIterator<Bytes, byte[]> all(DBAccessor accessor, boolean forward);

        /**
         * Opens an iterator over the entries whose key starts with a prefix.
         *
         * @param accessor the DB accessor used to create the underlying iterator
         * @param prefix   the key prefix to scan
         * @return an iterator over the matching entries
         */
        ManagedKeyValueIterator<Bytes, byte[]> prefixScan(DBAccessor accessor, Bytes prefix);

        /**
         * Returns an approximate entry count.
         *
         * @param accessor the DB accessor the query is routed through
         * @return the approximate number of entries
         * @throws RocksDBException if the query fails
         */
        long approximateNumEntries(DBAccessor accessor) throws RocksDBException;

        /**
         * Flushes through the given DB accessor.
         *
         * @param accessor the DB accessor that performs the flush
         * @throws RocksDBException if the flush fails
         */
        void flush(DBAccessor accessor) throws RocksDBException;

        /**
         * Appends a single write to a batch.
         *
         * @param key   raw key bytes
         * @param value raw value bytes (NOTE(review): a null value is presumably queued as a
         *              delete by implementations; confirm)
         * @param batch the write batch the operation is appended to
         * @throws RocksDBException if adding to the batch fails
         */
        void addToBatch(byte[] key, byte[] value, WriteBatchInterface batch) throws RocksDBException;

        /**
         * Releases the resources held by this accessor.
         */
        void close();
    }

    /**
     * {@link DBAccessor} that forwards every call straight to the wrapped {@code RocksDB}
     * handle, applying the write options and flush options supplied at construction.
     */
    static class DirectDBAccessor
    implements DBAccessor {
        private final RocksDB db;
        private final FlushOptions flushOptions;
        private final WriteOptions wOptions;

        /**
         * @param db           the database handle every operation is forwarded to
         * @param flushOptions options passed to each flush call
         * @param wOptions     options passed to each put, delete and deleteRange call
         */
        DirectDBAccessor(RocksDB db, FlushOptions flushOptions, WriteOptions wOptions) {
            this.db = db;
            this.flushOptions = flushOptions;
            this.wOptions = wOptions;
        }

        /** Reads {@code key} from {@code columnFamily} directly from the database. */
        @Override
        public byte[] get(ColumnFamilyHandle columnFamily, byte[] key) throws RocksDBException {
            final byte[] storedValue = db.get(columnFamily, key);
            return storedValue;
        }

        /** Reads {@code key} from {@code columnFamily} using the supplied read options. */
        @Override
        public byte[] get(ColumnFamilyHandle columnFamily, ReadOptions readOptions, byte[] key) throws RocksDBException {
            final byte[] storedValue = db.get(columnFamily, readOptions, key);
            return storedValue;
        }

        /** Creates a new database iterator over {@code columnFamily}. */
        @Override
        public RocksIterator newIterator(ColumnFamilyHandle columnFamily) {
            final RocksIterator iterator = db.newIterator(columnFamily);
            return iterator;
        }

        /** Writes {@code key}/{@code value} to {@code columnFamily} with the configured write options. */
        @Override
        public void put(ColumnFamilyHandle columnFamily, byte[] key, byte[] value) throws RocksDBException {
            db.put(columnFamily, wOptions, key, value);
        }

        /** Deletes {@code key} from {@code columnFamily} with the configured write options. */
        @Override
        public void delete(ColumnFamilyHandle columnFamily, byte[] key) throws RocksDBException {
            db.delete(columnFamily, wOptions, key);
        }

        /** Deletes the range {@code from}..{@code to} from {@code columnFamily} with the configured write options. */
        @Override
        public void deleteRange(ColumnFamilyHandle columnFamily, byte[] from, byte[] to) throws RocksDBException {
            db.deleteRange(columnFamily, wOptions, from, to);
        }

        /** Returns the database's {@code rocksdb.estimate-num-keys} property for {@code columnFamily}. */
        @Override
        public long approximateNumEntries(ColumnFamilyHandle columnFamily) throws RocksDBException {
            final long estimatedKeys = db.getLongProperty(columnFamily, "rocksdb.estimate-num-keys");
            return estimatedKeys;
        }

        /**
         * Flushes the database, choosing the {@code RocksDB.flush} overload by the number of
         * column families passed: none, exactly one, or several.
         */
        @Override
        public void flush(ColumnFamilyHandle ... columnFamilies) throws RocksDBException {
            switch (columnFamilies.length) {
                case 0:
                    // No handle given: use the overload that takes only the flush options.
                    db.flush(flushOptions);
                    break;
                case 1:
                    db.flush(flushOptions, columnFamilies[0]);
                    break;
                default:
                    db.flush(flushOptions, Arrays.asList(columnFamilies));
                    break;
            }
        }

        /** Intentionally empty: this accessor performs no action on reset. */
        @Override
        public void reset() {
            // no-op
        }

        /** Intentionally empty: the database and option handles are left untouched. */
        @Override
        public void close() {
            // no-op
        }
    }

    /**
     * Abstraction over the database operations the store issues against a column family:
     * point reads, iteration, writes, deletes, size estimation and flushing.
     */
    interface DBAccessor {
        /**
         * Reads the value stored under a key.
         *
         * @param columnFamily the column family to read from
         * @param key          raw key bytes
         * @return the stored bytes as reported by the underlying database
         * @throws RocksDBException if the read fails
         */
        byte[] get(ColumnFamilyHandle columnFamily, byte[] key) throws RocksDBException;

        /**
         * Reads the value stored under a key using explicit read options.
         *
         * @param columnFamily the column family to read from
         * @param readOptions  read options to apply to the lookup
         * @param key          raw key bytes
         * @return the stored bytes as reported by the underlying database
         * @throws RocksDBException if the read fails
         */
        byte[] get(ColumnFamilyHandle columnFamily, ReadOptions readOptions, byte[] key) throws RocksDBException;

        /**
         * Creates an iterator over a column family.
         *
         * @param columnFamily the column family to iterate
         * @return a new iterator (NOTE(review): presumably the caller is responsible for closing it; confirm)
         */
        RocksIterator newIterator(ColumnFamilyHandle columnFamily);

        /**
         * Writes a key/value pair.
         *
         * @param columnFamily the column family to write to
         * @param key          raw key bytes
         * @param value        raw value bytes
         * @throws RocksDBException if the write fails
         */
        void put(ColumnFamilyHandle columnFamily, byte[] key, byte[] value) throws RocksDBException;

        /**
         * Deletes a key.
         *
         * @param columnFamily the column family to delete from
         * @param key          raw key bytes
         * @throws RocksDBException if the delete fails
         */
        void delete(ColumnFamilyHandle columnFamily, byte[] key) throws RocksDBException;

        /**
         * Deletes a range of keys.
         *
         * @param columnFamily the column family to delete from
         * @param from         start-of-range key bytes
         * @param to           end-of-range key bytes
         * @throws RocksDBException if the delete fails
         */
        void deleteRange(ColumnFamilyHandle columnFamily, byte[] from, byte[] to) throws RocksDBException;

        /**
         * Returns an approximate entry count for a column family.
         *
         * @param columnFamily the column family to inspect
         * @return the approximate number of entries
         * @throws RocksDBException if the query fails
         */
        long approximateNumEntries(ColumnFamilyHandle columnFamily) throws RocksDBException;

        /**
         * Flushes the given column families.
         *
         * @param columnFamilies the column families to flush; may be empty
         * @throws RocksDBException if the flush fails
         */
        void flush(ColumnFamilyHandle ... columnFamilies) throws RocksDBException;

        /**
         * Resets any state held by this accessor.
         * NOTE(review): what state is reset is implementation-specific; confirm.
         */
        void reset();

        /**
         * Releases any resources held by this accessor.
         */
        void close();
    }
}

