package io.confluent.ksql.execution.streams;

import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.builder.KsqlQueryBuilder;
import io.confluent.ksql.execution.plan.KStreamHolder;
import io.confluent.ksql.execution.plan.StreamFilter;
import io.confluent.ksql.execution.streams.transform.KsTransformer;
import io.confluent.ksql.execution.transform.KsqlTransformer;
import io.confluent.ksql.execution.transform.sqlpredicate.SqlPredicate;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import java.util.Collections;
import java.util.Optional;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.processor.ProcessorContext;

/* loaded from: input_file:io/confluent/ksql/execution/streams/StreamFilterBuilder.class */
public final class StreamFilterBuilder {
    private StreamFilterBuilder() {
    }

    public static <K> KStreamHolder<K> build(KStreamHolder<K> kStreamHolder, StreamFilter<K> streamFilter, KsqlQueryBuilder ksqlQueryBuilder) {
        return build(kStreamHolder, streamFilter, ksqlQueryBuilder, SqlPredicate::new);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static <K> KStreamHolder<K> build(KStreamHolder<K> kStreamHolder, StreamFilter<K> streamFilter, KsqlQueryBuilder ksqlQueryBuilder, SqlPredicateFactory sqlPredicateFactory) {
        SqlPredicate create = sqlPredicateFactory.create(streamFilter.getFilterExpression(), kStreamHolder.getSchema(), ksqlQueryBuilder.getKsqlConfig(), ksqlQueryBuilder.getFunctionRegistry());
        ProcessingLogger processingLogger = ksqlQueryBuilder.getProcessingLogger(streamFilter.getProperties().getQueryContext());
        return kStreamHolder.withStream(kStreamHolder.getStream().flatTransformValues(() -> {
            return toFlatMapTransformer(create.getTransformer(processingLogger));
        }, Named.as(StreamsUtil.buildOpName(streamFilter.getProperties().getQueryContext())), new String[0]), kStreamHolder.getSchema());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <K> ValueTransformerWithKey<K, GenericRow, Iterable<GenericRow>> toFlatMapTransformer(KsqlTransformer<K, Optional<GenericRow>> ksqlTransformer) {
        final KsTransformer ksTransformer = new KsTransformer(ksqlTransformer);
        return new ValueTransformerWithKey<K, GenericRow, Iterable<GenericRow>>() { // from class: io.confluent.ksql.execution.streams.StreamFilterBuilder.1
            public void init(ProcessorContext processorContext) {
                ksTransformer.init(processorContext);
            }

            public Iterable<GenericRow> transform(K k, GenericRow genericRow) {
                return (Iterable) ((Optional) ksTransformer.transform(k, genericRow)).map((v0) -> {
                    return Collections.singletonList(v0);
                }).orElse(Collections.emptyList());
            }

            public void close() {
                ksTransformer.close();
            }

            /* JADX WARN: Multi-variable type inference failed */
            public /* bridge */ /* synthetic */ Object transform(Object obj, Object obj2) {
                return transform((AnonymousClass1<K>) obj, (GenericRow) obj2);
            }
        };
    }
}
