package io.confluent.parallelconsumer.state;

import io.confluent.csid.utils.LoopingResumingIterator;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import java.time.Clock;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/parallelconsumer/state/ShardManager.class */
public class ShardManager<K, V> {
    private static final Logger log = LoggerFactory.getLogger(ShardManager.class);
    private final ParallelConsumerOptions options;
    private final WorkManager<K, V> wm;
    private final Clock clock;
    private final Map<ShardKey, ProcessingShard<K, V>> processingShards = new ConcurrentHashMap();
    private final NavigableSet<WorkContainer<?, ?>> retryQueue = new TreeSet(Comparator.comparing(workContainer -> {
        return workContainer.getDelayUntilRetryDue();
    }));
    private Optional<ShardKey> iterationResumePoint = Optional.empty();

    Optional<ProcessingShard<K, V>> getShard(ShardKey shardKey) {
        return Optional.ofNullable(this.processingShards.get(shardKey));
    }

    private LoopingResumingIterator<ShardKey, ProcessingShard<K, V>> getIterator(Optional<ShardKey> optional) {
        return new LoopingResumingIterator<>(optional, this.processingShards);
    }

    ShardKey computeShardKey(WorkContainer<?, ?> workContainer) {
        return ShardKey.of(workContainer, this.options.getOrdering());
    }

    public long getNumberOfWorkQueuedInShardsAwaitingSelection() {
        return this.processingShards.values().stream().mapToLong((v0) -> {
            return v0.getCountOfWorkAwaitingSelection();
        }).sum();
    }

    public boolean workIsWaitingToBeProcessed() {
        return this.processingShards.values().parallelStream().anyMatch((v0) -> {
            return v0.workIsWaitingToBeProcessed();
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void removeAnyShardsReferencedBy(NavigableMap<Long, WorkContainer<K, V>> navigableMap) {
        Iterator<WorkContainer<K, V>> it = navigableMap.values().iterator();
        while (it.hasNext()) {
            removeShardFor(it.next());
        }
    }

    private void removeShardFor(WorkContainer<K, V> workContainer) {
        ShardKey computeShardKey = computeShardKey(workContainer);
        if (this.processingShards.containsKey(computeShardKey)) {
            this.processingShards.get(computeShardKey).remove(workContainer.offset());
            removeShardIfEmpty(computeShardKey);
        } else {
            log.trace("Shard referenced by WC: {} with shard key: {} already removed", workContainer, computeShardKey);
        }
        this.retryQueue.remove(workContainer);
    }

    public void addWorkContainer(WorkContainer<K, V> workContainer) {
        ShardKey computeShardKey = computeShardKey(workContainer);
        this.processingShards.computeIfAbsent(computeShardKey, shardKey -> {
            return new ProcessingShard(computeShardKey, this.options, this.wm.getPm());
        }).addWorkContainer(workContainer);
    }

    void removeShardIfEmpty(ShardKey shardKey) {
        Optional<ProcessingShard<K, V>> shard = getShard(shardKey);
        if (this.options.getOrdering().equals(ParallelConsumerOptions.ProcessingOrder.KEY) && shard.isPresent() && shard.get().isEmpty()) {
            log.trace("Removing empty shard (key: {})", shardKey);
            this.processingShards.remove(shardKey);
        }
    }

    public void onSuccess(WorkContainer<?, ?> workContainer) {
        this.retryQueue.remove(workContainer);
        ShardKey computeShardKey = computeShardKey(workContainer);
        Optional<ProcessingShard<K, V>> shard = getShard(computeShardKey);
        if (!shard.isPresent()) {
            log.trace("Dropping successful result for revoked partition {}. Record in question was: {}", computeShardKey, workContainer.getCr());
        } else {
            shard.get().onSuccess(workContainer);
            removeShardIfEmpty(computeShardKey);
        }
    }

    public void onFailure(WorkContainer<?, ?> workContainer) {
        log.debug("Work FAILED");
        this.retryQueue.add(workContainer);
    }

    public Optional<Duration> getLowestRetryTime() {
        for (WorkContainer<?, ?> workContainer : this.retryQueue) {
            if (workContainer.isNotInFlight()) {
                return Optional.of(workContainer.getDelayUntilRetryDue());
            }
        }
        return Optional.empty();
    }

    public List<WorkContainer<K, V>> getWorkIfAvailable(int i) {
        LoopingResumingIterator<ShardKey, ProcessingShard<K, V>> iterator = getIterator(this.iterationResumePoint);
        ArrayList arrayList = new ArrayList();
        while (arrayList.size() < i && iterator.hasNext()) {
            arrayList.addAll(iterator.next().getValue().getWorkIfAvailable(i - arrayList.size()));
        }
        if (arrayList.size() >= i) {
            log.debug("Work taken is now over max (iteration resume point is {})", this.iterationResumePoint);
        }
        updateResumePoint(iterator);
        return arrayList;
    }

    private void updateResumePoint(LoopingResumingIterator<ShardKey, ProcessingShard<K, V>> loopingResumingIterator) {
        if (loopingResumingIterator.hasNext()) {
            this.iterationResumePoint = Optional.of(loopingResumingIterator.next().getKey());
            log.debug("Work taken is now over max, stopping (saving iteration resume point {})", this.iterationResumePoint);
        }
    }

    public ShardManager(ParallelConsumerOptions parallelConsumerOptions, WorkManager<K, V> workManager, Clock clock) {
        this.options = parallelConsumerOptions;
        this.wm = workManager;
        this.clock = clock;
    }

    public ParallelConsumerOptions getOptions() {
        return this.options;
    }

    private Clock getClock() {
        return this.clock;
    }
}
