/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.BindException;
import java.util.ArrayList;
import java.util.Arrays;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.PiEstimator;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.OutputLogFilter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskID;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.WordCount;

/**
 * Runs small MapReduce jobs (a pi estimation and several word counts) on a
 * {@link MiniMRCluster} whose file system is provided by a
 * {@link MiniDFSCluster}, and verifies the job output, the file system
 * counters and the task directories left on the task trackers' local disks.
 *
 * <p>The static helpers are public or package-visible so they can be called
 * with a caller-supplied cluster and job configuration.
 */
public class TestMiniMRWithDFS extends TestCase {
    private static final Log LOG = LogFactory.getLog(TestMiniMRWithDFS.class.getName());

    /** Number of map tasks handed to the pi estimator in {@link #runPI}. */
    static final int NUM_MAPS = 10;

    /** Number of samples handed to the pi estimator in {@link #runPI}. */
    static final int NUM_SAMPLES = 100000;

    /**
     * Writes {@code input} to {@code inDir/part-0}, configures {@code conf} as
     * a word count job, runs it to completion and reads back its output.
     *
     * <p>{@code outDir} is deleted recursively before the job is launched.
     *
     * @param conf job configuration; it is modified in place
     * @param inDir directory that receives the single input file
     * @param outDir job output directory
     * @param input text written to the input file
     * @param numMaps value passed to {@code setNumMapTasks}
     * @param numReduces value passed to {@code setNumReduceTasks}
     * @return the finished job together with the concatenated output text
     * @throws IOException if {@code inDir} cannot be created or any file
     *         system or job operation fails
     */
    public static TestResult launchWordCount(JobConf conf, Path inDir, Path outDir, String input, int numMaps, int numReduces) throws IOException {
        FileSystem inFs = inDir.getFileSystem(conf);
        FileSystem outFs = outDir.getFileSystem(conf);
        outFs.delete(outDir, true);
        if (!inFs.mkdirs(inDir)) {
            throw new IOException("Mkdirs failed to create " + inDir.toString());
        }

        FSDataOutputStream file = inFs.create(new Path(inDir, "part-0"));
        try {
            file.writeBytes(input);
        } finally {
            // Close even when the write fails so the stream is not leaked.
            file.close();
        }

        conf.setJobName("wordcount");
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(WordCount.MapClass.class);
        conf.setCombinerClass(WordCount.Reduce.class);
        conf.setReducerClass(WordCount.Reduce.class);
        FileInputFormat.setInputPaths(conf, inDir);
        FileOutputFormat.setOutputPath(conf, outDir);
        conf.setNumMapTasks(numMaps);
        conf.setNumReduceTasks(numReduces);

        RunningJob job = JobClient.runJob(conf);
        return new TestResult(job, readOutput(outDir, conf));
    }

    /**
     * Concatenates the contents of every file in {@code outDir} that passes
     * {@link OutputLogFilter}, in the order returned by the file system
     * listing. Each line read is re-terminated with {@code "\n"}.
     *
     * @param outDir directory to read
     * @param conf configuration used to resolve the file system of {@code outDir}
     * @return the concatenated file contents
     * @throws IOException if listing or reading fails
     */
    public static String readOutput(Path outDir, JobConf conf) throws IOException {
        FileSystem fs = outDir.getFileSystem(conf);
        StringBuilder result = new StringBuilder();
        Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir, new OutputLogFilter()));
        for (int i = 0; i < fileList.length; ++i) {
            LOG.info("File list[" + i + "]" + ": " + fileList[i]);
            BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(fileList[i])));
            try {
                for (String line = reader.readLine(); line != null; line = reader.readLine()) {
                    result.append(line);
                    result.append("\n");
                }
            } finally {
                // Close even when a read fails so the stream is not leaked.
                reader.close();
            }
        }
        return result.toString();
    }

    /**
     * Waits for the cluster to become idle and then inspects every task
     * tracker's local directory.
     *
     * <p>For each tracker the local directory must contain nothing but the
     * {@code taskTracker} entry. Each {@code taskDirs[k]} must exist as a
     * directory under {@code taskTracker/jobcache/jobIds[k]} on at least one
     * tracker.
     *
     * @param mr cluster whose task trackers are inspected
     * @param jobIds job id for each entry of {@code taskDirs}; must be at
     *        least as long as {@code taskDirs}
     * @param taskDirs task directory names that are expected to exist
     */
    static void checkTaskDirectories(MiniMRCluster mr, String[] jobIds, String[] taskDirs) {
        mr.waitUntilIdle();
        // jobIds is indexed with taskDirs positions below; report a mismatch
        // as a readable failure rather than an ArrayIndexOutOfBoundsException.
        assertTrue("Need a job id for each of the " + taskDirs.length + " task directories but got "
                + jobIds.length, jobIds.length >= taskDirs.length);

        int trackers = mr.getNumTaskTrackers();
        boolean[] found = new boolean[taskDirs.length];
        for (int i = 0; i < trackers; ++i) {
            File localDir = new File(mr.getTaskTrackerLocalDir(i));
            LOG.debug("Tracker directory: " + localDir);
            File trackerDir = new File(localDir, "taskTracker");
            assertTrue("local dir " + localDir + " does not exist.", localDir.isDirectory());
            assertTrue("task tracker dir " + trackerDir + " does not exist.", trackerDir.isDirectory());

            String[] contents = localDir.list();
            String[] trackerContents = trackerDir.list();
            // File.list() returns null when the directory cannot be read.
            assertNotNull("Could not list " + localDir, contents);
            assertNotNull("Could not list " + trackerDir, trackerContents);
            for (int j = 0; j < contents.length; ++j) {
                System.out.println("Local " + localDir + ": " + contents[j]);
            }
            for (int j = 0; j < trackerContents.length; ++j) {
                System.out.println("Local jobcache " + trackerDir + ": " + trackerContents[j]);
            }

            // Anything next to the taskTracker entry is unexpected.
            for (int fileIdx = 0; fileIdx < contents.length; ++fileIdx) {
                String name = contents[fileIdx];
                if ("taskTracker".equals(name)) {
                    continue;
                }
                LOG.debug("Looking at " + name);
                fail("Spurious directory " + name + " found in " + localDir);
            }

            // A task directory counts as found if any tracker holds it.
            File jobCache = new File(trackerDir, "jobcache");
            for (int idx = 0; idx < taskDirs.length; ++idx) {
                if (new File(new File(jobCache, jobIds[idx]), taskDirs[idx]).isDirectory()) {
                    found[idx] = true;
                }
            }
        }
        for (int i = 0; i < found.length; ++i) {
            assertTrue("Directory " + taskDirs[i] + " not found", found[i]);
        }
    }

    /**
     * Runs the pi estimator with {@link #NUM_MAPS} maps and
     * {@link #NUM_SAMPLES} samples, asserts that the estimate is within 0.01
     * of {@link Math#PI}, and checks that no unexpected entries remain in the
     * task trackers' local directories.
     *
     * @param mr cluster the job runs on
     * @param jobconf configuration handed to the estimator
     * @throws IOException if the job fails
     */
    public static void runPI(MiniMRCluster mr, JobConf jobconf) throws IOException {
        LOG.info("runPI");
        double estimate = PiEstimator.estimate(NUM_MAPS, NUM_SAMPLES, jobconf).doubleValue();
        double error = Math.abs(Math.PI - estimate);
        assertTrue("Error in PI estimation " + error + " exceeds 0.01", error < 0.01);
        checkTaskDirectories(mr, new String[0], new String[0]);
    }

    /**
     * Runs three word count jobs and verifies their results.
     *
     * <ol>
     * <li>A job on {@code ./wc} with a keep-task-files pattern set; verifies
     *     the output and that the directory of one task attempt was kept.</li>
     * <li>A job with a fresh configuration and a map-task count of 0;
     *     verifies the output and compares the two {@code hdfs} file system
     *     counters against the input and output lengths.</li>
     * <li>A job whose input and output live on the local file system under
     *     the {@code test.build.data} directory (default {@code /tmp}).</li>
     * </ol>
     *
     * @param mr cluster the jobs run on
     * @param jobConf configuration for the first job
     * @throws IOException if any job or file system operation fails
     */
    public static void runWordCount(MiniMRCluster mr, JobConf jobConf) throws IOException {
        LOG.info("runWordCount");

        // Job 1: keep the files of the attempts matching the pattern.
        // NOTE(review): arguments presumably select "map task 1" - confirm
        // against TaskAttemptID.getTaskAttemptIDsPattern.
        String pattern = TaskAttemptID.getTaskAttemptIDsPattern(null, null, Boolean.TRUE, Integer.valueOf(1), null);
        jobConf.setKeepTaskFilesPattern(pattern);
        Path inDir = new Path("./wc/input");
        Path outDir = new Path("./wc/output");
        String input = "The quick brown fox\nhas many silly\nred fox sox\n";
        TestResult result = launchWordCount(jobConf, inDir, outDir, input, 3, 1);
        assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\nquick\t1\nred\t1\nsilly\t1\nsox\t1\n", result.output);
        JobID jobid = result.job.getID();
        TaskAttemptID taskid = new TaskAttemptID(new TaskID((org.apache.hadoop.mapreduce.JobID)jobid, true, 1), 0);
        checkTaskDirectories(mr, new String[]{jobid.toString()}, new String[]{taskid.toString()});

        // Job 2: fresh configuration, then check the hdfs counters.
        jobConf = mr.createJobConf();
        input = "owen is oom";
        result = launchWordCount(jobConf, inDir, outDir, input, 0, 1);
        assertEquals("is\t1\noom\t1\nowen\t1\n", result.output);
        Counters counters = result.job.getCounters();
        String[] hdfsCounterNames = Task.getFileSystemCounterNames("hdfs");
        long hdfsRead = counters.findCounter("FileSystemCounters", hdfsCounterNames[0]).getCounter();
        long hdfsWrite = counters.findCounter("FileSystemCounters", hdfsCounterNames[1]).getCounter();
        assertEquals(result.output.length(), hdfsWrite);
        assertEquals(input.length(), hdfsRead);

        // Job 3: input and output on the local file system.
        LocalFileSystem localfs = FileSystem.getLocal(jobConf);
        String testRootDir = new File(System.getProperty("test.build.data", "/tmp")).toString().replace(' ', '+');
        Path localIn = localfs.makeQualified(new Path(testRootDir + "/local/in"));
        Path localOut = localfs.makeQualified(new Path(testRootDir + "/local/out"));
        result = launchWordCount(jobConf, localIn, localOut, "all your base belong to us", 1, 1);
        assertEquals("all\t1\nbase\t1\nbelong\t1\nto\t1\nus\t1\nyour\t1\n", result.output);
        assertTrue("outputs on localfs", localfs.exists(localOut));
    }

    /**
     * Starts a DFS cluster and an MR cluster on top of it, then runs
     * {@link #runPI} and {@link #runWordCount}. Both clusters are shut down
     * afterwards regardless of the outcome.
     *
     * @throws IOException if cluster start-up or a job fails
     */
    public void testWithDFS() throws IOException {
        MiniDFSCluster dfs = null;
        MiniMRCluster mr = null;
        try {
            final int taskTrackers = 4;
            Configuration conf = new Configuration();
            dfs = new MiniDFSCluster(conf, 4, true, null);
            FileSystem fileSys = dfs.getFileSystem();
            mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1);
            runPI(mr, mr.createJobConf());
            runWordCount(mr, mr.createJobConf());
        } finally {
            shutdownClusters(dfs, mr);
        }
    }

    /**
     * Starts the DFS cluster with port 8020 passed to its constructor and runs
     * word count twice: once with the port spelled out in the output URI and
     * once with an {@code hdfs:} output path that carries no authority after
     * {@code fs.default.name} has been set explicitly.
     *
     * <p>If a {@link BindException} is thrown the test is skipped with a log
     * message instead of failing. Both clusters are shut down afterwards
     * regardless of the outcome.
     *
     * @throws IOException if cluster start-up or a job fails for any reason
     *         other than a {@link BindException}
     */
    public void testWithDFSWithDefaultPort() throws IOException {
        final int namenodePort = 8020;
        MiniDFSCluster dfs = null;
        MiniMRCluster mr = null;
        try {
            final int taskTrackers = 4;
            Configuration conf = new Configuration();
            dfs = new MiniDFSCluster(namenodePort, conf, 4, true, true, null, null);
            FileSystem fileSys = dfs.getFileSystem();
            mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1);
            JobConf jobConf = mr.createJobConf();
            String expected = "The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\nquick\t1\nred\t1\nsilly\t1\nsox\t1\n";

            // Output URI with an explicit host and port.
            Path inDir = new Path("./wc/input");
            Path outDir = new Path("hdfs://" + dfs.getNameNode().getNameNodeAddress().getHostName() + ":" + namenodePort + "/./wc/output");
            String input = "The quick brown fox\nhas many silly\nred fox sox\n";
            TestResult result = launchWordCount(jobConf, inDir, outDir, input, 3, 1);
            assertEquals(expected, result.output);

            // Output URI without an authority.
            Path outDir2 = new Path("hdfs:/test/wc/output2");
            jobConf.set("fs.default.name", "hdfs://localhost:" + namenodePort);
            result = launchWordCount(jobConf, inDir, outDir2, input, 3, 1);
            assertEquals(expected, result.output);
        } catch (BindException be) {
            // Deliberate best-effort: the fixed port may already be in use.
            LOG.info("Skip the test this time because can not start namenode on port " + namenodePort, be);
        } finally {
            shutdownClusters(dfs, mr);
        }
    }

    /**
     * Shuts down the DFS cluster and then the MR cluster, skipping whichever
     * is {@code null}. The MR cluster is shut down even if shutting down the
     * DFS cluster throws.
     */
    private static void shutdownClusters(MiniDFSCluster dfs, MiniMRCluster mr) throws IOException {
        try {
            if (dfs != null) {
                dfs.shutdown();
            }
        } finally {
            if (mr != null) {
                mr.shutdown();
            }
        }
    }

    /** Pairs a finished job with the text read back from its output directory. */
    public static class TestResult {
        /** Concatenated contents of the job's output files. */
        public String output;

        /** Handle of the job that produced {@link #output}. */
        public RunningJob job;

        TestResult(RunningJob job, String output) {
            this.job = job;
            this.output = output;
        }
    }
}

