package org.axonframework.extensions.kafka.eventhandling.producer;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.BuilderUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/axonframework/extensions/kafka/eventhandling/producer/DefaultProducerFactory.class */
public class DefaultProducerFactory<K, V> implements ProducerFactory<K, V> {
    private static final Logger logger = LoggerFactory.getLogger(DefaultProducerFactory.class);
    private final int closeTimeout;
    private final TimeUnit timeUnit;
    private final BlockingQueue<CloseLazyProducer<K, V>> cache;
    private final Map<String, Object> configuration;
    private final ConfirmationMode confirmationMode;
    private final String transactionIdPrefix;
    private final AtomicInteger transactionIdSuffix;
    private volatile CloseLazyProducer<K, V> producer;

    /* loaded from: input_file:org/axonframework/extensions/kafka/eventhandling/producer/DefaultProducerFactory$Builder.class */
    /**
     * Builder for {@link DefaultProducerFactory} instances.
     * <p>
     * Defaults: a close timeout of 30 {@link TimeUnit#SECONDS}, a producer cache size of 10 and
     * {@link ConfirmationMode#NONE}. The configuration has no default and must be supplied before
     * {@link #build()} is called.
     *
     * @param <K> the key type of the records the created producers send
     * @param <V> the value type of the records the created producers send
     */
    public static final class Builder<K, V> {
        private Map<String, Object> configuration;
        private String transactionIdPrefix;
        private int closeTimeout = 30;
        private TimeUnit timeUnit = TimeUnit.SECONDS;
        private int producerCacheSize = 10;
        private ConfirmationMode confirmationMode = ConfirmationMode.NONE;

        /**
         * Sets the timeout handed to the underlying producers when they are really closed.
         *
         * @param i        the timeout amount; must be strictly positive
         * @param timeUnit the unit of {@code i}; may not be {@code null}
         * @return this builder, for fluent chaining
         */
        public Builder<K, V> closeTimeout(int i, TimeUnit timeUnit) {
            BuilderUtils.assertThat(i, timeout -> timeout > 0, "The closeTimeout should be a positive number");
            BuilderUtils.assertNonNull(timeUnit, "The timeUnit may not be null");
            this.closeTimeout = i;
            this.timeUnit = timeUnit;
            return this;
        }

        /**
         * Sets the capacity of the queue in which closed producers are kept for reuse.
         *
         * @param i the cache capacity; must be strictly positive
         * @return this builder, for fluent chaining
         */
        public Builder<K, V> producerCacheSize(int i) {
            BuilderUtils.assertThat(i, size -> size > 0, "The producerCacheSize should be a positive number");
            this.producerCacheSize = i;
            return this;
        }

        /**
         * Sets the producer configuration. The given map is copied and stored as an unmodifiable map,
         * so later changes to {@code map} do not affect the factory.
         *
         * @param map the producer configuration properties; may not be {@code null}
         * @return this builder, for fluent chaining
         */
        public Builder<K, V> configuration(Map<String, Object> map) {
            BuilderUtils.assertNonNull(map, "The configuration may not be null");
            this.configuration = Collections.unmodifiableMap(new HashMap<>(map));
            return this;
        }

        /**
         * Sets the confirmation mode of the factory.
         *
         * @param confirmationMode the mode to use; may not be {@code null}
         * @return this builder, for fluent chaining
         */
        public Builder<K, V> confirmationMode(ConfirmationMode confirmationMode) {
            BuilderUtils.assertNonNull(confirmationMode, "ConfirmationMode may not be null");
            this.confirmationMode = confirmationMode;
            return this;
        }

        /**
         * Sets the prefix from which the factory derives each {@code transactional.id} and switches
         * the confirmation mode to {@link ConfirmationMode#TRANSACTIONAL}.
         *
         * @param str the transactional id prefix; may not be {@code null}
         * @return this builder, for fluent chaining
         */
        public Builder<K, V> transactionalIdPrefix(String str) {
            // A null prefix would silently yield ids such as "null0" once it is concatenated with the suffix.
            BuilderUtils.assertNonNull(str, "The transactionalIdPrefix may not be null");
            this.transactionIdPrefix = str;
            return confirmationMode(ConfirmationMode.TRANSACTIONAL);
        }

        /**
         * Creates a {@link DefaultProducerFactory} from the current state of this builder.
         *
         * @return the newly built factory
         */
        public DefaultProducerFactory<K, V> build() {
            return new DefaultProducerFactory<>(this);
        }

        /**
         * Verifies that all mandatory settings were supplied; currently only the configuration.
         *
         * @throws AxonConfigurationException declared for a failed check; the check itself is
         *                                    delegated to {@link BuilderUtils#assertNonNull}
         */
        protected void validate() throws AxonConfigurationException {
            BuilderUtils.assertNonNull(this.configuration, "The configuration is a hard requirement and should be provided");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/axonframework/extensions/kafka/eventhandling/producer/DefaultProducerFactory$CloseLazyProducer.class */
    public static final class CloseLazyProducer<K, V> implements Producer<K, V> {
        private final Producer<K, V> delegate;
        private final BlockingQueue<CloseLazyProducer<K, V>> cache;
        private final int closeTimeout;
        private final TimeUnit unit;

        CloseLazyProducer(Producer<K, V> producer, BlockingQueue<CloseLazyProducer<K, V>> blockingQueue, int i, TimeUnit timeUnit) {
            this.delegate = producer;
            this.cache = blockingQueue;
            this.closeTimeout = i;
            this.unit = timeUnit;
        }

        public Future<RecordMetadata> send(ProducerRecord<K, V> producerRecord) {
            return this.delegate.send(producerRecord);
        }

        public Future<RecordMetadata> send(ProducerRecord<K, V> producerRecord, Callback callback) {
            return this.delegate.send(producerRecord, callback);
        }

        public void flush() {
            this.delegate.flush();
        }

        public List<PartitionInfo> partitionsFor(String str) {
            return this.delegate.partitionsFor(str);
        }

        public Map<MetricName, ? extends Metric> metrics() {
            return this.delegate.metrics();
        }

        public void initTransactions() {
            this.delegate.initTransactions();
        }

        public void beginTransaction() throws ProducerFencedException {
            this.delegate.beginTransaction();
        }

        public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> map, String str) throws ProducerFencedException {
            this.delegate.sendOffsetsToTransaction(map, str);
        }

        public void commitTransaction() throws ProducerFencedException {
            this.delegate.commitTransaction();
        }

        public void abortTransaction() throws ProducerFencedException {
            this.delegate.abortTransaction();
        }

        public void close() {
            close(this.closeTimeout, this.unit);
        }

        public void close(long j, TimeUnit timeUnit) {
            if (this.cache.offer(this)) {
                return;
            }
            this.delegate.close(j, timeUnit);
        }

        public String toString() {
            return "CloseLazyProducer [delegate=" + this.delegate + "]";
        }
    }

    /**
     * Creates a factory from the given builder after validating it.
     * <p>
     * The producer cache is a bounded queue sized by the builder's producer cache size, and the
     * transactional id suffix counter starts at zero.
     *
     * @param builder the builder carrying the settings for this factory
     */
    protected DefaultProducerFactory(Builder<K, V> builder) {
        builder.validate();
        this.closeTimeout = builder.closeTimeout;
        this.timeUnit = builder.timeUnit;
        this.cache = new ArrayBlockingQueue<>(builder.producerCacheSize);
        this.configuration = builder.configuration;
        this.confirmationMode = builder.confirmationMode;
        this.transactionIdPrefix = builder.transactionIdPrefix;
        this.transactionIdSuffix = new AtomicInteger();
    }

    /**
     * Entry point for configuring a {@link DefaultProducerFactory}.
     *
     * @param <K> the key type of the records the created producers send
     * @param <V> the value type of the records the created producers send
     * @return a fresh {@link Builder} holding its default settings
     */
    public static <K, V> Builder<K, V> builder() {
        final Builder<K, V> freshBuilder = new Builder<K, V>();
        return freshBuilder;
    }

    /**
     * Provides a producer matching the configured confirmation mode.
     * <p>
     * In transactional mode the call is forwarded to {@code createTransactionalProducer()}.
     * Otherwise a single producer is created on first use, guarded by a lock on this factory, and
     * the value of the {@code producer} field is returned.
     *
     * @return the producer to publish with
     */
    @Override
    public Producer<K, V> createProducer() {
        if (!confirmationMode.isTransactional()) {
            if (producer == null) {
                synchronized (this) {
                    if (producer == null) {
                        Producer<K, V> kafkaProducer = createKafkaProducer(configuration);
                        producer = new CloseLazyProducer<>(kafkaProducer, cache, closeTimeout, timeUnit);
                    }
                }
            }
            return producer;
        }
        return createTransactionalProducer();
    }

    /**
     * Exposes the confirmation mode this factory was built with.
     *
     * @return the configured {@link ConfirmationMode}
     */
    @Override
    public ConfirmationMode confirmationMode() {
        return confirmationMode;
    }

    /**
     * Exposes the producer configuration as a read-only map.
     *
     * @return an unmodifiable view of the configuration properties
     */
    public Map<String, Object> configurationProperties() {
        final Map<String, Object> readOnlyView = Collections.unmodifiableMap(configuration);
        return readOnlyView;
    }

    /**
     * Exposes the prefix used when generating a {@code transactional.id} for new transactional
     * producers.
     *
     * @return the configured prefix, or {@code null} when none was set on the builder
     */
    public String transactionIdPrefix() {
        return transactionIdPrefix;
    }

    /**
     * Closes the shared producer (if one was created) and every producer currently held in the
     * cache, using the configured close timeout.
     * <p>
     * A failure while closing the shared producer is still propagated to the caller, but only
     * after the cache has been drained. Failures while closing cached producers are logged and do
     * not stop the remaining ones from being closed.
     */
    @Override
    public void shutDown() {
        CloseLazyProducer<K, V> shared = this.producer;
        this.producer = null;
        try {
            if (shared != null) {
                shared.delegate.close(this.closeTimeout, this.timeUnit);
            }
        } finally {
            // Drain in a finally block so a failing shared-producer close cannot leak the pooled producers.
            CloseLazyProducer<K, V> pooled;
            while ((pooled = this.cache.poll()) != null) {
                if (pooled == shared) {
                    // The shared producer may have been offered to the cache by its own close(); it is handled above.
                    continue;
                }
                try {
                    pooled.delegate.close(this.closeTimeout, this.timeUnit);
                } catch (Exception e) {
                    logger.error("Exception closing producer", e);
                }
            }
        }
    }

    /**
     * Returns a transactional producer, reusing one from the cache when available.
     * <p>
     * When the cache is empty a new producer is created from a copy of the configuration, with
     * {@code transactional.id} set to the prefix followed by the next suffix counter value, and
     * its transactions are initialized before it is handed out.
     *
     * @return a producer that is ready for transactional use
     * @throws RuntimeException whatever {@code initTransactions()} threw; the freshly created
     *                          producer is closed before the exception is rethrown
     */
    private Producer<K, V> createTransactionalProducer() {
        CloseLazyProducer<K, V> pooled = this.cache.poll();
        if (pooled != null) {
            return pooled;
        }
        Map<String, Object> transactionalConfig = new HashMap<>(this.configuration);
        transactionalConfig.put("transactional.id", this.transactionIdPrefix + this.transactionIdSuffix.getAndIncrement());
        Producer<K, V> kafkaProducer = createKafkaProducer(transactionalConfig);
        try {
            kafkaProducer.initTransactions();
        } catch (RuntimeException initFailure) {
            // Close the raw producer directly: closing a CloseLazyProducer would park the broken instance in the cache.
            try {
                kafkaProducer.close(this.closeTimeout, this.timeUnit);
            } catch (RuntimeException closeFailure) {
                initFailure.addSuppressed(closeFailure);
            }
            throw initFailure;
        }
        return new CloseLazyProducer<>(kafkaProducer, this.cache, this.closeTimeout, this.timeUnit);
    }

    /**
     * Instantiates a new {@link KafkaProducer} from the given configuration.
     *
     * @param map the producer configuration properties
     * @return the newly created producer
     */
    private Producer<K, V> createKafkaProducer(Map<String, Object> map) {
        // Diamond instead of the raw type, so the key/value types are checked by the compiler.
        return new KafkaProducer<>(map);
    }
}
