package io.confluent.ksql.topic;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.io.Closer;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.SqlFormatter;
import io.confluent.ksql.parser.tree.DropStatement;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.schema.registry.SchemaRegistryUtil;
import io.confluent.ksql.serde.FormatFactory;
import io.confluent.ksql.serde.SerdeFeature;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.statement.Injector;
import io.confluent.ksql.util.ExecutorUtil;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlException;
import java.util.Objects;
import java.util.stream.Collectors;

/* loaded from: input_file:io/confluent/ksql/topic/TopicDeleteInjector.class */
public class TopicDeleteInjector implements Injector {
    private final MetaStore metastore;
    private final KafkaTopicClient topicClient;
    private final SchemaRegistryClient schemaRegistryClient;

    public TopicDeleteInjector(KsqlExecutionContext ksqlExecutionContext, ServiceContext serviceContext) {
        this(((KsqlExecutionContext) Objects.requireNonNull(ksqlExecutionContext, "executionContext")).getMetaStore(), serviceContext.getTopicClient(), serviceContext.getSchemaRegistryClient());
    }

    @VisibleForTesting
    TopicDeleteInjector(MetaStore metaStore, KafkaTopicClient kafkaTopicClient, SchemaRegistryClient schemaRegistryClient) {
        this.metastore = (MetaStore) Objects.requireNonNull(metaStore, "metastore");
        this.topicClient = (KafkaTopicClient) Objects.requireNonNull(kafkaTopicClient, "topicClient");
        this.schemaRegistryClient = (SchemaRegistryClient) Objects.requireNonNull(schemaRegistryClient, "schemaRegistryClient");
    }

    @Override // io.confluent.ksql.statement.Injector
    public <T extends Statement> ConfiguredStatement<T> inject(ConfiguredStatement<T> configuredStatement) {
        if (!(configuredStatement.getStatement() instanceof DropStatement)) {
            return configuredStatement;
        }
        DropStatement statement = configuredStatement.getStatement();
        if (!statement.isDeleteTopic()) {
            return configuredStatement;
        }
        SourceName name = statement.getName();
        DataSource source = this.metastore.getSource(name);
        if (source != null) {
            if (source.isSource()) {
                throw new KsqlException("Cannot delete topic for read-only source: " + name.text());
            }
            checkTopicRefs(source);
            deleteTopic(source);
            Closer create = Closer.create();
            create.register(() -> {
                deleteKeySubject(source);
            });
            create.register(() -> {
                deleteValueSubject(source);
            });
            try {
                create.close();
            } catch (KsqlException e) {
                throw e;
            } catch (Exception e2) {
                throw new KsqlException(e2);
            }
        } else if (!statement.getIfExists()) {
            throw new KsqlException("Could not find source to delete topic for: " + configuredStatement);
        }
        DropStatement withoutDeleteClause = statement.withoutDeleteClause();
        return configuredStatement.withStatement(SqlFormatter.formatSql(withoutDeleteClause) + ";", withoutDeleteClause);
    }

    private void deleteTopic(DataSource dataSource) {
        try {
            ExecutorUtil.executeWithRetries(() -> {
                this.topicClient.deleteTopics(ImmutableList.of(dataSource.getKafkaTopicName()));
            }, ExecutorUtil.RetryBehaviour.ALWAYS);
        } catch (Exception e) {
            throw new RuntimeException("Could not delete the corresponding kafka topic: " + dataSource.getKafkaTopicName(), e);
        }
    }

    private void deleteKeySubject(DataSource dataSource) {
        try {
            if (FormatFactory.fromName(dataSource.getKsqlTopic().getKeyFormat().getFormat()).supportsFeature(SerdeFeature.SCHEMA_INFERENCE)) {
                SchemaRegistryUtil.deleteSubjectWithRetries(this.schemaRegistryClient, KsqlConstants.getSRSubject(dataSource.getKafkaTopicName(), true));
            }
        } catch (Exception e) {
            checkSchemaError(e, dataSource.getKafkaTopicName());
        }
    }

    private void deleteValueSubject(DataSource dataSource) {
        try {
            if (FormatFactory.fromName(dataSource.getKsqlTopic().getValueFormat().getFormat()).supportsFeature(SerdeFeature.SCHEMA_INFERENCE)) {
                SchemaRegistryUtil.deleteSubjectWithRetries(this.schemaRegistryClient, KsqlConstants.getSRSubject(dataSource.getKafkaTopicName(), false));
            }
        } catch (Exception e) {
            checkSchemaError(e, dataSource.getKafkaTopicName());
        }
    }

    private static void checkSchemaError(Exception exc, String str) {
        if (!SchemaRegistryUtil.isSubjectNotFoundErrorCode(exc)) {
            throw new RuntimeException("Could not clean up the schema registry for topic: " + str, exc);
        }
    }

    private void checkTopicRefs(DataSource dataSource) {
        String kafkaTopicName = dataSource.getKafkaTopicName();
        SourceName name = dataSource.getName();
        String str = (String) this.metastore.getAllDataSources().values().stream().filter(dataSource2 -> {
            return dataSource2.getKafkaTopicName().equals(kafkaTopicName);
        }).map((v0) -> {
            return v0.getName();
        }).filter(sourceName -> {
            return !name.equals(sourceName);
        }).map((v0) -> {
            return v0.text();
        }).sorted().collect(Collectors.joining(", "));
        if (!str.isEmpty()) {
            throw new RuntimeException(String.format("Refusing to delete topic. Found other data sources (%s) using topic %s", str, kafkaTopicName));
        }
    }
}
