package io.confluent.ksql.services;

import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import io.confluent.ksql.exception.KafkaDeleteTopicsException;
import io.confluent.ksql.exception.KafkaResponseGetFailedException;
import io.confluent.ksql.exception.KsqlTopicAuthorizationException;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.util.ExecutorUtil;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.KsqlServerException;
import io.confluent.ksql.util.Pair;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.DescribeTopicsOptions;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TopicDeletionDisabledException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:io/confluent/ksql/services/KafkaTopicClientImpl.class */
public class KafkaTopicClientImpl implements KafkaTopicClient {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicClient.class);
    private static final String DEFAULT_REPLICATION_PROP = "default.replication.factor";
    private static final String DELETE_TOPIC_ENABLE = "delete.topic.enable";
    private final Supplier<Admin> adminClient;

    public KafkaTopicClientImpl(Supplier<Admin> supplier) {
        this.adminClient = (Supplier) Objects.requireNonNull(supplier, "sharedAdminClient");
    }

    public void createTopic(String str, int i, short s, Map<String, ?> map, CreateTopicsOptions createTopicsOptions) {
        if (isTopicExists(str)) {
            validateTopicProperties(str, i, s);
            return;
        }
        NewTopic newTopic = new NewTopic(str, i, s == -1 ? getDefaultClusterReplication() : s);
        newTopic.configs(toStringConfigs(map));
        try {
            LOG.info("Creating topic '{}' {}", str, createTopicsOptions.shouldValidateOnly() ? "(ONLY VALIDATE)" : "");
            ExecutorUtil.executeWithRetries(() -> {
                return (Void) this.adminClient.get().createTopics(Collections.singleton(newTopic), createTopicsOptions).all().get();
            }, ExecutorUtil.RetryBehaviour.ON_RETRYABLE);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new KafkaResponseGetFailedException("Failed to guarantee existence of topic " + str, e);
        } catch (Exception e2) {
            throw new KafkaResponseGetFailedException("Failed to guarantee existence of topic " + str, e2);
        } catch (TopicAuthorizationException e3) {
            throw new KsqlTopicAuthorizationException(AclOperation.CREATE, Collections.singleton(str));
        } catch (TopicExistsException e4) {
            validateTopicProperties(str, i, s);
        }
    }

    private short getDefaultClusterReplication() {
        try {
            return Short.parseShort(getConfig().get(DEFAULT_REPLICATION_PROP).value());
        } catch (Exception e) {
            throw new KsqlServerException("Could not get default replication from Kafka cluster!", e);
        } catch (KsqlServerException e2) {
            throw e2;
        }
    }

    public boolean isTopicExists(String str) {
        LOG.trace("Checking for existence of topic '{}'", str);
        try {
            ExecutorUtil.executeWithRetries(() -> {
                return (TopicDescription) ((KafkaFuture) this.adminClient.get().describeTopics(ImmutableList.of(str), new DescribeTopicsOptions().includeAuthorizedOperations(true)).values().get(str)).get();
            }, ExecutorUtil.RetryBehaviour.ON_RETRYABLE.and(th -> {
                return !(th instanceof UnknownTopicOrPartitionException);
            }));
            return true;
        } catch (TopicAuthorizationException e) {
            throw new KsqlTopicAuthorizationException(AclOperation.DESCRIBE, Collections.singleton(str));
        } catch (Exception e2) {
            if (Throwables.getRootCause(e2) instanceof UnknownTopicOrPartitionException) {
                return false;
            }
            throw new KafkaResponseGetFailedException("Failed to check if exists for topic: " + str, e2);
        }
    }

    public Set<String> listTopicNames() {
        try {
            return (Set) ExecutorUtil.executeWithRetries(() -> {
                return (Set) this.adminClient.get().listTopics().names().get();
            }, ExecutorUtil.RetryBehaviour.ON_RETRYABLE);
        } catch (Exception e) {
            throw new KafkaResponseGetFailedException("Failed to retrieve Kafka Topic names", e);
        }
    }

    public Map<String, TopicDescription> describeTopics(Collection<String> collection) {
        try {
            return (Map) ExecutorUtil.executeWithRetries(() -> {
                return (Map) this.adminClient.get().describeTopics(collection, new DescribeTopicsOptions().includeAuthorizedOperations(true)).all().get();
            }, ExecutorUtil.RetryBehaviour.ON_RETRYABLE);
        } catch (ExecutionException e) {
            throw new KafkaResponseGetFailedException("Failed to Describe Kafka Topic(s): " + collection, e.getCause());
        } catch (Exception e2) {
            throw new KafkaResponseGetFailedException("Failed to Describe Kafka Topic(s): " + collection, e2);
        } catch (TopicAuthorizationException e3) {
            throw new KsqlTopicAuthorizationException(AclOperation.DESCRIBE, collection);
        }
    }

    public Map<String, String> getTopicConfig(String str) {
        return topicConfig(str, true);
    }

    public boolean addTopicConfig(String str, Map<String, ?> map) {
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, str);
        Map<String, String> stringConfigs = toStringConfigs(map);
        try {
            Map<String, String> map2 = topicConfig(str, false);
            if (!stringConfigs.entrySet().stream().anyMatch(entry -> {
                return !Objects.equals(map2.get(entry.getKey()), entry.getValue());
            })) {
                return false;
            }
            Map singletonMap = Collections.singletonMap(configResource, (Set) stringConfigs.entrySet().stream().map(entry2 -> {
                return new ConfigEntry((String) entry2.getKey(), (String) entry2.getValue());
            }).map(configEntry -> {
                return new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET);
            }).collect(Collectors.toSet()));
            ExecutorUtil.executeWithRetries(() -> {
                return (Void) this.adminClient.get().incrementalAlterConfigs(singletonMap).all().get();
            }, ExecutorUtil.RetryBehaviour.ON_RETRYABLE);
            return true;
        } catch (Exception e) {
            throw new KafkaResponseGetFailedException("Failed to set config for Kafka Topic " + str, e);
        } catch (UnsupportedVersionException e2) {
            return addTopicConfigLegacy(str, stringConfigs);
        }
    }

    public KafkaTopicClient.TopicCleanupPolicy getTopicCleanupPolicy(String str) {
        String lowerCase = getTopicConfig(str).getOrDefault("cleanup.policy", "").toLowerCase();
        if (lowerCase.equals("compact")) {
            return KafkaTopicClient.TopicCleanupPolicy.COMPACT;
        }
        if (lowerCase.equals("delete")) {
            return KafkaTopicClient.TopicCleanupPolicy.DELETE;
        }
        if (lowerCase.contains("compact") && lowerCase.contains("delete")) {
            return KafkaTopicClient.TopicCleanupPolicy.COMPACT_DELETE;
        }
        throw new KsqlException("Could not get the topic configs for : " + str);
    }

    public void deleteTopics(Collection<String> collection) {
        if (collection.isEmpty()) {
            return;
        }
        Map map = this.adminClient.get().deleteTopics(collection).topicNameValues();
        ArrayList newArrayList = Lists.newArrayList();
        ArrayList newArrayList2 = Lists.newArrayList();
        for (Map.Entry entry : map.entrySet()) {
            try {
                ((KafkaFuture) entry.getValue()).get(30L, TimeUnit.SECONDS);
            } catch (Exception e) {
                Throwable rootCause = ExceptionUtils.getRootCause(e);
                if (rootCause instanceof TopicDeletionDisabledException) {
                    throw new TopicDeletionDisabledException("Topic deletion is disabled. To delete the topic, you must set 'delete.topic.enable' to true in the Kafka broker configuration.");
                }
                if (rootCause instanceof TopicAuthorizationException) {
                    throw new KsqlTopicAuthorizationException(AclOperation.DELETE, Collections.singleton(entry.getKey()));
                }
                if (!(rootCause instanceof UnknownTopicOrPartitionException)) {
                    LOG.error(String.format("Could not delete topic '%s'", entry.getKey()), e);
                    newArrayList.add(entry.getKey());
                    newArrayList2.add(new Pair(entry.getKey(), rootCause));
                }
            }
        }
        if (!newArrayList.isEmpty()) {
            throw new KafkaDeleteTopicsException("Failed to clean up topics: " + String.join(",", newArrayList), newArrayList2);
        }
    }

    public void deleteInternalTopics(String str) {
        try {
            Set<String> listTopicNames = listTopicNames();
            ArrayList newArrayList = Lists.newArrayList();
            for (String str2 : listTopicNames) {
                if (isInternalTopic(str2, str)) {
                    newArrayList.add(str2);
                }
            }
            if (!newArrayList.isEmpty()) {
                deleteTopics(newArrayList);
            }
        } catch (Exception e) {
            LOG.error("Exception while trying to clean up internal topics for application id: {}.", str, e);
        }
    }

    public Map<TopicPartition, Long> listTopicsOffsets(Collection<String> collection, OffsetSpec offsetSpec) {
        Map map = (Map) describeTopics(collection).entrySet().stream().flatMap(entry -> {
            return ((TopicDescription) entry.getValue()).partitions().stream().map(topicPartitionInfo -> {
                return new TopicPartition((String) entry.getKey(), topicPartitionInfo.partition());
            });
        }).collect(Collectors.toMap(topicPartition -> {
            return topicPartition;
        }, topicPartition2 -> {
            return offsetSpec;
        }));
        try {
            return (Map) ExecutorUtil.executeWithRetries(() -> {
                return (Map) ((Map) this.adminClient.get().listOffsets(map).all().get()).entrySet().stream().collect(Collectors.toMap((v0) -> {
                    return v0.getKey();
                }, entry2 -> {
                    return Long.valueOf(((ListOffsetsResult.ListOffsetsResultInfo) entry2.getValue()).offset());
                }));
            }, ExecutorUtil.RetryBehaviour.ON_RETRYABLE);
        } catch (ExecutionException e) {
            throw new KafkaResponseGetFailedException("Failed to get topic offsets. partitions: " + map.keySet(), e.getCause());
        } catch (Exception e2) {
            throw new KafkaResponseGetFailedException("Failed to get topic offsets. partitions: " + map.keySet(), e2);
        } catch (TopicAuthorizationException e3) {
            throw new KsqlTopicAuthorizationException(AclOperation.DESCRIBE, e3.unauthorizedTopics());
        }
    }

    private Config getConfig() {
        return KafkaClusterUtil.getConfig(this.adminClient.get());
    }

    private static boolean isInternalTopic(String str, String str2) {
        return str.startsWith(new StringBuilder().append(str2).append("-").toString()) && (str.endsWith("-changelog") || str.endsWith("-repartition"));
    }

    private void validateTopicProperties(String str, int i, int i2) {
        TopicValidationUtil.validateTopicProperties(i, i2, describeTopic(str));
        LOG.debug("Did not create topic {} with {} partitions and replication-factor {} since it exists", new Object[]{str, Integer.valueOf(i), Integer.valueOf(i2)});
    }

    private Map<String, String> topicConfig(String str, boolean z) {
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, str);
        List singletonList = Collections.singletonList(configResource);
        try {
            return (Map) ((Config) ((Map) ExecutorUtil.executeWithRetries(() -> {
                return (Map) this.adminClient.get().describeConfigs(singletonList).all().get();
            }, ExecutorUtil.RetryBehaviour.ON_RETRYABLE)).get(configResource)).entries().stream().filter(configEntry -> {
                return configEntry.value() != null;
            }).filter(configEntry2 -> {
                return z || !configEntry2.isDefault();
            }).collect(Collectors.toMap((v0) -> {
                return v0.name();
            }, (v0) -> {
                return v0.value();
            }));
        } catch (Exception e) {
            throw new KafkaResponseGetFailedException("Failed to get config for Kafka Topic " + str, e);
        }
    }

    private boolean addTopicConfigLegacy(String str, Map<String, String> map) {
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, str);
        try {
            Map<String, String> map2 = topicConfig(str, false);
            map2.putAll(map);
            Map singletonMap = Collections.singletonMap(configResource, new Config((Set) map2.entrySet().stream().map(entry -> {
                return new ConfigEntry((String) entry.getKey(), (String) entry.getValue());
            }).collect(Collectors.toSet())));
            ExecutorUtil.executeWithRetries(() -> {
                return (Void) this.adminClient.get().alterConfigs(singletonMap).all().get();
            }, ExecutorUtil.RetryBehaviour.ON_RETRYABLE);
            return true;
        } catch (Exception e) {
            throw new KafkaResponseGetFailedException("Failed to set config for Kafka Topic " + str, e);
        }
    }

    private static Map<String, String> toStringConfigs(Map<String, ?> map) {
        return (Map) map.entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry -> {
            return entry.getValue().toString();
        }));
    }
}
