/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.util;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Scanner;
import java.util.Set;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.HBaseHomePath;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.experimental.categories.Category;

@Category(value={LargeTests.class})
public class ProcessBasedLocalHBaseCluster {
    /** HBase home directory; the daemon script, default log4j config and work dir are resolved against it. */
    private final String hbaseHome;
    /** Directory under which every daemon gets its own {@code <TYPE>-<port>} sub-directory. */
    private final String workDir;
    /** Client-side configuration; the constructor points it at the local ZooKeeper. */
    private final Configuration conf;
    /** Number of masters to start (the constructor fixes this at 1). */
    private final int numMasters;
    private final int numRegionServers;
    /** Number of data nodes requested from the mini DFS cluster. */
    private final int numDataNodes;
    /** Region server RPC ports, sorted ascending. */
    private final List<Integer> rsPorts;
    /** Master RPC ports, sorted ascending. */
    private final List<Integer> masterPorts;
    private final int zkClientPort;
    /** Same value that generateConfig writes for {@code hbase.hregion.max.filesize}. */
    private static final int MAX_FILE_SIZE_OVERRIDE = 10000000;
    private static final Log LOG = LogFactory.getLog(ProcessBasedLocalHBaseCluster.class);
    /** Pid files of every daemon started so far; read back when the daemons are killed. */
    private final List<String> daemonPidFiles = Collections.synchronizedList(new ArrayList<String>());
    private boolean shutdownHookInstalled;
    private final String hbaseDaemonScript;
    /** Stays null unless {@code startMiniDFS()} has been called. */
    private MiniDFSCluster dfsCluster;
    /** Created lazily by {@code startMiniDFS()}. */
    private HBaseTestingUtility testUtil;
    private Thread logTailerThread;
    /** Daemon working directories the log tailer scans for {@code .out}/{@code .log} files. */
    private final List<String> logTailDirs = Collections.synchronizedList(new ArrayList<String>());
    /** Package prefix stripped from tailed log lines to keep them short. */
    private static final Pattern TO_REMOVE_FROM_LOG_LINES_RE = Pattern.compile("org\\.apache\\.hadoop\\.hbase\\.");
    /** Matches {@code .../<TYPE>-<port>/<file>}; group 1 is the server type, group 2 the port. */
    private static final Pattern LOG_PATH_FORMAT_RE = Pattern.compile("^.*/([A-Z]+)-(\\d+)/[^/]+$");

    public ProcessBasedLocalHBaseCluster(Configuration conf, int numDataNodes, int numRegionServers) {
        this.conf = conf;
        this.hbaseHome = HBaseHomePath.getHomePath();
        this.numMasters = 1;
        this.numRegionServers = numRegionServers;
        this.workDir = this.hbaseHome + "/target/local_cluster";
        this.numDataNodes = numDataNodes;
        this.hbaseDaemonScript = this.hbaseHome + "/bin/hbase-daemon.sh";
        this.zkClientPort = HBaseTestingUtility.randomFreePort();
        this.rsPorts = ProcessBasedLocalHBaseCluster.sortedPorts(numRegionServers);
        this.masterPorts = ProcessBasedLocalHBaseCluster.sortedPorts(this.numMasters);
        conf.set("hbase.zookeeper.quorum", "localhost");
        conf.setInt("hbase.zookeeper.property.clientPort", this.zkClientPort);
    }

    /**
     * Starts a mini DFS cluster with the configured number of data nodes, creating the
     * testing utility on first use. Daemon configs generated afterwards point at this DFS.
     *
     * @throws Exception if the mini DFS cluster cannot be started
     */
    public void startMiniDFS() throws Exception {
        HBaseTestingUtility util = this.testUtil;
        if (util == null) {
            util = new HBaseTestingUtility(this.conf);
            this.testUtil = util;
        }
        this.dfsCluster = util.startMiniDFSCluster(this.numDataNodes);
    }

    /**
     * Picks {@code n} free ports and returns them in ascending order.
     *
     * @param n number of ports to pick
     * @return a sorted, mutable list of {@code n} ports
     */
    private static List<Integer> sortedPorts(int n) {
        List<Integer> result = new ArrayList<Integer>(n);
        int remaining = n;
        while (remaining-- > 0) {
            result.add(HBaseTestingUtility.randomFreePort());
        }
        Collections.sort(result);
        return result;
    }

    /**
     * Starts the log tailer, wipes old state, then launches ZooKeeper, the master(s) and
     * the region servers, and finally waits until the META table can be opened.
     *
     * <p>The readiness check makes at most 10 attempts, one second apart. It stops at the
     * first success and closes the table it opened. If META never becomes reachable a
     * warning is logged instead of the "up and running" message; no exception is thrown.
     *
     * @throws IOException if waiting for ZooKeeper or the base znode fails
     */
    public void startHBase() throws IOException {
        this.startDaemonLogTailer();
        this.cleanupOldState();
        LOG.info("Starting ZooKeeper on port " + this.zkClientPort);
        this.startZK();
        HBaseTestingUtility.waitForHostPort("localhost", this.zkClientPort);
        for (int masterPort : this.masterPorts) {
            this.startMaster(masterPort);
        }
        ZKUtil.waitForBaseZNode(this.conf);
        for (int rsPort : this.rsPorts) {
            this.startRegionServer(rsPort);
        }
        LOG.info("Waiting for HBase startup by scanning META");
        boolean metaReachable = false;
        int attemptsLeft = 10;
        while (!metaReachable && attemptsLeft-- > 0) {
            try {
                // Opening the table is the probe; close it right away so nothing leaks.
                new HTable(this.conf, TableName.META_TABLE_NAME).close();
                metaReachable = true;
            }
            catch (Exception e) {
                LOG.info("Waiting for HBase to startup. Retries left: " + attemptsLeft, e);
                Threads.sleep(1000L);
            }
        }
        if (metaReachable) {
            LOG.info("Process-based HBase Cluster with " + this.numRegionServers + " region servers up and running... \n\n");
        } else {
            LOG.warn("Process-based HBase Cluster with " + this.numRegionServers + " region servers was started but META is still not reachable");
        }
    }

    /**
     * Launches a region server process.
     *
     * @param port RPC port for the region server; also names its working directory
     */
    public void startRegionServer(int port) {
        startServer(ServerType.RS, port);
    }

    /**
     * Launches a master process.
     *
     * @param port RPC port for the master; also names its working directory
     */
    public void startMaster(int port) {
        startServer(ServerType.MASTER, port);
    }

    /**
     * Kills the region server that was started on the given port.
     *
     * @param port RPC port the region server was started with
     * @throws IOException if the region server's pid file cannot be read
     */
    public void killRegionServer(int port) throws IOException {
        killServer(ServerType.RS, port);
    }

    /**
     * Kills the master process.
     *
     * <p>Masters are started under {@code MASTER-<rpc port>}, so the pid file has to be
     * looked up with the real master port. The previous hard-coded port 0 pointed at a
     * directory that is never created. The first (lowest) master port is used.
     *
     * @throws IOException if the master's pid file cannot be read
     */
    public void killMaster() throws IOException {
        this.killServer(ServerType.MASTER, this.masterPorts.get(0));
    }

    /**
     * Launches the ZooKeeper process. ZooKeeper always uses port slot 0 for its working
     * directory; its client port is written into the generated configuration.
     */
    public void startZK() {
        startServer(ServerType.ZK, 0);
    }

    /**
     * Runs a command with the environment of the current process.
     *
     * @param command whitespace-separated command line
     */
    private void executeCommand(String command) {
        executeCommand(command, null);
    }

    /**
     * Runs a command, echoes everything it prints to {@code System.out} and waits for it
     * to exit. Failures are logged, never thrown.
     *
     * <p>stderr is merged into stdout so one reader drains both. Draining them one after
     * the other can deadlock once the child fills the pipe that is not being read.
     *
     * @param command whitespace-separated command line (no shell quoting is interpreted)
     * @param envOverrides variables to add to or replace in the inherited environment,
     *        or {@code null} to inherit the environment unchanged
     */
    private void executeCommand(String command, Map<String, String> envOverrides) {
        this.ensureShutdownHookInstalled();
        LOG.debug("Command : " + command);
        // Tokenize on whitespace, like Runtime.exec(String) did before.
        ProcessBuilder builder = new ProcessBuilder(command.trim().split("\\s+"));
        if (envOverrides != null) {
            // environment() starts as a copy of the current environment.
            builder.environment().putAll(envOverrides);
        }
        builder.redirectErrorStream(true);
        try {
            Process process = builder.start();
            BufferedReader output = new BufferedReader(new InputStreamReader(process.getInputStream()));
            try {
                String line;
                while ((line = output.readLine()) != null) {
                    System.out.println(line);
                }
            }
            finally {
                output.close();
            }
            int exitCode = process.waitFor();
            if (exitCode != 0) {
                LOG.warn("Command exited with status " + exitCode + ": " + command);
            }
        }
        catch (IOException e) {
            LOG.error("Error running: " + command, e);
        }
        catch (InterruptedException e) {
            // Keep the interrupt visible to the caller's thread.
            Thread.currentThread().interrupt();
            LOG.error("Interrupted while running: " + command, e);
        }
    }

    /**
     * Kills every daemon whose pid file was recorded at start-up.
     *
     * <p>A pid file that is missing or unreadable is logged and skipped, so one bad file
     * cannot prevent the remaining daemons from being killed.
     */
    private void shutdownAllProcesses() {
        LOG.info("Killing daemons using pid files");
        // Work on a snapshot; the list may still be appended to by a thread starting servers.
        List<String> pidFiles = new ArrayList<String>(this.daemonPidFiles);
        for (String pidFile : pidFiles) {
            int pid;
            try {
                pid = ProcessBasedLocalHBaseCluster.readPidFromFile(pidFile);
            }
            catch (Exception ex) {
                // Broad on purpose: this runs from the shutdown hook and must keep going.
                LOG.error("Could not read pid from file " + pidFile, ex);
                continue;
            }
            if (pid <= 0) {
                continue;
            }
            LOG.info("Killing pid " + pid + " (" + pidFile + ")");
            this.killProcess(pid);
        }
    }

    /**
     * Registers, at most once per instance, a JVM shutdown hook that kills all daemons
     * started by this cluster.
     */
    private void ensureShutdownHookInstalled() {
        if (!this.shutdownHookInstalled) {
            Thread killDaemonsHook = new Thread(new Runnable(){

                @Override
                public void run() {
                    ProcessBasedLocalHBaseCluster.this.shutdownAllProcesses();
                }
            });
            Runtime.getRuntime().addShutdownHook(killDaemonsHook);
            this.shutdownHookInstalled = true;
        }
    }

    /**
     * Deletes the work directory left behind by a previous run.
     *
     * <p>The deletion is done in-process. Spawning {@code rm -rf} on an unquoted path
     * would be split on whitespace, so a home directory containing a space could remove
     * the wrong directory. Failure is logged and otherwise ignored.
     */
    private void cleanupOldState() {
        File staleWorkDir = new File(this.workDir);
        if (staleWorkDir.exists() && !FileUtils.deleteQuietly(staleWorkDir)) {
            LOG.error("Could not remove old state in " + this.workDir);
        }
    }

    /**
     * Writes a string to a file, replacing any existing content. Errors are logged, not
     * thrown.
     *
     * @param s content to write
     * @param fileName path of the file to create or overwrite
     */
    private void writeStringToFile(String s, String fileName) {
        try {
            BufferedWriter out = new BufferedWriter(new FileWriter(fileName));
            try {
                out.write(s);
            }
            finally {
                // Close even when the write fails so the file handle is not leaked.
                out.close();
            }
        }
        catch (IOException e) {
            LOG.error("Error writing to: " + fileName, e);
        }
    }

    /**
     * Returns the working directory of a daemon: {@code <workDir>/<TYPE>-<port>}.
     *
     * @param serverType kind of daemon
     * @param port port slot the daemon was started with
     */
    private String serverWorkingDir(ServerType serverType, int port) {
        StringBuilder path = new StringBuilder();
        path.append(this.workDir).append("/").append(serverType).append("-").append(port);
        return path.toString();
    }

    /**
     * Reads the process id of a daemon from its pid file.
     *
     * @param serverType kind of daemon
     * @param port port slot the daemon was started with
     * @return the pid stored in the daemon's pid file
     * @throws IOException if the pid file cannot be read
     */
    private int getServerPID(ServerType serverType, int port) throws IOException {
        return ProcessBasedLocalHBaseCluster.readPidFromFile(this.pidFilePath(serverType, port));
    }

    /**
     * Reads the first integer token of a pid file.
     *
     * @param pidFile path of the pid file
     * @return the pid stored in the file
     * @throws IOException if the file does not exist or does not start with an integer
     */
    private static int readPidFromFile(String pidFile) throws IOException {
        Scanner scanner = new Scanner(new File(pidFile));
        try {
            // nextInt() alone throws an unchecked exception on an empty or garbled file;
            // report that as the IOException this method declares.
            if (!scanner.hasNextInt()) {
                throw new IOException("No valid pid found in " + pidFile);
            }
            return scanner.nextInt();
        }
        finally {
            scanner.close();
        }
    }

    /**
     * Builds the pid file path of a daemon:
     * {@code <serverWorkingDir>/hbase-<$USER>-<daemon name>.pid}.
     *
     * @param serverType kind of daemon
     * @param port port slot the daemon was started with
     */
    private String pidFilePath(ServerType serverType, int port) {
        return String.format(
            "%s/hbase-%s-%s.pid",
            this.serverWorkingDir(serverType, port),
            System.getenv("USER"),
            serverType.fullName);
    }

    /**
     * Kills the daemon identified by type and port, if its pid file holds a positive pid.
     *
     * @param serverType kind of daemon
     * @param port port slot the daemon was started with
     * @throws IOException if the pid file cannot be read
     */
    private void killServer(ServerType serverType, int port) throws IOException {
        int pid = this.getServerPID(serverType, port);
        if (pid <= 0) {
            return;
        }
        LOG.info("Killing " + serverType + "; pid=" + pid);
        this.killProcess(pid);
    }

    /**
     * Sends SIGKILL to a process via the {@code kill} command.
     *
     * @param pid id of the process to kill
     */
    private void killProcess(int pid) {
        this.executeCommand("kill -s KILL " + pid);
    }

    /**
     * Creates the daemon's working directory, writes its {@code hbase-site.xml},
     * {@code hbase-env.sh} and {@code log4j.properties}, starts it through the HBase
     * daemon script, and records its pid file and log directory.
     *
     * <p>Set-up problems (directory creation, log4j copy) are logged with their cause
     * and the start is still attempted, as before.
     *
     * @param serverType kind of daemon to start
     * @param rsPort port slot of the daemon (RPC port for master/region server, 0 for ZooKeeper)
     */
    private void startServer(ServerType serverType, int rsPort) {
        String dir = this.serverWorkingDir(serverType, rsPort);
        String confStr = this.generateConfig(serverType, rsPort, dir);
        LOG.debug("Creating directory " + dir);
        File dirFile = new File(dir);
        // mkdirs() also returns false when the directory already exists.
        if (!dirFile.mkdirs() && !dirFile.isDirectory()) {
            LOG.error("Could not create directory " + dir);
        }
        this.writeStringToFile(confStr, dir + "/hbase-site.xml");
        // Neutralize debug/JMX options so several daemons can share one host.
        this.writeStringToFile(
            "unset HBASE_MASTER_OPTS\n"
                + "unset HBASE_REGIONSERVER_OPTS\n"
                + "unset HBASE_ZOOKEEPER_OPTS\n"
                + "HBASE_MASTER_DBG_OPTS=' '\n"
                + "HBASE_REGIONSERVER_DBG_OPTS=' '\n"
                + "HBASE_ZOOKEEPER_DBG_OPTS=' '\n"
                + "HBASE_MASTER_JMX_OPTS=' '\n"
                + "HBASE_REGIONSERVER_JMX_OPTS=' '\n"
                + "HBASE_ZOOKEEPER_JMX_OPTS=' '\n",
            dir + "/hbase-env.sh");
        Map<String, String> envOverrides = new HashMap<String, String>();
        envOverrides.put("HBASE_LOG_DIR", dir);
        envOverrides.put("HBASE_PID_DIR", dir);
        try {
            FileUtils.copyFile(new File(this.hbaseHome, "conf/log4j.properties"), new File(dir, "log4j.properties"));
        }
        catch (IOException ex) {
            LOG.error("Could not install log4j.properties into " + dir, ex);
        }
        this.executeCommand(this.hbaseDaemonScript + " --config " + dir + " start " + serverType.fullName, envOverrides);
        this.daemonPidFiles.add(this.pidFilePath(serverType, rsPort));
        this.logTailDirs.add(dir);
    }

    /**
     * Renders the {@code hbase-site.xml} content for one daemon.
     *
     * <p>Properties are emitted in key order. Masters and region servers get their RPC
     * port plus a freshly picked info (web UI) port; any other type gets the ZooKeeper
     * data directory. When a mini DFS cluster is running, the file system and HBase root
     * directory point at it.
     *
     * @param serverType kind of daemon the config is for
     * @param rpcPort RPC port for master/region server
     * @param daemonDir working directory of the daemon
     * @return the XML document as a string
     */
    private final String generateConfig(ServerType serverType, int rpcPort, String daemonDir) {
        Map<String, Object> settings = new TreeMap<String, Object>();
        settings.put("hbase.cluster.distributed", true);
        if (serverType == ServerType.MASTER) {
            settings.put("hbase.master.port", rpcPort);
            int infoPort = HBaseTestingUtility.randomFreePort();
            ProcessBasedLocalHBaseCluster.reportWebUIPort("master", infoPort);
            settings.put("hbase.master.info.port", infoPort);
        } else if (serverType == ServerType.RS) {
            settings.put("hbase.regionserver.port", rpcPort);
            int infoPort = HBaseTestingUtility.randomFreePort();
            ProcessBasedLocalHBaseCluster.reportWebUIPort("region server", infoPort);
            settings.put("hbase.regionserver.info.port", infoPort);
        } else {
            settings.put("hbase.zookeeper.property.dataDir", daemonDir);
        }
        settings.put("hbase.zookeeper.property.clientPort", this.zkClientPort);
        settings.put("hbase.hregion.max.filesize", MAX_FILE_SIZE_OVERRIDE);
        if (this.dfsCluster != null) {
            String fsUrl = "hdfs://localhost:" + this.dfsCluster.getNameNodePort();
            settings.put("fs.default.name", fsUrl);
            settings.put("fs.defaultFS", fsUrl);
            settings.put("hbase.rootdir", fsUrl + "/hbase_test");
        }

        StringBuilder xml = new StringBuilder("<configuration>\n");
        for (Map.Entry<String, Object> setting : settings.entrySet()) {
            xml.append("  <property>\n")
                .append("    <name>").append(setting.getKey()).append("</name>\n")
                .append("    <value>").append(setting.getValue()).append("</value>\n")
                .append("  </property>\n");
        }
        return xml.append("</configuration>\n").toString();
    }

    /**
     * Logs the URL of a daemon's web UI.
     *
     * @param daemon human-readable daemon name used in the message
     * @param port info port of the daemon
     */
    private static void reportWebUIPort(String daemon, int port) {
        String url = "http://localhost:" + port;
        LOG.info("Local " + daemon + " web UI is at " + url);
    }

    /**
     * @return the configuration passed to the constructor, which has been pointed at the
     *         local ZooKeeper
     */
    public Configuration getConf() {
        return conf;
    }

    /**
     * Shuts down the mini DFS cluster (if one was started) and kills all daemons.
     *
     * <p>The daemons are killed even when the DFS shutdown throws, so no processes are
     * left behind.
     */
    public void shutdown() {
        try {
            if (this.dfsCluster != null) {
                this.dfsCluster.shutdown();
            }
        }
        finally {
            this.shutdownAllProcesses();
        }
    }

    /**
     * Shortens a log line by removing every occurrence of the HBase package prefix.
     *
     * @param line raw log line
     * @return the line without {@code org.apache.hadoop.hbase.}
     */
    private static String processLine(String line) {
        return TO_REMOVE_FROM_LOG_LINES_RE.matcher(line).replaceAll("");
    }

    /** Starts the background daemon thread that tails the logs of all started servers. */
    private void startDaemonLogTailer() {
        Thread tailer = new Thread(new LocalDaemonLogTailer());
        tailer.setDaemon(true);
        this.logTailerThread = tailer;
        tailer.start();
    }

    private final class LocalDaemonLogTailer
    implements Runnable {
        private final Set<String> tailedFiles = new HashSet<String>();
        private final List<String> dirList = new ArrayList<String>();
        private final Object printLock = new Object();
        private final FilenameFilter LOG_FILES = new FilenameFilter(){

            @Override
            public boolean accept(File dir, String name) {
                return name.endsWith(".out") || name.endsWith(".log");
            }
        };

        private LocalDaemonLogTailer() {
        }

        @Override
        public void run() {
            try {
                this.runInternal();
            }
            catch (IOException ex) {
                LOG.error((Object)ex);
            }
        }

        private void runInternal() throws IOException {
            Thread.currentThread().setName(this.getClass().getSimpleName());
            while (true) {
                this.scanDirs();
                try {
                    Thread.sleep(500L);
                }
                catch (InterruptedException e) {
                    LOG.error((Object)"Log tailer thread interrupted", (Throwable)e);
                    return;
                }
            }
        }

        private void scanDirs() throws FileNotFoundException {
            this.dirList.clear();
            this.dirList.addAll(ProcessBasedLocalHBaseCluster.this.logTailDirs);
            for (String d : this.dirList) {
                for (File f : new File(d).listFiles(this.LOG_FILES)) {
                    String filePath = f.getAbsolutePath();
                    if (this.tailedFiles.contains(filePath)) continue;
                    this.tailedFiles.add(filePath);
                    this.startTailingFile(filePath);
                }
            }
        }

        private void startTailingFile(final String filePath) throws FileNotFoundException {
            final PrintStream dest = filePath.endsWith(".log") ? System.err : System.out;
            Matcher m = LOG_PATH_FORMAT_RE.matcher(filePath);
            if (!m.matches()) {
                LOG.error((Object)("Unrecognized log path format: " + filePath));
                return;
            }
            ServerType serverType = ServerType.valueOf(m.group(1));
            int serverPort = Integer.valueOf(m.group(2));
            final String logMsgPrefix = "[" + (Object)((Object)serverType) + (serverPort != 0 ? ":" + serverPort : "") + "] ";
            LOG.debug((Object)("Tailing " + filePath));
            Thread t = new Thread(new Runnable(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 * Enabled aggressive block sorting
                 * Enabled unnecessary exception pruning
                 * Enabled aggressive exception aggregation
                 */
                @Override
                public void run() {
                    try {
                        FileInputStream fis = new FileInputStream(filePath);
                        BufferedReader br = new BufferedReader(new InputStreamReader(fis));
                        block6: while (true) {
                            try {
                                Thread.sleep(200L);
                            }
                            catch (InterruptedException e) {
                                LOG.error((Object)("Tailer for " + filePath + " interrupted"));
                                return;
                            }
                            while (true) {
                                String line;
                                if ((line = br.readLine()) == null) continue block6;
                                line = logMsgPrefix + ProcessBasedLocalHBaseCluster.processLine(line);
                                Object object = LocalDaemonLogTailer.this.printLock;
                                synchronized (object) {
                                    if (line.endsWith("\n")) {
                                        dest.print(line);
                                    } else {
                                        dest.println(line);
                                    }
                                    dest.flush();
                                }
                            }
                            break;
                        }
                    }
                    catch (IOException ex) {
                        LOG.error((Object)("Failed tailing " + filePath), (Throwable)ex);
                    }
                }
            });
            t.setDaemon(true);
            t.setName("Tailer for " + filePath);
            t.start();
        }
    }

    /**
     * Kinds of daemon this cluster can start. The constant name appears in the daemon's
     * working directory ({@code <TYPE>-<port>}); {@link #fullName} is the daemon name
     * passed to the HBase daemon script and used in the pid file name.
     */
    private static enum ServerType {
        /** HBase master. */
        MASTER("master"),
        /** HBase region server. */
        RS("regionserver"),
        /** ZooKeeper server. */
        ZK("zookeeper");

        /** Daemon name as understood by the HBase daemon script. */
        private final String fullName;

        ServerType(String daemonName) {
            this.fullName = daemonName;
        }
    }
}

