package io.confluent.ksql.api.server;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.connect.protobuf.ProtobufData;
import io.confluent.connect.protobuf.ProtobufDataConfig;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.api.spi.QueryPublisher;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.rest.ApiJsonMapper;
import io.confluent.ksql.rest.entity.ConsistencyToken;
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
import io.confluent.ksql.rest.entity.KsqlHostInfoEntity;
import io.confluent.ksql.rest.entity.PushContinuationToken;
import io.confluent.ksql.rest.entity.QueryResponseMetadata;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.rest.server.resources.streaming.TombstoneFactory;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.serde.connect.ConnectSchemas;
import io.confluent.ksql.serde.connect.KsqlConnectSerializer;
import io.confluent.ksql.serde.protobuf.ProtobufNoSRSerdeFactory;
import io.confluent.ksql.util.KeyValue;
import io.confluent.ksql.util.KeyValueMetadata;
import io.confluent.ksql.util.KsqlHostInfo;
import io.confluent.ksql.util.RowMetadata;
import io.vertx.core.Context;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpServerResponse;
import java.nio.charset.StandardCharsets;
import java.time.Clock;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;

/* loaded from: input_file:io/confluent/ksql/api/server/JsonStreamedRowResponseWriter.class */
public class JsonStreamedRowResponseWriter implements QueryStreamResponseWriter {
    // Size threshold at which buffered output is written to the response. It is compared against
    // WriterState.length(), i.e. the char count of the pending text, so it is only approximately bytes.
    private static final int FLUSH_SIZE_BYTES = 5120;
    // Shared Jackson mapper used by serializeObject to turn rows into JSON.
    private static final ObjectMapper OBJECT_MAPPER = ApiJsonMapper.INSTANCE.get();
    // Longest time, in milliseconds, that buffered output is held before it is flushed.
    static final long MAX_FLUSH_MS = 200;
    // HTTP response that all output is written to.
    private final HttpServerResponse response;
    // Builds tombstone rows for null-valued records; present only when the publisher reports a result type.
    private final Optional<TombstoneFactory> tombstoneFactory;
    // Text of the final message written by writeCompletionMessage, if any.
    private final Optional<String> completionMessage;
    // Text of the final message written by writeLimitMessage, if any.
    private final Optional<String> limitMessage;
    // Time source used for the flush-interval checks.
    private final Clock clock;
    // True when output is accumulated in writerState and flushed in batches; false when every write goes straight to the response.
    private final boolean bufferOutput;
    // Vert.x context whose owner is used to schedule and cancel the flush timer.
    private final Context context;
    // Strategy that converts metadata and data rows into StreamedRow instances.
    private final RowFormat rowFormat;
    // Pending (not yet flushed) output text plus the time of the last flush.
    private final WriterState writerState;
    // In buffered mode, the most recent row, held back until the next row or the end of the stream arrives.
    private StreamedRow lastRow;
    // Id of the pending flush timer, or -1 when no timer is scheduled.
    private long timerId = -1;

    /* loaded from: input_file:io/confluent/ksql/api/server/JsonStreamedRowResponseWriter$RowFormat.class */
    public enum RowFormat {
        PROTOBUF { // from class: io.confluent.ksql.api.server.JsonStreamedRowResponseWriter.RowFormat.1
            private transient ConnectSchema connectSchema;
            private transient KsqlConnectSerializer<Struct> serializer;

            @Override // io.confluent.ksql.api.server.JsonStreamedRowResponseWriter.RowFormat
            public StreamedRow metadataRow(QueryResponseMetadata queryResponseMetadata) {
                LogicalSchema logicalSchema = queryResponseMetadata.schema;
                String str = queryResponseMetadata.queryId;
                this.connectSchema = ConnectSchemas.columnsToConnectSchema(logicalSchema.columns());
                this.serializer = new ProtobufNoSRSerdeFactory(ImmutableMap.of()).createSerializer(this.connectSchema, Struct.class, false);
                return StreamedRow.headerProtobuf(new QueryId(str), logicalSchema, JsonStreamedRowResponseWriter.logicalToProtoSchema(logicalSchema));
            }

            @Override // io.confluent.ksql.api.server.JsonStreamedRowResponseWriter.RowFormat
            public StreamedRow dataRow(KeyValueMetadata<List<?>, GenericRow> keyValueMetadata) {
                KeyValue keyValue = keyValueMetadata.getKeyValue();
                Struct struct = new Struct(this.connectSchema);
                int i = 0;
                Iterator it = this.connectSchema.fields().iterator();
                while (it.hasNext()) {
                    struct.put((Field) it.next(), ((GenericRow) keyValue.value()).get(i));
                    i++;
                }
                return StreamedRow.pullRowProtobuf(this.serializer.serialize("", struct));
            }
        },
        JSON { // from class: io.confluent.ksql.api.server.JsonStreamedRowResponseWriter.RowFormat.2
            @Override // io.confluent.ksql.api.server.JsonStreamedRowResponseWriter.RowFormat
            public StreamedRow metadataRow(QueryResponseMetadata queryResponseMetadata) {
                return StreamedRow.header(new QueryId(queryResponseMetadata.queryId), queryResponseMetadata.schema);
            }

            @Override // io.confluent.ksql.api.server.JsonStreamedRowResponseWriter.RowFormat
            public StreamedRow dataRow(KeyValueMetadata<List<?>, GenericRow> keyValueMetadata) {
                KeyValue keyValue = keyValueMetadata.getKeyValue();
                return (keyValueMetadata.getRowMetadata().isPresent() && ((RowMetadata) keyValueMetadata.getRowMetadata().get()).getSourceNode().isPresent()) ? StreamedRow.pullRow((GenericRow) keyValue.value(), JsonStreamedRowResponseWriter.toKsqlHostInfoEntity(((RowMetadata) keyValueMetadata.getRowMetadata().get()).getSourceNode())) : StreamedRow.pushRow((GenericRow) keyValue.value());
            }
        };

        public abstract StreamedRow metadataRow(QueryResponseMetadata queryResponseMetadata);

        public abstract StreamedRow dataRow(KeyValueMetadata<List<?>, GenericRow> keyValueMetadata);
    }

    /**
     * Holds the output text that has not been flushed yet, together with the time of the most
     * recent flush as reported by the supplied clock.
     */
    public static class WriterState {
        private final Clock clock;
        private final StringBuilder pending = new StringBuilder();
        private long lastFlushMs;

        /**
         * Creates an empty state whose last-flush time is the clock's current time.
         *
         * @param clock time source used to stamp flushes
         */
        WriterState(Clock clock) {
            this.clock = clock;
            this.lastFlushMs = clock.millis();
        }

        /**
         * Adds text to the pending output.
         *
         * @param str text to add
         * @return this instance, to allow chaining
         */
        public WriterState append(String str) {
            pending.append(str);
            return this;
        }

        /** @return the number of chars currently pending */
        public int length() {
            return pending.length();
        }

        /** @return the clock time, in milliseconds, recorded at construction or at the last flush */
        public long getLastFlushMs() {
            return lastFlushMs;
        }

        /**
         * Drains the pending output and records the current clock time as the last flush.
         *
         * @return everything appended since the previous drain
         */
        public String getStringToFlush() {
            final String drained = pending.toString();
            pending.setLength(0);
            lastFlushMs = clock.millis();
            return drained;
        }
    }

    /**
     * Creates a writer that streams query results to the given response as a JSON array.
     *
     * @param httpServerResponse response to write to
     * @param queryPublisher publisher whose result type and schema are used to build tombstone rows
     * @param optional completion message to emit from {@code writeCompletionMessage}, if any
     * @param optional2 limit message to emit from {@code writeLimitMessage}, if any
     * @param clock time source for flush timing
     * @param z whether output is buffered and flushed in batches
     * @param context Vert.x context used for the flush timer
     * @param rowFormat how metadata and data rows are converted
     * @throws IllegalStateException if buffering is off and neither message is set
     */
    @SuppressFBWarnings({"EI_EXPOSE_REP2"})
    public JsonStreamedRowResponseWriter(HttpServerResponse httpServerResponse, QueryPublisher queryPublisher, Optional<String> optional, Optional<String> optional2, Clock clock, boolean z, Context context, RowFormat rowFormat) {
        this.response = httpServerResponse;
        this.context = context;
        this.rowFormat = rowFormat;
        this.bufferOutput = z;
        this.completionMessage = optional;
        this.limitMessage = optional2;
        this.clock = clock;
        // A tombstone factory only exists when the publisher reports a result type.
        this.tombstoneFactory = queryPublisher.getResultType()
            .map(resultType -> TombstoneFactory.create(queryPublisher.geLogicalSchema(), resultType));
        this.writerState = new WriterState(clock);
        Preconditions.checkState(
            z || optional2.isPresent() || optional.isPresent(),
            "If buffering isn't used, a limit/completion message must be set");
    }

    /**
     * Opens the JSON array and emits the header row produced by the row format.
     *
     * <p>When buffering, the opening bracket is queued and the header is held back as the last
     * row. Otherwise the bracket and the header are sent to the response in a single write.
     *
     * @param queryResponseMetadata metadata describing the query
     * @return this writer
     */
    @Override
    public QueryStreamResponseWriter writeMetadata(QueryResponseMetadata queryResponseMetadata) {
        final StreamedRow header = this.rowFormat.metadataRow(queryResponseMetadata);
        final Buffer opening = Buffer.buffer().appendByte((byte) '[');
        if (!this.bufferOutput) {
            opening.appendBuffer(serializeObject(header));
            writeBuffer(opening, false);
            return this;
        }
        writeBuffer(opening, true);
        maybeCacheRowAndWriteLast(header);
        return this;
    }

    /**
     * Writes one result record, turning a null value into a tombstone row.
     *
     * @param keyValueMetadata the record and its optional row metadata
     * @return this writer
     * @throws IllegalStateException if the value is null but no tombstone factory is available
     */
    @Override
    public QueryStreamResponseWriter writeRow(KeyValueMetadata<List<?>, GenericRow> keyValueMetadata) {
        final KeyValue<List<?>, GenericRow> entry = keyValueMetadata.getKeyValue();
        final StreamedRow row;
        if (entry.value() != null) {
            row = this.rowFormat.dataRow(keyValueMetadata);
        } else {
            Preconditions.checkState(this.tombstoneFactory.isPresent(), "Should only have null values for query types that support them");
            row = StreamedRow.tombstone(this.tombstoneFactory.get().createRow(entry));
        }
        maybeCacheRowAndWriteLast(row);
        return this;
    }

    /**
     * Writes a continuation token as a row of the stream.
     *
     * @param pushContinuationToken the token to emit
     * @return this writer
     */
    @Override
    public QueryStreamResponseWriter writeContinuationToken(PushContinuationToken pushContinuationToken) {
        final StreamedRow tokenRow = StreamedRow.continuationToken(pushContinuationToken);
        maybeCacheRowAndWriteLast(tokenRow);
        return this;
    }

    /**
     * Writes an error message as a row of the stream.
     *
     * @param ksqlErrorMessage the error to emit
     * @return this writer
     */
    @Override
    public QueryStreamResponseWriter writeError(KsqlErrorMessage ksqlErrorMessage) {
        final StreamedRow errorRow = StreamedRow.error(ksqlErrorMessage);
        maybeCacheRowAndWriteLast(errorRow);
        return this;
    }

    /**
     * Writes a consistency token as a row of the stream.
     *
     * @param consistencyToken the token to emit
     * @return this writer
     */
    @Override
    public QueryStreamResponseWriter writeConsistencyToken(ConsistencyToken consistencyToken) {
        final StreamedRow tokenRow = StreamedRow.consistencyToken(consistencyToken);
        maybeCacheRowAndWriteLast(tokenRow);
        return this;
    }

    /**
     * Emits the configured completion message as the final element; does nothing when no
     * completion message was configured.
     *
     * @return this writer
     */
    @Override
    public QueryStreamResponseWriter writeCompletionMessage() {
        if (!this.completionMessage.isPresent()) {
            return this;
        }
        // Any held-back row is followed by the final message, so it still needs its separator.
        writeLastRow(false);
        final StreamedRow finalRow = StreamedRow.finalMessage(this.completionMessage.get());
        writeBuffer(serializeObject(finalRow), true);
        return this;
    }

    /**
     * Emits the configured limit message as the final element; does nothing when no limit
     * message was configured.
     *
     * @return this writer
     */
    @Override
    public QueryStreamResponseWriter writeLimitMessage() {
        if (!this.limitMessage.isPresent()) {
            return this;
        }
        // Any held-back row is followed by the final message, so it still needs its separator.
        writeLastRow(false);
        final StreamedRow finalRow = StreamedRow.finalMessage(this.limitMessage.get());
        writeBuffer(serializeObject(finalRow), true);
        return this;
    }

    /**
     * Finishes the stream: cancels any pending flush timer, writes the held-back row without a
     * trailing separator, closes the JSON array, flushes whatever is still buffered, and ends
     * the response.
     */
    @Override
    public void end() {
        cancelTimer();
        writeLastRow(true);
        final Buffer closing = Buffer.buffer("]");
        writeBuffer(closing, true);
        final boolean hasPendingOutput = this.writerState.length() > 0;
        if (hasPendingOutput) {
            this.response.write(this.writerState.getStringToFlush());
        }
        this.response.end();
    }

    /**
     * Renders the protobuf schema that corresponds to the columns of a logical schema.
     *
     * @param logicalSchema the schema whose columns are converted
     * @return the canonical string form of the derived protobuf schema
     */
    @VisibleForTesting
    static String logicalToProtoSchema(LogicalSchema logicalSchema) {
        final ProtobufData converter = new ProtobufData(new ProtobufDataConfig(ImmutableMap.of()));
        final ConnectSchema connectSchema = ConnectSchemas.columnsToConnectSchema(logicalSchema.columns());
        return converter.fromConnectSchema(connectSchema).canonicalString();
    }

    /**
     * Writes a row followed by a separator, possibly delayed by one row.
     *
     * <p>When buffering, the given row is held back and the previously held row (if any) is
     * written instead, so that the final row can later be written without a trailing separator.
     * Without buffering the given row is written immediately.
     *
     * @param streamedRow the newest row
     */
    private void maybeCacheRowAndWriteLast(StreamedRow streamedRow) {
        final StreamedRow rowToWrite;
        if (!this.bufferOutput) {
            rowToWrite = streamedRow;
        } else {
            rowToWrite = this.lastRow;
            this.lastRow = streamedRow;
        }
        if (rowToWrite == null) {
            // Nothing was held back yet (first row in buffered mode).
            return;
        }
        writeBuffer(serializeObject(rowToWrite), false);
        maybeFlushBuffer();
    }

    /**
     * Writes the held-back row, if there is one, and clears it.
     *
     * @param z true when this row is the last element, so no separator follows it
     */
    private void writeLastRow(boolean z) {
        final StreamedRow held = this.lastRow;
        this.lastRow = null;
        if (held == null) {
            return;
        }
        writeBuffer(serializeObject(held), z);
    }

    /**
     * Emits a chunk of output, followed by {@code ",\n"} unless it is the last element.
     *
     * <p>When buffering, the chunk is decoded as UTF-8 and appended to the pending output;
     * otherwise it is written to the response right away.
     *
     * @param buffer the chunk to emit
     * @param z true when no separator should follow the chunk
     */
    private void writeBuffer(Buffer buffer, boolean z) {
        final boolean needsSeparator = !z;
        if (!this.bufferOutput) {
            final Buffer outgoing = Buffer.buffer();
            outgoing.appendBuffer(buffer);
            if (needsSeparator) {
                outgoing.appendString(",\n");
            }
            this.response.write(outgoing);
            return;
        }
        this.writerState.append(buffer.toString(StandardCharsets.UTF_8));
        if (needsSeparator) {
            this.writerState.append(",\n");
        }
    }

    /**
     * Flushes the pending output when it has grown to {@code FLUSH_SIZE_BYTES} or when
     * {@code MAX_FLUSH_MS} has passed since the last flush; otherwise makes sure a flush timer is
     * scheduled. Does nothing when there is no pending output.
     */
    private void maybeFlushBuffer() {
        if (this.writerState.length() <= 0) {
            return;
        }
        // The clock is only consulted when the size threshold has not been reached.
        final boolean flushNow = this.writerState.length() >= FLUSH_SIZE_BYTES
            || this.clock.millis() - this.writerState.getLastFlushMs() >= MAX_FLUSH_MS;
        if (flushNow) {
            cancelTimer();
            this.response.write(this.writerState.getStringToFlush());
        } else {
            maybeScheduleFlush();
        }
    }

    /**
     * Schedules a one-shot timer that re-runs the flush check once the remaining part of the
     * {@code MAX_FLUSH_MS} interval has elapsed. Does nothing if a timer is already pending.
     */
    private void maybeScheduleFlush() {
        if (this.timerId >= 0) {
            return;
        }
        final long sinceLastFlushMs = this.clock.millis() - this.writerState.getLastFlushMs();
        // Clamp the delay to [1, MAX_FLUSH_MS]. The lower bound must be 1 rather than 0: the clock
        // is read again here after the caller's check, so the remaining time can already be zero
        // or negative, and Vert.x refuses to schedule a timer with a delay below 1 ms. The upper
        // bound covers a clock that moved backwards.
        final long delayMs = Math.min(Math.max(1L, MAX_FLUSH_MS - sinceLastFlushMs), MAX_FLUSH_MS);
        this.timerId = this.context.owner().setTimer(delayMs, firedTimerId -> {
            // The timer has fired, so it is no longer pending.
            this.timerId = -1L;
            maybeFlushBuffer();
        });
    }

    /**
     * Cancels the pending flush timer, if any, and marks that no timer is scheduled.
     */
    private void cancelTimer() {
        if (this.timerId < 0) {
            return;
        }
        this.context.owner().cancelTimer(this.timerId);
        this.timerId = -1L;
    }

    /**
     * Serializes an object to JSON with the shared mapper and wraps the bytes in a buffer.
     *
     * @param t the object to serialize
     * @param <T> the object's type
     * @return a buffer holding the JSON bytes
     * @throws RuntimeException if the mapper fails to serialize the object
     */
    public static <T> Buffer serializeObject(T t) {
        final byte[] json;
        try {
            json = OBJECT_MAPPER.writeValueAsBytes(t);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Failed to serialize buffer", e);
        }
        return Buffer.buffer(json);
    }

    /**
     * Converts an optional host description into its REST entity form, copying host and port.
     *
     * @param optional the host to convert, possibly empty
     * @return the converted entity, or empty when the input is empty
     */
    private static Optional<KsqlHostInfoEntity> toKsqlHostInfoEntity(Optional<KsqlHostInfo> optional) {
        return optional.map(node -> new KsqlHostInfoEntity(node.host(), node.port()));
    }
}
