package org.nuxeo.lib.stream.log.kafka;

import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Bytes;
import org.nuxeo.lib.stream.log.LogOffset;
import org.nuxeo.lib.stream.log.internals.CloseableLogAppender;
import org.nuxeo.lib.stream.log.internals.LogOffsetImpl;

/* loaded from: input_file:org/nuxeo/lib/stream/log/kafka/KafkaLogAppender.class */
public class KafkaLogAppender<M extends Externalizable> implements CloseableLogAppender<M> {
    private static final Log log = LogFactory.getLog(KafkaLogAppender.class);
    protected final String topic;
    protected final Properties consumerProps;
    protected final Properties producerProps;
    protected final int size;
    protected final ConcurrentLinkedQueue<KafkaLogTailer<M>> tailers = new ConcurrentLinkedQueue<>();
    protected final String name;
    protected final KafkaNamespace ns;
    protected KafkaProducer<String, Bytes> producer;
    protected boolean closed;

    private KafkaLogAppender(KafkaNamespace kafkaNamespace, String str, Properties properties, Properties properties2) {
        this.ns = kafkaNamespace;
        this.topic = kafkaNamespace.getTopicName(str);
        this.name = str;
        this.producerProps = properties;
        this.consumerProps = properties2;
        this.producer = new KafkaProducer<>(this.producerProps);
        this.size = this.producer.partitionsFor(this.topic).size();
        if (log.isDebugEnabled()) {
            log.debug(String.format("Created appender: %s on topic: %s with %d partitions", str, this.topic, Integer.valueOf(this.size)));
        }
    }

    public static <M extends Externalizable> KafkaLogAppender<M> open(KafkaNamespace kafkaNamespace, String str, Properties properties, Properties properties2) {
        return new KafkaLogAppender<>(kafkaNamespace, str, properties, properties2);
    }

    @Override // org.nuxeo.lib.stream.log.LogAppender
    public String name() {
        return this.name;
    }

    public String getTopic() {
        return this.topic;
    }

    @Override // org.nuxeo.lib.stream.log.LogAppender
    public int size() {
        return this.size;
    }

    @Override // org.nuxeo.lib.stream.log.LogAppender
    public LogOffset append(String str, M m) {
        Objects.requireNonNull(str);
        return append((str.hashCode() & Integer.MAX_VALUE) % this.size, str, m);
    }

    @Override // org.nuxeo.lib.stream.log.LogAppender
    public LogOffset append(int i, Externalizable externalizable) {
        return append(i, String.valueOf(i), externalizable);
    }

    public LogOffset append(int i, String str, Externalizable externalizable) {
        ProducerRecord producerRecord = new ProducerRecord(this.topic, Integer.valueOf(i), str, Bytes.wrap(messageAsByteArray(externalizable)));
        try {
            LogOffsetImpl logOffsetImpl = new LogOffsetImpl(this.name, i, ((RecordMetadata) this.producer.send(producerRecord).get()).offset());
            if (log.isDebugEnabled()) {
                log.debug(String.format("append to %s-%02d:+%d, len: %d, key: %s, value: %s", this.name, Integer.valueOf(i), Long.valueOf(logOffsetImpl.offset()), Integer.valueOf(((Bytes) producerRecord.value()).get().length), str, externalizable));
            }
            return logOffsetImpl;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Unable to send record: " + producerRecord, e);
        } catch (ExecutionException e2) {
            throw new RuntimeException("Unable to send record: " + producerRecord, e2);
        }
    }

    protected byte[] messageAsByteArray(Externalizable externalizable) {
        try {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            Throwable th = null;
            try {
                try {
                    ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
                    objectOutputStream.writeObject(externalizable);
                    objectOutputStream.flush();
                    byte[] byteArray = byteArrayOutputStream.toByteArray();
                    if (byteArrayOutputStream != null) {
                        if (0 != 0) {
                            try {
                                byteArrayOutputStream.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            byteArrayOutputStream.close();
                        }
                    }
                    return byteArray;
                } finally {
                }
            } finally {
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override // org.nuxeo.lib.stream.log.LogAppender
    public boolean waitFor(LogOffset logOffset, String str, Duration duration) throws InterruptedException {
        if (!this.name.equals(logOffset.partition().name())) {
            throw new IllegalArgumentException(this.name + " can not wait for an offset with a different Log: " + logOffset);
        }
        TopicPartition topicPartition = new TopicPartition(this.topic, logOffset.partition().partition());
        try {
            boolean isProcessed = isProcessed(str, topicPartition, logOffset.offset());
            if (isProcessed) {
                if (log.isDebugEnabled()) {
                    log.debug("waitFor " + logOffset + "/" + str + " returns: " + isProcessed);
                }
                return true;
            }
            long millis = duration.toMillis();
            long currentTimeMillis = System.currentTimeMillis() + millis;
            long min = Math.min(100L, millis);
            while (!isProcessed && System.currentTimeMillis() < currentTimeMillis) {
                Thread.sleep(min);
                isProcessed = isProcessed(str, topicPartition, logOffset.offset());
            }
            boolean z = isProcessed;
            if (log.isDebugEnabled()) {
                log.debug("waitFor " + logOffset + "/" + str + " returns: " + isProcessed);
            }
            return z;
        } catch (Throwable th) {
            if (log.isDebugEnabled()) {
                log.debug("waitFor " + logOffset + "/" + str + " returns: false");
            }
            throw th;
        }
    }

    @Override // org.nuxeo.lib.stream.log.LogAppender
    public boolean closed() {
        return this.closed;
    }

    protected boolean isProcessed(String str, TopicPartition topicPartition, long j) {
        Properties properties = (Properties) this.consumerProps.clone();
        properties.put("group.id", this.ns.getKafkaGroup(str));
        KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
        Throwable th = null;
        try {
            try {
                kafkaConsumer.assign(Collections.singletonList(topicPartition));
                long position = kafkaConsumer.position(topicPartition);
                boolean z = position > 0 && position > j;
                if (log.isDebugEnabled()) {
                    log.debug("isProcessed " + topicPartition.topic() + ":" + topicPartition.partition() + "/" + str + ":+" + j + "? " + z + ", current position: " + position);
                }
                if (kafkaConsumer != null) {
                    if (0 != 0) {
                        try {
                            kafkaConsumer.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        kafkaConsumer.close();
                    }
                }
                return z;
            } finally {
            }
        } catch (Throwable th3) {
            if (kafkaConsumer != null) {
                if (th != null) {
                    try {
                        kafkaConsumer.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    kafkaConsumer.close();
                }
            }
            throw th3;
        }
    }

    @Override // org.nuxeo.lib.stream.log.internals.CloseableLogAppender, java.lang.AutoCloseable
    public void close() {
        log.debug("Closing appender: " + this.name);
        this.tailers.stream().filter((v0) -> {
            return Objects.nonNull(v0);
        }).forEach(kafkaLogTailer -> {
            try {
                kafkaLogTailer.close();
            } catch (Exception e) {
                log.error("Failed to close tailer: " + kafkaLogTailer);
            }
        });
        this.tailers.clear();
        if (this.producer != null) {
            this.producer.close();
            this.producer = null;
        }
        this.closed = true;
    }
}
