package io.confluent.telemetry.events.exporter.kafka;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.message.Encoding;
import io.cloudevents.core.provider.EventFormatProvider;
import io.cloudevents.kafka.impl.KafkaSerializerMessageWriterImpl;
import io.confluent.telemetry.api.events.Event;
import io.confluent.telemetry.events.EventUtils;
import io.confluent.telemetry.events.Extensions;
import io.confluent.telemetry.events.exporter.kafka.async.AsyncKafkaExporter;
import io.confluent.telemetry.events.exporter.kafka.async.SingleTopicSupplier;
import io.confluent.telemetry.events.exporter.kafka.async.TopicSupplier;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/telemetry/events/exporter/kafka/EventAsyncKafkaExporter.class */
public class EventAsyncKafkaExporter extends AsyncKafkaExporter<Event> {
    private static final Logger log = LoggerFactory.getLogger(EventAsyncKafkaExporter.class);
    public static EventFormat format = EventFormatProvider.getInstance().resolveFormat("application/cloudevents+json");

    /* loaded from: input_file:io/confluent/telemetry/events/exporter/kafka/EventAsyncKafkaExporter$Builder.class */
    public static final class Builder {
        private Properties producerProperties;
        private int bufferSize = KafkaExporterConfig.DEFAULT_EVENT_QUEUE_SIZE;
        private TopicSupplier<Event> topicSupplier;
        private Function<Event, ProducerRecord<String, byte[]>> dataSerializer;
        private Predicate<Event> filter;

        public Builder withBufferSize(int i) {
            this.bufferSize = i;
            return this;
        }

        public Builder withProducerProperties(Properties properties) {
            this.producerProperties = properties;
            return this;
        }

        public Builder withTopicSupplier(TopicSupplier<Event> topicSupplier) {
            this.topicSupplier = topicSupplier;
            return this;
        }

        public Builder withDataSerializer(Function<Event, ProducerRecord<String, byte[]>> function) {
            this.dataSerializer = function;
            return this;
        }

        public Builder withFilter(Predicate<Event> predicate) {
            this.filter = predicate;
            return this;
        }

        public EventAsyncKafkaExporter build() {
            Objects.requireNonNull(this.producerProperties);
            return new EventAsyncKafkaExporter(this);
        }
    }

    protected EventAsyncKafkaExporter(Builder builder) {
        super(builder.producerProperties, builder.bufferSize, builder.topicSupplier, builder.dataSerializer, builder.filter);
    }

    public static Builder newBuilder(Map<String, Object> map) {
        Encoding encoding = (map.containsKey(KafkaExporterConfig.CLOUD_EVENT_ENCODING_CONFIG) && map.get(KafkaExporterConfig.CLOUD_EVENT_ENCODING_CONFIG).equals(Encoding.STRUCTURED.toString())) ? Encoding.STRUCTURED : Encoding.BINARY;
        KafkaExporterConfig kafkaExporterConfig = new KafkaExporterConfig(map);
        SingleTopicSupplier singleTopicSupplier = new SingleTopicSupplier(new NewTopic(kafkaExporterConfig.getTopicName(), kafkaExporterConfig.getTopicPartitions(), (short) kafkaExporterConfig.getTopicReplicas()).configs(kafkaExporterConfig.getTopicConfig()), kafkaExporterConfig.getProducerProperties());
        Encoding encoding2 = encoding;
        return new Builder().withProducerProperties(kafkaExporterConfig.getProducerProperties()).withBufferSize(kafkaExporterConfig.getInt(KafkaExporterConfig.EVENT_QUEUE_SIZE).intValue()).withTopicSupplier(singleTopicSupplier).withDataSerializer(event -> {
            String topicName = kafkaExporterConfig.getTopicName();
            String str = null;
            if (event.extensionNames().contains(Extensions.ROUTE)) {
                topicName = event.extension(Extensions.ROUTE);
            }
            if (event.extensionNames().contains(Extensions.PARTITION_KEY)) {
                str = event.extension(Extensions.PARTITION_KEY);
            }
            CloudEvent cloudEvent = EventUtils.toCloudEvent(event);
            RecordHeaders recordHeaders = new RecordHeaders();
            return new ProducerRecord(topicName, (Integer) null, (Long) null, str, encoding2 == Encoding.STRUCTURED ? (byte[]) new KafkaSerializerMessageWriterImpl(recordHeaders).writeStructured(cloudEvent, format) : (byte[]) new KafkaSerializerMessageWriterImpl(recordHeaders).writeBinary(cloudEvent), recordHeaders);
        }).withFilter(kafkaExporterConfig.isFilteringEnabled() ? new EventRouteFilter(kafkaExporterConfig.getAllowedRoutes()) : event2 -> {
            return true;
        });
    }
}
