package org.axonframework.commandhandling.distributed;

import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.CommandCallback;
import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.commandhandling.MonitorAwareCallback;
import org.axonframework.commandhandling.distributed.commandfilter.CommandNameFilter;
import org.axonframework.commandhandling.distributed.commandfilter.DenyAll;
import org.axonframework.commandhandling.distributed.commandfilter.DenyCommandNameFilter;
import org.axonframework.common.Assert;
import org.axonframework.common.Registration;
import org.axonframework.messaging.MessageDispatchInterceptor;
import org.axonframework.messaging.MessageHandler;
import org.axonframework.monitoring.MessageMonitor;
import org.axonframework.monitoring.NoOpMessageMonitor;

/* loaded from: input_file:org/axonframework/commandhandling/distributed/DistributedCommandBus.class */
/**
 * {@link CommandBus} implementation that forwards commands to a {@link Member} selected by a
 * {@link CommandRouter}, using a {@link CommandBusConnector} to perform the actual send.
 * <p>
 * Handlers subscribed through this bus are registered with the connector, and the set of command
 * names this bus accepts is tracked in a command filter that is pushed to the router, together
 * with the current load factor, whenever either of them changes.
 */
public class DistributedCommandBus implements CommandBus {
    /** Load factor a new instance starts with until {@link #updateLoadFactor(int)} is called. */
    public static final int INITIAL_LOAD_FACTOR = 100;
    private static final String DISPATCH_ERROR_MESSAGE = "An error occurred while trying to dispatch a command on the DistributedCommandBus";
    private final CommandRouter commandRouter;
    private final CommandBusConnector connector;
    private final List<MessageDispatchInterceptor<CommandMessage<?>>> dispatchInterceptors;
    private volatile int loadFactor;
    private final AtomicReference<Predicate<CommandMessage<?>>> commandFilter;
    private final MessageMonitor<? super CommandMessage<?>> messageMonitor;

    /**
     * Creates a bus that uses {@link NoOpMessageMonitor#INSTANCE} as its message monitor.
     *
     * @param commandRouter       router used to find the destination of each command; may not be {@code null}
     * @param commandBusConnector connector used to send commands and to subscribe handlers; may not be {@code null}
     */
    public DistributedCommandBus(CommandRouter commandRouter, CommandBusConnector commandBusConnector) {
        this(commandRouter, commandBusConnector, NoOpMessageMonitor.INSTANCE);
    }

    /**
     * Creates a bus with the given router, connector and message monitor. The bus starts with no
     * dispatch interceptors, a load factor of {@link #INITIAL_LOAD_FACTOR} and {@link DenyAll#INSTANCE}
     * as its command filter.
     *
     * @param commandRouter       router used to find the destination of each command; may not be {@code null}
     * @param commandBusConnector connector used to send commands and to subscribe handlers; may not be {@code null}
     * @param messageMonitor      monitor notified of commands dispatched with a callback; may not be {@code null}
     */
    public DistributedCommandBus(CommandRouter commandRouter, CommandBusConnector commandBusConnector, MessageMonitor<? super CommandMessage<?>> messageMonitor) {
        Assert.notNull(commandRouter, () -> "commandRouter may not be null");
        Assert.notNull(commandBusConnector, () -> "connector may not be null");
        Assert.notNull(messageMonitor, () -> "messageMonitor may not be null");
        this.dispatchInterceptors = new CopyOnWriteArrayList<>();
        this.loadFactor = INITIAL_LOAD_FACTOR;
        this.commandFilter = new AtomicReference<>(DenyAll.INSTANCE);
        this.commandRouter = commandRouter;
        this.connector = commandBusConnector;
        this.messageMonitor = messageMonitor;
    }

    /**
     * Dispatches the given command without a callback.
     * <p>
     * When a monitor other than {@link NoOpMessageMonitor#INSTANCE} is configured, this delegates to
     * {@link #dispatch(CommandMessage, CommandCallback)} with a {@code null} callback so that the
     * monitor is still involved. Otherwise the command is passed through the dispatch interceptors
     * and sent to the destination reported by the router.
     *
     * @param commandMessage the command to dispatch
     * @param <C>            payload type of the command
     * @throws CommandDispatchException when the router reports no destination, or when the connector
     *                                  fails to send (the destination is marked suspect first)
     */
    @Override
    public <C> void dispatch(CommandMessage<C> commandMessage) {
        if (!NoOpMessageMonitor.INSTANCE.equals(this.messageMonitor)) {
            dispatch(commandMessage, null);
            return;
        }
        CommandMessage<? extends C> intercepted = intercept(commandMessage);
        // Route on the intercepted message: it is the one that is actually sent.
        Member destination = this.commandRouter.findDestination(intercepted).orElseThrow(
                () -> new CommandDispatchException("No node known to accept " + intercepted.getCommandName()));
        try {
            this.connector.send(destination, intercepted);
        } catch (Exception e) {
            destination.suspect();
            throw new CommandDispatchException(DISPATCH_ERROR_MESSAGE + ": " + e.getMessage(), e);
        }
    }

    /**
     * Dispatches the given command and hands the connector a callback that wraps
     * {@code commandCallback} together with the monitor callback obtained from the message monitor.
     *
     * @param commandMessage  the command to dispatch
     * @param commandCallback callback to wrap; {@link #dispatch(CommandMessage)} passes {@code null} here
     * @param <C>             payload type of the command
     * @param <R>             result type expected by the callback
     * @throws CommandDispatchException when the router reports no destination, or when the connector
     *                                  fails to send (the destination is marked suspect first)
     */
    @Override
    public <C, R> void dispatch(CommandMessage<C> commandMessage, CommandCallback<? super C, R> commandCallback) {
        CommandMessage<? extends C> intercepted = intercept(commandMessage);
        // Monitor and route the intercepted message so all collaborators see the same message.
        CommandCallback<? super C, R> monitorAwareCallback =
                new MonitorAwareCallback<>(commandCallback, this.messageMonitor.onMessageIngested(intercepted));
        Member destination = this.commandRouter.findDestination(intercepted).orElseThrow(
                () -> new CommandDispatchException("No node known to accept " + intercepted.getCommandName()));
        try {
            this.connector.send(destination, intercepted, monitorAwareCallback);
        } catch (Exception e) {
            destination.suspect();
            throw new CommandDispatchException(DISPATCH_ERROR_MESSAGE + ": " + e.getMessage(), e);
        }
    }

    /**
     * Passes the command through every registered dispatch interceptor, in list order, feeding each
     * interceptor the result of the previous one.
     *
     * @param commandMessage the command as handed to {@code dispatch}
     * @param <C>            payload type of the command
     * @return the message returned by the last interceptor, or {@code commandMessage} itself when no
     *         interceptors are registered
     */
    // The interceptors are typed on CommandMessage<?>, so the payload type has to be re-asserted
    // after each call; the cast is unchecked by necessity.
    @SuppressWarnings("unchecked")
    private <C> CommandMessage<? extends C> intercept(CommandMessage<C> commandMessage) {
        CommandMessage<? extends C> intercepted = commandMessage;
        for (MessageDispatchInterceptor<CommandMessage<?>> interceptor : this.dispatchInterceptors) {
            intercepted = (CommandMessage<? extends C>) interceptor.handle(intercepted);
        }
        return intercepted;
    }

    /**
     * Subscribes the handler with the connector and widens the command filter with a
     * {@link CommandNameFilter} for the given command name.
     *
     * @param str            name of the command the handler is subscribed to
     * @param messageHandler handler to subscribe
     * @return a registration that, when cancelled, narrows the command filter with a
     *         {@link DenyCommandNameFilter} for the same name and then cancels the connector's
     *         registration, returning that cancellation's result
     */
    @Override
    public Registration subscribe(String str, MessageHandler<? super CommandMessage<?>> messageHandler) {
        Registration subscribe = this.connector.subscribe(str, messageHandler);
        // NOTE(review): the filter is read here and written in updateFilter; the two steps are not
        // one atomic operation, so concurrent (un)subscriptions could overwrite each other — confirm
        // whether callers serialize these calls.
        updateFilter(this.commandFilter.get().or(new CommandNameFilter(str)));
        return () -> {
            updateFilter(this.commandFilter.get().and(new DenyCommandNameFilter(str)));
            return subscribe.cancel();
        };
    }

    /**
     * Stores the given filter and, unless the previously stored filter {@code equals} it, passes the
     * current load factor and the new filter to the router.
     *
     * @param predicate the new command filter
     */
    private void updateFilter(Predicate<CommandMessage<?>> predicate) {
        if (this.commandFilter.getAndSet(predicate).equals(predicate)) {
            return;
        }
        this.commandRouter.updateMembership(this.loadFactor, predicate);
    }

    /**
     * Returns the load factor currently held by this bus.
     *
     * @return the current load factor
     */
    public int getLoadFactor() {
        return this.loadFactor;
    }

    /**
     * Stores the given load factor and passes it, with the current command filter, to the router.
     *
     * @param i the new load factor
     */
    public void updateLoadFactor(int i) {
        this.loadFactor = i;
        this.commandRouter.updateMembership(i, this.commandFilter.get());
    }

    /**
     * Replaces the registered dispatch interceptors with the given ones. The existing list is
     * cleared first and the new interceptors are added afterwards, as two separate steps.
     *
     * @param collection the interceptors to apply, in iteration order, to every dispatched command
     */
    public void setCommandDispatchInterceptors(Collection<MessageDispatchInterceptor<CommandMessage<?>>> collection) {
        this.dispatchInterceptors.clear();
        this.dispatchInterceptors.addAll(collection);
    }
}
