/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.namenode.ha;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.JournalSet;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.hdfs.server.namenode.ha.StandbyCheckpointer;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.ThreadUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

public class TestStandbyCheckpoints {
    private static final int NUM_DIRS_IN_LOG = 200000;
    protected MiniDFSCluster cluster;
    protected NameNode nn0;
    protected NameNode nn1;
    protected FileSystem fs;
    private static final Log LOG = LogFactory.getLog(TestStandbyCheckpoints.class);

    @Before
    public void setupCluster() throws Exception {
        Configuration conf = new Configuration();
        conf.setInt("dfs.namenode.checkpoint.check.period", 1);
        conf.setInt("dfs.namenode.checkpoint.txns", 5);
        conf.setInt("dfs.ha.tail-edits.period", 1);
        conf.setInt("dfs.namenode.num.checkpoints.retained", 1);
        conf.setInt("dfs.namenode.num.extra.edits.retained", 0);
        conf.setBoolean("dfs.image.compress", true);
        conf.set("dfs.image.compression.codec", SlowCodec.class.getCanonicalName());
        CompressionCodecFactory.setCodecClasses((Configuration)conf, (List)ImmutableList.of(SlowCodec.class));
        MiniDFSNNTopology topology = new MiniDFSNNTopology().addNameservice(new MiniDFSNNTopology.NSConf("ns1").addNN(new MiniDFSNNTopology.NNConf("nn1").setHttpPort(10001)).addNN(new MiniDFSNNTopology.NNConf("nn2").setHttpPort(10002)));
        this.cluster = new MiniDFSCluster.Builder(conf).nnTopology(topology).numDataNodes(0).build();
        this.cluster.waitActive();
        this.nn0 = this.cluster.getNameNode(0);
        this.nn1 = this.cluster.getNameNode(1);
        this.fs = HATestUtil.configureFailoverFs(this.cluster, conf);
        this.cluster.transitionToActive(0);
    }

    @After
    public void shutdownCluster() throws IOException {
        if (this.cluster != null) {
            this.cluster.shutdown();
        }
    }

    @Test
    public void testSBNCheckpoints() throws Exception {
        JournalSet standbyJournalSet = NameNodeAdapter.spyOnJournalSet(this.nn1);
        this.doEdits(0, 10);
        HATestUtil.waitForStandbyToCatchUp(this.nn0, this.nn1);
        HATestUtil.waitForCheckpoint(this.cluster, 1, (List<Integer>)ImmutableList.of((Object)12));
        HATestUtil.waitForCheckpoint(this.cluster, 0, (List<Integer>)ImmutableList.of((Object)12));
        ((JournalSet)Mockito.verify((Object)standbyJournalSet, (VerificationMode)Mockito.never())).purgeLogsOlderThan(Mockito.anyLong());
    }

    @Test
    public void testBothNodesInStandbyState() throws Exception {
        this.doEdits(0, 10);
        this.cluster.transitionToStandby(0);
        HATestUtil.waitForCheckpoint(this.cluster, 1, (List<Integer>)ImmutableList.of((Object)12));
        HATestUtil.waitForCheckpoint(this.cluster, 0, (List<Integer>)ImmutableList.of((Object)12));
        Assert.assertEquals((long)12L, (long)this.nn0.getNamesystem().getFSImage().getMostRecentCheckpointTxId());
        Assert.assertEquals((long)12L, (long)this.nn1.getNamesystem().getFSImage().getMostRecentCheckpointTxId());
        ArrayList dirs = Lists.newArrayList();
        dirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(this.cluster, 0));
        dirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(this.cluster, 1));
        FSImageTestUtil.assertParallelFilesAreIdentical(dirs, (Set<String>)ImmutableSet.of());
    }

    @Test
    public void testCheckpointWhenNoNewTransactionsHappened() throws Exception {
        this.cluster.getConfiguration(1).setInt("dfs.namenode.checkpoint.period", 0);
        this.cluster.restartNameNode(1);
        this.nn1 = this.cluster.getNameNode(1);
        FSImage spyImage1 = NameNodeAdapter.spyOnFsImage(this.nn1);
        Thread.sleep(1000L);
        ((FSImage)Mockito.verify((Object)spyImage1, (VerificationMode)Mockito.never())).saveNamespace((FSNamesystem)Mockito.anyObject());
        HATestUtil.waitForStandbyToCatchUp(this.nn0, this.nn1);
        Thread.sleep(2000L);
        ((FSImage)Mockito.verify((Object)spyImage1, (VerificationMode)Mockito.times((int)1))).saveNamespace((FSNamesystem)Mockito.anyObject(), (Canceler)Mockito.anyObject());
    }

    @Test(timeout=120000L)
    public void testCheckpointCancellation() throws Exception {
        this.cluster.transitionToStandby(0);
        URI sharedUri = this.cluster.getSharedEditsDir(0, 1);
        File sharedDir = new File(sharedUri.getPath(), "current");
        File tmpDir = new File(MiniDFSCluster.getBaseDirectory(), "testCheckpointCancellation-tmp");
        FSNamesystem fsn = this.cluster.getNamesystem(0);
        FSImageTestUtil.createAbortedLogWithMkdirs(tmpDir, 200000, 3L, fsn.getLastInodeId() + 1L);
        String fname = NNStorage.getInProgressEditsFileName((long)3L);
        new File(tmpDir, fname).renameTo(new File(sharedDir, fname));
        this.cluster.getConfiguration(1).setInt("dfs.namenode.checkpoint.period", 0);
        this.cluster.restartNameNode(1);
        this.nn1 = this.cluster.getNameNode(1);
        this.cluster.transitionToActive(0);
        boolean canceledOne = false;
        for (int i = 0; i < 10 && !canceledOne; ++i) {
            this.doEdits(i * 10, i * 10 + 10);
            this.cluster.transitionToStandby(0);
            this.cluster.transitionToActive(1);
            this.cluster.transitionToStandby(1);
            this.cluster.transitionToActive(0);
            canceledOne = StandbyCheckpointer.getCanceledCount() > 0;
        }
        Assert.assertTrue((boolean)canceledOne);
    }

    @Test(timeout=300000L)
    public void testStandbyExceptionThrownDuringCheckpoint() throws Exception {
        FSImage spyImage1 = NameNodeAdapter.spyOnFsImage(this.nn1);
        GenericTestUtils.DelayAnswer answerer = new GenericTestUtils.DelayAnswer(LOG);
        ((FSImage)Mockito.doAnswer((Answer)answerer).when((Object)spyImage1)).saveNamespace((FSNamesystem)Mockito.any(FSNamesystem.class), (Canceler)Mockito.any(Canceler.class));
        this.doEdits(0, 1000);
        this.nn0.getRpcServer().rollEditLog();
        answerer.waitForCall();
        answerer.proceed();
        Assert.assertTrue((String)"SBN is not performing checkpoint but it should be.", (answerer.getFireCount() == 1 && answerer.getResultCount() == 0 ? 1 : 0) != 0);
        ThreadUtil.sleepAtLeastIgnoreInterrupts((long)1000L);
        try {
            this.nn1.getRpcServer().getFileInfo("/");
            Assert.fail((String)"Should have thrown StandbyException, but instead succeeded.");
        }
        catch (StandbyException se) {
            GenericTestUtils.assertExceptionContains((String)"is not supported", (Throwable)se);
        }
        Assert.assertTrue((String)"SBN should have still been checkpointing.", (answerer.getFireCount() == 1 && answerer.getResultCount() == 0 ? 1 : 0) != 0);
        answerer.waitForResult();
        Assert.assertTrue((String)"SBN should have finished checkpointing.", (answerer.getFireCount() == 1 && answerer.getResultCount() == 1 ? 1 : 0) != 0);
    }

    private void doEdits(int start, int stop) throws IOException {
        for (int i = start; i < stop; ++i) {
            Path p = new Path("/test" + i);
            this.fs.mkdirs(p);
        }
    }

    public static class SlowCodec
    extends GzipCodec {
        public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
            CompressionOutputStream ret = super.createOutputStream(out);
            CompressionOutputStream spy = (CompressionOutputStream)Mockito.spy((Object)ret);
            ((CompressionOutputStream)Mockito.doAnswer((Answer)new GenericTestUtils.SleepAnswer(2)).when((Object)spy)).write((byte[])Mockito.any(), Mockito.anyInt(), Mockito.anyInt());
            return spy;
        }
    }
}

