package org.nuxeo.ecm.core.bulk;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.security.auth.login.LoginContext;
import javax.security.auth.login.LoginException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.core.api.CloseableCoreSession;
import org.nuxeo.ecm.core.api.CoreInstance;
import org.nuxeo.ecm.core.api.NuxeoException;
import org.nuxeo.ecm.core.api.ScrollResult;
import org.nuxeo.ecm.core.bulk.BulkStatus;
import org.nuxeo.lib.stream.computation.AbstractComputation;
import org.nuxeo.lib.stream.computation.ComputationContext;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.Topology;
import org.nuxeo.runtime.api.Framework;
import org.nuxeo.runtime.codec.CodecService;
import org.nuxeo.runtime.kv.KeyValueService;
import org.nuxeo.runtime.kv.KeyValueStore;
import org.nuxeo.runtime.stream.StreamProcessorTopology;
import org.nuxeo.runtime.transaction.TransactionHelper;

/* loaded from: input_file:org/nuxeo/ecm/core/bulk/StreamBulkProcessor.class */
public class StreamBulkProcessor implements StreamProcessorTopology {
    private static final Log log = LogFactory.getLog(StreamBulkProcessor.class);
    public static final String AVRO_CODEC = "avro";
    public static final String SCROLLER_COMPUTATION_NAME = "bulkDocumentScroller";
    public static final String COUNTER_COMPUTATION_NAME = "bulkCounter";
    public static final String KVWRITER_COMPUTATION_NAME = "keyValueWriter";
    public static final String COUNTER_STREAM_NAME = "counter";
    public static final String KVWRITER_STREAM_NAME = "keyValueWriter";
    public static final String SCROLL_BATCH_SIZE_OPT = "scrollBatchSize";
    public static final String SCROLL_KEEP_ALIVE_SECONDS_OPT = "scrollKeepAlive";
    public static final String BUCKET_SIZE_OPT = "bucketSize";
    public static final String COUNTER_THRESHOLD_MS_OPT = "counterThresholdMs";
    public static final int DEFAULT_SCROLL_BATCH_SIZE = 100;
    public static final int DEFAULT_SCROLL_KEEPALIVE_SECONDS = 60;
    public static final int DEFAULT_BUCKET_SIZE = 50;
    public static final int DEFAULT_COUNTER_THRESHOLD_MS = 30000;

    /* loaded from: input_file:org/nuxeo/ecm/core/bulk/StreamBulkProcessor$BulkDocumentScrollerComputation.class */
    public static class BulkDocumentScrollerComputation extends AbstractComputation {
        protected final int scrollBatchSize;
        protected final int scrollKeepAliveSeconds;
        protected final int bucketSize;
        protected final List<String> documentIds;

        public BulkDocumentScrollerComputation(String str, int i, int i2, int i3, int i4) {
            super(str, 1, i);
            this.scrollBatchSize = i2;
            this.scrollKeepAliveSeconds = i3;
            this.bucketSize = i4;
            this.documentIds = new ArrayList(Integer.max(i2, i4));
        }

        public void processRecord(ComputationContext computationContext, String str, Record record) {
            TransactionHelper.runInTransaction(() -> {
                processRecord(computationContext, record);
            });
        }

        /* JADX WARN: Finally extract failed */
        protected void processRecord(ComputationContext computationContext, Record record) {
            KeyValueStore keyValueStore = ((KeyValueService) Framework.getService(KeyValueService.class)).getKeyValueStore("bulk");
            try {
                String key = record.getKey();
                BulkCommand fromBytes = BulkCommands.fromBytes(record.getData());
                if (!keyValueStore.compareAndSet(key + ":state", BulkStatus.State.SCHEDULED.toString(), BulkStatus.State.SCROLLING_RUNNING.toString())) {
                    StreamBulkProcessor.log.error("Discard record: " + record + " because it's already building");
                    computationContext.askForCheckpoint();
                    return;
                }
                try {
                    LoginContext loginAsUser = Framework.loginAsUser(fromBytes.getUsername());
                    try {
                        CloseableCoreSession openCoreSession = CoreInstance.openCoreSession(fromBytes.getRepository());
                        Throwable th = null;
                        try {
                            Long valueOf = Long.valueOf(Instant.now().toEpochMilli());
                            ScrollResult scroll = openCoreSession.scroll(fromBytes.getQuery(), this.scrollBatchSize, this.scrollKeepAliveSeconds);
                            long j = 0;
                            long j2 = 0;
                            while (scroll.hasResults()) {
                                this.documentIds.addAll(scroll.getResults());
                                while (this.documentIds.size() >= this.bucketSize) {
                                    j2++;
                                    produceBucket(computationContext, fromBytes.getAction(), key, j2 * this.bucketSize);
                                }
                                j += r0.size();
                                scroll = openCoreSession.scroll(scroll.getScrollId());
                                TransactionHelper.commitOrRollbackTransaction();
                                TransactionHelper.startTransaction();
                            }
                            if (!this.documentIds.isEmpty()) {
                                produceBucket(computationContext, fromBytes.getAction(), key, j);
                            }
                            Long valueOf2 = Long.valueOf(Instant.now().toEpochMilli());
                            BulkUpdate bulkUpdate = new BulkUpdate();
                            bulkUpdate.put(key + ":scrollStartTime", valueOf.toString());
                            bulkUpdate.put(key + ":scrollEndTime", valueOf2.toString());
                            bulkUpdate.put(key + ":state", BulkStatus.State.RUNNING.toString());
                            bulkUpdate.put(key + ":count", String.valueOf(j));
                            computationContext.produceRecord("keyValueWriter", key, ((CodecService) Framework.getService(CodecService.class)).getCodec(StreamBulkProcessor.AVRO_CODEC, BulkUpdate.class).encode(bulkUpdate));
                            if (openCoreSession != null) {
                                if (0 != 0) {
                                    try {
                                        openCoreSession.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    openCoreSession.close();
                                }
                            }
                            loginAsUser.logout();
                        } catch (Throwable th3) {
                            if (openCoreSession != null) {
                                if (0 != 0) {
                                    try {
                                        openCoreSession.close();
                                    } catch (Throwable th4) {
                                        th.addSuppressed(th4);
                                    }
                                } else {
                                    openCoreSession.close();
                                }
                            }
                            throw th3;
                        }
                    } catch (Throwable th5) {
                        loginAsUser.logout();
                        throw th5;
                    }
                } catch (LoginException e) {
                    throw new NuxeoException(e);
                }
            } catch (NuxeoException e2) {
                StreamBulkProcessor.log.error("Discard invalid record: " + record, e2);
            }
        }

        protected void produceBucket(ComputationContext computationContext, String str, String str2, long j) {
            List<String> subList = this.documentIds.subList(0, Math.min(this.bucketSize, this.documentIds.size()));
            computationContext.produceRecord(str, BulkRecords.of(str2, j, subList));
            computationContext.askForCheckpoint();
            subList.clear();
        }
    }

    /* loaded from: input_file:org/nuxeo/ecm/core/bulk/StreamBulkProcessor$CounterComputation.class */
    public static class CounterComputation extends AbstractComputation {
        protected final int counterThresholdMs;
        protected final Map<String, Long> counters;

        public CounterComputation(String str, int i) {
            super(str, 1, 1);
            this.counterThresholdMs = i;
            this.counters = new HashMap();
        }

        public void init(ComputationContext computationContext) {
            StreamBulkProcessor.log.debug(String.format("Starting computation: %s reading on: %s, threshold: %dms", StreamBulkProcessor.COUNTER_COMPUTATION_NAME, StreamBulkProcessor.COUNTER_STREAM_NAME, Integer.valueOf(this.counterThresholdMs)));
            computationContext.setTimer(StreamBulkProcessor.COUNTER_STREAM_NAME, System.currentTimeMillis() + this.counterThresholdMs);
        }

        public void processTimer(ComputationContext computationContext, String str, long j) {
            KeyValueStore keyValueStore = ((KeyValueService) Framework.getService(KeyValueService.class)).getKeyValueStore("bulk");
            BulkUpdate bulkUpdate = new BulkUpdate();
            this.counters.forEach((str2, l) -> {
                Long l = keyValueStore.getLong(str2 + ":processedDocs");
                if (l == null) {
                    l = 0L;
                }
                Long valueOf = Long.valueOf(l.longValue() + l.longValue());
                if (valueOf.longValue() == keyValueStore.getLong(str2 + ":count").longValue()) {
                    bulkUpdate.put(str2 + ":state", BulkStatus.State.COMPLETED.toString());
                }
                bulkUpdate.put(str2 + ":processedDocs", String.valueOf(valueOf));
            });
            computationContext.produceRecord("keyValueWriter", str, ((CodecService) Framework.getService(CodecService.class)).getCodec(StreamBulkProcessor.AVRO_CODEC, BulkUpdate.class).encode(bulkUpdate));
            this.counters.clear();
            computationContext.askForCheckpoint();
            computationContext.setTimer(StreamBulkProcessor.COUNTER_STREAM_NAME, System.currentTimeMillis() + this.counterThresholdMs);
        }

        public void processRecord(ComputationContext computationContext, String str, Record record) {
            BulkCounter bulkCounter = (BulkCounter) ((CodecService) Framework.getService(CodecService.class)).getCodec(StreamBulkProcessor.AVRO_CODEC, BulkCounter.class).decode(record.getData());
            String bulkId = bulkCounter.getBulkId();
            this.counters.computeIfPresent(bulkId, (str2, l) -> {
                return Long.valueOf(l.longValue() + bulkCounter.getProcessedDocuments().longValue());
            });
            this.counters.putIfAbsent(bulkId, bulkCounter.getProcessedDocuments());
        }
    }

    /* loaded from: input_file:org/nuxeo/ecm/core/bulk/StreamBulkProcessor$KeyValueWriterComputation.class */
    public static class KeyValueWriterComputation extends AbstractComputation {
        public KeyValueWriterComputation(String str) {
            super(str, 1, 0);
        }

        public void processRecord(ComputationContext computationContext, String str, Record record) {
            KeyValueStore keyValueStore = ((KeyValueService) Framework.getService(KeyValueService.class)).getKeyValueStore("bulk");
            Map<String, String> values = ((BulkUpdate) ((CodecService) Framework.getService(CodecService.class)).getCodec(StreamBulkProcessor.AVRO_CODEC, BulkUpdate.class).decode(record.getData())).getValues();
            keyValueStore.getClass();
            values.forEach(keyValueStore::put);
            computationContext.askForCheckpoint();
        }
    }

    public Topology getTopology(Map<String, String> map) {
        int optionAsInteger = getOptionAsInteger(map, SCROLL_BATCH_SIZE_OPT, 100);
        int optionAsInteger2 = getOptionAsInteger(map, SCROLL_KEEP_ALIVE_SECONDS_OPT, 60);
        int optionAsInteger3 = getOptionAsInteger(map, BUCKET_SIZE_OPT, 50);
        int optionAsInteger4 = getOptionAsInteger(map, COUNTER_THRESHOLD_MS_OPT, DEFAULT_COUNTER_THRESHOLD_MS);
        List<String> actions = ((BulkAdminService) Framework.getService(BulkAdminService.class)).getActions();
        ArrayList arrayList = new ArrayList();
        arrayList.add("i1:documentSet");
        int i = 1;
        Iterator<String> it = actions.iterator();
        while (it.hasNext()) {
            arrayList.add(String.format("o%s:%s", Integer.valueOf(i), it.next()));
            i++;
        }
        arrayList.add(String.format("o%s:%s", Integer.valueOf(i), "keyValueWriter"));
        return Topology.builder().addComputation(() -> {
            return new BulkDocumentScrollerComputation(SCROLLER_COMPUTATION_NAME, arrayList.size(), optionAsInteger, optionAsInteger2, optionAsInteger3);
        }, arrayList).addComputation(() -> {
            return new CounterComputation(COUNTER_COMPUTATION_NAME, optionAsInteger4);
        }, Arrays.asList("i1:counter", "o1:keyValueWriter")).addComputation(() -> {
            return new KeyValueWriterComputation("keyValueWriter");
        }, Collections.singletonList("i1:keyValueWriter")).build();
    }

    protected int getOptionAsInteger(Map<String, String> map, String str, int i) {
        String str2 = map.get(str);
        return str2 == null ? i : Integer.parseInt(str2);
    }
}
