/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.mqtt.stream.netty;

import io.confluent.mqtt.MqttConfig;
import io.confluent.mqtt.stream.DeliveryGuarantee;
import io.confluent.mqtt.stream.KafkaPublisher;
import io.confluent.mqtt.stream.PublishMqttRecord;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import java.nio.charset.StandardCharsets;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Values;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ChannelHandler.Sharable
public class NettyKafkaPublisher
extends ChannelOutboundHandlerAdapter
implements KafkaPublisher {
    private static final Logger log = LoggerFactory.getLogger(NettyKafkaPublisher.class);
    private static final String MQTT_PROXY_RESOURCE_TYPE = "mqtt_proxy";
    private static final String PRE_0_10_1_0_KAFKA = "CLUSTER_ID_IS_NOT_SUPPORTED";
    private static final String PREFIX = "confluent.telemetry.";
    private static final String METRIC_PREFIX = "metric.";
    private static final String RESOURCE_LABEL_CLUSTER_ID = "resource.cluster.id";
    private static NettyKafkaPublisher instance;
    private final Map<DeliveryGuarantee, Producer<String, byte[]>> producers;

    protected NettyKafkaPublisher(Map<DeliveryGuarantee, Producer<String, byte[]>> producers) {
        this.producers = producers;
    }

    public static synchronized NettyKafkaPublisher instance(MqttConfig config) {
        if (instance == null) {
            try (AdminClient admin = AdminClient.create(config.adminClientConfig());){
                DescribeClusterResult describeClusterResult = admin.describeCluster();
                String kafkaClusterId = (String)describeClusterResult.clusterId().get();
                if (kafkaClusterId == null) {
                    log.warn("Kafka doesn't support clusterId, defaulting to {}", (Object)PRE_0_10_1_0_KAFKA);
                    kafkaClusterId = PRE_0_10_1_0_KAFKA;
                }
                instance = new NettyKafkaPublisher(NettyKafkaPublisher.newProducers(config, kafkaClusterId));
            }
            catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException("Failed to retrieve Kafka cluster id", e);
            }
        }
        return instance;
    }

    private static Map<DeliveryGuarantee, Producer<String, byte[]>> newProducers(MqttConfig config, String kafkaClusterId) {
        EnumMap<DeliveryGuarantee, Producer<String, byte[]>> producers = new EnumMap<DeliveryGuarantee, Producer<String, byte[]>>(DeliveryGuarantee.class);
        Map<String, Object> atMostOnceProps = NettyKafkaPublisher.producerProperties(config, kafkaClusterId);
        atMostOnceProps.put("acks", "0");
        atMostOnceProps.put("enable.idempotence", false);
        KafkaProducer atMostOnceProducer = new KafkaProducer(atMostOnceProps);
        producers.put(DeliveryGuarantee.AT_MOST_ONCE, (Producer<String, byte[]>)atMostOnceProducer);
        Map<String, Object> atLeastOnceProps = NettyKafkaPublisher.producerProperties(config, kafkaClusterId);
        atLeastOnceProps.put("acks", "all");
        atLeastOnceProps.put("enable.idempotence", false);
        KafkaProducer atLeastOnceProducer = new KafkaProducer(atLeastOnceProps);
        producers.put(DeliveryGuarantee.AT_LEAST_ONCE, (Producer<String, byte[]>)atLeastOnceProducer);
        Map<String, Object> exactlyOnceProps = NettyKafkaPublisher.producerProperties(config, kafkaClusterId);
        exactlyOnceProps.put("acks", "all");
        exactlyOnceProps.put("enable.idempotence", true);
        KafkaProducer exactlyOnceProducer = new KafkaProducer(exactlyOnceProps);
        producers.put(DeliveryGuarantee.EXACTLY_ONCE, (Producer<String, byte[]>)exactlyOnceProducer);
        return producers;
    }

    private static Map<String, Object> producerProperties(MqttConfig config, String clusterId) {
        HashMap<String, Object> producerProps = new HashMap<String, Object>();
        producerProps.put("bootstrap.servers", config.bootstrapServers());
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerProps.putAll(config.originalsWithPrefix("producer.", true));
        producerProps.putAll(config.originalsWithPrefix(PREFIX, false));
        producerProps.putAll(config.originalsWithPrefix(METRIC_PREFIX, false));
        producerProps.put("metrics.context.resource.type", MQTT_PROXY_RESOURCE_TYPE);
        producerProps.put("metrics.context.resource.version", AppInfoParser.getVersion());
        producerProps.put("metrics.context.resource.commit.id", AppInfoParser.getCommitId());
        producerProps.put("metrics.context.resource.cluster.id", clusterId);
        return producerProps;
    }

    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        if (!(msg instanceof PublishMqttRecord)) {
            ctx.write(msg);
            return;
        }
        String channelId = ctx.channel().id().asShortText();
        PublishMqttRecord kafkaRecord = (PublishMqttRecord)msg;
        log.debug("Publishing MQTT message to Kafka with QoS level '{}'", (Object)kafkaRecord.deliveryGuarantee());
        log.trace("Channel {} publishing message to Kafka {} ", (Object)channelId, (Object)kafkaRecord);
        try {
            Callback callback = (metadata, exception) -> {
                if (exception != null) {
                    promise.setFailure((Throwable)exception);
                    log.error("Channel {} publishing to Kafka failed for {}", new Object[]{channelId, msg, exception});
                    return;
                }
                promise.setSuccess();
                log.trace("Success in channel {} publishing to topic '{}', partition '{}', offset '{}'", new Object[]{channelId, metadata.topic(), metadata.partition(), metadata.offset()});
            };
            this.publish(kafkaRecord, callback);
        }
        catch (Exception e) {
            log.error("Channel {} publishing to Kafka failed with error {}", (Object)channelId, (Object)e);
            throw e;
        }
    }

    private void publish(PublishMqttRecord kafkaRecord, Callback callback) {
        DeliveryGuarantee deliveryGuarantee = kafkaRecord.deliveryGuarantee();
        Producer<String, byte[]> producer = this.producers.get((Object)kafkaRecord.deliveryGuarantee());
        if (producer == null) {
            callback.onCompletion(null, (Exception)new RuntimeException("Delivery guarantee " + (Object)((Object)deliveryGuarantee) + "is not supported"));
            return;
        }
        ProducerRecord record = new ProducerRecord(kafkaRecord.topic(), null, null, (Object)kafkaRecord.key(), (Object)kafkaRecord.value(), (Iterable)NettyKafkaPublisher.convertToRecordHeaders(kafkaRecord));
        producer.send(record, callback);
    }

    private static RecordHeaders convertToRecordHeaders(PublishMqttRecord record) {
        Headers headers = record.headers();
        RecordHeaders result = new RecordHeaders();
        if (headers != null) {
            for (Header header : headers) {
                String key = header.key();
                byte[] rawHeader = Values.convertToString((Schema)header.schema(), (Object)header.value()).getBytes(StandardCharsets.UTF_8);
                result.add(key, rawHeader);
            }
        }
        return result;
    }
}

