package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Min;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskIdFormatException;
import org.apache.kafka.streams.processor.PartitionGrouper;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.streams.state.internals.ThreadCacheMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*  JADX ERROR: NullPointerException in pass: ClassModifier
    java.lang.NullPointerException: Cannot invoke "java.util.List.forEach(java.util.function.Consumer)" because "blocks" is null
    	at jadx.core.utils.BlockUtils.collectAllInsns(BlockUtils.java:1017)
    	at jadx.core.dex.visitors.ClassModifier.removeBridgeMethod(ClassModifier.java:239)
    	at jadx.core.dex.visitors.ClassModifier.removeSyntheticMethods(ClassModifier.java:154)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.ClassModifier.visit(ClassModifier.java:64)
    */
/* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamThread.class */
public class StreamThread extends Thread {
    private static final Logger log = LoggerFactory.getLogger(StreamThread.class);
    // Supplies the numeric suffix of each thread name ("StreamThread-N"); the first thread gets 1.
    private static final AtomicInteger STREAM_THREAD_ID_SEQUENCE = new AtomicInteger(1);
    // Instantiated from StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG in the constructor.
    public final PartitionGrouper partitionGrouper;
    // Receives onChange(...) notifications from the rebalance listener.
    private final StreamsMetadataState streamsMetadataState;
    public final String applicationId;
    public final String clientId;
    public final UUID processId;
    protected final StreamsConfig config;
    protected final TopologyBuilder builder;
    // Subscription targets taken from the topology builder; the pattern is used when it is non-null.
    protected final Set<String> sourceTopics;
    protected final Pattern topicPattern;
    // The three Kafka clients are obtained from the KafkaClientSupplier and closed in shutdown().
    protected final Producer<byte[], byte[]> producer;
    protected final Consumer<byte[], byte[]> consumer;
    protected final Consumer<byte[], byte[]> restoreConsumer;
    // "stream-thread [<thread name>]", prepended to most log lines.
    private final String logPrefix;
    // clientId + "-" + thread name; used for client configs, the cache name and the metric tags.
    private final String threadClientId;
    // True from construction until close() is called; polled by the run loop.
    private final AtomicBoolean running;
    private final Map<TaskId, StreamTask> activeTasks;
    private final Map<TaskId, StandbyTask> standbyTasks;
    private final Map<TopicPartition, StreamTask> activeTasksByPartition;
    private final Map<TopicPartition, StandbyTask> standbyTasksByPartition;
    private final Set<TaskId> prevTasks;
    private final Time time;
    // Intervals read from StreamsConfig (POLL_MS, STATE_CLEANUP_DELAY_MS, COMMIT_INTERVAL_MS).
    private final long pollTimeMs;
    private final long cleanTimeMs;
    private final long commitTimeMs;
    private final StreamsMetricsImpl sensors;
    final StateDirectory stateDirectory;
    // Null until partitionAssignor(...) is called.
    private StreamPartitionAssignor partitionAssignor;
    // Reset at the top of every run-loop iteration and by computeLatency().
    private long timerStartedMs;
    // Initialised to Long.MAX_VALUE in the constructor; updated by maybeClean().
    private long lastCleanMs;
    private long lastCommitMs;
    // Set by the rebalance listener when a callback fails; checked by the run loop after each poll.
    private Throwable rebalanceException;
    // Records handed back by StandbyTask.update(...); the partition is paused on the restore
    // consumer while an entry exists here.
    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords;
    // Raised by maybeCommit() after a commit; cleared by maybeUpdateStandbyTasks().
    private boolean processStandbyRecords;
    // Set to true at the end of a successful onPartitionsAssigned, false on revocation.
    private AtomicBoolean initialized;
    // CACHE_MAX_BYTES_BUFFERING_CONFIG divided by NUM_STREAM_THREADS_CONFIG, floored at 0.
    private final long cacheSizeBytes;
    private ThreadCache cache;
    // Passed to consumer.subscribe(...) in the run loop.
    final ConsumerRebalanceListener rebalanceListener;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.kafka.streams.processor.internals.StreamThread$1 */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamThread$1.class */
    public class AnonymousClass1 implements ConsumerRebalanceListener {
        AnonymousClass1() {
        }

        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
            try {
                StreamThread.log.info("stream-thread [{}] New partitions [{}] assigned at the end of consumer rebalance.", StreamThread.this.getName(), collection);
                StreamThread.this.addStreamTasks(collection);
                StreamThread.this.addStandbyTasks();
                StreamThread.access$302(StreamThread.this, StreamThread.this.time.milliseconds());
                StreamThread.this.streamsMetadataState.onChange(StreamThread.this.partitionAssignor.getPartitionsByHostState(), StreamThread.this.partitionAssignor.clusterMetadata());
                StreamThread.this.initialized.set(true);
            } catch (Throwable th) {
                StreamThread.this.rebalanceException = th;
                throw th;
            }
        }

        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            try {
                try {
                    StreamThread.log.info("stream-thread [{}] partitions [{}] revoked at the beginning of consumer rebalance.", StreamThread.this.getName(), collection);
                    StreamThread.this.initialized.set(false);
                    StreamThread.access$302(StreamThread.this, Long.MAX_VALUE);
                    StreamThread.this.shutdownTasksAndState(true);
                    StreamThread.this.streamsMetadataState.onChange(Collections.emptyMap(), StreamThread.this.partitionAssignor.clusterMetadata());
                    StreamThread.this.removeStreamTasks();
                    StreamThread.this.removeStandbyTasks();
                } catch (Throwable th) {
                    StreamThread.this.rebalanceException = th;
                    throw th;
                }
            } catch (Throwable th2) {
                StreamThread.this.streamsMetadataState.onChange(Collections.emptyMap(), StreamThread.this.partitionAssignor.clusterMetadata());
                StreamThread.this.removeStreamTasks();
                StreamThread.this.removeStandbyTasks();
                throw th2;
            }
        }
    }

    /* renamed from: org.apache.kafka.streams.processor.internals.StreamThread$2 */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamThread$2.class */
    /**
     * Task action that logs the task id and then closes the task's state manager. Named
     * counterpart of the anonymous action used by {@code closeAllStateManagers}.
     */
    public class AnonymousClass2 implements AbstractTaskAction {
        AnonymousClass2() {
        }

        @Override // org.apache.kafka.streams.processor.internals.StreamThread.AbstractTaskAction
        public void apply(AbstractTask abstractTask) {
            final String prefix = StreamThread.this.logPrefix;
            StreamThread.log.info("{} Closing the state manager of task {}", prefix, abstractTask.id());
            abstractTask.closeStateManager();
        }
    }

    /* renamed from: org.apache.kafka.streams.processor.internals.StreamThread$3 */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamThread$3.class */
    /**
     * Task action that logs the task id and then commits the task's consumer offsets. Named
     * counterpart of the anonymous action used by {@code commitOffsets}.
     */
    public class AnonymousClass3 implements AbstractTaskAction {
        AnonymousClass3() {
        }

        @Override // org.apache.kafka.streams.processor.internals.StreamThread.AbstractTaskAction
        public void apply(AbstractTask abstractTask) {
            final String prefix = StreamThread.this.logPrefix;
            StreamThread.log.info("{} Committing consumer offsets of task {}", prefix, abstractTask.id());
            abstractTask.commitOffsets();
        }
    }

    /* renamed from: org.apache.kafka.streams.processor.internals.StreamThread$4 */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamThread$4.class */
    /**
     * Task action that logs the task id and then flushes the task's state stores. Named
     * counterpart of the anonymous action used by {@code flushAllState}.
     */
    public class AnonymousClass4 implements AbstractTaskAction {
        AnonymousClass4() {
        }

        @Override // org.apache.kafka.streams.processor.internals.StreamThread.AbstractTaskAction
        public void apply(AbstractTask abstractTask) {
            final String prefix = StreamThread.this.logPrefix;
            StreamThread.log.info("{} Flushing state stores of task {}", prefix, abstractTask.id());
            abstractTask.flushState();
        }
    }

    /* renamed from: org.apache.kafka.streams.processor.internals.StreamThread$5 */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamThread$5.class */
    /**
     * Task action that logs the task id, closes the task and records one event on the
     * task-destruction sensor.
     */
    public class AnonymousClass5 implements AbstractTaskAction {
        AnonymousClass5() {
        }

        @Override // org.apache.kafka.streams.processor.internals.StreamThread.AbstractTaskAction
        public void apply(AbstractTask abstractTask) {
            final String prefix = StreamThread.this.logPrefix;
            StreamThread.log.info("{} Closing a task {}", prefix, abstractTask.id());
            abstractTask.close();
            // Counted only after close() returned normally.
            final Sensor destroyed = StreamThread.this.sensors.taskDestructionSensor;
            destroyed.record();
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamThread$AbstractTaskAction.class */
    /**
     * A single operation to run against one task. {@code performOnAllTasks} invokes it once for
     * every active and standby task.
     */
    public interface AbstractTaskAction {
        /**
         * Runs the operation on the given task.
         *
         * @param abstractTask the task to operate on
         */
        void apply(AbstractTask abstractTask);
    }

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamThread$StreamsMetricsImpl.class */
    /**
     * Metrics facade for one stream thread. The constructor registers the thread-level sensors
     * (commit, poll, process, punctuate, task creation/destruction) under the
     * {@code "stream-metrics"} group, tagged with the thread's client id. The remaining methods
     * let callers register additional latency and cache sensors with extra tags.
     */
    public class StreamsMetricsImpl implements StreamsMetrics, ThreadCacheMetrics {
        final Metrics metrics;
        final String metricGrpName = "stream-metrics";
        final String sensorNamePrefix;
        final Map<String, String> metricTags;
        final Sensor commitTimeSensor;
        final Sensor pollTimeSensor;
        final Sensor processTimeSensor;
        final Sensor punctuateTimeSensor;
        final Sensor taskCreationSensor;
        final Sensor taskDestructionSensor;

        /**
         * Registers all thread-level sensors on the given registry.
         *
         * @param metrics registry that owns the sensors and metric names
         */
        public StreamsMetricsImpl(Metrics metrics) {
            this.metrics = metrics;
            this.sensorNamePrefix = "thread." + StreamThread.this.threadClientId;
            this.metricTags = Collections.singletonMap("client-id", StreamThread.this.threadClientId);
            this.commitTimeSensor = metrics.sensor(this.sensorNamePrefix + ".commit-time");
            this.commitTimeSensor.add(metrics.metricName("commit-time-avg", this.metricGrpName, "The average commit time in ms", this.metricTags), new Avg());
            this.commitTimeSensor.add(metrics.metricName("commit-time-max", this.metricGrpName, "The maximum commit time in ms", this.metricTags), new Max());
            this.commitTimeSensor.add(metrics.metricName("commit-calls-rate", this.metricGrpName, "The average per-second number of commit calls", this.metricTags), new Rate(new Count()));
            this.pollTimeSensor = metrics.sensor(this.sensorNamePrefix + ".poll-time");
            this.pollTimeSensor.add(metrics.metricName("poll-time-avg", this.metricGrpName, "The average poll time in ms", this.metricTags), new Avg());
            this.pollTimeSensor.add(metrics.metricName("poll-time-max", this.metricGrpName, "The maximum poll time in ms", this.metricTags), new Max());
            this.pollTimeSensor.add(metrics.metricName("poll-calls-rate", this.metricGrpName, "The average per-second number of record-poll calls", this.metricTags), new Rate(new Count()));
            this.processTimeSensor = metrics.sensor(this.sensorNamePrefix + ".process-time");
            this.processTimeSensor.add(metrics.metricName("process-time-avg-ms", this.metricGrpName, "The average process time in ms", this.metricTags), new Avg());
            this.processTimeSensor.add(metrics.metricName("process-time-max-ms", this.metricGrpName, "The maximum process time in ms", this.metricTags), new Max());
            this.processTimeSensor.add(metrics.metricName("process-calls-rate", this.metricGrpName, "The average per-second number of process calls", this.metricTags), new Rate(new Count()));
            this.punctuateTimeSensor = metrics.sensor(this.sensorNamePrefix + ".punctuate-time");
            this.punctuateTimeSensor.add(metrics.metricName("punctuate-time-avg", this.metricGrpName, "The average punctuate time in ms", this.metricTags), new Avg());
            this.punctuateTimeSensor.add(metrics.metricName("punctuate-time-max", this.metricGrpName, "The maximum punctuate time in ms", this.metricTags), new Max());
            this.punctuateTimeSensor.add(metrics.metricName("punctuate-calls-rate", this.metricGrpName, "The average per-second number of punctuate calls", this.metricTags), new Rate(new Count()));
            this.taskCreationSensor = metrics.sensor(this.sensorNamePrefix + ".task-creation");
            this.taskCreationSensor.add(metrics.metricName("task-creation-rate", this.metricGrpName, "The average per-second number of newly created tasks", this.metricTags), new Rate(new Count()));
            this.taskDestructionSensor = metrics.sensor(this.sensorNamePrefix + ".task-destruction");
            this.taskDestructionSensor.add(metrics.metricName("task-destruction-rate", this.metricGrpName, "The average per-second number of destructed tasks", this.metricTags), new Rate(new Count()));
        }

        /**
         * Records {@code j2 - j} on the sensor, using the thread's {@code timerStartedMs} as the
         * second argument to {@code Sensor.record}.
         *
         * @param sensor sensor to record on
         * @param j      start value
         * @param j2     end value
         */
        @Override // org.apache.kafka.streams.StreamsMetrics
        public void recordLatency(Sensor sensor, long j, long j2) {
            sensor.record(j2 - j, StreamThread.this.timerStartedMs);
        }

        /**
         * Records a single value on the given cache sensor.
         *
         * @param sensor sensor to record on
         * @param d      value to record
         */
        @Override // org.apache.kafka.streams.state.internals.ThreadCacheMetrics
        public void recordCacheSensor(Sensor sensor, double d) {
            sensor.record(d);
        }

        /**
         * Registers a latency sensor plus a parent "all" sensor that aggregates it.
         *
         * @param str    scope; used in the metric group name ({@code "stream-" + str + "-metrics"})
         *               and in both sensor names
         * @param str2   entity; prefixes the child sensor's metric names
         * @param str3   operation; part of the sensor and metric names
         * @param strArr extra tags as alternating key/value strings
         * @return the child sensor (the parent is reachable only through the registry)
         * @throws IllegalArgumentException if {@code strArr} has an odd number of elements
         */
        @Override // org.apache.kafka.streams.StreamsMetrics
        public Sensor addLatencySensor(String str, String str2, String str3, String... strArr) {
            Map<String, String> tagMap = tagMapWith(strArr);
            String groupName = "stream-" + str + "-metrics";
            // The parent carries only the default thread tags and the fixed entity name "all".
            Sensor parent = this.metrics.sensor(this.sensorNamePrefix + "." + str + "-" + str3);
            addLatencyMetrics(groupName, parent, "all", str3, this.metricTags);
            Sensor sensor = this.metrics.sensor(this.sensorNamePrefix + "." + str + "-" + str2 + "-" + str3, new Sensor[]{parent});
            addLatencyMetrics(groupName, sensor, str2, str3, tagMap);
            return sensor;
        }

        /**
         * Registers a cache sensor with avg/min/max metrics in the
         * {@code "stream-thread-cache-metrics"} group.
         *
         * @param str    entity; part of the sensor and metric names
         * @param str2   operation; part of the sensor and metric names
         * @param strArr extra tags as alternating key/value strings
         * @return the registered sensor
         * @throws IllegalArgumentException if {@code strArr} has an odd number of elements
         */
        @Override // org.apache.kafka.streams.state.internals.ThreadCacheMetrics
        public Sensor addCacheSensor(String str, String str2, String... strArr) {
            Map<String, String> tagMap = tagMapWith(strArr);
            Sensor sensor = this.metrics.sensor(this.sensorNamePrefix + "-" + str + "-" + str2);
            addCacheMetrics("stream-thread-cache-metrics", sensor, str, str2, tagMap);
            return sensor;
        }

        /** Returns a mutable copy of the default tags extended with the given key/value pairs. */
        private Map<String, String> tagMapWith(String... strArr) {
            if (strArr.length % 2 != 0) {
                throw new IllegalArgumentException("Tags needs to be specified in key-value pairs");
            }
            Map<String, String> tagMap = new HashMap<String, String>(this.metricTags);
            for (int i = 0; i < strArr.length; i += 2) {
                tagMap.put(strArr[i], strArr[i + 1]);
            }
            return tagMap;
        }

        /** Adds the -avg, -min and -max cache metrics to the sensor unless already registered. */
        private void addCacheMetrics(String str, Sensor sensor, String str2, String str3, Map<String, String> map) {
            // NOTE(review): all three descriptions say "current count" although the stats are
            // Avg/Min/Max; the text is kept unchanged here.
            maybeAddMetric(sensor, this.metrics.metricName(str2 + "-" + str3 + "-avg", str, "The current count of " + str2 + " " + str3 + " operation.", map), new Avg());
            maybeAddMetric(sensor, this.metrics.metricName(str2 + "-" + str3 + "-min", str, "The current count of " + str2 + " " + str3 + " operation.", map), new Min());
            maybeAddMetric(sensor, this.metrics.metricName(str2 + "-" + str3 + "-max", str, "The current count of " + str2 + " " + str3 + " operation.", map), new Max());
        }

        /** Adds the average-latency, max-latency and qps metrics to the sensor unless already registered. */
        private void addLatencyMetrics(String str, Sensor sensor, String str2, String str3, Map<String, String> map) {
            maybeAddMetric(sensor, this.metrics.metricName(str2 + "-" + str3 + "-avg-latency-ms", str, "The average latency in milliseconds of " + str2 + " " + str3 + " operation.", map), new Avg());
            maybeAddMetric(sensor, this.metrics.metricName(str2 + "-" + str3 + "-max-latency-ms", str, "The max latency in milliseconds of " + str2 + " " + str3 + " operation.", map), new Max());
            maybeAddMetric(sensor, this.metrics.metricName(str2 + "-" + str3 + "-qps", str, "The average number of occurrence of " + str2 + " " + str3 + " operation per second.", map), new Rate(new Count()));
        }

        /** Adds the stat under the given name only if the registry has no metric with that name yet. */
        private void maybeAddMetric(Sensor sensor, MetricName metricName, MeasurableStat measurableStat) {
            if (this.metrics.metrics().containsKey(metricName)) {
                return;
            }
            sensor.add(metricName, measurableStat);
        }
    }

    /**
     * Reports the current value of the {@code initialized} flag, which the rebalance listener
     * sets after a successful assignment and clears on revocation.
     *
     * @return {@code true} while the flag is set
     */
    public boolean isInitialized() {
        final boolean ready = initialized.get();
        return ready;
    }

    /**
     * Creates a stream thread named {@code "StreamThread-N"}, wires its rebalance listener,
     * metrics and record cache, and obtains producer, consumer and restore-consumer clients from
     * the supplier. The thread does not subscribe or poll until {@link #run()} executes.
     *
     * @param topologyBuilder      source of the source-topic list / pattern
     * @param streamsConfig        configuration for clients, intervals, cache size and state directory
     * @param kafkaClientSupplier  factory for the three Kafka clients
     * @param str                  application id
     * @param str2                 client id; combined with the thread name to form the per-thread client id
     * @param uuid                 process id
     * @param metrics              registry on which the thread's sensors are registered
     * @param time                 clock used for all interval bookkeeping
     * @param streamsMetadataState receiver of metadata updates from the rebalance listener
     */
    public StreamThread(TopologyBuilder topologyBuilder, StreamsConfig streamsConfig, KafkaClientSupplier kafkaClientSupplier, String str, String str2, UUID uuid, Metrics metrics, Time time, StreamsMetadataState streamsMetadataState) {
        super("StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
        this.partitionAssignor = null;
        this.rebalanceException = null;
        this.processStandbyRecords = false;
        this.initialized = new AtomicBoolean(false);
        // Outer fields are always reached through StreamThread.this: the constructor parameter
        // "time" would otherwise shadow the field of the same name inside the listener.
        this.rebalanceListener = new ConsumerRebalanceListener() { // from class: org.apache.kafka.streams.processor.internals.StreamThread.1
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                try {
                    StreamThread.log.info("stream-thread [{}] New partitions [{}] assigned at the end of consumer rebalance.", StreamThread.this.getName(), collection);
                    StreamThread.this.addStreamTasks(collection);
                    StreamThread.this.addStandbyTasks();
                    // Start the cleanup interval from "now".
                    StreamThread.this.lastCleanMs = StreamThread.this.time.milliseconds();
                    StreamThread.this.streamsMetadataState.onChange(StreamThread.this.partitionAssignor.getPartitionsByHostState(), StreamThread.this.partitionAssignor.clusterMetadata());
                    StreamThread.this.initialized.set(true);
                } catch (Throwable th) {
                    // Remember the failure so the run loop can surface it after poll() returns.
                    StreamThread.this.rebalanceException = th;
                    throw th;
                }
            }

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                try {
                    StreamThread.log.info("stream-thread [{}] partitions [{}] revoked at the beginning of consumer rebalance.", StreamThread.this.getName(), collection);
                    StreamThread.this.initialized.set(false);
                    // Same sentinel as the initial value below: no cleanup timer running.
                    StreamThread.this.lastCleanMs = Long.MAX_VALUE;
                    StreamThread.this.shutdownTasksAndState(true);
                } catch (Throwable th) {
                    StreamThread.this.rebalanceException = th;
                    throw th;
                } finally {
                    // Runs exactly once on both the success and the failure path.
                    StreamThread.this.streamsMetadataState.onChange(Collections.emptyMap(), StreamThread.this.partitionAssignor.clusterMetadata());
                    StreamThread.this.removeStreamTasks();
                    StreamThread.this.removeStandbyTasks();
                }
            }
        };
        this.applicationId = str;
        String name = getName();
        this.config = streamsConfig;
        this.builder = topologyBuilder;
        this.sourceTopics = topologyBuilder.sourceTopics();
        this.topicPattern = topologyBuilder.sourceTopicPattern();
        this.clientId = str2;
        this.processId = uuid;
        this.partitionGrouper = (PartitionGrouper) streamsConfig.getConfiguredInstance(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class);
        this.streamsMetadataState = streamsMetadataState;
        // threadClientId must be set before StreamsMetricsImpl is built: its constructor reads it.
        this.threadClientId = str2 + "-" + name;
        this.sensors = new StreamsMetricsImpl(metrics);
        long configuredCacheBytes = streamsConfig.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG).longValue();
        if (configuredCacheBytes < 0) {
            log.warn("Negative cache size passed in thread [{}]. Reverting to cache size of 0 bytes.", name);
        }
        // The configured total is split evenly across the configured number of stream threads.
        this.cacheSizeBytes = Math.max(0L, configuredCacheBytes / streamsConfig.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG).intValue());
        this.cache = new ThreadCache(this.threadClientId, this.cacheSizeBytes, this.sensors);
        this.logPrefix = String.format("stream-thread [%s]", name);
        log.info("{} Creating producer client", this.logPrefix);
        this.producer = kafkaClientSupplier.getProducer(streamsConfig.getProducerConfigs(this.threadClientId));
        log.info("{} Creating consumer client", this.logPrefix);
        this.consumer = kafkaClientSupplier.getConsumer(streamsConfig.getConsumerConfigs(this, str, this.threadClientId));
        log.info("{} Creating restore consumer client", this.logPrefix);
        this.restoreConsumer = kafkaClientSupplier.getRestoreConsumer(streamsConfig.getRestoreConsumerConfigs(this.threadClientId));
        this.activeTasks = new ConcurrentHashMap<TaskId, StreamTask>();
        this.standbyTasks = new HashMap<TaskId, StandbyTask>();
        this.activeTasksByPartition = new HashMap<TopicPartition, StreamTask>();
        this.standbyTasksByPartition = new HashMap<TopicPartition, StandbyTask>();
        this.prevTasks = new HashSet<TaskId>();
        this.standbyRecords = new HashMap<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>();
        this.stateDirectory = new StateDirectory(str, streamsConfig.getString(StreamsConfig.STATE_DIR_CONFIG));
        this.pollTimeMs = streamsConfig.getLong(StreamsConfig.POLL_MS_CONFIG).longValue();
        this.commitTimeMs = streamsConfig.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG).longValue();
        this.cleanTimeMs = streamsConfig.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG).longValue();
        this.time = time;
        this.timerStartedMs = time.milliseconds();
        // Sentinel: the cleanup interval only starts once partitions have been assigned.
        this.lastCleanMs = Long.MAX_VALUE;
        this.lastCommitMs = this.timerStartedMs;
        this.running = new AtomicBoolean(true);
    }

    /**
     * Installs the assignor whose host/partition and cluster metadata the rebalance listener
     * reads.
     *
     * @param streamPartitionAssignor assignor to use from now on
     */
    public void partitionAssignor(StreamPartitionAssignor streamPartitionAssignor) {
        partitionAssignor = streamPartitionAssignor;
    }

    /**
     * Thread entry point: runs the processing loop until it returns or throws, then shuts the
     * thread down exactly once.
     *
     * <p>A {@link KafkaException} is rethrown as-is. Any other {@link Exception} is logged as an
     * application error first and then rethrown.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        log.info("{} Starting", this.logPrefix);
        try {
            runLoop();
        } catch (KafkaException e) {
            // Must precede catch (Exception), otherwise this clause is unreachable.
            // Presumably already logged where it was raised - confirm.
            throw e;
        } catch (Exception e) {
            log.error("{} Streams application error during processing: ", this.logPrefix, e);
            throw e;
        } finally {
            // Single shutdown on every exit path, including Errors.
            shutdown();
        }
    }

    /**
     * Asks the thread to stop by clearing the {@code running} flag; the run loop checks the flag
     * at the start of each iteration.
     */
    public void close() {
        running.set(false);
    }

    /**
     * Exposes the active tasks keyed by task id.
     *
     * @return a read-only view backed by the live task map
     */
    public Map<TaskId, StreamTask> tasks() {
        final Map<TaskId, StreamTask> readOnlyView = Collections.unmodifiableMap(activeTasks);
        return readOnlyView;
    }

    /**
     * Shuts down tasks and state without rethrowing, closes the three Kafka clients, and then
     * removes all tasks. A failure to close one client is logged and does not stop the others
     * from being closed.
     */
    private void shutdown() {
        log.info("{} Shutting down", logPrefix);
        shutdownTasksAndState(false);

        try {
            producer.close();
        } catch (Throwable producerFailure) {
            log.error("{} Failed to close producer: ", logPrefix, producerFailure);
        }

        try {
            consumer.close();
        } catch (Throwable consumerFailure) {
            log.error("{} Failed to close consumer: ", logPrefix, consumerFailure);
        }

        try {
            restoreConsumer.close();
        } catch (Throwable restoreFailure) {
            log.error("{} Failed to close restore consumer: ", logPrefix, restoreFailure);
        }

        removeStreamTasks();
        removeStandbyTasks();
        log.info("{} Stream thread shutdown complete", logPrefix);
    }

    /**
     * Runs the task shutdown sequence in a fixed order: commit offsets, close tasks, flush state,
     * flush the producer, close state managers, and finally clear the restore consumer's
     * assignment.
     *
     * @param z when {@code true}, failures are rethrown after being logged; when {@code false}
     *          they are only logged
     */
    public void shutdownTasksAndState(boolean z) {
        // The order of these five steps is significant; do not rearrange.
        commitOffsets(z);
        closeAllTasks();
        flushAllState(z);
        producer.flush();
        closeAllStateManagers(z);

        try {
            restoreConsumer.assign(Collections.emptyList());
        } catch (Exception unassignFailure) {
            log.error("{} Failed to un-assign change log partitions: ", logPrefix, unassignFailure);
            if (z) {
                throw unassignFailure;
            }
        }
    }

    /**
     * Applies the action to every active task and then every standby task.
     *
     * @param abstractTaskAction operation to run on each task
     * @param str                short description of the operation, used in the error log line
     * @param z                  when {@code true}, the first {@link KafkaException} is rethrown
     *                           after logging and the remaining tasks are skipped; when
     *                           {@code false}, failures are logged and iteration continues
     */
    private void performOnAllTasks(AbstractTaskAction abstractTaskAction, String str, boolean z) {
        // Iterate over a copy that holds the active tasks followed by the standby tasks.
        List<AbstractTask> allTasks = new ArrayList<AbstractTask>(this.activeTasks.values());
        allTasks.addAll(this.standbyTasks.values());
        for (AbstractTask task : allTasks) {
            try {
                abstractTaskAction.apply(task);
            } catch (KafkaException e) {
                // The trailing exception has no placeholder, so SLF4J logs it with its stack trace.
                log.error("{} Failed while executing {} {} due to {}: ", new Object[]{this.logPrefix, task.getClass().getSimpleName(), task.id(), str, e});
                if (z) {
                    throw e;
                }
            }
        }
    }

    private void closeAllStateManagers(boolean z) {
        performOnAllTasks(new AbstractTaskAction() { // from class: org.apache.kafka.streams.processor.internals.StreamThread.2
            AnonymousClass2() {
            }

            @Override // org.apache.kafka.streams.processor.internals.StreamThread.AbstractTaskAction
            public void apply(AbstractTask abstractTask) {
                StreamThread.log.info("{} Closing the state manager of task {}", StreamThread.this.logPrefix, abstractTask.id());
                abstractTask.closeStateManager();
            }
        }, "close state manager", z);
    }

    private void commitOffsets(boolean z) {
        performOnAllTasks(new AbstractTaskAction() { // from class: org.apache.kafka.streams.processor.internals.StreamThread.3
            AnonymousClass3() {
            }

            @Override // org.apache.kafka.streams.processor.internals.StreamThread.AbstractTaskAction
            public void apply(AbstractTask abstractTask) {
                StreamThread.log.info("{} Committing consumer offsets of task {}", StreamThread.this.logPrefix, abstractTask.id());
                abstractTask.commitOffsets();
            }
        }, "commit consumer offsets", z);
    }

    private void flushAllState(boolean z) {
        performOnAllTasks(new AbstractTaskAction() { // from class: org.apache.kafka.streams.processor.internals.StreamThread.4
            AnonymousClass4() {
            }

            @Override // org.apache.kafka.streams.processor.internals.StreamThread.AbstractTaskAction
            public void apply(AbstractTask abstractTask) {
                StreamThread.log.info("{} Flushing state stores of task {}", StreamThread.this.logPrefix, abstractTask.id());
                abstractTask.flushState();
            }
        }, "flush state", z);
    }

    /**
     * Measures the time elapsed since {@code timerStartedMs} and restarts the timer at the
     * current clock reading.
     *
     * @return elapsed milliseconds, never negative
     */
    private long computeLatency() {
        final long now = time.milliseconds();
        final long elapsed = now - timerStartedMs;
        timerStartedMs = now;
        // Clamp in case the clock reading went backwards.
        return elapsed < 0L ? 0L : elapsed;
    }

    /**
     * Main processing loop. Subscribes the consumer (by pattern when one is configured, otherwise
     * by topic list) and then, until the thread is asked to stop, alternates between polling for
     * records, letting every active task process/punctuate/commit, and the periodic
     * commit / standby-update / cleanup checks.
     *
     * @throws StreamsException if a rebalance callback recorded a failure during poll
     */
    private void runLoop() {
        // Sum of StreamTask.process() results from the last processing pass
        // (presumably the number of records still buffered - confirm against StreamTask).
        int totalBuffered = 0;
        boolean requiresPoll = true;
        boolean polledRecords = false;
        if (this.topicPattern != null) {
            this.consumer.subscribe(this.topicPattern, this.rebalanceListener);
        } else {
            this.consumer.subscribe(new ArrayList<String>(this.sourceTopics), this.rebalanceListener);
        }
        while (stillRunning()) {
            this.timerStartedMs = this.time.milliseconds();
            if (requiresPoll) {
                requiresPoll = false;
                // Wait for the configured poll time only when the last pass left nothing; otherwise poll without waiting.
                boolean longPoll = totalBuffered == 0;
                ConsumerRecords<byte[], byte[]> records = this.consumer.poll(longPoll ? this.pollTimeMs : 0L);
                // Failures recorded by the rebalance listener are surfaced here, after poll() returns.
                if (this.rebalanceException != null) {
                    throw new StreamsException(this.logPrefix + " Failed to rebalance", this.rebalanceException);
                }
                if (records.isEmpty()) {
                    polledRecords = false;
                } else {
                    for (TopicPartition partition : records.partitions()) {
                        this.activeTasksByPartition.get(partition).addRecords(partition, records.records(partition));
                    }
                    polledRecords = true;
                }
                // Only waiting polls are recorded on the poll-time sensor.
                if (longPoll) {
                    this.sensors.pollTimeSensor.record(computeLatency());
                }
            }
            if (totalBuffered > 0 || polledRecords) {
                totalBuffered = 0;
                if (this.activeTasks.isEmpty()) {
                    requiresPoll = true;
                } else {
                    for (StreamTask task : this.activeTasks.values()) {
                        totalBuffered += task.process();
                        requiresPoll = requiresPoll || task.requiresPoll();
                        this.sensors.processTimeSensor.record(computeLatency());
                        maybePunctuate(task);
                        if (task.commitNeeded()) {
                            commitOne(task);
                        }
                    }
                }
            } else {
                // Nothing to process: go back to polling on the next iteration.
                requiresPoll = true;
            }
            maybeCommit();
            maybeUpdateStandbyTasks();
            maybeClean();
        }
    }

    /**
     * Feeds change-log records to the standby tasks. Does nothing when there are no standby tasks.
     *
     * <p>When {@code processStandbyRecords} is set, records held back from an earlier pass are
     * offered to their task again; partitions whose task hands nothing back are resumed on the
     * restore consumer. Afterwards the restore consumer is polled without blocking, and any
     * records a task hands back are stored and the partition is paused.
     *
     * @throws StreamsException if records exist for a partition that has no standby task
     */
    private void maybeUpdateStandbyTasks() {
        if (this.standbyTasks.isEmpty()) {
            return;
        }
        if (this.processStandbyRecords) {
            if (!this.standbyRecords.isEmpty()) {
                Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> stillPending = new HashMap<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>();
                for (Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> entry : this.standbyRecords.entrySet()) {
                    TopicPartition partition = entry.getKey();
                    List<ConsumerRecord<byte[], byte[]>> held = entry.getValue();
                    if (held != null) {
                        StandbyTask heldTask = this.standbyTasksByPartition.get(partition);
                        // Same guard and message as the poll path below, instead of a bare NPE.
                        if (heldTask == null) {
                            throw new StreamsException(this.logPrefix + " Missing standby task for partition " + partition);
                        }
                        List<ConsumerRecord<byte[], byte[]>> leftover = heldTask.update(partition, held);
                        if (leftover != null) {
                            stillPending.put(partition, leftover);
                        } else {
                            this.restoreConsumer.resume(Collections.singleton(partition));
                        }
                    }
                }
                this.standbyRecords = stillPending;
            }
            this.processStandbyRecords = false;
        }
        ConsumerRecords<byte[], byte[]> records = this.restoreConsumer.poll(0L);
        if (records.isEmpty()) {
            return;
        }
        for (TopicPartition partition : records.partitions()) {
            StandbyTask task = this.standbyTasksByPartition.get(partition);
            if (task == null) {
                throw new StreamsException(this.logPrefix + " Missing standby task for partition " + partition);
            }
            List<ConsumerRecord<byte[], byte[]>> leftover = task.update(partition, records.records(partition));
            if (leftover != null) {
                // Stop fetching this partition until the held-back records have been applied.
                this.restoreConsumer.pause(Collections.singleton(partition));
                this.standbyRecords.put(partition, leftover);
            }
        }
    }

    /**
     * Reports whether the {@code running} flag is still set, logging a debug message when it is
     * not.
     *
     * @return {@code true} while the flag is set, {@code false} otherwise
     */
    private boolean stillRunning() {
        boolean active = this.running.get();
        if (!active) {
            log.debug("{} Shutting down at user request", this.logPrefix);
        }
        return active;
    }

    /**
     * Asks the given task to punctuate and, when it reports that it did, records the value of
     * {@code computeLatency()} on the punctuate-time sensor.
     *
     * @param streamTask the active task to punctuate
     * @throws KafkaException rethrown after being logged when punctuation fails
     */
    private void maybePunctuate(StreamTask streamTask) {
        try {
            boolean punctuated = streamTask.maybePunctuate();
            if (punctuated) {
                this.sensors.punctuateTimeSensor.record(computeLatency());
            }
        } catch (KafkaException failure) {
            // The throwable travels as the last array element, as in the other log calls here.
            Object[] logArgs = new Object[]{this.logPrefix, streamTask.id(), failure};
            log.error("{} Failed to punctuate active task {}: ", logArgs);
            throw failure;
        }
    }

    /**
     * Commits all tasks once more than {@code commitTimeMs} milliseconds have passed since
     * {@code lastCommitMs}.
     *
     * <p>A negative {@code commitTimeMs} turns this method into a no-op. After a commit,
     * {@code lastCommitMs} is set to the time sampled at the start of the call and
     * {@code processStandbyRecords} is set.
     */
    protected void maybeCommit() {
        long now = this.time.milliseconds();
        if (this.commitTimeMs < 0) {
            return;
        }
        // Compare the elapsed time instead of computing lastCommitMs + commitTimeMs: that sum
        // wraps around to a negative number for very large intervals (e.g. Long.MAX_VALUE), which
        // would make the check pass on every call. Assumes both timestamps are non-negative.
        long elapsedMs = now - this.lastCommitMs;
        if (elapsedMs <= this.commitTimeMs) {
            return;
        }
        log.info("{} Committing all tasks because the commit interval {}ms has elapsed", this.logPrefix, Long.valueOf(this.commitTimeMs));
        commitAll();
        this.lastCommitMs = now;
        this.processStandbyRecords = true;
    }

    /**
     * Asks the state directory to clean removed tasks once more than {@code cleanTimeMs}
     * milliseconds have passed since {@code lastCleanMs}, then sets {@code lastCleanMs} to the
     * current time.
     */
    protected void maybeClean() {
        long now = this.time.milliseconds();
        // Compare the elapsed time instead of computing lastCleanMs + cleanTimeMs: that sum wraps
        // around to a negative number when either operand is close to Long.MAX_VALUE, which would
        // trigger a cleanup on every call. Assumes the current time is non-negative.
        long elapsedMs = now - this.lastCleanMs;
        if (elapsedMs > this.cleanTimeMs) {
            this.stateDirectory.cleanRemovedTasks();
            this.lastCleanMs = now;
        }
    }

    /**
     * Commits every active task, then every standby task, one at a time via
     * {@code commitOne}.
     */
    private void commitAll() {
        for (StreamTask activeTask : this.activeTasks.values()) {
            commitOne(activeTask);
        }
        for (StandbyTask standbyTask : this.standbyTasks.values()) {
            commitOne(standbyTask);
        }
    }

    /**
     * Commits a single task and records the value of {@code computeLatency()} on the commit-time
     * sensor.
     *
     * <p>A {@link CommitFailedException} is logged as a warning and otherwise ignored; the
     * latency is still recorded in that case. Any other {@link KafkaException} is logged as an
     * error and rethrown.
     *
     * @param abstractTask the task to commit
     * @throws KafkaException if the commit fails for a reason other than a failed commit
     */
    private void commitOne(AbstractTask abstractTask) {
        log.info("{} Committing task {}", this.logPrefix, abstractTask.id());
        try {
            abstractTask.commit();
        } catch (CommitFailedException e) {
            // CommitFailedException is a KafkaException, so it has to be caught first; with the
            // clauses the other way round this branch can never run.
            log.warn("{} Failed to commit {} {} state: ", new Object[]{this.logPrefix, abstractTask.getClass().getSimpleName(), abstractTask.id(), e});
        } catch (KafkaException e) {
            log.error("{} Failed to commit {} {} state: ", new Object[]{this.logPrefix, abstractTask.getClass().getSimpleName(), abstractTask.id(), e});
            throw e;
        }
        this.sensors.commitTimeSensor.record(computeLatency());
    }

    /**
     * Returns a read-only view of the {@code prevTasks} set.
     *
     * @return an unmodifiable set backed by {@code prevTasks}
     */
    public Set<TaskId> prevTasks() {
        Set<TaskId> readOnlyView = Collections.unmodifiableSet(this.prevTasks);
        return readOnlyView;
    }

    /**
     * Collects the ids of all tasks that have a checkpoint file in the state directory.
     *
     * <p>Each entry returned by {@code stateDirectory.listTaskDirectories()} is considered: its
     * name is parsed as a {@link TaskId}, and the id is included when a file named
     * {@code ProcessorStateManager.CHECKPOINT_FILE_NAME} exists inside it. Entries whose names do
     * not parse as a task id are skipped.
     *
     * @return a new, mutable set of task ids; empty when no directories are listed
     */
    public Set<TaskId> cachedTasks() {
        Set<TaskId> tasksWithCheckpoint = new HashSet<>();
        File[] taskDirectories = this.stateDirectory.listTaskDirectories();
        if (taskDirectories == null) {
            return tasksWithCheckpoint;
        }
        for (File taskDirectory : taskDirectories) {
            try {
                TaskId taskId = TaskId.parse(taskDirectory.getName());
                if (new File(taskDirectory, ProcessorStateManager.CHECKPOINT_FILE_NAME).exists()) {
                    tasksWithCheckpoint.add(taskId);
                }
            } catch (TaskIdFormatException ignored) {
                // Deliberately ignored: an entry whose name is not a task id is simply not
                // reported as a cached task.
            }
        }
        return tasksWithCheckpoint;
    }

    /**
     * Builds a new active task for the given id and partitions.
     *
     * <p>Logs the creation, records it on the task-creation sensor, and constructs the task with
     * the topology built for the id's topic group.
     *
     * @param taskId     id of the task to create
     * @param collection partitions assigned to the task
     * @return the newly constructed task
     */
    protected StreamTask createStreamTask(TaskId taskId, Collection<TopicPartition> collection) {
        Object[] logArgs = new Object[]{this.logPrefix, taskId, collection};
        log.info("{} Creating active task {} with assigned partitions [{}]", logArgs);
        this.sensors.taskCreationSensor.record();
        ProcessorTopology topology = this.builder.build(Integer.valueOf(taskId.topicGroupId));
        return new StreamTask(
                taskId,
                this.applicationId,
                collection,
                topology,
                this.consumer,
                this.producer,
                this.restoreConsumer,
                this.config,
                this.sensors,
                this.stateDirectory,
                this.cache);
    }

    /**
     * Creates and registers an active task for every task id that owns one of the given
     * partitions.
     *
     * <p>The partitions are first grouped by the task ids the partition assignor reports for
     * them. Each group then becomes one task, which is stored in {@code activeTasks} under its id
     * and in {@code activeTasksByPartition} under each of its partitions.
     *
     * @param collection the partitions to create tasks for
     * @throws IllegalStateException if the partition assignor has not been set
     * @throws StreamsException      rethrown after being logged when a task cannot be created
     */
    public void addStreamTasks(Collection<TopicPartition> collection) {
        if (this.partitionAssignor == null) {
            throw new IllegalStateException(this.logPrefix + " Partition assignor has not been initialized while adding stream tasks: this should not happen.");
        }
        Map<TaskId, Set<TopicPartition>> partitionsByTask = new HashMap<>();
        for (TopicPartition partition : collection) {
            for (TaskId taskId : this.partitionAssignor.tasksForPartition(partition)) {
                Set<TopicPartition> taskPartitions = partitionsByTask.get(taskId);
                if (taskPartitions == null) {
                    taskPartitions = new HashSet<>();
                    partitionsByTask.put(taskId, taskPartitions);
                }
                taskPartitions.add(partition);
            }
        }
        for (Map.Entry<TaskId, Set<TopicPartition>> assignment : partitionsByTask.entrySet()) {
            TaskId taskId = assignment.getKey();
            Set<TopicPartition> taskPartitions = assignment.getValue();
            try {
                StreamTask task = createStreamTask(taskId, taskPartitions);
                this.activeTasks.put(taskId, task);
                for (TopicPartition partition : taskPartitions) {
                    this.activeTasksByPartition.put(partition, task);
                }
            } catch (StreamsException e) {
                log.error("{} Failed to create an active task {}: ", new Object[]{this.logPrefix, taskId, e});
                throw e;
            }
        }
    }

    /**
     * Builds a new standby task for the given id and partitions.
     *
     * <p>Logs the creation and records it on the task-creation sensor before the topology is
     * inspected, so both happen even when no task is returned.
     *
     * @param taskId     id of the task to create
     * @param collection partitions assigned to the task
     * @return the new standby task, or {@code null} when the topology built for the id's topic
     *         group has no state stores
     */
    private StandbyTask createStandbyTask(TaskId taskId, Collection<TopicPartition> collection) {
        Object[] logArgs = new Object[]{this.logPrefix, taskId, collection};
        log.info("{} Creating new standby task {} with assigned partitions [{}]", logArgs);
        this.sensors.taskCreationSensor.record();
        ProcessorTopology topology = this.builder.build(Integer.valueOf(taskId.topicGroupId));
        boolean hasStateStores = !topology.stateStores().isEmpty();
        if (hasStateStores) {
            return new StandbyTask(
                    taskId,
                    this.applicationId,
                    collection,
                    topology,
                    this.consumer,
                    this.restoreConsumer,
                    this.config,
                    this.sensors,
                    this.stateDirectory);
        }
        return null;
    }

    /**
     * Creates and registers the standby tasks reported by the partition assignor and positions
     * the restore consumer on their checkpointed offsets.
     *
     * <p>Every task that {@code createStandbyTask} actually returns is stored in
     * {@code standbyTasks} and indexed in {@code standbyTasksByPartition} under both its assigned
     * partitions and the partitions of its checkpointed offsets. The restore consumer is then
     * assigned all checkpointed partitions; each one is sought to its offset when that offset is
     * non-negative and to the beginning otherwise.
     *
     * @throws IllegalStateException if the partition assignor has not been set
     */
    public void addStandbyTasks() {
        if (this.partitionAssignor == null) {
            throw new IllegalStateException(this.logPrefix + " Partition assignor has not been initialized while adding standby tasks: this should not happen.");
        }
        Map<TopicPartition, Long> checkpointedOffsets = new HashMap<>();
        for (Map.Entry<TaskId, Set<TopicPartition>> assignment : this.partitionAssignor.standbyTasks().entrySet()) {
            TaskId taskId = assignment.getKey();
            Set<TopicPartition> taskPartitions = assignment.getValue();
            StandbyTask task = createStandbyTask(taskId, taskPartitions);
            if (task == null) {
                // No task was created for this id (createStandbyTask returned null).
                continue;
            }
            this.standbyTasks.put(taskId, task);
            for (TopicPartition partition : taskPartitions) {
                this.standbyTasksByPartition.put(partition, task);
            }
            // NOTE(review): read once instead of twice; assumes checkpointedOffsets() is a plain
            // accessor that returns the same mapping on every call - confirm in StandbyTask.
            Map<TopicPartition, Long> taskOffsets = task.checkpointedOffsets();
            for (TopicPartition partition : taskOffsets.keySet()) {
                this.standbyTasksByPartition.put(partition, task);
            }
            checkpointedOffsets.putAll(taskOffsets);
        }
        List<TopicPartition> restorePartitions = new ArrayList<TopicPartition>(checkpointedOffsets.keySet());
        this.restoreConsumer.assign(restorePartitions);
        for (Map.Entry<TopicPartition, Long> checkpoint : checkpointedOffsets.entrySet()) {
            TopicPartition partition = checkpoint.getKey();
            long offset = checkpoint.getValue().longValue();
            if (offset >= 0) {
                this.restoreConsumer.seek(partition, offset);
            } else {
                this.restoreConsumer.seekToBeginning(Collections.singleton(partition));
            }
        }
    }

    /**
     * Forgets all active tasks, remembering their ids in {@code prevTasks}.
     *
     * <p>{@code prevTasks} is replaced with the ids of the current active tasks before
     * {@code activeTasks} and {@code activeTasksByPartition} are cleared. Any exception raised
     * while doing so is logged and not propagated.
     */
    public void removeStreamTasks() {
        // keySet() is a live view, so it must be copied into prevTasks before activeTasks is cleared.
        Set<TaskId> activeIds = this.activeTasks.keySet();
        log.info("{} Removing all active tasks [{}]", this.logPrefix, activeIds);
        try {
            this.prevTasks.clear();
            this.prevTasks.addAll(this.activeTasks.keySet());
            this.activeTasks.clear();
            this.activeTasksByPartition.clear();
        } catch (Exception failure) {
            log.error("{} Failed to remove stream tasks: ", this.logPrefix, failure);
        }
    }

    /**
     * Forgets all standby tasks: clears {@code standbyTasks}, {@code standbyTasksByPartition}
     * and the buffered {@code standbyRecords}, after logging the ids being removed.
     */
    public void removeStandbyTasks() {
        Set<TaskId> standbyIds = this.standbyTasks.keySet();
        log.info("{} Removing all standby tasks [{}]", this.logPrefix, standbyIds);
        this.standbyTasks.clear();
        this.standbyTasksByPartition.clear();
        this.standbyRecords.clear();
    }

    private void closeAllTasks() {
        performOnAllTasks(new AbstractTaskAction() { // from class: org.apache.kafka.streams.processor.internals.StreamThread.5
            AnonymousClass5() {
            }

            @Override // org.apache.kafka.streams.processor.internals.StreamThread.AbstractTaskAction
            public void apply(AbstractTask abstractTask) {
                StreamThread.log.info("{} Closing a task {}", StreamThread.this.logPrefix, abstractTask.id());
                abstractTask.close();
                StreamThread.this.sensors.taskDestructionSensor.record();
            }
        }, "close", false);
    }

    /**
     * Produces a multi-line description of this thread: application id, client id and thread
     * name, followed by the string form of every active task and every standby task.
     *
     * @return the description; the task sections are omitted when the corresponding map is
     *         {@code null}
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append("StreamsThread appId:").append(this.applicationId).append("\n");
        text.append("\tStreamsThread clientId:").append(this.clientId).append("\n");
        text.append("\tStreamsThread threadId:").append(getName()).append("\n");
        if (this.activeTasks != null) {
            text.append("\tActive tasks:\n");
            for (StreamTask activeTask : this.activeTasks.values()) {
                text.append("\t\t").append(activeTask.toString());
            }
        }
        if (this.standbyTasks != null) {
            text.append("\tStandby tasks:\n");
            for (StandbyTask standbyTask : this.standbyTasks.values()) {
                text.append("\t\t").append(standbyTask.toString());
            }
            text.append("\n");
        }
        return text.toString();
    }

    /*
     * Compiler-generated accessor that stores a new value into the lastCleanMs field of the given
     * thread on behalf of code outside this class body.
     *
     * The decompiler failed to decode this method (it could not handle the stack-shuffling
     * instruction at offset 0x0002) and emitted a stub that always threw
     * UnsupportedOperationException. The partially recovered pseudo-code showed a single store of
     * the long argument into lastCleanMs, which is reproduced here.
     */
    static /* synthetic */ long access$302(org.apache.kafka.streams.processor.internals.StreamThread r6, long r7) {
        r6.lastCleanMs = r7;
        // NOTE(review): the return operand was garbled in the decompiler output; returning the
        // stored value matches the usual shape of assignment accessors - confirm against bytecode.
        return r7;
    }

    // Empty static initializer: it runs no statements when the class is initialized.
    // NOTE(review): presumably left behind by the decompiler - confirm before removing.
    static {
    }
}
