/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.logevents.connect;

import com.google.protobuf.Message;
import io.confluent.logevents.connect.ConnectLogEntry;
import io.confluent.logevents.connect.LogEventsConfig;
import io.confluent.logevents.connect.LogEventsEmitter;
import io.confluent.telemetry.api.events.Event;
import io.confluent.telemetry.events.EventLogger;
import io.confluent.telemetry.events.EventUtils;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.Map;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogEventsKafkaEmitter
implements LogEventsEmitter {
    private static final Logger log = LoggerFactory.getLogger(LogEventsKafkaEmitter.class);
    private EventLogger eventLogger = null;
    private LogEventsConfig logEventsConfig = null;
    private String dataContentType;

    public synchronized void start(Map<String, ?> configs) {
        if (this.eventLogger != null) {
            log.warn("Skipping reinitialization of {}", (Object)this);
            return;
        }
        this.logEventsConfig = new LogEventsConfig(configs);
        if (!this.logEventsConfig.getBoolean("confluent.event.logger.enable").booleanValue()) {
            log.info("Connect Log Events aren't enabled.");
            return;
        }
        log.info("Initializing {}", (Object)this);
        this.populateDataContentType();
        this.eventLogger = new EventLogger();
        this.eventLogger.configure(this.logEventsConfig.toEventLoggerConfig());
    }

    @Override
    public void emit(ConnectLogEntry connectLogEntry, String subject, String source, String type) {
        if (this.eventLogger == null) {
            log.trace("Skipping emitting the Connect Log Entry: {}, with source: {}, subject: {}, type: {}", new Object[]{connectLogEntry, source, subject, type});
            return;
        }
        Event event = this.eventBuilder(connectLogEntry, subject, source, type);
        try {
            this.eventLogger.log(event);
        }
        catch (RuntimeException e) {
            log.error("Unable to emit event: {}", (Object)EventUtils.toJson((Event)event), (Object)e);
        }
    }

    public synchronized void stop() {
        if (this.eventLogger == null) {
            return;
        }
        log.info("Stopping {}, closing event logger", (Object)this);
        try {
            this.eventLogger.close();
            this.eventLogger = null;
        }
        catch (Exception e) {
            log.error("Error closing the event logger in {}", (Object)this, (Object)e);
        }
    }

    public String toString() {
        return LogEventsKafkaEmitter.class.getSimpleName();
    }

    EventLogger getEventLogger() {
        return this.eventLogger;
    }

    private Event eventBuilder(ConnectLogEntry connectLogEntry, String subject, String source, String type) {
        return new Event().setId(UUID.randomUUID().toString()).setTime(Instant.now().atOffset(ZoneOffset.UTC)).setData(this.dataContentType, EventUtils.protoToBytes((Message)connectLogEntry, (String)this.dataContentType)).setSource(source).setSubject(subject).setType(type).setExtension("route", this.logEventsConfig.getString("confluent.event.logger.exporter.kafka.topic.name"));
    }

    private void populateDataContentType() {
        String encodingConfig;
        switch (encodingConfig = this.logEventsConfig.getString("confluent.event.logger.cloudevent.codec")) {
            case "binary": {
                this.dataContentType = "application/protobuf";
                break;
            }
            case "structured": {
                this.dataContentType = "application/json";
                break;
            }
            default: {
                throw new RuntimeException("unknown encoding " + encodingConfig);
            }
        }
    }
}

