/*
 * Decompiled with CFR 0.152.
 */
package kafka.durability.topic;

import com.typesafe.scalalogging.Logger;
import java.io.Serializable;
import java.time.Duration;
import java.util.Collection;
import java.util.Properties;
import kafka.durability.db.DurabilityDB;
import kafka.durability.materialization.DurabilityEventsMaterialize;
import kafka.durability.topic.DurabilityTopicClient$;
import kafka.durability.topic.DurabilityTopicConfig;
import kafka.utils.Logging;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.KafkaThread;
import scala.Function0;
import scala.Function1;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.TraversableOnce;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.Set;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

@ScalaSignature(bytes="\u0006\u0001\u0005\u0015e\u0001B\r\u001b\u0001\u0005B\u0001B\u000e\u0001\u0003\u0006\u0004%\ta\u000e\u0005\ty\u0001\u0011\t\u0011)A\u0005q!AQ\b\u0001BC\u0002\u0013\u0005a\b\u0003\u0005E\u0001\t\u0005\t\u0015!\u0003@\u0011!)\u0005A!b\u0001\n\u00031\u0005\u0002C'\u0001\u0005\u0003\u0005\u000b\u0011B$\t\u000b9\u0003A\u0011A(\t\u000fQ\u0003\u0001\u0019!C\u0001+\"9\u0011\f\u0001a\u0001\n\u0003Q\u0006B\u00021\u0001A\u0003&a\u000bC\u0004f\u0001\u0001\u0007I\u0011A+\t\u000f\u0019\u0004\u0001\u0019!C\u0001O\"1\u0011\u000e\u0001Q!\nYCqa\u001b\u0001C\u0002\u0013%A\u000e\u0003\u0004z\u0001\u0001\u0006I!\u001c\u0005\u0006u\u0002!\t!\u0016\u0005\u0006w\u0002!\t\u0001 \u0005\t{\u0002A)\u0019!C\u0001}\"1\u0011\u0011\u0004\u0001\u0005BqDq!a\u0007\u0001\t\u0003\ti\u0002\u0003\u0005\u0002>\u0001!\tAGA \u0011)\t)\u0005AI\u0001\n\u0003Q\u0012q\t\u0005\t\u0003;\u0002A\u0011\u0001\u000e\u0002`!A\u0011q\u000f\u0001\u0005\u0002i\tIHA\fEkJ\f'-\u001b7jif$v\u000e]5d\u0007>t7/^7fe*\u00111\u0004H\u0001\u0006i>\u0004\u0018n\u0019\u0006\u0003;y\t!\u0002Z;sC\nLG.\u001b;z\u0015\u0005y\u0012!B6bM.\f7\u0001A\n\u0005\u0001\tBc\u0006\u0005\u0002$M5\tAEC\u0001&\u0003\u0015\u00198-\u00197b\u0013\t9CE\u0001\u0004B]f\u0014VM\u001a\t\u0003S1j\u0011A\u000b\u0006\u0003Wy\tQ!\u001e;jYNL!!\f\u0016\u0003\u000f1{wmZ5oOB\u0011q\u0006N\u0007\u0002a)\u0011\u0011GM\u0001\u0005Y\u0006twMC\u00014\u0003\u0011Q\u0017M^1\n\u0005U\u0002$\u0001\u0003*v]:\f'\r\\3\u0002\r\r|gNZ5h+\u0005A\u0004CA\u001d;\u001b\u0005Q\u0012BA\u001e\u001b\u0005U!UO]1cS2LG/\u001f+pa&\u001c7i\u001c8gS\u001e\fqaY8oM&<\u0007%\u0001\u0002eEV\tq\b\u0005\u0002A\u00056\t\u0011I\u0003\u0002>9%\u00111)\u0011\u0002\r\tV\u0014\u0018MY5mSRLHIQ\u0001\u0004I\n\u0004\u0013aC7bi\u0016\u0014\u0018.\u00197ju\u0016,\u0012a\u0012\t\u0003\u0011.k\u0011!\u0013\u0006\u0003\u0015r\tq\"\\1uKJL\u0017\r\\5{CRLwN\\\u0005\u0003\u0019&\u00131\u0004R;sC\nLG.\u001b;z\u000bZ,g\u000e^:NCR,'/[1mSj,\u0017\u0001D7bi\u0016\u0014\u0018.\u00197ju\u0016\u0004\u0013A\u0002\u001fj]&$h\b\u0006\u0003Q#J\u001b\u0006CA\u001d\u0001\u0011\u00151t\u00011\u00019\u0011\u0015it\u00011\u0001@\u0011\u0015)u\u00011\u0001H\u0003\u0015\u0011X-\u00193z+\u00051\u0006CA\u0012X\u0013\tAFEA\u0004C_>dW-\u00198\u0002\u0013I,\u0017\rZ=`I\u0015\fHCA._!\t\u0019C,\u0003\u0002^I\t!QK\\5u\u0011\u001dy\u0016\"!AA\u0002Y\u000b1\u0001\u001f\u00132\u0003\u0019\u0011X-\u00193zA!\u0012!B\u0019\t\u0003G\rL!\u0001\u001a\u0013\u0003\u0011Y|G.\u0019;jY\u0016\f!\u0002Z8TQV$Hm\\<o\u00039!wn\u00155vi\u0012|wO\\0%KF$\"a\u00175\t\u000f}c\u0011\u0011!a\u0001-\u0006YAm\\*ikR$wn\u001e8!Q\ti!-\u0001\bd_:\u001cX/\\3s)\"\u0014X-\u00193\u0016\u00035\u0004\"A\\<\u000e\u0003=T!a\u000b9\u000b\u0005E\u0014\u0018AB2p[6|gN\u0003\u0002 g*\u0011A/^\u0001\u0007CB\f7\r[3\u000b\u0003Y\f1a\u001c:h\u0013\tAxNA\u0006LC\u001a\\\u0017\r\u00165sK\u0006$\u0017aD2p]N,X.\u001a:UQJ,\u0017\r\u001a\u0011\u0002\u000f%\u001c(+Z1es\u0006A1\u000f[;uI><h\u000eF\u0001\\\u0003!\u0019wN\\:v[\u0016\u0014X#A@\u0011\u0011\u0005\u0005\u0011\u0011BA\u0007\u0003\u001bi!!a\u0001\u000b\u0007u\f)AC\u0002\u0002\bI\fqa\u00197jK:$8/\u0003\u0003\u0002\f\u0005\r!\u0001C\"p]N,X.\u001a:\u0011\u000b\r\ny!a\u0005\n\u0007\u0005EAEA\u0003BeJ\f\u0017\u0010E\u0002$\u0003+I1!a\u0006%\u0005\u0011\u0011\u0015\u0010^3\u0002\u0007I,h.\u0001\u0006qCJ$\u0018\u000e^5p]N$\"!a\b\u0011\r\u0005\u0005\u0012qFA\u001b\u001d\u0011\t\u0019#a\u000b\u0011\u0007\u0005\u0015B%\u0004\u0002\u0002()\u0019\u0011\u0011\u0006\u0011\u0002\rq\u0012xn\u001c;?\u0013\r\ti\u0003J\u0001\u0007!J,G-\u001a4\n\t\u0005E\u00121\u0007\u0002\u0004'\u0016$(bAA\u0017IA!\u0011qGA\u001d\u001b\u0005\u0001\u0018bAA\u001ea\nqAk\u001c9jGB\u000b'\u000f^5uS>t\u0017!D:uCJ$8i\u001c8tk6,'\u000fF\u0002\\\u0003\u0003B\u0001\"a\u0011\u0016!\u0003\u0005\rAV\u0001\u0013gR\f'\u000f^\"p]N,X.\u001a+ie\u0016\fG-A\fti\u0006\u0014HoQ8ogVlWM\u001d\u0013eK\u001a\fW\u000f\u001c;%cU\u0011\u0011\u0011\n\u0016\u0004-\u0006-3FAA'!\u0011\ty%!\u0017\u000e\u0005\u0005E#\u0002BA*\u0003+\n\u0011\"\u001e8dQ\u0016\u001c7.\u001a3\u000b\u0007\u0005]C%\u0001\u0006b]:|G/\u0019;j_:LA!a\u0017\u0002R\t\tRO\\2iK\u000e\\W\r\u001a,be&\fgnY3\u0002\u0015A\u0014x\u000e]3si&,7\u000f\u0006\u0003\u0002b\u00055\u0004\u0003BA2\u0003Sj!!!\u001a\u000b\u0007\u0005\u001d$'\u0001\u0003vi&d\u0017\u0002BA6\u0003K\u0012!\u0002\u0015:pa\u0016\u0014H/[3t\u0011\u001d\tyg\u0006a\u0001\u0003c\n\u0001b\u00197jK:$\u0018\n\u001a\t\u0005\u0003C\t\u0019(\u0003\u0003\u0002v\u0005M\"AB*ue&tw-\u0001\bqe>\u001cWm]:SK\u000e|'\u000fZ:\u0015\u0007m\u000bY\bC\u0004\u0002~a\u0001\r!a \u0002\u000fI,7m\u001c:egBA\u0011\u0011AAA\u0003\u001b\ti!\u0003\u0003\u0002\u0004\u0006\r!aD\"p]N,X.\u001a:SK\u000e|'\u000fZ:")
public class DurabilityTopicConsumer
implements Logging,
Runnable {
    private Consumer<byte[], byte[]> consumer;
    private final DurabilityTopicConfig config;
    private final DurabilityDB db;
    private final DurabilityEventsMaterialize materialize;
    private volatile boolean ready;
    private volatile boolean doShutdown;
    private final KafkaThread consumerThread;
    private Logger logger;
    private String logIdent;
    private volatile byte bitmap$0;

    @Override
    public String loggerName() {
        return Logging.loggerName$(this);
    }

    @Override
    public String msgWithLogIdent(String msg) {
        return Logging.msgWithLogIdent$(this, msg);
    }

    @Override
    public void trace(Function0<String> msg) {
        Logging.trace$(this, msg);
    }

    @Override
    public void trace(Function0<String> msg, Function0<Throwable> e) {
        Logging.trace$(this, msg, e);
    }

    @Override
    public boolean isDebugEnabled() {
        return Logging.isDebugEnabled$(this);
    }

    @Override
    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    @Override
    public void debug(Function0<String> msg) {
        Logging.debug$(this, msg);
    }

    @Override
    public void debug(Function0<String> msg, Function0<Throwable> e) {
        Logging.debug$(this, msg, e);
    }

    @Override
    public void info(Function0<String> msg) {
        Logging.info$(this, msg);
    }

    @Override
    public void info(Function0<String> msg, Function0<Throwable> e) {
        Logging.info$(this, msg, e);
    }

    @Override
    public void warn(Function0<String> msg) {
        Logging.warn$(this, msg);
    }

    @Override
    public void warn(Function0<String> msg, Function0<Throwable> e) {
        Logging.warn$(this, msg, e);
    }

    @Override
    public void error(Function0<String> msg) {
        Logging.error$(this, msg);
    }

    @Override
    public void error(Function0<String> msg, Function0<Throwable> e) {
        Logging.error$(this, msg, e);
    }

    @Override
    public void fatal(Function0<String> msg) {
        Logging.fatal$(this, msg);
    }

    @Override
    public void fatal(Function0<String> msg, Function0<Throwable> e) {
        Logging.fatal$(this, msg, e);
    }

    private Logger logger$lzycompute() {
        synchronized (this) {
            if ((byte)(this.bitmap$0 & 2) == 0) {
                this.logger = Logging.logger$(this);
                this.bitmap$0 = (byte)(this.bitmap$0 | 2);
            }
        }
        return this.logger;
    }

    @Override
    public Logger logger() {
        if ((byte)(this.bitmap$0 & 2) == 0) {
            return this.logger$lzycompute();
        }
        return this.logger;
    }

    @Override
    public String logIdent() {
        return this.logIdent;
    }

    @Override
    public void logIdent_$eq(String x$1) {
        this.logIdent = x$1;
    }

    public DurabilityTopicConfig config() {
        return this.config;
    }

    public DurabilityDB db() {
        return this.db;
    }

    public DurabilityEventsMaterialize materialize() {
        return this.materialize;
    }

    public boolean ready() {
        return this.ready;
    }

    public void ready_$eq(boolean x$1) {
        this.ready = x$1;
    }

    public boolean doShutdown() {
        return this.doShutdown;
    }

    public void doShutdown_$eq(boolean x$1) {
        this.doShutdown = x$1;
    }

    private KafkaThread consumerThread() {
        return this.consumerThread;
    }

    public boolean isReady() {
        return this.ready();
    }

    public synchronized void shutdown() {
        if (this.ready()) {
            this.doShutdown_$eq(true);
            this.consumer().wakeup();
            try {
                this.consumerThread().join();
                return;
            }
            catch (Exception ex) {
                this.error((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "DurabilityTopicConsumer shutdown interrupted, logging", (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> ex);
                return;
            }
        }
    }

    private Consumer<byte[], byte[]> consumer$lzycompute() {
        synchronized (this) {
            if ((byte)(this.bitmap$0 & 1) == 0) {
                String clientId = DurabilityTopicClient$.MODULE$.clientId(this.config().clusterId(), this.config().brokerId(), 0);
                this.consumer = new KafkaConsumer(this.properties(clientId));
                this.bitmap$0 = (byte)(this.bitmap$0 | 1);
            }
        }
        return this.consumer;
    }

    public Consumer<byte[], byte[]> consumer() {
        if ((byte)(this.bitmap$0 & 1) == 0) {
            return this.consumer$lzycompute();
        }
        return this.consumer;
    }

    @Override
    public void run() {
        try {
            try {
                while (!this.doShutdown()) {
                    ConsumerRecords records = this.consumer().poll(Duration.ofMillis(this.config().pollDurationMs()));
                    if (records == null) continue;
                    this.processRecords((ConsumerRecords<byte[], byte[]>)records);
                }
            }
            catch (Exception e) {
                if (this.doShutdown()) {
                    this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Exception caught during shutdown", (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> e);
                } else {
                    this.error((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Exception in TierTopicConsumer", (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> e);
                }
            }
        }
        finally {
            this.ready_$eq(false);
            this.consumer().close();
        }
    }

    public Set<TopicPartition> partitions() {
        return ((TraversableOnce)RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(0), this.config().configuredNumPartitions() - 1).map((Function1 & Serializable & scala.Serializable)x -> DurabilityTopicConsumer.$anonfun$partitions$1(this, BoxesRunTime.unboxToInt((Object)x)), IndexedSeq$.MODULE$.canBuildFrom())).toSet();
    }

    public void startConsumer(boolean startConsumeThread) {
        Set<TopicPartition> durabilityTopicPartitions = this.partitions();
        this.consumer().assign((Collection)CollectionConverters$.MODULE$.seqAsJavaListConverter((Seq)durabilityTopicPartitions.toList()).asJava());
        Seq<Object> offsets = this.db().getDurabilityTopicPartitionOffsets();
        durabilityTopicPartitions.foreach((Function1 & Serializable & scala.Serializable)topicPartition -> {
            DurabilityTopicConsumer.$anonfun$startConsumer$1(this, offsets, topicPartition);
            return BoxedUnit.UNIT;
        });
        if (startConsumeThread) {
            this.consumerThread().start();
        }
        this.ready_$eq(true);
    }

    public boolean startConsumer$default$1() {
        return true;
    }

    /*
     * WARNING - void declaration
     */
    public Properties properties(String clientId) {
        void var2_2;
        Properties properties = new Properties();
        this.config().interBrokerClientConfigs().get().entrySet().forEach(x -> properties.put(x.getKey(), x.getValue()));
        properties.put("auto.offset.reset", "earliest");
        properties.put("client.id", clientId);
        properties.put("enable.auto.commit", "false");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.remove("metric.reporters");
        return var2_2;
    }

    public void processRecords(ConsumerRecords<byte[], byte[]> records) {
        records.forEach(record -> {
            try {
                this.materialize().materialize((ConsumerRecord<byte[], byte[]>)record);
                this.db().updateDurabilityTopicPartitionOffset(record.partition(), record.offset() + 1L);
                return;
            }
            catch (Exception ex) {
                this.error((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(57).append("Exception during durability materialization of ").append(record).append(", ignoring").toString(), (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> ex);
                return;
            }
        });
    }

    public static final /* synthetic */ TopicPartition $anonfun$partitions$1(DurabilityTopicConsumer $this, int x) {
        return new TopicPartition($this.config().topicName(), x);
    }

    public static final /* synthetic */ void $anonfun$startConsumer$1(DurabilityTopicConsumer $this, Seq offsets$1, TopicPartition topicPartition) {
        $this.info((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(53).append("seeking durability consumer for partition ").append(topicPartition.partition()).append(" to offset ").append(offsets$1.apply(topicPartition.partition())).toString());
        $this.consumer().seek(topicPartition, BoxesRunTime.unboxToLong((Object)offsets$1.apply(topicPartition.partition())));
    }

    public DurabilityTopicConsumer(DurabilityTopicConfig config, DurabilityDB db, DurabilityEventsMaterialize materialize) {
        this.config = config;
        this.db = db;
        this.materialize = materialize;
        Logging.$init$(this);
        this.ready = false;
        this.doShutdown = false;
        this.consumerThread = new KafkaThread("DurabilityTopicConsumer", (Runnable)this, false);
    }
}

