package io.confluent.ksql.engine;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.engine.generic.GenericRecordFactory;
import io.confluent.ksql.engine.generic.KsqlGenericRecord;
import io.confluent.ksql.exception.KsqlTopicAuthorizationException;
import io.confluent.ksql.logging.processing.NoopProcessingLogContext;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.parser.tree.InsertValues;
import io.confluent.ksql.rest.SessionProperties;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.serde.FormatFactory;
import io.confluent.ksql.serde.GenericKeySerDe;
import io.confluent.ksql.serde.GenericRowSerDe;
import io.confluent.ksql.serde.KeySerdeFactory;
import io.confluent.ksql.serde.SerdeFeature;
import io.confluent.ksql.serde.ValueSerdeFactory;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlStatementException;
import io.confluent.ksql.util.ReservedInternalTopics;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.LongSupplier;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.connect.data.Struct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/engine/InsertValuesExecutor.class */
/**
 * Executes {@code INSERT INTO ... VALUES} statements: resolves the target source in the metastore,
 * builds a single serialized Kafka record from the statement and hands it to a
 * {@link RecordProducer}.
 */
public class InsertValuesExecutor {
    private static final Logger LOG = LoggerFactory.getLogger(InsertValuesExecutor.class);

    /** Timeout passed to {@code Producer.close} for the producer created per insert. */
    private static final Duration MAX_SEND_TIMEOUT = Duration.ofSeconds(5);

    /** HTTP status codes from Schema Registry that are reported as authorization failures. */
    private static final int HTTP_UNAUTHORIZED = 401;
    private static final int HTTP_FORBIDDEN = 403;

    private final LongSupplier clock;
    private final boolean canBeDisabledByConfig;
    private final RecordProducer producer;
    private final ValueSerdeFactory valueSerdeFactory;
    private final KeySerdeFactory keySerdeFactory;

    /** Strategy used to deliver the fully serialized record. */
    public interface RecordProducer {
        /**
         * Sends the record.
         *
         * @param producerRecord the serialized record to send
         * @param serviceContext the service context the send is performed with
         * @param map the producer client configuration properties
         */
        void sendRecord(ProducerRecord<byte[], byte[]> producerRecord, ServiceContext serviceContext, Map<String, Object> map);
    }

    /**
     * Creates an executor that honours the {@code ksql.insert.into.values.enabled} setting and
     * sends records through a Kafka producer obtained from the service context.
     */
    public InsertValuesExecutor() {
        this(true, InsertValuesExecutor::sendRecord);
    }

    /**
     * Creates an executor with a custom record producer, the system clock and the default
     * key/value serde factories.
     *
     * @param z whether the {@code ksql.insert.into.values.enabled} setting may disable inserts
     * @param recordProducer the strategy used to send the built record; must not be null
     */
    @VisibleForTesting
    public InsertValuesExecutor(boolean z, RecordProducer recordProducer) {
        this(recordProducer, z, System::currentTimeMillis, new GenericKeySerDe(), new GenericRowSerDe());
    }

    /**
     * Creates an executor with a custom clock and serde factories that sends through the default
     * Kafka producer path and can be disabled by configuration.
     *
     * @param longSupplier the clock handed to the record factory; must not be null
     * @param keySerdeFactory factory for the key serde; must not be null
     * @param valueSerdeFactory factory for the value serde; must not be null
     */
    @VisibleForTesting
    InsertValuesExecutor(LongSupplier longSupplier, KeySerdeFactory keySerdeFactory, ValueSerdeFactory valueSerdeFactory) {
        this(InsertValuesExecutor::sendRecord, true, longSupplier, keySerdeFactory, valueSerdeFactory);
    }

    private InsertValuesExecutor(
            RecordProducer recordProducer,
            boolean canBeDisabledByConfig,
            LongSupplier clock,
            KeySerdeFactory keySerdeFactory,
            ValueSerdeFactory valueSerdeFactory) {
        this.canBeDisabledByConfig = canBeDisabledByConfig;
        this.producer = Objects.requireNonNull(recordProducer, "producer");
        this.clock = Objects.requireNonNull(clock, "clock");
        this.keySerdeFactory = Objects.requireNonNull(keySerdeFactory, "keySerdeFactory");
        this.valueSerdeFactory = Objects.requireNonNull(valueSerdeFactory, "valueSerdeFactory");
    }

    /**
     * Builds the record described by the statement and sends it to the target source's topic.
     *
     * @param configuredStatement the INSERT VALUES statement together with its session config
     * @param sessionProperties not used by this implementation
     * @param ksqlExecutionContext supplies the metastore used to resolve the target source
     * @param serviceContext supplies the Schema Registry client factory and the Kafka producer
     * @throws KsqlException if resolving the target, building the record or sending it fails; a
     *     topic authorization failure is reported with a {@link KsqlTopicAuthorizationException}
     *     cause naming the WRITE operation and the unauthorized topics
     */
    public void execute(ConfiguredStatement<InsertValues> configuredStatement, SessionProperties sessionProperties, KsqlExecutionContext ksqlExecutionContext, ServiceContext serviceContext) {
        final InsertValues insertValues = configuredStatement.getStatement();
        final MetaStore metaStore = ksqlExecutionContext.getMetaStore();
        final KsqlConfig config = configuredStatement.getSessionConfig().getConfig(true);
        try {
            final DataSource dataSource = getDataSource(config, metaStore, insertValues);
            final ProducerRecord<byte[], byte[]> record =
                    buildRecord(configuredStatement, metaStore, dataSource, serviceContext);
            this.producer.sendRecord(record, serviceContext, config.getProducerClientConfigProps());
        } catch (TopicAuthorizationException e) {
            // Must precede the generic handler: it is a subtype of Exception and would otherwise
            // never be reached. The Kafka exception only carries the denied topics, so it is
            // replaced by a cause that also names the ACL operation.
            final Exception rootCause =
                    new KsqlTopicAuthorizationException(AclOperation.WRITE, e.unauthorizedTopics());
            throw new KsqlException(createInsertFailedExceptionMessage(insertValues), rootCause);
        } catch (Exception e) {
            throw new KsqlException(createInsertFailedExceptionMessage(insertValues), e);
        }
    }

    /**
     * Looks up the insert target and rejects targets that cannot be written to.
     *
     * @throws KsqlException if the target is unknown, has a windowed key format, or is backed by
     *     a topic that {@link ReservedInternalTopics} reports as read-only
     */
    private static DataSource getDataSource(KsqlConfig ksqlConfig, MetaStore metaStore, InsertValues insertValues) {
        final DataSource dataSource = metaStore.getSource(insertValues.getTarget());
        if (dataSource == null) {
            throw new KsqlException("Cannot insert values into an unknown stream/table: " + insertValues.getTarget());
        }

        if (dataSource.getKsqlTopic().getKeyFormat().isWindowed()) {
            throw new KsqlException("Cannot insert values into windowed stream/table!");
        }

        final ReservedInternalTopics internalTopics = new ReservedInternalTopics(ksqlConfig);
        if (internalTopics.isReadOnly(dataSource.getKafkaTopicName())) {
            throw new KsqlException("Cannot insert values into read-only topic: " + dataSource.getKafkaTopicName());
        }

        return dataSource;
    }

    /**
     * Turns the statement's columns and values into a serialized producer record for the data
     * source's topic. The partition is left unset and the timestamp comes from the generic record.
     *
     * @throws KsqlException if inserts are disabled by configuration
     * @throws KsqlStatementException if building or serializing the record fails
     */
    private ProducerRecord<byte[], byte[]> buildRecord(ConfiguredStatement<InsertValues> configuredStatement, MetaStore metaStore, DataSource dataSource, ServiceContext serviceContext) {
        // The enabled flag is read from getConfig(false), the record is built with getConfig(true).
        // NOTE(review): presumably the flag toggles whether session overrides apply - confirm.
        throwIfDisabled(configuredStatement.getSessionConfig().getConfig(false));

        final InsertValues insertValues = configuredStatement.getStatement();
        final KsqlConfig config = configuredStatement.getSessionConfig().getConfig(true);
        try {
            final GenericRecordFactory recordFactory = new GenericRecordFactory(config, metaStore, this.clock);
            final KsqlGenericRecord row = recordFactory.build(
                    insertValues.getColumns(),
                    insertValues.getValues(),
                    dataSource.getSchema(),
                    dataSource.getDataSourceType());

            final Long timestamp = row.ts;
            final byte[] key = serializeKey(row.key, dataSource, config, serviceContext);
            final byte[] value = serializeValue(row.value, dataSource, config, serviceContext);

            // The (Integer) cast on the null partition selects the timestamped constructor.
            return new ProducerRecord<>(dataSource.getKafkaTopicName(), (Integer) null, timestamp, key, value);
        } catch (Exception e) {
            throw new KsqlStatementException(
                    createInsertFailedExceptionMessage(insertValues) + " " + e.getMessage(),
                    configuredStatement.getStatementText(),
                    e);
        }
    }

    /** Returns the common prefix used by every insert failure message. */
    private static String createInsertFailedExceptionMessage(InsertValues insertValues) {
        return "Failed to insert values into '" + insertValues.getTarget().text() + "'.";
    }

    /**
     * Rejects the insert when this executor honours the configuration flag and the flag is off.
     *
     * @throws KsqlException if INSERT VALUES is disabled
     */
    private void throwIfDisabled(KsqlConfig ksqlConfig) {
        final boolean insertsEnabled = ksqlConfig.getBoolean("ksql.insert.into.values.enabled");
        if (insertsEnabled || !this.canBeDisabledByConfig) {
            return;
        }
        throw new KsqlException("The server has disabled INSERT INTO ... VALUES functionality. To enable it, restart your ksqlDB server with 'ksql.insert.into.values.enabled'=true");
    }

    /**
     * Serializes the key with a serde created for the data source's key format.
     *
     * @throws KsqlException if serialization fails; a Schema Registry 401/403 is reported as an
     *     authorization error instead
     */
    private byte[] serializeKey(Struct keyValue, DataSource dataSource, KsqlConfig config, ServiceContext serviceContext) {
        final PhysicalSchema physicalSchema = PhysicalSchema.from(
                dataSource.getSchema(),
                dataSource.getKsqlTopic().getKeyFormat().getFeatures(),
                dataSource.getKsqlTopic().getValueFormat().getFeatures());

        final Serde<Struct> keySerde = this.keySerdeFactory.create(
                dataSource.getKsqlTopic().getKeyFormat().getFormatInfo(),
                physicalSchema.keySchema(),
                config,
                serviceContext.getSchemaRegistryClientFactory(),
                "",
                NoopProcessingLogContext.INSTANCE,
                Optional.empty());

        final String topicName = dataSource.getKafkaTopicName();
        try {
            return keySerde.serializer().serialize(topicName, keyValue);
        } catch (Exception e) {
            final Format keyFormat = FormatFactory.fromName(dataSource.getKsqlTopic().getKeyFormat().getFormat());
            maybeThrowSchemaRegistryAuthError(keyFormat, topicName, true, e);
            LOG.error("Could not serialize key.", e);
            throw new KsqlException("Could not serialize key: " + keyValue, e);
        }
    }

    /**
     * Serializes the value row with a serde created for the data source's value format.
     *
     * @throws KsqlException if serialization fails; a Schema Registry 401/403 is reported as an
     *     authorization error instead
     */
    private byte[] serializeValue(GenericRow row, DataSource dataSource, KsqlConfig config, ServiceContext serviceContext) {
        final PhysicalSchema physicalSchema = PhysicalSchema.from(
                dataSource.getSchema(),
                dataSource.getKsqlTopic().getKeyFormat().getFeatures(),
                dataSource.getKsqlTopic().getValueFormat().getFeatures());

        final Serde<GenericRow> valueSerde = this.valueSerdeFactory.create(
                dataSource.getKsqlTopic().getValueFormat().getFormatInfo(),
                physicalSchema.valueSchema(),
                config,
                serviceContext.getSchemaRegistryClientFactory(),
                "",
                NoopProcessingLogContext.INSTANCE,
                Optional.empty());

        final String topicName = dataSource.getKafkaTopicName();
        try {
            return valueSerde.serializer().serialize(topicName, row);
        } catch (Exception e) {
            final Format valueFormat = FormatFactory.fromName(dataSource.getKsqlTopic().getValueFormat().getFormat());
            maybeThrowSchemaRegistryAuthError(valueFormat, topicName, false, e);
            LOG.error("Could not serialize value.", e);
            throw new KsqlException("Could not serialize value: " + row + ". " + e.getMessage(), e);
        }
    }

    /**
     * Throws an authorization error when the format supports schema inference and the root cause
     * of the failure is a Schema Registry REST error with status 401 or 403. Returns normally in
     * every other case so the caller can report a generic serialization failure.
     *
     * @param format the key or value format of the topic being written
     * @param topicName the Kafka topic the record is destined for
     * @param isKey whether the key (true) or value (false) subject is affected
     * @param e the serialization failure to inspect
     */
    private static void maybeThrowSchemaRegistryAuthError(Format format, String topicName, boolean isKey, Exception e) {
        if (!format.supportsFeature(SerdeFeature.SCHEMA_INFERENCE)) {
            return;
        }

        // getRootCause yields a plain Throwable; narrow it only once the type has been checked.
        final Throwable rootCause = ExceptionUtils.getRootCause(e);
        if (!(rootCause instanceof RestClientException)) {
            return;
        }

        final int status = ((RestClientException) rootCause).getStatus();
        if (status == HTTP_UNAUTHORIZED || status == HTTP_FORBIDDEN) {
            throw new KsqlException(String.format("Not authorized to write Schema Registry subject: [%s]", KsqlConstants.getSRSubject(topicName, isKey)));
        }
    }

    /**
     * Default {@link RecordProducer}: obtains a producer from the service context's Kafka client
     * supplier, sends the record, closes the producer and then waits for the send result.
     *
     * @throws RuntimeException the send failure's cause when it is a runtime exception, otherwise
     *     a wrapper around the {@link ExecutionException} or {@link InterruptedException}
     */
    private static void sendRecord(ProducerRecord<byte[], byte[]> record, ServiceContext serviceContext, Map<String, Object> producerProps) {
        final Producer<byte[], byte[]> kafkaProducer =
                serviceContext.getKafkaClientSupplier().getProducer(producerProps);

        final Future<?> sendResult;
        try {
            sendResult = kafkaProducer.send(record);
        } finally {
            // Closed exactly once, with a bounded timeout, whether or not send() threw.
            // try-with-resources is not used because it would call the untimed close().
            kafkaProducer.close(MAX_SEND_TIMEOUT);
        }

        try {
            // Surfaces asynchronous send failures to the caller.
            sendResult.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            if (e.getCause() instanceof RuntimeException) {
                throw (RuntimeException) e.getCause();
            }
            throw new RuntimeException(e);
        }
    }
}
