/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.streaming;

import java.util.ArrayList;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.streaming.StreamLockfile;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.StreamTask;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;

/**
 * Receiving side of a stream for a single column family.
 *
 * <p>Collects the {@link SSTableWriter}s handed to {@link #received(SSTableWriter)}; once
 * {@code totalFiles} writers have arrived, the task marks itself done and submits an
 * {@link OnCompletionRunnable} to a shared executor, which turns the writers into readers,
 * adds them to the column family store and notifies the owning session.
 *
 * <p>{@link #received(SSTableWriter)} and {@link #abort()} are {@code synchronized} on this
 * task and both become no-ops once {@code done} is set.
 */
public class StreamReceiveTask extends StreamTask {
    /**
     * Executor shared by all receive tasks for running {@link OnCompletionRunnable}s.
     * Created with {@code FBUtilities.getAvailableProcessors()} as the maximum pool size;
     * the {@code 60, SECONDS} arguments are presumably the idle keep-alive (confirm against
     * {@code DebuggableThreadPoolExecutor.createWithMaximumPoolSize}).
     */
    private static final ThreadPoolExecutor executor = DebuggableThreadPoolExecutor.createWithMaximumPoolSize("StreamReceiveTask", FBUtilities.getAvailableProcessors(), 60, TimeUnit.SECONDS);

    /** Number of files this task expects; completion is triggered when this many were received. */
    private final int totalFiles;
    /** Total size reported for this task; only stored and returned by {@link #getTotalSize()}. */
    private final long totalSize;
    /** Set once all files were received or the task was aborted; guarded by this task's monitor. */
    private boolean done = false;
    /** Writers received so far; cleared when they are aborted or converted to readers. */
    protected Collection<SSTableWriter> sstables;

    /**
     * @param session    session this task belongs to (passed to {@link StreamTask})
     * @param cfId       id of the column family being received (passed to {@link StreamTask})
     * @param totalFiles number of files expected; also used as the initial list capacity
     * @param totalSize  total size reported by {@link #getTotalSize()}
     */
    public StreamReceiveTask(StreamSession session, UUID cfId, int totalFiles, long totalSize) {
        super(session, cfId);
        this.totalFiles = totalFiles;
        this.totalSize = totalSize;
        this.sstables = new ArrayList<SSTableWriter>(totalFiles);
    }

    /**
     * Records a received writer. When the number of collected writers reaches
     * {@code totalFiles}, the task is marked done and its completion work is submitted to
     * the shared executor. Calls made after the task is done are ignored.
     *
     * @param sstable writer for a received file; asserted to belong to this task's {@code cfId}
     */
    public synchronized void received(SSTableWriter sstable) {
        if (this.done) {
            return;
        }
        assert (this.cfId.equals(sstable.metadata.cfId));
        this.sstables.add(sstable);
        if (this.sstables.size() == this.totalFiles) {
            // Flip the flag before submitting so a concurrent abort() cannot touch the
            // writers while the completion runnable is working on them.
            this.done = true;
            executor.submit(new OnCompletionRunnable(this));
        }
    }

    /** @return the number of files this task expects to receive */
    @Override
    public int getTotalNumberOfFiles() {
        return this.totalFiles;
    }

    /** @return the total size this task was created with */
    @Override
    public long getTotalSize() {
        return this.totalSize;
    }

    /**
     * Aborts every writer received so far and marks the task done. Does nothing if the
     * task is already done (all files received, or aborted earlier).
     */
    @Override
    public synchronized void abort() {
        if (this.done) {
            return;
        }
        this.done = true;
        this.abortAndClearWriters();
    }

    /**
     * Aborts each collected writer and empties the collection. Performs no locking itself;
     * callers are responsible for ensuring nothing else is using {@code sstables}.
     */
    private void abortAndClearWriters() {
        for (SSTableWriter writer : this.sstables) {
            writer.abort();
        }
        this.sstables.clear();
    }

    /**
     * Completion work for a task that has received all of its files: closes the writers,
     * opens readers on them, adds the readers to the column family store, triggers
     * secondary index builds and finally reports completion to the session.
     */
    private static class OnCompletionRunnable implements Runnable {
        private final StreamReceiveTask task;

        public OnCompletionRunnable(StreamReceiveTask task) {
            this.task = task;
        }

        @Override
        public void run() {
            Pair<String, String> kscf = Schema.instance.getCF(this.task.cfId);
            if (kscf == null) {
                // No keyspace/table is known for this cfId any more — presumably it was
                // dropped while the stream was in flight. Discard what was received.
                this.task.abortAndClearWriters();
                // Still report completion: the task is already marked done, so nothing
                // else will ever notify the session about it, exactly as the success
                // path does at the end of this method.
                this.task.session.taskCompleted(this.task);
                return;
            }
            ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);

            // The lockfile exists only while the writers are being closed and is removed
            // right after. NOTE(review): presumably it marks the in-progress set for
            // cleanup after a crash — confirm against StreamLockfile.
            StreamLockfile lockfile = new StreamLockfile(cfs.directories.getWriteableLocationAsFile(), UUID.randomUUID());
            lockfile.create(this.task.sstables);
            ArrayList<SSTableReader> readers = new ArrayList<SSTableReader>();
            for (SSTableWriter writer : this.task.sstables) {
                readers.add(writer.closeAndOpenReader());
            }
            lockfile.delete();
            this.task.sstables.clear();

            if (!SSTableReader.acquireReferences(readers)) {
                throw new AssertionError("We shouldn't fail acquiring a reference on a sstable that has just been transferred");
            }
            try {
                cfs.addSSTables(readers);
                cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
            }
            finally {
                // Always give back the references acquired above, even if adding the
                // sstables or building indexes fails.
                SSTableReader.releaseReferences(readers);
            }
            this.task.session.taskCompleted(this.task);
        }
    }
}

