package io.confluent.ksql.engine;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.collect.Streams;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.exception.KsqlTopicAuthorizationException;
import io.confluent.ksql.execution.codegen.CodeGenRunner;
import io.confluent.ksql.execution.codegen.ExpressionMetadata;
import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.execution.expression.tree.VisitParentExpressionVisitor;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.logging.processing.NoopProcessingLogContext;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.metastore.model.KeyField;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.parser.tree.InsertValues;
import io.confluent.ksql.schema.ksql.DefaultSqlValueCoercer;
import io.confluent.ksql.schema.ksql.FormatOptions;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import io.confluent.ksql.schema.ksql.SchemaConverters;
import io.confluent.ksql.schema.ksql.SqlValueCoercer;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.serde.GenericKeySerDe;
import io.confluent.ksql.serde.GenericRowSerDe;
import io.confluent.ksql.serde.KeySerdeFactory;
import io.confluent.ksql.serde.ValueSerdeFactory;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlStatementException;
import io.confluent.ksql.util.ReservedInternalTopics;
import io.confluent.ksql.util.SchemaUtil;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/engine/InsertValuesExecutor.class */
public class InsertValuesExecutor {
    // Class-wide logger; used by serializeValue to report rows that fail to serialize.
    private static final Logger LOG = LoggerFactory.getLogger(InsertValuesExecutor.class);
    // Timeout passed to Producer.close(...) by the default sendRecord implementation.
    private static final Duration MAX_SEND_TIMEOUT = Duration.ofSeconds(5);
    // Supplies the record timestamp used when the statement provides no ROWTIME value.
    private final LongSupplier clock;
    // When true, throwIfDisabled rejects inserts if 'ksql.insert.into.values.enabled' is false.
    private final boolean canBeDisabledByConfig;
    // Strategy that writes the fully built record to Kafka.
    private final RecordProducer producer;
    // Creates the serde used by serializeValue to turn the row value into bytes.
    private final ValueSerdeFactory valueSerdeFactory;
    // Creates the serde used by serializeKey to turn the row key into bytes.
    private final KeySerdeFactory keySerdeFactory;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/engine/InsertValuesExecutor$ExpressionResolver.class */
    /**
     * Expression visitor that evaluates a single INSERT VALUES expression and coerces the
     * result to the SQL type of the column it is being inserted into.
     */
    public static class ExpressionResolver extends VisitParentExpressionVisitor<Object, Void> {
        private final SqlType fieldType;
        private final ColumnName fieldName;
        private final LogicalSchema schema;
        private final SqlValueCoercer sqlValueCoercer = DefaultSqlValueCoercer.INSTANCE;
        private final FunctionRegistry functionRegistry;
        private final KsqlConfig config;

        /**
         * @param sqlType          target SQL type of the column; must not be null
         * @param columnName       name of the column, used in error messages; must not be null
         * @param logicalSchema    schema handed to the expression compiler; must not be null
         * @param functionRegistry registry handed to the expression compiler; must not be null
         * @param ksqlConfig       config handed to the expression compiler; must not be null
         */
        ExpressionResolver(SqlType sqlType, ColumnName columnName, LogicalSchema logicalSchema, FunctionRegistry functionRegistry, KsqlConfig ksqlConfig) {
            this.fieldType = Objects.requireNonNull(sqlType, "fieldType");
            this.fieldName = Objects.requireNonNull(columnName, "fieldName");
            this.schema = Objects.requireNonNull(logicalSchema, "schema");
            this.functionRegistry = Objects.requireNonNull(functionRegistry, "functionRegistry");
            this.config = Objects.requireNonNull(ksqlConfig, "config");
        }

        /**
         * Compiles the expression, evaluates it against an empty row and coerces the outcome
         * to the target column type.
         *
         * @param expression the expression to evaluate
         * @param r8         visitor context; not used
         * @return the coerced value
         * @throws KsqlException if the evaluated value cannot be coerced to the column type
         */
        /* JADX INFO: Access modifiers changed from: protected */
        public Object visitExpression(Expression expression, Void r8) {
            final ExpressionMetadata compiled = Iterables.getOnlyElement(
                    CodeGenRunner.compileExpressions(
                            Stream.of(expression),
                            "insert value",
                            this.schema,
                            this.config,
                            this.functionRegistry));

            // The expression is evaluated against an empty row.
            final Object value = compiled.evaluate(new GenericRow());

            final Optional<?> coerced = this.sqlValueCoercer.coerce(value, this.fieldType);
            if (coerced.isPresent()) {
                return coerced.get();
            }

            throw new KsqlException(String.format(
                    "Expected type %s for field %s but got %s(%s)",
                    this.fieldType,
                    this.fieldName,
                    SchemaConverters.javaToSqlConverter().toSqlType(value.getClass()),
                    value));
        }
    }

    /* loaded from: input_file:io/confluent/ksql/engine/InsertValuesExecutor$RecordProducer.class */
    /**
     * Strategy for writing a serialized record to Kafka.
     *
     * <p>Marked as a functional interface because the executor's constructors supply it as a
     * method reference ({@code InsertValuesExecutor::sendRecord}); the annotation makes the
     * compiler reject any change that would add a second abstract method.
     */
    @FunctionalInterface
    public interface RecordProducer {
        /**
         * Sends the given record.
         *
         * @param producerRecord the serialized key/value record to send
         * @param serviceContext the service context made available to the implementation
         * @param map            the producer client configuration properties
         */
        void sendRecord(ProducerRecord<byte[], byte[]> producerRecord, ServiceContext serviceContext, Map<String, Object> map);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/engine/InsertValuesExecutor$RowData.class */
    /** Immutable holder for the timestamp, key and value of the row being inserted. */
    public static final class RowData {
        final long ts;
        final Struct key;
        final GenericRow value;

        private RowData(long timestamp, Struct rowKey, GenericRow rowValue) {
            this.ts = timestamp;
            this.key = rowKey;
            this.value = rowValue;
        }

        /**
         * Static factory; the arguments are stored as given.
         *
         * @param j          the record timestamp
         * @param struct     the row key
         * @param genericRow the row value
         * @return a new {@code RowData} holding the three arguments
         */
        /* JADX INFO: Access modifiers changed from: private */
        public static RowData of(long j, Struct struct, GenericRow genericRow) {
            final RowData row = new RowData(j, struct, genericRow);
            return row;
        }
    }

    /**
     * Creates an executor that honours the 'ksql.insert.into.values.enabled' setting and
     * writes records with the built-in {@code sendRecord} implementation.
     */
    public InsertValuesExecutor() {
        this(
                true,
                InsertValuesExecutor::sendRecord
        );
    }

    /**
     * Test hook that swaps in a custom record producer while using the wall clock and the
     * default key/value serde factories.
     *
     * @param z              whether the config setting is allowed to disable inserts
     * @param recordProducer the producer strategy to use
     */
    @VisibleForTesting
    InsertValuesExecutor(boolean z, RecordProducer recordProducer) {
        this(
                recordProducer,
                z,
                System::currentTimeMillis,
                new GenericKeySerDe(),
                new GenericRowSerDe()
        );
    }

    /**
     * Test hook that swaps in a custom clock and serde factories while keeping the built-in
     * {@code sendRecord} implementation; inserts can still be disabled by config.
     *
     * @param longSupplier      source of the default record timestamp
     * @param keySerdeFactory   factory for the key serde
     * @param valueSerdeFactory factory for the value serde
     */
    @VisibleForTesting
    InsertValuesExecutor(LongSupplier longSupplier, KeySerdeFactory keySerdeFactory, ValueSerdeFactory valueSerdeFactory) {
        this(
                InsertValuesExecutor::sendRecord,
                true,
                longSupplier,
                keySerdeFactory,
                valueSerdeFactory
        );
    }

    /**
     * Canonical constructor that every other constructor delegates to.
     *
     * @param recordProducer    strategy used to write the record; must not be null
     * @param z                 whether the config setting is allowed to disable inserts
     * @param longSupplier      source of the default record timestamp; must not be null
     * @param keySerdeFactory   factory for the key serde; must not be null
     * @param valueSerdeFactory factory for the value serde; must not be null
     */
    private InsertValuesExecutor(RecordProducer recordProducer, boolean z, LongSupplier longSupplier, KeySerdeFactory keySerdeFactory, ValueSerdeFactory valueSerdeFactory) {
        this.canBeDisabledByConfig = z;
        // Null checks run in this order; the label names the field that was null.
        this.producer = Objects.requireNonNull(recordProducer, "producer");
        this.clock = Objects.requireNonNull(longSupplier, "clock");
        this.keySerdeFactory = Objects.requireNonNull(keySerdeFactory, "keySerdeFactory");
        this.valueSerdeFactory = Objects.requireNonNull(valueSerdeFactory, "valueSerdeFactory");
    }

    /**
     * Executes an INSERT VALUES statement: resolves the target source, builds the serialized
     * record and hands it to the configured {@link RecordProducer}.
     *
     * @param configuredStatement  the statement together with its config and overrides
     * @param map                  not read by this method
     * @param ksqlExecutionContext supplies the metastore used to look up the target source
     * @param serviceContext       passed through to record building and to the producer
     * @throws KsqlException if any step fails; a topic authorization failure is reported with
     *                       a {@link KsqlTopicAuthorizationException} (WRITE) as the cause
     */
    public void execute(ConfiguredStatement<InsertValues> configuredStatement, Map<String, ?> map, KsqlExecutionContext ksqlExecutionContext, ServiceContext serviceContext) {
        final InsertValues insertValues = configuredStatement.getStatement();
        final MetaStore metaStore = ksqlExecutionContext.getMetaStore();
        final KsqlConfig config = configuredStatement.getConfig().cloneWithPropertyOverwrite(configuredStatement.getOverrides());
        try {
            final DataSource dataSource = getDataSource(config, metaStore, insertValues);
            final ProducerRecord<byte[], byte[]> record = buildRecord(configuredStatement, metaStore, dataSource, serviceContext);
            this.producer.sendRecord(record, serviceContext, config.getProducerClientConfigProps());
        } catch (TopicAuthorizationException e) {
            // Must come before the generic handler: TopicAuthorizationException is itself an
            // Exception, so the reverse order makes this branch unreachable (a compile error).
            // The cause is replaced with one that names the denied WRITE operation and topics.
            final Exception rootCause = new KsqlTopicAuthorizationException(AclOperation.WRITE, e.unauthorizedTopics());
            throw new KsqlException(createInsertFailedExceptionMessage(insertValues), rootCause);
        } catch (Exception e) {
            throw new KsqlException(createInsertFailedExceptionMessage(insertValues), e);
        }
    }

    /**
     * Looks up the statement's target in the metastore and checks that it can be inserted into.
     *
     * @param ksqlConfig   config used to determine which topics are read-only
     * @param metaStore    the metastore to look the target up in
     * @param insertValues the statement naming the target
     * @return the target data source
     * @throws KsqlException if the target is unknown, has a windowed key format, or is backed
     *                       by a read-only topic
     */
    private DataSource getDataSource(KsqlConfig ksqlConfig, MetaStore metaStore, InsertValues insertValues) {
        final DataSource target = metaStore.getSource(insertValues.getTarget());
        if (target == null) {
            throw new KsqlException("Cannot insert values into an unknown stream/table: " + insertValues.getTarget());
        }

        final boolean windowed = target.getKsqlTopic().getKeyFormat().isWindowed();
        if (windowed) {
            throw new KsqlException("Cannot insert values into windowed stream/table!");
        }

        final ReservedInternalTopics reservedTopics = new ReservedInternalTopics(ksqlConfig);
        final String topicName = target.getKafkaTopicName();
        if (reservedTopics.isReadOnly(topicName)) {
            throw new KsqlException("Cannot insert values into read-only topic: " + topicName);
        }

        return target;
    }

    /**
     * Turns the statement into a serialized producer record for the target's Kafka topic.
     *
     * @param configuredStatement the statement together with its config and overrides
     * @param metaStore           used as the function registry when resolving value expressions
     * @param dataSource          the target stream/table
     * @param serviceContext      passed to key and value serialization
     * @return the record to send; its partition is left unset
     * @throws KsqlException          if inserts are disabled by configuration
     * @throws KsqlStatementException if the row cannot be extracted or serialized
     */
    private ProducerRecord<byte[], byte[]> buildRecord(ConfiguredStatement<InsertValues> configuredStatement, MetaStore metaStore, DataSource dataSource, ServiceContext serviceContext) {
        // The enabled check reads the statement config without the overrides applied.
        throwIfDisabled(configuredStatement.getConfig());

        final InsertValues insertValues = configuredStatement.getStatement();
        final KsqlConfig config = configuredStatement.getConfig().cloneWithPropertyOverwrite(configuredStatement.getOverrides());

        try {
            final RowData row = extractRow(insertValues, dataSource, metaStore, config);
            final String topic = dataSource.getKafkaTopicName();
            final byte[] keyBytes = serializeKey(row.key, dataSource, config, serviceContext);
            final byte[] valueBytes = serializeValue(row.value, dataSource, config, serviceContext);
            return new ProducerRecord<>(topic, (Integer) null, Long.valueOf(row.ts), keyBytes, valueBytes);
        } catch (Exception e) {
            final String message = createInsertFailedExceptionMessage(insertValues) + " " + e.getMessage();
            throw new KsqlStatementException(message, configuredStatement.getStatementText(), e);
        }
    }

    /**
     * Builds the standard failure message for an insert.
     *
     * @param insertValues the statement whose target name appears in the message
     * @return the failure message, with the target name in single quotes
     */
    private static String createInsertFailedExceptionMessage(InsertValues insertValues) {
        return String.format("Failed to insert values into '%s'.", insertValues.getTarget().name());
    }

    /**
     * Rejects the insert when the feature is switched off in the config and this executor is
     * allowed to be disabled by config.
     *
     * @param ksqlConfig the config to read 'ksql.insert.into.values.enabled' from
     * @throws KsqlException if INSERT INTO ... VALUES is disabled
     */
    private void throwIfDisabled(KsqlConfig ksqlConfig) {
        // The setting is read first, whether or not this executor can be disabled.
        final boolean insertsEnabled = ksqlConfig.getBoolean("ksql.insert.into.values.enabled");
        if (insertsEnabled || !this.canBeDisabledByConfig) {
            return;
        }
        throw new KsqlException("The server has disabled INSERT INTO ... VALUES functionality. To enable it, restart your KSQL-server with 'ksql.insert.into.values.enabled'=true");
    }

    /**
     * Resolves the statement's values into a timestamp, key and value for the target source.
     *
     * @param insertValues     the statement holding the columns and value expressions
     * @param dataSource       the target stream/table
     * @param functionRegistry registry used when evaluating value expressions
     * @param ksqlConfig       config used when evaluating value expressions
     * @return the extracted row data
     * @throws KsqlException if the target is a table and no ROWKEY value could be determined
     */
    private RowData extractRow(InsertValues insertValues, DataSource dataSource, FunctionRegistry functionRegistry, KsqlConfig ksqlConfig) {
        // With no explicit column list, every key and value column of the schema is expected.
        final List<ColumnName> columns;
        if (insertValues.getColumns().isEmpty()) {
            columns = implicitColumns(dataSource, insertValues.getValues());
        } else {
            columns = insertValues.getColumns();
        }

        final LogicalSchema schema = dataSource.getSchema();
        final Map<ColumnName, Object> values = resolveValues(insertValues, columns, schema, functionRegistry, ksqlConfig);
        handleExplicitKeyField(values, dataSource.getKeyField());

        final boolean targetIsTable = dataSource.getDataSourceType() == DataSource.DataSourceType.KTABLE;
        if (targetIsTable && values.get(SchemaUtil.ROWKEY_NAME) == null) {
            throw new KsqlException("Value for ROWKEY is required for tables");
        }

        // The clock is read unconditionally; its value is used only when ROWTIME was not given.
        final long now = this.clock.getAsLong();
        final Long timestamp = (Long) values.getOrDefault(SchemaUtil.ROWTIME_NAME, Long.valueOf(now));
        final long ts = timestamp.longValue();

        final Struct key = buildKey(schema, values);
        final GenericRow value = buildValue(schema, values);
        return RowData.of(ts, key, value);
    }

    /**
     * Builds the key struct by filling each field of the key schema from the resolved values.
     *
     * @param logicalSchema the schema that supplies the key connect schema
     * @param map           resolved values by column name; a missing column yields a null field
     * @return the populated key struct
     */
    private static Struct buildKey(LogicalSchema logicalSchema, Map<ColumnName, Object> map) {
        final Struct key = new Struct(logicalSchema.keyConnectSchema());
        key.schema().fields().forEach(keyField -> key.put(keyField, map.get(ColumnName.of(keyField.name()))));
        return key;
    }

    /**
     * Builds the value row by looking up each value column of the schema, in schema order,
     * in the resolved values.
     *
     * @param logicalSchema the schema that supplies the value columns
     * @param map           resolved values by column name; a missing column yields a null entry
     * @return a row holding one entry per value column
     */
    private static GenericRow buildValue(LogicalSchema logicalSchema, Map<ColumnName, Object> map) {
        final GenericRow row = new GenericRow();
        // map::get replaces the decompiler's undefined "r2.get(v1)" plus "map.getClass()" pair:
        // the bound method reference performs the same null check on the map.
        final List<Object> columnValues = logicalSchema.value().stream()
                .map(column -> column.name())
                .map(map::get)
                .collect(Collectors.toList());
        return row.appendAll(columnValues);
    }

    /**
     * Returns the column list implied by an INSERT with no explicit columns: all key columns
     * followed by all value columns of the target's schema.
     *
     * @param dataSource the target stream/table
     * @param list       the value expressions supplied by the statement
     * @return the names of every key and value column, in schema order
     * @throws KsqlException if the number of expressions differs from the number of columns
     */
    private static List<ColumnName> implicitColumns(DataSource dataSource, List<Expression> list) {
        final LogicalSchema schema = dataSource.getSchema();
        // Typed varargs call: a raw Stream[] would erase the element type, leaving the
        // lambda parameter as Object and forcing an unchecked cast on the result.
        final List<ColumnName> columnNames = Streams.concat(schema.key().stream(), schema.value().stream())
                .map(column -> column.name())
                .collect(Collectors.toList());
        if (columnNames.size() != list.size()) {
            throw new KsqlException("Expected a value for each column. Expected Columns: " + columnNames + ". Got " + list);
        }
        return columnNames;
    }

    /**
     * Evaluates the statement's value expressions, pairing the i-th column with the i-th
     * expression and coercing each result to its column's type.
     *
     * @param insertValues     the statement holding the value expressions
     * @param list             the columns to populate, in the same order as the expressions
     * @param logicalSchema    schema used to look up column types and to compile expressions
     * @param functionRegistry registry used when compiling expressions
     * @param ksqlConfig       config used when compiling expressions
     * @return the resolved values keyed by column name
     */
    private static Map<ColumnName, Object> resolveValues(InsertValues insertValues, List<ColumnName> list, LogicalSchema logicalSchema, FunctionRegistry functionRegistry, KsqlConfig ksqlConfig) {
        final List<Expression> expressions = insertValues.getValues();
        final Map<ColumnName, Object> resolved = new HashMap<>();
        int position = 0;
        for (final ColumnName column : list) {
            final SqlType targetType = columnType(column, logicalSchema);
            final ExpressionResolver resolver = new ExpressionResolver(targetType, column, logicalSchema, functionRegistry, ksqlConfig);
            resolved.put(column, resolver.process(expressions.get(position), null));
            position++;
        }
        return resolved;
    }

    /**
     * Keeps ROWKEY and the source's key column in step within the resolved values.
     *
     * <p>Nothing happens when the source has no key column. Otherwise: if only one of the two
     * has a non-null value it is copied to the other; if both have values they must be equal;
     * if neither has a value the map is left untouched.
     *
     * @param map      the resolved values; may be modified in place
     * @param keyField the key field of the target source
     * @throws KsqlException if both values are present but differ
     */
    private static void handleExplicitKeyField(Map<ColumnName, Object> map, KeyField keyField) {
        final Optional<ColumnName> keyColumn = keyField.ref();
        if (!keyColumn.isPresent()) {
            return;
        }

        final ColumnName keyColumnName = keyColumn.get();
        final Object keyValue = map.get(keyColumnName);
        final Object rowKeyValue = map.get(SchemaUtil.ROWKEY_NAME);

        final boolean hasKeyValue = keyValue != null;
        final boolean hasRowKeyValue = rowKeyValue != null;

        if (hasKeyValue && hasRowKeyValue) {
            if (!Objects.equals(keyValue, rowKeyValue)) {
                throw new KsqlException(String.format("Expected ROWKEY and %s to match but got %s and %s respectively.", keyColumnName.toString(FormatOptions.noEscape()), rowKeyValue, keyValue));
            }
        } else if (hasKeyValue) {
            map.put(SchemaUtil.ROWKEY_NAME, keyValue);
        } else if (hasRowKeyValue) {
            map.put(keyColumnName, rowKeyValue);
        }
    }

    /**
     * Looks up the SQL type of a column in the schema.
     *
     * @param columnName    the column to look up
     * @param logicalSchema the schema to search
     * @return the column's SQL type
     * @throws IllegalStateException if the schema has no such column; the message names the
     *                               column so the wrapping insert error is readable
     */
    private static SqlType columnType(ColumnName columnName, LogicalSchema logicalSchema) {
        return logicalSchema.findColumn(columnName)
                .map(column -> column.type())
                .orElseThrow(() -> new IllegalStateException(
                        "Column name " + columnName.toString(FormatOptions.noEscape()) + " does not exist."));
    }

    /**
     * Serializes the row key with a serde built for the target's key format and schema.
     *
     * @param struct         the key to serialize
     * @param dataSource     the target stream/table; supplies format, schema and topic name
     * @param ksqlConfig     config passed to the serde factory
     * @param serviceContext supplies the schema registry client factory
     * @return the serialized key
     * @throws KsqlException if creating the serde or serializing the key fails
     */
    private byte[] serializeKey(Struct struct, DataSource dataSource, KsqlConfig ksqlConfig, ServiceContext serviceContext) {
        try {
            final Serde<Struct> keySerde = this.keySerdeFactory.create(
                    dataSource.getKsqlTopic().getKeyFormat().getFormatInfo(),
                    PhysicalSchema.from(dataSource.getSchema(), dataSource.getSerdeOptions()).keySchema(),
                    ksqlConfig,
                    serviceContext.getSchemaRegistryClientFactory(),
                    "",
                    NoopProcessingLogContext.INSTANCE);
            return keySerde.serializer().serialize(dataSource.getKafkaTopicName(), struct);
        } catch (Exception e) {
            throw new KsqlException("Could not serialize key: " + struct, e);
        }
    }

    /**
     * Serializes the row value with a serde built for the target's value format and schema.
     *
     * <p>The serde is created outside the try block, so a failure to create it propagates
     * unchanged; only a failure to serialize is translated.
     *
     * @param genericRow     the value to serialize
     * @param dataSource     the target stream/table; supplies format, schema and topic name
     * @param ksqlConfig     config passed to the serde factory
     * @param serviceContext supplies the schema registry client factory
     * @return the serialized value
     * @throws KsqlException if serialization fails; when the value format supports schema
     *                       inference and the root cause is a {@link RestClientException}
     *                       with status 401 or 403, the message reports a Schema Registry
     *                       authorization failure instead
     */
    private byte[] serializeValue(GenericRow genericRow, DataSource dataSource, KsqlConfig ksqlConfig, ServiceContext serviceContext) {
        final Serde<GenericRow> valueSerde = this.valueSerdeFactory.create(
                dataSource.getKsqlTopic().getValueFormat().getFormatInfo(),
                PhysicalSchema.from(dataSource.getSchema(), dataSource.getSerdeOptions()).valueSchema(),
                ksqlConfig,
                serviceContext.getSchemaRegistryClientFactory(),
                "",
                NoopProcessingLogContext.INSTANCE);
        final String kafkaTopicName = dataSource.getKafkaTopicName();
        try {
            return valueSerde.serializer().serialize(kafkaTopicName, genericRow);
        } catch (Exception e) {
            if (dataSource.getKsqlTopic().getValueFormat().getFormat().supportsSchemaInference()) {
                // getRootCause returns Throwable, so it must be type-checked before the
                // RestClientException status can be read.
                final Throwable rootCause = ExceptionUtils.getRootCause(e);
                if (rootCause instanceof RestClientException) {
                    switch (((RestClientException) rootCause).getStatus()) {
                        case 401:
                        case 403:
                            throw new KsqlException(String.format("Not authorized to write Schema Registry subject: [%s]", kafkaTopicName + "-value"));
                        default:
                            // Any other status falls through to the generic failure below.
                            break;
                    }
                }
            }
            LOG.error("Could not serialize row.", e);
            throw new KsqlException("Could not serialize row: " + genericRow, e);
        }
    }

    /**
     * Default {@link RecordProducer}: obtains a producer from the service context, sends the
     * record, closes the producer and then checks the outcome of the send.
     *
     * @param producerRecord the record to send
     * @param serviceContext supplies the Kafka client supplier used to obtain the producer
     * @param map            the producer client configuration properties
     * @throws RuntimeException the send's own failure when it is a runtime exception (so
     *                          callers can catch specific Kafka exceptions), otherwise a
     *                          wrapper around the failure or around the interruption
     */
    private static void sendRecord(ProducerRecord<byte[], byte[]> producerRecord, ServiceContext serviceContext, Map<String, Object> map) {
        final Producer<byte[], byte[]> producer = serviceContext.getKafkaClientSupplier().getProducer(map);

        final Future<?> pendingSend;
        try {
            pendingSend = producer.send(producerRecord);
        } finally {
            // Close exactly once, whether or not send() threw. The result check below sits
            // outside this block so that a failed send does not trigger a second close.
            producer.close(MAX_SEND_TIMEOUT);
        }

        try {
            pendingSend.get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag before surfacing the failure.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            // Unwrap runtime causes so the original exception type reaches the caller.
            final Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            throw new RuntimeException(e);
        }
    }
}
