package org.nuxeo.ecm.platform.mqueues.audit;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.platform.audit.api.AuditLogger;
import org.nuxeo.ecm.platform.audit.api.LogEntry;
import org.nuxeo.ecm.platform.audit.impl.LogEntryImpl;
import org.nuxeo.ecm.platform.mqueues.Computations;
import org.nuxeo.lib.core.mqueues.computation.AbstractComputation;
import org.nuxeo.lib.core.mqueues.computation.ComputationContext;
import org.nuxeo.lib.core.mqueues.computation.Record;
import org.nuxeo.lib.core.mqueues.computation.Topology;
import org.nuxeo.runtime.api.Framework;

/* loaded from: input_file:org/nuxeo/ecm/platform/mqueues/audit/AuditLogWriter.class */
public class AuditLogWriter implements Computations {
    private static final Log log = LogFactory.getLog(AuditLogWriter.class);
    public static final String COMPUTATION_NAME = "AuditLogWriter";
    public static final String BATCH_SIZE_OPT = "batchSize";
    public static final String BATCH_THRESHOLD_MS_OPT = "batchThresholdMs";
    public static final int DEFAULT_BATCH_SIZE = 10;
    public static final int DEFAULT_BATCH_THRESHOLD_MS = 200;

    /* loaded from: input_file:org/nuxeo/ecm/platform/mqueues/audit/AuditLogWriter$AuditLogWriterComputation.class */
    public class AuditLogWriterComputation extends AbstractComputation {
        protected final int batchSize;
        protected final int batchThresholdMs;
        protected final List<LogEntry> logEntries;

        public AuditLogWriterComputation(String str, int i, int i2) {
            super(str, 1, 0);
            this.batchSize = i;
            this.batchThresholdMs = i2;
            this.logEntries = new ArrayList(i);
        }

        public void init(ComputationContext computationContext) {
            AuditLogWriter.log.debug(String.format("Starting computation: %s reading on: %s, batch size: %d, threshold: %dms", AuditLogWriter.COMPUTATION_NAME, "audit", Integer.valueOf(this.batchSize), Integer.valueOf(this.batchThresholdMs)));
            computationContext.setTimer("batch", System.currentTimeMillis() + this.batchThresholdMs);
        }

        public void processTimer(ComputationContext computationContext, String str, long j) {
            writeEntriesToAudit(computationContext);
            computationContext.setTimer("batch", System.currentTimeMillis() + this.batchThresholdMs);
        }

        public void processRecord(ComputationContext computationContext, String str, Record record) {
            try {
                this.logEntries.add(getLogEntryFromJson(record.data));
                if (this.logEntries.size() >= this.batchSize) {
                    writeEntriesToAudit(computationContext);
                }
            } catch (NuxeoException e) {
                AuditLogWriter.log.error("Discard invalid record: " + record, e);
            }
        }

        public void destroy() {
            AuditLogWriter.log.debug(String.format("Destroy computation: %s, pending entries: %d", AuditLogWriter.COMPUTATION_NAME, Integer.valueOf(this.logEntries.size())));
        }

        protected void writeEntriesToAudit(ComputationContext computationContext) {
            if (this.logEntries.isEmpty()) {
                return;
            }
            if (AuditLogWriter.log.isDebugEnabled()) {
                AuditLogWriter.log.debug(String.format("Writing %d log entries to audit backend.", Integer.valueOf(this.logEntries.size())));
            }
            ((AuditLogger) Framework.getService(AuditLogger.class)).addLogEntries(this.logEntries);
            this.logEntries.clear();
            computationContext.askForCheckpoint();
        }

        protected LogEntry getLogEntryFromJson(byte[] bArr) {
            String str = "";
            try {
                str = new String(bArr, "UTF-8");
                return (LogEntry) new ObjectMapper().readValue(str, LogEntryImpl.class);
            } catch (UnsupportedEncodingException e) {
                throw new NuxeoException("Discard log entry, invalid byte array", e);
            } catch (IOException e2) {
                throw new NuxeoException("Invalid json logEntry" + str, e2);
            }
        }
    }

    @Override // org.nuxeo.ecm.platform.mqueues.Computations
    public Topology getTopology(Map<String, String> map) {
        int optionAsInteger = getOptionAsInteger(map, BATCH_SIZE_OPT, 10);
        int optionAsInteger2 = getOptionAsInteger(map, BATCH_THRESHOLD_MS_OPT, DEFAULT_BATCH_THRESHOLD_MS);
        return Topology.builder().addComputation(() -> {
            return new AuditLogWriterComputation(COMPUTATION_NAME, optionAsInteger, optionAsInteger2);
        }, Collections.singletonList("i1:audit")).build();
    }

    protected int getOptionAsInteger(Map<String, String> map, String str, int i) {
        String str2 = map.get(str);
        return str2 == null ? i : Integer.valueOf(str2).intValue();
    }
}
