package org.apache.kafka.streams.processor.internals;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamThread.class */
public class StreamThread extends Thread {
    // Clock used for every timestamp taken by this thread (time.milliseconds()).
    private final Time time;
    // Logger and message prefix, both obtained from the LogContext passed to the constructor.
    private final Logger log;
    private final String logPrefix;
    // Monitor held by setState() and isRunning() while they read or change `state`.
    private final Object stateLock;
    // Poll timeout used by runOnce() in the RUNNING and STARTING states (StreamsConfig.POLL_MS_CONFIG).
    private final Duration pollTime;
    // Commit interval in ms (StreamsConfig.COMMIT_INTERVAL_MS_CONFIG); compared against now - lastCommitMs in maybeCommit().
    private final long commitTimeMs;
    // Value of the consumer's "max.poll.interval.ms"; runOnce() compares the time since the last poll against half of it.
    private final int maxPollTimeMs;
    // Offset reset policy handed to the constructor; resetOffsets() falls back to it for topics without a pattern match.
    private final String originalReset;
    private final TaskManager taskManager;
    // Timestamp of the next scheduled rebalance; runLoop() enforces a rebalance once it lies in the past and
    // then resets it to Long.MAX_VALUE.
    private final AtomicLong nextProbingRebalanceMs;
    private final StreamsMetricsImpl streamsMetrics;
    // Thread-level sensors, all created through ThreadMetrics in the constructor.
    private final Sensor commitSensor;
    private final Sensor pollSensor;
    private final Sensor pollRecordsSensor;
    private final Sensor punctuateSensor;
    private final Sensor processRecordsSensor;
    private final Sensor processLatencySensor;
    private final Sensor processRateSensor;
    private final Sensor pollRatioSensor;
    private final Sensor processRatioSensor;
    private final Sensor punctuateRatioSensor;
    private final Sensor commitRatioSensor;
    // Cached "current time" in ms; refreshed by advanceNowAndComputeLatency() and overridable through setNow().
    private long now;
    // Value of `now` when pollRequests() was last entered.
    private long lastPollMs;
    // Value of `now` after the last interval-driven commit in maybeCommit().
    private long lastCommitMs;
    // Iteration count passed to taskManager.process(); grown and halved adaptively inside runOnce().
    private int numIterations;
    // Current lifecycle state; written only inside setState() under stateLock.
    private volatile State state;
    // Snapshot of this thread's metadata; replaced as a whole by the updateThreadMetadata(...) overloads.
    private volatile ThreadMetadata threadMetadata;
    // Optional callback invoked by setState() after a successful transition; may be null.
    private StateListener stateListener;
    private final ChangelogReader changelogReader;
    // Listener registered with the main consumer in subscribeConsumer().
    private final ConsumerRebalanceListener rebalanceListener;
    private final Consumer<byte[], byte[]> mainConsumer;
    private final Consumer<byte[], byte[]> restoreConsumer;
    private final Admin adminClient;
    private final InternalTopologyBuilder builder;

    /**
     * {@link ConsumerConfig} variant whose key and value deserializers are both set to
     * {@link ByteArrayDeserializer}; the constructor flag {@code false} is forwarded unchanged to
     * the {@link ConsumerConfig} constructor.
     */
    private static final class InternalConsumerConfig extends ConsumerConfig {
        private InternalConsumerConfig(Map<String, Object> map) {
            super(withByteArrayDeserializers(map), false);
        }

        /** Adds byte-array key/value deserializers to the supplied consumer properties. */
        private static Map<String, Object> withByteArrayDeserializers(final Map<String, Object> props) {
            final ByteArrayDeserializer keyDeserializer = new ByteArrayDeserializer();
            final ByteArrayDeserializer valueDeserializer = new ByteArrayDeserializer();
            return ConsumerConfig.addDeserializerToConfig(props, keyDeserializer, valueDeserializer);
        }
    }

    /**
     * Processing guarantee a stream thread runs with. Each constant exposes a public
     * {@link #name} string that is identical to the constant's identifier.
     */
    public enum ProcessingMode {
        AT_LEAST_ONCE("AT_LEAST_ONCE"),

        EXACTLY_ONCE_ALPHA("EXACTLY_ONCE_ALPHA"),

        EXACTLY_ONCE_BETA("EXACTLY_ONCE_BETA");

        /** Human-readable label of this mode. */
        public final String name;

        ProcessingMode(final String label) {
            name = label;
        }
    }

    /**
     * Lifecycle states of a stream thread.
     *
     * <p>Each constant lists the ordinals of the states it may transition to. With the declaration
     * order below the ordinals are: CREATED=0, STARTING=1, PARTITIONS_REVOKED=2,
     * PARTITIONS_ASSIGNED=3, RUNNING=4, PENDING_SHUTDOWN=5, DEAD=6. The constants must therefore
     * not be reordered without updating the transition lists.
     *
     * <ul>
     *   <li>CREATED -> STARTING, PENDING_SHUTDOWN</li>
     *   <li>STARTING -> PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, PENDING_SHUTDOWN</li>
     *   <li>PARTITIONS_REVOKED -> PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, PENDING_SHUTDOWN</li>
     *   <li>PARTITIONS_ASSIGNED -> PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, RUNNING, PENDING_SHUTDOWN</li>
     *   <li>RUNNING -> PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, RUNNING, PENDING_SHUTDOWN</li>
     *   <li>PENDING_SHUTDOWN -> DEAD</li>
     *   <li>DEAD -> (none)</li>
     * </ul>
     */
    public enum State implements ThreadStateTransitionValidator {
        CREATED(1, 5),
        STARTING(2, 3, 5),
        PARTITIONS_REVOKED(2, 3, 5),
        PARTITIONS_ASSIGNED(2, 3, 4, 5),
        RUNNING(2, 3, 4, 5),
        PENDING_SHUTDOWN(6),
        DEAD;

        /** Ordinals of the states reachable from this state. */
        private final Set<Integer> validTransitions = new HashSet<>();

        State(final Integer... validTransitions) {
            this.validTransitions.addAll(Arrays.asList(validTransitions));
        }

        /**
         * @return {@code true} for RUNNING, STARTING, PARTITIONS_REVOKED and PARTITIONS_ASSIGNED
         */
        public boolean isAlive() {
            return equals(RUNNING) || equals(STARTING) || equals(PARTITIONS_REVOKED) || equals(PARTITIONS_ASSIGNED);
        }

        /**
         * Checks whether this state may transition to {@code threadStateTransitionValidator}.
         *
         * @param threadStateTransitionValidator the target state; must be a {@link State}
         * @return {@code true} if the target's ordinal is listed for this state
         * @throws ClassCastException if the argument is not a {@link State}
         */
        @Override
        public boolean isValidTransition(ThreadStateTransitionValidator threadStateTransitionValidator) {
            final State target = (State) threadStateTransitionValidator;
            return this.validTransitions.contains(target.ordinal());
        }
    }

    /**
     * Callback notified about state transitions of a stream thread.
     */
    public interface StateListener {
        /**
         * Called after a state change.
         *
         * @param thread the thread whose state changed
         * @param threadStateTransitionValidator the state after the transition
         * @param threadStateTransitionValidator2 the state before the transition
         */
        void onChange(Thread thread, ThreadStateTransitionValidator threadStateTransitionValidator, ThreadStateTransitionValidator threadStateTransitionValidator2);
    }

    /**
     * Registers the callback that is notified by {@link #setState(State)} after each successful
     * state transition. Passing {@code null} disables notification.
     *
     * @param stateListener the listener to install, or {@code null}
     */
    public void setStateListener(StateListener stateListener) {
        this.stateListener = stateListener;
    }

    /**
     * @return the current lifecycle state (a plain volatile read; {@code stateLock} is not taken)
     */
    public State state() {
        return state;
    }

    /**
     * Attempts to move this thread to {@code state}.
     *
     * <p>The transition itself (validation, assignment of the new state and refresh of the thread
     * metadata) happens while holding {@code stateLock}. The registered {@link StateListener} is
     * notified only after the lock has been released, so listener code never runs under this
     * thread's state lock and cannot deadlock against callers of {@link #isRunning()} or
     * {@code setState}.
     *
     * @param state the requested new state
     * @return the previous state, or {@code null} if the request was ignored because the thread is
     *         PENDING_SHUTDOWN (and the target is not DEAD) or already DEAD
     * @throws StreamsException if the transition is not allowed from the current state
     */
    public State setState(State state) {
        final State oldState;

        synchronized (this.stateLock) {
            oldState = this.state;

            if (oldState == State.PENDING_SHUTDOWN && state != State.DEAD) {
                this.log.debug("Ignoring request to transit from PENDING_SHUTDOWN to {}: only DEAD state is a valid next state", state);
                return null;
            }
            if (oldState == State.DEAD) {
                this.log.debug("Ignoring request to transit from DEAD to {}: no valid next state after DEAD", state);
                return null;
            }
            if (!oldState.isValidTransition(state)) {
                this.log.error("Unexpected state transition from {} to {}", oldState, state);
                throw new StreamsException(this.logPrefix + "Unexpected state transition from " + oldState + " to " + state);
            }

            this.log.info("State transition from {} to {}", oldState, state);
            this.state = state;

            // Task assignments are only published while RUNNING; every other state reports none.
            if (state == State.RUNNING) {
                updateThreadMetadata(this.taskManager.activeTaskMap(), this.taskManager.standbyTaskMap());
            } else {
                updateThreadMetadata(Collections.emptyMap(), Collections.emptyMap());
            }
        }

        // Read the listener once so a concurrent setStateListener(null) cannot cause an NPE, and
        // report the transition that was actually performed above.
        final StateListener listener = this.stateListener;
        if (listener != null) {
            listener.onChange(this, state, oldState);
        }

        return oldState;
    }

    /**
     * @return {@code true} if the current state reports {@link State#isAlive()}; evaluated while
     *         holding {@code stateLock}
     */
    public boolean isRunning() {
        synchronized (stateLock) {
            return state.isAlive();
        }
    }

    /**
     * Builds a fully wired stream thread: restore consumer, changelog reader, task manager,
     * main consumer and finally the {@link StreamThread} itself.
     *
     * @param internalTopologyBuilder topology builder shared with the task creators and the thread
     * @param streamsConfig           the Streams configuration
     * @param kafkaClientSupplier     factory for the restore and main consumers
     * @param admin                   admin client shared with the changelog reader, task manager and assignor
     * @param uuid                    process id passed to the task manager and active task creator
     * @param str                     client id; the thread name is {@code str + "-StreamThread-" + i}
     * @param streamsMetricsImpl      metrics registry
     * @param time                    clock
     * @param streamsMetadataState    metadata state handed to the partition assignor
     * @param j                       size argument passed to the {@link ThreadCache} constructor
     * @param stateDirectory          state directory
     * @param stateRestoreListener    restore listener handed to the changelog reader
     * @param i                       index of this thread
     * @return the new thread, with its initial {@link ThreadMetadata} already populated
     */
    public static StreamThread create(InternalTopologyBuilder internalTopologyBuilder, StreamsConfig streamsConfig, KafkaClientSupplier kafkaClientSupplier, Admin admin, UUID uuid, String str, StreamsMetricsImpl streamsMetricsImpl, Time time, StreamsMetadataState streamsMetadataState, long j, StateDirectory stateDirectory, StateRestoreListener stateRestoreListener, int i) {
        final String threadId = str + "-StreamThread-" + i;
        final String logPrefix = String.format("stream-thread [%s] ", threadId);
        final LogContext logContext = new LogContext(logPrefix);
        final Logger log = logContext.logger(StreamThread.class);

        log.info("Creating restore consumer client");
        final Consumer<byte[], byte[]> restoreConsumer = kafkaClientSupplier.getRestoreConsumer(streamsConfig.getRestoreConsumerConfigs(ClientUtils.getRestoreConsumerClientId(threadId)));
        final StoreChangelogReader changelogReader = new StoreChangelogReader(time, streamsConfig, logContext, admin, restoreConsumer, stateRestoreListener);

        final ThreadCache cache = new ThreadCache(logContext, j, streamsMetricsImpl);
        final ActiveTaskCreator activeTaskCreator = new ActiveTaskCreator(internalTopologyBuilder, streamsConfig, streamsMetricsImpl, stateDirectory, changelogReader, cache, time, kafkaClientSupplier, threadId, uuid, log);
        final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator(internalTopologyBuilder, streamsConfig, streamsMetricsImpl, stateDirectory, changelogReader, threadId, log);
        final TaskManager taskManager = new TaskManager(changelogReader, uuid, logPrefix, activeTaskCreator, standbyTaskCreator, internalTopologyBuilder, admin, stateDirectory, processingMode(streamsConfig));

        log.info("Creating consumer client");
        final String applicationId = streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG);
        final Map<String, Object> consumerConfigs = streamsConfig.getMainConsumerConfigs(applicationId, ClientUtils.getConsumerClientId(threadId), i);

        // Objects the partition assignor picks up from the consumer configuration.
        consumerConfigs.put(StreamsConfig.InternalConfig.TIME, time);
        consumerConfigs.put(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR, taskManager);
        consumerConfigs.put(StreamsConfig.InternalConfig.STREAMS_METADATA_STATE_FOR_PARTITION_ASSIGNOR, streamsMetadataState);
        consumerConfigs.put(StreamsConfig.InternalConfig.STREAMS_ADMIN_CLIENT, admin);
        final AtomicInteger assignmentErrorCode = new AtomicInteger();
        consumerConfigs.put(StreamsConfig.InternalConfig.ASSIGNMENT_ERROR_CODE, assignmentErrorCode);
        final AtomicLong nextScheduledRebalanceMs = new AtomicLong(Long.MAX_VALUE);
        consumerConfigs.put(StreamsConfig.InternalConfig.NEXT_SCHEDULED_REBALANCE_MS, nextScheduledRebalanceMs);

        // Remember the configured reset policy, then switch the consumer to "none" whenever the
        // topology defines per-topic overrides: in that case offset resets are handled by
        // resetOffsets() instead of by the consumer itself.
        final String originalReset = (String) consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
        final boolean hasLatestOverride = !internalTopologyBuilder.latestResetTopicsPattern().pattern().isEmpty();
        if (hasLatestOverride || !internalTopologyBuilder.earliestResetTopicsPattern().pattern().isEmpty()) {
            consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
        }

        final Consumer<byte[], byte[]> mainConsumer = kafkaClientSupplier.getConsumer(consumerConfigs);
        changelogReader.setMainConsumer(mainConsumer);
        taskManager.setMainConsumer(mainConsumer);

        final StreamThread streamThread = new StreamThread(time, streamsConfig, admin, mainConsumer, restoreConsumer, changelogReader, originalReset, taskManager, streamsMetricsImpl, internalTopologyBuilder, threadId, logContext, assignmentErrorCode, nextScheduledRebalanceMs);
        taskManager.setPartitionResetter(partitions -> streamThread.resetOffsets(partitions, null));

        return streamThread.updateThreadMetadata(ClientUtils.getSharedAdminClientId(str));
    }

    /**
     * Maps the configured processing guarantee onto a {@link ProcessingMode}.
     *
     * @param streamsConfig the Streams configuration to inspect
     * @return EXACTLY_ONCE_ALPHA for {@code StreamsConfig.EXACTLY_ONCE}, EXACTLY_ONCE_BETA for
     *         {@code StreamsConfig.EXACTLY_ONCE_BETA}, otherwise AT_LEAST_ONCE
     */
    public static ProcessingMode processingMode(StreamsConfig streamsConfig) {
        if (StreamsConfig.EXACTLY_ONCE.equals(streamsConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG))) {
            return ProcessingMode.EXACTLY_ONCE_ALPHA;
        }
        if (StreamsConfig.EXACTLY_ONCE_BETA.equals(streamsConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG))) {
            return ProcessingMode.EXACTLY_ONCE_BETA;
        }
        return ProcessingMode.AT_LEAST_ONCE;
    }

    /**
     * @param streamsConfig the Streams configuration to inspect
     * @return {@code true} if the configured mode is EXACTLY_ONCE_ALPHA or EXACTLY_ONCE_BETA
     */
    public static boolean eosEnabled(StreamsConfig streamsConfig) {
        final ProcessingMode mode = processingMode(streamsConfig);
        if (mode == ProcessingMode.EXACTLY_ONCE_ALPHA) {
            return true;
        }
        return mode == ProcessingMode.EXACTLY_ONCE_BETA;
    }

    /**
     * Creates a stream thread in the CREATED state. The thread name is {@code str2}; all
     * thread-level sensors are registered under that name.
     *
     * @param time                    clock
     * @param streamsConfig           source of the poll timeout, commit interval and max poll interval
     * @param admin                   admin client
     * @param consumer                main consumer
     * @param consumer2               restore consumer
     * @param changelogReader         changelog reader
     * @param str                     original offset reset policy (stored as {@code originalReset})
     * @param taskManager             task manager
     * @param streamsMetricsImpl      metrics registry
     * @param internalTopologyBuilder topology builder
     * @param str2                    thread name / thread id used for the sensors
     * @param logContext              supplies the logger and log prefix
     * @param atomicInteger           passed to the {@code StreamsRebalanceListener} constructor
     * @param atomicLong              timestamp of the next scheduled rebalance
     */
    public StreamThread(Time time, StreamsConfig streamsConfig, Admin admin, Consumer<byte[], byte[]> consumer, Consumer<byte[], byte[]> consumer2, ChangelogReader changelogReader, String str, TaskManager taskManager, StreamsMetricsImpl streamsMetricsImpl, InternalTopologyBuilder internalTopologyBuilder, String str2, LogContext logContext, AtomicInteger atomicInteger, AtomicLong atomicLong) {
        super(str2);
        this.state = State.CREATED;
        this.stateLock = new Object();
        this.adminClient = admin;
        this.streamsMetrics = streamsMetricsImpl;
        // Register (or look up) every thread-level sensor for this thread id.
        this.commitSensor = ThreadMetrics.commitSensor(str2, streamsMetricsImpl);
        this.pollSensor = ThreadMetrics.pollSensor(str2, streamsMetricsImpl);
        this.pollRecordsSensor = ThreadMetrics.pollRecordsSensor(str2, streamsMetricsImpl);
        this.pollRatioSensor = ThreadMetrics.pollRatioSensor(str2, streamsMetricsImpl);
        this.processLatencySensor = ThreadMetrics.processLatencySensor(str2, streamsMetricsImpl);
        this.processRecordsSensor = ThreadMetrics.processRecordsSensor(str2, streamsMetricsImpl);
        this.processRateSensor = ThreadMetrics.processRateSensor(str2, streamsMetricsImpl);
        this.processRatioSensor = ThreadMetrics.processRatioSensor(str2, streamsMetricsImpl);
        this.punctuateSensor = ThreadMetrics.punctuateSensor(str2, streamsMetricsImpl);
        this.punctuateRatioSensor = ThreadMetrics.punctuateRatioSensor(str2, streamsMetricsImpl);
        this.commitRatioSensor = ThreadMetrics.commitRatioSensor(str2, streamsMetricsImpl);
        // These sensors are created here but not kept in a field of this class.
        ThreadMetrics.createTaskSensor(str2, streamsMetricsImpl);
        ThreadMetrics.closeTaskSensor(str2, streamsMetricsImpl);
        // Two additional sensors exist only for the FROM_0100_TO_24 metrics version.
        if (streamsMetricsImpl.version() == StreamsMetricsImpl.Version.FROM_0100_TO_24) {
            ThreadMetrics.skipRecordSensor(str2, streamsMetricsImpl);
            ThreadMetrics.commitOverTasksSensor(str2, streamsMetricsImpl);
        }
        this.time = time;
        this.builder = internalTopologyBuilder;
        this.logPrefix = logContext.logPrefix();
        this.log = logContext.logger(StreamThread.class);
        // NOTE: `this` is handed to the listener before the constructor has finished.
        this.rebalanceListener = new StreamsRebalanceListener(time, taskManager, this, this.log, atomicInteger);
        this.taskManager = taskManager;
        this.restoreConsumer = consumer2;
        this.mainConsumer = consumer;
        this.changelogReader = changelogReader;
        this.originalReset = str;
        this.nextProbingRebalanceMs = atomicLong;
        this.pollTime = Duration.ofMillis(streamsConfig.getLong(StreamsConfig.POLL_MS_CONFIG).longValue());
        // Group id, client id and thread index are placeholders: the config is built only to read
        // the effective "max.poll.interval.ms".
        this.maxPollTimeMs = new InternalConsumerConfig(streamsConfig.getMainConsumerConfigs("dummyGroupId", "dummyClientId", 1)).getInt("max.poll.interval.ms").intValue();
        this.commitTimeMs = streamsConfig.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG).longValue();
        this.numIterations = 1;
    }

    /**
     * Thread entry point: transitions to STARTING, executes {@link #runLoop()} until it returns
     * or throws, and then shuts the thread down exactly once.
     *
     * <p>Shutdown is performed in a {@code finally} block so that a failure inside
     * {@code completeShutdown} is neither reported as a processing error nor followed by a second
     * shutdown attempt.
     *
     * @throws RuntimeException any exception escaping the run loop is logged and rethrown
     */
    @Override
    public void run() {
        this.log.info("Starting");
        if (setState(State.STARTING) == null) {
            this.log.info("StreamThread already shutdown. Not running");
            return;
        }

        // Stays false unless the run loop returned normally.
        boolean cleanRun = false;
        try {
            runLoop();
            cleanRun = true;
        } catch (final Exception e) {
            if (e instanceof UnsupportedVersionException) {
                final String errorMessage = e.getMessage();
                if (errorMessage != null && errorMessage.startsWith("Broker unexpectedly doesn't support requireStable flag on version ")) {
                    this.log.error("Shutting down because the Kafka cluster seems to be on a too old version. Setting {}=\"{}\" requires broker version 2.5 or higher.", StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_BETA);
                    throw e;
                }
            }
            this.log.error("Encountered the following exception during processing and the thread is going to shut down: ", e);
            throw e;
        } finally {
            completeShutdown(cleanRun);
        }
    }

    /**
     * Main loop. Subscribes the main consumer and keeps calling {@link #runOnce()} for as long as
     * the thread is running or the task manager reports a rebalance in progress.
     *
     * <p>After each iteration a scheduled follow-up rebalance is enforced once its timestamp lies
     * in the past. Task corruption is handed to the task manager; task migration is handled by
     * {@link #handleTaskMigrated(TaskMigratedException)}.
     */
    void runLoop() {
        subscribeConsumer();

        while (isRunning() || taskManager.isRebalanceInProgress()) {
            try {
                runOnce();

                if (nextProbingRebalanceMs.get() < time.milliseconds()) {
                    log.info("Triggering the followup rebalance scheduled for {} ms.", nextProbingRebalanceMs.get());
                    mainConsumer.enforceRebalance();
                    nextProbingRebalanceMs.set(Long.MAX_VALUE);
                }
            } catch (final TaskCorruptedException corrupted) {
                log.warn("Detected the states of tasks " + corrupted.corruptedTaskWithChangelogs() + " are corrupted. Will close the task as dirty and re-create and bootstrap from scratch.", corrupted);
                try {
                    taskManager.handleCorruption(corrupted.corruptedTaskWithChangelogs());
                } catch (final TaskMigratedException migratedWhileRecovering) {
                    handleTaskMigrated(migratedWhileRecovering);
                }
            } catch (final TaskMigratedException migrated) {
                handleTaskMigrated(migrated);
            }
        }
    }

    /**
     * Reacts to this thread having been fenced: logs a warning, lets the task manager treat all
     * tasks as lost, and re-subscribes the main consumer.
     *
     * @param taskMigratedException the exception that signalled the fencing (logged as the cause)
     */
    private void handleTaskMigrated(TaskMigratedException taskMigratedException) {
        log.warn("Detected that the thread is being fenced. This implies that this thread missed a rebalance and dropped out of the consumer group. Will close out all assigned tasks and rejoin the consumer group.", taskMigratedException);

        taskManager.handleLostAll();

        // Drop the current subscription and subscribe again.
        mainConsumer.unsubscribe();
        subscribeConsumer();
    }

    /**
     * Subscribes the main consumer to the topology's source topics, by pattern if the builder
     * uses pattern subscription and by explicit topic collection otherwise. The thread's
     * rebalance listener is registered in both cases.
     */
    private void subscribeConsumer() {
        if (!builder.usesPatternSubscription()) {
            mainConsumer.subscribe(builder.sourceTopicCollection(), rebalanceListener);
            return;
        }
        mainConsumer.subscribe(builder.sourceTopicPattern(), rebalanceListener);
    }

    /**
     * Executes one iteration of the thread: poll, restore, then (only when RUNNING) repeated
     * process / punctuate / commit rounds, and finally records the per-iteration metrics.
     *
     * <p>The process loop ends as soon as a round processed no records, or once more than half of
     * the consumer's max poll interval has elapsed since the last poll, so that the thread returns
     * to polling in time. {@code numIterations} is halved whenever the poll deadline is close or
     * a round punctuated or committed, and incremented otherwise.
     *
     * <p>All averages and ratios are recorded with floating-point division; an iteration that
     * completes within the same millisecond therefore cannot raise an {@link ArithmeticException}.
     *
     * @throws StreamsException if the thread is in a state in which no iteration may run
     */
    void runOnce() {
        final long startMs = this.time.milliseconds();
        this.now = startMs;

        final ConsumerRecords<byte[], byte[]> records;
        if (this.state == State.PARTITIONS_ASSIGNED) {
            // zero timeout: do not block here, restoration work follows the poll
            records = pollRequests(Duration.ZERO);
        } else if (this.state == State.PARTITIONS_REVOKED) {
            // zero timeout while partitions are revoked
            records = pollRequests(Duration.ZERO);
        } else if (this.state == State.RUNNING || this.state == State.STARTING) {
            // regular poll with the configured timeout
            records = pollRequests(this.pollTime);
        } else if (this.state == State.PENDING_SHUTDOWN) {
            // runLoop() only gets here while a rebalance is still in progress
            records = pollRequests(Duration.ZERO);
        } else {
            this.log.error("Unexpected state {} during normal iteration", this.state);
            throw new StreamsException(this.logPrefix + "Unexpected state " + this.state + " during normal iteration");
        }

        final long pollLatency = advanceNowAndComputeLatency();

        // pollRequests() returns null when an invalid offset was handled instead of records.
        if (records != null && !records.isEmpty()) {
            this.pollSensor.record(pollLatency, this.now);
            this.pollRecordsSensor.record(records.count(), this.now);
            this.taskManager.addRecordsToTasks(records);
        }

        // The state may have changed during the poll; stop here if the thread is no longer alive.
        if (!isRunning()) {
            this.log.debug("State already transits to {}, skipping the run once call after poll request", this.state);
            return;
        }

        if (this.state == State.PARTITIONS_ASSIGNED || (this.state == State.RUNNING && this.taskManager.needsInitializationOrRestoration())) {
            this.changelogReader.enforceRestoreActive();
            if (this.taskManager.tryToCompleteRestoration()) {
                this.changelogReader.transitToUpdateStandby();
                setState(State.RUNNING);
            }
        }

        this.changelogReader.restore();
        // Advance `now` past the restore call so it is not attributed to processing.
        advanceNowAndComputeLatency();

        int totalProcessed = 0;
        long totalCommitLatency = 0L;
        long totalProcessLatency = 0L;
        long totalPunctuateLatency = 0L;

        if (this.state == State.RUNNING) {
            while (true) {
                final int processed = this.taskManager.process(this.numIterations, this.time);
                final long processLatency = advanceNowAndComputeLatency();
                totalProcessLatency += processLatency;
                if (processed > 0) {
                    this.processRateSensor.record(processed, this.now);
                    // average latency per processed record
                    this.processLatencySensor.record(processLatency / (double) processed, this.now);
                    totalProcessed += processed;
                }

                final int punctuated = this.taskManager.punctuate();
                final long punctuateLatency = advanceNowAndComputeLatency();
                totalPunctuateLatency += punctuateLatency;
                if (punctuated > 0) {
                    this.punctuateSensor.record(punctuateLatency / (double) punctuated, this.now);
                }

                // maybeCommit() advances `now` itself when it commits.
                final long beforeCommitMs = this.now;
                final int committed = maybeCommit();
                final long commitLatency = Math.max(this.now - beforeCommitMs, 0L);
                totalCommitLatency += commitLatency;
                if (committed > 0) {
                    this.commitSensor.record(commitLatency / (double) committed, this.now);
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Committed all active tasks {} and standby tasks {} in {}ms", this.taskManager.activeTaskIds(), this.taskManager.standbyTaskIds(), commitLatency);
                    }
                }

                if (processed == 0) {
                    // nothing left to process
                    break;
                } else if (Math.max(this.now - this.lastPollMs, 0L) > this.maxPollTimeMs / 2) {
                    // Close to the poll deadline: shrink the batch and go back to polling now.
                    this.numIterations = this.numIterations > 1 ? this.numIterations / 2 : this.numIterations;
                    break;
                } else if (punctuated > 0 || committed > 0) {
                    this.numIterations = this.numIterations > 1 ? this.numIterations / 2 : this.numIterations;
                } else {
                    this.numIterations++;
                }
            }

            this.taskManager.recordTaskProcessRatio(totalProcessLatency, this.now);
        }

        this.now = this.time.milliseconds();
        final long runOnceLatency = this.now - startMs;
        this.processRecordsSensor.record(totalProcessed, this.now);
        // Fractions of this iteration spent in each phase; floating-point on purpose.
        this.processRatioSensor.record((double) totalProcessLatency / runOnceLatency, this.now);
        this.punctuateRatioSensor.record((double) totalPunctuateLatency / runOnceLatency, this.now);
        this.pollRatioSensor.record((double) pollLatency / runOnceLatency, this.now);
        this.commitRatioSensor.record((double) totalCommitLatency / runOnceLatency, this.now);
    }

    /**
     * Polls the main consumer and remembers the poll time in {@code lastPollMs}.
     *
     * @param duration maximum time the poll may block
     * @return the polled records, or {@code null} if the poll raised an
     *         {@link InvalidOffsetException} that was handled by resetting offsets
     */
    private ConsumerRecords<byte[], byte[]> pollRequests(Duration duration) {
        lastPollMs = now;
        try {
            return mainConsumer.poll(duration);
        } catch (final InvalidOffsetException invalidOffset) {
            resetOffsets(invalidOffset.partitions(), invalidOffset);
            return null;
        }
    }

    /**
     * Repositions the main consumer for partitions without a valid offset.
     *
     * <p>For each partition the policy is chosen in this order: the topology's "earliest" topic
     * pattern, the topology's "latest" topic pattern, then the originally configured reset policy.
     * Partitions for which none of these applies cause a {@link StreamsException}; in that case no
     * seek is performed at all.
     *
     * @param set the partitions to reset
     * @param exc the exception that triggered the reset, used as cause of a thrown
     *            {@link StreamsException}; may be {@code null}
     * @throws StreamsException if at least one partition has no applicable reset policy
     */
    public void resetOffsets(Set<TopicPartition> set, Exception exc) {
        final String earliest = "earliest";
        final String latest = "latest";

        // Topics already mentioned in the log, so each topic is logged once, not once per partition.
        final Set<String> loggedTopics = new HashSet<>();
        final Set<TopicPartition> seekToBeginning = new HashSet<>();
        final Set<TopicPartition> seekToEnd = new HashSet<>();
        final Set<TopicPartition> notReset = new HashSet<>();

        for (final TopicPartition partition : set) {
            final String topic = partition.topic();
            if (this.builder.earliestResetTopicsPattern().matcher(topic).matches()) {
                addToResetList(partition, seekToBeginning, "Setting topic '{}' to consume from {} offset", earliest, loggedTopics);
            } else if (this.builder.latestResetTopicsPattern().matcher(topic).matches()) {
                addToResetList(partition, seekToEnd, "Setting topic '{}' to consume from {} offset", latest, loggedTopics);
            } else if (earliest.equals(this.originalReset)) {
                addToResetList(partition, seekToBeginning, "No custom setting defined for topic '{}' using original config '{}' for offset reset", earliest, loggedTopics);
            } else if (latest.equals(this.originalReset)) {
                addToResetList(partition, seekToEnd, "No custom setting defined for topic '{}' using original config '{}' for offset reset", latest, loggedTopics);
            } else {
                notReset.add(partition);
            }
        }

        if (!notReset.isEmpty()) {
            final String notResetTopics = notReset.stream()
                .map(TopicPartition::topic)
                .distinct()
                .collect(Collectors.joining(","));
            final String message = String.format("No valid committed offset found for input [%s] and no valid reset policy configured. You need to set configuration parameter \"auto.offset.reset\" or specify a topic specific reset policy via StreamsBuilder#stream(..., Consumed.with(Topology.AutoOffsetReset)) or StreamsBuilder#table(..., Consumed.with(Topology.AutoOffsetReset))", notResetTopics);
            if (exc != null) {
                throw new StreamsException(message, exc);
            }
            throw new StreamsException(message);
        }

        if (!seekToBeginning.isEmpty()) {
            this.mainConsumer.seekToBeginning(seekToBeginning);
        }
        if (!seekToEnd.isEmpty()) {
            this.mainConsumer.seekToEnd(seekToEnd);
        }
    }

    /**
     * Adds a partition to a reset list and logs the decision the first time its topic is seen.
     *
     * @param partition    the partition to reset
     * @param partitions   the reset list the partition is added to
     * @param logMessage   log format with two placeholders: topic name and reset policy
     * @param resetPolicy  policy name inserted into the log message
     * @param loggedTopics topics that have already been logged; updated by this call
     */
    private void addToResetList(TopicPartition partition, Set<TopicPartition> partitions, String logMessage, String resetPolicy, Set<String> loggedTopics) {
        final String topic = partition.topic();
        final boolean firstPartitionOfTopic = loggedTopics.add(topic);
        if (firstPartitionOfTopic) {
            log.info(logMessage, topic, resetPolicy);
        }
        partitions.add(partition);
    }

    /**
     * Commits if the commit interval has elapsed, otherwise only commits tasks that requested it.
     *
     * <p>When the interval has elapsed, all tasks in state RUNNING or RESTORING are committed.
     * A positive result additionally triggers purging of committed records. A result of
     * {@code -1} leaves {@code lastCommitMs} untouched so the commit is attempted again on the
     * next call; any other result advances {@code now} and stores it as {@code lastCommitMs}.
     *
     * @return the value returned by the task manager's commit call
     */
    int maybeCommit() {
        final long sinceLastCommitMs = now - lastCommitMs;
        if (sinceLastCommitMs <= commitTimeMs) {
            return taskManager.maybeCommitActiveTasksPerUserRequested();
        }

        if (log.isTraceEnabled()) {
            log.trace("Committing all active tasks {} and standby tasks {} since {}ms has elapsed (commit interval is {}ms)", taskManager.activeTaskIds(), taskManager.standbyTaskIds(), sinceLastCommitMs, commitTimeMs);
        }

        final Set<Task> committableTasks = taskManager.tasks()
            .values()
            .stream()
            .filter(task -> task.state() == Task.State.RUNNING || task.state() == Task.State.RESTORING)
            .collect(Collectors.toSet());
        final int committed = taskManager.commit(committableTasks);

        if (committed > 0) {
            taskManager.maybePurgeCommittedRecords();
        }

        if (committed == -1) {
            log.trace("Unable to commit as we are in the middle of a rebalance, will try again when it completes.");
            return committed;
        }

        advanceNowAndComputeLatency();
        lastCommitMs = now;
        return committed;
    }

    /**
     * Refreshes the cached {@code now} from the clock.
     *
     * @return milliseconds elapsed since the previous value of {@code now}, never negative
     */
    private long advanceNowAndComputeLatency() {
        final long previous = now;
        now = time.milliseconds();
        final long elapsed = now - previous;
        return elapsed < 0L ? 0L : elapsed;
    }

    /**
     * Requests shutdown by moving the thread to PENDING_SHUTDOWN. If the thread was still in
     * CREATED, the caller performs the complete shutdown itself.
     */
    public void shutdown() {
        log.info("Informed to shut down");
        final State previous = setState(State.PENDING_SHUTDOWN);
        final boolean wasStillCreated = previous == State.CREATED;
        if (wasStillCreated) {
            completeShutdown(true);
        }
    }

    /**
     * Releases everything owned by this thread and moves it to DEAD.
     *
     * <p>Each close step is attempted independently: a failure is logged and the remaining steps
     * still run. The return value of the initial transition to PENDING_SHUTDOWN is not checked.
     *
     * @param cleanRun passed on to {@code taskManager.shutdown}
     */
    private void completeShutdown(boolean cleanRun) {
        setState(State.PENDING_SHUTDOWN);

        log.info("Shutting down");

        try {
            taskManager.shutdown(cleanRun);
        } catch (final Throwable taskManagerError) {
            log.error("Failed to close task manager due to the following error:", taskManagerError);
        }

        try {
            changelogReader.clear();
        } catch (final Throwable changelogReaderError) {
            log.error("Failed to close changelog reader due to the following error:", changelogReaderError);
        }

        try {
            mainConsumer.close();
        } catch (final Throwable mainConsumerError) {
            log.error("Failed to close consumer due to the following error:", mainConsumerError);
        }

        try {
            restoreConsumer.close();
        } catch (final Throwable restoreConsumerError) {
            log.error("Failed to close restore consumer due to the following error:", restoreConsumerError);
        }

        streamsMetrics.removeAllThreadLevelSensors(getName());

        setState(State.DEAD);

        log.info("Shutdown complete");
    }

    /**
     * @return the most recently published metadata snapshot of this thread; {@code null} until
     *         one of the {@code updateThreadMetadata} overloads has been called
     */
    public final ThreadMetadata threadMetadata() {
        return threadMetadata;
    }

    /**
     * Publishes a metadata snapshot with the given admin client id and no active or standby tasks.
     *
     * @param str the admin client id to report
     * @return this thread, to allow chaining
     */
    StreamThread updateThreadMetadata(String str) {
        final String threadName = getName();
        final String stateName = state().name();
        final String consumerClientId = ClientUtils.getConsumerClientId(getName());
        final String restoreConsumerClientId = ClientUtils.getRestoreConsumerClientId(getName());

        threadMetadata = new ThreadMetadata(threadName, stateName, consumerClientId, restoreConsumerClientId, taskManager.producerClientIds(), str, Collections.emptySet(), Collections.emptySet());
        return this;
    }

    /**
     * Publishes a metadata snapshot describing the given active and standby tasks. The admin
     * client id is carried over from the current snapshot, so a snapshot must already exist.
     *
     * @param map  active tasks by id
     * @param map2 standby tasks by id
     */
    private void updateThreadMetadata(Map<TaskId, Task> map, Map<TaskId, Task> map2) {
        final Set<TaskMetadata> activeTasksMetadata = toTaskMetadata(map);
        final Set<TaskMetadata> standbyTasksMetadata = toTaskMetadata(map2);
        final String adminClientId = this.threadMetadata.adminClientId();

        this.threadMetadata = new ThreadMetadata(getName(), state().name(), ClientUtils.getConsumerClientId(getName()), ClientUtils.getRestoreConsumerClientId(getName()), this.taskManager.producerClientIds(), adminClientId, activeTasksMetadata, standbyTasksMetadata);
    }

    /** Converts each task into a {@link TaskMetadata} built from its id string and input partitions. */
    private static Set<TaskMetadata> toTaskMetadata(final Map<TaskId, Task> tasks) {
        final Set<TaskMetadata> metadata = new HashSet<>();
        for (final Map.Entry<TaskId, Task> entry : tasks.entrySet()) {
            metadata.add(new TaskMetadata(entry.getKey().toString(), entry.getValue().inputPartitions()));
        }
        return metadata;
    }

    /**
     * @return the task manager's active tasks, keyed by task id
     */
    public Map<TaskId, Task> activeTaskMap() {
        return taskManager.activeTaskMap();
    }

    /**
     * @return the active tasks as reported by {@code taskManager.activeTaskIterable()}
     */
    public List<Task> activeTasks() {
        return taskManager.activeTaskIterable();
    }

    /**
     * @return every task known to the task manager, keyed by task id
     */
    public Map<TaskId, Task> allTasks() {
        return taskManager.tasks();
    }

    /**
     * @return the description produced by {@link #toString(String)} with an empty indent
     */
    @Override
    public String toString() {
        final String noIndent = "";
        return toString(noIndent);
    }

    /**
     * Describes this thread and its tasks.
     *
     * @param str indent placed before the thread line and forwarded to the task manager
     * @return the thread line followed by the task manager's description
     */
    public String toString(String str) {
        final StringBuilder description = new StringBuilder();
        description.append(str);
        description.append("\tStreamsThread threadId: ");
        description.append(getName());
        description.append("\n");
        description.append(taskManager.toString(str));
        return description.toString();
    }

    /**
     * @return the producer metrics reported by the task manager
     */
    public Map<MetricName, Metric> producerMetrics() {
        return taskManager.producerMetrics();
    }

    /**
     * @return the metrics that {@code ClientUtils.consumerMetrics} derives from the main and
     *         restore consumers
     */
    public Map<MetricName, Metric> consumerMetrics() {
        return ClientUtils.consumerMetrics(mainConsumer, restoreConsumer);
    }

    /**
     * @return the metrics that {@code ClientUtils.adminClientMetrics} derives from the admin client
     */
    public Map<MetricName, Metric> adminClientMetrics() {
        return ClientUtils.adminClientMetrics(adminClient);
    }

    /**
     * Overrides the cached current time.
     *
     * @param j the timestamp in milliseconds to store as {@code now}
     */
    void setNow(long j) {
        now = j;
    }

    /**
     * @return the task manager used by this thread
     */
    TaskManager taskManager() {
        return taskManager;
    }

    /**
     * @return the iteration count that the next call to {@code taskManager.process} will receive
     */
    int currentNumIterations() {
        return numIterations;
    }

    /**
     * @return the rebalance listener this thread registers when subscribing the main consumer
     */
    ConsumerRebalanceListener rebalanceListener() {
        return rebalanceListener;
    }

    /**
     * @return the main consumer
     */
    Consumer<byte[], byte[]> mainConsumer() {
        return mainConsumer;
    }

    /**
     * @return the restore consumer
     */
    Consumer<byte[], byte[]> restoreConsumer() {
        return restoreConsumer;
    }

    /**
     * @return the admin client passed to the constructor
     */
    Admin adminClient() {
        return adminClient;
    }

    /**
     * @return the topology builder passed to the constructor
     */
    InternalTopologyBuilder internalTopologyBuilder() {
        return builder;
    }
}
