/*
 * Decompiled with CFR 0.152.
 */
package com.orientechnologies.orient.etl;

import com.orientechnologies.common.io.OIOUtils;
import com.orientechnologies.orient.core.OConstants;
import com.orientechnologies.orient.core.Orient;
import com.orientechnologies.orient.core.command.OBasicCommandContext;
import com.orientechnologies.orient.core.command.OCommandContext;
import com.orientechnologies.orient.core.exception.OConfigurationException;
import com.orientechnologies.orient.core.record.impl.ODocument;
import com.orientechnologies.orient.etl.OETLComponent;
import com.orientechnologies.orient.etl.OETLComponentFactory;
import com.orientechnologies.orient.etl.OETLPipeline;
import com.orientechnologies.orient.etl.OETLProcessHaltedException;
import com.orientechnologies.orient.etl.OExtractedItem;
import com.orientechnologies.orient.etl.block.OBlock;
import com.orientechnologies.orient.etl.extractor.OExtractor;
import com.orientechnologies.orient.etl.loader.OLoader;
import com.orientechnologies.orient.etl.source.OSource;
import com.orientechnologies.orient.etl.transformer.OTransformer;
import com.tinkerpop.blueprints.impls.orient.OrientEdge;
import com.tinkerpop.blueprints.impls.orient.OrientVertex;
import java.io.File;
import java.io.IOException;
import java.io.Reader;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.TimerTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class OETLProcessor {
    /** Factory used by {@code parse(...)} to look up blocks, sources, extractors, transformers and loaders by name. */
    protected final OETLComponentFactory factory = new OETLComponentFactory();
    /** Blocks run by {@code begin()} before extraction starts (also executed once while parsing the configuration). */
    protected List<OBlock> beginBlocks;
    /** Blocks run by {@code end()} after the loader has been closed. */
    protected List<OBlock> endBlocks;
    /** Provides the {@link Reader} handed to the extractor; may be {@code null}. */
    protected OSource source;
    /** Produces the items that are pushed through the pipeline. */
    protected OExtractor extractor;
    /** Final stage of the pipeline. */
    protected OLoader loader;
    /** Transformers applied, in order, between extractor and loader. */
    protected List<OTransformer> transformers;
    /** Command context holding the settings read by {@code init()} and {@code begin()}. */
    protected OCommandContext context;
    /** Wall-clock start of the run in milliseconds, set by {@code begin()}. */
    protected long startTime;
    /** Duration of the run in milliseconds, computed by {@code end()}. */
    protected long elapsed;
    /** Progress snapshot and warning/error counters. */
    protected OETLProcessorStats stats = new OETLProcessorStats();
    /** Periodic task that calls {@code dumpProgress()}; scheduled in {@code begin()} and cancelled in {@code end()}. */
    protected TimerTask dumpTask;
    /** Verbosity threshold used by {@code out(...)}; overridden by the "log" context variable. */
    protected LOG_LEVELS logLevel = LOG_LEVELS.INFO;
    /** Passed to every pipeline; overridden by the "haltOnError" context variable. */
    protected boolean haltOnError = true;
    /** Selects parallel execution in {@code execute()}; overridden by the "parallel" context variable. */
    protected boolean parallel = false;
    /** Passed to every pipeline; overridden by the "maxRetries" context variable in {@code begin()}. */
    protected int maxRetries = 10;
    /** Worker thread slots, allocated by {@code init()} when parallel execution is enabled. */
    private Thread[] threads;

    /**
     * Creates a processor from already-built components and applies the context-driven
     * settings through {@link #init()}.
     *
     * @param iBeginBlocks  blocks to run before the extraction
     * @param iSource       source feeding the extractor
     * @param iExtractor    extractor producing the items
     * @param iTransformers transformers applied to every item
     * @param iLoader       loader receiving the transformed items
     * @param iEndBlocks    blocks to run after the load
     * @param iContext      command context with the settings; when {@code null} a default
     *                      context is created
     */
    public OETLProcessor(List<OBlock> iBeginBlocks, OSource iSource, OExtractor iExtractor, List<OTransformer> iTransformers, OLoader iLoader, List<OBlock> iEndBlocks, OCommandContext iContext) {
        this.beginBlocks = iBeginBlocks;
        this.source = iSource;
        this.extractor = iExtractor;
        this.transformers = iTransformers;
        this.loader = iLoader;
        this.endBlocks = iEndBlocks;
        // init() dereferences the context, so a null argument used to fail with a
        // NullPointerException; fall back to the defaults the same way parse(...) does.
        this.context = iContext != null ? iContext : OETLProcessor.createDefaultContext();
        this.init();
    }

    /**
     * Creates an unconfigured processor. Components and context are left unset; they are
     * assigned by the {@code parse(...)} methods.
     */
    public OETLProcessor() {
    }

    /**
     * Command line entry point: prints the banner, requires at least one argument, then
     * builds a processor from the arguments and runs it.
     *
     * @param args configuration file paths and {@code -name=value} overrides
     */
    public static void main(String[] args) {
        final String banner = "OrientDB etl v." + OConstants.getVersion() + " " + "www.orientdb.com";
        System.out.println(banner);

        if (args.length < 1) {
            System.out.println("Syntax error, missing configuration file.");
            System.out.println("Use: oetl.sh <json-file>");
            System.exit(1);
        }

        OETLProcessor.parseConfigAndParameters(args).execute();
    }

    /**
     * Builds a processor from command line arguments.
     *
     * <p>Arguments that do not start with {@code -} are JSON configuration files; they are
     * merged in order and the fields of their {@code config} section are copied into the
     * context. Arguments of the form {@code -name=value} are applied afterwards, so they
     * override values coming from the files. Empty or {@code null} arguments are ignored.
     *
     * @param args command line arguments
     * @return the configured processor
     * @throws OConfigurationException if a configuration file cannot be read or an option
     *                                 is not of the form {@code -name=value}
     */
    protected static OETLProcessor parseConfigAndParameters(String[] args) {
        final OCommandContext context = OETLProcessor.createDefaultContext();
        final ODocument configuration = new ODocument().fromJSON("{}");

        // First pass: configuration files.
        for (String arg : args) {
            // An empty argument used to fail on charAt(0).
            if (arg == null || arg.isEmpty() || arg.charAt(0) == '-') {
                continue;
            }
            try {
                String config = OIOUtils.readFileAsString((File)new File(arg));
                configuration.merge(new ODocument().fromJSON(config, "noMap"), true, true);
                ODocument cfgGlobal = (ODocument)configuration.field("config");
                if (cfgGlobal == null) {
                    continue;
                }
                for (String f : cfgGlobal.fieldNames()) {
                    context.setVariable(f, cfgGlobal.field(f));
                }
            }
            catch (IOException e) {
                throw new OConfigurationException("Error on loading config file: " + arg, (Throwable)e);
            }
        }

        // Second pass: -name=value overrides win over the files.
        for (String arg : args) {
            if (arg == null || arg.isEmpty() || arg.charAt(0) != '-') {
                continue;
            }
            // Limit 2 keeps '=' characters that belong to the value and keeps an empty value.
            final String[] parts = arg.substring(1).split("=", 2);
            if (parts.length != 2 || parts[0].isEmpty()) {
                throw new OConfigurationException("Invalid command line option '" + arg + "', expected -<name>=<value>");
            }
            context.setVariable(parts[0], (Object)parts[1]);
        }
        return new OETLProcessor().parse(configuration, context);
    }

    /**
     * Builds the context used when the caller supplies none.
     *
     * @return a new context whose only variable is {@code dumpEveryMs}, set to 1000
     */
    protected static OCommandContext createDefaultContext() {
        final OCommandContext defaults = new OBasicCommandContext();
        defaults.setVariable("dumpEveryMs", Integer.valueOf(1000));
        return defaults;
    }

    /**
     * Splits a whole configuration document into its sections and delegates to the
     * section-wise {@code parse} overload.
     *
     * @param cfg      document with the {@code begin}, {@code source}, {@code extractor},
     *                 {@code transformers}, {@code loader} and {@code end} fields
     * @param iContext command context, passed through unchanged
     * @return this processor
     */
    public OETLProcessor parse(ODocument cfg, OCommandContext iContext) {
        final Collection beginSection = (Collection)cfg.field("begin");
        final ODocument sourceSection = (ODocument)cfg.field("source");
        final ODocument extractorSection = (ODocument)cfg.field("extractor");
        final Collection transformerSection = (Collection)cfg.field("transformers");
        final ODocument loaderSection = (ODocument)cfg.field("loader");
        final Collection endSection = (Collection)cfg.field("end");
        return this.parse(beginSection, sourceSection, extractorSection, transformerSection, loaderSection, endSection, iContext);
    }

    /**
     * Instantiates and configures every component of the ETL process from its
     * configuration section. Each section document is expected to have the component name
     * as its first field, holding that component's own configuration.
     *
     * <p>Begin blocks are executed immediately after being configured. A missing source
     * defaults to the {@code input} source and a missing loader to the {@code output} loader.
     *
     * @param iBeginBlocks  begin block sections, may be {@code null}
     * @param iSource       source section, may be {@code null}
     * @param iExtractor    extractor section, required
     * @param iTransformers transformer sections, may be {@code null}
     * @param iLoader       loader section, may be {@code null}
     * @param iEndBlocks    end block sections, may be {@code null}
     * @param iContext      command context; when {@code null} a default context is created
     * @return this processor
     * @throws IllegalArgumentException if no extractor section is given
     * @throws OConfigurationException  if any component cannot be created or configured
     */
    public OETLProcessor parse(Collection<ODocument> iBeginBlocks, ODocument iSource, ODocument iExtractor, Collection<ODocument> iTransformers, ODocument iLoader, Collection<ODocument> iEndBlocks, OCommandContext iContext) {
        if (iExtractor == null) {
            throw new IllegalArgumentException("No Extractor configured");
        }
        this.context = iContext != null ? iContext : OETLProcessor.createDefaultContext();
        this.init();
        // Components must receive the resolved context: passing iContext handed them null
        // whenever the caller relied on the default context.
        final OCommandContext ctx = this.context;
        try {
            this.beginBlocks = new ArrayList<OBlock>();
            if (iBeginBlocks != null) {
                for (ODocument block : iBeginBlocks) {
                    final String name = block.fieldNames()[0];
                    final OBlock b = this.factory.getBlock(name);
                    this.beginBlocks.add(b);
                    this.configureComponent(b, (ODocument)block.field(name), ctx);
                    b.execute();
                }
            }

            if (iSource != null) {
                final String name = iSource.fieldNames()[0];
                this.source = this.factory.getSource(name);
                this.configureComponent(this.source, (ODocument)iSource.field(name), ctx);
            } else {
                this.source = this.factory.getSource("input");
            }

            final String extractorName = iExtractor.fieldNames()[0];
            this.extractor = this.factory.getExtractor(extractorName);
            this.configureComponent(this.extractor, (ODocument)iExtractor.field(extractorName), ctx);

            if (iLoader != null) {
                final String name = iLoader.fieldNames()[0];
                this.loader = this.factory.getLoader(name);
                this.configureComponent(this.loader, (ODocument)iLoader.field(name), ctx);
            } else {
                this.loader = this.factory.getLoader("output");
            }

            this.transformers = new ArrayList<OTransformer>();
            if (iTransformers != null) {
                for (ODocument t : iTransformers) {
                    final String name = t.fieldNames()[0];
                    final OTransformer tr = this.factory.getTransformer(name);
                    this.transformers.add(tr);
                    this.configureComponent(tr, (ODocument)t.field(name), ctx);
                }
            }

            this.endBlocks = new ArrayList<OBlock>();
            if (iEndBlocks != null) {
                for (ODocument block : iEndBlocks) {
                    final String name = block.fieldNames()[0];
                    final OBlock b = this.factory.getBlock(name);
                    this.endBlocks.add(b);
                    this.configureComponent(b, (ODocument)block.field(name), ctx);
                }
            }
        }
        catch (Exception e) {
            throw new OConfigurationException("Error on creating ETL processor", (Throwable)e);
        }
        return this;
    }

    /**
     * @return the component factory this processor uses to instantiate components by name
     */
    public OETLComponentFactory getFactory() {
        return this.factory;
    }

    /**
     * Runs the ETL process, in parallel when the {@code parallel} flag is set and
     * sequentially otherwise.
     *
     * @return this processor
     */
    public OETLProcessor execute() {
        if (!parallel) {
            executeSequentially();
            return this;
        }
        executeParallel();
        return this;
    }

    /**
     * Prints a formatted message to standard output when the processor's log level is at
     * least as verbose as the level of the message.
     *
     * @param iLogLevel level of the message
     * @param iText     {@link String#format(String, Object...)} pattern
     * @param iArgs     arguments for the pattern
     */
    public void out(LOG_LEVELS iLogLevel, String iText, Object ... iArgs) {
        // Enum comparison follows declaration order, i.e. the same ordering as ordinal().
        if (logLevel.compareTo(iLogLevel) < 0) {
            return;
        }
        final String message = String.format(iText, iArgs);
        System.out.println(message);
    }

    /**
     * @return the progress snapshot and warning/error counters of this processor
     */
    public OETLProcessorStats getStats() {
        return this.stats;
    }

    /**
     * @return the configured extractor, or {@code null} if none has been set yet
     */
    public OExtractor getExtractor() {
        return this.extractor;
    }

    /**
     * @return the configured loader, or {@code null} if none has been set yet
     */
    public OLoader getLoader() {
        return this.loader;
    }

    /**
     * @return the internal transformer list itself (not a copy), or {@code null} if none has been set yet
     */
    public List<OTransformer> getTransformers() {
        return this.transformers;
    }

    /**
     * @return the verbosity threshold applied by {@code out(...)}
     */
    public LOG_LEVELS getLogLevel() {
        return this.logLevel;
    }

    /**
     * @return the command context holding this processor's settings, or {@code null} if none has been set yet
     */
    public OCommandContext getContext() {
        return this.context;
    }

    /**
     * Tells whether worker thread slots have been allocated for this processor.
     *
     * @return {@code true} when the worker thread array exists
     */
    public boolean isParallel() {
        final boolean workersAllocated = threads != null;
        return workersAllocated;
    }

    /**
     * Runs the ETL process with one pipeline per worker thread. The calling thread extracts
     * the items and feeds them to the workers through a bounded queue, then waits until
     * every queued item has been processed before calling {@link #end()}.
     *
     * <p>Behaviour on failure: the first unchecked exception raised by a worker stops the
     * extraction and the remaining workers, and is rethrown on the calling thread. An
     * {@link OETLProcessHaltedException} is logged and not propagated; any other exception
     * propagates to the caller. In both cases the progress-dump timer is cancelled.
     *
     * <p>Interruptions do not abort the run; they are remembered and the interrupt flag of
     * the affected thread is restored once its work is over.
     */
    protected void executeParallel() {
        boolean interrupted = false;
        boolean completed = false;
        try {
            this.begin();
            this.out(LOG_LEVELS.INFO, "Started parallel execution with %d threads", this.threads.length);
            if (this.source != null) {
                final Reader reader = this.source.read();
                if (reader != null) {
                    this.extractor.extract(reader);
                }
            }

            final LinkedBlockingQueue<OExtractedItem> queue = new LinkedBlockingQueue<OExtractedItem>(this.threads.length * 500);
            // Number of items handed to the queue and not yet fully processed.
            final AtomicLong counter = new AtomicLong();
            final AtomicBoolean extractionFinished = new AtomicBoolean(false);
            // First failure raised by any worker; non-null means "stop everything".
            final java.util.concurrent.atomic.AtomicReference<RuntimeException> failure = new java.util.concurrent.atomic.AtomicReference<RuntimeException>();
            final OETLProcessor processor = this;

            for (int i = 0; i < this.threads.length; ++i) {
                this.threads[i] = new Thread(new Runnable(){
                    final OETLPipeline pipeline;
                    {
                        this.pipeline = new OETLPipeline(processor, OETLProcessor.this.transformers, OETLProcessor.this.loader, OETLProcessor.this.logLevel, OETLProcessor.this.maxRetries, OETLProcessor.this.haltOnError);
                    }

                    @Override
                    public void run() {
                        boolean workerInterrupted = false;
                        try {
                            this.pipeline.begin();
                            while (failure.get() == null) {
                                OExtractedItem content = null;
                                try {
                                    // Timed poll instead of take(): lets the worker notice the end of
                                    // the extraction and terminate instead of blocking forever.
                                    content = queue.poll(100L, java.util.concurrent.TimeUnit.MILLISECONDS);
                                }
                                catch (InterruptedException e) {
                                    workerInterrupted = true;
                                }
                                if (content == null) {
                                    // The flag is raised after the last enqueue, so an empty queue
                                    // at this point means there is nothing left to take.
                                    if (extractionFinished.get() && queue.isEmpty()) {
                                        break;
                                    }
                                    continue;
                                }
                                try {
                                    this.pipeline.execute(content);
                                }
                                catch (RuntimeException e) {
                                    // Recorded before the counter is decremented so the waiting
                                    // thread cannot observe "all done" without seeing the failure.
                                    failure.compareAndSet(null, e);
                                }
                                finally {
                                    counter.decrementAndGet();
                                }
                            }
                        }
                        catch (RuntimeException e) {
                            failure.compareAndSet(null, e);
                        }
                        finally {
                            if (workerInterrupted) {
                                Thread.currentThread().interrupt();
                            }
                        }
                    }
                }, "OrientDB ETL pipeline-" + i);
                this.threads[i].setDaemon(true);
                this.threads[i].start();
            }

            while (failure.get() == null && this.extractor.hasNext()) {
                final OExtractedItem current = (OExtractedItem)this.extractor.next();
                // Count before enqueuing so a fast worker can never drive the counter negative.
                counter.incrementAndGet();
                boolean queued = false;
                while (!queued && failure.get() == null) {
                    try {
                        // Timed offer so a full queue cannot block forever once the workers stopped.
                        queued = queue.offer(current, 100L, java.util.concurrent.TimeUnit.MILLISECONDS);
                    }
                    catch (InterruptedException e) {
                        interrupted = true;
                    }
                }
                if (!queued) {
                    // The item never reached the queue; do not wait for it.
                    counter.decrementAndGet();
                }
            }
            extractionFinished.set(true);

            while (failure.get() == null && counter.get() > 0L) {
                this.out(LOG_LEVELS.INFO, "Waiting for the pipeline to finish, remaining " + counter.get() + " entries to process", new Object[0]);
                try {
                    Thread.sleep(500L);
                }
                catch (InterruptedException e) {
                    interrupted = true;
                }
            }

            final RuntimeException error = failure.get();
            if (error != null) {
                throw error;
            }
            this.end();
            completed = true;
        }
        catch (OETLProcessHaltedException e) {
            this.out(LOG_LEVELS.ERROR, "ETL process halted: %s", new Object[]{e});
        }
        finally {
            // end() cancels the timer on success; make sure an aborted run does not leave it running.
            if (!completed && this.dumpTask != null) {
                this.dumpTask.cancel();
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Starts a run: records the start time, applies the {@code maxRetries} setting,
     * schedules the periodic progress dump when {@code dumpEveryMs} is positive, runs the
     * begin blocks and finally opens the source and the extractor.
     *
     * @throws OConfigurationException if {@code maxRetries} or {@code dumpEveryMs} is not an integer
     */
    protected void begin() {
        this.out(LOG_LEVELS.INFO, "BEGIN ETL PROCESSOR", new Object[0]);
        // Recorded unconditionally: end() and dumpProgress() rely on it even when the
        // periodic dump is disabled.
        this.startTime = System.currentTimeMillis();

        final Integer cfgMaxRetries = OETLProcessor.toIntegerSetting("maxRetries", this.context.getVariable("maxRetries"));
        if (cfgMaxRetries != null) {
            this.maxRetries = cfgMaxRetries;
        }

        final Integer dumpEveryMs = OETLProcessor.toIntegerSetting("dumpEveryMs", this.context.getVariable("dumpEveryMs"));
        if (dumpEveryMs != null && dumpEveryMs > 0) {
            this.dumpTask = new TimerTask(){

                @Override
                public void run() {
                    OETLProcessor.this.dumpProgress();
                }
            };
            Orient.instance().scheduleTask(this.dumpTask, (long)dumpEveryMs.intValue(), (long)dumpEveryMs.intValue());
        }

        for (OBlock t : this.beginBlocks) {
            t.begin();
            t.execute();
            t.end();
        }
        if (this.source != null) {
            this.source.begin();
        }
        this.extractor.begin();
    }

    /**
     * Converts a context value to an Integer. Values set from the command line are stored
     * as strings, so a plain cast is not enough.
     *
     * @return the integer value, or {@code null} when the setting is absent
     */
    private static Integer toIntegerSetting(String iName, Object iValue) {
        if (iValue == null) {
            return null;
        }
        if (iValue instanceof Number) {
            return Integer.valueOf(((Number)iValue).intValue());
        }
        try {
            return Integer.valueOf(iValue.toString().trim());
        }
        catch (NumberFormatException e) {
            throw new OConfigurationException("Setting '" + iName + "' must be an integer but was: " + iValue, (Throwable)e);
        }
    }

    /**
     * Finishes a run: closes the transformers, the source, the extractor and the loader,
     * runs the end blocks, computes the elapsed time, stops the periodic progress dump and
     * prints a final progress line.
     */
    protected void end() {
        for (OTransformer transformer : transformers) {
            transformer.end();
        }

        if (source != null) {
            source.end();
        }
        extractor.end();
        loader.end();

        for (OBlock block : endBlocks) {
            block.begin();
            block.execute();
            block.end();
        }

        elapsed = System.currentTimeMillis() - startTime;

        final TimerTask scheduledDump = dumpTask;
        if (scheduledDump != null) {
            scheduledDump.cancel();
        }

        out(LOG_LEVELS.INFO, "END ETL PROCESSOR", new Object[0]);
        dumpProgress();
    }

    /**
     * Runs the ETL process on the calling thread: every extracted item is pushed through a
     * single pipeline, one after the other.
     *
     * @throws OETLProcessHaltedException when the process is halted; the exception is logged
     *                                    and rethrown
     */
    protected void executeSequentially() {
        boolean completed = false;
        try {
            this.begin();
            if (this.source != null) {
                final Reader reader = this.source.read();
                if (reader != null) {
                    this.extractor.extract(reader);
                }
            }
            OETLPipeline pipeline = new OETLPipeline(this, this.transformers, this.loader, this.logLevel, this.maxRetries, this.haltOnError);
            pipeline.begin();
            while (this.extractor.hasNext()) {
                OExtractedItem current = (OExtractedItem)this.extractor.next();
                pipeline.execute(current);
            }
            this.end();
            completed = true;
        }
        catch (OETLProcessHaltedException e) {
            this.out(LOG_LEVELS.ERROR, "ETL process halted: %s", new Object[]{e});
            throw e;
        }
        finally {
            // end() cancels the progress timer on success; an aborted run never reaches it,
            // which used to leave the timer dumping progress indefinitely.
            if (!completed && this.dumpTask != null) {
                this.dumpTask.cancel();
            }
        }
    }

    /**
     * Hands a component its configuration, passing this processor as the owner.
     *
     * @param iComponent component to configure
     * @param iCfg       configuration document of the component
     * @param iContext   command context passed to the component
     */
    protected void configureComponent(OETLComponent iComponent, ODocument iCfg, OCommandContext iContext) {
        iComponent.configure(this, iCfg, iContext);
    }

    /**
     * Resolves a type name used in a component's {@code input}/{@code output} declaration.
     * The short names {@code ODocument}, {@code String}, {@code Object},
     * {@code OrientVertex} and {@code OrientEdge} are recognised directly; any other name
     * is loaded as a fully qualified class name.
     *
     * @param iComponent component declaring the type, used for the error message
     * @param iClassName short or fully qualified class name
     * @return the resolved class
     * @throws OConfigurationException if the class cannot be found; the original
     *                                 {@link ClassNotFoundException} is kept as the cause
     */
    protected Class getClassByName(OETLComponent iComponent, String iClassName) {
        if (iClassName.equals("ODocument")) {
            return ODocument.class;
        }
        if (iClassName.equals("String")) {
            return String.class;
        }
        if (iClassName.equals("Object")) {
            return Object.class;
        }
        if (iClassName.equals("OrientVertex")) {
            return OrientVertex.class;
        }
        if (iClassName.equals("OrientEdge")) {
            return OrientEdge.class;
        }
        try {
            return Class.forName(iClassName);
        }
        catch (ClassNotFoundException e) {
            throw new OConfigurationException("Class '" + iClassName + "' declared as 'input' of ETL Component '" + iComponent.getName() + "' was not found.", (Throwable)e);
        }
    }

    /**
     * Prints one progress line with the extractor and loader counters, their throughput
     * since the previous call, the total running time and the warning/error counts, then
     * stores the current counters as the baseline for the next call.
     *
     * <p>The percentage form is used only when the extractor reports a positive total; an
     * unknown ({@code -1}) or zero total uses the form without percentage.
     */
    protected void dumpProgress() {
        final long now = System.currentTimeMillis();
        final long extractorProgress = this.extractor.getProgress();
        final long extractorTotal = this.extractor.getTotal();
        final String extractorUnit = this.extractor.getUnit();
        final long loaderProgress = this.loader.getProgress();
        final String loaderUnit = this.loader.getUnit();

        // On the first call there is no previous lap: measure from the start of the run
        // instead of from the epoch, which made the first rate collapse to zero.
        final long lapStart = this.stats.lastLap > 0L ? this.stats.lastLap : this.startTime;
        final long lapMs = now - lapStart;
        final long extractorItemsSec = OETLProcessor.progressPerSecond(extractorProgress - this.stats.lastExtractorProgress, lapMs);
        final long loaderItemsSec = OETLProcessor.progressPerSecond(loaderProgress - this.stats.lastLoaderProgress, lapMs);

        if (extractorTotal <= 0L) {
            this.out(LOG_LEVELS.INFO, "+ extracted %,d %s (%,d %s/sec) - %,d %s -> loaded %,d %s (%,d %s/sec) Total time: %s [%d warnings, %d errors]", extractorProgress, extractorUnit, extractorItemsSec, extractorUnit, this.extractor.getProgress(), this.extractor.getUnit(), loaderProgress, loaderUnit, loaderItemsSec, loaderUnit, OIOUtils.getTimeAsString((long)(now - this.startTime)), this.stats.warnings.get(), this.stats.errors.get());
        } else {
            final float extractorPercentage = (float)extractorProgress * 100.0f / (float)extractorTotal;
            this.out(LOG_LEVELS.INFO, "+ %3.2f%% -> extracted %,d/%,d %s (%,d %s/sec) - %,d %s -> loaded %,d %s (%,d %s/sec) Total time: %s [%d warnings, %d errors]", Float.valueOf(extractorPercentage), extractorProgress, extractorTotal, extractorUnit, extractorItemsSec, extractorUnit, this.extractor.getProgress(), this.extractor.getUnit(), loaderProgress, loaderUnit, loaderItemsSec, loaderUnit, OIOUtils.getTimeAsString((long)(now - this.startTime)), this.stats.warnings.get(), this.stats.errors.get());
        }

        this.stats.lastExtractorProgress = extractorProgress;
        this.stats.lastLoaderProgress = loaderProgress;
        this.stats.lastLap = now;
    }

    /**
     * Converts a progress delta over a lap into units per second; a lap without a positive
     * duration yields 0 instead of dividing by zero.
     */
    private static long progressPerSecond(long iDelta, long iLapMs) {
        if (iLapMs <= 0L) {
            return 0L;
        }
        return (long)((float)iDelta * 1000.0f / (float)iLapMs);
    }

    /**
     * Verifies that the declared output of every stage is accepted by the stage that
     * follows it, walking extractor, transformers (in order) and loader.
     *
     * @throws OConfigurationException if the extractor or the loader is missing, or if two
     *                                 consecutive stages are incompatible
     */
    protected void analyzeFlow() {
        if (extractor == null) {
            throw new OConfigurationException("extractor is null");
        }
        if (loader == null) {
            throw new OConfigurationException("loader is null");
        }

        OETLComponent upstream = extractor;
        for (OTransformer transformer : transformers) {
            checkTypeCompatibility(transformer, upstream);
            upstream = transformer;
        }
        checkTypeCompatibility(loader, upstream);
    }

    /**
     * Checks that the {@code output} type declared by one component can be assigned to at
     * least one of the {@code input} types declared by the component that follows it. The
     * check is skipped when either declaration is missing.
     *
     * @param iCurrentComponent component receiving the data
     * @param iLastComponent    component producing the data
     * @throws OConfigurationException if no declared input accepts the declared output, or
     *                                 if the declarations cannot be read or resolved
     */
    @SuppressWarnings("unchecked")
    protected void checkTypeCompatibility(OETLComponent iCurrentComponent, OETLComponent iLastComponent) {
        // Typed list: iterating a raw List as String does not compile.
        List<String> ins;
        String out;
        try {
            out = (String)iLastComponent.getConfiguration().field("output");
            if (out == null) {
                return;
            }
            ins = (List<String>)iCurrentComponent.getConfiguration().field("input");
            if (ins == null) {
                return;
            }
            final Class outClass = this.getClassByName(iLastComponent, out);
            for (String in : ins) {
                final Class inClass = this.getClassByName(iCurrentComponent, in);
                if (inClass.isAssignableFrom(outClass)) {
                    return;
                }
            }
        }
        catch (Exception e) {
            throw new OConfigurationException("Error on checking compatibility between components '" + iLastComponent.getName() + "' and '" + iCurrentComponent.getName() + "'", (Throwable)e);
        }
        throw new OConfigurationException("Component '" + iCurrentComponent.getName() + "' expects one of the following inputs " + ins + " but the 'output' for component '" + iLastComponent.getName() + "' is: " + out);
    }

    /**
     * Applies the {@code log}, {@code haltOnError} and {@code parallel} context variables
     * and, when parallel execution is enabled, allocates one worker slot per available
     * processor (keeping one processor free when more than two are available).
     *
     * <p>Values may be given with their natural type or as strings, which is how command
     * line overrides are stored.
     *
     * @throws IllegalArgumentException if {@code log} does not name a {@link LOG_LEVELS} constant
     * @throws OConfigurationException  if {@code haltOnError} or {@code parallel} is neither
     *                                  {@code true} nor {@code false}
     */
    protected void init() {
        final Object cfgLog = this.context.getVariable("log");
        if (cfgLog != null) {
            // Locale.ROOT keeps the upper-casing independent of the default locale.
            this.logLevel = LOG_LEVELS.valueOf(cfgLog.toString().trim().toUpperCase(java.util.Locale.ROOT));
        }

        final Boolean cfgHaltOnError = OETLProcessor.toBooleanSetting("haltOnError", this.context.getVariable("haltOnError"));
        if (cfgHaltOnError != null) {
            this.haltOnError = cfgHaltOnError;
        }

        final Boolean cfgParallel = OETLProcessor.toBooleanSetting("parallel", this.context.getVariable("parallel"));
        if (cfgParallel != null) {
            this.parallel = cfgParallel;
        }

        if (this.parallel) {
            int cores = Runtime.getRuntime().availableProcessors();
            if (cores > 2) {
                --cores;
            }
            // Only the slots are needed here: the threads themselves are created when the
            // parallel execution starts.
            this.threads = new Thread[cores];
        } else {
            // Drop slots left over from a previous initialisation.
            this.threads = null;
        }
    }

    /**
     * Converts a context value to a Boolean, accepting Boolean instances and the strings
     * "true"/"false" in any case.
     *
     * @return the boolean value, or {@code null} when the setting is absent
     */
    private static Boolean toBooleanSetting(String iName, Object iValue) {
        if (iValue == null) {
            return null;
        }
        if (iValue instanceof Boolean) {
            return (Boolean)iValue;
        }
        final String text = iValue.toString().trim();
        if ("true".equalsIgnoreCase(text)) {
            return Boolean.TRUE;
        }
        if ("false".equalsIgnoreCase(text)) {
            return Boolean.FALSE;
        }
        throw new OConfigurationException("Setting '" + iName + "' must be true or false but was: " + iValue);
    }

    /**
     * Mutable progress snapshot plus warning and error counters of a processor.
     */
    public class OETLProcessorStats {
        /** Extractor progress recorded at the previous progress dump. */
        public long lastExtractorProgress;
        /** Loader progress recorded at the previous progress dump. */
        public long lastLoaderProgress;
        /** Time of the previous progress dump in milliseconds, 0 before the first one. */
        public long lastLap;
        /** Number of warnings counted so far. */
        public AtomicLong warnings = new AtomicLong();
        /** Number of errors counted so far. */
        public AtomicLong errors = new AtomicLong();

        /**
         * Counts one more warning.
         *
         * @return the warning count after the increment
         */
        public long incrementWarnings() {
            return warnings.incrementAndGet();
        }

        /**
         * Counts one more error.
         *
         * @return the error count after the increment
         */
        public long incrementErrors() {
            return errors.incrementAndGet();
        }
    }

    /**
     * Verbosity levels, declared from least to most verbose. The declaration order matters:
     * {@code out(...)} prints a message when the processor's level is at or after the
     * message's level in this order.
     */
    public static enum LOG_LEVELS {
        /** Lowest level; with this setting only messages that are themselves NONE are printed. */
        NONE,
        /** Printed at ERROR, INFO and DEBUG. */
        ERROR,
        /** Printed at INFO and DEBUG; the processor's default level. */
        INFO,
        /** Printed at DEBUG only. */
        DEBUG;

    }
}

