package io.confluent.ksql.execution.streams;

import io.confluent.ksql.GenericKey;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.context.QueryContext;
import io.confluent.ksql.execution.plan.ExecutionStepPropertiesV1;
import io.confluent.ksql.execution.plan.SourceStep;
import io.confluent.ksql.execution.runtime.RuntimeBuildContext;
import io.confluent.ksql.execution.streams.timestamp.TimestampExtractionPolicyFactory;
import io.confluent.ksql.execution.timestamp.TimestampColumn;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import io.confluent.ksql.schema.ksql.SystemColumns;
import io.confluent.ksql.serde.FormatFactory;
import io.confluent.ksql.serde.FormatInfo;
import io.confluent.ksql.serde.SerdeFeature;
import io.confluent.ksql.serde.StaticTopicSerde;
import io.confluent.ksql.serde.WindowInfo;
import io.confluent.ksql.util.KsqlConfig;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TimestampExtractor;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/confluent/ksql/execution/streams/SourceBuilderUtils.class */
public final class SourceBuilderUtils {
    // Context segment pushed by addMaterializedContext onto a source step's query context.
    private static final String MATERIALIZE_OP_NAME = "Materialized";
    // Immutable list of three nulls handed back by the function from windowedKeyGenerator when the
    // windowed key itself is null.
    // NOTE(review): presumably one slot each for a single key column, window start and window end — confirm.
    private static final Collection<?> NULL_WINDOWED_KEY_COLUMNS = Collections.unmodifiableList(Arrays.asList(null, null, null));

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/confluent/ksql/execution/streams/SourceBuilderUtils$AddKeyAndPseudoColumns.class */
    public static class AddKeyAndPseudoColumns<K> implements ValueTransformerWithKeySupplier<K, GenericRow, GenericRow> {
        private final Function<K, Collection<?>> keyGenerator;
        private final int pseudoColumnVersion;
        private final List<Column> headerColumns;

        /* JADX INFO: Access modifiers changed from: package-private */
        public AddKeyAndPseudoColumns(Function<K, Collection<?>> function, int i, List<Column> list) {
            this.keyGenerator = (Function) Objects.requireNonNull(function, "keyGenerator");
            this.pseudoColumnVersion = i;
            this.headerColumns = list;
        }

        /* renamed from: get, reason: merged with bridge method [inline-methods] */
        public ValueTransformerWithKey<K, GenericRow, GenericRow> m9get() {
            return new ValueTransformerWithKey<K, GenericRow, GenericRow>() { // from class: io.confluent.ksql.execution.streams.SourceBuilderUtils.AddKeyAndPseudoColumns.1
                private ProcessorContext processorContext;

                public void init(ProcessorContext processorContext) {
                    this.processorContext = (ProcessorContext) Objects.requireNonNull(processorContext, "processorContext");
                }

                public GenericRow transform(K k, GenericRow genericRow) {
                    if (genericRow == null) {
                        return genericRow;
                    }
                    Collection collection = (Collection) AddKeyAndPseudoColumns.this.keyGenerator.apply(k);
                    genericRow.ensureAdditionalCapacity(SystemColumns.pseudoColumnNames(AddKeyAndPseudoColumns.this.pseudoColumnVersion).size() + collection.size() + AddKeyAndPseudoColumns.this.headerColumns.size());
                    for (Column column : AddKeyAndPseudoColumns.this.headerColumns) {
                        if (column.headerKey().isPresent()) {
                            genericRow.append(SourceBuilderUtils.extractHeader(this.processorContext.headers(), (String) column.headerKey().get()));
                        } else {
                            genericRow.append(SourceBuilderUtils.createHeaderData(this.processorContext.headers()));
                        }
                    }
                    if (AddKeyAndPseudoColumns.this.pseudoColumnVersion >= 0) {
                        genericRow.append(Long.valueOf(this.processorContext.timestamp()));
                    }
                    if (AddKeyAndPseudoColumns.this.pseudoColumnVersion >= 1) {
                        int partition = this.processorContext.partition();
                        long offset = this.processorContext.offset();
                        genericRow.append(Integer.valueOf(partition));
                        genericRow.append(Long.valueOf(offset));
                    }
                    genericRow.appendAll(collection);
                    return genericRow;
                }

                public void close() {
                }

                public /* bridge */ /* synthetic */ Object transform(Object obj, Object obj2) {
                    return transform((AnonymousClass1) obj, (GenericRow) obj2);
                }
            };
        }
    }

    // Private so this class, whose members are all static, cannot be instantiated from outside.
    private SourceBuilderUtils() {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Derives the schema of a source step with pseudo and key columns copied into the value.
     *
     * @param sourceStep step providing the source schema and the pseudo column version
     * @param z flag forwarded as the first argument of
     *     {@code LogicalSchema.withPseudoAndKeyColsInValue}
     *     (NOTE(review): presumably whether the source is windowed — confirm)
     * @return the schema returned by {@code withPseudoAndKeyColsInValue}
     */
    public static LogicalSchema buildSchema(SourceStep<?> sourceStep, boolean z) {
        LogicalSchema sourceSchema = sourceStep.getSourceSchema();
        int pseudoColumnVersion = sourceStep.getPseudoColumnVersion();
        return sourceSchema.withPseudoAndKeyColsInValue(z, pseudoColumnVersion);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds the value serde for a source step under an explicit query context.
     *
     * @param runtimeBuildContext context used to build the serde
     * @param sourceStep step whose value format is used
     * @param physicalSchema physical schema passed to the serde builder
     * @param queryContext query context passed to the serde builder
     * @return the serde returned by {@code runtimeBuildContext.buildValueSerde}
     */
    public static Serde<GenericRow> getValueSerde(RuntimeBuildContext runtimeBuildContext, SourceStep<?> sourceStep, PhysicalSchema physicalSchema, QueryContext queryContext) {
        FormatInfo valueFormat = sourceStep.getFormats().getValueFormat();
        return runtimeBuildContext.buildValueSerde(valueFormat, physicalSchema, queryContext);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds the value serde for a source step using the query context stored in the step's
     * own properties.
     *
     * @param runtimeBuildContext context used to build the serde
     * @param sourceStep step whose value format and query context are used
     * @param physicalSchema physical schema passed to the serde builder
     * @return the serde produced by the four-argument overload
     */
    public static Serde<GenericRow> getValueSerde(RuntimeBuildContext runtimeBuildContext, SourceStep<?> sourceStep, PhysicalSchema physicalSchema) {
        QueryContext stepContext = sourceStep.getProperties().getQueryContext();
        return getValueSerde(runtimeBuildContext, sourceStep, physicalSchema, stepContext);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds the non-windowed key serde for a source step under an explicit query context.
     *
     * @param sourceStep step whose key format is used
     * @param physicalSchema physical schema passed to the serde builder
     * @param runtimeBuildContext context used to build the serde
     * @param queryContext query context passed to the serde builder
     * @return the serde returned by {@code runtimeBuildContext.buildKeySerde}
     */
    public static Serde<GenericKey> getKeySerde(SourceStep<?> sourceStep, PhysicalSchema physicalSchema, RuntimeBuildContext runtimeBuildContext, QueryContext queryContext) {
        FormatInfo keyFormat = sourceStep.getFormats().getKeyFormat();
        return runtimeBuildContext.buildKeySerde(keyFormat, physicalSchema, queryContext);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds the non-windowed key serde for a source step using the query context stored in the
     * step's own properties.
     *
     * @param sourceStep step whose key format and query context are used
     * @param physicalSchema physical schema passed to the serde builder
     * @param runtimeBuildContext context used to build the serde
     * @return the serde produced by the four-argument overload
     */
    public static Serde<GenericKey> getKeySerde(SourceStep<?> sourceStep, PhysicalSchema physicalSchema, RuntimeBuildContext runtimeBuildContext) {
        QueryContext stepContext = sourceStep.getProperties().getQueryContext();
        return getKeySerde(sourceStep, physicalSchema, runtimeBuildContext, stepContext);
    }

    /**
     * Builds the windowed key serde for a source step under an explicit query context.
     *
     * @param sourceStep step whose key format is used
     * @param physicalSchema physical schema passed to the serde builder
     * @param runtimeBuildContext context used to build the serde
     * @param windowInfo window description passed to the serde builder
     * @param queryContext query context passed to the serde builder
     * @return the serde returned by {@code runtimeBuildContext.buildKeySerde}
     */
    static Serde<Windowed<GenericKey>> getWindowedKeySerde(SourceStep<?> sourceStep, PhysicalSchema physicalSchema, RuntimeBuildContext runtimeBuildContext, WindowInfo windowInfo, QueryContext queryContext) {
        FormatInfo keyFormat = sourceStep.getFormats().getKeyFormat();
        return runtimeBuildContext.buildKeySerde(keyFormat, windowInfo, physicalSchema, queryContext);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds the windowed key serde for a source step using the query context stored in the
     * step's own properties.
     *
     * @param sourceStep step whose key format and query context are used
     * @param physicalSchema physical schema passed to the serde builder
     * @param runtimeBuildContext context used to build the serde
     * @param windowInfo window description passed to the serde builder
     * @return the serde produced by the five-argument overload
     */
    public static Serde<Windowed<GenericKey>> getWindowedKeySerde(SourceStep<?> sourceStep, PhysicalSchema physicalSchema, RuntimeBuildContext runtimeBuildContext, WindowInfo windowInfo) {
        QueryContext stepContext = sourceStep.getProperties().getQueryContext();
        return getWindowedKeySerde(sourceStep, physicalSchema, runtimeBuildContext, windowInfo, stepContext);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Combines a source step's logical schema with its key and value serde features.
     *
     * @param sourceStep step providing the source schema and the formats
     * @return the schema returned by {@code PhysicalSchema.from}
     */
    public static PhysicalSchema getPhysicalSchema(SourceStep<?> sourceStep) {
        LogicalSchema sourceSchema = sourceStep.getSourceSchema();
        return PhysicalSchema.from(
            sourceSchema,
            sourceStep.getFormats().getKeyFeatures(),
            sourceStep.getFormats().getValueFeatures());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Chooses the callback to attach to a static topic serde.
     *
     * <p>A {@code RegisterSchemaCallback} backed by the service context's schema registry client
     * is returned only when the {@code ksql.schema.registry.url} setting is non-empty and the
     * given format supports {@code SerdeFeature.SCHEMA_INFERENCE}. Otherwise the callback does
     * nothing.
     *
     * @param runtimeBuildContext source of the KSQL config and the service context
     * @param formatInfo format whose feature support is checked
     * @return the registering callback, or a no-op callback
     */
    public static StaticTopicSerde.Callback getRegisterCallback(RuntimeBuildContext runtimeBuildContext, FormatInfo formatInfo) {
        boolean schemaRegistryConfigured = !runtimeBuildContext.getKsqlConfig().getString("ksql.schema.registry.url").isEmpty();
        // && short-circuits, so the format is only looked up once a registry URL is configured.
        if (schemaRegistryConfigured && FormatFactory.fromName(formatInfo.getFormat()).supportsFeature(SerdeFeature.SCHEMA_INFERENCE)) {
            return new RegisterSchemaCallback(runtimeBuildContext.getServiceContext().getSchemaRegistryClient());
        }
        return (first, second, bytes) -> {
            // Intentionally empty: nothing to register.
        };
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds a changelog topic name of the form {@code <applicationId>-<str>-changelog}.
     *
     * @param runtimeBuildContext context providing the application id
     * @param str middle segment of the topic name
     * @return the assembled topic name
     */
    public static String changelogTopic(RuntimeBuildContext runtimeBuildContext, String str) {
        StringBuilder topicName = new StringBuilder();
        topicName.append(runtimeBuildContext.getApplicationId());
        topicName.append("-").append(str);
        topicName.append("-changelog");
        return topicName.toString();
    }

    /**
     * Creates the timestamp extractor for a source.
     *
     * <p>The extraction policy comes from {@code TimestampExtractionPolicyFactory}. When a
     * timestamp column is given, it is resolved against {@code logicalSchema} first.
     *
     * @param ksqlConfig config passed to the policy factory and read for
     *     {@code ksql.timestamp.throw.on.invalid}
     * @param logicalSchema schema in which the timestamp column is looked up
     * @param optional the timestamp column, if any
     * @param sourceStep step whose query context selects the processing logger
     * @param runtimeBuildContext context providing the processing logger
     * @return the extractor created by the policy
     * @throws IllegalStateException if the timestamp column is not found in {@code logicalSchema}
     */
    static TimestampExtractor timestampExtractor(KsqlConfig ksqlConfig, LogicalSchema logicalSchema, Optional<TimestampColumn> optional, SourceStep<?> sourceStep, RuntimeBuildContext runtimeBuildContext) {
        return TimestampExtractionPolicyFactory
            .create(ksqlConfig, logicalSchema, optional)
            .create(
                optional
                    .map(TimestampColumn::getColumn)
                    .map(name -> logicalSchema.findColumn(name).orElseThrow(IllegalStateException::new)),
                ksqlConfig.getBoolean("ksql.timestamp.throw.on.invalid"),
                runtimeBuildContext.getProcessingLogger(sourceStep.getProperties().getQueryContext()));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds the {@code Consumed} for a source, with its timestamp extractor and offset reset
     * policy applied.
     *
     * @param sourceStep step providing the source schema and timestamp column
     * @param serde key serde
     * @param serde2 value serde
     * @param autoOffsetReset reset policy passed to {@code getAutoOffsetReset} as its fallback
     * @param runtimeBuildContext context providing the KSQL config
     * @param consumedFactory factory that creates the initial {@code Consumed}
     * @param <K> key type
     * @return the configured {@code Consumed}
     */
    public static <K> Consumed<K, GenericRow> buildSourceConsumed(SourceStep<?> sourceStep, Serde<K> serde, Serde<GenericRow> serde2, Topology.AutoOffsetReset autoOffsetReset, RuntimeBuildContext runtimeBuildContext, ConsumedFactory consumedFactory) {
        Consumed<K, GenericRow> consumed = consumedFactory.create(serde, serde2);
        TimestampExtractor extractor = timestampExtractor(
            runtimeBuildContext.getKsqlConfig(),
            sourceStep.getSourceSchema(),
            sourceStep.getTimestampColumn(),
            sourceStep,
            runtimeBuildContext);
        Consumed<K, GenericRow> withExtractor = consumed.withTimestampExtractor(extractor);
        Topology.AutoOffsetReset resetPolicy = getAutoOffsetReset(autoOffsetReset, runtimeBuildContext);
        return withExtractor.withOffsetResetPolicy(resetPolicy);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds the operation name for a table changelog: the step's query context with its last
     * segment replaced by {@code "Reduce"}.
     *
     * <p>The context is expected to hold at least one segment; an empty one produces an invalid
     * sub-list range.
     *
     * @param executionStepPropertiesV1 properties holding the step's query context
     * @return the name built by {@code StreamsUtil.buildOpName}
     */
    public static String tableChangeLogOpName(ExecutionStepPropertiesV1 executionStepPropertiesV1) {
        List<String> segments = executionStepPropertiesV1.getQueryContext().getContext();
        QueryContext.Stacker stacker = new QueryContext.Stacker();
        // Copy every segment except the last.
        for (String segment : segments.subList(0, segments.size() - 1)) {
            stacker = stacker.push(segment);
        }
        QueryContext reduceContext = stacker.push("Reduce").getQueryContext();
        return StreamsUtil.buildOpName(reduceContext);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the function that flattens a windowed key into column values.
     *
     * <p>For a non-null key the result is an unmodifiable collection holding the key's values
     * followed by the window start and the window end. For a null key the shared
     * {@code NULL_WINDOWED_KEY_COLUMNS} placeholder is returned.
     *
     * @param logicalSchema schema of the source; must have at least one key column
     * @return the key flattening function
     * @throws IllegalStateException if the schema has no key columns
     */
    public static Function<Windowed<GenericKey>, Collection<?>> windowedKeyGenerator(LogicalSchema logicalSchema) {
        if (logicalSchema.key().isEmpty()) {
            throw new IllegalStateException("Windowed sources require a key column");
        }
        return windowedKey -> {
            if (windowedKey == null) {
                return NULL_WINDOWED_KEY_COLUMNS;
            }
            Window bounds = windowedKey.window();
            GenericKey key = windowedKey.key();
            // Sized for the key columns plus the two window bounds.
            List<Object> columns = new ArrayList<>(logicalSchema.key().size() + 2);
            columns.addAll(key.values());
            columns.add(bounds.start());
            columns.add(bounds.end());
            return Collections.unmodifiableCollection(columns);
        };
    }

    /**
     * Resolves the offset reset policy for a source.
     *
     * <p>An {@code auto.offset.reset} entry in the KSQL streams config properties takes
     * precedence. It is matched case-insensitively against the names of
     * {@code Topology.AutoOffsetReset}. Without such an entry the supplied fallback is returned.
     *
     * @param autoOffsetReset policy returned when no override is configured
     * @param runtimeBuildContext context providing the KSQL config
     * @return the configured policy, or {@code autoOffsetReset} when none is configured
     * @throws ConfigException if the configured value does not name a known policy; the
     *     underlying failure is attached as the cause
     */
    static Topology.AutoOffsetReset getAutoOffsetReset(Topology.AutoOffsetReset autoOffsetReset, RuntimeBuildContext runtimeBuildContext) {
        Object configured = runtimeBuildContext.getKsqlConfig().getKsqlStreamConfigProps().get("auto.offset.reset");
        if (configured == null) {
            return autoOffsetReset;
        }
        try {
            // Locale.ROOT keeps the upper-casing independent of the JVM default locale; under a
            // Turkish locale "earliest" would otherwise become "EARLİEST" and be rejected.
            return Topology.AutoOffsetReset.valueOf(configured.toString().toUpperCase(Locale.ROOT));
        } catch (RuntimeException e) {
            // Only unchecked exceptions can escape the try block, so this catches what the
            // previous catch-all did.
            ConfigException invalid = new ConfigException("auto.offset.reset", configured, "Unknown value");
            // ConfigException has no cause-taking constructor, so attach the cause explicitly.
            invalid.initCause(e);
            throw invalid;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Extends a source step's query context with the {@code MATERIALIZE_OP_NAME} segment.
     *
     * @param sourceStep step whose query context is extended
     * @return the extended query context
     */
    public static QueryContext addMaterializedContext(SourceStep<?> sourceStep) {
        QueryContext stepContext = sourceStep.getProperties().getQueryContext();
        QueryContext.Stacker stacker = QueryContext.Stacker.of(stepContext);
        return stacker.push(MATERIALIZE_OP_NAME).getQueryContext();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Converts record headers into a list of structs, one per header and in header order.
     *
     * <p>Each struct has an optional string field {@code KEY} and an optional bytes field
     * {@code VALUE}. A header whose value is null yields a null {@code VALUE} instead of failing
     * in {@code ByteBuffer.wrap}.
     *
     * @param headers headers of the current record; must not be null
     * @return one struct per header
     */
    public static List<Struct> createHeaderData(Headers headers) {
        // Built once per call and shared by every struct, rather than rebuilt for each header.
        Schema headerSchema = SchemaBuilder.struct()
            .field("KEY", Schema.OPTIONAL_STRING_SCHEMA)
            .field("VALUE", Schema.OPTIONAL_BYTES_SCHEMA)
            .optional()
            .build();
        return Arrays.stream(headers.toArray())
            .map(header -> {
                byte[] rawValue = header.value();
                return new Struct(headerSchema)
                    .put("KEY", header.key())
                    .put("VALUE", rawValue == null ? null : ByteBuffer.wrap(rawValue));
            })
            .collect(Collectors.toList());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the value of the last header with the given key.
     *
     * @param headers headers of the current record; must not be null
     * @param str header key to look up
     * @return the header value wrapped in a buffer, or {@code null} when no header has that key
     *     or the matching header carries a null value
     */
    public static ByteBuffer extractHeader(Headers headers, String str) {
        Header lastHeader = headers.lastHeader(str);
        if (lastHeader == null) {
            return null;
        }
        // ByteBuffer.wrap rejects a null array, so a header with a null value needs its own check.
        byte[] rawValue = lastHeader.value();
        return rawValue == null ? null : ByteBuffer.wrap(rawValue);
    }
}
