package org.apache.kafka.metadata.ingester;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.metadata.placement.CellDescriber;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/metadata/ingester/Ingester.class */
/**
 * Runs at most one "current" ingestion worker at a time, identified by an epoch.
 *
 * <p>Starting a new epoch asks the previous worker (if any) to begin shutting down and creates a
 * new worker through the {@link IngestionWorkerFactory}. Records delivered by the current worker
 * are forwarded to the {@link IngestionHandler}; records delivered by a superseded worker are
 * ignored. {@link #close()} blocks until every worker that was started has reported that its
 * shutdown is complete.
 *
 * <p>The {@code unclosedWorkers} map and the {@code closed} flag are guarded by this object's
 * monitor. {@code workerManager} is additionally {@code volatile} so that it can be compared
 * without taking the lock.
 */
public final class Ingester implements AutoCloseable {
    private final LogContext logContext;
    private final Logger log;
    private final IngestionWorkerFactory workerFactory;
    private final IngestionHandler handler;
    private final int maxOutstandingRecords;

    /** Workers that were started and have not yet reported shutdown completion, keyed by epoch. */
    private final Map<Long, IngestionWorker> unclosedWorkers;

    /** Set once {@link #close()} has been called. */
    private boolean closed;

    /** Manager of the current worker, or {@code null} when there is no current worker. */
    private volatile WorkerManager workerManager;

    /** Builder for {@link Ingester} instances. */
    public static class Builder {
        private LogContext logContext = null;
        private IngestionWorkerFactory workerFactory = null;
        private IngestionHandler handler = null;
        // NOTE(review): CellDescriber.K2_CELL looks unrelated to a record limit; it may be a
        // decompiler artifact standing in for a numeric literal -- confirm the intended default.
        private int maxOutstandingRecords = CellDescriber.K2_CELL;

        /**
         * Sets the log context used to create the ingester's logger.
         *
         * @param logContext the log context; if left {@code null}, {@link #build()} supplies one
         *                   with the prefix {@code "[Ingester] "}
         * @return this builder
         */
        public Builder setLogContext(LogContext logContext) {
            this.logContext = logContext;
            return this;
        }

        /**
         * Sets the factory used to create a worker for each epoch. Required.
         *
         * @param workerFactory the worker factory
         * @return this builder
         */
        public Builder setWorkerFactory(IngestionWorkerFactory workerFactory) {
            this.workerFactory = workerFactory;
            return this;
        }

        /**
         * Sets the handler that receives records from the current worker. Required.
         *
         * @param handler the ingestion handler
         * @return this builder
         */
        public Builder setHandler(IngestionHandler handler) {
            this.handler = handler;
            return this;
        }

        /**
         * Sets the outstanding-record threshold consulted by
         * {@link WorkerManager#handleRecords(List)}.
         *
         * @param maxOutstandingRecords the threshold
         * @return this builder
         */
        public Builder setMaxOutstandingRecords(int maxOutstandingRecords) {
            this.maxOutstandingRecords = maxOutstandingRecords;
            return this;
        }

        /**
         * Creates the ingester.
         *
         * @return a new {@link Ingester}
         * @throws RuntimeException if the worker factory or the handler has not been set
         */
        public Ingester build() {
            if (this.logContext == null) {
                this.logContext = new LogContext("[Ingester] ");
            }
            if (this.workerFactory == null) {
                throw new RuntimeException("You must set the IngestionWorkerFactory.");
            }
            if (this.handler == null) {
                throw new RuntimeException("You must set the IngestionHandler.");
            }
            return new Ingester(this.logContext, this.workerFactory, this.handler, this.maxOutstandingRecords);
        }
    }

    /**
     * Tracks one batch of records handed to the handler: the batch size is added to the manager's
     * outstanding-record counter on construction and subtracted again when the batch completes.
     */
    static class IngestionCompletionHandler implements BiConsumer<Void, Throwable> {
        private final WorkerManager workerManager;
        private final int numRecords;

        IngestionCompletionHandler(WorkerManager workerManager, int numRecords) {
            this.workerManager = workerManager;
            this.numRecords = numRecords;
            workerManager.outstandingRecords.getAndAdd(numRecords);
        }

        /** Releases the batch from the outstanding count, whether or not it completed with an error. */
        @Override
        public void accept(Void result, Throwable error) {
            this.workerManager.outstandingRecords.getAndAdd(-this.numRecords);
        }
    }

    /**
     * The {@link IngestionWorkerManager} handed to the worker of a single epoch. It routes the
     * worker's callbacks back to the enclosing {@link Ingester}.
     */
    public class WorkerManager implements IngestionWorkerManager {
        final long epoch;
        final AtomicInteger outstandingRecords = new AtomicInteger(0);

        /**
         * The worker created for this manager. Assigned in {@link Ingester#start(long, Map)} and
         * read in the shutdown-complete callback, both while holding the Ingester's monitor.
         */
        private IngestionWorker worker;

        WorkerManager(long epoch) {
            this.epoch = epoch;
        }

        @Override
        public LogContext logContext() {
            return Ingester.this.logContext;
        }

        @Override
        public long epoch() {
            return this.epoch;
        }

        /**
         * Forwards a batch of records to the handler if this manager is still the current one.
         *
         * @param records the records delivered by the worker
         * @return {@code true} if the batch was forwarded and the number of outstanding records
         *         is below {@code maxOutstandingRecords}; {@code false} if the threshold has been
         *         reached or if the batch was ignored because this manager is no longer current
         */
        @Override
        public boolean handleRecords(List<IngesterRecord> records) {
            if (!isCurrentWorker()) {
                Ingester.this.log.debug("Ignoring {} records from previous epoch {}.", records.size(), this.epoch);
                return false;
            }
            // The completion handler's constructor bumps the outstanding count before it is
            // attached, so the comparison below already includes this batch unless it has
            // completed in the meantime.
            Ingester.this.handler.handle(this.epoch, records)
                .whenComplete(new IngestionCompletionHandler(this, records.size()));
            return this.outstandingRecords.get() < Ingester.this.maxOutstandingRecords;
        }

        /**
         * Reports that this manager's worker has finished shutting down.
         *
         * @param error passed by the worker; currently ignored
         */
        @Override
        public void handleWorkerShutdownComplete(Throwable error) {
            // NOTE(review): the throwable is dropped without being logged -- confirm that is intended.
            Ingester.this.handleWorkerShutdownComplete(this);
        }

        private boolean isCurrentWorker() {
            return Ingester.this.isCurrentWorker(this);
        }
    }

    /**
     * Renders an offsets map as {@code [key: value, key: value]}, with entries ordered by topic
     * name and then by partition number.
     *
     * @param map the offsets to render
     * @return the rendered string; {@code "[]"} for an empty map
     */
    public static String offsetsToString(Map<TopicPartition, Long> map) {
        TreeMap<TopicPartition, Long> sorted = new TreeMap<>((left, right) -> {
            int byTopic = left.topic().compareTo(right.topic());
            return byTopic != 0 ? byTopic : Integer.compare(left.partition(), right.partition());
        });
        sorted.putAll(map);
        StringBuilder sb = new StringBuilder("[");
        String separator = "";
        for (Map.Entry<TopicPartition, Long> entry : sorted.entrySet()) {
            sb.append(separator);
            separator = ", ";
            sb.append(entry.getKey()).append(": ").append(entry.getValue());
        }
        sb.append("]");
        return sb.toString();
    }

    /**
     * Collects the distinct topic names that appear in the given partitions.
     *
     * @param collection the partitions to inspect
     * @return a new collection holding each topic name once, in no particular order
     */
    public static Collection<String> findUniqueTopicNames(Collection<TopicPartition> collection) {
        HashSet<String> topics = new HashSet<>();
        for (TopicPartition topicPartition : collection) {
            topics.add(topicPartition.topic());
        }
        return topics;
    }

    private boolean isCurrentWorker(WorkerManager workerManager) {
        return this.workerManager == workerManager;
    }

    /**
     * Deregisters the worker that belongs to the given manager and wakes up any thread blocked
     * in {@link #close()}.
     */
    private synchronized void handleWorkerShutdownComplete(WorkerManager workerManager) {
        // Remove the entry only if it still refers to this manager's own worker, so that a
        // different worker registered under the same epoch is left alone. The expected value
        // must be the IngestionWorker: passing the WorkerManager never matches a stored value.
        this.unclosedWorkers.remove(workerManager.epoch, workerManager.worker);
        // close() waits on this object's monitor, which is the one this method holds.
        notifyAll();
    }

    private Ingester(LogContext logContext, IngestionWorkerFactory workerFactory, IngestionHandler handler, int maxOutstandingRecords) {
        this.logContext = logContext;
        this.log = logContext.logger(Ingester.class);
        this.workerFactory = workerFactory;
        this.handler = handler;
        this.maxOutstandingRecords = maxOutstandingRecords;
        // The lock-guarded state is initialised while holding the lock.
        synchronized (this) {
            this.unclosedWorkers = new HashMap<>();
            this.closed = false;
            this.workerManager = null;
        }
    }

    /**
     * Asks the current worker (if any) to shut down and blocks until every started worker has
     * reported shutdown completion. Calling this on an already closed ingester returns
     * immediately.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override
    public synchronized void close() throws InterruptedException {
        if (this.closed) {
            return;
        }
        this.closed = true;
        maybeBeginWorkerShutdown("we are closing the ingester");
        while (!this.unclosedWorkers.isEmpty()) {
            // Wait on the monitor this method actually holds. Waiting releases it, which lets
            // handleWorkerShutdownComplete() run; the timeout re-checks the map periodically.
            wait(500L);
        }
    }

    /**
     * Starts a worker for a new epoch, first asking the current worker (if any) to shut down.
     *
     * @param epoch   the epoch of the new worker
     * @param offsets the offsets passed to the worker factory
     * @throws RuntimeException if the ingester is closed, or if {@code epoch} equals the epoch of
     *                          the current worker
     */
    public synchronized void start(long epoch, Map<TopicPartition, Long> offsets) {
        if (this.closed) {
            throw new RuntimeException("Cannot start a new ingestion epoch, because the ingester is closed.");
        }
        WorkerManager current = this.workerManager;
        if (current != null && current.epoch == epoch) {
            throw new RuntimeException("Cannot start a new epoch " + epoch + " because that is the same as the current epoch.");
        }
        maybeBeginWorkerShutdown("we are starting epoch " + epoch);
        WorkerManager newManager = new WorkerManager(epoch);
        IngestionWorker worker = this.workerFactory.create(newManager, offsets);
        // Remember which worker belongs to this manager so that the shutdown-complete callback
        // can deregister exactly that worker.
        newManager.worker = worker;
        this.unclosedWorkers.put(epoch, worker);
        this.workerManager = newManager;
        this.log.info("Started ingestion worker at epoch {} with offsets {}", epoch, offsetsToString(offsets));
    }

    /**
     * Asks the current worker, if there is one, to begin shutting down, and clears the current
     * manager so that further records from that worker are ignored.
     *
     * @param reason text used in the log messages to explain why the worker is being stopped
     */
    private synchronized void maybeBeginWorkerShutdown(String reason) {
        WorkerManager current = this.workerManager;
        if (current == null) {
            this.log.debug("No need to stop because {}. There is no current ingestion worker.", reason);
            return;
        }
        IngestionWorker worker = this.unclosedWorkers.get(current.epoch);
        if (worker != null) {
            this.log.info("Stopping ingestion worker at epoch {} because {}.", current.epoch, reason);
            worker.beginShutdown();
        } else {
            this.log.info("Found no ingestion worker to stop at epoch {} because {}.", current.epoch, reason);
        }
        this.workerManager = null;
    }
}
