/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.lib.stream.log.kafka;

import java.io.Externalizable;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.Bytes;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.codec.NoCodec;
import org.nuxeo.lib.stream.codec.SerializableCodec;
import org.nuxeo.lib.stream.log.LogOffset;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogRecord;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.Name;
import org.nuxeo.lib.stream.log.NameResolver;
import org.nuxeo.lib.stream.log.RebalanceException;
import org.nuxeo.lib.stream.log.RebalanceListener;
import org.nuxeo.lib.stream.log.internals.LogOffsetImpl;

public class KafkaLogTailer<M extends Externalizable>
implements LogTailer<M>,
ConsumerRebalanceListener {
    private static final Log log = LogFactory.getLog(KafkaLogTailer.class);
    protected final Name group;
    protected final Map<TopicPartition, Long> lastOffsets = new HashMap<TopicPartition, Long>();
    protected final Map<TopicPartition, Long> lastCommittedOffsets = new HashMap<TopicPartition, Long>();
    protected final Queue<ConsumerRecord<String, Bytes>> records = new LinkedList<ConsumerRecord<String, Bytes>>();
    protected final Codec<M> codec;
    protected final Codec<M> decodeCodec;
    protected final NameResolver resolver;
    protected KafkaConsumer<String, Bytes> consumer;
    protected String id;
    protected Collection<TopicPartition> topicPartitions;
    protected Collection<LogPartition> partitions;
    protected boolean closed;
    protected Collection<Name> names;
    protected RebalanceListener listener;
    protected boolean isRebalanced;
    protected boolean isRevoked;
    protected boolean isLost;
    protected boolean consumerMoved;
    protected static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
    protected long lastPollTimestamp;
    protected int lastPollSize;

    protected KafkaLogTailer(Codec<M> codec, NameResolver resolver, Name group, Properties consumerProps) {
        this.codec = codec;
        this.resolver = resolver;
        this.decodeCodec = NoCodec.NO_CODEC.equals(codec) ? new SerializableCodec<M>() : codec;
        Objects.requireNonNull(group);
        this.group = group;
        consumerProps.put("group.id", resolver.getId(group));
        consumerProps.put("client.id", resolver.getId(group) + "-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement());
        this.consumer = new KafkaConsumer(consumerProps);
    }

    public static <M extends Externalizable> KafkaLogTailer<M> createAndAssign(Codec<M> codec, NameResolver resolver, Collection<LogPartition> partitions, Name group, Properties consumerProps) {
        KafkaLogTailer<M> ret = new KafkaLogTailer<M>(codec, resolver, group, consumerProps);
        ret.id = KafkaLogTailer.buildId(ret.group, partitions);
        ret.partitions = partitions;
        ret.topicPartitions = partitions.stream().map(partition -> new TopicPartition(resolver.getId(partition.name()), partition.partition())).collect(Collectors.toList());
        ret.consumer.assign(ret.topicPartitions);
        log.debug((Object)String.format("Created tailer with assignments: %s", ret.id));
        return ret;
    }

    public static <M extends Externalizable> KafkaLogTailer<M> createAndSubscribe(Codec<M> codec, NameResolver resolver, Collection<Name> names, Name group, Properties consumerProps, RebalanceListener listener) {
        KafkaLogTailer<M> ret = new KafkaLogTailer<M>(codec, resolver, group, consumerProps);
        ret.id = KafkaLogTailer.buildSubscribeId(ret.group, names);
        ret.names = names;
        Collection topics = names.stream().map(resolver::getId).collect(Collectors.toList());
        ret.listener = listener;
        ret.consumer.subscribe(topics, ret);
        ret.partitions = Collections.emptyList();
        ret.topicPartitions = Collections.emptyList();
        log.debug((Object)String.format("Created tailer with subscription: %s", ret.id));
        return ret;
    }

    protected static String buildId(Name group, Collection<LogPartition> partitions) {
        return group.getId() + ":" + partitions.stream().map(LogPartition::toString).collect(Collectors.joining("|"));
    }

    protected static String buildSubscribeId(Name group, Collection<Name> names) {
        return group.getId() + ":" + names.stream().map(Name::getId).collect(Collectors.joining("|"));
    }

    @Override
    public LogRecord<M> read(Duration timeout) throws InterruptedException {
        if (this.closed) {
            throw new IllegalStateException("The tailer has been closed.");
        }
        if (this.records.isEmpty()) {
            int items = this.poll(timeout);
            if (this.isRebalanced || this.isRevoked || this.isLost) {
                if (this.isRebalanced) {
                    log.debug((Object)"Rebalance happens during poll, raising exception");
                    this.isRebalanced = false;
                } else {
                    log.warn((Object)("Incomplete rebalance during poll, raising exception, revoked: " + this.isRevoked + ", lost: " + this.isLost));
                    this.isLost = false;
                    this.isRevoked = false;
                }
                throw new RebalanceException("Partitions has been rebalanced");
            }
            if (items == 0) {
                if (log.isTraceEnabled()) {
                    log.trace((Object)("No data " + this.id + " after " + timeout.toMillis() + " ms"));
                }
                return null;
            }
        }
        ConsumerRecord<String, Bytes> record = this.records.poll();
        this.lastOffsets.put(new TopicPartition(record.topic(), record.partition()), record.offset());
        Externalizable value = (Externalizable)this.decodeCodec.decode(((Bytes)record.value()).get());
        LogPartition partition = LogPartition.of(this.resolver.getName(record.topic()), record.partition());
        LogOffsetImpl offset = new LogOffsetImpl(partition, record.offset());
        this.consumerMoved = false;
        if (log.isDebugEnabled()) {
            log.debug((Object)String.format("Read from %s/%s, key: %s, value: %s", offset, this.group, record.key(), value));
        }
        return new LogRecord<Externalizable>(value, offset);
    }

    protected int poll(Duration timeout) throws InterruptedException {
        this.records.clear();
        try {
            this.lastPollTimestamp = System.currentTimeMillis();
            for (ConsumerRecord record : this.consumer.poll(timeout)) {
                if (log.isDebugEnabled() && this.records.isEmpty()) {
                    log.debug((Object)("Poll first record: " + this.resolver.getName(record.topic()).getUrn() + ":" + record.partition() + ":+" + record.offset()));
                }
                this.records.add((ConsumerRecord<String, Bytes>)record);
            }
        }
        catch (InterruptException e) {
            throw new InterruptedException(e.getMessage());
        }
        catch (WakeupException e) {
            log.debug((Object)"Receiving wakeup from another thread to close the tailer");
            this.close();
            throw new IllegalStateException("poll interrupted because tailer has been closed");
        }
        if (log.isDebugEnabled()) {
            String msg = "Polling " + this.id + " returns " + this.records.size() + " records";
            if (this.records.isEmpty()) {
                log.trace((Object)msg);
            } else {
                log.debug((Object)msg);
            }
        }
        this.lastPollSize = this.records.size();
        return this.records.size();
    }

    @Override
    public void toEnd() {
        log.debug((Object)("toEnd: " + this.id));
        this.lastOffsets.clear();
        this.records.clear();
        this.consumer.seekToEnd(Collections.emptyList());
        this.consumerMoved = true;
    }

    @Override
    public void toStart() {
        log.debug((Object)("toStart: " + this.id));
        this.lastOffsets.clear();
        this.records.clear();
        this.consumer.seekToBeginning(Collections.emptyList());
        this.consumerMoved = true;
    }

    @Override
    public void toLastCommitted() {
        String msg;
        if (log.isDebugEnabled()) {
            log.debug((Object)("toLastCommitted tailer: " + this.id));
        }
        if ((msg = this.consumer.assignment().stream().map(tp -> String.format("%s-%02d:+%d", this.resolver.getName(tp.topic()).getUrn(), tp.partition(), this.toLastCommitted((TopicPartition)tp))).collect(Collectors.joining("|"))).length() > 0 && log.isInfoEnabled()) {
            log.info((Object)("toLastCommitted offsets: " + this.group + ":" + msg));
        }
        this.lastOffsets.clear();
        this.records.clear();
        this.consumerMoved = false;
    }

    protected long toLastCommitted(TopicPartition topicPartition) {
        Map offsetMeta;
        Long offset = this.lastCommittedOffsets.get(topicPartition);
        if (offset == null && (offsetMeta = this.consumer.committed(Collections.singleton(topicPartition))) != null && offsetMeta.get(topicPartition) != null) {
            offset = ((OffsetAndMetadata)offsetMeta.get(topicPartition)).offset();
        }
        if (offset != null) {
            this.consumer.seek(topicPartition, offset.longValue());
        } else {
            this.consumer.seekToBeginning(Collections.singletonList(topicPartition));
            offset = this.consumer.position(topicPartition);
        }
        this.lastCommittedOffsets.put(topicPartition, offset);
        if (log.isDebugEnabled()) {
            log.debug((Object)String.format(" toLastCommitted: %s-%02d:+%d", this.resolver.getName(topicPartition.topic()).getUrn(), topicPartition.partition(), offset));
        }
        return offset;
    }

    @Override
    public void seek(LogOffset offset) {
        log.debug((Object)("Seek to: " + offset.offset() + " from tailer: " + this.id));
        TopicPartition topicPartition = new TopicPartition(this.resolver.getId(offset.partition().name()), offset.partition().partition());
        this.consumer.seek(topicPartition, offset.offset());
        this.lastOffsets.remove(topicPartition);
        int partition = topicPartition.partition();
        this.records.removeIf(rec -> rec.partition() == partition);
        this.consumerMoved = true;
    }

    @Override
    public void reset() {
        log.info((Object)("Reset committed offsets for all assigned partitions: " + this.topicPartitions + " tailer: " + this.id));
        Map beginningOffsets = this.consumer.beginningOffsets(this.topicPartitions);
        HashMap offsetToCommit = new HashMap();
        beginningOffsets.forEach((tp, offset) -> offsetToCommit.put(tp, new OffsetAndMetadata(offset.longValue())));
        this.consumer.commitSync(offsetToCommit);
        this.lastCommittedOffsets.clear();
        this.toLastCommitted();
    }

    @Override
    public void reset(LogPartition partition) {
        log.info((Object)("Reset committed offset for partition: " + partition + " tailer: " + this.id));
        TopicPartition topicPartition = new TopicPartition(this.resolver.getId(partition.name()), partition.partition());
        Map beginningOffsets = this.consumer.beginningOffsets(Collections.singleton(topicPartition));
        HashMap offsetToCommit = new HashMap();
        beginningOffsets.forEach((tp, offset) -> offsetToCommit.put(tp, new OffsetAndMetadata(offset.longValue())));
        this.consumer.commitSync(offsetToCommit);
        this.lastCommittedOffsets.remove(topicPartition);
        this.seek(new LogOffsetImpl(partition, (Long)beginningOffsets.get(topicPartition)));
    }

    @Override
    public LogOffset offsetForTimestamp(LogPartition partition, long timestamp) {
        OffsetAndTimestamp offsetAndTimestamp;
        TopicPartition topicPartition = new TopicPartition(this.resolver.getId(partition.name()), partition.partition());
        Map offsetsForTimes = this.consumer.offsetsForTimes(Collections.singletonMap(topicPartition, timestamp));
        if (offsetsForTimes.size() == 1 && (offsetAndTimestamp = (OffsetAndTimestamp)offsetsForTimes.get(topicPartition)) != null) {
            return new LogOffsetImpl(partition, offsetAndTimestamp.offset());
        }
        return null;
    }

    @Override
    public void commit() {
        if (this.consumerMoved) {
            this.forceCommit();
            return;
        }
        HashMap<TopicPartition, OffsetAndMetadata> offsetToCommit = new HashMap<TopicPartition, OffsetAndMetadata>();
        this.lastOffsets.forEach((tp, offset) -> offsetToCommit.put((TopicPartition)tp, new OffsetAndMetadata(offset + 1L)));
        this.lastOffsets.clear();
        if (offsetToCommit.isEmpty()) {
            return;
        }
        try {
            this.consumer.commitSync(offsetToCommit);
        }
        catch (CommitFailedException | RebalanceInProgressException e) {
            log.error((Object)("Fail to commit " + offsetToCommit + " assigned " + this.assignments() + " last committed: " + this.lastCommittedOffsets + " last size: " + this.lastPollSize + " elapsed: " + (System.currentTimeMillis() - this.lastPollTimestamp) + " ms, records polled: " + this.records.size()));
            throw e;
        }
        offsetToCommit.forEach((topicPartition, offset) -> this.lastCommittedOffsets.put((TopicPartition)topicPartition, offset.offset()));
        if (log.isDebugEnabled()) {
            String msg = offsetToCommit.entrySet().stream().map(entry -> String.format("%s-%02d:+%d", this.resolver.getName(((TopicPartition)entry.getKey()).topic()).getUrn(), ((TopicPartition)entry.getKey()).partition(), ((OffsetAndMetadata)entry.getValue()).offset())).collect(Collectors.joining("|"));
            log.debug((Object)("Committed offsets  " + this.group + ":" + msg));
        }
    }

    protected void forceCommit() {
        log.info((Object)"Force commit after a move");
        Map<TopicPartition, OffsetAndMetadata> offsets = this.topicPartitions.stream().collect(Collectors.toMap(tp -> tp, tp -> new OffsetAndMetadata(this.consumer.position(tp))));
        this.consumer.commitSync(offsets);
        offsets.forEach((topicPartition, offset) -> this.lastCommittedOffsets.put((TopicPartition)topicPartition, offset.offset()));
        this.consumerMoved = false;
        this.lastOffsets.clear();
    }

    @Override
    public LogOffset commit(LogPartition partition) {
        TopicPartition topicPartition = new TopicPartition(this.resolver.getId(partition.name()), partition.partition());
        Long offset = this.lastOffsets.get(topicPartition);
        if (offset == null) {
            if (log.isDebugEnabled()) {
                log.debug((Object)("unchanged partition, nothing to commit: " + partition));
            }
            return null;
        }
        offset = offset + 1L;
        this.consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(offset.longValue())));
        LogOffsetImpl ret = new LogOffsetImpl(partition, offset);
        if (log.isInfoEnabled()) {
            log.info((Object)("Committed: " + offset + "/" + this.group));
        }
        return ret;
    }

    @Override
    public Collection<LogPartition> assignments() {
        return this.partitions;
    }

    @Override
    public Name group() {
        return this.group;
    }

    @Override
    public boolean closed() {
        return this.closed;
    }

    @Override
    public Codec<M> getCodec() {
        return this.codec;
    }

    @Override
    public void close() {
        if (this.consumer != null) {
            log.debug((Object)("Closing tailer: " + this.id));
            try {
                this.consumer.close();
            }
            catch (ConcurrentModificationException e) {
                if (this.consumer != null) {
                    log.info((Object)"Closing tailer from another thread, send wakeup");
                    this.consumer.wakeup();
                }
                return;
            }
            catch (IllegalStateException | InterruptException e) {
                log.info((Object)"Discard error while closing consumer: ", e);
            }
            catch (Throwable t) {
                log.error((Object)"interrupted", t);
            }
            this.consumer = null;
        }
        this.closed = true;
    }

    public String toString() {
        return "KafkaLogTailer{id=" + this.id + ", closed=" + this.closed + ", codec=" + this.codec + "}";
    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        Collection revoked = partitions.stream().map(tp -> LogPartition.of(this.resolver.getName(tp.topic()), tp.partition())).collect(Collectors.toList());
        log.info((Object)String.format("Rebalance revoked: %s", revoked));
        this.cleanDuringRebalancing();
        this.isRevoked = true;
        this.id = this.id + "-revoked";
        if (this.listener != null) {
            this.listener.onPartitionsRevoked(revoked);
        }
    }

    public void onPartitionsAssigned(Collection<TopicPartition> newPartitions) {
        this.partitions = newPartitions.stream().map(tp -> LogPartition.of(this.resolver.getName(tp.topic()), tp.partition())).collect(Collectors.toList());
        this.topicPartitions = newPartitions;
        this.id = KafkaLogTailer.buildId(this.group, this.partitions);
        this.cleanDuringRebalancing();
        this.isRebalanced = true;
        log.info((Object)String.format("Rebalance assigned: %s", this.partitions));
        if (this.listener != null) {
            this.listener.onPartitionsAssigned(this.partitions);
        }
    }

    public void onPartitionsLost(Collection<TopicPartition> partitions) {
        Collection lost = partitions.stream().map(tp -> LogPartition.of(this.resolver.getName(tp.topic()), tp.partition())).collect(Collectors.toList());
        log.warn((Object)String.format("Rebalance Partition Lost: %s", lost));
        this.id = this.id + "-lost";
        this.cleanDuringRebalancing();
        this.isLost = true;
        if (this.listener != null) {
            this.listener.onPartitionsLost(lost);
        }
    }

    protected void cleanDuringRebalancing() {
        this.lastCommittedOffsets.clear();
        this.lastOffsets.clear();
        this.records.clear();
        this.isLost = false;
        this.isRevoked = false;
        this.isRebalanced = false;
    }
}

