/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.controlcenter.rest;

import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.Collections2;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Longs;
import com.google.inject.Inject;
import com.google.inject.Provider;
import io.confluent.controlcenter.ControlCenterConfig;
import io.confluent.controlcenter.data.KafkaMetadataDao;
import io.confluent.controlcenter.rest.TimeseriesUtils;
import io.confluent.controlcenter.streams.TopicStoreModule;
import io.confluent.controlcenter.streams.aggregation.GroupingSets;
import io.confluent.controlcenter.streams.aggregation.MetricValues;
import io.confluent.controlcenter.streams.aggregation.MetricsAggregation;
import io.confluent.metrics.Statistics;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.validation.Constraint;
import javax.validation.ConstraintValidator;
import javax.validation.ConstraintValidatorContext;
import javax.validation.Payload;
import javax.validation.Valid;
import javax.ws.rs.BadRequestException;
import javax.ws.rs.BeanParam;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import org.apache.kafka.common.Node;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.joda.time.ReadableInstant;
import org.joda.time.ReadablePeriod;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Path(value="/2.0/metrics/{clusterId}")
@Produces(value={"application/json"})
public class MetricsResource {
    // Blank key component; passed in the topic slot of CLUSTER_TOPIC / CLUSTER_TOPIC_BROKER lookups below.
    private static final String EMPTY_STRING = "";
    // "true" as a string; used as the third key component of CLUSTER_TOPIC_LEADER lookups.
    private static final String TRUE = Boolean.toString(true);
    // Not referenced in the visible part of this class.
    private static final int MAX_RESULT_SIZE = 12000;
    // Not referenced in the visible part of this class.
    private static final long FIVE_MINUTES_IN_MILLIS = TimeUnit.MINUTES.toMillis(5L);
    // Not referenced in the visible part of this class.
    private static final long ONE_HOUR_IN_MILLIS = TimeUnit.HOURS.toMillis(1L);
    // NOTE(review): determineDistribution compares Statistics.rmad against a literal 0.1 -- presumably this constant, inlined by the compiler.
    private static final double SKEWED_DISK_THRESHOLD = 0.1;
    // Same value as the @DefaultValue of the "p" query parameter on requestLatency.
    private static final String DEFAULT_LATENCY_PERCENTILE = "99";
    // NOTE(review): maxTime(String, long) stops widening its search at a literal 840 hours -- presumably this constant, inlined.
    protected static final int MAXTIME_MAX_SEARCH_WINDOW = 840;
    private static final Logger log = LoggerFactory.getLogger(MetricsResource.class);
    /** Unwraps the {@code value} half of a (timestamp, metrics) pair. */
    private static final Function<KeyValue<Long, MetricValues>, MetricValues> EXTRACT_METRICS = new Function<KeyValue<Long, MetricValues>, MetricValues>(){

        public MetricValues apply(KeyValue<Long, MetricValues> entry) {
            MetricValues metrics = entry.value;
            return metrics;
        }
    };
    /** Maps a metrics bucket to its maximum through {@code safeMax}; a null bucket yields null. */
    private static final Function<MetricValues, Long> F_MAX = new Function<MetricValues, Long>(){

        public Long apply(MetricValues bucket) {
            Long largest = MetricsResource.safeMax(bucket);
            return largest;
        }
    };
    /** Maps a metrics bucket to its minimum through {@code safeMin}; a null bucket yields null. */
    private static final Function<MetricValues, Long> F_MIN = new Function<MetricValues, Long>(){

        public Long apply(MetricValues bucket) {
            Long smallest = MetricsResource.safeMin(bucket);
            return smallest;
        }
    };
    /** Maps a metrics bucket to its sum through {@code safeSum}; a null bucket yields null. */
    private static final Function<MetricValues, Long> F_SUM = new Function<MetricValues, Long>(){

        public Long apply(MetricValues bucket) {
            Long total = MetricsResource.safeSum(bucket);
            return total;
        }
    };
    /**
     * Maps a metrics bucket to its integer mean ({@code sum / count}, truncating).
     * Returns null for a null bucket, and also for a bucket whose count is zero
     * so that an empty bucket cannot trigger an integer division by zero.
     */
    private static final Function<MetricValues, Long> F_MEAN = new Function<MetricValues, Long>(){

        public Long apply(MetricValues input) {
            if (input == null || input.count() == 0) {
                // No samples: report "no value" the same way a missing bucket is reported.
                return null;
            }
            return Long.valueOf(input.sum() / input.count());
        }
    };
    // Cluster metadata lookups (topic names, broker nodes).
    private final KafkaMetadataDao kafkaMetadataDao;
    // Provider of the windowed store holding aggregated MetricValues; injected with @TopicStoreModule.MetricsAggregateStore.
    private final Provider<GroupingSets.PartitionedGroupingSets.GroupedWindowStore<MetricValues>> metricsStore;
    // Injected aggregation definition; not used in the visible part of this class.
    private final MetricsAggregation metricsAggregation;
    // Application configuration; brokerStatus reads the disk-skew byte threshold from it.
    private final ControlCenterConfig controlCenterConfig;

    /** Null-tolerant accessor: the bucket's {@code min()}, or null when there is no bucket. */
    private static Long safeMin(MetricValues holder) {
        if (holder == null) {
            return null;
        }
        return Long.valueOf(holder.min());
    }

    /** Null-tolerant accessor: the bucket's {@code max()}, or null when there is no bucket. */
    private static Long safeMax(MetricValues holder) {
        if (holder == null) {
            return null;
        }
        return Long.valueOf(holder.max());
    }

    /** Null-tolerant accessor: the bucket's {@code sum()}, or null when there is no bucket. */
    private static Long safeSum(MetricValues holder) {
        if (holder == null) {
            return null;
        }
        return Long.valueOf(holder.sum());
    }

    /**
     * Classifies how evenly segment bytes are spread over the given brokers.
     *
     * @param values per-broker sizes; null entries are skipped
     * @param minSkewBytes spread (largest minus smallest segment size) at or below which
     *        the distribution always counts as even
     * @return {@code "even"} when the spread is within {@code minSkewBytes} or when
     *         {@code Statistics.rmad} of the sizes is at most 0.1; {@code "skewed"} otherwise
     */
    protected static String determineDistribution(Collection<BrokerSize> values, long minSkewBytes) {
        List<Long> segmentSizes = new ArrayList<Long>();
        for (BrokerSize brokerSize : values) {
            if (brokerSize != null) {
                segmentSizes.add(brokerSize.getSegmentSize());
            }
        }
        long[] sizes = Longs.toArray(segmentSizes);
        // The rmad statistic is only consulted when the absolute spread exceeds the tolerance.
        boolean withinTolerance = MetricsResource.maxDiff(sizes) <= minSkewBytes;
        if (withinTolerance || Statistics.rmad(sizes) <= 0.1) {
            return "even";
        }
        return "skewed";
    }

    /**
     * Returns the distance between the largest and smallest element.
     *
     * @param values the numbers to inspect; may be null or empty
     * @return {@code |max - min|}, or {@code -1} when {@code values} is null or empty
     */
    protected static long maxDiff(long[] values) {
        if (values == null || values.length == 0) {
            return -1L;
        }
        long lowest = Long.MAX_VALUE;
        long highest = Long.MIN_VALUE;
        for (long value : values) {
            if (value < lowest) {
                lowest = value;
            }
            if (value > highest) {
                highest = value;
            }
        }
        return Math.abs(highest - lowest);
    }

    /**
     * Pooled mean over several buckets: the total of all {@code sum()} values divided by
     * the total of all {@code count()} values. Null buckets are skipped.
     *
     * @return the pooled mean, or {@code 0.0} when the combined count is zero
     */
    protected static double average(Iterable<MetricValues> values) {
        long total = 0L;
        long samples = 0L;
        Iterator<MetricValues> buckets = values.iterator();
        while (buckets.hasNext()) {
            MetricValues bucket = buckets.next();
            if (bucket != null) {
                total += bucket.sum();
                samples += bucket.count();
            }
        }
        if (samples == 0L) {
            return 0.0;
        }
        return (double)total / (double)samples;
    }

    /**
     * Converts summed idle time into a usage fraction: the idle sum is divided by the
     * bucket length and then by the broker count, capped at 1.0, and subtracted from 1.0.
     *
     * @return the usage fraction, or null when {@code idleNanos} is null
     */
    protected static Double requestPoolUsage(MetricValues idleNanos, int brokerCount, long bucketNanos) {
        if (idleNanos == null) {
            return null;
        }
        double idleFraction = (double)idleNanos.sum() / (double)bucketNanos / (double)brokerCount;
        double cappedIdle = Math.min(1.0, idleFraction);
        return Double.valueOf(1.0 - cappedIdle);
    }

    /**
     * Injection constructor; simply stores the four collaborators.
     *
     * @param kafkaMetadataDao source of cluster metadata
     * @param metricsStore provider of the aggregated metrics window store
     * @param metricsAggregation aggregation definition
     * @param controlCenterConfig application configuration
     */
    @Inject
    public MetricsResource(KafkaMetadataDao kafkaMetadataDao, @TopicStoreModule.MetricsAggregateStore Provider<GroupingSets.PartitionedGroupingSets.GroupedWindowStore<MetricValues>> metricsStore, MetricsAggregation metricsAggregation, ControlCenterConfig controlCenterConfig) {
        this.controlCenterConfig = controlCenterConfig;
        this.metricsAggregation = metricsAggregation;
        this.metricsStore = metricsStore;
        this.kafkaMetadataDao = kafkaMetadataDao;
    }

    /**
     * Point-in-time broker health summary for one cluster.
     *
     * <p>The snapshot is taken at the end of the interval produced by {@code mostRecent}
     * from the optional {@code end} parameter and the latest known metrics time. Fields that
     * depend on the broker count are null (or, for the partition counts, omitted) when the
     * broker count is unknown.
     *
     * @param clusterId cluster to report on
     * @param end optional upper time bound supplied by the caller
     * @return an insertion-ordered map of status fields
     */
    @Path(value="/broker/status")
    @GET
    public Map<String, Object> brokerStatus(final @PathParam(value="clusterId") String clusterId, @QueryParam(value="end") Long end) {
        Interval window = MetricsResource.mostRecent(end, this.maxTime(clusterId, System.currentTimeMillis()));
        final long asOf = window.getEndMillis();
        Integer brokers = this.currentBrokerCount(window, clusterId);
        Map<Integer, BrokerSize> sizeByBroker = this.brokerSizes(asOf, clusterId);
        // Lazy view: the per-broker idle metric is looked up when networkPoolUsage iterates it.
        Iterable idleByBroker = Iterables.transform(sizeByBroker.keySet(), (Function)new Function<Integer, MetricValues>(){

            public MetricValues apply(Integer brokerId) {
                return MetricsResource.this.currentValue(asOf, clusterId, Integer.toString(brokerId), "NetworkProcessorAvgIdlePercent");
            }
        });
        double networkPoolUsage = MetricsResource.networkPoolUsage(idleByBroker);
        Double requestPoolBusy = null;
        if (brokers != null) {
            requestPoolBusy = MetricsResource.requestPoolUsage(this.currentValue(asOf, clusterId, "RequestHandlerAvgIdlePercent"), brokers, this.bucketNanos());
        }
        MetricValues controllers = this.currentValue(asOf, clusterId, "ActiveControllerCount");
        Long zkDisconnects = MetricsResource.safeSum(this.currentValue(asOf, clusterId, "ZooKeeperDisconnectsPerSec"));
        TopicPartitionStatus partitions = null;
        if (brokers != null) {
            partitions = this.topicPartitionStatus(asOf, clusterId, brokers);
        }

        String zooKeeperStatus = null;
        if (zkDisconnects != null) {
            zooKeeperStatus = zkDisconnects == 0L ? "up" : "down";
        }
        Long activeControllers = null;
        if (controllers != null && brokers != null) {
            // Mean controller count per sample, scaled by the number of brokers.
            activeControllers = Long.valueOf(controllers.sum() * (long)brokers.intValue() / controllers.count());
        }

        Map<String, Object> result = Maps.newLinkedHashMap();
        result.put("brokerCount", brokers);
        result.put("zooKeeperStatus", zooKeeperStatus);
        result.put("activeControllers", activeControllers);
        result.put("uncleanLeaderElectionCount", MetricsResource.safeSum(this.currentValue(asOf, clusterId, "UncleanLeaderElectionsPerSec")));
        result.put("networkPoolUsage", networkPoolUsage);
        result.put("requestPoolUsage", requestPoolBusy);
        if (partitions != null) {
            result.put("onlinePartitionCount", partitions.getOnlinePartitions());
            result.put("underReplicatedPartitionCount", partitions.getUnderReplicatedPartitions());
            result.put("offlinePartitionCount", partitions.getOfflinePartitions());
        }
        result.put("diskUsage", ImmutableList.copyOf((Iterable)Iterables.filter(sizeByBroker.values(), (Predicate)Predicates.notNull())));
        result.put("diskUsageDistribution", MetricsResource.determineDistribution(sizeByBroker.values(), this.controlCenterConfig.getLong("confluent.controlcenter.disk.skew.warning.min.bytes")));
        return result;
    }

    /**
     * Derives a network-pool usage fraction from per-broker idle metrics: one minus the
     * pooled average divided by 10000.0, with the idle part capped at 1.0.
     * NOTE(review): the 10000.0 divisor presumably reflects how the idle percentage is
     * scaled upstream -- confirm against the producer of this metric.
     *
     * @return the usage fraction, or {@code 0.0} when there are no brokers to average
     */
    protected static double networkPoolUsage(Iterable<MetricValues> networkProcessorAvgIdlePercent) {
        if (Iterables.isEmpty(networkProcessorAvgIdlePercent)) {
            return 0.0;
        }
        double idleFraction = MetricsResource.average(networkProcessorAvgIdlePercent) / 10000.0;
        return 1.0 - Math.min(idleFraction, 1.0);
    }

    /**
     * Cluster-level ZooKeeper time series over the requested range: disconnects and session
     * expirations mapped through {@code perSecond()}, and leader elections as per-bucket sums.
     *
     * @param clusterId cluster to report on
     * @param range requested time range; resolved by {@code defaultInterval}
     * @return the three series merged into one list of maps
     */
    @Path(value="/broker/zookeeper")
    @GET
    public List<Map<String, Object>> zookeeperStatus(@PathParam(value="clusterId") String clusterId, @Valid @BeanParam RangeParam range) {
        long latest = this.maxTime(clusterId, System.currentTimeMillis());
        Interval window = MetricsResource.defaultInterval(range, latest);
        List disconnectSeries = this.timeSeries(window, "ZooKeeperDisconnectsPerSec", MetricsAggregation.CLUSTER, clusterId);
        List expireSeries = this.timeSeries(window, "ZooKeeperExpiresPerSec", MetricsAggregation.CLUSTER, clusterId);
        List electionSeries = this.timeSeries(window, "LeaderElectionRateAndTimeMs", MetricsAggregation.CLUSTER, clusterId);
        List disconnectRate = TimeseriesUtils.map(disconnectSeries, this.perSecond());
        List expireRate = TimeseriesUtils.map(expireSeries, this.perSecond());
        List electionTotals = TimeseriesUtils.map(electionSeries, F_SUM);
        return TimeseriesUtils.mergeTimeseriesAsMap("disconnectsPerSec", disconnectRate, "expiresPerSec", expireRate, "leaderElections", electionTotals);
    }

    /**
     * Reports the most recent time for which metrics exist for the cluster.
     *
     * @param clusterId cluster to inspect
     * @return {@code {"timestamp": <millis>}} when a positive timestamp was found,
     *         otherwise an empty map
     */
    @Path(value="/maxtime")
    @GET
    public Map<String, Long> maxTime(@PathParam(value="clusterId") String clusterId) {
        long latest = this.maxTime(clusterId, System.currentTimeMillis());
        if (latest <= 0L) {
            // The lookup yields -1 when nothing was found inside its search window.
            return ImmutableMap.of();
        }
        // No (Object) casts: they would force ImmutableMap<Object, Object>, which does not
        // satisfy the declared Map<String, Long> return type.
        return ImmutableMap.of("timestamp", latest);
    }

    /**
     * Finds the latest recorded metrics time at or before {@code end}.
     *
     * <p>Looks back over windows of 1, 2, 4, ... hours (doubling while below 840) ending at
     * {@code end}, reading the cluster-level {@code "timestamp"} series. When a window holds
     * more than one bucket the second-to-last one is used; with exactly one bucket that one
     * is used. A null bucket counts as "nothing found" and the search widens further.
     *
     * @return the {@code max()} of the chosen bucket, or {@code -1} when no window yields one
     */
    protected long maxTime(String clusterId, long end) {
        DateTime windowEnd = new DateTime(end);
        for (int hours = 1; hours < 840; hours *= 2) {
            Interval searchInterval = new Interval(Period.hours(hours), windowEnd);
            List<MetricValues> buckets = this.timeSeriesValues(searchInterval, "timestamp", MetricsAggregation.CLUSTER, clusterId);
            MetricValues candidate = null;
            if (buckets.size() > 1) {
                candidate = buckets.get(buckets.size() - 2);
            } else if (buckets.size() == 1) {
                candidate = buckets.get(0);
            }
            if (candidate != null) {
                return candidate.max();
            }
        }
        return -1L;
    }

    /**
     * Point-in-time topic summary for one cluster: topic count plus partition and replica
     * counts.
     *
     * <p>A metadata failure is logged at debug level and leaves {@code topicCount} null.
     * Partition fields are added only when the broker count is known; replica fields only
     * when {@code PartitionReplicaStatus.fromMetrics} returns a value.
     *
     * @param clusterId cluster to report on
     * @param end optional upper time bound supplied by the caller
     * @return an insertion-ordered map of status fields
     */
    @Path(value="/topic/status")
    @GET
    public Map<String, Object> topicStatus(@PathParam(value="clusterId") String clusterId, @QueryParam(value="end") Long end) {
        Integer topicCount = null;
        try {
            topicCount = this.kafkaMetadataDao.getTopicNamesFromMetadataOrCache(clusterId).size();
        }
        catch (Exception e) {
            log.debug("unable to query metadata for cluster {}", (Object)clusterId, (Object)e);
        }
        Interval window = MetricsResource.mostRecent(end, this.maxTime(clusterId, System.currentTimeMillis()));
        long asOf = window.getEndMillis();
        Integer brokers = this.currentBrokerCount(window, clusterId);
        TopicPartitionStatus partitions = null;
        if (brokers != null) {
            partitions = this.topicPartitionStatus(asOf, clusterId, brokers);
        }

        Map<String, Object> result = Maps.newLinkedHashMap();
        result.put("topicCount", topicCount);
        if (partitions == null) {
            return result;
        }
        result.put("onlinePartitionCount", partitions.getOnlinePartitions());
        result.put("underReplicatedPartitionCount", partitions.getUnderReplicatedPartitions());
        result.put("offlinePartitionCount", partitions.getOfflinePartitions());
        PartitionReplicaStatus replicas = PartitionReplicaStatus.fromMetrics(partitions.getOnlinePartitions(), this.currentValue(asOf, clusterId, "UnderReplicated"), this.currentValue(asOf, clusterId, "InSyncReplicasCount"), this.currentValue(asOf, clusterId, "ReplicasCount"));
        if (replicas != null) {
            result.put("inSyncReplicas", replicas.getInSyncReplicas());
            result.put("outOfSyncReplicas", replicas.getOutOfSyncReplicas());
        }
        return result;
    }

    /**
     * Time series of in-sync and out-of-sync replica counts for the cluster.
     *
     * <p>Six cluster-level series are merged bucket by bucket. A bucket produces a value only
     * when a broker count is present and {@code TopicPartitionStatus.fromMetrics} yields a
     * status; otherwise both output fields are null for that bucket.
     *
     * @param clusterId cluster to report on
     * @param range requested time range; resolved by {@code defaultInterval}
     * @return merged series with keys {@code inSyncReplicas} and {@code outOfSyncReplicas}
     */
    @Path(value="/topic/replicas")
    @GET
    public List<Map<String, Object>> topicReplicas(@PathParam(value="clusterId") String clusterId, @Valid @BeanParam RangeParam range) {
        Interval window = MetricsResource.defaultInterval(range, this.maxTime(clusterId, System.currentTimeMillis()));
        List<KeyValue<Long, MetricValues>> leaderSeries = this.timeSeries(window, "LeaderCount", MetricsAggregation.CLUSTER, clusterId);
        List<KeyValue<Long, MetricValues>> offlineSeries = this.timeSeries(window, "OfflinePartitionsCount", MetricsAggregation.CLUSTER, clusterId);
        List<KeyValue<Long, MetricValues>> underReplicatedSeries = this.timeSeries(window, "UnderReplicatedPartitions", MetricsAggregation.CLUSTER, clusterId);
        List<KeyValue<Long, MetricValues>> inSyncSeries = this.timeSeries(window, "InSyncReplicasCount", MetricsAggregation.CLUSTER, clusterId);
        List<KeyValue<Long, MetricValues>> replicaSeries = this.timeSeries(window, "ReplicasCount", MetricsAggregation.CLUSTER, clusterId);
        List<KeyValue<Long, MetricValues>> brokerSeries = this.brokerCountSeries(window, clusterId);
        // Positions in the merged bucket: 0 brokers, 1 leaders, 2 under-replicated, 3 offline, 4 in-sync, 5 replicas.
        List replicaStatus = TimeseriesUtils.mergeTimeseries(ImmutableList.of(brokerSeries, leaderSeries, underReplicatedSeries, offlineSeries, inSyncSeries, replicaSeries), new Function<List<MetricValues>, PartitionReplicaStatus>(){

            public PartitionReplicaStatus apply(List<MetricValues> bucket) {
                MetricValues brokers = bucket.get(0);
                if (brokers == null) {
                    return null;
                }
                MetricValues leaders = bucket.get(1);
                MetricValues underReplicated = bucket.get(2);
                MetricValues offline = bucket.get(3);
                MetricValues inSync = bucket.get(4);
                MetricValues replicas = bucket.get(5);
                TopicPartitionStatus partitions = TopicPartitionStatus.fromMetrics((int)brokers.max(), leaders, underReplicated, offline);
                if (partitions == null) {
                    return null;
                }
                return PartitionReplicaStatus.fromMetrics(partitions.getOnlinePartitions(), underReplicated, inSync, replicas);
            }
        });
        Function<PartitionReplicaStatus, Integer> inSyncOf = new Function<PartitionReplicaStatus, Integer>(){

            public Integer apply(PartitionReplicaStatus status) {
                if (status == null) {
                    return null;
                }
                return Integer.valueOf(status.getInSyncReplicas());
            }
        };
        Function<PartitionReplicaStatus, Integer> outOfSyncOf = new Function<PartitionReplicaStatus, Integer>(){

            public Integer apply(PartitionReplicaStatus status) {
                if (status == null) {
                    return null;
                }
                return Integer.valueOf(status.getOutOfSyncReplicas());
            }
        };
        return TimeseriesUtils.mergeTimeseriesAsMap("inSyncReplicas", TimeseriesUtils.map(replicaStatus, inSyncOf), "outOfSyncReplicas", TimeseriesUtils.map(replicaStatus, outOfSyncOf));
    }

    /**
     * Latest per-topic detail row for every topic known to the cluster metadata.
     *
     * <p>A metadata failure is logged at debug level and results in an empty topic list.
     * Each row is the flattened last entry of that topic's detail series with a
     * {@code "topic"} field added. The returned list is a lazy transforming view.
     *
     * @param clusterId cluster to report on
     * @param range request range; only its {@code end} is used here
     * @return one map per topic
     */
    @Path(value="/topic/detail")
    @GET
    public List<Map<String, Object>> topicDetail(@PathParam(value="clusterId") String clusterId, @Valid @BeanParam RangeParam range) {
        Object topicNames = ImmutableList.of();
        try {
            topicNames = this.kafkaMetadataDao.getTopicNamesFromMetadataOrCache(clusterId);
        }
        catch (Exception e) {
            log.debug("unable to query metadata for cluster {}", (Object)clusterId, (Object)e);
        }
        long latest = this.maxTime(clusterId, System.currentTimeMillis());
        Interval window = MetricsResource.mostRecent(range.end, latest);
        final Map<String, List<Map<String, Object>>> detailByTopic = this.topicDetailData(clusterId, (Iterable<String>)topicNames, window);
        Function toRow = new Function<String, Map<String, Object>>(){

            public Map<String, Object> apply(String topic) {
                Map<String, Object> row = TimeseriesUtils.flattenLast((List)detailByTopic.get(topic));
                row.put("topic", topic);
                return row;
            }
        };
        return Lists.transform((List)topicNames, toRow);
    }

    /**
     * Per-topic detail time series over the requested range.
     *
     * <p>A metadata failure is logged at debug level and results in an empty response
     * (deliberate best effort).
     *
     * @param clusterId cluster to report on
     * @param range requested time range; resolved by {@code defaultInterval}
     * @return one {@code {"topic": name, "value": series}} map per topic
     */
    @Path(value="/topic/trend")
    @GET
    public List<Map<String, Object>> topicTrend(@PathParam(value="clusterId") String clusterId, @Valid @BeanParam RangeParam range) {
        Interval interval = MetricsResource.defaultInterval(range, this.maxTime(clusterId, System.currentTimeMillis()));
        Object topics = ImmutableList.of();
        try {
            topics = this.kafkaMetadataDao.getTopicNamesFromMetadataOrCache(clusterId);
        }
        catch (Exception e) {
            log.debug("unable to query metadata for cluster {}", (Object)clusterId, (Object)e);
        }
        final Map<String, List<Map<String, Object>>> res = this.topicDetailData(clusterId, (Iterable<String>)topics, interval);
        // Typed transform (no raw Function cast) so toList() yields List<Map<String, Object>>.
        return FluentIterable.from(res.keySet()).transform(new Function<String, Map<String, Object>>(){

            public Map<String, Object> apply(String input) {
                // Explicit type arguments instead of (Object) casts, which would make the
                // map ImmutableMap<Object, Object> and break the declared return type.
                return ImmutableMap.<String, Object>of("topic", input, "value", res.get(input));
            }
        }).toList();
    }

    /**
     * Builds the detail time series for each topic over {@code interval}.
     *
     * <p>For every topic a set of series is read from the store and combined into named
     * output series, which are finally merged by {@code TimeseriesUtils.mergeTimeseriesAsMap}.
     * The output names are collected in a LinkedHashMap, so they reach the merge step in
     * insertion order.
     *
     * @param clusterId cluster the topics belong to
     * @param topics topic names to build series for
     * @param interval time range to read
     * @return map from topic name to its merged series
     */
    private Map<String, List<Map<String, Object>>> topicDetailData(final String clusterId, Iterable<String> topics, final Interval interval) {
        return FluentIterable.from(topics).toMap((Function)new Function<String, List<Map<String, Object>>>(){

            public List<Map<String, Object>> apply(String topic) {
                // Series keyed by (cluster, topic).
                List bytesIn = MetricsResource.this.timeSeries(interval, "BytesInPerSec", (ImmutableList<String>)MetricsAggregation.CLUSTER_TOPIC, new String[]{clusterId, topic});
                List bytesOut = MetricsResource.this.timeSeries(interval, "BytesOutPerSec", (ImmutableList<String>)MetricsAggregation.CLUSTER_TOPIC, new String[]{clusterId, topic});
                List replicasCount = MetricsResource.this.timeSeries(interval, "ReplicasCount", (ImmutableList<String>)MetricsAggregation.CLUSTER_TOPIC, new String[]{clusterId, topic});
                List underReplicated = MetricsResource.this.timeSeries(interval, "UnderReplicated", (ImmutableList<String>)MetricsAggregation.CLUSTER_TOPIC, new String[]{clusterId, topic});
                List inSyncReplicasCount = MetricsResource.this.timeSeries(interval, "InSyncReplicasCount", (ImmutableList<String>)MetricsAggregation.CLUSTER_TOPIC, new String[]{clusterId, topic});
                LinkedHashMap ts = Maps.newLinkedHashMap();
                ts.put("bytesInPerSec", TimeseriesUtils.map(bytesIn, MetricsResource.this.longPerSecond()));
                ts.put("bytesOutPerSec", TimeseriesUtils.map(bytesOut, MetricsResource.this.longPerSecond()));
                // Series keyed by (cluster, topic, leader flag) with the flag fixed to "true".
                List size = MetricsResource.this.timeSeries(interval, "Size", (ImmutableList<String>)MetricsAggregation.CLUSTER_TOPIC_LEADER, new String[]{clusterId, topic, TRUE});
                List numLogSegments = MetricsResource.this.timeSeries(interval, "NumLogSegments", (ImmutableList<String>)MetricsAggregation.CLUSTER_TOPIC_LEADER, new String[]{clusterId, topic, TRUE});
                List startOffset = MetricsResource.this.timeSeries(interval, "LogStartOffset", (ImmutableList<String>)MetricsAggregation.CLUSTER_TOPIC_LEADER, new String[]{clusterId, topic, TRUE});
                List endOffset = MetricsResource.this.timeSeries(interval, "LogEndOffset", (ImmutableList<String>)MetricsAggregation.CLUSTER_TOPIC_LEADER, new String[]{clusterId, topic, TRUE});
                // Start offset is the bucket minimum, end offset the bucket maximum.
                ts.put("startOffset", TimeseriesUtils.map(startOffset, F_MIN));
                ts.put("endOffset", TimeseriesUtils.map(endOffset, F_MAX));
                List byTopicLeaderCount = MetricsResource.this.timeSeries(interval, "byTopicLeaderCount", (ImmutableList<String>)MetricsAggregation.CLUSTER_TOPIC, new String[]{clusterId, topic});
                List<KeyValue<Long, Long>> byTopicBrokerCount = TimeseriesUtils.map(MetricsResource.this.brokerCountSeries(interval, "byTopicLeaderCount", clusterId, (ImmutableList<String>)MetricsAggregation.CLUSTER_TOPIC, new String[]{clusterId, topic}), F_MAX);
                // NOTE(review): meanTimesConstant presumably scales each bucket's mean by the
                // matching value of the second series -- confirm in TimeseriesUtils.
                List<KeyValue<Long, Long>> partitionLeaderCount = TimeseriesUtils.meanTimesConstant(byTopicLeaderCount, byTopicBrokerCount);
                List<KeyValue<Long, Long>> segmentSize = TimeseriesUtils.meanTimesConstant(size, partitionLeaderCount);
                List<KeyValue<Long, Long>> segmentCount = TimeseriesUtils.meanTimesConstant(numLogSegments, partitionLeaderCount);
                List replicationFactor = TimeseriesUtils.map(replicasCount, F_MAX);
                List<KeyValue<Long, Long>> partitionReplicas = TimeseriesUtils.meanTimesConstant(replicasCount, partitionLeaderCount);
                ts.put("replicationFactor", replicationFactor);
                ts.put("replicas", partitionReplicas);
                List<KeyValue<Long, Long>> inSyncReplicas = TimeseriesUtils.meanTimesConstant(inSyncReplicasCount, partitionLeaderCount);
                ts.put("inSyncReplicas", inSyncReplicas);
                // Out-of-sync = replicas - in-sync, or null when either side is missing.
                ts.put("outOfSyncReplicas", TimeseriesUtils.mergeTimeseries(ImmutableList.of(partitionReplicas, inSyncReplicas), new Function<List<Long>, Long>(){

                    public Long apply(List<Long> input) {
                        Long replicas = input.get(0);
                        Long inSyncReplicas = input.get(1);
                        return replicas != null && inSyncReplicas != null ? Long.valueOf(replicas - inSyncReplicas) : null;
                    }
                }));
                List<KeyValue<Long, Long>> underReplicatedPartitions = TimeseriesUtils.meanTimesConstant(underReplicated, partitionLeaderCount);
                ts.put("partitions", partitionLeaderCount);
                ts.put("segmentSize", segmentSize);
                ts.put("segmentCount", segmentCount);
                ts.put("underReplicatedPartitions", underReplicatedPartitions);
                return TimeseriesUtils.mergeTimeseriesAsMap(ts.keySet(), ts.values());
            }
        });
    }

    /**
     * Latest per-broker detail row for every broker seen in the metrics.
     *
     * <p>Rack ids come from cluster metadata: a broker known to metadata without a rack gets
     * a null {@code rackId}; a broker absent from metadata (including when the metadata
     * lookup fails, which is only logged at debug level) gets {@code "Unknown"}.
     * The returned list is a lazy transforming view.
     *
     * @param clusterId cluster to report on
     * @param range request range; only its {@code end} is used here
     * @return one map per broker
     */
    @Path(value="/broker/detail")
    @GET
    public List<Map<String, Object>> brokerDetail(@PathParam(value="clusterId") String clusterId, @Valid @BeanParam RangeParam range) {
        final Map<Integer, String> rackByBroker = Maps.newHashMap();
        try {
            Collection<Node> nodes = this.kafkaMetadataDao.getNodes(clusterId);
            if (nodes != null) {
                for (Node node : nodes) {
                    rackByBroker.put(node.id(), node.hasRack() ? node.rack() : null);
                }
            }
        }
        catch (Exception e) {
            log.debug("unable to query metadata for cluster {}", (Object)clusterId, (Object)e);
        }
        long latest = this.maxTime(clusterId, System.currentTimeMillis());
        Interval window = MetricsResource.mostRecent(range.end, latest);
        List<Integer> brokerIds = this.allBrokers(window, clusterId);
        final Map<Integer, List<Map<String, Object>>> detailByBroker = this.brokerDetailData(clusterId, brokerIds, window);
        Function toRow = new Function<Integer, Map<String, Object>>(){

            public Map<String, Object> apply(Integer brokerId) {
                Map<String, Object> row = TimeseriesUtils.flattenLast((List)detailByBroker.get(brokerId));
                row.put("brokerId", Integer.toString(brokerId));
                Object rack = "Unknown";
                if (rackByBroker.containsKey(brokerId)) {
                    rack = rackByBroker.get(brokerId);
                }
                row.put("rackId", rack);
                return row;
            }
        };
        return Lists.transform(brokerIds, toRow);
    }

    /**
     * Per-broker detail time series over the requested range.
     *
     * @param clusterId cluster to report on
     * @param range requested time range; resolved by {@code defaultInterval}
     * @return one {@code {"brokerId": id, "value": series}} map per broker
     */
    @Path(value="/broker/trend")
    @GET
    public List<Map<String, Object>> brokerTrend(@PathParam(value="clusterId") String clusterId, @Valid @BeanParam RangeParam range) {
        Interval interval = MetricsResource.defaultInterval(range, this.maxTime(clusterId, System.currentTimeMillis()));
        List<Integer> brokerIds = this.allBrokers(interval, clusterId);
        final Map<Integer, List<Map<String, Object>>> res = this.brokerDetailData(clusterId, brokerIds, interval);
        // Typed transform (no raw Function cast) so toList() yields List<Map<String, Object>>.
        return FluentIterable.from(res.keySet()).transform(new Function<Integer, Map<String, Object>>(){

            public Map<String, Object> apply(Integer input) {
                // Explicit type arguments instead of (Object) casts, which would make the
                // map ImmutableMap<Object, Object> and break the declared return type.
                return ImmutableMap.<String, Object>of("brokerId", input, "value", res.get(input));
            }
        }).toList();
    }

    /**
     * Builds the detail time series for each broker over {@code interval}.
     *
     * <p>For every broker: byte rates, segment size, mean partition count and, for each
     * {@code MetricsAggregation.Percentile}, the Produce ("In") and Fetch ("Out") total-time
     * latency. The named series are collected in a LinkedHashMap, so they reach the final
     * merge in insertion order.
     *
     * @param clusterId cluster the brokers belong to
     * @param brokerIds brokers to build series for
     * @param interval time range to read
     * @return map from broker id to its merged series
     */
    private Map<Integer, List<Map<String, Object>>> brokerDetailData(final String clusterId, List<Integer> brokerIds, final Interval interval) {
        final Function<MetricValues, Long> perSecond = this.longPerSecond();
        return FluentIterable.from(brokerIds).toMap((Function)new Function<Integer, List<Map<String, Object>>>(){

            public List<Map<String, Object>> apply(Integer nodeId) {
                String brokerId = nodeId.toString();
                // Byte rates use the (cluster, topic, broker) grouping with the topic slot left blank.
                List bytesIn = MetricsResource.this.timeSeries(interval, "BytesInPerSec", (ImmutableList<String>)MetricsAggregation.CLUSTER_TOPIC_BROKER, new String[]{clusterId, MetricsResource.EMPTY_STRING, brokerId});
                List bytesOut = MetricsResource.this.timeSeries(interval, "BytesOutPerSec", (ImmutableList<String>)MetricsAggregation.CLUSTER_TOPIC_BROKER, new String[]{clusterId, MetricsResource.EMPTY_STRING, brokerId});
                List partitionCount = MetricsResource.this.timeSeries(interval, "PartitionCount", (ImmutableList<String>)MetricsAggregation.CLUSTER_BROKER, new String[]{clusterId, brokerId});
                List size = MetricsResource.this.timeSeries(interval, "Size", (ImmutableList<String>)MetricsAggregation.CLUSTER_BROKER, new String[]{clusterId, brokerId});
                // Segment size per bucket, or null where no BrokerSize could be derived.
                List<KeyValue<Long, Long>> segmentSize = TimeseriesUtils.map(MetricsResource.brokerSize(nodeId, partitionCount, size), new Function<BrokerSize, Long>(){

                    public Long apply(BrokerSize input) {
                        return input != null ? Long.valueOf(input.getSegmentSize()) : null;
                    }
                });
                LinkedHashMap ts = Maps.newLinkedHashMap();
                ts.put("bytesInPerSec", TimeseriesUtils.map(bytesIn, perSecond));
                ts.put("bytesOutPerSec", TimeseriesUtils.map(bytesOut, perSecond));
                ts.put("segmentSize", segmentSize);
                ts.put("partitionReplicas", TimeseriesUtils.map(partitionCount, F_MEAN));
                // One In/Out latency pair per percentile, named by the percentile's display value.
                for (MetricsAggregation.Percentile p : MetricsAggregation.Percentile.values()) {
                    String latencyMetric = MetricsAggregation.latencyMetric("TotalTimeMs", p);
                    List latencyIn = MetricsResource.this.timeSeries(interval, latencyMetric, (ImmutableList<String>)MetricsAggregation.CLUSTER_BROKER_REQUEST, new String[]{clusterId, brokerId, "Produce"});
                    List latencyOut = MetricsResource.this.timeSeries(interval, latencyMetric, (ImmutableList<String>)MetricsAggregation.CLUSTER_BROKER_REQUEST, new String[]{clusterId, brokerId, "Fetch"});
                    ts.put("requestLatencyInP" + p.getDisplayValue(), TimeseriesUtils.map(latencyIn, F_MAX));
                    ts.put("requestLatencyOutP" + p.getDisplayValue(), TimeseriesUtils.map(latencyOut, F_MAX));
                }
                return TimeseriesUtils.mergeTimeseriesAsMap(ts.keySet(), ts.values());
            }
        });
    }

    /**
     * Merges a broker's partition-count and size series bucket by bucket into
     * {@code BrokerSize} values via {@code BrokerSize.fromMetrics}.
     *
     * @param brokerId broker the two series belong to
     * @param partitionCount partition-count series
     * @param size size series
     * @return the merged series of broker sizes
     */
    private static List<KeyValue<Long, BrokerSize>> brokerSize(final Integer brokerId, List<KeyValue<Long, MetricValues>> partitionCount, List<KeyValue<Long, MetricValues>> size) {
        Function<List<MetricValues>, BrokerSize> toBrokerSize = new Function<List<MetricValues>, BrokerSize>(){

            public BrokerSize apply(List<MetricValues> bucket) {
                // Index 0 is the partition count, index 1 the size, matching the merge order below.
                MetricValues partitions = bucket.get(0);
                MetricValues bytes = bucket.get(1);
                return BrokerSize.fromMetrics(brokerId, partitions, bytes);
            }
        };
        return TimeseriesUtils.mergeTimeseries(ImmutableList.of(partitionCount, size), toBrokerSize);
    }

    /**
     * Per-broker Produce/Fetch total-time latency series at one percentile.
     *
     * @param clusterId cluster to report on
     * @param range requested time range; resolved by {@code defaultInterval}
     * @param p percentile display value (defaults to 99)
     * @return one {@code {"brokerId": id, "value": series}} map per broker, where the series
     *         carries {@code requestLatencyIn} (Produce) and {@code requestLatencyOut} (Fetch)
     * @throws BadRequestException when {@code p} is rejected by
     *         {@code MetricsAggregation.Percentile.fromDisplayValue}
     */
    @Path(value="/broker/request/latency")
    @GET
    public List<Map<String, Object>> requestLatency(final @PathParam(value="clusterId") String clusterId, @Valid @BeanParam RangeParam range, @DefaultValue(value="99") @QueryParam(value="p") int p) {
        // Assigned exactly once and captured by the anonymous class below, hence final.
        final MetricsAggregation.Percentile percentile;
        try {
            percentile = MetricsAggregation.Percentile.fromDisplayValue(p);
        }
        catch (IllegalArgumentException e) {
            // Keep the original exception as the cause so the stack trace is not lost.
            throw new BadRequestException(e.getMessage(), e);
        }
        final Interval interval = MetricsResource.defaultInterval(range, this.maxTime(clusterId, System.currentTimeMillis()));
        List<Integer> nodes = this.allBrokers(interval, clusterId);
        // Typed transform (no raw Function cast) so toList() yields List<Map<String, Object>>.
        return FluentIterable.from(nodes).transform(new Function<Integer, Map<String, Object>>(){

            public Map<String, Object> apply(Integer brokerId) {
                String idString = brokerId.toString();
                String latencyMetric = MetricsAggregation.latencyMetric("TotalTimeMs", percentile);
                List in = MetricsResource.this.timeSeries(interval, latencyMetric, (ImmutableList<String>)MetricsAggregation.CLUSTER_BROKER_REQUEST, new String[]{clusterId, idString, "Produce"});
                List out = MetricsResource.this.timeSeries(interval, latencyMetric, (ImmutableList<String>)MetricsAggregation.CLUSTER_BROKER_REQUEST, new String[]{clusterId, idString, "Fetch"});
                // Explicit type arguments instead of (Object) casts, which would make the
                // map ImmutableMap<Object, Object> and break the declared return type.
                return ImmutableMap.<String, Object>of("brokerId", brokerId, "value", TimeseriesUtils.mergeTimeseriesAsMap("requestLatencyIn", TimeseriesUtils.map(in, F_MAX), "requestLatencyOut", TimeseriesUtils.map(out, F_MAX)));
            }
        }).toList();
    }

    /**
     * Latest request life-cycle latencies for one broker, broken down by percentile.
     *
     * <p>For each percentile and each life-cycle stage the Produce ("In") and Fetch ("Out")
     * series are read, reduced to per-bucket maxima, merged, and flattened to their last
     * entry.
     *
     * @param clusterId cluster the broker belongs to
     * @param brokerId broker to report on
     * @param end optional upper time bound supplied by the caller
     * @return map from percentile display value (as a string) to a map of
     *         {@code <stage>In} / {@code <stage>Out} values
     */
    @Path(value="/broker/{brokerId}/request/lifecycle")
    @GET
    public Map<String, Map<String, Object>> brokerRequestLifeCycle(@PathParam(value="clusterId") String clusterId, @PathParam(value="brokerId") String brokerId, @QueryParam(value="end") Long end) {
        Interval interval = MetricsResource.mostRecent(end, this.maxTime(clusterId, System.currentTimeMillis()));
        // Output field prefix -> underlying timing metric. Typed so the entries can be
        // iterated as Map.Entry<String, String> without raw types or casts.
        ImmutableMap<String, String> metrics = ImmutableMap.<String, String>builder()
                .put("requestQueue", "RequestQueueTimeMs")
                .put("requestLocal", "LocalTimeMs")
                .put("responseRemote", "RemoteTimeMs")
                .put("responseQueue", "ResponseQueueTimeMs")
                .put("responseSend", "ResponseSendTimeMs")
                .put("requestLatency", "TotalTimeMs")
                .build();
        Map<String, Map<String, Object>> res = Maps.newHashMap();
        for (MetricsAggregation.Percentile percentile : MetricsAggregation.Percentile.values()) {
            // Left raw on purpose: it is handed to TimeseriesUtils helpers whose generic
            // signatures are not visible here.
            LinkedHashMap latencies = Maps.newLinkedHashMap();
            for (Map.Entry<String, String> entry : metrics.entrySet()) {
                String field = entry.getKey();
                String latencyMetric = MetricsAggregation.latencyMetric(entry.getValue(), percentile);
                List in = this.timeSeries(interval, latencyMetric, MetricsAggregation.CLUSTER_BROKER_REQUEST, clusterId, brokerId, "Produce");
                List out = this.timeSeries(interval, latencyMetric, MetricsAggregation.CLUSTER_BROKER_REQUEST, clusterId, brokerId, "Fetch");
                latencies.put(field + "In", TimeseriesUtils.map(in, F_MAX));
                latencies.put(field + "Out", TimeseriesUtils.map(out, F_MAX));
            }
            res.put(Integer.toString(percentile.getDisplayValue()), TimeseriesUtils.flattenLast(TimeseriesUtils.mergeTimeseriesAsMap(latencies.keySet(), latencies.values())));
        }
        return res;
    }

    /**
     * Request and byte-rate series for the cluster, read from the {@code CLUSTER_TOPIC} grouping
     * with {@code EMPTY_STRING} as the second grouping value.
     *
     * @param clusterId cluster to report on
     * @param range optional start/end bounds
     * @return merged series as produced by {@code requestStats}
     */
    @Path(value="/broker/requests")
    @GET
    public List<Map<String, Object>> brokerRequestTotals(@PathParam(value="clusterId") String clusterId, @Valid @BeanParam RangeParam range) {
        final ImmutableList<String> groupingSet = MetricsAggregation.CLUSTER_TOPIC;
        return this.requestStats(range, groupingSet, clusterId, EMPTY_STRING);
    }

    /**
     * Request and byte-rate series for a single broker, read from the
     * {@code CLUSTER_TOPIC_BROKER} grouping with {@code EMPTY_STRING} as the second grouping value.
     *
     * @param clusterId cluster the broker belongs to
     * @param brokerId broker to report on
     * @param range optional start/end bounds
     * @return merged series as produced by {@code requestStats}
     */
    @Path(value="/broker/{brokerId}/requests")
    @GET
    public List<Map<String, Object>> brokerRequests(@PathParam(value="clusterId") String clusterId, @PathParam(value="brokerId") String brokerId, @Valid @BeanParam RangeParam range) {
        final ImmutableList<String> groupingSet = MetricsAggregation.CLUSTER_TOPIC_BROKER;
        return this.requestStats(range, groupingSet, clusterId, EMPTY_STRING, brokerId);
    }

    /**
     * Request and byte-rate series for a single topic, read from the {@code CLUSTER_TOPIC} grouping.
     *
     * @param clusterId cluster the topic belongs to
     * @param topic topic to report on
     * @param range optional start/end bounds
     * @return merged series as produced by {@code requestStats}
     */
    @Path(value="/topic/{topic}/requests")
    @GET
    public List<Map<String, Object>> topicRequests(@PathParam(value="clusterId") String clusterId, @PathParam(value="topic") String topic, @Valid @BeanParam RangeParam range) {
        final ImmutableList<String> groupingSet = MetricsAggregation.CLUSTER_TOPIC;
        return this.requestStats(range, groupingSet, clusterId, topic);
    }

    /**
     * Request handler pool usage and queued request counts for a cluster.
     *
     * <p>{@code requestPoolUsage} is derived per timestamp from the
     * {@code RequestHandlerAvgIdlePercent} series and the broker count series; timestamps without a
     * broker count yield {@code null}. {@code queuedRequests} is {@code RequestQueueSize} mapped
     * through {@code F_SUM}.
     *
     * @param clusterId cluster to report on
     * @param range optional start/end bounds
     * @return merged series with keys {@code requestPoolUsage} and {@code queuedRequests}
     */
    @Path(value="/broker/request/pool")
    @GET
    public List<Map<String, Object>> requestPool(@PathParam(value="clusterId") String clusterId, @Valid @BeanParam RangeParam range) {
        Interval window = MetricsResource.defaultInterval(range, this.maxTime(clusterId, System.currentTimeMillis()));
        final long nanosPerBucket = this.bucketNanos();
        List<KeyValue<Long, MetricValues>> idleSeries = this.timeSeries(window, "RequestHandlerAvgIdlePercent", MetricsAggregation.CLUSTER, clusterId);
        List<KeyValue<Long, MetricValues>> brokerCounts = this.brokerCountSeries(window, clusterId);
        Function<List<MetricValues>, Number> toPoolUsage = new Function<List<MetricValues>, Number>(){

            @Override
            public Number apply(List<MetricValues> values) {
                // Index 0 is the idle series, index 1 the broker count, matching the list built below.
                MetricValues idle = values.get(0);
                MetricValues brokers = values.get(1);
                if (brokers == null) {
                    return null;
                }
                return MetricsResource.requestPoolUsage(idle, (int)brokers.max(), nanosPerBucket);
            }
        };
        List<KeyValue<Long, Number>> usage = TimeseriesUtils.mergeTimeseries(ImmutableList.of(idleSeries, brokerCounts), toPoolUsage);
        List<KeyValue<Long, Long>> queued = TimeseriesUtils.map(this.timeSeries(window, "RequestQueueSize", MetricsAggregation.CLUSTER, clusterId), F_SUM);
        return TimeseriesUtils.mergeTimeseriesAsMap("requestPoolUsage", usage, "queuedRequests", queued);
    }

    /**
     * Broker count over time for a cluster.
     *
     * @param clusterId cluster to report on
     * @param range optional start/end bounds
     * @return series with the single key {@code brokerCount}
     */
    @Path(value="/broker/count")
    @GET
    public List<Map<String, Object>> brokers(@PathParam(value="clusterId") String clusterId, @Valid @BeanParam RangeParam range) {
        Interval window = MetricsResource.defaultInterval(range, this.maxTime(clusterId, System.currentTimeMillis()));
        List<KeyValue<Long, Long>> counts = TimeseriesUtils.map(this.brokerCountSeries(window, clusterId), F_MAX);
        return TimeseriesUtils.mergeTimeseriesAsMap("brokerCount", counts);
    }

    /**
     * Network processor pool usage over time for a cluster.
     *
     * <p>Collects the {@code NetworkProcessorAvgIdlePercent} series of every broker that has data,
     * merges them by timestamp, and reduces each timestamp's per-broker values with
     * {@code MetricsResource.networkPoolUsage}.
     *
     * @param clusterId cluster to report on
     * @param range optional start/end bounds
     * @return series with the single key {@code networkPoolUsage}
     */
    @Path(value="/broker/network/pool")
    @GET
    public List<Map<String, Object>> networkPool(@PathParam(value="clusterId") String clusterId, @Valid @BeanParam RangeParam range) {
        Interval window = MetricsResource.defaultInterval(range, this.maxTime(clusterId, System.currentTimeMillis()));
        List<Integer> brokerIds = this.allBrokers(window, clusterId);
        LinkedHashMap idleByBroker = Maps.newLinkedHashMap();
        for (Integer id : brokerIds) {
            List<KeyValue<Long, MetricValues>> idleSeries = this.timeSeries(window, "NetworkProcessorAvgIdlePercent", MetricsAggregation.CLUSTER_BROKER, clusterId, Integer.toString(id));
            // Brokers without any samples are left out of the merge.
            if (idleSeries != null && !idleSeries.isEmpty()) {
                idleByBroker.put(id, idleSeries);
            }
        }
        List merged = TimeseriesUtils.mergeTimeseries(idleByBroker.keySet(), idleByBroker.values());
        Function<Map<Integer, MetricValues>, Double> toUsage = new Function<Map<Integer, MetricValues>, Double>(){

            @Override
            public Double apply(Map<Integer, MetricValues> input) {
                if (input == null) {
                    return null;
                }
                return Double.valueOf(MetricsResource.networkPoolUsage(input.values()));
            }
        };
        List<KeyValue<Long, Double>> networkPoolUsage = TimeseriesUtils.map(merged, toUsage);
        return TimeseriesUtils.mergeTimeseriesAsMap("networkPoolUsage", networkPoolUsage);
    }

    /**
     * Ids of the brokers reporting an active controller, per timestamp.
     *
     * <p>A broker is listed for a timestamp when its {@code ActiveControllerCount} sample is present
     * and has a maximum greater than zero.
     *
     * @param clusterId cluster to report on
     * @param range optional start/end bounds
     * @return series with the single key {@code activeControllers}
     */
    @Path(value="/broker/activecontroller")
    @GET
    public List<Map<String, Object>> activeController(@PathParam(value="clusterId") String clusterId, @Valid @BeanParam RangeParam range) {
        Interval window = MetricsResource.defaultInterval(range, this.maxTime(clusterId, System.currentTimeMillis()));
        LinkedHashMap<Integer, List<KeyValue<Long, MetricValues>>> perBroker = this.brokerMetric(window, "ActiveControllerCount", clusterId);
        // Positional lookup: entry i of each merged row belongs to orderedIds.get(i).
        final List<Integer> orderedIds = Lists.newArrayList(perBroker.keySet());
        Function<List<MetricValues>, List<Integer>> pickActive = new Function<List<MetricValues>, List<Integer>>(){

            @Override
            public List<Integer> apply(List<MetricValues> input) {
                List<Integer> active = Lists.newArrayList();
                int position = -1;
                for (MetricValues sample : input) {
                    ++position;
                    if (sample == null || sample.max() <= 0L) {
                        continue;
                    }
                    active.add(orderedIds.get(position));
                }
                return active;
            }
        };
        List<KeyValue<Long, List<Integer>>> activeBrokerIds = TimeseriesUtils.mergeTimeseries(perBroker.values(), pickActive);
        return TimeseriesUtils.mergeTimeseriesAsMap("activeControllers", activeBrokerIds);
    }

    /**
     * Cluster-level controller statistics over time.
     *
     * <p>{@code activeControllerCount} combines the cluster {@code ActiveControllerCount} series with
     * the broker count via {@code TimeseriesUtils.meanTimesConstant};
     * {@code uncleanLeaderElectionCount} is {@code UncleanLeaderElectionsPerSec} mapped through
     * {@code F_SUM}.
     *
     * @param clusterId cluster to report on
     * @param range optional start/end bounds
     * @return merged series with the two keys above
     */
    @Path(value="/broker/controller")
    @GET
    public List<Map<String, Object>> controller(@PathParam(value="clusterId") String clusterId, @Valid @BeanParam RangeParam range) {
        Interval window = MetricsResource.defaultInterval(range, this.maxTime(clusterId, System.currentTimeMillis()));
        List<KeyValue<Long, Long>> brokerCounts = TimeseriesUtils.map(this.brokerCountSeries(window, clusterId), F_MAX);
        List<KeyValue<Long, MetricValues>> controllerSamples = this.timeSeries(window, "ActiveControllerCount", MetricsAggregation.CLUSTER, clusterId);
        List electionSamples = this.timeSeries(window, "UncleanLeaderElectionsPerSec", MetricsAggregation.CLUSTER, clusterId);
        return TimeseriesUtils.mergeTimeseriesAsMap(
            "activeControllerCount", TimeseriesUtils.meanTimesConstant(controllerSamples, brokerCounts),
            "uncleanLeaderElectionCount", TimeseriesUtils.map(electionSamples, F_SUM));
    }

    /**
     * Online, under-replicated and offline partition counts over time for a cluster.
     *
     * <p>Each timestamp's {@code LeaderCount}, {@code UnderReplicatedPartitions} and
     * {@code OfflinePartitionsCount} samples are combined with the broker count through
     * {@code TopicPartitionStatus.fromMetrics}. Timestamps with no broker count, or for which
     * {@code fromMetrics} returns {@code null}, produce {@code null} values.
     *
     * @param clusterId cluster to report on
     * @param range optional start/end bounds
     * @return merged series with keys {@code onlinePartitions}, {@code underReplicatedPartitions}
     *     and {@code offlinePartitions}
     */
    @Path(value="/broker/partitions")
    @GET
    public List<Map<String, Object>> partitionStats(@PathParam(value="clusterId") String clusterId, @Valid @BeanParam RangeParam range) {
        Interval window = MetricsResource.defaultInterval(range, this.maxTime(clusterId, System.currentTimeMillis()));
        List<KeyValue<Long, MetricValues>> leaders = this.timeSeries(window, "LeaderCount", MetricsAggregation.CLUSTER, clusterId);
        List<KeyValue<Long, MetricValues>> offline = this.timeSeries(window, "OfflinePartitionsCount", MetricsAggregation.CLUSTER, clusterId);
        List<KeyValue<Long, MetricValues>> underReplicated = this.timeSeries(window, "UnderReplicatedPartitions", MetricsAggregation.CLUSTER, clusterId);
        List<KeyValue<Long, MetricValues>> brokerCounts = this.brokerCountSeries(window, clusterId);
        Function<List<MetricValues>, TopicPartitionStatus> toStatus = new Function<List<MetricValues>, TopicPartitionStatus>(){

            @Override
            public TopicPartitionStatus apply(List<MetricValues> input) {
                // Positions follow the order of the list handed to mergeTimeseries below.
                MetricValues brokers = input.get(0);
                if (brokers == null) {
                    return null;
                }
                MetricValues leaderSample = input.get(1);
                MetricValues underReplicatedSample = input.get(2);
                MetricValues offlineSample = input.get(3);
                return TopicPartitionStatus.fromMetrics((int)brokers.max(), leaderSample, underReplicatedSample, offlineSample);
            }
        };
        List partitionStatus = TimeseriesUtils.mergeTimeseries(ImmutableList.of(brokerCounts, leaders, underReplicated, offline), toStatus);
        Function<TopicPartitionStatus, Integer> onlineOf = new Function<TopicPartitionStatus, Integer>(){

            @Override
            public Integer apply(TopicPartitionStatus input) {
                if (input == null) {
                    return null;
                }
                return Integer.valueOf(input.getOnlinePartitions());
            }
        };
        Function<TopicPartitionStatus, Integer> underReplicatedOf = new Function<TopicPartitionStatus, Integer>(){

            @Override
            public Integer apply(TopicPartitionStatus input) {
                if (input == null) {
                    return null;
                }
                return Integer.valueOf(input.getUnderReplicatedPartitions());
            }
        };
        Function<TopicPartitionStatus, Integer> offlineOf = new Function<TopicPartitionStatus, Integer>(){

            @Override
            public Integer apply(TopicPartitionStatus input) {
                if (input == null) {
                    return null;
                }
                return Integer.valueOf(input.getOfflinePartitions());
            }
        };
        return TimeseriesUtils.mergeTimeseriesAsMap(
            "onlinePartitions", TimeseriesUtils.map(partitionStatus, onlineOf),
            "underReplicatedPartitions", TimeseriesUtils.map(partitionStatus, underReplicatedOf),
            "offlinePartitions", TimeseriesUtils.map(partitionStatus, offlineOf));
    }

    /**
     * Builds the merged request/byte-rate series shared by the {@code .../requests} endpoints.
     *
     * <p>Request counters are mapped through {@code F_SUM}; byte counters through
     * {@code longPerSecond()}.
     *
     * @param range optional start/end bounds
     * @param aggs grouping set to query
     * @param fields grouping values matching {@code aggs}; {@code fields[0]} must be the cluster id
     * @return merged series with keys {@code requestsIn}, {@code requestsOut},
     *     {@code requestsFailedIn}, {@code requestsFailedOut}, {@code bytesInPerSec} and
     *     {@code bytesOutPerSec}
     */
    private List<Map<String, Object>> requestStats(RangeParam range, ImmutableList<String> aggs, String ... fields) {
        String clusterId = fields[0];
        Interval window = MetricsResource.defaultInterval(range, this.maxTime(clusterId, System.currentTimeMillis()));
        List produceTotal = this.timeSeries(window, "TotalProduceRequestsPerSec", aggs, fields);
        List fetchTotal = this.timeSeries(window, "TotalFetchRequestsPerSec", aggs, fields);
        List produceFailed = this.timeSeries(window, "FailedProduceRequestsPerSec", aggs, fields);
        List fetchFailed = this.timeSeries(window, "FailedFetchRequestsPerSec", aggs, fields);
        List inboundBytes = this.timeSeries(window, "BytesInPerSec", aggs, fields);
        List outboundBytes = this.timeSeries(window, "BytesOutPerSec", aggs, fields);
        Function<MetricValues, Long> toRate = this.longPerSecond();
        return TimeseriesUtils.mergeTimeseriesAsMap(
            "requestsIn", TimeseriesUtils.map(produceTotal, F_SUM),
            "requestsOut", TimeseriesUtils.map(fetchTotal, F_SUM),
            "requestsFailedIn", TimeseriesUtils.map(produceFailed, F_SUM),
            "requestsFailedOut", TimeseriesUtils.map(fetchFailed, F_SUM),
            "bytesInPerSec", TimeseriesUtils.map(inboundBytes, toRate),
            "bytesOutPerSec", TimeseriesUtils.map(outboundBytes, toRate));
    }

    /**
     * Returns the aggregation rollup size converted from milliseconds to nanoseconds.
     */
    private long bucketNanos() {
        long rollupMillis = this.metricsAggregation.rollup().size();
        return TimeUnit.MILLISECONDS.toNanos(rollupMillis);
    }

    /**
     * Returns a function that divides a sample's sum by the metrics window size in seconds,
     * using floating-point division. A {@code null} sample maps to {@code null}.
     */
    private Function<MetricValues, Double> perSecond() {
        final long windowSeconds = MetricsAggregation.metricsWindowSizeInSeconds();
        return new Function<MetricValues, Double>(){

            @Override
            public Double apply(MetricValues input) {
                if (input == null) {
                    return null;
                }
                return Double.valueOf((double)input.sum() / (double)windowSeconds);
            }
        };
    }

    /**
     * Returns a function that divides a sample's sum by the metrics window size in seconds,
     * without converting to floating point. A {@code null} sample maps to {@code null}.
     */
    private Function<MetricValues, Long> longPerSecond() {
        final long windowSeconds = MetricsAggregation.metricsWindowSizeInSeconds();
        return new Function<MetricValues, Long>(){

            @Override
            public Long apply(MetricValues input) {
                if (input == null) {
                    return null;
                }
                return Long.valueOf(input.sum() / windowSeconds);
            }
        };
    }

    /**
     * Builds the partition status from the current {@code LeaderCount},
     * {@code UnderReplicatedPartitions} and {@code OfflinePartitionsCount} values of a cluster.
     *
     * @return the status, or {@code null} when {@code TopicPartitionStatus.fromMetrics} yields none
     */
    private TopicPartitionStatus topicPartitionStatus(long timestamp, String clusterId, int brokerCount) {
        MetricValues leaders = this.currentValue(timestamp, clusterId, "LeaderCount");
        MetricValues underReplicated = this.currentValue(timestamp, clusterId, "UnderReplicatedPartitions");
        MetricValues offline = this.currentValue(timestamp, clusterId, "OfflinePartitionsCount");
        return TopicPartitionStatus.fromMetrics(brokerCount, leaders, underReplicated, offline);
    }

    /**
     * Returns, in ascending order, the ids of the brokers that have {@code "timestamp"} metric
     * data in the given interval.
     */
    private List<Integer> allBrokers(Interval interval, String clusterId) {
        Collection<Integer> reportingIds = this.brokerMetric(interval, "timestamp", clusterId).keySet();
        Ordering<Integer> ascending = Ordering.natural();
        return ascending.sortedCopy(reportingIds);
    }

    /**
     * Returns the broker count of the last entry in the broker count series.
     *
     * @return the last count, or {@code null} when the series is empty
     */
    private Integer currentBrokerCount(Interval interval, String clusterId) {
        Function<KeyValue<Long, MetricValues>, Integer> toCount = new Function<KeyValue<Long, MetricValues>, Integer>(){

            @Override
            public Integer apply(KeyValue<Long, MetricValues> input) {
                return (int)input.value.max();
            }
        };
        Iterable<Integer> counts = Iterables.transform(this.brokerCountSeries(interval, clusterId), toCount);
        return Iterables.getLast(counts, null);
    }

    /**
     * Broker count series for a cluster, derived from the {@code "timestamp"} metric in the
     * {@code CLUSTER} grouping.
     */
    private List<KeyValue<Long, MetricValues>> brokerCountSeries(Interval interval, String clusterId) {
        final String presenceMetric = "timestamp";
        return this.brokerCountSeries(interval, presenceMetric, clusterId, MetricsAggregation.CLUSTER, clusterId);
    }

    /**
     * Counts, per timestamp, how many brokers reported at least one sample of {@code metric}.
     *
     * <p>The per-broker series from {@code brokerMetric} are merged by timestamp; a broker counts
     * for a timestamp when its sample is non-null and has {@code count() > 0}. Each count is
     * wrapped with {@code TimeseriesUtils.singleMetricValue}.
     */
    private List<KeyValue<Long, MetricValues>> brokerCountSeries(Interval interval, String metric, String clusterId, ImmutableList<String> metricFields, String ... metricValues) {
        LinkedHashMap<Integer, List<KeyValue<Long, MetricValues>>> perBroker = this.brokerMetric(interval, metric, clusterId, metricFields, metricValues);
        List merged = TimeseriesUtils.mergeTimeseries(perBroker.keySet(), perBroker.values());
        Function<Map<Integer, MetricValues>, MetricValues> countReporting = new Function<Map<Integer, MetricValues>, MetricValues>(){

            @Override
            public MetricValues apply(Map<Integer, MetricValues> idToTimestamp) {
                int reporting = 0;
                for (MetricValues sample : idToTimestamp.values()) {
                    if (sample != null && sample.count() > 0L) {
                        ++reporting;
                    }
                }
                return TimeseriesUtils.singleMetricValue(reporting);
            }
        };
        return TimeseriesUtils.map(merged, countReporting);
    }

    /**
     * Returns the most recent per-broker size information at {@code timestamp}.
     *
     * <p>Only brokers that have both a {@code PartitionCount} and a {@code Size} series are
     * considered. The result is the value of the last merged entry, or an empty map when there
     * is none.
     */
    private Map<Integer, BrokerSize> brokerSizes(long timestamp, @PathParam(value="clusterId") String clusterId) {
        Interval window = MetricsResource.mostRecent(timestamp);
        LinkedHashMap<Integer, List<KeyValue<Long, MetricValues>>> partitionCounts = this.brokerMetric(window, "PartitionCount", clusterId);
        LinkedHashMap<Integer, List<KeyValue<Long, MetricValues>>> sizes = this.brokerMetric(window, "Size", clusterId);
        LinkedHashMap sizeSeriesByBroker = Maps.newLinkedHashMap();
        for (Map.Entry<Integer, List<KeyValue<Long, MetricValues>>> countEntry : partitionCounts.entrySet()) {
            Integer brokerId = countEntry.getKey();
            if (sizes.containsKey(brokerId)) {
                sizeSeriesByBroker.put(brokerId, MetricsResource.brokerSize(brokerId, countEntry.getValue(), sizes.get(brokerId)));
            }
        }
        KeyValue newest = (KeyValue)Iterables.getLast(TimeseriesUtils.mergeTimeseries(sizeSeriesByBroker.keySet(), sizeSeriesByBroker.values()), null);
        if (newest == null) {
            return ImmutableMap.of();
        }
        return (Map)newest.value;
    }

    /**
     * Per-broker series of {@code metric} for a cluster, using the {@code CLUSTER} grouping with
     * the cluster id as its only grouping value.
     */
    private LinkedHashMap<Integer, List<KeyValue<Long, MetricValues>>> brokerMetric(Interval interval, String metric, String clusterId) {
        final ImmutableList<String> clusterGrouping = MetricsAggregation.CLUSTER;
        return this.brokerMetric(interval, metric, clusterId, clusterGrouping, clusterId);
    }

    /**
     * Fetches one time series of {@code metric} per broker of a cluster.
     *
     * <p>Broker ids come from {@code kafkaMetadataDao}. Each broker is queried with the grouping
     * set {@code metricFields + "broker"} and the values {@code metricValues} followed by the
     * broker id. Brokers with an empty series are omitted.
     *
     * @param interval time range to query
     * @param metric metric name
     * @param clusterId cluster whose broker ids are looked up
     * @param metricFields grouping set without the trailing {@code "broker"} field
     * @param metricValues grouping values for {@code metricFields}
     * @return broker id to series, in the order the ids were returned; empty when the broker ids
     *     cannot be obtained
     */
    private LinkedHashMap<Integer, List<KeyValue<Long, MetricValues>>> brokerMetric(Interval interval, String metric, String clusterId, ImmutableList<String> metricFields, String ... metricValues) {
        LinkedHashMap<Integer, List<KeyValue<Long, MetricValues>>> ts = Maps.newLinkedHashMap();
        List<Integer> brokerIds = null;
        try {
            brokerIds = this.kafkaMetadataDao.getBrokerIdsFromCacheOrMetadata(clusterId);
        }
        catch (Exception e) {
            // Best effort: a failed metadata lookup yields an empty result rather than an error.
            log.debug("unable to query metadata for cluster {}", (Object)clusterId, (Object)e);
        }
        if (brokerIds == null) {
            return ts;
        }
        // The grouping set is the same for every broker, so build it once instead of per iteration.
        ImmutableList<String> perBrokerFields = ImmutableList.<String>builder().addAll(metricFields).add("broker").build();
        for (int id : brokerIds) {
            // Sized from metricFields so that the final slot always holds the broker id.
            String[] values = new String[metricFields.size() + 1];
            System.arraycopy(metricValues, 0, values, 0, metricValues.length);
            values[values.length - 1] = Integer.toString(id);
            List<KeyValue<Long, MetricValues>> timestamps = this.timeSeries(interval, metric, perBrokerFields, values);
            if (timestamps.isEmpty()) continue;
            ts.put(id, timestamps);
        }
        return ts;
    }

    /**
     * Returns the interval of length {@code FIVE_MINUTES_IN_MILLIS} that ends at {@code end}.
     */
    protected static Interval mostRecent(long end) {
        final long upperBound = end;
        return MetricsResource.mostRecent(Long.valueOf(end), upperBound);
    }

    /**
     * Returns the interval of length {@code FIVE_MINUTES_IN_MILLIS} ending at the smaller of
     * {@code end} and {@code maxTime}.
     *
     * @param end requested end, or {@code null} to use {@code maxTime}
     * @param maxTime upper limit for the end of the interval
     */
    protected static Interval mostRecent(Long end, long maxTime) {
        long endTime = maxTime;
        if (end != null) {
            endTime = Math.min(end, maxTime);
        }
        return new Interval(endTime - FIVE_MINUTES_IN_MILLIS, endTime);
    }

    /**
     * Resolves a possibly partial {@code range} into a concrete interval.
     *
     * <p>The end defaults to {@code maxTime}. The start defaults to
     * {@code ONE_HOUR_IN_MILLIS} before the end; an explicit start is capped at one unit before
     * the end.
     *
     * @param range requested bounds, either of which may be {@code null}
     * @param maxTime end to use when {@code range.end} is absent
     */
    protected static Interval defaultInterval(RangeParam range, long maxTime) {
        long endTime = maxTime;
        if (range.end != null) {
            endTime = range.end;
        }
        long startTime;
        if (range.start == null) {
            startTime = endTime - ONE_HOUR_IN_MILLIS;
        } else {
            startTime = Math.min(range.start, endTime - 1L);
        }
        return new Interval(startTime, endTime);
    }

    /**
     * Latest value of {@code metric} for one broker, read from the {@code CLUSTER_BROKER} grouping.
     */
    private MetricValues currentValue(long timestamp, String clusterId, String broker, String metric) {
        final ImmutableList<String> brokerGrouping = MetricsAggregation.CLUSTER_BROKER;
        return this.currentValue(timestamp, metric, brokerGrouping, clusterId, broker);
    }

    /**
     * Latest value of {@code metric} for a cluster, read from the {@code CLUSTER} grouping.
     */
    private MetricValues currentValue(long timestamp, String clusterId, String metric) {
        final ImmutableList<String> clusterGrouping = MetricsAggregation.CLUSTER;
        return this.currentValue(timestamp, metric, clusterGrouping, clusterId);
    }

    /**
     * Latest value of {@code metric} within the {@code mostRecent(timestamp)} window.
     */
    private MetricValues currentValue(long timestamp, String metric, ImmutableList<String> fields, String ... values) {
        return this.currentValue(MetricsResource.mostRecent(timestamp), metric, fields, values);
    }

    /**
     * Returns the last value of the series for {@code metric} in {@code interval}.
     *
     * @return the last value, or {@code null} when the series is empty
     */
    private MetricValues currentValue(Interval interval, String metric, ImmutableList<String> fields, String ... values) {
        List<MetricValues> samples = this.timeSeriesValues(interval, metric, fields, values);
        if (samples.isEmpty()) {
            return null;
        }
        return samples.get(samples.size() - 1);
    }

    /**
     * Returns the series for {@code metric} with each entry mapped through
     * {@code EXTRACT_METRICS}.
     */
    protected List<MetricValues> timeSeriesValues(Interval interval, String metric, ImmutableList<String> fields, String ... values) {
        List<MetricValues> extracted = this.timeSeries(interval, metric, EXTRACT_METRICS, fields, values);
        return extracted;
    }

    /**
     * Reads the series of {@code metric} for the given grouping set and values from the metrics
     * store.
     *
     * <p>At most 12000 entries are copied; the store iterator is closed before returning.
     *
     * @param interval time range to query
     * @param metric metric name
     * @param fields grouping set
     * @param values grouping values to match
     * @return an immutable snapshot of the matching entries
     */
    private List<KeyValue<Long, MetricValues>> timeSeries(Interval interval, String metric, ImmutableList<String> fields, String ... values) {
        final int maxEntries = 12000;
        GroupingSets.PartitionedGroupingSets.GroupedWindowStore store = (GroupingSets.PartitionedGroupingSets.GroupedWindowStore)this.metricsStore.get();
        try (WindowStoreIterator rows = store.forGroupingSet(metric, (Collection<String>)fields).inRange(interval.getStartMillis(), interval.getEndMillis()).matching(values)) {
            ImmutableList snapshot = ImmutableList.copyOf(TimeseriesUtils.nullToEmpty(Iterators.limit(rows, maxEntries)));
            return snapshot;
        }
    }

    /**
     * Reads the series of {@code metric} from the metrics store and maps every entry through
     * {@code f}.
     *
     * <p>At most 12000 entries are read; the store iterator is closed before returning.
     *
     * @param interval time range to query
     * @param metric metric name
     * @param f mapping applied to each entry
     * @param fields grouping set
     * @param values grouping values to match
     * @return an immutable snapshot of the mapped entries
     */
    private <T> List<T> timeSeries(Interval interval, String metric, Function<KeyValue<Long, MetricValues>, ? extends T> f, ImmutableList<String> fields, String ... values) {
        final int maxEntries = 12000;
        GroupingSets.PartitionedGroupingSets.GroupedWindowStore store = (GroupingSets.PartitionedGroupingSets.GroupedWindowStore)this.metricsStore.get();
        try (WindowStoreIterator rows = store.forGroupingSet(metric, (Collection<String>)fields).inRange(interval.getStartMillis(), interval.getEndMillis()).matching(values)) {
            ImmutableList snapshot = ImmutableList.copyOf((Iterator)Iterators.transform(TimeseriesUtils.nullToEmpty(Iterators.limit(rows, maxEntries)), f));
            return snapshot;
        }
    }

    /**
     * Optional {@code start}/{@code end} query parameters bounding a time range.
     * Either bound may be {@code null}.
     */
    @ValidRange(message="start must be earlier than end")
    static class RangeParam {
        /** Value of the {@code start} query parameter; may be {@code null}. */
        final Long start;
        /** Value of the {@code end} query parameter; may be {@code null}. */
        final Long end;

        public RangeParam(@QueryParam(value="start") Long start, @QueryParam(value="end") Long end) {
            this.end = end;
            this.start = start;
        }
    }

    /**
     * Bean Validation constraint checked by {@link TimeRangeValidator}.
     */
    @Target({ElementType.METHOD, ElementType.FIELD, ElementType.PARAMETER, ElementType.TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @Constraint(validatedBy=TimeRangeValidator.class)
    @interface ValidRange {
        /** Violation message, or a message key to be resolved. */
        String message() default "{io.confluent.controlcenter.rest.MetricsResource.ValidRange}";

        /** Validation groups this constraint belongs to. */
        Class<?>[] groups() default {};

        /** Payload associated with the constraint. */
        Class<? extends Payload>[] payload() default {};
    }

    static class TimeRangeValidator
    implements ConstraintValidator<ValidRange, RangeParam> {
        TimeRangeValidator() {
        }

        public void initialize(ValidRange validRange) {
        }

        public boolean isValid(RangeParam rangeParam, ConstraintValidatorContext constraintValidatorContext) {
            return rangeParam.start == null || rangeParam.end == null || rangeParam.start <= rangeParam.end;
        }
    }

    /**
     * Immutable pairing of a broker id with its computed segment size.
     */
    public static class BrokerSize {
        private final int brokerId;
        private final long segmentSize;

        protected BrokerSize(int brokerId, long segmentSize) {
            this.brokerId = brokerId;
            this.segmentSize = segmentSize;
        }

        /**
         * Computes a broker's size as {@code F_MEAN(size) * F_MEAN(partitionCount)}.
         *
         * @return the broker size, or {@code null} when either metric is {@code null}
         */
        public static BrokerSize fromMetrics(int brokerId, MetricValues partitionCount, MetricValues size) {
            boolean incomplete = partitionCount == null || size == null;
            if (incomplete) {
                return null;
            }
            long partitions = (Long)F_MEAN.apply(partitionCount);
            long meanSize = (Long)F_MEAN.apply(size);
            long total = meanSize * partitions;
            return new BrokerSize(brokerId, total);
        }

        /** Returns the broker id. */
        public int getBrokerId() {
            return brokerId;
        }

        /** Returns the computed segment size. */
        public long getSegmentSize() {
            return segmentSize;
        }
    }

    /**
     * Immutable snapshot of online, under-replicated and offline partition counts.
     */
    protected static class TopicPartitionStatus {
        private final int onlinePartitions;
        private final int underReplicatedPartitions;
        private final int offlinePartitions;

        /**
         * Derives the partition counts from aggregated metrics.
         *
         * <p>Online and under-replicated counts are computed as {@code sum * brokerCount / count}
         * of the respective metric; the offline count is the metric's maximum.
         *
         * @param brokerCount number of brokers the aggregates are scaled by
         * @param leaderCount aggregated {@code LeaderCount} samples
         * @param underReplicatedPartitions aggregated {@code UnderReplicatedPartitions} samples
         * @param offlinePartitions aggregated {@code OfflinePartitionsCount} samples
         * @return the status, or {@code null} when any argument is {@code null} or a metric that
         *     has to be averaged contains no samples
         */
        public static TopicPartitionStatus fromMetrics(Integer brokerCount, MetricValues leaderCount, MetricValues underReplicatedPartitions, MetricValues offlinePartitions) {
            if (brokerCount == null || leaderCount == null || underReplicatedPartitions == null || offlinePartitions == null) {
                return null;
            }
            // A metric without samples has no mean. Treat it like a missing metric instead of
            // dividing by zero below.
            if (leaderCount.count() == 0L || underReplicatedPartitions.count() == 0L) {
                return null;
            }
            int online = (int)(leaderCount.sum() * (long)brokerCount.intValue() / leaderCount.count());
            int underReplicated = (int)(underReplicatedPartitions.sum() * (long)brokerCount.intValue() / underReplicatedPartitions.count());
            int offline = (int)offlinePartitions.max();
            return new TopicPartitionStatus(online, underReplicated, offline);
        }

        private TopicPartitionStatus(int onlinePartitions, int underReplicatedPartitions, int offlinePartitions) {
            this.onlinePartitions = onlinePartitions;
            this.underReplicatedPartitions = underReplicatedPartitions;
            this.offlinePartitions = offlinePartitions;
        }

        /** Returns the number of online partitions. */
        public int getOnlinePartitions() {
            return this.onlinePartitions;
        }

        /** Returns the number of under-replicated partitions. */
        public int getUnderReplicatedPartitions() {
            return this.underReplicatedPartitions;
        }

        /** Returns the number of offline partitions. */
        public int getOfflinePartitions() {
            return this.offlinePartitions;
        }
    }

    /**
     * Immutable snapshot of replica counts: total, in-sync and under-replicated.
     */
    protected static class PartitionReplicaStatus {
        private final int partitionReplicas;
        private final int inSyncReplicas;
        private final int underReplicated;

        private PartitionReplicaStatus(int partitionReplicas, int inSyncReplicas, int underReplicated) {
            this.partitionReplicas = partitionReplicas;
            this.inSyncReplicas = inSyncReplicas;
            this.underReplicated = underReplicated;
        }

        /**
         * Derives the replica counts from aggregated metrics, each computed as
         * {@code sum * onlinePartitions / count} of the respective metric.
         *
         * @param onlinePartitions number of online partitions the aggregates are scaled by
         * @param underReplicated aggregated under-replicated samples
         * @param inSyncReplicas aggregated in-sync replica samples
         * @param replicasCount aggregated replica count samples
         * @return the status, or {@code null} when any argument is {@code null} or any metric
         *     contains no samples
         */
        public static PartitionReplicaStatus fromMetrics(Integer onlinePartitions, MetricValues underReplicated, MetricValues inSyncReplicas, MetricValues replicasCount) {
            if (onlinePartitions == null || underReplicated == null || inSyncReplicas == null || replicasCount == null) {
                return null;
            }
            // A metric without samples has no mean. Treat it like a missing metric instead of
            // dividing by zero below.
            if (replicasCount.count() == 0L || inSyncReplicas.count() == 0L || underReplicated.count() == 0L) {
                return null;
            }
            int replicas = (int)(replicasCount.sum() * (long)onlinePartitions.intValue() / replicasCount.count());
            int inSync = (int)(inSyncReplicas.sum() * (long)onlinePartitions.intValue() / inSyncReplicas.count());
            int lagging = (int)(underReplicated.sum() * (long)onlinePartitions.intValue() / underReplicated.count());
            return new PartitionReplicaStatus(replicas, inSync, lagging);
        }

        /** Returns the total number of partition replicas. */
        public int getPartitionReplicas() {
            return this.partitionReplicas;
        }

        /** Returns the number of in-sync replicas. */
        public int getInSyncReplicas() {
            return this.inSyncReplicas;
        }

        /** Returns the total replicas minus the in-sync replicas. */
        public int getOutOfSyncReplicas() {
            return this.partitionReplicas - this.inSyncReplicas;
        }

        /** Returns the under-replicated count. */
        public int getUnderReplicated() {
            return this.underReplicated;
        }
    }
}

