/*
 * Decompiled with CFR 0.152.
 */
package de.huxhorn.lilith.engine.impl.eventproducer;

import de.huxhorn.lilith.data.eventsource.EventWrapper;
import de.huxhorn.lilith.data.eventsource.SourceIdentifier;
import de.huxhorn.lilith.engine.impl.eventproducer.AbstractEventProducer;
import de.huxhorn.lilith.engine.impl.eventproducer.SourceIdentifierUpdater;
import de.huxhorn.sulky.buffers.AppendOperation;
import de.huxhorn.sulky.io.IOUtilities;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.Serializable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public abstract class AbstractStreamEventProducer<T extends Serializable>
extends AbstractEventProducer<T> {
    final Logger logger = LoggerFactory.getLogger(AbstractStreamEventProducer.class);
    private ObjectInputStream dataInput;

    public AbstractStreamEventProducer(SourceIdentifier sourceIdentifier, AppendOperation<EventWrapper<T>> eventQueue, SourceIdentifierUpdater<T> sourceIdentifierUpdater, InputStream inputStream) throws IOException {
        super(sourceIdentifier, eventQueue, sourceIdentifierUpdater);
        this.dataInput = new ObjectInputStream(new BufferedInputStream(inputStream));
    }

    @Override
    public void start() {
        Thread t = new Thread((Runnable)new ReceiverRunnable(), "" + this.getSourceIdentifier() + "-Receiver");
        t.setDaemon(false);
        t.start();
    }

    protected abstract T postProcessEvent(Object var1);

    @Override
    public void close() {
        IOUtilities.closeQuietly((InputStream)this.dataInput);
    }

    private class ReceiverRunnable
    implements Runnable {
        private ReceiverRunnable() {
        }

        public void run() {
            try {
                while (true) {
                    Object object = AbstractStreamEventProducer.this.dataInput.readObject();
                    Object event = AbstractStreamEventProducer.this.postProcessEvent(object);
                    if (object == null) {
                        if (!AbstractStreamEventProducer.this.logger.isInfoEnabled()) continue;
                        AbstractStreamEventProducer.this.logger.info("Retrieved null!");
                        continue;
                    }
                    AbstractStreamEventProducer.this.addEvent(event);
                }
            }
            catch (Throwable e) {
                if (AbstractStreamEventProducer.this.logger.isInfoEnabled()) {
                    AbstractStreamEventProducer.this.logger.info("Exception ({}: '{}') while reading events. Adding eventWrapper with empty event and stopping...", (Object)e.getClass().getName(), (Object)e.getMessage());
                }
                if (AbstractStreamEventProducer.this.logger.isDebugEnabled()) {
                    AbstractStreamEventProducer.this.logger.debug("Exception ({}: '{}') while reading events. Adding eventWrapper with empty event and stopping...", new Object[]{e.getClass().getName(), e.getMessage(), e});
                }
                AbstractStreamEventProducer.this.addEvent(null);
                IOUtilities.interruptIfNecessary((Throwable)e);
                return;
            }
        }
    }
}

