/*
 * Decompiled with CFR 0.152.
 */
package org.kth.dks.dks_comm;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
import org.kth.dks.dks_comm.ConnMessageTypes;
import org.kth.dks.dks_comm.ConnectionHandler;
import org.kth.dks.dks_comm.DKSNetAddress;
import org.kth.dks.dks_comm.ListenerNB;
import org.kth.dks.dks_marshal.BootstrapMsg;
import org.kth.dks.dks_marshal.DKSMarshal;
import org.kth.dks.dks_marshal.MsgSrcDestWrapper;

/**
 * Outgoing half of a non-blocking connection.
 *
 * <p>Messages are queued with {@link #sendMessage}, {@link #sendAck} or
 * {@link #connected()} and serialised into a fixed-size direct buffer by a
 * small state machine, which is then drained into the {@link SocketChannel}
 * by {@link #run()}. Content messages are remembered until
 * {@link #ackReceived(int)} is called for their sequence number or until
 * {@link #checkTimeouts()} declares them failed.
 */
public class ConnHandlerOutNB implements Runnable {
    private static final Logger log = Logger.getLogger(ConnHandlerOutNB.class);

    /** Capacity in bytes of the outgoing staging buffer. */
    private static final int BUFFSIZE = 4096;
    /** Maximum number of queued messages before producers block in addElement. */
    private static final int MAXSEND = 100;
    /** Initial value of the smoothed round-trip time, in milliseconds. */
    private static final double DEFAULTRTT = 1000.0;
    /** A message times out once it is older than RTOFACTOR times the smoothed RTT. */
    private static final double RTOFACTOR = 20.0;
    /** Weight given to the previous average when a new RTT sample is folded in. */
    private static final double RTTALPHA = 0.8;

    Status state = Status.INIT;
    private final SocketChannel sock;
    private final List<OutMsgElement> msgQueue =
            Collections.synchronizedList(new LinkedList<OutMsgElement>());
    private final Map<Integer, UnackedMsg> unackedMsgs =
            Collections.synchronizedMap(new HashMap<Integer, UnackedMsg>());
    private double rtt = DEFAULTRTT;
    private boolean finish = false;
    private final ByteBuffer buff = ByteBuffer.allocateDirect(BUFFSIZE);
    private final DKSMarshal marshal;
    private final ListenerNB listener;
    private SelectionKey key = null;
    private DKSNetAddress remoteNetAddr = null;
    private final ConnectionHandler connHandler;
    /** Message currently being serialised by the state machine. */
    private OutMsgElement currMsg = null;
    /** Variable-length payload of the current message and how much of it has been copied. */
    private byte[] varBuffArr = null;
    private int varBuffArrPos = 0;
    /** Sequence number of the most recently started content message. */
    private int msgNo = 0;

    /**
     * Creates a handler writing to the given channel.
     *
     * @param l  listener used for the local address, the thread pool and close notifications
     * @param s  channel the serialised messages are written to
     * @param dm marshaller used for content messages and failure callbacks
     * @param ch connection handler receiving the statistics updates
     */
    public ConnHandlerOutNB(ListenerNB l, SocketChannel s, DKSMarshal dm, ConnectionHandler ch) {
        this.sock = s;
        this.marshal = dm;
        this.listener = l;
        this.connHandler = ch;
        this.buff.order(ByteOrder.BIG_ENDIAN);
        // Start with an empty readable region; stateMachine() compacts before filling.
        this.buff.limit(0);
    }

    /**
     * Opens a non-blocking channel and initiates a connection to {@code na}.
     *
     * <p>Because the channel is non-blocking the connection may still be
     * pending when this method returns.
     *
     * @param na address to connect to
     * @return the opened channel, or {@code null} if an {@link IOException} occurred
     */
    public static synchronized SocketChannel createConnection(DKSNetAddress na) {
        SocketChannel sock = null;
        try {
            log.debug("Entering createConnection");
            sock = SocketChannel.open();
            sock.configureBlocking(false);
            boolean connected = sock.connect(new InetSocketAddress(na.getIP(), na.getPort()));
            log.debug((connected ? "CONNECTED TO " : "CONNECTING TO ") + na.getIP() + ":" + na.getPort());
            return sock;
        }
        catch (IOException ex) {
            log.warn("Returning null", ex);
            // Do not leak the half-initialised channel.
            if (sock != null) {
                try {
                    sock.close();
                }
                catch (IOException ignored) {
                    // Already on the failure path; nothing more can be done.
                }
            }
            return null;
        }
    }

    /**
     * Queues a PRESENT message carrying this node's listening address.
     */
    public synchronized void connected() {
        log.debug("Presenting myself to endpoint");
        DKSNetAddress myNA = new DKSNetAddress(this.listener.getHostAddress(), this.listener.getLocalPort());
        this.addElement(new OutMsgElement(myNA, ConnMessageTypes.PRESENT_MSG, null));
    }

    /**
     * Records the remote address reported to the listener when the connection closes.
     *
     * @param netAdr address of the remote endpoint
     */
    public synchronized void setRemoteAddress(DKSNetAddress netAdr) {
        this.remoteNetAddr = netAdr;
    }

    /** Adds write interest to the selection key (if one is set) and wakes its selector. */
    private void interestedWrite() {
        if (this.key != null) {
            this.key.interestOps(this.key.interestOps() | SelectionKey.OP_WRITE);
            this.key.selector().wakeup();
        }
    }

    /** Removes write interest from the selection key (if one is set). */
    private void interestedWriteNot() {
        if (this.key != null) {
            this.key.interestOps(this.key.interestOps() & ~SelectionKey.OP_WRITE);
        }
    }

    /**
     * Sets the selection key whose interest set is toggled by this handler.
     *
     * @param key selection key of the underlying channel
     */
    public synchronized void setKey(SelectionKey key) {
        this.key = key;
    }

    /** INIT: fetch the next queued message; stops the machine when there is none. */
    private boolean initH() {
        this.currMsg = this.getNextElement();
        if (this.currMsg == null) {
            return false;
        }
        this.state = Status.SENDTYPE;
        return true;
    }

    /** SENDTYPE: emit the one-byte message type and pick the follow-up state. */
    private boolean sendtypeH() {
        if (this.buff.remaining() < 1) {
            return false;
        }
        this.buff.put(this.currMsg.type.toByte());
        if (this.currMsg.type == ConnMessageTypes.CONTENTS_MSG) {
            this.bufferUnacked();
            this.state = Status.CONT_TRANS;
        } else if (this.currMsg.type == ConnMessageTypes.PRESENT_MSG) {
            BootstrapMsg m = new BootstrapMsg(this.currMsg.src);
            this.varBuffArr = m.flatten();
            this.state = Status.WRITE_ARRLEN;
        } else if (this.currMsg.type == ConnMessageTypes.ACK_MSG) {
            this.state = Status.ACK;
        }
        // NOTE(review): any other type leaves the state at SENDTYPE; this class
        // only ever enqueues the three types handled above.
        return true;
    }

    /** ACK: emit the four-byte id of the acknowledged message. */
    private boolean ackH() {
        if (this.buff.remaining() < 4) {
            return false;
        }
        this.buff.putInt((Integer) this.currMsg.load);
        this.state = Status.INIT;
        return true;
    }

    /** CONT_TRANS: emit the single byte that follows the type of a content message. */
    private boolean cont_transH() {
        if (this.buff.remaining() < 1) {
            return false;
        }
        this.buff.put((byte) 1);
        this.state = Status.CONT_SEQNR;
        return true;
    }

    /** CONT_SEQNR: emit the sequence number and marshal the payload for the next states. */
    private boolean cont_seqnrH() {
        if (this.buff.remaining() < 4) {
            return false;
        }
        this.buff.putInt(this.msgNo);
        MsgSrcDestWrapper mesg = (MsgSrcDestWrapper) this.currMsg.load;
        this.varBuffArr = this.marshal.marshalMsgSrcDestWrapper(mesg);
        this.state = Status.WRITE_ARRLEN;
        return true;
    }

    /** WRITE_ARRLEN: emit the four-byte length of the pending payload. */
    private boolean write_arrlenH() {
        if (this.buff.remaining() < 4) {
            return false;
        }
        this.buff.putInt(this.varBuffArr.length);
        this.state = Status.WRITE_ARR;
        return true;
    }

    /**
     * WRITE_ARR: copy as much of the payload as fits into the buffer.
     *
     * @return {@code true} once the whole payload has been copied, {@code false}
     *         if the buffer filled up first and the copy must resume later
     */
    private boolean write_arrH() {
        int act = Math.min(this.buff.remaining(), this.varBuffArr.length - this.varBuffArrPos);
        this.buff.put(this.varBuffArr, this.varBuffArrPos, act);
        this.varBuffArrPos += act;
        if (this.varBuffArrPos == this.varBuffArr.length) {
            this.state = Status.INIT;
            this.varBuffArrPos = 0;
            this.varBuffArr = null;
            return true;
        }
        return false;
    }

    /**
     * Fills the staging buffer by running state handlers until one of them
     * reports that it cannot proceed (queue empty or buffer full). The buffer
     * is compacted before filling and flipped afterwards so that it is ready
     * to be written to the channel.
     */
    private void stateMachine() {
        this.buff.compact();
        boolean cont = true;
        while (cont) {
            switch (this.state) {
                case INIT:
                    cont = this.initH();
                    break;
                case SENDTYPE:
                    cont = this.sendtypeH();
                    break;
                case ACK:
                    cont = this.ackH();
                    break;
                case CONT_TRANS:
                    cont = this.cont_transH();
                    break;
                case CONT_SEQNR:
                    cont = this.cont_seqnrH();
                    break;
                case WRITE_ARRLEN:
                    cont = this.write_arrlenH();
                    break;
                case WRITE_ARR:
                    cont = this.write_arrH();
                    break;
                default:
                    // Every constant is handled above; fail loudly rather than spin.
                    throw new IllegalStateException("Unknown state " + this.state);
            }
        }
        this.buff.flip();
    }

    /** Writes as much queued data as the channel currently accepts. */
    @Override
    public void run() {
        this.write();
    }

    /**
     * Serialises queued messages and writes them to the channel until either
     * everything has been sent or the channel stops accepting data. Write
     * interest is requested while data is left over and dropped otherwise.
     * An {@link IOException} from the channel is reported to the listener as
     * a closed connection.
     */
    private synchronized void write() {
        int wrSize = 1;
        try {
            this.stateMachine();
            while (wrSize > 0 && this.buff.hasRemaining()) {
                wrSize = this.sock.write(this.buff);
                this.connHandler.statAddBytesSent(wrSize);
                if (!this.buff.hasRemaining()) {
                    // Buffer drained; refill it with whatever is still queued.
                    this.stateMachine();
                }
            }
        }
        catch (IOException ex) {
            log.warn("Write to " + this.remoteNetAddr + " failed", ex);
            // SocketChannel.write signals a broken connection by throwing, not by
            // returning a negative count, so map the failure onto the close path below.
            wrSize = -1;
        }
        if (wrSize == 0 && this.buff.hasRemaining()) {
            this.interestedWrite();
        } else {
            this.interestedWriteNot();
        }
        if (wrSize < 0) {
            this.listener.connectionClosed(this.key, wrSize, this.remoteNetAddr);
        }
    }

    /**
     * Marks this handler as finished: no further messages are dequeued and
     * any producer blocked on a full queue is released.
     */
    synchronized void end() {
        this.finish = true;
        this.notifyAll();
    }

    /**
     * Removes and returns the oldest queued message.
     *
     * @return the next message, or {@code null} if the queue is empty or
     *         {@link #end()} has been called
     */
    private synchronized OutMsgElement getNextElement() {
        if (this.finish || this.msgQueue.isEmpty()) {
            return null;
        }
        OutMsgElement next = this.msgQueue.remove(0);
        // A slot was freed: release producers waiting in addElement().
        this.notifyAll();
        return next;
    }

    /**
     * Appends a message to the outgoing queue and requests write interest.
     * Blocks while the queue holds {@code MAXSEND} or more messages, unless
     * {@link #end()} has been called. Interrupts received while waiting do
     * not abort the wait; the interrupt status is restored before returning.
     *
     * @param ele message to enqueue
     */
    private synchronized void addElement(OutMsgElement ele) {
        boolean interrupted = false;
        while (!this.finish && this.msgQueue.size() >= MAXSEND) {
            try {
                log.error("Waiting because outgoing buffer full");
                this.wait();
            }
            catch (InterruptedException ex) {
                log.error(ex + "");
                interrupted = true;
            }
        }
        this.msgQueue.add(ele);
        // interestedWrite() also wakes the selector, so no separate wakeup is needed.
        this.interestedWrite();
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /** Assigns the next sequence number to the current message and records it as unacknowledged. */
    private void bufferUnacked() {
        ++this.msgNo;
        MsgSrcDestWrapper mesg = (MsgSrcDestWrapper) this.currMsg.load;
        this.unackedMsgs.put(Integer.valueOf(this.msgNo), new UnackedMsg(mesg));
        this.connHandler.statAddMsgsUnacked(1);
    }

    /**
     * Queues an acknowledgement for a received message.
     *
     * @param msgId id of the message being acknowledged
     */
    public void sendAck(int msgId) {
        this.addElement(new OutMsgElement(null, ConnMessageTypes.ACK_MSG, Integer.valueOf(msgId)));
    }

    /**
     * Handles an acknowledgement from the remote side: forgets the pending
     * message and folds its round-trip time into the running average.
     *
     * @param msgId id of the acknowledged message
     */
    public void ackReceived(int msgId) {
        UnackedMsg msg = this.unackedMsgs.remove(Integer.valueOf(msgId));
        if (msg != null) {
            long newRTT = msg.calculateRTT();
            if (newRTT > 0L) {
                this.rtt = this.rtt * RTTALPHA + (double) newRTT * (1.0 - RTTALPHA);
                log.debug("AVG=" + this.rtt + " NEW=" + (double) newRTT);
            }
        } else {
            log.warn("Received an ack for a message that does not exists :" + msgId);
        }
        this.connHandler.statAddMsgsUnacked(-1);
    }

    /**
     * Queues a content message and schedules this handler on the listener's
     * thread pool so that it gets written.
     *
     * @param src source address stored with the queued element
     * @param msg payload; cast to {@code MsgSrcDestWrapper} when it is serialised
     */
    public void sendMessage(DKSNetAddress src, Object msg) {
        this.addElement(new OutMsgElement(src, ConnMessageTypes.CONTENTS_MSG, msg));
        this.listener.threadPool.addJob(this);
    }

    /**
     * Drops every unacknowledged message older than {@code RTOFACTOR} times
     * the smoothed RTT, counts it as failed and hands it to the marshaller's
     * failure handler.
     */
    public synchronized void checkTimeouts() {
        // Iterating a synchronizedMap view requires holding the map's own monitor.
        synchronized (this.unackedMsgs) {
            long now = System.currentTimeMillis();
            Iterator<UnackedMsg> it = this.unackedMsgs.values().iterator();
            while (it.hasNext()) {
                UnackedMsg msg = it.next();
                double elapsed = now - msg.rttStart;
                if (elapsed > RTOFACTOR * this.rtt) {
                    it.remove();
                    log.warn("Timeout after " + elapsed + "msec with avg rtt=" + this.rtt);
                    this.connHandler.statAddMsgsFailed(1);
                    this.marshal.failureHandler(msg.msg);
                }
            }
        }
    }

    /** States of the serialisation state machine driven by stateMachine(). */
    private enum Status {
        INIT,
        SENDTYPE,
        ACK,
        CONT_TRANS,
        CONT_SEQNR,
        WRITE_ARRLEN,
        WRITE_ARR
    }

    /** A content message awaiting acknowledgement, with the time it was recorded. */
    class UnackedMsg {
        private MsgSrcDestWrapper msg;
        private long rttStart = 0L;

        UnackedMsg(MsgSrcDestWrapper msg) {
            this.msg = msg;
            this.rttStart = System.currentTimeMillis();
        }

        /**
         * Returns the milliseconds elapsed since this entry was created and
         * resets the start time to zero.
         */
        long calculateRTT() {
            long rttEnd = System.currentTimeMillis();
            long newRTT = rttEnd - this.rttStart;
            this.rttStart = 0L;
            return newRTT;
        }
    }

    /** One entry of the outgoing queue: message type, payload and source address. */
    class OutMsgElement {
        public ConnMessageTypes type;
        public Object load;
        public DKSNetAddress src;

        public OutMsgElement(DKSNetAddress _src, ConnMessageTypes type, Object load) {
            this.src = _src;
            this.type = type;
            this.load = load;
        }
    }
}

