package io.confluent.monitoring.clients.interceptor;

import io.confluent.shaded.monitoring.common.Clock;
import io.confluent.shaded.monitoring.common.MonitoringMessageUtil;
import io.confluent.shaded.monitoring.common.SystemClock;
import io.confluent.shaded.monitoring.common.TimeBucket;
import io.confluent.shaded.monitoring.record.Monitoring;
import io.confluent.shaded.serializers.ProtoSerde;
import io.confluent.shaded.serializers.UberSerde;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/monitoring/clients/interceptor/MonitoringInterceptor.class */
public class MonitoringInterceptor implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(MonitoringInterceptor.class);
    private static final int WARNING_THROTTLE_MS = 250;
    private final long samplePeriodMs;
    private final long publishPeriodMs;
    private final int windowHistorySize;
    private final Monitoring.MonitoringMessage baseMonitoringMessage;
    private String clientInfo;
    private final String publishTopic;
    private final UberSerde<Monitoring.MonitoringMessage> monitoringMessageSerde;
    private final ConcurrentHashMap<TopicPartition, MonitoringTimeBuckets> buckets;
    final ConcurrentHashMap<String, AtomicInteger> invalid;
    private final Thread reporterThread;
    private final KafkaProducer<byte[], byte[]> monitoringProducer;
    protected volatile boolean publishing;
    private String session;
    private long sessionExpireTime;
    private final Clock clock;
    private final AtomicLong lastSkippedWarningMs = new AtomicLong(0);
    private volatile boolean closed = false;

    protected static MonitoringInterceptor create(Monitoring.ClientType clientType, String str, String str2, Map<String, ?> map, Clock clock) {
        Map<String, Object> producerProperties = MonitoringInterceptorConfig.getProducerProperties(map);
        MonitoringInterceptorConfig config = MonitoringInterceptorConfig.getConfig(map);
        return new MonitoringInterceptor(getBaseMonitoringMessage(clientType, str, str2, map.get("client.id").toString()), new KafkaProducer(producerProperties), new ProtoSerde(Monitoring.MonitoringMessage.getDefaultInstance()), config.getString(MonitoringInterceptorConfig.TOPIC_CONFIG), TimeBucket.SIZE, config.getLong(MonitoringInterceptorConfig.PUBLISH_PERIOD).longValue(), 10, producerProperties.get("client.id").toString(), clock);
    }

    static Monitoring.MonitoringMessage getBaseMonitoringMessage(Monitoring.ClientType clientType, String str, String str2, String str3) {
        Monitoring.MonitoringMessage.Builder window = Monitoring.MonitoringMessage.newBuilder(MonitoringMessageUtil.baseMonitoringMessage()).setGroup(str2).setClientId(str3).setClientType(clientType).setWindow(-1L);
        if (str != null) {
            window.setClusterId(str);
        }
        return window.buildPartial();
    }

    MonitoringInterceptor(Monitoring.MonitoringMessage monitoringMessage, KafkaProducer<byte[], byte[]> kafkaProducer, UberSerde<Monitoring.MonitoringMessage> uberSerde, String str, long j, long j2, int i, String str2, Clock clock) {
        this.baseMonitoringMessage = monitoringMessage;
        this.monitoringProducer = kafkaProducer;
        this.monitoringMessageSerde = uberSerde;
        this.publishTopic = str;
        this.samplePeriodMs = j;
        this.publishPeriodMs = j2;
        this.windowHistorySize = i;
        this.clientInfo = String.format("client_id=%s client_type=%s session=%s cluster=%s", monitoringMessage.getClientId(), monitoringMessage.getClientType(), monitoringMessage.getSession(), monitoringMessage.getClusterId());
        if (monitoringMessage.getClientType() == Monitoring.ClientType.CONSUMER) {
            this.clientInfo += " group=" + monitoringMessage.getGroup();
        }
        this.publishing = false;
        this.buckets = new ConcurrentHashMap<>();
        this.invalid = new ConcurrentHashMap<>();
        this.reporterThread = new Thread(this, "confluent-metrics-reporter-" + str2);
        this.clock = clock;
        log.info("interceptor={} created for {}", str2, this.clientInfo);
    }

    public static MonitoringInterceptor createForProducer(String str, Map<String, ?> map) {
        return create(Monitoring.ClientType.PRODUCER, str, "", map, new SystemClock());
    }

    public static MonitoringInterceptor createForConsumer(String str, String str2, Map<String, ?> map) {
        return create(Monitoring.ClientType.CONSUMER, str, str2, map, new SystemClock());
    }

    public void start() {
        log.debug("starting");
        if (this.closed) {
            throw new IllegalStateException("Monitoring Interceptor was already shut down");
        }
        this.reporterThread.start();
        this.publishing = true;
    }

    public void recordMessageMetric(String str, int i, long j, int i2, long j2, long j3) {
        if (!this.publishing) {
            throw new IllegalStateException("Publishing thread must be running to enable recording of metrics for " + this.clientInfo);
        }
        try {
            TopicPartition topicPartition = new TopicPartition(str, i);
            MonitoringTimeBuckets monitoringTimeBuckets = this.buckets.get(topicPartition);
            if (monitoringTimeBuckets == null) {
                this.buckets.putIfAbsent(topicPartition, new MonitoringTimeBuckets(this.samplePeriodMs, this.windowHistorySize));
                monitoringTimeBuckets = this.buckets.get(topicPartition);
            }
            AtomicInteger atomicInteger = this.invalid.get(str);
            if (j < 0) {
                if (atomicInteger == null) {
                    this.invalid.putIfAbsent(str, new AtomicInteger(0));
                    atomicInteger = this.invalid.get(str);
                }
                atomicInteger.incrementAndGet();
            } else {
                monitoringTimeBuckets.record(j, i2, j2, j3);
            }
            maybeWarnSkippedMessages(str, atomicInteger);
        } catch (Exception e) {
            log.error("Monitoring Interceptor failed to record message metrics for topic={} partition={} {}", new Object[]{str, Integer.valueOf(i), this.clientInfo, e});
        }
    }

    private void maybeWarnSkippedMessages(String str, AtomicInteger atomicInteger) {
        int andSet;
        if (atomicInteger == null || atomicInteger.get() <= 0) {
            return;
        }
        long currentTimeMillis = this.clock.currentTimeMillis();
        long j = this.lastSkippedWarningMs.get();
        if (currentTimeMillis - j <= 250 || !this.lastSkippedWarningMs.compareAndSet(j, currentTimeMillis) || (andSet = atomicInteger.getAndSet(0)) <= 0) {
            return;
        }
        log.warn("Monitoring Interceptor skipped {} messages with missing or invalid timestamps for topic {}. The messages were either corrupted or using an older message format. Please verify that all your producers support timestamped messages and that your brokers and topics are all configured with log.message.format.version, and message.format.version >= 0.10.0 respectively. You may also experience this if you are consuming older messages produced to Kafka prior to any of those changes taking place.", Integer.valueOf(andSet), str);
    }

    @Override // java.lang.Runnable
    public void run() {
        try {
            try {
                startNewSession();
                while (!Thread.currentThread().isInterrupted()) {
                    Thread.sleep(this.publishPeriodMs);
                    publishMetrics();
                }
                this.publishing = false;
                try {
                    publishMetrics(true);
                } catch (Exception e) {
                    log.error("Failed to publish all cached metrics on termination for {}", this.clientInfo, e);
                }
                log.info("Publishing Monitoring Metrics stopped for {}", this.clientInfo);
            } catch (InterruptedException e2) {
                log.info("Publish thread interrupted for {}", this.clientInfo);
                this.publishing = false;
                try {
                    publishMetrics(true);
                } catch (Exception e3) {
                    log.error("Failed to publish all cached metrics on termination for {}", this.clientInfo, e3);
                }
                log.info("Publishing Monitoring Metrics stopped for {}", this.clientInfo);
            } catch (Exception e4) {
                log.error("Terminating publishing and collecting monitoring metrics for {}", this.clientInfo, e4);
                this.publishing = false;
                try {
                    publishMetrics(true);
                } catch (Exception e5) {
                    log.error("Failed to publish all cached metrics on termination for {}", this.clientInfo, e5);
                }
                log.info("Publishing Monitoring Metrics stopped for {}", this.clientInfo);
            }
        } catch (Throwable th) {
            this.publishing = false;
            try {
                publishMetrics(true);
            } catch (Exception e6) {
                log.error("Failed to publish all cached metrics on termination for {}", this.clientInfo, e6);
            }
            log.info("Publishing Monitoring Metrics stopped for {}", this.clientInfo);
            throw th;
        }
    }

    public void shutdown() {
        log.debug("shutting down");
        try {
            if (this.closed) {
                return;
            }
            try {
                this.reporterThread.interrupt();
                this.reporterThread.join(2000L);
            } catch (Exception e) {
                log.error("Failed to shutdown metrics reporting thread for {}", this.clientInfo, e);
                try {
                    this.monitoringProducer.close();
                } catch (Exception e2) {
                    log.error("Failed to close monitoring interceptor for {}", this.clientInfo, e2);
                }
                this.closed = true;
                log.info("Closed monitoring interceptor for {}", this.clientInfo);
            }
        } finally {
            try {
                this.monitoringProducer.close();
            } catch (Exception e3) {
                log.error("Failed to close monitoring interceptor for {}", this.clientInfo, e3);
            }
            this.closed = true;
            log.info("Closed monitoring interceptor for {}", this.clientInfo);
        }
    }

    protected void publishMetrics() {
        publishMetrics(false);
    }

    private void publishMetrics(boolean z) {
        long currentTimeMillis = this.clock.currentTimeMillis();
        boolean z2 = z || isSessionExpired(currentTimeMillis);
        for (Map.Entry<TopicPartition, MonitoringTimeBuckets> entry : this.buckets.entrySet()) {
            Monitoring.MonitoringMessage.Builder nextMetrics = entry.getValue().getNextMetrics(this.baseMonitoringMessage, currentTimeMillis);
            if (nextMetrics == null) {
                Monitoring.MonitoringMessage.Builder emptyMetrics = entry.getValue().getEmptyMetrics(this.baseMonitoringMessage, currentTimeMillis);
                emptyMetrics.setTopic(entry.getKey().topic());
                emptyMetrics.setPartition(entry.getKey().partition());
                emptyMetrics.setSession(this.session);
                if (z2) {
                    emptyMetrics.setShutdown(Boolean.TRUE.booleanValue());
                }
                publishMonitoringMessage(emptyMetrics.build());
            } else {
                while (nextMetrics != null) {
                    nextMetrics.setTopic(entry.getKey().topic());
                    nextMetrics.setPartition(entry.getKey().partition());
                    nextMetrics.setSession(this.session);
                    Monitoring.MonitoringMessage.Builder nextMetrics2 = entry.getValue().getNextMetrics(this.baseMonitoringMessage, currentTimeMillis);
                    if (nextMetrics2 == null && z2) {
                        nextMetrics.setShutdown(Boolean.TRUE.booleanValue());
                    }
                    publishMonitoringMessage(nextMetrics.build());
                    nextMetrics = nextMetrics2;
                }
            }
        }
        if (z2) {
            startNewSession();
        }
    }

    protected void startNewSession() {
        this.session = UUID.randomUUID().toString();
        Iterator<MonitoringTimeBuckets> it = this.buckets.values().iterator();
        while (it.hasNext()) {
            it.next().resetSequence();
        }
        this.sessionExpireTime = this.clock.currentTimeMillis() + TimeBucket.MAX_SESSION_DURATION;
    }

    protected long sessionExpireTime() {
        return this.sessionExpireTime;
    }

    protected boolean isSessionExpired(long j) {
        return this.sessionExpireTime <= j;
    }

    protected void publishMonitoringMessage(Monitoring.MonitoringMessage monitoringMessage) {
        log.debug("publish {}", log.isTraceEnabled() ? monitoringMessage : "");
        try {
            this.monitoringProducer.send(new ProducerRecord(this.publishTopic, (Integer) null, (Object) null, this.monitoringMessageSerde.serialize(monitoringMessage)), new Callback() { // from class: io.confluent.monitoring.clients.interceptor.MonitoringInterceptor.1
                public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                    if (exc == null) {
                        MonitoringInterceptor.log.trace("successfully produced {}", recordMetadata);
                    } else {
                        MonitoringInterceptor.log.warn("failed to produce for {}", MonitoringInterceptor.this.clientInfo, exc);
                    }
                }
            });
        } catch (Throwable th) {
            log.warn("failed to publish monitoring message for {}", this.clientInfo, th);
        }
    }

    static {
        log.debug("available");
    }
}
