package org.apache.spark.sql.execution.python.streaming;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.util.Map;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkEnv$;
import org.apache.spark.api.python.PythonFunction;
import org.apache.spark.api.python.PythonWorker;
import org.apache.spark.api.python.PythonWorkerFactory;
import org.apache.spark.api.python.PythonWorkerUtils$;
import org.apache.spark.api.python.SpecialLengths$;
import org.apache.spark.internal.LogEntry;
import org.apache.spark.internal.LogEntry$;
import org.apache.spark.internal.LogKeys$MODULE_NAME$;
import org.apache.spark.internal.LogKeys$PYTHON_EXEC$;
import org.apache.spark.internal.Logging;
import org.apache.spark.internal.MDC;
import org.apache.spark.internal.config.Python$;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.errors.QueryCompilationErrors$;
import org.apache.spark.sql.errors.QueryExecutionErrors$;
import org.apache.spark.sql.internal.SQLConf$;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.ArrowUtils$;
import org.apache.spark.sql.vectorized.ArrowColumnVector;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.slf4j.Logger;
import scala.$less$colon$less$;
import scala.Function0;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.ArrayOps$;
import scala.collection.IterableOnceOps;
import scala.collection.Iterator;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.jdk.CollectionConverters$;
import scala.package$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.ScalaRunTime$;

/* compiled from: PythonStreamingSourceRunner.scala */
@ScalaSignature(bytes = "\u0006\u0005\t=r!\u0002\u001d:\u0011\u0003Ae!\u0002&:\u0011\u0003Y\u0005\"\u0002*\u0002\t\u0003\u0019\u0006b\u0002+\u0002\u0005\u0004%\t!\u0016\u0005\u00073\u0006\u0001\u000b\u0011\u0002,\t\u000fi\u000b!\u0019!C\u0001+\"11,\u0001Q\u0001\nYCq\u0001X\u0001C\u0002\u0013\u0005Q\u000b\u0003\u0004^\u0003\u0001\u0006IA\u0016\u0005\b=\u0006\u0011\r\u0011\"\u0001V\u0011\u0019y\u0016\u0001)A\u0005-\"9\u0001-\u0001b\u0001\n\u0003)\u0006BB1\u0002A\u0003%a\u000bC\u0004c\u0003\t\u0007I\u0011A+\t\r\r\f\u0001\u0015!\u0003W\u0011\u001d!\u0017A1A\u0005\u0002UCa!Z\u0001!\u0002\u00131f\u0001\u0002&:\u0001\u0019D\u0001\"\\\t\u0003\u0002\u0003\u0006IA\u001c\u0005\tkF\u0011\t\u0011)A\u0005m\")!+\u0005C\u0001y\"I\u0011\u0011A\tC\u0002\u0013\u0005\u00111\u0001\u0005\t\u0003+\t\u0002\u0015!\u0003\u0002\u0006!I\u0011qC\tC\u0002\u0013%\u0011\u0011\u0004\u0005\t\u0003G\t\u0002\u0015!\u0003\u0002\u001c!A\u0011QE\tC\u0002\u0013%Q\u000bC\u0004\u0002(E\u0001\u000b\u0011\u0002,\t\u0013\u0005%\u0012C1A\u0005\n\u0005-\u0002\u0002CA\u001a#\u0001\u0006I!!\f\t\u0013\u0005U\u0012C1A\u0005\n\u0005]\u0002\u0002CA-#\u0001\u0006I!!\u000f\t\u0013\u0005m\u0013C1A\u0005\n\u0005u\u0003\u0002CA0#\u0001\u0006I!!\u0012\t\u0013\u0005\u0005\u0014\u00031A\u0005\n\u0005\r\u0004\"CA9#\u0001\u0007I\u0011BA:\u0011!\ty(\u0005Q!\n\u0005\u0015\u0004\"CAA#\u0001\u0007I\u0011BAB\u0011%\ti)\u0005a\u0001\n\u0013\ty\t\u0003\u0005\u0002\u0014F\u0001\u000b\u0015BAC\u0011%\t)*\u0005b\u0001\n\u0013\ti\u0006\u0003\u0005\u0002\u0018F\u0001\u000b\u0011BA#\u0011%\tI*\u0005a\u0001\n\u0013\tY\nC\u0005\u0002*F\u0001\r\u0011\"\u0003\u0002,\"A\u0011qV\t!B\u0013\ti\nC\u0005\u00022F\u0001\r\u0011\"\u0003\u00024\"I\u00111X\tA\u0002\u0013%\u0011Q\u0018\u0005\t\u0003\u0003\f\u0002\u0015)\u0003\u00026\"9\u00111Y\t\u0005\u0002\u0005\u0015\u0007bBAd#\u0011\u0005\u0011\u0011\u001a\u0005\b\u0003\u0017\fB\u0011AAe\u0011\u001d\ti-\u0005C\u0001\u0003\u001fDqA!\u0004\u0012\t\u0003\u0011y\u0001C\u0004\u0003\u0014E!\t!!2\t\u0013\tU\u0
011C1A\u0005\n\t]\u0001\u0002\u0003B\u0015#\u0001\u0006IA!\u0007\t\u000f\t-\u0012\u0003\"\u0001\u0003.\u0005Y\u0002+\u001f;i_:\u001cFO]3b[&twmU8ve\u000e,'+\u001e8oKJT!AO\u001e\u0002\u0013M$(/Z1nS:<'B\u0001\u001f>\u0003\u0019\u0001\u0018\u0010\u001e5p]*\u0011ahP\u0001\nKb,7-\u001e;j_:T!\u0001Q!\u0002\u0007M\fHN\u0003\u0002C\u0007\u0006)1\u000f]1sW*\u0011A)R\u0001\u0007CB\f7\r[3\u000b\u0003\u0019\u000b1a\u001c:h\u0007\u0001\u0001\"!S\u0001\u000e\u0003e\u00121\u0004U=uQ>t7\u000b\u001e:fC6LgnZ*pkJ\u001cWMU;o]\u0016\u00148CA\u0001M!\ti\u0005+D\u0001O\u0015\u0005y\u0015!B:dC2\f\u0017BA)O\u0005\u0019\te.\u001f*fM\u00061A(\u001b8jiz\"\u0012\u0001S\u0001\u0017\u0013:KE+S!M?>3eiU#U?\u001a+fjQ0J\tV\ta\u000b\u0005\u0002N/&\u0011\u0001L\u0014\u0002\u0004\u0013:$\u0018aF%O\u0013RK\u0015\tT0P\r\u001a\u001bV\tV0G+:\u001bu,\u0013#!\u0003Ua\u0015\tV#T)~{eIR*F)~3UKT\"`\u0013\u0012\u000ba\u0003T!U\u000bN#vl\u0014$G'\u0016#vLR+O\u0007~KE\tI\u0001\u0013!\u0006\u0013F+\u0013+J\u001f:\u001bvLR+O\u0007~KE)A\nQ\u0003J#\u0016\nV%P\u001dN{f)\u0016(D?&#\u0005%\u0001\bD\u001f6k\u0015\nV0G+:\u001bu,\u0013#\u0002\u001f\r{U*T%U?\u001a+fjQ0J\t\u0002\nA\u0004\u0015*F\r\u0016#6\tS#E?J+5i\u0014*E'~su\nV0G\u001fVsE)A\u000fQ%\u00163U\tV\"I\u000b\u0012{&+R\"P%\u0012\u001bvLT(U?\u001a{UK\u0014#!\u0003\u0001ruJT0F\u001bB#\u0016l\u0018)Z\u0003J\u0013vjV0S\u000b\u000e{%\u000bR0C\u0003R\u001b\u0005*R*\u0002C9{ejX#N!RKv\fU-B%J{uk\u0018*F\u0007>\u0013Fi\u0018\"B)\u000eCUi\u0015\u0011\u00029\u0015k\u0005\u000bV-`!f\u000b%KU(X?J+5i\u0014*E?\n\u000bEk\u0011%F'\u0006iR)\u0014)U3~\u0003\u0016,\u0011*S\u001f^{&+R\"P%\u0012{&)\u0011+D\u0011\u0016\u001b\u0006eE\u0002\u0012\u0019\u001e\u0004\"\u0001[6\u000e\u0003%T!A[!\u0002\u0011%tG/\u001a:oC2L!\u0001\\5\u0003\u000f1{wmZ5oO\u0006!a-\u001e8d!\ty7/D\u0001q\u0015\ta\u0014O\u0003\u0002s\u0003\u0006\u0019\u0011\r]5\n\u0005Q\u0004(A\u0004)zi\"|gNR;oGRLwN\\\u0001\r_V$\b/\u001e;TG\",W.\u0019\t\u0003ojl\u0011\u0001\u001f\u0006\u0003s~\nQ\u0001^=qKNL!a\u001f=\u0003\u0015M#(/^2u)f
\u0004X\rF\u0002~}~\u0004\"!S\t\t\u000b5$\u0002\u0019\u00018\t\u000bU$\u0002\u0019\u0001<\u0002\u0019]|'o[3s\u001b>$W\u000f\\3\u0016\u0005\u0005\u0015\u0001\u0003BA\u0004\u0003#i!!!\u0003\u000b\t\u0005-\u0011QB\u0001\u0005Y\u0006twM\u0003\u0002\u0002\u0010\u0005!!.\u0019<b\u0013\u0011\t\u0019\"!\u0003\u0003\rM#(/\u001b8h\u000359xN]6fe6{G-\u001e7fA\u0005!1m\u001c8g+\t\tY\u0002\u0005\u0003\u0002\u001e\u0005}Q\"A!\n\u0007\u0005\u0005\u0012IA\u0005Ta\u0006\u00148nQ8oM\u0006)1m\u001c8gA\u0005Q!-\u001e4gKJ\u001c\u0016N_3\u0002\u0017\t,hMZ3s'&TX\rI\u0001\u0012CV$\bnU8dW\u0016$H+[7f_V$XCAA\u0017!\ri\u0015qF\u0005\u0004\u0003cq%\u0001\u0002'p]\u001e\f!#Y;uQN{7m[3u)&lWm\\;uA\u00059QM\u001c<WCJ\u001cXCAA\u001d!!\tY$!\u0011\u0002F\u0005\u0015SBAA\u001f\u0015\u0011\ty$!\u0004\u0002\tU$\u0018\u000e\\\u0005\u0005\u0003\u0007\niDA\u0002NCB\u0004B!a\u0012\u0002V9!\u0011\u0011JA)!\r\tYET\u0007\u0003\u0003\u001bR1!a\u0014H\u0003\u0019a$o\\8u}%\u0019\u00111\u000b(\u0002\rA\u0013X\rZ3g\u0013\u0011\t\u0019\"a\u0016\u000b\u0007\u0005Mc*\u0001\u0005f]Z4\u0016M]:!\u0003)\u0001\u0018\u0010\u001e5p]\u0016CXmY\u000b\u0003\u0003\u000b\n1\u0002]=uQ>tW\t_3dA\u0005a\u0001/\u001f;i_:<vN]6feV\u0011\u0011Q\r\t\u0006\u001b\u0006\u001d\u00141N\u0005\u0004\u0003Sr%AB(qi&|g\u000eE\u0002p\u0003[J1!a\u001cq\u00051\u0001\u0016\u0010\u001e5p]^{'o[3s\u0003A\u0001\u0018\u0010\u001e5p]^{'o[3s?\u0012*\u0017\u000f\u0006\u0003\u0002v\u0005m\u0004cA'\u0002x%\u0019\u0011\u0011\u0010(\u0003\tUs\u0017\u000e\u001e\u0005\n\u0003{\u0012\u0013\u0011!a\u0001\u0003K\n1\u0001\u001f\u00132\u00035\u0001\u0018\u0010\u001e5p]^{'o[3sA\u0005\u0019\u0002/\u001f;i_:<vN]6fe\u001a\u000b7\r^8ssV\u0011\u0011Q\u0011\t\u0006\u001b\u0006\u001d\u0014q\u0011\t\u0004_\u0006%\u0015bAAFa\n\u0019\u0002+\u001f;i_:<vN]6fe\u001a\u000b7\r^8ss\u00069\u0002/\u001f;i_:<vN]6fe\u001a\u000b7\r^8ss~#S-\u001d\u000b\u0005\u0003k\n\t\nC\u0005\u0002~\u0015\n\t\u00111\u0001\u0002\u0006\u0006!\u0002/\u001f;i_:<vN]6fe\u001a\u000b7\r^8ss\u0002\n\u0011\u0002]=uQ>tg
+\u001a:\u0002\u0015ALH\u000f[8o-\u0016\u0014\b%A\u0004eCR\fw*\u001e;\u0016\u0005\u0005u\u0005\u0003BAP\u0003Kk!!!)\u000b\t\u0005\r\u0016QB\u0001\u0003S>LA!a*\u0002\"\n\u0001B)\u0019;b\u001fV$\b/\u001e;TiJ,\u0017-\\\u0001\fI\u0006$\u0018mT;u?\u0012*\u0017\u000f\u0006\u0003\u0002v\u00055\u0006\"CA?U\u0005\u0005\t\u0019AAO\u0003!!\u0017\r^1PkR\u0004\u0013A\u00023bi\u0006Le.\u0006\u0002\u00026B!\u0011qTA\\\u0013\u0011\tI,!)\u0003\u001f\u0011\u000bG/Y%oaV$8\u000b\u001e:fC6\f!\u0002Z1uC&sw\fJ3r)\u0011\t)(a0\t\u0013\u0005uT&!AA\u0002\u0005U\u0016a\u00023bi\u0006Le\u000eI\u0001\u0005S:LG\u000f\u0006\u0002\u0002v\u0005aA.\u0019;fgR|eMZ:fiR\u0011\u0011QI\u0001\u000eS:LG/[1m\u001f\u001a47/\u001a;\u0002\u0015A\f'\u000f^5uS>t7\u000f\u0006\u0004\u0002R\n\u0015!\u0011\u0002\t\b\u001b\u0006M\u0017q[As\u0013\r\t)N\u0014\u0002\u0007)V\u0004H.\u001a\u001a\u0011\u000b5\u000bI.!8\n\u0007\u0005mgJA\u0003BeJ\f\u0017\u0010E\u0003N\u00033\fy\u000eE\u0002N\u0003CL1!a9O\u0005\u0011\u0011\u0015\u0010^3\u0011\u000b5\u000b9'a:\u0011\r\u0005%\u00181_A}\u001d\u0011\tY/a<\u000f\t\u0005-\u0013Q^\u0005\u0002\u001f&\u0019\u0011\u0011\u001f(\u0002\u000fA\f7m[1hK&!\u0011Q_A|\u0005!IE/\u001a:bi>\u0014(bAAy\u001dB!\u00111 B\u0001\u001b\t\tiPC\u0002\u0002��~\n\u0001bY1uC2L8\u000f^\u0005\u0005\u0005\u0007\tiPA\u0006J]R,'O\\1m%><\bb\u0002B\u0004e\u0001\u0007\u0011QI\u0001\u0006gR\f'\u000f\u001e\u0005\b\u0005\u0017\u0011\u0004\u0019AA#\u0003\r)g\u000eZ\u0001\u0007G>lW.\u001b;\u0015\t\u0005U$\u0011\u0003\u0005\b\u0005\u0017\u0019\u0004\u0019AA#\u0003\u0011\u0019Ho\u001c9\u0002\u0013\u0005dGn\\2bi>\u0014XC\u0001B\r!\u0011\u0011YB!\n\u000e\u0005\tu!\u0002\u0002B\u0010\u0005C\ta!\\3n_JL(b\u0001B\u0012\u0007\u0006)\u0011M\u001d:po&!!q\u0005B\u000f\u0005=\u0011UO\u001a4fe\u0006cGn\\2bi>\u0014\u0018AC1mY>\u001c\u0017\r^8sA\u00051\"/Z1e\u0003J\u0014xn\u001e*fG>\u0014HMQ1uG\",7\u000f\u0006\u0002\u0002h\u0002")
/* loaded from: input_file:org/apache/spark/sql/execution/python/streaming/PythonStreamingSourceRunner.class */
/**
 * Talks to a Python worker process running the module
 * {@code pyspark.sql.streaming.python_streaming_source_runner} over the worker's socket.
 *
 * <p>{@link #init()} creates the worker and performs the initial handshake. After that each
 * request method writes a function id (taken from the companion object
 * {@code PythonStreamingSourceRunner$}) plus its arguments to the worker, flushes, and reads the
 * reply. A reply int equal to {@code SpecialLengths.PYTHON_EXCEPTION_THROWN} is always followed by
 * a UTF message and is turned into a thrown error.
 *
 * <p>NOTE(review): requests and replies share one pair of streams with no locking in this class,
 * so callers presumably must not invoke request methods concurrently — confirm against callers.
 */
public class PythonStreamingSourceRunner implements Logging {
    /** Python function shipped to the worker during {@link #init()}. */
    private final PythonFunction func;
    /** Schema sent to the worker as JSON and checked against every Arrow stream read back. */
    private final StructType outputSchema;
    /** Name of the Python module the worker factory is created with. */
    private final String workerModule;
    private final SparkConf conf;
    /** Buffer size used for both socket streams and exported as {@code SPARK_BUFFER_SIZE}. */
    private final int bufferSize;
    /** Exported to the worker as {@code SPARK_AUTH_SOCKET_TIMEOUT}. */
    private final long authSocketTimeout;
    /** Environment map obtained from {@code func.envVars()}; {@link #init()} adds entries to it. */
    private final Map<String, String> envVars;
    private final String pythonExec;
    /** {@code None} until {@link #init()} has created the worker. */
    private Option<PythonWorker> pythonWorker;
    /** {@code None} until {@link #init()} has created the factory. */
    private Option<PythonWorkerFactory> pythonWorkerFactory;
    private final String pythonVer;
    /** Stream to the worker; {@code null} until {@link #init()} runs. */
    private DataOutputStream dataOut;
    /** Stream from the worker; {@code null} until {@link #init()} runs. */
    private DataInputStream dataIn;
    /** Child of the Arrow root allocator used by {@link #readArrowRecordBatches()}. */
    private final BufferAllocator allocator;
    private transient Logger org$apache$spark$internal$Logging$$log_;

    // ---------------------------------------------------------------------------------------
    // Static forwarders: each returns the constant of the same name from the companion object.
    // ---------------------------------------------------------------------------------------

    public static int EMPTY_PYARROW_RECORD_BATCHES() {
        return PythonStreamingSourceRunner$.MODULE$.EMPTY_PYARROW_RECORD_BATCHES();
    }

    public static int NON_EMPTY_PYARROW_RECORD_BATCHES() {
        return PythonStreamingSourceRunner$.MODULE$.NON_EMPTY_PYARROW_RECORD_BATCHES();
    }

    public static int PREFETCHED_RECORDS_NOT_FOUND() {
        return PythonStreamingSourceRunner$.MODULE$.PREFETCHED_RECORDS_NOT_FOUND();
    }

    public static int COMMIT_FUNC_ID() {
        return PythonStreamingSourceRunner$.MODULE$.COMMIT_FUNC_ID();
    }

    public static int PARTITIONS_FUNC_ID() {
        return PythonStreamingSourceRunner$.MODULE$.PARTITIONS_FUNC_ID();
    }

    public static int LATEST_OFFSET_FUNC_ID() {
        return PythonStreamingSourceRunner$.MODULE$.LATEST_OFFSET_FUNC_ID();
    }

    public static int INITIAL_OFFSET_FUNC_ID() {
        return PythonStreamingSourceRunner$.MODULE$.INITIAL_OFFSET_FUNC_ID();
    }

    // ---------------------------------------------------------------------------------------
    // Logging forwarders: every method in this group delegates to the static helper of the
    // same name on Logging (suffix '$'), passing this instance as the first argument.
    // ---------------------------------------------------------------------------------------

    public String logName() {
        return Logging.logName$(this);
    }

    public Logger log() {
        return Logging.log$(this);
    }

    public Logging.LogStringContext LogStringContext(StringContext stringContext) {
        return Logging.LogStringContext$(this, stringContext);
    }

    public void withLogContext(Map<String, String> map, Function0<BoxedUnit> function0) {
        Logging.withLogContext$(this, map, function0);
    }

    public void logInfo(Function0<String> function0) {
        Logging.logInfo$(this, function0);
    }

    public void logInfo(LogEntry logEntry) {
        Logging.logInfo$(this, logEntry);
    }

    public void logInfo(LogEntry logEntry, Throwable th) {
        Logging.logInfo$(this, logEntry, th);
    }

    public void logDebug(Function0<String> function0) {
        Logging.logDebug$(this, function0);
    }

    public void logDebug(LogEntry logEntry) {
        Logging.logDebug$(this, logEntry);
    }

    public void logDebug(LogEntry logEntry, Throwable th) {
        Logging.logDebug$(this, logEntry, th);
    }

    public void logTrace(Function0<String> function0) {
        Logging.logTrace$(this, function0);
    }

    public void logTrace(LogEntry logEntry) {
        Logging.logTrace$(this, logEntry);
    }

    public void logTrace(LogEntry logEntry, Throwable th) {
        Logging.logTrace$(this, logEntry, th);
    }

    public void logWarning(Function0<String> function0) {
        Logging.logWarning$(this, function0);
    }

    public void logWarning(LogEntry logEntry) {
        Logging.logWarning$(this, logEntry);
    }

    public void logWarning(LogEntry logEntry, Throwable th) {
        Logging.logWarning$(this, logEntry, th);
    }

    public void logError(Function0<String> function0) {
        Logging.logError$(this, function0);
    }

    public void logError(LogEntry logEntry) {
        Logging.logError$(this, logEntry);
    }

    public void logError(LogEntry logEntry, Throwable th) {
        Logging.logError$(this, logEntry, th);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.logInfo$(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.logDebug$(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.logTrace$(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.logWarning$(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.logError$(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    public void initializeLogIfNecessary(boolean z) {
        Logging.initializeLogIfNecessary$(this, z);
    }

    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return Logging.initializeLogIfNecessary$(this, z, z2);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$(this);
    }

    public void initializeForcefully(boolean z, boolean z2) {
        Logging.initializeForcefully$(this, z, z2);
    }

    /** Getter for the logger slot used by the {@code Logging} trait. */
    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    /** Setter for the logger slot used by the {@code Logging} trait. */
    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$internal$Logging$$log_ = logger;
    }

    // ---------------------------------------------------------------------------------------
    // Field accessors generated for the Scala vals/vars.
    // ---------------------------------------------------------------------------------------

    /** @return the Python module name the worker is started with */
    public String workerModule() {
        return this.workerModule;
    }

    private SparkConf conf() {
        return this.conf;
    }

    private int bufferSize() {
        return this.bufferSize;
    }

    private long authSocketTimeout() {
        return this.authSocketTimeout;
    }

    private Map<String, String> envVars() {
        return this.envVars;
    }

    private String pythonExec() {
        return this.pythonExec;
    }

    private Option<PythonWorker> pythonWorker() {
        return this.pythonWorker;
    }

    private void pythonWorker_$eq(Option<PythonWorker> option) {
        this.pythonWorker = option;
    }

    private Option<PythonWorkerFactory> pythonWorkerFactory() {
        return this.pythonWorkerFactory;
    }

    private void pythonWorkerFactory_$eq(Option<PythonWorkerFactory> option) {
        this.pythonWorkerFactory = option;
    }

    private String pythonVer() {
        return this.pythonVer;
    }

    private DataOutputStream dataOut() {
        return this.dataOut;
    }

    private void dataOut_$eq(DataOutputStream dataOutputStream) {
        this.dataOut = dataOutputStream;
    }

    private DataInputStream dataIn() {
        return this.dataIn;
    }

    private void dataIn_$eq(DataInputStream dataInputStream) {
        this.dataIn = dataInputStream;
    }

    /**
     * Starts the Python worker and performs the initial handshake.
     *
     * <p>Adds {@code SPARK_LOCAL_DIRS}, {@code SPARK_AUTH_SOCKET_TIMEOUT} and
     * {@code SPARK_BUFFER_SIZE} to the environment map, creates a worker through a new
     * {@code PythonWorkerFactory}, then writes the Python version, the Spark files, the pickled
     * function, the output schema JSON and the Arrow max-records-per-batch setting. Finally reads
     * one int back from the worker.
     *
     * <p>Note that the environment entries are put into the map returned by
     * {@code func.envVars()} itself, not into a copy.
     *
     * @throws MatchError if {@code createSimpleWorker} returns {@code null} or a tuple whose first
     *     element is {@code null}
     * @throws RuntimeException (via {@code QueryCompilationErrors.pythonDataSourceError}) if the
     *     worker replies with {@code PYTHON_EXCEPTION_THROWN}
     */
    public void init() {
        PythonWorker pythonWorker;
        logInfo(LogEntry$.MODULE$.from(() -> {
            return this.LogStringContext(new StringContext(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"Initializing Python runner pythonExec: ", ""}))).log(ScalaRunTime$.MODULE$.wrapRefArray(new MDC[]{new MDC(LogKeys$PYTHON_EXEC$.MODULE$, this.pythonExec())}));
        }));
        // Comma-joined paths of the disk block manager's local directories.
        envVars().put("SPARK_LOCAL_DIRS", Predef$.MODULE$.wrapRefArray((Object[]) ArrayOps$.MODULE$.map$extension(Predef$.MODULE$.refArrayOps(SparkEnv$.MODULE$.get().blockManager().diskBlockManager().localDirs()), file -> {
            return file.getPath();
        }, ClassTag$.MODULE$.apply(String.class))).mkString(","));
        envVars().put("SPARK_AUTH_SOCKET_TIMEOUT", Long.toString(authSocketTimeout()));
        envVars().put("SPARK_BUFFER_SIZE", Integer.toString(bufferSize()));
        PythonWorkerFactory pythonWorkerFactory = new PythonWorkerFactory(pythonExec(), workerModule(), CollectionConverters$.MODULE$.MapHasAsScala(envVars()).asScala().toMap($less$colon$less$.MODULE$.refl()), false);
        Tuple2 createSimpleWorker = pythonWorkerFactory.createSimpleWorker(true);
        if (createSimpleWorker == null || (pythonWorker = (PythonWorker) createSimpleWorker._1()) == null) {
            throw new MatchError(createSimpleWorker);
        }
        // Record worker and factory before the handshake so stop() can clean up if it fails.
        pythonWorker_$eq(new Some(pythonWorker));
        pythonWorkerFactory_$eq(new Some(pythonWorkerFactory));
        dataOut_$eq(new DataOutputStream(new BufferedOutputStream(((PythonWorker) pythonWorker().get()).channel().socket().getOutputStream(), bufferSize())));
        PythonWorkerUtils$.MODULE$.writePythonVersion(pythonVer(), dataOut());
        PythonWorkerUtils$.MODULE$.writeSparkFiles(new Some("streaming_job"), CollectionConverters$.MODULE$.ListHasAsScala(this.func.pythonIncludes()).asScala().toSet(), dataOut());
        PythonWorkerUtils$.MODULE$.writePythonFunction(this.func, dataOut());
        PythonWorkerUtils$.MODULE$.writeUTF(this.outputSchema.json(), dataOut());
        dataOut().writeInt(SQLConf$.MODULE$.get().arrowMaxRecordsPerBatch());
        dataOut().flush();
        dataIn_$eq(new DataInputStream(new BufferedInputStream(((PythonWorker) pythonWorker().get()).channel().socket().getInputStream(), bufferSize())));
        if (dataIn().readInt() == SpecialLengths$.MODULE$.PYTHON_EXCEPTION_THROWN()) {
            throw QueryCompilationErrors$.MODULE$.pythonDataSourceError("plan", "initialize source", PythonWorkerUtils$.MODULE$.readUTF(dataIn()));
        }
    }

    /**
     * Sends {@code LATEST_OFFSET_FUNC_ID} to the worker and returns its string reply.
     *
     * <p>The first int of the reply is either {@code PYTHON_EXCEPTION_THROWN} (an error message
     * follows) or a value that is handed to {@code PythonWorkerUtils.readUTF(int, stream)} to read
     * the result.
     *
     * @return the string read from the worker
     * @throws RuntimeException (via {@code pythonStreamingDataSourceRuntimeError}) with action
     *     {@code "latestOffset"} if the worker reports an exception
     */
    public String latestOffset() {
        dataOut().writeInt(PythonStreamingSourceRunner$.MODULE$.LATEST_OFFSET_FUNC_ID());
        dataOut().flush();
        int readInt = dataIn().readInt();
        if (readInt != SpecialLengths$.MODULE$.PYTHON_EXCEPTION_THROWN()) {
            return PythonWorkerUtils$.MODULE$.readUTF(readInt, dataIn());
        }
        throw QueryExecutionErrors$.MODULE$.pythonStreamingDataSourceRuntimeError("latestOffset", PythonWorkerUtils$.MODULE$.readUTF(dataIn()));
    }

    /**
     * Sends {@code INITIAL_OFFSET_FUNC_ID} to the worker and returns its string reply.
     *
     * <p>Reply handling is the same as in {@link #latestOffset()}.
     *
     * @return the string read from the worker
     * @throws RuntimeException (via {@code pythonStreamingDataSourceRuntimeError}) with action
     *     {@code "initialOffset"} if the worker reports an exception
     */
    public String initialOffset() {
        dataOut().writeInt(PythonStreamingSourceRunner$.MODULE$.INITIAL_OFFSET_FUNC_ID());
        dataOut().flush();
        int readInt = dataIn().readInt();
        if (readInt != SpecialLengths$.MODULE$.PYTHON_EXCEPTION_THROWN()) {
            return PythonWorkerUtils$.MODULE$.readUTF(readInt, dataIn());
        }
        throw QueryExecutionErrors$.MODULE$.pythonStreamingDataSourceRuntimeError("initialOffset", PythonWorkerUtils$.MODULE$.readUTF(dataIn()));
    }

    /**
     * Sends {@code PARTITIONS_FUNC_ID} followed by the two given strings and reads back the
     * partition blobs plus an optional set of prefetched rows.
     *
     * <p>Reply layout as consumed here: an int count (or {@code PYTHON_EXCEPTION_THROWN}), that
     * many byte arrays, then a status int selecting how the second tuple element is built.
     *
     * @param str first string written to the worker (presumably the start offset — confirm
     *     against callers)
     * @param str2 second string written to the worker (presumably the end offset — confirm)
     * @return a pair of (byte arrays read from the worker, rows): {@code Some(rows)} for
     *     {@code NON_EMPTY_PYARROW_RECORD_BATCHES}, {@code Some(empty iterator)} for
     *     {@code EMPTY_PYARROW_RECORD_BATCHES}, {@code None} for
     *     {@code PREFETCHED_RECORDS_NOT_FOUND}
     * @throws RuntimeException (via {@code pythonStreamingDataSourceRuntimeError}) with action
     *     {@code "planPartitions"} if the worker reports an exception or sends an unknown status
     */
    public Tuple2<byte[][], Option<Iterator<InternalRow>>> partitions(String str, String str2) {
        // Declared as Option (not Some): one of the branches below assigns None.
        Option prefetchedRows;
        dataOut().writeInt(PythonStreamingSourceRunner$.MODULE$.PARTITIONS_FUNC_ID());
        PythonWorkerUtils$.MODULE$.writeUTF(str, dataOut());
        PythonWorkerUtils$.MODULE$.writeUTF(str2, dataOut());
        dataOut().flush();
        ArrayBuffer empty = ArrayBuffer$.MODULE$.empty();
        int readInt = dataIn().readInt();
        if (readInt == SpecialLengths$.MODULE$.PYTHON_EXCEPTION_THROWN()) {
            throw QueryExecutionErrors$.MODULE$.pythonStreamingDataSourceRuntimeError("planPartitions", PythonWorkerUtils$.MODULE$.readUTF(dataIn()));
        }
        // Read `readInt` byte arrays, one per iteration, into the buffer.
        RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), readInt).foreach(obj -> {
            return $anonfun$partitions$1(this, empty, BoxesRunTime.unboxToInt(obj));
        });
        int readInt2 = dataIn().readInt();
        // Status checks are kept in their original order.
        if (PythonStreamingSourceRunner$.MODULE$.NON_EMPTY_PYARROW_RECORD_BATCHES() == readInt2) {
            prefetchedRows = new Some(readArrowRecordBatches());
        } else if (PythonStreamingSourceRunner$.MODULE$.PREFETCHED_RECORDS_NOT_FOUND() == readInt2) {
            prefetchedRows = None$.MODULE$;
        } else if (PythonStreamingSourceRunner$.MODULE$.EMPTY_PYARROW_RECORD_BATCHES() == readInt2) {
            prefetchedRows = new Some(package$.MODULE$.Iterator().empty());
        } else if (SpecialLengths$.MODULE$.PYTHON_EXCEPTION_THROWN() == readInt2) {
            throw QueryExecutionErrors$.MODULE$.pythonStreamingDataSourceRuntimeError("planPartitions", PythonWorkerUtils$.MODULE$.readUTF(dataIn()));
        } else {
            throw QueryExecutionErrors$.MODULE$.pythonStreamingDataSourceRuntimeError("planPartitions", "unknown status code " + readInt2);
        }
        return new Tuple2<>(empty.toArray(ClassTag$.MODULE$.apply(ScalaRunTime$.MODULE$.arrayClass(Byte.TYPE))), prefetchedRows);
    }

    /**
     * Sends {@code COMMIT_FUNC_ID} followed by the given string and waits for the worker's reply.
     *
     * @param str string written to the worker after the function id (presumably the end offset
     *     being committed — confirm against callers)
     * @throws RuntimeException (via {@code pythonStreamingDataSourceRuntimeError}) with action
     *     {@code "commitSource"} if the worker reports an exception
     */
    public void commit(String str) {
        dataOut().writeInt(PythonStreamingSourceRunner$.MODULE$.COMMIT_FUNC_ID());
        PythonWorkerUtils$.MODULE$.writeUTF(str, dataOut());
        dataOut().flush();
        if (dataIn().readInt() == SpecialLengths$.MODULE$.PYTHON_EXCEPTION_THROWN()) {
            throw QueryExecutionErrors$.MODULE$.pythonStreamingDataSourceRuntimeError("commitSource", PythonWorkerUtils$.MODULE$.readUTF(dataIn()));
        }
    }

    /**
     * Stops the Python worker and its factory, if both were created by {@link #init()}.
     *
     * <p>Best effort: any {@link Exception} raised while stopping is logged and not rethrown.
     */
    public void stop() {
        logInfo(LogEntry$.MODULE$.from(() -> {
            return this.LogStringContext(new StringContext(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"Stopping streaming runner for module: "}))).log(Nil$.MODULE$).$plus(this.LogStringContext(new StringContext(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"", "."}))).log(ScalaRunTime$.MODULE$.wrapRefArray(new MDC[]{new MDC(LogKeys$MODULE_NAME$.MODULE$, this.workerModule())})));
        }));
        try {
            pythonWorkerFactory().foreach(pythonWorkerFactory -> {
                $anonfun$stop$2(this, pythonWorkerFactory);
                return BoxedUnit.UNIT;
            });
        } catch (Exception e) {
            logError(() -> {
                return "Exception when trying to kill worker";
            }, e);
        }
    }

    private BufferAllocator allocator() {
        return this.allocator;
    }

    /**
     * Reads one Arrow stream from the worker and returns all of its rows.
     *
     * <p>Expects a leading status int equal to {@code SpecialLengths.START_ARROW_STREAM}. The
     * stream's schema must equal {@code outputSchema}. Every row of every batch is copied into an
     * in-memory buffer before the reader is closed, so the returned iterator does not depend on
     * the reader.
     *
     * <p>The reader is closed in a {@code finally} block so that its buffers are released even
     * when reading the schema, loading a batch, or the schema assertion fails. Previously those
     * failure paths skipped the close call.
     *
     * @return an iterator over copies of all rows in the stream
     * @throws RuntimeException (via {@code pythonStreamingDataSourceRuntimeError}) with action
     *     {@code "prefetchArrowBatches"} if the worker reports an exception or sends an unknown
     *     status
     */
    public Iterator<InternalRow> readArrowRecordBatches() {
        int readInt = dataIn().readInt();
        if (SpecialLengths$.MODULE$.PYTHON_EXCEPTION_THROWN() == readInt) {
            throw QueryExecutionErrors$.MODULE$.pythonStreamingDataSourceRuntimeError("prefetchArrowBatches", PythonWorkerUtils$.MODULE$.readUTF(dataIn()));
        }
        if (SpecialLengths$.MODULE$.START_ARROW_STREAM() != readInt) {
            throw QueryExecutionErrors$.MODULE$.pythonStreamingDataSourceRuntimeError("prefetchArrowBatches", "unknown status code " + readInt);
        }
        ArrowStreamReader arrowStreamReader = new ArrowStreamReader(dataIn(), allocator());
        try {
            VectorSchemaRoot vectorSchemaRoot = arrowStreamReader.getVectorSchemaRoot();
            StructType fromArrowSchema = ArrowUtils$.MODULE$.fromArrowSchema(vectorSchemaRoot.getSchema());
            Predef$ predef$ = Predef$.MODULE$;
            StructType structType = this.outputSchema;
            predef$.assert(fromArrowSchema != null ? fromArrowSchema.equals(structType) : structType == null);
            ColumnVector[] columnVectorArr = (ColumnVector[]) ((IterableOnceOps) CollectionConverters$.MODULE$.ListHasAsScala(vectorSchemaRoot.getFieldVectors()).asScala().map(fieldVector -> {
                return new ArrowColumnVector(fieldVector);
            })).toArray(ClassTag$.MODULE$.apply(ColumnVector.class));
            ArrayBuffer arrayBuffer = (ArrayBuffer) ArrayBuffer$.MODULE$.apply(Nil$.MODULE$);
            while (arrowStreamReader.loadNextBatch()) {
                ColumnarBatch columnarBatch = new ColumnarBatch(columnVectorArr);
                columnarBatch.setNumRows(vectorSchemaRoot.getRowCount());
                // Rows are copied before buffering; presumably the batch's row iterator reuses
                // one row object across calls — TODO confirm against ColumnarBatch.
                arrayBuffer.appendAll(CollectionConverters$.MODULE$.IteratorHasAsScala(columnarBatch.rowIterator()).asScala().map(internalRow -> {
                    return internalRow.copy();
                }));
            }
            return arrayBuffer.iterator();
        } finally {
            // Same close(false) call as before, now also reached on failure paths. The `false`
            // flag is unchanged; dataIn must stay usable for later requests.
            arrowStreamReader.close(false);
        }
    }

    /** Synthetic lambda body for {@link #partitions}: reads one byte array and appends it. */
    public static final /* synthetic */ ArrayBuffer $anonfun$partitions$1(PythonStreamingSourceRunner pythonStreamingSourceRunner, ArrayBuffer arrayBuffer, int i) {
        return arrayBuffer.append(PythonWorkerUtils$.MODULE$.readBytes(pythonStreamingSourceRunner.dataIn()));
    }

    /** Synthetic lambda body for {@link #stop}: stops the worker, then the factory itself. */
    public static final /* synthetic */ void $anonfun$stop$3(PythonWorkerFactory pythonWorkerFactory, PythonWorker pythonWorker) {
        pythonWorkerFactory.stopWorker(pythonWorker);
        pythonWorkerFactory.stop();
    }

    /** Synthetic lambda body for {@link #stop}: runs the stop step only if a worker exists. */
    public static final /* synthetic */ void $anonfun$stop$2(PythonStreamingSourceRunner pythonStreamingSourceRunner, PythonWorkerFactory pythonWorkerFactory) {
        pythonStreamingSourceRunner.pythonWorker().foreach(pythonWorker -> {
            $anonfun$stop$3(pythonWorkerFactory, pythonWorker);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Captures configuration for the runner; no worker is started until {@link #init()}.
     *
     * <p>Reads {@code BUFFER_SIZE} and {@code PYTHON_AUTH_SOCKET_TIMEOUT} from the current
     * {@code SparkEnv}'s conf, takes the environment, executable and version from the function,
     * and creates a child Arrow allocator named {@code "stream reader for <pythonExec>"}.
     *
     * @param pythonFunction Python function to send to the worker
     * @param structType schema the worker's Arrow output must match
     */
    public PythonStreamingSourceRunner(PythonFunction pythonFunction, StructType structType) {
        this.func = pythonFunction;
        this.outputSchema = structType;
        Logging.$init$(this);
        this.workerModule = "pyspark.sql.streaming.python_streaming_source_runner";
        this.conf = SparkEnv$.MODULE$.get().conf();
        this.bufferSize = BoxesRunTime.unboxToInt(conf().get(org.apache.spark.internal.config.package$.MODULE$.BUFFER_SIZE()));
        this.authSocketTimeout = BoxesRunTime.unboxToLong(conf().get(Python$.MODULE$.PYTHON_AUTH_SOCKET_TIMEOUT()));
        this.envVars = pythonFunction.envVars();
        this.pythonExec = pythonFunction.pythonExec();
        this.pythonWorker = None$.MODULE$;
        this.pythonWorkerFactory = None$.MODULE$;
        this.pythonVer = pythonFunction.pythonVer();
        this.dataOut = null;
        this.dataIn = null;
        this.allocator = ArrowUtils$.MODULE$.rootAllocator().newChildAllocator("stream reader for " + pythonExec(), 0L, Long.MAX_VALUE);
    }
}
