/*
 * Decompiled with CFR 0.152.
 */
package com.holdenkarau.spark.testing;

import com.holdenkarau.spark.testing.BatchCountListener;
import com.holdenkarau.spark.testing.StreamingSuiteBase;
import com.holdenkarau.spark.testing.TestInputStream;
import java.io.Serializable;
import org.apache.spark.streaming.TestStreamingContext;
import org.apache.spark.streaming.dstream.DStream;
import org.apache.spark.streaming.util.TestManualClock;
import org.scalactic.Bool;
import org.scalactic.Bool$;
import org.scalactic.Prettifier$;
import org.scalactic.source.Position;
import org.scalatest.Assertions;
import scala.Function0;
import scala.Function1;
import scala.Predef$;
import scala.collection.Seq;
import scala.reflect.ClassTag;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0001\u0005ea!C\u0001\u0003!\u0003\r\taCA\u0003\u0005M\u0019FO]3b[&tw-Q2uS>t')Y:f\u0015\t\u0019A!A\u0004uKN$\u0018N\\4\u000b\u0005\u00151\u0011!B:qCJ\\'BA\u0004\t\u0003-Aw\u000e\u001c3f].\f'/Y;\u000b\u0003%\t1aY8n\u0007\u0001\u00192\u0001\u0001\u0007\u0013!\ti\u0001#D\u0001\u000f\u0015\u0005y\u0011!B:dC2\f\u0017BA\t\u000f\u0005\u0019\te.\u001f*fMB\u00111\u0003F\u0007\u0002\u0005%\u0011QC\u0001\u0002\u0013'R\u0014X-Y7j]\u001e\u001cV/\u001b;f\u0005\u0006\u001cX\rC\u0003\u0018\u0001\u0011\u0005\u0001$\u0001\u0004%S:LG\u000f\n\u000b\u00023A\u0011QBG\u0005\u000379\u0011A!\u00168ji\"9Q\u0004\u0001b\u0001\n\u0003q\u0012A\u00052bi\u000eD7i\\;oi2K7\u000f^3oKJ,\u0012a\b\t\u0003'\u0001J!!\t\u0002\u0003%\t\u000bGo\u00195D_VtG\u000fT5ti\u0016tWM\u001d\u0005\u0006G\u0001!\t\u0001J\u0001\neVt\u0017i\u0019;j_:,\"!J\u0019\u0015\u0007\u0019R\u0014\n\u0006\u0002\u001aO!9\u0001FIA\u0001\u0002\bI\u0013AC3wS\u0012,gnY3%cA\u0019!&L\u0018\u000e\u0003-R!\u0001\f\b\u0002\u000fI,g\r\\3di&\u0011af\u000b\u0002\t\u00072\f7o\u001d+bOB\u0011\u0001'\r\u0007\u0001\t\u0015\u0011$E1\u00014\u0005\u0005)\u0016C\u0001\u001b8!\tiQ'\u0003\u00027\u001d\t9aj\u001c;iS:<\u0007CA\u00079\u0013\tIdBA\u0002B]fDQa\u000f\u0012A\u0002q\nQ!\u001b8qkR\u00042!P#I\u001d\tq4I\u0004\u0002@\u00056\t\u0001I\u0003\u0002B\u0015\u00051AH]8pizJ\u0011aD\u0005\u0003\t:\tq\u0001]1dW\u0006<W-\u0003\u0002G\u000f\n\u00191+Z9\u000b\u0005\u0011s\u0001cA\u001fF_!)!J\ta\u0001\u0017\u0006Iq\u000e]3sCRLwN\u001c\t\u0005\u001b1s\u0015$\u0003\u0002N\u001d\tIa)\u001e8di&|g.\r\t\u0004\u001ff{S\"\u0001)\u000b\u0005E\u0013\u0016a\u00023tiJ,\u0017-\u001c\u0006\u0003'R\u000b\u0011b\u001d;sK\u0006l\u0017N\\4\u000b\u0005\u0015)&B\u0001,X\u0003\u0019\t\u0007/Y2iK*\t\u0001,A\u0002pe\u001eL!A\u0017)\u0003\u000f\u0011\u001bFO]3b[\")A\f\u0001C\u0005;\u0006!r/\u001b;i'R\u0014X-Y7j]\u001e\u001cuN\u001c;fqR$\"A\u00184\u0015\u0005ey\u0006\"\u00021\\\u0001\u0004\t\u0017!\u00022m_\u000e\\\u0007\u0003B\u0007MEf\u0001\"a\u00193\u000e\u0003IK!!\u001a*\u0003)Q+7\u000f^*ue\u0016\fW.\u001b8h\u0007>tG/\u001a=u\u0011\u001597\f1\u0001c\u0003=yW\u000f\u001e9viN#(/Z1n'N\u001b\u0005\"B5\u0001\t\u0013Q\u0017aC:fiV\u00048\u000b\u001e:fC6,\"a[9\u0015\u00071\u0014X\u000f\u0006\u0002c[\"9a\u000e[A\u0001\u0002\by\u0017AC3wS\u0012,gnY3%eA\u0019!&\f9\u0011\u0005A\nH!\u0002\u001ai\u0005\u0004\u0019\u0004\"B\u001ei\u0001\u0004\u0019\bcA\u001fFiB\u0019Q(\u00129\t\u000b)C\u0007\u0019\u0001<\u0011\t5auo\u000e\t\u0004\u001ff\u0003\b\"B=\u0001\t\u0013Q\u0018a\u0004:v]\u0006\u001bG/[8o'R\u0014X-Y7\u0015\u0007eYX\u0010C\u0003}q\u0002\u0007!-A\u0002tg\u000eDQA =A\u0002}\f!B\\;n\u0005\u0006$8\r[3t!\ri\u0011\u0011A\u0005\u0004\u0003\u0007q!aA%oiJ1\u0011qAA\u0006\u0003\u001b1a!!\u0003\u0001\u0001\u0005\u0015!\u0001\u0004\u001fsK\u001aLg.Z7f]Rt\u0004CA\n\u0001!\u0011\ty!!\u0006\u000e\u0005\u0005E!bAA\n/\u0006I1oY1mCR,7\u000f^\u0005\u0005\u0003/\t\tBA\u0003Tk&$X\r")
public interface StreamingActionBase
extends StreamingSuiteBase {
    public void com$holdenkarau$spark$testing$StreamingActionBase$_setter_$batchCountListener_$eq(BatchCountListener var1);

    public BatchCountListener batchCountListener();

    public static /* synthetic */ void runAction$(StreamingActionBase $this, Seq input, Function1 operation, ClassTag evidence$1) {
        $this.runAction(input, operation, evidence$1);
    }

    default public <U> void runAction(Seq<Seq<U>> input, Function1<DStream<U>, BoxedUnit> operation, ClassTag<U> evidence$1) {
        int numBatches_ = input.size();
        this.withStreamingContext(this.setupStream(input, operation, evidence$1), (Function1<TestStreamingContext, BoxedUnit>)(Function1 & Serializable & scala.Serializable)ssc -> {
            this.runActionStream(ssc, numBatches_);
            return BoxedUnit.UNIT;
        });
    }

    private void withStreamingContext(TestStreamingContext outputStreamSSC, Function1<TestStreamingContext, BoxedUnit> block) {
        try {
            block.apply((Object)outputStreamSSC);
        }
        finally {
            try {
                outputStreamSSC.stop(false);
            }
            catch (Throwable e) {
                this.logError((Function0 & Serializable & scala.Serializable)() -> "Error stopping StreamingContext", e);
            }
        }
    }

    private <U> TestStreamingContext setupStream(Seq<Seq<U>> input, Function1<DStream<U>, Object> operation, ClassTag<U> evidence$2) {
        TestStreamingContext ssc = new TestStreamingContext(this.sc(), this.batchDuration());
        ssc.addStreamingListener(this.batchCountListener());
        if (this.checkpointDir() != null) {
            ssc.checkpoint(this.checkpointDir());
        }
        TestInputStream<U> inputStream = this.createTestInputStream(this.sc(), ssc, input, evidence$2);
        operation.apply(inputStream);
        return ssc;
    }

    private void runActionStream(TestStreamingContext ssc, int numBatches) {
        int $org_scalatest_assert_macro_left = numBatches;
        int $org_scalatest_assert_macro_right = 0;
        Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_left), ">", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left > $org_scalatest_assert_macro_right, Prettifier$.MODULE$.default());
        ((Assertions)this).assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"Number of batches to run stream computation is zero", Prettifier$.MODULE$.default(), new Position("StreamingActionBase.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 83));
        this.batchCountListener().batchCount_$eq(0);
        ssc.start();
        TestManualClock clock = (TestManualClock)ssc.getScheduler().clock();
        this.logInfo((Function0 & Serializable & scala.Serializable)() -> new StringBuilder(32).append("Manual clock before advancing = ").append(clock.currentTime()).toString());
        if (this.actuallyWait()) {
            RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), numBatches).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                this.logInfo((Function0 & Serializable & scala.Serializable)() -> new StringBuilder(21).append("Actually waiting for ").append(this.batchDuration()).toString());
                clock.addToTime(this.batchDuration().milliseconds());
                Thread.sleep(this.batchDuration().milliseconds());
            });
        } else {
            clock.addToTime((long)numBatches * this.batchDuration().milliseconds());
        }
        this.logInfo((Function0 & Serializable & scala.Serializable)() -> new StringBuilder(31).append("Manual clock after advancing = ").append(clock.currentTime()).toString());
        long startTime = System.currentTimeMillis();
        while (this.batchCountListener().batchCount() < numBatches && System.currentTimeMillis() - startTime < (long)this.maxWaitTimeMillis()) {
            this.logInfo((Function0 & Serializable & scala.Serializable)() -> new StringBuilder(16).append("batches: run = ").append(this.batchCountListener().batchCount()).append(" ").append(new StringBuilder(9).append("target = ").append(numBatches).toString()).toString());
            ssc.awaitTerminationOrTimeout(50L);
        }
        long timeTaken = System.currentTimeMillis() - startTime;
        this.logInfo((Function0 & Serializable & scala.Serializable)() -> new StringBuilder(33).append("Output generated in ").append(timeTaken).append(" milliseconds").toString());
        Thread.sleep(100L);
    }

    public static void $init$(StreamingActionBase $this) {
        $this.com$holdenkarau$spark$testing$StreamingActionBase$_setter_$batchCountListener_$eq(new BatchCountListener());
    }
}

