package io.confluent.catalog.ingestion.event;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.protobuf.util.Timestamps;
import io.cloudevents.CloudEvent;
import io.cloudevents.CloudEventData;
import io.confluent.catalog.DataCatalogConfig;
import io.confluent.catalog.ingestion.ReconciliationCache;
import io.confluent.catalog.metrics.MetricsManager;
import io.confluent.catalog.model.ModelConstants;
import io.confluent.catalog.storage.MetadataRegistry;
import io.confluent.catalog.util.QualifiedNameGenerator;
import io.confluent.protobuf.events.catalog.v1.ClusterLinkMetadata;
import io.confluent.protobuf.events.catalog.v1.ConnectMetadata;
import io.confluent.protobuf.events.catalog.v1.EnvironmentMetadata;
import io.confluent.protobuf.events.catalog.v1.LogicalClusterMetadata;
import io.confluent.protobuf.events.catalog.v1.MetadataChange;
import io.confluent.protobuf.events.catalog.v1.MetadataEvent;
import io.confluent.protobuf.events.catalog.v1.OpType;
import io.confluent.protobuf.events.catalog.v1.PipelineMetadata;
import io.confluent.protobuf.events.catalog.v1.TopicMetadata;
import io.confluent.rest.RestConfigException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.processor.api.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/catalog/ingestion/event/EventProcessor.class */
public class EventProcessor {
    private static final Logger log = LoggerFactory.getLogger(EventProcessor.class);
    // Record header whose UTF-8 value is passed as the first argument of the private process(...) overload;
    // that value ends up as the "tenant" attribute of the entities built in this class.
    public static final String LSRC_HEADER = "_lsrc";
    // Record header parsed with Boolean.parseBoolean to decide whether a snapshot takes part in reconciliation.
    public static final String RECONCILIATION_HEADER = "ce_reconciliation";
    // Record header parsed as an int page number; the reconcile methods treat pages as zero-based.
    public static final String PAGE_NUMBER_HEADER = "ce_page";
    // Record header parsed as an int page count; a page is the last one when (total - 1) equals its page number.
    public static final String TOTAL_PAGE_HEADER = "ce_total";
    // Record header parsed as a boolean; when present, topic reconciliation uses it instead of TOTAL_PAGE_HEADER.
    public static final String LAST_PAGE_HEADER = "ce_lastpage";
    // Config switch gating logical-cluster reconciliation (see isReconciliationCluster).
    private final boolean isReconciliationClusterEnabled;
    // Config switch gating topic reconciliation (see isReconciliationTopic).
    private final boolean isReconciliationTopicEnabled;
    // Target of all entity lookups, writes, deletes and reconciliations performed by this class.
    private final MetadataRegistry metadataRegistry;
    // Sink for ingestion counters and timings.
    private final MetricsManager metricsManager;
    // Turns the CloudEvent payload bytes into a MetadataChange.
    private final EventDeserializer eventDeserializer = new EventDeserializer();
    // Qualified name -> timestamp (epoch millis) of the last event applied to that entity; used to drop older events.
    private final Cache<String, Long> entityTimeCache;
    // Accumulates page numbers and entity names across the pages of a reconciliation snapshot.
    private final ReconciliationCache reconciliationCache;

    /**
     * Creates a processor that writes to the given registry and reports to the given metrics manager.
     *
     * <p>The reconciliation switches and the size/TTL settings of both caches are read from a
     * {@link DataCatalogConfig} built from the registry's original configuration properties.
     *
     * @param metadataRegistry registry receiving entity changes; also the source of the configuration
     * @param metricsManager sink for ingestion metrics
     * @throws IllegalArgumentException if reading the configuration raises a {@link RestConfigException}
     */
    public EventProcessor(MetadataRegistry metadataRegistry, MetricsManager metricsManager) {
        this.metadataRegistry = metadataRegistry;
        this.metricsManager = metricsManager;
        try {
            DataCatalogConfig config = new DataCatalogConfig(metadataRegistry.config().originalProperties());

            this.isReconciliationClusterEnabled = config.isCatalogReconciliationClusterEnabled();
            this.isReconciliationTopicEnabled = config.isCatalogReconciliationTopicEnabled();

            // Bounded, time-limited memory of the last event timestamp applied per entity.
            this.entityTimeCache = Caffeine.newBuilder()
                    .maximumSize(config.getCatalogIngestorCacheMaxSize())
                    .expireAfterWrite(Duration.ofSeconds(config.getCatalogIngestorCacheTtlSec()))
                    .build();

            this.reconciliationCache = new ReconciliationCache(
                    config.getCatalogReconciliationCacheMaxSize(),
                    config.getCatalogReconciliationCacheTtlSec());
        } catch (RestConfigException configError) {
            throw new IllegalArgumentException("Could not instantiate EventProcessor", configError);
        }
    }

    /**
     * Processes one ingested record: deserializes the CloudEvent payload into a
     * {@link MetadataChange} and applies it according to its operation type.
     *
     * <p>Records that cannot be applied (null value, missing event data, missing
     * {@link #LSRC_HEADER} header, or any exception raised while applying the change) are
     * counted as transform errors and logged; they do not raise an exception from the
     * guarded section. The Atlas {@link RequestContext} is cleared after every apply attempt.
     *
     * @param record the record to process; its key is only used for logging
     */
    public void process(Record<String, CloudEvent> record) {
        try {
            this.metadataRegistry.waitForInit();
        } catch (InterruptedException e) {
            // Restore the interrupt status so the owning thread can still observe the
            // interruption; processing continues exactly as it did before this fix.
            Thread.currentThread().interrupt();
            log.warn("Interrupted while waiting for metadata registry initialization", e);
        }
        String key = record.key();
        CloudEvent cloudEvent = record.value();
        if (cloudEvent == null) {
            // A record without a value would otherwise fail with a NullPointerException below.
            this.metricsManager.recordIngestionTransformError(1);
            log.error("Missing cloud event for record with key {}", key);
            return;
        }
        CloudEventData data = cloudEvent.getData();
        if (data == null) {
            this.metricsManager.recordIngestionTransformError(1);
            log.error("Missing data in cloud event");
            return;
        }
        Long timestamp = getTimestamp(cloudEvent);
        if (timestamp != null && timestamp.longValue() != 0) {
            // Pipeline latency in seconds: wall-clock now minus the event's own time.
            this.metricsManager.recordIngestionEventsPipelineTime((System.currentTimeMillis() - timestamp.longValue()) / 1000.0d);
        }
        Headers headers = record.headers();
        MetadataChange change = this.eventDeserializer.m32deserialize((String) null, data.toBytes());
        Header lsrcHeader = headers.lastHeader(LSRC_HEADER);
        if (lsrcHeader == null) {
            this.metricsManager.recordIngestionTransformError(1);
            log.error("Could not find lsrc for record with key {}", key);
            return;
        }
        try {
            String tenant = new String(lsrcHeader.value(), StandardCharsets.UTF_8);
            process(tenant, change.getSource(), change.getOp(), timestamp, change.getEventsList(), headers);
        } catch (Exception e) {
            this.metricsManager.recordIngestionTransformError(1);
            log.error("Error during event processing", e);
        } finally {
            RequestContext.clear();
        }
    }

    /**
     * Routes a batch of metadata events to the handler matching the operation type.
     *
     * <p>CREATE and UPDATE are applied as upserts, SNAPSHOT goes through snapshot processing
     * (which may also reconcile), DELETE and PURGE are applied as deletes. Any other
     * operation type is logged and ignored.
     *
     * @param tenant value written as the "tenant" attribute of affected entities
     * @param clusterId source of the change, used as the cluster id by the topic handlers
     * @param opType operation carried by the change
     * @param eventTimeMs event time in epoch milliseconds, or {@code null} when unknown
     * @param events events contained in the change
     * @param headers record headers, consulted only for snapshots
     */
    private void process(String tenant, String clusterId, OpType opType, Long eventTimeMs, List<MetadataEvent> events, Headers headers) {
        List<List<MetadataEvent>> eventsByCase = getEventsList(events);
        switch (opType) {
            case CREATE:
            case UPDATE:
                createOrUpdateMetadata(tenant, clusterId, eventsByCase, eventTimeMs);
                break;
            case SNAPSHOT:
                processSnapshot(tenant, clusterId, eventsByCase, eventTimeMs, headers);
                break;
            case DELETE:
            case PURGE:
                deleteMetadata(tenant, clusterId, eventsByCase, eventTimeMs);
                break;
            default:
                log.warn("Unrecognized opType {}", opType);
                break;
        }
    }

    /**
     * Groups events by their metadata case.
     *
     * <p>The returned list is indexed by {@code MetadataCase.getNumber()}, so callers read a
     * bucket with {@code result.get(SOME_CASE.getNumber())}. The list is sized from the largest
     * case number rather than from the count of enum constants, so every case number has a
     * bucket even if the numbers are not contiguous. It is never shorter than the enum constant count.
     *
     * @param list events to group, in arrival order
     * @return one bucket per possible case number; events keep their relative order within a bucket
     */
    private List<List<MetadataEvent>> getEventsList(List<MetadataEvent> list) {
        int highestNumber = 0;
        for (MetadataEvent.MetadataCase metadataCase : MetadataEvent.MetadataCase.values()) {
            highestNumber = Math.max(highestNumber, metadataCase.getNumber());
        }
        List<List<MetadataEvent>> buckets = new ArrayList<>(highestNumber + 1);
        for (int number = 0; number <= highestNumber; number++) {
            buckets.add(new ArrayList<>());
        }
        for (MetadataEvent metadataEvent : list) {
            buckets.get(metadataEvent.getMetadataCase().getNumber()).add(metadataEvent);
        }
        return buckets;
    }

    /**
     * Tells whether the record asks for logical-cluster reconciliation.
     *
     * @param headers record headers
     * @return {@code true} only when cluster reconciliation is enabled and the last
     *     {@link #RECONCILIATION_HEADER} header is present and parses as boolean {@code true}
     */
    private boolean isReconciliationCluster(Headers headers) {
        if (!this.isReconciliationClusterEnabled) {
            return false;
        }
        Header reconciliationFlag = headers.lastHeader(RECONCILIATION_HEADER);
        if (reconciliationFlag == null) {
            return false;
        }
        String flagText = new String(reconciliationFlag.value(), StandardCharsets.UTF_8);
        return Boolean.parseBoolean(flagText);
    }

    /**
     * Tells whether the record asks for topic reconciliation.
     *
     * @param headers record headers
     * @return {@code true} only when topic reconciliation is enabled and the last
     *     {@link #RECONCILIATION_HEADER} header is present and parses as boolean {@code true}
     */
    private boolean isReconciliationTopic(Headers headers) {
        if (!this.isReconciliationTopicEnabled) {
            return false;
        }
        Header reconciliationFlag = headers.lastHeader(RECONCILIATION_HEADER);
        if (reconciliationFlag == null) {
            return false;
        }
        String flagText = new String(reconciliationFlag.value(), StandardCharsets.UTF_8);
        return Boolean.parseBoolean(flagText);
    }

    /**
     * Applies a snapshot: upserts every kind of metadata present, then optionally reconciles.
     *
     * <p>Kinds are handled in a fixed order (topics, cluster links, logical clusters, connect,
     * pipelines, environments). Logical clusters are reconciled right after their upsert when
     * the record requests cluster reconciliation. Topics are reconciled last, when the snapshot
     * carried topics or cluster links and the record requests topic reconciliation.
     *
     * @param tenant value written as the "tenant" attribute of affected entities
     * @param clusterId cluster id passed to the topic and cluster-link handlers
     * @param eventsByCase events bucketed by {@code MetadataCase.getNumber()}
     * @param eventTimeMs event time in epoch milliseconds, or {@code null} when unknown
     * @param headers record headers carrying the reconciliation and paging information
     */
    private void processSnapshot(String tenant, String clusterId, List<List<MetadataEvent>> eventsByCase, Long eventTimeMs, Headers headers) {
        List<TopicMetadata> topics = eventsByCase.get(MetadataEvent.MetadataCase.TOPIC_METADATA.getNumber())
                .stream()
                .map(MetadataEvent::getTopicMetadata)
                .collect(Collectors.toList());
        if (!topics.isEmpty()) {
            createOrUpdateTopicMetadata(tenant, clusterId, topics, eventTimeMs);
        }

        List<ClusterLinkMetadata> clusterLinks = eventsByCase.get(MetadataEvent.MetadataCase.CLUSTER_LINK_METADATA.getNumber())
                .stream()
                .map(MetadataEvent::getClusterLinkMetadata)
                .collect(Collectors.toList());
        if (!clusterLinks.isEmpty()) {
            createOrUpdateClusterLinkMetadata(tenant, clusterId, clusterLinks, eventTimeMs);
        }

        List<LogicalClusterMetadata> logicalClusters = eventsByCase.get(MetadataEvent.MetadataCase.LOGICAL_CLUSTER_METADATA.getNumber())
                .stream()
                .map(MetadataEvent::getLogicalClusterMetadata)
                .collect(Collectors.toList());
        if (!logicalClusters.isEmpty()) {
            createOrUpdateLogicalClusterMetadata(tenant, logicalClusters, eventTimeMs);
            if (isReconciliationCluster(headers)) {
                reconcileLogicalClusterMetadata(tenant, logicalClusters, eventTimeMs, headers);
            }
        }

        List<ConnectMetadata> connectors = eventsByCase.get(MetadataEvent.MetadataCase.CONNECT_METADATA.getNumber())
                .stream()
                .map(MetadataEvent::getConnectMetadata)
                .collect(Collectors.toList());
        if (!connectors.isEmpty()) {
            createOrUpdateConnectMetadata(tenant, connectors, eventTimeMs);
        }

        List<PipelineMetadata> pipelines = eventsByCase.get(MetadataEvent.MetadataCase.PIPELINE_METADATA.getNumber())
                .stream()
                .map(MetadataEvent::getPipelineMetadata)
                .collect(Collectors.toList());
        if (!pipelines.isEmpty()) {
            createOrUpdatePipelineMetadata(tenant, pipelines, eventTimeMs);
        }

        List<EnvironmentMetadata> environments = eventsByCase.get(MetadataEvent.MetadataCase.ENVIRONMENT_METADATA.getNumber())
                .stream()
                .map(MetadataEvent::getEnvironmentMetadata)
                .collect(Collectors.toList());
        if (!environments.isEmpty()) {
            createOrUpdateEnvironmentMetadata(tenant, environments, eventTimeMs);
        }

        // Topic reconciliation runs last and only uses the topic list, even when
        // the trigger was the presence of cluster links.
        boolean hasTopicsOrLinks = !topics.isEmpty() || !clusterLinks.isEmpty();
        if (hasTopicsOrLinks && isReconciliationTopic(headers)) {
            reconcileTopicMetadata(tenant, clusterId, topics, eventTimeMs, headers);
        }
    }

    /**
     * Collects one page of a paged topic snapshot and, on the last page, reconciles the
     * registry against the topic names gathered across all pages.
     *
     * <p>The page number comes from {@link #PAGE_NUMBER_HEADER}. The last page is detected from
     * {@link #LAST_PAGE_HEADER} when present, otherwise from {@link #TOTAL_PAGE_HEADER}
     * (last page number equals total minus one). A repeated page number discards the
     * accumulated state. On the last page, reconciliation only happens when the number of
     * distinct pages seen equals the last page number plus one; the accumulated state is
     * discarded afterwards in either case.
     *
     * @param str tenant the topics belong to
     * @param str2 cluster id the topics belong to
     * @param list topics contained in this page
     * @param l event time; not used by this method
     * @param headers record headers carrying the paging information
     * @throws NumberFormatException if a paging header does not hold a valid integer
     */
    private void reconcileTopicMetadata(String str, String str2, List<TopicMetadata> list, Long l, Headers headers) {
        Header totalHeader = headers.lastHeader(TOTAL_PAGE_HEADER);
        Header pageHeader = headers.lastHeader(PAGE_NUMBER_HEADER);
        Header lastPageHeader = headers.lastHeader(LAST_PAGE_HEADER);
        if ((lastPageHeader == null && totalHeader == null) || pageHeader == null) {
            log.error("Reconcile topic metadata, headers missing, tenant {}", str);
            return;
        }
        int page = Integer.parseInt(new String(pageHeader.value(), StandardCharsets.UTF_8));
        ReconciliationCache.ReconciliationCacheKey cacheKey = new ReconciliationCache.ReconciliationCacheKey(str, str2, ModelConstants.ENTITY_KAFKA_TOPIC);
        HashSet<Integer> pageNumbers = this.reconciliationCache.getOrDefault(cacheKey).getPageNumbers();
        if (!pageNumbers.add(Integer.valueOf(page))) {
            // A duplicate page makes the accumulated state untrustworthy; start over.
            log.warn("Reconcile topic metadata, receive same page numbers {} from {}", Integer.valueOf(page), cacheKey);
            this.reconciliationCache.invalidate(cacheKey);
            return;
        }
        HashSet<String> entities = this.reconciliationCache.getOrDefault(cacheKey).getEntities();
        for (TopicMetadata topicMetadata : list) {
            entities.add(topicMetadata.getTopicName());
        }
        // At this point either the last-page header or the total header is non-null (guard above).
        boolean isLastPage = lastPageHeader != null
                ? Boolean.parseBoolean(new String(lastPageHeader.value(), StandardCharsets.UTF_8))
                : Integer.parseInt(new String(totalHeader.value(), StandardCharsets.UTF_8)) - 1 == page;
        if (!isLastPage) {
            return;
        }
        if (pageNumbers.size() != page + 1) {
            log.warn("Reconcile topic metadata, do not receive all pages from {}", cacheKey);
        } else {
            try {
                this.metadataRegistry.reconcileEntities(ModelConstants.ENTITY_KAFKA_TOPIC, str, str2, entities);
            } catch (AtlasBaseException e) {
                log.error("Reconcile topic metadata error", e);
            }
        }
        this.reconciliationCache.invalidate(cacheKey);
    }

    /**
     * Collects one page of a paged logical-cluster snapshot and, on the last page, reconciles
     * the registry against the cluster ids gathered across all pages.
     *
     * <p>Both {@link #TOTAL_PAGE_HEADER} and {@link #PAGE_NUMBER_HEADER} are required. A
     * repeated page number discards the accumulated state. The last page is the one whose
     * number equals total minus one; reconciliation then only happens when the number of
     * distinct pages seen equals that page number plus one, and the accumulated state is
     * discarded afterwards in either case.
     *
     * @param tenant tenant the clusters belong to
     * @param clusters logical clusters contained in this page
     * @param eventTimeMs event time; not used by this method
     * @param headers record headers carrying the paging information
     */
    private void reconcileLogicalClusterMetadata(String tenant, List<LogicalClusterMetadata> clusters, Long eventTimeMs, Headers headers) {
        Header totalHeader = headers.lastHeader(TOTAL_PAGE_HEADER);
        Header pageHeader = headers.lastHeader(PAGE_NUMBER_HEADER);
        if (totalHeader == null || pageHeader == null) {
            log.error(String.format("Reconcile cluster metadata, headers missing, tenant %s", tenant));
            return;
        }

        int page = Integer.parseInt(new String(pageHeader.value(), StandardCharsets.UTF_8));
        ReconciliationCache.ReconciliationCacheKey cacheKey =
                new ReconciliationCache.ReconciliationCacheKey(tenant, "", ModelConstants.ENTITY_KAFKA_LOGICAL_CLUSTER);

        HashSet<Integer> seenPages = this.reconciliationCache.getOrDefault(cacheKey).getPageNumbers();
        boolean firstTimeSeen = seenPages.add(Integer.valueOf(page));
        if (!firstTimeSeen) {
            log.warn(String.format("Reconcile cluster metadata, receive same page numbers %d from %s", Integer.valueOf(page), cacheKey));
            this.reconciliationCache.invalidate(cacheKey);
            return;
        }

        HashSet<String> clusterIds = this.reconciliationCache.getOrDefault(cacheKey).getEntities();
        for (LogicalClusterMetadata cluster : clusters) {
            clusterIds.add(cluster.getClusterId());
        }

        int totalPages = Integer.parseInt(new String(totalHeader.value(), StandardCharsets.UTF_8));
        if (totalPages - 1 != page) {
            // Not the last page yet; keep accumulating.
            return;
        }

        if (seenPages.size() == page + 1) {
            try {
                this.metadataRegistry.reconcileEntities(ModelConstants.ENTITY_KAFKA_LOGICAL_CLUSTER, tenant, null, clusterIds);
            } catch (AtlasBaseException e) {
                log.error("Reconcile cluster metadata error", e);
            }
        } else {
            log.warn(String.format("Reconcile cluster metadata, do not receive all pages from %s", cacheKey));
        }
        this.reconciliationCache.invalidate(cacheKey);
    }

    /**
     * Extracts the event time of a CloudEvent.
     *
     * @param cloudEvent event to read
     * @return the event time in epoch milliseconds, or {@code null} when the event has no time
     */
    private Long getTimestamp(CloudEvent cloudEvent) {
        OffsetDateTime eventTime = cloudEvent.getTime();
        return eventTime == null ? null : Long.valueOf(eventTime.toInstant().toEpochMilli());
    }

    /**
     * Upserts every kind of metadata present in a CREATE or UPDATE change.
     *
     * <p>Kinds are handled in a fixed order: topics, cluster links, logical clusters, connect,
     * pipelines, environments. A kind with no events is skipped.
     *
     * @param tenant value written as the "tenant" attribute of affected entities
     * @param clusterId cluster id passed to the topic and cluster-link handlers
     * @param eventsByCase events bucketed by {@code MetadataCase.getNumber()}
     * @param eventTimeMs event time in epoch milliseconds, or {@code null} when unknown
     */
    private void createOrUpdateMetadata(String tenant, String clusterId, List<List<MetadataEvent>> eventsByCase, Long eventTimeMs) {
        List<TopicMetadata> topics = eventsByCase.get(MetadataEvent.MetadataCase.TOPIC_METADATA.getNumber())
                .stream()
                .map(MetadataEvent::getTopicMetadata)
                .collect(Collectors.toList());
        if (!topics.isEmpty()) {
            createOrUpdateTopicMetadata(tenant, clusterId, topics, eventTimeMs);
        }

        List<ClusterLinkMetadata> clusterLinks = eventsByCase.get(MetadataEvent.MetadataCase.CLUSTER_LINK_METADATA.getNumber())
                .stream()
                .map(MetadataEvent::getClusterLinkMetadata)
                .collect(Collectors.toList());
        if (!clusterLinks.isEmpty()) {
            createOrUpdateClusterLinkMetadata(tenant, clusterId, clusterLinks, eventTimeMs);
        }

        List<LogicalClusterMetadata> logicalClusters = eventsByCase.get(MetadataEvent.MetadataCase.LOGICAL_CLUSTER_METADATA.getNumber())
                .stream()
                .map(MetadataEvent::getLogicalClusterMetadata)
                .collect(Collectors.toList());
        if (!logicalClusters.isEmpty()) {
            createOrUpdateLogicalClusterMetadata(tenant, logicalClusters, eventTimeMs);
        }

        List<ConnectMetadata> connectors = eventsByCase.get(MetadataEvent.MetadataCase.CONNECT_METADATA.getNumber())
                .stream()
                .map(MetadataEvent::getConnectMetadata)
                .collect(Collectors.toList());
        if (!connectors.isEmpty()) {
            createOrUpdateConnectMetadata(tenant, connectors, eventTimeMs);
        }

        List<PipelineMetadata> pipelines = eventsByCase.get(MetadataEvent.MetadataCase.PIPELINE_METADATA.getNumber())
                .stream()
                .map(MetadataEvent::getPipelineMetadata)
                .collect(Collectors.toList());
        if (!pipelines.isEmpty()) {
            createOrUpdatePipelineMetadata(tenant, pipelines, eventTimeMs);
        }

        List<EnvironmentMetadata> environments = eventsByCase.get(MetadataEvent.MetadataCase.ENVIRONMENT_METADATA.getNumber())
                .stream()
                .map(MetadataEvent::getEnvironmentMetadata)
                .collect(Collectors.toList());
        if (!environments.isEmpty()) {
            createOrUpdateEnvironmentMetadata(tenant, environments, eventTimeMs);
        }
    }

    /**
     * Deletes every kind of metadata present in a DELETE or PURGE change.
     *
     * <p>Kinds are handled in a fixed order: topics, cluster links, logical clusters, connect,
     * pipelines, environments. A kind with no events is skipped.
     *
     * @param tenant tenant owning the affected entities
     * @param clusterId cluster id passed to the topic and cluster-link handlers
     * @param eventsByCase events bucketed by {@code MetadataCase.getNumber()}
     * @param eventTimeMs event time in epoch milliseconds, or {@code null} when unknown
     */
    private void deleteMetadata(String tenant, String clusterId, List<List<MetadataEvent>> eventsByCase, Long eventTimeMs) {
        List<TopicMetadata> topics = eventsByCase.get(MetadataEvent.MetadataCase.TOPIC_METADATA.getNumber())
                .stream()
                .map(MetadataEvent::getTopicMetadata)
                .collect(Collectors.toList());
        if (!topics.isEmpty()) {
            deleteTopicMetadata(tenant, clusterId, topics, eventTimeMs);
        }

        List<ClusterLinkMetadata> clusterLinks = eventsByCase.get(MetadataEvent.MetadataCase.CLUSTER_LINK_METADATA.getNumber())
                .stream()
                .map(MetadataEvent::getClusterLinkMetadata)
                .collect(Collectors.toList());
        if (!clusterLinks.isEmpty()) {
            deleteClusterLinkMetadata(tenant, clusterId, clusterLinks, eventTimeMs);
        }

        List<LogicalClusterMetadata> logicalClusters = eventsByCase.get(MetadataEvent.MetadataCase.LOGICAL_CLUSTER_METADATA.getNumber())
                .stream()
                .map(MetadataEvent::getLogicalClusterMetadata)
                .collect(Collectors.toList());
        if (!logicalClusters.isEmpty()) {
            deleteLogicalClusterMetadata(tenant, logicalClusters, eventTimeMs);
        }

        List<ConnectMetadata> connectors = eventsByCase.get(MetadataEvent.MetadataCase.CONNECT_METADATA.getNumber())
                .stream()
                .map(MetadataEvent::getConnectMetadata)
                .collect(Collectors.toList());
        if (!connectors.isEmpty()) {
            deleteConnectMetadata(tenant, connectors, eventTimeMs);
        }

        List<PipelineMetadata> pipelines = eventsByCase.get(MetadataEvent.MetadataCase.PIPELINE_METADATA.getNumber())
                .stream()
                .map(MetadataEvent::getPipelineMetadata)
                .collect(Collectors.toList());
        if (!pipelines.isEmpty()) {
            deletePipelineMetadata(tenant, pipelines, eventTimeMs);
        }

        List<EnvironmentMetadata> environments = eventsByCase.get(MetadataEvent.MetadataCase.ENVIRONMENT_METADATA.getNumber())
                .stream()
                .map(MetadataEvent::getEnvironmentMetadata)
                .collect(Collectors.toList());
        if (!environments.isEmpty()) {
            deleteEnvironmentMetadata(tenant, environments, eventTimeMs);
        }
    }

    /**
     * Upserts the given topics, skipping events older than the last one applied to the same topic.
     *
     * <p>For each topic the event time is compared with the time remembered in the entity time
     * cache. An event that is not older (or whose ordering cannot be decided because either time
     * is missing) produces a full topic entity and refreshes the remembered time. An older event
     * is discarded, except that a non-zero create time is still forwarded through a minimal
     * entity carrying only identity and create time. All resulting entities are written in one
     * registry call.
     *
     * @param tenant value written as the "tenant" attribute of the topics
     * @param clusterId id of the cluster the topics belong to
     * @param list topics to upsert
     * @param eventTimeMs event time in epoch milliseconds, or {@code null} when unknown
     */
    private void createOrUpdateTopicMetadata(String tenant, String clusterId, List<TopicMetadata> list, Long eventTimeMs) {
        try {
            AtlasEntity.AtlasEntityWithExtInfo cluster = maybeCreateDummyKafkaCluster(tenant, clusterId);
            List<AtlasEntity.AtlasEntityWithExtInfo> entities = new ArrayList<>();
            for (TopicMetadata topic : list) {
                String qualifiedName = QualifiedNameGenerator.getQualifiedName(tenant, clusterId, topic.getTopicName());
                Long lastAppliedMs = this.entityTimeCache.getIfPresent(qualifiedName);
                boolean outOfOrder = lastAppliedMs != null
                        && eventTimeMs != null
                        && eventTimeMs.longValue() < lastAppliedMs.longValue();
                if (outOfOrder) {
                    // Older than what was already applied: keep only the create time, if it carries one.
                    if (topic.getCreateTime().getSeconds() != 0) {
                        Date createTime = new Date(Timestamps.toMillis(topic.getCreateTime()));
                        entities.add(newTopicWithCreateTime(tenant, topic.getTopicName(), qualifiedName, createTime));
                        log.info("Update the topic createTime from out-of-order create event {}", qualifiedName);
                    }
                    log.warn("Discarding event with time {} qualifiedName {}", Instant.ofEpochMilli(eventTimeMs.longValue()), qualifiedName);
                    continue;
                }
                entities.add(newTopic(tenant, clusterId, topic.getTopicName(), qualifiedName, topic, cluster.getEntity()));
                if (eventTimeMs != null) {
                    this.entityTimeCache.put(qualifiedName, eventTimeMs);
                }
            }
            this.metadataRegistry.createOrUpdateEntities(tenant, entities, this::newEntityHasChanges);
            this.metricsManager.recordIngestionTransform(1);
            this.metricsManager.recordIngestionTopicCreation(entities.size());
        } catch (AtlasBaseException e) {
            this.metricsManager.recordIngestionTransformError(1);
            this.metricsManager.recordIngestionTopicCreationError(1);
            log.error(String.format("Could not process topic creation metadata, cluster id %s", clusterId), e);
        }
    }

    /**
     * Builds a placeholder topic entity (identity attributes only, no topic metadata) and
     * writes it to the registry when no guid can be found for its qualified name.
     *
     * <p>A failed guid lookup is treated the same as "not found". A failed write is logged and
     * the placeholder is still returned.
     *
     * @param tenant value written as the "tenant" attribute of the topic
     * @param clusterId id of the cluster the topic belongs to
     * @param topicName topic name
     * @param cluster logical cluster entity the topic is linked to
     * @return the placeholder topic entity, whether or not it was written
     */
    private AtlasEntity.AtlasEntityWithExtInfo maybeCreateDummyTopic(String tenant, String clusterId, String topicName, AtlasEntity.AtlasEntityWithExtInfo cluster) {
        String qualifiedName = QualifiedNameGenerator.getQualifiedName(tenant, clusterId, topicName);
        AtlasEntity.AtlasEntityWithExtInfo placeholder = newTopic(tenant, clusterId, topicName, qualifiedName, null, cluster.getEntity());

        String existingGuid;
        try {
            existingGuid = this.metadataRegistry.getGuid(tenant, ModelConstants.ENTITY_KAFKA_TOPIC, qualifiedName);
        } catch (AtlasBaseException ignored) {
            // Lookup failure is handled like a missing entity.
            existingGuid = null;
        }
        if (existingGuid != null) {
            return placeholder;
        }

        try {
            this.metadataRegistry.createOrUpdateEntity(tenant, placeholder);
        } catch (AtlasBaseException createError) {
            log.error(String.format("Could not create topic, cluster id %s", clusterId), createError);
        }
        return placeholder;
    }

    /**
     * Builds a minimal topic entity carrying only identity attributes and a create time.
     *
     * @param tenant value of the "tenant" attribute
     * @param topicName topic name; also stored lower-cased (root locale)
     * @param qualifiedName qualified name of the topic
     * @param createTime create time to store
     * @return the new entity wrapped without any extra info
     */
    private AtlasEntity.AtlasEntityWithExtInfo newTopicWithCreateTime(String tenant, String topicName, String qualifiedName, Date createTime) {
        String lowerCasedName = topicName.toLowerCase(Locale.ROOT);

        AtlasEntity topic = new AtlasEntity(ModelConstants.ENTITY_KAFKA_TOPIC);
        topic.setAttribute("tenant", tenant);
        topic.setAttribute(ModelConstants.ATTR_QUALIFIED_NAME, qualifiedName);
        topic.setAttribute(ModelConstants.ATTR_NAME, topicName);
        topic.setAttribute(ModelConstants.ATTR_NAME_LOWER, lowerCasedName);
        topic.setAttribute(ModelConstants.ATTR_CREATE_TIME, createTime);

        return new AtlasEntity.AtlasEntityWithExtInfo(topic);
    }

    /**
     * Builds a kafka topic entity from identity values and, when given, topic metadata.
     *
     * <p>Identity attributes are always set. When {@code topicMetadata} is non-null its
     * configuration values, timestamps and mirror-topic details are copied onto the entity.
     * Resolving mirror-topic relations may look up the remote logical cluster in the registry
     * and call {@code maybeCreateDummyTopic}, which can write a placeholder source topic.
     * {@code maybeCreateDummyClusterLink} is defined outside this view; presumably it can write
     * a placeholder cluster link as well — confirm against its definition.
     *
     * @param str value of the "tenant" attribute
     * @param str2 id of the cluster the topic belongs to (passed on when resolving a cluster link)
     * @param str3 topic name; a leading underscore marks the topic as internal
     * @param str4 qualified name of the topic
     * @param topicMetadata topic metadata to copy, or {@code null} for an identity-only entity
     * @param atlasEntity logical cluster entity to link to, or {@code null} for no link
     * @return the new entity wrapped without any extra info
     */
    private AtlasEntity.AtlasEntityWithExtInfo newTopic(String str, String str2, String str3, String str4, TopicMetadata topicMetadata, AtlasEntity atlasEntity) {
        AtlasEntity atlasEntity2 = new AtlasEntity(ModelConstants.ENTITY_KAFKA_TOPIC);
        atlasEntity2.setAttribute("tenant", str);
        atlasEntity2.setAttribute(ModelConstants.ATTR_QUALIFIED_NAME, str4);
        atlasEntity2.setAttribute(ModelConstants.ATTR_NAME, str3);
        atlasEntity2.setAttribute(ModelConstants.ATTR_NAME_LOWER, str3.toLowerCase(Locale.ROOT));
        // Default create time; overridden below when the metadata carries a non-zero create time.
        atlasEntity2.setAttribute(ModelConstants.ATTR_CREATE_TIME, ModelConstants.UNIX_ZERO_EPOCH);
        atlasEntity2.setAttribute(ModelConstants.ATTR_IS_INTERNAL, Boolean.valueOf(str3.startsWith("_")));
        if (topicMetadata != null) {
            // String and list values are only copied when non-empty, timestamps only when their seconds are non-zero.
            if (!topicMetadata.getOwner().isEmpty()) {
                atlasEntity2.setAttribute(ModelConstants.ATTR_OWNER, topicMetadata.getOwner());
            }
            if (!topicMetadata.getTopicId().isEmpty()) {
                atlasEntity2.setAttribute(ModelConstants.ATTR_ID, topicMetadata.getTopicId());
            }
            if (!topicMetadata.mo612getFollowerReplicationThrottledReplicasList().isEmpty()) {
                atlasEntity2.setAttribute(ModelConstants.ATTR_FOLLOWER_REPLICATION_THROTTLED_REPLICAS, new ArrayList((Collection) topicMetadata.mo612getFollowerReplicationThrottledReplicasList()));
            }
            if (!topicMetadata.mo611getLeaderReplicationThrottledReplicasList().isEmpty()) {
                atlasEntity2.setAttribute(ModelConstants.ATTR_LEADER_REPLICATION_THROTTLED_REPLICAS, new ArrayList((Collection) topicMetadata.mo611getLeaderReplicationThrottledReplicasList()));
            }
            if (!topicMetadata.getMessageFormatVersion().isEmpty()) {
                atlasEntity2.setAttribute(ModelConstants.ATTR_MESSAGE_FORMAT_VERSION, topicMetadata.getMessageFormatVersion());
            }
            if (!topicMetadata.getMessageTimestampType().isEmpty()) {
                atlasEntity2.setAttribute(ModelConstants.ATTR_MESSAGE_TIMESTAMP_TYPE, topicMetadata.getMessageTimestampType());
            }
            if (topicMetadata.getCreateTime().getSeconds() != 0) {
                atlasEntity2.setAttribute(ModelConstants.ATTR_CREATE_TIME, new Date(Timestamps.toMillis(topicMetadata.getCreateTime())));
            }
            if (topicMetadata.getUpdateTime().getSeconds() != 0) {
                atlasEntity2.setAttribute(ModelConstants.ATTR_UPDATE_TIME, new Date(Timestamps.toMillis(topicMetadata.getUpdateTime())));
            }
            // Numeric and boolean settings are copied unconditionally.
            atlasEntity2.setAttribute(ModelConstants.ATTR_RETENTION_MS, Long.valueOf(topicMetadata.getRetentionMs()));
            atlasEntity2.setAttribute(ModelConstants.ATTR_RETENTION_BYTES, Long.valueOf(topicMetadata.getRetentionBytes()));
            atlasEntity2.setAttribute(ModelConstants.ATTR_REPLICATION_FACTOR, Integer.valueOf(topicMetadata.getReplicationFactor()));
            atlasEntity2.setAttribute(ModelConstants.ATTR_PARTITIONS_COUNT, Integer.valueOf(topicMetadata.getPartitionsCount()));
            atlasEntity2.setAttribute(ModelConstants.ATTR_CLEANUP_POLICY, getCleanupPolicy(topicMetadata.getCleanupPolicy()));
            atlasEntity2.setAttribute(ModelConstants.ATTR_KEY_SCHEMA_VALIDATION, Boolean.valueOf(topicMetadata.getKeySchemaValidation()));
            atlasEntity2.setAttribute(ModelConstants.ATTR_VALUE_SCHEMA_VALIDATION, Boolean.valueOf(topicMetadata.getValueSchemaValidation()));
            atlasEntity2.setAttribute(ModelConstants.ATTR_COMPRESSION_TYPE, getCompressionType(topicMetadata.getCompressionType()));
            atlasEntity2.setAttribute(ModelConstants.ATTR_FILE_DELETE_DELAY_MS, Long.valueOf(topicMetadata.getFileDeleteDelayMs()));
            atlasEntity2.setAttribute(ModelConstants.ATTR_FLUSH_MESSAGES, Long.valueOf(topicMetadata.getFlushMessages()));
            atlasEntity2.setAttribute(ModelConstants.ATTR_FLUSH_MS, Long.valueOf(topicMetadata.getFlushMs()));
            atlasEntity2.setAttribute(ModelConstants.ATTR_INDEX_INTERVAL_BYTES, Integer.valueOf(topicMetadata.getIndexIntervalBytes()));
            atlasEntity2.setAttribute(ModelConstants.ATTR_MAX_COMPACTION_LAG_MS, Long.valueOf(topicMetadata.getMaxCompactionLagMs()));
            atlasEntity2.setAttribute(ModelConstants.ATTR_MAX_MESSAGE_BYTES, Integer.valueOf(topicMetadata.getMaxMessageBytes()));
            atlasEntity2.setAttribute(ModelConstants.ATTR_MESSAGE_DOWNCONVERSION_ENABLE, Boolean.valueOf(topicMetadata.getMessageDownconversionEnable()));
            atlasEntity2.setAttribute(ModelConstants.ATTR_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS, Long.valueOf(topicMetadata.getMessageTimestampDifferenceMaxMs()));
            atlasEntity2.setAttribute(ModelConstants.ATTR_MIN_CLEANABLE_DIRTY_RATIO, Double.valueOf(topicMetadata.getMinCleanableDirtyRatio()));
            atlasEntity2.setAttribute(ModelConstants.ATTR_MIN_COMPACTION_LAG_MS, Long.valueOf(topicMetadata.getMinCompactionLagMs()));
            atlasEntity2.setAttribute(ModelConstants.ATTR_MIN_INSYNC_REPLICAS, Integer.valueOf(topicMetadata.getMinInsyncReplicas()));
            atlasEntity2.setAttribute(ModelConstants.ATTR_PREALLOCATE, Boolean.valueOf(topicMetadata.getPreallocate()));
            atlasEntity2.setAttribute(ModelConstants.ATTR_SEGMENT_BYTES, Integer.valueOf(topicMetadata.getSegmentBytes()));
            atlasEntity2.setAttribute(ModelConstants.ATTR_SEGMENT_INDEX_BYTES, Integer.valueOf(topicMetadata.getSegmentIndexBytes()));
            atlasEntity2.setAttribute(ModelConstants.ATTR_SEGMENT_JITTER_MS, Long.valueOf(topicMetadata.getSegmentJitterMs()));
            atlasEntity2.setAttribute(ModelConstants.ATTR_SEGMENT_MS, Long.valueOf(topicMetadata.getSegmentMs()));
            atlasEntity2.setAttribute(ModelConstants.ATTR_UNCLEAN_LEADER_ELECTION_ENABLE, Boolean.valueOf(topicMetadata.getUncleanLeaderElectionEnable()));
            atlasEntity2.setAttribute(ModelConstants.ATTR_DELETE_RETENTION_MS, Long.valueOf(topicMetadata.getDeleteRetentionMs()));
            // Mirror topic: link to the cluster link entity when a link id is present.
            if (!topicMetadata.getMirrorTopicMetadata().getLinkId().isEmpty()) {
                atlasEntity2.setRelationshipAttribute(ModelConstants.RELN_CLUSTER_LINK, MetadataRegistry.toRelatedObjectIdWithUniqAttr(maybeCreateDummyClusterLink(str, str2, topicMetadata.getMirrorTopicMetadata().getLinkId(), topicMetadata.getMirrorTopicMetadata().getLinkName()).getEntity()));
            }
            String sourceTopicName = topicMetadata.getMirrorTopicMetadata().getSourceTopicName();
            if (topicMetadata.getMirrorTopicMetadata().getRemoteClusterId().isEmpty()) {
                // No remote cluster id: record the source topic as plain "external" attributes.
                if (!sourceTopicName.isEmpty()) {
                    atlasEntity2.setAttribute(ModelConstants.ATTR_EXTERNAL_SOURCE_TOPIC_NAME, sourceTopicName);
                }
                if (!topicMetadata.getMirrorTopicMetadata().getSourceTopicId().isEmpty()) {
                    atlasEntity2.setAttribute(ModelConstants.ATTR_EXTERNAL_SOURCE_TOPIC_ID, topicMetadata.getMirrorTopicMetadata().getSourceTopicId());
                }
            } else {
                String remoteClusterId = topicMetadata.getMirrorTopicMetadata().getRemoteClusterId();
                AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo = null;
                try {
                    atlasEntityWithExtInfo = this.metadataRegistry.getEntity(str, ModelConstants.ENTITY_KAFKA_LOGICAL_CLUSTER, QualifiedNameGenerator.getQualifiedName(str, remoteClusterId), true, true);
                } catch (AtlasBaseException e) {
                    // Lookup failure is handled like an unknown remote cluster (variable stays null).
                }
                if (atlasEntityWithExtInfo == null) {
                    // Remote cluster not found in the registry: fall back to the "external" attributes.
                    if (!sourceTopicName.isEmpty()) {
                        atlasEntity2.setAttribute(ModelConstants.ATTR_EXTERNAL_SOURCE_TOPIC_NAME, sourceTopicName);
                    }
                    if (!topicMetadata.getMirrorTopicMetadata().getSourceTopicId().isEmpty()) {
                        atlasEntity2.setAttribute(ModelConstants.ATTR_EXTERNAL_SOURCE_TOPIC_ID, topicMetadata.getMirrorTopicMetadata().getSourceTopicId());
                    }
                } else if (!sourceTopicName.isEmpty()) {
                    // Remote cluster is known: relate to the source topic, creating a placeholder for it if needed.
                    atlasEntity2.setRelationshipAttribute(ModelConstants.RELN_SOURCE_TOPIC, MetadataRegistry.toRelatedObjectIdWithUniqAttr(maybeCreateDummyTopic(str, remoteClusterId, sourceTopicName, atlasEntityWithExtInfo).getEntity()));
                }
            }
            if (!topicMetadata.getMirrorTopicMetadata().getMirrorTopicState().isEmpty()) {
                atlasEntity2.setAttribute(ModelConstants.ATTR_MIRROR_TOPIC_STATE, topicMetadata.getMirrorTopicMetadata().getMirrorTopicState());
            }
            if (topicMetadata.getMirrorTopicMetadata().getUpdateTime().getSeconds() != 0) {
                atlasEntity2.setAttribute(ModelConstants.ATTR_MIRROR_TOPIC_UPDATE_TIME, new Date(Timestamps.toMillis(topicMetadata.getMirrorTopicMetadata().getUpdateTime())));
            }
        }
        if (atlasEntity != null) {
            // Carry over the cluster's deprecated time when it is set to something other than the zero epoch.
            Date dateAttribute = MetadataRegistry.getDateAttribute(atlasEntity.getAttribute(ModelConstants.ATTR_DEPRECATED_TIME));
            if (dateAttribute != null && !dateAttribute.equals(ModelConstants.UNIX_ZERO_EPOCH)) {
                atlasEntity2.setAttribute(ModelConstants.ATTR_DEPRECATED_TIME, dateAttribute);
            }
            atlasEntity2.setRelationshipAttribute(ModelConstants.RELN_LOGICAL_CLUSTER, MetadataRegistry.toRelatedObjectIdWithUniqAttr(atlasEntity));
        }
        return new AtlasEntity.AtlasEntityWithExtInfo(atlasEntity2);
    }

    /**
     * Decides whether two entities differ in a way that warrants a write.
     *
     * <p>Returns {@code false} when the second entity is missing, {@code true} when only the
     * first one is missing. Otherwise walks the attributes of the first entity and reports a
     * change as soon as the second entity holds a non-null value under the same key that is
     * not equal to the first entity's value. Attributes that exist only on the second entity
     * are not examined.
     *
     * @param base entity whose attribute keys drive the comparison
     * @param candidate entity whose values are compared against {@code base}
     * @return {@code true} if a differing, non-null value is found or {@code base} is missing
     */
    private boolean newEntityHasChanges(AtlasEntity.AtlasEntityWithExtInfo base, AtlasEntity.AtlasEntityWithExtInfo candidate) {
        if (candidate == null || candidate.getEntity() == null) {
            return false;
        }
        if (base == null || base.getEntity() == null) {
            return true;
        }

        Map<String, Object> baseAttributes = base.getEntity().getAttributes();
        Map<String, Object> candidateAttributes = candidate.getEntity().getAttributes();
        for (Map.Entry<String, Object> attribute : baseAttributes.entrySet()) {
            Object candidateValue = candidateAttributes.get(attribute.getKey());
            if (candidateValue == null) {
                // A value missing on the candidate never counts as a change.
                continue;
            }
            if (!candidateValue.equals(attribute.getValue())) {
                return true;
            }
        }
        return false;
    }

    /**
     * Deletes each topic in {@code list} unless the event is older than the last one recorded for it.
     *
     * <p>The topic's qualified name is looked up in {@code entityTimeCache}. The event is discarded
     * with a warning only when both a cached time and {@code l} are present and {@code l} is
     * strictly smaller than the cached time. Otherwise the topic is deleted through the
     * single-topic overload and, when {@code l} is non-null, {@code l} is stored as the latest
     * time for that qualified name.
     *
     * @param str first component of the qualified name
     * @param str2 second component of the qualified name
     * @param list topics to delete
     * @param l event time in epoch milliseconds, or {@code null} to skip the ordering check
     */
    private void deleteTopicMetadata(String str, String str2, List<TopicMetadata> list, Long l) {
        for (TopicMetadata topic : list) {
            String topicQualifiedName = QualifiedNameGenerator.getQualifiedName(str, str2, topic.getTopicName());
            Long lastSeen = (Long) this.entityTimeCache.getIfPresent(topicQualifiedName);
            boolean outOfOrder = lastSeen != null && l != null && l.longValue() < lastSeen.longValue();
            if (outOfOrder) {
                log.warn("Discarding event with time {}", Instant.ofEpochMilli(l.longValue()));
                continue;
            }
            deleteTopicMetadata(str, str2, topic);
            if (l != null) {
                this.entityTimeCache.put(topicQualifiedName, l);
            }
        }
    }

    /**
     * Deletes and then purges a single Kafka topic entity from the metadata registry.
     *
     * <p>On success the transform and topic-deletion metrics are incremented. An
     * {@link AtlasBaseException} is not propagated: the matching error metrics are incremented
     * and the failure is logged.
     *
     * @param str first component of the qualified name, also passed to the registry calls
     * @param str2 second component of the qualified name; reported as the cluster id in the error log
     * @param topicMetadata the topic whose name completes the qualified name
     */
    private void deleteTopicMetadata(String str, String str2, TopicMetadata topicMetadata) {
        String topicQualifiedName = QualifiedNameGenerator.getQualifiedName(str, str2, topicMetadata.getTopicName());
        try {
            this.metadataRegistry.deleteEntity(str, ModelConstants.ENTITY_KAFKA_TOPIC, topicQualifiedName);
            this.metadataRegistry.purgeEntity(str, ModelConstants.ENTITY_KAFKA_TOPIC, topicQualifiedName);
            this.metricsManager.recordIngestionTransform(1);
            this.metricsManager.recordIngestionTopicDeletion(1);
        } catch (AtlasBaseException deletionFailure) {
            this.metricsManager.recordIngestionTransformError(1);
            this.metricsManager.recordIngestionTopicDeletionError(1);
            log.error(String.format("Could not process topic deletion metadata, cluster id %s", str2), deletionFailure);
        }
    }

    /**
     * Creates or updates a cluster link entity for every element of {@code list}.
     *
     * <p>An element is skipped with a warning when both a cached time for its qualified name and
     * {@code l} exist and {@code l} is strictly older. For the others, the local cluster is
     * obtained via {@code maybeCreateDummyKafkaCluster}, the remote cluster is looked up in the
     * registry (a failed lookup leaves it {@code null}), the event time is cached, and the new
     * entity is written. The whole loop sits in one try block, so the first
     * {@link AtlasBaseException} from the write stops processing of the remaining elements.
     *
     * @param str tenant identifier used for qualified names and registry calls
     * @param str2 second component of the link's qualified name
     * @param list cluster link metadata to ingest
     * @param l event time in epoch milliseconds, or {@code null} to skip the ordering check
     */
    private void createOrUpdateClusterLinkMetadata(String str, String str2, List<ClusterLinkMetadata> list, Long l) {
        try {
            for (ClusterLinkMetadata link : list) {
                String linkQualifiedName = QualifiedNameGenerator.getQualifiedName(str, str2, link.getClusterLinkId());
                Long lastSeen = (Long) this.entityTimeCache.getIfPresent(linkQualifiedName);
                boolean outOfOrder = lastSeen != null && l != null && l.longValue() < lastSeen.longValue();
                if (outOfOrder) {
                    log.warn("Discarding cluster link creation event with time {}", Instant.ofEpochMilli(l.longValue()));
                    continue;
                }
                AtlasEntity.AtlasEntityWithExtInfo localCluster = maybeCreateDummyKafkaCluster(str, link.getLocalClusterId());
                AtlasEntity.AtlasEntityWithExtInfo remoteCluster = null;
                try {
                    String remoteQualifiedName = QualifiedNameGenerator.getQualifiedName(str, link.getRemoteClusterId());
                    remoteCluster = this.metadataRegistry.getEntity(str, ModelConstants.ENTITY_KAFKA_LOGICAL_CLUSTER, remoteQualifiedName, true, true);
                } catch (AtlasBaseException ignored) {
                    // Lookup failure is tolerated: the link is then built without a source-cluster relationship.
                }
                AtlasEntity.AtlasEntityWithExtInfo linkEntity = newClusterLink(str, link.getClusterLinkId(), link.getClusterLinkName(), linkQualifiedName, link, localCluster, remoteCluster);
                // The time is cached before the write, exactly as the ordering check expects on the next event.
                if (l != null) {
                    this.entityTimeCache.put(linkQualifiedName, l);
                }
                this.metadataRegistry.createOrUpdateEntity(str, linkEntity);
                this.metricsManager.recordIngestionClusterLinkCreation(1);
            }
            this.metricsManager.recordIngestionTransform(1);
        } catch (AtlasBaseException writeFailure) {
            this.metricsManager.recordIngestionTransformError(1);
            this.metricsManager.recordIngestionClusterLinkCreationError(1);
            log.error("Could not process cluster link creation metadata", writeFailure);
        }
    }

    /**
     * Builds a placeholder cluster link entity and stores it when no GUID is found for it.
     *
     * <p>The placeholder is always built and returned, whether or not it was written. A failed
     * GUID lookup is treated the same as "not present", so creation is attempted. A failed
     * creation is logged as a warning (including the exception) and does not propagate.
     *
     * @param str tenant identifier
     * @param str2 second component of the qualified name
     * @param str3 cluster link id; last component of the qualified name and the entity id
     * @param str4 cluster link name
     * @return the placeholder entity (never {@code null})
     */
    private AtlasEntity.AtlasEntityWithExtInfo maybeCreateDummyClusterLink(String str, String str2, String str3, String str4) {
        String qualifiedName = QualifiedNameGenerator.getQualifiedName(str, str2, str3);
        AtlasEntity.AtlasEntityWithExtInfo newClusterLink = newClusterLink(str, str3, str4, qualifiedName, null, null, null);
        String existingGuid = null;
        try {
            existingGuid = this.metadataRegistry.getGuid(str, ModelConstants.ENTITY_KAFKA_CLUSTER_LINK, qualifiedName);
        } catch (AtlasBaseException ignored) {
            // Deliberately ignored: a failed lookup leaves the GUID null and falls through to creation.
        }
        if (existingGuid == null) {
            try {
                this.metadataRegistry.createOrUpdateEntity(str, newClusterLink);
            } catch (AtlasBaseException e) {
                // Pass the exception as the trailing argument so the cause is not lost.
                log.warn("Could not create cluster link {}", newClusterLink, e);
            }
        }
        return newClusterLink;
    }

    /**
     * Assembles (without persisting) a cluster link entity.
     *
     * <p>Tenant, qualified name, name, lower-cased name and a zero-epoch create time are always
     * set. Without metadata only the id is added (when {@code str2} is non-null). With metadata,
     * non-empty string fields and non-zero timestamps are copied, the destination cluster
     * relationship is set when the local cluster is available (otherwise a warning is logged),
     * and the source cluster is either linked as a relationship or, when unavailable, recorded
     * as an external source cluster id attribute.
     *
     * @param str tenant identifier
     * @param str2 cluster link id, used only when {@code clusterLinkMetadata} is {@code null}
     * @param str3 cluster link name (must not be {@code null}; it is lower-cased)
     * @param str4 qualified name of the link
     * @param clusterLinkMetadata source metadata, or {@code null} for a placeholder
     * @param atlasEntityWithExtInfo local (destination) cluster, may be {@code null}
     * @param atlasEntityWithExtInfo2 remote (source) cluster, may be {@code null}
     * @return the new entity wrapped in an {@code AtlasEntityWithExtInfo}
     */
    private AtlasEntity.AtlasEntityWithExtInfo newClusterLink(String str, String str2, String str3, String str4, ClusterLinkMetadata clusterLinkMetadata, AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo, AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo2) {
        AtlasEntity link = new AtlasEntity(ModelConstants.ENTITY_KAFKA_CLUSTER_LINK);
        link.setAttribute("tenant", str);
        link.setAttribute(ModelConstants.ATTR_QUALIFIED_NAME, str4);
        link.setAttribute(ModelConstants.ATTR_NAME, str3);
        link.setAttribute(ModelConstants.ATTR_NAME_LOWER, str3.toLowerCase(Locale.ROOT));
        // Default create time; overwritten below when the metadata carries a real one.
        link.setAttribute(ModelConstants.ATTR_CREATE_TIME, ModelConstants.UNIX_ZERO_EPOCH);

        if (clusterLinkMetadata == null) {
            // Placeholder: only the id is known.
            if (str2 != null) {
                link.setAttribute(ModelConstants.ATTR_ID, str2);
            }
            return new AtlasEntity.AtlasEntityWithExtInfo(link);
        }

        String linkId = clusterLinkMetadata.getClusterLinkId();
        if (!linkId.isEmpty()) {
            link.setAttribute(ModelConstants.ATTR_ID, linkId);
        }
        String linkMode = clusterLinkMetadata.getLinkMode();
        if (!linkMode.isEmpty()) {
            link.setAttribute(ModelConstants.ATTR_LINK_MODE, linkMode);
        }
        String connectionMode = clusterLinkMetadata.getConnectionMode();
        if (!connectionMode.isEmpty()) {
            link.setAttribute(ModelConstants.ATTR_CONNECTION_MODE, connectionMode);
        }
        if (clusterLinkMetadata.getCreateTime().getSeconds() != 0) {
            long createdMillis = Timestamps.toMillis(clusterLinkMetadata.getCreateTime());
            link.setAttribute(ModelConstants.ATTR_CREATE_TIME, new Date(createdMillis));
        }
        if (clusterLinkMetadata.getUpdateTime().getSeconds() != 0) {
            long updatedMillis = Timestamps.toMillis(clusterLinkMetadata.getUpdateTime());
            link.setAttribute(ModelConstants.ATTR_UPDATE_TIME, new Date(updatedMillis));
        }

        if (atlasEntityWithExtInfo == null) {
            log.warn("Local kafka cluster entity doesn't exist when creating cluster link {}", str4);
        } else {
            link.setRelationshipAttribute(ModelConstants.RELN_DESTINATION_CLUSTER, MetadataRegistry.toRelatedObjectIdWithUniqAttr(atlasEntityWithExtInfo.getEntity()));
        }

        if (atlasEntityWithExtInfo2 == null) {
            // Remote cluster is not in the catalog: keep its id as a plain attribute instead.
            link.setAttribute(ModelConstants.ATTR_EXTERNAL_SOURCE_CLUSTER, clusterLinkMetadata.getRemoteClusterId());
        } else {
            link.setRelationshipAttribute(ModelConstants.RELN_SOURCE_CLUSTER, MetadataRegistry.toRelatedObjectIdWithUniqAttr(atlasEntityWithExtInfo2.getEntity()));
        }
        return new AtlasEntity.AtlasEntityWithExtInfo(link);
    }

    /**
     * Deletes each cluster link in {@code list} unless the event is older than the cached time.
     *
     * <p>An element is discarded with a warning only when a cached time exists for its qualified
     * name, {@code l} is non-null, and {@code l} is strictly smaller than the cached time.
     * Otherwise the single-link overload is invoked and {@code l}, when non-null, is cached.
     *
     * @param str first component of the qualified name
     * @param str2 second component of the qualified name
     * @param list cluster links to delete
     * @param l event time in epoch milliseconds, or {@code null} to skip the ordering check
     */
    private void deleteClusterLinkMetadata(String str, String str2, List<ClusterLinkMetadata> list, Long l) {
        for (ClusterLinkMetadata link : list) {
            String linkQualifiedName = QualifiedNameGenerator.getQualifiedName(str, str2, link.getClusterLinkId());
            Long lastSeen = (Long) this.entityTimeCache.getIfPresent(linkQualifiedName);
            boolean outOfOrder = lastSeen != null && l != null && l.longValue() < lastSeen.longValue();
            if (outOfOrder) {
                log.warn("Discarding cluster link deletion event with time {}", Instant.ofEpochMilli(l.longValue()));
                continue;
            }
            deleteClusterLinkMetadata(str, str2, link);
            if (l != null) {
                this.entityTimeCache.put(linkQualifiedName, l);
            }
        }
    }

    /**
     * Deletes and then purges a single cluster link entity from the metadata registry.
     *
     * <p>Success increments the transform and cluster-link-deletion metrics. An
     * {@link AtlasBaseException} increments the matching error metrics and is logged rather
     * than rethrown.
     *
     * @param str first component of the qualified name, also passed to the registry calls
     * @param str2 second component of the qualified name
     * @param clusterLinkMetadata the link whose id completes the qualified name
     */
    private void deleteClusterLinkMetadata(String str, String str2, ClusterLinkMetadata clusterLinkMetadata) {
        String linkQualifiedName = QualifiedNameGenerator.getQualifiedName(str, str2, clusterLinkMetadata.getClusterLinkId());
        try {
            this.metadataRegistry.deleteEntity(str, ModelConstants.ENTITY_KAFKA_CLUSTER_LINK, linkQualifiedName);
            this.metadataRegistry.purgeEntity(str, ModelConstants.ENTITY_KAFKA_CLUSTER_LINK, linkQualifiedName);
            this.metricsManager.recordIngestionTransform(1);
            this.metricsManager.recordIngestionClusterLinkDeletion(1);
        } catch (AtlasBaseException deletionFailure) {
            this.metricsManager.recordIngestionTransformError(1);
            this.metricsManager.recordIngestionClusterLinkDeletionError(1);
            log.error("Could not process cluster link deletion metadata" + clusterLinkMetadata, deletionFailure);
        }
    }

    /**
     * Creates or updates a logical Kafka cluster entity for every element of {@code list}.
     *
     * <p>An element is skipped with a warning when both a cached time and {@code l} exist and
     * {@code l} is strictly older. Failures are handled per element: an
     * {@link AtlasBaseException} is logged and counted, and the loop continues. The event time
     * is cached (when non-null) whether or not the write succeeded. The transform metric is
     * incremented once after the loop.
     *
     * @param str tenant identifier used for qualified names and registry calls
     * @param list logical cluster metadata to ingest
     * @param l event time in epoch milliseconds, or {@code null} to skip the ordering check
     */
    private void createOrUpdateLogicalClusterMetadata(String str, List<LogicalClusterMetadata> list, Long l) {
        for (LogicalClusterMetadata cluster : list) {
            String clusterQualifiedName = QualifiedNameGenerator.getQualifiedName(str, cluster.getClusterId());
            Long lastSeen = (Long) this.entityTimeCache.getIfPresent(clusterQualifiedName);
            boolean outOfOrder = lastSeen != null && l != null && l.longValue() < lastSeen.longValue();
            if (outOfOrder) {
                log.warn("Discarding logical cluster creation event with time {}", Instant.ofEpochMilli(l.longValue()));
                continue;
            }
            try {
                AtlasEntity.AtlasEntityWithExtInfo clusterEntity = newKafkaCluster(str, cluster.getClusterId(), cluster.getName(), clusterQualifiedName, cluster);
                this.metadataRegistry.createOrUpdateEntity(str, clusterEntity);
                this.metricsManager.recordIngestionClusterCreation(1);
            } catch (AtlasBaseException creationFailure) {
                this.metricsManager.recordIngestionTransformError(1);
                this.metricsManager.recordIngestionClusterCreationError(1);
                log.error(String.format("Could not process logical cluster creation event %s", cluster), creationFailure);
            }
            if (l != null) {
                this.entityTimeCache.put(clusterQualifiedName, l);
            }
        }
        this.metricsManager.recordIngestionTransform(1);
    }

    /**
     * Returns the stored logical Kafka cluster entity, creating a placeholder when none is found.
     *
     * <p>A failed registry lookup is treated like a missing entity. In that case a placeholder
     * with an empty name is built and written. The result is {@code null} when building the
     * placeholder itself fails, and the unsaved placeholder when only the write fails; both
     * failures are logged as errors.
     *
     * @param str tenant identifier
     * @param str2 cluster id
     * @return the stored entity, the placeholder, or {@code null}
     */
    private AtlasEntity.AtlasEntityWithExtInfo maybeCreateDummyKafkaCluster(String str, String str2) {
        String clusterQualifiedName = QualifiedNameGenerator.getQualifiedName(str, str2);
        AtlasEntity.AtlasEntityWithExtInfo cluster = null;
        try {
            cluster = this.metadataRegistry.getEntity(str, ModelConstants.ENTITY_KAFKA_LOGICAL_CLUSTER, clusterQualifiedName, true, true);
        } catch (AtlasBaseException ignored) {
            // Lookup failure leaves the cluster null, which triggers placeholder creation below.
        }
        if (cluster != null) {
            return cluster;
        }
        try {
            cluster = newKafkaCluster(str, str2, "", clusterQualifiedName, null);
            this.metadataRegistry.createOrUpdateEntity(str, cluster);
        } catch (AtlasBaseException creationFailure) {
            log.error(String.format("Could not create kafka logical cluster %s, qualifiedName %s", str2, clusterQualifiedName), creationFailure);
        }
        return cluster;
    }

    /**
     * Assembles (without persisting) a logical Kafka cluster entity.
     *
     * <p>Tenant, qualified name, name, lower-cased name, and zero-epoch create and deprecated
     * times are always set. Without metadata the entity is a placeholder carrying only the id.
     * With metadata, status, SKU, provider, availability and CKU are always copied; region and
     * network only when non-empty; timestamps only when non-zero. A non-zero deactivation time
     * is written to both the deactivate-time and deprecated-time attributes.
     *
     * @param str tenant identifier
     * @param str2 cluster id, used only when {@code logicalClusterMetadata} is {@code null}
     * @param str3 cluster name (must not be {@code null}; it is lower-cased)
     * @param str4 qualified name of the cluster
     * @param logicalClusterMetadata source metadata, or {@code null} for a placeholder
     * @return the new entity wrapped in an {@code AtlasEntityWithExtInfo}
     * @throws AtlasBaseException when the applicable cluster id is missing or empty
     */
    private AtlasEntity.AtlasEntityWithExtInfo newKafkaCluster(String str, String str2, String str3, String str4, LogicalClusterMetadata logicalClusterMetadata) throws AtlasBaseException {
        AtlasEntity cluster = new AtlasEntity(ModelConstants.ENTITY_KAFKA_LOGICAL_CLUSTER);
        cluster.setAttribute("tenant", str);
        cluster.setAttribute(ModelConstants.ATTR_QUALIFIED_NAME, str4);
        cluster.setAttribute(ModelConstants.ATTR_NAME, str3);
        cluster.setAttribute(ModelConstants.ATTR_NAME_LOWER, str3.toLowerCase(Locale.ROOT));
        // Defaults; replaced below when the metadata carries real timestamps.
        cluster.setAttribute(ModelConstants.ATTR_CREATE_TIME, ModelConstants.UNIX_ZERO_EPOCH);
        cluster.setAttribute(ModelConstants.ATTR_DEPRECATED_TIME, ModelConstants.UNIX_ZERO_EPOCH);

        if (logicalClusterMetadata == null) {
            // Placeholder: the explicit id is mandatory.
            if (str2 == null || str2.isEmpty()) {
                throw new AtlasBaseException(AtlasErrorCode.MISSING_MANDATORY_ATTRIBUTE, new String[]{ModelConstants.ENTITY_KAFKA_LOGICAL_CLUSTER, ModelConstants.ATTR_CLUSTER_ID});
            }
            cluster.setAttribute(ModelConstants.ATTR_ID, str2);
            return new AtlasEntity.AtlasEntityWithExtInfo(cluster);
        }

        String clusterId = logicalClusterMetadata.getClusterId();
        if (clusterId.isEmpty()) {
            throw new AtlasBaseException(AtlasErrorCode.MISSING_MANDATORY_ATTRIBUTE, new String[]{ModelConstants.ENTITY_KAFKA_LOGICAL_CLUSTER, ModelConstants.ATTR_CLUSTER_ID});
        }
        cluster.setAttribute(ModelConstants.ATTR_ID, clusterId);
        cluster.setAttribute(ModelConstants.ATTR_STATUS, logicalClusterMetadata.getClusterStatus().name());
        cluster.setAttribute(ModelConstants.ATTR_SKU, logicalClusterMetadata.getSku().name());
        cluster.setAttribute(ModelConstants.ATTR_PROVIDER, logicalClusterMetadata.getCloud().name());
        String region = logicalClusterMetadata.getRegion();
        if (!region.isEmpty()) {
            cluster.setAttribute(ModelConstants.ATTR_REGION, region);
        }
        cluster.setAttribute(ModelConstants.ATTR_AVAILABILITY, logicalClusterMetadata.getAvailability().name());
        cluster.setAttribute(ModelConstants.ATTR_CKU, Integer.valueOf(logicalClusterMetadata.getCku()));
        String networkType = logicalClusterMetadata.getSelectedNetworkType();
        if (!networkType.isEmpty()) {
            cluster.setAttribute(ModelConstants.ATTR_NETWORK, networkType);
        }
        if (logicalClusterMetadata.getCreated().getSeconds() != 0) {
            cluster.setAttribute(ModelConstants.ATTR_CREATE_TIME, new Date(Timestamps.toMillis(logicalClusterMetadata.getCreated())));
        }
        if (logicalClusterMetadata.getModified().getSeconds() != 0) {
            cluster.setAttribute(ModelConstants.ATTR_UPDATE_TIME, new Date(Timestamps.toMillis(logicalClusterMetadata.getModified())));
        }
        if (logicalClusterMetadata.getDeactivated().getSeconds() != 0) {
            // Two separate Date instances on purpose: Date is mutable and each attribute owns its value.
            long deactivatedMillis = Timestamps.toMillis(logicalClusterMetadata.getDeactivated());
            cluster.setAttribute(ModelConstants.ATTR_DEACTIVATE_TIME, new Date(deactivatedMillis));
            cluster.setAttribute(ModelConstants.ATTR_DEPRECATED_TIME, new Date(deactivatedMillis));
        }
        return new AtlasEntity.AtlasEntityWithExtInfo(cluster);
    }

    /**
     * Handles logical cluster delete events by deprecating each cluster in {@code list}.
     *
     * <p>An element is discarded with a warning only when a cached time exists for its qualified
     * name, {@code l} is non-null, and {@code l} is strictly smaller than the cached time.
     * Otherwise {@code deprecateKafkaClusterMetadata} is invoked and {@code l}, when non-null,
     * is cached.
     *
     * @param str first component of the qualified name
     * @param list logical clusters named by the delete event
     * @param l event time in epoch milliseconds, or {@code null} to skip the ordering check
     */
    private void deleteLogicalClusterMetadata(String str, List<LogicalClusterMetadata> list, Long l) {
        for (LogicalClusterMetadata cluster : list) {
            String clusterQualifiedName = QualifiedNameGenerator.getQualifiedName(str, cluster.getClusterId());
            Long lastSeen = (Long) this.entityTimeCache.getIfPresent(clusterQualifiedName);
            boolean outOfOrder = lastSeen != null && l != null && l.longValue() < lastSeen.longValue();
            if (outOfOrder) {
                log.warn("Discarding event with time {}", Instant.ofEpochMilli(l.longValue()));
                continue;
            }
            deprecateKafkaClusterMetadata(str, cluster);
            if (l != null) {
                this.entityTimeCache.put(clusterQualifiedName, l);
            }
        }
    }

    /**
     * Marks a logical Kafka cluster as deprecated by rewriting its entity from the given metadata.
     *
     * <p>The metadata must carry a non-zero deactivation time; otherwise an
     * {@link AtlasBaseException} is raised and handled locally like any other failure: it is
     * logged and the transform-error and cluster-deletion-error metrics are incremented. On
     * success the transform and cluster-deletion metrics are incremented.
     *
     * @param str tenant identifier used for the qualified name and the registry call
     * @param logicalClusterMetadata metadata of the cluster being deprecated
     */
    private void deprecateKafkaClusterMetadata(String str, LogicalClusterMetadata logicalClusterMetadata) {
        String clusterId = logicalClusterMetadata.getClusterId();
        String clusterQualifiedName = QualifiedNameGenerator.getQualifiedName(str, clusterId);
        log.info(String.format("Deprecate logical kafka cluster %s", clusterQualifiedName));
        try {
            boolean missingDeactivationTime = logicalClusterMetadata.getDeactivated().getSeconds() == 0;
            if (missingDeactivationTime) {
                throw new AtlasBaseException(String.format("The deactivate time is empty for logical cluster delete event, cluster id %s", logicalClusterMetadata.getClusterId()));
            }
            AtlasEntity.AtlasEntityWithExtInfo deprecatedCluster = newKafkaCluster(str, logicalClusterMetadata.getClusterId(), logicalClusterMetadata.getName(), clusterQualifiedName, logicalClusterMetadata);
            this.metadataRegistry.createOrUpdateEntity(str, deprecatedCluster);
            this.metricsManager.recordIngestionTransform(1);
            this.metricsManager.recordIngestionClusterDeletion(1);
        } catch (AtlasBaseException deprecationFailure) {
            log.error(String.format("Error when deprecating kafka cluster metadata in catalog with logicalClusterQualifiedName %s", clusterQualifiedName), deprecationFailure);
            this.metricsManager.recordIngestionTransformError(1);
            this.metricsManager.recordIngestionClusterDeletionError(1);
        }
    }

    /**
     * Creates or updates a connector entity for every element of {@code list}.
     *
     * <p>An element is skipped with a warning when both a cached time and {@code l} exist and
     * {@code l} is strictly older. For the others, the Kafka cluster and each listed topic are
     * obtained through the {@code maybeCreateDummy*} helpers, the event time is cached, and the
     * connector entity is written with relationships to those topics. The whole loop sits in
     * one try block, so the first {@link AtlasBaseException} stops processing of the rest.
     *
     * @param str tenant identifier used for qualified names and registry calls
     * @param list connector metadata to ingest
     * @param l event time in epoch milliseconds, or {@code null} to skip the ordering check
     */
    private void createOrUpdateConnectMetadata(String str, List<ConnectMetadata> list, Long l) {
        try {
            for (ConnectMetadata connector : list) {
                String connectorQualifiedName = QualifiedNameGenerator.getQualifiedName(str, connector.getClusterId());
                Long lastSeen = (Long) this.entityTimeCache.getIfPresent(connectorQualifiedName);
                boolean outOfOrder = lastSeen != null && l != null && l.longValue() < lastSeen.longValue();
                if (outOfOrder) {
                    log.warn("Discarding event with time {}", Instant.ofEpochMilli(l.longValue()));
                    continue;
                }
                String kafkaClusterId = connector.getKafkaClusterId();
                AtlasEntity.AtlasEntityWithExtInfo kafkaCluster = maybeCreateDummyKafkaCluster(str, kafkaClusterId);
                List<AtlasEntity> topicEntities = new ArrayList<>();
                for (Object topicName : connector.mo209getTopicsList()) {
                    topicEntities.add(maybeCreateDummyTopic(str, kafkaClusterId, (String) topicName, kafkaCluster).getEntity());
                }
                AtlasEntity.AtlasEntityWithExtInfo connectorEntity = newConnector(str, connector.getClusterId(), connector.getName(), connectorQualifiedName, connector, topicEntities);
                // The time is cached before the write, matching the other create/update handlers.
                if (l != null) {
                    this.entityTimeCache.put(connectorQualifiedName, l);
                }
                this.metadataRegistry.createOrUpdateEntity(str, connectorEntity);
                this.metricsManager.recordIngestionConnectorCreation(1);
            }
            this.metricsManager.recordIngestionTransform(1);
        } catch (AtlasBaseException writeFailure) {
            this.metricsManager.recordIngestionTransformError(1);
            this.metricsManager.recordIngestionConnectorCreationError(1);
            log.error("Could not process connector creation metadata", writeFailure);
        }
    }

    /**
     * Builds a placeholder connector entity and stores it when no GUID is found for it.
     *
     * <p>The placeholder (empty name, no topics) is always built and returned, whether or not it
     * was written. A failed GUID lookup is treated the same as "not present", so creation is
     * attempted. A failed creation is logged as a warning (including the exception) and does
     * not propagate.
     *
     * @param str tenant identifier
     * @param str2 connector cluster id; last component of the qualified name
     * @return the placeholder entity (never {@code null})
     */
    private AtlasEntity.AtlasEntityWithExtInfo maybeCreateDummyConnector(String str, String str2) {
        String qualifiedName = QualifiedNameGenerator.getQualifiedName(str, str2);
        AtlasEntity.AtlasEntityWithExtInfo newConnector = newConnector(str, str2, "", qualifiedName, null, null);
        String existingGuid = null;
        try {
            existingGuid = this.metadataRegistry.getGuid(str, ModelConstants.ENTITY_CN_CONNECTOR, qualifiedName);
        } catch (AtlasBaseException ignored) {
            // Deliberately ignored: a failed lookup leaves the GUID null and falls through to creation.
        }
        if (existingGuid == null) {
            try {
                this.metadataRegistry.createOrUpdateEntity(str, newConnector);
            } catch (AtlasBaseException e) {
                // Pass the exception as the trailing argument so the cause is not lost.
                log.warn("Could not create connector {}", qualifiedName, e);
            }
        }
        return newConnector;
    }

    /**
     * Assembles (without persisting) a connector entity.
     *
     * <p>Tenant, qualified name, name, lower-cased name and a zero-epoch create time are always
     * set. Without metadata only the cluster id is added (when {@code str2} is non-null). With
     * metadata, type, max tasks and Kafka auth mode are always copied; other string fields only
     * when non-empty; timestamps only when non-zero. When {@code list} is non-null, each topic
     * is converted to a related-object id and attached as the topics relationship.
     *
     * @param str tenant identifier
     * @param str2 connector cluster id, used only when {@code connectMetadata} is {@code null}
     * @param str3 connector name (must not be {@code null}; it is lower-cased)
     * @param str4 qualified name of the connector
     * @param connectMetadata source metadata, or {@code null} for a placeholder
     * @param list topic entities to relate to the connector, or {@code null} for none
     * @return the new entity wrapped in an {@code AtlasEntityWithExtInfo}
     */
    private AtlasEntity.AtlasEntityWithExtInfo newConnector(String str, String str2, String str3, String str4, ConnectMetadata connectMetadata, List<AtlasEntity> list) {
        AtlasEntity connector = new AtlasEntity(ModelConstants.ENTITY_CN_CONNECTOR);
        connector.setAttribute("tenant", str);
        connector.setAttribute(ModelConstants.ATTR_QUALIFIED_NAME, str4);
        connector.setAttribute(ModelConstants.ATTR_NAME, str3);
        connector.setAttribute(ModelConstants.ATTR_NAME_LOWER, str3.toLowerCase(Locale.ROOT));
        // Default create time; overwritten below when the metadata carries a real one.
        connector.setAttribute(ModelConstants.ATTR_CREATE_TIME, ModelConstants.UNIX_ZERO_EPOCH);

        if (connectMetadata == null) {
            // Placeholder: only the cluster id is known.
            if (str2 != null) {
                connector.setAttribute(ModelConstants.ATTR_CLUSTER_ID, str2);
            }
        } else {
            String clusterId = connectMetadata.getClusterId();
            if (!clusterId.isEmpty()) {
                connector.setAttribute(ModelConstants.ATTR_CLUSTER_ID, clusterId);
            }
            String connectorClass = connectMetadata.getClass_();
            if (!connectorClass.isEmpty()) {
                connector.setAttribute(ModelConstants.ATTR_CLASS, connectorClass);
            }
            String owner = connectMetadata.getOwner();
            if (!owner.isEmpty()) {
                connector.setAttribute(ModelConstants.ATTR_OWNER, owner);
            }
            if (connectMetadata.getCreateTime().getSeconds() != 0) {
                long createdMillis = Timestamps.toMillis(connectMetadata.getCreateTime());
                connector.setAttribute(ModelConstants.ATTR_CREATE_TIME, new Date(createdMillis));
            }
            if (connectMetadata.getUpdateTime().getSeconds() != 0) {
                long updatedMillis = Timestamps.toMillis(connectMetadata.getUpdateTime());
                connector.setAttribute(ModelConstants.ATTR_UPDATE_TIME, new Date(updatedMillis));
            }
            // These three are copied unconditionally.
            connector.setAttribute("type", connectMetadata.getType().name());
            connector.setAttribute(ModelConstants.ATTR_TASKS_MAX, Integer.valueOf(connectMetadata.getTasksMax()));
            connector.setAttribute(ModelConstants.ATTR_KAFKA_AUTH_MODE, connectMetadata.getKafkaAuthMode().name());
            String serviceAccountId = connectMetadata.getKafkaServiceAccountId();
            if (!serviceAccountId.isEmpty()) {
                connector.setAttribute(ModelConstants.ATTR_KAFKA_SERVICE_ACCOUNT_ID, serviceAccountId);
            }
            String apiKey = connectMetadata.getKafkaApiKey();
            if (!apiKey.isEmpty()) {
                connector.setAttribute(ModelConstants.ATTR_KAFKA_API_KEY, apiKey);
            }
            String dlqTopic = connectMetadata.getDlqTopic();
            if (!dlqTopic.isEmpty()) {
                connector.setAttribute(ModelConstants.ATTR_DLQ_TOPIC, dlqTopic);
            }
            String inputFormat = connectMetadata.getInputFormat();
            if (!inputFormat.isEmpty()) {
                connector.setAttribute(ModelConstants.ATTR_INPUT_FORMAT, inputFormat);
            }
            String outputFormat = connectMetadata.getOutputFormat();
            if (!outputFormat.isEmpty()) {
                connector.setAttribute(ModelConstants.ATTR_OUTPUT_FORMAT, outputFormat);
            }
            String sourceSchema = connectMetadata.getSourceSchema();
            if (!sourceSchema.isEmpty()) {
                connector.setAttribute(ModelConstants.ATTR_SOURCE_SCHEMA, sourceSchema);
            }
            String sourceEndpoint = connectMetadata.getSourceEndpoint();
            if (!sourceEndpoint.isEmpty()) {
                connector.setAttribute(ModelConstants.ATTR_SOURCE_ENDPOINT, sourceEndpoint);
            }
        }

        if (list != null) {
            connector.setRelationshipAttribute(
                    ModelConstants.RELN_TOPICS,
                    list.stream()
                            .map(topic -> MetadataRegistry.toRelatedObjectIdWithUniqAttr(topic))
                            .collect(Collectors.toList()));
        }
        return new AtlasEntity.AtlasEntityWithExtInfo(connector);
    }

    /**
     * Deletes each connector in {@code list} unless the event is older than the cached time.
     *
     * <p>An element is discarded with a warning only when a cached time exists for its qualified
     * name, {@code l} is non-null, and {@code l} is strictly smaller than the cached time.
     * Otherwise the single-connector overload is invoked and {@code l}, when non-null, is cached.
     *
     * @param str first component of the qualified name
     * @param list connectors to delete
     * @param l event time in epoch milliseconds, or {@code null} to skip the ordering check
     */
    private void deleteConnectMetadata(String str, List<ConnectMetadata> list, Long l) {
        for (ConnectMetadata connector : list) {
            String connectorQualifiedName = QualifiedNameGenerator.getQualifiedName(str, connector.getClusterId());
            Long lastSeen = (Long) this.entityTimeCache.getIfPresent(connectorQualifiedName);
            boolean outOfOrder = lastSeen != null && l != null && l.longValue() < lastSeen.longValue();
            if (outOfOrder) {
                log.warn("Discarding event with time {}", Instant.ofEpochMilli(l.longValue()));
                continue;
            }
            deleteConnectMetadata(str, connector);
            if (l != null) {
                this.entityTimeCache.put(connectorQualifiedName, l);
            }
        }
    }

    /**
     * Deletes and then purges a single connector entity from the metadata registry.
     *
     * <p>Success increments the transform and connector-deletion metrics. An
     * {@link AtlasBaseException} increments the matching error metrics and is logged rather
     * than rethrown.
     *
     * @param str first component of the qualified name, also passed to the registry calls
     * @param connectMetadata the connector whose cluster id completes the qualified name
     */
    private void deleteConnectMetadata(String str, ConnectMetadata connectMetadata) {
        String connectorQualifiedName = QualifiedNameGenerator.getQualifiedName(str, connectMetadata.getClusterId());
        try {
            this.metadataRegistry.deleteEntity(str, ModelConstants.ENTITY_CN_CONNECTOR, connectorQualifiedName);
            this.metadataRegistry.purgeEntity(str, ModelConstants.ENTITY_CN_CONNECTOR, connectorQualifiedName);
            this.metricsManager.recordIngestionTransform(1);
            this.metricsManager.recordIngestionConnectorDeletion(1);
        } catch (AtlasBaseException deletionFailure) {
            this.metricsManager.recordIngestionTransformError(1);
            this.metricsManager.recordIngestionConnectorDeletionError(1);
            log.error("Could not process connector deletion metadata" + connectMetadata, deletionFailure);
        }
    }

    /**
     * Builds a ksql logical cluster entity and stores it when no GUID is found for it.
     *
     * <p>The entity is always built and returned, whether or not it was written. A failed GUID
     * lookup is treated the same as "not present", so creation is attempted. A failed creation
     * is logged as a warning (including the exception) and does not propagate.
     *
     * @param str tenant identifier
     * @param str2 ksql cluster id; also used as the entity name
     * @return the built entity (never {@code null})
     */
    private AtlasEntity.AtlasEntityWithExtInfo maybeCreateKsqlCluster(String str, String str2) {
        String qualifiedName = QualifiedNameGenerator.getQualifiedName(str, str2);
        AtlasEntity.AtlasEntityWithExtInfo newKsqlCluster = newKsqlCluster(str, str2, qualifiedName);
        String existingGuid = null;
        try {
            existingGuid = this.metadataRegistry.getGuid(str, ModelConstants.ENTITY_KSQL_LOGICAL_CLUSTER, qualifiedName);
        } catch (AtlasBaseException ignored) {
            // Deliberately ignored: a failed lookup leaves the GUID null and falls through to creation.
        }
        if (existingGuid == null) {
            try {
                this.metadataRegistry.createOrUpdateEntity(str, newKsqlCluster);
            } catch (AtlasBaseException e) {
                // Pass the exception as the trailing argument so the cause is not lost.
                log.warn("Could not create ksql logical cluster {}", qualifiedName, e);
            }
        }
        return newKsqlCluster;
    }

    /**
     * Assembles (without persisting) a ksql logical cluster entity.
     *
     * <p>The cluster id doubles as the entity name; the create time is set to the zero epoch.
     *
     * @param str tenant identifier
     * @param str2 ksql cluster id, used as name, lower-cased name and id (must not be {@code null})
     * @param str3 qualified name of the cluster
     * @return the new entity wrapped in an {@code AtlasEntityWithExtInfo}
     */
    private AtlasEntity.AtlasEntityWithExtInfo newKsqlCluster(String str, String str2, String str3) {
        String lowerCaseName = str2.toLowerCase(Locale.ROOT);
        AtlasEntity ksqlCluster = new AtlasEntity(ModelConstants.ENTITY_KSQL_LOGICAL_CLUSTER);
        ksqlCluster.setAttribute("tenant", str);
        ksqlCluster.setAttribute(ModelConstants.ATTR_QUALIFIED_NAME, str3);
        ksqlCluster.setAttribute(ModelConstants.ATTR_NAME, str2);
        ksqlCluster.setAttribute(ModelConstants.ATTR_NAME_LOWER, lowerCaseName);
        ksqlCluster.setAttribute(ModelConstants.ATTR_CREATE_TIME, ModelConstants.UNIX_ZERO_EPOCH);
        ksqlCluster.setAttribute(ModelConstants.ATTR_ID, str2);
        return new AtlasEntity.AtlasEntityWithExtInfo(ksqlCluster);
    }

    /**
     * Builds a placeholder ksql stream entity and stores it when no GUID is found for it.
     *
     * <p>The placeholder is always built and returned, whether or not it was written. A failed
     * GUID lookup is treated the same as "not present", so creation is attempted. A failed
     * creation is logged as a warning (including the exception) and does not propagate.
     *
     * @param str tenant identifier
     * @param str2 second component of the qualified name
     * @param str3 stream name; last component of the qualified name
     * @param atlasEntityWithExtInfo owning logical cluster (must not be {@code null})
     * @return the placeholder entity (never {@code null})
     */
    private AtlasEntity.AtlasEntityWithExtInfo maybeCreateDummyStream(String str, String str2, String str3, AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo) {
        String qualifiedName = QualifiedNameGenerator.getQualifiedName(str, str2, str3);
        AtlasEntity.AtlasEntityWithExtInfo newStream = newStream(str, str3, qualifiedName, atlasEntityWithExtInfo.getEntity());
        String existingGuid = null;
        try {
            existingGuid = this.metadataRegistry.getGuid(str, ModelConstants.ENTITY_KSQL_STREAM, qualifiedName);
        } catch (AtlasBaseException ignored) {
            // Deliberately ignored: a failed lookup leaves the GUID null and falls through to creation.
        }
        if (existingGuid == null) {
            try {
                this.metadataRegistry.createOrUpdateEntity(str, newStream);
            } catch (AtlasBaseException e) {
                // Pass the exception as the trailing argument so the cause is not lost.
                log.warn("Could not create ksql stream {}", qualifiedName, e);
            }
        }
        return newStream;
    }

    /**
     * Assembles (without persisting) a ksql stream entity linked to its logical cluster.
     *
     * @param str tenant identifier
     * @param str2 stream name (must not be {@code null}; it is lower-cased)
     * @param str3 qualified name of the stream
     * @param atlasEntity logical cluster entity the stream is related to
     * @return the new entity wrapped in an {@code AtlasEntityWithExtInfo}
     */
    private AtlasEntity.AtlasEntityWithExtInfo newStream(String str, String str2, String str3, AtlasEntity atlasEntity) {
        String lowerCaseName = str2.toLowerCase(Locale.ROOT);
        Object clusterReference = MetadataRegistry.toRelatedObjectIdWithUniqAttr(atlasEntity);
        AtlasEntity stream = new AtlasEntity(ModelConstants.ENTITY_KSQL_STREAM);
        stream.setAttribute("tenant", str);
        stream.setAttribute(ModelConstants.ATTR_QUALIFIED_NAME, str3);
        stream.setAttribute(ModelConstants.ATTR_NAME, str2);
        stream.setAttribute(ModelConstants.ATTR_NAME_LOWER, lowerCaseName);
        // Placeholder create time: the zero epoch.
        stream.setAttribute(ModelConstants.ATTR_CREATE_TIME, ModelConstants.UNIX_ZERO_EPOCH);
        stream.setRelationshipAttribute(ModelConstants.RELN_LOGICAL_CLUSTER, clusterReference);
        return new AtlasEntity.AtlasEntityWithExtInfo(stream);
    }

    /**
     * Builds a placeholder ksql table entity and stores it when no GUID is found for it.
     *
     * <p>The placeholder is always built and returned, whether or not it was written. A failed
     * GUID lookup is treated the same as "not present", so creation is attempted. A failed
     * creation is logged as a warning (including the exception) and does not propagate.
     *
     * @param str tenant identifier
     * @param str2 second component of the qualified name
     * @param str3 table name; last component of the qualified name
     * @param atlasEntityWithExtInfo owning logical cluster (must not be {@code null})
     * @return the placeholder entity (never {@code null})
     */
    private AtlasEntity.AtlasEntityWithExtInfo maybeCreateDummyTable(String str, String str2, String str3, AtlasEntity.AtlasEntityWithExtInfo atlasEntityWithExtInfo) {
        String qualifiedName = QualifiedNameGenerator.getQualifiedName(str, str2, str3);
        AtlasEntity.AtlasEntityWithExtInfo newTable = newTable(str, str3, qualifiedName, atlasEntityWithExtInfo.getEntity());
        String existingGuid = null;
        try {
            existingGuid = this.metadataRegistry.getGuid(str, ModelConstants.ENTITY_KSQL_TABLE, qualifiedName);
        } catch (AtlasBaseException ignored) {
            // Deliberately ignored: a failed lookup leaves the GUID null and falls through to creation.
        }
        if (existingGuid == null) {
            try {
                this.metadataRegistry.createOrUpdateEntity(str, newTable);
            } catch (AtlasBaseException e) {
                // Pass the exception as the trailing argument so the cause is not lost.
                log.warn("Could not create ksql table {}", qualifiedName, e);
            }
        }
        return newTable;
    }

    /**
     * Assembles (without persisting) a ksql table entity linked to its logical cluster.
     *
     * @param str tenant identifier
     * @param str2 table name (must not be {@code null}; it is lower-cased)
     * @param str3 qualified name of the table
     * @param atlasEntity logical cluster entity the table is related to
     * @return the new entity wrapped in an {@code AtlasEntityWithExtInfo}
     */
    private AtlasEntity.AtlasEntityWithExtInfo newTable(String str, String str2, String str3, AtlasEntity atlasEntity) {
        String lowerCaseName = str2.toLowerCase(Locale.ROOT);
        Object clusterReference = MetadataRegistry.toRelatedObjectIdWithUniqAttr(atlasEntity);
        AtlasEntity table = new AtlasEntity(ModelConstants.ENTITY_KSQL_TABLE);
        table.setAttribute("tenant", str);
        table.setAttribute(ModelConstants.ATTR_QUALIFIED_NAME, str3);
        table.setAttribute(ModelConstants.ATTR_NAME, str2);
        table.setAttribute(ModelConstants.ATTR_NAME_LOWER, lowerCaseName);
        // Placeholder create time: the zero epoch.
        table.setAttribute(ModelConstants.ATTR_CREATE_TIME, ModelConstants.UNIX_ZERO_EPOCH);
        table.setRelationshipAttribute(ModelConstants.RELN_LOGICAL_CLUSTER, clusterReference);
        return new AtlasEntity.AtlasEntityWithExtInfo(table);
    }

    /**
     * Assembles (without persisting) a ksql query entity linked to its logical cluster.
     *
     * @param str tenant identifier
     * @param str2 query name (must not be {@code null}; it is lower-cased)
     * @param str3 qualified name of the query
     * @param atlasEntity logical cluster entity the query is related to
     * @return the new entity wrapped in an {@code AtlasEntityWithExtInfo}
     */
    private AtlasEntity.AtlasEntityWithExtInfo newQuery(String str, String str2, String str3, AtlasEntity atlasEntity) {
        String lowerCaseName = str2.toLowerCase(Locale.ROOT);
        Object clusterReference = MetadataRegistry.toRelatedObjectIdWithUniqAttr(atlasEntity);
        AtlasEntity query = new AtlasEntity(ModelConstants.ENTITY_KSQL_QUERY);
        query.setAttribute("tenant", str);
        query.setAttribute(ModelConstants.ATTR_QUALIFIED_NAME, str3);
        query.setAttribute(ModelConstants.ATTR_NAME, str2);
        query.setAttribute(ModelConstants.ATTR_NAME_LOWER, lowerCaseName);
        // Placeholder create time: the zero epoch.
        query.setAttribute(ModelConstants.ATTR_CREATE_TIME, ModelConstants.UNIX_ZERO_EPOCH);
        query.setRelationshipAttribute(ModelConstants.RELN_LOGICAL_CLUSTER, clusterReference);
        return new AtlasEntity.AtlasEntityWithExtInfo(query);
    }

    /**
     * Creates or updates one pipeline entity for every element of {@code list}.
     *
     * <p>For each pipeline the referenced Kafka cluster, KSQL cluster, topics, connectors, streams
     * and tables are resolved through the {@code maybeCreate*} helpers of this class, the pipeline
     * entity is assembled by {@code newPipeline} and then written to the metadata registry.
     *
     * <p>A pipeline is skipped (with a warning) when a timestamp is cached for its qualified name
     * and {@code l} is strictly older than that cached value. When {@code l} is {@code null} the
     * pipeline is always processed and the cache is left untouched.
     *
     * <p>An {@link AtlasBaseException} is not rethrown: it is logged, counted in the error
     * metrics, and ends processing of the remaining elements of {@code list}.
     *
     * @param str tenant the pipelines belong to
     * @param list pipeline metadata to ingest
     * @param l event time in epoch milliseconds, or {@code null} if unknown
     */
    private void createOrUpdatePipelineMetadata(String str, List<PipelineMetadata> list, Long l) {
        try {
            for (PipelineMetadata pipelineMetadata : list) {
                String qualifiedName = QualifiedNameGenerator.getQualifiedName(str, pipelineMetadata.getId());
                Long lastSeen = (Long) this.entityTimeCache.getIfPresent(qualifiedName);

                // Out-of-order event: something newer was already applied for this pipeline.
                if (lastSeen != null && l != null && l.longValue() < lastSeen.longValue()) {
                    log.warn("Discarding event with time {}", Instant.ofEpochMilli(l.longValue()));
                    continue;
                }

                String kafkaClusterId = pipelineMetadata.getKafkaClusterId();
                AtlasEntity.AtlasEntityWithExtInfo kafkaCluster = maybeCreateDummyKafkaCluster(str, kafkaClusterId);
                String ksqlClusterId = pipelineMetadata.getKsqlClusterId();
                AtlasEntity.AtlasEntityWithExtInfo ksqlCluster = maybeCreateKsqlCluster(str, ksqlClusterId);

                // NOTE(review): the element type of the name-list getters is not visible from
                // here, so elements are iterated as Object and cast to String exactly as the
                // previous iterator-based code did.
                List<AtlasEntity> inputTopics = new ArrayList<>();
                for (Object topicName : pipelineMetadata.mo512getInputTopicNamesList()) {
                    inputTopics.add(maybeCreateDummyTopic(str, kafkaClusterId, (String) topicName, kafkaCluster).getEntity());
                }

                List<AtlasEntity> outputTopics = new ArrayList<>();
                for (Object topicName : pipelineMetadata.mo511getOutputTopicNamesList()) {
                    outputTopics.add(maybeCreateDummyTopic(str, kafkaClusterId, (String) topicName, kafkaCluster).getEntity());
                }

                List<AtlasEntity> activatedTopics = new ArrayList<>();
                for (Object topicName : pipelineMetadata.getActivatedResources().mo114getTopicNamesList()) {
                    activatedTopics.add(maybeCreateDummyTopic(str, kafkaClusterId, (String) topicName, kafkaCluster).getEntity());
                }

                List<AtlasEntity> activatedConnectors = new ArrayList<>();
                for (Object connectorId : pipelineMetadata.getActivatedResources().mo113getConnectorIdsList()) {
                    activatedConnectors.add(maybeCreateDummyConnector(str, (String) connectorId).getEntity());
                }

                List<AtlasEntity> activatedStreams = new ArrayList<>();
                for (Object streamName : pipelineMetadata.getActivatedResources().mo112getStreamNamesList()) {
                    activatedStreams.add(maybeCreateDummyStream(str, ksqlClusterId, (String) streamName, ksqlCluster).getEntity());
                }

                List<AtlasEntity> activatedTables = new ArrayList<>();
                for (Object tableName : pipelineMetadata.getActivatedResources().mo111getTableNamesList()) {
                    activatedTables.add(maybeCreateDummyTable(str, ksqlClusterId, (String) tableName, ksqlCluster).getEntity());
                }

                AtlasEntity.AtlasEntityWithExtInfo pipeline = newPipeline(
                        str,
                        pipelineMetadata.getName(),
                        qualifiedName,
                        pipelineMetadata,
                        kafkaCluster.getEntity(),
                        ksqlCluster.getEntity(),
                        inputTopics,
                        outputTopics,
                        activatedTopics,
                        activatedConnectors,
                        activatedStreams,
                        activatedTables);

                // The timestamp is recorded before the registry write, as before.
                if (l != null) {
                    this.entityTimeCache.put(qualifiedName, l);
                }
                this.metadataRegistry.createOrUpdateEntity(str, pipeline);
                this.metricsManager.recordIngestionPipelineCreation(1);
            }
            this.metricsManager.recordIngestionTransform(1);
        } catch (AtlasBaseException e) {
            this.metricsManager.recordIngestionTransformError(1);
            this.metricsManager.recordIngestionPipelineCreationError(1);
            log.error("Could not process pipeline creation metadata", e);
        }
    }

    /**
     * Builds an entity of type {@link ModelConstants#ENTITY_PL_PIPELINE}.
     *
     * <p>The create time defaults to {@link ModelConstants#UNIX_ZERO_EPOCH} and is overwritten
     * when {@code pipelineMetadata} carries a create time with non-zero seconds. String fields
     * are copied only when non-empty and timestamps only when their seconds are non-zero; the
     * method, status and the five counters are always copied when metadata is present.
     *
     * @param str value stored in the entity's {@code tenant} attribute
     * @param str2 pipeline name; also stored lower-cased using the root locale
     * @param str3 qualified name of the pipeline
     * @param pipelineMetadata source of the optional attributes; when {@code null} only the base
     *     attributes and the relationships are set
     * @param atlasEntity entity referenced as the Kafka logical cluster
     * @param atlasEntity2 entity referenced as the KSQL logical cluster
     * @param list input topic entities, or {@code null} to leave the relationship unset
     * @param list2 output topic entities, or {@code null} to leave the relationship unset
     * @param list3 activated topic entities, or {@code null} to leave the relationship unset
     * @param list4 activated connector entities, or {@code null} to leave the relationship unset
     * @param list5 activated stream entities, or {@code null} to leave the relationship unset
     * @param list6 activated table entities, or {@code null} to leave the relationship unset
     * @return the new entity wrapped in an {@code AtlasEntityWithExtInfo}
     */
    private AtlasEntity.AtlasEntityWithExtInfo newPipeline(String str, String str2, String str3, PipelineMetadata pipelineMetadata, AtlasEntity atlasEntity, AtlasEntity atlasEntity2, List<AtlasEntity> list, List<AtlasEntity> list2, List<AtlasEntity> list3, List<AtlasEntity> list4, List<AtlasEntity> list5, List<AtlasEntity> list6) {
        AtlasEntity pipeline = new AtlasEntity(ModelConstants.ENTITY_PL_PIPELINE);

        // Base attributes that every pipeline entity carries.
        pipeline.setAttribute("tenant", str);
        pipeline.setAttribute(ModelConstants.ATTR_QUALIFIED_NAME, str3);
        pipeline.setAttribute(ModelConstants.ATTR_NAME, str2);
        pipeline.setAttribute(ModelConstants.ATTR_NAME_LOWER, str2.toLowerCase(Locale.ROOT));
        pipeline.setAttribute(ModelConstants.ATTR_CREATE_TIME, ModelConstants.UNIX_ZERO_EPOCH);

        if (pipelineMetadata != null) {
            if (!pipelineMetadata.getId().isEmpty()) {
                pipeline.setAttribute(ModelConstants.ATTR_ID, pipelineMetadata.getId());
            }
            if (!pipelineMetadata.getDescription().isEmpty()) {
                pipeline.setAttribute(ModelConstants.ATTR_DESCRIPTION, pipelineMetadata.getDescription());
            }
            pipeline.setAttribute(ModelConstants.ATTR_METHOD, pipelineMetadata.getMethodName().name());
            pipeline.setAttribute(ModelConstants.ATTR_STATUS, pipelineMetadata.getStatus().getState().name());

            // Audit trail: each timestamp / actor is copied only when it is populated.
            if (pipelineMetadata.getStatus().getCreateTime().getSeconds() != 0) {
                pipeline.setAttribute(
                        ModelConstants.ATTR_CREATE_TIME,
                        new Date(Timestamps.toMillis(pipelineMetadata.getStatus().getCreateTime())));
            }
            if (!pipelineMetadata.getStatus().getCreatedBy().isEmpty()) {
                pipeline.setAttribute(ModelConstants.ATTR_CREATED_BY, pipelineMetadata.getStatus().getCreatedBy());
            }
            if (pipelineMetadata.getStatus().getUpdateTime().getSeconds() != 0) {
                pipeline.setAttribute(
                        ModelConstants.ATTR_UPDATE_TIME,
                        new Date(Timestamps.toMillis(pipelineMetadata.getStatus().getUpdateTime())));
            }
            if (!pipelineMetadata.getStatus().getUpdatedBy().isEmpty()) {
                pipeline.setAttribute(ModelConstants.ATTR_UPDATED_BY, pipelineMetadata.getStatus().getUpdatedBy());
            }
            if (pipelineMetadata.getStatus().getActivateTime().getSeconds() != 0) {
                pipeline.setAttribute(
                        ModelConstants.ATTR_ACTIVATE_TIME,
                        new Date(Timestamps.toMillis(pipelineMetadata.getStatus().getActivateTime())));
            }
            if (!pipelineMetadata.getStatus().getActivatedBy().isEmpty()) {
                pipeline.setAttribute(ModelConstants.ATTR_ACTIVATED_BY, pipelineMetadata.getStatus().getActivatedBy());
            }

            // Resource counters are copied unconditionally.
            pipeline.setAttribute(ModelConstants.ATTR_TOPIC_COUNT, Integer.valueOf(pipelineMetadata.getStatus().getTopicCount()));
            pipeline.setAttribute(ModelConstants.ATTR_CONNECTOR_COUNT, Integer.valueOf(pipelineMetadata.getStatus().getConnectorCount()));
            pipeline.setAttribute(ModelConstants.ATTR_STREAM_COUNT, Integer.valueOf(pipelineMetadata.getStatus().getStreamCount()));
            pipeline.setAttribute(ModelConstants.ATTR_TABLE_COUNT, Integer.valueOf(pipelineMetadata.getStatus().getTableCount()));
            pipeline.setAttribute(ModelConstants.ATTR_QUERY_COUNT, Integer.valueOf(pipelineMetadata.getStatus().getQueryCount()));
        }

        // Cluster relationships are always set.
        pipeline.setRelationshipAttribute(
                ModelConstants.RELN_KAFKA_LOGICAL_CLUSTER,
                MetadataRegistry.toRelatedObjectIdWithUniqAttr(atlasEntity));
        pipeline.setRelationshipAttribute(
                ModelConstants.RELN_KSQL_LOGICAL_CLUSTER,
                MetadataRegistry.toRelatedObjectIdWithUniqAttr(atlasEntity2));

        // Collection relationships are set only for the lists that were supplied.
        if (list != null) {
            pipeline.setRelationshipAttribute(
                    ModelConstants.RELN_INPUT_TOPICS,
                    list.stream().map(MetadataRegistry::toRelatedObjectIdWithUniqAttr).collect(Collectors.toList()));
        }
        if (list2 != null) {
            pipeline.setRelationshipAttribute(
                    ModelConstants.RELN_OUTPUT_TOPICS,
                    list2.stream().map(MetadataRegistry::toRelatedObjectIdWithUniqAttr).collect(Collectors.toList()));
        }
        if (list3 != null) {
            pipeline.setRelationshipAttribute(
                    ModelConstants.RELN_ACTIVATED_TOPICS,
                    list3.stream().map(MetadataRegistry::toRelatedObjectIdWithUniqAttr).collect(Collectors.toList()));
        }
        if (list4 != null) {
            pipeline.setRelationshipAttribute(
                    ModelConstants.RELN_ACTIVATED_CONNECTORS,
                    list4.stream().map(MetadataRegistry::toRelatedObjectIdWithUniqAttr).collect(Collectors.toList()));
        }
        if (list5 != null) {
            pipeline.setRelationshipAttribute(
                    ModelConstants.RELN_ACTIVATED_STREAMS,
                    list5.stream().map(MetadataRegistry::toRelatedObjectIdWithUniqAttr).collect(Collectors.toList()));
        }
        if (list6 != null) {
            pipeline.setRelationshipAttribute(
                    ModelConstants.RELN_ACTIVATED_TABLES,
                    list6.stream().map(MetadataRegistry::toRelatedObjectIdWithUniqAttr).collect(Collectors.toList()));
        }

        return new AtlasEntity.AtlasEntityWithExtInfo(pipeline);
    }

    /**
     * Deletes the pipeline entity for every element of {@code list}.
     *
     * <p>An element is skipped (with a warning) when a timestamp is cached for its qualified name
     * and {@code l} is strictly older than it. After a deletion attempt the cache is updated with
     * {@code l}, unless {@code l} is {@code null}.
     *
     * @param str tenant the pipelines belong to
     * @param list pipelines to delete
     * @param l event time in epoch milliseconds, or {@code null} if unknown
     */
    private void deletePipelineMetadata(String str, List<PipelineMetadata> list, Long l) {
        for (PipelineMetadata pipeline : list) {
            String cacheKey = QualifiedNameGenerator.getQualifiedName(str, pipeline.getId());
            Long lastSeen = (Long) this.entityTimeCache.getIfPresent(cacheKey);

            boolean stale = lastSeen != null && l != null && l.longValue() < lastSeen.longValue();
            if (stale) {
                log.warn("Discarding event with time {}", Instant.ofEpochMilli(l.longValue()));
                continue;
            }

            deletePipelineMetadata(str, pipeline);
            if (l != null) {
                this.entityTimeCache.put(cacheKey, l);
            }
        }
    }

    /**
     * Deletes and then purges the pipeline entity identified by the tenant and the pipeline id.
     *
     * <p>An {@link AtlasBaseException} is not rethrown: it is counted in the error metrics and
     * logged together with the offending metadata.
     *
     * @param str tenant the pipeline belongs to
     * @param pipelineMetadata pipeline whose id determines the qualified name to remove
     */
    private void deletePipelineMetadata(String str, PipelineMetadata pipelineMetadata) {
        try {
            String qualifiedName = QualifiedNameGenerator.getQualifiedName(str, pipelineMetadata.getId());
            this.metadataRegistry.deleteEntity(str, ModelConstants.ENTITY_PL_PIPELINE, qualifiedName);
            this.metadataRegistry.purgeEntity(str, ModelConstants.ENTITY_PL_PIPELINE, qualifiedName);
            this.metricsManager.recordIngestionTransform(1);
            this.metricsManager.recordIngestionPipelineDeletion(1);
        } catch (AtlasBaseException e) {
            this.metricsManager.recordIngestionTransformError(1);
            this.metricsManager.recordIngestionPipelineDeletionError(1);
            // Parameterized message: the previous string concatenation glued the metadata to the
            // last word of the message. The trailing Throwable is still logged as the exception.
            log.error("Could not process pipeline deletion metadata {}", pipelineMetadata, e);
        }
    }

    /**
     * Creates or updates one environment entity for every element of {@code list}.
     *
     * <p>An element is skipped (with a warning) when a timestamp is cached for its qualified name
     * and {@code l} is strictly older than it. When {@code l} is {@code null} the element is
     * always processed and the cache is left untouched.
     *
     * <p>An {@link AtlasBaseException} is not rethrown: it is logged, counted in the error
     * metrics, and ends processing of the remaining elements of {@code list}.
     *
     * @param str tenant the environments belong to
     * @param list environment metadata to ingest
     * @param l event time in epoch milliseconds, or {@code null} if unknown
     */
    private void createOrUpdateEnvironmentMetadata(String str, List<EnvironmentMetadata> list, Long l) {
        try {
            for (EnvironmentMetadata environment : list) {
                String cacheKey = QualifiedNameGenerator.getQualifiedName(str, environment.getEnvironmentId());
                Long lastSeen = (Long) this.entityTimeCache.getIfPresent(cacheKey);

                boolean stale = lastSeen != null && l != null && l.longValue() < lastSeen.longValue();
                if (stale) {
                    log.warn("Discarding create environment event with time {}", Instant.ofEpochMilli(l.longValue()));
                    continue;
                }

                AtlasEntity.AtlasEntityWithExtInfo entity = newEnvironment(
                        str,
                        environment.getEnvironmentId(),
                        environment.getEnvironmentName(),
                        cacheKey,
                        environment);
                // The timestamp is recorded before the registry write.
                if (l != null) {
                    this.entityTimeCache.put(cacheKey, l);
                }
                this.metadataRegistry.createOrUpdateEntity(str, entity);
                this.metricsManager.recordIngestionEnvironmentCreation(1);
            }
            this.metricsManager.recordIngestionTransform(1);
        } catch (AtlasBaseException e) {
            this.metricsManager.recordIngestionTransformError(1);
            this.metricsManager.recordIngestionEnvironmentCreationError(1);
            log.error("Could not process environment creation metadata", e);
        }
    }

    /**
     * Builds an entity of type {@link ModelConstants#ENTITY_CF_ENVIRONMENT}.
     *
     * <p>When {@code environmentMetadata} is supplied, the id and the create/update times are
     * taken from it (id only when non-empty, times only when their seconds are non-zero).
     * Otherwise the id falls back to {@code str2} when that is non-null. The create time
     * defaults to {@link ModelConstants#UNIX_ZERO_EPOCH}.
     *
     * @param str value stored in the entity's {@code tenant} attribute
     * @param str2 environment id used only when {@code environmentMetadata} is {@code null}
     * @param str3 environment name; also stored lower-cased using the root locale
     * @param str4 qualified name of the environment
     * @param environmentMetadata source of the optional attributes, may be {@code null}
     * @return the new entity wrapped in an {@code AtlasEntityWithExtInfo}
     */
    private AtlasEntity.AtlasEntityWithExtInfo newEnvironment(String str, String str2, String str3, String str4, EnvironmentMetadata environmentMetadata) {
        AtlasEntity environment = new AtlasEntity(ModelConstants.ENTITY_CF_ENVIRONMENT);
        environment.setAttribute("tenant", str);
        environment.setAttribute(ModelConstants.ATTR_QUALIFIED_NAME, str4);
        environment.setAttribute(ModelConstants.ATTR_NAME, str3);
        environment.setAttribute(ModelConstants.ATTR_NAME_LOWER, str3.toLowerCase(Locale.ROOT));
        environment.setAttribute(ModelConstants.ATTR_CREATE_TIME, ModelConstants.UNIX_ZERO_EPOCH);

        // No metadata available: the explicit id (if any) is all there is to record.
        if (environmentMetadata == null) {
            if (str2 != null) {
                environment.setAttribute(ModelConstants.ATTR_ID, str2);
            }
            return new AtlasEntity.AtlasEntityWithExtInfo(environment);
        }

        if (!environmentMetadata.getEnvironmentId().isEmpty()) {
            environment.setAttribute(ModelConstants.ATTR_ID, environmentMetadata.getEnvironmentId());
        }
        if (environmentMetadata.getCreateTime().getSeconds() != 0) {
            environment.setAttribute(
                    ModelConstants.ATTR_CREATE_TIME,
                    new Date(Timestamps.toMillis(environmentMetadata.getCreateTime())));
        }
        if (environmentMetadata.getUpdateTime().getSeconds() != 0) {
            environment.setAttribute(
                    ModelConstants.ATTR_UPDATE_TIME,
                    new Date(Timestamps.toMillis(environmentMetadata.getUpdateTime())));
        }
        return new AtlasEntity.AtlasEntityWithExtInfo(environment);
    }

    /**
     * Returns a placeholder environment entity for the given id, writing it to the metadata
     * registry when no GUID can be resolved for its qualified name.
     *
     * <p>The placeholder has an empty name and no metadata-derived attributes. Both registry
     * calls are best effort: failures are logged and never propagated, and the placeholder is
     * returned in every case.
     *
     * @param str tenant the environment belongs to
     * @param str2 environment id
     * @return the placeholder entity (not re-read from the registry)
     */
    private AtlasEntity.AtlasEntityWithExtInfo maybeCreateDummyEnvironment(String str, String str2) {
        String qualifiedName = QualifiedNameGenerator.getQualifiedName(str, str2);
        AtlasEntity.AtlasEntityWithExtInfo environment = newEnvironment(str, str2, "", qualifiedName, null);

        String existingGuid = null;
        try {
            existingGuid = this.metadataRegistry.getGuid(str, ModelConstants.ENTITY_CF_ENVIRONMENT, qualifiedName);
        } catch (AtlasBaseException e) {
            // A failed lookup is handled like "not found": fall through and try to create the
            // placeholder. Logged at debug so the cause is no longer silently lost.
            log.debug("Could not look up environment {}", qualifiedName, e);
        }

        if (existingGuid == null) {
            try {
                this.metadataRegistry.createOrUpdateEntity(str, environment);
            } catch (AtlasBaseException e) {
                // Keep the cause; the trailing Throwable is logged as the exception.
                log.warn("Could not create dummy environment {}", qualifiedName, e);
            }
        }
        return environment;
    }

    /**
     * Deletes the environment entity for every element of {@code list}.
     *
     * <p>An element is skipped (with a warning) when a timestamp is cached for its qualified name
     * and {@code l} is strictly older than it. After a deletion attempt the cache is updated with
     * {@code l}, unless {@code l} is {@code null}.
     *
     * @param str tenant the environments belong to
     * @param list environments to delete
     * @param l event time in epoch milliseconds, or {@code null} if unknown
     */
    private void deleteEnvironmentMetadata(String str, List<EnvironmentMetadata> list, Long l) {
        for (EnvironmentMetadata environment : list) {
            String cacheKey = QualifiedNameGenerator.getQualifiedName(str, environment.getEnvironmentId());
            Long lastSeen = (Long) this.entityTimeCache.getIfPresent(cacheKey);

            boolean stale = lastSeen != null && l != null && l.longValue() < lastSeen.longValue();
            if (stale) {
                log.warn("Discarding delete environment event with time {}", Instant.ofEpochMilli(l.longValue()));
                continue;
            }

            deleteEnvironmentMetadata(str, environment, cacheKey);
            if (l != null) {
                this.entityTimeCache.put(cacheKey, l);
            }
        }
    }

    /**
     * Deletes and then purges the environment entity with the given qualified name.
     *
     * <p>An {@link AtlasBaseException} is not rethrown: it is counted in the error metrics and
     * logged together with the offending metadata.
     *
     * @param str tenant the environment belongs to
     * @param environmentMetadata metadata of the environment; used only in the error log
     * @param str2 qualified name of the environment entity to remove
     */
    private void deleteEnvironmentMetadata(String str, EnvironmentMetadata environmentMetadata, String str2) {
        try {
            this.metadataRegistry.deleteEntity(str, ModelConstants.ENTITY_CF_ENVIRONMENT, str2);
            this.metadataRegistry.purgeEntity(str, ModelConstants.ENTITY_CF_ENVIRONMENT, str2);
            this.metricsManager.recordIngestionTransform(1);
            this.metricsManager.recordIngestionEnvironmentDeletion(1);
        } catch (AtlasBaseException e) {
            this.metricsManager.recordIngestionTransformError(1);
            this.metricsManager.recordIngestionEnvironmentDeletionError(1);
            // Parameterized message: the previous string concatenation glued the metadata to the
            // last word of the message. The trailing Throwable is still logged as the exception.
            log.error("Could not process environment deletion metadata {}", environmentMetadata, e);
        }
    }

    /**
     * Maps a topic cleanup policy to its string form.
     *
     * @param cleanupPolicy policy to convert, may be {@code null}
     * @return {@code "NONE"} for {@code null} or {@code UNSPECIFIED}, otherwise the enum
     *     constant's name
     */
    private String getCleanupPolicy(TopicMetadata.CleanupPolicy cleanupPolicy) {
        if (cleanupPolicy != null && cleanupPolicy != TopicMetadata.CleanupPolicy.UNSPECIFIED) {
            return cleanupPolicy.name();
        }
        return "NONE";
    }

    /**
     * Maps a topic compression type to its string form.
     *
     * @param compressionType compression type to convert, may be {@code null}
     * @return {@code "UNSPECIFIED"} for {@code null} or {@code COMPRESSION_UNSPECIFIED},
     *     otherwise the enum constant's name
     */
    private String getCompressionType(TopicMetadata.CompressionType compressionType) {
        if (compressionType != null && compressionType != TopicMetadata.CompressionType.COMPRESSION_UNSPECIFIED) {
            return compressionType.name();
        }
        return "UNSPECIFIED";
    }
}
