/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.streaming.scheduler;

import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.Logging;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.scheduler.AllocatedBlocks;
import org.apache.spark.streaming.scheduler.BatchAllocationEvent;
import org.apache.spark.streaming.scheduler.BatchCleanupEvent;
import org.apache.spark.streaming.scheduler.BlockAdditionEvent;
import org.apache.spark.streaming.scheduler.ReceivedBlockInfo;
import org.apache.spark.streaming.scheduler.ReceivedBlockTracker$;
import org.apache.spark.streaming.scheduler.ReceivedBlockTrackerLogEvent;
import org.apache.spark.streaming.util.WriteAheadLog;
import org.apache.spark.streaming.util.WriteAheadLogUtils$;
import org.apache.spark.util.Clock;
import org.apache.spark.util.Utils$;
import org.slf4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Map;
import scala.collection.mutable.HashMap;
import scala.collection.mutable.Queue;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.Nothing$;
import scala.util.control.NonFatal$;

@ScalaSignature(bytes="\u0006\u0001\u0005}f!B\u0001\u0003\u0001\u0011a!\u0001\u0006*fG\u0016Lg/\u001a3CY>\u001c7\u000e\u0016:bG.,'O\u0003\u0002\u0004\t\u0005I1o\u00195fIVdWM\u001d\u0006\u0003\u000b\u0019\t\u0011b\u001d;sK\u0006l\u0017N\\4\u000b\u0005\u001dA\u0011!B:qCJ\\'BA\u0005\u000b\u0003\u0019\t\u0007/Y2iK*\t1\"A\u0002pe\u001e\u001c2\u0001A\u0007\u0014!\tq\u0011#D\u0001\u0010\u0015\u0005\u0001\u0012!B:dC2\f\u0017B\u0001\n\u0010\u0005\u0019\te.\u001f*fMB\u0011A#F\u0007\u0002\r%\u0011aC\u0002\u0002\b\u0019><w-\u001b8h\u0011!A\u0002A!A!\u0002\u0013Q\u0012\u0001B2p]\u001a\u001c\u0001\u0001\u0005\u0002\u00157%\u0011AD\u0002\u0002\n'B\f'o[\"p]\u001aD\u0001B\b\u0001\u0003\u0002\u0003\u0006IaH\u0001\u000bQ\u0006$wn\u001c9D_:4\u0007C\u0001\u0011%\u001b\u0005\t#B\u0001\r#\u0015\t\u0019\u0003\"\u0001\u0004iC\u0012|w\u000e]\u0005\u0003K\u0005\u0012QbQ8oM&<WO]1uS>t\u0007\u0002C\u0014\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u0015\u0002\u0013M$(/Z1n\u0013\u0012\u001c\bcA\u00152i9\u0011!f\f\b\u0003W9j\u0011\u0001\f\u0006\u0003[e\ta\u0001\u0010:p_Rt\u0014\"\u0001\t\n\u0005Az\u0011a\u00029bG.\fw-Z\u0005\u0003eM\u00121aU3r\u0015\t\u0001t\u0002\u0005\u0002\u000fk%\u0011ag\u0004\u0002\u0004\u0013:$\b\u0002\u0003\u001d\u0001\u0005\u0003\u0005\u000b\u0011B\u001d\u0002\u000b\rdwnY6\u0011\u0005ijT\"A\u001e\u000b\u0005q2\u0011\u0001B;uS2L!AP\u001e\u0003\u000b\rcwnY6\t\u0011\u0001\u0003!\u0011!Q\u0001\n\u0005\u000b\u0001D]3d_Z,'O\u0012:p[^\u0013\u0018\u000e^3BQ\u0016\fG\rT8h!\tq!)\u0003\u0002D\u001f\t9!i\\8mK\u0006t\u0007\u0002C#\u0001\u0005\u0003\u0005\u000b\u0011\u0002$\u0002'\rDWmY6q_&tG\u000fR5s\u001fB$\u0018n\u001c8\u0011\u000799\u0015*\u0003\u0002I\u001f\t1q\n\u001d;j_:\u0004\"AS'\u000f\u00059Y\u0015B\u0001'\u0010\u0003\u0019\u0001&/\u001a3fM&\u0011aj\u0014\u0002\u0007'R\u0014\u0018N\\4\u000b\u00051{\u0001\"B)\u0001\t\u0003\u0011\u0016A\u0002\u001fj]&$h\bF\u0004T+Z;\u0006,\u0017.\u0011\u0005Q\u0003Q\"\u0001\u0002\t\u000ba\u0001\u0006\u0019\u0001\u000e\t\u000by\u0001\u0006\u0019A\u0010\t\u000b\u001d\u0002\u0006\u0019\u0001\u0015\t\u000ba\u0002\u0006\u0019A\u001d\t\u000b\u0001\u0003\u0006\u0019A!\t\u000b\u0015\u0003\u0006\u0019\u0001$\u0006\tq\u0003A!\u0018\u0002\u0013%\u0016\u001cW-\u001b<fI\ncwnY6Rk\u0016,X\rE\u0002_G\u0016l\u0011a\u0018\u0006\u0003A\u0006\fq!\\;uC\ndWM\u0003\u0002c\u001f\u0005Q1m\u001c7mK\u000e$\u0018n\u001c8\n\u0005\u0011|&!B)vKV,\u0007C\u0001+g\u0013\t9'AA\tSK\u000e,\u0017N^3e\u00052|7m[%oM>Dq!\u001b\u0001C\u0002\u0013%!.\u0001\u0011tiJ,\u0017-\\%e)>,f.\u00197m_\u000e\fG/\u001a3CY>\u001c7.U;fk\u0016\u001cX#A6\u0011\tycGG\\\u0005\u0003[~\u0013q\u0001S1tQ6\u000b\u0007\u000f\u0005\u0002p76\t\u0001\u0001\u0003\u0004r\u0001\u0001\u0006Ia[\u0001\"gR\u0014X-Y7JIR{WK\\1mY>\u001c\u0017\r^3e\u00052|7m[)vKV,7\u000f\t\u0005\bg\u0002\u0011\r\u0011\"\u0003u\u0003U!\u0018.\\3U_\u0006cGn\\2bi\u0016$'\t\\8dWN,\u0012!\u001e\t\u0005=24(\u0010\u0005\u0002xq6\tA!\u0003\u0002z\t\t!A+[7f!\t!60\u0003\u0002}\u0005\ty\u0011\t\u001c7pG\u0006$X\r\u001a\"m_\u000e\\7\u000f\u0003\u0004\u007f\u0001\u0001\u0006I!^\u0001\u0017i&lW\rV8BY2|7-\u0019;fI\ncwnY6tA!I\u0011\u0011\u0001\u0001C\u0002\u0013%\u00111A\u0001\u0014oJLG/Z!iK\u0006$Gj\\4PaRLwN\\\u000b\u0003\u0003\u000b\u0001BAD$\u0002\bA!\u0011\u0011BA\u0007\u001b\t\tYA\u0003\u0002=\t%!\u0011qBA\u0006\u000559&/\u001b;f\u0003\",\u0017\r\u001a'pO\"A\u00111\u0003\u0001!\u0002\u0013\t)!\u0001\u000bxe&$X-\u00115fC\u0012dunZ(qi&|g\u000e\t\u0005\n\u0003/\u0001\u0001\u0019!C\u0005\u00033\ta\u0003\\1ti\u0006cGn\\2bi\u0016$')\u0019;dQRKW.Z\u000b\u0002m\"I\u0011Q\u0004\u0001A\u0002\u0013%\u0011qD\u0001\u001bY\u0006\u001cH/\u00117m_\u000e\fG/\u001a3CCR\u001c\u0007\u000eV5nK~#S-\u001d\u000b\u0005\u0003C\t9\u0003E\u0002\u000f\u0003GI1!!\n\u0010\u0005\u0011)f.\u001b;\t\u0013\u0005%\u00121DA\u0001\u0002\u00041\u0018a\u0001=%c!9\u0011Q\u0006\u0001!B\u00131\u0018a\u00067bgR\fE\u000e\\8dCR,GMQ1uG\"$\u0016.\\3!\u0011\u001d\t\t\u0004\u0001C\u0001\u0003g\t\u0001\"\u00193e\u00052|7m\u001b\u000b\u0004\u0003\u0006U\u0002bBA\u001c\u0003_\u0001\r!Z\u0001\u0012e\u0016\u001cW-\u001b<fI\ncwnY6J]\u001a|\u0007bBA\u001e\u0001\u0011\u0005\u0011QH\u0001\u0016C2dwnY1uK\ncwnY6t)>\u0014\u0015\r^2i)\u0011\t\t#a\u0010\t\u000f\u0005\u0005\u0013\u0011\ba\u0001m\u0006I!-\u0019;dQRKW.\u001a\u0005\b\u0003\u000b\u0002A\u0011AA$\u0003A9W\r\u001e\"m_\u000e\\7o\u00144CCR\u001c\u0007\u000e\u0006\u0003\u0002J\u0005E\u0003C\u0002&\u0002LQ\ny%C\u0002\u0002N=\u00131!T1q!\rI\u0013'\u001a\u0005\b\u0003\u0003\n\u0019\u00051\u0001w\u0011\u001d\t)\u0006\u0001C\u0001\u0003/\n\u0011dZ3u\u00052|7m[:PM\n\u000bGo\u00195B]\u0012\u001cFO]3b[R1\u0011qJA-\u00037Bq!!\u0011\u0002T\u0001\u0007a\u000fC\u0004\u0002^\u0005M\u0003\u0019\u0001\u001b\u0002\u0011M$(/Z1n\u0013\u0012Dq!!\u0019\u0001\t\u0003\t\u0019'\u0001\u000fiCN,f.\u00197m_\u000e\fG/\u001a3SK\u000e,\u0017N^3e\u00052|7m[:\u0016\u0003\u0005Cq!a\u001a\u0001\t\u0003\tI'\u0001\u000bhKR,f.\u00197m_\u000e\fG/\u001a3CY>\u001c7n\u001d\u000b\u0005\u0003\u001f\nY\u0007C\u0004\u0002^\u0005\u0015\u0004\u0019\u0001\u001b\t\u000f\u0005=\u0004\u0001\"\u0001\u0002r\u0005\t2\r\\3b]V\u0004x\n\u001c3CCR\u001c\u0007.Z:\u0015\r\u0005\u0005\u00121OA<\u0011\u001d\t)(!\u001cA\u0002Y\f\u0011c\u00197fC:,\b\u000f\u00165sKNDG+[7f\u0011\u001d\tI(!\u001cA\u0002\u0005\u000b\u0011c^1ji\u001a{'oQ8na2,G/[8o\u0011\u001d\ti\b\u0001C\u0001\u0003\u007f\nAa\u001d;paR\u0011\u0011\u0011\u0005\u0005\b\u0003\u0007\u0003A\u0011BA@\u0003E\u0011XmY8wKJ\u0004\u0016m\u001d;Fm\u0016tGo\u001d\u0005\b\u0003\u000f\u0003A\u0011BAE\u0003)9(/\u001b;f)>dun\u001a\u000b\u0004\u0003\u0006-\u0005\u0002CAG\u0003\u000b\u0003\r!a$\u0002\rI,7m\u001c:e!\r!\u0016\u0011S\u0005\u0004\u0003'\u0013!\u0001\b*fG\u0016Lg/\u001a3CY>\u001c7\u000e\u0016:bG.,'\u000fT8h\u000bZ,g\u000e\u001e\u0005\b\u0003/\u0003A\u0011BAM\u0003U9W\r\u001e*fG\u0016Lg/\u001a3CY>\u001c7.U;fk\u0016$2A\\AN\u0011\u001d\ti&!&A\u0002QBq!a(\u0001\t\u0013\t\t+A\nde\u0016\fG/Z,sSR,\u0017\t[3bI2{w\r\u0006\u0002\u0002\u0006!A\u0011Q\u0015\u0001\u0005\u0002\u0011\t\u0019'\u0001\fjg^\u0013\u0018\u000e^3BQ\u0016\fG\rT8h\u000b:\f'\r\\3e\u000f!\tIK\u0001E\u0001\t\u0005-\u0016\u0001\u0006*fG\u0016Lg/\u001a3CY>\u001c7\u000e\u0016:bG.,'\u000fE\u0002U\u0003[3q!\u0001\u0002\t\u0002\u0011\tykE\u0002\u0002.6Aq!UAW\t\u0003\t\u0019\f\u0006\u0002\u0002,\"A\u0011qWAW\t\u0003\tI,A\u000bdQ\u0016\u001c7\u000e]8j]R$\u0015N\u001d+p\u0019><G)\u001b:\u0015\u0007%\u000bY\fC\u0004\u0002>\u0006U\u0006\u0019A%\u0002\u001b\rDWmY6q_&tG\u000fR5s\u0001")
public class ReceivedBlockTracker
implements Logging {
    public final SparkConf org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$conf;
    public final Configuration org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$hadoopConf;
    private final Seq<Object> streamIds;
    private final Clock clock;
    public final Option<String> org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$checkpointDirOption;
    private final HashMap<Object, Queue<ReceivedBlockInfo>> streamIdToUnallocatedBlockQueues;
    private final HashMap<Time, AllocatedBlocks> timeToAllocatedBlocks;
    private final Option<WriteAheadLog> writeAheadLogOption;
    private Time lastAllocatedBatchTime;
    private transient Logger org$apache$spark$Logging$$log_;

    public static String checkpointDirToLogDir(String string) {
        return ReceivedBlockTracker$.MODULE$.checkpointDirToLogDir(string);
    }

    public Logger org$apache$spark$Logging$$log_() {
        return this.org$apache$spark$Logging$$log_;
    }

    public void org$apache$spark$Logging$$log__$eq(Logger x$1) {
        this.org$apache$spark$Logging$$log_ = x$1;
    }

    public String logName() {
        return Logging.class.logName((Logging)this);
    }

    public Logger log() {
        return Logging.class.log((Logging)this);
    }

    public void logInfo(Function0<String> msg) {
        Logging.class.logInfo((Logging)this, msg);
    }

    public void logDebug(Function0<String> msg) {
        Logging.class.logDebug((Logging)this, msg);
    }

    public void logTrace(Function0<String> msg) {
        Logging.class.logTrace((Logging)this, msg);
    }

    public void logWarning(Function0<String> msg) {
        Logging.class.logWarning((Logging)this, msg);
    }

    public void logError(Function0<String> msg) {
        Logging.class.logError((Logging)this, msg);
    }

    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.class.logInfo((Logging)this, msg, (Throwable)throwable);
    }

    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.class.logDebug((Logging)this, msg, (Throwable)throwable);
    }

    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.class.logTrace((Logging)this, msg, (Throwable)throwable);
    }

    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.class.logWarning((Logging)this, msg, (Throwable)throwable);
    }

    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.class.logError((Logging)this, msg, (Throwable)throwable);
    }

    public boolean isTraceEnabled() {
        return Logging.class.isTraceEnabled((Logging)this);
    }

    private HashMap<Object, Queue<ReceivedBlockInfo>> streamIdToUnallocatedBlockQueues() {
        return this.streamIdToUnallocatedBlockQueues;
    }

    private HashMap<Time, AllocatedBlocks> timeToAllocatedBlocks() {
        return this.timeToAllocatedBlocks;
    }

    private Option<WriteAheadLog> writeAheadLogOption() {
        return this.writeAheadLogOption;
    }

    private Time lastAllocatedBatchTime() {
        return this.lastAllocatedBatchTime;
    }

    private void lastAllocatedBatchTime_$eq(Time x$1) {
        this.lastAllocatedBatchTime = x$1;
    }

    /*
     * WARNING - void declaration
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public boolean addBlock(ReceivedBlockInfo receivedBlockInfo) {
        boolean bl;
        try {
            void var2_2;
            boolean writeResult = this.writeToLog(new BlockAdditionEvent(receivedBlockInfo));
            if (writeResult) {
                ReceivedBlockTracker receivedBlockTracker = this;
                synchronized (receivedBlockTracker) {
                    this.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$getReceivedBlockQueue(receivedBlockInfo.streamId()).$plus$eq((Object)receivedBlockInfo);
                    // MONITOREXIT @DISABLED, blocks:[0, 1, 2, 7] lbl9 : MonitorExitStatement: MONITOREXIT : receivedBlockTracker
                    this.logDebug((Function0<String>)new Serializable(this, receivedBlockInfo){
                        public static final long serialVersionUID = 0L;
                        private final ReceivedBlockInfo receivedBlockInfo$1;

                        public final String apply() {
                            return new StringBuilder().append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Stream ", " received "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)this.receivedBlockInfo$1.streamId())}))).append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"block ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.receivedBlockInfo$1.blockStoreResult().blockId()}))).toString();
                        }
                        {
                            this.receivedBlockInfo$1 = receivedBlockInfo$1;
                        }
                    });
                }
            } else {
                this.logDebug((Function0<String>)new Serializable(this, receivedBlockInfo){
                    public static final long serialVersionUID = 0L;
                    private final ReceivedBlockInfo receivedBlockInfo$1;

                    public final String apply() {
                        return new StringBuilder().append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Failed to acknowledge stream ", " receiving "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)this.receivedBlockInfo$1.streamId())}))).append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"block ", " in the Write Ahead Log."})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.receivedBlockInfo$1.blockStoreResult().blockId()}))).toString();
                    }
                    {
                        this.receivedBlockInfo$1 = receivedBlockInfo$1;
                    }
                });
            }
            bl = var2_2;
            return bl;
        }
        catch (Throwable throwable) {
            boolean bl2;
            Throwable throwable2 = throwable;
            Option option = NonFatal$.MODULE$.unapply(throwable2);
            if (option.isEmpty()) {
                throw throwable;
            }
            Throwable e = (Throwable)option.get();
            this.logError((Function0<String>)new Serializable(this, receivedBlockInfo){
                public static final long serialVersionUID = 0L;
                private final ReceivedBlockInfo receivedBlockInfo$1;

                public final String apply() {
                    return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Error adding block ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.receivedBlockInfo$1}));
                }
                {
                    this.receivedBlockInfo$1 = receivedBlockInfo$1;
                }
            }, e);
            bl = bl2 = false;
        }
        return bl;
    }

    public synchronized void allocateBlocksToBatch(Time batchTime) {
        if (this.lastAllocatedBatchTime() == null || batchTime.$greater(this.lastAllocatedBatchTime())) {
            Map streamIdToBlocks = ((TraversableOnce)this.streamIds.map((Function1)new Serializable(this){
                public static final long serialVersionUID = 0L;
                private final /* synthetic */ ReceivedBlockTracker $outer;

                public final Tuple2<Object, scala.collection.mutable.Seq<ReceivedBlockInfo>> apply(int streamId) {
                    return new Tuple2((Object)BoxesRunTime.boxToInteger((int)streamId), (Object)this.$outer.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$getReceivedBlockQueue(streamId).dequeueAll((Function1)new Serializable(this){
                        public static final long serialVersionUID = 0L;

                        public final boolean apply(ReceivedBlockInfo x) {
                            return true;
                        }
                    }));
                }
                {
                    if ($outer == null) {
                        throw null;
                    }
                    this.$outer = $outer;
                }
            }, Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
            AllocatedBlocks allocatedBlocks = new AllocatedBlocks((Map<Object, Seq<ReceivedBlockInfo>>)streamIdToBlocks);
            if (this.writeToLog(new BatchAllocationEvent(batchTime, allocatedBlocks))) {
                this.timeToAllocatedBlocks().put((Object)batchTime, (Object)allocatedBlocks);
                this.lastAllocatedBatchTime_$eq(batchTime);
            } else {
                this.logInfo((Function0<String>)new Serializable(this, batchTime){
                    public static final long serialVersionUID = 0L;
                    private final Time batchTime$1;

                    public final String apply() {
                        return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Possibly processed batch ", " need to be processed again in WAL recovery"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.batchTime$1}));
                    }
                    {
                        this.batchTime$1 = batchTime$1;
                    }
                });
            }
        } else {
            this.logInfo((Function0<String>)new Serializable(this, batchTime){
                public static final long serialVersionUID = 0L;
                private final Time batchTime$1;

                public final String apply() {
                    return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Possibly processed batch ", " need to be processed again in WAL recovery"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.batchTime$1}));
                }
                {
                    this.batchTime$1 = batchTime$1;
                }
            });
        }
    }

    public synchronized Map<Object, Seq<ReceivedBlockInfo>> getBlocksOfBatch(Time batchTime) {
        return (Map)this.timeToAllocatedBlocks().get((Object)batchTime).map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final Map<Object, Seq<ReceivedBlockInfo>> apply(AllocatedBlocks x$1) {
                return x$1.streamIdToAllocatedBlocks();
            }
        }).getOrElse((Function0)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final Map<Object, Nothing$> apply() {
                return Predef$.MODULE$.Map().empty();
            }
        });
    }

    public synchronized Seq<ReceivedBlockInfo> getBlocksOfBatchAndStream(Time batchTime, int streamId) {
        return (Seq)this.timeToAllocatedBlocks().get((Object)batchTime).map((Function1)new Serializable(this, streamId){
            public static final long serialVersionUID = 0L;
            private final int streamId$1;

            public final Seq<ReceivedBlockInfo> apply(AllocatedBlocks x$2) {
                return x$2.getBlocksOfStream(this.streamId$1);
            }
            {
                this.streamId$1 = streamId$1;
            }
        }).getOrElse((Function0)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final Seq<Nothing$> apply() {
                return (Seq)Seq$.MODULE$.empty();
            }
        });
    }

    public synchronized boolean hasUnallocatedReceivedBlocks() {
        return !this.streamIdToUnallocatedBlockQueues().values().forall((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final boolean apply(Queue<ReceivedBlockInfo> x$3) {
                return x$3.isEmpty();
            }
        });
    }

    public synchronized Seq<ReceivedBlockInfo> getUnallocatedBlocks(int streamId) {
        return this.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$getReceivedBlockQueue(streamId).toSeq();
    }

    public synchronized void cleanupOldBatches(Time cleanupThreshTime, boolean waitForCompletion) {
        Predef$.MODULE$.require(cleanupThreshTime.milliseconds() < this.clock.getTimeMillis());
        Seq timesToCleanup = ((TraversableOnce)this.timeToAllocatedBlocks().keys().filter((Function1)new Serializable(this, cleanupThreshTime){
            public static final long serialVersionUID = 0L;
            private final Time cleanupThreshTime$1;

            public final boolean apply(Time x$4) {
                return x$4.$less(this.cleanupThreshTime$1);
            }
            {
                this.cleanupThreshTime$1 = cleanupThreshTime$1;
            }
        })).toSeq();
        this.logInfo((Function0<String>)new Serializable(this, timesToCleanup){
            public static final long serialVersionUID = 0L;
            private final Seq timesToCleanup$1;

            public final String apply() {
                return new StringBuilder().append((Object)"Deleting batches ").append((Object)this.timesToCleanup$1).toString();
            }
            {
                this.timesToCleanup$1 = timesToCleanup$1;
            }
        });
        if (this.writeToLog(new BatchCleanupEvent((Seq<Time>)timesToCleanup))) {
            this.timeToAllocatedBlocks().$minus$minus$eq((TraversableOnce)timesToCleanup);
            this.writeAheadLogOption().foreach((Function1)new Serializable(this, cleanupThreshTime, waitForCompletion){
                public static final long serialVersionUID = 0L;
                private final Time cleanupThreshTime$1;
                private final boolean waitForCompletion$1;

                public final void apply(WriteAheadLog x$5) {
                    x$5.clean(this.cleanupThreshTime$1.milliseconds(), this.waitForCompletion$1);
                }
                {
                    this.cleanupThreshTime$1 = cleanupThreshTime$1;
                    this.waitForCompletion$1 = waitForCompletion$1;
                }
            });
        } else {
            this.logWarning((Function0<String>)new Serializable(this){
                public static final long serialVersionUID = 0L;

                public final String apply() {
                    return "Failed to acknowledge batch clean up in the Write Ahead Log.";
                }
            });
        }
    }

    public void stop() {
        this.writeAheadLogOption().foreach((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final void apply(WriteAheadLog x$6) {
                x$6.close();
            }
        });
    }

    private synchronized void recoverPastEvents() {
        this.writeAheadLogOption().foreach((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ ReceivedBlockTracker $outer;

            public final void apply(WriteAheadLog writeAheadLog) {
                this.$outer.logInfo((Function0<String>)new Serializable(this){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ $anonfun$recoverPastEvents$1 $outer;

                    public final String apply() {
                        return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Recovering from write ahead logs in ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.$outer.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$anonfun$$$outer().org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$checkpointDirOption.get()}));
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                    }
                });
                ((Iterator)JavaConverters$.MODULE$.asScalaIteratorConverter(writeAheadLog.readAll()).asScala()).foreach((Function1)new Serializable(this){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ $anonfun$recoverPastEvents$1 $outer;

                    public final void apply(ByteBuffer byteBuffer) {
                        ReceivedBlockTrackerLogEvent receivedBlockTrackerLogEvent;
                        block5: {
                            block3: {
                                block4: {
                                    block2: {
                                        this.$outer.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$anonfun$$$outer().logTrace((Function0<String>)new Serializable(this, byteBuffer){
                                            public static final long serialVersionUID = 0L;
                                            private final ByteBuffer byteBuffer$1;

                                            public final String apply() {
                                                return new StringBuilder().append((Object)"Recovering record ").append((Object)this.byteBuffer$1).toString();
                                            }
                                            {
                                                this.byteBuffer$1 = byteBuffer$1;
                                            }
                                        });
                                        receivedBlockTrackerLogEvent = (ReceivedBlockTrackerLogEvent)Utils$.MODULE$.deserialize(byteBuffer.array(), Thread.currentThread().getContextClassLoader());
                                        if (!(receivedBlockTrackerLogEvent instanceof BlockAdditionEvent)) break block2;
                                        BlockAdditionEvent blockAdditionEvent = (BlockAdditionEvent)receivedBlockTrackerLogEvent;
                                        ReceivedBlockInfo receivedBlockInfo = blockAdditionEvent.receivedBlockInfo();
                                        this.$outer.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$anonfun$$$outer().org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$insertAddedBlock$1(receivedBlockInfo);
                                        BoxedUnit boxedUnit = BoxedUnit.UNIT;
                                        break block3;
                                    }
                                    if (!(receivedBlockTrackerLogEvent instanceof BatchAllocationEvent)) break block4;
                                    BatchAllocationEvent batchAllocationEvent = (BatchAllocationEvent)receivedBlockTrackerLogEvent;
                                    Time time = batchAllocationEvent.time();
                                    AllocatedBlocks allocatedBlocks = batchAllocationEvent.allocatedBlocks();
                                    this.$outer.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$anonfun$$$outer().org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$insertAllocatedBatch$1(time, allocatedBlocks);
                                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                                    break block3;
                                }
                                if (!(receivedBlockTrackerLogEvent instanceof BatchCleanupEvent)) break block5;
                                BatchCleanupEvent batchCleanupEvent = (BatchCleanupEvent)receivedBlockTrackerLogEvent;
                                Seq<Time> batchTimes = batchCleanupEvent.times();
                                this.$outer.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$anonfun$$$outer().org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$cleanupBatches$1(batchTimes);
                                BoxedUnit boxedUnit = BoxedUnit.UNIT;
                            }
                            return;
                        }
                        throw new MatchError((Object)receivedBlockTrackerLogEvent);
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                    }
                });
            }

            public /* synthetic */ ReceivedBlockTracker org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$anonfun$$$outer() {
                return this.$outer;
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
            }
        });
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    private boolean writeToLog(ReceivedBlockTrackerLogEvent record) {
        boolean bl;
        if (!this.isWriteAheadLogEnabled()) return true;
        this.logTrace((Function0<String>)new Serializable(this, record){
            public static final long serialVersionUID = 0L;
            private final ReceivedBlockTrackerLogEvent record$1;

            public final String apply() {
                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Writing record: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.record$1}));
            }
            {
                this.record$1 = record$1;
            }
        });
        try {
            ((WriteAheadLog)this.writeAheadLogOption().get()).write(ByteBuffer.wrap(Utils$.MODULE$.serialize((Object)record)), this.clock.getTimeMillis());
            return true;
        }
        catch (Throwable throwable) {
            boolean bl2;
            Throwable throwable2 = throwable;
            Option option = NonFatal$.MODULE$.unapply(throwable2);
            if (option.isEmpty()) {
                throw throwable;
            }
            Throwable e = (Throwable)option.get();
            this.logWarning((Function0<String>)new Serializable(this, record){
                public static final long serialVersionUID = 0L;
                private final ReceivedBlockTrackerLogEvent record$1;

                public final String apply() {
                    return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Exception thrown while writing record: ", " to the WriteAheadLog."})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.record$1}));
                }
                {
                    this.record$1 = record$1;
                }
            }, e);
            bl = bl2 = false;
        }
        return bl;
    }

    public Queue<ReceivedBlockInfo> org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$getReceivedBlockQueue(int streamId) {
        return (Queue)this.streamIdToUnallocatedBlockQueues().getOrElseUpdate((Object)BoxesRunTime.boxToInteger((int)streamId), (Function0)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final Queue<ReceivedBlockInfo> apply() {
                return new Queue();
            }
        });
    }

    private Option<WriteAheadLog> createWriteAheadLog() {
        return this.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$checkpointDirOption.map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ ReceivedBlockTracker $outer;

            public final WriteAheadLog apply(String checkpointDir) {
                String logDir = ReceivedBlockTracker$.MODULE$.checkpointDirToLogDir((String)this.$outer.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$checkpointDirOption.get());
                return WriteAheadLogUtils$.MODULE$.createLogForDriver(this.$outer.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$conf, logDir, this.$outer.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$hadoopConf);
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
            }
        });
    }

    public boolean isWriteAheadLogEnabled() {
        return this.writeAheadLogOption().nonEmpty();
    }

    public final void org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$insertAddedBlock$1(ReceivedBlockInfo receivedBlockInfo) {
        this.logTrace((Function0<String>)new Serializable(this, receivedBlockInfo){
            public static final long serialVersionUID = 0L;
            private final ReceivedBlockInfo receivedBlockInfo$2;

            public final String apply() {
                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Recovery: Inserting added block ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.receivedBlockInfo$2}));
            }
            {
                this.receivedBlockInfo$2 = receivedBlockInfo$2;
            }
        });
        receivedBlockInfo.setBlockIdInvalid();
        this.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$getReceivedBlockQueue(receivedBlockInfo.streamId()).$plus$eq((Object)receivedBlockInfo);
    }

    public final void org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$insertAllocatedBatch$1(Time batchTime, AllocatedBlocks allocatedBlocks) {
        this.logTrace((Function0<String>)new Serializable(this, batchTime, allocatedBlocks){
            public static final long serialVersionUID = 0L;
            private final Time batchTime$2;
            private final AllocatedBlocks allocatedBlocks$1;

            public final String apply() {
                return new StringBuilder().append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Recovery: Inserting allocated batch for time ", " to "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.batchTime$2}))).append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.allocatedBlocks$1.streamIdToAllocatedBlocks()}))).toString();
            }
            {
                this.batchTime$2 = batchTime$2;
                this.allocatedBlocks$1 = allocatedBlocks$1;
            }
        });
        this.streamIdToUnallocatedBlockQueues().values().foreach((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final void apply(Queue<ReceivedBlockInfo> x$7) {
                x$7.clear();
            }
        });
        this.timeToAllocatedBlocks().put((Object)batchTime, (Object)allocatedBlocks);
        this.lastAllocatedBatchTime_$eq(batchTime);
    }

    public final void org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$cleanupBatches$1(Seq batchTimes) {
        this.logTrace((Function0<String>)new Serializable(this, batchTimes){
            public static final long serialVersionUID = 0L;
            private final Seq batchTimes$1;

            public final String apply() {
                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Recovery: Cleaning up batches ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.batchTimes$1}));
            }
            {
                this.batchTimes$1 = batchTimes$1;
            }
        });
        this.timeToAllocatedBlocks().$minus$minus$eq((TraversableOnce)batchTimes);
    }

    public ReceivedBlockTracker(SparkConf conf, Configuration hadoopConf, Seq<Object> streamIds2, Clock clock, boolean recoverFromWriteAheadLog, Option<String> checkpointDirOption) {
        this.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$conf = conf;
        this.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$hadoopConf = hadoopConf;
        this.streamIds = streamIds2;
        this.clock = clock;
        this.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$checkpointDirOption = checkpointDirOption;
        Logging.class.$init$((Logging)this);
        this.streamIdToUnallocatedBlockQueues = new HashMap();
        this.timeToAllocatedBlocks = new HashMap();
        this.writeAheadLogOption = this.createWriteAheadLog();
        this.lastAllocatedBatchTime = null;
        if (recoverFromWriteAheadLog) {
            this.recoverPastEvents();
        }
    }
}

