/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.detector;

import com.linkedin.cruisecontrol.detector.Anomaly;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.config.UpdatableSbcGoalsConfig;
import com.linkedin.kafka.cruisecontrol.detector.BrokerFailures;
import com.linkedin.kafka.cruisecontrol.monitor.LoadMonitor;
import io.confluent.databalancer.persistence.ApiStatePersistenceStore;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import javax.annotation.concurrent.GuardedBy;
import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BrokerFailureDetector
extends ShutdownableThread {
    private static final Logger LOG = LoggerFactory.getLogger(BrokerFailureDetector.class);
    private static final String THREAD_NAME = "SBK_BrokerFailureDetector";
    private static final long MIN_BROKER_FAILURE_REFRESH_INTERVAL_MS = Duration.ofMinutes(10L).toMillis();
    private final KafkaCruiseControl kafkaCruiseControl;
    private final Map<Integer, Long> failedBrokers;
    private final LoadMonitor loadMonitor;
    private Queue<Anomaly> anomalies;
    private Time time;
    private final boolean allowCapacityEstimation;
    private final boolean excludeRecentlyRemovedBrokers;
    @GuardedBy(value="this")
    private final List<BrokerChangedEvent> brokerChangedEvents;
    private boolean initialized;
    private final KafkaCruiseControlConfig config;
    private final Admin adminClient;
    private final UpdatableSbcGoalsConfig updatableSbcGoalsConfig;
    private final ApiStatePersistenceStore persistenceStore;

    public BrokerFailureDetector(KafkaCruiseControlConfig config, Admin adminClient, LoadMonitor loadMonitor, Queue<Anomaly> anomalies, Time time, KafkaCruiseControl kafkaCruiseControl, UpdatableSbcGoalsConfig updatableSbcGoalsConfig, ApiStatePersistenceStore persistenceStore) {
        super(THREAD_NAME, true);
        this.config = config;
        this.adminClient = adminClient;
        this.updatableSbcGoalsConfig = updatableSbcGoalsConfig;
        this.failedBrokers = new HashMap<Integer, Long>();
        this.loadMonitor = loadMonitor;
        this.anomalies = anomalies;
        this.time = time;
        this.kafkaCruiseControl = kafkaCruiseControl;
        this.allowCapacityEstimation = config.getBoolean("anomaly.detection.allow.capacity.estimation");
        this.excludeRecentlyRemovedBrokers = config.getBoolean("broker.failure.exclude.recently.removed.brokers");
        this.persistenceStore = persistenceStore;
        this.brokerChangedEvents = new LinkedList<BrokerChangedEvent>();
    }

    public synchronized void notifyNewBrokers(Set<Integer> newBrokers) {
        LOG.info("Notify new broker arrival: {}", newBrokers);
        this.brokerChangedEvents.add(new NewBrokerEvent(newBrokers));
        this.scheduleDetection();
    }

    public synchronized void notifyDeadBrokers(Set<Integer> deadBrokers) {
        LOG.info("Notify broker removal: {}", deadBrokers);
        this.brokerChangedEvents.add(new DeadBrokerEvent(deadBrokers));
        this.scheduleDetection();
    }

    Queue<Anomaly> getAnomalies() {
        return this.anomalies;
    }

    void setAnomalies(Queue<Anomaly> anomalies) {
        this.anomalies = anomalies;
    }

    void setTime(Time time) {
        this.time = time;
    }

    void startDetection() {
        this.start();
    }

    private void detectBrokerFailures(Collection<BrokerChangedEvent> brokerChangeEvents) throws InterruptedException {
        Collection<Integer> aliveBrokers = KafkaCruiseControlUtils.getAllBrokersInCluster(this.adminClient);
        brokerChangeEvents.forEach(event -> event.updateAliveBrokers(aliveBrokers));
        this.updateFailedBrokers(aliveBrokers);
        this.reportBrokerFailures();
    }

    synchronized void scheduleDetection() {
        LOG.info("Scheduled check for broker failure detection triggered");
        ((Object)((Object)this)).notify();
    }

    public Map<Integer, Long> failedBrokers() {
        return new HashMap<Integer, Long>(this.failedBrokers);
    }

    void shutdownNow() {
        this.shutdown();
    }

    private void loadPersistedFailedBrokerList() {
        this.failedBrokers.clear();
        this.failedBrokers.putAll(this.persistenceStore.getFailedBrokers());
    }

    private Map<Integer, Long> updateFailedBrokers(Collection<Integer> aliveBrokers) throws InterruptedException {
        Set<Integer> currentFailedBrokers = this.loadMonitor.brokersWithReplicas(60000);
        currentFailedBrokers.removeAll(aliveBrokers);
        LOG.info("Alive brokers: {}, failed brokers: {}", aliveBrokers, currentFailedBrokers);
        boolean updated = this.failedBrokers.entrySet().removeIf(entry -> !currentFailedBrokers.contains(entry.getKey()));
        for (Integer brokerId : currentFailedBrokers) {
            if (this.failedBrokers.putIfAbsent(brokerId, this.time.milliseconds()) != null) continue;
            updated = true;
        }
        LOG.info("Updated list of failed broker: {}", this.failedBrokers);
        if (updated) {
            this.persistenceStore.save(this.failedBrokers);
            LOG.info("Saved failed broker list to persistence store: {}", this.failedBrokers);
        }
        return this.failedBrokers;
    }

    private void reportBrokerFailures() {
        if (!this.failedBrokers.isEmpty()) {
            this.anomalies.add(new BrokerFailures(this.kafkaCruiseControl, this.failedBrokers(), this.allowCapacityEstimation, this.excludeRecentlyRemovedBrokers, this.updatableSbcGoalsConfig.config().rebalancingGoals()));
        }
    }

    private void initialize() throws InterruptedException {
        this.loadPersistedFailedBrokerList();
        this.detectBrokerFailures(Collections.emptyList());
        this.initialized = true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void doWork() {
        try {
            LinkedList<BrokerChangedEvent> brokerChangeEvents;
            if (!this.initialized) {
                this.initialize();
                LOG.debug("Broker Failure Detector initialized.");
            }
            long selfHealingThresholdMs = this.config.getLong("broker.failure.self.healing.threshold.ms");
            BrokerFailureDetector brokerFailureDetector = this;
            synchronized (brokerFailureDetector) {
                if (this.brokerChangedEvents.isEmpty()) {
                    long waitTimeoutMs = Math.max(selfHealingThresholdMs / 2L, MIN_BROKER_FAILURE_REFRESH_INTERVAL_MS);
                    ((Object)((Object)this)).wait(waitTimeoutMs);
                }
                brokerChangeEvents = new LinkedList<BrokerChangedEvent>(this.brokerChangedEvents);
                this.brokerChangedEvents.clear();
            }
            LOG.info("Broker change event(s) detected: {}", brokerChangeEvents);
            this.detectBrokerFailures(brokerChangeEvents);
        }
        catch (InterruptedException e) {
            LOG.info("Broker failure detector interrupted. Exiting the doWork loop.");
        }
    }

    private static class NewBrokerEvent
    implements BrokerChangedEvent {
        private final Set<Integer> newBrokers;

        public NewBrokerEvent(Set<Integer> newBrokers) {
            this.newBrokers = newBrokers;
        }

        @Override
        public void updateAliveBrokers(Collection<Integer> aliveBrokers) {
            aliveBrokers.addAll(this.newBrokers);
        }

        public String toString() {
            return "New brokers: " + this.newBrokers.toString();
        }
    }

    private static class DeadBrokerEvent
    implements BrokerChangedEvent {
        private final Set<Integer> deadBrokers;

        public DeadBrokerEvent(Set<Integer> deadBrokers) {
            this.deadBrokers = deadBrokers;
        }

        @Override
        public void updateAliveBrokers(Collection<Integer> aliveBrokers) {
            aliveBrokers.removeAll(this.deadBrokers);
        }

        public String toString() {
            return "Dead brokers: " + this.deadBrokers.toString();
        }
    }

    private static interface BrokerChangedEvent {
        public void updateAliveBrokers(Collection<Integer> var1);
    }
}

