/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.PrintStream;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.ClusterMapReduceTestCase;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobPriority;
import org.apache.hadoop.mapreduce.JobSubmissionFiles;
import org.apache.hadoop.mapreduce.MapReduceTestUtil;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.tools.CLI;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class TestMRJobClient
extends ClusterMapReduceTestCase {
    private static final Log LOG = LogFactory.getLog(TestMRJobClient.class);

    private Job runJob(Configuration conf) throws Exception {
        String input = "hello1\nhello2\nhello3\n";
        Job job = MapReduceTestUtil.createJob(conf, this.getInputDir(), this.getOutputDir(), 1, 1, input);
        job.setJobName("mr");
        job.setPriority(JobPriority.NORMAL);
        job.waitForCompletion(true);
        return job;
    }

    private Job runJobInBackGround(Configuration conf) throws Exception {
        String input = "hello1\nhello2\nhello3\n";
        Job job = MapReduceTestUtil.createJob(conf, this.getInputDir(), this.getOutputDir(), 1, 1, input);
        job.setJobName("mr");
        job.setPriority(JobPriority.NORMAL);
        job.submit();
        int i = 0;
        while (i++ < 200 && job.getJobID() == null) {
            LOG.info((Object)"waiting for jobId...");
            Thread.sleep(100L);
        }
        return job;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static int runTool(Configuration conf, Tool tool, String[] args, OutputStream out) throws Exception {
        PrintStream oldOut = System.out;
        PrintStream newOut = new PrintStream(out, true);
        try {
            System.setOut(newOut);
            int n = ToolRunner.run((Configuration)conf, (Tool)tool, (String[])args);
            return n;
        }
        finally {
            System.setOut(oldOut);
        }
    }

    public void testJobSubmissionSpecsAndFiles() throws Exception {
        JobConf conf = this.createJobConf();
        Job job = MapReduceTestUtil.createJob((Configuration)conf, this.getInputDir(), this.getOutputDir(), 1, 1);
        job.setOutputFormatClass(BadOutputFormat.class);
        try {
            job.submit();
            TestMRJobClient.fail((String)"Should've thrown an exception while checking output specs.");
        }
        catch (Exception e) {
            TestMRJobClient.assertTrue((boolean)(e instanceof IOException));
        }
        Cluster cluster = new Cluster((Configuration)conf);
        Path jobStagingArea = JobSubmissionFiles.getStagingDir((Cluster)cluster, (Configuration)job.getConfiguration());
        Path submitJobDir = new Path(jobStagingArea, "JobId");
        Path submitJobFile = JobSubmissionFiles.getJobConfPath((Path)submitJobDir);
        TestMRJobClient.assertFalse((String)"Shouldn't have created a job file if job specs failed.", (boolean)FileSystem.get((Configuration)conf).exists(submitJobFile));
    }

    public void testJobClient() throws Exception {
        JobConf conf = this.createJobConf();
        Job job = this.runJob((Configuration)conf);
        String jobId = job.getJobID().toString();
        this.testAllJobList(jobId, (Configuration)conf);
        this.testSubmittedJobList((Configuration)conf);
        this.testGetCounter(jobId, (Configuration)conf);
        this.testJobStatus(jobId, (Configuration)conf);
        this.testJobEvents(jobId, (Configuration)conf);
        this.testJobHistory((Configuration)conf);
        this.testListTrackers((Configuration)conf);
        this.testListAttemptIds(jobId, (Configuration)conf);
        this.testListBlackList((Configuration)conf);
        this.startStop();
        this.testChangingJobPriority(jobId, (Configuration)conf);
        this.testSubmit((Configuration)conf);
        this.testKillTask((Configuration)conf);
        this.testfailTask((Configuration)conf);
        this.testKillJob((Configuration)conf);
    }

    private void testfailTask(Configuration conf) throws Exception {
        Job job = this.runJobInBackGround(conf);
        CLI jc = this.createJobClient();
        TaskID tid = new TaskID(job.getJobID(), TaskType.MAP, 0);
        TaskAttemptID taid = new TaskAttemptID(tid, 1);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int exitCode = TestMRJobClient.runTool(conf, (Tool)jc, new String[]{"-fail-task"}, out);
        TestMRJobClient.assertEquals((String)"Exit code", (int)-1, (int)exitCode);
        TestMRJobClient.runTool(conf, (Tool)jc, new String[]{"-fail-task", taid.toString()}, out);
        String answer = new String(out.toByteArray(), "UTF-8");
        Assert.assertTrue((boolean)answer.contains("Killed task " + taid + " by failing it"));
    }

    private void testKillTask(Configuration conf) throws Exception {
        Job job = this.runJobInBackGround(conf);
        CLI jc = this.createJobClient();
        TaskID tid = new TaskID(job.getJobID(), TaskType.MAP, 0);
        TaskAttemptID taid = new TaskAttemptID(tid, 1);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int exitCode = TestMRJobClient.runTool(conf, (Tool)jc, new String[]{"-kill-task"}, out);
        TestMRJobClient.assertEquals((String)"Exit code", (int)-1, (int)exitCode);
        TestMRJobClient.runTool(conf, (Tool)jc, new String[]{"-kill-task", taid.toString()}, out);
        String answer = new String(out.toByteArray(), "UTF-8");
        Assert.assertTrue((boolean)answer.contains("Killed task " + taid));
    }

    private void testKillJob(Configuration conf) throws Exception {
        Job job = this.runJobInBackGround(conf);
        String jobId = job.getJobID().toString();
        CLI jc = this.createJobClient();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int exitCode = TestMRJobClient.runTool(conf, (Tool)jc, new String[]{"-kill"}, out);
        TestMRJobClient.assertEquals((String)"Exit code", (int)-1, (int)exitCode);
        exitCode = TestMRJobClient.runTool(conf, (Tool)jc, new String[]{"-kill", jobId}, out);
        TestMRJobClient.assertEquals((String)"Exit code", (int)0, (int)exitCode);
        String answer = new String(out.toByteArray(), "UTF-8");
        TestMRJobClient.assertTrue((boolean)answer.contains("Killed job " + jobId));
    }

    private void testSubmit(Configuration conf) throws Exception {
        CLI jc = this.createJobClient();
        Job job = MapReduceTestUtil.createJob(conf, this.getInputDir(), this.getOutputDir(), 1, 1, "ping");
        job.setJobName("mr");
        job.setPriority(JobPriority.NORMAL);
        File fcon = File.createTempFile("config", ".xml");
        LocalFileSystem localFs = FileSystem.getLocal((Configuration)conf);
        String fconUri = new Path(fcon.getAbsolutePath()).makeQualified(localFs.getUri(), localFs.getWorkingDirectory()).toUri().toString();
        job.getConfiguration().writeXml((OutputStream)new FileOutputStream(fcon));
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int exitCode = TestMRJobClient.runTool(conf, (Tool)jc, new String[]{"-submit"}, out);
        TestMRJobClient.assertEquals((String)"Exit code", (int)-1, (int)exitCode);
        exitCode = TestMRJobClient.runTool(conf, (Tool)jc, new String[]{"-submit", fconUri}, out);
        TestMRJobClient.assertEquals((String)"Exit code", (int)0, (int)exitCode);
        String answer = new String(out.toByteArray());
        TestMRJobClient.assertTrue((boolean)answer.contains("Created job "));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void startStop() {
        ByteArrayOutputStream data = new ByteArrayOutputStream();
        PrintStream error = System.err;
        System.setErr(new PrintStream(data));
        ExitUtil.disableSystemExit();
        try {
            CLI.main((String[])new String[0]);
            TestMRJobClient.fail((String)" CLI.main should call System.exit");
        }
        catch (ExitUtil.ExitException e) {
            ExitUtil.resetFirstExitException();
            TestMRJobClient.assertEquals((int)-1, (int)e.status);
        }
        catch (Exception e) {
        }
        finally {
            System.setErr(error);
        }
        String s = new String(data.toByteArray());
        TestMRJobClient.assertTrue((boolean)s.contains("-submit"));
        TestMRJobClient.assertTrue((boolean)s.contains("-status"));
        TestMRJobClient.assertTrue((boolean)s.contains("-kill"));
        TestMRJobClient.assertTrue((boolean)s.contains("-set-priority"));
        TestMRJobClient.assertTrue((boolean)s.contains("-events"));
        TestMRJobClient.assertTrue((boolean)s.contains("-history"));
        TestMRJobClient.assertTrue((boolean)s.contains("-list"));
        TestMRJobClient.assertTrue((boolean)s.contains("-list-active-trackers"));
        TestMRJobClient.assertTrue((boolean)s.contains("-list-blacklisted-trackers"));
        TestMRJobClient.assertTrue((boolean)s.contains("-list-attempt-ids"));
        TestMRJobClient.assertTrue((boolean)s.contains("-kill-task"));
        TestMRJobClient.assertTrue((boolean)s.contains("-fail-task"));
        TestMRJobClient.assertTrue((boolean)s.contains("-logs"));
    }

    private void testListBlackList(Configuration conf) throws Exception {
        String line;
        CLI jc = this.createJobClient();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int exitCode = TestMRJobClient.runTool(conf, (Tool)jc, new String[]{"-list-blacklisted-trackers", "second in"}, out);
        TestMRJobClient.assertEquals((String)"Exit code", (int)-1, (int)exitCode);
        exitCode = TestMRJobClient.runTool(conf, (Tool)jc, new String[]{"-list-blacklisted-trackers"}, out);
        TestMRJobClient.assertEquals((String)"Exit code", (int)0, (int)exitCode);
        BufferedReader br = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(out.toByteArray())));
        int counter = 0;
        while ((line = br.readLine()) != null) {
            LOG.info((Object)("line = " + line));
            ++counter;
        }
        TestMRJobClient.assertEquals((int)0, (int)counter);
    }

    private void testListAttemptIds(String jobId, Configuration conf) throws Exception {
        String line;
        CLI jc = this.createJobClient();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int exitCode = TestMRJobClient.runTool(conf, (Tool)jc, new String[]{"-list-attempt-ids"}, out);
        TestMRJobClient.assertEquals((String)"Exit code", (int)-1, (int)exitCode);
        exitCode = TestMRJobClient.runTool(conf, (Tool)jc, new String[]{"-list-attempt-ids", jobId, "MAP", "completed"}, out);
        TestMRJobClient.assertEquals((String)"Exit code", (int)0, (int)exitCode);
        BufferedReader br = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(out.toByteArray())));
        int counter = 0;
        while ((line = br.readLine()) != null) {
            LOG.info((Object)("line = " + line));
            ++counter;
        }
        TestMRJobClient.assertEquals((int)1, (int)counter);
    }

    private void testListTrackers(Configuration conf) throws Exception {
        String line;
        CLI jc = this.createJobClient();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int exitCode = TestMRJobClient.runTool(conf, (Tool)jc, new String[]{"-list-active-trackers", "second parameter"}, out);
        TestMRJobClient.assertEquals((String)"Exit code", (int)-1, (int)exitCode);
        exitCode = TestMRJobClient.runTool(conf, (Tool)jc, new String[]{"-list-active-trackers"}, out);
        TestMRJobClient.assertEquals((String)"Exit code", (int)0, (int)exitCode);
        BufferedReader br = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(out.toByteArray())));
        int counter = 0;
        while ((line = br.readLine()) != null) {
            LOG.info((Object)("line = " + line));
            ++counter;
        }
        TestMRJobClient.assertEquals((int)2, (int)counter);
    }

    private void testJobHistory(Configuration conf) throws Exception {
        String line;
        CLI jc = this.createJobClient();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        File f = new File("src/test/resources/job_1329348432655_0001-10.jhist");
        LocalFileSystem localFs = FileSystem.getLocal((Configuration)conf);
        String historyFileUri = new Path(f.getAbsolutePath()).makeQualified(localFs.getUri(), localFs.getWorkingDirectory()).toUri().toString();
        int exitCode = TestMRJobClient.runTool(conf, (Tool)jc, new String[]{"-history", "pul", historyFileUri}, out);
        TestMRJobClient.assertEquals((String)"Exit code", (int)-1, (int)exitCode);
        exitCode = TestMRJobClient.runTool(conf, (Tool)jc, new String[]{"-history", "all", historyFileUri}, out);
        TestMRJobClient.assertEquals((String)"Exit code", (int)0, (int)exitCode);
        BufferedReader br = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(out.toByteArray())));
        int counter = 0;
        while ((line = br.readLine()) != null) {
            LOG.info((Object)("line = " + line));
            if (!line.startsWith("task_")) continue;
            ++counter;
        }
        TestMRJobClient.assertEquals((int)23, (int)counter);
    }

    private void testJobEvents(String jobId, Configuration conf) throws Exception {
        String line;
        CLI jc = this.createJobClient();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int exitCode = TestMRJobClient.runTool(conf, (Tool)jc, new String[]{"-events"}, out);
        TestMRJobClient.assertEquals((String)"Exit code", (int)-1, (int)exitCode);
        exitCode = TestMRJobClient.runTool(conf, (Tool)jc, new String[]{"-events", jobId, "0", "100"}, out);
        TestMRJobClient.assertEquals((String)"Exit code", (int)0, (int)exitCode);
        BufferedReader br = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(out.toByteArray())));
        int counter = 0;
        String attemptId = "attempt" + jobId.substring(3);
        while ((line = br.readLine()) != null) {
            LOG.info((Object)("line = " + line));
            if (!line.contains(attemptId)) continue;
            ++counter;
        }
        TestMRJobClient.assertEquals((int)2, (int)counter);
    }

    private void testJobStatus(String jobId, Configuration conf) throws Exception {
        String line;
        CLI jc = this.createJobClient();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int exitCode = TestMRJobClient.runTool(conf, (Tool)jc, new String[]{"-status"}, out);
        TestMRJobClient.assertEquals((String)"Exit code", (int)-1, (int)exitCode);
        exitCode = TestMRJobClient.runTool(conf, (Tool)jc, new String[]{"-status", jobId}, out);
        TestMRJobClient.assertEquals((String)"Exit code", (int)0, (int)exitCode);
        BufferedReader br = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(out.toByteArray())));
        while ((line = br.readLine()) != null) {
            LOG.info((Object)("line = " + line));
            if (!line.contains("Job state:")) continue;
        }
        TestMRJobClient.assertNotNull((Object)line);
        TestMRJobClient.assertTrue((boolean)line.contains("SUCCEEDED"));
    }

    public void testGetCounter(String jobId, Configuration conf) throws Exception {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int exitCode = TestMRJobClient.runTool(conf, (Tool)this.createJobClient(), new String[]{"-counter"}, out);
        TestMRJobClient.assertEquals((String)"Exit code", (int)-1, (int)exitCode);
        exitCode = TestMRJobClient.runTool(conf, (Tool)this.createJobClient(), new String[]{"-counter", jobId, "org.apache.hadoop.mapreduce.TaskCounter", "MAP_INPUT_RECORDS"}, out);
        TestMRJobClient.assertEquals((String)"Exit code", (int)0, (int)exitCode);
        TestMRJobClient.assertEquals((String)"Counter", (String)"3", (String)out.toString().trim());
    }

    protected void testAllJobList(String jobId, Configuration conf) throws Exception {
        String line;
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int exitCode = TestMRJobClient.runTool(conf, (Tool)this.createJobClient(), new String[]{"-list", "alldata"}, out);
        TestMRJobClient.assertEquals((String)"Exit code", (int)-1, (int)exitCode);
        exitCode = TestMRJobClient.runTool(conf, (Tool)this.createJobClient(), new String[]{"-list", "all"}, out);
        TestMRJobClient.assertEquals((String)"Exit code", (int)0, (int)exitCode);
        BufferedReader br = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(out.toByteArray())));
        int counter = 0;
        while ((line = br.readLine()) != null) {
            LOG.info((Object)("line = " + line));
            if (!line.contains(jobId)) continue;
            ++counter;
        }
        TestMRJobClient.assertEquals((int)1, (int)counter);
        out.reset();
    }

    protected void testSubmittedJobList(Configuration conf) throws Exception {
        String line;
        Job job = this.runJobInBackGround(conf);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int counter = 0;
        int exitCode = TestMRJobClient.runTool(conf, (Tool)this.createJobClient(), new String[]{"-list"}, out);
        TestMRJobClient.assertEquals((String)"Exit code", (int)0, (int)exitCode);
        BufferedReader br = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(out.toByteArray())));
        counter = 0;
        while ((line = br.readLine()) != null) {
            LOG.info((Object)("line = " + line));
            if (!line.contains(job.getJobID().toString())) continue;
            ++counter;
        }
        TestMRJobClient.assertEquals((int)1, (int)counter);
    }

    protected void verifyJobPriority(String jobId, String priority, Configuration conf, CLI jc) throws Exception {
        String line;
        PipedInputStream pis = new PipedInputStream();
        PipedOutputStream pos = new PipedOutputStream(pis);
        int exitCode = TestMRJobClient.runTool(conf, (Tool)jc, new String[]{"-list", "all"}, pos);
        TestMRJobClient.assertEquals((String)"Exit code", (int)0, (int)exitCode);
        BufferedReader br = new BufferedReader(new InputStreamReader(pis));
        while ((line = br.readLine()) != null) {
            LOG.info((Object)("line = " + line));
            if (!line.contains(jobId)) continue;
            TestMRJobClient.assertTrue((boolean)line.contains(priority));
            break;
        }
        pis.close();
    }

    public void testChangingJobPriority(String jobId, Configuration conf) throws Exception {
        int exitCode = TestMRJobClient.runTool(conf, (Tool)this.createJobClient(), new String[]{"-set-priority"}, new ByteArrayOutputStream());
        TestMRJobClient.assertEquals((String)"Exit code", (int)-1, (int)exitCode);
        exitCode = TestMRJobClient.runTool(conf, (Tool)this.createJobClient(), new String[]{"-set-priority", jobId, "VERY_LOW"}, new ByteArrayOutputStream());
        TestMRJobClient.assertEquals((String)"Exit code", (int)0, (int)exitCode);
        this.verifyJobPriority(jobId, "NORMAL", conf, this.createJobClient());
    }

    protected CLI createJobClient() throws IOException {
        return new CLI();
    }

    private static class BadOutputFormat
    extends TextOutputFormat<Object, Object> {
        private BadOutputFormat() {
        }

        public void checkOutputSpecs(JobContext job) throws IOException {
            throw new IOException();
        }
    }
}

