package org.axonframework.disruptor.commandhandling;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.CommandCallback;
import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.commandhandling.CommandResultMessage;
import org.axonframework.commandhandling.DuplicateCommandHandlerResolution;
import org.axonframework.commandhandling.DuplicateCommandHandlerResolver;
import org.axonframework.commandhandling.GenericCommandResultMessage;
import org.axonframework.commandhandling.MonitorAwareCallback;
import org.axonframework.commandhandling.NoHandlerForCommandException;
import org.axonframework.commandhandling.callbacks.NoOpCallback;
import org.axonframework.common.Assert;
import org.axonframework.common.AxonThreadFactory;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.ObjectUtils;
import org.axonframework.common.Registration;
import org.axonframework.common.caching.Cache;
import org.axonframework.common.caching.NoCache;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.eventsourcing.AggregateFactory;
import org.axonframework.eventsourcing.NoSnapshotTriggerDefinition;
import org.axonframework.eventsourcing.SnapshotTriggerDefinition;
import org.axonframework.eventsourcing.eventstore.EventStore;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageDispatchInterceptor;
import org.axonframework.messaging.MessageHandler;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.messaging.ScopeDescriptor;
import org.axonframework.messaging.annotation.ClasspathHandlerDefinition;
import org.axonframework.messaging.annotation.ClasspathParameterResolverFactory;
import org.axonframework.messaging.annotation.HandlerDefinition;
import org.axonframework.messaging.annotation.ParameterResolverFactory;
import org.axonframework.messaging.unitofwork.RollbackConfiguration;
import org.axonframework.messaging.unitofwork.RollbackConfigurationType;
import org.axonframework.modelling.command.Aggregate;
import org.axonframework.modelling.command.AggregateNotFoundException;
import org.axonframework.modelling.command.AggregateScopeDescriptor;
import org.axonframework.modelling.command.AnnotationCommandTargetResolver;
import org.axonframework.modelling.command.CommandTargetResolver;
import org.axonframework.modelling.command.Repository;
import org.axonframework.modelling.command.RepositoryProvider;
import org.axonframework.monitoring.MessageMonitor;
import org.axonframework.monitoring.NoOpMessageMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/axonframework/disruptor/commandhandling/DisruptorCommandBus.class */
public class DisruptorCommandBus implements CommandBus {
    /** Logger shared by the bus and its nested helper classes. */
    private static final Logger logger = LoggerFactory.getLogger(DisruptorCommandBus.class);
    /** Interceptors applied, in order, to every command before it is handed to the ring buffer. */
    private final List<MessageDispatchInterceptor<? super CommandMessage<?>>> dispatchInterceptors;
    /** Handler interceptors handed to each command entry as the "invoker" interceptors. */
    private final List<MessageHandlerInterceptor<? super CommandMessage<?>>> invokerInterceptors;
    /** Handler interceptors handed to each command entry as the "publisher" interceptors. */
    private final List<MessageHandlerInterceptor<? super CommandMessage<?>>> publisherInterceptors;
    /** Thread pool created by this bus when no executor was configured; {@code null} otherwise. */
    private final ExecutorService executorService;
    /** Flag forwarded to every {@code BlacklistDetectingCallback} created by this bus. */
    private final boolean rescheduleOnCorruptState;
    /** Time in milliseconds the ring buffer cursor must remain unchanged before {@link #stop()} shuts down. */
    private final long coolingDownPeriod;
    /** Resolves the target identifier used to select an invoker and publisher segment. */
    private final CommandTargetResolver commandTargetResolver;
    /** Number of {@code EventPublisher} instances created for this bus. */
    private final int publisherCount;
    /** Monitor notified of every command ingested through {@code dispatch}. */
    private final MessageMonitor<? super CommandMessage<?>> messageMonitor;
    /** The disruptor whose ring buffer carries the command entries. */
    private final Disruptor<CommandHandlingEntry> disruptor;
    /** The invokers registered as the first event handlers of the disruptor. */
    private final CommandHandlerInvoker[] commandHandlerInvokers;
    /** Decides which handler wins when a second handler subscribes to an already-handled command name. */
    private final DuplicateCommandHandlerResolver duplicateCommandHandlerResolver;
    /** Callback used by the single-argument {@code dispatch} method. */
    private final CommandCallback<Object, Object> defaultCommandCallback;
    /** Subscribed command handlers, keyed by command name. */
    private final ConcurrentMap<String, MessageHandler<? super CommandMessage<?>>> commandHandlers = new ConcurrentHashMap<>();
    /** {@code true} until {@link #stop()} is called; guards the public dispatch entry point. */
    private volatile boolean started = true;
    /** Set to {@code true} by {@link #stop()} once the cooling down period has elapsed; guards (re-)dispatching. */
    private volatile boolean disruptorShutDown = false;

    /* loaded from: input_file:org/axonframework/disruptor/commandhandling/DisruptorCommandBus$Builder.class */
    /**
     * Builder collecting the configuration of a {@link DisruptorCommandBus}.
     * <p>
     * Every setting has a default (see the field initializers) except the {@link Executor} and the
     * {@link TransactionManager}, which are {@code null} unless configured.
     */
    public static class Builder {
        private static final int DEFAULT_BUFFER_SIZE = 4096;
        private Executor executor;
        private TransactionManager transactionManager;
        private final List<MessageHandlerInterceptor<? super CommandMessage<?>>> invokerInterceptors = new ArrayList<>();
        private final List<MessageHandlerInterceptor<? super CommandMessage<?>>> publisherInterceptors = new ArrayList<>();
        private final List<MessageDispatchInterceptor<? super CommandMessage<?>>> dispatchInterceptors = new ArrayList<>();
        private boolean rescheduleCommandsOnCorruptState = true;
        private long coolingDownPeriod = 1000;
        private CommandTargetResolver commandTargetResolver = AnnotationCommandTargetResolver.builder().build();
        private int publisherThreadCount = 1;
        private MessageMonitor<? super CommandMessage<?>> messageMonitor = NoOpMessageMonitor.INSTANCE;
        private RollbackConfiguration rollbackConfiguration = RollbackConfigurationType.UNCHECKED_EXCEPTIONS;
        private int bufferSize = DEFAULT_BUFFER_SIZE;
        private ProducerType producerType = ProducerType.MULTI;
        private WaitStrategy waitStrategy = new BlockingWaitStrategy();
        private int invokerThreadCount = 1;
        private Cache cache = NoCache.INSTANCE;
        private DuplicateCommandHandlerResolver duplicateCommandHandlerResolver = DuplicateCommandHandlerResolution.logAndOverride();
        private CommandCallback<Object, Object> defaultCommandCallback = FailureLoggingCommandCallback.INSTANCE;

        /**
         * Replaces the configured invoker interceptors with the contents of the given list.
         *
         * @param list the interceptors to use; its elements are copied
         * @return this builder, for chaining
         */
        public Builder invokerInterceptors(List<MessageHandlerInterceptor<? super CommandMessage<?>>> list) {
            this.invokerInterceptors.clear();
            this.invokerInterceptors.addAll(list);
            return this;
        }

        /**
         * Replaces the configured publisher interceptors with the contents of the given list.
         *
         * @param list the interceptors to use; its elements are copied
         * @return this builder, for chaining
         */
        public Builder publisherInterceptors(List<MessageHandlerInterceptor<CommandMessage<?>>> list) {
            this.publisherInterceptors.clear();
            this.publisherInterceptors.addAll(list);
            return this;
        }

        /**
         * Replaces the configured dispatch interceptors with the contents of the given list.
         *
         * @param list the interceptors to use; its elements are copied
         * @return this builder, for chaining
         */
        public Builder dispatchInterceptors(List<MessageDispatchInterceptor<CommandMessage<?>>> list) {
            this.dispatchInterceptors.clear();
            this.dispatchInterceptors.addAll(list);
            return this;
        }

        /**
         * Sets the executor handed to the disruptor and the event publishers. When left {@code null}, the bus
         * creates (and later shuts down) its own cached thread pool.
         *
         * @param executor the executor to use, may be {@code null}
         * @return this builder, for chaining
         */
        public Builder executor(Executor executor) {
            this.executor = executor;
            return this;
        }

        /**
         * Sets the flag that is forwarded to every {@code BlacklistDetectingCallback} the bus creates.
         * Defaults to {@code true}.
         *
         * @param z the flag value
         * @return this builder, for chaining
         */
        public Builder rescheduleCommandsOnCorruptState(boolean z) {
            this.rescheduleCommandsOnCorruptState = z;
            return this;
        }

        /**
         * Sets how long, in milliseconds, the ring buffer cursor must remain unchanged before
         * {@link DisruptorCommandBus#stop()} shuts the disruptor down. Defaults to 1000.
         *
         * @param j the cooling down period, must be positive
         * @return this builder, for chaining
         */
        public Builder coolingDownPeriod(long j) {
            assertCoolingDownPeriod(j);
            this.coolingDownPeriod = j;
            return this;
        }

        /**
         * Sets the resolver used to determine the target identifier of a command, which selects the invoker
         * and publisher segment when more than one of either is configured.
         *
         * @param commandTargetResolver the resolver, may not be {@code null}
         * @return this builder, for chaining
         */
        public Builder commandTargetResolver(CommandTargetResolver commandTargetResolver) {
            BuilderUtils.assertNonNull(commandTargetResolver, "CommandTargetResolver may not be null");
            this.commandTargetResolver = commandTargetResolver;
            return this;
        }

        /**
         * Sets the number of {@code EventPublisher} instances the bus creates. Defaults to 1.
         *
         * @param i the publisher count, must be at least 1
         * @return this builder, for chaining
         */
        public Builder publisherThreadCount(int i) {
            assertPublisherThreadCount(i);
            this.publisherThreadCount = i;
            return this;
        }

        /**
         * Sets the monitor notified of every command ingested by the bus. Defaults to a no-op monitor.
         *
         * @param messageMonitor the monitor, may not be {@code null}
         * @return this builder, for chaining
         */
        public Builder messageMonitor(MessageMonitor<? super CommandMessage<?>> messageMonitor) {
            BuilderUtils.assertNonNull(messageMonitor, "MessageMonitor may not be null");
            this.messageMonitor = messageMonitor;
            return this;
        }

        /**
         * Sets the transaction manager handed to each {@code EventPublisher}. No null check is performed.
         *
         * @param transactionManager the transaction manager
         * @return this builder, for chaining
         */
        public Builder transactionManager(TransactionManager transactionManager) {
            this.transactionManager = transactionManager;
            return this;
        }

        /**
         * Sets the rollback configuration handed to each {@code EventPublisher}. Defaults to
         * {@link RollbackConfigurationType#UNCHECKED_EXCEPTIONS}.
         *
         * @param rollbackConfiguration the rollback configuration, may not be {@code null}
         * @return this builder, for chaining
         */
        public Builder rollbackConfiguration(RollbackConfiguration rollbackConfiguration) {
            BuilderUtils.assertNonNull(rollbackConfiguration, "RollbackConfiguration may not be null");
            this.rollbackConfiguration = rollbackConfiguration;
            return this;
        }

        /**
         * Sets the ring buffer size passed to the {@link Disruptor}. Defaults to 4096.
         *
         * @param i the buffer size, must be positive and a power of 2
         * @return this builder, for chaining
         */
        public Builder bufferSize(int i) {
            assertBufferSize(i);
            this.bufferSize = i;
            return this;
        }

        /**
         * Sets the producer type passed to the {@link Disruptor}. Defaults to {@link ProducerType#MULTI}.
         *
         * @param producerType the producer type, may not be {@code null}
         * @return this builder, for chaining
         */
        public Builder producerType(ProducerType producerType) {
            BuilderUtils.assertNonNull(producerType, "ProducerType may not be null");
            this.producerType = producerType;
            return this;
        }

        /**
         * Sets the wait strategy passed to the {@link Disruptor}. Defaults to a {@link BlockingWaitStrategy}.
         *
         * @param waitStrategy the wait strategy, may not be {@code null}
         * @return this builder, for chaining
         */
        public Builder waitStrategy(WaitStrategy waitStrategy) {
            BuilderUtils.assertNonNull(waitStrategy, "WaitStrategy may not be null");
            this.waitStrategy = waitStrategy;
            return this;
        }

        /**
         * Sets the number of {@code CommandHandlerInvoker} instances the bus creates. Defaults to 1.
         *
         * @param i the invoker count, must be at least 1
         * @return this builder, for chaining
         */
        public Builder invokerThreadCount(int i) {
            assertInvokerThreadCount(i);
            this.invokerThreadCount = i;
            return this;
        }

        /**
         * Sets the cache handed to each {@code CommandHandlerInvoker}. Defaults to {@link NoCache#INSTANCE}.
         *
         * @param cache the cache, may not be {@code null}
         * @return this builder, for chaining
         */
        public Builder cache(Cache cache) {
            BuilderUtils.assertNonNull(cache, "Cache may not be null");
            this.cache = cache;
            return this;
        }

        /**
         * Sets the resolver consulted when a handler subscribes to a command name that already has a
         * different handler. Defaults to {@link DuplicateCommandHandlerResolution#logAndOverride()}.
         *
         * @param duplicateCommandHandlerResolver the resolver, may not be {@code null}
         * @return this builder, for chaining
         */
        public Builder duplicateCommandHandlerResolver(DuplicateCommandHandlerResolver duplicateCommandHandlerResolver) {
            BuilderUtils.assertNonNull(duplicateCommandHandlerResolver, "DuplicateCommandHandlerResolver may not be null");
            this.duplicateCommandHandlerResolver = duplicateCommandHandlerResolver;
            return this;
        }

        /**
         * Sets the callback used when a command is dispatched without an explicit callback. Passing
         * {@code null} selects {@link NoOpCallback#INSTANCE}; when this method is never called, a callback
         * that logs exceptional results is used.
         *
         * @param commandCallback the callback, or {@code null} for a no-op callback
         * @return this builder, for chaining
         */
        public Builder defaultCommandCallback(CommandCallback<Object, Object> commandCallback) {
            this.defaultCommandCallback = (CommandCallback) ObjectUtils.getOrDefault(commandCallback, NoOpCallback.INSTANCE);
            return this;
        }

        /**
         * Creates a {@link DisruptorCommandBus} from the current settings.
         *
         * @return a new bus instance
         */
        public DisruptorCommandBus build() {
            return new DisruptorCommandBus(this);
        }

        /**
         * Re-checks the numeric settings. Invoked by the {@link DisruptorCommandBus} constructor.
         */
        protected void validate() {
            assertCoolingDownPeriod(this.coolingDownPeriod);
            assertPublisherThreadCount(this.publisherThreadCount);
            assertBufferSize(this.bufferSize);
            assertInvokerThreadCount(this.invokerThreadCount);
        }

        private void assertCoolingDownPeriod(long j) {
            BuilderUtils.assertThat(j, period -> period > 0, "The cooling down period must be a positive number");
        }

        private void assertBufferSize(int i) {
            // A positive int is a power of two exactly when a single bit is set. The previous "% 2 == 0"
            // test only checked evenness, letting sizes such as 6 through and rejecting the valid size 1.
            BuilderUtils.assertThat(i, size -> size > 0 && Integer.bitCount(size) == 1,
                                    "The buffer size must be positive and a power of 2");
        }

        private void assertPublisherThreadCount(int i) {
            BuilderUtils.assertThat(i, count -> count > 0, "The publisher thread count must at least be 1");
        }

        private void assertInvokerThreadCount(int i) {
            BuilderUtils.assertThat(i, count -> count > 0, "The invoker thread count must be at least 1");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/axonframework/disruptor/commandhandling/DisruptorCommandBus$DisruptorRepository.class */
    /**
     * {@link Repository} facade handed out by {@code createRepository}. Loading and creation are delegated
     * to the repository that {@code CommandHandlerInvoker.getRepository} returns for the aggregate type;
     * messages sent to an aggregate scope are routed through the ring buffer of the enclosing bus.
     *
     * @param <T> the aggregate type
     */
    public class DisruptorRepository<T> implements Repository<T> {
        private final Class<T> type;

        /**
         * @param cls the aggregate type this repository serves
         */
        public DisruptorRepository(Class<T> cls) {
            this.type = cls;
        }

        /**
         * Loads the aggregate via the repository registered with {@code CommandHandlerInvoker} for this type.
         *
         * @param str the aggregate identifier
         * @param l   the version argument, forwarded unchanged
         * @return the loaded aggregate
         */
        public Aggregate<T> load(String str, Long l) {
            return CommandHandlerInvoker.getRepository(this.type).load(str, l);
        }

        /**
         * Loads the aggregate via the repository registered with {@code CommandHandlerInvoker} for this type.
         *
         * @param str the aggregate identifier
         * @return the loaded aggregate
         */
        public Aggregate<T> load(String str) {
            return CommandHandlerInvoker.getRepository(this.type).load(str);
        }

        /**
         * Creates a new aggregate via the repository registered with {@code CommandHandlerInvoker}.
         *
         * @param callable factory for the aggregate root
         * @return the newly created aggregate
         * @throws Exception whatever the delegate repository throws
         */
        public Aggregate<T> newInstance(Callable<T> callable) throws Exception {
            return CommandHandlerInvoker.getRepository(this.type).newInstance(callable);
        }

        /**
         * Sends the message to the aggregate described by the scope descriptor and blocks until handling
         * has completed.
         *
         * @param message         the message to deliver
         * @param scopeDescriptor describes the target aggregate; descriptors this repository cannot resolve
         *                        are silently ignored
         * @throws Exception the cause of a failed delivery when that cause is an {@link Exception};
         *                   otherwise the wrapping {@link ExecutionException}. Also propagates whatever
         *                   {@link CompletableFuture#get()} throws while waiting.
         */
        public void send(Message<?> message, ScopeDescriptor scopeDescriptor) throws Exception {
            CompletableFuture<?> completableFuture = new CompletableFuture<>();
            send(message, scopeDescriptor, completableFuture);
            try {
                completableFuture.get();
            } catch (ExecutionException e) {
                if (!(e.getCause() instanceof Exception)) {
                    throw e;
                }
                throw ((Exception) e.getCause());
            }
        }

        /**
         * Claims a ring buffer slot and fills it with a task that loads the target aggregate and lets it
         * handle the message. The future is completed from the callback attached to the slot.
         */
        private void send(final Message<?> message, ScopeDescriptor scopeDescriptor, final CompletableFuture<?> completableFuture) {
            if (!canResolve(scopeDescriptor)) {
                completableFuture.complete(null);
                return;
            }
            final String aggregateId = ((AggregateScopeDescriptor) scopeDescriptor).getIdentifier().toString();

            // Derive the segments from the identifier hash, mirroring what command dispatching does.
            int invokerSegment = 0;
            int publisherSegment = 0;
            int invokerCount = DisruptorCommandBus.this.commandHandlerInvokers.length;
            int publisherCount = DisruptorCommandBus.this.publisherCount;
            if ((invokerCount > 1 || publisherCount > 1) && aggregateId != null) {
                int idHash = aggregateId.hashCode() & Integer.MAX_VALUE;
                if (invokerCount > 1) {
                    invokerSegment = idHash % invokerCount;
                }
                if (publisherCount > 1) {
                    publisherSegment = idHash % publisherCount;
                }
            }

            RingBuffer<CommandHandlingEntry> ringBuffer = DisruptorCommandBus.this.disruptor.getRingBuffer();
            long sequence = ringBuffer.next();
            try {
                ringBuffer.get(sequence).resetAsCallable(() -> {
                    try {
                        return load(aggregateId).handle(message);
                    } catch (AggregateNotFoundException e) {
                        DisruptorCommandBus.logger.debug("Aggregate (with id: [{}]) cannot be loaded. Hence, message '[{}]' cannot be handled.", aggregateId, message);
                        return null;
                    }
                }, invokerSegment, publisherSegment, new BlacklistDetectingCallback<>(new CommandCallback<Object, Object>() {
                    public void onResult(CommandMessage<?> commandMessage, CommandResultMessage<?> commandResultMessage) {
                        if (!commandResultMessage.isExceptional()) {
                            completableFuture.complete(null);
                        } else {
                            DisruptorCommandBus.logger.warn("Failed sending message [{}] to aggregate with id [{}]", message, aggregateId);
                            completableFuture.completeExceptionally(commandResultMessage.exceptionResult());
                        }
                    }
                }, ringBuffer, (commandMessage, commandCallback) -> {
                    // Retry action handed to the callback: submit the very same message again.
                    send(message, scopeDescriptor, completableFuture);
                }, DisruptorCommandBus.this.rescheduleOnCorruptState));
            } finally {
                // A claimed sequence must be published exactly once, whether or not filling the slot
                // succeeded; an unpublished sequence would stall the ring buffer.
                ringBuffer.publish(sequence);
            }
        }

        /**
         * @param scopeDescriptor the descriptor to test
         * @return {@code true} when the descriptor is an {@link AggregateScopeDescriptor} whose type equals
         *         the simple class name of this repository's aggregate type
         */
        public boolean canResolve(ScopeDescriptor scopeDescriptor) {
            return (scopeDescriptor instanceof AggregateScopeDescriptor) && Objects.equals(this.type.getSimpleName(), ((AggregateScopeDescriptor) scopeDescriptor).getType());
        }
    }

    /* loaded from: input_file:org/axonframework/disruptor/commandhandling/DisruptorCommandBus$ExceptionHandler.class */
    private class ExceptionHandler implements com.lmax.disruptor.ExceptionHandler {
        private ExceptionHandler() {
        }

        public void handleEventException(Throwable th, long j, Object obj) {
            DisruptorCommandBus.logger.error("Exception occurred while processing a {}.", ((CommandHandlingEntry) obj).getMessage().getPayloadType().getSimpleName(), th);
        }

        public void handleOnStartException(Throwable th) {
            DisruptorCommandBus.logger.error("Failed to start the DisruptorCommandBus.", th);
            DisruptorCommandBus.this.disruptor.shutdown();
        }

        public void handleOnShutdownException(Throwable th) {
            DisruptorCommandBus.logger.error("Error while shutting down the DisruptorCommandBus", th);
        }
    }

    /* loaded from: input_file:org/axonframework/disruptor/commandhandling/DisruptorCommandBus$FailureLoggingCommandCallback.class */
    /**
     * Stateless callback that logs exceptional command results at INFO level and ignores all other
     * results. Used as the builder's default command callback.
     */
    private static class FailureLoggingCommandCallback implements CommandCallback<Object, Object> {
        private static final FailureLoggingCommandCallback INSTANCE = new FailureLoggingCommandCallback();

        private FailureLoggingCommandCallback() {
        }

        public void onResult(CommandMessage<?> commandMessage, CommandResultMessage<?> commandResultMessage) {
            if (!commandResultMessage.isExceptional()) {
                // Successful results need no attention.
                return;
            }
            DisruptorCommandBus.logger.info(
                    "An error occurred while handling a command [{}].",
                    commandMessage.getCommandName(),
                    commandResultMessage.exceptionResult()
            );
        }
    }

    /**
     * Creates a fresh {@link Builder} pre-populated with the default settings.
     *
     * @return a new builder instance
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Builds the bus from the given builder: validates the settings, copies the interceptor lists,
     * chooses the executor, wires invokers followed by publishers into the disruptor and starts it.
     *
     * @param builder the builder holding the configuration
     */
    protected DisruptorCommandBus(Builder builder) {
        builder.validate();

        // Plain configuration values.
        this.rescheduleOnCorruptState = builder.rescheduleCommandsOnCorruptState;
        this.coolingDownPeriod = builder.coolingDownPeriod;
        this.commandTargetResolver = builder.commandTargetResolver;
        this.defaultCommandCallback = builder.defaultCommandCallback;
        this.messageMonitor = builder.messageMonitor;
        this.duplicateCommandHandlerResolver = builder.duplicateCommandHandlerResolver;

        // Private copies of the interceptor lists.
        this.dispatchInterceptors = new CopyOnWriteArrayList(builder.dispatchInterceptors);
        this.invokerInterceptors = new CopyOnWriteArrayList(builder.invokerInterceptors);
        this.publisherInterceptors = new ArrayList(builder.publisherInterceptors);

        // Only an executor created here is owned (and later shut down) by this bus.
        final Executor effectiveExecutor;
        if (builder.executor != null) {
            this.executorService = null;
            effectiveExecutor = builder.executor;
        } else {
            this.executorService = Executors.newCachedThreadPool(new AxonThreadFactory("DisruptorCommandBus"));
            effectiveExecutor = this.executorService;
        }

        EventPublisher[] publishers = initializePublisherThreads(
                builder.publisherThreadCount, effectiveExecutor, builder.transactionManager, builder.rollbackConfiguration
        );
        this.publisherCount = publishers.length;

        this.disruptor = new Disruptor<>(
                CommandHandlingEntry::new, builder.bufferSize, effectiveExecutor, builder.producerType, builder.waitStrategy
        );
        this.commandHandlerInvokers = initializeInvokerThreads(builder.invokerThreadCount, builder.cache);
        this.disruptor.setDefaultExceptionHandler(new ExceptionHandler());
        // Invokers see each entry first; publishers run after them.
        this.disruptor.handleEventsWith(this.commandHandlerInvokers).then(publishers);
        this.disruptor.start();
    }

    /**
     * Creates {@code i} event publishers, each given its array index as last constructor argument.
     *
     * @param i                     number of publishers to create
     * @param executor              executor handed to every publisher
     * @param transactionManager    transaction manager handed to every publisher
     * @param rollbackConfiguration rollback configuration handed to every publisher
     * @return the populated publisher array
     */
    private EventPublisher[] initializePublisherThreads(int i, Executor executor, TransactionManager transactionManager, RollbackConfiguration rollbackConfiguration) {
        EventPublisher[] publishers = new EventPublisher[i];
        for (int segment = 0; segment < publishers.length; segment++) {
            publishers[segment] = new EventPublisher(executor, transactionManager, rollbackConfiguration, segment);
        }
        return publishers;
    }

    /**
     * Creates {@code i} command handler invokers, each given the cache and its array index.
     *
     * @param i     number of invokers to create
     * @param cache cache handed to every invoker
     * @return the populated invoker array
     */
    private CommandHandlerInvoker[] initializeInvokerThreads(int i, Cache cache) {
        CommandHandlerInvoker[] invokers = new CommandHandlerInvoker[i];
        for (int segment = 0; segment < invokers.length; segment++) {
            invokers[segment] = new CommandHandlerInvoker(cache, segment);
        }
        return invokers;
    }

    /**
     * Dispatches the command using the default command callback configured for this bus.
     *
     * @param commandMessage the command to dispatch
     * @param <C>            the payload type of the command
     */
    public <C> void dispatch(CommandMessage<C> commandMessage) {
        dispatch(commandMessage, this.defaultCommandCallback);
    }

    /**
     * Runs the command through all dispatch interceptors, registers it with the message monitor and
     * hands it to the ring buffer. If handing over fails with an exception, the failure is reported to
     * the monitor callback and delivered to {@code commandCallback} as an exceptional result.
     *
     * @param commandMessage  the command to dispatch
     * @param commandCallback receives the result of command handling
     * @param <C>             the payload type of the command
     * @param <R>             the result type expected by the callback
     * @throws IllegalStateException presumably, via {@code Assert.state}, when the bus has been stopped
     */
    public <C, R> void dispatch(CommandMessage<C> commandMessage, CommandCallback<? super C, ? super R> commandCallback) {
        Assert.state(this.started, () -> "CommandBus has been shut down. It is not accepting any Commands");

        CommandMessage<C> intercepted = commandMessage;
        for (MessageDispatchInterceptor<? super CommandMessage<?>> interceptor : this.dispatchInterceptors) {
            // Each interceptor may replace the message; the next one sees the replacement.
            intercepted = (CommandMessage<C>) interceptor.handle(intercepted);
        }

        MessageMonitor.MonitorCallback monitorCallback = this.messageMonitor.onMessageIngested(intercepted);
        try {
            doDispatch(intercepted, new MonitorAwareCallback(commandCallback, monitorCallback));
        } catch (Exception e) {
            monitorCallback.reportFailure(e);
            commandCallback.onResult(intercepted, GenericCommandResultMessage.asCommandResultMessage(e));
        }
    }

    /**
     * Places the command on the ring buffer. Also serves as the re-dispatch action handed to the
     * {@code BlacklistDetectingCallback} wrapped around the given callback.
     * <p>
     * When no handler is subscribed for the command name, the callback receives an exceptional result
     * carrying a {@link NoHandlerForCommandException} and nothing is placed on the ring buffer.
     *
     * @param commandMessage  the (already intercepted) command
     * @param commandCallback receives the result of command handling
     * @throws IllegalStateException presumably, via {@code Assert.state}, once the disruptor is shut down
     */
    private <C, R> void doDispatch(CommandMessage<? extends C> commandMessage, CommandCallback<? super C, R> commandCallback) {
        Assert.state(!this.disruptorShutDown, () -> "Disruptor has been shut down. Cannot dispatch or re-dispatch commands");

        MessageHandler<? super CommandMessage<?>> messageHandler = this.commandHandlers.get(commandMessage.getCommandName());
        if (messageHandler == null) {
            commandCallback.onResult(commandMessage, GenericCommandResultMessage.asCommandResultMessage(new NoHandlerForCommandException(String.format("No handler was subscribed to command [%s]", commandMessage.getCommandName()))));
            return;
        }

        // Segments stay 0 unless there is more than one invoker or publisher to choose from; the target
        // identifier is only resolved in that case.
        int invokerSegment = 0;
        int publisherSegment = 0;
        if (this.commandHandlerInvokers.length > 1 || this.publisherCount > 1) {
            String identifier = this.commandTargetResolver.resolveTarget(commandMessage).getIdentifier();
            if (identifier != null) {
                // Mask the sign bit so the modulo below never yields a negative segment.
                int idHash = identifier.hashCode() & Integer.MAX_VALUE;
                if (this.commandHandlerInvokers.length > 1) {
                    invokerSegment = idHash % this.commandHandlerInvokers.length;
                }
                if (this.publisherCount > 1) {
                    publisherSegment = idHash % this.publisherCount;
                }
            }
        }

        RingBuffer<CommandHandlingEntry> ringBuffer = this.disruptor.getRingBuffer();
        long sequence = ringBuffer.next();
        try {
            ringBuffer.get(sequence).reset(commandMessage, messageHandler, invokerSegment, publisherSegment,
                                           new BlacklistDetectingCallback<>(commandCallback, ringBuffer, this::doDispatch, this.rescheduleOnCorruptState),
                                           this.invokerInterceptors, this.publisherInterceptors);
        } finally {
            // A claimed sequence must be published exactly once, whether or not filling the slot
            // succeeded; an unpublished sequence would stall the ring buffer.
            ringBuffer.publish(sequence);
        }
    }

    /**
     * Creates a repository for the aggregate type of the given factory, supplying
     * {@link NoSnapshotTriggerDefinition#INSTANCE} as the snapshot trigger definition.
     *
     * @param eventStore       the event store, forwarded unchanged
     * @param aggregateFactory the factory describing the aggregate type
     * @param <T>              the aggregate type
     * @return the repository for the aggregate type
     */
    public <T> Repository<T> createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory) {
        // The casts select the (EventStore, AggregateFactory, SnapshotTriggerDefinition) overload.
        return createRepository(eventStore, (AggregateFactory) aggregateFactory, (SnapshotTriggerDefinition) NoSnapshotTriggerDefinition.INSTANCE);
    }

    /**
     * Creates a repository for the aggregate type of the given factory, supplying
     * {@link NoSnapshotTriggerDefinition#INSTANCE} as the snapshot trigger definition.
     *
     * @param eventStore         the event store, forwarded unchanged
     * @param aggregateFactory   the factory describing the aggregate type
     * @param repositoryProvider the repository provider, forwarded unchanged
     * @param <T>                the aggregate type
     * @return the repository for the aggregate type
     */
    public <T> Repository<T> createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory, RepositoryProvider repositoryProvider) {
        // The casts select the overload that takes a SnapshotTriggerDefinition plus a RepositoryProvider.
        return createRepository(eventStore, (AggregateFactory) aggregateFactory, (SnapshotTriggerDefinition) NoSnapshotTriggerDefinition.INSTANCE, repositoryProvider);
    }

    /**
     * Creates a repository for the aggregate type of the given factory, supplying the result of
     * {@code ClasspathParameterResolverFactory.forClass} for the aggregate type as fourth argument.
     *
     * @param eventStore                the event store, forwarded unchanged
     * @param aggregateFactory          the factory describing the aggregate type
     * @param snapshotTriggerDefinition the snapshot trigger definition, forwarded unchanged
     * @param <T>                       the aggregate type
     * @return the repository for the aggregate type
     */
    public <T> Repository<T> createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory, SnapshotTriggerDefinition snapshotTriggerDefinition) {
        return createRepository(eventStore, aggregateFactory, snapshotTriggerDefinition, ClasspathParameterResolverFactory.forClass(aggregateFactory.getAggregateType()));
    }

    /**
     * Creates a repository for the aggregate type of the given factory, deriving both the parameter
     * resolver factory and the handler definition from the aggregate type via the respective
     * {@code forClass} factory methods.
     *
     * @param eventStore                the event store, forwarded unchanged
     * @param aggregateFactory          the factory describing the aggregate type
     * @param snapshotTriggerDefinition the snapshot trigger definition, forwarded unchanged
     * @param repositoryProvider        the repository provider, forwarded unchanged
     * @param <T>                       the aggregate type
     * @return the repository for the aggregate type
     */
    public <T> Repository<T> createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory, SnapshotTriggerDefinition snapshotTriggerDefinition, RepositoryProvider repositoryProvider) {
        return createRepository(eventStore, aggregateFactory, snapshotTriggerDefinition, ClasspathParameterResolverFactory.forClass(aggregateFactory.getAggregateType()), ClasspathHandlerDefinition.forClass(aggregateFactory.getAggregateType()), repositoryProvider);
    }

    /**
     * Creates a repository for the aggregate type of the given factory, supplying
     * {@link NoSnapshotTriggerDefinition#INSTANCE} as the snapshot trigger definition.
     *
     * @param eventStore               the event store, forwarded unchanged
     * @param aggregateFactory         the factory describing the aggregate type
     * @param parameterResolverFactory the parameter resolver factory, forwarded unchanged
     * @param <T>                      the aggregate type
     * @return the repository for the aggregate type
     */
    public <T> Repository<T> createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory, ParameterResolverFactory parameterResolverFactory) {
        // The casts select the overload that takes a SnapshotTriggerDefinition plus a ParameterResolverFactory.
        return createRepository(eventStore, (AggregateFactory) aggregateFactory, (SnapshotTriggerDefinition) NoSnapshotTriggerDefinition.INSTANCE, parameterResolverFactory);
    }

    /**
     * Creates a repository for the aggregate type of the given factory, supplying
     * {@link NoSnapshotTriggerDefinition#INSTANCE} as the snapshot trigger definition.
     *
     * @param eventStore               the event store, forwarded unchanged
     * @param aggregateFactory         the factory describing the aggregate type
     * @param parameterResolverFactory the parameter resolver factory, forwarded unchanged
     * @param handlerDefinition        the handler definition, forwarded unchanged
     * @param repositoryProvider       the repository provider, forwarded unchanged
     * @param <T>                      the aggregate type
     * @return the repository for the aggregate type
     */
    public <T> Repository<T> createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory, ParameterResolverFactory parameterResolverFactory, HandlerDefinition handlerDefinition, RepositoryProvider repositoryProvider) {
        return createRepository(eventStore, aggregateFactory, NoSnapshotTriggerDefinition.INSTANCE, parameterResolverFactory, handlerDefinition, repositoryProvider);
    }

    /**
     * Creates a repository for the aggregate type of the given factory, deriving the handler definition
     * from the aggregate type via {@code ClasspathHandlerDefinition.forClass} and passing {@code null}
     * as the repository provider.
     *
     * @param eventStore                the event store, forwarded unchanged
     * @param aggregateFactory          the factory describing the aggregate type
     * @param snapshotTriggerDefinition the snapshot trigger definition, forwarded unchanged
     * @param parameterResolverFactory  the parameter resolver factory, forwarded unchanged
     * @param <T>                       the aggregate type
     * @return the repository for the aggregate type
     */
    public <T> Repository<T> createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory, SnapshotTriggerDefinition snapshotTriggerDefinition, ParameterResolverFactory parameterResolverFactory) {
        return createRepository(eventStore, aggregateFactory, snapshotTriggerDefinition, parameterResolverFactory, ClasspathHandlerDefinition.forClass(aggregateFactory.getAggregateType()), null);
    }

    /**
     * Asks every command handler invoker of this bus to create a repository with the given components
     * and returns a {@link DisruptorRepository} for the factory's aggregate type.
     *
     * @param eventStore                the event store, forwarded to every invoker
     * @param aggregateFactory          the factory describing the aggregate type
     * @param snapshotTriggerDefinition the snapshot trigger definition, forwarded to every invoker
     * @param parameterResolverFactory  the parameter resolver factory, forwarded to every invoker
     * @param handlerDefinition         the handler definition, forwarded to every invoker
     * @param repositoryProvider        the repository provider, forwarded to every invoker
     * @param <T>                       the aggregate type
     * @return a repository facade for the aggregate type
     */
    public <T> Repository<T> createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory, SnapshotTriggerDefinition snapshotTriggerDefinition, ParameterResolverFactory parameterResolverFactory, HandlerDefinition handlerDefinition, RepositoryProvider repositoryProvider) {
        final CommandHandlerInvoker[] invokers = this.commandHandlerInvokers;
        for (int index = 0; index < invokers.length; index++) {
            invokers[index].createRepository(
                    eventStore, repositoryProvider, aggregateFactory, snapshotTriggerDefinition, parameterResolverFactory, handlerDefinition
            );
        }
        return new DisruptorRepository(aggregateFactory.getAggregateType());
    }

    /**
     * Subscribes the handler to the given command name. When a different handler is already subscribed
     * to that name, the duplicate command handler resolver decides which handler is stored.
     *
     * @param str            the command name
     * @param messageHandler the handler to subscribe
     * @return a registration whose cancellation removes the mapping, provided the name is still mapped
     *         to {@code messageHandler}
     */
    public Registration subscribe(String str, MessageHandler<? super CommandMessage<?>> messageHandler) {
        this.commandHandlers.compute(str, (commandName, existing) -> {
            if (existing == null || existing == messageHandler) {
                // First subscription, or the very same instance again: no conflict to resolve.
                return messageHandler;
            }
            return this.duplicateCommandHandlerResolver.resolve(commandName, existing, messageHandler);
        });
        return () -> this.commandHandlers.remove(str, messageHandler);
    }

    /**
     * Stops the bus. New commands are rejected immediately; the calling thread then spins until the ring
     * buffer cursor has not moved for the cooling down period (or the thread is interrupted), after which
     * the disruptor and, when owned by this bus, the executor service are shut down.
     * <p>
     * Calling this method on an already stopped bus has no effect. If the calling thread was interrupted
     * while waiting, its interrupt status is restored before this method returns.
     */
    public void stop() {
        if (!this.started) {
            return;
        }
        this.started = false;

        boolean interrupted = false;
        try {
            RingBuffer<CommandHandlingEntry> ringBuffer = this.disruptor.getRingBuffer();
            long lastChangeDetected = System.currentTimeMillis();
            long lastKnownCursor = ringBuffer.getCursor();
            while (System.currentTimeMillis() - lastChangeDetected < this.coolingDownPeriod) {
                // Thread.interrupted() clears the flag; remember it so it can be re-asserted below.
                if (Thread.interrupted()) {
                    interrupted = true;
                    break;
                }
                if (ringBuffer.getCursor() != lastKnownCursor) {
                    // Activity observed: restart the cooling down window.
                    lastChangeDetected = System.currentTimeMillis();
                    lastKnownCursor = ringBuffer.getCursor();
                }
            }
            this.disruptorShutDown = true;
            this.disruptor.shutdown();
            if (this.executorService != null) {
                this.executorService.shutdown();
            }
        } finally {
            if (interrupted) {
                // Do not swallow the caller's interruption.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Appends the interceptor to the list applied to every dispatched command.
     *
     * @param messageDispatchInterceptor the interceptor to add
     * @return a registration whose cancellation removes the interceptor from the list again
     */
    public Registration registerDispatchInterceptor(MessageDispatchInterceptor<? super CommandMessage<?>> messageDispatchInterceptor) {
        this.dispatchInterceptors.add(messageDispatchInterceptor);
        return () -> this.dispatchInterceptors.remove(messageDispatchInterceptor);
    }

    /**
     * Appends the interceptor to the invoker interceptor list handed to subsequently dispatched commands.
     *
     * @param messageHandlerInterceptor the interceptor to add
     * @return a registration whose cancellation removes the interceptor from the list again
     */
    public Registration registerHandlerInterceptor(MessageHandlerInterceptor<? super CommandMessage<?>> messageHandlerInterceptor) {
        this.invokerInterceptors.add(messageHandlerInterceptor);
        return () -> this.invokerInterceptors.remove(messageHandlerInterceptor);
    }
}
