/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred;

import java.io.ByteArrayOutputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.IFileOutputStream;
import org.apache.hadoop.mapred.IndexRecord;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapOutputCollector;
import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.MapTask;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.MiniMRClientCluster;
import org.apache.hadoop.mapred.MiniMRClientClusterFactory;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.SpillRecord;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.Utils;

public class TestMerge
extends TestCase {
    // NOTE: in this decompiled source the four numeric constants below are never
    // referenced by name; the methods use the same literal values inline.

    /** Node count; testMerge() passes the literal 2 to both mini-cluster factories. */
    private static final int NUM_HADOOP_DATA_NODES = 2;
    /** Number of input files; createInput() writes 10 files (presumably one per map task). */
    private static final int NUM_MAPPERS = 10;
    /** Number of reduce tasks; runMergeTest() calls setNumReduceTasks(4). */
    private static final int NUM_REDUCERS = 4;
    /** Lines per input file; createInput() writes 1000 lines, MyPartitioner divides by 1000. */
    private static final int NUM_LINES = 1000;
    /** Directory (on the test file system) that createInput() fills with input files. */
    private static final Path INPUT_DIR = new Path("/testplugin/input");
    /** Job output directory; deleted before the job runs and read back by verifyOutput(). */
    private static final Path OUTPUT = new Path("/testplugin/output");

    /**
     * Starts a mini DFS cluster and a mini MR cluster, writes the test input, runs the
     * merge job against them and finally shuts both clusters down.
     *
     * <p>NOTE: the decompiler (CFR) flagged the try/finally structure of this method as
     * possibly differing from the original source.
     *
     * @throws Exception if cluster start-up, input creation, the job run, output
     *         verification or cluster shutdown fails
     */
    public void testMerge() throws Exception {
        MiniDFSCluster dfsCluster = null;
        MiniMRClientCluster mrCluster = null;
        try {
            Configuration conf = new Configuration();
            // 2 is the value of NUM_HADOOP_DATA_NODES; it is used for both clusters.
            dfsCluster = new MiniDFSCluster(conf, 2, true, null);
            FileSystem fileSystem = dfsCluster.getFileSystem();
            mrCluster = MiniMRClientClusterFactory.create(this.getClass(), 2, conf);
            this.createInput(fileSystem);
            this.runMergeTest(new JobConf(mrCluster.getConfig()), fileSystem);
        }
        finally {
            // Nested finally: a failing DFS shutdown must not prevent the MR cluster
            // from being stopped as well.
            try {
                if (dfsCluster != null) {
                    dfsCluster.shutdown();
                }
            }
            finally {
                if (mrCluster != null) {
                    mrCluster.stop();
                }
            }
        }
    }

    /**
     * Recreates {@code INPUT_DIR} and fills it with 10 text files named
     * {@code input_<i>.txt}. Every file holds 1000 lines of the form
     * {@code "<n> <n>"}, where {@code n} runs from 1 to 1000 and is zero-padded to
     * nine digits, so key and value on a line are identical.
     *
     * @param fs file system on which the input directory is (re)created
     * @throws Exception if the directory cannot be deleted or a file cannot be written
     */
    private void createInput(FileSystem fs) throws Exception {
        fs.delete(INPUT_DIR, true);
        // 10 files (value of NUM_MAPPERS), 1000 lines each (value of NUM_LINES).
        for (int i = 0; i < 10; ++i) {
            FSDataOutputStream os = fs.create(new Path(INPUT_DIR, "input_" + i + ".txt"));
            // Explicit charset so the bytes written do not depend on the platform default.
            Writer writer = new OutputStreamWriter(os, "UTF-8");
            try {
                for (int j = 0; j < 1000; ++j) {
                    String formattedNumber = String.format("%09d", j + 1);
                    writer.write(formattedNumber + " " + formattedNumber + "\n");
                }
            }
            finally {
                // Closing the writer also closes the underlying stream, even when a
                // write above failed.
                writer.close();
            }
        }
    }

    /**
     * Configures the "MergeTest" job on {@code job}, submits it, waits for it to finish
     * and then verifies the job output.
     *
     * <p>An {@link IOException} raised while submitting or monitoring the job (including
     * the "Job failed!" exception thrown when monitoring reports failure) is only printed
     * to {@code System.err}; an interruption while monitoring just restores the thread's
     * interrupt flag. In every case {@code verifyOutput} runs afterwards from the
     * {@code finally} block, so the output check decides the outcome of the test.
     *
     * <p>NOTE: the decompiler (CFR) flagged the try/catch structure of this method as
     * possibly differing from the original source.
     *
     * @param job        job configuration to populate and submit
     * @param fileSystem file system holding {@code INPUT_DIR} and {@code OUTPUT}
     * @throws Exception if job-client creation, output deletion or verification fails
     */
    private void runMergeTest(JobConf job, FileSystem fileSystem) throws Exception {
        // Start from a clean output directory.
        fileSystem.delete(OUTPUT, true);
        job.setJobName("MergeTest");
        JobClient jobClient = new JobClient(job);

        // Text in, text out; key and value are separated by a single blank.
        FileInputFormat.setInputPaths(job, INPUT_DIR);
        FileOutputFormat.setOutputPath(job, OUTPUT);
        job.set("mapreduce.output.textoutputformat.separator", " ");
        job.setInputFormat(TextInputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapperClass(MyMapper.class);
        job.setPartitionerClass(MyPartitioner.class);
        job.setOutputFormat(TextOutputFormat.class);
        // 4 is the value of NUM_REDUCERS.
        job.setNumReduceTasks(4);
        // Plug in the test's own map output collector.
        job.set("mapreduce.job.map.output.collector.class", MapOutputCopier.class.getName());

        RunningJob runningJob = null;
        try {
            runningJob = jobClient.submitJob(job);
            try {
                boolean succeeded = jobClient.monitorAndPrintJob(job, runningJob);
                if (!succeeded) {
                    throw new IOException("Job failed!");
                }
            }
            catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        catch (IOException failure) {
            System.err.println("Job failed with: " + failure);
        }
        finally {
            this.verifyOutput(runningJob, fileSystem);
        }
    }

    /**
     * Reads every output file under {@code OUTPUT} and checks the records in it.
     *
     * <p>A record is valid when its key (the text before the first blank) equals its
     * value (the text after it) and the key is not smaller than the previous valid key;
     * the previous key is carried over from one file to the next. Anything else,
     * including a line that contains no blank at all, counts as invalid. The method
     * asserts that exactly 10000 valid and 0 invalid records were seen.
     *
     * <p>NOTE: the decompiler (CFR) flagged the try/finally structure of this method as
     * possibly differing from the original source.
     *
     * @param submittedJob the job whose output is checked; not used by this method
     * @param fileSystem   file system holding the {@code OUTPUT} directory
     * @throws Exception if the output cannot be listed or read
     */
    private void verifyOutput(RunningJob submittedJob, FileSystem fileSystem) throws Exception {
        long numValidRecords = 0L;
        long numInvalidRecords = 0L;
        String prevKeyValue = "000000000";
        Path[] fileList = FileUtil.stat2Paths(
                fileSystem.listStatus(OUTPUT, new Utils.OutputFileUtils.OutputFilesFilter()));
        for (Path outFile : fileList) {
            FSDataInputStream dis = fileSystem.open(outFile);
            try {
                String record;
                while ((record = dis.readLine()) != null) {
                    int blankPos = record.indexOf(" ");
                    if (blankPos < 0) {
                        // No separator: report it as an invalid record rather than
                        // failing with a StringIndexOutOfBoundsException.
                        ++numInvalidRecords;
                        continue;
                    }
                    String keyString = record.substring(0, blankPos);
                    String valueString = record.substring(blankPos + 1);
                    if (keyString.compareTo(prevKeyValue) >= 0 && keyString.equals(valueString)) {
                        prevKeyValue = keyString;
                        ++numValidRecords;
                        continue;
                    }
                    ++numInvalidRecords;
                }
            }
            finally {
                dis.close();
            }
        }
        // 10000 = 10 input files (NUM_MAPPERS) x 1000 lines each (NUM_LINES).
        TestMerge.assertEquals("number of valid records", 10000L, numValidRecords);
        TestMerge.assertEquals("number of invalid records", 0L, numInvalidRecords);
    }

    /**
     * Writes key/value pairs to an output stream as length-prefixed records.
     *
     * <p>Each record is: key length (VInt), value length (VInt), serialized key bytes,
     * serialized value bytes. {@link #close()} appends two VInt {@code -1} values
     * (presumably the end-of-stream marker expected by the reader of this data; confirm
     * against the IFile format) before closing the stream.
     *
     * @param <K> key type
     * @param <V> value type
     */
    static class KeyValueWriter<K, V> {
        private final Class<K> keyClass;
        private final Class<V> valueClass;
        /** Scratch buffer that both serializers write into; emptied after every record. */
        private final DataOutputBuffer dataBuffer;
        private final Serializer<K> keySerializer;
        private final Serializer<V> valueSerializer;
        private final DataOutputStream outputStream;

        /**
         * Creates a writer that serializes into {@code output}.
         *
         * @param conf     configuration used to look up the serializers
         * @param output   destination stream; closed by {@link #close()}
         * @param kyClass  exact runtime class every key must have
         * @param valClass exact runtime class every value must have
         * @throws IOException if a serializer cannot be opened
         */
        public KeyValueWriter(Configuration conf, OutputStream output, Class<K> kyClass, Class<V> valClass) throws IOException {
            this.keyClass = kyClass;
            this.valueClass = valClass;
            this.dataBuffer = new DataOutputBuffer();
            SerializationFactory serializationFactory = new SerializationFactory(conf);
            this.keySerializer = serializationFactory.getSerializer(this.keyClass);
            this.keySerializer.open(this.dataBuffer);
            this.valueSerializer = serializationFactory.getSerializer(this.valueClass);
            this.valueSerializer.open(this.dataBuffer);
            this.outputStream = new DataOutputStream(output);
        }

        /**
         * Serializes one key/value pair and appends it to the output stream.
         *
         * @param key   key to write; its class must be exactly the configured key class
         * @param value value to write; its class must be exactly the configured value class
         * @throws IOException if a class does not match, a computed length is negative,
         *         or serialization or the write fails
         */
        public void write(K key, V value) throws IOException {
            if (key.getClass() != this.keyClass) {
                throw new IOException("wrong key class: " + key.getClass() + " is not " + this.keyClass);
            }
            if (value.getClass() != this.valueClass) {
                throw new IOException("wrong value class: " + value.getClass() + " is not " + this.valueClass);
            }
            try {
                // Key first, then value, both into the shared scratch buffer; the buffer
                // length after each step yields the two record lengths.
                this.keySerializer.serialize(key);
                int keyLength = this.dataBuffer.getLength();
                if (keyLength < 0) {
                    throw new IOException("Negative key-length not allowed: " + keyLength + " for " + key);
                }
                this.valueSerializer.serialize(value);
                int valueLength = this.dataBuffer.getLength() - keyLength;
                if (valueLength < 0) {
                    throw new IOException("Negative value-length not allowed: " + valueLength + " for " + value);
                }
                WritableUtils.writeVInt(this.outputStream, keyLength);
                WritableUtils.writeVInt(this.outputStream, valueLength);
                this.outputStream.write(this.dataBuffer.getData(), 0, this.dataBuffer.getLength());
            }
            finally {
                // Always empty the scratch buffer so a failed write cannot leave stale
                // bytes that would corrupt the lengths of the next record.
                this.dataBuffer.reset();
            }
        }

        /**
         * Closes both serializers, writes the two trailing VInt {@code -1} values and
         * closes the output stream. The stream is closed even if an earlier step fails.
         *
         * @throws IOException if closing a serializer, writing the trailer or closing
         *         the stream fails
         */
        public void close() throws IOException {
            try {
                this.keySerializer.close();
                this.valueSerializer.close();
                WritableUtils.writeVInt(this.outputStream, -1);
                WritableUtils.writeVInt(this.outputStream, -1);
            }
            finally {
                this.outputStream.close();
            }
        }
    }

    /**
     * Map output collector used by this test. It buffers every partition's records in
     * memory through a {@link KeyValueWriter} and, on {@link #close()}, writes all
     * partitions one after another into the map output file together with a spill index.
     *
     * @param <K> map output key type
     * @param <V> map output value type
     */
    static class MapOutputCopier<K, V>
    implements MapOutputCollector<K, V> {
        /** Buffer size passed when creating the map output file. */
        private static final int BUF_SIZE = 131072;
        /**
         * Bytes requested per partition for the index file. Presumably three 8-byte
         * fields per index record (start offset, raw length, part length); confirm
         * against MapTask's index record length constant.
         */
        private static final int INDEX_RECORD_LENGTH = 24;
        private MapTask mapTask;
        private JobConf jobConf;
        private Task.TaskReporter reporter;
        private int numberOfPartitions;
        private Class<K> keyClass;
        private Class<V> valueClass;
        private KeyValueWriter<K, V>[] recordWriters;
        private ByteArrayOutputStream[] outStreams;

        /**
         * Reads task, configuration and reporter from {@code context} and creates one
         * in-memory stream plus record writer per reduce partition.
         *
         * @param context collector context supplied by the map task
         * @throws IOException if a record writer cannot be created
         * @throws ClassNotFoundException declared by the collector interface
         */
        @SuppressWarnings("unchecked")
        public void init(MapOutputCollector.Context context) throws IOException, ClassNotFoundException {
            this.mapTask = context.getMapTask();
            this.jobConf = context.getJobConf();
            this.reporter = context.getReporter();
            this.numberOfPartitions = this.jobConf.getNumReduceTasks();
            // Unchecked: the classes configured on the job are trusted to match K and V.
            this.keyClass = (Class<K>)this.jobConf.getMapOutputKeyClass();
            this.valueClass = (Class<V>)this.jobConf.getMapOutputValueClass();
            this.recordWriters = new KeyValueWriter[this.numberOfPartitions];
            this.outStreams = new ByteArrayOutputStream[this.numberOfPartitions];
            for (int i = 0; i < this.numberOfPartitions; ++i) {
                this.outStreams[i] = new ByteArrayOutputStream();
                this.recordWriters[i] = new KeyValueWriter<K, V>(this.jobConf, this.outStreams[i], this.keyClass, this.valueClass);
            }
        }

        /**
         * Appends one record to the in-memory buffer of the given partition and reports
         * progress.
         *
         * @param key             record key
         * @param value           record value
         * @param partitionNumber target partition, in {@code [0, numberOfPartitions)}
         * @throws IOException if the partition number is out of range or the write fails
         */
        public synchronized void collect(K key, V value, int partitionNumber) throws IOException, InterruptedException {
            if (partitionNumber < 0 || partitionNumber >= this.numberOfPartitions) {
                throw new IOException("Invalid partition number: " + partitionNumber);
            }
            this.recordWriters[partitionNumber].write(key, value);
            this.reporter.progress();
        }

        /**
         * Closes all record writers, then writes the buffered partitions and the index
         * to the files obtained from the task's {@link MapOutputFile}.
         *
         * @throws IOException if closing a writer or writing the output fails
         */
        public void close() throws IOException, InterruptedException {
            long totalSize = 0L;
            for (int i = 0; i < this.numberOfPartitions; ++i) {
                this.recordWriters[i].close();
                this.outStreams[i].close();
                totalSize += (long)this.outStreams[i].size();
            }
            MapOutputFile mapOutputFile = this.mapTask.getMapOutputFile();
            Path finalOutput = mapOutputFile.getOutputFileForWrite(totalSize);
            Path indexPath = mapOutputFile.getOutputIndexFileForWrite((long)this.numberOfPartitions * INDEX_RECORD_LENGTH);
            this.copyPartitions(finalOutput, indexPath);
        }

        /** Does nothing; all data stays buffered in memory until {@link #close()}. */
        public void flush() throws IOException, InterruptedException, ClassNotFoundException {
        }

        /**
         * Writes every partition buffer to {@code mapOutputPath} through an
         * {@link IFileOutputStream}, records each partition's start offset, raw length
         * and on-disk length, and finally writes the resulting spill record to
         * {@code indexPath}.
         *
         * @param mapOutputPath destination of the concatenated partition data
         * @param indexPath     destination of the spill index
         * @throws IOException if creating or writing either file fails
         */
        private void copyPartitions(Path mapOutputPath, Path indexPath) throws IOException {
            LocalFileSystem localFs = FileSystem.getLocal(this.jobConf);
            FileSystem rfs = localFs.getRaw();
            SpillRecord spillRecord = new SpillRecord(this.numberOfPartitions);
            IndexRecord indexRecord = new IndexRecord();
            FSDataOutputStream rawOutput = rfs.create(mapOutputPath, true, BUF_SIZE);
            try {
                for (int i = 0; i < this.numberOfPartitions; ++i) {
                    indexRecord.startOffset = rawOutput.getPos();
                    byte[] buffer = this.outStreams[i].toByteArray();
                    IFileOutputStream checksumOutput = new IFileOutputStream(rawOutput);
                    checksumOutput.write(buffer);
                    // finish() is called instead of close() so rawOutput stays open for
                    // the next partition.
                    checksumOutput.finish();
                    indexRecord.rawLength = buffer.length;
                    // On-disk length is measured from the stream position, so it
                    // includes whatever finish() appended after the data.
                    indexRecord.partLength = rawOutput.getPos() - indexRecord.startOffset;
                    spillRecord.putIndex(indexRecord, i);
                    this.reporter.progress();
                }
            }
            finally {
                // Release the file handle even when writing a partition failed.
                rawOutput.close();
            }
            spillRecord.writeToFile(indexPath, this.jobConf);
        }
    }

    /**
     * Range partitioner for the numeric keys of this test: keys 1..1000 are split into
     * {@code numPartitions} contiguous, equally sized ranges so that the partitions are
     * ordered by key.
     */
    static class MyPartitioner
    implements Partitioner<Text, Text> {
        /** Highest key value the test input contains (createInput() writes keys 1..1000). */
        private static final int KEY_RANGE = 1000;

        /** No configuration is needed. */
        public void configure(JobConf job) {
        }

        /**
         * Maps a key to its partition.
         *
         * @param key           decimal key text; text that is not a valid integer is
         *                      treated as 0
         * @param value         unused
         * @param numPartitions number of partitions
         * @return {@code numPartitions * max(0, key - 1) / 1000}, limited to the last
         *         partition index so keys above 1000 cannot yield an out-of-range result
         */
        public int getPartition(Text key, Text value, int numPartitions) {
            int keyValue;
            try {
                keyValue = Integer.parseInt(key.toString());
            }
            catch (NumberFormatException nfe) {
                // Non-numeric keys fall into the first partition.
                keyValue = 0;
            }
            // Computed in long: a 9-digit key multiplied by numPartitions can exceed
            // the int range.
            long zeroBasedKey = Math.max(0L, (long)keyValue - 1L);
            long partitionNumber = (long)numPartitions * zeroBasedKey / KEY_RANGE;
            long lastPartition = Math.max(0, numPartitions - 1);
            return (int)Math.min(partitionNumber, lastPartition);
        }
    }

    /**
     * Mapper that splits each input line at its first blank and emits the text before
     * the blank as key and the text after it as value.
     */
    public static class MyMapper
    extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, Text> {
        // Reused for every record to avoid allocating new Text objects per call.
        private final Text keyText = new Text();
        private final Text valueText = new Text();

        /**
         * Emits one key/value pair for the given line.
         *
         * @param key      input key supplied by the input format; unused
         * @param value    input line; expected to contain at least one blank (a line
         *                 without one makes {@code substring} throw)
         * @param output   collector receiving the pair
         * @param reporter unused
         * @throws IOException if the collector fails
         */
        public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            String record = value.toString();
            int blankPos = record.indexOf(" ");
            this.keyText.set(record.substring(0, blankPos));
            this.valueText.set(record.substring(blankPos + 1));
            // Pass the Text objects directly: the collector is typed <Text, Text>, so
            // arguments cast to Object do not compile.
            output.collect(this.keyText, this.valueText);
        }

        /** Nothing to release. */
        public void close() throws IOException {
        }
    }
}

