/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.cruisecontrol.analyzer.history;

import com.linkedin.kafka.cruisecontrol.common.KafkaCruiseControlThreadFactory;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import io.confluent.cruisecontrol.analyzer.history.AbstractTopicPartitionHistory;
import io.confluent.cruisecontrol.analyzer.history.GoalOptimizationHistoryListener;
import io.confluent.cruisecontrol.analyzer.history.SuspendedTopicPartition;
import io.confluent.cruisecontrol.analyzer.history.TopicPartitionHistoryPool;
import io.confluent.cruisecontrol.analyzer.history.TopicPartitionMovement;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Keeps track of recently recorded {@link TopicPartitionMovement}s and of the
 * {@link SuspendedTopicPartition}s derived from them.
 *
 * <p>Every recorded movement is stored in a {@link TopicPartitionHistoryPool} and counted per
 * {@link TopicPartition}. Once the count of a topic partition exceeds
 * {@code topic.partition.maximum.movements}, a {@link SuspendedTopicPartition} (created with
 * {@code topic.partition.suspension.ms}) is pushed into a second pool. Two single-threaded
 * executors run cleaners that block on {@code takeExpired()} of the respective pool and notify
 * the registered {@link GoalOptimizationHistoryListener}s about expired entries.
 *
 * <p>Listener callbacks are always invoked through {@link #safeExecute(Runnable)}, so an
 * exception thrown by one listener is logged and does not prevent the remaining listeners from
 * being notified.
 */
@ThreadSafe
public class GoalOptimizationHistory implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(GoalOptimizationHistory.class);
    /** Maximum time {@link #close()} waits for each cleaner executor to terminate. */
    private static final long TERMINATION_TIMEOUT_MS = 60000L;
    private final AtomicLong epoch;
    private final int topicPartitionMaximumMovements;
    private final long topicPartitionSuspensionMs;
    private final TopicPartitionHistoryPool<TopicPartitionMovement> recentTopicPartitionMovements;
    private final TopicPartitionHistoryPool<SuspendedTopicPartition> recentSuspendedTopicPartitions;
    private final ExecutorService topicPartitionMovementsCleanerExecutor;
    private final ExecutorService suspendedTopicPartitionCleanerExecutor;
    private final Collection<GoalOptimizationHistoryListener<TopicPartitionMovement>> topicPartitionMovementsListeners;
    private final Collection<GoalOptimizationHistoryListener<SuspendedTopicPartition>> suspendedTopicPartitionsListeners;
    private final ConcurrentMap<TopicPartition, Integer> numberOfMovementsByTopicPartition;
    private final InterruptiblePoller topicPartitionMovementsCleaner;
    private final InterruptiblePoller suspendedTopicPartitionsCleaner;

    /**
     * Builds a history instance (epoch 0, empty pools, no listeners) and starts its cleaners.
     *
     * @param config source of {@code topic.partition.maximum.movements} and
     *               {@code topic.partition.suspension.ms}
     * @return a new history whose cleaner threads are already running
     */
    public static GoalOptimizationHistory create(KafkaCruiseControlConfig config) {
        AtomicLong epoch = new AtomicLong(0L);
        int topicPartitionMaximumMovements = config.getInt("topic.partition.maximum.movements");
        long topicPartitionSuspensionMs = config.getLong("topic.partition.suspension.ms");
        TopicPartitionHistoryPool<TopicPartitionMovement> recentTopicPartitionMovements = new TopicPartitionHistoryPool<TopicPartitionMovement>();
        TopicPartitionHistoryPool<SuspendedTopicPartition> recentSuspendedTopicPartitions = new TopicPartitionHistoryPool<SuspendedTopicPartition>();
        ExecutorService topicPartitionMovementsCleanerExecutor = Executors.newSingleThreadExecutor(new KafkaCruiseControlThreadFactory("TopicPartitionMovementsCleaner", true, LOG));
        ExecutorService suspendedTopicPartitionCleanerExecutor = Executors.newSingleThreadExecutor(new KafkaCruiseControlThreadFactory("SuspendedTopicPartitionsCleaner", true, LOG));
        Collection<GoalOptimizationHistoryListener<TopicPartitionMovement>> topicPartitionMovementsListeners = new ConcurrentLinkedQueue<GoalOptimizationHistoryListener<TopicPartitionMovement>>();
        Collection<GoalOptimizationHistoryListener<SuspendedTopicPartition>> suspendedTopicPartitionsListeners = new ConcurrentLinkedQueue<GoalOptimizationHistoryListener<SuspendedTopicPartition>>();
        ConcurrentMap<TopicPartition, Integer> numberOfMovementsByTopicPartition = new ConcurrentHashMap<TopicPartition, Integer>();

        // One polling step of the movements cleaner: wait for an expired movement, decrement the
        // per-partition counter and notify listeners unless the entry belongs to an older epoch.
        InterruptiblePoller topicPartitionMovementsCleaner = InterruptiblePoller.of(() -> {
            try {
                TopicPartitionMovement expiredMovement = (TopicPartitionMovement)recentTopicPartitionMovements.takeExpired();
                TopicPartition tp = expiredMovement.topicPartition();
                numberOfMovementsByTopicPartition.compute(tp, (key, count) -> {
                    if (count == null) {
                        // NOTE(review): a missing counter is stored as -1 rather than left absent;
                        // presumably this compensates for an increment that has not landed yet - confirm.
                        return -1;
                    }
                    if (count == 1) {
                        // Last tracked movement expired: drop the mapping instead of storing 0.
                        return null;
                    }
                    return count - 1;
                });
                long currentEpoch = epoch.get();
                long tpEpoch = expiredMovement.epoch();
                if (currentEpoch <= tpEpoch) {
                    if (currentEpoch < tpEpoch) {
                        LOG.warn("GoalOptimizationHistory has a smaller epoch ({}) than the one from TopicPartitionMovement ({})", currentEpoch, tpEpoch);
                    }
                    for (GoalOptimizationHistoryListener<TopicPartitionMovement> listener : topicPartitionMovementsListeners) {
                        GoalOptimizationHistory.safeExecute(() -> listener.onExpiredHistory(expiredMovement));
                    }
                }
            }
            catch (InterruptedException e) {
                // Restore the flag so the enclosing InterruptiblePoller loop observes it and stops.
                Thread.currentThread().interrupt();
            }
        });

        // One polling step of the suspension cleaner: wait for an expired suspension and notify
        // listeners unless the entry belongs to an older epoch.
        InterruptiblePoller suspendedTopicPartitionsCleaner = InterruptiblePoller.of(() -> {
            try {
                SuspendedTopicPartition expiredSuspension = (SuspendedTopicPartition)recentSuspendedTopicPartitions.takeExpired();
                long currentEpoch = epoch.get();
                long tpEpoch = expiredSuspension.epoch();
                if (currentEpoch <= tpEpoch) {
                    if (currentEpoch < tpEpoch) {
                        LOG.warn("GoalOptimizationHistory has a smaller epoch ({}) than the one from SuspendedTopicPartition ({})", currentEpoch, tpEpoch);
                    }
                    for (GoalOptimizationHistoryListener<SuspendedTopicPartition> listener : suspendedTopicPartitionsListeners) {
                        GoalOptimizationHistory.safeExecute(() -> listener.onExpiredHistory(expiredSuspension));
                    }
                }
            }
            catch (InterruptedException e) {
                // Restore the flag so the enclosing InterruptiblePoller loop observes it and stops.
                Thread.currentThread().interrupt();
            }
        });
        return new GoalOptimizationHistory(epoch, topicPartitionMaximumMovements, topicPartitionSuspensionMs, recentTopicPartitionMovements, recentSuspendedTopicPartitions, topicPartitionMovementsCleanerExecutor, suspendedTopicPartitionCleanerExecutor, topicPartitionMovementsListeners, suspendedTopicPartitionsListeners, numberOfMovementsByTopicPartition, topicPartitionMovementsCleaner, suspendedTopicPartitionsCleaner);
    }

    /**
     * Wires all collaborators and immediately submits both cleaners to their executors.
     * Package-private; production code goes through {@link #create(KafkaCruiseControlConfig)}.
     */
    GoalOptimizationHistory(AtomicLong epoch, int topicPartitionMaximumMovements, long topicPartitionSuspensionMs, TopicPartitionHistoryPool<TopicPartitionMovement> recentTopicPartitionMovements, TopicPartitionHistoryPool<SuspendedTopicPartition> recentSuspendedTopicPartitions, ExecutorService topicPartitionMovementsCleanerExecutor, ExecutorService suspendedTopicPartitionCleanerExecutor, Collection<GoalOptimizationHistoryListener<TopicPartitionMovement>> topicPartitionMovementsListeners, Collection<GoalOptimizationHistoryListener<SuspendedTopicPartition>> suspendedTopicPartitionsListeners, ConcurrentMap<TopicPartition, Integer> numberOfMovementsByTopicPartition, InterruptiblePoller topicPartitionMovementsCleaner, InterruptiblePoller suspendedTopicPartitionsCleaner) {
        this.epoch = epoch;
        this.topicPartitionMaximumMovements = topicPartitionMaximumMovements;
        this.topicPartitionSuspensionMs = topicPartitionSuspensionMs;
        this.recentTopicPartitionMovements = recentTopicPartitionMovements;
        this.recentSuspendedTopicPartitions = recentSuspendedTopicPartitions;
        this.topicPartitionMovementsCleanerExecutor = topicPartitionMovementsCleanerExecutor;
        this.suspendedTopicPartitionCleanerExecutor = suspendedTopicPartitionCleanerExecutor;
        this.topicPartitionMovementsListeners = topicPartitionMovementsListeners;
        this.suspendedTopicPartitionsListeners = suspendedTopicPartitionsListeners;
        this.numberOfMovementsByTopicPartition = numberOfMovementsByTopicPartition;
        this.topicPartitionMovementsCleaner = topicPartitionMovementsCleaner;
        this.suspendedTopicPartitionsCleaner = suspendedTopicPartitionsCleaner;
        this.startCleaners();
    }

    /** Submits each cleaner to its dedicated executor. */
    private void startCleaners() {
        this.topicPartitionMovementsCleanerExecutor.execute(this.topicPartitionMovementsCleaner);
        this.suspendedTopicPartitionCleanerExecutor.execute(this.suspendedTopicPartitionsCleaner);
    }

    /**
     * Registers a listener for new and expired topic-partition movements and for epoch updates.
     *
     * @param topicPartitionMovementListener listener to add
     */
    public void addTopicPartitionMovementListener(GoalOptimizationHistoryListener<TopicPartitionMovement> topicPartitionMovementListener) {
        this.topicPartitionMovementsListeners.add(topicPartitionMovementListener);
    }

    /**
     * Registers a listener for new and expired suspended topic partitions and for epoch updates.
     *
     * @param suspendedTopicPartitionListener listener to add
     */
    public void addSuspendedTopicPartitionListener(GoalOptimizationHistoryListener<SuspendedTopicPartition> suspendedTopicPartitionListener) {
        this.suspendedTopicPartitionsListeners.add(suspendedTopicPartitionListener);
    }

    /**
     * Records a movement. When the movement pool accepts it, the per-partition counter is
     * incremented, movement listeners are notified, and the partition is suspended if the
     * counter now exceeds the configured maximum. When the pool rejects it, the movement is
     * only logged at debug level.
     *
     * @param topicPartitionMovement movement to record
     */
    public void record(TopicPartitionMovement topicPartitionMovement) {
        // Read the epoch before touching the pool so a possible suspension carries the epoch
        // that was current when this call started.
        long currentEpoch = this.epoch.get();
        if (!this.recentTopicPartitionMovements.add(topicPartitionMovement)) {
            LOG.debug("Received TopicPartitionMovements with outdated epoch: {}, current epoch: {}", topicPartitionMovement, currentEpoch);
            return;
        }
        TopicPartition tp = topicPartitionMovement.topicPartition();
        // Atomic "insert 1 or add 1" on the concurrent map.
        int newNumber = this.numberOfMovementsByTopicPartition.merge(tp, 1, Integer::sum);
        for (GoalOptimizationHistoryListener<TopicPartitionMovement> listener : this.topicPartitionMovementsListeners) {
            GoalOptimizationHistory.safeExecute(() -> listener.onNewHistory(topicPartitionMovement));
        }
        this.maybeUpdateSuspendedTopicPartition(tp, newNumber, currentEpoch);
    }

    /**
     * Suspends {@code tp} when {@code numberOfMovements} is strictly greater than the configured
     * maximum, and notifies suspension listeners if the suspension pool reports an update.
     *
     * @param tp                topic partition that was just moved
     * @param numberOfMovements current movement count of {@code tp}
     * @param epoch             epoch to stamp on the created {@link SuspendedTopicPartition}
     */
    private void maybeUpdateSuspendedTopicPartition(TopicPartition tp, int numberOfMovements, long epoch) {
        if (numberOfMovements <= this.topicPartitionMaximumMovements) {
            return;
        }
        SuspendedTopicPartition suspendedTopicPartition = new SuspendedTopicPartition(tp, this.topicPartitionSuspensionMs, epoch);
        if (!this.recentSuspendedTopicPartitions.update(suspendedTopicPartition)) {
            return;
        }
        for (GoalOptimizationHistoryListener<SuspendedTopicPartition> listener : this.suspendedTopicPartitionsListeners) {
            GoalOptimizationHistory.safeExecute(() -> listener.onNewHistory(suspendedTopicPartition));
        }
    }

    /**
     * Advances to a new epoch: increments the epoch counter, propagates the new value to both
     * pools and notifies every registered listener via {@code onUpdatedEpoch}.
     */
    public void clear() {
        long newEpoch = this.epoch.incrementAndGet();
        this.recentTopicPartitionMovements.newEpoch(newEpoch);
        this.recentSuspendedTopicPartitions.newEpoch(newEpoch);
        for (GoalOptimizationHistoryListener<TopicPartitionMovement> listener : this.topicPartitionMovementsListeners) {
            GoalOptimizationHistory.safeExecute(() -> listener.onUpdatedEpoch(newEpoch));
        }
        for (GoalOptimizationHistoryListener<SuspendedTopicPartition> listener : this.suspendedTopicPartitionsListeners) {
            GoalOptimizationHistory.safeExecute(() -> listener.onUpdatedEpoch(newEpoch));
        }
    }

    /**
     * Returns the currently tracked number of movements of a topic partition.
     *
     * @param tp topic partition to look up
     * @return the stored counter, or 0 when none is stored
     */
    public int numberOfMovements(TopicPartition tp) {
        return this.numberOfMovementsByTopicPartition.getOrDefault(tp, 0);
    }

    /**
     * Interrupts both cleaners and waits up to {@link #TERMINATION_TIMEOUT_MS} for each executor
     * to terminate. If the calling thread is interrupted while waiting, its interrupt flag is
     * restored and the method returns. An error is logged for every executor that was not
     * observed to terminate.
     *
     * @throws Exception any non-interrupt exception raised while awaiting termination
     */
    @Override
    public void close() throws Exception {
        this.topicPartitionMovementsCleanerExecutor.shutdownNow();
        this.suspendedTopicPartitionCleanerExecutor.shutdownNow();
        boolean topicPartitionMovementsCleanerTerminated = false;
        boolean suspendedTopicPartitionCleanerTerminated = false;
        try {
            topicPartitionMovementsCleanerTerminated = this.topicPartitionMovementsCleanerExecutor.awaitTermination(TERMINATION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            suspendedTopicPartitionCleanerTerminated = this.suspendedTopicPartitionCleanerExecutor.awaitTermination(TERMINATION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        catch (Exception e) {
            LOG.error("Exception is thrown while waiting for the termination of cleaners", e);
            throw e;
        }
        finally {
            if (!topicPartitionMovementsCleanerTerminated) {
                LOG.error("TopicPartitionMovementsCleaner didn't terminate within {} ms", TERMINATION_TIMEOUT_MS);
            }
            if (!suspendedTopicPartitionCleanerTerminated) {
                LOG.error("SuspendedTopicPartitionsCleaner didn't terminate within {} ms", TERMINATION_TIMEOUT_MS);
            }
        }
    }

    /**
     * Runs a listener notification, logging (instead of propagating) any {@link Exception} so
     * one failing listener cannot break the caller or the remaining notifications.
     *
     * @param task notification to run
     */
    private static void safeExecute(Runnable task) {
        try {
            task.run();
        }
        catch (Exception e) {
            LOG.error("Exception were caught during GoalOptimizationHistory notifications", e);
        }
    }

    /**
     * Runnable that keeps invoking a task until the executing thread is interrupted or the task
     * throws. The task is expected to leave the interrupt flag set when it gets interrupted
     * (e.g. by calling {@code Thread.currentThread().interrupt()}), which ends the loop.
     */
    static class InterruptiblePoller implements Runnable {
        private final Runnable task;

        private InterruptiblePoller(Runnable task) {
            this.task = task;
        }

        /**
         * Creates a poller around the given task.
         *
         * @param task single polling step, executed repeatedly by {@link #run()}
         * @return a new poller
         */
        public static InterruptiblePoller of(Runnable task) {
            return new InterruptiblePoller(task);
        }

        /**
         * Loops over the task until interruption. Exactly one exit message is logged: an error
         * carrying the throwable when the task threw, an info message otherwise. The throwable
         * is not propagated.
         */
        @Override
        public void run() {
            Throwable thrown = null;
            try {
                // Thread.interrupted() also clears the flag, so the thread leaves the loop clean.
                while (!Thread.interrupted()) {
                    this.task.run();
                }
            }
            catch (Throwable e) {
                thrown = e;
            }
            finally {
                if (thrown != null) {
                    LOG.error(String.format("Thread %s exited abnormally", Thread.currentThread().getName()), thrown);
                } else {
                    LOG.info("Thread {} exited", Thread.currentThread().getName());
                }
            }
        }
    }
}

