package org.nuxeo.ecm.platform.queue.core;

import java.io.Serializable;
import java.net.URI;
import java.util.Date;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.management.api.AdministrativeStatusManager;
import org.nuxeo.ecm.platform.lock.api.AlreadyLockedException;
import org.nuxeo.ecm.platform.lock.api.LockCoordinator;
import org.nuxeo.ecm.platform.lock.api.NoSuchLockException;
import org.nuxeo.ecm.platform.lock.api.NotOwnerException;
import org.nuxeo.ecm.platform.queue.api.QueueError;
import org.nuxeo.ecm.platform.queue.api.QueueHandler;
import org.nuxeo.ecm.platform.queue.api.QueueInfo;
import org.nuxeo.ecm.platform.queue.api.QueuePersister;
import org.nuxeo.ecm.platform.queue.api.QueueProcessor;
import org.nuxeo.ecm.platform.queue.api.Transacted;
import org.nuxeo.runtime.api.Framework;

/* loaded from: input_file:org/nuxeo/ecm/platform/queue/core/DefaultQueueHandler.class */
/**
 * Default {@link QueueHandler} implementation.
 * <p>
 * Persistence and processing are delegated to the {@link QueuePersister} and
 * {@link QueueProcessor} that the {@link DefaultQueueRegistry} returns for a
 * given content URI.
 */
public class DefaultQueueHandler implements QueueHandler {
    protected static Log log = LogFactory.getLog(DefaultQueueHandler.class);

    /**
     * Value handed to {@link LockCoordinator#lock} as its last argument.
     * NOTE(review): presumably a lock timeout; unit is not visible here - confirm.
     */
    protected int delay;

    /** Source of the persister / processor / name factory used by this handler. */
    protected DefaultQueueRegistry registry;

    /**
     * Replaces the delay passed to the lock coordinator.
     *
     * @param i the new delay value
     */
    public void setDelay(int i) {
        this.delay = i;
    }

    /**
     * Creates a handler bound to the given registry.
     *
     * @param i the delay passed to the lock coordinator when locking
     * @param defaultQueueRegistry the registry used to look up persisters and processors
     */
    public DefaultQueueHandler(int i, DefaultQueueRegistry defaultQueueRegistry) {
        this.delay = i;
        this.registry = defaultQueueRegistry;
    }

    /**
     * Persists the content and immediately hands it to the processor.
     *
     * @param uri first argument forwarded to {@code QueuePersister.addContent}
     *            (NOTE(review): presumably the owner name - confirm)
     * @param uri2 URI used to look up the persister and processor and to
     *            identify the content
     * @param c the content to enqueue
     * @throws QueueError if the server is not active
     */
    @Transacted
    public <C extends Serializable> void newContent(URI uri, URI uri2, C c) {
        if (!isServerActive()) {
            throw new QueueError("Server is not active");
        }
        QueuePersister<C> persister = this.registry.getPersister(uri2);
        QueueInfo<C> info = persister.addContent(uri, uri2, c);
        QueueProcessor<C> processor = this.registry.getProcessor(uri2);
        persister.setExecuteTime(uri2, new Date());
        processor.process(info);
    }

    /**
     * Persists and processes the content only when the persister does not
     * already hold content for {@code uri2}.
     * <p>
     * The check-and-add step runs under a lock taken on {@code uri2}. The lock
     * is released exactly once, in a {@code finally} block, before the content
     * is processed. If the resource is already locked the call returns without
     * doing anything.
     * <p>
     * Only failures of the lock acquisition itself are reported as
     * "Couldn't lock the resource"; failures of the persister or the processor
     * propagate unchanged so that their real cause is not hidden.
     *
     * @param uri first argument forwarded to the lock coordinator and to
     *            {@code QueuePersister.addContent}
     *            (NOTE(review): presumably the owner name - confirm)
     * @param uri2 URI of the resource that is locked, checked and enqueued
     * @param c the content to enqueue
     * @throws QueueError if the server is not active, if the lock cannot be
     *             acquired for a reason other than being already locked, or
     *             if the lock has unexpectedly disappeared on release
     */
    @Transacted
    public <C extends Serializable> void newContentIfUnknown(URI uri, URI uri2, C c) {
        if (!isServerActive()) {
            throw new QueueError("Server is not active");
        }
        LockCoordinator lockCoordinator = (LockCoordinator) Framework.getLocalService(LockCoordinator.class);

        // Step 1: acquire the lock. Only this call is guarded by the
        // "Couldn't lock the resource" handler.
        try {
            lockCoordinator.lock(uri, uri2, "locking for injecting  " + uri2, this.delay);
        } catch (AlreadyLockedException e) {
            log.debug("Already locked resource " + uri2, e);
            return;
        } catch (Throwable e) {
            throw new QueueError("Couldn't lock the resource", e, uri2);
        }

        // Step 2: check-and-add under the lock; the lock is released once,
        // whatever the outcome.
        QueuePersister<C> persister = this.registry.getPersister(uri2);
        QueueInfo<C> info;
        try {
            if (persister.hasContent(uri2)) {
                return;
            }
            info = persister.addContent(uri, uri2, c);
        } finally {
            releaseLock(lockCoordinator, uri, uri2);
        }

        // Step 3: process outside the lock. A processing failure can no
        // longer trigger a second unlock or be mislabelled as a lock failure.
        QueueProcessor<C> processor = this.registry.getProcessor(uri2);
        persister.setExecuteTime(uri2, new Date());
        processor.process(info);
    }

    /**
     * Releases the lock taken by {@link #newContentIfUnknown}.
     * <p>
     * A lock owned by somebody else is only logged; a missing lock is
     * reported as a {@link QueueError}.
     */
    private void releaseLock(LockCoordinator lockCoordinator, URI uri, URI uri2) {
        try {
            lockCoordinator.unlock(uri, uri2);
        } catch (NoSuchLockException e) {
            throw new QueueError("Resource is unexpectedly not locked", e);
        } catch (NotOwnerException e) {
            log.warn("Resource is unexpectedly locked by another user", e);
        }
    }

    /**
     * Tells whether the Nuxeo instance status reported by the
     * {@link AdministrativeStatusManager} is active.
     *
     * @return {@code true} when the instance status is active
     */
    protected boolean isServerActive() {
        return ((AdministrativeStatusManager) Framework.getLocalService(AdministrativeStatusManager.class)).getNuxeoInstanceStatus().isActive();
    }

    /**
     * Builds a content name by delegating to
     * {@code DefaultQueueRegistry.newContentName}.
     *
     * @param str first argument forwarded to the registry
     * @param str2 second argument forwarded to the registry
     * @return the URI produced by the registry
     */
    public URI newName(String str, String str2) {
        return this.registry.newContentName(str, str2);
    }

    /**
     * Removes the content identified by {@code uri} from its persister.
     *
     * @param uri the content URI
     * @return whatever {@code QueuePersister.removeContent} returns for that URI
     */
    @Transacted
    public <C extends Serializable> QueueInfo<C> cancel(URI uri) {
        QueuePersister<C> persister = this.registry.getPersister(uri);
        return persister.removeContent(uri);
    }

    /**
     * Fetches the stored info for {@code uri} and hands it to the processor
     * again.
     *
     * @param uri the content URI
     * @return the info that was passed to the processor
     */
    public <C extends Serializable> QueueInfo<C> retry(URI uri) {
        QueuePersister<C> persister = this.registry.getPersister(uri);
        QueueInfo<C> info = persister.getInfo(uri);
        QueueProcessor<C> processor = this.registry.getProcessor(uri);
        processor.process(info);
        return info;
    }
}
