package org.nuxeo.ecm.notification.listener;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.stream.Collectors;
import javax.naming.NamingException;
import javax.transaction.RollbackException;
import javax.transaction.Synchronization;
import javax.transaction.SystemException;
import javax.transaction.TransactionManager;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.api.NuxeoPrincipal;
import org.nuxeo.ecm.core.event.Event;
import org.nuxeo.ecm.core.event.EventContext;
import org.nuxeo.ecm.core.event.EventListener;
import org.nuxeo.ecm.core.event.impl.DocumentEventContext;
import org.nuxeo.ecm.notification.NotificationService;
import org.nuxeo.ecm.notification.NotificationStreamConfig;
import org.nuxeo.ecm.notification.event.EventFilter;
import org.nuxeo.ecm.notification.message.EventRecord;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.log.LogAppender;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.codec.CodecService;
import org.nuxeo.runtime.transaction.TransactionHelper;

/* loaded from: input_file:org/nuxeo/ecm/notification/listener/EventsStreamListener.class */
public class EventsStreamListener implements EventListener, Synchronization {
    protected static final String DELIMITER = ":";
    private static final Logger log = LogManager.getLogger(EventsStreamListener.class);
    protected static final ThreadLocal<Boolean> isEnlisted = ThreadLocal.withInitial(() -> {
        return Boolean.FALSE;
    });
    protected static final ThreadLocal<Map<String, EventRecord>> entries = ThreadLocal.withInitial(HashMap::new);

    public void handleEvent(Event event) {
        if (!isEnlisted.get().booleanValue()) {
            isEnlisted.set(Boolean.valueOf(registerSynchronization(this)));
            entries.get().clear();
            log.debug("EventsStreamListener collecting entries for the tx");
        }
        NuxeoPrincipal principal = event.getContext().getPrincipal();
        if (principal == null) {
            return;
        }
        String actingUser = principal.getActingUser();
        if (StringUtils.isBlank(actingUser) || "system".equals(actingUser)) {
            return;
        }
        EventRecord.EventRecordBuilder withUsername = EventRecord.builder().withEventName(event.getName()).withTime(event.getTime()).withUsername(actingUser);
        if (isDocumentEventContext(event.getContext())) {
            withUsername.withDocument(event.getContext().getSourceDocument());
        }
        ((NotificationService) Framework.getService(NotificationService.class)).getEventTransformers().stream().filter(eventTransformer -> {
            return eventTransformer.accept(event);
        }).forEach(eventTransformer2 -> {
            withUsername.withContext(eventTransformer2.buildEventRecordContext(event));
        });
        EventRecord build = withUsername.build();
        entries.get().putIfAbsent(generateKey(event), build);
        if (isEnlisted.get().booleanValue()) {
            return;
        }
        afterCompletion(3);
    }

    protected LogAppender<Record> getAppender() {
        NotificationStreamConfig notificationStreamConfig = (NotificationStreamConfig) Framework.getService(NotificationStreamConfig.class);
        if (StringUtils.isEmpty(notificationStreamConfig.getEventInputStream())) {
            throw new NuxeoException("There is no Stream configured to publish the event record.");
        }
        return notificationStreamConfig.getLogManager(notificationStreamConfig.getLogConfigNotification()).getAppender(notificationStreamConfig.getEventInputStream());
    }

    protected void appendEntries() {
        if (entries.get().isEmpty()) {
            return;
        }
        LogAppender<Record> appender = getAppender();
        Codec codec = ((CodecService) Framework.getService(CodecService.class)).getCodec("avro", EventRecord.class);
        filterEntries(entries.get()).forEach((str, eventRecord) -> {
            appender.append(str, Record.of(str, codec.encode(eventRecord)));
        });
    }

    protected Map<String, EventRecord> filterEntries(Map<String, EventRecord> map) {
        return (Map) map.entrySet().stream().filter(entry -> {
            Iterator<EventFilter> it = ((NotificationService) Framework.getService(NotificationService.class)).getEventFilters().iterator();
            while (it.hasNext()) {
                if (!it.next().acceptEvent(map, (EventRecord) entry.getValue())) {
                    return false;
                }
            }
            return true;
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }));
    }

    protected String generateKey(Event event) {
        if (isDocumentEventContext(event.getContext())) {
            return String.join(":", event.getName(), event.getContext().getSourceDocument().getId());
        }
        NuxeoPrincipal principal = event.getContext().getPrincipal();
        return String.join(":", event.getName(), principal.getTenantId(), principal.getName());
    }

    protected boolean isDocumentEventContext(EventContext eventContext) {
        return (eventContext instanceof DocumentEventContext) && ((DocumentEventContext) eventContext).getSourceDocument() != null;
    }

    public void beforeCompletion() {
        log.debug("{} going to write {} entries.", getClass().getSimpleName(), Integer.valueOf(entries.get().size()));
    }

    public void afterCompletion(int i) {
        try {
            if (entries.get().isEmpty() || 1 == i || 4 == i) {
                isEnlisted.set(false);
                entries.get().clear();
            } else {
                appendEntries();
                log.debug("{} writes {} entries.", getClass().getSimpleName(), Integer.valueOf(entries.get().size()));
                isEnlisted.set(false);
                entries.get().clear();
            }
        } catch (Throwable th) {
            isEnlisted.set(false);
            entries.get().clear();
            throw th;
        }
    }

    protected boolean registerSynchronization(Synchronization synchronization) {
        try {
            TransactionManager lookupTransactionManager = TransactionHelper.lookupTransactionManager();
            if (lookupTransactionManager == null) {
                log.error("Unable to register synchronization : no TransactionManager");
                return false;
            }
            if (lookupTransactionManager.getTransaction() == null) {
                return false;
            }
            lookupTransactionManager.getTransaction().registerSynchronization(synchronization);
            return true;
        } catch (NamingException | IllegalStateException | SystemException | RollbackException e) {
            log.error("Unable to register synchronization", e);
            return false;
        }
    }
}
