/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.listener;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.jspecify.annotations.Nullable;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.event.ConcurrentContainerStoppedEvent;
import org.springframework.kafka.event.ConsumerStoppedEvent;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.util.Assert;

public class ConcurrentMessageListenerContainer<K, V>
extends AbstractMessageListenerContainer<K, V> {
    /** Child containers; populated by {@link #doStart()} and emptied by {@link #clearState()}. */
    private final List<KafkaMessageListenerContainer<K, V>> containers = new ArrayList<>();

    /** Task executors created for children whose properties carried none; looked up again by child index. */
    private final List<AsyncTaskExecutor> executors = new ArrayList<>();

    /** Incremented in {@link #childStarted} and decremented in {@link #childStopped} for known children. */
    private final AtomicInteger startedContainers = new AtomicInteger();

    /** Number of child containers to create on start; always greater than zero when set via the setter. */
    private int concurrency = 1;

    /** When {@code true}, the {@code -n} client id suffix is applied even with a concurrency of 1. */
    private boolean alwaysClientIdSuffix = true;

    /**
     * Stop reason reported by the children, or {@code null} when none is recorded.
     * The type-use annotation is attached to the nested type name itself, as required
     * for a qualified (static nested) type.
     */
    private volatile ConsumerStoppedEvent.@Nullable Reason reason;

    /**
     * Create a concurrent container backed by the given consumer factory and container properties.
     *
     * @param consumerFactory the consumer factory; declared nullable but a {@code null} value is
     * rejected by {@code Assert.notNull} after the superclass constructor has run
     * @param containerProperties the container properties handed to the superclass
     */
    public ConcurrentMessageListenerContainer(@Nullable ConsumerFactory<? super K, ? super V> consumerFactory, ContainerProperties containerProperties) {
        super(consumerFactory, containerProperties);
        Assert.notNull(consumerFactory, "A ConsumerFactory must be provided");
    }

    /**
     * Return the configured concurrency, i.e. the number of child containers
     * that {@link #doStart()} creates.
     *
     * @return the current concurrency value
     */
    public int getConcurrency() {
        return this.concurrency;
    }

    /**
     * Set the number of child containers to create when this container starts.
     *
     * @param concurrency the concurrency; values that are not greater than zero are
     * rejected by {@code Assert.isTrue} and leave the current setting untouched
     */
    public void setConcurrency(int concurrency) {
        boolean positive = concurrency > 0;
        Assert.isTrue(positive, "concurrency must be greater than 0");
        this.concurrency = concurrency;
    }

    /**
     * Control whether the {@code -n} client id suffix is given to child containers
     * even when the concurrency is 1. With a concurrency above 1 the suffix is
     * applied regardless of this flag.
     *
     * @param alwaysClientIdSuffix {@code false} to omit the suffix when the concurrency is 1
     */
    public void setAlwaysClientIdSuffix(boolean alwaysClientIdSuffix) {
        this.alwaysClientIdSuffix = alwaysClientIdSuffix;
    }

    /**
     * Return a snapshot of the child containers, taken while holding the lifecycle lock.
     *
     * @return an unmodifiable copy of the current child container list
     */
    public List<KafkaMessageListenerContainer<K, V>> getContainers() {
        this.lifecycleLock.lock();
        try {
            // Copy under the lock so callers never observe the live, mutable list.
            return List.copyOf(this.containers);
        }
        finally {
            this.lifecycleLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Find the child container that currently has the given topic/partition assigned.
     *
     * @param topic the topic name
     * @param partition the partition number
     * @return the first child whose assigned partitions include the topic/partition,
     * or this container when no child matches
     */
    @Override
    public MessageListenerContainer getContainerFor(String topic, int partition) {
        this.lifecycleLock.lock();
        try {
            for (KafkaMessageListenerContainer<K, V> child : this.containers) {
                Collection<TopicPartition> assigned = child.getAssignedPartitions();
                if (assigned != null) {
                    for (TopicPartition candidate : assigned) {
                        if (candidate.topic().equals(topic) && candidate.partition() == partition) {
                            return child;
                        }
                    }
                }
            }
            // No child owns the partition: fall back to the parent itself.
            return this;
        }
        finally {
            this.lifecycleLock.unlock();
        }
    }

    /**
     * Return the partitions assigned to all child containers, in child order.
     * Children that report a {@code null} assignment are skipped.
     *
     * @return an unmodifiable list of every child's assigned partitions; empty when there are none
     */
    @Override
    public Collection<TopicPartition> getAssignedPartitions() {
        this.lifecycleLock.lock();
        try {
            List<TopicPartition> combined = new ArrayList<>();
            for (KafkaMessageListenerContainer<K, V> child : this.containers) {
                Collection<TopicPartition> assigned = child.getAssignedPartitions();
                if (assigned != null) {
                    combined.addAll(assigned);
                }
            }
            return Collections.unmodifiableList(combined);
        }
        finally {
            this.lifecycleLock.unlock();
        }
    }

    /**
     * Merge the per-client-id assignments reported by every child container.
     * A child that returns {@code null} contributes nothing; on duplicate keys the
     * entry of the later child replaces the earlier one.
     *
     * @return a new mutable map holding the merged assignments
     */
    @Override
    public Map<String, Collection<TopicPartition>> getAssignmentsByClientId() {
        this.lifecycleLock.lock();
        try {
            Map<String, Collection<TopicPartition>> merged = new HashMap<>();
            for (KafkaMessageListenerContainer<K, V> child : this.containers) {
                Map<String, Collection<TopicPartition>> childAssignments = child.getAssignmentsByClientId();
                if (childAssignments == null) {
                    continue;
                }
                merged.putAll(childAssignments);
            }
            return merged;
        }
        finally {
            this.lifecycleLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Report whether a pause has been requested and every child container has
     * actually paused.
     *
     * @return {@code false} when no pause is requested or at least one child is not
     * paused; {@code true} otherwise (including when there are no children)
     */
    @Override
    public boolean isContainerPaused() {
        this.lifecycleLock.lock();
        try {
            if (!this.isPauseRequested()) {
                return false;
            }
            return this.containers.stream().allMatch(child -> child.isContainerPaused());
        }
        finally {
            this.lifecycleLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Report whether any child is still active: either a child container reports
     * that it is running, or the started-children counter is still above zero.
     *
     * @return {@code true} if at least one child is running or counted as started
     */
    @Override
    public boolean isChildRunning() {
        this.lifecycleLock.lock();
        try {
            boolean anyReportsRunning = this.containers.stream().anyMatch(child -> child.isRunning());
            return anyReportsRunning || this.startedContainers.get() > 0;
        }
        finally {
            this.lifecycleLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Collect the metrics of every child container into a single map.
     * <p>Each child's {@code metrics()} result is merged with {@code putAll}, so when
     * two children report the same key the later child's entry wins.
     *
     * @return an unmodifiable map of the merged child metrics; empty when there are no children
     */
    @Override
    public Map<String, Map<MetricName, ? extends Metric>> metrics() {
        this.lifecycleLock.lock();
        try {
            // Typed exactly like the return type so putAll and the unmodifiable wrapper
            // need no raw types or unchecked conversions.
            Map<String, Map<MetricName, ? extends Metric>> merged = new HashMap<>();
            for (KafkaMessageListenerContainer<K, V> child : this.containers) {
                merged.putAll(child.metrics());
            }
            return Collections.unmodifiableMap(merged);
        }
        finally {
            this.lifecycleLock.unlock();
        }
    }

    /**
     * Start this container by creating, configuring and starting one child
     * container per unit of concurrency. Does nothing when already running.
     * <p>When explicit topic partitions are configured and the concurrency exceeds
     * their number, a warning is logged and the {@code concurrency} field itself is
     * lowered to the partition count.
     */
    @Override
    protected void doStart() {
        if (!this.isRunning()) {
            this.checkTopics();
            ContainerProperties containerProperties = this.getContainerProperties();
            @Nullable TopicPartitionOffset @Nullable [] topicPartitions = containerProperties.getTopicPartitions();
            // Explicit partitions cap the concurrency at the number of partitions.
            if (topicPartitions != null && this.concurrency > topicPartitions.length) {
                this.logger.warn(() -> "When specific partitions are provided, the concurrency must be less than or equal to the number of partitions; reduced from " + this.concurrency + " to " + topicPartitions.length);
                this.concurrency = topicPartitions.length;
            }
            // Drop children, started counter and stop reason left over from a previous run.
            this.clearState();
            this.setRunning(true);
            for (int i = 0; i < this.concurrency; ++i) {
                KafkaMessageListenerContainer<K, V> container = this.constructContainer(containerProperties, topicPartitions, i);
                this.configureChildContainer(i, container);
                // Propagate a pause requested on the parent before the child starts.
                if (this.isPauseRequested()) {
                    container.pause();
                }
                container.start();
                // The child is registered only after it has been started.
                this.containers.add(container);
            }
        }
    }

    /**
     * Copy the parent's configuration onto a freshly constructed child container.
     * <p>The child is named {@code <parentBeanName>-<index>} ({@code consumer-<index>}
     * when the parent has no bean name), receives the parent's application context and
     * event publisher when present, its error handling and interceptor settings, and an
     * emergency-stop hook that stops the parent abnormally. If the child's properties
     * carry no listener task executor, one is reused from (or added to) the
     * per-index executor list.
     *
     * @param index zero-based position of the child
     * @param container the child container to configure
     */
    private void configureChildContainer(int index, KafkaMessageListenerContainer<K, V> container) {
        String parentName = this.getBeanName();
        String childName = (parentName == null ? "consumer" : parentName) + "-" + index;
        container.setBeanName(childName);

        ApplicationContext applicationContext = this.getApplicationContext();
        if (applicationContext != null) {
            container.setApplicationContext(applicationContext);
        }
        ApplicationEventPublisher publisher = this.getApplicationEventPublisher();
        if (publisher != null) {
            container.setApplicationEventPublisher(publisher);
        }

        // The suffix is mandatory with several children; for a single child it is optional.
        String clientIdSuffix = "";
        if (this.concurrency > 1 || this.alwaysClientIdSuffix) {
            clientIdSuffix = "-" + index;
        }
        container.setClientIdSuffix(clientIdSuffix);

        container.setCommonErrorHandler(this.getCommonErrorHandler());
        container.setAfterRollbackProcessor(this.getAfterRollbackProcessor());
        container.setRecordInterceptor(this.getRecordInterceptor());
        container.setBatchInterceptor(this.getBatchInterceptor());
        container.setInterceptBeforeTx(this.isInterceptBeforeTx());
        container.setListenerInfo(this.getListenerInfo());
        container.setEmergencyStop(() -> this.stopAbnormally(() -> {}));

        ContainerProperties childProperties = container.getContainerProperties();
        if (childProperties.getListenerTaskExecutor() != null) {
            return;
        }
        AsyncTaskExecutor childExecutor;
        if (index < this.executors.size()) {
            childExecutor = this.executors.get(index);
        } else {
            childExecutor = new SimpleAsyncTaskExecutor(childName + "-C-");
            this.executors.add(childExecutor);
        }
        container.getContainerProperties().setListenerTaskExecutor(childExecutor);
    }

    /**
     * Instantiate the child container for the given index.
     *
     * @param containerProperties the container properties passed to the child
     * @param topicPartitions the explicitly configured partitions, or {@code null} when none are
     * configured; only its nullness is inspected here
     * @param i zero-based index of the child, used to select its partition subset
     * @return a new child container; when partitions are configured it is constructed with the
     * subset computed by {@link #partitionSubset}
     */
    private KafkaMessageListenerContainer<K, V> constructContainer(ContainerProperties containerProperties, @Nullable TopicPartitionOffset @Nullable [] topicPartitions, int i) {
        // Diamond instead of raw types: the type arguments are inferred from the return type.
        if (topicPartitions == null) {
            return new KafkaMessageListenerContainer<>(this, this.consumerFactory, containerProperties);
        }
        return new KafkaMessageListenerContainer<>(this, this.consumerFactory, containerProperties, this.partitionSubset(containerProperties, i));
    }

    /**
     * Select the slice of the configured topic partitions that the child at
     * {@code index} should consume.
     * <p>With a concurrency of 1 the whole array is returned as-is. When the number of
     * partitions equals the concurrency each child gets exactly one partition. Otherwise
     * each child gets {@code numPartitions / concurrency} consecutive partitions and the
     * last child additionally takes whatever remains.
     *
     * @param containerProperties the properties holding the configured partitions
     * @param index zero-based index of the child
     * @return the partitions for the child, or {@code null} when none are configured
     */
    private @Nullable TopicPartitionOffset @Nullable [] partitionSubset(ContainerProperties containerProperties, int index) {
        @Nullable TopicPartitionOffset @Nullable [] configured = containerProperties.getTopicPartitions();
        if (configured == null) {
            return null;
        }
        if (this.concurrency == 1) {
            return configured;
        }
        int total = configured.length;
        if (total == this.concurrency) {
            return new TopicPartitionOffset[]{configured[index]};
        }
        int share = total / this.concurrency;
        int from = index * share;
        int to;
        if (index == this.concurrency - 1) {
            // The last child absorbs the remainder of an uneven split.
            to = total;
        } else {
            to = from + share;
        }
        return Arrays.copyOfRange(configured, from, to);
    }

    /**
     * Stop all running child containers and invoke {@code callback} once the last of
     * them has reported completion. Does nothing when this container is not running.
     * <p>If no child was running at entry the callback is invoked immediately. Every
     * child is fenced, whether or not it is running. When the started-children counter
     * is already zero at the end, the concurrent-container-stopped event is published
     * from here.
     *
     * @param callback the action to run when the children have stopped
     * @param normal {@code true} to stop the children normally, {@code false} to stop them abnormally
     */
    @Override
    protected void doStop(Runnable callback, boolean normal) {
        if (!this.isRunning()) {
            return;
        }
        boolean childRunning = this.isChildRunning();
        this.setRunning(false);
        if (!childRunning) {
            callback.run();
        }

        // Number of children expected to call back once they have stopped.
        AtomicInteger pending = new AtomicInteger();
        for (KafkaMessageListenerContainer<K, V> child : this.containers) {
            if (child.isRunning()) {
                pending.incrementAndGet();
            }
        }
        // The child that brings the counter down to zero triggers the caller's callback.
        Runnable onChildStopped = () -> {
            if (pending.decrementAndGet() <= 0) {
                callback.run();
            }
        };

        for (KafkaMessageListenerContainer<K, V> child : this.containers) {
            child.setFenced(true);
            if (child.isRunning()) {
                if (normal) {
                    child.stop(onChildStopped);
                } else {
                    child.stopAbnormally(onChildStopped);
                }
            }
        }

        this.setStoppedNormally(normal);
        if (this.startedContainers.get() == 0) {
            this.publishConcurrentContainerStoppedEvent(this.reason);
        }
    }

    /**
     * Record that a child container has started. Containers that are not in this
     * container's child list are ignored.
     *
     * @param child the container reporting its start
     */
    @Override
    public void childStarted(MessageListenerContainer child) {
        this.lifecycleLock.lock();
        try {
            boolean known = this.containers.contains(child);
            if (!known) {
                return;
            }
            this.startedContainers.incrementAndGet();
        }
        finally {
            this.lifecycleLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Record that a child container has stopped and, when it was the last started
     * child, finish the stop sequence.
     * <p>The first reported reason is retained, except that {@code AUTH} always
     * overrides it. When the started counter reaches zero: if this container is no
     * longer running, the child list is cleared and the concurrent-container-stopped
     * event is published; then, if the retained reason is {@code AUTH} and the
     * properties request a restart after auth exceptions, {@code start()} is submitted
     * to an executor. Containers that are not children of this one are ignored.
     *
     * @param child the container reporting its stop
     * @param reason the reason the child stopped
     */
    @Override
    public void childStopped(MessageListenerContainer child, ConsumerStoppedEvent.Reason reason) {
        this.lifecycleLock.lock();
        try {
            if (!this.containers.contains(child)) {
                return;
            }
            if (this.reason == null || reason.equals(ConsumerStoppedEvent.Reason.AUTH)) {
                this.reason = reason;
            }
            if (this.startedContainers.decrementAndGet() != 0) {
                // Other children are still counted as started; nothing more to do yet.
                return;
            }
            if (!this.isRunning()) {
                this.containers.clear();
                this.publishConcurrentContainerStoppedEvent(this.reason);
            }
            boolean restartAfterAuth = ConsumerStoppedEvent.Reason.AUTH.equals(this.reason) && this.getContainerProperties().isRestartAfterAuthExceptions();
            this.reason = null;
            if (!restartAfterAuth) {
                return;
            }
            // The restart is handed to an executor instead of being invoked inline;
            // note that the lifecycle lock is held at this point.
            AsyncTaskExecutor restartExecutor = this.getContainerProperties().getListenerTaskExecutor();
            if (restartExecutor == null) {
                restartExecutor = new SimpleAsyncTaskExecutor(this.getListenerId() + ".authRestart");
            }
            restartExecutor.execute(this::start);
        }
        finally {
            this.lifecycleLock.unlock();
        }
    }

    /**
     * Publish a {@link ConcurrentContainerStoppedEvent} carrying the given reason.
     * Nothing is published when no application event publisher is available.
     *
     * @param reason the stop reason to attach to the event; may be {@code null}
     */
    private void publishConcurrentContainerStoppedEvent(ConsumerStoppedEvent.@Nullable Reason reason) {
        ApplicationEventPublisher eventPublisher = this.getApplicationEventPublisher();
        if (eventPublisher != null) {
            eventPublisher.publishEvent(new ConcurrentContainerStoppedEvent(this, reason));
        }
    }

    /**
     * Request a rebalance through the first child container only; the other
     * children are not called.
     * <p>When there are no child containers (for example before the container has
     * been started) the request is ignored with a warning instead of failing with an
     * {@link IndexOutOfBoundsException}.
     */
    @Override
    public void enforceRebalance() {
        this.lifecycleLock.lock();
        try {
            if (this.containers.isEmpty()) {
                this.logger.warn(() -> "enforceRebalance() ignored because there are no child containers; has the container been started?");
                return;
            }
            this.containers.get(0).enforceRebalance();
        }
        finally {
            this.lifecycleLock.unlock();
        }
    }

    /**
     * Pause this container and then each child container, in child order, while
     * holding the lifecycle lock.
     */
    @Override
    public void pause() {
        this.lifecycleLock.lock();
        try {
            super.pause();
            for (KafkaMessageListenerContainer<K, V> child : this.containers) {
                child.pause();
            }
        }
        finally {
            this.lifecycleLock.unlock();
        }
    }

    /**
     * Resume this container and then each child container, in child order, while
     * holding the lifecycle lock.
     */
    @Override
    public void resume() {
        this.lifecycleLock.lock();
        try {
            super.resume();
            for (KafkaMessageListenerContainer<K, V> child : this.containers) {
                child.resume();
            }
        }
        finally {
            this.lifecycleLock.unlock();
        }
    }

    /**
     * Pause the given partition on every child container whose currently assigned
     * partitions include it.
     *
     * @param topicPartition the partition to pause
     */
    @Override
    public void pausePartition(TopicPartition topicPartition) {
        this.lifecycleLock.lock();
        try {
            for (KafkaMessageListenerContainer<K, V> child : this.containers) {
                if (this.containsPartition(topicPartition, child)) {
                    child.pausePartition(topicPartition);
                }
            }
        }
        finally {
            this.lifecycleLock.unlock();
        }
    }

    /**
     * Resume the given partition on every child container that has a pause
     * requested for it.
     *
     * @param topicPartition the partition to resume
     */
    @Override
    public void resumePartition(TopicPartition topicPartition) {
        this.lifecycleLock.lock();
        try {
            for (KafkaMessageListenerContainer<K, V> child : this.containers) {
                if (child.isPartitionPauseRequested(topicPartition)) {
                    child.resumePartition(topicPartition);
                }
            }
        }
        finally {
            this.lifecycleLock.unlock();
        }
    }

    /**
     * Report whether any child container considers the given partition paused.
     *
     * @param topicPartition the partition to check
     * @return {@code true} as soon as one child reports the partition as paused
     */
    @Override
    public boolean isPartitionPaused(TopicPartition topicPartition) {
        this.lifecycleLock.lock();
        try {
            for (KafkaMessageListenerContainer<K, V> child : this.containers) {
                if (child.isPartitionPaused(topicPartition)) {
                    return true;
                }
            }
            return false;
        }
        finally {
            this.lifecycleLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Report whether this container and all of its children are in an expected state.
     * The parent is in an expected state when it is running or was stopped normally.
     *
     * @return {@code false} when the parent is neither running nor stopped normally, or
     * when any child reports an unexpected state; {@code true} otherwise
     */
    @Override
    public boolean isInExpectedState() {
        this.lifecycleLock.lock();
        try {
            boolean parentAsExpected = this.isRunning() || this.isStoppedNormally();
            if (!parentAsExpected) {
                return false;
            }
            return this.containers.stream().allMatch(child -> child.isInExpectedState());
        }
        finally {
            this.lifecycleLock.unlock();
        }
    }

    /**
     * Check whether the given child container currently has the partition assigned.
     *
     * @param topicPartition the partition to look for
     * @param container the child container to inspect
     * @return {@code true} if the child reports assigned partitions and they contain the partition
     */
    private boolean containsPartition(TopicPartition topicPartition, KafkaMessageListenerContainer<K, V> container) {
        Collection<TopicPartition> assigned = container.getAssignedPartitions();
        if (assigned == null) {
            return false;
        }
        return assigned.contains(topicPartition);
    }

    /**
     * Reset the per-run bookkeeping: forget all child containers, zero the
     * started-children counter and drop any recorded stop reason.
     */
    private void clearState() {
        this.containers.clear();
        this.startedContainers.set(0);
        this.reason = null;
    }

    /**
     * Describe this container by its concurrency, bean name and running flag.
     *
     * @return a short diagnostic description of this container
     */
    @Override
    public String toString() {
        return "ConcurrentMessageListenerContainer [concurrency=" + this.concurrency + ", beanName=" + this.getBeanName() + ", running=" + this.isRunning() + "]";
    }
}

