/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.channels.FileLock;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.SampledStat;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskIdFormatException;
import org.apache.kafka.streams.processor.PartitionGrouper;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.AbstractTask;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StandbyTask;
import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Thread that drives a set of stream tasks.
 *
 * <p>The thread subscribes its consumer to the topology's source topics and then loops:
 * it polls records, hands them to the active {@link StreamTask} that owns the partition,
 * processes and punctuates the tasks, commits, feeds {@link StandbyTask}s from the restore
 * consumer and deletes obsolete state directories. Tasks are created and removed from the
 * consumer rebalance callbacks.
 */
public class StreamThread
extends Thread {
    private static final Logger log = LoggerFactory.getLogger(StreamThread.class);
    // Source of the numeric suffix used in the thread name ("StreamThread-<n>").
    private static final AtomicInteger STREAM_THREAD_ID_SEQUENCE = new AtomicInteger(1);
    public final PartitionGrouper partitionGrouper;
    public final String applicationId;
    public final String clientId;
    public final UUID processId;
    protected final StreamsConfig config;
    protected final TopologyBuilder builder;
    // Topics the main consumer subscribes to in runLoop().
    protected final Set<String> sourceTopics;
    protected final Producer<byte[], byte[]> producer;
    protected final Consumer<byte[], byte[]> consumer;
    // Consumer that is manually assigned/seeked to feed the standby tasks.
    protected final Consumer<byte[], byte[]> restoreConsumer;
    // The run loop keeps going while this is true; close() clears it.
    private final AtomicBoolean running;
    private final Map<TaskId, StreamTask> activeTasks;
    private final Map<TaskId, StandbyTask> standbyTasks;
    private final Map<TopicPartition, StreamTask> activeTasksByPartition;
    private final Map<TopicPartition, StandbyTask> standbyTasksByPartition;
    // Ids of the active tasks that were registered the last time the active tasks were removed.
    private final Set<TaskId> prevTasks;
    private final Time time;
    private final File stateDir;
    // Read from "poll.ms", "state.cleanup.delay.ms" and "commit.interval.ms" respectively.
    private final long pollTimeMs;
    private final long cleanTimeMs;
    private final long commitTimeMs;
    private final StreamsMetricsImpl sensors;
    private StreamPartitionAssignor partitionAssignor = null;
    // Time of the last state directory cleanup; Long.MAX_VALUE while no partitions are assigned.
    private long lastClean;
    // Time of the last interval-driven commit.
    private long lastCommit;
    // Failure captured inside a rebalance callback; checked after each poll in runLoop().
    private Throwable rebalanceException = null;
    // Records a standby task handed back from update(), kept per partition for a later retry.
    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords;
    // Set after an interval-driven commit to trigger a retry of the buffered standby records.
    private boolean processStandbyRecords = false;
    /**
     * Rebalance callback registered with the main consumer in {@code runLoop()}.
     *
     * <p>Any throwable raised inside a callback is stored in {@code rebalanceException}
     * before being rethrown, so that the run loop can detect the failure after the poll returns.
     */
    final ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener(){

        /**
         * Creates the active tasks for the assigned partitions and the standby tasks, then
         * restarts the state cleanup timer from the current time.
         */
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> assignment) {
            try {
                StreamThread.this.addStreamTasks(assignment);
                StreamThread.this.addStandbyTasks();
                StreamThread.this.lastClean = StreamThread.this.time.milliseconds();
            }
            catch (Throwable t) {
                StreamThread.this.rebalanceException = t;
                throw t;
            }
        }

        /**
         * Commits every task and parks the cleanup timer at {@code Long.MAX_VALUE}; the
         * active and standby tasks are removed afterwards even if the commit failed.
         */
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> assignment) {
            try {
                StreamThread.this.commitAll();
                StreamThread.this.lastClean = Long.MAX_VALUE;
            }
            catch (Throwable t) {
                StreamThread.this.rebalanceException = t;
                throw t;
            }
            finally {
                StreamThread.this.removeStreamTasks();
                StreamThread.this.removeStandbyTasks();
            }
        }
    };

    /**
     * Returns the state directory {@code <baseDirName>/<applicationId>}, creating it and any
     * missing parent directories.
     *
     * @param applicationId name of the sub-directory holding this application's state
     * @param baseDirName   path of the base state directory
     * @return the existing or newly created state directory
     * @throws IllegalStateException if the directory does not exist and cannot be created
     */
    static File makeStateDir(String applicationId, String baseDirName) {
        File baseDir = new File(baseDirName);
        File stateDir = new File(baseDir, applicationId);
        // mkdirs() also creates baseDir and its ancestors. It returns false when the directory
        // already exists (possibly created concurrently), hence the isDirectory() re-check.
        if (!stateDir.mkdirs() && !stateDir.isDirectory()) {
            throw new IllegalStateException("Failed to create state directory " + stateDir.getPath());
        }
        return stateDir;
    }

    /**
     * Creates a stream thread named {@code StreamThread-<n>}, together with its producer,
     * consumer and restore consumer clients, its state directory and its metrics sensors.
     *
     * @param builder        topology builder used to look up source topics and build task topologies
     * @param config         streams configuration the client configs and timing settings are read from
     * @param clientSupplier factory for the producer, consumer and restore consumer
     * @param applicationId  application id; also the name of the state sub-directory
     * @param clientId       client id prefix; the thread name is appended for the per-thread client id
     * @param processId      id stored in {@code processId}
     * @param metrics        metrics registry the thread's sensors are registered in
     * @param time           clock used for poll, commit and cleanup timing
     */
    public StreamThread(TopologyBuilder builder, StreamsConfig config, KafkaClientSupplier clientSupplier, String applicationId, String clientId, UUID processId, Metrics metrics, Time time) {
        super("StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
        this.applicationId = applicationId;
        this.config = config;
        this.builder = builder;
        this.sourceTopics = builder.sourceTopics(applicationId);
        this.clientId = clientId;
        this.processId = processId;
        this.partitionGrouper = (PartitionGrouper)config.getConfiguredInstance("partition.grouper", PartitionGrouper.class);
        String threadName = this.getName();
        String threadClientId = clientId + "-" + threadName;
        log.info("Creating producer client for stream thread [{}]", (Object)threadName);
        this.producer = clientSupplier.getProducer(config.getProducerConfigs(threadClientId));
        log.info("Creating consumer client for stream thread [{}]", (Object)threadName);
        // NOTE: "this" is handed out here before the remaining fields below are initialised.
        this.consumer = clientSupplier.getConsumer(config.getConsumerConfigs(this, applicationId, threadClientId));
        log.info("Creating restore consumer client for stream thread [{}]", (Object)threadName);
        this.restoreConsumer = clientSupplier.getRestoreConsumer(config.getRestoreConsumerConfigs(threadClientId));
        this.activeTasks = new HashMap<TaskId, StreamTask>();
        this.standbyTasks = new HashMap<TaskId, StandbyTask>();
        this.activeTasksByPartition = new HashMap<TopicPartition, StreamTask>();
        this.standbyTasksByPartition = new HashMap<TopicPartition, StandbyTask>();
        this.prevTasks = new HashSet<TaskId>();
        this.standbyRecords = new HashMap<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>();
        this.stateDir = StreamThread.makeStateDir(this.applicationId, this.config.getString("state.dir"));
        this.pollTimeMs = config.getLong("poll.ms");
        this.commitTimeMs = config.getLong("commit.interval.ms");
        this.cleanTimeMs = config.getLong("state.cleanup.delay.ms");
        // Long.MAX_VALUE until partitions are assigned; the rebalance listener resets it.
        this.lastClean = Long.MAX_VALUE;
        this.lastCommit = time.milliseconds();
        this.time = time;
        // The metrics implementation reads clientId and the thread name, both set above.
        this.sensors = new StreamsMetricsImpl(metrics);
        this.running = new AtomicBoolean(true);
    }

    /**
     * Sets the partition assignor consulted when tasks are created. Task creation throws
     * {@link IllegalStateException} if no assignor has been set.
     *
     * @param partitionAssignor the assignor to use
     */
    public void partitionAssignor(StreamPartitionAssignor partitionAssignor) {
        this.partitionAssignor = partitionAssignor;
    }

    /**
     * Thread entry point: executes the run loop and always performs the shutdown sequence
     * afterwards. Exceptions are rethrown; anything that is not a {@link KafkaException} is
     * logged first.
     */
    @Override
    public void run() {
        log.info("Starting stream thread [" + this.getName() + "]");
        try {
            runLoop();
        }
        catch (Exception e) {
            // Kafka exceptions are passed on without being logged at this level.
            if (!(e instanceof KafkaException)) {
                log.error("Streams application error during processing in thread [" + this.getName() + "]: ", (Throwable)e);
            }
            throw e;
        }
        finally {
            shutdown();
        }
    }

    /**
     * Asks the thread to stop: clears the running flag that the run loop checks on every
     * iteration.
     */
    public void close() {
        running.set(false);
    }

    /**
     * @return a read-only view of the currently active tasks, keyed by task id
     */
    public Map<TaskId, StreamTask> tasks() {
        Map<TaskId, StreamTask> readOnlyView = Collections.unmodifiableMap(activeTasks);
        return readOnlyView;
    }

    /**
     * Runs the shutdown sequence: commit all tasks, remove the standby tasks, close the
     * producer, consumer and restore consumer, then remove the active tasks.
     *
     * <p>A failure in one step is logged and never prevents the following steps from running.
     */
    private void shutdown() {
        log.info("Shutting down stream thread [" + this.getName() + "]");
        try {
            this.commitAll();
        }
        catch (Throwable e) {
            // Keep going with the remaining steps, but leave a trace of the failure:
            // only Kafka exceptions are logged by the commit path itself.
            log.error("Failed to commit tasks while shutting down thread [" + this.getName() + "]: ", e);
        }
        this.removeStandbyTasks();
        try {
            this.producer.close();
        }
        catch (Throwable e) {
            log.error("Failed to close producer in thread [" + this.getName() + "]: ", e);
        }
        try {
            this.consumer.close();
        }
        catch (Throwable e) {
            log.error("Failed to close consumer in thread [" + this.getName() + "]: ", e);
        }
        try {
            this.restoreConsumer.close();
        }
        catch (Throwable e) {
            log.error("Failed to close restore consumer in thread [" + this.getName() + "]: ", e);
        }
        this.removeStreamTasks();
        log.info("Stream thread shutdown complete [" + this.getName() + "]");
    }

    /**
     * Main loop of the thread; runs until the running flag is cleared.
     *
     * <p>Each iteration optionally polls the consumer and hands the records to the owning
     * active task, lets every active task process, punctuate and commit if it asks for it,
     * and then performs the periodic commit, standby update and state cleanup.
     *
     * @throws StreamsException if a rebalance callback failed during the poll
     */
    private void runLoop() {
        int totalNumBuffered = 0;
        long lastPoll = 0L;
        boolean requiresPoll = true;
        this.consumer.subscribe(new ArrayList<String>(this.sourceTopics), this.rebalanceListener);
        while (this.stillRunning()) {
            if (requiresPoll) {
                requiresPoll = false;
                long startPoll = this.time.milliseconds();
                // Only wait for new records when nothing is buffered.
                ConsumerRecords<byte[], byte[]> records = this.consumer.poll(totalNumBuffered == 0 ? this.pollTimeMs : 0L);
                lastPoll = this.time.milliseconds();
                // The rebalance listener stores its failure here.
                if (this.rebalanceException != null) {
                    throw new StreamsException("Failed to rebalance", this.rebalanceException);
                }
                if (!records.isEmpty()) {
                    for (TopicPartition partition : records.partitions()) {
                        StreamTask task = this.activeTasksByPartition.get(partition);
                        task.addRecords(partition, records.records(partition));
                    }
                }
                long endPoll = this.time.milliseconds();
                this.sensors.pollTimeSensor.record((double)(endPoll - startPoll));
            }
            totalNumBuffered = 0;
            if (!this.activeTasks.isEmpty()) {
                for (StreamTask task : this.activeTasks.values()) {
                    long startProcess = this.time.milliseconds();
                    totalNumBuffered += task.process();
                    requiresPoll = requiresPoll || task.requiresPoll();
                    this.sensors.processTimeSensor.record((double)(this.time.milliseconds() - startProcess));
                    this.maybePunctuate(task);
                    if (task.commitNeeded()) {
                        this.commitOne(task, this.time.milliseconds());
                    }
                }
                // Force a poll once the poll interval has elapsed since the previous poll
                // (written as a difference so that a very large interval cannot overflow).
                if (this.time.milliseconds() - lastPoll > this.pollTimeMs) {
                    requiresPoll = true;
                }
            } else {
                // Nothing to process: keep polling.
                requiresPoll = true;
            }
            this.maybeCommit();
            this.maybeUpdateStandbyTasks();
            this.maybeClean();
        }
    }

    /**
     * Feeds the standby tasks, if there are any.
     *
     * <p>When {@code processStandbyRecords} is set, the records that tasks previously handed
     * back are offered to them again; a partition whose backlog is fully consumed is resumed
     * on the restore consumer. Afterwards the restore consumer is polled without blocking and
     * the fetched records are passed to the owning task; if a task hands records back, they
     * are buffered and the partition is paused.
     *
     * @throws StreamsException if polled records belong to a partition without a standby task
     */
    private void maybeUpdateStandbyTasks() {
        if (this.standbyTasks.isEmpty()) {
            return;
        }
        if (this.processStandbyRecords) {
            if (!this.standbyRecords.isEmpty()) {
                Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> remainingStandbyRecords = new HashMap<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>();
                for (Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> entry : this.standbyRecords.entrySet()) {
                    TopicPartition partition = entry.getKey();
                    List<ConsumerRecord<byte[], byte[]>> remaining = entry.getValue();
                    if (remaining == null) {
                        continue;
                    }
                    StandbyTask task = this.standbyTasksByPartition.get(partition);
                    remaining = task.update(partition, remaining);
                    if (remaining != null) {
                        // Still not fully applied: keep the rest for the next round.
                        remainingStandbyRecords.put(partition, remaining);
                    } else {
                        this.restoreConsumer.resume(Collections.singleton(partition));
                    }
                }
                this.standbyRecords = remainingStandbyRecords;
            }
            this.processStandbyRecords = false;
        }
        ConsumerRecords<byte[], byte[]> records = this.restoreConsumer.poll(0L);
        if (!records.isEmpty()) {
            for (TopicPartition partition : records.partitions()) {
                StandbyTask task = this.standbyTasksByPartition.get(partition);
                if (task == null) {
                    log.error("missing standby task for partition {}", (Object)partition);
                    throw new StreamsException("missing standby task for partition " + partition);
                }
                List<ConsumerRecord<byte[], byte[]>> remaining = task.update(partition, records.records(partition));
                if (remaining != null) {
                    // Stop fetching this partition until the buffered records are applied.
                    this.restoreConsumer.pause(Collections.singleton(partition));
                    this.standbyRecords.put(partition, remaining);
                }
            }
        }
    }

    /**
     * Reads the running flag, logging a debug message when it has been cleared.
     *
     * @return {@code true} while the thread should keep looping
     */
    private boolean stillRunning() {
        boolean active = running.get();
        if (!active) {
            log.debug("Shutting down at user request.");
        }
        return active;
    }

    /**
     * Lets the task punctuate and, when it reports that it did, records the elapsed time on
     * the punctuate sensor.
     *
     * @param task the active task to punctuate
     * @throws KafkaException rethrown after being logged
     */
    private void maybePunctuate(StreamTask task) {
        try {
            long startedAt = time.milliseconds();
            boolean punctuated = task.maybePunctuate();
            if (punctuated) {
                long elapsed = time.milliseconds() - startedAt;
                sensors.punctuateTimeSensor.record((double)elapsed);
            }
        }
        catch (KafkaException e) {
            log.error("Failed to punctuate active task #" + task.id() + " in thread [" + this.getName() + "]: ", (Throwable)e);
            throw e;
        }
    }

    /**
     * Commits all tasks when the commit interval has elapsed since the last interval-driven
     * commit, and flags the buffered standby records for another processing attempt.
     * A negative commit interval disables this periodic commit.
     */
    protected void maybeCommit() {
        long now = this.time.milliseconds();
        // Compare the elapsed time instead of computing "lastCommit + commitTimeMs": with a
        // very large interval that sum overflows to a negative value and the check would
        // pass on every call.
        if (this.commitTimeMs >= 0L && now - this.lastCommit > this.commitTimeMs) {
            log.trace("Committing processor instances because the commit interval has elapsed.");
            this.commitAll();
            this.lastCommit = now;
            this.processStandbyRecords = true;
        }
    }

    /**
     * Commits every active task, then every standby task.
     */
    private void commitAll() {
        for (StreamTask active : activeTasks.values()) {
            long startedAt = time.milliseconds();
            commitOne(active, startedAt);
        }
        for (StandbyTask standby : standbyTasks.values()) {
            long startedAt = time.milliseconds();
            commitOne(standby, startedAt);
        }
    }

    /**
     * Commits a single task and records the time elapsed since {@code now} on the commit sensor.
     *
     * <p>A {@link CommitFailedException} is logged as a warning and otherwise ignored; any
     * other {@link KafkaException} is logged as an error and rethrown, in which case nothing
     * is recorded on the sensor.
     *
     * @param task the task to commit
     * @param now  start time of the commit, in milliseconds
     */
    private void commitOne(AbstractTask task, long now) {
        try {
            task.commit();
        }
        catch (CommitFailedException e) {
            log.warn("Failed to commit " + task.getClass().getSimpleName() + " #" + task.id() + " in thread [" + this.getName() + "]: ", (Throwable)e);
        }
        catch (KafkaException e) {
            log.error("Failed to commit " + task.getClass().getSimpleName() + " #" + task.id() + " in thread [" + this.getName() + "]: ", (Throwable)e);
            throw e;
        }
        long elapsed = time.milliseconds() - now;
        sensors.commitTimeSensor.record((double)elapsed);
    }

    /**
     * Deletes obsolete task state directories once the cleanup delay has elapsed since the
     * last cleanup.
     *
     * <p>Every entry of the state directory whose name parses as a task id is deleted if its
     * directory lock can be acquired; entries with other names and directories whose lock is
     * not available are left alone.
     */
    protected void maybeClean() {
        long now = this.time.milliseconds();
        // Compare the elapsed time instead of computing "lastClean + cleanTimeMs": lastClean
        // is Long.MAX_VALUE while no partitions are assigned, and the sum would overflow to a
        // negative value and trigger a cleanup immediately.
        if (now - this.lastClean > this.cleanTimeMs) {
            File[] stateDirs = this.stateDir.listFiles();
            if (stateDirs != null) {
                for (File dir : stateDirs) {
                    try {
                        String dirName = dir.getName();
                        TaskId id = TaskId.parse(dirName.substring(dirName.lastIndexOf("-") + 1));
                        if (!dir.exists()) {
                            continue;
                        }
                        FileLock directoryLock = null;
                        try {
                            directoryLock = ProcessorStateManager.lockStateDirectory(dir);
                            // No lock returned: skip the directory.
                            if (directoryLock != null) {
                                log.info("Deleting obsolete state directory {} for task {} after delayed {} ms.", new Object[]{dir.getAbsolutePath(), id, this.cleanTimeMs});
                                Utils.delete((File)dir);
                            }
                        }
                        catch (FileNotFoundException e) {
                            // Deliberately ignored; presumably the directory disappeared in
                            // the meantime, so there is nothing left to delete.
                        }
                        catch (IOException e) {
                            log.error("Failed to lock the state directory due to an unexpected exception", (Throwable)e);
                        }
                        finally {
                            if (directoryLock != null) {
                                try {
                                    directoryLock.release();
                                    directoryLock.channel().close();
                                }
                                catch (IOException e) {
                                    log.error("Failed to release the state directory lock", (Throwable)e);
                                }
                            }
                        }
                    }
                    catch (TaskIdFormatException e) {
                        // The entry name is not a task id: not a task state directory, skip it.
                    }
                }
            }
            this.lastClean = now;
        }
    }

    /**
     * @return a read-only view of the ids of the active tasks that were registered when the
     *         active tasks were last removed
     */
    public Set<TaskId> prevTasks() {
        Set<TaskId> readOnlyView = Collections.unmodifiableSet(prevTasks);
        return readOnlyView;
    }

    /**
     * Scans the state directory for entries whose name parses as a task id and which contain
     * a {@code .checkpoint} file.
     *
     * @return the ids of those tasks; empty if the state directory cannot be listed
     */
    public Set<TaskId> cachedTasks() {
        HashSet<TaskId> found = new HashSet<TaskId>();
        File[] entries = stateDir.listFiles();
        if (entries == null) {
            return found;
        }
        for (File entry : entries) {
            try {
                TaskId id = TaskId.parse(entry.getName());
                File checkpoint = new File(entry, ".checkpoint");
                if (checkpoint.exists()) {
                    found.add(id);
                }
            }
            catch (TaskIdFormatException e) {
                // Name is not a task id: skip this entry.
            }
        }
        return found;
    }

    /**
     * Records a task creation on the metrics sensor and builds an active task for the given
     * id, using the topology of the id's topic group.
     *
     * @param id         id of the task to create
     * @param partitions partitions handed to the task
     * @return the new active task
     */
    protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
        sensors.taskCreationSensor.record();
        ProcessorTopology taskTopology = builder.build(applicationId, id.topicGroupId);
        StreamTask created = new StreamTask(id, applicationId, partitions, taskTopology, consumer, producer, restoreConsumer, config, sensors);
        return created;
    }

    /**
     * Creates and registers the active tasks for a new partition assignment.
     *
     * <p>The assigned partitions are first grouped by the task ids the partition assignor
     * reports for them; one task is then created per id and registered both by id and by
     * each of its partitions.
     *
     * @param assignment the partitions assigned to this thread's consumer
     * @throws IllegalStateException if no partition assignor has been set
     * @throws StreamsException      rethrown after being logged if a task cannot be created
     */
    private void addStreamTasks(Collection<TopicPartition> assignment) {
        if (this.partitionAssignor == null) {
            throw new IllegalStateException("Partition assignor has not been initialized while adding stream tasks: this should not happen.");
        }
        Map<TaskId, Set<TopicPartition>> partitionsForTask = new HashMap<TaskId, Set<TopicPartition>>();
        for (TopicPartition topicPartition : assignment) {
            Set<TaskId> taskIds = this.partitionAssignor.tasksForPartition(topicPartition);
            for (TaskId taskId : taskIds) {
                Set<TopicPartition> partitions = partitionsForTask.get(taskId);
                if (partitions == null) {
                    partitions = new HashSet<TopicPartition>();
                    partitionsForTask.put(taskId, partitions);
                }
                partitions.add(topicPartition);
            }
        }
        for (Map.Entry<TaskId, Set<TopicPartition>> entry : partitionsForTask.entrySet()) {
            TaskId taskId = entry.getKey();
            Set<TopicPartition> partitions = entry.getValue();
            try {
                StreamTask task = this.createStreamTask(taskId, partitions);
                this.activeTasks.put(taskId, task);
                for (TopicPartition partition : partitions) {
                    this.activeTasksByPartition.put(partition, task);
                }
            }
            catch (StreamsException e) {
                log.error("Failed to create an active task #" + taskId + " in thread [" + this.getName() + "]: ", (Throwable)e);
                throw e;
            }
        }
    }

    /**
     * Closes and unregisters all active tasks and remembers their ids as the previous tasks.
     *
     * <p>Each task is closed independently: an exception from one task is logged and does not
     * stop the remaining tasks from being closed. The task registries are reset even if
     * closing failed, so no closed task stays registered.
     */
    private void removeStreamTasks() {
        try {
            for (StreamTask task : this.activeTasks.values()) {
                try {
                    this.closeOne(task);
                }
                catch (Exception e) {
                    log.error("Failed to remove stream tasks in thread [" + this.getName() + "]: ", (Throwable)e);
                }
            }
        }
        finally {
            this.prevTasks.clear();
            this.prevTasks.addAll(this.activeTasks.keySet());
            this.activeTasks.clear();
            this.activeTasksByPartition.clear();
        }
    }

    /**
     * Closes one task and records a task destruction on the metrics sensor. A
     * {@link StreamsException} raised by the close is logged and not propagated.
     *
     * @param task the task to close
     */
    private void closeOne(AbstractTask task) {
        log.info("Removing a task {}", (Object)task.id());
        try {
            task.close();
        }
        catch (StreamsException e) {
            String taskKind = task.getClass().getSimpleName();
            log.error("Failed to close a " + taskKind + " #" + task.id() + " in thread [" + this.getName() + "]: ", (Throwable)e);
        }
        sensors.taskDestructionSensor.record();
    }

    /**
     * Records a task creation on the metrics sensor and builds a standby task for the given
     * id, provided the topology of the id's topic group has state store suppliers.
     *
     * @param id         id of the task to create
     * @param partitions partitions handed to the task
     * @return the new standby task, or {@code null} if the topology has no state store suppliers
     */
    protected StandbyTask createStandbyTask(TaskId id, Collection<TopicPartition> partitions) {
        sensors.taskCreationSensor.record();
        ProcessorTopology taskTopology = builder.build(applicationId, id.topicGroupId);
        if (taskTopology.stateStoreSuppliers().isEmpty()) {
            // No state stores for this topic group, so no standby task is built.
            return null;
        }
        return new StandbyTask(id, applicationId, partitions, taskTopology, consumer, restoreConsumer, config, sensors);
    }

    /**
     * Creates and registers the standby tasks reported by the partition assignor, then points
     * the restore consumer at their checkpointed partitions.
     *
     * <p>Each task is registered by id, by its assigned partitions and by the partitions of
     * its checkpointed offsets. The restore consumer is assigned all checkpointed partitions
     * and positioned at the checkpointed offset, or at the beginning when the offset is negative.
     *
     * @throws IllegalStateException if no partition assignor has been set
     */
    private void addStandbyTasks() {
        if (this.partitionAssignor == null) {
            throw new IllegalStateException("Partition assignor has not been initialized while adding standby tasks: this should not happen.");
        }
        Map<TopicPartition, Long> checkpointedOffsets = new HashMap<TopicPartition, Long>();
        for (Map.Entry<TaskId, Set<TopicPartition>> entry : this.partitionAssignor.standbyTasks().entrySet()) {
            TaskId taskId = entry.getKey();
            Set<TopicPartition> partitions = entry.getValue();
            StandbyTask task = this.createStandbyTask(taskId, partitions);
            if (task == null) {
                // No standby task was created for this id.
                continue;
            }
            this.standbyTasks.put(taskId, task);
            for (TopicPartition partition : partitions) {
                this.standbyTasksByPartition.put(partition, task);
            }
            for (TopicPartition partition : task.checkpointedOffsets().keySet()) {
                this.standbyTasksByPartition.put(partition, task);
            }
            checkpointedOffsets.putAll(task.checkpointedOffsets());
        }
        this.restoreConsumer.assign(new ArrayList<TopicPartition>(checkpointedOffsets.keySet()));
        for (Map.Entry<TopicPartition, Long> entry : checkpointedOffsets.entrySet()) {
            TopicPartition partition = entry.getKey();
            long offset = entry.getValue();
            if (offset >= 0L) {
                this.restoreConsumer.seek(partition, offset);
            } else {
                this.restoreConsumer.seekToBeginning(Collections.singleton(partition));
            }
        }
    }

    /**
     * Closes and unregisters all standby tasks, drops their buffered records and clears the
     * restore consumer's assignment.
     *
     * <p>Each task is closed independently: an exception from one task is logged and does not
     * stop the remaining tasks from being closed. The registries are reset even if closing
     * failed, so no closed task stays registered.
     */
    private void removeStandbyTasks() {
        try {
            for (StandbyTask task : this.standbyTasks.values()) {
                try {
                    this.closeOne(task);
                }
                catch (Exception e) {
                    log.error("Failed to remove standby tasks in thread [" + this.getName() + "]: ", (Throwable)e);
                }
            }
        }
        finally {
            this.standbyTasks.clear();
            this.standbyTasksByPartition.clear();
            this.standbyRecords.clear();
        }
        try {
            this.restoreConsumer.assign(Collections.<TopicPartition>emptyList());
        }
        catch (Exception e) {
            log.error("Failed to remove standby tasks in thread [" + this.getName() + "]: ", (Throwable)e);
        }
    }

    /**
     * {@link StreamsMetrics} implementation of this thread.
     *
     * <p>Registers the thread-level sensors (commit, poll, process, punctuate, task creation
     * and task destruction) in the group {@code stream-metrics}, tagged with a client id built
     * from the thread's client id and name, and creates latency sensors on request.
     */
    private class StreamsMetricsImpl
    implements StreamsMetrics {
        final Metrics metrics;
        final String metricGrpName;
        final Map<String, String> metricTags;
        final Sensor commitTimeSensor;
        final Sensor pollTimeSensor;
        final Sensor processTimeSensor;
        final Sensor punctuateTimeSensor;
        final Sensor taskCreationSensor;
        final Sensor taskDestructionSensor;

        /**
         * Registers the thread-level sensors and their metrics.
         *
         * @param metrics the registry to register the sensors in
         */
        public StreamsMetricsImpl(Metrics metrics) {
            this.metrics = metrics;
            this.metricGrpName = "stream-metrics";
            this.metricTags = new LinkedHashMap<String, String>();
            this.metricTags.put("client-id", StreamThread.this.clientId + "-" + StreamThread.this.getName());
            this.commitTimeSensor = metrics.sensor("commit-time");
            this.commitTimeSensor.add(metrics.metricName("commit-time-avg", this.metricGrpName, "The average commit time in ms", this.metricTags), new Avg());
            this.commitTimeSensor.add(metrics.metricName("commit-time-max", this.metricGrpName, "The maximum commit time in ms", this.metricTags), new Max());
            this.commitTimeSensor.add(metrics.metricName("commit-calls-rate", this.metricGrpName, "The average per-second number of commit calls", this.metricTags), new Rate(new Count()));
            this.pollTimeSensor = metrics.sensor("poll-time");
            this.pollTimeSensor.add(metrics.metricName("poll-time-avg", this.metricGrpName, "The average poll time in ms", this.metricTags), new Avg());
            this.pollTimeSensor.add(metrics.metricName("poll-time-max", this.metricGrpName, "The maximum poll time in ms", this.metricTags), new Max());
            this.pollTimeSensor.add(metrics.metricName("poll-calls-rate", this.metricGrpName, "The average per-second number of record-poll calls", this.metricTags), new Rate(new Count()));
            this.processTimeSensor = metrics.sensor("process-time");
            this.processTimeSensor.add(metrics.metricName("process-time-avg-ms", this.metricGrpName, "The average process time in ms", this.metricTags), new Avg());
            this.processTimeSensor.add(metrics.metricName("process-time-max-ms", this.metricGrpName, "The maximum process time in ms", this.metricTags), new Max());
            this.processTimeSensor.add(metrics.metricName("process-calls-rate", this.metricGrpName, "The average per-second number of process calls", this.metricTags), new Rate(new Count()));
            this.punctuateTimeSensor = metrics.sensor("punctuate-time");
            this.punctuateTimeSensor.add(metrics.metricName("punctuate-time-avg", this.metricGrpName, "The average punctuate time in ms", this.metricTags), new Avg());
            this.punctuateTimeSensor.add(metrics.metricName("punctuate-time-max", this.metricGrpName, "The maximum punctuate time in ms", this.metricTags), new Max());
            this.punctuateTimeSensor.add(metrics.metricName("punctuate-calls-rate", this.metricGrpName, "The average per-second number of punctuate calls", this.metricTags), new Rate(new Count()));
            this.taskCreationSensor = metrics.sensor("task-creation");
            this.taskCreationSensor.add(metrics.metricName("task-creation-rate", this.metricGrpName, "The average per-second number of newly created tasks", this.metricTags), new Rate(new Count()));
            this.taskDestructionSensor = metrics.sensor("task-destruction");
            this.taskDestructionSensor.add(metrics.metricName("task-destruction-rate", this.metricGrpName, "The average per-second number of destructed tasks", this.metricTags), new Rate(new Count()));
        }

        /**
         * Records the latency between two nanosecond readings, in whole milliseconds.
         *
         * @param sensor  the sensor to record on
         * @param startNs start of the measured operation, in nanoseconds
         * @param endNs   end of the measured operation, in nanoseconds
         */
        @Override
        public void recordLatency(Sensor sensor, long startNs, long endNs) {
            // The second argument of Sensor.record is the sample time in milliseconds, so the
            // thread's clock is used for it rather than the nanosecond reading endNs.
            sensor.record((double)((endNs - startNs) / 1000000L), StreamThread.this.time.milliseconds());
        }

        /**
         * Creates a latency sensor for one entity and operation, with a parent sensor that
         * aggregates the operation over all entities of the scope.
         *
         * @param scopeName     scope used in the sensor names and the metric group name
         * @param entityName    entity the sensor belongs to
         * @param operationName operation being measured
         * @param tags          additional metric tags as alternating keys and values
         * @return the entity-level sensor
         * @throws IllegalArgumentException if {@code tags} has an odd number of elements
         */
        @Override
        public Sensor addLatencySensor(String scopeName, String entityName, String operationName, String ... tags) {
            if (tags.length % 2 != 0) {
                throw new IllegalArgumentException("Tags needs to be specified in key-value pairs");
            }
            Map<String, String> tagMap = new HashMap<String, String>(this.metricTags);
            for (int i = 0; i < tags.length; i += 2) {
                tagMap.put(tags[i], tags[i + 1]);
            }
            String metricGroupName = "stream-" + scopeName + "-metrics";
            // The parent aggregates over all entities and carries only the thread-level tags.
            Sensor parent = this.metrics.sensor(scopeName + "-" + operationName);
            this.addLatencyMetrics(metricGroupName, parent, "all", operationName, this.metricTags);
            Sensor sensor = this.metrics.sensor(scopeName + "-" + entityName + "-" + operationName, parent);
            this.addLatencyMetrics(metricGroupName, sensor, entityName, operationName, tagMap);
            return sensor;
        }

        /**
         * Adds the average latency, maximum latency and rate metrics of an operation to a
         * sensor, skipping metrics that are already registered.
         */
        private void addLatencyMetrics(String metricGrpName, Sensor sensor, String entityName, String opName, Map<String, String> tags) {
            this.maybeAddMetric(sensor, this.metrics.metricName(entityName + "-" + opName + "-avg-latency-ms", metricGrpName, "The average latency in milliseconds of " + entityName + " " + opName + " operation.", tags), new Avg());
            this.maybeAddMetric(sensor, this.metrics.metricName(entityName + "-" + opName + "-max-latency-ms", metricGrpName, "The max latency in milliseconds of " + entityName + " " + opName + " operation.", tags), new Max());
            this.maybeAddMetric(sensor, this.metrics.metricName(entityName + "-" + opName + "-qps", metricGrpName, "The average number of occurrence of " + entityName + " " + opName + " operation per second.", tags), new Rate(new Count()));
        }

        /**
         * Adds the metric to the sensor unless a metric with that name is already registered.
         */
        private void maybeAddMetric(Sensor sensor, MetricName name, MeasurableStat stat) {
            if (!this.metrics.metrics().containsKey(name)) {
                sensor.add(name, stat);
            }
        }
    }
}

