package io.confluent.auditlog.emitter.transport;

import io.confluent.auditlog.emitter.errormappers.KafkaTransportException;
import io.confluent.auditlog.emitter.telemetry.NoOpTelemetry;
import io.confluent.auditlog.emitter.telemetry.Telemetry;
import io.confluent.shaded.io.confluent.telemetry.JavaRuntimeResourceLabelProvider;
import io.confluent.telemetry.api.events.Event;
import io.confluent.telemetry.events.exporter.kafka.EventAsyncKafkaExporter;
import java.util.HashMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.KafkaException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.zookeeper.server.persistence.FileTxnLog;

/* loaded from: input_file:io/confluent/auditlog/emitter/transport/KafkaTransport.class */
/**
 * {@link Transport} implementation that ships audit events through an
 * {@link EventAsyncKafkaExporter} and records the outcome of every operation
 * on the configured {@link Telemetry} counters.
 *
 * <p>Until {@link #init(Telemetry, String)} is called the transport reports to a
 * {@link NoOpTelemetry} instance and the service name label is {@code null}.
 */
public class KafkaTransport implements Transport {
    private static final Logger log = LogManager.getLogger(KafkaTransport.class);

    /** Replication factor applied when the options supply a non-positive value. */
    private static final int DEFAULT_TOPIC_REPLICATION_FACTOR = 3;

    /** Partition count applied when the options supply a non-positive value. */
    private static final int DEFAULT_TOPIC_NUM_PARTITIONS = 12;

    /**
     * Value of the "error_in"/"success_in" labels for the {@link #log(Event)} operation
     * (the {@link #close()} operation uses "close").
     * NOTE(review): this previously referenced ZooKeeper's FileTxnLog.LOG_FILE_PREFIX, whose value
     * is "log" - presumably a decompiler constant-matching artifact; the emitted label is unchanged.
     */
    private static final String LOG_OPERATION = "log";

    private final EventAsyncKafkaExporter eventAsyncKafkaExporter;
    private Telemetry telemetry;
    private String serviceName;

    /**
     * Builds the underlying Kafka exporter from the supplied options.
     *
     * <p>The exporter configuration is a copy of {@code getConfig()} with "topic.name",
     * "topic.partitions" and "topic.replicas" added, plus "event.queue.size" when a positive
     * transport buffer size is supplied.
     *
     * @param kafkaTransportOptions exporter settings; must be non-null, carry a non-null config
     *                              and a non-blank topic name
     * @throws KafkaTransportException if the options, their config or the topic name are missing
     */
    public KafkaTransport(KafkaTransportOptions kafkaTransportOptions) {
        if (kafkaTransportOptions == null) {
            throw new KafkaTransportException("must provide non-nil params");
        }
        if (kafkaTransportOptions.getConfig() == null) {
            throw new KafkaTransportException("must provide a non-nil config");
        }
        if (StringUtils.isBlank(kafkaTransportOptions.getTopicName())) {
            throw new KafkaTransportException("must provide a topic name");
        }
        this.telemetry = new NoOpTelemetry();

        int topicReplicationFactor = kafkaTransportOptions.getTopicReplicationFactor();
        if (topicReplicationFactor <= 0) {
            topicReplicationFactor = DEFAULT_TOPIC_REPLICATION_FACTOR;
        }
        int topicNumPartitions = kafkaTransportOptions.getTopicNumPartitions();
        if (topicNumPartitions <= 0) {
            topicNumPartitions = DEFAULT_TOPIC_NUM_PARTITIONS;
        }

        // Copy the caller's config so the additions below never mutate it.
        // Kept as a raw HashMap because the declared type of getConfig() is not visible here.
        HashMap exporterConfig = new HashMap(kafkaTransportOptions.getConfig());
        exporterConfig.put("topic.name", kafkaTransportOptions.getTopicName());
        exporterConfig.put("topic.partitions", Integer.valueOf(topicNumPartitions));
        exporterConfig.put("topic.replicas", Integer.valueOf(topicReplicationFactor));
        if (kafkaTransportOptions.getTransportBufferSize() > 0) {
            exporterConfig.put("event.queue.size", Integer.valueOf(kafkaTransportOptions.getTransportBufferSize()));
        }
        this.eventAsyncKafkaExporter = EventAsyncKafkaExporter.newBuilder(exporterConfig).build();
        log.info("Created a kafka producer to transport audit events.");
    }

    /**
     * Sets the telemetry sink and the service name used as the "service_name" counter label.
     *
     * @param telemetry telemetry sink; when {@code null} a {@link NoOpTelemetry} is used so that
     *                  later {@link #log(Event)} and {@link #close()} calls do not fail
     * @param str       service name reported with every counter increment
     */
    @Override // io.confluent.auditlog.emitter.transport.Transport
    public void init(Telemetry telemetry, String str) {
        this.telemetry = telemetry != null ? telemetry : new NoOpTelemetry();
        this.serviceName = str;
    }

    /**
     * Hands the event to the Kafka exporter and records the outcome asynchronously.
     *
     * <p>Exactly one outcome counter is incremented per completed emit: an error counter tagged
     * "kafka_exception" or "unknown_error" when the emit future completes exceptionally, the
     * success counter when it completes with {@code true}, and an error counter tagged
     * "incomplete_future" otherwise.
     *
     * @param event audit event to emit
     */
    @Override // io.confluent.auditlog.emitter.transport.Transport
    public void log(Event event) {
        if (log.isDebugEnabled()) {
            log.debug(String.format("[log]: Logging an audit event [id = %s]", event.id()));
        }
        this.telemetry.transportLogCallsCounter.increment("service_name", this.serviceName, "lib", JavaRuntimeResourceLabelProvider.NAMESPACE);
        this.eventAsyncKafkaExporter.emit(event).whenCompleteAsync((succeeded, failure) -> {
            if (failure != null) {
                if (failure instanceof KafkaException) {
                    log.warn(String.format("[log]: Encountered KafkaException while logging audit event [id = %s]", event.id()), failure);
                    this.telemetry.transportErrorCounter.increment("error_in", LOG_OPERATION, "error_type", "kafka_exception", "service_name", this.serviceName, "lib", JavaRuntimeResourceLabelProvider.NAMESPACE);
                } else {
                    log.error(String.format("[log]: Encountered unknown error while logging audit event [id = %s]", event.id()), failure);
                    this.telemetry.transportErrorCounter.increment("error_in", LOG_OPERATION, "error_type", "unknown_error", "service_name", this.serviceName, "lib", JavaRuntimeResourceLabelProvider.NAMESPACE);
                }
                // The result is null when the future completed exceptionally; stop here so the
                // failure is counted once and the null result is never unboxed.
                return;
            }
            // Boolean.TRUE.equals tolerates a null result, which is treated as "not logged".
            if (Boolean.TRUE.equals(succeeded)) {
                if (log.isDebugEnabled()) {
                    log.debug(String.format("[log]: Successfully logged audit event [id = %s]", event.id()));
                }
                this.telemetry.transportSuccessCounter.increment("success_in", LOG_OPERATION, "service_name", this.serviceName, "lib", JavaRuntimeResourceLabelProvider.NAMESPACE);
            } else {
                log.warn(String.format("[log]: Could not log audit event [id = %s]", event.id()));
                this.telemetry.transportErrorCounter.increment("error_in", LOG_OPERATION, "error_type", "incomplete_future", "service_name", this.serviceName, "lib", JavaRuntimeResourceLabelProvider.NAMESPACE);
            }
        });
    }

    /**
     * Closes the Kafka exporter, recording success or failure on the telemetry counters.
     * Any exception raised by the exporter is counted, logged with its stack trace and rethrown.
     */
    @Override // io.confluent.auditlog.emitter.transport.Transport
    public void close() {
        try {
            this.eventAsyncKafkaExporter.close();
            log.debug("[close]: Successfully closed Kafka exporter");
            this.telemetry.transportSuccessCounter.increment("success_in", "close", "service_name", this.serviceName, "lib", JavaRuntimeResourceLabelProvider.NAMESPACE);
        } catch (Exception e) {
            this.telemetry.transportErrorCounter.increment("error_in", "close", "service_name", this.serviceName, "lib", JavaRuntimeResourceLabelProvider.NAMESPACE);
            // Pass the exception itself so the stack trace is preserved in the log.
            log.error("[close]: Error while closing KafkaExporter - " + e.getMessage(), e);
            throw e;
        }
    }
}
