/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.extensions.kafka.eventhandling.cloudevent;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.v1.CloudEventBuilder;
import java.net.URI;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.BuilderUtils;
import org.axonframework.eventhandling.EventData;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GenericDomainEventEntry;
import org.axonframework.eventhandling.GenericDomainEventMessage;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.eventhandling.async.SequencingPolicy;
import org.axonframework.eventhandling.async.SequentialPerAggregatePolicy;
import org.axonframework.extensions.kafka.eventhandling.KafkaMessageConverter;
import org.axonframework.extensions.kafka.eventhandling.cloudevent.ExtensionUtils;
import org.axonframework.extensions.kafka.eventhandling.cloudevent.MetadataUtils;
import org.axonframework.messaging.MetaData;
import org.axonframework.serialization.LazyDeserializingObject;
import org.axonframework.serialization.SerializedMessage;
import org.axonframework.serialization.SerializedObject;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.upcasting.event.EventUpcaster;
import org.axonframework.serialization.upcasting.event.EventUpcasterChain;
import org.axonframework.serialization.upcasting.event.InitialEventRepresentation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CloudEventKafkaMessageConverter
implements KafkaMessageConverter<String, CloudEvent> {
    private static final Logger logger = LoggerFactory.getLogger(CloudEventKafkaMessageConverter.class);
    private final Serializer serializer;
    private final SequencingPolicy<? super EventMessage<?>> sequencingPolicy;
    private final EventUpcasterChain upcasterChain;
    private final Map<String, String> extensionNameResolver;
    private final Map<String, String> metadataNameResolver;
    private final Function<EventMessage<?>, URI> sourceSupplier;
    private final Function<EventMessage<?>, Optional<String>> subjectSupplier;
    private final Function<EventMessage<?>, Optional<String>> dataContentTypeSupplier;
    private final Function<EventMessage<?>, Optional<URI>> dataSchemaSupplier;
    private final boolean ignoreInvalidExtensionNames;

    protected CloudEventKafkaMessageConverter(Builder builder) {
        builder.validate();
        this.serializer = builder.serializer;
        this.sequencingPolicy = builder.sequencingPolicy;
        this.upcasterChain = builder.upcasterChain;
        this.extensionNameResolver = builder.metadataToExtensionMap;
        this.metadataNameResolver = builder.metadataToExtensionMap.entrySet().stream().collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey));
        this.sourceSupplier = builder.sourceSupplier;
        this.subjectSupplier = builder.subjectSupplier;
        this.dataContentTypeSupplier = builder.dataContentTypeSupplier;
        this.dataSchemaSupplier = builder.dataSchemaSupplier;
        this.ignoreInvalidExtensionNames = builder.ignoreInvalidExtensionNames;
    }

    public static Builder builder() {
        return new Builder();
    }

    @Override
    public ProducerRecord<String, CloudEvent> createKafkaMessage(EventMessage<?> eventMessage, String topic) {
        SerializedObject serializedObject = eventMessage.serializePayload(this.serializer, byte[].class);
        return new ProducerRecord(topic, null, null, (Object)this.recordKey(eventMessage), (Object)this.toCloudEvent(eventMessage, (SerializedObject<byte[]>)serializedObject), null);
    }

    private CloudEvent toCloudEvent(EventMessage<?> message, SerializedObject<byte[]> serializedObject) {
        CloudEventBuilder builder = new CloudEventBuilder();
        builder.withId(message.getIdentifier());
        builder.withData((byte[])serializedObject.getData());
        this.subjectSupplier.apply(message).ifPresent(arg_0 -> ((CloudEventBuilder)builder).withSubject(arg_0));
        this.dataContentTypeSupplier.apply(message).ifPresent(arg_0 -> ((CloudEventBuilder)builder).withDataContentType(arg_0));
        this.dataSchemaSupplier.apply(message).ifPresent(arg_0 -> ((CloudEventBuilder)builder).withDataSchema(arg_0));
        builder.withSource(this.sourceSupplier.apply(message));
        builder.withType(serializedObject.getType().getName());
        builder.withTime(message.getTimestamp().atOffset(ZoneOffset.UTC));
        ExtensionUtils.setExtensions(builder, message, serializedObject, this.extensionNameResolver, this.ignoreInvalidExtensionNames);
        return builder.build();
    }

    private String recordKey(EventMessage<?> eventMessage) {
        Object sequenceIdentifier = this.sequencingPolicy.getSequenceIdentifierFor(eventMessage);
        return sequenceIdentifier != null ? sequenceIdentifier.toString() : null;
    }

    @Override
    public Optional<EventMessage<?>> readKafkaMessage(ConsumerRecord<String, CloudEvent> consumerRecord) {
        try {
            CloudEvent cloudEvent = (CloudEvent)consumerRecord.value();
            EventData<?> eventData = this.createEventData(cloudEvent, consumerRecord.timestamp());
            return this.upcasterChain.upcast(Stream.of(new InitialEventRepresentation(eventData, this.serializer))).findFirst().map(upcastedEventData -> new SerializedMessage(upcastedEventData.getMessageIdentifier(), new LazyDeserializingObject(upcastedEventData.getData(), this.serializer), upcastedEventData.getMetaData())).flatMap(serializedMessage -> CloudEventKafkaMessageConverter.buildMessage(cloudEvent, serializedMessage, consumerRecord.timestamp()));
        }
        catch (Exception e) {
            logger.trace("Error converting ConsumerRecord [{}] to an EventMessage", consumerRecord, (Object)e);
            return Optional.empty();
        }
    }

    private EventData<?> createEventData(CloudEvent cloudEvent, long fallBackTimestamp) {
        return new GenericDomainEventEntry(ExtensionUtils.asNullableString(cloudEvent.getExtension("axonmessageaggregatetype")), ExtensionUtils.asNullableString(cloudEvent.getExtension("axonmessageaggregateid")), ExtensionUtils.asLong(cloudEvent.getExtension("axonmessageaggregateseq")).longValue(), cloudEvent.getId(), (Object)ExtensionUtils.asOffsetDateTime(cloudEvent.getTime(), fallBackTimestamp), cloudEvent.getType(), ExtensionUtils.asNullableString(cloudEvent.getExtension("axonmessagerevision")), (Object)ExtensionUtils.asBytes(cloudEvent.getData()), (Object)this.extractMetadataAsBytes(cloudEvent));
    }

    private byte[] extractMetadataAsBytes(CloudEvent cloudEvent) {
        MetaData metaData = ExtensionUtils.getExtensionsAsMetadata(cloudEvent, this.metadataNameResolver).mergedWith(MetadataUtils.getAdditionalEntries(cloudEvent));
        return (byte[])this.serializer.serialize((Object)metaData, byte[].class).getData();
    }

    private static boolean isDomainEvent(CloudEvent cloudEvent) {
        return cloudEvent.getExtension("axonmessageaggregatetype") != null && cloudEvent.getExtension("axonmessageaggregateid") != null && cloudEvent.getExtension("axonmessageaggregateseq") != null;
    }

    private static Optional<EventMessage<?>> buildMessage(CloudEvent cloudEvent, SerializedMessage<?> message, long fallbackTimestamp) {
        Instant timestamp = Instant.from(ExtensionUtils.asOffsetDateTime(cloudEvent.getTime(), fallbackTimestamp));
        return CloudEventKafkaMessageConverter.isDomainEvent(cloudEvent) ? CloudEventKafkaMessageConverter.buildDomainEventMessage(cloudEvent, message, timestamp) : CloudEventKafkaMessageConverter.buildEventMessage(message, timestamp);
    }

    private static Optional<EventMessage<?>> buildDomainEventMessage(CloudEvent cloudEvent, SerializedMessage<?> message, Instant timestamp) {
        return Optional.of(new GenericDomainEventMessage(ExtensionUtils.asNullableString(cloudEvent.getExtension("axonmessageaggregatetype")), ExtensionUtils.asNullableString(cloudEvent.getExtension("axonmessageaggregateid")), ExtensionUtils.asLong(cloudEvent.getExtension("axonmessageaggregateseq")).longValue(), message, () -> timestamp));
    }

    private static Optional<EventMessage<?>> buildEventMessage(SerializedMessage<?> message, Instant timestamp) {
        return Optional.of(new GenericEventMessage(message, () -> timestamp));
    }

    public static class Builder {
        private Serializer serializer;
        private SequencingPolicy<? super EventMessage<?>> sequencingPolicy = SequentialPerAggregatePolicy.instance();
        private EventUpcasterChain upcasterChain = new EventUpcasterChain(new EventUpcaster[0]);
        private final Map<String, String> metadataToExtensionMap = this.tracingMap();
        private Function<EventMessage<?>, URI> sourceSupplier = m -> URI.create("https://www.axoniq.io/");
        private Function<EventMessage<?>, Optional<String>> subjectSupplier = MetadataUtils.defaultSubjectSupplier();
        private Function<EventMessage<?>, Optional<String>> dataContentTypeSupplier = MetadataUtils.defaultDataContentTypeSupplier();
        private Function<EventMessage<?>, Optional<URI>> dataSchemaSupplier = MetadataUtils.defaultDataSchemaSupplier();
        private boolean ignoreInvalidExtensionNames = false;

        private Map<String, String> tracingMap() {
            HashMap<String, String> map = new HashMap<String, String>();
            map.put("traceId", "traceid");
            map.put("correlationId", "correlationid");
            return map;
        }

        public Builder serializer(Serializer serializer) {
            BuilderUtils.assertNonNull((Object)serializer, (String)"Serializer may not be null");
            this.serializer = serializer;
            return this;
        }

        public Builder sequencingPolicy(SequencingPolicy<? super EventMessage<?>> sequencingPolicy) {
            BuilderUtils.assertNonNull(sequencingPolicy, (String)"SequencingPolicy may not be null");
            this.sequencingPolicy = sequencingPolicy;
            return this;
        }

        public Builder upcasterChain(EventUpcasterChain upcasterChain) {
            BuilderUtils.assertNonNull((Object)upcasterChain, (String)"UpcasterChain must not be null");
            this.upcasterChain = upcasterChain;
            return this;
        }

        public Builder addMetadataMappers(Map<String, String> metadataMappers) {
            BuilderUtils.assertThat(metadataMappers, ExtensionUtils::isValidMetadataToExtensionMap, (String)"The metadataMappers has invalid extension names");
            this.metadataToExtensionMap.putAll(metadataMappers);
            return this;
        }

        public Builder addMetadataMapper(String metadataKey, String extensionName) {
            BuilderUtils.assertThat((Object)extensionName, ExtensionUtils::isValidExtensionName, (String)"The extension name is invalid");
            this.metadataToExtensionMap.put(metadataKey, extensionName);
            return this;
        }

        public Builder sourceSupplier(Function<EventMessage<?>, URI> sourceSupplier) {
            BuilderUtils.assertNonNull(sourceSupplier, (String)"sourceSupplier must not be null");
            this.sourceSupplier = sourceSupplier;
            return this;
        }

        public Builder subjectSupplier(Function<EventMessage<?>, Optional<String>> subjectSupplier) {
            BuilderUtils.assertNonNull(subjectSupplier, (String)"dataContentTypeSupplier must not be null");
            this.subjectSupplier = subjectSupplier;
            return this;
        }

        public Builder dataContentTypeSupplier(Function<EventMessage<?>, Optional<String>> dataContentTypeSupplier) {
            BuilderUtils.assertNonNull(dataContentTypeSupplier, (String)"dataContentTypeSupplier must not be null");
            this.dataContentTypeSupplier = dataContentTypeSupplier;
            return this;
        }

        public Builder dataSchemaSupplier(Function<EventMessage<?>, Optional<URI>> dataSchemaSupplier) {
            BuilderUtils.assertNonNull(dataSchemaSupplier, (String)"dataSchemaSupplier must not be null");
            this.dataSchemaSupplier = dataSchemaSupplier;
            return this;
        }

        public Builder ignoreInvalidExtensionNames(boolean ignoreInvalidExtensionNames) {
            this.ignoreInvalidExtensionNames = ignoreInvalidExtensionNames;
            return this;
        }

        public CloudEventKafkaMessageConverter build() {
            return new CloudEventKafkaMessageConverter(this);
        }

        protected void validate() throws AxonConfigurationException {
            BuilderUtils.assertNonNull((Object)this.serializer, (String)"The Serializer is a hard requirement and should be provided");
        }
    }
}

