package org.kth.dks.dks_comm;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
import org.kth.dks.dks_marshal.BootstrapMsg;
import org.kth.dks.dks_marshal.DKSMarshal;
import org.kth.dks.dks_marshal.MsgSrcDestWrapper;

/* loaded from: input_file:org/kth/dks/dks_comm/ConnHandlerOutNB.class */
public class ConnHandlerOutNB implements Runnable {
    private static Logger log = Logger.getLogger(ConnHandlerOutNB.class);
    private SocketChannel sock;
    private static final int BUFFSIZE = 4096;
    private static final double DEFAULTRTT = 1000.0d;
    private static final double RTOFACTOR = 20.0d;
    private DKSMarshal marshal;
    private ListenerNB listener;
    private ConnectionHandler connHandler;
    Status state = Status.INIT;
    private final int MAXSEND = 100;
    private List msgQueue = Collections.synchronizedList(new LinkedList());
    private Map unackedMsgs = Collections.synchronizedMap(new HashMap());
    private double rtt = DEFAULTRTT;
    private boolean finish = false;
    private ByteBuffer buff = ByteBuffer.allocateDirect(BUFFSIZE);
    private SelectionKey key = null;
    private DKSNetAddress remoteNetAddr = null;
    private OutMsgElement currMsg = null;
    private byte[] varBuffArr = null;
    private int varBuffArrPos = 0;
    private int msgNo = 0;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/kth/dks/dks_comm/ConnHandlerOutNB$OutMsgElement.class */
    public class OutMsgElement {
        public ConnMessageTypes type;
        public Object load;
        public DKSNetAddress src;

        public OutMsgElement(DKSNetAddress dKSNetAddress, ConnMessageTypes connMessageTypes, Object obj) {
            this.src = dKSNetAddress;
            this.type = connMessageTypes;
            this.load = obj;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/kth/dks/dks_comm/ConnHandlerOutNB$Status.class */
    public static class Status {
        public static final Status INIT = new Status("INIT");
        public static final Status SENDTYPE = new Status("SENDTYPE");
        public static final Status ACK = new Status("ACK");
        public static final Status CONT_TRANS = new Status("CONT_TRANS");
        public static final Status CONT_SEQNR = new Status("CONT_SEQNR");
        public static final Status WRITE_ARRLEN = new Status("WRITE_ARRLEN");
        public static final Status WRITE_ARR = new Status("WRITE_ARR");
        private final String name;

        private Status(String str) {
            this.name = str;
        }

        public String toString() {
            return this.name;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/kth/dks/dks_comm/ConnHandlerOutNB$UnackedMsg.class */
    public class UnackedMsg {
        private MsgSrcDestWrapper msg;
        private long rttStart;

        UnackedMsg(MsgSrcDestWrapper msgSrcDestWrapper) {
            this.rttStart = 0L;
            this.msg = msgSrcDestWrapper;
            this.rttStart = new Date().getTime();
        }

        long calculateRTT() {
            long time = new Date().getTime() - this.rttStart;
            this.rttStart = 0L;
            return time;
        }
    }

    public ConnHandlerOutNB(ListenerNB listenerNB, SocketChannel socketChannel, DKSMarshal dKSMarshal, ConnectionHandler connectionHandler) {
        this.marshal = null;
        this.listener = null;
        this.connHandler = null;
        this.sock = socketChannel;
        this.marshal = dKSMarshal;
        this.listener = listenerNB;
        this.connHandler = connectionHandler;
        this.buff.order(ByteOrder.BIG_ENDIAN);
        this.buff.limit(0);
    }

    public static synchronized SocketChannel createConnection(DKSNetAddress dKSNetAddress) {
        try {
            log.debug("Entering createConnection");
            SocketChannel open = SocketChannel.open();
            open.configureBlocking(false);
            open.connect(new InetSocketAddress(dKSNetAddress.getIP(), dKSNetAddress.getPort()));
            log.debug("CONNECTED TO " + dKSNetAddress.getIP() + ":" + dKSNetAddress.getPort());
            return open;
        } catch (IOException e) {
            e.printStackTrace();
            log.warn("Returning null");
            return null;
        }
    }

    public synchronized void connected() {
        log.debug("Presenting myself to endpoint");
        addElement(new OutMsgElement(new DKSNetAddress(this.listener.getHostAddress(), this.listener.getLocalPort()), ConnMessageTypes.PRESENT_MSG, null));
    }

    public synchronized void setRemoteAddress(DKSNetAddress dKSNetAddress) {
        this.remoteNetAddr = dKSNetAddress;
    }

    private void interestedWrite() {
        if (this.key != null) {
            this.key.interestOps(this.key.interestOps() | 4);
            this.key.selector().wakeup();
        }
    }

    private void interestedWriteNot() {
        if (this.key != null) {
            this.key.interestOps(this.key.interestOps() & (-5));
        }
    }

    public synchronized void setKey(SelectionKey selectionKey) {
        this.key = selectionKey;
    }

    private boolean initH() {
        this.currMsg = getNextElement();
        if (this.currMsg == null) {
            return false;
        }
        this.state = Status.SENDTYPE;
        return true;
    }

    private boolean sendtypeH() {
        if (this.buff.remaining() < 1) {
            return false;
        }
        this.buff.put(this.currMsg.type.toByte());
        if (this.currMsg.type == ConnMessageTypes.CONTENTS_MSG) {
            bufferUnacked();
            this.state = Status.CONT_TRANS;
            return true;
        }
        if (this.currMsg.type == ConnMessageTypes.PRESENT_MSG) {
            this.varBuffArr = new BootstrapMsg(this.currMsg.src).flatten();
            this.state = Status.WRITE_ARRLEN;
            return true;
        }
        if (this.currMsg.type != ConnMessageTypes.ACK_MSG) {
            return true;
        }
        this.state = Status.ACK;
        return true;
    }

    private boolean ackH() {
        if (this.buff.remaining() < 4) {
            return false;
        }
        this.buff.putInt(((Integer) this.currMsg.load).intValue());
        this.state = Status.INIT;
        return true;
    }

    private boolean cont_transH() {
        if (this.buff.remaining() < 1) {
            return false;
        }
        ByteBuffer byteBuffer = this.buff;
        DKSMarshal dKSMarshal = this.marshal;
        byteBuffer.put((byte) 1);
        this.state = Status.CONT_SEQNR;
        return true;
    }

    private boolean cont_seqnrH() {
        if (this.buff.remaining() < 4) {
            return false;
        }
        this.buff.putInt(this.msgNo);
        this.varBuffArr = this.marshal.marshalMsgSrcDestWrapper((MsgSrcDestWrapper) this.currMsg.load);
        this.state = Status.WRITE_ARRLEN;
        return true;
    }

    private boolean write_arrlenH() {
        if (this.buff.remaining() < 4) {
            return false;
        }
        this.buff.putInt(this.varBuffArr.length);
        this.state = Status.WRITE_ARR;
        return true;
    }

    private boolean write_arrH() {
        int min = Math.min(this.buff.remaining(), this.varBuffArr.length - this.varBuffArrPos);
        this.buff.put(this.varBuffArr, this.varBuffArrPos, min);
        this.varBuffArrPos += min;
        if (this.varBuffArrPos != this.varBuffArr.length) {
            return false;
        }
        this.state = Status.INIT;
        this.varBuffArrPos = 0;
        this.varBuffArr = null;
        return true;
    }

    private void stateMachine() {
        this.buff.compact();
        boolean z = true;
        while (z) {
            if (this.state == Status.INIT) {
                z = initH();
            } else if (this.state == Status.SENDTYPE) {
                z = sendtypeH();
            } else if (this.state == Status.ACK) {
                z = ackH();
            } else if (this.state == Status.CONT_TRANS) {
                z = cont_transH();
            } else if (this.state == Status.CONT_SEQNR) {
                z = cont_seqnrH();
            } else if (this.state == Status.WRITE_ARRLEN) {
                z = write_arrlenH();
            } else if (this.state == Status.WRITE_ARR) {
                z = write_arrH();
            }
        }
        this.buff.flip();
    }

    @Override // java.lang.Runnable
    public void run() {
        write();
    }

    private synchronized void write() {
        int i = 1;
        try {
            stateMachine();
            while (i > 0) {
                if (!this.buff.hasRemaining()) {
                    break;
                }
                i = this.sock.write(this.buff);
                this.connHandler.statAddBytesSent(i);
                if (!this.buff.hasRemaining()) {
                    stateMachine();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        if (i == 0 && this.buff.hasRemaining()) {
            interestedWrite();
        } else {
            interestedWriteNot();
        }
        if (i < 0) {
            this.listener.connectionClosed(this.key, i, this.remoteNetAddr);
        }
    }

    synchronized void end() {
        this.finish = true;
        notifyAll();
    }

    private synchronized OutMsgElement getNextElement() {
        if (this.msgQueue.isEmpty() || this.finish) {
            return null;
        }
        return (OutMsgElement) this.msgQueue.remove(0);
    }

    private synchronized void addElement(OutMsgElement outMsgElement) {
        while (this.msgQueue.size() >= 100) {
            try {
                log.error("Waiting because outgoing buffer full");
                wait();
            } catch (Exception e) {
                log.error(e + "");
            }
        }
        this.msgQueue.add(outMsgElement);
        interestedWrite();
        if (this.key != null) {
            this.key.selector().wakeup();
        }
    }

    private void bufferUnacked() {
        this.msgNo++;
        this.unackedMsgs.put(new Integer(this.msgNo), new UnackedMsg((MsgSrcDestWrapper) this.currMsg.load));
        this.connHandler.statAddMsgsUnacked(1);
    }

    public void sendAck(int i) {
        addElement(new OutMsgElement(null, ConnMessageTypes.ACK_MSG, new Integer(i)));
    }

    public void ackReceived(int i) {
        UnackedMsg unackedMsg = (UnackedMsg) this.unackedMsgs.remove(new Integer(i));
        if (unackedMsg != null) {
            long calculateRTT = unackedMsg.calculateRTT();
            if (calculateRTT > 0) {
                this.rtt = (this.rtt * 0.8d) + (calculateRTT * (1.0d - 0.8d));
                log.debug("AVG=" + this.rtt + " NEW=" + calculateRTT);
            }
        } else {
            log.warn("Received an ack for a message that does not exists :" + i);
        }
        this.connHandler.statAddMsgsUnacked(-1);
    }

    public void sendMessage(DKSNetAddress dKSNetAddress, Object obj) {
        addElement(new OutMsgElement(dKSNetAddress, ConnMessageTypes.CONTENTS_MSG, obj));
        this.listener.threadPool.addJob(this);
    }

    public synchronized void checkTimeouts() {
        synchronized (this.unackedMsgs) {
            Iterator it = this.unackedMsgs.values().iterator();
            while (it.hasNext()) {
                UnackedMsg unackedMsg = (UnackedMsg) it.next();
                double time = new Date().getTime() - unackedMsg.rttStart;
                if (time > RTOFACTOR * this.rtt) {
                    it.remove();
                    log.warn("Timeout after " + time + "msec with avg rtt=" + this.rtt);
                    this.connHandler.statAddMsgsFailed(1);
                    this.marshal.failureHandler(unackedMsg.msg);
                }
            }
        }
    }
}
