package org.springframework.integration.kafka.inbound;

import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.core.AttributeAccessor;
import org.springframework.integration.context.OrderlyShutdownCapable;
import org.springframework.integration.gateway.MessagingGatewaySupport;
import org.springframework.integration.kafka.support.RawRecordHeaderErrorMessageStrategy;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.integration.support.ErrorMessageUtils;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.converter.ConversionException;
import org.springframework.kafka.support.converter.KafkaMessageHeaders;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryListener;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;

/* loaded from: input_file:org/springframework/integration/kafka/inbound/KafkaInboundGateway.class */
public class KafkaInboundGateway<K, V, R> extends MessagingGatewaySupport implements OrderlyShutdownCapable {
    private static final ThreadLocal<AttributeAccessor> attributesHolder = new ThreadLocal<>();
    private final KafkaInboundGateway<K, V, R>.IntegrationRecordMessageListener listener = new IntegrationRecordMessageListener();
    private final AbstractMessageListenerContainer<K, V> messageListenerContainer;
    private final KafkaTemplate<K, R> kafkaTemplate;
    private RetryTemplate retryTemplate;
    private RecoveryCallback<? extends Object> recoveryCallback;
    private BiConsumer<Map<TopicPartition, Long>, ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedSeekCallback;

    /* loaded from: input_file:org/springframework/integration/kafka/inbound/KafkaInboundGateway$IntegrationRecordMessageListener.class */
    private class IntegrationRecordMessageListener extends RecordMessagingMessageListenerAdapter<K, V> implements RetryListener {
        IntegrationRecordMessageListener() {
            super((Object) null, (Method) null);
        }

        public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekAware.ConsumerSeekCallback consumerSeekCallback) {
            if (KafkaInboundGateway.this.onPartitionsAssignedSeekCallback != null) {
                KafkaInboundGateway.this.onPartitionsAssignedSeekCallback.accept(map, consumerSeekCallback);
            }
        }

        public void onMessage(ConsumerRecord<K, V> consumerRecord, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
            Message<?> message = null;
            try {
                message = toMessagingMessage(consumerRecord, acknowledgment, consumer);
                if (KafkaInboundGateway.this.retryTemplate != null) {
                    message = addDeliveryAttemptHeader(message);
                }
                KafkaInboundGateway.this.setAttributesIfNecessary(consumerRecord, message);
            } catch (RuntimeException e) {
                if (KafkaInboundGateway.this.getErrorChannel() != null) {
                    KafkaInboundGateway.this.messagingTemplate.send(KafkaInboundGateway.this.getErrorChannel(), KafkaInboundGateway.this.buildErrorMessage(null, new ConversionException("Failed to convert to message for: " + consumerRecord, e)));
                }
            }
            if (message == null) {
                KafkaInboundGateway.this.logger.debug("Converter returned a null message for: " + consumerRecord);
                return;
            }
            try {
                Message<?> sendAndReceiveMessage = KafkaInboundGateway.this.sendAndReceiveMessage(message);
                if (sendAndReceiveMessage != null) {
                    KafkaInboundGateway.this.kafkaTemplate.send(enhanceReply(message, sendAndReceiveMessage));
                }
            } finally {
                if (KafkaInboundGateway.this.retryTemplate == null) {
                    KafkaInboundGateway.attributesHolder.remove();
                }
            }
        }

        private Message<?> addDeliveryAttemptHeader(Message<?> message) {
            Message<?> message2 = message;
            AtomicInteger atomicInteger = new AtomicInteger(((RetryContext) KafkaInboundGateway.attributesHolder.get()).getRetryCount() + 1);
            if (message.getHeaders() instanceof KafkaMessageHeaders) {
                message.getHeaders().getRawHeaders().put("deliveryAttempt", atomicInteger);
            } else {
                message2 = MessageBuilder.fromMessage(message).setHeader("deliveryAttempt", atomicInteger).build();
            }
            return message2;
        }

        private Message<?> enhanceReply(Message<?> message, Message<?> message2) {
            AbstractIntegrationMessageBuilder abstractIntegrationMessageBuilder = null;
            MessageHeaders headers = message2.getHeaders();
            MessageHeaders headers2 = message.getHeaders();
            if (headers.get("kafka_correlationId") == null && headers2.get("kafka_correlationId") != null) {
                abstractIntegrationMessageBuilder = KafkaInboundGateway.this.getMessageBuilderFactory().fromMessage(message2).setHeader("kafka_correlationId", headers2.get("kafka_correlationId"));
            }
            if (headers.get("kafka_topic") == null && headers2.get("kafka_replyTopic") != null) {
                if (abstractIntegrationMessageBuilder == null) {
                    abstractIntegrationMessageBuilder = KafkaInboundGateway.this.getMessageBuilderFactory().fromMessage(message2);
                }
                abstractIntegrationMessageBuilder.setHeader("kafka_topic", headers2.get("kafka_replyTopic"));
            }
            if (headers.get("kafka_partitionId") == null && headers2.get("kafka_replyPartition") != null) {
                if (abstractIntegrationMessageBuilder == null) {
                    abstractIntegrationMessageBuilder = KafkaInboundGateway.this.getMessageBuilderFactory().fromMessage(message2);
                }
                abstractIntegrationMessageBuilder.setHeader("kafka_partitionId", headers2.get("kafka_replyPartition"));
            }
            return abstractIntegrationMessageBuilder != null ? abstractIntegrationMessageBuilder.build() : message2;
        }

        public <T, E extends Throwable> boolean open(RetryContext retryContext, RetryCallback<T, E> retryCallback) {
            if (KafkaInboundGateway.this.recoveryCallback == null) {
                return true;
            }
            KafkaInboundGateway.attributesHolder.set(retryContext);
            return true;
        }

        public <T, E extends Throwable> void close(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable th) {
            KafkaInboundGateway.attributesHolder.remove();
        }

        public <T, E extends Throwable> void onError(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable th) {
        }

        public /* bridge */ /* synthetic */ void onMessage(Object obj, Acknowledgment acknowledgment, Consumer consumer) {
            onMessage((ConsumerRecord) obj, acknowledgment, (Consumer<?, ?>) consumer);
        }
    }

    public KafkaInboundGateway(AbstractMessageListenerContainer<K, V> abstractMessageListenerContainer, KafkaTemplate<K, R> kafkaTemplate) {
        Assert.notNull(abstractMessageListenerContainer, "messageListenerContainer is required");
        Assert.notNull(kafkaTemplate, "kafkaTemplate is required");
        Assert.isNull(abstractMessageListenerContainer.getContainerProperties().getMessageListener(), "Container must not already have a listener");
        this.messageListenerContainer = abstractMessageListenerContainer;
        this.messageListenerContainer.setAutoStartup(false);
        this.kafkaTemplate = kafkaTemplate;
        setErrorMessageStrategy(new RawRecordHeaderErrorMessageStrategy());
    }

    public void setMessageConverter(RecordMessageConverter recordMessageConverter) {
        this.listener.setMessageConverter(recordMessageConverter);
    }

    public void setPayloadType(Class<?> cls) {
        this.listener.setFallbackType(cls);
    }

    public void setRetryTemplate(RetryTemplate retryTemplate) {
        this.retryTemplate = retryTemplate;
    }

    public void setRecoveryCallback(RecoveryCallback<? extends Object> recoveryCallback) {
        this.recoveryCallback = recoveryCallback;
    }

    public void setOnPartitionsAssignedSeekCallback(BiConsumer<Map<TopicPartition, Long>, ConsumerSeekAware.ConsumerSeekCallback> biConsumer) {
        this.onPartitionsAssignedSeekCallback = biConsumer;
    }

    protected void onInit() {
        super.onInit();
        MessageListener messageListener = this.listener;
        if (this.retryTemplate != null) {
            messageListener = new RetryingMessageListenerAdapter(messageListener, this.retryTemplate, this.recoveryCallback);
            this.retryTemplate.registerListener(this.listener);
        }
        this.messageListenerContainer.getContainerProperties().setMessageListener(messageListener);
    }

    protected void doStart() {
        this.messageListenerContainer.start();
    }

    protected void doStop() {
        this.messageListenerContainer.stop();
    }

    public String getComponentType() {
        return "kafka:inbound-gateway";
    }

    public int beforeShutdown() {
        this.messageListenerContainer.stop();
        return getPhase();
    }

    public int afterShutdown() {
        return getPhase();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void setAttributesIfNecessary(Object obj, Message<?> message) {
        AttributeAccessor attributeAccessor;
        boolean z = getErrorChannel() != null && this.retryTemplate == null;
        boolean z2 = z | (this.retryTemplate != null);
        if (z) {
            attributesHolder.set(ErrorMessageUtils.getAttributeAccessor((Message) null, (Message) null));
        }
        if (!z2 || (attributeAccessor = attributesHolder.get()) == null) {
            return;
        }
        attributeAccessor.setAttribute("inputMessage", message);
        attributeAccessor.setAttribute("kafka_data", obj);
    }

    protected AttributeAccessor getErrorMessageAttributes(Message<?> message) {
        AttributeAccessor attributeAccessor = attributesHolder.get();
        return attributeAccessor == null ? super.getErrorMessageAttributes(message) : attributeAccessor;
    }
}
