/*
 * Decompiled with CFR 0.152.
 */
package org.mpisws.p2p.transport.direct;

import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.Map;
import org.mpisws.p2p.transport.ErrorHandler;
import org.mpisws.p2p.transport.MessageCallback;
import org.mpisws.p2p.transport.MessageRequestHandle;
import org.mpisws.p2p.transport.P2PSocket;
import org.mpisws.p2p.transport.SocketCallback;
import org.mpisws.p2p.transport.SocketRequestHandle;
import org.mpisws.p2p.transport.TransportLayer;
import org.mpisws.p2p.transport.TransportLayerCallback;
import org.mpisws.p2p.transport.direct.ConnectorExceptionDelivery;
import org.mpisws.p2p.transport.direct.DirectAppSocket;
import org.mpisws.p2p.transport.direct.GenericNetworkSimulator;
import org.mpisws.p2p.transport.exception.NodeIsFaultyException;
import org.mpisws.p2p.transport.liveness.LivenessProvider;
import org.mpisws.p2p.transport.util.MessageRequestHandleImpl;
import org.mpisws.p2p.transport.util.SocketRequestHandleImpl;
import rice.environment.Environment;
import rice.environment.logging.Logger;
import rice.p2p.commonapi.Cancellable;
import rice.p2p.commonapi.CancellableTask;
import rice.pastry.direct.NetworkSimulator;
import rice.pastry.direct.NodeRecord;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class DirectTransportLayer<Identifier, MessageType>
implements TransportLayer<Identifier, MessageType> {
    protected boolean acceptMessages = true;
    protected boolean acceptSockets = true;
    protected Identifier localIdentifier;
    protected TransportLayerCallback<Identifier, MessageType> callback;
    protected GenericNetworkSimulator<Identifier, MessageType> simulator;
    protected ErrorHandler<Identifier> errorHandler;
    protected LivenessProvider<Identifier> livenessProvider;
    protected Environment environment;
    protected Logger logger;
    int seq = Integer.MIN_VALUE;

    public DirectTransportLayer(Identifier local, NetworkSimulator<Identifier, MessageType> simulator, NodeRecord nr, Environment env) {
        this.localIdentifier = local;
        this.simulator = simulator.getGenericSimulator();
        this.livenessProvider = simulator.getLivenessProvider();
        this.environment = env;
        this.logger = this.environment.getLogManager().getLogger(DirectTransportLayer.class, null);
        simulator.registerNode(local, this, nr);
    }

    @Override
    public void acceptMessages(boolean b) {
        this.acceptMessages = b;
    }

    @Override
    public void acceptSockets(boolean b) {
        this.acceptSockets = b;
    }

    @Override
    public Identifier getLocalIdentifier() {
        return this.localIdentifier;
    }

    @Override
    public SocketRequestHandle<Identifier> openSocket(Identifier i, SocketCallback<Identifier> deliverSocketToMe, Map<String, Object> options) {
        SocketRequestHandleImpl<Identifier> handle = new SocketRequestHandleImpl<Identifier>(i, options, this.logger);
        if (this.simulator.isAlive(i)) {
            int delay = Math.round(this.simulator.networkDelay(this.localIdentifier, i));
            DirectAppSocket<Identifier, MessageType> socket = new DirectAppSocket<Identifier, MessageType>(i, this.localIdentifier, deliverSocketToMe, this.simulator, handle, options);
            CancelAndClose<Identifier, MessageType> cancelAndClose = new CancelAndClose<Identifier, MessageType>(socket, this.simulator.enqueueDelivery(socket.getAcceptorDelivery(), delay));
            handle.setSubCancellable(cancelAndClose);
        } else {
            int delay = 5000;
            handle.setSubCancellable(this.simulator.enqueueDelivery(new ConnectorExceptionDelivery<Identifier>(deliverSocketToMe, handle, new SocketTimeoutException()), delay));
        }
        return handle;
    }

    @Override
    public MessageRequestHandle<Identifier, MessageType> sendMessage(Identifier i, MessageType m, MessageCallback<Identifier, MessageType> deliverAckToMe, Map<String, Object> options) {
        if (!this.simulator.isAlive(this.localIdentifier)) {
            return null;
        }
        MessageRequestHandleImpl<Identifier, MessageType> handle = new MessageRequestHandleImpl<Identifier, MessageType>(i, m, options);
        if (this.livenessProvider.getLiveness(i, null) >= 3) {
            if (this.logger.level <= 500) {
                this.logger.log("Attempt to send message " + m + " to a dead node " + i + "!");
            }
            if (deliverAckToMe != null) {
                deliverAckToMe.sendFailed(handle, new NodeIsFaultyException(i));
            }
        } else if (this.simulator.isAlive(i)) {
            int delay = Math.round(this.simulator.networkDelay(this.localIdentifier, i));
            handle.setSubCancellable(this.simulator.deliverMessage(m, i, this.localIdentifier, delay));
            if (deliverAckToMe != null) {
                deliverAckToMe.ack(handle);
            }
        }
        return handle;
    }

    @Override
    public void setCallback(TransportLayerCallback<Identifier, MessageType> callback) {
        this.callback = callback;
    }

    @Override
    public void setErrorHandler(ErrorHandler<Identifier> handler) {
        this.errorHandler = handler;
    }

    @Override
    public void destroy() {
        this.simulator.remove(this.getLocalIdentifier());
    }

    public boolean canReceiveSocket() {
        return this.acceptSockets;
    }

    public void finishReceiveSocket(P2PSocket<Identifier> acceptorEndpoint) {
        block2: {
            try {
                this.callback.incomingSocket(acceptorEndpoint);
            }
            catch (IOException ioe) {
                if (this.logger.level > 900) break block2;
                this.logger.logException("Exception in " + this.callback, ioe);
            }
        }
    }

    public Logger getLogger() {
        return this.logger;
    }

    public synchronized int getNextSeq() {
        return this.seq++;
    }

    public void incomingMessage(Identifier i, MessageType m, Map<String, Object> options) throws IOException {
        this.callback.messageReceived(i, m, options);
    }

    public Environment getEnvironment() {
        return this.environment;
    }

    public void clearState(Identifier i) {
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    static class CancelAndClose<Identifier, MessageType>
    implements Cancellable {
        DirectAppSocket<Identifier, MessageType> closeMe;
        Cancellable cancelMe;

        public CancelAndClose(DirectAppSocket<Identifier, MessageType> socket, CancellableTask task) {
            this.closeMe = socket;
            this.cancelMe = task;
        }

        @Override
        public boolean cancel() {
            this.closeMe.connectorEndpoint.close();
            return this.cancelMe.cancel();
        }
    }
}

