/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.ksql.execution.streams;

import io.confluent.ksql.GenericKey;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.context.QueryContext;
import io.confluent.ksql.execution.plan.ExecutionStepPropertiesV1;
import io.confluent.ksql.execution.plan.SourceStep;
import io.confluent.ksql.execution.runtime.RuntimeBuildContext;
import io.confluent.ksql.execution.streams.ConsumedFactory;
import io.confluent.ksql.execution.streams.RegisterSchemaCallback;
import io.confluent.ksql.execution.streams.StreamsUtil;
import io.confluent.ksql.execution.streams.timestamp.TimestampExtractionPolicy;
import io.confluent.ksql.execution.streams.timestamp.TimestampExtractionPolicyFactory;
import io.confluent.ksql.execution.timestamp.TimestampColumn;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import io.confluent.ksql.schema.ksql.SystemColumns;
import io.confluent.ksql.serde.FormatFactory;
import io.confluent.ksql.serde.FormatInfo;
import io.confluent.ksql.serde.SerdeFeature;
import io.confluent.ksql.serde.SerdeFeatures;
import io.confluent.ksql.serde.StaticTopicSerde;
import io.confluent.ksql.serde.WindowInfo;
import io.confluent.ksql.util.KsqlConfig;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TimestampExtractor;

final class SourceBuilderUtils {
    private static final String MATERIALIZE_OP_NAME = "Materialized";
    private static final Collection<?> NULL_WINDOWED_KEY_COLUMNS = Collections.unmodifiableList(Arrays.asList(null, null, null));

    private SourceBuilderUtils() {
    }

    static LogicalSchema buildSchema(SourceStep<?> source, boolean windowed) {
        return source.getSourceSchema().withPseudoAndKeyColsInValue(windowed, source.getPseudoColumnVersion());
    }

    static Serde<GenericRow> getValueSerde(RuntimeBuildContext buildContext, SourceStep<?> streamSource, PhysicalSchema physicalSchema, QueryContext queryContext) {
        return buildContext.buildValueSerde(streamSource.getFormats().getValueFormat(), physicalSchema, queryContext);
    }

    static Serde<GenericRow> getValueSerde(RuntimeBuildContext buildContext, SourceStep<?> streamSource, PhysicalSchema physicalSchema) {
        return SourceBuilderUtils.getValueSerde(buildContext, streamSource, physicalSchema, streamSource.getProperties().getQueryContext());
    }

    static Serde<GenericKey> getKeySerde(SourceStep<?> step, PhysicalSchema physicalSchema, RuntimeBuildContext buildContext, QueryContext queryContext) {
        return buildContext.buildKeySerde(step.getFormats().getKeyFormat(), physicalSchema, queryContext);
    }

    static Serde<GenericKey> getKeySerde(SourceStep<?> step, PhysicalSchema physicalSchema, RuntimeBuildContext buildContext) {
        return SourceBuilderUtils.getKeySerde(step, physicalSchema, buildContext, step.getProperties().getQueryContext());
    }

    static Serde<Windowed<GenericKey>> getWindowedKeySerde(SourceStep<?> step, PhysicalSchema physicalSchema, RuntimeBuildContext buildContext, WindowInfo windowInfo, QueryContext queryContext) {
        return buildContext.buildKeySerde(step.getFormats().getKeyFormat(), windowInfo, physicalSchema, queryContext);
    }

    static Serde<Windowed<GenericKey>> getWindowedKeySerde(SourceStep<?> step, PhysicalSchema physicalSchema, RuntimeBuildContext buildContext, WindowInfo windowInfo) {
        return SourceBuilderUtils.getWindowedKeySerde(step, physicalSchema, buildContext, windowInfo, step.getProperties().getQueryContext());
    }

    static PhysicalSchema getPhysicalSchema(SourceStep<?> streamSource) {
        return PhysicalSchema.from((LogicalSchema)streamSource.getSourceSchema(), (SerdeFeatures)streamSource.getFormats().getKeyFeatures(), (SerdeFeatures)streamSource.getFormats().getValueFeatures());
    }

    static StaticTopicSerde.Callback getRegisterCallback(RuntimeBuildContext buildContext, FormatInfo valueFormat) {
        boolean schemaRegistryEnabled = !buildContext.getKsqlConfig().getString("ksql.schema.registry.url").isEmpty();
        boolean useSR = FormatFactory.fromName((String)valueFormat.getFormat()).supportsFeature(SerdeFeature.SCHEMA_INFERENCE);
        if (!schemaRegistryEnabled || !useSR) {
            return (t1, t2, data) -> {};
        }
        return new RegisterSchemaCallback(buildContext.getServiceContext().getSchemaRegistryClient());
    }

    static String changelogTopic(RuntimeBuildContext buildContext, String stateStoreName) {
        return buildContext.getApplicationId() + "-" + stateStoreName + "-changelog";
    }

    static TimestampExtractor timestampExtractor(KsqlConfig ksqlConfig, LogicalSchema sourceSchema, Optional<TimestampColumn> timestampColumn, SourceStep<?> streamSource, RuntimeBuildContext buildContext) {
        TimestampExtractionPolicy timestampPolicy = TimestampExtractionPolicyFactory.create(ksqlConfig, sourceSchema, timestampColumn);
        Optional<Column> tsColumn = timestampColumn.map(TimestampColumn::getColumn).map(c -> (Column)sourceSchema.findColumn(c).orElseThrow(IllegalStateException::new));
        QueryContext queryContext = streamSource.getProperties().getQueryContext();
        return timestampPolicy.create(tsColumn, ksqlConfig.getBoolean("ksql.timestamp.throw.on.invalid"), buildContext.getProcessingLogger(queryContext));
    }

    static <K> Consumed<K, GenericRow> buildSourceConsumed(SourceStep<?> streamSource, Serde<K> keySerde, Serde<GenericRow> valueSerde, Topology.AutoOffsetReset defaultReset, RuntimeBuildContext buildContext, ConsumedFactory consumedFactory) {
        TimestampExtractor timestampExtractor = SourceBuilderUtils.timestampExtractor(buildContext.getKsqlConfig(), streamSource.getSourceSchema(), streamSource.getTimestampColumn(), streamSource, buildContext);
        Consumed consumed = consumedFactory.create(keySerde, valueSerde).withTimestampExtractor(timestampExtractor);
        return consumed.withOffsetResetPolicy(SourceBuilderUtils.getAutoOffsetReset(defaultReset, buildContext));
    }

    static String tableChangeLogOpName(ExecutionStepPropertiesV1 props) {
        List parts = props.getQueryContext().getContext();
        QueryContext.Stacker stacker = new QueryContext.Stacker();
        for (String part : parts.subList(0, parts.size() - 1)) {
            stacker = stacker.push(new String[]{part});
        }
        return StreamsUtil.buildOpName(stacker.push(new String[]{"Reduce"}).getQueryContext());
    }

    static Function<Windowed<GenericKey>, Collection<?>> windowedKeyGenerator(LogicalSchema schema) {
        if (schema.key().isEmpty()) {
            throw new IllegalStateException("Windowed sources require a key column");
        }
        return windowedKey -> {
            if (windowedKey == null) {
                return NULL_WINDOWED_KEY_COLUMNS;
            }
            Window window = windowedKey.window();
            GenericKey key = (GenericKey)windowedKey.key();
            ArrayList<Long> keys = new ArrayList<Long>(schema.key().size() + 2);
            keys.addAll(key.values());
            keys.add(window.start());
            keys.add(window.end());
            return Collections.unmodifiableCollection(keys);
        };
    }

    static Topology.AutoOffsetReset getAutoOffsetReset(Topology.AutoOffsetReset defaultValue, RuntimeBuildContext buildContext) {
        Object offestReset = buildContext.getKsqlConfig().getKsqlStreamConfigProps().get("auto.offset.reset");
        if (offestReset == null) {
            return defaultValue;
        }
        try {
            return Topology.AutoOffsetReset.valueOf((String)offestReset.toString().toUpperCase());
        }
        catch (Exception e) {
            throw new ConfigException("auto.offset.reset", offestReset, "Unknown value");
        }
    }

    static QueryContext addMaterializedContext(SourceStep<?> step) {
        return QueryContext.Stacker.of((QueryContext)step.getProperties().getQueryContext()).push(new String[]{MATERIALIZE_OP_NAME}).getQueryContext();
    }

    static List<Struct> createHeaderData(Headers headers) {
        return Arrays.stream(headers.toArray()).map(header -> new Struct(SchemaBuilder.struct().field("KEY", Schema.OPTIONAL_STRING_SCHEMA).field("VALUE", Schema.OPTIONAL_BYTES_SCHEMA).optional().build()).put("KEY", (Object)header.key()).put("VALUE", (Object)ByteBuffer.wrap(header.value()))).collect(Collectors.toList());
    }

    static ByteBuffer extractHeader(Headers headers, String key) {
        Header header = headers.lastHeader(key);
        return header == null ? null : ByteBuffer.wrap(header.value());
    }

    static class AddKeyAndPseudoColumns<K>
    implements ValueTransformerWithKeySupplier<K, GenericRow, GenericRow> {
        private final Function<K, Collection<?>> keyGenerator;
        private final int pseudoColumnVersion;
        private final List<Column> headerColumns;

        AddKeyAndPseudoColumns(Function<K, Collection<?>> keyGenerator, int pseudoColumnVersion, List<Column> headerColumns) {
            this.keyGenerator = Objects.requireNonNull(keyGenerator, "keyGenerator");
            this.pseudoColumnVersion = pseudoColumnVersion;
            this.headerColumns = headerColumns;
        }

        public ValueTransformerWithKey<K, GenericRow, GenericRow> get() {
            return new ValueTransformerWithKey<K, GenericRow, GenericRow>(){
                private ProcessorContext processorContext;

                public void init(ProcessorContext processorContext) {
                    this.processorContext = Objects.requireNonNull(processorContext, "processorContext");
                }

                public GenericRow transform(K key, GenericRow row) {
                    if (row == null) {
                        return row;
                    }
                    Collection keyColumns = (Collection)keyGenerator.apply(key);
                    int numPseudoColumns = SystemColumns.pseudoColumnNames((int)pseudoColumnVersion).size();
                    row.ensureAdditionalCapacity(numPseudoColumns + keyColumns.size() + headerColumns.size());
                    for (Column col : headerColumns) {
                        if (col.headerKey().isPresent()) {
                            row.append((Object)SourceBuilderUtils.extractHeader(this.processorContext.headers(), (String)col.headerKey().get()));
                            continue;
                        }
                        row.append(SourceBuilderUtils.createHeaderData(this.processorContext.headers()));
                    }
                    if (pseudoColumnVersion >= 0) {
                        long timestamp = this.processorContext.timestamp();
                        row.append((Object)timestamp);
                    }
                    if (pseudoColumnVersion >= 1) {
                        int partition = this.processorContext.partition();
                        long offset = this.processorContext.offset();
                        row.append((Object)partition);
                        row.append((Object)offset);
                    }
                    row.appendAll(keyColumns);
                    return row;
                }

                public void close() {
                }
            };
        }
    }
}

