package io.confluent.catalog.notification;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.catalog.DataCatalogConfig;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
import io.confluent.kafka.schemaregistry.storage.KafkaStore;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.inject.Inject;
import org.apache.atlas.AtlasException;
import org.apache.atlas.notification.AbstractNotification;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.service.Service;
import org.apache.commons.compress.utils.Lists;
import org.apache.commons.configuration.Configuration;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

@Component
@Order(4)
@NotificationSender
/* loaded from: input_file:io/confluent/catalog/notification/MultiTenantKafkaNotification.class */
public class MultiTenantKafkaNotification extends AbstractNotification implements Service {
    public static final Logger LOG = LoggerFactory.getLogger(MultiTenantKafkaNotification.class);
    private Properties properties;
    private final SchemaRegistry schemaRegistry;
    private String psrcId;
    private final Map<NotificationInterface.NotificationType, KafkaProducer> producers;
    private final Map<NotificationInterface.NotificationType, String> topicMap;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/catalog/notification/MultiTenantKafkaNotification$MessageContext.class */
    public static class MessageContext {
        private final Future<RecordMetadata> future;
        private final String message;

        public MessageContext(Future<RecordMetadata> future, String str) {
            this.future = future;
            this.message = str;
        }

        public Future<RecordMetadata> getFuture() {
            return this.future;
        }

        public String getMessage() {
            return this.message;
        }
    }

    @Inject
    public MultiTenantKafkaNotification(Configuration configuration, SchemaRegistry schemaRegistry) throws AtlasException {
        super(configuration);
        this.psrcId = "";
        this.producers = new ConcurrentHashMap();
        this.topicMap = new HashMap();
        LOG.info("==> MultiTenantKafkaNotification()");
        this.schemaRegistry = schemaRegistry;
        SchemaRegistryConfig config = schemaRegistry.config();
        try {
            this.psrcId = config.getString("psrc.id");
        } catch (ConfigException e) {
            LOG.warn("config error with psrc.id");
        }
        try {
            this.properties = new Properties();
            KafkaStore.addSchemaRegistryConfigsToClientProperties(config, this.properties);
            this.properties.put("bootstrap.servers", config.bootstrapBrokers());
            this.properties.put("acks", "all");
            this.properties.put("retries", 3);
            this.properties.put("enable.idempotence", true);
            this.properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            this.properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            DataCatalogConfig dataCatalogConfig = new DataCatalogConfig(config.originalProperties());
            this.topicMap.put(NotificationInterface.NotificationType.ENTITIES, dataCatalogConfig.getCatalogNotificationsTopic());
            this.topicMap.put(NotificationInterface.NotificationType.HOOK, dataCatalogConfig.getCatalogNotificationsHookTopic());
            createTopicIfNotFound();
            LOG.info("<== KafkaNotification()");
        } catch (Exception e2) {
            throw new IllegalArgumentException("Could not instantiate MultiTenantKafkaNotification", e2);
        }
    }

    public void start() throws AtlasException {
        LOG.info("==> KafkaNotification.start()");
        LOG.info("<== KafkaNotification.start()");
    }

    public void stop() {
        LOG.info("==> KafkaNotification.stop()");
        LOG.info("<== KafkaNotification.stop()");
    }

    public boolean isReady(NotificationInterface.NotificationType notificationType) {
        try {
            getOrCreateProducer(notificationType);
            return true;
        } catch (Exception e) {
            LOG.error("Error: Connecting... {}", e.getMessage());
            return false;
        }
    }

    public void close() {
        LOG.info("==> KafkaNotification.close()");
        for (KafkaProducer kafkaProducer : this.producers.values()) {
            if (kafkaProducer != null) {
                try {
                    kafkaProducer.close();
                } catch (Throwable th) {
                    LOG.error("failed to close Kafka producer. Ignoring", th);
                }
            }
        }
        this.producers.clear();
        LOG.info("<== KafkaNotification.close()");
    }

    public <T> void send(NotificationInterface.NotificationType notificationType, List<T> list) throws NotificationException {
        KafkaProducer orCreateProducer = getOrCreateProducer(notificationType);
        String str = this.topicMap.get(notificationType);
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < list.size(); i++) {
            T t = list.get(i);
            if (t instanceof NotificationContainer) {
                NotificationContainer notificationContainer = (NotificationContainer) t;
                ArrayList arrayList2 = new ArrayList();
                try {
                    createNotificationMessages(notificationContainer.getNotification(), arrayList2);
                    arrayList.addAll((Collection) arrayList2.stream().map(str2 -> {
                        ProducerRecord producerRecord = new ProducerRecord(str, notificationContainer.getTenant(), str2);
                        producerRecord.headers().add("tenant", notificationContainer.getTenant().getBytes(StandardCharsets.UTF_8));
                        producerRecord.headers().add("psrc", this.psrcId.getBytes(StandardCharsets.UTF_8));
                        return producerRecord;
                    }).collect(Collectors.toList()));
                } catch (Exception e) {
                    LOG.error("Error occurred while creating notification messages", e);
                    throw new RuntimeException("Error occurred while creating notification messages", e);
                }
            } else {
                try {
                    createNotificationMessages(null, new ArrayList());
                    arrayList.addAll((Collection) list.stream().map(obj -> {
                        return new ProducerRecord(str, obj);
                    }).collect(Collectors.toList()));
                } catch (Exception e2) {
                    LOG.error("Error occurred while creating notification messages", e2);
                    throw new RuntimeException("Error occurred while creating notification messages", e2);
                }
            }
        }
        sendInternalToProducer(orCreateProducer, arrayList);
    }

    public void sendInternal(NotificationInterface.NotificationType notificationType, List<String> list) throws NotificationException {
        KafkaProducer orCreateProducer = getOrCreateProducer(notificationType);
        String str = this.topicMap.get(notificationType);
        sendInternalToProducer(orCreateProducer, (List) list.stream().map(str2 -> {
            return new ProducerRecord(str, str2);
        }).collect(Collectors.toList()));
    }

    public <T> void sendInternalToProducer(Producer producer, List<ProducerRecord> list) throws NotificationException {
        ArrayList<MessageContext> arrayList = new ArrayList();
        for (ProducerRecord producerRecord : list) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Sending message for topic {}: {}", producerRecord.topic(), producerRecord.value());
            }
            arrayList.add(new MessageContext(producer.send(producerRecord), (String) producerRecord.value()));
        }
        ArrayList arrayList2 = new ArrayList();
        Exception exc = null;
        for (MessageContext messageContext : arrayList) {
            try {
                RecordMetadata recordMetadata = messageContext.getFuture().get();
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Sent message for topic - {}, partition - {}, offset - {}", new Object[]{recordMetadata.topic(), Integer.valueOf(recordMetadata.partition()), Long.valueOf(recordMetadata.offset())});
                }
            } catch (Exception e) {
                exc = e;
                arrayList2.add(messageContext.getMessage());
            }
        }
        if (exc != null) {
            throw new NotificationException(exc, arrayList2);
        }
    }

    @VisibleForTesting
    KafkaProducer getOrCreateProducer(NotificationInterface.NotificationType notificationType) {
        LOG.debug("==> KafkaNotification.getOrCreateProducer()");
        KafkaProducer computeIfAbsent = this.producers.computeIfAbsent(notificationType, notificationType2 -> {
            return new KafkaProducer(this.properties);
        });
        LOG.debug("<== KafkaNotification.getOrCreateProducer()");
        return computeIfAbsent;
    }

    public <T> List<NotificationConsumer<T>> createConsumers(NotificationInterface.NotificationType notificationType, int i) {
        return Lists.newArrayList();
    }

    private void createTopicIfNotFound() throws StoreInitializationException {
        SchemaRegistryConfig config = this.schemaRegistry.config();
        Properties properties = new Properties();
        KafkaStore.addSchemaRegistryConfigsToClientProperties(config, properties);
        properties.put("bootstrap.servers", config.bootstrapBrokers());
        int intValue = config.getInt("kafkastore.init.timeout.ms").intValue();
        try {
            AdminClient create = AdminClient.create(properties);
            Throwable th = null;
            try {
                try {
                    Set set = (Set) create.listTopics().names().get(intValue, TimeUnit.MILLISECONDS);
                    for (String str : this.topicMap.values()) {
                        if (!set.contains(str)) {
                            createTopic(create, str, intValue);
                        }
                    }
                    if (create != null) {
                        if (0 != 0) {
                            try {
                                create.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            create.close();
                        }
                    }
                } finally {
                }
            } catch (Throwable th3) {
                if (create != null) {
                    if (th != null) {
                        try {
                            create.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        create.close();
                    }
                }
                throw th3;
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new StoreInitializationException("Failed trying to create or validate notification topic configuration", e);
        } catch (TimeoutException e2) {
            throw new StoreInitializationException("Timed out trying to create or validate notification topic configuration", e2);
        }
    }

    private void createTopic(AdminClient adminClient, String str, int i) throws StoreInitializationException, InterruptedException, ExecutionException, TimeoutException {
        LOG.info("Creating notification topic {}", str);
        int size = ((Collection) adminClient.describeCluster().nodes().get(i, TimeUnit.MILLISECONDS)).size();
        if (size <= 0) {
            throw new StoreInitializationException("No live Kafka brokers");
        }
        int min = Math.min(size, 3);
        if (min < 3) {
            LOG.warn("Creating the notification topic " + str + " using a replication factor of " + min + ", which is less than the desired one of 3. If this is a production environment, it's crucial to add more brokers and increase the replication factor of the topic.");
        }
        SchemaRegistryConfig config = this.schemaRegistry.config();
        NewTopic newTopic = new NewTopic(str, 6, (short) min);
        newTopic.configs(new HashMap(config.originalsWithPrefix("kafkastore.topic.config.")));
        try {
            adminClient.createTopics(Collections.singleton(newTopic)).all().get(i, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            if (!(e.getCause() instanceof TopicExistsException)) {
                throw e;
            }
        }
    }
}
