/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.runtime.events;

import io.cloudevents.v1.proto.CloudEvent;
import io.confluent.telemetry.events.v0.EventServiceRequest;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509TrustManager;
import okhttp3.Credentials;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import org.apache.http.client.utils.URIBuilder;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.events.CloudEventsHttpEmitterConfig;
import org.apache.kafka.connect.runtime.events.ConnectMetadataCompressionUtils;
import org.apache.kafka.connect.runtime.events.ConnectMetadataSslFactory;
import org.apache.kafka.connect.runtime.events.EventPublisher;
import org.apache.kafka.connect.runtime.events.EventTransformer;
import org.apache.kafka.connect.runtime.events.EventsException;
import org.apache.kafka.connect.runtime.events.MetadataPublisherMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HttpMetadataChangePublisher
implements EventPublisher<CloudEvent> {
    private static final Logger log = LoggerFactory.getLogger(HttpMetadataChangePublisher.class);
    private static final String PATH = "v1/events";
    private final OkHttpClient httpclient;
    private final String url;
    private final String key;
    private final String secret;
    private final EventTransformer<List<CloudEvent>, EventServiceRequest> transformer = new ApiEventTransformer();
    private final ConnectMetadataSslFactory sslFactory;
    private final CloudEventsHttpEmitterConfig config;

    public HttpMetadataChangePublisher(CloudEventsHttpEmitterConfig config) {
        this.config = config;
        Map<String, Object> sslConfigs = config.sslConfigs();
        boolean hasExplicitSslConfig = config.hasExplicitSslConfig();
        this.sslFactory = hasExplicitSslConfig ? new ConnectMetadataSslFactory(sslConfigs) : null;
        OkHttpClient.Builder clientBuilder = new OkHttpClient.Builder();
        if (this.sslFactory != null) {
            try {
                SSLSocketFactory sslSocketFactory = this.sslFactory.getSslSocketFactory();
                if (sslSocketFactory == null) {
                    log.error("SSL configuration provided but failed to create SSL socket factory");
                    throw new ConnectException("SSL configuration failed - unable to create SSL socket factory");
                }
                X509TrustManager trustManager = this.sslFactory.getTrustManager();
                clientBuilder.sslSocketFactory(sslSocketFactory, trustManager);
                if (trustManager != null) {
                    log.debug("SSL/TLS configured for Connect Metadata Emitter HTTP client with custom trust store");
                }
                log.debug("SSL/TLS configured for Connect Metadata Emitter HTTP client with JVM default trust store");
            }
            catch (Exception e) {
                log.error("Failed to configure SSL for Connect Metadata Emitter HTTP client", (Throwable)e);
                throw new ConnectException("SSL configuration failed", (Throwable)e);
            }
        } else {
            log.debug("No explicit SSL configuration provided, using default HTTP client SSL settings");
        }
        this.httpclient = clientBuilder.build();
        String url = null;
        try {
            url = new URIBuilder(config.httpBaseUrl()).setPath(PATH).build().toString();
        }
        catch (URISyntaxException e) {
            log.error("Invalid URI for HTTP event emitter: {}", (Object)config.httpBaseUrl(), (Object)e);
        }
        this.url = url;
        this.key = config.apiKey();
        this.secret = config.apiSecret();
    }

    @Override
    public void publishEvent(CloudEvent event) throws EventsException {
        EventServiceRequest eventToPublish = this.transformer.transform(Collections.singletonList(event));
        this.publish(eventToPublish, event.getType(), event.getSource(), 1);
    }

    @Override
    public void publishEvents(List<CloudEvent> events) throws EventsException {
        if (events == null || events.isEmpty()) {
            return;
        }
        EventServiceRequest eventToPublish = this.transformer.transform(events);
        this.publish(eventToPublish, events.get(0).getType(), events.get(0).getSource(), events.size());
    }

    private void publish(EventServiceRequest eventServiceRequest, String operation, String source, int eventCount) throws EventsException {
        byte[] payload;
        long startTime = System.currentTimeMillis();
        byte[] compressedPayloadBytes = payload = eventServiceRequest.toByteArray();
        CompressionType compressionType = CompressionType.NONE;
        String contentEncoding = null;
        if (this.config.isCompressionEnabled() && ConnectMetadataCompressionUtils.shouldCompress(payload.length, this.config.getCompressionMinSize())) {
            try {
                compressionType = this.config.getCompressionTypeEnum();
                ByteBuffer compressedPayload = ConnectMetadataCompressionUtils.compress(payload, compressionType);
                contentEncoding = ConnectMetadataCompressionUtils.getContentEncodingHeader(compressionType);
                log.debug("Compressed Connect metadata payload from {} bytes to {} bytes using {}", new Object[]{payload.length, compressedPayload.limit(), compressionType});
                compressedPayloadBytes = Utils.toArray((ByteBuffer)compressedPayload);
            }
            catch (Exception e) {
                log.warn("Failed to compress Connect metadata payload using {}, sending uncompressed data", (Object)compressionType, (Object)e);
                compressionType = CompressionType.NONE;
                contentEncoding = null;
            }
        }
        Request.Builder requestBuilder = new Request.Builder().url(this.url).addHeader("Content-Type", "application/protobuf").addHeader("Authorization", Credentials.basic((String)this.key, (String)this.secret)).post(RequestBody.create((byte[])compressedPayloadBytes));
        if (contentEncoding != null) {
            requestBuilder.addHeader("Content-Encoding", contentEncoding);
        }
        Request request = requestBuilder.build();
        if (log.isTraceEnabled()) {
            try {
                String base64Body = Base64.getEncoder().encodeToString(compressedPayloadBytes);
                log.info("Publishing connect event - URL: {}, Headers: {}, Body (base64): {}", new Object[]{request.url(), request.headers().toMultimap(), base64Body});
            }
            catch (Exception e) {
                log.trace("Failed to log request body for connect event", (Throwable)e);
            }
        }
        try (Response response = this.httpclient.newCall(request).execute();){
            long endTime = System.currentTimeMillis();
            long latency = endTime - startTime;
            if (response.isSuccessful()) {
                log.debug("Published connect event {} for {} response code: {} (compression: {})", new Object[]{operation, source, response.code(), compressionType});
                this.recordMetrics(true, latency, operation, eventCount, response.code(), compressedPayloadBytes.length);
                return;
            }
            log.error("Failed to publish connect event {} for {} response code: {}", new Object[]{operation, source, response.code()});
            this.recordMetrics(false, latency, operation, eventCount, response.code(), compressedPayloadBytes.length);
            throw new EventsException.UnableToPublishEventException();
        }
        catch (IOException e) {
            long endTime = System.currentTimeMillis();
            long latency = endTime - startTime;
            this.recordMetrics(false, latency, operation, eventCount, 0, compressedPayloadBytes.length);
            throw new EventsException.UnableToPublishEventException(e);
        }
    }

    private void recordMetrics(boolean success, long latencyMs, String operation, int eventCount, int responseCode, long payloadSizeBytes) {
        try {
            MetadataPublisherMetrics metrics = MetadataPublisherMetrics.getInstance();
            String compressionType = this.config.getCompressionTypeEnum().name();
            metrics.record(success, operation, latencyMs, compressionType, eventCount, responseCode, payloadSizeBytes);
        }
        catch (Exception e) {
            log.debug("Failed to record metadata publishing metrics", (Throwable)e);
        }
    }

    @Override
    public void close() {
        if (this.sslFactory != null) {
            this.sslFactory.close();
        }
        if (this.httpclient != null) {
            this.httpclient.dispatcher().executorService().shutdown();
            this.httpclient.connectionPool().evictAll();
            log.debug("HTTP client resources cleaned up successfully");
        }
    }

    private static class ApiEventTransformer
    implements EventTransformer<List<CloudEvent>, EventServiceRequest> {
        private ApiEventTransformer() {
        }

        @Override
        public EventServiceRequest transform(List<CloudEvent> events) {
            return EventServiceRequest.newBuilder().addAllEvents(events).build();
        }
    }
}

