/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.transport;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.Version;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.internal.logging.InternalLoggerFactory;
import io.netty.util.internal.logging.Slf4JLoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.EnumMap;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import org.apache.cassandra.auth.IAuthenticator;
import org.apache.cassandra.auth.ISaslAwareAuthenticator;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.metrics.ClientMetrics;
import org.apache.cassandra.security.SSLFactory;
import org.apache.cassandra.service.CassandraDaemon;
import org.apache.cassandra.service.IEndpointLifecycleSubscriber;
import org.apache.cassandra.service.IMigrationListener;
import org.apache.cassandra.service.MigrationManager;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.transport.CBUtil;
import org.apache.cassandra.transport.Connection;
import org.apache.cassandra.transport.Event;
import org.apache.cassandra.transport.Frame;
import org.apache.cassandra.transport.Message;
import org.apache.cassandra.transport.RequestThreadPoolExecutor;
import org.apache.cassandra.transport.ServerConnection;
import org.apache.cassandra.transport.messages.EventMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Server
implements CassandraDaemon.Server {
    private static final Logger logger;
    private static final boolean enableEpoll;
    public static final int VERSION_3 = 3;
    public static final int CURRENT_VERSION = 3;
    private final ConnectionTracker connectionTracker = new ConnectionTracker();
    private final Connection.Factory connectionFactory = new Connection.Factory(){

        @Override
        public Connection newConnection(Channel channel, int version) {
            return new ServerConnection(channel, version, Server.this.connectionTracker);
        }
    };
    public final InetSocketAddress socket;
    private final AtomicBoolean isRunning = new AtomicBoolean(false);
    private EventLoopGroup workerGroup;
    private EventExecutor eventExecutorGroup;

    public Server(InetSocketAddress socket) {
        this.socket = socket;
        EventNotifier notifier = new EventNotifier(this);
        StorageService.instance.register(notifier);
        MigrationManager.instance.register(notifier);
        this.registerMetrics();
    }

    public Server(String hostname, int port) {
        this(new InetSocketAddress(hostname, port));
    }

    public Server(InetAddress host, int port) {
        this(new InetSocketAddress(host, port));
    }

    public Server(int port) {
        this(new InetSocketAddress(port));
    }

    @Override
    public void start() {
        if (!this.isRunning()) {
            this.run();
        }
    }

    @Override
    public void stop() {
        if (this.isRunning.compareAndSet(true, false)) {
            this.close();
        }
    }

    @Override
    public boolean isRunning() {
        return this.isRunning.get();
    }

    private void run() {
        boolean hasEpoll;
        IAuthenticator authenticator = DatabaseDescriptor.getAuthenticator();
        if (authenticator.requireAuthentication() && !(authenticator instanceof ISaslAwareAuthenticator)) {
            logger.error("Not starting native transport as the configured IAuthenticator is not capable of SASL authentication");
            this.isRunning.compareAndSet(true, false);
            return;
        }
        this.eventExecutorGroup = new RequestThreadPoolExecutor();
        boolean bl = hasEpoll = enableEpoll ? Epoll.isAvailable() : false;
        if (hasEpoll) {
            this.workerGroup = new EpollEventLoopGroup();
            logger.info("Netty using native Epoll event loop");
        } else {
            this.workerGroup = new NioEventLoopGroup();
            logger.info("Netty using Java NIO event loop");
        }
        ServerBootstrap bootstrap = ((ServerBootstrap)new ServerBootstrap().group(this.workerGroup).channel(hasEpoll ? EpollServerSocketChannel.class : NioServerSocketChannel.class)).childOption(ChannelOption.TCP_NODELAY, (Object)true).childOption(ChannelOption.SO_LINGER, (Object)0).childOption(ChannelOption.SO_KEEPALIVE, (Object)DatabaseDescriptor.getRpcKeepAlive()).childOption(ChannelOption.ALLOCATOR, (Object)CBUtil.allocator).childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, (Object)32768).childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, (Object)8192);
        EncryptionOptions.ClientEncryptionOptions clientEnc = DatabaseDescriptor.getClientEncryptionOptions();
        if (clientEnc.enabled) {
            logger.info("Enabling encrypted CQL connections between client and server");
            bootstrap.childHandler((ChannelHandler)new SecureInitializer(this, clientEnc));
        } else {
            bootstrap.childHandler((ChannelHandler)new Initializer(this));
        }
        logger.info("Using Netty Version: {}", Version.identify().entrySet());
        logger.info("Starting listening for CQL clients on {}...", (Object)this.socket);
        ChannelFuture bindFuture = bootstrap.bind((SocketAddress)this.socket);
        if (!bindFuture.awaitUninterruptibly().isSuccess()) {
            throw new IllegalStateException(String.format("Failed to bind port %d on %s.", this.socket.getPort(), this.socket.getAddress().getHostAddress()));
        }
        this.connectionTracker.allChannels.add((Object)bindFuture.channel());
        this.isRunning.set(true);
    }

    private void registerMetrics() {
        ClientMetrics.instance.addCounter("connectedNativeClients", new Callable<Integer>(){

            @Override
            public Integer call() throws Exception {
                return Server.this.connectionTracker.getConnectedClients();
            }
        });
    }

    private void close() {
        this.connectionTracker.closeAll();
        this.workerGroup.shutdownGracefully();
        this.workerGroup = null;
        this.eventExecutorGroup.shutdown();
        this.eventExecutorGroup = null;
        logger.info("Stop listening for CQL clients");
    }

    static {
        InternalLoggerFactory.setDefaultFactory((InternalLoggerFactory)new Slf4JLoggerFactory());
        logger = LoggerFactory.getLogger(Server.class);
        enableEpoll = Boolean.valueOf(System.getProperty("cassandra.native.epoll.enabled", "true"));
    }

    private static class EventNotifier
    implements IEndpointLifecycleSubscriber,
    IMigrationListener {
        private final Server server;
        private static final InetAddress bindAll;

        private EventNotifier(Server server) {
            this.server = server;
        }

        private InetAddress getRpcAddress(InetAddress endpoint) {
            try {
                InetAddress rpcAddress = InetAddress.getByName(StorageService.instance.getRpcaddress(endpoint));
                return rpcAddress.equals(bindAll) ? endpoint : rpcAddress;
            }
            catch (UnknownHostException e) {
                logger.error("Problem retrieving RPC address for {}", (Object)endpoint, (Object)e);
                return endpoint;
            }
        }

        @Override
        public void onJoinCluster(InetAddress endpoint) {
            this.server.connectionTracker.send(Event.TopologyChange.newNode(this.getRpcAddress(endpoint), this.server.socket.getPort()));
        }

        @Override
        public void onLeaveCluster(InetAddress endpoint) {
            this.server.connectionTracker.send(Event.TopologyChange.removedNode(this.getRpcAddress(endpoint), this.server.socket.getPort()));
        }

        @Override
        public void onMove(InetAddress endpoint) {
            this.server.connectionTracker.send(Event.TopologyChange.movedNode(this.getRpcAddress(endpoint), this.server.socket.getPort()));
        }

        @Override
        public void onUp(InetAddress endpoint) {
            this.server.connectionTracker.send(Event.StatusChange.nodeUp(this.getRpcAddress(endpoint), this.server.socket.getPort()));
        }

        @Override
        public void onDown(InetAddress endpoint) {
            this.server.connectionTracker.send(Event.StatusChange.nodeDown(this.getRpcAddress(endpoint), this.server.socket.getPort()));
        }

        @Override
        public void onCreateKeyspace(String ksName) {
            this.server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, ksName));
        }

        @Override
        public void onCreateColumnFamily(String ksName, String cfName) {
            this.server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TABLE, ksName, cfName));
        }

        @Override
        public void onCreateUserType(String ksName, String typeName) {
            this.server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TYPE, ksName, typeName));
        }

        @Override
        public void onUpdateKeyspace(String ksName) {
            this.server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, ksName));
        }

        @Override
        public void onUpdateColumnFamily(String ksName, String cfName) {
            this.server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.TABLE, ksName, cfName));
        }

        @Override
        public void onUpdateUserType(String ksName, String typeName) {
            this.server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.TYPE, ksName, typeName));
        }

        @Override
        public void onDropKeyspace(String ksName) {
            this.server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, ksName));
        }

        @Override
        public void onDropColumnFamily(String ksName, String cfName) {
            this.server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.TABLE, ksName, cfName));
        }

        @Override
        public void onDropUserType(String ksName, String typeName) {
            this.server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.TYPE, ksName, typeName));
        }

        static {
            try {
                bindAll = InetAddress.getByAddress(new byte[4]);
            }
            catch (UnknownHostException e) {
                throw new AssertionError((Object)e);
            }
        }
    }

    private static class SecureInitializer
    extends Initializer {
        private final SSLContext sslContext;
        private final EncryptionOptions encryptionOptions;

        public SecureInitializer(Server server, EncryptionOptions encryptionOptions) {
            super(server);
            this.encryptionOptions = encryptionOptions;
            try {
                this.sslContext = SSLFactory.createSSLContext(encryptionOptions, encryptionOptions.require_client_auth);
            }
            catch (IOException e) {
                throw new RuntimeException("Failed to setup secure pipeline", e);
            }
        }

        @Override
        protected void initChannel(Channel channel) throws Exception {
            SSLEngine sslEngine = this.sslContext.createSSLEngine();
            sslEngine.setUseClientMode(false);
            sslEngine.setEnabledCipherSuites(this.encryptionOptions.cipher_suites);
            sslEngine.setNeedClientAuth(this.encryptionOptions.require_client_auth);
            SslHandler sslHandler = new SslHandler(sslEngine);
            super.initChannel(channel);
            channel.pipeline().addFirst("ssl", (ChannelHandler)sslHandler);
        }
    }

    private static class Initializer
    extends ChannelInitializer {
        private static final Message.ProtocolDecoder messageDecoder = new Message.ProtocolDecoder();
        private static final Message.ProtocolEncoder messageEncoder = new Message.ProtocolEncoder();
        private static final Frame.Decompressor frameDecompressor = new Frame.Decompressor();
        private static final Frame.Compressor frameCompressor = new Frame.Compressor();
        private static final Frame.Encoder frameEncoder = new Frame.Encoder();
        private static final Message.Dispatcher dispatcher = new Message.Dispatcher();
        private final Server server;

        public Initializer(Server server) {
            this.server = server;
        }

        protected void initChannel(Channel channel) throws Exception {
            ChannelPipeline pipeline = channel.pipeline();
            pipeline.addLast("frameDecoder", (ChannelHandler)new Frame.Decoder(this.server.connectionFactory));
            pipeline.addLast("frameEncoder", (ChannelHandler)frameEncoder);
            pipeline.addLast("frameDecompressor", (ChannelHandler)frameDecompressor);
            pipeline.addLast("frameCompressor", (ChannelHandler)frameCompressor);
            pipeline.addLast("messageDecoder", (ChannelHandler)messageDecoder);
            pipeline.addLast("messageEncoder", (ChannelHandler)messageEncoder);
            pipeline.addLast((EventExecutorGroup)this.server.eventExecutorGroup, "executor", (ChannelHandler)dispatcher);
        }
    }

    public static class ConnectionTracker
    implements Connection.Tracker {
        public final ChannelGroup allChannels = new DefaultChannelGroup((EventExecutor)GlobalEventExecutor.INSTANCE);
        private final EnumMap<Event.Type, ChannelGroup> groups = new EnumMap(Event.Type.class);

        public ConnectionTracker() {
            for (Event.Type type : Event.Type.values()) {
                this.groups.put(type, (ChannelGroup)new DefaultChannelGroup(type.toString(), (EventExecutor)GlobalEventExecutor.INSTANCE));
            }
        }

        @Override
        public void addConnection(Channel ch, Connection connection) {
            this.allChannels.add((Object)ch);
        }

        public void register(Event.Type type, Channel ch) {
            this.groups.get((Object)type).add((Object)ch);
        }

        public void unregister(Channel ch) {
            for (ChannelGroup group : this.groups.values()) {
                group.remove((Object)ch);
            }
        }

        public void send(Event event) {
            this.groups.get((Object)event.type).writeAndFlush((Object)new EventMessage(event));
        }

        @Override
        public void closeAll() {
            this.allChannels.close().awaitUninterruptibly();
        }

        public int getConnectedClients() {
            return this.allChannels.size() != 0 ? this.allChannels.size() - 1 : 0;
        }
    }
}

