package com.lightbend.lagom.internal.broker.kafka;

import akka.Done$;
import akka.actor.Status;
import akka.pattern.package$;
import akka.persistence.query.Offset;
import akka.stream.KillSwitches$;
import akka.stream.UniqueKillSwitch;
import akka.stream.scaladsl.FlowOpsMat;
import akka.stream.scaladsl.Keep$;
import akka.stream.scaladsl.RestartSource$;
import akka.stream.scaladsl.Sink$;
import akka.stream.scaladsl.Source;
import akka.stream.scaladsl.Source$;
import com.lightbend.lagom.internal.spi.projection.ProjectionSpi$;
import com.lightbend.lagom.spi.persistence.OffsetDao;
import scala.Function1;
import scala.MatchError;
import scala.Serializable;
import scala.Some;
import scala.Tuple2;
import scala.concurrent.Future;
import scala.runtime.AbstractPartialFunction;
import scala.runtime.BoxedUnit;

/* compiled from: TopicProducerActor.scala */
/* loaded from: input_file:com/lightbend/lagom/internal/broker/kafka/TopicProducerActor$$anonfun$receive$1.class */
/**
 * Partial function over actor messages, bound to an enclosing {@link TopicProducerActor}
 * instance held in {@code $outer}.
 *
 * <p>It is defined for exactly three kinds of message:
 * <ul>
 *   <li>the {@code TopicProducerActor.Start} singleton - builds and runs the producer stream;</li>
 *   <li>the {@code akka.Done} singleton - logs stream completion and stops the actor;</li>
 *   <li>any {@code akka.actor.Status.Failure} - rethrows the wrapped cause.</li>
 * </ul>
 * Any other message is handed to the fallback function supplied to {@link #applyOrElse}.
 *
 * <p>NOTE(review): this is decompiler output of Scala-generated bytecode; names such as
 * {@code tuple22} and the {@code com$lightbend$...$$} accessors are generated, not hand-written.
 */
public final class TopicProducerActor$$anonfun$receive$1 extends AbstractPartialFunction<Object, BoxedUnit> implements Serializable {
    public static final long serialVersionUID = 0;
    // Enclosing actor instance; every piece of state used below is read through it.
    private final /* synthetic */ TopicProducerActor $outer;

    /**
     * Handles {@code a1} if it is one of the supported messages, otherwise delegates to
     * {@code function1}.
     *
     * @param a1 the incoming message
     * @param function1 fallback applied to {@code a1} when no branch matches
     * @return {@code BoxedUnit.UNIT} for the Start and Done branches, otherwise the fallback's result
     */
    public final <A1, B1> B1 applyOrElse(A1 a1, Function1<A1, B1> function1) {
        Object apply;
        if (TopicProducerActor$Start$.MODULE$.equals(a1)) {
            // Start: build a source that is restarted with backoff (min/max/random factor taken
            // from producerConfig), attach a kill switch, drain it into Sink.ignore and run it.
            // The materialized value is the pair (kill switch, completion future of the sink).
            Tuple2 tuple2 = (Tuple2) RestartSource$.MODULE$.withBackoff(this.$outer.com$lightbend$lagom$internal$broker$kafka$TopicProducerActor$$producerConfig.minBackoff(), this.$outer.com$lightbend$lagom$internal$broker$kafka$TopicProducerActor$$producerConfig.maxBackoff(), this.$outer.com$lightbend$lagom$internal$broker$kafka$TopicProducerActor$$producerConfig.randomBackoffFactor(), () -> {
                // Source factory: wait for the (String, OffsetDao) pair produced by
                // eventualBrokersAndOffset(tagName), bounded by producerConfig.offsetTimeout(),
                // then switch to the event stream built from it.
                return Source$.MODULE$.future(this.$outer.com$lightbend$lagom$internal$broker$kafka$TopicProducerActor$$eventualBrokersAndOffset(this.$outer.tagName())).initialTimeout(this.$outer.com$lightbend$lagom$internal$broker$kafka$TopicProducerActor$$producerConfig.offsetTimeout()).flatMapConcat(tuple22 -> {
                    // Generated tuple-pattern guard: a null pair is a failed pattern match.
                    if (tuple22 == null) {
                        throw new MatchError(tuple22);
                    }
                    // First element is logged below as the located URIs and passed to
                    // eventsPublisherFlow; second element is the offset DAO.
                    String str = (String) tuple22._1();
                    OffsetDao offsetDao = (OffsetDao) tuple22._2();
                    // The optional service name is rendered as "[name]" or as "" when absent.
                    this.$outer.log().debug("Kafka service {} located at URIs [{}] for producer of [{}]", (String) this.$outer.com$lightbend$lagom$internal$broker$kafka$TopicProducerActor$$kafkaConfig.serviceName().map(str2 -> {
                        return new StringBuilder(2).append("[").append(str2).append("]").toString();
                    }).getOrElse(() -> {
                        return "";
                    }), str, this.$outer.com$lightbend$lagom$internal$broker$kafka$TopicProducerActor$$topicId);
                    // Event stream for this tag starting at the DAO's loaded offset. The
                    // termination future is passed through recoverWith using a separately
                    // generated partial function class (its body is not visible here).
                    Source watchTermination = ((FlowOpsMat) this.$outer.com$lightbend$lagom$internal$broker$kafka$TopicProducerActor$$eventStreamFactory.apply(this.$outer.tagName(), offsetDao.loadedOffset())).watchTermination((obj, future) -> {
                        return future.recoverWith(new TopicProducerActor$$anonfun$receive$1$$anonfun$$nestedInanonfun$applyOrElse$5$1(this, future), this.$outer.com$lightbend$lagom$internal$broker$kafka$TopicProducerActor$$ec);
                    });
                    // NOTE(review): the lambda parameter below reuses the name `tuple22` of the
                    // enclosing lambda's parameter (decompiler artifact); presumably this listing
                    // is not recompilable as-is - confirm before editing.
                    return watchTermination.map(tuple22 -> {
                        if (tuple22 == null) {
                            throw new MatchError(tuple22);
                        }
                        // Keep the first element; replace the offset (second element) with the
                        // result of ProjectionSpi.afterUserFlow for this worker's projection/tag.
                        return new Tuple2(tuple22._1(), ProjectionSpi$.MODULE$.afterUserFlow(this.$outer.com$lightbend$lagom$internal$broker$kafka$TopicProducerActor$$workerCoordinates.projectionName(), this.$outer.com$lightbend$lagom$internal$broker$kafka$TopicProducerActor$$workerCoordinates.tagName(), (Offset) tuple22._2()));
                    }).via(this.$outer.com$lightbend$lagom$internal$broker$kafka$TopicProducerActor$$eventsPublisherFlow(str, offsetDao)).map(future2 -> {
                        // Each element leaving the publisher flow is a future of an offset; once
                        // it completes, report the offset via ProjectionSpi.completedProcessing
                        // and pass it on unchanged.
                        return future2.map(offset -> {
                            ProjectionSpi$.MODULE$.completedProcessing(this.$outer.com$lightbend$lagom$internal$broker$kafka$TopicProducerActor$$workerCoordinates.projectionName(), this.$outer.com$lightbend$lagom$internal$broker$kafka$TopicProducerActor$$workerCoordinates.tagName(), offset);
                            return offset;
                        }, this.$outer.com$lightbend$lagom$internal$broker$kafka$TopicProducerActor$$ec);
                    });
                });
            }).viaMat(KillSwitches$.MODULE$.single(), Keep$.MODULE$.right()).toMat(Sink$.MODULE$.ignore(), Keep$.MODULE$.both()).run(this.$outer.com$lightbend$lagom$internal$broker$kafka$TopicProducerActor$$mat);
            // Generated tuple-pattern guard for the materialized (kill switch, future) pair.
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            Tuple2 tuple22 = new Tuple2((UniqueKillSwitch) tuple2._1(), (Future) tuple2._2());
            UniqueKillSwitch uniqueKillSwitch = (UniqueKillSwitch) tuple22._1();
            Future future = (Future) tuple22._2();
            // Store the kill switch in the actor's `shutdown` field, wrapped in Some.
            this.$outer.com$lightbend$lagom$internal$broker$kafka$TopicProducerActor$$shutdown_$eq(new Some(uniqueKillSwitch));
            // Pipe the stream's completion future to this actor, with itself as sender.
            // Presumably this is what produces the Done / Status.Failure messages handled below.
            package$.MODULE$.pipe(future, this.$outer.com$lightbend$lagom$internal$broker$kafka$TopicProducerActor$$ec).pipeTo(this.$outer.self(), this.$outer.self());
            apply = BoxedUnit.UNIT;
        } else if (Done$.MODULE$.equals(a1)) {
            // Done: log completion for this topic and stop the actor.
            this.$outer.log().info("Kafka producer stream for topic {} was completed.", this.$outer.com$lightbend$lagom$internal$broker$kafka$TopicProducerActor$$topicId);
            this.$outer.context().stop(this.$outer.self());
            apply = BoxedUnit.UNIT;
        } else {
            // Failure: rethrow the wrapped cause from inside the message handler.
            if (a1 instanceof Status.Failure) {
                throw ((Status.Failure) a1).cause();
            }
            // Not one of the handled messages: defer to the caller-supplied fallback.
            apply = function1.apply(a1);
        }
        return (B1) apply;
    }

    /**
     * Reports whether {@link #applyOrElse} has a dedicated branch for {@code obj}.
     *
     * @param obj the candidate message
     * @return {@code true} for the Start singleton, the Done singleton, or a {@code Status.Failure}
     */
    public final boolean isDefinedAt(Object obj) {
        return TopicProducerActor$Start$.MODULE$.equals(obj) ? true : Done$.MODULE$.equals(obj) ? true : obj instanceof Status.Failure;
    }

    /** Synthetic accessor returning the enclosing actor instance. */
    public /* synthetic */ TopicProducerActor com$lightbend$lagom$internal$broker$kafka$TopicProducerActor$$anonfun$$$outer() {
        return this.$outer;
    }

    /**
     * Binds this partial function to its enclosing actor.
     *
     * @param topicProducerActor the enclosing actor; must not be null
     */
    public TopicProducerActor$$anonfun$receive$1(TopicProducerActor topicProducerActor) {
        // `throw null` raises a NullPointerException at runtime.
        if (topicProducerActor == null) {
            throw null;
        }
        this.$outer = topicProducerActor;
    }
}
