/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.spectator.atlas;

import com.netflix.spectator.api.AbstractRegistry;
import com.netflix.spectator.api.Clock;
import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.DistributionSummary;
import com.netflix.spectator.api.Gauge;
import com.netflix.spectator.api.Id;
import com.netflix.spectator.api.Measurement;
import com.netflix.spectator.api.Meter;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.api.RegistryConfig;
import com.netflix.spectator.api.Tag;
import com.netflix.spectator.api.Timer;
import com.netflix.spectator.atlas.AtlasConfig;
import com.netflix.spectator.atlas.AtlasCounter;
import com.netflix.spectator.atlas.AtlasDistributionSummary;
import com.netflix.spectator.atlas.AtlasGauge;
import com.netflix.spectator.atlas.AtlasMaxGauge;
import com.netflix.spectator.atlas.AtlasMeter;
import com.netflix.spectator.atlas.AtlasTimer;
import com.netflix.spectator.atlas.GzipLevelOutputStream;
import com.netflix.spectator.atlas.MeasurementConsumer;
import com.netflix.spectator.atlas.OverridableClock;
import com.netflix.spectator.atlas.RollupPolicy;
import com.netflix.spectator.atlas.StepClock;
import com.netflix.spectator.atlas.StreamHelper;
import com.netflix.spectator.atlas.SubscriptionManager;
import com.netflix.spectator.atlas.ValidationHelper;
import com.netflix.spectator.atlas.impl.Consolidator;
import com.netflix.spectator.atlas.impl.EvalPayload;
import com.netflix.spectator.atlas.impl.Evaluator;
import com.netflix.spectator.atlas.impl.MeasurementSerializer;
import com.netflix.spectator.atlas.impl.PublishPayload;
import com.netflix.spectator.atlas.shaded.spectator-atlas.json.core.JsonFactory;
import com.netflix.spectator.atlas.shaded.spectator-atlas.json.databind.ObjectMapper;
import com.netflix.spectator.atlas.shaded.spectator-atlas.json.databind.module.SimpleModule;
import com.netflix.spectator.atlas.shaded.spectator-atlas.json.dataformat.smile.SmileFactory;
import com.netflix.spectator.impl.AsciiSet;
import com.netflix.spectator.impl.Scheduler;
import com.netflix.spectator.ipc.http.HttpClient;
import com.netflix.spectator.ipc.http.HttpResponse;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Stream;
import javax.inject.Inject;
import javax.inject.Singleton;

@Singleton
public final class AtlasRegistry
extends AbstractRegistry
implements AutoCloseable {
    /** Timer name used by {@code recordClockSkew}; tagged with {@code id=fast} or {@code id=slow}. */
    private static final String CLOCK_SKEW_TIMER = "spectator.atlas.clockSkew";
    /** Timer name used by {@code publishTaskTimer}; tagged with the {@code id} of the task being timed. */
    private static final String PUBLISH_TASK_TIMER = "spectator.atlas.publishTaskTime";
    /** {@link StepClock} built from the injected clock and the LWC step; handed to gauges in {@code newGauge}. */
    private final Clock stepClock;
    /** Configuration this registry was created with. */
    private final AtlasConfig config;
    /** Publish interval, taken from {@code config.step()}. */
    private final Duration step;
    /** {@link #step} in milliseconds. */
    private final long stepMillis;
    /** {@code config.meterTTL()} in milliseconds; passed to every meter this registry creates. */
    private final long meterTTL;
    /** Endpoint that publish payloads are posted to, parsed from {@code config.uri()}. */
    private final URI uri;
    /** Interval used for the LWC (subscription) path, taken from {@code config.lwcStep()}. */
    private final Duration lwcStep;
    /** {@link #lwcStep} in milliseconds; validated in the constructor to evenly divide {@link #stepMillis}. */
    private final long lwcStepMillis;
    /** Frequency at which {@code fetchSubscriptions} is scheduled. */
    private final Duration configRefreshFrequency;
    /** Endpoint that eval payloads are posted to, parsed from {@code config.evalUri()}. */
    private final URI evalUri;
    /** HTTP connect timeout in milliseconds (narrowed to {@code int} in the constructor). */
    private final int connectTimeout;
    /** HTTP read timeout in milliseconds (narrowed to {@code int} in the constructor). */
    private final int readTimeout;
    /** Maximum number of measurements per batch in {@code getBatches}; also passed to {@code EvalPayload.toBatches}. */
    private final int batchSize;
    /** Thread count used for both the sender pool and the scheduler created in {@code start}. */
    private final int numThreads;
    /** Sorted copy ({@link TreeMap}) of {@code config.commonTags()}. */
    private final Map<String, String> commonTags;
    /** Replaces characters outside {@code config.validTagCharacters()} with {@code '_'}; identity when no pattern is configured. */
    private final Function<String, String> fixTagString;
    /** Mapper backed by a {@code JsonFactory}; used for trace logging and eval payloads. */
    private final ObjectMapper jsonMapper;
    /** Mapper backed by a {@code SmileFactory}; used to encode publish payloads. */
    private final ObjectMapper smileMapper;
    /** Registry that receives this class's own diagnostic meters: {@code config.debugRegistry()} or, if that is null, this registry. */
    private final Registry debugRegistry;
    /** Records the outcome of publish requests (see {@code sendBatch}). */
    private final ValidationHelper validationHelper;
    /** Policy applied to the collected measurements in {@code getBatches}. */
    private final RollupPolicy rollupPolicy;
    /** HTTP client used for publish and eval requests. */
    private final HttpClient client;
    /** Created in {@code start} and reset to null in {@code stop}; null means "not running". */
    private Scheduler scheduler;
    /** Pool that runs the per-batch send tasks; created in {@code start} and reset to null in {@code stop}. */
    private ExecutorService senderPool;
    /** Refreshed by {@code fetchSubscriptions}; source of the subscriptions synced into {@link #evaluator}. */
    private final SubscriptionManager subManager;
    /** Receives every polled measurement and produces the eval payload in {@code sendToLWC}. */
    private final Evaluator evaluator;
    /** Timestamp passed to the most recent effective {@code pollMeters} call; -1 before the first poll. */
    private long lastPollTimestamp = -1L;
    /** Timestamp of the most recent flush performed by {@code sendToAtlas}; -1 before the first flush. */
    private long lastFlushTimestamp = -1L;
    /** Consolidators keyed by meter id; filled in {@code pollMeters} and drained in {@code getBatches} (both synchronized). */
    private final Map<Id, Consolidator> atlasMeasurements = new LinkedHashMap<Id, Consolidator>();
    /** Supplies the {@link ByteArrayOutputStream} used by {@code encodeBatch}. */
    private final StreamHelper streamHelper = new StreamHelper();

    /**
     * Create a registry that builds its own HTTP client.
     *
     * <p>Delegates to the three-argument constructor with a {@code null} client, which
     * makes that constructor create one via {@code HttpClient.create}.
     *
     * @param clock  clock used as the time source for the registry
     * @param config configuration for the registry
     */
    @Inject
    public AtlasRegistry(Clock clock, AtlasConfig config) {
        this(clock, config, null);
    }

    /**
     * Create a registry, optionally with an explicit HTTP client.
     *
     * <p>The supplied clock is wrapped in an {@link OverridableClock} before being handed to
     * the base registry so that {@code stop()} can pin the wall time while flushing. If
     * {@code config.autoStart()} is true the registry is started before the constructor returns.
     *
     * @param clock  clock used as the time source for the registry
     * @param config configuration for the registry
     * @param client HTTP client to use, or {@code null} to create one bound to the debug registry
     * @throws IllegalArgumentException if {@code lwcStep} is not positive, is larger than
     *     {@code step}, or does not evenly divide {@code step}
     */
    AtlasRegistry(Clock clock, AtlasConfig config, HttpClient client) {
        super(new OverridableClock(clock), config);
        this.config = config;

        this.step = config.step();
        this.stepMillis = this.step.toMillis();
        this.lwcStep = config.lwcStep();
        this.lwcStepMillis = this.lwcStep.toMillis();

        // Reject a zero or negative LWC step up front; otherwise the modulo check below fails
        // with a bare "/ by zero" ArithmeticException and negative values slip through.
        if (this.lwcStepMillis <= 0L) {
            throw new IllegalArgumentException("lwcStep must be positive (" + this.lwcStep + ")");
        }
        if (this.lwcStepMillis > this.stepMillis) {
            throw new IllegalArgumentException("lwcStep cannot be larger than step (" + this.lwcStep + " > " + this.step + ")");
        }
        if (this.stepMillis % this.lwcStepMillis != 0L) {
            throw new IllegalArgumentException("step is not an even multiple of lwcStep (" + this.step + " % " + this.lwcStep + " != 0)");
        }

        // Built from the raw clock (not the overridable wrapper) and the validated LWC step.
        this.stepClock = new StepClock(clock, this.lwcStepMillis);
        this.meterTTL = config.meterTTL().toMillis();
        this.uri = URI.create(config.uri());
        this.configRefreshFrequency = config.configRefreshFrequency();
        this.evalUri = URI.create(config.evalUri());
        this.connectTimeout = (int) config.connectTimeout().toMillis();
        this.readTimeout = (int) config.readTimeout().toMillis();
        this.batchSize = config.batchSize();
        this.numThreads = config.numThreads();
        // TreeMap gives a copy of the tags sorted by key.
        this.commonTags = new TreeMap<String, String>(config.commonTags());
        this.fixTagString = this.createReplacementFunction(config.validTagCharacters());

        // Both mappers share the serializer that applies the tag-character replacement.
        SimpleModule module = new SimpleModule().addSerializer(Measurement.class, new MeasurementSerializer(this.fixTagString));
        this.jsonMapper = new ObjectMapper(new JsonFactory()).registerModule(module);
        this.smileMapper = new ObjectMapper(new SmileFactory()).registerModule(module);

        // Diagnostic meters go to the configured debug registry, or to this registry if none is set.
        this.debugRegistry = Optional.ofNullable(config.debugRegistry()).orElse(this);
        this.validationHelper = new ValidationHelper(this.logger, this.jsonMapper, this.debugRegistry);
        this.rollupPolicy = config.rollupPolicy();
        this.client = client != null ? client : HttpClient.create(this.debugRegistry);
        this.subManager = new SubscriptionManager(this.jsonMapper, this.client, clock, config);
        this.evaluator = new Evaluator(this.commonTags, this::toMap, this.lwcStepMillis);

        if (config.autoStart()) {
            this.start();
        }
    }

    /**
     * Build the function used to sanitize tag keys, tag values and names.
     *
     * @param pattern character-set pattern understood by {@code AsciiSet.fromPattern}, or
     *     {@code null} when no replacement should take place
     * @return the identity function for a {@code null} pattern; otherwise a function that
     *     replaces every character outside the set with {@code '_'}
     */
    private Function<String, String> createReplacementFunction(String pattern) {
        if (pattern != null) {
            final AsciiSet allowed = AsciiSet.fromPattern(pattern);
            return value -> allowed.replaceNonMembers(value, '_');
        }
        return Function.identity();
    }

    /**
     * Start the background tasks of the registry.
     *
     * <p>Creates the sender pool and the scheduler, then schedules three tasks: publishing
     * at {@code step}, the LWC send at {@code lwcStep}, and the subscription refresh at
     * {@code configRefreshFrequency}. Calling this while already started only logs a warning.
     */
    public void start() {
        if (scheduler != null) {
            logger.warn("registry already started, ignoring duplicate request");
            return;
        }

        logger.info("common tags: {}", commonTags);

        // Daemon threads named spectator-atlas-publish-N, numbered from 0.
        final AtomicInteger threadIds = new AtomicInteger();
        ThreadFactory daemonFactory = task -> {
            Thread worker = new Thread(task, "spectator-atlas-publish-" + threadIds.getAndIncrement());
            worker.setDaemon(true);
            return worker;
        };
        senderPool = Executors.newFixedThreadPool(numThreads, daemonFactory);

        // Main publish task.
        Scheduler.Options publishOptions = new Scheduler.Options()
            .withFrequency(Scheduler.Policy.FIXED_RATE_SKIP_IF_LONG, step)
            .withInitialDelay(Duration.ofMillis(config.initialPollingDelay(clock(), stepMillis)))
            .withStopOnFailure(false);
        scheduler = new Scheduler(debugRegistry, "spectator-reg-atlas", numThreads);
        scheduler.schedule(publishOptions, this::sendToAtlas);
        logger.info("started collecting metrics every {} reporting to {}", step, uri);

        // LWC task, running at the (possibly shorter) LWC step.
        Scheduler.Options lwcOptions = new Scheduler.Options()
            .withFrequency(Scheduler.Policy.FIXED_RATE_SKIP_IF_LONG, lwcStep)
            .withInitialDelay(Duration.ofMillis(config.initialPollingDelay(clock(), lwcStepMillis)))
            .withStopOnFailure(false);
        scheduler.schedule(lwcOptions, this::sendToLWC);

        // Periodic refresh of the subscription list.
        Scheduler.Options subOptions = new Scheduler.Options()
            .withFrequency(Scheduler.Policy.FIXED_DELAY, configRefreshFrequency)
            .withStopOnFailure(false);
        scheduler.schedule(subOptions, this::fetchSubscriptions);
    }

    /**
     * Stop the background tasks and make a best-effort attempt to flush pending data.
     *
     * <p>After shutting down the scheduler, the wall clock is pinned via the
     * {@link OverridableClock} and advanced to the end of the current intervals so that the
     * data for the previous and the final interval are pushed through {@code sendToAtlas()}.
     * Any failure during the flush is logged and otherwise ignored. Finally the sender pool
     * is shut down. If the registry was never started, {@code sendToAtlas()} skips publishing
     * because there is no sender pool.
     */
    public void stop() {
        if (this.scheduler != null) {
            this.scheduler.shutdown();
            this.scheduler = null;
            // Pass the step in milliseconds so that it matches the "ms" suffix in the message;
            // passing the Duration rendered as e.g. "PT5Sms".
            this.logger.info("stopped collecting metrics every {}ms reporting to {}", this.stepMillis, this.uri);
        } else {
            this.logger.warn("registry stopped, but was never started");
        }

        try {
            // Capture the time once and pin the clock to it so that it does not move while flushing.
            OverridableClock overridableClock = (OverridableClock) this.clock();
            long now = this.clock().wallTime();
            overridableClock.setWallTime(now);

            // The previous interval may not have been written yet.
            this.logger.info("flushing data for previous interval to Atlas");
            this.sendToAtlas();

            // Move to the end of the current LWC interval to poll the meters, then to the end
            // of the current publish interval so that the final values are sent.
            this.logger.info("flushing data for final interval to Atlas");
            overridableClock.setWallTime(now / this.lwcStepMillis * this.lwcStepMillis + this.lwcStepMillis);
            this.pollMeters(overridableClock.wallTime());
            overridableClock.setWallTime(now / this.stepMillis * this.stepMillis + this.stepMillis);
            this.sendToAtlas();
        } catch (Exception e) {
            // Best effort only: shutting down must not fail because the flush failed.
            this.logger.warn("failed to flush data to Atlas", e);
        }

        if (this.senderPool != null) {
            this.senderPool.shutdown();
            this.senderPool = null;
        }
    }

    /** Closing the registry is the same as calling {@link #stop()}. */
    @Override
    public void close() {
        stop();
    }

    /**
     * Round the current wall time down to a multiple of the given step.
     *
     * @param s step size in milliseconds
     * @return the largest multiple of {@code s} that is not after the current wall time
     *     (for non-negative wall times)
     */
    long lastCompletedTimestamp(long s) {
        final long wallTime = clock().wallTime();
        // Same value as (wallTime / s) * s: removing the remainder is truncating to a multiple of s.
        return wallTime - (wallTime % s);
    }

    /**
     * Look up the timer used to measure one of the publish-related tasks.
     *
     * @param id value of the {@code id} tag that identifies the task
     * @return timer from the debug registry named {@code spectator.atlas.publishTaskTime}
     */
    private Timer publishTaskTimer(String id) {
        return debugRegistry.timer(PUBLISH_TASK_TIMER, "id", id);
    }

    /**
     * Serialize a publish payload with the Smile mapper and gzip the result.
     *
     * @param payload payload to encode
     * @return the compressed bytes
     * @throws IOException if serialization or compression fails
     */
    private byte[] encodeBatch(PublishPayload payload) throws IOException {
        final ByteArrayOutputStream buffer = streamHelper.getOrCreateStream();
        // The gzip stream must be closed before the bytes are read back from the buffer.
        try (GzipLevelOutputStream gzip = new GzipLevelOutputStream(buffer)) {
            smileMapper.writeValue(gzip, payload);
        }
        return buffer.toByteArray();
    }

    /**
     * Publish a single batch of measurements, timed under the {@code sendBatch} id.
     *
     * <p>On success the clock skew derived from the response's {@code Date} header and the
     * validation results are recorded. Any exception is logged and the batch is counted as
     * dropped; nothing is propagated to the caller.
     *
     * @param batch common tags and measurements to send
     */
    private void sendBatch(RollupPolicy.Result batch) {
        publishTaskTimer("sendBatch").record(() -> {
            try {
                PublishPayload payload = new PublishPayload(batch.commonTags(), batch.measurements());
                if (logger.isTraceEnabled()) {
                    logger.trace("publish payload: {}", jsonMapper.writeValueAsString(payload));
                }

                HttpResponse response = client.post(uri)
                    .withConnectTimeout(connectTimeout)
                    .withReadTimeout(readTimeout)
                    .addHeader("Content-Encoding", "gzip")
                    .withContent("application/x-jackson-smile", encodeBatch(payload))
                    .send();

                // A missing Date header is passed on as 0, which recordClockSkew treats as "unknown".
                Instant serverDate = response.dateHeader("Date");
                long serverMillis = (serverDate != null) ? serverDate.toEpochMilli() : 0L;
                recordClockSkew(serverMillis);

                validationHelper.recordResults(batch.measurements().size(), response);
            } catch (Exception e) {
                logger.warn("failed to send metrics (uri={})", uri, e);
                validationHelper.incrementDroppedHttp(batch.measurements().size());
            }
        });
    }

    /**
     * Collect the data for the last completed step and publish it, timed under the
     * {@code sendToAtlas} id.
     *
     * <p>Nothing is sent when publishing is disabled in the config or there is no sender
     * pool, and a timestamp that has already been flushed is skipped. Batches are submitted
     * to the sender pool and this method waits for all of them to finish. Expired meters are
     * removed at the end in every case.
     */
    void sendToAtlas() {
        publishTaskTimer("sendToAtlas").record(() -> {
            boolean canPublish = config.enabled() && senderPool != null;
            if (!canPublish) {
                logger.debug("publishing is disabled, skipping collection");
            } else {
                long t = lastCompletedTimestamp(stepMillis);
                if (t <= lastFlushTimestamp) {
                    logger.debug("skipping duplicate flush attempt for time: {}", t);
                } else {
                    pollMeters(t);
                    logger.debug("sending to Atlas for time: {}", t);

                    // Submit every batch first, then block until all of them have completed.
                    CompletableFuture<?>[] pending = getBatches(t).stream()
                        .map(batch -> CompletableFuture.runAsync(() -> sendBatch(batch), senderPool))
                        .toArray(CompletableFuture[]::new);
                    CompletableFuture.allOf(pending).join();

                    lastFlushTimestamp = t;
                }
            }
            removeExpiredMeters();
        });
    }

    /**
     * Post a single eval payload as JSON to the eval endpoint, timed under the
     * {@code sendBatchLWC} id. Failures are logged and not propagated.
     *
     * @param batch payload to send
     */
    private void sendBatchLWC(EvalPayload batch) {
        publishTaskTimer("sendBatchLWC").record(() -> {
            try {
                final String body = jsonMapper.writeValueAsString(batch);
                if (logger.isTraceEnabled()) {
                    logger.trace("eval payload: {}", body);
                }
                client.post(evalUri)
                    .withConnectTimeout(connectTimeout)
                    .withReadTimeout(readTimeout)
                    .withJsonContent(body)
                    .send();
            } catch (Exception e) {
                logger.warn("failed to send metrics for subscriptions (uri={})", evalUri, e);
            }
        });
    }

    /**
     * Poll the meters for the last completed LWC step and, when LWC is enabled, evaluate the
     * subscriptions and send the results. Timed under the {@code sendToLWC} id.
     *
     * <p>The meters are polled regardless of whether LWC is enabled. Non-empty results are
     * split into batches of at most {@code batchSize}, submitted to the sender pool and
     * awaited. Failures are logged and not propagated.
     */
    void sendToLWC() {
        publishTaskTimer("sendToLWC").record(() -> {
            final long t = lastCompletedTimestamp(lwcStepMillis);
            pollMeters(t);

            if (!config.lwcEnabled()) {
                logger.debug("lwc is disabled, skipping subscriptions");
                return;
            }

            logger.debug("sending to LWC for time: {}", t);
            try {
                EvalPayload payload = evaluator.eval(t);
                if (payload.getMetrics().isEmpty()) {
                    return;
                }
                List<CompletableFuture<Void>> pending = new ArrayList<>();
                for (EvalPayload batch : payload.toBatches(batchSize)) {
                    pending.add(CompletableFuture.runAsync(() -> sendBatchLWC(batch), senderPool));
                }
                CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
            } catch (Exception e) {
                logger.warn("failed to send metrics for subscriptions (uri={})", evalUri, e);
            }
        });
    }

    /**
     * Collect the measurements of every meter in this registry, timed under the
     * {@code pollMeters} id (the iteration itself under {@code pollMeasurements}).
     *
     * <p>Each measurement is fed into the consolidator for its id (created on first use) and
     * into the evaluator. A call whose timestamp is not newer than the last polled timestamp
     * does nothing.
     *
     * @param t timestamp of the interval being polled
     */
    synchronized void pollMeters(long t) {
        publishTaskTimer("pollMeters").record(() -> {
            if (t <= lastPollTimestamp) {
                // Already polled for this (or a later) timestamp.
                return;
            }

            MeasurementConsumer sink = (id, timestamp, value) -> {
                Consolidator target = atlasMeasurements.get(id);
                if (target == null) {
                    // Number of LWC steps that make up one publish step.
                    int multiple = (int) (stepMillis / lwcStepMillis);
                    target = Consolidator.create(id, stepMillis, multiple);
                    atlasMeasurements.put(id, target);
                }
                target.update(timestamp, value);
                evaluator.update(id, timestamp, value);
            };

            logger.debug("collecting measurements for time: {}", t);
            publishTaskTimer("pollMeasurements").record(() -> {
                for (Meter meter : this) {
                    ((AtlasMeter) meter).measure(sink);
                }
            });
            lastPollTimestamp = t;
        });
    }

    /** Exposes the inherited expired-meter cleanup as a public method; simply delegates to the superclass. */
    @Override
    public void removeExpiredMeters() {
        super.removeExpiredMeters();
    }

    /**
     * Refresh the subscription list and sync it into the evaluator. Only logs a debug
     * message when LWC is disabled.
     */
    private void fetchSubscriptions() {
        if (!config.lwcEnabled()) {
            logger.debug("lwc is disabled, skipping subscription config refresh");
            return;
        }
        subManager.refresh();
        evaluator.sync(subManager.subscriptions());
    }

    /**
     * Record the difference between the local wall time and a server-provided timestamp.
     *
     * <p>The absolute difference is recorded on the {@code spectator.atlas.clockSkew} timer,
     * tagged {@code id=fast} when the local clock is at or ahead of the server time and
     * {@code id=slow} when it is behind.
     *
     * @param responseTimestamp server time in epoch milliseconds, or {@code 0} if unknown
     */
    private void recordClockSkew(long responseTimestamp) {
        if (responseTimestamp == 0L) {
            logger.debug("no date timestamp on response, cannot record skew");
            return;
        }

        final long delta = clock().wallTime() - responseTimestamp;
        final boolean localAhead = delta >= 0L;
        final String direction = localAhead ? "fast" : "slow";
        final long magnitude = localAhead ? delta : -delta;

        debugRegistry.timer(CLOCK_SKEW_TIMER, "id", direction).record(magnitude, TimeUnit.MILLISECONDS);
        logger.debug("clock skew between client and server: {}ms", delta);
    }

    /**
     * Flatten an id into a map of sanitized tags.
     *
     * <p>Keys, values and the name are all passed through the tag-string replacement
     * function. The name is stored under the {@code "name"} key last, so it wins over a tag
     * that uses the same key.
     *
     * @param id id to convert
     * @return new mutable map containing the tags and the name
     */
    private Map<String, String> toMap(Id id) {
        final Map<String, String> result = new HashMap<>();
        id.tags().forEach(tag ->
            result.put(fixTagString.apply(tag.key()), fixTagString.apply(tag.value())));
        result.put("name", fixTagString.apply(id.name()));
        return result;
    }

    /**
     * Build the batches of measurements to publish for the given timestamp, timed under the
     * {@code getBatches} id.
     *
     * <p>Every consolidator is advanced to {@code t}; non-NaN values become measurements and
     * consolidators that report themselves empty are removed. The rollup policy is applied
     * to the measurements and each result is split into sub-lists of at most
     * {@code batchSize} entries that keep the result's common tags. The number of
     * consolidators and the total size after rollup are recorded on the debug registry.
     *
     * @param t timestamp of the interval being published
     * @return batches to send; empty when there is nothing to publish
     */
    synchronized List<RollupPolicy.Result> getBatches(long t) {
        final List<RollupPolicy.Result> batches = new ArrayList<>();
        this.publishTaskTimer("getBatches").record(() -> {
            int n = this.atlasMeasurements.size();
            this.debugRegistry.distributionSummary("spectator.registrySize").record(n);

            List<Measurement> input = new ArrayList<>(n);
            Iterator<Map.Entry<Id, Consolidator>> it = this.atlasMeasurements.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Id, Consolidator> entry = it.next();
                Consolidator consolidator = entry.getValue();
                // Feeding NaN at time t moves the consolidator on to t without adding a value.
                consolidator.update(t, Double.NaN);
                double v = consolidator.value(t);
                if (!Double.isNaN(v)) {
                    input.add(new Measurement(entry.getKey(), t, v));
                }
                if (consolidator.isEmpty()) {
                    // Removing through the iterator keeps the traversal valid.
                    it.remove();
                }
            }

            // Typed result list: a raw List here leaves the lambda parameter and the loop
            // variable below as Object, which does not compile.
            List<RollupPolicy.Result> results = this.rollupPolicy.apply(input);
            int rollupSize = results.stream().mapToInt(r -> r.measurements().size()).sum();
            this.debugRegistry.distributionSummary("spectator.rollupResultSize").record(rollupSize);

            for (RollupPolicy.Result result : results) {
                List<Measurement> ms = result.measurements();
                for (int i = 0; i < ms.size(); i += this.batchSize) {
                    List<Measurement> batch = ms.subList(i, Math.min(ms.size(), i + this.batchSize));
                    batches.add(new RollupPolicy.Result(result.commonTags(), batch));
                }
            }
        });
        return batches;
    }

    /**
     * Poll the meters for the last completed step, remove expired meters, and return the
     * resulting measurements as one stream.
     *
     * @return measurements of all batches produced by {@code getBatches} for that step
     */
    @Override
    public Stream<Measurement> measurements() {
        final long timestamp = lastCompletedTimestamp(stepMillis);
        pollMeters(timestamp);
        removeExpiredMeters();
        List<RollupPolicy.Result> batches = getBatches(timestamp);
        return batches.stream().flatMap(result -> result.measurements().stream());
    }

    /** Creates an {@link AtlasCounter} bound to this registry's clock, meter TTL and LWC step. */
    @Override
    protected Counter newCounter(Id id) {
        return new AtlasCounter(this, id, clock(), meterTTL, lwcStepMillis);
    }

    /** Creates an {@link AtlasDistributionSummary} using this registry's clock, meter TTL and LWC step. */
    @Override
    protected DistributionSummary newDistributionSummary(Id id) {
        return new AtlasDistributionSummary(id, clock(), meterTTL, lwcStepMillis);
    }

    /** Creates an {@link AtlasTimer} using this registry's clock, meter TTL and LWC step. */
    @Override
    protected Timer newTimer(Id id) {
        return new AtlasTimer(id, clock(), meterTTL, lwcStepMillis);
    }

    /**
     * Creates an {@link AtlasGauge} bound to this registry. Unlike the other meters it is
     * given the step clock rather than the registry clock, and no step size.
     */
    @Override
    protected Gauge newGauge(Id id) {
        return new AtlasGauge(this, id, stepClock, meterTTL);
    }

    /** Creates an {@link AtlasMaxGauge} bound to this registry's clock, meter TTL and LWC step. */
    @Override
    protected Gauge newMaxGauge(Id id) {
        return new AtlasMaxGauge(this, id, clock(), meterTTL, lwcStepMillis);
    }
}

