/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.streaming.flume;

import java.io.Serializable;
import java.util.List;
import org.apache.spark.internal.Logging;
import org.apache.spark.streaming.flume.FlumeConnection;
import org.apache.spark.streaming.flume.FlumePollingReceiver;
import org.apache.spark.streaming.flume.SparkFlumeEvent;
import org.apache.spark.streaming.flume.sink.EventBatch;
import org.apache.spark.streaming.flume.sink.SparkFlumeProtocol;
import org.apache.spark.streaming.flume.sink.SparkSinkEvent;
import org.apache.spark.streaming.flume.sink.SparkSinkUtils$;
import org.slf4j.Logger;
import org.spark_project.guava.base.Throwables;
import scala.Function0;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Some;
import scala.collection.mutable.ArrayBuffer;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;

@ScalaSignature(bytes="\u0006\u0001e4Q!\u0003\u0006\u0001\u0015QA\u0001B\n\u0001\u0003\u0002\u0003\u0006I\u0001\u000b\u0005\u0006Y\u0001!\t!\f\u0005\u0006a\u0001!\t!\r\u0005\u0006q\u0001!I!\u000f\u0005\u0006\u0019\u0002!I!\u0014\u0005\u0006=\u0002!Ia\u0018\u0005\u0006M\u0002!Ia\u001a\u0005\u0006Y\u0002!I!\u001c\u0002\u0012\r2,X.\u001a\"bi\u000eDg)\u001a;dQ\u0016\u0014(BA\u0006\r\u0003\u00151G.^7f\u0015\tia\"A\u0005tiJ,\u0017-\\5oO*\u0011q\u0002E\u0001\u0006gB\f'o\u001b\u0006\u0003#I\ta!\u00199bG\",'\"A\n\u0002\u0007=\u0014xm\u0005\u0003\u0001+u\u0001\u0003C\u0001\f\u001c\u001b\u00059\"B\u0001\r\u001a\u0003\u0011a\u0017M\\4\u000b\u0003i\tAA[1wC&\u0011Ad\u0006\u0002\u0007\u001f\nTWm\u0019;\u0011\u0005Yq\u0012BA\u0010\u0018\u0005!\u0011VO\u001c8bE2,\u0007CA\u0011%\u001b\u0005\u0011#BA\u0012\u000f\u0003!Ig\u000e^3s]\u0006d\u0017BA\u0013#\u0005\u001daunZ4j]\u001e\f\u0001B]3dK&4XM]\u0002\u0001!\tI#&D\u0001\u000b\u0013\tY#B\u0001\u000bGYVlW\rU8mY&twMU3dK&4XM]\u0001\u0007y%t\u0017\u000e\u001e \u0015\u00059z\u0003CA\u0015\u0001\u0011\u00151#\u00011\u0001)\u0003\r\u0011XO\u001c\u000b\u0002eA\u00111GN\u0007\u0002i)\tQ'A\u0003tG\u0006d\u0017-\u0003\u00028i\t!QK\\5u\u0003!9W\r\u001e\"bi\u000eDGC\u0001\u001eD!\r\u00194(P\u0005\u0003yQ\u0012aa\u00149uS>t\u0007C\u0001 B\u001b\u0005y$B\u0001!\u000b\u0003\u0011\u0019\u0018N\\6\n\u0005\t{$AC#wK:$()\u0019;dQ\")A\t\u0002a\u0001\u000b\u000611\r\\5f]R\u0004\"AR%\u000f\u0005y:\u0015B\u0001%@\u0003I\u0019\u0006/\u0019:l\r2,X.\u001a)s_R|7m\u001c7\n\u0005)[%\u0001C\"bY2\u0014\u0017mY6\u000b\u0005!{\u0014!B:u_J,GC\u0001(R!\t\u0019t*\u0003\u0002Qi\t9!i\\8mK\u0006t\u0007\"\u0002*\u0006\u0001\u0004\u0019\u0016A\u00022vM\u001a,'\u000fE\u0002U3nk\u0011!\u0016\u0006\u0003-^\u000bq!\\;uC\ndWM\u0003\u0002Yi\u0005Q1m\u001c7mK\u000e$\u0018n\u001c8\n\u0005i+&aC!se\u0006L()\u001e4gKJ\u0004\"!\u000b/\n\u0005uS!aD*qCJ\\g\t\\;nK\u00163XM\u001c;\u0002\u000fM,g\u000eZ!dWR\u0019!\u0007Y1\t\u000b\u00113\u0001\u0019A#\t\u000b\t4\u0001\u0019A2\u0002\u0007M,\u0017\u000f\u0005\u0002\u0017I&\u0011Qm\u0006\u0002\r\u0007\"\f'oU3rk\u0016t7-Z\u0001\tg\u0016tGMT1dWR!!\u0007\u001b6l\u0011\u0015Iw\u00011\u0001O\u00035\u0011\u0017\r^2i%\u0016\u001cW-\u001b<fI\")Ai\u0002a\u0001\u000b\")!m\u0002a\u0001G\u0006\u0011Bo\\*qCJ\\g\t\\;nK\u00163XM\u001c;t)\t\u0019f\u000eC\u0003p\u0011\u0001\u0007\u0001/\u0001\u0004fm\u0016tGo\u001d\t\u0004cR4X\"\u0001:\u000b\u0005ML\u0012\u0001B;uS2L!!\u001e:\u0003\t1K7\u000f\u001e\t\u0003}]L!\u0001_ \u0003\u001dM\u0003\u0018M]6TS:\\WI^3oi\u0002")
public class FlumeBatchFetcher
implements Runnable,
Logging {
    private final FlumePollingReceiver receiver;
    private transient Logger org$apache$spark$internal$Logging$$log_;

    public String logName() {
        return Logging.logName$((Logging)this);
    }

    public Logger log() {
        return Logging.log$((Logging)this);
    }

    public void logInfo(Function0<String> msg) {
        Logging.logInfo$((Logging)this, msg);
    }

    public void logDebug(Function0<String> msg) {
        Logging.logDebug$((Logging)this, msg);
    }

    public void logTrace(Function0<String> msg) {
        Logging.logTrace$((Logging)this, msg);
    }

    public void logWarning(Function0<String> msg) {
        Logging.logWarning$((Logging)this, msg);
    }

    public void logError(Function0<String> msg) {
        Logging.logError$((Logging)this, msg);
    }

    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.logInfo$((Logging)this, msg, (Throwable)throwable);
    }

    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.logDebug$((Logging)this, msg, (Throwable)throwable);
    }

    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.logTrace$((Logging)this, msg, (Throwable)throwable);
    }

    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.logWarning$((Logging)this, msg, (Throwable)throwable);
    }

    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.logError$((Logging)this, msg, (Throwable)throwable);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$((Logging)this);
    }

    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter);
    }

    public boolean initializeLogIfNecessary(boolean isInterpreter, boolean silent) {
        return Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$((Logging)this);
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        this.org$apache$spark$internal$Logging$$log_ = x$1;
    }

    @Override
    public void run() {
        while (!this.receiver.isStopped()) {
            FlumeConnection connection = this.receiver.getConnections().poll();
            SparkFlumeProtocol.Callback client = connection.client();
            boolean batchReceived = false;
            CharSequence seq = null;
            try {
                try {
                    BoxedUnit boxedUnit;
                    Option<EventBatch> option = this.getBatch(client);
                    if (option instanceof Some) {
                        BoxedUnit boxedUnit2;
                        Some some = (Some)option;
                        EventBatch eventBatch = (EventBatch)some.value();
                        batchReceived = true;
                        seq = eventBatch.getSequenceNumber();
                        ArrayBuffer<SparkFlumeEvent> events = this.toSparkFlumeEvents(eventBatch.getEvents());
                        if (this.store(events)) {
                            this.sendAck(client, seq);
                            boxedUnit2 = BoxedUnit.UNIT;
                        } else {
                            this.sendNack(batchReceived, client, seq);
                            boxedUnit2 = BoxedUnit.UNIT;
                        }
                        boxedUnit = boxedUnit2;
                        continue;
                    }
                    if (None$.MODULE$.equals(option)) {
                        boxedUnit = BoxedUnit.UNIT;
                        continue;
                    }
                    throw new MatchError(option);
                }
                catch (Exception e) {
                    BoxedUnit boxedUnit;
                    Throwable throwable = Throwables.getRootCause((Throwable)e);
                    if (throwable instanceof InterruptedException) {
                        BoxedUnit boxedUnit3;
                        InterruptedException interruptedException = (InterruptedException)throwable;
                        if (!this.receiver.isStopped()) {
                            this.logWarning((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Interrupted while receiving data from Flume", interruptedException);
                            this.sendNack(batchReceived, client, seq);
                            boxedUnit3 = BoxedUnit.UNIT;
                        } else {
                            boxedUnit3 = BoxedUnit.UNIT;
                        }
                        boxedUnit = boxedUnit3;
                        continue;
                    }
                    if (throwable instanceof Exception) {
                        Exception exception = (Exception)throwable;
                        this.logWarning((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Error while receiving data from Flume", exception);
                        this.sendNack(batchReceived, client, seq);
                        boxedUnit = BoxedUnit.UNIT;
                        continue;
                    }
                    throw new MatchError((Object)throwable);
                }
            }
            finally {
                this.receiver.getConnections().add(connection);
            }
        }
    }

    private Option<EventBatch> getBatch(SparkFlumeProtocol.Callback client) {
        None$ none$;
        EventBatch eventBatch = client.getEventBatch(this.receiver.getMaxBatchSize());
        if (!SparkSinkUtils$.MODULE$.isErrorBatch(eventBatch)) {
            this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(48).append("Received batch of ").append(eventBatch.getEvents().size()).append(" events with sequence ").append("number: ").append(eventBatch.getSequenceNumber()).toString());
            none$ = new Some((Object)eventBatch);
        } else {
            this.logWarning((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(73).append("Did not receive events from Flume agent due to error on the Flume agent: ").append(eventBatch.getErrorMsg()).toString());
            none$ = None$.MODULE$;
        }
        return none$;
    }

    private boolean store(ArrayBuffer<SparkFlumeEvent> buffer) {
        boolean bl;
        try {
            this.receiver.store(buffer);
            bl = true;
        }
        catch (Exception e) {
            this.logWarning((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Error while attempting to store data received from Flume", e);
            bl = false;
        }
        return bl;
    }

    private void sendAck(SparkFlumeProtocol.Callback client, CharSequence seq) {
        this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(33).append("Sending ack for sequence number: ").append(seq).toString());
        client.ack(seq);
        this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(30).append("Ack sent for sequence number: ").append(seq).toString());
    }

    private void sendNack(boolean batchReceived, SparkFlumeProtocol.Callback client, CharSequence seq) {
        block0: {
            if (!batchReceived) break block0;
            this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(34).append("Sending nack for sequence number: ").append(seq).toString());
            client.nack(seq);
            this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(31).append("Nack sent for sequence number: ").append(seq).toString());
        }
    }

    /*
     * WARNING - void declaration
     */
    private ArrayBuffer<SparkFlumeEvent> toSparkFlumeEvents(List<SparkSinkEvent> events) {
        void var2_2;
        ArrayBuffer buffer = new ArrayBuffer(events.size());
        for (int j = 0; j < events.size(); ++j) {
            SparkSinkEvent event = events.get(j);
            SparkFlumeEvent sparkFlumeEvent = new SparkFlumeEvent();
            sparkFlumeEvent.event().setBody(event.getBody());
            sparkFlumeEvent.event().setHeaders(event.getHeaders());
            buffer.$plus$eq((Object)sparkFlumeEvent);
        }
        return var2_2;
    }

    public FlumeBatchFetcher(FlumePollingReceiver receiver) {
        this.receiver = receiver;
        Logging.$init$((Logging)this);
    }
}

