/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs;

import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.net.URI;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.AppendTestUtil;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.TestBlockReaderLocal;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

public class TestShortCircuitLocalRead {
    // Directory used for the datanode's domain socket file; created in init()
    // and closed in shutdown().
    private static TemporarySocketDirectory sockDir;
    // The same value (3735928559L) is passed as the first argument to
    // AppendTestUtil.randomBytes wherever this class generates file contents.
    static final long seed = 3735928559L;
    // The same value (5120) is passed as the last argument of FileSystem.create
    // in createFile(); presumably the HDFS block size in bytes - TODO confirm.
    static final int blockSize = 5120;
    // When true, the tests call SimulatedFSDataset.setFactory(conf) before
    // building the cluster. Nothing in this file ever assigns it.
    boolean simulatedStorage = false;

    /**
     * One-time class setup: creates the temporary directory that holds the
     * domain socket files and disables bind-path validation on
     * {@link DomainSocket}.
     */
    @BeforeClass
    public static void init() {
        final TemporarySocketDirectory socketDirectory = new TemporarySocketDirectory();
        sockDir = socketDirectory;
        DomainSocket.disableBindPathValidation();
    }

    /**
     * One-time class teardown: closes the temporary socket directory created
     * by {@link #init()}.
     *
     * @throws IOException if closing the directory fails
     */
    @AfterClass
    public static void shutdown() throws IOException {
        // JUnit still runs @AfterClass when @BeforeClass threw, in which case
        // sockDir was never assigned; an NPE here would hide the real failure.
        if (sockDir != null) {
            sockDir.close();
        }
    }

    /**
     * Skips (rather than fails) each test when
     * {@link DomainSocket#getLoadingFailureReason()} reports a non-null
     * reason.
     */
    @Before
    public void before() {
        Object loadingFailure = DomainSocket.getLoadingFailureReason();
        Assume.assumeThat(loadingFailure, CoreMatchers.equalTo(null));
    }

    /**
     * Creates (overwriting if present) {@code name} on {@code fileSys} and
     * returns the stream for writing to it.
     *
     * @param fileSys file system to create the file on
     * @param name    path of the file to create
     * @param repl    replication value; narrowed to {@code short} for the create call
     * @return the open output stream; the caller is responsible for closing it
     * @throws IOException if the file cannot be created
     */
    static FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl) throws IOException {
        int bufferSize = fileSys.getConf().getInt("io.file.buffer.size", 4096);
        // Use the class constant rather than a second copy of the literal so the
        // two can never drift apart.
        return fileSys.create(name, true, bufferSize, (short) repl, (long) blockSize);
    }

    /**
     * Checks every byte of {@code actual} against {@code expected}, where
     * {@code actual[i]} must equal {@code expected[from + i]}.
     *
     * @param actual   bytes that were read
     * @param from     index in {@code expected} that corresponds to {@code actual[0]}
     * @param expected reference data
     * @param message  prefix for the failure message
     */
    private static void checkData(byte[] actual, int from, byte[] expected, String message) {
        final int compareLength = actual.length;
        checkData(actual, from, expected, compareLength, message);
    }

    /**
     * Checks the first {@code len} bytes of {@code actual} against
     * {@code expected}, where {@code actual[i]} must equal
     * {@code expected[from + i]}. Fails the test at the first mismatch with a
     * message that includes hex output of both compared ranges.
     *
     * @param actual   bytes that were read
     * @param from     index in {@code expected} that corresponds to {@code actual[0]}
     * @param expected reference data
     * @param len      number of bytes to compare
     * @param message  prefix for the failure message
     */
    private static void checkData(byte[] actual, int from, byte[] expected, int len, String message) {
        int idx = 0;
        while (idx < len) {
            final int expectedPos = from + idx;
            if (expected[expectedPos] != actual[idx]) {
                String expectedHex = StringUtils.byteToHexString(expected, from, from + len);
                String actualHex = StringUtils.byteToHexString(actual, 0, len);
                Assert.fail(message + " byte " + expectedPos + " differs. expected " + expected[expectedPos]
                        + " actual " + actual[idx] + "\nexpected: " + expectedHex + "\nactual:   " + actualHex);
            }
            idx++;
        }
    }

    /**
     * Returns the short user name of the current {@link UserGroupInformation}.
     *
     * @throws IOException if the current user cannot be determined
     */
    private static String getCurrentUser() throws IOException {
        UserGroupInformation currentUgi = UserGroupInformation.getCurrentUser();
        return currentUgi.getShortUserName();
    }

    /**
     * Reads {@code name} as {@code readingUser} through the byte-array read
     * APIs and verifies that the bytes from {@code readOffset} to the end match
     * {@code expected}.
     *
     * <p>The file is read twice: once with a single positional
     * {@code readFully}, and once by skipping to {@code readOffset} and issuing
     * reads of 3, 2 and 517 bytes followed by a loop that reads the rest.
     *
     * @param uri                     URI of the file system to open
     * @param name                    file to read
     * @param expected                full expected contents of the file
     * @param readOffset              file offset at which reading starts
     * @param readingUser             user the file system is opened as
     * @param conf                    client configuration
     * @param legacyShortCircuitFails when true, asserts that the client reports
     *                                {@code useLegacyBlockReaderLocal()} before
     *                                reading and no longer reports it afterwards
     * @throws IOException          on read failure, including reaching end of
     *                              file before all expected bytes were read
     * @throws InterruptedException if obtaining the file system is interrupted
     */
    static void checkFileContent(URI uri, Path name, byte[] expected, int readOffset, String readingUser, Configuration conf, boolean legacyShortCircuitFails) throws IOException, InterruptedException {
        DistributedFileSystem fs = getFileSystem(readingUser, uri, conf);
        if (legacyShortCircuitFails) {
            Assert.assertTrue(fs.getClient().useLegacyBlockReaderLocal());
        }
        final int remaining = expected.length - readOffset;

        // Pass 1: a single positional read of everything after readOffset.
        byte[] actual = new byte[remaining];
        FSDataInputStream stm = fs.open(name);
        try {
            stm.readFully(readOffset, actual);
            checkData(actual, readOffset, expected, "Read 2");
        } finally {
            // Close even when the read or the comparison fails.
            stm.close();
        }

        // Pass 2: sequential reads through a fresh stream.
        actual = new byte[remaining];
        stm = fs.open(name);
        try {
            IOUtils.skipFully(stm, readOffset);
            // Two small reads first, then a larger one.
            int nread = stm.read(actual, 0, 3);
            nread += stm.read(actual, nread, 2);
            nread += stm.read(actual, nread, 517);
            checkData(actual, readOffset, expected, nread, "A few bytes");
            // Drain whatever is left.
            while (nread < actual.length) {
                int nbytes = stm.read(actual, nread, actual.length - nread);
                if (nbytes < 0) {
                    throw new EOFException("End of file reached before reading fully.");
                }
                nread += nbytes;
            }
            checkData(actual, readOffset, expected, "Read 3");
            if (legacyShortCircuitFails) {
                Assert.assertFalse(fs.getClient().useLegacyBlockReaderLocal());
            }
        } finally {
            stm.close();
        }
    }

    /**
     * Copies the entire capacity of {@code buf} into a new byte array.
     * The copy is made through a duplicate, so the position and limit of
     * {@code buf} itself are left unchanged.
     *
     * @param buf buffer to copy from
     * @return a new array of {@code buf.capacity()} bytes
     */
    private static byte[] arrayFromByteBuffer(ByteBuffer buf) {
        final ByteBuffer view = buf.duplicate();
        // clear() resets position to 0 and limit to capacity; it does not erase data.
        view.clear();
        final byte[] copy = new byte[view.capacity()];
        view.get(copy, 0, copy.length);
        return copy;
    }

    /**
     * Reads {@code name} as {@code readingUser} through the
     * {@link ByteBuffer} read API into a direct buffer and verifies that the
     * bytes from {@code readOffset} to the end match {@code expected}.
     *
     * <p>After skipping to {@code readOffset}, the buffer limit is moved so
     * that successive reads may fill at most 3, then 2, then 517 further bytes;
     * the remainder is then read in a loop.
     *
     * @param uri                     URI of the file system to open
     * @param name                    file to read
     * @param expected                full expected contents of the file
     * @param readOffset              file offset at which reading starts
     * @param readingUser             user the file system is opened as
     * @param conf                    client configuration
     * @param legacyShortCircuitFails when true, asserts that the client reports
     *                                {@code useLegacyBlockReaderLocal()} before
     *                                reading and no longer reports it afterwards
     * @throws IOException          on read failure, including reaching end of
     *                              file before the buffer was filled
     * @throws InterruptedException if obtaining the file system is interrupted
     */
    static void checkFileContentDirect(URI uri, Path name, byte[] expected, int readOffset, String readingUser, Configuration conf, boolean legacyShortCircuitFails) throws IOException, InterruptedException {
        DistributedFileSystem fs = getFileSystem(readingUser, uri, conf);
        if (legacyShortCircuitFails) {
            Assert.assertTrue(fs.getClient().useLegacyBlockReaderLocal());
        }
        HdfsDataInputStream stm = (HdfsDataInputStream) fs.open(name);
        try {
            ByteBuffer actual = ByteBuffer.allocateDirect(expected.length - readOffset);
            IOUtils.skipFully(stm, readOffset);

            // Two small reads first.
            actual.limit(3);
            int nread = stm.read(actual);
            actual.limit(nread + 2);
            nread += stm.read(actual);

            // Then a larger one, capped at the buffer's capacity.
            actual.limit(Math.min(actual.capacity(), nread + 517));
            nread += stm.read(actual);

            // The snapshot must be taken AFTER the read above. Java evaluates
            // arguments left to right, so doing the read inside the checkData
            // argument list would compare a copy that lacks the newest bytes.
            checkData(arrayFromByteBuffer(actual), readOffset, expected, nread, "A few bytes");

            // Drain whatever is left.
            actual.limit(actual.capacity());
            while (actual.hasRemaining()) {
                int nbytes = stm.read(actual);
                if (nbytes < 0) {
                    throw new EOFException("End of file reached before reading fully.");
                }
            }
            checkData(arrayFromByteBuffer(actual), readOffset, expected, "Read 3");
            if (legacyShortCircuitFails) {
                Assert.assertFalse(fs.getClient().useLegacyBlockReaderLocal());
            }
        } finally {
            // Close even when a read or a comparison fails.
            stm.close();
        }
    }

    /**
     * Runs the short-circuit read scenario with an explicit
     * {@code shortCircuitUser}; delegates unchanged to
     * {@link #doTestShortCircuitReadImpl}, which turns on the legacy local
     * block reader whenever {@code shortCircuitUser} is non-null.
     *
     * @param ignoreChecksum          value for the skip-checksum client setting
     * @param size                    size in bytes of the test file
     * @param readOffset              file offset at which verification starts
     * @param shortCircuitUser        user granted local-path access, or null
     * @param readingUser             user that performs the reads
     * @param legacyShortCircuitFails forwarded to the content checks
     */
    public void doTestShortCircuitReadLegacy(boolean ignoreChecksum, int size, int readOffset, String shortCircuitUser, String readingUser, boolean legacyShortCircuitFails) throws IOException, InterruptedException {
        doTestShortCircuitReadImpl(
                ignoreChecksum, size, readOffset, shortCircuitUser, readingUser, legacyShortCircuitFails);
    }

    /**
     * Runs the short-circuit read scenario as the current user with no
     * {@code shortCircuitUser} (null), so the legacy local block reader is not
     * configured.
     *
     * @param ignoreChecksum value for the skip-checksum client setting
     * @param size           size in bytes of the test file
     * @param readOffset     file offset at which verification starts
     */
    public void doTestShortCircuitRead(boolean ignoreChecksum, int size, int readOffset) throws IOException, InterruptedException {
        doTestShortCircuitReadImpl(ignoreChecksum, size, readOffset, null, getCurrentUser(), false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Starts a single-datanode mini cluster with short-circuit reads enabled,
     * writes {@code size} bytes to a file, and verifies its contents through
     * both {@link #checkFileContent} and {@link #checkFileContentDirect}.
     *
     * @param ignoreChecksum          value for "dfs.client.read.shortcircuit.skip.checksum"
     * @param size                    size in bytes of the test file
     * @param readOffset              file offset at which verification starts
     * @param shortCircuitUser        when non-null, set as
     *                                "dfs.block.local-path-access.user" and the
     *                                legacy local block reader is enabled
     * @param readingUser             user that performs the reads
     * @param legacyShortCircuitFails forwarded to the content checks
     */
    public void doTestShortCircuitReadImpl(boolean ignoreChecksum, int size, int readOffset, String shortCircuitUser, String readingUser, boolean legacyShortCircuitFails) throws IOException, InterruptedException {
        Configuration conf = new Configuration();
        conf.setBoolean("dfs.client.read.shortcircuit", true);
        conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", ignoreChecksum);
        conf.set("dfs.domain.socket.path", new File(sockDir.getDir(), "TestShortCircuitLocalRead._PORT.sock").getAbsolutePath());
        if (shortCircuitUser != null) {
            conf.set("dfs.block.local-path-access.user", shortCircuitUser);
            conf.setBoolean("dfs.client.use.legacy.blockreader.local", true);
        }
        if (simulatedStorage) {
            SimulatedFSDataset.setFactory(conf);
        }
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
        // Nested try/finally: the cluster is shut down even if obtaining or
        // closing the file system throws.
        try {
            DistributedFileSystem fs = cluster.getFileSystem();
            try {
                Path root = new Path("/");
                Assert.assertTrue("/ should be a directory", fs.getFileStatus(root).isDirectory());
                byte[] fileData = AppendTestUtil.randomBytes(seed, size);
                Path file1 = fs.makeQualified(new Path("filelocal.dat"));
                FSDataOutputStream stm = createFile(fs, file1, 1);
                try {
                    stm.write(fileData);
                } finally {
                    stm.close();
                }
                URI uri = cluster.getURI();
                checkFileContent(uri, file1, fileData, readOffset, readingUser, conf, legacyShortCircuitFails);
                checkFileContentDirect(uri, file1, fileData, readOffset, readingUser, conf, legacyShortCircuitFails);
            } finally {
                fs.close();
            }
        } finally {
            cluster.shutdown();
        }
    }

    /**
     * Reads a 15460-byte file from offset 0 with the skip-checksum setting
     * turned on.
     */
    @Test(timeout=10000L)
    public void testFileLocalReadNoChecksum() throws Exception {
        // 15460 = three blocks of blockSize bytes plus a 100-byte tail.
        final int fileSize = 3 * blockSize + 100;
        doTestShortCircuitRead(true, fileSize, 0);
    }

    /**
     * Reads a 15460-byte file from offset 0 with the skip-checksum setting
     * turned off.
     */
    @Test(timeout=10000L)
    public void testFileLocalReadChecksum() throws Exception {
        // 15460 = three blocks of blockSize bytes plus a 100-byte tail.
        final int fileSize = 3 * blockSize + 100;
        doTestShortCircuitRead(false, fileSize, 0);
    }

    /**
     * Reads a 13-byte file from offsets 0 and 5, first with the skip-checksum
     * setting off and then with it on.
     */
    @Test(timeout=10000L)
    public void testSmallFileLocalRead() throws Exception {
        final int fileSize = 13;
        for (boolean ignoreChecksum : new boolean[] {false, true}) {
            for (int readOffset : new int[] {0, 5}) {
                doTestShortCircuitRead(ignoreChecksum, fileSize, readOffset);
            }
        }
    }

    /**
     * Runs the legacy scenario with the current user as both the configured
     * short-circuit user and the reading user.
     */
    @Test(timeout=10000L)
    public void testLocalReadLegacy() throws Exception {
        final String shortCircuitUser = getCurrentUser();
        final String readingUser = getCurrentUser();
        doTestShortCircuitReadLegacy(true, 13, 0, shortCircuitUser, readingUser, false);
    }

    /**
     * Runs the legacy scenario with a reading user ("notallowed") that differs
     * from the configured short-circuit user, with
     * {@code legacyShortCircuitFails} set so the content checks assert that the
     * client stops reporting the legacy local block reader after reading.
     */
    @Test(timeout=10000L)
    public void testLocalReadFallback() throws Exception {
        final String shortCircuitUser = getCurrentUser();
        doTestShortCircuitReadLegacy(true, 13, 0, shortCircuitUser, "notallowed", true);
    }

    /**
     * Reads a 15460-byte file starting at offset 777, first with the
     * skip-checksum setting off and then with it on.
     */
    @Test(timeout=10000L)
    public void testReadFromAnOffset() throws Exception {
        // 15460 = three blocks of blockSize bytes plus a 100-byte tail.
        final int fileSize = 3 * blockSize + 100;
        final int readOffset = 777;
        for (boolean ignoreChecksum : new boolean[] {false, true}) {
            doTestShortCircuitRead(ignoreChecksum, fileSize, readOffset);
        }
    }

    /**
     * Reads a 51300-byte file starting at offset 777, first with the
     * skip-checksum setting off and then with it on.
     */
    @Test(timeout=10000L)
    public void testLongFile() throws Exception {
        // 51300 = ten blocks of blockSize bytes plus a 100-byte tail.
        final int fileSize = 10 * blockSize + 100;
        final int readOffset = 777;
        for (boolean ignoreChecksum : new boolean[] {false, true}) {
            doTestShortCircuitRead(ignoreChecksum, fileSize, readOffset);
        }
    }

    /**
     * Creates a {@link ClientDatanodeProtocol} proxy to {@code dnInfo} while
     * running as {@code ugi}.
     *
     * @param ugi    identity under which the proxy is created
     * @param dnInfo datanode to connect to
     * @param conf   configuration passed to the proxy factory
     * @return the proxy
     */
    private ClientDatanodeProtocol getProxy(UserGroupInformation ugi, final DatanodeID dnInfo, final Configuration conf) throws IOException, InterruptedException {
        PrivilegedExceptionAction<ClientDatanodeProtocol> createProxy =
                new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
                    @Override
                    public ClientDatanodeProtocol run() throws Exception {
                        return DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf, 60000, false);
                    }
                };
        return ugi.doAs(createProxy);
    }

    /**
     * Obtains the file system for {@code uri} while running as a remote-user
     * identity created from {@code user}.
     *
     * @param user name used to create the remote user
     * @param uri  file system URI
     * @param conf configuration passed to {@code FileSystem.get}
     * @return the file system, cast to {@link DistributedFileSystem}
     */
    private static DistributedFileSystem getFileSystem(String user, final URI uri, final Configuration conf) throws InterruptedException, IOException {
        final UserGroupInformation remoteUser = UserGroupInformation.createRemoteUser(user);
        PrivilegedExceptionAction<DistributedFileSystem> openFileSystem =
                new PrivilegedExceptionAction<DistributedFileSystem>() {
                    @Override
                    public DistributedFileSystem run() throws Exception {
                        FileSystem opened = FileSystem.get(uri, conf);
                        return (DistributedFileSystem) opened;
                    }
                };
        return remoteUser.doAs(openFileSystem);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Calls {@code getBlockLocalPathInfo} directly on a datanode proxy, with no
     * "dfs.block.local-path-access.user" configured, and expects an
     * {@link IOException} whose message contains
     * "not allowed to call getBlockLocalPathInfo".
     */
    @Test(timeout=10000L)
    public void testDeprecatedGetBlockLocalPathInfoRpc() throws IOException, InterruptedException {
        Configuration conf = new Configuration();
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
        // Nested try/finally: the cluster is shut down even if waitActive(),
        // getFileSystem() or fs.close() throws.
        try {
            cluster.waitActive();
            DistributedFileSystem fs = cluster.getFileSystem();
            try {
                DFSTestUtil.createFile(fs, new Path("/tmp/x"), 16L, (short) 1, 23L);
                LocatedBlocks lb = cluster.getNameNode().getRpcServer().getBlockLocations("/tmp/x", 0L, 16L);
                ExtendedBlock blk = new ExtendedBlock(lb.get(0).getBlock());
                Token token = lb.get(0).getBlockToken();
                DatanodeInfo dnInfo = lb.get(0).getLocations()[0];
                ClientDatanodeProtocol proxy = DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf, 60000, false);
                try {
                    proxy.getBlockLocalPathInfo(blk, token);
                    Assert.fail("The call should have failed as this user  is not allowed to call getBlockLocalPathInfo");
                } catch (IOException ex) {
                    // Guard against a null message so an unexpected exception is
                    // reported as an assertion failure rather than an NPE.
                    String failureMessage = ex.getMessage();
                    Assert.assertTrue("Unexpected exception: " + ex,
                            failureMessage != null
                                    && failureMessage.contains("not allowed to call getBlockLocalPathInfo"));
                }
            } finally {
                fs.close();
            }
        } finally {
            cluster.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * With short-circuit reads on and checksum skipping off, writes a file of
     * three blocks, reads a few bytes, seeks past two blocks and reads again.
     * Verifies that both reads return the bytes that were written.
     */
    @Test(timeout=10000L)
    public void testSkipWithVerifyChecksum() throws IOException {
        final int size = blockSize;
        Configuration conf = new Configuration();
        conf.setBoolean("dfs.client.read.shortcircuit", true);
        conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
        // Keep the socket inside the per-run temporary directory, as
        // doTestShortCircuitReadImpl does, instead of a fixed /tmp path.
        // Bind-path validation is already disabled by init().
        conf.set("dfs.domain.socket.path", new File(sockDir.getDir(), "testSkipWithVerifyChecksum._PORT.sock").getAbsolutePath());
        if (simulatedStorage) {
            SimulatedFSDataset.setFactory(conf);
        }
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
        // Nested try/finally: the cluster is shut down even if obtaining or
        // closing the file system throws.
        try {
            DistributedFileSystem fs = cluster.getFileSystem();
            try {
                Path root = new Path("/");
                Assert.assertTrue("/ should be a directory", fs.getFileStatus(root).isDirectory());
                byte[] fileData = AppendTestUtil.randomBytes(seed, size * 3);
                Path file1 = new Path("filelocal.dat");
                FSDataOutputStream stm = createFile(fs, file1, 1);
                try {
                    stm.write(fileData);
                } finally {
                    stm.close();
                }

                FSDataInputStream instm = fs.open(file1);
                try {
                    byte[] actual = new byte[fileData.length];
                    // Read a little before seeking.
                    int firstRead = instm.read(actual, 0, 3);
                    Assert.assertTrue("initial read returned no data", firstRead > 0);
                    checkData(actual, 0, fileData, firstRead, "Read before seek");

                    long skipped = 2 * size + 3;
                    instm.seek(skipped);
                    int dest = (int) (skipped + (long) firstRead);
                    int secondRead = instm.read(actual, dest, 3);
                    Assert.assertTrue("read after seek returned no data", secondRead > 0);
                    // Bytes read from file position 'skipped' were stored at index 'dest'.
                    for (int i = 0; i < secondRead; i++) {
                        Assert.assertEquals("Read after seek: byte " + (skipped + i) + " differs",
                                fileData[(int) skipped + i], actual[dest + i]);
                    }
                } finally {
                    instm.close();
                }
            } finally {
                fs.close();
            }
        } finally {
            cluster.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Truncates the on-disk block file of one HDFS file to zero length while
     * the cluster is down, restarts the cluster, and verifies that reading the
     * truncated file fails with an {@link IOException} while a second,
     * untouched file can still be read back intact.
     */
    @Test
    public void testHandleTruncatedBlockFile() throws IOException {
        final Path testPath = new Path("/a");
        final Path testPath2 = new Path("/b");
        final long randomSeed = 4567L;
        final long randomSeed2 = 4568L;
        final int testLength = 3456;

        HdfsConfiguration conf = new HdfsConfiguration();
        conf.setBoolean("dfs.client.read.shortcircuit", true);
        conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
        // Keep the socket inside the per-run temporary directory, as
        // doTestShortCircuitReadImpl does, instead of a fixed /tmp path.
        conf.set("dfs.domain.socket.path", new File(sockDir.getDir(), "testHandleTruncatedBlockFile._PORT.sock").getAbsolutePath());
        conf.set("dfs.checksum.type", "CRC32C");

        MiniDFSCluster cluster = null;
        FSDataInputStream fsIn = null;
        try {
            cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
            cluster.waitActive();
            DistributedFileSystem fs = cluster.getFileSystem();
            DFSTestUtil.createFile(fs, testPath, testLength, (short) 1, randomSeed);
            DFSTestUtil.createFile(fs, testPath2, testLength, (short) 1, randomSeed2);

            // Remember the contents of the file that will NOT be truncated.
            fsIn = cluster.getFileSystem().open(testPath2);
            byte[] original = new byte[testLength];
            IOUtils.readFully(fsIn, original, 0, testLength);
            fsIn.close();
            fsIn = null;

            try {
                DFSTestUtil.waitReplication(fs, testPath, (short) 1);
            } catch (InterruptedException e) {
                Assert.fail("unexpected InterruptedException during waitReplication: " + e);
            } catch (TimeoutException e) {
                Assert.fail("unexpected TimeoutException during waitReplication: " + e);
            }

            // Locate the block file, stop the cluster, and truncate the file.
            ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, testPath);
            File dataFile = MiniDFSCluster.getBlockFile(0, block);
            cluster.shutdown();
            cluster = null;
            RandomAccessFile raf = null;
            try {
                raf = new RandomAccessFile(dataFile, "rw");
                raf.setLength(0L);
            } finally {
                if (raf != null) {
                    raf.close();
                }
            }

            // Restart on the existing storage (format(false)).
            cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(false).build();
            cluster.waitActive();
            fs = cluster.getFileSystem();

            // Reading from the truncated file must fail.
            fsIn = fs.open(testPath);
            try {
                byte[] buf = new byte[100];
                fsIn.seek(2000L);
                fsIn.readFully(buf, 0, buf.length);
                Assert.fail("shouldn't be able to read from corrupt 0-length block file.");
            } catch (IOException e) {
                DFSClient.LOG.error("caught exception ", e);
            }
            fsIn.close();
            fsIn = null;

            // The untouched file must still read back identically.
            fsIn = fs.open(testPath2);
            byte[] reread = new byte[original.length];
            fsIn.readFully(reread, 0, reread.length);
            TestBlockReaderLocal.assertArrayRegionsEqual(original, 0, reread, 0, original.length);
            fsIn.close();
            fsIn = null;
        } finally {
            // Nested so that a failing close() cannot skip the cluster shutdown.
            try {
                if (fsIn != null) {
                    fsIn.close();
                }
            } finally {
                if (cluster != null) {
                    cluster.shutdown();
                }
            }
        }
    }

    /**
     * Command-line read benchmark. Writes a test file to the default file
     * system, then has {@code threadCount} threads each read and verify it 20
     * times via {@link #checkFileContent}, and prints the elapsed time.
     *
     * @param args exactly three values: {@code shortcircuit} (boolean, sets
     *             "dfs.client.read.shortcircuit"), {@code checksum} (boolean),
     *             and {@code threadCount} (int)
     * @throws Exception on setup, write, or join failure
     */
    public static void main(String[] args) throws Exception {
        if (args.length != 3) {
            System.out.println("Usage: test shortcircuit checksum threadCount");
            System.exit(1);
        }
        boolean shortcircuit = Boolean.parseBoolean(args[0]);
        boolean checksum = Boolean.parseBoolean(args[1]);
        int threadCount = Integer.parseInt(args[2]);

        final Configuration conf = new Configuration();
        conf.setBoolean("dfs.client.read.shortcircuit", shortcircuit);
        conf.set("dfs.domain.socket.path", "/tmp/TestShortCircuitLocalRead._PORT");
        // NOTE(review): the "checksum" argument is written to the *skip* checksum
        // key, so "true" appears to mean "skip checksums" - confirm intended meaning.
        conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", checksum);

        // 5120100 bytes: 1000 blocks of blockSize bytes plus a 100-byte tail.
        final int fileSize = 1000 * blockSize + 100;
        final byte[] dataToWrite = AppendTestUtil.randomBytes(seed, fileSize);
        final Path file1 = new Path("filelocal.dat");
        final FileSystem fs = FileSystem.get(conf);
        FSDataOutputStream stm = createFile(fs, file1, 1);
        try {
            stm.write(dataToWrite);
        } finally {
            stm.close();
        }

        try {
            long start = Time.now();
            final int iteration = 20;
            Thread[] threads = new Thread[threadCount];
            for (int i = 0; i < threadCount; ++i) {
                threads[i] = new Thread() {

                    @Override
                    public void run() {
                        for (int pass = 0; pass < iteration; ++pass) {
                            try {
                                String user = getCurrentUser();
                                // legacyShortCircuitFails must be false here: this
                                // configuration never enables the legacy local block
                                // reader, so the "legacy in use" assertion inside
                                // checkFileContent could not hold.
                                checkFileContent(fs.getUri(), file1, dataToWrite, 0, user, conf, false);
                            } catch (IOException e) {
                                e.printStackTrace();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                                // Restore the flag and stop this worker.
                                Thread.currentThread().interrupt();
                                return;
                            }
                        }
                    }
                };
            }
            for (int i = 0; i < threadCount; ++i) {
                threads[i].start();
            }
            for (int i = 0; i < threadCount; ++i) {
                threads[i].join();
            }
            long end = Time.now();
            System.out.println("Iteration " + iteration + " took " + (end - start));
        } finally {
            // Remove the benchmark file even if the run was interrupted.
            fs.delete(file1, false);
        }
    }
}

