/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.runtime.events;

import com.google.protobuf.util.Timestamps;
import io.cloudevents.SpecVersion;
import io.cloudevents.v1.proto.CloudEvent;
import io.confluent.protobuf.events.catalog.v1.MetadataChange;
import io.confluent.protobuf.events.catalog.v1.MetadataEvent;
import io.confluent.protobuf.events.catalog.v1.OpType;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.connect.runtime.events.ClusterMetadata;
import org.apache.kafka.connect.runtime.events.ConnectorMetadata;
import org.apache.kafka.connect.runtime.events.EmitterConfig;
import org.apache.kafka.connect.runtime.events.EventEmitter;
import org.apache.kafka.connect.runtime.events.EventPublisher;
import org.apache.kafka.connect.runtime.events.EventTransformer;
import org.apache.kafka.connect.runtime.events.EventsException;

public class CloudEventsEmitter
extends EventEmitter {
    private static final String CONNECT_ENTITY = "connect";
    private final EventTransformer<ConnectorMetadata, MetadataEvent> transformer;
    private final EventPublisher<CloudEvent> publisher;

    public CloudEventsEmitter(EmitterConfig config, ClusterMetadata clusterMetadata, EventTransformer<ConnectorMetadata, MetadataEvent> transformer, EventPublisher<CloudEvent> publisher) {
        super(config, clusterMetadata);
        this.transformer = transformer;
        this.publisher = publisher;
    }

    private void send(List<ConnectorMetadata> connectorEvents, boolean isSnapshot, String clusterId, String metadataKafkaClusterId) throws EventsException {
        ArrayList<MetadataEvent> events = new ArrayList<MetadataEvent>(connectorEvents.size());
        for (ConnectorMetadata connectorMetadata : connectorEvents) {
            MetadataEvent event = this.transformer.transform(connectorMetadata);
            events.add(event);
        }
        if (isSnapshot) {
            MetadataChange change2 = MetadataChange.newBuilder().setSource(clusterId).setOp(OpType.SNAPSHOT).addAllEvents(events).build();
            this.publisher.publishEvent(this.cloudEvent(change2, clusterId, metadataKafkaClusterId));
            return;
        }
        ArrayList<MetadataChange> changes = new ArrayList<MetadataChange>();
        for (int i = 0; i < connectorEvents.size(); ++i) {
            ConnectorMetadata connectorMetadata = connectorEvents.get(i);
            MetadataEvent event = (MetadataEvent)events.get(i);
            MetadataChange change3 = MetadataChange.newBuilder().setSource(connectorMetadata.getConnectClusterId()).setOp(connectorMetadata.isDeleted() ? OpType.DELETE : OpType.UPDATE).addEvents(event).build();
            changes.add(change3);
        }
        this.publisher.publishEvents(changes.stream().map(change -> this.cloudEvent((MetadataChange)change, clusterId, metadataKafkaClusterId)).toList());
    }

    @Override
    protected void publishSnapshot(List<ConnectorMetadata> allConnectorMetadata, String clusterId, String metadataKafkaClusterId) throws EventsException {
        if (allConnectorMetadata == null) {
            log.info("No connector metadata to publish");
            return;
        }
        log.info("Publishing snapshot event for {} connectors", (Object)allConnectorMetadata.size());
        this.send(allConnectorMetadata, true, clusterId, metadataKafkaClusterId);
    }

    @Override
    protected void publishIncremental(List<ConnectorMetadata> incrementalConnectorMetadata) throws EventsException {
        if (incrementalConnectorMetadata == null || incrementalConnectorMetadata.isEmpty()) {
            log.info("No incremental connector metadata to publish");
            return;
        }
        log.info("Publishing incremental event for {} connectors", (Object)incrementalConnectorMetadata.size());
        this.send(incrementalConnectorMetadata, false, incrementalConnectorMetadata.get(0).getConnectClusterId(), incrementalConnectorMetadata.get(0).getMetadataKafkaClusterId());
    }

    private CloudEvent cloudEvent(MetadataChange metadataChange, String clusterId, String metadataKafkaClusterId) {
        return CloudEvent.newBuilder().setType(metadataChange.getOp().name()).setSpecVersion(SpecVersion.V1.toString()).putAttributes("datacontenttype", CloudEvent.CloudEventAttributeValue.newBuilder().setCeString("application/protobuf").build()).putAttributes("subject", CloudEvent.CloudEventAttributeValue.newBuilder().setCeString(CONNECT_ENTITY).build()).putAttributes("time", CloudEvent.CloudEventAttributeValue.newBuilder().setCeTimestamp(Timestamps.now()).build()).putAttributes("connectgroupid", CloudEvent.CloudEventAttributeValue.newBuilder().setCeString(clusterId).build()).putAttributes("metadatakafkaclusterid", CloudEvent.CloudEventAttributeValue.newBuilder().setCeString(metadataKafkaClusterId).build()).setBinaryData(metadataChange.toByteString()).build();
    }

    @Override
    protected void close() {
        this.publisher.close();
    }
}

