/*
 * Decompiled with CFR 0.152.
 */
package de.huxhorn.lilith.engine.impl.eventproducer;

import de.huxhorn.lilith.data.eventsource.EventWrapper;
import de.huxhorn.lilith.data.eventsource.SourceIdentifier;
import de.huxhorn.lilith.engine.impl.eventproducer.AbstractEventProducer;
import de.huxhorn.lilith.engine.impl.eventproducer.SourceIdentifierUpdater;
import de.huxhorn.sulky.buffers.AppendOperation;
import de.huxhorn.sulky.codec.Decoder;
import de.huxhorn.sulky.io.IOUtilities;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
/**
 * Event producer that reads length-prefixed messages from an {@link InputStream}.
 *
 * <p>Each message on the wire is a 4-byte {@code int} size (read via
 * {@link DataInputStream#readInt()}) followed by that many payload bytes. The payload
 * is handed to the {@link Decoder} supplied by {@link #createDecoder()} and the decoded
 * object is forwarded through {@code addEvent}. A size that is not positive carries no
 * payload and is logged as a heartbeat.</p>
 *
 * <p>{@link #start()} launches two non-daemon threads: a receiver that reads messages
 * until any {@link Throwable} ends the stream, and a heartbeat observer that closes this
 * producer once no size header has been read for longer than
 * {@value #HEARTBEAT_TIMEOUT_MILLIS} milliseconds.</p>
 *
 * @param <T> the type of the decoded events
 */
public abstract class AbstractMessageBasedEventProducer<T extends Serializable>
extends AbstractEventProducer<T> {
    /** MDC key under which the receiver thread publishes its source identifier. */
    private static final String SOURCE_IDENTIFIER_MDC_KEY = "sourceIdentifier";

    /** Pause between two consecutive heartbeat checks of the observer thread. */
    private static final long HEARTBEAT_CHECK_INTERVAL_MILLIS = 45000L;

    /** Maximum tolerated age of the last heartbeat before the producer is closed. */
    private static final long HEARTBEAT_TIMEOUT_MILLIS = 90000L;

    private final Logger logger = LoggerFactory.getLogger(AbstractMessageBasedEventProducer.class);
    private final DataInputStream dataInput;
    private final Decoder<T> decoder;
    private final boolean compressing;
    private final AtomicLong heartbeatTimestamp;

    /**
     * Creates a producer reading from the given stream.
     *
     * <p>The stream is wrapped in a {@link BufferedInputStream}. {@code compressing} is
     * stored before {@link #createDecoder()} is invoked, so implementations may consult
     * {@link #isCompressing()} there. Note that {@code createDecoder()} runs while the
     * subclass constructor has not executed yet, so it must not rely on subclass fields.</p>
     *
     * @param sourceIdentifier        passed through to the superclass
     * @param eventQueue              passed through to the superclass
     * @param sourceIdentifierUpdater passed through to the superclass
     * @param inputStream             the stream the messages are read from
     * @param compressing             value reported by {@link #isCompressing()}
     */
    public AbstractMessageBasedEventProducer(SourceIdentifier sourceIdentifier, AppendOperation<EventWrapper<T>> eventQueue, SourceIdentifierUpdater<T> sourceIdentifierUpdater, InputStream inputStream, boolean compressing) {
        super(sourceIdentifier, eventQueue, sourceIdentifierUpdater);
        this.dataInput = new DataInputStream(new BufferedInputStream(inputStream));
        this.compressing = compressing;
        this.decoder = this.createDecoder();
        this.heartbeatTimestamp = new AtomicLong();
    }

    /**
     * Creates the decoder used to turn a message payload into an event.
     * Called exactly once, from the constructor.
     *
     * @return the decoder for the payload bytes
     */
    protected abstract Decoder<T> createDecoder();

    /**
     * Records an initial heartbeat and starts the receiver and the heartbeat observer,
     * each on its own non-daemon thread named after the source identifier.
     */
    @Override
    public void start() {
        this.updateHeartbeatTimestamp();

        Thread receiver = new Thread(new ReceiverRunnable(this.getSourceIdentifier()), "" + this.getSourceIdentifier() + "-Receiver");
        receiver.setDaemon(false);
        receiver.start();

        Thread observer = new Thread(new HeartbeatObserverRunnable(), "" + this.getSourceIdentifier() + "-HeartbeatObserver");
        observer.setDaemon(false);
        observer.start();
    }

    /** Sets the heartbeat timestamp to the current wall-clock time. */
    protected void updateHeartbeatTimestamp() {
        this.heartbeatTimestamp.set(System.currentTimeMillis());
    }

    /**
     * @return the wall-clock time, in milliseconds, of the most recent heartbeat update
     */
    protected long getHeartbeatTimestamp() {
        return this.heartbeatTimestamp.get();
    }

    /**
     * @return the {@code compressing} flag given to the constructor
     */
    public boolean isCompressing() {
        return this.compressing;
    }

    /**
     * Closes the underlying input stream. A receiver blocked on that stream is expected
     * to fail its pending read and terminate as a consequence.
     */
    @Override
    public void close() {
        if (this.logger.isInfoEnabled()) {
            this.logger.info("Closing {}.", this.getClass().getName());
        }
        IOUtilities.closeQuietly(this.dataInput);
    }

    /**
     * Reads messages from the stream until reading fails, forwarding every decoded
     * event to the enclosing producer.
     */
    private class ReceiverRunnable
    implements Runnable {
        private final SourceIdentifier sourceIdentifier;

        public ReceiverRunnable(SourceIdentifier sourceIdentifier) {
            this.sourceIdentifier = sourceIdentifier;
        }

        /**
         * Receive loop. It only ends when something other than an
         * {@link OutOfMemoryError} inside message handling is thrown; in that case an
         * empty ({@code null}) event is added and the thread terminates.
         */
        public void run() {
            MDC.put(SOURCE_IDENTIFIER_MDC_KEY, this.sourceIdentifier.toString());
            try {
                for (;;) {
                    // Distinguishes an OutOfMemoryError raised by the buffer allocation
                    // (payload still unread on the stream) from one raised afterwards.
                    boolean allocating = true;
                    int size = 0;
                    try {
                        size = AbstractMessageBasedEventProducer.this.dataInput.readInt();
                        // Every successfully read size header counts as a sign of life.
                        AbstractMessageBasedEventProducer.this.updateHeartbeatTimestamp();
                        if (size > 0) {
                            byte[] bytes = new byte[size];
                            allocating = false;
                            AbstractMessageBasedEventProducer.this.dataInput.readFully(bytes);
                            T object = AbstractMessageBasedEventProducer.this.decoder.decode(bytes);
                            if (object != null) {
                                AbstractMessageBasedEventProducer.this.addEvent(object);
                            } else if (AbstractMessageBasedEventProducer.this.logger.isInfoEnabled()) {
                                AbstractMessageBasedEventProducer.this.logger.info("Retrieved null!");
                            }
                        } else if (AbstractMessageBasedEventProducer.this.logger.isDebugEnabled()) {
                            AbstractMessageBasedEventProducer.this.logger.debug("Received heartbeat from {}.", AbstractMessageBasedEventProducer.this.getSourceIdentifier());
                        }
                    }
                    catch (OutOfMemoryError ex) {
                        if (allocating) {
                            if (AbstractMessageBasedEventProducer.this.logger.isWarnEnabled()) {
                                AbstractMessageBasedEventProducer.this.logger.warn("Out of memory while trying to allocate {} bytes! Skipping them instead...", size);
                            }
                            // The payload was never consumed; skip it so the next
                            // readInt() starts at a message boundary again.
                            this.skipBytes(size, AbstractMessageBasedEventProducer.this.dataInput);
                        } else if (AbstractMessageBasedEventProducer.this.logger.isWarnEnabled()) {
                            AbstractMessageBasedEventProducer.this.logger.warn("Out of memory while deserializing from {} bytes!", size);
                        }
                    }
                }
            }
            catch (Throwable e) {
                if (AbstractMessageBasedEventProducer.this.logger.isInfoEnabled()) {
                    AbstractMessageBasedEventProducer.this.logger.info("Exception ({}: '{}') while reading events. Adding eventWrapper with empty event and stopping...", e.getClass().getName(), e.getMessage());
                }
                AbstractMessageBasedEventProducer.this.addEvent(null);
                // NOTE(review): presumably restores the interrupt flag for
                // interruption-related throwables - confirm against IOUtilities.
                IOUtilities.interruptIfNecessary(e);
            }
            finally {
                // Clean up the MDC even if the shutdown handling above fails itself.
                MDC.remove(SOURCE_IDENTIFIER_MDC_KEY);
            }
        }

        /**
         * Skips exactly {@code numberOfBytes} bytes of {@code input}.
         *
         * <p>{@link InputStream#skip(long)} is allowed to skip fewer bytes than
         * requested, including none at all. When it makes no progress a single byte is
         * read instead, which either advances the stream or reveals its end; this
         * prevents spinning forever on an exhausted stream.</p>
         *
         * @param numberOfBytes the number of bytes to skip; values {@code <= 0} skip nothing
         * @param input         the stream to skip on
         * @throws IOException if the stream ends before all bytes were skipped, if
         *                     {@code skip} reports a negative count, or if the stream
         *                     itself fails
         */
        public void skipBytes(long numberOfBytes, InputStream input) throws IOException {
            long remaining = numberOfBytes;
            while (remaining > 0L) {
                long skipped = input.skip(remaining);
                if (skipped < 0L) {
                    throw new IOException("Negative skipped bytes value while trying to skip " + numberOfBytes + " bytes!");
                }
                if (skipped == 0L) {
                    if (input.read() < 0) {
                        throw new IOException("Unexpected end of stream while trying to skip " + numberOfBytes + " bytes!");
                    }
                    skipped = 1L;
                }
                remaining -= skipped;
            }
        }
    }

    /**
     * Periodically compares the last heartbeat with the current time and closes the
     * enclosing producer once the heartbeat is overdue or the thread is interrupted.
     */
    private class HeartbeatObserverRunnable
    implements Runnable {
        private HeartbeatObserverRunnable() {
        }

        public void run() {
            try {
                long heartbeat;
                do {
                    Thread.sleep(HEARTBEAT_CHECK_INTERVAL_MILLIS);
                    heartbeat = AbstractMessageBasedEventProducer.this.getHeartbeatTimestamp();
                } while (System.currentTimeMillis() - heartbeat <= HEARTBEAT_TIMEOUT_MILLIS);
                if (AbstractMessageBasedEventProducer.this.logger.isInfoEnabled()) {
                    AbstractMessageBasedEventProducer.this.logger.info("Closing receiver because heartbeat of {} was missing.", AbstractMessageBasedEventProducer.this.getSourceIdentifier());
                }
                AbstractMessageBasedEventProducer.this.close();
            }
            catch (InterruptedException e) {
                if (AbstractMessageBasedEventProducer.this.logger.isInfoEnabled()) {
                    AbstractMessageBasedEventProducer.this.logger.info("Interrupted...", e);
                }
                AbstractMessageBasedEventProducer.this.close();
                // Keep the interruption visible to whoever owns this thread.
                Thread.currentThread().interrupt();
            }
        }
    }
}

