/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.catalog.storage;

import io.confluent.catalog.DataCatalogConfig;
import io.confluent.catalog.atlas.repository.graphdb.janus.CfltAtlasJanusGraph;
import io.confluent.catalog.metrics.MetricsManager;
import io.confluent.catalog.model.ModelConstants;
import io.confluent.catalog.model.instance.BusinessMetadata;
import io.confluent.catalog.model.instance.Tag;
import io.confluent.catalog.model.typedef.TagDef;
import io.confluent.catalog.storage.EntitySearchService;
import io.confluent.catalog.storage.EntitySnapshot;
import io.confluent.catalog.storage.EntitySnapshotValue;
import io.confluent.catalog.storage.MetadataRegistry;
import io.confluent.catalog.storage.MetadataRegistryKey;
import io.confluent.catalog.storage.MetadataRegistryKind;
import io.confluent.catalog.storage.MetadataRegistryOp;
import io.confluent.catalog.storage.MetadataRegistryValue;
import io.confluent.catalog.util.AtomicTimestampGenerator;
import io.confluent.catalog.util.QualifiedNameGenerator;
import io.confluent.catalog.web.util.Types;
import io.confluent.rest.RestConfigException;
import io.kcache.CacheUpdateHandler;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.typedef.AtlasBusinessMetadataDef;
import org.apache.atlas.model.typedef.AtlasClassificationDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v2.AtlasEntityStream;
import org.apache.atlas.repository.store.graph.v2.EntityStream;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MetadataRegistryUpdateHandler
implements CacheUpdateHandler<MetadataRegistryKey, MetadataRegistryValue<?>> {
    private static final Logger log = LoggerFactory.getLogger(MetadataRegistryUpdateHandler.class);
    private final MetadataRegistry registry;
    private final CfltAtlasJanusGraph graph;
    private final AtlasTypeRegistry typeRegistry;
    private final AtlasEntityStore entityStore;
    private final AtlasTypeDefStore typeDefStore;
    private final EntitySearchService searchService;
    private final MetricsManager metricsManager;
    private final int batchSize;
    private final AtomicBoolean cacheInitialized = new AtomicBoolean();
    private final Map<TopicPartition, Long> lastOffsets = new ConcurrentHashMap<TopicPartition, Long>();
    private final AtomicInteger committedCount = new AtomicInteger();
    private final List<AtlasEntity> uncommittedEntities = new ArrayList<AtlasEntity>();
    private final AtomicTimestampGenerator tsGenerator;

    public MetadataRegistryUpdateHandler(MetadataRegistry registry, AtlasGraph graph, AtlasTypeRegistry typeRegistry, AtlasEntityStore entityStore, AtlasTypeDefStore typeDefStore, EntitySearchService searchService, MetricsManager metricsManager, AtomicTimestampGenerator tsGenerator) {
        try {
            this.registry = registry;
            this.graph = (CfltAtlasJanusGraph)graph;
            this.typeRegistry = typeRegistry;
            this.entityStore = entityStore;
            this.typeDefStore = typeDefStore;
            this.searchService = searchService;
            this.metricsManager = metricsManager;
            this.tsGenerator = tsGenerator;
            DataCatalogConfig dataCatalogConfig = new DataCatalogConfig(registry.config().originalProperties());
            this.batchSize = dataCatalogConfig.getCatalogIngestorBatchSize();
            log.info("Ingestor using batch size of {}", (Object)this.batchSize);
        }
        catch (RestConfigException e) {
            throw new IllegalArgumentException("Could not instantiate MetadataRegistryUpdateHandler", e);
        }
    }

    public void cacheInitialized(int count, Map<TopicPartition, Long> checkpoints) {
        this.cacheInitialized.set(true);
        new Thread(this.metricsManager::init).start();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public CacheUpdateHandler.ValidationStatus validateUpdate(MetadataRegistryKey key, MetadataRegistryValue<?> value, TopicPartition tp, long offset, long timestamp) {
        Object newValue;
        block25: {
            block24: {
                if (value != null) break block24;
                CacheUpdateHandler.ValidationStatus validationStatus = CacheUpdateHandler.ValidationStatus.SUCCESS;
                this.metricsManager.updateAtlasMetrics(key.getTenant());
                RequestContext.clear();
                this.lastOffsets.put(tp, offset);
                if (value != null) {
                    long lagInSeconds = (this.tsGenerator.next() - value.getTimestamp()) / 1000000L;
                    this.metricsManager.recordCatalogEventProcessingLag(lagInSeconds);
                    if (offset % 100L == 0L) {
                        log.info("Catalog event processing lag: {} seconds, partition: {}, offset: {}", new Object[]{lagInSeconds, tp, offset});
                    }
                }
                return validationStatus;
            }
            if (key.getOp() != MetadataRegistryOp.CREATE && key.getOp() != MetadataRegistryOp.UPDATE || key.getKind() != MetadataRegistryKind.ENTITY || !value.isBatch()) break block25;
            AtlasEntity.AtlasEntityWithExtInfo entity = (AtlasEntity.AtlasEntityWithExtInfo)value.getNewValue();
            this.addToPending(key, entity);
            CacheUpdateHandler.ValidationStatus lagInSeconds = CacheUpdateHandler.ValidationStatus.SUCCESS;
            this.metricsManager.updateAtlasMetrics(key.getTenant());
            RequestContext.clear();
            this.lastOffsets.put(tp, offset);
            if (value != null) {
                long lagInSeconds2 = (this.tsGenerator.next() - value.getTimestamp()) / 1000000L;
                this.metricsManager.recordCatalogEventProcessingLag(lagInSeconds2);
                if (offset % 100L == 0L) {
                    log.info("Catalog event processing lag: {} seconds, partition: {}, offset: {}", new Object[]{lagInSeconds2, tp, offset});
                }
            }
            return lagInSeconds;
        }
        try {
            this.commitBatch();
            switch (key.getOp()) {
                case CREATE: 
                case UPDATE: {
                    newValue = this.createOrUpdate(key, value);
                    value.setNewValue(newValue);
                    break;
                }
                case DELETE: {
                    this.delete(key, false);
                    break;
                }
                case PURGE: {
                    this.delete(key, true);
                    break;
                }
                case RECONCILE: {
                    this.reconcile(key, value);
                    break;
                }
                default: {
                    throw new IllegalArgumentException("Unknown action: " + (Object)((Object)key.getOp()));
                }
            }
            log.debug("{} - successfully processed key: {}", (Object)key.getTenant(), (Object)key);
            newValue = CacheUpdateHandler.ValidationStatus.SUCCESS;
            this.metricsManager.updateAtlasMetrics(key.getTenant());
        }
        catch (Throwable e) {
            CacheUpdateHandler.ValidationStatus validationStatus;
            try {
                if (e instanceof AtlasBaseException) {
                    log.error("{} - AtlasBaseException, key: {}, partition: {}, offset: {}", new Object[]{key.getTenant(), key, tp, offset, e});
                    this.registry.putObjectWithException(value.getTimestamp(), value.getNewValue(), (AtlasBaseException)e);
                } else {
                    log.error("{} - unexpected exception, key: {}, partition: {}, offset: {}", new Object[]{key.getTenant(), key, tp, offset, e});
                    this.registry.putObjectWithException(value.getTimestamp(), value.getNewValue(), new AtlasBaseException(e));
                }
                validationStatus = CacheUpdateHandler.ValidationStatus.IGNORE_FAILURE;
                this.metricsManager.updateAtlasMetrics(key.getTenant());
            }
            catch (Throwable throwable) {
                this.metricsManager.updateAtlasMetrics(key.getTenant());
                RequestContext.clear();
                this.lastOffsets.put(tp, offset);
                if (value != null) {
                    long lagInSeconds = (this.tsGenerator.next() - value.getTimestamp()) / 1000000L;
                    this.metricsManager.recordCatalogEventProcessingLag(lagInSeconds);
                    if (offset % 100L == 0L) {
                        log.info("Catalog event processing lag: {} seconds, partition: {}, offset: {}", new Object[]{lagInSeconds, tp, offset});
                    }
                }
                throw throwable;
            }
            RequestContext.clear();
            this.lastOffsets.put(tp, offset);
            if (value != null) {
                long lagInSeconds = (this.tsGenerator.next() - value.getTimestamp()) / 1000000L;
                this.metricsManager.recordCatalogEventProcessingLag(lagInSeconds);
                if (offset % 100L == 0L) {
                    log.info("Catalog event processing lag: {} seconds, partition: {}, offset: {}", new Object[]{lagInSeconds, tp, offset});
                }
            }
            return validationStatus;
        }
        RequestContext.clear();
        this.lastOffsets.put(tp, offset);
        if (value != null) {
            long lagInSeconds = (this.tsGenerator.next() - value.getTimestamp()) / 1000000L;
            this.metricsManager.recordCatalogEventProcessingLag(lagInSeconds);
            if (offset % 100L == 0L) {
                log.info("Catalog event processing lag: {} seconds, partition: {}, offset: {}", new Object[]{lagInSeconds, tp, offset});
            }
        }
        return newValue;
    }

    private void addToPending(MetadataRegistryKey key, AtlasEntity.AtlasEntityWithExtInfo entity) throws AtlasBaseException {
        String qualifiedName = (String)entity.getEntity().getAttribute("qualifiedName");
        this.registry.pendingCache().put(qualifiedName, entity);
        this.uncommittedEntities.add(this.scrubEntity(key, entity));
    }

    private void clearPending() {
        this.uncommittedEntities.clear();
        this.registry.pendingCache().clear();
    }

    private Object createOrUpdate(MetadataRegistryKey key, MetadataRegistryValue<?> value) throws AtlasBaseException {
        if (value == null) {
            return null;
        }
        switch (key.getKind()) {
            case ENTITY: {
                this.maybeUpdateOldEntity(key, (AtlasEntity.AtlasEntityWithExtInfo)value.getOldValue());
                return this.createOrUpdateEntity(key, (AtlasEntity.AtlasEntityWithExtInfo)value.getNewValue(), value.isDelta());
            }
            case TAG: {
                this.maybeUpdateOldTag(key, (Tag)((Object)value.getOldValue()));
                return this.createOrUpdateTag(key, (Tag)((Object)value.getNewValue()));
            }
            case TAG_DEF: {
                return this.createOrUpdateTagDef(key, (TagDef)((Object)value.getNewValue()));
            }
            case BUSINESS_METADATA: {
                this.maybeUpdateOldBusinessMetadata(key, (BusinessMetadata)((Object)value.getOldValue()));
                return this.createOrUpdateBusinessMetadata(key, (BusinessMetadata)((Object)value.getNewValue()));
            }
            case BUSINESS_METADATA_DEF: {
                return this.createOrUpdateBusinessMetadataDef(key, (AtlasBusinessMetadataDef)value.getNewValue());
            }
        }
        throw new IllegalArgumentException("Unknown key type: " + (Object)((Object)key.getKind()));
    }

    private void delete(MetadataRegistryKey key, boolean isPurge) throws AtlasBaseException {
        switch (key.getKind()) {
            case ENTITY: {
                this.deleteEntity(key, isPurge);
                break;
            }
            case TAG: {
                this.deleteTag(key, isPurge);
                break;
            }
            case TAG_DEF: {
                this.deleteTagDef(key, isPurge);
                break;
            }
            case BUSINESS_METADATA: {
                this.deleteBusinessMetadata(key, isPurge);
                break;
            }
            case BUSINESS_METADATA_DEF: {
                this.deleteBusinessMetadataDef(key, isPurge);
                break;
            }
            default: {
                throw new IllegalArgumentException("Unknown key type: " + (Object)((Object)key.getKind()));
            }
        }
    }

    private void reconcile(MetadataRegistryKey key, MetadataRegistryValue<?> value) {
        Set<String> allEntitiesInSnapshot = ((EntitySnapshot)((EntitySnapshotValue)value).getNewValue()).getEntities();
        switch (key.getType()) {
            case "kafka_topic": {
                this.reconcileTopicMetadata(key.getTenant(), key.getName(), allEntitiesInSnapshot);
                break;
            }
            case "kafka_logical_cluster": {
                this.reconcileClusterMetadata(key.getTenant(), allEntitiesInSnapshot);
                break;
            }
            default: {
                throw new IllegalArgumentException("Unknown key typeName: " + key.getType());
            }
        }
    }

    private void reconcileTopicMetadata(String tenant, String logicalKafkaClusterId, Set<String> allTopicsInSnapshot) {
        long startTimeMs = System.currentTimeMillis();
        HashSet<String> attributes = new HashSet<String>();
        attributes.add("tenant");
        attributes.add("name");
        attributes.add("qualifiedName");
        String qualifiedNamePrefix = QualifiedNameGenerator.getQualifiedName(tenant, logicalKafkaClusterId);
        List<Object> allTopicsInCatalog = new ArrayList();
        try {
            allTopicsInCatalog = this.searchService.searchAllNonDeprecatedEntities(ModelConstants.ENTITY_KAFKA_TOPIC, qualifiedNamePrefix, null, attributes, Collections.emptySet());
        }
        catch (AtlasBaseException e) {
            log.error(String.format("Reconcile topic metadata, error searching, qualifiedNamePrefix %s", qualifiedNamePrefix), (Throwable)e);
        }
        int count = 0;
        for (AtlasEntityHeader atlasEntityHeader : allTopicsInCatalog) {
            String topicName = atlasEntityHeader.getAttribute("name").toString();
            if (allTopicsInSnapshot.contains(topicName)) continue;
            ++count;
            String topicQualifiedName = atlasEntityHeader.getAttribute("qualifiedName").toString();
            try {
                AtlasEntity newEntity = new AtlasEntity(ModelConstants.ENTITY_KAFKA_TOPIC);
                Date deprecatedTime = new Date();
                newEntity.setAttribute("qualifiedName", (Object)topicQualifiedName);
                newEntity.setAttribute("deprecatedTime", (Object)deprecatedTime);
                String guid = atlasEntityHeader.getGuid();
                if (guid == null) {
                    throw new AtlasBaseException(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND, new String[]{ModelConstants.ENTITY_KAFKA_TOPIC, topicQualifiedName});
                }
                newEntity.setGuid(guid);
                this.entityStore.createOrUpdate((EntityStream)new AtlasEntityStream(newEntity), true);
                this.metricsManager.recordReconciliationTopic(1);
            }
            catch (AtlasBaseException e) {
                log.error(String.format("Reconcile topic metadata, error deprecating topic entity, qualifiedNamePrefix %s", qualifiedNamePrefix), (Throwable)e);
                this.metricsManager.recordReconciliationTopicError(1);
            }
        }
        log.info(String.format("Reconcile topic metadata, time cost %d ms, active topic number in catalog %d, in snapshot %d, extra number %d, qualifiedNamePrefix %s", System.currentTimeMillis() - startTimeMs, allTopicsInCatalog.size(), allTopicsInSnapshot.size(), count, qualifiedNamePrefix));
        this.metricsManager.recordReconciliationTopicTime(System.currentTimeMillis() - startTimeMs);
    }

    private void reconcileClusterMetadata(String tenant, Set<String> allClustersInSnapshot) {
        long startTimeMs = System.currentTimeMillis();
        HashSet<String> attributes = new HashSet<String>();
        attributes.add("tenant");
        attributes.add("id");
        attributes.add("qualifiedName");
        String qualifiedNamePrefix = QualifiedNameGenerator.getQualifiedName(tenant);
        List<Object> allClustersInCatalog = new ArrayList();
        try {
            allClustersInCatalog = this.searchService.searchAllNonDeprecatedEntities(ModelConstants.ENTITY_KAFKA_LOGICAL_CLUSTER, qualifiedNamePrefix, null, attributes, Collections.emptySet());
        }
        catch (AtlasBaseException e) {
            log.error(String.format("Reconcile cluster metadata, error searching, qualifiedNamePrefix %s", qualifiedNamePrefix), (Throwable)e);
        }
        int count = 0;
        for (AtlasEntityHeader atlasEntityHeader : allClustersInCatalog) {
            String clusterId = atlasEntityHeader.getAttribute("id").toString();
            if (allClustersInSnapshot.contains(clusterId)) continue;
            ++count;
            String clusterQualifiedName = atlasEntityHeader.getAttribute("qualifiedName").toString();
            try {
                AtlasEntity newEntity = new AtlasEntity(ModelConstants.ENTITY_KAFKA_LOGICAL_CLUSTER);
                Date deprecatedTime = new Date();
                newEntity.setAttribute("qualifiedName", (Object)clusterQualifiedName);
                newEntity.setAttribute("deprecatedTime", (Object)deprecatedTime);
                String guid = atlasEntityHeader.getGuid();
                if (guid == null) {
                    throw new AtlasBaseException(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND, new String[]{ModelConstants.ENTITY_KAFKA_LOGICAL_CLUSTER, clusterQualifiedName});
                }
                newEntity.setGuid(guid);
                this.entityStore.createOrUpdate((EntityStream)new AtlasEntityStream(newEntity), true);
                this.deprecateAllTopics(clusterQualifiedName, deprecatedTime);
                this.metricsManager.recordReconciliationCluster(1);
            }
            catch (AtlasBaseException e) {
                log.error(String.format("Reconcile cluster metadata, error deprecating entity with clusterQualifiedName %s", clusterQualifiedName), (Throwable)e);
                this.metricsManager.recordReconciliationClusterError(1);
            }
        }
        log.info(String.format("Reconcile cluster metadata, time cost %d ms, clusters number in catalog %d, in snapshot %d, extra number %d, qualifiedNamePrefix %s", System.currentTimeMillis() - startTimeMs, allClustersInCatalog.size(), allClustersInSnapshot.size(), count, qualifiedNamePrefix));
        this.metricsManager.recordReconciliationClusterTime(System.currentTimeMillis() - startTimeMs);
    }

    private AtlasEntity.AtlasEntityWithExtInfo createOrUpdateEntity(MetadataRegistryKey key, AtlasEntity.AtlasEntityWithExtInfo entity, boolean isDelta) throws AtlasBaseException {
        String entityType = key.getType();
        if (MetadataRegistryUpdateHandler.isIgnored(entityType)) {
            throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, new String[]{TypeCategory.ENTITY.name(), entityType});
        }
        AtlasEntityType atlasEntityType = this.ensureEntityType(entityType);
        String entityName = QualifiedNameGenerator.ensureEntityTenantPrefix(key.getTenant(), entityType, key.getName());
        Map<String, Object> attributes = Collections.singletonMap("qualifiedName", entityName);
        String guid = null;
        try {
            guid = this.registry.getGuidByUniqueAttributes(atlasEntityType, attributes);
        }
        catch (Exception exception) {
            // empty catch block
        }
        AtlasEntity atlasEntity = entity.getEntity();
        atlasEntity.setTypeName(entityType);
        atlasEntity.setAttribute("qualifiedName", (Object)entityName);
        if (key.getOp() == MetadataRegistryOp.CREATE) {
            if (guid != null) {
                throw new AtlasBaseException(AtlasErrorCode.INSTANCE_ALREADY_EXISTS, new String[]{entityType, attributes.toString()});
            }
            this.entityStore.createOrUpdate((EntityStream)new AtlasEntityStream(entity), false);
        } else {
            if (guid == null) {
                throw new AtlasBaseException(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND, new String[]{entityType, attributes.toString()});
            }
            atlasEntity.setGuid(guid);
            AtlasEntity.AtlasEntityWithExtInfo oldCluster = null;
            try {
                oldCluster = this.registry.getEntity(key.getTenant(), ModelConstants.ENTITY_KAFKA_LOGICAL_CLUSTER, entityName, true, true);
            }
            catch (AtlasBaseException atlasBaseException) {
                // empty catch block
            }
            this.entityStore.createOrUpdate((EntityStream)new AtlasEntityStream(entity), isDelta);
            if (isDelta && entityType.equals(ModelConstants.ENTITY_KAFKA_LOGICAL_CLUSTER) && oldCluster != null) {
                this.kafkaClusterStatusCheck(entityName, oldCluster, entity);
            }
        }
        entity.getEntity().setGuid(null);
        return entity;
    }

    private void kafkaClusterStatusCheck(String clusterQualifiedName, AtlasEntity.AtlasEntityWithExtInfo oldCluster, AtlasEntity.AtlasEntityWithExtInfo newCluster) {
        if (oldCluster == null) {
            return;
        }
        boolean wasDeprecated = false;
        Date clusterDeprecatedTime = MetadataRegistry.getDateAttribute(oldCluster.getEntity().getAttribute("deprecatedTime"));
        if (clusterDeprecatedTime != null && !clusterDeprecatedTime.equals(ModelConstants.UNIX_ZERO_EPOCH)) {
            wasDeprecated = true;
        }
        boolean isDeprecated = false;
        Date newDeprecatedTime = MetadataRegistry.getDateAttribute(newCluster.getEntity().getAttribute("deprecatedTime"));
        if (newDeprecatedTime != null && !newDeprecatedTime.equals(ModelConstants.UNIX_ZERO_EPOCH)) {
            isDeprecated = true;
        }
        if (wasDeprecated && !isDeprecated) {
            this.activateAllTopics(clusterQualifiedName);
        } else if (!wasDeprecated && isDeprecated) {
            this.deprecateAllTopics(clusterQualifiedName, newDeprecatedTime);
        }
    }

    private void deprecateAllTopics(String qualifiedNamePrefix, Date deprecatedTime) {
        HashSet<String> attributes = new HashSet<String>();
        attributes.add("tenant");
        attributes.add("qualifiedName");
        List<AtlasEntityHeader> allActiveTopicsInCatalog = null;
        try {
            allActiveTopicsInCatalog = this.searchService.searchAllNonDeprecatedEntities(ModelConstants.ENTITY_KAFKA_TOPIC, qualifiedNamePrefix, null, attributes, Collections.emptySet());
        }
        catch (AtlasBaseException e) {
            log.error(String.format("Error search topics in catalog with qualifiedNamePrefix %s", qualifiedNamePrefix), (Throwable)e);
        }
        if (allActiveTopicsInCatalog == null || allActiveTopicsInCatalog.size() == 0) {
            log.info(String.format("No topics found when cascading deprecate, qualifiedNamePrefix %s", qualifiedNamePrefix));
            return;
        }
        ArrayList<AtlasEntity> entities = new ArrayList<AtlasEntity>();
        for (AtlasEntityHeader topicHeader : allActiveTopicsInCatalog) {
            String topicQualifiedName = topicHeader.getAttribute("qualifiedName").toString();
            AtlasEntity newEntity = new AtlasEntity(ModelConstants.ENTITY_KAFKA_TOPIC);
            newEntity.setAttribute("qualifiedName", (Object)topicQualifiedName);
            newEntity.setAttribute("deprecatedTime", (Object)deprecatedTime);
            String guid = topicHeader.getGuid();
            if (guid == null) {
                log.error(String.valueOf((Object)new AtlasBaseException(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND, new String[]{ModelConstants.ENTITY_KAFKA_TOPIC, topicQualifiedName})));
                continue;
            }
            newEntity.setGuid(guid);
            entities.add(newEntity);
        }
        try {
            this.entityStore.createOrUpdate((EntityStream)new AtlasEntityStream(entities), true);
        }
        catch (AtlasBaseException e) {
            log.error(String.format("Error cascading deprecate topics with lkc, qualifiedNamePrefix %s", qualifiedNamePrefix), (Throwable)e);
        }
        log.info(String.format("Cascading deprecate topics, total number %d, qualifiedNamePrefix %s", allActiveTopicsInCatalog.size(), qualifiedNamePrefix));
    }

    public void activateAllTopics(String qualifiedNamePrefix) {
        HashSet<String> attributes = new HashSet<String>();
        attributes.add("tenant");
        attributes.add("qualifiedName");
        List<AtlasEntityHeader> allDeprecatedTopicsInCatalog = null;
        try {
            allDeprecatedTopicsInCatalog = this.searchService.searchAllDeprecatedEntities(ModelConstants.ENTITY_KAFKA_TOPIC, qualifiedNamePrefix, null, attributes, Collections.emptySet());
        }
        catch (AtlasBaseException e) {
            log.error(String.format("Error search topics in catalog with qualifiedNamePrefix %s", qualifiedNamePrefix), (Throwable)e);
        }
        if (allDeprecatedTopicsInCatalog == null || allDeprecatedTopicsInCatalog.size() == 0) {
            log.info(String.format("No topics found when cascading activate, qualifiedNamePrefix %s", qualifiedNamePrefix));
            return;
        }
        ArrayList<AtlasEntity> entities = new ArrayList<AtlasEntity>();
        for (AtlasEntityHeader topicHeader : allDeprecatedTopicsInCatalog) {
            String topicQualifiedName = topicHeader.getAttribute("qualifiedName").toString();
            AtlasEntity newEntity = new AtlasEntity(ModelConstants.ENTITY_KAFKA_TOPIC);
            newEntity.setAttribute("qualifiedName", (Object)topicQualifiedName);
            newEntity.setAttribute("deprecatedTime", (Object)ModelConstants.UNIX_ZERO_EPOCH);
            String guid = topicHeader.getGuid();
            if (guid == null) {
                log.error(String.valueOf((Object)new AtlasBaseException(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND, new String[]{ModelConstants.ENTITY_KAFKA_TOPIC, topicQualifiedName})));
                continue;
            }
            newEntity.setGuid(guid);
            entities.add(newEntity);
        }
        try {
            this.entityStore.createOrUpdate((EntityStream)new AtlasEntityStream(entities), true);
        }
        catch (AtlasBaseException e) {
            log.error(String.format("Error cascading activate topics with lkc, qualifiedNamePrefix %s", qualifiedNamePrefix), (Throwable)e);
        }
        log.info(String.format("Cascading activate topics, total number %d, qualifiedNamePrefix %s", allDeprecatedTopicsInCatalog.size(), qualifiedNamePrefix));
    }

    private static boolean isIgnored(String typeName) {
        return !ModelConstants.hasValidPrefix(typeName);
    }

    private void maybeUpdateOldEntity(MetadataRegistryKey key, AtlasEntity.AtlasEntityWithExtInfo oldEntity) {
        if (oldEntity == null || oldEntity.getEntity() == null || key.getOp() != MetadataRegistryOp.UPDATE || this.cacheInitialized.get() || this.registry.localCache().get((Object)key) != null) {
            return;
        }
        try {
            this.createOrUpdateEntity(key, oldEntity, false);
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    private AtlasEntity scrubEntity(MetadataRegistryKey key, AtlasEntity.AtlasEntityWithExtInfo entity) throws AtlasBaseException {
        String entityType = key.getType();
        if (MetadataRegistryUpdateHandler.isIgnored(entityType)) {
            throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, new String[]{TypeCategory.ENTITY.name(), entityType});
        }
        this.ensureEntityType(entityType);
        String entityName = QualifiedNameGenerator.ensureEntityTenantPrefix(key.getTenant(), entityType, key.getName());
        AtlasEntity atlasEntity = entity.getEntity();
        atlasEntity.setTypeName(entityType);
        atlasEntity.setAttribute("qualifiedName", (Object)entityName);
        return atlasEntity;
    }

    private Tag createOrUpdateTag(MetadataRegistryKey key, Tag tag) throws AtlasBaseException {
        AtlasClassification classification = new AtlasClassification((AtlasClassification)tag);
        String relatedEntityType = key.getRelatedType();
        AtlasEntityType atlasEntityType = this.ensureEntityType(relatedEntityType);
        Map<String, Object> attributes = Collections.singletonMap("qualifiedName", key.getRelatedName());
        String guid = this.registry.getGuidByUniqueAttributes(atlasEntityType, attributes);
        classification.setEntityGuid(guid);
        if (classification.isPropagate() == null) {
            classification.setPropagate(Boolean.FALSE);
        }
        if (key.getOp() == MetadataRegistryOp.CREATE) {
            this.entityStore.addClassifications(guid, Collections.singletonList(classification));
        } else {
            this.entityStore.updateClassifications(guid, Collections.singletonList(classification));
        }
        classification.setEntityGuid(null);
        return tag;
    }

    private void maybeUpdateOldTag(MetadataRegistryKey key, Tag oldTag) {
        if (oldTag == null || key.getOp() != MetadataRegistryOp.UPDATE || this.cacheInitialized.get() || this.registry.localCache().get((Object)key) != null) {
            return;
        }
        try {
            Tag tag = this.registry.getTag(key.getTenant(), oldTag.getEntityType(), oldTag.getEntityName(), oldTag.getTypeName());
            if (!oldTag.equals((Object)tag)) {
                this.createOrUpdateTag(key, oldTag);
            }
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    private void maybeUpdateOldBusinessMetadata(MetadataRegistryKey key, BusinessMetadata oldBM) {
        if (oldBM == null || key.getOp() != MetadataRegistryOp.UPDATE || this.cacheInitialized.get() || this.registry.localCache().get((Object)key) != null) {
            return;
        }
        try {
            BusinessMetadata businessMetadata = this.registry.getBusinessMetadata(key.getTenant(), oldBM.getEntityType(), oldBM.getEntityName(), oldBM.getTypeName());
            if (!oldBM.equals((Object)businessMetadata)) {
                this.createOrUpdateBusinessMetadata(key, oldBM);
            }
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    private BusinessMetadata createOrUpdateBusinessMetadata(MetadataRegistryKey key, BusinessMetadata businessMetadata) throws AtlasBaseException {
        String relatedEntityType = key.getRelatedType();
        AtlasEntityType atlasEntityType = this.ensureEntityType(relatedEntityType);
        Map<String, Object> attributes = Collections.singletonMap("qualifiedName", key.getRelatedName());
        String guid = this.registry.getGuidByUniqueAttributes(atlasEntityType, attributes);
        this.entityStore.addOrUpdateBusinessAttributes(guid, Collections.singletonMap(businessMetadata.getTypeName(), businessMetadata.getAttributes()), false);
        return businessMetadata;
    }

    private void deleteEntity(MetadataRegistryKey key, boolean isPurge) throws AtlasBaseException {
        String guid;
        String entityType = key.getType();
        AtlasEntityType atlasEntityType = this.ensureEntityType(entityType);
        String entityName = QualifiedNameGenerator.ensureEntityTenantPrefix(key.getTenant(), entityType, key.getName());
        Map<String, Object> attributes = Collections.singletonMap("qualifiedName", entityName);
        String string = guid = isPurge ? this.registry.getGuidByUniqueAttributes(atlasEntityType, attributes, AtlasEntity.Status.DELETED) : this.registry.getGuidByUniqueAttributes(atlasEntityType, attributes);
        if (isPurge) {
            this.entityStore.purgeByIds(Collections.singleton(guid));
        } else {
            this.entityStore.deleteById(guid);
        }
    }

    private void deleteTag(MetadataRegistryKey key, boolean isPurge) throws AtlasBaseException {
        String relatedEntityType = key.getRelatedType();
        AtlasEntityType atlasEntityType = this.ensureEntityType(relatedEntityType);
        Map<String, Object> attributes = Collections.singletonMap("qualifiedName", key.getRelatedName());
        String guid = this.registry.getGuidByUniqueAttributes(atlasEntityType, attributes);
        this.entityStore.deleteClassification(guid, key.getType());
    }

    private void deleteBusinessMetadata(MetadataRegistryKey key, boolean isPurge) throws AtlasBaseException {
        String relatedEntityType = key.getRelatedType();
        AtlasEntityType atlasEntityType = this.ensureEntityType(relatedEntityType);
        Map<String, Object> attributes = Collections.singletonMap("qualifiedName", key.getRelatedName());
        String guid = this.registry.getGuidByUniqueAttributes(atlasEntityType, attributes);
        String bmName = key.getType();
        BusinessMetadata bm = this.registry.getBusinessMetadata(key.getTenant(), relatedEntityType, key.getRelatedName(), bmName);
        if (bm != null) {
            this.entityStore.removeBusinessAttributes(guid, Collections.singletonMap(bmName, bm.getAttributes()));
        } else {
            log.warn("BusinessMetadata {} doesn't not exist", (Object)bmName);
        }
    }

    private AtlasEntityType ensureEntityType(String typeName) throws AtlasBaseException {
        AtlasEntityType ret = this.typeRegistry.getEntityTypeByName(typeName);
        if (ret == null) {
            throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, new String[]{TypeCategory.ENTITY.name(), typeName});
        }
        return ret;
    }

    private TagDef createOrUpdateTagDef(MetadataRegistryKey key, TagDef tagDef) throws AtlasBaseException {
        if (tagDef.getName() == null) {
            throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_TYPENAME, new String[]{tagDef.getName()});
        }
        AtlasClassificationDef classificationDef = new AtlasClassificationDef((AtlasClassificationDef)tagDef);
        classificationDef.setEntityTypes(tagDef.getEntityTypes());
        classificationDef.setTypeVersion(key.getVersion());
        AtlasTypesDef typesDef = new AtlasTypesDef();
        typesDef.setClassificationDefs(Collections.singletonList(classificationDef));
        typesDef = key.getOp() == MetadataRegistryOp.CREATE ? this.typeDefStore.createTypesDef(typesDef) : this.typeDefStore.updateTypesDef(typesDef);
        List classificationDefs = typesDef.getClassificationDefs();
        if (classificationDefs.size() > 0) {
            classificationDef = (AtlasClassificationDef)classificationDefs.get(0);
            classificationDef.setGuid(null);
            return new TagDef(classificationDef);
        }
        return null;
    }

    private AtlasBusinessMetadataDef createOrUpdateBusinessMetadataDef(MetadataRegistryKey key, AtlasBusinessMetadataDef bmDef) throws AtlasBaseException {
        if (bmDef.getName() == null) {
            throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_TYPENAME, new String[]{bmDef.getName()});
        }
        if (Types.containsStringIndexType(bmDef)) {
            throw new AtlasBaseException("Attribute definition with index type STRING not supported");
        }
        AtlasTypesDef typesDef = new AtlasTypesDef();
        typesDef.setBusinessMetadataDefs(Collections.singletonList(bmDef));
        typesDef = key.getOp() == MetadataRegistryOp.CREATE ? this.typeDefStore.createTypesDef(typesDef) : this.typeDefStore.updateTypesDef(typesDef);
        List bmDefs = typesDef.getBusinessMetadataDefs();
        if (bmDefs.size() > 0) {
            bmDef = (AtlasBusinessMetadataDef)bmDefs.get(0);
            bmDef.setGuid(null);
            return bmDef;
        }
        return null;
    }

    private void deleteTagDef(MetadataRegistryKey key, boolean isPurge) throws AtlasBaseException {
        this.typeDefStore.deleteTypeByName(key.getType());
    }

    private void deleteBusinessMetadataDef(MetadataRegistryKey key, boolean isPurge) throws AtlasBaseException {
        this.typeDefStore.deleteTypeByName(key.getType());
    }

    public void handleUpdate(MetadataRegistryKey key, MetadataRegistryValue<?> value, MetadataRegistryValue<?> oldValue, TopicPartition tp, long offset, long timestamp) {
    }

    public Map<TopicPartition, Long> checkpoint(int count) {
        if (count == 0 || this.uncommittedEntities.size() >= this.batchSize) {
            this.commitBatch();
        }
        if (this.uncommittedEntities.size() == 0) {
            return this.lastOffsets;
        }
        return Collections.emptyMap();
    }

    public void endBatch(int count) {
        if (!this.cacheInitialized.get()) {
            this.commitBatch();
        }
    }

    public void failBatch(int count, Throwable t) {
        if (t instanceof WakeupException) {
            this.commitBatch();
        } else {
            this.metricsManager.recordIngestionIndexError(this.uncommittedEntities.size());
            this.clearPending();
        }
    }

    private void commitBatch() {
        int uncommitted = this.uncommittedEntities.size();
        if (uncommitted > 0) {
            try {
                this.graph.setVerifyUniqueness(false);
                this.entityStore.createOrUpdate((EntityStream)new AtlasEntityStream(this.uncommittedEntities), true);
                int committed = this.committedCount.addAndGet(uncommitted);
                this.metricsManager.recordIngestionIndex(uncommitted);
                log.info("Total committed {}", (Object)committed);
            }
            catch (Exception e) {
                this.metricsManager.recordIngestionIndexError(uncommitted);
                log.error("Could not commit {}", (Object)uncommitted, (Object)e);
            }
            finally {
                this.graph.setVerifyUniqueness(true);
                RequestContext.clear();
            }
            this.clearPending();
        }
    }

    public void close() throws IOException {
    }
}

