package org.apache.kafka.metadata.ingester;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.ThreadUtils;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/metadata/ingester/KafkaConsumerIngestionWorker.class */
/**
 * An {@link IngestionWorker} that reads records with a Kafka {@link Consumer} on its own thread
 * and hands each polled batch to an {@link IngestionWorkerManager}.
 *
 * <p>The worker thread is started by the constructor and repeatedly executes one step of a small
 * state machine (see {@link IngestionWorkerState}) until the state becomes
 * {@link IngestionWorkerState#SHUTTING_DOWN}. On exit it closes the consumer and reports the
 * terminating error (or {@code null}) through
 * {@link IngestionWorkerManager#handleWorkerShutdownComplete}.
 *
 * <p>Thread-safety: {@code state} is only read and written while holding this object's monitor.
 * The consumer is used from the worker thread, with the exception of {@link Consumer#wakeup()},
 * which {@link #beginShutdown()} calls from the caller's thread.
 */
public final class KafkaConsumerIngestionWorker implements IngestionWorker, Runnable {
    private final Logger log;
    private final long maxPollMs;
    private final IngestionWorkerManager manager;
    private final Map<TopicPartition, Long> initialOffsets;
    /** Current state of the worker; guarded by {@code this}. */
    private IngestionWorkerState state;
    private final Consumer<String, String> consumer;
    private final Thread thread;

    /**
     * Creates {@link KafkaConsumerIngestionWorker} instances that share a name, a consumer
     * configuration and a poll timeout.
     */
    public static class Factory implements IngestionWorkerFactory {
        private final String name;
        private final Map<String, Object> configs;
        /** Upper bound, in milliseconds, on how long a single consumer poll may block. */
        private final long maxPollMs = 20_000L;
        private Function<Map<String, Object>, Consumer<String, String>> consumerFactory =
            consumerConfigs -> new KafkaConsumer<>(consumerConfigs, new StringDeserializer(), new StringDeserializer());

        /**
         * Replaces the function used to build the consumer from the configuration map.
         *
         * @param function the consumer factory function to use from now on
         * @return this factory, to allow call chaining
         */
        Factory setConsumerFactory(Function<Map<String, Object>, Consumer<String, String>> function) {
            this.consumerFactory = function;
            return this;
        }

        /**
         * @param name    prefix used in the name of the worker threads
         * @param configs consumer configuration; it is copied and {@code enable.auto.commit}
         *                is forced to {@code "false"} in the copy
         */
        public Factory(String name, Map<String, Object> configs) {
            this.name = name;
            this.configs = new HashMap<>(configs);
            this.configs.put("enable.auto.commit", "false");
        }

        @Override
        public IngestionWorker create(IngestionWorkerManager ingestionWorkerManager, Map<TopicPartition, Long> initialOffsets) {
            return new KafkaConsumerIngestionWorker(this, ingestionWorkerManager, initialOffsets);
        }

        /**
         * Describes this factory for log messages. The consumer configuration is deliberately
         * left out because it may contain credentials.
         */
        @Override
        public String toString() {
            return "Factory(name=" + name + ", maxPollMs=" + maxPollMs + ")";
        }
    }

    /** States of the worker's state machine; one step is executed per loop iteration. */
    public enum IngestionWorkerState {
        /** Partitions have not been assigned yet; the next step assigns them and seeks. */
        UNINITIALIZED,
        /** A poll failed with an invalid offset; the next step seeks to the beginning. */
        OFFSET_OUT_OF_RANGE,
        /** Records are being polled and delivered to the manager. */
        RUNNING,
        /** The manager declined the last batch; the partitions are paused but polling continues. */
        THROTTLED,
        /** Shutdown was requested; the worker loop exits on its next step. */
        SHUTTING_DOWN
    }

    /**
     * Adds the epoch to a log prefix by replacing everything from its last {@code ']'} onwards
     * with {@code ", epoch=<epoch>] "}.
     *
     * @param prefix the original log prefix
     * @param epoch  the epoch to embed
     * @return the prefix with the epoch embedded, or {@code prefix} unchanged when it contains
     *         no {@code ']'}
     */
    static String effectiveLogContextPrefix(String prefix, long epoch) {
        int closingBracket = prefix.lastIndexOf(']');
        if (closingBracket < 0) {
            return prefix;
        }
        return prefix.substring(0, closingBracket) + ", epoch=" + epoch + "] ";
    }

    /**
     * Builds the consumer, then creates and starts the worker thread.
     *
     * @param factory        supplies the thread name prefix, consumer configuration and poll timeout
     * @param workerManager  receives polled records and the shutdown notification
     * @param initialOffsets partitions to read and the offset to start each one from; copied
     */
    private KafkaConsumerIngestionWorker(Factory factory, IngestionWorkerManager workerManager, Map<TopicPartition, Long> initialOffsets) {
        this.log = new LogContext(effectiveLogContextPrefix(workerManager.logContext().logPrefix(), workerManager.epoch()))
            .logger(KafkaConsumerIngestionWorker.class);
        this.maxPollMs = factory.maxPollMs;
        this.manager = workerManager;
        // The worker thread takes this monitor in execute() before it reads the state, so it
        // cannot act on the state until this block has completed.
        synchronized (this) {
            this.initialOffsets = new HashMap<>(initialOffsets);
            this.state = IngestionWorkerState.UNINITIALIZED;
            this.consumer = factory.consumerFactory.apply(factory.configs);
            try {
                String threadName = factory.name + "-kafka-consumer-ingestion-worker-" + workerManager.epoch();
                this.thread = ThreadUtils.createThreadFactory(threadName, true).newThread(this);
                this.thread.start();
            } catch (Exception e) {
                // Do not leak the consumer, and do not let a failing close() hide the real error.
                try {
                    this.consumer.close();
                } catch (Exception closeFailure) {
                    e.addSuppressed(closeFailure);
                }
                throw e;
            }
        }
        this.log.info("CREATED KafkaConsumerIngestionWorker with factory = {}", factory);
    }

    /**
     * Requests shutdown: moves to {@link IngestionWorkerState#SHUTTING_DOWN} and wakes the
     * consumer up. Does nothing beyond logging if shutdown was already requested.
     */
    @Override
    public void beginShutdown() {
        synchronized (this) {
            this.log.info("BEGINSHUTDOWN. state = {}", this.state);
            if (this.state == IngestionWorkerState.SHUTTING_DOWN) {
                return;
            }
            this.state = IngestionWorkerState.SHUTTING_DOWN;
            this.consumer.wakeup();
        }
    }

    /**
     * Worker thread body: executes state machine steps until one asks to stop or fails, then
     * closes the consumer and notifies the manager, passing the failure if there was one.
     */
    @Override
    public void run() {
        this.log.info("starting thread.");
        Throwable failure = null;
        try {
            while (execute()) {
                // Each call performs one step; keep going until it asks us to stop.
            }
        } catch (Throwable t) {
            this.log.error("thread exiting with error", t);
            failure = t;
        }
        try {
            this.consumer.close(Duration.ofMinutes(1L));
        } catch (Throwable t) {
            this.log.error("Error closing consumer", t);
        }
        this.manager.handleWorkerShutdownComplete(failure);
    }

    /**
     * Executes one step of the state machine for the current state.
     *
     * @return {@code true} to keep looping, {@code false} once shutdown has been requested
     * @throws Throwable whatever the executed step throws
     */
    private boolean execute() throws Throwable {
        IngestionWorkerState current;
        synchronized (this) {
            current = this.state;
        }
        switch (current) {
            case UNINITIALIZED:
                initialize();
                return true;
            case OFFSET_OUT_OF_RANGE:
                reinitialize();
                return true;
            case RUNNING:
            case THROTTLED:
                poll();
                return true;
            case SHUTTING_DOWN:
                this.log.debug("entered SHUTTING_DOWN state.");
                return false;
            default:
                // Fail loudly rather than spin forever on a state this loop cannot handle.
                throw new IllegalStateException("Unhandled worker state: " + current);
        }
    }

    /** Assigns the configured partitions, seeks each to its initial offset and enters RUNNING. */
    void initialize() {
        this.log.info("initializing offsets: {}.", Ingester.offsetsToString(this.initialOffsets));
        this.consumer.assign(this.initialOffsets.keySet());
        for (Map.Entry<TopicPartition, Long> entry : this.initialOffsets.entrySet()) {
            this.consumer.seek(entry.getKey(), entry.getValue());
        }
        if (maybeTransition(IngestionWorkerState.RUNNING)) {
            this.log.info("entering RUNNING state.");
        }
    }

    /**
     * Moves to {@code target} unless shutdown has been requested or the worker is already in
     * that state.
     *
     * @param target the state to move to
     * @return {@code true} if the state was changed
     */
    synchronized boolean maybeTransition(IngestionWorkerState target) {
        if (this.state == IngestionWorkerState.SHUTTING_DOWN || this.state == target) {
            return false;
        }
        this.state = target;
        return true;
    }

    /** Seeks every configured partition to its beginning and enters RUNNING. */
    void reinitialize() {
        this.log.error("resetting offsets to the beginning for: {}", Ingester.findUniqueTopicNames(this.initialOffsets.keySet()));
        this.consumer.seekToBeginning(this.initialOffsets.keySet());
        if (maybeTransition(IngestionWorkerState.RUNNING)) {
            this.log.info("re-entering RUNNING state.");
        }
    }

    /**
     * Polls the consumer once and passes the batch (possibly empty) to the manager.
     *
     * <p>If the manager returns {@code true} the worker moves to RUNNING, resuming the partitions
     * when that is a state change; otherwise it moves to THROTTLED and pauses the partitions
     * when that is a state change. An {@link InvalidOffsetException} moves the worker to
     * OFFSET_OUT_OF_RANGE.
     *
     * @throws InterruptedException if the consumer reports that the thread was interrupted
     * @throws Throwable            anything else thrown by the consumer or the manager
     */
    void poll() throws Throwable {
        try {
            if (this.log.isTraceEnabled()) {
                this.log.trace("polling for {} ms", this.maxPollMs);
            }
            ConsumerRecords<String, String> records = this.consumer.poll(Duration.ofMillis(this.maxPollMs));
            if (this.log.isTraceEnabled()) {
                this.log.trace("got {} records", records.count());
            }
            boolean accepted = this.manager.handleRecords(toIngesterRecords(records));
            if (accepted) {
                if (maybeTransition(IngestionWorkerState.RUNNING)) {
                    this.log.debug("re-entering RUNNING state.");
                    this.consumer.resume(this.initialOffsets.keySet());
                }
            } else if (maybeTransition(IngestionWorkerState.THROTTLED)) {
                this.log.debug("throttling for {} ms.", this.maxPollMs);
                this.consumer.pause(this.initialOffsets.keySet());
            }
        } catch (InvalidOffsetException e) {
            this.log.error("Offset out of range error", e);
            maybeTransition(IngestionWorkerState.OFFSET_OUT_OF_RANGE);
        } catch (WakeupException ignored) {
            // Intentionally not propagated: beginShutdown() sets SHUTTING_DOWN before calling
            // wakeup(), so the next execute() step observes that state and ends the loop.
            this.log.debug("Received wakeup exception");
        } catch (InterruptException e) {
            this.log.warn("interrupted", e);
            Thread.currentThread().interrupt();
            InterruptedException interrupted = new InterruptedException();
            interrupted.initCause(e);
            throw interrupted;
        }
    }

    /** Converts a polled batch into the record list that is handed to the manager. */
    private static List<IngesterRecord> toIngesterRecords(ConsumerRecords<String, String> records) {
        if (records.isEmpty()) {
            return Collections.emptyList();
        }
        List<IngesterRecord> converted = new ArrayList<>(records.count());
        records.forEach(consumerRecord -> converted.add(IngesterRecord.fromConsumerRecord(consumerRecord)));
        return converted;
    }
}
