/*
 * Decompiled with CFR 0.152.
 */
package kafka.tier.topic;

import io.confluent.rest.TierTopicHeadDataLossDetectionRequest;
import io.confluent.rest.TierTopicHeadDataLossDetectionResponse;
import java.time.Duration;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import kafka.server.InternalAdmin;
import kafka.tier.TopicIdPartition;
import kafka.tier.client.TierTopicProducerSupplier;
import kafka.tier.domain.AbstractTierMetadata;
import kafka.tier.exceptions.TierMetadataFatalException;
import kafka.tier.exceptions.TierMetadataRetriableException;
import kafka.tier.state.TierPartitionState;
import kafka.tier.topic.TierTopic;
import kafka.tier.topic.TierTopicAppender;
import kafka.tier.topic.TierTopicConsumer;
import kafka.tier.topic.TierTopicDataLossValidator;
import kafka.tier.topic.TierTopicManagerConfig;
import kafka.tier.topic.recovery.ValidationSource;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Appends tier metadata to the tier topic.
 *
 * <p>The manager becomes ready on a background thread (see {@link #run()}): it initializes the
 * tier topic, obtains a producer, runs the startup data loss detection and initializes the tier
 * topic consumer. Metadata submitted through {@link #addMetadata(AbstractTierMetadata)} before the
 * manager is ready is queued and sent once it becomes ready; after {@link #shutdown()} new
 * requests are failed immediately.
 *
 * <p>Locking: senders and data loss detection hold the read side of {@code sendLock};
 * {@link #shutdown()} takes the write side while it tears things down. The ready flag and the
 * queue of pending requests are additionally updated together under this object's monitor.
 */
public class TierTopicManager implements Runnable, TierTopicAppender {
    private static final Logger log = LoggerFactory.getLogger(TierTopicManager.class);
    /** Pause between two attempts to become ready. */
    private static final int TOPIC_CREATION_BACKOFF_MS = 2000;
    // NOTE(review): currently unused in this class; startup validation passes -1 as its timeout.
    private static final long TIER_TOPIC_DATA_LOSS_DETECTION_TIMEOUT_MS = Duration.ofMinutes(15L).toMillis();
    private final TierTopicManagerConfig config;
    private final Supplier<Producer<byte[], byte[]>> producerSupplier;
    private final Supplier<InternalAdmin> internalAdminSupplier;
    private final TierTopic tierTopic;
    private final TierTopicConsumer tierTopicConsumer;
    private final AtomicBoolean ready = new AtomicBoolean(false);
    private final AtomicBoolean shutdown = new AtomicBoolean(false);
    private final ReentrantReadWriteLock sendLock = new ReentrantReadWriteLock();
    /** Requests received before the manager became ready, in submission order. */
    private final Map<AbstractTierMetadata, CompletableFuture<TierPartitionState.AppendResult>> queuedRequests = new LinkedHashMap<>();
    private final Thread becomeReadyThread = new KafkaThread("TierTopicManagerThread", this, false);
    private final TierTopicDataLossValidator dataLossValidator;
    private volatile Producer<byte[], byte[]> producer;

    /**
     * Creates a manager that obtains its producer from the given supplier.
     *
     * @throws UnsupportedOperationException if more than one log directory is configured
     */
    public TierTopicManager(TierTopicManagerConfig config, TierTopic tierTopic, TierTopicConsumer tierTopicConsumer, Supplier<Producer<byte[], byte[]>> producerSupplier, Supplier<InternalAdmin> internalAdminSupplier, TierTopicDataLossValidator dataLossValidator) {
        if (config.logDirs.size() > 1) {
            throw new UnsupportedOperationException("Tiered storage does not support multiple log directories");
        }
        this.config = config;
        this.tierTopic = tierTopic;
        this.tierTopicConsumer = tierTopicConsumer;
        this.producerSupplier = producerSupplier;
        this.internalAdminSupplier = internalAdminSupplier;
        this.dataLossValidator = dataLossValidator;
    }

    /**
     * Creates a manager whose producer comes from a {@link TierTopicProducerSupplier} built from
     * {@code config} and {@code time}.
     */
    public TierTopicManager(TierTopicManagerConfig config, TierTopic tierTopic, TierTopicConsumer tierTopicConsumer, Supplier<InternalAdmin> internalAdminSupplier, Time time, TierTopicDataLossValidator dataLossValidator) {
        this(config, tierTopic, tierTopicConsumer, new TierTopicProducerSupplier(config, time), internalAdminSupplier, dataLossValidator);
    }

    /** Starts the background thread that retries {@link #tryBecomeReady(boolean)} until it succeeds. */
    public void startup() {
        this.becomeReadyThread.start();
    }

    /**
     * Body of the background thread: keeps calling {@link #tryBecomeReady(boolean)} with a fixed
     * backoff until the manager is ready or shutdown has been requested. Any exception ends the
     * loop; it is logged at debug level during shutdown and at error level otherwise.
     */
    @Override
    public void run() {
        try {
            while (!this.ready.get() && !this.shutdown.get()) {
                if (!this.tryBecomeReady(true)) {
                    log.warn("Failed to become ready. Retrying in {}ms.", TOPIC_CREATION_BACKOFF_MS);
                    Thread.sleep(TOPIC_CREATION_BACKOFF_MS);
                }
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt status visible instead of silently consuming it.
                Thread.currentThread().interrupt();
            }
            if (this.shutdown.get()) {
                log.debug("Ignoring exception caught during shutdown", e);
            } else {
                log.error("Caught fatal exception in TierTopicManager", e);
            }
        } finally {
            log.info("TierTopicManager thread exited. ready: {} shutdown: {}", this.ready.get(), this.shutdown.get());
        }
    }

    /**
     * Submits metadata for appending to the tier topic.
     *
     * @param metadata the metadata to append
     * @return a future for the append; it is completed exceptionally if the manager is shutting
     *         down, if a newer request for an equal {@code metadata} key replaces this one while
     *         queued, or if the producer reports a send failure
     */
    @Override
    public CompletableFuture<TierPartitionState.AppendResult> addMetadata(AbstractTierMetadata metadata) {
        CompletableFuture<TierPartitionState.AppendResult> future = new CompletableFuture<>();
        this.addMetadata(metadata, future);
        return future;
    }

    /** Returns whether the manager has completed its startup sequence and has not been cleaned up. */
    public boolean isReady() {
        return this.ready.get();
    }

    /** Returns whether the manager is ready and the tier topic consumer reports initialized or running. */
    @Override
    public boolean isReadyForWrites() {
        return this.ready.get() && (this.tierTopicConsumer.status().isInitialized() || this.tierTopicConsumer.status().isRunning());
    }

    /**
     * Builds the set of partitions {@code 0 .. numPartitions - 1} of the given topic.
     *
     * @param topicName topic the partitions belong to
     * @param numPartitions number of partitions; the result is empty when this is not positive
     */
    public static Set<TopicPartition> partitions(String topicName, int numPartitions) {
        return IntStream.range(0, numPartitions).mapToObj(partitionId -> new TopicPartition(topicName, partitionId)).collect(Collectors.toSet());
    }

    /**
     * Makes one attempt to become ready.
     *
     * @param startConsumerThread whether to start the tier topic consumer once it is initialized
     * @return {@code true} on success; {@code false} if the inter-broker client configs are still
     *         empty, if initializing the tier topic threw, or if starting produce/consume timed out
     */
    public boolean tryBecomeReady(boolean startConsumerThread) {
        if (this.config.interBrokerClientConfigs.get().isEmpty()) {
            log.info("Could not resolve bootstrap server. Will retry.");
            return false;
        }
        try {
            this.tierTopic.initialize(this.internalAdminSupplier.get(), this.config.configuredNumPartitions, this.config.configuredReplicationFactor);
        } catch (Exception e) {
            log.warn("Caught exception when ensuring tier topic is created. Will retry.", e);
            return false;
        }
        try {
            this.startProduceConsume(startConsumerThread);
        } catch (TimeoutException toe) {
            log.warn("Caught TimeoutException when starting produce consume. Will retry.", toe);
            return false;
        }
        return true;
    }

    /**
     * Shuts the manager down; only the first call has any effect. Shuts down the data loss
     * validator, waits for the background thread to exit and then releases resources.
     *
     * <p>If the calling thread is interrupted while waiting, cleanup still runs and the interrupt
     * status is restored afterwards.
     */
    public void shutdown() {
        if (!this.shutdown.compareAndSet(false, true)) {
            return;
        }
        this.dataLossValidator().shutdown();
        boolean interrupted = false;
        try {
            this.becomeReadyThread.join();
        } catch (InterruptedException e) {
            interrupted = true;
            log.error("Shutdown interrupted", e);
        } finally {
            try {
                this.cleanup();
            } finally {
                // Restore the flag only after cleanup so that the bounded producer close and the
                // failing of queued requests are not cut short by a pending interrupt.
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Runs data loss detection on the tier topic head on behalf of an external request.
     *
     * @param request the detection request
     * @param timeoutMs timeout handed to the validator
     * @return the validator's response
     * @throws RuntimeException wrapping any failure, including the {@link IllegalStateException}
     *         raised when the manager is not ready or is shutting down
     */
    public TierTopicHeadDataLossDetectionResponse detectDataLossInTierTopicHeadOnDemand(TierTopicHeadDataLossDetectionRequest request, long timeoutMs) {
        this.sendLock.readLock().lock();
        try {
            if (!this.ready.get()) {
                throw new IllegalStateException("TierTopicManager is not ready yet, failing request.");
            }
            return this.detectDataLossInTierTopicHead(request, ValidationSource.ON_DEMAND_VALIDATION, timeoutMs);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // The exception type is about to be hidden by the wrapper, so keep the flag set.
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException(e);
        } finally {
            this.sendLock.readLock().unlock();
        }
    }

    /**
     * Runs data loss detection on the tier topic head as part of startup. This is best effort:
     * any failure is logged and startup continues.
     */
    public void detectDataLossInTierTopicHeadDuringStartup() {
        this.sendLock.readLock().lock();
        try {
            TierTopicHeadDataLossDetectionRequest request = new TierTopicHeadDataLossDetectionRequest(ValidationSource.BOOTSTRAP_VALIDATION.toString(), new HashSet<>());
            this.detectDataLossInTierTopicHead(request, ValidationSource.BOOTSTRAP_VALIDATION, -1L);
        } catch (Exception e) {
            // Deliberately swallowed (including interrupts): the caller goes on to initialize the
            // tier topic consumer regardless of the outcome.
            String errorMsgPrefix = "Could not complete tier topic data loss detection during startup.";
            String errorMsgSuffix = " This could mean the broker may have missed detecting data loss in tier topic for some partitions.";
            log.error(errorMsgPrefix + " due to an unexpected internal failure." + errorMsgSuffix, e);
        } finally {
            this.sendLock.readLock().unlock();
        }
    }

    /**
     * Delegates data loss detection to the validator, passing it the current producer.
     *
     * @throws IllegalStateException if shutdown has been requested
     */
    TierTopicHeadDataLossDetectionResponse detectDataLossInTierTopicHead(TierTopicHeadDataLossDetectionRequest request, ValidationSource validationSource, long timeoutMs) throws ExecutionException, InterruptedException {
        if (this.shutdown.get()) {
            throw new IllegalStateException("TierTopicManager component is shutting down, failing request.");
        }
        return this.dataLossValidator().detectDataLossInTierTopicHead(request, validationSource, this.producer, timeoutMs);
    }

    /** Returns the validator used for tier topic data loss detection. */
    public TierTopicDataLossValidator dataLossValidator() {
        return this.dataLossValidator;
    }

    /**
     * Creates the producer if needed, runs the startup data loss detection, initializes the tier
     * topic consumer (optionally starting it), then marks the manager ready and sends every
     * request that was queued in the meantime.
     */
    private void startProduceConsume(boolean startConsumerThread) {
        if (this.producer == null) {
            this.producer = this.producerSupplier.get();
        }
        this.detectDataLossInTierTopicHeadDuringStartup();
        this.tierTopicConsumer.initialize(this.tierTopic);
        this.tierTopicConsumer.setupMetrics();
        if (startConsumerThread) {
            this.tierTopicConsumer.start();
        }
        // Flip the flag and drain the queue under the same monitor that addMetadata uses when it
        // decides whether to queue, so no request is queued after the drain.
        synchronized (this) {
            this.ready.set(true);
            for (Map.Entry<AbstractTierMetadata, CompletableFuture<TierPartitionState.AppendResult>> entry : this.queuedRequests.entrySet()) {
                this.addMetadata(entry.getKey(), entry.getValue());
            }
            this.queuedRequests.clear();
        }
    }

    /**
     * Releases resources while holding the write lock: clears the ready flag, closes the producer
     * with a one second bound, fails all queued requests and shuts down the tier topic consumer.
     * The lock is released even if any of these steps throws.
     */
    private void cleanup() {
        this.sendLock.writeLock().lock();
        try {
            try {
                this.ready.set(false);
                if (this.producer != null) {
                    this.producer.close(Duration.ofSeconds(1L));
                }
                for (CompletableFuture<TierPartitionState.AppendResult> future : this.queuedRequests.values()) {
                    future.completeExceptionally(new TierMetadataFatalException("Tier topic manager shutting down"));
                }
                this.queuedRequests.clear();
            } finally {
                this.tierTopicConsumer.shutdown();
            }
        } finally {
            // Must run even when the consumer shutdown throws; otherwise every later caller that
            // needs the read lock would block forever.
            this.sendLock.writeLock().unlock();
        }
    }

    /**
     * Sends {@code metadata} to the tier topic, or queues it while the manager is not ready.
     *
     * <p>While queued, a later request with an equal key replaces the earlier one, whose future is
     * failed. When sending, the future is registered with the tier topic consumer before the
     * record is handed to the producer; on a send failure the registration is cancelled and the
     * future is failed with a retriable or fatal exception depending on the cause.
     */
    private void addMetadata(AbstractTierMetadata metadata, CompletableFuture<TierPartitionState.AppendResult> future) {
        this.sendLock.readLock().lock();
        try {
            if (this.shutdown.get()) {
                future.completeExceptionally(new CancellationException("TierTopicManager component is shutting down, failing request."));
                return;
            }
            synchronized (this) {
                if (!this.ready.get()) {
                    CompletableFuture<TierPartitionState.AppendResult> previous = this.queuedRequests.put(metadata, future);
                    if (previous != null) {
                        previous.completeExceptionally(new TierMetadataFatalException("A new request is being queued obsoleting existing request for: " + metadata));
                    }
                    return;
                }
            }
            TopicIdPartition topicPartition = metadata.topicIdPartition();
            this.tierTopicConsumer.trackMaterialization(metadata, future);
            TopicPartition tierTopicPartition = this.tierTopic.toTierTopicPartition(topicPartition);
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tierTopicPartition.topic(), tierTopicPartition.partition(), metadata.serializeKey(), metadata.serializeValue());
            this.producer.send(record, (recordMetadata, exception) -> {
                if (exception == null) {
                    // On success this callback leaves the future untouched.
                    return;
                }
                this.tierTopicConsumer.cancelTracked(metadata);
                if (TierTopicManager.retriable(exception)) {
                    future.completeExceptionally(new TierMetadataRetriableException("Retriable exception sending tier metadata.", exception));
                } else {
                    future.completeExceptionally(new TierMetadataFatalException("Fatal exception sending tier metadata.", exception));
                }
            });
        } finally {
            this.sendLock.readLock().unlock();
        }
    }

    /** Returns whether the given send failure is a Kafka {@link RetriableException}. */
    private static boolean retriable(Exception e) {
        return e instanceof RetriableException;
    }
}

