/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.catalog.hook;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Ticker;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Injector;
import io.confluent.catalog.DataCatalogConfig;
import io.confluent.catalog.hook.AvroSchemaAtlasHook;
import io.confluent.catalog.hook.JsonSchemaAtlasHook;
import io.confluent.catalog.hook.ProtobufSchemaAtlasHook;
import io.confluent.catalog.hook.SchemaAtlasProcessor;
import io.confluent.catalog.hook.SchemaAtlasTypes;
import io.confluent.catalog.hook.SchemaOperationContext;
import io.confluent.catalog.hook.TypeDefStoreLoader;
import io.confluent.catalog.model.instance.Tag;
import io.confluent.catalog.util.StripedExecutorService;
import io.confluent.catalog.util.StripedRunnable;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.rest.entities.Metadata;
import io.confluent.kafka.schemaregistry.client.rest.entities.Rule;
import io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet;
import io.confluent.kafka.schemaregistry.client.rest.entities.SubjectVersion;
import io.confluent.kafka.schemaregistry.exceptions.InvalidSchemaException;
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryException;
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryStoreException;
import io.confluent.kafka.schemaregistry.json.JsonSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.confluent.kafka.schemaregistry.storage.ClearSubjectValue;
import io.confluent.kafka.schemaregistry.storage.CloseableIterator;
import io.confluent.kafka.schemaregistry.storage.DeleteSubjectValue;
import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.LookupCache;
import io.confluent.kafka.schemaregistry.storage.SchemaIdAndSubjects;
import io.confluent.kafka.schemaregistry.storage.SchemaKey;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistryKey;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistryKeyType;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistryValue;
import io.confluent.kafka.schemaregistry.storage.SchemaUpdateHandler;
import io.confluent.kafka.schemaregistry.storage.SchemaValue;
import io.confluent.kafka.schemaregistry.storage.SubjectKey;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException;
import io.confluent.rest.RestConfigException;
import java.io.IOException;
import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.type.AtlasTypeUtil;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.MetricValueProvider;
import org.apache.kafka.common.metrics.Metrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SchemaAtlasHook
implements SchemaUpdateHandler {
    // Class-wide SLF4J logger.
    private static final Logger LOG = LoggerFactory.getLogger(SchemaAtlasHook.class);
    // Key under which configure() stores this hook in schemaRegistry.properties().
    public static final String KEY = "hook";
    // Seconds; matches the expire-after-write duration configure() gives the deleted-subjects cache.
    protected static final int DEFAULT_DELETED_SUBJECTS_CACHE_EXPIRY_SECS = 60;
    // Matches the maximum size configure() gives the deleted-subjects cache.
    protected static final int DEFAULT_DELETED_SUBJECTS_CACHE_MAX_SIZE = 1000;
    // Matches the delete count at which deleteSubjectEntities() moves a subject to the exclude cache.
    protected static final int DEFAULT_DELETED_SUBJECTS_MAX_COUNT = 10;
    // Seconds; matches the expire-after-write duration configure() gives the exclude-subjects cache.
    protected static final int DEFAULT_EXCLUDE_SUBJECTS_CACHE_EXPIRY_SECS = 1800;
    // Matches the maximum size configure() gives the exclude-subjects cache.
    protected static final int DEFAULT_EXCLUDE_SUBJECTS_CACHE_MAX_SIZE = 1000;
    // Time source handed to both Caffeine caches; injectable through the constructor.
    private final Ticker ticker;
    // Taken from the "schemaRegistry" entry of the map passed to configure().
    private KafkaSchemaRegistry schemaRegistry;
    // From DataCatalogConfig.isCatalogEnabled(); when false, configure() and cacheInitialized() return early.
    private boolean isCatalogEnabled;
    // From DataCatalogConfig.catalogExcludeSubjects(); logged in configure().
    private List<String> excludeSubjects;
    // Subject -> number of delete-subject events counted by deleteSubjectEntities().
    private Cache<String, Long> deletedSubjectsCache;
    // Subjects whose delete count reached the limit, with the count that triggered the exclusion.
    private Cache<String, Long> excludeSubjectsCache;
    // Created in configure(); synced() inspects its queue size and active count.
    private StripedExecutorService stripedExecutor;
    // The next three fields are assigned together in notifyReady().
    private Injector injector;
    private TypeDefStoreLoader typeDefStoreLoader;
    private SchemaAtlasProcessor processor;
    // Gauge registered in configure() under the metric name "atlas-queue-size".
    private LongGauge atlasQueueSizeMetric;
    // Format-specific helpers that createSchema() delegates to.
    private final AvroSchemaAtlasHook avro = new AvroSchemaAtlasHook();
    private final ProtobufSchemaAtlasHook protobuf = new ProtobufSchemaAtlasHook();
    private final JsonSchemaAtlasHook json = new JsonSchemaAtlasHook();
    // Returned by ready(); NOTE(review): the code that sets it is outside the visible portion of the file.
    private final AtomicBoolean typeDefStoreInitialized = new AtomicBoolean();
    // Set to true by cacheInitialized(); doDelete() returns false while it is still false.
    private final AtomicBoolean cacheInitialized = new AtomicBoolean();
    // Set to true by checkIfInitialized(), which also counts down initLatch.
    private final AtomicBoolean initialized = new AtomicBoolean();
    // Awaited by waitForInit().
    private final CountDownLatch initLatch = new CountDownLatch(1);
    // Guards the assignments made in notifyReady().
    private final ReentrantLock lock = new ReentrantLock();
    // Signalled by notifyReady(); NOTE(review): the waiting side is outside the visible portion of the file.
    private final Condition condition = this.lock.newCondition();
    // Read by cacheInitialized(); presumably incremented where updates are submitted - TODO confirm.
    private final AtomicInteger notifyCount = new AtomicInteger();
    // Read by cacheInitialized(); presumably incremented where updates are handled - TODO confirm.
    private final AtomicInteger handleCount = new AtomicInteger();
    // Not referenced in the visible portion of the file.
    private final Map<TopicPartition, Long> offsets = new ConcurrentHashMap<TopicPartition, Long>();
    // Value of notifyCount captured by cacheInitialized(); compared against the handled count in checkIfInitialized().
    private volatile int notifyInitial;

    /**
     * Creates a hook that uses {@link Ticker#systemTicker()} as its time source.
     */
    public SchemaAtlasHook() {
        this(Ticker.systemTicker());
    }

    /**
     * Creates a hook with an explicit time source.
     *
     * @param ticker ticker later passed to both Caffeine caches built in {@code configure}
     */
    public SchemaAtlasHook(Ticker ticker) {
        this.ticker = ticker;
    }

    /**
     * Returns the cache that counts delete-subject events per subject.
     *
     * @return the cache, or {@code null} if {@code configure} has not built it
     *         (it is only built when the catalog is enabled)
     */
    protected Cache<String, Long> getDeletedSubjectsCache() {
        return this.deletedSubjectsCache;
    }

    /**
     * Returns the cache of subjects excluded after reaching the delete limit.
     *
     * @return the cache, or {@code null} if {@code configure} has not built it
     *         (it is only built when the catalog is enabled)
     */
    protected Cache<String, Long> getExcludeSubjectsCache() {
        return this.excludeSubjectsCache;
    }

    /**
     * Configures this hook from the schema registry it is attached to.
     *
     * <p>Reads the {@link KafkaSchemaRegistry} from the {@code "schemaRegistry"} entry of
     * {@code configs} and builds a {@link DataCatalogConfig} from the registry's original
     * properties. When the catalog is disabled nothing else is set up. Otherwise the hook
     * registers itself in the registry's properties under {@link #KEY}, builds the
     * deleted-subjects and exclude-subjects caches, creates the striped executor and registers
     * the {@code atlas-queue-size} gauge.
     *
     * @param configs configuration map; must contain a {@link KafkaSchemaRegistry} under
     *                {@code "schemaRegistry"}
     * @throws IllegalArgumentException if that entry is missing or of the wrong type, or if the
     *                                  catalog configuration cannot be created
     */
    public void configure(Map<String, ?> configs) {
        // Fail with a descriptive error instead of a bare NullPointerException/ClassCastException.
        Object registry = configs.get("schemaRegistry");
        if (!(registry instanceof KafkaSchemaRegistry)) {
            throw new IllegalArgumentException(
                    "Could not instantiate SchemaAtlasHook: config entry 'schemaRegistry' must be a KafkaSchemaRegistry but was "
                            + (registry == null ? "null" : registry.getClass().getName()));
        }
        try {
            this.schemaRegistry = (KafkaSchemaRegistry)registry;
            DataCatalogConfig dataCatalogConfig = new DataCatalogConfig(this.schemaRegistry.config().originalProperties());
            this.isCatalogEnabled = dataCatalogConfig.isCatalogEnabled();
            if (!this.isCatalogEnabled) {
                return;
            }
            this.schemaRegistry.properties().put(KEY, this);
            this.excludeSubjects = dataCatalogConfig.catalogExcludeSubjects();
            LOG.info("Excluding subjects: {}", this.excludeSubjects);
            // Cache limits come from the declared constants rather than repeated literals.
            this.deletedSubjectsCache = Caffeine.newBuilder()
                    .maximumSize(DEFAULT_DELETED_SUBJECTS_CACHE_MAX_SIZE)
                    .expireAfterWrite(Duration.ofSeconds(DEFAULT_DELETED_SUBJECTS_CACHE_EXPIRY_SECS))
                    .ticker(this.ticker)
                    .build();
            this.excludeSubjectsCache = Caffeine.newBuilder()
                    .maximumSize(DEFAULT_EXCLUDE_SUBJECTS_CACHE_MAX_SIZE)
                    .expireAfterWrite(Duration.ofSeconds(DEFAULT_EXCLUDE_SUBJECTS_CACHE_EXPIRY_SECS))
                    .ticker(this.ticker)
                    .build();
            this.stripedExecutor = new StripedExecutorService();
            Metrics metrics = this.schemaRegistry.getMetricsContainer().getMetrics();
            MetricName atlasQueueSize = metrics.metricName("atlas-queue-size", "data_catalog", "Queue size for schema Atlas hook");
            this.atlasQueueSizeMetric = new LongGauge(0L);
            metrics.addMetric(atlasQueueSize, (MetricValueProvider)this.atlasQueueSizeMetric);
        }
        catch (RestConfigException e) {
            throw new IllegalArgumentException("Could not instantiate SchemaAtlasHook", e);
        }
    }

    /**
     * Reports whether the type-definition store has been initialized.
     *
     * @return the current value of the {@code typeDefStoreInitialized} flag
     */
    public boolean ready() {
        return this.typeDefStoreInitialized.get();
    }

    /**
     * Reports whether initialization has finished and no hook work is outstanding.
     *
     * @return {@code true} only when the type-definition store and cache flags are set, the
     *         type-definition loader reports its metadata registry as initialized, and the
     *         striped executor has neither queued nor active tasks
     */
    public boolean synced() {
        // Checked in this order so typeDefStoreLoader is only dereferenced once both flags are set.
        if (!this.typeDefStoreInitialized.get()) {
            return false;
        }
        if (!this.cacheInitialized.get()) {
            return false;
        }
        if (!this.typeDefStoreLoader.metadataRegistryInitialized()) {
            return false;
        }
        final int pendingTasks = this.stripedExecutor.getQueueSize();
        final int runningTasks = this.stripedExecutor.getActiveCount();
        return pendingTasks == 0 && runningTasks == 0;
    }

    /**
     * Returns the injector supplied to {@code notifyReady}.
     *
     * @return the injector, or {@code null} if {@code notifyReady} has not been called
     */
    public Injector getInjector() {
        return this.injector;
    }

    /**
     * Hands the hook its injector and wakes up threads waiting on the ready condition.
     *
     * <p>While holding {@code lock}, stores the injector, resolves the
     * {@link TypeDefStoreLoader} and {@link SchemaAtlasProcessor} from it, and signals all
     * waiters on {@code condition}.
     *
     * @param injector injector used to look up the loader and the processor
     */
    public void notifyReady(Injector injector) {
        // Acquire before entering the try block: if lock() itself failed inside the try,
        // the finally clause would call unlock() on a lock this thread does not hold.
        this.lock.lock();
        try {
            this.injector = injector;
            this.typeDefStoreLoader = (TypeDefStoreLoader)injector.getInstance(TypeDefStoreLoader.class);
            this.processor = (SchemaAtlasProcessor)injector.getInstance(SchemaAtlasProcessor.class);
            this.condition.signalAll();
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Blocks until {@code initLatch} has been counted down, which {@code checkIfInitialized} does.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void waitForInit() throws InterruptedException {
        this.initLatch.await();
    }

    /**
     * Routes one schema-registry update to the matching create/delete/clear routine.
     *
     * <p>For {@code SCHEMA} keys: a {@code null} value purges the entities of the old value, a
     * value flagged as deleted deletes them without purging, and any other value creates
     * entities. For other key types a {@code null} value is ignored; {@code DELETE_SUBJECT} and
     * {@code CLEAR_SUBJECT} values are passed to their respective routines.
     *
     * @param ctx the operation being processed
     */
    private void handleSchemaOperationContext(SchemaOperationContext ctx) {
        SubjectKey subjectKey = ctx.getKey();
        SchemaRegistryValue payload = ctx.getValue();
        SchemaRegistryKeyType keyType = subjectKey.getKeyType();

        if (keyType == SchemaRegistryKeyType.SCHEMA) {
            SchemaValue schemaValue = (SchemaValue)payload;
            if (schemaValue == null) {
                this.deleteSchemaEntities(ctx, true);
            } else if (schemaValue.isDeleted()) {
                this.deleteSchemaEntities(ctx, false);
            } else {
                this.createSchemaEntities(ctx);
            }
            return;
        }

        if (payload == null) {
            return;
        }
        if (keyType == SchemaRegistryKeyType.DELETE_SUBJECT) {
            this.deleteSubjectEntities(ctx);
        } else if (keyType == SchemaRegistryKeyType.CLEAR_SUBJECT) {
            this.clearSubjectEntities(ctx);
        }
    }

    /**
     * Creates the subject-version entity for the context's current value.
     *
     * <p>Any exception is logged and not propagated.
     *
     * @param ctx the operation whose value is a {@link SchemaValue}
     */
    private void createSchemaEntities(SchemaOperationContext ctx) {
        try {
            this.createSubjectVersion(ctx, (SchemaValue)ctx.getValue());
        } catch (Exception failure) {
            LOG.error("SchemaAtlasHook.createSchemaEntities(): failed to create entity", failure);
        }
    }

    /**
     * Returns the subject-version entity for the given schema value, creating it if needed.
     *
     * <p>If the context already holds an entity under the subject/version qualified name, that
     * entity is returned unchanged. Otherwise a new entity is populated, linked both ways with
     * the schema entity from {@code createSchema}, and passed to {@code ctx.createOrUpdate}.
     *
     * @param ctx         the operation being processed; supplies the subject
     * @param schemaValue the stored schema; supplies version, id and schema type
     * @return the existing or newly created subject-version entity
     * @throws InvalidSchemaException if the schema cannot be parsed
     */
    private AtlasEntity createSubjectVersion(SchemaOperationContext ctx, SchemaValue schemaValue) throws InvalidSchemaException {
        LOG.debug("SchemaAtlasHook.createSubjectVersion() {}", schemaValue);
        final String subjectName = ctx.getSubject();
        final int versionNumber = schemaValue.getVersion();
        final String qualifiedName = ctx.getQualifiedName(subjectName, versionNumber);

        final AtlasEntity known = ctx.getEntity(qualifiedName);
        if (known != null) {
            return known;
        }

        final AtlasEntity subjectVersion = new AtlasEntity(SchemaAtlasTypes.SR_SUBJECT_VERSION.getName());
        SchemaAtlasHook.setDefaultAttrs(ctx, subjectVersion, qualifiedName, schemaValue.getId(), subjectName);
        subjectVersion.setAttribute("version", versionNumber);
        subjectVersion.setAttribute("schemaType", schemaValue.getSchemaType());

        // Link the subject version and its schema in both directions.
        final AtlasEntity schemaEntity = this.createSchema(ctx, schemaValue);
        subjectVersion.setRelationshipAttribute("schema", AtlasTypeUtil.toAtlasRelatedObjectId(schemaEntity));
        schemaEntity.setRelationshipAttribute("versions",
                AtlasTypeUtil.toAtlasRelatedObjectIds(Collections.singletonList(subjectVersion)));

        ctx.createOrUpdate(subjectVersion);
        return subjectVersion;
    }

    /**
     * Sets the attributes shared by every entity this hook creates and registers the entity in
     * the context under its qualified name.
     *
     * <p>Attributes written: {@code qualifiedName}, {@code tenant}, {@code context}, {@code id},
     * {@code name}, {@code nameLower} and {@code createTime} (from the context timestamp).
     *
     * @param ctx           the operation being processed; supplies tenant, context and timestamp
     * @param entity        the entity to populate
     * @param qualifiedName the entity's qualified name
     * @param id            the schema ID the entity belongs to
     * @param name          the entity name; may be {@code null}, in which case {@code nameLower}
     *                      is {@code null} too
     */
    protected static void setDefaultAttrs(SchemaOperationContext ctx, AtlasEntity entity, String qualifiedName, int id, String name) {
        entity.setAttribute("qualifiedName", (Object)qualifiedName);
        entity.setAttribute("tenant", (Object)ctx.getTenant());
        entity.setAttribute("context", (Object)ctx.getContext());
        entity.setAttribute("id", (Object)id);
        entity.setAttribute("name", (Object)name);
        // Lower-case with a fixed locale so the derived key does not depend on the JVM's default
        // locale (e.g. Turkish maps 'I' to a dotless 'ı'). Fully qualified to avoid a new import.
        entity.setAttribute("nameLower", (Object)(name != null ? name.toLowerCase(java.util.Locale.ROOT) : null));
        entity.setAttribute("createTime", (Object)new Date(ctx.getTimestamp()));
        ctx.addEntity(qualifiedName, entity);
    }

    /**
     * Parses the stored schema and builds its Atlas entity together with the entities for its
     * metadata properties, tagged paths and rules.
     *
     * <p>Parsing runs with the registry's tenant temporarily set to the operation's tenant; the
     * previous tenant is restored afterwards even if parsing fails. The format-specific entity
     * comes from the JSON, Protobuf or (by default) Avro helper. Embedded tags and the
     * {@code confluent:version} property, when present, are also recorded on the context.
     *
     * @param ctx         the operation being processed
     * @param schemaValue the stored schema to convert
     * @return the schema entity
     * @throws InvalidSchemaException if the schema cannot be parsed
     */
    private AtlasEntity createSchema(SchemaOperationContext ctx, SchemaValue schemaValue) throws InvalidSchemaException {
        ParsedSchema schema;
        String previousTenant = this.schemaRegistry.tenant();
        this.schemaRegistry.setTenant(ctx.getTenant());
        try {
            schema = this.schemaRegistry.parseSchema(schemaValue.toSchemaEntity());
        }
        finally {
            this.schemaRegistry.setTenant(previousTenant);
        }
        int id = schemaValue.getId();
        AtlasEntity entity;
        switch (schema.schemaType()) {
            case "JSON": {
                entity = this.json.createSchema(ctx, id, ((JsonSchema)schema).rawSchema());
                break;
            }
            case "PROTOBUF": {
                entity = this.protobuf.createSchema(ctx, id, ((ProtobufSchema)schema).rawSchema());
                break;
            }
            default: {
                entity = this.avro.createSchema(ctx, id, ((AvroSchema)schema).rawSchema());
            }
        }
        Set<String> tags = schema.tags();
        if (tags != null) {
            ctx.setEmbeddedTags(tags);
            entity.setAttribute("embeddedTags", new ArrayList<>(tags));
        }
        Metadata metadata = schema.metadata();
        if (metadata != null) {
            Set<String> sensitive = metadata.getSensitive();
            if (sensitive != null) {
                entity.setAttribute("sensitive", new ArrayList<>(sensitive));
            }
            Map<String, String> properties = metadata.getProperties();
            if (properties != null) {
                String newVersion = properties.get("confluent:version");
                if (newVersion != null) {
                    try {
                        ctx.setNewVersion(Integer.parseInt(newVersion));
                    }
                    catch (NumberFormatException ex) {
                        // Still best-effort, but leave a trace instead of dropping the value silently.
                        LOG.warn("SchemaAtlasHook.createSchema(): ignoring non-numeric confluent:version '{}' for schema ID {}", newVersion, id);
                    }
                }
                List<AtlasEntity> props = properties.entrySet().stream()
                        .map(property -> this.createProperty(ctx, id, property.getKey(), property.getValue(),
                                sensitive != null && sensitive.contains(property.getKey())))
                        .collect(Collectors.toList());
                entity.setRelationshipAttribute("properties", AtlasTypeUtil.toAtlasRelatedObjectIds(props));
                for (AtlasEntity prop : props) {
                    prop.setRelationshipAttribute("schema", AtlasTypeUtil.toAtlasRelatedObjectId(entity));
                }
            }
            Map<String, ? extends Set<String>> pathTags = metadata.getTags();
            if (pathTags != null) {
                List<AtlasEntity> paths = pathTags.entrySet().stream()
                        .map(pathEntry -> this.createPath(ctx, id, pathEntry.getKey(), pathEntry.getValue()))
                        .collect(Collectors.toList());
                entity.setRelationshipAttribute("paths", AtlasTypeUtil.toAtlasRelatedObjectIds(paths));
                for (AtlasEntity path : paths) {
                    path.setRelationshipAttribute("schema", AtlasTypeUtil.toAtlasRelatedObjectId(entity));
                }
            }
        }
        RuleSet ruleSet = schema.ruleSet();
        if (ruleSet != null) {
            this.linkRulesToSchema(ctx, id, entity, "migration_rules", ruleSet.getMigrationRules());
            this.linkRulesToSchema(ctx, id, entity, "domain_rules", ruleSet.getDomainRules());
        }
        return entity;
    }

    /**
     * Creates an entity per rule and links the rules and the schema entity in both directions.
     * Does nothing when {@code rules} is {@code null}.
     */
    private void linkRulesToSchema(SchemaOperationContext ctx, int id, AtlasEntity schemaEntity, String relation, List<Rule> rules) {
        if (rules == null) {
            return;
        }
        List<AtlasEntity> ruleEntities = rules.stream()
                .map(rule -> this.createRule(ctx, id, rule))
                .collect(Collectors.toList());
        schemaEntity.setRelationshipAttribute(relation, AtlasTypeUtil.toAtlasRelatedObjectIds(ruleEntities));
        for (AtlasEntity ruleEntity : ruleEntities) {
            ruleEntity.setRelationshipAttribute("schema", AtlasTypeUtil.toAtlasRelatedObjectId(schemaEntity));
        }
    }

    /**
     * Returns the entity for one metadata property of a schema, creating it if the context does
     * not already hold one under the same qualified name.
     *
     * @param ctx         the operation being processed
     * @param id          the schema ID
     * @param name        the property name
     * @param value       the property value
     * @param isSensitive when {@code true}, {@code "[hidden]"} is stored instead of the value
     * @return the existing or newly created property entity
     */
    private AtlasEntity createProperty(SchemaOperationContext ctx, int id, String name, String value, boolean isSensitive) {
        final String qualifiedName = ctx.getQualifiedName(id, name);
        AtlasEntity property = ctx.getEntity(qualifiedName);
        if (property == null) {
            property = new AtlasEntity(SchemaAtlasTypes.SR_SCHEMA_PROPERTY.getName());
            SchemaAtlasHook.setDefaultAttrs(ctx, property, qualifiedName, id, name);
            final String storedValue;
            if (isSensitive) {
                storedValue = "[hidden]";
            } else {
                storedValue = value;
            }
            property.setAttribute("value", storedValue);
            ctx.createOrUpdate(property);
        }
        return property;
    }

    /**
     * Returns the entity for one tagged path of a schema, creating it if the context does not
     * already hold one under the same qualified name.
     *
     * <p>A new entity gets one {@link Tag} classification per tag name, each built with the
     * operation's tenant.
     *
     * @param ctx  the operation being processed
     * @param id   the schema ID
     * @param path the path within the schema
     * @param tags the tag names attached to the path
     * @return the existing or newly created path entity
     */
    private AtlasEntity createPath(SchemaOperationContext ctx, int id, String path, Set<String> tags) {
        final String qualifiedName = ctx.getQualifiedName(id, path);
        AtlasEntity pathEntity = ctx.getEntity(qualifiedName);
        if (pathEntity == null) {
            pathEntity = new AtlasEntity(SchemaAtlasTypes.SR_PATH.getName());
            SchemaAtlasHook.setDefaultAttrs(ctx, pathEntity, qualifiedName, id, path);
            pathEntity.setClassifications(
                    tags.stream()
                            .map(tagName -> new Tag(ctx.getTenant(), tagName))
                            .collect(Collectors.toList()));
            ctx.createOrUpdate(pathEntity);
        }
        return pathEntity;
    }

    /**
     * Returns the entity for one rule of a schema, creating it if the context does not already
     * hold one under the same qualified name.
     *
     * <p>A new entity copies the rule's scalar fields ({@code kind} and {@code mode} only when
     * non-null) and is linked both ways with one entity per rule parameter and per rule tag.
     *
     * @param ctx  the operation being processed
     * @param id   the schema ID
     * @param rule the rule to convert
     * @return the existing or newly created rule entity
     */
    private AtlasEntity createRule(SchemaOperationContext ctx, int id, Rule rule) {
        final String qualifiedName = ctx.getQualifiedName(id, rule.getName());
        final AtlasEntity known = ctx.getEntity(qualifiedName);
        if (known != null) {
            return known;
        }

        final AtlasEntity ruleEntity = new AtlasEntity(SchemaAtlasTypes.SR_RULE.getName());
        SchemaAtlasHook.setDefaultAttrs(ctx, ruleEntity, qualifiedName, id, rule.getName());
        ruleEntity.setAttribute("doc", rule.getDoc());
        if (rule.getKind() != null) {
            ruleEntity.setAttribute("kind", rule.getKind().name());
        }
        if (rule.getMode() != null) {
            ruleEntity.setAttribute("mode", rule.getMode().name());
        }
        ruleEntity.setAttribute("type", rule.getType());
        ruleEntity.setAttribute("expr", rule.getExpr());
        ruleEntity.setAttribute("onSuccess", rule.getOnSuccess());
        ruleEntity.setAttribute("onFailure", rule.getOnFailure());
        ruleEntity.setAttribute("disabled", rule.isDisabled());

        final Map<String, String> ruleParams = rule.getParams();
        if (ruleParams != null) {
            final List<AtlasEntity> paramEntities = new ArrayList<>();
            for (Map.Entry<String, String> param : ruleParams.entrySet()) {
                paramEntities.add(this.createRuleParameter(ctx, id, rule, param.getKey(), param.getValue()));
            }
            ruleEntity.setRelationshipAttribute("parameters", AtlasTypeUtil.toAtlasRelatedObjectIds(paramEntities));
            for (AtlasEntity paramEntity : paramEntities) {
                paramEntity.setRelationshipAttribute("rule", AtlasTypeUtil.toAtlasRelatedObjectId(ruleEntity));
            }
        }

        final Set<String> ruleTags = rule.getTags();
        if (ruleTags != null) {
            final List<AtlasEntity> tagEntities = new ArrayList<>();
            for (String ruleTag : ruleTags) {
                tagEntities.add(this.createRuleTag(ctx, id, rule, ruleTag));
            }
            ruleEntity.setRelationshipAttribute("tags", AtlasTypeUtil.toAtlasRelatedObjectIds(tagEntities));
            for (AtlasEntity tagEntity : tagEntities) {
                tagEntity.setRelationshipAttribute("rule", AtlasTypeUtil.toAtlasRelatedObjectId(ruleEntity));
            }
        }

        ctx.createOrUpdate(ruleEntity);
        return ruleEntity;
    }

    /**
     * Returns the entity for one parameter of a rule, creating it if the context does not
     * already hold one under the same qualified name.
     *
     * @param ctx   the operation being processed
     * @param id    the schema ID
     * @param rule  the rule that owns the parameter; its name is part of the qualified name
     * @param name  the parameter name
     * @param value the parameter value
     * @return the existing or newly created parameter entity
     */
    private AtlasEntity createRuleParameter(SchemaOperationContext ctx, int id, Rule rule, String name, String value) {
        final String qualifiedName = ctx.getQualifiedName(id, rule.getName(), name);
        AtlasEntity parameter = ctx.getEntity(qualifiedName);
        if (parameter == null) {
            parameter = new AtlasEntity(SchemaAtlasTypes.SR_RULE_PARAMETER.getName());
            SchemaAtlasHook.setDefaultAttrs(ctx, parameter, qualifiedName, id, name);
            parameter.setAttribute("value", value);
            ctx.createOrUpdate(parameter);
        }
        return parameter;
    }

    /**
     * Returns the entity for one tag of a rule, creating it if the context does not already
     * hold one under the same qualified name.
     *
     * @param ctx  the operation being processed
     * @param id   the schema ID
     * @param rule the rule that owns the tag; its name is part of the qualified name
     * @param tag  the tag name
     * @return the existing or newly created rule-tag entity
     */
    private AtlasEntity createRuleTag(SchemaOperationContext ctx, int id, Rule rule, String tag) {
        final String qualifiedName = ctx.getQualifiedName(id, rule.getName(), tag);
        AtlasEntity tagEntity = ctx.getEntity(qualifiedName);
        if (tagEntity == null) {
            tagEntity = new AtlasEntity(SchemaAtlasTypes.SR_RULE_TAG.getName());
            SchemaAtlasHook.setDefaultAttrs(ctx, tagEntity, qualifiedName, id, tag);
            ctx.createOrUpdate(tagEntity);
        }
        return tagEntity;
    }

    /**
     * Handles a delete-subject event by deleting (without purging) every subject version from 1
     * up to the version carried by the event.
     *
     * <p>Delete events are counted per subject in the deleted-subjects cache. Once a subject
     * reaches {@link #DEFAULT_DELETED_SUBJECTS_MAX_COUNT} events, its counter is dropped, the
     * subject is put into the exclude-subjects cache, and nothing is deleted for this event.
     * Versions missing from the lookup cache are skipped. A failure on one version is logged and
     * the loop continues with the next.
     *
     * @param ctx the operation whose value is a {@link DeleteSubjectValue}
     */
    private void deleteSubjectEntities(SchemaOperationContext ctx) {
        DeleteSubjectValue deleteSubjectValue = (DeleteSubjectValue)ctx.getValue();
        LOG.debug("SchemaAtlasHook.deleteSubjectEntities() {}", (Object)deleteSubjectValue);
        String subject = deleteSubjectValue.getSubject();
        long count = this.deletedSubjectsCache.asMap().compute(subject, (cachedSubject, currentCount) -> {
            if (currentCount == null) {
                return 1L;
            }
            if (currentCount >= DEFAULT_DELETED_SUBJECTS_MAX_COUNT) {
                return currentCount;
            }
            return currentCount + 1L;
        });
        if (count >= DEFAULT_DELETED_SUBJECTS_MAX_COUNT) {
            this.deletedSubjectsCache.invalidate(subject);
            this.excludeSubjectsCache.put(subject, count);
            LOG.info("Excluding subject: {}, due to max {} deletions within {} secs",
                    new Object[]{subject, DEFAULT_DELETED_SUBJECTS_MAX_COUNT, DEFAULT_DELETED_SUBJECTS_CACHE_EXPIRY_SECS});
            return;
        }
        Integer deleteTillVersion = deleteSubjectValue.getVersion();
        if (deleteTillVersion == null) {
            // Without an upper bound there is nothing to iterate; avoid the unboxing NPE in the loop.
            LOG.warn("SchemaAtlasHook.deleteSubjectEntities(): delete event for subject {} carries no version, nothing deleted", subject);
            return;
        }
        // Unbox once instead of on every loop iteration.
        int lastVersion = deleteTillVersion;
        for (int version = 1; version <= lastVersion; ++version) {
            try {
                SchemaKey schemaKey = new SchemaKey(subject, version);
                SchemaValue schemaValue = (SchemaValue)this.schemaRegistry.getLookupCache().get(schemaKey);
                if (schemaValue != null) {
                    this.deleteSubjectVersion(ctx, schemaValue, false);
                }
            }
            catch (Exception e) {
                LOG.error("SchemaAtlasHook.deleteSubjectEntities(): failed to delete entity", (Throwable)e);
            }
        }
    }

    /**
     * Handles a clear-subject event by purging every version of every subject the lookup cache
     * returns for the event's subject.
     *
     * <p>Any exception is logged and not propagated; it aborts the remaining subjects/versions.
     *
     * @param ctx the operation whose value is a {@link ClearSubjectValue}
     */
    private void clearSubjectEntities(SchemaOperationContext ctx) {
        final ClearSubjectValue clearEvent = (ClearSubjectValue)ctx.getValue();
        LOG.debug("SchemaAtlasHook.clearSubjectEntities() {}", clearEvent);
        final String clearSubject = clearEvent.getSubject();
        try {
            for (String subject : this.schemaRegistry.getLookupCache().subjects(clearSubject, true)) {
                for (Iterator<SchemaValue> versions = this.allVersions(subject); versions.hasNext(); ) {
                    this.deleteSubjectVersion(ctx, versions.next(), true);
                }
            }
        } catch (Exception failure) {
            LOG.error("SchemaAtlasHook.clearSubjectEntities(): failed to clear entity", failure);
        }
    }

    /**
     * Collects all schema values stored for a subject, from version 1 to
     * {@link Integer#MAX_VALUE}, and returns an iterator over the collected copy.
     *
     * <p>The lookup-cache iterator is closed before this method returns.
     *
     * @param subject the subject to look up
     * @return an iterator over the subject's schema values
     * @throws SchemaRegistryException if the lookup cache reports a store error
     */
    private Iterator<SchemaValue> allVersions(String subject) throws SchemaRegistryException {
        final List<SchemaValue> versions = new ArrayList<>();
        try {
            final SchemaKey lowest = new SchemaKey(subject, 1);
            final SchemaKey highest = new SchemaKey(subject, Integer.MAX_VALUE);
            try (CloseableIterator<SchemaRegistryValue> range = this.schemaRegistry.getLookupCache().getAll(lowest, highest)) {
                while (range.hasNext()) {
                    versions.add((SchemaValue)range.next());
                }
            }
        } catch (StoreException storeFailure) {
            throw new SchemaRegistryStoreException("Error from the backend Kafka store", storeFailure);
        }
        return versions.iterator();
    }

    /**
     * Deletes or purges the subject version described by the context's old value.
     *
     * <p>Does nothing when there is no old value. Any exception is logged and not propagated.
     *
     * @param ctx   the operation being processed
     * @param purge {@code true} to purge, {@code false} to delete
     */
    private void deleteSchemaEntities(SchemaOperationContext ctx, boolean purge) {
        try {
            final SchemaValue previous = (SchemaValue)ctx.getOldValue();
            if (previous == null) {
                return;
            }
            this.deleteSubjectVersion(ctx, previous, purge);
        } catch (Exception failure) {
            LOG.error("SchemaAtlasHook.deleteSchemaEntities(): failed to delete entity", failure);
        }
    }

    /**
     * Deletes or purges one subject-version entity and, when {@code doDelete} allows it, the
     * schema entities behind it.
     *
     * <p>The qualified name is built from the context's subject and the schema value's version.
     *
     * @param ctx         the operation being processed
     * @param schemaValue the stored schema of the version to remove
     * @param purge       {@code true} to purge, {@code false} to delete
     * @throws InvalidSchemaException if the schema has to be parsed and cannot be
     */
    private void deleteSubjectVersion(SchemaOperationContext ctx, SchemaValue schemaValue, boolean purge) throws InvalidSchemaException {
        LOG.debug("SchemaAtlasHook.deleteSubjectVersion() {}", schemaValue);
        final String subjectName = ctx.getSubject();
        final int versionNumber = schemaValue.getVersion();
        final String qualifiedName = ctx.getQualifiedName(subjectName, versionNumber);
        final AtlasObjectId target = new AtlasObjectId(SchemaAtlasTypes.SR_SUBJECT_VERSION.getName(), "qualifiedName", qualifiedName);

        // Schema-level entities are removed first, and only when doDelete() allows it.
        if (this.doDelete(ctx, schemaValue, purge)) {
            this.deleteSchema(ctx, schemaValue, purge);
        }

        if (!purge) {
            ctx.delete(target);
        } else {
            ctx.purge(target);
        }
    }

    /**
     * Decides whether the schema-level entities of a subject version may be removed.
     *
     * <p>Returns {@code false} while the cache is not initialized or when the lookup fails (the
     * failure is logged). Otherwise it looks at every other subject/version the lookup cache
     * associates with the same schema: for a purge there must be none; for a delete each of
     * them must be missing from the cache or flagged as deleted.
     *
     * @param ctx         the operation being processed (not read by this method)
     * @param schemaValue the stored schema of the version being removed
     * @param purge       {@code true} when purging, {@code false} when deleting
     * @return {@code true} if the schema-level entities should be removed as well
     */
    private boolean doDelete(SchemaOperationContext ctx, SchemaValue schemaValue, boolean purge) {
        if (!this.cacheInitialized.get()) {
            return false;
        }
        final int id = schemaValue.getId();
        try {
            final LookupCache<SchemaRegistryKey, SchemaRegistryValue> lookupCache = this.schemaRegistry.getLookupCache();
            final SchemaIdAndSubjects idAndSubjects = lookupCache.schemaIdAndSubjects(schemaValue.toSchemaEntity());

            final List<SubjectVersion> others = new ArrayList<>();
            if (idAndSubjects != null) {
                for (Map.Entry<String, Integer> usage : idAndSubjects.allSubjectVersions().entrySet()) {
                    others.add(new SubjectVersion(usage.getKey(), usage.getValue()));
                }
            }
            // The version being removed does not count as another user of the schema.
            others.remove(new SubjectVersion(schemaValue.getSubject(), schemaValue.getVersion()));

            if (purge) {
                return others.isEmpty();
            }
            for (SubjectVersion other : others) {
                final SchemaValue stored = (SchemaValue)lookupCache.get(new SchemaKey(other.getSubject(), other.getVersion().intValue()));
                if (stored != null && !stored.isDeleted()) {
                    return false;
                }
            }
            return true;
        } catch (Exception failure) {
            LOG.error("SchemaAtlasHook.doDelete(): could not find schema ID " + id, failure);
            return false;
        }
    }

    /**
     * Parses the stored schema and deletes or purges its format-specific entities together with
     * the entities of its metadata properties, tagged paths and rules.
     *
     * <p>Parsing runs with the registry's tenant temporarily set to the operation's tenant; the
     * previous tenant is restored afterwards even if parsing fails. The format-specific removal
     * uses a freshly created JSON, Protobuf or (by default) Avro helper.
     *
     * @param ctx         the operation being processed
     * @param schemaValue the stored schema to remove
     * @param purge       {@code true} to purge, {@code false} to delete
     * @throws InvalidSchemaException if the schema cannot be parsed
     */
    private void deleteSchema(SchemaOperationContext ctx, SchemaValue schemaValue, boolean purge) throws InvalidSchemaException {
        ParsedSchema parsed;
        String savedTenant = this.schemaRegistry.tenant();
        this.schemaRegistry.setTenant(ctx.getTenant());
        try {
            parsed = this.schemaRegistry.parseSchema(schemaValue.toSchemaEntity());
        } finally {
            this.schemaRegistry.setTenant(savedTenant);
        }

        int id = schemaValue.getId();
        switch (parsed.schemaType()) {
            case "JSON":
                new JsonSchemaAtlasHook().deleteSchema(ctx, id, ((JsonSchema)parsed).rawSchema(), purge);
                break;
            case "PROTOBUF":
                new ProtobufSchemaAtlasHook().deleteSchema(ctx, id, ((ProtobufSchema)parsed).rawSchema(), purge);
                break;
            default:
                new AvroSchemaAtlasHook().deleteSchema(ctx, id, ((AvroSchema)parsed).rawSchema(), purge);
        }

        Metadata metadata = parsed.metadata();
        if (metadata != null) {
            Map<String, String> properties = metadata.getProperties();
            if (properties != null) {
                for (String propertyName : properties.keySet()) {
                    this.deleteProperty(ctx, id, propertyName, purge);
                }
            }
            Map<String, ?> pathTags = metadata.getTags();
            if (pathTags != null) {
                for (String path : pathTags.keySet()) {
                    this.deletePath(ctx, id, path, purge);
                }
            }
        }

        RuleSet ruleSet = parsed.ruleSet();
        if (ruleSet != null) {
            List<Rule> migrationRules = ruleSet.getMigrationRules();
            if (migrationRules != null) {
                for (Rule migrationRule : migrationRules) {
                    this.deleteRule(ctx, id, migrationRule, purge);
                }
            }
            List<Rule> domainRules = ruleSet.getDomainRules();
            if (domainRules != null) {
                for (Rule domainRule : domainRules) {
                    this.deleteRule(ctx, id, domainRule, purge);
                }
            }
        }
    }

    /**
     * Deletes or purges the entity of one schema metadata property, unless the context already
     * has its qualified name marked as deleted.
     *
     * @param ctx   the operation being processed
     * @param id    the schema ID
     * @param name  the property name
     * @param purge {@code true} to purge, {@code false} to delete
     */
    private void deleteProperty(SchemaOperationContext ctx, int id, String name, boolean purge) {
        final String qualifiedName = ctx.getQualifiedName(id, name);
        if (!ctx.isDeleted(qualifiedName)) {
            final AtlasObjectId target = new AtlasObjectId(SchemaAtlasTypes.SR_SCHEMA_PROPERTY.getName(), "qualifiedName", qualifiedName);
            ctx.addDeleted(qualifiedName);
            if (!purge) {
                ctx.delete(target);
            } else {
                ctx.purge(target);
            }
        }
    }

    /**
     * Deletes or purges the entity of one tagged schema path, unless the context already has
     * its qualified name marked as deleted.
     *
     * @param ctx   the operation being processed
     * @param id    the schema ID
     * @param path  the path within the schema
     * @param purge {@code true} to purge, {@code false} to delete
     */
    private void deletePath(SchemaOperationContext ctx, int id, String path, boolean purge) {
        final String qualifiedName = ctx.getQualifiedName(id, path);
        if (!ctx.isDeleted(qualifiedName)) {
            final AtlasObjectId target = new AtlasObjectId(SchemaAtlasTypes.SR_PATH.getName(), "qualifiedName", qualifiedName);
            ctx.addDeleted(qualifiedName);
            if (!purge) {
                ctx.delete(target);
            } else {
                ctx.purge(target);
            }
        }
    }

    /**
     * Deletes or purges the entity of one rule together with its parameter and tag entities,
     * unless the context already has the rule's qualified name marked as deleted.
     *
     * <p>Parameters and tags are removed before the rule entity itself.
     *
     * @param ctx   the operation being processed
     * @param id    the schema ID
     * @param rule  the rule to remove
     * @param purge {@code true} to purge, {@code false} to delete
     */
    private void deleteRule(SchemaOperationContext ctx, int id, Rule rule, boolean purge) {
        final String qualifiedName = ctx.getQualifiedName(id, rule.getName());
        if (ctx.isDeleted(qualifiedName)) {
            return;
        }
        final AtlasObjectId target = new AtlasObjectId(SchemaAtlasTypes.SR_RULE.getName(), "qualifiedName", qualifiedName);
        ctx.addDeleted(qualifiedName);

        final Map<String, String> ruleParams = rule.getParams();
        if (ruleParams != null) {
            for (String paramName : ruleParams.keySet()) {
                this.deleteRuleParameter(ctx, id, rule, paramName, purge);
            }
        }
        final Set<String> ruleTags = rule.getTags();
        if (ruleTags != null) {
            for (String ruleTag : ruleTags) {
                this.deleteRuleTag(ctx, id, rule, ruleTag, purge);
            }
        }

        if (!purge) {
            ctx.delete(target);
        } else {
            ctx.purge(target);
        }
    }

    /**
     * Deletes or purges the entity of one rule parameter, unless the context already has its
     * qualified name marked as deleted.
     *
     * @param ctx   the operation being processed
     * @param id    the schema ID
     * @param rule  the rule that owns the parameter; its name is part of the qualified name
     * @param name  the parameter name
     * @param purge {@code true} to purge, {@code false} to delete
     */
    private void deleteRuleParameter(SchemaOperationContext ctx, int id, Rule rule, String name, boolean purge) {
        final String qualifiedName = ctx.getQualifiedName(id, rule.getName(), name);
        if (!ctx.isDeleted(qualifiedName)) {
            final AtlasObjectId target = new AtlasObjectId(SchemaAtlasTypes.SR_RULE_PARAMETER.getName(), "qualifiedName", qualifiedName);
            ctx.addDeleted(qualifiedName);
            if (!purge) {
                ctx.delete(target);
            } else {
                ctx.purge(target);
            }
        }
    }

    /**
     * Queues a single rule-tag entity for deletion or purge.
     *
     * <p>Skipped when the tag's qualified name is already marked as deleted on the
     * context.
     *
     * @param ctx   operation context that collects the delete/purge requests
     * @param id    id passed to {@code ctx.getQualifiedName}
     * @param rule  rule that owns the tag; its name is part of the qualified name
     * @param tag   tag name
     * @param purge {@code true} to queue a purge, {@code false} to queue a delete
     */
    private void deleteRuleTag(SchemaOperationContext ctx, int id, Rule rule, String tag, boolean purge) {
        String tagQualifiedName = ctx.getQualifiedName(id, rule.getName(), tag);
        if (!ctx.isDeleted(tagQualifiedName)) {
            AtlasObjectId tagId = new AtlasObjectId(SchemaAtlasTypes.SR_RULE_TAG.getName(), "qualifiedName", (Object)tagQualifiedName);
            // Mark as handled before queueing so repeated calls are skipped.
            ctx.addDeleted(tagQualifiedName);
            if (!purge) {
                ctx.delete(tagId);
                return;
            }
            ctx.purge(tagId);
        }
    }

    /**
     * Callback invoked once the cache has been initialized.
     *
     * <p>When the catalog is enabled this flags the cache as initialized, snapshots the
     * number of notifications issued so far as the "initial" backlog, checks whether that
     * backlog has already been handled, and schedules type-def store initialization on
     * the striped executor. Does nothing when the catalog is disabled.
     *
     * @param checkpoints not used by this implementation
     */
    public void cacheInitialized(Map<TopicPartition, Long> checkpoints) {
        if (this.isCatalogEnabled) {
            this.cacheInitialized.set(true);
            // Everything notified up to this point counts as the initial backlog.
            this.notifyInitial = this.notifyCount.get();
            int handledSoFar = this.handleCount.get();
            LOG.info("SchemaAtlasHook.cacheInitialized(): waiting for {} notifs, handled {}", this.notifyInitial, handledSoFar);
            this.checkIfInitialized(handledSoFar);
            this.stripedExecutor.execute(() -> this.initTypeDefStore());
        }
    }

    /**
     * Marks the hook as initialized once the initial notification backlog has been handled.
     *
     * <p>Takes effect only when the hook is not yet initialized, the cache has been
     * flagged as initialized, and {@code handled} has reached {@code notifyInitial}.
     * In that case the {@code initialized} flag is set and {@code initLatch} is counted down.
     *
     * @param handled number of notifications handled so far
     */
    private void checkIfInitialized(int handled) {
        if (this.initialized.get()) {
            return;
        }
        if (!this.cacheInitialized.get()) {
            return;
        }
        if (handled < this.notifyInitial) {
            return;
        }
        LOG.info("SchemaAtlasHook.notify(): completed initial {} notifs, handled {}", this.notifyInitial, handled);
        this.initialized.set(true);
        this.initLatch.countDown();
    }

    /**
     * Entry point for store updates; forwards subject-keyed updates to
     * {@link #sendSchemaOperation}.
     *
     * <p>The update is ignored when the catalog is disabled, when {@code key} is not a
     * {@code SubjectKey}, or when the key's subject is {@code null}.
     *
     * @param key       key of the updated record
     * @param value     new value
     * @param oldValue  previous value
     * @param tp        topic partition of the record
     * @param offset    offset of the record
     * @param timestamp timestamp of the record
     */
    public void handleUpdate(SchemaRegistryKey key, SchemaRegistryValue value, SchemaRegistryValue oldValue, TopicPartition tp, long offset, long timestamp) {
        if (!this.isCatalogEnabled || !(key instanceof SubjectKey)) {
            return;
        }
        SubjectKey subjectKey = (SubjectKey)key;
        if (subjectKey.getSubject() == null) {
            return;
        }
        this.sendSchemaOperation(subjectKey, value, oldValue, tp, offset, timestamp);
    }

    /**
     * Builds the operation context for a subject update and dispatches the resulting
     * notifications.
     *
     * <p>Any {@link Throwable} raised while building the context or sending the
     * notifications is logged and swallowed.
     *
     * @return the context produced by {@code handleSchemaOperation}, or {@code null} if
     *         that call failed before returning one
     */
    @VisibleForTesting
    protected SchemaOperationContext sendSchemaOperation(SubjectKey key, SchemaRegistryValue value, SchemaRegistryValue oldValue, TopicPartition tp, long offset, long timestamp) {
        // The logger performs its own level check, so no explicit guard is needed
        // for these constant trace messages.
        LOG.debug("==> SchemaAtlasHook.sendSchemaOperation()");
        SchemaOperationContext result = null;
        try {
            result = this.handleSchemaOperation(key, value, oldValue, tp, offset, timestamp);
            this.sendNotification(result);
        }
        catch (Throwable failure) {
            LOG.error("SchemaAtlasHook.sendSchemaOperation(): failed to send notification", failure);
        }
        LOG.debug("<== SchemaAtlasHook.sendSchemaOperation()");
        return result;
    }

    /**
     * Turns the changes accumulated on {@code ctx} into hook notifications.
     *
     * <p>Notifications are issued in this order: one create/update request for all
     * upserts (if any), one partial-update request per partially updated entity, one
     * delete request for all deletes (if any), and one purge request for all purges
     * (if any).
     *
     * @param ctx operation context holding the pending entity changes
     */
    private void sendNotification(SchemaOperationContext ctx) {
        List<AtlasEntity> toUpsert = ctx.getEntitiesToCreateOrUpdate();
        if (!toUpsert.isEmpty()) {
            AtlasEntity.AtlasEntitiesWithExtInfo upsertPayload = new AtlasEntity.AtlasEntitiesWithExtInfo(toUpsert);
            this.notify(ctx, new HookNotification.EntityCreateRequestV2(null, upsertPayload));
        }

        Map<AtlasObjectId, AtlasEntity> partialUpdates = ctx.getEntitiesToPartiallyUpdate();
        partialUpdates.forEach((objectId, entity) -> this.notify(ctx, new HookNotification.EntityPartialUpdateRequestV2(null, objectId, new AtlasEntity.AtlasEntityWithExtInfo(entity))));

        Set<AtlasObjectId> toDelete = ctx.getEntitiesToDelete();
        if (!toDelete.isEmpty()) {
            List<AtlasObjectId> deleteIds = new ArrayList<AtlasObjectId>(toDelete);
            this.notify(ctx, new HookNotification.EntityDeleteRequestV2(null, deleteIds));
        }

        Set<AtlasObjectId> toPurge = ctx.getEntitiesToPurge();
        if (!toPurge.isEmpty()) {
            List<AtlasObjectId> purgeIds = new ArrayList<AtlasObjectId>(toPurge);
            this.notify(ctx, new EntityPurgeRequestV2(null, purgeIds));
        }
    }

    /**
     * Creates a {@code SchemaOperationContext} for the update and, unless the subject is
     * excluded, lets {@code handleSchemaOperationContext} populate it.
     *
     * <p>NOTE(review): the debug trace messages name
     * {@code handleSchemaNameSpaceOperation()} rather than this method; they are kept
     * unchanged here.
     *
     * @return the newly created context (never {@code null})
     */
    private SchemaOperationContext handleSchemaOperation(SubjectKey key, SchemaRegistryValue value, SchemaRegistryValue oldValue, TopicPartition tp, long offset, long timestamp) {
        // The logger performs its own level check, so no explicit guard is needed.
        LOG.debug("==> SchemaAtlasHook.handleSchemaNameSpaceOperation()");
        SchemaOperationContext created = new SchemaOperationContext(this.schemaRegistry, key, value, oldValue, tp, offset, timestamp);
        boolean excluded = this.excludeSubject(created);
        if (!excluded) {
            this.handleSchemaOperationContext(created);
        }
        LOG.debug("<== SchemaAtlasHook.handleSchemaNameSpaceOperation(): {}", created);
        return created;
    }

    /**
     * Decides whether the subject of {@code ctx} is excluded from catalog processing.
     *
     * <p>A subject is excluded when either its plain name or its qualified form is
     * contained in {@code excludeSubjects}, or when the qualified form has an entry in
     * {@code excludeSubjectsCache}.
     *
     * @param ctx operation context supplying the subject
     * @return {@code true} if the subject must be skipped
     */
    private boolean excludeSubject(SchemaOperationContext ctx) {
        String plainName = ctx.getSubject();
        String qualifiedName = ctx.getQualifiedSubject().toQualifiedSubject();
        if (this.excludeSubjects.contains(plainName)) {
            return true;
        }
        if (this.excludeSubjects.contains(qualifiedName)) {
            return true;
        }
        return this.excludeSubjectsCache.getIfPresent(qualifiedName) != null;
    }

    /**
     * Counts the notification and submits it to the striped executor for asynchronous
     * handling.
     *
     * @param ctx     context the notification originates from
     * @param message notification to deliver
     */
    private void notify(SchemaOperationContext ctx, HookNotification message) {
        // Bump the counter before the task is queued.
        this.notifyCount.incrementAndGet();
        NotificationRunnable task = new NotificationRunnable(ctx, message);
        this.stripedExecutor.execute(task);
    }

    /**
     * Initializes the type-def store once, after the injector has become available.
     *
     * <p>While holding {@code lock}, waits on {@code condition} until the {@code injector}
     * field is non-null, then runs {@code typeDefStoreLoader.initTypeDefStore} if
     * {@code typeDefStoreInitialized} is not yet set, and sets that flag afterwards.
     *
     * <p>The wait is deliberately uninterruptible: an interrupt does not abort it. The
     * interrupt is remembered and the thread's interrupt status is restored before this
     * method returns, so callers can still observe it.
     */
    private void initTypeDefStore() {
        boolean interrupted = false;
        // Acquire outside the try block: if lock() fails, the finally clause must not
        // attempt to unlock a lock this thread does not hold.
        this.lock.lock();
        try {
            while (this.injector == null) {
                try {
                    this.condition.await();
                }
                catch (InterruptedException e) {
                    // Keep waiting for the injector, but do not lose the interrupt.
                    interrupted = true;
                }
            }
            if (!this.typeDefStoreInitialized.get()) {
                this.typeDefStoreLoader.initTypeDefStore(this.injector);
                this.typeDefStoreInitialized.set(true);
            }
        }
        finally {
            this.lock.unlock();
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Returns the offsets recorded by this hook.
     *
     * <p>The hook's own {@code offsets} map is returned directly; no copy is made.
     *
     * @param count not used by this implementation
     * @return the {@code offsets} map held by this hook
     */
    public Map<TopicPartition, Long> checkpoint(int count) {
        final Map<TopicPartition, Long> recorded = this.offsets;
        return recorded;
    }

    /**
     * Shuts down the striped executor via {@link #shutdownQuietly}.
     *
     * @throws IOException declared by the signature; nothing in this body throws it
     */
    public void close() throws IOException {
        final ExecutorService executor = this.stripedExecutor;
        this.shutdownQuietly(executor);
    }

    /**
     * Shuts down {@code executor} and waits up to five seconds for it to terminate,
     * without propagating {@link InterruptedException}.
     *
     * <p>A timeout is logged as an error. If the waiting thread is interrupted, the
     * interruption is logged with its cause and the thread's interrupt status is
     * restored so that callers further up the stack can still react to it.
     *
     * @param executor executor to shut down; {@code null} is ignored
     */
    private void shutdownQuietly(ExecutorService executor) {
        if (executor == null) {
            return;
        }
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5000L, TimeUnit.MILLISECONDS)) {
                LOG.error("Timed out waiting for executor to shut down, exiting uncleanly");
            }
        }
        catch (InterruptedException e) {
            LOG.error("Failure in shutting down executor", e);
            // Do not swallow the interrupt: re-assert it for the caller.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Returns the striped executor's queue size for the given tenant.
     *
     * @param tenant tenant passed to {@code stripedExecutor.getQueueSize}
     * @return the queue size reported by the striped executor
     */
    public int getThreadQueueSize(String tenant) {
        final int queued = this.stripedExecutor.getQueueSize(tenant);
        return queued;
    }

    /**
     * Publishes the striped executor's overall queue size to {@code atlasQueueSizeMetric}.
     */
    private void recordAtlasQueueSize() {
        this.atlasQueueSizeMetric.update(Long.valueOf(this.stripedExecutor.getQueueSize()));
    }

    /**
     * A {@code Gauge<Long>} whose reading is backed by an {@link AtomicLong} and can be
     * overwritten through {@link #update(Long)}.
     */
    public static class LongGauge
    implements Gauge<Long> {
        /** Holder for the most recently stored reading. */
        private final AtomicLong current;

        /**
         * @param initialValue starting reading; unboxed, so {@code null} raises a
         *                     {@link NullPointerException}
         */
        LongGauge(Long initialValue) {
            this.current = new AtomicLong(initialValue.longValue());
        }

        /**
         * Returns the stored reading; both arguments are ignored.
         */
        public Long value(MetricConfig config, long now) {
            return Long.valueOf(this.current.get());
        }

        /**
         * Replaces the stored reading.
         *
         * @param value new reading; unboxed, so {@code null} raises a
         *              {@link NullPointerException}
         */
        public void update(Long value) {
            this.current.set(value.longValue());
        }
    }

    /**
     * Task that delivers one {@code HookNotification} to the processor.
     *
     * <p>The stripe of the task is the tenant of its context.
     */
    public class NotificationRunnable
    implements StripedRunnable {
        private final SchemaOperationContext ctx;
        private final HookNotification message;

        /**
         * @param ctx     context the notification originates from
         * @param message notification to deliver
         */
        public NotificationRunnable(SchemaOperationContext ctx, HookNotification message) {
            this.ctx = ctx;
            this.message = message;
        }

        /** Returns the tenant of the underlying context. */
        public String tenant() {
            return this.ctx.getTenant();
        }

        /** Returns the stripe for this task, which is the tenant. */
        @Override
        public Object getStripe() {
            return this.tenant();
        }

        /**
         * Ensures the type-def store is initialized, hands the notification to the
         * processor, updates the handled counter and initialization state, logs progress
         * and records the queue size on every 100th handled notification, and finally
         * records the next offset for the context's topic partition.
         *
         * <p>NOTE(review): if {@code handleMessage} throws, none of the statements after
         * it run, so neither the handled counter nor the offset is updated for this
         * notification. Verify whether {@code handleMessage} can throw.
         */
        @Override
        public void run() {
            final SchemaAtlasHook hook = SchemaAtlasHook.this;
            hook.initTypeDefStore();
            hook.processor.handleMessage(this.ctx, this.message, hook.initialized.get());

            final int handledNow = hook.handleCount.incrementAndGet();
            hook.checkIfInitialized(handledNow);

            final boolean progressMilestone = handledNow % 100 == 0;
            if (progressMilestone) {
                final int notifiedNow = hook.notifyCount.get();
                LOG.info("SchemaAtlasHook.notify(): notified {}, handled {}", notifiedNow, handledNow);
                hook.recordAtlasQueueSize();
            }

            // Stored value is the offset following the one just handled.
            hook.offsets.put(this.ctx.getTopicPartition(), this.ctx.getOffset() + 1L);
        }
    }

    /**
     * Notification type used for purge requests.
     *
     * <p>It extends {@code HookNotification.EntityDeleteRequestV2} without adding any
     * state or behaviour of its own, so a purge is distinguished from a delete only by
     * its class.
     */
    @JsonAutoDetect(getterVisibility=JsonAutoDetect.Visibility.PUBLIC_ONLY, setterVisibility=JsonAutoDetect.Visibility.PUBLIC_ONLY, fieldVisibility=JsonAutoDetect.Visibility.NONE)
    @JsonSerialize(include=JsonSerialize.Inclusion.ALWAYS)
    @JsonIgnoreProperties(ignoreUnknown=true)
    @XmlRootElement
    @XmlAccessorType(value=XmlAccessType.PROPERTY)
    public static class EntityPurgeRequestV2
    extends HookNotification.EntityDeleteRequestV2
    implements Serializable {
        /**
         * @param user     passed through to the superclass constructor
         * @param entities ids of the entities to purge; passed through to the superclass
         */
        public EntityPurgeRequestV2(String user, List<AtlasObjectId> entities) {
            super(user, entities);
        }
    }
}

