package io.smallrye.reactive.messaging.kafka;

import io.reactivex.Flowable;
import io.smallrye.reactive.messaging.spi.ConfigurationHelper;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.kafka.client.consumer.KafkaConsumer;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.reactivestreams.Publisher;

/* loaded from: input_file:io/smallrye/reactive/messaging/kafka/KafkaSource.class */
public class KafkaSource {
    private final Flowable<KafkaMessage> source;

    public <K, T> KafkaSource(Vertx vertx, Map<String, String> map) {
        ConfigurationHelper create = ConfigurationHelper.create(map);
        KafkaConsumer subscribe = KafkaConsumer.create(vertx, map).subscribe(create.getOrDie("topic"));
        Flowable flowable = subscribe.toFlowable();
        flowable = create.getAsBoolean("retry", true) ? flowable.retryWhen(flowable2 -> {
            return flowable2.zipWith(Flowable.range(1, create.getAsInteger("retry-attempts", 5)), (th, num) -> {
                return num;
            }).flatMap(num2 -> {
                return Flowable.timer(num2.intValue(), TimeUnit.SECONDS);
            });
        }) : flowable;
        this.source = (create.getAsBoolean("broadcast", false) ? flowable.publish().autoConnect() : flowable).map(kafkaConsumerRecord -> {
            return new ReceivedKafkaMessage(subscribe, kafkaConsumerRecord);
        });
    }

    public static CompletionStage<Publisher<? extends Message>> create(Vertx vertx, Map<String, String> map) {
        return CompletableFuture.completedFuture(new KafkaSource(vertx, map).source);
    }

    public Flowable<KafkaMessage> getSource() {
        return this.source;
    }
}
