/*
 * Decompiled with CFR 0.152.
 */
package com.streamsets.pipeline.api.base.configurablestage;

import com.streamsets.pipeline.api.Source;
import com.streamsets.pipeline.api.Stage;
import com.streamsets.pipeline.api.StageException;
import com.streamsets.pipeline.api.base.configurablestage.DSourceOffsetCommitter;
import com.streamsets.pipeline.api.impl.ClusterSource;
import com.streamsets.pipeline.api.impl.Utils;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class DClusterSourceOffsetCommitter
extends DSourceOffsetCommitter
implements ClusterSource {
    private static final Logger LOG = LoggerFactory.getLogger(DClusterSourceOffsetCommitter.class);
    private ClusterSource clusterSource;

    @Override
    Stage<Source.Context> createStage() {
        Stage<Source.Context> result = super.createStage();
        LOG.info("Created source of type: {}", (Object)this.source);
        if (this.source instanceof ClusterSource) {
            this.clusterSource = (ClusterSource)this.source;
        } else if (this.source == null) {
            throw new NullPointerException("Source cannot be null");
        }
        return result;
    }

    @Override
    public String getName() {
        if (this.initializeClusterSource()) {
            return this.clusterSource.getName();
        }
        return null;
    }

    @Override
    public boolean isInBatchMode() {
        if (this.initializeClusterSource()) {
            return this.clusterSource.isInBatchMode();
        }
        return false;
    }

    @Override
    public Object put(List<Map.Entry> batch) throws InterruptedException {
        if (this.initializeClusterSource()) {
            return this.clusterSource.put(batch);
        }
        return null;
    }

    @Override
    public void completeBatch() throws InterruptedException {
        this.clusterSource.completeBatch();
    }

    private boolean initializeClusterSource() {
        long start = System.currentTimeMillis();
        try {
            while (this.clusterSource == null && System.currentTimeMillis() - start < 60000L) {
                Source source = this.getSource();
                if (source instanceof ClusterSource) {
                    this.clusterSource = (ClusterSource)source;
                    return true;
                }
                if (source != null) {
                    LOG.info(Utils.format("The instance '{}' will not call this method as it does not implement '{}'", source.getClass().getName(), ClusterSource.class.getName()));
                    return false;
                }
                Thread.sleep(1L);
            }
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        if (this.clusterSource == null) {
            throw new RuntimeException("Could not obtain cluster source");
        }
        return true;
    }

    @Override
    public long getRecordsProduced() {
        if (this.initializeClusterSource()) {
            return this.clusterSource.getRecordsProduced();
        }
        return -1L;
    }

    @Override
    public boolean inErrorState() {
        if (this.initializeClusterSource()) {
            return this.clusterSource.inErrorState();
        }
        return false;
    }

    @Override
    public Map<String, String> getConfigsToShip() {
        if (this.initializeClusterSource()) {
            return this.clusterSource.getConfigsToShip();
        }
        return null;
    }

    @Override
    public void postDestroy() {
        if (this.initializeClusterSource()) {
            this.clusterSource.postDestroy();
        }
    }

    @Override
    public int getParallelism() throws IOException, StageException {
        return this.initializeClusterSource() ? this.clusterSource.getParallelism() : -1;
    }
}

