package io.confluent.ksql.execution.scalablepush;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.MoreExecutors;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.ddl.commands.KsqlTopic;
import io.confluent.ksql.execution.scalablepush.consumer.CatchupConsumer;
import io.confluent.ksql.execution.scalablepush.consumer.CatchupCoordinator;
import io.confluent.ksql.execution.scalablepush.consumer.CatchupCoordinatorImpl;
import io.confluent.ksql.execution.scalablepush.consumer.KafkaConsumerFactory;
import io.confluent.ksql.execution.scalablepush.consumer.LatestConsumer;
import io.confluent.ksql.execution.scalablepush.locator.AllHostsLocator;
import io.confluent.ksql.execution.scalablepush.locator.PushLocator;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.PushOffsetRange;
import java.net.MalformedURLException;
import java.net.URL;
import java.time.Clock;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import javax.annotation.concurrent.GuardedBy;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/execution/scalablepush/ScalablePushRegistry.class */
public class ScalablePushRegistry {
    private static final Logger LOG = LoggerFactory.getLogger(ScalablePushRegistry.class);
    private static final String LATEST_CONSUMER_GROUP_SUFFIX = "_scalable_push_query_latest";
    private static final String CATCHUP_CONSUMER_GROUP_MIDDLE = "_scalable_push_query_catchup_";
    private final PushLocator pushLocator;
    private final LogicalSchema logicalSchema;
    private final boolean isTable;
    private final Map<String, Object> consumerProperties;
    private final KsqlTopic ksqlTopic;
    private final ServiceContext serviceContext;
    private final KsqlConfig ksqlConfig;
    private final String sourceApplicationId;
    private final KafkaConsumerFactory.KafkaConsumerFactoryInterface kafkaConsumerFactory;
    private final LatestConsumer.LatestConsumerFactory latestConsumerFactory;
    private final CatchupConsumer.CatchupConsumerFactory catchupConsumerFactory;
    private final ExecutorService executorService;
    private final ScheduledExecutorService executorServiceCatchup;

    @GuardedBy("this")
    private boolean closed = false;
    private AtomicReference<LatestConsumer> latestConsumer = new AtomicReference<>(null);
    private CatchupCoordinator catchupCoordinator = new CatchupCoordinatorImpl();
    private Map<QueryId, CatchupConsumer> catchupConsumers = new ConcurrentHashMap();

    @GuardedBy("this")
    private boolean stopLatestConsumerOnLastRequest = true;

    /* loaded from: input_file:io/confluent/ksql/execution/scalablepush/ScalablePushRegistry$CatchupMetadata.class */
    public static class CatchupMetadata {
        private final PushOffsetRange pushOffsetRange;
        private final String catchupConsumerGroup;

        public CatchupMetadata(PushOffsetRange pushOffsetRange, String str) {
            this.pushOffsetRange = pushOffsetRange;
            this.catchupConsumerGroup = str;
        }

        public PushOffsetRange getPushOffsetRange() {
            return this.pushOffsetRange;
        }

        public String getCatchupConsumerGroup() {
            return this.catchupConsumerGroup;
        }
    }

    @SuppressFBWarnings({"EI_EXPOSE_REP2"})
    public ScalablePushRegistry(PushLocator pushLocator, LogicalSchema logicalSchema, boolean z, Map<String, Object> map, KsqlTopic ksqlTopic, ServiceContext serviceContext, KsqlConfig ksqlConfig, String str, KafkaConsumerFactory.KafkaConsumerFactoryInterface kafkaConsumerFactoryInterface, LatestConsumer.LatestConsumerFactory latestConsumerFactory, CatchupConsumer.CatchupConsumerFactory catchupConsumerFactory, ExecutorService executorService, ScheduledExecutorService scheduledExecutorService) {
        this.pushLocator = pushLocator;
        this.logicalSchema = logicalSchema;
        this.isTable = z;
        this.consumerProperties = map;
        this.ksqlTopic = ksqlTopic;
        this.serviceContext = serviceContext;
        this.ksqlConfig = ksqlConfig;
        this.sourceApplicationId = str;
        this.kafkaConsumerFactory = kafkaConsumerFactoryInterface;
        this.latestConsumerFactory = latestConsumerFactory;
        this.catchupConsumerFactory = catchupConsumerFactory;
        this.executorService = executorService;
        this.executorServiceCatchup = scheduledExecutorService;
    }

    public synchronized void close() {
        if (this.closed) {
            LOG.warn("Already closed registry");
            return;
        }
        LOG.info("Closing scalable push registry for topic " + this.ksqlTopic.getKafkaTopicName());
        LatestConsumer latestConsumer = this.latestConsumer.get();
        if (latestConsumer != null) {
            latestConsumer.closeAsync();
        }
        Iterator<CatchupConsumer> it = this.catchupConsumers.values().iterator();
        while (it.hasNext()) {
            it.next().closeAsync();
        }
        this.catchupConsumers.clear();
        MoreExecutors.shutdownAndAwaitTermination(this.executorService, 5000L, TimeUnit.MILLISECONDS);
        MoreExecutors.shutdownAndAwaitTermination(this.executorServiceCatchup, 5000L, TimeUnit.MILLISECONDS);
        this.closed = true;
    }

    public synchronized void cleanup() {
        close();
        deleteConsumerGroup(getLatestConsumerGroupId());
    }

    public synchronized boolean isClosed() {
        return this.closed;
    }

    public synchronized void register(ProcessingQueue processingQueue, Optional<CatchupMetadata> optional) {
        if (this.closed) {
            throw new IllegalStateException("Shouldn't register after closing");
        }
        try {
            if (!optional.isPresent()) {
                LatestConsumer latestConsumer = this.latestConsumer.get();
                if (latestConsumer == null || latestConsumer.isClosed()) {
                    startLatestIfNotRunning(Optional.of(processingQueue));
                } else {
                    latestConsumer.register(processingQueue);
                }
            } else {
                if (this.catchupConsumers.size() >= this.ksqlConfig.getInt("ksql.query.push.v2.max.catchup.consumers").intValue()) {
                    processingQueue.onError();
                    throw new KsqlException("Too many catchups registered, ksql.query.push.v2.max.catchup.consumers:" + this.ksqlConfig.getInt("ksql.query.push.v2.max.catchup.consumers"));
                }
                startLatestIfNotRunning(Optional.empty());
                startCatchup(processingQueue, optional.get());
            }
        } finally {
            stopLatestConsumerOnLastRequest();
        }
    }

    public synchronized void unregister(ProcessingQueue processingQueue) {
        if (this.closed) {
            throw new IllegalStateException("Shouldn't unregister after closing");
        }
        try {
            LatestConsumer latestConsumer = this.latestConsumer.get();
            if (latestConsumer != null && !latestConsumer.isClosed()) {
                latestConsumer.unregister(processingQueue);
            }
            unregisterCatchup(processingQueue);
        } finally {
            stopLatestConsumerOnLastRequest();
        }
    }

    public PushLocator getLocator() {
        return this.pushLocator;
    }

    public boolean isTable() {
        return this.isTable;
    }

    public boolean isWindowed() {
        return this.ksqlTopic.getKeyFormat().isWindowed();
    }

    @VisibleForTesting
    public synchronized boolean isLatestRunning() {
        LatestConsumer latestConsumer = this.latestConsumer.get();
        return (latestConsumer == null || latestConsumer.isClosed()) ? false : true;
    }

    @VisibleForTesting
    public synchronized boolean anyCatchupsRunning() {
        return this.catchupConsumers.size() > 0;
    }

    @VisibleForTesting
    public int numRegistered() {
        return latestNumRegistered() + catchupNumRegistered();
    }

    @VisibleForTesting
    public synchronized int latestNumRegistered() {
        LatestConsumer latestConsumer = this.latestConsumer.get();
        if (latestConsumer == null || latestConsumer.isClosed()) {
            return 0;
        }
        return latestConsumer.numRegistered();
    }

    @VisibleForTesting
    public synchronized long latestNumRowsReceived() {
        LatestConsumer latestConsumer = this.latestConsumer.get();
        if (latestConsumer == null || latestConsumer.isClosed()) {
            return 0L;
        }
        return latestConsumer.getNumRowsReceived();
    }

    @VisibleForTesting
    public synchronized int catchupNumRegistered() {
        int i = 0;
        Iterator<CatchupConsumer> it = this.catchupConsumers.values().iterator();
        while (it.hasNext()) {
            i += it.next().numRegistered();
        }
        return i;
    }

    @VisibleForTesting
    public synchronized boolean latestHasAssignment() {
        LatestConsumer latestConsumer = this.latestConsumer.get();
        return (latestConsumer == null || latestConsumer.isClosed() || latestConsumer.getAssignment() == null) ? false : true;
    }

    @VisibleForTesting
    public synchronized void setKeepLatestConsumerOnLastRequest() {
        this.stopLatestConsumerOnLastRequest = false;
    }

    public synchronized void onError() {
    }

    public static Optional<ScalablePushRegistry> create(LogicalSchema logicalSchema, Supplier<List<PersistentQueryMetadata>> supplier, boolean z, Map<String, Object> map, Map<String, Object> map2, String str, KsqlTopic ksqlTopic, ServiceContext serviceContext, KsqlConfig ksqlConfig) {
        Object obj = map.get("application.server");
        if (obj == null) {
            return Optional.empty();
        }
        if (!(obj instanceof String)) {
            throw new IllegalArgumentException("application.server not String");
        }
        try {
            return Optional.of(new ScalablePushRegistry(new AllHostsLocator(supplier, new URL((String) obj)), logicalSchema, z, map2, ksqlTopic, serviceContext, ksqlConfig, str, KafkaConsumerFactory::create, LatestConsumer::new, CatchupConsumer::new, Executors.newSingleThreadExecutor(), Executors.newScheduledThreadPool(ksqlConfig.getInt("ksql.query.push.v2.max.catchup.consumers").intValue())));
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("application.server malformed: '" + obj + "'");
        }
    }

    private synchronized boolean stopLatestConsumerOnLastRequest() {
        LatestConsumer latestConsumer = this.latestConsumer.get();
        if (latestConsumer == null || latestConsumer.isClosed() || latestConsumer.numRegistered() != 0 || !this.stopLatestConsumerOnLastRequest || this.catchupConsumers.size() != 0) {
            return false;
        }
        latestConsumer.closeAsync();
        return true;
    }

    private LatestConsumer createLatestConsumer(Optional<ProcessingQueue> optional) {
        KafkaConsumer<Object, GenericRow> kafkaConsumer = null;
        try {
            kafkaConsumer = this.kafkaConsumerFactory.create(this.ksqlTopic, this.logicalSchema, this.serviceContext, this.consumerProperties, this.ksqlConfig, getLatestConsumerGroupId());
            return this.latestConsumerFactory.create(this.ksqlTopic.getKafkaTopicName(), isWindowed(), this.logicalSchema, kafkaConsumer, this.catchupCoordinator, this::catchupAssignmentUpdater, this.ksqlConfig, Clock.systemUTC());
        } catch (Exception e) {
            LOG.error("Couldn't create latest consumer", e);
            optional.ifPresent((v0) -> {
                v0.onError();
            });
            if (kafkaConsumer != null) {
                kafkaConsumer.close();
            }
            throw e;
        }
    }

    private synchronized void startLatestIfNotRunning(Optional<ProcessingQueue> optional) {
        LatestConsumer latestConsumer = this.latestConsumer.get();
        if (latestConsumer == null || latestConsumer.isClosed()) {
            LatestConsumer createLatestConsumer = createLatestConsumer(optional);
            createLatestConsumer.getClass();
            optional.ifPresent(createLatestConsumer::register);
            this.latestConsumer.set(createLatestConsumer);
            this.executorService.submit(() -> {
                runLatest(createLatestConsumer);
            });
        }
    }

    private void runLatest(LatestConsumer latestConsumer) {
        Throwable th = null;
        try {
            try {
                latestConsumer.run();
                if (latestConsumer != null) {
                    if (0 != 0) {
                        try {
                            latestConsumer.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        latestConsumer.close();
                    }
                }
            } finally {
            }
        } catch (Throwable th3) {
            LOG.error("Got error while running latest", th3);
            latestConsumer.onError();
        }
    }

    private String getLatestConsumerGroupId() {
        return this.sourceApplicationId + LATEST_CONSUMER_GROUP_SUFFIX;
    }

    private synchronized void catchupAssignmentUpdater(Collection<TopicPartition> collection) {
        Iterator<CatchupConsumer> it = this.catchupConsumers.values().iterator();
        while (it.hasNext()) {
            it.next().newAssignment(collection);
        }
    }

    private CatchupConsumer createCatchupConsumer(ProcessingQueue processingQueue, PushOffsetRange pushOffsetRange, String str) {
        KafkaConsumer<Object, GenericRow> kafkaConsumer = null;
        try {
            kafkaConsumer = this.kafkaConsumerFactory.create(this.ksqlTopic, this.logicalSchema, this.serviceContext, this.consumerProperties, this.ksqlConfig, str);
            CatchupConsumer.CatchupConsumerFactory catchupConsumerFactory = this.catchupConsumerFactory;
            String kafkaTopicName = this.ksqlTopic.getKafkaTopicName();
            boolean isWindowed = isWindowed();
            LogicalSchema logicalSchema = this.logicalSchema;
            AtomicReference<LatestConsumer> atomicReference = this.latestConsumer;
            atomicReference.getClass();
            return catchupConsumerFactory.create(kafkaTopicName, isWindowed, logicalSchema, kafkaConsumer, atomicReference::get, this.catchupCoordinator, pushOffsetRange, Clock.systemUTC(), this.ksqlConfig.getLong("ksql.query.push.v2.catchup.consumer.msg.window").longValue(), this::unregisterCatchup);
        } catch (Exception e) {
            LOG.error("Couldn't create catchup consumer", e);
            processingQueue.onError();
            if (kafkaConsumer != null) {
                kafkaConsumer.close();
            }
            throw e;
        }
    }

    private synchronized void unregisterCatchup(ProcessingQueue processingQueue) {
        this.catchupConsumers.computeIfPresent(processingQueue.getQueryId(), (queryId, catchupConsumer) -> {
            catchupConsumer.unregister(processingQueue);
            catchupConsumer.closeAsync();
            return null;
        });
    }

    private synchronized void startCatchup(ProcessingQueue processingQueue, CatchupMetadata catchupMetadata) {
        CatchupConsumer createCatchupConsumer = createCatchupConsumer(processingQueue, catchupMetadata.getPushOffsetRange(), catchupMetadata.getCatchupConsumerGroup());
        createCatchupConsumer.register(processingQueue);
        this.catchupConsumers.put(processingQueue.getQueryId(), createCatchupConsumer);
        this.executorServiceCatchup.submit(() -> {
            runCatchup(createCatchupConsumer, processingQueue);
        });
    }

    private void runCatchup(CatchupConsumer catchupConsumer, ProcessingQueue processingQueue) {
        try {
            Throwable th = null;
            try {
                try {
                    try {
                        catchupConsumer.run();
                        if (catchupConsumer != null) {
                            if (0 != 0) {
                                try {
                                    catchupConsumer.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                catchupConsumer.close();
                            }
                        }
                        catchupConsumer.unregister(processingQueue);
                        this.catchupConsumers.remove(processingQueue.getQueryId());
                        stopLatestConsumerOnLastRequest();
                    } catch (Throwable th3) {
                        th = th3;
                        throw th3;
                    }
                } catch (Throwable th4) {
                    if (catchupConsumer != null) {
                        if (th != null) {
                            try {
                                catchupConsumer.close();
                            } catch (Throwable th5) {
                                th.addSuppressed(th5);
                            }
                        } else {
                            catchupConsumer.close();
                        }
                    }
                    throw th4;
                }
            } catch (Throwable th6) {
                LOG.error("Got error while running catchup", th6);
                catchupConsumer.onError();
                catchupConsumer.unregister(processingQueue);
                this.catchupConsumers.remove(processingQueue.getQueryId());
                stopLatestConsumerOnLastRequest();
            }
        } catch (Throwable th7) {
            catchupConsumer.unregister(processingQueue);
            this.catchupConsumers.remove(processingQueue.getQueryId());
            stopLatestConsumerOnLastRequest();
            throw th7;
        }
    }

    public void cleanupCatchupConsumer(String str) {
        this.executorServiceCatchup.schedule(() -> {
            deleteConsumerGroup(str);
        }, 5000L, TimeUnit.MILLISECONDS);
    }

    public String getCatchupConsumerId(String str) {
        return this.sourceApplicationId + CATCHUP_CONSUMER_GROUP_MIDDLE + str;
    }

    private void deleteConsumerGroup(String str) {
        LOG.info("Cleaning up consumer group " + str);
        try {
            Map listConsumerGroupOffsets = this.serviceContext.getConsumerGroupClient().listConsumerGroupOffsets(str);
            if (listConsumerGroupOffsets == null || listConsumerGroupOffsets.values().stream().allMatch((v0) -> {
                return Objects.isNull(v0);
            })) {
                return;
            }
            this.serviceContext.getConsumerGroupClient().deleteConsumerGroups(ImmutableSet.of(str));
        } catch (Throwable th) {
            LOG.error("Failed to delete consumer group " + str, th);
        }
    }
}
