/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.ksql.execution.streams;

import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.context.QueryContext;
import io.confluent.ksql.execution.plan.ExecutionKeyFactory;
import io.confluent.ksql.execution.plan.Formats;
import io.confluent.ksql.execution.runtime.RuntimeBuildContext;
import io.confluent.ksql.execution.streams.StreamsUtil;
import io.confluent.ksql.execution.streams.timestamp.KsqlTimestampExtractor;
import io.confluent.ksql.execution.streams.timestamp.TimestampExtractionPolicy;
import io.confluent.ksql.execution.streams.timestamp.TimestampExtractionPolicyFactory;
import io.confluent.ksql.execution.timestamp.TimestampColumn;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.logging.processing.RecordProcessingError;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import io.confluent.ksql.serde.SerdeFeatures;
import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.To;

public final class SinkBuilder {
    private static final String TIMESTAMP_TRANSFORM_NAME = "ApplyTimestampTransform-";

    private SinkBuilder() {
    }

    public static <K> void build(LogicalSchema schema, Formats formats, Optional<TimestampColumn> timestampColumn, String topicName, KStream<K, GenericRow> stream, ExecutionKeyFactory<K> executionKeyFactory, QueryContext queryContext, RuntimeBuildContext buildContext) {
        PhysicalSchema physicalSchema = PhysicalSchema.from((LogicalSchema)schema, (SerdeFeatures)formats.getKeyFeatures(), (SerdeFeatures)formats.getValueFeatures());
        Serde keySerde = executionKeyFactory.buildKeySerde(formats.getKeyFormat(), physicalSchema, queryContext);
        Serde valueSerde = buildContext.buildValueSerde(formats.getValueFormat(), physicalSchema, queryContext);
        Optional<TransformTimestamp<K>> tsTransformer = SinkBuilder.timestampTransformer(buildContext, queryContext, schema, timestampColumn);
        KStream<K, GenericRow> transformed = tsTransformer.map(t -> stream.transform((TransformerSupplier)t, Named.as((String)(TIMESTAMP_TRANSFORM_NAME + StreamsUtil.buildOpName(queryContext))), new String[0])).orElse(stream);
        transformed.to(topicName, Produced.with((Serde)keySerde, (Serde)valueSerde));
    }

    private static <K> Optional<TransformTimestamp<K>> timestampTransformer(RuntimeBuildContext buildContext, QueryContext queryContext, LogicalSchema sourceSchema, Optional<TimestampColumn> timestampColumn) {
        if (!timestampColumn.isPresent()) {
            return Optional.empty();
        }
        TimestampExtractionPolicy timestampPolicy = TimestampExtractionPolicyFactory.create(buildContext.getKsqlConfig(), sourceSchema, timestampColumn);
        return timestampColumn.map(TimestampColumn::getColumn).map(c -> (Column)sourceSchema.findColumn(c).orElseThrow(IllegalStateException::new)).map(c -> timestampPolicy.create(Optional.of(c))).map(te -> new TransformTimestamp((KsqlTimestampExtractor)te, buildContext.getProcessingLogger(queryContext)));
    }

    static class TransformTimestamp<K>
    implements TransformerSupplier<K, GenericRow, KeyValue<K, GenericRow>> {
        private final KsqlTimestampExtractor timestampExtractor;
        private final ProcessingLogger processingLogger;

        TransformTimestamp(KsqlTimestampExtractor timestampExtractor, ProcessingLogger processingLogger) {
            this.timestampExtractor = Objects.requireNonNull(timestampExtractor, "timestampExtractor");
            this.processingLogger = Objects.requireNonNull(processingLogger, "processingLogger");
        }

        public Transformer<K, GenericRow, KeyValue<K, GenericRow>> get() {
            return new Transformer<K, GenericRow, KeyValue<K, GenericRow>>(){
                private ProcessorContext processorContext;

                public void init(ProcessorContext processorContext) {
                    this.processorContext = Objects.requireNonNull(processorContext, "processorContext");
                }

                public KeyValue<K, GenericRow> transform(K key, GenericRow row) {
                    try {
                        this.processorContext.forward(key, (Object)row, To.all().withTimestamp(timestampExtractor.extract(key, row)));
                    }
                    catch (Exception e) {
                        processingLogger.error(RecordProcessingError.recordProcessingError((String)("Error writing row with extracted timestamp: " + e.getMessage()), (Throwable)e, (GenericRow)row));
                    }
                    return null;
                }

                public void close() {
                }
            };
        }
    }
}

