/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobInProgress;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.JobTracker;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.UtilsForTests;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;

public class TestJobInProgress
extends TestCase {
    static final Log LOG = LogFactory.getLog(TestJobInProgress.class);
    private MiniMRCluster mrCluster;
    private MiniDFSCluster dfsCluster;
    JobTracker jt;
    private static Path TEST_DIR = new Path(System.getProperty("test.build.data", "/tmp"), "jip-testing");
    private static int numSlaves = 4;

    protected void setUp() throws Exception {
        super.setUp();
        Configuration conf = new Configuration();
        this.dfsCluster = new MiniDFSCluster(conf, numSlaves, true, null);
        this.mrCluster = new MiniMRCluster(numSlaves, this.dfsCluster.getFileSystem().getUri().toString(), 1);
        this.jt = this.mrCluster.getJobTrackerRunner().getJobTracker();
    }

    public void testPendingMapTaskCount() throws Exception {
        this.launchTask(FailMapTaskJob.class, IdentityReducer.class);
        this.checkTaskCounts();
    }

    public void testPendingReduceTaskCount() throws Exception {
        this.launchTask(IdentityMapper.class, FailReduceTaskJob.class);
        this.checkTaskCounts();
    }

    private void testRunningTaskCount(boolean speculation, boolean locality) throws Exception {
        LOG.info((Object)("Testing running jobs with speculation : " + speculation + ", locality : " + locality));
        this.dfsCluster.getFileSystem().delete(TEST_DIR, true);
        Path mapSignalFile = new Path(TEST_DIR, "map-signal");
        Path redSignalFile = new Path(TEST_DIR, "reduce-signal");
        JobConf job = this.configure(UtilsForTests.WaitingMapper.class, IdentityReducer.class, 1, 1, locality);
        job.set(UtilsForTests.getTaskSignalParameter(true), mapSignalFile.toString());
        job.set(UtilsForTests.getTaskSignalParameter(false), redSignalFile.toString());
        job.setFloat("mapred.reduce.slowstart.completed.maps", 0.0f);
        job.setSpeculativeExecution(speculation);
        JobClient jc = new JobClient(job);
        RunningJob running = jc.submitJob(job);
        JobTracker jobtracker = this.mrCluster.getJobTrackerRunner().getJobTracker();
        JobInProgress jip = jobtracker.getJob(running.getID());
        LOG.info((Object)("Running job " + jip.getJobID()));
        LOG.info((Object)("Waiting for job " + jip.getJobID() + " to be ready"));
        this.waitTillReady(jip, job);
        HashSet uniqueTasks = new HashSet();
        for (Map.Entry s : jip.getRunningMapCache().entrySet()) {
            uniqueTasks.addAll((Collection)s.getValue());
        }
        uniqueTasks.addAll(jip.getNonLocalRunningMaps());
        TestJobInProgress.assertEquals((String)("Running map count doesnt match for jobs with speculation " + speculation + ", and locality " + locality), (int)jip.runningMaps(), (int)uniqueTasks.size());
        TestJobInProgress.assertEquals((String)("Running reducer count doesnt match for jobs with speculation " + speculation + ", and locality " + locality), (int)jip.runningReduces(), (int)jip.getRunningReduces().size());
        LOG.info((Object)"Signaling the tasks");
        UtilsForTests.signalTasks(this.dfsCluster, this.dfsCluster.getFileSystem(), mapSignalFile.toString(), redSignalFile.toString(), numSlaves);
        LOG.info((Object)("Waiting for job " + jip.getJobID() + " to be complete"));
        UtilsForTests.waitTillDone(jc);
        this.dfsCluster.getFileSystem().delete(TEST_DIR, true);
    }

    private void waitTillReady(JobInProgress jip, JobConf job) {
        while (jip.runningMaps() < job.getNumMapTasks()) {
            UtilsForTests.waitFor(10L);
        }
        while (jip.runningReduces() < job.getNumReduceTasks()) {
            UtilsForTests.waitFor(10L);
        }
    }

    public void testRunningTaskCount() throws Exception {
        this.testRunningTaskCount(false, true);
        this.testRunningTaskCount(true, true);
        this.testRunningTaskCount(false, false);
        this.testRunningTaskCount(true, false);
    }

    protected void tearDown() throws Exception {
        this.mrCluster.shutdown();
        this.dfsCluster.shutdown();
        super.tearDown();
    }

    void launchTask(Class MapClass3, Class ReduceClass2) throws Exception {
        JobConf job = this.configure(MapClass3, ReduceClass2, 5, 10, true);
        try {
            JobClient.runJob((JobConf)job);
        }
        catch (IOException ioe) {
            // empty catch block
        }
    }

    JobConf configure(Class MapClass3, Class ReduceClass2, int maps, int reducers, boolean locality) throws Exception {
        JobConf jobConf = this.mrCluster.createJobConf();
        Path inDir = new Path("./failjob/input");
        Path outDir = new Path("./failjob/output");
        String input = "Test failing job.\n One more line";
        FileSystem inFs = inDir.getFileSystem((Configuration)jobConf);
        FileSystem outFs = outDir.getFileSystem((Configuration)jobConf);
        outFs.delete(outDir, true);
        if (!inFs.mkdirs(inDir)) {
            throw new IOException("create directory failed" + inDir.toString());
        }
        FSDataOutputStream file = inFs.create(new Path(inDir, "part-0"));
        file.writeBytes(input);
        file.close();
        jobConf.setJobName("failmaptask");
        if (locality) {
            jobConf.setInputFormat(TextInputFormat.class);
        } else {
            jobConf.setInputFormat(UtilsForTests.RandomInputFormat.class);
        }
        jobConf.setOutputKeyClass(Text.class);
        jobConf.setOutputValueClass(Text.class);
        jobConf.setMapperClass(MapClass3);
        jobConf.setCombinerClass(ReduceClass2);
        jobConf.setReducerClass(ReduceClass2);
        FileInputFormat.setInputPaths((JobConf)jobConf, (Path[])new Path[]{inDir});
        FileOutputFormat.setOutputPath((JobConf)jobConf, (Path)outDir);
        jobConf.setNumMapTasks(maps);
        jobConf.setNumReduceTasks(reducers);
        return jobConf;
    }

    void checkTaskCounts() {
        JobStatus[] status;
        for (JobStatus js : status = this.jt.getAllJobs()) {
            JobInProgress jip = this.jt.getJob(js.getJobID());
            Counters counter = jip.getJobCounters();
            long totalTaskCount = counter.getCounter((Enum)JobInProgress.Counter.TOTAL_LAUNCHED_MAPS) + counter.getCounter((Enum)JobInProgress.Counter.TOTAL_LAUNCHED_REDUCES);
            while ((long)jip.getNumTaskCompletionEvents() < totalTaskCount) {
                TestJobInProgress.assertEquals((boolean)true, (jip.runningMaps() >= 0 ? 1 : 0) != 0);
                TestJobInProgress.assertEquals((boolean)true, (jip.pendingMaps() >= 0 ? 1 : 0) != 0);
                TestJobInProgress.assertEquals((boolean)true, (jip.runningReduces() >= 0 ? 1 : 0) != 0);
                TestJobInProgress.assertEquals((boolean)true, (jip.pendingReduces() >= 0 ? 1 : 0) != 0);
            }
        }
    }

    public static class FailReduceTaskJob
    extends MapReduceBase
    implements Reducer {
        public void reduce(Object key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException e) {
                throw new IllegalArgumentException("Failing Reduce task");
            }
            throw new IllegalArgumentException("Failing Reduce task");
        }
    }

    public static class FailMapTaskJob
    extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {
        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException e) {
                throw new IllegalArgumentException("Interrupted MAP task");
            }
            throw new IllegalArgumentException("Failing MAP task");
        }
    }
}

