package io.confluent.ksql.execution.streams;

import com.google.common.collect.ImmutableList;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.builder.KsqlQueryBuilder;
import io.confluent.ksql.execution.context.QueryContext;
import io.confluent.ksql.execution.plan.ExecutionStepPropertiesV1;
import io.confluent.ksql.execution.plan.KStreamHolder;
import io.confluent.ksql.execution.plan.KTableHolder;
import io.confluent.ksql.execution.plan.KeySerdeFactory;
import io.confluent.ksql.execution.plan.SourceStep;
import io.confluent.ksql.execution.plan.StreamSource;
import io.confluent.ksql.execution.plan.TableSource;
import io.confluent.ksql.execution.plan.WindowedStreamSource;
import io.confluent.ksql.execution.plan.WindowedTableSource;
import io.confluent.ksql.execution.streams.timestamp.TimestampExtractionPolicyFactory;
import io.confluent.ksql.execution.timestamp.TimestampColumn;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import io.confluent.ksql.serde.FormatFactory;
import io.confluent.ksql.serde.FormatInfo;
import io.confluent.ksql.serde.SerdeFeature;
import io.confluent.ksql.serde.StaticTopicSerde;
import io.confluent.ksql.serde.WindowInfo;
import io.confluent.ksql.serde.connect.ConnectSchemas;
import io.confluent.ksql.util.KsqlConfig;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.state.KeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/execution/streams/SourceBuilder.class */
public final class SourceBuilder {
    private static final Logger LOG = LoggerFactory.getLogger(SourceBuilder.class);
    private static final Collection<?> NULL_WINDOWED_KEY_COLUMNS = Collections.unmodifiableList(Arrays.asList(null, null, null));

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/execution/streams/SourceBuilder$AddKeyAndTimestampColumns.class */
    public static class AddKeyAndTimestampColumns<K> implements ValueTransformerWithKeySupplier<K, GenericRow, GenericRow> {
        private final Function<K, Collection<?>> keyGenerator;

        AddKeyAndTimestampColumns(Function<K, Collection<?>> function) {
            this.keyGenerator = (Function) Objects.requireNonNull(function, "keyGenerator");
        }

        public ValueTransformerWithKey<K, GenericRow, GenericRow> get() {
            return new ValueTransformerWithKey<K, GenericRow, GenericRow>() { // from class: io.confluent.ksql.execution.streams.SourceBuilder.AddKeyAndTimestampColumns.1
                private ProcessorContext processorContext;

                public void init(ProcessorContext processorContext) {
                    this.processorContext = (ProcessorContext) Objects.requireNonNull(processorContext, "processorContext");
                }

                public GenericRow transform(K k, GenericRow genericRow) {
                    if (genericRow == null) {
                        return genericRow;
                    }
                    long timestamp = this.processorContext.timestamp();
                    Collection collection = (Collection) AddKeyAndTimestampColumns.this.keyGenerator.apply(k);
                    genericRow.ensureAdditionalCapacity(1 + collection.size());
                    genericRow.append(Long.valueOf(timestamp));
                    genericRow.appendAll(collection);
                    return genericRow;
                }

                public void close() {
                }

                public /* bridge */ /* synthetic */ Object transform(Object obj, Object obj2) {
                    return transform((AnonymousClass1) obj, (GenericRow) obj2);
                }
            };
        }
    }

    private SourceBuilder() {
    }

    public static KStreamHolder<Struct> buildStream(KsqlQueryBuilder ksqlQueryBuilder, StreamSource streamSource, ConsumedFactory consumedFactory) {
        PhysicalSchema physicalSchema = getPhysicalSchema(streamSource);
        return new KStreamHolder<>(buildKStream(streamSource, ksqlQueryBuilder, buildSourceConsumed(streamSource, ksqlQueryBuilder.buildKeySerde(streamSource.getFormats().getKeyFormat(), physicalSchema, streamSource.getProperties().getQueryContext()), getValueSerde(ksqlQueryBuilder, streamSource, physicalSchema), Topology.AutoOffsetReset.LATEST, ksqlQueryBuilder, consumedFactory), nonWindowedKeyGenerator(streamSource.getSourceSchema())), buildSchema(streamSource, false), KeySerdeFactory.unwindowed(ksqlQueryBuilder));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static KStreamHolder<Windowed<Struct>> buildWindowedStream(KsqlQueryBuilder ksqlQueryBuilder, WindowedStreamSource windowedStreamSource, ConsumedFactory consumedFactory) {
        PhysicalSchema physicalSchema = getPhysicalSchema(windowedStreamSource);
        Serde<GenericRow> valueSerde = getValueSerde(ksqlQueryBuilder, windowedStreamSource, physicalSchema);
        WindowInfo windowInfo = windowedStreamSource.getWindowInfo();
        return new KStreamHolder<>(buildKStream(windowedStreamSource, ksqlQueryBuilder, buildSourceConsumed(windowedStreamSource, ksqlQueryBuilder.buildKeySerde(windowedStreamSource.getFormats().getKeyFormat(), windowInfo, physicalSchema, windowedStreamSource.getProperties().getQueryContext()), valueSerde, Topology.AutoOffsetReset.LATEST, ksqlQueryBuilder, consumedFactory), windowedKeyGenerator(windowedStreamSource.getSourceSchema())), buildSchema(windowedStreamSource, true), KeySerdeFactory.windowed(ksqlQueryBuilder, windowInfo));
    }

    public static KTableHolder<Struct> buildTable(KsqlQueryBuilder ksqlQueryBuilder, TableSource tableSource, ConsumedFactory consumedFactory, MaterializedFactory materializedFactory) {
        PhysicalSchema physicalSchema = getPhysicalSchema(tableSource);
        Serde<GenericRow> valueSerde = getValueSerde(ksqlQueryBuilder, tableSource, physicalSchema);
        Serde buildKeySerde = ksqlQueryBuilder.buildKeySerde(tableSource.getFormats().getKeyFormat(), physicalSchema, tableSource.getProperties().getQueryContext());
        Consumed buildSourceConsumed = buildSourceConsumed(tableSource, buildKeySerde, valueSerde, Topology.AutoOffsetReset.EARLIEST, ksqlQueryBuilder, consumedFactory);
        String tableChangeLogOpName = tableChangeLogOpName(tableSource.getProperties());
        return KTableHolder.unmaterialized(buildKTable(tableSource, ksqlQueryBuilder, buildSourceConsumed, nonWindowedKeyGenerator(tableSource.getSourceSchema()), materializedFactory.create(buildKeySerde, valueSerde, tableChangeLogOpName), valueSerde, tableChangeLogOpName), buildSchema(tableSource, false), KeySerdeFactory.unwindowed(ksqlQueryBuilder));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static KTableHolder<Windowed<Struct>> buildWindowedTable(KsqlQueryBuilder ksqlQueryBuilder, WindowedTableSource windowedTableSource, ConsumedFactory consumedFactory, MaterializedFactory materializedFactory) {
        PhysicalSchema physicalSchema = getPhysicalSchema(windowedTableSource);
        Serde<GenericRow> valueSerde = getValueSerde(ksqlQueryBuilder, windowedTableSource, physicalSchema);
        WindowInfo windowInfo = windowedTableSource.getWindowInfo();
        Serde buildKeySerde = ksqlQueryBuilder.buildKeySerde(windowedTableSource.getFormats().getKeyFormat(), windowInfo, physicalSchema, windowedTableSource.getProperties().getQueryContext());
        Consumed buildSourceConsumed = buildSourceConsumed(windowedTableSource, buildKeySerde, valueSerde, Topology.AutoOffsetReset.EARLIEST, ksqlQueryBuilder, consumedFactory);
        String tableChangeLogOpName = tableChangeLogOpName(windowedTableSource.getProperties());
        return KTableHolder.unmaterialized(buildKTable(windowedTableSource, ksqlQueryBuilder, buildSourceConsumed, windowedKeyGenerator(windowedTableSource.getSourceSchema()), materializedFactory.create(buildKeySerde, valueSerde, tableChangeLogOpName), valueSerde, tableChangeLogOpName), buildSchema(windowedTableSource, true), KeySerdeFactory.windowed(ksqlQueryBuilder, windowInfo));
    }

    private static LogicalSchema buildSchema(SourceStep<?> sourceStep, boolean z) {
        return sourceStep.getSourceSchema().withPseudoAndKeyColsInValue(z);
    }

    private static Serde<GenericRow> getValueSerde(KsqlQueryBuilder ksqlQueryBuilder, SourceStep<?> sourceStep, PhysicalSchema physicalSchema) {
        return ksqlQueryBuilder.buildValueSerde(sourceStep.getFormats().getValueFormat(), physicalSchema, sourceStep.getProperties().getQueryContext());
    }

    private static PhysicalSchema getPhysicalSchema(SourceStep<?> sourceStep) {
        return PhysicalSchema.from(sourceStep.getSourceSchema(), sourceStep.getFormats().getKeyFeatures(), sourceStep.getFormats().getValueFeatures());
    }

    private static <K> KStream<K, GenericRow> buildKStream(SourceStep<?> sourceStep, KsqlQueryBuilder ksqlQueryBuilder, Consumed<K, GenericRow> consumed, Function<K, Collection<?>> function) {
        return ksqlQueryBuilder.getStreamsBuilder().stream(sourceStep.getTopicName(), consumed).transformValues(new AddKeyAndTimestampColumns(function), new String[0]);
    }

    private static <K> KTable<K, GenericRow> buildKTable(SourceStep<?> sourceStep, KsqlQueryBuilder ksqlQueryBuilder, Consumed<K, GenericRow> consumed, Function<K, Collection<?>> function, Materialized<K, GenericRow, KeyValueStore<Bytes, byte[]>> materialized, Serde<GenericRow> serde, String str) {
        return (!((sourceStep instanceof TableSource) && ((TableSource) sourceStep).isForceChangelog().booleanValue()) ? ksqlQueryBuilder.getStreamsBuilder().table(sourceStep.getTopicName(), consumed.withValueSerde(StaticTopicSerde.wrap(changelogTopic(ksqlQueryBuilder, str), serde, getRegisterCallback(ksqlQueryBuilder, sourceStep.getFormats().getValueFormat()))), materialized) : ksqlQueryBuilder.getStreamsBuilder().table(sourceStep.getTopicName(), consumed).mapValues(genericRow -> {
            return genericRow;
        }, materialized)).transformValues(new AddKeyAndTimestampColumns(function), new String[0]);
    }

    private static StaticTopicSerde.Callback getRegisterCallback(KsqlQueryBuilder ksqlQueryBuilder, FormatInfo formatInfo) {
        return ((!ksqlQueryBuilder.getKsqlConfig().getString("ksql.schema.registry.url").isEmpty()) && FormatFactory.fromName(formatInfo.getFormat()).supportsFeature(SerdeFeature.SCHEMA_INFERENCE)) ? new RegisterSchemaCallback(ksqlQueryBuilder.getServiceContext().getSchemaRegistryClient()) : (str, str2, bArr) -> {
        };
    }

    private static String changelogTopic(KsqlQueryBuilder ksqlQueryBuilder, String str) {
        return "_confluent-ksql-" + ksqlQueryBuilder.getKsqlConfig().getString("ksql.service.id") + ksqlQueryBuilder.getKsqlConfig().getString("ksql.persistent.prefix") + ksqlQueryBuilder.getQueryId().toString() + "-" + str + "-changelog";
    }

    private static TimestampExtractor timestampExtractor(KsqlConfig ksqlConfig, LogicalSchema logicalSchema, Optional<TimestampColumn> optional, SourceStep<?> sourceStep, KsqlQueryBuilder ksqlQueryBuilder) {
        return TimestampExtractionPolicyFactory.create(ksqlConfig, logicalSchema, optional).create(optional.map((v0) -> {
            return v0.getColumn();
        }).map(columnName -> {
            return (Column) logicalSchema.findColumn(columnName).orElseThrow(IllegalStateException::new);
        }), ksqlConfig.getBoolean("ksql.timestamp.throw.on.invalid").booleanValue(), ksqlQueryBuilder.getProcessingLogger(sourceStep.getProperties().getQueryContext()));
    }

    private static <K> Consumed<K, GenericRow> buildSourceConsumed(SourceStep<?> sourceStep, Serde<K> serde, Serde<GenericRow> serde2, Topology.AutoOffsetReset autoOffsetReset, KsqlQueryBuilder ksqlQueryBuilder, ConsumedFactory consumedFactory) {
        return consumedFactory.create(serde, serde2).withTimestampExtractor(timestampExtractor(ksqlQueryBuilder.getKsqlConfig(), sourceStep.getSourceSchema(), sourceStep.getTimestampColumn(), sourceStep, ksqlQueryBuilder)).withOffsetResetPolicy(getAutoOffsetReset(autoOffsetReset, ksqlQueryBuilder));
    }

    private static Optional<Field> getKeySchemaSingleField(LogicalSchema logicalSchema) {
        if (logicalSchema.key().isEmpty()) {
            return Optional.empty();
        }
        if (logicalSchema.key().size() != 1) {
            throw new IllegalStateException("Only single key fields are currently supported");
        }
        return Optional.of(ConnectSchemas.columnsToConnectSchema(logicalSchema.key()).fields().get(0));
    }

    private static String tableChangeLogOpName(ExecutionStepPropertiesV1 executionStepPropertiesV1) {
        List context = executionStepPropertiesV1.getQueryContext().getContext();
        QueryContext.Stacker stacker = new QueryContext.Stacker();
        Iterator it = context.subList(0, context.size() - 1).iterator();
        while (it.hasNext()) {
            stacker = stacker.push(new String[]{(String) it.next()});
        }
        return StreamsUtil.buildOpName(stacker.push(new String[]{"Reduce"}).getQueryContext());
    }

    private static Function<Windowed<Struct>, Collection<?>> windowedKeyGenerator(LogicalSchema logicalSchema) {
        Field orElseThrow = getKeySchemaSingleField(logicalSchema).orElseThrow(() -> {
            return new IllegalStateException("Windowed sources require a key column");
        });
        return windowed -> {
            if (windowed == null) {
                return NULL_WINDOWED_KEY_COLUMNS;
            }
            Window window = windowed.window();
            return Arrays.asList(((Struct) windowed.key()).get(orElseThrow), Long.valueOf(window.start()), Long.valueOf(window.end()));
        };
    }

    private static Function<Struct, Collection<?>> nonWindowedKeyGenerator(LogicalSchema logicalSchema) {
        Optional<Field> keySchemaSingleField = getKeySchemaSingleField(logicalSchema);
        return struct -> {
            return !keySchemaSingleField.isPresent() ? ImmutableList.of() : struct == null ? Collections.singletonList(null) : Collections.singletonList(struct.get((Field) keySchemaSingleField.orElseThrow(IllegalStateException::new)));
        };
    }

    private static Topology.AutoOffsetReset getAutoOffsetReset(Topology.AutoOffsetReset autoOffsetReset, KsqlQueryBuilder ksqlQueryBuilder) {
        Object obj = ksqlQueryBuilder.getKsqlConfig().getKsqlStreamConfigProps().get("auto.offset.reset");
        if (obj == null) {
            return autoOffsetReset;
        }
        try {
            return Topology.AutoOffsetReset.valueOf(obj.toString().toUpperCase());
        } catch (Exception e) {
            throw new ConfigException("auto.offset.reset", obj, "Unknown value");
        }
    }
}
