/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.multitenant;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import io.confluent.kafka.multitenant.BasePhysicalClusterMetadata;
import io.confluent.kafka.multitenant.KafkaLogicalClusterMetadata;
import io.confluent.kafka.multitenant.SslCertificateManager;
import io.confluent.kafka.multitenant.TenantLifecycleManager;
import io.confluent.kafka.multitenant.quota.MultiTenantQuotaConfig;
import io.confluent.kafka.multitenant.quota.TenantQuotaCallback;
import io.confluent.kafka.multitenant.utils.AuthUtils;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.MetricValueProvider;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Min;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PhysicalClusterMetadata
extends BasePhysicalClusterMetadata<KafkaLogicalClusterMetadata> {
    private static final Logger LOG = LoggerFactory.getLogger(PhysicalClusterMetadata.class);
    static final String DATA_DIR_NAME = "..data";
    private static final String LOGICAL_CLUSTER_FILE_EXT_WITH_DOT = ".json";
    private static final Long CLOSE_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(30L);
    private static final String LKC_LOAD_METRICS_GROUP_NAME = "confluent-lkc-load-metrics";
    private static final String LKC_METADATA_LOAD_TIME_FROM_FS_UPDATE_SENSOR_NAME = "lkc-metadata-load-time-from-fs-update";
    static final String LKC_METADATA_LOAD_TIME_FROM_FS_UPDATE_MIN_METRIC_NAME = "lkc-metadata-load-time-from-fs-update-min";
    static final String LKC_METADATA_LOAD_TIME_FROM_FS_UPDATE_MAX_METRIC_NAME = "lkc-metadata-load-time-from-fs-update-max";
    static final String LKC_METADATA_LOAD_TIME_FROM_FS_UPDATE_AVG_METRIC_NAME = "lkc-metadata-load-time-from-fs-update-avg";
    private static final String LKC_METADATA_END_TO_END_LOAD_TIME_SENSOR_NAME = "lkc-metadata-end-to-end-load-time";
    static final String LKC_METADATA_END_TO_END_LOAD_TIME_MIN_METRIC_NAME = "lkc-metadata-end-to-end-load-time-min";
    static final String LKC_METADATA_END_TO_END_LOAD_TIME_MAX_METRIC_NAME = "lkc-metadata-end-to-end-load-time-max";
    static final String LKC_METADATA_END_TO_END_LOAD_TIME_AVG_METRIC_NAME = "lkc-metadata-end-to-end-load-time-avg";
    private Set<String> kafkaLogicalClusterIds = new HashSet<String>();
    private static final String NUMBER_OF_TENANTS_GROUP_NAME = "confluent-number-of-tenants";
    static final String NUMBER_OF_TENANTS_METRIC_NAME = "number-of-tenants";
    private final IntGauge numberOfTenantsMetric = new IntGauge();
    private final Sensor lkcTimeToLoadFromFilesytemUpdateSensor;
    private final Sensor lkcTimeToLoadEndToEndSensor;
    private final Time time;
    private String logicalClustersDir;
    private List<String> watchDirs = new ArrayList<String>();
    private final Map<String, KafkaLogicalClusterMetadata> logicalClusterMap;
    final MetadataChangeListener dirWatcher;
    private final Thread dirListenerThread;
    private final ScheduledExecutorService executorService;
    private long reloadDelaysMs;
    private volatile Future<?> reloadFuture = null;
    private final ReadWriteLock cacheLock;
    private boolean disableSslCertManager = false;
    private final AtomicReference<State> state;
    private final Set<String> staleLogicalClusters;
    public TenantLifecycleManager tenantLifecycleManager;
    public SslCertificateManager sslCertificateManager;

    public PhysicalClusterMetadata(Metrics metrics) {
        this(metrics, Time.SYSTEM);
    }

    public PhysicalClusterMetadata(Metrics metrics, Time time) {
        this.lkcTimeToLoadFromFilesytemUpdateSensor = metrics.sensor(LKC_METADATA_LOAD_TIME_FROM_FS_UPDATE_SENSOR_NAME);
        this.lkcTimeToLoadFromFilesytemUpdateSensor.add(metrics.metricName(LKC_METADATA_LOAD_TIME_FROM_FS_UPDATE_MIN_METRIC_NAME, LKC_LOAD_METRICS_GROUP_NAME, "The minimum time to load logical cluster metadata from file system update in ms"), (MeasurableStat)new Min());
        this.lkcTimeToLoadFromFilesytemUpdateSensor.add(metrics.metricName(LKC_METADATA_LOAD_TIME_FROM_FS_UPDATE_MAX_METRIC_NAME, LKC_LOAD_METRICS_GROUP_NAME, "The maximum time to load logical cluster metadata from file system update in ms"), (MeasurableStat)new Max());
        this.lkcTimeToLoadFromFilesytemUpdateSensor.add(metrics.metricName(LKC_METADATA_LOAD_TIME_FROM_FS_UPDATE_AVG_METRIC_NAME, LKC_LOAD_METRICS_GROUP_NAME, "The mean time to load logical cluster metadata from file system update in ms"), (MeasurableStat)new Avg());
        this.lkcTimeToLoadEndToEndSensor = metrics.sensor(LKC_METADATA_END_TO_END_LOAD_TIME_SENSOR_NAME);
        this.lkcTimeToLoadEndToEndSensor.add(metrics.metricName(LKC_METADATA_END_TO_END_LOAD_TIME_MIN_METRIC_NAME, LKC_LOAD_METRICS_GROUP_NAME, "The minimum end to end load time of logical cluster metadata in ms"), (MeasurableStat)new Min());
        this.lkcTimeToLoadEndToEndSensor.add(metrics.metricName(LKC_METADATA_END_TO_END_LOAD_TIME_MAX_METRIC_NAME, LKC_LOAD_METRICS_GROUP_NAME, "The maximum end to end load time of logical cluster metadata in ms"), (MeasurableStat)new Max());
        this.lkcTimeToLoadEndToEndSensor.add(metrics.metricName(LKC_METADATA_END_TO_END_LOAD_TIME_AVG_METRIC_NAME, LKC_LOAD_METRICS_GROUP_NAME, "The mean end to end load time of logical cluster metadata in ms"), (MeasurableStat)new Avg());
        MetricName numTenantsMetricName = metrics.metricName(NUMBER_OF_TENANTS_METRIC_NAME, NUMBER_OF_TENANTS_GROUP_NAME, "The number of tenants (i.e. logical clusters) in the physical cluster");
        if (!metrics.metrics().containsKey(numTenantsMetricName)) {
            metrics.addMetric(numTenantsMetricName, (MetricValueProvider)this.numberOfTenantsMetric);
        }
        this.time = time;
        this.state = new AtomicReference<State>(State.NOT_READY);
        this.cacheLock = new ReentrantReadWriteLock();
        this.logicalClusterMap = new ConcurrentHashMap<String, KafkaLogicalClusterMetadata>();
        this.staleLogicalClusters = new CopyOnWriteArraySet<String>();
        this.dirWatcher = new MetadataChangeListener();
        this.dirListenerThread = new Thread((Runnable)this.dirWatcher, "confluent-tenants-change-listener");
        this.executorService = Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "physical-cluster-metadata-retry");
            thread.setDaemon(true);
            return thread;
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void configure(Map<String, ?> configs) {
        Object dirConfigValue;
        String instanceKey = AuthUtils.getBrokerSessionUuid(configs);
        this.tenantLifecycleManager = new TenantLifecycleManager(configs, this.time);
        Map<String, ?> nextConfigs = configs;
        String enableSslDynamicStoreUpdate = (String)nextConfigs.get("confluent.broker.listeners.ssl.file.store.reload.enable");
        if (Boolean.parseBoolean(enableSslDynamicStoreUpdate)) {
            this.disableSslCertManager = true;
        }
        if (!this.disableSslCertManager) {
            this.sslCertificateManager = new SslCertificateManager(configs);
        }
        if ((dirConfigValue = configs.get("multitenant.metadata.dir")) == null) {
            throw new ConfigException("multitenant.metadata.dir is not set");
        }
        this.logicalClustersDir = dirConfigValue.toString();
        Object reloadDelayValue = configs.get("multitenant.metadata.reload.delay.ms");
        this.reloadDelaysMs = reloadDelayValue == null ? ConfluentConfigs.MULTITENANT_METADATA_RELOAD_DELAY_MS_DEFAULT.longValue() : ((Long)reloadDelayValue).longValue();
        this.linkRequestPercentageMultipler = ConfluentConfigs.linkRequestPercentageMultiplier(configs);
        Map map = INSTANCES;
        synchronized (map) {
            BasePhysicalClusterMetadata instance = (BasePhysicalClusterMetadata)INSTANCES.get(instanceKey);
            if (instance != null) {
                if (this != instance) {
                    throw new UnsupportedOperationException("PhysicalClusterMetadata instance already exists for broker session " + instanceKey);
                }
                LOG.info("Skipping configuring this instance (broker session {}): Already configured.", (Object)instanceKey);
                return;
            }
            INSTANCES.put(instanceKey, this);
        }
        try {
            this.startWatching();
        }
        catch (IOException ioe) {
            this.close(instanceKey);
            throw new ConfigException("Failed to load PhysicalClusterMetadata: " + ioe.getMessage());
        }
        LOG.warn("Configured and started instance for broker session {}", (Object)instanceKey);
    }

    void configure(String logicalClustersDir, long reloadDelaysMs, ConfluentAdmin adminClient, String brokerId, String sslCertsPath, List<String> sslListenerNames, boolean useBcfks) {
        this.configure(logicalClustersDir, reloadDelaysMs, new TenantLifecycleManager(0L, null), new SslCertificateManager(brokerId, null, sslCertsPath, s -> adminClient, sslListenerNames, useBcfks));
    }

    void configure(String logicalClustersDir, long reloadDelaysMs, TenantLifecycleManager lifecycleManager, SslCertificateManager sslCertificateManager) {
        this.reloadDelaysMs = reloadDelaysMs;
        this.logicalClustersDir = logicalClustersDir;
        this.tenantLifecycleManager = lifecycleManager;
        this.sslCertificateManager = sslCertificateManager;
    }

    protected void reconfigureQuotas() {
        this.cacheLock.readLock().lock();
        try {
            this.updateQuotas();
        }
        catch (Exception e) {
            LOG.warn("Failed to reconfigure quotas of at least one logical cluster", (Throwable)e);
        }
        finally {
            this.cacheLock.readLock().unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void close(String brokerSessionUuid) {
        Map map = INSTANCES;
        synchronized (map) {
            BasePhysicalClusterMetadata instance = (BasePhysicalClusterMetadata)INSTANCES.get(brokerSessionUuid);
            if (instance != null && instance == this) {
                INSTANCES.remove(brokerSessionUuid);
                LOG.info("Removed instance for broker session {}", (Object)brokerSessionUuid);
            } else if (instance != null) {
                LOG.info("Closing instance that doesn't match the instance in the static map with the same broker session {} will not remove that instance from the map.", (Object)brokerSessionUuid);
            }
        }
        this.shutdown();
    }

    public void startClients(Function<String, Admin> adminClientSupplier) {
        this.tenantLifecycleManager.setAdminSupplierAndCreateClient(adminClientSupplier);
        if (!this.disableSslCertManager) {
            this.sslCertificateManager.setAdminSupplierAndCreateClient(adminClientSupplier);
            this.sslCertificateManager.loadSslCertFiles();
        }
    }

    private void addDirForWatchService(String dir) {
        this.watchDirs.add(dir);
    }

    public Map<Endpoint, CompletableFuture<Void>> start(Map<String, Object> interBrokerClientConfigs, Collection<Endpoint> endpoints) {
        this.interBrokerClientConfig = new HashMap<String, Object>(interBrokerClientConfigs);
        return endpoints.stream().collect(Collectors.toMap(Function.identity(), a -> CompletableFuture.completedFuture(null)));
    }

    void startWatching() throws IOException {
        if (State.CLOSED.equals((Object)this.state.get())) {
            throw new IllegalStateException("Physical Cluster Metadata Cache already shut down.");
        }
        if (this.state.compareAndSet(State.NOT_READY, State.RUNNING)) {
            this.addDirForWatchService(this.logicalClustersDir);
            try {
                this.dirWatcher.register();
            }
            catch (IOException ioe) {
                this.state.compareAndSet(State.RUNNING, State.NOT_READY);
                LOG.error("Failed to register watch service for = {}", this.watchDirs, (Object)ioe);
                this.dirWatcher.close();
                throw ioe;
            }
            if (!this.disableSslCertManager) {
                try {
                    this.sslCertificateManager.startWatching();
                }
                catch (IOException ioe) {
                    this.state.compareAndSet(State.RUNNING, State.NOT_READY);
                    throw ioe;
                }
            }
            this.loadAllFiles();
            this.reloadFuture = this.executorService.scheduleWithFixedDelay(this::reloadCache, this.reloadDelaysMs, this.reloadDelaysMs, TimeUnit.MILLISECONDS);
            LOG.info("Loaded logical cluster metadata from files in dir={} (known) stale logical clusters={}", (Object)this.logicalClustersDir, this.staleLogicalClusters);
            this.dirListenerThread.start();
        }
    }

    void shutdown() {
        LOG.info("Shutting down");
        if (this.state.getAndSet(State.CLOSED) != State.CLOSED) {
            try {
                this.dirListenerThread.interrupt();
                this.dirListenerThread.join(CLOSE_TIMEOUT_MS);
            }
            catch (InterruptedException ie) {
                LOG.error("Shutting down tenant metadata listener thread was interrupted", (Throwable)ie);
            }
            if (!this.disableSslCertManager) {
                this.sslCertificateManager.shutdown();
            }
            if (this.reloadFuture != null) {
                this.reloadFuture.cancel(true);
            }
            this.executorService.shutdownNow();
            try {
                this.executorService.awaitTermination(CLOSE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                LOG.debug("Shutting down was interrupted", (Throwable)e);
            }
            this.tenantLifecycleManager.close();
            if (!this.disableSslCertManager) {
                this.sslCertificateManager.close();
            }
            LOG.info("Closed Physical Cluster Metadata Cache");
        }
    }

    public boolean isUp() {
        return State.RUNNING.equals((Object)this.state.get());
    }

    public String dedicatedLogicalClusterId() {
        if (this.kafkaLogicalClusterIds.size() == 1) {
            return (String)this.kafkaLogicalClusterIds.stream().findFirst().get();
        }
        return "";
    }

    public Set<String> kafkaLogicalClusterIds() {
        return this.kafkaLogicalClusterIds;
    }

    public Set<String> logicalClusterIds() {
        this.ensureOpen();
        return Sets.difference(this.logicalClusterMap.keySet(), this.staleLogicalClusters).immutableCopy();
    }

    public Set<String> logicalClusterIdsIncludingStale() {
        this.ensureOpen();
        return Sets.union(this.logicalClusterMap.keySet(), this.staleLogicalClusters).immutableCopy();
    }

    public KafkaLogicalClusterMetadata metadata(String logicalClusterId) {
        this.ensureOpen();
        if (!this.staleLogicalClusters.contains(logicalClusterId)) {
            return this.logicalClusterMap.get(logicalClusterId);
        }
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void loadAllFiles() {
        Path logicalClustersDataDir = this.logicalClustersDataDir();
        if (!Files.exists(logicalClustersDataDir, new LinkOption[0])) {
            LOG.info("{} does not exist.", (Object)logicalClustersDataDir);
            return;
        }
        try (Stream<Path> fileStream = Files.list(logicalClustersDataDir);){
            HashSet<String> tempKafkaLogicalClusterIds = new HashSet<String>();
            HashSet<String> logicalClustersInDir = new HashSet<String>();
            for (Path filePath : fileStream.collect(Collectors.toList())) {
                String logicalClusterId = this.loadLogicalClusterMetadata(filePath);
                if (logicalClusterId == null) continue;
                logicalClustersInDir.add(logicalClusterId);
                KafkaLogicalClusterMetadata lcMeta = this.logicalClusterMap.get(logicalClusterId);
                if (lcMeta == null || !lcMeta.isKafkaLogicalCluster()) continue;
                tempKafkaLogicalClusterIds.add(logicalClusterId);
            }
            this.kafkaLogicalClusterIds = tempKafkaLogicalClusterIds;
            HashSet removedLogicalClusters = new HashSet();
            removedLogicalClusters.addAll(Sets.difference(this.logicalClusterIdsIncludingStale(), logicalClustersInDir));
            for (String removedLogicalCluster : removedLogicalClusters) {
                this.removeLogicalCluster(removedLogicalCluster);
                this.markUpToDate(removedLogicalCluster);
                LOG.info("Removed logical cluster {}", (Object)removedLogicalCluster);
            }
            for (String deactivatedCluster : this.tenantLifecycleManager.inactiveClusters()) {
                this.removeLogicalCluster(deactivatedCluster);
                this.markUpToDate(deactivatedCluster);
            }
            this.tenantLifecycleManager.deleteTenants();
        }
        catch (IOException ioe) {
            LOG.warn("Failed to read metadata files from dir={}", (Object)this.logicalClustersDataDir(), (Object)ioe);
        }
        finally {
            this.updateQuotas();
        }
    }

    void reloadCache() {
        this.cacheLock.writeLock().lock();
        try {
            if (!State.CLOSED.equals((Object)this.state.get())) {
                if (!this.staleLogicalClusters.isEmpty()) {
                    LOG.info("Re-loading cache: (known) stale logical clusters={}", this.staleLogicalClusters);
                }
                this.loadAllFiles();
            }
        }
        finally {
            this.cacheLock.writeLock().unlock();
        }
        if (!this.disableSslCertManager) {
            this.sslCertificateManager.loadSslCertFiles();
        }
    }

    private void markStale(String logicalClusterId) {
        this.staleLogicalClusters.add(logicalClusterId);
    }

    private void markUpToDate(String logicalClusterId) {
        this.staleLogicalClusters.remove(logicalClusterId);
    }

    private void updateLogicalClusterMetadata() {
        this.cacheLock.readLock().lock();
        try {
            this.loadAllFiles();
        }
        catch (Exception e) {
            LOG.warn("Failed to load/update metadata of at least one logical cluster", (Throwable)e);
        }
        finally {
            this.cacheLock.readLock().unlock();
        }
    }

    private String loadLogicalClusterMetadata(Path lcFile) {
        String logicalClusterId = PhysicalClusterMetadata.logicalClusterId(lcFile);
        if (logicalClusterId == null) {
            LOG.warn("Ignoring create/update of a non-json file {}", (Object)lcFile);
            return null;
        }
        try {
            ObjectMapper objectMapper = new ObjectMapper();
            KafkaLogicalClusterMetadata lcMeta = (KafkaLogicalClusterMetadata)objectMapper.readValue(lcFile.toFile(), KafkaLogicalClusterMetadata.class);
            if (!logicalClusterId.equals(lcMeta.logicalClusterId()) || !lcMeta.isValid()) {
                LOG.warn("Logical cluster file {} has invalid metadata {}.", (Object)lcFile, (Object)lcMeta);
                this.markStale(logicalClusterId);
                return logicalClusterId;
            }
            KafkaLogicalClusterMetadata oldMeta = this.addLogicalCluster(lcMeta.logicalClusterId(), lcMeta);
            this.markUpToDate(logicalClusterId);
            this.tenantLifecycleManager.updateTenantState(lcMeta);
            if (this.addOrUpdate(oldMeta, lcMeta)) {
                Date creationDate;
                LOG.info("Added/Updated logical cluster {}", (Object)lcMeta);
                long now = this.time.milliseconds();
                long lastModified = lcFile.toFile().lastModified();
                if (lastModified == 0L) {
                    throw new KafkaException("File " + String.valueOf(lcFile.toFile()) + " invalid");
                }
                long timeSinceFsUpdate = now - lastModified;
                this.lkcTimeToLoadFromFilesytemUpdateSensor.record((double)timeSinceFsUpdate);
                if (oldMeta == null && lcMeta.lifecycleMetadata() != null && (creationDate = lcMeta.lifecycleMetadata().creationDate()) != null) {
                    long endToEndLoadTime = now - creationDate.getTime();
                    this.lkcTimeToLoadEndToEndSensor.record((double)endToEndLoadTime);
                }
            }
        }
        catch (Exception e) {
            LOG.error("Failed to load metadata file for logical cluster {}", (Object)logicalClusterId, (Object)e);
            this.markStale(logicalClusterId);
        }
        return logicalClusterId;
    }

    private boolean addOrUpdate(KafkaLogicalClusterMetadata oldMeta, KafkaLogicalClusterMetadata newMeta) {
        boolean updateNeeded = !newMeta.equals((Object)oldMeta);
        boolean expiredCluster = oldMeta == null && !newMeta.isActive();
        return updateNeeded && !expiredCluster;
    }

    private void updateQuotas() {
        Map<String, MultiTenantQuotaConfig> tenantQuotas = this.logicalClusterMap.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> ((KafkaLogicalClusterMetadata)e.getValue()).quotaConfig(this.linkRequestPercentageMultipler)));
        TenantQuotaCallback.updateQuotas(tenantQuotas, MultiTenantQuotaConfig.UNLIMITED_QUOTA);
    }

    private static String logicalClusterId(Path lcFile) {
        String fileName = lcFile.getFileName().toString();
        int indexOfDot = fileName.lastIndexOf(LOGICAL_CLUSTER_FILE_EXT_WITH_DOT);
        return indexOfDot < 0 ? null : fileName.substring(0, indexOfDot);
    }

    private Path logicalClustersDataDir() {
        return Paths.get(this.logicalClustersDir, DATA_DIR_NAME);
    }

    private KafkaLogicalClusterMetadata addLogicalCluster(String clusterId, KafkaLogicalClusterMetadata metadata) {
        KafkaLogicalClusterMetadata res = this.logicalClusterMap.put(clusterId, metadata);
        this.numberOfTenantsMetric.update(this.logicalClusterMap.size());
        return res;
    }

    private void removeLogicalCluster(String clusterId) {
        this.logicalClusterMap.remove(clusterId);
        this.numberOfTenantsMetric.update(this.logicalClusterMap.size());
    }

    private static class IntGauge
    implements Gauge<Integer> {
        private int value;

        private IntGauge() {
        }

        public synchronized Integer value(MetricConfig config, long now) {
            return this.value;
        }

        synchronized int update(int value) {
            this.value = value;
            return this.value;
        }
    }

    public static enum State {
        NOT_READY,
        RUNNING,
        CLOSED;

    }

    class MetadataChangeListener
    implements Runnable {
        private WatchService watchService = null;
        private Path dirPath = null;
        private Map<WatchKey, Path> watchKeyPathMap = new HashMap<WatchKey, Path>();

        MetadataChangeListener() {
        }

        void register() throws IOException {
            this.watchService = FileSystems.getDefault().newWatchService();
            for (String watchDir : PhysicalClusterMetadata.this.watchDirs) {
                this.dirPath = Paths.get(watchDir, new String[0]);
                if (!Files.exists(this.dirPath, new LinkOption[0])) {
                    Files.createDirectories(this.dirPath, new FileAttribute[0]);
                }
                WatchKey watchkey = this.dirPath.register(this.watchService, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.OVERFLOW);
                this.watchKeyPathMap.put(watchkey, this.dirPath);
                LOG.info("Watch service registered for dirpath = {}", (Object)this.dirPath);
            }
        }

        public void close() {
            if (this.watchService != null) {
                try {
                    this.watchService.close();
                    this.watchService = null;
                    LOG.info("Closed watcher for {}", this.watchKeyPathMap);
                }
                catch (IOException ioe) {
                    LOG.error("Failed to shutdown watcher for {}.", PhysicalClusterMetadata.this.watchDirs, (Object)ioe);
                }
            }
        }

        boolean isRegistered() {
            return this.watchService != null;
        }

        @Override
        public void run() {
            for (WatchKey key : this.watchKeyPathMap.keySet()) {
                LOG.info("Starting listening for changes in {}", (Object)this.watchKeyPathMap.get(key));
            }
            try {
                this.runWatcher(this.watchService, this.watchKeyPathMap);
            }
            catch (InterruptedException ie) {
                LOG.warn("Watching {} was interrupted.", this.watchKeyPathMap);
            }
            catch (Exception e) {
                LOG.warn("Stopping watching. ", (Throwable)e);
            }
            finally {
                this.close();
            }
        }

        private void runWatcher(WatchService watchService, Map<WatchKey, Path> watchKeyPathMap) throws InterruptedException {
            while (!watchKeyPathMap.isEmpty()) {
                WatchKey watchKey = watchService.take();
                for (WatchEvent<?> event : watchKey.pollEvents()) {
                    LOG.debug("Got event: {} {}", event.kind(), event.context());
                    Path filename = watchKeyPathMap.get(watchKey).resolve((Path)event.context());
                    String file = filename.getFileName().toString();
                    if (!PhysicalClusterMetadata.DATA_DIR_NAME.equals(file)) continue;
                    if (event.kind() == StandardWatchEventKinds.ENTRY_DELETE) {
                        LOG.warn("Directory with logical cluster metadata is removed. Ignoring.");
                        continue;
                    }
                    PhysicalClusterMetadata.this.updateLogicalClusterMetadata();
                }
                if (watchKey.reset()) continue;
                Path removedPath = watchKeyPathMap.remove(watchKey);
                LOG.warn("Watch key no longer registered for {}.", (Object)removedPath);
            }
            LOG.warn("No watch keys registered. Stopped watch service.");
        }
    }
}

