package org.apache.kafka.streams.processor.internals.tasks;

import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.internals.ReadOnlyTask;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.class */
/**
 * {@link TaskExecutor} implementation that drives tasks on a single dedicated thread.
 *
 * <p>The thread repeatedly asks the {@link TaskManager} for a task, processes it while it is
 * processable, and hands it back to the task manager otherwise. Other threads interact with the
 * executor through {@link #start()}, {@link #shutdown(Duration)}, {@link #currentTask()} and
 * {@link #unassign()}.
 */
public class DefaultTaskExecutor implements TaskExecutor {
    private final Time time;
    private final String name;
    private final TaskManager taskManager;
    // Task currently owned by the executor thread; null when no task is owned.
    private StreamTask currentTask = null;
    // Non-null between a successful start() and a successful shutdown().
    private TaskExecutorThread taskExecutorThread = null;
    // Counted down exactly once by the executor thread when it leaves its run loop.
    private CountDownLatch shutdownGate;

    /**
     * Worker thread that owns at most one task at a time and processes it until it is no longer
     * processable, a hand-back is requested, or the executor is shut down.
     */
    private class TaskExecutorThread extends Thread {
        private final AtomicBoolean isRunning;
        // Holds the future of a pending hand-back request, or null when none is pending.
        private final AtomicReference<KafkaFutureImpl<StreamTask>> pauseRequested;
        private final Logger log;

        public TaskExecutorThread(String name) {
            super(name);
            this.isRunning = new AtomicBoolean(true);
            this.pauseRequested = new AtomicReference<>(null);
            this.log = new LogContext(String.format("%s ", name)).logger(DefaultTaskExecutor.class);
        }

        /**
         * Runs {@link #runOnce(long)} until {@code isRunning} is cleared, then performs the
         * one-time shutdown work: hand back any owned task and release the shutdown gate.
         */
        @Override
        public void run() {
            this.log.info("Task executor thread started");
            try {
                // The cleanup below must run once after the loop, not once per iteration;
                // otherwise every freshly assigned task would be unassigned immediately and
                // the shutdown gate would open after the first iteration.
                while (this.isRunning.get()) {
                    runOnce(DefaultTaskExecutor.this.time.milliseconds());
                }
            } finally {
                try {
                    if (DefaultTaskExecutor.this.currentTask != null) {
                        unassignCurrentTask();
                    }
                } finally {
                    // Always release the gate, even if handing back the task failed, so that
                    // shutdown() does not wait for the full timeout on a dead thread.
                    DefaultTaskExecutor.this.shutdownGate.countDown();
                    this.log.info("Task executor thread shutdown");
                }
            }
        }

        /**
         * Performs one iteration: serves a pending hand-back request, then either acquires a
         * task, processes the owned task, or returns it if it is not processable.
         *
         * @param nowMs current time in milliseconds as read from {@code time}
         */
        private void runOnce(long nowMs) {
            KafkaFutureImpl<StreamTask> pauseFuture = this.pauseRequested.getAndSet(null);
            if (pauseFuture != null) {
                // A request may arrive while no task is owned (e.g. it was just returned as
                // not processable). Complete with null, as unassign() does when no thread is
                // running, instead of failing the thread and leaving the future incomplete.
                StreamTask handedBack =
                    DefaultTaskExecutor.this.currentTask != null ? unassignCurrentTask() : null;
                pauseFuture.complete(handedBack);
            }

            if (DefaultTaskExecutor.this.currentTask == null) {
                DefaultTaskExecutor.this.currentTask =
                    DefaultTaskExecutor.this.taskManager.assignNextTask(DefaultTaskExecutor.this);
            } else if (DefaultTaskExecutor.this.currentTask.isProcessable(nowMs)) {
                DefaultTaskExecutor.this.currentTask.process(nowMs);
            } else {
                unassignCurrentTask();
            }
        }

        /**
         * Prepares the owned task for commit, returns it to the task manager and clears
         * {@code currentTask}.
         *
         * @return the task that was handed back
         * @throws IllegalStateException if no task is currently owned
         */
        private StreamTask unassignCurrentTask() {
            StreamTask task = DefaultTaskExecutor.this.currentTask;
            if (task == null) {
                throw new IllegalStateException("Does not own any task while being ask to unassign from task manager");
            }
            task.prepareCommit();
            DefaultTaskExecutor.this.taskManager.unassignTask(task, DefaultTaskExecutor.this);
            DefaultTaskExecutor.this.currentTask = null;
            return task;
        }
    }

    /**
     * Creates an executor that is not yet running; call {@link #start()} to launch its thread.
     *
     * @param taskManager source of tasks to execute and sink for tasks that are handed back
     * @param str         executor name, also used as thread name and log prefix
     * @param time        clock used to obtain the current time for each iteration
     */
    public DefaultTaskExecutor(TaskManager taskManager, String str, Time time) {
        this.time = time;
        this.name = str;
        this.taskManager = taskManager;
    }

    /** @return the name given at construction */
    @Override
    public String name() {
        return this.name;
    }

    /**
     * Starts the executor thread if it is not already started; otherwise does nothing.
     */
    @Override
    public void start() {
        if (this.taskExecutorThread == null) {
            // Create the gate BEFORE starting the thread: the thread counts it down on exit,
            // and Thread.start() guarantees the new thread sees writes made before the call.
            this.shutdownGate = new CountDownLatch(1);
            this.taskExecutorThread = new TaskExecutorThread(this.name);
            this.taskExecutorThread.start();
        }
    }

    /**
     * Signals the executor thread to stop, interrupts it, and waits for it to finish.
     * Does nothing if the executor was never started or is already shut down.
     *
     * <p>If the calling thread is interrupted while waiting, the wait is abandoned and the
     * caller's interrupt status is restored; the executor is then still considered started.
     *
     * @param duration maximum time to wait for the executor thread to finish
     * @throws StreamsException if the executor thread does not finish within {@code duration}
     */
    @Override
    public void shutdown(Duration duration) {
        if (this.taskExecutorThread == null) {
            return;
        }
        this.taskExecutorThread.isRunning.set(false);
        this.taskExecutorThread.interrupt();
        try {
            if (!this.shutdownGate.await(duration.toMillis(), TimeUnit.MILLISECONDS)) {
                throw new StreamsException("Task executor thread did not shutdown within the timeout");
            }
            this.taskExecutorThread = null;
        } catch (InterruptedException e) {
            // Do not swallow the interruption: restore the flag so callers can react to it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * @return a read-only view of the task currently owned by this executor, or {@code null}
     *         if no task is owned
     */
    @Override
    public ReadOnlyTask currentTask() {
        StreamTask task = this.currentTask;
        return task != null ? new ReadOnlyTask(task) : null;
    }

    /**
     * Asks the executor thread to hand back its current task.
     *
     * <p>When the executor thread exists, the request is registered and served at the start of
     * the thread's next iteration. When there is no executor thread, the returned future is
     * completed immediately with {@code null}.
     *
     * @return a future that completes with the handed-back task, or with {@code null} if there
     *         was no task to hand back
     */
    @Override
    public KafkaFuture<StreamTask> unassign() {
        KafkaFutureImpl<StreamTask> future = new KafkaFutureImpl<>();
        if (this.taskExecutorThread != null) {
            this.taskExecutorThread.pauseRequested.set(future);
        } else {
            future.complete(null);
        }
        return future;
    }
}
