/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.schemaregistry.storage;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.kafka.schemaregistry.SchemaProvider;
import io.confluent.kafka.schemaregistry.id.IdGenerator;
import io.confluent.kafka.schemaregistry.metrics.MetricsContainer;
import io.confluent.kafka.schemaregistry.metrics.SchemaRegistryMetric;
import io.confluent.kafka.schemaregistry.storage.ClearSubjectValue;
import io.confluent.kafka.schemaregistry.storage.DeleteSubjectValue;
import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.LookupCache;
import io.confluent.kafka.schemaregistry.storage.SchemaKey;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistryKey;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistryKeyType;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistryValue;
import io.confluent.kafka.schemaregistry.storage.SchemaUpdateHandler;
import io.confluent.kafka.schemaregistry.storage.SchemaValue;
import io.confluent.kafka.schemaregistry.storage.StoreUpdateHandler;
import io.confluent.kafka.schemaregistry.storage.SubjectKey;
import io.confluent.kafka.schemaregistry.storage.SubjectValue;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException;
import io.confluent.kafka.schemaregistry.utils.QualifiedSubject;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaStoreMessageHandler
implements SchemaUpdateHandler {
    private static final Logger log = LoggerFactory.getLogger(KafkaStoreMessageHandler.class);
    private final KafkaSchemaRegistry schemaRegistry;
    private final LookupCache<SchemaRegistryKey, SchemaRegistryValue> lookupCache;
    private final IdGenerator idGenerator;
    private final List<String> canonicalizeSchemaTypes;
    private final Map<TopicPartition, Long> offsets = new ConcurrentHashMap<TopicPartition, Long>();

    public KafkaStoreMessageHandler(KafkaSchemaRegistry schemaRegistry, LookupCache<SchemaRegistryKey, SchemaRegistryValue> lookupCache, IdGenerator idGenerator) {
        this.schemaRegistry = schemaRegistry;
        this.lookupCache = lookupCache;
        this.idGenerator = idGenerator;
        this.canonicalizeSchemaTypes = schemaRegistry.config().getList("schema.canonicalize.on.consume");
    }

    @Override
    public StoreUpdateHandler.ValidationStatus validateUpdate(SchemaRegistryKey key, SchemaRegistryValue value, TopicPartition tp, long offset, long timestamp) {
        block7: {
            SubjectValue subjectValue;
            if (value == null) {
                return StoreUpdateHandler.ValidationStatus.SUCCESS;
            }
            value.setOffset(offset);
            value.setTimestamp(timestamp);
            if (key.getKeyType() == SchemaRegistryKeyType.SCHEMA) {
                SchemaProvider schemaProvider;
                SchemaValue schemaObj = (SchemaValue)value;
                String schemaType = schemaObj.getSchemaType();
                if (this.canonicalizeSchemaTypes.contains(schemaType) && (schemaProvider = this.schemaRegistry.schemaProvider(schemaType)) != null) {
                    KafkaStoreMessageHandler.canonicalize(schemaProvider, schemaObj);
                }
                try {
                    SchemaValue oldSchema;
                    String qctx = QualifiedSubject.qualifiedContextFor((String)this.schemaRegistry.tenant(), (String)schemaObj.getSubject());
                    SchemaKey oldKey = this.lookupCache.schemaKeyById(schemaObj.getId(), qctx);
                    if (oldKey != null && (oldSchema = (SchemaValue)this.lookupCache.get(oldKey)) != null && !oldSchema.getSchema().equals(schemaObj.getSchema())) {
                        log.error("Found a schema with duplicate ID {}.  This schema will not be registered since a schema already exists with this ID.", (Object)schemaObj.getId());
                        return this.schemaRegistry.isLeader() ? StoreUpdateHandler.ValidationStatus.ROLLBACK_FAILURE : StoreUpdateHandler.ValidationStatus.IGNORE_FAILURE;
                    }
                    break block7;
                }
                catch (StoreException e) {
                    log.error("Error while retrieving schema", (Throwable)e);
                    return this.schemaRegistry.isLeader() ? StoreUpdateHandler.ValidationStatus.ROLLBACK_FAILURE : StoreUpdateHandler.ValidationStatus.IGNORE_FAILURE;
                }
            }
            if ((key.getKeyType() == SchemaRegistryKeyType.CONFIG || key.getKeyType() == SchemaRegistryKeyType.MODE) && (subjectValue = (SubjectValue)value).getSubject() == null) {
                subjectValue.setSubject(((SubjectKey)key).getSubject());
            }
        }
        return StoreUpdateHandler.ValidationStatus.SUCCESS;
    }

    @VisibleForTesting
    protected static void canonicalize(SchemaProvider schemaProvider, SchemaValue schemaValue) {
        schemaProvider.parseSchema(schemaValue.getSchema(), Collections.emptyList()).ifPresent(parsedSchema -> schemaValue.setSchema(parsedSchema.canonicalString()));
    }

    @Override
    public void handleUpdate(SchemaRegistryKey key, SchemaRegistryValue value, SchemaRegistryValue oldValue, TopicPartition tp, long offset, long timestamp) {
        if (key.getKeyType() == SchemaRegistryKeyType.SCHEMA) {
            this.handleSchemaUpdate((SchemaKey)key, (SchemaValue)value, (SchemaValue)oldValue);
        } else if (value != null) {
            if (key.getKeyType() == SchemaRegistryKeyType.DELETE_SUBJECT) {
                this.handleDeleteSubject((DeleteSubjectValue)value);
            } else if (key.getKeyType() == SchemaRegistryKeyType.CLEAR_SUBJECT) {
                this.handleClearSubject((ClearSubjectValue)value);
            }
        }
        this.offsets.put(tp, offset + 1L);
    }

    private void handleDeleteSubject(DeleteSubjectValue deleteSubjectValue) {
        String subject = deleteSubjectValue.getSubject();
        Integer deleteTillVersion = deleteSubjectValue.getVersion();
        for (int version = 1; version <= deleteTillVersion; ++version) {
            try {
                SchemaKey schemaKey = new SchemaKey(subject, version);
                SchemaValue schemaValue = (SchemaValue)this.lookupCache.get(schemaKey);
                if (schemaValue == null) continue;
                schemaValue.setDeleted(true);
                SchemaValue oldSchemaValue = (SchemaValue)this.lookupCache.put(schemaKey, schemaValue);
                this.lookupCache.schemaDeleted(schemaKey, schemaValue, oldSchemaValue);
                this.schemaRegistry.invalidateFromNewSchemaCache(schemaValue.toHashKey());
                continue;
            }
            catch (StoreException e) {
                log.error("Failed to delete subject {} in the local cache", (Object)subject, (Object)e);
            }
        }
    }

    private void handleClearSubject(ClearSubjectValue clearSubjectValue) {
        String subject = clearSubjectValue.getSubject();
        try {
            this.lookupCache.clearSubjects(subject);
        }
        catch (StoreException e) {
            log.error("Failed to clear subject {} in the local cache", (Object)subject, (Object)e);
        }
        this.schemaRegistry.clearNewSchemaCache();
        this.schemaRegistry.clearOldSchemaCache();
    }

    private void handleSchemaUpdate(SchemaKey schemaKey, SchemaValue schemaValue, SchemaValue oldSchemaValue) {
        MetricsContainer metricsContainer = this.schemaRegistry.getMetricsContainer();
        if (schemaValue != null) {
            this.idGenerator.schemaRegistered(schemaKey, schemaValue);
            if (schemaValue.isDeleted()) {
                this.lookupCache.schemaDeleted(schemaKey, schemaValue, oldSchemaValue);
                this.schemaRegistry.invalidateFromNewSchemaCache(schemaValue.toHashKey());
                KafkaStoreMessageHandler.updateMetrics(metricsContainer.getSchemasDeleted(), metricsContainer.getSchemasDeleted(KafkaStoreMessageHandler.getSchemaType(schemaValue)));
            } else {
                this.lookupCache.schemaRegistered(schemaKey, schemaValue, oldSchemaValue);
                KafkaStoreMessageHandler.updateMetrics(metricsContainer.getSchemasCreated(), metricsContainer.getSchemasCreated(KafkaStoreMessageHandler.getSchemaType(schemaValue)));
            }
        } else {
            this.lookupCache.schemaTombstoned(schemaKey, oldSchemaValue);
            this.schemaRegistry.clearOldSchemaCache();
        }
    }

    @Override
    public Map<TopicPartition, Long> checkpoint(int count) {
        return this.offsets;
    }

    private static String getSchemaType(SchemaValue schemaValue) {
        return schemaValue.getSchemaType() == null ? "AVRO" : schemaValue.getSchemaType();
    }

    private static void updateMetrics(SchemaRegistryMetric total, SchemaRegistryMetric perType) {
        total.record();
        if (perType != null) {
            perType.record();
        }
    }
}

