package io.confluent.catalog.hook;

import io.confluent.catalog.DataCatalogConfig;
import io.confluent.catalog.hook.SchemaAtlasHook;
import io.confluent.catalog.metrics.MetricsManager;
import io.confluent.catalog.model.ModelConstants;
import io.confluent.catalog.util.QualifiedNameGenerator;
import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.SchemaKey;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.SchemaValue;
import io.confluent.kafka.schemaregistry.storage.SubjectKey;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.inject.Inject;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException;
import org.apache.atlas.RequestContext;
import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.notification.AbstractNotification;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStream;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.util.AtlasMetricsUtil;
import org.apache.atlas.v1.model.notification.HookNotificationV1;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.DependsOn;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

@DependsOn({"atlasTypeDefStoreInitializer", "atlasTypeDefGraphStoreV2"})
@Component
@Order(5)
/* loaded from: input_file:io/confluent/catalog/hook/SchemaAtlasProcessor.class */
public class SchemaAtlasProcessor {
    // General-purpose logger for this processor.
    private static final Logger LOG = LoggerFactory.getLogger(SchemaAtlasProcessor.class);
    // Dedicated logger named "FAILED"; recordFailedMessages writes dropped notifications to it.
    private static final Logger FAILED_LOG = LoggerFactory.getLogger("FAILED");
    // HTTP-style status codes (200 / 400).
    private static final int SC_OK = 200;
    private static final int SC_BAD_REQUEST = 400;
    // Collaborators supplied through the @Inject constructor.
    // schemaRegistry is the injected SchemaRegistry narrowed to KafkaSchemaRegistry by a cast.
    private final KafkaSchemaRegistry schemaRegistry;
    private final AtlasEntityStore atlasEntityStore;
    private final AtlasInstanceConverter instanceConverter;
    private final AtlasTypeRegistry typeRegistry;
    private final AtlasGraphUtils atlasGraphUtils;
    private final MetricsManager metricsManager;
    // Tuning values: handleMessage makes at most 3 attempts per notification and
    // createOrUpdate commits entities in batches of at most 50.
    private final int maxRetries = 3;
    // NOTE(review): initialised from a DataCatalogConfig constant whose name refers to a
    // per-iteration limit, not a retry delay; handleMessage sleeps a literal 500 ms between
    // attempts. Presumably the decompiler substituted a same-valued constant - confirm.
    private final int retryInterval = DataCatalogConfig.CATALOG_MAX_LIMIT_PER_ITERATION_DEFAULT;
    private final int failedMsgCacheSize = 1;
    private final int commitBatchSize = 50;

    /**
     * Holder for the compiler-generated enum switch tables, recovered by the decompiler from
     * {@code SchemaAtlasProcessor$1}.
     *
     * <p>Each table is indexed by an enum constant's {@code ordinal()} and yields the 1-based
     * {@code case} number used by a {@code switch}; constants without an entry map to 0.
     * Every assignment is wrapped in {@code try/catch NoSuchFieldError} so that a constant
     * missing from the enum at run time leaves its slot at 0 instead of failing class
     * initialization.
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.confluent.catalog.hook.SchemaAtlasProcessor$1, reason: invalid class name */
    /* loaded from: input_file:io/confluent/catalog/hook/SchemaAtlasProcessor$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        /** Switch table for {@link SchemaAtlasTypes}. */
        static final /* synthetic */ int[] $SwitchMap$io$confluent$catalog$hook$SchemaAtlasTypes;
        /** Switch table for {@link HookNotification.HookNotificationType}. */
        static final /* synthetic */ int[] $SwitchMap$org$apache$atlas$model$notification$HookNotification$HookNotificationType;

        static {
            // The table must be allocated before any slot is written.
            $SwitchMap$io$confluent$catalog$hook$SchemaAtlasTypes = new int[SchemaAtlasTypes.values().length];
            try {
                $SwitchMap$io$confluent$catalog$hook$SchemaAtlasTypes[SchemaAtlasTypes.SR_SUBJECT_VERSION.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // constant absent at run time: slot stays 0
            }
            try {
                $SwitchMap$io$confluent$catalog$hook$SchemaAtlasTypes[SchemaAtlasTypes.SR_SCHEMA.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // constant absent at run time: slot stays 0
            }
            try {
                $SwitchMap$io$confluent$catalog$hook$SchemaAtlasTypes[SchemaAtlasTypes.SR_RECORD.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // constant absent at run time: slot stays 0
            }
            try {
                $SwitchMap$io$confluent$catalog$hook$SchemaAtlasTypes[SchemaAtlasTypes.SR_FIELD.ordinal()] = 4;
            } catch (NoSuchFieldError ignored) {
                // constant absent at run time: slot stays 0
            }
            $SwitchMap$org$apache$atlas$model$notification$HookNotification$HookNotificationType = new int[HookNotification.HookNotificationType.values().length];
            try {
                $SwitchMap$org$apache$atlas$model$notification$HookNotification$HookNotificationType[HookNotification.HookNotificationType.ENTITY_CREATE.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // constant absent at run time: slot stays 0
            }
            try {
                $SwitchMap$org$apache$atlas$model$notification$HookNotification$HookNotificationType[HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // constant absent at run time: slot stays 0
            }
            try {
                $SwitchMap$org$apache$atlas$model$notification$HookNotification$HookNotificationType[HookNotification.HookNotificationType.ENTITY_DELETE.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // constant absent at run time: slot stays 0
            }
            try {
                $SwitchMap$org$apache$atlas$model$notification$HookNotification$HookNotificationType[HookNotification.HookNotificationType.ENTITY_FULL_UPDATE.ordinal()] = 4;
            } catch (NoSuchFieldError ignored) {
                // constant absent at run time: slot stays 0
            }
            try {
                $SwitchMap$org$apache$atlas$model$notification$HookNotification$HookNotificationType[HookNotification.HookNotificationType.ENTITY_CREATE_V2.ordinal()] = 5;
            } catch (NoSuchFieldError ignored) {
                // constant absent at run time: slot stays 0
            }
            try {
                $SwitchMap$org$apache$atlas$model$notification$HookNotification$HookNotificationType[HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE_V2.ordinal()] = 6;
            } catch (NoSuchFieldError ignored) {
                // constant absent at run time: slot stays 0
            }
            try {
                $SwitchMap$org$apache$atlas$model$notification$HookNotification$HookNotificationType[HookNotification.HookNotificationType.ENTITY_FULL_UPDATE_V2.ordinal()] = 7;
            } catch (NoSuchFieldError ignored) {
                // constant absent at run time: slot stays 0
            }
            try {
                $SwitchMap$org$apache$atlas$model$notification$HookNotification$HookNotificationType[HookNotification.HookNotificationType.ENTITY_DELETE_V2.ordinal()] = 8;
            } catch (NoSuchFieldError ignored) {
                // constant absent at run time: slot stays 0
            }
        }
    }

    @Inject
    public SchemaAtlasProcessor(SchemaRegistry schemaRegistry, AtlasEntityStore atlasEntityStore, AtlasInstanceConverter atlasInstanceConverter, AtlasTypeRegistry atlasTypeRegistry, AtlasGraphUtils atlasGraphUtils, MetricsManager metricsManager) throws AtlasException {
        this.schemaRegistry = (KafkaSchemaRegistry) schemaRegistry;
        this.atlasEntityStore = atlasEntityStore;
        this.instanceConverter = atlasInstanceConverter;
        this.typeRegistry = atlasTypeRegistry;
        this.atlasGraphUtils = atlasGraphUtils;
        this.metricsManager = metricsManager;
    }

    /**
     * Applies one hook notification to the Atlas entity store, making at most
     * {@code maxRetries} attempts.
     *
     * <p>Create-V2 and full-update-V2 notifications without entities are skipped. Every attempt
     * configures the current {@link RequestContext}, dispatches on the notification type and
     * clears the {@link RequestContext} afterwards, whether or not the attempt succeeded. After
     * a failed non-final attempt the method logs the failure and sleeps 500 ms before trying
     * again. After a failed final attempt the message JSON is written to the {@code FAILED} log
     * and the method returns without updating metrics. On success the tenant's Atlas metrics
     * are refreshed.
     *
     * @param schemaOperationContext source of the tenant, topic/partition/offset and schema
     *                               key/value belonging to this notification
     * @param hookNotification the notification to apply
     * @param z when {@code true}, classifications and business attributes are propagated from
     *          the previous schema version after an {@code ENTITY_CREATE_V2} is applied
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public void handleMessage(SchemaOperationContext schemaOperationContext, HookNotification hookNotification, boolean z) {
        // Delay between attempts. The retryInterval field is initialised from a DataCatalogConfig
        // constant whose value is not visible here, so the delay that is actually slept is the
        // one that gets logged.
        final long retrySleepMs = 500L;
        List<String> failedMessages = new ArrayList<>();
        String user = hookNotification.getUser();
        long startTimeMs = System.currentTimeMillis();
        AtlasMetricsUtil.NotificationStat notificationStat = new AtlasMetricsUtil.NotificationStat();
        try {
            if (isEmptyMessage(hookNotification)) {
                return;
            }
            for (int attempt = 0; attempt < this.maxRetries; attempt++) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("handleMessage({}): attempt {}", hookNotification.getType().name(), attempt);
                }
                try {
                    RequestContext requestContext = RequestContext.get();
                    requestContext.setAttemptCount(attempt + 1);
                    requestContext.setMaxAttempts(this.maxRetries);
                    requestContext.setUser(user, (Set<String>) null);
                    requestContext.setInNotificationProcessing(true);
                    requestContext.setCreateShellEntityForNonExistingReference(false);
                    applyNotification(schemaOperationContext, hookNotification, z, notificationStat);
                    // Success: leave the retry loop so the notification is applied exactly once.
                    break;
                } catch (Throwable th) {
                    RequestContext.get().resetEntityGuidUpdates();
                    if (attempt == this.maxRetries - 1) {
                        String messageJson = AbstractNotification.getMessageJson(hookNotification);
                        LOG.warn("Max retries exceeded for message {}", messageJson, th);
                        notificationStat.isFailedMsg = true;
                        failedMessages.add(messageJson);
                        if (failedMessages.size() >= this.failedMsgCacheSize) {
                            recordFailedMessages(failedMessages);
                        }
                        // Dropped message: metrics are intentionally not refreshed on this path.
                        return;
                    }
                    LOG.warn("Error handling " + hookNotification.getType().name() + " message, " + String.format("topic: %s, partition: %d, offset: %d", schemaOperationContext.getTopicPartition().topic(), schemaOperationContext.getTopicPartition().partition(), schemaOperationContext.getOffset()), th);
                    try {
                        LOG.info("Sleeping for {} ms before retry", retrySleepMs);
                        Thread.sleep(retrySleepMs);
                    } catch (InterruptedException e) {
                        // Restore the flag so callers further up can observe the interruption.
                        Thread.currentThread().interrupt();
                        LOG.error("Notification consumer thread sleep interrupted");
                    }
                } finally {
                    RequestContext.clear();
                }
            }
            this.metricsManager.updateAtlasMetrics(schemaOperationContext.getTenant());
        } finally {
            notificationStat.timeTakenMs = System.currentTimeMillis() - startTimeMs;
        }
    }

    /** Dispatches one notification to the entity-store operation matching its type. */
    private void applyNotification(SchemaOperationContext schemaOperationContext, HookNotification hookNotification, boolean propagate, AtlasMetricsUtil.NotificationStat notificationStat) throws AtlasBaseException {
        switch (hookNotification.getType()) {
            case ENTITY_CREATE:
                createOrUpdate(this.instanceConverter.toAtlasEntities(((HookNotificationV1.EntityCreateRequest) hookNotification).getEntities()), false, notificationStat);
                break;
            case ENTITY_PARTIAL_UPDATE: {
                HookNotificationV1.EntityPartialUpdateRequest request = (HookNotificationV1.EntityPartialUpdateRequest) hookNotification;
                AtlasEntity.AtlasEntitiesWithExtInfo converted = this.instanceConverter.toAtlasEntity(request.getEntity());
                AtlasEntity target = converted.getEntities().get(0);
                AtlasEntityType entityType = this.typeRegistry.getEntityTypeByName(request.getTypeName());
                Map<String, Object> uniqueAttributes = Collections.singletonMap(request.getAttribute(), request.getAttributeValue());
                // The partial update addresses the existing ACTIVE entity by GUID.
                target.setGuid(getGuidByUniqueAttributes(entityType, uniqueAttributes, AtlasEntity.Status.ACTIVE));
                createOrUpdate(converted, true, notificationStat);
                break;
            }
            case ENTITY_DELETE: {
                HookNotificationV1.EntityDeleteRequest request = (HookNotificationV1.EntityDeleteRequest) hookNotification;
                try {
                    AtlasEntityType entityType = (AtlasEntityType) this.typeRegistry.getType(request.getTypeName());
                    EntityMutationResponse response = this.atlasEntityStore.deleteByUniqueAttributes(entityType, Collections.singletonMap(request.getAttribute(), request.getAttributeValue()));
                    notificationStat.updateStats(response);
                    logEntityMutationResponse(response);
                } catch (ClassCastException e) {
                    // The named type is not an entity type; log and treat the message as handled.
                    LOG.error("Failed to delete entity {}", request, e);
                }
                break;
            }
            case ENTITY_FULL_UPDATE:
                createOrUpdate(this.instanceConverter.toAtlasEntities(((HookNotificationV1.EntityUpdateRequest) hookNotification).getEntities()), false, notificationStat);
                break;
            case ENTITY_CREATE_V2: {
                AtlasEntity.AtlasEntitiesWithExtInfo created = ((HookNotification.EntityCreateRequestV2) hookNotification).getEntities();
                createOrUpdate(created, false, notificationStat);
                if (propagate) {
                    try {
                        propagateTagsAndBMs(schemaOperationContext, created);
                    } catch (Exception e) {
                        // Propagation is best-effort; the create itself has already been applied.
                        LOG.error("Failed to propagate tags or BMs for entities {}", created, e);
                    }
                }
                break;
            }
            case ENTITY_PARTIAL_UPDATE_V2:
                applyPartialUpdateV2((HookNotification.EntityPartialUpdateRequestV2) hookNotification, notificationStat);
                break;
            case ENTITY_FULL_UPDATE_V2:
                createOrUpdate(((HookNotification.EntityUpdateRequestV2) hookNotification).getEntities(), false, notificationStat);
                break;
            case ENTITY_DELETE_V2:
                deleteOrPurgeEntities((HookNotification.EntityDeleteRequestV2) hookNotification, notificationStat);
                break;
            default:
                throw new IllegalStateException("Unknown notification type: " + hookNotification.getType().name());
        }
    }

    /** Applies a V2 partial update; if it fails, creates the entity without relationships and retries the update once. */
    private void applyPartialUpdateV2(HookNotification.EntityPartialUpdateRequestV2 request, AtlasMetricsUtil.NotificationStat notificationStat) throws AtlasBaseException {
        AtlasObjectId entityId = request.getEntityId();
        AtlasEntity.AtlasEntityWithExtInfo entity = request.getEntity();
        EntityMutationResponse response;
        try {
            response = this.atlasEntityStore.updateEntity(entityId, entity, true);
        } catch (Exception e) {
            LOG.warn("Attempting create for {} on failed partial update", entityId, e);
            AtlasEntity withoutRelationships = new AtlasEntity(entity.getEntity());
            withoutRelationships.setRelationshipAttributes(new HashMap<>());
            createOrUpdate(new AtlasEntity.AtlasEntitiesWithExtInfo(withoutRelationships), false, notificationStat);
            response = this.atlasEntityStore.updateEntity(entityId, entity, true);
        }
        notificationStat.updateStats(response);
        logEntityMutationResponse(response);
    }

    /** Deletes each referenced entity, or purges its DELETED instance when the request is an EntityPurgeRequestV2. */
    private void deleteOrPurgeEntities(HookNotification.EntityDeleteRequestV2 request, AtlasMetricsUtil.NotificationStat notificationStat) throws AtlasBaseException {
        List<AtlasObjectId> objectIds = request.getEntities();
        boolean purge = request instanceof SchemaAtlasHook.EntityPurgeRequestV2;
        try {
            for (AtlasObjectId objectId : objectIds) {
                AtlasEntityType entityType = (AtlasEntityType) this.typeRegistry.getType(objectId.getTypeName());
                if (purge) {
                    String deletedGuid = null;
                    try {
                        deletedGuid = getGuidByUniqueAttributes(entityType, objectId.getUniqueAttributes(), AtlasEntity.Status.DELETED);
                    } catch (AtlasBaseException ignored) {
                        // Lookup failed, so there is nothing to purge for this id.
                        LOG.debug("No DELETED entity resolved for {}; skipping purge", objectId);
                    }
                    if (deletedGuid != null) {
                        EntityMutationResponse purged = this.atlasEntityStore.purgeByIds(Collections.singleton(deletedGuid));
                        notificationStat.updateStats(purged);
                        logEntityMutationResponse(purged);
                    }
                } else {
                    EntityMutationResponse deleted = this.atlasEntityStore.deleteByUniqueAttributes(entityType, objectId.getUniqueAttributes());
                    notificationStat.updateStats(deleted);
                    logEntityMutationResponse(deleted);
                }
            }
        } catch (ClassCastException e) {
            // A referenced type is not an entity type; remaining ids in this request are skipped.
            LOG.error("Failed to do delete entities {}", objectIds, e);
        }
    }

    /**
     * Creates or updates the given entities, committing in batches of at most
     * {@code commitBatchSize} entities.
     *
     * <p>When the whole payload fits in one batch it is committed through a single stream.
     * Otherwise each slice is committed through its own stream that is constructed with the
     * full-payload stream as a second argument, and the {@link RequestContext} GUID updates and
     * cache are reset after every slice.
     *
     * @param atlasEntitiesWithExtInfo entities (with ext info) to persist
     * @param z forwarded to the entity store as its second {@code createOrUpdate} argument;
     *          callers pass {@code true} for partial updates
     * @param notificationStat statistics accumulator updated with every mutation response
     * @throws AtlasBaseException if the entity store rejects a batch
     */
    private void createOrUpdate(AtlasEntity.AtlasEntitiesWithExtInfo atlasEntitiesWithExtInfo, boolean z, AtlasMetricsUtil.NotificationStat notificationStat) throws AtlasBaseException {
        final List<AtlasEntity> allEntities = atlasEntitiesWithExtInfo.getEntities();
        final AtlasEntityStream fullStream = new AtlasEntityStream(atlasEntitiesWithExtInfo);
        final int batchSize = this.commitBatchSize;

        if (allEntities.size() > batchSize) {
            for (int start = 0; start < allEntities.size(); start += batchSize) {
                final int end = Math.min(start + batchSize, allEntities.size());
                // Copy the slice so the batch does not share the sub-list view of the payload.
                final List<AtlasEntity> slice = new ArrayList<>(allEntities.subList(start, end));
                final AtlasEntityStream sliceStream = new AtlasEntityStream(new AtlasEntity.AtlasEntitiesWithExtInfo(slice), fullStream);
                final EntityMutationResponse sliceResponse = this.atlasEntityStore.createOrUpdate(sliceStream, z);
                notificationStat.updateStats(sliceResponse);
                logEntityMutationResponse(sliceResponse);
                RequestContext.get().resetEntityGuidUpdates();
                RequestContext.get().clearCache();
            }
            return;
        }

        final EntityMutationResponse response = this.atlasEntityStore.createOrUpdate(fullStream, z);
        notificationStat.updateStats(response);
        logEntityMutationResponse(response);
    }

    /**
     * Logs the created, updated, deleted and purged entities of a mutation response.
     *
     * <p>A {@code null} response is ignored. Any exception raised while logging is caught and
     * reported, so logging never fails the surrounding operation.
     *
     * @param entityMutationResponse response to report; may be {@code null}
     */
    private void logEntityMutationResponse(EntityMutationResponse entityMutationResponse) {
        if (entityMutationResponse != null) {
            try {
                logSchemaEntityResponse(entityMutationResponse.getCreatedEntities(), "Created");
                logSchemaEntityResponse(entityMutationResponse.getUpdatedEntities(), "Updated");
                logSchemaEntityResponse(entityMutationResponse.getDeletedEntities(), "Deleted");
                logSchemaEntityResponse(entityMutationResponse.getPurgedEntities(), "Purged");
            } catch (Exception loggingFailure) {
                LOG.error("Error logging EntityMutationResponse, should not have happened", loggingFailure);
            }
        }
    }

    /**
     * Logs, at INFO level, the qualified name of every {@code SR_SUBJECT_VERSION} entity header
     * in the list; headers of any other type are skipped.
     *
     * @param list entity headers from a mutation response; {@code null} is treated as empty
     * @param str verb placed at the start of the log line (for example {@code "Created"})
     */
    private void logSchemaEntityResponse(List<AtlasEntityHeader> list, String str) {
        if (list != null) {
            for (AtlasEntityHeader header : list) {
                boolean isSubjectVersion = SchemaAtlasTypes.SR_SUBJECT_VERSION.getName().equals(header.getTypeName());
                if (isSubjectVersion) {
                    LOG.info("{} {} entity: {}", str, SchemaAtlasTypes.SR_SUBJECT_VERSION.getName(), header.getAttribute(ModelConstants.ATTR_QUALIFIED_NAME));
                }
            }
        }
    }

    /**
     * Propagates classifications and business attributes from the previous version of a
     * subject to each of the given entities.
     *
     * <p>Nothing happens when the context key is not a {@link SchemaKey}, when the key's
     * version is 1, or when the previous version is absent from the registry's lookup cache or
     * marked deleted. A failure for one entity is logged and does not stop the remaining ones.
     *
     * @param schemaOperationContext provides the schema key and value of the registered version
     * @param atlasEntitiesWithExtInfo the entities that were just created
     * @throws StoreException if the lookup-cache read fails
     */
    private void propagateTagsAndBMs(SchemaOperationContext schemaOperationContext, AtlasEntity.AtlasEntitiesWithExtInfo atlasEntitiesWithExtInfo) throws StoreException {
        SubjectKey contextKey = schemaOperationContext.getKey();
        if (!(contextKey instanceof SchemaKey)) {
            return;
        }
        SchemaKey currentKey = (SchemaKey) contextKey;
        SchemaValue currentValue = schemaOperationContext.getValue();
        if (currentKey.getVersion() == 1) {
            // First version of the subject: there is no predecessor to copy from.
            return;
        }

        SchemaKey previousKey = new SchemaKey(currentKey.getSubject(), currentKey.getVersion() - 1);
        SchemaValue previousValue = (SchemaValue) this.schemaRegistry.getLookupCache().get(previousKey);
        if (previousValue == null || previousValue.isDeleted()) {
            return;
        }

        int previousSchemaId = previousValue.getId().intValue();
        int currentSchemaId = currentValue.getId().intValue();
        for (AtlasEntity entity : atlasEntitiesWithExtInfo.getEntities()) {
            try {
                propagateEntityTagsAndBMs(schemaOperationContext, entity, currentKey, currentSchemaId, previousSchemaId);
            } catch (AtlasBaseException e) {
                LOG.error("Failed to propagate tags for schema {}", Integer.valueOf(currentSchemaId), e);
            }
        }
    }

    /**
     * Copies classifications and business attributes onto {@code atlasEntity} from the ACTIVE
     * entity that represents the same element in the previous schema version.
     *
     * <p>The predecessor's qualified name is derived from this entity's qualified name: for
     * {@code SR_SUBJECT_VERSION} the fourth name part is replaced by the previous version
     * number, and for {@code SR_SCHEMA}, {@code SR_RECORD} and {@code SR_FIELD} the third part
     * is replaced by the previous schema id. The method returns without changes when the name
     * has too few parts, when the type is any other {@link SchemaAtlasTypes} constant, or when
     * no ACTIVE predecessor can be resolved.
     *
     * <p>Classifications are skipped for record and field entities unless the context has no
     * embedded tags and no new version. Business attributes are copied whenever the
     * predecessor has any.
     *
     * @param schemaOperationContext supplies the embedded tags and new version used to gate
     *                               classification copying
     * @param atlasEntity the newly created entity that receives the copies
     * @param schemaKey key of the new schema version
     * @param i id of the new schema (not used by this method)
     * @param i2 id of the previous schema
     * @throws AtlasBaseException if reading from or writing to the entity store fails
     * @throws IllegalArgumentException if the entity's type name does not match any
     *                                  {@link SchemaAtlasTypes} constant
     */
    private void propagateEntityTagsAndBMs(SchemaOperationContext schemaOperationContext, AtlasEntity atlasEntity, SchemaKey schemaKey, int i, int i2) throws AtlasBaseException {
        // Locale.ROOT keeps the upper-casing stable regardless of the JVM's default locale
        // (a Turkish default would otherwise turn "i" into a dotted capital and break valueOf).
        SchemaAtlasTypes entityKind = SchemaAtlasTypes.valueOf(atlasEntity.getTypeName().toUpperCase(Locale.ROOT));
        String qualifiedName = (String) atlasEntity.getAttribute(ModelConstants.ATTR_QUALIFIED_NAME);
        String previousQualifiedName = null;
        switch (entityKind) {
            case SR_SUBJECT_VERSION: {
                String[] nameParts = QualifiedNameGenerator.parseQualifiedName(qualifiedName);
                if (nameParts.length > 3) {
                    nameParts[3] = String.valueOf(schemaKey.getVersion() - 1);
                    previousQualifiedName = QualifiedNameGenerator.getQualifiedName(nameParts);
                }
                break;
            }
            case SR_SCHEMA:
            case SR_RECORD:
            case SR_FIELD: {
                String[] nameParts = QualifiedNameGenerator.parseQualifiedName(qualifiedName);
                if (nameParts.length > 2) {
                    nameParts[2] = String.valueOf(i2);
                    previousQualifiedName = QualifiedNameGenerator.getQualifiedName(nameParts);
                }
                break;
            }
            default:
                break;
        }
        if (previousQualifiedName == null) {
            return;
        }

        String previousGuid = null;
        try {
            previousGuid = getGuidByUniqueAttributes(ensureEntityType(atlasEntity.getTypeName()), Collections.singletonMap(ModelConstants.ATTR_QUALIFIED_NAME, previousQualifiedName), AtlasEntity.Status.ACTIVE);
        } catch (AtlasBaseException ignored) {
            // Lookup failed, so there is no predecessor to copy from.
            LOG.debug("No previous entity resolved for {}; nothing to propagate", previousQualifiedName);
        }
        if (previousGuid == null) {
            return;
        }

        boolean noEmbeddedTagsAndNoNewVersion = schemaOperationContext.getEmbeddedTags().isEmpty() && schemaOperationContext.getNewVersion() == null;
        boolean recordOrField = entityKind == SchemaAtlasTypes.SR_RECORD || entityKind == SchemaAtlasTypes.SR_FIELD;
        if (noEmbeddedTagsAndNoNewVersion || !recordOrField) {
            List<AtlasClassification> previousClassifications = this.atlasEntityStore.getClassifications(previousGuid);
            if (previousClassifications != null && !previousClassifications.isEmpty()) {
                // Each classification is copied and re-pointed at the new entity's GUID.
                List<AtlasClassification> copies = previousClassifications.stream().map(source -> {
                    AtlasClassification copy = new AtlasClassification(source);
                    copy.setEntityGuid(atlasEntity.getGuid());
                    return copy;
                }).collect(Collectors.toList());
                this.atlasEntityStore.addClassifications(atlasEntity.getGuid(), copies);
            }
        }

        Map<String, Map<String, Object>> businessAttributes = this.atlasEntityStore.getById(previousGuid).getEntity().getBusinessAttributes();
        if (businessAttributes == null || businessAttributes.isEmpty()) {
            return;
        }
        this.atlasEntityStore.addOrUpdateBusinessAttributes(atlasEntity.getGuid(), businessAttributes, false);
    }

    /**
     * Resolves an entity type by name, failing when the registry does not know it.
     *
     * @param str entity type name
     * @return the registered entity type, never {@code null}
     * @throws AtlasBaseException with {@code TYPE_NAME_INVALID} when no such entity type exists
     */
    private AtlasEntityType ensureEntityType(String str) throws AtlasBaseException {
        final AtlasEntityType resolved = this.typeRegistry.getEntityTypeByName(str);
        if (resolved != null) {
            return resolved;
        }
        throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, TypeCategory.ENTITY.name(), str);
    }

    /**
     * Writes every message to the {@code FAILED} logger and then empties the list.
     *
     * @param list JSON of notifications that were dropped; cleared on return
     */
    private void recordFailedMessages(List<String> list) {
        for (String droppedMessage : list) {
            FAILED_LOG.error("[DROPPED_NOTIFICATION] {}", droppedMessage);
        }
        list.clear();
    }

    /**
     * Tells whether a notification carries no entities.
     *
     * <p>Only {@code ENTITY_CREATE_V2} and {@code ENTITY_FULL_UPDATE_V2} are inspected; every
     * other notification type is reported as non-empty.
     *
     * @param hookNotification notification to inspect
     * @return {@code true} when the inspected payload is {@code null} or has no entities
     */
    private boolean isEmptyMessage(HookNotification hookNotification) {
        final AtlasEntity.AtlasEntitiesWithExtInfo payload;
        switch (hookNotification.getType()) {
            case ENTITY_CREATE_V2:
                payload = ((HookNotification.EntityCreateRequestV2) hookNotification).getEntities();
                break;
            case ENTITY_FULL_UPDATE_V2:
                payload = ((HookNotification.EntityUpdateRequestV2) hookNotification).getEntities();
                break;
            default:
                return false;
        }
        if (payload == null) {
            return true;
        }
        return CollectionUtils.isEmpty(payload.getEntities());
    }

    /**
     * Looks up the GUID of the entity of the given type, unique attributes and status by
     * delegating to {@code AtlasGraphUtils}.
     *
     * @param atlasEntityType type of the entity to find
     * @param map unique attribute names and values identifying the entity
     * @param status status the entity must have
     * @return the value returned by the delegate
     * @throws AtlasBaseException if the delegate lookup fails
     */
    @GraphTransaction(logRollback = false)
    public String getGuidByUniqueAttributes(AtlasEntityType atlasEntityType, Map<String, Object> map, AtlasEntity.Status status) throws AtlasBaseException {
        final String guid = this.atlasGraphUtils.getGuidByUniqueAttributes(atlasEntityType, map, status);
        return guid;
    }
}
