/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.kafka;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.io.Writer;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.DefaultPollableMessageSource;
import org.springframework.cloud.stream.binder.EmbeddedHeaderUtils;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.HeaderMode;
import org.springframework.cloud.stream.binder.MessageValues;
import org.springframework.cloud.stream.binder.kafka.BinderHeaderMapper;
import org.springframework.cloud.stream.binder.kafka.KafkaBindingRebalanceListener;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binding.MessageConverterConfigurer;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.cloud.stream.config.MessageSourceCustomizer;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.Lifecycle;
import org.springframework.expression.Expression;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.StaticMessageHeaderAccessor;
import org.springframework.integration.acks.AcknowledgmentCallback;
import org.springframework.integration.channel.ChannelInterceptorAware;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.integration.kafka.inbound.KafkaMessageSource;
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
import org.springframework.integration.kafka.support.RawRecordHeaderErrorMessageStrategy;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.KafkaHeaderMapper;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.support.TopicPartitionInitialOffset;
import org.springframework.kafka.support.converter.MessageConverter;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.retry.RecoveryCallback;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

public class KafkaMessageChannelBinder
extends AbstractMessageChannelBinder<ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>, KafkaTopicProvisioner>
implements ExtendedPropertiesBinder<MessageChannel, KafkaConsumerProperties, KafkaProducerProperties> {
    public static final String X_EXCEPTION_FQCN = "x-exception-fqcn";
    public static final String X_EXCEPTION_STACKTRACE = "x-exception-stacktrace";
    public static final String X_EXCEPTION_MESSAGE = "x-exception-message";
    public static final String X_ORIGINAL_TOPIC = "x-original-topic";
    public static final String X_ORIGINAL_PARTITION = "x-original-partition";
    public static final String X_ORIGINAL_OFFSET = "x-original-offset";
    public static final String X_ORIGINAL_TIMESTAMP = "x-original-timestamp";
    public static final String X_ORIGINAL_TIMESTAMP_TYPE = "x-original-timestamp-type";
    private static final ThreadLocal<String> bindingNameHolder = new ThreadLocal();
    private final KafkaBinderConfigurationProperties configurationProperties;
    private final Map<String, TopicInformation> topicsInUse = new ConcurrentHashMap<String, TopicInformation>();
    private final KafkaTransactionManager<byte[], byte[]> transactionManager;
    private final KafkaBindingRebalanceListener rebalanceListener;
    private ProducerListener<byte[], byte[]> producerListener;
    private KafkaExtendedBindingProperties extendedBindingProperties = new KafkaExtendedBindingProperties();

    public KafkaMessageChannelBinder(KafkaBinderConfigurationProperties configurationProperties, KafkaTopicProvisioner provisioningProvider) {
        this(configurationProperties, provisioningProvider, null, null, null);
    }

    public KafkaMessageChannelBinder(KafkaBinderConfigurationProperties configurationProperties, KafkaTopicProvisioner provisioningProvider, ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> containerCustomizer, KafkaBindingRebalanceListener rebalanceListener) {
        this(configurationProperties, provisioningProvider, containerCustomizer, null, rebalanceListener);
    }

    public KafkaMessageChannelBinder(KafkaBinderConfigurationProperties configurationProperties, KafkaTopicProvisioner provisioningProvider, ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> containerCustomizer, MessageSourceCustomizer<KafkaMessageSource<?, ?>> sourceCustomizer, KafkaBindingRebalanceListener rebalanceListener) {
        super(KafkaMessageChannelBinder.headersToMap(configurationProperties), (ProvisioningProvider)provisioningProvider, containerCustomizer, sourceCustomizer);
        this.configurationProperties = configurationProperties;
        this.transactionManager = StringUtils.hasText((String)configurationProperties.getTransaction().getTransactionIdPrefix()) ? new KafkaTransactionManager(this.getProducerFactory(configurationProperties.getTransaction().getTransactionIdPrefix(), (ExtendedProducerProperties<KafkaProducerProperties>)new ExtendedProducerProperties((Object)configurationProperties.getTransaction().getProducer().getExtension()))) : null;
        this.rebalanceListener = rebalanceListener;
    }

    private static String[] headersToMap(KafkaBinderConfigurationProperties configurationProperties) {
        String[] headersToMap;
        if (ObjectUtils.isEmpty((Object[])configurationProperties.getHeaders())) {
            headersToMap = BinderHeaders.STANDARD_HEADERS;
        } else {
            String[] combinedHeadersToMap = Arrays.copyOfRange(BinderHeaders.STANDARD_HEADERS, 0, BinderHeaders.STANDARD_HEADERS.length + configurationProperties.getHeaders().length);
            System.arraycopy(configurationProperties.getHeaders(), 0, combinedHeadersToMap, BinderHeaders.STANDARD_HEADERS.length, configurationProperties.getHeaders().length);
            headersToMap = combinedHeadersToMap;
        }
        return headersToMap;
    }

    public void setExtendedBindingProperties(KafkaExtendedBindingProperties extendedBindingProperties) {
        this.extendedBindingProperties = extendedBindingProperties;
    }

    public void setProducerListener(ProducerListener<byte[], byte[]> producerListener) {
        this.producerListener = producerListener;
    }

    Map<String, TopicInformation> getTopicsInUse() {
        return this.topicsInUse;
    }

    public KafkaConsumerProperties getExtendedConsumerProperties(String channelName) {
        bindingNameHolder.set(channelName);
        return (KafkaConsumerProperties)this.extendedBindingProperties.getExtendedConsumerProperties(channelName);
    }

    public KafkaProducerProperties getExtendedProducerProperties(String channelName) {
        return (KafkaProducerProperties)this.extendedBindingProperties.getExtendedProducerProperties(channelName);
    }

    public String getDefaultsPrefix() {
        return this.extendedBindingProperties.getDefaultsPrefix();
    }

    public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
        return this.extendedBindingProperties.getExtendedPropertiesEntryClass();
    }

    protected MessageHandler createProducerMessageHandler(ProducerDestination destination, ExtendedProducerProperties<KafkaProducerProperties> producerProperties, MessageChannel errorChannel) throws Exception {
        throw new IllegalStateException("The abstract binder should not call this method");
    }

    protected MessageHandler createProducerMessageHandler(ProducerDestination destination, ExtendedProducerProperties<KafkaProducerProperties> producerProperties, MessageChannel channel, MessageChannel errorChannel) throws Exception {
        DefaultKafkaProducerFactory<byte[], byte[]> producerFB = this.transactionManager != null ? this.transactionManager.getProducerFactory() : this.getProducerFactory(null, producerProperties);
        Collection partitions = ((KafkaTopicProvisioner)this.provisioningProvider).getPartitionsForTopic(producerProperties.getPartitionCount(), false, () -> {
            Producer producer = producerFB.createProducer();
            List partitionsFor = producer.partitionsFor(destination.getName());
            producer.close();
            if (this.transactionManager == null) {
                ((DisposableBean)producerFB).destroy();
            }
            return partitionsFor;
        }, destination.getName());
        this.topicsInUse.put(destination.getName(), new TopicInformation(null, partitions, false));
        if (producerProperties.isPartitioned() && producerProperties.getPartitionCount() < partitions.size()) {
            if (this.logger.isInfoEnabled()) {
                this.logger.info((Object)("The `partitionCount` of the producer for topic " + destination.getName() + " is " + producerProperties.getPartitionCount() + ", smaller than the actual partition count of " + partitions.size() + " for the topic. The larger number will be used instead."));
            }
            List interceptors = ((ChannelInterceptorAware)channel).getChannelInterceptors();
            interceptors.forEach(interceptor -> {
                if (interceptor instanceof MessageConverterConfigurer.PartitioningInterceptor) {
                    ((MessageConverterConfigurer.PartitioningInterceptor)interceptor).setPartitionCount(partitions.size());
                }
            });
        }
        KafkaTemplate kafkaTemplate = new KafkaTemplate(producerFB);
        if (this.producerListener != null) {
            kafkaTemplate.setProducerListener(this.producerListener);
        }
        ProducerConfigurationMessageHandler handler = new ProducerConfigurationMessageHandler((KafkaTemplate<byte[], byte[]>)kafkaTemplate, destination.getName(), producerProperties, (ProducerFactory<byte[], byte[]>)producerFB);
        if (errorChannel != null) {
            handler.setSendFailureChannel(errorChannel);
        }
        Object mapper = null;
        if (this.configurationProperties.getHeaderMapperBeanName() != null) {
            mapper = (KafkaHeaderMapper)this.getApplicationContext().getBean(this.configurationProperties.getHeaderMapperBeanName(), KafkaHeaderMapper.class);
        }
        if (producerProperties.getHeaderMode() != null && !HeaderMode.headers.equals((Object)producerProperties.getHeaderMode())) {
            mapper = null;
        } else if (mapper == null) {
            String[] headerPatterns = ((KafkaProducerProperties)producerProperties.getExtension()).getHeaderPatterns();
            if (headerPatterns != null && headerPatterns.length > 0) {
                LinkedList<String> patterns = new LinkedList<String>(Arrays.asList(headerPatterns));
                if (!patterns.contains("!timestamp")) {
                    patterns.add(0, "!timestamp");
                }
                if (!patterns.contains("!id")) {
                    patterns.add(0, "!id");
                }
                mapper = new BinderHeaderMapper(patterns.toArray(new String[patterns.size()]));
            } else {
                mapper = new BinderHeaderMapper();
            }
        }
        handler.setHeaderMapper((KafkaHeaderMapper)mapper);
        return handler;
    }

    protected DefaultKafkaProducerFactory<byte[], byte[]> getProducerFactory(String transactionIdPrefix, ExtendedProducerProperties<KafkaProducerProperties> producerProperties) {
        HashMap<String, Object> props = new HashMap<String, Object>();
        props.put("retries", 0);
        props.put("buffer.memory", 0x2000000);
        props.put("key.serializer", ByteArraySerializer.class);
        props.put("value.serializer", ByteArraySerializer.class);
        props.put("acks", String.valueOf(this.configurationProperties.getRequiredAcks()));
        Map mergedConfig = this.configurationProperties.mergedProducerConfiguration();
        if (!ObjectUtils.isEmpty((Object)mergedConfig)) {
            props.putAll(mergedConfig);
        }
        if (ObjectUtils.isEmpty(props.get("bootstrap.servers"))) {
            props.put("bootstrap.servers", this.configurationProperties.getKafkaConnectionString());
        }
        if (ObjectUtils.isEmpty(props.get("batch.size"))) {
            props.put("batch.size", String.valueOf(((KafkaProducerProperties)producerProperties.getExtension()).getBufferSize()));
        }
        if (ObjectUtils.isEmpty(props.get("linger.ms"))) {
            props.put("linger.ms", String.valueOf(((KafkaProducerProperties)producerProperties.getExtension()).getBatchTimeout()));
        }
        if (ObjectUtils.isEmpty(props.get("compression.type"))) {
            props.put("compression.type", ((KafkaProducerProperties)producerProperties.getExtension()).getCompressionType().toString());
        }
        if (!ObjectUtils.isEmpty((Object)((KafkaProducerProperties)producerProperties.getExtension()).getConfiguration())) {
            props.putAll(((KafkaProducerProperties)producerProperties.getExtension()).getConfiguration());
        }
        DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory(props);
        if (transactionIdPrefix != null) {
            producerFactory.setTransactionIdPrefix(transactionIdPrefix);
        }
        return producerFactory;
    }

    protected boolean useNativeEncoding(ExtendedProducerProperties<KafkaProducerProperties> producerProperties) {
        if (this.transactionManager != null) {
            return this.configurationProperties.getTransaction().getProducer().isUseNativeEncoding();
        }
        return super.useNativeEncoding(producerProperties);
    }

    protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group, ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
        int concurrency;
        ContainerProperties containerProperties;
        String[] stringArray;
        boolean anonymous = !StringUtils.hasText((String)group);
        Assert.isTrue((!anonymous || !((KafkaConsumerProperties)extendedConsumerProperties.getExtension()).isEnableDlq() ? 1 : 0) != 0, (String)"DLQ support is not available for anonymous subscriptions");
        String consumerGroup = anonymous ? "anonymous." + UUID.randomUUID().toString() : group;
        ConsumerFactory<?, ?> consumerFactory = this.createKafkaConsumerFactory(anonymous, consumerGroup, extendedConsumerProperties);
        int partitionCount = extendedConsumerProperties.getInstanceCount() * extendedConsumerProperties.getConcurrency();
        ArrayList<PartitionInfo> listenedPartitions = new ArrayList<PartitionInfo>();
        boolean usingPatterns = ((KafkaConsumerProperties)extendedConsumerProperties.getExtension()).isDestinationIsPattern();
        Assert.isTrue((!usingPatterns || !extendedConsumerProperties.isMultiplex() ? 1 : 0) != 0, (String)"Cannot use a pattern with multiplexed destinations; use the regex pattern to specify multiple topics instead");
        boolean groupManagement = ((KafkaConsumerProperties)extendedConsumerProperties.getExtension()).isAutoRebalanceEnabled();
        if (!extendedConsumerProperties.isMultiplex()) {
            listenedPartitions.addAll(this.processTopic(consumerGroup, extendedConsumerProperties, consumerFactory, partitionCount, usingPatterns, groupManagement, destination.getName()));
        } else {
            for (String name : StringUtils.commaDelimitedListToStringArray((String)destination.getName())) {
                listenedPartitions.addAll(this.processTopic(consumerGroup, extendedConsumerProperties, consumerFactory, partitionCount, usingPatterns, groupManagement, name.trim()));
            }
        }
        if (extendedConsumerProperties.isMultiplex()) {
            stringArray = StringUtils.commaDelimitedListToStringArray((String)destination.getName());
        } else {
            String[] stringArray2 = new String[1];
            stringArray = stringArray2;
            stringArray2[0] = destination.getName();
        }
        String[] topics = stringArray;
        for (int i = 0; i < topics.length; ++i) {
            topics[i] = topics[i].trim();
        }
        if (!usingPatterns && !groupManagement) {
            Assert.isTrue((!CollectionUtils.isEmpty(listenedPartitions) ? 1 : 0) != 0, (String)"A list of partitions must be provided");
        }
        TopicPartitionInitialOffset[] topicPartitionInitialOffsets = this.getTopicPartitionInitialOffsets(listenedPartitions);
        ContainerProperties containerProperties2 = anonymous || ((KafkaConsumerProperties)extendedConsumerProperties.getExtension()).isAutoRebalanceEnabled() ? (usingPatterns ? new ContainerProperties(Pattern.compile(topics[0])) : new ContainerProperties(topics)) : (containerProperties = new ContainerProperties(topicPartitionInitialOffsets));
        if (this.transactionManager != null) {
            containerProperties.setTransactionManager(this.transactionManager);
        }
        if (this.rebalanceListener != null) {
            this.setupRebalanceListener(extendedConsumerProperties, containerProperties);
        }
        containerProperties.setIdleEventInterval(Long.valueOf(((KafkaConsumerProperties)extendedConsumerProperties.getExtension()).getIdleEventInterval()));
        int n = concurrency = usingPatterns ? extendedConsumerProperties.getConcurrency() : Math.min(extendedConsumerProperties.getConcurrency(), listenedPartitions.size());
        if (groupManagement && listenedPartitions.isEmpty()) {
            concurrency = extendedConsumerProperties.getConcurrency();
        }
        this.resetOffsets(extendedConsumerProperties, consumerFactory, groupManagement, containerProperties);
        ConcurrentMessageListenerContainer messageListenerContainer = new ConcurrentMessageListenerContainer(consumerFactory, containerProperties){

            public void stop(Runnable callback) {
                super.stop(callback);
            }
        };
        messageListenerContainer.setConcurrency(concurrency);
        if (this.getApplicationEventPublisher() != null) {
            messageListenerContainer.setApplicationEventPublisher(this.getApplicationEventPublisher());
        } else if (this.getApplicationContext() != null) {
            messageListenerContainer.setApplicationEventPublisher((ApplicationEventPublisher)this.getApplicationContext());
        }
        messageListenerContainer.setBeanName(destination + ".container");
        if (!((KafkaConsumerProperties)extendedConsumerProperties.getExtension()).isAutoCommitOffset()) {
            messageListenerContainer.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
            messageListenerContainer.getContainerProperties().setAckOnError(false);
        } else {
            messageListenerContainer.getContainerProperties().setAckOnError(this.isAutoCommitOnError(extendedConsumerProperties));
            if (((KafkaConsumerProperties)extendedConsumerProperties.getExtension()).isAckEachRecord()) {
                messageListenerContainer.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
            }
        }
        if (this.logger.isDebugEnabled()) {
            this.logger.debug((Object)("Listened partitions: " + StringUtils.collectionToCommaDelimitedString(listenedPartitions)));
        }
        this.getContainerCustomizer().configure((Object)messageListenerContainer, destination.getName(), group);
        KafkaMessageDrivenChannelAdapter kafkaMessageDrivenChannelAdapter = new KafkaMessageDrivenChannelAdapter((AbstractMessageListenerContainer)messageListenerContainer);
        kafkaMessageDrivenChannelAdapter.setMessageConverter((MessageConverter)this.getMessageConverter(extendedConsumerProperties));
        kafkaMessageDrivenChannelAdapter.setBeanFactory((BeanFactory)this.getBeanFactory());
        AbstractMessageChannelBinder.ErrorInfrastructure errorInfrastructure = this.registerErrorInfrastructure(destination, consumerGroup, (ConsumerProperties)extendedConsumerProperties);
        if (extendedConsumerProperties.getMaxAttempts() > 1) {
            kafkaMessageDrivenChannelAdapter.setRetryTemplate(this.buildRetryTemplate((ConsumerProperties)extendedConsumerProperties));
            kafkaMessageDrivenChannelAdapter.setRecoveryCallback((RecoveryCallback)errorInfrastructure.getRecoverer());
        } else {
            kafkaMessageDrivenChannelAdapter.setErrorChannel((MessageChannel)errorInfrastructure.getErrorChannel());
        }
        return kafkaMessageDrivenChannelAdapter;
    }

    public void setupRebalanceListener(ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties, ContainerProperties containerProperties) {
        Assert.isTrue((!((KafkaConsumerProperties)extendedConsumerProperties.getExtension()).isResetOffsets() ? 1 : 0) != 0, (String)"'resetOffsets' cannot be set when a KafkaBindingRebalanceListener is provided");
        final String bindingName = bindingNameHolder.get();
        bindingNameHolder.remove();
        Assert.notNull((Object)bindingName, (String)"'bindingName' cannot be null");
        final KafkaBindingRebalanceListener userRebalanceListener = this.rebalanceListener;
        containerProperties.setConsumerRebalanceListener((ConsumerRebalanceListener)new ConsumerAwareRebalanceListener(){
            private boolean initial = true;

            public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
                userRebalanceListener.onPartitionsRevokedBeforeCommit(bindingName, consumer, partitions);
            }

            public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
                userRebalanceListener.onPartitionsRevokedAfterCommit(bindingName, consumer, partitions);
            }

            public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
                try {
                    userRebalanceListener.onPartitionsAssigned(bindingName, consumer, partitions, this.initial);
                }
                finally {
                    this.initial = false;
                }
            }
        });
    }

    public Collection<PartitionInfo> processTopic(String group, ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties, ConsumerFactory<?, ?> consumerFactory, int partitionCount, boolean usingPatterns, boolean groupManagement, String topic) {
        List<PartitionInfo> listenedPartitions;
        List<PartitionInfo> allPartitions;
        Collection<PartitionInfo> collection = allPartitions = usingPatterns ? Collections.emptyList() : this.getPartitionInfo(topic, extendedConsumerProperties, consumerFactory, partitionCount);
        if (groupManagement || extendedConsumerProperties.getInstanceCount() == 1) {
            listenedPartitions = allPartitions;
        } else {
            listenedPartitions = new ArrayList();
            for (PartitionInfo partition : allPartitions) {
                if (partition.partition() % extendedConsumerProperties.getInstanceCount() != extendedConsumerProperties.getInstanceIndex()) continue;
                listenedPartitions.add(partition);
            }
        }
        this.topicsInUse.put(topic, new TopicInformation(group, listenedPartitions, usingPatterns));
        return listenedPartitions;
    }

    private void resetOffsets(ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties, ConsumerFactory<?, ?> consumerFactory, boolean groupManagement, ContainerProperties containerProperties) {
        boolean resetOffsets = ((KafkaConsumerProperties)extendedConsumerProperties.getExtension()).isResetOffsets();
        final Object resetTo = consumerFactory.getConfigurationProperties().get("auto.offset.reset");
        final AtomicBoolean initialAssignment = new AtomicBoolean(true);
        if (!"earliest".equals(resetTo) && !"latest".equals(resetTo)) {
            this.logger.warn((Object)"no (or unknown) auto.offset.reset property cannot reset");
            resetOffsets = false;
        }
        if (groupManagement && resetOffsets) {
            containerProperties.setConsumerRebalanceListener((ConsumerRebalanceListener)new ConsumerAwareRebalanceListener(){

                public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> tps) {
                }

                public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> tps) {
                }

                public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> tps) {
                    if (initialAssignment.getAndSet(false)) {
                        if ("earliest".equals(resetTo)) {
                            consumer.seekToBeginning(tps);
                        } else if ("latest".equals(resetTo)) {
                            consumer.seekToEnd(tps);
                        }
                    }
                }
            });
        } else if (resetOffsets) {
            Arrays.stream(containerProperties.getTopicPartitions()).map(tpio -> new TopicPartitionInitialOffset(tpio.topic(), tpio.partition(), "earliest".equals(resetTo) ? TopicPartitionInitialOffset.SeekPosition.BEGINNING : TopicPartitionInitialOffset.SeekPosition.END)).collect(Collectors.toList()).toArray(containerProperties.getTopicPartitions());
        }
    }

    protected AbstractMessageChannelBinder.PolledConsumerResources createPolledConsumerResources(String name, String group, ConsumerDestination destination, ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties) {
        String[] stringArray;
        boolean anonymous = !StringUtils.hasText((String)group);
        Assert.isTrue((!anonymous || !((KafkaConsumerProperties)consumerProperties.getExtension()).isEnableDlq() ? 1 : 0) != 0, (String)"DLQ support is not available for anonymous subscriptions");
        String consumerGroup = anonymous ? "anonymous." + UUID.randomUUID().toString() : group;
        ConsumerFactory<?, ?> consumerFactory = this.createKafkaConsumerFactory(anonymous, consumerGroup, consumerProperties);
        if (consumerProperties.isMultiplex()) {
            stringArray = StringUtils.commaDelimitedListToStringArray((String)destination.getName());
        } else {
            String[] stringArray2 = new String[1];
            stringArray = stringArray2;
            stringArray2[0] = destination.getName();
        }
        String[] topics = stringArray;
        for (int i = 0; i < topics.length; ++i) {
            topics[i] = topics[i].trim();
        }
        KafkaMessageSource source = new KafkaMessageSource(consumerFactory, topics);
        source.setMessageConverter((RecordMessageConverter)this.getMessageConverter(consumerProperties));
        source.setRawMessageHeader(((KafkaConsumerProperties)consumerProperties.getExtension()).isEnableDlq());
        String clientId = name;
        if (((KafkaConsumerProperties)consumerProperties.getExtension()).getConfiguration().containsKey("client.id")) {
            clientId = (String)((KafkaConsumerProperties)consumerProperties.getExtension()).getConfiguration().get("client.id");
        }
        source.setClientId(clientId);
        if (!consumerProperties.isMultiplex()) {
            Collection<PartitionInfo> partitionInfos = this.getPartitionInfo(destination.getName(), consumerProperties, consumerFactory, -1);
            this.topicsInUse.put(destination.getName(), new TopicInformation(consumerGroup, partitionInfos, false));
        } else {
            for (int i = 0; i < topics.length; ++i) {
                Collection<PartitionInfo> partitionInfos = this.getPartitionInfo(topics[i], consumerProperties, consumerFactory, -1);
                this.topicsInUse.put(topics[i], new TopicInformation(consumerGroup, partitionInfos, false));
            }
        }
        source.setRebalanceListener(new ConsumerRebalanceListener(){

            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                KafkaMessageChannelBinder.this.logger.info((Object)("Revoked: " + partitions));
            }

            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                KafkaMessageChannelBinder.this.logger.info((Object)("Assigned: " + partitions));
            }
        });
        this.getMessageSourceCustomizer().configure((Object)source, destination.getName(), group);
        return new AbstractMessageChannelBinder.PolledConsumerResources((MessageSource)source, this.registerErrorInfrastructure(destination, group, (ConsumerProperties)consumerProperties, true));
    }

    protected void postProcessPollableSource(DefaultPollableMessageSource bindingTarget) {
        bindingTarget.setAttributesProvider((accessor, message) -> {
            Object rawMessage = message.getHeaders().get((Object)"kafka_data");
            if (rawMessage != null) {
                accessor.setAttribute("kafka_data", rawMessage);
            }
        });
    }

    private MessagingMessageConverter getMessageConverter(ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
        MessagingMessageConverter messageConverter;
        if (((KafkaConsumerProperties)extendedConsumerProperties.getExtension()).getConverterBeanName() == null) {
            messageConverter = new MessagingMessageConverter();
            KafkaConsumerProperties.StandardHeaders standardHeaders = ((KafkaConsumerProperties)extendedConsumerProperties.getExtension()).getStandardHeaders();
            messageConverter.setGenerateMessageId(KafkaConsumerProperties.StandardHeaders.id.equals((Object)standardHeaders) || KafkaConsumerProperties.StandardHeaders.both.equals((Object)standardHeaders));
            messageConverter.setGenerateTimestamp(KafkaConsumerProperties.StandardHeaders.timestamp.equals((Object)standardHeaders) || KafkaConsumerProperties.StandardHeaders.both.equals((Object)standardHeaders));
        } else {
            try {
                messageConverter = (MessagingMessageConverter)this.getApplicationContext().getBean(((KafkaConsumerProperties)extendedConsumerProperties.getExtension()).getConverterBeanName(), MessagingMessageConverter.class);
            }
            catch (NoSuchBeanDefinitionException ex) {
                throw new IllegalStateException("Converter bean not present in application context", ex);
            }
        }
        messageConverter.setHeaderMapper(this.getHeaderMapper(extendedConsumerProperties));
        return messageConverter;
    }

    private KafkaHeaderMapper getHeaderMapper(ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
        Object mapper = null;
        if (this.configurationProperties.getHeaderMapperBeanName() != null) {
            mapper = (KafkaHeaderMapper)this.getApplicationContext().getBean(this.configurationProperties.getHeaderMapperBeanName(), KafkaHeaderMapper.class);
        }
        if (mapper == null) {
            BinderHeaderMapper headerMapper = new BinderHeaderMapper(){

                @Override
                public void toHeaders(Headers source, Map<String, Object> headers) {
                    super.toHeaders(source, headers);
                    if (headers.size() > 0) {
                        headers.put("scst_nativeHeadersPresent", Boolean.TRUE);
                    }
                }
            };
            String[] trustedPackages = ((KafkaConsumerProperties)extendedConsumerProperties.getExtension()).getTrustedPackages();
            if (!StringUtils.isEmpty((Object)trustedPackages)) {
                headerMapper.addTrustedPackages(trustedPackages);
            }
            mapper = headerMapper;
        }
        return mapper;
    }

    private Collection<PartitionInfo> getPartitionInfo(String topic, ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties, ConsumerFactory<?, ?> consumerFactory, int partitionCount) {
        return ((KafkaTopicProvisioner)this.provisioningProvider).getPartitionsForTopic(partitionCount, ((KafkaConsumerProperties)extendedConsumerProperties.getExtension()).isAutoRebalanceEnabled(), () -> {
            try (Consumer consumer = consumerFactory.createConsumer();){
                List list = consumer.partitionsFor(topic);
                return list;
            }
        }, topic);
    }

    protected ErrorMessageStrategy getErrorMessageStrategy() {
        return new RawRecordHeaderErrorMessageStrategy();
    }

    protected MessageHandler getErrorMessageHandler(ConsumerDestination destination, String group, ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
        KafkaConsumerProperties kafkaConsumerProperties = (KafkaConsumerProperties)properties.getExtension();
        if (kafkaConsumerProperties.isEnableDlq()) {
            KafkaProducerProperties dlqProducerProperties = kafkaConsumerProperties.getDlqProducerProperties();
            DefaultKafkaProducerFactory<byte[], byte[]> producerFactory = this.transactionManager != null ? this.transactionManager.getProducerFactory() : this.getProducerFactory(null, (ExtendedProducerProperties<KafkaProducerProperties>)new ExtendedProducerProperties((Object)dlqProducerProperties));
            KafkaTemplate kafkaTemplate = new KafkaTemplate(producerFactory);
            DlqSender dlqSender = new DlqSender(kafkaTemplate);
            return message -> {
                ConsumerRecord record = (ConsumerRecord)message.getHeaders().get((Object)"kafka_data", ConsumerRecord.class);
                if (properties.isUseNativeDecoding() && record != null) {
                    Map configuration;
                    Map map = configuration = this.transactionManager == null ? dlqProducerProperties.getConfiguration() : this.configurationProperties.getTransaction().getProducer().getConfiguration();
                    if (record.key() != null && !record.key().getClass().isInstance(byte[].class)) {
                        KafkaMessageChannelBinder.ensureDlqMessageCanBeProperlySerialized(configuration, config -> !config.containsKey("key.serializer"), "Key");
                    }
                    if (record.value() != null && !record.value().getClass().isInstance(byte[].class)) {
                        KafkaMessageChannelBinder.ensureDlqMessageCanBeProperlySerialized(configuration, config -> !config.containsKey("value.serializer"), "Payload");
                    }
                }
                if (record == null) {
                    this.logger.error((Object)("No raw record; cannot send to DLQ: " + message));
                    return;
                }
                RecordHeaders kafkaHeaders = new RecordHeaders(record.headers().toArray());
                AtomicReference<ConsumerRecord> recordToSend = new AtomicReference<ConsumerRecord>(record);
                if (message.getPayload() instanceof Throwable) {
                    Throwable throwable = (Throwable)message.getPayload();
                    HeaderMode headerMode = properties.getHeaderMode();
                    if (headerMode == null || HeaderMode.headers.equals((Object)headerMode)) {
                        kafkaHeaders.add((Header)new RecordHeader(X_ORIGINAL_TOPIC, record.topic().getBytes(StandardCharsets.UTF_8)));
                        kafkaHeaders.add((Header)new RecordHeader(X_ORIGINAL_PARTITION, ByteBuffer.allocate(4).putInt(record.partition()).array()));
                        kafkaHeaders.add((Header)new RecordHeader(X_ORIGINAL_OFFSET, ByteBuffer.allocate(8).putLong(record.offset()).array()));
                        kafkaHeaders.add((Header)new RecordHeader(X_ORIGINAL_TIMESTAMP, ByteBuffer.allocate(8).putLong(record.timestamp()).array()));
                        kafkaHeaders.add((Header)new RecordHeader(X_ORIGINAL_TIMESTAMP_TYPE, record.timestampType().toString().getBytes(StandardCharsets.UTF_8)));
                        kafkaHeaders.add((Header)new RecordHeader(X_EXCEPTION_FQCN, throwable.getClass().getName().getBytes(StandardCharsets.UTF_8)));
                        kafkaHeaders.add((Header)new RecordHeader(X_EXCEPTION_MESSAGE, throwable.getMessage().getBytes(StandardCharsets.UTF_8)));
                        kafkaHeaders.add((Header)new RecordHeader(X_EXCEPTION_STACKTRACE, this.getStackTraceAsString(throwable).getBytes(StandardCharsets.UTF_8)));
                    } else if (HeaderMode.embeddedHeaders.equals((Object)headerMode)) {
                        try {
                            MessageValues messageValues = EmbeddedHeaderUtils.extractHeaders((Message)MessageBuilder.withPayload((Object)((byte[])record.value())).build(), (boolean)false);
                            messageValues.put(X_ORIGINAL_TOPIC, (Object)record.topic());
                            messageValues.put(X_ORIGINAL_PARTITION, (Object)record.partition());
                            messageValues.put(X_ORIGINAL_OFFSET, (Object)record.offset());
                            messageValues.put(X_ORIGINAL_TIMESTAMP, (Object)record.timestamp());
                            messageValues.put(X_ORIGINAL_TIMESTAMP_TYPE, (Object)record.timestampType().toString());
                            messageValues.put(X_EXCEPTION_FQCN, (Object)throwable.getClass().getName());
                            messageValues.put(X_EXCEPTION_MESSAGE, (Object)throwable.getMessage());
                            messageValues.put(X_EXCEPTION_STACKTRACE, (Object)this.getStackTraceAsString(throwable));
                            String[] headersToEmbed = new ArrayList(messageValues.keySet()).toArray(new String[messageValues.keySet().size()]);
                            byte[] payload = EmbeddedHeaderUtils.embedHeaders((MessageValues)messageValues, (String[])EmbeddedHeaderUtils.headersToEmbed((String[])headersToEmbed));
                            recordToSend.set(new ConsumerRecord(record.topic(), record.partition(), record.offset(), record.key(), (Object)payload));
                        }
                        catch (Exception ex) {
                            throw new RuntimeException(ex);
                        }
                    }
                }
                String dlqName = StringUtils.hasText((String)kafkaConsumerProperties.getDlqName()) ? kafkaConsumerProperties.getDlqName() : "error." + record.topic() + "." + group;
                dlqSender.sendToDlq(recordToSend.get(), (Headers)kafkaHeaders, dlqName);
            };
        }
        return null;
    }

    protected MessageHandler getPolledConsumerErrorMessageHandler(ConsumerDestination destination, String group, ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
        if (((KafkaConsumerProperties)properties.getExtension()).isEnableDlq()) {
            return this.getErrorMessageHandler(destination, group, properties);
        }
        MessageHandler superHandler = super.getErrorMessageHandler(destination, group, properties);
        return message -> {
            AcknowledgmentCallback ack;
            ConsumerRecord record = (ConsumerRecord)message.getHeaders().get((Object)"kafka_data");
            if (!(message instanceof ErrorMessage)) {
                this.logger.error((Object)("Expected an ErrorMessage, not a " + message.getClass().toString() + " for: " + message));
            } else if (record == null) {
                if (superHandler != null) {
                    superHandler.handleMessage(message);
                }
            } else if (message.getPayload() instanceof MessagingException && (ack = StaticMessageHeaderAccessor.getAcknowledgmentCallback((Message)((MessagingException)message.getPayload()).getFailedMessage())) != null) {
                if (this.isAutoCommitOnError(properties)) {
                    ack.acknowledge(AcknowledgmentCallback.Status.REJECT);
                } else {
                    ack.acknowledge(AcknowledgmentCallback.Status.REQUEUE);
                }
            }
        };
    }

    private static void ensureDlqMessageCanBeProperlySerialized(Map<String, String> configuration, Predicate<Map<String, String>> configPredicate, String dataType) {
        if (CollectionUtils.isEmpty(configuration) || configPredicate.test(configuration)) {
            throw new IllegalArgumentException("Native decoding is used on the consumer. " + dataType + " is not byte[] and no serializer is set on the DLQ producer.");
        }
    }

    protected ConsumerFactory<?, ?> createKafkaConsumerFactory(boolean anonymous, String consumerGroup, ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties) {
        HashMap<String, Object> props = new HashMap<String, Object>();
        props.put("key.deserializer", ByteArrayDeserializer.class);
        props.put("value.deserializer", ByteArrayDeserializer.class);
        props.put("enable.auto.commit", false);
        props.put("auto.commit.interval.ms", 100);
        props.put("auto.offset.reset", anonymous ? "latest" : "earliest");
        props.put("group.id", consumerGroup);
        Map mergedConfig = this.configurationProperties.mergedConsumerConfiguration();
        if (!ObjectUtils.isEmpty((Object)mergedConfig)) {
            props.putAll(mergedConfig);
        }
        if (ObjectUtils.isEmpty(props.get("bootstrap.servers"))) {
            props.put("bootstrap.servers", this.configurationProperties.getKafkaConnectionString());
        }
        if (!ObjectUtils.isEmpty((Object)((KafkaConsumerProperties)consumerProperties.getExtension()).getConfiguration())) {
            props.putAll(((KafkaConsumerProperties)consumerProperties.getExtension()).getConfiguration());
        }
        if (!ObjectUtils.isEmpty((Object)((KafkaConsumerProperties)consumerProperties.getExtension()).getStartOffset())) {
            props.put("auto.offset.reset", ((KafkaConsumerProperties)consumerProperties.getExtension()).getStartOffset().name());
        }
        return new DefaultKafkaConsumerFactory(props);
    }

    private boolean isAutoCommitOnError(ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
        return ((KafkaConsumerProperties)properties.getExtension()).getAutoCommitOnError() != null ? ((KafkaConsumerProperties)properties.getExtension()).getAutoCommitOnError() : ((KafkaConsumerProperties)properties.getExtension()).isAutoCommitOffset() && ((KafkaConsumerProperties)properties.getExtension()).isEnableDlq();
    }

    private TopicPartitionInitialOffset[] getTopicPartitionInitialOffsets(Collection<PartitionInfo> listenedPartitions) {
        TopicPartitionInitialOffset[] topicPartitionInitialOffsets = new TopicPartitionInitialOffset[listenedPartitions.size()];
        int i = 0;
        for (PartitionInfo partition : listenedPartitions) {
            topicPartitionInitialOffsets[i++] = new TopicPartitionInitialOffset(partition.topic(), partition.partition());
        }
        return topicPartitionInitialOffsets;
    }

    private String toDisplayString(String original, int maxCharacters) {
        if (original.length() <= maxCharacters) {
            return original;
        }
        return original.substring(0, maxCharacters) + "...";
    }

    private String getStackTraceAsString(Throwable cause) {
        StringWriter stringWriter = new StringWriter();
        PrintWriter printWriter = new PrintWriter((Writer)stringWriter, true);
        cause.printStackTrace(printWriter);
        return stringWriter.getBuffer().toString();
    }

    private final class DlqSender<K, V> {
        private final KafkaTemplate<K, V> kafkaTemplate;

        DlqSender(KafkaTemplate<K, V> kafkaTemplate) {
            this.kafkaTemplate = kafkaTemplate;
        }

        void sendToDlq(ConsumerRecord<?, ?> consumerRecord, Headers headers, String dlqName) {
            block2: {
                Object key = consumerRecord.key();
                Object value = consumerRecord.value();
                ProducerRecord producerRecord = new ProducerRecord(dlqName, Integer.valueOf(consumerRecord.partition()), key, value, (Iterable)headers);
                final StringBuilder sb = new StringBuilder().append(" a message with key='").append(KafkaMessageChannelBinder.this.toDisplayString(ObjectUtils.nullSafeToString((Object)key), 50)).append("'").append(" and payload='").append(KafkaMessageChannelBinder.this.toDisplayString(ObjectUtils.nullSafeToString((Object)value), 50)).append("'").append(" received from ").append(consumerRecord.partition());
                ListenableFuture sentDlq = null;
                try {
                    sentDlq = this.kafkaTemplate.send(producerRecord);
                    sentDlq.addCallback(new ListenableFutureCallback<SendResult<K, V>>(){

                        public void onFailure(Throwable ex) {
                            KafkaMessageChannelBinder.this.logger.error((Object)("Error sending to DLQ " + sb.toString()), ex);
                        }

                        public void onSuccess(SendResult<K, V> result) {
                            if (KafkaMessageChannelBinder.this.logger.isDebugEnabled()) {
                                KafkaMessageChannelBinder.this.logger.debug((Object)("Sent to DLQ " + sb.toString()));
                            }
                        }
                    });
                }
                catch (Exception ex) {
                    if (sentDlq != null) break block2;
                    KafkaMessageChannelBinder.this.logger.error((Object)("Error sending to DLQ " + sb.toString()), (Throwable)ex);
                }
            }
        }
    }

    static class TopicInformation {
        private final String consumerGroup;
        private final Collection<PartitionInfo> partitionInfos;
        private final boolean isTopicPattern;

        TopicInformation(String consumerGroup, Collection<PartitionInfo> partitionInfos, boolean isTopicPattern) {
            this.consumerGroup = consumerGroup;
            this.partitionInfos = partitionInfos;
            this.isTopicPattern = isTopicPattern;
        }

        String getConsumerGroup() {
            return this.consumerGroup;
        }

        boolean isConsumerTopic() {
            return this.consumerGroup != null;
        }

        boolean isTopicPattern() {
            return this.isTopicPattern;
        }

        Collection<PartitionInfo> getPartitionInfos() {
            return this.partitionInfos;
        }
    }

    private final class ProducerConfigurationMessageHandler
    extends KafkaProducerMessageHandler<byte[], byte[]>
    implements Lifecycle {
        private boolean running;
        private final ProducerFactory<byte[], byte[]> producerFactory;

        ProducerConfigurationMessageHandler(KafkaTemplate<byte[], byte[]> kafkaTemplate, String topic, ExtendedProducerProperties<KafkaProducerProperties> producerProperties, ProducerFactory<byte[], byte[]> producerFactory) {
            super(kafkaTemplate);
            this.running = true;
            this.setTopicExpression((Expression)new LiteralExpression(topic));
            this.setMessageKeyExpression(((KafkaProducerProperties)producerProperties.getExtension()).getMessageKeyExpression());
            this.setBeanFactory((BeanFactory)KafkaMessageChannelBinder.this.getBeanFactory());
            if (producerProperties.isPartitioned()) {
                SpelExpressionParser parser = new SpelExpressionParser();
                this.setPartitionIdExpression(parser.parseExpression("headers['scst_partition']"));
            }
            if (((KafkaProducerProperties)producerProperties.getExtension()).isSync()) {
                this.setSync(true);
            }
            this.producerFactory = producerFactory;
        }

        public void start() {
            try {
                super.onInit();
            }
            catch (Exception ex) {
                this.logger.error((Object)"Initialization errors: ", (Throwable)ex);
                throw new RuntimeException(ex);
            }
        }

        public void stop() {
            if (this.producerFactory instanceof Lifecycle) {
                ((Lifecycle)this.producerFactory).stop();
            }
            this.running = false;
        }

        public boolean isRunning() {
            return this.running;
        }
    }
}

