package io.confluent.ksql.rest.server;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import io.confluent.ksql.rest.client.KsqlClient;
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
import io.confluent.ksql.rest.server.services.InternalKsqlClientFactory;
import io.confluent.ksql.rest.server.services.RestServiceContextFactory;
import io.confluent.ksql.rest.server.state.ServerState;
import io.confluent.ksql.schema.registry.KsqlSchemaRegistryClientFactory;
import io.confluent.ksql.services.DefaultConnectClientFactory;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.services.KafkaTopicClientImpl;
import io.confluent.ksql.services.LazyServiceContext;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.RetryUtil;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.net.SocketAddress;
import java.io.Closeable;
import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/rest/server/PreconditionChecker.class */
public class PreconditionChecker implements Executable {
    private static final Logger LOG = LoggerFactory.getLogger(PreconditionChecker.class);
    final KsqlRestConfig restConfig;
    final Supplier<Map<String, String>> propertiesLoader;
    final Supplier<Clients> clientsSupplier;
    final Vertx vertx;
    final List<KsqlServerPrecondition> preconditions;
    final PreconditionServer server;
    final ServerState serverState;
    private final CompletableFuture<Void> terminatedFuture;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/confluent/ksql/rest/server/PreconditionChecker$Clients.class */
    public static final class Clients implements Closeable {
        private final ServiceContext serviceContext;
        private final Vertx vertx;
        private final Admin admin;
        private final KafkaTopicClient topicClient;

        Clients(ServiceContext serviceContext, Vertx vertx, Admin admin, KafkaTopicClient kafkaTopicClient) {
            this.serviceContext = serviceContext;
            this.vertx = vertx;
            this.admin = admin;
            this.topicClient = kafkaTopicClient;
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            this.serviceContext.close();
            this.vertx.close();
            this.admin.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:io/confluent/ksql/rest/server/PreconditionChecker$KsqlFailedPrecondition.class */
    public static class KsqlFailedPrecondition extends RuntimeException {
        KsqlFailedPrecondition(String str) {
            super(str);
        }
    }

    public PreconditionChecker(Supplier<Map<String, String>> supplier, ServerState serverState) {
        this.terminatedFuture = new CompletableFuture<>();
        this.propertiesLoader = (Supplier) Objects.requireNonNull(supplier, "propertiesLoader");
        this.restConfig = new KsqlRestConfig(supplier.get());
        this.serverState = (ServerState) Objects.requireNonNull(serverState, "serverState");
        this.clientsSupplier = () -> {
            return buildClients(this.propertiesLoader);
        };
        this.preconditions = this.restConfig.getConfiguredInstances(KsqlRestConfig.KSQL_SERVER_PRECONDITIONS, KsqlServerPrecondition.class);
        this.vertx = Vertx.vertx(new VertxOptions().setMaxWorkerExecuteTimeUnit(TimeUnit.MILLISECONDS).setMaxWorkerExecuteTime(Long.MAX_VALUE));
        this.server = new PreconditionServer(this.vertx, this.restConfig, serverState);
    }

    @VisibleForTesting
    PreconditionChecker(Supplier<Map<String, String>> supplier, KsqlRestConfig ksqlRestConfig, Supplier<Clients> supplier2, Vertx vertx, List<KsqlServerPrecondition> list, PreconditionServer preconditionServer, ServerState serverState) {
        this.terminatedFuture = new CompletableFuture<>();
        this.propertiesLoader = (Supplier) Objects.requireNonNull(supplier, "propertiesLoader");
        this.clientsSupplier = (Supplier) Objects.requireNonNull(supplier2, "clientsSupplier");
        this.restConfig = (KsqlRestConfig) Objects.requireNonNull(ksqlRestConfig, "restConfig");
        this.vertx = (Vertx) Objects.requireNonNull(vertx, "vertx");
        this.preconditions = (List) Objects.requireNonNull(list, "preconditions");
        this.server = (PreconditionServer) Objects.requireNonNull(preconditionServer, "server");
        this.serverState = (ServerState) Objects.requireNonNull(serverState, "state");
    }

    private boolean shouldCheckPreconditions() {
        Clients clients = this.clientsSupplier.get();
        Throwable th = null;
        try {
            boolean anyMatch = this.preconditions.stream().map(ksqlServerPrecondition -> {
                return ksqlServerPrecondition.checkPrecondition(this.propertiesLoader.get(), clients.serviceContext, clients.topicClient);
            }).peek(optional -> {
                optional.ifPresent(ksqlErrorMessage -> {
                    LOG.info("Precondition failed: {}", ksqlErrorMessage);
                });
            }).anyMatch((v0) -> {
                return v0.isPresent();
            });
            if (clients != null) {
                if (0 != 0) {
                    try {
                        clients.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    clients.close();
                }
            }
            return anyMatch;
        } catch (Throwable th3) {
            if (clients != null) {
                if (0 != 0) {
                    try {
                        clients.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    clients.close();
                }
            }
            throw th3;
        }
    }

    @Override // io.confluent.ksql.rest.server.Executable
    public void startAsync() {
        if (!shouldCheckPreconditions()) {
            LOG.info("All preconditions passed, skipping precondition server start");
        } else {
            LOG.info("Some preconditions not passed, starting precondition server");
            this.server.start();
        }
    }

    @Override // io.confluent.ksql.rest.server.Executable
    public void notifyTerminated() {
        this.terminatedFuture.complete(null);
    }

    @Override // io.confluent.ksql.rest.server.Executable
    public void shutdown() {
        if (this.server.started()) {
            this.server.stop();
        }
        this.vertx.close();
    }

    @Override // io.confluent.ksql.rest.server.Executable
    public void awaitTerminated() {
        ImmutableList of = ImmutableList.of(exc -> {
            return !(exc instanceof KsqlFailedPrecondition);
        });
        int intExact = Math.toIntExact(this.restConfig.getLong(KsqlRestConfig.KSQL_PRECONDITION_CHECKER_BACK_OFF_TIME_MS).longValue());
        Runnable runnable = this::checkPreconditions;
        CompletableFuture<Void> completableFuture = this.terminatedFuture;
        completableFuture.getClass();
        RetryUtil.retryWithBackoff(Integer.MAX_VALUE, 1000, intExact, runnable, completableFuture::isDone, of);
    }

    public List<URI> getListeners() {
        return this.server.getListeners();
    }

    private void checkPreconditions() {
        LOG.info("Checking preconditions...");
        for (KsqlServerPrecondition ksqlServerPrecondition : this.preconditions) {
            Clients clients = this.clientsSupplier.get();
            Throwable th = null;
            try {
                try {
                    Optional<KsqlErrorMessage> checkPrecondition = ksqlServerPrecondition.checkPrecondition(this.propertiesLoader.get(), clients.serviceContext, clients.topicClient);
                    if (clients != null) {
                        if (0 != 0) {
                            try {
                                clients.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            clients.close();
                        }
                    }
                    if (checkPrecondition.isPresent()) {
                        LOG.info("Precondition failed: {}", checkPrecondition.get());
                        this.serverState.setInitializingReason(checkPrecondition.get());
                        throw new KsqlFailedPrecondition(checkPrecondition.get().toString());
                    }
                } finally {
                }
            } catch (Throwable th3) {
                if (clients != null) {
                    if (th != null) {
                        try {
                            clients.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        clients.close();
                    }
                }
                throw th3;
            }
        }
    }

    private static Admin createCommandTopicAdminClient(KsqlRestConfig ksqlRestConfig, KsqlConfig ksqlConfig) {
        HashMap hashMap = new HashMap(ksqlConfig.getKsqlAdminClientConfigProps());
        hashMap.putAll(ksqlRestConfig.getCommandProducerProperties());
        return new DefaultKafkaClientSupplier().getAdmin(hashMap);
    }

    private static Clients buildClients(Supplier<Map<String, String>> supplier) {
        Map<String, String> map = supplier.get();
        Vertx vertx = Vertx.vertx(new VertxOptions().setMaxWorkerExecuteTimeUnit(TimeUnit.MILLISECONDS).setMaxWorkerExecuteTime(Long.MAX_VALUE));
        KsqlConfig ksqlConfig = new KsqlConfig(map);
        KsqlClient createInternalClient = InternalKsqlClientFactory.createInternalClient(ksqlConfig.originalsStrings(), (v0, v1) -> {
            return SocketAddress.inetSocketAddress(v0, v1);
        }, vertx);
        KsqlSchemaRegistryClientFactory ksqlSchemaRegistryClientFactory = new KsqlSchemaRegistryClientFactory(ksqlConfig, Collections.emptyMap());
        ksqlSchemaRegistryClientFactory.getClass();
        Supplier supplier2 = ksqlSchemaRegistryClientFactory::get;
        DefaultConnectClientFactory defaultConnectClientFactory = new DefaultConnectClientFactory(ksqlConfig);
        LazyServiceContext lazyServiceContext = new LazyServiceContext(() -> {
            return RestServiceContextFactory.create(ksqlConfig, Optional.empty(), supplier2, defaultConnectClientFactory, createInternalClient, Collections.emptyList(), Optional.empty());
        });
        Admin createCommandTopicAdminClient = createCommandTopicAdminClient(new KsqlRestConfig(map), new KsqlConfig(map));
        return new Clients(lazyServiceContext, vertx, createCommandTopicAdminClient, new KafkaTopicClientImpl(() -> {
            return createCommandTopicAdminClient;
        }));
    }
}
