/*
 * Decompiled with CFR 0.152.
 */
package ddtrot.dd.trace.common.metrics;

import ddtrot.dd.trace.common.metrics.AggregateMetric;
import ddtrot.dd.trace.common.metrics.Batch;
import ddtrot.dd.trace.common.metrics.InboxItem;
import ddtrot.dd.trace.common.metrics.MetricKey;
import ddtrot.dd.trace.common.metrics.MetricWriter;
import ddtrot.dd.trace.common.metrics.SignalItem;
import ddtrot.dd.trace.core.util.LRUCache;
import ddtrot.org.jctools.maps.NonBlockingHashMap;
import ddtrot.org.jctools.queues.MessagePassingQueue;
import ddtrot.org.jctools.queues.MpscCompoundQueue;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class Aggregator
implements Runnable {
    private static final long DEFAULT_SLEEP_MILLIS = 10L;
    private static final Logger log = LoggerFactory.getLogger(Aggregator.class);
    private final Queue<Batch> batchPool;
    private final MpscCompoundQueue<InboxItem> inbox;
    private final LRUCache<MetricKey, AggregateMetric> aggregates;
    private final NonBlockingHashMap<MetricKey, Batch> pending;
    private final Set<MetricKey> commonKeys;
    private final MetricWriter writer;
    private final long reportingIntervalNanos;
    private final long sleepMillis;
    private boolean dirty;

    Aggregator(MetricWriter writer, Queue<Batch> batchPool, MpscCompoundQueue<InboxItem> inbox, NonBlockingHashMap<MetricKey, Batch> pending, Set<MetricKey> commonKeys, int maxAggregates, long reportingInterval, TimeUnit reportingIntervalTimeUnit) {
        this(writer, batchPool, inbox, pending, commonKeys, maxAggregates, reportingInterval, reportingIntervalTimeUnit, 10L);
    }

    Aggregator(MetricWriter writer, Queue<Batch> batchPool, MpscCompoundQueue<InboxItem> inbox, NonBlockingHashMap<MetricKey, Batch> pending, Set<MetricKey> commonKeys, int maxAggregates, long reportingInterval, TimeUnit reportingIntervalTimeUnit, long sleepMillis) {
        this.writer = writer;
        this.batchPool = batchPool;
        this.inbox = inbox;
        this.commonKeys = commonKeys;
        this.aggregates = new LRUCache<MetricKey, AggregateMetric>(new CommonKeyCleaner(commonKeys), maxAggregates * 4 / 3, 0.75f, maxAggregates);
        this.pending = pending;
        this.reportingIntervalNanos = reportingIntervalTimeUnit.toNanos(reportingInterval);
        this.sleepMillis = sleepMillis;
    }

    public void clearAggregates() {
        this.aggregates.clear();
    }

    @Override
    public void run() {
        Thread currentThread = Thread.currentThread();
        Drainer drainer = new Drainer();
        while (!currentThread.isInterrupted() && !drainer.stopped) {
            try {
                if (!this.inbox.isEmpty()) {
                    this.inbox.drain(drainer);
                    continue;
                }
                Thread.sleep(this.sleepMillis);
            }
            catch (InterruptedException e) {
                currentThread.interrupt();
            }
            catch (Throwable error) {
                log.debug("error aggregating metrics", error);
            }
        }
        log.debug("metrics aggregator exited");
    }

    private void report(long when, SignalItem signal) {
        boolean skipped = true;
        if (this.dirty) {
            try {
                this.expungeStaleAggregates();
                if (!this.aggregates.isEmpty()) {
                    skipped = false;
                    this.writer.startBucket(this.aggregates.size(), when, this.reportingIntervalNanos);
                    for (Map.Entry aggregate : this.aggregates.entrySet()) {
                        this.writer.add((MetricKey)aggregate.getKey(), (AggregateMetric)aggregate.getValue());
                        ((AggregateMetric)aggregate.getValue()).clear();
                    }
                    this.writer.finishBucket();
                }
            }
            catch (Throwable error) {
                this.writer.reset();
                log.debug("Error publishing metrics. Dropping payload", error);
            }
            this.dirty = false;
        }
        signal.complete();
        if (skipped) {
            log.debug("skipped metrics reporting because no points have changed");
        }
    }

    private void expungeStaleAggregates() {
        Iterator it = this.aggregates.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry pair = it.next();
            AggregateMetric metric = (AggregateMetric)pair.getValue();
            if (metric.getHitCount() != 0) continue;
            it.remove();
            this.commonKeys.remove(pair.getKey());
        }
    }

    private long wallClockTime() {
        return TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis());
    }

    private static final class CommonKeyCleaner
    implements LRUCache.ExpiryListener<MetricKey, AggregateMetric> {
        private final Set<MetricKey> commonKeys;

        private CommonKeyCleaner(Set<MetricKey> commonKeys) {
            this.commonKeys = commonKeys;
        }

        @Override
        public void accept(Map.Entry<MetricKey, AggregateMetric> expired) {
            this.commonKeys.remove(expired.getKey());
        }
    }

    private final class Drainer
    implements MessagePassingQueue.Consumer<InboxItem> {
        boolean stopped = false;

        private Drainer() {
        }

        @Override
        public void accept(InboxItem item) {
            if (item instanceof SignalItem) {
                SignalItem signal = (SignalItem)item;
                if (!this.stopped) {
                    Aggregator.this.report(Aggregator.this.wallClockTime(), signal);
                    this.stopped = item instanceof SignalItem.StopSignal;
                    if (this.stopped) {
                        signal.complete();
                    }
                } else {
                    signal.ignore();
                }
            } else if (item instanceof Batch && !this.stopped) {
                Batch batch = (Batch)item;
                MetricKey key = batch.getKey();
                Aggregator.this.pending.remove(key, batch);
                AggregateMetric aggregate = Aggregator.this.aggregates.computeIfAbsent(key, k -> new AggregateMetric());
                batch.contributeTo(aggregate);
                Aggregator.this.dirty = true;
                Aggregator.this.batchPool.offer(batch);
            }
        }
    }
}

