package io.confluent.ksql.execution.streams;

import io.confluent.ksql.GenericKey;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.context.QueryContext;
import io.confluent.ksql.execution.function.udaf.KudafAggregator;
import io.confluent.ksql.execution.materialization.MaterializationInfo;
import io.confluent.ksql.execution.plan.ExecutionKeyFactory;
import io.confluent.ksql.execution.plan.Formats;
import io.confluent.ksql.execution.plan.KGroupedStreamHolder;
import io.confluent.ksql.execution.plan.KTableHolder;
import io.confluent.ksql.execution.plan.StreamAggregate;
import io.confluent.ksql.execution.plan.StreamWindowedAggregate;
import io.confluent.ksql.execution.runtime.MaterializedFactory;
import io.confluent.ksql.execution.runtime.RuntimeBuildContext;
import io.confluent.ksql.execution.streams.transform.KsValueTransformer;
import io.confluent.ksql.execution.transform.KsqlProcessingContext;
import io.confluent.ksql.execution.transform.KsqlTransformer;
import io.confluent.ksql.execution.windows.HoppingWindowExpression;
import io.confluent.ksql.execution.windows.KsqlWindowExpression;
import io.confluent.ksql.execution.windows.SessionWindowExpression;
import io.confluent.ksql.execution.windows.TumblingWindowExpression;
import io.confluent.ksql.execution.windows.WindowVisitor;
import io.confluent.ksql.parser.OutputRefinement;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.SessionWindowedKStream;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindowedKStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;

/* loaded from: input_file:io/confluent/ksql/execution/streams/StreamAggregateBuilder.class */
public final class StreamAggregateBuilder {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/execution/streams/StreamAggregateBuilder$WindowBoundsPopulator.class */
    /**
     * Transformer that appends the bounds of the enclosing window to an aggregated row.
     *
     * <p>The row passed in is modified in place: two extra columns are appended, the window
     * start followed by the window end, both as {@link Long} values.
     */
    public static final class WindowBoundsPopulator implements KsqlTransformer<Windowed<GenericKey>, GenericRow> {

        private WindowBoundsPopulator() {
        }

        /**
         * Appends {@code window.start()} and {@code window.end()} of the key's window to the row.
         *
         * @param windowed the windowed key whose window supplies the bounds
         * @param genericRow the row to extend; may be {@code null}
         * @param ksqlProcessingContext not used by this transformer
         * @return the same row instance with the two bounds appended, or {@code null} when the
         *         supplied row is {@code null}
         */
        public GenericRow transform(Windowed<GenericKey> windowed, GenericRow genericRow, KsqlProcessingContext ksqlProcessingContext) {
            if (genericRow != null) {
                final Window bounds = windowed.window();
                // Reserve room for both bound columns before appending them.
                genericRow.ensureAdditionalCapacity(2);
                final Long windowStart = Long.valueOf(bounds.start());
                genericRow.append(windowStart);
                final Long windowEnd = Long.valueOf(bounds.end());
                genericRow.append(windowEnd);
            }
            // Null rows are passed through unchanged.
            return genericRow;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/execution/streams/StreamAggregateBuilder$WindowedAggregator.class */
    /**
     * Window visitor that applies the window described by a KSQL window expression to a grouped
     * stream and aggregates it into a windowed table.
     *
     * <p>The key and value serdes used for the backing state store are built once, in the
     * constructor, from the aggregate step's internal formats.
     *
     * <p>NOTE(review): {@code TimeWindows.of}, {@code SessionWindows.with} and {@code grace} appear
     * to be deprecated in recent Kafka Streams releases. They are intentionally left untouched here
     * because the replacement factories may apply a different default grace period - confirm
     * before migrating.
     */
    public static class WindowedAggregator implements WindowVisitor<KTable<Windowed<GenericKey>, GenericRow>, Void> {
        final QueryContext queryContext;
        final Formats formats;
        final KGroupedStream<GenericKey, GenericRow> groupedStream;
        final RuntimeBuildContext buildContext;
        final MaterializedFactory materializedFactory;
        final Serde<GenericKey> keySerde;
        final Serde<GenericRow> valueSerde;
        final AggregateParams aggregateParams;

        /**
         * Captures the collaborators needed by the visit methods and builds the serdes.
         *
         * @param kGroupedStream the grouped stream to window and aggregate
         * @param streamWindowedAggregate the aggregate step; supplies the internal formats and the
         *        materialization query context
         * @param logicalSchema the schema used to derive the physical schema of the serdes
         * @param runtimeBuildContext builds the key and value serdes
         * @param materializedFactory creates the state-store definition for each aggregate
         * @param aggregateParams supplies the initializer, aggregator and merger
         * @throws NullPointerException if any argument other than {@code logicalSchema} is
         *         {@code null} (that one is not checked here)
         */
        WindowedAggregator(KGroupedStream<GenericKey, GenericRow> kGroupedStream, StreamWindowedAggregate streamWindowedAggregate, LogicalSchema logicalSchema, RuntimeBuildContext runtimeBuildContext, MaterializedFactory materializedFactory, AggregateParams aggregateParams) {
            Objects.requireNonNull(streamWindowedAggregate, "aggregate");
            this.groupedStream = Objects.requireNonNull(kGroupedStream, "groupedStream");
            this.buildContext = Objects.requireNonNull(runtimeBuildContext, "buildContext");
            this.materializedFactory = Objects.requireNonNull(materializedFactory, "materializedFactory");
            this.aggregateParams = Objects.requireNonNull(aggregateParams, "aggregateParams");
            this.queryContext = MaterializationUtil.materializeContext(streamWindowedAggregate);
            this.formats = streamWindowedAggregate.getInternalFormats();
            final PhysicalSchema physicalSchema = PhysicalSchema.from(logicalSchema, this.formats.getKeyFeatures(), this.formats.getValueFeatures());
            this.keySerde = runtimeBuildContext.buildKeySerde(this.formats.getKeyFormat(), physicalSchema, this.queryContext);
            this.valueSerde = runtimeBuildContext.buildValueSerde(this.formats.getValueFormat(), physicalSchema, this.queryContext);
        }

        /**
         * Aggregates the grouped stream over hopping windows of the expression's size and advance.
         *
         * @param hoppingWindowExpression supplies size, advance, optional grace period, optional
         *        emit strategy and optional retention
         * @param r12 unused visitor context
         * @return the windowed aggregate table
         */
        public KTable<Windowed<GenericKey>, GenericRow> visitHoppingWindowExpression(HoppingWindowExpression hoppingWindowExpression, Void r12) {
            final TimeWindows baseWindows = TimeWindows
                    .of(hoppingWindowExpression.getSize().toDuration())
                    .advanceBy(hoppingWindowExpression.getAdvanceBy().toDuration());
            // Only override the grace period when the expression declares one.
            final TimeWindows windows = hoppingWindowExpression.getGracePeriod()
                    .map(clause -> clause.toDuration())
                    .map(baseWindows::grace)
                    .orElse(baseWindows);
            return windowedBy(windows, hoppingWindowExpression.getEmitStrategy()).aggregate(
                    this.aggregateParams.getInitializer(),
                    this.aggregateParams.getAggregator(),
                    this.materializedFactory.create(
                            this.keySerde,
                            this.valueSerde,
                            StreamsUtil.buildOpName(this.queryContext),
                            hoppingWindowExpression.getRetention().map(clause -> clause.toDuration())));
        }

        /**
         * Aggregates the grouped stream over session windows with the expression's inactivity gap.
         * Unlike the time-window variants, the aggregator's merger is also passed to the aggregate
         * call.
         *
         * @param sessionWindowExpression supplies gap, optional grace period, optional emit
         *        strategy and optional retention
         * @param r13 unused visitor context
         * @return the windowed aggregate table
         */
        public KTable<Windowed<GenericKey>, GenericRow> visitSessionWindowExpression(SessionWindowExpression sessionWindowExpression, Void r13) {
            final SessionWindows baseWindows = SessionWindows.with(sessionWindowExpression.getGap().toDuration());
            // Only override the grace period when the expression declares one.
            final SessionWindows windows = sessionWindowExpression.getGracePeriod()
                    .map(clause -> clause.toDuration())
                    .map(baseWindows::grace)
                    .orElse(baseWindows);
            SessionWindowedKStream<GenericKey, GenericRow> windowedStream = this.groupedStream.windowedBy(windows);
            if (emitsOnlyFinalResults(sessionWindowExpression.getEmitStrategy())) {
                windowedStream = windowedStream.emitStrategy(EmitStrategy.onWindowClose());
            }
            return windowedStream.aggregate(
                    this.aggregateParams.getInitializer(),
                    this.aggregateParams.getAggregator(),
                    this.aggregateParams.getAggregator().getMerger(),
                    this.materializedFactory.create(
                            this.keySerde,
                            this.valueSerde,
                            StreamsUtil.buildOpName(this.queryContext),
                            sessionWindowExpression.getRetention().map(clause -> clause.toDuration())));
        }

        /**
         * Aggregates the grouped stream over tumbling windows of the expression's size.
         *
         * @param tumblingWindowExpression supplies size, optional grace period, optional emit
         *        strategy and optional retention
         * @param r12 unused visitor context
         * @return the windowed aggregate table
         */
        public KTable<Windowed<GenericKey>, GenericRow> visitTumblingWindowExpression(TumblingWindowExpression tumblingWindowExpression, Void r12) {
            final TimeWindows baseWindows = TimeWindows.of(tumblingWindowExpression.getSize().toDuration());
            // Only override the grace period when the expression declares one.
            final TimeWindows windows = tumblingWindowExpression.getGracePeriod()
                    .map(clause -> clause.toDuration())
                    .map(baseWindows::grace)
                    .orElse(baseWindows);
            return windowedBy(windows, tumblingWindowExpression.getEmitStrategy()).aggregate(
                    this.aggregateParams.getInitializer(),
                    this.aggregateParams.getAggregator(),
                    this.materializedFactory.create(
                            this.keySerde,
                            this.valueSerde,
                            StreamsUtil.buildOpName(this.queryContext),
                            tumblingWindowExpression.getRetention().map(clause -> clause.toDuration())));
        }

        /** Windows the grouped stream by time, emitting on window close when the strategy is FINAL. */
        private TimeWindowedKStream<GenericKey, GenericRow> windowedBy(TimeWindows windows, Optional<OutputRefinement> emitStrategy) {
            final TimeWindowedKStream<GenericKey, GenericRow> windowedStream = this.groupedStream.windowedBy(windows);
            if (emitsOnlyFinalResults(emitStrategy)) {
                return windowedStream.emitStrategy(EmitStrategy.onWindowClose());
            }
            return windowedStream;
        }

        /** Returns true only when an emit strategy is present and equal to {@code OutputRefinement.FINAL}. */
        private static boolean emitsOnlyFinalResults(Optional<OutputRefinement> emitStrategy) {
            return emitStrategy.isPresent() && emitStrategy.get() == OutputRefinement.FINAL;
        }
    }

    /** Not instantiable: this class only exposes static {@code build} methods. */
    private StreamAggregateBuilder() {
    }

    /**
     * Builds the non-windowed aggregate using a newly created {@link AggregateParamsFactory}.
     *
     * @param kGroupedStreamHolder holder of the grouped stream to aggregate
     * @param streamAggregate the aggregate step being built
     * @param runtimeBuildContext the runtime build context
     * @param materializedFactory factory for the state-store definition
     * @return the result of the five-argument {@code build} overload
     */
    public static KTableHolder<GenericKey> build(KGroupedStreamHolder kGroupedStreamHolder, StreamAggregate streamAggregate, RuntimeBuildContext runtimeBuildContext, MaterializedFactory materializedFactory) {
        final AggregateParamsFactory defaultParamsFactory = new AggregateParamsFactory();
        return build(
                kGroupedStreamHolder,
                streamAggregate,
                runtimeBuildContext,
                materializedFactory,
                defaultParamsFactory);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds the non-windowed aggregate of a grouped stream.
     *
     * <p>Steps, in order: create the aggregate parameters, build the state-store definition,
     * aggregate the grouped stream, then map each aggregated value through the aggregator's
     * result mapper. The returned holder carries the mapped table, the result schema, an
     * unwindowed key factory and the materialization info builder.
     *
     * <p>Raw Kafka Streams types are kept on the locals because the generic signatures of the
     * collaborating factories are declared outside this file.
     *
     * @param kGroupedStreamHolder holder of the grouped stream and its schema
     * @param streamAggregate the aggregate step; supplies columns, functions and internal formats
     * @param runtimeBuildContext supplies the function registry and KSQL config
     * @param materializedFactory factory for the state-store definition
     * @param aggregateParamsFactory creates the aggregate parameters (called with {@code false}
     *        as its boolean argument here - presumably the "windowed" flag; confirm)
     * @return a materialized table holder for the aggregate
     */
    public static KTableHolder<GenericKey> build(KGroupedStreamHolder kGroupedStreamHolder, StreamAggregate streamAggregate, RuntimeBuildContext runtimeBuildContext, MaterializedFactory materializedFactory, AggregateParamsFactory aggregateParamsFactory) {
        final AggregateParams params = aggregateParamsFactory.create(
                kGroupedStreamHolder.getSchema(),
                streamAggregate.getNonAggregateColumns(),
                runtimeBuildContext.getFunctionRegistry(),
                streamAggregate.getAggregationFunctions(),
                false,
                runtimeBuildContext.getKsqlConfig());
        final LogicalSchema intermediateSchema = params.getAggregateSchema();
        final LogicalSchema resultSchema = params.getSchema();

        final Materialized stateStore = MaterializationUtil.buildMaterialized(
                streamAggregate,
                intermediateSchema,
                streamAggregate.getInternalFormats(),
                runtimeBuildContext,
                materializedFactory,
                ExecutionKeyFactory.unwindowed(runtimeBuildContext));

        final KudafAggregator udafAggregator = params.getAggregator();
        final KTable aggregated = kGroupedStreamHolder.getGroupedStream()
                .aggregate(params.getInitializer(), params.getAggregator(), stateStore);

        // Convert the intermediate aggregate values into their final result form.
        final KTable mapped = aggregated.transformValues(
                () -> new KsValueTransformer(udafAggregator.getResultMapper()),
                Named.as(StreamsUtil.buildOpName(AggregateBuilderUtils.outputContext(streamAggregate))));

        return KTableHolder.materialized(
                mapped,
                resultSchema,
                ExecutionKeyFactory.unwindowed(runtimeBuildContext),
                AggregateBuilderUtils.materializationInfoBuilder(params.getAggregator(), streamAggregate, intermediateSchema, resultSchema));
    }

    /**
     * Builds the windowed aggregate using a newly created {@link AggregateParamsFactory}.
     *
     * @param kGroupedStreamHolder holder of the grouped stream to aggregate
     * @param streamWindowedAggregate the windowed aggregate step being built
     * @param runtimeBuildContext the runtime build context
     * @param materializedFactory factory for the state-store definition
     * @return the result of the five-argument windowed {@code build} overload
     */
    public static KTableHolder<Windowed<GenericKey>> build(KGroupedStreamHolder kGroupedStreamHolder, StreamWindowedAggregate streamWindowedAggregate, RuntimeBuildContext runtimeBuildContext, MaterializedFactory materializedFactory) {
        final AggregateParamsFactory defaultParamsFactory = new AggregateParamsFactory();
        return build(
                kGroupedStreamHolder,
                streamWindowedAggregate,
                runtimeBuildContext,
                materializedFactory,
                defaultParamsFactory);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds the windowed aggregate of a grouped stream.
     *
     * <p>Steps, in order: create the aggregate parameters, let the step's window expression drive
     * a {@link WindowedAggregator} to produce the windowed table, map each value through the
     * aggregator's result mapper, then append the window bounds to each row with a
     * {@link WindowBoundsPopulator}. The same bounds-populating step is also registered on the
     * materialization info builder.
     *
     * <p>Raw Kafka Streams types are kept on the locals because the generic signatures of the
     * collaborating factories are declared outside this file.
     *
     * @param kGroupedStreamHolder holder of the grouped stream and its schema
     * @param streamWindowedAggregate the aggregate step; supplies columns, functions and the
     *        window expression
     * @param runtimeBuildContext supplies the function registry and KSQL config
     * @param materializedFactory factory for the state-store definition
     * @param aggregateParamsFactory creates the aggregate parameters (called with {@code true}
     *        as its boolean argument here - presumably the "windowed" flag; confirm)
     * @return a materialized table holder keyed by windowed keys
     */
    public static KTableHolder<Windowed<GenericKey>> build(KGroupedStreamHolder kGroupedStreamHolder, StreamWindowedAggregate streamWindowedAggregate, RuntimeBuildContext runtimeBuildContext, MaterializedFactory materializedFactory, AggregateParamsFactory aggregateParamsFactory) {
        final AggregateParams params = aggregateParamsFactory.create(
                kGroupedStreamHolder.getSchema(),
                streamWindowedAggregate.getNonAggregateColumns(),
                runtimeBuildContext.getFunctionRegistry(),
                streamWindowedAggregate.getAggregationFunctions(),
                true,
                runtimeBuildContext.getKsqlConfig());
        final LogicalSchema intermediateSchema = params.getAggregateSchema();
        final LogicalSchema resultSchema = params.getSchema();
        final KsqlWindowExpression window = streamWindowedAggregate.getWindowExpression();

        // The window expression picks the matching visit method on the aggregator.
        final WindowedAggregator windowedAggregator = new WindowedAggregator(
                kGroupedStreamHolder.getGroupedStream(),
                streamWindowedAggregate,
                intermediateSchema,
                runtimeBuildContext,
                materializedFactory,
                params);
        final KTable aggregated = (KTable) window.accept(windowedAggregator, null);

        // Convert the intermediate aggregate values into their final result form.
        final KudafAggregator udafAggregator = params.getAggregator();
        final KTable mapped = aggregated.transformValues(
                () -> new KsValueTransformer(udafAggregator.getResultMapper()),
                Named.as(StreamsUtil.buildOpName(AggregateBuilderUtils.outputContext(streamWindowedAggregate))));

        final MaterializationInfo.Builder materialization = AggregateBuilderUtils.materializationInfoBuilder(
                params.getAggregator(),
                streamWindowedAggregate,
                intermediateSchema,
                resultSchema);

        // Append window start/end to every row of the streamed result ...
        final KTable withWindowBounds = mapped.transformValues(
                () -> new KsValueTransformer(new WindowBoundsPopulator()),
                Named.as(StreamsUtil.buildOpName(AggregateBuilderUtils.windowSelectContext(streamWindowedAggregate))));

        // ... and register the same step on the materialization info builder.
        materialization.map(
                ignoredLogger -> new WindowBoundsPopulator(),
                resultSchema,
                AggregateBuilderUtils.windowSelectContext(streamWindowedAggregate));

        return KTableHolder.materialized(
                withWindowBounds,
                resultSchema,
                ExecutionKeyFactory.windowed(runtimeBuildContext, window.getWindowInfo()),
                materialization);
    }
}
