package org.axonframework.commandhandling.distributed;

import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.CommandCallback;
import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.commandhandling.GenericCommandResultMessage;
import org.axonframework.commandhandling.MonitorAwareCallback;
import org.axonframework.commandhandling.NoHandlerForCommandException;
import org.axonframework.commandhandling.callbacks.LoggingCallback;
import org.axonframework.commandhandling.distributed.commandfilter.CommandNameFilter;
import org.axonframework.commandhandling.distributed.commandfilter.DenyAll;
import org.axonframework.commandhandling.distributed.commandfilter.DenyCommandNameFilter;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.Registration;
import org.axonframework.messaging.MessageDispatchInterceptor;
import org.axonframework.messaging.MessageHandler;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.monitoring.MessageMonitor;
import org.axonframework.monitoring.NoOpMessageMonitor;

/* loaded from: input_file:org/axonframework/commandhandling/distributed/DistributedCommandBus.class */
/**
 * {@link CommandBus} implementation that asks a {@link CommandRouter} for the {@link Member} that
 * should receive a command and hands the command to a {@link CommandBusConnector} for sending.
 *
 * <p>Handlers subscribed through this bus are registered with the connector. The set of command
 * names this bus accepts is tracked in a {@link CommandMessageFilter} that is reported to the
 * router, together with the current load factor, whenever either of them changes.
 *
 * <p>Instances are created through {@link #builder()}.
 */
public class DistributedCommandBus implements CommandBus {

    /** Load factor a new bus reports until {@link #updateLoadFactor(int)} is called. */
    public static final int INITIAL_LOAD_FACTOR = 100;

    private static final String DISPATCH_ERROR_MESSAGE = "An error occurred while trying to dispatch a command on the DistributedCommandBus";

    private final CommandRouter commandRouter;
    private final CommandBusConnector connector;
    private final MessageMonitor<? super CommandMessage<?>> messageMonitor;
    /** Callback used by {@link #dispatch(CommandMessage)}; {@code null} means "use {@link LoggingCallback}". */
    private final CommandCallback<Object, Object> defaultCommandCallback;
    private final List<MessageDispatchInterceptor<? super CommandMessage<?>>> dispatchInterceptors = new CopyOnWriteArrayList<>();
    /** Starts out as {@link DenyAll}; widened on subscribe and narrowed again when a subscription is cancelled. */
    private final AtomicReference<CommandMessageFilter> commandFilter = new AtomicReference<>(DenyAll.INSTANCE);
    private volatile int loadFactor = INITIAL_LOAD_FACTOR;

    /**
     * Builder for a {@link DistributedCommandBus}. The {@link CommandRouter} and
     * {@link CommandBusConnector} are mandatory; the {@link MessageMonitor} defaults to
     * {@link NoOpMessageMonitor#INSTANCE} and the default command callback defaults to {@code null}.
     */
    public static class Builder {
        private CommandRouter commandRouter;
        private CommandBusConnector connector;
        private CommandCallback<Object, Object> defaultCommandCallback = null;
        private MessageMonitor<? super CommandMessage<?>> messageMonitor = NoOpMessageMonitor.INSTANCE;

        /**
         * Sets the router used to find the destination of a command.
         *
         * @param commandRouter the router; checked with {@code BuilderUtils.assertNonNull}
         * @return this builder, for chaining
         */
        public Builder commandRouter(CommandRouter commandRouter) {
            BuilderUtils.assertNonNull(commandRouter, "CommandRouter may not be null");
            this.commandRouter = commandRouter;
            return this;
        }

        /**
         * Sets the connector used to send commands and to register local handlers.
         *
         * @param commandBusConnector the connector; checked with {@code BuilderUtils.assertNonNull}
         * @return this builder, for chaining
         */
        public Builder connector(CommandBusConnector commandBusConnector) {
            BuilderUtils.assertNonNull(commandBusConnector, "CommandBusConnector may not be null");
            this.connector = commandBusConnector;
            return this;
        }

        /**
         * Sets the monitor notified of commands dispatched with a callback.
         *
         * @param messageMonitor the monitor; checked with {@code BuilderUtils.assertNonNull}
         * @return this builder, for chaining
         */
        public Builder messageMonitor(MessageMonitor<? super CommandMessage<?>> messageMonitor) {
            BuilderUtils.assertNonNull(messageMonitor, "MessageMonitor may not be null");
            this.messageMonitor = messageMonitor;
            return this;
        }

        /**
         * Sets the callback used when a command is dispatched without an explicit callback.
         *
         * @param commandCallback the callback; {@code null} is accepted and makes
         *                        {@link DistributedCommandBus#dispatch(CommandMessage)} fall back to
         *                        {@link LoggingCallback#INSTANCE}
         * @return this builder, for chaining
         */
        public Builder defaultCommandCallback(CommandCallback<Object, Object> commandCallback) {
            this.defaultCommandCallback = commandCallback;
            return this;
        }

        /**
         * Creates the bus from the current builder state.
         *
         * @return a new {@link DistributedCommandBus}
         */
        public DistributedCommandBus build() {
            return new DistributedCommandBus(this);
        }

        /** Verifies that the mandatory router and connector have been provided. */
        protected void validate() {
            BuilderUtils.assertNonNull(this.commandRouter, "The CommandRouter is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(this.connector, "The CommandBusConnector is a hard requirement and should be provided");
        }
    }

    /**
     * Creates a fresh {@link Builder}.
     *
     * @return a new builder instance
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Creates the bus from a validated builder.
     *
     * @param builder the builder holding the configured collaborators
     */
    protected DistributedCommandBus(Builder builder) {
        builder.validate();
        this.commandRouter = builder.commandRouter;
        this.connector = builder.connector;
        this.messageMonitor = builder.messageMonitor;
        this.defaultCommandCallback = builder.defaultCommandCallback;
    }

    /**
     * Dispatches a command without an explicit callback.
     *
     * <p>If a default callback was configured, or a monitor other than
     * {@link NoOpMessageMonitor#INSTANCE} is in use, this delegates to
     * {@link #dispatch(CommandMessage, CommandCallback)}. Otherwise the command is sent through the
     * connector's callback-less {@code send}, and only routing or sending failures are reported
     * to {@link LoggingCallback#INSTANCE}.
     *
     * @param commandMessage the command to dispatch
     * @param <C>            the payload type of the command
     */
    @Override // org.axonframework.commandhandling.CommandBus
    public <C> void dispatch(CommandMessage<C> commandMessage) {
        if (this.defaultCommandCallback != null) {
            dispatch(commandMessage, this.defaultCommandCallback);
            return;
        }
        LoggingCallback loggingCallback = LoggingCallback.INSTANCE;
        if (!NoOpMessageMonitor.INSTANCE.equals(this.messageMonitor)) {
            // A real monitor needs the ingest/report cycle of the callback-based path.
            dispatch(commandMessage, loggingCallback);
            return;
        }
        CommandMessage<? extends C> intercepted = intercept(commandMessage);
        Optional<Member> destination = this.commandRouter.findDestination(intercepted);
        if (!destination.isPresent()) {
            loggingCallback.onResult(intercepted, GenericCommandResultMessage.asCommandResultMessage(noHandlerFor(intercepted)));
            return;
        }
        Member member = destination.get();
        try {
            this.connector.send(member, intercepted);
        } catch (Exception e) {
            // Mark the member before reporting, mirroring the callback-based dispatch.
            member.suspect();
            loggingCallback.onResult(intercepted, GenericCommandResultMessage.asCommandResultMessage(dispatchFailure(e)));
        }
    }

    /**
     * Dispatches a command and reports the outcome to the given callback.
     *
     * <p>The command first passes through the registered dispatch interceptors and is announced to
     * the message monitor. If the router knows no destination, a
     * {@link NoHandlerForCommandException} is reported to both monitor and callback. If sending
     * throws, the failure is reported to the monitor, the destination is marked as suspect and a
     * {@link CommandDispatchException} wrapping the cause is passed to the callback.
     *
     * @param commandMessage  the command to dispatch
     * @param commandCallback the callback receiving the result
     * @param <C>             the payload type of the command
     * @param <R>             the expected result type
     */
    @Override // org.axonframework.commandhandling.CommandBus
    public <C, R> void dispatch(CommandMessage<C> commandMessage, CommandCallback<? super C, ? super R> commandCallback) {
        CommandMessage<? extends C> intercepted = intercept(commandMessage);
        MessageMonitor.MonitorCallback monitorCallback = this.messageMonitor.onMessageIngested(intercepted);
        Optional<Member> destination = this.commandRouter.findDestination(intercepted);
        if (!destination.isPresent()) {
            NoHandlerForCommandException noHandler = noHandlerFor(intercepted);
            monitorCallback.reportFailure(noHandler);
            commandCallback.onResult(intercepted, GenericCommandResultMessage.asCommandResultMessage(noHandler));
            return;
        }
        Member member = destination.get();
        try {
            this.connector.send(member, intercepted, new MonitorAwareCallback<>(commandCallback, monitorCallback));
        } catch (Exception e) {
            monitorCallback.reportFailure(e);
            member.suspect();
            commandCallback.onResult(intercepted, GenericCommandResultMessage.asCommandResultMessage(dispatchFailure(e)));
        }
    }

    /**
     * Runs the command through every registered dispatch interceptor, in registration order,
     * feeding the output of one interceptor into the next.
     *
     * @param commandMessage the command as handed to {@code dispatch}
     * @param <C>            the payload type of the command
     * @return the command as returned by the last interceptor, or the input when none are registered
     */
    @SuppressWarnings("unchecked") // interceptors are declared on CommandMessage<?>; the result is cast back to the caller's payload type
    private <C> CommandMessage<? extends C> intercept(CommandMessage<C> commandMessage) {
        CommandMessage<? extends C> intercepted = commandMessage;
        for (MessageDispatchInterceptor<? super CommandMessage<?>> interceptor : this.dispatchInterceptors) {
            // Pass the message itself to the interceptor; only its result needs a cast.
            intercepted = (CommandMessage<? extends C>) interceptor.handle(intercepted);
        }
        return intercepted;
    }

    /** Builds the exception reported when the router has no destination for the command. */
    private static NoHandlerForCommandException noHandlerFor(CommandMessage<?> command) {
        return new NoHandlerForCommandException(String.format("No node known to accept [%s]", command.getCommandName()));
    }

    /** Wraps a failure thrown while sending a command, keeping the original exception as cause. */
    private static CommandDispatchException dispatchFailure(Exception cause) {
        return new CommandDispatchException(DISPATCH_ERROR_MESSAGE + ": " + cause.getMessage(), cause);
    }

    /**
     * Subscribes a handler for the given command name with the connector and adds the name to the
     * filter reported to the router.
     *
     * @param str            the name of the command to handle
     * @param messageHandler the handler to subscribe
     * @return a registration whose {@code cancel()} removes the name from the filter and then
     *         cancels the connector subscription, returning the connector's result
     */
    @Override // org.axonframework.commandhandling.CommandBus
    public Registration subscribe(String str, MessageHandler<? super CommandMessage<?>> messageHandler) {
        Registration connectorRegistration = this.connector.subscribe(str, messageHandler);
        updateFilter(current -> current.or(new CommandNameFilter(str)));
        return () -> {
            updateFilter(current -> current.and(new DenyCommandNameFilter(str)));
            return connectorRegistration.cancel();
        };
    }

    /**
     * Atomically replaces the command filter with the result of applying {@code change} to the
     * current filter and, when the result differs from the previous filter, reports it to the router.
     *
     * <p>The change is applied in a compare-and-set loop so that concurrent subscribe/cancel calls
     * cannot overwrite each other's modification. {@code change} may therefore be invoked more than
     * once and should be free of side effects.
     *
     * <p>NOTE(review): the router notification itself is not serialized, so two concurrent updates
     * may still reach {@code updateMembership} out of order.
     *
     * @param change derives the new filter from the current one
     */
    private void updateFilter(UnaryOperator<CommandMessageFilter> change) {
        CommandMessageFilter previous;
        CommandMessageFilter updated;
        do {
            previous = this.commandFilter.get();
            updated = change.apply(previous);
        } while (!this.commandFilter.compareAndSet(previous, updated));
        if (!previous.equals(updated)) {
            this.commandRouter.updateMembership(this.loadFactor, updated);
        }
    }

    /**
     * Returns the load factor most recently set on this bus.
     *
     * @return the current load factor, {@link #INITIAL_LOAD_FACTOR} until changed
     */
    public int getLoadFactor() {
        return this.loadFactor;
    }

    /**
     * Stores a new load factor and reports it, with the current command filter, to the router.
     *
     * @param i the new load factor
     */
    public void updateLoadFactor(int i) {
        this.loadFactor = i;
        this.commandRouter.updateMembership(i, this.commandFilter.get());
    }

    /**
     * Registers an interceptor that is applied to every command dispatched through this bus.
     *
     * @param messageDispatchInterceptor the interceptor to add
     * @return a registration whose {@code cancel()} removes the interceptor again
     */
    @Override // org.axonframework.messaging.MessageDispatchInterceptorSupport
    public Registration registerDispatchInterceptor(MessageDispatchInterceptor<? super CommandMessage<?>> messageDispatchInterceptor) {
        this.dispatchInterceptors.add(messageDispatchInterceptor);
        return () -> this.dispatchInterceptors.remove(messageDispatchInterceptor);
    }

    /**
     * Registers a handler interceptor by delegating to the connector.
     *
     * @param messageHandlerInterceptor the interceptor to register
     * @return the registration returned by the connector
     */
    @Override // org.axonframework.messaging.MessageHandlerInterceptorSupport
    public Registration registerHandlerInterceptor(MessageHandlerInterceptor<? super CommandMessage<?>> messageHandlerInterceptor) {
        return this.connector.registerHandlerInterceptor(messageHandlerInterceptor);
    }
}
