package org.nuxeo.lib.stream.computation.log;

import io.dropwizard.metrics5.Counter;
import io.dropwizard.metrics5.MetricName;
import io.dropwizard.metrics5.MetricRegistry;
import io.dropwizard.metrics5.SharedMetricRegistries;
import io.dropwizard.metrics5.Timer;
import java.time.Duration;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.SyncFailsafe;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.computation.Computation;
import org.nuxeo.lib.stream.computation.ComputationMetadataMapping;
import org.nuxeo.lib.stream.computation.ComputationPolicy;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.Watermark;
import org.nuxeo.lib.stream.computation.internals.ComputationContextImpl;
import org.nuxeo.lib.stream.computation.internals.WatermarkMonotonicInterval;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogRecord;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.Name;
import org.nuxeo.lib.stream.log.RebalanceException;
import org.nuxeo.lib.stream.log.RebalanceListener;

/* loaded from: input_file:org/nuxeo/lib/stream/computation/log/ComputationRunner.class */
public class ComputationRunner implements Runnable, RebalanceListener {
    /** Idle time in milliseconds after which a draining runner leaves its loop (compared in continueLoop). */
    protected static final long STARVING_TIMEOUT_MS = 1000;
    /** Sleep in milliseconds applied by processLoop when an iteration handled neither a timer nor a record. */
    protected static final long INACTIVITY_BREAK_MS = 100;
    protected final LogStreamManager streamManager;
    protected final ComputationMetadataMapping metadata;
    /** Reader on the input streams; {@code null} when the computation declares no input stream. */
    protected final LogTailer<Record> tailer;
    /** Factory invoked once at the beginning of run() to create the computation. */
    protected final Supplier<Computation> supplier;
    protected final ComputationPolicy policy;
    /** Current context; replaced by a fresh instance in onPartitionsAssigned. */
    protected ComputationContextImpl context;
    /** Set by stop(), by a poison pill or by a termination request; ends the loop. */
    protected volatile boolean stop;
    /** Set by drain(); the loop then ends once no activity is seen for STARVING_TIMEOUT_MS. */
    protected volatile boolean drain;
    /** Created in run(); null until then. */
    protected Computation computation;
    /** Number of processLoop iterations. */
    protected long counter;
    /** Number of input records accepted for processing. */
    protected long inRecords;
    /** Value of inRecords at the time of the last checkpoint. */
    protected long inCheckpointRecords;
    /** Number of records appended to output streams. */
    protected long outRecords;
    /** Timestamp (ms) of the last loop iteration that executed at least one timer; 0 when none yet. */
    protected long lastTimerExecution;
    /** Thread name captured when run() starts; used as prefix by setThreadName. */
    protected String threadName;
    /** Partition list received by the constructor. */
    protected List<LogPartition> defaultAssignment;
    public static final String NUXEO_METRICS_REGISTRY_NAME = "org.nuxeo.runtime.metrics.MetricsService";
    /** Failure counter registered under GLOBAL_FAILURE_COUNT_REGISTRY_NAME (not tagged per computation). */
    protected Counter globalFailureCount;
    /** Failure counter tagged with this computation id. */
    protected Counter failureCount;
    protected Counter recordSkippedCount;
    /** Incremented while a record is being processed, decremented afterwards. */
    protected Counter runningCount;
    protected Timer processRecordTimer;
    protected Timer processTimerTimer;
    /** Result of the last processRecord() call. */
    protected boolean recordActivity;
    /** Upper bound of the timeout passed to the tailer when reading a record. */
    public static final Duration READ_TIMEOUT = Duration.ofMillis(25);
    private static final Log log = LogFactory.getLog(ComputationRunner.class);
    public static final String GLOBAL_FAILURE_COUNT_REGISTRY_NAME = MetricRegistry.name("nuxeo", new String[]{"streams", "failure"}).getKey();
    /** Static, hence shared by every runner in the JVM: counts failures considered by skipFailureForRecovery. */
    protected static AtomicInteger skipFailures = new AtomicInteger(0);
    /** Released once the assignment is known: immediately without subscribe, else on the first onPartitionsAssigned. */
    protected final CountDownLatch assignmentLatch = new CountDownLatch(1);
    protected final WatermarkMonotonicInterval lowWatermark = new WatermarkMonotonicInterval();
    /** Timestamp (ms) of the last accepted record or of the last partition assignment. */
    protected long lastReadTime = System.currentTimeMillis();
    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate(NUXEO_METRICS_REGISTRY_NAME);

    /**
     * Builds a runner and, depending on the stream manager capabilities, its tailer and initial context.
     *
     * @param supplier factory for the computation, invoked later by run()
     * @param computationMetadataMapping metadata giving the computation name and its input/output streams
     * @param list partitions to read when the stream manager does not support subscribe; kept as defaultAssignment
     * @param logStreamManager manager used to create the tailer and to append output records
     * @param computationPolicy retry/failure policy
     */
    public ComputationRunner(Supplier<Computation> supplier, ComputationMetadataMapping computationMetadataMapping, List<LogPartition> list, LogStreamManager logStreamManager, ComputationPolicy computationPolicy) {
        this.supplier = supplier;
        this.metadata = computationMetadataMapping;
        this.streamManager = logStreamManager;
        this.policy = computationPolicy;
        if (computationMetadataMapping.inputStreams().isEmpty()) {
            // No input stream: nothing to read, so no tailer and nothing to wait for.
            this.tailer = null;
            this.context = new ComputationContextImpl(logStreamManager, computationMetadataMapping, computationPolicy, false);
            this.assignmentLatch.countDown();
        } else if (logStreamManager.supportSubscribe()) {
            // Dynamic assignment: this runner is registered as rebalance listener ("this" escapes here),
            // the latch is released later by onPartitionsAssigned.
            this.tailer = logStreamManager.subscribe(Name.ofUrn(computationMetadataMapping.name()), (Collection) computationMetadataMapping.inputStreams().stream().map(Name::ofUrn).collect(Collectors.toList()), this);
            // NOTE(review): last argument is presumably the "isSpare" flag (see log in onPartitionsAssigned) - confirm
            this.context = new ComputationContextImpl(logStreamManager, computationMetadataMapping, computationPolicy, true);
        } else {
            // Static assignment: read exactly the given partitions, the assignment is known right away.
            this.context = new ComputationContextImpl(logStreamManager, computationMetadataMapping, computationPolicy, list.isEmpty());
            this.tailer = logStreamManager.createTailer(Name.ofUrn(computationMetadataMapping.name()), list);
            this.assignmentLatch.countDown();
        }
        this.defaultAssignment = list;
    }

    /**
     * Asks the runner to stop: raises the stop flag read by the processing loop and, when the
     * computation has already been created, forwards the signal to it.
     */
    public void stop() {
        log.debug(metadata.name() + ": Receives Stop signal");
        stop = true;
        if (computation == null) {
            // run() has not created the computation yet, nothing to notify
            return;
        }
        computation.signalStop();
    }

    /**
     * Asks the runner to drain: the loop keeps running until it has been idle for
     * {@link #STARVING_TIMEOUT_MS} (see continueLoop).
     */
    public void drain() {
        String computationName = metadata.name();
        log.debug(computationName + ": Receives Drain signal");
        drain = true;
    }

    /**
     * Blocks until the assignment latch is released or the timeout elapses.
     *
     * @param duration maximum time to wait
     * @return {@code true} when the assignment is known, {@code false} on timeout (a warning is logged)
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public boolean waitForAssignments(Duration duration) throws InterruptedException {
        boolean assigned = assignmentLatch.await(duration.toMillis(), TimeUnit.MILLISECONDS);
        if (!assigned) {
            log.warn(metadata.name() + ": Timeout waiting for assignment");
        }
        return assigned;
    }

    /**
     * Runs the computation on the current thread: creates it, initializes it, executes the
     * processing loop, then always destroys the computation and closes the tailer exactly once.
     * <p>
     * Termination is reported as normal (debug log) when the loop ended by itself or the thread was
     * interrupted through an {@link InterruptedException}; any other exit is logged as an error and
     * counted in the global and per-computation failure counters.
     * <p>
     * Unexpected runtime exceptions are rethrown to the caller after cleanup, unless the thread is
     * flagged as interrupted, in which case they are only logged.
     */
    @Override // java.lang.Runnable
    public void run() {
        threadName = Thread.currentThread().getName();
        boolean interrupted = false;
        boolean completed = false;
        computation = supplier.get();
        log.debug(metadata.name() + ": Init");
        registerMetrics();
        try {
            computation.init(context);
            log.debug(metadata.name() + ": Start");
            processLoop();
            completed = true;
        } catch (InterruptedException e) {
            // The interrupt flag is restored in the finally block, after cleanup.
            interrupted = true;
            String message = metadata.name() + ": Interrupted";
            if (log.isTraceEnabled()) {
                log.debug(message, e);
            } else {
                log.debug(message);
            }
        } catch (Exception e) {
            if (!Thread.currentThread().isInterrupted()) {
                log.error(metadata.name() + ": Exception in processLoop: " + e.getMessage(), e);
                throw e;
            }
            // An exception raised while the thread is flagged as interrupted is treated as an interruption.
            log.info(metadata.name() + ": Interrupted", e);
        } finally {
            try {
                computation.destroy();
                closeTailer();
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
            if (completed || interrupted) {
                log.debug(metadata.name() + ": Terminated");
            } else {
                log.error(String.format("Terminate computation: %s due to previous failure", metadata.name()));
                globalFailureCount.inc();
                failureCount.inc();
            }
        }
    }

    /**
     * Looks up (or creates) the counters and timers of this runner in the shared registry.
     * Except for the global failure counter, every metric is tagged with the computation id.
     */
    protected void registerMetrics() {
        globalFailureCount = registry.counter(GLOBAL_FAILURE_COUNT_REGISTRY_NAME);
        final String computationId = Name.ofUrn(metadata.name()).getId();
        runningCount = registry.counter(
                MetricName.build("nuxeo.streams.computation.running").tagged("computation", computationId));
        failureCount = registry.counter(
                MetricName.build("nuxeo.streams.computation.failure").tagged("computation", computationId));
        recordSkippedCount = registry.counter(
                MetricName.build("nuxeo.streams.computation.skippedRecord").tagged("computation", computationId));
        processRecordTimer = registry.timer(
                MetricName.build("nuxeo.streams.computation.processRecord").tagged("computation", computationId));
        processTimerTimer = registry.timer(
                MetricName.build("nuxeo.streams.computation.processTimer").tagged("computation", computationId));
    }

    /** Closes the tailer when there is one and it is not already closed. */
    protected void closeTailer() {
        if (tailer != null && !tailer.closed()) {
            tailer.close();
        }
    }

    /**
     * Main loop: on each iteration runs the due timers, then tries to process one record.
     * When neither produced any activity the thread sleeps {@link #INACTIVITY_BREAK_MS}.
     *
     * @throws InterruptedException if the thread is interrupted while reading or sleeping
     */
    protected void processLoop() throws InterruptedException {
        while (continueLoop()) {
            boolean timerActivity = processTimer();
            recordActivity = processRecord();
            counter++;
            boolean idle = !(timerActivity || recordActivity);
            if (idle) {
                Thread.sleep(INACTIVITY_BREAK_MS);
            }
        }
    }

    /**
     * Tells whether the processing loop must go on.
     * <ul>
     * <li>stops when the stop flag is set or the thread is interrupted;</li>
     * <li>when not draining, always continues;</li>
     * <li>when draining a source (no input stream), stops once a timer has been executed and the last
     * execution is older than {@link #STARVING_TIMEOUT_MS};</li>
     * <li>when draining a computation with inputs, stops once the last iteration read nothing and the
     * last read is older than {@link #STARVING_TIMEOUT_MS}.</li>
     * </ul>
     *
     * @return {@code true} to run another iteration
     */
    protected boolean continueLoop() {
        if (stop || Thread.currentThread().isInterrupted()) {
            return false;
        }
        if (!drain) {
            return true;
        }
        long now = System.currentTimeMillis();
        if (metadata.inputStreams().isEmpty()) {
            // Source computation: activity is measured on timer executions.
            if (lastTimerExecution <= 0 || now - lastTimerExecution <= STARVING_TIMEOUT_MS) {
                return true;
            }
            log.info(metadata.name() + ": End of source drain, last timer " + STARVING_TIMEOUT_MS + " ms ago");
            return false;
        }
        if (recordActivity || now - lastReadTime <= STARVING_TIMEOUT_MS) {
            return true;
        }
        log.info(metadata.name() + ": End of drain no more input after " + (now - lastReadTime) + " ms, "
                + inRecords + " records read, " + counter + " reads attempt");
        return false;
    }

    /**
     * Executes the timers whose scheduled time is reached, ordered by scheduled time.
     * Nothing is executed when the context has no timer or when a tailer exists but currently has
     * no assignment.
     *
     * @return {@code true} when at least one timer has been executed
     */
    protected boolean processTimer() {
        Map<String, Long> timers = context.getTimers();
        if (timers.isEmpty()) {
            return false;
        }
        if (tailer != null && tailer.assignments().isEmpty()) {
            return false;
        }
        final long now = System.currentTimeMillis();
        // Snapshot of the due timers, kept in ascending order of their scheduled time; working on a
        // copy lets each timer be removed from the context while iterating.
        Map<String, Long> dueTimers = timers.entrySet()
                .stream()
                .filter(timer -> timer.getValue() <= now)
                .sorted(Map.Entry.comparingByValue())
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (first, second) -> first,
                        LinkedHashMap::new));
        if (dueTimers.isEmpty()) {
            return false;
        }
        for (Map.Entry<String, Long> due : dueTimers.entrySet()) {
            context.removeTimer(due.getKey());
            processTimerWithRetry(due.getKey(), due.getValue());
        }
        checkSourceLowWatermark();
        lastTimerExecution = now;
        setThreadName("timer");
        checkpointIfNecessary();
        if (context.requireTerminate()) {
            stop = true;
        }
        return true;
    }

    /**
     * Runs {@code computation.processTimer} under the retry policy while measuring its duration.
     * Retries and failures are reported to the computation; once the policy gives up,
     * {@link #processFallback(ComputationContextImpl)} decides between skipping and terminating.
     *
     * @param str the timer key
     * @param l the timer scheduled time, passed as is to the computation
     */
    protected void processTimerWithRetry(String str, Long l) {
        // try-with-resources stops the timer whatever the outcome
        try (Timer.Context ignored = processTimerTimer.time()) {
            Failsafe.with(policy.getRetryPolicy())
                    .onRetry(failure -> computation.processRetry(context, failure))
                    .onFailure(failure -> computation.processFailure(context, failure))
                    .withFallback(() -> processFallback(context))
                    .run(() -> computation.processTimer(context, str, l));
        }
    }

    /**
     * Reads at most one record from the tailer, passes it through the stream filter and processes it.
     *
     * @return {@code true} when a record has been processed or when termination has been requested,
     *         {@code false} when there is no tailer, nothing was read or the filter dropped the record
     * @throws InterruptedException if the thread is interrupted while reading
     */
    protected boolean processRecord() throws InterruptedException {
        if (context.requireTerminate()) {
            stop = true;
            return true;
        }
        if (tailer == null) {
            return false;
        }
        LogRecord<Record> logRecord;
        try {
            logRecord = tailer.read(getTimeoutDuration());
        } catch (RebalanceException ignored) {
            // A rebalance interrupted the read: treated like an empty read, the loop will try again.
            logRecord = null;
        }
        if (logRecord == null) {
            return false;
        }
        Record rawRecord = logRecord.message();
        Name stream = logRecord.offset().partition().name();
        Record record = streamManager.getFilter(stream).afterRead(rawRecord, logRecord.offset());
        if (record == null) {
            // The filter asked to skip this record.
            if (log.isDebugEnabled()) {
                log.debug("Filtering skip record: " + rawRecord);
            }
            return false;
        }
        if (record != rawRecord) {
            // The filter substituted the record: keep the offset, swap the message.
            logRecord = new LogRecord<>(record, logRecord.offset());
        }
        lastReadTime = System.currentTimeMillis();
        inRecords++;
        lowWatermark.mark(record.getWatermark());
        context.setLastOffset(logRecord.offset());
        processRecordWithRetry(metadata.reverseMap(stream.getUrn()), record);
        checkRecordFlags(record);
        checkSourceLowWatermark();
        setThreadName("record");
        checkpointIfNecessary();
        return true;
    }

    /**
     * Runs {@code computation.processRecord} under the retry policy while measuring its duration and
     * tracking it in the running counter. Retries and failures are reported to the computation; once
     * the policy gives up, {@link #processFallback(ComputationContextImpl)} decides between skipping
     * and terminating.
     *
     * @param str the input stream name as known by the computation
     * @param record the record to process
     */
    protected void processRecordWithRetry(String str, Record record) {
        runningCount.inc();
        // try-with-resources stops the timer even when the execution throws
        try (Timer.Context ignored = processRecordTimer.time()) {
            Failsafe.with(policy.getRetryPolicy())
                    .onRetry(failure -> computation.processRetry(context, failure))
                    .onFailure(failure -> computation.processFailure(context, failure))
                    .withFallback(() -> processFallback(context))
                    .run(() -> computation.processRecord(context, str, record));
        } finally {
            runningCount.dec();
        }
    }

    /**
     * Decides what to do once the retry policy has given up on a record or timer: skip it (policy
     * continues on failure, or the recovery allowance is not exhausted) or ask for termination.
     *
     * @param computationContextImpl the context of the failed execution
     */
    protected void processFallback(ComputationContextImpl computationContextImpl) {
        String skipMessage = null;
        if (policy.continueOnFailure()) {
            skipMessage = "Skip record after failure: %s";
        } else if (skipFailureForRecovery()) {
            skipMessage = "Skip record after failure instead of terminating because of recovery mode: %s";
        }
        if (skipMessage != null) {
            // Skipping: checkpoint so that the failing record is not read again.
            log.error(String.format(skipMessage, computationContextImpl.getLastOffset()));
            computationContextImpl.askForCheckpoint();
            recordSkippedCount.inc();
            return;
        }
        log.error(String.format("Terminate computation: %s due to previous failure", metadata.name()));
        computationContextImpl.cancelAskForCheckpoint();
        computationContextImpl.askForTermination();
        globalFailureCount.inc();
        failureCount.inc();
    }

    /**
     * Tells whether the current failure falls within the "skip first failures" allowance of the policy.
     * The shared failure counter is only incremented when the allowance is enabled (greater than zero).
     *
     * @return {@code true} when the failure must be skipped instead of terminating the computation
     */
    protected boolean skipFailureForRecovery() {
        if (policy.getSkipFirstFailures() <= 0) {
            return false;
        }
        return skipFailures.incrementAndGet() <= policy.getSkipFirstFailures();
    }

    /**
     * Computes the read timeout: the time elapsed since the last read, clamped to the range
     * [0, {@link #READ_TIMEOUT}].
     *
     * @return the timeout to pass to the tailer
     */
    protected Duration getTimeoutDuration() {
        long elapsed = System.currentTimeMillis() - lastReadTime;
        // lastReadTime may be ahead of "now", never produce a negative duration
        long nonNegativeElapsed = Math.max(0L, elapsed);
        long timeoutMs = Math.min(READ_TIMEOUT.toMillis(), nonNegativeElapsed);
        return Duration.ofMillis(timeoutMs);
    }

    /**
     * Transfers a positive source low watermark set on the context to the runner's low watermark,
     * then resets the context value to zero.
     */
    protected void checkSourceLowWatermark() {
        long sourceWatermark = context.getSourceLowWatermark();
        if (sourceWatermark <= 0) {
            return;
        }
        lowWatermark.mark(Watermark.ofValue(sourceWatermark));
        context.setSourceLowWatermark(0L);
    }

    /**
     * Reacts to control flags carried by a record: a poison pill requests a checkpoint and stops the
     * runner, a commit flag only requests a checkpoint.
     *
     * @param record the record that has just been processed
     */
    protected void checkRecordFlags(Record record) {
        if (record.getFlags().contains(Record.Flag.POISON_PILL)) {
            log.info(metadata.name() + ": Receive POISON PILL");
            context.askForCheckpoint();
            stop = true;
            return;
        }
        if (record.getFlags().contains(Record.Flag.COMMIT)) {
            context.askForCheckpoint();
        }
    }

    /**
     * Performs a checkpoint when the context requires one. If the checkpoint fails, an error is
     * logged before the failure propagates to the caller.
     */
    protected void checkpointIfNecessary() {
        if (!context.requireCheckpoint()) {
            return;
        }
        boolean completedNormally = false;
        try {
            checkpoint();
            completedNormally = true;
        } finally {
            if (!completedNormally) {
                log.error(metadata.name() + ": CHECKPOINT FAILURE: Resume may create duplicates.");
            }
        }
    }

    /**
     * Performs a checkpoint. The order matters: pending output records are appended first, and the
     * input offsets are committed only afterwards, so a failure in between leaves the offsets
     * uncommitted.
     */
    protected void checkpoint() {
        // 1. flush the records produced since the last checkpoint
        sendRecords();
        // 2. persistence hooks (empty in this class)
        saveTimers();
        saveState();
        // 3. commit the input positions
        saveOffsets();
        this.lowWatermark.checkpoint();
        this.context.removeCheckpointFlag();
        // remember how many input records were covered by this checkpoint
        this.inCheckpointRecords = this.inRecords;
        setThreadName("checkpoint");
        log.debug(this.metadata.name() + ": checkpoint done");
    }

    /** Hook called by checkpoint() after the output records are sent; does nothing in this class. */
    protected void saveTimers() {
    }

    /** Hook called by checkpoint() after saveTimers(); does nothing in this class. */
    protected void saveState() {
    }

    /** Commits the tailer positions; a no-op for a computation without input stream (no tailer). */
    protected void saveOffsets() {
        if (tailer == null) {
            return;
        }
        tailer.commit();
    }

    /**
     * Appends every record buffered in the context to its output stream, then empties the buffers.
     * A record whose watermark is zero is stamped with the current low watermark before being sent.
     */
    protected void sendRecords() {
        for (String stream : metadata.outputStreams()) {
            for (Record pending : context.getRecords(stream)) {
                boolean unstamped = pending.getWatermark() == 0;
                if (unstamped) {
                    pending.setWatermark(lowWatermark.getLow().getValue());
                }
                streamManager.append(stream, pending);
                outRecords++;
            }
            // everything buffered for this stream has been appended
            context.getRecords(stream).clear();
        }
    }

    /**
     * Returns the low bound of this runner's watermark interval.
     *
     * @return the value of {@code lowWatermark.getLow()}
     */
    public Watermark getLowWatermark() {
        return this.lowWatermark.getLow();
    }

    /**
     * Renames the current thread so that a thread dump exposes the runner statistics: the base thread
     * name followed by the input, checkpointed and output record counts, the last read and last timer
     * timestamps, the low watermark value and the loop counter.
     *
     * @param str optional suffix describing the current activity, ignored when {@code null}
     */
    protected void setThreadName(String str) {
        String name = threadName
                + ",in:" + inRecords
                + ",inCheckpoint:" + inCheckpointRecords
                + ",out:" + outRecords
                + ",lastRead:" + lastReadTime
                + ",lastTimer:" + lastTimerExecution
                + ",wm:" + lowWatermark.getLow().getValue()
                + ",loop:" + counter;
        if (str != null) {
            name += "," + str;
        }
        Thread.currentThread().setName(name);
    }

    /**
     * Rebalance callback invoked when partitions are revoked; only tags the current thread name with
     * the event.
     *
     * @param collection the revoked partitions (not used)
     */
    @Override // org.nuxeo.lib.stream.log.RebalanceListener
    public void onPartitionsRevoked(Collection<LogPartition> collection) {
        setThreadName("rebalance revoked");
    }

    /**
     * Rebalance callback invoked when partitions are assigned: replaces the context by a fresh one,
     * initializes the computation again with it, resets the activity timestamps and releases the
     * assignment latch.
     *
     * @param collection the newly assigned partitions; an empty collection is logged as "isSpare"
     */
    @Override // org.nuxeo.lib.stream.log.RebalanceListener
    public void onPartitionsAssigned(Collection<LogPartition> collection) {
        lastReadTime = System.currentTimeMillis();
        final boolean spare = collection.isEmpty();
        setThreadName("rebalance assigned");
        // start over with a clean context for the new assignment
        context = new ComputationContextImpl(streamManager, metadata, policy, spare);
        log.debug(metadata.name() + ": Init isSpare=" + spare);
        computation.init(context);
        // init may take time: refresh the read timestamp so that it is not counted as inactivity
        lastReadTime = System.currentTimeMillis();
        lastTimerExecution = 0L;
        assignmentLatch.countDown();
    }
}
