package org.axonframework.axonserver.connector.query.subscription;

import io.axoniq.axonserver.grpc.query.QueryResponse;
import io.axoniq.axonserver.grpc.query.QueryUpdate;
import io.axoniq.axonserver.grpc.query.QueryUpdateCompleteExceptionally;
import io.axoniq.axonserver.grpc.query.SubscriptionQuery;
import io.axoniq.axonserver.grpc.query.SubscriptionQueryRequest;
import io.axoniq.axonserver.grpc.query.SubscriptionQueryResponse;
import io.grpc.stub.StreamObserver;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;
import org.axonframework.axonserver.connector.AxonServerConfiguration;
import org.axonframework.axonserver.connector.ErrorCode;
import org.axonframework.axonserver.connector.Publisher;
import org.axonframework.axonserver.connector.util.FlowControllingStreamObserver;
import org.axonframework.queryhandling.DefaultSubscriptionQueryResult;
import org.axonframework.queryhandling.SubscriptionQueryBackpressure;
import org.axonframework.queryhandling.SubscriptionQueryResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;

/* loaded from: input_file:org/axonframework/axonserver/connector/query/subscription/AxonServerSubscriptionQueryResult.class */
public class AxonServerSubscriptionQueryResult implements Supplier<SubscriptionQueryResult<QueryResponse, QueryUpdate>>, StreamObserver<SubscriptionQueryResponse> {
    private final Logger logger = LoggerFactory.getLogger(AxonServerSubscriptionQueryResult.class);
    private final SubscriptionQuery subscriptionQuery;
    private final FlowControllingStreamObserver<SubscriptionQueryRequest> requestObserver;
    private final SubscriptionQueryResult<QueryResponse, QueryUpdate> result;
    private final FluxSink<QueryUpdate> updateMessageFluxSink;
    private final Runnable onDispose;
    private MonoSink<QueryResponse> initialResultSink;

    public AxonServerSubscriptionQueryResult(SubscriptionQuery subscriptionQuery, Function<StreamObserver<SubscriptionQueryResponse>, StreamObserver<SubscriptionQueryRequest>> function, AxonServerConfiguration axonServerConfiguration, SubscriptionQueryBackpressure subscriptionQueryBackpressure, int i, Runnable runnable) {
        this.subscriptionQuery = subscriptionQuery;
        this.requestObserver = new FlowControllingStreamObserver<>(function.apply(this), axonServerConfiguration, flowControl -> {
            return SubscriptionQueryRequest.newBuilder().setFlowControl(SubscriptionQuery.newBuilder(this.subscriptionQuery).setNumberOfPermits(flowControl.getPermits())).m2438build();
        }, subscriptionQueryRequest -> {
            return false;
        });
        this.requestObserver.sendInitialPermits();
        this.requestObserver.onNext(SubscriptionQueryRequest.newBuilder().setSubscribe(this.subscriptionQuery).m2438build());
        EmitterProcessor create = EmitterProcessor.create(i);
        this.updateMessageFluxSink = create.sink(subscriptionQueryBackpressure.getOverflowStrategy());
        FluxSink<QueryUpdate> fluxSink = this.updateMessageFluxSink;
        FlowControllingStreamObserver<SubscriptionQueryRequest> flowControllingStreamObserver = this.requestObserver;
        flowControllingStreamObserver.getClass();
        fluxSink.onDispose(flowControllingStreamObserver::onCompleted);
        this.result = new DefaultSubscriptionQueryResult(Mono.create(monoSink -> {
            FlowControllingStreamObserver<SubscriptionQueryRequest> flowControllingStreamObserver2 = this.requestObserver;
            flowControllingStreamObserver2.getClass();
            initialResult(monoSink, (v1) -> {
                r2.onNext(v1);
            });
        }), create.replay().autoConnect(), () -> {
            this.updateMessageFluxSink.complete();
            return true;
        });
        this.onDispose = runnable;
    }

    private void initialResult(MonoSink<QueryResponse> monoSink, Publisher<SubscriptionQueryRequest> publisher) {
        this.initialResultSink = monoSink;
        publisher.publish(SubscriptionQueryRequest.newBuilder().setGetInitialResult(this.subscriptionQuery).m2438build());
    }

    public void onNext(SubscriptionQueryResponse subscriptionQueryResponse) {
        this.requestObserver.markConsumed(1);
        switch (subscriptionQueryResponse.getResponseCase()) {
            case INITIAL_RESULT:
                Optional.ofNullable(this.initialResultSink).ifPresent(monoSink -> {
                    monoSink.success(subscriptionQueryResponse.getInitialResult());
                });
                return;
            case UPDATE:
                this.updateMessageFluxSink.next(subscriptionQueryResponse.getUpdate());
                return;
            case COMPLETE:
                this.requestObserver.onCompleted();
                complete();
                return;
            case COMPLETE_EXCEPTIONALLY:
                this.requestObserver.onCompleted();
                QueryUpdateCompleteExceptionally completeExceptionally = subscriptionQueryResponse.getCompleteExceptionally();
                completeExceptionally(ErrorCode.getFromCode(completeExceptionally.getErrorCode()).convert(completeExceptionally.getErrorMessage()));
                return;
            default:
                return;
        }
    }

    public void onError(Throwable th) {
        completeExceptionally(th);
    }

    private void completeExceptionally(Throwable th) {
        this.onDispose.run();
        updateError(th);
        initialResultError(th);
    }

    private void updateError(Throwable th) {
        try {
            this.updateMessageFluxSink.error(th);
        } catch (Exception e) {
            this.updateMessageFluxSink.complete();
            this.logger.warn("Problem signaling updates error.", e);
        }
    }

    public void onCompleted() {
        complete();
    }

    private void complete() {
        this.onDispose.run();
        this.updateMessageFluxSink.complete();
        initialResultError(new IllegalStateException("Subscription Completed"));
    }

    private void initialResultError(Throwable th) {
        try {
            Optional.ofNullable(this.initialResultSink).ifPresent(monoSink -> {
                monoSink.error(th);
            });
        } catch (Exception e) {
            this.logger.warn("Problem signaling initial result error.", e);
        }
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // java.util.function.Supplier
    public SubscriptionQueryResult<QueryResponse, QueryUpdate> get() {
        return this.result;
    }
}
