/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.catalog.ingestion;

import io.cloudevents.CloudEvent;
import io.confluent.catalog.DataCatalogConfig;
import io.confluent.catalog.ingestion.event.CloudEventSerde;
import io.confluent.catalog.ingestion.event.EventProcessor;
import io.confluent.catalog.metrics.MetricsManager;
import io.confluent.catalog.storage.MetadataRegistry;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.inject.Inject;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Consumes CloudEvents from the configured ingestor topics with a Kafka Streams
 * application and hands every record to the registered {@link EventProcessor}s.
 *
 * <p>When the catalog ingestor is disabled in the configuration, no topics are
 * created and no streams application is started; {@link #getStreams()} then
 * returns {@code null} and {@link #close()} is a no-op.
 */
public class EntityIngestor
implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(EntityIngestor.class);
    /** Replication factor requested for ingestor topics when enough brokers are alive. */
    private static final int DESIRED_REPLICATION_FACTOR = 3;
    /** Partition count used when an ingestor topic has to be created. */
    private static final int DESIRED_PARTITION_COUNT = 6;
    private final KafkaSchemaRegistry schemaRegistry;
    private final MetadataRegistry metadataRegistry;
    private final MetricsManager metricsManager;
    private final List<String> topics;
    private final List<EventProcessor> processors;
    private int timeout;
    private KafkaStreams streams;

    /**
     * Builds the ingestor and, if enabled by configuration, creates any missing
     * ingestor topics and starts the Kafka Streams application.
     *
     * @param schemaRegistry   schema registry; must be a {@link KafkaSchemaRegistry}
     * @param metadataRegistry registry passed on to the event processor
     * @param metricsManager   metrics manager passed on to the event processor
     * @throws IllegalArgumentException if any step of the initialization fails; the
     *                                  original failure is attached as the cause
     */
    @Inject
    public EntityIngestor(SchemaRegistry schemaRegistry, MetadataRegistry metadataRegistry, MetricsManager metricsManager) {
        try {
            this.schemaRegistry = (KafkaSchemaRegistry)schemaRegistry;
            this.metadataRegistry = metadataRegistry;
            this.metricsManager = metricsManager;
            this.processors = Collections.singletonList(new EventProcessor(metadataRegistry, metricsManager));
            SchemaRegistryConfig config = this.schemaRegistry.config();
            DataCatalogConfig dataCatalogConfig = new DataCatalogConfig(config.originalProperties());
            this.topics = dataCatalogConfig.getList("catalog.ingestor.topics");
            if (!dataCatalogConfig.isCatalogIngestorEnabled()) {
                // Ingestion is switched off: leave 'streams' null so close() does nothing.
                return;
            }
            this.createTopic();
            Properties streamsConfig = new Properties();
            EntityIngestor.addSchemaRegistryConfigsToClientProperties(config, streamsConfig);
            String applicationId = config.getString("schema.registry.group.id") + "-ingest";
            streamsConfig.put("application.id", applicationId);
            streamsConfig.put("bootstrap.servers", config.bootstrapBrokers());
            streamsConfig.put("default.key.serde", Serdes.StringSerde.class.getName());
            streamsConfig.put("default.value.serde", CloudEventSerde.class.getName());
            streamsConfig.put("num.stream.threads", 2);
            streamsConfig.put("commit.interval.ms", 100);
            // Records that cannot be deserialized are logged and skipped instead of stopping the stream.
            streamsConfig.put("default.deserialization.exception.handler", LogAndContinueExceptionHandler.class);
            streamsConfig.put("max.poll.interval.ms", dataCatalogConfig.getInt("catalog.ingestor.max.poll.interval.ms"));
            // Must be set before createStream(): it is used to bound cleanup if start-up fails.
            this.timeout = dataCatalogConfig.getInt("catalog.ingestor.shutdown.timeout.ms");
            this.streams = this.createStream(streamsConfig);
        }
        catch (Exception e) {
            throw new IllegalArgumentException("Could not instantiate MetadataIngestor", e);
        }
    }

    /**
     * Copies every schema registry setting that starts with {@code kafkastore.}
     * into {@code props}, using the keys as returned by
     * {@code originalsWithPrefix} (i.e. with the prefix removed).
     *
     * @param config schema registry configuration to read from
     * @param props  client properties to add the settings to
     */
    public static void addSchemaRegistryConfigsToClientProperties(SchemaRegistryConfig config, Properties props) {
        props.putAll(config.originalsWithPrefix("kafkastore."));
    }

    /**
     * Makes sure every configured ingestor topic exists, creating the missing ones.
     *
     * @throws StoreInitializationException if the admin calls time out, fail, or the
     *                                      calling thread is interrupted while waiting
     */
    private void createTopic() throws StoreInitializationException {
        SchemaRegistryConfig config = this.schemaRegistry.config();
        Properties props = new Properties();
        EntityIngestor.addSchemaRegistryConfigsToClientProperties(config, props);
        props.put("bootstrap.servers", config.bootstrapBrokers());
        int initTimeout = config.getInt("kafkastore.init.timeout.ms");
        try (AdminClient admin = AdminClient.create(props)) {
            Set<String> existingTopics = admin.listTopics().names().get(initTimeout, TimeUnit.MILLISECONDS);
            for (String topic : this.topics) {
                if (!existingTopics.contains(topic)) {
                    this.createTopic(admin, topic, initTimeout);
                }
            }
        }
        catch (TimeoutException e) {
            throw new StoreInitializationException("Timed out trying to create or validate ingestor topic configuration", e);
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new StoreInitializationException("Failed trying to create or validate ingestor topic configuration", e);
        }
        catch (ExecutionException e) {
            throw new StoreInitializationException("Failed trying to create or validate ingestor topic configuration", e);
        }
    }

    /**
     * Creates a single ingestor topic. The replication factor is capped by the
     * number of live brokers; topic-level settings are taken from the
     * {@code kafkastore.topic.config.} section of the schema registry configuration.
     *
     * @param admin       admin client used for the cluster calls
     * @param topic       name of the topic to create
     * @param initTimeout maximum time in milliseconds to wait for each admin call
     * @throws StoreInitializationException if the cluster reports no live brokers
     * @throws InterruptedException         if interrupted while waiting for an admin call
     * @throws ExecutionException           if an admin call fails for any reason other than
     *                                      the topic already existing
     * @throws TimeoutException             if an admin call does not finish within {@code initTimeout}
     */
    private void createTopic(AdminClient admin, String topic, int initTimeout) throws StoreInitializationException, InterruptedException, ExecutionException, TimeoutException {
        log.info("Creating ingestor topic {}", topic);
        Collection<?> liveBrokers = admin.describeCluster().nodes().get(initTimeout, TimeUnit.MILLISECONDS);
        int numLiveBrokers = liveBrokers.size();
        if (numLiveBrokers <= 0) {
            throw new StoreInitializationException("No live Kafka brokers");
        }
        int topicReplicationFactor = Math.min(numLiveBrokers, DESIRED_REPLICATION_FACTOR);
        if (topicReplicationFactor < DESIRED_REPLICATION_FACTOR) {
            log.warn("Creating the ingestor topic {} using a replication factor of {}, which is less than the desired one of {}. If this is a production environment, it's crucial to add more brokers and increase the replication factor of the topic.", topic, topicReplicationFactor, DESIRED_REPLICATION_FACTOR);
        }
        SchemaRegistryConfig config = this.schemaRegistry.config();
        NewTopic topicRequest = new NewTopic(topic, DESIRED_PARTITION_COUNT, (short)topicReplicationFactor);
        // NewTopic expects String values, while the original config values are plain Objects;
        // convert explicitly rather than smuggling them through a raw map.
        Map<String, String> topicConfigs = new HashMap<>();
        for (Map.Entry<String, Object> entry : config.originalsWithPrefix("kafkastore.topic.config.").entrySet()) {
            Object value = entry.getValue();
            topicConfigs.put(entry.getKey(), value == null ? null : value.toString());
        }
        topicRequest.configs(topicConfigs);
        try {
            admin.createTopics(Collections.singleton(topicRequest)).all().get(initTimeout, TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException e) {
            // The topic may have been created concurrently after the existence check; that is not an error.
            if (!(e.getCause() instanceof TopicExistsException)) {
                throw e;
            }
        }
    }

    /**
     * Builds the topology (all ingestor topics feeding one {@link ProcessInput}
     * processor), then creates and starts the Kafka Streams application.
     *
     * @param streamsConfig Kafka Streams configuration
     * @return the started streams instance
     */
    private KafkaStreams createStream(Properties streamsConfig) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, CloudEvent> input = builder.stream(this.topics);
        input.process(() -> new ProcessInput());
        Topology topology = builder.build();
        log.info("Topology description {}", topology.describe());
        KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsConfig);
        try {
            kafkaStreams.start();
        }
        catch (RuntimeException e) {
            // The caller never receives the instance when start() fails, so release it here.
            try {
                kafkaStreams.close(Duration.ofMillis(this.timeout));
            }
            catch (RuntimeException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        return kafkaStreams;
    }

    /**
     * Returns the running streams application.
     *
     * @return the streams instance, or {@code null} if ingestion is disabled or
     *         this ingestor has been closed
     */
    public KafkaStreams getStreams() {
        return this.streams;
    }

    /**
     * Stops the streams application, waiting at most the configured shutdown
     * timeout. Calling this more than once, or when ingestion is disabled, does nothing.
     */
    @Override
    public void close() {
        if (this.streams != null) {
            this.streams.close(Duration.ofMillis(this.timeout));
            this.streams = null;
        }
    }

    /**
     * Terminal processor that passes each incoming record to every registered
     * {@link EventProcessor}.
     */
    private final class ProcessInput
    implements Processor<String, CloudEvent, Void, Void> {
        @Override
        public void init(ProcessorContext<Void, Void> context) {
            // No per-task state to set up.
        }

        @Override
        public void process(Record<String, CloudEvent> record) {
            for (EventProcessor processor : EntityIngestor.this.processors) {
                try {
                    processor.process(record);
                }
                catch (Throwable e) {
                    // Best effort: a failure in one processor is logged and must not
                    // prevent the remaining processors from seeing the record.
                    log.error("Could not process record", e);
                }
            }
        }

        @Override
        public void close() {
            // Nothing to release.
        }
    }
}

