package com.lightbend.lagom.internal.broker.kafka;

import akka.Done;
import akka.actor.Actor;
import akka.actor.ActorContext;
import akka.actor.ActorLogging;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.annotation.InternalApi;
import akka.event.LoggingAdapter;
import akka.kafka.AutoSubscription;
import akka.kafka.ConsumerMessage;
import akka.kafka.ConsumerSettings;
import akka.kafka.scaladsl.Committer$;
import akka.kafka.scaladsl.Consumer$;
import akka.pattern.package$;
import akka.stream.FanInShape2;
import akka.stream.FanOutShape2;
import akka.stream.FlowShape;
import akka.stream.KillSwitch;
import akka.stream.KillSwitches$;
import akka.stream.Materializer;
import akka.stream.OverflowStrategy$;
import akka.stream.UniqueKillSwitch;
import akka.stream.scaladsl.Flow;
import akka.stream.scaladsl.Flow$;
import akka.stream.scaladsl.GraphDSL$;
import akka.stream.scaladsl.GraphDSL$Implicits$;
import akka.stream.scaladsl.Keep$;
import akka.stream.scaladsl.Sink$;
import akka.stream.scaladsl.Source;
import akka.stream.scaladsl.Unzip$;
import akka.stream.scaladsl.Zip$;
import com.lightbend.lagom.internal.api.UriUtils$;
import java.net.URI;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.PartialFunction;
import scala.PartialFunction$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.immutable.Nil$;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.Nothing$;

/* compiled from: KafkaSubscriberActor.scala */
@ScalaSignature(bytes = "\u0006\u0001\tEb!B\u000e\u001d\u0001\tB\u0003\u0002C\u001e\u0001\u0005\u0003\u0005\u000b\u0011B\u001f\t\u0011\u0005\u0003!\u0011!Q\u0001\n\tC\u0001\"\u0012\u0001\u0003\u0002\u0003\u0006IA\u0012\u0005\tW\u0002\u0011\t\u0011)A\u0005\u0013\"AA\u000e\u0001B\u0001B\u0003%Q\u000e\u0003\u0006\u0002\u0012\u0001\u0011\t\u0011)A\u0005\u0003'A!\"a\t\u0001\u0005\u0003\u0005\u000b\u0011BA\u0013\u0011)\tY\u0003\u0001B\u0001B\u0003%\u0011Q\u0006\u0005\u000b\u0003g\u0001!\u0011!Q\u0001\n\u0005U\u0002BCA)\u0001\t\u0005\t\u0015a\u0003\u0002T!Q\u00111\f\u0001\u0003\u0002\u0003\u0006Y!!\u0018\t\u000f\u0005\r\u0004\u0001\"\u0001\u0002f!I\u0011\u0011\u0012\u0001A\u0002\u0013%\u00111\u0012\u0005\n\u00033\u0003\u0001\u0019!C\u0005\u00037C\u0001\"a*\u0001A\u0003&\u0011Q\u0012\u0005\b\u0003S\u0003A\u0011IAV\u0011\u001d\ti\u000b\u0001C!\u0003WCq!a,\u0001\t\u0013\t\t\fC\u0004\u0002@\u0002!I!!1\t\u000f\u0005\r\u0007\u0001\"\u0011\u0002F\"9\u0011Q\u001a\u0001\u0005\n\u0005=\u0007bBAl\u0001\u0011%\u0011\u0011\\\u0004\b\u0003[d\u0002\u0012AAx\r\u0019YB\u0004#\u0001\u0002r\"9\u00111\r\r\u0005\u0002\u0005M\bbBA{1\u0011\u0005\u0011q\u001f\u0002\u0015\u0017\u000647.Y*vEN\u001c'/\u001b2fe\u0006\u001bGo\u001c:\u000b\u0005uq\u0012!B6bM.\f'BA\u0010!\u0003\u0019\u0011'o\\6fe*\u0011\u0011EI\u0001\tS:$XM\u001d8bY*\u00111\u0005J\u0001\u0006Y\u0006<w.\u001c\u0006\u0003K\u0019\n\u0011\u0002\\5hQR\u0014WM\u001c3\u000b\u0003\u001d\n1aY8n+\u0011I\u0013q\u0004=\u0014\t\u0001Q\u0003\u0007\u000f\t\u0003W9j\u0011\u0001\f\u0006\u0002[\u0005)1oY1mC&\u0011q\u0006\f\u0002\u0007\u0003:L(+\u001a4\u0011\u0005E2T\"\u0001\u001a\u000b\u0005M\"\u0014!B1di>\u0014(\"A\u001b\u0002\t\u0005\\7.Y\u0005\u0003oI\u0012Q!Q2u_J\u0004\"!M\u001d\n\u0005i\u0012$\u0001D!di>\u0014Hj\\4hS:<\u0017aC6bM.\f7i\u001c8gS\u001e\u001c\u0001\u0001\u0005\u0002?\u007f5\tA$\u0003\u0002A9\tY1*\u00194lC\u000e{gNZ5h\u00039\u0019wN\\:v[\u0016\u00148i\u001c8gS\u001e\u0004\"AP\"\n\u0005\u0011c\"AD\"p]N,X.\u001a:D_:4\u0017nZ\u0001\u000eY>\u001c\u0017\r^3TKJ4\u0018nY3\u0011\t-:\u0015\nV\u0005\u0003\u00112\u0012\u0011BR;oGRLwN\\\u0019\u0011\u0005)\u000bfBA&P!\taE&D\u0001N\u0015\tqE(\u0001\u0004=e>|GOP\u0005\u0003!2\na\u0001\u0015:fI\u00164\u0017B\u0001*T\u0005\u0019\u0019FO]5oO*\u0011\u0001\u000b\f\t\u0004+bSV\"\u0001,\u000b\u0005]c\u0013AC2p]\u000e,(O]3oi&\u0011\u0011L\u0016\u0002\u0007\rV$XO]3\u0011\u0007m\u00037M\u0004\u0002]=:\u0011A*X\u0005\u0002[%\u0011q\fL\u0001\ba\u0006\u001c7.Y4f\u0013\t\t'MA\u0002TKFT!a\u0018\u0017\u0011\u0005\u0011LW\"A3\u000b\u0005\u0019<\u0017a\u00018fi*\t\u0001.\u0001\u0003kCZ\f\u0017B\u00016f\u0005\r)&+S\u0001\bi>\u0004\u0018nY%e\u0003\u00111Gn\\<1\u00079\fi\u0001E\u0004piZ\f\u0019!a\u0003\u000e\u0003AT!!\u001d:\u0002\u0011M\u001c\u0017\r\\1eg2T!a\u001d\u001b\u0002\rM$(/Z1n\u0013\t)\bO\u0001\u0003GY><\bCA<y\u0019\u0001!Q!\u001f\u0001C\u0002i\u0014\u0011cU;cg\u000e\u0014\u0018NY3s!\u0006LHn\\1e#\tYh\u0010\u0005\u0002,y&\u0011Q\u0010\f\u0002\b\u001d>$\b.\u001b8h!\tYs0C\u0002\u0002\u00021\u00121!\u00118z!\u0011\t)!a\u0002\u000e\u0003QJ1!!\u00035\u0005\u0011!uN\\3\u0011\u0007]\fi\u0001\u0002\u0006\u0002\u0010\u0015\t\t\u0011!A\u0003\u0002i\u00141a\u0018\u00132\u0003A\u0019wN\\:v[\u0016\u00148+\u001a;uS:<7\u000fE\u0004\u0002\u0016\u0005e\u0011*!\b\u000e\u0005\u0005]!BA\u000f5\u0013\u0011\tY\"a\u0006\u0003!\r{gn];nKJ\u001cV\r\u001e;j]\u001e\u001c\bcA<\u0002 \u00111\u0011\u0011\u0005\u0001C\u0002i\u0014q\u0001U1zY>\fG-\u0001\u0007tk\n\u001c8M]5qi&|g\u000e\u0005\u0003\u0002\u0016\u0005\u001d\u0012\u0002BA\u0015\u0003/\u0011\u0001#Q;u_N+(m]2sSB$\u0018n\u001c8\u0002\u001fM$(/Z1n\u0007>l\u0007\u000f\\3uK\u0012\u0004R!VA\u0018\u0003\u0007I1!!\rW\u0005\u001d\u0001&o\\7jg\u0016\f\u0011\u0002\u001e:b]N4wN]7\u0011\u000b-:\u0015q\u0007<\u0011\u000f\u0005e\u0012QJ%\u0002\u001e5\u0011\u00111\b\u0006\u0005\u0003{\ty$\u0001\u0005d_:\u001cX/\\3s\u0015\u0011\t\t%a\u0011\u0002\u000f\rd\u0017.\u001a8ug*\u0019Q$!\u0012\u000b\t\u0005\u001d\u0013\u0011J\u0001\u0007CB\f7\r[3\u000b\u0005\u0005-\u0013aA8sO&!\u0011qJA\u001e\u00059\u0019uN\\:v[\u0016\u0014(+Z2pe\u0012\f1!\\1u!\u0011\t)&a\u0016\u000e\u0003IL1!!\u0017s\u00051i\u0015\r^3sS\u0006d\u0017N_3s\u0003\t)7\rE\u0002V\u0003?J1!!\u0019W\u0005A)\u00050Z2vi&|gnQ8oi\u0016DH/\u0001\u0004=S:LGO\u0010\u000b\u0015\u0003O\ny'!\u001d\u0002t\u0005U\u0014qOAA\u0003\u0007\u000b))a\"\u0015\r\u0005%\u00141NA7!\u0015q\u0004!!\bw\u0011\u001d\t\t\u0006\u0004a\u0002\u0003'Bq!a\u0017\r\u0001\b\ti\u0006C\u0003<\u0019\u0001\u0007Q\bC\u0003B\u0019\u0001\u0007!\tC\u0003F\u0019\u0001\u0007a\tC\u0003l\u0019\u0001\u0007\u0011\n\u0003\u0004m\u0019\u0001\u0007\u0011\u0011\u0010\u0019\u0005\u0003w\ny\bE\u0004piZ\f\u0019!! \u0011\u0007]\fy\bB\u0006\u0002\u0010\u0005]\u0014\u0011!A\u0001\u0006\u0003Q\bbBA\t\u0019\u0001\u0007\u00111\u0003\u0005\b\u0003Ga\u0001\u0019AA\u0013\u0011\u001d\tY\u0003\u0004a\u0001\u0003[Aq!a\r\r\u0001\u0004\t)$\u0001\u0005tQV$Hm\\<o+\t\ti\tE\u0003,\u0003\u001f\u000b\u0019*C\u0002\u0002\u00122\u0012aa\u00149uS>t\u0007\u0003BA+\u0003+K1!a&s\u0005)Y\u0015\u000e\u001c7To&$8\r[\u0001\rg\",H\u000fZ8x]~#S-\u001d\u000b\u0005\u0003;\u000b\u0019\u000bE\u0002,\u0003?K1!!)-\u0005\u0011)f.\u001b;\t\u0013\u0005\u0015f\"!AA\u0002\u00055\u0015a\u0001=%c\u0005I1\u000f[;uI><h\u000eI\u0001\taJ,7\u000b^1siR\u0011\u0011QT\u0001\ta>\u001cHo\u0015;pa\u0006yAn\\2bi&twmU3sm&\u001cW\r\u0006\u0003\u00024\u0006m\u0006\u0003BA[\u0003ok\u0011\u0001A\u0005\u0004\u0003s3$a\u0002*fG\u0016Lg/\u001a\u0005\u0007\u0003{\u0013\u0002\u0019A%\u0002\t9\fW.Z\u0001\beVtg.\u001b8h+\t\t\u0019,A\u0004sK\u000e,\u0017N^3\u0016\u0005\u0005\u001d\u0007#B\u0016\u0002Jz\\\u0018bAAfY\ty\u0001+\u0019:uS\u0006dg)\u001e8di&|g.A\u0002sk:$B!!(\u0002R\"9\u00111[\u000bA\u0002\u0005U\u0017aA;sSB!1&a$J\u0003-\tG\u000fT3bgR|enY3\u0015\t\u0005m\u0017\u0011\u001e\u0019\u0005\u0003;\f)\u000fE\u0004p\u0003?\f\u0019!a9\n\u0007\u0005\u0005\bO\u0001\u0004T_V\u00148-\u001a\t\u0004o\u0006\u0015HACAt-\u0005\u0005\t\u0011!B\u0001u\n\u0019q\f\n\u001a\t\u000f\u0005-h\u00031\u0001\u0002V\u0006\u00112/\u001a:wS\u000e,Gj\\2bi>\u0014XK]5t\u0003QY\u0015MZ6b'V\u00147o\u0019:jE\u0016\u0014\u0018i\u0019;peB\u0011a\bG\n\u00031)\"\"!a<\u0002\u000bA\u0014x\u000e]:\u0016\r\u0005e(Q\u0005B\f)Q\tYPa\u0002\u0003\n\t-!Q\u0002B\b\u0005?\u00119C!\u000b\u0003,Q1\u0011Q B\u0002\u0005\u000b\u00012!MA��\u0013\r\u0011\tA\r\u0002\u0006!J|\u0007o\u001d\u0005\b\u0003#R\u00029AA*\u0011\u001d\tYF\u0007a\u0002\u0003;BQa\u000f\u000eA\u0002uBQ!\u0011\u000eA\u0002\tCQ!\u0012\u000eA\u0002\u0019CQa\u001b\u000eA\u0002%Ca\u0001\u001c\u000eA\u0002\tE\u0001\u0007\u0002B\n\u00057\u0001\u0002b\u001c;\u0003\u0016\u0005\r!\u0011\u0004\t\u0004o\n]A!B=\u001b\u0005\u0004Q\bcA<\u0003\u001c\u0011Y!Q\u0004B\b\u0003\u0003\u0005\tQ!\u0001{\u0005\ryFe\r\u0005\b\u0003#Q\u0002\u0019\u0001B\u0011!\u001d\t)\"!\u0007J\u0005G\u00012a\u001eB\u0013\t\u0019\t\tC\u0007b\u0001u\"9\u00111\u0005\u000eA\u0002\u0005\u0015\u0002bBA\u00165\u0001\u0007\u0011Q\u0006\u0005\b\u0003gQ\u0002\u0019\u0001B\u0017!\u0019YsIa\f\u0003\u0016A9\u0011\u0011HA'\u0013\n\r\u0002")
/* loaded from: input_file:com/lightbend/lagom/internal/broker/kafka/KafkaSubscriberActor.class */
public class KafkaSubscriberActor<Payload, SubscriberPayload> implements Actor, ActorLogging {
    private final KafkaConfig kafkaConfig;
    private final ConsumerConfig consumerConfig;
    private final Function1<String, Future<Seq<URI>>> locateService;
    public final String com$lightbend$lagom$internal$broker$kafka$KafkaSubscriberActor$$topicId;
    private final Flow<SubscriberPayload, Done, ?> flow;
    private final ConsumerSettings<String, Payload> consumerSettings;
    private final AutoSubscription subscription;
    public final Promise<Done> com$lightbend$lagom$internal$broker$kafka$KafkaSubscriberActor$$streamCompleted;
    private final Function1<ConsumerRecord<String, Payload>, SubscriberPayload> transform;
    private final Materializer mat;
    private final ExecutionContext ec;
    private Option<KillSwitch> shutdown;
    private LoggingAdapter akka$actor$ActorLogging$$_log;
    private final ActorContext context;
    private final ActorRef self;

    public static <Payload, SubscriberPayload> Props props(KafkaConfig kafkaConfig, ConsumerConfig consumerConfig, Function1<String, Future<Seq<URI>>> function1, String str, Flow<SubscriberPayload, Done, ?> flow, ConsumerSettings<String, Payload> consumerSettings, AutoSubscription autoSubscription, Promise<Done> promise, Function1<ConsumerRecord<String, Payload>, SubscriberPayload> function12, Materializer materializer, ExecutionContext executionContext) {
        return KafkaSubscriberActor$.MODULE$.props(kafkaConfig, consumerConfig, function1, str, flow, consumerSettings, autoSubscription, promise, function12, materializer, executionContext);
    }

    public LoggingAdapter log() {
        return ActorLogging.log$(this);
    }

    public final ActorRef sender() {
        return Actor.sender$(this);
    }

    @InternalApi
    public void aroundReceive(PartialFunction<Object, BoxedUnit> partialFunction, Object obj) {
        Actor.aroundReceive$(this, partialFunction, obj);
    }

    @InternalApi
    public void aroundPreStart() {
        Actor.aroundPreStart$(this);
    }

    @InternalApi
    public void aroundPostStop() {
        Actor.aroundPostStop$(this);
    }

    @InternalApi
    public void aroundPreRestart(Throwable th, Option<Object> option) {
        Actor.aroundPreRestart$(this, th, option);
    }

    @InternalApi
    public void aroundPostRestart(Throwable th) {
        Actor.aroundPostRestart$(this, th);
    }

    public SupervisorStrategy supervisorStrategy() {
        return Actor.supervisorStrategy$(this);
    }

    public void preRestart(Throwable th, Option<Object> option) throws Exception {
        Actor.preRestart$(this, th, option);
    }

    public void postRestart(Throwable th) throws Exception {
        Actor.postRestart$(this, th);
    }

    public void unhandled(Object obj) {
        Actor.unhandled$(this, obj);
    }

    public LoggingAdapter akka$actor$ActorLogging$$_log() {
        return this.akka$actor$ActorLogging$$_log;
    }

    public void akka$actor$ActorLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        this.akka$actor$ActorLogging$$_log = loggingAdapter;
    }

    public ActorContext context() {
        return this.context;
    }

    public final ActorRef self() {
        return this.self;
    }

    public void akka$actor$Actor$_setter_$context_$eq(ActorContext actorContext) {
        this.context = actorContext;
    }

    public final void akka$actor$Actor$_setter_$self_$eq(ActorRef actorRef) {
        this.self = actorRef;
    }

    private Option<KillSwitch> shutdown() {
        return this.shutdown;
    }

    private void shutdown_$eq(Option<KillSwitch> option) {
        this.shutdown = option;
    }

    public void preStart() {
        Some serviceName = this.kafkaConfig.serviceName();
        if (!(serviceName instanceof Some)) {
            if (!None$.MODULE$.equals(serviceName)) {
                throw new MatchError(serviceName);
            }
            com$lightbend$lagom$internal$broker$kafka$KafkaSubscriberActor$$run(None$.MODULE$);
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            return;
        }
        String str = (String) serviceName.value();
        log().debug("Looking up Kafka service from service locator with name [{}] for at least once source", str);
        package$.MODULE$.pipe(((Future) this.locateService.apply(str)).map(seq -> {
            return Nil$.MODULE$.equals(seq) ? None$.MODULE$ : new Some(UriUtils$.MODULE$.hostAndPorts(seq));
        }, this.ec), this.ec).pipeTo(self(), self());
        context().become(locatingService(str));
        BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
    }

    public void postStop() {
        shutdown().foreach(killSwitch -> {
            killSwitch.shutdown();
            return BoxedUnit.UNIT;
        });
    }

    private PartialFunction<Object, BoxedUnit> locatingService(String str) {
        return new KafkaSubscriberActor$$anonfun$locatingService$1(this, str);
    }

    private PartialFunction<Object, BoxedUnit> running() {
        return new KafkaSubscriberActor$$anonfun$running$1(this);
    }

    public PartialFunction<Object, Nothing$> receive() {
        return PartialFunction$.MODULE$.empty();
    }

    public void com$lightbend$lagom$internal$broker$kafka$KafkaSubscriberActor$$run(Option<String> option) {
        Tuple2 tuple2 = (Tuple2) atLeastOnce(option).viaMat(KillSwitches$.MODULE$.single(), Keep$.MODULE$.right()).toMat(Sink$.MODULE$.ignore(), Keep$.MODULE$.both()).run(this.mat);
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        Tuple2 tuple22 = new Tuple2((UniqueKillSwitch) tuple2._1(), (Future) tuple2._2());
        UniqueKillSwitch uniqueKillSwitch = (UniqueKillSwitch) tuple22._1();
        Future future = (Future) tuple22._2();
        shutdown_$eq(new Some(uniqueKillSwitch));
        package$.MODULE$.pipe(future, this.ec).pipeTo(self(), self());
        context().become(running());
    }

    private Source<Done, ?> atLeastOnce(Option<String> option) {
        ConsumerSettings<String, Payload> consumerSettings;
        if (option instanceof Some) {
            consumerSettings = this.consumerSettings.withBootstrapServers((String) ((Some) option).value());
        } else {
            if (!None$.MODULE$.equals(option)) {
                throw new MatchError(option);
            }
            consumerSettings = this.consumerSettings;
        }
        return Consumer$.MODULE$.committableSource(consumerSettings, this.subscription).map(committableMessage -> {
            return new Tuple2(committableMessage.committableOffset(), this.transform.apply(committableMessage.record()));
        }).via(Flow$.MODULE$.fromGraph(GraphDSL$.MODULE$.create(this.flow, builder -> {
            return flowShape -> {
                FanOutShape2 add = builder.add(Unzip$.MODULE$.apply());
                FanInShape2 add2 = builder.add(Zip$.MODULE$.apply());
                FlowShape add3 = builder.add(Flow$.MODULE$.apply().map(tuple2 -> {
                    return (ConsumerMessage.CommittableOffset) tuple2._1();
                }).via(Committer$.MODULE$.flow(this.consumerConfig.committerSettings())));
                GraphDSL$Implicits$.MODULE$.port2flow(add.out0(), builder).$tilde$greater(Flow$.MODULE$.apply().buffer(this.consumerConfig.offsetBuffer(), OverflowStrategy$.MODULE$.backpressure()), builder).$tilde$greater(add2.in0(), builder);
                GraphDSL$Implicits$.MODULE$.port2flow(add.out1(), builder).$tilde$greater(flowShape, builder).$tilde$greater(add2.in1(), builder);
                GraphDSL$Implicits$.MODULE$.port2flow(add2.out(), builder).$tilde$greater(add3.in(), builder);
                return new FlowShape(add.in(), add3.out());
            };
        })));
    }

    public KafkaSubscriberActor(KafkaConfig kafkaConfig, ConsumerConfig consumerConfig, Function1<String, Future<Seq<URI>>> function1, String str, Flow<SubscriberPayload, Done, ?> flow, ConsumerSettings<String, Payload> consumerSettings, AutoSubscription autoSubscription, Promise<Done> promise, Function1<ConsumerRecord<String, Payload>, SubscriberPayload> function12, Materializer materializer, ExecutionContext executionContext) {
        this.kafkaConfig = kafkaConfig;
        this.consumerConfig = consumerConfig;
        this.locateService = function1;
        this.com$lightbend$lagom$internal$broker$kafka$KafkaSubscriberActor$$topicId = str;
        this.flow = flow;
        this.consumerSettings = consumerSettings;
        this.subscription = autoSubscription;
        this.com$lightbend$lagom$internal$broker$kafka$KafkaSubscriberActor$$streamCompleted = promise;
        this.transform = function12;
        this.mat = materializer;
        this.ec = executionContext;
        Actor.$init$(this);
        ActorLogging.$init$(this);
        this.shutdown = None$.MODULE$;
    }
}
