package org.axonframework.config;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.annotation.AnnotationUtils;
import org.axonframework.common.transaction.NoTransactionManager;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.config.EventProcessingConfigurer;
import org.axonframework.eventhandling.DirectEventProcessingStrategy;
import org.axonframework.eventhandling.ErrorHandler;
import org.axonframework.eventhandling.EventHandlerInvoker;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.EventProcessor;
import org.axonframework.eventhandling.ListenerInvocationErrorHandler;
import org.axonframework.eventhandling.LoggingErrorHandler;
import org.axonframework.eventhandling.MultiEventHandlerInvoker;
import org.axonframework.eventhandling.PropagatingErrorHandler;
import org.axonframework.eventhandling.SimpleEventHandlerInvoker;
import org.axonframework.eventhandling.SubscribingEventProcessor;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackingEventProcessor;
import org.axonframework.eventhandling.TrackingEventProcessorConfiguration;
import org.axonframework.eventhandling.async.SequencingPolicy;
import org.axonframework.eventhandling.async.SequentialPerAggregatePolicy;
import org.axonframework.eventhandling.tokenstore.TokenStore;
import org.axonframework.eventhandling.tokenstore.inmemory.InMemoryTokenStore;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.messaging.StreamableMessageSource;
import org.axonframework.messaging.SubscribableMessageSource;
import org.axonframework.messaging.interceptors.CorrelationDataInterceptor;
import org.axonframework.messaging.unitofwork.RollbackConfiguration;
import org.axonframework.messaging.unitofwork.RollbackConfigurationType;
import org.axonframework.modelling.saga.repository.SagaStore;
import org.axonframework.modelling.saga.repository.inmemory.InMemorySagaStore;
import org.axonframework.monitoring.MessageMonitor;

/* loaded from: input_file:org/axonframework/config/EventProcessingModule.class */
public class EventProcessingModule implements ModuleConfiguration, EventProcessingConfiguration, EventProcessingConfigurer {
    /** Global configuration; stays {@code null} until {@link #initialize(Configuration)} has been invoked. */
    private Configuration configuration;

    // ---- Registrations collected through the configurer methods of this class ----

    private final List<TypeProcessingGroupSelector> typeSelectors = new ArrayList<>();
    private final List<InstanceProcessingGroupSelector> instanceSelectors = new ArrayList<>();
    private final List<SagaConfigurer<?>> sagaConfigurations = new ArrayList<>();
    private final List<Component<Object>> eventHandlerBuilders = new ArrayList<>();
    private final Map<String, Component<ListenerInvocationErrorHandler>> listenerInvocationErrorHandlers = new HashMap<>();
    private final Map<String, Component<ErrorHandler>> errorHandlers = new HashMap<>();
    private final Map<String, EventProcessingConfigurer.EventProcessorBuilder> eventProcessorBuilders = new HashMap<>();
    /** Lazily built processors, keyed by processor name; repopulated by {@link #initialize(Configuration)}. */
    private final Map<String, Component<EventProcessor>> eventProcessors = new HashMap<>();
    private final List<BiFunction<Configuration, String, MessageHandlerInterceptor<? super EventMessage<?>>>> defaultHandlerInterceptors = new ArrayList<>();
    private final Map<String, List<Function<Configuration, MessageHandlerInterceptor<? super EventMessage<?>>>>> handlerInterceptorsBuilders = new HashMap<>();
    /** Explicit processing group name to processor name assignments. */
    private final Map<String, String> processingGroupsAssignments = new HashMap<>();
    private final Map<String, Component<SequencingPolicy<? super EventMessage<?>>>> sequencingPolicies = new HashMap<>();
    private final Map<String, MessageMonitorFactory> messageMonitorFactories = new HashMap<>();
    private final Map<String, Component<TokenStore>> tokenStore = new HashMap<>();
    private final Map<String, Component<RollbackConfiguration>> rollbackConfigurations = new HashMap<>();
    private final Map<String, Component<TransactionManager>> transactionManagers = new HashMap<>();

    // ---- Fallback processing group selection ----

    /** Used for a type that carries no processing group annotation: simple class name + "Processor". */
    private Function<Class<?>, String> typeFallback = cls -> cls.getSimpleName() + "Processor";
    private final TypeProcessingGroupSelector defaultTypeSelector = TypeProcessingGroupSelector.defaultSelector(
            cls -> annotatedProcessingGroupOfType(cls).orElseGet(() -> this.typeFallback.apply(cls)));
    /** Used for a handler instance whose class carries no processing group annotation. */
    private Function<Object, String> instanceFallback = EventProcessingModule::packageOfObject;
    private final InstanceProcessingGroupSelector defaultInstanceSelector = InstanceProcessingGroupSelector.defaultSelector(
            obj -> annotatedProcessingGroupOfType(obj.getClass()).orElseGet(() -> this.instanceFallback.apply(obj)));

    // ---- Default components; each one resolves against the configuration only when first requested ----

    private final Component<ListenerInvocationErrorHandler> defaultListenerInvocationErrorHandler = new Component<>(
            () -> this.configuration,
            "listenerInvocationErrorHandler",
            config -> config.getComponent(ListenerInvocationErrorHandler.class, LoggingErrorHandler::new));
    private final Component<ErrorHandler> defaultErrorHandler = new Component<>(
            () -> this.configuration,
            "errorHandler",
            config -> config.getComponent(ErrorHandler.class, PropagatingErrorHandler::instance));
    private final Component<SequencingPolicy<? super EventMessage<?>>> defaultSequencingPolicy = new Component<>(
            () -> this.configuration,
            "sequencingPolicy",
            config -> SequentialPerAggregatePolicy.instance());
    private final Component<TokenStore> defaultTokenStore = new Component<>(
            () -> this.configuration,
            "tokenStore",
            config -> config.getComponent(TokenStore.class, InMemoryTokenStore::new));
    private final Component<RollbackConfiguration> defaultRollbackConfiguration = new Component<>(
            () -> this.configuration,
            "rollbackConfiguration",
            config -> config.getComponent(RollbackConfiguration.class, () -> RollbackConfigurationType.ANY_THROWABLE));
    private final Component<SagaStore> sagaStore = new Component<>(
            () -> this.configuration,
            "sagaStore",
            config -> config.getComponent(SagaStore.class, InMemorySagaStore::new));
    private final Component<TransactionManager> defaultTransactionManager = new Component<>(
            () -> this.configuration,
            "transactionManager",
            config -> config.getComponent(TransactionManager.class, NoTransactionManager::instance));
    // The event bus is only usable as a streamable source when its implementation is one; the cast makes
    // that requirement explicit (and fail with a ClassCastException) at the moment the component is resolved.
    @SuppressWarnings("unchecked")
    private Component<StreamableMessageSource<TrackedEventMessage<?>>> defaultStreamableSource = new Component<>(
            () -> this.configuration,
            "defaultStreamableMessageSource",
            config -> (StreamableMessageSource<TrackedEventMessage<?>>) config.eventBus());
    private Component<SubscribableMessageSource<? extends EventMessage<?>>> defaultSubscribableSource = new Component<>(
            () -> this.configuration,
            "defaultSubscribableMessageSource",
            Configuration::eventBus);
    private final Component<TrackingEventProcessorConfiguration> defaultTrackingEventProcessorConfiguration = new Component<>(
            () -> this.configuration,
            "trackingEventProcessorConfiguration",
            config -> config.getComponent(TrackingEventProcessorConfiguration.class, TrackingEventProcessorConfiguration::forSingleThreadedProcessing));

    /** Builder used for every processor name that has no dedicated entry in {@code eventProcessorBuilders}. */
    private EventProcessingConfigurer.EventProcessorBuilder defaultEventProcessorBuilder = this::defaultEventProcessor;
    /** Maps a processing group name to a processor name when no explicit assignment exists; identity by default. */
    private Function<String, String> defaultProcessingGroupAssignment = Function.identity();

    /* loaded from: input_file:org/axonframework/config/EventProcessingModule$InstanceProcessingGroupSelector.class */
    /**
     * Processing group selector that is evaluated against event handler instances.
     */
    private static class InstanceProcessingGroupSelector extends ProcessingGroupSelector<Object> {

        private InstanceProcessingGroupSelector(int i, Function<Object, Optional<String>> function) {
            super(i, function);
        }

        private InstanceProcessingGroupSelector(String str, int i, Predicate<Object> predicate) {
            super(str, i, predicate);
        }

        /**
         * Creates a selector with priority {@link Integer#MIN_VALUE} that always yields a processing group:
         * the result of the given function wrapped via {@link Optional#of(Object)}.
         *
         * @param function computes the processing group name for a handler instance
         * @return the always-matching selector
         */
        public static InstanceProcessingGroupSelector defaultSelector(Function<Object, String> function) {
            Function<Object, Optional<String>> alwaysMatching = function.andThen(Optional::of);
            return new InstanceProcessingGroupSelector(Integer.MIN_VALUE, alwaysMatching);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/axonframework/config/EventProcessingModule$ProcessingGroupSelector.class */
    /**
     * Pairs a priority with a function that optionally yields a processing group name for a candidate of
     * type {@code T}.
     *
     * @param <T> the type of candidate this selector inspects
     */
    public static class ProcessingGroupSelector<T> {
        private final int priority;
        private final Function<T, Optional<String>> function;

        private ProcessingGroupSelector(int i, Function<T, Optional<String>> function) {
            this.priority = i;
            this.function = function;
        }

        /** Builds a selector that yields {@code str} whenever {@code predicate} accepts the candidate. */
        private ProcessingGroupSelector(String str, int i, Predicate<T> predicate) {
            this(i, candidate -> {
                if (predicate.test(candidate)) {
                    return Optional.of(str);
                }
                return Optional.empty();
            });
        }

        /**
         * Applies the selection function to the given candidate.
         *
         * @param t the candidate to inspect
         * @return the selected processing group name, or an empty {@code Optional} if this selector does not match
         */
        public Optional<String> select(T t) {
            return this.function.apply(t);
        }

        /**
         * @return the priority this selector was created with
         */
        public int getPriority() {
            return this.priority;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/axonframework/config/EventProcessingModule$TypeProcessingGroupSelector.class */
    /**
     * Processing group selector that is evaluated against classes rather than instances.
     */
    public static class TypeProcessingGroupSelector extends ProcessingGroupSelector<Class<?>> {

        private TypeProcessingGroupSelector(int i, Function<Class<?>, Optional<String>> function) {
            super(i, function);
        }

        private TypeProcessingGroupSelector(String str, int i, Predicate<Class<?>> predicate) {
            super(str, i, predicate);
        }

        /**
         * Creates a selector with priority {@link Integer#MIN_VALUE} that always yields a processing group:
         * the result of the given function wrapped via {@link Optional#of(Object)}.
         *
         * @param function computes the processing group name for a type
         * @return the always-matching selector
         */
        public static TypeProcessingGroupSelector defaultSelector(Function<Class<?>, String> function) {
            Function<Class<?>, Optional<String>> alwaysMatching = function.andThen(Optional::of);
            return new TypeProcessingGroupSelector(Integer.MIN_VALUE, alwaysMatching);
        }
    }

    /**
     * Binds this module to the given configuration and rebuilds the set of lazily created event processors.
     * <p>
     * Both selector lists are ordered by descending priority first, so that the "first match wins" lookups
     * honor the priority each selector was registered with. The sort is stable, hence selectors sharing a
     * priority keep their registration order.
     *
     * @param configuration the global configuration this module becomes part of
     */
    @Override // org.axonframework.config.ModuleConfiguration
    public void initialize(Configuration configuration) {
        this.configuration = configuration;
        this.eventProcessors.clear();

        this.instanceSelectors.sort(Comparator.comparingInt(InstanceProcessingGroupSelector::getPriority).reversed());
        this.typeSelectors.sort(Comparator.comparingInt(TypeProcessingGroupSelector::getPriority).reversed());

        Map<String, List<Function<Configuration, EventHandlerInvoker>>> invokersByProcessor = new HashMap<>();
        registerSimpleEventHandlerInvokers(invokersByProcessor);
        registerSagaManagers(invokersByProcessor);

        // Only register a Component per processor here; the processor itself is built on first Component#get().
        invokersByProcessor.forEach((processorName, invokerBuilders) ->
                this.eventProcessors.put(processorName, new Component<>(
                        configuration,
                        processorName,
                        config -> buildEventProcessor(invokerBuilders, processorName))));
        initializeProcessors();
    }

    /**
     * Registers a start handler (phase {@link Integer#MIN_VALUE}) on the configuration that resolves every
     * registered event processor component.
     */
    private void initializeProcessors() {
        this.configuration.onStart(Integer.MIN_VALUE, () -> {
            for (Component<EventProcessor> processor : this.eventProcessors.values()) {
                processor.get();
            }
        });
    }

    /**
     * Determines the processing group for the given type: the first registered type selector that yields a
     * name wins, otherwise the default type selector is consulted.
     *
     * @param cls the type to find the processing group for
     * @return the processing group name
     * @throws IllegalStateException if the default type selector yields no name either
     */
    private String selectProcessingGroupByType(Class<?> cls) {
        for (TypeProcessingGroupSelector selector : this.typeSelectors) {
            Optional<String> candidate = selector.select(cls);
            if (candidate.isPresent()) {
                return candidate.get();
            }
        }
        return this.defaultTypeSelector.select(cls).orElseThrow(IllegalStateException::new);
    }

    /**
     * Groups all registered event handlers by processing group and adds one {@code SimpleEventHandlerInvoker}
     * builder per group to the given map, keyed by the processor name the group is assigned to.
     * <p>
     * The processing group of a handler is the result of the first instance selector that matches. The default
     * instance selector, and after that the type-based selection, are consulted lazily, i.e. only when no
     * earlier candidate yielded a name.
     *
     * @param map the invoker builders per processor name; entries are added to it
     */
    private void registerSimpleEventHandlerInvokers(Map<String, List<Function<Configuration, EventHandlerInvoker>>> map) {
        Map<String, List<Object>> handlersByGroup = new HashMap<>();
        for (Component<Object> handlerBuilder : this.eventHandlerBuilders) {
            Object handler = handlerBuilder.get();
            String processingGroup = this.instanceSelectors.stream()
                    .map(selector -> selector.select(handler))
                    .filter(Optional::isPresent)
                    .map(Optional::get)
                    .findFirst()
                    .orElseGet(() -> this.defaultInstanceSelector.select(handler)
                            .orElseGet(() -> selectProcessingGroupByType(handler.getClass())));
            handlersByGroup.computeIfAbsent(processingGroup, key -> new ArrayList<>()).add(handler);
        }

        handlersByGroup.forEach((processingGroup, handlers) ->
                map.computeIfAbsent(processorNameForProcessingGroup(processingGroup), key -> new ArrayList<>())
                        // The error handler and sequencing policy are looked up by processing group name,
                        // and only when the invoker is actually built.
                        .add(config -> SimpleEventHandlerInvoker.builder()
                                .eventHandlers(handlers)
                                .parameterResolverFactory(this.configuration.parameterResolverFactory())
                                .listenerInvocationErrorHandler(listenerInvocationErrorHandler(processingGroup))
                                .sequencingPolicy(sequencingPolicy(processingGroup))
                                .build()));
    }

    /**
     * Initializes every registered saga configurer and adds a builder returning its saga manager to the given
     * map, keyed by the processor name assigned to the processing group of the saga type.
     *
     * @param map the invoker builders per processor name; entries are added to it
     */
    private void registerSagaManagers(Map<String, List<Function<Configuration, EventHandlerInvoker>>> map) {
        for (SagaConfigurer<?> sagaConfigurer : this.sagaConfigurations) {
            SagaConfiguration<?> sagaConfiguration = sagaConfigurer.initialize(this.configuration);
            String processingGroup = selectProcessingGroupByType(sagaConfiguration.type());
            String processorName = processorNameForProcessingGroup(processingGroup);
            map.computeIfAbsent(processorName, key -> new ArrayList<>())
                    .add(config -> sagaConfiguration.manager());
        }
    }

    /**
     * Builds the event processor with the given name from the given invoker builders and registers its
     * handler interceptors.
     * <p>
     * Interceptors are registered in this order: the ones registered for this processor name, the non-null
     * results of the default interceptor builders, and finally a {@code CorrelationDataInterceptor}.
     *
     * @param list builders of the event handler invokers the processor should delegate to
     * @param str  the name of the processor to build
     * @return the fully configured event processor
     */
    private EventProcessor buildEventProcessor(List<Function<Configuration, EventHandlerInvoker>> list, String str) {
        EventProcessingConfigurer.EventProcessorBuilder processorBuilder =
                this.eventProcessorBuilders.getOrDefault(str, this.defaultEventProcessorBuilder);
        List<EventHandlerInvoker> invokers = list.stream()
                .map(invokerBuilder -> invokerBuilder.apply(this.configuration))
                .collect(Collectors.toList());
        EventProcessor processor =
                processorBuilder.build(str, this.configuration, new MultiEventHandlerInvoker(invokers));

        this.handlerInterceptorsBuilders.getOrDefault(str, Collections.emptyList()).stream()
                .map(interceptorBuilder -> interceptorBuilder.apply(this.configuration))
                .forEach(processor::registerHandlerInterceptor);

        this.defaultHandlerInterceptors.stream()
                .map(interceptorBuilder -> interceptorBuilder.apply(this.configuration, str))
                // A default interceptor builder may decline a processor by returning null.
                .filter(Objects::nonNull)
                .forEach(processor::registerHandlerInterceptor);

        processor.registerHandlerInterceptor(new CorrelationDataInterceptor<>(this.configuration.correlationDataProviders()));
        return processor;
    }

    /**
     * Looks up the event processor responsible for the given processing group.
     * <p>
     * The result is cast to the type expected by the caller without a runtime check at this point, so a
     * mismatching expectation only fails once the caller uses the value as {@code T}.
     *
     * @param str the processing group name
     * @param <T> the processor type expected by the caller
     * @return the processor the group is assigned to, or an empty {@code Optional} if there is none
     */
    @Override // org.axonframework.config.EventProcessingConfiguration
    @SuppressWarnings("unchecked")
    public <T extends EventProcessor> Optional<T> eventProcessorByProcessingGroup(String str) {
        ensureInitialized();
        return Optional.ofNullable((T) eventProcessors().get(processorNameForProcessingGroup(str)));
    }

    /**
     * Returns a snapshot of all event processors, keyed by processor name.
     * <p>
     * Every processor component is resolved in the process. The returned map is a fresh copy; modifying it
     * does not affect this module.
     *
     * @return a new map from processor name to event processor
     */
    @Override // org.axonframework.config.EventProcessingConfiguration
    public Map<String, EventProcessor> eventProcessors() {
        ensureInitialized();
        Map<String, EventProcessor> result = new HashMap<>(this.eventProcessors.size());
        this.eventProcessors.forEach((name, component) -> result.put(name, component.get()));
        return result;
    }

    /**
     * Determines the processing group of the given saga type using the type-based selection.
     *
     * @param cls the saga type
     * @return the processing group name
     */
    @Override // org.axonframework.config.EventProcessingConfiguration
    public String sagaProcessingGroup(Class<?> cls) {
        final String processingGroup = selectProcessingGroupByType(cls);
        return processingGroup;
    }

    /**
     * Returns the handler interceptors of the event processor with the given name.
     *
     * @param str the processor name
     * @return the interceptors of that processor, or an empty list if no such processor exists
     */
    @Override // org.axonframework.config.EventProcessingConfiguration
    public List<MessageHandlerInterceptor<? super EventMessage<?>>> interceptorsFor(String str) {
        ensureInitialized();
        Optional<EventProcessor> processor = eventProcessor(str);
        return processor.map(EventProcessor::getHandlerInterceptors)
                .orElse(Collections.emptyList());
    }

    /**
     * Returns the listener invocation error handler registered under the given name, or the default one.
     *
     * @param str the name the handler was registered under
     * @return the matching handler, or the default handler if none was registered for that name
     */
    @Override // org.axonframework.config.EventProcessingConfiguration
    public ListenerInvocationErrorHandler listenerInvocationErrorHandler(String str) {
        ensureInitialized();
        return this.listenerInvocationErrorHandlers
                .getOrDefault(str, this.defaultListenerInvocationErrorHandler)
                .get();
    }

    /**
     * Returns the sequencing policy registered under the given name, or the default one.
     *
     * @param str the name the policy was registered under
     * @return the matching policy, or the default policy if none was registered for that name
     */
    @Override // org.axonframework.config.EventProcessingConfiguration
    public SequencingPolicy<? super EventMessage<?>> sequencingPolicy(String str) {
        ensureInitialized();
        return this.sequencingPolicies
                .getOrDefault(str, this.defaultSequencingPolicy)
                .get();
    }

    /**
     * Returns the rollback configuration registered under the given name, or the default one.
     *
     * @param str the name the rollback configuration was registered under
     * @return the matching rollback configuration, or the default if none was registered for that name
     */
    @Override // org.axonframework.config.EventProcessingConfiguration
    public RollbackConfiguration rollbackConfiguration(String str) {
        ensureInitialized();
        return this.rollbackConfigurations
                .getOrDefault(str, this.defaultRollbackConfiguration)
                .get();
    }

    /**
     * Returns the error handler registered under the given name, or the default one.
     *
     * @param str the name the error handler was registered under
     * @return the matching error handler, or the default if none was registered for that name
     */
    @Override // org.axonframework.config.EventProcessingConfiguration
    public ErrorHandler errorHandler(String str) {
        ensureInitialized();
        return this.errorHandlers
                .getOrDefault(str, this.defaultErrorHandler)
                .get();
    }

    /**
     * Returns the saga store held by this module.
     *
     * @return the resolved saga store component
     */
    @Override // org.axonframework.config.EventProcessingConfiguration
    public SagaStore sagaStore() {
        ensureInitialized();
        final SagaStore store = this.sagaStore.get();
        return store;
    }

    /**
     * Returns the saga configurations obtained by initializing every registered saga configurer with the
     * current configuration.
     *
     * @return a new list holding one saga configuration per registered saga, in registration order
     */
    @Override // org.axonframework.config.EventProcessingConfiguration
    public List<SagaConfiguration<?>> sagaConfigurations() {
        ensureInitialized();
        List<SagaConfiguration<?>> initialized = new ArrayList<>();
        for (SagaConfigurer<?> sagaConfigurer : this.sagaConfigurations) {
            initialized.add(sagaConfigurer.initialize(this.configuration));
        }
        return initialized;
    }

    /**
     * Resolves the name of the processor the given processing group is assigned to.
     * <p>
     * An explicit assignment takes precedence. The default assignment function is only invoked when no
     * explicit assignment exists, so it never runs for groups that were assigned by name.
     *
     * @param str the processing group name
     * @return the explicitly assigned processor name, or the result of the default assignment function
     */
    private String processorNameForProcessingGroup(String str) {
        ensureInitialized();
        if (this.processingGroupsAssignments.containsKey(str)) {
            return this.processingGroupsAssignments.get(str);
        }
        return this.defaultProcessingGroupAssignment.apply(str);
    }

    /**
     * Returns the message monitor for the given component type and name. A monitor factory registered under
     * that name is used when present; otherwise the lookup is delegated to the global configuration.
     *
     * @param cls the type of component to monitor
     * @param str the name of the component
     * @return the message monitor for that component
     */
    @Override // org.axonframework.config.EventProcessingConfiguration
    public MessageMonitor<? super Message<?>> messageMonitor(Class<?> cls, String str) {
        ensureInitialized();
        if (!this.messageMonitorFactories.containsKey(str)) {
            return this.configuration.messageMonitor(cls, str);
        }
        return this.messageMonitorFactories.get(str).create(this.configuration, cls, str);
    }

    /**
     * Returns the token store registered under the given name, or the default one.
     *
     * @param str the name the token store was registered under
     * @return the matching token store, or the default if none was registered for that name
     */
    @Override // org.axonframework.config.EventProcessingConfiguration
    public TokenStore tokenStore(String str) {
        ensureInitialized();
        return this.tokenStore
                .getOrDefault(str, this.defaultTokenStore)
                .get();
    }

    /**
     * Returns the transaction manager registered under the given name, or the default one.
     *
     * @param str the name the transaction manager was registered under
     * @return the matching transaction manager, or the default if none was registered for that name
     */
    @Override // org.axonframework.config.EventProcessingConfiguration
    public TransactionManager transactionManager(String str) {
        ensureInitialized();
        return this.transactionManagers
                .getOrDefault(str, this.defaultTransactionManager)
                .get();
    }

    /**
     * Asserts via {@code BuilderUtils.assertNonNull} that the configuration field has been set, which happens
     * in {@link #initialize(Configuration)}.
     */
    private void ensureInitialized() {
        final String failureMessage = "Configuration is not initialized yet";
        BuilderUtils.assertNonNull(this.configuration, failureMessage);
    }

    /**
     * Registers a saga of the given type. A configurer for that type is created, handed to the given consumer
     * for customization and then stored in this module.
     *
     * @param cls      the saga type
     * @param consumer receives the saga configurer before it is stored
     * @param <T>      the saga type
     * @return this configurer, for chaining
     */
    @Override // org.axonframework.config.EventProcessingConfigurer
    public <T> EventProcessingConfigurer registerSaga(Class<T> cls, Consumer<SagaConfigurer<T>> consumer) {
        SagaConfigurer<T> sagaConfigurer = SagaConfigurer.forType(cls);
        consumer.accept(sagaConfigurer);
        this.sagaConfigurations.add(sagaConfigurer);
        return this;
    }

    /**
     * Replaces the builder function of the saga store component.
     *
     * @param function builds the saga store from the configuration
     * @return this configurer, for chaining
     */
    @Override // org.axonframework.config.EventProcessingConfigurer
    public EventProcessingConfigurer registerSagaStore(Function<Configuration, SagaStore> function) {
        final Component<SagaStore> target = this.sagaStore;
        target.update(function);
        return this;
    }

    /**
     * Registers a builder for an event handler. The handler is wrapped in a component named
     * {@code "eventHandler"} that reads the configuration of this module when it is resolved.
     *
     * @param function builds the event handler from the configuration
     * @return this configurer, for chaining
     */
    @Override // org.axonframework.config.EventProcessingConfigurer
    public EventProcessingConfigurer registerEventHandler(Function<Configuration, Object> function) {
        Component<Object> handlerComponent = new Component<>(() -> this.configuration, "eventHandler", function);
        this.eventHandlerBuilders.add(handlerComponent);
        return this;
    }

    /**
     * Replaces the builder function of the default listener invocation error handler component.
     *
     * @param function builds the default listener invocation error handler from the configuration
     * @return this configurer, for chaining
     */
    @Override // org.axonframework.config.EventProcessingConfigurer
    public EventProcessingConfigurer registerDefaultListenerInvocationErrorHandler(Function<Configuration, ListenerInvocationErrorHandler> function) {
        final Component<ListenerInvocationErrorHandler> target = this.defaultListenerInvocationErrorHandler;
        target.update(function);
        return this;
    }

    /**
     * Registers a listener invocation error handler under the given name, replacing any previous registration
     * for that name.
     *
     * @param str      the name to register the handler under
     * @param function builds the handler from the configuration
     * @return this configurer, for chaining
     */
    @Override // org.axonframework.config.EventProcessingConfigurer
    public EventProcessingConfigurer registerListenerInvocationErrorHandler(String str, Function<Configuration, ListenerInvocationErrorHandler> function) {
        Component<ListenerInvocationErrorHandler> handlerComponent =
                new Component<>(() -> this.configuration, "listenerInvocationErrorHandler", function);
        this.listenerInvocationErrorHandlers.put(str, handlerComponent);
        return this;
    }

    /**
     * Replaces the builder function of the default streamable message source component.
     *
     * @param function builds the default streamable message source from the configuration
     * @return this configurer, for chaining
     */
    @Override // org.axonframework.config.EventProcessingConfigurer
    public EventProcessingConfigurer configureDefaultStreamableMessageSource(Function<Configuration, StreamableMessageSource<TrackedEventMessage<?>>> function) {
        final Component<StreamableMessageSource<TrackedEventMessage<?>>> target = this.defaultStreamableSource;
        target.update(function);
        return this;
    }

    /**
     * Replaces the builder function of the default subscribable message source component.
     *
     * @param function builds the default subscribable message source from the configuration
     * @return this configurer, for chaining
     */
    @Override // org.axonframework.config.EventProcessingConfigurer
    public EventProcessingConfigurer configureDefaultSubscribableMessageSource(Function<Configuration, SubscribableMessageSource<EventMessage<?>>> function) {
        final Component<SubscribableMessageSource<? extends EventMessage<?>>> target = this.defaultSubscribableSource;
        target.update(function);
        return this;
    }

    /**
     * Registers a tracking event processor under the given name that uses the default tracking event
     * processor configuration of this module.
     *
     * @param str      the processor name
     * @param function builds the streamable message source from the configuration
     * @return this configurer, for chaining
     */
    @Override // org.axonframework.config.EventProcessingConfigurer
    public EventProcessingConfigurer registerTrackingEventProcessor(String str, Function<Configuration, StreamableMessageSource<TrackedEventMessage<?>>> function) {
        // The configuration argument is not needed: the default is resolved through this module's own component.
        Function<Configuration, TrackingEventProcessorConfiguration> defaultProcessorConfiguration =
                ignored -> this.defaultTrackingEventProcessorConfiguration.get();
        return registerTrackingEventProcessor(str, function, defaultProcessorConfiguration);
    }

    /**
     * Registers a tracking event processor under the given name. Both functions are applied to the
     * configuration at the moment the processor is built.
     *
     * @param str       the processor name
     * @param function  builds the streamable message source from the configuration
     * @param function2 builds the tracking event processor configuration from the configuration
     * @return this configurer, for chaining
     */
    @Override // org.axonframework.config.EventProcessingConfigurer
    public EventProcessingConfigurer registerTrackingEventProcessor(String str, Function<Configuration, StreamableMessageSource<TrackedEventMessage<?>>> function, Function<Configuration, TrackingEventProcessorConfiguration> function2) {
        EventProcessingConfigurer.EventProcessorBuilder builder = (name, config, invoker) ->
                trackingEventProcessor(name, invoker, function2.apply(config), function.apply(config));
        registerEventProcessor(str, builder);
        return this;
    }

    /**
     * Sets the builder used for every processor that has no builder registered under its own name.
     *
     * @param eventProcessorBuilder the new default processor builder
     * @return this configurer, for chaining
     */
    @Override // org.axonframework.config.EventProcessingConfigurer
    public EventProcessingConfigurer registerEventProcessorFactory(EventProcessingConfigurer.EventProcessorBuilder eventProcessorBuilder) {
        defaultEventProcessorBuilder = eventProcessorBuilder;
        return this;
    }

    /**
     * Registers a processor builder under the given name.
     *
     * @param str                   the processor name
     * @param eventProcessorBuilder the builder creating the processor
     * @return this configurer, for chaining
     * @throws AxonConfigurationException if a builder is already registered under that name
     */
    @Override // org.axonframework.config.EventProcessingConfigurer
    public EventProcessingConfigurer registerEventProcessor(String str, EventProcessingConfigurer.EventProcessorBuilder eventProcessorBuilder) {
        final boolean alreadyRegistered = this.eventProcessorBuilders.containsKey(str);
        if (alreadyRegistered) {
            String message = String.format("Event processor with name %s already exists", str);
            throw new AxonConfigurationException(message);
        }
        this.eventProcessorBuilders.put(str, eventProcessorBuilder);
        return this;
    }

    /**
     * Registers a token store under the given name, replacing any previous registration for that name.
     *
     * @param str      the name to register the token store under
     * @param function builds the token store from the configuration
     * @return this configurer, for chaining
     */
    @Override // org.axonframework.config.EventProcessingConfigurer
    public EventProcessingConfigurer registerTokenStore(String str, Function<Configuration, TokenStore> function) {
        Component<TokenStore> tokenStoreComponent =
                new Component<>(() -> this.configuration, "tokenStore", function);
        this.tokenStore.put(str, tokenStoreComponent);
        return this;
    }

    /**
     * Replaces the builder function of the default token store component.
     *
     * @param function builds the default token store from the configuration
     * @return this configurer, for chaining
     */
    @Override // org.axonframework.config.EventProcessingConfigurer
    public EventProcessingConfigurer registerTokenStore(Function<Configuration, TokenStore> function) {
        final Component<TokenStore> target = this.defaultTokenStore;
        target.update(function);
        return this;
    }

    /**
     * Makes subscribing event processors, fed by the default subscribable message source, the default kind
     * of processor.
     *
     * @return this configurer, for chaining
     */
    @Override // org.axonframework.config.EventProcessingConfigurer
    public EventProcessingConfigurer usingSubscribingEventProcessors() {
        // The source component is resolved each time a processor is built, not when this method is called.
        this.defaultEventProcessorBuilder = (name, config, invoker) ->
                subscribingEventProcessor(name, invoker, this.defaultSubscribableSource.get());
        return this;
    }

    /**
     * Makes tracking event processors, using the default tracking configuration and the default streamable
     * message source, the default kind of processor.
     *
     * @return this configurer, for chaining
     */
    @Override // org.axonframework.config.EventProcessingConfigurer
    public EventProcessingConfigurer usingTrackingEventProcessors() {
        // Both default components are resolved each time a processor is built, not when this method is called.
        this.defaultEventProcessorBuilder = (name, config, invoker) ->
                trackingEventProcessor(
                        name,
                        invoker,
                        this.defaultTrackingEventProcessorConfiguration.get(),
                        this.defaultStreamableSource.get());
        return this;
    }

    /**
     * Registers a subscribing event processor under the given name. The function is applied to the
     * configuration at the moment the processor is built.
     *
     * @param str      the processor name
     * @param function builds the subscribable message source from the configuration
     * @return this configurer, for chaining
     */
    @Override // org.axonframework.config.EventProcessingConfigurer
    public EventProcessingConfigurer registerSubscribingEventProcessor(String str, Function<Configuration, SubscribableMessageSource<? extends EventMessage<?>>> function) {
        EventProcessingConfigurer.EventProcessorBuilder builder = (name, config, invoker) ->
                subscribingEventProcessor(name, invoker, function.apply(config));
        registerEventProcessor(str, builder);
        return this;
    }

    /**
     * Replaces the builder function of the default error handler component.
     *
     * @param function builds the default error handler from the configuration
     * @return this configurer, for chaining
     */
    @Override // org.axonframework.config.EventProcessingConfigurer
    public EventProcessingConfigurer registerDefaultErrorHandler(Function<Configuration, ErrorHandler> function) {
        final Component<ErrorHandler> target = this.defaultErrorHandler;
        target.update(function);
        return this;
    }

    /**
     * Registers an error handler under the given name, replacing any previous registration for that name.
     *
     * @param str      the name to register the error handler under
     * @param function builds the error handler from the configuration
     * @return this configurer, for chaining
     */
    @Override // org.axonframework.config.EventProcessingConfigurer
    public EventProcessingConfigurer registerErrorHandler(String str, Function<Configuration, ErrorHandler> function) {
        Component<ErrorHandler> errorHandlerComponent =
                new Component<>(() -> this.configuration, "errorHandler", function);
        this.errorHandlers.put(str, errorHandlerComponent);
        return this;
    }

    /**
     * Sets the fallback function that names the processing group of a handler instance whose class carries
     * no processing group annotation.
     *
     * @param function computes the processing group name for a handler instance
     * @return this configurer, for chaining
     */
    @Override // org.axonframework.config.EventProcessingConfigurer
    public EventProcessingConfigurer byDefaultAssignHandlerInstancesTo(Function<Object, String> function) {
        instanceFallback = function;
        return this;
    }

    /**
     * Sets the fallback function that names the processing group of a type that carries no processing group
     * annotation.
     *
     * @param function computes the processing group name for a type
     * @return this configurer, for chaining
     */
    @Override // org.axonframework.config.EventProcessingConfigurer
    public EventProcessingConfigurer byDefaultAssignHandlerTypesTo(Function<Class<?>, String> function) {
        typeFallback = function;
        return this;
    }

    /**
     * Adds an instance selector that assigns handler instances accepted by the predicate to the given
     * processing group.
     *
     * @param str       the processing group name
     * @param i         the priority of the selector
     * @param predicate decides whether a handler instance belongs to the group
     * @return this configurer, for chaining
     */
    @Override // org.axonframework.config.EventProcessingConfigurer
    public EventProcessingConfigurer assignHandlerInstancesMatching(String str, int i, Predicate<Object> predicate) {
        InstanceProcessingGroupSelector selector = new InstanceProcessingGroupSelector(str, i, predicate);
        this.instanceSelectors.add(selector);
        return this;
    }

    /**
     * Adds a type selector that assigns types accepted by the predicate to the given processing group.
     *
     * @param str       the processing group name
     * @param i         the priority of the selector
     * @param predicate decides whether a type belongs to the group
     * @return this configurer, for chaining
     */
    @Override // org.axonframework.config.EventProcessingConfigurer
    public EventProcessingConfigurer assignHandlerTypesMatching(String str, int i, Predicate<Class<?>> predicate) {
        TypeProcessingGroupSelector selector = new TypeProcessingGroupSelector(str, i, predicate);
        this.typeSelectors.add(selector);
        return this;
    }

    /**
     * Explicitly assigns a processing group to a processor, replacing any previous assignment of that group.
     *
     * @param str  the processing group name
     * @param str2 the name of the processor the group is assigned to
     * @return this configurer, for chaining
     */
    @Override // org.axonframework.config.EventProcessingConfigurer
    public EventProcessingConfigurer assignProcessingGroup(String str, String str2) {
        final Map<String, String> assignments = this.processingGroupsAssignments;
        assignments.put(str, str2);
        return this;
    }

    /**
     * Sets the function that maps a processing group name to a processor name for groups without an
     * explicit assignment.
     *
     * @param function computes the processor name for a processing group name
     * @return this configurer, for chaining
     */
    @Override // org.axonframework.config.EventProcessingConfigurer
    public EventProcessingConfigurer assignProcessingGroup(Function<String, String> function) {
        defaultProcessingGroupAssignment = function;
        return this;
    }

    /**
     * Registers a handler interceptor builder for the processor with the given name.
     * <p>
     * When the configuration field is already set and a processor with that name exists, the interceptor is
     * built and registered on that processor right away. In every case the builder is also recorded under the
     * processor name.
     *
     * @param str      the processor name
     * @param function builds the interceptor from the configuration
     * @return this configurer, for chaining
     */
    @Override // org.axonframework.config.EventProcessingConfigurer
    public EventProcessingConfigurer registerHandlerInterceptor(String str, Function<Configuration, MessageHandlerInterceptor<? super EventMessage<?>>> function) {
        if (this.configuration != null) {
            Optional<EventProcessor> liveProcessor = eventProcessor(str);
            liveProcessor.ifPresent(
                    processor -> processor.registerHandlerInterceptor(function.apply(this.configuration)));
        }
        List<Function<Configuration, MessageHandlerInterceptor<? super EventMessage<?>>>> builders =
                this.handlerInterceptorsBuilders.computeIfAbsent(str, key -> new ArrayList<>());
        builders.add(function);
        return this;
    }

    /**
     * Adds an interceptor builder that is consulted for every processor built by this module. The builder
     * receives the configuration and the processor name.
     *
     * @param biFunction builds the interceptor for a processor
     * @return this configurer, for chaining
     */
    @Override // org.axonframework.config.EventProcessingConfigurer
    public EventProcessingConfigurer registerDefaultHandlerInterceptor(BiFunction<Configuration, String, MessageHandlerInterceptor<? super EventMessage<?>>> biFunction) {
        final List<BiFunction<Configuration, String, MessageHandlerInterceptor<? super EventMessage<?>>>> builders =
                this.defaultHandlerInterceptors;
        builders.add(biFunction);
        return this;
    }

    /**
     * Registers a sequencing policy under the given name, replacing any previous registration for that name.
     *
     * @param str      the name to register the policy under
     * @param function builds the sequencing policy from the configuration
     * @return this configurer, for chaining
     */
    @Override // org.axonframework.config.EventProcessingConfigurer
    public EventProcessingConfigurer registerSequencingPolicy(String str, Function<Configuration, SequencingPolicy<? super EventMessage<?>>> function) {
        Component<SequencingPolicy<? super EventMessage<?>>> policyComponent =
                new Component<>(() -> this.configuration, "sequencingPolicy", function);
        this.sequencingPolicies.put(str, policyComponent);
        return this;
    }

    /**
     * Replaces the builder function of the default sequencing policy component.
     *
     * @param function builds the default sequencing policy from the configuration
     * @return this configurer, for chaining
     */
    @Override // org.axonframework.config.EventProcessingConfigurer
    public EventProcessingConfigurer registerDefaultSequencingPolicy(Function<Configuration, SequencingPolicy<? super EventMessage<?>>> function) {
        final Component<SequencingPolicy<? super EventMessage<?>>> target = this.defaultSequencingPolicy;
        target.update(function);
        return this;
    }

    /**
     * Registers a message monitor factory under the given name, replacing any previous registration for
     * that name.
     *
     * @param str                   the name to register the factory under
     * @param messageMonitorFactory the factory creating the message monitor
     * @return this configurer, for chaining
     */
    @Override // org.axonframework.config.EventProcessingConfigurer
    public EventProcessingConfigurer registerMessageMonitorFactory(String str, MessageMonitorFactory messageMonitorFactory) {
        final Map<String, MessageMonitorFactory> factories = this.messageMonitorFactories;
        factories.put(str, messageMonitorFactory);
        return this;
    }

    /**
     * Registers a rollback configuration under the given name, replacing any previous registration for
     * that name.
     *
     * @param str      the name to register the rollback configuration under
     * @param function builds the rollback configuration from the configuration
     * @return this configurer, for chaining
     */
    @Override // org.axonframework.config.EventProcessingConfigurer
    public EventProcessingConfigurer registerRollbackConfiguration(String str, Function<Configuration, RollbackConfiguration> function) {
        Component<RollbackConfiguration> rollbackComponent =
                new Component<>(() -> this.configuration, "rollbackConfiguration", function);
        this.rollbackConfigurations.put(str, rollbackComponent);
        return this;
    }

    /**
     * Registers a transaction manager under the given name, replacing any previous registration for that name.
     *
     * @param str      the name to register the transaction manager under
     * @param function builds the transaction manager from the configuration
     * @return this configurer, for chaining
     */
    @Override // org.axonframework.config.EventProcessingConfigurer
    public EventProcessingConfigurer registerTransactionManager(String str, Function<Configuration, TransactionManager> function) {
        Component<TransactionManager> transactionManagerComponent =
                new Component<>(() -> this.configuration, "transactionManager", function);
        this.transactionManagers.put(str, transactionManagerComponent);
        return this;
    }

    /**
     * Replaces the builder function of the default tracking event processor configuration component.
     *
     * @param function builds the default tracking event processor configuration from the configuration
     * @return this configurer, for chaining
     */
    @Override // org.axonframework.config.EventProcessingConfigurer
    public EventProcessingConfigurer registerTrackingEventProcessorConfiguration(Function<Configuration, TrackingEventProcessorConfiguration> function) {
        final Component<TrackingEventProcessorConfiguration> target = this.defaultTrackingEventProcessorConfiguration;
        target.update(function);
        return this;
    }

    /**
     * Initial default processor builder: creates a tracking processor when the event bus of the given
     * configuration is a {@code StreamableMessageSource}, and a subscribing processor otherwise.
     *
     * @param str                 the processor name
     * @param configuration       the configuration whose event bus is inspected
     * @param eventHandlerInvoker the invoker the processor delegates to
     * @return the created event processor
     */
    private EventProcessor defaultEventProcessor(String str, Configuration configuration, EventHandlerInvoker eventHandlerInvoker) {
        if (configuration.eventBus() instanceof StreamableMessageSource) {
            return trackingEventProcessor(
                    str,
                    eventHandlerInvoker,
                    this.defaultTrackingEventProcessorConfiguration.get(),
                    this.defaultStreamableSource.get());
        }
        return subscribingEventProcessor(str, eventHandlerInvoker, this.defaultSubscribableSource.get());
    }

    /**
     * Builds a subscribing event processor. Rollback configuration, error handler, message monitor and
     * transaction manager are looked up by processor name; the processing strategy is
     * {@code DirectEventProcessingStrategy.INSTANCE}.
     *
     * @param str                       the processor name
     * @param eventHandlerInvoker       the invoker the processor delegates to
     * @param subscribableMessageSource the source the processor subscribes to
     * @return the built processor
     */
    private SubscribingEventProcessor subscribingEventProcessor(String str, EventHandlerInvoker eventHandlerInvoker, SubscribableMessageSource<? extends EventMessage<?>> subscribableMessageSource) {
        return SubscribingEventProcessor.builder()
                .name(str)
                .eventHandlerInvoker(eventHandlerInvoker)
                .rollbackConfiguration(rollbackConfiguration(str))
                .errorHandler(errorHandler(str))
                .messageMonitor(messageMonitor(SubscribingEventProcessor.class, str))
                .messageSource(subscribableMessageSource)
                .processingStrategy(DirectEventProcessingStrategy.INSTANCE)
                .transactionManager(transactionManager(str))
                .build();
    }

    /**
     * Builds a tracking event processor. Rollback configuration, error handler, message monitor, token store
     * and transaction manager are looked up by processor name.
     *
     * @param str                                 the processor name
     * @param eventHandlerInvoker                 the invoker the processor delegates to
     * @param trackingEventProcessorConfiguration the tracking configuration to apply
     * @param streamableMessageSource             the source the processor reads from
     * @return the built processor
     */
    private TrackingEventProcessor trackingEventProcessor(String str, EventHandlerInvoker eventHandlerInvoker, TrackingEventProcessorConfiguration trackingEventProcessorConfiguration, StreamableMessageSource<TrackedEventMessage<?>> streamableMessageSource) {
        return TrackingEventProcessor.builder()
                .name(str)
                .eventHandlerInvoker(eventHandlerInvoker)
                .rollbackConfiguration(rollbackConfiguration(str))
                .errorHandler(errorHandler(str))
                .messageMonitor(messageMonitor(TrackingEventProcessor.class, str))
                .messageSource(streamableMessageSource)
                .tokenStore(tokenStore(str))
                .transactionManager(transactionManager(str))
                .trackingEventProcessorConfiguration(trackingEventProcessorConfiguration)
                .build();
    }

    /**
     * Returns the package name of the class of the given object.
     * <p>
     * The name is derived from the binary class name by cutting it off at the last {@code '.'}. This also
     * yields the package for nested, anonymous and lambda classes, whose binary names separate the class
     * parts with {@code '$'} rather than {@code '.'}. A class whose name contains no {@code '.'} (default
     * package) yields the class name itself.
     *
     * @param obj the object whose class determines the result; must not be {@code null}
     * @return the package name of the object's class
     */
    protected static String packageOfObject(Object obj) {
        String className = obj.getClass().getName();
        int lastDot = className.lastIndexOf('.');
        return lastDot < 0 ? className : className.substring(0, lastDot);
    }

    /**
     * Reads the {@code "processingGroup"} attribute of the {@code ProcessingGroup} annotation found on the
     * given type via {@code AnnotationUtils.findAnnotationAttributes}.
     *
     * @param cls the type to inspect
     * @return the attribute value, or an empty {@code Optional} if no annotation attributes were found
     */
    private static Optional<String> annotatedProcessingGroupOfType(Class<?> cls) {
        return AnnotationUtils.findAnnotationAttributes(cls, ProcessingGroup.class)
                .map(attributes -> (String) attributes.get("processingGroup"));
    }
}
