/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownProducerIdException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.CumulativeCount;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.SampledStat;
import org.apache.kafka.common.metrics.stats.WindowedCount;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.kstream.internals.metrics.Sensors;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.AbstractTask;
import org.apache.kafka.streams.processor.internals.ChangelogReader;
import org.apache.kafka.streams.processor.internals.PartitionGroup;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorNodePunctuator;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.PunctuationQueue;
import org.apache.kafka.streams.processor.internals.PunctuationSchedule;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.processor.internals.RecordQueue;
import org.apache.kafka.streams.processor.internals.RecoverableClientException;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.processor.internals.StampedRecord;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.state.internals.ThreadCache;

public class StreamTask
extends AbstractTask
implements ProcessorNodePunctuator {
    private static final ConsumerRecord<Object, Object> DUMMY_RECORD = new ConsumerRecord("__null_topic__", -1, -1L, null, null);
    static final byte LATEST_MAGIC_BYTE = 1;
    private final Time time;
    private final long maxTaskIdleMs;
    private final int maxBufferedSize;
    private final TaskMetrics taskMetrics;
    private final PartitionGroup partitionGroup;
    private final RecordCollector recordCollector;
    private final PartitionGroup.RecordInfo recordInfo;
    private final Map<TopicPartition, Long> consumedOffsets;
    private final PunctuationQueue streamTimePunctuationQueue;
    private final PunctuationQueue systemTimePunctuationQueue;
    private final ProducerSupplier producerSupplier;
    private Sensor closeTaskSensor;
    private long idleStartTime;
    private Producer<byte[], byte[]> producer;
    private boolean commitRequested = false;
    private boolean transactionInFlight = false;

    public StreamTask(TaskId id, Set<TopicPartition> partitions, ProcessorTopology topology, Consumer<byte[], byte[]> consumer, ChangelogReader changelogReader, StreamsConfig config, StreamsMetricsImpl metrics, StateDirectory stateDirectory, ThreadCache cache, Time time, ProducerSupplier producerSupplier) {
        this(id, partitions, topology, consumer, changelogReader, config, metrics, stateDirectory, cache, time, producerSupplier, null);
    }

    public StreamTask(TaskId id, Set<TopicPartition> partitions, ProcessorTopology topology, Consumer<byte[], byte[]> consumer, ChangelogReader changelogReader, StreamsConfig config, StreamsMetricsImpl streamsMetrics, StateDirectory stateDirectory, ThreadCache cache, Time time, ProducerSupplier producerSupplier, RecordCollector recordCollector) {
        super(id, partitions, topology, consumer, changelogReader, false, stateDirectory, config);
        this.time = time;
        this.producerSupplier = producerSupplier;
        this.producer = producerSupplier.get();
        String threadId = Thread.currentThread().getName();
        this.taskMetrics = new TaskMetrics(threadId, id, streamsMetrics);
        this.closeTaskSensor = ThreadMetrics.closeTaskSensor(threadId, streamsMetrics);
        ProductionExceptionHandler productionExceptionHandler = config.defaultProductionExceptionHandler();
        this.recordCollector = recordCollector == null ? new RecordCollectorImpl(id.toString(), this.logContext, productionExceptionHandler, ThreadMetrics.skipRecordSensor(threadId, streamsMetrics)) : recordCollector;
        this.recordCollector.init(this.producer);
        this.streamTimePunctuationQueue = new PunctuationQueue();
        this.systemTimePunctuationQueue = new PunctuationQueue();
        this.maxTaskIdleMs = config.getLong("max.task.idle.ms");
        this.maxBufferedSize = config.getInt("buffered.records.per.partition");
        this.consumedOffsets = new HashMap<TopicPartition, Long>();
        HashMap<TopicPartition, RecordQueue> partitionQueues = new HashMap<TopicPartition, RecordQueue>();
        ProcessorContextImpl processorContextImpl = new ProcessorContextImpl(id, this, config, this.recordCollector, this.stateMgr, streamsMetrics, cache);
        this.processorContext = processorContextImpl;
        TimestampExtractor defaultTimestampExtractor = config.defaultTimestampExtractor();
        DeserializationExceptionHandler defaultDeserializationExceptionHandler = config.defaultDeserializationExceptionHandler();
        for (TopicPartition partition : partitions) {
            SourceNode source = topology.source(partition.topic());
            TimestampExtractor sourceTimestampExtractor = source.getTimestampExtractor() != null ? source.getTimestampExtractor() : defaultTimestampExtractor;
            RecordQueue queue = new RecordQueue(partition, source, sourceTimestampExtractor, defaultDeserializationExceptionHandler, this.processorContext, this.logContext);
            partitionQueues.put(partition, queue);
        }
        this.recordInfo = new PartitionGroup.RecordInfo();
        this.partitionGroup = new PartitionGroup(partitionQueues, Sensors.recordLatenessSensor(processorContextImpl));
        this.stateMgr.registerGlobalStateStores(topology.globalStateStores());
        if (this.eosEnabled) {
            this.initializeTransactions();
        }
    }

    @Override
    public void initializeMetadata() {
        try {
            Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata = this.consumer.committed(this.partitions).entrySet().stream().filter(e -> e.getValue() != null).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
            this.initializeCommittedOffsets(offsetsAndMetadata);
            this.initializeTaskTime(offsetsAndMetadata);
        }
        catch (AuthorizationException e2) {
            throw new ProcessorStateException(String.format("task [%s] AuthorizationException when initializing offsets for %s", this.id, this.partitions), e2);
        }
        catch (WakeupException e3) {
            throw e3;
        }
        catch (KafkaException e4) {
            throw new ProcessorStateException(String.format("task [%s] Failed to initialize offsets for %s", this.id, this.partitions), e4);
        }
    }

    private void initializeCommittedOffsets(Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata) {
        HashSet<String> changelogTopicNames = new HashSet<String>(this.topology.storeToChangelogTopic().values());
        Map<TopicPartition, Long> committedOffsetsForChangelogs = offsetsAndMetadata.entrySet().stream().filter(e -> changelogTopicNames.contains(((TopicPartition)e.getKey()).topic())).collect(Collectors.toMap(Map.Entry::getKey, e -> ((OffsetAndMetadata)e.getValue()).offset()));
        for (TopicPartition tp : this.partitions) {
            committedOffsetsForChangelogs.putIfAbsent(tp, 0L);
        }
        this.stateMgr.putOffsetLimits(committedOffsetsForChangelogs);
    }

    private void initializeTaskTime(Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata) {
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsetsAndMetadata.entrySet()) {
            TopicPartition partition = entry.getKey();
            OffsetAndMetadata metadata = entry.getValue();
            if (metadata != null) {
                long committedTimestamp = this.decodeTimestamp(metadata.metadata());
                this.partitionGroup.setPartitionTime(partition, committedTimestamp);
                this.log.debug("A committed timestamp was detected: setting the partition time of partition {} to {} in stream task {}", new Object[]{partition, committedTimestamp, this.id});
                continue;
            }
            this.log.debug("No committed timestamp was found in metadata for partition {}", (Object)partition);
        }
        HashSet nonCommitted = new HashSet(this.partitions);
        nonCommitted.removeAll(offsetsAndMetadata.keySet());
        for (TopicPartition partition : nonCommitted) {
            this.log.debug("No committed offset for partition {}, therefore no timestamp can be found for this partition", (Object)partition);
        }
    }

    @Override
    public boolean initializeStateStores() {
        this.log.debug("Initializing state stores");
        this.registerStateStores();
        return this.changelogPartitions().isEmpty();
    }

    @Override
    public void initializeTopology() {
        this.initTopology();
        if (this.eosEnabled) {
            try {
                this.producer.beginTransaction();
            }
            catch (ProducerFencedException | UnknownProducerIdException e) {
                throw new TaskMigratedException(this, e);
            }
            this.transactionInFlight = true;
        }
        this.processorContext.initialize();
        this.taskInitialized = true;
        this.idleStartTime = -1L;
        this.stateMgr.ensureStoresRegistered();
    }

    @Override
    public void resume() {
        this.log.debug("Resuming");
        if (this.eosEnabled) {
            if (this.producer != null) {
                throw new IllegalStateException("Task producer should be null.");
            }
            this.producer = this.producerSupplier.get();
            this.initializeTransactions();
            this.recordCollector.init(this.producer);
            try {
                this.stateMgr.clearCheckpoints();
            }
            catch (IOException e) {
                throw new ProcessorStateException(String.format("%sError while deleting the checkpoint file", this.logPrefix), e);
            }
        }
        this.initializeMetadata();
    }

    boolean isProcessable(long now) {
        if (this.partitionGroup.allPartitionsBuffered()) {
            this.idleStartTime = -1L;
            return true;
        }
        if (this.partitionGroup.numBuffered() > 0) {
            if (this.idleStartTime == -1L) {
                this.idleStartTime = now;
            }
            if (now - this.idleStartTime >= this.maxTaskIdleMs) {
                this.taskMetrics.taskEnforcedProcessSensor.record();
                return true;
            }
            return false;
        }
        return false;
    }

    public boolean process() {
        StampedRecord record = this.partitionGroup.nextRecord(this.recordInfo);
        if (record == null) {
            return false;
        }
        try {
            ProcessorNode currNode = this.recordInfo.node();
            TopicPartition partition = this.recordInfo.partition();
            this.log.trace("Start processing one record [{}]", (Object)record);
            this.updateProcessorContext(record, currNode);
            currNode.process(record.key(), record.value());
            this.log.trace("Completed processing one record [{}]", (Object)record);
            this.consumedOffsets.put(partition, record.offset());
            this.commitNeeded = true;
            if (this.recordInfo.queue().size() == this.maxBufferedSize) {
                this.consumer.resume(Collections.singleton(partition));
            }
        }
        catch (RecoverableClientException e) {
            throw new TaskMigratedException(this, (Throwable)((Object)e));
        }
        catch (KafkaException e) {
            String stackTrace = this.getStacktraceString(e);
            throw new StreamsException(String.format("Exception caught in process. taskId=%s, processor=%s, topic=%s, partition=%d, offset=%d, stacktrace=%s", this.id(), this.processorContext.currentNode().name(), record.topic(), record.partition(), record.offset(), stackTrace), e);
        }
        finally {
            this.processorContext.setCurrentNode(null);
        }
        return true;
    }

    private String getStacktraceString(KafkaException e) {
        String stacktrace = null;
        try (StringWriter stringWriter = new StringWriter();
             PrintWriter printWriter = new PrintWriter(stringWriter);){
            e.printStackTrace(printWriter);
            stacktrace = stringWriter.toString();
        }
        catch (IOException ioe) {
            this.log.error("Encountered error extracting stacktrace from this exception", (Throwable)ioe);
        }
        return stacktrace;
    }

    @Override
    public void punctuate(ProcessorNode node, long timestamp, PunctuationType type, Punctuator punctuator) {
        if (this.processorContext.currentNode() != null) {
            throw new IllegalStateException(String.format("%sCurrent node is not null", this.logPrefix));
        }
        this.updateProcessorContext(new StampedRecord(DUMMY_RECORD, timestamp), node);
        if (this.log.isTraceEnabled()) {
            this.log.trace("Punctuating processor {} with timestamp {} and punctuation type {}", new Object[]{node.name(), timestamp, type});
        }
        try {
            node.punctuate(timestamp, punctuator);
        }
        catch (RecoverableClientException e) {
            throw new TaskMigratedException(this, (Throwable)((Object)e));
        }
        catch (KafkaException e) {
            throw new StreamsException(String.format("%sException caught while punctuating processor '%s'", this.logPrefix, node.name()), e);
        }
        finally {
            this.processorContext.setCurrentNode(null);
        }
    }

    private void updateProcessorContext(StampedRecord record, ProcessorNode currNode) {
        this.processorContext.setRecordContext(new ProcessorRecordContext(record.timestamp, record.offset(), record.partition(), record.topic(), record.headers()));
        this.processorContext.setCurrentNode(currNode);
    }

    @Override
    public void commit() {
        this.commit(true, this.extractPartitionTimes());
    }

    void commit(boolean startNewTransaction, Map<TopicPartition, Long> partitionTimes) {
        long startNs = this.time.nanoseconds();
        this.log.debug("Committing");
        this.flushState();
        if (!this.eosEnabled) {
            this.stateMgr.checkpoint(this.activeTaskCheckpointableOffsets());
        }
        HashMap<TopicPartition, OffsetAndMetadata> consumedOffsetsAndMetadata = new HashMap<TopicPartition, OffsetAndMetadata>(this.consumedOffsets.size());
        for (Map.Entry<TopicPartition, Long> entry : this.consumedOffsets.entrySet()) {
            TopicPartition partition = entry.getKey();
            long offset = entry.getValue() + 1L;
            long partitionTime = partitionTimes.get(partition);
            consumedOffsetsAndMetadata.put(partition, new OffsetAndMetadata(offset, this.encodeTimestamp(partitionTime)));
        }
        try {
            if (this.eosEnabled) {
                this.producer.sendOffsetsToTransaction(consumedOffsetsAndMetadata, this.applicationId);
                this.producer.commitTransaction();
                this.transactionInFlight = false;
                if (startNewTransaction) {
                    this.producer.beginTransaction();
                    this.transactionInFlight = true;
                }
            } else {
                this.consumer.commitSync(consumedOffsetsAndMetadata);
            }
        }
        catch (CommitFailedException | ProducerFencedException | UnknownProducerIdException error) {
            throw new TaskMigratedException(this, error);
        }
        this.commitNeeded = false;
        this.commitRequested = false;
        this.taskMetrics.taskCommitTimeSensor.record((double)(this.time.nanoseconds() - startNs));
    }

    private Map<TopicPartition, Long> activeTaskCheckpointableOffsets() {
        HashMap<TopicPartition, Long> checkpointableOffsets = new HashMap<TopicPartition, Long>(this.recordCollector.offsets());
        for (Map.Entry<TopicPartition, Long> entry : this.consumedOffsets.entrySet()) {
            checkpointableOffsets.putIfAbsent(entry.getKey(), entry.getValue());
        }
        return checkpointableOffsets;
    }

    @Override
    protected void flushState() {
        this.log.trace("Flushing state and producer");
        super.flushState();
        try {
            this.recordCollector.flush();
        }
        catch (RecoverableClientException e) {
            throw new TaskMigratedException(this, (Throwable)((Object)e));
        }
    }

    Map<TopicPartition, Long> purgableOffsets() {
        HashMap<TopicPartition, Long> purgableConsumedOffsets = new HashMap<TopicPartition, Long>();
        for (Map.Entry<TopicPartition, Long> entry : this.consumedOffsets.entrySet()) {
            TopicPartition tp = entry.getKey();
            if (!this.topology.isRepartitionTopic(tp.topic())) continue;
            purgableConsumedOffsets.put(tp, entry.getValue() + 1L);
        }
        return purgableConsumedOffsets;
    }

    private void initTopology() {
        this.log.trace("Initializing processor nodes of the topology");
        for (ProcessorNode node : this.topology.processors()) {
            this.processorContext.setCurrentNode(node);
            try {
                node.init(this.processorContext);
            }
            finally {
                this.processorContext.setCurrentNode(null);
            }
        }
    }

    public void suspend() {
        this.log.debug("Suspending");
        this.suspend(true, false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void suspend(boolean clean, boolean isZombie) {
        Map<TopicPartition, Long> partitionTimes;
        block19: {
            partitionTimes = this.extractPartitionTimes();
            try {
                this.closeTopology();
            }
            catch (RuntimeException fatal) {
                if (!clean) break block19;
                throw fatal;
            }
        }
        if (clean) {
            TaskMigratedException taskMigratedException = null;
            try {
                this.commit(false, partitionTimes);
            }
            finally {
                if (this.eosEnabled) {
                    this.stateMgr.checkpoint(this.activeTaskCheckpointableOffsets());
                    try {
                        this.recordCollector.close();
                    }
                    catch (RecoverableClientException e) {
                        taskMigratedException = new TaskMigratedException(this, (Throwable)((Object)e));
                    }
                    finally {
                        this.producer = null;
                    }
                }
            }
            if (taskMigratedException != null) {
                throw taskMigratedException;
            }
        } else {
            super.flushState();
            if (this.eosEnabled) {
                this.maybeAbortTransactionAndCloseRecordCollector(isZombie);
            }
        }
    }

    private void maybeAbortTransactionAndCloseRecordCollector(boolean isZombie) {
        if (!isZombie) {
            try {
                if (this.transactionInFlight) {
                    this.producer.abortTransaction();
                }
                this.transactionInFlight = false;
            }
            catch (ProducerFencedException producerFencedException) {
                // empty catch block
            }
        }
        try {
            this.recordCollector.close();
        }
        catch (Throwable e) {
            this.log.error("Failed to close producer due to the following error:", e);
        }
        finally {
            this.producer = null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void closeTopology() {
        this.log.trace("Closing processor topology");
        this.partitionGroup.clear();
        RuntimeException exception = null;
        if (this.taskInitialized) {
            for (ProcessorNode node : this.topology.processors()) {
                this.processorContext.setCurrentNode(node);
                try {
                    node.close();
                }
                catch (RuntimeException e) {
                    exception = e;
                }
                finally {
                    this.processorContext.setCurrentNode(null);
                }
            }
        }
        if (exception != null) {
            throw exception;
        }
    }

    void closeSuspended(boolean clean, RuntimeException firstException) {
        try {
            this.closeStateManager(clean);
        }
        catch (RuntimeException e) {
            if (firstException == null) {
                firstException = e;
            }
            this.log.error("Could not close state manager due to the following error:", (Throwable)e);
        }
        this.partitionGroup.close();
        this.taskMetrics.removeAllSensors();
        this.closeTaskSensor.record();
        if (firstException != null) {
            throw firstException;
        }
    }

    @Override
    public void close(boolean clean, boolean isZombie) {
        this.log.debug("Closing");
        RuntimeException firstException = null;
        try {
            this.suspend(clean, isZombie);
        }
        catch (RuntimeException e) {
            clean = false;
            firstException = e;
            this.log.error("Could not close task due to the following error:", (Throwable)e);
        }
        this.closeSuspended(clean, firstException);
        this.taskClosed = true;
    }

    public void addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records) {
        int newQueueSize = this.partitionGroup.addRawRecords(partition, records);
        if (this.log.isTraceEnabled()) {
            this.log.trace("Added records into the buffered queue of partition {}, new queue size is {}", (Object)partition, (Object)newQueueSize);
        }
        if (newQueueSize > this.maxBufferedSize) {
            this.consumer.pause(Collections.singleton(partition));
        }
    }

    public Cancellable schedule(long interval, PunctuationType type, Punctuator punctuator) {
        switch (type) {
            case STREAM_TIME: {
                return this.schedule(0L, interval, type, punctuator);
            }
            case WALL_CLOCK_TIME: {
                return this.schedule(this.time.milliseconds() + interval, interval, type, punctuator);
            }
        }
        throw new IllegalArgumentException("Unrecognized PunctuationType: " + (Object)((Object)type));
    }

    Cancellable schedule(long startTime, long interval, PunctuationType type, Punctuator punctuator) {
        if (this.processorContext.currentNode() == null) {
            throw new IllegalStateException(String.format("%sCurrent node is null", this.logPrefix));
        }
        PunctuationSchedule schedule = new PunctuationSchedule(this.processorContext.currentNode(), startTime, interval, punctuator);
        switch (type) {
            case STREAM_TIME: {
                return this.streamTimePunctuationQueue.schedule(schedule);
            }
            case WALL_CLOCK_TIME: {
                return this.systemTimePunctuationQueue.schedule(schedule);
            }
        }
        throw new IllegalArgumentException("Unrecognized PunctuationType: " + (Object)((Object)type));
    }

    int numBuffered() {
        return this.partitionGroup.numBuffered();
    }

    public boolean maybePunctuateStreamTime() {
        long streamTime = this.partitionGroup.streamTime();
        if (streamTime == -1L) {
            return false;
        }
        boolean punctuated = this.streamTimePunctuationQueue.mayPunctuate(streamTime, PunctuationType.STREAM_TIME, this);
        if (punctuated) {
            this.commitNeeded = true;
        }
        return punctuated;
    }

    public boolean maybePunctuateSystemTime() {
        long systemTime = this.time.milliseconds();
        boolean punctuated = this.systemTimePunctuationQueue.mayPunctuate(systemTime, PunctuationType.WALL_CLOCK_TIME, this);
        if (punctuated) {
            this.commitNeeded = true;
        }
        return punctuated;
    }

    void requestCommit() {
        this.commitRequested = true;
    }

    boolean commitRequested() {
        return this.commitRequested;
    }

    RecordCollector recordCollector() {
        return this.recordCollector;
    }

    long streamTime() {
        return this.partitionGroup.streamTime();
    }

    long partitionTime(TopicPartition partition) {
        return this.partitionGroup.partitionTimestamp(partition);
    }

    Producer<byte[], byte[]> getProducer() {
        return this.producer;
    }

    private void initializeTransactions() {
        try {
            this.producer.initTransactions();
        }
        catch (TimeoutException retriable) {
            this.log.error("Timeout exception caught when initializing transactions for task {}. This might happen if the broker is slow to respond, if the network connection to the broker was interrupted, or if similar circumstances arise. You can increase producer parameter `max.block.ms` to increase this timeout.", (Object)this.id, (Object)retriable);
            throw new StreamsException(String.format("%sFailed to initialize task %s due to timeout.", this.logPrefix, this.id), retriable);
        }
    }

    String encodeTimestamp(long partitionTime) {
        ByteBuffer buffer = ByteBuffer.allocate(9);
        buffer.put((byte)1);
        buffer.putLong(partitionTime);
        return Base64.getEncoder().encodeToString(buffer.array());
    }

    long decodeTimestamp(String encryptedString) {
        if (encryptedString.length() == 0) {
            return -1L;
        }
        ByteBuffer buffer = ByteBuffer.wrap(Base64.getDecoder().decode(encryptedString));
        byte version = buffer.get();
        switch (version) {
            case 1: {
                return buffer.getLong();
            }
        }
        this.log.warn("Unsupported offset metadata version found. Supported version {}. Found version {}.", (Object)1, (Object)version);
        return -1L;
    }

    private Map<TopicPartition, Long> extractPartitionTimes() {
        HashMap<TopicPartition, Long> partitionTimes = new HashMap<TopicPartition, Long>();
        for (TopicPartition partition : this.partitionGroup.partitions()) {
            partitionTimes.put(partition, this.partitionTime(partition));
        }
        return partitionTimes;
    }

    public static interface ProducerSupplier {
        public Producer<byte[], byte[]> get();
    }

    protected static final class TaskMetrics {
        final StreamsMetricsImpl metrics;
        final Sensor taskCommitTimeSensor;
        final Sensor taskEnforcedProcessSensor;
        private final String threadId;
        private final String taskName;

        TaskMetrics(String threadId, TaskId taskId, StreamsMetricsImpl metrics) {
            this.threadId = threadId;
            this.taskName = taskId.toString();
            this.metrics = metrics;
            String group = "stream-task-metrics";
            Sensor parent = ThreadMetrics.commitOverTasksSensor(threadId, metrics);
            Map<String, String> tagMap = metrics.taskLevelTagMap(threadId, this.taskName);
            this.taskCommitTimeSensor = metrics.taskLevelSensor(threadId, this.taskName, "commit", Sensor.RecordingLevel.DEBUG, parent);
            this.taskCommitTimeSensor.add(new MetricName("commit-latency-avg", "stream-task-metrics", "The average latency of commit operation.", tagMap), (MeasurableStat)new Avg());
            this.taskCommitTimeSensor.add(new MetricName("commit-latency-max", "stream-task-metrics", "The max latency of commit operation.", tagMap), (MeasurableStat)new Max());
            this.taskCommitTimeSensor.add(new MetricName("commit-rate", "stream-task-metrics", "The average number of occurrence of commit operation per second.", tagMap), (MeasurableStat)new Rate(TimeUnit.SECONDS, (SampledStat)new WindowedCount()));
            this.taskCommitTimeSensor.add(new MetricName("commit-total", "stream-task-metrics", "The total number of occurrence of commit operations.", tagMap), (MeasurableStat)new CumulativeCount());
            this.taskEnforcedProcessSensor = metrics.taskLevelSensor(threadId, this.taskName, "enforced-processing", Sensor.RecordingLevel.DEBUG, parent);
            this.taskEnforcedProcessSensor.add(new MetricName("enforced-processing-rate", "stream-task-metrics", "The average number of occurrence of enforced-processing operation per second.", tagMap), (MeasurableStat)new Rate(TimeUnit.SECONDS, (SampledStat)new WindowedCount()));
            this.taskEnforcedProcessSensor.add(new MetricName("enforced-processing-total", "stream-task-metrics", "The total number of occurrence of enforced-processing operations.", tagMap), (MeasurableStat)new CumulativeCount());
        }

        void removeAllSensors() {
            this.metrics.removeAllTaskLevelSensors(this.threadId, this.taskName);
        }
    }
}

