package org.nuxeo.lib.stream.log.kafka;

import java.io.ByteArrayInputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Queue;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.Bytes;
import org.nuxeo.lib.stream.log.LogOffset;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogRecord;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.RebalanceException;
import org.nuxeo.lib.stream.log.RebalanceListener;
import org.nuxeo.lib.stream.log.internals.LogOffsetImpl;

/* loaded from: input_file:org/nuxeo/lib/stream/log/kafka/KafkaLogTailer.class */
public class KafkaLogTailer<M extends Externalizable> implements LogTailer<M>, ConsumerRebalanceListener {
    private static final Log log = LogFactory.getLog(KafkaLogTailer.class);
    protected final String group;
    protected final Map<TopicPartition, Long> lastOffsets = new HashMap();
    protected final Map<TopicPartition, Long> lastCommittedOffsets = new HashMap();
    protected final Queue<ConsumerRecord<String, Bytes>> records = new LinkedList();
    protected final KafkaNamespace ns;
    protected KafkaConsumer<String, Bytes> consumer;
    protected String id;
    protected Collection<TopicPartition> topicPartitions;
    protected Collection<LogPartition> partitions;
    protected boolean closed;
    protected Collection<String> names;
    protected RebalanceListener listener;
    protected boolean isRebalanced;
    protected boolean consumerMoved;

    protected KafkaLogTailer(KafkaNamespace kafkaNamespace, String str, Properties properties) {
        Objects.requireNonNull(str);
        this.ns = kafkaNamespace;
        this.group = str;
        properties.put("group.id", kafkaNamespace.getKafkaGroup(str));
        this.consumer = new KafkaConsumer<>(properties);
    }

    public static <M extends Externalizable> KafkaLogTailer<M> createAndAssign(KafkaNamespace kafkaNamespace, Collection<LogPartition> collection, String str, Properties properties) {
        KafkaLogTailer<M> kafkaLogTailer = new KafkaLogTailer<>(kafkaNamespace, str, properties);
        kafkaLogTailer.id = buildId(kafkaLogTailer.group, collection);
        kafkaLogTailer.partitions = collection;
        kafkaLogTailer.topicPartitions = (Collection) collection.stream().map(logPartition -> {
            return new TopicPartition(kafkaNamespace.getTopicName(logPartition.name()), logPartition.partition());
        }).collect(Collectors.toList());
        kafkaLogTailer.consumer.assign(kafkaLogTailer.topicPartitions);
        log.debug(String.format("Created tailer with assignments: %s using prefix: %s", kafkaLogTailer.id, kafkaNamespace));
        return kafkaLogTailer;
    }

    public static <M extends Externalizable> KafkaLogTailer<M> createAndSubscribe(KafkaNamespace kafkaNamespace, Collection<String> collection, String str, Properties properties, RebalanceListener rebalanceListener) {
        KafkaLogTailer<M> kafkaLogTailer = new KafkaLogTailer<>(kafkaNamespace, str, properties);
        kafkaLogTailer.id = buildSubscribeId(kafkaLogTailer.group, collection);
        kafkaLogTailer.names = collection;
        Stream<String> stream = collection.stream();
        kafkaNamespace.getClass();
        Collection collection2 = (Collection) stream.map(kafkaNamespace::getTopicName).collect(Collectors.toList());
        kafkaLogTailer.listener = rebalanceListener;
        kafkaLogTailer.consumer.subscribe(collection2, kafkaLogTailer);
        kafkaLogTailer.partitions = Collections.emptyList();
        kafkaLogTailer.topicPartitions = Collections.emptyList();
        log.debug(String.format("Created tailer with subscription: %s using prefix: %s", kafkaLogTailer.id, kafkaNamespace));
        return kafkaLogTailer;
    }

    protected static String buildId(String str, Collection<LogPartition> collection) {
        return str + ":" + ((String) collection.stream().map((v0) -> {
            return v0.toString();
        }).collect(Collectors.joining("|")));
    }

    protected static String buildSubscribeId(String str, Collection<String> collection) {
        return str + ":" + ((String) collection.stream().collect(Collectors.joining("|")));
    }

    @Override // org.nuxeo.lib.stream.log.LogTailer
    public LogRecord<M> read(Duration duration) throws InterruptedException {
        if (this.closed) {
            throw new IllegalStateException("The tailer has been closed.");
        }
        if (this.records.isEmpty()) {
            int poll = poll(duration);
            if (this.isRebalanced) {
                this.isRebalanced = false;
                log.debug("Rebalance happens during poll, raising exception");
                throw new RebalanceException("Partitions has been rebalanced");
            }
            if (poll == 0) {
                if (!log.isTraceEnabled()) {
                    return null;
                }
                log.trace("No data " + this.id + " after " + duration.toMillis() + " ms");
                return null;
            }
        }
        ConsumerRecord<String, Bytes> poll2 = this.records.poll();
        this.lastOffsets.put(new TopicPartition(poll2.topic(), poll2.partition()), Long.valueOf(poll2.offset()));
        M messageOf = messageOf((Bytes) poll2.value());
        LogOffsetImpl logOffsetImpl = new LogOffsetImpl(LogPartition.of(this.ns.getLogName(poll2.topic()), poll2.partition()), poll2.offset());
        this.consumerMoved = false;
        if (log.isDebugEnabled()) {
            log.debug(String.format("Read from %s/%s, key: %s, value: %s", logOffsetImpl, this.group, poll2.key(), messageOf));
        }
        return new LogRecord<>(messageOf, logOffsetImpl);
    }

    protected M messageOf(Bytes bytes) {
        ObjectInputStream objectInputStream = null;
        try {
            try {
                objectInputStream = new ObjectInputStream(new ByteArrayInputStream(bytes.get()));
                M m = (M) objectInputStream.readObject();
                if (objectInputStream != null) {
                    try {
                        objectInputStream.close();
                    } catch (IOException e) {
                    }
                }
                return m;
            } catch (IOException | ClassNotFoundException e2) {
                throw new RuntimeException(e2);
            }
        } catch (Throwable th) {
            if (objectInputStream != null) {
                try {
                    objectInputStream.close();
                } catch (IOException e3) {
                    throw th;
                }
            }
            throw th;
        }
    }

    protected int poll(Duration duration) throws InterruptedException {
        this.records.clear();
        try {
            Iterator it = this.consumer.poll(duration.toMillis()).iterator();
            while (it.hasNext()) {
                ConsumerRecord<String, Bytes> consumerRecord = (ConsumerRecord) it.next();
                if (log.isDebugEnabled() && this.records.isEmpty()) {
                    log.debug("Poll first record: " + this.ns.getLogName(consumerRecord.topic()) + ":" + consumerRecord.partition() + ":+" + consumerRecord.offset());
                }
                this.records.add(consumerRecord);
            }
            if (log.isDebugEnabled()) {
                String str = "Polling " + this.id + " returns " + this.records.size() + " records";
                if (this.records.size() > 0) {
                    log.debug(str);
                } else {
                    log.trace(str);
                }
            }
            return this.records.size();
        } catch (InterruptException e) {
            throw new InterruptedException(e.getMessage());
        } catch (WakeupException e2) {
            log.debug("Receiving wakeup from another thread to close the tailer");
            close();
            throw new IllegalStateException("poll interrupted because tailer has been closed");
        }
    }

    @Override // org.nuxeo.lib.stream.log.LogTailer
    public void toEnd() {
        log.debug("toEnd: " + this.id);
        this.lastOffsets.clear();
        this.records.clear();
        this.consumer.seekToEnd(Collections.emptyList());
        this.consumerMoved = true;
    }

    @Override // org.nuxeo.lib.stream.log.LogTailer
    public void toStart() {
        log.debug("toStart: " + this.id);
        this.lastOffsets.clear();
        this.records.clear();
        this.consumer.seekToBeginning(Collections.emptyList());
        this.consumerMoved = true;
    }

    @Override // org.nuxeo.lib.stream.log.LogTailer
    public void toLastCommitted() {
        if (log.isDebugEnabled()) {
            log.debug("toLastCommitted tailer: " + this.id);
        }
        String str = (String) this.consumer.assignment().stream().map(topicPartition -> {
            return String.format("%s-%02d:+%d", this.ns.getLogName(topicPartition.topic()), Integer.valueOf(topicPartition.partition()), Long.valueOf(toLastCommitted(topicPartition)));
        }).collect(Collectors.joining("|"));
        if (str.length() > 0 && log.isInfoEnabled()) {
            log.info("toLastCommitted offsets: " + this.group + ":" + str);
        }
        this.lastOffsets.clear();
        this.records.clear();
        this.consumerMoved = false;
    }

    protected long toLastCommitted(TopicPartition topicPartition) {
        OffsetAndMetadata committed;
        Long l = this.lastCommittedOffsets.get(topicPartition);
        if (l == null && (committed = this.consumer.committed(topicPartition)) != null) {
            l = Long.valueOf(committed.offset());
        }
        if (l != null) {
            this.consumer.seek(topicPartition, l.longValue());
        } else {
            this.consumer.seekToBeginning(Collections.singletonList(topicPartition));
            l = Long.valueOf(this.consumer.position(topicPartition));
        }
        this.lastCommittedOffsets.put(topicPartition, l);
        if (log.isDebugEnabled()) {
            log.debug(String.format(" toLastCommitted: %s-%02d:+%d", this.ns.getLogName(topicPartition.topic()), Integer.valueOf(topicPartition.partition()), l));
        }
        return l.longValue();
    }

    @Override // org.nuxeo.lib.stream.log.LogTailer
    public void seek(LogOffset logOffset) {
        log.debug("Seek to: " + logOffset.offset() + " from tailer: " + this.id);
        TopicPartition topicPartition = new TopicPartition(this.ns.getTopicName(logOffset.partition().name()), logOffset.partition().partition());
        this.consumer.seek(topicPartition, logOffset.offset());
        this.lastOffsets.remove(topicPartition);
        int partition = topicPartition.partition();
        this.records.removeIf(consumerRecord -> {
            return consumerRecord.partition() == partition;
        });
        this.consumerMoved = true;
    }

    @Override // org.nuxeo.lib.stream.log.LogTailer
    public void reset() {
        log.info("Reset committed offsets for all assigned partitions: " + this.topicPartitions + " tailer: " + this.id);
        Map beginningOffsets = this.consumer.beginningOffsets(this.topicPartitions);
        HashMap hashMap = new HashMap();
        beginningOffsets.forEach((topicPartition, l) -> {
        });
        this.consumer.commitSync(hashMap);
        this.lastCommittedOffsets.clear();
        toLastCommitted();
    }

    @Override // org.nuxeo.lib.stream.log.LogTailer
    public void reset(LogPartition logPartition) {
        log.info("Reset committed offset for partition: " + logPartition + " tailer: " + this.id);
        TopicPartition topicPartition = new TopicPartition(this.ns.getTopicName(logPartition.name()), logPartition.partition());
        Map beginningOffsets = this.consumer.beginningOffsets(Collections.singleton(topicPartition));
        HashMap hashMap = new HashMap();
        beginningOffsets.forEach((topicPartition2, l) -> {
        });
        this.consumer.commitSync(hashMap);
        this.lastCommittedOffsets.remove(topicPartition);
        seek(new LogOffsetImpl(logPartition, ((Long) beginningOffsets.get(topicPartition)).longValue()));
    }

    @Override // org.nuxeo.lib.stream.log.LogTailer
    public LogOffset offsetForTimestamp(LogPartition logPartition, long j) {
        OffsetAndTimestamp offsetAndTimestamp;
        TopicPartition topicPartition = new TopicPartition(this.ns.getTopicName(logPartition.name()), logPartition.partition());
        Map offsetsForTimes = this.consumer.offsetsForTimes(Collections.singletonMap(topicPartition, Long.valueOf(j)));
        if (offsetsForTimes.size() != 1 || (offsetAndTimestamp = (OffsetAndTimestamp) offsetsForTimes.get(topicPartition)) == null) {
            return null;
        }
        return new LogOffsetImpl(logPartition, offsetAndTimestamp.offset());
    }

    @Override // org.nuxeo.lib.stream.log.LogTailer
    public void commit() {
        if (this.consumerMoved) {
            forceCommit();
            return;
        }
        HashMap hashMap = new HashMap();
        this.lastOffsets.forEach((topicPartition, l) -> {
        });
        this.lastOffsets.clear();
        if (hashMap.isEmpty()) {
            return;
        }
        this.consumer.commitSync(hashMap);
        hashMap.forEach((topicPartition2, offsetAndMetadata) -> {
            this.lastCommittedOffsets.put(topicPartition2, Long.valueOf(offsetAndMetadata.offset()));
        });
        if (log.isDebugEnabled()) {
            log.debug("Committed offsets  " + this.group + ":" + ((String) hashMap.entrySet().stream().map(entry -> {
                return String.format("%s-%02d:+%d", this.ns.getLogName(((TopicPartition) entry.getKey()).topic()), Integer.valueOf(((TopicPartition) entry.getKey()).partition()), Long.valueOf(((OffsetAndMetadata) entry.getValue()).offset()));
            }).collect(Collectors.joining("|"))));
        }
    }

    protected void forceCommit() {
        log.info("Force commit after a move");
        Map map = (Map) this.topicPartitions.stream().collect(Collectors.toMap(topicPartition -> {
            return topicPartition;
        }, topicPartition2 -> {
            return new OffsetAndMetadata(this.consumer.position(topicPartition2));
        }));
        this.consumer.commitSync(map);
        map.forEach((topicPartition3, offsetAndMetadata) -> {
            this.lastCommittedOffsets.put(topicPartition3, Long.valueOf(offsetAndMetadata.offset()));
        });
        this.consumerMoved = false;
        this.lastOffsets.clear();
    }

    @Override // org.nuxeo.lib.stream.log.LogTailer
    public LogOffset commit(LogPartition logPartition) {
        TopicPartition topicPartition = new TopicPartition(this.ns.getTopicName(logPartition.name()), logPartition.partition());
        Long l = this.lastOffsets.get(topicPartition);
        if (l == null) {
            if (!log.isDebugEnabled()) {
                return null;
            }
            log.debug("unchanged partition, nothing to commit: " + logPartition);
            return null;
        }
        Long valueOf = Long.valueOf(l.longValue() + 1);
        this.consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(valueOf.longValue())));
        LogOffsetImpl logOffsetImpl = new LogOffsetImpl(logPartition, valueOf.longValue());
        if (log.isInfoEnabled()) {
            log.info("Committed: " + valueOf + "/" + this.group);
        }
        return logOffsetImpl;
    }

    @Override // org.nuxeo.lib.stream.log.LogTailer
    public Collection<LogPartition> assignments() {
        return this.partitions;
    }

    @Override // org.nuxeo.lib.stream.log.LogTailer
    public String group() {
        return this.group;
    }

    @Override // org.nuxeo.lib.stream.log.LogTailer
    public boolean closed() {
        return this.closed;
    }

    @Override // org.nuxeo.lib.stream.log.LogTailer, java.lang.AutoCloseable
    public void close() {
        if (this.consumer != null) {
            log.info("Closing tailer: " + this.id);
            try {
                this.consumer.close();
            } catch (ConcurrentModificationException e) {
                log.info("Closing tailer from another thread, send wakeup");
                this.consumer.wakeup();
                return;
            } catch (InterruptException | IllegalStateException e2) {
                log.warn("Discard error while closing consumer: ", e2);
            } catch (Throwable th) {
                log.error("interrupted", th);
            }
            this.consumer = null;
        }
        this.closed = true;
    }

    public String toString() {
        return "KafkaLogTailer{ns='" + this.ns + "', id=" + this.id + ", closed=" + this.closed + '}';
    }

    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
        Collection<LogPartition> collection2 = (Collection) collection.stream().map(topicPartition -> {
            return LogPartition.of(this.ns.getLogName(topicPartition.topic()), topicPartition.partition());
        }).collect(Collectors.toList());
        log.info(String.format("Rebalance revoked: %s", collection2));
        this.id += "-revoked";
        if (this.listener != null) {
            this.listener.onPartitionsRevoked(collection2);
        }
    }

    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
        this.partitions = (Collection) collection.stream().map(topicPartition -> {
            return LogPartition.of(this.ns.getLogName(topicPartition.topic()), topicPartition.partition());
        }).collect(Collectors.toList());
        this.topicPartitions = collection;
        this.id = buildId(this.group, this.partitions);
        this.lastCommittedOffsets.clear();
        this.lastOffsets.clear();
        this.records.clear();
        this.isRebalanced = true;
        log.info(String.format("Rebalance assigned: %s", this.partitions));
        if (this.listener != null) {
            this.listener.onPartitionsAssigned(this.partitions);
        }
    }
}
