/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.runtime.events;

import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.connect.runtime.events.ClusterMetadata;
import org.apache.kafka.connect.runtime.events.ConnectorMetadata;
import org.apache.kafka.connect.runtime.events.EmitterConfig;
import org.apache.kafka.connect.runtime.events.EventsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for components that periodically publish connector metadata events.
 *
 * <p>A single-threaded scheduler invokes {@link #process()} at a fixed rate. Each run either
 * publishes a full snapshot of every connector reported by the {@link ClusterMetadata}, or an
 * incremental batch for the connectors registered through {@link #emitChange(String, boolean)}.
 * Subclasses supply the transport by implementing {@link #publishSnapshot},
 * {@link #publishIncremental} and {@link #close()}.
 *
 * <p>Thread-safety: {@link #process()}, {@link #emitChange(String, boolean)} and
 * {@link #activate()} synchronize on this instance, which guards the pending-change map and
 * the last-snapshot timestamp. The {@code active} flag is {@code volatile}.
 */
public abstract class EventEmitter {
    protected static final Logger log = LoggerFactory.getLogger(EventEmitter.class);
    /** Seconds between two snapshots (passed to {@link Instant#plusSeconds(long)}). */
    private final Long snapshotInterval;
    /** Scheduler period, in seconds. */
    private final Long minEventInterval;
    private final ScheduledExecutorService executorService;
    private final ClusterMetadata clusterMetadata;
    private volatile boolean active = false;
    private final boolean snapshotEnabled;
    private final boolean incrementalEnabled;
    /** Pending changes: connector name mapped to its "reported as deleted" flag. Guarded by {@code this}. */
    private final Map<String, Boolean> connectorsToUpdate;
    /** Time of the last snapshot attempt; {@code null} until the first one. Guarded by {@code this}. */
    private Instant lastSnapshotTime;

    /**
     * Creates an inactive emitter. Nothing is scheduled until {@link #start()} is called and
     * nothing is emitted until {@link #activate()} is called.
     *
     * @param config source of the snapshot/incremental switches and of both intervals
     * @param clusterMetadata source of connector names and per-connector metadata
     */
    protected EventEmitter(EmitterConfig config, ClusterMetadata clusterMetadata) {
        this.snapshotEnabled = config.snapshotEnabled();
        this.snapshotInterval = config.snapshotInterval();
        this.minEventInterval = config.eventInterval();
        this.connectorsToUpdate = new HashMap<>();
        this.executorService = Executors.newSingleThreadScheduledExecutor();
        this.incrementalEnabled = config.incrementalEnabled();
        this.clusterMetadata = clusterMetadata;
    }

    /**
     * Schedules the periodic processing task, starting immediately and repeating every
     * {@code minEventInterval} seconds.
     */
    public void start() {
        log.info("Starting EventEmitter with interval: {}", this.minEventInterval);
        this.executorService.scheduleAtFixedRate(this::process, 0L, this.minEventInterval, TimeUnit.SECONDS);
    }

    /**
     * Deactivates the emitter, shuts the scheduler down and calls {@link #close()}.
     *
     * <p>NOTE(review): {@code ExecutorService.shutdown()} does not wait for a run that is
     * already in progress, so {@link #close()} may be invoked while a publish is still
     * executing - confirm that implementations tolerate this.
     */
    public void stop() {
        log.info("Stopping EventEmitter");
        this.deactivate();
        this.executorService.shutdown();
        this.close();
    }

    /** Releases the resources held by the concrete emitter; invoked from {@link #stop()}. */
    protected abstract void close();

    /**
     * Checks the preconditions for an emission run.
     *
     * @return {@code true} only when the emitter is active, the cluster metadata reports
     *         ready, and the metadata refresh succeeds
     */
    private boolean canEmitEvents() {
        if (!this.isActive()) {
            log.debug("EventEmitter is not active, skipping emission");
            return false;
        }
        if (!this.clusterMetadata.isReady()) {
            log.info("Cluster metadata is not ready, skipping emission");
            return false;
        }
        if (!this.clusterMetadata.refresh()) {
            log.info("Cluster metadata refresh failed, skipping emission");
            return false;
        }
        return true;
    }

    /**
     * Builds metadata for every known connector and publishes it as one snapshot.
     *
     * <p>A connector whose metadata cannot be built is logged and left out; the snapshot is
     * still published with the remaining connectors. Any other failure is logged and
     * swallowed so that it never propagates to the scheduler.
     */
    private void processSnapshotEvent() {
        try {
            // One timestamp shared by every connector in this snapshot.
            long epochMs = System.currentTimeMillis();
            List<ConnectorMetadata> allConnectorMetadata = new ArrayList<>();
            for (String connectorName : this.clusterMetadata.connectors()) {
                try {
                    allConnectorMetadata.add(this.snapshotConnectorMetadata(connectorName, epochMs));
                } catch (EventsException e) {
                    log.error("Failed to get connector metadata for {}: {}", connectorName, e.getMessage());
                }
            }
            log.debug("Emitting snapshot event for connectors: {}", allConnectorMetadata.stream().map(ConnectorMetadata::getName).collect(Collectors.toList()));
            this.publishSnapshot(allConnectorMetadata, this.clusterMetadata.connectClusterId(), this.clusterMetadata.metadataKafkaClusterId());
        } catch (Throwable e) {
            log.error("Error emitting snapshot event", e);
        }
    }

    /**
     * Publishes one incremental batch for the connectors recorded by
     * {@link #emitChange(String, boolean)}.
     *
     * <p>The pending changes are cleared only after {@link #publishIncremental} returns
     * normally, so a failed publish leaves them in place for the next run instead of
     * discarding them. When no event could be built, nothing is published and the pending
     * changes are kept.
     */
    private void processIncrementalEvents() {
        try {
            List<ConnectorMetadata> incrementalEvents = new ArrayList<>();
            for (Map.Entry<String, Boolean> entry : this.connectorsToUpdate.entrySet()) {
                String connectorName = entry.getKey();
                // A connector counts as deleted only if it was reported deleted AND is no longer
                // known to the cluster; a connector that is present again is emitted as an update.
                boolean isDeleted = entry.getValue() && !this.clusterMetadata.connectors().contains(connectorName);
                ConnectorMetadata connectorMetadata = this.incrementalConnectorMetadata(connectorName, System.currentTimeMillis(), isDeleted);
                if (connectorMetadata != null) {
                    incrementalEvents.add(connectorMetadata);
                }
            }
            if (incrementalEvents.isEmpty()) {
                log.debug("No incremental events to process");
                return;
            }
            log.debug("Emitting incremental event for connectors: {}", incrementalEvents.stream().map(ConnectorMetadata::getName).collect(Collectors.toList()));
            this.publishIncremental(incrementalEvents);
            // Clear only once the publish has succeeded; clearing earlier loses the batch on failure.
            this.connectorsToUpdate.clear();
        } catch (EventsException e) {
            log.error("Error processing incremental event(s) ", e);
        }
    }

    /**
     * Scheduler entry point: emits a snapshot when one is due, otherwise an incremental batch.
     *
     * <p>Unchecked exceptions are caught and logged here because
     * {@code ScheduledExecutorService.scheduleAtFixedRate} suppresses all subsequent
     * executions once a run throws, which would silently stop emission for good.
     */
    private synchronized void process() {
        try {
            if (!this.canEmitEvents()) {
                return;
            }
            if (this.snapshotEnabled && this.isSnapshotDue()) {
                this.processSnapshotEvent();
                this.lastSnapshotTime = Instant.now();
                // A snapshot covers every connector, so pending incremental changes are dropped.
                this.connectorsToUpdate.clear();
            } else if (this.incrementalEnabled) {
                this.processIncrementalEvents();
            }
        } catch (RuntimeException e) {
            log.error("Unexpected error while processing events", e);
        }
    }

    /**
     * @return {@code true} when no snapshot has been attempted yet or when more than
     *         {@code snapshotInterval} seconds have passed since the last attempt
     */
    private boolean isSnapshotDue() {
        return this.lastSnapshotTime == null
                || this.lastSnapshotTime.plusSeconds(this.snapshotInterval).isBefore(Instant.now());
    }

    /**
     * Builds the snapshot metadata of one connector from the cluster metadata.
     *
     * @param connectorName connector to describe
     * @param epochMs timestamp stored in the resulting metadata
     * @return metadata with the deleted flag set to {@code false}
     * @throws EventsException if a cluster metadata lookup fails
     */
    private ConnectorMetadata snapshotConnectorMetadata(String connectorName, long epochMs) throws EventsException {
        return new ConnectorMetadata(
                connectorName,
                this.clusterMetadata.connectorType(connectorName),
                this.clusterMetadata.maskedConnectorConfigs(connectorName),
                this.clusterMetadata.connectorStatus(connectorName),
                this.clusterMetadata.connectorTopics(connectorName),
                this.clusterMetadata.metadataKafkaClusterId(),
                this.clusterMetadata.dataKafkaClusterId(connectorName),
                this.clusterMetadata.connectClusterId(),
                this.clusterMetadata.tasksMax(connectorName),
                this.clusterMetadata.valueConverter(connectorName),
                epochMs,
                false);
    }

    /**
     * Builds the incremental metadata of one connector.
     *
     * <p>For a deleted connector the per-connector lookups are skipped: those fields are
     * {@code null} and {@code tasksMax} is {@code 0}; only the cluster identifiers are filled in.
     *
     * @param connectorName connector to describe
     * @param epochMs timestamp stored in the resulting metadata
     * @param isDeleted whether the connector is being reported as deleted
     * @return the metadata, or {@code null} when the connector is unknown to the cluster metadata
     * @throws EventsException if a lookup fails for a reason other than an unknown connector
     */
    private ConnectorMetadata incrementalConnectorMetadata(String connectorName, long epochMs, boolean isDeleted) throws EventsException {
        try {
            return new ConnectorMetadata(
                    connectorName,
                    isDeleted ? null : this.clusterMetadata.connectorType(connectorName),
                    isDeleted ? null : this.clusterMetadata.maskedConnectorConfigs(connectorName),
                    isDeleted ? null : this.clusterMetadata.connectorStatus(connectorName),
                    isDeleted ? null : this.clusterMetadata.connectorTopics(connectorName),
                    this.clusterMetadata.metadataKafkaClusterId(),
                    isDeleted ? null : this.clusterMetadata.dataKafkaClusterId(connectorName),
                    this.clusterMetadata.connectClusterId(),
                    isDeleted ? 0 : this.clusterMetadata.tasksMax(connectorName),
                    isDeleted ? null : this.clusterMetadata.valueConverter(connectorName),
                    epochMs,
                    isDeleted);
        } catch (EventsException.UnknownConnectorException e) {
            // The connector is not known to the cluster metadata: the caller skips it.
            return null;
        }
    }

    /**
     * Records that a connector changed so that the next incremental run reports it.
     *
     * <p>Does nothing when incremental events are disabled. A connector already pending as
     * deleted keeps that flag; later changes to it are ignored until the pending entry is
     * cleared.
     *
     * @param connectorName connector that changed
     * @param isDeleted whether the change is a deletion
     */
    public synchronized void emitChange(String connectorName, boolean isDeleted) {
        if (!this.incrementalEnabled) {
            log.info("Incremental event is disabled, skipping incremental event emission");
            return;
        }
        if (Boolean.TRUE.equals(this.connectorsToUpdate.get(connectorName))) {
            log.debug("Skipping deleted connector event {}", connectorName);
            return;
        }
        this.connectorsToUpdate.put(connectorName, isDeleted);
    }

    /**
     * Enables emission. On a transition from inactive to active, the pending changes are
     * discarded first.
     *
     * <p>Synchronized because it mutates the pending-change map, which {@link #process()} and
     * {@link #emitChange(String, boolean)} access while holding the same monitor.
     */
    public synchronized void activate() {
        if (!this.active) {
            log.info("Activating EventEmitter");
            this.connectorsToUpdate.clear();
        }
        this.active = true;
    }

    /** Disables emission; scheduled runs become no-ops until {@link #activate()} is called. */
    public void deactivate() {
        if (this.active) {
            log.info("Deactivating EventEmitter");
        }
        this.active = false;
    }

    /** @return whether the emitter is currently allowed to emit events */
    public boolean isActive() {
        return this.active;
    }

    /**
     * Publishes a full snapshot.
     *
     * @param connectors metadata of every connector that could be described
     * @param connectClusterId value of {@code ClusterMetadata.connectClusterId()}
     * @param metadataKafkaClusterId value of {@code ClusterMetadata.metadataKafkaClusterId()}
     * @throws EventsException if the snapshot cannot be published
     */
    protected abstract void publishSnapshot(List<ConnectorMetadata> connectors, String connectClusterId, String metadataKafkaClusterId) throws EventsException;

    /**
     * Publishes a batch of incremental connector events.
     *
     * @param connectors metadata of the changed connectors; never empty when called by this class
     * @throws EventsException if the batch cannot be published
     */
    protected abstract void publishIncremental(List<ConnectorMetadata> connectors) throws EventsException;
}

