/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.controlcenter.rest;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.google.inject.Provider;
import io.confluent.controlcenter.ControlCenterRbacConfig;
import io.confluent.controlcenter.Rollup;
import io.confluent.controlcenter.data.ClusterMetadataDao;
import io.confluent.controlcenter.data.ScopedKafkaMetadataDao;
import io.confluent.controlcenter.keys.Keys;
import io.confluent.controlcenter.record.Controlcenter;
import io.confluent.controlcenter.rest.Metric;
import io.confluent.controlcenter.rest.RequestType;
import io.confluent.controlcenter.rest.Util;
import io.confluent.controlcenter.rest.VisibleCluster;
import io.confluent.controlcenter.rest.req.RangeRequest;
import io.confluent.controlcenter.rest.res.DeliveryResponse;
import io.confluent.controlcenter.serialization.OrderedKeyPrefixedSerdeSupplier;
import io.confluent.controlcenter.streams.TopicStoreModule;
import io.confluent.monitoring.record.Monitoring;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import javax.ws.rs.BadRequestException;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.ForbiddenException;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Path(value="/2.0/monitoring/{clusterId}")
@Produces(value={"application/json"})
public class MessageDeliveryResource {
    private static final Logger log = LoggerFactory.getLogger(MessageDeliveryResource.class);
    // Supplies the serde (per key type) whose key(...) call builds the start key used for window-store fetches.
    private final OrderedKeyPrefixedSerdeSupplier<Keys.KeyType, Monitoring.MonitoringMessage> keySerdeSupplier;
    // Consulted through isRbacEnabled() before any access check or member filtering is performed.
    private final ControlCenterRbacConfig rbacConfig;
    // Per-rollup window stores of WindowedGrouping records; read by getAggregateGroupInfo.
    private Provider<Map<Rollup, ReadOnlyWindowStore<Bytes, Controlcenter.WindowedGrouping>>> groupDb;
    // Per-rollup window stores read by getProducedMessages.
    private Provider<Map<Rollup, ReadOnlyWindowStore<Bytes, Monitoring.MonitoringMessage>>> expectedDb;
    // Per-rollup window stores read by getConsumedMessages.
    private Provider<Map<Rollup, ReadOnlyWindowStore<Bytes, Monitoring.MonitoringMessage>>> actualDb;
    // Populated via the @Context annotation rather than the constructor; used to look up the
    // consumer groups and topic names returned for a cluster (see getConsumerGroups / getTopics).
    @Context
    private ScopedKafkaMetadataDao scopedKafkaMetadataDao;

    /**
     * Rejects a missing cluster id and otherwise maps it through
     * {@link ClusterMetadataDao#getInternalKafkaId}.
     *
     * @param clusterId the cluster id taken from the request path; may be {@code null}
     * @return the value returned by {@code ClusterMetadataDao.getInternalKafkaId(clusterId)}
     * @throws BadRequestException if {@code clusterId} is {@code null}
     */
    private static String validateClusterId(String clusterId) {
        if (clusterId != null) {
            return ClusterMetadataDao.getInternalKafkaId(clusterId);
        }
        throw new BadRequestException("missing clusterId");
    }

    /**
     * Creates the resource from its injected collaborators.
     *
     * @param groupDb provider of the per-rollup stores holding {@code WindowedGrouping} records
     * @param expectedDb provider of the per-rollup stores read when building "produced" metrics
     * @param actualDb provider of the per-rollup stores read when building "consumed" metrics
     * @param keySerdeSupplier supplier of the serde used to build store lookup keys
     * @param rbacConfig configuration queried to decide whether access checks apply
     */
    @Inject
    public MessageDeliveryResource(
            @TopicStoreModule.GroupStore
            Provider<Map<Rollup, ReadOnlyWindowStore<Bytes, Controlcenter.WindowedGrouping>>> groupDb,
            @TopicStoreModule.MonitoringMessageAggregatorWindowsStore
            Provider<Map<Rollup, ReadOnlyWindowStore<Bytes, Monitoring.MonitoringMessage>>> expectedDb,
            @TopicStoreModule.MonitoringStreamStore
            Provider<Map<Rollup, ReadOnlyWindowStore<Bytes, Monitoring.MonitoringMessage>>> actualDb,
            OrderedKeyPrefixedSerdeSupplier<Keys.KeyType, Monitoring.MonitoringMessage> keySerdeSupplier,
            ControlCenterRbacConfig rbacConfig) {
        // Non-store collaborators first, then the three store providers.
        this.keySerdeSupplier = keySerdeSupplier;
        this.rbacConfig = rbacConfig;
        this.groupDb = groupDb;
        this.expectedDb = expectedDb;
        this.actualDb = actualDb;
    }

    /**
     * Validates the common query parameters of a delivery request.
     *
     * <p>Checks run in this order: limit, presence of {@code startTimeMs}, presence of
     * {@code stopTimeMs}, time ordering, presence of {@code rollup}. The first failing check
     * determines the error message.
     *
     * @param limit maximum number of entries requested; values up to and including 6000 are accepted
     * @param rollup the requested rollup; must not be {@code null}
     * @param startTimeMs start of the requested range; must not be {@code null}
     * @param stopTimeMs end of the requested range; must not be {@code null} and not before
     *     {@code startTimeMs}
     * @throws BadRequestException if any of the checks above fails
     */
    protected void checkParams(int limit, Rollup rollup, Long startTimeMs, Long stopTimeMs) throws BadRequestException {
        if (limit > 6000) {
            // 6000 itself is allowed (it is the default on the endpoints), so the bound is inclusive.
            throw new BadRequestException("limit must be <= 6000");
        }
        if (startTimeMs == null) {
            throw new BadRequestException("missing startTimeMs");
        }
        if (stopTimeMs == null) {
            throw new BadRequestException("missing stopTimeMs");
        }
        if (startTimeMs > stopTimeMs) {
            throw new BadRequestException("startTimeMs cannot be later than stopTimeMs");
        }
        if (rollup == null) {
            throw new BadRequestException("missing rollup");
        }
    }

    /**
     * Reads grouping windows for the request's rollup and time range and merges their members
     * into a single {@code WindowedGrouping}.
     *
     * @param rr request supplying the rollup, the start key spec and the aligned time range
     * @param limit maximum number of windows to read from the store
     * @param keyType key type used to select the serde that builds the lookup key
     * @return a grouping whose members are the de-duplicated union of the members of the windows
     *     read, with its window set to {@code 0}
     */
    private Controlcenter.WindowedGrouping getAggregateGroupInfo(RangeRequest rr, int limit, Keys.KeyType keyType) {
        ReadOnlyWindowStore<Bytes, Controlcenter.WindowedGrouping> store = this.groupDb.get().get(rr.rollup);
        Bytes startKey = this.keySerdeSupplier.get(keyType).key(rr.createStartKeySpec());
        try (WindowStoreIterator<Controlcenter.WindowedGrouping> windows =
                store.fetch(startKey, rr.getRollupAlignedStartTimeMs(), rr.getRollupAlignedStopTimeMs())) {
            // A set collapses members that appear in more than one window.
            Set<Controlcenter.MemberInfo> members = Sets.newHashSet();
            int windowsRead = 0;
            while (windows.hasNext() && windowsRead < limit) {
                members.addAll(windows.next().value.getMembersList());
                windowsRead++;
            }
            return Controlcenter.WindowedGrouping.newBuilder()
                    .addAllMembers(members)
                    .setWindow(0L)
                    .build();
        }
    }

    /**
     * Collects one string per member of the aggregated grouping for a consumer-side copy of the
     * request.
     *
     * <p>The incoming request is not modified: a copy is made, its client type is forced to
     * {@code CONSUMER}, and a {@code ONE_MINUTE} rollup is replaced by {@code THREE_HOURS}.
     *
     * @param rr the request to copy
     * @param limit maximum number of windows to read
     * @param keyType key type passed on to {@code getAggregateGroupInfo}
     * @param getData extracts the string of interest from each member
     * @return an immutable set of the extracted strings
     */
    private Set<String> getMetricsData(RangeRequest rr, int limit, Keys.KeyType keyType, Function<Controlcenter.MemberInfo, String> getData) {
        RangeRequest consumerRequest = new RangeRequest(rr);
        consumerRequest.clientType = Monitoring.ClientType.CONSUMER;
        if (consumerRequest.rollup == Rollup.ONE_MINUTE) {
            consumerRequest.rollup = Rollup.THREE_HOURS;
        }
        Controlcenter.WindowedGrouping grouping = this.getAggregateGroupInfo(consumerRequest, limit, keyType);
        ImmutableSet.Builder<String> extracted = ImmutableSet.builder();
        for (Controlcenter.MemberInfo member : grouping.getMembersList()) {
            extracted.add(getData.apply(member));
        }
        return extracted.build();
    }

    /**
     * Returns the group names of the members found under the
     * {@code INFO_GROUPS_IN_CLIENTTYPE} key type for the given request.
     *
     * @param rr the request describing cluster, rollup and time range
     * @param limit maximum number of windows to read
     * @return the set of group names reported by {@code getMetricsData}
     */
    private Set<String> getMetricsConsumerGroups(RangeRequest rr, int limit) {
        Function<Controlcenter.MemberInfo, String> groupOf = Controlcenter.MemberInfo::getGroup;
        return this.getMetricsData(rr, limit, Keys.KeyType.INFO_GROUPS_IN_CLIENTTYPE, groupOf);
    }

    /**
     * Returns the topic names of the members found under the
     * {@code INFO_TOPICS_IN_CLIENTTYPE} key type for the given request.
     *
     * @param rr the request describing cluster, rollup and time range
     * @param limit maximum number of windows to read
     * @return the set of topic names reported by {@code getMetricsData}
     */
    private Set<String> getMetricsTopics(RangeRequest rr, int limit) {
        Function<Controlcenter.MemberInfo, String> topicOf = Controlcenter.MemberInfo::getTopic;
        return this.getMetricsData(rr, limit, Keys.KeyType.INFO_TOPICS_IN_CLIENTTYPE, topicOf);
    }

    /**
     * Returns an immutable copy of the consumer groups the scoped metadata DAO reports for the
     * cluster.
     *
     * @param clusterId the cluster to query
     * @return immutable set of consumer group names
     */
    private Set<String> getConsumerGroups(String clusterId) throws InterruptedException, ExecutionException, TimeoutException {
        final Set<String> groups = ImmutableSet.copyOf(this.scopedKafkaMetadataDao.getConsumerGroups(clusterId));
        return groups;
    }

    /**
     * Returns an immutable copy of the topic names the scoped metadata DAO reports for the
     * cluster (via {@code getTopicNamesFromMetadataOrCache}).
     *
     * @param clusterId the cluster to query
     * @return immutable set of topic names
     */
    private Set<String> getTopics(String clusterId) throws InterruptedException, ExecutionException, TimeoutException {
        final Set<String> topics = ImmutableSet.copyOf(this.scopedKafkaMetadataDao.getTopicNamesFromMetadataOrCache(clusterId));
        return topics;
    }

    /**
     * Ensures every consumer group found in the metrics for the request is among the consumer
     * groups returned by {@code getConsumerGroups} for the cluster.
     *
     * @param rr request whose {@code topic} must be set
     * @param limit maximum number of windows to read when listing metric groups
     * @throws IllegalArgumentException if {@code rr.topic} is {@code null}
     * @throws ForbiddenException if at least one metric consumer group is not accessible
     */
    private void verifyAccessToAllConsumerGroupsInTopic(RangeRequest rr, int limit) throws InterruptedException, ExecutionException, TimeoutException {
        Preconditions.checkArgument(rr.topic != null);
        Set<String> interceptedGroups = this.getMetricsConsumerGroups(rr, limit);
        Set<String> accessibleGroups = this.getConsumerGroups(rr.clusterId);
        if (accessibleGroups.containsAll(interceptedGroups)) {
            return;
        }
        throw new ForbiddenException("cannot show topic overview data unless user has access to all intercepted consumer groups");
    }

    /**
     * Ensures every topic found in the metrics is among the topics returned by {@code getTopics},
     * and then, topic by topic, that every consumer group seen for it is accessible as well.
     *
     * @param rr request with neither {@code topic} nor {@code group} set
     * @param limit maximum number of windows to read for each listing
     * @throws IllegalArgumentException if {@code rr.topic} or {@code rr.group} is set
     * @throws ForbiddenException if a metric topic, or a consumer group of one, is not accessible
     */
    private void verifyAccessToAllTopicsAndConsumerGroups(RangeRequest rr, int limit) throws InterruptedException, ExecutionException, TimeoutException {
        Preconditions.checkArgument(rr.topic == null && rr.group == null);
        Set<String> interceptedTopics = this.getMetricsTopics(rr, limit);
        Set<String> accessibleTopics = this.getTopics(rr.clusterId);
        boolean allTopicsAccessible = accessibleTopics.containsAll(interceptedTopics);
        if (!allTopicsAccessible) {
            throw new ForbiddenException("cannot show total overview data unless user has access to all intercepted topics");
        }
        for (String interceptedTopic : interceptedTopics) {
            // Work on a copy so the caller's request keeps its (null) topic.
            RangeRequest perTopicRequest = new RangeRequest(rr);
            perTopicRequest.topic = interceptedTopic;
            this.verifyAccessToAllConsumerGroupsInTopic(perTopicRequest, limit);
        }
    }

    /**
     * Applies the access checks for a request; does nothing when RBAC is disabled.
     *
     * <p>With a group set, the group must be among the accessible consumer groups. Without a
     * group, the check is delegated to the per-topic or the all-topics verification depending on
     * whether a topic is set. In every case a set topic must also be among the accessible topics;
     * this topic check runs last.
     *
     * @param rr the request to check
     * @param limit maximum number of windows to read during the delegated checks
     * @throws ForbiddenException if any of the checks fails
     */
    private void verifyRequestAccess(RangeRequest rr, int limit) throws InterruptedException, ExecutionException, TimeoutException {
        if (!this.rbacConfig.isRbacEnabled()) {
            return;
        }
        if (rr.group != null) {
            if (!this.getConsumerGroups(rr.clusterId).contains(rr.group)) {
                throw new ForbiddenException("no access to this consumer group");
            }
        } else if (rr.topic != null) {
            this.verifyAccessToAllConsumerGroupsInTopic(rr, limit);
        } else {
            this.verifyAccessToAllTopicsAndConsumerGroups(rr, limit);
        }
        if (rr.topic != null && !this.getTopics(rr.clusterId).contains(rr.topic)) {
            throw new ForbiddenException("no access to this topic");
        }
    }

    /**
     * Builds the per-metric "consumed" series for a request from the {@code actualDb} store.
     *
     * <p>Side effect: sets {@code rr.clientType} to {@code CONSUMER} before the access check and
     * the store lookup.
     *
     * @param rr the request; its client type is overwritten
     * @param limit maximum number of store entries to read, also passed to the value extraction
     * @param stepMs step width passed to {@code Util.extractValues}
     * @param keyType key type used to select the serde that builds the lookup key
     * @return a map with one entry per {@link Metric} value
     * @throws ForbiddenException if RBAC is enabled and the request fails the access checks
     */
    protected Map<Metric, ArrayList<Object>> getConsumedMessages(RangeRequest rr, int limit, long stepMs, Keys.KeyType keyType) throws InterruptedException, ExecutionException, TimeoutException {
        rr.clientType = Monitoring.ClientType.CONSUMER;
        this.verifyRequestAccess(rr, limit);
        ReadOnlyWindowStore<Bytes, Monitoring.MonitoringMessage> store = this.actualDb.get().get(rr.rollup);
        Bytes startKey = this.keySerdeSupplier.get(keyType).key(rr.createStartKeySpec());
        try (WindowStoreIterator<Monitoring.MonitoringMessage> windows =
                store.fetch(startKey, rr.getRollupAlignedStartTimeMs(), rr.getRollupAlignedStopTimeMs())) {
            ArrayList<Monitoring.MonitoringMessage> samples = Lists.newArrayList();
            while (windows.hasNext() && samples.size() < limit) {
                samples.add(windows.next().value);
            }
            Map<Metric, ArrayList<Object>> seriesByMetric = Maps.newHashMap();
            for (Metric metric : Metric.values()) {
                seriesByMetric.put(metric, Util.extractValues(samples, stepMs, rr.getRollupAlignedStartTimeMs(), rr.getRollupAlignedStopTimeMs(), limit, metric));
            }
            return seriesByMetric;
        }
    }

    /**
     * Builds the per-metric "produced" series for a request from the {@code expectedDb} store.
     *
     * <p>Side effect: sets {@code rr.clientType} to {@code PRODUCER} before the access check and
     * the store lookup. Only the {@code count}, {@code error} and {@code crc} metrics are
     * extracted.
     *
     * @param rr the request; its client type is overwritten
     * @param limit maximum number of store entries to read, also passed to the value extraction
     * @param stepMs step width passed to {@code Util.extractValues}
     * @param keyType key type used to select the serde that builds the lookup key
     * @return a map with one entry for each of the three extracted metrics
     * @throws ForbiddenException if RBAC is enabled and the request fails the access checks
     */
    protected Map<Metric, ArrayList<Object>> getProducedMessages(RangeRequest rr, int limit, long stepMs, Keys.KeyType keyType) throws InterruptedException, ExecutionException, TimeoutException {
        rr.clientType = Monitoring.ClientType.PRODUCER;
        this.verifyRequestAccess(rr, limit);
        ReadOnlyWindowStore<Bytes, Monitoring.MonitoringMessage> store = this.expectedDb.get().get(rr.rollup);
        Bytes startKey = this.keySerdeSupplier.get(keyType).key(rr.createStartKeySpec());
        try (WindowStoreIterator<Monitoring.MonitoringMessage> windows =
                store.fetch(startKey, rr.getRollupAlignedStartTimeMs(), rr.getRollupAlignedStopTimeMs())) {
            ArrayList<Monitoring.MonitoringMessage> samples = Lists.newArrayList();
            while (windows.hasNext() && samples.size() < limit) {
                samples.add(windows.next().value);
            }
            Metric[] producedMetrics = {Metric.count, Metric.error, Metric.crc};
            Map<Metric, ArrayList<Object>> seriesByMetric = Maps.newHashMap();
            for (Metric metric : producedMetrics) {
                seriesByMetric.put(metric, Util.extractValues(samples, stepMs, rr.getRollupAlignedStartTimeMs(), rr.getRollupAlignedStopTimeMs(), limit, metric));
            }
            return seriesByMetric;
        }
    }

    /**
     * Fills the response's top-level {@code produced} and {@code consumed} series using the
     * delivery type's base key types and the rollup width as step.
     *
     * <p>Produced data is fetched before consumed data, so {@code rr.clientType} ends up as
     * {@code CONSUMER}. The {@code group} metric is dropped from the consumed series unless the
     * delivery type is {@code CONSUMER}.
     *
     * @param res the response to populate
     * @param rr the request; its client type is overwritten by the two lookups
     * @param deliveryType determines the key types and whether the group metric is kept
     * @param limit maximum number of store entries to read per lookup
     * @return the same {@code res} instance
     */
    protected DeliveryResponse addOverviewData(DeliveryResponse res, RangeRequest rr, DeliveryResponse.DeliveryType deliveryType, int limit) throws InterruptedException, ExecutionException, TimeoutException {
        long rollupStepMs = rr.rollup.getMillis();
        res.produced = this.getProducedMessages(rr, limit, rollupStepMs, deliveryType.baseProductionKeyType);
        res.consumed = this.getConsumedMessages(rr, limit, rollupStepMs, deliveryType.baseConsumptionKeyType);
        boolean singleConsumerView = deliveryType == DeliveryResponse.DeliveryType.CONSUMER;
        if (!singleConsumerView) {
            res.consumed.remove(Metric.group);
        }
        return res;
    }

    /**
     * Populates {@code res.members}, {@code res.sources} and {@code res.destinations} from the
     * grouping stores.
     *
     * <p>Side effects on {@code rr}: its rollup may be widened for member-list requests, its client
     * type is overwritten by the metric lookups and by the secondary lookup, and the local
     * reference is replaced by a merged request for every member processed.
     *
     * @param res the response to populate; its three lists are always re-initialised
     * @param rr the base request
     * @param deliveryType supplies the primary/secondary grouping key types and the per-member
     *     consumption/production key types
     * @param limit maximum number of store entries to read per lookup
     * @param clientId passed through to {@code Util.createMergedRangeRequest}
     * @param group passed through to {@code Util.createMergedRangeRequest}
     * @param stepMs step width used for per-member metric extraction
     * @param requestType {@code ALL} additionally loads per-member metrics; {@code MEMBER_LIST}
     *     triggers the rollup widening
     * @param aggregateBy decides which of sources/destinations receives primary vs. secondary members
     * @throws ForbiddenException if RBAC is enabled and {@code res.group} is set but not among the
     *     accessible consumer groups (or a per-member lookup fails its access check)
     */
    protected void addMembers(DeliveryResponse res, RangeRequest rr, DeliveryResponse.DeliveryType deliveryType, int limit, String clientId, String group, Long stepMs, RequestType requestType, AggregationType aggregateBy) throws InterruptedException, ExecutionException, TimeoutException {
        // null means "no filtering"; only populated when RBAC is enabled.
        Set<String> filteredConsumerGroups = null;
        if (this.rbacConfig.isRbacEnabled()) {
            filteredConsumerGroups = this.getConsumerGroups(rr.clusterId);
        }
        if (filteredConsumerGroups != null && res.group != null && !filteredConsumerGroups.contains(res.group)) {
            throw new ForbiddenException("no access to this consumer group");
        }
        // Member listings at one-minute rollup are read from the three-hour rollup instead
        // (same substitution as in getMetricsData). This mutates the caller's request.
        if (requestType == RequestType.MEMBER_LIST && rr.rollup == Rollup.ONE_MINUTE) {
            rr.rollup = Rollup.THREE_HOURS;
        }
        // NOTE(review): this lookup runs before the subGroupingKeyType null check below, so a null
        // key type would be handed to the serde supplier — confirm that case cannot occur or is tolerated.
        Controlcenter.WindowedGrouping aggregateGroupInfo = this.getAggregateGroupInfo(rr, limit, deliveryType.subGroupingKeyType);
        res.members = Lists.newArrayList();
        res.sources = Lists.newArrayList();
        res.destinations = Lists.newArrayList();
        if (deliveryType.subGroupingKeyType != null) {
            for (Controlcenter.MemberInfo member : aggregateGroupInfo.getMembersList()) {
                // rr is reassigned, so the next iteration (and the secondary lookup below) starts
                // from the request merged for the previous member.
                // NOTE(review): confirm Util.createMergedRangeRequest is meant to be applied cumulatively.
                rr = Util.createMergedRangeRequest(member, rr, clientId, group);
                DeliveryResponse.MemberResponse memberResponse = new DeliveryResponse.MemberResponse(member);
                if (requestType == RequestType.ALL) {
                    // Per-member metrics are loaded before the RBAC filter below, so they are
                    // computed (and access-checked) even for members that end up skipped.
                    memberResponse.consumed = this.getConsumedMessages(rr, limit, stepMs, deliveryType.subConsumptionKeyType);
                    memberResponse.consumed.remove((Object)Metric.group);
                    memberResponse.produced = this.getProducedMessages(rr, limit, stepMs, deliveryType.subProductionKeyType);
                }
                // With filtering active, skip the member when res.group is set or when the member's
                // group is not accessible.
                // NOTE(review): as written, RBAC + a non-null res.group skips every member — confirm intended.
                if (filteredConsumerGroups != null && (res.group != null || !filteredConsumerGroups.contains(member.getGroup()))) continue;
                res.members.add(memberResponse);
                // Primary members are sources when aggregating by topic, destinations otherwise.
                if (aggregateBy == AggregationType.Topic) {
                    res.sources.add(new DeliveryResponse.MemberResponse(member));
                    continue;
                }
                res.destinations.add(new DeliveryResponse.MemberResponse(member));
            }
        }
        if (deliveryType.secondarySubGroupingKeyType != null) {
            // The secondary grouping is always looked up on the consumer side.
            rr.clientType = Monitoring.ClientType.CONSUMER;
            aggregateGroupInfo = this.getAggregateGroupInfo(rr, limit, deliveryType.secondarySubGroupingKeyType);
            for (Controlcenter.MemberInfo member : aggregateGroupInfo.getMembersList()) {
                // Same RBAC filter as for primary members (see the review note above).
                if (filteredConsumerGroups != null && (res.group != null || !filteredConsumerGroups.contains(member.getGroup()))) continue;
                // Secondary members go to the opposite list from primary members.
                if (aggregateBy == AggregationType.Topic) {
                    res.destinations.add(new DeliveryResponse.MemberResponse(member));
                    continue;
                }
                res.sources.add(new DeliveryResponse.MemberResponse(member));
            }
        }
    }

    /**
     * Validates the parameters, derives the delivery type and assembles the response for the
     * requested {@code requestType}.
     *
     * <p>Delivery type selection: a client id yields {@code CONSUMER}; otherwise a group yields
     * {@code CONSUMER_GROUP_TOPICS} or {@code CONSUMER_GROUP}; otherwise
     * {@code CONSUMER_GROUPS_TOPICS} or {@code CONSUMER_GROUPS} — the {@code *_TOPICS} variant
     * being chosen when {@code groupBy} is {@code Topic}.
     *
     * <p>{@code ALL} and {@code OVERVIEW} load overview data; {@code OVERVIEW} returns right after.
     * {@code ALL} and {@code MEMBER_LIST} then add members and return. Any other request type
     * produces a response containing a single member built from the {@code member*} parameters.
     *
     * @throws BadRequestException if parameter validation fails or adding members raises an
     *     {@link IllegalArgumentException}
     */
    protected DeliveryResponse constructResponse(int limit, String clusterId, String group, String clientId, Rollup rollup, Long startTimeMs, Long stopTimeMs, RequestType requestType, String memberGroup, String memberTopic, Integer memberPartition, String memberClientId, AggregationType groupBy) throws InterruptedException, ExecutionException, TimeoutException {
        this.checkParams(limit, rollup, startTimeMs, stopTimeMs);

        RangeRequest rr = new RangeRequest(clusterId, startTimeMs, stopTimeMs);
        rr.group = group;
        rr.rollup = rollup;
        rr.clientId = clientId;
        rr.clientType = Monitoring.ClientType.CONSUMER;
        long stepMs = rollup.getMillis();

        // Most specific selector wins: client id, then group, then the cluster-wide views.
        boolean byTopic = groupBy == AggregationType.Topic;
        DeliveryResponse.DeliveryType deliveryType;
        if (rr.clientId != null) {
            deliveryType = DeliveryResponse.DeliveryType.CONSUMER;
        } else if (group != null) {
            deliveryType = byTopic ? DeliveryResponse.DeliveryType.CONSUMER_GROUP_TOPICS : DeliveryResponse.DeliveryType.CONSUMER_GROUP;
        } else {
            deliveryType = byTopic ? DeliveryResponse.DeliveryType.CONSUMER_GROUPS_TOPICS : DeliveryResponse.DeliveryType.CONSUMER_GROUPS;
        }

        DeliveryResponse res = new DeliveryResponse(rr.rollup, rr.getRollupAlignedStartTimeMs(), rr.rollup.getMillis(), rr.group, rr.clientId, deliveryType);

        boolean wantsEverything = requestType == RequestType.ALL;
        boolean overviewOnly = requestType == RequestType.OVERVIEW;
        if (wantsEverything || overviewOnly) {
            this.addOverviewData(res, rr, deliveryType, limit);
        }
        if (overviewOnly) {
            return res;
        }
        if (wantsEverything || requestType == RequestType.MEMBER_LIST) {
            try {
                this.addMembers(res, rr, deliveryType, limit, clientId, group, stepMs, requestType, groupBy);
            }
            catch (IllegalArgumentException e) {
                log.error("request=range", e);
                throw new BadRequestException(e);
            }
            return res;
        }

        // Remaining request types: describe exactly one member taken from the member* parameters.
        String topic;
        Controlcenter.TopicPartition topicPartition;
        if (deliveryType == DeliveryResponse.DeliveryType.CONSUMER_GROUPS_TOPICS) {
            topic = memberTopic;
            topicPartition = Controlcenter.TopicPartition.getDefaultInstance();
        } else {
            topic = null;
            int partition = memberPartition == null ? 0 : memberPartition;
            topicPartition = Controlcenter.TopicPartition.newBuilder()
                    .setTopic(Strings.nullToEmpty(memberTopic))
                    .setPartition(partition)
                    .build();
        }
        Controlcenter.MemberInfo memberInfo = Controlcenter.MemberInfo.newBuilder()
                .setClientId(Strings.nullToEmpty(memberClientId))
                .setGroup(Strings.nullToEmpty(memberGroup))
                .setTopic(Strings.nullToEmpty(topic))
                .setTopicPartition(topicPartition)
                .build();

        RangeRequest memberRequest = Util.createMergedRangeRequest(memberInfo, rr, clientId, group);
        DeliveryResponse.MemberResponse memberResponse = new DeliveryResponse.MemberResponse(memberInfo);
        memberResponse.consumed = this.getConsumedMessages(memberRequest, limit, stepMs, deliveryType.subConsumptionKeyType);
        memberResponse.consumed.remove(Metric.group);
        memberResponse.produced = this.getProducedMessages(memberRequest, limit, stepMs, deliveryType.subProductionKeyType);
        res.members = Lists.newArrayList();
        res.members.add(memberResponse);
        return res;
    }

    /**
     * {@code GET /consumer}: delivery data for one client within one consumer group.
     *
     * @throws BadRequestException if {@code group} or {@code clientId} is missing, the cluster id
     *     is missing, or the common parameters fail validation
     */
    @GET
    @Path("/consumer")
    public DeliveryResponse consumer(
            @PathParam("clusterId") VisibleCluster cluster,
            @QueryParam("limit") @DefaultValue("6000") int limit,
            @QueryParam("group") String group,
            @QueryParam("clientId") String clientId,
            @QueryParam("rollup") Rollup rollup,
            @QueryParam("startTimeMs") Long startTimeMs,
            @QueryParam("stopTimeMs") Long stopTimeMs,
            @QueryParam("type") @DefaultValue("ALL") RequestType type,
            @QueryParam("memberTopic") String memberTopic,
            @QueryParam("memberPartition") Integer memberPartition)
            throws InterruptedException, ExecutionException, TimeoutException {
        if (group == null) {
            throw new BadRequestException("missing group");
        }
        if (clientId == null) {
            throw new BadRequestException("missing clientId");
        }
        String internalClusterId = MessageDeliveryResource.validateClusterId(cluster.getClusterId());
        return this.constructResponse(limit, internalClusterId, group, clientId, rollup, startTimeMs, stopTimeMs, type, null, memberTopic, memberPartition, null, AggregationType.ConsumerGroup);
    }

    /**
     * {@code GET /consumer_group}: delivery data for one consumer group, aggregated by consumer
     * group.
     *
     * @throws BadRequestException if {@code group} is missing, the cluster id is missing, or the
     *     common parameters fail validation
     */
    @GET
    @Path("/consumer_group")
    public DeliveryResponse consumerGroup(
            @PathParam("clusterId") VisibleCluster cluster,
            @QueryParam("limit") @DefaultValue("6000") int limit,
            @QueryParam("group") String group,
            @QueryParam("rollup") Rollup rollup,
            @QueryParam("startTimeMs") Long startTimeMs,
            @QueryParam("stopTimeMs") Long stopTimeMs,
            @QueryParam("type") @DefaultValue("ALL") RequestType type,
            @QueryParam("memberClientId") String memberClientId)
            throws InterruptedException, ExecutionException, TimeoutException {
        if (group == null) {
            throw new BadRequestException("missing group");
        }
        String internalClusterId = MessageDeliveryResource.validateClusterId(cluster.getClusterId());
        return this.constructResponse(limit, internalClusterId, group, null, rollup, startTimeMs, stopTimeMs, type, null, null, null, memberClientId, AggregationType.ConsumerGroup);
    }

    /**
     * {@code GET /consumer_groups}: delivery data aggregated by consumer group; {@code group} and
     * {@code clientId} are optional and passed through unchanged.
     *
     * @throws BadRequestException if the cluster id is missing or the common parameters fail
     *     validation
     */
    @GET
    @Path("/consumer_groups")
    public DeliveryResponse consumerGroups(
            @PathParam("clusterId") VisibleCluster cluster,
            @QueryParam("limit") @DefaultValue("6000") int limit,
            @QueryParam("group") String group,
            @QueryParam("clientId") String clientId,
            @QueryParam("rollup") Rollup rollup,
            @QueryParam("startTimeMs") Long startTimeMs,
            @QueryParam("stopTimeMs") Long stopTimeMs,
            @QueryParam("type") @DefaultValue("ALL") RequestType type,
            @QueryParam("memberGroup") String memberGroup)
            throws InterruptedException, ExecutionException, TimeoutException {
        String internalClusterId = MessageDeliveryResource.validateClusterId(cluster.getClusterId());
        return this.constructResponse(limit, internalClusterId, group, clientId, rollup, startTimeMs, stopTimeMs, type, memberGroup, null, null, null, AggregationType.ConsumerGroup);
    }

    /**
     * {@code GET /consumer_group_topics}: delivery data aggregated by topic; {@code group} is
     * passed through unchanged and is not checked for presence here.
     *
     * @throws BadRequestException if the cluster id is missing or the common parameters fail
     *     validation
     */
    @GET
    @Path("/consumer_group_topics")
    public DeliveryResponse consumerGroupTopics(
            @PathParam("clusterId") VisibleCluster cluster,
            @QueryParam("limit") @DefaultValue("6000") int limit,
            @QueryParam("group") String group,
            @QueryParam("rollup") Rollup rollup,
            @QueryParam("startTimeMs") Long startTimeMs,
            @QueryParam("stopTimeMs") Long stopTimeMs,
            @QueryParam("type") @DefaultValue("ALL") RequestType type,
            @QueryParam("memberTopic") String memberTopic,
            @QueryParam("memberPartition") Integer memberPartition)
            throws InterruptedException, ExecutionException, TimeoutException {
        String internalClusterId = MessageDeliveryResource.validateClusterId(cluster.getClusterId());
        return this.constructResponse(limit, internalClusterId, group, null, rollup, startTimeMs, stopTimeMs, type, null, memberTopic, memberPartition, null, AggregationType.Topic);
    }

    /**
     * {@code GET /consumer_groups_topics}: delivery data aggregated by topic, without a member
     * partition; {@code group} is passed through unchanged.
     *
     * @throws BadRequestException if the cluster id is missing or the common parameters fail
     *     validation
     */
    @GET
    @Path("/consumer_groups_topics")
    public DeliveryResponse consumerGroupsTopics(
            @PathParam("clusterId") VisibleCluster cluster,
            @QueryParam("limit") @DefaultValue("6000") int limit,
            @QueryParam("group") String group,
            @QueryParam("rollup") Rollup rollup,
            @QueryParam("startTimeMs") Long startTimeMs,
            @QueryParam("stopTimeMs") Long stopTimeMs,
            @QueryParam("type") @DefaultValue("ALL") RequestType type,
            @QueryParam("memberTopic") String memberTopic)
            throws InterruptedException, ExecutionException, TimeoutException {
        String internalClusterId = MessageDeliveryResource.validateClusterId(cluster.getClusterId());
        return this.constructResponse(limit, internalClusterId, group, null, rollup, startTimeMs, stopTimeMs, type, null, memberTopic, null, null, AggregationType.Topic);
    }

    protected static enum AggregationType {
        ConsumerGroup,
        Topic;

    }
}

