package org.apache.kafka.connect.runtime;

import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/connect/runtime/SubmittedRecords.class */
class SubmittedRecords {
    private static final Logger log = LoggerFactory.getLogger(SubmittedRecords.class);
    final Map<Map<String, Object>, Deque<SubmittedRecord>> records = new HashMap();
    private int numUnackedMessages = 0;
    private CountDownLatch messageDrainLatch;

    /* loaded from: input_file:org/apache/kafka/connect/runtime/SubmittedRecords$CommittableOffsets.class */
    static class CommittableOffsets {
        public static final CommittableOffsets EMPTY = new CommittableOffsets(Collections.emptyMap(), 0, 0, 0, 0, null);
        private final Map<Map<String, Object>, Map<String, Object>> offsets;
        private final int numCommittableMessages;
        private final int numUncommittableMessages;
        private final int numDeques;
        private final int largestDequeSize;
        private final Map<String, Object> largestDequePartition;

        CommittableOffsets(Map<Map<String, Object>, Map<String, Object>> map, int i, int i2, int i3, int i4, Map<String, Object> map2) {
            this.offsets = map != null ? new HashMap<>(map) : Collections.emptyMap();
            this.numCommittableMessages = i;
            this.numUncommittableMessages = i2;
            this.numDeques = i3;
            this.largestDequeSize = i4;
            this.largestDequePartition = map2;
        }

        public Map<Map<String, Object>, Map<String, Object>> offsets() {
            return Collections.unmodifiableMap(this.offsets);
        }

        public int numCommittableMessages() {
            return this.numCommittableMessages;
        }

        public int numUncommittableMessages() {
            return this.numUncommittableMessages;
        }

        public int numDeques() {
            return this.numDeques;
        }

        public int largestDequeSize() {
            return this.largestDequeSize;
        }

        public Map<String, Object> largestDequePartition() {
            return this.largestDequePartition;
        }

        public boolean hasPending() {
            return this.numUncommittableMessages > 0;
        }

        public boolean isEmpty() {
            return this.numCommittableMessages == 0 && this.numUncommittableMessages == 0 && this.offsets.isEmpty();
        }

        public CommittableOffsets updatedWith(CommittableOffsets committableOffsets) {
            HashMap hashMap = new HashMap(this.offsets);
            hashMap.putAll(committableOffsets.offsets);
            return new CommittableOffsets(hashMap, this.numCommittableMessages + committableOffsets.numCommittableMessages, committableOffsets.numUncommittableMessages, committableOffsets.numDeques, committableOffsets.largestDequeSize, committableOffsets.largestDequePartition);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/connect/runtime/SubmittedRecords$SubmittedRecord.class */
    public class SubmittedRecord {
        private final Map<String, Object> partition;
        private final Map<String, Object> offset;
        private final AtomicBoolean acked = new AtomicBoolean(false);

        public SubmittedRecord(Map<String, Object> map, Map<String, Object> map2) {
            this.partition = map;
            this.offset = map2;
        }

        public void ack() {
            if (this.acked.compareAndSet(false, true)) {
                SubmittedRecords.this.messageAcked();
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean acked() {
            return this.acked.get();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public Map<String, Object> partition() {
            return this.partition;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public Map<String, Object> offset() {
            return this.offset;
        }
    }

    public SubmittedRecord submit(SourceRecord sourceRecord) {
        return submit(sourceRecord.sourcePartition(), sourceRecord.sourceOffset());
    }

    SubmittedRecord submit(Map<String, Object> map, Map<String, Object> map2) {
        SubmittedRecord submittedRecord = new SubmittedRecord(map, map2);
        this.records.computeIfAbsent(submittedRecord.partition(), map3 -> {
            return new LinkedList();
        }).add(submittedRecord);
        synchronized (this) {
            this.numUnackedMessages++;
        }
        return submittedRecord;
    }

    public boolean removeLastOccurrence(SubmittedRecord submittedRecord) {
        Deque<SubmittedRecord> deque = this.records.get(submittedRecord.partition());
        if (deque == null) {
            log.warn("Attempted to remove record from submitted queue for partition {}, but no records with that partition appear to have been submitted", submittedRecord.partition());
            return false;
        }
        boolean removeLastOccurrence = deque.removeLastOccurrence(submittedRecord);
        if (deque.isEmpty()) {
            this.records.remove(submittedRecord.partition());
        }
        if (removeLastOccurrence) {
            messageAcked();
        } else {
            log.warn("Attempted to remove record from submitted queue for partition {}, but the record has not been submitted or has already been removed", submittedRecord.partition());
        }
        return removeLastOccurrence;
    }

    public CommittableOffsets committableOffsets() {
        HashMap hashMap = new HashMap();
        int i = 0;
        int i2 = 0;
        int i3 = 0;
        Map<String, Object> map = null;
        for (Map.Entry<Map<String, Object>, Deque<SubmittedRecord>> entry : this.records.entrySet()) {
            Map<String, Object> key = entry.getKey();
            Deque<SubmittedRecord> value = entry.getValue();
            int size = value.size();
            if (canCommitHead(value)) {
                hashMap.put(key, committableOffset(value));
            }
            int size2 = value.size();
            i += size - size2;
            i2 += size2;
            if (size2 > i3) {
                i3 = size2;
                map = key;
            }
        }
        this.records.values().removeIf((v0) -> {
            return v0.isEmpty();
        });
        return new CommittableOffsets(hashMap, i, i2, this.records.size(), i3, map);
    }

    public boolean awaitAllMessages(long j, TimeUnit timeUnit) {
        CountDownLatch countDownLatch;
        synchronized (this) {
            countDownLatch = new CountDownLatch(this.numUnackedMessages);
            this.messageDrainLatch = countDownLatch;
        }
        try {
            return countDownLatch.await(j, timeUnit);
        } catch (InterruptedException e) {
            return false;
        }
    }

    private Map<String, Object> committableOffset(Deque<SubmittedRecord> deque) {
        Map<String, Object> map = null;
        while (true) {
            Map<String, Object> map2 = map;
            if (!canCommitHead(deque)) {
                return map2;
            }
            map = deque.poll().offset();
        }
    }

    private boolean canCommitHead(Deque<SubmittedRecord> deque) {
        return deque.peek() != null && deque.peek().acked();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void messageAcked() {
        this.numUnackedMessages--;
        if (this.messageDrainLatch != null) {
            this.messageDrainLatch.countDown();
        }
    }
}
