package io.confluent.ksql.rest.server.resources.streaming;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.api.server.StreamingOutput;
import io.confluent.ksql.query.BlockingRowQueue;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.PushContinuationToken;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.util.KeyValueMetadata;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.PushOffsetRange;
import io.confluent.ksql.util.PushQueryMetadata;
import io.confluent.ksql.util.RowMetadata;
import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:io/confluent/ksql/rest/server/resources/streaming/QueryStreamWriter.class */
/**
 * Streams the rows of a push query to an {@link OutputStream} as a JSON array.
 *
 * <p>The output written by {@link #write(OutputStream)} has the following shape: an opening
 * {@code [}, a header row, then one JSON document per row, each followed by {@code ",\n"}. When
 * the query ends because its limit was reached or because it completed, a final message is
 * written, followed by {@code "]\n"}.
 *
 * <p>The status flags are {@code volatile} because they are set from callbacks registered on the
 * query metadata and on the connection-closed future, and read from the thread that runs
 * {@link #write(OutputStream)}.
 */
class QueryStreamWriter implements StreamingOutput {
    /** Value reported by {@link #getWriteTimeoutMs()}: ten minutes. */
    private static final int WRITE_TIMEOUT_MS = 10 * 60 * 1000;
    private static final Logger log = LogManager.getLogger(QueryStreamWriter.class);

    private final PushQueryMetadata queryMetadata;
    /** Maximum time, in milliseconds, a single poll of the row queue may block. */
    private final long disconnectCheckInterval;
    private final ObjectMapper objectMapper;
    private final TombstoneFactory tombstoneFactory;
    /** Set by {@link StreamsExceptionHandler}; rethrown from the write loop once observed. */
    private volatile Exception streamsException;
    private volatile boolean limitReached;
    private volatile boolean complete;
    private volatile boolean connectionClosed;
    /** Guarded by the monitor of {@code this}; see {@link #close()}. */
    private boolean closed;

    /**
     * Records an uncaught streams failure on the enclosing writer so that the write loop can
     * report it, and asks for the client to be shut down.
     */
    private class StreamsExceptionHandler implements StreamsUncaughtExceptionHandler {
        private StreamsExceptionHandler() {
        }

        @Override
        public StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse handle(Throwable throwable) {
            // The write loop rethrows the stored value, so non-Exception throwables are wrapped.
            QueryStreamWriter.this.streamsException = throwable instanceof Exception
                ? (Exception) throwable
                : new RuntimeException(throwable);
            return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
        }
    }

    /**
     * Creates a writer and starts the query.
     *
     * @param pushQueryMetadata the query whose rows are streamed; must not be null
     * @param disconnectCheckInterval maximum time, in milliseconds, to block waiting for a row
     * @param objectMapper mapper used to serialize every row; must not be null
     * @param connectionClosedFuture completes when the client connection is closed; must not be null
     */
    QueryStreamWriter(
            PushQueryMetadata pushQueryMetadata,
            long disconnectCheckInterval,
            ObjectMapper objectMapper,
            CompletableFuture<Void> connectionClosedFuture) {
        this(pushQueryMetadata, disconnectCheckInterval, objectMapper, connectionClosedFuture, false);
    }

    /**
     * Creates a writer, registering limit, completion and uncaught-exception handlers on the query.
     *
     * @param pushQueryMetadata the query whose rows are streamed; must not be null
     * @param disconnectCheckInterval maximum time, in milliseconds, to block waiting for a row
     * @param objectMapper mapper used to serialize every row; must not be null
     * @param connectionClosedFuture completes when the client connection is closed; must not be null
     * @param emptyStream when {@code true} the query is not started and the writer is marked
     *     complete immediately, so {@link #write(OutputStream)} skips the polling loop
     * @throws NullPointerException if any of the object arguments is null
     */
    public QueryStreamWriter(
            PushQueryMetadata pushQueryMetadata,
            long disconnectCheckInterval,
            ObjectMapper objectMapper,
            CompletableFuture<Void> connectionClosedFuture,
            boolean emptyStream) {
        // Validate everything before touching the query so a bad argument leaves it unmodified.
        this.objectMapper = Objects.requireNonNull(objectMapper, "objectMapper");
        this.queryMetadata = Objects.requireNonNull(pushQueryMetadata, "queryMetadata");
        Objects.requireNonNull(connectionClosedFuture, "connectionClosedFuture");
        this.disconnectCheckInterval = disconnectCheckInterval;

        this.queryMetadata.setLimitHandler(() -> {
            this.limitReached = true;
        });
        this.queryMetadata.setCompletionHandler(() -> {
            this.complete = true;
        });
        this.queryMetadata.setUncaughtExceptionHandler(new StreamsExceptionHandler());
        this.tombstoneFactory = TombstoneFactory.create(
            pushQueryMetadata.getLogicalSchema(), pushQueryMetadata.getResultType());
        connectionClosedFuture.thenAccept(ignored -> {
            this.connectionClosed = true;
        });

        if (emptyStream) {
            this.complete = true;
        } else {
            pushQueryMetadata.start();
        }
    }

    /**
     * Writes the header and then rows to {@code outputStream} until the connection is closed, the
     * query stops running, the limit is reached or the query completes.
     *
     * <p>Checked and runtime exceptions are logged rather than propagated; for anything other than
     * {@link EOFException} and {@link InterruptedException} an error row is also written to the
     * stream on a best-effort basis. The writer is always closed before this method returns.
     *
     * @param outputStream the stream to write to
     */
    @Override
    public void write(OutputStream outputStream) {
        boolean interrupted = false;
        try {
            outputStream.write("[".getBytes(StandardCharsets.UTF_8));
            write(outputStream, buildHeader());

            BlockingRowQueue rowQueue = this.queryMetadata.getRowQueue();
            while (!this.connectionClosed
                    && this.queryMetadata.isRunning()
                    && !this.limitReached
                    && !this.complete) {
                KeyValueMetadata<List<?>, GenericRow> row =
                    rowQueue.poll(this.disconnectCheckInterval, TimeUnit.MILLISECONDS);
                if (row != null) {
                    write(outputStream, buildRow(row));
                } else {
                    // No row arrived within the interval: emit a bare newline.
                    // NOTE(review): presumably this lets a dropped client surface as a failed
                    // write - confirm against the server's connection handling.
                    outputStream.write("\n".getBytes(StandardCharsets.UTF_8));
                    outputStream.flush();
                }
                drainAndThrowOnError(outputStream);
            }

            if (this.connectionClosed) {
                // Nobody is listening any more, so skip the trailing rows and the array terminator.
                return;
            }

            drain(outputStream);
            if (this.limitReached) {
                this.objectMapper.writeValue(outputStream, StreamedRow.finalMessage("Limit Reached"));
            } else if (this.complete) {
                this.objectMapper.writeValue(outputStream, StreamedRow.finalMessage("Query Completed"));
            }
            outputStream.write("]\n".getBytes(StandardCharsets.UTF_8));
            outputStream.flush();
        } catch (EOFException e) {
            log.warn("Query terminated due to exception:{}", e.toString());
        } catch (InterruptedException e) {
            log.warn("Interrupted while writing to connection stream");
            interrupted = true;
        } catch (Exception e) {
            log.error("Exception occurred while writing to connection stream: ", e);
            outputException(outputStream, e);
        } finally {
            try {
                close();
            } finally {
                if (interrupted) {
                    // Restore the flag only after close() so shutting the query down is not
                    // disturbed, while callers further up the stack still see the interruption.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Serializes one row, appends the {@code ",\n"} separator and flushes the stream.
     *
     * @param outputStream the stream to write to
     * @param streamedRow the row to serialize
     * @throws IOException if serializing or writing fails
     */
    private void write(OutputStream outputStream, StreamedRow streamedRow) throws IOException {
        this.objectMapper.writeValue(outputStream, streamedRow);
        outputStream.write(",\n".getBytes(StandardCharsets.UTF_8));
        outputStream.flush();
    }

    /**
     * Closes the underlying query. Calls after the first successful one are no-ops; if closing the
     * query throws, the writer is not marked closed and a later call will try again.
     */
    @Override
    public synchronized void close() {
        if (this.closed) {
            return;
        }
        this.queryMetadata.close();
        this.closed = true;
    }

    /**
     * Returns the write timeout for this output.
     *
     * @return the timeout in milliseconds
     */
    @Override
    public int getWriteTimeoutMs() {
        return WRITE_TIMEOUT_MS;
    }

    /**
     * Builds the header row: the query id together with a schema that contains only the value
     * columns of the query's logical schema.
     *
     * @return the header row
     */
    private StreamedRow buildHeader() {
        QueryId queryId = this.queryMetadata.getQueryId();
        LogicalSchema logicalSchema = this.queryMetadata.getLogicalSchema();
        LogicalSchema.Builder builder = LogicalSchema.builder();
        logicalSchema.value().forEach(builder::valueColumn);
        return StreamedRow.header(queryId, builder.build());
    }

    /**
     * Converts a queued entry to the row that is sent to the client.
     *
     * <p>An entry whose metadata carries a push offset range becomes a continuation token; an entry
     * whose value is {@code null} becomes a tombstone; any other entry becomes a regular push row.
     *
     * @param keyValueMetadata the queued entry
     * @return the row to serialize
     */
    private StreamedRow buildRow(KeyValueMetadata<List<?>, GenericRow> keyValueMetadata) {
        if (keyValueMetadata.getRowMetadata().isPresent()
                && keyValueMetadata.getRowMetadata().get().getPushOffsetsRange().isPresent()) {
            return StreamedRow.continuationToken(new PushContinuationToken(
                keyValueMetadata.getRowMetadata().get().getPushOffsetsRange().get().serialize()));
        }
        if (keyValueMetadata.getKeyValue().value() == null) {
            return StreamedRow.tombstone(this.tombstoneFactory.createRow(keyValueMetadata.getKeyValue()));
        }
        return StreamedRow.pushRow(keyValueMetadata.getKeyValue().value());
    }

    /**
     * Writes an error row for {@code throwable} to the stream on a best-effort basis. When the
     * cause is a {@link KsqlException} the cause is reported instead of the wrapper.
     *
     * @param outputStream the stream to write to
     * @param throwable the failure to report
     */
    private void outputException(OutputStream outputStream, Throwable throwable) {
        try {
            outputStream.write("\n".getBytes(StandardCharsets.UTF_8));
            Throwable reported = throwable.getCause() instanceof KsqlException
                ? throwable.getCause()
                : throwable;
            this.objectMapper.writeValue(
                outputStream, StreamedRow.error(reported, Errors.ERROR_CODE_SERVER_ERROR));
            outputStream.write(",\n".getBytes(StandardCharsets.UTF_8));
            outputStream.flush();
        } catch (IOException e) {
            // Deliberately not propagated: the stream is most likely already gone.
            log.debug("Client disconnected while attempting to write an error message");
        }
    }

    /**
     * If a streams failure has been recorded, writes out the rows that are still queued and then
     * rethrows that failure.
     *
     * @param outputStream the stream to write to
     * @throws Exception the recorded streams failure, or an I/O failure while draining
     */
    private void drainAndThrowOnError(OutputStream outputStream) throws Exception {
        // Single volatile read so the value checked is the value thrown.
        Exception failure = this.streamsException;
        if (failure != null) {
            drain(outputStream);
            throw failure;
        }
    }

    /**
     * Writes every row currently held in the query's row queue to the stream.
     *
     * @param outputStream the stream to write to
     * @throws IOException if serializing or writing a row fails
     */
    private void drain(OutputStream outputStream) throws IOException {
        List<KeyValueMetadata<List<?>, GenericRow>> rows = new ArrayList<>();
        this.queryMetadata.getRowQueue().drainTo(rows);
        for (KeyValueMetadata<List<?>, GenericRow> row : rows) {
            write(outputStream, buildRow(row));
        }
    }
}
