package io.smallrye.reactive.messaging.kafka.impl;

import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.kafka.client.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.HashMap;
import java.util.Optional;
import java.util.UUID;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/smallrye/reactive/messaging/kafka/impl/KafkaSource.class */
/**
 * Wraps a Vert.x Mutiny {@link KafkaConsumer} and exposes the records it reads as a
 * reactive-streams {@link PublisherBuilder} of {@link Message}s.
 *
 * <p>The consumer is created eagerly in the constructor, but the subscription to the
 * Kafka topic only happens when the resulting stream is subscribed to.
 *
 * @param <K> type parameter forwarded as the first type argument of {@link KafkaConsumer}
 * @param <V> type parameter forwarded as the second type argument of {@link KafkaConsumer}
 */
public class KafkaSource<K, V> {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSource.class);
    private final PublisherBuilder<? extends Message<?>> source;
    private final KafkaConsumer<K, V> consumer;

    /**
     * Creates the Kafka consumer from the channel configuration and builds the stream of
     * incoming records.
     *
     * <p>The topic is the configured {@code topic} or, when absent, the channel name.
     * Read failures are logged; when retry is enabled the stream is re-subscribed with a
     * back-off, and when broadcast is enabled the stream is shared between all subscribers.
     *
     * @param vertx the Vert.x instance used to create the consumer
     * @param kafkaConnectorIncomingConfiguration the configuration of the incoming channel
     */
    public KafkaSource(Vertx vertx, KafkaConnectorIncomingConfiguration kafkaConnectorIncomingConfiguration) {
        this.consumer = KafkaConsumer.create(vertx, buildConsumerConfiguration(kafkaConnectorIncomingConfiguration));

        final String topic = kafkaConnectorIncomingConfiguration.getTopic()
                .orElseGet(kafkaConnectorIncomingConfiguration::getChannel);

        this.source = ReactiveStreams.fromPublisher(
                applyBroadcast(
                        applyRetry(
                                this.consumer.toMulti().onFailure().invoke(failure ->
                                        LOGGER.error("Unable to read a record from Kafka topic '{}'", topic, failure)),
                                kafkaConnectorIncomingConfiguration),
                        kafkaConnectorIncomingConfiguration)
                        // The Kafka subscription is deferred until the stream itself is subscribed to.
                        .on().subscribed(subscription -> this.consumer.subscribeAndAwait(topic)))
                .map(record -> new IncomingKafkaRecord<>(this.consumer, record));
    }

    /**
     * Builds the property map handed to the Kafka consumer.
     *
     * <p>Every entry of the channel configuration is copied (values converted with
     * {@code toString()}), then {@code group.id} is forced, {@code bootstrap.servers} and
     * {@code key.deserializer} are defaulted when missing, and the connector-only
     * attributes are stripped.
     */
    private static HashMap<String, String> buildConsumerConfiguration(KafkaConnectorIncomingConfiguration config) {
        final HashMap<String, String> properties = new HashMap<>();

        final String groupId = config.getGroupId().orElseGet(() -> {
            String generated = UUID.randomUUID().toString();
            LOGGER.warn("No `group.id` set in the configuration, generate a random id: {}", generated);
            return generated;
        });

        // Copy the user-supplied configuration; without this, none of the channel
        // properties (value deserializer, custom servers, ...) would reach the consumer.
        JsonHelper.asJsonObject(config.config()).forEach(entry -> {
            // Null values carry no usable setting, so they are skipped.
            if (entry.getValue() != null) {
                properties.put(entry.getKey(), entry.getValue().toString());
            }
        });
        properties.put("group.id", groupId);

        if (!properties.containsKey("bootstrap.servers")) {
            final String bootstrapServers = config.getBootstrapServers();
            LOGGER.info("Setting {} to {}", "bootstrap.servers", bootstrapServers);
            properties.put("bootstrap.servers", bootstrapServers);
        }
        if (!properties.containsKey("key.deserializer")) {
            LOGGER.info("Key deserializer omitted, using String as default");
            properties.put("key.deserializer", config.getKeyDeserializer());
        }

        // These attributes configure the connector itself and are removed before the
        // map is handed to the Kafka consumer.
        properties.remove("channel-name");
        properties.remove("topic");
        properties.remove("connector");
        properties.remove("retry");
        properties.remove("retry-attempts");
        properties.remove("broadcast");
        return properties;
    }

    /**
     * Decorates the stream with a retry-on-failure policy when retry is enabled.
     *
     * <p>A {@code retry-attempts} value of {@code -1} means "retry without a practical
     * limit" and is mapped to {@link Long#MAX_VALUE} attempts. The decorated stream is
     * returned because {@link Multi} operators produce a new stream rather than
     * modifying the receiver.
     */
    private static <T> Multi<T> applyRetry(Multi<T> stream, KafkaConnectorIncomingConfiguration config) {
        if (!config.getRetry().booleanValue()) {
            return stream;
        }
        final int attempts = config.getRetryAttempts().intValue();
        final int maxWaitSeconds = config.getRetryMaxWait().intValue();
        final long limit = attempts == -1 ? Long.MAX_VALUE : attempts;
        return stream.onFailure().retry()
                .withBackOff(Duration.ofSeconds(1L), Duration.ofSeconds(maxWaitSeconds))
                .atMost(limit);
    }

    /** Shares the stream between all subscribers when broadcast is enabled. */
    private static <T> Multi<T> applyBroadcast(Multi<T> stream, KafkaConnectorIncomingConfiguration config) {
        return config.getBroadcast().booleanValue() ? stream.broadcast().toAllSubscribers() : stream;
    }

    /**
     * Returns the stream of messages built in the constructor.
     *
     * @return the publisher builder emitting one message per consumed Kafka record
     */
    public PublisherBuilder<? extends Message<?>> getSource() {
        return this.source;
    }

    /**
     * Closes the underlying Kafka consumer, waiting for the close to complete.
     *
     * <p>This is a best-effort shutdown: any failure is logged at debug level and not
     * propagated to the caller.
     */
    public void closeQuietly() {
        try {
            this.consumer.closeAndAwait();
        } catch (Throwable th) {
            LOGGER.debug("An exception has been caught while closing the Kafka consumer", th);
        }
    }
}
