/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.controlcenter.streams.verify;

import com.google.common.collect.Lists;
import io.confluent.controlcenter.record.Controlcenter;
import io.confluent.controlcenter.streams.TopicStoreMaster;
import io.confluent.monitoring.common.Clock;
import io.confluent.monitoring.record.Monitoring;
import io.confluent.serializers.UberSerde;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MonitoringHeartbeatSender
implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(MonitoringHeartbeatSender.class);
    private final long timeout;
    private final Monitoring.MonitoringMessage baseMonitoringMessage;
    private final String publishTopic;
    private final Set<TopicStoreMaster.Topic<Controlcenter.WindowedClusterGroup, Monitoring.MonitoringMessage, Void, Void>> internalPublishTopics;
    private final UberSerde<Monitoring.MonitoringMessage> monitoringMessageSerde;
    private final Clock clock;
    private KafkaProducer<byte[], byte[]> heartbeatProducer;

    public MonitoringHeartbeatSender(String publishTopic, Set<TopicStoreMaster.Topic<Controlcenter.WindowedClusterGroup, Monitoring.MonitoringMessage, Void, Void>> internalPublishTopics, Monitoring.MonitoringMessage baseMonitoringMessage, KafkaProducer<byte[], byte[]> heartbeatProducer, UberSerde<Monitoring.MonitoringMessage> monitoringMessageUberSerde, long timeout, Clock clock) {
        this.timeout = timeout;
        this.publishTopic = publishTopic;
        this.internalPublishTopics = internalPublishTopics;
        this.baseMonitoringMessage = baseMonitoringMessage;
        this.heartbeatProducer = heartbeatProducer;
        this.monitoringMessageSerde = monitoringMessageUberSerde;
        this.clock = clock;
    }

    int numberOfTopicPartitions(String topic) {
        return this.heartbeatProducer.partitionsFor(topic).size();
    }

    @Override
    public void run() {
        try {
            this.sendHeartbeat();
        }
        catch (Throwable t) {
            log.error("Terminating heartbeat thread", t);
        }
    }

    public void close() {
        try {
            this.heartbeatProducer.close();
        }
        catch (Exception e) {
            log.error("Failed to close monitoring heartbeat producer", (Throwable)e);
        }
        log.info("Closed monitoring heartbeat producer");
    }

    private void sendHeartbeat() {
        long now = this.clock.currentTimeMillis();
        Monitoring.MonitoringMessage heartbeatMsg = Monitoring.MonitoringMessage.newBuilder((Monitoring.MonitoringMessage)this.baseMonitoringMessage).setTimestamp(now).setWindow(now).build();
        try {
            ArrayList futures = Lists.newArrayList();
            this.sendHeartbeatToAllPartitions(this.publishTopic, null, heartbeatMsg, futures);
            for (TopicStoreMaster.Topic<Controlcenter.WindowedClusterGroup, Monitoring.MonitoringMessage, Void, Void> internalPublishTopic : this.internalPublishTopics) {
                this.sendHeartbeatToAllPartitions(internalPublishTopic.name, internalPublishTopic.keySerde.serialize((Object)Controlcenter.WindowedClusterGroup.newBuilder().setWindow(0L).build()), heartbeatMsg, futures);
            }
            long endTime = now + this.timeout;
            Iterator i = futures.iterator();
            while (endTime > this.clock.currentTimeMillis() && i.hasNext()) {
                Future f = (Future)i.next();
                f.get(endTime - this.clock.currentTimeMillis(), TimeUnit.MILLISECONDS);
            }
        }
        catch (Throwable t) {
            log.warn("Failed to publish monitoring heartbeat message", t);
        }
    }

    private void sendHeartbeatToAllPartitions(final String topic, byte[] key, Monitoring.MonitoringMessage heartbeatMsg, List<Future<RecordMetadata>> futures) {
        int partitions = this.heartbeatProducer.partitionsFor(topic).size();
        int p = 0;
        while (p < partitions) {
            final int partition = p++;
            futures.add(this.heartbeatProducer.send(new ProducerRecord(topic, Integer.valueOf(partition), (Object)key, (Object)this.monitoringMessageSerde.serialize((Object)heartbeatMsg)), new Callback(){

                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        log.warn("Failed to publish monitoring heartbeat message to topic-partition [{}-{}]", new Object[]{topic, partition, exception});
                    }
                }
            }));
        }
    }
}

