/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.processor.internals.AbstractPartitionGroup;
import org.apache.kafka.streams.processor.internals.RecordQueue;
import org.apache.kafka.streams.processor.internals.StampedRecord;
import org.slf4j.Logger;

class PartitionGroup
extends AbstractPartitionGroup {
    private final Logger logger;
    private final Map<TopicPartition, RecordQueue> partitionQueues;
    private final Function<TopicPartition, OptionalLong> lagProvider;
    private final Sensor enforcedProcessingSensor;
    private final long maxTaskIdleMs;
    private final Sensor recordLatenessSensor;
    private final PriorityQueue<RecordQueue> nonEmptyQueuesByTime;
    private long streamTime;
    private int totalBuffered;
    private boolean allBuffered;
    private final Map<TopicPartition, Long> idlePartitionDeadlines = new HashMap<TopicPartition, Long>();
    private final Map<TopicPartition, Long> fetchedLags = new HashMap<TopicPartition, Long>();

    PartitionGroup(LogContext logContext, Map<TopicPartition, RecordQueue> partitionQueues, Function<TopicPartition, OptionalLong> lagProvider, Sensor recordLatenessSensor, Sensor enforcedProcessingSensor, long maxTaskIdleMs) {
        this.logger = logContext.logger(PartitionGroup.class);
        this.nonEmptyQueuesByTime = new PriorityQueue<RecordQueue>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::headRecordTimestamp));
        this.partitionQueues = partitionQueues;
        this.lagProvider = lagProvider;
        this.enforcedProcessingSensor = enforcedProcessingSensor;
        this.maxTaskIdleMs = maxTaskIdleMs;
        this.recordLatenessSensor = recordLatenessSensor;
        this.totalBuffered = 0;
        this.allBuffered = false;
        this.streamTime = -1L;
    }

    @Override
    AbstractPartitionGroup.ReadyToProcessResult readyToProcess(long wallClockTime) {
        if (this.maxTaskIdleMs == -1L) {
            if (this.logger.isTraceEnabled() && !this.allBuffered && this.totalBuffered > 0) {
                HashSet<TopicPartition> bufferedPartitions = new HashSet<TopicPartition>();
                HashSet<TopicPartition> emptyPartitions = new HashSet<TopicPartition>();
                for (Map.Entry<TopicPartition, RecordQueue> entry : this.partitionQueues.entrySet()) {
                    if (entry.getValue().isEmpty()) {
                        emptyPartitions.add(entry.getKey());
                        continue;
                    }
                    bufferedPartitions.add(entry.getKey());
                }
                this.logger.trace("Ready for processing because max.task.idle.ms is disabled.\n\tThere may be out-of-order processing for this task as a result.\n\tBuffered partitions: {}\n\tNon-buffered partitions: {}", bufferedPartitions, emptyPartitions);
            }
            return new AbstractPartitionGroup.ReadyToProcessResult(true, Optional.empty());
        }
        HashSet<TopicPartition> queued = new HashSet<TopicPartition>();
        HashMap<TopicPartition, Long> enforced = null;
        StringBuilder logMessageBuilder = new StringBuilder();
        for (Map.Entry<TopicPartition, RecordQueue> entry : this.partitionQueues.entrySet()) {
            TopicPartition partition = entry.getKey();
            RecordQueue queue = entry.getValue();
            if (!queue.isEmpty()) {
                this.logger.trace("Partition {} has buffered data, ready for processing", (Object)partition);
                this.idlePartitionDeadlines.remove(partition);
                queued.add(partition);
                continue;
            }
            Long fetchedLag = this.fetchedLags.getOrDefault(partition, -1L);
            this.appendLog(logMessageBuilder, String.format("Partition %s has fetched lag of %d", partition, fetchedLag));
            if (fetchedLag == -1L) {
                this.idlePartitionDeadlines.remove(partition);
                this.appendLog(logMessageBuilder, String.format("\tWaiting to fetch data for %s", partition));
                return new AbstractPartitionGroup.ReadyToProcessResult(false, Optional.of(logMessageBuilder.toString()));
            }
            if (fetchedLag > 0L) {
                this.idlePartitionDeadlines.remove(partition);
                this.appendLog(logMessageBuilder, String.format("Partition %s has current lag %d, but no data is buffered locally. Waiting to buffer some records.", partition, fetchedLag));
                return new AbstractPartitionGroup.ReadyToProcessResult(false, Optional.of(logMessageBuilder.toString()));
            }
            this.idlePartitionDeadlines.putIfAbsent(partition, wallClockTime + this.maxTaskIdleMs);
            long deadline = this.idlePartitionDeadlines.get(partition);
            if (wallClockTime < deadline) {
                this.appendLog(logMessageBuilder, String.format("Partition %s has current lag 0 and current time is %d. Waiting for new data to be produced for configured idle time %d (deadline is %d).", partition, wallClockTime, this.maxTaskIdleMs, deadline));
                return new AbstractPartitionGroup.ReadyToProcessResult(false, Optional.of(logMessageBuilder.toString()));
            }
            this.logger.trace("Partition {} is ready for processing due to the task idling deadline passing", (Object)partition);
            if (enforced == null) {
                enforced = new HashMap<TopicPartition, Long>();
            }
            enforced.put(partition, deadline);
        }
        if (enforced == null) {
            this.logger.trace("All partitions were buffered locally, so this task is ready for processing.");
            return new AbstractPartitionGroup.ReadyToProcessResult(true, Optional.empty());
        }
        if (queued.isEmpty()) {
            this.appendLog(logMessageBuilder, "No partitions were buffered locally, so this task is not ready for processing.");
            return new AbstractPartitionGroup.ReadyToProcessResult(false, Optional.of(logMessageBuilder.toString()));
        }
        this.enforcedProcessingSensor.record(1.0, wallClockTime);
        this.logger.trace("Continuing to process although some partitions are empty on the broker.\n\tThere may be out-of-order processing for this task as a result.\n\tPartitions with local data: {}.\n\tPartitions we gave up waiting for, with their corresponding deadlines: {}.\n\tConfigured max.task.idle.ms: {}.\n\tCurrent wall-clock time: {}.", new Object[]{queued, enforced, this.maxTaskIdleMs, wallClockTime});
        return new AbstractPartitionGroup.ReadyToProcessResult(true, Optional.empty());
    }

    @Override
    long partitionTimestamp(TopicPartition partition) {
        RecordQueue queue = this.partitionQueues.get(partition);
        if (queue == null) {
            throw new IllegalStateException("Partition " + String.valueOf(partition) + " not found.");
        }
        return queue.partitionTime();
    }

    @Override
    void updatePartitions(Set<TopicPartition> inputPartitions, Function<TopicPartition, RecordQueue> recordQueueCreator) {
        HashSet<TopicPartition> removedPartitions = new HashSet<TopicPartition>();
        HashSet<TopicPartition> newInputPartitions = new HashSet<TopicPartition>(inputPartitions);
        Iterator<Map.Entry<TopicPartition, RecordQueue>> queuesIterator = this.partitionQueues.entrySet().iterator();
        while (queuesIterator.hasNext()) {
            Map.Entry<TopicPartition, RecordQueue> queueEntry = queuesIterator.next();
            TopicPartition topicPartition = queueEntry.getKey();
            if (!newInputPartitions.contains(topicPartition)) {
                this.totalBuffered -= queueEntry.getValue().size();
                queuesIterator.remove();
                removedPartitions.add(topicPartition);
            }
            newInputPartitions.remove(topicPartition);
        }
        for (TopicPartition newInputPartition : newInputPartitions) {
            this.partitionQueues.put(newInputPartition, recordQueueCreator.apply(newInputPartition));
        }
        this.nonEmptyQueuesByTime.removeIf(q -> removedPartitions.contains(q.partition()));
        this.allBuffered = this.allBuffered && newInputPartitions.isEmpty();
    }

    @Override
    void setPartitionTime(TopicPartition partition, long partitionTime) {
        RecordQueue queue = this.partitionQueues.get(partition);
        if (queue == null) {
            throw new IllegalStateException("Partition " + String.valueOf(partition) + " not found.");
        }
        if (this.streamTime < partitionTime) {
            this.streamTime = partitionTime;
        }
        queue.setPartitionTime(partitionTime);
    }

    @Override
    StampedRecord nextRecord(AbstractPartitionGroup.RecordInfo info, long wallClockTime) {
        RecordQueue queue;
        StampedRecord record = null;
        info.queue = queue = this.nonEmptyQueuesByTime.poll();
        if (queue != null) {
            int oldSize = queue.size();
            record = queue.poll(wallClockTime);
            if (record != null) {
                this.totalBuffered -= oldSize - queue.size();
                this.logger.trace("Partition {} polling next record:, oldSize={}, newSize={}, totalBuffered={}, recordTimestamp={}", new Object[]{queue.partition(), oldSize, queue.size(), this.totalBuffered, record.timestamp});
                if (queue.isEmpty()) {
                    this.allBuffered = false;
                    this.logger.trace("Partition {} queue is now empty, allBuffered=false", (Object)queue.partition());
                } else {
                    this.nonEmptyQueuesByTime.offer(queue);
                }
                if (record.timestamp > this.streamTime) {
                    long oldStreamTime = this.streamTime;
                    this.streamTime = record.timestamp;
                    this.recordLatenessSensor.record(0.0, wallClockTime);
                    this.logger.trace("Partition {} stream time updated from {} to {}", new Object[]{queue.partition(), oldStreamTime, this.streamTime});
                } else {
                    this.recordLatenessSensor.record((double)(this.streamTime - record.timestamp), wallClockTime);
                }
            }
        } else {
            this.logger.trace("Partition pulling nextRecord: no queue available");
        }
        return record;
    }

    @Override
    int addRawRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) {
        RecordQueue recordQueue = this.partitionQueues.get(partition);
        if (recordQueue == null) {
            throw new IllegalStateException("Partition " + String.valueOf(partition) + " not found.");
        }
        int oldSize = recordQueue.size();
        int newSize = recordQueue.addRawRecords(rawRecords);
        if (oldSize == 0 && newSize > 0) {
            this.nonEmptyQueuesByTime.offer(recordQueue);
            if (this.nonEmptyQueuesByTime.size() == this.partitionQueues.size()) {
                this.allBuffered = true;
            }
        }
        this.totalBuffered += newSize - oldSize;
        return newSize;
    }

    @Override
    Set<TopicPartition> partitions() {
        return Collections.unmodifiableSet(this.partitionQueues.keySet());
    }

    @Override
    long streamTime() {
        return this.streamTime;
    }

    @Override
    Long headRecordOffset(TopicPartition partition) {
        RecordQueue recordQueue = this.partitionQueues.get(partition);
        if (recordQueue == null) {
            throw new IllegalStateException("Partition " + String.valueOf(partition) + " not found.");
        }
        return recordQueue.headRecordOffset();
    }

    @Override
    Optional<Integer> headRecordLeaderEpoch(TopicPartition partition) {
        RecordQueue recordQueue = this.partitionQueues.get(partition);
        if (recordQueue == null) {
            throw new IllegalStateException("Partition " + String.valueOf(partition) + " not found.");
        }
        return recordQueue.headRecordLeaderEpoch();
    }

    @Override
    int numBuffered(TopicPartition partition) {
        RecordQueue recordQueue = this.partitionQueues.get(partition);
        if (recordQueue == null) {
            throw new IllegalStateException("Partition " + String.valueOf(partition) + " not found.");
        }
        return recordQueue.size();
    }

    @Override
    int numBuffered() {
        return this.totalBuffered;
    }

    boolean allPartitionsBufferedLocally() {
        return this.allBuffered;
    }

    @Override
    void clear() {
        for (RecordQueue queue : this.partitionQueues.values()) {
            queue.clear();
        }
        this.nonEmptyQueuesByTime.clear();
        this.totalBuffered = 0;
        this.streamTime = -1L;
        this.fetchedLags.clear();
    }

    @Override
    void close() {
        for (RecordQueue queue : this.partitionQueues.values()) {
            queue.close();
        }
    }

    @Override
    void updateLags() {
        if (this.maxTaskIdleMs != -1L) {
            for (TopicPartition tp : this.partitionQueues.keySet()) {
                OptionalLong l = this.lagProvider.apply(tp);
                if (l.isPresent()) {
                    this.fetchedLags.put(tp, l.getAsLong());
                    this.logger.trace("Updated lag for {} to {}", (Object)tp, (Object)l.getAsLong());
                    continue;
                }
                this.fetchedLags.remove(tp);
            }
        }
    }

    private void appendLog(StringBuilder sb, String msg) {
        if (sb.length() > 0) {
            sb.append("\n");
        }
        sb.append(msg);
    }
}

