/*
 * Decompiled with CFR 0.152.
 */
package org.kth.dks.dks_comm;

import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.log4j.Logger;
import org.kth.dks.dks_comm.ConnMessageMiscs;
import org.kth.dks.dks_comm.ConnMessageTypes;
import org.kth.dks.dks_comm.ConnectionHandler;
import org.kth.dks.dks_comm.DKSNetAddress;
import org.kth.dks.dks_exceptions.DKSRefNoResponse;
import org.kth.dks.dks_marshal.BootstrapMsg;
import org.kth.dks.dks_marshal.DKSMarshal;
import org.kth.dks.dks_marshal.MsgSrcDestWrapper;

/**
 * Sender thread for the outgoing side of a connection to one remote node.
 *
 * <p>Messages are placed on a bounded queue (at most {@link #MAXSEND} entries)
 * by {@link #sendMessage}, {@link #sendAck} and the connecting constructor, and
 * are written to the socket's output stream by this thread. Every CONTENTS
 * message is numbered and remembered in {@code unackedMsgs} until
 * {@link #ackReceived} is called for its number; a periodic
 * {@link TimeoutDetector} removes entries that have waited too long and hands
 * them to {@code DKSMarshal.failureHandler}.
 *
 * <p>Thread-safety: queue access is guarded by this object's monitor;
 * {@code finish} and {@code rtt} are volatile because they are read by the
 * timer thread and by callers of the getters. The RTT update in
 * {@link #ackReceived} is a plain read-modify-write, so it assumes acks are
 * reported by one thread at a time (NOTE(review): confirm against the caller).
 */
class ConnHandlerOut extends Thread {
    /** Maximum number of queued, not yet written messages. */
    private static final int MAXSEND = 25;
    /** Socket read timeout in milliseconds; 0 means "no timeout". */
    private static final int SOCKETTIMEOUT = 0;
    private static final double DEFAULTRTT = 1000.0;
    private static final double RTOFACTOR = 7.0;
    private static final double MAXRTO = 20000.0;
    private static final double MINRTO = 1000.0;
    /** Weight of the previous average in the RTT moving average. */
    private static final double RTT_SMOOTHING = 0.8;
    /** Period, in milliseconds, between two runs of the timeout detector. */
    private static final long TIMEOUT_CHECK_PERIOD_MS = 4000L;

    private final Logger log = Logger.getLogger(ConnHandlerOut.class);
    private final DKSNetAddress destNode;
    private final List<OutMsgElement> msgQueue =
            Collections.synchronizedList(new LinkedList<OutMsgElement>());
    private final Map<Integer, UnackedMsg> unackedMsgs =
            Collections.synchronizedMap(new HashMap<Integer, UnackedMsg>());
    /** Set once the sender must stop; read without the monitor by the timer thread. */
    private volatile boolean finish = false;
    private final ConnectionHandler connHandler;
    private final DKSMarshal marshal;
    private Socket socket = null;
    DataOutputStream outStream = null;
    /** Smoothed round-trip time in milliseconds; volatile so readers never see a torn double. */
    private volatile double rtt = DEFAULTRTT;
    /** Number of the last CONTENTS message written; only touched under this monitor. */
    private int msgNo = 0;
    private final Timer rttTimer;
    private final TimeoutDetector timeDetect = new TimeoutDetector();

    /**
     * Opens a new socket to {@code dest}, queues a PRESENT message built from
     * {@code src}, and starts the sender thread and the timeout detector.
     *
     * @param ch   owning connection handler; supplies the marshaller and byte statistics
     * @param src  address placed in the initial PRESENT message
     * @param dest node to connect to
     * @throws DKSRefNoResponse if the socket cannot be opened
     */
    public ConnHandlerOut(ConnectionHandler ch, DKSNetAddress src, DKSNetAddress dest) throws DKSRefNoResponse {
        log.info("Creating new sender -- connecting " + dest);
        this.connHandler = ch;
        this.marshal = ch.getDKSMarshal();
        this.destNode = dest;
        try {
            establishConnection();
        } catch (IOException ex) {
            // DKSRefNoResponse is only given the address here, so log the cause before it is lost.
            log.warn("Could not connect to " + dest + "\n" + ex);
            throw new DKSRefNoResponse(dest);
        }
        addElement(new OutMsgElement(src, ConnMessageTypes.PRESENT_MSG, null));
        setName(ConnHandlerOut.class.getName() + "(" + dest + ")");
        // Assign every final field before the thread is started.
        this.rttTimer = new Timer(ConnHandlerOut.class.getName() + ".RTTTimer");
        start();
        rttTimer.schedule(timeDetect, 0L, TIMEOUT_CHECK_PERIOD_MS);
    }

    /**
     * Wraps an already accepted socket and starts the sender thread and the
     * timeout detector. If the output stream cannot be obtained the error is
     * logged and the sender is created in the finished state.
     *
     * @param ch   owning connection handler; supplies the marshaller and byte statistics
     * @param dest address of the remote node
     * @param soc  accepted socket to write to
     */
    public ConnHandlerOut(ConnectionHandler ch, DKSNetAddress dest, Socket soc) {
        log.info("Creating new sender -- accepting " + dest);
        this.connHandler = ch;
        // Previously left unset on this path, which made CONTENTS delivery and the
        // timeout callback fail with a NullPointerException. The null guard keeps
        // callers that pass no handler (ack-only use) working as before.
        this.marshal = ch != null ? ch.getDKSMarshal() : null;
        this.destNode = dest;
        this.socket = soc;
        try {
            acceptConnection();
        } catch (IOException ex) {
            log.error("Couldn't create connection to " + dest + "\n" + ex);
            // Without an output stream nothing can ever be delivered.
            this.finish = true;
        }
        setName(ConnHandlerOut.class.getName() + "(" + dest + ")");
        this.rttTimer = new Timer(ConnHandlerOut.class.getName() + ".RTTTimer");
        start();
        rttTimer.schedule(timeDetect, 0L, TIMEOUT_CHECK_PERIOD_MS);
    }

    /** @return the socket this sender writes to */
    public Socket getSocket() {
        return socket;
    }

    /**
     * Queues a CONTENTS message; blocks while the queue is full.
     *
     * @param src source address stored with the queued element
     * @param msg payload; cast to {@code MsgSrcDestWrapper} when it is delivered
     */
    public void sendMessage(DKSNetAddress src, Object msg) {
        log.info("Sending msg  to " + destNode);
        addElement(new OutMsgElement(src, ConnMessageTypes.CONTENTS_MSG, msg));
    }

    /**
     * Queues an ACK message for the given message number; blocks while the queue is full.
     *
     * @param msgId number of the message being acknowledged
     */
    public void sendAck(int msgId) {
        log.info("Sending ack " + msgId + " to " + destNode);
        addElement(new OutMsgElement(null, ConnMessageTypes.ACK_MSG, Integer.valueOf(msgId)));
    }

    /**
     * Records that the remote node acknowledged message {@code msgId} and folds
     * the measured round-trip time into the moving average.
     *
     * @param msgId number of the acknowledged message
     */
    public void ackReceived(int msgId) {
        log.info("Received ack " + msgId + " from " + destNode);
        UnackedMsg msg = unackedMsgs.remove(Integer.valueOf(msgId));
        if (msg == null) {
            log.warn("Received an ack for a message that does not exists :" + msgId);
            return;
        }
        long newRTT = msg.calculateRTT();
        if (newRTT > 0L) {
            double updated = rtt * RTT_SMOOTHING + (double) newRTT * (1.0 - RTT_SMOOTHING);
            rtt = updated;
            log.debug("AVG=" + updated + " NEW=" + (double) newRTT);
        }
    }

    /** @return number of CONTENTS messages written but not yet acknowledged or timed out */
    public int getUnackedMsgs() {
        return unackedMsgs.size();
    }

    /** Marks the sender as finished and wakes every thread waiting on the queue. */
    synchronized void end() {
        finish = true;
        notifyAll();
    }

    /** Delivers queued messages one at a time until the sender is finished. */
    @Override
    public void run() {
        log.info("Creating sender " + destNode);
        while (!finish) {
            deliverMessage();
            Thread.yield();
        }
        log.info("Closing ConnHandlerOut to node " + destNode);
    }

    /**
     * Takes the next queued element (waiting for one if necessary) and writes it
     * to the output stream. Every message starts with its type byte, followed by:
     * for CONTENTS a byte {@code 1} (meaning not evident from this file), the
     * message number and the marshalled payload as a data block; for PRESENT a
     * flattened {@code BootstrapMsg} as a data block; for ACK the acknowledged
     * message number. Any exception while writing finishes the sender.
     */
    private synchronized void deliverMessage() {
        OutMsgElement msgEle = getNextElement();
        if (msgEle == null) {
            return;
        }
        log.info("Delivering message of type " + msgEle.type + " to " + destNode);
        try {
            boolean isContents = msgEle.type == ConnMessageTypes.CONTENTS_MSG;
            MsgSrcDestWrapper mesg = null;
            if (isContents) {
                ++msgNo;
                mesg = (MsgSrcDestWrapper) msgEle.load;
                // Registered before writing so a failed write is still reported by the timeout detector.
                unackedMsgs.put(Integer.valueOf(msgNo), new UnackedMsg(mesg));
            }
            outStream.writeByte(msgEle.type.toByte());
            if (isContents) {
                outStream.writeByte(1);
                outStream.writeInt(msgNo);
                byte[] outData = marshal.marshalMsgSrcDestWrapper(mesg);
                ConnMessageMiscs.m_writeDataBlock(outData, outStream);
                connHandler.statAddBytesSent(outData.length);
                log.info("msg " + msgNo + " is sent");
            } else if (msgEle.type == ConnMessageTypes.PRESENT_MSG) {
                BootstrapMsg m = new BootstrapMsg(msgEle.src);
                ConnMessageMiscs.m_writeDataBlock(m.flatten(), outStream);
            } else if (msgEle.type == ConnMessageTypes.ACK_MSG) {
                outStream.writeInt((Integer) msgEle.load);
            }
            outStream.flush();
        } catch (Exception ex) {
            // Covers IOException as well. end() also wakes producers blocked on a full
            // queue, which would otherwise wait forever once nobody drains it.
            end();
            log.info("Closing ConnHandlerOut " + destNode + "\n" + ex);
        } catch (StackOverflowError er) {
            log.error("Stack Overflow:" + er);
        }
    }

    /** Prepares an accepted socket for writing. */
    private void acceptConnection() throws IOException {
        socket.setSoTimeout(SOCKETTIMEOUT);
        outStream = new DataOutputStream(socket.getOutputStream());
    }

    /** Opens a socket to the destination node and prepares it for writing. */
    private void establishConnection() throws IOException {
        socket = new Socket(destNode.getIP(), destNode.getPort());
        socket.setSoTimeout(SOCKETTIMEOUT);
        outStream = new DataOutputStream(socket.getOutputStream());
    }

    /**
     * Waits until a message is queued or the sender is finished.
     *
     * @return the oldest queued element, or {@code null} if the sender is
     *         finished or the wait was interrupted
     */
    private synchronized OutMsgElement getNextElement() {
        try {
            while (msgQueue.isEmpty() && !finish) {
                wait();
            }
        } catch (InterruptedException iexp) {
            // Keep the interrupt visible to whoever inspects this thread afterwards.
            Thread.currentThread().interrupt();
            end();
            return null;
        }
        if (finish) {
            return null;
        }
        OutMsgElement next = msgQueue.remove(0);
        // A slot was freed: wake any producer blocked on a full queue.
        notifyAll();
        return next;
    }

    /**
     * Appends an element to the queue, waiting while the queue is full. If the
     * sender is finished the element is dropped with a warning, because nothing
     * will ever deliver it and waiting for space would never end. An interrupt
     * received while waiting does not abort the wait; it is re-asserted on the
     * calling thread before returning.
     */
    private synchronized void addElement(OutMsgElement ele) {
        boolean interrupted = false;
        try {
            while (msgQueue.size() >= MAXSEND && !finish) {
                try {
                    wait();
                } catch (InterruptedException ex) {
                    interrupted = true;
                    log.error(ex + "");
                }
            }
            if (finish) {
                log.warn("Dropping message of type " + ele.type + " for " + destNode + ": sender is closed");
                return;
            }
            msgQueue.add(ele);
            notifyAll();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** @return the round-trip time, in milliseconds, assumed before any ack has been measured */
    public static double getDefaultRTT() {
        return DEFAULTRTT;
    }

    /** @return the retransmission timeout, in milliseconds, derived from the default round-trip time */
    public static double getDefaultRTO() {
        return DEFAULTRTT * RTOFACTOR;
    }

    /** @return the current smoothed round-trip time in milliseconds */
    public double getRTT() {
        return rtt;
    }

    /** @return the current smoothed round-trip time multiplied by the RTO factor (not clamped) */
    public double getRTO() {
        return rtt * RTOFACTOR;
    }

    /** A written CONTENTS message together with the wall-clock time it was registered. */
    class UnackedMsg {
        private MsgSrcDestWrapper msg;
        private long rttStart = 0L;

        UnackedMsg(MsgSrcDestWrapper msg) {
            this.msg = msg;
            this.rttStart = System.currentTimeMillis();
        }

        /**
         * @return milliseconds elapsed since this entry was created; the start
         *         time is reset to 0 afterwards, so a second call returns the
         *         current time in milliseconds instead of a round-trip time
         */
        long calculateRTT() {
            long rttEnd = System.currentTimeMillis();
            long newRTT = rttEnd - this.rttStart;
            this.rttStart = 0L;
            return newRTT;
        }
    }

    /** One queued outgoing message: its type, optional payload and optional source address. */
    class OutMsgElement {
        public ConnMessageTypes type;
        public Object load;
        public DKSNetAddress src;

        public OutMsgElement(DKSNetAddress _src, ConnMessageTypes type, Object load) {
            this.src = _src;
            this.type = type;
            this.load = load;
        }
    }

    /**
     * Periodic task that expires unacknowledged messages. A message expires once
     * it has waited longer than {@code MINRTO} and longer than either
     * {@code RTOFACTOR * rtt} or {@code MAXRTO}.
     */
    private final class TimeoutDetector
    extends TimerTask {
        private TimeoutDetector() {
        }

        /**
         * Cancels the timer once the sender is finished and nothing is pending,
         * then removes every expired entry and reports it to the marshaller's
         * failure handler.
         */
        @Override
        public void run() {
            if (ConnHandlerOut.this.finish && ConnHandlerOut.this.unackedMsgs.isEmpty()) {
                ConnHandlerOut.this.rttTimer.cancel();
            }
            long now = System.currentTimeMillis();
            double avgRtt = ConnHandlerOut.this.rtt;
            List<UnackedMsg> expired = new LinkedList<UnackedMsg>();
            Map<Integer, UnackedMsg> pending = ConnHandlerOut.this.unackedMsgs;
            // Iterating a synchronizedMap view requires holding the map's own lock.
            synchronized (pending) {
                Iterator<UnackedMsg> it = pending.values().iterator();
                while (it.hasNext()) {
                    UnackedMsg candidate = it.next();
                    double elapsed = now - candidate.rttStart;
                    boolean timedOut = elapsed > MINRTO
                            && (elapsed > RTOFACTOR * avgRtt || elapsed > MAXRTO);
                    if (timedOut) {
                        it.remove();
                        expired.add(candidate);
                    }
                }
            }
            // Callbacks run outside the map lock so they cannot block ackReceived() or
            // deliverMessage(), and cannot deadlock by calling back into this sender.
            for (UnackedMsg lost : expired) {
                double elapsed = now - lost.rttStart;
                ConnHandlerOut.this.log.warn("Timeout after " + elapsed + "msec with avg rtt=" + avgRtt + " on socket " + ConnHandlerOut.this.destNode);
                try {
                    ConnHandlerOut.this.marshal.failureHandler(lost.msg);
                } catch (RuntimeException ex) {
                    // An exception escaping a TimerTask terminates the Timer thread,
                    // which would silently stop all further timeout detection.
                    ConnHandlerOut.this.log.error("Failure handler threw for " + ConnHandlerOut.this.destNode + "\n" + ex);
                }
            }
        }
    }
}

