package io.confluent.ksql.query;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.UnmodifiableIterator;
import io.confluent.ksql.GenericRow;
import io.vertx.core.impl.ConcurrentHashSet;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.StreamTask;

/* loaded from: input_file:io/confluent/ksql/query/TransientQuerySinkProcessor.class */
final class TransientQuerySinkProcessor implements Processor<Object, GenericRow, Void, Void> {
    private final TransientQueryQueue queue;
    private final Optional<ImmutableMap<TopicPartition, Long>> endOffsets;
    private final ConcurrentHashSet<TopicPartition> donePartitions;
    private ProcessorContext<Void, Void> context;

    public static ProcessorSupplier<Object, GenericRow, Void, Void> supplier(TransientQueryQueue transientQueryQueue, Optional<ImmutableMap<TopicPartition, Long>> optional, ConcurrentHashSet<TopicPartition> concurrentHashSet) {
        return () -> {
            return new TransientQuerySinkProcessor(transientQueryQueue, optional, concurrentHashSet);
        };
    }

    private TransientQuerySinkProcessor(TransientQueryQueue transientQueryQueue, Optional<ImmutableMap<TopicPartition, Long>> optional, ConcurrentHashSet<TopicPartition> concurrentHashSet) {
        this.queue = transientQueryQueue;
        this.endOffsets = optional;
        this.donePartitions = concurrentHashSet;
    }

    public void init(ProcessorContext<Void, Void> processorContext) {
        super.init(processorContext);
        this.context = processorContext;
        if (this.endOffsets.isPresent()) {
            processorContext.schedule(Duration.ofMillis(100L), PunctuationType.WALL_CLOCK_TIME, j -> {
                checkForQueryCompletion();
            });
        }
    }

    public void process(Record<Object, GenericRow> record) {
        Optional map = this.context.recordMetadata().map(recordMetadata -> {
            return new TopicPartition(recordMetadata.topic(), recordMetadata.partition());
        });
        boolean z = map.isPresent() && this.donePartitions.contains(map.get());
        if (record.value() != null && !z) {
            this.queue.acceptRow(null, (GenericRow) record.value());
        }
        map.ifPresent(this::checkForPartitionCompletion);
    }

    private void checkForPartitionCompletion(TopicPartition topicPartition) {
        if (this.endOffsets.isPresent()) {
            ImmutableMap<TopicPartition, Long> immutableMap = this.endOffsets.get();
            if (immutableMap.containsKey(topicPartition)) {
                checkCompletion(topicPartition, getCurrentPositions(), (Long) immutableMap.get(topicPartition));
            }
        }
    }

    private void checkForQueryCompletion() {
        if (this.endOffsets.isPresent()) {
            Map<TopicPartition, OffsetAndMetadata> currentPositions = getCurrentPositions();
            UnmodifiableIterator it = this.endOffsets.get().entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry entry = (Map.Entry) it.next();
                checkCompletion((TopicPartition) entry.getKey(), currentPositions, (Long) entry.getValue());
            }
            if (ImmutableSet.copyOf(this.donePartitions).equals(this.endOffsets.get().keySet())) {
                this.queue.complete();
            }
        }
    }

    private void checkCompletion(TopicPartition topicPartition, Map<TopicPartition, OffsetAndMetadata> map, Long l) {
        OffsetAndMetadata offsetAndMetadata;
        if (!map.containsKey(topicPartition) || (offsetAndMetadata = map.get(topicPartition)) == null || offsetAndMetadata.offset() < l.longValue()) {
            return;
        }
        this.donePartitions.add(topicPartition);
    }

    private Map<TopicPartition, OffsetAndMetadata> getCurrentPositions() {
        try {
            if (!this.context.getClass().equals(ProcessorContextImpl.class)) {
                throw new IllegalStateException("Expected only to run in the KafkaStreams or TopologyTestDriver runtimes.");
            }
            Field declaredField = ProcessorContextImpl.class.getDeclaredField("streamTask");
            declaredField.setAccessible(true);
            StreamTask streamTask = (StreamTask) declaredField.get(this.context);
            Method declaredMethod = StreamTask.class.getDeclaredMethod("committableOffsetsAndMetadata", new Class[0]);
            declaredMethod.setAccessible(true);
            return (Map) declaredMethod.invoke(streamTask, new Object[0]);
        } catch (IllegalAccessException | NoSuchFieldException | NoSuchMethodException | InvocationTargetException e) {
            throw new RuntimeException(e);
        }
    }
}
