/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.connect.elasticsearch;

import io.confluent.connect.elasticsearch.OffsetState;
import io.confluent.connect.elasticsearch.OffsetTracker;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tracks, per topic partition, the offsets of records that have been handed off for
 * processing and the highest offset whose processing has been confirmed.
 *
 * <p>Pending offsets are kept in insertion order for each partition. A partition's maximum
 * processed offset only advances across a contiguous prefix of processed entries, so an
 * unprocessed record blocks every later offset in that partition from being reported.
 *
 * <p>All methods that touch the per-partition maps are {@code synchronized}; the entry count
 * is kept in an {@link AtomicLong} so {@link #numOffsetStateEntries()} can be read without
 * taking the lock.
 */
class AsyncOffsetTracker implements OffsetTracker {
    private static final Logger log = LoggerFactory.getLogger(AsyncOffsetTracker.class);

    /** Pending (not yet released) offset states per partition, in insertion order. */
    private final Map<TopicPartition, Map<Long, OffsetState>> offsetsByPartition = new HashMap<>();
    /** Highest offset per partition that {@link #updateOffsets()} has seen as processed. */
    private final Map<TopicPartition, Long> maxOffsetByPartition = new HashMap<>();
    /** Total number of entries currently held across all maps in {@link #offsetsByPartition}. */
    private final AtomicLong numEntries = new AtomicLong();
    private final SinkTaskContext context;

    /**
     * @param context task context; its current assignment is consulted for every record added
     */
    public AsyncOffsetTracker(SinkTaskContext context) {
        this.context = context;
    }

    /**
     * Drops all tracked state (pending entries and maximum processed offset) for the given
     * partitions and subtracts the dropped entries from the entry count.
     *
     * @param topicPartitions partitions to forget; partitions that are not tracked are ignored
     */
    @Override
    public synchronized void closePartitions(Collection<TopicPartition> topicPartitions) {
        for (TopicPartition tp : topicPartitions) {
            Map<Long, OffsetState> offsets = this.offsetsByPartition.remove(tp);
            if (offsets != null) {
                this.numEntries.addAndGet(-offsets.size());
            }
            this.maxOffsetByPartition.remove(tp);
        }
    }

    /**
     * Registers a record as pending and returns the state object used to mark it processed.
     *
     * <p>If the record's offset is not greater than the partition's current maximum processed
     * offset, a detached state is returned and nothing is tracked. If the offset is already
     * pending, the existing state is returned.
     *
     * @param sinkRecord the record about to be processed
     * @return the offset state associated with the record's offset
     * @throws ConnectException if the record's topic partition is not in the task's assignment
     */
    @Override
    public synchronized OffsetState addPendingRecord(SinkRecord sinkRecord) {
        log.trace("Adding pending record");
        TopicPartition tp = new TopicPartition(sinkRecord.topic(), sinkRecord.kafkaPartition().intValue());
        if (!this.context.assignment().contains(tp)) {
            String msg = String.format("Found a topic name '%s' that doesn't match assigned partitions. Connector doesn't support topic mutating SMTs", sinkRecord.topic());
            throw new ConnectException(msg);
        }

        long offset = sinkRecord.kafkaOffset();
        Long partitionMax = this.maxOffsetByPartition.get(tp);
        if (partitionMax != null && offset <= partitionMax) {
            // Already covered by the reported maximum: hand back an untracked state.
            return new AsyncOffsetState(offset);
        }

        // Insertion order must be preserved, hence LinkedHashMap.
        Map<Long, OffsetState> pending = this.offsetsByPartition.computeIfAbsent(tp, key -> new LinkedHashMap<>());
        OffsetState existing = pending.get(offset);
        if (existing != null) {
            // Same offset added again while still pending: reuse the entry and do NOT bump
            // the counter, otherwise numEntries would drift above the real map size.
            return existing;
        }
        OffsetState created = new AsyncOffsetState(offset);
        pending.put(offset, created);
        this.numEntries.incrementAndGet();
        return created;
    }

    /**
     * @return the number of offset state entries currently tracked across all partitions
     */
    @Override
    public long numOffsetStateEntries() {
        return this.numEntries.get();
    }

    /**
     * Advances each partition's maximum processed offset.
     *
     * <p>For every partition, entries are visited in insertion order and removed while they are
     * processed; the scan stops at the first unprocessed entry. The partition maximum is raised
     * to the largest offset removed, if that exceeds the previous maximum.
     */
    @Override
    public synchronized void updateOffsets() {
        log.trace("Updating offsets");
        for (Map.Entry<TopicPartition, Map<Long, OffsetState>> entry : this.offsetsByPartition.entrySet()) {
            TopicPartition topicPartition = entry.getKey();
            Long max = this.maxOffsetByPartition.get(topicPartition);
            boolean newMaxFound = false;

            Iterator<OffsetState> iterator = entry.getValue().values().iterator();
            while (iterator.hasNext()) {
                OffsetState offsetState = iterator.next();
                if (!offsetState.isProcessed()) {
                    // First gap: later offsets must wait until this one completes.
                    break;
                }
                iterator.remove();
                this.numEntries.decrementAndGet();
                if (max == null || offsetState.offset() > max) {
                    max = offsetState.offset();
                    newMaxFound = true;
                }
            }

            if (newMaxFound) {
                this.maxOffsetByPartition.put(topicPartition, max);
            }
        }
        log.trace("Updated offsets, num entries: {}", this.numEntries);
    }

    /**
     * Builds the offsets to report for every partition that has a maximum processed offset.
     *
     * <p>Each reported value is the maximum processed offset plus one (Kafka's committed offset
     * denotes the next record to consume).
     *
     * @param currentOffsets not used by this implementation
     * @return a new map from partition to the offset one past its highest processed offset
     */
    @Override
    public synchronized Map<TopicPartition, OffsetAndMetadata> offsets(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        return this.maxOffsetByPartition.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue() + 1L)));
    }

    /**
     * Offset state whose processed flag is {@code volatile}, so a flag set through
     * {@link #markProcessed()} is visible to any thread calling {@link #isProcessed()}.
     */
    static class AsyncOffsetState implements OffsetState {
        private final long offset;
        private volatile boolean processed;

        AsyncOffsetState(long offset) {
            this.offset = offset;
        }

        /** Flags this offset as processed; the flag is never reset. */
        @Override
        public void markProcessed() {
            this.processed = true;
        }

        /** @return {@code true} once {@link #markProcessed()} has been called */
        @Override
        public boolean isProcessed() {
            return this.processed;
        }

        /** @return the record offset this state was created for */
        @Override
        public long offset() {
            return this.offset;
        }
    }
}

