package io.confluent.ksql.engine;

import com.google.common.util.concurrent.AbstractScheduledService;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.query.QueryRegistry;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.QueryApplicationId;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.streams.StreamsConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/engine/TransientQueryCleanupService.class */
public class TransientQueryCleanupService extends AbstractScheduledService {
    private static final Logger LOG = LoggerFactory.getLogger(TransientQueryCleanupService.class);
    private final Pattern internalTopicPrefixPattern;
    private final Pattern transientAppIdPattern;
    private final Set<String> queriesGuaranteedToBeRunningAtSomePoint;
    private final String stateDir;
    private final KafkaTopicClient topicClient;
    private final int initialDelay;
    private final int intervalPeriod;
    private Optional<Set<String>> localCommandsQueryAppIds;
    private QueryRegistry queryRegistry;
    private int numLeakedTopics;
    private int numLeakedStateDirs;
    private int numLeakedTopicsFailedToCleanUp;
    private int numLeakedStateDirsFailedToCleanUp;

    public TransientQueryCleanupService(ServiceContext serviceContext, KsqlConfig ksqlConfig) {
        String buildInternalTopicPrefix = QueryApplicationId.buildInternalTopicPrefix(ksqlConfig, false);
        this.internalTopicPrefixPattern = Pattern.compile(buildInternalTopicPrefix);
        this.transientAppIdPattern = Pattern.compile(buildInternalTopicPrefix + ".*_[0-9]\\d*_[0-9]\\d*");
        this.initialDelay = ksqlConfig.getInt("ksql.transient.query.cleanup.service.initial.delay.seconds").intValue();
        this.intervalPeriod = ksqlConfig.getInt("ksql.transient.query.cleanup.service.period.seconds").intValue();
        this.stateDir = ksqlConfig.getKsqlStreamConfigProps().getOrDefault("state.dir", StreamsConfig.configDef().defaultValues().get("state.dir")).toString();
        this.topicClient = serviceContext.getTopicClient();
        this.localCommandsQueryAppIds = Optional.empty();
        this.queriesGuaranteedToBeRunningAtSomePoint = new HashSet();
        this.numLeakedTopics = 0;
        this.numLeakedStateDirs = 0;
    }

    protected void runOneIteration() {
        findAndDeleteLeakedTopics();
        findAndDeleteLeakedStateDirs();
    }

    public AbstractScheduledService.Scheduler scheduler() {
        return AbstractScheduledService.Scheduler.newFixedRateSchedule(this.initialDelay, this.intervalPeriod, TimeUnit.SECONDS);
    }

    public void setQueryRegistry(QueryRegistry queryRegistry) {
        this.queryRegistry = queryRegistry;
    }

    @SuppressFBWarnings({"EI_EXPOSE_REP2"})
    public void setLocalCommandsQueryAppIds(Set<String> set) {
        this.localCommandsQueryAppIds = Optional.of(set);
    }

    private void findAndDeleteLeakedTopics() {
        try {
            List<String> findLeakedTopics = findLeakedTopics();
            this.numLeakedTopics = findLeakedTopics.size();
            LOG.info("Cleaning up {} leaked topics: {}", Integer.valueOf(this.numLeakedTopics), findLeakedTopics);
            getTopicClient().deleteTopics(findLeakedTopics);
            this.numLeakedTopicsFailedToCleanUp = findLeakedTopics().size();
        } catch (Throwable th) {
            LOG.error("Failed to clean up topics with exception: " + th.getMessage(), th);
        }
    }

    private void findAndDeleteLeakedStateDirs() {
        try {
            List<String> findLeakedStateDirs = findLeakedStateDirs();
            this.numLeakedStateDirs = findLeakedStateDirs.size();
            LOG.info("Cleaning up {} leaked state directories: {}", Integer.valueOf(this.numLeakedStateDirs), findLeakedStateDirs.stream().map(str -> {
                return this.stateDir + "/" + str;
            }).collect(Collectors.toList()));
            findLeakedStateDirs.forEach(this::deleteLeakedStateDir);
            this.numLeakedStateDirsFailedToCleanUp = findLeakedStateDirs().size();
        } catch (Throwable th) {
            LOG.error("Failed to clean up state directories with exception: " + th.getMessage(), th);
        }
    }

    private void deleteLeakedStateDir(String str) {
        String str2 = this.stateDir + "/" + str;
        try {
            Files.deleteIfExists(Paths.get(str2, new String[0]));
        } catch (IOException e) {
            LOG.info("Transient Query Cleanup Service failed to delete leaked state directory: " + str2, e);
        }
    }

    List<String> findLeakedTopics() {
        return (List) getTopicClient().listTopicNames().stream().filter(this::isLeaked).collect(Collectors.toList());
    }

    List<String> findLeakedStateDirs() {
        return (List) listAllStateFiles().stream().filter(this::isLeaked).collect(Collectors.toList());
    }

    List<String> listAllStateFiles() {
        File[] listFiles = new File(this.stateDir).listFiles();
        return listFiles == null ? Collections.emptyList() : (List) Arrays.stream(listFiles).map((v0) -> {
            return v0.getName();
        }).collect(Collectors.toList());
    }

    boolean isLeaked(String str) {
        if (foundInLocalCommands(str)) {
            return true;
        }
        if (!this.internalTopicPrefixPattern.matcher(str).find() || !isCorrespondingQueryTerminated(str)) {
            return false;
        }
        Matcher matcher = this.transientAppIdPattern.matcher(str);
        if (matcher.find()) {
            return wasQueryGuaranteedToBeRunningAtSomePoint(matcher.group());
        }
        return false;
    }

    boolean isCorrespondingQueryTerminated(String str) {
        Stream<R> map = this.queryRegistry.getAllLiveQueries().stream().map(queryMetadata -> {
            return queryMetadata.getQueryId().toString();
        });
        str.getClass();
        return map.noneMatch((v1) -> {
            return r1.contains(v1);
        });
    }

    public void registerRunningQuery(String str) {
        this.queriesGuaranteedToBeRunningAtSomePoint.add(str);
    }

    boolean wasQueryGuaranteedToBeRunningAtSomePoint(String str) {
        return this.queriesGuaranteedToBeRunningAtSomePoint.contains(str);
    }

    boolean foundInLocalCommands(String str) {
        return ((Boolean) this.localCommandsQueryAppIds.map(set -> {
            Stream stream = set.stream();
            str.getClass();
            return Boolean.valueOf(stream.anyMatch((v1) -> {
                return r1.contains(v1);
            }));
        }).orElse(false)).booleanValue();
    }

    KafkaTopicClient getTopicClient() {
        return this.topicClient;
    }

    String getStateDir() {
        return this.stateDir;
    }

    public int getNumLeakedTopics() {
        return this.numLeakedTopics;
    }

    public int getNumLeakedStateDirs() {
        return this.numLeakedStateDirs;
    }

    public int getNumLeakedTopicsFailedToCleanUp() {
        return this.numLeakedTopicsFailedToCleanUp;
    }

    public int getNumLeakedStateDirsFailedToCleanUp() {
        return this.numLeakedStateDirsFailedToCleanUp;
    }
}
