package io.confluent.ksql.execution.streams;

import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.context.QueryContext;
import io.confluent.ksql.execution.plan.ExecutionKeyFactory;
import io.confluent.ksql.execution.plan.Formats;
import io.confluent.ksql.execution.runtime.RuntimeBuildContext;
import io.confluent.ksql.execution.streams.timestamp.KsqlTimestampExtractor;
import io.confluent.ksql.execution.streams.timestamp.TimestampExtractionPolicy;
import io.confluent.ksql.execution.streams.timestamp.TimestampExtractionPolicyFactory;
import io.confluent.ksql.execution.timestamp.TimestampColumn;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.logging.processing.RecordProcessingError;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.To;

/* loaded from: input_file:io/confluent/ksql/execution/streams/SinkBuilder.class */
/**
 * Static helper that attaches the terminal "write to topic" step to a stream of
 * {@link GenericRow} values, optionally preceded by a transform that re-stamps each
 * record with a timestamp extracted from the row.
 *
 * <p>This class is not instantiable; use {@link #build}.
 */
public final class SinkBuilder {
    /** Prefix of the processor name given to the optional timestamp transform step. */
    private static final String TIMESTAMP_TRANSFORM_NAME = "ApplyTimestampTransform-";

    private SinkBuilder() {
    }

    /**
     * Writes {@code kStream} to the topic {@code str}, first applying a timestamp transform
     * when a timestamp column is supplied.
     *
     * @param logicalSchema       schema used to derive the physical schema and to look up the
     *                            timestamp column
     * @param formats             key/value formats and serde features for the sink
     * @param optional            the timestamp column, if any; when empty no transform is added
     * @param str                 name of the topic passed to {@code KStream.to}
     * @param kStream             the stream to write
     * @param executionKeyFactory factory used to build the key serde
     * @param queryContext        context used for naming the transform step, building serdes and
     *                            obtaining the processing logger
     * @param runtimeBuildContext provides the KSQL config, value serde and processing logger
     * @param <K>                 key type of the stream
     * @throws IllegalStateException if a timestamp column is supplied but cannot be found in
     *                               {@code logicalSchema}
     */
    public static <K> void build(LogicalSchema logicalSchema, Formats formats, Optional<TimestampColumn> optional, String str, KStream<K, GenericRow> kStream, ExecutionKeyFactory<K> executionKeyFactory, QueryContext queryContext, RuntimeBuildContext runtimeBuildContext) {
        final PhysicalSchema physicalSchema = PhysicalSchema.from(logicalSchema, formats.getKeyFeatures(), formats.getValueFeatures());

        // Bind the result to a typed local so that K is inferred as the stream's key type
        // rather than Object; this is what lets the rest of the method avoid raw types.
        final Optional<TransformTimestamp<K>> tsTransformer = timestampTransformer(runtimeBuildContext, queryContext, logicalSchema, optional);

        // The operation name is only computed when a transform is actually added.
        final KStream<K, GenericRow> sinkStream = tsTransformer
                .map(transformer -> kStream.transform(transformer, Named.as(TIMESTAMP_TRANSFORM_NAME + StreamsUtil.buildOpName(queryContext))))
                .orElse(kStream);

        sinkStream.to(str, Produced.with(executionKeyFactory.buildKeySerde(formats.getKeyFormat(), physicalSchema, queryContext), runtimeBuildContext.buildValueSerde(formats.getValueFormat(), physicalSchema, queryContext)));
    }

    /**
     * Creates the timestamp transform for the given timestamp column.
     *
     * @return the transform supplier, or an empty {@code Optional} when {@code optional} is empty
     *         (or when the extraction policy yields a {@code null} extractor)
     * @throws IllegalStateException if the timestamp column is not present in {@code logicalSchema}
     */
    private static <K> Optional<TransformTimestamp<K>> timestampTransformer(RuntimeBuildContext runtimeBuildContext, QueryContext queryContext, LogicalSchema logicalSchema, Optional<TimestampColumn> optional) {
        // Guard first so the policy factory is never invoked without a timestamp column.
        if (!optional.isPresent()) {
            return Optional.empty();
        }
        final TimestampExtractionPolicy policy = TimestampExtractionPolicyFactory.create(runtimeBuildContext.getKsqlConfig(), logicalSchema, optional);
        return optional
                .map(TimestampColumn::getColumn)
                .map(columnName -> logicalSchema.findColumn(columnName).orElseThrow(IllegalStateException::new))
                .map(column -> policy.create(Optional.of(column)))
                .map(extractor -> new TransformTimestamp<K>(extractor, runtimeBuildContext.getProcessingLogger(queryContext)));
    }

    /**
     * Supplies transformers that forward every incoming record downstream with its timestamp
     * replaced by the value returned from a {@link KsqlTimestampExtractor}.
     *
     * @param <K> key type of the records being transformed
     */
    public static class TransformTimestamp<K> implements TransformerSupplier<K, GenericRow, KeyValue<K, GenericRow>> {
        private final KsqlTimestampExtractor timestampExtractor;
        private final ProcessingLogger processingLogger;

        /**
         * @param ksqlTimestampExtractor extractor used to compute each record's new timestamp
         * @param processingLogger       logger that receives per-record failures
         * @throws NullPointerException if either argument is {@code null}
         */
        TransformTimestamp(KsqlTimestampExtractor ksqlTimestampExtractor, ProcessingLogger processingLogger) {
            this.timestampExtractor = Objects.requireNonNull(ksqlTimestampExtractor, "timestampExtractor");
            this.processingLogger = Objects.requireNonNull(processingLogger, "processingLogger");
        }

        /**
         * Returns a new transformer instance on every call.
         *
         * <p>The transformer emits records itself via {@code ProcessorContext.forward} and always
         * returns {@code null} from {@code transform}. If extracting the timestamp or forwarding
         * the record throws, the error is reported to the processing logger together with the
         * offending row and the exception is not propagated.
         */
        @Override
        public Transformer<K, GenericRow, KeyValue<K, GenericRow>> get() {
            return new Transformer<K, GenericRow, KeyValue<K, GenericRow>>() {
                private ProcessorContext processorContext;

                @Override
                public void init(ProcessorContext processorContext) {
                    this.processorContext = Objects.requireNonNull(processorContext, "processorContext");
                }

                @Override
                public KeyValue<K, GenericRow> transform(K k, GenericRow genericRow) {
                    try {
                        final long timestamp = timestampExtractor.extract(k, genericRow);
                        processorContext.forward(k, genericRow, To.all().withTimestamp(timestamp));
                    } catch (Exception e) {
                        // Deliberate best-effort: a bad record is logged, not allowed to fail the stream.
                        processingLogger.error(RecordProcessingError.recordProcessingError("Error writing row with extracted timestamp: " + e.getMessage(), e, genericRow));
                    }
                    // The record (if any) has already been emitted through forward().
                    return null;
                }

                @Override
                public void close() {
                }
            };
        }

        /**
         * Alias of {@link #get()} kept for callers that still use the decompiler-assigned name.
         *
         * @deprecated use {@link #get()}
         */
        @Deprecated
        public Transformer<K, GenericRow, KeyValue<K, GenericRow>> m3get() {
            return get();
        }
    }
}
