package io.confluent.monitoring.clients.interceptor;

import io.confluent.shaded.monitoring.common.MonitoringMessageUtil;
import io.confluent.shaded.monitoring.common.TimeBucket;
import io.confluent.shaded.monitoring.record.Monitoring;
import io.confluent.shaded.serializers.ProtoSerde;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/monitoring/clients/interceptor/MonitoringInterceptor.class */
/**
 * Collects per-topic-partition message metrics and periodically publishes them as
 * {@link Monitoring.MonitoringMessage}s to a monitoring topic through a dedicated
 * {@link KafkaProducer}.
 *
 * <p>Lifecycle: construct (or use {@link #createForProducer} / {@link #createForConsumer}),
 * call {@link #start()} to launch the reporter thread, feed metrics through
 * {@link #recordMessageMetric}, and finally call {@link #shutdown()}.
 *
 * <p>Metrics are recorded by callers while the reporter thread publishes them; the bucket
 * registry is a {@link ConcurrentHashMap} and the state flags are {@code volatile}.
 */
public class MonitoringInterceptor implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(MonitoringInterceptor.class);

    /** Sample period handed to every {@link MonitoringTimeBuckets}. */
    private static final long SAMPLE_PERIOD_MS = TimeBucket.SIZE;

    /** Window history size handed to every {@link MonitoringTimeBuckets}. */
    private static final int WINDOW_HISTORY_SIZE = 10;

    /** How long {@link #shutdown()} waits for the reporter thread to finish, in milliseconds. */
    private static final long REPORTER_JOIN_TIMEOUT_MS = 2000L;

    private final long publishPeriodMs;
    private final Monitoring.MonitoringMessage baseMonitoringMessage;
    private final MonitoringInterceptorConfig interceptorConfig;
    private final String publishTopic;
    private final ProtoSerde<Monitoring.MonitoringMessage> monitoringMessageSerde;
    private final ConcurrentHashMap<TopicPartition, MonitoringTimeBuckets> buckets;
    private final Thread reporterThread;
    private final KafkaProducer<byte[], byte[]> monitoringProducer;

    /** True while metrics may be recorded; lowered by the reporter thread when it terminates. */
    private volatile boolean publishing;

    /** True once {@link #shutdown()} has completed. */
    private volatile boolean closed = false;

    /**
     * Builds the interceptor, its base monitoring message and its monitoring producer.
     * The reporter thread is created here but only started by {@link #start()}.
     *
     * @param clientType client type stamped on every published message
     * @param str cluster id stamped on every published message
     * @param str2 group stamped on every published message
     * @param map client configuration; must contain a {@code "client.id"} entry
     * @throws NullPointerException if {@code map} has no {@code "client.id"} entry
     */
    public MonitoringInterceptor(Monitoring.ClientType clientType, String str, String str2, Map<String, ?> map) {
        this.interceptorConfig = MonitoringInterceptorConfig.getConfig(map);
        this.publishPeriodMs = this.interceptorConfig.getLong(MonitoringInterceptorConfig.PUBLISH_PERIOD).longValue();
        this.publishTopic = this.interceptorConfig.getString(MonitoringInterceptorConfig.TOPIC_CONFIG);
        Properties producerProperties = MonitoringInterceptorConfig.getProducerProperties(map);
        this.baseMonitoringMessage = Monitoring.MonitoringMessage.newBuilder(MonitoringMessageUtil.baseMonitoringMessage())
                .setSession(UUID.randomUUID().toString())
                .setClusterId(str)
                .setGroup(str2)
                .setClientId(map.get("client.id").toString())
                .setClientType(clientType)
                .setWindow(-1L)
                .buildPartial();
        this.buckets = new ConcurrentHashMap<>();
        String property = producerProperties.getProperty("client.id");
        this.reporterThread = new Thread(this, "confluent-metrics-reporter-" + property);
        this.publishing = false;
        // Create the serde before the producer so that a failure here cannot leave an
        // already-opened producer behind with nobody to close it.
        this.monitoringMessageSerde = new ProtoSerde<>(Monitoring.MonitoringMessage.getDefaultInstance());
        log.info("creating producer for client={} {}", property, producerProperties);
        this.monitoringProducer = new KafkaProducer<>(producerProperties);
    }

    /**
     * Creates an interceptor of client type {@code PRODUCER} with an empty group.
     *
     * @param str cluster id
     * @param map client configuration
     * @return a new, not yet started interceptor
     */
    public static MonitoringInterceptor createForProducer(String str, Map<String, ?> map) {
        return new MonitoringInterceptor(Monitoring.ClientType.PRODUCER, str, "", map);
    }

    /**
     * Creates an interceptor of client type {@code CONSUMER}.
     *
     * @param str cluster id
     * @param str2 group
     * @param map client configuration
     * @return a new, not yet started interceptor
     */
    public static MonitoringInterceptor createForConsumer(String str, String str2, Map<String, ?> map) {
        return new MonitoringInterceptor(Monitoring.ClientType.CONSUMER, str, str2, map);
    }

    /**
     * Starts the reporter thread and enables metric recording.
     *
     * @throws IllegalStateException if the interceptor was already shut down
     * @throws IllegalThreadStateException if the reporter thread was already started
     */
    public void start() {
        if (this.closed) {
            throw new IllegalStateException("Monitoring Interceptor was already shut down");
        }
        // Raise the flag BEFORE starting the thread: run() lowers it when it terminates, and
        // writing it afterwards could overwrite that and leave a dead reporter marked as publishing.
        final boolean wasPublishing = this.publishing;
        this.publishing = true;
        try {
            this.reporterThread.start();
        } catch (RuntimeException | Error e) {
            // The thread did not start; do not leave the flag raised by this call.
            this.publishing = wasPublishing;
            throw e;
        }
    }

    /**
     * Records one message metric into the time buckets of the given topic partition,
     * creating the buckets on first use. Failures while recording are logged, not thrown.
     *
     * <p>The last four arguments are forwarded unchanged, in this order, to
     * {@code MonitoringTimeBuckets.record}; their meaning is defined there.
     *
     * @param str topic name
     * @param i partition number
     * @param j first argument passed to {@code MonitoringTimeBuckets.record}
     * @param i2 second argument passed to {@code MonitoringTimeBuckets.record}
     * @param j2 third argument passed to {@code MonitoringTimeBuckets.record}
     * @param j3 fourth argument passed to {@code MonitoringTimeBuckets.record}
     * @throws IllegalStateException if the interceptor is not currently publishing
     */
    public void recordMessageMetric(String str, int i, long j, int i2, long j2, long j3) {
        if (!this.publishing) {
            throw new IllegalStateException("Publishing thread must be running to enable recording of metrics");
        }
        try {
            TopicPartition topicPartition = new TopicPartition(str, i);
            // Plain get() first: after the first message of a partition this is the only lookup.
            MonitoringTimeBuckets timeBuckets = this.buckets.get(topicPartition);
            if (timeBuckets == null) {
                MonitoringTimeBuckets fresh = new MonitoringTimeBuckets(SAMPLE_PERIOD_MS, WINDOW_HISTORY_SIZE);
                // putIfAbsent returns the instance another thread registered first, if any.
                MonitoringTimeBuckets registered = this.buckets.putIfAbsent(topicPartition, fresh);
                timeBuckets = registered != null ? registered : fresh;
            }
            timeBuckets.record(j, i2, j2, j3);
        } catch (Exception e) {
            log.error("Monitoring Interceptor failed to record message metrics for topic: " + str + ", partition: " + i, e);
        }
    }

    /**
     * Reporter loop: sleeps for the publish period, publishes the pending metrics, and repeats
     * until the thread is interrupted or publishing throws. On the way out it always disables
     * recording, attempts one final publish flagged as shutdown, and logs that publishing stopped.
     */
    @Override // java.lang.Runnable
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                Thread.sleep(this.publishPeriodMs);
                publishMetrics();
            }
        } catch (InterruptedException e) {
            log.info("Publish thread interrupted!");
        } catch (Exception e) {
            log.error("Terminating publishing and collecting monitoring metrics", e);
        } finally {
            this.publishing = false;
            try {
                publishMetrics(true);
            } catch (Exception e) {
                log.error("Failed to publish all cached metrics on termination", e);
            }
            log.info("Publishing Monitoring Metrics stopped for clientID={}", this.baseMonitoringMessage.getClientId());
        }
    }

    /**
     * Stops the reporter thread (waiting up to two seconds for it to finish), closes the
     * monitoring producer and marks the interceptor closed. Does nothing if already closed.
     *
     * <p>If the calling thread is interrupted while waiting, the failure is logged, the producer
     * is still closed, and the caller's interrupt status is restored before returning.
     */
    public void shutdown() {
        if (this.closed) {
            return;
        }
        boolean callerInterrupted = false;
        try {
            this.reporterThread.interrupt();
            this.reporterThread.join(REPORTER_JOIN_TIMEOUT_MS);
        } catch (InterruptedException e) {
            callerInterrupted = true;
            log.error("Failed to shutdown metrics reporting thread", e);
        } catch (Exception e) {
            log.error("Failed to shutdown metrics reporting thread", e);
        } finally {
            try {
                this.monitoringProducer.close();
            } catch (Exception e) {
                log.error("Failed to close monitoring interceptor", e);
            }
            this.closed = true;
            log.info("Closed monitoring interceptor for client ID={}", this.baseMonitoringMessage.getClientId());
            if (callerInterrupted) {
                // Restore the status only after the producer has been closed, so the close
                // itself runs without a pending interrupt on this thread.
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Publishes the pending metrics of every known topic partition (not a shutdown publish). */
    private void publishMetrics() {
        publishMetrics(false);
    }

    /**
     * Publishes the pending metrics of every known topic partition, stamped with the current
     * wall-clock time. A partition that yields no metrics gets the message produced by
     * {@code getEmptyMetrics} instead.
     *
     * @param z when true, the last message published for each partition is flagged as shutdown
     */
    private void publishMetrics(boolean z) {
        long now = System.currentTimeMillis();
        for (Map.Entry<TopicPartition, MonitoringTimeBuckets> entry : this.buckets.entrySet()) {
            TopicPartition topicPartition = entry.getKey();
            MonitoringTimeBuckets timeBuckets = entry.getValue();

            Monitoring.MonitoringMessage.Builder current = timeBuckets.getNextMetrics(this.baseMonitoringMessage, now);
            if (current == null) {
                Monitoring.MonitoringMessage.Builder empty = timeBuckets.getEmptyMetrics(this.baseMonitoringMessage, now);
                empty.setTopic(topicPartition.topic());
                empty.setPartition(topicPartition.partition());
                if (z) {
                    empty.setShutdown(true);
                }
                publishMonitoringMessage(empty.build());
                continue;
            }

            while (current != null) {
                current.setTopic(topicPartition.topic());
                current.setPartition(topicPartition.partition());
                // Look one message ahead so the shutdown flag lands only on the final one.
                Monitoring.MonitoringMessage.Builder next = timeBuckets.getNextMetrics(this.baseMonitoringMessage, now);
                if (z && next == null) {
                    current.setShutdown(true);
                }
                publishMonitoringMessage(current.build());
                current = next;
            }
        }
    }

    /**
     * Serializes one monitoring message and sends it to the monitoring topic, on the partition
     * chosen by {@code MonitoringMessageUtil.partitionFor} for the topic's current partition
     * count. Best effort: any failure is logged at warn level and not propagated.
     *
     * @param monitoringMessage the message to publish
     */
    protected void publishMonitoringMessage(Monitoring.MonitoringMessage monitoringMessage) {
        log.debug("Publish {}", monitoringMessage);
        try {
            int partitionCount = this.monitoringProducer.partitionsFor(this.publishTopic).size();
            Integer partition = Integer.valueOf(MonitoringMessageUtil.partitionFor(monitoringMessage, partitionCount));
            byte[] payload = this.monitoringMessageSerde.serialize(monitoringMessage);
            // Messages are sent without a key; the partition is set explicitly above.
            ProducerRecord<byte[], byte[]> record =
                    new ProducerRecord<byte[], byte[]>(this.publishTopic, partition, (byte[]) null, payload);
            this.monitoringProducer.send(record);
        } catch (Throwable th) {
            log.warn("failed to publish monitoring message", th);
        }
    }
}
