/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.core;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.Serializer;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextStoppedEvent;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.TransactionSupport;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/**
 * {@link ProducerFactory} implementation that creates Kafka producers wrapped in a
 * {@link CloseSafeProducer}, so that callers invoking {@code close()} on the returned
 * producer do not physically close the underlying Kafka client.
 *
 * <p>Depending on configuration this factory hands out:
 * <ul>
 * <li>a single shared producer (no transaction id prefix, not per-thread);</li>
 * <li>one producer per calling thread (see {@link #setProducerPerThread(boolean)});</li>
 * <li>transactional producers taken from a per-prefix cache, or keyed by the suffix reported by
 * {@link TransactionSupport#getTransactionIdSuffix()} when
 * {@link #isProducerPerConsumerPartition()} is {@code true}.</li>
 * </ul>
 *
 * <p>Producers are physically closed by {@link #destroy()}, {@link #reset()},
 * {@link #closeProducerFor(String)} and {@link #closeThreadBoundProducer()}.
 *
 * @param <K> the key type.
 * @param <V> the value type.
 */
public class DefaultKafkaProducerFactory<K, V>
implements ProducerFactory<K, V>,
ApplicationContextAware,
ApplicationListener<ContextStoppedEvent>,
DisposableBean {
    /** Default timeout used when physically closing a producer: 30 seconds. */
    public static final Duration DEFAULT_PHYSICAL_CLOSE_TIMEOUT = Duration.ofSeconds(30L);
    private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(DefaultKafkaProducerFactory.class));
    /** Private copy of the configuration supplied at construction time. */
    private final Map<String, Object> configs;
    /** Source of numeric suffixes for cached transactional producers. */
    private final AtomicInteger transactionIdSuffix = new AtomicInteger();
    /** Idle transactional producers, keyed by transaction id prefix. */
    private final Map<String, BlockingQueue<CloseSafeProducer<K, V>>> cache = new ConcurrentHashMap<>();
    /** Transactional producers keyed by consumer suffix; every access is guarded by synchronizing on the map. */
    private final Map<String, CloseSafeProducer<K, V>> consumerProducers = new HashMap<>();
    /** Source of numeric suffixes appended to {@link #clientIdPrefix}. */
    private final AtomicInteger clientIdCounter = new AtomicInteger();
    /**
     * Holder for per-thread producers. Created eagerly so that {@link #closeThreadBoundProducer()}
     * is safe even when {@link #setProducerPerThread(boolean)} was never called, and so that
     * re-invoking the setter cannot orphan producers that are already bound to threads.
     */
    private final ThreadLocal<CloseSafeProducer<K, V>> threadBoundProducers = new ThreadLocal<>();
    private Supplier<Serializer<K>> keySerializerSupplier;
    private Supplier<Serializer<V>> valueSerializerSupplier;
    private Duration physicalCloseTimeout = DEFAULT_PHYSICAL_CLOSE_TIMEOUT;
    private String transactionIdPrefix;
    private ApplicationContext applicationContext;
    private boolean producerPerConsumerPartition = true;
    private boolean producerPerThread;
    private String clientIdPrefix;
    private volatile CloseSafeProducer<K, V> producer;

    /**
     * Create a factory whose serializer suppliers both return {@code null}.
     *
     * @param configs the producer configuration; copied, never retained.
     */
    public DefaultKafkaProducerFactory(Map<String, Object> configs) {
        this(configs, () -> null, () -> null);
    }

    /**
     * Create a factory that passes the given serializer instances to every producer it creates.
     *
     * @param configs the producer configuration; copied, never retained.
     * @param keySerializer the key serializer, may be {@code null}.
     * @param valueSerializer the value serializer, may be {@code null}.
     */
    public DefaultKafkaProducerFactory(Map<String, Object> configs, @Nullable Serializer<K> keySerializer, @Nullable Serializer<V> valueSerializer) {
        this(configs, () -> keySerializer, () -> valueSerializer);
    }

    /**
     * Create a factory that obtains serializers from the given suppliers each time a producer
     * is created.
     *
     * <p>If the configuration contains a String {@code client.id} it becomes the client id prefix.
     * If it contains a non-blank {@code transactional.id}, that value becomes the transaction id
     * prefix (which also enables idempotence unless explicitly configured).
     *
     * @param configs the producer configuration; copied, never retained.
     * @param keySerializerSupplier supplier of key serializers; {@code null} is treated as a
     * supplier returning {@code null}.
     * @param valueSerializerSupplier supplier of value serializers; {@code null} is treated as a
     * supplier returning {@code null}.
     */
    public DefaultKafkaProducerFactory(Map<String, Object> configs, @Nullable Supplier<Serializer<K>> keySerializerSupplier, @Nullable Supplier<Serializer<V>> valueSerializerSupplier) {
        this.configs = new HashMap<>(configs);
        this.keySerializerSupplier = keySerializerSupplier == null ? () -> null : keySerializerSupplier;
        this.valueSerializerSupplier = valueSerializerSupplier == null ? () -> null : valueSerializerSupplier;
        Object clientId = this.configs.get("client.id");
        if (clientId instanceof String) {
            this.clientIdPrefix = (String) clientId;
        }
        String txId = (String) this.configs.get("transactional.id");
        if (StringUtils.hasText(txId)) {
            this.setTransactionIdPrefix(txId);
            LOGGER.info(() -> "If 'setTransactionIdPrefix()' is not going to be configured, the existing 'transactional.id' config with value: '" + txId + "' will be suffixed for concurrent transactions support.");
        }
    }

    /**
     * Remember the owning application context so that only its stop event triggers a reset.
     *
     * @param applicationContext the application context.
     */
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    /**
     * Use the given key serializer instance for all producers created from now on.
     *
     * @param keySerializer the key serializer, may be {@code null}.
     */
    public void setKeySerializer(@Nullable Serializer<K> keySerializer) {
        this.keySerializerSupplier = () -> keySerializer;
    }

    /**
     * Use the given value serializer instance for all producers created from now on.
     *
     * @param valueSerializer the value serializer, may be {@code null}.
     */
    public void setValueSerializer(@Nullable Serializer<V> valueSerializer) {
        this.valueSerializerSupplier = () -> valueSerializer;
    }

    /**
     * Set the timeout used when physically closing a producer.
     *
     * @param physicalCloseTimeout the timeout in seconds.
     */
    public void setPhysicalCloseTimeout(int physicalCloseTimeout) {
        this.physicalCloseTimeout = Duration.ofSeconds(physicalCloseTimeout);
    }

    /**
     * Set the transaction id prefix; a non-null prefix makes this factory
     * {@link #transactionCapable() transaction capable}. Also sets {@code enable.idempotence}
     * to {@code true} unless the configuration already contains a value for it.
     *
     * @param transactionIdPrefix the prefix, must not be {@code null}.
     */
    public final void setTransactionIdPrefix(String transactionIdPrefix) {
        Assert.notNull(transactionIdPrefix, "'transactionIdPrefix' cannot be null");
        this.transactionIdPrefix = transactionIdPrefix;
        this.enableIdempotentBehaviour();
    }

    /**
     * Return the configured transaction id prefix.
     *
     * @return the prefix, or {@code null} when none was configured.
     */
    protected String getTransactionIdPrefix() {
        return this.transactionIdPrefix;
    }

    /**
     * Set to {@code true} to hand out a separate non-transactional producer to each calling
     * thread instead of a single shared one. Such producers must be released by the owning
     * thread via {@link #closeThreadBoundProducer()}; {@link #destroy()} does not touch them.
     *
     * @param producerPerThread {@code true} for a producer per thread.
     */
    public void setProducerPerThread(boolean producerPerThread) {
        this.producerPerThread = producerPerThread;
    }

    /**
     * Set to {@code false} to always take transactional producers from the per-prefix cache
     * rather than keying them by the suffix from {@link TransactionSupport}.
     *
     * @param producerPerConsumerPartition {@code false} to disable; default {@code true}.
     */
    public void setProducerPerConsumerPartition(boolean producerPerConsumerPartition) {
        this.producerPerConsumerPartition = producerPerConsumerPartition;
    }

    /**
     * Return whether transactional producers are keyed by the consumer suffix.
     *
     * @return {@code true} if enabled.
     */
    @Override
    public boolean isProducerPerConsumerPartition() {
        return this.producerPerConsumerPartition;
    }

    /**
     * Return a read-only view of this factory's producer configuration.
     *
     * @return the unmodifiable configuration map.
     */
    public Map<String, Object> getConfigurationProperties() {
        return Collections.unmodifiableMap(this.configs);
    }

    /**
     * Default {@code enable.idempotence} to {@code true} when not configured; when it has been
     * explicitly disabled (as a Boolean or as the string "false") only emit a debug note.
     */
    private void enableIdempotentBehaviour() {
        Object previousValue = this.configs.putIfAbsent("enable.idempotence", true);
        boolean explicitlyDisabled = Boolean.FALSE.equals(previousValue)
                || (previousValue instanceof String && "false".equalsIgnoreCase(((String) previousValue).trim()));
        if (explicitlyDisabled) {
            LOGGER.debug(() -> "The 'enable.idempotence' is set to false, may result in duplicate messages");
        }
    }

    /**
     * Return whether a transaction id prefix has been configured.
     *
     * @return {@code true} if this factory creates transactional producers by default.
     */
    @Override
    public boolean transactionCapable() {
        return this.transactionIdPrefix != null;
    }

    /**
     * Physically close the shared producer, every cached transactional producer and every
     * per-consumer transactional producer. A failure to close one producer is logged and does
     * not prevent the remaining producers from being closed. Thread-bound producers are not
     * closed here.
     */
    public void destroy() {
        CloseSafeProducer<K, V> producerToClose = this.producer;
        this.producer = null;
        if (producerToClose != null) {
            this.closeDelegateQuietly(producerToClose);
        }
        this.cache.values().forEach(queue -> {
            CloseSafeProducer<K, V> next = queue.poll();
            while (next != null) {
                this.closeDelegateQuietly(next);
                next = queue.poll();
            }
        });
        this.cache.clear();
        synchronized (this.consumerProducers) {
            this.consumerProducers.values().forEach(this::closeDelegateQuietly);
            this.consumerProducers.clear();
        }
    }

    /**
     * Physically close the delegate of the given wrapper, logging rather than propagating any failure.
     */
    private void closeDelegateQuietly(CloseSafeProducer<K, V> producerToClose) {
        try {
            producerToClose.getDelegate().close(this.physicalCloseTimeout);
        }
        catch (Exception e) {
            LOGGER.error(e, "Exception while closing producer");
        }
    }

    /**
     * Reset this factory when the application context that owns it is stopped.
     *
     * @param event the stop event.
     */
    public void onApplicationEvent(ContextStoppedEvent event) {
        if (event.getApplicationContext().equals(this.applicationContext)) {
            this.reset();
        }
    }

    /**
     * Close all producers via {@link #destroy()}; any exception is logged, never propagated.
     */
    @Override
    public void reset() {
        try {
            this.destroy();
        }
        catch (Exception e) {
            LOGGER.error(e, "Exception while closing producer");
        }
    }

    /**
     * Always returns {@code true}.
     *
     * @return {@code true}.
     */
    @Deprecated
    public boolean isRunning() {
        return true;
    }

    /**
     * Create or obtain a producer using this factory's own transaction id prefix, if any.
     *
     * @return the producer.
     */
    @Override
    public Producer<K, V> createProducer() {
        return this.createProducer(this.transactionIdPrefix);
    }

    /**
     * Create or obtain a producer.
     *
     * <p>With an effective transaction id prefix a transactional producer is returned;
     * otherwise the calling thread's producer (per-thread mode) or the shared producer, each
     * created on first use.
     *
     * @param txIdPrefixArg the transaction id prefix overriding the factory's own; {@code null}
     * to use the factory's prefix.
     * @return the producer.
     */
    @Override
    public Producer<K, V> createProducer(@Nullable String txIdPrefixArg) {
        String txIdPrefix = txIdPrefixArg == null ? this.transactionIdPrefix : txIdPrefixArg;
        if (txIdPrefix != null) {
            return this.producerPerConsumerPartition
                    ? this.createTransactionalProducerForPartition(txIdPrefix)
                    : this.createTransactionalProducer(txIdPrefix);
        }
        if (this.producerPerThread) {
            CloseSafeProducer<K, V> tlProducer = this.threadBoundProducers.get();
            if (tlProducer == null) {
                tlProducer = new CloseSafeProducer<>(this.createKafkaProducer());
                this.threadBoundProducers.set(tlProducer);
            }
            return tlProducer;
        }
        // Work on a local snapshot so that a concurrent destroy() nulling the field
        // cannot make this method return null.
        CloseSafeProducer<K, V> shared = this.producer;
        if (shared == null) {
            synchronized (this) {
                shared = this.producer;
                if (shared == null) {
                    shared = new CloseSafeProducer<>(this.createKafkaProducer());
                    this.producer = shared;
                }
            }
        }
        return shared;
    }

    /**
     * Create a raw, non-transactional producer. When a client id prefix is known, the producer
     * gets a {@code client.id} of the form {@code <prefix>-<n>}.
     *
     * @return the new producer.
     */
    protected Producer<K, V> createKafkaProducer() {
        if (this.clientIdPrefix == null) {
            return this.createRawProducer(this.configs);
        }
        Map<String, Object> newConfigs = new HashMap<>(this.configs);
        newConfigs.put("client.id", this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet());
        return this.createRawProducer(newConfigs);
    }

    /**
     * Same as {@link #createTransactionalProducerForPartition(String)} with this factory's
     * transaction id prefix.
     *
     * @return the producer.
     */
    protected Producer<K, V> createTransactionalProducerForPartition() {
        return this.createTransactionalProducerForPartition(this.transactionIdPrefix);
    }

    /**
     * Return the transactional producer associated with the suffix currently reported by
     * {@link TransactionSupport#getTransactionIdSuffix()}, creating it on first use. When no
     * suffix is set, falls back to {@link #createTransactionalProducer(String)}.
     *
     * @param txIdPrefix the transaction id prefix.
     * @return the producer.
     */
    protected Producer<K, V> createTransactionalProducerForPartition(String txIdPrefix) {
        String suffix = TransactionSupport.getTransactionIdSuffix();
        if (suffix == null) {
            return this.createTransactionalProducer(txIdPrefix);
        }
        synchronized (this.consumerProducers) {
            CloseSafeProducer<K, V> consumerProducer = this.consumerProducers.get(suffix);
            if (consumerProducer == null) {
                consumerProducer = this.doCreateTxProducer(txIdPrefix, suffix, this::removeConsumerProducer);
                this.consumerProducers.put(suffix, consumerProducer);
            }
            return consumerProducer;
        }
    }

    /**
     * Drop the (first) mapping whose value equals the given producer from the per-consumer map.
     */
    private void removeConsumerProducer(CloseSafeProducer<K, V> producerToRemove) {
        synchronized (this.consumerProducers) {
            this.consumerProducers.values().remove(producerToRemove);
        }
    }

    /**
     * Same as {@link #createTransactionalProducer(String)} with this factory's transaction id
     * prefix.
     *
     * @return the producer.
     */
    protected Producer<K, V> createTransactionalProducer() {
        return this.createTransactionalProducer(this.transactionIdPrefix);
    }

    /**
     * Take an idle transactional producer from the cache for the given prefix, or create a new
     * one (with the next numeric suffix) when the cache is empty.
     *
     * @param txIdPrefix the transaction id prefix; the assertion fails when it is {@code null}
     * because no cache exists for a {@code null} prefix.
     * @return the producer.
     */
    protected Producer<K, V> createTransactionalProducer(String txIdPrefix) {
        BlockingQueue<CloseSafeProducer<K, V>> queue = this.getCache(txIdPrefix);
        Assert.notNull(queue, () -> "No cache found for " + txIdPrefix);
        Producer<K, V> cachedProducer = queue.poll();
        if (cachedProducer != null) {
            return cachedProducer;
        }
        return this.doCreateTxProducer(txIdPrefix, "" + this.transactionIdSuffix.getAndIncrement(), null);
    }

    /**
     * Create a new transactional producer with {@code transactional.id = prefix + suffix} and
     * initialize its transactions.
     *
     * <p>If {@code initTransactions()} fails, the freshly created producer is physically closed
     * before the original exception is rethrown, so the client is not leaked.
     *
     * @param prefix the transaction id prefix.
     * @param suffix the transaction id suffix.
     * @param remover callback used to unregister a per-consumer producer after a failure, or
     * {@code null} for a cached producer.
     * @return the wrapped producer.
     */
    private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix, @Nullable Consumer<CloseSafeProducer<K, V>> remover) {
        String txId = prefix + suffix;
        Map<String, Object> newProducerConfigs = new HashMap<>(this.configs);
        newProducerConfigs.put("transactional.id", txId);
        if (this.clientIdPrefix != null) {
            newProducerConfigs.put("client.id", this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet());
        }
        Producer<K, V> newProducer = this.createRawProducer(newProducerConfigs);
        try {
            newProducer.initTransactions();
        }
        catch (RuntimeException initFailure) {
            try {
                newProducer.close(this.physicalCloseTimeout);
            }
            catch (RuntimeException closeFailure) {
                initFailure.addSuppressed(closeFailure);
            }
            throw initFailure;
        }
        return new CloseSafeProducer<>(newProducer, this.getCache(prefix), remover, txId);
    }

    /**
     * Create the underlying {@link KafkaProducer}, obtaining fresh serializers from the
     * configured suppliers.
     *
     * @param configs the producer configuration.
     * @return the new producer.
     */
    protected Producer<K, V> createRawProducer(Map<String, Object> configs) {
        return new KafkaProducer<>(configs, this.keySerializerSupplier.get(), this.valueSerializerSupplier.get());
    }

    /**
     * Return the cache for this factory's transaction id prefix.
     *
     * @return the cache, or {@code null} when no prefix is configured.
     */
    @Nullable
    protected BlockingQueue<CloseSafeProducer<K, V>> getCache() {
        return this.getCache(this.transactionIdPrefix);
    }

    /**
     * Return the cache of idle producers for the given prefix, creating it if necessary.
     *
     * @param txIdPrefix the transaction id prefix.
     * @return the cache, or {@code null} when the prefix is {@code null}.
     */
    @Nullable
    protected BlockingQueue<CloseSafeProducer<K, V>> getCache(String txIdPrefix) {
        if (txIdPrefix == null) {
            return null;
        }
        return this.cache.computeIfAbsent(txIdPrefix, txId -> new LinkedBlockingQueue<>());
    }

    /**
     * Unregister and physically close the per-consumer transactional producer stored under the
     * given suffix, if any. Does nothing when producer-per-consumer-partition is disabled.
     *
     * @param suffix the suffix the producer was registered under.
     */
    @Override
    public void closeProducerFor(String suffix) {
        if (this.producerPerConsumerPartition) {
            synchronized (this.consumerProducers) {
                CloseSafeProducer<K, V> removed = this.consumerProducers.remove(suffix);
                if (removed != null) {
                    removed.getDelegate().close(this.physicalCloseTimeout);
                }
            }
        }
    }

    /**
     * Physically close the producer bound to the calling thread, if any, and unbind it. The
     * producer is unbound even when the close fails, so it is never handed out again.
     */
    @Override
    public void closeThreadBoundProducer() {
        CloseSafeProducer<K, V> tlProducer = this.threadBoundProducers.get();
        if (tlProducer != null) {
            try {
                tlProducer.getDelegate().close(this.physicalCloseTimeout);
            }
            finally {
                this.threadBoundProducers.remove();
            }
        }
    }

    /**
     * A {@link Producer} wrapper whose {@code close} methods do not necessarily close the
     * delegate: a healthy cached transactional producer is returned to its cache instead, and
     * a producer without a cache is left untouched. The delegate is physically closed by this
     * wrapper only after a transactional operation has failed or when the cache refuses it.
     *
     * @param <K> the key type.
     * @param <V> the value type.
     */
    protected static class CloseSafeProducer<K, V>
    implements Producer<K, V> {
        /** Close timeout used when the recorded transactional failure was a timeout. */
        private static final Duration CLOSE_TIMEOUT_AFTER_TX_TIMEOUT = Duration.ofMillis(0L);
        private final Producer<K, V> delegate;
        /** Cache this producer is returned to on close; {@code null} for non-transactional producers. */
        private final BlockingQueue<CloseSafeProducer<K, V>> cache;
        /** Callback that unregisters a per-consumer producer; {@code null} for cached producers. */
        private final Consumer<CloseSafeProducer<K, V>> removeConsumerProducer;
        private final String txId;
        /** First failure of begin/commit/abort, if any; once set it is never cleared. */
        private volatile Exception txFailed;

        CloseSafeProducer(Producer<K, V> delegate) {
            this(delegate, null, null);
            Assert.isTrue(!(delegate instanceof CloseSafeProducer), "Cannot double-wrap a producer");
        }

        CloseSafeProducer(Producer<K, V> delegate, BlockingQueue<CloseSafeProducer<K, V>> cache) {
            this(delegate, cache, null);
        }

        CloseSafeProducer(Producer<K, V> delegate, @Nullable BlockingQueue<CloseSafeProducer<K, V>> cache, @Nullable Consumer<CloseSafeProducer<K, V>> removeConsumerProducer) {
            this(delegate, cache, removeConsumerProducer, null);
        }

        CloseSafeProducer(Producer<K, V> delegate, @Nullable BlockingQueue<CloseSafeProducer<K, V>> cache, @Nullable Consumer<CloseSafeProducer<K, V>> removeConsumerProducer, @Nullable String txId) {
            this.delegate = delegate;
            this.cache = cache;
            this.removeConsumerProducer = removeConsumerProducer;
            this.txId = txId;
            LOGGER.debug(() -> "Created new Producer: " + this);
        }

        /**
         * Return the wrapped producer, e.g. for physically closing it.
         */
        Producer<K, V> getDelegate() {
            return this.delegate;
        }

        public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
            LOGGER.trace(() -> this.toString() + " send(" + record + ")");
            return this.delegate.send(record);
        }

        public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
            LOGGER.trace(() -> this.toString() + " send(" + record + ")");
            return this.delegate.send(record, callback);
        }

        public void flush() {
            LOGGER.trace(() -> this.toString() + " flush()");
            this.delegate.flush();
        }

        public List<PartitionInfo> partitionsFor(String topic) {
            return this.delegate.partitionsFor(topic);
        }

        public Map<MetricName, ? extends Metric> metrics() {
            return this.delegate.metrics();
        }

        public void initTransactions() {
            this.delegate.initTransactions();
        }

        /**
         * Begin a transaction on the delegate; a failure is logged, recorded and rethrown.
         */
        public void beginTransaction() throws ProducerFencedException {
            LOGGER.debug(() -> this.toString() + " beginTransaction()");
            try {
                this.delegate.beginTransaction();
            }
            catch (RuntimeException e) {
                LOGGER.error(e, () -> "beginTransaction failed: " + this);
                this.txFailed = e;
                throw e;
            }
        }

        public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException {
            LOGGER.trace(() -> this.toString() + " sendOffsetsToTransaction(" + offsets + ", " + consumerGroupId + ")");
            this.delegate.sendOffsetsToTransaction(offsets, consumerGroupId);
        }

        /**
         * Commit the transaction on the delegate; a failure is logged, recorded and rethrown.
         */
        public void commitTransaction() throws ProducerFencedException {
            LOGGER.debug(() -> this.toString() + " commitTransaction()");
            try {
                this.delegate.commitTransaction();
            }
            catch (RuntimeException e) {
                LOGGER.error(e, () -> "commitTransaction failed: " + this);
                this.txFailed = e;
                throw e;
            }
        }

        /**
         * Abort the transaction on the delegate, unless an earlier transactional operation
         * already failed, in which case the abort is skipped. A failure of the abort itself is
         * logged, recorded and rethrown.
         */
        public void abortTransaction() throws ProducerFencedException {
            LOGGER.debug(() -> this.toString() + " abortTransaction()");
            Exception previousFailure = this.txFailed;
            if (previousFailure != null) {
                LOGGER.debug(() -> "abortTransaction ignored - previous txFailed: " + previousFailure.getMessage() + ": " + this);
                return;
            }
            try {
                this.delegate.abortTransaction();
            }
            catch (RuntimeException e) {
                LOGGER.error(e, () -> "Abort failed: " + this);
                this.txFailed = e;
                throw e;
            }
        }

        /**
         * Equivalent to {@link #close(Duration)} with a {@code null} timeout.
         */
        public void close() {
            this.close(null);
        }

        /**
         * Equivalent to {@link #close(Duration)}; a {@code null} unit yields a {@code null} timeout.
         */
        @Deprecated
        public void close(long timeout, @Nullable TimeUnit unit) {
            this.close(unit == null ? null : Duration.ofMillis(unit.toMillis(timeout)));
        }

        /**
         * Logically close this producer.
         *
         * <ul>
         * <li>No cache (non-transactional producer): nothing happens.</li>
         * <li>A transactional operation failed earlier: the delegate is physically closed (with a
         * zero timeout if that failure was a timeout) and, for a per-consumer producer, it is
         * unregistered from the factory even if the physical close throws.</li>
         * <li>Healthy cached producer: it is offered back to the cache unless already present;
         * the delegate is physically closed only if the cache refuses it.</li>
         * <li>Healthy per-consumer producer: nothing happens.</li>
         * </ul>
         *
         * @param timeout the timeout for a physical close; when {@code null},
         * {@link DefaultKafkaProducerFactory#DEFAULT_PHYSICAL_CLOSE_TIMEOUT} is used.
         */
        public void close(@Nullable Duration timeout) {
            LOGGER.trace(() -> this.toString() + " close(" + (timeout == null ? "null" : timeout) + ")");
            if (this.cache == null) {
                return;
            }
            Exception failure = this.txFailed;
            if (failure != null) {
                LOGGER.warn(() -> "Error during transactional operation; producer removed from cache; possible cause: broker restarted during transaction: " + this);
                try {
                    this.closeDelegate(failure instanceof TimeoutException ? CLOSE_TIMEOUT_AFTER_TX_TIMEOUT : timeout);
                }
                finally {
                    if (this.removeConsumerProducer != null) {
                        this.removeConsumerProducer.accept(this);
                    }
                }
            }
            else if (this.removeConsumerProducer == null) {
                synchronized (this) {
                    if (!this.cache.contains(this) && !this.cache.offer(this)) {
                        this.closeDelegate(timeout);
                    }
                }
            }
        }

        /**
         * Physically close the delegate, never handing it a {@code null} timeout.
         */
        private void closeDelegate(@Nullable Duration timeout) {
            this.delegate.close(timeout == null ? DEFAULT_PHYSICAL_CLOSE_TIMEOUT : timeout);
        }

        public String toString() {
            return "CloseSafeProducer [delegate=" + this.delegate + (this.txId != null ? ", txId=" + this.txId : "") + "]";
        }
    }
}

