package io.confluent.kafka.secretregistry.storage;

import io.confluent.kafka.secretregistry.rest.SecretRegistryConfig;
import io.confluent.kafka.secretregistry.storage.exceptions.SerializationException;
import io.confluent.kafka.secretregistry.storage.exceptions.StoreException;
import io.confluent.kafka.secretregistry.storage.exceptions.StoreTimeoutException;
import io.confluent.kafka.secretregistry.storage.serialization.Serializer;
import io.confluent.kafka.secretregistry.utils.ShutdownableThread;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/kafka/secretregistry/storage/KafkaStoreReaderThread.class */
public class KafkaStoreReaderThread<K, V> extends ShutdownableThread {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) KafkaStoreReaderThread.class);
    private final String topic;
    private final TopicPartition topicPartition;
    private final String groupId;
    private final StoreUpdateHandler<K, V> storeUpdateHandler;
    private final Serializer<K, V> serializer;
    private final Store<K, V> localStore;
    private final ReentrantLock offsetUpdateLock;
    private final Condition offsetReachedThreshold;
    private Consumer<byte[], byte[]> consumer;
    private final Producer<byte[], byte[]> producer;
    private long offsetInSecretsTopic;
    private final K noopKey;
    private Properties consumerProps;

    public KafkaStoreReaderThread(String str, String str2, String str3, StoreUpdateHandler<K, V> storeUpdateHandler, Serializer<K, V> serializer, Store<K, V> store, Producer<byte[], byte[]> producer, K k, SecretRegistryConfig secretRegistryConfig) {
        super("kafka-store-reader-thread-" + str2, false);
        this.offsetInSecretsTopic = -1L;
        this.consumerProps = new Properties();
        this.offsetUpdateLock = new ReentrantLock();
        this.offsetReachedThreshold = this.offsetUpdateLock.newCondition();
        this.topic = str2;
        this.groupId = str3;
        this.storeUpdateHandler = storeUpdateHandler;
        this.serializer = serializer;
        this.localStore = store;
        this.producer = producer;
        this.noopKey = k;
        KafkaStore.addSecretRegistryConfigsToClientProperties(secretRegistryConfig, this.consumerProps);
        this.consumerProps.put("group.id", this.groupId);
        this.consumerProps.put("client.id", "KafkaStore-reader-" + this.topic);
        this.consumerProps.put("bootstrap.servers", str);
        this.consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        this.consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        this.consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        this.consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        log.info("Kafka store reader thread starting consumer");
        this.consumer = new KafkaConsumer(this.consumerProps);
        int i = 0;
        List<PartitionInfo> list = null;
        while (true) {
            int i2 = i;
            i++;
            if (i2 >= 10) {
                break;
            }
            list = this.consumer.partitionsFor(this.topic);
            if (list != null && list.size() >= 1) {
                break;
            } else {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                }
            }
        }
        if (list == null || list.size() < 1) {
            throw new IllegalArgumentException("Unable to subscribe to the Kafka topic " + str2 + " backing this data store. Topic may not exist.");
        }
        if (list.size() > 1) {
            throw new IllegalStateException("Unexpected number of partitions in the " + str2 + " topic. Expected 1 and instead got " + list.size());
        }
        this.topicPartition = new TopicPartition(str2, 0);
        this.consumer.assign(Arrays.asList(this.topicPartition));
        this.consumer.seekToBeginning(Arrays.asList(this.topicPartition));
        log.info("Initialized last consumed offset to " + this.offsetInSecretsTopic);
        log.debug("Kafka store reader thread started");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void readToEnd() {
        Map<TopicPartition, Long> endOffsets = this.consumer.endOffsets(this.consumer.assignment());
        log.debug("Reading to end of offsets {}", endOffsets);
        while (!endOffsets.isEmpty()) {
            Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
            while (true) {
                if (it.hasNext()) {
                    Map.Entry<TopicPartition, Long> next = it.next();
                    if (this.consumer.position(next.getKey()) < next.getValue().longValue()) {
                        poll();
                        break;
                    }
                    it.remove();
                }
            }
        }
    }

    @Override // io.confluent.kafka.secretregistry.utils.ShutdownableThread
    public void doWork() {
        poll();
    }

    private void poll() {
        try {
            Iterator<ConsumerRecord<byte[], byte[]>> it = this.consumer.poll(Long.MAX_VALUE).iterator();
            while (it.hasNext()) {
                ConsumerRecord<byte[], byte[]> next = it.next();
                try {
                    K deserializeKey = this.serializer.deserializeKey(next.key());
                    if (deserializeKey.equals(this.noopKey)) {
                        try {
                            this.offsetUpdateLock.lock();
                            this.offsetInSecretsTopic = next.offset();
                            this.offsetReachedThreshold.signalAll();
                            this.offsetUpdateLock.unlock();
                        } catch (Throwable th) {
                            this.offsetUpdateLock.unlock();
                            throw th;
                        }
                    } else {
                        try {
                            V deserializeValue = next.value() == null ? null : this.serializer.deserializeValue(deserializeKey, next.value());
                            try {
                                log.trace("Applying update from record in topic {}, partition {} with offset {} to the local store", next.topic(), Integer.valueOf(next.partition()), Long.valueOf(next.offset()));
                                if (this.storeUpdateHandler.validateUpdate(deserializeKey, deserializeValue)) {
                                    if (deserializeValue == null) {
                                        this.localStore.delete(deserializeKey);
                                    } else {
                                        this.localStore.put(deserializeKey, deserializeValue);
                                    }
                                    this.storeUpdateHandler.handleUpdate(deserializeKey, deserializeValue);
                                } else if (this.localStore.get(deserializeKey) == null) {
                                    try {
                                        this.producer.send(new ProducerRecord<>(this.topic, 0, next.key(), null));
                                        log.debug("Tombstoned invalid key {}", deserializeKey);
                                    } catch (KafkaException e) {
                                        log.warn("Failed to tombstone invalid key {}", deserializeKey, e);
                                    }
                                }
                                try {
                                    this.offsetUpdateLock.lock();
                                    this.offsetInSecretsTopic = next.offset();
                                    this.offsetReachedThreshold.signalAll();
                                    this.offsetUpdateLock.unlock();
                                } catch (Throwable th2) {
                                    this.offsetUpdateLock.unlock();
                                    throw th2;
                                    break;
                                }
                            } catch (StoreException e2) {
                                log.error("Failed to add record from the Kafka topic" + this.topic + " the local store");
                            }
                        } catch (SerializationException e3) {
                            log.error("Failed to deserialize a secret or config update", (Throwable) e3);
                        }
                    }
                } catch (SerializationException e4) {
                    log.error("Failed to deserialize the secret or config key", (Throwable) e4);
                }
            }
        } catch (RecordTooLargeException e5) {
            throw new IllegalStateException("Consumer threw RecordTooLargeException. A secret has been written that exceeds the default maximum fetch size.", e5);
        } catch (WakeupException e6) {
        } catch (RuntimeException e7) {
            log.error("KafkaStoreReader thread has died for an unknown reason.");
            throw e7;
        }
    }

    @Override // io.confluent.kafka.secretregistry.utils.ShutdownableThread
    public void shutdown() {
        try {
            log.debug("Starting shutdown of KafkaStoreReaderThread.");
            super.initiateShutdown();
            if (this.consumer != null) {
                this.consumer.wakeup();
            }
            if (this.localStore != null) {
                this.localStore.close();
            }
            super.awaitShutdown();
            if (this.consumer != null) {
                this.consumer.close();
            }
            log.info("KafkaStoreReaderThread shutdown complete.");
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void waitUntilOffset(long j, long j2, TimeUnit timeUnit) throws StoreException {
        if (j < 0) {
            throw new StoreException("KafkaStoreReaderThread can't wait for a negative offset.");
        }
        log.trace("Waiting to read offset {}. Currently at offset {}", Long.valueOf(j), Long.valueOf(this.offsetInSecretsTopic));
        try {
            this.offsetUpdateLock.lock();
            long convert = TimeUnit.NANOSECONDS.convert(j2, timeUnit);
            while (this.offsetInSecretsTopic < j && convert > 0) {
                try {
                    convert = this.offsetReachedThreshold.awaitNanos(convert);
                } catch (InterruptedException e) {
                    log.debug("Interrupted while waiting for the background store reader thread to reach the specified offset: " + j, (Throwable) e);
                }
            }
            if (this.offsetInSecretsTopic < j) {
                throw new StoreTimeoutException("KafkaStoreReaderThread failed to reach target offset within the timeout interval. targetOffset: " + j + ", offsetReached: " + this.offsetInSecretsTopic + ", timeout(ms): " + TimeUnit.MILLISECONDS.convert(j2, timeUnit));
            }
        } finally {
            this.offsetUpdateLock.unlock();
        }
    }

    public String getConsumerProperty(String str) {
        return this.consumerProps.getProperty(str);
    }
}
