/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.kafka.eventhandling.consumer;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.axonframework.common.Assert;
import org.axonframework.common.AxonThreadFactory;
import org.axonframework.common.stream.BlockingStream;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.kafka.eventhandling.DefaultKafkaMessageConverter;
import org.axonframework.kafka.eventhandling.KafkaMessageConverter;
import org.axonframework.kafka.eventhandling.consumer.Buffer;
import org.axonframework.kafka.eventhandling.consumer.ConsumerFactory;
import org.axonframework.kafka.eventhandling.consumer.ConsumerUtil;
import org.axonframework.kafka.eventhandling.consumer.DefaultConsumerFactory;
import org.axonframework.kafka.eventhandling.consumer.FetchEventsTask;
import org.axonframework.kafka.eventhandling.consumer.Fetcher;
import org.axonframework.kafka.eventhandling.consumer.KafkaEventMessage;
import org.axonframework.kafka.eventhandling.consumer.KafkaMessageStream;
import org.axonframework.kafka.eventhandling.consumer.KafkaTrackingToken;
import org.axonframework.kafka.eventhandling.consumer.SortedKafkaMessageBuffer;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.xml.XStreamSerializer;

/**
 * {@link Fetcher} implementation that reads Kafka records on a background thread.
 * <p>
 * Every call to {@link #start(KafkaTrackingToken)} creates a new {@link Consumer}, positions it using the
 * given token, and submits a {@link FetchEventsTask} to the configured {@link ExecutorService}. The returned
 * stream reads from the {@link Buffer} that the task was constructed with.
 * <p>
 * Instances are created through {@link #builder(Map)} or {@link #builder(ConsumerFactory)}.
 *
 * @param <K> the key type of the consumed Kafka records
 * @param <V> the value type of the consumed Kafka records
 */
public class AsyncFetcher<K, V> implements Fetcher {

    private final Supplier<Buffer<KafkaEventMessage>> bufferFactory;
    private final ExecutorService pool;
    private final KafkaMessageConverter<K, V> converter;
    private final ConsumerFactory<K, V> consumerFactory;
    private final String topic;
    private final BiFunction<ConsumerRecord<K, V>, KafkaTrackingToken, Void> callback;
    private final long pollTimeout;
    /** {@code true} when the pool is the builder's default one and must be shut down by this fetcher. */
    private final boolean requirePoolShutdown;
    /** Tasks that have been started and have not yet invoked their close handler. */
    private final Set<FetchEventsTask> activeFetchers = ConcurrentHashMap.newKeySet();

    private AsyncFetcher(Builder<K, V> builder) {
        this.bufferFactory = builder.bufferFactory;
        this.consumerFactory = builder.consumerFactory;
        this.converter = builder.converter;
        this.topic = builder.topic;
        this.requirePoolShutdown = builder.requirePoolShutdown;
        this.pool = builder.pool;
        this.callback = builder.callback;
        this.pollTimeout = builder.pollTimeout;
    }

    /**
     * Creates a builder whose consumers are produced by a {@link DefaultConsumerFactory} built from the given
     * configuration.
     *
     * @param consumerConfig the configuration handed to the {@link DefaultConsumerFactory}
     * @param <K>            the key type of the consumed Kafka records
     * @param <V>            the value type of the consumed Kafka records
     * @return a new builder
     */
    public static <K, V> Builder<K, V> builder(Map<String, Object> consumerConfig) {
        ConsumerFactory<K, V> factory = new DefaultConsumerFactory<>(consumerConfig);
        return builder(factory);
    }

    /**
     * Creates a builder that obtains its consumers from the given factory.
     *
     * @param consumerFactory the factory used to create a {@link Consumer} per started stream; may not be
     *                        {@code null}
     * @param <K>             the key type of the consumed Kafka records
     * @param <V>             the value type of the consumed Kafka records
     * @return a new builder
     */
    public static <K, V> Builder<K, V> builder(ConsumerFactory<K, V> consumerFactory) {
        return new Builder<>(consumerFactory);
    }

    /**
     * Opens a new stream of events starting at the position described by {@code token}.
     * <p>
     * The consumer is positioned with the token exactly as given; when the token is considered empty by
     * {@link KafkaTrackingToken#isEmpty(KafkaTrackingToken)}, the fetch task itself is started with
     * {@link KafkaTrackingToken#emptyToken()} instead.
     * <p>
     * If anything fails before the fetch task has been accepted by the pool, the freshly created consumer is
     * closed and the task is deregistered before the failure is rethrown, so a failed start leaves nothing
     * behind.
     *
     * @param token the token describing where to start reading
     * @return a stream backed by the buffer that the background task fills; closing it closes the task
     */
    @Override
    public BlockingStream<TrackedEventMessage<?>> start(KafkaTrackingToken token) {
        Consumer<K, V> consumer = consumerFactory.createConsumer();
        FetchEventsTask<K, V> fetcherTask;
        BlockingStream<TrackedEventMessage<?>> stream;
        try {
            ConsumerUtil.seek(topic, consumer, token);
            KafkaTrackingToken startToken = KafkaTrackingToken.isEmpty(token)
                    ? KafkaTrackingToken.emptyToken()
                    : token;
            Buffer<KafkaEventMessage> buffer = bufferFactory.get();
            fetcherTask = new FetchEventsTask<>(
                    consumer, startToken, buffer, converter, callback, pollTimeout, activeFetchers::remove
            );
            // Built before the task is submitted so that a failure here cannot leave a running task behind.
            stream = new KafkaMessageStream(buffer, fetcherTask::close);
        } catch (RuntimeException | Error e) {
            closeAfterFailedStart(consumer, e);
            throw e;
        }

        activeFetchers.add(fetcherTask);
        try {
            pool.execute(fetcherTask);
        } catch (RuntimeException | Error e) {
            // Typically a RejectedExecutionException after shutdown(): the task will never run, so it can
            // neither deregister itself nor release the consumer.
            activeFetchers.remove(fetcherTask);
            closeAfterFailedStart(consumer, e);
            throw e;
        }
        // NOTE(review): from here on the consumer is presumably owned (and eventually closed) by the
        // FetchEventsTask - confirm against FetchEventsTask#run.
        return stream;
    }

    /**
     * Closes every fetch task that is still registered and, when the pool was created by the builder rather
     * than supplied through {@link Builder#withPool(ExecutorService)}, shuts that pool down.
     */
    @Override
    public void shutdown() {
        activeFetchers.forEach(FetchEventsTask::close);
        if (requirePoolShutdown) {
            pool.shutdown();
        }
    }

    /**
     * Closes {@code consumer} after {@code failure} aborted a start, attaching any exception thrown by the
     * close to {@code failure} so the original cause is the one that propagates.
     */
    private static void closeAfterFailedStart(Consumer<?, ?> consumer, Throwable failure) {
        try {
            consumer.close();
        } catch (RuntimeException closeFailure) {
            if (closeFailure != failure) {
                failure.addSuppressed(closeFailure);
            }
        }
    }

    /**
     * Builder for {@link AsyncFetcher} instances.
     * <p>
     * Defaults: a {@link SortedKafkaMessageBuffer} per stream, a {@link DefaultKafkaMessageConverter} using
     * an {@link XStreamSerializer}, topic {@code "Axon.Events"}, a poll timeout of 5000 milliseconds, a cached
     * thread pool with threads from an {@link AxonThreadFactory} named {@code "AsyncFetcher-pool-thread"}, and
     * a callback that does nothing.
     *
     * @param <K> the key type of the consumed Kafka records
     * @param <V> the value type of the consumed Kafka records
     */
    public static final class Builder<K, V> {

        private final ConsumerFactory<K, V> consumerFactory;
        private Supplier<Buffer<KafkaEventMessage>> bufferFactory = SortedKafkaMessageBuffer::new;
        // Unchecked: the default converter is only type-correct when K and V match the types it handles.
        // NOTE(review): callers using other key/value types should supply their own converter - confirm
        // against DefaultKafkaMessageConverter.
        @SuppressWarnings("unchecked")
        private KafkaMessageConverter<K, V> converter =
                (KafkaMessageConverter<K, V>) new DefaultKafkaMessageConverter(new XStreamSerializer());
        private String topic = "Axon.Events";
        private long pollTimeout = 5000L;
        private ExecutorService pool =
                Executors.newCachedThreadPool(new AxonThreadFactory("AsyncFetcher-pool-thread"));
        private BiFunction<ConsumerRecord<K, V>, KafkaTrackingToken, Void> callback = (r, t) -> null;
        private boolean requirePoolShutdown = true;

        private Builder(ConsumerFactory<K, V> consumerFactory) {
            Assert.notNull(consumerFactory, () -> "ConsumerFactory may not be null");
            this.consumerFactory = consumerFactory;
        }

        /**
         * Uses the given executor to run fetch tasks. A pool supplied this way is not shut down by
         * {@link AsyncFetcher#shutdown()}; its lifecycle stays with the caller.
         *
         * @param service the executor to run fetch tasks on; may not be {@code null}
         * @return this builder, for chaining
         */
        public Builder<K, V> withPool(ExecutorService service) {
            Assert.notNull(service, () -> "Pool may not be null");
            this.requirePoolShutdown = false;
            this.pool = service;
            return this;
        }

        /**
         * Sets the callback that is passed to each fetch task together with the record and tracking token
         * types shown in its signature.
         *
         * @param callback the callback to hand to fetch tasks; may not be {@code null}
         * @return this builder, for chaining
         */
        public Builder<K, V> onRecordPublished(BiFunction<ConsumerRecord<K, V>, KafkaTrackingToken, Void> callback) {
            Assert.notNull(callback, () -> "Callback may not be null");
            this.callback = callback;
            return this;
        }

        /**
         * Sets the poll timeout handed to each fetch task. The value is stored in milliseconds.
         *
         * @param timeout the timeout duration
         * @param unit    the unit of {@code timeout}
         * @return this builder, for chaining
         */
        public Builder<K, V> withPollTimeout(long timeout, TimeUnit unit) {
            this.pollTimeout = unit.toMillis(timeout);
            return this;
        }

        /**
         * Sets the converter handed to each fetch task.
         *
         * @param converter the converter to use; may not be {@code null}
         * @return this builder, for chaining
         */
        public Builder<K, V> withMessageConverter(KafkaMessageConverter<K, V> converter) {
            Assert.notNull(converter, () -> "Converter may not be null");
            this.converter = converter;
            return this;
        }

        /**
         * Sets the topic used when positioning a new consumer.
         *
         * @param topic the topic name; may not be {@code null}
         * @return this builder, for chaining
         */
        public Builder<K, V> withTopic(String topic) {
            Assert.notNull(topic, () -> "Topic may not be null");
            this.topic = topic;
            return this;
        }

        /**
         * Sets the factory that supplies a buffer for every started stream.
         *
         * @param bufferFactory the supplier of buffers; may not be {@code null}
         * @return this builder, for chaining
         */
        public Builder<K, V> withBufferFactory(Supplier<Buffer<KafkaEventMessage>> bufferFactory) {
            Assert.notNull(bufferFactory, () -> "Buffer factory may not be null");
            this.bufferFactory = bufferFactory;
            return this;
        }

        /**
         * Creates the fetcher from the current builder state.
         *
         * @return a new {@link AsyncFetcher}
         */
        public Fetcher build() {
            return new AsyncFetcher<>(this);
        }
    }
}

