/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.detector;

import com.linkedin.cruisecontrol.detector.Anomaly;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationResult;
import com.linkedin.kafka.cruisecontrol.common.KafkaCruiseControlThreadFactory;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.config.UpdatableSbcGoalsConfig;
import com.linkedin.kafka.cruisecontrol.detector.AnomalyDetectorState;
import com.linkedin.kafka.cruisecontrol.detector.AnomalyDetectorUtils;
import com.linkedin.kafka.cruisecontrol.detector.AnomalyState;
import com.linkedin.kafka.cruisecontrol.detector.BrokerFailureDetector;
import com.linkedin.kafka.cruisecontrol.detector.BrokerFailures;
import com.linkedin.kafka.cruisecontrol.detector.GoalViolationDetector;
import com.linkedin.kafka.cruisecontrol.detector.GoalViolations;
import com.linkedin.kafka.cruisecontrol.detector.KafkaAnomaly;
import com.linkedin.kafka.cruisecontrol.detector.ResourceUtilizationHandler;
import com.linkedin.kafka.cruisecontrol.detector.notifier.AnomalyNotificationResult;
import com.linkedin.kafka.cruisecontrol.detector.notifier.AnomalyNotifier;
import com.linkedin.kafka.cruisecontrol.detector.notifier.AnomalyType;
import com.linkedin.kafka.cruisecontrol.exception.KafkaCruiseControlException;
import com.linkedin.kafka.cruisecontrol.exception.OptimizationFailureException;
import com.linkedin.kafka.cruisecontrol.executor.ExecutorState;
import com.linkedin.kafka.cruisecontrol.monitor.LoadMonitor;
import io.confluent.databalancer.metrics.DataBalancerMetricsRegistry;
import io.confluent.databalancer.operation.EvenClusterLoadStateManager;
import io.confluent.databalancer.persistence.ApiStatePersistenceStore;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.common.errors.RebalancePlanComputationException;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AnomalyDetector {
    // Name handed to the KafkaCruiseControlThreadFactory that creates the detector scheduler threads (see init).
    static final String METRIC_REGISTRY_NAME = "AnomalyDetector";
    // Same value as the exclusive bound (in ms) of the random start-up jitter drawn in startDetection.
    private static final int INIT_JITTER_BOUND = 10000;
    // Same value as the size of the scheduled thread pool created in init.
    private static final int NUM_ANOMALY_DETECTION_THREADS = 2;
    private static final Logger LOG = LoggerFactory.getLogger(AnomalyDetector.class);
    private final KafkaCruiseControl kafkaCruiseControl;
    // The notifier, both detectors, the resource handler and the scheduler are not final:
    // the public constructor leaves them unset and init() creates them.
    private AnomalyNotifier anomalyNotifier;
    private GoalViolationDetector goalViolationDetector;
    private BrokerFailureDetector brokerFailureDetector;
    // Only created by init() when isResourceDetectionEnabled is true.
    private ResourceUtilizationHandler resourceUtilizationHandler;
    private final boolean isResourceDetectionEnabled;
    private ScheduledExecutorService detectorScheduler;
    // Period of the goal violation detector; also used as the back-off for delayed checks and the shutdown wait.
    private final long anomalyDetectionIntervalMs;
    // Initial delay and period of the resource utilization handler.
    private final int resourceOptimizationIntervalMs;
    // Queue handed to both detectors in init() and drained by AnomalyHandlerTask.
    private final LinkedBlockingDeque<Anomaly> anomalies;
    // Set to true by shutdown() while holding shutdownLock; read under the same lock before scheduling or fixing.
    private volatile boolean shutdown;
    private final LoadMonitor loadMonitor;
    private AnomalyDetectorState anomalyDetectorState;
    // Anomaly currently held by the handler task; reset to null at the top of every handler iteration.
    private volatile Anomaly anomalyInProgress;
    // Incremented each time checkWithDelay schedules a delayed broker failure detection.
    private final AtomicLong numCheckedWithDelay;
    private final Object shutdownLock;
    private final KafkaCruiseControlConfig config;
    private final Time time;
    private final ConfluentAdmin adminClient;
    private final DataBalancerMetricsRegistry metricRegistry;
    private final ApiStatePersistenceStore persistenceStore;
    private final UpdatableSbcGoalsConfig updatableSbcGoalsConfig;

    /**
     * Creates an anomaly detector from configuration.
     *
     * <p>Only configuration values and collaborators are captured here. The notifier, the
     * detectors, the scheduler and the detector state are created later by
     * {@code init(Integer, KafkaCruiseControl.CcStartupMode)}.
     *
     * @param config source of "anomaly.detection.interval.ms",
     *               "resource.utilization.detector.enabled" and
     *               "resource.utilization.detector.interval.ms"
     * @param adminClient admin client later handed to the broker failure detector
     * @param loadMonitor load monitor later handed to the detectors
     * @param kafkaCruiseControl facade used to query executor state and completeness
     * @param time clock used by the detectors and the detector state
     * @param metricRegistry registry used for gauges and detector metrics
     * @param persistenceStore store later handed to the broker failure detector and the returned state manager
     * @param updatableSbcGoalsConfig goals configuration later handed to the detectors
     */
    public AnomalyDetector(KafkaCruiseControlConfig config, ConfluentAdmin adminClient, LoadMonitor loadMonitor, KafkaCruiseControl kafkaCruiseControl, Time time, DataBalancerMetricsRegistry metricRegistry, ApiStatePersistenceStore persistenceStore, UpdatableSbcGoalsConfig updatableSbcGoalsConfig) {
        // Diamond instead of a raw type so the queue is a LinkedBlockingDeque<Anomaly>.
        this.anomalies = new LinkedBlockingDeque<>();
        this.config = config;
        this.adminClient = adminClient;
        this.anomalyDetectionIntervalMs = config.getLong("anomaly.detection.interval.ms");
        this.isResourceDetectionEnabled = config.getBoolean("resource.utilization.detector.enabled");
        this.resourceOptimizationIntervalMs = config.getInt("resource.utilization.detector.interval.ms");
        this.loadMonitor = loadMonitor;
        this.kafkaCruiseControl = kafkaCruiseControl;
        this.time = time;
        this.metricRegistry = metricRegistry;
        this.persistenceStore = persistenceStore;
        this.shutdown = false;
        this.anomalyInProgress = null;
        this.numCheckedWithDelay = new AtomicLong();
        this.shutdownLock = new Object();
        this.updatableSbcGoalsConfig = updatableSbcGoalsConfig;
    }

    /**
     * Package-private constructor that takes ready-made collaborators instead of building them
     * from configuration. {@code config}, {@code adminClient} and {@code persistenceStore} are
     * left {@code null}, resource detection is disabled, and the detector state is created
     * immediately with room for 10 cached anomaly states.
     */
    AnomalyDetector(LinkedBlockingDeque<Anomaly> anomalies, long anomalyDetectionIntervalMs, KafkaCruiseControl kafkaCruiseControl, DataBalancerMetricsRegistry dataBalancerMetricsRegistry, AnomalyNotifier anomalyNotifier, GoalViolationDetector goalViolationDetector, BrokerFailureDetector brokerFailureDetector, ScheduledExecutorService detectorScheduler, LoadMonitor loadMonitor, UpdatableSbcGoalsConfig updatableSbcGoalsConfig) {
        // Collaborators that are not supplied on this path.
        this.config = null;
        this.adminClient = null;
        this.persistenceStore = null;

        // Fixed settings.
        this.isResourceDetectionEnabled = false;
        this.resourceOptimizationIntervalMs = 60000;
        this.time = Time.SYSTEM;

        // Injected collaborators.
        this.anomalies = anomalies;
        this.anomalyDetectionIntervalMs = anomalyDetectionIntervalMs;
        this.kafkaCruiseControl = kafkaCruiseControl;
        this.metricRegistry = dataBalancerMetricsRegistry;
        this.anomalyNotifier = anomalyNotifier;
        this.goalViolationDetector = goalViolationDetector;
        this.brokerFailureDetector = brokerFailureDetector;
        this.detectorScheduler = detectorScheduler;
        this.loadMonitor = loadMonitor;
        this.updatableSbcGoalsConfig = updatableSbcGoalsConfig;

        // Fresh runtime state.
        this.shutdown = false;
        this.anomalyInProgress = null;
        this.numCheckedWithDelay = new AtomicLong();
        this.shutdownLock = new Object();

        Map<AnomalyType, Boolean> selfHealingByType = anomalyNotifier.selfHealingEnabled();
        this.anomalyDetectorState = new AnomalyDetectorState(new SystemTime(), selfHealingByType, 10, dataBalancerMetricsRegistry);
    }

    /**
     * Builds everything the public constructor deferred: the configured notifier, the goal
     * violation and broker failure detectors, the optional resource utilization handler, the
     * scheduler, the "balancedness-score" gauge and the detector state.
     *
     * @param brokerId id passed through to the returned state manager
     * @param startupMode startup mode passed to the goal violation detector
     * @return a new {@link EvenClusterLoadStateManager} seeded with the notifier's
     *         self-healing setting for {@link AnomalyType#GOAL_VIOLATION}
     */
    public EvenClusterLoadStateManager init(Integer brokerId, KafkaCruiseControl.CcStartupMode startupMode) {
        anomalyNotifier = config.getConfiguredInstance("anomaly.notifier.class", AnomalyNotifier.class);
        goalViolationDetector = new GoalViolationDetector(config, loadMonitor, anomalies, time, kafkaCruiseControl, updatableSbcGoalsConfig, startupMode, metricRegistry);
        brokerFailureDetector = new BrokerFailureDetector(config, (Admin) adminClient, loadMonitor, anomalies, time, kafkaCruiseControl, updatableSbcGoalsConfig, persistenceStore);

        if (isResourceDetectionEnabled) {
            LOG.info("Resource utilization detector is enabled.");
            resourceUtilizationHandler = new ResourceUtilizationHandler(config, loadMonitor, time, metricRegistry);
        }

        KafkaCruiseControlThreadFactory threadFactory = new KafkaCruiseControlThreadFactory(METRIC_REGISTRY_NAME, false, LOG);
        detectorScheduler = Executors.newScheduledThreadPool(NUM_ANOMALY_DETECTION_THREADS, threadFactory);

        int cachedStateCount = config.getInt("num.cached.recent.anomaly.states");
        metricRegistry.newGauge(AnomalyDetector.class, "balancedness-score", goalViolationDetector::balancednessScore);

        Map<AnomalyType, Boolean> selfHealingByType = anomalyNotifier.selfHealingEnabled();
        anomalyDetectorState = new AnomalyDetectorState(time, selfHealingByType, cachedStateCount, metricRegistry);
        return new EvenClusterLoadStateManager(brokerId, time, persistenceStore, selfHealingByType.get(AnomalyType.GOAL_VIOLATION));
    }

    /**
     * Starts the broker failure detector, schedules the goal violation detector at a fixed rate
     * (first run after half the detection interval plus a random jitter), schedules the resource
     * utilization handler when enabled, and submits the anomaly handler task.
     */
    public void startDetection() {
        LOG.info("Starting anomaly detector.");
        brokerFailureDetector.startDetection();

        int jitter = new Random().nextInt(INIT_JITTER_BOUND);
        LOG.debug("Starting goal violation detector with delay of {} ms", jitter);
        long initialDelayMs = anomalyDetectionIntervalMs / 2L + (long) jitter;
        detectorScheduler.scheduleAtFixedRate(goalViolationDetector, initialDelayMs, anomalyDetectionIntervalMs, TimeUnit.MILLISECONDS);

        if (isResourceDetectionEnabled) {
            long resourceIntervalMs = resourceOptimizationIntervalMs;
            detectorScheduler.scheduleAtFixedRate(resourceUtilizationHandler, resourceIntervalMs, resourceIntervalMs, TimeUnit.MILLISECONDS);
        }

        detectorScheduler.submit(new AnomalyHandlerTask());
    }

    /**
     * Shuts the detector down: raises the shutdown flag under the shutdown lock, stops the
     * broker failure detector, puts the shutdown marker at the head of the anomaly queue so the
     * handler task exits, and then shuts down the scheduler.
     */
    public void shutdown() {
        LOG.info("Shutting down anomaly detector.");
        synchronized (shutdownLock) {
            shutdown = true;
        }
        KafkaCruiseControlUtils.executeSilently(brokerFailureDetector, BrokerFailureDetector::shutdownNow);
        anomalies.addFirst(AnomalyDetectorUtils.SHUTDOWN_ANOMALY);
        KafkaCruiseControlUtils.executeSilently(detectorScheduler, this::shutdownDetectorScheduler);
        LOG.info("Anomaly detector shutdown completed.");
    }

    /** Returns the broker failure detector currently held by this instance. */
    BrokerFailureDetector getBrokerFailureDetector() {
        return brokerFailureDetector;
    }

    /**
     * Initiates an orderly shutdown of the given scheduler and waits up to
     * {@code anomalyDetectionIntervalMs} for it to terminate.
     *
     * <p>If the wait is interrupted, the interrupt status of the calling thread is restored so
     * callers further up the stack can still observe it.
     *
     * @param detectorScheduler the scheduler to shut down
     */
    private void shutdownDetectorScheduler(ScheduledExecutorService detectorScheduler) {
        detectorScheduler.shutdown();
        try {
            // awaitTermination returns false when the timeout elapsed before termination.
            boolean terminated = detectorScheduler.awaitTermination(this.anomalyDetectionIntervalMs, TimeUnit.MILLISECONDS);
            if (!terminated) {
                LOG.warn("The anomaly detector scheduler failed to shutdown in {} ms.", this.anomalyDetectionIntervalMs);
            }
        } catch (InterruptedException e) {
            LOG.warn("Interrupted while waiting for anomaly detector to shutdown.");
            // Do not swallow the interrupt: re-assert it for the caller.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Refreshes the state's metrics from the notifier's self-healing ratio and the goal
     * violation detector's balancedness score, then returns the state.
     *
     * @return the detector state after the refresh
     */
    public synchronized AnomalyDetectorState anomalyDetectorState() {
        AnomalyDetectorState state = this.anomalyDetectorState;
        state.refreshMetrics(anomalyNotifier.selfHealingEnabledRatio(), goalViolationDetector.balancednessScore());
        return this.anomalyDetectorState;
    }

    /** Returns the self-healing-started count reported by the detector state. */
    long numSelfHealingStarted() {
        return anomalyDetectorState.numSelfHealingStarted();
    }

    /** Returns the self-healing error count reported by the detector state. */
    long numSelfHealingErrors() {
        return anomalyDetectorState.numSelfHealingErrors();
    }

    /** Returns the ongoing self-healing anomaly as reported by the detector state. */
    Anomaly ongoingSelfHealingAnomaly() {
        return anomalyDetectorState.ongoingSelfHealingAnomaly();
    }

    /** Delegates to the detector state's {@code maybeClearOngoingAnomalyDetectionTimeMs()}. */
    public void maybeClearOngoingAnomalyDetectionTimeMs() {
        anomalyDetectorState.maybeClearOngoingAnomalyDetectionTimeMs();
    }

    /**
     * Applies the self-healing setting for an anomaly type to the notifier first and then to
     * the detector state.
     *
     * @param anomalyType the anomaly type to configure
     * @param isSelfHealingEnabled the new setting
     * @return the value returned by the notifier's {@code setSelfHealingFor}
     *         (presumably the previous setting; confirm against the notifier)
     */
    public boolean setSelfHealingFor(AnomalyType anomalyType, boolean isSelfHealingEnabled) {
        boolean notifierResult = anomalyNotifier.setSelfHealingFor(anomalyType, isSelfHealingEnabled);
        anomalyDetectorState.setSelfHealingFor(anomalyType, isSelfHealingEnabled);
        return notifierResult;
    }

    /**
     * Reports whether the notifier has self-healing enabled for
     * {@link AnomalyType#GOAL_VIOLATION}.
     *
     * @return {@code true} only when the notifier's map holds {@code TRUE} for goal violations;
     *         a missing or {@code null} entry is treated as disabled rather than failing with a
     *         {@link NullPointerException} during unboxing
     */
    public boolean unevenLoadSelfHealingEnabled() {
        Boolean enabled = this.anomalyNotifier.selfHealingEnabled().get(AnomalyType.GOAL_VIOLATION);
        return Boolean.TRUE.equals(enabled);
    }

    /** Returns how many delayed broker failure checks the handler task has scheduled so far. */
    public long numCheckedWithDelay() {
        return numCheckedWithDelay.get();
    }

    /**
     * Forwards the given anomaly id to the detector state's {@code markSelfHealingFinished}.
     *
     * @param anomalyId id of the anomaly whose self-healing has finished
     */
    public void markSelfHealingFinished(String anomalyId) {
        anomalyDetectorState.markSelfHealingFinished(anomalyId);
    }

    /**
     * Passes the set of new brokers to the broker failure detector and then to the goal
     * violation detector.
     *
     * @param newBrokers ids of the new brokers
     */
    public void notifyNewBrokers(Set<Integer> newBrokers) {
        brokerFailureDetector.notifyNewBrokers(newBrokers);
        goalViolationDetector.notifyNewBrokers(newBrokers);
    }

    /**
     * Passes the set of dead brokers to the broker failure detector and then to the goal
     * violation detector.
     *
     * @param deadBrokers ids of the dead brokers
     */
    public void notifyDeadBrokers(Set<Integer> deadBrokers) {
        brokerFailureDetector.notifyDeadBrokers(deadBrokers);
        goalViolationDetector.notifyDeadBrokers(deadBrokers);
    }

    class AnomalyHandlerTask
    implements Runnable {
        /** Creates the handler task; it keeps no state of its own beyond the enclosing detector. */
        AnomalyHandlerTask() {
            // Nothing to initialise.
        }

        /**
         * Handler loop: takes anomalies from the head of the queue one at a time and handles
         * them until the shutdown marker is taken. When handling fails with an exception and an
         * anomaly is in progress, a delayed check is requested for it.
         */
        @Override
        public void run() {
            LOG.info("Starting anomaly handler");
            boolean shutdownRequested = false;
            while (!shutdownRequested) {
                boolean needsDelayedCheck = false;
                anomalyInProgress = null;
                try {
                    anomalyInProgress = anomalies.takeFirst();
                    LOG.trace("Processing anomaly {}.", anomalyInProgress);
                    if (anomalyInProgress == AnomalyDetectorUtils.SHUTDOWN_ANOMALY) {
                        // Shutdown marker: clear the slot and leave the loop.
                        anomalyInProgress = null;
                        shutdownRequested = true;
                    } else {
                        handleAnomalyInProgress();
                    }
                } catch (InterruptedException e) {
                    LOG.debug("Received interrupted exception.", e);
                    needsDelayedCheck = true;
                } catch (KafkaCruiseControlException kcce) {
                    LOG.warn("Anomaly handler received exception when trying to fix the anomaly {}.", anomalyInProgress, kcce);
                    needsDelayedCheck = true;
                } catch (RebalancePlanComputationException planException) {
                    LOG.warn("Anomaly handler received an exception when trying to compute a plan for anomaly {}.", anomalyInProgress, planException);
                    needsDelayedCheck = true;
                } catch (Throwable t) {
                    LOG.error("Uncaught exception in anomaly handler.", t);
                    needsDelayedCheck = true;
                }
                if (needsDelayedCheck && anomalyInProgress != null) {
                    checkWithDelay(anomalyDetectionIntervalMs);
                }
            }
            LOG.info("Anomaly handler exited.");
        }

        /**
         * Records the detection of the anomaly in progress and decides what to do with it:
         * request a delayed check while the executor has a task in progress, mark it ignored
         * while the executor is reserved, otherwise process it.
         *
         * @throws Exception if processing the anomaly fails
         */
        private void handleAnomalyInProgress() throws Exception {
            AnomalyType anomalyType = AnomalyDetectorUtils.getAnomalyType(anomalyInProgress);
            anomalyDetectorState.addAnomalyDetection(anomalyType, anomalyInProgress);

            ExecutorState.State executorState = kafkaCruiseControl.executionState();
            if (executorState != ExecutorState.State.NO_TASK_IN_PROGRESS) {
                LOG.debug("Schedule delayed check for anomaly {} because executor is in {} state", anomalyInProgress, executorState);
                checkWithDelay(anomalyDetectionIntervalMs);
                return;
            }

            if (kafkaCruiseControl.executorIsReserved()) {
                LOG.debug("Ignoring anomaly {} because the executor is reserved", anomalyInProgress);
                anomalyDetectorState.onAnomalyHandle(anomalyInProgress, AnomalyState.Status.IGNORED);
                return;
            }

            processAnomalyInProgress(anomalyType);
        }

        /**
         * Marks the anomaly rate, asks the notifier what to do with the anomaly in progress and
         * carries out the answer: fix it, check again after the requested delay, or ignore it.
         * A {@code null} notification result means nothing further is done.
         *
         * @param anomalyType type of the anomaly in progress
         * @throws IllegalStateException if the notifier returns an unrecognized action
         * @throws Exception if fixing the anomaly fails
         */
        private void processAnomalyInProgress(AnomalyType anomalyType) throws Exception {
            anomalyDetectorState.markAnomalyRate(anomalyType);
            AnomalyNotificationResult result = notifyAnomalyInProgress(anomalyType);
            if (result == null) {
                return;
            }

            anomalyDetectorState.maybeSetOngoingAnomalyDetectionTimeMs();
            switch (result.action()) {
                case FIX:
                    fixAnomalyInProgress(anomalyType);
                    break;
                case CHECK:
                    checkWithDelay(result.delay());
                    break;
                case IGNORE:
                    anomalyDetectorState.onAnomalyHandle(anomalyInProgress, AnomalyState.Status.IGNORED);
                    break;
                default:
                    throw new IllegalStateException("Unrecognized anomaly notification result.");
            }
        }

        /**
         * Hands the anomaly in progress to the notifier callback that matches its type.
         *
         * @param anomalyType type of the anomaly in progress
         * @return the notifier's answer
         * @throws IllegalStateException if the type is neither a goal violation nor a broker failure
         */
        private AnomalyNotificationResult notifyAnomalyInProgress(AnomalyType anomalyType) {
            final AnomalyNotificationResult result;
            switch (anomalyType) {
                case GOAL_VIOLATION:
                    result = anomalyNotifier.onGoalViolation((GoalViolations) anomalyInProgress);
                    break;
                case BROKER_FAILURE:
                    result = anomalyNotifier.onBrokerFailure((BrokerFailures) anomalyInProgress);
                    break;
                default:
                    throw new IllegalStateException("Unrecognized anomaly type.");
            }
            LOG.debug("Received notification result {}", result);
            return result;
        }

        /**
         * Requests a delayed re-check of the anomaly in progress. Only broker failures are
         * re-checked: a broker failure detection is scheduled after {@code delay} ms unless the
         * detector is shutting down. Any other anomaly type is marked ignored.
         *
         * @param delay delay in milliseconds before the broker failure detection runs
         */
        private void checkWithDelay(long delay) {
            if (AnomalyDetectorUtils.getAnomalyType(anomalyInProgress) != AnomalyType.BROKER_FAILURE) {
                anomalyDetectorState.onAnomalyHandle(anomalyInProgress, AnomalyState.Status.IGNORED);
                return;
            }

            // The lock keeps the shutdown flag stable while the task is handed to the scheduler.
            synchronized (shutdownLock) {
                if (shutdown) {
                    LOG.debug("Skip delayed checking anomaly {}, because anomaly detector is shutting down.", anomalyInProgress);
                    return;
                }
                LOG.debug("Scheduling broker failure detection with delay of {} ms", delay);
                numCheckedWithDelay.incrementAndGet();
                detectorScheduler.schedule(brokerFailureDetector::scheduleDetection, delay, TimeUnit.MILLISECONDS);
                anomalyDetectorState.onAnomalyHandle(anomalyInProgress, AnomalyState.Status.CHECK_WITH_DELAY);
            }
        }

        /**
         * Checks whether the load completeness requirements are met for the goals of the
         * anomaly in progress, given the currently failed brokers.
         *
         * <p>When the requirements are not met, a warning is logged and the anomaly is marked
         * {@code COMPLETENESS_NOT_READY} in the detector state.
         *
         * @param anomalyType type of the anomaly in progress, used in the warning message
         * @return {@code true} if the anomaly can be fixed now, {@code false} otherwise
         */
        private boolean isAnomalyInProgressReadyToFix(AnomalyType anomalyType) {
            // Read the failed-broker map once so the null check and the key set come from the same value.
            Map<Integer, ?> failedBrokers = AnomalyDetector.this.brokerFailureDetector.failedBrokers();
            // Typed as Set: keySet() does not return a HashSet.
            Set<Integer> failedBrokerIds;
            if (failedBrokers != null) {
                failedBrokerIds = failedBrokers.keySet();
            } else {
                failedBrokerIds = new HashSet<Integer>();
            }
            if (AnomalyDetector.this.kafkaCruiseControl.meetCompletenessRequirements(AnomalyDetector.this.anomalyInProgress.goalsConfig().goals(), failedBrokerIds)) {
                return true;
            }
            LOG.warn("Skipping {} fix because load completeness requirement is not met for goals.", anomalyType);
            AnomalyDetector.this.anomalyDetectorState.onAnomalyHandle(AnomalyDetector.this.anomalyInProgress, AnomalyState.Status.COMPLETENESS_NOT_READY);
            return false;
        }

        /**
         * Returns the optimization result summary of the anomaly in progress, which is treated
         * as a {@link KafkaAnomaly} for both goal violations and broker failures.
         *
         * @param anomalyType type of the anomaly in progress
         * @return the anomaly's plaintext summary
         * @throws IllegalStateException if the type is neither a goal violation nor a broker failure
         */
        private OptimizationResult.PlaintextSummary optimizationResultSummary(AnomalyType anomalyType) {
            switch (anomalyType) {
                case GOAL_VIOLATION:
                case BROKER_FAILURE:
                    KafkaAnomaly kafkaAnomaly = (KafkaAnomaly) anomalyInProgress;
                    return kafkaAnomaly.optimizationResultSummary();
                default:
                    throw new IllegalStateException("Unrecognized anomaly type.");
            }
        }

        /**
         * Logs how a self-healing attempt started. Without a summary the attempt is logged as a
         * failure together with the exception. With a summary, the verbose form goes to DEBUG
         * when DEBUG is enabled, and the summary itself goes to INFO otherwise.
         *
         * @param anomalyId id of the anomaly being fixed
         * @param ofe the failure, used only when no summary is available
         * @param optimizationResultSummary summary of the started fix, or {@code null} on failure
         */
        private void logSelfHealingOperation(String anomalyId, OptimizationFailureException ofe, OptimizationResult.PlaintextSummary optimizationResultSummary) {
            if (optimizationResultSummary == null) {
                LOG.warn("[{}] Self-healing failed to start:\n{}", anomalyId, ofe);
                return;
            }
            if (!LOG.isDebugEnabled()) {
                LOG.info("[{}] Self-healing started successfully:\n{}", anomalyId, optimizationResultSummary);
                return;
            }
            LOG.info("[{}] Self-healing started successfully. Logging verbose summary in the next DEBUG log", anomalyId);
            LOG.debug("[{}] Self-healing started successfully:\n{}", anomalyId, optimizationResultSummary.verboseSummary());
        }

        /**
         * Attempts to fix the anomaly in progress while holding the shutdown lock.
         *
         * <p>Nothing is done when the detector is shutting down. Otherwise, if the completeness
         * requirements are met, the anomaly is marked {@code ATTEMPTING_FIX} and
         * {@code fix()} is invoked. The outcome is recorded in a {@code finally} block so the
         * detector state and counters are updated on every exit path, including when
         * {@code fix()} throws something other than {@link OptimizationFailureException}.
         * Post-fix handling runs whether or not a fix was attempted.
         *
         * @param anomalyType type of the anomaly in progress
         * @throws OptimizationFailureException rethrown after logging when the fix cannot be started
         * @throws Exception any other failure raised while fixing
         */
        private void fixAnomalyInProgress(AnomalyType anomalyType) throws Exception {
            synchronized (AnomalyDetector.this.shutdownLock) {
                if (AnomalyDetector.this.shutdown) {
                    LOG.info("Skip fixing anomaly {}, because anomaly detector is shutting down.", AnomalyDetector.this.anomalyInProgress);
                    return;
                }
                boolean isReadyToFix = this.isAnomalyInProgressReadyToFix(anomalyType);
                if (isReadyToFix) {
                    LOG.info("Fixing anomaly {}", AnomalyDetector.this.anomalyInProgress);
                    boolean startedSuccessfully = false;
                    String anomalyId = AnomalyDetector.this.anomalyInProgress.anomalyId();
                    AnomalyDetector.this.anomalyDetectorState.onAnomalyHandle(AnomalyDetector.this.anomalyInProgress, AnomalyState.Status.ATTEMPTING_FIX);
                    try {
                        startedSuccessfully = AnomalyDetector.this.anomalyInProgress.fix();
                        OptimizationResult.PlaintextSummary optimizationResult = this.optimizationResultSummary(anomalyType);
                        this.logSelfHealingOperation(anomalyId, null, optimizationResult);
                    } catch (OptimizationFailureException ofe) {
                        this.logSelfHealingOperation(anomalyId, ofe, null);
                        throw ofe;
                    } finally {
                        // Runs for success, OptimizationFailureException and any other throwable alike.
                        this.recordFixAttemptOutcome(anomalyId, startedSuccessfully);
                    }
                }
                this.handlePostFixAnomaly(isReadyToFix);
            }
        }

        /**
         * Records in the detector state whether the fix for the anomaly in progress started,
         * bumps the matching counter and logs the outcome. A fix that did not start is also
         * marked as finished.
         */
        private void recordFixAttemptOutcome(String anomalyId, boolean startedSuccessfully) {
            AnomalyDetector.this.anomalyDetectorState.onAnomalyHandle(AnomalyDetector.this.anomalyInProgress, startedSuccessfully ? AnomalyState.Status.FIX_STARTED : AnomalyState.Status.FIX_FAILED_TO_START);
            if (startedSuccessfully) {
                AnomalyDetector.this.anomalyDetectorState.incrementNumSelfHealingStarted();
                LOG.info("[{}] Self-healing started successfully.", anomalyId);
            } else {
                AnomalyDetector.this.markSelfHealingFinished(AnomalyDetector.this.anomalyInProgress.anomalyId());
                AnomalyDetector.this.anomalyDetectorState.incrementNumSelfHealingErrors();
                LOG.warn("[{}] Self-healing failed to start.", anomalyId);
            }
        }

        /**
         * Clears every queued anomaly and schedules a broker failure detection: immediately when
         * a fix was attempted, otherwise after one anomaly detection interval.
         *
         * @param isReadyToFix whether the completeness requirements were met for the fix
         */
        private void handlePostFixAnomaly(boolean isReadyToFix) {
            anomalies.clear();
            long delayMs = isReadyToFix ? 0L : anomalyDetectionIntervalMs;
            detectorScheduler.schedule(brokerFailureDetector::scheduleDetection, delayMs, TimeUnit.MILLISECONDS);
        }
    }
}

