/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.db.commitlog;

import com.google.common.base.Predicate;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import com.google.common.collect.Ordering;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.Cell;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ColumnSerializer;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.UnknownColumnFamilyException;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.PureJavaCrc32;
import org.apache.cassandra.utils.WrappedRunnable;
import org.apache.commons.lang3.StringUtils;
import org.cliffc.high_scale_lib.NonBlockingHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CommitLogReplayer {
    private static final Logger logger = LoggerFactory.getLogger(CommitLogReplayer.class);
    private static final int MAX_OUTSTANDING_REPLAY_COUNT = 1024;
    private static final int LEGACY_END_OF_SEGMENT_MARKER = 0;
    private final Set<Keyspace> keyspacesRecovered = new NonBlockingHashSet();
    private final List<Future<?>> futures = new ArrayList();
    private final Map<UUID, AtomicInteger> invalidMutations;
    private final AtomicInteger replayedCount;
    private final Map<UUID, ReplayPosition> cfPositions;
    private final ReplayPosition globalPosition;
    private final PureJavaCrc32 checksum;
    private byte[] buffer = new byte[4096];

    public CommitLogReplayer() {
        this.invalidMutations = new HashMap<UUID, AtomicInteger>();
        this.replayedCount = new AtomicInteger();
        this.checksum = new PureJavaCrc32();
        this.cfPositions = new HashMap<UUID, ReplayPosition>();
        Ordering replayPositionOrdering = Ordering.from(ReplayPosition.comparator);
        for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) {
            ReplayPosition rp = ReplayPosition.getReplayPosition(cfs.getSSTables());
            ReplayPosition truncatedAt = SystemKeyspace.getTruncatedPosition(cfs.metadata.cfId);
            if (truncatedAt != null) {
                rp = (ReplayPosition)replayPositionOrdering.max(Arrays.asList(rp, truncatedAt));
            }
            this.cfPositions.put(cfs.metadata.cfId, rp);
        }
        this.globalPosition = (ReplayPosition)replayPositionOrdering.min(this.cfPositions.values());
        logger.debug("Global replay position is {} from columnfamilies {}", (Object)this.globalPosition, (Object)FBUtilities.toString(this.cfPositions));
    }

    public void recover(File[] clogs) throws IOException {
        for (File file : clogs) {
            this.recover(file);
        }
    }

    public int blockForWrites() {
        for (Map.Entry<UUID, AtomicInteger> entry : this.invalidMutations.entrySet()) {
            logger.info(String.format("Skipped %d mutations from unknown (probably removed) CF with id %s", entry.getValue().intValue(), entry.getKey()));
        }
        FBUtilities.waitOnFutures(this.futures);
        logger.debug("Finished waiting on mutations from recovery");
        this.futures.clear();
        for (Keyspace keyspace : this.keyspacesRecovered) {
            this.futures.addAll(keyspace.flush());
        }
        FBUtilities.waitOnFutures(this.futures);
        return this.replayedCount.get();
    }

    private int readSyncMarker(CommitLogDescriptor descriptor, int offset, RandomAccessReader reader) throws IOException {
        if ((long)offset > reader.length() - 8L) {
            if ((long)offset != reader.length() && offset != Integer.MAX_VALUE) {
                logger.warn("Encountered bad header at position {} of Commit log {}; not enough room for a header", (Object)offset, (Object)reader.getPath());
            }
            return -1;
        }
        reader.seek(offset);
        PureJavaCrc32 crc = new PureJavaCrc32();
        crc.updateInt((int)(descriptor.id & 0xFFFFFFFFL));
        crc.updateInt((int)(descriptor.id >>> 32));
        crc.updateInt((int)reader.getPosition());
        int end = reader.readInt();
        long filecrc = descriptor.version < 4 ? reader.readLong() : (long)reader.readInt() & 0xFFFFFFFFL;
        if (crc.getValue() != filecrc) {
            if (end != 0 || filecrc != 0L) {
                logger.warn("Encountered bad header at position {} of commit log {}, with invalid CRC. The end of segment marker should be zero.", (Object)offset, (Object)reader.getPath());
            }
            return -1;
        }
        if (end < offset || (long)end > reader.length()) {
            logger.warn("Encountered bad header at position {} of commit log {}, with bad position but valid CRC", (Object)offset, (Object)reader.getPath());
            return -1;
        }
        return end;
    }

    private int getStartOffset(long segmentId, int version) {
        if (this.globalPosition.segment < segmentId) {
            if (version >= 4) {
                return 24;
            }
            return 0;
        }
        if (this.globalPosition.segment == segmentId) {
            return this.globalPosition.position;
        }
        return -1;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void recover(File file) throws IOException {
        block33: {
            final ReplayFilter replayFilter = ReplayFilter.create();
            logger.info("Replaying {}", (Object)file.getPath());
            CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
            final long segmentId = desc.id;
            logger.info("Replaying {} (CL version {}, messaging version {})", new Object[]{file.getPath(), desc.version, desc.getMessagingVersion()});
            RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath()));
            try {
                assert (reader.length() <= Integer.MAX_VALUE);
                int offset = this.getStartOffset(segmentId, desc.version);
                if (offset < 0) {
                    logger.debug("skipping replay of fully-flushed {}", (Object)file);
                    return;
                }
                int prevEnd = 16;
                while (true) {
                    int end = prevEnd;
                    if (desc.version < 4) {
                        end = Integer.MAX_VALUE;
                    } else {
                        while ((end = this.readSyncMarker(desc, end, reader)) < offset && end > prevEnd) {
                        }
                    }
                    if (end < prevEnd) {
                        break;
                    }
                    if (logger.isDebugEnabled()) {
                        logger.debug("Replaying {} between {} and {}", new Object[]{file, offset, end});
                    }
                    reader.seek(offset);
                    while (reader.getPosition() < (long)end && !reader.isEOF()) {
                        Mutation mutation;
                        long claimedCRC32;
                        int serializedSize;
                        if (logger.isDebugEnabled()) {
                            logger.debug("Reading mutation at {}", (Object)reader.getFilePointer());
                        }
                        try {
                            serializedSize = reader.readInt();
                            if (serializedSize == 0) {
                                logger.debug("Encountered end of segment marker at {}", (Object)reader.getFilePointer());
                                break block33;
                            }
                            if (serializedSize < 10) {
                                break block33;
                            }
                            long claimedSizeChecksum = desc.version < 4 ? reader.readLong() : (long)reader.readInt() & 0xFFFFFFFFL;
                            this.checksum.reset();
                            if (desc.version < 3) {
                                this.checksum.update(serializedSize);
                            } else {
                                this.checksum.updateInt(serializedSize);
                            }
                            if (this.checksum.getValue() != claimedSizeChecksum) {
                                break block33;
                            }
                            if (serializedSize > this.buffer.length) {
                                this.buffer = new byte[(int)(1.2 * (double)serializedSize)];
                            }
                            reader.readFully(this.buffer, 0, serializedSize);
                            claimedCRC32 = desc.version < 4 ? reader.readLong() : (long)reader.readInt() & 0xFFFFFFFFL;
                        }
                        catch (EOFException eof) {
                            break block33;
                        }
                        this.checksum.update(this.buffer, 0, serializedSize);
                        if (claimedCRC32 != this.checksum.getValue()) continue;
                        FastByteArrayInputStream bufIn = new FastByteArrayInputStream(this.buffer, 0, serializedSize);
                        try {
                            mutation = Mutation.serializer.deserialize(new DataInputStream(bufIn), desc.getMessagingVersion(), ColumnSerializer.Flag.LOCAL);
                            for (ColumnFamily cf : mutation.getColumnFamilies()) {
                                for (Cell cell : cf) {
                                    cf.getComparator().validate(cell.name());
                                }
                            }
                        }
                        catch (UnknownColumnFamilyException ex) {
                            if (ex.cfId == null) continue;
                            AtomicInteger i = this.invalidMutations.get(ex.cfId);
                            if (i == null) {
                                i = new AtomicInteger(1);
                                this.invalidMutations.put(ex.cfId, i);
                                continue;
                            }
                            i.incrementAndGet();
                            continue;
                        }
                        catch (Throwable t) {
                            JVMStabilityInspector.inspectThrowable(t);
                            File f = File.createTempFile("mutation", "dat");
                            try (DataOutputStream out = new DataOutputStream(new FileOutputStream(f));){
                                out.write(this.buffer, 0, serializedSize);
                            }
                            String st = String.format("Unexpected error deserializing mutation; saved to %s and ignored.  This may be caused by replaying a mutation against a table with the same name but incompatible schema.  Exception follows: ", f.getAbsolutePath());
                            logger.error(st, t);
                            continue;
                        }
                        if (logger.isDebugEnabled()) {
                            logger.debug("replaying mutation for {}.{}: {}", new Object[]{mutation.getKeyspaceName(), ByteBufferUtil.bytesToHex(mutation.key()), "{" + StringUtils.join(mutation.getColumnFamilies().iterator(), (String)", ") + "}"});
                        }
                        final long entryLocation = reader.getFilePointer();
                        WrappedRunnable runnable = new WrappedRunnable(){

                            @Override
                            public void runMayThrow() throws IOException {
                                if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null) {
                                    return;
                                }
                                if (CommitLogReplayer.this.pointInTimeExceeded(mutation)) {
                                    return;
                                }
                                Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
                                Mutation newMutation = null;
                                for (ColumnFamily columnFamily : replayFilter.filter(mutation)) {
                                    if (Schema.instance.getCF(columnFamily.id()) == null) continue;
                                    ReplayPosition rp = (ReplayPosition)CommitLogReplayer.this.cfPositions.get(columnFamily.id());
                                    if (segmentId <= rp.segment && (segmentId != rp.segment || entryLocation <= (long)rp.position)) continue;
                                    if (newMutation == null) {
                                        newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
                                    }
                                    newMutation.add(columnFamily);
                                    CommitLogReplayer.this.replayedCount.incrementAndGet();
                                }
                                if (newMutation != null) {
                                    assert (!newMutation.isEmpty());
                                    Keyspace.open(newMutation.getKeyspaceName()).apply(newMutation, false);
                                    CommitLogReplayer.this.keyspacesRecovered.add(keyspace);
                                }
                            }
                        };
                        this.futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable));
                        if (this.futures.size() <= 1024) continue;
                        FBUtilities.waitOnFutures(this.futures);
                        this.futures.clear();
                    }
                    if (desc.version < 4) {
                        break;
                    }
                    offset = end + 8;
                    prevEnd = end;
                }
            }
            finally {
                FileUtils.closeQuietly(reader);
                logger.info("Finished reading {}", (Object)file);
            }
        }
    }

    protected boolean pointInTimeExceeded(Mutation fm) {
        long restoreTarget = CommitLog.instance.archiver.restorePointInTime;
        for (ColumnFamily families : fm.getColumnFamilies()) {
            if (CommitLog.instance.archiver.precision.toMillis(families.maxTimestamp()) <= restoreTarget) continue;
            return true;
        }
        return false;
    }

    private static class CustomReplayFilter
    extends ReplayFilter {
        private Multimap<String, String> toReplay;

        public CustomReplayFilter(Multimap<String, String> toReplay) {
            this.toReplay = toReplay;
        }

        @Override
        public Iterable<ColumnFamily> filter(Mutation mutation) {
            final Collection cfNames = this.toReplay.get((Object)mutation.getKeyspaceName());
            if (cfNames == null) {
                return Collections.emptySet();
            }
            return Iterables.filter(mutation.getColumnFamilies(), (Predicate)new Predicate<ColumnFamily>(){

                public boolean apply(ColumnFamily cf) {
                    return cfNames.contains(cf.metadata().cfName);
                }
            });
        }
    }

    private static class AlwaysReplayFilter
    extends ReplayFilter {
        private AlwaysReplayFilter() {
        }

        @Override
        public Iterable<ColumnFamily> filter(Mutation mutation) {
            return mutation.getColumnFamilies();
        }
    }

    private static abstract class ReplayFilter {
        private ReplayFilter() {
        }

        public abstract Iterable<ColumnFamily> filter(Mutation var1);

        public static ReplayFilter create() {
            if (System.getProperty("cassandra.replayList") == null) {
                return new AlwaysReplayFilter();
            }
            HashMultimap toReplay = HashMultimap.create();
            for (String rawPair : System.getProperty("cassandra.replayList").split(",")) {
                String[] pair = rawPair.trim().split("\\.");
                if (pair.length != 2) {
                    throw new IllegalArgumentException("Each table to be replayed must be fully qualified with keyspace name, e.g., 'system.peers'");
                }
                Keyspace ks = Schema.instance.getKeyspaceInstance(pair[0]);
                if (ks == null) {
                    throw new IllegalArgumentException("Unknown keyspace " + pair[0]);
                }
                if (ks.getColumnFamilyStore(pair[1]) == null) {
                    throw new IllegalArgumentException(String.format("Unknown table %s.%s", pair[0], pair[1]));
                }
                toReplay.put((Object)pair[0], (Object)pair[1]);
            }
            return new CustomReplayFilter((Multimap<String, String>)toReplay);
        }
    }
}

