package org.nuxeo.ecm.core.event.impl;

import java.rmi.dgc.VMID;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
import javax.naming.NamingException;
import javax.transaction.RollbackException;
import javax.transaction.Synchronization;
import javax.transaction.SystemException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.common.logging.SequenceTracer;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.api.RecoverableClientException;
import org.nuxeo.ecm.core.event.Event;
import org.nuxeo.ecm.core.event.EventBundle;
import org.nuxeo.ecm.core.event.EventContext;
import org.nuxeo.ecm.core.event.EventListener;
import org.nuxeo.ecm.core.event.EventService;
import org.nuxeo.ecm.core.event.EventServiceAdmin;
import org.nuxeo.ecm.core.event.EventStats;
import org.nuxeo.ecm.core.event.PostCommitEventListener;
import org.nuxeo.ecm.core.event.ReconnectedEventBundle;
import org.nuxeo.ecm.core.event.jms.AsyncProcessorConfig;
import org.nuxeo.ecm.core.event.pipe.EventPipeDescriptor;
import org.nuxeo.ecm.core.event.pipe.EventPipeRegistry;
import org.nuxeo.ecm.core.event.pipe.dispatch.EventBundleDispatcher;
import org.nuxeo.ecm.core.event.pipe.dispatch.EventDispatcherDescriptor;
import org.nuxeo.ecm.core.event.pipe.dispatch.EventDispatcherRegistry;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.transaction.TransactionHelper;

/* loaded from: input_file:org/nuxeo/ecm/core/event/impl/EventServiceImpl.class */
public class EventServiceImpl implements EventService, EventServiceAdmin, Synchronization {
    /** Virtual-machine identifier created once when this class is loaded. */
    public static final VMID VMID = new VMID();
    private static final Log log = LogFactory.getLog(EventServiceImpl.class);
    /** Per-thread holder of recorded events; each thread starts with its own empty {@link CompositeEventBundle}. */
    protected static final ThreadLocal<CompositeEventBundle> threadBundles = new ThreadLocal<CompositeEventBundle>() { // from class: org.nuxeo.ecm.core.event.impl.EventServiceImpl.1
        /* JADX INFO: Access modifiers changed from: protected */
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.lang.ThreadLocal
        public CompositeEventBundle initialValue() {
            return new CompositeEventBundle();
        }
    };
    /** Pipe dispatcher; stays {@code null} unless {@link #init()} finds a dispatcher descriptor and at least one pipe. */
    protected EventBundleDispatcher pipeDispatcher;
    /** Hooks consulted by {@link #shutdown(long)} and {@link #waitForAsyncCompletion(long)}. */
    protected final List<AsyncWaitHook> asyncWaitHooks = new CopyOnWriteArrayList();
    /** When {@code true}, {@link #fireEventBundle(EventBundle)} leaves out the async post-commit listeners. */
    protected boolean blockAsyncProcessing = false;
    /** When {@code true}, {@link #fireEventBundle(EventBundle)} leaves out the sync post-commit listeners. */
    protected boolean blockSyncPostCommitProcessing = false;
    /** When {@code true}, {@link #fireEventBundle(EventBundle)} hands the listeners to {@code postCommitExec.runBulk}. */
    protected boolean bulkModeEnabled = false;
    /** Registry of contributed event pipe descriptors. */
    protected EventPipeRegistry registeredPipes = new EventPipeRegistry();
    /** Registry of contributed event dispatcher descriptors. */
    protected EventDispatcherRegistry dispatchers = new EventDispatcherRegistry();
    /** Registered listener descriptors (inline, sync post-commit and async post-commit). */
    protected final EventListenerList listenerDescriptors = new EventListenerList();
    /** Executor used for sync post-commit listeners and for bulk mode. */
    protected PostCommitEventExecutor postCommitExec = new PostCommitEventExecutor();
    /** Executor used for async post-commit listeners when no pipe dispatcher is configured. */
    protected volatile AsyncEventExecutor asyncExec = new AsyncEventExecutor();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/nuxeo/ecm/core/event/impl/EventServiceImpl$CompositeEventBundle.class */
    /**
     * Collects the events recorded by one thread, keeping one {@link EventBundle} per repository name.
     */
    public static class CompositeEventBundle {
        /** Set to {@code true} by {@code recordEvent} once the service has registered itself on the transaction. */
        boolean registeredSynchronization;

        /** Bundles keyed by the repository name read from each pushed event's context. */
        final Map<String, EventBundle> byRepository = new HashMap<>();

        private CompositeEventBundle() {
        }

        /**
         * Appends the event to the bundle of its repository, creating that bundle on first use.
         *
         * @param event the event to record
         */
        void push(Event event) {
            String repository = event.getContext().getRepositoryName();
            this.byRepository.computeIfAbsent(repository, key -> new EventBundleImpl()).push(event);
        }
    }

    /**
     * Initializes the async executor and, when a dispatcher descriptor and at least one pipe are
     * registered, creates and initializes the pipe dispatcher.
     */
    public void init() {
        this.asyncExec.init();
        EventDispatcherDescriptor descriptor = this.dispatchers.getDispatcherDescriptor();
        if (descriptor == null) {
            return;
        }
        List<EventPipeDescriptor> pipeDescriptors = this.registeredPipes.getPipes();
        if (!pipeDescriptors.isEmpty()) {
            // No dispatcher is created when there is nothing to dispatch to.
            this.pipeDispatcher = descriptor.getInstance();
            this.pipeDispatcher.init(pipeDescriptors, descriptor.getParameters());
        }
    }

    /**
     * @return the pipe dispatcher, or {@code null} when {@link #init()} did not create one
     */
    public EventBundleDispatcher getEventBundleDispatcher() {
        return this.pipeDispatcher;
    }

    /**
     * Shuts down the post-commit executor, every registered async wait hook, the async executor and
     * finally the pipe dispatcher (when present).
     *
     * @param j value forwarded to the executors' {@code shutdown} calls (presumably a timeout; unit not
     *            visible here)
     * @throws InterruptedException propagated from the underlying shutdown calls
     * @throws RuntimeException if a hook reports it did not shut down, or the async executor reports it
     *             is still running
     */
    public void shutdown(long j) throws InterruptedException {
        this.postCommitExec.shutdown(j);

        // Every hook is asked to shut down; the ones answering false are collected for the error message.
        Set<AsyncWaitHook> stillRunning = this.asyncWaitHooks.stream()
                .filter(hook -> !hook.shutdown())
                .collect(Collectors.toSet());
        if (!stillRunning.isEmpty()) {
            throw new RuntimeException("Asynch services are still running : " + stillRunning);
        }

        boolean asyncStopped = this.asyncExec.shutdown(j);
        if (!asyncStopped) {
            throw new RuntimeException("Async executor is still running, timeout expired");
        }

        if (this.pipeDispatcher != null) {
            this.pipeDispatcher.shutdown();
        }
    }

    /**
     * Adds a hook that {@link #shutdown(long)} and {@link #waitForAsyncCompletion(long)} will consult.
     *
     * @param asyncWaitHook the hook to add
     */
    public void registerForAsyncWait(AsyncWaitHook asyncWaitHook) {
        this.asyncWaitHooks.add(asyncWaitHook);
    }

    /**
     * Removes a hook previously added with {@link #registerForAsyncWait(AsyncWaitHook)}.
     *
     * @param asyncWaitHook the hook to remove
     */
    public void unregisterForAsyncWait(AsyncWaitHook asyncWaitHook) {
        this.asyncWaitHooks.remove(asyncWaitHook);
    }

    /**
     * @return the async executor's unfinished count (same value as {@link #getEventsInQueueCount()})
     */
    @Deprecated
    public int getActiveAsyncTaskCount() {
        return this.asyncExec.getUnfinishedCount();
    }

    /**
     * Waits for async completion by delegating to {@link #waitForAsyncCompletion(long)} with
     * {@link Long#MAX_VALUE}.
     */
    @Override // org.nuxeo.ecm.core.event.EventService
    public void waitForAsyncCompletion() {
        waitForAsyncCompletion(Long.MAX_VALUE);
    }

    /**
     * Waits for the registered async wait hooks, the async executor and the pipe dispatcher (when
     * present) to complete.
     *
     * @param j value forwarded to the executor's and dispatcher's {@code waitForCompletion} calls
     *            (presumably a timeout; unit not visible here)
     * @throws RuntimeException if a hook or the async executor reports it has not completed, or if the
     *             calling thread is interrupted (the interrupt flag is restored first)
     */
    @Override // org.nuxeo.ecm.core.event.EventService
    public void waitForAsyncCompletion(long j) {
        // Every hook is asked to wait; the ones answering false are collected for the error message.
        Set<AsyncWaitHook> pending = this.asyncWaitHooks.stream()
                .filter(hook -> !hook.waitForAsyncCompletion())
                .collect(Collectors.toSet());
        if (!pending.isEmpty()) {
            throw new RuntimeException("Async tasks are still running : " + pending);
        }
        try {
            if (!this.asyncExec.waitForCompletion(j)) {
                throw new RuntimeException("Async event listeners thread pool is not terminated");
            }
            if (this.pipeDispatcher != null) {
                this.pipeDispatcher.waitForCompletion(j);
            }
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag before surfacing the failure as unchecked.
            Thread.currentThread().interrupt();
            throw new RuntimeException(interrupted);
        }
    }

    /**
     * Adds the descriptor to the listener list and logs the registration at debug level.
     *
     * @param eventListenerDescriptor the listener descriptor to register
     */
    @Override // org.nuxeo.ecm.core.event.EventService
    public void addEventListener(EventListenerDescriptor eventListenerDescriptor) {
        this.listenerDescriptors.add(eventListenerDescriptor);
        log.debug("Registered event listener: " + eventListenerDescriptor.getName());
    }

    /**
     * Contributes the pipe descriptor to the pipe registry and logs the registration at debug level.
     *
     * @param eventPipeDescriptor the pipe descriptor to register
     */
    public void addEventPipe(EventPipeDescriptor eventPipeDescriptor) {
        this.registeredPipes.addContribution(eventPipeDescriptor);
        log.debug("Registered event pipe: " + eventPipeDescriptor.getName());
    }

    /**
     * Contributes the dispatcher descriptor to the dispatcher registry and logs the registration at
     * debug level.
     *
     * @param eventDispatcherDescriptor the dispatcher descriptor to register
     */
    public void addEventDispatcher(EventDispatcherDescriptor eventDispatcherDescriptor) {
        this.dispatchers.addContrib(eventDispatcherDescriptor);
        log.debug("Registered event dispatcher: " + eventDispatcherDescriptor.getName());
    }

    /**
     * Removes the descriptor from the listener list and logs the removal at debug level.
     *
     * @param eventListenerDescriptor the listener descriptor to unregister
     */
    @Override // org.nuxeo.ecm.core.event.EventService
    public void removeEventListener(EventListenerDescriptor eventListenerDescriptor) {
        this.listenerDescriptors.removeDescriptor(eventListenerDescriptor);
        log.debug("Unregistered event listener: " + eventListenerDescriptor.getName());
    }

    /**
     * Removes the pipe descriptor from the pipe registry and logs the removal at debug level.
     *
     * @param eventPipeDescriptor the pipe descriptor to unregister
     */
    public void removeEventPipe(EventPipeDescriptor eventPipeDescriptor) {
        this.registeredPipes.removeContribution(eventPipeDescriptor);
        log.debug("Unregistered event pipe: " + eventPipeDescriptor.getName());
    }

    /**
     * Removes the dispatcher descriptor from the dispatcher registry and logs the removal at debug
     * level.
     *
     * @param eventDispatcherDescriptor the dispatcher descriptor to unregister
     */
    public void removeEventDispatcher(EventDispatcherDescriptor eventDispatcherDescriptor) {
        this.dispatchers.removeContrib(eventDispatcherDescriptor);
        log.debug("Unregistered event dispatcher: " + eventDispatcherDescriptor.getName());
    }

    /**
     * Wraps the name and context in a new {@link EventImpl} and fires it through
     * {@link #fireEvent(Event)}.
     *
     * @param str the event name
     * @param eventContext the context attached to the event
     */
    @Override // org.nuxeo.ecm.core.event.EventService
    public void fireEvent(String str, EventContext eventContext) {
        fireEvent(new EventImpl(str, eventContext));
    }

    /**
     * Runs every enabled inline listener that accepts the event, then either records the event for
     * the current thread or fires it at once as a single-event bundle.
     * <p>
     * Processing stops as soon as a listener cancels the event. A {@link RuntimeException} thrown by a
     * listener is logged; it is rethrown when the event bubbles exceptions, wrapped in a new
     * {@link RuntimeException} when the event is marked for rollback, and otherwise ignored so the
     * remaining listeners still run. Inline events go no further than the inline listeners.
     *
     * @param event the event to fire
     */
    @Override // org.nuxeo.ecm.core.event.EventService, org.nuxeo.ecm.core.event.EventProducer
    public void fireEvent(Event event) {
        String eventName = event.getName();
        EventStats stats = (EventStats) Framework.getService(EventStats.class);
        for (EventListenerDescriptor descriptor : this.listenerDescriptors.getEnabledInlineListenersDescriptors()) {
            if (!descriptor.acceptEvent(eventName)) {
                continue;
            }
            try {
                long startedAt = System.currentTimeMillis();
                SequenceTracer.start("Fire sync event " + event.getName());
                descriptor.asEventListener().handleEvent(event);
                long elapsed = System.currentTimeMillis() - startedAt;
                SequenceTracer.stop("done in " + elapsed + " ms");
                if (stats != null) {
                    stats.logSyncExec(descriptor, elapsed);
                }
                if (event.isCanceled()) {
                    // A cancelled event skips the remaining listeners and is never recorded or bundled.
                    return;
                }
            } catch (RuntimeException e) {
                SequenceTracer.destroy("failure");
                String message = buildSyncFailureMessage(descriptor, event);
                if (e instanceof RecoverableClientException) {
                    // Recoverable errors are expected: short info line, stack trace only at debug.
                    log.info(message + "\n" + e.getMessage());
                    log.debug(message, e);
                } else {
                    log.error(message, e);
                }
                if (event.isBubbleException()) {
                    throw e;
                }
                if (event.isMarkedForRollBack()) {
                    throw new RuntimeException(message, event.getRollbackException() != null ? event.getRollbackException() : e);
                }
            }
        }
        if (event.isInline()) {
            return;
        }
        ShallowEvent shallow = ShallowEvent.create(event);
        if (event.isImmediate()) {
            EventBundleImpl singleEventBundle = new EventBundleImpl();
            singleEventBundle.push(shallow);
            fireEventBundle(singleEventBundle);
        } else {
            recordEvent(shallow);
        }
    }

    /** Builds the log/exception message describing what happens after an inline listener failed. */
    private String buildSyncFailureMessage(EventListenerDescriptor descriptor, Event event) {
        String message = "Exception during " + descriptor.getName() + " sync listener execution, ";
        if (event.isBubbleException()) {
            message = message + "other listeners will be ignored";
        } else if (event.isMarkedForRollBack()) {
            message = message + "transaction will be rolled back";
            if (event.getRollbackMessage() != null) {
                message = message + " (" + event.getRollbackMessage() + ")";
            }
        } else {
            message = message + "continuing to run other listeners";
        }
        return message;
    }

    /**
     * Dispatches a bundle to the post-commit listeners.
     * <p>
     * In bulk mode the non-blocked sync and async listeners are merged into one list and handed to
     * {@code postCommitExec.runBulk}. Otherwise sync post-commit listeners run through
     * {@code postCommitExec} (unless blocked, or the bundle is a {@link ReconnectedEventBundle} coming
     * from JMS), and async processing goes to the pipe dispatcher when one exists, else to
     * {@code asyncExec} (unless blocked, or JMS usage is forced and the bundle did not come from JMS).
     *
     * @param eventBundle the bundle to dispatch
     */
    @Override // org.nuxeo.ecm.core.event.EventService, org.nuxeo.ecm.core.event.EventProducer
    public void fireEventBundle(EventBundle eventBundle) {
        boolean comesFromJms = (eventBundle instanceof ReconnectedEventBundle)
                && ((ReconnectedEventBundle) eventBundle).comesFromJMS();
        List<EventListenerDescriptor> syncListeners = this.listenerDescriptors.getEnabledSyncPostCommitListenersDescriptors();
        List<EventListenerDescriptor> asyncListeners = this.listenerDescriptors.getEnabledAsyncPostCommitListenersDescriptors();
        if (this.bulkModeEnabled) {
            // Always merge into a fresh list: appending the async listeners to the list returned by the
            // registry would modify a collection this method does not own.
            List<EventListenerDescriptor> bulkListeners = new ArrayList<>();
            if (!this.blockSyncPostCommitProcessing) {
                bulkListeners.addAll(syncListeners);
            }
            if (!this.blockAsyncProcessing) {
                bulkListeners.addAll(asyncListeners);
            }
            if (!bulkListeners.isEmpty()) {
                this.postCommitExec.runBulk(bulkListeners, eventBundle);
            }
            return;
        }
        if (this.blockSyncPostCommitProcessing) {
            log.debug("Dropping PostCommit handler execution");
        } else if (comesFromJms) {
            log.debug("Deactivating sync post-commit listener since we are called from JMS");
        } else if (!syncListeners.isEmpty()) {
            this.postCommitExec.run(syncListeners, eventBundle);
        }
        if (this.blockAsyncProcessing) {
            log.debug("Dopping bundle");
            return;
        }
        if (AsyncProcessorConfig.forceJMSUsage() && !comesFromJms) {
            log.debug("Skipping async exec, this will be triggered via JMS");
        } else if (this.pipeDispatcher == null) {
            this.asyncExec.run(asyncListeners, eventBundle);
        } else {
            this.pipeDispatcher.sendEventBundle(eventBundle);
        }
    }

    /**
     * Invokes every enabled post-commit listener directly in the calling thread: first the sync
     * ones, then the async ones.
     *
     * @param eventBundle the bundle handed to each listener
     */
    @Override // org.nuxeo.ecm.core.event.EventService
    public void fireEventBundleSync(EventBundle eventBundle) {
        for (EventListenerDescriptor syncDescriptor : this.listenerDescriptors.getEnabledSyncPostCommitListenersDescriptors()) {
            syncDescriptor.asPostCommitListener().handleEvent(eventBundle);
        }
        for (EventListenerDescriptor asyncDescriptor : this.listenerDescriptors.getEnabledAsyncPostCommitListenersDescriptors()) {
            asyncDescriptor.asPostCommitListener().handleEvent(eventBundle);
        }
    }

    /**
     * @return the inline listeners as reported by the listener list
     */
    @Override // org.nuxeo.ecm.core.event.EventService
    public List<EventListener> getEventListeners() {
        return this.listenerDescriptors.getInLineListeners();
    }

    /**
     * @return a new list holding the sync post-commit listeners followed by the async ones
     */
    @Override // org.nuxeo.ecm.core.event.EventService
    public List<PostCommitEventListener> getPostCommitEventListeners() {
        List<PostCommitEventListener> combined = new ArrayList<>(this.listenerDescriptors.getSyncPostCommitListeners());
        combined.addAll(this.listenerDescriptors.getAsyncPostCommitListeners());
        return combined;
    }

    /**
     * @return the listener list backing this service (the same instance as {@link #getListenerList()})
     */
    public EventListenerList getEventListenerList() {
        return this.listenerDescriptors;
    }

    /**
     * Looks up a listener descriptor in the listener list.
     *
     * @param str the listener name
     * @return whatever the listener list returns for that name
     */
    @Override // org.nuxeo.ecm.core.event.EventService
    public EventListenerDescriptor getEventListener(String str) {
        return this.listenerDescriptors.getDescriptor(str);
    }

    /**
     * @return the listener list backing this service
     */
    @Override // org.nuxeo.ecm.core.event.EventServiceAdmin
    public EventListenerList getListenerList() {
        return this.listenerDescriptors;
    }

    /**
     * Enables or disables the listener with the given name and recomputes the enabled-listener
     * lists.
     * <p>
     * The async post-commit, sync post-commit and inline descriptors are searched in that order and
     * only the first match is updated. Nothing happens when the listener list does not know the name.
     *
     * @param str the listener name
     * @param z the new enabled state
     */
    @Override // org.nuxeo.ecm.core.event.EventServiceAdmin
    public void setListenerEnabledFlag(String str, boolean z) {
        if (!this.listenerDescriptors.hasListener(str)) {
            return;
        }
        // Short-circuit evaluation keeps the original lookup order and only queries a later
        // category when the earlier ones had no match.
        if (applyEnabledFlag(this.listenerDescriptors.getAsyncPostCommitListenersDescriptors(), str, z)) {
            return;
        }
        if (applyEnabledFlag(this.listenerDescriptors.getSyncPostCommitListenersDescriptors(), str, z)) {
            return;
        }
        applyEnabledFlag(this.listenerDescriptors.getInlineListenersDescriptors(), str, z);
    }

    /**
     * Sets the enabled flag on the first descriptor named {@code name} and recomputes the enabled
     * listeners while holding this service's monitor.
     *
     * @return {@code true} if a descriptor with that name was found in {@code candidates}
     */
    private boolean applyEnabledFlag(Iterable<? extends EventListenerDescriptor> candidates, String name,
            boolean enabled) {
        for (EventListenerDescriptor candidate : candidates) {
            if (candidate.getName().equals(name)) {
                candidate.setEnabled(enabled);
                synchronized (this) {
                    this.listenerDescriptors.recomputeEnabledListeners();
                }
                return true;
            }
        }
        return false;
    }

    /**
     * @return the async executor's active count
     */
    @Override // org.nuxeo.ecm.core.event.EventServiceAdmin
    public int getActiveThreadsCount() {
        return this.asyncExec.getActiveCount();
    }

    /**
     * @return the async executor's unfinished count
     */
    @Override // org.nuxeo.ecm.core.event.EventServiceAdmin
    public int getEventsInQueueCount() {
        return this.asyncExec.getUnfinishedCount();
    }

    /**
     * @return {@code true} when async post-commit processing is currently blocked
     */
    @Override // org.nuxeo.ecm.core.event.EventServiceAdmin
    public boolean isBlockAsyncHandlers() {
        return this.blockAsyncProcessing;
    }

    /**
     * @return {@code true} when sync post-commit processing is currently blocked
     */
    @Override // org.nuxeo.ecm.core.event.EventServiceAdmin
    public boolean isBlockSyncPostCommitHandlers() {
        return this.blockSyncPostCommitProcessing;
    }

    /**
     * Blocks or unblocks async post-commit processing in {@link #fireEventBundle(EventBundle)}.
     *
     * @param z {@code true} to block
     */
    @Override // org.nuxeo.ecm.core.event.EventServiceAdmin
    public void setBlockAsyncHandlers(boolean z) {
        this.blockAsyncProcessing = z;
    }

    /**
     * Blocks or unblocks sync post-commit processing in {@link #fireEventBundle(EventBundle)}.
     *
     * @param z {@code true} to block
     */
    @Override // org.nuxeo.ecm.core.event.EventServiceAdmin
    public void setBlockSyncPostCommitHandlers(boolean z) {
        this.blockSyncPostCommitProcessing = z;
    }

    /**
     * @return {@code true} when bulk mode is enabled
     */
    @Override // org.nuxeo.ecm.core.event.EventServiceAdmin
    public boolean isBulkModeEnabled() {
        return this.bulkModeEnabled;
    }

    /**
     * Switches bulk mode on or off; in bulk mode {@link #fireEventBundle(EventBundle)} hands the
     * listeners to {@code postCommitExec.runBulk}.
     *
     * @param z {@code true} to enable bulk mode
     */
    @Override // org.nuxeo.ecm.core.event.EventServiceAdmin
    public void setBulkModeEnabled(boolean z) {
        this.bulkModeEnabled = z;
    }

    /**
     * Adds the event to the calling thread's pending bundle.
     * <p>
     * With an active transaction, this service is registered once per pending bundle as a
     * synchronization on that transaction. Without one, a commit event triggers
     * {@link #handleTxCommited()} right away.
     *
     * @param event the event to record
     * @throws RuntimeException if the synchronization cannot be registered
     */
    protected void recordEvent(Event event) {
        CompositeEventBundle pending = threadBundles.get();
        pending.push(event);
        if (TransactionHelper.isTransactionActive()) {
            if (!pending.registeredSynchronization) {
                try {
                    TransactionHelper.lookupTransactionManager().getTransaction().registerSynchronization(this);
                    pending.registeredSynchronization = true;
                } catch (NamingException | SystemException | RollbackException e) {
                    throw new RuntimeException("Cannot register Synchronization", e);
                }
            }
        } else if (event.isCommitEvent()) {
            handleTxCommited();
        }
    }

    /**
     * Part of the {@code Synchronization} contract this class implements; intentionally does nothing.
     */
    public void beforeCompletion() {
    }

    /**
     * Reacts to the end of the transaction this service was registered on.
     *
     * @param i the completion status; 3 and 4 are the numeric values of
     *            {@code javax.transaction.Status.STATUS_COMMITTED} and {@code STATUS_ROLLEDBACK}
     */
    public void afterCompletion(int i) {
        switch (i) {
            case 3: // committed
                handleTxCommited();
                break;
            case 4: // rolled back
                handleTxRollbacked();
                break;
            default:
                log.error("Unexpected afterCompletion status: " + i);
                break;
        }
    }

    /**
     * Discards the events recorded by the calling thread by clearing its entry in
     * {@code threadBundles}.
     */
    protected void handleTxRollbacked() {
        threadBundles.remove();
    }

    /**
     * Detaches the calling thread's pending bundle and fires each per-repository bundle it holds.
     * <p>
     * A {@link NuxeoException} raised while firing one bundle is logged and does not prevent the
     * remaining bundles from being fired.
     */
    protected void handleTxCommited() {
        CompositeEventBundle committed = threadBundles.get();
        // Clear the thread-local before firing so the bundles being fired are no longer the thread's pending ones.
        threadBundles.remove();
        for (EventBundle repositoryBundle : committed.byRepository.values()) {
            try {
                fireEventBundle(repositoryBundle);
            } catch (NuxeoException failure) {
                log.error("Error while processing " + repositoryBundle, failure);
            }
        }
    }
}
