package org.axonframework.kafka.eventhandling;

import java.time.Instant;
import java.util.Arrays;
import java.util.Optional;
import java.util.function.BiFunction;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.axonframework.common.Assert;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.eventhandling.async.SequencingPolicy;
import org.axonframework.eventhandling.async.SequentialPerAggregatePolicy;
import org.axonframework.eventsourcing.GenericDomainEventMessage;
import org.axonframework.messaging.MetaData;
import org.axonframework.serialization.LazyDeserializingObject;
import org.axonframework.serialization.SerializedMessage;
import org.axonframework.serialization.SerializedObject;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.SimpleSerializedObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/axonframework/kafka/eventhandling/DefaultKafkaMessageConverter.class */
/**
 * {@link KafkaMessageConverter} that maps Axon {@link EventMessage}s to and from Kafka records with a
 * {@code String} key and a {@code byte[]} value.
 * <p>
 * When writing, the event payload is serialized to {@code byte[]} with the configured {@link Serializer},
 * the record key is derived from the configured {@link SequencingPolicy}, and the record headers are
 * produced by {@code HeaderUtils.toHeaders} using the configured header value mapper.
 * <p>
 * When reading, only records carrying both the {@code axon-message-id} and {@code axon-message-type}
 * headers are converted; any other record, and any record whose conversion fails, yields an empty
 * {@link Optional}.
 */
public class DefaultKafkaMessageConverter implements KafkaMessageConverter<String, byte[]> {
    private static final Logger logger = LoggerFactory.getLogger(DefaultKafkaMessageConverter.class);

    // Names of the Kafka record headers this converter reads.
    private static final String MESSAGE_ID = "axon-message-id";
    private static final String MESSAGE_TYPE = "axon-message-type";
    private static final String MESSAGE_REVISION = "axon-message-revision";
    private static final String MESSAGE_TIMESTAMP = "axon-message-timestamp";
    private static final String AGGREGATE_ID = "axon-message-aggregate-id";
    private static final String AGGREGATE_TYPE = "axon-message-aggregate-type";
    private static final String AGGREGATE_SEQ = "axon-message-aggregate-seq";

    private final Serializer serializer;
    private final SequencingPolicy<? super EventMessage<?>> sequencingPolicy;
    private final BiFunction<String, Object, RecordHeader> headerValueMapper;

    /**
     * Creates a converter that uses {@link SequentialPerAggregatePolicy#instance()} to derive record keys
     * and {@code HeaderUtils.byteMapper()} to map header values.
     *
     * @param serializer the serializer used for event payloads; may not be {@code null}
     */
    public DefaultKafkaMessageConverter(Serializer serializer) {
        this(serializer, SequentialPerAggregatePolicy.instance(), HeaderUtils.byteMapper());
    }

    /**
     * Creates a converter with an explicit sequencing policy and header value mapper.
     *
     * @param serializer        the serializer used for event payloads; may not be {@code null}
     * @param sequencingPolicy  the policy whose sequence identifier becomes the record key; may not be
     *                          {@code null}
     * @param headerValueMapper the function that turns a header name and value into a {@link RecordHeader};
     *                          may not be {@code null}
     */
    public DefaultKafkaMessageConverter(Serializer serializer,
                                        SequencingPolicy<? super EventMessage<?>> sequencingPolicy,
                                        BiFunction<String, Object, RecordHeader> headerValueMapper) {
        Assert.notNull(serializer, () -> "Serializer may not be null");
        Assert.notNull(sequencingPolicy, () -> "SequencingPolicy may not be null");
        Assert.notNull(headerValueMapper, () -> "HeaderValueMapper may not be null");
        this.serializer = serializer;
        this.sequencingPolicy = sequencingPolicy;
        this.headerValueMapper = headerValueMapper;
    }

    /**
     * Builds the Kafka record for the given event. Partition and timestamp are passed as {@code null}
     * to the {@link ProducerRecord} constructor.
     *
     * @param eventMessage the event to publish
     * @param topic        the topic the record is destined for
     * @return the record holding the serialized payload, the sequencing key and the generated headers
     */
    @Override
    public ProducerRecord<String, byte[]> createKafkaMessage(EventMessage<?> eventMessage, String topic) {
        SerializedObject<byte[]> serializedPayload = eventMessage.serializePayload(serializer, byte[].class);
        return new ProducerRecord<>(
                topic,
                (Integer) null,
                (Long) null,
                key(eventMessage),
                serializedPayload.getData(),
                HeaderUtils.toHeaders(eventMessage, serializedPayload, headerValueMapper));
    }

    /**
     * Derives the record key from the sequencing policy.
     *
     * @return the string form of the sequence identifier, or {@code null} when the policy returns none
     */
    private String key(EventMessage<?> eventMessage) {
        Object sequenceIdentifier = sequencingPolicy.getSequenceIdentifierFor(eventMessage);
        return sequenceIdentifier == null ? null : sequenceIdentifier.toString();
    }

    /**
     * Attempts to convert a consumed Kafka record into an {@link EventMessage}.
     *
     * @param consumerRecord the record received from Kafka
     * @return the reconstructed event, or an empty {@link Optional} when the record lacks the required
     *         headers or when conversion throws
     */
    @Override
    public Optional<EventMessage<?>> readKafkaMessage(ConsumerRecord<String, byte[]> consumerRecord) {
        try {
            Headers headers = consumerRecord.headers();
            if (isAxonMessage(headers)) {
                SerializedMessage<?> serializedMessage =
                        extractSerializedMessage(headers, consumerRecord.value());
                return buildMessage(headers, serializedMessage);
            }
        } catch (Exception e) {
            // Best-effort conversion: a record that cannot be converted is reported at trace level
            // and skipped rather than propagated to the caller.
            logger.trace("Error converting {} to axon", consumerRecord, e);
        }
        return Optional.empty();
    }

    /**
     * Wraps the serialized message as a domain event when the aggregate identifier header is present,
     * and as a plain event otherwise.
     */
    private Optional<EventMessage<?>> buildMessage(Headers headers, SerializedMessage<?> serializedMessage) {
        // NOTE(review): presumably valueAsLong yields null for an absent header, in which case the
        // unboxing below throws and readKafkaMessage's catch turns it into an empty result - confirm.
        long timestamp = HeaderUtils.valueAsLong(headers, MESSAGE_TIMESTAMP).longValue();
        if (headers.lastHeader(AGGREGATE_ID) != null) {
            return domainEvent(headers, serializedMessage, timestamp);
        }
        return event(serializedMessage, timestamp);
    }

    /**
     * Assembles a {@link SerializedMessage} from the record headers and raw value. Payload and metadata
     * are wrapped in {@link LazyDeserializingObject}s; the revision header is read with a {@code null}
     * fallback argument.
     */
    private SerializedMessage<?> extractSerializedMessage(Headers headers, byte[] messageBody) {
        SimpleSerializedObject<byte[]> serializedPayload = new SimpleSerializedObject<>(
                messageBody,
                byte[].class,
                HeaderUtils.valueAsString(headers, MESSAGE_TYPE),
                HeaderUtils.valueAsString(headers, MESSAGE_REVISION, null));
        return new SerializedMessage<>(
                HeaderUtils.valueAsString(headers, MESSAGE_ID),
                new LazyDeserializingObject<>(serializedPayload, serializer),
                new LazyDeserializingObject<>(MetaData.from(HeaderUtils.extractAxonMetadata(headers))));
    }

    /**
     * Tells whether the headers contain both the message identifier and the message type keys.
     */
    private boolean isAxonMessage(Headers headers) {
        return HeaderUtils.keys(headers).containsAll(Arrays.asList(MESSAGE_ID, MESSAGE_TYPE));
    }

    /**
     * Builds a {@link GenericDomainEventMessage} from the aggregate type, identifier and sequence headers.
     */
    private Optional<EventMessage<?>> domainEvent(Headers headers,
                                                  SerializedMessage<?> serializedMessage,
                                                  long timestamp) {
        return Optional.of(new GenericDomainEventMessage<>(
                HeaderUtils.valueAsString(headers, AGGREGATE_TYPE),
                HeaderUtils.valueAsString(headers, AGGREGATE_ID),
                HeaderUtils.valueAsLong(headers, AGGREGATE_SEQ).longValue(),
                serializedMessage,
                () -> Instant.ofEpochMilli(timestamp)));
    }

    /**
     * Builds a {@link GenericEventMessage} whose timestamp is the given epoch-millisecond value.
     */
    private Optional<EventMessage<?>> event(SerializedMessage<?> serializedMessage, long timestamp) {
        return Optional.of(new GenericEventMessage<>(serializedMessage, () -> Instant.ofEpochMilli(timestamp)));
    }
}
