package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.channels.FileLock;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskIdFormatException;
import org.apache.kafka.streams.processor.PartitionGrouper;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*  JADX ERROR: NullPointerException in pass: ClassModifier
    java.lang.NullPointerException: Cannot invoke "java.util.List.forEach(java.util.function.Consumer)" because "blocks" is null
    	at jadx.core.utils.BlockUtils.collectAllInsns(BlockUtils.java:1017)
    	at jadx.core.dex.visitors.ClassModifier.removeBridgeMethod(ClassModifier.java:239)
    	at jadx.core.dex.visitors.ClassModifier.removeSyntheticMethods(ClassModifier.java:154)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.ClassModifier.visit(ClassModifier.java:64)
    */
/* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamThread.class */
public class StreamThread extends Thread {
    private static final Logger log = LoggerFactory.getLogger(StreamThread.class);
    // Supplies the numeric suffix of each thread name ("StreamThread-N", see the constructor); starts at 1.
    private static final AtomicInteger STREAM_THREAD_ID_SEQUENCE = new AtomicInteger(1);
    // Instantiated from StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG in the constructor.
    public final PartitionGrouper partitionGrouper;
    // Also used as the name of this thread's sub-directory under the configured state directory.
    public final String jobId;
    // Prefix of the client ids handed to the producer/consumers and of the "client-id" metric tag.
    public final String clientId;
    public final UUID processId;
    protected final StreamsConfig config;
    protected final TopologyBuilder builder;
    // Topics the main consumer subscribes to in runLoop(); taken from builder.sourceTopics().
    protected final Set<String> sourceTopics;
    protected final Producer<byte[], byte[]> producer;
    // Main consumer: polled in runLoop() to feed the active tasks.
    protected final Consumer<byte[], byte[]> consumer;
    // Second consumer: polled in maybeUpdateStandbyTasks() to feed the standby tasks.
    protected final Consumer<byte[], byte[]> restoreConsumer;
    // Loop flag: true until close() is called; checked by stillRunning().
    private final AtomicBoolean running;
    private final Map<TaskId, StreamTask> activeTasks;
    private final Map<TaskId, StandbyTask> standbyTasks;
    private final Map<TopicPartition, StreamTask> activeTasksByPartition;
    private final Map<TopicPartition, StandbyTask> standbyTasksByPartition;
    // Ids of the tasks that were active when removeStreamTasks() last ran.
    private final Set<TaskId> prevTasks;
    private final Time time;
    // Result of makeStateDir(jobId, <configured state dir>); scanned by maybeClean() and cachedTasks().
    private final File stateDir;
    // Values of POLL_MS_CONFIG, STATE_CLEANUP_DELAY_MS_CONFIG and COMMIT_INTERVAL_MS_CONFIG respectively.
    private final long pollTimeMs;
    private final long cleanTimeMs;
    private final long commitTimeMs;
    private final StreamsMetricsImpl sensors;
    // Null until partitionAssignor(...) is called; addStreamTasks()/addStandbyTasks() fail without it.
    private StreamPartitionAssignor partitionAssignor;
    // Time (ms) of the last state-directory cleanup pass; initialised to Long.MAX_VALUE.
    private long lastClean;
    // Time (ms) of the last interval-driven commitAll() issued by maybeCommit().
    private long lastCommit;
    private long recordsProcessed;
    // Failure captured by the rebalance listener; runLoop() rethrows it wrapped in a StreamsException.
    private Throwable rebalanceException;
    // Per-partition record lists returned (non-null) by StandbyTask.update(); retried in maybeUpdateStandbyTasks().
    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords;
    // Set by maybeCommit() after an interval commit; tells maybeUpdateStandbyTasks() to retry standbyRecords.
    private boolean processStandbyRecords;
    // Passed to consumer.subscribe(...) in runLoop().
    final ConsumerRebalanceListener rebalanceListener;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.kafka.streams.processor.internals.StreamThread$1 */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamThread$1.class */
    public class AnonymousClass1 implements ConsumerRebalanceListener {
        AnonymousClass1() {
        }

        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
            try {
                StreamThread.this.addStreamTasks(collection);
                StreamThread.this.addStandbyTasks();
                StreamThread.access$202(StreamThread.this, StreamThread.this.time.milliseconds());
            } catch (Throwable th) {
                StreamThread.this.rebalanceException = th;
                throw th;
            }
        }

        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            try {
                try {
                    StreamThread.this.commitAll();
                    StreamThread.access$202(StreamThread.this, Long.MAX_VALUE);
                    StreamThread.this.removeStreamTasks();
                    StreamThread.this.removeStandbyTasks();
                } finally {
                }
            } catch (Throwable th) {
                StreamThread.this.removeStreamTasks();
                StreamThread.this.removeStandbyTasks();
                throw th;
            }
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamThread$StreamsMetricsImpl.class */
    /**
     * Thread-level metrics: registers fixed sensors for commit, poll, process and punctuate timing
     * and for task creation/destruction, and lets callers register additional latency sensors.
     */
    public class StreamsMetricsImpl implements StreamsMetrics {
        final Metrics metrics;
        final String metricGrpName = "stream-metrics";
        // Tags attached to every thread-level metric; holds the single "client-id" entry.
        final Map<String, String> metricTags = new LinkedHashMap<String, String>();
        final Sensor commitTimeSensor;
        final Sensor pollTimeSensor;
        final Sensor processTimeSensor;
        final Sensor punctuateTimeSensor;
        final Sensor taskCreationSensor;
        final Sensor taskDestructionSensor;

        /**
         * Registers all thread-level sensors and their metrics on the given registry.
         *
         * @param metrics the registry that owns the sensors
         */
        public StreamsMetricsImpl(Metrics metrics) {
            this.metrics = metrics;
            this.metricTags.put("client-id", StreamThread.this.clientId + "-" + StreamThread.this.getName());

            Sensor commit = metrics.sensor("commit-time");
            commit.add(threadMetric("commit-time-avg", "The average commit time in ms"), new Avg());
            commit.add(threadMetric("commit-time-max", "The maximum commit time in ms"), new Max());
            commit.add(threadMetric("commit-calls-rate", "The average per-second number of commit calls"), new Rate(new Count()));
            this.commitTimeSensor = commit;

            Sensor poll = metrics.sensor("poll-time");
            poll.add(threadMetric("poll-time-avg", "The average poll time in ms"), new Avg());
            poll.add(threadMetric("poll-time-max", "The maximum poll time in ms"), new Max());
            poll.add(threadMetric("poll-calls-rate", "The average per-second number of record-poll calls"), new Rate(new Count()));
            this.pollTimeSensor = poll;

            Sensor process = metrics.sensor("process-time");
            process.add(threadMetric("process-time-avg-ms", "The average process time in ms"), new Avg());
            process.add(threadMetric("process-time-max-ms", "The maximum process time in ms"), new Max());
            process.add(threadMetric("process-calls-rate", "The average per-second number of process calls"), new Rate(new Count()));
            this.processTimeSensor = process;

            Sensor punctuate = metrics.sensor("punctuate-time");
            punctuate.add(threadMetric("punctuate-time-avg", "The average punctuate time in ms"), new Avg());
            punctuate.add(threadMetric("punctuate-time-max", "The maximum punctuate time in ms"), new Max());
            punctuate.add(threadMetric("punctuate-calls-rate", "The average per-second number of punctuate calls"), new Rate(new Count()));
            this.punctuateTimeSensor = punctuate;

            Sensor creation = metrics.sensor("task-creation");
            creation.add(threadMetric("task-creation-rate", "The average per-second number of newly created tasks"), new Rate(new Count()));
            this.taskCreationSensor = creation;

            Sensor destruction = metrics.sensor("task-destruction");
            destruction.add(threadMetric("task-destruction-rate", "The average per-second number of destructed tasks"), new Rate(new Count()));
            this.taskDestructionSensor = destruction;
        }

        /** Builds a metric name in the thread-level group carrying the thread-level tags. */
        private MetricName threadMetric(String metric, String description) {
            return this.metrics.metricName(metric, this.metricGrpName, description, this.metricTags);
        }

        /**
         * Records {@code (j2 - j) / 1000000} on the sensor, passing {@code j2} as the record time.
         *
         * @param sensor the sensor to record on
         * @param j      start timestamp
         * @param j2     end timestamp
         */
        @Override // org.apache.kafka.streams.StreamsMetrics
        public void recordLatency(Sensor sensor, long j, long j2) {
            // NOTE(review): the divisor suggests j/j2 are nanoseconds; if so, handing j2 to record() as its
            // time argument may be a unit mismatch -- confirm against Sensor.record's contract.
            long scaledElapsed = (j2 - j) / 1000000;
            sensor.record(scaledElapsed, j2);
        }

        /**
         * Registers a parent sensor named {@code str-str3} (metrics prefixed "all") and a child
         * sensor named {@code str-str2-str3}, both in group {@code "stream-" + str + "-metrics"}.
         *
         * @param str    scope name, used in the group and sensor names
         * @param str2   entity name, used in the child sensor and metric names
         * @param str3   operation name
         * @param strArr extra tags for the child metrics as alternating key/value strings
         * @return the child sensor
         * @throws IllegalArgumentException if {@code strArr} has an odd number of elements
         */
        @Override // org.apache.kafka.streams.StreamsMetrics
        public Sensor addLatencySensor(String str, String str2, String str3, String... strArr) {
            if (strArr.length % 2 != 0) {
                throw new IllegalArgumentException("Tags needs to be specified in key-value pairs");
            }
            Map<String, String> childTags = new HashMap<String, String>(this.metricTags);
            for (int k = 0; k + 1 < strArr.length; k += 2) {
                childTags.put(strArr[k], strArr[k + 1]);
            }

            String group = "stream-" + str + "-metrics";

            // The parent aggregates across entities and carries only the thread-level tags.
            Sensor parent = this.metrics.sensor(str + "-" + str3);
            addLatencyMetrics(group, parent, "all", str3, this.metricTags);

            Sensor child = this.metrics.sensor(str + "-" + str2 + "-" + str3, new Sensor[]{parent});
            addLatencyMetrics(group, child, str2, str3, childTags);
            return child;
        }

        /** Adds avg-latency, max-latency and qps metrics for one entity/operation pair to the sensor. */
        private void addLatencyMetrics(String group, Sensor sensor, String entity, String operation, Map<String, String> tags) {
            String prefix = entity + "-" + operation;
            String subject = entity + " " + operation + " operation";
            maybeAddMetric(sensor, this.metrics.metricName(prefix + "-avg-latency-ms", group, "The average latency in milliseconds of " + subject + ".", tags), new Avg());
            maybeAddMetric(sensor, this.metrics.metricName(prefix + "-max-latency-ms", group, "The max latency in milliseconds of " + subject + ".", tags), new Max());
            maybeAddMetric(sensor, this.metrics.metricName(prefix + "-qps", group, "The average number of occurrence of " + subject + " per second.", tags), new Rate(new Count()));
        }

        /** Adds the stat to the sensor unless the registry already holds a metric with that name. */
        private void maybeAddMetric(Sensor sensor, MetricName metricName, MeasurableStat stat) {
            if (!this.metrics.metrics().containsKey(metricName)) {
                sensor.add(metricName, stat);
            }
        }
    }

    /**
     * Returns the directory {@code str2/str}, creating it and any missing ancestor directories.
     *
     * <p>Creation is best-effort: if the directories cannot be created, the returned {@code File}
     * simply refers to a path that does not exist.
     *
     * @param str  name of the sub-directory (the job id)
     * @param str2 path of the base state directory
     * @return the {@code File} for {@code str2/str}
     */
    public static File makeStateDir(String str, String str2) {
        File stateDir = new File(new File(str2), str);
        // mkdirs() rather than mkdir(): the latter fails silently when the base path has
        // ancestors that do not exist yet, leaving the caller with an unusable directory.
        if (!stateDir.exists()) {
            stateDir.mkdirs();
        }
        return stateDir;
    }

    /**
     * Creates a stream thread that builds its own producer, consumer and restore consumer.
     * Delegates to the package-private constructor, passing {@code null} for all three clients.
     *
     * @param topologyBuilder builder the task topologies are created from
     * @param streamsConfig   streams configuration
     * @param str             job id
     * @param str2            client id
     * @param uuid            process id
     * @param metrics         metrics registry for this thread's sensors
     * @param time            clock used for all timing decisions
     */
    public StreamThread(TopologyBuilder topologyBuilder, StreamsConfig streamsConfig, String str, String str2, UUID uuid, Metrics metrics, Time time) {
        this(topologyBuilder, streamsConfig, null, null, null, str, str2, uuid, metrics, time);
    }

    /**
     * Creates a stream thread, optionally with externally supplied clients.
     *
     * @param topologyBuilder builder the task topologies are created from
     * @param streamsConfig   streams configuration
     * @param producer        producer to use, or {@code null} to create one from the configuration
     * @param consumer        main consumer to use, or {@code null} to create one
     * @param consumer2       restore consumer to use, or {@code null} to create one
     * @param str             job id
     * @param str2            client id
     * @param uuid            process id
     * @param metrics         metrics registry for this thread's sensors
     * @param time            clock used for all timing decisions
     */
    StreamThread(TopologyBuilder topologyBuilder, StreamsConfig streamsConfig, Producer<byte[], byte[]> producer, Consumer<byte[], byte[]> consumer, Consumer<byte[], byte[]> consumer2, String str, String str2, UUID uuid, Metrics metrics, Time time) {
        super("StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
        this.partitionAssignor = null;
        this.rebalanceException = null;
        this.processStandbyRecords = false;
        this.rebalanceListener = new ConsumerRebalanceListener() { // from class: org.apache.kafka.streams.processor.internals.StreamThread.1
            /** Creates active then standby tasks for the new assignment and stamps lastClean with the current time. */
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                try {
                    StreamThread.this.addStreamTasks(collection);
                    StreamThread.this.addStandbyTasks();
                    // NOTE(review): the decompiled code invoked the synthetic accessor access$202 here, which
                    // is not declared in source form. It is taken to be the setter of lastClean (the only
                    // long field initialised to Long.MAX_VALUE) -- confirm against the original bytecode.
                    StreamThread.this.lastClean = StreamThread.this.time.milliseconds();
                } catch (Throwable th) {
                    // Remember the failure so the run loop can surface it after poll() returns.
                    StreamThread.this.rebalanceException = th;
                    throw th;
                }
            }

            /** Commits all tasks, resets lastClean to Long.MAX_VALUE, and always removes every task. */
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                try {
                    StreamThread.this.commitAll();
                    // NOTE(review): same access$202 assumption as in onPartitionsAssigned.
                    StreamThread.this.lastClean = Long.MAX_VALUE;
                } finally {
                    // Single cleanup path replaces the duplicated calls in the try body and the catch block.
                    StreamThread.this.removeStreamTasks();
                    StreamThread.this.removeStandbyTasks();
                }
            }
        };
        this.jobId = str;
        this.config = streamsConfig;
        this.builder = topologyBuilder;
        this.sourceTopics = topologyBuilder.sourceTopics();
        this.clientId = str2;
        this.processId = uuid;
        this.partitionGrouper = (PartitionGrouper) streamsConfig.getConfiguredInstance(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class);
        // The create*() helpers read config, jobId and clientId, so they must run after those assignments.
        this.producer = producer != null ? producer : createProducer();
        this.consumer = consumer != null ? consumer : createConsumer();
        this.restoreConsumer = consumer2 != null ? consumer2 : createRestoreConsumer();
        this.activeTasks = new HashMap<TaskId, StreamTask>();
        this.standbyTasks = new HashMap<TaskId, StandbyTask>();
        this.activeTasksByPartition = new HashMap<TopicPartition, StreamTask>();
        this.standbyTasksByPartition = new HashMap<TopicPartition, StandbyTask>();
        this.prevTasks = new HashSet<TaskId>();
        this.standbyRecords = new HashMap<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>();
        this.stateDir = makeStateDir(this.jobId, this.config.getString(StreamsConfig.STATE_DIR_CONFIG));
        this.pollTimeMs = streamsConfig.getLong(StreamsConfig.POLL_MS_CONFIG).longValue();
        this.commitTimeMs = streamsConfig.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG).longValue();
        this.cleanTimeMs = streamsConfig.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG).longValue();
        this.lastClean = Long.MAX_VALUE;
        this.lastCommit = time.milliseconds();
        this.recordsProcessed = 0L;
        this.time = time;
        this.sensors = new StreamsMetricsImpl(metrics);
        this.running = new AtomicBoolean(true);
    }

    /**
     * Sets the partition assignor consulted by {@code addStreamTasks} and {@code addStandbyTasks};
     * both throw {@code IllegalStateException} while it is still {@code null}.
     *
     * @param streamPartitionAssignor the assignor to use
     */
    public void partitionAssignor(StreamPartitionAssignor streamPartitionAssignor) {
        this.partitionAssignor = streamPartitionAssignor;
    }

    /**
     * Builds a byte-array producer from the streams configuration, using
     * {@code clientId + "-" + <thread name>} as its client id.
     *
     * @return a new producer
     */
    private Producer<byte[], byte[]> createProducer() {
        final String threadName = getName();
        log.info("Creating producer client for stream thread [" + threadName + "]");
        final String producerClientId = this.clientId + "-" + threadName;
        return new KafkaProducer<byte[], byte[]>(
                this.config.getProducerConfigs(producerClientId),
                new ByteArraySerializer(),
                new ByteArraySerializer());
    }

    /**
     * Builds the main byte-array consumer from the streams configuration. This thread, the job id
     * and {@code clientId + "-" + <thread name>} are passed to the configuration lookup.
     *
     * @return a new consumer
     */
    private Consumer<byte[], byte[]> createConsumer() {
        final String threadName = getName();
        log.info("Creating consumer client for stream thread [" + threadName + "]");
        final String consumerClientId = this.clientId + "-" + threadName;
        return new KafkaConsumer<byte[], byte[]>(
                this.config.getConsumerConfigs(this, this.jobId, consumerClientId),
                new ByteArrayDeserializer(),
                new ByteArrayDeserializer());
    }

    /**
     * Builds the restore consumer from the streams configuration, using
     * {@code clientId + "-" + <thread name>} as its client id.
     *
     * @return a new restore consumer
     */
    private Consumer<byte[], byte[]> createRestoreConsumer() {
        final String threadName = getName();
        log.info("Creating restore consumer client for stream thread [" + threadName + "]");
        final String restoreClientId = this.clientId + "-" + threadName;
        return new KafkaConsumer<byte[], byte[]>(
                this.config.getRestoreConsumerConfigs(restoreClientId),
                new ByteArrayDeserializer(),
                new ByteArrayDeserializer());
    }

    /**
     * Runs the processing loop until {@link #close()} is called or an exception escapes, then
     * shuts the thread down exactly once.
     *
     * <p>A {@code KafkaException} is rethrown without being logged here; any other exception is
     * logged as an application error and rethrown.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        log.info("Starting stream thread [" + getName() + "]");
        try {
            runLoop();
        } catch (KafkaException e) {
            // Sibling catch, not a nested try: the rethrown KafkaException must not fall into the
            // generic handler below and be reported as an application error.
            throw e;
        } catch (Exception e) {
            log.error("Streams application error during processing in thread [" + getName() + "]: ", e);
            throw e;
        } finally {
            // One shutdown for every exit path; a failing shutdown is no longer retried or mis-logged.
            shutdown();
        }
    }

    /**
     * Requests shutdown by clearing the running flag; the run loop stops the next time it
     * evaluates {@code stillRunning()}. Does not wait for the thread to finish.
     */
    public void close() {
        this.running.set(false);
    }

    /**
     * Returns a read-only view of the active tasks keyed by task id. It is a view, not a copy:
     * later changes to the thread's active tasks show through.
     *
     * @return unmodifiable view of the active task map
     */
    public Map<TaskId, StreamTask> tasks() {
        return Collections.unmodifiableMap(this.activeTasks);
    }

    /**
     * Commits and releases everything this thread owns. Every step is attempted even if an
     * earlier one fails: commit, remove standby tasks, close the producer, the consumer and the
     * restore consumer, then remove the active tasks.
     */
    private void shutdown() {
        log.info("Shutting down stream thread [" + getName() + "]");
        try {
            commitAll();
        } catch (Throwable th) {
            // commitOne() logs a KafkaException before rethrowing it, so only report other failures
            // here; previously those were dropped without a trace.
            if (!(th instanceof KafkaException)) {
                log.error("Failed to commit tasks while shutting down thread [" + getName() + "]: ", th);
            }
        }
        removeStandbyTasks();
        try {
            this.producer.close();
        } catch (Throwable th) {
            log.error("Failed to close producer in thread [" + getName() + "]: ", th);
        }
        try {
            this.consumer.close();
        } catch (Throwable th) {
            log.error("Failed to close consumer in thread [" + getName() + "]: ", th);
        }
        try {
            this.restoreConsumer.close();
        } catch (Throwable th) {
            log.error("Failed to close restore consumer in thread [" + getName() + "]: ", th);
        }
        removeStreamTasks();
        log.info("Stream thread shutdown complete [" + getName() + "]");
    }

    /**
     * Main loop: subscribes to the source topics, then repeatedly polls (when required), hands
     * records to the owning tasks, processes and punctuates the tasks, and runs the periodic
     * commit, standby-update and cleanup steps until {@code stillRunning()} returns false.
     *
     * @throws StreamsException if the rebalance listener recorded a failure during poll
     */
    private void runLoop() {
        int totalNumBuffered = 0;
        long lastPollMs = 0;
        boolean requiresPoll = true;

        this.consumer.subscribe(new ArrayList<String>(this.sourceTopics), this.rebalanceListener);

        while (stillRunning()) {
            if (requiresPoll) {
                requiresPoll = false;

                final long pollStartMs = this.time.milliseconds();
                // Wait for data only when the previous pass processed nothing; otherwise do not block.
                final long timeoutMs = totalNumBuffered == 0 ? this.pollTimeMs : 0L;
                ConsumerRecords<byte[], byte[]> fetched = this.consumer.poll(timeoutMs);
                lastPollMs = this.time.milliseconds();

                // The rebalance listener runs inside poll(); surface any failure it captured.
                if (this.rebalanceException != null) {
                    throw new StreamsException("Failed to rebalance", this.rebalanceException);
                }

                if (!fetched.isEmpty()) {
                    for (TopicPartition partition : fetched.partitions()) {
                        StreamTask owner = this.activeTasksByPartition.get(partition);
                        owner.addRecords(partition, fetched.records(partition));
                    }
                }
                this.sensors.pollTimeSensor.record(this.time.milliseconds() - pollStartMs);
            }

            totalNumBuffered = 0;

            if (this.activeTasks.isEmpty()) {
                // Nothing to process, so go straight back to polling.
                requiresPoll = true;
            } else {
                for (StreamTask task : this.activeTasks.values()) {
                    final long processStartMs = this.time.milliseconds();
                    totalNumBuffered += task.process();
                    requiresPoll = requiresPoll || task.requiresPoll();
                    this.sensors.processTimeSensor.record(this.time.milliseconds() - processStartMs);
                }

                maybePunctuate();

                // Force a poll once more than pollTimeMs has passed since the last one.
                if (lastPollMs + this.pollTimeMs < this.time.milliseconds()) {
                    requiresPoll = true;
                }
            }

            maybeCommit();
            maybeUpdateStandbyTasks();
            maybeClean();
        }
    }

    /**
     * Feeds the standby tasks. If {@code processStandbyRecords} is set, first re-offers the
     * records held in {@code standbyRecords}, resuming a partition once its task returns
     * {@code null} from {@code update}. Then polls the restore consumer without blocking and
     * offers the new records; a partition is paused, and the returned records held back,
     * whenever {@code update} returns a non-null list.
     *
     * @throws StreamsException if polled records belong to a partition with no standby task
     */
    private void maybeUpdateStandbyTasks() {
        if (this.standbyTasks.isEmpty()) {
            return;
        }

        if (this.processStandbyRecords) {
            if (!this.standbyRecords.isEmpty()) {
                Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> stillPending =
                        new HashMap<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>();
                for (Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> held : this.standbyRecords.entrySet()) {
                    TopicPartition partition = held.getKey();
                    List<ConsumerRecord<byte[], byte[]>> heldRecords = held.getValue();
                    if (heldRecords == null) {
                        continue;
                    }
                    StandbyTask owner = this.standbyTasksByPartition.get(partition);
                    List<ConsumerRecord<byte[], byte[]>> leftover = owner.update(partition, heldRecords);
                    if (leftover == null) {
                        this.restoreConsumer.resume(new TopicPartition[]{partition});
                    } else {
                        stillPending.put(partition, leftover);
                    }
                }
                this.standbyRecords = stillPending;
            }
            this.processStandbyRecords = false;
        }

        ConsumerRecords<byte[], byte[]> fetched = this.restoreConsumer.poll(0L);
        if (fetched.isEmpty()) {
            return;
        }

        for (TopicPartition partition : fetched.partitions()) {
            StandbyTask owner = this.standbyTasksByPartition.get(partition);
            if (owner == null) {
                log.error("missing standby task for partition {}", partition);
                throw new StreamsException("missing standby task for partition " + partition);
            }
            List<ConsumerRecord<byte[], byte[]>> leftover = owner.update(partition, fetched.records(partition));
            if (leftover != null) {
                this.restoreConsumer.pause(new TopicPartition[]{partition});
                this.standbyRecords.put(partition, leftover);
            }
        }
    }

    /**
     * Reports whether the run loop should continue, logging at debug level when it should not.
     *
     * @return {@code true} while the running flag is set
     */
    private boolean stillRunning() {
        final boolean keepGoing = this.running.get();
        if (!keepGoing) {
            log.debug("Shutting down at user request.");
        }
        return keepGoing;
    }

    /**
     * Gives every active task the chance to punctuate at the current time and, for each task
     * that did, records how long the call took on the punctuate sensor.
     *
     * @throws KafkaException rethrown after logging if a task fails to punctuate
     */
    private void maybePunctuate() {
        for (StreamTask streamTask : this.activeTasks.values()) {
            try {
                // Capture the start time explicitly; the decompiled code subtracted an undefined
                // register variable (r0) here.
                long startMs = this.time.milliseconds();
                if (streamTask.maybePunctuate(startMs)) {
                    this.sensors.punctuateTimeSensor.record(this.time.milliseconds() - startMs);
                }
            } catch (KafkaException e) {
                log.error("Failed to punctuate active task #" + streamTask.id() + " in thread [" + getName() + "]: ", e);
                throw e;
            }
        }
    }

    /**
     * Commits all tasks when the commit interval has elapsed (and is non-negative), also flagging
     * held-back standby records for another attempt. Otherwise commits only those active tasks
     * that report {@code commitNeeded()}.
     *
     * @throws KafkaException rethrown after logging if a per-task commit fails
     */
    protected void maybeCommit() {
        final long now = this.time.milliseconds();
        final boolean intervalElapsed = this.commitTimeMs >= 0 && this.lastCommit + this.commitTimeMs < now;

        if (!intervalElapsed) {
            for (StreamTask task : this.activeTasks.values()) {
                try {
                    if (task.commitNeeded()) {
                        commitOne(task, this.time.milliseconds());
                    }
                } catch (KafkaException commitFailure) {
                    log.error("Failed to commit active task #" + task.id() + " in thread [" + getName() + "]: ", commitFailure);
                    throw commitFailure;
                }
            }
            return;
        }

        log.trace("Committing processor instances because the commit interval has elapsed.");
        commitAll();
        this.lastCommit = now;
        this.processStandbyRecords = true;
    }

    /**
     * Commits every active task and then every standby task, reading the clock afresh before
     * each commit so each one is timed separately.
     */
    public void commitAll() {
        for (StreamTask active : this.activeTasks.values()) {
            commitOne(active, this.time.milliseconds());
        }
        for (StandbyTask standby : this.standbyTasks.values()) {
            commitOne(standby, this.time.milliseconds());
        }
    }

    /**
     * Commits one task and records the time elapsed since {@code j} on the commit sensor.
     * A {@code CommitFailedException} is logged as a warning and tolerated; any other
     * {@code KafkaException} is logged as an error and rethrown before the sensor is recorded.
     *
     * @param abstractTask the task to commit
     * @param j            start timestamp in ms, as read from this thread's clock
     */
    private void commitOne(AbstractTask abstractTask, long j) {
        try {
            abstractTask.commit();
        } catch (CommitFailedException tolerated) {
            log.warn(commitFailureMessage(abstractTask), tolerated);
        } catch (KafkaException fatal) {
            log.error(commitFailureMessage(abstractTask), fatal);
            throw fatal;
        }
        final long elapsedMs = this.time.milliseconds() - j;
        this.sensors.commitTimeSensor.record(elapsedMs);
    }

    /** Builds the log message for a failed commit of the given task. */
    private String commitFailureMessage(AbstractTask task) {
        return "Failed to commit " + task.getClass().getSimpleName() + " #" + task.id() + " in thread [" + getName() + "]: ";
    }

    /**
     * Once more than {@code cleanTimeMs} has passed since {@code lastClean}, scans the state
     * directory and deletes every task directory whose lock can be acquired, then sets
     * {@code lastClean} to the current time.
     */
    protected void maybeClean() {
        long now = this.time.milliseconds();
        // Compare by subtraction. lastClean is initialised to Long.MAX_VALUE, and the former
        // "lastClean + cleanTimeMs" wrapped to a negative value for any positive delay, so the
        // check passed immediately instead of holding cleanup back.
        if (now - this.lastClean <= this.cleanTimeMs) {
            return;
        }
        File[] entries = this.stateDir.listFiles();
        if (entries != null) {
            for (File entry : entries) {
                cleanStateDirectory(entry);
            }
        }
        this.lastClean = now;
    }

    /** Deletes one state directory if its name ends in a parsable task id and its lock can be taken. */
    private void cleanStateDirectory(File dir) {
        try {
            String dirName = dir.getName();
            TaskId id = TaskId.parse(dirName.substring(dirName.lastIndexOf("-") + 1));
            if (!dir.exists()) {
                return;
            }
            FileLock directoryLock = null;
            try {
                directoryLock = ProcessorStateManager.lockStateDirectory(dir);
                // A null lock means the directory was not locked for us; leave it alone.
                if (directoryLock != null) {
                    log.info("Deleting obsolete state directory {} for task {} after delayed {} ms.", new Object[]{dir.getAbsolutePath(), id, Long.valueOf(this.cleanTimeMs)});
                    Utils.delete(dir);
                }
            } catch (FileNotFoundException ignored) {
                // Nothing to lock or delete at this path any more; skip it quietly.
            } catch (IOException e) {
                log.error("Failed to lock the state directory due to an unexpected exception", e);
            } finally {
                // One release path for every outcome, replacing four duplicated release blocks.
                if (directoryLock != null) {
                    try {
                        directoryLock.release();
                    } catch (IOException e) {
                        log.error("Failed to release the state directory lock", e);
                    }
                }
            }
        } catch (TaskIdFormatException ignored) {
            // The entry name does not end in a task id, so it is not a task directory.
        }
    }

    /**
     * Returns a read-only view of the ids of the tasks that were active when
     * {@code removeStreamTasks()} last ran.
     *
     * @return unmodifiable view of the previous task ids
     */
    public Set<TaskId> prevTasks() {
        return Collections.unmodifiableSet(this.prevTasks);
    }

    /**
     * Lists the tasks that have state on disk: every entry of the state directory whose name
     * parses as a task id and which contains a checkpoint file.
     *
     * @return a new, mutable set of task ids; empty if the state directory cannot be listed
     */
    public Set<TaskId> cachedTasks() {
        Set<TaskId> cached = new HashSet<TaskId>();
        File[] entries = this.stateDir.listFiles();
        if (entries == null) {
            return cached;
        }
        for (File entry : entries) {
            try {
                TaskId id = TaskId.parse(entry.getName());
                File checkpoint = new File(entry, ProcessorStateManager.CHECKPOINT_FILE_NAME);
                if (checkpoint.exists()) {
                    cached.add(id);
                }
            } catch (TaskIdFormatException ignored) {
                // The entry name is not a task id; skip it.
            }
        }
        return cached;
    }

    /**
     * Records a task creation on the sensor and builds an active task for the given partitions,
     * using the topology of the task's topic group.
     *
     * @param taskId     id of the task to create
     * @param collection partitions the task will consume
     * @return the new active task
     */
    protected StreamTask createStreamTask(TaskId taskId, Collection<TopicPartition> collection) {
        this.sensors.taskCreationSensor.record();
        final ProcessorTopology topology = this.builder.build(Integer.valueOf(taskId.topicGroupId));
        return new StreamTask(taskId, this.jobId, collection, topology, this.consumer, this.producer, this.restoreConsumer, this.config, this.sensors);
    }

    /**
     * Groups the assigned partitions by the task ids the partition assignor reports for them,
     * creates one active task per id, and indexes each task by id and by partition.
     *
     * @param collection the partitions assigned to this thread
     * @throws IllegalStateException if no partition assignor has been set
     * @throws StreamsException      rethrown after logging if a task cannot be created
     */
    public void addStreamTasks(Collection<TopicPartition> collection) {
        if (this.partitionAssignor == null) {
            throw new IllegalStateException("Partition assignor has not been initialized while adding stream tasks: this should not happen.");
        }

        // Step 1: task id -> partitions.
        Map<TaskId, Set<TopicPartition>> partitionsByTask = new HashMap<TaskId, Set<TopicPartition>>();
        for (TopicPartition partition : collection) {
            for (TaskId id : this.partitionAssignor.tasksForPartition(partition)) {
                Set<TopicPartition> owned = partitionsByTask.get(id);
                if (owned == null) {
                    owned = new HashSet<TopicPartition>();
                    partitionsByTask.put(id, owned);
                }
                owned.add(partition);
            }
        }

        // Step 2: create and register one task per id.
        for (Map.Entry<TaskId, Set<TopicPartition>> grouped : partitionsByTask.entrySet()) {
            TaskId id = grouped.getKey();
            Set<TopicPartition> owned = grouped.getValue();
            try {
                StreamTask task = createStreamTask(id, owned);
                this.activeTasks.put(id, task);
                for (TopicPartition partition : owned) {
                    this.activeTasksByPartition.put(partition, task);
                }
            } catch (StreamsException creationFailure) {
                log.error("Failed to create an active task #" + id + " in thread [" + getName() + "]: ", creationFailure);
                throw creationFailure;
            }
        }
    }

    /**
     * Closes every active task, remembers their ids in {@code prevTasks}, and clears both
     * active-task indexes. Any {@code Exception} raised along the way is logged, not propagated,
     * and leaves the remaining steps undone.
     */
    public void removeStreamTasks() {
        try {
            for (StreamTask task : this.activeTasks.values()) {
                closeOne(task);
            }
            // Snapshot the ids before the map is emptied.
            this.prevTasks.clear();
            this.prevTasks.addAll(this.activeTasks.keySet());

            this.activeTasks.clear();
            this.activeTasksByPartition.clear();
        } catch (Exception removalFailure) {
            log.error("Failed to remove stream tasks in thread [" + getName() + "]: ", removalFailure);
        }
    }

    /**
     * Closes one task and records a task destruction on the sensor. A {@code StreamsException}
     * from the close is logged and swallowed, and the destruction is still recorded.
     *
     * @param abstractTask the task to close
     */
    private void closeOne(AbstractTask abstractTask) {
        log.info("Removing a task {}", abstractTask.id());
        try {
            abstractTask.close();
        } catch (StreamsException closeFailure) {
            final String taskKind = abstractTask.getClass().getSimpleName();
            log.error("Failed to close a " + taskKind + " #" + abstractTask.id() + " in thread [" + getName() + "]: ", closeFailure);
        }
        this.sensors.taskDestructionSensor.record();
    }

    /**
     * Builds a standby task for the given id, or returns {@code null} when the topology of the
     * task's topic group has no state store suppliers.
     *
     * @param taskId     id of the task to create
     * @param collection partitions associated with the task
     * @return the new standby task, or {@code null} if no task was created
     */
    protected StandbyTask createStandbyTask(TaskId taskId, Collection<TopicPartition> collection) {
        ProcessorTopology topology = this.builder.build(Integer.valueOf(taskId.topicGroupId));
        if (topology.stateStoreSuppliers().isEmpty()) {
            return null;
        }
        // Count a creation only when a task is really built; previously the sensor was bumped even
        // on the null-returning path above, inflating the task-creation rate.
        this.sensors.taskCreationSensor.record();
        return new StandbyTask(taskId, this.jobId, collection, topology, this.consumer, this.restoreConsumer, this.config, this.sensors);
    }

    /**
     * Creates the standby tasks assigned to this thread and positions the restore
     * consumer on their checkpointed offsets.
     *
     * <p>For every entry returned by the partition assignor a standby task is created;
     * entries for which {@link #createStandbyTask} returns {@code null} are skipped.
     * Each created task is registered under its id, under each of its assigned
     * partitions, and under each partition it has a checkpointed offset for. The
     * restore consumer is then assigned all checkpointed partitions and, per
     * partition, either seeks to the checkpointed offset (when it is non-negative)
     * or seeks to the beginning.
     *
     * @throws IllegalStateException if the partition assignor has not been set
     */
    public void addStandbyTasks() {
        if (this.partitionAssignor == null) {
            throw new IllegalStateException("Partition assignor has not been initialized while adding standby tasks: this should not happen.");
        }

        // Union of the checkpointed offsets of all standby tasks created below.
        Map<TopicPartition, Long> checkpointedOffsets = new HashMap<TopicPartition, Long>();

        for (Map.Entry<TaskId, Set<TopicPartition>> assignment : this.partitionAssignor.standbyTasks().entrySet()) {
            TaskId taskId = assignment.getKey();
            Set<TopicPartition> partitions = assignment.getValue();

            StandbyTask task = createStandbyTask(taskId, partitions);
            if (task == null) {
                continue;
            }

            this.standbyTasks.put(taskId, task);
            for (TopicPartition partition : partitions) {
                this.standbyTasksByPartition.put(partition, task);
            }
            // The task is also looked up by the partitions it has checkpoints for.
            for (TopicPartition partition : task.checkpointedOffsets().keySet()) {
                this.standbyTasksByPartition.put(partition, task);
            }
            checkpointedOffsets.putAll(task.checkpointedOffsets());
        }

        this.restoreConsumer.assign(new ArrayList<TopicPartition>(checkpointedOffsets.keySet()));

        for (Map.Entry<TopicPartition, Long> checkpoint : checkpointedOffsets.entrySet()) {
            TopicPartition partition = checkpoint.getKey();
            long offset = checkpoint.getValue().longValue();
            if (offset >= 0) {
                this.restoreConsumer.seek(partition, offset);
            } else {
                // A negative offset means there is no usable position: start from the beginning.
                this.restoreConsumer.seekToBeginning(new TopicPartition[]{partition});
            }
        }
    }

    /**
     * Closes every standby task and clears all standby-related state.
     *
     * <p>Once the tasks are closed, the standby task maps and the buffered standby
     * records are emptied and the restore consumer is assigned an empty partition
     * list. Any exception raised along the way is logged and not rethrown, so the
     * steps after the failing one are not executed.
     */
    public void removeStandbyTasks() {
        try {
            for (StandbyTask standbyTask : this.standbyTasks.values()) {
                closeOne(standbyTask);
            }

            this.standbyTasks.clear();
            this.standbyTasksByPartition.clear();
            this.standbyRecords.clear();

            // No standby tasks remain, so the restore consumer gets no partitions.
            this.restoreConsumer.assign(Collections.emptyList());
        } catch (Exception failure) {
            log.error("Failed to remove standby tasks in thread [" + getName() + "]: ", failure);
        }
    }

    /**
     * Synthetic accessor that assigns {@code lastClean} on the given thread.
     *
     * <p>The decompiler failed to decode this method (it aborted on a {@code dup2_x1}
     * instruction) and emitted a stub that unconditionally threw
     * {@link UnsupportedOperationException}. The body below is reconstructed from the
     * decompiler's partial listing, which showed {@code r0.lastClean = r1}.
     *
     * <p>NOTE(review): the return value is presumed to be the value just stored, which
     * is what a duplicate-then-store sequence yields; confirm against the class file.
     *
     * @param r6 the thread whose {@code lastClean} field is updated
     * @param r7 the value to store
     * @return the value that was stored
     */
    static /* synthetic */ long access$202(org.apache.kafka.streams.processor.internals.StreamThread r6, long r7) {
        r6.lastClean = r7;
        return r7;
    }

    // Empty static initializer: it executes no statements.
    // NOTE(review): presumably left behind by the decompiler — confirm before removing.
    static {
    }
}
