/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.server.util;

import java.util.concurrent.Delayed;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.server.util.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link Scheduler} backed by a {@link ScheduledThreadPoolExecutor}.
 *
 * <p>The executor is created in {@link #startup()} and discarded in {@link #shutdown()}.
 * Methods that create, replace, or reconfigure the executor synchronize on this instance;
 * read-only accessors take a single snapshot of the volatile {@code executor} field so that
 * a concurrent {@link #shutdown()} cannot null it out between the check and the use.
 */
public class KafkaScheduler
implements Scheduler {
    private static final Logger log = LoggerFactory.getLogger(KafkaScheduler.class);
    private static final String DEFAULT_THREAD_NAME_PREFIX = "kafka-scheduler-";
    /** Source of the numeric suffix appended to {@link #threadNamePrefix} for each new thread. */
    private final AtomicInteger schedulerThreadId = new AtomicInteger(0);
    private final int threads;
    private final boolean daemon;
    private final String threadNamePrefix;
    private final boolean trackStuckIO;
    /** {@code null} whenever the scheduler is not started. */
    private volatile ScheduledThreadPoolExecutor executor;

    /**
     * Creates a scheduler with the default thread name prefix, {@code daemon = true}
     * and {@code trackStuckIO = false}.
     *
     * @param threads the core pool size of the executor created by {@link #startup()}
     */
    public KafkaScheduler(int threads) {
        this(threads, true);
    }

    /**
     * Creates a scheduler with the default thread name prefix and {@code trackStuckIO = false}.
     *
     * @param threads the core pool size of the executor created by {@link #startup()}
     * @param daemon  flag forwarded to every {@link KafkaThread} this scheduler creates
     */
    public KafkaScheduler(int threads, boolean daemon) {
        this(threads, daemon, DEFAULT_THREAD_NAME_PREFIX, false);
    }

    /**
     * Creates a scheduler with the default thread name prefix.
     *
     * @param threads      the core pool size of the executor created by {@link #startup()}
     * @param daemon       flag forwarded to every {@link KafkaThread} this scheduler creates
     * @param trackStuckIO flag forwarded to every {@link KafkaThread} this scheduler creates
     */
    public KafkaScheduler(int threads, boolean daemon, boolean trackStuckIO) {
        this(threads, daemon, DEFAULT_THREAD_NAME_PREFIX, trackStuckIO);
    }

    /**
     * Creates a scheduler. No threads or executor are created until {@link #startup()} is called.
     *
     * @param threads          the core pool size of the executor created by {@link #startup()}
     * @param daemon           flag forwarded to every {@link KafkaThread} this scheduler creates
     * @param threadNamePrefix prefix of each thread name; a per-scheduler counter is appended
     * @param trackStuckIO     flag forwarded to every {@link KafkaThread} this scheduler creates
     */
    public KafkaScheduler(int threads, boolean daemon, String threadNamePrefix, boolean trackStuckIO) {
        this.threads = threads;
        this.daemon = daemon;
        this.threadNamePrefix = threadNamePrefix;
        this.trackStuckIO = trackStuckIO;
    }

    /**
     * Creates and publishes the underlying executor.
     *
     * <p>The executor is configured so that periodic and delayed tasks are not run after
     * shutdown, and cancelled tasks are removed from the work queue immediately.
     *
     * @throws IllegalStateException if the scheduler is already started
     */
    @Override
    public void startup() {
        log.debug("Initializing task scheduler.");
        synchronized (this) {
            if (this.isStarted()) {
                throw new IllegalStateException("This scheduler has already been started.");
            }
            ScheduledThreadPoolExecutor newExecutor = new ScheduledThreadPoolExecutor(this.threads);
            newExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
            newExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
            newExecutor.setRemoveOnCancelPolicy(true);
            newExecutor.setThreadFactory(runnable -> new KafkaThread(
                    this.threadNamePrefix + this.schedulerThreadId.getAndIncrement(),
                    runnable,
                    this.daemon,
                    this.trackStuckIO));
            // Publish only after the executor is fully configured.
            this.executor = newExecutor;
        }
    }

    /**
     * Initiates an orderly shutdown of the executor and waits up to one day for it to terminate.
     * Does nothing if the scheduler is not started.
     *
     * @throws InterruptedException if interrupted while waiting for termination
     */
    @Override
    public void shutdown() throws InterruptedException {
        log.debug("Shutting down task scheduler.");
        ScheduledThreadPoolExecutor maybeExecutor = null;
        synchronized (this) {
            if (this.isStarted()) {
                maybeExecutor = this.executor;
                maybeExecutor.shutdown();
                this.executor = null;
            }
        }
        // Wait outside the monitor so other methods are not blocked while tasks drain.
        if (maybeExecutor != null) {
            maybeExecutor.awaitTermination(1L, TimeUnit.DAYS);
        }
    }

    /**
     * Schedules a task on the underlying executor.
     *
     * <p>The task is wrapped so that any {@link Throwable} it throws is logged rather than
     * propagated to the executor.
     *
     * @param name     a name for the task, used only in log messages
     * @param task     the work to run
     * @param delayMs  the initial delay in milliseconds
     * @param periodMs the period in milliseconds; if greater than zero the task is scheduled at
     *                 a fixed rate, otherwise it is scheduled once
     * @return the future returned by the executor, or a no-op future that reports itself as
     *         cancelled and done if the scheduler is not started
     */
    @Override
    public ScheduledFuture<?> schedule(String name, Runnable task, long delayMs, long periodMs) {
        log.debug("Scheduling task {} with initial delay {} ms and period {} ms.", name, delayMs, periodMs);
        synchronized (this) {
            if (!this.isStarted()) {
                log.info("Kafka scheduler is not running at the time task '{}' is scheduled. The task is ignored.", name);
                return new NoOpScheduledFutureTask();
            }
            Runnable runnable = () -> {
                try {
                    log.trace("Beginning execution of scheduled task '{}'.", name);
                    task.run();
                } catch (Throwable t) {
                    log.error("Uncaught exception in scheduled task '{}'", name, t);
                } finally {
                    log.trace("Completed execution of scheduled task '{}'.", name);
                }
            };
            if (periodMs > 0L) {
                return this.executor.scheduleAtFixedRate(runnable, delayMs, periodMs, TimeUnit.MILLISECONDS);
            }
            return this.executor.schedule(runnable, delayMs, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * @return {@code true} if an executor is currently installed, i.e. {@link #startup()} has
     *         been called without a subsequent {@link #shutdown()}
     */
    public boolean isStarted() {
        return this.executor != null;
    }

    /**
     * Sets the core pool size of the executor. Does nothing if the scheduler is not started.
     *
     * @param newSize the new core pool size
     */
    @Override
    public void resizeThreadPool(int newSize) {
        synchronized (this) {
            if (this.isStarted()) {
                this.executor.setCorePoolSize(newSize);
            }
        }
    }

    /**
     * @return the number of tasks in the executor's work queue, or 0 if the scheduler is not
     *         started
     */
    @Override
    public int size() {
        ScheduledThreadPoolExecutor cachedExecutor = this.executor;
        if (cachedExecutor != null) {
            return cachedExecutor.getQueue().size();
        }
        return 0;
    }

    /**
     * @return the thread name prefix supplied at construction
     */
    public String threadNamePrefix() {
        return this.threadNamePrefix;
    }

    /**
     * Checks whether the given task is present in the executor's work queue.
     *
     * @param task the future to look for
     * @return {@code true} if the scheduler is started and its queue contains {@code task}
     */
    public boolean taskRunning(ScheduledFuture<?> task) {
        ScheduledThreadPoolExecutor e = this.executor;
        return e != null && e.getQueue().contains(task);
    }

    /**
     * @return the number of tasks in the executor's work queue, or 0 if the scheduler is not
     *         started
     */
    public int pendingTaskSize() {
        // Read the volatile field once: this method is not synchronized, so a concurrent
        // shutdown() could null the field between a separate isStarted() check and the use.
        ScheduledThreadPoolExecutor cachedExecutor = this.executor;
        return cachedExecutor != null ? cachedExecutor.getQueue().size() : 0;
    }

    /**
     * Placeholder future returned by {@link KafkaScheduler#schedule} when the scheduler is not
     * started. It reports itself as cancelled and done, has zero delay, and yields {@code null}.
     */
    private static class NoOpScheduledFutureTask
    implements ScheduledFuture<Void> {
        private NoOpScheduledFutureTask() {
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return true;
        }

        @Override
        public boolean isCancelled() {
            return true;
        }

        @Override
        public boolean isDone() {
            return true;
        }

        @Override
        public Void get() {
            return null;
        }

        @Override
        public Void get(long timeout, TimeUnit unit) {
            return null;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return 0L;
        }

        /**
         * Orders by remaining delay in nanoseconds.
         *
         * @return -1, 0 or 1 as this delay is less than, equal to, or greater than the other's
         */
        @Override
        public int compareTo(Delayed o) {
            // Long.compare avoids the sign flip that subtracting two longs can produce on overflow.
            return Long.compare(this.getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS));
        }
    }
}

