package io.confluent.ksql.execution.scalablepush.consumer;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.common.OffsetsRow;
import io.confluent.ksql.execution.scalablepush.ProcessingQueue;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.util.PushOffsetRange;
import io.confluent.ksql.util.PushOffsetVector;
import java.time.Clock;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/execution/scalablepush/consumer/ScalablePushConsumer.class */
public abstract class ScalablePushConsumer implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(ScalablePushConsumer.class);
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(5000);
    protected final String topicName;
    protected final boolean windowed;
    protected final LogicalSchema logicalSchema;
    protected final KafkaConsumer<Object, GenericRow> consumer;
    protected final Clock clock;
    protected int partitions;
    protected boolean started = false;
    protected AtomicReference<Map<TopicPartition, Long>> currentPositions = new AtomicReference<>(Collections.emptyMap());
    protected volatile boolean newAssignment = false;
    protected final ConcurrentHashMap<QueryId, ProcessingQueue> processingQueues = new ConcurrentHashMap<>();
    private volatile boolean closed = false;
    private AtomicLong numRowsReceived = new AtomicLong(0);
    protected AtomicReference<Set<TopicPartition>> topicPartitions = new AtomicReference<>();

    public ScalablePushConsumer(String str, boolean z, LogicalSchema logicalSchema, KafkaConsumer<Object, GenericRow> kafkaConsumer, Clock clock) {
        this.topicName = str;
        this.windowed = z;
        this.logicalSchema = logicalSchema;
        this.consumer = kafkaConsumer;
        this.clock = clock;
    }

    protected abstract void onEmptyRecords();

    protected abstract void afterBatchProcessed();

    protected abstract void onNewAssignment();

    protected abstract void subscribeOrAssign();

    protected void afterOfferedRow(ProcessingQueue processingQueue) {
    }

    public synchronized void newAssignment(Collection<TopicPartition> collection) {
        this.newAssignment = true;
        this.topicPartitions.set(collection != null ? ImmutableSet.copyOf(collection) : null);
        notify();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void updateCurrentPositions() {
        updateCurrentPositions(Optional.empty());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void updateCurrentPositions(Optional<Map<Integer, Long>> optional) {
        HashMap hashMap = new HashMap();
        for (int i = 0; i < this.partitions; i++) {
            hashMap.put(new TopicPartition(this.topicName, i), -1L);
        }
        for (TopicPartition topicPartition : this.topicPartitions.get()) {
            hashMap.put(topicPartition, optional.map(map -> {
                return (Long) map.get(Integer.valueOf(topicPartition.partition()));
            }).orElse(Long.valueOf(this.consumer.position(topicPartition))));
        }
        LOG.debug("Consumer has assignment {} and current position {}", this.topicPartitions, hashMap);
        this.currentPositions.set(ImmutableMap.copyOf(hashMap));
    }

    private void initialize() {
        this.partitions = this.consumer.partitionsFor(this.topicName).size();
        LOG.info("Found {} partitions for {}", Integer.valueOf(this.partitions), this.topicName);
    }

    public void run() {
        if (this.started) {
            LOG.error("Already ran consumer");
            throw new IllegalStateException("Already ran consumer");
        }
        this.started = true;
        try {
            initialize();
            subscribeOrAssign();
            while (!this.closed) {
                ConsumerRecords poll = this.consumer.poll(POLL_TIMEOUT);
                if (this.topicPartitions.get() != null) {
                    if (this.newAssignment) {
                        this.newAssignment = false;
                        onNewAssignment();
                    }
                    PushOffsetVector offsetVector = getOffsetVector(this.currentPositions.get(), this.topicName, this.partitions);
                    if (poll.isEmpty()) {
                        updateCurrentPositions();
                        computeProgressToken(Optional.of(offsetVector));
                        onEmptyRecords();
                    } else {
                        Iterator it = poll.iterator();
                        while (it.hasNext()) {
                            ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                            handleRow(consumerRecord.key(), (GenericRow) consumerRecord.value(), consumerRecord.timestamp());
                        }
                        updateCurrentPositions();
                        computeProgressToken(Optional.of(offsetVector));
                        try {
                            this.consumer.commitSync();
                        } catch (CommitFailedException e) {
                            LOG.warn("Failed to commit, likely due to rebalance.  Will wait for new assignment", e);
                        }
                        afterBatchProcessed();
                    }
                }
            }
        } catch (WakeupException e2) {
        }
    }

    private void computeProgressToken(Optional<PushOffsetVector> optional) {
        PushOffsetVector offsetVector = getOffsetVector(this.currentPositions.get(), this.topicName, this.partitions);
        handleProgressToken(optional.orElse(offsetVector), offsetVector);
    }

    private static PushOffsetVector getOffsetVector(Map<TopicPartition, Long> map, String str, int i) {
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < i; i2++) {
            arrayList.add(map.getOrDefault(new TopicPartition(str, i2), -1L));
        }
        return new PushOffsetVector(arrayList);
    }

    private void handleProgressToken(PushOffsetVector pushOffsetVector, PushOffsetVector pushOffsetVector2) {
        PushOffsetRange pushOffsetRange = new PushOffsetRange(Optional.of(pushOffsetVector), pushOffsetVector2);
        Iterator<ProcessingQueue> it = this.processingQueues.values().iterator();
        while (it.hasNext()) {
            it.next().offer(OffsetsRow.of(this.clock.millis(), pushOffsetRange));
        }
    }

    private boolean handleRow(Object obj, GenericRow genericRow, long j) {
        if ((obj == null && !this.logicalSchema.key().isEmpty()) || genericRow == null) {
            return false;
        }
        this.numRowsReceived.incrementAndGet();
        for (ProcessingQueue processingQueue : this.processingQueues.values()) {
            try {
                processingQueue.offer(RowUtil.createRow(obj, genericRow, j, this.windowed, this.logicalSchema));
                afterOfferedRow(processingQueue);
            } catch (Throwable th) {
                LOG.error("Error while offering row", th);
            }
        }
        return false;
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        try {
            closeAsync();
        } catch (Throwable th) {
            LOG.error("Error closing consumer", th);
        }
        try {
            this.consumer.close();
        } catch (Throwable th2) {
            LOG.error("Error closing kafka consumer", th2);
        }
    }

    public void closeAsync() {
        this.closed = true;
        Iterator<ProcessingQueue> it = this.processingQueues.values().iterator();
        while (it.hasNext()) {
            it.next().close();
        }
        this.consumer.wakeup();
    }

    public boolean isClosed() {
        return this.closed;
    }

    public void register(ProcessingQueue processingQueue) {
        this.processingQueues.put(processingQueue.getQueryId(), processingQueue);
    }

    public void unregister(ProcessingQueue processingQueue) {
        this.processingQueues.remove(processingQueue.getQueryId());
    }

    public Set<TopicPartition> getAssignment() {
        return this.topicPartitions.get();
    }

    public Map<TopicPartition, Long> getCurrentOffsets() {
        return this.currentPositions.get();
    }

    public PushOffsetVector getCurrentToken() {
        return getOffsetVector(this.currentPositions.get(), this.topicName, this.partitions);
    }

    public long getNumRowsReceived() {
        return this.numRowsReceived.get();
    }

    public int numRegistered() {
        return this.processingQueues.size();
    }

    public void onError() {
        Iterator<ProcessingQueue> it = this.processingQueues.values().iterator();
        while (it.hasNext()) {
            it.next().onError();
        }
    }
}
