/*
 * Decompiled with CFR 0.152.
 */
package org.mule.extension.redis.internal.source;

import java.util.List;
import java.util.function.Consumer;
import org.mule.extension.redis.api.SubscribeChannelAttributes;
import org.mule.extension.redis.internal.connection.RedisConnection;
import org.mule.extension.redis.internal.error.RedisErrorType;
import org.mule.extension.redis.internal.error.exceptions.RedisConnectionException;
import org.mule.extension.redis.internal.error.exceptions.UnableToUnsubscribeException;
import org.mule.extension.redis.internal.service.ChannelSubscription;
import org.mule.extension.redis.internal.service.MessagingAPIService;
import org.mule.extension.redis.internal.service.RedisClientAdapter;
import org.mule.extension.redis.internal.service.dto.MessageDTO;
import org.mule.extension.redis.internal.service.factory.RedisClientAdapterFactory;
import org.mule.extension.redis.internal.service.factory.ServiceFactory;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.extension.api.annotation.Alias;
import org.mule.runtime.extension.api.annotation.Streaming;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.annotation.param.MediaType;
import org.mule.runtime.extension.api.annotation.param.Parameter;
import org.mule.runtime.extension.api.annotation.source.EmitsResponse;
import org.mule.runtime.extension.api.error.ErrorTypeDefinition;
import org.mule.runtime.extension.api.exception.ModuleException;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.extension.api.runtime.source.Source;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisException;

/**
 * Message source that subscribes to a list of Redis channels and pushes every
 * received message into the flow.
 *
 * <p>The message payload becomes the output of the emitted {@link Result}; the
 * channel the message arrived on is exposed through
 * {@link SubscribeChannelAttributes}.
 *
 * <p>Lifecycle: {@link #onStart(SourceCallback)} obtains a connection from the
 * injected provider, wraps it in a {@link RedisClientAdapter} and starts the
 * subscription; {@link #onStop()} stops the subscription and closes the adapter.
 * Both methods tolerate a partially initialised source, so a failed start does
 * not leak the adapter and a stop without a successful start does not fail with
 * a {@code NullPointerException}.
 */
@Alias(value="subscribe")
@MediaType(value="*/*", strict=false)
@EmitsResponse
@Streaming
public class SubscribeChannelSource
extends Source<String, SubscribeChannelAttributes> {
    private static final Logger logger = LoggerFactory.getLogger(SubscribeChannelSource.class);
    /** Channels to subscribe to. */
    @Parameter
    private List<String> channels;
    /** Provider used to obtain the connection backing the subscription. */
    @Connection
    private ConnectionProvider<RedisConnection> provider;
    private ServiceFactory serviceFactory = new ServiceFactory();
    private RedisClientAdapterFactory clientAdapterFactory = new RedisClientAdapterFactory();
    /** Active subscription; {@code null} while the source is not started. */
    private ChannelSubscription channelSubscription;
    /** Adapter owned by this source; {@code null} while the source is not started. */
    private RedisClientAdapter redisClientAdapter;

    /**
     * Connects to Redis and subscribes to the configured channels.
     *
     * <p>If anything fails after the client adapter has been created, the adapter
     * is closed again before the failure propagates, so no connection is left
     * open by a source that never started.
     *
     * @param sourceCallback callback used to dispatch messages and to report connection failures
     * @throws MuleException if the connection cannot be obtained from the provider
     */
    public void onStart(SourceCallback<String, SubscribeChannelAttributes> sourceCallback) throws MuleException {
        RedisClientAdapter adapter = this.clientAdapterFactory.getRedisClientAdapter(this.provider.connect());
        this.redisClientAdapter = adapter;
        boolean subscribed = false;
        try {
            MessagingAPIService<MessageDTO> messagingService = this.serviceFactory.getMessagingAPIService(adapter);
            this.channelSubscription = messagingService.subscribeToChannel(
                    this.messageConsumer(sourceCallback), this.messageErrorHandler(sourceCallback), this.channels);
            subscribed = true;
        }
        finally {
            if (!subscribed) {
                // Start failed: release the adapter here because nobody else holds a reference to it.
                this.redisClientAdapter = null;
                try {
                    adapter.close();
                }
                catch (RuntimeException closeFailure) {
                    // Only log: the original start failure is the one that must reach the caller.
                    logger.warn("Unable to close connection.", closeFailure);
                }
            }
        }
    }

    /**
     * Builds the handler invoked for every message received on a subscribed channel.
     *
     * @param sourceCallback callback that receives the resulting {@link Result}
     * @return a consumer that turns a {@link MessageDTO} into a result and hands it to the callback
     */
    private Consumer<MessageDTO> messageConsumer(SourceCallback<String, SubscribeChannelAttributes> sourceCallback) {
        return messageDTO -> {
            SubscribeChannelAttributes subscribeChannelAttributes = new SubscribeChannelAttributes();
            subscribeChannelAttributes.setChannel(messageDTO.getChannel());
            sourceCallback.handle(Result.builder().output((Object)messageDTO.getPayload()).attributes((Object)subscribeChannelAttributes).build());
        };
    }

    /**
     * Builds the handler invoked when the subscription reports an error.
     *
     * <p>Connection failures are forwarded to the callback as a
     * {@link ConnectionException}; every other error is only logged.
     *
     * @param sourceCallback callback notified about connection failures
     * @return a consumer that dispatches on the type of the reported exception
     */
    private Consumer<RuntimeException> messageErrorHandler(SourceCallback<String, SubscribeChannelAttributes> sourceCallback) {
        return exception -> {
            if (exception instanceof RedisConnectionException) {
                sourceCallback.onConnectionException(new ConnectionException("Something nasty has happened with the connection to server.", exception));
            } else if (exception instanceof UnableToUnsubscribeException) {
                logger.warn("Unable to unsubscribe from channels.", exception);
            } else {
                logger.warn("Unknown error. Please try again and if the problem persists then contact support team.", exception);
            }
        };
    }

    /**
     * Stops the subscription (if any) and closes the client adapter (if any).
     *
     * <p>Safe to call when {@link #onStart(SourceCallback)} never completed and
     * safe to call more than once: both references are cleared as they are released.
     *
     * @throws ModuleException if closing the adapter fails
     */
    public void onStop() {
        ChannelSubscription subscription = this.channelSubscription;
        this.channelSubscription = null;
        try {
            if (subscription != null) {
                subscription.stop();
            }
        }
        finally {
            // Always attempt to close the adapter, even when stopping the subscription failed.
            this.closeClientAdapter();
        }
    }

    /**
     * Closes the client adapter held by this source, if there is one.
     *
     * @throws ModuleException with {@link RedisErrorType#UNKNOWN} if the adapter fails to close
     */
    private void closeClientAdapter() {
        RedisClientAdapter adapter = this.redisClientAdapter;
        if (adapter == null) {
            return;
        }
        // Clear first so a failing close is not retried on a later stop.
        this.redisClientAdapter = null;
        try {
            adapter.close();
        }
        catch (JedisConnectionException e) {
            throw new ModuleException("Unable to close connection.", (ErrorTypeDefinition)RedisErrorType.UNKNOWN, (Throwable)e);
        }
        catch (JedisException e) {
            throw new ModuleException("Unknown error while trying to close connection.", (ErrorTypeDefinition)RedisErrorType.UNKNOWN, (Throwable)e);
        }
    }

    /**
     * Sets the channels this source subscribes to on start.
     *
     * @param channels channel names
     */
    public void setChannels(List<String> channels) {
        this.channels = channels;
    }
}

