package org.apache.kafka.server.log.remote.metadata.storage;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.internals.FatalExitError;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.class */
public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataManager {
    private static final Logger log = LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManager.class);
    // Set to true by configure(); later configure() calls become no-ops.
    private volatile boolean configured;
    // Flipped to true exactly once by close(); also stops the initialization retry loop.
    private final AtomicBoolean closing;
    // Set to true by the initialization thread once producer/consumer managers are created.
    private final AtomicBoolean initialized;
    // Clock used for the initialization timeout and handed to the ConsumerManager.
    private final Time time;
    // When false, the consumer thread is not started during initialization.
    private final boolean startConsumerThread;
    // Background thread started by configure() that runs initializeResources(); joined in close().
    private Thread initializationThread;
    // Both managers are created by initializeResources() and remain null until then.
    private volatile ProducerManager producerManager;
    private volatile ConsumerManager consumerManager;
    // Read lock: public operations. Write lock: configure(), manager creation, close().
    private final ReentrantReadWriteLock lock;
    // Created in configure() from remoteLogMetadataManagerSupplier.
    private RemotePartitionMetadataStore remotePartitionMetadataStore;
    // Both created in configure() from the supplied config map.
    private volatile TopicBasedRemoteLogMetadataManagerConfig rlmmConfig;
    private volatile RemoteLogMetadataTopicPartitioner rlmTopicPartitioner;
    // Partitions received before initialization completed; assigned once initialization succeeds.
    private final Set<TopicIdPartition> pendingAssignPartitions;
    // When true, ensureInitializedAndNotClosed() throws FatalExitError.
    private volatile boolean initializationFailed;
    // Factories injected through the constructor so alternative implementations can be supplied.
    private final Supplier<RemotePartitionMetadataStore> remoteLogMetadataManagerSupplier;
    private final Function<Integer, RemoteLogMetadataTopicPartitioner> remoteLogMetadataTopicPartitionerFunction;

    /**
     * Creates a manager that starts the consumer thread during initialization and uses the
     * default partitioner and metadata store implementations.
     */
    public TopicBasedRemoteLogMetadataManager() {
        this(true, RemoteLogMetadataTopicPartitioner::new, RemotePartitionMetadataStore::new);
    }

    /**
     * Package-private constructor that lets callers inject the collaborating factories.
     *
     * @param z        whether the consumer thread should be started during initialization
     * @param function builds the topic partitioner from the configured metadata partition count
     * @param supplier builds the remote partition metadata store
     */
    TopicBasedRemoteLogMetadataManager(boolean z, Function<Integer, RemoteLogMetadataTopicPartitioner> function, Supplier<RemotePartitionMetadataStore> supplier) {
        // Injected collaborators.
        this.startConsumerThread = z;
        this.remoteLogMetadataTopicPartitionerFunction = function;
        this.remoteLogMetadataManagerSupplier = supplier;

        // Lifecycle state: nothing is configured, initialized or closing yet.
        this.configured = false;
        this.initialized = new AtomicBoolean(false);
        this.closing = new AtomicBoolean(false);

        this.time = Time.SYSTEM;
        this.lock = new ReentrantReadWriteLock();
        this.pendingAssignPartitions = Collections.synchronizedSet(new HashSet<>());
    }

    /**
     * Publishes metadata for a newly started segment copy.
     *
     * @param remoteLogSegmentMetadata metadata to store; must be non-null and in state
     *                                 {@code COPY_SEGMENT_STARTED}
     * @return the future returned by {@code storeRemoteLogMetadata}
     * @throws IllegalArgumentException if the metadata is in any other state
     * @throws IllegalStateException    if this instance is closing or not yet initialized
     * @throws RemoteStorageException   if publishing fails with a non-retriable Kafka error
     */
    public CompletableFuture<Void> addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException {
        Objects.requireNonNull(remoteLogSegmentMetadata, "remoteLogSegmentMetadata can not be null");
        this.lock.readLock().lock();
        try {
            ensureInitializedAndNotClosed();
            if (remoteLogSegmentMetadata.state() == RemoteLogSegmentState.COPY_SEGMENT_STARTED) {
                TopicIdPartition partition = remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition();
                return storeRemoteLogMetadata(partition, remoteLogSegmentMetadata);
            }
            throw new IllegalArgumentException("Given remoteLogSegmentMetadata should have state as " + RemoteLogSegmentState.COPY_SEGMENT_STARTED + " but it contains state as: " + remoteLogSegmentMetadata.state());
        } finally {
            this.lock.readLock().unlock();
        }
    }

    /**
     * Publishes a state update for an existing remote log segment.
     *
     * @param remoteLogSegmentMetadataUpdate update to store; must be non-null and must not be in
     *                                       state {@code COPY_SEGMENT_STARTED}
     * @return the future returned by {@code storeRemoteLogMetadata}
     * @throws IllegalArgumentException if the update is in state {@code COPY_SEGMENT_STARTED}
     * @throws IllegalStateException    if this instance is closing or not yet initialized
     * @throws RemoteStorageException   if publishing fails with a non-retriable Kafka error
     */
    public CompletableFuture<Void> updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate) throws RemoteStorageException {
        Objects.requireNonNull(remoteLogSegmentMetadataUpdate, "segmentMetadataUpdate can not be null");
        this.lock.readLock().lock();
        try {
            ensureInitializedAndNotClosed();
            if (remoteLogSegmentMetadataUpdate.state() != RemoteLogSegmentState.COPY_SEGMENT_STARTED) {
                TopicIdPartition partition = remoteLogSegmentMetadataUpdate.remoteLogSegmentId().topicIdPartition();
                return storeRemoteLogMetadata(partition, remoteLogSegmentMetadataUpdate);
            }
            throw new IllegalArgumentException("Given remoteLogSegmentMetadata should not have the state as: " + RemoteLogSegmentState.COPY_SEGMENT_STARTED);
        } finally {
            this.lock.readLock().unlock();
        }
    }

    /**
     * Publishes partition-delete metadata for the partition it refers to.
     *
     * @param remotePartitionDeleteMetadata metadata to store; must be non-null
     * @return the future returned by {@code storeRemoteLogMetadata}
     * @throws IllegalStateException  if this instance is closing or not yet initialized
     * @throws RemoteStorageException if publishing fails with a non-retriable Kafka error
     */
    public CompletableFuture<Void> putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata) throws RemoteStorageException {
        Objects.requireNonNull(remotePartitionDeleteMetadata, "remotePartitionDeleteMetadata can not be null");
        this.lock.readLock().lock();
        try {
            ensureInitializedAndNotClosed();
            TopicIdPartition partition = remotePartitionDeleteMetadata.topicIdPartition();
            return storeRemoteLogMetadata(partition, remotePartitionDeleteMetadata);
        } finally {
            this.lock.readLock().unlock();
        }
    }

    /**
     * Publishes the metadata through the producer manager and returns a future that completes
     * once the consumer manager reports it has caught up with the published record.
     *
     * @param topicIdPartition  partition the metadata belongs to (used for logging only here)
     * @param remoteLogMetadata metadata event to publish
     * @return future completing after consumption has caught up; it fails with a
     *         {@link KafkaException} wrapping the timeout if catching up times out
     * @throws RemoteStorageException if publishing throws a non-retriable {@link KafkaException};
     *                                retriable ones are rethrown unchanged
     */
    private CompletableFuture<Void> storeRemoteLogMetadata(TopicIdPartition topicIdPartition, RemoteLogMetadata remoteLogMetadata) throws RemoteStorageException {
        log.debug("Storing the partition: {} metadata: {}", topicIdPartition, remoteLogMetadata);
        try {
            return this.producerManager
                    .publishMessage(remoteLogMetadata)
                    .thenAcceptAsync(published -> {
                        try {
                            this.consumerManager.waitTillConsumptionCatchesUp(published);
                        } catch (TimeoutException timeout) {
                            // Checked exceptions cannot escape the lambda; wrap in an unchecked one.
                            throw new KafkaException(timeout);
                        }
                    });
        } catch (RetriableException retriable) {
            // Let callers decide whether to retry.
            throw retriable;
        } catch (KafkaException fatal) {
            throw new RemoteStorageException(fatal);
        }
    }

    /**
     * Looks up segment metadata in the partition metadata store.
     *
     * @param topicIdPartition partition to query
     * @param i                presumably the leader epoch for the offset; passed to the store as
     *                         its third argument
     * @param j                presumably the offset; passed to the store as its second argument
     * @return whatever the store returns for the lookup
     * @throws IllegalStateException  if this instance is closing or not yet initialized
     * @throws RemoteStorageException propagated from the store
     */
    public Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(TopicIdPartition topicIdPartition, int i, long j) throws RemoteStorageException {
        this.lock.readLock().lock();
        try {
            ensureInitializedAndNotClosed();
            // The store takes (partition, j, i): the reverse of this method's parameter order.
            return this.remotePartitionMetadataStore.remoteLogSegmentMetadata(topicIdPartition, j, i);
        } finally {
            this.lock.readLock().unlock();
        }
    }

    /**
     * Returns the store's highest log offset for the given partition and epoch argument.
     *
     * @param topicIdPartition partition to query
     * @param i                presumably the leader epoch; forwarded unchanged to the store
     * @return whatever {@code highestLogOffset} on the store returns
     * @throws IllegalStateException  if this instance is closing or not yet initialized
     * @throws RemoteStorageException propagated from the store
     */
    public Optional<Long> highestOffsetForEpoch(TopicIdPartition topicIdPartition, int i) throws RemoteStorageException {
        this.lock.readLock().lock();
        try {
            ensureInitializedAndNotClosed();
            return this.remotePartitionMetadataStore.highestLogOffset(topicIdPartition, i);
        } finally {
            this.lock.readLock().unlock();
        }
    }

    /**
     * Lists the remote log segments the store holds for a partition.
     *
     * @param topicIdPartition partition to list; must be non-null
     * @return the iterator produced by the store
     * @throws IllegalStateException  if this instance is closing or not yet initialized
     * @throws RemoteStorageException propagated from the store
     */
    public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition) throws RemoteStorageException {
        Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be null");
        this.lock.readLock().lock();
        try {
            ensureInitializedAndNotClosed();
            Iterator<RemoteLogSegmentMetadata> segments = this.remotePartitionMetadataStore.listRemoteLogSegments(topicIdPartition);
            return segments;
        } finally {
            this.lock.readLock().unlock();
        }
    }

    /**
     * Lists the remote log segments the store holds for a partition and epoch argument.
     *
     * @param topicIdPartition partition to list; must be non-null
     * @param i                presumably the leader epoch; forwarded unchanged to the store
     * @return the iterator produced by the store
     * @throws IllegalStateException  if this instance is closing or not yet initialized
     * @throws RemoteStorageException propagated from the store
     */
    public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition, int i) throws RemoteStorageException {
        Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be null");
        this.lock.readLock().lock();
        try {
            ensureInitializedAndNotClosed();
            return this.remotePartitionMetadataStore.listRemoteLogSegments(topicIdPartition, i);
        } finally {
            this.lock.readLock().unlock();
        }
    }

    /**
     * Delegates to the configured topic partitioner.
     *
     * @param topicIdPartition user partition to map
     * @return the partitioner's metadata partition for the given partition
     */
    public int metadataPartition(TopicIdPartition topicIdPartition) {
        final RemoteLogMetadataTopicPartitioner partitioner = this.rlmTopicPartitioner;
        return partitioner.metadataPartition(topicIdPartition);
    }

    /**
     * Delegates to the consumer manager.
     *
     * @param i partition number forwarded to the consumer manager
     * @return the consumer manager's read offset for that partition
     */
    public Optional<Long> readOffsetForPartition(int i) {
        final ConsumerManager consumer = this.consumerManager;
        return consumer.readOffsetForPartition(i);
    }

    /**
     * Handles a leadership notification by assigning the union of both partition sets, or by
     * queueing them for later if initialization has not finished yet.
     *
     * @param set  leader partitions; must be non-null
     * @param set2 follower partitions; must be non-null
     * @throws IllegalStateException if this instance is closing
     */
    public void onPartitionLeadershipChanges(Set<TopicIdPartition> set, Set<TopicIdPartition> set2) {
        Objects.requireNonNull(set, "leaderPartitions can not be null");
        Objects.requireNonNull(set2, "followerPartitions can not be null");
        log.info("Received leadership notifications with leader partitions {} and follower partitions {}", set, set2);
        this.lock.readLock().lock();
        try {
            if (this.closing.get()) {
                throw new IllegalStateException("This instance is in closing state");
            }
            Set<TopicIdPartition> allPartitions = new HashSet<>(set);
            allPartitions.addAll(set2);
            if (!this.initialized.get()) {
                // Not ready yet: remember them so initialization can assign them later.
                this.pendingAssignPartitions.addAll(allPartitions);
            } else {
                assignPartitions(allPartitions);
            }
        } finally {
            this.lock.readLock().unlock();
        }
    }

    /**
     * Asks the store to load each partition, then registers all of them with the consumer manager.
     *
     * @param set partitions to assign
     */
    private void assignPartitions(Set<TopicIdPartition> set) {
        for (TopicIdPartition partition : set) {
            this.remotePartitionMetadataStore.maybeLoadPartition(partition);
        }
        this.consumerManager.addAssignmentsForPartitions(set);
    }

    /**
     * Removes the given partitions from the consumer manager, or from the pending set when
     * initialization has not finished yet.
     *
     * @param set partitions that were stopped
     * @throws IllegalStateException if this instance is closing
     */
    public void onStopPartitions(Set<TopicIdPartition> set) {
        this.lock.readLock().lock();
        try {
            if (this.closing.get()) {
                throw new IllegalStateException("This instance is in closing state");
            }
            if (!this.initialized.get()) {
                // Nothing has been assigned yet; just drop them from the pending queue.
                if (!this.pendingAssignPartitions.isEmpty()) {
                    this.pendingAssignPartitions.removeAll(set);
                }
            } else {
                this.consumerManager.removeAssignmentsForPartitions(set);
            }
        } finally {
            this.lock.readLock().unlock();
        }
    }

    /**
     * Sums {@code segmentSizeInBytes()} over every segment the store lists for the given
     * partition and epoch argument.
     *
     * @param topicIdPartition partition whose segments are summed
     * @param i                presumably the leader epoch; forwarded unchanged to the store
     * @return total size in bytes of the listed segments, or 0 when there are none
     * @throws RemoteStorageException propagated from the store
     */
    public long remoteLogSize(TopicIdPartition topicIdPartition, int i) throws RemoteStorageException {
        long totalBytes = 0L;
        // Fetch the iterator once. Asking the store for a new iterator on every pass would
        // restart from the first segment and never terminate on a non-empty partition.
        Iterator<RemoteLogSegmentMetadata> segments = this.remotePartitionMetadataStore.listRemoteLogSegments(topicIdPartition, i);
        while (segments.hasNext()) {
            totalBytes += segments.next().segmentSizeInBytes();
        }
        return totalBytes;
    }

    /**
     * Builds the config, partitioner and metadata store from the given map and starts the
     * background initialization thread. Calls after the first successful one are ignored.
     *
     * @param map configuration properties; must be non-null
     */
    public void configure(Map<String, ?> map) {
        Objects.requireNonNull(map, "configs can not be null.");
        this.lock.writeLock().lock();
        try {
            if (this.configured) {
                log.info("Skipping configure as it is already configured.");
                return;
            }
            log.info("Started configuring topic-based RLMM with configs: {}", map);

            TopicBasedRemoteLogMetadataManagerConfig parsedConfig = new TopicBasedRemoteLogMetadataManagerConfig(map);
            this.rlmmConfig = parsedConfig;
            this.rlmTopicPartitioner = this.remoteLogMetadataTopicPartitionerFunction.apply(this.rlmmConfig.metadataTopicPartitionsCount());
            this.remotePartitionMetadataStore = this.remoteLogMetadataManagerSupplier.get();
            this.configured = true;
            log.info("Successfully configured topic-based RLMM with config: {}", this.rlmmConfig);

            // Topic creation and client setup can take a while, so they run on their own thread.
            Thread initializer = KafkaThread.nonDaemon("RLMMInitializationThread", this::initializeResources);
            this.initializationThread = initializer;
            initializer.start();
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    /**
     * Runs on the initialization thread: makes sure the metadata topic exists, checks its
     * partition count, then creates the producer and consumer managers and assigns any
     * partitions that were queued while initialization was pending.
     *
     * <p>The loop retries, sleeping {@code initializationRetryIntervalMs} between attempts, until
     * initialization succeeds, {@code close()} is called, or {@code initializationRetryMaxTimeoutMs}
     * elapses. On timeout or an unexpected error {@code initializationFailed} is set. The admin
     * client is closed on every exit path.
     */
    private void initializeResources() {
        log.info("Initializing topic-based RLMM resources");
        final NewTopic metadataTopicRequest = createRemoteLogMetadataTopicRequest();
        final long startTimeMs = this.time.milliseconds();
        boolean topicCreated = false;
        Admin adminClient = null;
        try {
            adminClient = Admin.create(this.rlmmConfig.commonProperties());
            // Stop as soon as initialization has succeeded or the instance is closing.
            while (!this.initialized.get() && !this.closing.get()) {
                if (this.time.milliseconds() - startTimeMs > this.rlmmConfig.initializationRetryMaxTimeoutMs()) {
                    log.error("Timed out in initializing the resources, retried to initialize the resource for {} ms.", this.rlmmConfig.initializationRetryMaxTimeoutMs());
                    this.initializationFailed = true;
                    return;
                }

                if (!topicCreated) {
                    topicCreated = createTopic(adminClient, metadataTopicRequest);
                }
                if (!topicCreated) {
                    log.info("Sleep for {} ms before it is retried again.", this.rlmmConfig.initializationRetryIntervalMs());
                    Utils.sleep(this.rlmmConfig.initializationRetryIntervalMs());
                    continue;
                }

                try {
                    // NOTE(review): a partition-count mismatch only sets the failure flag and
                    // initialization still proceeds, as in the original flow; confirm this is intended.
                    if (!isPartitionsCountSameAsConfigured(adminClient, metadataTopicRequest.name())) {
                        this.initializationFailed = true;
                    }
                } catch (Exception e) {
                    // The topic description could not be fetched; retry after a pause.
                    log.info("Sleep for {} ms before it is retried again.", this.rlmmConfig.initializationRetryIntervalMs());
                    Utils.sleep(this.rlmmConfig.initializationRetryIntervalMs());
                    continue;
                }

                this.lock.writeLock().lock();
                try {
                    this.producerManager = new ProducerManager(this.rlmmConfig, this.rlmTopicPartitioner);
                    this.consumerManager = new ConsumerManager(this.rlmmConfig, this.remotePartitionMetadataStore, this.rlmTopicPartitioner, this.time);
                    if (this.startConsumerThread) {
                        this.consumerManager.startConsumerThread();
                    } else {
                        log.info("RLMM Consumer task thread is not configured to be started.");
                    }
                    if (!this.pendingAssignPartitions.isEmpty()) {
                        assignPartitions(this.pendingAssignPartitions);
                        this.pendingAssignPartitions.clear();
                    }
                    this.initialized.set(true);
                    log.info("Initialized topic-based RLMM resources successfully");
                } catch (Exception e) {
                    log.error("Encountered error while initializing producer/consumer", e);
                    return;
                } finally {
                    // Single unlock for every path; unlocking twice throws IllegalMonitorStateException.
                    this.lock.writeLock().unlock();
                }
            }
        } catch (Exception e) {
            log.error("Encountered error while initializing topic-based RLMM resources", e);
            this.initializationFailed = true;
        } finally {
            // Close the real client (not null) so it is never leaked; closeQuietly tolerates null.
            Utils.closeQuietly(adminClient, "AdminClient");
        }
    }

    /**
     * Checks whether the topic can be described through the admin client.
     *
     * @param admin admin client used for the describe call
     * @param str   topic name
     * @return {@code true} if a non-null description was returned; {@code false} if the describe
     *         call failed, was interrupted, or returned no description
     */
    boolean doesTopicExist(Admin admin, String str) {
        try {
            TopicDescription description = admin.describeTopics(Collections.singleton(str)).topicNameValues().get(str).get();
            if (description == null) {
                return false;
            }
            log.info("Topic {} exists. Description: {}", str, description);
            return true;
        } catch (InterruptedException | ExecutionException e) {
            // InterruptedException normally has no cause, so fall back to the exception itself
            // rather than dereferencing a null cause.
            Throwable reason = e.getCause() != null ? e.getCause() : e;
            log.info("Topic {} does not exist. Error: {}", str, reason.getMessage());
            return false;
        }
    }

    /**
     * Compares the topic's actual partition count with the configured metadata partition count.
     *
     * @param admin admin client used to describe the topic
     * @param str   topic name
     * @return {@code true} when the counts match; otherwise logs an error and returns {@code false}
     * @throws InterruptedException if interrupted while waiting for the description
     * @throws ExecutionException   if the describe call fails
     */
    private boolean isPartitionsCountSameAsConfigured(Admin admin, String str) throws InterruptedException, ExecutionException {
        log.debug("Getting topic details to check for partition count and replication factor.");
        TopicDescription description = admin.describeTopics(Collections.singleton(str)).topicNameValues().get(str).get();
        int expectedCount = this.rlmmConfig.metadataTopicPartitionsCount();
        int actualCount = description.partitions().size();
        if (actualCount != expectedCount) {
            log.error("Existing topic partition count [{}] is not same as the expected partition count [{}]", actualCount, expectedCount);
            return false;
        }
        return true;
    }

    /**
     * Builds the creation request for the metadata topic from the current config: configured
     * name, partition count and replication factor, with delete-based retention and remote
     * storage disabled on the topic itself.
     *
     * @return the topic creation request
     */
    private NewTopic createRemoteLogMetadataTopicRequest() {
        Map<String, String> topicConfigs = new HashMap<>();
        topicConfigs.put("retention.ms", Long.toString(this.rlmmConfig.metadataTopicRetentionMs()));
        topicConfigs.put("cleanup.policy", "delete");
        topicConfigs.put("remote.storage.enable", "false");

        NewTopic request = new NewTopic(
                this.rlmmConfig.remoteLogMetadataTopicName(),
                this.rlmmConfig.metadataTopicPartitionsCount(),
                this.rlmmConfig.metadataTopicReplicationFactor());
        return request.configs(topicConfigs);
    }

    /**
     * Creates the topic unless it already exists.
     *
     * @param admin    admin client used for the existence check and the create call
     * @param newTopic creation request
     * @return {@code true} if the topic exists after this call (already present, just created, or
     *         the failure's cause was {@link TopicExistsException}); {@code false} on any other
     *         failure, which is logged
     */
    private boolean createTopic(Admin admin, NewTopic newTopic) {
        final String topicName = newTopic.name();
        try {
            if (doesTopicExist(admin, topicName)) {
                return true;
            }
            CreateTopicsResult result = admin.createTopics(Collections.singleton(newTopic));
            result.all().get();
            // Only non-default config entries are logged, rendered as name=value.
            log.info("Topic {} created. TopicId: {}, numPartitions: {}, replicationFactor: {}, config: {}",
                    topicName,
                    result.topicId(topicName).get(),
                    result.numPartitions(topicName).get(),
                    result.replicationFactor(topicName).get(),
                    result.config(topicName).get().entries().stream()
                            .filter(entry -> !entry.isDefault())
                            .map(entry -> entry.name() + "=" + entry.value())
                            .collect(Collectors.toList()));
            return true;
        } catch (Exception e) {
            if (e.getCause() instanceof TopicExistsException) {
                // Something else created it in the meantime; that still counts as success.
                log.info("Topic [{}] already exists", topicName);
                return true;
            }
            log.error("Encountered error while creating {} topic.", topicName, e);
            return false;
        }
    }

    /**
     * @return {@code true} once the initialization thread has finished setting up the
     *         producer and consumer managers
     */
    public boolean isInitialized() {
        final boolean ready = this.initialized.get();
        return ready;
    }

    /**
     * Guards public operations against use in an invalid lifecycle state.
     *
     * @throws FatalExitError        if initialization was marked as failed
     * @throws IllegalStateException if this instance is closing or not yet initialized
     */
    private void ensureInitializedAndNotClosed() {
        if (this.initializationFailed) {
            throw new FatalExitError();
        }
        boolean usable = !this.closing.get() && this.initialized.get();
        if (!usable) {
            throw new IllegalStateException("This instance is in invalid state, initialized: " + this.initialized + " close: " + this.closing);
        }
    }

    /**
     * @return the config built by {@code configure}, or {@code null} if it has not been called yet
     */
    public TopicBasedRemoteLogMetadataManagerConfig config() {
        final TopicBasedRemoteLogMetadataManagerConfig current = this.rlmmConfig;
        return current;
    }

    /**
     * Closes the producer manager, consumer manager and metadata store. Only the first call does
     * any work; later calls return after logging.
     *
     * <p>The initialization thread is joined <em>before</em> the write lock is taken for cleanup.
     * That thread acquires the same write lock to create the managers, so joining it while
     * holding the lock could block both threads forever. Setting {@code closing} first makes
     * the initialization loop exit at its next check.
     *
     * <p>If the join is interrupted, cleanup still runs and the interrupt flag is restored
     * afterwards.
     *
     * @throws IOException declared by the interface; not thrown by this implementation
     */
    public void close() throws IOException {
        log.info("Closing topic-based RLMM resources");
        if (!this.closing.compareAndSet(false, true)) {
            return;
        }

        // Read the thread reference under the lock that configure() wrote it under, then release
        // the lock again so the initialization thread can make progress while we wait for it.
        final Thread initThread;
        this.lock.writeLock().lock();
        try {
            initThread = this.initializationThread;
        } finally {
            this.lock.writeLock().unlock();
        }

        boolean interrupted = false;
        if (initThread != null) {
            try {
                initThread.join();
            } catch (InterruptedException e) {
                interrupted = true;
                log.error("Initialization thread was interrupted while waiting to join on close.", e);
            }
        }

        this.lock.writeLock().lock();
        try {
            Utils.closeQuietly(this.producerManager, "ProducerTask");
            Utils.closeQuietly(this.consumerManager, "RLMMConsumerManager");
            Utils.closeQuietly(this.remotePartitionMetadataStore, "RemotePartitionMetadataStore");
        } finally {
            this.lock.writeLock().unlock();
            log.info("Closed topic-based RLMM resources");
            if (interrupted) {
                // Restore the flag only after cleanup so the close calls above are not cut short.
                Thread.currentThread().interrupt();
            }
        }
    }
}
