/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.QuickUnion;
import org.apache.kafka.streams.processor.internals.SinkNode;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;

public class TopologyBuilder {
    // Factories for every node (source, processor, sink), keyed by node name. LinkedHashMap: iteration follows insertion order.
    private final LinkedHashMap<String, NodeFactory> nodeFactories = new LinkedHashMap();
    // State store factories, keyed by the name reported by their supplier.
    private final Map<String, StateStoreFactory> stateFactories = new HashMap<String, StateStoreFactory>();
    // Topic names registered by name-based sources; consulted to reject duplicate registrations.
    private final Set<String> sourceTopicNames = new HashSet<String>();
    // Topics registered through addInternalTopic; decorateTopic prefixes these with the application id.
    private final Set<String> internalTopicNames = new HashSet<String>();
    // Groups of source node names registered through copartitionSources (each stored as an unmodifiable set).
    private final List<Set<String>> copartitionSourceGroups = new ArrayList<Set<String>>();
    // Source node name -> topics it reads. Filled by the name-based addSource and, for pattern sources, by topicGroups().
    private final HashMap<String, String[]> nodeToSourceTopics = new HashMap();
    // Source node name -> subscription pattern. Declared as HashMap but instantiated as LinkedHashMap, so insertion order is kept.
    private final HashMap<String, Pattern> nodeToSourcePatterns = new LinkedHashMap<String, Pattern>();
    // Sink node name -> topic it writes to.
    private final HashMap<String, String> nodeToSinkTopic = new HashMap();
    // Topic name -> pattern that claimed it; maintained by SourceNodeFactory.getTopics(Collection).
    private final HashMap<String, Pattern> topicToPatterns = new HashMap();
    // State store name -> source topics found for the processor the store was most recently connected to.
    private final Map<String, Set<String>> stateStoreNameToSourceTopics = new HashMap<String, Set<String>>();
    // Source store name -> topic, registered through connectSourceStoreAndTopic.
    private final HashMap<String, String> sourceStoreToSourceTopic = new HashMap();
    // Groups connected node names; nodes that share a root end up in the same node group (see makeNodeGroups).
    private final QuickUnion<String> nodeGrouper = new QuickUnion();
    // Most recent subscription updates; replaced wholesale by updateSubscriptions.
    private StreamPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamPartitionAssignor.SubscriptionUpdates();
    // Prefix for internal topic names; null until setApplicationId is called.
    private String applicationId = null;
    // Combined source pattern, built lazily and cached by sourceTopicPattern().
    private Pattern topicPattern = null;
    // Node groups, built lazily and cached by nodeGroups() / topicGroups().
    private Map<Integer, Set<String>> nodeGroups = null;

    /**
     * Stores the application id that {@code decorateTopic} uses as a prefix for internal topic names.
     *
     * @param applicationId the application id; must not be null
     * @return this builder, to allow chaining
     * @throws NullPointerException if {@code applicationId} is null
     */
    public final synchronized TopologyBuilder setApplicationId(String applicationId) {
        this.applicationId = Objects.requireNonNull(applicationId, "applicationId can't be null");
        return this;
    }

    /**
     * Adds a source node reading the named topics, passing {@code null} for both deserializers.
     *
     * @param name   unique node name
     * @param topics topics the source reads from
     * @return this builder, to allow chaining
     */
    public final synchronized TopologyBuilder addSource(String name, String ... topics) {
        final Deserializer noKeyDeserializer = null;
        final Deserializer noValDeserializer = null;
        return addSource(name, noKeyDeserializer, noValDeserializer, topics);
    }

    /**
     * Adds a source node subscribing by regular expression, passing {@code null} for both deserializers.
     *
     * @param name         unique node name
     * @param topicPattern pattern selecting the topics the source reads from
     * @return this builder, to allow chaining
     */
    public final synchronized TopologyBuilder addSource(String name, Pattern topicPattern) {
        final Deserializer noKeyDeserializer = null;
        final Deserializer noValDeserializer = null;
        return addSource(name, noKeyDeserializer, noValDeserializer, topicPattern);
    }

    /**
     * Adds a source node that reads from the given, explicitly named topics.
     *
     * <p>All topics are validated before any builder state is modified, so a rejected call
     * leaves the builder exactly as it was.
     *
     * @param name            unique node name; must not be null
     * @param keyDeserializer key deserializer handed to the source node; may be null
     * @param valDeserializer value deserializer handed to the source node; may be null
     * @param topics          topics the source reads from; at least one, none of them null
     * @return this builder, to allow chaining
     * @throws TopologyBuilderException if no topic is given, the node name is already used, a topic is
     *                                  already registered (including twice within this call), or a topic
     *                                  matches the pattern of an existing pattern-based source
     * @throws NullPointerException     if {@code name} or any topic is null
     */
    public final synchronized TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, String ... topics) {
        if (topics.length == 0) {
            throw new TopologyBuilderException("You must provide at least one topic");
        }
        Objects.requireNonNull(name, "name must not be null");
        if (this.nodeFactories.containsKey(name)) {
            throw new TopologyBuilderException("Processor " + name + " is already added.");
        }
        // Collect accepted topics locally first: registering them one by one would leave
        // orphaned entries in sourceTopicNames if a later topic fails validation.
        Set<String> acceptedTopics = new HashSet<String>();
        for (String topic : topics) {
            Objects.requireNonNull(topic, "topic names cannot be null");
            if (this.sourceTopicNames.contains(topic) || acceptedTopics.contains(topic)) {
                throw new TopologyBuilderException("Topic " + topic + " has already been registered by another source.");
            }
            for (Pattern pattern : this.nodeToSourcePatterns.values()) {
                if (pattern.matcher(topic).matches()) {
                    throw new TopologyBuilderException("Topic " + topic + " matches a Pattern already registered by another source.");
                }
            }
            acceptedTopics.add(topic);
        }
        this.sourceTopicNames.addAll(acceptedTopics);
        this.nodeFactories.put(name, new SourceNodeFactory(name, topics, null, keyDeserializer, valDeserializer));
        this.nodeToSourceTopics.put(name, topics.clone());
        this.nodeGrouper.add(name);
        return this;
    }

    /**
     * Adds a source node that subscribes to topics by regular expression.
     *
     * @param name            unique node name; must not be null
     * @param keyDeserializer key deserializer handed to the source node; may be null
     * @param valDeserializer value deserializer handed to the source node; may be null
     * @param topicPattern    pattern selecting the topics to read; must not be null
     * @return this builder, to allow chaining
     * @throws TopologyBuilderException if the node name is already used or the pattern matches a topic
     *                                  already registered by a name-based source
     * @throws NullPointerException     if {@code topicPattern} or {@code name} is null
     */
    public final synchronized TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, Pattern topicPattern) {
        Objects.requireNonNull(topicPattern, "topicPattern can't be null");
        Objects.requireNonNull(name, "name can't be null");
        if (nodeFactories.containsKey(name)) {
            throw new TopologyBuilderException("Processor " + name + " is already added.");
        }
        for (String registeredTopic : sourceTopicNames) {
            if (topicPattern.matcher(registeredTopic).matches()) {
                throw new TopologyBuilderException("Pattern  " + topicPattern + " will match a topic that has already been registered by another source.");
            }
        }
        SourceNodeFactory factory = new SourceNodeFactory(name, null, topicPattern, keyDeserializer, valDeserializer);
        nodeFactories.put(name, factory);
        nodeToSourcePatterns.put(name, topicPattern);
        nodeGrouper.add(name);
        return this;
    }

    /**
     * Adds a sink node with no explicit serializers and no explicit partitioner.
     *
     * @param name        unique node name
     * @param topic       topic the sink writes to
     * @param parentNames names of the nodes feeding this sink
     * @return this builder, to allow chaining
     */
    public final synchronized TopologyBuilder addSink(String name, String topic, String ... parentNames) {
        final Serializer noKeySerializer = null;
        final Serializer noValSerializer = null;
        return addSink(name, topic, noKeySerializer, noValSerializer, parentNames);
    }

    /**
     * Adds a sink node with the given partitioner and no explicit serializers.
     *
     * @param name        unique node name
     * @param topic       topic the sink writes to
     * @param partitioner partitioner handed to the sink node
     * @param parentNames names of the nodes feeding this sink
     * @return this builder, to allow chaining
     */
    public final synchronized TopologyBuilder addSink(String name, String topic, StreamPartitioner partitioner, String ... parentNames) {
        final Serializer noKeySerializer = null;
        final Serializer noValSerializer = null;
        return addSink(name, topic, noKeySerializer, noValSerializer, partitioner, parentNames);
    }

    /**
     * Adds a sink node with the given serializers and no explicit partitioner.
     *
     * @param name          unique node name
     * @param topic         topic the sink writes to
     * @param keySerializer key serializer handed to the sink node
     * @param valSerializer value serializer handed to the sink node
     * @param parentNames   names of the nodes feeding this sink
     * @return this builder, to allow chaining
     */
    public final synchronized TopologyBuilder addSink(String name, String topic, Serializer keySerializer, Serializer valSerializer, String ... parentNames) {
        final StreamPartitioner noPartitioner = null;
        return addSink(name, topic, keySerializer, valSerializer, noPartitioner, parentNames);
    }

    /**
     * Adds a sink node that forwards records from its parents to {@code topic}.
     *
     * @param name          unique node name; must not be null
     * @param topic         topic the sink writes to; must not be null
     * @param keySerializer key serializer handed to the sink node
     * @param valSerializer value serializer handed to the sink node
     * @param partitioner   partitioner handed to the sink node
     * @param parentNames   names of already-added nodes feeding this sink
     * @return this builder, to allow chaining
     * @throws TopologyBuilderException if the name is already used, the node lists itself as parent,
     *                                  or a parent has not been added yet
     * @throws NullPointerException     if {@code name} or {@code topic} is null
     */
    public final synchronized <K, V> TopologyBuilder addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<K, V> partitioner, String ... parentNames) {
        Objects.requireNonNull(name, "name must not be null");
        Objects.requireNonNull(topic, "topic must not be null");
        if (nodeFactories.containsKey(name)) {
            throw new TopologyBuilderException("Processor " + name + " is already added.");
        }
        if (parentNames != null) {
            for (String parentName : parentNames) {
                if (parentName.equals(name)) {
                    throw new TopologyBuilderException("Processor " + name + " cannot be a parent of itself.");
                }
                if (!nodeFactories.containsKey(parentName)) {
                    throw new TopologyBuilderException("Parent processor " + parentName + " is not added yet.");
                }
            }
        }
        SinkNodeFactory sinkFactory = new SinkNodeFactory(name, parentNames, topic, keySerializer, valSerializer, partitioner);
        nodeFactories.put(name, sinkFactory);
        nodeToSinkTopic.put(name, topic);
        // Register the sink and merge it into the same group as its parents.
        nodeGrouper.add(name);
        nodeGrouper.unite(name, parentNames);
        return this;
    }

    /**
     * Adds a processor node fed by the given parents.
     *
     * @param name        unique node name; must not be null
     * @param supplier    supplier used to create the processor when the topology is built; must not be null
     * @param parentNames names of already-added nodes feeding this processor
     * @return this builder, to allow chaining
     * @throws TopologyBuilderException if the name is already used, the node lists itself as parent,
     *                                  or a parent has not been added yet
     * @throws NullPointerException     if {@code name} or {@code supplier} is null
     */
    public final synchronized TopologyBuilder addProcessor(String name, ProcessorSupplier supplier, String ... parentNames) {
        Objects.requireNonNull(name, "name must not be null");
        Objects.requireNonNull(supplier, "supplier must not be null");
        if (nodeFactories.containsKey(name)) {
            throw new TopologyBuilderException("Processor " + name + " is already added.");
        }
        if (parentNames != null) {
            for (String parentName : parentNames) {
                if (parentName.equals(name)) {
                    throw new TopologyBuilderException("Processor " + name + " cannot be a parent of itself.");
                }
                if (!nodeFactories.containsKey(parentName)) {
                    throw new TopologyBuilderException("Parent processor " + parentName + " is not added yet.");
                }
            }
        }
        ProcessorNodeFactory processorFactory = new ProcessorNodeFactory(name, parentNames, supplier);
        nodeFactories.put(name, processorFactory);
        // Register the processor and merge it into the same group as its parents.
        nodeGrouper.add(name);
        nodeGrouper.unite(name, parentNames);
        return this;
    }

    /**
     * Registers a state store and optionally connects it to processors.
     *
     * @param supplier       supplier of the store; must not be null
     * @param processorNames names of processors to connect to the store; may be null or empty
     * @return this builder, to allow chaining
     * @throws TopologyBuilderException if a store with the same name is already registered, or a
     *                                  connection to one of the processors is rejected
     * @throws NullPointerException     if {@code supplier} is null
     */
    public final synchronized TopologyBuilder addStateStore(StateStoreSupplier supplier, String ... processorNames) {
        Objects.requireNonNull(supplier, "supplier can't be null");
        final String storeName = supplier.name();
        if (stateFactories.containsKey(storeName)) {
            throw new TopologyBuilderException("StateStore " + storeName + " is already added.");
        }
        stateFactories.put(storeName, new StateStoreFactory(supplier));
        if (processorNames == null) {
            return this;
        }
        for (String processorName : processorNames) {
            connectProcessorAndStateStore(processorName, storeName);
        }
        return this;
    }

    /**
     * Connects one processor to each of the named state stores.
     *
     * @param processorName   name of the processor; must not be null
     * @param stateStoreNames names of the stores to connect; may be null or empty
     * @return this builder, to allow chaining
     * @throws NullPointerException     if {@code processorName} is null
     * @throws TopologyBuilderException if any individual connection is rejected
     */
    public final synchronized TopologyBuilder connectProcessorAndStateStores(String processorName, String ... stateStoreNames) {
        Objects.requireNonNull(processorName, "processorName can't be null");
        if (stateStoreNames == null) {
            return this;
        }
        for (String storeName : stateStoreNames) {
            connectProcessorAndStateStore(processorName, storeName);
        }
        return this;
    }

    /**
     * Associates a source store name with a topic; each store name may be registered only once.
     *
     * @param sourceStoreName name of the source store
     * @param topic           topic to associate with the store
     * @return this builder, to allow chaining
     * @throws TopologyBuilderException if the store name is already registered
     */
    protected final synchronized TopologyBuilder connectSourceStoreAndTopic(String sourceStoreName, String topic) {
        final boolean alreadyRegistered = sourceStoreToSourceTopic.containsKey(sourceStoreName);
        if (alreadyRegistered) {
            throw new TopologyBuilderException("Source store " + sourceStoreName + " is already added.");
        }
        sourceStoreToSourceTopic.put(sourceStoreName, topic);
        return this;
    }

    /**
     * Places the given nodes in the same node group by uniting them in the node grouper.
     *
     * @param processorNames names of at least two already-added nodes
     * @return this builder, to allow chaining
     * @throws TopologyBuilderException if fewer than two names are given or any name is unknown
     */
    public final synchronized TopologyBuilder connectProcessors(String ... processorNames) {
        if (processorNames.length < 2) {
            throw new TopologyBuilderException("At least two processors need to participate in the connection.");
        }
        for (String candidate : processorNames) {
            if (!nodeFactories.containsKey(candidate)) {
                throw new TopologyBuilderException("Processor " + candidate + " is not added yet.");
            }
        }
        // Unite the first node with all remaining ones.
        String[] others = Arrays.copyOfRange(processorNames, 1, processorNames.length);
        nodeGrouper.unite(processorNames[0], others);
        return this;
    }

    /**
     * Marks a topic as internal; internal topic names are prefixed with the application id
     * wherever this builder reports or uses them.
     *
     * @param topicName name of the internal topic; must not be null
     * @return this builder, to allow chaining
     * @throws NullPointerException if {@code topicName} is null
     */
    public final synchronized TopologyBuilder addInternalTopic(String topicName) {
        internalTopicNames.add(Objects.requireNonNull(topicName, "topicName can't be null"));
        return this;
    }

    /**
     * Records a group of source nodes as a copartition group.
     * An unmodifiable snapshot of the collection is stored.
     *
     * @param sourceNodes names of the source nodes forming the group
     * @return this builder, to allow chaining
     */
    public final synchronized TopologyBuilder copartitionSources(Collection<String> sourceNodes) {
        Set<String> snapshot = new HashSet<String>(sourceNodes);
        copartitionSourceGroups.add(Collections.unmodifiableSet(snapshot));
        return this;
    }

    /**
     * Connects a single processor node to a single state store.
     *
     * <p>All checks run before any state is changed: the store's user set, the node grouper and the
     * processor factory are only updated once the connection is known to be valid. Previously a
     * rejected connection (e.g. to a source or sink node) still left the node recorded as a user of
     * the store and united with the store's other users.
     *
     * @param processorName  name of an already-added processor node
     * @param stateStoreName name of an already-added state store
     * @throws TopologyBuilderException if the store or node is unknown, the node is not a processor
     *                                  node, or no source topic can be found for the store
     */
    private void connectProcessorAndStateStore(String processorName, String stateStoreName) {
        if (!this.stateFactories.containsKey(stateStoreName)) {
            throw new TopologyBuilderException("StateStore " + stateStoreName + " is not added yet.");
        }
        if (!this.nodeFactories.containsKey(processorName)) {
            throw new TopologyBuilderException("Processor " + processorName + " is not added yet.");
        }
        NodeFactory nodeFactory = this.nodeFactories.get(processorName);
        if (!(nodeFactory instanceof ProcessorNodeFactory)) {
            throw new TopologyBuilderException("cannot connect a state store " + stateStoreName + " to a source node or a sink node.");
        }
        ProcessorNodeFactory processorNodeFactory = (ProcessorNodeFactory)nodeFactory;
        // May still throw; it depends only on the processor's parents, so it is safe to run first.
        this.connectStateStoreNameToSourceTopics(stateStoreName, processorNodeFactory);

        // Validation is complete; apply the mutations.
        StateStoreFactory stateStoreFactory = this.stateFactories.get(stateStoreName);
        Iterator<String> iter = stateStoreFactory.users.iterator();
        if (iter.hasNext()) {
            // Uniting with any one existing user is enough to join that user's group.
            String user = iter.next();
            this.nodeGrouper.unite(user, new String[]{processorName});
        }
        stateStoreFactory.users.add(processorName);
        processorNodeFactory.addStateStore(stateStoreName);
    }

    /**
     * Collects the named source topics reachable by walking up from the given parent nodes.
     *
     * <p>Source parents contribute their explicitly named topics; processor parents are followed
     * recursively; any other kind of parent is ignored. Pattern-based sources carry no named topics
     * ({@code getTopics()} returns null for them) and contribute nothing, instead of triggering a
     * {@link NullPointerException} as before.
     *
     * @param parents names of the parent nodes to start from
     * @return the set of named source topics found; possibly empty
     */
    private Set<String> findSourceTopicsForProcessorParents(String[] parents) {
        HashSet<String> sourceTopics = new HashSet<String>();
        for (String parent : parents) {
            NodeFactory nodeFactory = this.nodeFactories.get(parent);
            if (nodeFactory instanceof SourceNodeFactory) {
                String[] topics = ((SourceNodeFactory)nodeFactory).getTopics();
                // topics is null for sources registered with a Pattern.
                if (topics != null) {
                    sourceTopics.addAll(Arrays.asList(topics));
                }
            } else if (nodeFactory instanceof ProcessorNodeFactory) {
                sourceTopics.addAll(this.findSourceTopicsForProcessorParents(((ProcessorNodeFactory)nodeFactory).parents));
            }
        }
        return sourceTopics;
    }

    /**
     * Records which source topics feed the given state store, based on the parents of the processor
     * it is being connected to. Any earlier entry for the same store name is replaced.
     *
     * @param stateStoreName       name of the state store
     * @param processorNodeFactory factory of the processor being connected to the store
     * @throws TopologyBuilderException if no source topic is found
     */
    private void connectStateStoreNameToSourceTopics(String stateStoreName, ProcessorNodeFactory processorNodeFactory) {
        final Set<String> upstreamTopics = findSourceTopicsForProcessorParents(processorNodeFactory.parents);
        if (upstreamTopics.isEmpty()) {
            throw new TopologyBuilderException("can't find source topic for state store " + stateStoreName);
        }
        final Set<String> readOnlyTopics = Collections.unmodifiableSet(upstreamTopics);
        stateStoreNameToSourceTopics.put(stateStoreName, readOnlyTopics);
    }

    /**
     * Returns the node groups, computing and caching them on first use.
     *
     * @return map from group id to the names of the nodes in that group
     */
    public synchronized Map<Integer, Set<String>> nodeGroups() {
        Map<Integer, Set<String>> groups = nodeGroups;
        if (groups == null) {
            groups = makeNodeGroups();
            nodeGroups = groups;
        }
        return groups;
    }

    /**
     * Partitions all nodes into groups of connected nodes, as reported by the node grouper.
     *
     * <p>Group ids are assigned in order of first encounter: nodes that have source topics are
     * visited first (sorted by name), then all remaining nodes (sorted by name).
     *
     * @return insertion-ordered map from group id to the names of the nodes in that group
     */
    private Map<Integer, Set<String>> makeNodeGroups() {
        // Fix the visiting order up front: sources with topics first, then everything else.
        List<String> visitOrder = new ArrayList<String>();
        for (String sourceName : Utils.sorted(nodeToSourceTopics.keySet())) {
            visitOrder.add(sourceName);
        }
        for (String candidate : Utils.sorted(nodeFactories.keySet())) {
            if (!nodeToSourceTopics.containsKey(candidate)) {
                visitOrder.add(candidate);
            }
        }

        Map<Integer, Set<String>> groupsById = new LinkedHashMap<Integer, Set<String>>();
        Map<String, Set<String>> groupsByRoot = new HashMap<String, Set<String>>();
        int nextGroupId = 0;
        for (String nodeName : visitOrder) {
            String root = nodeGrouper.root(nodeName);
            Set<String> members = groupsByRoot.get(root);
            if (members == null) {
                members = new HashSet<String>();
                groupsByRoot.put(root, members);
                groupsById.put(nextGroupId, members);
                nextGroupId++;
            }
            members.add(nodeName);
        }
        return groupsById;
    }

    /**
     * Builds the processor topology for one node group, or for all nodes.
     *
     * @param topicGroupId id of the node group to build, or null to build every node
     * @return the processor topology for the selected nodes
     */
    public synchronized ProcessorTopology build(Integer topicGroupId) {
        if (topicGroupId == null) {
            return build((Set<String>) null);
        }
        return build(nodeGroups().get(topicGroupId));
    }

    /**
     * Instantiates the nodes and state stores of the selected group and wires them together.
     *
     * <p>Factories are visited in insertion order, so a parent is built before its children.
     * A state store is created once, the first time a selected processor that uses it is visited.
     *
     * @param nodeGroup names of the nodes to build, or null to build every node
     * @return the processor topology for the selected nodes
     * @throws TopologyBuilderException if a factory of an unknown type is encountered
     */
    private ProcessorTopology build(Set<String> nodeGroup) {
        ArrayList<ProcessorNode> builtNodes = new ArrayList<ProcessorNode>(nodeFactories.size());
        HashMap<String, ProcessorNode> nodesByName = new HashMap<String, ProcessorNode>();
        HashMap<String, SourceNode> sourcesByTopic = new HashMap<String, SourceNode>();
        HashMap<String, SinkNode> sinksByTopic = new HashMap<String, SinkNode>();
        LinkedHashMap<String, StateStore> storesByName = new LinkedHashMap<String, StateStore>();
        HashMap<StateStore, ProcessorNode> storeOwners = new HashMap<StateStore, ProcessorNode>();

        for (NodeFactory factory : nodeFactories.values()) {
            boolean selected = nodeGroup == null || nodeGroup.contains(factory.name);
            if (!selected) {
                continue;
            }
            ProcessorNode node = factory.build();
            builtNodes.add(node);
            nodesByName.put(node.name(), node);

            if (factory instanceof ProcessorNodeFactory) {
                ProcessorNodeFactory processorFactory = (ProcessorNodeFactory) factory;
                for (String parentName : processorFactory.parents) {
                    nodesByName.get(parentName).addChild(node);
                }
                for (String storeName : processorFactory.stateStoreNames) {
                    if (!storesByName.containsKey(storeName)) {
                        StateStoreSupplier storeSupplier = stateFactories.get(storeName).supplier;
                        StateStore store = storeSupplier.get();
                        storesByName.put(storeName, store);
                        storeOwners.put(store, node);
                    }
                }
            } else if (factory instanceof SourceNodeFactory) {
                SourceNodeFactory sourceFactory = (SourceNodeFactory) factory;
                // Pattern sources resolve their topics from the latest subscription updates.
                String[] topics;
                if (sourceFactory.pattern != null) {
                    topics = sourceFactory.getTopics(subscriptionUpdates.getUpdates());
                } else {
                    topics = sourceFactory.getTopics();
                }
                for (String topic : topics) {
                    String key = internalTopicNames.contains(topic) ? decorateTopic(topic) : topic;
                    sourcesByTopic.put(key, (SourceNode) node);
                }
            } else if (factory instanceof SinkNodeFactory) {
                SinkNodeFactory sinkFactory = (SinkNodeFactory) factory;
                // The sink topic is registered once per parent, as in the original control flow;
                // a sink without parents is therefore not registered here.
                for (String parentName : sinkFactory.parents) {
                    nodesByName.get(parentName).addChild(node);
                    String key = internalTopicNames.contains(sinkFactory.topic) ? decorateTopic(sinkFactory.topic) : sinkFactory.topic;
                    sinksByTopic.put(key, (SinkNode) node);
                }
            } else {
                throw new TopologyBuilderException("Unknown definition class: " + factory.getClass().getName());
            }
        }
        return new ProcessorTopology(builtNodes, sourcesByTopic, sinksByTopic, new ArrayList<StateStore>(storesByName.values()), sourceStoreToSourceTopic, storeOwners);
    }

    /**
     * Describes, per node group, the topics that group reads, writes and uses as changelogs.
     *
     * <p>When subscription updates are present, the topics of every pattern-based source are
     * re-resolved first and stored in {@code nodeToSourceTopics}. Node groups are computed and
     * cached if that has not happened yet.
     *
     * @return unmodifiable, insertion-ordered map from group id to the group's topic information
     */
    public synchronized Map<Integer, TopicsInfo> topicGroups() {
        if (subscriptionUpdates.hasUpdates()) {
            for (String patternSource : nodeToSourcePatterns.keySet()) {
                SourceNodeFactory sourceFactory = (SourceNodeFactory) nodeFactories.get(patternSource);
                nodeToSourceTopics.put(patternSource, sourceFactory.getTopics(subscriptionUpdates.getUpdates()));
            }
        }
        if (nodeGroups == null) {
            nodeGroups = makeNodeGroups();
        }

        Map<Integer, TopicsInfo> result = new LinkedHashMap<Integer, TopicsInfo>();
        for (Map.Entry<Integer, Set<String>> group : nodeGroups.entrySet()) {
            Set<String> sinkTopics = new HashSet<String>();
            Set<String> sourceTopics = new HashSet<String>();
            Map<String, InternalTopicConfig> internalSourceTopics = new HashMap<String, InternalTopicConfig>();
            Map<String, InternalTopicConfig> stateChangelogTopics = new HashMap<String, InternalTopicConfig>();

            for (String node : group.getValue()) {
                // Topics read by this node (only source nodes have an entry).
                String[] readTopics = nodeToSourceTopics.get(node);
                if (readTopics != null) {
                    for (String readTopic : readTopics) {
                        if (internalTopicNames.contains(readTopic)) {
                            String decorated = decorateTopic(readTopic);
                            internalSourceTopics.put(decorated, new InternalTopicConfig(decorated, Collections.singleton(InternalTopicConfig.CleanupPolicy.delete), Collections.emptyMap()));
                            sourceTopics.add(decorated);
                        } else {
                            sourceTopics.add(readTopic);
                        }
                    }
                }

                // Topic written by this node (only sink nodes have an entry).
                String writtenTopic = nodeToSinkTopic.get(node);
                if (writtenTopic != null) {
                    sinkTopics.add(internalTopicNames.contains(writtenTopic) ? decorateTopic(writtenTopic) : writtenTopic);
                }

                // Changelog topics of logging-enabled stores used by this node.
                for (StateStoreFactory storeFactory : stateFactories.values()) {
                    StateStoreSupplier storeSupplier = storeFactory.supplier;
                    if (storeSupplier.loggingEnabled() && storeFactory.users.contains(node)) {
                        String changelogName = ProcessorStateManager.storeChangelogTopic(applicationId, storeSupplier.name());
                        stateChangelogTopics.put(changelogName, createInternalTopicConfig(storeSupplier, changelogName));
                    }
                }
            }

            TopicsInfo info = new TopicsInfo(Collections.unmodifiableSet(sinkTopics), Collections.unmodifiableSet(sourceTopics), Collections.unmodifiableMap(internalSourceTopics), Collections.unmodifiableMap(stateChangelogTopics));
            result.put(group.getKey(), info);
        }
        return Collections.unmodifiableMap(result);
    }

    /**
     * Creates the changelog topic configuration for a state store.
     *
     * <p>Stores supplied by a {@link RocksDBWindowStoreSupplier} get both the compact and delete
     * cleanup policies plus the supplier's retention period; all other stores get compact only.
     *
     * @param supplier supplier of the state store
     * @param name     name of the changelog topic
     * @return the topic configuration
     */
    private InternalTopicConfig createInternalTopicConfig(StateStoreSupplier supplier, String name) {
        if (supplier instanceof RocksDBWindowStoreSupplier) {
            RocksDBWindowStoreSupplier windowed = (RocksDBWindowStoreSupplier) supplier;
            InternalTopicConfig windowedConfig = new InternalTopicConfig(name, Utils.mkSet(InternalTopicConfig.CleanupPolicy.compact, InternalTopicConfig.CleanupPolicy.delete), supplier.logConfig());
            windowedConfig.setRetentionMs(windowed.retentionPeriod());
            return windowedConfig;
        }
        return new InternalTopicConfig(name, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), supplier.logConfig());
    }

    /**
     * Returns the topics registered by name-based sources, with internal topics prefixed by the
     * application id.
     *
     * @return unmodifiable set of source topic names
     */
    public synchronized Set<String> sourceTopics() {
        return Collections.unmodifiableSet(maybeDecorateInternalSourceTopics(sourceTopicNames));
    }

    /**
     * Returns, per state store name, the source topics feeding that store, with internal topics
     * prefixed by the application id.
     *
     * <p>Synchronized like the other accessors of this builder: it iterates a {@code HashMap} that
     * the state-store connection methods modify while holding the builder's monitor.
     *
     * @return a new map from state store name to its (decorated) source topic names
     */
    public synchronized Map<String, Set<String>> stateStoreNameToSourceTopics() {
        Map<String, Set<String>> results = new HashMap<String, Set<String>>();
        for (Map.Entry<String, Set<String>> entry : this.stateStoreNameToSourceTopics.entrySet()) {
            results.put(entry.getKey(), this.maybeDecorateInternalSourceTopics(entry.getValue()));
        }
        return results;
    }

    /**
     * Translates every registered copartition group from source node names to topic names.
     * Nodes without an entry in {@code nodeToSourceTopics} are skipped; internal topics are
     * prefixed with the application id.
     *
     * @return unmodifiable collection of unmodifiable topic-name sets, one per registered group
     */
    public synchronized Collection<Set<String>> copartitionGroups() {
        List<Set<String>> topicGroups = new ArrayList<Set<String>>(copartitionSourceGroups.size());
        for (Set<String> sourceNodeNames : copartitionSourceGroups) {
            Set<String> groupTopics = new HashSet<String>();
            for (String sourceNode : sourceNodeNames) {
                String[] nodeTopics = nodeToSourceTopics.get(sourceNode);
                if (nodeTopics != null) {
                    groupTopics.addAll(maybeDecorateInternalSourceTopics(nodeTopics));
                }
            }
            topicGroups.add(Collections.unmodifiableSet(groupTopics));
        }
        return Collections.unmodifiableList(topicGroups);
    }

    /**
     * Set-based convenience overload of {@code maybeDecorateInternalSourceTopics(String...)}.
     *
     * @param sourceTopics topic names to process
     * @return a new set in which internal topic names are prefixed with the application id
     */
    private Set<String> maybeDecorateInternalSourceTopics(Set<String> sourceTopics) {
        String[] asArray = sourceTopics.toArray(new String[sourceTopics.size()]);
        return maybeDecorateInternalSourceTopics(asArray);
    }

    /**
     * Copies the given topic names into a new set, prefixing those registered as internal topics
     * with the application id.
     *
     * @param sourceTopics topic names to process
     * @return a new set of topic names
     * @throws TopologyBuilderException if an internal topic is present and no application id is set
     */
    private Set<String> maybeDecorateInternalSourceTopics(String ... sourceTopics) {
        Set<String> result = new HashSet<String>();
        for (String name : sourceTopics) {
            result.add(internalTopicNames.contains(name) ? decorateTopic(name) : name);
        }
        return result;
    }

    /**
     * Prefixes a topic name with the application id and a dash.
     *
     * @param topic the undecorated topic name
     * @return {@code applicationId + "-" + topic}
     * @throws TopologyBuilderException if the application id has not been set
     */
    private String decorateTopic(String topic) {
        if (applicationId != null) {
            return applicationId + "-" + topic;
        }
        throw new TopologyBuilderException("there are internal topics and applicationId hasn't been set. Call setApplicationId first");
    }

    /**
     * Returns one pattern that is the alternation of every source pattern and every source topic
     * name currently known, building and caching it on first use.
     *
     * <p>Nothing is built while there are no pattern-based sources; the cached value (null unless
     * built earlier) is returned in that case. Once built, the pattern is not rebuilt.
     *
     * <p>NOTE(review): topic names are inserted into the regular expression unquoted, so regex
     * metacharacters in a topic name (e.g. '.') are interpreted as such. Confirm this is intended.
     *
     * @return the combined pattern, or null if none has been built
     */
    public synchronized Pattern sourceTopicPattern() {
        if (topicPattern != null || nodeToSourcePatterns.isEmpty()) {
            return topicPattern;
        }
        StringBuilder alternation = new StringBuilder();
        String separator = "";
        for (Pattern sourcePattern : nodeToSourcePatterns.values()) {
            alternation.append(separator).append(sourcePattern.pattern());
            separator = "|";
        }
        for (String[] nodeTopics : nodeToSourceTopics.values()) {
            for (String nodeTopic : nodeTopics) {
                alternation.append(separator).append(nodeTopic);
                separator = "|";
            }
        }
        topicPattern = Pattern.compile(alternation.toString());
        return topicPattern;
    }

    /**
     * Replaces the subscription updates consulted by {@code build} and {@code topicGroups} when
     * resolving the topics of pattern-based sources.
     *
     * @param subscriptionUpdates the new subscription updates
     */
    public synchronized void updateSubscriptions(final StreamPartitionAssignor.SubscriptionUpdates subscriptionUpdates) {
        this.subscriptionUpdates = subscriptionUpdates;
    }

    /**
     * Topic information of one node group: the topics it writes, the topics it reads, and the
     * configurations of its internal source topics and state changelog topics.
     *
     * <p>Equality and hash code consider only {@code sourceTopics} and {@code stateChangelogTopics}.
     */
    public static class TopicsInfo {
        public Set<String> sinkTopics;
        public Set<String> sourceTopics;
        public Map<String, InternalTopicConfig> interSourceTopics;
        public Map<String, InternalTopicConfig> stateChangelogTopics;

        /**
         * @param sinkTopics           topics written by the group
         * @param sourceTopics         topics read by the group
         * @param interSourceTopics    configurations of the group's internal source topics, by topic name
         * @param stateChangelogTopics configurations of the group's state changelog topics, by topic name
         */
        public TopicsInfo(Set<String> sinkTopics, Set<String> sourceTopics, Map<String, InternalTopicConfig> interSourceTopics, Map<String, InternalTopicConfig> stateChangelogTopics) {
            this.sinkTopics = sinkTopics;
            this.sourceTopics = sourceTopics;
            this.interSourceTopics = interSourceTopics;
            this.stateChangelogTopics = stateChangelogTopics;
        }

        /**
         * Two instances are equal when their source topics and state changelog topics are equal.
         * Null fields are tolerated (the fields are public and mutable).
         */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof TopicsInfo)) {
                return false;
            }
            TopicsInfo other = (TopicsInfo)o;
            return Objects.equals(this.sourceTopics, other.sourceTopics)
                && Objects.equals(this.stateChangelogTopics, other.stateChangelogTopics);
        }

        /**
         * Hash over the same two fields used by {@link #equals(Object)}.
         * The previous bit-packing sign-extended a negative changelog hash over the source-topic
         * bits, so the source topics did not contribute in that case.
         */
        @Override
        public int hashCode() {
            return Objects.hash(this.sourceTopics, this.stateChangelogTopics);
        }

        @Override
        public String toString() {
            return "TopicsInfo{sinkTopics=" + this.sinkTopics + ", sourceTopics=" + this.sourceTopics + ", interSourceTopics=" + this.interSourceTopics + ", stateChangelogTopics=" + this.stateChangelogTopics + '}';
        }
    }

    /**
     * Factory for sink nodes. It is an inner (non-static) class because it consults the enclosing
     * builder's internal topic names when building the node.
     */
    private class SinkNodeFactory extends NodeFactory {
        public final String[] parents;
        public final String topic;
        private Serializer keySerializer;
        private Serializer valSerializer;
        private final StreamPartitioner partitioner;

        /**
         * @param name          node name
         * @param parents       names of the parent nodes; a copy of the array is stored
         * @param topic         topic the sink writes to
         * @param keySerializer key serializer handed to the sink node
         * @param valSerializer value serializer handed to the sink node
         * @param partitioner   partitioner handed to the sink node
         */
        private SinkNodeFactory(String name, String[] parents, String topic, Serializer keySerializer, Serializer valSerializer, StreamPartitioner partitioner) {
            super(name);
            this.parents = parents.clone();
            this.topic = topic;
            this.keySerializer = keySerializer;
            this.valSerializer = valSerializer;
            this.partitioner = partitioner;
        }

        /**
         * Builds the sink node; an internal topic is prefixed with the application id.
         */
        @Override
        public ProcessorNode build() {
            final boolean internal = internalTopicNames.contains(topic);
            final String targetTopic = internal ? decorateTopic(topic) : topic;
            return new SinkNode(name, targetTopic, keySerializer, valSerializer, partitioner);
        }
    }

    /**
     * Factory for source nodes, created either with explicit topic names or with a pattern.
     * It is an inner (non-static) class because it reads and updates the enclosing builder's
     * topic bookkeeping.
     */
    private class SourceNodeFactory extends NodeFactory {
        private final String[] topics;
        public final Pattern pattern;
        private Deserializer keyDeserializer;
        private Deserializer valDeserializer;

        /**
         * @param name            node name
         * @param topics          explicit topic names, or null; a copy of the array is stored
         * @param pattern         subscription pattern, or null
         * @param keyDeserializer key deserializer handed to the source node
         * @param valDeserializer value deserializer handed to the source node
         */
        private SourceNodeFactory(String name, String[] topics, Pattern pattern, Deserializer keyDeserializer, Deserializer valDeserializer) {
            super(name);
            if (topics == null) {
                this.topics = null;
            } else {
                this.topics = topics.clone();
            }
            this.pattern = pattern;
            this.keyDeserializer = keyDeserializer;
            this.valDeserializer = valDeserializer;
        }

        /**
         * @return the explicit topic names given at construction (the internal array, not a copy),
         *         or null if none were given
         */
        public String[] getTopics() {
            return topics;
        }

        /**
         * Selects from the subscribed topics those belonging to this source, recording newly
         * matched topics in the builder's topic-to-pattern map.
         *
         * @param subscribedTopics currently subscribed topic names
         * @return the topics assigned to this source, in iteration order of {@code subscribedTopics}
         * @throws TopologyBuilderException if a topic matching this pattern is already claimed by a
         *                                  different pattern
         */
        public String[] getTopics(Collection<String> subscribedTopics) {
            List<String> assigned = new ArrayList<String>();
            for (String candidate : subscribedTopics) {
                // Identity comparison: Pattern does not define value equality.
                if (pattern == topicToPatterns.get(candidate)) {
                    assigned.add(candidate);
                } else if (topicToPatterns.containsKey(candidate) && isMatch(candidate)) {
                    throw new TopologyBuilderException("Topic " + candidate + " is already matched for another regex pattern " + topicToPatterns.get(candidate) + " and hence cannot be matched to this regex pattern " + pattern + " any more.");
                } else if (isMatch(candidate)) {
                    topicToPatterns.put(candidate, pattern);
                    assigned.add(candidate);
                }
            }
            return assigned.toArray(new String[assigned.size()]);
        }

        /**
         * Builds the source node using the topics currently recorded for this node in the builder.
         */
        @Override
        public ProcessorNode build() {
            String[] currentTopics = nodeToSourceTopics.get(name);
            return new SourceNode(name, currentTopics, keyDeserializer, valDeserializer);
        }

        /** Returns whether the whole topic name matches this source's pattern. */
        private boolean isMatch(String topic) {
            return pattern.matcher(topic).matches();
        }
    }

    /**
     * Factory for processor nodes; also tracks the names of the state stores connected to the processor.
     */
    private static class ProcessorNodeFactory extends NodeFactory {
        public final String[] parents;
        private final ProcessorSupplier supplier;
        private final Set<String> stateStoreNames;

        /**
         * @param name     node name
         * @param parents  names of the parent nodes; a copy of the array is stored
         * @param supplier supplier used to create the processor when the node is built
         */
        public ProcessorNodeFactory(String name, String[] parents, ProcessorSupplier supplier) {
            super(name);
            this.parents = parents.clone();
            this.supplier = supplier;
            this.stateStoreNames = new HashSet<String>();
        }

        /** Records that the processor uses the named state store. */
        public void addStateStore(String stateStoreName) {
            stateStoreNames.add(stateStoreName);
        }

        /** Builds the processor node with a processor obtained from the supplier. */
        @Override
        public ProcessorNode build() {
            return new ProcessorNode(name, supplier.get(), stateStoreNames);
        }
    }

    /**
     * Base class of the per-node factories held by the builder; each factory carries the node's
     * name and knows how to build the node.
     */
    private abstract static class NodeFactory {
        /** Name of the node this factory builds. */
        public final String name;

        NodeFactory(final String name) {
            this.name = name;
        }

        /** Builds the node described by this factory. */
        public abstract ProcessorNode build();
    }

    /**
     * Pairs a state store supplier with the names of the nodes connected to the store.
     */
    private static class StateStoreFactory {
        /** Names of the nodes connected to this store. */
        public final Set<String> users = new HashSet<String>();
        /** Supplier that creates the store. */
        public final StateStoreSupplier supplier;

        StateStoreFactory(final StateStoreSupplier supplier) {
            this.supplier = supplier;
        }
    }
}

