/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.license.validator;

import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.MetricName;
import io.confluent.license.InvalidLicenseException;
import io.confluent.license.License;
import io.confluent.license.LicenseChanged;
import io.confluent.license.LicenseManager;
import io.confluent.license.LicenseStore;
import io.confluent.license.validator.LicenseConfig;
import java.time.Duration;
import java.util.Date;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.license.LicenseTrackingInfoHolder;
import org.apache.kafka.server.license.LicenseValidator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConfluentLicenseValidator
implements LicenseValidator,
Consumer<LicenseChanged> {
    /** Logger shared by this class and its nested store. */
    private static final Logger log = LoggerFactory.getLogger(ConfluentLicenseValidator.class);
    /** Warning emitted by {@code accept} when a non-expired, non-invalid license has the {@code CP_FOR_CC} deployment scope. */
    private static final String CP_FOR_CC_LICENSE_WARNING = "Your license for Customer-Managed Confluent Platform for Confluent Cloud may be used solely to access and use the Cloud Service. When operating under this specific license, Confluent does not provide support for any self-managed (Confluent Platform) components utilised exclusively for Confluent Platform broker use cases. To be eligible for support, you must ensure you are connecting to a Confluent Cloud broker or are utilising a valid Enterprise license for a Confluent Platform subscription.";
    /** Default check interval (one day) used by the public no-arg constructor. */
    private static final Duration LICENSE_CHECK_INTERVAL = Duration.ofDays(1L);
    /** Metric group passed to {@code registerMetric} from {@code start}. */
    public static final String METRIC_GROUP = "confluent.license";
    /** Name of the license-status gauge. */
    public static final String METRIC_NAME = "licenseStatus";
    /** Initial delay and period of the fixed-rate license check. */
    private final Duration checkLicenseInterval;
    /** Configuration captured by {@code configure}; read by {@code start}. */
    private Map<String, ?> configs;
    /** Name of the registered gauge; assigned in {@code registerMetric}, used for removal in {@code close}. */
    private MetricName licenseStatusMetricName;
    /** Executor for store start-up and license checks; created in {@code startLicenseStore} when none was injected. */
    private ScheduledExecutorService executorService;
    /** License store created in {@code start} via {@code createLicenseStore}. */
    private KafkaLicenseStore licenseStore;
    /** License manager, either injected through the constructor or created in {@code start}. */
    private LicenseManager licenseManager;
    /** Most recently recorded status; {@code null} until a status has been recorded. */
    private volatile LicenseStatus licenseStatus;
    /** Explanation recorded with an expired/invalid status; reset to {@code null} when an active status is recorded. */
    private volatile String errorMessage;

    /**
     * Creates a validator that uses the default {@code LICENSE_CHECK_INTERVAL} (one day) as the
     * interval between periodic license checks.
     */
    public ConfluentLicenseValidator() {
        this(LICENSE_CHECK_INTERVAL);
    }

    /**
     * Creates a validator with a custom check interval. The license manager and executor are
     * passed as {@code null}, so they are created later while {@code start} runs.
     *
     * @param checkLicenseInterval initial delay and period of the fixed-rate license check
     */
    ConfluentLicenseValidator(Duration checkLicenseInterval) {
        this(null, null, checkLicenseInterval);
    }

    /**
     * Creates a validator with optionally pre-built collaborators.
     *
     * @param licenseManager manager to use, or {@code null} to have one created in {@code start}
     * @param executorService executor to use, or {@code null} to have one created when the
     *        license store is started
     * @param checkLicenseInterval initial delay and period of the fixed-rate license check
     */
    protected ConfluentLicenseValidator(LicenseManager licenseManager, ScheduledExecutorService executorService, Duration checkLicenseInterval) {
        // Plain field capture; the three assignments are independent of each other.
        this.checkLicenseInterval = checkLicenseInterval;
        this.executorService = executorService;
        this.licenseManager = licenseManager;
    }

    /**
     * Stores the supplied configuration map for later use by {@code start}. The map is kept by
     * reference; it is neither copied nor validated here.
     *
     * @param configs configuration entries read when the validator is started
     */
    public void configure(Map<String, ?> configs) {
        this.configs = configs;
    }

    /**
     * Reports whether this validator is enabled.
     *
     * @return always {@code true}
     */
    public final boolean enabled() {
        return true;
    }

    /**
     * Starts license validation for the given component.
     *
     * <p>The configuration captured by {@code configure} is copied, settings under the
     * {@code confluent.metadata.*} prefixes are re-keyed under the matching
     * {@code confluent.license.*} prefixes, and the result is used to create the license store
     * and (unless one was injected) the license manager. This validator is then added as a
     * listener on the manager, the store start-up task is submitted to the license executor, the
     * status gauge is registered and the periodic check is scheduled.
     *
     * <p>When the manager reports no configured license, this call blocks until the start-up
     * task has finished (bounded by the topic-create timeout plus 15 seconds) and fails if the
     * task failed. Otherwise start-up continues in the background.
     *
     * @param componentId identifier passed through to {@link LicenseConfig}
     * @throws InterruptException if the calling thread is interrupted while waiting for start-up
     * @throws TimeoutException if start-up does not finish within the allowed time
     * @throws InvalidLicenseException if the start-up task failed with that exception
     * @throws KafkaException if the start-up task failed for any other reason
     */
    public void start(String componentId) {
        // Work on a copy so the map handed to configure() is left untouched.
        Map<String, Object> licenseConfigs = new HashMap<>(this.configs);
        LicenseConfig tmpConfig = new LicenseConfig(componentId, this.configs);
        this.replacePrefix(tmpConfig, licenseConfigs, "confluent.metadata.", "confluent.license.");
        this.replacePrefix(tmpConfig, licenseConfigs, "confluent.metadata.consumer.", "confluent.license.consumer.");
        this.replacePrefix(tmpConfig, licenseConfigs, "confluent.metadata.producer.", "confluent.license.producer.");
        this.replacePrefix(tmpConfig, licenseConfigs, "confluent.metadata.admin.", "confluent.license.admin.");

        // The effective config must be built from the re-keyed copy; building it from
        // this.configs would silently discard the prefix translation done above.
        LicenseConfig licenseConfig = new LicenseConfig(componentId, licenseConfigs);
        this.licenseStore = this.createLicenseStore(licenseConfig);
        if (this.licenseManager == null) {
            this.licenseManager = this.createLicenseManager(licenseConfig);
        }
        this.licenseManager.addListener(this);

        License configuredLicense = this.licenseManager.configuredLicense();
        if (configuredLicense != null) {
            this.updateLicenseStatus(configuredLicense);
        }

        // Without a configured license the store is the only source of a license, so start-up
        // failures must surface to the caller and we have to wait for the task.
        boolean mustWaitForStore = configuredLicense == null;
        String licenseStoreDesc = "license store with topic " + this.licenseStore.topic();
        Future<?> startFuture = this.startLicenseStore(licenseConfig.license, mustWaitForStore, licenseStoreDesc);
        if (mustWaitForStore) {
            try {
                startFuture.get(licenseConfig.topicCreateTimeout.toMillis() + 15000L, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                throw new InterruptException("Start up of " + licenseStoreDesc + " was interrupted", e);
            }
            catch (java.util.concurrent.TimeoutException e) {
                throw new TimeoutException("Start up timed out for " + licenseStoreDesc, e);
            }
            catch (ExecutionException e) {
                Throwable cause = e.getCause();
                if (cause instanceof InvalidLicenseException) {
                    throw (InvalidLicenseException) cause;
                }
                throw new KafkaException("Failed to start " + licenseStoreDesc + ". A valid license must be configured using 'confluent.license' if topic is unavailable.", cause);
            }
        }
        this.registerMetric(METRIC_GROUP);
        this.schedulePeriodicValidation();
    }

    /**
     * Reports whether the most recently recorded license status is an active one.
     *
     * @return {@code false} when no status has been recorded yet or the recorded status is not
     *         active; {@code true} otherwise
     */
    public boolean isLicenseValid() {
        LicenseStatus current = this.licenseStatus;
        if (current == null) {
            return false;
        }
        return current.active;
    }

    /**
     * Listener callback that maps a license change event onto this validator's status.
     *
     * <ul>
     *   <li>{@code INVALID}: records the event description as the invalid-license reason.</li>
     *   <li>{@code EXPIRED}: records the expired variant matching the license kind
     *       (trial, free tier, or regular).</li>
     *   <li>any other type: logs the CP-for-CC warning when applicable and records the
     *       active status matching the license kind.</li>
     * </ul>
     *
     * @param licenseChanged the change event delivered by the license manager
     */
    @Override
    public void accept(LicenseChanged licenseChanged) {
        License license = licenseChanged.license();
        LicenseChanged.Type changeType = licenseChanged.type();

        if (changeType == LicenseChanged.Type.INVALID) {
            this.updateInvalidStatus(licenseChanged.description());
            return;
        }

        if (changeType != LicenseChanged.Type.EXPIRED) {
            if (license.deploymentScope() == License.DeploymentScope.CP_FOR_CC) {
                log.warn(CP_FOR_CC_LICENSE_WARNING);
            }
            this.updateLicenseStatus(license);
            return;
        }

        // Expired: pick the expired status that corresponds to the license kind.
        Date expirationDate = license.expirationDate();
        LicenseStatus expiredStatus;
        if (license.isTrial()) {
            expiredStatus = LicenseStatus.TRIAL_EXPIRED;
        } else if (license.isFreeTier()) {
            expiredStatus = LicenseStatus.FREE_TIER_EXPIRED;
        } else {
            expiredStatus = LicenseStatus.LICENSE_EXPIRED;
        }
        this.updateExpiredStatus(expiredStatus, expirationDate);
    }

    /**
     * Returns a holder that produces license tracking information on demand.
     *
     * <p>Each invocation of the returned holder calls
     * {@code licenseManager.registerOrValidateLicense("")} and builds an unmodifiable map with
     * the keys {@code "audience"}, {@code "lid"} and {@code "pid"} from the resulting license.
     *
     * @return a holder backed by this validator's license manager
     */
    public LicenseTrackingInfoHolder get() {
        return () -> {
            License currentLicense = this.licenseManager.registerOrValidateLicense("");
            // NOTE(review): Map.of rejects null values with a NullPointerException; confirm that
            // none of these three accessors can return null.
            return Map.of("audience", currentLicense.audienceString(), "lid", currentLicense.licenseId(), "pid", currentLicense.platformId());
        };
    }

    /**
     * Releases the resources held by this validator.
     *
     * <p>Shuts down the license executor (waiting up to 60 seconds for it to terminate),
     * detaches this listener from the license manager and stops the manager, then removes the
     * license-status gauge if one was registered. Executor shutdown and metric removal are
     * best-effort: problems there are logged, not thrown. If the calling thread is interrupted
     * while waiting for the executor, the remaining clean-up still runs and the interrupt flag
     * is restored before returning.
     */
    public void close() {
        boolean interrupted = false;
        if (this.executorService != null) {
            this.executorService.shutdownNow();
            try {
                // awaitTermination signals a timeout through its return value, not an exception.
                if (!this.executorService.awaitTermination(60L, TimeUnit.SECONDS)) {
                    log.error("License executor did not terminate");
                }
            }
            catch (InterruptedException e) {
                // Remember the interrupt; it is re-asserted once clean-up has finished.
                interrupted = true;
                log.error("License executor did not terminate", e);
            }
            catch (RuntimeException e) {
                log.error("License executor did not terminate", e);
            }
        }
        try {
            if (this.licenseManager != null) {
                this.licenseManager.removeListener(this);
                this.licenseManager.stop();
            }
            // The name is only assigned by registerMetric; nothing to remove if start() never got there.
            MetricName metricName = this.licenseStatusMetricName;
            if (metricName != null) {
                try {
                    Metrics.defaultRegistry().removeMetric(metricName);
                }
                catch (Exception e) {
                    log.debug("Metric {} not found", (Object) metricName);
                }
            }
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Builds the Kafka-backed license store from the given configuration.
     *
     * @param licenseConfig source of the topic name, client configs, topic-create timeout and
     *        retry backoff bounds
     * @return a new, not yet started, {@link KafkaLicenseStore}
     */
    protected KafkaLicenseStore createLicenseStore(LicenseConfig licenseConfig) {
        String topic = licenseConfig.topic;
        Map<String, Object> producerConfig = licenseConfig.producerConfigs();
        Map<String, Object> consumerConfig = licenseConfig.consumerConfigs();
        Map<String, Object> topicConfig = licenseConfig.topicAndAdminClientConfigs();
        Duration topicCreateTimeout = licenseConfig.topicCreateTimeout;
        Duration backoffMin = licenseConfig.retryBackoffMinMs;
        Duration backoffMax = licenseConfig.retryBackoffMaxMs;
        return new KafkaLicenseStore(
            topic,
            producerConfig,
            consumerConfig,
            topicConfig,
            topicCreateTimeout,
            backoffMin,
            backoffMax);
    }

    /**
     * Builds the license manager used when none was injected through the constructor.
     *
     * <p>The manager is created on top of {@code this.licenseStore}, so the store has to be
     * assigned before this method is called (as {@code start} does).
     *
     * @param licenseConfig source of the topic/admin client configs and the configured license
     * @return a new {@link LicenseManager}
     */
    protected LicenseManager createLicenseManager(LicenseConfig licenseConfig) {
        // NOTE(review): the meaning of the trailing (false, true) flags is defined by the
        // LicenseManager constructor, which is not visible here - confirm before changing them.
        return new LicenseManager(licenseConfig.topicAndAdminClientConfigs(), this.licenseStore, licenseConfig.license, false, true);
    }

    /**
     * Records an expired license status together with a matching user-facing error message.
     *
     * <p>The error message is written before the status so that a reader observing the new
     * status also observes its message.
     *
     * @param status one of {@code TRIAL_EXPIRED}, {@code FREE_TIER_EXPIRED} or
     *        {@code LICENSE_EXPIRED}
     * @param expirationDate expiration date included in the message for {@code LICENSE_EXPIRED}
     * @throws IllegalStateException if {@code status} is not one of the expired statuses
     * @throws NullPointerException if {@code status} is {@code null}
     */
    protected void updateExpiredStatus(LicenseStatus status, Date expirationDate) {
        // Switch on the constants themselves rather than on ordinal() so the mapping does not
        // depend on the declaration order of LicenseStatus.
        switch (status) {
            case TRIAL_EXPIRED: {
                this.errorMessage = "Your trial license has expired. Please add a valid license to continue using the product.";
                break;
            }
            case FREE_TIER_EXPIRED: {
                this.errorMessage = "Your free-tier license has expired. Please add a valid license to continue using the product.";
                break;
            }
            case LICENSE_EXPIRED: {
                this.errorMessage = String.format("Your license expired at %s. Please add a valid license to continue using the product.", expirationDate);
                break;
            }
            default: {
                throw new IllegalStateException("Unexpected expired license status " + status);
            }
        }
        this.licenseStatus = status;
    }

    /**
     * Records that the license is invalid.
     *
     * @param reason text stored as the current error message
     */
    protected void updateInvalidStatus(String reason) {
        // Message first, then status: both fields are volatile and are written in this order.
        this.errorMessage = reason;
        this.licenseStatus = LicenseStatus.INVALID_LICENSE;
    }

    /**
     * Returns the most recently recorded license status.
     *
     * @return the current status, or {@code null} if none has been recorded yet
     */
    LicenseStatus licenseStatus() {
        return this.licenseStatus;
    }

    /**
     * Records the given status and clears any previously recorded error message.
     *
     * @param status the status to record
     */
    protected void updateLicenseStatus(LicenseStatus status) {
        // Clear the message before publishing the status; both fields are volatile.
        this.errorMessage = null;
        this.licenseStatus = status;
    }

    /**
     * Records the active status that corresponds to the kind of the given license: trial,
     * free tier, or (for everything else) a regular active license.
     *
     * @param license the license whose kind selects the status
     */
    private void updateLicenseStatus(License license) {
        LicenseStatus activeStatus;
        if (license.isTrial()) {
            activeStatus = LicenseStatus.TRIAL;
        } else if (license.isFreeTier()) {
            activeStatus = LicenseStatus.FREE_TIER;
        } else {
            activeStatus = LicenseStatus.LICENSE_ACTIVE;
        }
        this.updateLicenseStatus(activeStatus);
    }

    /**
     * Registers a gauge in the default Yammer metrics registry that exposes the current license
     * status as a lower-case string, and remembers the metric name so {@code close} can remove it.
     *
     * <p>The gauge reports {@code "unknown"} while no status has been recorded yet, instead of
     * failing with a {@link NullPointerException} when it is read.
     *
     * @param metricGroup group used for the metric name and as the MBean domain
     */
    protected void registerMetric(String metricGroup) {
        String metricType = LicenseValidator.class.getSimpleName();
        String mBeanName = String.format("%s:type=%s,name=%s", metricGroup, metricType, METRIC_NAME);
        MetricName metricName = new MetricName(metricGroup, metricType, METRIC_NAME, null, mBeanName);
        Metrics.defaultRegistry().newGauge(metricName, new Gauge<String>() {

            @Override
            public String value() {
                // Read the volatile field once; it is null until a status has been recorded.
                LicenseStatus current = ConfluentLicenseValidator.this.licenseStatus;
                return current == null ? "unknown" : current.name().toLowerCase(Locale.ROOT);
            }
        });
        this.licenseStatusMetricName = metricName;
    }

    /**
     * Submits the license store start-up task to the license executor, creating a
     * single-threaded daemon executor first if none exists yet.
     *
     * <p>The task starts the store, registers or validates the configured license, records the
     * resulting status and remaining time, and finally starts the license manager. An
     * {@link InvalidLicenseException} is logged and start-up continues. Any other failure is
     * rethrown from the task when {@code failOnError} is set (in which case the manager is not
     * started) and is otherwise logged as a warning.
     *
     * @param configuredLicense license string handed to the license manager
     * @param failOnError whether non-license failures should fail the returned future
     * @param storeDesc description of the store used in log messages
     * @return a future tracking completion of the start-up task
     */
    private Future<?> startLicenseStore(String configuredLicense, boolean failOnError, String storeDesc) {
        if (this.executorService == null) {
            this.executorService = Executors.newSingleThreadScheduledExecutor(task -> {
                Thread worker = new Thread(task, "confluent-license-manager");
                worker.setDaemon(true);
                return worker;
            });
        }

        Runnable startup = () -> {
            try {
                this.licenseStore.start();
                License registered = this.licenseManager.registerOrValidateLicense(configuredLicense);
                if (registered != null) {
                    this.updateLicenseStatus(registered);
                    this.licenseManager.timeUntilLicenseExpirationMs(registered.timeRemaining(TimeUnit.MILLISECONDS));
                }
            }
            catch (InvalidLicenseException invalid) {
                log.error("License validation has failed. This means both the stored and configured license was invalid. Starting License Store with invalid license: {}", (Object) invalid.getMessage());
            }
            catch (Throwable failure) {
                if (failOnError) {
                    throw failure;
                }
                log.warn("Could not start " + storeDesc + ", configured license will be used without storing in license topic", failure);
            }
            this.licenseManager.start();
        };
        return this.executorService.submit(startup);
    }

    /**
     * Schedules {@code checkLicense} on the license executor at a fixed rate, using the
     * configured check interval both as the initial delay and as the period.
     */
    protected void schedulePeriodicValidation() {
        long periodMs = this.checkLicenseInterval.toMillis();
        Runnable periodicCheck = this::checkLicense;
        this.executorService.scheduleAtFixedRate(periodicCheck, periodMs, periodMs, TimeUnit.MILLISECONDS);
    }

    /** Most recently scheduled one-shot follow-up check, or {@code null} if none was scheduled. */
    private volatile Future<?> followUpCheck;

    /**
     * Performs one license check: logs an error if the license is currently not valid, then
     * asks {@code getNextCheckIntervalMsAndMaybeLog} whether an earlier follow-up check is
     * needed and, if so, schedules it on the license executor.
     *
     * <p>At most one follow-up is kept pending. This method is invoked both by the fixed-rate
     * schedule and by its own follow-ups; without cancelling the previously scheduled follow-up,
     * every fixed-rate run would start an additional self-rescheduling chain and the number of
     * checks (and log lines) would grow over time.
     */
    protected void checkLicense() {
        if (!this.isLicenseValid()) {
            log.error("************** {} Will check again every hour until a valid license is found. **************", (Object) this.errorMessage);
        }
        long timeRemainingMs = this.licenseManager.timeUntilLicenseExpirationMs();
        long millisBeforeNextCheck = this.getNextCheckIntervalMsAndMaybeLog(timeRemainingMs);
        if (millisBeforeNextCheck > 0L) {
            // Drop a still-pending follow-up before scheduling its replacement. If the follow-up
            // is the task currently running this method, the non-interrupting cancel has no
            // effect on it.
            Future<?> previous = this.followUpCheck;
            if (previous != null) {
                previous.cancel(false);
            }
            log.debug("Scheduling next license check in {} milliseconds", (Object) millisBeforeNextCheck);
            this.followUpCheck = this.executorService.schedule(this::checkLicense, millisBeforeNextCheck, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Decides how soon the license should be checked again and logs a reminder when expiry is
     * near.
     *
     * <ul>
     *   <li>no time remaining: one hour, nothing logged here</li>
     *   <li>less than 3 days: error logged, one hour</li>
     *   <li>less than 5 days: error logged, three hours</li>
     *   <li>less than 30 days: warning logged, no follow-up</li>
     *   <li>otherwise: nothing logged, no follow-up</li>
     * </ul>
     *
     * @param timeRemainingMs milliseconds until the license expires
     * @return delay in milliseconds before a follow-up check, or {@code -1} if none is needed
     */
    protected long getNextCheckIntervalMsAndMaybeLog(long timeRemainingMs) {
        final long oneHourMs = TimeUnit.HOURS.toMillis(1L);
        if (timeRemainingMs <= 0L) {
            return oneHourMs;
        }

        long daysLeft = TimeUnit.MILLISECONDS.toDays(timeRemainingMs);
        String notice = String.format("You have %d days remaining until license expires.", daysLeft);

        boolean withinThreeDays = timeRemainingMs < TimeUnit.DAYS.toMillis(3L);
        if (withinThreeDays || timeRemainingMs < TimeUnit.DAYS.toMillis(5L)) {
            log.error(notice);
            return withinThreeDays ? oneHourMs : TimeUnit.HOURS.toMillis(3L);
        }

        if (timeRemainingMs < TimeUnit.DAYS.toMillis(30L)) {
            log.warn(notice);
        }
        return -1L;
    }

    /**
     * Re-keys every original setting of {@code srcConfig} that starts with {@code srcPrefix} so
     * that it appears in {@code dstConfigs} under {@code dstPrefix} instead.
     *
     * <p>For each matching setting the {@code srcPrefix}-keyed entry is removed from
     * {@code dstConfigs}, and the value is added under the {@code dstPrefix}-based key unless
     * that key is already present, so explicitly configured destination keys take precedence.
     *
     * @param srcConfig config whose original settings are scanned
     * @param dstConfigs mutable map that is modified in place
     * @param srcPrefix prefix to look for
     * @param dstPrefix prefix to substitute
     */
    private void replacePrefix(AbstractConfig srcConfig, Map<String, Object> dstConfigs, String srcPrefix, String dstPrefix) {
        // originalsWithPrefix returns the matching entries with the prefix already stripped.
        Map<String, Object> prefixedConfigs = srcConfig.originalsWithPrefix(srcPrefix);
        prefixedConfigs.forEach((suffix, value) -> {
            dstConfigs.remove(srcPrefix + suffix);
            dstConfigs.putIfAbsent(dstPrefix + suffix, value);
        });
    }

    /**
     * {@link LicenseStore} that tracks whether its log is running and turns license scans and
     * registrations into no-ops while it is not.
     */
    static class KafkaLicenseStore
    extends LicenseStore {
        /** {@code true} between a completed {@code startLog} and the beginning of {@code stopLog}. */
        private volatile boolean active;

        KafkaLicenseStore(String topic, Map<String, Object> producerConfig, Map<String, Object> consumerConfig, Map<String, Object> topicConfig, Duration topicCreateTimeout, Duration retryBackoffMinMs, Duration retryBackoffMaxMs) {
            super(topic, producerConfig, consumerConfig, topicConfig, topicCreateTimeout, retryBackoffMinMs, retryBackoffMaxMs, Time.SYSTEM);
        }

        /** Starts the underlying log and only then marks the store active. */
        @Override
        protected void startLog() {
            this.startLogStore();
            this.active = true;
        }

        /** Marks the store inactive before stopping the underlying log. */
        @Override
        protected void stopLog() {
            this.active = false;
            this.stopLogStore();
        }

        /** Delegates to the superclass {@code startLog}; a separate hook that subclasses can override. */
        protected void startLogStore() {
            super.startLog();
        }

        /** Delegates to the superclass {@code stopLog}; a separate hook that subclasses can override. */
        protected void stopLogStore() {
            super.stopLog();
        }

        /**
         * Scans for the stored license.
         *
         * @return the superclass scan result, or {@code null} while the store is not active
         */
        @Override
        public String licenseScan() {
            if (!this.active) {
                return null;
            }
            return super.licenseScan();
        }

        /** Registers the license through the superclass, or only logs at debug level while the store is not active. */
        @Override
        public synchronized void registerLicense(String license, Callback callback) {
            if (!this.active) {
                log.debug("License store is not active, not registering license");
                return;
            }
            super.registerLicense(license, callback);
        }

        /** Returns whether the store is currently marked active. */
        protected boolean active() {
            return this.active;
        }
    }

    /**
     * License states tracked by the validator. Each constant carries a flag telling whether
     * the license counts as active in that state.
     */
    public enum LicenseStatus {
        TRIAL(true),
        TRIAL_EXPIRED(false),
        FREE_TIER(true),
        FREE_TIER_EXPIRED(false),
        LICENSE_ACTIVE(true),
        LICENSE_EXPIRED(false),
        INVALID_LICENSE(false);

        /** Whether a license in this state is considered active. */
        final boolean active;

        LicenseStatus(boolean isActive) {
            this.active = isActive;
        }
    }
}

