package io.confluent.ksql.topic;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.parser.SqlFormatter;
import io.confluent.ksql.parser.properties.with.CreateSourceAsProperties;
import io.confluent.ksql.parser.properties.with.CreateSourceProperties;
import io.confluent.ksql.parser.tree.CreateAsSelect;
import io.confluent.ksql.parser.tree.CreateSource;
import io.confluent.ksql.parser.tree.CreateStreamAsSelect;
import io.confluent.ksql.parser.tree.CreateTable;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.parser.tree.WindowExpression;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.statement.Injector;
import io.confluent.ksql.topic.TopicProperties;
import io.confluent.ksql.util.KsqlException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

/**
 * An {@link Injector} that creates the Kafka topic backing a {@code CREATE ...} statement.
 *
 * <p>Two kinds of statement are handled; every other statement passes through untouched:
 * <ul>
 *   <li>{@link CreateSource}: the topic named in the WITH clause is created (using the settings
 *       of the existing topic when there is one) and the statement is returned unchanged.</li>
 *   <li>{@link CreateAsSelect}: a sink topic is created, defaulting its settings from the
 *       query's primary source topic, and the statement is rewritten so that its WITH clause
 *       carries the resolved topic name, partition count and replica count.</li>
 * </ul>
 */
public class TopicCreateInjector implements Injector {
    /** Session config key holding the prefix prepended to default sink topic names. */
    private static final String OUTPUT_TOPIC_PREFIX_CONFIG = "ksql.output.topic.name.prefix";

    /** Topic-level config keys passed to the topic client. */
    private static final String CLEANUP_POLICY_CONFIG = "cleanup.policy";
    private static final String RETENTION_MS_CONFIG = "retention.ms";

    /** Values used for {@link #CLEANUP_POLICY_CONFIG}. */
    private static final String CLEANUP_POLICY_COMPACT = "compact";
    private static final String CLEANUP_POLICY_DELETE = "delete";
    private static final String CLEANUP_POLICY_COMPACT_DELETE = "compact,delete";

    private final KafkaTopicClient topicClient;
    private final MetaStore metaStore;

    /**
     * Creates an injector using the topic client of {@code serviceContext} and the metastore of
     * {@code ksqlExecutionContext}.
     *
     * @param ksqlExecutionContext supplies the {@link MetaStore}
     * @param serviceContext supplies the {@link KafkaTopicClient}
     */
    public TopicCreateInjector(KsqlExecutionContext ksqlExecutionContext, ServiceContext serviceContext) {
        this(serviceContext.getTopicClient(), ksqlExecutionContext.getMetaStore());
    }

    /**
     * @param kafkaTopicClient client used to inspect and create topics; must not be null
     * @param metaStore metastore handed to the source-topic extractor; must not be null
     * @throws NullPointerException if either argument is null
     */
    TopicCreateInjector(KafkaTopicClient kafkaTopicClient, MetaStore metaStore) {
        this.topicClient = Objects.requireNonNull(kafkaTopicClient, "topicClient");
        this.metaStore = Objects.requireNonNull(metaStore, "metaStore");
    }

    /**
     * Creates the topic required by {@code configuredStatement}, if it is a statement kind this
     * injector handles, starting from a fresh {@link TopicProperties.Builder}.
     *
     * @param configuredStatement the statement to process
     * @return the (possibly rewritten) statement
     */
    @Override // io.confluent.ksql.statement.Injector
    public <T extends Statement> ConfiguredStatement<T> inject(ConfiguredStatement<T> configuredStatement) {
        return inject(configuredStatement, new TopicProperties.Builder());
    }

    /**
     * Same as {@link #inject(ConfiguredStatement)} but with a caller-supplied builder.
     *
     * @param configuredStatement the statement to process
     * @param builder the builder used to resolve the topic's name, partitions and replicas
     * @return the rewritten statement for {@link CreateAsSelect}; otherwise the statement that
     *     was passed in
     */
    @VisibleForTesting
    @SuppressWarnings("unchecked") // each narrowing cast is guarded by the instanceof above it
    <T extends Statement> ConfiguredStatement<T> inject(ConfiguredStatement<T> configuredStatement, TopicProperties.Builder builder) {
        final Statement statement = configuredStatement.getStatement();

        if (statement instanceof CreateAsSelect) {
            return (ConfiguredStatement<T>) injectForCreateAsSelect(
                    (ConfiguredStatement<? extends CreateAsSelect>) configuredStatement, builder);
        }

        if (statement instanceof CreateSource) {
            return (ConfiguredStatement<T>) injectForCreateSource(
                    (ConfiguredStatement<? extends CreateSource>) configuredStatement, builder);
        }

        return configuredStatement;
    }

    /**
     * Creates the topic named in the WITH clause of a {@code CREATE STREAM/TABLE} statement.
     *
     * <p>If the topic already exists its description is registered as the builder's source;
     * otherwise the statement must specify a partition count.
     *
     * @return {@code configuredStatement}, unchanged
     * @throws KsqlException if the topic does not exist and no partition count was supplied
     */
    private ConfiguredStatement<? extends CreateSource> injectForCreateSource(ConfiguredStatement<? extends CreateSource> configuredStatement, TopicProperties.Builder builder) {
        final CreateSource createSource = configuredStatement.getStatement();
        final CreateSourceProperties properties = createSource.getProperties();
        final String topicName = properties.getKafkaTopic();

        if (this.topicClient.isTopicExists(topicName)) {
            builder.withSource(() -> this.topicClient.describeTopic(topicName));
        } else if (!properties.getPartitions().isPresent()) {
            // Show the user a corrected version of their own statement in the error message.
            final CreateSource example = createSource.copyWith(createSource.getElements(), properties.withPartitions(2));
            throw new KsqlException("Topic '" + topicName + "' does not exist. If you want to create a new topic for the stream/table please re-run the statement providing the required 'PARTITIONS' configuration in the WITH clause (and optionally 'REPLICAS'). For example: " + SqlFormatter.formatSql(example));
        }

        builder.withName(topicName)
                .withWithClause(Optional.of(properties.getKafkaTopic()), properties.getPartitions(), properties.getReplicas());

        // NOTE(review): createTopic is invoked even when the topic already exists; presumably
        // the client validates/no-ops in that case - confirm against KafkaTopicClient.
        final String cleanupPolicy = createSource instanceof CreateTable
                ? CLEANUP_POLICY_COMPACT
                : CLEANUP_POLICY_DELETE;
        createTopic(builder, cleanupPolicy);

        return configuredStatement;
    }

    /**
     * Creates the sink topic for a {@code CREATE ... AS SELECT} statement and returns a copy of
     * the statement whose properties carry the resolved topic name, partitions and replicas.
     *
     * <p>Cleanup policy: {@code delete} for {@link CreateStreamAsSelect}; otherwise
     * {@code compact,delete} when the query has a window (plus {@code retention.ms} when the
     * window declares a retention), and {@code compact} when it does not.
     */
    @SuppressWarnings("unchecked") // copyWith is declared on CreateAsSelect; cast restores T
    private <T extends CreateAsSelect> ConfiguredStatement<?> injectForCreateAsSelect(ConfiguredStatement<T> configuredStatement, TopicProperties.Builder builder) {
        final String topicPrefix = configuredStatement.getSessionConfig().getConfig(true).getString(OUTPUT_TOPIC_PREFIX_CONFIG);
        final T createAsSelect = configuredStatement.getStatement();
        final CreateSourceAsProperties properties = createAsSelect.getProperties();

        final SourceTopicsExtractor extractor = new SourceTopicsExtractor(this.metaStore);
        extractor.process(createAsSelect.getQuery(), null);
        final String sourceTopicName = extractor.getPrimarySourceTopic().getKafkaTopicName();

        // Default name is <prefix><source name>; the source topic and the WITH clause are
        // registered with the builder, which resolves the final values in build().
        builder.withName(topicPrefix + createAsSelect.getName().text())
                .withSource(() -> this.topicClient.describeTopic(sourceTopicName))
                .withWithClause(properties.getKafkaTopic(), properties.getPartitions(), properties.getReplicas());

        final Map<String, Object> additionalConfigs = new HashMap<>();
        final String cleanupPolicy;
        if (createAsSelect instanceof CreateStreamAsSelect) {
            cleanupPolicy = CLEANUP_POLICY_DELETE;
        } else {
            final Optional<WindowExpression> window = createAsSelect.getQuery().getWindow();
            if (window.isPresent()) {
                cleanupPolicy = CLEANUP_POLICY_COMPACT_DELETE;
                window.get().getKsqlWindowExpression().getRetention().ifPresent(
                        retention -> additionalConfigs.put(RETENTION_MS_CONFIG, retention.toDuration().toMillis()));
            } else {
                cleanupPolicy = CLEANUP_POLICY_COMPACT;
            }
        }

        final TopicProperties topicInfo = createTopic(builder, cleanupPolicy, additionalConfigs);

        final T withTopic = (T) createAsSelect.copyWith(
                properties.withTopic(topicInfo.getTopicName(), topicInfo.getPartitions(), topicInfo.getReplicas()));
        final String withTopicText = SqlFormatter.formatSql(withTopic) + ";";
        return configuredStatement.withStatement(withTopicText, withTopic);
    }

    /** Creates the topic with only the cleanup policy set. */
    private TopicProperties createTopic(TopicProperties.Builder builder, String cleanupPolicy) {
        return createTopic(builder, cleanupPolicy, Collections.emptyMap());
    }

    /**
     * Builds the topic properties and asks the topic client to create the topic.
     *
     * @param builder source of the topic name, partition count and replica count
     * @param cleanupPolicy value for {@code cleanup.policy}
     * @param additionalConfigs extra topic configs; applied after the cleanup policy, so an
     *     entry keyed {@code cleanup.policy} here would override {@code cleanupPolicy}
     * @return the properties the topic was requested with
     */
    private TopicProperties createTopic(TopicProperties.Builder builder, String cleanupPolicy, Map<String, Object> additionalConfigs) {
        final TopicProperties topicInfo = builder.build();

        final Map<String, Object> topicConfig = new HashMap<>();
        topicConfig.put(CLEANUP_POLICY_CONFIG, cleanupPolicy);
        topicConfig.putAll(additionalConfigs);

        this.topicClient.createTopic(topicInfo.getTopicName(), topicInfo.getPartitions(), topicInfo.getReplicas(), topicConfig);
        return topicInfo;
    }
}
