/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.cell;

import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import kafka.common.CellLoadDescriptionInternal;
import kafka.controller.ClusterBalanceManager;
import kafka.server.KafkaConfig;
import org.apache.kafka.common.CellLoad;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.controller.metrics.CellMetrics;
import org.apache.kafka.server.util.KafkaScheduler;
import org.apache.kafka.server.util.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.runtime.BoxedUnit;

public class CellLoadRefresher {
    public static final long DEFAULT_REFRESH_INTERVAL_MS = TimeUnit.MILLISECONDS.convert(10L, TimeUnit.MINUTES);
    private static final Logger log = LoggerFactory.getLogger(CellLoadRefresher.class);
    private final BiConsumer<Set<CellLoad>, Long> cellLoadConsumer;
    private final ClusterBalanceManager balanceManager;
    private final Scheduler scheduler;
    private final long refreshIntervalMs;
    private final Time time;
    private final KafkaConfig config;
    private final Optional<CellMetrics> metricsOpt;

    public static Scheduler createDefaultScheduler() {
        return new KafkaScheduler(1, true, "cell-load-refresher-", false);
    }

    public CellLoadRefresher(BiConsumer<Set<CellLoad>, Long> cellLoadConsumer, ClusterBalanceManager balanceManager, Time time, KafkaConfig config, Scheduler scheduler, CellMetrics metrics) {
        this(cellLoadConsumer, balanceManager, time, scheduler, config, metrics, DEFAULT_REFRESH_INTERVAL_MS);
    }

    CellLoadRefresher(BiConsumer<Set<CellLoad>, Long> cellLoadConsumer, ClusterBalanceManager balanceManager, Time time, Scheduler scheduler, KafkaConfig config, CellMetrics metrics, long refreshIntervalMs) {
        this.cellLoadConsumer = cellLoadConsumer;
        this.balanceManager = balanceManager;
        this.scheduler = scheduler;
        this.time = time;
        this.config = config;
        this.refreshIntervalMs = refreshIntervalMs;
        this.metricsOpt = Optional.ofNullable(metrics);
    }

    public void start() {
        this.scheduler.schedule("CellLoadRefresher", this::refreshCellLoads, 0L, this.refreshIntervalMs);
        log.info("CellLoadRefresher scheduler has now started");
    }

    BoxedUnit refreshCellLoads() {
        if (!this.config.cellLoadRefresherEnabled()) {
            return BoxedUnit.UNIT;
        }
        long startTimestampMs = this.time.milliseconds();
        this.balanceManager.cellLoad(Collections.emptyList(), (error, result) -> {
            if (error.error() == Errors.NONE) {
                HashSet<CellLoad> cellLoads = new HashSet<CellLoad>(result.orElse(new CellLoadDescriptionInternal(Collections.emptyList())).getCellLoad());
                long timestampMs = this.time.milliseconds();
                log.info("Received cellLoads of length {}, and took {} milliseconds", (Object)cellLoads.size(), (Object)(timestampMs - startTimestampMs));
                if (!cellLoads.isEmpty()) {
                    this.cellLoadConsumer.accept(cellLoads, timestampMs);
                    this.metricsOpt.ifPresent(metrics -> metrics.updateCellLoads(cellLoads, timestampMs));
                }
            } else {
                log.error("Received error due to {}", (Object)error);
            }
        });
        return BoxedUnit.UNIT;
    }
}

