/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.catalog.notification;

import io.confluent.catalog.DataCatalogConfig;
import io.confluent.catalog.metrics.MetricsManager;
import io.confluent.catalog.notification.AbstractEntityNotificationListener;
import io.confluent.catalog.notification.EntityNotificationSender;
import io.confluent.catalog.notification.NotificationContainer;
import io.confluent.catalog.notification.NotificationProcessor;
import io.confluent.catalog.util.Pair;
import io.confluent.catalog.util.QualifiedNameGenerator;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistry;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.RequestContext;
import org.apache.atlas.discovery.AtlasDiscoveryService;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.discovery.AtlasSearchResult;
import org.apache.atlas.model.discovery.SearchParameters;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasRelationship;
import org.apache.atlas.model.notification.EntityNotification;
import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * Entity/relationship change listener that forwards notifications through an
 * {@link EntityNotificationSender}.
 *
 * <p>The listener is only active when the Schema Registry {@code host.name} starts with the
 * configured "designated host prefix"; otherwise every notify method is a no-op.
 *
 * <p>While a tenant is being republished (see {@link #republishEntity(String)}), a per-tenant
 * {@link ReentrantLock} is held and entity events for that tenant are parked in a per-tenant
 * queue; {@link #unlock(String)} drains that queue before releasing the lock.
 *
 * <p>NOTE(review): the "is the tenant locked?" check in {@link #notifyEntityEvents} and the
 * drain in {@link #unlock(String)} are not atomic with respect to each other, so an event that
 * observes the lock just before it is released can still be parked after the final drain and
 * will only be sent on the next republish of that tenant.
 */
@Component
@Singleton
public class EntityNotificationListenerV2
extends AbstractEntityNotificationListener {
    private static final Logger LOG = LoggerFactory.getLogger(EntityNotificationListenerV2.class);
    /** Attribute holding an entity's qualified name. */
    private static final String QUALIFIED_NAME = "qualifiedName";
    /** Detail text used when an entity has no qualified name. */
    private static final String MISSING_QUALIFIED_NAME = "entity missing qualified name";
    /** Number of entity headers requested per search page while republishing a tenant. */
    private static final int REPUBLISH_PAGE_SIZE = 1000;

    private final boolean isEnabled;
    private final MetricsManager metricsManager;
    private final AtlasEntityStore entityStore;
    private final EntityNotificationSender<NotificationContainer> notificationSender;
    private final AtlasDiscoveryService discoveryService;
    /** Entity events parked per tenant while that tenant's snapshot lock is held. */
    private final Map<String, Queue<Pair<EntityNotification.EntityNotificationV2.OperationType, List<NotificationContainer>>>> messageQueues = new ConcurrentHashMap<String, Queue<Pair<EntityNotification.EntityNotificationV2.OperationType, List<NotificationContainer>>>>();
    /** One snapshot lock per tenant, created lazily by {@link #lock(String)}. */
    private final Map<String, ReentrantLock> locks = new ConcurrentHashMap<String, ReentrantLock>();

    /**
     * Wires the listener and decides whether it is enabled on this host.
     *
     * @throws IllegalArgumentException if any part of the initialisation fails; the original
     *         exception is preserved as the cause
     */
    @Inject
    public EntityNotificationListenerV2(AtlasTypeRegistry typeRegistry, @NotificationProcessor NotificationInterface notificationInterface, Configuration configuration, AtlasEntityStore entityStore, SchemaRegistry schemaRegistry, AtlasDiscoveryService discoveryService, MetricsManager metricsManager) {
        super(typeRegistry);
        try {
            this.entityStore = entityStore;
            this.discoveryService = discoveryService;
            this.notificationSender = new EntityNotificationSender<>(notificationInterface, configuration);
            this.metricsManager = metricsManager;
            SchemaRegistryConfig schemaRegistryConfig = schemaRegistry.config();
            String host = schemaRegistryConfig.getString("host.name");
            DataCatalogConfig dataCatalogConfig = new DataCatalogConfig(schemaRegistryConfig.originalProperties());
            String designatedPrefix = dataCatalogConfig.getCatalogNotificationsDesignatedHostPrefix();
            // Only the host whose name carries the designated prefix emits notifications.
            this.isEnabled = host != null && StringUtils.isNotEmpty(designatedPrefix) && host.startsWith(designatedPrefix);
        }
        catch (Exception e) {
            throw new IllegalArgumentException("Could not instantiate EntityNotificationListenerV2", e);
        }
    }

    /**
     * Builds one notification per non-internal entity and either sends the batch immediately or,
     * if the tenant is currently locked for a snapshot, parks it until {@link #unlock(String)}.
     *
     * <p>The tenant is the first element returned by
     * {@link QualifiedNameGenerator#parseQualifiedName} for the last non-internal entity in
     * {@code entities}; the whole batch is routed by that single tenant.
     *
     * @param entities      changed entities; {@code null} or empty results in a no-op
     * @param operationType operation that produced the change
     * @throws AtlasBaseException if an entity has no qualified name or sending fails
     */
    @Override
    public void notifyEntityEvents(List<AtlasEntity> entities, EntityNotification.EntityNotificationV2.OperationType operationType) throws AtlasBaseException {
        if (!this.isEnabled || entities == null) {
            return;
        }
        AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("entityNotification");
        try {
            List<NotificationContainer> messages = new ArrayList<NotificationContainer>();
            String tenantName = null;
            for (AtlasEntity entity : entities) {
                if (GraphHelper.isInternalType(entity.getTypeName())) {
                    continue;
                }
                EntityNotification.EntityNotificationV2 notification = new EntityNotification.EntityNotificationV2(this.toNotificationHeader(entity), operationType, RequestContext.get().getRequestTime());
                Object qualifiedNameAttr = entity.getAttribute(QUALIFIED_NAME);
                if (qualifiedNameAttr == null) {
                    throw new AtlasBaseException(AtlasErrorCode.ENTITY_NOTIFICATION_FAILED, new String[]{operationType.toString(), MISSING_QUALIFIED_NAME});
                }
                String qualifiedName = (String)qualifiedNameAttr;
                tenantName = QualifiedNameGenerator.parseQualifiedName(qualifiedName)[0];
                messages.add(new NotificationContainer<EntityNotification.EntityNotificationV2>(qualifiedName, notification));
            }
            if (messages.isEmpty()) {
                // Nothing to send, and tenantName is still null here: ConcurrentHashMap rejects
                // null keys with a NullPointerException, so bail out before touching the maps.
                return;
            }
            ReentrantLock tenantLock = tenantName == null ? null : this.locks.get(tenantName);
            if (tenantLock == null || !tenantLock.isLocked()) {
                this.sendNotifications(operationType, messages);
            } else {
                // A snapshot is in progress for this tenant: park the batch for unlock() to drain.
                Pair<EntityNotification.EntityNotificationV2.OperationType, List<NotificationContainer>> pending = new Pair<EntityNotification.EntityNotificationV2.OperationType, List<NotificationContainer>>(operationType, messages);
                this.messageQueues.computeIfAbsent(tenantName, k -> new ConcurrentLinkedQueue<>()).add(pending);
            }
        }
        finally {
            // Close the perf metric on every exit path, including failures.
            RequestContext.get().endMetricRecord(metric);
        }
    }

    /**
     * Sends the given entities through the sender's inline path as part of a tenant republish.
     * Notifications are timestamped with the current wall-clock time rather than the request time.
     *
     * @param entities      entities to republish; {@code null} results in a no-op
     * @param operationType operation type stamped on every notification
     * @throws AtlasBaseException if an entity has no qualified name or sending fails
     */
    public void notifyEntityRepublish(List<AtlasEntity> entities, EntityNotification.EntityNotificationV2.OperationType operationType) throws AtlasBaseException {
        if (!this.isEnabled) {
            return;
        }
        AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("entityNotificationSnapshot");
        try {
            if (entities == null) {
                return;
            }
            List<NotificationContainer> messages = new ArrayList<NotificationContainer>();
            for (AtlasEntity entity : entities) {
                if (GraphHelper.isInternalType(entity.getTypeName())) {
                    continue;
                }
                EntityNotification.EntityNotificationV2 notification = new EntityNotification.EntityNotificationV2(this.toNotificationHeader(entity), operationType, System.currentTimeMillis());
                Object qualifiedNameAttr = entity.getAttribute(QUALIFIED_NAME);
                if (qualifiedNameAttr == null) {
                    LOG.error("entity missing qualified name");
                    this.metricsManager.recordNotificationSentError(1);
                    throw new AtlasBaseException(AtlasErrorCode.ENTITY_NOTIFICATION_FAILED, new String[]{MISSING_QUALIFIED_NAME});
                }
                messages.add(new NotificationContainer<EntityNotification.EntityNotificationV2>((String)qualifiedNameAttr, notification));
            }
            try {
                this.notificationSender.sendInlineNotifications(messages);
                this.metricsManager.recordNotificationSent(messages.size());
            }
            catch (NotificationException e) {
                this.metricsManager.recordNotificationSentError(messages.size());
                throw new AtlasBaseException(AtlasErrorCode.ENTITY_NOTIFICATION_FAILED, (Throwable)e, new String[]{operationType.name()});
            }
        }
        finally {
            RequestContext.get().endMetricRecord(metric);
        }
    }

    /**
     * Builds one notification per non-internal relationship, keyed by the qualified name of the
     * entity at end 1 of the relationship, and sends the batch immediately.
     *
     * @param relationships changed relationships; {@code null} results in a no-op
     * @param operationType operation that produced the change
     * @throws AtlasBaseException if the end-1 entity has no qualified name, cannot be loaded, or
     *         sending fails
     */
    @Override
    protected void notifyRelationshipEvents(List<AtlasRelationship> relationships, EntityNotification.EntityNotificationV2.OperationType operationType) throws AtlasBaseException {
        if (!this.isEnabled || relationships == null) {
            return;
        }
        AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("entityNotification");
        try {
            List<NotificationContainer> messages = new ArrayList<NotificationContainer>();
            for (AtlasRelationship relationship : relationships) {
                if (GraphHelper.isInternalType(relationship.getTypeName())) {
                    continue;
                }
                EntityNotification.EntityNotificationV2 notification = new EntityNotification.EntityNotificationV2(this.toNotificationHeader(relationship), operationType, RequestContext.get().getRequestTime());
                AtlasEntity entity = this.entityStore.getById(relationship.getEnd1().getGuid()).getEntity();
                Object qualifiedNameAttr = entity.getAttribute(QUALIFIED_NAME);
                if (qualifiedNameAttr == null) {
                    throw new AtlasBaseException(AtlasErrorCode.ENTITY_NOTIFICATION_FAILED, new String[]{MISSING_QUALIFIED_NAME});
                }
                messages.add(new NotificationContainer<EntityNotification.EntityNotificationV2>((String)qualifiedNameAttr, notification));
            }
            this.sendNotifications(operationType, messages);
        }
        finally {
            RequestContext.get().endMetricRecord(metric);
        }
    }

    /**
     * Sends a non-empty batch and records the outcome in the metrics manager.
     *
     * @throws AtlasBaseException wrapping the {@link NotificationException} if sending fails
     */
    private void sendNotifications(EntityNotification.EntityNotificationV2.OperationType operationType, List<NotificationContainer> messages) throws AtlasBaseException {
        if (messages.isEmpty()) {
            return;
        }
        try {
            this.notificationSender.send(messages);
            this.metricsManager.recordNotificationSent(messages.size());
        }
        catch (NotificationException e) {
            this.metricsManager.recordNotificationSentError(messages.size());
            throw new AtlasBaseException(AtlasErrorCode.ENTITY_NOTIFICATION_FAILED, (Throwable)e, new String[]{operationType.name()});
        }
    }

    /**
     * Republishes every entity of a tenant while holding that tenant's snapshot lock.
     *
     * @throws AtlasBaseException if a snapshot is already running for the tenant, or the
     *         republish itself fails
     */
    public void republishEntity(String tenantName) throws AtlasBaseException {
        this.lock(tenantName);
        try {
            this.republishEntities(tenantName);
        }
        finally {
            this.unlock(tenantName);
        }
    }

    /**
     * Pages through all non-deleted {@code cf_entity} entities whose {@code tenant} attribute
     * equals {@code tenantName}, loads each one in full and republishes it as an
     * {@code ENTITY_UPDATE}. Does not take the tenant lock itself.
     *
     * @throws AtlasBaseException if searching, loading or sending fails
     */
    public void republishEntities(String tenantName) throws AtlasBaseException {
        SearchParameters.FilterCriteria tenantFilter = new SearchParameters.FilterCriteria();
        tenantFilter.setAttributeName("tenant");
        tenantFilter.setOperator(SearchParameters.Operator.EQ);
        tenantFilter.setAttributeValue(tenantName);

        HashSet<String> attributes = new HashSet<String>();
        attributes.add("tenant");
        attributes.add("updateTime");

        SearchParameters searchParams = new SearchParameters();
        searchParams.setLimit(REPUBLISH_PAGE_SIZE);
        searchParams.setTypeName("cf_entity");
        searchParams.setAttributes(attributes);
        searchParams.setEntityFilters(tenantFilter);
        searchParams.setExcludeDeletedEntities(true);

        int offset = 0;
        boolean hasMoreResults = true;
        while (hasMoreResults) {
            searchParams.setOffset(offset);
            AtlasSearchResult searchResult = this.discoveryService.searchWithParameters(searchParams);
            List<AtlasEntityHeader> entityHeaders = searchResult.getEntities();
            if (entityHeaders == null || entityHeaders.isEmpty()) {
                // No (more) matches; a null list is treated the same as an empty page.
                break;
            }
            List<AtlasEntity> entities = new ArrayList<AtlasEntity>(entityHeaders.size());
            for (AtlasEntityHeader entityHeader : entityHeaders) {
                AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = this.entityStore.getById(entityHeader.getGuid(), true, false);
                entities.add(entityWithExtInfo.getEntity());
            }
            this.notifyEntityRepublish(entities, EntityNotification.EntityNotificationV2.OperationType.ENTITY_UPDATE);
            offset += REPUBLISH_PAGE_SIZE;
            // A short page means the search is exhausted.
            hasMoreResults = entityHeaders.size() == REPUBLISH_PAGE_SIZE;
        }
    }

    /**
     * Drains the notifications parked for the tenant, then releases the tenant's snapshot lock.
     * Does nothing if {@link #lock(String)} was never called for this tenant.
     *
     * <p>Send failures for parked batches are logged and counted but do not stop the drain. The
     * lock is released even if draining throws. Must be called by the thread that holds the
     * lock; otherwise {@link ReentrantLock#unlock()} throws {@link IllegalMonitorStateException}.
     */
    public void unlock(String tenantName) {
        ReentrantLock tenantLock = this.locks.get(tenantName);
        if (tenantLock == null) {
            return;
        }
        try {
            // No queue exists if nothing was parked while the lock was held.
            Queue<Pair<EntityNotification.EntityNotificationV2.OperationType, List<NotificationContainer>>> tenantQueue = this.messageQueues.get(tenantName);
            if (tenantQueue != null) {
                this.drainPendingNotifications(tenantQueue);
            }
        }
        finally {
            tenantLock.unlock();
        }
    }

    /** Sends every parked batch in queue order, logging (not propagating) send failures. */
    private void drainPendingNotifications(Queue<Pair<EntityNotification.EntityNotificationV2.OperationType, List<NotificationContainer>>> tenantQueue) {
        Pair<EntityNotification.EntityNotificationV2.OperationType, List<NotificationContainer>> pending;
        while ((pending = tenantQueue.poll()) != null) {
            List<NotificationContainer> batch = pending.getValue();
            try {
                this.notificationSender.sendInlineNotifications(batch);
                this.metricsManager.recordNotificationSent(batch.size());
            }
            catch (NotificationException e) {
                this.metricsManager.recordNotificationSentError(batch.size());
                LOG.error("Failed to send notification for entity {} in operation {}", new Object[]{batch, pending.getKey().name(), e});
            }
        }
    }

    /**
     * Acquires the tenant's snapshot lock without waiting.
     *
     * @throws AtlasBaseException if the lock is already held, by this thread or any other
     */
    public void lock(String tenantName) throws AtlasBaseException {
        ReentrantLock tenantLock = this.locks.computeIfAbsent(tenantName, k -> new ReentrantLock());
        // tryLock() makes "check and acquire" a single atomic step; the isHeldByCurrentThread()
        // test keeps a re-entrant acquisition by the same thread rejected as well.
        if (tenantLock.isHeldByCurrentThread() || !tenantLock.tryLock()) {
            LOG.error("multiple snapshots on one tenant");
            throw new AtlasBaseException("multiple snapshots on one tenant");
        }
    }
}

