/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.requestreply;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.BatchMessageListener;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.GenericMessageListenerContainer;
import org.springframework.kafka.listener.ListenerUtils;
import org.springframework.kafka.requestreply.CorrelationKey;
import org.springframework.kafka.requestreply.KafkaReplyTimeoutException;
import org.springframework.kafka.requestreply.ReplyingKafkaOperations;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.lang.Nullable;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.Assert;

public class ReplyingKafkaTemplate<K, V, R>
extends KafkaTemplate<K, V>
implements BatchMessageListener<K, R>,
InitializingBean,
SmartLifecycle,
DisposableBean,
ReplyingKafkaOperations<K, V, R> {
    private static final String WITH_CORRELATION_ID = " with correlationId: ";
    private static final int FIVE = 5;
    private static final Duration DEFAULT_REPLY_TIMEOUT = Duration.ofSeconds(5L);
    private final GenericMessageListenerContainer<K, R> replyContainer;
    private final ConcurrentMap<CorrelationKey, RequestReplyFuture<K, V, R>> futures = new ConcurrentHashMap<CorrelationKey, RequestReplyFuture<K, V, R>>();
    private final byte[] replyTopic;
    private final byte[] replyPartition;
    private TaskScheduler scheduler = new ThreadPoolTaskScheduler();
    private int phase;
    private boolean autoStartup = true;
    private Duration defaultReplyTimeout = DEFAULT_REPLY_TIMEOUT;
    private boolean schedulerSet;
    private boolean sharedReplyTopic;
    private Function<ProducerRecord<K, V>, CorrelationKey> correlationStrategy = ReplyingKafkaTemplate::defaultCorrelationIdStrategy;
    private String correlationHeaderName = "kafka_correlationId";
    private String replyTopicHeaderName = "kafka_replyTopic";
    private String replyPartitionHeaderName = "kafka_replyPartition";
    private Function<ConsumerRecord<?, ?>, Exception> replyErrorChecker = rec -> null;
    private volatile boolean running;
    private volatile boolean schedulerInitialized;

    public ReplyingKafkaTemplate(ProducerFactory<K, V> producerFactory, GenericMessageListenerContainer<K, R> replyContainer) {
        this(producerFactory, replyContainer, false);
    }

    public ReplyingKafkaTemplate(ProducerFactory<K, V> producerFactory, GenericMessageListenerContainer<K, R> replyContainer, boolean autoFlush) {
        super(producerFactory, autoFlush);
        Assert.notNull(replyContainer, (String)"'replyContainer' cannot be null");
        this.replyContainer = replyContainer;
        this.replyContainer.setupMessageListener(this);
        ContainerProperties properties = this.replyContainer.getContainerProperties();
        String tempReplyTopic = null;
        byte[] tempReplyPartition = null;
        TopicPartitionOffset[] topicPartitionsToAssign = properties.getTopicPartitions();
        String[] topics = properties.getTopics();
        if (topics != null && topics.length == 1) {
            tempReplyTopic = topics[0];
        } else if (topicPartitionsToAssign != null && topicPartitionsToAssign.length == 1) {
            TopicPartitionOffset topicPartitionOffset = topicPartitionsToAssign[0];
            Assert.notNull((Object)topicPartitionOffset, (String)"'topicPartitionsToAssign' must not be null");
            tempReplyTopic = topicPartitionOffset.getTopic();
            ByteBuffer buffer = ByteBuffer.allocate(4);
            buffer.putInt(topicPartitionOffset.getPartition());
            tempReplyPartition = buffer.array();
        }
        if (tempReplyTopic == null) {
            this.replyTopic = null;
            this.replyPartition = null;
            this.logger.debug(() -> "Could not determine container's reply topic/partition; senders must populate at least the kafka_replyTopic header, and optionally the kafka_replyPartition header");
        } else {
            this.replyTopic = tempReplyTopic.getBytes(StandardCharsets.UTF_8);
            this.replyPartition = tempReplyPartition;
        }
    }

    public void setTaskScheduler(TaskScheduler scheduler) {
        Assert.notNull((Object)scheduler, (String)"'scheduler' cannot be null");
        this.scheduler = scheduler;
        this.schedulerSet = true;
    }

    protected Duration getDefaultReplyTimeout() {
        return this.defaultReplyTimeout;
    }

    public void setDefaultReplyTimeout(Duration defaultReplyTimeout) {
        Assert.notNull((Object)defaultReplyTimeout, (String)"'defaultReplyTimeout' cannot be null");
        Assert.isTrue((defaultReplyTimeout.toMillis() >= 0L ? 1 : 0) != 0, (String)"'replyTimeout' must be >= 0");
        this.defaultReplyTimeout = defaultReplyTimeout;
    }

    public boolean isRunning() {
        return this.running;
    }

    public int getPhase() {
        return this.phase;
    }

    public void setPhase(int phase) {
        this.phase = phase;
    }

    public boolean isAutoStartup() {
        return this.autoStartup;
    }

    public void setAutoStartup(boolean autoStartup) {
        this.autoStartup = autoStartup;
    }

    public Collection<TopicPartition> getAssignedReplyTopicPartitions() {
        return this.replyContainer.getAssignedPartitions();
    }

    public void setSharedReplyTopic(boolean sharedReplyTopic) {
        this.sharedReplyTopic = sharedReplyTopic;
    }

    public void setCorrelationIdStrategy(Function<ProducerRecord<K, V>, CorrelationKey> correlationStrategy) {
        Assert.notNull(correlationStrategy, (String)"'correlationStrategy' cannot be null");
        this.correlationStrategy = correlationStrategy;
    }

    public void setCorrelationHeaderName(String correlationHeaderName) {
        Assert.notNull((Object)correlationHeaderName, (String)"'correlationHeaderName' cannot be null");
        this.correlationHeaderName = correlationHeaderName;
    }

    public void setReplyTopicHeaderName(String replyTopicHeaderName) {
        Assert.notNull((Object)replyTopicHeaderName, (String)"'replyTopicHeaderName' cannot be null");
        this.replyTopicHeaderName = replyTopicHeaderName;
    }

    public void setReplyPartitionHeaderName(String replyPartitionHeaderName) {
        Assert.notNull((Object)replyPartitionHeaderName, (String)"'replyPartitionHeaderName' cannot be null");
        this.replyPartitionHeaderName = replyPartitionHeaderName;
    }

    public void setReplyErrorChecker(Function<ConsumerRecord<?, ?>, Exception> replyErrorChecker) {
        Assert.notNull(replyErrorChecker, (String)"'replyErrorChecker' cannot be null");
        this.replyErrorChecker = replyErrorChecker;
    }

    public void afterPropertiesSet() {
        if (!this.schedulerSet && !this.schedulerInitialized) {
            ((ThreadPoolTaskScheduler)this.scheduler).initialize();
            this.schedulerInitialized = true;
        }
    }

    public synchronized void start() {
        if (!this.running) {
            try {
                this.afterPropertiesSet();
            }
            catch (Exception e) {
                throw new KafkaException("Failed to initialize", e);
            }
            this.replyContainer.start();
            this.running = true;
        }
    }

    public synchronized void stop() {
        if (this.running) {
            this.running = false;
            this.replyContainer.stop();
            this.futures.clear();
        }
    }

    public void stop(Runnable callback) {
        this.stop();
        callback.run();
    }

    @Override
    public RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record) {
        return this.sendAndReceive(record, null);
    }

    @Override
    public RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record, @Nullable Duration replyTimeout) {
        boolean hasReplyTopic;
        Assert.state((boolean)this.running, (String)"Template has not been start()ed");
        CorrelationKey correlationId = this.correlationStrategy.apply(record);
        Assert.notNull((Object)correlationId, (String)"the created 'correlationId' cannot be null");
        Headers headers = record.headers();
        boolean bl = hasReplyTopic = headers.lastHeader("kafka_replyTopic") != null;
        if (!hasReplyTopic && this.replyTopic != null) {
            headers.add((Header)new RecordHeader(this.replyTopicHeaderName, this.replyTopic));
            if (this.replyPartition != null) {
                headers.add((Header)new RecordHeader(this.replyPartitionHeaderName, this.replyPartition));
            }
        }
        headers.add((Header)new RecordHeader(this.correlationHeaderName, correlationId.getCorrelationId()));
        this.logger.debug(() -> "Sending: " + record + WITH_CORRELATION_ID + correlationId);
        RequestReplyFuture future = new RequestReplyFuture();
        this.futures.put(correlationId, future);
        try {
            future.setSendFuture(this.send(record));
        }
        catch (Exception e) {
            this.futures.remove(correlationId);
            throw new KafkaException("Send failed", e);
        }
        this.scheduleTimeout(record, correlationId, replyTimeout == null ? this.defaultReplyTimeout : replyTimeout);
        return future;
    }

    private void scheduleTimeout(ProducerRecord<K, V> record, CorrelationKey correlationId, Duration replyTimeout) {
        this.scheduler.schedule(() -> {
            RequestReplyFuture removed = (RequestReplyFuture)((Object)((Object)this.futures.remove(correlationId)));
            if (removed != null) {
                this.logger.warn(() -> "Reply timed out for: " + record + WITH_CORRELATION_ID + correlationId);
                if (!this.handleTimeout(correlationId, removed)) {
                    removed.setException((Throwable)((Object)new KafkaReplyTimeoutException("Reply timed out")));
                }
            }
        }, Instant.now().plus(replyTimeout));
    }

    protected boolean handleTimeout(CorrelationKey correlationId, RequestReplyFuture<K, V, R> future) {
        return false;
    }

    protected boolean isPending(CorrelationKey correlationId) {
        return this.futures.containsKey(correlationId);
    }

    @Override
    public void destroy() {
        if (!this.schedulerSet) {
            ((ThreadPoolTaskScheduler)this.scheduler).destroy();
        }
    }

    private static <K, V> CorrelationKey defaultCorrelationIdStrategy(ProducerRecord<K, V> record) {
        UUID uuid = UUID.randomUUID();
        byte[] bytes = new byte[16];
        ByteBuffer bb = ByteBuffer.wrap(bytes);
        bb.putLong(uuid.getMostSignificantBits());
        bb.putLong(uuid.getLeastSignificantBits());
        return new CorrelationKey(bytes);
    }

    @Override
    public void onMessage(List<ConsumerRecord<K, R>> data) {
        data.forEach(record -> {
            Header correlationHeader = record.headers().lastHeader(this.correlationHeaderName);
            CorrelationKey correlationId = null;
            if (correlationHeader != null) {
                correlationId = new CorrelationKey(correlationHeader.value());
            }
            if (correlationId == null) {
                this.logger.error(() -> "No correlationId found in reply: " + record + " - to use request/reply semantics, the responding server must return the correlation id  in the '" + this.correlationHeaderName + "' header");
            } else {
                RequestReplyFuture future = (RequestReplyFuture)((Object)((Object)this.futures.remove(correlationId)));
                CorrelationKey correlationKey = correlationId;
                if (future == null) {
                    this.logLateArrival((ConsumerRecord<K, R>)record, correlationId);
                } else {
                    boolean ok = true;
                    Exception exception = this.checkForErrors((ConsumerRecord<K, R>)record);
                    if (exception != null) {
                        ok = false;
                        future.setException(exception);
                    }
                    if (ok) {
                        this.logger.debug(() -> "Received: " + record + WITH_CORRELATION_ID + correlationKey);
                        future.set(record);
                    }
                }
            }
        });
    }

    @Nullable
    protected Exception checkForErrors(ConsumerRecord<K, R> record) {
        DeserializationException de;
        if ((record.value() == null || record.key() == null) && (de = ReplyingKafkaTemplate.checkDeserialization(record, this.logger)) != null) {
            return de;
        }
        return this.replyErrorChecker.apply(record);
    }

    @Nullable
    public static DeserializationException checkDeserialization(ConsumerRecord<?, ?> record, LogAccessor logger) {
        DeserializationException exception = ListenerUtils.getExceptionFromHeader(record, "springDeserializerExceptionValue", logger);
        if (exception != null) {
            logger.error((Throwable)((Object)exception), () -> "Reply value deserialization failed for " + record.topic() + "-" + record.partition() + "@" + record.offset());
            return exception;
        }
        exception = ListenerUtils.getExceptionFromHeader(record, "springDeserializerExceptionKey", logger);
        if (exception != null) {
            logger.error((Throwable)((Object)exception), () -> "Reply key deserialization failed for " + record.topic() + "-" + record.partition() + "@" + record.offset());
            return exception;
        }
        return null;
    }

    protected void logLateArrival(ConsumerRecord<K, R> record, CorrelationKey correlationId) {
        if (this.sharedReplyTopic) {
            this.logger.debug(() -> this.missingCorrelationLogMessage(record, correlationId));
        } else {
            this.logger.error(() -> this.missingCorrelationLogMessage(record, correlationId));
        }
    }

    private String missingCorrelationLogMessage(ConsumerRecord<K, R> record, CorrelationKey correlationId) {
        return "No pending reply: " + record + WITH_CORRELATION_ID + correlationId + ", perhaps timed out, or using a shared reply topic";
    }
}

