/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.listener;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.TopicPartitionInitialOffset;
import org.springframework.util.Assert;

/**
 * Listener container that delegates to one or more child
 * {@link KafkaMessageListenerContainer} instances, one per unit of
 * {@link #setConcurrency(int) concurrency}.
 *
 * <p>Children are created and started in {@link #doStart()} and discarded in
 * {@link #doStop(Runnable)}. When explicit topic partitions are configured in
 * the {@link ContainerProperties}, they are divided between the children;
 * otherwise every child is created with the same properties.
 *
 * <p>NOTE(review): the child list is a plain {@link ArrayList}; this class
 * performs no synchronization of its own and presumably relies on the parent
 * class to serialize lifecycle calls - confirm against the parent.
 *
 * @param <K> the key type
 * @param <V> the value type
 */
public class ConcurrentMessageListenerContainer<K, V>
extends AbstractMessageListenerContainer<K, V> {
    /** Child containers created by the most recent start; emptied again on stop. */
    private final List<KafkaMessageListenerContainer<K, V>> containers = new ArrayList<>();

    /** Number of child containers to create on start; always greater than zero when set via the setter. */
    private int concurrency = 1;

    /**
     * Creates a container that will build its children from the given factory
     * and properties.
     *
     * @param consumerFactory the consumer factory; must not be {@code null}
     * @param containerProperties the container properties
     */
    public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties) {
        super(consumerFactory, containerProperties);
        Assert.notNull(consumerFactory, "A ConsumerFactory must be provided");
    }

    /**
     * Returns the configured concurrency.
     *
     * @return the number of child containers that will be created on start
     */
    public int getConcurrency() {
        return this.concurrency;
    }

    /**
     * Sets the number of child containers to create on start. Note that
     * {@link #doStart()} lowers this value when explicit partitions are
     * configured and there are fewer partitions than the requested concurrency.
     *
     * @param concurrency the concurrency; must be greater than 0
     * @throws IllegalArgumentException if {@code concurrency} is not positive
     */
    public void setConcurrency(int concurrency) {
        Assert.isTrue(concurrency > 0, "concurrency must be greater than 0");
        this.concurrency = concurrency;
    }

    /**
     * Returns a read-only view of the current child containers.
     *
     * @return an unmodifiable view of the child container list
     */
    public List<KafkaMessageListenerContainer<K, V>> getContainers() {
        return Collections.unmodifiableList(this.containers);
    }

    /**
     * Collects the partitions assigned to every child container into a single
     * list. Children that report {@code null} are skipped.
     *
     * @return the combined assigned partitions; empty when there are none
     */
    @Override
    public Collection<TopicPartition> getAssignedPartitions() {
        return this.containers.stream()
                .map(KafkaMessageListenerContainer::getAssignedPartitions)
                .filter(Objects::nonNull)
                .flatMap(Collection::stream)
                .collect(Collectors.toList());
    }

    /**
     * Reports whether a pause has been requested on this container and every
     * child container reports itself as paused.
     *
     * @return {@code true} only if {@link #isPaused()} is {@code true} and no
     * child returns {@code false} from {@code isContainerPaused()}
     */
    @Override
    public boolean isContainerPaused() {
        // allMatch is vacuously true for an empty child list, matching a loop that finds no unpaused child.
        return this.isPaused()
                && this.containers.stream().allMatch(KafkaMessageListenerContainer::isContainerPaused);
    }

    /**
     * Merges the metrics maps of all child containers. If two children use the
     * same top-level key, the entry of the later child replaces the earlier one.
     *
     * @return an unmodifiable map of the merged child metrics
     */
    @Override
    public Map<String, Map<MetricName, ? extends Metric>> metrics() {
        // The value type must keep the wildcard so that both putAll and the return type line up.
        Map<String, Map<MetricName, ? extends Metric>> metrics = new HashMap<>();
        for (KafkaMessageListenerContainer<K, V> container : this.containers) {
            metrics.putAll(container.metrics());
        }
        return Collections.unmodifiableMap(metrics);
    }

    /**
     * Creates and starts the child containers if this container is not already
     * running.
     *
     * <p>When explicit partitions are configured and the concurrency exceeds
     * their number, the concurrency is reduced (and stays reduced) to the
     * number of partitions, and a warning is logged. Each child gets the bean
     * name {@code <beanName>-<index>} (using {@code "consumer"} when this
     * container has no bean name) and the client id suffix {@code -<index>}.
     */
    @Override
    protected void doStart() {
        if (this.isRunning()) {
            return;
        }
        ContainerProperties containerProperties = this.getContainerProperties();
        TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
        if (topicPartitions != null && this.concurrency > topicPartitions.length) {
            this.logger.warn("When specific partitions are provided, the concurrency must be less than or equal to the number of partitions; reduced from " + this.concurrency + " to " + topicPartitions.length);
            this.concurrency = topicPartitions.length;
        }
        this.setRunning(true);
        String beanName = this.getBeanName();
        String namePrefix = (beanName != null ? beanName : "consumer") + "-";
        for (int i = 0; i < this.concurrency; ++i) {
            KafkaMessageListenerContainer<K, V> container;
            if (topicPartitions == null) {
                container = new KafkaMessageListenerContainer<>(this, this.consumerFactory, containerProperties);
            }
            else {
                container = new KafkaMessageListenerContainer<>(this, this.consumerFactory, containerProperties,
                        this.partitionSubset(containerProperties, i));
            }
            container.setBeanName(namePrefix + i);
            if (this.getApplicationEventPublisher() != null) {
                container.setApplicationEventPublisher(this.getApplicationEventPublisher());
            }
            container.setClientIdSuffix("-" + i);
            container.start();
            this.containers.add(container);
        }
    }

    /**
     * Selects the slice of the configured partitions that the child with the
     * given index should handle.
     *
     * <p>With a concurrency of 1 the full array is returned; when the number
     * of partitions equals the concurrency each child gets exactly one
     * partition. Otherwise each child gets {@code partitions / concurrency}
     * consecutive partitions and the last child additionally takes whatever
     * remains, so the last slice can be larger than the others.
     *
     * @param containerProperties the properties holding the configured partitions
     * @param i the zero-based index of the child container
     * @return the partitions for child {@code i}
     */
    private TopicPartitionInitialOffset[] partitionSubset(ContainerProperties containerProperties, int i) {
        TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
        if (this.concurrency == 1) {
            return topicPartitions;
        }
        int numPartitions = topicPartitions.length;
        if (numPartitions == this.concurrency) {
            return new TopicPartitionInitialOffset[] {topicPartitions[i]};
        }
        int perContainer = numPartitions / this.concurrency;
        int from = i * perContainer;
        // The last child absorbs the remainder of the integer division.
        int to = i == this.concurrency - 1 ? numPartitions : from + perContainer;
        return Arrays.copyOfRange(topicPartitions, from, to);
    }

    /**
     * Stops every running child container and forgets all children. Does
     * nothing when this container is not running.
     *
     * <p>The callback is run once the last stopped child has reported
     * completion. If no child is running at the time of the call, there is
     * nothing to wait for and the callback is run immediately; otherwise it
     * would never be invoked.
     *
     * @param callback invoked when the stop has completed
     */
    @Override
    protected void doStop(final Runnable callback) {
        if (!this.isRunning()) {
            return;
        }
        this.setRunning(false);
        // Take a single snapshot so the expected number of completions always
        // equals the number of stop requests actually issued.
        List<KafkaMessageListenerContainer<K, V>> running = this.containers.stream()
                .filter(KafkaMessageListenerContainer::isRunning)
                .collect(Collectors.toList());
        if (running.isEmpty()) {
            this.containers.clear();
            callback.run();
            return;
        }
        final AtomicInteger remaining = new AtomicInteger(running.size());
        for (KafkaMessageListenerContainer<K, V> container : running) {
            container.stop(() -> {
                if (remaining.decrementAndGet() <= 0) {
                    callback.run();
                }
            });
        }
        this.containers.clear();
    }

    /**
     * Pauses this container and then every child container.
     */
    @Override
    public void pause() {
        super.pause();
        this.containers.forEach(KafkaMessageListenerContainer::pause);
    }

    /**
     * Resumes this container and then every child container.
     */
    @Override
    public void resume() {
        super.resume();
        this.containers.forEach(KafkaMessageListenerContainer::resume);
    }

    /**
     * Returns a short description containing the concurrency, bean name and
     * running state.
     */
    @Override
    public String toString() {
        return "ConcurrentMessageListenerContainer [concurrency=" + this.concurrency + ", beanName=" + this.getBeanName() + ", running=" + this.isRunning() + "]";
    }
}

