/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.ipc;

import com.google.protobuf.DescriptorProtos;
import java.io.Closeable;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.SocketFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtocolMetaInfoPB;
import org.apache.hadoop.ipc.ProtocolProxy;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RpcEngine;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.Service;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.test.MetricsAsserts;
import org.apache.hadoop.test.MockitoUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class TestRPC {
    /** Host string passed to {@code setBindAddress} for the test servers and used as the target host in {@code testStandaloneClient}. */
    private static final String ADDRESS = "0.0.0.0";
    public static final Log LOG = LogFactory.getLog(TestRPC.class);
    /** Shared configuration; {@code setupConf()} assigns a fresh instance before each test. */
    private static Configuration conf;
    /** Length of the int array each {@code Transactions} worker sends through {@code exchange}. */
    int datasize = 102400;
    /** Number of {@code Transactions} threads started by {@code testCallsInternal}. */
    int numThreads = 50;
    /** Configuration key of the ACL that {@code TestPolicyProvider} associates with {@code TestProtocol}. */
    private static final String ACL_CONFIG = "test.protocol.acl";

    /**
     * Creates a fresh shared {@link Configuration} before each test, registers
     * {@code StoppedRpcEngine} as the RPC engine for {@code StoppedProtocol},
     * and hands the configuration to {@link UserGroupInformation}.
     */
    @Before
    public void setupConf() {
        final String engineKey = "rpc.engine." + StoppedProtocol.class.getName();
        conf = new Configuration();
        conf.setClass(engineKey, StoppedRpcEngine.class, RpcEngine.class);
        UserGroupInformation.setConfiguration(conf);
    }

    /**
     * Checks that a server built without explicit sizing reports the queue size
     * and reader count found in the configuration (falling back to 100 and 1),
     * and that explicit builder settings (3 readers, queue size 200 with one
     * handler) are reported back unchanged.
     *
     * <p>Each server is stopped in a {@code finally} block so a failing
     * assertion does not leave it behind.
     *
     * @throws IOException if building a server fails
     */
    @Test
    public void testConfRpc() throws IOException {
        RPC.Server server = new RPC.Builder(conf)
            .setProtocol(TestProtocol.class)
            .setInstance(new TestImpl())
            .setBindAddress(ADDRESS)
            .setPort(0)
            .setNumHandlers(1)
            .setVerbose(false)
            .build();
        try {
            int confQ = conf.getInt("ipc.server.handler.queue.size", 100);
            Assert.assertEquals(confQ, server.getMaxQueueSize());
            int confReaders = conf.getInt("ipc.server.read.threadpool.size", 1);
            Assert.assertEquals(confReaders, server.getNumReaders());
        } finally {
            server.stop();
        }

        server = new RPC.Builder(conf)
            .setProtocol(TestProtocol.class)
            .setInstance(new TestImpl())
            .setBindAddress(ADDRESS)
            .setPort(0)
            .setNumHandlers(1)
            .setnumReaders(3)
            .setQueueSizePerHandler(200)
            .setVerbose(false)
            .build();
        try {
            Assert.assertEquals(3, server.getNumReaders());
            Assert.assertEquals(200, server.getMaxQueueSize());
        } finally {
            server.stop();
        }
    }

    /**
     * Verifies that {@code RPC.getServerAddress} applied to a proxy returns the
     * address the proxy was created with.
     *
     * @throws IOException if the server or proxy cannot be set up
     */
    @Test
    public void testProxyAddress() throws IOException {
        RPC.Server server = new RPC.Builder(conf)
            .setProtocol(TestProtocol.class)
            .setInstance(new TestImpl())
            .setBindAddress(ADDRESS)
            .setPort(0)
            .build();
        TestProtocol proxy = null;
        try {
            server.start();
            InetSocketAddress serverAddr = NetUtils.getConnectAddress(server);
            proxy = (TestProtocol) RPC.getProxy(TestProtocol.class, TestProtocol.versionID, serverAddr, conf);
            Assert.assertEquals(serverAddr, RPC.getServerAddress(proxy));
        } finally {
            // Same cleanup order on success and failure: server first, then proxy.
            server.stop();
            if (proxy != null) {
                RPC.stopProxy(proxy);
            }
        }
    }

    /**
     * Starts a {@code slowPing(true)} call on a background thread and checks
     * that it has not finished before, or after, the first
     * {@code slowPing(false)}; after a second {@code slowPing(false)} the test
     * polls once per second until the slow call reports completion.
     *
     * @throws IOException if an RPC fails
     */
    @Test
    public void testSlowRpc() throws IOException {
        System.out.println("Testing Slow RPC");
        RPC.Server server = new RPC.Builder(conf)
            .setProtocol(TestProtocol.class)
            .setInstance(new TestImpl())
            .setBindAddress(ADDRESS)
            .setPort(0)
            .setNumHandlers(2)
            .setVerbose(false)
            .build();
        TestProtocol proxy = null;
        try {
            server.start();
            InetSocketAddress serverAddr = NetUtils.getConnectAddress(server);
            proxy = (TestProtocol) RPC.getProxy(TestProtocol.class, TestProtocol.versionID, serverAddr, conf);

            SlowRPC slowCall = new SlowRPC(proxy);
            Thread slowThread = new Thread(slowCall, "SlowRPC");
            slowThread.start();
            Assert.assertTrue("Slow RPC should not have finished1.", !slowCall.isDone());

            proxy.slowPing(false);
            Assert.assertTrue("Slow RPC should not have finished2.", !slowCall.isDone());

            proxy.slowPing(false);
            while (!slowCall.isDone()) {
                System.out.println("Waiting for slow RPC to get done.");
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException ignored) {
                    // Interruption is ignored; the loop keeps polling.
                }
            }
        } finally {
            server.stop();
            if (proxy != null) {
                RPC.stopProxy(proxy);
            }
            System.out.println("Down slow rpc testing");
        }
    }

    /**
     * Runs the call scenario in {@code testCallsInternal} against the shared
     * configuration.
     *
     * @throws IOException if any RPC in the scenario fails unexpectedly
     */
    @Test
    public void testCalls() throws IOException {
        testCallsInternal(conf);
    }

    /**
     * Exercises every {@code TestProtocol} method against a freshly started
     * server: ping, the echo overloads (including null arguments), both add
     * overloads, a protobuf round trip, the error path and
     * {@code testServerGet}. It also checks server metric counters after the
     * first three calls, then runs {@code numThreads} concurrent
     * {@code Transactions} workers and waits for all of them.
     *
     * @param conf configuration used for both the server and the client proxy
     * @throws IOException if an RPC fails unexpectedly
     */
    private void testCallsInternal(Configuration conf) throws IOException {
        RPC.Server server = new RPC.Builder(conf)
            .setProtocol(TestProtocol.class)
            .setInstance(new TestImpl())
            .setBindAddress(ADDRESS)
            .setPort(0)
            .build();
        TestProtocol proxy = null;
        try {
            server.start();
            InetSocketAddress serverAddr = NetUtils.getConnectAddress(server);
            proxy = (TestProtocol) RPC.getProxy(TestProtocol.class, TestProtocol.versionID, serverAddr, conf);

            proxy.ping();

            String echoed = proxy.echo("foo");
            Assert.assertEquals(echoed, "foo");
            echoed = proxy.echo((String) null);
            Assert.assertEquals(echoed, null);

            // Three calls so far: one ping and two echoes.
            MetricsRecordBuilder metrics = MetricsAsserts.getMetrics(server.rpcMetrics.name());
            MetricsAsserts.assertCounter("RpcProcessingTimeNumOps", 3L, metrics);
            MetricsAsserts.assertCounterGt("SentBytes", 0L, metrics);
            MetricsAsserts.assertCounterGt("ReceivedBytes", 0L, metrics);
            metrics = MetricsAsserts.getMetrics(server.rpcDetailedMetrics.name());
            MetricsAsserts.assertCounter("EchoNumOps", 2L, metrics);
            MetricsAsserts.assertCounter("PingNumOps", 1L, metrics);

            String[] echoedArray = proxy.echo(new String[]{"foo", "bar"});
            Assert.assertTrue(Arrays.equals(echoedArray, new String[]{"foo", "bar"}));
            echoedArray = proxy.echo((String[]) null);
            Assert.assertTrue(Arrays.equals(echoedArray, null));

            UTF8 echoedUtf8 = (UTF8) proxy.echo((Writable) new UTF8("hello world"));
            Assert.assertEquals(echoedUtf8, new UTF8("hello world"));
            echoedUtf8 = (UTF8) proxy.echo((Writable) null);
            Assert.assertEquals(echoedUtf8, null);

            int sum = proxy.add(1, 2);
            Assert.assertEquals(sum, 3);
            sum = proxy.add(new int[]{1, 2});
            Assert.assertEquals(sum, 3);

            DescriptorProtos.EnumDescriptorProto sentProto =
                DescriptorProtos.EnumDescriptorProto.newBuilder().setName("test").build();
            DescriptorProtos.EnumDescriptorProto returnedProto = proxy.exchangeProto(sentProto);
            Assert.assertEquals(sentProto, returnedProto);
            Assert.assertNotSame(sentProto, returnedProto);

            boolean sawError = false;
            try {
                proxy.error();
            } catch (IOException e) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Caught " + e);
                }
                sawError = true;
            }
            Assert.assertTrue(sawError);

            proxy.testServerGet();

            System.out.println("Starting multi-threaded RPC test...");
            server.setSocketSendBufSize(1024);
            Thread[] workers = new Thread[numThreads];
            for (int i = 0; i < workers.length; i++) {
                workers[i] = new Thread(new Transactions(proxy, datasize), "TransactionThread-" + i);
                workers[i].start();
            }

            System.out.println("Waiting for all threads to finish RPCs...");
            for (Thread worker : workers) {
                boolean joined = false;
                while (!joined) {
                    try {
                        worker.join();
                        joined = true;
                    } catch (InterruptedException retry) {
                        // Interrupted while waiting: join the same worker again.
                    }
                }
            }
        } finally {
            server.stop();
            if (proxy != null) {
                RPC.stopProxy(proxy);
            }
        }
    }

    /**
     * Asks for a proxy to port 20 of {@code ADDRESS} with a 15 second wait and
     * expects the attempt to end in a {@link ConnectException}; reaching the
     * line after the call fails the test.
     *
     * @throws IOException if a failure other than {@code ConnectException} occurs
     */
    @Test
    public void testStandaloneClient() throws IOException {
        InetSocketAddress target = new InetSocketAddress(ADDRESS, 20);
        try {
            TestProtocol client =
                (TestProtocol) RPC.waitForProxy(TestProtocol.class, TestProtocol.versionID, target, conf, 15000L);
            client.echo("");
            Assert.fail("We should not have reached here");
        } catch (ConnectException expected) {
            // This is the outcome the test is looking for.
        }
    }

    /**
     * Starts a server whose service ACL comes from {@code TestPolicyProvider},
     * pings it once, and checks both the outcome and the authorization and
     * authentication counters in the server's RPC metrics.
     *
     * @param conf          configuration for the server, its ACL refresh and the client
     * @param expectFailure when true the ping must fail with a
     *                      {@code RemoteException} wrapping an
     *                      {@code AuthorizationException}; when false any
     *                      {@code RemoteException} is rethrown
     * @throws IOException if the RPC fails in a way that was not expected
     */
    private void doRPCs(Configuration conf, boolean expectFailure) throws IOException {
        RPC.Server server = new RPC.Builder(conf)
            .setProtocol(TestProtocol.class)
            .setInstance(new TestImpl())
            .setBindAddress(ADDRESS)
            .setPort(0)
            .setNumHandlers(5)
            .setVerbose(true)
            .build();
        server.refreshServiceAcl(conf, new TestPolicyProvider());
        TestProtocol proxy = null;
        server.start();
        InetSocketAddress serverAddr = NetUtils.getConnectAddress(server);
        try {
            proxy = (TestProtocol) RPC.getProxy(TestProtocol.class, TestProtocol.versionID, serverAddr, conf);
            proxy.ping();
            if (expectFailure) {
                Assert.fail("Expect RPC.getProxy to fail with AuthorizationException!");
            }
        } catch (RemoteException e) {
            if (!expectFailure) {
                throw e;
            }
            Assert.assertTrue(e.unwrapRemoteException() instanceof AuthorizationException);
        } finally {
            server.stop();
            if (proxy != null) {
                RPC.stopProxy(proxy);
            }
            // The counters are checked on every exit path, including failures.
            MetricsRecordBuilder metrics = MetricsAsserts.getMetrics(server.rpcMetrics.name());
            if (expectFailure) {
                MetricsAsserts.assertCounter("RpcAuthorizationFailures", 1, metrics);
            } else {
                MetricsAsserts.assertCounter("RpcAuthorizationSuccesses", 1, metrics);
            }
            MetricsAsserts.assertCounter("RpcAuthenticationFailures", 0, metrics);
            MetricsAsserts.assertCounter("RpcAuthenticationSuccesses", 0, metrics);
        }
    }

    /**
     * Checks that the connect address computed for a server built on
     * {@code ADDRESS} carries the local host's {@link InetAddress}. The server
     * is built but never started, and is stopped before the assertion runs.
     *
     * @throws IOException if the server cannot be built or the local host cannot be resolved
     */
    @Test
    public void testServerAddress() throws IOException {
        RPC.Server server = new RPC.Builder(conf)
            .setProtocol(TestProtocol.class)
            .setInstance(new TestImpl())
            .setBindAddress(ADDRESS)
            .setPort(0)
            .setNumHandlers(5)
            .setVerbose(true)
            .build();
        InetSocketAddress connectAddr;
        try {
            connectAddr = NetUtils.getConnectAddress(server);
        } finally {
            server.stop();
        }
        Assert.assertEquals(connectAddr.getAddress(), InetAddress.getLocalHost());
    }

    /**
     * Runs {@code doRPCs} with authorization enabled, first with the ACL
     * {@code "*"} (call must succeed) and then with {@code "invalid invalid"}
     * (call must be rejected). The pair is run twice; the second time
     * {@code ipc.server.read.threadpool.size} is set to 2.
     *
     * @throws IOException if an RPC outcome differs from the expectation
     */
    @Test
    public void testAuthorization() throws IOException {
        Configuration authConf = new Configuration();
        authConf.setBoolean("hadoop.security.authorization", true);

        for (int pass = 0; pass < 2; pass++) {
            if (pass == 1) {
                authConf.setInt("ipc.server.read.threadpool.size", 2);
            }
            authConf.set(ACL_CONFIG, "*");
            doRPCs(authConf, false);
            authConf.set(ACL_CONFIG, "invalid invalid");
            doRPCs(authConf, true);
        }
    }

    /**
     * Runs the {@code testCallsInternal} scenario with {@code ipc.client.ping}
     * disabled, then once more with
     * {@code ipc.server.read.threadpool.size} set to 2.
     *
     * <p>The method previously had no {@code @Test} annotation, so the JUnit 4
     * runner never executed it.
     *
     * @throws IOException if any RPC in the scenario fails unexpectedly
     */
    @Test
    public void testNoPings() throws IOException {
        Configuration conf = new Configuration();
        conf.setBoolean("ipc.client.ping", false);
        new TestRPC().testCallsInternal(conf);
        conf.setInt("ipc.server.read.threadpool.size", 2);
        new TestRPC().testCallsInternal(conf);
    }

    /**
     * Expects {@code RPC.stopProxy(null)} to throw
     * {@code HadoopIllegalArgumentException}.
     *
     * @throws IOException declared for symmetry with the other tests
     */
    @Test(expected = HadoopIllegalArgumentException.class)
    public void testStopNonRegisteredProxy() throws IOException {
        RPC.stopProxy(null);
    }

    /**
     * Passes a protocol mock created by {@code MockitoUtil.mockProtocol} to
     * {@code RPC.stopProxy}; the test passes when no exception is thrown.
     *
     * @throws IOException if stopping the mock fails
     */
    @Test
    public void testStopMockObject() throws IOException {
        TestProtocol mockedProtocol = MockitoUtil.mockProtocol(TestProtocol.class);
        RPC.stopProxy(mockedProtocol);
    }

    /**
     * Obtains a {@code StoppedProtocol} proxy (served by
     * {@code StoppedRpcEngine}, as configured in {@code setupConf}) and checks
     * that {@code RPC.stopProxy} invokes {@code close()} on its invocation
     * handler exactly once.
     *
     * @throws IOException if the proxy cannot be created
     */
    @Test
    public void testStopProxy() throws IOException {
        StoppedProtocol proxy = (StoppedProtocol) RPC.getProxy(StoppedProtocol.class, 0L, null, conf);
        StoppedInvocationHandler handler =
            (StoppedInvocationHandler) Proxy.getInvocationHandler(proxy);

        Assert.assertEquals(handler.getCloseCalled(), 0);
        RPC.stopProxy(proxy);
        Assert.assertEquals(handler.getCloseCalled(), 1);
    }

    /**
     * Wraps a {@code StoppedProtocol} proxy in a {@code RetryProxy} and checks
     * that calling {@code RPC.stopProxy} on the wrapper results in exactly one
     * {@code close()} on the inner proxy's invocation handler.
     *
     * @throws IOException if the proxy cannot be created
     */
    @Test
    public void testWrappedStopProxy() throws IOException {
        StoppedProtocol inner = (StoppedProtocol) RPC.getProxy(StoppedProtocol.class, 0L, null, conf);
        StoppedInvocationHandler handler =
            (StoppedInvocationHandler) Proxy.getInvocationHandler(inner);
        StoppedProtocol retrying =
            (StoppedProtocol) RetryProxy.create(StoppedProtocol.class, inner, RetryPolicies.RETRY_FOREVER);

        Assert.assertEquals(handler.getCloseCalled(), 0);
        RPC.stopProxy(retrying);
        Assert.assertEquals(handler.getCloseCalled(), 1);
    }

    /**
     * Verifies that a client using the shared (non-Kerberos) configuration is
     * rejected with an {@code AccessControlException} by a server configured
     * for Kerberos authentication. The scenario runs twice; the second time
     * the server is configured with two reader threads.
     *
     * <p>The reader-thread setting used to be written to the shared
     * {@code conf} after {@code serverConf} had already been copied from it,
     * so the second server never saw it. It is now set on {@code serverConf},
     * the configuration the server is actually built from.
     *
     * @throws IOException if server setup fails or the RPC fails unexpectedly
     */
    @Test
    public void testErrorMsgForInsecureClient() throws IOException {
        Configuration serverConf = new Configuration(conf);
        SecurityUtil.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS, serverConf);
        assertInsecureClientRejected(serverConf);

        serverConf.setInt("ipc.server.read.threadpool.size", 2);
        assertInsecureClientRejected(serverConf);
    }

    /**
     * Builds and starts a server from {@code serverConf}, calls {@code echo}
     * through a proxy created with the shared {@code conf}, and asserts that
     * the call fails with a {@code RemoteException} wrapping an
     * {@code AccessControlException}.
     *
     * @param serverConf configuration for the server; installed in
     *                   {@code UserGroupInformation} while the server is built
     *                   and started, after which the shared {@code conf} is
     *                   installed again for the client side
     * @throws IOException if server setup fails or a different I/O failure occurs
     */
    private void assertInsecureClientRejected(Configuration serverConf) throws IOException {
        UserGroupInformation.setConfiguration(serverConf);
        RPC.Server server = new RPC.Builder(serverConf)
            .setProtocol(TestProtocol.class)
            .setInstance(new TestImpl())
            .setBindAddress(ADDRESS)
            .setPort(0)
            .setNumHandlers(5)
            .setVerbose(true)
            .build();
        server.start();
        UserGroupInformation.setConfiguration(conf);

        boolean succeeded = false;
        TestProtocol proxy = null;
        try {
            InetSocketAddress addr = NetUtils.getConnectAddress(server);
            proxy = (TestProtocol) RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf);
            proxy.echo("");
        } catch (RemoteException e) {
            LOG.info("LOGGING MESSAGE: " + e.getLocalizedMessage());
            Assert.assertTrue(e.unwrapRemoteException() instanceof AccessControlException);
            succeeded = true;
        } finally {
            server.stop();
            if (proxy != null) {
                RPC.stopProxy(proxy);
            }
        }
        Assert.assertTrue(succeeded);
    }

    /**
     * Counts the live threads that have at least one frame, among the first 20
     * stack frames reported, whose class name contains {@code search}.
     *
     * @param search substring to look for in stack-frame class names
     * @return number of matching threads; each thread is counted at most once
     */
    private static int countThreads(String search) {
        ThreadMXBean mxBean = ManagementFactory.getThreadMXBean();
        ThreadInfo[] snapshot = mxBean.getThreadInfo(mxBean.getAllThreadIds(), 20);
        int matches = 0;
        for (ThreadInfo thread : snapshot) {
            // Entries are null for threads that were gone when the snapshot was taken.
            if (thread == null) {
                continue;
            }
            for (StackTraceElement frame : thread.getStackTrace()) {
                if (frame.getClassName().contains(search)) {
                    matches++;
                    break;
                }
            }
        }
        return matches;
    }

    /**
     * Checks that no thread with a {@code Server$Listener$Reader} frame exists
     * before the server starts, that at least one appears after start (polled
     * every 10 ms while the accumulated sleep stays under 5000 ms), and that
     * none remain once {@code stop()} has returned.
     *
     * @throws IOException          if the server cannot be built
     * @throws InterruptedException if interrupted while polling
     */
    @Test
    public void testStopsAllThreads() throws IOException, InterruptedException {
        final String readerClass = "Server$Listener$Reader";
        int readersBefore = countThreads(readerClass);
        Assert.assertEquals("Expect no Reader threads running before test", 0, readersBefore);

        RPC.Server server = new RPC.Builder(conf)
            .setProtocol(TestProtocol.class)
            .setInstance(new TestImpl())
            .setBindAddress(ADDRESS)
            .setPort(0)
            .setNumHandlers(5)
            .setVerbose(true)
            .build();
        server.start();
        try {
            long sleptMillis = 0L;
            int readersRunning;
            while (true) {
                Thread.sleep(10L);
                readersRunning = countThreads(readerClass);
                if (readersRunning != 0) {
                    break;
                }
                sleptMillis += 10L;
                if (sleptMillis >= 5000L) {
                    break;
                }
            }
            // Counted once more after the polling loop before asserting.
            readersRunning = countThreads(readerClass);
            Assert.assertTrue(readersRunning > 0);
        } finally {
            server.stop();
        }

        int readersAfter = countThreads(readerClass);
        Assert.assertEquals("Expect no Reader threads left running after test", 0, readersAfter);
    }

    /**
     * Checks that {@code RPC.Builder} refuses to build a server when the
     * configuration is null, when no protocol is set, or when no instance is
     * set. Each case must throw {@code HadoopIllegalArgumentException}; any
     * other {@code Exception} fails the test.
     *
     * @throws IOException declared by the builder's {@code build()}
     */
    @Test
    public void testRPCBuilder() throws IOException {
        // Case 1: null configuration.
        try {
            new RPC.Builder(null)
                .setProtocol(TestProtocol.class)
                .setInstance(new TestImpl())
                .setBindAddress(ADDRESS)
                .setPort(0)
                .setNumHandlers(5)
                .setVerbose(true)
                .build();
            Assert.fail("Didn't throw HadoopIllegalArgumentException");
        } catch (Exception e) {
            if (!(e instanceof HadoopIllegalArgumentException)) {
                Assert.fail("Expecting HadoopIllegalArgumentException but caught " + e);
            }
        }

        // Case 2: protocol not set.
        try {
            new RPC.Builder(conf)
                .setInstance(new TestImpl())
                .setBindAddress(ADDRESS)
                .setPort(0)
                .setNumHandlers(5)
                .setVerbose(true)
                .build();
            Assert.fail("Didn't throw HadoopIllegalArgumentException");
        } catch (Exception e) {
            if (!(e instanceof HadoopIllegalArgumentException)) {
                Assert.fail("Expecting HadoopIllegalArgumentException but caught " + e);
            }
        }

        // Case 3: instance not set.
        try {
            new RPC.Builder(conf)
                .setProtocol(TestProtocol.class)
                .setBindAddress(ADDRESS)
                .setPort(0)
                .setNumHandlers(5)
                .setVerbose(true)
                .build();
            Assert.fail("Didn't throw HadoopIllegalArgumentException");
        } catch (Exception e) {
            if (!(e instanceof HadoopIllegalArgumentException)) {
                Assert.fail("Expecting HadoopIllegalArgumentException but caught " + e);
            }
        }
    }

    /**
     * Makes one successful ping, interrupts the calling thread, and expects the
     * next ping to fail with an {@code IOException} whose string form mentions
     * {@code InterruptedException}; any other {@code IOException} is rethrown.
     *
     * <p>The server and proxy are now stopped in a {@code finally} block
     * (they were previously never stopped), and the thread's interrupt flag is
     * cleared on every exit path so it cannot affect the cleanup or later
     * tests.
     *
     * @throws IOException if the RPC fails for a reason other than the interruption
     */
    @Test(timeout=90000L)
    public void testRPCInterruptedSimple() throws IOException {
        Configuration conf = new Configuration();
        RPC.Server server = new RPC.Builder(conf)
            .setProtocol(TestProtocol.class)
            .setInstance(new TestImpl())
            .setBindAddress(ADDRESS)
            .setPort(0)
            .setNumHandlers(5)
            .setVerbose(true)
            .setSecretManager(null)
            .build();
        TestProtocol proxy = null;
        try {
            server.start();
            InetSocketAddress addr = NetUtils.getConnectAddress(server);
            proxy = (TestProtocol) RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf);
            // First call establishes that the connection works.
            proxy.ping();
            // Interrupt ourselves, then try another call.
            Thread.currentThread().interrupt();
            try {
                proxy.ping();
                Assert.fail("Interruption did not cause IPC to fail");
            } catch (IOException ioe) {
                if (!ioe.toString().contains("InterruptedException")) {
                    throw ioe;
                }
            }
        } finally {
            // Clear the interrupt status before cleaning up, whatever the outcome.
            Thread.interrupted();
            server.stop();
            if (proxy != null) {
                RPC.stopProxy(proxy);
            }
        }
    }

    /**
     * Starts 200 threads that all call {@code slowPing(false)} in a loop, then
     * repeatedly interrupts the first ("leader") thread until it fails. The
     * other threads keep calling while the leader is running and make one
     * final call afterwards; the test asserts that none of them saw an
     * exception.
     *
     * <p>The server is now stopped in a {@code finally} block (it was
     * previously never stopped), and the error holder is typed instead of
     * being a raw {@code AtomicReference}.
     *
     * @throws IOException          if the server or a proxy cannot be created
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test(timeout=30000L)
    public void testRPCInterrupted() throws IOException, InterruptedException {
        final Configuration conf = new Configuration();
        RPC.Server server = new RPC.Builder(conf)
            .setProtocol(TestProtocol.class)
            .setInstance(new TestImpl())
            .setBindAddress(ADDRESS)
            .setPort(0)
            .setNumHandlers(5)
            .setVerbose(true)
            .setSecretManager(null)
            .build();
        try {
            server.start();
            int numConcurrentRPC = 200;
            InetSocketAddress addr = NetUtils.getConnectAddress(server);
            final CyclicBarrier barrier = new CyclicBarrier(numConcurrentRPC);
            final CountDownLatch latch = new CountDownLatch(numConcurrentRPC);
            final AtomicBoolean leaderRunning = new AtomicBoolean(true);
            final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
            Thread leaderThread = null;
            for (int i = 0; i < numConcurrentRPC; ++i) {
                final int num = i;
                final TestProtocol proxy =
                    (TestProtocol) RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf);
                Thread rpcThread = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // Wait until every thread is ready before issuing calls.
                            barrier.await();
                            // The leader (num == 0) loops until a call throws.
                            while (num == 0 || leaderRunning.get()) {
                                proxy.slowPing(false);
                            }
                            // One more call after the leader has stopped.
                            proxy.slowPing(false);
                        } catch (Exception e) {
                            if (num == 0) {
                                leaderRunning.set(false);
                            } else {
                                // Only failures in non-leader threads fail the test.
                                error.set(e);
                            }
                            LOG.error(e);
                        } finally {
                            latch.countDown();
                        }
                    }
                });
                rpcThread.start();
                if (leaderThread == null) {
                    leaderThread = rpcThread;
                }
            }
            // Let the threads get going before interrupting the leader.
            Thread.sleep(1000L);
            while (leaderRunning.get()) {
                leaderThread.interrupt();
            }
            latch.await();
            Assert.assertTrue("rpc got exception " + error.get(), error.get() == null);
        } finally {
            server.stop();
        }
    }

    /**
     * Enables client pings with a 50 ms {@code ipc.ping.interval} and makes a
     * {@code sleep} call lasting four ping intervals; the test passes when the
     * call returns without an exception.
     *
     * <p>The server is now stopped in the {@code finally} block alongside the
     * proxy; it was previously never stopped.
     *
     * @throws Exception if server setup or the RPC fails
     */
    @Test
    public void testConnectionPing() throws Exception {
        Configuration conf = new Configuration();
        int pingInterval = 50;
        conf.setBoolean("ipc.client.ping", true);
        conf.setInt("ipc.ping.interval", pingInterval);
        RPC.Server server = new RPC.Builder(conf)
            .setProtocol(TestProtocol.class)
            .setInstance(new TestImpl())
            .setBindAddress(ADDRESS)
            .setPort(0)
            .setNumHandlers(5)
            .setVerbose(true)
            .build();
        TestProtocol proxy = null;
        try {
            server.start();
            proxy = (TestProtocol) RPC.getProxy(
                TestProtocol.class, TestProtocol.versionID, server.getListenerAddress(), conf);
            proxy.sleep(pingInterval * 4);
        } finally {
            server.stop();
            if (proxy != null) {
                RPC.stopProxy(proxy);
            }
        }
    }

    /**
     * Command-line entry point that runs the {@code testCallsInternal}
     * scenario once.
     *
     * <p>{@code setupConf()} is called first because the static {@code conf}
     * is only assigned there; outside a JUnit run it would otherwise still be
     * null when passed to {@code RPC.Builder}.
     *
     * @param args ignored
     * @throws IOException if any RPC in the scenario fails unexpectedly
     */
    public static void main(String[] args) throws IOException {
        TestRPC test = new TestRPC();
        test.setupConf();
        test.testCallsInternal(conf);
    }

    /**
     * Policy provider that exposes a single service entry tying the
     * {@code ACL_CONFIG} key to {@code TestProtocol}.
     */
    private static class TestPolicyProvider extends PolicyProvider {
        private TestPolicyProvider() {
        }

        /** Returns a one-element array describing the {@code TestProtocol} ACL. */
        public Service[] getServices() {
            Service protocolAcl = new Service(TestRPC.ACL_CONFIG, TestProtocol.class);
            return new Service[]{protocolAcl};
        }
    }

    /**
     * Invocation handler that does nothing on method calls and counts how many
     * times {@link #close()} has been invoked.
     */
    private static class StoppedInvocationHandler implements InvocationHandler, Closeable {
        /** Number of {@code close()} calls seen so far. */
        private int closeCalled;

        private StoppedInvocationHandler() {
        }

        /** Ignores the call and returns {@code null}. */
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            return null;
        }

        /** Records one more close request. */
        @Override
        public void close() throws IOException {
            closeCalled++;
        }

        /** Returns how many times {@code close()} has been called. */
        public int getCloseCalled() {
            return closeCalled;
        }
    }

    /**
     * RPC engine stub whose proxies are backed by
     * {@code StoppedInvocationHandler}. It creates no server and does not
     * support protocol meta-info proxies.
     */
    private static class StoppedRpcEngine implements RpcEngine {
        private StoppedRpcEngine() {
        }

        /**
         * Creates a dynamic proxy for {@code protocol} backed by a new
         * {@code StoppedInvocationHandler}; every other argument is ignored.
         */
        public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy) throws IOException {
            InvocationHandler handler = new StoppedInvocationHandler();
            Class[] interfaces = new Class[]{protocol};
            Object proxy = Proxy.newProxyInstance(protocol.getClassLoader(), interfaces, handler);
            return new ProtocolProxy(protocol, proxy, false);
        }

        /** Always returns {@code null}; this engine never creates a server. */
        public RPC.Server getServer(Class<?> protocol, Object instance, String bindAddress, int port, int numHandlers, int numReaders, int queueSizePerHandler, boolean verbose, Configuration conf, SecretManager<? extends TokenIdentifier> secretManager, String portRangeConfig) throws IOException {
            return null;
        }

        /** Always throws {@code UnsupportedOperationException}. */
        public ProtocolProxy<ProtocolMetaInfoPB> getProtocolMetaInfoProxy(Client.ConnectionId connId, Configuration conf, SocketFactory factory) throws IOException {
            throw new UnsupportedOperationException("This proxy is not supported");
        }
    }

    private static interface StoppedProtocol {
        public static final long versionID = 0L;

        public void stop();
    }

    /**
     * Runnable that issues one {@code slowPing(true)} call and records when it
     * has returned.
     */
    static class SlowRPC implements Runnable {
        private final TestProtocol proxy;
        /** Set to true once the slow ping has returned; read from another thread. */
        private volatile boolean done = false;

        SlowRPC(TestProtocol proxy) {
            this.proxy = proxy;
        }

        /** Returns true once the slow ping call has completed normally. */
        boolean isDone() {
            return done;
        }

        /** Performs the slow ping; an {@code IOException} becomes an assertion failure in this thread. */
        @Override
        public void run() {
            try {
                proxy.slowPing(true);
                done = true;
            } catch (IOException e) {
                Assert.fail("SlowRPC ping exception " + e);
            }
        }
    }

    /**
     * Runnable that sends an int array of {@code datasize} elements through
     * {@code exchange}, calls {@code add(1, 2)}, and asserts on both results.
     */
    static class Transactions implements Runnable {
        int datasize;
        TestProtocol proxy;

        Transactions(TestProtocol proxy, int datasize) {
            this.proxy = proxy;
            this.datasize = datasize;
        }

        /**
         * Expects the returned array to have the same length as the one sent,
         * each element to equal its index, and the sum to be 3.
         */
        @Override
        public void run() {
            final int[] sent = new int[datasize];
            int[] received = null;
            int sum = 0;
            try {
                received = proxy.exchange(sent);
                sum = proxy.add(1, 2);
            } catch (IOException e) {
                Assert.fail("Exception from RPC exchange() " + e);
            }
            Assert.assertEquals(sent.length, received.length);
            Assert.assertEquals(sum, 3);
            int index = 0;
            while (index < received.length) {
                Assert.assertEquals(received[index], index);
                index++;
            }
        }
    }

    /**
     * Server-side implementation of {@code TestProtocol} used by the tests in
     * this class.
     */
    public static class TestImpl implements TestProtocol {
        /** Fast pings received and not yet consumed by a slow ping; guarded by this object's monitor. */
        int fastPingCounter = 0;

        /** Always reports protocol version 1. */
        public long getProtocolVersion(String protocol, long clientVersion) {
            return 1L;
        }

        /** Returns a signature with version 1 and a null second argument. */
        public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int hashcode) {
            return new ProtocolSignature(1L, null);
        }

        /** Does nothing. */
        @Override
        public void ping() {
        }

        /**
         * With {@code shouldSlow} false, counts one fast ping and wakes one
         * waiter. With {@code shouldSlow} true, blocks until at least two fast
         * pings have been counted and then consumes two of them.
         */
        @Override
        public synchronized void slowPing(boolean shouldSlow) {
            if (!shouldSlow) {
                fastPingCounter++;
                notify();
                return;
            }
            while (fastPingCounter < 2) {
                try {
                    wait();
                } catch (InterruptedException ignored) {
                    // Interruption is ignored; the loop re-checks the counter.
                }
            }
            fastPingCounter -= 2;
        }

        /** Sleeps for {@code delay} milliseconds. */
        @Override
        public void sleep(long delay) throws InterruptedException {
            Thread.sleep(delay);
        }

        /** Returns the argument unchanged. */
        @Override
        public String echo(String value) throws IOException {
            return value;
        }

        /** Returns the argument unchanged. */
        @Override
        public String[] echo(String[] values) throws IOException {
            return values;
        }

        /** Returns the argument unchanged. */
        @Override
        public Writable echo(Writable writable) {
            return writable;
        }

        /** Returns {@code v1 + v2}. */
        @Override
        public int add(int v1, int v2) {
            return v1 + v2;
        }

        /** Returns the sum of all elements of {@code values}. */
        @Override
        public int add(int[] values) {
            int total = 0;
            for (int value : values) {
                total += value;
            }
            return total;
        }

        /** Always throws an {@code IOException} with the message "bobo". */
        @Override
        public int error() throws IOException {
            throw new IOException("bobo");
        }

        /** Throws an {@code IOException} unless {@code Server.get()} is an {@code RPC.Server}. */
        @Override
        public void testServerGet() throws IOException {
            if (Server.get() instanceof RPC.Server) {
                return;
            }
            throw new IOException("Server.get() failed");
        }

        /** Overwrites every element with its own index and returns the same array. */
        @Override
        public int[] exchange(int[] values) {
            int index = 0;
            while (index < values.length) {
                values[index] = index;
                index++;
            }
            return values;
        }

        /** Returns the argument unchanged. */
        @Override
        public DescriptorProtos.EnumDescriptorProto exchangeProto(DescriptorProtos.EnumDescriptorProto arg) {
            return arg;
        }
    }

    /**
     * RPC protocol exercised by the tests in this class; {@code TestImpl} is
     * its server-side implementation. Parameter names match those used in
     * {@code TestImpl}.
     */
    public static interface TestProtocol
    extends VersionedProtocol {
        /** Protocol version passed by clients when creating a proxy. */
        public static final long versionID = 1L;

        /** No-op round trip. */
        public void ping() throws IOException;

        /**
         * Fast or slow ping.
         *
         * @param shouldSlow true to request the slow variant; in
         *                   {@code TestImpl} that variant blocks until two
         *                   fast pings have been counted
         */
        public void slowPing(boolean shouldSlow) throws IOException;

        /**
         * Sleeps on the server.
         *
         * @param delay time to sleep, in milliseconds
         */
        public void sleep(long delay) throws IOException, InterruptedException;

        /** Returns {@code value} as received; may be null. */
        public String echo(String value) throws IOException;

        /** Returns {@code values} as received; may be null. */
        public String[] echo(String[] values) throws IOException;

        /** Returns {@code writable} as received; may be null. */
        public Writable echo(Writable writable) throws IOException;

        /** Returns {@code v1 + v2}. */
        public int add(int v1, int v2) throws IOException;

        /** Returns the sum of the elements of {@code values}. */
        public int add(int[] values) throws IOException;

        /** Always fails with an {@code IOException} in {@code TestImpl}. */
        public int error() throws IOException;

        /** Fails with an {@code IOException} when {@code Server.get()} is not an {@code RPC.Server}. */
        public void testServerGet() throws IOException;

        /** Returns an array of the same length in which each element equals its index. */
        public int[] exchange(int[] values) throws IOException;

        /** Returns {@code arg} after a round trip through the RPC layer. */
        public DescriptorProtos.EnumDescriptorProto exchangeProto(DescriptorProtos.EnumDescriptorProto arg);
    }
}

