/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.sink;

import java.util.List;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.Preconditions;

@Internal
abstract class AbstractSinkWriterOperator<InputT, CommT>
extends AbstractStreamOperator<CommT>
implements OneInputStreamOperator<InputT, CommT>,
BoundedOneInput {
    private static final long serialVersionUID = 1L;
    private final Context<InputT> context;
    private Long currentWatermark;
    protected SinkWriter<InputT, CommT, ?> sinkWriter;

    AbstractSinkWriterOperator(ProcessingTimeService processingTimeService) {
        this.processingTimeService = (ProcessingTimeService)Preconditions.checkNotNull((Object)processingTimeService);
        this.context = new Context();
    }

    @Override
    public void open() throws Exception {
        super.open();
        this.currentWatermark = Long.MIN_VALUE;
        this.sinkWriter = this.createWriter();
    }

    @Override
    public void processElement(StreamRecord<InputT> element) throws Exception {
        ((Context)this.context).element = element;
        this.sinkWriter.write(element.getValue(), this.context);
    }

    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        super.prepareSnapshotPreBarrier(checkpointId);
        this.sendCommittables(this.sinkWriter.prepareCommit(false));
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        super.processWatermark(mark);
        this.currentWatermark = mark.getTimestamp();
    }

    @Override
    public void endInput() throws Exception {
        this.sendCommittables(this.sinkWriter.prepareCommit(true));
    }

    @Override
    public void close() throws Exception {
        super.close();
        this.sinkWriter.close();
    }

    protected Sink.InitContext createInitContext() {
        return new InitContextImpl(this.getRuntimeContext().getIndexOfThisSubtask(), this.processingTimeService, this.getMetricGroup());
    }

    abstract SinkWriter<InputT, CommT, ?> createWriter() throws Exception;

    private void sendCommittables(List<CommT> committables) {
        for (CommT committable : committables) {
            this.output.collect(new StreamRecord<CommT>(committable));
        }
    }

    private static class ProcessingTimerServiceImpl
    implements Sink.ProcessingTimeService {
        private final ProcessingTimeService processingTimeService;

        public ProcessingTimerServiceImpl(ProcessingTimeService processingTimeService) {
            this.processingTimeService = (ProcessingTimeService)Preconditions.checkNotNull((Object)processingTimeService);
        }

        public long getCurrentProcessingTime() {
            return this.processingTimeService.getCurrentProcessingTime();
        }

        public void registerProcessingTimer(long time, Sink.ProcessingTimeService.ProcessingTimeCallback processingTimerCallback) {
            Preconditions.checkNotNull((Object)processingTimerCallback);
            this.processingTimeService.registerTimer(time, arg_0 -> ((Sink.ProcessingTimeService.ProcessingTimeCallback)processingTimerCallback).onProcessingTime(arg_0));
        }
    }

    private static class InitContextImpl
    implements Sink.InitContext {
        private final int subtaskIdx;
        private final ProcessingTimeService processingTimeService;
        private final MetricGroup metricGroup;

        public InitContextImpl(int subtaskIdx, ProcessingTimeService processingTimeService, MetricGroup metricGroup) {
            this.subtaskIdx = subtaskIdx;
            this.processingTimeService = (ProcessingTimeService)Preconditions.checkNotNull((Object)processingTimeService);
            this.metricGroup = (MetricGroup)Preconditions.checkNotNull((Object)metricGroup);
        }

        public Sink.ProcessingTimeService getProcessingTimeService() {
            return new ProcessingTimerServiceImpl(this.processingTimeService);
        }

        public int getSubtaskId() {
            return this.subtaskIdx;
        }

        public MetricGroup metricGroup() {
            return this.metricGroup;
        }
    }

    private class Context<IN>
    implements SinkWriter.Context {
        private StreamRecord<IN> element;

        private Context() {
        }

        public long currentWatermark() {
            return AbstractSinkWriterOperator.this.currentWatermark;
        }

        public Long timestamp() {
            if (this.element.hasTimestamp()) {
                return this.element.getTimestamp();
            }
            return null;
        }
    }
}

