/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.logevents.connect;

import com.google.protobuf.Message;
import io.confluent.logevents.connect.ConnectLogEntry;
import io.confluent.logevents.connect.LogEventsConfig;
import io.confluent.logevents.connect.LogEventsEmitter;
import io.confluent.logevents.connect.RemoteConnectLogsConfiguration;
import io.confluent.remote.config.poller.Region;
import io.confluent.remote.config.poller.RemoteConfigConfiguration;
import io.confluent.remote.config.poller.RemoteConfigurationSource;
import io.confluent.remote.config.poller.kubernetes.KubernetesConfigMapRemoteConfigurationConfig;
import io.confluent.remote.config.poller.kubernetes.KubernetesConfigMapRemoteConfigurationSource;
import io.confluent.telemetry.api.events.Event;
import io.confluent.telemetry.events.EventLogger;
import io.confluent.telemetry.events.EventUtils;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.security.oauthbearer.internals.secured.Retry;
import org.apache.kafka.common.security.oauthbearer.internals.secured.Retryable;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link LogEventsEmitter} that wraps Connect log entries in {@link Event}s and hands them to an
 * {@link EventLogger}.
 *
 * <p>When the named-config flag ({@code confluent.event.logger.named.config.enabled}) is set, the
 * emitter also creates a Kubernetes ConfigMap backed {@link RemoteConfigurationSource}. The remote
 * configuration supplies the producer bootstrap servers and JAAS config for the active region, and
 * every update delivered through {@link #onReceiveCallback} rebuilds the event logger.
 *
 * <p>{@link #start}, {@link #stop} and {@link #logEventsConfig()} are synchronized on this instance.
 * {@link #emit} and {@link #onReceiveCallback} are not; they rely on the volatile fields below.
 */
public class LogEventsKafkaEmitter
implements LogEventsEmitter {
    private static final Logger log = LoggerFactory.getLogger(LogEventsKafkaEmitter.class);

    private static final String ENABLE_CONFIG = "confluent.event.logger.enable";
    private static final String NAMED_CONFIG_ENABLED_CONFIG = "confluent.event.logger.named.config.enabled";
    private static final String REMOTE_CONFIG_PREFIX = "confluent.event.logger.remoteconfig.cloud.";
    private static final String CODEC_CONFIG = "confluent.event.logger.cloudevent.codec";
    private static final String TOPIC_NAME_CONFIG = "confluent.event.logger.exporter.kafka.topic.name";
    private static final String PRODUCER_BOOTSTRAP_SERVERS_CONFIG =
            "confluent.event.logger.exporter.kafka.producer.bootstrap.servers";
    private static final String PRODUCER_SASL_JAAS_CONFIG =
            "confluent.event.logger.exporter.kafka.producer.sasl.jaas.config";

    /** Active logger; {@code null} while the emitter is not started, is disabled, or has been stopped. */
    private volatile EventLogger eventLogger = null;
    /** Volatile because {@link #reconfigureLogger} replaces it outside the instance lock. */
    private volatile LogEventsConfig logEventsConfig = null;
    private String dataContentType;
    /** Most recent remote configuration seen, either at start-up or through the callback. */
    private volatile RemoteConnectLogsConfiguration remoteConfiguration;
    private boolean namedConfigEnabled = false;
    private RemoteConfigurationSource<RemoteConnectLogsConfiguration> kubernetesConfigMapRemoteConfigurationSource;

    /**
     * Creates an emitter around an already configured logger; {@link #start} is then a no-op as long
     * as {@code eventLogger} is non-null.
     *
     * @param eventLogger logger that receives the events
     * @param logEventsConfig configuration used to resolve the route (topic name) of each event
     * @param dataContentType content type used to serialize the log entry payload
     */
    public LogEventsKafkaEmitter(EventLogger eventLogger, LogEventsConfig logEventsConfig, String dataContentType) {
        this.eventLogger = eventLogger;
        this.logEventsConfig = logEventsConfig;
        this.dataContentType = dataContentType;
    }

    /** Creates an unstarted emitter; call {@link #start} before emitting. */
    public LogEventsKafkaEmitter() {
    }

    /**
     * Initializes the emitter from {@code configs}. Does nothing if a logger is already active.
     *
     * <p>The supplied map is never modified: named-config overrides are applied to a private copy.
     * Any failure while setting up the remote configuration is logged and start-up continues with
     * the supplied values.
     *
     * @param configs emitter configuration
     * @throws ConfigException if the named-config flag cannot be read from {@code configs}
     */
    public synchronized void start(Map<String, ?> configs) {
        if (this.eventLogger != null) {
            log.warn("Skipping reinitialization of {}", this);
            return;
        }
        this.namedConfigEnabled = this.getNamedConfigEnabled(configs);
        log.info("NamedConfigEnabled: {}", this.namedConfigEnabled);

        // Work on a copy so the caller's map is left untouched and immutable maps are supported.
        Map<String, Object> effectiveConfigs = new HashMap<>(configs);
        if (this.isDRFeatureEnabled()) {
            this.initializeRemoteConfiguration(effectiveConfigs);
        }

        this.logEventsConfig = new LogEventsConfig(effectiveConfigs);
        if (!this.logEventsConfig.getBoolean(ENABLE_CONFIG)) {
            log.info("Connect Log Events aren't enabled.");
            return;
        }
        log.info("Initializing {}", this);
        this.populateDataContentType();
        EventLogger newLogger = new EventLogger();
        newLogger.configure(this.logEventsConfig.toEventLoggerConfig());
        // Publish only after configure() succeeded so emit() never sees a half-configured logger.
        this.eventLogger = newLogger;
    }

    /**
     * Creates the remote configuration source, applies its current configuration (if any) to
     * {@code configs}, and starts the source. Failures are logged and swallowed so that start-up
     * can continue without the named config.
     */
    private void initializeRemoteConfiguration(Map<String, Object> configs) {
        try {
            this.kubernetesConfigMapRemoteConfigurationSource = this.configureRemoteConfigurationSource(configs);
            RemoteConnectLogsConfiguration initialConfiguration =
                    this.kubernetesConfigMapRemoteConfigurationSource.getConfig();
            if (initialConfiguration != null) {
                this.remoteConfiguration = initialConfiguration;
                this.updateConfigsWithNamedConfig(configs, initialConfiguration);
            }
            if (this.shouldStartKubernetesConfigMapRemoteConfigurationSource()) {
                try {
                    this.kubernetesConfigMapRemoteConfigurationSource.start();
                    log.info("Started the Kubernetes remote configuration source");
                }
                catch (IllegalStateException e) {
                    log.error("RemoteConfigmap source in illegal state", e);
                }
            }
        }
        catch (Exception e) {
            log.error("Error occurred while configuring named Config. Continuing with the regular flow", e);
        }
    }

    /**
     * Callback invoked by the remote configuration source when a new configuration arrives.
     *
     * <p>The configuration is always recorded when the named-config feature is on. The event logger
     * is rebuilt only when one is currently active; otherwise (emitter disabled, stopped, or still
     * starting) no logger is created. Exceptions are logged, never propagated.
     *
     * @param newRemoteConfiguration the configuration received from the remote source
     */
    public void onReceiveCallback(RemoteConnectLogsConfiguration newRemoteConfiguration) {
        try {
            String activeRegion = newRemoteConfiguration.getActiveRegion();
            if (!this.isDRFeatureEnabled()) {
                return;
            }
            log.info("Received new RemoteConnectLogsConfiguration with active Region: {}", activeRegion);
            this.remoteConfiguration = newRemoteConfiguration;

            LogEventsConfig currentConfig = this.logEventsConfig;
            if (this.eventLogger == null || currentConfig == null) {
                // Creating a logger here would bypass the enable flag and leave dataContentType unset.
                log.info("No active event logger; recorded the remote configuration without reconfiguring");
                return;
            }
            Map<String, Object> configs = new HashMap<>(currentConfig.originals());
            this.updateConfigsWithNamedConfig(configs, newRemoteConfiguration);
            this.reconfigureLogger(configs);
        }
        catch (Exception e) {
            log.error("Error occurred while handling named config", e);
        }
    }

    /**
     * Builds a new logger from {@code configs}, swaps it in, and closes the previous logger.
     * If the new logger cannot be configured it is closed and the current logger stays in place.
     *
     * @param configs complete emitter configuration for the new logger
     */
    void reconfigureLogger(Map<String, ?> configs) {
        LogEventsConfig newLogEventsConfig = new LogEventsConfig(configs);
        EventLogger newLogger = new EventLogger();
        try {
            newLogger.configure(newLogEventsConfig.toEventLoggerConfig());
        }
        catch (RuntimeException e) {
            Utils.closeQuietly(newLogger, "eventLogger");
            throw e;
        }
        EventLogger oldLogger = this.eventLogger;
        // Config is written before the logger so a thread that sees the new logger also sees its config.
        this.logEventsConfig = newLogEventsConfig;
        this.eventLogger = newLogger;
        if (oldLogger != null) {
            Utils.closeQuietly(oldLogger, "eventLogger");
        }
    }

    /**
     * Returns the current configuration.
     *
     * @return the configuration set by the constructor, {@link #start}, or the latest reconfiguration
     * @throws IllegalStateException if no configuration has been set yet
     */
    public synchronized LogEventsConfig logEventsConfig() {
        LogEventsConfig current = this.logEventsConfig;
        if (current == null) {
            log.error("logEventsConfig instance can't be returned without starting logEventsKafkaEmitter instance.");
            throw new IllegalStateException("logEventsConfig can't be accessed without starting logEventsKafkaEmitter instance");
        }
        return current;
    }

    /**
     * Wraps {@code connectLogEntry} in an event and logs it. The entry is skipped when no logger is
     * active. Runtime failures raised by the logger are logged and not propagated.
     */
    @Override
    public void emit(ConnectLogEntry connectLogEntry, String subject, String source, String type) {
        // Single volatile read: stop() may clear the field between a null check and a later use.
        EventLogger logger = this.eventLogger;
        if (logger == null) {
            log.trace("Skipping emitting the Connect Log Entry: {}, with source: {}, subject: {}, type: {}",
                    connectLogEntry, source, subject, type);
            return;
        }
        Event event = this.eventBuilder(connectLogEntry, subject, source, type);
        try {
            logger.log(event);
        }
        catch (RuntimeException e) {
            log.error("Unable to emit event: {}", EventUtils.toJson(event), e);
        }
    }

    /**
     * Closes the active logger, if any. The logger reference is cleared even when closing fails, so
     * later {@link #emit} calls are skipped and {@link #start} can initialize a fresh logger.
     */
    public synchronized void stop() {
        EventLogger logger = this.eventLogger;
        if (logger == null) {
            return;
        }
        log.info("Stopping {}, closing event logger", this);
        // NOTE(review): the remote configuration source started in start() is not shut down here;
        // confirm whether RemoteConfigurationSource needs an explicit stop/close.
        this.eventLogger = null;
        try {
            logger.close();
        }
        catch (Exception e) {
            log.error("Error closing the event logger in {}", this, e);
        }
    }

    @Override
    public String toString() {
        return LogEventsKafkaEmitter.class.getSimpleName();
    }

    EventLogger getEventLogger() {
        return this.eventLogger;
    }

    /** Builds the event for one log entry, with a random id, the current UTC time and the topic as route. */
    private Event eventBuilder(ConnectLogEntry connectLogEntry, String subject, String source, String type) {
        String contentType = this.dataContentType;
        return new Event()
                .setId(UUID.randomUUID().toString())
                .setTime(Instant.now().atOffset(ZoneOffset.UTC))
                .setData(contentType, EventUtils.protoToBytes((Message)connectLogEntry, contentType))
                .setSource(source)
                .setSubject(subject)
                .setType(type)
                .setExtension("route", this.logEventsConfig.getString(TOPIC_NAME_CONFIG));
    }

    /** Maps the configured cloud-event codec to the payload content type. */
    private void populateDataContentType() {
        String encodingConfig = this.logEventsConfig.getString(CODEC_CONFIG);
        switch (encodingConfig) {
            case "binary": {
                this.dataContentType = "application/protobuf";
                break;
            }
            case "structured": {
                this.dataContentType = "application/json";
                break;
            }
            default: {
                throw new RuntimeException("unknown encoding " + encodingConfig);
            }
        }
    }

    /**
     * Reads the named-config flag. A missing entry means disabled; {@link Boolean} and
     * {@link String} values are accepted (strings compare case-insensitively against "true").
     *
     * @throws ConfigException if the map cannot be read or the value has an unsupported type
     */
    private boolean getNamedConfigEnabled(Map<String, ?> configs) {
        try {
            Object value = configs.get(NAMED_CONFIG_ENABLED_CONFIG);
            if (value == null) {
                return false;
            }
            if (value instanceof Boolean) {
                return (Boolean)value;
            }
            if (value instanceof String) {
                return "true".equalsIgnoreCase((String)value);
            }
            throw new ConfigException(NAMED_CONFIG_ENABLED_CONFIG, value, "Expected a boolean or a string");
        }
        catch (RuntimeException e) {
            log.error("Error occurred while handling named config", e);
            throw e instanceof ConfigException ? (ConfigException)e : namedConfigException(e);
        }
    }

    /**
     * Wraps {@code cause} in a {@link ConfigException}. ConfigException has no cause-taking
     * constructor, so the cause is attached with {@code initCause}.
     */
    private static ConfigException namedConfigException(Exception cause) {
        ConfigException wrapped = new ConfigException("Invalid named config: " + cause);
        wrapped.initCause(cause);
        return wrapped;
    }

    private boolean shouldStartKubernetesConfigMapRemoteConfigurationSource() {
        return this.isDRFeatureEnabled() && this.kubernetesConfigMapRemoteConfigurationSource != null;
    }

    private boolean isRemoteConnectLogsConfigurationEnabled() {
        return this.isDRFeatureEnabled() && this.remoteConfiguration != null;
    }

    private boolean isDRFeatureEnabled() {
        return this.namedConfigEnabled;
    }

    /**
     * Parses the remote-config settings out of {@code configs} and builds the remote source.
     *
     * @param configs emitter configuration
     * @return the configured (not yet started) remote configuration source
     * @throws RuntimeException wrapping any failure, including a missing remote configuration
     */
    public RemoteConfigurationSource<RemoteConnectLogsConfiguration> configureRemoteConfigurationSource(Map<String, ?> configs) {
        try {
            Optional<RemoteConfigConfiguration> remoteConfigConfiguration = this.parseRemoteConfigSourceConfiguration(configs, true);
            if (!remoteConfigConfiguration.isPresent()) {
                log.error("Remote configuration is not present");
                throw new ConfigException("Remote configuration is not present");
            }
            return this.getRemoteKubernetesConfigurationSource(remoteConfigConfiguration.get());
        }
        catch (Exception e) {
            log.error("Error occurred while handling named config", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Creates the Kubernetes ConfigMap source with {@link #onReceiveCallback} as its listener and
     * runs it once through {@link Retry} using the back-off settings from the configuration. An
     * {@link ExecutionException} from that run is logged and the source is still returned.
     */
    private RemoteConfigurationSource<RemoteConnectLogsConfiguration> getRemoteKubernetesConfigurationSource(RemoteConfigConfiguration remoteConfigConfiguration) {
        try {
            KubernetesConfigMapRemoteConfigurationConfig kubernetesConfig =
                    (KubernetesConfigMapRemoteConfigurationConfig)remoteConfigConfiguration;
            KubernetesConfigMapRemoteConfigurationSource<RemoteConnectLogsConfiguration> remoteConfigurationSource =
                    new KubernetesConfigMapRemoteConfigurationSource<>(
                            kubernetesConfig, Optional.empty(), this::onReceiveCallback, RemoteConnectLogsConfiguration.class);
            long retryBackoffMs = kubernetesConfig.getRetryBackoffMs();
            long maxRetryBackoffMs = kubernetesConfig.getMaxRetryBackoffMs();
            Retry<RemoteConnectLogsConfiguration> remoteConnectLogsConfigurationRetry =
                    new Retry<>(retryBackoffMs, maxRetryBackoffMs);
            try {
                RemoteConnectLogsConfiguration remoteConnectLogsConfiguration =
                        remoteConnectLogsConfigurationRetry.execute(remoteConfigurationSource);
                log.info("RemoteConnectLogsConfiguration Region: {}", remoteConnectLogsConfiguration.getActiveRegion());
            }
            catch (ExecutionException e) {
                log.error("Error while setting the RemoteConnectLogsConfiguration in the kubernetesConfigurationSource. Continuing with original bootstrap servers from worker configs.", e);
            }
            return remoteConfigurationSource;
        }
        catch (Exception e) {
            throw new RuntimeException("Unable to configure RemoteConfigurationSource", e);
        }
    }

    /**
     * Extracts the entries under the {@code confluent.event.logger.remoteconfig.cloud.} prefix and
     * wraps them in a Kubernetes ConfigMap configuration.
     *
     * @param configs emitter configuration
     * @param doLog passed through to the configuration constructor
     * @return the parsed configuration; this implementation never returns an empty Optional
     */
    Optional<RemoteConfigConfiguration> parseRemoteConfigSourceConfiguration(Map<String, ?> configs, boolean doLog) {
        LogEventsConfig config = new LogEventsConfig(configs);
        return Optional.of(new KubernetesConfigMapRemoteConfigurationConfig(
                config.originalsWithPrefix(REMOTE_CONFIG_PREFIX), doLog));
    }

    /**
     * Overwrites the producer bootstrap servers and JAAS config in {@code configs} with the values
     * for the active region of {@code remoteConnectLogsConfiguration}. Does nothing when the
     * named-config feature is off. {@code configs} must be mutable.
     *
     * @throws ConfigException if the region or producer settings cannot be resolved or stored
     */
    void updateConfigsWithNamedConfig(Map<String, ?> configs, RemoteConnectLogsConfiguration remoteConnectLogsConfiguration) {
        if (!this.isDRFeatureEnabled()) {
            return;
        }
        try {
            String activeRegion = remoteConnectLogsConfiguration.getActiveRegion();
            log.info("Active Region: {}.", activeRegion);
            Region region = Region.valueOf(activeRegion);
            String namedBootStrapServers = remoteConnectLogsConfiguration.getConnectLogEventsProducerConfig(region).getBootstrapServers();
            String namedSaslJaasConfig = remoteConnectLogsConfiguration.getConnectLogEventsProducerConfig(region).getJaasConfig();
            // The wildcard signature is kept for callers; the values stored are plain strings.
            @SuppressWarnings("unchecked")
            Map<String, Object> mutableConfigs = (Map<String, Object>)configs;
            mutableConfigs.put(PRODUCER_BOOTSTRAP_SERVERS_CONFIG, namedBootStrapServers);
            mutableConfigs.put(PRODUCER_SASL_JAAS_CONFIG, namedSaslJaasConfig);
        }
        catch (Exception e) {
            log.error("Error occurred while handling named config", e);
            throw namedConfigException(e);
        }
    }

    /** Returns the most recently recorded remote configuration, or {@code null} if none was seen. */
    public RemoteConnectLogsConfiguration getRemoteConfiguration() {
        return this.remoteConfiguration;
    }
}

