package io.confluent.catalog.notification;

import io.confluent.catalog.DataCatalogConfig;
import io.confluent.catalog.metrics.MetricsManager;
import io.confluent.catalog.model.ModelConstants;
import io.confluent.catalog.util.Pair;
import io.confluent.catalog.util.QualifiedNameGenerator;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistry;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.RequestContext;
import org.apache.atlas.discovery.AtlasDiscoveryService;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.discovery.SearchParameters;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasRelationship;
import org.apache.atlas.model.notification.EntityNotification;
import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.AtlasPerfMetrics;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * Turns Atlas entity and relationship change events into {@link NotificationContainer}s and
 * hands them to an {@link EntityNotificationSender}.
 *
 * <p>The listener only does work when the schema registry's {@code host.name} starts with the
 * configured catalog-notifications designated host prefix; otherwise every notify method is a
 * no-op.
 *
 * <p>While a tenant is being republished (see {@link #republishEntity(String)}) a per-tenant
 * lock is held. Entity events arriving for that tenant in the meantime are parked in a
 * per-tenant queue and flushed by {@link #unlock(String)}.
 */
@Singleton
@Component
public class EntityNotificationListenerV2 extends AbstractEntityNotificationListener {
    private static final Logger LOG = LoggerFactory.getLogger(EntityNotificationListenerV2.class);

    /** Page size (search limit and offset step) used when scanning a tenant for republishing. */
    private static final int REPUBLISH_PAGE_SIZE = 1000;

    private final boolean isEnabled;
    private final MetricsManager metricsManager;
    private final AtlasEntityStore entityStore;
    private final EntityNotificationSender<NotificationContainer> notificationSender;
    private final AtlasDiscoveryService discoveryService;
    /** Tenant -> event batches received while that tenant's lock was held. */
    private final Map<String, Queue<Pair<EntityNotification.EntityNotificationV2.OperationType, List<NotificationContainer>>>> messageQueues;
    /** Tenant -> lock held for the duration of a republish of that tenant. */
    private final Map<String, ReentrantLock> locks;

    /**
     * Wires the collaborators and decides whether this instance publishes notifications.
     *
     * @throws IllegalArgumentException if the sender cannot be created or the schema registry
     *     configuration cannot be read
     */
    @Inject
    public EntityNotificationListenerV2(AtlasTypeRegistry atlasTypeRegistry, @NotificationProcessor NotificationInterface notificationInterface, Configuration configuration, AtlasEntityStore atlasEntityStore, SchemaRegistry schemaRegistry, AtlasDiscoveryService atlasDiscoveryService, MetricsManager metricsManager) {
        super(atlasTypeRegistry);
        this.messageQueues = new ConcurrentHashMap<>();
        this.locks = new ConcurrentHashMap<>();
        this.entityStore = atlasEntityStore;
        this.discoveryService = atlasDiscoveryService;
        this.metricsManager = metricsManager;
        try {
            this.notificationSender = new EntityNotificationSender<>(notificationInterface, configuration);
            SchemaRegistryConfig config = schemaRegistry.config();
            String hostName = config.getString("host.name");
            DataCatalogConfig dataCatalogConfig = new DataCatalogConfig(config.originalProperties());
            String designatedPrefix = dataCatalogConfig.getCatalogNotificationsDesignatedHostPrefix();
            this.isEnabled = hostName != null
                    && StringUtils.isNotEmpty(designatedPrefix)
                    && hostName.startsWith(designatedPrefix);
        } catch (Exception e) {
            throw new IllegalArgumentException("Could not instantiate EntityNotificationListenerV2", e);
        }
    }

    /**
     * Builds one notification per non-internal entity and either sends the batch or, when the
     * batch's tenant is currently locked for republishing, queues it for {@link #unlock(String)}.
     *
     * <p>The tenant is the first component of the parsed qualified name of the last
     * non-internal entity in {@code list}.
     *
     * @param list entities that changed
     * @param operationType the operation that produced the change
     * @throws AtlasBaseException if an entity has no qualified name or sending fails
     */
    @Override // io.confluent.catalog.notification.AbstractEntityNotificationListener
    public void notifyEntityEvents(List<AtlasEntity> list, EntityNotification.EntityNotificationV2.OperationType operationType) throws AtlasBaseException {
        if (!this.isEnabled) {
            return;
        }
        AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("entityNotification");
        try {
            List<NotificationContainer> containers = new ArrayList<>();
            String tenant = null;
            for (AtlasEntity entity : list) {
                if (GraphHelper.isInternalType(entity.getTypeName())) {
                    continue;
                }
                EntityNotification.EntityNotificationV2 notification = new EntityNotification.EntityNotificationV2(toNotificationHeader(entity), operationType, RequestContext.get().getRequestTime());
                Object qualifiedNameAttr = entity.getAttribute(ModelConstants.ATTR_QUALIFIED_NAME);
                if (qualifiedNameAttr == null) {
                    throw new AtlasBaseException(AtlasErrorCode.ENTITY_NOTIFICATION_FAILED, operationType.toString(), "entity missing qualified name");
                }
                String qualifiedName = (String) qualifiedNameAttr;
                tenant = QualifiedNameGenerator.parseQualifiedName(qualifiedName)[0];
                containers.add(new NotificationContainer(qualifiedName, notification));
            }
            // tenant stays null when nothing was collected; ConcurrentHashMap rejects null keys,
            // so it must not be used for the lookup in that case.
            ReentrantLock tenantLock = tenant == null ? null : this.locks.get(tenant);
            if (tenantLock != null && tenantLock.isLocked()) {
                // The queue is appended to here while unlock() drains it, so it has to be a
                // thread-safe queue.
                this.messageQueues
                        .computeIfAbsent(tenant, key -> new ConcurrentLinkedQueue<>())
                        .add(new Pair<>(operationType, containers));
            } else {
                sendNotifications(operationType, containers);
            }
        } finally {
            // End the metric on failure paths too.
            RequestContext.get().endMetricRecord(metric);
        }
    }

    /**
     * Sends a snapshot notification (timestamped with the current wall-clock time) for every
     * non-internal entity in {@code list} through the sender's inline path. A {@code null}
     * list is ignored.
     *
     * @param list entities to republish; may be {@code null}
     * @param operationType operation type stamped on each notification
     * @throws AtlasBaseException if an entity has no qualified name or sending fails
     */
    public void notifyEntityRepublish(List<AtlasEntity> list, EntityNotification.EntityNotificationV2.OperationType operationType) throws AtlasBaseException {
        if (!this.isEnabled) {
            return;
        }
        AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("entityNotificationSnapshot");
        try {
            if (list == null) {
                return;
            }
            List<NotificationContainer> containers = new ArrayList<>();
            for (AtlasEntity entity : list) {
                if (GraphHelper.isInternalType(entity.getTypeName())) {
                    continue;
                }
                EntityNotification.EntityNotificationV2 notification = new EntityNotification.EntityNotificationV2(toNotificationHeader(entity), operationType, System.currentTimeMillis());
                Object qualifiedNameAttr = entity.getAttribute(ModelConstants.ATTR_QUALIFIED_NAME);
                if (qualifiedNameAttr == null) {
                    LOG.error("entity missing qualified name");
                    this.metricsManager.recordNotificationSentError(1);
                    throw new AtlasBaseException(AtlasErrorCode.ENTITY_NOTIFICATION_FAILED, "entity missing qualified name");
                }
                containers.add(new NotificationContainer((String) qualifiedNameAttr, notification));
            }
            try {
                this.notificationSender.sendInlineNotifications(containers);
                this.metricsManager.recordNotificationSent(containers.size());
            } catch (NotificationException e) {
                this.metricsManager.recordNotificationSentError(containers.size());
                throw new AtlasBaseException(AtlasErrorCode.ENTITY_NOTIFICATION_FAILED, e, operationType.name());
            }
        } finally {
            RequestContext.get().endMetricRecord(metric);
        }
    }

    /**
     * Builds one notification per non-internal relationship, keyed by the qualified name of the
     * relationship's end-1 entity, and sends the batch.
     *
     * @param list relationships that changed
     * @param operationType the operation that produced the change
     * @throws AtlasBaseException if an end-1 entity has no qualified name, the entity lookup
     *     fails, or sending fails
     */
    @Override // io.confluent.catalog.notification.AbstractEntityNotificationListener
    protected void notifyRelationshipEvents(List<AtlasRelationship> list, EntityNotification.EntityNotificationV2.OperationType operationType) throws AtlasBaseException {
        if (!this.isEnabled) {
            return;
        }
        AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("entityNotification");
        try {
            List<NotificationContainer> containers = new ArrayList<>();
            for (AtlasRelationship relationship : list) {
                if (GraphHelper.isInternalType(relationship.getTypeName())) {
                    continue;
                }
                EntityNotification.EntityNotificationV2 notification = new EntityNotification.EntityNotificationV2(toNotificationHeader(relationship), operationType, RequestContext.get().getRequestTime());
                AtlasEntity end1 = this.entityStore.getById(relationship.getEnd1().getGuid()).getEntity();
                Object qualifiedNameAttr = end1.getAttribute(ModelConstants.ATTR_QUALIFIED_NAME);
                if (qualifiedNameAttr == null) {
                    throw new AtlasBaseException(AtlasErrorCode.ENTITY_NOTIFICATION_FAILED, "entity missing qualified name");
                }
                containers.add(new NotificationContainer((String) qualifiedNameAttr, notification));
            }
            sendNotifications(operationType, containers);
        } finally {
            RequestContext.get().endMetricRecord(metric);
        }
    }

    /**
     * Sends a non-empty batch through the sender and records the sent/error metric.
     *
     * @throws AtlasBaseException wrapping the {@link NotificationException} if sending fails
     */
    private void sendNotifications(EntityNotification.EntityNotificationV2.OperationType operationType, List<NotificationContainer> list) throws AtlasBaseException {
        if (list.isEmpty()) {
            return;
        }
        try {
            this.notificationSender.send(list);
            this.metricsManager.recordNotificationSent(list.size());
        } catch (NotificationException e) {
            this.metricsManager.recordNotificationSentError(list.size());
            throw new AtlasBaseException(AtlasErrorCode.ENTITY_NOTIFICATION_FAILED, e, operationType.name());
        }
    }

    /**
     * Republishes every entity of the given tenant while holding that tenant's lock, then
     * releases the lock and flushes the events queued in the meantime.
     *
     * @param str tenant identifier
     * @throws AtlasBaseException if a republish is already running for the tenant, or if
     *     searching, loading, or sending fails
     */
    public void republishEntity(String str) throws AtlasBaseException {
        lock(str);
        try {
            republishEntities(str);
        } finally {
            unlock(str);
        }
    }

    /**
     * Pages through all non-deleted {@code cf_entity} instances whose {@code tenant} attribute
     * equals {@code str} and republishes each page as {@code ENTITY_UPDATE} notifications.
     * Does not take the tenant lock itself.
     *
     * @param str tenant identifier
     * @throws AtlasBaseException if searching, loading, or sending fails
     */
    public void republishEntities(String str) throws AtlasBaseException {
        SearchParameters.FilterCriteria tenantFilter = new SearchParameters.FilterCriteria();
        tenantFilter.setAttributeName("tenant");
        tenantFilter.setOperator(SearchParameters.Operator.EQ);
        tenantFilter.setAttributeValue(str);

        Set<String> attributes = new HashSet<>();
        attributes.add("tenant");
        attributes.add(ModelConstants.ATTR_UPDATE_TIME);

        SearchParameters searchParameters = new SearchParameters();
        searchParameters.setLimit(REPUBLISH_PAGE_SIZE);
        searchParameters.setTypeName("cf_entity");
        searchParameters.setAttributes(attributes);
        searchParameters.setEntityFilters(tenantFilter);
        searchParameters.setExcludeDeletedEntities(true);

        int offset = 0;
        boolean morePages = true;
        while (morePages) {
            searchParameters.setOffset(offset);
            List<AtlasEntityHeader> page = this.discoveryService.searchWithParameters(searchParameters).getEntities();
            if (page == null) {
                // NOTE(review): the search result's entity list appears to be null rather than
                // empty when nothing matched - treated as "no more pages"; confirm.
                break;
            }
            List<AtlasEntity> entities = new ArrayList<>(page.size());
            for (AtlasEntityHeader header : page) {
                entities.add(this.entityStore.getById(header.getGuid(), true, false).getEntity());
            }
            notifyEntityRepublish(entities, EntityNotification.EntityNotificationV2.OperationType.ENTITY_UPDATE);
            offset += REPUBLISH_PAGE_SIZE;
            // A short page means the search is exhausted.
            morePages = page.size() == REPUBLISH_PAGE_SIZE;
        }
    }

    /**
     * Flushes every batch queued for the tenant and then releases the tenant's lock exactly
     * once. Does nothing if no lock was ever created for the tenant.
     *
     * <p>A batch that fails to send is logged and counted as an error; the remaining batches
     * are still attempted. The lock is released only if the calling thread holds it.
     *
     * <p>NOTE(review): a batch enqueued after the drain loop finishes but before the lock is
     * released stays queued until the next unlock for this tenant - confirm this is acceptable.
     *
     * @param str tenant identifier
     */
    public void unlock(String str) {
        ReentrantLock tenantLock = this.locks.get(str);
        if (tenantLock == null) {
            return;
        }
        try {
            // There is no queue when nothing arrived for the tenant while it was locked.
            Queue<Pair<EntityNotification.EntityNotificationV2.OperationType, List<NotificationContainer>>> queue = this.messageQueues.get(str);
            if (queue != null) {
                Pair<EntityNotification.EntityNotificationV2.OperationType, List<NotificationContainer>> queued;
                while ((queued = queue.poll()) != null) {
                    List<NotificationContainer> batch = queued.getValue();
                    try {
                        this.notificationSender.sendInlineNotifications(batch);
                        this.metricsManager.recordNotificationSent(batch.size());
                    } catch (NotificationException e) {
                        this.metricsManager.recordNotificationSentError(batch.size());
                        LOG.error("Failed to send notification for entity {} in operation {}", batch, queued.getKey().name(), e);
                    }
                }
            }
        } finally {
            // Release once, after the drain, whatever happened above. Guarded so a call from a
            // thread that does not own the lock cannot raise IllegalMonitorStateException.
            if (tenantLock.isHeldByCurrentThread()) {
                tenantLock.unlock();
            }
        }
    }

    /**
     * Acquires the tenant's republish lock without blocking, creating the lock on first use.
     *
     * @param str tenant identifier
     * @throws AtlasBaseException if the lock is already held, by this thread or another one
     */
    public void lock(String str) throws AtlasBaseException {
        ReentrantLock tenantLock = this.locks.computeIfAbsent(str, key -> new ReentrantLock());
        // tryLock() makes "check and acquire" a single atomic step. The lock is reentrant, so a
        // repeated call from the owning thread has to be rejected explicitly.
        if (tenantLock.isHeldByCurrentThread() || !tenantLock.tryLock()) {
            LOG.error("multiple snapshots on one tenant");
            throw new AtlasBaseException("multiple snapshots on one tenant");
        }
    }
}
