package io.confluent.catalog.storage;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import io.confluent.catalog.DataCatalogConfig;
import io.confluent.catalog.client.rest.CatalogRestService;
import io.confluent.catalog.hook.AtlasGraphUtils;
import io.confluent.catalog.hook.SchemaAtlasHook;
import io.confluent.catalog.hook.SchemaAtlasTypes;
import io.confluent.catalog.metrics.MetricsManager;
import io.confluent.catalog.model.ModelConstants;
import io.confluent.catalog.model.instance.BusinessMetadata;
import io.confluent.catalog.model.instance.SchemaMetadata;
import io.confluent.catalog.model.instance.SchemaTags;
import io.confluent.catalog.model.instance.Tag;
import io.confluent.catalog.model.typedef.TagDef;
import io.confluent.catalog.notification.SnapshotManager;
import io.confluent.catalog.storage.serialization.MetadataRegistryKeySerde;
import io.confluent.catalog.storage.serialization.MetadataRegistryValueSerde;
import io.confluent.catalog.util.AtomicTimestampGenerator;
import io.confluent.catalog.util.JavaClock;
import io.confluent.catalog.util.QualifiedNameGenerator;
import io.confluent.catalog.web.errors.ExceptionMapperUtil;
import io.confluent.catalog.web.rest.entities.BusinessMetadataDefResponse;
import io.confluent.catalog.web.rest.entities.BusinessMetadataResponse;
import io.confluent.catalog.web.rest.entities.SchemaTagsResponse;
import io.confluent.catalog.web.rest.entities.TagDefResponse;
import io.confluent.catalog.web.rest.entities.TagResponse;
import io.confluent.catalog.web.rest.exceptions.RestInvalidTagException;
import io.confluent.catalog.web.rest.exceptions.RestInvalidTagOperationException;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
import io.confluent.kafka.schemaregistry.client.rest.RestService;
import io.confluent.kafka.schemaregistry.client.rest.entities.Metadata;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaEntity;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.schemaregistry.client.rest.utils.UrlList;
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryException;
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryRequestForwardingException;
import io.confluent.kafka.schemaregistry.exceptions.UnknownLeaderException;
import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
import io.confluent.kafka.schemaregistry.rest.client.LocalSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.rest.client.RetryExecutor;
import io.confluent.kafka.schemaregistry.rest.exceptions.RestInvalidSchemaException;
import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistry;
import io.confluent.kafka.schemaregistry.utils.JacksonMapper;
import io.confluent.kafka.schemaregistry.utils.QualifiedSubject;
import io.confluent.rest.RestConfigException;
import io.confluent.rest.entities.ErrorMessage;
import io.confluent.rest.exceptions.RestException;
import io.confluent.rest.exceptions.RestNotFoundException;
import io.kcache.Cache;
import io.kcache.CacheLoader;
import io.kcache.CacheUpdateHandler;
import io.kcache.KafkaCache;
import io.kcache.KafkaCacheConfig;
import io.kcache.KeyValue;
import io.kcache.caffeine.CaffeineCache;
import io.kcache.exceptions.CacheInitializationException;
import io.kcache.exceptions.EntryTooLargeException;
import io.kcache.utils.Caches;
import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
import javax.annotation.PreDestroy;
import javax.ws.rs.core.UriBuilder;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.AtlasRelatedObjectId;
import org.apache.atlas.model.instance.AtlasStruct;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.instance.EntityMutations;
import org.apache.atlas.model.typedef.AtlasBusinessMetadataDef;
import org.apache.atlas.model.typedef.AtlasClassificationDef;
import org.apache.atlas.model.typedef.AtlasRelationshipDef;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasRelationshipType;
import org.apache.atlas.type.AtlasStructType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Singleton
@Service
/* loaded from: input_file:io/confluent/catalog/storage/MetadataRegistry.class */
public class MetadataRegistry implements Closeable {
    /**
     * Request header read by {@code isLeader(Map)} and {@code lock(String, Map)}. When the header is
     * absent or parses as {@code true}, the tenant lock is taken and the request is handled locally
     * if this node is the leader.
     */
    public static final String X_FORWARD_HEADER = "X-Forward";
    // Version-related constants; not referenced in the portion of the class shown here.
    private static final String VERSION_PREFIX = "1.";
    private static final String DEFAULT_VERSION = "1.0";
    private static final String CONFLUENT_VERSION = "confluent:version";
    /** Size passed to both the local cache and the error cache when they are constructed. */
    private static final int CACHE_SIZE = 10000;
    // Collaborators: assigned once in the constructor.
    private final KafkaSchemaRegistry schemaRegistry;
    private final LocalSchemaRegistryClient schemaRegistryClient;
    private final RetryExecutor retryExecutor;
    private final AtlasGraph graph;
    private final AtlasTypeRegistry typeRegistry;
    private final AtlasEntityStore entityStore;
    private final AtlasTypeDefStore typeDefStore;
    private final AtlasGraphUtils atlasGraphUtils;
    private final EntitySearchService searchService;
    private final MetricsManager metricsManager;
    private final SnapshotManager snapshotManager;
    /** Concurrent map exposed through {@code pendingCache()}. */
    private final Map<String, AtlasEntity.AtlasEntityWithExtInfo> pendingCache;
    /** Local cache handed to the Kafka-backed cache as its backing store (see {@code createCache}). */
    private final Cache<MetadataRegistryKey, MetadataRegistryValue<?>> localCache;
    /** Kafka-backed cache; create/update operations write to it and read the result back. */
    final Cache<MetadataRegistryKey, MetadataRegistryValue<?>> kafkaCache;
    /** Looked up by operation timestamp when a write's read-back does not match, to find the failure cause. */
    private final Cache<Long, ObjectWithException> errorCache;
    /** One lock per key passed to {@code lockFor}; entries are created lazily. */
    private final Map<String, Lock> tenantToLock = new ConcurrentHashMap();
    /** Set to true by {@code init()} once the Kafka cache has been initialized. */
    private final AtomicBoolean initialized = new AtomicBoolean();
    /** Released by {@code init()}; {@code waitForInit()} blocks on it. */
    private final CountDownLatch initLatch = new CountDownLatch(1);
    /** Source of the timestamps that tag each write so its read-back can be matched. */
    private final AtomicTimestampGenerator tsGenerator = new AtomicTimestampGenerator(new JavaClock());
    // Tag-definition colour settings read from DataCatalogConfig in the constructor.
    private final Boolean updateTagDefDefaultColor;
    private final String tagColorToUpdate;
    private static final Logger log = LoggerFactory.getLogger(MetadataRegistry.class);
    /** Expiry passed to the local cache and the error cache. */
    private static final Duration CACHE_EXPIRE_AFTER_WRITE = Duration.ofHours(1);
    /** Fallback error reported when a write fails and the error cache holds no entry for its timestamp. */
    private static final AtlasBaseException UNKNOWN_ERROR = new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, new String[]{""});
    // Jackson type tokens; the LIST_TAGS_TYPE and LIST_BM_TYPE tokens are used when decoding forwarded responses.
    private static final TypeReference<EntityMutationResponse> ENTITY_MUTATION_RESPONSE_TYPE = new TypeReference<EntityMutationResponse>() { // from class: io.confluent.catalog.storage.MetadataRegistry.1
    };
    private static final TypeReference<List<TagResponse>> LIST_TAGS_TYPE = new TypeReference<List<TagResponse>>() { // from class: io.confluent.catalog.storage.MetadataRegistry.2
    };
    private static final TypeReference<List<TagDefResponse>> LIST_TAG_DEFS_TYPE = new TypeReference<List<TagDefResponse>>() { // from class: io.confluent.catalog.storage.MetadataRegistry.3
    };
    private static final TypeReference<List<BusinessMetadataResponse>> LIST_BM_TYPE = new TypeReference<List<BusinessMetadataResponse>>() { // from class: io.confluent.catalog.storage.MetadataRegistry.4
    };
    private static final TypeReference<List<BusinessMetadataDefResponse>> LIST_BM_DEFS_TYPE = new TypeReference<List<BusinessMetadataDefResponse>>() { // from class: io.confluent.catalog.storage.MetadataRegistry.5
    };
    private static final TypeReference<List<SchemaTagsResponse>> LIST_SCHEMA_TAGS_TYPE = new TypeReference<List<SchemaTagsResponse>>() { // from class: io.confluent.catalog.storage.MetadataRegistry.6
    };
    private static final TypeReference<String> STRING_TYPE = new TypeReference<String>() { // from class: io.confluent.catalog.storage.MetadataRegistry.7
    };
    private static final TypeReference<Void> VOID_TYPE = new TypeReference<Void>() { // from class: io.confluent.catalog.storage.MetadataRegistry.8
    };
    /** Comma-separated names of the SR_RECORD and SR_FIELD Atlas types. */
    private static final String searchTypesForTags = String.join(",", SchemaAtlasTypes.SR_RECORD.getName(), SchemaAtlasTypes.SR_FIELD.getName());

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/confluent/catalog/storage/MetadataRegistry$ObjectWithException.class */
    /**
     * Immutable pair of an arbitrary object and the {@link AtlasBaseException} associated with it.
     * Instances are stored in the error cache and compared by value.
     */
    public static class ObjectWithException {
        private final Object obj;
        private final AtlasBaseException ex;

        /**
         * @param obj the object to carry; may be null
         * @param atlasBaseException the exception to carry; may be null
         */
        public ObjectWithException(Object obj, AtlasBaseException atlasBaseException) {
            this.obj = obj;
            this.ex = atlasBaseException;
        }

        /** @return the carried object */
        public Object getObject() {
            return obj;
        }

        /** @return the carried exception */
        public AtlasBaseException getException() {
            return ex;
        }

        /** Two instances are equal when they have the same runtime class and equal object and exception. */
        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj == null) {
                return false;
            }
            if (obj.getClass() != getClass()) {
                return false;
            }
            ObjectWithException that = (ObjectWithException) obj;
            if (!Objects.equals(this.obj, that.obj)) {
                return false;
            }
            return Objects.equals(this.ex, that.ex);
        }

        @Override
        public int hashCode() {
            return Objects.hash(obj, ex);
        }
    }

    /**
     * Wires the registry to its Atlas and schema-registry collaborators and builds its caches.
     *
     * <p>The order of the assignments matters: {@code createCache} reads {@code localCache} and,
     * through {@code createHandler}, the graph/type/entity/search/metrics fields, so those must be
     * assigned before it runs.
     *
     * @throws CacheInitializationException if a {@link RestConfigException} is raised while
     *     building the configuration
     * @throws ClassCastException if {@code schemaRegistry} is not a {@link KafkaSchemaRegistry}
     */
    @Inject
    public MetadataRegistry(SchemaRegistry schemaRegistry, AtlasGraph atlasGraph, AtlasTypeRegistry atlasTypeRegistry, AtlasEntityStore atlasEntityStore, AtlasTypeDefStore atlasTypeDefStore, AtlasGraphUtils atlasGraphUtils, EntitySearchService entitySearchService, MetricsManager metricsManager, SnapshotManager snapshotManager) {
        try {
            // The class uses KafkaSchemaRegistry-specific API (leader listener, leader REST service).
            this.schemaRegistry = (KafkaSchemaRegistry) schemaRegistry;
            this.schemaRegistryClient = new LocalSchemaRegistryClient(this.schemaRegistry, Arrays.asList(new AvroSchemaProvider(), new JsonSchemaProvider(), new ProtobufSchemaProvider()));
            this.graph = atlasGraph;
            this.typeRegistry = atlasTypeRegistry;
            this.entityStore = atlasEntityStore;
            this.typeDefStore = atlasTypeDefStore;
            this.atlasGraphUtils = atlasGraphUtils;
            this.searchService = entitySearchService;
            this.metricsManager = metricsManager;
            this.snapshotManager = snapshotManager;
            this.pendingCache = new ConcurrentHashMap();
            this.localCache = new PersistentCaffeineCache(Integer.valueOf(CACHE_SIZE), CACHE_EXPIRE_AFTER_WRITE, null);
            // Catalog settings are derived from the schema registry's original properties.
            DataCatalogConfig dataCatalogConfig = new DataCatalogConfig(schemaRegistry.config().originalProperties());
            this.updateTagDefDefaultColor = dataCatalogConfig.getBoolean(DataCatalogConfig.CATALOG_TAGDEF_UPDATE_TO_DEFAULT_COLOR_CONFIG);
            this.tagColorToUpdate = dataCatalogConfig.getString(DataCatalogConfig.CATALOG_TAGDEF_UPDATE_COLOR_CONFIG);
            this.retryExecutor = new RetryExecutor(dataCatalogConfig.catalogMaxRetries(), dataCatalogConfig.catalogRetriesWaitMs());
            // NOTE(review): createCache/createHandler are protected and receive a partially
            // constructed instance (errorCache is still unassigned here) - overriders must not rely on it.
            this.kafkaCache = createCache(dataCatalogConfig);
            this.errorCache = new CaffeineCache(Integer.valueOf(CACHE_SIZE), CACHE_EXPIRE_AFTER_WRITE, (CacheLoader) null);
        } catch (RestConfigException e) {
            throw new CacheInitializationException("Could not initialize MetadataRegistry", e);
        }
    }

    /** @return the underlying schema registry this metadata registry was constructed with */
    public KafkaSchemaRegistry getSchemaRegistry() {
        return this.schemaRegistry;
    }

    /**
     * Builds the Kafka-backed cache, wrapped by {@code Caches.concurrentCache}, on top of the local
     * cache, and registers a leader-change listener on the schema registry.
     *
     * <p>The listener resets and re-syncs the cache whenever it is notified with {@code true}.
     *
     * @param dataCatalogConfig catalog configuration used to derive the Kafka cache properties
     * @return the wrapped cache
     */
    protected Cache<MetadataRegistryKey, MetadataRegistryValue<?>> createCache(DataCatalogConfig dataCatalogConfig) throws CacheInitializationException {
        KafkaCacheConfig cacheConfig = new KafkaCacheConfig(getKafkaCacheProperties(dataCatalogConfig));
        Cache<MetadataRegistryKey, MetadataRegistryValue<?>> cache = Caches.concurrentCache(
                new KafkaCache(cacheConfig, new MetadataRegistryKeySerde(), new MetadataRegistryValueSerde(), createHandler(), this.localCache));
        getSchemaRegistry().addLeaderChangeListener(nowLeader -> {
            if (!nowLeader.booleanValue()) {
                return;
            }
            cache.reset();
            cache.sync();
        });
        return cache;
    }

    /**
     * Derives the properties for the Kafka-backed cache from the schema registry's original
     * properties.
     *
     * <p>Every string property whose key starts with {@code kafkastore.} is mirrored under the
     * {@code kafkacache.} prefix unless a string property with that key is already present. Only the
     * leading prefix is rewritten; a later occurrence of {@code kafkastore} inside the key is left
     * untouched. The catalog topic, if configured, becomes {@code kafkacache.topic}; the init and
     * operation timeouts are set to {@link Integer#MAX_VALUE}; the poll timeout comes from the
     * catalog ingestor batch timeout.
     *
     * @param dataCatalogConfig catalog configuration supplying the poll timeout
     * @return a new {@link Properties} instance for the Kafka cache configuration
     */
    private Properties getKafkaCacheProperties(DataCatalogConfig dataCatalogConfig) throws CacheInitializationException {
        final String storePrefix = "kafkastore.";
        final String cachePrefix = "kafkacache.";
        Properties properties = new Properties();
        properties.putAll(this.schemaRegistry.config().originalProperties());
        // stringPropertyNames() is a snapshot, so adding entries while iterating it is safe.
        Set<String> stringPropertyNames = properties.stringPropertyNames();
        for (String name : stringPropertyNames) {
            if (name.startsWith(storePrefix)) {
                // Rewrite the prefix only; String.replace would also touch later occurrences.
                String cacheName = cachePrefix + name.substring(storePrefix.length());
                if (!stringPropertyNames.contains(cacheName)) {
                    properties.put(cacheName, properties.get(name));
                }
            }
        }
        if (properties.containsKey(DataCatalogConfig.CATALOG_TOPIC_CONFIG)) {
            properties.put("kafkacache.topic", properties.get(DataCatalogConfig.CATALOG_TOPIC_CONFIG));
        }
        properties.put("kafkacache.init.timeout.ms", Integer.MAX_VALUE);
        properties.put("kafkacache.timeout.ms", Integer.MAX_VALUE);
        properties.put("kafkacache.poll.timeout.ms", Long.valueOf(dataCatalogConfig.getCatalogIngestorBatchTimeoutMs()));
        return properties;
    }

    /**
     * Creates the update handler installed on the Kafka-backed cache. The handler receives this
     * registry together with the graph, type registry, entity store, type-def store, search
     * service, metrics manager and timestamp generator.
     *
     * @return a new {@code MetadataRegistryUpdateHandler}
     */
    protected CacheUpdateHandler<MetadataRegistryKey, MetadataRegistryValue<?>> createHandler() {
        return new MetadataRegistryUpdateHandler(this, this.graph, this.typeRegistry, this.entityStore, this.typeDefStore, this.searchService, this.metricsManager, this.tsGenerator);
    }

    /** @return the configuration of the underlying schema registry */
    public SchemaRegistryConfig config() {
        return this.schemaRegistry.config();
    }

    /** @return the properties map exposed by the underlying schema registry */
    public Map<String, Object> properties() {
        return this.schemaRegistry.properties();
    }

    /**
     * Initializes the Kafka-backed cache and marks the registry as initialized, releasing any
     * callers blocked in {@code waitForInit()}. Returns immediately if the registry is already
     * initialized.
     *
     * @throws IllegalStateException if the initialized flag was set by someone else while the cache
     *     was being initialized
     */
    public void init() {
        if (!this.initialized.get()) {
            this.kafkaCache.init();
            boolean wonRace = this.initialized.compareAndSet(false, true);
            if (!wonRace) {
                throw new IllegalStateException("Metadata registry was already initialized");
            }
            this.initLatch.countDown();
        }
    }

    /**
     * Blocks until {@code init()} has counted down the initialization latch.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public void waitForInit() throws InterruptedException {
        this.initLatch.await();
    }

    /** @return true once {@code init()} has set the initialized flag */
    public boolean initialized() {
        return this.initialized.get();
    }

    /** @return the live (not copied) concurrent map of pending entities */
    public Map<String, AtlasEntity.AtlasEntityWithExtInfo> pendingCache() {
        return this.pendingCache;
    }

    /** @return the local cache that backs the Kafka cache */
    public Cache<MetadataRegistryKey, MetadataRegistryValue<?>> localCache() {
        return this.localCache;
    }

    /** @return whether the underlying schema registry reports this node as leader */
    public boolean isLeader() {
        return this.schemaRegistry.isLeader();
    }

    /**
     * Decides whether a request carrying the given headers is handled on this node.
     *
     * @param map request headers; the {@code X-Forward} entry is consulted
     * @return true when this node is leader and the header is absent or parses as {@code true}
     */
    private boolean isLeader(Map<String, String> map) {
        String forwardHeader = map.get(X_FORWARD_HEADER);
        if (!isLeader()) {
            return false;
        }
        return forwardHeader == null || Boolean.parseBoolean(forwardHeader);
    }

    /**
     * Returns the lock registered for the given key, creating a {@link ReentrantLock} on first use.
     *
     * @param str key the lock is registered under (callers pass the tenant)
     * @return the lock for that key; the same instance on every call
     */
    protected Lock lockFor(String str) {
        return this.tenantToLock.computeIfAbsent(str, key -> new ReentrantLock());
    }

    /**
     * Acquires the lock for the given key unless the {@code X-Forward} header is present and does
     * not parse as {@code true}.
     *
     * @param str key whose lock is acquired
     * @param map request headers
     */
    private void lock(String str, Map<String, String> map) {
        String forwardHeader = map.get(X_FORWARD_HEADER);
        boolean explicitlyFalse = forwardHeader != null && !Boolean.parseBoolean(forwardHeader);
        if (explicitlyFalse) {
            return;
        }
        lockFor(str).lock();
    }

    /**
     * Releases the lock for the given key if, and only if, the calling thread currently holds it.
     * This makes it safe to call after a {@code lock(...)} that chose not to acquire.
     *
     * @param str key whose lock is released
     */
    private void unlock(String str) {
        // lockFor hands back the same instance for a given key, so one lookup is enough.
        ReentrantLock tenantLock = (ReentrantLock) lockFor(str);
        if (tenantLock.isHeldByCurrentThread()) {
            tenantLock.unlock();
        }
    }

    /**
     * Creates the given tags locally when this node should handle the request, otherwise forwards
     * the request to the leader. The per-key lock is released on every exit path.
     *
     * @param str key used for locking and passed to the local create (the tenant)
     * @param list tags to create
     * @param map request headers, also sent along when forwarding
     * @return one response per tag
     * @throws UnknownLeaderException if forwarding is required but no leader is known
     * @throws SchemaRegistryException if forwarding fails
     */
    public List<TagResponse> createTagsOrForward(String str, List<Tag> list, Map<String, String> map) throws SchemaRegistryException {
        lock(str, map);
        try {
            if (!isLeader(map)) {
                if (this.schemaRegistry.leaderIdentity() == null) {
                    throw new UnknownLeaderException("Request failed since leader is unknown");
                }
                return forwardCreateTagsRequestToLeader(list, map);
            }
            return createTags(str, list);
        } finally {
            unlock(str);
        }
    }

    /**
     * Sends a create-tags request ({@code POST /catalog/v1/entity/tags}) to the leader's REST
     * service and returns its decoded response.
     *
     * @param list tags to create; serialized as the request body
     * @param map request headers passed through to the leader
     * @return the leader's per-tag responses
     * @throws RestException if the leader answers with a REST client error (status and error code
     *     are preserved)
     * @throws SchemaRegistryRequestForwardingException on I/O failure while forwarding
     */
    private List<TagResponse> forwardCreateTagsRequestToLeader(List<Tag> list, Map<String, String> map) throws SchemaRegistryRequestForwardingException {
        RestService leaderRestService = this.schemaRegistry.leaderRestService();
        UrlList baseUrls = leaderRestService.getBaseUrls();
        String uri = UriBuilder.fromPath("/catalog/v1/entity/tags").build().toString();
        // Parameterized logging: the message is only formatted when debug is enabled.
        log.debug("Forwarding create tags request to {}", baseUrls);
        try {
            return (List<TagResponse>) leaderRestService.httpRequest(uri, "POST", toJson(list), map, LIST_TAGS_TYPE);
        } catch (RestClientException e) {
            throw new RestException(e.getMessage(), e.getStatus(), e.getErrorCode(), e);
        } catch (IOException e2) {
            throw new SchemaRegistryRequestForwardingException(String.format("Unexpected error while forwarding the create tags request to %s", baseUrls), e2);
        }
    }

    /**
     * Creates each tag by writing a CREATE record to the Kafka-backed cache and reading it back.
     *
     * <p>Per tag: if an active tag of the same type already exists on the entity, an
     * "already associated" error is returned; if the operation is not allowed, an invalid-operation
     * error is returned; otherwise the record is written and the read-back value is checked against
     * the write's timestamp. On mismatch the error cache is consulted for the cause.
     *
     * @param str key placed in the registry key (the tenant)
     * @param list tags to create
     * @return one response per input tag, in input order
     */
    private List<TagResponse> createTags(String str, List<Tag> list) {
        this.kafkaCache.sync();
        List<TagResponse> responses = new ArrayList<>();
        for (Tag requested : list) {
            Tag existing;
            try {
                existing = getTag(str, requested.getEntityType(), requested.getEntityName(), requested.getTypeName());
            } catch (AtlasBaseException ignored) {
                // A failed lookup is treated as "no existing tag"; creation proceeds.
                existing = null;
            }
            boolean alreadyActive = existing != null && existing.getEntityStatus() == AtlasEntity.Status.ACTIVE;
            if (alreadyActive) {
                AtlasBaseException duplicate = new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_ALREADY_ASSOCIATED, new String[]{requested.getEntityName(), requested.getTypeName()});
                responses.add(new TagResponse(requested, ExceptionMapperUtil.toErrorMessage(duplicate)));
                continue;
            }
            try {
                if (!allowTagOperation(requested)) {
                    responses.add(new TagResponse(requested, RestInvalidTagOperationException.toErrorMessage(requested.getTypeName())));
                    continue;
                }
                MetadataRegistryKey key = new MetadataRegistryKey(str, MetadataRegistryOp.CREATE, MetadataRegistryKind.TAG, requested.getTypeName(), null, null, requested.getEntityType(), requested.getEntityName());
                long stamp = this.tsGenerator.next();
                this.kafkaCache.put(key, new TagValue(null, requested, false, false, stamp));
                MetadataRegistryValue<?> stored = this.kafkaCache.get(key);
                if (stored != null && stored.getTimestamp() == stamp) {
                    responses.add(new TagResponse(((TagValue) stored).getNewValue()));
                } else {
                    // The read-back is not our write: report the recorded failure, if any.
                    ObjectWithException failure = this.errorCache.get(Long.valueOf(stamp));
                    AtlasBaseException cause = failure != null ? failure.getException() : UNKNOWN_ERROR;
                    responses.add(new TagResponse(requested, ExceptionMapperUtil.toErrorMessage(cause)));
                }
            } catch (AtlasBaseException failure) {
                responses.add(new TagResponse(requested, ExceptionMapperUtil.toErrorMessage(failure)));
            }
        }
        return responses;
    }

    /**
     * Creates the given business metadata locally when this node should handle the request,
     * otherwise forwards the request to the leader. The per-key lock is released on every exit path.
     *
     * @param str key used for locking and passed to the local create (the tenant)
     * @param list business metadata to create
     * @param map request headers, also sent along when forwarding
     * @return one response per item
     * @throws UnknownLeaderException if forwarding is required but no leader is known
     * @throws SchemaRegistryException if forwarding fails
     */
    public List<BusinessMetadataResponse> createBusinessMetadataOrForward(String str, List<BusinessMetadata> list, Map<String, String> map) throws SchemaRegistryException {
        lock(str, map);
        try {
            if (!isLeader(map)) {
                if (this.schemaRegistry.leaderIdentity() == null) {
                    throw new UnknownLeaderException("Request failed since leader is unknown");
                }
                return forwardCreateBusinessMetadataRequestToLeader(list, map);
            }
            return createBusinessMetadata(str, list);
        } finally {
            unlock(str);
        }
    }

    /**
     * Sends a create-business-metadata request
     * ({@code POST /catalog/v1/entity/businessmetadata}) to the leader's REST service and returns
     * its decoded response.
     *
     * @param list business metadata to create; serialized as the request body
     * @param map request headers passed through to the leader
     * @return the leader's per-item responses
     * @throws RestException if the leader answers with a REST client error (status and error code
     *     are preserved)
     * @throws SchemaRegistryRequestForwardingException on I/O failure while forwarding
     */
    private List<BusinessMetadataResponse> forwardCreateBusinessMetadataRequestToLeader(List<BusinessMetadata> list, Map<String, String> map) throws SchemaRegistryRequestForwardingException {
        RestService leaderRestService = this.schemaRegistry.leaderRestService();
        UrlList baseUrls = leaderRestService.getBaseUrls();
        String uri = UriBuilder.fromPath("/catalog/v1/entity/businessmetadata").build().toString();
        // Parameterized logging: the message is only formatted when debug is enabled.
        log.debug("Forwarding create bm request to {}", baseUrls);
        try {
            return (List<BusinessMetadataResponse>) leaderRestService.httpRequest(uri, "POST", toJson(list), map, LIST_BM_TYPE);
        } catch (RestClientException e) {
            throw new RestException(e.getMessage(), e.getStatus(), e.getErrorCode(), e);
        } catch (IOException e2) {
            throw new SchemaRegistryRequestForwardingException(String.format("Unexpected error while forwarding the create bm request to %s", baseUrls), e2);
        }
    }

    /**
     * Creates each business-metadata item by writing a CREATE record to the Kafka-backed cache and
     * reading it back.
     *
     * <p>Per item: if metadata of that type already exists on the entity, an "already exists" error
     * is returned; otherwise the record is written and the read-back value is checked against the
     * write's timestamp. On mismatch the error cache is consulted for the cause.
     *
     * @param str key placed in the registry key (the tenant)
     * @param list business metadata to create
     * @return one response per input item, in input order
     */
    private List<BusinessMetadataResponse> createBusinessMetadata(String str, List<BusinessMetadata> list) {
        this.kafkaCache.sync();
        List<BusinessMetadataResponse> responses = new ArrayList<>();
        for (BusinessMetadata requested : list) {
            BusinessMetadata existing;
            try {
                existing = getBusinessMetadata(str, requested.getEntityType(), requested.getEntityName(), requested.getTypeName());
            } catch (AtlasBaseException ignored) {
                // A failed lookup is treated as "nothing there yet"; creation proceeds.
                existing = null;
            }
            if (existing != null) {
                AtlasBaseException duplicate = new AtlasBaseException(AtlasErrorCode.BUSINESS_METADATA_ATTRIBUTE_ALREADY_EXISTS, new String[]{requested.getEntityName(), requested.getTypeName()});
                responses.add(new BusinessMetadataResponse(requested, ExceptionMapperUtil.toErrorMessage(duplicate)));
                continue;
            }
            MetadataRegistryKey key = new MetadataRegistryKey(str, MetadataRegistryOp.CREATE, MetadataRegistryKind.BUSINESS_METADATA, requested.getTypeName(), null, null, requested.getEntityType(), requested.getEntityName());
            long stamp = this.tsGenerator.next();
            this.kafkaCache.put(key, new BusinessMetadataValue(null, requested, false, false, stamp));
            MetadataRegistryValue<?> stored = this.kafkaCache.get(key);
            if (stored != null && stored.getTimestamp() == stamp) {
                responses.add(new BusinessMetadataResponse(((BusinessMetadataValue) stored).getNewValue()));
            } else {
                // The read-back is not our write: report the recorded failure, if any.
                ObjectWithException failure = this.errorCache.get(Long.valueOf(stamp));
                AtlasBaseException cause = failure != null ? failure.getException() : UNKNOWN_ERROR;
                responses.add(new BusinessMetadataResponse(requested, ExceptionMapperUtil.toErrorMessage(cause)));
            }
        }
        return responses;
    }

    /**
     * Looks up a classification on an entity and wraps it as a {@link Tag}. The classification's
     * entity GUID is cleared before wrapping.
     *
     * @param str first argument to the GUID lookup (callers pass the tenant)
     * @param str2 entity type
     * @param str3 entity name
     * @param str4 classification (tag) type name
     * @return the tag built from the classification, entity type and entity name
     * @throws AtlasBaseException if the GUID or classification lookup fails
     */
    public Tag getTag(String str, String str2, String str3, String str4) throws AtlasBaseException {
        String entityGuid = getGuid(str, str2, str3);
        AtlasClassification found = this.entityStore.getClassification(entityGuid, str4);
        found.setEntityGuid((String) null);
        return new Tag(found, str2, str3);
    }

    /**
     * Looks up the business-metadata attributes of one type on an entity.
     *
     * @param str first argument to the GUID lookup (callers pass the tenant)
     * @param str2 entity type
     * @param str3 entity name
     * @param str4 business-metadata type name
     * @return the metadata wrapped with its entity type and name, or null when the entity has no
     *     business attributes or none under {@code str4}
     * @throws AtlasBaseException if the GUID or entity lookup fails
     */
    public BusinessMetadata getBusinessMetadata(String str, String str2, String str3, String str4) throws AtlasBaseException {
        String entityGuid = getGuid(str, str2, str3);
        AtlasEntity entity = this.entityStore.getById(entityGuid).getEntity();
        Map<String, Map<String, Object>> allAttributes = entity.getBusinessAttributes();
        if (allAttributes == null) {
            return null;
        }
        Map<String, Object> typeAttributes = allAttributes.get(str4);
        if (typeAttributes == null) {
            return null;
        }
        return new BusinessMetadata(new AtlasStruct(str4, typeAttributes), str2, str3);
    }

    /**
     * Updates the given tags locally when this node should handle the request, otherwise forwards
     * the request to the leader. The per-key lock is released on every exit path.
     *
     * @param str key used for locking and passed to the local update (the tenant)
     * @param list tags to update
     * @param map request headers, also sent along when forwarding
     * @return one response per tag
     * @throws UnknownLeaderException if forwarding is required but no leader is known
     * @throws SchemaRegistryException if forwarding fails
     */
    public List<TagResponse> updateTagsOrForward(String str, List<Tag> list, Map<String, String> map) throws SchemaRegistryException {
        lock(str, map);
        try {
            if (!isLeader(map)) {
                if (this.schemaRegistry.leaderIdentity() == null) {
                    throw new UnknownLeaderException("Request failed since leader is unknown");
                }
                return forwardUpdateTagsRequestToLeader(list, map);
            }
            return updateTags(str, list);
        } finally {
            unlock(str);
        }
    }

    /**
     * Sends an update-tags request ({@code PUT /catalog/v1/entity/tags}) to the leader's REST
     * service and returns its decoded response.
     *
     * @param list tags to update; serialized as the request body
     * @param map request headers passed through to the leader
     * @return the leader's per-tag responses
     * @throws RestException if the leader answers with a REST client error (status and error code
     *     are preserved)
     * @throws SchemaRegistryRequestForwardingException on I/O failure while forwarding
     */
    private List<TagResponse> forwardUpdateTagsRequestToLeader(List<Tag> list, Map<String, String> map) throws SchemaRegistryRequestForwardingException {
        RestService leaderRestService = this.schemaRegistry.leaderRestService();
        UrlList baseUrls = leaderRestService.getBaseUrls();
        String uri = UriBuilder.fromPath("/catalog/v1/entity/tags").build().toString();
        // Parameterized logging: the message is only formatted when debug is enabled.
        log.debug("Forwarding update tags request to {}", baseUrls);
        try {
            return (List<TagResponse>) leaderRestService.httpRequest(uri, "PUT", toJson(list), map, LIST_TAGS_TYPE);
        } catch (RestClientException e) {
            throw new RestException(e.getMessage(), e.getStatus(), e.getErrorCode(), e);
        } catch (IOException e2) {
            throw new SchemaRegistryRequestForwardingException(String.format("Unexpected error while forwarding the update tags request to %s", baseUrls), e2);
        }
    }

    /**
     * Updates each tag by writing an UPDATE record to the Kafka-backed cache and reading it back.
     *
     * <p>Per tag: a failed lookup yields that failure as the response; a tag whose entity status is
     * not ACTIVE yields a "not associated" error; a disallowed operation yields an
     * invalid-operation error; otherwise the record (old and new value) is written and the
     * read-back value is checked against the write's timestamp. On mismatch the error cache is
     * consulted for the cause.
     *
     * @param str key placed in the registry key (the tenant)
     * @param list tags to update
     * @return one response per input tag, in input order
     */
    private List<TagResponse> updateTags(String str, List<Tag> list) {
        this.kafkaCache.sync();
        List<TagResponse> responses = new ArrayList<>();
        for (Tag requested : list) {
            try {
                Tag current = getTag(str, requested.getEntityType(), requested.getEntityName(), requested.getTypeName());
                if (current.getEntityStatus() != AtlasEntity.Status.ACTIVE) {
                    AtlasBaseException missing = new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_ASSOCIATED_WITH_ENTITY, new String[]{requested.getTypeName()});
                    responses.add(new TagResponse(requested, ExceptionMapperUtil.toErrorMessage(missing)));
                    continue;
                }
                if (!allowTagOperation(requested)) {
                    responses.add(new TagResponse(requested, RestInvalidTagOperationException.toErrorMessage(requested.getTypeName())));
                    continue;
                }
                MetadataRegistryKey key = new MetadataRegistryKey(str, MetadataRegistryOp.UPDATE, MetadataRegistryKind.TAG, requested.getTypeName(), null, null, requested.getEntityType(), requested.getEntityName());
                long stamp = this.tsGenerator.next();
                this.kafkaCache.put(key, new TagValue(current, requested, false, false, stamp));
                MetadataRegistryValue<?> stored = this.kafkaCache.get(key);
                if (stored != null && stored.getTimestamp() == stamp) {
                    responses.add(new TagResponse(((TagValue) stored).getNewValue()));
                } else {
                    // The read-back is not our write: report the recorded failure, if any.
                    ObjectWithException failure = this.errorCache.get(Long.valueOf(stamp));
                    AtlasBaseException cause = failure != null ? failure.getException() : UNKNOWN_ERROR;
                    responses.add(new TagResponse(requested, ExceptionMapperUtil.toErrorMessage(cause)));
                }
            } catch (AtlasBaseException failure) {
                // Lookup and write failures are reported the same way.
                responses.add(new TagResponse(requested, ExceptionMapperUtil.toErrorMessage(failure)));
            }
        }
        return responses;
    }

    /**
     * Updates the given business metadata locally when this node should handle the request,
     * otherwise forwards the request to the leader. The per-key lock is released on every exit path.
     *
     * @param str key used for locking and passed to the local update (the tenant)
     * @param list business metadata to update
     * @param map request headers, also sent along when forwarding
     * @return one response per item
     * @throws UnknownLeaderException if forwarding is required but no leader is known
     * @throws SchemaRegistryException if forwarding fails
     */
    public List<BusinessMetadataResponse> updateBusinessMetadataOrForward(String str, List<BusinessMetadata> list, Map<String, String> map) throws SchemaRegistryException {
        lock(str, map);
        try {
            if (!isLeader(map)) {
                if (this.schemaRegistry.leaderIdentity() == null) {
                    throw new UnknownLeaderException("Request failed since leader is unknown");
                }
                return forwardUpdateBusinessMetadataRequestToLeader(list, map);
            }
            return updateBusinessMetadata(str, list);
        } finally {
            unlock(str);
        }
    }

    /**
     * Sends an update-business-metadata request
     * ({@code PUT /catalog/v1/entity/businessmetadata}) to the leader's REST service and returns
     * its decoded response.
     *
     * @param list business metadata to update; serialized as the request body
     * @param map request headers passed through to the leader
     * @return the leader's per-item responses
     * @throws RestException if the leader answers with a REST client error (status and error code
     *     are preserved)
     * @throws SchemaRegistryRequestForwardingException on I/O failure while forwarding
     */
    private List<BusinessMetadataResponse> forwardUpdateBusinessMetadataRequestToLeader(List<BusinessMetadata> list, Map<String, String> map) throws SchemaRegistryRequestForwardingException {
        RestService leaderRestService = this.schemaRegistry.leaderRestService();
        UrlList baseUrls = leaderRestService.getBaseUrls();
        String uri = UriBuilder.fromPath("/catalog/v1/entity/businessmetadata").build().toString();
        // Parameterized logging: the message is only formatted when debug is enabled.
        log.debug("Forwarding update bms request to {}", baseUrls);
        try {
            return (List<BusinessMetadataResponse>) leaderRestService.httpRequest(uri, "PUT", toJson(list), map, LIST_BM_TYPE);
        } catch (RestClientException e) {
            throw new RestException(e.getMessage(), e.getStatus(), e.getErrorCode(), e);
        } catch (IOException e2) {
            throw new SchemaRegistryRequestForwardingException(String.format("Unexpected error while forwarding the update bms request to %s", baseUrls), e2);
        }
    }

    /**
     * Applies each requested business metadata update on this node and collects one response
     * per request.
     *
     * <p>For every item the current value is looked up; if none exists an error response is
     * recorded. Otherwise an UPDATE record stamped with a fresh timestamp is written to
     * {@code kafkaCache} and read back. The write counts as successful only when the value read
     * back carries that same timestamp; otherwise the error stored in {@code errorCache} for the
     * timestamp (or {@code UNKNOWN_ERROR}) is reported.
     *
     * @param str  first component of the registry key (presumably the tenant; confirm)
     * @param list business metadata instances to update
     * @return one response per input item, in input order
     */
    private List<BusinessMetadataResponse> updateBusinessMetadata(String str, List<BusinessMetadata> list) {
        this.kafkaCache.sync();
        List<BusinessMetadataResponse> responses = new ArrayList<>();
        for (BusinessMetadata requested : list) {
            try {
                BusinessMetadata existing = getBusinessMetadata(str, requested.getEntityType(), requested.getEntityName(), requested.getTypeName());
                if (existing == null) {
                    responses.add(new BusinessMetadataResponse(requested, ExceptionMapperUtil.toErrorMessage(new AtlasBaseException(AtlasErrorCode.BUSINESS_METADATA_ATTRIBUTE_DOES_NOT_EXIST, new String[]{requested.getTypeName()}))));
                    continue;
                }
                MetadataRegistryKey updateKey = new MetadataRegistryKey(str, MetadataRegistryOp.UPDATE, MetadataRegistryKind.BUSINESS_METADATA, requested.getTypeName(), null, null, requested.getEntityType(), requested.getEntityName());
                long timestamp = this.tsGenerator.next();
                this.kafkaCache.put(updateKey, new BusinessMetadataValue(existing, requested, false, false, timestamp));
                MetadataRegistryValue stored = (MetadataRegistryValue) this.kafkaCache.get(updateKey);
                boolean applied = stored != null && stored.getTimestamp() == timestamp;
                if (applied) {
                    responses.add(new BusinessMetadataResponse(((BusinessMetadataValue) stored).getNewValue()));
                } else {
                    // The record read back is not ours: report the error recorded for this timestamp, if any.
                    ObjectWithException failure = (ObjectWithException) this.errorCache.get(Long.valueOf(timestamp));
                    responses.add(new BusinessMetadataResponse(requested, ExceptionMapperUtil.toErrorMessage(failure != null ? failure.getException() : UNKNOWN_ERROR)));
                }
            } catch (AtlasBaseException e) {
                responses.add(new BusinessMetadataResponse(requested, ExceptionMapperUtil.toErrorMessage(e)));
            }
        }
        return responses;
    }

    /**
     * Deletes a tag while holding the lock for {@code str}: locally when this node is the leader,
     * otherwise by forwarding the request to the leader. The lock is always released on exit.
     *
     * @param str  lock key, also passed to the local delete (named "tenant" in {@code deleteTag}'s log message)
     * @param str2 entity type
     * @param str3 entity name
     * @param str4 tag name
     * @param map  passed to {@code lock}, {@code isLeader} and the forwarded request
     * @throws UnknownLeaderException if this node is not the leader and {@code leaderIdentity()} is null
     * @throws SchemaRegistryException if forwarding the request to the leader fails
     * @throws AtlasBaseException if the local delete fails
     */
    public void deleteTagOrForward(String str, String str2, String str3, String str4, Map<String, String> map) throws SchemaRegistryException, AtlasBaseException {
        lock(str, map);
        try {
            if (isLeader(map)) {
                deleteTag(str, str2, str3, str4);
                return;
            }
            if (this.schemaRegistry.leaderIdentity() == null) {
                throw new UnknownLeaderException("Request failed since leader is unknown");
            }
            forwardDeleteTagRequestToLeader(str2, str3, str4, map);
        } finally {
            unlock(str);
        }
    }

    /**
     * Forwards a tag deletion to the leader as a {@code DELETE} on
     * {@code /catalog/v1/entity/type/{typeName}/name/{qualifiedName}/tags/{tagName}}.
     *
     * @param str  value substituted for {@code typeName}
     * @param str2 value substituted for {@code qualifiedName}
     * @param str3 value substituted for {@code tagName}
     * @param map  request properties handed to {@code httpRequest}
     * @throws RestException if the leader call fails with a {@code RestClientException}
     *         (message, status and error code are carried over)
     * @throws SchemaRegistryRequestForwardingException if an {@code IOException} occurs while forwarding
     */
    private void forwardDeleteTagRequestToLeader(String str, String str2, String str3, Map<String, String> map) throws SchemaRegistryRequestForwardingException {
        RestService leaderRestService = this.schemaRegistry.leaderRestService();
        UrlList baseUrls = leaderRestService.getBaseUrls();
        String uri = UriBuilder.fromPath("/catalog/v1/entity/type/{typeName}/name/{qualifiedName}/tags/{tagName}").build(str, str2, str3).toString();
        // Parameterized logging: the message is only rendered when debug logging is enabled.
        log.debug("Forwarding delete tag request to {}", baseUrls);
        try {
            leaderRestService.httpRequest(uri, "DELETE", (byte[]) null, map, VOID_TYPE);
        } catch (IOException e) {
            throw new SchemaRegistryRequestForwardingException(String.format("Unexpected error while forwarding the delete tag request to %s", baseUrls), e);
        } catch (RestClientException e) {
            throw new RestException(e.getMessage(), e.getStatus(), e.getErrorCode(), e);
        }
    }

    /**
     * Deletes a tag on this node.
     *
     * <p>The tag must exist with status {@code ACTIVE} and pass {@code allowTagOperation}. A DELETE
     * record stamped with a fresh timestamp is then written to {@code kafkaCache} and read back;
     * if the value read back does not carry that timestamp, the error stored in {@code errorCache}
     * for it (or {@code UNKNOWN_ERROR}) is thrown. On success the DELETE, UPDATE and CREATE keys
     * for the tag are overwritten with {@code null}.
     *
     * @param str  tenant
     * @param str2 entity type
     * @param str3 entity name
     * @param str4 tag name
     * @throws AtlasBaseException if the tag is missing or not active, or if the delete is rejected
     * @throws RestInvalidTagOperationException if {@code allowTagOperation} refuses the tag
     */
    private void deleteTag(String str, String str2, String str3, String str4) throws AtlasBaseException {
        this.kafkaCache.sync();
        try {
            Tag existing = getTag(str, str2, str3, str4);
            boolean active = existing != null && existing.getEntityStatus() == AtlasEntity.Status.ACTIVE;
            if (!active) {
                throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, new String[]{str4});
            }
            if (!allowTagOperation(existing)) {
                throw new RestInvalidTagOperationException(existing.getTypeName());
            }
            MetadataRegistryKey deleteKey = new MetadataRegistryKey(str, MetadataRegistryOp.DELETE, MetadataRegistryKind.TAG, str4, null, null, str2, str3);
            long timestamp = this.tsGenerator.next();
            this.kafkaCache.put(deleteKey, new TagValue(existing, null, false, false, timestamp));
            MetadataRegistryValue stored = (MetadataRegistryValue) this.kafkaCache.get(deleteKey);
            if (stored == null || stored.getTimestamp() != timestamp) {
                // The record read back is not ours: surface the error recorded for this timestamp, if any.
                ObjectWithException failure = (ObjectWithException) this.errorCache.get(Long.valueOf(timestamp));
                if (failure == null) {
                    throw UNKNOWN_ERROR;
                }
                throw failure.getException();
            }
            // Overwrite every record kept for this tag with null, in DELETE, UPDATE, CREATE order.
            this.kafkaCache.put(deleteKey, (Object) null);
            MetadataRegistryKey updateKey = new MetadataRegistryKey(str, MetadataRegistryOp.UPDATE, MetadataRegistryKind.TAG, str4, null, null, str2, str3);
            this.kafkaCache.put(updateKey, (Object) null);
            MetadataRegistryKey createKey = new MetadataRegistryKey(str, MetadataRegistryOp.CREATE, MetadataRegistryKind.TAG, str4, null, null, str2, str3);
            this.kafkaCache.put(createKey, (Object) null);
        } catch (AtlasBaseException e) {
            if (isJanusGraphNullPointerException(e)) {
                log.error("JanusGraph NullPointerException in MetadataRegistry.deleteTag(tenant={}, entityType={}, entityName={}, tagName={})", str, str2, str3, str4);
                this.metricsManager.recordJanusGraphNullPointerExceptionError(1);
            }
            throw e;
        }
    }

    /**
     * Deletes business metadata while holding the lock for {@code str}: locally when this node is
     * the leader, otherwise by forwarding the request to the leader. The lock is always released
     * on exit.
     *
     * @param str  lock key, also passed to the local delete (named "tenant" in
     *             {@code deleteBusinessMetadata}'s log message)
     * @param str2 entity type
     * @param str3 entity name
     * @param str4 business metadata name
     * @param map  passed to {@code lock}, {@code isLeader} and the forwarded request
     * @throws UnknownLeaderException if this node is not the leader and {@code leaderIdentity()} is null
     * @throws SchemaRegistryException if forwarding the request to the leader fails
     * @throws AtlasBaseException if the local delete fails
     */
    public void deleteBusinessMetadataOrForward(String str, String str2, String str3, String str4, Map<String, String> map) throws SchemaRegistryException, AtlasBaseException {
        lock(str, map);
        try {
            if (isLeader(map)) {
                deleteBusinessMetadata(str, str2, str3, str4);
                return;
            }
            if (this.schemaRegistry.leaderIdentity() == null) {
                throw new UnknownLeaderException("Request failed since leader is unknown");
            }
            forwardDeleteBusinessMetadataRequestToLeader(str2, str3, str4, map);
        } finally {
            unlock(str);
        }
    }

    /**
     * Forwards a business metadata deletion to the leader as a {@code DELETE} on
     * {@code /catalog/v1/entity/type/{typeName}/name/{qualifiedName}/businessmetadata/{bmName}}.
     *
     * @param str  value substituted for {@code typeName}
     * @param str2 value substituted for {@code qualifiedName}
     * @param str3 value substituted for {@code bmName}
     * @param map  request properties handed to {@code httpRequest}
     * @throws RestException if the leader call fails with a {@code RestClientException}
     *         (message, status and error code are carried over)
     * @throws SchemaRegistryRequestForwardingException if an {@code IOException} occurs while forwarding
     */
    private void forwardDeleteBusinessMetadataRequestToLeader(String str, String str2, String str3, Map<String, String> map) throws SchemaRegistryRequestForwardingException {
        RestService leaderRestService = this.schemaRegistry.leaderRestService();
        UrlList baseUrls = leaderRestService.getBaseUrls();
        String uri = UriBuilder.fromPath("/catalog/v1/entity/type/{typeName}/name/{qualifiedName}/businessmetadata/{bmName}").build(str, str2, str3).toString();
        // Parameterized logging: the message is only rendered when debug logging is enabled.
        log.debug("Forwarding delete bm request to {}", baseUrls);
        try {
            leaderRestService.httpRequest(uri, "DELETE", (byte[]) null, map, VOID_TYPE);
        } catch (RestClientException e) {
            throw new RestException(e.getMessage(), e.getStatus(), e.getErrorCode(), e);
        } catch (IOException e) {
            throw new SchemaRegistryRequestForwardingException(String.format("Unexpected error while forwarding the delete bm request to %s", baseUrls), e);
        }
    }

    /**
     * Deletes business metadata on this node.
     *
     * <p>A DELETE record holding the current value and a fresh timestamp is written to
     * {@code kafkaCache} and read back; if the value read back does not carry that timestamp, the
     * error stored in {@code errorCache} for it (or {@code UNKNOWN_ERROR}) is thrown. On success
     * the DELETE, UPDATE and CREATE keys are overwritten with {@code null}.
     *
     * @param str  tenant
     * @param str2 entity type
     * @param str3 entity name
     * @param str4 business metadata name
     * @throws AtlasBaseException if the lookup fails or the delete is rejected
     */
    private void deleteBusinessMetadata(String str, String str2, String str3, String str4) throws AtlasBaseException {
        this.kafkaCache.sync();
        try {
            // NOTE(review): unlike deleteTag and updateBusinessMetadata, the looked-up value is not
            // null-checked here before being written; confirm that downstream handling covers it.
            BusinessMetadata existing = getBusinessMetadata(str, str2, str3, str4);
            MetadataRegistryKey deleteKey = new MetadataRegistryKey(str, MetadataRegistryOp.DELETE, MetadataRegistryKind.BUSINESS_METADATA, str4, null, null, str2, str3);
            long timestamp = this.tsGenerator.next();
            this.kafkaCache.put(deleteKey, new BusinessMetadataValue(existing, null, false, false, timestamp));
            MetadataRegistryValue stored = (MetadataRegistryValue) this.kafkaCache.get(deleteKey);
            if (stored == null || stored.getTimestamp() != timestamp) {
                // The record read back is not ours: surface the error recorded for this timestamp, if any.
                ObjectWithException failure = (ObjectWithException) this.errorCache.get(Long.valueOf(timestamp));
                if (failure == null) {
                    throw UNKNOWN_ERROR;
                }
                throw failure.getException();
            }
            // Overwrite every record kept for this entry with null, in DELETE, UPDATE, CREATE order.
            this.kafkaCache.put(deleteKey, (Object) null);
            MetadataRegistryKey updateKey = new MetadataRegistryKey(str, MetadataRegistryOp.UPDATE, MetadataRegistryKind.BUSINESS_METADATA, str4, null, null, str2, str3);
            this.kafkaCache.put(updateKey, (Object) null);
            MetadataRegistryKey createKey = new MetadataRegistryKey(str, MetadataRegistryOp.CREATE, MetadataRegistryKind.BUSINESS_METADATA, str4, null, null, str2, str3);
            this.kafkaCache.put(createKey, (Object) null);
        } catch (AtlasBaseException e) {
            if (isJanusGraphNullPointerException(e)) {
                log.error("JanusGraph NullPointerException in MetadataRegistry.deleteBusinessMetadata(tenant={}, entityType={}, entityName={}, bmName={})", str, str2, str3, str4);
                this.metricsManager.recordJanusGraphNullPointerExceptionError(1);
            }
            throw e;
        }
    }

    /**
     * Creates tag definitions while holding the lock for {@code str}: locally when this node is
     * the leader, otherwise by forwarding the request to the leader.
     *
     * <p>The lock is released in a {@code finally} block so that {@code unlock} runs exactly once
     * on every exit path (normal return or exception).
     *
     * @param str  key passed to {@code lock}/{@code unlock} and to the local create
     *             (presumably the tenant; confirm against callers)
     * @param list tag definitions to create
     * @param map  passed to {@code lock}, {@code isLeader} and the forwarded request
     * @return the responses produced by the local create or returned by the leader
     * @throws UnknownLeaderException if this node is not the leader and {@code leaderIdentity()} is null
     * @throws SchemaRegistryException if the local create or the forwarding fails
     */
    public List<TagDefResponse> createTagDefsOrForward(String str, List<TagDef> list, Map<String, String> map) throws SchemaRegistryException {
        lock(str, map);
        try {
            if (isLeader(map)) {
                return createTagDefs(str, list);
            }
            if (this.schemaRegistry.leaderIdentity() == null) {
                throw new UnknownLeaderException("Request failed since leader is unknown");
            }
            return forwardCreateTagDefsRequestToLeader(list, map);
        } finally {
            unlock(str);
        }
    }

    /**
     * Forwards a tag definition create request to the leader as a {@code POST} on
     * {@code /catalog/v1/types/tagdefs}, with {@code list} serialized via {@code toJson}.
     *
     * @param list tag definitions to send as the request body
     * @param map  request properties handed to {@code httpRequest}
     * @return the response list decoded by the leader's REST service
     * @throws RestException if the leader call fails with a {@code RestClientException}
     *         (message, status and error code are carried over)
     * @throws SchemaRegistryRequestForwardingException if an {@code IOException} occurs while forwarding
     */
    private List<TagDefResponse> forwardCreateTagDefsRequestToLeader(List<TagDef> list, Map<String, String> map) throws SchemaRegistryRequestForwardingException {
        RestService leaderRestService = this.schemaRegistry.leaderRestService();
        UrlList baseUrls = leaderRestService.getBaseUrls();
        String uri = UriBuilder.fromPath("/catalog/v1/types/tagdefs").build().toString();
        // Parameterized logging: the message is only rendered when debug logging is enabled.
        log.debug("Forwarding create tagdefs request to {}", baseUrls);
        try {
            return (List) leaderRestService.httpRequest(uri, "POST", toJson(list), map, LIST_TAG_DEFS_TYPE);
        } catch (IOException e) {
            throw new SchemaRegistryRequestForwardingException(String.format("Unexpected error while forwarding the create tagdefs request to %s", baseUrls), e);
        } catch (RestClientException e) {
            throw new RestException(e.getMessage(), e.getStatus(), e.getErrorCode(), e);
        }
    }

    /**
     * Creates each requested tag definition on this node and collects one response per request.
     *
     * <p>An item is answered with an error response when its name is null or fails
     * {@code AtlasTypeUtil.isValidTypeName}, or when a definition with that name is already
     * found. Otherwise a CREATE record stamped with a fresh timestamp is written to
     * {@code kafkaCache} and read back; the create counts as successful only when the value read
     * back carries that timestamp.
     *
     * @param str  first component of the registry key, also used for the existence lookup
     *             (presumably the tenant; confirm)
     * @param list tag definitions to create
     * @return one response per input item, in input order
     * @throws SchemaRegistryException declared by the signature; no statement in this body raises it directly
     */
    private List<TagDefResponse> createTagDefs(String str, List<TagDef> list) throws SchemaRegistryException {
        this.kafkaCache.sync();
        List<TagDefResponse> responses = new ArrayList<>();
        for (TagDef requested : list) {
            String name = requested.getName();
            if (name == null || !AtlasTypeUtil.isValidTypeName(name)) {
                responses.add(new TagDefResponse(requested, ExceptionMapperUtil.toErrorMessage(new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID_FORMAT, new String[]{name, requested.getCategory().name()}))));
                continue;
            }
            TagDef existing = null;
            try {
                existing = getTagDef(str, name);
            } catch (AtlasBaseException ignored) {
                // A failed lookup leaves 'existing' null, so the name is treated as not yet defined.
            }
            if (existing != null) {
                responses.add(new TagDefResponse(requested, ExceptionMapperUtil.toErrorMessage(new AtlasBaseException(AtlasErrorCode.TYPE_ALREADY_EXISTS, new String[]{name}))));
                continue;
            }
            MetadataRegistryKey createKey = new MetadataRegistryKey(str, MetadataRegistryOp.CREATE, MetadataRegistryKind.TAG_DEF, name, DEFAULT_VERSION, null, null, null);
            long timestamp = this.tsGenerator.next();
            this.kafkaCache.put(createKey, new TagDefValue(null, requested, false, false, timestamp));
            MetadataRegistryValue stored = (MetadataRegistryValue) this.kafkaCache.get(createKey);
            boolean applied = stored != null && stored.getTimestamp() == timestamp;
            if (applied) {
                responses.add(new TagDefResponse(((TagDefValue) stored).getNewValue()));
            } else {
                // The record read back is not ours: report the error recorded for this timestamp, if any.
                ObjectWithException failure = (ObjectWithException) this.errorCache.get(Long.valueOf(timestamp));
                responses.add(new TagDefResponse(requested, ExceptionMapperUtil.toErrorMessage(failure != null ? failure.getException() : UNKNOWN_ERROR)));
            }
        }
        return responses;
    }

    /**
     * Creates business metadata definitions while holding the lock for {@code str}: locally when
     * this node is the leader, otherwise by forwarding the request to the leader.
     *
     * <p>The lock is released in a {@code finally} block so that {@code unlock} runs exactly once
     * on every exit path (normal return or exception).
     *
     * @param str  key passed to {@code lock}/{@code unlock} and to the local create
     *             (presumably the tenant; confirm against callers)
     * @param list business metadata definitions to create
     * @param map  passed to {@code lock}, {@code isLeader} and the forwarded request
     * @return the responses produced by the local create or returned by the leader
     * @throws UnknownLeaderException if this node is not the leader and {@code leaderIdentity()} is null
     * @throws SchemaRegistryException if the local create or the forwarding fails
     */
    public List<BusinessMetadataDefResponse> createBusinessMetadataDefsOrForward(String str, List<AtlasBusinessMetadataDef> list, Map<String, String> map) throws SchemaRegistryException {
        lock(str, map);
        try {
            if (isLeader(map)) {
                return createBusinessMetadataDefs(str, list);
            }
            if (this.schemaRegistry.leaderIdentity() == null) {
                throw new UnknownLeaderException("Request failed since leader is unknown");
            }
            return forwardCreateBusinessMetadataDefsRequestToLeader(list, map);
        } finally {
            unlock(str);
        }
    }

    /**
     * Forwards a business metadata definition create request to the leader as a {@code POST} on
     * {@code /catalog/v1/types/businessmetadatadefs}, with {@code list} serialized via {@code toJson}.
     *
     * @param list business metadata definitions to send as the request body
     * @param map  request properties handed to {@code httpRequest}
     * @return the response list decoded by the leader's REST service
     * @throws RestException if the leader call fails with a {@code RestClientException}
     *         (message, status and error code are carried over)
     * @throws SchemaRegistryRequestForwardingException if an {@code IOException} occurs while forwarding
     */
    private List<BusinessMetadataDefResponse> forwardCreateBusinessMetadataDefsRequestToLeader(List<AtlasBusinessMetadataDef> list, Map<String, String> map) throws SchemaRegistryRequestForwardingException {
        RestService leaderRestService = this.schemaRegistry.leaderRestService();
        UrlList baseUrls = leaderRestService.getBaseUrls();
        String uri = UriBuilder.fromPath("/catalog/v1/types/businessmetadatadefs").build().toString();
        // Parameterized logging: the message is only rendered when debug logging is enabled.
        log.debug("Forwarding create bmdefs request to {}", baseUrls);
        try {
            return (List) leaderRestService.httpRequest(uri, "POST", toJson(list), map, LIST_BM_DEFS_TYPE);
        } catch (IOException e) {
            throw new SchemaRegistryRequestForwardingException(String.format("Unexpected error while forwarding the create bmdefs request to %s", baseUrls), e);
        } catch (RestClientException e) {
            throw new RestException(e.getMessage(), e.getStatus(), e.getErrorCode(), e);
        }
    }

    /**
     * Creates each requested business metadata definition on this node and collects one response
     * per request.
     *
     * <p>An item is answered with an error response when its name is null or fails
     * {@code AtlasTypeUtil.isValidTypeName}, or when a definition with that name is already
     * found. Otherwise a CREATE record stamped with a fresh timestamp is written to
     * {@code kafkaCache} and read back; the create counts as successful only when the value read
     * back carries that timestamp.
     *
     * @param str  first component of the registry key, also used for the existence lookup
     *             (presumably the tenant; confirm)
     * @param list business metadata definitions to create
     * @return one response per input item, in input order
     * @throws SchemaRegistryException declared by the signature; no statement in this body raises it directly
     */
    private List<BusinessMetadataDefResponse> createBusinessMetadataDefs(String str, List<AtlasBusinessMetadataDef> list) throws SchemaRegistryException {
        this.kafkaCache.sync();
        List<BusinessMetadataDefResponse> responses = new ArrayList<>();
        for (AtlasBusinessMetadataDef requested : list) {
            String name = requested.getName();
            if (name == null || !AtlasTypeUtil.isValidTypeName(name)) {
                responses.add(new BusinessMetadataDefResponse(requested, ExceptionMapperUtil.toErrorMessage(new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID_FORMAT, new String[]{name, requested.getCategory().name()}))));
                continue;
            }
            AtlasBusinessMetadataDef existing = null;
            try {
                existing = getBusinessMetadataDef(str, name);
            } catch (AtlasBaseException ignored) {
                // A failed lookup leaves 'existing' null, so the name is treated as not yet defined.
            }
            if (existing != null) {
                responses.add(new BusinessMetadataDefResponse(requested, ExceptionMapperUtil.toErrorMessage(new AtlasBaseException(AtlasErrorCode.TYPE_ALREADY_EXISTS, new String[]{name}))));
                continue;
            }
            MetadataRegistryKey createKey = new MetadataRegistryKey(str, MetadataRegistryOp.CREATE, MetadataRegistryKind.BUSINESS_METADATA_DEF, name, DEFAULT_VERSION, null, null, null);
            long timestamp = this.tsGenerator.next();
            this.kafkaCache.put(createKey, new BusinessMetadataDefValue(null, requested, false, false, timestamp));
            MetadataRegistryValue stored = (MetadataRegistryValue) this.kafkaCache.get(createKey);
            boolean applied = stored != null && stored.getTimestamp() == timestamp;
            if (applied) {
                responses.add(new BusinessMetadataDefResponse(((BusinessMetadataDefValue) stored).getNewValue()));
            } else {
                // The record read back is not ours: report the error recorded for this timestamp, if any.
                ObjectWithException failure = (ObjectWithException) this.errorCache.get(Long.valueOf(timestamp));
                responses.add(new BusinessMetadataDefResponse(requested, ExceptionMapperUtil.toErrorMessage(failure != null ? failure.getException() : UNKNOWN_ERROR)));
            }
        }
        return responses;
    }

    /**
     * Looks up a classification definition by its tenant-prefixed name and wraps it in a
     * {@link TagDef}. The GUID of the looked-up definition is set to {@code null} before wrapping.
     *
     * @param str  first argument to {@code QualifiedNameGenerator.ensureTypeTenantPrefix} (the tenant)
     * @param str2 type name to look up
     * @return the definition wrapped as a {@code TagDef}
     * @throws AtlasBaseException if the type definition store lookup fails
     */
    public TagDef getTagDef(String str, String str2) throws AtlasBaseException {
        String qualifiedTypeName = QualifiedNameGenerator.ensureTypeTenantPrefix(str, str2);
        AtlasClassificationDef storedDef = this.typeDefStore.getClassificationDefByName(qualifiedTypeName);
        storedDef.setGuid((String) null);
        return new TagDef(storedDef);
    }

    /**
     * Updates tag definitions while holding the lock for {@code str}: locally when this node is
     * the leader, otherwise by forwarding the request to the leader.
     *
     * <p>The lock is released in a {@code finally} block so that {@code unlock} runs exactly once
     * on every exit path (normal return or exception).
     *
     * @param str  key passed to {@code lock}/{@code unlock} and to the local update
     *             (presumably the tenant; confirm against callers)
     * @param list tag definitions to update
     * @param map  passed to {@code lock}, {@code isLeader} and the forwarded request
     * @return the responses produced by the local update or returned by the leader
     * @throws UnknownLeaderException if this node is not the leader and {@code leaderIdentity()} is null
     * @throws SchemaRegistryException if forwarding the request to the leader fails
     */
    public List<TagDefResponse> updateTagDefsOrForward(String str, List<TagDef> list, Map<String, String> map) throws SchemaRegistryException {
        lock(str, map);
        try {
            if (isLeader(map)) {
                return updateTagDefs(str, list);
            }
            if (this.schemaRegistry.leaderIdentity() == null) {
                throw new UnknownLeaderException("Request failed since leader is unknown");
            }
            return forwardUpdateTagDefsRequestToLeader(list, map);
        } finally {
            unlock(str);
        }
    }

    /**
     * Looks up a business metadata definition by its tenant-prefixed name and returns it with
     * its GUID set to {@code null}.
     *
     * @param str  first argument to {@code QualifiedNameGenerator.ensureTypeTenantPrefix} (the tenant)
     * @param str2 type name to look up
     * @return the definition obtained from the type definition store
     * @throws AtlasBaseException if the type definition store lookup fails
     */
    public AtlasBusinessMetadataDef getBusinessMetadataDef(String str, String str2) throws AtlasBaseException {
        String qualifiedTypeName = QualifiedNameGenerator.ensureTypeTenantPrefix(str, str2);
        AtlasBusinessMetadataDef storedDef = this.typeDefStore.getBusinessMetadataDefByName(qualifiedTypeName);
        storedDef.setGuid((String) null);
        return storedDef;
    }

    /**
     * Forwards a tag definition update request to the leader as a {@code PUT} on
     * {@code /catalog/v1/types/tagdefs}, with {@code list} serialized via {@code toJson}.
     *
     * @param list tag definitions to send as the request body
     * @param map  request properties handed to {@code httpRequest}
     * @return the response list decoded by the leader's REST service
     * @throws RestException if the leader call fails with a {@code RestClientException}
     *         (message, status and error code are carried over)
     * @throws SchemaRegistryRequestForwardingException if an {@code IOException} occurs while forwarding
     */
    private List<TagDefResponse> forwardUpdateTagDefsRequestToLeader(List<TagDef> list, Map<String, String> map) throws SchemaRegistryRequestForwardingException {
        RestService leaderRestService = this.schemaRegistry.leaderRestService();
        UrlList baseUrls = leaderRestService.getBaseUrls();
        String uri = UriBuilder.fromPath("/catalog/v1/types/tagdefs").build().toString();
        // Parameterized logging: the message is only rendered when debug logging is enabled.
        log.debug("Forwarding update tagdefs request to {}", baseUrls);
        try {
            return (List) leaderRestService.httpRequest(uri, "PUT", toJson(list), map, LIST_TAG_DEFS_TYPE);
        } catch (IOException e) {
            throw new SchemaRegistryRequestForwardingException(String.format("Unexpected error while forwarding the update tagdefs request to %s", baseUrls), e);
        } catch (RestClientException e) {
            throw new RestException(e.getMessage(), e.getStatus(), e.getErrorCode(), e);
        }
    }

    /**
     * Updates each requested tag definition on this node and collects one response per request.
     *
     * <p>An item with a null or invalid name gets an error response. Otherwise the current
     * definition is looked up and an UPDATE record, keyed by the incremented type version and
     * stamped with a fresh timestamp, is written to {@code kafkaCache} and read back. The update
     * counts as successful only when the value read back carries that timestamp. An
     * {@code AtlasBaseException} raised while handling an item is turned into that item's
     * error response.
     *
     * @param str  first component of the registry key, also used for the lookup
     *             (presumably the tenant; confirm)
     * @param list tag definitions to update
     * @return one response per input item, in input order
     */
    private List<TagDefResponse> updateTagDefs(String str, List<TagDef> list) {
        this.kafkaCache.sync();
        List<TagDefResponse> responses = new ArrayList<>();
        for (TagDef requested : list) {
            String name = requested.getName();
            if (name == null || !AtlasTypeUtil.isValidTypeName(name)) {
                responses.add(new TagDefResponse(requested, ExceptionMapperUtil.toErrorMessage(new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID_FORMAT, new String[]{name, requested.getCategory().name()}))));
                continue;
            }
            try {
                TagDef current = getTagDef(str, name);
                String nextVersion = incrementVersion(current.getTypeVersion());
                MetadataRegistryKey updateKey = new MetadataRegistryKey(str, MetadataRegistryOp.UPDATE, MetadataRegistryKind.TAG_DEF, name, nextVersion, null, null, null);
                long timestamp = this.tsGenerator.next();
                this.kafkaCache.put(updateKey, new TagDefValue(current, requested, false, false, timestamp));
                MetadataRegistryValue stored = (MetadataRegistryValue) this.kafkaCache.get(updateKey);
                boolean applied = stored != null && stored.getTimestamp() == timestamp;
                if (applied) {
                    responses.add(new TagDefResponse(((TagDefValue) stored).getNewValue()));
                } else {
                    // The record read back is not ours: report the error recorded for this timestamp, if any.
                    ObjectWithException failure = (ObjectWithException) this.errorCache.get(Long.valueOf(timestamp));
                    responses.add(new TagDefResponse(requested, ExceptionMapperUtil.toErrorMessage(failure != null ? failure.getException() : UNKNOWN_ERROR)));
                }
            } catch (AtlasBaseException e) {
                responses.add(new TagDefResponse(requested, ExceptionMapperUtil.toErrorMessage(e)));
            }
        }
        return responses;
    }

    /**
     * Updates business metadata definitions while holding the lock for {@code str}: locally when
     * this node is the leader, otherwise by forwarding the request to the leader.
     *
     * <p>The lock is released in a {@code finally} block so that {@code unlock} runs exactly once
     * on every exit path (normal return or exception).
     *
     * @param str  key passed to {@code lock}/{@code unlock} and to the local update
     *             (presumably the tenant; confirm against callers)
     * @param list business metadata definitions to update
     * @param map  passed to {@code lock}, {@code isLeader} and the forwarded request
     * @return the responses produced by the local update or returned by the leader
     * @throws UnknownLeaderException if this node is not the leader and {@code leaderIdentity()} is null
     * @throws SchemaRegistryException if forwarding the request to the leader fails
     */
    public List<BusinessMetadataDefResponse> updateBusinessMetadataDefsOrForward(String str, List<AtlasBusinessMetadataDef> list, Map<String, String> map) throws SchemaRegistryException {
        lock(str, map);
        try {
            if (isLeader(map)) {
                return updateBusinessMetadataDefs(str, list);
            }
            if (this.schemaRegistry.leaderIdentity() == null) {
                throw new UnknownLeaderException("Request failed since leader is unknown");
            }
            return forwardUpdateBusinessMetadataDefsRequestToLeader(list, map);
        } finally {
            unlock(str);
        }
    }

    /**
     * Forwards a business metadata definition update request to the leader as a {@code PUT} on
     * {@code /catalog/v1/types/businessmetadatadefs}, with {@code list} serialized via {@code toJson}.
     *
     * @param list business metadata definitions to send as the request body
     * @param map  request properties handed to {@code httpRequest}
     * @return the response list decoded by the leader's REST service
     * @throws RestException if the leader call fails with a {@code RestClientException}
     *         (message, status and error code are carried over)
     * @throws SchemaRegistryRequestForwardingException if an {@code IOException} occurs while forwarding
     */
    private List<BusinessMetadataDefResponse> forwardUpdateBusinessMetadataDefsRequestToLeader(List<AtlasBusinessMetadataDef> list, Map<String, String> map) throws SchemaRegistryRequestForwardingException {
        RestService leaderRestService = this.schemaRegistry.leaderRestService();
        UrlList baseUrls = leaderRestService.getBaseUrls();
        String uri = UriBuilder.fromPath("/catalog/v1/types/businessmetadatadefs").build().toString();
        // Parameterized logging: the message is only rendered when debug logging is enabled.
        log.debug("Forwarding update businessmetadatadefs request to {}", baseUrls);
        try {
            return (List) leaderRestService.httpRequest(uri, "PUT", toJson(list), map, LIST_BM_DEFS_TYPE);
        } catch (IOException e) {
            throw new SchemaRegistryRequestForwardingException(String.format("Unexpected error while forwarding the update bmdefs request to %s", baseUrls), e);
        } catch (RestClientException e) {
            throw new RestException(e.getMessage(), e.getStatus(), e.getErrorCode(), e);
        }
    }

    /**
     * Updates each requested business metadata definition on this node and collects one response
     * per request.
     *
     * <p>An item with a null or invalid name gets an error response. Otherwise the current
     * definition is looked up and an UPDATE record, keyed by the incremented type version and
     * stamped with a fresh timestamp, is written to {@code kafkaCache} and read back. The update
     * counts as successful only when the value read back carries that timestamp. An
     * {@code AtlasBaseException} raised while handling an item is turned into that item's
     * error response.
     *
     * @param str  first component of the registry key, also used for the lookup
     *             (presumably the tenant; confirm)
     * @param list business metadata definitions to update
     * @return one response per input item, in input order
     */
    private List<BusinessMetadataDefResponse> updateBusinessMetadataDefs(String str, List<AtlasBusinessMetadataDef> list) {
        this.kafkaCache.sync();
        List<BusinessMetadataDefResponse> responses = new ArrayList<>();
        for (AtlasBusinessMetadataDef requested : list) {
            String name = requested.getName();
            if (name == null || !AtlasTypeUtil.isValidTypeName(name)) {
                responses.add(new BusinessMetadataDefResponse(requested, ExceptionMapperUtil.toErrorMessage(new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID_FORMAT, new String[]{name, requested.getCategory().name()}))));
                continue;
            }
            try {
                AtlasBusinessMetadataDef current = getBusinessMetadataDef(str, name);
                String nextVersion = incrementVersion(current.getTypeVersion());
                MetadataRegistryKey updateKey = new MetadataRegistryKey(str, MetadataRegistryOp.UPDATE, MetadataRegistryKind.BUSINESS_METADATA_DEF, name, nextVersion, null, null, null);
                long timestamp = this.tsGenerator.next();
                this.kafkaCache.put(updateKey, new BusinessMetadataDefValue(current, requested, false, false, timestamp));
                MetadataRegistryValue stored = (MetadataRegistryValue) this.kafkaCache.get(updateKey);
                boolean applied = stored != null && stored.getTimestamp() == timestamp;
                if (applied) {
                    responses.add(new BusinessMetadataDefResponse(((BusinessMetadataDefValue) stored).getNewValue()));
                } else {
                    // The record read back is not ours: report the error recorded for this timestamp, if any.
                    ObjectWithException failure = (ObjectWithException) this.errorCache.get(Long.valueOf(timestamp));
                    responses.add(new BusinessMetadataDefResponse(requested, ExceptionMapperUtil.toErrorMessage(failure != null ? failure.getException() : UNKNOWN_ERROR)));
                }
            } catch (AtlasBaseException e) {
                responses.add(new BusinessMetadataDefResponse(requested, ExceptionMapperUtil.toErrorMessage(e)));
            }
        }
        return responses;
    }

    /**
     * Deletes a tag definition while holding the lock for {@code str}: locally when this node is
     * the leader, otherwise by forwarding the request to the leader. The lock is always released
     * on exit.
     *
     * @param str  key passed to {@code lock}/{@code unlock} and to the local delete
     *             (presumably the tenant; confirm against callers)
     * @param str2 tag definition name
     * @param map  passed to {@code lock}, {@code isLeader} and the forwarded request
     * @throws UnknownLeaderException if this node is not the leader and {@code leaderIdentity()} is null
     * @throws SchemaRegistryException if forwarding the request to the leader fails
     * @throws AtlasBaseException if the local delete fails
     */
    public void deleteTagDefOrForward(String str, String str2, Map<String, String> map) throws SchemaRegistryException, AtlasBaseException {
        lock(str, map);
        try {
            if (isLeader(map)) {
                deleteTagDef(str, str2);
                return;
            }
            if (this.schemaRegistry.leaderIdentity() == null) {
                throw new UnknownLeaderException("Request failed since leader is unknown");
            }
            forwardDeleteTagDefRequestToLeader(str2, map);
        } finally {
            unlock(str);
        }
    }

    /**
     * Forwards a tag definition deletion to the leader as a {@code DELETE} on
     * {@code /catalog/v1/types/tagdefs/{tagName}}.
     *
     * @param str value substituted for {@code tagName}
     * @param map request properties handed to {@code httpRequest}
     * @throws RestException if the leader call fails with a {@code RestClientException}
     *         (message, status and error code are carried over)
     * @throws SchemaRegistryRequestForwardingException if an {@code IOException} occurs while forwarding
     */
    private void forwardDeleteTagDefRequestToLeader(String str, Map<String, String> map) throws SchemaRegistryRequestForwardingException {
        RestService leaderRestService = this.schemaRegistry.leaderRestService();
        UrlList baseUrls = leaderRestService.getBaseUrls();
        String uri = UriBuilder.fromPath("/catalog/v1/types/tagdefs/{tagName}").build(str).toString();
        // Parameterized logging: the message is only rendered when debug logging is enabled.
        log.debug("Forwarding delete tagdef request to {}", baseUrls);
        try {
            leaderRestService.httpRequest(uri, "DELETE", (byte[]) null, map, VOID_TYPE);
        } catch (RestClientException e) {
            throw new RestException(e.getMessage(), e.getStatus(), e.getErrorCode(), e);
        } catch (IOException e) {
            throw new SchemaRegistryRequestForwardingException(String.format("Unexpected error while forwarding the delete tagdef request to %s", baseUrls), e);
        }
    }

    /**
     * Deletes a tag definition on this node.
     *
     * <p>The definition must be found; a failed lookup is treated the same as a missing
     * definition. A DELETE record stamped with a fresh timestamp is written to {@code kafkaCache}
     * and read back; if the value read back does not carry that timestamp, the error stored in
     * {@code errorCache} for it (or {@code UNKNOWN_ERROR}) is thrown. On success the DELETE key,
     * the UPDATE key of every version from the current one down to (but excluding)
     * {@code DEFAULT_VERSION}, and the CREATE key are overwritten with {@code null}.
     *
     * @param str  first component of the registry key, also used for the lookup
     *             (presumably the tenant; confirm)
     * @param str2 tag definition name
     * @throws AtlasBaseException if the definition is not found or the delete is rejected
     */
    private void deleteTagDef(String str, String str2) throws AtlasBaseException {
        this.kafkaCache.sync();
        TagDef existing = null;
        try {
            existing = getTagDef(str, str2);
        } catch (AtlasBaseException ignored) {
            // A failed lookup leaves 'existing' null and is reported as TYPE_NAME_NOT_FOUND below.
        }
        if (existing == null) {
            throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, new String[]{str2});
        }
        MetadataRegistryKey deleteKey = new MetadataRegistryKey(str, MetadataRegistryOp.DELETE, MetadataRegistryKind.TAG_DEF, str2, null, null, null, null);
        long timestamp = this.tsGenerator.next();
        this.kafkaCache.put(deleteKey, new TagDefValue(existing, null, false, false, timestamp));
        MetadataRegistryValue stored = (MetadataRegistryValue) this.kafkaCache.get(deleteKey);
        if (stored == null || stored.getTimestamp() != timestamp) {
            // The record read back is not ours: surface the error recorded for this timestamp, if any.
            ObjectWithException failure = (ObjectWithException) this.errorCache.get(Long.valueOf(timestamp));
            throw (failure != null ? failure.getException() : UNKNOWN_ERROR);
        }
        this.kafkaCache.put(deleteKey, (Object) null);
        // Walk the versions downwards, overwriting each UPDATE record until DEFAULT_VERSION is reached.
        for (String version = existing.getTypeVersion(); version != null && !version.equals(DEFAULT_VERSION); version = decrementVersion(version)) {
            this.kafkaCache.put(new MetadataRegistryKey(str, MetadataRegistryOp.UPDATE, MetadataRegistryKind.TAG_DEF, str2, version, null, null, null), (Object) null);
        }
        MetadataRegistryKey createKey = new MetadataRegistryKey(str, MetadataRegistryOp.CREATE, MetadataRegistryKind.TAG_DEF, str2, DEFAULT_VERSION, null, null, null);
        this.kafkaCache.put(createKey, (Object) null);
    }

    /**
     * Deletes a business metadata definition while holding the lock for {@code str}: locally when
     * this node is the leader, otherwise by forwarding the request to the leader. The lock is
     * always released on exit.
     *
     * @param str  key passed to {@code lock}/{@code unlock} and to the local delete
     *             (presumably the tenant; confirm against callers)
     * @param str2 business metadata definition name
     * @param map  passed to {@code lock}, {@code isLeader} and the forwarded request
     * @throws UnknownLeaderException if this node is not the leader and {@code leaderIdentity()} is null
     * @throws SchemaRegistryException if forwarding the request to the leader fails
     * @throws AtlasBaseException if the local delete fails
     */
    public void deleteBusinessMetadataDefOrForward(String str, String str2, Map<String, String> map) throws SchemaRegistryException, AtlasBaseException {
        lock(str, map);
        try {
            if (isLeader(map)) {
                deleteBusinessMetadataDef(str, str2);
                return;
            }
            if (this.schemaRegistry.leaderIdentity() == null) {
                throw new UnknownLeaderException("Request failed since leader is unknown");
            }
            forwardDeleteBusinessMetadataDefRequestToLeader(str2, map);
        } finally {
            unlock(str);
        }
    }

    /**
     * Forwards a business metadata definition deletion to the leader as a {@code DELETE} on
     * {@code /catalog/v1/types/businessmetadatadefs/{bmName}}.
     *
     * @param str value substituted for {@code bmName}
     * @param map request properties handed to {@code httpRequest}
     * @throws RestException if the leader call fails with a {@code RestClientException}
     *         (message, status and error code are carried over)
     * @throws SchemaRegistryRequestForwardingException if an {@code IOException} occurs while forwarding
     */
    private void forwardDeleteBusinessMetadataDefRequestToLeader(String str, Map<String, String> map) throws SchemaRegistryRequestForwardingException {
        RestService leaderRestService = this.schemaRegistry.leaderRestService();
        UrlList baseUrls = leaderRestService.getBaseUrls();
        String uri = UriBuilder.fromPath("/catalog/v1/types/businessmetadatadefs/{bmName}").build(str).toString();
        // Parameterized logging: the message is only rendered when debug logging is enabled.
        log.debug("Forwarding delete bmdef request to {}", baseUrls);
        try {
            leaderRestService.httpRequest(uri, "DELETE", (byte[]) null, map, VOID_TYPE);
        } catch (RestClientException e) {
            throw new RestException(e.getMessage(), e.getStatus(), e.getErrorCode(), e);
        } catch (IOException e) {
            throw new SchemaRegistryRequestForwardingException(String.format("Unexpected error while forwarding the delete bmdef request to %s", baseUrls), e);
        }
    }

    /**
     * Deletes a business metadata definition on this node.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>sync the Kafka-backed cache and look up the existing definition;</li>
     *   <li>write a {@code DELETE} record stamped with a fresh timestamp and read it back; if the
     *       stored record does not carry that timestamp, the write is treated as failed and the
     *       exception recorded in {@code errorCache} for that timestamp (or {@code UNKNOWN_ERROR})
     *       is thrown;</li>
     *   <li>null out the {@code DELETE} key, every {@code UPDATE} key from the definition's current
     *       type version down to (but excluding) {@code DEFAULT_VERSION}, and finally the
     *       {@code CREATE} key at {@code DEFAULT_VERSION}.</li>
     * </ol>
     *
     * @param str  tenant identifier used as the first component of every registry key
     * @param str2 name of the business metadata definition
     * @throws AtlasBaseException with {@code TYPE_NAME_NOT_FOUND} if the definition cannot be
     *                            found, or the recorded/unknown error if the delete was not applied
     */
    private void deleteBusinessMetadataDef(String str, String str2) throws AtlasBaseException {
        this.kafkaCache.sync();

        AtlasBusinessMetadataDef existingDef;
        try {
            existingDef = getBusinessMetadataDef(str, str2);
        } catch (AtlasBaseException ignored) {
            // A failed lookup is reported below as "type name not found".
            existingDef = null;
        }
        if (existingDef == null) {
            throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, new String[]{str2});
        }

        MetadataRegistryKey deleteKey = new MetadataRegistryKey(str, MetadataRegistryOp.DELETE, MetadataRegistryKind.BUSINESS_METADATA_DEF, str2, null, null, null, null);
        long expectedTimestamp = this.tsGenerator.next();
        this.kafkaCache.put(deleteKey, new BusinessMetadataDefValue(existingDef, null, false, false, expectedTimestamp));

        MetadataRegistryValue stored = (MetadataRegistryValue) this.kafkaCache.get(deleteKey);
        boolean applied = stored != null && stored.getTimestamp() == expectedTimestamp;
        if (!applied) {
            ObjectWithException failure = (ObjectWithException) this.errorCache.get(Long.valueOf(expectedTimestamp));
            if (failure == null) {
                throw UNKNOWN_ERROR;
            }
            throw failure.getException();
        }

        // Clear the delete marker itself, then every versioned record of the definition.
        this.kafkaCache.put(deleteKey, (Object) null);
        for (String version = existingDef.getTypeVersion();
                version != null && !version.equals(DEFAULT_VERSION);
                version = decrementVersion(version)) {
            this.kafkaCache.put(new MetadataRegistryKey(str, MetadataRegistryOp.UPDATE, MetadataRegistryKind.BUSINESS_METADATA_DEF, str2, version, null, null, null), (Object) null);
        }
        this.kafkaCache.put(new MetadataRegistryKey(str, MetadataRegistryOp.CREATE, MetadataRegistryKind.BUSINESS_METADATA_DEF, str2, DEFAULT_VERSION, null, null, null), (Object) null);
    }

    /**
     * Creates or updates an entity, applying the change locally when this node is the leader and
     * forwarding the request to the leader otherwise.
     *
     * <p>The lock is released in a {@code finally} block so it is released exactly once on every
     * exit path (normal return or exception).
     *
     * @param str                    tenant identifier; used for locking and for the local write
     * @param atlasEntityWithExtInfo entity to create or update
     * @param map                    passed to {@code lock}, {@code isLeader} and the forwarded request
     *                               (presumably the incoming request headers - confirm against callers)
     * @return the mutation response produced locally or returned by the leader
     * @throws UnknownLeaderException if this node is not the leader and no leader is known
     */
    public EntityMutationResponse createOrUpdateEntityOrForward(String str, AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo, Map<String, String> map) throws SchemaRegistryException, AtlasBaseException {
        lock(str, map);
        try {
            if (isLeader(map)) {
                return createOrUpdateEntityWithResponse(str, atlasEntityWithExtInfo);
            }
            if (this.schemaRegistry.leaderIdentity() == null) {
                throw new UnknownLeaderException("Request failed since leader is unknown");
            }
            return forwardCreateOrUpdateEntityRequestToLeader(atlasEntityWithExtInfo, map);
        } finally {
            unlock(str);
        }
    }

    /**
     * Sends the entity as a JSON {@code POST} to {@code /catalog/v1/entity} on the leader and
     * returns the leader's response.
     *
     * @param atlasEntityWithExtInfo entity to serialize into the request body
     * @param map                    request properties handed to the leader's {@code httpRequest}
     * @return the mutation response decoded from the leader's reply
     * @throws SchemaRegistryRequestForwardingException if serialization or the request fails with an {@link IOException}
     * @throws RestException if the leader answers with a REST client error (status and error code are preserved)
     */
    private EntityMutationResponse forwardCreateOrUpdateEntityRequestToLeader(AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo, Map<String, String> map) throws SchemaRegistryRequestForwardingException {
        RestService leader = this.schemaRegistry.leaderRestService();
        UrlList leaderUrls = leader.getBaseUrls();
        String path = UriBuilder.fromPath("/catalog/v1/entity").build(new Object[0]).toString();
        log.debug(String.format("Forwarding create/update entity request to %s", leaderUrls));
        try {
            Object response = leader.httpRequest(path, "POST", toJson(atlasEntityWithExtInfo), map, ENTITY_MUTATION_RESPONSE_TYPE);
            return (EntityMutationResponse) response;
        } catch (RestClientException restFailure) {
            throw new RestException(restFailure.getMessage(), restFailure.getStatus(), restFailure.getErrorCode(), restFailure);
        } catch (IOException ioFailure) {
            String message = String.format("Unexpected error while forwarding the create/update entity request to %s", leaderUrls);
            throw new SchemaRegistryRequestForwardingException(message, ioFailure);
        }
    }

    /**
     * Writes the entity via {@link #createOrUpdateEntity} and wraps the outcome in an
     * {@link EntityMutationResponse}.
     *
     * <p>The operation is reported as {@code UPDATE} when the written registry value carries a
     * non-null old value, and as {@code CREATE} otherwise. The header in the response is built
     * from the entity as re-read through {@code getEntity} after the write.
     *
     * @param str                    tenant identifier
     * @param atlasEntityWithExtInfo entity to create or update
     * @return a response with a single operation mapped to a single entity header
     * @throws AtlasBaseException if the write or the subsequent read fails
     */
    private EntityMutationResponse createOrUpdateEntityWithResponse(String str, AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo) throws AtlasBaseException {
        // The write must happen first; the read below is expected to observe it.
        MetadataRegistryValue written = (MetadataRegistryValue) createOrUpdateEntity(str, atlasEntityWithExtInfo).value;
        AtlasEntity.AtlasEntityWithExtInfo previous = (AtlasEntity.AtlasEntityWithExtInfo) written.getOldValue();

        EntityMutations.EntityOperation operation;
        if (previous != null) {
            operation = EntityMutations.EntityOperation.UPDATE;
        } else {
            operation = EntityMutations.EntityOperation.CREATE;
        }

        String typeName = atlasEntityWithExtInfo.getEntity().getTypeName();
        String qualifiedName = (String) atlasEntityWithExtInfo.getEntity().getAttribute(ModelConstants.ATTR_QUALIFIED_NAME);
        AtlasEntityHeader header = new AtlasEntityHeader(getEntity(str, typeName, qualifiedName, false, false).getEntity());

        return new EntityMutationResponse(Collections.singletonMap(operation, Collections.singletonList(header)));
    }

    /**
     * Writes a single entity to the Kafka-backed cache and verifies that the write was applied.
     *
     * <p>The cache is synced, the entity is converted to a key/value pair with
     * {@code toKeyValue(str, entity, null, false)}, written, and read back. The write counts as
     * applied only if the stored value carries the same timestamp as the value just written.
     *
     * @param str                    tenant identifier
     * @param atlasEntityWithExtInfo entity to create or update
     * @return the key/value pair that was written
     * @throws AtlasBaseException the exception recorded in {@code errorCache} for the write's
     *                            timestamp, or {@code UNKNOWN_ERROR} if none was recorded
     */
    public KeyValue<MetadataRegistryKey, MetadataRegistryValue<AtlasEntity.AtlasEntityWithExtInfo>> createOrUpdateEntity(String str, AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo) throws AtlasBaseException {
        this.kafkaCache.sync();
        KeyValue<MetadataRegistryKey, MetadataRegistryValue<AtlasEntity.AtlasEntityWithExtInfo>> pending = toKeyValue(str, atlasEntityWithExtInfo, null, false);
        MetadataRegistryKey key = (MetadataRegistryKey) pending.key;
        MetadataRegistryValue value = (MetadataRegistryValue) pending.value;
        long expectedTimestamp = value.getTimestamp();

        this.kafkaCache.put(key, value);
        MetadataRegistryValue stored = (MetadataRegistryValue) this.kafkaCache.get(key);
        if (stored == null || stored.getTimestamp() != expectedTimestamp) {
            ObjectWithException failure = (ObjectWithException) this.errorCache.get(Long.valueOf(expectedTimestamp));
            if (failure == null) {
                throw UNKNOWN_ERROR;
            }
            throw failure.getException();
        }
        return pending;
    }

    /**
     * Writes a batch of entities to the Kafka-backed cache without verifying the result.
     *
     * <p>Each entity is converted with {@code toKeyValue(str, entity, biPredicate, true)};
     * conversions that yield a {@code null} key (i.e. rejected by {@code biPredicate}) are
     * dropped. When several entities map to the same key, the last value wins while the key keeps
     * its first insertion position. A batch of exactly one entry is written with {@code put};
     * any other size (including empty) is written with {@code putAll}.
     *
     * @param str         tenant identifier
     * @param list        entities to write, in order
     * @param biPredicate filter receiving (old entity, new entity); may be {@code null} to accept all
     * @throws AtlasBaseException declared by the signature
     */
    public void createOrUpdateEntities(String str, List<AtlasEntity.AtlasEntityWithExtInfo> list, BiPredicate<AtlasEntity.AtlasEntityWithExtInfo, AtlasEntity.AtlasEntityWithExtInfo> biPredicate) throws AtlasBaseException {
        this.kafkaCache.sync();

        Map<MetadataRegistryKey, MetadataRegistryValue> batch = new LinkedHashMap<>();
        for (AtlasEntity.AtlasEntityWithExtInfo candidate : list) {
            KeyValue<MetadataRegistryKey, MetadataRegistryValue<AtlasEntity.AtlasEntityWithExtInfo>> converted = toKeyValue(str, candidate, biPredicate, true);
            if (converted.key == null) {
                continue;
            }
            batch.put((MetadataRegistryKey) converted.key, (MetadataRegistryValue) converted.value);
        }

        if (batch.size() == 1) {
            Map.Entry<MetadataRegistryKey, MetadataRegistryValue> only = batch.entrySet().iterator().next();
            this.kafkaCache.put(only.getKey(), only.getValue());
        } else {
            this.kafkaCache.putAll(batch);
        }
    }

    /**
     * Updates schema metadata (tags, metadata, rule sets), applying the change locally when this
     * node is the leader and forwarding the request to the leader otherwise.
     *
     * <p>The lock is released in a {@code finally} block so it is released exactly once on every
     * exit path (normal return or exception).
     *
     * @param str  tenant identifier; used for locking and for the local update
     * @param list schema metadata updates to apply
     * @param map  passed to {@code lock}, {@code isLeader} and the forwarded request
     *             (presumably the incoming request headers - confirm against callers)
     * @return one response per requested update
     * @throws UnknownLeaderException if this node is not the leader and no leader is known
     */
    public List<SchemaTagsResponse> updateSchemaMetadataOrForward(String str, List<SchemaMetadata> list, Map<String, String> map) throws SchemaRegistryException, AtlasBaseException {
        lock(str, map);
        try {
            if (isLeader(map)) {
                return updateSchemaMetadata(str, list);
            }
            if (this.schemaRegistry.leaderIdentity() == null) {
                throw new UnknownLeaderException("Request failed since leader is unknown");
            }
            return forwardUpdateSchemaTagsRequestToLeader(list, map);
        } finally {
            unlock(str);
        }
    }

    /**
     * Applies each requested schema metadata update by registering a modified copy of the schema.
     *
     * <p>For every request, in order:
     * <ol>
     *   <li>every tag in the "tags to add" list is validated with {@link #checkTags}; any unknown
     *       tag aborts that request with a {@link RestInvalidTagException};</li>
     *   <li>the schema is fetched by subject and id; if its catalog entity has no embedded tags,
     *       the tags found by {@link #getOldCatalogTags} are added to the "tags to add" list;</li>
     *   <li>a copy of the schema with the tag changes, merged metadata (including a
     *       {@code CONFLUENT_VERSION} property set to the requested version) and the requested
     *       rule set is registered through the retry executor.</li>
     * </ol>
     *
     * <p>Failures of an individual request are converted into an {@link ErrorMessage} on that
     * request's response and do not stop processing of the remaining requests. The one exception
     * is an {@link IOException}, which is rethrown wrapped in a {@link RuntimeException}.
     *
     * @param str  tenant identifier
     * @param list schema metadata updates to apply
     * @return one {@link SchemaTagsResponse} per request, in request order, carrying either the
     *         id returned by registration or an error message
     */
    public List<SchemaTagsResponse> updateSchemaMetadata(String str, List<SchemaMetadata> list) {
        this.kafkaCache.sync();
        List<SchemaTagsResponse> responses = new ArrayList<>(list.size());
        AtlasEntityType schemaEntityType = this.typeRegistry.getEntityTypeByName(SchemaAtlasTypes.SR_SCHEMA.getName());
        for (SchemaMetadata schemaMetadata : list) {
            String subject = schemaMetadata.getSubject();
            // A missing version is sent as 0.
            int version = schemaMetadata.getVersion() != null ? schemaMetadata.getVersion().intValue() : 0;
            ErrorMessage errorMessage = null;
            Integer registeredId = null;
            try {
                List<SchemaTags> tagsToAdd = new ArrayList<>(schemaMetadata.getTagsToAdd());
                for (SchemaTags requested : tagsToAdd) {
                    Set<String> unknownTags = checkTags(str, new HashSet<>(requested.getTags()));
                    if (!unknownTags.isEmpty()) {
                        throw new RestInvalidTagException(unknownTags);
                    }
                }

                ParsedSchema existingSchema = this.schemaRegistryClient.getSchemaBySubjectAndId(subject, schemaMetadata.getId().intValue());
                String qualifiedName = QualifiedNameGenerator.getQualifiedName(str, QualifiedSubject.contextFor(str, subject), schemaMetadata.getId());
                Object embeddedTags = getByUniqueAttributes(schemaEntityType, Collections.singletonMap(ModelConstants.ATTR_QUALIFIED_NAME, qualifiedName), false, false).getEntity().getAttribute(ModelConstants.ATTR_EMBEDDED_TAGS);
                if (embeddedTags == null || ((List) embeddedTags).isEmpty()) {
                    // No embedded tags on the schema entity yet: carry over the tags found in the catalog.
                    tagsToAdd.addAll(getOldCatalogTags(str, qualifiedName));
                }

                ParsedSchema taggedSchema = existingSchema.copy(schemaTagsListToMap(tagsToAdd), schemaTagsListToMap(schemaMetadata.getTagsToRemove()));
                Metadata versionMetadata = new Metadata(Collections.emptyMap(), Collections.singletonMap(CONFLUENT_VERSION, String.valueOf(version)), Collections.emptySet());
                // Request metadata, when present, replaces the schema's own metadata as the merge base.
                Metadata mergedMetadata = schemaMetadata.getMetadata() == null
                        ? Metadata.mergeMetadata(existingSchema.metadata(), versionMetadata)
                        : Metadata.mergeMetadata(schemaMetadata.getMetadata(), versionMetadata);
                ParsedSchema copy = taggedSchema.copy(mergedMetadata, schemaMetadata.getRuleSet()).copy(Integer.valueOf(version));

                registeredId = (Integer) this.retryExecutor.retry(() -> {
                    return Integer.valueOf(this.schemaRegistryClient.register(subject, copy, version, -1));
                });
            } catch (RestInvalidTagException e) {
                errorMessage = new ErrorMessage(e.getErrorCode(), e.getMessage());
            } catch (RestNotFoundException e2) {
                errorMessage = new ErrorMessage(e2.getErrorCode(), e2.getMessage());
            } catch (AtlasBaseException e3) {
                errorMessage = ExceptionMapperUtil.toErrorMessage(e3);
            } catch (RestClientException e4) {
                errorMessage = new ErrorMessage(e4.getErrorCode(), e4.getMessage());
            } catch (IOException e5) {
                throw new RuntimeException(e5);
            } catch (IllegalArgumentException e6) {
                errorMessage = new ErrorMessage(404, e6.getMessage());
            } catch (RestInvalidSchemaException e7) {
                errorMessage = new ErrorMessage(e7.getErrorCode(), "Cannot create schema tags on non-latest subject version.");
            }
            responses.add(new SchemaTagsResponse(subject, registeredId, errorMessage));
        }
        return responses;
    }

    /**
     * Collects the classification names attached to catalog entities whose qualified name starts
     * with the given prefix, as catalog-model {@link SchemaTags}.
     *
     * <p>Entities without classifications are skipped. For each remaining entity, the schema
     * entity path is the part of its qualified name after the last
     * {@code QualifiedNameGenerator.NAME_DELIMITER}, and each classification name is passed
     * through {@code QualifiedNameGenerator.stripTypeTenantPrefix}.
     *
     * @param str  tenant identifier used to strip the tenant prefix from classification names
     * @param str2 qualified-name prefix to search for
     * @return the tags found; empty if the search returns {@code null} or no entity has classifications
     * @throws AtlasBaseException if the search fails
     * @deprecated presumably superseded by {@code getCatalogTags}, which returns the schema
     *             registry client's tag type - confirm before removing callers
     */
    @Deprecated
    public List<SchemaTags> getOldCatalogTags(String str, String str2) throws AtlasBaseException {
        List<SchemaTags> result = new ArrayList<>();
        List<AtlasEntityHeader> matches = this.searchService.searchEntityByQualifiedNamePrefix(searchTypesForTags, str2, Collections.singleton(ModelConstants.ATTR_QUALIFIED_NAME));
        if (matches == null) {
            return result;
        }
        for (AtlasEntityHeader header : matches) {
            List<String> classificationNames = header.getClassificationNames();
            if (classificationNames == null || classificationNames.isEmpty()) {
                continue;
            }
            String qualifiedName = (String) header.getAttribute(ModelConstants.ATTR_QUALIFIED_NAME);
            String entityPath = qualifiedName.substring(qualifiedName.lastIndexOf(QualifiedNameGenerator.NAME_DELIMITER) + 1);
            SchemaEntity schemaEntity = new SchemaEntity(entityPath, SchemaEntity.EntityType.get(header.getTypeName()));
            List<String> tagNames = classificationNames.stream()
                    .map(name -> QualifiedNameGenerator.stripTypeTenantPrefix(str, name))
                    .collect(Collectors.toList());
            result.add(new SchemaTags(schemaEntity, tagNames));
        }
        return result;
    }

    /**
     * Collects the classification names attached to catalog entities whose qualified name starts
     * with the given prefix, as schema-registry-client {@code SchemaTags}.
     *
     * <p>Entities without classifications are skipped. For each remaining entity, the schema
     * entity path is the part of its qualified name after the last
     * {@code QualifiedNameGenerator.NAME_DELIMITER}, and each classification name is passed
     * through {@code QualifiedNameGenerator.stripTypeTenantPrefix}.
     *
     * @param str  tenant identifier used to strip the tenant prefix from classification names
     * @param str2 qualified-name prefix to search for
     * @return the tags found; empty if the search returns {@code null} or no entity has classifications
     * @throws AtlasBaseException if the search fails
     */
    public List<io.confluent.kafka.schemaregistry.client.rest.entities.SchemaTags> getCatalogTags(String str, String str2) throws AtlasBaseException {
        List<io.confluent.kafka.schemaregistry.client.rest.entities.SchemaTags> result = new ArrayList<>();
        List<AtlasEntityHeader> matches = this.searchService.searchEntityByQualifiedNamePrefix(searchTypesForTags, str2, Collections.singleton(ModelConstants.ATTR_QUALIFIED_NAME));
        if (matches == null) {
            return result;
        }
        for (AtlasEntityHeader header : matches) {
            List<String> classificationNames = header.getClassificationNames();
            if (classificationNames == null || classificationNames.isEmpty()) {
                continue;
            }
            String qualifiedName = (String) header.getAttribute(ModelConstants.ATTR_QUALIFIED_NAME);
            String entityPath = qualifiedName.substring(qualifiedName.lastIndexOf(QualifiedNameGenerator.NAME_DELIMITER) + 1);
            SchemaEntity schemaEntity = new SchemaEntity(entityPath, SchemaEntity.EntityType.get(header.getTypeName()));
            List<String> tagNames = classificationNames.stream()
                    .map(name -> QualifiedNameGenerator.stripTypeTenantPrefix(str, name))
                    .collect(Collectors.toList());
            result.add(new io.confluent.kafka.schemaregistry.client.rest.entities.SchemaTags(schemaEntity, tagNames));
        }
        return result;
    }

    /**
     * Decides whether a tag operation on the given tag's entity is allowed.
     *
     * <p>Tags on entity types other than {@code SR_FIELD} and {@code SR_RECORD} are always
     * allowed. For fields and records, the qualified name of the tag's entity is parsed, its last
     * component is dropped, and schema entities are searched by the resulting prefix. The
     * operation is allowed unless the first match has a non-empty embedded-tags attribute.
     *
     * @param tag the tag whose entity type and entity name are inspected
     * @return {@code true} if the operation may proceed
     * @throws AtlasBaseException if the search fails
     */
    private boolean allowTagOperation(Tag tag) throws AtlasBaseException {
        SchemaAtlasTypes entityType = SchemaAtlasTypes.get(tag.getEntityType());
        boolean isFieldOrRecord = entityType == SchemaAtlasTypes.SR_FIELD || entityType == SchemaAtlasTypes.SR_RECORD;
        if (!isFieldOrRecord) {
            return true;
        }

        String[] nameParts = QualifiedNameGenerator.parseQualifiedName(tag.getEntityName());
        String[] parentParts = (String[]) Arrays.copyOfRange(nameParts, 0, nameParts.length - 1);
        List<AtlasEntityHeader> schemas = this.searchService.searchEntityByQualifiedNamePrefix(SchemaAtlasTypes.SR_SCHEMA.getName(), QualifiedNameGenerator.getQualifiedName(parentParts), Collections.singleton(ModelConstants.ATTR_EMBEDDED_TAGS));
        if (schemas == null || schemas.isEmpty()) {
            return true;
        }

        List embeddedTags = (List) schemas.get(0).getAttribute(ModelConstants.ATTR_EMBEDDED_TAGS);
        return embeddedTags == null || embeddedTags.isEmpty();
    }

    /**
     * Publishes a {@code RECONCILE} record containing the full set of entities in a snapshot.
     *
     * <p>The cache is synced, the snapshot is written under a key built from the tenant,
     * metadata type and group, and the record is read back. If the stored record does not carry
     * the timestamp of this write, the write is treated as not applied:
     * <ul>
     *   <li>when no failure was recorded for the timestamp, {@code UNKNOWN_ERROR} is thrown;</li>
     *   <li>when a failure was recorded, it is logged and this method returns normally. The
     *       recorded failure has never been propagated from here; it is logged rather than
     *       rethrown so existing callers keep their behavior while the cause stops being lost.
     *       NOTE(review): sibling write paths rethrow the recorded exception - confirm whether
     *       reconcile should as well.</li>
     * </ul>
     * An {@code EntryTooLargeException} from the write is logged and swallowed.
     *
     * @param str  metadata type (fourth component of the registry key)
     * @param str2 tenant identifier
     * @param str3 group (sixth component of the registry key)
     * @param set  all entities contained in the snapshot
     * @throws AtlasBaseException {@code UNKNOWN_ERROR} when the write was not applied and no
     *                            failure was recorded
     */
    public void reconcileEntities(String str, String str2, String str3, Set<String> set) throws AtlasBaseException {
        this.kafkaCache.sync();
        MetadataRegistryKey metadataRegistryKey = new MetadataRegistryKey(str2, MetadataRegistryOp.RECONCILE, MetadataRegistryKind.ENTITY_SNAPSHOT, str, null, str3, null, null);
        long next = this.tsGenerator.next();
        try {
            this.kafkaCache.put(metadataRegistryKey, new EntitySnapshotValue(new EntitySnapshot(set), next));
            MetadataRegistryValue metadataRegistryValue = (MetadataRegistryValue) this.kafkaCache.get(metadataRegistryKey);
            if (metadataRegistryValue == null || metadataRegistryValue.getTimestamp() != next) {
                ObjectWithException objectWithException = (ObjectWithException) this.errorCache.get(Long.valueOf(next));
                if (objectWithException == null) {
                    throw UNKNOWN_ERROR;
                }
                // Surface the recorded cause instead of dropping it silently.
                log.error(String.format("Reconcile Error, the snapshot was not applied, metadata type %s, tenant %s, group %s.", str, str2, str3), objectWithException.getException());
            }
        } catch (EntryTooLargeException e) {
            log.error(String.format("Reconcile Error, The size %s (before serialization) of set allEntitiesInSnapshot is too large, metadata type %s, tenant %s, group %s.", Integer.valueOf(set.size()), str, str2, str3), e);
        }
    }

    /**
     * Returns the subset of the given tag names that have no resolvable tag definition.
     *
     * <p>A name counts as unresolved when {@code getTagDef}, called with the tenant-prefixed
     * name, returns {@code null} or throws any exception.
     *
     * @param str tenant identifier
     * @param set tag names to check
     * @return the unresolved names (as originally supplied, without prefix); an immutable empty
     *         set when {@code set} is empty, otherwise a new mutable set
     */
    public Set<String> checkTags(String str, Set<String> set) {
        if (set.isEmpty()) {
            return Collections.emptySet();
        }
        Set<String> unresolved = new HashSet<>();
        for (String tagName : set) {
            boolean defined;
            try {
                defined = getTagDef(str, QualifiedNameGenerator.ensureTypeTenantPrefix(str, tagName)) != null;
            } catch (Exception ignored) {
                // Any lookup failure is treated the same as a missing definition.
                defined = false;
            }
            if (!defined) {
                unresolved.add(tagName);
            }
        }
        return unresolved;
    }

    /**
     * Sends the schema metadata updates as a JSON {@code POST} to
     * {@code /catalog/v1/entity/schematags} on the leader and returns the leader's response.
     *
     * @param list schema metadata updates to serialize into the request body
     * @param map  request properties handed to the leader's {@code httpRequest}
     * @return the responses decoded from the leader's reply
     * @throws SchemaRegistryRequestForwardingException if serialization or the request fails with an {@link IOException}
     * @throws RestException if the leader answers with a REST client error (status and error code are preserved)
     */
    public List<SchemaTagsResponse> forwardUpdateSchemaTagsRequestToLeader(List<SchemaMetadata> list, Map<String, String> map) throws SchemaRegistryRequestForwardingException {
        RestService leader = this.schemaRegistry.leaderRestService();
        UrlList leaderUrls = leader.getBaseUrls();
        String path = UriBuilder.fromPath("/catalog/v1/entity/schematags").build(new Object[0]).toString();
        log.debug(String.format("Forwarding update schema tags request to %s", leaderUrls));
        try {
            Object response = leader.httpRequest(path, "POST", toJson(list), map, LIST_SCHEMA_TAGS_TYPE);
            return (List) response;
        } catch (RestClientException restFailure) {
            throw new RestException(restFailure.getMessage(), restFailure.getStatus(), restFailure.getErrorCode(), restFailure);
        } catch (IOException ioFailure) {
            String message = String.format("Unexpected error while forwarding the update schema tags request to %s", leaderUrls);
            throw new SchemaRegistryRequestForwardingException(message, ioFailure);
        }
    }

    /**
     * Groups a list of schema tags by schema entity.
     *
     * <p>Each entry's tags are copied into a new {@link HashSet}; when several entries refer to
     * the same schema entity, their tag sets are unioned.
     *
     * @param list schema tags to group
     * @return a map from schema entity to the union of its tags
     */
    private Map<SchemaEntity, Set<String>> schemaTagsListToMap(List<SchemaTags> list) {
        return list.stream().collect(Collectors.toMap(
                SchemaTags::getSchemaEntity,
                entry -> new HashSet<>(entry.getTags()),
                (accumulated, additional) -> {
                    accumulated.addAll(additional);
                    return accumulated;
                }));
    }

    /**
     * Converts an incoming entity into the registry key/value pair to be written.
     *
     * <p>The existing entity (if any) is looked up via {@code getOldEntity}; a failed lookup is
     * treated as "no existing entity". The operation is {@code UPDATE} when an existing entity
     * was found and {@code CREATE} otherwise. Create/update timestamps on the incoming entity are
     * then defaulted, where "unset" means {@code null} or {@code UNIX_ZERO_EPOCH}:
     * <ul>
     *   <li><b>UPDATE</b>: an unset create time is copied from the existing entity. An unset
     *       update time becomes, in order of preference, the existing update time, the existing
     *       create time, or the incoming entity's (possibly just defaulted) create time.</li>
     *   <li><b>CREATE</b>: a {@code null} create time becomes {@code UNIX_ZERO_EPOCH}; an unset
     *       update time is copied from the create time.</li>
     * </ul>
     * Both the existing and the incoming entity have their GUIDs replaced by unique attributes.
     *
     * @param str                    tenant identifier
     * @param atlasEntityWithExtInfo incoming entity; its attributes may be modified in place
     * @param biPredicate            filter receiving (existing entity, incoming entity); when it
     *                               is non-null and rejects the pair, the returned key is {@code null}
     * @param z                      flag passed through to the {@code EntityValue} constructor
     * @return the key (possibly {@code null}) and a value stamped with a fresh timestamp
     */
    private KeyValue<MetadataRegistryKey, MetadataRegistryValue<AtlasEntity.AtlasEntityWithExtInfo>> toKeyValue(String str, AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo, BiPredicate<AtlasEntity.AtlasEntityWithExtInfo, AtlasEntity.AtlasEntityWithExtInfo> biPredicate, boolean z) {
        AtlasEntity incoming = atlasEntityWithExtInfo.getEntity();
        String typeName = incoming.getTypeName();
        String qualifiedName = (String) incoming.getAttribute(ModelConstants.ATTR_QUALIFIED_NAME);

        AtlasEntity.AtlasEntityWithExtInfo existing = null;
        try {
            existing = getOldEntity(str, typeName, qualifiedName, false, false);
            replaceGuidsWithUniqAttrs(existing);
        } catch (AtlasBaseException ignored) {
            // No existing entity could be loaded: handled as a create below.
        }

        MetadataRegistryOp operation;
        if (existing == null) {
            operation = MetadataRegistryOp.CREATE;
            if (incoming.getAttribute(ModelConstants.ATTR_CREATE_TIME) == null) {
                incoming.setAttribute(ModelConstants.ATTR_CREATE_TIME, ModelConstants.UNIX_ZERO_EPOCH);
            }
            Object updateTime = incoming.getAttribute(ModelConstants.ATTR_UPDATE_TIME);
            if (updateTime == null || updateTime.equals(ModelConstants.UNIX_ZERO_EPOCH)) {
                incoming.setAttribute(ModelConstants.ATTR_UPDATE_TIME, incoming.getAttribute(ModelConstants.ATTR_CREATE_TIME));
            }
        } else {
            operation = MetadataRegistryOp.UPDATE;
            AtlasEntity prior = existing.getEntity();

            Object createTime = incoming.getAttribute(ModelConstants.ATTR_CREATE_TIME);
            if (createTime == null || createTime.equals(ModelConstants.UNIX_ZERO_EPOCH)) {
                incoming.setAttribute(ModelConstants.ATTR_CREATE_TIME, prior.getAttribute(ModelConstants.ATTR_CREATE_TIME));
            }

            Object updateTime = incoming.getAttribute(ModelConstants.ATTR_UPDATE_TIME);
            if (updateTime == null || updateTime.equals(ModelConstants.UNIX_ZERO_EPOCH)) {
                Object priorUpdateTime = prior.getAttribute(ModelConstants.ATTR_UPDATE_TIME);
                Object inherited;
                if (priorUpdateTime != null && !priorUpdateTime.equals(ModelConstants.UNIX_ZERO_EPOCH)) {
                    inherited = priorUpdateTime;
                } else {
                    Object priorCreateTime = prior.getAttribute(ModelConstants.ATTR_CREATE_TIME);
                    if (priorCreateTime != null && !priorCreateTime.equals(ModelConstants.UNIX_ZERO_EPOCH)) {
                        inherited = priorCreateTime;
                    } else {
                        // Read after the create-time defaulting above, so it reflects that value.
                        inherited = incoming.getAttribute(ModelConstants.ATTR_CREATE_TIME);
                    }
                }
                incoming.setAttribute(ModelConstants.ATTR_UPDATE_TIME, inherited);
            }
        }

        replaceGuidsWithUniqAttrs(atlasEntityWithExtInfo);

        boolean accepted = biPredicate == null || biPredicate.test(existing, atlasEntityWithExtInfo);
        MetadataRegistryKey key = accepted
                ? new MetadataRegistryKey(str, operation, MetadataRegistryKind.ENTITY, typeName, null, qualifiedName, null, null)
                : null;
        return new KeyValue<>(key, new EntityValue(existing, atlasEntityWithExtInfo, true, z, this.tsGenerator.next()));
    }

    /**
     * Strips GUID-based references from an entity in place.
     *
     * <p>Does nothing when the argument or its entity is {@code null}. Otherwise, relationship
     * attributes for which {@link #isContainer} returns {@code true} are removed, the remaining
     * relationship values are rewritten through {@link #replaceGuidsWithUniqAttrsInRel}, the
     * referred-entities map is cleared, and the entity's own GUID is set to {@code null}.
     *
     * @param atlasEntityWithExtInfo entity to modify; may be {@code null}
     */
    private void replaceGuidsWithUniqAttrs(AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo) {
        boolean hasEntity = atlasEntityWithExtInfo != null && atlasEntityWithExtInfo.getEntity() != null;
        if (!hasEntity) {
            return;
        }
        Map<String, Object> relationships = atlasEntityWithExtInfo.getEntity().getRelationshipAttributes();
        if (relationships != null) {
            relationships.entrySet().removeIf(relationship -> isContainer(atlasEntityWithExtInfo, relationship.getKey()));
            relationships.replaceAll((name, target) -> replaceGuidsWithUniqAttrsInRel(atlasEntityWithExtInfo, target));
        }
        atlasEntityWithExtInfo.setReferredEntities((Map) null);
        atlasEntityWithExtInfo.getEntity().setGuid((String) null);
    }

    /**
     * Rewrites a single relationship attribute value through {@code toRelatedObjectIdWithUniqAttr}.
     *
     * <p>An {@code AtlasRelatedObjectId} or {@link Map} is converted directly; a {@link Collection}
     * is converted element by element into a new list; any other value is returned unchanged.
     *
     * @param atlasEntityWithExtInfo entity that owns the relationship
     * @param obj                    relationship attribute value
     * @return the converted value, or {@code obj} itself when it is of no recognized shape
     */
    private Object replaceGuidsWithUniqAttrsInRel(AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo, Object obj) {
        if ((obj instanceof AtlasRelatedObjectId) || (obj instanceof Map)) {
            return toRelatedObjectIdWithUniqAttr(atlasEntityWithExtInfo, obj);
        }
        if (obj instanceof Collection) {
            return ((Collection<?>) obj).stream()
                    .map(element -> toRelatedObjectIdWithUniqAttr(atlasEntityWithExtInfo, element))
                    .collect(Collectors.toList());
        }
        return obj;
    }

    /**
     * Tells whether the named relationship attribute of the entity points at a container end.
     *
     * <p>Returns {@code false} when the entity type, the attribute, or the attribute's
     * relationship type cannot be resolved from the type registry. Otherwise the relationship's
     * end definition 1 is consulted for an {@code OUT} edge and end definition 2 for any other
     * direction, and that end's {@code isContainer} flag is returned.
     *
     * @param atlasEntityWithExtInfo entity whose type declares the attribute
     * @param str                    relationship attribute name
     * @return the {@code isContainer} flag of the selected relationship end, or {@code false}
     */
    private boolean isContainer(AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo, String str) {
        AtlasEntityType entityType = this.typeRegistry.getEntityTypeByName(atlasEntityWithExtInfo.getEntity().getTypeName());
        if (entityType == null) {
            return false;
        }
        AtlasStructType.AtlasAttribute attribute = entityType.getAttribute(str);
        if (attribute == null) {
            return false;
        }
        AtlasRelationshipType relationshipType = this.typeRegistry.getRelationshipTypeByName(attribute.getRelationshipName());
        if (relationshipType == null) {
            return false;
        }
        AtlasRelationshipDef relationshipDef = relationshipType.getRelationshipDef();
        if (attribute.getRelationshipEdgeDirection() == AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT) {
            return relationshipDef.getEndDef1().getIsContainer();
        }
        return relationshipDef.getEndDef2().getIsContainer();
    }

    /**
     * Partially updates an entity, applying the change locally when this node is the leader and
     * forwarding the request to the leader otherwise.
     *
     * <p>The lock is released in a {@code finally} block so it is released exactly once on every
     * exit path (normal return or exception).
     *
     * @param str                    tenant identifier; used for locking and for the local update
     * @param atlasEntityWithExtInfo entity carrying the attributes to update
     * @param map                    passed to {@code lock}, {@code isLeader} and the forwarded request
     *                               (presumably the incoming request headers - confirm against callers)
     * @return the mutation response produced locally or returned by the leader
     * @throws UnknownLeaderException if this node is not the leader and no leader is known
     */
    public EntityMutationResponse partiallyUpdateEntityOrForward(String str, AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo, Map<String, String> map) throws SchemaRegistryException, AtlasBaseException {
        lock(str, map);
        try {
            if (isLeader(map)) {
                return partiallyUpdateEntity(str, atlasEntityWithExtInfo);
            }
            if (this.schemaRegistry.leaderIdentity() == null) {
                throw new UnknownLeaderException("Request failed since leader is unknown");
            }
            return forwardPartiallyUpdateEntityRequestToLeader(atlasEntityWithExtInfo, map);
        } finally {
            unlock(str);
        }
    }

    /**
     * Sends the entity as a JSON {@code PUT} to {@code /catalog/v1/entity} on the leader and
     * returns the leader's response.
     *
     * <p>The log and error messages name this as a partial-update request so that it can be
     * told apart from the {@code POST} create/update forwarding path.
     *
     * @param atlasEntityWithExtInfo entity to serialize into the request body
     * @param map                    request properties handed to the leader's {@code httpRequest}
     * @return the mutation response decoded from the leader's reply
     * @throws SchemaRegistryRequestForwardingException if serialization or the request fails with an {@link IOException}
     * @throws RestException if the leader answers with a REST client error (status and error code are preserved)
     */
    private EntityMutationResponse forwardPartiallyUpdateEntityRequestToLeader(AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo, Map<String, String> map) throws SchemaRegistryRequestForwardingException {
        RestService leaderRestService = this.schemaRegistry.leaderRestService();
        UrlList baseUrls = leaderRestService.getBaseUrls();
        String uri = UriBuilder.fromPath("/catalog/v1/entity").build(new Object[0]).toString();
        log.debug(String.format("Forwarding partial update entity request to %s", baseUrls));
        try {
            return (EntityMutationResponse) leaderRestService.httpRequest(uri, "PUT", toJson(atlasEntityWithExtInfo), map, ENTITY_MUTATION_RESPONSE_TYPE);
        } catch (IOException e) {
            throw new SchemaRegistryRequestForwardingException(String.format("Unexpected error while forwarding the partial update entity request to %s", baseUrls), e);
        } catch (RestClientException e2) {
            throw new RestException(e2.getMessage(), e2.getStatus(), e2.getErrorCode(), e2);
        }
    }

    /**
     * Submits a snapshot (republish) request locally when this host is the designated
     * notifications host, and forwards it otherwise.
     *
     * <p>This host counts as designated when its configured {@code host.name} is non-null, the
     * configured designated host prefix is non-empty, and the host name starts with that prefix.
     *
     * @param str  first argument of the local snapshot request (presumably the tenant - confirm)
     * @param str2 second argument of the local snapshot request; also sent with the forwarded request
     * @param map  request properties sent with the forwarded request
     * @throws SchemaRegistryRequestForwardingException if forwarding fails
     * @throws IllegalArgumentException if a {@code RestConfigException} is raised while handling the request
     */
    public void republishOrForward(String str, String str2, Map<String, String> map) throws SchemaRegistryRequestForwardingException {
        SchemaRegistryConfig registryConfig = this.schemaRegistry.config();
        String hostName = registryConfig.getString("host.name");
        try {
            DataCatalogConfig catalogConfig = new DataCatalogConfig(registryConfig.originalProperties());
            boolean onDesignatedHost = false;
            if (hostName != null && StringUtils.isNotEmpty(catalogConfig.getCatalogNotificationsDesignatedHostPrefix())) {
                onDesignatedHost = hostName.startsWith(catalogConfig.getCatalogNotificationsDesignatedHostPrefix());
            }
            SchemaAtlasHook hook = (SchemaAtlasHook) this.schemaRegistry.properties().get(SchemaAtlasHook.KEY);
            if (!onDesignatedHost) {
                forwardRepublishRequestToLeader(str2, map);
                return;
            }
            this.snapshotManager.submitSnapshotRequest(str, str2, hook);
        } catch (RestConfigException e) {
            throw new IllegalArgumentException("Could not instantiate dataCatalogConfig", e);
        }
    }

    /**
     * Posts an entity snapshot request to the designated notifications host.
     *
     * <p>The target URL is {@code https://} followed by the configured designated host prefix and
     * the domain suffix of this node's {@code host.name} (everything from its first {@code '.'}
     * onwards).
     *
     * @param str first argument of {@code postEntitySnapshot}
     * @param map request properties sent with the request
     * @throws SchemaRegistryRequestForwardingException declared by the signature
     * @throws IllegalArgumentException if {@code host.name} is missing or contains no {@code '.'},
     *                                  or if a {@code RestConfigException} is raised
     */
    private void forwardRepublishRequestToLeader(String str, Map<String, String> map) throws SchemaRegistryRequestForwardingException {
        SchemaRegistryConfig config = this.schemaRegistry.config();
        try {
            DataCatalogConfig dataCatalogConfig = new DataCatalogConfig(config.originalProperties());
            String hostName = config.getString("host.name");
            // The designated host shares this node's domain suffix, so the suffix must be derivable.
            int domainStart = hostName == null ? -1 : hostName.indexOf('.');
            if (domainStart < 0) {
                throw new IllegalArgumentException("Cannot derive the designated host from host.name '" + hostName + "': expected a host name containing '.'");
            }
            String designatedHostUrl = "https://" + dataCatalogConfig.getCatalogNotificationsDesignatedHostPrefix() + hostName.substring(domainStart);
            new CatalogRestService(designatedHostUrl).postEntitySnapshot(str, map);
        } catch (RestConfigException e) {
            throw new IllegalArgumentException("Could not instantiate dataCatalogConfig", e);
        }
    }

    /**
     * Partially updates an existing entity on this node.
     *
     * <p>A partial-update metric is recorded up front when the incoming entity sets a
     * description, owner or owner-email attribute. The existing entity must be resolvable via
     * {@code getOldEntity}; the update is written as an {@code UPDATE} record with a fresh
     * timestamp and read back to confirm it was applied.
     *
     * @param str                    tenant identifier
     * @param atlasEntityWithExtInfo entity carrying the attributes to update
     * @return a response with a single {@code UPDATE} entry for the entity as re-read after the write
     * @throws AtlasBaseException with {@code INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND} when the
     *                            existing entity cannot be loaded; otherwise the exception recorded
     *                            for the write's timestamp, or {@code UNKNOWN_ERROR}, when the
     *                            write was not applied
     */
    public EntityMutationResponse partiallyUpdateEntity(String str, AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo) throws AtlasBaseException {
        boolean touchesTrackedAttribute = atlasEntityWithExtInfo.getEntity().getAttribute(ModelConstants.ATTR_DESCRIPTION) != null
                || atlasEntityWithExtInfo.getEntity().getAttribute(ModelConstants.ATTR_OWNER) != null
                || atlasEntityWithExtInfo.getEntity().getAttribute(ModelConstants.ATTR_OWNER_EMAIL) != null;
        if (touchesTrackedAttribute) {
            this.metricsManager.recordEntityPartialUpdate(str, 1);
        }

        this.kafkaCache.sync();
        String typeName = atlasEntityWithExtInfo.getEntity().getTypeName();
        String qualifiedName = (String) atlasEntityWithExtInfo.getEntity().getAttribute(ModelConstants.ATTR_QUALIFIED_NAME);

        AtlasEntity.AtlasEntityWithExtInfo existing = null;
        try {
            existing = getOldEntity(str, typeName, qualifiedName, false, false);
            replaceGuidsWithUniqAttrs(existing);
        } catch (AtlasBaseException ignored) {
            // Reported below as "instance not found".
        }
        if (existing == null) {
            throw new AtlasBaseException(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND, new String[]{typeName, qualifiedName});
        }

        replaceGuidsWithUniqAttrs(atlasEntityWithExtInfo);
        MetadataRegistryKey updateKey = new MetadataRegistryKey(str, MetadataRegistryOp.UPDATE, MetadataRegistryKind.ENTITY, typeName, null, qualifiedName, null, null);
        long expectedTimestamp = this.tsGenerator.next();
        this.kafkaCache.put(updateKey, new EntityValue(existing, atlasEntityWithExtInfo, true, false, expectedTimestamp));

        MetadataRegistryValue stored = (MetadataRegistryValue) this.kafkaCache.get(updateKey);
        if (stored == null || stored.getTimestamp() != expectedTimestamp) {
            ObjectWithException failure = (ObjectWithException) this.errorCache.get(Long.valueOf(expectedTimestamp));
            if (failure == null) {
                throw UNKNOWN_ERROR;
            }
            throw failure.getException();
        }

        AtlasEntityHeader header = new AtlasEntityHeader(getEntity(str, typeName, qualifiedName, false, false).getEntity());
        return new EntityMutationResponse(Collections.singletonMap(EntityMutations.EntityOperation.UPDATE, Collections.singletonList(header)));
    }

    /**
     * Loads the current version of an entity, falling back to the pending cache.
     *
     * <p>The entity is first looked up with {@link #getEntity}. If that lookup throws, the
     * pending cache is consulted under the tenant-prefixed qualified name; a hit is returned as a
     * fresh copy of the cached entity, and a miss rethrows the original exception.
     *
     * @param str  tenant identifier
     * @param str2 entity type name
     * @param str3 entity qualified name
     * @param z    passed through to {@code getEntity}
     * @param z2   passed through to {@code getEntity}
     * @return the stored entity, or a copy of the pending one
     * @throws AtlasBaseException the lookup failure, when the pending cache has no entry either
     */
    private AtlasEntity.AtlasEntityWithExtInfo getOldEntity(String str, String str2, String str3, boolean z, boolean z2) throws AtlasBaseException {
        try {
            return getEntity(str, str2, str3, z, z2);
        } catch (AtlasBaseException e) {
            AtlasEntity.AtlasEntityWithExtInfo pending = pendingCache().get(QualifiedNameGenerator.ensureEntityTenantPrefix(str, str2, str3));
            if (pending == null) {
                throw e;
            }
            // Copy so that callers mutating the result do not alter the cached instance.
            return new AtlasEntity.AtlasEntityWithExtInfo(new AtlasEntity(pending.getEntity()));
        }
    }

    /**
     * Looks up an entity by type and qualified name and checks that it belongs to the tenant.
     *
     * <p>The qualified name is tenant-prefixed before the lookup, and the result is passed to
     * {@code validateEntity}, which rejects entities of another tenant.
     *
     * @param str  tenant identifier
     * @param str2 entity type name
     * @param str3 entity qualified name
     * @param z    passed through to {@code getByUniqueAttributes}
     * @param z2   passed through to {@code getByUniqueAttributes}
     * @return the entity found
     * @throws AtlasBaseException if the lookup fails, or with
     *                            {@code INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND} on a tenant mismatch
     */
    public AtlasEntity.AtlasEntityWithExtInfo getEntity(String str, String str2, String str3, boolean z, boolean z2) throws AtlasBaseException {
        AtlasEntity.AtlasEntityWithExtInfo found = getByUniqueAttributes(
                ensureEntityType(str2),
                Collections.singletonMap(ModelConstants.ATTR_QUALIFIED_NAME, QualifiedNameGenerator.ensureEntityTenantPrefix(str, str2, str3)),
                z,
                z2);
        validateEntity(str, found, AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND, str2, str3);
        return found;
    }

    /**
     * Verifies that the entity's {@code "tenant"} attribute equals the given tenant.
     *
     * @param str                    expected tenant identifier
     * @param atlasEntityWithExtInfo entity to check
     * @param atlasErrorCode         error code to raise on mismatch
     * @param strArr                 parameters for the error message
     * @throws AtlasBaseException built from {@code atlasErrorCode} and {@code strArr} when the
     *                            tenant does not match
     */
    private void validateEntity(String str, AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo, AtlasErrorCode atlasErrorCode, String... strArr) throws AtlasBaseException {
        String entityTenant = (String) atlasEntityWithExtInfo.getEntity().getAttribute("tenant");
        if (str.equals(entityTenant)) {
            return;
        }
        throw new AtlasBaseException(atlasErrorCode, strArr);
    }

    /**
     * Resolves the GUID of an entity from its type name and tenant-prefixed qualified name.
     *
     * <p>If the lookup fails with what {@code isJanusGraphNullPointerException} recognizes,
     * the failure is logged and counted in metrics before being rethrown.
     *
     * @param str  tenant
     * @param str2 entity type name
     * @param str3 qualified name
     * @return the GUID returned by the entity store
     * @throws AtlasBaseException if the type is unknown or the lookup fails
     */
    public String getGuid(String str, String str2, String str3) throws AtlasBaseException {
        try {
            AtlasEntityType entityType = ensureEntityType(str2);
            Map<String, Object> uniqueAttributes = Collections.singletonMap(
                    ModelConstants.ATTR_QUALIFIED_NAME,
                    QualifiedNameGenerator.ensureEntityTenantPrefix(str, str2, str3));
            return getGuidByUniqueAttributes(entityType, uniqueAttributes);
        } catch (AtlasBaseException failure) {
            if (isJanusGraphNullPointerException(failure)) {
                Object[] logArgs = {str, str2, str3};
                log.error("JanusGraph NullPointerException in MetadataRegistry.getGuid(tenant={}, typeName={}, qualifiedName={})", logArgs);
                this.metricsManager.recordJanusGraphNullPointerExceptionError(1);
            }
            throw failure;
        }
    }

    /**
     * Deletes or purges an entity locally when this node is the leader, otherwise forwards the
     * request to the leader.
     *
     * <p>The tenant lock is taken before the work starts and released in a {@code finally} block.
     *
     * @param str  tenant
     * @param str2 entity type name
     * @param str3 qualified name
     * @param z    {@code true} to purge, {@code false} to delete
     * @param map  request headers; used for locking, the leader check, and the forwarded request
     * @throws SchemaRegistryException if the leader is unknown or forwarding fails
     * @throws AtlasBaseException      if the local delete/purge fails
     */
    public void deleteOrPurgeEntityOrForward(String str, String str2, String str3, boolean z, Map<String, String> map) throws SchemaRegistryException, AtlasBaseException {
        lock(str, map);
        try {
            if (isLeader(map)) {
                deleteOrPurgeEntity(str, str2, str3, z);
                return;
            }
            // Not the leader: the request can only be forwarded if a leader is known.
            if (this.schemaRegistry.leaderIdentity() == null) {
                throw new UnknownLeaderException("Request failed since leader is unknown");
            }
            forwardDeleteEntityRequestToLeader(str2, str3, z, map);
        } finally {
            unlock(str);
        }
    }

    /**
     * Sends an HTTP DELETE for the given entity to the leader's REST service.
     *
     * @param str  entity type name (path parameter {@code typeName})
     * @param str2 qualified name (path parameter {@code qualifiedName})
     * @param z    value of the {@code purge} query parameter
     * @param map  headers passed along with the forwarded request
     * @throws SchemaRegistryRequestForwardingException if the request fails with an I/O error
     */
    private void forwardDeleteEntityRequestToLeader(String str, String str2, boolean z, Map<String, String> map) throws SchemaRegistryRequestForwardingException {
        RestService leader = this.schemaRegistry.leaderRestService();
        UrlList leaderUrls = leader.getBaseUrls();
        UriBuilder pathBuilder = UriBuilder
                .fromPath("/catalog/v1/entity/type/{typeName}/name/{qualifiedName}")
                .queryParam("purge", new Object[]{Boolean.valueOf(z)});
        String requestPath = pathBuilder.build(new Object[]{str, str2}).toString();
        log.debug(String.format("Forwarding delete entity request to %s", leaderUrls));
        try {
            leader.httpRequest(requestPath, "DELETE", (byte[]) null, map, VOID_TYPE);
        } catch (IOException ioFailure) {
            String message = String.format("Unexpected error while forwarding the delete entity request to %s", leaderUrls);
            throw new SchemaRegistryRequestForwardingException(message, ioFailure);
        } catch (RestClientException clientFailure) {
            // Re-surface the leader's REST error with its original status and error code.
            throw new RestException(clientFailure.getMessage(), clientFailure.getStatus(), clientFailure.getErrorCode(), clientFailure);
        }
    }

    /**
     * Deletes (without purging) the entity identified by tenant, type name and qualified name.
     *
     * @param str  tenant
     * @param str2 entity type name
     * @param str3 qualified name
     * @throws AtlasBaseException if the delete fails
     */
    public void deleteEntity(String str, String str2, String str3) throws AtlasBaseException {
        final boolean purge = false;
        deleteOrPurgeEntity(str, str2, str3, purge);
    }

    /**
     * Purges the entity identified by tenant, type name and qualified name.
     *
     * @param str  tenant
     * @param str2 entity type name
     * @param str3 qualified name
     * @throws AtlasBaseException if the purge fails
     */
    public void purgeEntity(String str, String str2, String str3) throws AtlasBaseException {
        final boolean purge = true;
        deleteOrPurgeEntity(str, str2, str3, purge);
    }

    /**
     * Writes a DELETE or PURGE record for an entity to the Kafka-backed cache and verifies it was applied.
     *
     * <p>For a delete (not a purge) the current entity is looked up first and its GUID references
     * are replaced with unique attributes; if no entity can be obtained, a not-found error is
     * thrown. The lookup failure itself is logged at debug level so the underlying cause is not
     * lost when it is reported as not-found.
     *
     * <p>After the write, the record is read back. If it is missing or carries a different
     * timestamp, the exception stored in the error cache for this timestamp is thrown, or
     * {@code UNKNOWN_ERROR} when none was recorded.
     *
     * @param str  tenant
     * @param str2 entity type name
     * @param str3 qualified name
     * @param z    {@code true} to purge, {@code false} to delete
     * @throws AtlasBaseException if the entity cannot be found for a delete, or the write was not applied
     */
    private void deleteOrPurgeEntity(String str, String str2, String str3, boolean z) throws AtlasBaseException {
        this.kafkaCache.sync();
        AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo = null;
        if (!z) {
            try {
                atlasEntityWithExtInfo = getOldEntity(str, str2, str3, false, false);
                replaceGuidsWithUniqAttrs(atlasEntityWithExtInfo);
            } catch (AtlasBaseException e) {
                // Best-effort: the null check below decides whether to proceed. Keep a trace of
                // the real failure, since the caller only ever sees the not-found error.
                log.debug("Entity lookup before delete failed (tenant={}, typeName={}, qualifiedName={})", new Object[]{str, str2, str3, e});
            }
            if (atlasEntityWithExtInfo == null) {
                throw new AtlasBaseException(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND, new String[]{str2, str3});
            }
        }
        MetadataRegistryKey metadataRegistryKey = new MetadataRegistryKey(str, z ? MetadataRegistryOp.PURGE : MetadataRegistryOp.DELETE, MetadataRegistryKind.ENTITY, str2, null, str3, null, null);
        long next = this.tsGenerator.next();
        this.kafkaCache.put(metadataRegistryKey, new EntityValue(atlasEntityWithExtInfo, null, false, false, next));
        // Read back the record: a missing value or a different timestamp means this write was not applied.
        MetadataRegistryValue metadataRegistryValue = (MetadataRegistryValue) this.kafkaCache.get(metadataRegistryKey);
        if (metadataRegistryValue == null || metadataRegistryValue.getTimestamp() != next) {
            ObjectWithException objectWithException = (ObjectWithException) this.errorCache.get(Long.valueOf(next));
            throw (objectWithException != null ? objectWithException.getException() : UNKNOWN_ERROR);
        }
    }

    /**
     * Returns the version string that follows the given one.
     *
     * <p>The numeric part is parsed from index 2 onward.
     * NOTE(review): this assumes {@code VERSION_PREFIX} is two characters long — confirm.
     *
     * @param str version string; must start with {@code VERSION_PREFIX}
     * @return {@code VERSION_PREFIX} followed by the parsed number plus one
     * @throws IllegalStateException if {@code str} does not start with {@code VERSION_PREFIX}
     * @throws NumberFormatException if the remainder is not an integer
     */
    public static String incrementVersion(String str) {
        if (str.startsWith(VERSION_PREFIX)) {
            int current = Integer.parseInt(str.substring(2));
            return VERSION_PREFIX + (current + 1);
        }
        throw new IllegalStateException("Unknown version");
    }

    /**
     * Returns the version string that precedes the given one, if any.
     *
     * <p>The numeric part is parsed from index 2 onward.
     * NOTE(review): this assumes {@code VERSION_PREFIX} is two characters long — confirm.
     *
     * @param str version string; must start with {@code VERSION_PREFIX}
     * @return {@code VERSION_PREFIX} followed by the parsed number minus one, or {@code null}
     *         when the parsed number is not greater than zero
     * @throws IllegalStateException if {@code str} does not start with {@code VERSION_PREFIX}
     * @throws NumberFormatException if the remainder is not an integer
     */
    public static String decrementVersion(String str) {
        if (!str.startsWith(VERSION_PREFIX)) {
            throw new IllegalStateException("Unknown version");
        }
        int current = Integer.parseInt(str.substring(2));
        return current > 0 ? VERSION_PREFIX + (current - 1) : null;
    }

    /**
     * Tells whether the entity has a deprecation time set.
     *
     * @param atlasEntityWithExtInfo entity to inspect
     * @return {@code true} when the deprecated-time attribute resolves to a date other than
     *         {@code ModelConstants.UNIX_ZERO_EPOCH}; {@code false} when it is absent,
     *         unrecognized, or equal to that epoch value
     */
    public boolean isEntityDeprecated(AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo) {
        Object rawValue = atlasEntityWithExtInfo.getEntity().getAttribute(ModelConstants.ATTR_DEPRECATED_TIME);
        Date deprecatedAt = getDateAttribute(rawValue);
        return deprecatedAt != null && !deprecatedAt.equals(ModelConstants.UNIX_ZERO_EPOCH);
    }

    /**
     * Converts an attribute value to a {@link Date}.
     *
     * <p>{@code Integer} and {@code Long} values are passed to {@code new Date(long)};
     * a {@code Date} is returned as is. Any other non-null type is logged as an error.
     *
     * @param obj attribute value, may be {@code null}
     * @return the corresponding date, or {@code null} if {@code obj} is {@code null} or of an
     *         unsupported type
     */
    public static Date getDateAttribute(Object obj) {
        if (obj == null) {
            return null;
        }
        if (obj instanceof Date) {
            return (Date) obj;
        }
        if (obj instanceof Integer || obj instanceof Long) {
            return new Date(((Number) obj).longValue());
        }
        log.error(String.format("unknown time format from atlas %s, %s", obj, obj.getClass()));
        return null;
    }

    /**
     * Logs the shutdown and closes the Kafka-backed cache if one exists.
     *
     * @throws IOException if closing the cache fails
     */
    @Override
    @PreDestroy
    public void close() throws IOException {
        log.info("Shutting down metadata registry");
        if (this.kafkaCache == null) {
            return;
        }
        this.kafkaCache.close();
    }

    /**
     * Returns the error-cache entry stored under the given key.
     *
     * @param j error-cache key (elsewhere in this class the operation timestamp is used)
     * @return whatever the error cache returns for that key
     */
    public ObjectWithException getObjectWithException(long j) {
        Long key = Long.valueOf(j);
        return (ObjectWithException) this.errorCache.get(key);
    }

    /**
     * Stores an object together with an exception in the error cache under the given key.
     *
     * @param j                  error-cache key
     * @param obj                object to store alongside the exception
     * @param atlasBaseException exception to store
     */
    public void putObjectWithException(long j, Object obj, AtlasBaseException atlasBaseException) {
        ObjectWithException entry = new ObjectWithException(obj, atlasBaseException);
        this.errorCache.put(Long.valueOf(j), entry);
    }

    /**
     * Rewrites a GUID-based related-object reference into one keyed by type name and qualified name.
     *
     * <p>A {@code Map} value is first converted to an {@code AtlasRelatedObjectId}. If the
     * reference has a GUID, the referenced entity is looked up first in the given entity
     * container and then by header in the entity store. When neither resolves it (or the value
     * is not a related-object reference), the value is returned unchanged, apart from the
     * {@code Map} conversion.
     *
     * @param atlasEntityWithExtInfo container searched for the referenced entity by GUID
     * @param obj                    attribute value to convert
     * @return the rewritten reference, or the (possibly converted) input value
     */
    private Object toRelatedObjectIdWithUniqAttr(AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo, Object obj) {
        Object candidate = (obj instanceof Map) ? new AtlasRelatedObjectId((Map) obj) : obj;
        if (!(candidate instanceof AtlasRelatedObjectId)) {
            return candidate;
        }
        AtlasRelatedObjectId related = (AtlasRelatedObjectId) candidate;
        String guid = related.getGuid();
        if (guid == null) {
            return candidate;
        }
        AtlasEntity referenced = atlasEntityWithExtInfo.getEntity(guid);
        if (referenced != null) {
            return toRelatedObjectIdWithUniqAttr(related, referenced);
        }
        try {
            AtlasEntityHeader header = getHeaderById(guid);
            if (header != null) {
                return toRelatedObjectIdWithUniqAttr(related, header);
            }
        } catch (AtlasBaseException ignored) {
            // Best-effort: if the header cannot be loaded, keep the GUID-based reference.
        }
        return candidate;
    }

    /**
     * Builds a related-object reference for an entity, keyed by its type name and qualified name.
     *
     * @param atlasEntity entity to reference
     * @return a new reference with no relationship details copied from an existing one
     */
    public static AtlasRelatedObjectId toRelatedObjectIdWithUniqAttr(AtlasEntity atlasEntity) {
        // A typed null is needed so the overload taking (AtlasRelatedObjectId, AtlasEntity) is selected.
        final AtlasRelatedObjectId noExistingReference = null;
        return toRelatedObjectIdWithUniqAttr(noExistingReference, atlasEntity);
    }

    /**
     * Builds a related-object reference for an entity, keyed by its type name and qualified name,
     * carrying over relationship details from an existing reference.
     *
     * @param atlasRelatedObjectId existing reference to copy relationship details from; may be {@code null}
     * @param atlasEntity          entity to reference
     * @return the new reference
     */
    public static AtlasRelatedObjectId toRelatedObjectIdWithUniqAttr(AtlasRelatedObjectId atlasRelatedObjectId, AtlasEntity atlasEntity) {
        String typeName = atlasEntity.getTypeName();
        Object qualifiedName = atlasEntity.getAttribute(ModelConstants.ATTR_QUALIFIED_NAME);
        return toRelatedObjectIdWithUniqAttr(atlasRelatedObjectId, typeName, qualifiedName);
    }

    /**
     * Builds a related-object reference from an entity header, keyed by its type name and
     * qualified name, carrying over relationship details from an existing reference.
     *
     * @param atlasRelatedObjectId existing reference to copy relationship details from; may be {@code null}
     * @param atlasEntityHeader    header of the entity to reference
     * @return the new reference
     */
    public static AtlasRelatedObjectId toRelatedObjectIdWithUniqAttr(AtlasRelatedObjectId atlasRelatedObjectId, AtlasEntityHeader atlasEntityHeader) {
        String typeName = atlasEntityHeader.getTypeName();
        Object qualifiedName = atlasEntityHeader.getAttribute(ModelConstants.ATTR_QUALIFIED_NAME);
        return toRelatedObjectIdWithUniqAttr(atlasRelatedObjectId, typeName, qualifiedName);
    }

    /**
     * Creates a related-object reference identified by type name and qualified name.
     *
     * <p>When an existing reference is supplied, its relationship attributes, type and status
     * are copied onto the new one.
     *
     * @param atlasRelatedObjectId existing reference to copy relationship details from; may be {@code null}
     * @param str                  type name of the referenced entity
     * @param obj                  qualified-name value of the referenced entity
     * @return the new reference
     */
    private static AtlasRelatedObjectId toRelatedObjectIdWithUniqAttr(AtlasRelatedObjectId atlasRelatedObjectId, String str, Object obj) {
        AtlasObjectId uniqueAttrId = new AtlasObjectId(str, ModelConstants.ATTR_QUALIFIED_NAME, obj);
        AtlasRelatedObjectId result = new AtlasRelatedObjectId(uniqueAttrId);
        if (atlasRelatedObjectId == null) {
            return result;
        }
        result.setRelationshipAttributes(atlasRelatedObjectId.getRelationshipAttributes());
        result.setRelationshipType(atlasRelatedObjectId.getRelationshipType());
        result.setRelationshipStatus(atlasRelatedObjectId.getRelationshipStatus());
        return result;
    }

    /**
     * Resolves an entity type by name from the type registry.
     *
     * @param str entity type name
     * @return the registered entity type
     * @throws AtlasBaseException with {@code TYPE_NAME_INVALID} if no such entity type is registered
     */
    private AtlasEntityType ensureEntityType(String str) throws AtlasBaseException {
        AtlasEntityType resolved = this.typeRegistry.getEntityTypeByName(str);
        if (resolved != null) {
            return resolved;
        }
        String[] messageParams = {TypeCategory.ENTITY.name(), str};
        throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_INVALID, messageParams);
    }

    /**
     * Serializes an object to JSON bytes using the shared {@code JacksonMapper.INSTANCE}.
     *
     * @param obj object to serialize
     * @return the serialized bytes
     * @throws JsonProcessingException if serialization fails
     */
    private static byte[] toJson(Object obj) throws JsonProcessingException {
        final byte[] serialized = JacksonMapper.INSTANCE.writeValueAsBytes(obj);
        return serialized;
    }

    /**
     * Looks up an entity GUID by unique attributes in the entity store and records the elapsed time.
     *
     * <p>The duration is only recorded when the lookup returns normally.
     *
     * @param atlasEntityType entity type to search
     * @param map             unique attribute values identifying the entity
     * @return the GUID returned by the entity store
     * @throws AtlasBaseException if the lookup fails
     */
    @GraphTransaction(logRollback = false)
    public String getGuidByUniqueAttributes(AtlasEntityType atlasEntityType, Map<String, Object> map) throws AtlasBaseException {
        final long startMillis = System.currentTimeMillis();
        final String guid = this.entityStore.getGuidByUniqueAttributes(atlasEntityType, map);
        final long elapsedMillis = System.currentTimeMillis() - startMillis;
        this.metricsManager.recordGetGuidByUniqueAttributes(elapsedMillis);
        return guid;
    }

    /**
     * Looks up an entity GUID by unique attributes and status via {@code atlasGraphUtils} and
     * records the elapsed time.
     *
     * <p>The duration is only recorded when the lookup returns normally.
     *
     * @param atlasEntityType entity type to search
     * @param map             unique attribute values identifying the entity
     * @param status          entity status passed to the lookup
     * @return the GUID returned by the lookup
     * @throws AtlasBaseException if the lookup fails
     */
    @GraphTransaction(logRollback = false)
    public String getGuidByUniqueAttributes(AtlasEntityType atlasEntityType, Map<String, Object> map, AtlasEntity.Status status) throws AtlasBaseException {
        final long startMillis = System.currentTimeMillis();
        final String guid = this.atlasGraphUtils.getGuidByUniqueAttributes(atlasEntityType, map, status);
        final long elapsedMillis = System.currentTimeMillis() - startMillis;
        this.metricsManager.recordGetGuidByUniqueAttributesAndStatus(elapsedMillis);
        return guid;
    }

    /**
     * Loads an entity by unique attributes from the entity store and records the elapsed time.
     *
     * <p>The duration is only recorded on success. If the lookup fails with what
     * {@code isJanusGraphNullPointerException} recognizes, the failure is logged and counted
     * in metrics before being rethrown.
     *
     * @param atlasEntityType entity type to search
     * @param map             unique attribute values identifying the entity
     * @param z               minExtInfo flag passed to the store
     * @param z2              ignoreRelationships flag passed to the store
     * @return the entity returned by the store
     * @throws AtlasBaseException if the lookup fails
     */
    @GraphTransaction(logRollback = false)
    public AtlasEntity.AtlasEntityWithExtInfo getByUniqueAttributes(AtlasEntityType atlasEntityType, Map<String, Object> map, boolean z, boolean z2) throws AtlasBaseException {
        try {
            final long startMillis = System.currentTimeMillis();
            final AtlasEntity.AtlasEntityWithExtInfo found = this.entityStore.getByUniqueAttributes(atlasEntityType, map, z, z2);
            final long elapsedMillis = System.currentTimeMillis() - startMillis;
            this.metricsManager.recordGetByUniqueAttributes(elapsedMillis);
            return found;
        } catch (AtlasBaseException failure) {
            if (isJanusGraphNullPointerException(failure)) {
                Object[] logArgs = {atlasEntityType.getTypeName(), map.toString(), Boolean.valueOf(z), Boolean.valueOf(z2)};
                log.error("JanusGraph NullPointerException in MetadataRegistry.getByUniqueAttributes(entityType={}, attributes={}, minExtInfo={}, ignoreRelationships={})", logArgs);
                this.metricsManager.recordJanusGraphNullPointerExceptionError(1);
            }
            throw failure;
        }
    }

    /**
     * Fetches an entity header by GUID from the entity store.
     *
     * @param str entity GUID
     * @return the header returned by the entity store
     * @throws AtlasBaseException if the lookup fails
     */
    @GraphTransaction(logRollback = false)
    public AtlasEntityHeader getHeaderById(String str) throws AtlasBaseException {
        final AtlasEntityHeader header = this.entityStore.getHeaderById(str);
        return header;
    }

    /**
     * Detects whether the exception chain contains a {@link NullPointerException} whose message
     * contains {@code "Could not find type for id"}.
     *
     * @param atlasBaseException exception whose cause chain is inspected
     * @return {@code true} if such a {@code NullPointerException} is found in the chain
     */
    private static boolean isJanusGraphNullPointerException(AtlasBaseException atlasBaseException) {
        int npeIndex = ExceptionUtils.indexOfThrowable(atlasBaseException, NullPointerException.class);
        if (npeIndex == -1) {
            return false;
        }
        Throwable npe = (Throwable) ExceptionUtils.getThrowableList(atlasBaseException).get(npeIndex);
        String npeMessage = npe.getMessage();
        return npeMessage != null && npeMessage.contains("Could not find type for id");
    }
}
