/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.master;

import java.io.IOException;
import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(value={LargeTests.class})
public class TestRollingRestart {
    private static final Log LOG = LogFactory.getLog(TestRollingRestart.class);

    @Test(timeout=500000L)
    public void testBasicRollingRestart() throws Exception {
        int NUM_MASTERS = 2;
        int NUM_RS = 3;
        int NUM_REGIONS_TO_CREATE = 20;
        int expectedNumRS = 3;
        this.log("Starting cluster");
        Configuration conf = HBaseConfiguration.create();
        conf.setInt("hbase.master.assignment.timeoutmonitor.period", 2000);
        conf.setInt("hbase.master.assignment.timeoutmonitor.timeout", 5000);
        HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(conf);
        TEST_UTIL.startMiniCluster(2, 3);
        MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
        this.log("Waiting for active/ready master");
        cluster.waitForActiveAndReadyMaster();
        ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "testRollingRestart", null);
        HMaster master = cluster.getMaster();
        byte[] table = Bytes.toBytes((String)"tableRestart");
        byte[] family = Bytes.toBytes((String)"family");
        this.log("Creating table with 20 regions");
        HTable ht = TEST_UTIL.createTable(table, family);
        int numRegions = TEST_UTIL.createMultiRegions(conf, ht, family, 20);
        ++numRegions;
        this.log("Waiting for no more RIT\n");
        this.blockUntilNoRIT(zkw, master);
        this.log("Disabling table\n");
        TEST_UTIL.getHBaseAdmin().disableTable(table);
        this.log("Waiting for no more RIT\n");
        this.blockUntilNoRIT(zkw, master);
        NavigableSet<String> regions = this.getAllOnlineRegions(cluster);
        this.log("Verifying only catalog and namespace regions are assigned\n");
        if (regions.size() != 2) {
            for (String oregion : regions) {
                this.log("Region still online: " + oregion);
            }
        }
        Assert.assertEquals((long)2L, (long)regions.size());
        this.log("Enabling table\n");
        TEST_UTIL.getHBaseAdmin().enableTable(table);
        this.log("Waiting for no more RIT\n");
        this.blockUntilNoRIT(zkw, master);
        this.log("Verifying there are " + numRegions + " assigned on cluster\n");
        regions = this.getAllOnlineRegions(cluster);
        this.assertRegionsAssigned(cluster, regions);
        Assert.assertEquals((long)expectedNumRS, (long)cluster.getRegionServerThreads().size());
        this.log("Adding a fourth RS");
        JVMClusterUtil.RegionServerThread restarted = cluster.startRegionServer();
        ++expectedNumRS;
        restarted.waitForServerOnline();
        this.log("Additional RS is online");
        this.log("Waiting for no more RIT");
        this.blockUntilNoRIT(zkw, master);
        this.log("Verifying there are " + numRegions + " assigned on cluster");
        this.assertRegionsAssigned(cluster, regions);
        Assert.assertEquals((long)expectedNumRS, (long)cluster.getRegionServerThreads().size());
        List<JVMClusterUtil.MasterThread> masterThreads = cluster.getMasterThreads();
        JVMClusterUtil.MasterThread activeMaster = null;
        JVMClusterUtil.MasterThread backupMaster = null;
        Assert.assertEquals((long)2L, (long)masterThreads.size());
        if (masterThreads.get(0).getMaster().isActiveMaster()) {
            activeMaster = masterThreads.get(0);
            backupMaster = masterThreads.get(1);
        } else {
            activeMaster = masterThreads.get(1);
            backupMaster = masterThreads.get(0);
        }
        this.log("Stopping backup master\n\n");
        backupMaster.getMaster().stop("Stop of backup during rolling restart");
        cluster.hbaseCluster.waitOnMaster(backupMaster);
        this.log("Stopping primary master\n\n");
        activeMaster.getMaster().stop("Stop of active during rolling restart");
        cluster.hbaseCluster.waitOnMaster(activeMaster);
        this.log("Restarting primary master\n\n");
        activeMaster = cluster.startMaster();
        cluster.waitForActiveAndReadyMaster();
        master = activeMaster.getMaster();
        this.log("Restarting backup master\n\n");
        backupMaster = cluster.startMaster();
        Assert.assertEquals((long)expectedNumRS, (long)cluster.getRegionServerThreads().size());
        List<JVMClusterUtil.RegionServerThread> regionServers = cluster.getLiveRegionServerThreads();
        int num = 1;
        int total = regionServers.size();
        for (JVMClusterUtil.RegionServerThread rst : regionServers) {
            ServerName serverName = rst.getRegionServer().getServerName();
            this.log("Stopping region server " + num + " of " + total + " [ " + serverName + "]");
            rst.getRegionServer().stop("Stopping RS during rolling restart");
            cluster.hbaseCluster.waitOnRegionServer(rst);
            this.log("Waiting for RS shutdown to be handled by master");
            this.waitForRSShutdownToStartAndFinish(activeMaster, serverName);
            this.log("RS shutdown done, waiting for no more RIT");
            this.blockUntilNoRIT(zkw, master);
            this.log("Verifying there are " + numRegions + " assigned on cluster");
            this.assertRegionsAssigned(cluster, regions);
            Assert.assertEquals((long)(--expectedNumRS), (long)cluster.getRegionServerThreads().size());
            this.log("Restarting region server " + num + " of " + total);
            restarted = cluster.startRegionServer();
            restarted.waitForServerOnline();
            ++expectedNumRS;
            this.log("Region server " + num + " is back online");
            this.log("Waiting for no more RIT");
            this.blockUntilNoRIT(zkw, master);
            this.log("Verifying there are " + numRegions + " assigned on cluster");
            this.assertRegionsAssigned(cluster, regions);
            Assert.assertEquals((long)expectedNumRS, (long)cluster.getRegionServerThreads().size());
            ++num;
        }
        Thread.sleep(1000L);
        this.assertRegionsAssigned(cluster, regions);
        JVMClusterUtil.RegionServerThread metaServer = this.getServerHostingMeta(cluster);
        this.log("Stopping server hosting hbase:meta #1");
        metaServer.getRegionServer().stop("Stopping hbase:meta server");
        cluster.hbaseCluster.waitOnRegionServer(metaServer);
        this.log("Meta server down #1");
        --expectedNumRS;
        this.log("Waiting for meta server #1 RS shutdown to be handled by master");
        this.waitForRSShutdownToStartAndFinish(activeMaster, metaServer.getRegionServer().getServerName());
        this.log("Waiting for no more RIT");
        long start = System.currentTimeMillis();
        do {
            this.blockUntilNoRIT(zkw, master);
        } while (this.getNumberOfOnlineRegions(cluster) < numRegions && System.currentTimeMillis() - start < 60000L);
        this.log("Verifying there are " + numRegions + " assigned on cluster");
        this.assertRegionsAssigned(cluster, regions);
        Assert.assertEquals((long)expectedNumRS, (long)cluster.getRegionServerThreads().size());
        metaServer = this.getServerHostingMeta(cluster);
        this.log("Stopping server hosting hbase:meta #2");
        metaServer.getRegionServer().stop("Stopping hbase:meta server");
        cluster.hbaseCluster.waitOnRegionServer(metaServer);
        this.log("Meta server down");
        --expectedNumRS;
        this.log("Waiting for RS shutdown to be handled by master");
        this.waitForRSShutdownToStartAndFinish(activeMaster, metaServer.getRegionServer().getServerName());
        this.log("RS shutdown done, waiting for no more RIT");
        this.blockUntilNoRIT(zkw, master);
        this.log("Verifying there are " + numRegions + " assigned on cluster");
        this.assertRegionsAssigned(cluster, regions);
        Assert.assertEquals((long)expectedNumRS, (long)cluster.getRegionServerThreads().size());
        cluster.startRegionServer().waitForServerOnline();
        cluster.startRegionServer().waitForServerOnline();
        cluster.startRegionServer().waitForServerOnline();
        Thread.sleep(1000L);
        this.log("Waiting for no more RIT");
        this.blockUntilNoRIT(zkw, master);
        this.log("Verifying there are " + numRegions + " assigned on cluster");
        this.assertRegionsAssigned(cluster, regions);
        metaServer = this.getServerHostingMeta(cluster);
        this.log("Stopping server hosting hbase:meta (1 of 3)");
        metaServer.getRegionServer().stop("Stopping hbase:meta server");
        cluster.hbaseCluster.waitOnRegionServer(metaServer);
        this.log("Meta server down (1 of 3)");
        this.log("Waiting for RS shutdown to be handled by master");
        this.waitForRSShutdownToStartAndFinish(activeMaster, metaServer.getRegionServer().getServerName());
        this.log("RS shutdown done, waiting for no more RIT");
        this.blockUntilNoRIT(zkw, master);
        this.log("Verifying there are " + numRegions + " assigned on cluster");
        this.assertRegionsAssigned(cluster, regions);
        metaServer = this.getServerHostingMeta(cluster);
        this.log("Stopping server hosting hbase:meta (2 of 3)");
        metaServer.getRegionServer().stop("Stopping hbase:meta server");
        cluster.hbaseCluster.waitOnRegionServer(metaServer);
        this.log("Meta server down (2 of 3)");
        this.log("Waiting for RS shutdown to be handled by master");
        this.waitForRSShutdownToStartAndFinish(activeMaster, metaServer.getRegionServer().getServerName());
        this.log("RS shutdown done, waiting for no more RIT");
        this.blockUntilNoRIT(zkw, master);
        this.log("Verifying there are " + numRegions + " assigned on cluster");
        this.assertRegionsAssigned(cluster, regions);
        metaServer = this.getServerHostingMeta(cluster);
        this.log("Stopping server hosting hbase:meta (3 of 3)");
        metaServer.getRegionServer().stop("Stopping hbase:meta server");
        cluster.hbaseCluster.waitOnRegionServer(metaServer);
        this.log("Meta server down (3 of 3)");
        this.log("Waiting for RS shutdown to be handled by master");
        this.waitForRSShutdownToStartAndFinish(activeMaster, metaServer.getRegionServer().getServerName());
        this.log("RS shutdown done, waiting for no more RIT");
        this.blockUntilNoRIT(zkw, master);
        this.log("Verifying there are " + numRegions + " assigned on cluster");
        this.assertRegionsAssigned(cluster, regions);
        if (cluster.getRegionServerThreads().size() != 1) {
            this.log("Online regionservers:");
            for (JVMClusterUtil.RegionServerThread rst : cluster.getRegionServerThreads()) {
                this.log("RS: " + rst.getRegionServer().getServerName());
            }
        }
        Assert.assertEquals((long)2L, (long)cluster.getRegionServerThreads().size());
        ht.close();
        TEST_UTIL.shutdownMiniCluster();
    }

    private void blockUntilNoRIT(ZooKeeperWatcher zkw, HMaster master) throws KeeperException, InterruptedException {
        ZKAssign.blockUntilNoRIT((ZooKeeperWatcher)zkw);
        master.assignmentManager.waitUntilNoRegionsInTransition(60000L);
    }

    private void waitForRSShutdownToStartAndFinish(JVMClusterUtil.MasterThread activeMaster, ServerName serverName) throws InterruptedException {
        ServerManager sm = activeMaster.getMaster().getServerManager();
        while (!sm.getDeadServers().isDeadServer(serverName)) {
            this.log("Waiting for [" + serverName + "] to be listed as dead in master");
            Thread.sleep(1L);
        }
        this.log("Server [" + serverName + "] marked as dead, waiting for it to " + "finish dead processing");
        while (sm.areDeadServersInProgress()) {
            this.log("Server [" + serverName + "] still being processed, waiting");
            Thread.sleep(100L);
        }
        this.log("Server [" + serverName + "] done with server shutdown processing");
    }

    private void log(String msg) {
        LOG.debug((Object)("\n\nTRR: " + msg + "\n"));
    }

    private JVMClusterUtil.RegionServerThread getServerHostingMeta(MiniHBaseCluster cluster) throws IOException {
        return this.getServerHosting(cluster, HRegionInfo.FIRST_META_REGIONINFO);
    }

    private JVMClusterUtil.RegionServerThread getServerHosting(MiniHBaseCluster cluster, HRegionInfo region) throws IOException {
        for (JVMClusterUtil.RegionServerThread rst : cluster.getRegionServerThreads()) {
            if (!ProtobufUtil.getOnlineRegions((AdminProtos.AdminService.BlockingInterface)rst.getRegionServer()).contains(region)) continue;
            return rst;
        }
        return null;
    }

    private int getNumberOfOnlineRegions(MiniHBaseCluster cluster) {
        int numFound = 0;
        for (JVMClusterUtil.RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
            numFound += rst.getRegionServer().getNumberOfOnlineRegions();
        }
        return numFound;
    }

    private void assertRegionsAssigned(MiniHBaseCluster cluster, Set<String> expectedRegions) throws IOException {
        int numFound = this.getNumberOfOnlineRegions(cluster);
        if (expectedRegions.size() > numFound) {
            this.log("Expected to find " + expectedRegions.size() + " but only found" + " " + numFound);
            NavigableSet<String> foundRegions = this.getAllOnlineRegions(cluster);
            for (String region : expectedRegions) {
                if (foundRegions.contains(region)) continue;
                this.log("Missing region: " + region);
            }
            Assert.assertEquals((long)expectedRegions.size(), (long)numFound);
        } else if (expectedRegions.size() < numFound) {
            int doubled = numFound - expectedRegions.size();
            this.log("Expected to find " + expectedRegions.size() + " but found" + " " + numFound + " (" + doubled + " double assignments?)");
            NavigableSet<String> doubleRegions = this.getDoubleAssignedRegions(cluster);
            for (String region : doubleRegions) {
                this.log("Region is double assigned: " + region);
            }
            Assert.assertEquals((long)expectedRegions.size(), (long)numFound);
        } else {
            this.log("Success!  Found expected number of " + numFound + " regions");
        }
    }

    private NavigableSet<String> getAllOnlineRegions(MiniHBaseCluster cluster) throws IOException {
        TreeSet<String> online = new TreeSet<String>();
        for (JVMClusterUtil.RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
            for (HRegionInfo region : ProtobufUtil.getOnlineRegions((AdminProtos.AdminService.BlockingInterface)rst.getRegionServer())) {
                online.add(region.getRegionNameAsString());
            }
        }
        return online;
    }

    private NavigableSet<String> getDoubleAssignedRegions(MiniHBaseCluster cluster) throws IOException {
        TreeSet<String> online = new TreeSet<String>();
        TreeSet<String> doubled = new TreeSet<String>();
        for (JVMClusterUtil.RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
            for (HRegionInfo region : ProtobufUtil.getOnlineRegions((AdminProtos.AdminService.BlockingInterface)rst.getRegionServer())) {
                if (online.add(region.getRegionNameAsString())) continue;
                doubled.add(region.getRegionNameAsString());
            }
        }
        return doubled;
    }
}

