/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.mqtt.protocol.netty;

import io.confluent.mqtt.MqttConfig;
import io.confluent.mqtt.RegexListTopicMapper;
import io.confluent.mqtt.TopicMapper;
import io.confluent.mqtt.protocol.MqttHandler;
import io.confluent.mqtt.protocol.security.SecurityProtocol;
import io.confluent.mqtt.protocol.security.UserPasswordCallbackHandler;
import io.confluent.mqtt.stream.PublishMqttRecord;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttMessageFactory;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.SocketAddress;
import java.util.EnumMap;
import java.util.function.BiConsumer;
import java.util.function.Function;
import javax.security.auth.login.LoginContext;
import javax.security.auth.login.LoginException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.DataException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Netty inbound handler that dispatches decoded MQTT messages to a handler chosen by
 * message type.
 *
 * <p>Handlers are registered for {@code CONNECT}, {@code DISCONNECT}, {@code PINGREQ},
 * {@code PUBLISH} and {@code PUBREL}; any other message type is logged and ignored.
 * All instance fields are final and assigned in the constructor, so the handler keeps
 * no per-channel state in its own fields.
 */
@ChannelHandler.Sharable
public class NettyMqttHandler extends ChannelInboundHandlerAdapter implements MqttHandler {
    private static final Logger log = LoggerFactory.getLogger(NettyMqttHandler.class);

    /** JAAS login context name used when authenticating CONNECT messages. */
    public static final String LOGIN_CONTEXT_NAME = "ConfluentKafkaMqtt";

    /** Lazily created shared instance; guarded by the class lock of the static accessors. */
    private static NettyMqttHandler instance;

    private final SecurityProtocol securityProtocol;
    /** Dispatch table from MQTT message type to the handler that processes it. */
    private final EnumMap<MqttMessageType, BiConsumer<Channel, MqttMessage>> mqttAutomaton;
    private final TopicMapper topicMapper;

    /**
     * Returns the shared handler, creating it from {@code config} on first use.
     *
     * <p>Once an instance exists, {@code config} is ignored until {@link #resetInstance()}
     * is called.
     *
     * @param config configuration used only when a new instance has to be created
     * @return the shared handler instance
     */
    public static synchronized NettyMqttHandler instance(MqttConfig config) {
        if (instance == null) {
            instance = new NettyMqttHandler(config);
        }
        return instance;
    }

    /** Discards the shared instance so the next {@link #instance(MqttConfig)} call builds a new one. */
    public static synchronized void resetInstance() {
        instance = null;
    }

    /**
     * Builds the topic mapper, registers one handler per supported MQTT message type and
     * reads the listener security protocol from the configuration.
     *
     * @param config source of the topic mapping rules and the listener security protocol
     */
    protected NettyMqttHandler(MqttConfig config) {
        this.topicMapper = new RegexListTopicMapper(config);
        this.mqttAutomaton = new EnumMap<>(MqttMessageType.class);
        this.mqttAutomaton.put(MqttMessageType.CONNECT, connectHandler());
        this.mqttAutomaton.put(MqttMessageType.DISCONNECT, disconnectHandler());
        this.mqttAutomaton.put(MqttMessageType.PINGREQ, pingHandler());
        this.mqttAutomaton.put(MqttMessageType.PUBLISH, publishHandler());
        this.mqttAutomaton.put(MqttMessageType.PUBREL, publishReleaseHandler());
        this.securityProtocol = config.listenersSecurityProtocol();
    }

    /**
     * Dispatches an inbound message to the handler registered for its MQTT message type.
     *
     * <p>Messages whose decoding failed, and message types without a registered handler,
     * are logged at debug level and dropped. Any {@link Throwable} raised by a handler is
     * logged and not rethrown. The message is released in every case.
     *
     * @param context     channel context the message arrived on
     * @param mqttMessage the inbound object; expected to be an {@link MqttMessage}
     */
    @Override
    public void channelRead(ChannelHandlerContext context, Object mqttMessage) {
        if (!(mqttMessage instanceof MqttMessage)) {
            // A blind cast here would throw before the release below is reached and leak a
            // reference-counted object; drop the unexpected object explicitly instead.
            log.debug("Ignoring unexpected non-MQTT message from {}", context.channel());
            ReferenceCountUtil.release(mqttMessage);
            return;
        }
        MqttMessage message = (MqttMessage) mqttMessage;
        try {
            if (message.decoderResult().isFailure()) {
                // The trailing Throwable has no placeholder, so SLF4J logs it as the exception.
                log.debug("Decoding MQTT message from {} failed with: ", context.channel(), message.decoderResult().cause());
                return;
            }
            MqttMessageType messageType = message.fixedHeader().messageType();
            BiConsumer<Channel, MqttMessage> messageHandler = this.mqttAutomaton.get(messageType);
            if (messageHandler == null) {
                log.debug("No handler available for MQTT message '{}' from {}. Ignoring", messageType, context.channel().remoteAddress());
                return;
            }
            messageHandler.accept(context.channel(), message);
        } catch (Throwable e) {
            log.debug("Handling MQTT message with fixed header '{}' from {} failed with: ", message.fixedHeader(), context.channel(), e);
        } finally {
            ReferenceCountUtil.release(message);
        }
    }

    /** Intentionally empty: the event is consumed here and not forwarded down the pipeline. */
    @Override
    public void channelInactive(ChannelHandlerContext context) throws Exception {
    }

    /**
     * Consumes pipeline exceptions without forwarding them or closing the channel, leaving
     * a debug-level trace so failures are not completely invisible.
     *
     * @param context channel context on which the exception surfaced
     * @param cause   the exception raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
        log.debug("Unhandled exception on channel {}: ", context.channel(), cause);
    }

    /** Intentionally empty: the event is consumed here and not forwarded down the pipeline. */
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext context) throws Exception {
    }

    /** Builds a CONNACK message carrying the given return code and session-present flag. */
    private MqttConnAckMessage connAck(MqttConnectReturnCode returnCode, boolean sessionPresent) {
        return MqttMessageBuilders.connAck()
                .returnCode(returnCode)
                .sessionPresent(sessionPresent)
                .build();
    }

    /**
     * Creates the CONNECT handler: authenticates the client and answers with a CONNACK.
     *
     * <p>On failed authentication the CONNACK carries
     * {@code CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD} and the channel is closed.
     *
     * @return handler for {@code CONNECT} messages
     */
    public BiConsumer<Channel, MqttMessage> connectHandler() {
        return (channel, message) -> {
            SocketAddress remoteAddress = channel.remoteAddress();
            log.info("Handling MQTT message '{}' from {}", MqttMessageType.CONNECT, remoteAddress);
            MqttConnectMessage connectMessage = (MqttConnectMessage) message;
            if (login(connectMessage.payload())) {
                log.debug("Authentication successful from {}", remoteAddress);
                channel.writeAndFlush(connAck(MqttConnectReturnCode.CONNECTION_ACCEPTED, false));
                return;
            }
            log.debug("Authentication failed from {}", remoteAddress);
            channel.writeAndFlush(connAck(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, false));
            log.info("Closing connection from {}", remoteAddress);
            channel.close();
        };
    }

    /**
     * Creates the DISCONNECT handler: flushes pending writes and closes the channel.
     *
     * @return handler for {@code DISCONNECT} messages
     */
    public BiConsumer<Channel, MqttMessage> disconnectHandler() {
        return (channel, message) -> {
            SocketAddress remoteAddress = channel.remoteAddress();
            log.info("Handling MQTT message '{}' from {}", MqttMessageType.DISCONNECT, remoteAddress);
            if (log.isDebugEnabled()) {
                // Guarded because building the metadata string is wasted work otherwise.
                log.debug("Message metadata {} from {}", messageMetadata(message), remoteAddress);
            }
            log.info("Closing connection from {}", remoteAddress);
            channel.flush();
            channel.close().addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        };
    }

    /**
     * Creates the PINGREQ handler, which replies with a PINGRESP.
     *
     * @return handler for {@code PINGREQ} messages
     */
    public BiConsumer<Channel, MqttMessage> pingHandler() {
        return (channel, message) -> {
            if (log.isDebugEnabled()) {
                log.debug("Handling MQTT message '{}' from {}", MqttMessageType.PINGREQ, channel.remoteAddress());
            }
            channel.writeAndFlush(pingResp());
        };
    }

    /**
     * Creates the PUBLISH handler: writes a Kafka record for the message to the channel and,
     * depending on the QoS level, acknowledges once that write completes.
     *
     * <p>{@code EXACTLY_ONCE} is answered with PUBREC and {@code AT_LEAST_ONCE} with PUBACK;
     * {@code AT_MOST_ONCE} gets no acknowledgement. Any other QoS value is logged as a warning.
     *
     * @return handler for {@code PUBLISH} messages
     */
    public BiConsumer<Channel, MqttMessage> publishHandler() {
        return (channel, message) -> {
            MqttPublishMessage publishMessage = (MqttPublishMessage) message;
            log.debug("Handling MQTT message '{}' from {}", MqttMessageType.PUBLISH, channel.remoteAddress());
            MqttQoS mqttQoS = publishMessage.fixedHeader().qosLevel();
            ChannelFuture publishToKafkaFuture = channel.write(newKafkaRecord(publishMessage));
            switch (mqttQoS) {
                case EXACTLY_ONCE:
                    publishToKafkaFuture.addListener(listener(channel, publishMessage, this::pubRec));
                    break;
                case AT_LEAST_ONCE:
                    publishToKafkaFuture.addListener(listener(channel, publishMessage, this::pubAck));
                    break;
                case AT_MOST_ONCE:
                    // Fire-and-forget: nothing is sent back to the client.
                    break;
                default:
                    log.warn("'{}' message has invalid QoS level '{}'. Ignoring", MqttMessageType.PUBLISH, mqttQoS);
                    break;
            }
        };
    }

    /**
     * Creates the PUBREL handler, which replies with a PUBCOMP for the same message id.
     *
     * @return handler for {@code PUBREL} messages
     */
    public BiConsumer<Channel, MqttMessage> publishReleaseHandler() {
        return (channel, message) -> {
            if (log.isDebugEnabled()) {
                log.debug("Handling MQTT message '{}' from {}", MqttMessageType.PUBREL, channel.remoteAddress());
            }
            channel.writeAndFlush(pubComp(messageId(message)));
        };
    }

    /** Builds a PINGRESP message, which consists of a fixed header only. */
    private MqttMessage pingResp() {
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0);
        log.debug("Responding to client with {}", MqttMessageType.PINGRESP);
        return new MqttMessage(fixedHeader);
    }

    /** Builds a PUBACK message for the given message id. */
    private MqttPubAckMessage pubAck(Integer messageId) {
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2);
        MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
        log.debug("Responding to client with {}", MqttMessageType.PUBACK);
        return (MqttPubAckMessage) MqttMessageFactory.newMessage(fixedHeader, variableHeader, null);
    }

    /** Builds a PUBREC message for the given message id. */
    private MqttMessage pubRec(Integer messageId) {
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 2);
        MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
        log.debug("Responding to client with {}", MqttMessageType.PUBREC);
        return MqttMessageFactory.newMessage(fixedHeader, variableHeader, null);
    }

    /** Builds a PUBCOMP message for the given message id. */
    private MqttMessage pubComp(Integer messageId) {
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 2);
        MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
        log.debug("Responding to client with '{}'", MqttMessageType.PUBCOMP);
        return MqttMessageFactory.newMessage(fixedHeader, variableHeader, null);
    }

    /**
     * Creates a listener that acknowledges a published message once the associated write
     * future completes successfully.
     *
     * <p>On success the message built by {@code callback} from the publish message id is
     * written and flushed to {@code channel}. Cancellation and failure are only logged.
     *
     * @param channel        channel the acknowledgement is written to
     * @param publishMessage the PUBLISH message being acknowledged
     * @param callback       builds the acknowledgement message from the message id
     * @return listener to attach to the future of the publish write
     */
    public GenericFutureListener<? extends Future<? super Void>> listener(Channel channel, MqttPublishMessage publishMessage, Function<Integer, MqttMessage> callback) {
        return future -> {
            if (future.isSuccess()) {
                if (log.isDebugEnabled()) {
                    log.debug("Publishing to Kafka succeeded for message with {} from {}", messageMetadata(publishMessage), channel.remoteAddress());
                }
                channel.writeAndFlush(callback.apply(messageId(publishMessage)));
                return;
            }
            if (future.isCancelled()) {
                log.warn("Publishing to Kafka was canceled for message with {} from {}", messageMetadata(publishMessage), channel.remoteAddress());
                return;
            }
            log.warn("Publishing to Kafka failed for message with {} from {} due to {}", messageMetadata(publishMessage), channel.remoteAddress(), future.cause());
        };
    }

    /**
     * Maps the MQTT topic of {@code publishMessage} to a Kafka topic and wraps both in a record.
     *
     * @throws DataException if the topic mapper yields no Kafka topic for the MQTT topic
     */
    private PublishMqttRecord newKafkaRecord(MqttPublishMessage publishMessage) {
        String mqttTopic = publishMessage.variableHeader().topicName();
        TopicPartition kafkaTopic = this.topicMapper.map(mqttTopic)
                .orElseThrow(() -> new DataException("No matching Kafka topic was found for " + mqttTopic));
        log.trace("MQTT topic '{}' mapped to Kafka topic '{}'", mqttTopic, kafkaTopic);
        return PublishMqttRecord.newRecord(kafkaTopic, publishMessage);
    }

    /**
     * Extracts the message id: the packet id for PUBLISH messages, otherwise the id held by
     * the message's {@link MqttMessageIdVariableHeader}.
     */
    private Integer messageId(MqttMessage message) {
        if (message instanceof MqttPublishMessage) {
            return ((MqttPublishMessage) message).variableHeader().packetId();
        }
        return ((MqttMessageIdVariableHeader) message.variableHeader()).messageId();
    }

    /** Renders the fixed and variable headers for logging; a missing header becomes an empty string. */
    private static String messageMetadata(MqttMessage message) {
        Object fixed = message.fixedHeader();
        Object variable = message.variableHeader();
        String fixedHeader = fixed != null ? fixed.toString() : "";
        String variableHeader = variable != null ? variable.toString() : "";
        return "[fixedHeader=" + fixedHeader + ", variableHeader=" + variableHeader + "]";
    }

    /**
     * Authenticates a CONNECT payload through JAAS when the listener uses a SASL protocol.
     *
     * @param payload CONNECT payload handed to the callback handler
     * @return {@code true} if no SASL login is required or the login succeeded;
     *         {@code false} if the login threw, in which case the failure is logged
     */
    private boolean login(MqttConnectPayload payload) {
        try {
            if (requiresSaslLogin()) {
                new LoginContext(LOGIN_CONTEXT_NAME, new UserPasswordCallbackHandler(payload)).login();
            }
            return true;
        } catch (LoginException e) {
            log.warn("Authentication failed for user '{}'", payload.userName(), e);
        } catch (Throwable t) {
            // Deliberately broad: any failure during login must reject the connection
            // rather than escape the handler.
            log.warn("Unable to login: ", t);
        }
        return false;
    }

    /** Tells whether the configured listener security protocol is one of the SASL variants. */
    private boolean requiresSaslLogin() {
        return SecurityProtocol.SASL_TLS.equals(this.securityProtocol)
                || SecurityProtocol.SASL_SSL.equals(this.securityProtocol)
                || SecurityProtocol.SASL_PLAINTEXT.equals(this.securityProtocol);
    }
}

