package com.linkedin.kafka.cruisecontrol.statemachine;

import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.common.KafkaCruiseControlThreadFactory;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/statemachine/StateMachineProcessor.class */
/**
 * Event loop that accepts {@link Task}s through {@link #handleTask(Task)}, buffers them in an
 * in-memory queue and hands them, one at a time, to an internal executor.
 *
 * <p>Lifecycle: {@link #init()} creates the executor, {@link #startUp()} starts the event loop
 * on that executor, and {@link #shutdown()} stops the loop. Once shutdown has been requested the
 * processor cannot be started again and rejects new tasks.
 */
public class StateMachineProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(StateMachineProcessor.class);

    /** Upper bound, in seconds, on how long the event loop blocks waiting for the next task. */
    private static final int TASK_POLL_SECONDS = 10;

    /** Created by {@link #init()}; {@code null} until then. */
    private volatile ExecutorService executor;
    private final BlockingQueue<Task> taskQueue = new LinkedBlockingQueue<>();
    private volatile boolean shutdownRequested = false;

    /**
     * Sentinel placed on the queue by {@link #shutdown()}. The event loop compares polled tasks
     * against {@link #SHUTDOWN_TASK} by identity and exits when it sees it; the sentinel is
     * never submitted to the executor.
     */
    private static class ShutdownTask implements Task {
        static final ShutdownTask SHUTDOWN_TASK = new ShutdownTask();

        private ShutdownTask() {
        }

        @Override // com.linkedin.kafka.cruisecontrol.statemachine.Task
        public String taskId() {
            return "ShutdownTask";
        }
    }

    /**
     * Creates the two-thread executor used by this processor (the event loop is submitted to it
     * by {@link #startUp()}, and polled tasks are submitted to the same pool).
     *
     * @throws RuntimeException if the executor has already been created
     */
    public void init() {
        if (this.executor != null) {
            throw new RuntimeException("Executor is already initialized.");
        }
        this.executor = Executors.newFixedThreadPool(2, new KafkaCruiseControlThreadFactory("StateMachineProcessor", true, LOG, Optional.empty()));
    }

    /**
     * Starts the event loop on the executor.
     *
     * @throws RuntimeException if shutdown has already been requested
     * @throws IllegalStateException if {@link #init()} has not been called yet
     */
    public void startUp() {
        if (this.shutdownRequested) {
            throw new RuntimeException("StateMachineProcessor already shutdown. Can't start it back up.");
        }
        // Read the volatile field once so the null check and the use see the same value.
        final ExecutorService pool = this.executor;
        if (pool == null) {
            // Fail with a descriptive message instead of a bare NullPointerException.
            throw new IllegalStateException("StateMachineProcessor is not initialized. Call init() before startUp().");
        }
        pool.submit(this::processTasks);
    }

    /**
     * Requests shutdown: marks the processor as shut down, enqueues the shutdown sentinel so the
     * event loop wakes up and exits, and asks the executor to shut down.
     *
     * @throws RuntimeException if shutdown has already been requested
     */
    public void shutdown() {
        if (this.shutdownRequested) {
            throw new RuntimeException("StateMachineProcessor getting shut down or is already shut down.");
        }
        this.shutdownRequested = true;
        this.taskQueue.add(ShutdownTask.SHUTDOWN_TASK);
        KafkaCruiseControlUtils.executeSilently(this.executor, pool -> {
            pool.shutdown();
        });
    }

    /**
     * Enqueues a task for processing by the event loop.
     *
     * @param task the task to enqueue
     * @return the id reported by {@code task.taskId()}
     * @throws RuntimeException if shutdown has already been requested
     */
    public String handleTask(Task task) {
        if (this.shutdownRequested) {
            throw new RuntimeException("StateMachineProcessor already shutdown. Can't execute task: " + task);
        }
        this.taskQueue.add(task);
        return task.taskId();
    }

    /**
     * Body of the event loop. Repeatedly waits up to {@link #TASK_POLL_SECONDS} seconds for a
     * task and submits it to the executor. The loop ends when the shutdown sentinel is polled,
     * when shutdown has been requested, or when the thread is interrupted.
     */
    private void processTasks() {
        do {
            // Scoped per iteration so a failed poll can never expose the previous task again.
            final Task task;
            try {
                task = this.taskQueue.poll(TASK_POLL_SECONDS, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                LOG.info("State machine processing thread was interrupted. Shutting down.");
                LOG.debug("State machine thread interrupted.", e);
                // Restore the interrupt flag for whoever owns this thread, then stop the loop.
                Thread.currentThread().interrupt();
                return;
            }
            if (task == ShutdownTask.SHUTDOWN_TASK) {
                LOG.info("Shutting down StateMachineProcessor event loop.");
                return;
            }
            // A null result only means the poll timed out with nothing queued.
            if (task != null) {
                LOG.info("Starting to process task ({}) with id: {}", task.getClass().getName(), task.taskId());
                this.executor.submit(task);
            }
            if (this.shutdownRequested) {
                return;
            }
        } while (!Thread.currentThread().isInterrupted());
    }
}
