package io.confluent.catalog.ingestion;

import io.cloudevents.CloudEvent;
import io.confluent.catalog.DataCatalogConfig;
import io.confluent.catalog.ingestion.event.CloudEventSerde;
import io.confluent.catalog.ingestion.event.EventProcessor;
import io.confluent.catalog.metrics.MetricsManager;
import io.confluent.catalog.storage.MetadataRegistry;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.inject.Inject;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/catalog/ingestion/EntityIngestor.class */
public class EntityIngestor implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(EntityIngestor.class);
    private final KafkaSchemaRegistry schemaRegistry;
    private final MetadataRegistry metadataRegistry;
    private final MetricsManager metricsManager;
    private final List<String> topics;
    private final List<EventProcessor> processors;
    private int timeout;
    private KafkaStreams streams;

    /* loaded from: input_file:io/confluent/catalog/ingestion/EntityIngestor$ProcessInput.class */
    private final class ProcessInput implements Processor<String, CloudEvent, Void, Void> {
        public ProcessInput() {
        }

        public void init(ProcessorContext processorContext) {
        }

        public void process(Record<String, CloudEvent> record) {
            Iterator it = EntityIngestor.this.processors.iterator();
            while (it.hasNext()) {
                try {
                    ((EventProcessor) it.next()).process(record);
                } catch (Throwable th) {
                    EntityIngestor.log.error("Could not process record", th);
                }
            }
        }

        public void close() {
        }
    }

    @Inject
    public EntityIngestor(SchemaRegistry schemaRegistry, MetadataRegistry metadataRegistry, MetricsManager metricsManager) {
        try {
            this.schemaRegistry = (KafkaSchemaRegistry) schemaRegistry;
            this.metadataRegistry = metadataRegistry;
            this.metricsManager = metricsManager;
            this.processors = Collections.singletonList(new EventProcessor(metadataRegistry, metricsManager));
            SchemaRegistryConfig config = this.schemaRegistry.config();
            DataCatalogConfig dataCatalogConfig = new DataCatalogConfig(config.originalProperties());
            this.topics = dataCatalogConfig.getList(DataCatalogConfig.CATALOG_INGESTOR_TOPICS_CONFIG);
            if (dataCatalogConfig.isCatalogIngestorEnabled()) {
                createTopic();
                Properties properties = new Properties();
                addSchemaRegistryConfigsToClientProperties(config, properties);
                properties.put("application.id", config.getString("schema.registry.group.id") + "-ingest");
                properties.put("bootstrap.servers", config.bootstrapBrokers());
                properties.put("default.key.serde", Serdes.StringSerde.class.getName());
                properties.put("default.value.serde", CloudEventSerde.class.getName());
                properties.put("num.stream.threads", 2);
                properties.put("commit.interval.ms", 100);
                properties.put("default.deserialization.exception.handler", LogAndContinueExceptionHandler.class);
                properties.put("max.poll.interval.ms", dataCatalogConfig.getInt(DataCatalogConfig.CATALOG_INGESTOR_MAX_POLL_INTERVAL_MS));
                this.timeout = dataCatalogConfig.getInt(DataCatalogConfig.CATALOG_INGESTOR_SHUTDOWN_TIMEOUT_MS).intValue();
                this.streams = createStream(properties);
            }
        } catch (Exception e) {
            throw new IllegalArgumentException("Could not instantiate MetadataIngestor", e);
        }
    }

    public static void addSchemaRegistryConfigsToClientProperties(SchemaRegistryConfig schemaRegistryConfig, Properties properties) {
        properties.putAll(schemaRegistryConfig.originalsWithPrefix("kafkastore."));
    }

    /* JADX WARN: Finally extract failed */
    private void createTopic() throws StoreInitializationException {
        SchemaRegistryConfig config = this.schemaRegistry.config();
        Properties properties = new Properties();
        addSchemaRegistryConfigsToClientProperties(config, properties);
        properties.put("bootstrap.servers", config.bootstrapBrokers());
        int intValue = config.getInt("kafkastore.init.timeout.ms").intValue();
        try {
            AdminClient create = AdminClient.create(properties);
            Throwable th = null;
            try {
                Set set = (Set) create.listTopics().names().get(intValue, TimeUnit.MILLISECONDS);
                for (String str : this.topics) {
                    if (!set.contains(str)) {
                        createTopic(create, str, intValue);
                    }
                }
                if (create != null) {
                    if (0 != 0) {
                        try {
                            create.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        create.close();
                    }
                }
            } catch (Throwable th3) {
                if (create != null) {
                    if (0 != 0) {
                        try {
                            create.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        create.close();
                    }
                }
                throw th3;
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new StoreInitializationException("Failed trying to create or validate ingestor topic configuration", e);
        } catch (TimeoutException e2) {
            throw new StoreInitializationException("Timed out trying to create or validate ingestor topic configuration", e2);
        }
    }

    private void createTopic(AdminClient adminClient, String str, int i) throws StoreInitializationException, InterruptedException, ExecutionException, TimeoutException {
        log.info("Creating ingestor topic {}", str);
        int size = ((Collection) adminClient.describeCluster().nodes().get(i, TimeUnit.MILLISECONDS)).size();
        if (size <= 0) {
            throw new StoreInitializationException("No live Kafka brokers");
        }
        int min = Math.min(size, 3);
        if (min < 3) {
            log.warn("Creating the ingestor topic " + str + " using a replication factor of " + min + ", which is less than the desired one of 3. If this is a production environment, it's crucial to add more brokers and increase the replication factor of the topic.");
        }
        SchemaRegistryConfig config = this.schemaRegistry.config();
        NewTopic newTopic = new NewTopic(str, 6, (short) min);
        newTopic.configs(new HashMap(config.originalsWithPrefix("kafkastore.topic.config.")));
        try {
            adminClient.createTopics(Collections.singleton(newTopic)).all().get(i, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            if (!(e.getCause() instanceof TopicExistsException)) {
                throw e;
            }
        }
    }

    private KafkaStreams createStream(Properties properties) {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream(this.topics).process(() -> {
            return new ProcessInput();
        }, new String[0]);
        Topology build = streamsBuilder.build();
        log.info("Topology description {}", build.describe());
        KafkaStreams kafkaStreams = new KafkaStreams(build, properties);
        kafkaStreams.start();
        return kafkaStreams;
    }

    public KafkaStreams getStreams() {
        return this.streams;
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        if (this.streams != null) {
            this.streams.close(Duration.ofMillis(this.timeout));
            this.streams = null;
        }
    }
}
