package org.apache.flink.streaming.api.operators;

import java.util.Objects;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.util.functions.StreamingFunctionUtils;

@PublicEvolving
/* loaded from: input_file:org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.class */
public abstract class AbstractUdfStreamOperator<OUT, F extends Function> extends AbstractStreamOperator<OUT> implements OutputTypeConfigurable<OUT> {
    private static final long serialVersionUID = 1;
    protected final F userFunction;
    private transient boolean functionsClosed = false;

    public AbstractUdfStreamOperator(F f) {
        this.userFunction = (F) Objects.requireNonNull(f);
        checkUdfCheckpointingPreconditions();
    }

    public F getUserFunction() {
        return this.userFunction;
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void setup(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Output<StreamRecord<OUT>> output) {
        super.setup(streamTask, streamConfig, output);
        FunctionUtils.setFunctionRuntimeContext(this.userFunction, getRuntimeContext());
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator
    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        super.snapshotState(stateSnapshotContext);
        StreamingFunctionUtils.snapshotFunctionState(stateSnapshotContext, getOperatorStateBackend(), this.userFunction);
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator
    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);
        StreamingFunctionUtils.restoreFunctionState(stateInitializationContext, this.userFunction);
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void open() throws Exception {
        super.open();
        FunctionUtils.openFunction(this.userFunction, new Configuration());
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void close() throws Exception {
        super.close();
        this.functionsClosed = true;
        FunctionUtils.closeFunction(this.userFunction);
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void dispose() throws Exception {
        super.dispose();
        if (this.functionsClosed) {
            return;
        }
        this.functionsClosed = true;
        FunctionUtils.closeFunction(this.userFunction);
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator
    public void notifyCheckpointComplete(long j) throws Exception {
        super.notifyCheckpointComplete(j);
        if (this.userFunction instanceof CheckpointListener) {
            this.userFunction.notifyCheckpointComplete(j);
        }
    }

    @Override // org.apache.flink.streaming.api.operators.OutputTypeConfigurable
    public void setOutputType(TypeInformation<OUT> typeInformation, ExecutionConfig executionConfig) {
        StreamingFunctionUtils.setOutputType(this.userFunction, typeInformation, executionConfig);
    }

    public Configuration getUserFunctionParameters() {
        return new Configuration();
    }

    private void checkUdfCheckpointingPreconditions() {
        if ((this.userFunction instanceof CheckpointedFunction) && (this.userFunction instanceof ListCheckpointed)) {
            throw new IllegalStateException("User functions are not allowed to implement CheckpointedFunction AND ListCheckpointed.");
        }
    }
}
