/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.extensions.kafka.eventhandling.consumer.streamable;

import com.thoughtworks.xstream.XStream;
import com.thoughtworks.xstream.io.HierarchicalStreamDriver;
import java.lang.invoke.MethodHandles;
import java.time.Instant;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.Consumer;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.Registration;
import org.axonframework.common.stream.BlockingStream;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.extensions.kafka.eventhandling.DefaultKafkaMessageConverter;
import org.axonframework.extensions.kafka.eventhandling.KafkaMessageConverter;
import org.axonframework.extensions.kafka.eventhandling.consumer.ConsumerFactory;
import org.axonframework.extensions.kafka.eventhandling.consumer.ConsumerSeekUtil;
import org.axonframework.extensions.kafka.eventhandling.consumer.DefaultConsumerFactory;
import org.axonframework.extensions.kafka.eventhandling.consumer.Fetcher;
import org.axonframework.extensions.kafka.eventhandling.consumer.TopicSubscriber;
import org.axonframework.extensions.kafka.eventhandling.consumer.TopicSubscriberBuilder;
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.Buffer;
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.ConsumerPositionsUtil;
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.KafkaEventMessage;
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.KafkaMessageStream;
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.KafkaTrackingToken;
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.SortedKafkaMessageBuffer;
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.TrackingRecordConverter;
import org.axonframework.messaging.StreamableMessageSource;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.xml.CompactDriver;
import org.axonframework.serialization.xml.XStreamSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link StreamableMessageSource} of {@link TrackedEventMessage}s that reads its events from Kafka.
 * <p>
 * Every call to {@link #openStream(TrackingToken)} creates a fresh {@link Consumer} through the configured
 * {@link ConsumerFactory}, positions it using the given token and hands it to the configured {@link Fetcher},
 * which fills a {@link Buffer} that backs the returned stream.
 * <p>
 * Instances are created through the {@link #builder()}.
 *
 * @param <K> the key type of the consumed Kafka records
 * @param <V> the value type of the consumed Kafka records
 */
public class StreamableKafkaMessageSource<K, V> implements StreamableMessageSource<TrackedEventMessage<?>> {

    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    private final TopicSubscriber subscriber;
    private final ConsumerFactory<K, V> consumerFactory;
    private final Fetcher<K, V, KafkaEventMessage> fetcher;
    private final KafkaMessageConverter<K, V> messageConverter;
    private final Supplier<Buffer<KafkaEventMessage>> bufferFactory;

    /**
     * Instantiates a source from the given {@code builder}. The builder is validated first, which may also fill
     * in a default {@link KafkaMessageConverter} (see {@link Builder#validate()}).
     *
     * @param builder the {@link Builder} carrying the configuration of this source
     * @throws AxonConfigurationException if {@link Builder#validate()} rejects the configuration
     */
    protected StreamableKafkaMessageSource(Builder<K, V> builder) {
        builder.validate();
        this.subscriber = builder.getSubscriber();
        this.consumerFactory = builder.consumerFactory;
        this.fetcher = builder.fetcher;
        this.messageConverter = builder.messageConverter;
        this.bufferFactory = builder.bufferFactory;
    }

    /**
     * Creates a new {@link Builder} for a {@link StreamableKafkaMessageSource}.
     * <p>
     * The {@link ConsumerFactory} and the {@link Fetcher} are hard requirements. The buffer factory defaults to
     * {@code SortedKafkaMessageBuffer::new}, and a {@link DefaultKafkaMessageConverter} is created during
     * validation when no converter was configured.
     *
     * @param <K> the key type of the consumed Kafka records
     * @param <V> the value type of the consumed Kafka records
     * @return a new, empty {@link Builder}
     */
    public static <K, V> Builder<K, V> builder() {
        return new Builder<>();
    }

    /**
     * Opens a stream of events, starting from the position described by the given {@code trackingToken}.
     * <p>
     * If positioning the freshly created consumer or creating the buffer fails, the consumer is closed before the
     * failure is propagated; at that point it has not been handed to the {@link Fetcher} yet, so nothing else
     * would close it.
     *
     * @param trackingToken the token describing where to start; converted through
     *                      {@link KafkaTrackingToken#from(TrackingToken)}
     * @return a {@link BlockingStream} backed by a buffer that the {@link Fetcher} fills
     */
    @Override
    public BlockingStream<TrackedEventMessage<?>> openStream(TrackingToken trackingToken) {
        KafkaTrackingToken token = KafkaTrackingToken.from(trackingToken);
        TrackingRecordConverter<K, V> recordConverter = new TrackingRecordConverter<>(messageConverter, token);
        // The '{}' placeholder is required; without it SLF4J silently discards the argument.
        logger.debug("Will start consuming from topics: {}", subscriber.describe());

        Consumer<K, V> consumer = consumerFactory.createConsumer(null);
        Buffer<KafkaEventMessage> buffer;
        try {
            ConsumerSeekUtil.seekToCurrentPositions(consumer, recordConverter::currentToken, subscriber);
            buffer = bufferFactory.get();
        } catch (RuntimeException | Error e) {
            closeAfterFailure(consumer, e);
            throw e;
        }

        // From here on the consumer is handed to the fetcher; the returned stream receives the close handler.
        Registration closeHandler = fetcher.poll(consumer, recordConverter, buffer::putAll, buffer::setException);
        return new KafkaMessageStream(buffer, closeHandler);
    }

    /**
     * Creates a token from the head positions reported by
     * {@link ConsumerPositionsUtil#getHeadPositions(Consumer, TopicSubscriber)}.
     * <p>
     * The consumer created for this lookup is closed once the positions have been retrieved.
     *
     * @return a {@link KafkaTrackingToken} built from the retrieved head positions
     */
    @Override
    public TrackingToken createHeadToken() {
        try (Consumer<K, V> consumer = consumerFactory.createConsumer(null)) {
            return KafkaTrackingToken.newInstance(ConsumerPositionsUtil.getHeadPositions(consumer, subscriber));
        }
    }

    /**
     * Creates a token from the positions that {@link ConsumerPositionsUtil} resolves for the given
     * {@code dateTime}.
     * <p>
     * The consumer created for this lookup is closed once the positions have been retrieved.
     *
     * @param dateTime the point in time for which positions should be resolved
     * @return a {@link KafkaTrackingToken} built from the retrieved positions
     */
    @Override
    public TrackingToken createTokenAt(Instant dateTime) {
        try (Consumer<K, V> consumer = consumerFactory.createConsumer(null)) {
            return KafkaTrackingToken.newInstance(
                    ConsumerPositionsUtil.getPositionsBasedOnTime(consumer, subscriber, dateTime)
            );
        }
    }

    /**
     * Closes the given {@code consumer} while a {@code failure} is already being propagated. A problem during
     * closing is attached to the original failure as a suppressed exception so it does not mask the root cause.
     */
    private static void closeAfterFailure(Consumer<?, ?> consumer, Throwable failure) {
        try {
            consumer.close();
        } catch (RuntimeException closeFailure) {
            failure.addSuppressed(closeFailure);
        }
    }

    /**
     * Builder class to instantiate a {@link StreamableKafkaMessageSource}.
     * <p>
     * The {@link ConsumerFactory} and {@link Fetcher} are hard requirements. The buffer factory defaults to
     * {@code SortedKafkaMessageBuffer::new}. When no {@link KafkaMessageConverter} is set, a
     * {@link DefaultKafkaMessageConverter} is built during {@link #validate()} from the configured
     * {@link Serializer}, or from a default {@link XStreamSerializer} if none was given.
     *
     * @param <K> the key type of the consumed Kafka records
     * @param <V> the value type of the consumed Kafka records
     */
    public static class Builder<K, V> extends TopicSubscriberBuilder<Builder<K, V>> {

        private ConsumerFactory<K, V> consumerFactory;
        private Fetcher<K, V, KafkaEventMessage> fetcher;
        private KafkaMessageConverter<K, V> messageConverter;
        private Supplier<Buffer<KafkaEventMessage>> bufferFactory = SortedKafkaMessageBuffer::new;
        private Supplier<Serializer> serializer;

        /**
         * Sets the {@link Serializer} used to build the default {@link KafkaMessageConverter}. It is only
         * consulted during {@link #validate()} when no message converter has been configured.
         *
         * @param serializer the serializer to use; may not be {@code null}
         * @return the current builder instance, for fluent interfacing
         */
        public Builder<K, V> serializer(Serializer serializer) {
            BuilderUtils.assertNonNull(serializer, "The Serializer may not be null");
            this.serializer = () -> serializer;
            return this;
        }

        @Override
        protected Builder<K, V> self() {
            return this;
        }

        /**
         * Logs a deprecation warning and validates the argument; the value itself is not stored or used.
         *
         * @param groupIdPrefix a non-null, non-empty prefix
         * @return the current builder instance, for fluent interfacing
         * @deprecated this setting no longer has any effect
         */
        @Deprecated
        public Builder<K, V> groupIdPrefix(String groupIdPrefix) {
            logger.warn("Using groupIdPrefix in the StreamableKafkaMessageSource.Builder has been deprecated and already effectively does nothing.");
            BuilderUtils.assertThat(
                    groupIdPrefix,
                    name -> Objects.nonNull(name) && !"".equals(name),
                    "The groupIdPrefix may not be null or empty"
            );
            return this;
        }

        /**
         * Logs a deprecation warning and validates the argument; the value itself is not stored or used.
         *
         * @param groupIdSuffixFactory a non-null supplier
         * @return the current builder instance, for fluent interfacing
         * @deprecated this setting no longer has any effect
         */
        @Deprecated
        public Builder<K, V> groupIdSuffixFactory(Supplier<String> groupIdSuffixFactory) {
            logger.warn("Using groupIdSuffixFactory in the StreamableKafkaMessageSource.Builder has been deprecated and already effectively does nothing.");
            BuilderUtils.assertNonNull(groupIdSuffixFactory, "GroupIdSuffixFactory may not be null");
            return this;
        }

        /**
         * Sets the {@link ConsumerFactory} used to create a {@link Consumer} for each opened stream and for each
         * token lookup.
         *
         * @param consumerFactory the factory to use; may not be {@code null}
         * @return the current builder instance, for fluent interfacing
         */
        public Builder<K, V> consumerFactory(ConsumerFactory<K, V> consumerFactory) {
            BuilderUtils.assertNonNull(consumerFactory, "ConsumerFactory may not be null");
            this.consumerFactory = consumerFactory;
            return this;
        }

        /**
         * Sets a {@link DefaultConsumerFactory} constructed from the given {@code consumerConfiguration}.
         *
         * @param consumerConfiguration the configuration passed to the {@link DefaultConsumerFactory} constructor
         * @return the current builder instance, for fluent interfacing
         */
        public Builder<K, V> consumerFactory(Map<String, Object> consumerConfiguration) {
            this.consumerFactory = new DefaultConsumerFactory<>(consumerConfiguration);
            return this;
        }

        /**
         * Sets the {@link Fetcher} that polls the consumer and feeds the buffer of an opened stream.
         *
         * @param fetcher the fetcher to use; may not be {@code null}
         * @return the current builder instance, for fluent interfacing
         */
        public Builder<K, V> fetcher(Fetcher<K, V, KafkaEventMessage> fetcher) {
            BuilderUtils.assertNonNull(fetcher, "Fetcher may not be null");
            this.fetcher = fetcher;
            return this;
        }

        /**
         * Sets the {@link KafkaMessageConverter} handed to the {@link TrackingRecordConverter} of each opened
         * stream.
         *
         * @param messageConverter the converter to use; may not be {@code null}
         * @return the current builder instance, for fluent interfacing
         */
        public Builder<K, V> messageConverter(KafkaMessageConverter<K, V> messageConverter) {
            BuilderUtils.assertNonNull(messageConverter, "MessageConverter may not be null");
            this.messageConverter = messageConverter;
            return this;
        }

        /**
         * Sets the factory that supplies a new {@link Buffer} for every opened stream.
         *
         * @param bufferFactory the buffer supplier to use; may not be {@code null}
         * @return the current builder instance, for fluent interfacing
         */
        public Builder<K, V> bufferFactory(Supplier<Buffer<KafkaEventMessage>> bufferFactory) {
            BuilderUtils.assertNonNull(bufferFactory, "Buffer factory may not be null");
            this.bufferFactory = bufferFactory;
            return this;
        }

        /**
         * Builds a {@link StreamableKafkaMessageSource} from this builder's configuration.
         *
         * @return a new {@link StreamableKafkaMessageSource}
         */
        public StreamableKafkaMessageSource<K, V> build() {
            return new StreamableKafkaMessageSource<>(this);
        }

        /**
         * Validates the configuration and fills in defaults.
         * <p>
         * When no serializer was set, a warning is logged and a default {@link XStreamSerializer} supplier is
         * installed. When no message converter was set, a {@link DefaultKafkaMessageConverter} is built from the
         * serializer.
         *
         * @throws AxonConfigurationException if the {@link ConsumerFactory} or {@link Fetcher} is missing
         */
        protected void validate() throws AxonConfigurationException {
            BuilderUtils.assertNonNull(consumerFactory, "The ConsumerFactory is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(fetcher, "The Fetcher is a hard requirement and should be provided");
            if (serializer == null) {
                // The exception is only passed to the logger so the warning carries a stack trace; it is not thrown.
                logger.warn(
                        "The default XStreamSerializer is used, whereas it is strongly recommended to configure the security context of the XStream instance.",
                        new AxonConfigurationException("A default XStreamSerializer is used, without specifying the security context")
                );
                serializer = () -> XStreamSerializer.builder()
                                                    .xStream(new XStream(new CompactDriver()))
                                                    .build();
            }
            if (messageConverter == null) {
                messageConverter = DefaultKafkaMessageConverter.builder()
                                                               .serializer(serializer.get())
                                                               .build();
            }
        }
    }
}

