package io.kcache;

import io.kcache.KafkaCacheConfig;
import io.kcache.exceptions.CacheException;
import io.kcache.exceptions.CacheInitializationException;
import io.kcache.exceptions.CacheTimeoutException;
import io.kcache.exceptions.EntryTooLargeException;
import io.kcache.utils.InMemoryBoundedCache;
import io.kcache.utils.InMemoryCache;
import io.kcache.utils.OffsetCheckpoint;
import io.kcache.utils.ShutdownableThread;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/kcache/KafkaCache.class */
public class KafkaCache<K, V> implements Cache<K, V> {
    private static final Logger log = LoggerFactory.getLogger(KafkaCache.class);

    // ---- Settings copied out of KafkaCacheConfig by setUp(...) ----

    /** Name of the Kafka topic that backs this cache. */
    private String topic;
    /** Value of KAFKACACHE_TOPIC_REPLICATION_FACTOR_CONFIG. */
    private int desiredReplicationFactor;
    /** Value of KAFKACACHE_TOPIC_NUM_PARTITIONS_CONFIG. */
    private int desiredNumPartitions;
    /** Partition numbers to read; when empty, the WorkerThread constructor discovers them from topic metadata. */
    private List<Integer> partitions;
    /** Where the consumer is positioned when no checkpoint applies (see WorkerThread.seekToStart). */
    private KafkaCacheConfig.Offset offset;
    /** Value of KAFKACACHE_GROUP_ID_CONFIG. */
    private String groupId;
    /** Client id used in log messages; defaults to "kafka-cache-reader-" + topic when not configured. */
    private String clientId;
    /** Callbacks invoked around each polled batch and each record; a no-op handler when none was supplied. */
    private CacheUpdateHandler<K, V> cacheUpdateHandler;
    /** Serde used to deserialize record keys. */
    private Serde<K> keySerde;
    /** Serde used to (de)serialize record values. */
    private Serde<V> valueSerde;
    /** Local cache that records read from the topic are applied to. */
    private Cache<K, V> localCache;
    /** When true, init() does not call createOrVerifyTopic(). */
    private boolean skipValidation;
    /** Value of KAFKACACHE_TOPIC_REQUIRE_COMPACT_CONFIG. */
    private boolean requireCompact;
    /** When true, init() creates no producer and invalid updates are never rolled back. */
    private boolean readOnly;
    /** Timeout, in milliseconds, for consumer calls made while initializing. */
    private int initTimeout;
    /** Timeout, in milliseconds, used when the reader thread reads to the end offsets for a sync. */
    private int timeout;
    /** Timeout, in milliseconds, passed to each consumer poll. */
    private long pollTimeout;
    /** Directory handed to OffsetCheckpoint. */
    private String checkpointDir;
    /** Version handed to OffsetCheckpoint. */
    private int checkpointVersion;
    /** When true, a persistent local cache is flushed and checkpointed even before initialization completes. */
    private boolean checkpointBeforeInit;
    /** Broker endpoints as reported by KafkaCacheConfig.bootstrapBrokers(). */
    private String bootstrapBrokers;

    // ---- Runtime state created by init() ----

    /** Producer for the backing topic; only created when the cache is not read-only. */
    private Producer<byte[], byte[]> producer;
    /** Partitioner instance configured from the producer properties; only created when not read-only. */
    private Partitioner partitioner;
    /** Consumer that the reader thread assigns, seeks and polls. */
    private Consumer<byte[], byte[]> consumer;
    /** Background thread that reads the topic into the local cache. */
    private KafkaCache<K, V>.WorkerThread kafkaTopicReader;
    /** Configuration this cache was built from. */
    private KafkaCacheConfig config;
    /** Checkpoint file; only created when the local cache is persistent. */
    private OffsetCheckpoint checkpointFile;
    /** Set once initialization has completed; init() refuses to run when it is already true. */
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    /** Checkpointed offsets per topic partition: loaded from the checkpoint file and rewritten by the reader thread. */
    private final Map<TopicPartition, Long> checkpointFileCache = new HashMap<>();
    /**
     * Partition number -> offset, compared against the reader's last read offsets.
     * NOTE(review): the code that populates this map is outside the visible part of the file.
     */
    private final Map<Integer, Long> lastWrittenOffsets = new ConcurrentHashMap<>();
    /** Pending sync requests; accessed under synchronized (this) and completed by the reader thread. */
    private final Queue<CompletableFuture<Integer>> syncCallbacks = new ArrayDeque<>();

    /* loaded from: input_file:io/kcache/KafkaCache$Metadata.class */
    /**
     * Immutable pair of a Kafka {@link RecordMetadata} and an "old value" of type {@code V}.
     *
     * <p>NOTE(review): presumably returned by write operations defined outside the visible
     * part of this file - confirm against the callers.
     *
     * @param <V> type of the old value
     */
    public static class Metadata<V> {
        private final RecordMetadata recordMetadata;
        private final V oldValue;

        /**
         * @param recordMetadata the record metadata to carry (stored as given, may be null)
         * @param v the old value to carry (stored as given, may be null)
         */
        public Metadata(RecordMetadata recordMetadata, V v) {
            this.recordMetadata = recordMetadata;
            this.oldValue = v;
        }

        /** @return the record metadata passed to the constructor */
        public RecordMetadata getRecordMetadata() {
            return this.recordMetadata;
        }

        /** @return the old value passed to the constructor */
        public V getOldValue() {
            return this.oldValue;
        }

        /**
         * Two instances are equal when they have exactly the same class and both the record
         * metadata and the old value are equal according to {@link Objects#equals}.
         */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            // Wildcard instead of a raw type: only Object-level comparisons are needed.
            Metadata<?> other = (Metadata<?>) obj;
            return Objects.equals(this.recordMetadata, other.recordMetadata)
                    && Objects.equals(this.oldValue, other.oldValue);
        }

        /** Hash built from the same two fields that {@link #equals(Object)} compares. */
        @Override
        public int hashCode() {
            return Objects.hash(this.recordMetadata, this.oldValue);
        }

        @Override
        public String toString() {
            return "Metadata{recordMetadata=" + this.recordMetadata + ", oldValue=" + this.oldValue + '}';
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/kcache/KafkaCache$WorkerThread.class */
    public class WorkerThread extends ShutdownableThread {
        // Lock that owns the offsetReachedThreshold condition; taken by updateOffset and by the waitUntil* methods.
        private final ReentrantLock offsetUpdateLock;
        // Condition signalled (signalAll) each time updateOffset records a newly read offset.
        private final Condition offsetReachedThreshold;
        // Partition number -> offset of the last record read from it; the constructor seeds every partition with -1.
        private final Map<Integer, Long> lastReadOffsets;

        /**
         * Prepares the reader thread: resolves the partitions to read, assigns them to the
         * enclosing cache's consumer and positions the consumer on each of them.
         *
         * <p>When no partitions are configured they are looked up from topic metadata, with up
         * to 10 attempts one second apart. If the thread is interrupted while waiting between
         * attempts, the interrupt status is restored and no further attempts are made.
         *
         * <p>For a persistent local cache each partition is positioned at its checkpointed
         * offset when one exists, otherwise at the configured start position; for a
         * non-persistent cache every partition starts at the configured start position.
         *
         * @throws IllegalArgumentException if no partitions could be resolved for the topic
         */
        public WorkerThread() {
            super("kafka-cache-reader-thread-" + KafkaCache.this.topic);
            this.lastReadOffsets = new ConcurrentHashMap<>();
            this.offsetUpdateLock = new ReentrantLock();
            this.offsetReachedThreshold = this.offsetUpdateLock.newCondition();
            if (KafkaCache.this.partitions.isEmpty()) {
                // NOTE(review): the retries presumably cover a topic that was only just created - confirm.
                for (int attempt = 0; attempt < 10; attempt++) {
                    List<PartitionInfo> infos = KafkaCache.this.consumer.partitionsFor(
                            KafkaCache.this.topic, Duration.ofMillis(KafkaCache.this.initTimeout));
                    if (infos != null && !infos.isEmpty()) {
                        KafkaCache.this.partitions = infos.stream()
                                .map(PartitionInfo::partition)
                                .collect(Collectors.toList());
                        break;
                    }
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException e) {
                        // Keep the interrupt visible to the caller and stop retrying; the
                        // empty-partitions check below then reports the failure.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
                if (KafkaCache.this.partitions.isEmpty()) {
                    throw new IllegalArgumentException("Unable to subscribe to the Kafka topic " + KafkaCache.this.topic + " backing this data cache. Topic may not exist.");
                }
            }
            // -1 means "nothing read yet" for the partition.
            for (Integer partition : KafkaCache.this.partitions) {
                this.lastReadOffsets.put(partition, -1L);
            }
            List<TopicPartition> topicPartitions = KafkaCache.this.partitions.stream()
                    .map(partition -> new TopicPartition(KafkaCache.this.topic, partition))
                    .collect(Collectors.toList());
            KafkaCache.this.consumer.assign(topicPartitions);
            if (KafkaCache.this.localCache.isPersistent()) {
                for (TopicPartition topicPartition : topicPartitions) {
                    Long checkpoint = KafkaCache.this.checkpointFileCache.get(topicPartition);
                    if (checkpoint != null) {
                        KafkaCache.log.info("Seeking to checkpoint {} for {}", checkpoint, topicPartition);
                        KafkaCache.this.consumer.seek(topicPartition, checkpoint);
                    } else {
                        KafkaCache.log.info("Seeking to start for {}", topicPartition);
                        seekToStart(Collections.singleton(topicPartition), Duration.ofMillis(KafkaCache.this.initTimeout));
                    }
                }
            } else {
                KafkaCache.log.info("Seeking to start for all partitions for topic {}", KafkaCache.this.topic);
                seekToStart(topicPartitions, Duration.ofMillis(KafkaCache.this.initTimeout));
            }
            KafkaCache.log.info("Initialized last read offsets to {}", this.lastReadOffsets);
            KafkaCache.log.info("KafkaTopicReader thread started for {}.", KafkaCache.this.clientId);
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Polls until the consumer's position on every assigned partition has reached the end
         * offset that partition had when this call started.
         *
         * <p>If a poll reports an invalid offset, a persistent local cache is closed, destroyed
         * and re-initialized, the consumer is moved back to the configured start position and
         * the record count starts again from zero.
         *
         * @param duration timeout handed to the individual consumer calls
         * @return the number of records processed, counted since the last reset
         * @throws IOException declared for the local-cache reset performed on an invalid offset
         */
        public int readToEndOffsets(Duration duration) throws IOException {
            final Set<TopicPartition> assigned = KafkaCache.this.consumer.assignment();
            final Map<TopicPartition, Long> targets = KafkaCache.this.consumer.endOffsets(assigned, duration);
            KafkaCache.log.info("Reading to end of offsets {}", targets);
            int processed = 0;
            for (;;) {
                // hasReadToEndOffsets removes the partitions that have caught up from 'targets'.
                if (hasReadToEndOffsets(targets, duration)) {
                    break;
                }
                try {
                    processed += poll();
                } catch (InvalidOffsetException invalidOffset) {
                    final Cache<K, V> cache = KafkaCache.this.localCache;
                    if (cache.isPersistent()) {
                        cache.close();
                        cache.destroy();
                        cache.init();
                    }
                    KafkaCache.log.warn("Seeking to start due to invalid offset", invalidOffset);
                    seekToStart(assigned, duration);
                    processed = 0;
                }
            }
            KafkaCache.log.info("During init or sync, processed {} records from topic {}", processed, KafkaCache.this.topic);
            return processed;
        }

        /**
         * Positions the consumer on the given partitions according to the configured offset type.
         *
         * <ul>
         *   <li>BEGINNING / END: seek to the beginning / end of each partition.</li>
         *   <li>ABSOLUTE: seek each partition to the configured offset.</li>
         *   <li>RELATIVE: seek each partition to its end offset minus the configured offset,
         *       never below 0.</li>
         *   <li>TIMESTAMP: treat the configured offset as a timestamp and seek to the offset the
         *       broker reports for it; partitions without a result (or all of them, if the
         *       lookup fails with a KafkaException) are moved to the beginning.</li>
         * </ul>
         * Any other offset type leaves the consumer untouched.
         *
         * @param collection partitions to position
         * @param duration timeout for the broker lookups that RELATIVE and TIMESTAMP need
         */
        private void seekToStart(Collection<TopicPartition> collection, Duration duration) {
            switch (KafkaCache.this.offset.getOffsetType()) {
                case BEGINNING:
                    KafkaCache.this.consumer.seekToBeginning(collection);
                    break;
                case END:
                    KafkaCache.this.consumer.seekToEnd(collection);
                    break;
                case ABSOLUTE:
                    for (TopicPartition partition : collection) {
                        KafkaCache.this.consumer.seek(partition, KafkaCache.this.offset.getOffset());
                    }
                    break;
                case RELATIVE: {
                    Map<TopicPartition, Long> ends = KafkaCache.this.consumer.endOffsets(collection, duration);
                    for (TopicPartition partition : collection) {
                        long target = ends.get(partition) - KafkaCache.this.offset.getOffset();
                        KafkaCache.this.consumer.seek(partition, Math.max(target, 0L));
                    }
                    break;
                }
                case TIMESTAMP: {
                    Map<TopicPartition, OffsetAndTimestamp> found = null;
                    try {
                        Map<TopicPartition, Long> query = collection.stream().collect(Collectors.toMap(
                                partition -> partition,
                                partition -> KafkaCache.this.offset.getOffset()));
                        found = KafkaCache.this.consumer.offsetsForTimes(query, duration);
                    } catch (KafkaException e) {
                        KafkaCache.log.warn("Could not fetch offset times for topic {}", KafkaCache.this.topic, e);
                    }
                    for (TopicPartition partition : collection) {
                        OffsetAndTimestamp hit = found == null ? null : found.get(partition);
                        if (hit != null) {
                            KafkaCache.this.consumer.seek(partition, hit.offset());
                        } else {
                            KafkaCache.this.consumer.seekToBeginning(Collections.singleton(partition));
                            KafkaCache.log.warn("Could not find offset time for topic {}, partition {}, ts {}, seeking to beginning", partition.topic(), partition.partition(), KafkaCache.this.offset.getOffset());
                        }
                    }
                    break;
                }
                default:
                    break;
            }
        }

        /**
         * Removes from {@code map} every partition whose consumer position has reached the
         * recorded end offset, and reports whether nothing is left to read.
         *
         * @param map partition -> end offset still to be reached; modified in place
         * @param duration timeout for each position lookup
         * @return true once every partition has been removed from {@code map}
         */
        private boolean hasReadToEndOffsets(Map<TopicPartition, Long> map, Duration duration) {
            Iterator<Map.Entry<TopicPartition, Long>> pending = map.entrySet().iterator();
            while (pending.hasNext()) {
                Map.Entry<TopicPartition, Long> target = pending.next();
                long position = KafkaCache.this.consumer.position(target.getKey(), duration);
                if (position >= target.getValue()) {
                    pending.remove();
                }
            }
            return map.isEmpty();
        }

        /**
         * One iteration of the reader thread.
         *
         * <p>If sync requests are queued, reads to the current end offsets first and then
         * completes exactly the requests that were queued when the iteration started, each with
         * the number of records read. If reading to the end fails, the error is logged and the
         * iteration ends with the requests still queued. Otherwise the iteration finishes with
         * one regular poll.
         */
        @Override // io.kcache.utils.ShutdownableThread
        protected void doWork() {
            final int pending;
            synchronized (KafkaCache.this) {
                pending = KafkaCache.this.syncCallbacks.size();
            }
            int recordsRead = 0;
            if (pending > 0) {
                try {
                    recordsRead = readToEndOffsets(Duration.ofMillis(KafkaCache.this.timeout));
                } catch (IOException | KafkaException e) {
                    KafkaCache.log.error("Error while reading to end offsets", e);
                    return;
                }
            }
            synchronized (KafkaCache.this) {
                // Only drain the requests counted above; later arrivals wait for the next iteration.
                int remaining = pending;
                while (remaining-- > 0) {
                    CompletableFuture<Integer> callback = KafkaCache.this.syncCallbacks.poll();
                    if (callback != null) {
                        callback.complete(recordsRead);
                    }
                }
            }
            poll();
        }

        /**
         * Polls the consumer once and applies every returned record to the local cache.
         *
         * <p>This method was rebuilt from decompiler output that could not be structured (it
         * contained an unreachable statement after {@code return} and variables read before
         * assignment). The intent recovered from that output is: the read offset is recorded
         * for every record whether or not it could be applied, and a record that fails is
         * logged and skipped.
         *
         * <p>After the records are applied, a persistent local cache is flushed and its offsets
         * checkpointed, provided checkpointing before init is enabled or the cache is already
         * initialized. The update handler is told about the batch via startBatch, then either
         * endBatch or failBatch.
         *
         * <p>A WakeupException is not logged (shutdown() wakes the consumer up on purpose), any
         * other KafkaException is logged, and anything else is logged and rethrown.
         *
         * @return the number of records returned by the poll (0 if the poll itself failed)
         */
        private int poll() {
            int count = 0;
            try {
                ConsumerRecords<byte[], byte[]> records =
                        KafkaCache.this.consumer.poll(Duration.ofMillis(KafkaCache.this.pollTimeout));
                count = records.count();
                KafkaCache.this.cacheUpdateHandler.startBatch(count);
                for (ConsumerRecord<byte[], byte[]> rec : records) {
                    try {
                        applyRecord(rec);
                    } finally {
                        // Record progress even for skipped records so offset waiters are released.
                        updateOffset(rec.partition(), rec.offset());
                    }
                }
                if (KafkaCache.this.localCache.isPersistent()
                        && (KafkaCache.this.checkpointBeforeInit || KafkaCache.this.initialized.get())) {
                    try {
                        KafkaCache.this.localCache.flush();
                        checkpointOffsets(KafkaCache.this.cacheUpdateHandler.checkpoint(count));
                    } catch (CacheException e) {
                        KafkaCache.log.warn("Failed to flush", e);
                    }
                }
                KafkaCache.this.cacheUpdateHandler.endBatch(count);
            } catch (Throwable t) {
                KafkaCache.this.cacheUpdateHandler.failBatch(count, t);
                if (!(t instanceof WakeupException)) {
                    if (t instanceof KafkaException) {
                        KafkaCache.log.error("Error while polling", t);
                    } else {
                        KafkaCache.log.error("KafkaTopicReader thread for {} has died for an unknown reason.", KafkaCache.this.clientId, t);
                        throw t;
                    }
                }
            }
            return count;
        }

        /**
         * Deserializes one record, validates it through the update handler and acts on the
         * verdict. Every failure is logged and swallowed so that one bad record does not stop
         * the batch; the caller records the offset.
         */
        private void applyRecord(ConsumerRecord<byte[], byte[]> rec) {
            K key;
            try {
                key = KafkaCache.this.keySerde.deserializer().deserialize(KafkaCache.this.topic, rec.headers(), rec.key());
            } catch (Exception e) {
                KafkaCache.log.error("Failed to deserialize the key", e);
                return;
            }
            V value;
            try {
                // A null record value is passed on as null; SUCCESS then removes the key.
                value = rec.value() == null
                        ? null
                        : KafkaCache.this.valueSerde.deserializer().deserialize(KafkaCache.this.topic, rec.headers(), rec.value());
            } catch (Exception e) {
                KafkaCache.log.error("Failed to deserialize a value", e);
                return;
            }
            try {
                Headers headers = rec.headers();
                TopicPartition topicPartition = new TopicPartition(rec.topic(), rec.partition());
                long offset = rec.offset();
                long timestamp = rec.timestamp();
                TimestampType timestampType = rec.timestampType();
                Optional<Integer> leaderEpoch = rec.leaderEpoch();
                switch (KafkaCache.this.cacheUpdateHandler.validateUpdate(headers, key, value, topicPartition, offset, timestamp, timestampType, leaderEpoch)) {
                    case SUCCESS: {
                        V oldValue = null;
                        if (key != null) {
                            KafkaCache.log.trace("Applying update ({}, {}) to the local cache", key, value);
                            oldValue = value == null
                                    ? KafkaCache.this.localCache.remove(key)
                                    : KafkaCache.this.localCache.put(key, value);
                        }
                        KafkaCache.this.cacheUpdateHandler.handleUpdate(headers, key, value, oldValue, topicPartition, offset, timestamp, timestampType, leaderEpoch);
                        break;
                    }
                    case ROLLBACK_FAILURE:
                        if (!KafkaCache.this.readOnly && key != null) {
                            V current = KafkaCache.this.localCache.get(key);
                            if (!Objects.equals(value, current)) {
                                // Write the locally held value (or null when there is none) back
                                // to the same partition under the same key.
                                try {
                                    byte[] currentBytes = current == null
                                            ? null
                                            : KafkaCache.this.valueSerde.serializer().serialize(KafkaCache.this.topic, headers, current);
                                    ProducerRecord<byte[], byte[]> rollback =
                                            new ProducerRecord<>(rec.topic(), rec.partition(), rec.key(), currentBytes, headers);
                                    KafkaCache.this.producer.send(rollback);
                                    KafkaCache.log.warn("Rollback invalid update to key {}", key);
                                } catch (KafkaException e) {
                                    KafkaCache.log.error("Failed to rollback invalid update to key {}", key, e);
                                }
                            }
                        } else {
                            KafkaCache.log.warn("Ignore invalid update to key {}", key);
                        }
                        break;
                    case IGNORE_FAILURE:
                        KafkaCache.log.warn("Ignore invalid update to key {}", key);
                        break;
                    default:
                        break;
                }
            } catch (Exception e) {
                KafkaCache.log.error("Failed to add record from the Kafka topic " + KafkaCache.this.topic + " to the local cache", e);
            }
        }

        /**
         * Records {@code j} as the last offset read from partition {@code i} and wakes every
         * thread waiting on the offset condition.
         *
         * @param i partition number
         * @param j offset of the record that was just read
         */
        private void updateOffset(int i, long j) {
            // Acquire before the try so unlock() only runs when the lock is actually held.
            this.offsetUpdateLock.lock();
            try {
                this.lastReadOffsets.put(i, j);
                this.offsetReachedThreshold.signalAll();
            } finally {
                this.offsetUpdateLock.unlock();
            }
        }

        /**
         * Merges offsets into the in-memory checkpoint map and writes that map to the
         * checkpoint file. A failed write is logged and otherwise ignored.
         *
         * @param map offsets to checkpoint; when null, the last read offset plus one is used
         *            for every partition (the value the constructor later seeks to directly)
         */
        private void checkpointOffsets(Map<TopicPartition, Long> map) {
            Map<TopicPartition, Long> toRecord = map;
            if (toRecord == null) {
                toRecord = new HashMap<>();
                for (Map.Entry<Integer, Long> lastRead : this.lastReadOffsets.entrySet()) {
                    toRecord.put(new TopicPartition(KafkaCache.this.topic, lastRead.getKey()), lastRead.getValue() + 1);
                }
            }
            KafkaCache.this.checkpointFileCache.putAll(toRecord);
            try {
                KafkaCache.this.checkpointFile.write(KafkaCache.this.checkpointFileCache);
            } catch (IOException e) {
                KafkaCache.log.warn("Failed to write offset checkpoint file to {}: {}", KafkaCache.this.checkpointFile, e);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Blocks until the reader thread has read offset {@code j} from partition {@code i},
         * or the timeout elapses.
         *
         * <p>An interrupt does not end the wait: it is logged, the remaining time is recomputed
         * against a fixed deadline so the total wait cannot exceed {@code duration}, and the
         * thread's interrupt status is restored before returning.
         *
         * @param i partition number; must be one of the partitions this reader tracks
         * @param j offset to wait for; must not be negative
         * @param duration maximum time to wait
         * @throws CacheException if {@code j} is negative
         * @throws CacheTimeoutException if the offset was not reached in time
         */
        public void waitUntilOffset(int i, long j, Duration duration) throws CacheException {
            if (j < 0) {
                throw new CacheException("KafkaTopicReader thread can't wait for a negative offset.");
            }
            KafkaCache.log.trace("Waiting to read offset {}. Currently at offset {}", j, this.lastReadOffsets.get(i));
            boolean interrupted = false;
            // Acquire before the try so unlock() only runs when the lock is actually held.
            this.offsetUpdateLock.lock();
            try {
                long remaining = duration.toNanos();
                final long deadline = System.nanoTime() + remaining;
                while (this.lastReadOffsets.get(i) < j && remaining > 0) {
                    try {
                        remaining = this.offsetReachedThreshold.awaitNanos(remaining);
                    } catch (InterruptedException e) {
                        KafkaCache.log.debug("Interrupted while waiting for the background cache reader thread to reach the specified offset: " + j, e);
                        interrupted = true;
                        // awaitNanos returned nothing, so derive what is left from the deadline.
                        remaining = deadline - System.nanoTime();
                    }
                }
                if (this.lastReadOffsets.get(i) < j) {
                    throw new CacheTimeoutException("KafkaCacheTopic thread failed to reach target offset within the timeout interval. targetOffset: " + j + ", offsetReached: " + this.lastReadOffsets.get(i) + ", timeout(ms): " + duration.toMillis());
                }
            } finally {
                this.offsetUpdateLock.unlock();
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Blocks until the reader thread has caught up with a snapshot of the last written
         * offsets, or the timeout elapses.
         *
         * <p>An interrupt does not end the wait: it is logged, the remaining time is recomputed
         * against a fixed deadline so the total wait cannot exceed {@code duration}, and the
         * thread's interrupt status is restored before returning.
         *
         * @param duration maximum time to wait
         * @return the number of offsets that were unread when the call started (0 if already
         *         caught up), or -1 if the snapshot does not cover every partition or the
         *         reader did not catch up in time
         * @throws CacheException declared for callers; not thrown by the visible code path
         */
        public int waitUntilLastWrittenOffsets(Duration duration) throws CacheException {
            Map<Integer, Long> targets = new HashMap<>(KafkaCache.this.lastWrittenOffsets);
            if (!hasValidLastWrittenOffsets(targets)) {
                return -1;
            }
            final int initiallyUnread = countUnreadOffsets(targets);
            if (initiallyUnread == 0) {
                return initiallyUnread;
            }
            boolean interrupted = false;
            // Acquire before the try so unlock() only runs when the lock is actually held.
            this.offsetUpdateLock.lock();
            try {
                long remaining = duration.toNanos();
                final long deadline = System.nanoTime() + remaining;
                while (countUnreadOffsets(targets) > 0 && remaining > 0) {
                    try {
                        remaining = this.offsetReachedThreshold.awaitNanos(remaining);
                    } catch (InterruptedException e) {
                        KafkaCache.log.debug("Interrupted while waiting for the background cache reader thread to reach the end offsets", e);
                        interrupted = true;
                        // awaitNanos returned nothing, so derive what is left from the deadline.
                        remaining = deadline - System.nanoTime();
                    }
                }
                if (countUnreadOffsets(targets) == 0) {
                    return initiallyUnread;
                }
                KafkaCache.log.warn("Could not read to last written offsets {}", targets);
                return -1;
            } finally {
                this.offsetUpdateLock.unlock();
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * Checks that the snapshot holds an entry for every partition of the enclosing cache.
         *
         * @param map partition number -> last written offset
         * @return true if every partition number appears as a key of {@code map}
         */
        private boolean hasValidLastWrittenOffsets(Map<Integer, Long> map) {
            final Set<Integer> writtenPartitions = map.keySet();
            return writtenPartitions.containsAll(KafkaCache.this.partitions);
        }

        /**
         * Counts how many offsets the reader still has to read to reach the given targets.
         *
         * <p>For each entry the lag is the target offset minus the last read offset of that
         * partition (-1 when the partition has no entry), never less than 0. The lag is clamped
         * while still a {@code long} and the total saturates at {@link Integer#MAX_VALUE}, so a
         * lag too large for an {@code int} can no longer wrap around and be reported as zero.
         *
         * @param map partition number -> target offset
         * @return the total lag over all entries, at most {@code Integer.MAX_VALUE}
         */
        private int countUnreadOffsets(Map<Integer, Long> map) {
            long unread = 0L;
            for (Map.Entry<Integer, Long> target : map.entrySet()) {
                long lastRead = this.lastReadOffsets.getOrDefault(target.getKey(), -1L);
                long lag = Math.max(target.getValue() - lastRead, 0L);
                // Cap each term so the running long total cannot overflow either.
                unread += Math.min(lag, (long) Integer.MAX_VALUE);
            }
            return (int) Math.min(unread, (long) Integer.MAX_VALUE);
        }

        /**
         * Stops the reader thread and closes the consumer.
         *
         * <p>Shutdown is initiated first, then the consumer (if any) is woken up, and only after
         * the thread has finished is the consumer closed quietly.
         *
         * @throws InterruptedException declared by the overridden method
         */
        @Override // io.kcache.utils.ShutdownableThread
        public void shutdown() throws InterruptedException {
            final String owner = KafkaCache.this.clientId;
            KafkaCache.log.debug("Starting shutdown of KafkaTopicReader thread for {}.", owner);
            super.initiateShutdown();
            final Consumer<byte[], byte[]> kafkaConsumer = KafkaCache.this.consumer;
            if (kafkaConsumer != null) {
                kafkaConsumer.wakeup();
            }
            super.awaitShutdown();
            Utils.closeQuietly(kafkaConsumer, "Kafka cache consumer for " + owner);
            KafkaCache.log.info("KafkaTopicReader thread shutdown complete for {}.", owner);
        }
    }

    public KafkaCache(String str, Serde<K> serde, Serde<V> serde2) {
        Properties properties = new Properties();
        properties.put(KafkaCacheConfig.KAFKACACHE_BOOTSTRAP_SERVERS_CONFIG, str);
        setUp(new KafkaCacheConfig(properties), serde, serde2, null, null, null, null);
    }

    /**
     * Creates a cache from an existing configuration, without an update handler, cache name,
     * comparator or pre-built local cache.
     *
     * @param kafkaCacheConfig configuration to read all settings from
     * @param serde serde for keys
     * @param serde2 serde for values
     */
    public KafkaCache(KafkaCacheConfig kafkaCacheConfig, Serde<K> serde, Serde<V> serde2) {
        // handler, name, comparator and local cache are all left to their defaults
        setUp(kafkaCacheConfig, serde, serde2, null, null, null, null);
    }

    /**
     * Creates a cache that applies records to a caller-supplied local cache.
     *
     * @param kafkaCacheConfig configuration to read all settings from
     * @param serde serde for keys
     * @param serde2 serde for values
     * @param cacheUpdateHandler update callbacks; a no-op handler is used when null
     * @param cache local cache to use; a backing cache is created from the configuration when null
     */
    public KafkaCache(KafkaCacheConfig kafkaCacheConfig, Serde<K> serde, Serde<V> serde2, CacheUpdateHandler<K, V> cacheUpdateHandler, Cache<K, V> cache) {
        // no cache name and no comparator: they only matter when the local cache is created here
        setUp(kafkaCacheConfig, serde, serde2, cacheUpdateHandler, null, null, cache);
    }

    /**
     * Creates a cache whose local cache is built from the configuration, using the given name
     * and key comparator.
     *
     * @param kafkaCacheConfig configuration to read all settings from
     * @param serde serde for keys
     * @param serde2 serde for values
     * @param cacheUpdateHandler update callbacks; a no-op handler is used when null
     * @param str name for the local cache; "default" is used when null
     * @param comparator key comparator handed to the local cache
     */
    public KafkaCache(KafkaCacheConfig kafkaCacheConfig, Serde<K> serde, Serde<V> serde2, CacheUpdateHandler<K, V> cacheUpdateHandler, String str, Comparator<K> comparator) {
        // no pre-built local cache: setUp creates one from the configuration
        setUp(kafkaCacheConfig, serde, serde2, cacheUpdateHandler, str, comparator, null);
    }

    /**
     * Shared constructor body: copies the settings out of the configuration and wires up the
     * update handler, serdes and local cache.
     *
     * @param kafkaCacheConfig configuration to read from
     * @param serde serde for keys
     * @param serde2 serde for values
     * @param cacheUpdateHandler update callbacks, or null for a no-op handler
     * @param str name for a local cache created here, or null
     * @param comparator key comparator for a local cache created here, or null
     * @param cache pre-built local cache, or null to create one from the configuration
     */
    private void setUp(KafkaCacheConfig kafkaCacheConfig, Serde<K> serde, Serde<V> serde2, CacheUpdateHandler<K, V> cacheUpdateHandler, String str, Comparator<K> comparator, Cache<K, V> cache) {
        this.config = kafkaCacheConfig;

        // Topic and consumer identity.
        this.topic = kafkaCacheConfig.getString(KafkaCacheConfig.KAFKACACHE_TOPIC_CONFIG);
        this.desiredReplicationFactor = kafkaCacheConfig.getInt(KafkaCacheConfig.KAFKACACHE_TOPIC_REPLICATION_FACTOR_CONFIG);
        this.desiredNumPartitions = kafkaCacheConfig.getInt(KafkaCacheConfig.KAFKACACHE_TOPIC_NUM_PARTITIONS_CONFIG);
        this.partitions = kafkaCacheConfig.partitions();
        this.offset = kafkaCacheConfig.offset();
        this.groupId = kafkaCacheConfig.getString(KafkaCacheConfig.KAFKACACHE_GROUP_ID_CONFIG);
        final String configuredClientId = kafkaCacheConfig.getString(KafkaCacheConfig.KAFKACACHE_CLIENT_ID_CONFIG);
        this.clientId = configuredClientId;
        if (configuredClientId == null) {
            this.clientId = "kafka-cache-reader-" + this.topic;
        }

        // Behaviour flags.
        this.skipValidation = kafkaCacheConfig.getBoolean(KafkaCacheConfig.KAFKACACHE_TOPIC_SKIP_VALIDATION_CONFIG);
        this.requireCompact = kafkaCacheConfig.getBoolean(KafkaCacheConfig.KAFKACACHE_TOPIC_REQUIRE_COMPACT_CONFIG);
        this.readOnly = kafkaCacheConfig.getBoolean(KafkaCacheConfig.KAFKACACHE_TOPIC_READ_ONLY_CONFIG);

        // Timeouts.
        this.initTimeout = kafkaCacheConfig.getInt(KafkaCacheConfig.KAFKACACHE_INIT_TIMEOUT_CONFIG);
        this.timeout = kafkaCacheConfig.getInt(KafkaCacheConfig.KAFKACACHE_TIMEOUT_CONFIG);
        this.pollTimeout = kafkaCacheConfig.getLong(KafkaCacheConfig.KAFKACACHE_POLL_TIMEOUT_CONFIG);

        // Checkpointing.
        this.checkpointDir = kafkaCacheConfig.getString(KafkaCacheConfig.KAFKACACHE_CHECKPOINT_DIR_CONFIG);
        this.checkpointVersion = kafkaCacheConfig.getInt(KafkaCacheConfig.KAFKACACHE_CHECKPOINT_VERSION_CONFIG);
        this.checkpointBeforeInit = kafkaCacheConfig.getBoolean(KafkaCacheConfig.KAFKACACHE_CHECKPOINT_BEFORE_INIT_CONFIG);

        if (cacheUpdateHandler != null) {
            this.cacheUpdateHandler = cacheUpdateHandler;
        } else {
            // No handler supplied: install one that does nothing on update.
            this.cacheUpdateHandler = (key, value, oldValue, partition, recordOffset, recordTimestamp) -> {
            };
        }

        // The serdes must be in place before createLocalCache runs: it reads them.
        this.keySerde = serde;
        this.valueSerde = serde2;
        if (cache != null) {
            this.localCache = cache;
        } else {
            this.localCache = createLocalCache(str, comparator);
        }

        this.bootstrapBrokers = kafkaCacheConfig.bootstrapBrokers();
        log.info("Initializing Kafka cache {} with broker endpoints {}", this.clientId, this.bootstrapBrokers);
    }

    /**
     * Creates the local cache named by the backing-cache setting.
     *
     * <p>MEMORY is built directly: a bounded in-memory cache when either the size or the expiry
     * setting is non-negative, otherwise an unbounded one. Every other type is loaded
     * reflectively by class name. CAFFEINE is constructed from (size, expiry, loader,
     * comparator); the remaining types are constructed from (name, data dir, key serde, value
     * serde, comparator). A reflectively created cache is then configured with the settings
     * under the {@code kafkacache.backing.cache.<type>.} prefix.
     *
     * @param str cache name; "default" when null
     * @param comparator key comparator handed to the cache
     * @return the newly created cache
     * @throws CacheInitializationException if anything goes wrong while creating the cache
     */
    private Cache<K, V> createLocalCache(String str, Comparator<K> comparator) {
        final String cacheName = str != null ? str : "default";
        try {
            final CacheType cacheType = CacheType.get(this.config.getString(KafkaCacheConfig.KAFKACACHE_BACKING_CACHE_CONFIG));
            final int maxSize = this.config.getInt(KafkaCacheConfig.KAFKACACHE_BOUNDED_CACHE_SIZE_CONFIG);
            final int expirySecs = this.config.getInt(KafkaCacheConfig.KAFKACACHE_BOUNDED_CACHE_EXPIRY_SECS_CONFIG);
            String implClassName = null;
            // True for implementations whose constructor takes a name and a data directory.
            boolean needsDataDir = false;
            switch (cacheType) {
                case MEMORY:
                    if (maxSize >= 0 || expirySecs >= 0) {
                        return new InMemoryBoundedCache<>(maxSize, Duration.ofSeconds(expirySecs), null, comparator);
                    }
                    return new InMemoryCache<>(comparator);
                case BDBJE:
                    implClassName = "io.kcache.bdbje.BdbJECache";
                    needsDataDir = true;
                    break;
                case CAFFEINE:
                    implClassName = "io.kcache.caffeine.CaffeineCache";
                    break;
                case LMDB:
                    implClassName = "io.kcache.lmdb.LmdbCache";
                    needsDataDir = true;
                    break;
                case MAPDB:
                    implClassName = "io.kcache.mapdb.MapDBCache";
                    needsDataDir = true;
                    break;
                case RDBMS:
                    implClassName = "io.kcache.rdbms.RdbmsCache";
                    needsDataDir = true;
                    break;
                case ROCKSDB:
                    implClassName = "io.kcache.rocksdb.RocksDBCache";
                    needsDataDir = true;
                    break;
            }
            final Class<?> implClass = Class.forName(implClassName);
            final Object instance;
            if (needsDataDir) {
                instance = implClass
                        .getConstructor(String.class, String.class, Serde.class, Serde.class, Comparator.class)
                        .newInstance(cacheName, this.config.getString(KafkaCacheConfig.KAFKACACHE_DATA_DIR_CONFIG), this.keySerde, this.valueSerde, comparator);
            } else {
                instance = implClass
                        .getConstructor(Integer.class, Duration.class, CacheLoader.class, Comparator.class)
                        .newInstance(maxSize, Duration.ofSeconds(expirySecs), null, comparator);
            }
            @SuppressWarnings("unchecked") // the class names above are expected to be Cache implementations
            final Cache<K, V> backing = (Cache<K, V>) instance;
            backing.configure(this.config.originalsWithPrefix("kafkacache.backing.cache." + cacheType + "."));
            return backing;
        } catch (Exception e) {
            throw new CacheInitializationException("Could not create backing cache", e);
        }
    }

    /**
     * Returns the comparator reported by the local backing cache.
     *
     * @return the result of {@code localCache.comparator()}
     */
    @Override // java.util.SortedMap
    public Comparator<? super K> comparator() {
        final Comparator<? super K> keyOrdering = this.localCache.comparator();
        return keyOrdering;
    }

    /**
     * Reports whether the local backing cache is persistent.
     *
     * @return the result of {@code localCache.isPersistent()}
     */
    @Override // io.kcache.Cache
    public boolean isPersistent() {
        final boolean persistent = this.localCache.isPersistent();
        return persistent;
    }

    /**
     * Initializes this cache. In order, this method:
     * <ol>
     *   <li>rejects the call if the {@code initialized} flag is already set;</li>
     *   <li>for a persistent local cache, opens the checkpoint file and copies its contents into
     *       {@code checkpointFileCache};</li>
     *   <li>initializes the local cache;</li>
     *   <li>creates or verifies the topic unless {@code skipValidation} is set;</li>
     *   <li>creates the consumer and, unless read-only, the producer and partitioner;</li>
     *   <li>creates the worker thread, calls its {@code readToEndOffsets} with the init timeout,
     *       starts it, flips the {@code initialized} flag and notifies the update handler.</li>
     * </ol>
     *
     * @throws CacheInitializationException if the cache was already initialized, or the checkpoints
     *     or end offsets could not be read
     */
    @Override // io.kcache.Cache
    public void init() throws CacheInitializationException {
        if (this.initialized.get()) {
            throw new CacheInitializationException("Illegal state while initializing cache for " + this.clientId + ". Cache was already initialized");
        }

        // Checkpoints are only read when the local cache reports itself as persistent.
        if (this.localCache.isPersistent()) {
            try {
                this.checkpointFile = new OffsetCheckpoint(this.checkpointDir, this.checkpointVersion, this.topic);
                this.checkpointFileCache.putAll(this.checkpointFile.read());
                log.info("Successfully read checkpoints");
            } catch (IOException checkpointFailure) {
                throw new CacheInitializationException("Failed to read checkpoints", checkpointFailure);
            }
        }
        this.localCache.init();

        final boolean validateTopic = !this.skipValidation;
        if (validateTopic) {
            createOrVerifyTopic();
        }

        this.consumer = new KafkaConsumer(getConsumerProperties());
        final boolean writable = !this.readOnly;
        if (writable) {
            // The producer and the partitioner are built from the same producer properties.
            final Properties producerProps = getProducerProperties();
            this.producer = new KafkaProducer(producerProps);
            this.partitioner = (Partitioner) new ProducerConfig(producerProps).getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class, Collections.singletonMap("client.id", this.clientId));
        }

        this.kafkaTopicReader = new WorkerThread();
        try {
            final int endOffsetsResult = this.kafkaTopicReader.readToEndOffsets(Duration.ofMillis(this.initTimeout));
            this.kafkaTopicReader.start();
            final boolean firstInitializer = this.initialized.compareAndSet(false, true);
            if (!firstInitializer) {
                throw new CacheInitializationException("Illegal state while initializing cache for " + this.clientId + ". Cache was already initialized");
            }
            this.cacheUpdateHandler.cacheInitialized(endOffsetsResult, new HashMap(this.checkpointFileCache));
        } catch (IOException readFailure) {
            throw new CacheInitializationException("Failed to read to end offsets", readFailure);
        }
    }

    /**
     * Resets this cache: clears {@code lastWrittenOffsets}, resets the local backing cache, and
     * then notifies the update handler through {@code cacheReset()}.
     */
    @Override // io.kcache.Cache
    public void reset() {
        this.lastWrittenOffsets.clear();
        this.localCache.reset();
        this.cacheUpdateHandler.cacheReset();
    }

    /**
     * Synchronizes the local cache with the topic.
     *
     * <p>If a worker thread exists, it is asked to wait until the last written offsets (bounded by
     * {@code timeout} milliseconds). When there is no worker thread, or that call returns a
     * negative value, {@link #waitUntilEndOffsets()} is used instead. Afterwards the local cache
     * is synced and the update handler receives the resulting count together with a copy of
     * {@code checkpointFileCache}.
     */
    @Override // io.kcache.Cache
    public void sync() {
        int count = this.kafkaTopicReader == null
                ? -1
                : this.kafkaTopicReader.waitUntilLastWrittenOffsets(Duration.ofMillis(this.timeout));
        if (count < 0) {
            // Negative result (or no reader): fall back to waiting for the end offsets.
            count = waitUntilEndOffsets();
        }
        this.localCache.sync();
        this.cacheUpdateHandler.cacheSynchronized(count, new HashMap(this.checkpointFileCache));
    }

    /**
     * Registers a sync callback, wakes the consumer, and blocks until that callback completes.
     *
     * <p>A fresh {@link CompletableFuture} is added to {@code syncCallbacks} while holding this
     * object's monitor. The consumer, if present, is then woken up via {@code wakeup()}, and the
     * calling thread waits on the future without a timeout.
     *
     * <p>If the wait is interrupted, the thread's interrupt flag is restored before returning so
     * that callers further up the stack can still observe the interruption.
     *
     * @return the value the future was completed with, or {@code 0} when the wait was interrupted
     *     or the future completed exceptionally (both cases are logged at WARN level)
     * @throws CacheException declared by the signature; this method does not throw it directly
     */
    private int waitUntilEndOffsets() throws CacheException {
        final CompletableFuture<Integer> pending = new CompletableFuture<>();
        synchronized (this) {
            this.syncCallbacks.add(pending);
        }
        if (this.consumer != null) {
            this.consumer.wakeup();
        }
        try {
            return pending.get().intValue();
        } catch (InterruptedException e) {
            // Do not lose the interruption: restore the flag for the caller.
            Thread.currentThread().interrupt();
            log.warn("Failed to read to end offsets", e);
        } catch (ExecutionException e) {
            log.warn("Failed to read to end offsets", e);
        }
        return 0;
    }

    /**
     * Builds the properties used to construct the Kafka consumer.
     *
     * <p>Starts from the {@code kafkacache.}-prefixed configuration entries and then sets the
     * fixed entries below, which therefore win over any same-named prefixed entry.
     *
     * @return a new {@link Properties} instance for the consumer
     */
    private Properties getConsumerProperties() {
        final Properties consumerProps = new Properties();
        addKafkaCacheConfigsToClientProperties(consumerProps);

        // Connection and identity.
        consumerProps.put("bootstrap.servers", this.bootstrapBrokers);
        consumerProps.put("client.id", this.clientId);
        consumerProps.put("group.id", this.groupId);

        // Offset handling.
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Keys and values are consumed as raw byte arrays.
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        return consumerProps;
    }

    /**
     * Builds the properties used to construct the Kafka producer.
     *
     * <p>Starts from the {@code kafkacache.}-prefixed configuration entries and then sets the
     * fixed entries below: acks {@code "-1"}, byte-array serializers, zero retries and idempotence
     * disabled.
     *
     * @return a new {@link Properties} instance for the producer
     */
    private Properties getProducerProperties() {
        final Properties producerProps = new Properties();
        addKafkaCacheConfigsToClientProperties(producerProps);
        producerProps.put("bootstrap.servers", this.bootstrapBrokers);

        // Keys and values are produced as raw byte arrays.
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

        // Delivery settings.
        producerProps.put(ProducerConfig.ACKS_CONFIG, "-1");
        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, Boolean.FALSE);
        producerProps.put("retries", Integer.valueOf(0));
        return producerProps;
    }

    /**
     * Copies every configuration entry returned by {@code config.originalsWithPrefix("kafkacache.")}
     * into the given properties.
     *
     * @param properties the client properties to add the entries to
     */
    private void addKafkaCacheConfigsToClientProperties(Properties properties) {
        final Map<String, ?> prefixedEntries = this.config.originalsWithPrefix("kafkacache.");
        properties.putAll(prefixedEntries);
    }

    /**
     * Verifies the cache topic if it already exists, otherwise creates it.
     *
     * <p>An {@link AdminClient} is created from the {@code kafkacache.}-prefixed configuration plus
     * the bootstrap servers and is always closed before this method returns (try-with-resources).
     * Topic names are listed with a timeout of {@code initTimeout} milliseconds.
     *
     * @throws CacheInitializationException if the topic does not exist and the cache is read-only,
     *     if the admin calls fail or are interrupted, or if they time out
     */
    private void createOrVerifyTopic() throws CacheInitializationException {
        final Properties adminProps = new Properties();
        addKafkaCacheConfigsToClientProperties(adminProps);
        adminProps.put("bootstrap.servers", this.bootstrapBrokers);

        try (AdminClient admin = AdminClient.create(adminProps)) {
            if (admin.listTopics().names().get(this.initTimeout, TimeUnit.MILLISECONDS).contains(this.topic)) {
                verifyTopic(admin);
            } else if (this.readOnly) {
                // A read-only cache must never create the topic itself.
                throw new CacheInitializationException("Topic does not exist " + this.topic + " and cache is configured read-only");
            } else {
                createTopic(admin);
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new CacheInitializationException("Failed trying to create or validate topic " + this.topic, e);
        } catch (TimeoutException e) {
            throw new CacheInitializationException("Timed out trying to create or validate topic " + this.topic, e);
        }
    }

    /**
     * Creates the cache topic with {@code cleanup.policy=compact}.
     *
     * <p>The replication factor is the smaller of the number of nodes reported by
     * {@code describeCluster()} and {@code desiredReplicationFactor}; a warning is logged when the
     * result is below the desired value. Topic-level settings come from the
     * {@code kafkacache.topic.config.}-prefixed configuration entries, with {@code cleanup.policy}
     * set to {@code compact} afterwards. If creation fails because the topic already exists, the
     * existing topic is verified instead.
     *
     * @param adminClient the admin client used for the cluster and topic calls
     * @throws CacheInitializationException if the cluster reports no nodes
     * @throws InterruptedException if an admin call is interrupted
     * @throws ExecutionException if an admin call fails for a reason other than the topic already
     *     existing
     * @throws TimeoutException if an admin call exceeds {@code initTimeout} milliseconds
     */
    private void createTopic(AdminClient adminClient) throws CacheInitializationException, InterruptedException, ExecutionException, TimeoutException {
        log.info("Creating topic {}", this.topic);

        final int liveBrokers = adminClient.describeCluster().nodes().get(this.initTimeout, TimeUnit.MILLISECONDS).size();
        if (liveBrokers <= 0) {
            throw new CacheInitializationException("No live Kafka brokers");
        }

        final int replicationFactor = Math.min(liveBrokers, this.desiredReplicationFactor);
        if (replicationFactor < this.desiredReplicationFactor) {
            log.warn("Creating the topic " + this.topic + " using a replication factor of " + replicationFactor + ", which is less than the desired one of " + this.desiredReplicationFactor + ". If this is a production environment, it's crucial to add more brokers and increase the replication factor of the topic.");
        }

        final NewTopic topicRequest = new NewTopic(this.topic, this.desiredNumPartitions, (short) replicationFactor);
        final Map<String, String> topicConfigs = this.config.originalsWithPrefix("kafkacache.topic.config.")
                .entrySet()
                .stream()
                .collect(Collectors.toMap(Map.Entry::getKey, configEntry -> configEntry.getValue().toString()));
        // Set last so it overrides any cleanup.policy supplied through configuration.
        topicConfigs.put("cleanup.policy", "compact");
        topicRequest.configs(topicConfigs);

        try {
            adminClient.createTopics(Collections.singleton(topicRequest)).all().get(this.initTimeout, TimeUnit.MILLISECONDS);
        } catch (ExecutionException creationFailure) {
            if (creationFailure.getCause() instanceof TopicExistsException) {
                // The topic already exists (the create call reported it), so verify it instead.
                verifyTopic(adminClient);
            } else {
                throw creationFailure;
            }
        }
    }

    /**
     * Validates an existing cache topic against the desired settings.
     *
     * <p>Logs a warning when the topic has fewer partitions than {@code desiredNumPartitions} or
     * when the first partition has fewer replicas than {@code desiredReplicationFactor}. It then
     * reads the topic's {@code cleanup.policy}: any value other than {@code compact} is logged, and
     * additionally rejected when {@code requireCompact} is set. If the describe calls fail because
     * the topic or partition is unknown, a warning is logged and validation is skipped.
     *
     * @param adminClient the admin client used for the describe calls
     * @throws CacheInitializationException if {@code requireCompact} is set and the cleanup policy
     *     is not {@code compact}
     * @throws InterruptedException if an admin call is interrupted
     * @throws ExecutionException if an admin call fails for a reason other than an unknown topic or
     *     partition
     * @throws TimeoutException if an admin call exceeds {@code initTimeout} milliseconds
     */
    private void verifyTopic(AdminClient adminClient) throws CacheInitializationException, InterruptedException, ExecutionException, TimeoutException {
        log.info("Validating topic {}", this.topic);
        try {
            TopicDescription topicDescription = adminClient.describeTopics(Collections.singleton(this.topic)).allTopicNames().get(this.initTimeout, TimeUnit.MILLISECONDS).get(this.topic);
            if (topicDescription.partitions().size() < this.desiredNumPartitions) {
                // Report the desired partition count here, not the replication factor.
                log.warn("The number of partitions for the topic " + this.topic + " is less than the desired value of " + this.desiredNumPartitions + ".");
            }
            if (topicDescription.partitions().get(0).replicas().size() < this.desiredReplicationFactor) {
                log.warn("The replication factor of the topic " + this.topic + " is less than the desired one of " + this.desiredReplicationFactor + ". If this is a production environment, it's crucial to add more brokers and increase the replication factor of the topic.");
            }

            ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, this.topic);
            String cleanupPolicy = adminClient.describeConfigs(Collections.singleton(configResource)).all().get(this.initTimeout, TimeUnit.MILLISECONDS).get(configResource).get("cleanup.policy").value();
            if ("compact".equals(cleanupPolicy)) {
                return;
            }

            String message = "The retention policy of the topic " + this.topic + " is not 'compact'. You must configure the topic to 'compact' cleanup policy to avoid Kafka deleting your data after a week. Refer to Kafka documentation for more details on cleanup policies.";
            if (this.requireCompact) {
                log.error(message);
                throw new CacheInitializationException("The retention policy of the topic " + this.topic + " is incorrect. Expected cleanup.policy to be 'compact' but it is " + cleanupPolicy);
            }
            log.warn(message);
        } catch (ExecutionException e) {
            if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
                throw e;
            }
            // Unknown topic/partition: log and skip validation rather than fail.
            log.warn("Could not validate existing topic {}", this.topic);
        }
    }

    /**
     * Returns the size reported by the local backing cache.
     *
     * @return the result of {@code localCache.size()}
     * @throws CacheException if the cache has not been initialized
     */
    @Override // java.util.Map
    public int size() {
        assertInitialized();
        final int entryCount = this.localCache.size();
        return entryCount;
    }

    /**
     * Reports whether the local backing cache is empty.
     *
     * @return the result of {@code localCache.isEmpty()}
     * @throws CacheException if the cache has not been initialized
     */
    @Override // java.util.Map
    public boolean isEmpty() {
        assertInitialized();
        final boolean empty = this.localCache.isEmpty();
        return empty;
    }

    /**
     * Reports whether the local backing cache contains the given key.
     *
     * @param obj the key to look for
     * @return the result of {@code localCache.containsKey(obj)}
     * @throws CacheException if the cache has not been initialized
     */
    @Override // java.util.Map
    public boolean containsKey(Object obj) {
        assertInitialized();
        final boolean present = this.localCache.containsKey(obj);
        return present;
    }

    /**
     * Reports whether the local backing cache contains the given value.
     *
     * @param obj the value to look for
     * @return the result of {@code localCache.containsValue(obj)}
     * @throws CacheException if the cache has not been initialized
     */
    @Override // java.util.Map
    public boolean containsValue(Object obj) {
        assertInitialized();
        final boolean present = this.localCache.containsValue(obj);
        return present;
    }

    /**
     * Looks up a key in the local backing cache.
     *
     * @param obj the key to look up
     * @return the result of {@code localCache.get(obj)}
     * @throws CacheException if the cache has not been initialized
     */
    @Override // java.util.Map
    public V get(Object obj) {
        assertInitialized();
        final V cached = (V) this.localCache.get(obj);
        return cached;
    }

    /**
     * Writes a key/value pair without headers.
     *
     * @param k the key
     * @param v the value
     * @return {@code getOldValue()} of the metadata returned by {@code put(null, k, v)}
     */
    @Override // java.util.Map
    public V put(K k, V v) {
        final Metadata<V> outcome = put(null, k, v);
        return outcome.getOldValue();
    }

    /**
     * Writes a key/value pair with the given headers, without requesting a producer flush.
     *
     * @param headers the record headers (the map-style {@code put} passes {@code null})
     * @param k the key
     * @param v the value
     * @return the metadata returned by the four-argument {@code put} called with {@code false}
     */
    public Metadata<V> put(Headers headers, K k, V v) {
        final boolean flushAfterSend = false;
        return put(headers, k, v, flushAfterSend);
    }

    /**
     * Sends a key/value pair to the cache topic and waits for the write through {@code doPut}.
     *
     * @param headers the record headers
     * @param k the key
     * @param v the value
     * @param z when {@code true}, the producer is flushed right after the record is sent
     * @return metadata built from the record metadata returned by {@code doPut} and, for a non-null
     *     key, the result of {@code get(k)} evaluated after {@code doPut} returns ({@code null}
     *     for a null key)
     * @throws CacheException if the cache is read-only or not initialized
     */
    public Metadata<V> put(Headers headers, K k, V v, boolean z) {
        if (this.readOnly) {
            throw new CacheException("Cache is read-only");
        }
        assertInitialized();

        final RecordMetadata ack = doPut(() -> {
            final ProducerRecord<byte[], byte[]> outgoing = toRecord(headers, k, v);
            log.trace("Sending record to Kafka cache topic: {}", outgoing);
            final Future<RecordMetadata> pendingAck = this.producer.send(outgoing);
            if (z) {
                this.producer.flush();
            }
            return pendingAck;
        });

        // NOTE(review): the lookup happens after doPut has returned; confirm this is the value
        // intended to be exposed through Metadata.getOldValue().
        final V looked = (k == null) ? null : get(k);
        return new Metadata<>(ack, looked);
    }

    /**
     * Writes all entries of the given map without headers by delegating to
     * {@code putAll(null, map)}; the record metadata returned by that call is discarded.
     *
     * @param map the entries to write
     */
    @Override // java.util.Map
    public void putAll(Map<? extends K, ? extends V> map) {
        putAll(null, map);
    }

    /**
     * Writes all entries of the given map with the given headers and requests a producer flush
     * after the records have been sent.
     *
     * @param headers the record headers applied to every entry
     * @param map the entries to write
     * @return the result of the three-argument {@code putAll} called with {@code true}
     */
    public RecordMetadata putAll(Headers headers, Map<? extends K, ? extends V> map) {
        final boolean flushAfterSend = true;
        return putAll(headers, map, flushAfterSend);
    }

    /**
     * Sends every entry of the given map to the cache topic and waits, through {@code doPut}, on
     * the future of the last record sent.
     *
     * @param headers the record headers applied to every entry
     * @param map the entries to write
     * @param z when {@code true}, the producer is flushed after all records have been sent
     * @return the record metadata returned by {@code doPut}, or {@code null} if the map is empty
     * @throws CacheException if the cache is read-only or not initialized
     */
    public RecordMetadata putAll(Headers headers, Map<? extends K, ? extends V> map, boolean z) {
        if (this.readOnly) {
            throw new CacheException("Cache is read-only");
        }
        assertInitialized();
        if (map.isEmpty()) {
            // Nothing to send, so there is no metadata to report.
            return null;
        }
        return doPut(() -> {
            Future<RecordMetadata> lastSend = null;
            for (Map.Entry<? extends K, ? extends V> pair : map.entrySet()) {
                final ProducerRecord<byte[], byte[]> outgoing = toRecord(headers, pair.getKey(), pair.getValue());
                log.trace("Sending record to Kafka cache topic: {}", outgoing);
                lastSend = this.producer.send(outgoing);
            }
            if (z) {
                this.producer.flush();
            }
            return lastSend;
        });
    }

    /**
     * Runs a send operation, waits for its acknowledgement, and then waits for the reader thread
     * to reach the acknowledged offset.
     *
     * <p>After the acknowledgement arrives (bounded by {@code timeout} milliseconds), the record's
     * offset is stored in {@code lastWrittenOffsets} for its partition and the worker thread is
     * asked to wait until that offset. If anything fails after the offset was recorded, the
     * previous {@code lastWrittenOffsets} entry for that partition is restored (or the entry is
     * removed when there was none).
     *
     * <p>NOTE(review): the catch clauses are flat; this assumes exceptions created in one handler
     * (e.g. {@code CacheTimeoutException}) are not themselves {@code KafkaException}s — confirm
     * against the exception hierarchy.
     *
     * @param supplier performs the send and returns the future of the record to wait on
     * @return the metadata of the acknowledged record
     * @throws CacheTimeoutException if the acknowledgement does not arrive in time
     * @throws EntryTooLargeException if the send failed with a {@code RecordTooLargeException}
     * @throws CacheException if the send fails, is interrupted, or Kafka reports another error
     */
    private RecordMetadata doPut(Supplier<Future<RecordMetadata>> supplier) {
        Integer partition = null;
        Long previousOffset = null;
        boolean succeeded = false;
        try {
            RecordMetadata recordMetadata = supplier.get().get(this.timeout, TimeUnit.MILLISECONDS);
            long offset = recordMetadata.offset();
            log.trace("Waiting for the local cache to catch up to offset {}", Long.valueOf(offset));
            partition = Integer.valueOf(recordMetadata.partition());
            // Remember the previous entry so it can be restored if the wait below fails.
            previousOffset = this.lastWrittenOffsets.put(partition, Long.valueOf(offset));
            this.kafkaTopicReader.waitUntilOffset(partition.intValue(), offset, Duration.ofMillis(this.timeout));
            succeeded = true;
            return recordMetadata;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new CacheException("Put operation interrupted while waiting for an ack from Kafka", e);
        } catch (ExecutionException e) {
            if (e.getCause() instanceof RecordTooLargeException) {
                throw new EntryTooLargeException("Put operation failed because entry is too large");
            }
            throw new CacheException("Put operation failed while waiting for an ack from Kafka", e);
        } catch (TimeoutException e) {
            throw new CacheTimeoutException("Put operation timed out while waiting for an ack from Kafka", e);
        } catch (KafkaException e) {
            throw new CacheException("Put operation to Kafka failed", e);
        } finally {
            if (!succeeded && partition != null) {
                // Roll back the offset bookkeeping for this partition.
                if (previousOffset != null) {
                    this.lastWrittenOffsets.put(partition, previousOffset);
                } else {
                    this.lastWrittenOffsets.remove(partition);
                }
            }
        }
    }

    /**
     * Serializes a key/value pair into a producer record for the cache topic.
     *
     * <p>A {@code null} key or value is passed through as {@code null} bytes without invoking the
     * corresponding serializer. Only key serialization failures are wrapped in a
     * {@link CacheException}; value serialization failures propagate unchanged. The target
     * partition is whatever {@code partition(...)} returns.
     *
     * @param headers the record headers, also passed to both serializers
     * @param k the key, may be {@code null}
     * @param v the value, may be {@code null}
     * @return the record to send
     * @throws CacheException if serializing the key fails
     */
    private ProducerRecord<byte[], byte[]> toRecord(Headers headers, K k, V v) {
        byte[] keyBytes = null;
        if (k != null) {
            try {
                keyBytes = this.keySerde.serializer().serialize(this.topic, headers, k);
            } catch (Exception keyFailure) {
                throw new CacheException("Error serializing key while creating the Kafka produce record", keyFailure);
            }
        }

        byte[] valueBytes = null;
        if (v != null) {
            valueBytes = this.valueSerde.serializer().serialize(this.topic, headers, v);
        }

        final Integer targetPartition = partition(this.topic, k, keyBytes, v, valueBytes);
        return new ProducerRecord<>(this.topic, targetPartition, keyBytes, valueBytes, headers);
    }

    /**
     * Asks the configured partitioner for a partition, when a non-default partitioner is set.
     *
     * <p>Returns {@code null} when there is no partitioner or it is a {@code DefaultPartitioner}.
     * The partitioner is invoked with a {@code null} cluster argument. Any exception raised while
     * invoking it — including the {@link IllegalArgumentException} created here for a negative
     * result — is logged at WARN level and turned into a {@code null} return.
     *
     * @param str the topic name
     * @param obj the key object
     * @param bArr the serialized key
     * @param obj2 the value object
     * @param bArr2 the serialized value
     * @return the chosen partition, or {@code null} in the cases described above
     */
    private Integer partition(String str, Object obj, byte[] bArr, Object obj2, byte[] bArr2) {
        final boolean noCustomPartitioner = this.partitioner == null || this.partitioner instanceof DefaultPartitioner;
        if (noCustomPartitioner) {
            return null;
        }
        try {
            final int chosen = this.partitioner.partition(str, obj, bArr, obj2, bArr2, null);
            if (chosen >= 0) {
                return Integer.valueOf(chosen);
            }
            throw new IllegalArgumentException(String.format("The partitioner generated an invalid partition number: %d. Partition number should always be non-negative.", Integer.valueOf(chosen)));
        } catch (Exception failure) {
            log.warn("Could not invoke partitioner", failure);
            return null;
        }
    }

    /**
     * Removes a key by delegating to {@code put(key, null)}, i.e. by writing a {@code null} value
     * for that key.
     *
     * <p>The key is cast to {@code K} without a runtime check because {@link Map#remove(Object)}
     * accepts any object; a key of the wrong type would only fail later, wherever the key is
     * actually used as a {@code K}.
     *
     * @param obj the key to remove
     * @return whatever {@code put(key, null)} returns
     */
    @Override // java.util.Map
    @SuppressWarnings("unchecked")
    public V remove(Object obj) {
        return put((K) obj, null);
    }

    /**
     * Not supported by this cache.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // java.util.Map
    public void clear() {
        throw new UnsupportedOperationException();
    }

    /**
     * Returns the key set of the local backing cache, wrapped as unmodifiable when this cache is
     * read-only.
     *
     * @return the local cache's key set, or an unmodifiable wrapper around it
     * @throws CacheException if the cache has not been initialized
     */
    @Override // java.util.SortedMap, java.util.Map
    public Set<K> keySet() {
        assertInitialized();
        if (this.readOnly) {
            return Collections.unmodifiableSet(this.localCache.keySet());
        }
        return this.localCache.keySet();
    }

    /**
     * Returns the values of the local backing cache, wrapped as unmodifiable when this cache is
     * read-only.
     *
     * @return the local cache's values collection, or an unmodifiable wrapper around it
     * @throws CacheException if the cache has not been initialized
     */
    @Override // java.util.SortedMap, java.util.Map
    public Collection<V> values() {
        assertInitialized();
        if (this.readOnly) {
            return Collections.unmodifiableCollection(this.localCache.values());
        }
        return this.localCache.values();
    }

    /**
     * Returns the entry set of the local backing cache, wrapped as unmodifiable when this cache is
     * read-only.
     *
     * @return the local cache's entry set, or an unmodifiable wrapper around it
     * @throws CacheException if the cache has not been initialized
     */
    @Override // java.util.SortedMap, java.util.Map
    public Set<Map.Entry<K, V>> entrySet() {
        assertInitialized();
        if (this.readOnly) {
            return Collections.unmodifiableSet(this.localCache.entrySet());
        }
        return this.localCache.entrySet();
    }

    /**
     * Returns the first key reported by the local backing cache.
     *
     * @return the result of {@code localCache.firstKey()}
     * @throws CacheException if the cache has not been initialized
     */
    @Override // java.util.SortedMap
    public K firstKey() {
        assertInitialized();
        final K first = this.localCache.firstKey();
        return first;
    }

    /**
     * Returns the last key reported by the local backing cache.
     *
     * @return the result of {@code localCache.lastKey()}
     * @throws CacheException if the cache has not been initialized
     */
    @Override // java.util.SortedMap
    public K lastKey() {
        assertInitialized();
        final K last = this.localCache.lastKey();
        return last;
    }

    /**
     * Returns a sub-cache of the local backing cache for the given bounds.
     *
     * @param k the first bound, passed through to the local cache
     * @param z the flag paired with the first bound
     * @param k2 the second bound, passed through to the local cache
     * @param z2 the flag paired with the second bound
     * @return the result of {@code localCache.subCache(k, z, k2, z2)}
     * @throws CacheException if the cache has not been initialized
     */
    @Override // io.kcache.Cache
    public Cache<K, V> subCache(K k, boolean z, K k2, boolean z2) {
        assertInitialized();
        final Cache<K, V> view = this.localCache.subCache(k, z, k2, z2);
        return view;
    }

    /**
     * Returns an iterator over a key range of the local backing cache.
     *
     * @param k the first bound, passed through to the local cache
     * @param z the flag paired with the first bound
     * @param k2 the second bound, passed through to the local cache
     * @param z2 the flag paired with the second bound
     * @return the result of {@code localCache.range(k, z, k2, z2)}
     * @throws CacheException if the cache has not been initialized
     */
    @Override // io.kcache.Cache
    public KeyValueIterator<K, V> range(K k, boolean z, K k2, boolean z2) {
        assertInitialized();
        final KeyValueIterator<K, V> cursor = this.localCache.range(k, z, k2, z2);
        return cursor;
    }

    /**
     * Returns an iterator over all entries of the local backing cache.
     *
     * @return the result of {@code localCache.all()}
     * @throws CacheException if the cache has not been initialized
     */
    @Override // io.kcache.Cache
    public KeyValueIterator<K, V> all() {
        assertInitialized();
        final KeyValueIterator<K, V> cursor = this.localCache.all();
        return cursor;
    }

    /**
     * Returns the descending view reported by the local backing cache.
     *
     * @return the result of {@code localCache.descendingCache()}
     * @throws CacheException if the cache has not been initialized
     */
    @Override // io.kcache.Cache
    public Cache<K, V> descendingCache() {
        assertInitialized();
        final Cache<K, V> reversed = this.localCache.descendingCache();
        return reversed;
    }

    /**
     * Flushes the producer (when one exists), then the local backing cache, and finally notifies
     * the update handler through {@code cacheFlushed()}.
     */
    @Override // io.kcache.Cache
    public void flush() {
        // No producer was created (see the null check), so there is nothing to flush on that side.
        if (this.producer != null) {
            this.producer.flush();
        }
        this.localCache.flush();
        this.cacheUpdateHandler.cacheFlushed();
    }

    /**
     * Shuts this cache down: stops the worker thread (if any), quietly closes the producer, closes
     * the local backing cache, the checkpoint file (if one was opened) and the update handler.
     *
     * <p>If shutting down the worker thread is interrupted, the remaining resources are still
     * closed; the interruption is logged and the thread's interrupt flag is restored once this
     * method finishes, so the caller can observe it.
     *
     * @throws IOException if closing one of the underlying resources fails
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        boolean interrupted = false;
        try {
            if (this.kafkaTopicReader != null) {
                try {
                    this.kafkaTopicReader.shutdown();
                } catch (InterruptedException e) {
                    // Keep closing the other resources; re-assert the interrupt at the end.
                    interrupted = true;
                    log.warn("Interrupted while shutting down Kafka cache reader thread for {}", this.clientId);
                }
            }
            Utils.closeQuietly(this.producer, "Kafka cache producer for " + this.clientId);
            this.localCache.close();
            if (this.checkpointFile != null) {
                this.checkpointFile.close();
            }
            this.cacheUpdateHandler.close();
            log.info("Kafka cache shut down complete for {}", this.clientId);
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Destroys the local backing cache and then notifies the update handler through
     * {@code cacheDestroyed()}.
     *
     * @throws IOException declared by the signature; presumably raised by the local cache's
     *     {@code destroy()} — confirm against its declaration
     */
    @Override // io.kcache.Cache
    public void destroy() throws IOException {
        this.localCache.destroy();
        this.cacheUpdateHandler.cacheDestroyed();
    }

    /**
     * Guards operations that require an initialized cache.
     *
     * @throws CacheException if the {@code initialized} flag is not set
     */
    private void assertInitialized() throws CacheException {
        if (this.initialized.get()) {
            return;
        }
        throw new CacheException("Illegal state. Cache for " + this.clientId + " not initialized yet");
    }

    /**
     * Package-private accessor for the worker thread held in {@code kafkaTopicReader}.
     *
     * @return the current worker thread; {@code null} if none has been assigned yet
     */
    KafkaCache<K, V>.WorkerThread getWorkerThread() {
        return this.kafkaTopicReader;
    }
}
