package org.apache.kafka.connect.storage;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConvertingFutureCallback;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.TopicAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/connect/storage/KafkaOffsetBackingStore.class */
public class KafkaOffsetBackingStore implements OffsetBackingStore {
    private static final Logger log = LoggerFactory.getLogger(KafkaOffsetBackingStore.class);
    private KafkaBasedLog<byte[], byte[]> offsetLog;
    private HashMap<ByteBuffer, ByteBuffer> data;
    private final Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = new Callback<ConsumerRecord<byte[], byte[]>>() { // from class: org.apache.kafka.connect.storage.KafkaOffsetBackingStore.3
        @Override // org.apache.kafka.connect.util.Callback
        public void onCompletion(Throwable th, ConsumerRecord<byte[], byte[]> consumerRecord) {
            KafkaOffsetBackingStore.this.data.put(consumerRecord.key() != null ? ByteBuffer.wrap((byte[]) consumerRecord.key()) : null, consumerRecord.value() != null ? ByteBuffer.wrap((byte[]) consumerRecord.value()) : null);
        }
    };

    /* loaded from: input_file:org/apache/kafka/connect/storage/KafkaOffsetBackingStore$SetCallbackFuture.class */
    private static class SetCallbackFuture implements org.apache.kafka.clients.producer.Callback, Future<Void> {
        private int numLeft;
        private boolean completed = false;
        private Throwable exception = null;
        private final Callback<Void> callback;

        public SetCallbackFuture(int i, Callback<Void> callback) {
            this.numLeft = i;
            this.callback = callback;
        }

        public synchronized void onCompletion(RecordMetadata recordMetadata, Exception exc) {
            if (exc != null) {
                if (this.completed) {
                    return;
                }
                this.exception = exc;
                this.callback.onCompletion(exc, null);
                this.completed = true;
                notify();
                return;
            }
            this.numLeft--;
            if (this.numLeft == 0) {
                this.callback.onCompletion(null, null);
                this.completed = true;
                notify();
            }
        }

        @Override // java.util.concurrent.Future
        public synchronized boolean cancel(boolean z) {
            return false;
        }

        @Override // java.util.concurrent.Future
        public synchronized boolean isCancelled() {
            return false;
        }

        @Override // java.util.concurrent.Future
        public synchronized boolean isDone() {
            return this.completed;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Future
        public synchronized Void get() throws InterruptedException, ExecutionException {
            while (!this.completed) {
                wait();
            }
            if (this.exception != null) {
                throw new ExecutionException(this.exception);
            }
            return null;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Future
        public synchronized Void get(long j, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
            long currentTimeMillis = System.currentTimeMillis() + timeUnit.toMillis(j);
            while (!this.completed) {
                long currentTimeMillis2 = currentTimeMillis - System.currentTimeMillis();
                if (currentTimeMillis2 < 0) {
                    throw new TimeoutException("KafkaOffsetBackingStore Future timed out.");
                }
                wait(currentTimeMillis2);
            }
            if (this.exception != null) {
                throw new ExecutionException(this.exception);
            }
            return null;
        }
    }

    @Override // org.apache.kafka.connect.storage.OffsetBackingStore
    public void configure(WorkerConfig workerConfig) {
        String string = workerConfig.getString(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG);
        if (string == null || string.trim().length() == 0) {
            throw new ConfigException("Offset storage topic must be specified");
        }
        this.data = new HashMap<>();
        Map originals = workerConfig.originals();
        HashMap hashMap = new HashMap(originals);
        hashMap.put("key.serializer", ByteArraySerializer.class.getName());
        hashMap.put("value.serializer", ByteArraySerializer.class.getName());
        hashMap.put("delivery.timeout.ms", Integer.MAX_VALUE);
        HashMap hashMap2 = new HashMap(originals);
        hashMap2.put("key.deserializer", ByteArrayDeserializer.class.getName());
        hashMap2.put("value.deserializer", ByteArrayDeserializer.class.getName());
        HashMap hashMap3 = new HashMap(originals);
        this.offsetLog = createKafkaBasedLog(string, hashMap, hashMap2, this.consumedCallback, TopicAdmin.defineTopic(string).compacted().partitions(workerConfig.getInt(DistributedConfig.OFFSET_STORAGE_PARTITIONS_CONFIG).intValue()).replicationFactor(workerConfig.getShort(DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG).shortValue()).build(), hashMap3);
    }

    private KafkaBasedLog<byte[], byte[]> createKafkaBasedLog(String str, Map<String, Object> map, Map<String, Object> map2, Callback<ConsumerRecord<byte[], byte[]>> callback, final NewTopic newTopic, final Map<String, Object> map3) {
        return new KafkaBasedLog<>(str, map, map2, callback, Time.SYSTEM, new Runnable() { // from class: org.apache.kafka.connect.storage.KafkaOffsetBackingStore.1
            @Override // java.lang.Runnable
            public void run() {
                KafkaOffsetBackingStore.log.debug("Creating admin client to manage Connect internal offset topic");
                TopicAdmin topicAdmin = new TopicAdmin(map3);
                Throwable th = null;
                try {
                    topicAdmin.createTopics(newTopic);
                    if (topicAdmin != null) {
                        if (0 == 0) {
                            topicAdmin.close();
                            return;
                        }
                        try {
                            topicAdmin.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                } catch (Throwable th3) {
                    if (topicAdmin != null) {
                        if (0 != 0) {
                            try {
                                topicAdmin.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            topicAdmin.close();
                        }
                    }
                    throw th3;
                }
            }
        });
    }

    @Override // org.apache.kafka.connect.storage.OffsetBackingStore
    public void start() {
        log.info("Starting KafkaOffsetBackingStore");
        this.offsetLog.start();
        log.info("Finished reading offsets topic and starting KafkaOffsetBackingStore");
    }

    @Override // org.apache.kafka.connect.storage.OffsetBackingStore
    public void stop() {
        log.info("Stopping KafkaOffsetBackingStore");
        this.offsetLog.stop();
        log.info("Stopped KafkaOffsetBackingStore");
    }

    @Override // org.apache.kafka.connect.storage.OffsetBackingStore
    public Future<Map<ByteBuffer, ByteBuffer>> get(final Collection<ByteBuffer> collection) {
        ConvertingFutureCallback<Void, Map<ByteBuffer, ByteBuffer>> convertingFutureCallback = new ConvertingFutureCallback<Void, Map<ByteBuffer, ByteBuffer>>() { // from class: org.apache.kafka.connect.storage.KafkaOffsetBackingStore.2
            @Override // org.apache.kafka.connect.util.ConvertingFutureCallback
            public Map<ByteBuffer, ByteBuffer> convert(Void r6) {
                HashMap hashMap = new HashMap();
                for (ByteBuffer byteBuffer : collection) {
                    hashMap.put(byteBuffer, KafkaOffsetBackingStore.this.data.get(byteBuffer));
                }
                return hashMap;
            }
        };
        this.offsetLog.readToEnd(convertingFutureCallback);
        return convertingFutureCallback;
    }

    @Override // org.apache.kafka.connect.storage.OffsetBackingStore
    public Future<Void> set(Map<ByteBuffer, ByteBuffer> map, Callback<Void> callback) {
        SetCallbackFuture setCallbackFuture = new SetCallbackFuture(map.size(), callback);
        for (Map.Entry<ByteBuffer, ByteBuffer> entry : map.entrySet()) {
            ByteBuffer key = entry.getKey();
            ByteBuffer value = entry.getValue();
            this.offsetLog.send(key == null ? null : key.array(), value == null ? null : value.array(), setCallbackFuture);
        }
        return setCallbackFuture;
    }
}
