package io.confluent.ksql.engine;

import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import com.spun.util.io.FileUtils;
import io.confluent.ksql.schema.registry.SchemaRegistryUtil;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.QueryApplicationId;
import java.io.File;
import java.nio.file.Paths;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/engine/QueryCleanupService.class */
public class QueryCleanupService extends AbstractExecutionThreadService {
    private static final Logger LOG = LoggerFactory.getLogger(QueryCleanupService.class);
    private static final Runnable SHUTDOWN_SENTINEL = () -> {
    };
    private final BlockingQueue<Runnable> cleanupTasks = new LinkedBlockingDeque();

    /* loaded from: input_file:io/confluent/ksql/engine/QueryCleanupService$QueryCleanupTask.class */
    public static class QueryCleanupTask implements Runnable {
        private final String appId;
        private final String queryTopicPrefix;
        private final String altQueryTopicPrefix;
        private final Optional<String> topologyName;
        private final String pathName;
        private final boolean isTransient;
        private final ServiceContext serviceContext;

        public QueryCleanupTask(ServiceContext serviceContext, String str, Optional<String> optional, boolean z, String str2, String str3, String str4) {
            this.serviceContext = (ServiceContext) Objects.requireNonNull(serviceContext, "serviceContext");
            this.appId = (String) Objects.requireNonNull(str, "appId");
            this.topologyName = (Optional) Objects.requireNonNull(optional, "queryId");
            this.queryTopicPrefix = (String) optional.map(str5 -> {
                return QueryApplicationId.buildInternalTopicPrefix(str3, str4) + str5;
            }).orElse(str);
            this.altQueryTopicPrefix = (String) optional.map(str6 -> {
                return QueryApplicationId.buildInternalTopicPrefix(str3, str4.split("_")[0] + "-") + str6;
            }).orElse(str);
            this.isTransient = z;
            this.pathName = (String) optional.map(str7 -> {
                return str2 + "/" + str + "/__" + str7 + "__";
            }).orElse(str2 + "/" + str);
            if (z && optional.isPresent()) {
                throw new IllegalArgumentException("Transient Queries can not have named topologies");
            }
        }

        public String getAppId() {
            return this.appId;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                File file = new File(String.valueOf(Paths.get(this.pathName, new String[0]).normalize()));
                if (file.exists()) {
                    FileUtils.deleteDirectory(file);
                    QueryCleanupService.LOG.warn("Deleted local state store for non-existing query {}. This is not expected and was likely due to a race condition when the query was dropped before.", this.queryTopicPrefix);
                }
            } catch (Exception e) {
                QueryCleanupService.LOG.error("Error cleaning up state directory {}\n. {}", this.pathName, e);
            }
            tryRun(() -> {
                QueryCleanupService.LOG.info("Deleting schemas for prefix {}", this.queryTopicPrefix);
                SchemaRegistryUtil.cleanupInternalTopicSchemas(this.queryTopicPrefix, this.serviceContext.getSchemaRegistryClient(), this.isTransient);
            }, "internal topic schemas");
            tryRun(() -> {
                QueryCleanupService.LOG.info("Deleting topics for prefix {}", this.queryTopicPrefix);
                this.serviceContext.getTopicClient().deleteInternalTopics(this.queryTopicPrefix);
                this.serviceContext.getTopicClient().deleteInternalTopics(this.altQueryTopicPrefix);
            }, "internal topics");
            if (!this.topologyName.isPresent() || this.isTransient) {
                tryRun(() -> {
                    this.serviceContext.getConsumerGroupClient().deleteConsumerGroups(ImmutableSet.of(this.appId));
                }, "internal consumer groups");
            }
        }

        private void tryRun(Runnable runnable, String str) {
            try {
                runnable.run();
            } catch (Exception e) {
                QueryCleanupService.LOG.warn("Failed to cleanup {} for {}", new Object[]{str, this.appId, e});
            }
        }
    }

    protected void run() {
        while (true) {
            try {
                Runnable take = this.cleanupTasks.take();
                if (take == SHUTDOWN_SENTINEL) {
                    return;
                } else {
                    take.run();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    protected void triggerShutdown() {
        this.cleanupTasks.add(SHUTDOWN_SENTINEL);
    }

    public Set<String> pendingApplicationIds() {
        Stream stream = this.cleanupTasks.stream();
        Class<QueryCleanupTask> cls = QueryCleanupTask.class;
        QueryCleanupTask.class.getClass();
        Stream filter = stream.filter((v1) -> {
            return r1.isInstance(v1);
        });
        Class<QueryCleanupTask> cls2 = QueryCleanupTask.class;
        QueryCleanupTask.class.getClass();
        return (Set) filter.map((v1) -> {
            return r1.cast(v1);
        }).map(queryCleanupTask -> {
            return queryCleanupTask.appId;
        }).collect(ImmutableSet.toImmutableSet());
    }

    public boolean isEmpty() {
        return this.cleanupTasks.isEmpty();
    }

    public void addCleanupTask(QueryCleanupTask queryCleanupTask) {
        this.cleanupTasks.add(queryCleanupTask);
    }
}
