package kafka.restore.schedulers;

import java.util.concurrent.ArrayBlockingQueue;
import kafka.restore.messages.MessageRequest;
import kafka.restore.messages.MessageStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kafka/restore/schedulers/AbstractAsyncServiceScheduler.class */
public abstract class AbstractAsyncServiceScheduler {
    private static final int DEFAULT_REQUEST_QUEUE_CAPACITY = 500000;
    private static final String REQUEST_QUEUE_CONSUMER_THREAD_NAME = "requestQueueConsumerThread";
    private static final Logger log = LoggerFactory.getLogger((Class<?>) AbstractAsyncServiceScheduler.class);
    private final AsyncServiceSchedulerResultsReceiver resultsReceiver;
    private ArrayBlockingQueue<MessageRequest> requestQueue;
    private Thread requestQueueConsumerThread;
    private final int requestQueueCapacity;
    private volatile AsyncServiceSchedulerStatus status;

    /* loaded from: input_file:kafka/restore/schedulers/AbstractAsyncServiceScheduler$AsyncServiceSchedulerStatus.class */
    public enum AsyncServiceSchedulerStatus {
        NOT_STARTED,
        RUNNING,
        PAUSING,
        PAUSED,
        SHUTTING_DOWN,
        SHUTDOWN,
        ERROR
    }

    public AbstractAsyncServiceScheduler(AsyncServiceSchedulerResultsReceiver asyncServiceSchedulerResultsReceiver) {
        this(asyncServiceSchedulerResultsReceiver, DEFAULT_REQUEST_QUEUE_CAPACITY);
    }

    public AbstractAsyncServiceScheduler(AsyncServiceSchedulerResultsReceiver asyncServiceSchedulerResultsReceiver, int i) {
        if (i < 1) {
            throw new IllegalArgumentException("Request Queue Capacity must be at least one");
        }
        this.resultsReceiver = asyncServiceSchedulerResultsReceiver;
        this.requestQueueCapacity = i;
        this.status = AsyncServiceSchedulerStatus.NOT_STARTED;
    }

    public synchronized boolean startUp() {
        if (this.status != AsyncServiceSchedulerStatus.NOT_STARTED && this.status != AsyncServiceSchedulerStatus.SHUTDOWN) {
            log.error("startUp() can only be called on a service scheduler that has not been started or is shutdown.");
            return false;
        }
        this.requestQueue = new ArrayBlockingQueue<>(this.requestQueueCapacity);
        startRequestQueueConsumerThread();
        return true;
    }

    private void consumeRequestsFromRequestQueue() {
        while (this.status == AsyncServiceSchedulerStatus.RUNNING) {
            MessageRequest messageRequest = null;
            try {
                messageRequest = this.requestQueue.take();
                log.info(String.format("[%s]: took a request out of requestQueue: %s, messages in queue: %s", messageRequest.getTopicPartition(), messageRequest.getClass().getSimpleName(), Integer.valueOf(this.requestQueue.size())));
                processRequestFromRequestQueue(messageRequest);
            } catch (InterruptedException e) {
                log.warn("requestConsumerThread received InterruptedException while waiting for or processing request. exit consuming request queue loop.", (Throwable) e);
            } catch (Exception e2) {
                log.error(String.format("[%s]: Unexpected exception caught while processing request: " + messageRequest + ", swallow it", messageRequest.getTopicPartition()), (Throwable) e2);
            }
        }
        if (this.status != AsyncServiceSchedulerStatus.PAUSING && this.status != AsyncServiceSchedulerStatus.SHUTTING_DOWN) {
            log.error("Async Service Scheduler has entered illegal state " + this.status.name() + " while exiting from request queue consumer thread.");
            this.status = AsyncServiceSchedulerStatus.ERROR;
        }
        this.status = this.status == AsyncServiceSchedulerStatus.PAUSING ? AsyncServiceSchedulerStatus.PAUSED : AsyncServiceSchedulerStatus.SHUTDOWN;
        log.info("Request Consumer Thread exiting upon entry to " + this.status.name() + " state");
    }

    public MessageStatusCode submitRequest(MessageRequest messageRequest) {
        if (this.status == AsyncServiceSchedulerStatus.RUNNING) {
            return this.requestQueue.offer(messageRequest) ? MessageStatusCode.SCHEDULED : MessageStatusCode.REQUEST_QUEUE_FULL;
        }
        log.error("submitRequest() can only be called on a service scheduler that is running. A non-running service scheduler was likely shut down via a call to shutdown().");
        return MessageStatusCode.ILLEGAL_STATE_ERROR;
    }

    protected abstract void processRequestFromRequestQueue(MessageRequest messageRequest);

    public synchronized boolean shutdown() {
        if (this.status == AsyncServiceSchedulerStatus.PAUSED || this.status == AsyncServiceSchedulerStatus.ERROR) {
            this.status = AsyncServiceSchedulerStatus.SHUTDOWN;
            return true;
        }
        if (this.status != AsyncServiceSchedulerStatus.RUNNING) {
            log.error("shutdown() can only be called on a service scheduler that is running, paused, or in error state.");
            return false;
        }
        this.status = AsyncServiceSchedulerStatus.SHUTTING_DOWN;
        this.requestQueueConsumerThread.interrupt();
        try {
            this.requestQueueConsumerThread.join();
            return true;
        } catch (InterruptedException e) {
            log.error("shutdown() was interrupted prior to background thread terminating", (Throwable) e);
            this.status = AsyncServiceSchedulerStatus.ERROR;
            return false;
        }
    }

    public AsyncServiceSchedulerStatus getStatus() {
        return this.status;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AsyncServiceSchedulerResultsReceiver getResultsReceiver() {
        return this.resultsReceiver;
    }

    public synchronized boolean pause() {
        if (this.status != AsyncServiceSchedulerStatus.RUNNING) {
            log.error("pause() can only be called on service scheduler that is running.");
            return false;
        }
        this.status = AsyncServiceSchedulerStatus.PAUSING;
        this.requestQueueConsumerThread.interrupt();
        try {
            this.requestQueueConsumerThread.join();
            return true;
        } catch (InterruptedException e) {
            log.error("Pause was interrupted prior to completing");
            this.status = AsyncServiceSchedulerStatus.ERROR;
            return false;
        }
    }

    public synchronized boolean resume() {
        if (this.status != AsyncServiceSchedulerStatus.PAUSED) {
            log.error("resume() can only be called on service scheduler in PAUSED state.");
            return false;
        }
        startRequestQueueConsumerThread();
        return true;
    }

    private void startRequestQueueConsumerThread() {
        this.status = AsyncServiceSchedulerStatus.RUNNING;
        this.requestQueueConsumerThread = new Thread(this::consumeRequestsFromRequestQueue, REQUEST_QUEUE_CONSUMER_THREAD_NAME);
        this.requestQueueConsumerThread.start();
    }
}
