package io.confluent.ksql.rest.util;

import com.google.common.collect.ImmutableList;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.schema.registry.SchemaRegistryUtil;
import io.confluent.ksql.serde.FormatFactory;
import io.confluent.ksql.serde.SerdeFeature;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.ExecutorUtil;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlException;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.errors.TopicDeletionDisabledException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/rest/util/ClusterTerminator.class */
public class ClusterTerminator {
    private static final Logger LOGGER = LoggerFactory.getLogger(ClusterTerminator.class);
    private final KsqlEngine ksqlEngine;
    private final ServiceContext serviceContext;
    private final List<String> managedTopics;

    @SuppressFBWarnings({"EI_EXPOSE_REP2"})
    public ClusterTerminator(KsqlEngine ksqlEngine, ServiceContext serviceContext, List<String> list) {
        Objects.requireNonNull(ksqlEngine, "ksqlEngine is null.");
        this.ksqlEngine = ksqlEngine;
        this.serviceContext = (ServiceContext) Objects.requireNonNull(serviceContext);
        this.managedTopics = ImmutableList.copyOf((Collection) Objects.requireNonNull(list, "managedTopics"));
    }

    public void terminateCluster(List<String> list) {
        terminatePersistentQueries();
        deleteSinkTopics(list);
        deleteTopics(this.managedTopics);
        this.ksqlEngine.close();
    }

    private void terminatePersistentQueries() {
        this.ksqlEngine.getPersistentQueries().forEach((v0) -> {
            v0.close();
        });
    }

    private void deleteSinkTopics(List<String> list) {
        if (list.isEmpty()) {
            return;
        }
        List<DataSource> sourcesToDelete = getSourcesToDelete((List) list.stream().map(Pattern::compile).collect(Collectors.toList()), this.ksqlEngine.getMetaStore());
        deleteTopics(topicNames(sourcesToDelete));
        cleanUpSinkSchemas(subjectNames(sourcesToDelete));
    }

    private List<String> filterNonExistingTopics(Collection<String> collection) {
        Set listTopicNames = this.serviceContext.getTopicClient().listTopicNames();
        Stream<String> stream = collection.stream();
        listTopicNames.getClass();
        return (List) stream.filter((v1) -> {
            return r1.contains(v1);
        }).collect(Collectors.toList());
    }

    private void deleteTopics(Collection<String> collection) {
        try {
            ExecutorUtil.executeWithRetries(() -> {
                this.serviceContext.getTopicClient().deleteTopics(filterNonExistingTopics(collection));
            }, ExecutorUtil.RetryBehaviour.ALWAYS);
        } catch (TopicDeletionDisabledException e) {
            LOGGER.info("Did not delete any topics: {}", e.getMessage());
        } catch (Exception e2) {
            throw new KsqlException("Exception while deleting topics: " + StringUtils.join(collection, ", "));
        }
    }

    private void cleanUpSinkSchemas(Collection<String> collection) {
        collection.retainAll((Set) SchemaRegistryUtil.getSubjectNames(this.serviceContext.getSchemaRegistryClient()).collect(Collectors.toSet()));
        collection.forEach(this::deleteSubject);
    }

    private void deleteSubject(String str) {
        try {
            SchemaRegistryUtil.deleteSubjectWithRetries(this.serviceContext.getSchemaRegistryClient(), str);
        } catch (Exception e) {
            LOGGER.warn("Failed to clean up Avro schema for subject: " + str, e);
        }
    }

    private static List<DataSource> getSourcesToDelete(List<Pattern> list, MetaStore metaStore) {
        Predicate predicate = str -> {
            return list.stream().anyMatch(pattern -> {
                return pattern.matcher(str).matches();
            });
        };
        return (List) metaStore.getAllDataSources().values().stream().filter((v0) -> {
            return v0.isCasTarget();
        }).filter(dataSource -> {
            return predicate.test(dataSource.getKsqlTopic().getKafkaTopicName());
        }).collect(Collectors.toList());
    }

    private static Set<String> topicNames(List<DataSource> list) {
        return (Set) list.stream().map((v0) -> {
            return v0.getKsqlTopic();
        }).map((v0) -> {
            return v0.getKafkaTopicName();
        }).collect(Collectors.toSet());
    }

    private static Set<String> subjectNames(List<DataSource> list) {
        HashSet hashSet = new HashSet();
        for (DataSource dataSource : list) {
            if (FormatFactory.fromName(dataSource.getKsqlTopic().getKeyFormat().getFormat()).supportsFeature(SerdeFeature.SCHEMA_INFERENCE)) {
                hashSet.add(KsqlConstants.getSRSubject(dataSource.getKafkaTopicName(), true));
            }
            if (FormatFactory.fromName(dataSource.getKsqlTopic().getValueFormat().getFormat()).supportsFeature(SerdeFeature.SCHEMA_INFERENCE)) {
                hashSet.add(KsqlConstants.getSRSubject(dataSource.getKafkaTopicName(), false));
            }
        }
        return hashSet;
    }
}
