/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.functions.sink.filesystem;

import java.io.IOException;
import java.io.Serializable;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketFactory;
import org.apache.flink.streaming.api.functions.sink.filesystem.Buckets;
import org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.RowWisePartWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.Preconditions;

@PublicEvolving
public class StreamingFileSink<IN>
extends RichSinkFunction<IN>
implements CheckpointedFunction,
CheckpointListener,
ProcessingTimeCallback {
    private static final long serialVersionUID = 1L;

    /** Descriptor of the operator list state named "bucket-states"; elements are raw byte arrays. */
    private static final ListStateDescriptor<byte[]> BUCKET_STATE_DESC =
            new ListStateDescriptor<>("bucket-states", BytePrimitiveArraySerializer.INSTANCE);

    /** Descriptor of the operator list state named "max-part-counter"; elements are longs. */
    private static final ListStateDescriptor<Long> MAX_PART_COUNTER_STATE_DESC =
            new ListStateDescriptor<>("max-part-counter", LongSerializer.INSTANCE);

    /** Offset added to the current processing time whenever the next timer is registered. */
    private final long bucketCheckInterval;

    /** Serializable builder from which the transient {@link Buckets} are created in initializeState. */
    private final BucketsBuilder<IN, ?, ? extends BucketsBuilder<IN, ?, ?>> bucketsBuilder;

    // --- runtime-only members; not serialized with the function ---

    /** Created in initializeState; receives elements, timers, snapshots and checkpoint notifications. */
    private transient Buckets<IN, ?> buckets;

    /** Obtained from the runtime context in open; used to read the time and register timers. */
    private transient ProcessingTimeService processingTimeService;

    /** List state obtained for {@link #BUCKET_STATE_DESC}. */
    private transient ListState<byte[]> bucketStates;

    /** Union list state obtained for {@link #MAX_PART_COUNTER_STATE_DESC}. */
    private transient ListState<Long> maxPartCountersState;

    protected StreamingFileSink(RowFormatBuilder<IN, ?, ? extends BucketsBuilder<IN, ?, ?>> bucketsBuilder, long bucketCheckInterval) {
        Preconditions.checkArgument((bucketCheckInterval > 0L ? 1 : 0) != 0);
        this.bucketsBuilder = (BucketsBuilder)Preconditions.checkNotNull(bucketsBuilder);
        this.bucketCheckInterval = bucketCheckInterval;
    }

    protected StreamingFileSink(BulkFormatBuilder<IN, ?, ? extends BucketsBuilder<IN, ?, ?>> bucketsBuilder, long bucketCheckInterval) {
        Preconditions.checkArgument((bucketCheckInterval > 0L ? 1 : 0) != 0);
        this.bucketsBuilder = (BucketsBuilder)Preconditions.checkNotNull(bucketsBuilder);
        this.bucketCheckInterval = bucketCheckInterval;
    }

    /**
     * Starts building a row-format sink.
     *
     * <p>The returned builder is pre-configured with a new {@link DateTimeBucketAssigner}.
     *
     * @param basePath base path handed to the builder
     * @param encoder encoder handed to the builder
     * @param <IN> type of the elements written by the sink
     * @return a new row-format builder
     */
    public static <IN> DefaultRowFormatBuilder<IN> forRowFormat(Path basePath, Encoder<IN> encoder) {
        final BucketAssigner<IN, String> defaultAssigner = new DateTimeBucketAssigner<>();
        return new DefaultRowFormatBuilder<>(basePath, encoder, defaultAssigner);
    }

    /**
     * Starts building a bulk-format sink.
     *
     * <p>The returned builder is pre-configured with a new {@link DateTimeBucketAssigner}.
     *
     * @param basePath base path handed to the builder
     * @param writerFactory bulk writer factory handed to the builder
     * @param <IN> type of the elements written by the sink
     * @return a new bulk-format builder
     */
    public static <IN> DefaultBulkFormatBuilder<IN> forBulkFormat(Path basePath, BulkWriter.Factory<IN> writerFactory) {
        final BucketAssigner<IN, String> defaultAssigner = new DateTimeBucketAssigner<>();
        return new DefaultBulkFormatBuilder<>(basePath, writerFactory, defaultAssigner);
    }

    /**
     * Creates the buckets for this subtask, obtains the two operator list states and, when the
     * context reports a restore, hands both states to the buckets for initialization.
     *
     * @param context initialization context providing the operator state store
     */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        buckets = bucketsBuilder.createBuckets(getRuntimeContext().getIndexOfThisSubtask());

        final OperatorStateStore operatorState = context.getOperatorStateStore();
        bucketStates = operatorState.getListState(BUCKET_STATE_DESC);
        // The part counters are read as a union list state, unlike the bucket states above.
        maxPartCountersState = operatorState.getUnionListState(MAX_PART_COUNTER_STATE_DESC);

        if (!context.isRestored()) {
            return;
        }
        buckets.initializeState(bucketStates, maxPartCountersState);
    }

    /**
     * Forwards the completed checkpoint id to the buckets via
     * {@code commitUpToCheckpoint}.
     *
     * @param checkpointId id of the checkpoint that has completed
     */
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        buckets.commitUpToCheckpoint(checkpointId);
    }

    /**
     * Asks the buckets to snapshot into the two operator list states for the checkpoint id
     * carried by the context.
     *
     * <p>Fails the state check if either list state has not been assigned yet.
     *
     * @param context snapshot context providing the checkpoint id
     */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        final boolean statesAssigned = !(bucketStates == null || maxPartCountersState == null);
        Preconditions.checkState(statesAssigned, "sink has not been initialized");

        final long checkpointId = context.getCheckpointId();
        buckets.snapshotState(checkpointId, bucketStates, maxPartCountersState);
    }

    /**
     * Looks up the processing time service from the runtime context and registers the first
     * timer at "current processing time + bucket check interval", with this sink as callback.
     *
     * @param parameters configuration passed through to {@code super.open}
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        // The runtime context is cast to StreamingRuntimeContext to reach the time service.
        final StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext) getRuntimeContext();
        processingTimeService = runtimeContext.getProcessingTimeService();

        final long now = processingTimeService.getCurrentProcessingTime();
        processingTimeService.registerTimer(now + bucketCheckInterval, this);
    }

    /**
     * Timer callback: notifies the buckets of the current processing time and registers the
     * next timer one bucket check interval after that time.
     *
     * @param timestamp timestamp the timer fired for; not used, the current processing time is
     *     read from the service instead
     */
    @Override
    public void onProcessingTime(long timestamp) throws Exception {
        final long now = processingTimeService.getCurrentProcessingTime();
        buckets.onProcessingTime(now);

        final long nextTimer = now + bucketCheckInterval;
        processingTimeService.registerTimer(nextTimer, this);
    }

    /**
     * Hands the incoming element, together with its sink context, to the buckets.
     *
     * @param value element to write
     * @param context sink context passed along with the element
     */
    @Override
    public void invoke(IN value, SinkFunction.Context context) throws Exception {
        buckets.onElement(value, context);
    }

    /**
     * Closes the buckets if they have been created.
     *
     * <p>The buckets are only assigned in {@code initializeState}, so they may still be
     * {@code null} when this method runs; in that case nothing is done.
     */
    @Override
    public void close() throws Exception {
        if (buckets == null) {
            return;
        }
        buckets.close();
    }

    /**
     * Concrete {@link BulkFormatBuilder} with {@code String} bucket ids; this is the type
     * returned by {@code forBulkFormat}.
     *
     * @param <IN> type of the elements written by the sink
     */
    public static final class DefaultBulkFormatBuilder<IN>
    extends BulkFormatBuilder<IN, String, DefaultBulkFormatBuilder<IN>> {
        private static final long serialVersionUID = 7493169281036370228L;

        /** Private: instances are created by the enclosing class only. */
        private DefaultBulkFormatBuilder(
                Path basePath,
                BulkWriter.Factory<IN> writerFactory,
                BucketAssigner<IN, String> assigner) {
            super(basePath, writerFactory, assigner);
        }
    }

    /**
     * Builder for a {@link StreamingFileSink} whose part files are written through a
     * {@link BulkWriter.Factory}.
     *
     * <p>All {@code with*} setters mutate this builder and return it (typed as {@code T}) so
     * calls can be chained; {@link #withNewBucketAssigner} is the exception and returns a new
     * builder because it changes the bucket id type.
     *
     * @param <IN> type of the elements written by the sink
     * @param <BucketID> type of the bucket identifier produced by the bucket assigner
     * @param <T> concrete builder type returned by the chaining setters
     */
    @PublicEvolving
    public static class BulkFormatBuilder<IN, BucketID, T extends BulkFormatBuilder<IN, BucketID, T>>
    extends BucketsBuilder<IN, BucketID, T> {
        private static final long serialVersionUID = 1L;
        private long bucketCheckInterval;
        private final Path basePath;
        private BulkWriter.Factory<IN> writerFactory;
        private BucketAssigner<IN, BucketID> bucketAssigner;
        private CheckpointRollingPolicy<IN, BucketID> rollingPolicy;
        private BucketFactory<IN, BucketID> bucketFactory;
        private OutputFileConfig outputFileConfig;

        /**
         * Creates a builder with defaults: an {@link OnCheckpointRollingPolicy}, the default
         * bucket check interval, a {@link DefaultBucketFactoryImpl} and a default-built
         * {@link OutputFileConfig}.
         *
         * @param basePath base path passed to the created buckets; must not be null
         * @param writerFactory factory for the bulk writers; must not be null
         * @param assigner bucket assigner; must not be null
         */
        protected BulkFormatBuilder(Path basePath, BulkWriter.Factory<IN> writerFactory, BucketAssigner<IN, BucketID> assigner) {
            this(basePath, writerFactory, assigner, OnCheckpointRollingPolicy.build(), DEFAULT_BUCKET_CHECK_INTERVAL, new DefaultBucketFactoryImpl<>(), OutputFileConfig.builder().build());
        }

        /**
         * Creates a fully specified builder.
         *
         * @param basePath base path passed to the created buckets; must not be null
         * @param writerFactory factory for the bulk writers; must not be null
         * @param assigner bucket assigner; must not be null
         * @param policy rolling policy; must not be null
         * @param bucketCheckInterval interval handed to the sink on {@link #build()}; it is
         *     validated (must be positive) by the sink constructor, not here
         * @param bucketFactory bucket factory; must not be null
         * @param outputFileConfig output file configuration; must not be null
         */
        protected BulkFormatBuilder(Path basePath, BulkWriter.Factory<IN> writerFactory, BucketAssigner<IN, BucketID> assigner, CheckpointRollingPolicy<IN, BucketID> policy, long bucketCheckInterval, BucketFactory<IN, BucketID> bucketFactory, OutputFileConfig outputFileConfig) {
            this.basePath = Preconditions.checkNotNull(basePath);
            // Fail at construction time rather than when the first part file is opened.
            this.writerFactory = Preconditions.checkNotNull(writerFactory);
            this.bucketAssigner = Preconditions.checkNotNull(assigner);
            this.rollingPolicy = Preconditions.checkNotNull(policy);
            this.bucketCheckInterval = bucketCheckInterval;
            this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
            this.outputFileConfig = Preconditions.checkNotNull(outputFileConfig);
        }

        /** Returns the currently configured bucket check interval. */
        public long getBucketCheckInterval() {
            return bucketCheckInterval;
        }

        /**
         * Sets the bucket check interval.
         *
         * @param interval new interval; checked for positivity only when the sink is built
         * @return this builder
         */
        public T withBucketCheckInterval(long interval) {
            this.bucketCheckInterval = interval;
            return self();
        }

        /**
         * Replaces the bucket assigner, keeping the bucket id type.
         *
         * @param assigner new assigner; must not be null
         * @return this builder
         */
        public T withBucketAssigner(BucketAssigner<IN, BucketID> assigner) {
            this.bucketAssigner = Preconditions.checkNotNull(assigner);
            return self();
        }

        /**
         * Replaces the rolling policy.
         *
         * @param rollingPolicy new policy; must not be null
         * @return this builder
         */
        public T withRollingPolicy(CheckpointRollingPolicy<IN, BucketID> rollingPolicy) {
            this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy);
            return self();
        }

        /**
         * Replaces the bucket factory (test hook, package-private).
         *
         * @param factory new factory; must not be null
         * @return this builder
         */
        @VisibleForTesting
        T withBucketFactory(BucketFactory<IN, BucketID> factory) {
            this.bucketFactory = Preconditions.checkNotNull(factory);
            return self();
        }

        /**
         * Replaces the output file configuration.
         *
         * @param outputFileConfig new configuration; must not be null, consistent with the
         *     constructor's check
         * @return this builder
         */
        public T withOutputFileConfig(OutputFileConfig outputFileConfig) {
            this.outputFileConfig = Preconditions.checkNotNull(outputFileConfig);
            return self();
        }

        /**
         * Returns a NEW builder that uses the given assigner and therefore a possibly different
         * bucket id type. Base path, writer factory, rolling policy, bucket check interval and
         * output file config are copied from this builder; the bucket factory is reset to a
         * fresh {@link DefaultBucketFactoryImpl}.
         *
         * <p>Only allowed while this builder still uses a {@link DefaultBucketFactoryImpl};
         * otherwise the state check fails.
         *
         * @param assigner assigner for the new builder; must not be null
         * @param <ID> bucket id type of the new builder
         * @return a new builder keyed by {@code ID}
         */
        // The existing rolling policy is typed on the old BucketID and is carried over to the
        // new ID type, which the compiler cannot verify; hence the raw construction.
        @SuppressWarnings({"unchecked", "rawtypes"})
        public <ID> BulkFormatBuilder<IN, ID, ? extends BulkFormatBuilder<IN, ID, ?>> withNewBucketAssigner(BucketAssigner<IN, ID> assigner) {
            Preconditions.checkState(bucketFactory.getClass() == DefaultBucketFactoryImpl.class, "withNewBucketAssigner() cannot be called after specifying a customized bucket factory");
            return new BulkFormatBuilder(basePath, writerFactory, Preconditions.checkNotNull(assigner), rollingPolicy, bucketCheckInterval, new DefaultBucketFactoryImpl<>(), outputFileConfig);
        }

        /**
         * Creates the sink from this builder and its current bucket check interval.
         *
         * @return a new {@link StreamingFileSink}
         */
        public StreamingFileSink<IN> build() {
            return new StreamingFileSink<>(this, bucketCheckInterval);
        }

        /**
         * Creates the buckets for one subtask, wrapping the writer factory in a
         * {@link BulkPartWriter.Factory}.
         *
         * @param subtaskIndex index of the subtask the buckets are created for
         */
        @Override
        Buckets<IN, BucketID> createBuckets(int subtaskIndex) throws IOException {
            return new Buckets<IN, BucketID>(basePath, bucketAssigner, bucketFactory, new BulkPartWriter.Factory<>(writerFactory), rollingPolicy, subtaskIndex, outputFileConfig);
        }
    }

    /**
     * Concrete {@link RowFormatBuilder} with {@code String} bucket ids; this is the type
     * returned by {@code forRowFormat}.
     *
     * @param <IN> type of the elements written by the sink
     */
    public static final class DefaultRowFormatBuilder<IN>
    extends RowFormatBuilder<IN, String, DefaultRowFormatBuilder<IN>> {
        private static final long serialVersionUID = -8503344257202146718L;

        /** Private: instances are created by the enclosing class only. */
        private DefaultRowFormatBuilder(
                Path basePath,
                Encoder<IN> encoder,
                BucketAssigner<IN, String> bucketAssigner) {
            super(basePath, encoder, bucketAssigner);
        }
    }

    /**
     * Builder for a {@link StreamingFileSink} whose part files are written element by element
     * through an {@link Encoder}.
     *
     * <p>All {@code with*} setters mutate this builder and return it (typed as {@code T}) so
     * calls can be chained; {@link #withNewBucketAssignerAndPolicy} is the exception and
     * returns a new builder because it changes the bucket id type.
     *
     * @param <IN> type of the elements written by the sink
     * @param <BucketID> type of the bucket identifier produced by the bucket assigner
     * @param <T> concrete builder type returned by the chaining setters
     */
    @PublicEvolving
    public static class RowFormatBuilder<IN, BucketID, T extends RowFormatBuilder<IN, BucketID, T>>
    extends BucketsBuilder<IN, BucketID, T> {
        private static final long serialVersionUID = 1L;
        private long bucketCheckInterval;
        private final Path basePath;
        private Encoder<IN> encoder;
        private BucketAssigner<IN, BucketID> bucketAssigner;
        private RollingPolicy<IN, BucketID> rollingPolicy;
        private BucketFactory<IN, BucketID> bucketFactory;
        private OutputFileConfig outputFileConfig;

        /**
         * Creates a builder with defaults: a default-built {@link DefaultRollingPolicy}, the
         * default bucket check interval, a {@link DefaultBucketFactoryImpl} and a default-built
         * {@link OutputFileConfig}.
         *
         * @param basePath base path passed to the created buckets; must not be null
         * @param encoder encoder for the elements; must not be null
         * @param bucketAssigner bucket assigner; must not be null
         */
        protected RowFormatBuilder(Path basePath, Encoder<IN> encoder, BucketAssigner<IN, BucketID> bucketAssigner) {
            this(basePath, encoder, bucketAssigner, DefaultRollingPolicy.builder().build(), DEFAULT_BUCKET_CHECK_INTERVAL, new DefaultBucketFactoryImpl<>(), OutputFileConfig.builder().build());
        }

        /**
         * Creates a fully specified builder.
         *
         * @param basePath base path passed to the created buckets; must not be null
         * @param encoder encoder for the elements; must not be null
         * @param assigner bucket assigner; must not be null
         * @param policy rolling policy; must not be null
         * @param bucketCheckInterval interval handed to the sink on {@link #build()}; it is
         *     validated (must be positive) by the sink constructor, not here
         * @param bucketFactory bucket factory; must not be null
         * @param outputFileConfig output file configuration; must not be null
         */
        protected RowFormatBuilder(Path basePath, Encoder<IN> encoder, BucketAssigner<IN, BucketID> assigner, RollingPolicy<IN, BucketID> policy, long bucketCheckInterval, BucketFactory<IN, BucketID> bucketFactory, OutputFileConfig outputFileConfig) {
            this.basePath = Preconditions.checkNotNull(basePath);
            this.encoder = Preconditions.checkNotNull(encoder);
            this.bucketAssigner = Preconditions.checkNotNull(assigner);
            this.rollingPolicy = Preconditions.checkNotNull(policy);
            this.bucketCheckInterval = bucketCheckInterval;
            this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
            this.outputFileConfig = Preconditions.checkNotNull(outputFileConfig);
        }

        /** Returns the currently configured bucket check interval. */
        public long getBucketCheckInterval() {
            return bucketCheckInterval;
        }

        /**
         * Sets the bucket check interval.
         *
         * @param interval new interval; checked for positivity only when the sink is built
         * @return this builder
         */
        public T withBucketCheckInterval(long interval) {
            this.bucketCheckInterval = interval;
            return self();
        }

        /**
         * Replaces the bucket assigner, keeping the bucket id type.
         *
         * @param assigner new assigner; must not be null
         * @return this builder
         */
        public T withBucketAssigner(BucketAssigner<IN, BucketID> assigner) {
            this.bucketAssigner = Preconditions.checkNotNull(assigner);
            return self();
        }

        /**
         * Replaces the rolling policy.
         *
         * @param policy new policy; must not be null
         * @return this builder
         */
        public T withRollingPolicy(RollingPolicy<IN, BucketID> policy) {
            this.rollingPolicy = Preconditions.checkNotNull(policy);
            return self();
        }

        /**
         * Replaces the output file configuration.
         *
         * @param outputFileConfig new configuration; must not be null, consistent with the
         *     constructor's check
         * @return this builder
         */
        public T withOutputFileConfig(OutputFileConfig outputFileConfig) {
            this.outputFileConfig = Preconditions.checkNotNull(outputFileConfig);
            return self();
        }

        /**
         * Returns a NEW builder that uses the given assigner and rolling policy and therefore a
         * possibly different bucket id type. Base path, encoder, bucket check interval and
         * output file config are copied from this builder; the bucket factory is reset to a
         * fresh {@link DefaultBucketFactoryImpl}.
         *
         * <p>Only allowed while this builder still uses a {@link DefaultBucketFactoryImpl};
         * otherwise the state check fails.
         *
         * @param assigner assigner for the new builder; must not be null
         * @param policy rolling policy for the new builder; must not be null
         * @param <ID> bucket id type of the new builder
         * @return a new builder keyed by {@code ID}
         */
        // Raw construction: the self-referential third type argument of the new builder
        // cannot be spelled out here.
        @SuppressWarnings({"unchecked", "rawtypes"})
        public <ID> RowFormatBuilder<IN, ID, ? extends RowFormatBuilder<IN, ID, ?>> withNewBucketAssignerAndPolicy(BucketAssigner<IN, ID> assigner, RollingPolicy<IN, ID> policy) {
            Preconditions.checkState(bucketFactory.getClass() == DefaultBucketFactoryImpl.class, "withNewBucketAssignerAndPolicy() cannot be called after specifying a customized bucket factory");
            return new RowFormatBuilder(basePath, encoder, Preconditions.checkNotNull(assigner), Preconditions.checkNotNull(policy), bucketCheckInterval, new DefaultBucketFactoryImpl<>(), outputFileConfig);
        }

        /**
         * Creates the sink from this builder and its current bucket check interval.
         *
         * @return a new {@link StreamingFileSink}
         */
        public StreamingFileSink<IN> build() {
            return new StreamingFileSink<>(this, bucketCheckInterval);
        }

        /**
         * Replaces the bucket factory (test hook, package-private).
         *
         * @param factory new factory; must not be null
         * @return this builder
         */
        @VisibleForTesting
        T withBucketFactory(BucketFactory<IN, BucketID> factory) {
            this.bucketFactory = Preconditions.checkNotNull(factory);
            return self();
        }

        /**
         * Creates the buckets for one subtask, wrapping the encoder in a
         * {@link RowWisePartWriter.Factory}.
         *
         * @param subtaskIndex index of the subtask the buckets are created for
         */
        @Override
        Buckets<IN, BucketID> createBuckets(int subtaskIndex) throws IOException {
            return new Buckets<IN, BucketID>(basePath, bucketAssigner, bucketFactory, new RowWisePartWriter.Factory<>(encoder), rollingPolicy, subtaskIndex, outputFileConfig);
        }
    }

    /**
     * Serializable base of the sink builders. It provides the self-typed {@link #self()} helper
     * used for chaining and declares the factory method that creates the buckets.
     *
     * @param <IN> type of the elements written by the sink
     * @param <BucketID> type of the bucket identifier
     * @param <T> concrete builder type returned by {@link #self()}
     */
    private abstract static class BucketsBuilder<IN, BucketID, T extends BucketsBuilder<IN, BucketID, T>>
    implements Serializable {
        private static final long serialVersionUID = 1L;

        /** Default bucket check interval (60000; presumably milliseconds - confirm with callers). */
        protected static final long DEFAULT_BUCKET_CHECK_INTERVAL = 60L * 1000L;

        /** Private: only the nested builders of the enclosing class may extend this type. */
        private BucketsBuilder() {
        }

        /**
         * Returns this builder typed as the concrete builder type.
         *
         * @return {@code this}, cast to {@code T}
         */
        @SuppressWarnings("unchecked")
        protected T self() {
            final T typedSelf = (T) this;
            return typedSelf;
        }

        /**
         * Creates the buckets for the given subtask.
         *
         * @param subtaskIndex index of the subtask the buckets are created for
         * @return the newly created buckets
         * @throws IOException declared for implementations that fail while creating the buckets
         */
        abstract Buckets<IN, BucketID> createBuckets(int subtaskIndex) throws IOException;
    }
}

