/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.extensions.kafka.eventhandling;

import java.time.Instant;
import java.util.Arrays;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.BuilderUtils;
import org.axonframework.eventhandling.EventData;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GenericDomainEventEntry;
import org.axonframework.eventhandling.GenericDomainEventMessage;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.eventhandling.async.SequencingPolicy;
import org.axonframework.eventhandling.async.SequentialPerAggregatePolicy;
import org.axonframework.extensions.kafka.eventhandling.HeaderUtils;
import org.axonframework.extensions.kafka.eventhandling.KafkaMessageConverter;
import org.axonframework.messaging.MetaData;
import org.axonframework.serialization.LazyDeserializingObject;
import org.axonframework.serialization.SerializedMessage;
import org.axonframework.serialization.SerializedObject;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.upcasting.event.EventUpcaster;
import org.axonframework.serialization.upcasting.event.EventUpcasterChain;
import org.axonframework.serialization.upcasting.event.InitialEventRepresentation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultKafkaMessageConverter
implements KafkaMessageConverter<String, byte[]> {
    private static final Logger logger = LoggerFactory.getLogger(DefaultKafkaMessageConverter.class);
    private final Serializer serializer;
    private final SequencingPolicy<? super EventMessage<?>> sequencingPolicy;
    private final BiFunction<String, Object, RecordHeader> headerValueMapper;
    private final EventUpcasterChain upcasterChain;

    protected DefaultKafkaMessageConverter(Builder builder) {
        builder.validate();
        this.serializer = builder.serializer;
        this.sequencingPolicy = builder.sequencingPolicy;
        this.headerValueMapper = builder.headerValueMapper;
        this.upcasterChain = builder.upcasterChain;
    }

    public static Builder builder() {
        return new Builder();
    }

    @Override
    public ProducerRecord<String, byte[]> createKafkaMessage(EventMessage<?> eventMessage, String topic) {
        SerializedObject serializedObject = eventMessage.serializePayload(this.serializer, byte[].class);
        return new ProducerRecord(topic, null, null, (Object)this.recordKey(eventMessage), serializedObject.getData(), (Iterable)HeaderUtils.toHeaders(eventMessage, (SerializedObject<byte[]>)serializedObject, this.headerValueMapper));
    }

    private String recordKey(EventMessage<?> eventMessage) {
        Object sequenceIdentifier = this.sequencingPolicy.getSequenceIdentifierFor(eventMessage);
        return sequenceIdentifier != null ? sequenceIdentifier.toString() : null;
    }

    @Override
    public Optional<EventMessage<?>> readKafkaMessage(ConsumerRecord<String, byte[]> consumerRecord) {
        try {
            Headers headers = consumerRecord.headers();
            if (DefaultKafkaMessageConverter.isAxonMessage(headers)) {
                byte[] messageBody = (byte[])consumerRecord.value();
                EventData<?> eventData = this.createEventData(headers, messageBody);
                return this.upcasterChain.upcast(Stream.of(new InitialEventRepresentation(eventData, this.serializer))).findFirst().map(upcastedEventData -> new SerializedMessage(upcastedEventData.getMessageIdentifier(), new LazyDeserializingObject(upcastedEventData.getData(), this.serializer), upcastedEventData.getMetaData())).flatMap(serializedMessage -> DefaultKafkaMessageConverter.buildMessage(headers, serializedMessage));
            }
        }
        catch (Exception e) {
            logger.trace("Error converting ConsumerRecord [{}] to an EventMessage", consumerRecord, (Object)e);
        }
        return Optional.empty();
    }

    private EventData<?> createEventData(Headers headers, byte[] messageBody) {
        return new GenericDomainEventEntry(HeaderUtils.valueAsString(headers, "axon-message-aggregate-type"), HeaderUtils.valueAsString(headers, "axon-message-aggregate-id"), HeaderUtils.valueAsLong(headers, "axon-message-aggregate-seq", 0L).longValue(), HeaderUtils.valueAsString(headers, "axon-message-id"), (Object)HeaderUtils.valueAsLong(headers, "axon-message-timestamp"), HeaderUtils.valueAsString(headers, "axon-message-type"), HeaderUtils.valueAsString(headers, "axon-message-revision", null), (Object)messageBody, (Object)this.extractMetadataAsBytes(headers));
    }

    private byte[] extractMetadataAsBytes(Headers headers) {
        return (byte[])this.serializer.serialize((Object)MetaData.from(HeaderUtils.extractAxonMetadata(headers)), byte[].class).getData();
    }

    private static boolean isAxonMessage(Headers headers) {
        return HeaderUtils.keys(headers).containsAll(Arrays.asList("axon-message-id", "axon-message-type"));
    }

    private static boolean isDomainEvent(Headers headers) {
        return headers.lastHeader("axon-message-aggregate-type") != null && headers.lastHeader("axon-message-aggregate-id") != null && headers.lastHeader("axon-message-aggregate-seq") != null;
    }

    private static Optional<EventMessage<?>> buildMessage(Headers headers, SerializedMessage<?> message) {
        long timestamp = HeaderUtils.valueAsLong(headers, "axon-message-timestamp");
        return DefaultKafkaMessageConverter.isDomainEvent(headers) ? DefaultKafkaMessageConverter.buildDomainEventMessage(headers, message, timestamp) : DefaultKafkaMessageConverter.buildEventMessage(message, timestamp);
    }

    private static Optional<EventMessage<?>> buildDomainEventMessage(Headers headers, SerializedMessage<?> message, long timestamp) {
        return Optional.of(new GenericDomainEventMessage(HeaderUtils.valueAsString(headers, "axon-message-aggregate-type"), HeaderUtils.valueAsString(headers, "axon-message-aggregate-id"), HeaderUtils.valueAsLong(headers, "axon-message-aggregate-seq").longValue(), message, () -> Instant.ofEpochMilli(timestamp)));
    }

    private static Optional<EventMessage<?>> buildEventMessage(SerializedMessage<?> message, long timestamp) {
        return Optional.of(new GenericEventMessage(message, () -> Instant.ofEpochMilli(timestamp)));
    }

    public static class Builder {
        private Serializer serializer;
        private SequencingPolicy<? super EventMessage<?>> sequencingPolicy = SequentialPerAggregatePolicy.instance();
        private BiFunction<String, Object, RecordHeader> headerValueMapper = HeaderUtils.byteMapper();
        private EventUpcasterChain upcasterChain = new EventUpcasterChain(new EventUpcaster[0]);

        public Builder serializer(Serializer serializer) {
            BuilderUtils.assertNonNull((Object)serializer, (String)"Serializer may not be null");
            this.serializer = serializer;
            return this;
        }

        public Builder sequencingPolicy(SequencingPolicy<? super EventMessage<?>> sequencingPolicy) {
            BuilderUtils.assertNonNull(sequencingPolicy, (String)"SequencingPolicy may not be null");
            this.sequencingPolicy = sequencingPolicy;
            return this;
        }

        public Builder headerValueMapper(BiFunction<String, Object, RecordHeader> headerValueMapper) {
            BuilderUtils.assertNonNull(headerValueMapper, (String)"{} may not be null");
            this.headerValueMapper = headerValueMapper;
            return this;
        }

        public Builder upcasterChain(EventUpcasterChain upcasterChain) {
            BuilderUtils.assertNonNull((Object)upcasterChain, (String)"UpcasterChain must not be null");
            this.upcasterChain = upcasterChain;
            return this;
        }

        public DefaultKafkaMessageConverter build() {
            return new DefaultKafkaMessageConverter(this);
        }

        protected void validate() throws AxonConfigurationException {
            BuilderUtils.assertNonNull((Object)this.serializer, (String)"The Serializer is a hard requirement and should be provided");
        }
    }
}

