package org.apache.kafka.streams.processor.internals;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/InternalTopicManager.class */
public class InternalTopicManager {
    private static final String INTERRUPTED_ERROR_MESSAGE = "Thread got interrupted. This indicates a bug. Please report at https://issues.apache.org/jira/projects/KAFKA or dev-mailing list (https://kafka.apache.org/contact).";
    private final Time time;
    private final Admin adminClient;
    private final short replicationFactor;
    private final long windowChangeLogAdditionalRetention;
    private final long retryBackOffMs;
    private final long retryTimeoutMs;
    private final Map<String, String> defaultTopicConfigs = new HashMap();
    private final Logger log = new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName())).logger(getClass());

    /* JADX WARN: Multi-variable type inference failed */
    public InternalTopicManager(Time time, Admin admin, StreamsConfig streamsConfig) {
        this.time = time;
        this.adminClient = admin;
        this.replicationFactor = streamsConfig.getInt(StreamsConfig.REPLICATION_FACTOR_CONFIG).shortValue();
        this.windowChangeLogAdditionalRetention = streamsConfig.getLong(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG).longValue();
        this.retryBackOffMs = streamsConfig.getLong(StreamsConfig.RETRY_BACKOFF_MS_CONFIG).longValue();
        Map<String, Object> mainConsumerConfigs = streamsConfig.getMainConsumerConfigs("dummy", "dummy", -1);
        mainConsumerConfigs.put("key.deserializer", ByteArrayDeserializer.class);
        mainConsumerConfigs.put("value.deserializer", ByteArrayDeserializer.class);
        this.retryTimeoutMs = new ClientUtils.QuietConsumerConfig(mainConsumerConfigs).getInt("max.poll.interval.ms").intValue() / 2;
        this.log.debug("Configs:" + Utils.NL + "\t{} = {}" + Utils.NL + "\t{} = {}", new Object[]{StreamsConfig.REPLICATION_FACTOR_CONFIG, Short.valueOf(this.replicationFactor), StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, Long.valueOf(this.windowChangeLogAdditionalRetention)});
        for (Map.Entry entry : streamsConfig.originalsWithPrefix(StreamsConfig.TOPIC_PREFIX).entrySet()) {
            if (entry.getValue() != null) {
                this.defaultTopicConfigs.put(entry.getKey(), entry.getValue().toString());
            }
        }
    }

    public Set<String> makeReady(Map<String, InternalTopicConfig> map) {
        this.log.debug("Starting to validate internal topics {} in partition assignor.", map);
        long milliseconds = this.time.milliseconds() + this.retryTimeoutMs;
        Set<String> hashSet = new HashSet(map.keySet());
        HashSet hashSet2 = new HashSet();
        while (!hashSet.isEmpty()) {
            HashSet hashSet3 = new HashSet();
            hashSet = validateTopics(hashSet, map, hashSet3);
            hashSet2.addAll(hashSet);
            if (!hashSet.isEmpty()) {
                HashSet hashSet4 = new HashSet();
                for (String str : hashSet) {
                    if (!hashSet3.contains(str)) {
                        InternalTopicConfig internalTopicConfig = (InternalTopicConfig) Objects.requireNonNull(map.get(str));
                        Map<String, String> properties = internalTopicConfig.getProperties(this.defaultTopicConfigs, this.windowChangeLogAdditionalRetention);
                        this.log.debug("Going to create topic {} with {} partitions and config {}.", new Object[]{internalTopicConfig.name(), internalTopicConfig.numberOfPartitions(), properties});
                        hashSet4.add(new NewTopic(internalTopicConfig.name(), internalTopicConfig.numberOfPartitions(), Optional.of(Short.valueOf(this.replicationFactor))).configs(properties));
                    }
                }
                for (Map.Entry entry : this.adminClient.createTopics(hashSet4).values().entrySet()) {
                    String str2 = (String) entry.getKey();
                    try {
                        ((KafkaFuture) entry.getValue()).get();
                        hashSet.remove(str2);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        this.log.error(INTERRUPTED_ERROR_MESSAGE, e);
                        throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE, e);
                    } catch (TimeoutException e2) {
                        this.log.error("Creating topic {} timed out.\nError message was: {}", str2, e2.toString());
                    } catch (ExecutionException e3) {
                        Throwable cause = e3.getCause();
                        if (!(cause instanceof TopicExistsException)) {
                            this.log.error("Unexpected error during topic creation for {}.\nError message was: {}", str2, cause.toString());
                            throw new StreamsException(String.format("Could not create topic %s.", str2), cause);
                        }
                        this.log.info("Could not create topic {}. Topic is probably marked for deletion (number of partitions is unknown).\nWill retry to create this topic in {} ms (to let broker finish async delete operation first).\nError message was: {}", new Object[]{str2, Long.valueOf(this.retryBackOffMs), cause.toString()});
                    }
                }
            }
            if (!hashSet.isEmpty()) {
                long milliseconds2 = this.time.milliseconds();
                if (milliseconds2 >= milliseconds) {
                    String format = String.format("Could not create topics within %d milliseconds. This can happen if the Kafka cluster is temporarily not available.", Long.valueOf(this.retryTimeoutMs));
                    this.log.error(format);
                    throw new TimeoutException(format);
                }
                this.log.info("Topics {} could not be made ready. Will retry in {} milliseconds. Remaining time in milliseconds: {}", new Object[]{hashSet, Long.valueOf(this.retryBackOffMs), Long.valueOf(milliseconds - milliseconds2)});
                Utils.sleep(this.retryBackOffMs);
            }
        }
        this.log.debug("Completed validating internal topics and created {}", hashSet2);
        return hashSet2;
    }

    protected Map<String, Integer> getNumPartitions(Set<String> set, Set<String> set2) {
        this.log.debug("Trying to check if topics {} have been created with expected number of partitions.", set);
        Map values = this.adminClient.describeTopics(set).values();
        HashMap hashMap = new HashMap();
        for (Map.Entry entry : values.entrySet()) {
            String str = (String) entry.getKey();
            try {
                hashMap.put(str, Integer.valueOf(((TopicDescription) ((KafkaFuture) entry.getValue()).get()).partitions().size()));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                this.log.error(INTERRUPTED_ERROR_MESSAGE, e);
                throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE, e);
            } catch (ExecutionException e2) {
                Throwable cause = e2.getCause();
                if (cause instanceof UnknownTopicOrPartitionException) {
                    this.log.debug("Topic {} is unknown or not found, hence not existed yet.\nError message was: {}", str, cause.toString());
                } else {
                    if (!(cause instanceof LeaderNotAvailableException)) {
                        this.log.error("Unexpected error during topic description for {}.\nError message was: {}", str, cause.toString());
                        throw new StreamsException(String.format("Could not create topic %s.", str), cause);
                    }
                    set2.add(str);
                    this.log.debug("The leader of topic {} is not available.\nError message was: {}", str, cause.toString());
                }
            } catch (TimeoutException e3) {
                set2.add(str);
                this.log.debug("Describing topic {} (to get number of partitions) timed out.\nError message was: {}", str, e3.toString());
            }
        }
        return hashMap;
    }

    private Set<String> validateTopics(Set<String> set, Map<String, InternalTopicConfig> map, Set<String> set2) {
        if (!map.keySet().containsAll(set)) {
            throw new IllegalStateException("The topics map " + map.keySet() + " does not contain all the topics " + set + " trying to validate.");
        }
        Map<String, Integer> numPartitions = getNumPartitions(set, set2);
        HashSet hashSet = new HashSet();
        for (String str : set) {
            Optional<Integer> numberOfPartitions = map.get(str).numberOfPartitions();
            if (!numberOfPartitions.isPresent()) {
                this.log.error("Found undefined number of partitions for topic {}", str);
                throw new StreamsException("Topic " + str + " number of partitions not defined");
            }
            if (!numPartitions.containsKey(str)) {
                hashSet.add(str);
            } else if (!numPartitions.get(str).equals(numberOfPartitions.get())) {
                String format = String.format("Existing internal topic %s has invalid partitions: expected: %d; actual: %d. Use 'kafka.tools.StreamsResetter' tool to clean up invalid topics before processing.", str, numberOfPartitions.get(), numPartitions.get(str));
                this.log.error(format);
                throw new StreamsException(format);
            }
        }
        return hashSet;
    }
}
