/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.lib.stream.computation.log;

import io.dropwizard.metrics5.Counter;
import io.dropwizard.metrics5.MetricName;
import io.dropwizard.metrics5.MetricRegistry;
import io.dropwizard.metrics5.SharedMetricRegistries;
import io.dropwizard.metrics5.Timer;
import io.opencensus.common.Scope;
import io.opencensus.trace.AttributeValue;
import io.opencensus.trace.BlankSpan;
import io.opencensus.trace.Link;
import io.opencensus.trace.Span;
import io.opencensus.trace.SpanContext;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
import io.opencensus.trace.propagation.BinaryFormat;
import io.opencensus.trace.propagation.SpanContextParseException;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;
import net.jodah.failsafe.SyncFailsafe;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.computation.Computation;
import org.nuxeo.lib.stream.computation.ComputationMetadataMapping;
import org.nuxeo.lib.stream.computation.ComputationPolicy;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.Watermark;
import org.nuxeo.lib.stream.computation.internals.ComputationContextImpl;
import org.nuxeo.lib.stream.computation.internals.WatermarkMonotonicInterval;
import org.nuxeo.lib.stream.computation.log.LogStreamManager;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogRecord;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.Name;
import org.nuxeo.lib.stream.log.RebalanceException;
import org.nuxeo.lib.stream.log.RebalanceListener;

public class ComputationRunner
implements Runnable,
RebalanceListener {
    public static final Duration READ_TIMEOUT = Duration.ofMillis(25L);
    protected static final long STARVING_TIMEOUT_MS = 1000L;
    protected static final long INACTIVITY_BREAK_MS = 100L;
    private static final Log log = LogFactory.getLog(ComputationRunner.class);
    protected final LogStreamManager streamManager;
    protected final ComputationMetadataMapping metadata;
    protected final LogTailer<Record> tailer;
    protected final Supplier<Computation> supplier;
    protected final CountDownLatch assignmentLatch = new CountDownLatch(1);
    protected final WatermarkMonotonicInterval lowWatermark = new WatermarkMonotonicInterval();
    protected final ComputationPolicy policy;
    protected ComputationContextImpl context;
    protected volatile boolean stop;
    protected volatile boolean drain;
    protected Computation computation;
    protected long counter;
    protected long inRecords;
    protected long inCheckpointRecords;
    protected long outRecords;
    protected long lastReadTime = System.currentTimeMillis();
    protected long lastTimerExecution;
    protected String threadName;
    protected List<LogPartition> defaultAssignment;
    public static final String NUXEO_METRICS_REGISTRY_NAME = "org.nuxeo.runtime.metrics.MetricsService";
    public static final String GLOBAL_FAILURE_COUNT_REGISTRY_NAME = MetricRegistry.name((String)"nuxeo", (String[])new String[]{"streams", "failure"}).getKey();
    protected final MetricRegistry registry = SharedMetricRegistries.getOrCreate((String)"org.nuxeo.runtime.metrics.MetricsService");
    protected Counter globalFailureCount;
    protected Counter failureCount;
    protected Counter recordSkippedCount;
    protected Counter runningCount;
    protected Timer processRecordTimer;
    protected Timer processTimerTimer;
    protected static AtomicInteger skipFailures = new AtomicInteger(0);
    protected boolean recordActivity;
    protected SpanContext lastSpanContext;

    public ComputationRunner(Supplier<Computation> supplier, ComputationMetadataMapping metadata, List<LogPartition> defaultAssignment, LogStreamManager streamManager, ComputationPolicy policy) {
        this.supplier = supplier;
        this.metadata = metadata;
        this.streamManager = streamManager;
        this.policy = policy;
        if (metadata.inputStreams().isEmpty()) {
            this.tailer = null;
            this.context = new ComputationContextImpl(streamManager, metadata, policy, false);
            this.assignmentLatch.countDown();
        } else if (streamManager.supportSubscribe()) {
            this.tailer = streamManager.subscribe(Name.ofUrn(metadata.name()), metadata.inputStreams().stream().map(Name::ofUrn).collect(Collectors.toList()), this);
            this.context = new ComputationContextImpl(streamManager, metadata, policy, true);
        } else {
            this.context = new ComputationContextImpl(streamManager, metadata, policy, defaultAssignment.isEmpty());
            this.tailer = streamManager.createTailer(Name.ofUrn(metadata.name()), defaultAssignment);
            this.assignmentLatch.countDown();
        }
        this.defaultAssignment = defaultAssignment;
    }

    public void stop() {
        log.debug((Object)(this.metadata.name() + ": Receives Stop signal"));
        this.stop = true;
        if (this.computation != null) {
            this.computation.signalStop();
        }
    }

    public void drain() {
        log.debug((Object)(this.metadata.name() + ": Receives Drain signal"));
        this.drain = true;
    }

    public boolean waitForAssignments(Duration timeout) throws InterruptedException {
        if (!this.assignmentLatch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
            log.warn((Object)(this.metadata.name() + ": Timeout waiting for assignment"));
            return false;
        }
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     */
    @Override
    public void run() {
        block37: {
            this.threadName = Thread.currentThread().getName();
            boolean interrupted = false;
            boolean normalTermination = false;
            this.computation = this.supplier.get();
            log.debug((Object)(this.metadata.name() + ": Init"));
            this.registerMetrics();
            this.computation.init(this.context);
            log.debug((Object)(this.metadata.name() + ": Start"));
            this.processLoop();
            normalTermination = true;
            try {
                this.computation.destroy();
                this.closeTailer();
            }
            finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
            if (normalTermination || interrupted) {
                log.debug((Object)(this.metadata.name() + ": Terminated"));
            } else {
                log.error((Object)String.format("Terminate computation: %s due to previous failure", this.metadata.name()));
                this.globalFailureCount.inc();
                this.failureCount.inc();
            }
            break block37;
            catch (InterruptedException e) {
                block33: {
                    interrupted = true;
                    String msg = this.metadata.name() + ": Interrupted";
                    if (log.isTraceEnabled()) {
                        log.debug((Object)msg, (Throwable)e);
                        break block33;
                    }
                    log.debug((Object)msg);
                }
                try {
                    this.computation.destroy();
                    this.closeTailer();
                }
                finally {
                    if (interrupted) {
                        Thread.currentThread().interrupt();
                    }
                }
                if (normalTermination || interrupted) {
                    log.debug((Object)(this.metadata.name() + ": Terminated"));
                } else {
                    log.error((Object)String.format("Terminate computation: %s due to previous failure", this.metadata.name()));
                    this.globalFailureCount.inc();
                    this.failureCount.inc();
                }
            }
            catch (Exception e2) {
                if (!Thread.currentThread().isInterrupted()) {
                    log.error((Object)(this.metadata.name() + ": Exception in processLoop: " + e2.getMessage()), (Throwable)e2);
                    throw e2;
                }
                log.info((Object)(this.metadata.name() + ": Interrupted"), (Throwable)e2);
                {
                    catch (Throwable throwable) {
                        try {
                            this.computation.destroy();
                            this.closeTailer();
                        }
                        finally {
                            if (interrupted) {
                                Thread.currentThread().interrupt();
                            }
                        }
                        if (normalTermination || interrupted) {
                            log.debug((Object)(this.metadata.name() + ": Terminated"));
                        } else {
                            log.error((Object)String.format("Terminate computation: %s due to previous failure", this.metadata.name()));
                            this.globalFailureCount.inc();
                            this.failureCount.inc();
                        }
                        throw throwable;
                    }
                }
                try {
                    this.computation.destroy();
                    this.closeTailer();
                }
                finally {
                    if (interrupted) {
                        Thread.currentThread().interrupt();
                    }
                }
                if (normalTermination || interrupted) {
                    log.debug((Object)(this.metadata.name() + ": Terminated"));
                } else {
                    log.error((Object)String.format("Terminate computation: %s due to previous failure", this.metadata.name()));
                    this.globalFailureCount.inc();
                    this.failureCount.inc();
                }
            }
        }
    }

    protected void registerMetrics() {
        this.globalFailureCount = this.registry.counter(GLOBAL_FAILURE_COUNT_REGISTRY_NAME);
        String name = Name.ofUrn(this.metadata.name()).getId();
        this.runningCount = this.registry.counter(MetricName.build((String[])new String[]{"nuxeo.streams.computation.running"}).tagged(new String[]{"computation", name}));
        this.failureCount = this.registry.counter(MetricName.build((String[])new String[]{"nuxeo.streams.computation.failure"}).tagged(new String[]{"computation", name}));
        this.recordSkippedCount = this.registry.counter(MetricName.build((String[])new String[]{"nuxeo.streams.computation.skippedRecord"}).tagged(new String[]{"computation", name}));
        this.processRecordTimer = this.registry.timer(MetricName.build((String[])new String[]{"nuxeo.streams.computation.processRecord"}).tagged(new String[]{"computation", name}));
        this.processTimerTimer = this.registry.timer(MetricName.build((String[])new String[]{"nuxeo.streams.computation.processTimer"}).tagged(new String[]{"computation", name}));
    }

    protected void closeTailer() {
        if (this.tailer != null && !this.tailer.closed()) {
            this.tailer.close();
        }
    }

    protected void processLoop() throws InterruptedException {
        while (this.continueLoop()) {
            boolean timerActivity = this.processTimer();
            this.recordActivity = this.processRecord();
            ++this.counter;
            if (timerActivity || this.recordActivity) continue;
            Thread.sleep(100L);
        }
    }

    protected boolean continueLoop() {
        if (this.stop || Thread.currentThread().isInterrupted()) {
            return false;
        }
        if (this.drain) {
            long now = System.currentTimeMillis();
            if (this.metadata.inputStreams().isEmpty()) {
                if (this.lastTimerExecution > 0L && now - this.lastTimerExecution > 1000L) {
                    log.info((Object)(this.metadata.name() + ": End of source drain, last timer 1000 ms ago"));
                    return false;
                }
            } else if (!this.recordActivity && now - this.lastReadTime > 1000L) {
                log.info((Object)(this.metadata.name() + ": End of drain no more input after " + (now - this.lastReadTime) + " ms, " + this.inRecords + " records read, " + this.counter + " reads attempt"));
                return false;
            }
        }
        return true;
    }

    protected boolean processTimer() {
        Map<String, Long> timers = this.context.getTimers();
        if (timers.isEmpty()) {
            return false;
        }
        if (this.tailer != null && this.tailer.assignments().isEmpty()) {
            return false;
        }
        long now = System.currentTimeMillis();
        LinkedHashMap sortedTimer = timers.entrySet().stream().filter(entry -> (Long)entry.getValue() <= now).sorted(Map.Entry.comparingByValue()).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e1, LinkedHashMap::new));
        if (sortedTimer.isEmpty()) {
            return false;
        }
        return this.processTimerWithTracing(now, sortedTimer);
    }

    protected boolean processTimerWithTracing(long now, LinkedHashMap<String, Long> sortedTimer) {
        BlankSpan span;
        Tracer tracer = Tracing.getTracer();
        if (this.lastSpanContext != null) {
            span = tracer.spanBuilderWithRemoteParent("comp/" + this.computation.metadata().name() + "/timer", this.lastSpanContext).startSpan();
            span.addLink(Link.fromSpanContext((SpanContext)this.lastSpanContext, (Link.Type)Link.Type.PARENT_LINKED_SPAN));
            HashMap<String, AttributeValue> map = new HashMap<String, AttributeValue>();
            map.put("comp.name", AttributeValue.stringAttributeValue((String)this.computation.metadata().name()));
            map.put("comp.thread", AttributeValue.stringAttributeValue((String)Thread.currentThread().getName()));
            map.put("record.last_offset", AttributeValue.stringAttributeValue((String)this.context.getLastOffset().toString()));
            span.putAttributes(map);
            this.lastSpanContext = null;
        } else {
            span = BlankSpan.INSTANCE;
        }
        try {
            boolean bl;
            block16: {
                Scope scope;
                block14: {
                    boolean bl2;
                    block15: {
                        scope = Tracing.getTracer().withSpan((Span)span);
                        try {
                            boolean[] timerUpdate = new boolean[]{false};
                            sortedTimer.forEach((key, value) -> {
                                this.context.removeTimer((String)key);
                                this.processTimerWithRetry((String)key, (Long)value);
                                timerUpdate[0] = true;
                            });
                            if (!timerUpdate[0]) break block14;
                            this.checkSourceLowWatermark();
                            this.lastTimerExecution = now;
                            this.setThreadName("timer");
                            this.checkpointIfNecessary();
                            if (this.context.requireTerminate()) {
                                this.stop = true;
                            }
                            bl2 = true;
                            if (scope == null) break block15;
                        }
                        catch (Throwable throwable) {
                            if (scope != null) {
                                try {
                                    scope.close();
                                }
                                catch (Throwable throwable2) {
                                    throwable.addSuppressed(throwable2);
                                }
                            }
                            throw throwable;
                        }
                        scope.close();
                    }
                    return bl2;
                }
                bl = false;
                if (scope == null) break block16;
                scope.close();
            }
            return bl;
        }
        finally {
            span.end();
        }
    }

    protected void processTimerWithRetry(String key, Long value) {
        try (Timer.Context ignored = this.processTimerTimer.time();){
            ((SyncFailsafe)((SyncFailsafe)((SyncFailsafe)Failsafe.with((RetryPolicy)this.policy.getRetryPolicy()).onRetry(failure -> this.computation.processRetry(this.context, (Throwable)failure))).onFailure(failure -> this.computation.processFailure(this.context, (Throwable)failure))).withFallback(() -> this.processFallback(this.context))).run(() -> this.computation.processTimer(this.context, key, value));
        }
    }

    protected boolean processRecord() throws InterruptedException {
        if (this.context.requireTerminate()) {
            this.stop = true;
            return true;
        }
        if (this.tailer == null) {
            return false;
        }
        Duration timeoutRead = this.getTimeoutDuration();
        LogRecord<Record> logRecord = null;
        try {
            logRecord = this.tailer.read(timeoutRead);
        }
        catch (RebalanceException rebalanceException) {
            // empty catch block
        }
        if (logRecord != null) {
            Record record = logRecord.message();
            Name stream = logRecord.offset().partition().name();
            Record filteredRecord = this.streamManager.getFilter(stream).afterRead(record, logRecord.offset());
            if (filteredRecord == null) {
                if (log.isDebugEnabled()) {
                    log.debug((Object)("Filtering skip record: " + record));
                }
                return false;
            }
            if (filteredRecord != record) {
                logRecord = new LogRecord<Record>(filteredRecord, logRecord.offset());
                record = filteredRecord;
            }
            this.lastReadTime = System.currentTimeMillis();
            ++this.inRecords;
            this.lowWatermark.mark(record.getWatermark());
            this.context.setLastOffset(logRecord.offset());
            String from = this.metadata.reverseMap(stream.getUrn());
            this.processRecordWithTracing(from, record);
            return true;
        }
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void processRecordWithTracing(String from, Record record) {
        Span span = this.getSpanFromRecord(record);
        try (Scope scope = Tracing.getTracer().withSpan(span);){
            this.processRecordWithRetry(from, record);
            this.checkRecordFlags(record);
            this.checkSourceLowWatermark();
            this.setThreadName("record");
            this.checkpointIfNecessary();
        }
        finally {
            span.end();
        }
    }

    protected Span getSpanFromRecord(Record record) {
        byte[] traceContext = record.getTraceContext();
        if (traceContext == null || traceContext.length == 0) {
            return BlankSpan.INSTANCE;
        }
        Tracer tracer = Tracing.getTracer();
        BinaryFormat binaryFormat = Tracing.getPropagationComponent().getBinaryFormat();
        try {
            this.lastSpanContext = binaryFormat.fromByteArray(traceContext);
            Span span = tracer.spanBuilderWithRemoteParent("comp/" + this.computation.metadata().name() + "/record", this.lastSpanContext).startSpan();
            span.addLink(Link.fromSpanContext((SpanContext)this.lastSpanContext, (Link.Type)Link.Type.PARENT_LINKED_SPAN));
            HashMap<String, AttributeValue> map = new HashMap<String, AttributeValue>();
            map.put("comp.name", AttributeValue.stringAttributeValue((String)this.computation.metadata().name()));
            map.put("comp.thread", AttributeValue.stringAttributeValue((String)Thread.currentThread().getName()));
            map.put("record.key", AttributeValue.stringAttributeValue((String)record.getKey()));
            map.put("record.offset", AttributeValue.stringAttributeValue((String)this.context.getLastOffset().toString()));
            map.put("record.watermark", AttributeValue.stringAttributeValue((String)Watermark.ofValue(record.getWatermark()).toString()));
            map.put("record.submit_thread", AttributeValue.stringAttributeValue((String)record.getAppenderThread()));
            map.put("record.data.length", AttributeValue.longAttributeValue((long)record.getData().length));
            span.putAttributes(map);
            return span;
        }
        catch (SpanContextParseException e) {
            log.warn((Object)("Invalid span context in record: " + record.getKey() + " length: " + traceContext.length));
            return BlankSpan.INSTANCE;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void processRecordWithRetry(String from, Record record) {
        this.runningCount.inc();
        try (Timer.Context ignored = this.processRecordTimer.time();){
            ((SyncFailsafe)((SyncFailsafe)((SyncFailsafe)Failsafe.with((RetryPolicy)this.policy.getRetryPolicy()).onRetry(failure -> this.computation.processRetry(this.context, (Throwable)failure))).onFailure(failure -> this.computation.processFailure(this.context, (Throwable)failure))).withFallback(() -> this.processFallback(this.context))).run(() -> this.computation.processRecord(this.context, from, record));
        }
        finally {
            this.runningCount.dec();
        }
    }

    protected void processFallback(ComputationContextImpl context) {
        if (this.policy.continueOnFailure()) {
            log.error((Object)String.format("Skip record after failure: %s", context.getLastOffset()));
            context.askForCheckpoint();
            this.recordSkippedCount.inc();
        } else if (this.skipFailureForRecovery()) {
            log.error((Object)String.format("Skip record after failure instead of terminating because of recovery mode: %s", context.getLastOffset()));
            context.askForCheckpoint();
            this.recordSkippedCount.inc();
        } else {
            log.error((Object)String.format("Terminate computation: %s due to previous failure", this.metadata.name()));
            context.cancelAskForCheckpoint();
            context.askForTermination();
            this.globalFailureCount.inc();
            this.failureCount.inc();
        }
    }

    protected boolean skipFailureForRecovery() {
        return this.policy.getSkipFirstFailures() > 0 && skipFailures.incrementAndGet() <= this.policy.getSkipFirstFailures();
    }

    protected Duration getTimeoutDuration() {
        long adaptedReadTimeout = Math.max(0L, System.currentTimeMillis() - this.lastReadTime);
        return Duration.ofMillis(Math.min(READ_TIMEOUT.toMillis(), adaptedReadTimeout));
    }

    protected void checkSourceLowWatermark() {
        long watermark = this.context.getSourceLowWatermark();
        if (watermark > 0L) {
            this.lowWatermark.mark(Watermark.ofValue(watermark));
            this.context.setSourceLowWatermark(0L);
        }
    }

    protected void checkRecordFlags(Record record) {
        if (record.getFlags().contains((Object)Record.Flag.POISON_PILL)) {
            log.info((Object)(this.metadata.name() + ": Receive POISON PILL"));
            this.context.askForCheckpoint();
            this.stop = true;
        } else if (record.getFlags().contains((Object)Record.Flag.COMMIT)) {
            this.context.askForCheckpoint();
        }
    }

    protected void checkpointIfNecessary() {
        if (this.context.requireCheckpoint()) {
            boolean completed = false;
            try {
                this.checkpoint();
                completed = true;
            }
            finally {
                if (!completed) {
                    log.error((Object)(this.metadata.name() + ": CHECKPOINT FAILURE: Resume may create duplicates."));
                }
            }
        }
    }

    protected void checkpoint() {
        this.sendRecords();
        this.saveTimers();
        this.saveState();
        this.saveOffsets();
        this.lowWatermark.checkpoint();
        this.context.removeCheckpointFlag();
        this.inCheckpointRecords = this.inRecords;
        this.setThreadName("checkpoint");
        log.debug((Object)(this.metadata.name() + ": checkpoint done"));
    }

    protected void saveTimers() {
    }

    protected void saveState() {
    }

    protected void saveOffsets() {
        if (this.tailer != null) {
            this.tailer.commit();
            Span span = Tracing.getTracer().getCurrentSpan();
            span.addAnnotation("Checkpoint positions at " + Instant.now());
        }
    }

    protected void sendRecords() {
        boolean firstRecord = true;
        for (String stream : this.metadata.outputStreams()) {
            for (Record record : this.context.getRecords(stream)) {
                if (record.getWatermark() == 0L) {
                    record.setWatermark(this.lowWatermark.getLow().getValue());
                }
                if (firstRecord) {
                    Span span = Tracing.getTracer().getCurrentSpan();
                    span.addAnnotation("Sending records at " + Instant.now());
                    firstRecord = false;
                }
                this.streamManager.append(stream, record);
                ++this.outRecords;
            }
            this.context.getRecords(stream).clear();
        }
    }

    public Watermark getLowWatermark() {
        return this.lowWatermark.getLow();
    }

    protected void setThreadName(String message) {
        String name = this.threadName + ",in:" + this.inRecords + ",inCheckpoint:" + this.inCheckpointRecords + ",out:" + this.outRecords + ",lastRead:" + this.lastReadTime + ",lastTimer:" + this.lastTimerExecution + ",wm:" + this.lowWatermark.getLow().getValue() + ",loop:" + this.counter;
        if (message != null) {
            name = name + "," + message;
        }
        Thread.currentThread().setName(name);
    }

    @Override
    public void onPartitionsRevoked(Collection<LogPartition> partitions) {
        this.setThreadName("rebalance revoked");
    }

    @Override
    public void onPartitionsAssigned(Collection<LogPartition> partitions) {
        this.lastReadTime = System.currentTimeMillis();
        boolean isSpare = partitions.isEmpty();
        this.setThreadName("rebalance assigned");
        this.context = new ComputationContextImpl(this.streamManager, this.metadata, this.policy, partitions.isEmpty());
        log.debug((Object)(this.metadata.name() + ": Init isSpare=" + isSpare));
        this.computation.init(this.context);
        this.lastReadTime = System.currentTimeMillis();
        this.lastTimerExecution = 0L;
        this.assignmentLatch.countDown();
    }
}

