/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.monitoring.clients.interceptor;

import io.confluent.monitoring.clients.interceptor.MonitoringInterceptorConfig;
import io.confluent.monitoring.clients.interceptor.MonitoringTimeBuckets;
import io.confluent.shaded.monitoring.common.Clock;
import io.confluent.shaded.monitoring.common.MonitoringMessageUtil;
import io.confluent.shaded.monitoring.common.SystemClock;
import io.confluent.shaded.monitoring.common.TimeBucket;
import io.confluent.shaded.monitoring.record.Monitoring;
import io.confluent.shaded.serializers.ProtoSerde;
import io.confluent.shaded.serializers.UberSerde;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MonitoringInterceptor
implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(MonitoringInterceptor.class);
    private static final int WARNING_THROTTLE_MS = 250;
    private final long samplePeriodMs;
    private final long publishPeriodMs;
    private final int windowHistorySize;
    private final Monitoring.MonitoringMessage baseMonitoringMessage;
    private String clientInfo;
    private final String publishTopic;
    private final UberSerde<Monitoring.MonitoringMessage> monitoringMessageSerde;
    private final ConcurrentHashMap<TopicPartition, MonitoringTimeBuckets> buckets;
    private final AtomicLong lastSkippedWarningMs = new AtomicLong(0L);
    final ConcurrentHashMap<String, AtomicInteger> invalid;
    private final Thread reporterThread;
    private final KafkaProducer<byte[], byte[]> monitoringProducer;
    protected volatile boolean publishing;
    private volatile boolean closed = false;
    private String session;
    private long sessionExpireTime;
    private final Clock clock;

    protected static MonitoringInterceptor create(Monitoring.ClientType clientType, String clusterId, String groupId, Map<String, ?> configs, Clock clock) {
        Map<String, Object> producerProperties = MonitoringInterceptorConfig.getProducerProperties(configs);
        MonitoringInterceptorConfig interceptorConfig = MonitoringInterceptorConfig.getConfig(configs);
        String clientId = configs.get("client.id").toString();
        return new MonitoringInterceptor(MonitoringInterceptor.getBaseMonitoringMessage(clientType, clusterId, groupId, clientId), (KafkaProducer<byte[], byte[]>)new KafkaProducer(producerProperties), new ProtoSerde<Monitoring.MonitoringMessage>(Monitoring.MonitoringMessage.getDefaultInstance()), interceptorConfig.getString("confluent.monitoring.interceptor.topic"), TimeBucket.SIZE, interceptorConfig.getLong("confluent.monitoring.interceptor.publishMs"), 10, producerProperties.get("client.id").toString(), clock);
    }

    static Monitoring.MonitoringMessage getBaseMonitoringMessage(Monitoring.ClientType clientType, String clusterId, String groupId, String clientId) {
        Monitoring.MonitoringMessage.Builder monitoringMessageBuilder = Monitoring.MonitoringMessage.newBuilder(MonitoringMessageUtil.baseMonitoringMessage()).setGroup(groupId).setClientId(clientId).setClientType(clientType).setWindow(-1L);
        if (clusterId != null) {
            monitoringMessageBuilder.setClusterId(clusterId);
        }
        return monitoringMessageBuilder.buildPartial();
    }

    MonitoringInterceptor(Monitoring.MonitoringMessage baseMonitoringMessage, KafkaProducer<byte[], byte[]> monitoringProducer, UberSerde<Monitoring.MonitoringMessage> monitoringMessageSerde, String publishTopic, long samplePeriodMs, long publishPeriodMs, int windowHistorySize, String interceptorClientId, Clock clock) {
        this.baseMonitoringMessage = baseMonitoringMessage;
        this.monitoringProducer = monitoringProducer;
        this.monitoringMessageSerde = monitoringMessageSerde;
        this.publishTopic = publishTopic;
        this.samplePeriodMs = samplePeriodMs;
        this.publishPeriodMs = publishPeriodMs;
        this.windowHistorySize = windowHistorySize;
        this.clientInfo = String.format("client_id=%s client_type=%s session=%s cluster=%s", baseMonitoringMessage.getClientId(), baseMonitoringMessage.getClientType(), baseMonitoringMessage.getSession(), baseMonitoringMessage.getClusterId());
        if (baseMonitoringMessage.getClientType() == Monitoring.ClientType.CONSUMER) {
            this.clientInfo = this.clientInfo + " group=" + baseMonitoringMessage.getGroup();
        }
        this.publishing = false;
        this.buckets = new ConcurrentHashMap();
        this.invalid = new ConcurrentHashMap();
        this.reporterThread = new Thread((Runnable)this, "confluent-metrics-reporter-" + interceptorClientId);
        this.clock = clock;
        log.info("interceptor={} created for {}", (Object)interceptorClientId, (Object)this.clientInfo);
    }

    public static MonitoringInterceptor createForProducer(String clusterId, Map<String, ?> configs) {
        return MonitoringInterceptor.create(Monitoring.ClientType.PRODUCER, clusterId, "", configs, new SystemClock());
    }

    public static MonitoringInterceptor createForConsumer(String clusterId, String groupId, Map<String, ?> configs) {
        return MonitoringInterceptor.create(Monitoring.ClientType.CONSUMER, clusterId, groupId, configs, new SystemClock());
    }

    public void start() {
        log.debug("starting");
        if (this.closed) {
            throw new IllegalStateException("Monitoring Interceptor was already shut down");
        }
        this.reporterThread.start();
        this.publishing = true;
    }

    public void recordMessageMetric(String topic, int partition, long timestamp, int size, long checksum, long latency) {
        if (!this.publishing) {
            throw new IllegalStateException("Publishing thread must be running to enable recording of metrics for " + this.clientInfo);
        }
        try {
            TopicPartition tp = new TopicPartition(topic, partition);
            MonitoringTimeBuckets tpBuckets = this.buckets.get(tp);
            if (tpBuckets == null) {
                this.buckets.putIfAbsent(tp, new MonitoringTimeBuckets(this.samplePeriodMs, this.windowHistorySize));
                tpBuckets = this.buckets.get(tp);
            }
            AtomicInteger invalid = this.invalid.get(topic);
            if (timestamp < 0L) {
                if (invalid == null) {
                    this.invalid.putIfAbsent(topic, new AtomicInteger(0));
                    invalid = this.invalid.get(topic);
                }
                invalid.incrementAndGet();
            } else {
                tpBuckets.record(timestamp, size, checksum, latency);
            }
            this.maybeWarnSkippedMessages(topic, invalid);
        }
        catch (Exception e) {
            log.error("Monitoring Interceptor failed to record message metrics for topic={} partition={} {}", new Object[]{topic, partition, this.clientInfo, e});
        }
    }

    private void maybeWarnSkippedMessages(String topic, AtomicInteger count) {
        int n;
        long t0;
        long now;
        if (count != null && count.get() > 0 && (now = this.clock.currentTimeMillis()) - (t0 = this.lastSkippedWarningMs.get()) > 250L && this.lastSkippedWarningMs.compareAndSet(t0, now) && (n = count.getAndSet(0)) > 0) {
            log.warn("Monitoring Interceptor skipped {} messages with missing or invalid timestamps for topic {}. The messages were either corrupted or using an older message format. Please verify that all your producers support timestamped messages and that your brokers and topics are all configured with log.message.format.version, and message.format.version >= 0.10.0 respectively. You may also experience this if you are consuming older messages produced to Kafka prior to any of those changes taking place.", (Object)n, (Object)topic);
        }
    }

    @Override
    public void run() {
        try {
            this.startNewSession();
            while (!Thread.currentThread().isInterrupted()) {
                Thread.sleep(this.publishPeriodMs);
                this.publishMetrics();
            }
        }
        catch (InterruptedException ie) {
            log.info("Publish thread interrupted for {}", (Object)this.clientInfo);
        }
        catch (Exception e) {
            log.error("Terminating publishing and collecting monitoring metrics for {}", (Object)this.clientInfo, (Object)e);
        }
        finally {
            this.publishing = false;
            try {
                this.publishMetrics(true);
            }
            catch (Exception e) {
                log.error("Failed to publish all cached metrics on termination for {}", (Object)this.clientInfo, (Object)e);
            }
            log.info("Publishing Monitoring Metrics stopped for {}", (Object)this.clientInfo);
        }
    }

    public void shutdown() {
        log.debug("shutting down");
        if (!this.closed) {
            try {
                this.reporterThread.interrupt();
                this.reporterThread.join(2000L);
            }
            catch (Exception e) {
                log.error("Failed to shutdown metrics reporting thread for {}", (Object)this.clientInfo, (Object)e);
            }
            finally {
                try {
                    this.monitoringProducer.close();
                }
                catch (Exception e) {
                    log.error("Failed to close monitoring interceptor for {}", (Object)this.clientInfo, (Object)e);
                }
                this.closed = true;
                log.info("Closed monitoring interceptor for {}", (Object)this.clientInfo);
            }
        }
    }

    protected void publishMetrics() {
        this.publishMetrics(false);
    }

    private void publishMetrics(boolean shutdown) {
        long now = this.clock.currentTimeMillis();
        boolean shouldShutdown = shutdown || this.isSessionExpired(now);
        for (Map.Entry<TopicPartition, MonitoringTimeBuckets> entry : this.buckets.entrySet()) {
            Monitoring.MonitoringMessage.Builder publishMessage = entry.getValue().getNextMetrics(this.baseMonitoringMessage, now);
            if (publishMessage == null) {
                Monitoring.MonitoringMessage.Builder heartbeatMsg = entry.getValue().getEmptyMetrics(this.baseMonitoringMessage, now);
                heartbeatMsg.setTopic(entry.getKey().topic());
                heartbeatMsg.setPartition(entry.getKey().partition());
                heartbeatMsg.setSession(this.session);
                if (shouldShutdown) {
                    heartbeatMsg.setShutdown(Boolean.TRUE);
                }
                this.publishMonitoringMessage(heartbeatMsg.build());
                continue;
            }
            while (publishMessage != null) {
                publishMessage.setTopic(entry.getKey().topic());
                publishMessage.setPartition(entry.getKey().partition());
                publishMessage.setSession(this.session);
                Monitoring.MonitoringMessage.Builder next = entry.getValue().getNextMetrics(this.baseMonitoringMessage, now);
                if (next == null && shouldShutdown) {
                    publishMessage.setShutdown(Boolean.TRUE);
                }
                this.publishMonitoringMessage(publishMessage.build());
                publishMessage = next;
            }
        }
        if (shouldShutdown) {
            this.startNewSession();
        }
    }

    protected void startNewSession() {
        this.session = UUID.randomUUID().toString();
        for (MonitoringTimeBuckets bucket : this.buckets.values()) {
            bucket.resetSequence();
        }
        this.sessionExpireTime = this.clock.currentTimeMillis() + TimeBucket.MAX_SESSION_DURATION;
    }

    protected long sessionExpireTime() {
        return this.sessionExpireTime;
    }

    protected boolean isSessionExpired(long now) {
        return this.sessionExpireTime <= now;
    }

    protected void publishMonitoringMessage(Monitoring.MonitoringMessage monitoringMessage) {
        log.debug("publish {}", log.isTraceEnabled() ? monitoringMessage : "");
        try {
            this.monitoringProducer.send(new ProducerRecord(this.publishTopic, null, null, (Object)this.monitoringMessageSerde.serialize(monitoringMessage)), new Callback(){

                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        log.trace("successfully produced {}", (Object)recordMetadata);
                    } else {
                        log.warn("failed to produce for {}", (Object)MonitoringInterceptor.this.clientInfo, (Object)e);
                    }
                }
            });
        }
        catch (Throwable t) {
            log.warn("failed to publish monitoring message for {}", (Object)this.clientInfo, (Object)t);
        }
    }

    static {
        log.debug("available");
    }
}

