package org.axonframework.extensions.kafka.eventhandling.consumer;

import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.AxonThreadFactory;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.stream.BlockingStream;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.extensions.kafka.eventhandling.DefaultKafkaMessageConverter;
import org.axonframework.extensions.kafka.eventhandling.KafkaMessageConverter;
import org.axonframework.serialization.xml.XStreamSerializer;

/* loaded from: input_file:org/axonframework/extensions/kafka/eventhandling/consumer/AsyncFetcher.class */
public class AsyncFetcher<K, V> implements Fetcher {
    private final ConsumerFactory<K, V> consumerFactory;
    private final Supplier<Buffer<KafkaEventMessage>> bufferFactory;
    private final ExecutorService executorService;
    private final boolean requirePoolShutdown;
    private final KafkaMessageConverter<K, V> messageConverter;
    private final String topic;
    private final BiFunction<ConsumerRecord<K, V>, KafkaTrackingToken, Void> consumerRecordCallback;
    private final long pollTimeout;
    private final Set<FetchEventsTask> activeFetchers = ConcurrentHashMap.newKeySet();

    /* loaded from: input_file:org/axonframework/extensions/kafka/eventhandling/consumer/AsyncFetcher$Builder.class */
    public static final class Builder<K, V> {
        private ConsumerFactory<K, V> consumerFactory;
        private Supplier<Buffer<KafkaEventMessage>> bufferFactory = SortedKafkaMessageBuffer::new;
        private ExecutorService executorService = Executors.newCachedThreadPool(new AxonThreadFactory("AsyncFetcher-pool-thread"));
        private boolean requirePoolShutdown = true;
        private KafkaMessageConverter<K, V> messageConverter = DefaultKafkaMessageConverter.builder().serializer(XStreamSerializer.builder().build()).build();
        private String topic = "Axon.Events";
        private BiFunction<ConsumerRecord<K, V>, KafkaTrackingToken, Void> consumerRecordCallback = (consumerRecord, kafkaTrackingToken) -> {
            return null;
        };
        private long pollTimeout = 5000;

        public Builder<K, V> consumerFactory(ConsumerFactory<K, V> consumerFactory) {
            BuilderUtils.assertNonNull(consumerFactory, "ConsumerFactory may not be null");
            this.consumerFactory = consumerFactory;
            return this;
        }

        public Builder<K, V> consumerFactory(Map<String, Object> map) {
            this.consumerFactory = new DefaultConsumerFactory(map);
            return this;
        }

        public Builder<K, V> executorService(ExecutorService executorService) {
            BuilderUtils.assertNonNull(executorService, "ExecutorService may not be null");
            this.requirePoolShutdown = false;
            this.executorService = executorService;
            return this;
        }

        public Builder<K, V> bufferFactory(Supplier<Buffer<KafkaEventMessage>> supplier) {
            BuilderUtils.assertNonNull(supplier, "Buffer factory may not be null");
            this.bufferFactory = supplier;
            return this;
        }

        public Builder<K, V> messageConverter(KafkaMessageConverter<K, V> kafkaMessageConverter) {
            BuilderUtils.assertNonNull(kafkaMessageConverter, "MessageConverter may not be null");
            this.messageConverter = kafkaMessageConverter;
            return this;
        }

        public Builder<K, V> topic(String str) {
            BuilderUtils.assertThat(str, str2 -> {
                return Objects.nonNull(str2) && !"".equals(str2);
            }, "The topic may not be null or empty");
            this.topic = str;
            return this;
        }

        public Builder<K, V> consumerRecordCallback(BiFunction<ConsumerRecord<K, V>, KafkaTrackingToken, Void> biFunction) {
            BuilderUtils.assertNonNull(biFunction, "The consumerRecordCallback may not be null");
            this.consumerRecordCallback = biFunction;
            return this;
        }

        public Builder<K, V> pollTimeout(long j, TimeUnit timeUnit) {
            this.pollTimeout = timeUnit.toMillis(j);
            return this;
        }

        public AsyncFetcher build() {
            return new AsyncFetcher(this);
        }

        protected void validate() throws AxonConfigurationException {
            BuilderUtils.assertNonNull(this.consumerFactory, "The ConsumerFactory is a hard requirement and should be provided");
        }
    }

    protected AsyncFetcher(Builder<K, V> builder) {
        builder.validate();
        this.consumerFactory = ((Builder) builder).consumerFactory;
        this.bufferFactory = ((Builder) builder).bufferFactory;
        this.executorService = ((Builder) builder).executorService;
        this.requirePoolShutdown = ((Builder) builder).requirePoolShutdown;
        this.messageConverter = ((Builder) builder).messageConverter;
        this.topic = ((Builder) builder).topic;
        this.consumerRecordCallback = ((Builder) builder).consumerRecordCallback;
        this.pollTimeout = ((Builder) builder).pollTimeout;
    }

    public static <K, V> Builder<K, V> builder() {
        return new Builder<>();
    }

    @Override // org.axonframework.extensions.kafka.eventhandling.consumer.Fetcher
    public BlockingStream<TrackedEventMessage<?>> start(KafkaTrackingToken kafkaTrackingToken) {
        Consumer<K, V> createConsumer = this.consumerFactory.createConsumer();
        ConsumerUtil.seek(this.topic, createConsumer, kafkaTrackingToken);
        if (KafkaTrackingToken.isEmpty(kafkaTrackingToken)) {
            kafkaTrackingToken = KafkaTrackingToken.emptyToken();
        }
        Buffer<KafkaEventMessage> buffer = this.bufferFactory.get();
        KafkaMessageConverter<K, V> kafkaMessageConverter = this.messageConverter;
        BiFunction<ConsumerRecord<K, V>, KafkaTrackingToken, Void> biFunction = this.consumerRecordCallback;
        long j = this.pollTimeout;
        Set<FetchEventsTask> set = this.activeFetchers;
        set.getClass();
        FetchEventsTask fetchEventsTask = new FetchEventsTask(createConsumer, kafkaTrackingToken, buffer, kafkaMessageConverter, biFunction, j, (v1) -> {
            r8.remove(v1);
        });
        this.activeFetchers.add(fetchEventsTask);
        this.executorService.execute(fetchEventsTask);
        fetchEventsTask.getClass();
        return new KafkaMessageStream(buffer, fetchEventsTask::close);
    }

    @Override // org.axonframework.extensions.kafka.eventhandling.consumer.Fetcher
    public void shutdown() {
        this.activeFetchers.forEach((v0) -> {
            v0.close();
        });
        if (this.requirePoolShutdown) {
            this.executorService.shutdown();
        }
    }
}
