/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.catalog.notification;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.catalog.DataCatalogConfig;
import io.confluent.catalog.notification.NotificationContainer;
import io.confluent.catalog.notification.NotificationSender;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
import io.confluent.kafka.schemaregistry.storage.KafkaStore;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.inject.Inject;
import org.apache.atlas.AtlasException;
import org.apache.atlas.notification.AbstractNotification;
import org.apache.atlas.notification.NotificationConsumer;
import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.service.Service;
import org.apache.commons.compress.utils.Lists;
import org.apache.commons.configuration.Configuration;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

/**
 * Notification sender that publishes catalog notifications to Kafka.
 *
 * <p>One producer is created lazily per {@link NotificationInterface.NotificationType} and cached
 * for the lifetime of this object. The target topic for each type is read from
 * {@link DataCatalogConfig}; missing topics are created during construction.
 *
 * <p>Messages wrapped in a {@link NotificationContainer} are keyed by the container's tenant and
 * carry {@code tenant} and {@code psrc} record headers. Any other message is serialised and sent
 * without a key or headers.
 *
 * <p>This class does not create consumers: {@link #createConsumers} always returns an empty list.
 */
@Component
@Order(value=4)
@NotificationSender
public class MultiTenantKafkaNotification
extends AbstractNotification
implements Service {
    public static final Logger LOG = LoggerFactory.getLogger(MultiTenantKafkaNotification.class);

    private static final String PSRC_ID_CONFIG = "psrc.id";
    private static final String INIT_TIMEOUT_CONFIG = "kafkastore.init.timeout.ms";
    private static final String TOPIC_CONFIG_PREFIX = "kafkastore.topic.config.";
    private static final String TENANT_HEADER = "tenant";
    private static final String PSRC_HEADER = "psrc";
    private static final String SERIALIZATION_ERROR = "Error occurred while creating notification messages";
    private static final int DESIRED_REPLICATION_FACTOR = 3;
    private static final int DESIRED_PARTITION_COUNT = 6;

    /** Client properties shared by every producer created by this instance. */
    private final Properties properties;
    private final SchemaRegistry schemaRegistry;
    /** Value of the {@code psrc} header; empty when {@code psrc.id} is missing or unreadable. */
    private final String psrcId;
    private final Map<NotificationInterface.NotificationType, KafkaProducer<String, String>> producers = new ConcurrentHashMap<>();
    /** Written only by the constructor, read-only afterwards. */
    private final Map<NotificationInterface.NotificationType, String> topicMap = new HashMap<>();

    /**
     * Builds the producer configuration from the schema registry configuration, resolves the
     * notification topics and creates any topic that does not exist yet.
     *
     * @param applicationProperties configuration handed to {@link AbstractNotification}
     * @param schemaRegistry        schema registry whose configuration supplies the Kafka settings
     * @throws AtlasException           if the superclass constructor fails
     * @throws IllegalArgumentException if the producer configuration cannot be built or the
     *                                  notification topics cannot be created or validated
     */
    @Inject
    public MultiTenantKafkaNotification(Configuration applicationProperties, SchemaRegistry schemaRegistry) throws AtlasException {
        super(applicationProperties);
        LOG.info("==> MultiTenantKafkaNotification()");
        this.schemaRegistry = schemaRegistry;
        SchemaRegistryConfig config = schemaRegistry.config();
        this.psrcId = resolvePsrcId(config);
        try {
            this.properties = new Properties();
            KafkaStore.addSchemaRegistryConfigsToClientProperties(config, this.properties);
            this.properties.put("bootstrap.servers", config.bootstrapBrokers());
            this.properties.put("acks", "all");
            this.properties.put("retries", 3);
            this.properties.put("enable.idempotence", true);
            this.properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            this.properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            DataCatalogConfig dataCatalogConfig = new DataCatalogConfig(config.originalProperties());
            this.topicMap.put(NotificationInterface.NotificationType.ENTITIES, dataCatalogConfig.getCatalogNotificationsTopic());
            this.topicMap.put(NotificationInterface.NotificationType.HOOK, dataCatalogConfig.getCatalogNotificationsHookTopic());
            this.createTopicIfNotFound();
            LOG.info("<== KafkaNotification()");
        }
        catch (Exception e) {
            throw new IllegalArgumentException("Could not instantiate MultiTenantKafkaNotification", e);
        }
    }

    /**
     * Reads {@code psrc.id} from the configuration.
     *
     * @return the configured id, or an empty string when the key is undefined or has no value, so
     *         that building the {@code psrc} header can never hit a null id
     */
    private static String resolvePsrcId(SchemaRegistryConfig config) {
        try {
            String configured = config.getString(PSRC_ID_CONFIG);
            return configured != null ? configured : "";
        }
        catch (ConfigException ce) {
            LOG.warn("config error with psrc.id", ce);
            return "";
        }
    }

    /** No-op apart from logging; producers are created lazily on first use. */
    public void start() throws AtlasException {
        LOG.info("==> KafkaNotification.start()");
        LOG.info("<== KafkaNotification.start()");
    }

    /** No-op apart from logging; producers are released by {@link #close()}. */
    public void stop() {
        LOG.info("==> KafkaNotification.stop()");
        LOG.info("<== KafkaNotification.stop()");
    }

    /**
     * Reports whether a producer for the given type could be obtained.
     *
     * @param notificationType type whose producer is requested
     * @return {@code true} if the producer exists or was created, {@code false} if creation failed
     */
    public boolean isReady(NotificationInterface.NotificationType notificationType) {
        try {
            this.getOrCreateProducer(notificationType);
            return true;
        }
        catch (Exception exception) {
            // Pass the exception itself as the last argument so the stack trace is not lost.
            LOG.error("Error: Connecting... {}", exception.getMessage(), exception);
            return false;
        }
    }

    /** Closes every cached producer, logging and ignoring individual failures, then clears the cache. */
    public void close() {
        LOG.info("==> KafkaNotification.close()");
        for (KafkaProducer<String, String> producer : this.producers.values()) {
            if (producer == null) {
                continue;
            }
            try {
                producer.close();
            }
            catch (Throwable t) {
                // Best effort: one producer failing to close must not prevent closing the others.
                LOG.error("failed to close Kafka producer. Ignoring", t);
            }
        }
        this.producers.clear();
        LOG.info("<== KafkaNotification.close()");
    }

    /**
     * Serialises the given messages and sends them to the topic mapped to {@code notificationType}.
     *
     * <p>A {@link NotificationContainer} contributes its wrapped notification, keyed by tenant and
     * tagged with {@code tenant} and {@code psrc} headers. Every other message is serialised as-is
     * and sent without key or headers.
     *
     * @param notificationType selects the producer and the target topic
     * @param messages         messages to publish
     * @throws NotificationException if at least one record could not be delivered
     * @throws RuntimeException      if a message cannot be converted to its string form
     */
    @SuppressWarnings("rawtypes") // sendInternalToProducer's public signature takes raw ProducerRecords
    public <T> void send(NotificationInterface.NotificationType notificationType, List<T> messages) throws NotificationException {
        KafkaProducer producer = this.getOrCreateProducer(notificationType);
        String topic = this.topicMap.get(notificationType);
        List<ProducerRecord> records = new ArrayList<>();
        for (T message : messages) {
            if (message instanceof NotificationContainer) {
                NotificationContainer envelope = (NotificationContainer) message;
                String tenant = envelope.getTenant();
                for (String payload : toJsonMessages(envelope.getNotification())) {
                    ProducerRecord<String, String> record = new ProducerRecord<>(topic, tenant, payload);
                    record.headers().add(TENANT_HEADER, tenant.getBytes(StandardCharsets.UTF_8));
                    record.headers().add(PSRC_HEADER, this.psrcId.getBytes(StandardCharsets.UTF_8));
                    records.add(record);
                }
                continue;
            }
            // Serialise this message only; each plain message yields its own records exactly once.
            for (String payload : toJsonMessages(message)) {
                records.add(new ProducerRecord<String, String>(topic, payload));
            }
        }
        this.sendInternalToProducer(producer, records);
    }

    /**
     * Converts one notification object into its string messages via
     * {@code createNotificationMessages}.
     *
     * @throws RuntimeException wrapping any failure raised during conversion
     */
    private static List<String> toJsonMessages(Object notification) {
        List<String> jsonMessages = new ArrayList<>();
        try {
            MultiTenantKafkaNotification.createNotificationMessages(notification, jsonMessages);
        }
        catch (Exception e) {
            LOG.error(SERIALIZATION_ERROR, (Throwable) e);
            throw new RuntimeException(SERIALIZATION_ERROR, e);
        }
        return jsonMessages;
    }

    /**
     * Sends already-serialised messages, unkeyed and without headers, to the topic mapped to
     * {@code notificationType}.
     *
     * @param notificationType selects the producer and the target topic
     * @param messages         string payloads to publish
     * @throws NotificationException if at least one record could not be delivered
     */
    @SuppressWarnings("rawtypes") // sendInternalToProducer's public signature takes raw ProducerRecords
    public void sendInternal(NotificationInterface.NotificationType notificationType, List<String> messages) throws NotificationException {
        KafkaProducer producer = this.getOrCreateProducer(notificationType);
        String topic = this.topicMap.get(notificationType);
        List<ProducerRecord> records = new ArrayList<>(messages.size());
        for (String message : messages) {
            records.add(new ProducerRecord<String, String>(topic, message));
        }
        this.sendInternalToProducer(producer, records);
    }

    /**
     * Hands all records to the producer, then waits for every send to complete.
     *
     * <p>All records are submitted before any result is awaited, so a failure of one record does
     * not stop the others from being attempted.
     *
     * @param p       producer to send with
     * @param records records to send; each value is expected to be a {@code String}
     * @throws NotificationException carrying the last failure and the values of all failed records
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw Producer/ProducerRecord are part of the public signature
    public <T> void sendInternalToProducer(Producer p, List<ProducerRecord> records) throws NotificationException {
        List<MessageContext> pending = new ArrayList<>(records.size());
        for (ProducerRecord record : records) {
            LOG.debug("Sending message for topic {}: {}", record.topic(), record.value());
            Future<RecordMetadata> future = p.send(record);
            pending.add(new MessageContext(future, (String) record.value()));
        }

        List<String> failedMessages = new ArrayList<>();
        Exception lastFailureException = null;
        boolean interrupted = false;
        for (MessageContext context : pending) {
            try {
                RecordMetadata response = context.getFuture().get();
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Sent message for topic - {}, partition - {}, offset - {}", new Object[]{response.topic(), response.partition(), response.offset()});
                }
            }
            catch (InterruptedException e) {
                // Remember the interrupt but keep draining the remaining futures; the flag is
                // restored once below so the wait on later futures is not cut short.
                interrupted = true;
                lastFailureException = e;
                failedMessages.add(context.getMessage());
            }
            catch (Exception e) {
                lastFailureException = e;
                failedMessages.add(context.getMessage());
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        if (lastFailureException != null) {
            throw new NotificationException(lastFailureException, failedMessages);
        }
    }

    /**
     * Returns the cached producer for the given type, creating it from the shared client
     * properties on first use.
     */
    @VisibleForTesting
    KafkaProducer getOrCreateProducer(NotificationInterface.NotificationType notificationType) {
        LOG.debug("==> KafkaNotification.getOrCreateProducer()");
        KafkaProducer<String, String> ret = this.producers.computeIfAbsent(notificationType, type -> new KafkaProducer<String, String>(this.properties));
        LOG.debug("<== KafkaNotification.getOrCreateProducer()");
        return ret;
    }

    /**
     * This implementation only sends notifications.
     *
     * @return always a new empty list
     */
    public <T> List<NotificationConsumer<T>> createConsumers(NotificationInterface.NotificationType notificationType, int numConsumers) {
        return Lists.newArrayList();
    }

    /**
     * Lists the existing topics and creates every configured notification topic that is missing.
     *
     * @throws StoreInitializationException if the cluster cannot be queried or a topic cannot be
     *                                      created within {@code kafkastore.init.timeout.ms}
     */
    private void createTopicIfNotFound() throws StoreInitializationException {
        SchemaRegistryConfig config = this.schemaRegistry.config();
        Properties props = new Properties();
        KafkaStore.addSchemaRegistryConfigsToClientProperties(config, props);
        props.put("bootstrap.servers", config.bootstrapBrokers());
        int initTimeout = config.getInt(INIT_TIMEOUT_CONFIG);
        try (AdminClient admin = AdminClient.create(props)) {
            Set<String> existingTopics = admin.listTopics().names().get(initTimeout, TimeUnit.MILLISECONDS);
            for (String topic : this.topicMap.values()) {
                if (!existingTopics.contains(topic)) {
                    this.createTopic(admin, topic, initTimeout);
                }
            }
        }
        catch (TimeoutException e) {
            throw new StoreInitializationException("Timed out trying to create or validate notification topic configuration", (Throwable) e);
        }
        catch (InterruptedException e) {
            // Restore the flag so callers further up the stack can still observe the interrupt.
            Thread.currentThread().interrupt();
            throw new StoreInitializationException("Failed trying to create or validate notification topic configuration", (Throwable) e);
        }
        catch (ExecutionException e) {
            throw new StoreInitializationException("Failed trying to create or validate notification topic configuration", (Throwable) e);
        }
    }

    /**
     * Creates one notification topic with {@value #DESIRED_PARTITION_COUNT} partitions and a
     * replication factor of at most {@value #DESIRED_REPLICATION_FACTOR}, capped by the number of
     * live brokers. Topic-level settings are taken from the {@code kafkastore.topic.config.}
     * prefixed entries of the schema registry configuration.
     *
     * <p>A {@link TopicExistsException} from the broker is treated as success, which covers the
     * topic being created concurrently after the existence check.
     *
     * @throws StoreInitializationException if the cluster reports no live brokers
     */
    private void createTopic(AdminClient admin, String topic, int initTimeout) throws StoreInitializationException, InterruptedException, ExecutionException, TimeoutException {
        LOG.info("Creating notification topic {}", (Object) topic);
        int numLiveBrokers = admin.describeCluster().nodes().get(initTimeout, TimeUnit.MILLISECONDS).size();
        if (numLiveBrokers <= 0) {
            throw new StoreInitializationException("No live Kafka brokers");
        }
        int topicReplicationFactor = Math.min(numLiveBrokers, DESIRED_REPLICATION_FACTOR);
        if (topicReplicationFactor < DESIRED_REPLICATION_FACTOR) {
            LOG.warn("Creating the notification topic {} using a replication factor of {}, which is less than the desired one of {}. If this is a production environment, it's crucial to add more brokers and increase the replication factor of the topic.", new Object[]{topic, topicReplicationFactor, DESIRED_REPLICATION_FACTOR});
        }

        // NewTopic expects String values; the original configuration entries are untyped objects.
        Map<String, String> topicConfigs = new HashMap<>();
        for (Map.Entry<String, ?> entry : this.schemaRegistry.config().originalsWithPrefix(TOPIC_CONFIG_PREFIX).entrySet()) {
            Object value = entry.getValue();
            topicConfigs.put(entry.getKey(), value == null ? null : value.toString());
        }
        NewTopic topicRequest = new NewTopic(topic, DESIRED_PARTITION_COUNT, (short) topicReplicationFactor);
        topicRequest.configs(topicConfigs);
        try {
            admin.createTopics(Collections.singleton(topicRequest)).all().get(initTimeout, TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException e) {
            if (!(e.getCause() instanceof TopicExistsException)) {
                throw e;
            }
        }
    }

    /** Pairs an in-flight send with the message value it carries, for failure reporting. */
    private static class MessageContext {
        private final Future<RecordMetadata> future;
        private final String message;

        public MessageContext(Future<RecordMetadata> future, String message) {
            this.future = future;
            this.message = message;
        }

        public Future<RecordMetadata> getFuture() {
            return this.future;
        }

        public String getMessage() {
            return this.message;
        }
    }
}

