/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.extensions.kafka.eventhandling.producer;

import com.thoughtworks.xstream.XStream;
import com.thoughtworks.xstream.io.HierarchicalStreamDriver;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.BuilderUtils;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.extensions.kafka.eventhandling.DefaultKafkaMessageConverter;
import org.axonframework.extensions.kafka.eventhandling.KafkaMessageConverter;
import org.axonframework.extensions.kafka.eventhandling.producer.ConfirmationMode;
import org.axonframework.extensions.kafka.eventhandling.producer.ProducerFactory;
import org.axonframework.extensions.kafka.eventhandling.producer.TopicResolver;
import org.axonframework.lifecycle.Lifecycle;
import org.axonframework.messaging.EventPublicationFailedException;
import org.axonframework.messaging.unitofwork.CurrentUnitOfWork;
import org.axonframework.monitoring.MessageMonitor;
import org.axonframework.monitoring.NoOpMessageMonitor;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.xml.CompactDriver;
import org.axonframework.serialization.xml.XStreamSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Publishes {@link EventMessage}s to Kafka.
 * <p>
 * For every event handed to {@link #send(EventMessage)} the configured {@link TopicResolver} decides the
 * target topic, the {@link KafkaMessageConverter} turns the event into a Kafka record and a {@link Producer}
 * obtained from the {@link ProducerFactory} sends it. Depending on the factory's {@link ConfirmationMode}
 * the send is wrapped in a Kafka transaction, awaited for acknowledgement, or neither.
 * <p>
 * Instances are created through {@link #builder()}.
 *
 * @param <K> the key type of the Kafka records that are produced
 * @param <V> the value type of the Kafka records that are produced
 */
public class KafkaPublisher<K, V> implements Lifecycle {

    private static final Logger logger = LoggerFactory.getLogger(KafkaPublisher.class);

    /** Topic used when neither a topic nor a {@link TopicResolver} is configured on the builder. */
    private static final String DEFAULT_TOPIC = "Axon.Events";

    /**
     * Phase in which {@link #shutDown()} is registered; equal to {@code 0x3FFFFFFF}.
     * NOTE(review): presumably mirrors Axon's phase constant for inbound event connectors - confirm.
     */
    private static final int SHUTDOWN_PHASE = Integer.MAX_VALUE >> 1;

    private final ProducerFactory<K, V> producerFactory;
    private final KafkaMessageConverter<K, V> messageConverter;
    private final MessageMonitor<? super EventMessage<?>> messageMonitor;
    private final TopicResolver topicResolver;
    /** Maximum time, in milliseconds, to wait for a publish acknowledgement. */
    private final long publisherAckTimeout;

    /**
     * Creates a publisher from the given {@code builder}, after calling {@link Builder#validate()} on it.
     *
     * @param builder the builder carrying the configuration of this publisher
     * @throws AxonConfigurationException if the builder's validation fails
     */
    protected KafkaPublisher(Builder<K, V> builder) {
        builder.validate();
        this.producerFactory = builder.producerFactory;
        this.messageConverter = builder.messageConverter;
        this.messageMonitor = builder.messageMonitor;
        this.topicResolver = builder.topicResolver;
        this.publisherAckTimeout = builder.publisherAckTimeout;
    }

    /**
     * Creates a new, empty {@link Builder}.
     *
     * @param <K> the key type of the Kafka records that are produced
     * @param <V> the value type of the Kafka records that are produced
     * @return a builder to configure and create a {@link KafkaPublisher}
     */
    public static <K, V> Builder<K, V> builder() {
        return new Builder<>();
    }

    /**
     * Sends the given {@code event} to the topic chosen by the {@link TopicResolver}. Nothing is sent when
     * the resolver returns an empty {@link Optional}.
     * <p>
     * A rollback handler is registered on the current Unit of Work which aborts the Kafka transaction (in
     * transactional mode) and closes the producer. If anything fails <em>before</em> that handler could be
     * registered, this method cleans up the producer itself and reports the failure to the message monitor,
     * so that neither the producer nor an open transaction is left behind.
     *
     * @param event the event to publish
     * @param <T>   the type of event to publish
     * @throws EventPublicationFailedException if starting or committing the Kafka transaction fails because
     *                                         the producer is fenced, or if waiting for the publish
     *                                         acknowledgement fails or times out
     */
    public <T extends EventMessage<?>> void send(T event) {
        logger.debug("Starting event producing process for [{}].", event.getPayloadType());
        Optional<String> topic = topicResolver.resolve(event);
        if (!topic.isPresent()) {
            logger.debug("Skip publishing event for [{}] since topicFunction returned empty.", event.getPayloadType());
            return;
        }

        MessageMonitor.MonitorCallback monitorCallback = messageMonitor.onMessageIngested(event);
        Producer<K, V> producer = producerFactory.createProducer();
        ConfirmationMode confirmationMode = producerFactory.confirmationMode();
        boolean transactional = confirmationMode.isTransactional();

        Future<RecordMetadata> publishStatus;
        boolean transactionBegun = false;
        try {
            if (transactional) {
                tryBeginTxn(producer);
                transactionBegun = true;
            }
            publishStatus = producer.send(messageConverter.createKafkaMessage(event, topic.get()));
            CurrentUnitOfWork.get().onRollback(u -> {
                if (transactional) {
                    tryRollback(producer);
                }
                tryClose(producer);
            });
        } catch (RuntimeException e) {
            // The rollback handler is not (reliably) registered yet, so nobody else owns the producer:
            // release it here before propagating the original failure.
            if (transactionBegun) {
                tryRollback(producer);
            }
            tryClose(producer);
            monitorCallback.reportFailure(e);
            throw e;
        }

        if (transactional) {
            tryCommit(producer, monitorCallback);
        } else if (confirmationMode.isWaitForAck()) {
            waitForPublishAck(publishStatus, monitorCallback);
        }
        tryClose(producer);
    }

    /**
     * Begins a transaction on the given {@code producer}.
     *
     * @param producer the producer to begin a transaction on
     * @throws EventPublicationFailedException if the producer has been fenced
     */
    private void tryBeginTxn(Producer<?, ?> producer) {
        try {
            producer.beginTransaction();
        } catch (ProducerFencedException e) {
            logger.warn("Unable to begin transaction");
            throw new EventPublicationFailedException(
                    "Event publication failed, exception occurred while starting Kafka transaction.", e
            );
        }
    }

    /**
     * Commits the transaction of the given {@code producer} and reports the outcome to the
     * {@code monitorCallback}.
     *
     * @param producer        the producer whose transaction should be committed
     * @param monitorCallback the callback to report success or failure to
     * @throws EventPublicationFailedException if the producer has been fenced
     */
    private void tryCommit(Producer<?, ?> producer, MessageMonitor.MonitorCallback monitorCallback) {
        try {
            producer.commitTransaction();
            monitorCallback.reportSuccess();
        } catch (ProducerFencedException e) {
            logger.warn("Unable to commit transaction");
            monitorCallback.reportFailure(e);
            throw new EventPublicationFailedException(
                    "Event publication failed, exception occurred while committing Kafka transaction.", e
            );
        }
    }

    /**
     * Blocks for at most {@code publisherAckTimeout} milliseconds until the given {@code future} completes
     * and reports the outcome to the {@code monitorCallback}.
     * <p>
     * When the waiting thread is interrupted, the failure is reported and logged and the interrupt flag is
     * restored, but no exception is thrown.
     *
     * @param future          the pending result of the send operation
     * @param monitorCallback the callback to report success or failure to
     * @throws EventPublicationFailedException if the send failed or was not acknowledged in time
     */
    private void waitForPublishAck(Future<RecordMetadata> future, MessageMonitor.MonitorCallback monitorCallback) {
        try {
            // The builder guarantees a non-negative timeout, so it can be handed to the future as-is.
            future.get(publisherAckTimeout, TimeUnit.MILLISECONDS);
            monitorCallback.reportSuccess();
        } catch (InterruptedException e) {
            monitorCallback.reportFailure(e);
            logger.warn("Encountered error while waiting for event publication.", e);
            Thread.currentThread().interrupt();
        } catch (ExecutionException | TimeoutException e) {
            monitorCallback.reportFailure(e);
            logger.warn("Encountered error while waiting for event publication.");
            throw new EventPublicationFailedException(
                    "Event publication failed, exception occurred while waiting for event publication.", e
            );
        }
    }

    /**
     * Aborts the transaction of the given {@code producer}. Best effort: any exception is logged and
     * otherwise ignored.
     *
     * @param producer the producer whose transaction should be aborted
     */
    private void tryRollback(Producer<?, ?> producer) {
        try {
            producer.abortTransaction();
        } catch (Exception e) {
            logger.warn("Unable to abort transaction.", e);
        }
    }

    /**
     * Closes the given {@code producer}. Best effort: any exception is logged and otherwise ignored.
     *
     * @param producer the producer to close
     */
    private void tryClose(Producer<?, ?> producer) {
        try {
            producer.close();
        } catch (Exception e) {
            logger.debug("Unable to close producer.", e);
        }
    }

    /**
     * Shuts down this publisher by shutting down the underlying {@link ProducerFactory}.
     */
    public void shutDown() {
        producerFactory.shutDown();
    }

    /**
     * Registers {@link #shutDown()} as a shutdown handler with the given {@code lifecycle} registry.
     *
     * @param lifecycle the registry to register the shutdown handler with
     */
    public void registerLifecycleHandlers(@Nonnull Lifecycle.LifecycleRegistry lifecycle) {
        lifecycle.onShutdown(SHUTDOWN_PHASE, this::shutDown);
    }

    /**
     * Builder for a {@link KafkaPublisher}.
     * <p>
     * The {@link ProducerFactory} is a hard requirement. The {@link MessageMonitor} defaults to a
     * {@link NoOpMessageMonitor}, the {@link TopicResolver} to one always returning {@code "Axon.Events"}
     * and the publisher acknowledgement timeout to {@code 1000} milliseconds. When no
     * {@link KafkaMessageConverter} is given, a {@link DefaultKafkaMessageConverter} is created from the
     * configured {@link Serializer}, or from a default {@link XStreamSerializer} if none was configured.
     *
     * @param <K> the key type of the Kafka records that are produced
     * @param <V> the value type of the Kafka records that are produced
     */
    public static class Builder<K, V> {

        private ProducerFactory<K, V> producerFactory;
        private KafkaMessageConverter<K, V> messageConverter;
        private MessageMonitor<? super EventMessage<?>> messageMonitor = NoOpMessageMonitor.instance();
        private TopicResolver topicResolver = m -> Optional.of(KafkaPublisher.DEFAULT_TOPIC);
        private long publisherAckTimeout = 1000L;
        private Supplier<Serializer> serializer;

        /**
         * Sets the {@link Serializer} used to build the default {@link KafkaMessageConverter}. It is only
         * used when no converter is set through {@link #messageConverter(KafkaMessageConverter)}.
         *
         * @param serializer the serializer for the default message converter
         * @return this builder, for fluent interfacing
         */
        public Builder<K, V> serializer(Serializer serializer) {
            BuilderUtils.assertNonNull(serializer, "The Serializer may not be null");
            this.serializer = () -> serializer;
            return this;
        }

        /**
         * Sets the {@link ProducerFactory} that creates the {@link Producer}s used for publishing.
         *
         * @param producerFactory the factory creating producers
         * @return this builder, for fluent interfacing
         */
        public Builder<K, V> producerFactory(ProducerFactory<K, V> producerFactory) {
            BuilderUtils.assertNonNull(producerFactory, "ProducerFactory may not be null");
            this.producerFactory = producerFactory;
            return this;
        }

        /**
         * Sets the {@link KafkaMessageConverter} that converts events into Kafka records.
         *
         * @param messageConverter the converter to use
         * @return this builder, for fluent interfacing
         */
        public Builder<K, V> messageConverter(KafkaMessageConverter<K, V> messageConverter) {
            BuilderUtils.assertNonNull(messageConverter, "MessageConverter may not be null");
            this.messageConverter = messageConverter;
            return this;
        }

        /**
         * Sets the {@link MessageMonitor} that is notified of every event that is published.
         *
         * @param messageMonitor the monitor to notify
         * @return this builder, for fluent interfacing
         */
        public Builder<K, V> messageMonitor(MessageMonitor<? super EventMessage<?>> messageMonitor) {
            BuilderUtils.assertNonNull(messageMonitor, "MessageMonitor may not be null");
            this.messageMonitor = messageMonitor;
            return this;
        }

        /**
         * Sets a fixed topic to publish all events to, by installing a {@link TopicResolver} that always
         * returns the given {@code topic}.
         *
         * @param topic the non-empty name of the topic to publish to
         * @return this builder, for fluent interfacing
         * @deprecated use {@link #topicResolver(TopicResolver)} instead
         */
        @Deprecated
        public Builder<K, V> topic(String topic) {
            BuilderUtils.assertThat(
                    topic, name -> Objects.nonNull(name) && !"".equals(name), "The topic may not be null or empty"
            );
            this.topicResolver = m -> Optional.of(topic);
            return this;
        }

        /**
         * Sets the {@link TopicResolver} that determines the topic for each event. Events for which the
         * resolver returns an empty {@link Optional} are not published.
         *
         * @param topicResolver the resolver deciding the target topic
         * @return this builder, for fluent interfacing
         */
        public Builder<K, V> topicResolver(TopicResolver topicResolver) {
            BuilderUtils.assertNonNull(topicResolver, "The TopicResolver may not be null");
            this.topicResolver = topicResolver;
            return this;
        }

        /**
         * Sets the maximum time, in milliseconds, to wait for a publish acknowledgement.
         *
         * @param publisherAckTimeout the timeout in milliseconds; must be zero or positive
         * @return this builder, for fluent interfacing
         */
        public Builder<K, V> publisherAckTimeout(long publisherAckTimeout) {
            // No (Object) cast here: the predicate must be typed on Long for the comparison to type-check.
            BuilderUtils.assertThat(
                    publisherAckTimeout,
                    timeout -> timeout >= 0L,
                    "The publisherAckTimeout should be a positive number or zero"
            );
            this.publisherAckTimeout = publisherAckTimeout;
            return this;
        }

        /**
         * Creates a {@link KafkaPublisher} from the configuration of this builder.
         *
         * @return a publisher as specified through this builder
         */
        public KafkaPublisher<K, V> build() {
            return new KafkaPublisher<>(this);
        }

        /**
         * Validates this builder and fills in a default {@link KafkaMessageConverter} when none was
         * provided. The default {@link XStreamSerializer} (and the accompanying warning) is only used when
         * it is actually needed, i.e. when neither a message converter nor a serializer was configured.
         *
         * @throws AxonConfigurationException if the {@link ProducerFactory} has not been set
         */
        protected void validate() throws AxonConfigurationException {
            BuilderUtils.assertNonNull(
                    producerFactory, "The ProducerFactory is a hard requirement and should be provided"
            );
            if (messageConverter != null) {
                // The serializer only feeds the default converter; nothing left to default.
                return;
            }
            if (serializer == null) {
                logger.warn(
                        "The default XStreamSerializer is used, whereas it is strongly recommended to configure the security context of the XStream instance.",
                        new AxonConfigurationException(
                                "A default XStreamSerializer is used, without specifying the security context"
                        )
                );
                serializer = () -> XStreamSerializer.builder()
                                                    .xStream(new XStream((HierarchicalStreamDriver) new CompactDriver()))
                                                    .build();
            }
            messageConverter = DefaultKafkaMessageConverter.builder().serializer(serializer.get()).build();
        }
    }
}

