package io.confluent.ksql.rest.server.execution;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.ksql.GenericKey;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.engine.generic.GenericRecordFactory;
import io.confluent.ksql.engine.generic.KsqlGenericRecord;
import io.confluent.ksql.exception.KsqlSchemaAuthorizationException;
import io.confluent.ksql.exception.KsqlTopicAuthorizationException;
import io.confluent.ksql.logging.processing.NoopProcessingLogContext;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.parser.tree.InsertValues;
import io.confluent.ksql.rest.SessionProperties;
import io.confluent.ksql.schema.ksql.PersistenceSchema;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import io.confluent.ksql.schema.ksql.SimpleColumn;
import io.confluent.ksql.schema.registry.SchemaAndId;
import io.confluent.ksql.schema.registry.SchemaRegistryUtil;
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.serde.FormatFactory;
import io.confluent.ksql.serde.FormatInfo;
import io.confluent.ksql.serde.GenericKeySerDe;
import io.confluent.ksql.serde.GenericRowSerDe;
import io.confluent.ksql.serde.KeyFormat;
import io.confluent.ksql.serde.KeySerdeFactory;
import io.confluent.ksql.serde.SchemaTranslator;
import io.confluent.ksql.serde.SerdeFeature;
import io.confluent.ksql.serde.ValueSerdeFactory;
import io.confluent.ksql.serde.protobuf.ProtobufFormat;
import io.confluent.ksql.serde.protobuf.ProtobufProperties;
import io.confluent.ksql.serde.protobuf.ProtobufSchemaTranslator;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlStatementException;
import io.confluent.ksql.util.ReservedInternalTopics;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.serialization.Serde;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/rest/server/execution/InsertValuesExecutor.class */
public class InsertValuesExecutor {
    private static final Logger LOG = LoggerFactory.getLogger(InsertValuesExecutor.class);
    private static final Duration MAX_SEND_TIMEOUT = Duration.ofSeconds(5);
    private final LongSupplier clock;
    private final boolean canBeDisabledByConfig;
    private final RecordProducer producer;
    private final ValueSerdeFactory valueSerdeFactory;
    private final KeySerdeFactory keySerdeFactory;

    /* loaded from: input_file:io/confluent/ksql/rest/server/execution/InsertValuesExecutor$RecordProducer.class */
    public interface RecordProducer {
        void sendRecord(ProducerRecord<byte[], byte[]> producerRecord, ServiceContext serviceContext, Map<String, Object> map);
    }

    public InsertValuesExecutor() {
        this(true, InsertValuesExecutor::sendRecord);
    }

    @VisibleForTesting
    public InsertValuesExecutor(boolean z, RecordProducer recordProducer) {
        this(recordProducer, z, System::currentTimeMillis, new GenericKeySerDe(), new GenericRowSerDe());
    }

    @VisibleForTesting
    InsertValuesExecutor(LongSupplier longSupplier, KeySerdeFactory keySerdeFactory, ValueSerdeFactory valueSerdeFactory) {
        this(InsertValuesExecutor::sendRecord, true, longSupplier, keySerdeFactory, valueSerdeFactory);
    }

    private InsertValuesExecutor(RecordProducer recordProducer, boolean z, LongSupplier longSupplier, KeySerdeFactory keySerdeFactory, ValueSerdeFactory valueSerdeFactory) {
        this.canBeDisabledByConfig = z;
        this.producer = (RecordProducer) Objects.requireNonNull(recordProducer, "producer");
        this.clock = (LongSupplier) Objects.requireNonNull(longSupplier, "clock");
        this.keySerdeFactory = (KeySerdeFactory) Objects.requireNonNull(keySerdeFactory, "keySerdeFactory");
        this.valueSerdeFactory = (ValueSerdeFactory) Objects.requireNonNull(valueSerdeFactory, "valueSerdeFactory");
    }

    public void execute(ConfiguredStatement<InsertValues> configuredStatement, SessionProperties sessionProperties, KsqlExecutionContext ksqlExecutionContext, ServiceContext serviceContext) {
        InsertValues statement = configuredStatement.getStatement();
        MetaStore metaStore = ksqlExecutionContext.getMetaStore();
        KsqlConfig config = configuredStatement.getSessionConfig().getConfig(true);
        DataSource dataSource = getDataSource(config, metaStore, statement);
        validateInsert(statement.getColumns(), dataSource);
        try {
            this.producer.sendRecord(buildRecord(configuredStatement, metaStore, dataSource, serviceContext), serviceContext, config.getProducerClientConfigProps());
        } catch (Exception e) {
            throw new KsqlException(createInsertFailedExceptionMessage(statement), e);
        } catch (TopicAuthorizationException e2) {
            throw new KsqlException(createInsertFailedExceptionMessage(statement), new KsqlTopicAuthorizationException(AclOperation.WRITE, e2.unauthorizedTopics()));
        } catch (KafkaException e3) {
            if (e3.getCause() != null && (e3.getCause() instanceof ClusterAuthorizationException)) {
                throw new KsqlException(createInsertFailedExceptionMessage(statement), createClusterAuthorizationExceptionRootCause(dataSource));
            }
            throw new KsqlException(createInsertFailedExceptionMessage(statement), e3);
        } catch (ClusterAuthorizationException e4) {
            throw new KsqlException(createInsertFailedExceptionMessage(statement), createClusterAuthorizationExceptionRootCause(dataSource));
        }
    }

    private void validateInsert(List<ColumnName> list, DataSource dataSource) {
        List list2 = list.isEmpty() ? (List) dataSource.getSchema().headers().stream().map(column -> {
            return column.name().text();
        }).collect(Collectors.toList()) : (List) list.stream().filter(columnName -> {
            return dataSource.getSchema().isHeaderColumn(columnName);
        }).map((v0) -> {
            return v0.text();
        }).collect(Collectors.toList());
        if (!list2.isEmpty()) {
            throw new KsqlException("Cannot insert into HEADER columns: " + String.join(", ", list2));
        }
    }

    private static DataSource getDataSource(KsqlConfig ksqlConfig, MetaStore metaStore, InsertValues insertValues) {
        DataSource source = metaStore.getSource(insertValues.getTarget());
        if (source == null) {
            throw new KsqlException("Cannot insert values into an unknown stream/table: " + insertValues.getTarget() + metaStore.checkAlternatives(insertValues.getTarget(), Optional.empty()));
        }
        if (source.getKsqlTopic().getKeyFormat().isWindowed()) {
            throw new KsqlException("Cannot insert values into windowed stream/table!");
        }
        if (new ReservedInternalTopics(ksqlConfig).isReadOnly(source.getKafkaTopicName())) {
            throw new KsqlException("Cannot insert values into read-only topic: " + source.getKafkaTopicName());
        }
        if (source.isSource()) {
            throw new KsqlException(String.format("Cannot insert values into read-only %s: %s", source.getDataSourceType().getKsqlType().toLowerCase(), source.getName().text()));
        }
        return source;
    }

    private ProducerRecord<byte[], byte[]> buildRecord(ConfiguredStatement<InsertValues> configuredStatement, MetaStore metaStore, DataSource dataSource, ServiceContext serviceContext) {
        throwIfDisabled(configuredStatement.getSessionConfig().getConfig(false));
        InsertValues statement = configuredStatement.getStatement();
        KsqlConfig config = configuredStatement.getSessionConfig().getConfig(true);
        try {
            KsqlGenericRecord build = new GenericRecordFactory(config, metaStore, this.clock).build(statement.getColumns(), statement.getValues(), dataSource.getSchema(), dataSource.getDataSourceType());
            return new ProducerRecord<>(dataSource.getKafkaTopicName(), (Integer) null, Long.valueOf(build.ts), serializeKey(build.key, dataSource, config, serviceContext), serializeValue(build.value, dataSource, config, serviceContext));
        } catch (Exception e) {
            throw new KsqlStatementException(createInsertFailedExceptionMessage(statement) + " " + e.getMessage(), configuredStatement.getMaskedStatementText(), e);
        }
    }

    private static String createInsertFailedExceptionMessage(InsertValues insertValues) {
        return "Failed to insert values into '" + insertValues.getTarget().text() + "'.";
    }

    private void throwIfDisabled(KsqlConfig ksqlConfig) {
        boolean booleanValue = ksqlConfig.getBoolean("ksql.insert.into.values.enabled").booleanValue();
        if (this.canBeDisabledByConfig && !booleanValue) {
            throw new KsqlException("The server has disabled INSERT INTO ... VALUES functionality. To enable it, restart your ksqlDB server with 'ksql.insert.into.values.enabled'=true");
        }
    }

    private static Exception createClusterAuthorizationExceptionRootCause(DataSource dataSource) {
        return new KsqlTopicAuthorizationException(AclOperation.WRITE, Collections.singletonList(dataSource.getKafkaTopicName()), "The producer is not authorized to do idempotent sends. Check that you have write permissions to the specified topic, and disable idempotent sends by setting 'enable.idempotent=false'  if necessary.");
    }

    private byte[] serializeKey(GenericKey genericKey, DataSource dataSource, KsqlConfig ksqlConfig, ServiceContext serviceContext) {
        PhysicalSchema from = PhysicalSchema.from(dataSource.getSchema(), dataSource.getKsqlTopic().getKeyFormat().getFeatures(), dataSource.getKsqlTopic().getValueFormat().getFeatures());
        Serde create = this.keySerdeFactory.create(addSerializerMissingFormatFields(dataSource.getKsqlTopic().getKeyFormat().getFormatInfo(), dataSource.getKafkaTopicName(), true, ensureKeySchemasMatch(from.keySchema(), dataSource, serviceContext)), from.keySchema(), ksqlConfig, serviceContext.getSchemaRegistryClientFactory(), "", NoopProcessingLogContext.INSTANCE, Optional.empty());
        Throwable th = null;
        try {
            String kafkaTopicName = dataSource.getKafkaTopicName();
            try {
                byte[] serialize = create.serializer().serialize(kafkaTopicName, genericKey);
                if (create != null) {
                    if (0 != 0) {
                        try {
                            create.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        create.close();
                    }
                }
                return serialize;
            } catch (Exception e) {
                maybeThrowSchemaRegistryAuthError(FormatFactory.fromName(dataSource.getKsqlTopic().getKeyFormat().getFormat()), kafkaTopicName, true, AclOperation.WRITE, e);
                LOG.error("Could not serialize key.", e);
                throw new KsqlException("Could not serialize key", e);
            }
        } catch (Throwable th3) {
            if (create != null) {
                if (0 != 0) {
                    try {
                        create.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    create.close();
                }
            }
            throw th3;
        }
    }

    private static Optional<Integer> ensureKeySchemasMatch(PersistenceSchema persistenceSchema, DataSource dataSource, ServiceContext serviceContext) {
        KeyFormat keyFormat = dataSource.getKsqlTopic().getKeyFormat();
        Format fromName = FormatFactory.fromName(keyFormat.getFormat());
        if (!fromName.supportsFeature(SerdeFeature.SCHEMA_INFERENCE)) {
            return Optional.empty();
        }
        Map properties = keyFormat.getFormatInfo().getProperties();
        SchemaTranslator schemaTranslator = fromName.getSchemaTranslator(properties);
        ParsedSchema parsedSchema = schemaTranslator.toParsedSchema(persistenceSchema);
        try {
            Optional latestSchemaAndId = SchemaRegistryUtil.getLatestSchemaAndId(serviceContext.getSchemaRegistryClient(), dataSource.getKafkaTopicName(), true);
            if (!latestSchemaAndId.isPresent()) {
                throw new KsqlException(String.format("Failed to fetch key schema (%s). Please check if schema exists in Schema Registry and/or check connection with Schema Registry.", KsqlConstants.getSRSubject(dataSource.getKafkaTopicName(), true)));
            }
            ParsedSchema schema = ((SchemaAndId) latestSchemaAndId.get()).getSchema();
            if ((fromName instanceof ProtobufFormat) && properties.containsKey("fullSchemaName")) {
                ProtobufSchemaTranslator protobufSchemaTranslator = new ProtobufSchemaTranslator(new ProtobufProperties(properties));
                schema = protobufSchemaTranslator.fromConnectSchema(protobufSchemaTranslator.toConnectSchema(((SchemaAndId) latestSchemaAndId.get()).getSchema()));
            }
            if (schemaEquals(schemaTranslator, persistenceSchema, parsedSchema, schema)) {
                return Optional.of(((SchemaAndId) latestSchemaAndId.get()).getId());
            }
            throw new KsqlException("Cannot INSERT VALUES into data source " + dataSource.getName() + ". ksqlDB generated schema would overwrite existing key schema.\n\tExisting Schema: " + getColumns(schemaTranslator, persistenceSchema, ((SchemaAndId) latestSchemaAndId.get()).getSchema(), true).toString() + "\n\tksqlDB Generated: " + persistenceSchema.columns());
        } catch (KsqlException e) {
            maybeThrowSchemaRegistryAuthError(fromName, dataSource.getKafkaTopicName(), true, AclOperation.READ, e);
            throw new KsqlException("Could not determine that insert values operations is safe; operation potentially overrides existing key schema in schema registry.", e);
        }
    }

    private static List<SimpleColumn> getColumns(SchemaTranslator schemaTranslator, PersistenceSchema persistenceSchema, ParsedSchema parsedSchema, boolean z) {
        return schemaTranslator.toColumns(parsedSchema, persistenceSchema.features(), z);
    }

    private static boolean schemaEquals(SchemaTranslator schemaTranslator, PersistenceSchema persistenceSchema, ParsedSchema parsedSchema, ParsedSchema parsedSchema2) {
        List<SimpleColumn> columns = getColumns(schemaTranslator, persistenceSchema, parsedSchema, true);
        List<SimpleColumn> columns2 = getColumns(schemaTranslator, persistenceSchema, parsedSchema2, true);
        if (columns.size() != columns2.size()) {
            return false;
        }
        for (int i = 0; i < columns.size(); i++) {
            SimpleColumn simpleColumn = columns.get(i);
            SimpleColumn simpleColumn2 = columns2.get(i);
            if (!simpleColumn.name().equals(simpleColumn2.name()) || !simpleColumn.type().equals(simpleColumn2.type())) {
                return false;
            }
        }
        return true;
    }

    private byte[] serializeValue(GenericRow genericRow, DataSource dataSource, KsqlConfig ksqlConfig, ServiceContext serviceContext) {
        Serde create = this.valueSerdeFactory.create(addSerializerMissingFormatFields(dataSource.getKsqlTopic().getValueFormat().getFormatInfo(), dataSource.getKafkaTopicName(), false, Optional.empty()), PhysicalSchema.from(dataSource.getSchema(), dataSource.getKsqlTopic().getKeyFormat().getFeatures(), dataSource.getKsqlTopic().getValueFormat().getFeatures()).valueSchema(), ksqlConfig, serviceContext.getSchemaRegistryClientFactory(), "", NoopProcessingLogContext.INSTANCE, Optional.empty());
        Throwable th = null;
        try {
            String kafkaTopicName = dataSource.getKafkaTopicName();
            try {
                byte[] serialize = create.serializer().serialize(kafkaTopicName, genericRow);
                if (create != null) {
                    if (0 != 0) {
                        try {
                            create.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        create.close();
                    }
                }
                return serialize;
            } catch (Exception e) {
                maybeThrowSchemaRegistryAuthError(FormatFactory.fromName(dataSource.getKsqlTopic().getValueFormat().getFormat()), kafkaTopicName, false, AclOperation.WRITE, e);
                LOG.error("Could not serialize value.", e);
                throw new KsqlException("Could not serialize value" + e.getMessage(), e);
            }
        } catch (Throwable th3) {
            if (create != null) {
                if (0 != 0) {
                    try {
                        create.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    create.close();
                }
            }
            throw th3;
        }
    }

    private static FormatInfo addSerializerMissingFormatFields(FormatInfo formatInfo, String str, boolean z, Optional<Integer> optional) {
        Format fromName = FormatFactory.fromName(formatInfo.getFormat());
        if (!fromName.supportsFeature(SerdeFeature.SCHEMA_INFERENCE)) {
            return formatInfo;
        }
        if (optional.isPresent() && !formatInfo.getProperties().containsKey("schemaId")) {
            ImmutableMap.Builder builder = ImmutableMap.builder();
            builder.putAll(formatInfo.getProperties());
            builder.put("schemaId", String.valueOf(optional.get()));
            return FormatInfo.of(formatInfo.getFormat(), builder.build());
        }
        if (formatInfo.getProperties().containsKey("schemaId") || !fromName.getSupportedProperties().contains("subjectName")) {
            return formatInfo;
        }
        ImmutableMap.Builder builder2 = ImmutableMap.builder();
        builder2.putAll(formatInfo.getProperties());
        builder2.put("subjectName", KsqlConstants.getSRSubject(str, z));
        return FormatInfo.of(formatInfo.getFormat(), builder2.build());
    }

    private static void maybeThrowSchemaRegistryAuthError(Format format, String str, boolean z, AclOperation aclOperation, Exception exc) {
        if (format.supportsFeature(SerdeFeature.SCHEMA_INFERENCE)) {
            RestClientException restClientException = (Throwable) ObjectUtils.defaultIfNull(ExceptionUtils.getRootCause(exc), exc);
            if (restClientException instanceof RestClientException) {
                switch (restClientException.getStatus()) {
                    case 401:
                    case 403:
                        throw new KsqlSchemaAuthorizationException(aclOperation, KsqlConstants.getSRSubject(str, z));
                    default:
                        return;
                }
            }
        }
    }

    private static void sendRecord(ProducerRecord<byte[], byte[]> producerRecord, ServiceContext serviceContext, Map<String, Object> map) {
        Producer producer = serviceContext.getKafkaClientSupplier().getProducer(map);
        try {
            Future send = producer.send(producerRecord);
            producer.close(MAX_SEND_TIMEOUT);
            try {
                send.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } catch (ExecutionException e2) {
                if (!(e2.getCause() instanceof RuntimeException)) {
                    throw new RuntimeException(e2);
                }
                throw ((RuntimeException) e2.getCause());
            }
        } catch (Throwable th) {
            producer.close(MAX_SEND_TIMEOUT);
            throw th;
        }
    }
}
