/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.controlcenter.streams;

import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import com.google.inject.Provides;
import io.confluent.controlcenter.ControlCenterConfig;
import io.confluent.controlcenter.streams.ContextAwareTransformer;
import io.confluent.monitoring.record.Monitoring;
import java.util.ArrayList;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BroadcastHeartbeatTransformerSupplier<K>
implements TransformerSupplier<K, Monitoring.MonitoringMessage, KeyValue<K, Iterable<KeyValue<K, Monitoring.MonitoringMessage>>>> {
    private static final Logger log = LoggerFactory.getLogger(BroadcastHeartbeatTransformerSupplier.class);
    private final ControlCenterConfig cfg;

    @Inject
    public BroadcastHeartbeatTransformerSupplier(ControlCenterConfig cfg) {
        this.cfg = cfg;
    }

    @Provides
    public BroadcastHeartbeatTransformer get() {
        return new BroadcastHeartbeatTransformer(this.cfg.getInt("confluent.controlcenter.internal.topics.partitions"));
    }

    public class BroadcastHeartbeatTransformer
    extends ContextAwareTransformer<K, Monitoring.MonitoringMessage, KeyValue<K, Iterable<KeyValue<K, Monitoring.MonitoringMessage>>>> {
        private final int numInternalPartitions;

        public BroadcastHeartbeatTransformer(int numInternalPartitions) {
            this.numInternalPartitions = numInternalPartitions;
        }

        public KeyValue<K, Iterable<KeyValue<K, Monitoring.MonitoringMessage>>> transform(K key, Monitoring.MonitoringMessage monitoringMessage) {
            if (monitoringMessage.getClientType() == Monitoring.ClientType.CONTROLCENTER) {
                if (log.isTraceEnabled()) {
                    log.trace("Broadcasting C3 heartbeat from monitoring topic partition={} to all (total={}) partitions of re-keyed monitoring topic", (Object)this.context.partition(), (Object)this.numInternalPartitions);
                }
                ArrayList<KeyValue> broadcastMessages = new ArrayList<KeyValue>(this.numInternalPartitions);
                for (int partition = 0; partition < this.numInternalPartitions; ++partition) {
                    Monitoring.MonitoringMessage msg = Monitoring.MonitoringMessage.newBuilder((Monitoring.MonitoringMessage)monitoringMessage).setPartition(partition).build();
                    broadcastMessages.add(KeyValue.pair(key, (Object)msg));
                }
                return KeyValue.pair(null, broadcastMessages);
            }
            return KeyValue.pair(null, (Object)ImmutableList.of((Object)KeyValue.pair(key, (Object)monitoringMessage)));
        }
    }
}

