/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.InternalTopicManager;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.ClientState;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamPartitionAssignor
implements PartitionAssignor,
Configurable {
    private static final Logger log = LoggerFactory.getLogger(StreamPartitionAssignor.class);
    /**
     * Orders topic partitions by topic name first and by partition number second.
     */
    private static final Comparator<TopicPartition> PARTITION_COMPARATOR = new Comparator<TopicPartition>(){

        @Override
        public int compare(TopicPartition left, TopicPartition right) {
            int byTopic = left.topic().compareTo(right.topic());
            if (byTopic != 0) {
                return byTopic;
            }
            // Same topic: fall back to the numeric partition id.
            int leftPartition = left.partition();
            int rightPartition = right.partition();
            if (leftPartition < rightPartition) {
                return -1;
            }
            if (leftPartition > rightPartition) {
                return 1;
            }
            return 0;
        }
    };
    // Stream thread this assignor is bound to; set in configure() from the "__stream.thread.instance__" config entry.
    private StreamThread streamThread;
    // Value of the "num.standby.replicas" config entry; handed to TaskAssignor.assign() in assign().
    private int numStandbyReplicas;
    // Topic groups keyed by topic group id, read from the stream thread's topology builder in configure().
    private Map<Integer, TopologyBuilder.TopicsInfo> topicGroups;
    // Partition -> task ids; replaced on every onAssignment() call and read by tasksForPartition().
    private Map<TopicPartition, Set<TaskId>> partitionToTaskIds;
    // State changelog topic name -> tasks; rebuilt in assign() and read by tasksForState().
    private Map<String, Set<TaskId>> stateChangelogTopicToTaskIds;
    // Internal source topic name -> tasks; rebuilt in assign() (also written by ensureCopartitioning()).
    private Map<String, Set<TaskId>> internalSourceTopicToTaskIds;
    // Standby task -> partitions, as decoded from the most recent assignment in onAssignment().
    private Map<TaskId, Set<TopicPartition>> standbyTasks;
    // Only set when "zookeeper.connect" is configured (or injected via setInternalTopicManager()); null otherwise.
    private InternalTopicManager internalTopicManager;

    /**
     * Initializes this assignor from the supplied configuration map.
     *
     * <p>Reads the number of standby replicas, binds the assignor to the stream thread found under
     * {@code "__stream.thread.instance__"}, caches the topology's topic groups, and creates an
     * {@link InternalTopicManager} when {@code "zookeeper.connect"} is present (replication factor
     * defaults to 1 when {@code "replication.factor"} is absent).
     *
     * @param configs the configuration entries
     * @throws KafkaException if the stream thread entry is missing or is not a {@link StreamThread}
     */
    public void configure(Map<String, ?> configs) {
        this.numStandbyReplicas = (Integer)configs.get("num.standby.replicas");
        Object candidate = configs.get("__stream.thread.instance__");
        if (!(candidate instanceof StreamThread)) {
            // instanceof is false for null too, so both failure modes are handled here.
            String reason = candidate == null
                ? "StreamThread is not specified"
                : candidate.getClass().getName() + " is not an instance of " + StreamThread.class.getName();
            KafkaException failure = new KafkaException(reason);
            log.error(failure.getMessage(), (Throwable)failure);
            throw failure;
        }
        this.streamThread = (StreamThread)candidate;
        this.streamThread.partitionAssignor(this);
        this.topicGroups = this.streamThread.builder.topicGroups();
        if (!configs.containsKey("zookeeper.connect")) {
            log.info("Config '{}' isn't supplied and hence no internal topics will be created.", (Object)"zookeeper.connect");
            return;
        }
        String zookeeperConnect = (String)configs.get("zookeeper.connect");
        int replicationFactor = 1;
        if (configs.containsKey("replication.factor")) {
            replicationFactor = (Integer)configs.get("replication.factor");
        }
        this.internalTopicManager = new InternalTopicManager(zookeeperConnect, replicationFactor);
    }

    /**
     * Returns the name of this assignor.
     *
     * @return the constant string {@code "stream"}
     */
    public String name() {
        return "stream";
    }

    /**
     * Builds the subscription this consumer sends out, attaching the stream thread's process id,
     * its previous tasks, and its cached tasks that are not among the previous tasks.
     *
     * @param topics the topics to subscribe to
     * @return a subscription carrying the topics and the encoded {@link SubscriptionInfo}
     */
    public PartitionAssignor.Subscription subscription(Set<String> topics) {
        Set<TaskId> previousTasks = this.streamThread.prevTasks();
        // Cached tasks minus the previous tasks are reported as standby tasks.
        // Note: removeAll mutates whatever set cachedTasks() returned.
        Set<TaskId> cachedOnly = this.streamThread.cachedTasks();
        cachedOnly.removeAll(previousTasks);
        SubscriptionInfo info = new SubscriptionInfo(this.streamThread.processId, previousTasks, cachedOnly);
        ArrayList<String> subscribedTopics = new ArrayList<String>(topics);
        return new PartitionAssignor.Subscription(subscribedTopics, info.encode());
    }

    /**
     * Makes the given internal topics ready and collects their partition metadata.
     *
     * <p>When an {@link InternalTopicManager} is available, each topic is passed to
     * {@code makeReady} with the expected partition count, and this method then polls the restore
     * consumer until the topic reports exactly that many partitions. Without a manager nothing is
     * created; topics for which the restore consumer returns no partitions are only logged.
     *
     * @param topicToTaskIds     topic name mapped to the tasks used to derive its partition count
     * @param compactTopic       forwarded to {@code InternalTopicManager.makeReady}
     * @param postPartitionPhase if {@code true}, the partition count is the largest
     *                           {@code task.partition + 1}; if {@code false}, the count is read
     *                           directly from {@code task.partition} of the (last) task in the set
     * @return partition metadata keyed by topic partition; empty when no manager is configured
     */
    private Map<TopicPartition, PartitionInfo> prepareTopic(Map<String, Set<TaskId>> topicToTaskIds, boolean compactTopic, boolean postPartitionPhase) {
        Map<TopicPartition, PartitionInfo> partitionInfos = new HashMap<TopicPartition, PartitionInfo>();
        if (this.internalTopicManager == null) {
            // Topics cannot be created without a manager; just report the missing ones.
            List<String> missingTopics = new ArrayList<String>();
            for (String topic : topicToTaskIds.keySet()) {
                if (this.streamThread.restoreConsumer.partitionsFor(topic) == null) {
                    missingTopics.add(topic);
                }
            }
            if (!missingTopics.isEmpty()) {
                log.warn("Topic {} do not exists but couldn't created as the config '{}' isn't supplied", missingTopics, (Object)"zookeeper.connect");
            }
            return partitionInfos;
        }
        log.debug("Starting to validate internal topics in partition assignor.");
        for (Map.Entry<String, Set<TaskId>> entry : topicToTaskIds.entrySet()) {
            String topic = entry.getKey();
            int numPartitions;
            if (postPartitionPhase) {
                // Expected partition count is the highest task partition index plus one.
                numPartitions = 0;
                for (TaskId task : entry.getValue()) {
                    numPartitions = Math.max(numPartitions, task.partition + 1);
                }
            } else {
                // Before partitioning, the task's partition field carries the partition count itself;
                // with several tasks in the set the last one iterated wins.
                numPartitions = -1;
                for (TaskId task : entry.getValue()) {
                    numPartitions = task.partition;
                }
            }
            this.internalTopicManager.makeReady(topic, numPartitions, compactTopic);
            // Poll until the consumer reports the expected number of partitions.
            // NOTE(review): this loop has no back-off and no timeout, so it spins forever if the
            // reported partition count never matches numPartitions.
            List<PartitionInfo> partitions;
            do {
                partitions = this.streamThread.restoreConsumer.partitionsFor(topic);
            } while (partitions == null || partitions.size() != numPartitions);
            for (PartitionInfo partition : partitions) {
                partitionInfos.put(new TopicPartition(partition.topic(), partition.partition()), partition);
            }
        }
        log.info("Completed validating internal topics in partition assignor.");
        return partitionInfos;
    }

    /**
     * Computes the assignment for every consumer that sent a subscription.
     *
     * <p>Outline of the steps performed below:
     * <ol>
     *   <li>Group consumers by the process id decoded from their subscription and build one
     *       {@code ClientState} per process.</li>
     *   <li>Validate co-partitioning and derive a partition count for each internal source topic.</li>
     *   <li>Prepare the internal source topics and, if an internal topic manager exists, merge their
     *       partition metadata into the cluster metadata.</li>
     *   <li>Ask the stream thread's partition grouper for the task -> partitions mapping.</li>
     *   <li>Record which tasks use which changelog / internal source topics.</li>
     *   <li>Let {@code TaskAssignor.assign} assign tasks to clients, then spread each client's tasks
     *       over that client's consumers.</li>
     *   <li>Prepare the internal source topics and changelog topics again, now in post-partition mode.</li>
     * </ol>
     *
     * NOTE(review): several locals below ({@code states}, {@code entry}, {@code consumers},
     * {@code state}, {@code standbyPartitions}) are raw types, presumably a decompiler artifact;
     * the typed for-each loops over them likely need the generic arguments restored to compile.
     * Confirm against the declarations of ClientState and TaskAssignor.
     *
     * @param metadata      the current cluster metadata
     * @param subscriptions the subscriptions, keyed by consumer id
     * @return the assignment for each consumer id
     */
    public Map<String, PartitionAssignor.Assignment> assign(Cluster metadata, Map<String, PartitionAssignor.Subscription> subscriptions) {
        // Step 1: group consumer ids by process id and accumulate per-process client state.
        HashMap<UUID, HashSet<String>> consumersByClient = new HashMap<UUID, HashSet<String>>();
        Map states = new HashMap();
        for (Map.Entry<String, PartitionAssignor.Subscription> entry : subscriptions.entrySet()) {
            String consumerId = entry.getKey();
            PartitionAssignor.Subscription subscription = entry.getValue();
            SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
            HashSet<String> consumers = (HashSet<String>)consumersByClient.get(info.processId);
            if (consumers == null) {
                consumers = new HashSet<String>();
                consumersByClient.put(info.processId, consumers);
            }
            consumers.add(consumerId);
            ClientState state = (ClientState)states.get(info.processId);
            if (state == null) {
                state = new ClientState();
                states.put(info.processId, state);
            }
            state.prevActiveTasks.addAll(info.prevTasks);
            // Previously assigned = previously active plus standby tasks reported by the consumer.
            state.prevAssignedTasks.addAll(info.prevTasks);
            state.prevAssignedTasks.addAll(info.standbyTasks);
            // Every consumer of the process contributes 1.0 to the client's capacity.
            state.capacity += 1.0;
        }
        // Step 2: collect source / internal source topics per topic group and check co-partitioning.
        this.internalSourceTopicToTaskIds = new HashMap<String, Set<TaskId>>();
        HashMap<Integer, Set<String>> sourceTopicGroups = new HashMap<Integer, Set<String>>();
        HashMap<Integer, Set<String>> internalSourceTopicGroups = new HashMap<Integer, Set<String>>();
        for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : this.topicGroups.entrySet()) {
            sourceTopicGroups.put(entry.getKey(), entry.getValue().sourceTopics);
            internalSourceTopicGroups.put(entry.getKey(), entry.getValue().interSourceTopics);
        }
        Collection<Set<String>> copartitionTopicGroups = this.streamThread.builder.copartitionGroups();
        // ensureCopartitioning also seeds internalSourceTopicToTaskIds for internal topics that
        // belong to a co-partition group.
        this.ensureCopartitioning(copartitionTopicGroups, internalSourceTopicGroups, metadata);
        for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : this.topicGroups.entrySet()) {
            Set<String> internalTopics = entry.getValue().interSourceTopics;
            for (String internalTopic : internalTopics) {
                Set<TaskId> tasks = this.internalSourceTopicToTaskIds.get(internalTopic);
                // Skip internal topics that already have an entry.
                if (tasks != null) continue;
                // Use the largest partition count among the source topics of every topic group
                // that lists this internal topic as a sink; stays -1 if none is found.
                int numPartitions = -1;
                for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> other : this.topicGroups.entrySet()) {
                    Set<String> otherSinkTopics = other.getValue().sinkTopics;
                    if (!otherSinkTopics.contains(internalTopic)) continue;
                    for (String topic : other.getValue().sourceTopics) {
                        List infos = metadata.partitionsForTopic(topic);
                        if (infos == null || infos.size() <= numPartitions) continue;
                        numPartitions = infos.size();
                    }
                }
                // The partition count is carried in a placeholder TaskId (presumably in its partition
                // field, which prepareTopic reads when postPartitionPhase is false).
                this.internalSourceTopicToTaskIds.put(internalTopic, Collections.singleton(new TaskId(entry.getKey(), numPartitions)));
            }
        }
        // Step 3: prepare the internal source topics, then drop the placeholder entries so the map
        // can be refilled with the real tasks below.
        Map<TopicPartition, PartitionInfo> internalPartitionInfos = this.prepareTopic(this.internalSourceTopicToTaskIds, false, false);
        this.internalSourceTopicToTaskIds.clear();
        Cluster metadataWithInternalTopics = metadata;
        if (this.internalTopicManager != null) {
            metadataWithInternalTopics = metadata.withPartitions(internalPartitionInfos);
        }
        // Step 4: task -> partitions mapping from the partition grouper.
        Map<TaskId, Set<TopicPartition>> partitionsForTask = this.streamThread.partitionGrouper.partitionGroups(sourceTopicGroups, metadataWithInternalTopics);
        // Step 5: for each task, register it under every changelog topic and internal source topic
        // of its topic group.
        this.stateChangelogTopicToTaskIds = new HashMap<String, Set<TaskId>>();
        for (TaskId task : partitionsForTask.keySet()) {
            Set<TaskId> tasks;
            for (String topicName : this.topicGroups.get((Object)Integer.valueOf((int)task.topicGroupId)).stateChangelogTopics) {
                tasks = this.stateChangelogTopicToTaskIds.get(topicName);
                if (tasks == null) {
                    tasks = new HashSet<TaskId>();
                    this.stateChangelogTopicToTaskIds.put(topicName, tasks);
                }
                tasks.add(task);
            }
            for (String topicName : this.topicGroups.get((Object)Integer.valueOf((int)task.topicGroupId)).interSourceTopics) {
                tasks = this.internalSourceTopicToTaskIds.get(topicName);
                if (tasks == null) {
                    tasks = new HashSet<TaskId>();
                    this.internalSourceTopicToTaskIds.put(topicName, tasks);
                }
                tasks.add(task);
            }
        }
        // Step 6: assign tasks to clients, then distribute each client's tasks over its consumers.
        states = TaskAssignor.assign(states, partitionsForTask.keySet(), this.numStandbyReplicas);
        HashMap<String, PartitionAssignor.Assignment> assignment = new HashMap<String, PartitionAssignor.Assignment>();
        for (Map.Entry entry : consumersByClient.entrySet()) {
            UUID processId = (UUID)entry.getKey();
            Set consumers = (Set)entry.getValue();
            ClientState state = (ClientState)states.get(processId);
            // taskIds lists the active tasks first, followed by the assigned-but-not-active tasks,
            // so any index below numActiveTasks refers to an active task.
            ArrayList<TaskId> taskIds = new ArrayList<TaskId>(state.assignedTasks.size());
            int numActiveTasks = state.activeTasks.size();
            for (TaskId taskId : state.activeTasks) {
                taskIds.add(taskId);
            }
            for (TaskId id : state.assignedTasks) {
                if (state.activeTasks.contains(id)) continue;
                taskIds.add(id);
            }
            int numConsumers = consumers.size();
            // Reused for every consumer of this client; cleared at the end of each iteration.
            HashMap<TaskId, Set<TopicPartition>> standby = new HashMap<TaskId, Set<TopicPartition>>();
            int i = 0;
            for (String consumer : consumers) {
                ArrayList<AssignedPartition> assignedPartitions = new ArrayList<AssignedPartition>();
                int numTaskIds = taskIds.size();
                // The i-th consumer takes tasks i, i + numConsumers, i + 2 * numConsumers, ...
                for (int j = i; j < numTaskIds; j += numConsumers) {
                    TaskId taskId = (TaskId)taskIds.get(j);
                    if (j < numActiveTasks) {
                        for (TopicPartition topicPartition : partitionsForTask.get(taskId)) {
                            assignedPartitions.add(new AssignedPartition(taskId, topicPartition));
                        }
                        continue;
                    }
                    HashSet standbyPartitions = (HashSet)standby.get(taskId);
                    if (standbyPartitions == null) {
                        standbyPartitions = new HashSet();
                        standby.put(taskId, standbyPartitions);
                    }
                    standbyPartitions.addAll(partitionsForTask.get(taskId));
                }
                // Sort by partition so that active.get(k) is the task of activePartitions.get(k);
                // onAssignment() pairs sorted partitions with active tasks positionally.
                Collections.sort(assignedPartitions);
                ArrayList<TaskId> active = new ArrayList<TaskId>();
                ArrayList<TopicPartition> activePartitions = new ArrayList<TopicPartition>();
                for (AssignedPartition assignedPartition : assignedPartitions) {
                    active.add(assignedPartition.taskId);
                    activePartitions.add(assignedPartition.partition);
                }
                AssignmentInfo data = new AssignmentInfo(active, standby);
                assignment.put(consumer, new PartitionAssignor.Assignment(activePartitions, data.encode()));
                ++i;
                // data.encode() has already run above, so the collections can be reset here.
                active.clear();
                standby.clear();
            }
        }
        // Step 7: prepare internal source topics (not compacted) and changelog topics (compacted),
        // now deriving partition counts from the real task ids.
        this.prepareTopic(this.internalSourceTopicToTaskIds, false, true);
        this.prepareTopic(this.stateChangelogTopicToTaskIds, true, true);
        return assignment;
    }

    /**
     * Records the assignment received by this consumer.
     *
     * <p>The assigned partitions are sorted with {@code PARTITION_COMPARATOR} and paired
     * positionally with the active task ids decoded from the assignment's user data: the k-th
     * sorted partition belongs to the k-th active task. The standby tasks from the decoded info
     * are stored as well.
     *
     * @param assignment the assignment for this consumer
     * @throws TaskAssignmentException if there are more partitions than active task ids
     */
    public void onAssignment(PartitionAssignor.Assignment assignment) {
        List<TopicPartition> partitions = new ArrayList<TopicPartition>(assignment.partitions());
        Collections.sort(partitions, PARTITION_COMPARATOR);
        AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
        this.standbyTasks = info.standbyTasks;
        Map<TopicPartition, Set<TaskId>> taskIdsByPartition = new HashMap<TopicPartition, Set<TaskId>>();
        Iterator<TaskId> activeTaskIds = info.activeTasks.iterator();
        for (TopicPartition partition : partitions) {
            Set<TaskId> taskIds = taskIdsByPartition.get(partition);
            if (taskIds == null) {
                taskIds = new HashSet<TaskId>();
                taskIdsByPartition.put(partition, taskIds);
            }
            if (!activeTaskIds.hasNext()) {
                // Fewer task ids than partitions: the encoded assignment is inconsistent.
                TaskAssignmentException ex = new TaskAssignmentException("failed to find a task id for the partition=" + partition.toString() + ", partitions=" + partitions.size() + ", assignmentInfo=" + info.toString());
                log.error(ex.getMessage(), (Throwable)ex);
                throw ex;
            }
            taskIds.add(activeTaskIds.next());
        }
        // Publish the new mapping only after every partition has been matched to a task.
        this.partitionToTaskIds = taskIdsByPartition;
    }

    /**
     * Runs the co-partitioning check on every co-partition group.
     *
     * @param copartitionGroups   the groups of topics that must be co-partitioned
     * @param internalTopicGroups internal topics per topic group id; flattened into a single set
     * @param metadata            the cluster metadata used to look up partition counts
     */
    private void ensureCopartitioning(Collection<Set<String>> copartitionGroups, Map<Integer, Set<String>> internalTopicGroups, Cluster metadata) {
        // Flatten the per-group internal topics into one lookup set.
        HashSet<String> allInternalTopics = new HashSet<String>();
        for (Map.Entry<Integer, Set<String>> group : internalTopicGroups.entrySet()) {
            allInternalTopics.addAll(group.getValue());
        }
        Iterator<Set<String>> groups = copartitionGroups.iterator();
        while (groups.hasNext()) {
            this.ensureCopartitioning(groups.next(), allInternalTopics, metadata);
        }
    }

    /**
     * Verifies that all non-internal topics of one co-partition group have the same number of
     * partitions, then registers that number for every internal topic in the group.
     *
     * <p>The partition count is stored in {@code internalSourceTopicToTaskIds} as a singleton
     * {@code TaskId(-1, numPartitions)}; it remains -1 when the group has no non-internal topic.
     *
     * @param copartitionGroup the topics that must be co-partitioned
     * @param internalTopics   all internal topic names
     * @param metadata         the cluster metadata used to look up partition counts
     * @throws TopologyBuilderException if a non-internal topic has no metadata or the partition
     *                                  counts within the group differ
     */
    private void ensureCopartitioning(Set<String> copartitionGroup, Set<String> internalTopics, Cluster metadata) {
        int expectedPartitions = -1;
        for (String topic : copartitionGroup) {
            if (!internalTopics.contains(topic)) {
                List<?> infos = metadata.partitionsForTopic(topic);
                if (infos == null) {
                    throw new TopologyBuilderException("External source topic not found: " + topic);
                }
                if (expectedPartitions == -1) {
                    // First non-internal topic fixes the expected count.
                    expectedPartitions = infos.size();
                } else if (expectedPartitions != infos.size()) {
                    Object[] sortedTopics = copartitionGroup.toArray(new String[copartitionGroup.size()]);
                    Arrays.sort(sortedTopics);
                    throw new TopologyBuilderException("Topics not copartitioned: [" + Utils.mkString(Arrays.asList(sortedTopics), (String)",") + "]");
                }
            }
        }
        // Internal topics of this group inherit the group's partition count.
        for (String internalTopic : internalTopics) {
            if (copartitionGroup.contains(internalTopic)) {
                this.internalSourceTopicToTaskIds.put(internalTopic, Collections.singleton(new TaskId(-1, expectedPartitions)));
            }
        }
    }

    /**
     * Looks up the tasks registered for the changelog topic of the given state store.
     *
     * @param stateName the state store name
     * @return the tasks recorded for that store's changelog topic, or {@code null} if none are recorded
     */
    public Set<TaskId> tasksForState(String stateName) {
        String changelogTopic = ProcessorStateManager.storeChangelogTopic(this.streamThread.applicationId, stateName);
        return this.stateChangelogTopicToTaskIds.get(changelogTopic);
    }

    /**
     * Looks up the tasks recorded for a partition by the most recent {@code onAssignment} call.
     *
     * @param partition the topic partition
     * @return the task ids mapped to the partition, or {@code null} if the partition is not mapped
     */
    public Set<TaskId> tasksForPartition(TopicPartition partition) {
        Map<TopicPartition, Set<TaskId>> currentMapping = this.partitionToTaskIds;
        return currentMapping.get(partition);
    }

    /**
     * Returns the standby tasks stored by the most recent {@code onAssignment} call.
     *
     * @return standby task ids mapped to their partitions; {@code null} before any assignment was received
     */
    public Map<TaskId, Set<TopicPartition>> standbyTasks() {
        return standbyTasks;
    }

    /**
     * Replaces the internal topic manager used by {@code prepareTopic} and {@code assign}.
     *
     * @param internalTopicManager the manager to use; a {@code null} value makes those methods
     *                             take their "no manager configured" paths
     */
    public void setInternalTopicManager(InternalTopicManager internalTopicManager) {
        this.internalTopicManager = internalTopicManager;
    }

    /**
     * A topic partition together with the task it is assigned to, ordered by partition
     * (topic name, then partition number) via {@code PARTITION_COMPARATOR}.
     */
    private static class AssignedPartition
    implements Comparable<AssignedPartition> {
        public final TaskId taskId;
        public final TopicPartition partition;

        public AssignedPartition(TaskId taskId, TopicPartition partition) {
            this.partition = partition;
            this.taskId = taskId;
        }

        /**
         * Compares by partition only; the task id does not take part in the ordering.
         */
        @Override
        public int compareTo(AssignedPartition other) {
            TopicPartition mine = this.partition;
            TopicPartition theirs = other.partition;
            return PARTITION_COMPARATOR.compare(mine, theirs);
        }
    }
}

