package org.apache.kafka.connect.util;

import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/connect/util/KafkaBasedLog.class */
public class KafkaBasedLog<K, V> {
    private static final Logger log = LoggerFactory.getLogger(KafkaBasedLog.class);
    // Upper bound on the back-off between partition-metadata lookups in start().
    private static final long MAX_SLEEP_MS = TimeUnit.SECONDS.toMillis(1);
    // Retry budget and back-off handed to TopicAdmin.retryEndOffsets() by readEndOffsets().
    private static final Duration ADMIN_CLIENT_RETRY_DURATION = Duration.ofMinutes(15);
    private static final long ADMIN_CLIENT_RETRY_BACKOFF_MS = TimeUnit.SECONDS.toMillis(10);
    // Clock used by start() to measure elapsed time and to sleep between metadata lookups.
    private Time time;
    // How long start() keeps retrying the partition-metadata lookup, in nanoseconds.
    private final long createTopicTimeoutNs;
    // Name of the topic backing this log.
    private final String topic;
    // Number of partitions discovered by start(); stays 0 until start() has assigned them.
    private int partitionCount;
    // Producer settings; when null, createProducer() returns null and no producer exists.
    private final Map<String, Object> producerConfigs;
    // Consumer settings; createConsumer() writes two entries into this map before use.
    private final Map<String, Object> consumerConfigs;
    // Invoked by poll() with (null, record) for every record read from the topic.
    private final Callback<ConsumerRecord<K, V>> consumedCallback;
    // Provides the admin client used by start(); the supplier itself must be non-null.
    private final Supplier<TopicAdmin> topicAdminSupplier;
    // Created in start(), closed in stop().
    private Consumer<K, V> consumer;
    // Created in start() (may be null, see producerConfigs), closed in stop().
    private Producer<K, V> producer;
    // Set in start(); reset to null in stop() and when the admin client reports
    // UnsupportedVersionException in readEndOffsets().
    private TopicAdmin admin;
    // Background WorkThread started by start() and joined by stop().
    private Thread thread;
    // Written by stop() and read by the work thread, both while synchronized on this instance.
    private boolean stopRequested;
    // Callbacks queued by readToEnd(Callback); accessed while synchronized on this instance.
    private Queue<Callback<Void>> readLogEndOffsetCallbacks;
    // Run once by start() with the admin client before the producer and consumer are created.
    private java.util.function.Consumer<TopicAdmin> initializer;

    /* loaded from: input_file:org/apache/kafka/connect/util/KafkaBasedLog$WorkThread.class */
    private class WorkThread extends Thread {
        public WorkThread() {
            super("KafkaBasedLog Work Thread - " + KafkaBasedLog.this.topic);
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            int size;
            try {
                KafkaBasedLog.log.trace("{} started execution", this);
            } catch (Throwable th) {
                KafkaBasedLog.log.error("Unexpected exception in {}", this, th);
                return;
            }
            while (true) {
                synchronized (KafkaBasedLog.this) {
                    if (KafkaBasedLog.this.stopRequested) {
                        return;
                    } else {
                        size = KafkaBasedLog.this.readLogEndOffsetCallbacks.size();
                    }
                    KafkaBasedLog.log.error("Unexpected exception in {}", this, th);
                    return;
                }
                if (size > 0) {
                    try {
                        try {
                            KafkaBasedLog.this.readToLogEnd(false);
                            KafkaBasedLog.log.trace("Finished read to end log for topic {}", KafkaBasedLog.this.topic);
                        } catch (RetriableException | org.apache.kafka.connect.errors.RetriableException e) {
                            KafkaBasedLog.log.warn("Retriable error while reading log to end for topic '{}'. Retrying automatically. Reason: {}", KafkaBasedLog.this.topic, e.getMessage());
                        }
                    } catch (TimeoutException e2) {
                        KafkaBasedLog.log.warn("Timeout while reading log to end for topic '{}'. Retrying automatically. This may occur when brokers are unavailable or unreachable. Reason: {}", KafkaBasedLog.this.topic, e2.getMessage());
                    } catch (WakeupException e3) {
                    }
                }
                synchronized (KafkaBasedLog.this) {
                    for (int i = 0; i < size; i++) {
                        ((Callback) KafkaBasedLog.this.readLogEndOffsetCallbacks.poll()).onCompletion(null, null);
                    }
                }
                try {
                    KafkaBasedLog.this.poll(2147483647L);
                } catch (WakeupException e4) {
                }
            }
        }
    }

    /**
     * Legacy constructor that takes a plain {@link Runnable} initializer instead of an admin-aware one.
     * Delegates to the primary constructor with a supplier that always yields a {@code null} admin
     * client and, when {@code runnable} is non-null, an initializer that simply runs it.
     *
     * @param str      name of the backing topic
     * @param map      producer configuration
     * @param map2     consumer configuration
     * @param callback invoked for every consumed record
     * @param time     clock used while starting
     * @param runnable optional initialization action; may be {@code null}
     * @param j        topic-metadata wait timeout in milliseconds
     */
    @Deprecated
    public KafkaBasedLog(String str, Map<String, Object> map, Map<String, Object> map2, Callback<ConsumerRecord<K, V>> callback, Time time, Runnable runnable, long j) {
        this(str, map, map2, () -> null, callback, time, runnable == null ? null : ignoredAdmin -> runnable.run(), j);
    }

    /**
     * Primary constructor; only records its arguments. Clients are created later by {@link #start()}.
     *
     * @param str      name of the backing topic
     * @param map      producer configuration
     * @param map2     consumer configuration
     * @param supplier source of the admin client; must not be {@code null}
     * @param callback invoked for every consumed record
     * @param time     clock used while starting
     * @param consumer initializer run with the admin client at start; {@code null} means "do nothing"
     * @param j        topic-metadata wait timeout in milliseconds
     * @throws NullPointerException if {@code supplier} is {@code null}
     */
    public KafkaBasedLog(String str, Map<String, Object> map, Map<String, Object> map2, Supplier<TopicAdmin> supplier, Callback<ConsumerRecord<K, V>> callback, Time time, java.util.function.Consumer<TopicAdmin> consumer, long j) {
        this.topic = str;
        this.producerConfigs = map;
        this.consumerConfigs = map2;
        this.topicAdminSupplier = Objects.requireNonNull(supplier);
        this.consumedCallback = callback;
        this.stopRequested = false;
        this.readLogEndOffsetCallbacks = new ArrayDeque<>();
        this.time = time;
        this.createTopicTimeoutNs = TimeUnit.MILLISECONDS.toNanos(j);
        if (consumer != null) {
            this.initializer = consumer;
        } else {
            // No initializer supplied: fall back to a no-op.
            this.initializer = ignoredAdmin -> {
            };
        }
    }

    /**
     * Same as the {@link Runnable}-based constructor with an explicit timeout, using a
     * topic-metadata wait timeout of 30 seconds.
     *
     * @param str      name of the backing topic
     * @param map      producer configuration
     * @param map2     consumer configuration
     * @param callback invoked for every consumed record
     * @param time     clock used while starting
     * @param runnable optional initialization action; may be {@code null}
     */
    public KafkaBasedLog(String str, Map<String, Object> map, Map<String, Object> map2, Callback<ConsumerRecord<K, V>> callback, Time time, Runnable runnable) {
        this(str, map, map2, callback, time, runnable, Duration.ofSeconds(30L).toMillis());
    }

    /**
     * Same as the primary constructor, using a topic-metadata wait timeout of 30 seconds.
     *
     * @param str      name of the backing topic
     * @param map      producer configuration
     * @param map2     consumer configuration
     * @param supplier source of the admin client; must not be {@code null}
     * @param callback invoked for every consumed record
     * @param time     clock used while starting
     * @param consumer initializer run with the admin client at start; may be {@code null}
     */
    public KafkaBasedLog(String str, Map<String, Object> map, Map<String, Object> map2, Supplier<TopicAdmin> supplier, Callback<ConsumerRecord<K, V>> callback, Time time, java.util.function.Consumer<TopicAdmin> consumer) {
        this(str, map, map2, supplier, callback, time, consumer, Duration.ofSeconds(30L).toMillis());
    }

    /**
     * Starts the log: obtains the admin client, runs the initializer, creates the producer and
     * consumer, waits for the topic's partition metadata, assigns every partition, reads the
     * topic from the beginning to its current end and finally launches the background work thread.
     *
     * @throws ConnectException if partition metadata is still unavailable once the configured
     *                          timeout has elapsed
     */
    public void start() {
        log.info("Starting KafkaBasedLog with topic " + this.topic);

        this.admin = this.topicAdminSupplier.get();
        this.initializer.accept(this.admin);
        this.producer = createProducer();
        this.consumer = createConsumer();

        // Poll for metadata with exponential back-off, capped at MAX_SLEEP_MS, until it
        // shows up or the timeout expires.
        List<PartitionInfo> partitionInfos = this.consumer.partitionsFor(this.topic);
        final long startedNs = this.time.nanoseconds();
        long backoffMs = 100;
        while (partitionInfos == null) {
            if (this.time.nanoseconds() - startedNs >= this.createTopicTimeoutNs) {
                throw new ConnectException("Could not look up partition metadata for offset backing store topic in allotted period. This could indicate a connectivity issue, unavailable topic partitions, or if this is your first use of the topic it may have taken too long to create.");
            }
            this.time.sleep(backoffMs);
            backoffMs = Math.min(2 * backoffMs, MAX_SLEEP_MS);
            partitionInfos = this.consumer.partitionsFor(this.topic);
        }

        final List<TopicPartition> partitions = new ArrayList<>();
        for (PartitionInfo info : partitionInfos) {
            partitions.add(new TopicPartition(info.topic(), info.partition()));
        }
        this.partitionCount = partitions.size();
        this.consumer.assign(partitions);
        this.consumer.seekToBeginning(partitions);

        // Replay everything already in the topic before handing over to the work thread.
        readToLogEnd(true);

        this.thread = new WorkThread();
        this.thread.start();

        log.info("Finished reading KafkaBasedLog for topic " + this.topic);
        log.info("Started KafkaBasedLog for topic " + this.topic);
    }

    /**
     * Stops the log: flags the work thread to exit, wakes the consumer so a blocking poll
     * returns, waits for the work thread to finish and then closes the producer and consumer.
     * Failures while closing either client are logged and do not prevent the other from closing.
     *
     * @throws ConnectException if the calling thread is interrupted while waiting for the work
     *                          thread; the interrupt flag is restored before throwing and the
     *                          producer and consumer are left unclosed
     */
    public void stop() {
        log.info("Stopping KafkaBasedLog for topic " + this.topic);
        synchronized (this) {
            this.stopRequested = true;
        }
        if (this.consumer != null) {
            this.consumer.wakeup();
        }

        if (this.thread != null) {
            try {
                this.thread.join();
            } catch (InterruptedException e) {
                // Preserve the interrupt for callers further up the stack.
                Thread.currentThread().interrupt();
                throw new ConnectException("Failed to stop KafkaBasedLog. Exiting without cleanly shutting down it's producer and consumer.", e);
            }
        }

        if (this.producer != null) {
            try {
                this.producer.close();
            } catch (KafkaException e) {
                log.error("Failed to stop KafkaBasedLog producer", e);
            }
        }
        if (this.consumer != null) {
            try {
                this.consumer.close();
            } catch (KafkaException e) {
                log.error("Failed to stop KafkaBasedLog consumer", e);
            }
        }

        // Drop the admin client reference; this method does not close it.
        this.admin = null;
        log.info("Stopped KafkaBasedLog for topic " + this.topic);
    }

    /**
     * Requests an asynchronous read to the current end of the log. Flushes the producer if one
     * exists, queues {@code callback} for the work thread and wakes the consumer so the request
     * is noticed.
     *
     * @param callback queued for the work thread, which invokes it with {@code (null, null)}
     */
    public void readToEnd(Callback<Void> callback) {
        log.trace("Starting read to end log for topic {}", this.topic);
        final Producer<K, V> currentProducer = this.producer;
        if (currentProducer != null) {
            currentProducer.flush();
        }
        synchronized (this) {
            this.readLogEndOffsetCallbacks.add(callback);
        }
        // Interrupt a blocking poll so the work thread sees the new callback.
        this.consumer.wakeup();
    }

    /**
     * Flushes the underlying producer.
     *
     * @throws IllegalStateException if no producer exists
     */
    public void flush() {
        final Producer<K, V> activeProducer = this.producer;
        if (activeProducer != null) {
            activeProducer.flush();
            return;
        }
        throw new IllegalStateException("Producer accessed but was never created");
    }

    /**
     * Future-returning variant of {@link #readToEnd(Callback)}.
     *
     * @return the {@code FutureCallback} that was registered as the read-to-end callback
     */
    public Future<Void> readToEnd() {
        final FutureCallback<Void> completion = new FutureCallback<>(null);
        readToEnd(completion);
        return completion;
    }

    /**
     * Sends a record to the topic without a completion callback.
     *
     * @param k record key
     * @param v record value
     * @throws IllegalStateException if no producer exists
     */
    public void send(K k, V v) {
        this.send(k, v, (org.apache.kafka.clients.producer.Callback) null);
    }

    /**
     * Sends a record to the topic through the producer.
     *
     * @param k        record key
     * @param v        record value
     * @param callback passed through to the producer's {@code send}; may be {@code null}
     * @throws IllegalStateException if no producer exists
     */
    public void send(K k, V v, org.apache.kafka.clients.producer.Callback callback) {
        final Producer<K, V> activeProducer = this.producer;
        if (activeProducer == null) {
            throw new IllegalStateException("Producer accessed but was never created");
        }
        final ProducerRecord<K, V> record = new ProducerRecord<>(this.topic, k, v);
        activeProducer.send(record, callback);
    }

    /**
     * @return the number of partitions assigned by {@link #start()}; {@code 0} before that
     */
    public int partitionCount() {
        return partitionCount;
    }

    /**
     * @return the topic-metadata wait timeout, converted from the stored nanoseconds to milliseconds
     */
    public long createTopicTimeoutMs() {
        return TimeUnit.MILLISECONDS.convert(this.createTopicTimeoutNs, TimeUnit.NANOSECONDS);
    }

    /**
     * Builds the producer from the supplied configuration, forcing {@code acks=all} and a single
     * in-flight request per connection. The overrides are written into the caller-supplied map.
     *
     * @return the new producer, or {@code null} when no producer configuration was given
     */
    private Producer<K, V> createProducer() {
        final Map<String, Object> configs = this.producerConfigs;
        if (configs != null) {
            configs.put("acks", "all");
            configs.put("max.in.flight.requests.per.connection", 1);
            return new KafkaProducer<>(configs);
        }
        return null;
    }

    /**
     * Builds the consumer from the supplied configuration, forcing
     * {@code auto.offset.reset=earliest} and disabling auto-commit. The overrides are written
     * into the caller-supplied map.
     *
     * @return the new consumer
     */
    private Consumer<K, V> createConsumer() {
        final Map<String, Object> configs = this.consumerConfigs;
        configs.put("auto.offset.reset", "earliest");
        configs.put("enable.auto.commit", false);
        return new KafkaConsumer<>(configs);
    }

    /**
     * Polls the consumer once and passes every returned record to the consumed-record callback.
     *
     * <p>A {@link WakeupException} is rethrown so the caller can react to a stop or read-to-end
     * request. Any other {@link KafkaException} is logged and swallowed.
     *
     * @param j maximum time to block in the poll, in milliseconds
     * @throws WakeupException if the consumer was woken up while polling
     */
    public void poll(long j) {
        try {
            for (ConsumerRecord<K, V> record : this.consumer.poll(Duration.ofMillis(j))) {
                this.consumedCallback.onCompletion(null, record);
            }
        } catch (WakeupException e) {
            // Must be caught before KafkaException, its superclass, or it would be swallowed below.
            throw e;
        } catch (KafkaException e) {
            log.error("Error polling: " + e, e);
        }
    }

    /**
     * Blocks until the consumer's position has reached the end offset, as captured when this
     * method was entered, of every assigned partition.
     *
     * <p>Partitions are dropped from the pending set once caught up. As soon as one is found to
     * be behind, the consumer is polled and the remaining partitions are re-checked from the start.
     *
     * @param z whether the end-offset lookup may use the admin client's retrying variant
     */
    public void readToLogEnd(boolean z) {
        Set<TopicPartition> assignment = this.consumer.assignment();
        Map<TopicPartition, Long> endOffsets = readEndOffsets(assignment, z);
        log.trace("Reading to end of log offsets {}", endOffsets);

        while (!endOffsets.isEmpty()) {
            Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
            // Bounded by hasNext(): once every remaining entry has been inspected, control
            // returns to the outer emptiness check instead of spinning.
            while (it.hasNext()) {
                Map.Entry<TopicPartition, Long> entry = it.next();
                TopicPartition topicPartition = entry.getKey();
                long endOffset = entry.getValue();
                long lastConsumedOffset = this.consumer.position(topicPartition);
                if (lastConsumedOffset >= endOffset) {
                    log.trace("Read to end offset {} for {}", endOffset, topicPartition);
                    it.remove();
                } else {
                    log.trace("Behind end offset {} for {}; last-read offset is {}", endOffset, topicPartition, lastConsumedOffset);
                    poll(Integer.MAX_VALUE);
                    break;
                }
            }
        }
    }

    /**
     * Looks up the end offsets of the given partitions, using the admin client when one is
     * available and the consumer otherwise.
     *
     * <p>If the admin client throws {@link UnsupportedVersionException}, it is discarded for the
     * rest of this log's lifetime and the consumer is used instead. Other exceptions propagate.
     *
     * @param set partitions to look up
     * @param z   {@code true} to use the admin client's retrying lookup, {@code false} for a single attempt
     * @return end offset per partition
     */
    Map<TopicPartition, Long> readEndOffsets(Set<TopicPartition> set, boolean z) {
        log.trace("Reading to end of offset log");
        if (this.admin != null) {
            try {
                if (z) {
                    return this.admin.retryEndOffsets(set, ADMIN_CLIENT_RETRY_DURATION, ADMIN_CLIENT_RETRY_BACKOFF_MS);
                } else {
                    return this.admin.endOffsets(set);
                }
            } catch (UnsupportedVersionException unsupported) {
                log.debug("Reading to end of log offsets with consumer since admin client is unsupported: {}", unsupported.getMessage());
                // Stop trying the admin client from now on.
                this.admin = null;
            }
        }
        return this.consumer.endOffsets(set);
    }
}
