/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.catalog.ingestion.event;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Timestamps;
import io.cloudevents.CloudEvent;
import io.cloudevents.CloudEventData;
import io.confluent.catalog.DataCatalogConfig;
import io.confluent.catalog.ingestion.ReconciliationCache;
import io.confluent.catalog.ingestion.event.EventDeserializer;
import io.confluent.catalog.metrics.MetricsManager;
import io.confluent.catalog.model.ModelConstants;
import io.confluent.catalog.storage.MetadataRegistry;
import io.confluent.catalog.util.QualifiedNameGenerator;
import io.confluent.protobuf.events.catalog.v1.ClusterLinkMetadata;
import io.confluent.protobuf.events.catalog.v1.ConnectMetadata;
import io.confluent.protobuf.events.catalog.v1.EnvironmentMetadata;
import io.confluent.protobuf.events.catalog.v1.LogicalClusterMetadata;
import io.confluent.protobuf.events.catalog.v1.MetadataChange;
import io.confluent.protobuf.events.catalog.v1.MetadataEvent;
import io.confluent.protobuf.events.catalog.v1.OpType;
import io.confluent.protobuf.events.catalog.v1.PipelineMetadata;
import io.confluent.protobuf.events.catalog.v1.TopicMetadata;
import io.confluent.rest.RestConfigException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.processor.api.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EventProcessor {
    private static final Logger log = LoggerFactory.getLogger(EventProcessor.class);
    public static final String LSRC_HEADER = "_lsrc";
    public static final String RECONCILIATION_HEADER = "ce_reconciliation";
    public static final String PAGE_NUMBER_HEADER = "ce_page";
    public static final String TOTAL_PAGE_HEADER = "ce_total";
    public static final String LAST_PAGE_HEADER = "ce_lastpage";
    private final boolean isReconciliationClusterEnabled;
    private final boolean isReconciliationTopicEnabled;
    private final MetadataRegistry metadataRegistry;
    private final MetricsManager metricsManager;
    private final EventDeserializer eventDeserializer = new EventDeserializer();
    private final Cache<String, Long> entityTimeCache;
    private final ReconciliationCache reconciliationCache;

    public EventProcessor(MetadataRegistry metadataRegistry, MetricsManager metricsManager) {
        try {
            this.metadataRegistry = metadataRegistry;
            this.metricsManager = metricsManager;
            DataCatalogConfig dataCatalogConfig = new DataCatalogConfig(metadataRegistry.config().originalProperties());
            this.isReconciliationClusterEnabled = dataCatalogConfig.isCatalogReconciliationClusterEnabled();
            this.isReconciliationTopicEnabled = dataCatalogConfig.isCatalogReconciliationTopicEnabled();
            this.entityTimeCache = Caffeine.newBuilder().maximumSize((long)dataCatalogConfig.getCatalogIngestorCacheMaxSize()).expireAfterWrite(Duration.ofSeconds(dataCatalogConfig.getCatalogIngestorCacheTtlSec())).build();
            this.reconciliationCache = new ReconciliationCache(dataCatalogConfig.getCatalogReconciliationCacheMaxSize(), dataCatalogConfig.getCatalogReconciliationCacheTtlSec());
        }
        catch (RestConfigException e) {
            throw new IllegalArgumentException("Could not instantiate EventProcessor", e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void process(Record<String, CloudEvent> record) {
        try {
            this.metadataRegistry.waitForInit();
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        String id = (String)record.key();
        CloudEvent cloudEvent = (CloudEvent)record.value();
        CloudEventData data = cloudEvent.getData();
        if (data == null) {
            this.metricsManager.recordIngestionTransformError(1);
            log.error("Missing data in cloud event");
            return;
        }
        Long newTime = this.getTimestamp(cloudEvent);
        if (newTime != null && newTime != 0L) {
            long currentTimeMs = System.currentTimeMillis();
            this.metricsManager.recordIngestionEventsPipelineTime((double)(currentTimeMs - newTime) / 1000.0);
        }
        Headers headers = record.headers();
        MetadataChange metadataEntry = this.eventDeserializer.deserialize(null, data.toBytes());
        Header lsrcHeader = headers.lastHeader(LSRC_HEADER);
        if (lsrcHeader == null) {
            this.metricsManager.recordIngestionTransformError(1);
            log.error("Could not find lsrc for record with key " + id);
            return;
        }
        String tenant = new String(lsrcHeader.value(), StandardCharsets.UTF_8);
        String source = metadataEntry.getSource();
        OpType op = metadataEntry.getOp();
        List<MetadataEvent> events = metadataEntry.getEventsList();
        try {
            this.process(tenant, source, op, newTime, events, headers);
        }
        catch (Exception e) {
            this.metricsManager.recordIngestionTransformError(1);
            log.error("Error during event processing", (Throwable)e);
        }
        finally {
            RequestContext.clear();
        }
    }

    private void process(String tenant, String source, OpType opType, Long newTime, List<MetadataEvent> events, Headers headers) {
        List<List<MetadataEvent>> eventsList = this.getEventsList(events);
        switch (opType) {
            case CREATE: 
            case UPDATE: {
                this.createOrUpdateMetadata(tenant, source, eventsList, newTime);
                break;
            }
            case SNAPSHOT: {
                this.processSnapshot(tenant, source, eventsList, newTime, headers);
                break;
            }
            case DELETE: 
            case PURGE: {
                this.deleteMetadata(tenant, source, eventsList, newTime);
                break;
            }
            default: {
                log.warn("Unrecognized opType {}", (Object)opType);
            }
        }
    }

    private List<List<MetadataEvent>> getEventsList(List<MetadataEvent> events) {
        ArrayList<List<MetadataEvent>> eventsList = new ArrayList<List<MetadataEvent>>();
        for (MetadataEvent.MetadataCase metadataCase : MetadataEvent.MetadataCase.values()) {
            eventsList.add(new ArrayList());
        }
        for (MetadataEvent e : events) {
            int metadataType = e.getMetadataCase().getNumber();
            ((List)eventsList.get(metadataType)).add(e);
        }
        return eventsList;
    }

    private boolean isReconciliationCluster(Headers headers) {
        if (!this.isReconciliationClusterEnabled) {
            return false;
        }
        Header isReconciliationHeader = headers.lastHeader(RECONCILIATION_HEADER);
        return isReconciliationHeader != null && Boolean.parseBoolean(new String(isReconciliationHeader.value(), StandardCharsets.UTF_8));
    }

    private boolean isReconciliationTopic(Headers headers) {
        if (!this.isReconciliationTopicEnabled) {
            return false;
        }
        Header isReconciliationHeader = headers.lastHeader(RECONCILIATION_HEADER);
        return isReconciliationHeader != null && Boolean.parseBoolean(new String(isReconciliationHeader.value(), StandardCharsets.UTF_8));
    }

    private void processSnapshot(String tenant, String source, List<List<MetadataEvent>> events, Long newTime, Headers headers) {
        List<EnvironmentMetadata> environmentMetadataList;
        List<PipelineMetadata> pipelineMetadataList;
        List<ConnectMetadata> connectMetadataList;
        List<LogicalClusterMetadata> logicalClusterMetadataList;
        List<ClusterLinkMetadata> clusterLinkMetadataList;
        List<TopicMetadata> topicMetadataList = events.get(MetadataEvent.MetadataCase.TOPIC_METADATA.getNumber()).stream().map(MetadataEvent::getTopicMetadata).collect(Collectors.toList());
        if (topicMetadataList.size() > 0) {
            this.createOrUpdateTopicMetadata(tenant, source, topicMetadataList, newTime);
        }
        if ((clusterLinkMetadataList = events.get(MetadataEvent.MetadataCase.CLUSTER_LINK_METADATA.getNumber()).stream().map(MetadataEvent::getClusterLinkMetadata).collect(Collectors.toList())).size() > 0) {
            this.createOrUpdateClusterLinkMetadata(tenant, source, clusterLinkMetadataList, newTime);
        }
        if ((logicalClusterMetadataList = events.get(MetadataEvent.MetadataCase.LOGICAL_CLUSTER_METADATA.getNumber()).stream().map(MetadataEvent::getLogicalClusterMetadata).collect(Collectors.toList())).size() > 0) {
            this.createOrUpdateLogicalClusterMetadata(tenant, logicalClusterMetadataList, newTime);
            if (this.isReconciliationCluster(headers)) {
                this.reconcileLogicalClusterMetadata(tenant, logicalClusterMetadataList, newTime, headers);
            }
        }
        if ((connectMetadataList = events.get(MetadataEvent.MetadataCase.CONNECT_METADATA.getNumber()).stream().map(MetadataEvent::getConnectMetadata).collect(Collectors.toList())).size() > 0) {
            this.createOrUpdateConnectMetadata(tenant, connectMetadataList, newTime);
        }
        if ((pipelineMetadataList = events.get(MetadataEvent.MetadataCase.PIPELINE_METADATA.getNumber()).stream().map(MetadataEvent::getPipelineMetadata).collect(Collectors.toList())).size() > 0) {
            this.createOrUpdatePipelineMetadata(tenant, pipelineMetadataList, newTime);
        }
        if ((environmentMetadataList = events.get(MetadataEvent.MetadataCase.ENVIRONMENT_METADATA.getNumber()).stream().map(MetadataEvent::getEnvironmentMetadata).collect(Collectors.toList())).size() > 0) {
            this.createOrUpdateEnvironmentMetadata(tenant, environmentMetadataList, newTime);
        }
        if ((topicMetadataList.size() > 0 || clusterLinkMetadataList.size() > 0) && this.isReconciliationTopic(headers)) {
            this.reconcileTopicMetadata(tenant, source, topicMetadataList, newTime, headers);
        }
    }

    private void reconcileTopicMetadata(String tenant, String logicalKafkaClusterId, List<TopicMetadata> topicMetadataList, Long newTime, Headers headers) {
        Header totalPageHeader = headers.lastHeader(TOTAL_PAGE_HEADER);
        Header pageNumberHeader = headers.lastHeader(PAGE_NUMBER_HEADER);
        Header isLastPageHeader = headers.lastHeader(LAST_PAGE_HEADER);
        if (isLastPageHeader == null && totalPageHeader == null || pageNumberHeader == null) {
            log.error(String.format("Reconcile topic metadata, headers missing, tenant %s", tenant));
            return;
        }
        int pageNumber = Integer.parseInt(new String(pageNumberHeader.value(), StandardCharsets.UTF_8));
        ReconciliationCache.ReconciliationCacheKey cacheKey = new ReconciliationCache.ReconciliationCacheKey(tenant, logicalKafkaClusterId, ModelConstants.ENTITY_KAFKA_TOPIC);
        HashSet<Integer> pageNumberSet = this.reconciliationCache.getOrDefault(cacheKey).getPageNumbers();
        if (!pageNumberSet.add(pageNumber)) {
            log.warn(String.format("Reconcile topic metadata, receive same page numbers %d from %s", pageNumber, cacheKey));
            this.reconciliationCache.invalidate(cacheKey);
            return;
        }
        HashSet<String> allTopicsInSnapshot = this.reconciliationCache.getOrDefault(cacheKey).getEntities();
        for (TopicMetadata topicMetadata : topicMetadataList) {
            allTopicsInSnapshot.add(topicMetadata.getTopicName());
        }
        boolean isLastPage = false;
        if (isLastPageHeader != null) {
            isLastPage = Boolean.parseBoolean(new String(isLastPageHeader.value(), StandardCharsets.UTF_8));
        } else {
            int totalPage = Integer.parseInt(new String(totalPageHeader.value(), StandardCharsets.UTF_8));
            boolean bl = isLastPage = totalPage - 1 == pageNumber;
        }
        if (isLastPage) {
            if (pageNumberSet.size() != pageNumber + 1) {
                log.warn(String.format("Reconcile topic metadata, do not receive all pagesfrom %s", cacheKey));
            } else {
                try {
                    this.metadataRegistry.reconcileEntities(ModelConstants.ENTITY_KAFKA_TOPIC, tenant, logicalKafkaClusterId, allTopicsInSnapshot);
                }
                catch (AtlasBaseException e) {
                    log.error("Reconcile topic metadata error", (Throwable)e);
                }
            }
            this.reconciliationCache.invalidate(cacheKey);
        }
    }

    private void reconcileLogicalClusterMetadata(String tenant, List<LogicalClusterMetadata> logicalClusterMetadataList, Long newTime, Headers headers) {
        Header totalPageHeader = headers.lastHeader(TOTAL_PAGE_HEADER);
        Header pageNumberHeader = headers.lastHeader(PAGE_NUMBER_HEADER);
        if (totalPageHeader == null || pageNumberHeader == null) {
            log.error(String.format("Reconcile cluster metadata, headers missing, tenant %s", tenant));
            return;
        }
        int pageNumber = Integer.parseInt(new String(pageNumberHeader.value(), StandardCharsets.UTF_8));
        ReconciliationCache.ReconciliationCacheKey cacheKey = new ReconciliationCache.ReconciliationCacheKey(tenant, "", ModelConstants.ENTITY_KAFKA_LOGICAL_CLUSTER);
        HashSet<Integer> pageNumberSet = this.reconciliationCache.getOrDefault(cacheKey).getPageNumbers();
        if (!pageNumberSet.add(pageNumber)) {
            log.warn(String.format("Reconcile cluster metadata, receive same page numbers %d from %s", pageNumber, cacheKey));
            this.reconciliationCache.invalidate(cacheKey);
            return;
        }
        HashSet<String> allClustersInSnapshot = this.reconciliationCache.getOrDefault(cacheKey).getEntities();
        for (LogicalClusterMetadata logicalClusterMetadata : logicalClusterMetadataList) {
            allClustersInSnapshot.add(logicalClusterMetadata.getClusterId());
        }
        int totalPage = Integer.parseInt(new String(totalPageHeader.value(), StandardCharsets.UTF_8));
        if (totalPage - 1 == pageNumber) {
            if (pageNumberSet.size() != pageNumber + 1) {
                log.warn(String.format("Reconcile cluster metadata, do not receive all pages from %s", cacheKey));
            } else {
                try {
                    this.metadataRegistry.reconcileEntities(ModelConstants.ENTITY_KAFKA_LOGICAL_CLUSTER, tenant, null, allClustersInSnapshot);
                }
                catch (AtlasBaseException e) {
                    log.error("Reconcile cluster metadata error", (Throwable)e);
                }
            }
            this.reconciliationCache.invalidate(cacheKey);
        }
    }

    private Long getTimestamp(CloudEvent cloudEvent) {
        OffsetDateTime time = cloudEvent.getTime();
        return time != null ? Long.valueOf(time.toInstant().toEpochMilli()) : null;
    }

    private void createOrUpdateMetadata(String tenant, String source, List<List<MetadataEvent>> events, Long newTime) {
        List<EnvironmentMetadata> environmentMetadataList;
        List<PipelineMetadata> pipelineMetadataList;
        List<ConnectMetadata> connectMetadataList;
        List<LogicalClusterMetadata> logicalClusterMetadataList;
        List<ClusterLinkMetadata> clusterLinkMetadataList;
        List<TopicMetadata> topicMetadataList = events.get(MetadataEvent.MetadataCase.TOPIC_METADATA.getNumber()).stream().map(MetadataEvent::getTopicMetadata).collect(Collectors.toList());
        if (topicMetadataList.size() > 0) {
            this.createOrUpdateTopicMetadata(tenant, source, topicMetadataList, newTime);
        }
        if ((clusterLinkMetadataList = events.get(MetadataEvent.MetadataCase.CLUSTER_LINK_METADATA.getNumber()).stream().map(MetadataEvent::getClusterLinkMetadata).collect(Collectors.toList())).size() > 0) {
            this.createOrUpdateClusterLinkMetadata(tenant, source, clusterLinkMetadataList, newTime);
        }
        if ((logicalClusterMetadataList = events.get(MetadataEvent.MetadataCase.LOGICAL_CLUSTER_METADATA.getNumber()).stream().map(MetadataEvent::getLogicalClusterMetadata).collect(Collectors.toList())).size() > 0) {
            this.createOrUpdateLogicalClusterMetadata(tenant, logicalClusterMetadataList, newTime);
        }
        if ((connectMetadataList = events.get(MetadataEvent.MetadataCase.CONNECT_METADATA.getNumber()).stream().map(MetadataEvent::getConnectMetadata).collect(Collectors.toList())).size() > 0) {
            this.createOrUpdateConnectMetadata(tenant, connectMetadataList, newTime);
        }
        if ((pipelineMetadataList = events.get(MetadataEvent.MetadataCase.PIPELINE_METADATA.getNumber()).stream().map(MetadataEvent::getPipelineMetadata).collect(Collectors.toList())).size() > 0) {
            this.createOrUpdatePipelineMetadata(tenant, pipelineMetadataList, newTime);
        }
        if ((environmentMetadataList = events.get(MetadataEvent.MetadataCase.ENVIRONMENT_METADATA.getNumber()).stream().map(MetadataEvent::getEnvironmentMetadata).collect(Collectors.toList())).size() > 0) {
            this.createOrUpdateEnvironmentMetadata(tenant, environmentMetadataList, newTime);
        }
    }

    private void deleteMetadata(String tenant, String source, List<List<MetadataEvent>> events, Long newTime) {
        List<EnvironmentMetadata> environmentMetadataList;
        List<PipelineMetadata> pipelineMetadataList;
        List<ConnectMetadata> connectMetadataList;
        List<LogicalClusterMetadata> logicalClusterMetadataList;
        List<ClusterLinkMetadata> clusterLinkMetadataList;
        List<TopicMetadata> topicMetadataList = events.get(MetadataEvent.MetadataCase.TOPIC_METADATA.getNumber()).stream().map(MetadataEvent::getTopicMetadata).collect(Collectors.toList());
        if (topicMetadataList.size() > 0) {
            this.deleteTopicMetadata(tenant, source, topicMetadataList, newTime);
        }
        if ((clusterLinkMetadataList = events.get(MetadataEvent.MetadataCase.CLUSTER_LINK_METADATA.getNumber()).stream().map(MetadataEvent::getClusterLinkMetadata).collect(Collectors.toList())).size() > 0) {
            this.deleteClusterLinkMetadata(tenant, source, clusterLinkMetadataList, newTime);
        }
        if ((logicalClusterMetadataList = events.get(MetadataEvent.MetadataCase.LOGICAL_CLUSTER_METADATA.getNumber()).stream().map(MetadataEvent::getLogicalClusterMetadata).collect(Collectors.toList())).size() > 0) {
            this.deleteLogicalClusterMetadata(tenant, logicalClusterMetadataList, newTime);
        }
        if ((connectMetadataList = events.get(MetadataEvent.MetadataCase.CONNECT_METADATA.getNumber()).stream().map(MetadataEvent::getConnectMetadata).collect(Collectors.toList())).size() > 0) {
            this.deleteConnectMetadata(tenant, connectMetadataList, newTime);
        }
        if ((pipelineMetadataList = events.get(MetadataEvent.MetadataCase.PIPELINE_METADATA.getNumber()).stream().map(MetadataEvent::getPipelineMetadata).collect(Collectors.toList())).size() > 0) {
            this.deletePipelineMetadata(tenant, pipelineMetadataList, newTime);
        }
        if ((environmentMetadataList = events.get(MetadataEvent.MetadataCase.ENVIRONMENT_METADATA.getNumber()).stream().map(MetadataEvent::getEnvironmentMetadata).collect(Collectors.toList())).size() > 0) {
            this.deleteEnvironmentMetadata(tenant, environmentMetadataList, newTime);
        }
    }

    private void createOrUpdateTopicMetadata(String tenant, String clusterId, List<TopicMetadata> topicMetadataList, Long newTime) {
        try {
            AtlasEntity.AtlasEntityWithExtInfo cluster = this.maybeCreateDummyKafkaCluster(tenant, clusterId);
            ArrayList<AtlasEntity.AtlasEntityWithExtInfo> topics = new ArrayList<AtlasEntity.AtlasEntityWithExtInfo>();
            for (TopicMetadata topicMetadata : topicMetadataList) {
                AtlasEntity.AtlasEntityWithExtInfo topic;
                String topicQualifiedName = QualifiedNameGenerator.getQualifiedName(tenant, clusterId, topicMetadata.getTopicName());
                Long oldTime = (Long)this.entityTimeCache.getIfPresent((Object)topicQualifiedName);
                if (oldTime != null && newTime != null && newTime < oldTime) {
                    if (topicMetadata.getCreateTime().getSeconds() != 0L) {
                        topic = this.newTopicWithCreateTime(tenant, topicMetadata.getTopicName(), topicQualifiedName, new Date(Timestamps.toMillis((Timestamp)topicMetadata.getCreateTime())));
                        topics.add(topic);
                        log.info("Update the topic createTime from out-of-order create event {}", (Object)topicQualifiedName);
                    }
                    Instant instant = Instant.ofEpochMilli(newTime);
                    log.warn("Discarding event with time {} qualifiedName {}", (Object)instant, (Object)topicQualifiedName);
                    continue;
                }
                topic = this.newTopic(tenant, clusterId, topicMetadata.getTopicName(), topicQualifiedName, topicMetadata, cluster.getEntity());
                topics.add(topic);
                if (newTime == null) continue;
                this.entityTimeCache.put((Object)topicQualifiedName, (Object)newTime);
            }
            this.metadataRegistry.createOrUpdateEntities(tenant, topics, this::newEntityHasChanges);
            this.metricsManager.recordIngestionTransform(1);
            this.metricsManager.recordIngestionTopicCreation(topics.size());
        }
        catch (AtlasBaseException e) {
            this.metricsManager.recordIngestionTransformError(1);
            this.metricsManager.recordIngestionTopicCreationError(1);
            log.error(String.format("Could not process topic creation metadata, cluster id %s", clusterId), (Throwable)e);
        }
    }

    private AtlasEntity.AtlasEntityWithExtInfo maybeCreateDummyTopic(String tenant, String kafkaClusterId, String topicName, AtlasEntity.AtlasEntityWithExtInfo cluster) {
        String topicQualifiedName = QualifiedNameGenerator.getQualifiedName(tenant, kafkaClusterId, topicName);
        AtlasEntity.AtlasEntityWithExtInfo topicEntity = this.newTopic(tenant, kafkaClusterId, topicName, topicQualifiedName, null, cluster.getEntity());
        String oldTopicGuid = null;
        try {
            oldTopicGuid = this.metadataRegistry.getGuid(tenant, ModelConstants.ENTITY_KAFKA_TOPIC, topicQualifiedName);
        }
        catch (AtlasBaseException atlasBaseException) {
            // empty catch block
        }
        if (oldTopicGuid == null) {
            try {
                this.metadataRegistry.createOrUpdateEntity(tenant, topicEntity);
            }
            catch (AtlasBaseException e) {
                log.error(String.format("Could not create topic, cluster id %s", kafkaClusterId), (Throwable)e);
            }
        }
        return topicEntity;
    }

    private AtlasEntity.AtlasEntityWithExtInfo newTopicWithCreateTime(String tenant, String name, String qualifiedName, Date createTime) {
        AtlasEntity entity = new AtlasEntity(ModelConstants.ENTITY_KAFKA_TOPIC);
        entity.setAttribute("tenant", (Object)tenant);
        entity.setAttribute("qualifiedName", (Object)qualifiedName);
        entity.setAttribute("name", (Object)name);
        entity.setAttribute("nameLower", (Object)name.toLowerCase(Locale.ROOT));
        entity.setAttribute("createTime", (Object)createTime);
        return new AtlasEntity.AtlasEntityWithExtInfo(entity);
    }

    private AtlasEntity.AtlasEntityWithExtInfo newTopic(String tenant, String kafkaClusterId, String topic, String qualifiedName, TopicMetadata topicMetadata, AtlasEntity cluster) {
        AtlasEntity entity = new AtlasEntity(ModelConstants.ENTITY_KAFKA_TOPIC);
        entity.setAttribute("tenant", (Object)tenant);
        entity.setAttribute("qualifiedName", (Object)qualifiedName);
        entity.setAttribute("name", (Object)topic);
        entity.setAttribute("nameLower", (Object)topic.toLowerCase(Locale.ROOT));
        entity.setAttribute("createTime", (Object)ModelConstants.UNIX_ZERO_EPOCH);
        entity.setAttribute("isInternal", (Object)topic.startsWith("_"));
        if (topicMetadata != null) {
            if (!topicMetadata.getOwner().isEmpty()) {
                entity.setAttribute("owner", (Object)topicMetadata.getOwner());
            }
            if (!topicMetadata.getTopicId().isEmpty()) {
                entity.setAttribute("id", (Object)topicMetadata.getTopicId());
            }
            if (!topicMetadata.getFollowerReplicationThrottledReplicasList().isEmpty()) {
                entity.setAttribute("followerReplicationThrottledReplicas", new ArrayList(topicMetadata.getFollowerReplicationThrottledReplicasList()));
            }
            if (!topicMetadata.getLeaderReplicationThrottledReplicasList().isEmpty()) {
                entity.setAttribute("leaderReplicationThrottledReplicas", new ArrayList(topicMetadata.getLeaderReplicationThrottledReplicasList()));
            }
            if (!topicMetadata.getMessageFormatVersion().isEmpty()) {
                entity.setAttribute("messageFormatVersion", (Object)topicMetadata.getMessageFormatVersion());
            }
            if (!topicMetadata.getMessageTimestampType().isEmpty()) {
                entity.setAttribute("messageTimestampType", (Object)topicMetadata.getMessageTimestampType());
            }
            if (topicMetadata.getCreateTime().getSeconds() != 0L) {
                entity.setAttribute("createTime", (Object)new Date(Timestamps.toMillis((Timestamp)topicMetadata.getCreateTime())));
            }
            if (topicMetadata.getUpdateTime().getSeconds() != 0L) {
                entity.setAttribute("updateTime", (Object)new Date(Timestamps.toMillis((Timestamp)topicMetadata.getUpdateTime())));
            }
            entity.setAttribute("retentionMs", (Object)topicMetadata.getRetentionMs());
            entity.setAttribute("retentionBytes", (Object)topicMetadata.getRetentionBytes());
            entity.setAttribute("replicationFactor", (Object)topicMetadata.getReplicationFactor());
            entity.setAttribute("partitionsCount", (Object)topicMetadata.getPartitionsCount());
            entity.setAttribute("cleanupPolicy", (Object)this.getCleanupPolicy(topicMetadata.getCleanupPolicy()));
            entity.setAttribute("keySchemaValidation", (Object)topicMetadata.getKeySchemaValidation());
            entity.setAttribute("valueSchemaValidation", (Object)topicMetadata.getValueSchemaValidation());
            entity.setAttribute("compressionType", (Object)this.getCompressionType(topicMetadata.getCompressionType()));
            entity.setAttribute("fileDeleteDelayMs", (Object)topicMetadata.getFileDeleteDelayMs());
            entity.setAttribute("flushMessages", (Object)topicMetadata.getFlushMessages());
            entity.setAttribute("flushMs", (Object)topicMetadata.getFlushMs());
            entity.setAttribute("indexIntervalBytes", (Object)topicMetadata.getIndexIntervalBytes());
            entity.setAttribute("maxCompactionLagMs", (Object)topicMetadata.getMaxCompactionLagMs());
            entity.setAttribute("maxMessageBytes", (Object)topicMetadata.getMaxMessageBytes());
            entity.setAttribute("messageDownconversionEnable", (Object)topicMetadata.getMessageDownconversionEnable());
            entity.setAttribute("messageTimestampDifferenceMaxMs", (Object)topicMetadata.getMessageTimestampDifferenceMaxMs());
            entity.setAttribute("minCleanableDirtyRatio", (Object)topicMetadata.getMinCleanableDirtyRatio());
            entity.setAttribute("minCompactionLagMs", (Object)topicMetadata.getMinCompactionLagMs());
            entity.setAttribute("minInsyncReplicas", (Object)topicMetadata.getMinInsyncReplicas());
            entity.setAttribute("preallocate", (Object)topicMetadata.getPreallocate());
            entity.setAttribute("segmentBytes", (Object)topicMetadata.getSegmentBytes());
            entity.setAttribute("segmentIndexBytes", (Object)topicMetadata.getSegmentIndexBytes());
            entity.setAttribute("segmentJitterMs", (Object)topicMetadata.getSegmentJitterMs());
            entity.setAttribute("segmentMs", (Object)topicMetadata.getSegmentMs());
            entity.setAttribute("uncleanLeaderElectionEnable", (Object)topicMetadata.getUncleanLeaderElectionEnable());
            entity.setAttribute("deleteRetentionMs", (Object)topicMetadata.getDeleteRetentionMs());
            if (!topicMetadata.getMirrorTopicMetadata().getLinkId().isEmpty()) {
                String clusterLinkId = topicMetadata.getMirrorTopicMetadata().getLinkId();
                String clusterLinkName = topicMetadata.getMirrorTopicMetadata().getLinkName();
                AtlasEntity.AtlasEntityWithExtInfo clusterLinkEntity = this.maybeCreateDummyClusterLink(tenant, kafkaClusterId, clusterLinkId, clusterLinkName);
                entity.setRelationshipAttribute("cluster_link", (Object)MetadataRegistry.toRelatedObjectIdWithUniqAttr(clusterLinkEntity.getEntity()));
            }
            String sourceTopicName = topicMetadata.getMirrorTopicMetadata().getSourceTopicName();
            if (!topicMetadata.getMirrorTopicMetadata().getRemoteClusterId().isEmpty()) {
                String remoteKafkaClusterId = topicMetadata.getMirrorTopicMetadata().getRemoteClusterId();
                String remoteKafkaClusterQualifiedName = QualifiedNameGenerator.getQualifiedName(tenant, remoteKafkaClusterId);
                AtlasEntity.AtlasEntityWithExtInfo oldRemoteClusterEntity = null;
                try {
                    oldRemoteClusterEntity = this.metadataRegistry.getEntity(tenant, ModelConstants.ENTITY_KAFKA_LOGICAL_CLUSTER, remoteKafkaClusterQualifiedName, true, true);
                }
                catch (AtlasBaseException atlasBaseException) {
                    // empty catch block
                }
                if (oldRemoteClusterEntity != null) {
                    if (!sourceTopicName.isEmpty()) {
                        AtlasEntity.AtlasEntityWithExtInfo sourceTopicEntity = this.maybeCreateDummyTopic(tenant, remoteKafkaClusterId, sourceTopicName, oldRemoteClusterEntity);
                        entity.setRelationshipAttribute("source_topic", (Object)MetadataRegistry.toRelatedObjectIdWithUniqAttr(sourceTopicEntity.getEntity()));
                    }
                } else {
                    if (!sourceTopicName.isEmpty()) {
                        entity.setAttribute("externalSourceTopicName", (Object)sourceTopicName);
                    }
                    if (!topicMetadata.getMirrorTopicMetadata().getSourceTopicId().isEmpty()) {
                        entity.setAttribute("externalSourceTopicId", (Object)topicMetadata.getMirrorTopicMetadata().getSourceTopicId());
                    }
                }
            } else {
                if (!sourceTopicName.isEmpty()) {
                    entity.setAttribute("externalSourceTopicName", (Object)sourceTopicName);
                }
                if (!topicMetadata.getMirrorTopicMetadata().getSourceTopicId().isEmpty()) {
                    entity.setAttribute("externalSourceTopicId", (Object)topicMetadata.getMirrorTopicMetadata().getSourceTopicId());
                }
            }
            if (!topicMetadata.getMirrorTopicMetadata().getMirrorTopicState().isEmpty()) {
                entity.setAttribute("mirrorTopicState", (Object)topicMetadata.getMirrorTopicMetadata().getMirrorTopicState());
            }
            if (topicMetadata.getMirrorTopicMetadata().getUpdateTime().getSeconds() != 0L) {
                entity.setAttribute("mirrorTopicUpdateTime", (Object)new Date(Timestamps.toMillis((Timestamp)topicMetadata.getMirrorTopicMetadata().getUpdateTime())));
            }
        }
        if (cluster != null) {
            Date clusterDeprecatedTime = MetadataRegistry.getDateAttribute(cluster.getAttribute("deprecatedTime"));
            if (clusterDeprecatedTime != null && !clusterDeprecatedTime.equals(ModelConstants.UNIX_ZERO_EPOCH)) {
                entity.setAttribute("deprecatedTime", (Object)clusterDeprecatedTime);
            }
            entity.setRelationshipAttribute("logical_cluster", (Object)MetadataRegistry.toRelatedObjectIdWithUniqAttr(cluster));
        }
        return new AtlasEntity.AtlasEntityWithExtInfo(entity);
    }

    private boolean newEntityHasChanges(AtlasEntity.AtlasEntityWithExtInfo oldEntity, AtlasEntity.AtlasEntityWithExtInfo newEntity) {
        boolean isNewNull;
        boolean isOldNull = oldEntity == null || oldEntity.getEntity() == null;
        boolean bl = isNewNull = newEntity == null || newEntity.getEntity() == null;
        if (isNewNull) {
            return false;
        }
        if (isOldNull) {
            return true;
        }
        Map oldAttrs = oldEntity.getEntity().getAttributes();
        Map newAttrs = newEntity.getEntity().getAttributes();
        for (Map.Entry entry : oldAttrs.entrySet()) {
            String attrName = (String)entry.getKey();
            Object oldAttrValue = entry.getValue();
            Object newAttrValue = newAttrs.get(attrName);
            if (newAttrValue == null || newAttrValue.equals(oldAttrValue)) continue;
            return true;
        }
        return false;
    }

    private void deleteTopicMetadata(String tenant, String clusterId, List<TopicMetadata> topicMetadataList, Long newTime) {
        for (TopicMetadata topicMetadata : topicMetadataList) {
            String topicQualifiedName = QualifiedNameGenerator.getQualifiedName(tenant, clusterId, topicMetadata.getTopicName());
            Long oldTime = (Long)this.entityTimeCache.getIfPresent((Object)topicQualifiedName);
            if (oldTime != null && newTime != null && newTime < oldTime) {
                Instant instant = Instant.ofEpochMilli(newTime);
                log.warn("Discarding event with time {}", (Object)instant);
                continue;
            }
            this.deleteTopicMetadata(tenant, clusterId, topicMetadata);
            if (newTime == null) continue;
            this.entityTimeCache.put((Object)topicQualifiedName, (Object)newTime);
        }
    }

    private void deleteTopicMetadata(String tenant, String clusterId, TopicMetadata topicMetadata) {
        try {
            String topicQualifiedName = QualifiedNameGenerator.getQualifiedName(tenant, clusterId, topicMetadata.getTopicName());
            this.metadataRegistry.deleteEntity(tenant, ModelConstants.ENTITY_KAFKA_TOPIC, topicQualifiedName);
            this.metadataRegistry.purgeEntity(tenant, ModelConstants.ENTITY_KAFKA_TOPIC, topicQualifiedName);
            this.metricsManager.recordIngestionTransform(1);
            this.metricsManager.recordIngestionTopicDeletion(1);
        }
        catch (AtlasBaseException e) {
            this.metricsManager.recordIngestionTransformError(1);
            this.metricsManager.recordIngestionTopicDeletionError(1);
            log.error(String.format("Could not process topic deletion metadata, cluster id %s", clusterId), (Throwable)e);
        }
    }

    private void createOrUpdateClusterLinkMetadata(String tenant, String source, List<ClusterLinkMetadata> clusterLinkMetadatas, Long newTime) {
        try {
            for (ClusterLinkMetadata clusterLinkMetadata : clusterLinkMetadatas) {
                String clusterLinkQualifiedName = QualifiedNameGenerator.getQualifiedName(tenant, source, clusterLinkMetadata.getClusterLinkId());
                Long oldTime = (Long)this.entityTimeCache.getIfPresent((Object)clusterLinkQualifiedName);
                if (oldTime != null && newTime != null && newTime < oldTime) {
                    Instant instant = Instant.ofEpochMilli(newTime);
                    log.warn("Discarding cluster link creation event with time {}", (Object)instant);
                    continue;
                }
                String localKafkaClusterId = clusterLinkMetadata.getLocalClusterId();
                AtlasEntity.AtlasEntityWithExtInfo localClusterEntity = this.maybeCreateDummyKafkaCluster(tenant, localKafkaClusterId);
                String remoteKafkaClusterId = clusterLinkMetadata.getRemoteClusterId();
                String remoteKafkaClusterQualifiedName = QualifiedNameGenerator.getQualifiedName(tenant, remoteKafkaClusterId);
                AtlasEntity.AtlasEntityWithExtInfo oldRemoteClusterEntity = null;
                try {
                    oldRemoteClusterEntity = this.metadataRegistry.getEntity(tenant, ModelConstants.ENTITY_KAFKA_LOGICAL_CLUSTER, remoteKafkaClusterQualifiedName, true, true);
                }
                catch (AtlasBaseException atlasBaseException) {
                    // empty catch block
                }
                AtlasEntity.AtlasEntityWithExtInfo clusterLink = this.newClusterLink(tenant, clusterLinkMetadata.getClusterLinkId(), clusterLinkMetadata.getClusterLinkName(), clusterLinkQualifiedName, clusterLinkMetadata, localClusterEntity, oldRemoteClusterEntity);
                if (newTime != null) {
                    this.entityTimeCache.put((Object)clusterLinkQualifiedName, (Object)newTime);
                }
                this.metadataRegistry.createOrUpdateEntity(tenant, clusterLink);
                this.metricsManager.recordIngestionClusterLinkCreation(1);
            }
            this.metricsManager.recordIngestionTransform(1);
        }
        catch (AtlasBaseException e) {
            this.metricsManager.recordIngestionTransformError(1);
            this.metricsManager.recordIngestionClusterLinkCreationError(1);
            log.error("Could not process cluster link creation metadata", (Throwable)e);
        }
    }

    private AtlasEntity.AtlasEntityWithExtInfo maybeCreateDummyClusterLink(String tenant, String kafkaClusterId, String clusterLinkId, String clusterLinkName) {
        String clusterLinkQualifiedName = QualifiedNameGenerator.getQualifiedName(tenant, kafkaClusterId, clusterLinkId);
        AtlasEntity.AtlasEntityWithExtInfo clusterLinkEntity = this.newClusterLink(tenant, clusterLinkId, clusterLinkName, clusterLinkQualifiedName, null, null, null);
        String oldClusterLinkGuid = null;
        try {
            oldClusterLinkGuid = this.metadataRegistry.getGuid(tenant, ModelConstants.ENTITY_KAFKA_CLUSTER_LINK, clusterLinkQualifiedName);
        }
        catch (AtlasBaseException atlasBaseException) {
            // empty catch block
        }
        if (oldClusterLinkGuid == null) {
            try {
                this.metadataRegistry.createOrUpdateEntity(tenant, clusterLinkEntity);
            }
            catch (AtlasBaseException e) {
                log.warn("Could not create cluster link {}", (Object)clusterLinkEntity);
            }
        }
        return clusterLinkEntity;
    }

    private AtlasEntity.AtlasEntityWithExtInfo newClusterLink(String tenant, String clusterLinkId, String clusterLinkName, String qualifiedName, ClusterLinkMetadata clusterLinkMetadata, AtlasEntity.AtlasEntityWithExtInfo localCluster, AtlasEntity.AtlasEntityWithExtInfo remoteCluster) {
        AtlasEntity entity = new AtlasEntity(ModelConstants.ENTITY_KAFKA_CLUSTER_LINK);
        entity.setAttribute("tenant", (Object)tenant);
        entity.setAttribute("qualifiedName", (Object)qualifiedName);
        entity.setAttribute("name", (Object)clusterLinkName);
        entity.setAttribute("nameLower", (Object)clusterLinkName.toLowerCase(Locale.ROOT));
        entity.setAttribute("createTime", (Object)ModelConstants.UNIX_ZERO_EPOCH);
        if (clusterLinkMetadata != null) {
            if (!clusterLinkMetadata.getClusterLinkId().isEmpty()) {
                entity.setAttribute("id", (Object)clusterLinkMetadata.getClusterLinkId());
            }
            if (!clusterLinkMetadata.getLinkMode().isEmpty()) {
                entity.setAttribute("linkMode", (Object)clusterLinkMetadata.getLinkMode());
            }
            if (!clusterLinkMetadata.getConnectionMode().isEmpty()) {
                entity.setAttribute("connectionMode", (Object)clusterLinkMetadata.getConnectionMode());
            }
            if (clusterLinkMetadata.getCreateTime().getSeconds() != 0L) {
                entity.setAttribute("createTime", (Object)new Date(Timestamps.toMillis((Timestamp)clusterLinkMetadata.getCreateTime())));
            }
            if (clusterLinkMetadata.getUpdateTime().getSeconds() != 0L) {
                entity.setAttribute("updateTime", (Object)new Date(Timestamps.toMillis((Timestamp)clusterLinkMetadata.getUpdateTime())));
            }
            if (localCluster != null) {
                entity.setRelationshipAttribute("destination_cluster", (Object)MetadataRegistry.toRelatedObjectIdWithUniqAttr(localCluster.getEntity()));
            } else {
                log.warn("Local kafka cluster entity doesn't exist when creating cluster link {}", (Object)qualifiedName);
            }
            if (remoteCluster != null) {
                entity.setRelationshipAttribute("source_cluster", (Object)MetadataRegistry.toRelatedObjectIdWithUniqAttr(remoteCluster.getEntity()));
            } else {
                entity.setAttribute("externalSourceCluster", (Object)clusterLinkMetadata.getRemoteClusterId());
            }
        } else if (clusterLinkId != null) {
            entity.setAttribute("id", (Object)clusterLinkId);
        }
        return new AtlasEntity.AtlasEntityWithExtInfo(entity);
    }

    private void deleteClusterLinkMetadata(String tenant, String kafkaClusterId, List<ClusterLinkMetadata> clusterLinkMetadatas, Long newTime) {
        for (ClusterLinkMetadata clusterLinkMetadata : clusterLinkMetadatas) {
            String clusterLinkQualifiedName = QualifiedNameGenerator.getQualifiedName(tenant, kafkaClusterId, clusterLinkMetadata.getClusterLinkId());
            Long oldTime = (Long)this.entityTimeCache.getIfPresent((Object)clusterLinkQualifiedName);
            if (oldTime != null && newTime != null && newTime < oldTime) {
                Instant instant = Instant.ofEpochMilli(newTime);
                log.warn("Discarding cluster link deletion event with time {}", (Object)instant);
                continue;
            }
            this.deleteClusterLinkMetadata(tenant, kafkaClusterId, clusterLinkMetadata);
            if (newTime == null) continue;
            this.entityTimeCache.put((Object)clusterLinkQualifiedName, (Object)newTime);
        }
    }

    private void deleteClusterLinkMetadata(String tenant, String kafkaClusterId, ClusterLinkMetadata clusterLinkMetadata) {
        try {
            String clusterLinkQualifiedName = QualifiedNameGenerator.getQualifiedName(tenant, kafkaClusterId, clusterLinkMetadata.getClusterLinkId());
            this.metadataRegistry.deleteEntity(tenant, ModelConstants.ENTITY_KAFKA_CLUSTER_LINK, clusterLinkQualifiedName);
            this.metadataRegistry.purgeEntity(tenant, ModelConstants.ENTITY_KAFKA_CLUSTER_LINK, clusterLinkQualifiedName);
            this.metricsManager.recordIngestionTransform(1);
            this.metricsManager.recordIngestionClusterLinkDeletion(1);
        }
        catch (AtlasBaseException e) {
            this.metricsManager.recordIngestionTransformError(1);
            this.metricsManager.recordIngestionClusterLinkDeletionError(1);
            log.error("Could not process cluster link deletion metadata" + clusterLinkMetadata, (Throwable)e);
        }
    }

    private void createOrUpdateLogicalClusterMetadata(String tenant, List<LogicalClusterMetadata> logicalClusterMetadataList, Long newTime) {
        for (LogicalClusterMetadata logicalClusterMetadata : logicalClusterMetadataList) {
            String logicalClusterQualifiedName = QualifiedNameGenerator.getQualifiedName(tenant, logicalClusterMetadata.getClusterId());
            Long oldTime = (Long)this.entityTimeCache.getIfPresent((Object)logicalClusterQualifiedName);
            if (oldTime != null && newTime != null && newTime < oldTime) {
                Instant instant = Instant.ofEpochMilli(newTime);
                log.warn("Discarding logical cluster creation event with time {}", (Object)instant);
                continue;
            }
            try {
                AtlasEntity.AtlasEntityWithExtInfo logicalCluster = this.newKafkaCluster(tenant, logicalClusterMetadata.getClusterId(), logicalClusterMetadata.getName(), logicalClusterQualifiedName, logicalClusterMetadata);
                this.metadataRegistry.createOrUpdateEntity(tenant, logicalCluster);
                this.metricsManager.recordIngestionClusterCreation(1);
            }
            catch (AtlasBaseException e) {
                this.metricsManager.recordIngestionTransformError(1);
                this.metricsManager.recordIngestionClusterCreationError(1);
                log.error(String.format("Could not process logical cluster creation event %s", logicalClusterMetadata), (Throwable)e);
            }
            if (newTime == null) continue;
            this.entityTimeCache.put((Object)logicalClusterQualifiedName, (Object)newTime);
        }
        this.metricsManager.recordIngestionTransform(1);
    }

    private AtlasEntity.AtlasEntityWithExtInfo maybeCreateDummyKafkaCluster(String tenant, String kafkaClusterId) {
        String clusterQualifiedName = QualifiedNameGenerator.getQualifiedName(tenant, kafkaClusterId);
        AtlasEntity.AtlasEntityWithExtInfo cluster = null;
        try {
            cluster = this.metadataRegistry.getEntity(tenant, ModelConstants.ENTITY_KAFKA_LOGICAL_CLUSTER, clusterQualifiedName, true, true);
        }
        catch (AtlasBaseException atlasBaseException) {
            // empty catch block
        }
        if (cluster == null) {
            try {
                cluster = this.newKafkaCluster(tenant, kafkaClusterId, "", clusterQualifiedName, null);
                this.metadataRegistry.createOrUpdateEntity(tenant, cluster);
            }
            catch (AtlasBaseException e) {
                log.error(String.format("Could not create kafka logical cluster %s, qualifiedName %s", kafkaClusterId, clusterQualifiedName), (Throwable)e);
            }
        }
        return cluster;
    }

    private AtlasEntity.AtlasEntityWithExtInfo newKafkaCluster(String tenant, String clusterId, String clusterName, String qualifiedName, LogicalClusterMetadata logicalClusterMetadata) throws AtlasBaseException {
        AtlasEntity entity = new AtlasEntity(ModelConstants.ENTITY_KAFKA_LOGICAL_CLUSTER);
        entity.setAttribute("tenant", (Object)tenant);
        entity.setAttribute("qualifiedName", (Object)qualifiedName);
        entity.setAttribute("name", (Object)clusterName);
        entity.setAttribute("nameLower", (Object)clusterName.toLowerCase(Locale.ROOT));
        entity.setAttribute("createTime", (Object)ModelConstants.UNIX_ZERO_EPOCH);
        entity.setAttribute("deprecatedTime", (Object)ModelConstants.UNIX_ZERO_EPOCH);
        if (logicalClusterMetadata != null) {
            if (logicalClusterMetadata.getClusterId().isEmpty()) {
                throw new AtlasBaseException(AtlasErrorCode.MISSING_MANDATORY_ATTRIBUTE, new String[]{ModelConstants.ENTITY_KAFKA_LOGICAL_CLUSTER, "clusterId"});
            }
            entity.setAttribute("id", (Object)logicalClusterMetadata.getClusterId());
            entity.setAttribute("status", (Object)logicalClusterMetadata.getClusterStatus().name());
            entity.setAttribute("sku", (Object)logicalClusterMetadata.getSku().name());
            entity.setAttribute("provider", (Object)logicalClusterMetadata.getCloud().name());
            if (!logicalClusterMetadata.getRegion().isEmpty()) {
                entity.setAttribute("region", (Object)logicalClusterMetadata.getRegion());
            }
            entity.setAttribute("availability", (Object)logicalClusterMetadata.getAvailability().name());
            entity.setAttribute("cku", (Object)logicalClusterMetadata.getCku());
            if (!logicalClusterMetadata.getSelectedNetworkType().isEmpty()) {
                entity.setAttribute("network", (Object)logicalClusterMetadata.getSelectedNetworkType());
            }
            if (logicalClusterMetadata.getCreated().getSeconds() != 0L) {
                entity.setAttribute("createTime", (Object)new Date(Timestamps.toMillis((Timestamp)logicalClusterMetadata.getCreated())));
            }
            if (logicalClusterMetadata.getModified().getSeconds() != 0L) {
                entity.setAttribute("updateTime", (Object)new Date(Timestamps.toMillis((Timestamp)logicalClusterMetadata.getModified())));
            }
            if (logicalClusterMetadata.getDeactivated().getSeconds() != 0L) {
                entity.setAttribute("deactivateTime", (Object)new Date(Timestamps.toMillis((Timestamp)logicalClusterMetadata.getDeactivated())));
                entity.setAttribute("deprecatedTime", (Object)new Date(Timestamps.toMillis((Timestamp)logicalClusterMetadata.getDeactivated())));
            }
        } else if (clusterId != null && !clusterId.isEmpty()) {
            entity.setAttribute("id", (Object)clusterId);
        } else {
            throw new AtlasBaseException(AtlasErrorCode.MISSING_MANDATORY_ATTRIBUTE, new String[]{ModelConstants.ENTITY_KAFKA_LOGICAL_CLUSTER, "clusterId"});
        }
        return new AtlasEntity.AtlasEntityWithExtInfo(entity);
    }

    private void deleteLogicalClusterMetadata(String tenant, List<LogicalClusterMetadata> logicalClusterMetadataList, Long newTime) {
        for (LogicalClusterMetadata logicalClusterMetadata : logicalClusterMetadataList) {
            String logicalClusterQualifiedName = QualifiedNameGenerator.getQualifiedName(tenant, logicalClusterMetadata.getClusterId());
            Long oldTime = (Long)this.entityTimeCache.getIfPresent((Object)logicalClusterQualifiedName);
            if (oldTime != null && newTime != null && newTime < oldTime) {
                Instant instant = Instant.ofEpochMilli(newTime);
                log.warn("Discarding event with time {}", (Object)instant);
                continue;
            }
            this.deprecateKafkaClusterMetadata(tenant, logicalClusterMetadata);
            if (newTime == null) continue;
            this.entityTimeCache.put((Object)logicalClusterQualifiedName, (Object)newTime);
        }
    }

    private void deprecateKafkaClusterMetadata(String tenant, LogicalClusterMetadata logicalClusterMetadata) {
        String logicalClusterQualifiedName = QualifiedNameGenerator.getQualifiedName(tenant, logicalClusterMetadata.getClusterId());
        log.info(String.format("Deprecate logical kafka cluster %s", logicalClusterQualifiedName));
        try {
            if (logicalClusterMetadata.getDeactivated().getSeconds() == 0L) {
                throw new AtlasBaseException(String.format("The deactivate time is empty for logical cluster delete event, cluster id %s", logicalClusterMetadata.getClusterId()));
            }
            AtlasEntity.AtlasEntityWithExtInfo logicalCluster = this.newKafkaCluster(tenant, logicalClusterMetadata.getClusterId(), logicalClusterMetadata.getName(), logicalClusterQualifiedName, logicalClusterMetadata);
            this.metadataRegistry.createOrUpdateEntity(tenant, logicalCluster);
            this.metricsManager.recordIngestionTransform(1);
            this.metricsManager.recordIngestionClusterDeletion(1);
        }
        catch (AtlasBaseException e) {
            log.error(String.format("Error when deprecating kafka cluster metadata in catalog with logicalClusterQualifiedName %s", logicalClusterQualifiedName), (Throwable)e);
            this.metricsManager.recordIngestionTransformError(1);
            this.metricsManager.recordIngestionClusterDeletionError(1);
        }
    }

    private void createOrUpdateConnectMetadata(String tenant, List<ConnectMetadata> connectMetadatas, Long newTime) {
        try {
            for (ConnectMetadata connectMetadata : connectMetadatas) {
                String connectorQualifiedName = QualifiedNameGenerator.getQualifiedName(tenant, connectMetadata.getClusterId());
                Long oldTime = (Long)this.entityTimeCache.getIfPresent((Object)connectorQualifiedName);
                if (oldTime != null && newTime != null && newTime < oldTime) {
                    Instant instant = Instant.ofEpochMilli(newTime);
                    log.warn("Discarding event with time {}", (Object)instant);
                    continue;
                }
                String kafkaClusterId = connectMetadata.getKafkaClusterId();
                AtlasEntity.AtlasEntityWithExtInfo kafkaCluster = this.maybeCreateDummyKafkaCluster(tenant, kafkaClusterId);
                ArrayList<AtlasEntity> topics = new ArrayList<AtlasEntity>();
                for (String topic : connectMetadata.getTopicsList()) {
                    AtlasEntity.AtlasEntityWithExtInfo topicEntity = this.maybeCreateDummyTopic(tenant, kafkaClusterId, topic, kafkaCluster);
                    topics.add(topicEntity.getEntity());
                }
                AtlasEntity.AtlasEntityWithExtInfo connector = this.newConnector(tenant, connectMetadata.getClusterId(), connectMetadata.getName(), connectorQualifiedName, connectMetadata, topics);
                if (newTime != null) {
                    this.entityTimeCache.put((Object)connectorQualifiedName, (Object)newTime);
                }
                this.metadataRegistry.createOrUpdateEntity(tenant, connector);
                this.metricsManager.recordIngestionConnectorCreation(1);
            }
            this.metricsManager.recordIngestionTransform(1);
        }
        catch (AtlasBaseException e) {
            this.metricsManager.recordIngestionTransformError(1);
            this.metricsManager.recordIngestionConnectorCreationError(1);
            log.error("Could not process connector creation metadata", (Throwable)e);
        }
    }

    private AtlasEntity.AtlasEntityWithExtInfo maybeCreateDummyConnector(String tenant, String connectorClusterId) {
        String connectorQualifiedName = QualifiedNameGenerator.getQualifiedName(tenant, connectorClusterId);
        AtlasEntity.AtlasEntityWithExtInfo connectorEntity = this.newConnector(tenant, connectorClusterId, "", connectorQualifiedName, null, null);
        String oldConnectorGuid = null;
        try {
            oldConnectorGuid = this.metadataRegistry.getGuid(tenant, ModelConstants.ENTITY_CN_CONNECTOR, connectorQualifiedName);
        }
        catch (AtlasBaseException atlasBaseException) {
            // empty catch block
        }
        if (oldConnectorGuid == null) {
            try {
                this.metadataRegistry.createOrUpdateEntity(tenant, connectorEntity);
            }
            catch (AtlasBaseException e) {
                log.warn("Could not create connector {}", (Object)connectorQualifiedName);
            }
        }
        return connectorEntity;
    }

    private AtlasEntity.AtlasEntityWithExtInfo newConnector(String tenant, String connectorClusterId, String connectorName, String qualifiedName, ConnectMetadata connectMetadata, List<AtlasEntity> topics) {
        AtlasEntity entity = new AtlasEntity(ModelConstants.ENTITY_CN_CONNECTOR);
        entity.setAttribute("tenant", (Object)tenant);
        entity.setAttribute("qualifiedName", (Object)qualifiedName);
        entity.setAttribute("name", (Object)connectorName);
        entity.setAttribute("nameLower", (Object)connectorName.toLowerCase(Locale.ROOT));
        entity.setAttribute("createTime", (Object)ModelConstants.UNIX_ZERO_EPOCH);
        if (connectMetadata != null) {
            if (!connectMetadata.getClusterId().isEmpty()) {
                entity.setAttribute("clusterId", (Object)connectMetadata.getClusterId());
            }
            if (!connectMetadata.getClass_().isEmpty()) {
                entity.setAttribute("class", (Object)connectMetadata.getClass_());
            }
            if (!connectMetadata.getOwner().isEmpty()) {
                entity.setAttribute("owner", (Object)connectMetadata.getOwner());
            }
            if (connectMetadata.getCreateTime().getSeconds() != 0L) {
                entity.setAttribute("createTime", (Object)new Date(Timestamps.toMillis((Timestamp)connectMetadata.getCreateTime())));
            }
            if (connectMetadata.getUpdateTime().getSeconds() != 0L) {
                entity.setAttribute("updateTime", (Object)new Date(Timestamps.toMillis((Timestamp)connectMetadata.getUpdateTime())));
            }
            entity.setAttribute("type", (Object)connectMetadata.getType().name());
            entity.setAttribute("tasksMax", (Object)connectMetadata.getTasksMax());
            entity.setAttribute("kafkaAuthMode", (Object)connectMetadata.getKafkaAuthMode().name());
            if (!connectMetadata.getKafkaServiceAccountId().isEmpty()) {
                entity.setAttribute("kafkaServiceAccountId", (Object)connectMetadata.getKafkaServiceAccountId());
            }
            if (!connectMetadata.getKafkaApiKey().isEmpty()) {
                entity.setAttribute("kafkaApiKey", (Object)connectMetadata.getKafkaApiKey());
            }
            if (!connectMetadata.getDlqTopic().isEmpty()) {
                entity.setAttribute("dlqTopic", (Object)connectMetadata.getDlqTopic());
            }
            if (!connectMetadata.getInputFormat().isEmpty()) {
                entity.setAttribute("inputFormat", (Object)connectMetadata.getInputFormat());
            }
            if (!connectMetadata.getOutputFormat().isEmpty()) {
                entity.setAttribute("outputFormat", (Object)connectMetadata.getOutputFormat());
            }
            if (!connectMetadata.getSourceSchema().isEmpty()) {
                entity.setAttribute("sourceSchema", (Object)connectMetadata.getSourceSchema());
            }
            if (!connectMetadata.getSourceEndpoint().isEmpty()) {
                entity.setAttribute("sourceEndpoint", (Object)connectMetadata.getSourceEndpoint());
            }
        } else if (connectorClusterId != null) {
            entity.setAttribute("clusterId", (Object)connectorClusterId);
        }
        if (topics != null) {
            entity.setRelationshipAttribute("topics", topics.stream().map(MetadataRegistry::toRelatedObjectIdWithUniqAttr).collect(Collectors.toList()));
        }
        return new AtlasEntity.AtlasEntityWithExtInfo(entity);
    }

    private void deleteConnectMetadata(String tenant, List<ConnectMetadata> connectMetadatas, Long newTime) {
        for (ConnectMetadata connectMetadata : connectMetadatas) {
            String connectorQualifiedName = QualifiedNameGenerator.getQualifiedName(tenant, connectMetadata.getClusterId());
            Long oldTime = (Long)this.entityTimeCache.getIfPresent((Object)connectorQualifiedName);
            if (oldTime != null && newTime != null && newTime < oldTime) {
                Instant instant = Instant.ofEpochMilli(newTime);
                log.warn("Discarding event with time {}", (Object)instant);
                continue;
            }
            this.deleteConnectMetadata(tenant, connectMetadata);
            if (newTime == null) continue;
            this.entityTimeCache.put((Object)connectorQualifiedName, (Object)newTime);
        }
    }

    private void deleteConnectMetadata(String tenant, ConnectMetadata connectMetadata) {
        try {
            String connectorQualifiedName = QualifiedNameGenerator.getQualifiedName(tenant, connectMetadata.getClusterId());
            this.metadataRegistry.deleteEntity(tenant, ModelConstants.ENTITY_CN_CONNECTOR, connectorQualifiedName);
            this.metadataRegistry.purgeEntity(tenant, ModelConstants.ENTITY_CN_CONNECTOR, connectorQualifiedName);
            this.metricsManager.recordIngestionTransform(1);
            this.metricsManager.recordIngestionConnectorDeletion(1);
        }
        catch (AtlasBaseException e) {
            this.metricsManager.recordIngestionTransformError(1);
            this.metricsManager.recordIngestionConnectorDeletionError(1);
            log.error("Could not process connector deletion metadata" + connectMetadata, (Throwable)e);
        }
    }

    private AtlasEntity.AtlasEntityWithExtInfo maybeCreateKsqlCluster(String tenant, String ksqlClusterId) {
        String clusterQualifiedName = QualifiedNameGenerator.getQualifiedName(tenant, ksqlClusterId);
        AtlasEntity.AtlasEntityWithExtInfo cluster = this.newKsqlCluster(tenant, ksqlClusterId, clusterQualifiedName);
        String oldClusterGuid = null;
        try {
            oldClusterGuid = this.metadataRegistry.getGuid(tenant, ModelConstants.ENTITY_KSQL_LOGICAL_CLUSTER, clusterQualifiedName);
        }
        catch (AtlasBaseException atlasBaseException) {
            // empty catch block
        }
        if (oldClusterGuid == null) {
            try {
                this.metadataRegistry.createOrUpdateEntity(tenant, cluster);
            }
            catch (AtlasBaseException e) {
                log.warn("Could not create ksql logical cluster {}", (Object)clusterQualifiedName);
            }
        }
        return cluster;
    }

    private AtlasEntity.AtlasEntityWithExtInfo newKsqlCluster(String tenant, String cluster, String qualifiedName) {
        AtlasEntity entity = new AtlasEntity(ModelConstants.ENTITY_KSQL_LOGICAL_CLUSTER);
        entity.setAttribute("tenant", (Object)tenant);
        entity.setAttribute("qualifiedName", (Object)qualifiedName);
        entity.setAttribute("name", (Object)cluster);
        entity.setAttribute("nameLower", (Object)cluster.toLowerCase(Locale.ROOT));
        entity.setAttribute("createTime", (Object)ModelConstants.UNIX_ZERO_EPOCH);
        entity.setAttribute("id", (Object)cluster);
        return new AtlasEntity.AtlasEntityWithExtInfo(entity);
    }

    private AtlasEntity.AtlasEntityWithExtInfo maybeCreateDummyStream(String tenant, String ksqlClusterId, String streamName, AtlasEntity.AtlasEntityWithExtInfo ksqlCluster) {
        String streamQualifiedName = QualifiedNameGenerator.getQualifiedName(tenant, ksqlClusterId, streamName);
        AtlasEntity.AtlasEntityWithExtInfo streamEntity = this.newStream(tenant, streamName, streamQualifiedName, ksqlCluster.getEntity());
        String oldStreamGuid = null;
        try {
            oldStreamGuid = this.metadataRegistry.getGuid(tenant, ModelConstants.ENTITY_KSQL_STREAM, streamQualifiedName);
        }
        catch (AtlasBaseException atlasBaseException) {
            // empty catch block
        }
        if (oldStreamGuid == null) {
            try {
                this.metadataRegistry.createOrUpdateEntity(tenant, streamEntity);
            }
            catch (AtlasBaseException e) {
                log.warn("Could not create ksql stream {}", (Object)streamQualifiedName);
            }
        }
        return streamEntity;
    }

    private AtlasEntity.AtlasEntityWithExtInfo newStream(String tenant, String streamName, String qualifiedName, AtlasEntity cluster) {
        AtlasEntity entity = new AtlasEntity(ModelConstants.ENTITY_KSQL_STREAM);
        entity.setAttribute("tenant", (Object)tenant);
        entity.setAttribute("qualifiedName", (Object)qualifiedName);
        entity.setAttribute("name", (Object)streamName);
        entity.setAttribute("nameLower", (Object)streamName.toLowerCase(Locale.ROOT));
        entity.setAttribute("createTime", (Object)ModelConstants.UNIX_ZERO_EPOCH);
        entity.setRelationshipAttribute("logical_cluster", (Object)MetadataRegistry.toRelatedObjectIdWithUniqAttr(cluster));
        return new AtlasEntity.AtlasEntityWithExtInfo(entity);
    }

    private AtlasEntity.AtlasEntityWithExtInfo maybeCreateDummyTable(String tenant, String ksqlClusterId, String tableName, AtlasEntity.AtlasEntityWithExtInfo ksqlCluster) {
        String tableQualifiedName = QualifiedNameGenerator.getQualifiedName(tenant, ksqlClusterId, tableName);
        AtlasEntity.AtlasEntityWithExtInfo tableEntity = this.newTable(tenant, tableName, tableQualifiedName, ksqlCluster.getEntity());
        String oldTableGuid = null;
        try {
            oldTableGuid = this.metadataRegistry.getGuid(tenant, ModelConstants.ENTITY_KSQL_TABLE, tableQualifiedName);
        }
        catch (AtlasBaseException atlasBaseException) {
            // empty catch block
        }
        if (oldTableGuid == null) {
            try {
                this.metadataRegistry.createOrUpdateEntity(tenant, tableEntity);
            }
            catch (AtlasBaseException e) {
                log.warn("Could not create ksql table {}", (Object)tableQualifiedName);
            }
        }
        return tableEntity;
    }

    private AtlasEntity.AtlasEntityWithExtInfo newTable(String tenant, String tableName, String qualifiedName, AtlasEntity cluster) {
        AtlasEntity entity = new AtlasEntity(ModelConstants.ENTITY_KSQL_TABLE);
        entity.setAttribute("tenant", (Object)tenant);
        entity.setAttribute("qualifiedName", (Object)qualifiedName);
        entity.setAttribute("name", (Object)tableName);
        entity.setAttribute("nameLower", (Object)tableName.toLowerCase(Locale.ROOT));
        entity.setAttribute("createTime", (Object)ModelConstants.UNIX_ZERO_EPOCH);
        entity.setRelationshipAttribute("logical_cluster", (Object)MetadataRegistry.toRelatedObjectIdWithUniqAttr(cluster));
        return new AtlasEntity.AtlasEntityWithExtInfo(entity);
    }

    private AtlasEntity.AtlasEntityWithExtInfo newQuery(String tenant, String queryName, String qualifiedName, AtlasEntity cluster) {
        AtlasEntity entity = new AtlasEntity(ModelConstants.ENTITY_KSQL_QUERY);
        entity.setAttribute("tenant", (Object)tenant);
        entity.setAttribute("qualifiedName", (Object)qualifiedName);
        entity.setAttribute("name", (Object)queryName);
        entity.setAttribute("nameLower", (Object)queryName.toLowerCase(Locale.ROOT));
        entity.setAttribute("createTime", (Object)ModelConstants.UNIX_ZERO_EPOCH);
        entity.setRelationshipAttribute("logical_cluster", (Object)MetadataRegistry.toRelatedObjectIdWithUniqAttr(cluster));
        return new AtlasEntity.AtlasEntityWithExtInfo(entity);
    }

    private void createOrUpdatePipelineMetadata(String tenant, List<PipelineMetadata> pipelineMetadatas, Long newTime) {
        try {
            for (PipelineMetadata pipelineMetadata : pipelineMetadatas) {
                Object streamName2;
                Object topic4;
                Object topic22;
                String pipelineQualifiedName = QualifiedNameGenerator.getQualifiedName(tenant, pipelineMetadata.getId());
                Long oldTime = (Long)this.entityTimeCache.getIfPresent((Object)pipelineQualifiedName);
                if (oldTime != null && newTime != null && newTime < oldTime) {
                    Instant instant = Instant.ofEpochMilli(newTime);
                    log.warn("Discarding event with time {}", (Object)instant);
                    continue;
                }
                String kafkaClusterId = pipelineMetadata.getKafkaClusterId();
                AtlasEntity.AtlasEntityWithExtInfo kafkaCluster = this.maybeCreateDummyKafkaCluster(tenant, kafkaClusterId);
                String ksqlClusterId = pipelineMetadata.getKsqlClusterId();
                AtlasEntity.AtlasEntityWithExtInfo ksqlCluster = this.maybeCreateKsqlCluster(tenant, ksqlClusterId);
                ArrayList<AtlasEntity> inputTopics = new ArrayList<AtlasEntity>();
                for (Object topic22 : pipelineMetadata.getInputTopicNamesList()) {
                    AtlasEntity.AtlasEntityWithExtInfo topicEntity = this.maybeCreateDummyTopic(tenant, kafkaClusterId, (String)topic22, kafkaCluster);
                    inputTopics.add(topicEntity.getEntity());
                }
                ArrayList<AtlasEntity> outputTopics = new ArrayList<AtlasEntity>();
                topic22 = pipelineMetadata.getOutputTopicNamesList().iterator();
                while (topic22.hasNext()) {
                    String topic3 = (String)topic22.next();
                    AtlasEntity.AtlasEntityWithExtInfo topicEntity = this.maybeCreateDummyTopic(tenant, kafkaClusterId, topic3, kafkaCluster);
                    outputTopics.add(topicEntity.getEntity());
                }
                ArrayList<AtlasEntity> activatedTopics = new ArrayList<AtlasEntity>();
                for (Object topic4 : pipelineMetadata.getActivatedResources().getTopicNamesList()) {
                    AtlasEntity.AtlasEntityWithExtInfo topicEntity = this.maybeCreateDummyTopic(tenant, kafkaClusterId, (String)topic4, kafkaCluster);
                    activatedTopics.add(topicEntity.getEntity());
                }
                ArrayList<AtlasEntity> activatedConnectors = new ArrayList<AtlasEntity>();
                topic4 = pipelineMetadata.getActivatedResources().getConnectorIdsList().iterator();
                while (topic4.hasNext()) {
                    String connectorId = (String)topic4.next();
                    AtlasEntity.AtlasEntityWithExtInfo connectorEntity = this.maybeCreateDummyConnector(tenant, connectorId);
                    activatedConnectors.add(connectorEntity.getEntity());
                }
                ArrayList<AtlasEntity> activatedStreams = new ArrayList<AtlasEntity>();
                for (Object streamName2 : pipelineMetadata.getActivatedResources().getStreamNamesList()) {
                    AtlasEntity.AtlasEntityWithExtInfo streamEntity = this.maybeCreateDummyStream(tenant, ksqlClusterId, (String)streamName2, ksqlCluster);
                    activatedStreams.add(streamEntity.getEntity());
                }
                ArrayList<AtlasEntity> activatedTables = new ArrayList<AtlasEntity>();
                streamName2 = pipelineMetadata.getActivatedResources().getTableNamesList().iterator();
                while (streamName2.hasNext()) {
                    String tableName = (String)streamName2.next();
                    AtlasEntity.AtlasEntityWithExtInfo tableEntity = this.maybeCreateDummyTable(tenant, ksqlClusterId, tableName, ksqlCluster);
                    activatedTables.add(tableEntity.getEntity());
                }
                AtlasEntity.AtlasEntityWithExtInfo pipeline = this.newPipeline(tenant, pipelineMetadata.getName(), pipelineQualifiedName, pipelineMetadata, kafkaCluster.getEntity(), ksqlCluster.getEntity(), inputTopics, outputTopics, activatedTopics, activatedConnectors, activatedStreams, activatedTables);
                if (newTime != null) {
                    this.entityTimeCache.put((Object)pipelineQualifiedName, (Object)newTime);
                }
                this.metadataRegistry.createOrUpdateEntity(tenant, pipeline);
                this.metricsManager.recordIngestionPipelineCreation(1);
            }
            this.metricsManager.recordIngestionTransform(1);
        }
        catch (AtlasBaseException e) {
            this.metricsManager.recordIngestionTransformError(1);
            this.metricsManager.recordIngestionPipelineCreationError(1);
            log.error("Could not process pipeline creation metadata", (Throwable)e);
        }
    }

    private AtlasEntity.AtlasEntityWithExtInfo newPipeline(String tenant, String pipelineName, String qualifiedName, PipelineMetadata pipelineMetadata, AtlasEntity kafkaCluster, AtlasEntity ksqlCluster, List<AtlasEntity> inputTopics, List<AtlasEntity> outputTopics, List<AtlasEntity> activatedTopics, List<AtlasEntity> activatedConnectors, List<AtlasEntity> activatedStreams, List<AtlasEntity> activatedTables) {
        AtlasEntity entity = new AtlasEntity(ModelConstants.ENTITY_PL_PIPELINE);
        entity.setAttribute("tenant", (Object)tenant);
        entity.setAttribute("qualifiedName", (Object)qualifiedName);
        entity.setAttribute("name", (Object)pipelineName);
        entity.setAttribute("nameLower", (Object)pipelineName.toLowerCase(Locale.ROOT));
        entity.setAttribute("createTime", (Object)ModelConstants.UNIX_ZERO_EPOCH);
        if (pipelineMetadata != null) {
            if (!pipelineMetadata.getId().isEmpty()) {
                entity.setAttribute("id", (Object)pipelineMetadata.getId());
            }
            if (!pipelineMetadata.getDescription().isEmpty()) {
                entity.setAttribute("description", (Object)pipelineMetadata.getDescription());
            }
            entity.setAttribute("method", (Object)pipelineMetadata.getMethodName().name());
            entity.setAttribute("status", (Object)pipelineMetadata.getStatus().getState().name());
            if (pipelineMetadata.getStatus().getCreateTime().getSeconds() != 0L) {
                entity.setAttribute("createTime", (Object)new Date(Timestamps.toMillis((Timestamp)pipelineMetadata.getStatus().getCreateTime())));
            }
            if (!pipelineMetadata.getStatus().getCreatedBy().isEmpty()) {
                entity.setAttribute("createdBy", (Object)pipelineMetadata.getStatus().getCreatedBy());
            }
            if (pipelineMetadata.getStatus().getUpdateTime().getSeconds() != 0L) {
                entity.setAttribute("updateTime", (Object)new Date(Timestamps.toMillis((Timestamp)pipelineMetadata.getStatus().getUpdateTime())));
            }
            if (!pipelineMetadata.getStatus().getUpdatedBy().isEmpty()) {
                entity.setAttribute("updatedBy", (Object)pipelineMetadata.getStatus().getUpdatedBy());
            }
            if (pipelineMetadata.getStatus().getActivateTime().getSeconds() != 0L) {
                entity.setAttribute("activateTime", (Object)new Date(Timestamps.toMillis((Timestamp)pipelineMetadata.getStatus().getActivateTime())));
            }
            if (!pipelineMetadata.getStatus().getActivatedBy().isEmpty()) {
                entity.setAttribute("activatedBy", (Object)pipelineMetadata.getStatus().getActivatedBy());
            }
            entity.setAttribute("topicCount", (Object)pipelineMetadata.getStatus().getTopicCount());
            entity.setAttribute("connectorCount", (Object)pipelineMetadata.getStatus().getConnectorCount());
            entity.setAttribute("streamCount", (Object)pipelineMetadata.getStatus().getStreamCount());
            entity.setAttribute("tableCount", (Object)pipelineMetadata.getStatus().getTableCount());
            entity.setAttribute("queryCount", (Object)pipelineMetadata.getStatus().getQueryCount());
        }
        entity.setRelationshipAttribute("kafka_logical_cluster", (Object)MetadataRegistry.toRelatedObjectIdWithUniqAttr(kafkaCluster));
        entity.setRelationshipAttribute("ksql_logical_cluster", (Object)MetadataRegistry.toRelatedObjectIdWithUniqAttr(ksqlCluster));
        if (inputTopics != null) {
            entity.setRelationshipAttribute("input_topics", inputTopics.stream().map(MetadataRegistry::toRelatedObjectIdWithUniqAttr).collect(Collectors.toList()));
        }
        if (outputTopics != null) {
            entity.setRelationshipAttribute("output_topics", outputTopics.stream().map(MetadataRegistry::toRelatedObjectIdWithUniqAttr).collect(Collectors.toList()));
        }
        if (activatedTopics != null) {
            entity.setRelationshipAttribute("activated_topics", activatedTopics.stream().map(MetadataRegistry::toRelatedObjectIdWithUniqAttr).collect(Collectors.toList()));
        }
        if (activatedConnectors != null) {
            entity.setRelationshipAttribute("activated_connectors", activatedConnectors.stream().map(MetadataRegistry::toRelatedObjectIdWithUniqAttr).collect(Collectors.toList()));
        }
        if (activatedStreams != null) {
            entity.setRelationshipAttribute("activated_streams", activatedStreams.stream().map(MetadataRegistry::toRelatedObjectIdWithUniqAttr).collect(Collectors.toList()));
        }
        if (activatedTables != null) {
            entity.setRelationshipAttribute("activated_tables", activatedTables.stream().map(MetadataRegistry::toRelatedObjectIdWithUniqAttr).collect(Collectors.toList()));
        }
        return new AtlasEntity.AtlasEntityWithExtInfo(entity);
    }

    private void deletePipelineMetadata(String tenant, List<PipelineMetadata> pipelineMetadatas, Long newTime) {
        for (PipelineMetadata pipelineMetadata : pipelineMetadatas) {
            String pipelineQualifiedName = QualifiedNameGenerator.getQualifiedName(tenant, pipelineMetadata.getId());
            Long oldTime = (Long)this.entityTimeCache.getIfPresent((Object)pipelineQualifiedName);
            if (oldTime != null && newTime != null && newTime < oldTime) {
                Instant instant = Instant.ofEpochMilli(newTime);
                log.warn("Discarding event with time {}", (Object)instant);
                continue;
            }
            this.deletePipelineMetadata(tenant, pipelineMetadata);
            if (newTime == null) continue;
            this.entityTimeCache.put((Object)pipelineQualifiedName, (Object)newTime);
        }
    }

    private void deletePipelineMetadata(String tenant, PipelineMetadata pipelineMetadata) {
        try {
            String pipelineQualifiedName = QualifiedNameGenerator.getQualifiedName(tenant, pipelineMetadata.getId());
            this.metadataRegistry.deleteEntity(tenant, ModelConstants.ENTITY_PL_PIPELINE, pipelineQualifiedName);
            this.metadataRegistry.purgeEntity(tenant, ModelConstants.ENTITY_PL_PIPELINE, pipelineQualifiedName);
            this.metricsManager.recordIngestionTransform(1);
            this.metricsManager.recordIngestionPipelineDeletion(1);
        }
        catch (AtlasBaseException e) {
            this.metricsManager.recordIngestionTransformError(1);
            this.metricsManager.recordIngestionPipelineDeletionError(1);
            log.error("Could not process pipeline deletion metadata" + pipelineMetadata, (Throwable)e);
        }
    }

    private void createOrUpdateEnvironmentMetadata(String tenant, List<EnvironmentMetadata> environmentMetadatas, Long newTime) {
        try {
            for (EnvironmentMetadata environmentMetadata : environmentMetadatas) {
                String environmentQualifiedName = QualifiedNameGenerator.getQualifiedName(tenant, environmentMetadata.getEnvironmentId());
                Long oldTime = (Long)this.entityTimeCache.getIfPresent((Object)environmentQualifiedName);
                if (oldTime != null && newTime != null && newTime < oldTime) {
                    Instant instant = Instant.ofEpochMilli(newTime);
                    log.warn("Discarding create environment event with time {}", (Object)instant);
                    continue;
                }
                AtlasEntity.AtlasEntityWithExtInfo environmentEntity = this.newEnvironment(tenant, environmentMetadata.getEnvironmentId(), environmentMetadata.getEnvironmentName(), environmentQualifiedName, environmentMetadata);
                if (newTime != null) {
                    this.entityTimeCache.put((Object)environmentQualifiedName, (Object)newTime);
                }
                this.metadataRegistry.createOrUpdateEntity(tenant, environmentEntity);
                this.metricsManager.recordIngestionEnvironmentCreation(1);
            }
            this.metricsManager.recordIngestionTransform(1);
        }
        catch (AtlasBaseException e) {
            this.metricsManager.recordIngestionTransformError(1);
            this.metricsManager.recordIngestionEnvironmentCreationError(1);
            log.error("Could not process environment creation metadata", (Throwable)e);
        }
    }

    private AtlasEntity.AtlasEntityWithExtInfo newEnvironment(String tenant, String environmentId, String environmentName, String qualifiedName, EnvironmentMetadata environmentMetadata) {
        AtlasEntity entity = new AtlasEntity(ModelConstants.ENTITY_CF_ENVIRONMENT);
        entity.setAttribute("tenant", (Object)tenant);
        entity.setAttribute("qualifiedName", (Object)qualifiedName);
        entity.setAttribute("name", (Object)environmentName);
        entity.setAttribute("nameLower", (Object)environmentName.toLowerCase(Locale.ROOT));
        entity.setAttribute("createTime", (Object)ModelConstants.UNIX_ZERO_EPOCH);
        if (environmentMetadata != null) {
            if (!environmentMetadata.getEnvironmentId().isEmpty()) {
                entity.setAttribute("id", (Object)environmentMetadata.getEnvironmentId());
            }
            if (environmentMetadata.getCreateTime().getSeconds() != 0L) {
                entity.setAttribute("createTime", (Object)new Date(Timestamps.toMillis((Timestamp)environmentMetadata.getCreateTime())));
            }
            if (environmentMetadata.getUpdateTime().getSeconds() != 0L) {
                entity.setAttribute("updateTime", (Object)new Date(Timestamps.toMillis((Timestamp)environmentMetadata.getUpdateTime())));
            }
        } else if (environmentId != null) {
            entity.setAttribute("id", (Object)environmentId);
        }
        return new AtlasEntity.AtlasEntityWithExtInfo(entity);
    }

    private AtlasEntity.AtlasEntityWithExtInfo maybeCreateDummyEnvironment(String tenant, String environmentId) {
        String environmentQualifiedName = QualifiedNameGenerator.getQualifiedName(tenant, environmentId);
        AtlasEntity.AtlasEntityWithExtInfo environmentEntity = this.newEnvironment(tenant, environmentId, "", environmentQualifiedName, null);
        String oldEnvGuid = null;
        try {
            oldEnvGuid = this.metadataRegistry.getGuid(tenant, ModelConstants.ENTITY_CF_ENVIRONMENT, environmentQualifiedName);
        }
        catch (AtlasBaseException atlasBaseException) {
            // empty catch block
        }
        if (oldEnvGuid == null) {
            try {
                this.metadataRegistry.createOrUpdateEntity(tenant, environmentEntity);
            }
            catch (AtlasBaseException e) {
                log.warn("Could not create dummy environment {}", (Object)environmentQualifiedName);
            }
        }
        return environmentEntity;
    }

    private void deleteEnvironmentMetadata(String tenant, List<EnvironmentMetadata> environmentMetadatas, Long newTime) {
        for (EnvironmentMetadata environmentMetadata : environmentMetadatas) {
            String environmentQualifiedName = QualifiedNameGenerator.getQualifiedName(tenant, environmentMetadata.getEnvironmentId());
            Long oldTime = (Long)this.entityTimeCache.getIfPresent((Object)environmentQualifiedName);
            if (oldTime != null && newTime != null && newTime < oldTime) {
                Instant instant = Instant.ofEpochMilli(newTime);
                log.warn("Discarding delete environment event with time {}", (Object)instant);
                continue;
            }
            this.deleteEnvironmentMetadata(tenant, environmentMetadata, environmentQualifiedName);
            if (newTime == null) continue;
            this.entityTimeCache.put((Object)environmentQualifiedName, (Object)newTime);
        }
    }

    private void deleteEnvironmentMetadata(String tenant, EnvironmentMetadata environmentMetadata, String environmentQualifiedName) {
        try {
            this.metadataRegistry.deleteEntity(tenant, ModelConstants.ENTITY_CF_ENVIRONMENT, environmentQualifiedName);
            this.metadataRegistry.purgeEntity(tenant, ModelConstants.ENTITY_CF_ENVIRONMENT, environmentQualifiedName);
            this.metricsManager.recordIngestionTransform(1);
            this.metricsManager.recordIngestionEnvironmentDeletion(1);
        }
        catch (AtlasBaseException e) {
            this.metricsManager.recordIngestionTransformError(1);
            this.metricsManager.recordIngestionEnvironmentDeletionError(1);
            log.error("Could not process environment deletion metadata" + environmentMetadata, (Throwable)e);
        }
    }

    private String getCleanupPolicy(TopicMetadata.CleanupPolicy policy) {
        if (policy == null || policy == TopicMetadata.CleanupPolicy.UNSPECIFIED) {
            return "NONE";
        }
        return policy.name();
    }

    private String getCompressionType(TopicMetadata.CompressionType compressionType) {
        if (compressionType == null || compressionType == TopicMetadata.CompressionType.COMPRESSION_UNSPECIFIED) {
            return "UNSPECIFIED";
        }
        return compressionType.name();
    }
}

