package org.kth.dks.dks_comm;

import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.log4j.Logger;
import org.kth.dks.dks_exceptions.DKSRefNoResponse;
import org.kth.dks.dks_marshal.BootstrapMsg;
import org.kth.dks.dks_marshal.DKSMarshal;
import org.kth.dks.dks_marshal.MsgSrcDestWrapper;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/kth/dks/dks_comm/ConnHandlerOut.class */
/**
 * Outgoing half of a connection to a single remote node.
 *
 * <p>Messages handed to {@link #sendMessage(DKSNetAddress, Object)} and
 * {@link #sendAck(int)} are placed in a bounded queue and written to the
 * socket by this thread. Every {@code CONTENTS_MSG} is given a sequence
 * number and remembered until {@link #ackReceived(int)} is called for it; a
 * periodic timer task reports messages that stay unacknowledged for too long
 * to {@link DKSMarshal#failureHandler}.
 *
 * <p>Threading: producers may call the public methods from any thread. The
 * queue is guarded by this object's monitor; {@code msgNo} and
 * {@code outStream} are only used by the sender thread itself.
 */
public class ConnHandlerOut extends Thread {
    private static final Logger log = Logger.getLogger(ConnHandlerOut.class);

    /** Maximum number of queued outgoing elements before producers block. */
    private static final int MAXSEND = 25;
    /** Value passed to {@link Socket#setSoTimeout(int)}; 0 disables the read timeout. */
    private static final int SOCKETTIMEOUT = 0;

    private static final double DEFAULTRTT = 1000.0d;
    private static final double RTOFACTOR = 7.0d;
    private static final double MAXRTO = 20000.0d;
    private static final double MINRTO = 1000.0d;
    /** Weight of the previous average in the RTT moving average. */
    private static final double RTT_SMOOTHING = 0.8d;
    /** Interval, in milliseconds, between two runs of the timeout detector. */
    private static final long TIMEOUT_CHECK_PERIOD = 4000L;

    private final DKSNetAddress destNode;
    private final List<OutMsgElement> msgQueue =
            Collections.synchronizedList(new LinkedList<OutMsgElement>());
    private final Map<Integer, UnackedMsg> unackedMsgs =
            Collections.synchronizedMap(new HashMap<Integer, UnackedMsg>());
    /** Set once the sender must stop; volatile because several threads read it unlocked. */
    private volatile boolean finish = false;
    private final ConnectionHandler connHandler;
    private final DKSMarshal marshal;
    private Socket socket = null;
    DataOutputStream outStream = null;
    /** Smoothed round-trip time in milliseconds; written by the ack path, read by the timer. */
    private volatile double rtt = DEFAULTRTT;
    /** Sequence number of the last CONTENTS_MSG written; touched only by the sender thread. */
    private int msgNo = 0;
    private final Timer rttTimer;
    private final TimeoutDetector timeDetect = new TimeoutDetector();

    /** One queued outgoing element: its wire type, optional payload and source address. */
    public class OutMsgElement {
        public ConnMessageTypes type;
        public Object load;
        public DKSNetAddress src;

        public OutMsgElement(DKSNetAddress dKSNetAddress, ConnMessageTypes connMessageTypes, Object obj) {
            this.src = dKSNetAddress;
            this.type = connMessageTypes;
            this.load = obj;
        }
    }

    /**
     * Periodic task that removes unacknowledged messages whose age exceeds the
     * retransmission timeout and reports them to the marshal's failure handler.
     * Cancels the timer once the sender has finished and nothing is pending.
     */
    private final class TimeoutDetector extends TimerTask {
        private TimeoutDetector() {
        }

        @Override // java.util.TimerTask, java.lang.Runnable
        public void run() {
            List<MsgSrcDestWrapper> expired = new LinkedList<MsgSrcDestWrapper>();
            long now = System.currentTimeMillis();
            double currentRtt = ConnHandlerOut.this.rtt;

            // Iterating a synchronizedMap view requires holding the map's own lock.
            synchronized (ConnHandlerOut.this.unackedMsgs) {
                Iterator<UnackedMsg> it = ConnHandlerOut.this.unackedMsgs.values().iterator();
                while (it.hasNext()) {
                    UnackedMsg pending = it.next();
                    double elapsed = now - pending.rttStart;
                    boolean overRto = elapsed > RTOFACTOR * currentRtt || elapsed > MAXRTO;
                    if (elapsed > MINRTO && overRto) {
                        it.remove();
                        log.warn("Timeout after " + elapsed + "msec with avg rtt=" + currentRtt
                                + " on socket " + ConnHandlerOut.this.destNode);
                        expired.add(pending.msg);
                    }
                }
            }

            // Run the handlers outside the lock so ackReceived() is not blocked by
            // external code, and guard them: an uncaught exception in a TimerTask
            // terminates the whole Timer thread.
            for (MsgSrcDestWrapper failed : expired) {
                try {
                    ConnHandlerOut.this.marshal.failureHandler(failed);
                } catch (RuntimeException e) {
                    log.error("Failure handler threw for message to " + ConnHandlerOut.this.destNode, e);
                }
            }

            if (ConnHandlerOut.this.finish && ConnHandlerOut.this.unackedMsgs.isEmpty()) {
                ConnHandlerOut.this.rttTimer.cancel();
            }
        }
    }

    /** A sent CONTENTS_MSG awaiting its ack, together with the time it was registered. */
    public class UnackedMsg {
        private MsgSrcDestWrapper msg;
        private long rttStart;

        UnackedMsg(MsgSrcDestWrapper msgSrcDestWrapper) {
            this.msg = msgSrcDestWrapper;
            this.rttStart = System.currentTimeMillis();
        }

        /**
         * Returns the milliseconds elapsed since this message was registered and
         * resets the start time to 0.
         */
        long calculateRTT() {
            long elapsed = System.currentTimeMillis() - this.rttStart;
            this.rttStart = 0L;
            return elapsed;
        }
    }

    /**
     * Opens a new connection to {@code dKSNetAddress2}, queues a
     * {@code PRESENT_MSG} carrying {@code dKSNetAddress} as source, and starts
     * the sender thread and the timeout timer.
     *
     * @param connectionHandler owner of this sender; supplies the marshal and byte statistics
     * @param dKSNetAddress     source address announced in the initial PRESENT_MSG
     * @param dKSNetAddress2    remote node to connect to
     * @throws DKSRefNoResponse if the socket to the remote node cannot be set up
     */
    public ConnHandlerOut(ConnectionHandler connectionHandler, DKSNetAddress dKSNetAddress, DKSNetAddress dKSNetAddress2) throws DKSRefNoResponse {
        log.info("Creating new sender -- connecting " + dKSNetAddress2);
        this.connHandler = connectionHandler;
        this.marshal = connectionHandler.getDKSMarshal();
        this.destNode = dKSNetAddress2;
        try {
            establishConnection();
        } catch (IOException e) {
            // The thrown exception only carries the address, so keep the cause in the log.
            log.warn("Couldn't create connection to " + dKSNetAddress2 + "\n" + e);
            throw new DKSRefNoResponse(dKSNetAddress2);
        }
        addElement(new OutMsgElement(dKSNetAddress, ConnMessageTypes.PRESENT_MSG, null));
        // Create the timer only after the connection exists (no stray timer thread
        // on failure) and before start(), so no thread sees an unassigned final field.
        this.rttTimer = new Timer(ConnHandlerOut.class.getName() + ".RTTTimer");
        launch(dKSNetAddress2);
    }

    /**
     * Wraps an already accepted socket and starts the sender thread and the
     * timeout timer. If the output stream cannot be obtained the error is
     * logged and the sender stops on its first delivery attempt.
     *
     * @param connectionHandler owner of this sender; supplies the marshal and byte statistics
     * @param dKSNetAddress     address of the remote node
     * @param socket            accepted socket to write to
     */
    public ConnHandlerOut(ConnectionHandler connectionHandler, DKSNetAddress dKSNetAddress, Socket socket) {
        log.info("Creating new sender -- accepting " + dKSNetAddress);
        this.connHandler = connectionHandler;
        // Must be initialised here too: delivery and timeout handling both use it.
        this.marshal = connectionHandler.getDKSMarshal();
        this.destNode = dKSNetAddress;
        this.socket = socket;
        try {
            acceptConnection();
        } catch (IOException e) {
            log.error("Couldn't create connection to " + dKSNetAddress + "\n" + e);
        }
        this.rttTimer = new Timer(ConnHandlerOut.class.getName() + ".RTTTimer");
        launch(dKSNetAddress);
    }

    /** Names and starts the sender thread, then schedules the timeout detector. */
    private void launch(DKSNetAddress peer) {
        setName(ConnHandlerOut.class.getName() + "(" + peer + ")");
        start();
        this.rttTimer.schedule(this.timeDetect, 0L, TIMEOUT_CHECK_PERIOD);
    }

    /** Returns the socket this sender writes to. */
    public Socket getSocket() {
        return this.socket;
    }

    /**
     * Queues a CONTENTS_MSG for delivery. Blocks while the queue is full.
     *
     * @param dKSNetAddress source address stored with the queued element
     * @param obj           payload; delivery casts it to {@code MsgSrcDestWrapper}
     */
    public void sendMessage(DKSNetAddress dKSNetAddress, Object obj) {
        log.info("Sending msg  to " + this.destNode);
        addElement(new OutMsgElement(dKSNetAddress, ConnMessageTypes.CONTENTS_MSG, obj));
    }

    /**
     * Queues an ACK_MSG for the given sequence number. Blocks while the queue is full.
     *
     * @param i sequence number being acknowledged
     */
    public void sendAck(int i) {
        log.info("Sending ack " + i + " to " + this.destNode);
        addElement(new OutMsgElement(null, ConnMessageTypes.ACK_MSG, Integer.valueOf(i)));
    }

    /**
     * Records that the remote node acknowledged message {@code i}: removes it
     * from the pending set and folds its round-trip time into the moving average.
     *
     * @param i sequence number that was acknowledged
     */
    public void ackReceived(int i) {
        log.info("Received ack " + i + " from " + this.destNode);
        UnackedMsg acked = this.unackedMsgs.remove(Integer.valueOf(i));
        if (acked == null) {
            log.warn("Received an ack for a message that does not exists :" + i);
            return;
        }
        long sample = acked.calculateRTT();
        if (sample > 0) {
            this.rtt = (this.rtt * RTT_SMOOTHING) + (sample * (1.0d - RTT_SMOOTHING));
            log.debug("AVG=" + this.rtt + " NEW=" + sample);
        }
    }

    /** Returns the number of sent messages still waiting for an ack. */
    public int getUnackedMsgs() {
        return this.unackedMsgs.size();
    }

    /** Asks the sender thread to stop and wakes every thread waiting on the queue. */
    public synchronized void end() {
        this.finish = true;
        notifyAll();
    }

    /** Sender loop: delivers queued elements until {@link #end()} is called or a write fails. */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        log.info("Creating sender " + this.destNode);
        while (!this.finish) {
            deliverMessage();
        }
        log.info("Closing ConnHandlerOut to node " + this.destNode);
    }

    /**
     * Takes the next queued element and writes it to the socket. Called only
     * from the sender thread, and deliberately not synchronized so that a
     * blocking write does not hold the monitor producers and {@link #end()} need.
     * Any exception during the write stops the sender.
     */
    private void deliverMessage() {
        OutMsgElement next = getNextElement();
        if (next == null) {
            return;
        }
        log.info("Delivering message of type " + next.type + " to " + this.destNode);
        try {
            if (next.type == ConnMessageTypes.CONTENTS_MSG) {
                // Register before writing so a failed or unanswered send is still
                // picked up by the timeout detector.
                this.msgNo++;
                this.unackedMsgs.put(Integer.valueOf(this.msgNo), new UnackedMsg((MsgSrcDestWrapper) next.load));
            }
            this.outStream.writeByte(next.type.toByte());
            if (next.type == ConnMessageTypes.CONTENTS_MSG) {
                MsgSrcDestWrapper payload = (MsgSrcDestWrapper) next.load;
                // NOTE(review): meaning of this constant byte is not visible in this file.
                this.outStream.writeByte(1);
                this.outStream.writeInt(this.msgNo);
                byte[] encoded = this.marshal.marshalMsgSrcDestWrapper(payload);
                ConnMessageMiscs.m_writeDataBlock(encoded, this.outStream);
                this.connHandler.statAddBytesSent(encoded.length);
                log.info("msg " + this.msgNo + " is sent");
            } else if (next.type == ConnMessageTypes.PRESENT_MSG) {
                ConnMessageMiscs.m_writeDataBlock(new BootstrapMsg(next.src).flatten(), this.outStream);
            } else if (next.type == ConnMessageTypes.ACK_MSG) {
                this.outStream.writeInt(((Integer) next.load).intValue());
            }
            this.outStream.flush();
        } catch (Exception e) {
            log.info("Closing ConnHandlerOut " + this.destNode + "\n" + e);
            // end() also wakes producers blocked on a full queue so they do not hang.
            end();
        } catch (StackOverflowError e) {
            log.error("Stack Overflow:" + e);
        }
    }

    /** Configures the already accepted socket and opens its output stream. */
    private void acceptConnection() throws IOException {
        this.socket.setSoTimeout(SOCKETTIMEOUT);
        this.outStream = new DataOutputStream(this.socket.getOutputStream());
    }

    /**
     * Connects to the destination node and opens the output stream. The socket
     * is closed again if it cannot be fully set up.
     */
    private void establishConnection() throws IOException {
        Socket created = new Socket(this.destNode.getIP(), this.destNode.getPort());
        try {
            created.setSoTimeout(SOCKETTIMEOUT);
            this.outStream = new DataOutputStream(created.getOutputStream());
        } catch (IOException e) {
            try {
                created.close();
            } catch (IOException ignored) {
                // Best-effort cleanup; the original failure is the one reported.
            }
            throw e;
        }
        this.socket = created;
    }

    /**
     * Blocks until an element is queued or the sender is finished.
     *
     * @return the oldest queued element, or {@code null} if the sender is
     *         finished or the wait was interrupted
     */
    private synchronized OutMsgElement getNextElement() {
        while (this.msgQueue.isEmpty() && !this.finish) {
            try {
                wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                end();
                return null;
            }
        }
        if (this.finish) {
            return null;
        }
        OutMsgElement next = this.msgQueue.remove(0);
        // notifyAll, not notify: producers and the sender share this monitor, and
        // a single notify can be consumed by a thread whose condition is not met.
        notifyAll();
        return next;
    }

    /**
     * Appends an element to the queue, blocking while it holds
     * {@code MAXSEND} elements. The element is dropped if the sender has
     * finished, since nothing would ever deliver it. An interrupt received
     * while waiting does not abort the call; it is re-asserted on return.
     */
    private synchronized void addElement(OutMsgElement outMsgElement) {
        boolean interrupted = false;
        try {
            while (this.msgQueue.size() >= MAXSEND && !this.finish) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    interrupted = true;
                    log.error(e + "");
                }
            }
            if (this.finish) {
                log.debug("Sender to " + this.destNode + " is finished; dropping queued element");
                return;
            }
            this.msgQueue.add(outMsgElement);
            notifyAll();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Returns the initial round-trip-time estimate in milliseconds. */
    public static double getDefaultRTT() {
        return DEFAULTRTT;
    }

    /** Returns the retransmission timeout derived from the default RTT, in milliseconds. */
    public static double getDefaultRTO() {
        return DEFAULTRTT * RTOFACTOR;
    }

    /** Returns the current smoothed round-trip time in milliseconds. */
    public double getRTT() {
        return this.rtt;
    }

    /** Returns the current retransmission timeout ({@code RTOFACTOR} times the RTT), in milliseconds. */
    public double getRTO() {
        return this.rtt * RTOFACTOR;
    }
}
