package com.microsoft.azure.spring.integration.eventhub.support;

import com.microsoft.azure.eventprocessorhost.PartitionContext;
import com.microsoft.azure.spring.cloud.context.core.util.Tuple;
import com.microsoft.azure.spring.integration.core.api.PartitionSupplier;
import com.microsoft.azure.spring.integration.eventhub.api.EventHubClientFactory;
import com.microsoft.azure.spring.integration.eventhub.api.EventHubRxOperation;
import com.microsoft.azure.spring.integration.eventhub.impl.EventHubProcessor;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import org.springframework.messaging.Message;
import rx.Observable;
import rx.subscriptions.Subscriptions;

/* loaded from: input_file:com/microsoft/azure/spring/integration/eventhub/support/RxEventHubTestOperation.class */
/**
 * Rx-flavoured variant of {@link EventHubTestOperation}: exposes sending and
 * subscribing as {@link Observable}s instead of futures/callbacks.
 *
 * <p>One shared observable is cached per (first argument, second argument) pair
 * passed to {@link #subscribe(String, String, Class)}; judging by the field name
 * these are the event hub name and the consumer group.
 */
public class RxEventHubTestOperation extends EventHubTestOperation implements EventHubRxOperation {
    /** Cache of shared observables, keyed by the two strings given to {@code subscribe}. */
    private final ConcurrentHashMap<Tuple<String, String>, Observable<Message<?>>> subjectByNameAndGroup;

    /**
     * Creates the operation; both arguments are forwarded unchanged to the superclass constructor.
     *
     * @param eventHubClientFactory factory handed to the superclass
     * @param supplier              supplier of {@link PartitionContext} handed to the superclass
     */
    public RxEventHubTestOperation(EventHubClientFactory eventHubClientFactory, Supplier<PartitionContext> supplier) {
        super(eventHubClientFactory, supplier);
        this.subjectByNameAndGroup = new ConcurrentHashMap<>();
    }

    /**
     * Adapts a {@link CompletableFuture} to an {@link Observable}.
     *
     * <p>On each subscription a completion callback is attached to the future: a failure is
     * forwarded through {@code onError}; a success emits the result once and then completes.
     *
     * @param completableFuture the future to adapt
     * @param <T>               the result type of the future
     * @return an observable mirroring the outcome of the future
     */
    private static <T> Observable<T> toObservable(CompletableFuture<T> completableFuture) {
        return Observable.create(subscriber -> {
            completableFuture.whenComplete((result, failure) -> {
                if (failure != null) {
                    subscriber.onError(failure);
                } else {
                    subscriber.onNext(result);
                    subscriber.onCompleted();
                }
            });
        });
    }

    /**
     * Sends a message by delegating to {@code sendAsync} and wrapping the returned future
     * in an {@link Observable}.
     *
     * @param str               destination passed to {@code sendAsync} (presumably the event hub name)
     * @param message           the message to send
     * @param partitionSupplier partition information passed to {@code sendAsync}
     * @param <T>               the payload type of the message
     * @return an observable that reports the outcome of the asynchronous send
     */
    public <T> Observable<Void> sendRx(String str, Message<T> message, PartitionSupplier partitionSupplier) {
        return toObservable(sendAsync(str, message, partitionSupplier));
    }

    /**
     * Returns the shared observable for the given pair of strings, creating and caching it
     * on first use.
     *
     * <p>When the underlying observable is subscribed to, an {@link EventHubProcessor} that
     * pushes every received message into the subscriber is passed to {@code register}; when
     * that subscription is released, {@code unregister} is called for the same pair.
     *
     * @param str  first half of the cache key (presumably the event hub name)
     * @param str2 second half of the cache key (presumably the consumer group)
     * @param cls  payload type handed to the {@link EventHubProcessor}
     * @return the cached, shared observable of incoming messages
     */
    public Observable<Message<?>> subscribe(String str, String str2, Class<?> cls) {
        Tuple<String, String> key = Tuple.of(str, str2);
        // computeIfAbsent already yields the cached (or freshly created) value,
        // so no second lookup is needed.
        return this.subjectByNameAndGroup.computeIfAbsent(key, ignoredKey ->
                // Explicit type witness: chaining .share() removes the target type, and
                // inference would otherwise settle on Observable<Object>.
                Observable.<Message<?>>create(subscriber -> {
                    Objects.requireNonNull(subscriber);
                    // Every message handled by the processor is forwarded to the subscriber.
                    register(str, str2, new EventHubProcessor(subscriber::onNext, cls,
                            getCheckpointConfig(), getMessageConverter()));
                    // Undo the registration once this subscription is unsubscribed.
                    subscriber.add(Subscriptions.create(() -> unregister(str, str2)));
                }).share());
    }
}
