package io.confluent.ksql.rest.server.resources.streaming;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.api.server.StreamingOutput;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.TransientQueryMetadata;
import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.Thread;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/confluent/ksql/rest/server/resources/streaming/QueryStreamWriter.class */
public class QueryStreamWriter implements StreamingOutput {
    private static final Logger log = LoggerFactory.getLogger(QueryStreamWriter.class);
    private static final QueryId NO_QUERY_ID = new QueryId("none");
    private final TransientQueryMetadata queryMetadata;
    private final long disconnectCheckInterval;
    private final ObjectMapper objectMapper;
    private volatile Exception streamsException;
    private volatile boolean limitReached = false;
    private volatile boolean connectionClosed;
    private boolean closed;

    /* loaded from: input_file:io/confluent/ksql/rest/server/resources/streaming/QueryStreamWriter$LimitHandler.class */
    private class LimitHandler implements io.confluent.ksql.query.LimitHandler {
        private LimitHandler() {
        }

        public void limitReached() {
            QueryStreamWriter.this.limitReached = true;
        }
    }

    /* loaded from: input_file:io/confluent/ksql/rest/server/resources/streaming/QueryStreamWriter$StreamsExceptionHandler.class */
    private class StreamsExceptionHandler implements Thread.UncaughtExceptionHandler {
        private StreamsExceptionHandler() {
        }

        @Override // java.lang.Thread.UncaughtExceptionHandler
        public void uncaughtException(Thread thread, Throwable th) {
            QueryStreamWriter.this.streamsException = th instanceof Exception ? (Exception) th : new RuntimeException(th);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public QueryStreamWriter(TransientQueryMetadata transientQueryMetadata, long j, ObjectMapper objectMapper, CompletableFuture<Void> completableFuture) {
        this.objectMapper = (ObjectMapper) Objects.requireNonNull(objectMapper, "objectMapper");
        this.disconnectCheckInterval = j;
        this.queryMetadata = (TransientQueryMetadata) Objects.requireNonNull(transientQueryMetadata, "queryMetadata");
        this.queryMetadata.setLimitHandler(new LimitHandler());
        this.queryMetadata.setUncaughtExceptionHandler(new StreamsExceptionHandler());
        completableFuture.thenAccept(r4 -> {
            this.connectionClosed = true;
        });
        transientQueryMetadata.start();
    }

    @Override // io.confluent.ksql.api.server.StreamingOutput
    public void write(OutputStream outputStream) {
        try {
            outputStream.write("[".getBytes(StandardCharsets.UTF_8));
            write(outputStream, buildHeader());
            while (!this.connectionClosed && this.queryMetadata.isRunning() && !this.limitReached) {
                GenericRow poll = this.queryMetadata.getRowQueue().poll(this.disconnectCheckInterval, TimeUnit.MILLISECONDS);
                if (poll != null) {
                    write(outputStream, StreamedRow.row(poll));
                } else {
                    outputStream.write("\n".getBytes(StandardCharsets.UTF_8));
                    outputStream.flush();
                }
                drainAndThrowOnError(outputStream);
            }
            drain(outputStream);
            if (this.limitReached) {
                this.objectMapper.writeValue(outputStream, StreamedRow.finalMessage("Limit Reached"));
                outputStream.write("]\n".getBytes(StandardCharsets.UTF_8));
                outputStream.flush();
            }
        } catch (Exception e) {
            log.error("Exception occurred while writing to connection stream: ", e);
            outputException(outputStream, e);
        } catch (EOFException e2) {
            log.warn("Query terminated due to exception:" + e2.toString());
        } catch (InterruptedException e3) {
            log.warn("Interrupted while writing to connection stream");
        } finally {
            close();
        }
    }

    private void write(OutputStream outputStream, StreamedRow streamedRow) throws IOException {
        this.objectMapper.writeValue(outputStream, streamedRow);
        outputStream.write(",\n".getBytes(StandardCharsets.UTF_8));
        outputStream.flush();
    }

    @Override // io.confluent.ksql.api.server.StreamingOutput, java.lang.AutoCloseable
    public synchronized void close() {
        if (this.closed) {
            return;
        }
        this.queryMetadata.close();
        this.closed = true;
    }

    private StreamedRow buildHeader() {
        LogicalSchema logicalSchema = this.queryMetadata.getLogicalSchema();
        LogicalSchema.Builder builder = LogicalSchema.builder();
        List value = logicalSchema.value();
        builder.getClass();
        value.forEach((v1) -> {
            r1.valueColumn(v1);
        });
        return StreamedRow.header(NO_QUERY_ID, builder.build());
    }

    private void outputException(OutputStream outputStream, Throwable th) {
        try {
            outputStream.write("\n".getBytes(StandardCharsets.UTF_8));
            if (th.getCause() instanceof KsqlException) {
                this.objectMapper.writeValue(outputStream, StreamedRow.error(th.getCause(), Errors.ERROR_CODE_SERVER_ERROR));
            } else {
                this.objectMapper.writeValue(outputStream, StreamedRow.error(th, Errors.ERROR_CODE_SERVER_ERROR));
            }
            outputStream.write(",\n".getBytes(StandardCharsets.UTF_8));
            outputStream.flush();
        } catch (IOException e) {
            log.debug("Client disconnected while attempting to write an error message");
        }
    }

    private void drainAndThrowOnError(OutputStream outputStream) throws Exception {
        if (this.streamsException != null) {
            drain(outputStream);
            throw this.streamsException;
        }
    }

    private void drain(OutputStream outputStream) throws IOException {
        ArrayList newArrayList = Lists.newArrayList();
        this.queryMetadata.getRowQueue().drainTo(newArrayList);
        Iterator it = newArrayList.iterator();
        while (it.hasNext()) {
            write(outputStream, StreamedRow.row((GenericRow) it.next()));
        }
    }
}
