package com.hazelcast.jet.impl.connector;

import com.hazelcast.function.FunctionEx;
import com.hazelcast.jet.RestartableException;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.Inbox;
import com.hazelcast.jet.core.Outbox;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.impl.execution.init.JetInitDataSerializerHook;
import com.hazelcast.jet.impl.processor.TwoPhaseSnapshotCommitUtility;
import com.hazelcast.jet.impl.processor.UnboundedTransactionsProcessorUtility;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.jet.pipeline.FileSinkBuilder;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import com.hazelcast.security.impl.function.SecuredFunctions;
import com.hazelcast.security.permission.ActionConstants;
import com.hazelcast.security.permission.ConnectorPermission;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Serializable;
import java.io.Writer;
import java.lang.invoke.SerializedLambda;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.FileAttribute;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.function.LongSupplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/* loaded from: input_file:com/hazelcast/jet/impl/connector/WriteFileP.class */
public final class WriteFileP<T> implements Processor {
    // Default clock; assigned System::currentTimeMillis in the static initializer.
    private static final LongSupplier SYSTEM_CLOCK;
    // Regex compiled in the static initializer; abortUnfinishedTransactions() parses its group 1 as an int.
    private static final Pattern FILE_INDEX_WITH_SEQ;
    // Directory into which all files are written; created in init().
    private final Path directory;
    // Converts each incoming item to the text line written to the file.
    private final FunctionEx<? super T, ? extends String> toStringFn;
    // Charset used by the OutputStreamWriter created in createWriter().
    private final Charset charset;
    // Formatter for the date prefix of file names; null when no date pattern was given.
    private final DateTimeFormatter dateFormatter;
    // Size threshold for rolling files; Long.MAX_VALUE is treated as "rolling disabled" by process()/newFileName().
    private final long maxFileSize;
    // Constructor flag; when false, init() downgrades an EXACTLY_ONCE job guarantee to AT_LEAST_ONCE.
    private final boolean exactlyOnce;
    // Source of the current time in epoch milliseconds, used by currentTimeFormatted().
    private final LongSupplier clock;
    // Transaction helper created in init(); most Processor callbacks delegate to it.
    private UnboundedTransactionsProcessorUtility<FileId, WriteFileP<T>.FileResource> utility;
    // Processor context stored by init(); used for logging and processor indices.
    private Processor.Context context;
    // Next sequence number to try in newFileName(); reset to 0 by process() when the date changes.
    private int fileSequence;
    // Stream wrapper of the most recently created writer; its size drives file rolling.
    private SizeTrackingStream sizeTrackingStream;
    // Formatted date used in the most recently generated file name (set in newFileName()).
    private String lastFileDate;
    // NOTE(review): marked synthetic — presumably the compiler's assertion flag surfaced by the decompiler.
    static final /* synthetic */ boolean $assertionsDisabled;

    /**
     * Transaction id of a single output file: the file's name (relative to
     * the sink directory) plus the index reported through {@link #index()}.
     */
    public static final class FileId implements TwoPhaseSnapshotCommitUtility.TransactionId, IdentifiedDataSerializable {
        private String fileName;
        private int index;

        /** Creates an empty instance; the fields are filled in by {@link #readData}. */
        public FileId() {
        }

        private FileId(String fileName, int index) {
            this.fileName = fileName;
            this.index = index;
        }

        @Override // com.hazelcast.jet.impl.processor.TwoPhaseSnapshotCommitUtility.TransactionId
        public int index() {
            return index;
        }

        @Override // com.hazelcast.nio.serialization.IdentifiedDataSerializable
        public int getFactoryId() {
            return JetInitDataSerializerHook.FACTORY_ID;
        }

        @Override // com.hazelcast.nio.serialization.IdentifiedDataSerializable
        public int getClassId() {
            return 42;
        }

        /** Writes the file name followed by the index; {@link #readData} reads them in the same order. */
        @Override // com.hazelcast.nio.serialization.DataSerializable
        public void writeData(ObjectDataOutput out) throws IOException {
            out.writeUTF(fileName);
            out.writeInt(index);
        }

        @Override // com.hazelcast.nio.serialization.DataSerializable
        public void readData(ObjectDataInput in) throws IOException {
            fileName = in.readUTF();
            index = in.readInt();
        }

        @Override
        public String toString() {
            return "File{" + fileName + '}';
        }
    }

    /**
     * Base class of the per-file transactional resource. Holds the file id,
     * the resolved final path and the (possibly not yet opened) writer.
     */
    private abstract class FileResource implements TwoPhaseSnapshotCommitUtility.TransactionalResource<FileId> {
        final FileId fileId;
        final Path targetFile;
        Writer writer;

        @SuppressFBWarnings(value = {"NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"}, justification = "targetFile always has fileName")
        FileResource(FileId fileId) {
            this.fileId = fileId;
            this.targetFile = directory.resolve(fileId.fileName);
        }

        @Override // com.hazelcast.jet.impl.processor.TwoPhaseSnapshotCommitUtility.TransactionalResource
        public FileId id() {
            return fileId;
        }

        /** Closes the writer, if one is open. */
        @Override // com.hazelcast.jet.impl.processor.TwoPhaseSnapshotCommitUtility.TransactionalResource
        public void endAndPrepare() throws IOException {
            closeFile();
        }

        /** Closes the writer, if one is open. */
        @Override // com.hazelcast.jet.impl.processor.TwoPhaseSnapshotCommitUtility.TransactionalResource
        public void release() throws IOException {
            closeFile();
        }

        /** Logs and closes the current writer, then forgets it; does nothing when no writer is open. */
        private void closeFile() throws IOException {
            if (writer == null) {
                return;
            }
            LoggingUtil.logFine(context.logger(), "closing %s", id().fileName);
            writer.close();
            writer = null;
        }

        /** Returns the current writer, which may be {@code null} if none has been opened. */
        public Writer writer() {
            return writer;
        }
    }

    /**
     * File resource that writes to a temporary file (final name plus
     * {@code FileSinkBuilder.TEMP_FILE_SUFFIX}) and moves it to the final
     * name on commit, or deletes it on rollback.
     */
    private final class FileWithTransaction extends WriteFileP<T>.FileResource {
        private final Path tempFile;

        FileWithTransaction(@Nonnull FileId fileId) {
            super(fileId);
            String tempName = fileId.fileName + FileSinkBuilder.TEMP_FILE_SUFFIX;
            this.tempFile = directory.resolve(tempName);
        }

        /** Opens the writer on the temporary file. */
        @Override // com.hazelcast.jet.impl.processor.TwoPhaseSnapshotCommitUtility.TransactionalResource
        public void begin() {
            writer = createWriter(tempFile);
        }

        /** Closes the writer if still open, then atomically moves the temporary file to its final name. */
        @Override // com.hazelcast.jet.impl.processor.TwoPhaseSnapshotCommitUtility.TransactionalResource
        public void commit() throws IOException {
            closeWriterIfOpen();
            Files.move(tempFile, targetFile, StandardCopyOption.ATOMIC_MOVE);
        }

        /** Closes the writer if still open, then deletes the temporary file. */
        @Override // com.hazelcast.jet.impl.processor.TwoPhaseSnapshotCommitUtility.TransactionalResource
        public void rollback() throws Exception {
            closeWriterIfOpen();
            Files.delete(tempFile);
        }

        /** Closes and clears the writer without logging; a no-op when there is none. */
        private void closeWriterIfOpen() throws IOException {
            if (writer == null) {
                return;
            }
            writer.close();
            writer = null;
        }
    }

    /**
     * File resource that writes straight to the final file. The writer is
     * opened on first use rather than in a {@code begin()} call.
     */
    private final class FileWithoutTransaction extends WriteFileP<T>.FileResource {
        FileWithoutTransaction(@Nonnull FileId fileId) {
            super(fileId);
        }

        /** Returns the writer for the target file, opening it first if necessary. */
        @Override // com.hazelcast.jet.impl.connector.WriteFileP.FileResource
        public Writer writer() {
            boolean notYetOpened = writer == null;
            if (notYetOpened) {
                writer = createWriter(targetFile);
            }
            return super.writer();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Output stream decorator that counts the bytes passed to its
     * {@code write} methods and forwards every call to the wrapped stream.
     * The counter is incremented before the write is forwarded.
     */
    public static final class SizeTrackingStream extends OutputStream {
        private final OutputStream target;
        private long size;

        private SizeTrackingStream(OutputStream target) {
            this.target = target;
        }

        @Override // java.io.OutputStream
        public void write(int b) throws IOException {
            size += 1;
            target.write(b);
        }

        @Override // java.io.OutputStream
        public void write(@Nonnull byte[] buffer, int offset, int length) throws IOException {
            size = size + length;
            target.write(buffer, offset, length);
        }

        @Override // java.io.OutputStream, java.io.Flushable
        public void flush() throws IOException {
            target.flush();
        }

        @Override // java.io.OutputStream, java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            target.close();
        }
    }

    /**
     * Creates the processor. No file system access happens here; the
     * directory is created later in {@code init}.
     *
     * @param str          path of the target directory
     * @param functionEx   converts an item to the text written for it
     * @param str2         name of the charset used to encode the output
     * @param str3         date pattern for the file-name prefix, or {@code null} for no date prefix;
     *                     the pattern is formatted in UTC
     * @param j            file size at which a new file is started
     * @param z            whether the sink itself is asked to provide exactly-once output
     * @param longSupplier clock returning epoch milliseconds
     */
    public WriteFileP(@Nonnull String str, @Nonnull FunctionEx<? super T, ? extends String> functionEx, @Nonnull String str2, @Nullable String str3, long j, boolean z, @Nonnull LongSupplier longSupplier) {
        this.directory = Paths.get(str);
        this.toStringFn = functionEx;
        this.charset = Charset.forName(str2);
        if (str3 == null) {
            this.dateFormatter = null;
        } else {
            this.dateFormatter = DateTimeFormatter.ofPattern(str3).withZone(ZoneOffset.UTC);
        }
        this.maxFileSize = j;
        this.exactlyOnce = z;
        this.clock = longSupplier;
    }

    /**
     * Stores the context, creates the target directory (including missing
     * parents) and sets up the transaction utility.
     *
     * <p>If the job runs with EXACTLY_ONCE but this sink was constructed with
     * the exactly-once flag off, the utility is given AT_LEAST_ONCE instead;
     * otherwise it receives the job's guarantee unchanged.
     *
     * @throws IOException if the directory cannot be created
     */
    @Override // com.hazelcast.jet.core.Processor
    public void init(@Nonnull Outbox outbox, @Nonnull Processor.Context context) throws IOException {
        this.context = context;
        Files.createDirectories(directory);

        ProcessingGuarantee jobGuarantee = context.processingGuarantee();
        boolean downgrade = jobGuarantee == ProcessingGuarantee.EXACTLY_ONCE && !exactlyOnce;
        ProcessingGuarantee sinkGuarantee = downgrade ? ProcessingGuarantee.AT_LEAST_ONCE : jobGuarantee;

        utility = new UnboundedTransactionsProcessorUtility<>(
                outbox,
                context,
                sinkGuarantee,
                this::newFileName,
                this::newFileResource,
                this::recoverAndCommit,
                this::abortUnfinishedTransactions);
    }

    /**
     * Declares this processor as non-cooperative.
     * NOTE(review): presumably because it performs blocking file I/O — confirm.
     *
     * @return always {@code false}
     */
    @Override // com.hazelcast.jet.core.Processor
    public boolean isCooperative() {
        return false;
    }

    /**
     * Forwards to the transaction utility's {@code tryProcess()}.
     *
     * @return whatever the utility returns
     */
    @Override // com.hazelcast.jet.core.Processor
    public boolean tryProcess() {
        return utility.tryProcess();
    }

    /**
     * Drains the inbox, writing one line per item to the active file.
     *
     * <p>Before writing, the active file is finished if a date pattern is
     * configured and the formatted current time differs from the date of the
     * last generated file name (the sequence counter restarts at 0). While
     * writing, and once more after the final flush, the file is rolled as
     * soon as the tracked size reaches {@code maxFileSize}.
     *
     * @param i     inbound edge ordinal (unused)
     * @param inbox items to write; fully drained unless an exception is thrown
     * @throws RestartableException wrapping any {@link IOException} raised by
     *         writing or flushing
     */
    @Override // com.hazelcast.jet.core.Processor
    public void process(int i, @Nonnull Inbox inbox) {
        if (this.dateFormatter != null && !currentTimeFormatted().equals(this.lastFileDate)) {
            this.fileSequence = 0;
            this.utility.finishActiveTransaction();
        }
        Writer writer = this.utility.activeTransaction().writer();
        try {
            for (Object item = inbox.poll(); item != null; item = inbox.poll()) {
                @SuppressWarnings("unchecked")
                T castItem = (T) item;
                writer.write(this.toStringFn.apply(castItem));
                writer.write(System.lineSeparator());
                if (sizeLimitReached()) {
                    this.utility.finishActiveTransaction();
                    writer = this.utility.activeTransaction().writer();
                }
            }
            // The flush must stay inside the try: it throws IOException just like
            // write(), and this method has no throws clause.
            writer.flush();
            // Flushing may push buffered bytes through the size-tracking stream,
            // so the limit is checked once more.
            if (sizeLimitReached()) {
                this.utility.finishActiveTransaction();
            }
        } catch (IOException e) {
            throw new RestartableException(e);
        }
    }

    /** Tells whether size-based rolling is enabled and the current file has reached the limit. */
    private boolean sizeLimitReached() {
        return this.maxFileSize != Long.MAX_VALUE && this.sizeTrackingStream.size >= this.maxFileSize;
    }

    /**
     * Accepts the watermark without acting on it; the argument is not used.
     *
     * @return always {@code true}
     */
    @Override // com.hazelcast.jet.core.Processor
    public boolean tryProcessWatermark(@Nonnull Watermark watermark) {
        return true;
    }

    /**
     * Notifies the transaction utility that processing has completed.
     *
     * @return always {@code true}
     */
    @Override // com.hazelcast.jet.core.Processor
    public boolean complete() {
        utility.afterCompleted();
        return true;
    }

    /**
     * Closes the transaction utility. Does nothing when the utility was
     * never created (it is only assigned in {@code init}).
     */
    @Override // com.hazelcast.jet.core.Processor
    public void close() {
        if (utility == null) {
            return;
        }
        utility.close();
    }

    /**
     * Forwards to the transaction utility's {@code snapshotCommitPrepare()}.
     *
     * @return whatever the utility returns
     */
    @Override // com.hazelcast.jet.core.Processor
    public boolean snapshotCommitPrepare() {
        return utility.snapshotCommitPrepare();
    }

    /**
     * Forwards to the transaction utility's {@code snapshotCommitFinish}.
     *
     * @param z flag passed through unchanged to the utility
     * @return whatever the utility returns
     */
    @Override // com.hazelcast.jet.core.Processor
    public boolean snapshotCommitFinish(boolean z) {
        return utility.snapshotCommitFinish(z);
    }

    /**
     * Hands the snapshot inbox to the transaction utility for restoring.
     *
     * @param inbox snapshot entries, passed through unchanged
     */
    @Override // com.hazelcast.jet.core.Processor
    public void restoreFromSnapshot(@Nonnull Inbox inbox) {
        utility.restoreFromSnapshot(inbox);
    }

    /**
     * Builds the id of the next file to write.
     *
     * <p>The name is {@code [<date>-]<globalProcessorIndex>}. When the sink
     * runs with the EXACTLY_ONCE external guarantee or size-based rolling is
     * enabled, {@code -<sequence>} is appended, advancing the sequence until
     * neither the final file nor its temporary counterpart exists in the
     * directory. As a side effect, {@code lastFileDate} is refreshed when a
     * date formatter is configured.
     */
    private FileId newFileName() {
        StringBuilder name = new StringBuilder();
        if (dateFormatter != null) {
            lastFileDate = currentTimeFormatted();
            name.append(lastFileDate).append('-');
        }
        name.append(context.globalProcessorIndex());

        String candidate = name.toString();
        boolean usesSequence = utility.externalGuarantee() == ProcessingGuarantee.EXACTLY_ONCE
                || maxFileSize != Long.MAX_VALUE;
        if (usesSequence) {
            int prefixLength = name.length();
            do {
                name.append('-').append(fileSequence++);
                candidate = name.toString();
                // Cut the sequence off again so the next attempt starts from the bare prefix.
                name.setLength(prefixLength);
            } while (Files.exists(directory.resolve(candidate))
                    || Files.exists(directory.resolve(candidate + FileSinkBuilder.TEMP_FILE_SUFFIX)));
        }
        return new FileId(candidate, context.globalProcessorIndex());
    }

    /**
     * Creates the resource for the given file id: a transactional one when
     * the utility's external guarantee is EXACTLY_ONCE, otherwise one that
     * writes directly to the final file.
     */
    private WriteFileP<T>.FileResource newFileResource(FileId fileId) {
        if (utility.externalGuarantee() == ProcessingGuarantee.EXACTLY_ONCE) {
            return new FileWithTransaction(fileId);
        }
        return new FileWithoutTransaction(fileId);
    }

    /**
     * Finishes the commit of a file identified by {@code fileId}.
     *
     * <p>If the temporary file exists it is atomically moved to the final
     * name. If only the final file exists nothing is done. If neither
     * exists a warning is logged.
     *
     * @throws IOException if the move fails
     */
    private void recoverAndCommit(FileId fileId) throws IOException {
        Path tempFile = directory.resolve(fileId.fileName + FileSinkBuilder.TEMP_FILE_SUFFIX);
        Path finalFile = directory.resolve(fileId.fileName);

        if (Files.exists(tempFile)) {
            Files.move(tempFile, finalFile, StandardCopyOption.ATOMIC_MOVE);
            return;
        }
        if (!Files.exists(finalFile)) {
            context.logger().warning("Neither temporary nor final file from the previous execution exists, data loss might occur: " + tempFile);
        }
    }

    /**
     * Deletes the temporary files in the directory that belong to this
     * processor.
     *
     * <p>A file is considered a temporary file when its name ends with
     * {@code FileSinkBuilder.TEMP_FILE_SUFFIX}; ownership is decided by
     * {@link #isOwnedTempFile(Path)}. The directory listing is opened in a
     * try-with-resources statement so that it is closed even when filtering
     * or deleting throws.
     *
     * @throws RuntimeException wrapping the {@link IOException} if the
     *         directory cannot be listed
     */
    private void abortUnfinishedTransactions() {
        try (Stream<Path> files = Files.list(this.directory)) {
            files.filter(path -> path.getFileName().toString().endsWith(FileSinkBuilder.TEMP_FILE_SUFFIX))
                    .filter(this::isOwnedTempFile)
                    .forEach(path -> Util.uncheckRun(() -> {
                        LoggingUtil.logFine(this.context.logger(), "deleting %s", path);
                        Files.delete(path);
                    }));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Tells whether the given temporary file was written by a processor
     * whose index maps to this one: the processor index parsed from the file
     * name, modulo the total parallelism, must equal this processor's global
     * index. Files whose names cannot be parsed are logged and skipped.
     */
    private boolean isOwnedTempFile(Path path) {
        // Kept in the explicit form the file already uses for its assertion flag.
        if (!$assertionsDisabled && !this.utility.usesTransactionLifecycle()) {
            throw new AssertionError();
        }
        Matcher matcher = FILE_INDEX_WITH_SEQ.matcher(path.getFileName().toString());
        if (!matcher.find() || matcher.groupCount() < 1) {
            this.context.logger().warning("file with unknown name structure found in the directory: " + path);
            return false;
        }
        try {
            int writerIndex = Integer.parseInt(matcher.group(1));
            return writerIndex % this.context.totalParallelism() == this.context.globalProcessorIndex();
        } catch (NumberFormatException e) {
            this.context.logger().warning("file with unknown name structure found in the directory: " + path, e);
            return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Opens {@code path} in append mode and returns a writer encoding with
     * the configured charset. The underlying stream is buffered and wrapped
     * in a new {@code SizeTrackingStream}, which replaces the processor's
     * current {@code sizeTrackingStream}.
     *
     * <p>An {@link IOException} is rethrown through
     * {@code ExceptionUtil.sneakyThrow}.
     */
    public Writer createWriter(Path path) {
        try {
            LoggingUtil.logFine(context.logger(), "creating %s", path);
            OutputStream appendingFileStream = new FileOutputStream(path.toFile(), true);
            OutputStream buffered = new BufferedOutputStream(appendingFileStream);
            sizeTrackingStream = new SizeTrackingStream(buffered);
            return new OutputStreamWriter(sizeTrackingStream, charset);
        } catch (IOException e) {
            throw ExceptionUtil.sneakyThrow(e);
        }
    }

    /**
     * Formats the clock's current value (epoch milliseconds) with the date
     * formatter. Callers check that the formatter is non-null beforehand.
     */
    private String currentTimeFormatted() {
        long nowMillis = clock.getAsLong();
        Instant now = Instant.ofEpochMilli(nowMillis);
        return dateFormatter.format(now);
    }

    /**
     * Creates the meta-supplier for this sink using the system clock.
     * Delegates to the seven-argument overload with {@code SYSTEM_CLOCK}.
     *
     * @param str        target directory
     * @param functionEx converts an item to the text written for it
     * @param str2       charset name
     * @param str3       date pattern for the file-name prefix, or {@code null}
     * @param j          file size at which a new file is started
     * @param z          whether the sink is asked to provide exactly-once output
     */
    public static <T> ProcessorMetaSupplier metaSupplier(@Nonnull String str, @Nonnull FunctionEx<? super T, ? extends String> functionEx, @Nonnull String str2, @Nullable String str3, long j, boolean z) {
        return metaSupplier(str, functionEx, str2, str3, j, z, SYSTEM_CLOCK);
    }

    /**
     * Creates the meta-supplier for this sink with an explicit clock.
     * The result is built by {@code ProcessorMetaSupplier.preferLocalParallelismOne}
     * from a write permission on the directory and the processor function
     * obtained from {@code SecuredFunctions.writeFileProcessorFn}.
     *
     * @param str          target directory
     * @param functionEx   converts an item to the text written for it
     * @param str2         charset name
     * @param str3         date pattern for the file-name prefix, or {@code null}
     * @param j            file size at which a new file is started
     * @param z            whether the sink is asked to provide exactly-once output
     * @param longSupplier clock returning epoch milliseconds
     */
    static <T> ProcessorMetaSupplier metaSupplier(@Nonnull String str, @Nonnull FunctionEx<? super T, ? extends String> functionEx, @Nonnull String str2, @Nullable String str3, long j, boolean z, @Nonnull LongSupplier longSupplier) {
        return ProcessorMetaSupplier.preferLocalParallelismOne(ConnectorPermission.file(str, ActionConstants.ACTION_WRITE), SecuredFunctions.writeFileProcessorFn(str, functionEx, str2, str3, j, z, longSupplier));
    }

    /*
     * Lambda deserialization hook. Selects a branch by the serialized
     * lambda's implementation method name (hash code first, then equals),
     * verifies the method kind, functional interface, and signatures, and
     * returns the matching lambda or method reference. Throws
     * IllegalArgumentException when nothing matches.
     *
     * NOTE(review): the method is marked synthetic, so this is presumably
     * compiler-generated code recovered by the decompiler rather than
     * hand-written source. As written it is not valid Java: the selector is
     * declared boolean but assigned -1/2/3/4, "case true" is repeated, and the
     * first branch uses "this" inside a static method. Confirm whether it
     * should be dropped from the source and left to the compiler.
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Branch selector; stays -1 when no known method name matches.
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1399840032:
                if (implMethodName.equals("lambda$null$4aa7b2$1")) {
                    z = false;
                    break;
                }
                break;
            case 664818922:
                if (implMethodName.equals("recoverAndCommit")) {
                    z = 3;
                    break;
                }
                break;
            case 1100115818:
                if (implMethodName.equals("newFileResource")) {
                    z = 2;
                    break;
                }
                break;
            case 1555800524:
                if (implMethodName.equals("currentTimeMillis")) {
                    z = true;
                    break;
                }
                break;
            case 1987876432:
                if (implMethodName.equals("abortUnfinishedTransactions")) {
                    z = 4;
                    break;
                }
                break;
        }
        switch (z) {
            // Selector 0: the file-deleting runnable used in abortUnfinishedTransactions().
            case false:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/jet/function/RunnableEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("runEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()V") && serializedLambda.getImplClass().equals("com/hazelcast/jet/impl/connector/WriteFileP") && serializedLambda.getImplMethodSignature().equals("(Ljava/nio/file/Path;)V")) {
                    WriteFileP writeFileP = (WriteFileP) serializedLambda.getCapturedArg(0);
                    Path path = (Path) serializedLambda.getCapturedArg(1);
                    return () -> {
                        LoggingUtil.logFine(this.context.logger(), "deleting %s", path);
                        Files.delete(path);
                    };
                }
                break;
            // Selector 1: System::currentTimeMillis as a LongSupplier.
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("java/util/function/LongSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("getAsLong") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()J") && serializedLambda.getImplClass().equals("java/lang/System") && serializedLambda.getImplMethodSignature().equals("()J")) {
                    return System::currentTimeMillis;
                }
                break;
            // Selector 2: the newFileResource method reference.
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/impl/connector/WriteFileP") && serializedLambda.getImplMethodSignature().equals("(Lcom/hazelcast/jet/impl/connector/WriteFileP$FileId;)Lcom/hazelcast/jet/impl/connector/WriteFileP$FileResource;")) {
                    WriteFileP writeFileP2 = (WriteFileP) serializedLambda.getCapturedArg(0);
                    return writeFileP2::newFileResource;
                }
                break;
            // Selector 3: the recoverAndCommit method reference.
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/ConsumerEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("acceptEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("com/hazelcast/jet/impl/connector/WriteFileP") && serializedLambda.getImplMethodSignature().equals("(Lcom/hazelcast/jet/impl/connector/WriteFileP$FileId;)V")) {
                    WriteFileP writeFileP3 = (WriteFileP) serializedLambda.getCapturedArg(0);
                    return writeFileP3::recoverAndCommit;
                }
                break;
            // Selector 4: the abortUnfinishedTransactions method reference.
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/jet/function/RunnableEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("runEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()V") && serializedLambda.getImplClass().equals("com/hazelcast/jet/impl/connector/WriteFileP") && serializedLambda.getImplMethodSignature().equals("()V")) {
                    WriteFileP writeFileP4 = (WriteFileP) serializedLambda.getCapturedArg(0);
                    return writeFileP4::abortUnfinishedTransactions;
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }

    // Initializes the class-level constants declared at the top of the class.
    static {
        $assertionsDisabled = !WriteFileP.class.desiredAssertionStatus();
        // A method reference needs a functional-interface target type. Serializable
        // alone is not one, so the serializable clock must be expressed as an
        // intersection cast rather than two nested casts.
        SYSTEM_CLOCK = (LongSupplier & Serializable) System::currentTimeMillis;
        // Matches "<processorIndex>-<sequence>" at the end of a file name, optionally
        // followed by ".tmp"; group 1 captures the processor index.
        FILE_INDEX_WITH_SEQ = Pattern.compile("(\\d+)-\\d+(\\.tmp)?$");
    }
}
