package org.nuxeo.ecm.core.event.pipe;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.event.EventBundle;
import org.nuxeo.ecm.core.event.EventServiceAdmin;
import org.nuxeo.ecm.core.event.impl.AsyncEventExecutor;
import org.nuxeo.ecm.core.event.impl.EventListenerDescriptor;
import org.nuxeo.runtime.api.Framework;

/* loaded from: input_file:org/nuxeo/ecm/core/event/pipe/AbstractListenerPipeConsumer.class */
/**
 * Base pipe consumer that forwards every received {@link EventBundle} to an
 * {@link AsyncEventExecutor}, together with the currently enabled asynchronous
 * post-commit listener descriptors obtained from {@link EventServiceAdmin}.
 *
 * @param <T> the message type handled by the parent {@code AbstractPipeConsumer}
 */
public abstract class AbstractListenerPipeConsumer<T> extends AbstractPipeConsumer<T> {
    private static final Log log = LogFactory.getLog(AbstractListenerPipeConsumer.class);

    /** Executor the bundles are handed to; assigned in {@link #initConsumer(String, Map)}. */
    protected volatile AsyncEventExecutor asyncExec;

    /** Switched to {@code true} by {@link #shutdown()}; this class never reads it itself. */
    protected boolean stopping;

    /**
     * Runs the parent initialization, creates the {@link AsyncEventExecutor} and then
     * verifies that a Nuxeo runtime is available.
     *
     * @param str passed through unchanged to the parent {@code initConsumer}
     * @param map passed through unchanged to the parent {@code initConsumer}
     * @throws RuntimeException if {@code Framework.getRuntime()} returns {@code null}
     */
    @Override
    public void initConsumer(String str, Map<String, String> map) {
        super.initConsumer(str, map);
        asyncExec = new AsyncEventExecutor();
        // The runtime check happens after the executor has been created.
        boolean runtimeMissing = Framework.getRuntime() == null;
        if (runtimeMissing) {
            throw new RuntimeException("Nuxeo Runtime not initialized");
        }
    }

    /**
     * Flags this consumer as stopping and waits for the executor to complete.
     * The result of the wait is discarded.
     *
     * @throws InterruptedException propagated from {@link #waitForCompletion(long)}
     */
    @Override
    public void shutdown() throws InterruptedException {
        stopping = true;
        // NOTE(review): 1000L is presumably a timeout in milliseconds - confirm against AsyncEventExecutor.
        waitForCompletion(1000L);
    }

    /**
     * Hands each bundle of {@code list} to the executor, in iteration order, using the
     * listener descriptors looked up once before the loop starts.
     *
     * @param list the bundles to process
     * @return always {@code true}
     */
    @Override
    protected boolean processEventBundles(List<EventBundle> list) {
        EventServiceAdmin admin = (EventServiceAdmin) Framework.getService(EventServiceAdmin.class);
        List<EventListenerDescriptor> postCommitAsync =
                admin.getListenerList().getEnabledAsyncPostCommitListenersDescriptors();
        for (EventBundle bundle : list) {
            asyncExec.run(postCommitAsync, bundle);
        }
        return true;
    }

    /**
     * Delegates to {@link AsyncEventExecutor#waitForCompletion(long)}.
     *
     * @param j value forwarded to the executor (presumably a timeout - confirm unit with the executor)
     * @return whatever the executor's {@code waitForCompletion} returns
     * @throws InterruptedException propagated from the executor
     */
    @Override
    public boolean waitForCompletion(long j) throws InterruptedException {
        return asyncExec.waitForCompletion(j);
    }
}
