package io.confluent.ksql.api.impl;

import io.confluent.ksql.api.server.InsertResult;
import io.confluent.ksql.api.server.InsertsStreamSubscriber;
import io.confluent.ksql.api.server.KsqlApiException;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.Identifiers;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.ReservedInternalTopics;
import io.confluent.ksql.util.VertxUtils;
import io.vertx.core.Context;
import io.vertx.core.WorkerExecutor;
import io.vertx.core.json.JsonObject;
import org.reactivestreams.Subscriber;

/* loaded from: input_file:io/confluent/ksql/api/impl/InsertsStreamEndpoint.class */
public class InsertsStreamEndpoint {
    private final KsqlEngine ksqlEngine;
    private final KsqlConfig ksqlConfig;
    private final ReservedInternalTopics reservedInternalTopics;

    public InsertsStreamEndpoint(KsqlEngine ksqlEngine, KsqlConfig ksqlConfig, ReservedInternalTopics reservedInternalTopics) {
        this.ksqlEngine = ksqlEngine;
        this.ksqlConfig = ksqlConfig;
        this.reservedInternalTopics = reservedInternalTopics;
    }

    public InsertsStreamSubscriber createInsertsSubscriber(String str, JsonObject jsonObject, Subscriber<InsertResult> subscriber, Context context, WorkerExecutor workerExecutor, ServiceContext serviceContext) {
        VertxUtils.checkIsWorker();
        if (!this.ksqlConfig.getBoolean("ksql.insert.into.values.enabled").booleanValue()) {
            throw new KsqlApiException("The server has disabled INSERT INTO ... VALUES functionality. To enable it, restart your ksqlDB server with 'ksql.insert.into.values.enabled'=true", Errors.ERROR_CODE_BAD_REQUEST);
        }
        try {
            DataSource dataSource = getDataSource(this.ksqlEngine.getMetaStore(), SourceName.of(Identifiers.getIdentifierText(str)));
            if (dataSource.getDataSourceType() == DataSource.DataSourceType.KTABLE) {
                throw new KsqlApiException("Cannot insert into a table", Errors.ERROR_CODE_BAD_STATEMENT);
            }
            return InsertsSubscriber.createInsertsSubscriber(serviceContext, jsonObject, dataSource, this.ksqlConfig, context, subscriber, workerExecutor);
        } catch (IllegalArgumentException e) {
            throw new KsqlApiException("Invalid target name: " + e.getMessage(), Errors.ERROR_CODE_BAD_STATEMENT);
        }
    }

    private DataSource getDataSource(MetaStore metaStore, SourceName sourceName) {
        DataSource source = metaStore.getSource(sourceName);
        if (source == null) {
            throw new KsqlApiException("Cannot insert values into an unknown stream: " + sourceName, Errors.ERROR_CODE_BAD_STATEMENT);
        }
        if (source.getKsqlTopic().getKeyFormat().isWindowed()) {
            throw new KsqlApiException("Cannot insert values into windowed stream", Errors.ERROR_CODE_BAD_STATEMENT);
        }
        if (this.reservedInternalTopics.isReadOnly(source.getKafkaTopicName())) {
            throw new KsqlApiException("Cannot insert values into read-only topic: " + source.getKafkaTopicName(), Errors.ERROR_CODE_BAD_STATEMENT);
        }
        return source;
    }
}
