/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.ksql.execution.transform.sqlpredicate;

import com.google.common.collect.Iterables;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.codegen.CodeGenRunner;
import io.confluent.ksql.execution.codegen.ExpressionMetadata;
import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.execution.transform.KsqlProcessingContext;
import io.confluent.ksql.execution.transform.KsqlTransformer;
import io.confluent.ksql.execution.util.EngineProcessingLogMessageFactory;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import io.confluent.ksql.util.KsqlConfig;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Stream;

public final class SqlPredicate {
    private final Expression filterExpression;
    private final ExpressionMetadata evaluator;

    public SqlPredicate(Expression filterExpression, LogicalSchema schema, KsqlConfig ksqlConfig, FunctionRegistry functionRegistry) {
        this.filterExpression = Objects.requireNonNull(filterExpression, "filterExpression");
        this.evaluator = (ExpressionMetadata)Iterables.getOnlyElement(CodeGenRunner.compileExpressions(Stream.of(filterExpression), "Predicate", schema, ksqlConfig, functionRegistry));
        if (!this.evaluator.getExpressionType().equals(SqlTypes.BOOLEAN)) {
            throw new IllegalArgumentException("Filter expression must resolve to boolean: " + (Object)((Object)filterExpression));
        }
    }

    public <K> KsqlTransformer<K, Optional<GenericRow>> getTransformer(ProcessingLogger processingLogger) {
        return new Transformer(processingLogger);
    }

    private final class Transformer<K>
    implements KsqlTransformer<K, Optional<GenericRow>> {
        private final ProcessingLogger processingLogger;
        private final String errorMsg;

        Transformer(ProcessingLogger processingLogger) {
            this.processingLogger = Objects.requireNonNull(processingLogger, "processingLogger");
            this.errorMsg = "Error evaluating predicate " + SqlPredicate.this.filterExpression.toString() + ": ";
        }

        @Override
        public Optional<GenericRow> transform(K readOnlyKey, GenericRow value, KsqlProcessingContext ctx) {
            if (value == null) {
                return Optional.empty();
            }
            try {
                boolean result = (Boolean)SqlPredicate.this.evaluator.evaluate(value);
                return result ? Optional.of(value) : Optional.empty();
            }
            catch (Exception e) {
                this.logProcessingError(this.processingLogger, e, value);
                return Optional.empty();
            }
        }

        private void logProcessingError(ProcessingLogger processingLogger, Exception e, GenericRow row) {
            processingLogger.error(EngineProcessingLogMessageFactory.recordProcessingError(this.errorMsg + e.getMessage(), e, row));
        }
    }
}

