package org.apache.kafka.streams.processor.internals;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/GlobalStreamThread.class */
public class GlobalStreamThread extends Thread {
    // Logger obtained from logContext in the constructor.
    private final Logger log;
    // Log context built from logPrefix; handed to the collaborators created by this thread.
    private final LogContext logContext;
    // Streams configuration; initialize() reads the poll time, the commit interval and the
    // default deserialization exception handler from it.
    private final StreamsConfig config;
    // Consumer handed to the state manager and the StateConsumer; also backs consumerMetrics().
    private final Consumer<byte[], byte[]> globalConsumer;
    // Passed through to GlobalStateManagerImpl in initialize().
    private final StateDirectory stateDirectory;
    // Clock passed to the collaborators created in initialize().
    private final Time time;
    // Cache created in the constructor; resized from the run loop when a resize was requested.
    private final ThreadCache cache;
    // Used to build the cache and to remove this thread's sensors and metrics on shutdown.
    private final StreamsMetricsImpl streamsMetrics;
    // Topology handed to the state manager and the update task in initialize().
    private final ProcessorTopology topology;
    // Pending cache size requested through resize(long); -1 means "no request pending".
    // The run loop consumes the value with getAndSet(-1).
    private final AtomicLong cacheSize;
    // Set by initialize() when bootstrapping fails and read by start(); volatile because the
    // writer (this thread) and the reader (the thread calling start()) differ.
    private volatile StreamsException startupException;
    // Callback invoked from run() with the exception that terminates the processing loop.
    private java.util.function.Consumer<Throwable> streamsUncaughtExceptionHandler;
    // Current lifecycle state; starts as CREATED and is only written by setState() under stateLock.
    private volatile State state;
    // Monitor guarding state transitions and the synchronized state queries.
    private final Object stateLock;
    // Optional listener notified by setState() on every accepted transition; null until set.
    private StreamThread.StateListener stateListener;
    // Prefix of the form "global-stream-thread [<thread name>] " used for logging and error messages.
    private final String logPrefix;
    // Passed through to GlobalStateManagerImpl in initialize().
    private final StateRestoreListener stateRestoreListener;

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/GlobalStreamThread$State.class */
    /**
     * Lifecycle states of the global stream thread.
     *
     * <p>Each constant is declared with the ordinals of the states it may move to:
     * <pre>
     *   CREATED (0)          -> RUNNING (1), PENDING_SHUTDOWN (2)
     *   RUNNING (1)          -> PENDING_SHUTDOWN (2)
     *   PENDING_SHUTDOWN (2) -> DEAD (3)
     *   DEAD (3)             -> (none)
     * </pre>
     * The table is keyed by {@link #ordinal()}, so the declaration order of the constants and
     * the numbers passed to them must be kept in sync.
     */
    public enum State implements ThreadStateTransitionValidator {
        CREATED(1, 2),
        RUNNING(2),
        PENDING_SHUTDOWN(3),
        DEAD;

        /** Ordinals of the states that may directly follow this one. */
        private final Set<Integer> validTransitions;

        State(Integer... validOrdinals) {
            validTransitions = new HashSet<>(Arrays.asList(validOrdinals));
        }

        /** @return {@code true} only for {@link #RUNNING} */
        public boolean isRunning() {
            return this == RUNNING;
        }

        /** @return {@code true} for {@link #PENDING_SHUTDOWN} and {@link #DEAD} */
        public boolean inErrorState() {
            return this == PENDING_SHUTDOWN || this == DEAD;
        }

        /**
         * Checks whether this state may be followed by the given one.
         *
         * @param threadStateTransitionValidator the candidate next state; must be a {@link State}
         * @return {@code true} if the candidate's ordinal is listed for this state
         */
        @Override
        public boolean isValidTransition(ThreadStateTransitionValidator threadStateTransitionValidator) {
            State target = (State) threadStateTransitionValidator;
            return validTransitions.contains(target.ordinal());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/GlobalStreamThread$StateConsumer.class */
    /**
     * Feeds records polled from the global consumer into a {@link GlobalStateMaintainer} and
     * flushes the maintainer once the flush interval has elapsed.
     */
    public static class StateConsumer {
        private final Consumer<byte[], byte[]> globalConsumer;
        private final GlobalStateMaintainer stateMaintainer;
        private final Time time;
        private final Duration pollTime;
        /** Minimum number of milliseconds between two flushes. */
        private final long flushInterval;
        private final Logger log;
        /** Timestamp (from {@link #time}) of the last flush, or of initialization. */
        private long lastFlush;

        /**
         * @param logContext            context used to create this object's logger
         * @param consumer              consumer that is assigned, seeked, polled and finally closed
         * @param globalStateMaintainer receiver of the polled records
         * @param time                  clock used to decide when a flush is due
         * @param duration              timeout passed to every poll
         * @param j                     flush interval in milliseconds
         */
        StateConsumer(LogContext logContext, Consumer<byte[], byte[]> consumer, GlobalStateMaintainer globalStateMaintainer, Time time, Duration duration, long j) {
            this.globalConsumer = consumer;
            this.stateMaintainer = globalStateMaintainer;
            this.time = time;
            this.pollTime = duration;
            this.flushInterval = j;
            this.log = logContext.logger(getClass());
        }

        /**
         * Initializes the maintainer, assigns the consumer to the partitions it reports and
         * positions each partition at the offset the maintainer returned for it.
         */
        void initialize() {
            Map<TopicPartition, Long> startOffsets = stateMaintainer.initialize();
            globalConsumer.assign(startOffsets.keySet());
            startOffsets.forEach((partition, offset) -> globalConsumer.seek(partition, offset.longValue()));
            lastFlush = time.milliseconds();
        }

        /**
         * Polls once, hands every received record to the maintainer and flushes the maintainer
         * when at least {@code flushInterval} milliseconds have passed since the last flush.
         */
        void pollAndUpdate() {
            for (ConsumerRecord<byte[], byte[]> record : globalConsumer.poll(pollTime)) {
                stateMaintainer.update(record);
            }
            long now = time.milliseconds();
            boolean flushDue = now - flushInterval >= lastFlush;
            if (flushDue) {
                stateMaintainer.flushState();
                lastFlush = now;
            }
        }

        /**
         * Closes the consumer and then the maintainer.
         *
         * @param z forwarded unchanged to {@link GlobalStateMaintainer#close}; callers in this
         *          file pass {@code true} when the local state was found to be inconsistent
         * @throws IOException if closing the maintainer fails
         */
        public void close(boolean z) throws IOException {
            try {
                globalConsumer.close();
            } catch (RuntimeException consumerFailure) {
                // Only log, so that the maintainer is always given the chance to close.
                log.error("Failed to close global consumer due to the following error:", consumerFailure);
            }
            stateMaintainer.close(z);
        }
    }

    /**
     * Registers the listener that {@code setState} notifies after each accepted state change.
     *
     * @param stateListener the listener to notify, or {@code null} to disable notifications
     */
    public void setStateListener(StreamThread.StateListener stateListener) {
        StreamThread.StateListener replacement = stateListener;
        this.stateListener = replacement;
    }

    /**
     * Returns the current lifecycle state without taking the state lock (the field is volatile).
     *
     * @return the current state
     */
    public State state() {
        final State current = state;
        return current;
    }

    /**
     * Moves the thread to the given state if the transition is permitted.
     *
     * <p>Two requests are silently ignored: {@code PENDING_SHUTDOWN -> PENDING_SHUTDOWN}, and
     * any request once the thread is {@code DEAD}. The registered state listener, if any, is
     * notified while the state lock is held.
     *
     * @param state the requested new state
     * @throws StreamsException if the current state does not allow the requested transition
     */
    private void setState(State state) {
        synchronized (stateLock) {
            // Read the previous state under the lock: a snapshot taken before acquiring it can be
            // stale when another thread (e.g. one calling shutdown()) wins the race, which would
            // put the wrong "from" state into the log, the exception and the listener callback.
            final State oldState = this.state;

            if (oldState == State.PENDING_SHUTDOWN && state == State.PENDING_SHUTDOWN) {
                // Repeated shutdown requests are refused without raising an error.
                return;
            }
            if (oldState == State.DEAD) {
                // Nothing can follow DEAD; refuse without raising an error.
                return;
            }
            if (!oldState.isValidTransition(state)) {
                log.error("Unexpected state transition from {} to {}", oldState, state);
                throw new StreamsException(logPrefix + "Unexpected state transition from " + oldState + " to " + state);
            }

            log.info("State transition from {} to {}", oldState, state);
            this.state = state;
            if (stateListener != null) {
                stateListener.onChange(this, this.state, oldState);
            }
        }
    }

    /**
     * Tells whether the thread is in the {@code RUNNING} state; evaluated under the state lock.
     *
     * @return {@code true} while the state is {@code RUNNING}
     */
    public boolean stillRunning() {
        synchronized (stateLock) {
            return state.isRunning();
        }
    }

    /**
     * Tells whether the thread is shutting down or already dead; evaluated under the state lock.
     *
     * @return {@code true} when the state is {@code PENDING_SHUTDOWN} or {@code DEAD}
     */
    public boolean inErrorState() {
        synchronized (stateLock) {
            return state.inErrorState();
        }
    }

    /**
     * Tells whether the thread has not yet left its initial state; evaluated under the state lock.
     *
     * @return {@code true} while the state is still {@code CREATED}
     */
    public boolean stillInitializing() {
        synchronized (stateLock) {
            return state == State.CREATED;
        }
    }

    public GlobalStreamThread(ProcessorTopology processorTopology, StreamsConfig streamsConfig, Consumer<byte[], byte[]> consumer, StateDirectory stateDirectory, long j, StreamsMetricsImpl streamsMetricsImpl, Time time, String str, StateRestoreListener stateRestoreListener, java.util.function.Consumer<Throwable> consumer2) {
        super(str);
        this.state = State.CREATED;
        this.stateLock = new Object();
        this.stateListener = null;
        this.time = time;
        this.config = streamsConfig;
        this.topology = processorTopology;
        this.globalConsumer = consumer;
        this.stateDirectory = stateDirectory;
        this.streamsMetrics = streamsMetricsImpl;
        this.logPrefix = String.format("global-stream-thread [%s] ", str);
        this.logContext = new LogContext(this.logPrefix);
        this.log = this.logContext.logger(getClass());
        this.cache = new ThreadCache(this.logContext, j, this.streamsMetrics);
        this.stateRestoreListener = stateRestoreListener;
        this.streamsUncaughtExceptionHandler = consumer2;
        this.cacheSize = new AtomicLong(-1L);
    }

    /**
     * Thread body: bootstraps the global state, then polls and applies updates until the
     * thread leaves the {@code RUNNING} state or an error occurs.
     *
     * <p>If bootstrapping fails, the thread goes straight to {@code DEAD}; the failure is
     * surfaced to the caller of {@link #start()} rather than thrown here. Otherwise the shutdown
     * sequence (transition to {@code PENDING_SHUTDOWN}, close the state consumer, remove the
     * thread-level sensors and metrics, transition to {@code DEAD}) runs exactly once on every
     * exit path.
     */
    @Override
    public void run() {
        final StateConsumer stateConsumer = initialize();
        if (stateConsumer == null) {
            // initialize() already recorded the failure in startupException and closed whatever
            // it had created, so only the state machine and the metrics are left to wind down.
            setState(State.PENDING_SHUTDOWN);
            setState(State.DEAD);
            log.error("Error happened during initialization of the global state store; this thread has shutdown.");
            streamsMetrics.removeAllThreadLevelSensors(getName());
            streamsMetrics.removeAllThreadLevelMetrics(getName());
            return;
        }
        setState(State.RUNNING);

        // Becomes true only when the local state is known to be inconsistent.
        boolean wipeStateStore = false;
        try {
            while (stillRunning()) {
                // Apply a cache resize requested through resize(long), if any (-1 = none).
                final long requestedSize = cacheSize.getAndSet(-1L);
                if (requestedSize != -1L) {
                    cache.resize(requestedSize);
                }
                stateConsumer.pollAndUpdate();
            }
        } catch (InvalidOffsetException recoverable) {
            // Must precede the generic handler: InvalidOffsetException is itself an Exception,
            // so the reverse order would make this clause unreachable.
            wipeStateStore = true;
            log.error("Updating global state failed due to inconsistent local state. Will attempt to clean up the local state. You can restart KafkaStreams to recover from this error.", recoverable);
            streamsUncaughtExceptionHandler.accept(new StreamsException("Updating global state failed. You can restart KafkaStreams to launch a new GlobalStreamThread to recover from this error.", recoverable));
        } catch (Exception e) {
            log.error("Error happened while maintaining global state store. The streams application or client will now close to ERROR.", e);
            streamsUncaughtExceptionHandler.accept(e);
        } finally {
            // The state may already be PENDING_SHUTDOWN (via shutdown()); setState ignores that.
            setState(State.PENDING_SHUTDOWN);
            log.info("Shutting down");
            try {
                stateConsumer.close(wipeStateStore);
            } catch (IOException e) {
                log.error("Failed to close state maintainer due to the following error:", e);
            }
            streamsMetrics.removeAllThreadLevelSensors(getName());
            streamsMetrics.removeAllThreadLevelMetrics(getName());
            setState(State.DEAD);
            log.info("Shutdown complete");
        }
    }

    /**
     * Replaces the handler that {@link #run()} invokes with the exception that stops the loop.
     *
     * @param consumer the new handler
     */
    public void setUncaughtExceptionHandler(java.util.function.Consumer<Throwable> consumer) {
        final java.util.function.Consumer<Throwable> replacement = consumer;
        streamsUncaughtExceptionHandler = replacement;
    }

    /**
     * Requests a new cache size. The request is only recorded here; the run loop picks it up
     * on its next iteration and resizes the cache itself.
     *
     * @param j the requested cache size; {@code -1} is the value the run loop treats as "no request"
     */
    public void resize(long j) {
        final long requestedSize = j;
        cacheSize.set(requestedSize);
    }

    /**
     * Builds the state manager, processor context and update task, wraps them in a
     * {@link StateConsumer} and initializes it.
     *
     * <p>No exception escapes this method: any failure is stored in {@code startupException}
     * (wrapped in a {@link StreamsException} unless it already is one) for {@link #start()} to
     * rethrow, and the state consumer, if it was created, is closed.
     *
     * @return the initialized state consumer, or {@code null} if initialization failed
     */
    private StateConsumer initialize() {
        StateConsumer stateConsumer = null;
        try {
            final GlobalStateManagerImpl stateManager = new GlobalStateManagerImpl(
                logContext, time, topology, globalConsumer, stateDirectory, stateRestoreListener, config);
            final GlobalProcessorContextImpl processorContext = new GlobalProcessorContextImpl(
                config, stateManager, streamsMetrics, cache, time);
            stateManager.setGlobalProcessorContext(processorContext);

            final GlobalStateUpdateTask updateTask = new GlobalStateUpdateTask(
                logContext, topology, processorContext, stateManager, config.defaultDeserializationExceptionHandler());
            stateConsumer = new StateConsumer(
                logContext,
                globalConsumer,
                updateTask,
                time,
                Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG).longValue()),
                config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG).longValue());

            try {
                stateConsumer.initialize();
            } catch (InvalidOffsetException recoverable) {
                log.error("Bootstrapping global state failed due to inconsistent local state. Will attempt to clean up the local state. You can restart KafkaStreams to recover from this error.", recoverable);
                // Close with the flag set; the StreamsException handler below closes once more.
                closeStateConsumer(stateConsumer, true);
                throw new StreamsException("Bootstrapping global state failed. You can restart KafkaStreams to recover from this error.", recoverable);
            }
            return stateConsumer;
        } catch (StreamsException fatal) {
            // Must precede the generic handler: StreamsException is itself an Exception, so the
            // reverse order would make this clause unreachable and re-wrap the exception.
            closeStateConsumer(stateConsumer, false);
            startupException = fatal;
        } catch (Exception fatal) {
            closeStateConsumer(stateConsumer, false);
            startupException = new StreamsException("Exception caught during initialization of GlobalStreamThread", fatal);
        }
        return null;
    }

    /**
     * Closes the given state consumer, logging instead of propagating an {@link IOException}.
     *
     * @param stateConsumer the consumer to close; {@code null} is accepted and ignored
     * @param z             forwarded unchanged to {@link StateConsumer#close(boolean)}
     */
    private void closeStateConsumer(StateConsumer stateConsumer, boolean z) {
        if (stateConsumer == null) {
            return;
        }
        try {
            stateConsumer.close(z);
        } catch (IOException closeFailure) {
            log.error("Failed to close state consumer due to the following error:", closeFailure);
        }
    }

    /**
     * Starts the thread and blocks until it has left the {@code CREATED} state.
     *
     * @throws StreamsException      if bootstrapping the global state failed; this is the
     *                               exception recorded by the thread itself
     * @throws IllegalStateException if the thread ended up shutting down or dead without
     *                               recording a startup exception
     */
    @Override
    public synchronized void start() {
        super.start();
        while (stillInitializing()) {
            Utils.sleep(1L);
            throwStartupExceptionIfAny();
        }
        // The thread records its startup exception before it changes state, so the loop can end
        // without the in-loop check ever having seen the failure. Check once more so that the
        // real cause is reported instead of the generic IllegalStateException below.
        throwStartupExceptionIfAny();
        if (inErrorState()) {
            throw new IllegalStateException("Initialization for the global stream thread failed");
        }
    }

    /** Rethrows the exception recorded during initialization, if there is one. */
    private void throwStartupExceptionIfAny() {
        final StreamsException failure = startupException;
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Requests shutdown by moving the thread to {@code PENDING_SHUTDOWN}. The run loop stops
     * once it observes that the thread is no longer {@code RUNNING}; this method does not wait.
     */
    public void shutdown() {
        final State target = State.PENDING_SHUTDOWN;
        setState(target);
    }

    /**
     * Exposes the metrics of the global consumer.
     *
     * @return an unmodifiable view of the map returned by the consumer's {@code metrics()}
     */
    public Map<MetricName, Metric> consumerMetrics() {
        final Map<MetricName, ? extends Metric> rawMetrics = globalConsumer.metrics();
        return Collections.unmodifiableMap(rawMetrics);
    }
}
