/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.util;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.util.TopicAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Periodically asks a {@link TopicAdmin} to verify the cleanup policy of a single topic.
 *
 * <p>After {@link #start()} a dedicated single-threaded, daemon scheduled executor runs
 * {@link #verifyCleanupPolicy()} every {@link #CHECK_INTERVAL_MS} milliseconds. Failures of a
 * verification run are logged at WARN level and never propagate out of the scheduled task.
 *
 * <p>{@link #start()} and {@link #stop()} are {@code synchronized} on this instance and may be
 * called repeatedly; a stopped verifier can be started again.
 */
public class TopicCleanupPolicyVerifier {
    private static final Logger log = LoggerFactory.getLogger(TopicCleanupPolicyVerifier.class);

    /** Period between two verification runs, in milliseconds (30 minutes). */
    static final long CHECK_INTERVAL_MS = TimeUnit.MINUTES.toMillis(30L);

    /** How long {@link #stop()} waits for the executor to terminate before forcing shutdown. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 5L;

    /** Upper bound of the random initial delay, as a fraction of {@link #CHECK_INTERVAL_MS}. */
    private static final double JITTER_FACTOR = 0.1;

    private final String topic;
    private final String workerTopicConfig;
    private final String topicPurpose;
    private final Supplier<TopicAdmin> adminSupplier;

    /** Non-null only between {@link #start()} and {@link #stop()}; accessed under this instance's monitor. */
    ScheduledExecutorService executor;

    /**
     * Creates a verifier; no thread is started until {@link #start()} is called.
     *
     * @param topic             name of the topic to verify
     * @param workerTopicConfig passed through unchanged to
     *                          {@code TopicAdmin.verifyTopicCleanupPolicyOnlyCompact}
     *                          (presumably the worker config key that names the topic; confirm against TopicAdmin)
     * @param topicPurpose      passed through unchanged to
     *                          {@code TopicAdmin.verifyTopicCleanupPolicyOnlyCompact}
     *                          (presumably a human-readable description used in messages; confirm against TopicAdmin)
     * @param adminSupplier     queried on every verification run; a {@code null} result skips that run
     */
    public TopicCleanupPolicyVerifier(String topic, String workerTopicConfig, String topicPurpose, Supplier<TopicAdmin> adminSupplier) {
        this.topic = topic;
        this.workerTopicConfig = workerTopicConfig;
        this.topicPurpose = topicPurpose;
        this.adminSupplier = adminSupplier;
    }

    /**
     * Produces a string usable as a thread-name suffix from a topic name.
     *
     * @param topicName topic name, may be {@code null}
     * @return {@code "unknown"} for {@code null}; otherwise {@code topicName} with every character
     *         other than ASCII letters, digits, {@code _} and {@code -} replaced by {@code _}
     */
    static String sanitizeTopicName(String topicName) {
        if (topicName == null) {
            return "unknown";
        }
        return topicName.replaceAll("[^a-zA-Z0-9_-]", "_");
    }

    /**
     * Starts the periodic verification. Does nothing if the verifier is already running.
     *
     * <p>The first run happens after a random delay in
     * {@code [0, CHECK_INTERVAL_MS * JITTER_FACTOR)}; subsequent runs follow at a fixed rate of
     * {@link #CHECK_INTERVAL_MS}.
     */
    public synchronized void start() {
        if (executor != null) {
            return;
        }
        executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "TopicCleanupPolicyVerifier-" + sanitizeTopicName(topic));
            // Daemon thread so a verifier that was never stopped cannot keep the JVM alive.
            t.setDaemon(true);
            return t;
        });
        // Randomised initial delay; presumably to spread the first checks of several verifiers apart.
        long initialDelay = (long) ((double) CHECK_INTERVAL_MS * JITTER_FACTOR * Math.random());
        executor.scheduleAtFixedRate(this::verifyCleanupPolicy, initialDelay, CHECK_INTERVAL_MS, TimeUnit.MILLISECONDS);
        log.debug("Started periodic cleanup policy verification for topic '{}' with interval of {} minutes", topic, TimeUnit.MILLISECONDS.toMinutes(CHECK_INTERVAL_MS));
    }

    /**
     * Stops the periodic verification. Does nothing if the verifier is not running.
     *
     * <p>Requests an orderly shutdown and waits up to {@link #SHUTDOWN_TIMEOUT_SECONDS} seconds for
     * termination; on timeout or interruption the executor is shut down forcibly. If the calling
     * thread is interrupted while waiting, its interrupt flag is restored.
     */
    public synchronized void stop() {
        if (executor == null) {
            return;
        }
        executor.shutdown();
        try {
            if (!executor.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            // Preserve the interrupt for the caller.
            Thread.currentThread().interrupt();
        }
        executor = null;
        log.debug("Stopped periodic cleanup policy verification for topic '{}'", topic);
    }

    /**
     * Performs a single verification run.
     *
     * <p>Obtains a {@link TopicAdmin} from the supplier and calls
     * {@code verifyTopicCleanupPolicyOnlyCompact(topic, workerTopicConfig, topicPurpose)} on it.
     * No exception escapes this method: a {@link ConfigException} is logged by message only,
     * any other {@link Exception} is logged with its stack trace. Swallowing exceptions matters
     * because {@code scheduleAtFixedRate} cancels all further runs once a run throws.
     */
    void verifyCleanupPolicy() {
        try {
            TopicAdmin admin = adminSupplier.get();
            if (admin == null) {
                log.warn("Unable to verify cleanup policy for topic '{}': admin client is not available", topic);
                return;
            }
            admin.verifyTopicCleanupPolicyOnlyCompact(topic, workerTopicConfig, topicPurpose);
        } catch (ConfigException e) {
            // Presumably signals a policy violation whose message is already self-explanatory; confirm against TopicAdmin.
            log.warn(e.getMessage());
        } catch (Exception e) {
            // Trailing Throwable argument is logged by SLF4J as the exception, with stack trace.
            log.warn("Error while verifying cleanup policy for topic '{}'", topic, e);
        }
    }
}

