package io.confluent.ksql.api.impl;

import io.confluent.ksql.GenericKey;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.api.server.InsertResult;
import io.confluent.ksql.api.server.InsertsStreamSubscriber;
import io.confluent.ksql.api.util.ApiSqlValueCoercer;
import io.confluent.ksql.logging.processing.NoopProcessingLogContext;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.reactive.BaseSubscriber;
import io.confluent.ksql.reactive.BufferedPublisher;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import io.confluent.ksql.schema.ksql.SqlValueCoercer;
import io.confluent.ksql.serde.GenericKeySerDe;
import io.confluent.ksql.serde.GenericRowSerDe;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.VertxUtils;
import io.vertx.core.Context;
import io.vertx.core.WorkerExecutor;
import io.vertx.core.json.JsonObject;
import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/api/impl/InsertsSubscriber.class */
/**
 * Subscriber that receives rows to insert as {@link JsonObject}s, converts each one into a
 * serialized key/value pair for the target {@link DataSource}, produces it to the source's
 * Kafka topic, and reports one {@link InsertResult} per row to the acks publisher.
 *
 * <p>Rows are requested from upstream in batches of {@link #REQUEST_BATCH_SIZE}. A new batch is
 * requested only once every row of the previous batch has been delivered to
 * {@link #handleValue(JsonObject)} and the acks publisher is not pushing back.
 *
 * <p>The mutable state ({@code outstandingTokens}, {@code drainHandlerSet}, {@code sequence}) is
 * not synchronized; Kafka send callbacks are re-dispatched onto {@code context} before touching
 * it. NOTE(review): this assumes the base class also invokes {@code handleValue} and
 * {@code afterSubscribe} on that same context - confirm against {@code BaseSubscriber}.
 */
public final class InsertsSubscriber extends BaseSubscriber<JsonObject> implements InsertsStreamSubscriber {
    private static final Logger log = LoggerFactory.getLogger(InsertsSubscriber.class);
    /** Number of rows requested from upstream per batch. */
    private static final int REQUEST_BATCH_SIZE = 200;
    private static final SqlValueCoercer SQL_VALUE_COERCER = ApiSqlValueCoercer.INSTANCE;

    private final Producer<byte[], byte[]> producer;
    private final DataSource dataSource;
    private final Serializer<GenericKey> keySerializer;
    private final Serializer<GenericRow> valueSerializer;
    private final BufferedPublisher<InsertResult> acksPublisher;
    private final WorkerExecutor workerExecutor;
    /** Rows requested from upstream that have not yet been delivered to handleValue. */
    private int outstandingTokens;
    /** True while a drain handler is registered on the acks publisher. */
    private boolean drainHandlerSet;
    /** Sequence number assigned to the next row received. */
    private long sequence;

    /**
     * Kafka producer callback that turns the outcome of a single send into an
     * {@link InsertResult} carrying the row's sequence number.
     */
    public class SendCallback implements Callback {
        private final long seq;

        SendCallback(long j) {
            this.seq = j;
        }

        /**
         * Builds a failed or succeeded result depending on whether {@code exc} is set, then
         * hands it to the subscriber on its own context rather than on the calling thread.
         *
         * @param recordMetadata metadata of the produced record; not used
         * @param exc the send failure, or {@code null} if the send succeeded
         */
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
            final InsertResult result = exc != null
                    ? InsertResult.failedInsert(this.seq, exc)
                    : InsertResult.succeededInsert(this.seq);
            InsertsSubscriber.this.context.runOnContext(ignored -> InsertsSubscriber.this.handleResult(result));
        }
    }

    /**
     * Creates a subscriber for inserts into {@code dataSource} and wires {@code subscriber} up
     * to receive the per-row results.
     *
     * @param serviceContext supplies the Kafka producer and the schema registry client factory
     * @param jsonObject properties overlaid on {@code ksqlConfig} to configure the producer
     * @param dataSource the source whose topic, formats and schema are used
     * @param ksqlConfig the base configuration
     * @param context the Vert.x context the subscriber and the acks publisher run on
     * @param subscriber the subscriber that receives one {@link InsertResult} per row
     * @param workerExecutor executor used to close the producer off the calling thread
     * @return the new subscriber
     */
    public static InsertsSubscriber createInsertsSubscriber(ServiceContext serviceContext, JsonObject jsonObject, DataSource dataSource, KsqlConfig ksqlConfig, Context context, Subscriber<InsertResult> subscriber, WorkerExecutor workerExecutor) {
        final Producer<byte[], byte[]> producer = serviceContext.getKafkaClientSupplier()
                .getProducer(ksqlConfig.cloneWithPropertyOverwrite(jsonObject.getMap()).originals());
        final PhysicalSchema physicalSchema = PhysicalSchema.from(
                dataSource.getSchema(),
                dataSource.getKsqlTopic().getKeyFormat().getFeatures(),
                dataSource.getKsqlTopic().getValueFormat().getFeatures());
        final Serde<GenericKey> keySerde = new GenericKeySerDe().create(
                dataSource.getKsqlTopic().getKeyFormat().getFormatInfo(),
                physicalSchema.keySchema(),
                ksqlConfig,
                serviceContext.getSchemaRegistryClientFactory(),
                "",
                NoopProcessingLogContext.INSTANCE,
                Optional.empty());
        final Serde<GenericRow> valueSerde = new GenericRowSerDe().create(
                dataSource.getKsqlTopic().getValueFormat().getFormatInfo(),
                physicalSchema.valueSchema(),
                ksqlConfig,
                serviceContext.getSchemaRegistryClientFactory(),
                "",
                NoopProcessingLogContext.INSTANCE,
                Optional.empty());
        final BufferedPublisher<InsertResult> acksPublisher = new BufferedPublisher<>(context);
        acksPublisher.subscribe(subscriber);
        return new InsertsSubscriber(context, producer, dataSource, keySerde.serializer(), valueSerde.serializer(), acksPublisher, workerExecutor);
    }

    private InsertsSubscriber(Context context, Producer<byte[], byte[]> producer, DataSource dataSource, Serializer<GenericKey> serializer, Serializer<GenericRow> serializer2, BufferedPublisher<InsertResult> bufferedPublisher, WorkerExecutor workerExecutor) {
        super(context);
        this.producer = Objects.requireNonNull(producer, "producer");
        this.dataSource = Objects.requireNonNull(dataSource, "dataSource");
        this.keySerializer = Objects.requireNonNull(serializer, "keySerializer");
        this.valueSerializer = Objects.requireNonNull(serializer2, "valueSerializer");
        this.acksPublisher = Objects.requireNonNull(bufferedPublisher, "acksPublisher");
        this.workerExecutor = Objects.requireNonNull(workerExecutor, "workerExecutor");
    }

    /**
     * Closes the Kafka producer on the worker executor, so the caller's thread is not the one
     * running {@code Producer.close()}.
     */
    @Override // io.confluent.ksql.api.server.InsertsStreamSubscriber
    public void close() {
        executeOnWorker(this.producer::close);
    }

    /**
     * Requests the first batch of rows.
     * NOTE(review): presumably a hook invoked by the base class once subscribed - confirm.
     *
     * @param subscription the upstream subscription; not used directly
     */
    protected void afterSubscribe(Subscription subscription) {
        checkRequest();
    }

    /**
     * Handles one row: normalizes column-name case, extracts and serializes key and value, and
     * sends the record to the data source's Kafka topic with the current time as timestamp.
     *
     * <p>Every delivered row consumes one requested token and one sequence number, whether or
     * not it could be sent. Previously a row that failed before the send kept its token, so the
     * token count never returned to zero and no further batch was ever requested.
     *
     * @param jsonObject the row to insert, keyed by column name
     */
    public void handleValue(JsonObject jsonObject) {
        // Account for the row up front so that a failure anywhere below cannot skip it.
        this.outstandingTokens--;
        final long seq = this.sequence++;
        try {
            final JsonObject row = KeyValueExtractor.convertColumnNameCase(jsonObject);
            final GenericKey key = extractKey(row);
            final GenericRow value = extractValues(row);
            final String topicName = this.dataSource.getKafkaTopicName();
            final byte[] keyBytes = this.keySerializer.serialize(topicName, key);
            final byte[] valueBytes = this.valueSerializer.serialize(topicName, value);
            final ProducerRecord<byte[], byte[]> record =
                    new ProducerRecord<>(topicName, null, System.currentTimeMillis(), keyBytes, valueBytes);
            this.producer.send(record, new SendCallback(seq));
        } catch (Exception e) {
            // No send callback will fire for this row, so report the failure here and run the
            // same back-pressure logic an asynchronous result would trigger.
            publishResult(InsertResult.failedInsert(seq, e));
        }
    }

    /**
     * Publishes the result of an asynchronous send. Must be called on this subscriber's context.
     *
     * @param insertResult the outcome of the send
     */
    public void handleResult(InsertResult insertResult) {
        VertxUtils.checkContext(this.context);
        publishResult(insertResult);
    }

    /**
     * Passes a result to the acks publisher and applies back-pressure. When {@code accept}
     * returns {@code true} (treated here as the publisher pushing back) a drain handler is
     * registered, at most once at a time; otherwise more rows are requested if the current
     * batch has been fully delivered.
     */
    private void publishResult(InsertResult insertResult) {
        if (this.acksPublisher.accept(insertResult)) {
            if (!this.drainHandlerSet) {
                this.acksPublisher.drainHandler(this::acksPublisherReceptive);
                this.drainHandlerSet = true;
            }
        } else {
            checkRequest();
        }
    }

    /** Drain handler: clears the registration flag and resumes requesting rows. */
    private void acksPublisherReceptive() {
        this.drainHandlerSet = false;
        checkRequest();
    }

    /** Requests a new batch from upstream once every row of the previous one has arrived. */
    private void checkRequest() {
        if (this.outstandingTokens == 0) {
            this.outstandingTokens = REQUEST_BATCH_SIZE;
            makeRequest(REQUEST_BATCH_SIZE);
        }
    }

    /** Extracts the key columns of a row, coerced against the data source's schema. */
    private GenericKey extractKey(JsonObject jsonObject) {
        return KeyValueExtractor.extractKey(jsonObject, this.dataSource.getSchema(), SQL_VALUE_COERCER);
    }

    /** Extracts the value columns of a row, coerced against the data source's schema. */
    private GenericRow extractValues(JsonObject jsonObject) {
        return KeyValueExtractor.extractValues(jsonObject, this.dataSource.getSchema(), SQL_VALUE_COERCER);
    }

    /**
     * Runs {@code runnable} as an unordered blocking task on the worker executor and logs an
     * error if it fails. Only used to close the producer, hence the log message.
     */
    private void executeOnWorker(Runnable runnable) {
        this.workerExecutor.executeBlocking(promise -> {
            runnable.run();
            // Signal successful completion; a throwing runnable never reaches this line.
            promise.complete();
        }, false, asyncResult -> {
            if (asyncResult.failed()) {
                log.error("Failed to close producer", asyncResult.cause());
            }
        });
    }
}
