/*
 * Decompiled with CFR 0.152.
 */
package kafka.durability.topic;

import com.typesafe.scalalogging.Logger;
import java.io.Serializable;
import java.time.Duration;
import java.util.Collection;
import kafka.durability.db.DurabilityDB;
import kafka.durability.materialization.AbstractDurabilityEventsMaterialize;
import kafka.durability.topic.DurabilityTopicClient$;
import kafka.durability.topic.DurabilityTopicConfig;
import kafka.utils.Logging;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.KafkaThread;
import scala.Function0;
import scala.Function1;
import scala.collection.Seq;
import scala.collection.immutable.Set;
import scala.collection.immutable.Set$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

@ScalaSignature(bytes="\u0006\u0001\u0005\re\u0001B\u000e\u001d\u0001\rB\u0001\u0002\u000f\u0001\u0003\u0006\u0004%\t!\u000f\u0005\t}\u0001\u0011\t\u0011)A\u0005u!Aq\b\u0001BC\u0002\u0013\u0005\u0001\t\u0003\u0005G\u0001\t\u0005\t\u0015!\u0003B\u0011!9\u0005A!b\u0001\n\u0003A\u0005\u0002C(\u0001\u0005\u0003\u0005\u000b\u0011B%\t\u000bA\u0003A\u0011A)\t\u000fY\u0003\u0001\u0019!C\u0001/\"91\f\u0001a\u0001\n\u0003a\u0006B\u00022\u0001A\u0003&\u0001\fC\u0004h\u0001\u0001\u0007I\u0011A,\t\u000f!\u0004\u0001\u0019!C\u0001S\"11\u000e\u0001Q!\naCq!\u001c\u0001C\u0002\u0013%a\u000e\u0003\u0004|\u0001\u0001\u0006Ia\u001c\u0005\u0006y\u0002!\t% \u0005\u0007\u0003'\u0001A\u0011A,\t\u000f\u0005U\u0001\u0001\"\u0001\u0002\u0018!I\u0011\u0011\u0004\u0001C\u0002\u0013%\u00111\u0004\u0005\t\u0003C\u0001\u0001\u0015!\u0003\u0002\u001e!Q\u00111\u0005\u0001\t\u0006\u0004%\t!!\n\t\u000f\u0005\u0005\u0003\u0001\"\u0011\u0002\u0018!9\u00111\t\u0001\u0005\u0002\u0005\u0015\u0003\u0002CA+\u0001\u0011\u0005A$a\u0016\t\u0015\u0005u\u0003!%A\u0005\u0002q\ty\u0006\u0003\u0005\u0002v\u0001!\t\u0001HA<\u0005]!UO]1cS2LG/\u001f+pa&\u001c7i\u001c8tk6,'O\u0003\u0002\u001e=\u0005)Ao\u001c9jG*\u0011q\u0004I\u0001\u000bIV\u0014\u0018MY5mSRL(\"A\u0011\u0002\u000b-\fgm[1\u0004\u0001M!\u0001\u0001\n\u00161!\t)\u0003&D\u0001'\u0015\u00059\u0013!B:dC2\f\u0017BA\u0015'\u0005\u0019\te.\u001f*fMB\u00111FL\u0007\u0002Y)\u0011Q\u0006I\u0001\u0006kRLGn]\u0005\u0003_1\u0012q\u0001T8hO&tw\r\u0005\u00022m5\t!G\u0003\u00024i\u0005!A.\u00198h\u0015\u0005)\u0014\u0001\u00026bm\u0006L!a\u000e\u001a\u0003\u0011I+hN\\1cY\u0016\faaY8oM&<W#\u0001\u001e\u0011\u0005mbT\"\u0001\u000f\n\u0005ub\"!\u0006#ve\u0006\u0014\u0017\u000e\\5usR{\u0007/[2D_:4\u0017nZ\u0001\bG>tg-[4!\u0003\t!'-F\u0001B!\t\u0011E)D\u0001D\u0015\tyd$\u0003\u0002F\u0007\naA)\u001e:bE&d\u0017\u000e^=E\u0005\u0006\u0019AM\u0019\u0011\u0002\u00175\fG/\u001a:jC2L'0Z\u000b\u0002\u0013B\u0011!*T\u0007\u0002\u0017*\u0011AJH\u0001\u0010[\u0006$XM]5bY&T\u0018\r^5p]&\
u0011aj\u0013\u0002$\u0003\n\u001cHO]1di\u0012+(/\u00192jY&$\u00180\u0012<f]R\u001cX*\u0019;fe&\fG.\u001b>f\u00031i\u0017\r^3sS\u0006d\u0017N_3!\u0003\u0019a\u0014N\\5u}Q!!k\u0015+V!\tY\u0004\u0001C\u00039\u000f\u0001\u0007!\bC\u0003@\u000f\u0001\u0007\u0011\tC\u0003H\u000f\u0001\u0007\u0011*A\u0003sK\u0006$\u00170F\u0001Y!\t)\u0013,\u0003\u0002[M\t9!i\\8mK\u0006t\u0017!\u0003:fC\u0012Lx\fJ3r)\ti\u0006\r\u0005\u0002&=&\u0011qL\n\u0002\u0005+:LG\u000fC\u0004b\u0013\u0005\u0005\t\u0019\u0001-\u0002\u0007a$\u0013'\u0001\u0004sK\u0006$\u0017\u0010\t\u0015\u0003\u0015\u0011\u0004\"!J3\n\u0005\u00194#\u0001\u0003<pY\u0006$\u0018\u000e\\3\u0002\u0015\u0011|7\u000b[;uI><h.\u0001\be_NCW\u000f\u001e3po:|F%Z9\u0015\u0005uS\u0007bB1\r\u0003\u0003\u0005\r\u0001W\u0001\fI>\u001c\u0006.\u001e;e_^t\u0007\u0005\u000b\u0002\u000eI\u0006q1m\u001c8tk6,'\u000f\u00165sK\u0006$W#A8\u0011\u0005ALX\"A9\u000b\u00055\u0012(BA:u\u0003\u0019\u0019w.\\7p]*\u0011\u0011%\u001e\u0006\u0003m^\fa!\u00199bG\",'\"\u0001=\u0002\u0007=\u0014x-\u0003\u0002{c\nY1*\u00194lCRC'/Z1e\u0003=\u0019wN\\:v[\u0016\u0014H\u000b\u001b:fC\u0012\u0004\u0013A\u00037pO\u001e,'OT1nKV\ta\u0010E\u0002\u0000\u0003\u001bqA!!\u0001\u0002\nA\u0019\u00111\u0001\u0014\u000e\u0005\u0005\u0015!bAA\u0004E\u00051AH]8pizJ1!a\u0003'\u0003\u0019\u0001&/\u001a3fM&!\u0011qBA\t\u0005\u0019\u0019FO]5oO*\u0019\u00111\u0002\u0014\u0002\u000f%\u001c(+Z1es\u0006A1\u000f[;uI><h\u000eF\u0001^\u0003)\u0019G.[3oiRK\b/Z\u000b\u0003\u0003;\u00012!MA\u0010\u0013\r\tyAM\u0001\fG2LWM\u001c;UsB,\u0007%\u0001\u0005d_:\u001cX/\\3s+\t\t9\u0003\u0005\u0005\u0002*\u0005E\u0012QGA\u001b\u001b\t\tYC\u0003\u0003\u0002$\u00055\"bAA\u0018i\u000691\r\\5f]R\u001c\u0018\u0002BA\u001a\u0003W\u0011\u0001bQ8ogVlWM\u001d\t\u0006K\u0005]\u00121H\u0005\u0004\u0003s1#!B!se\u0006L\bcA\u0013\u0002>%\u0019\u0011q\b\u0014\u0003\t\tKH/Z\u0001\u0004eVt\u0017A\u00039beRLG/[8ogR\u0011\u0011q\t\t\u0006\u007f\u0006%\u0013QJ\u0005\u0005\u0003\u0017\n\tBA\u0002TKR\u0004B!a\u0014\u0002R
5\t!/C\u0002\u0002TI\u0014a\u0002V8qS\u000e\u0004\u0016M\u001d;ji&|g.A\u0007ti\u0006\u0014HoQ8ogVlWM\u001d\u000b\u0004;\u0006e\u0003\u0002CA.1A\u0005\t\u0019\u0001-\u0002%M$\u0018M\u001d;D_:\u001cX/\\3UQJ,\u0017\rZ\u0001\u0018gR\f'\u000f^\"p]N,X.\u001a:%I\u00164\u0017-\u001e7uIE*\"!!\u0019+\u0007a\u000b\u0019g\u000b\u0002\u0002fA!\u0011qMA9\u001b\t\tIG\u0003\u0003\u0002l\u00055\u0014!C;oG\",7m[3e\u0015\r\tyGJ\u0001\u000bC:tw\u000e^1uS>t\u0017\u0002BA:\u0003S\u0012\u0011#\u001e8dQ\u0016\u001c7.\u001a3WCJL\u0017M\\2f\u00039\u0001(o\\2fgN\u0014VmY8sIN$2!XA=\u0011\u001d\tYH\u0007a\u0001\u0003{\nqA]3d_J$7\u000f\u0005\u0005\u0002*\u0005}\u0014QGA\u001b\u0013\u0011\t\t)a\u000b\u0003\u001f\r{gn];nKJ\u0014VmY8sIN\u0004")
/*
 * Consumer for the durability topic.
 *
 * The consumer is assigned the partitions listed in the supplied
 * DurabilityTopicConfig, is positioned at the offsets stored in the
 * DurabilityDB, and then polls in a loop (normally on its own KafkaThread).
 * Every polled record is handed to the AbstractDurabilityEventsMaterialize
 * instance and, on success, the next offset to read is written back to the DB.
 *
 * This is decompiled Scala: the Logging methods simply forward to the static
 * helpers of the Logging trait, and `bitmap$0` tracks lazy-val initialisation
 * (bit 1 = consumer, bit 2 = logger).
 */
public class DurabilityTopicConsumer
implements Logging,
Runnable {
    /** Lazily created Kafka consumer; access it through {@link #consumer()}. */
    private Consumer<byte[], byte[]> consumer;
    private final DurabilityTopicConfig config;
    private final DurabilityDB db;
    private final AbstractDurabilityEventsMaterialize materialize;
    /** True between a successful {@link #startConsumer(boolean)} and the end of {@link #run()}. */
    private volatile boolean ready;
    /** Set by {@link #shutdown()} to ask the poll loop to stop. */
    private volatile boolean doShutdown;
    private final KafkaThread consumerThread;
    private final String clientType;
    /** Lazily created logger; access it through {@link #logger()}. */
    private Logger logger;
    private String logIdent;
    /** Lazy-initialisation flags: bit 1 guards {@code consumer}, bit 2 guards {@code logger}. */
    private volatile byte bitmap$0;

    // ---------------------------------------------------------------------
    // Logging trait forwarders
    // ---------------------------------------------------------------------

    @Override
    public String msgWithLogIdent(String msg) {
        return Logging.msgWithLogIdent$(this, msg);
    }

    @Override
    public void trace(Function0<String> msg) {
        Logging.trace$(this, msg);
    }

    @Override
    public void trace(Function0<String> msg, Function0<Throwable> e) {
        Logging.trace$(this, msg, e);
    }

    @Override
    public boolean isDebugEnabled() {
        return Logging.isDebugEnabled$(this);
    }

    @Override
    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    @Override
    public void debug(Function0<String> msg) {
        Logging.debug$(this, msg);
    }

    @Override
    public void debug(Function0<String> msg, Function0<Throwable> e) {
        Logging.debug$(this, msg, e);
    }

    @Override
    public void info(Function0<String> msg) {
        Logging.info$(this, msg);
    }

    @Override
    public void info(Function0<String> msg, Function0<Throwable> e) {
        Logging.info$(this, msg, e);
    }

    @Override
    public void warn(Function0<String> msg) {
        Logging.warn$(this, msg);
    }

    @Override
    public void warn(Function0<String> msg, Function0<Throwable> e) {
        Logging.warn$(this, msg, e);
    }

    @Override
    public void error(Function0<String> msg) {
        Logging.error$(this, msg);
    }

    @Override
    public void error(Function0<String> msg, Function0<Throwable> e) {
        Logging.error$(this, msg, e);
    }

    @Override
    public void fatal(Function0<String> msg) {
        Logging.fatal$(this, msg);
    }

    @Override
    public void fatal(Function0<String> msg, Function0<Throwable> e) {
        Logging.fatal$(this, msg, e);
    }

    /** Slow path of {@link #logger()}: creates the logger once, under the instance monitor. */
    private Logger logger$lzycompute() {
        synchronized (this) {
            if ((byte)(this.bitmap$0 & 2) == 0) {
                this.logger = Logging.logger$(this);
                this.bitmap$0 = (byte)(this.bitmap$0 | 2);
            }
        }
        return this.logger;
    }

    /** Returns the logger, creating it on first use. */
    @Override
    public Logger logger() {
        if ((byte)(this.bitmap$0 & 2) == 0) {
            return this.logger$lzycompute();
        }
        return this.logger;
    }

    @Override
    public String logIdent() {
        return this.logIdent;
    }

    @Override
    public void logIdent_$eq(String x$1) {
        this.logIdent = x$1;
    }

    // ---------------------------------------------------------------------
    // Accessors
    // ---------------------------------------------------------------------

    public DurabilityTopicConfig config() {
        return this.config;
    }

    public DurabilityDB db() {
        return this.db;
    }

    public AbstractDurabilityEventsMaterialize materialize() {
        return this.materialize;
    }

    public boolean ready() {
        return this.ready;
    }

    public void ready_$eq(boolean x$1) {
        this.ready = x$1;
    }

    public boolean doShutdown() {
        return this.doShutdown;
    }

    public void doShutdown_$eq(boolean x$1) {
        this.doShutdown = x$1;
    }

    private KafkaThread consumerThread() {
        return this.consumerThread;
    }

    @Override
    public String loggerName() {
        return DurabilityTopicConsumer.class.getName();
    }

    /** @return the current value of the {@code ready} flag */
    public boolean isReady() {
        return this.ready();
    }

    /**
     * Requests the poll loop to stop and waits for the consumer thread to finish.
     *
     * Does nothing when the consumer is not ready. Otherwise it raises the
     * shutdown flag, wakes the consumer up so a blocked poll returns, and joins
     * the consumer thread. Any exception raised while joining is logged rather
     * than propagated; if that exception is an {@link InterruptedException} the
     * calling thread's interrupt status is restored so callers can still
     * observe the interruption.
     */
    public synchronized void shutdown() {
        if (!this.ready()) {
            return;
        }
        this.doShutdown_$eq(true);
        this.consumer().wakeup();
        try {
            this.consumerThread().join();
        }
        catch (Exception ex) {
            this.error((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "DurabilityTopicConsumer shutdown interrupted, logging", (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> ex);
            if (ex instanceof InterruptedException) {
                // Catching InterruptedException clears the flag; put it back
                // (after logging) so the interruption is not silently lost.
                Thread.currentThread().interrupt();
            }
        }
    }

    private String clientType() {
        return this.clientType;
    }

    /** Slow path of {@link #consumer()}: builds the Kafka consumer once, under the instance monitor. */
    private Consumer<byte[], byte[]> consumer$lzycompute() {
        synchronized (this) {
            if ((byte)(this.bitmap$0 & 1) == 0) {
                String clientId = DurabilityTopicClient$.MODULE$.clientId(this.clientType(), this.config().clusterId(), this.config().brokerId(), 0);
                this.consumer = new KafkaConsumer<byte[], byte[]>(this.config().toConsumerProperties(clientId));
                this.bitmap$0 = (byte)(this.bitmap$0 | 1);
            }
        }
        return this.consumer;
    }

    /** Returns the Kafka consumer, creating it from the config on first use. */
    public Consumer<byte[], byte[]> consumer() {
        if ((byte)(this.bitmap$0 & 1) == 0) {
            return this.consumer$lzycompute();
        }
        return this.consumer;
    }

    /**
     * Poll loop: polls the consumer and processes each batch until the shutdown
     * flag is set or an exception escapes.
     *
     * An exception seen after shutdown was requested is logged at debug level
     * (it is the expected way out of a blocked poll); any other exception is
     * logged as an error. In all cases the ready flag is cleared and the
     * consumer is closed before the method returns.
     */
    @Override
    public void run() {
        try {
            while (!this.doShutdown()) {
                ConsumerRecords<byte[], byte[]> records = this.consumer().poll(Duration.ofMillis(this.config().pollDurationMs()));
                if (records == null) {
                    continue;
                }
                this.processRecords(records);
            }
        }
        catch (Exception e) {
            if (this.doShutdown()) {
                this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Exception caught during shutdown", (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> e);
            } else {
                this.error((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Exception in DurabilityTopicConsumer", (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> e);
            }
        }
        finally {
            this.ready_$eq(false);
            this.consumer().close();
        }
    }

    /**
     * @return one {@link TopicPartition} of the configured topic for every
     *         partition id in {@code config().partitionsToMonitor()}
     */
    public Set<TopicPartition> partitions() {
        return (Set)this.config().partitionsToMonitor().map((Function1 & Serializable & scala.Serializable)x -> DurabilityTopicConsumer.$anonfun$partitions$1(this, BoxesRunTime.unboxToShort((Object)x)), Set$.MODULE$.canBuildFrom());
    }

    /**
     * Assigns the monitored partitions to the consumer, seeks each of them to
     * the offset stored in the DB, marks the consumer ready and optionally
     * starts the background polling thread.
     *
     * @param startConsumeThread when true the consumer thread is started;
     *                           when false only assignment and seeking happen
     */
    public void startConsumer(boolean startConsumeThread) {
        Set<TopicPartition> durabilityTopicPartitionsToMonitor = this.partitions();
        this.consumer().assign((Collection)CollectionConverters$.MODULE$.seqAsJavaListConverter((Seq)durabilityTopicPartitionsToMonitor.toList()).asJava());
        Seq<Object> offsets = this.db().getDurabilityTopicPartitionOffsets();
        durabilityTopicPartitionsToMonitor.foreach((Function1 & Serializable & scala.Serializable)topicPartition -> {
            DurabilityTopicConsumer.$anonfun$startConsumer$1(this, offsets, topicPartition);
            return BoxedUnit.UNIT;
        });
        // Publish readiness BEFORE the thread starts: run() clears the flag in
        // its finally block, so setting it afterwards could overwrite that
        // "false" and report a consumer that has already died as ready.
        this.ready_$eq(true);
        if (startConsumeThread) {
            this.consumerThread().start();
        }
    }

    /** Default value of the {@code startConsumeThread} parameter of {@link #startConsumer(boolean)}. */
    public boolean startConsumer$default$1() {
        return true;
    }

    /**
     * Materializes every record of the batch and, for each record that was
     * materialized successfully, stores {@code offset + 1} for its partition in
     * the DB. A record whose handling throws is logged at warn level and
     * skipped; processing continues with the next record.
     *
     * @param records batch returned by a poll
     */
    public void processRecords(ConsumerRecords<byte[], byte[]> records) {
        records.forEach(record -> {
            try {
                this.materialize().materialize(record);
                this.db().updateDurabilityTopicPartitionOffset(record.partition(), record.offset() + 1L);
            }
            catch (Exception ex) {
                this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(99).append("Ignoring unsupported record ").append(record).append(" during materialization. Could be due to ").append("incompatibility during upgrade").toString(), (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> ex);
            }
        });
    }

    /** Compiler-generated body of the lambda in {@link #partitions()}: partition id to TopicPartition. */
    public static final /* synthetic */ TopicPartition $anonfun$partitions$1(DurabilityTopicConsumer $this, short x) {
        return new TopicPartition($this.config().topicName(), (int)x);
    }

    /**
     * Compiler-generated body of the lambda in {@link #startConsumer(boolean)}:
     * logs and seeks one partition to {@code offsets$1(partition id)}.
     */
    public static final /* synthetic */ void $anonfun$startConsumer$1(DurabilityTopicConsumer $this, Seq offsets$1, TopicPartition topicPartition) {
        $this.info((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(53).append("seeking durability consumer for partition ").append(topicPartition.partition()).append(" to offset ").append(offsets$1.apply(topicPartition.partition())).toString());
        $this.consumer().seek(topicPartition, BoxesRunTime.unboxToLong((Object)offsets$1.apply(topicPartition.partition())));
    }

    /**
     * Creates the consumer wrapper. Neither the Kafka consumer nor the thread
     * is started here; call {@link #startConsumer(boolean)} for that.
     *
     * @param config      durability topic configuration
     * @param db          store holding the per-partition offsets
     * @param materialize receiver of every consumed record
     */
    public DurabilityTopicConsumer(DurabilityTopicConfig config, DurabilityDB db, AbstractDurabilityEventsMaterialize materialize) {
        this.config = config;
        this.db = db;
        this.materialize = materialize;
        Logging.$init$(this);
        this.ready = false;
        this.doShutdown = false;
        // NOTE(review): third argument is presumably KafkaThread's daemon flag (false = non-daemon) - confirm.
        this.consumerThread = new KafkaThread("DurabilityTopicConsumer", (Runnable)this, false);
        this.clientType = "consumer";
    }
}

