/*
 * Decompiled with CFR 0.152.
 */
package com.github.sonus21.rqueue.core.impl;

import com.github.sonus21.rqueue.core.EndpointRegistry;
import com.github.sonus21.rqueue.core.ReactiveRqueueMessageEnqueuer;
import com.github.sonus21.rqueue.core.RqueueMessage;
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
import com.github.sonus21.rqueue.core.impl.BaseMessageSender;
import com.github.sonus21.rqueue.core.support.RqueueMessageUtils;
import com.github.sonus21.rqueue.exception.DuplicateMessageException;
import com.github.sonus21.rqueue.listener.QueueDetail;
import com.github.sonus21.rqueue.utils.PriorityUtils;
import com.github.sonus21.rqueue.utils.Validator;
import java.util.function.Function;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.MessageConverter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Reactive implementation of {@link ReactiveRqueueMessageEnqueuer}.
 *
 * <p>Every public method validates its arguments synchronously through {@link Validator} and then
 * delegates to one of the private {@code pushReactive*} helpers, which build an
 * {@link RqueueMessage}, store its metadata and enqueue it using the facilities inherited from
 * {@link BaseMessageSender}.
 */
public class ReactiveRqueueMessageEnqueuerImpl
extends BaseMessageSender
implements ReactiveRqueueMessageEnqueuer {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ReactiveRqueueMessageEnqueuerImpl.class);

    /**
     * Creates an enqueuer; all three collaborators are handed unchanged to {@link BaseMessageSender}.
     *
     * @param messageTemplate template passed to the super constructor
     * @param messageConverter converter passed to the super constructor and later given to the message builders
     * @param messageHeaders headers passed to the super constructor and later given to the message builders
     */
    public ReactiveRqueueMessageEnqueuerImpl(RqueueMessageTemplate messageTemplate, MessageConverter messageConverter, MessageHeaders messageHeaders) {
        super(messageTemplate, messageConverter, messageHeaders);
    }

    /**
     * Builds a message, stores its metadata and, if the store reports success, enqueues it.
     *
     * <p>The queue lookup and the message build run outside the {@code try} block, so exceptions
     * they raise are thrown to the caller directly instead of being delivered through the returned
     * {@link Mono}.
     *
     * @param builder strategy that creates the {@link RqueueMessage}
     * @param queueName name used for the {@link EndpointRegistry} lookup and given to the builder
     * @param messageId message id given to the builder, may be {@code null}
     * @param message payload given to the builder
     * @param retryCount retry count given to the builder, may be {@code null}
     * @param delayInMilliSecs delay (or period) given to the builder, the metadata store and the enqueue call, may be {@code null}
     * @param isUnique flag forwarded to the metadata store
     * @param monoConverter produces the value emitted once the enqueue result has emitted
     * @param <T> type emitted by the returned {@link Mono}
     * @return the converter's {@link Mono}; an error {@link Mono} carrying a
     *     {@link DuplicateMessageException} when the metadata store emits anything other than
     *     {@link Boolean#TRUE}; or an error {@link Mono} carrying any exception thrown while
     *     assembling the pipeline
     */
    private <T> Mono<T> pushReactiveMessage(MessageBuilder builder, String queueName, String messageId, Object message, Integer retryCount, Long delayInMilliSecs, boolean isUnique, Function<RqueueMessage, Mono<T>> monoConverter) {
        QueueDetail queueDetail = EndpointRegistry.get(queueName);
        RqueueMessage rqueueMessage = builder.build(this.messageConverter, queueName, messageId, message, retryCount, delayInMilliSecs, this.messageHeaders);
        try {
            // Wildcard instead of Mono<Boolean>: the emitted value is only compared with Boolean.TRUE,
            // so no unchecked cast (and no hidden ClassCastException) is needed.
            Mono<?> storeResult = (Mono<?>) this.storeMessageMetadata(rqueueMessage, delayInMilliSecs, true, isUnique);
            return storeResult.flatMap(stored -> {
                if (!Boolean.TRUE.equals(stored)) {
                    return Mono.<T>error(new DuplicateMessageException(rqueueMessage.getId()));
                }
                return this.enqueueAndConvert(queueDetail, rqueueMessage, delayInMilliSecs, monoConverter);
            });
        }
        catch (Exception e) {
            // The trailing throwable is treated by SLF4J as the exception to log, not as a placeholder value.
            log.error("Failed to enqueue message [{}] to queue [{}]", rqueueMessage.getId(), queueName, e);
            return Mono.error(e);
        }
    }

    /**
     * Enqueues an already stored message and maps the enqueue outcome through {@code monoConverter}.
     *
     * <p>The inherited {@code enqueue} call returns an untyped result; a {@link Flux} is reduced to
     * its first element, a {@link Mono} is used as is, and anything else (including {@code null})
     * yields an error {@link Mono} carrying an {@link IllegalStateException}.
     *
     * @param queueDetail queue detail passed to the enqueue call
     * @param rqueueMessage message to enqueue and to hand to the converter
     * @param delayInMilliSecs delay passed to the enqueue call, may be {@code null}
     * @param monoConverter produces the value emitted once the enqueue result has emitted
     * @param <T> type emitted by the returned {@link Mono}
     * @return the converter's {@link Mono}, subscribed after the enqueue result emits
     */
    private <T> Mono<T> enqueueAndConvert(QueueDetail queueDetail, RqueueMessage rqueueMessage, Long delayInMilliSecs, Function<RqueueMessage, Mono<T>> monoConverter) {
        Object result = this.enqueue(queueDetail, rqueueMessage, delayInMilliSecs, true);
        Mono<?> enqueueMono;
        if (result instanceof Flux) {
            enqueueMono = ((Flux<?>) result).next();
        } else if (result instanceof Mono) {
            enqueueMono = (Mono<?>) result;
        } else {
            // Guard against null so the caller sees the descriptive IllegalStateException
            // rather than a NullPointerException from result.getClass().
            String typeName = result == null ? "null" : String.valueOf(result.getClass());
            return Mono.error(new IllegalStateException("Unexpected enqueue result type: " + typeName));
        }
        return enqueueMono.flatMap(ignored -> monoConverter.apply(rqueueMessage));
    }

    /**
     * Runs the queue-name and message validators.
     *
     * @param queue queue name to validate
     * @param msg message to validate
     */
    private void validateBasic(String queue, Object msg) {
        Validator.validateQueue(queue);
        Validator.validateMessage(msg);
    }

    /**
     * Runs the queue-name, message-id and message validators, in that order.
     *
     * @param queue queue name to validate
     * @param id message id to validate
     * @param msg message to validate
     */
    private void validateWithId(String queue, String id, Object msg) {
        Validator.validateQueue(queue);
        Validator.validateMessageId(id);
        Validator.validateMessage(msg);
    }

    /**
     * Pushes a message built by {@link RqueueMessageUtils#buildMessage} without a caller-supplied id.
     *
     * @return a {@link Mono} emitting the id of the built message
     */
    private Mono<String> pushReactiveMessage(String queueName, Object message, Integer retryCount, Long delayInMilliSecs) {
        return this.pushReactiveMessage(RqueueMessageUtils::buildMessage, queueName, null, message, retryCount, delayInMilliSecs, false, rqueueMessage -> Mono.just(rqueueMessage.getId()));
    }

    /**
     * Pushes a message built by {@link RqueueMessageUtils#buildMessage} with a caller-supplied id.
     *
     * @return a {@link Mono} emitting {@link Boolean#TRUE} once the enqueue result has emitted
     */
    private Mono<Boolean> pushReactiveWithMessageId(String queueName, String messageId, Object message, Integer retryCount, Long delayInMilliSecs, boolean isUnique) {
        return this.pushReactiveMessage(RqueueMessageUtils::buildMessage, queueName, messageId, message, retryCount, delayInMilliSecs, isUnique, rqueueMessage -> Mono.just(Boolean.TRUE));
    }

    /**
     * Pushes a message built by {@link RqueueMessageUtils#buildPeriodicMessage} without a
     * caller-supplied id; the period is passed in the delay position.
     *
     * @return a {@link Mono} emitting the id of the built message
     */
    private Mono<String> pushReactivePeriodicMessage(String queueName, Object message, long periodInMilliSeconds) {
        return this.pushReactiveMessage(RqueueMessageUtils::buildPeriodicMessage, queueName, null, message, null, periodInMilliSeconds, false, rqueueMessage -> Mono.just(rqueueMessage.getId()));
    }

    /**
     * Pushes a message built by {@link RqueueMessageUtils#buildPeriodicMessage} with a
     * caller-supplied id; the period is passed in the delay position.
     *
     * @return a {@link Mono} emitting {@link Boolean#TRUE} once the enqueue result has emitted
     */
    private Mono<Boolean> pushReactivePeriodicMessageWithMessageId(String queueName, String messageId, Object message, long periodInMilliSeconds) {
        return this.pushReactiveMessage(RqueueMessageUtils::buildPeriodicMessage, queueName, messageId, message, null, periodInMilliSeconds, false, rqueueMessage -> Mono.just(Boolean.TRUE));
    }

    /**
     * Validates the queue name and message, then pushes the message with no retry count or delay.
     *
     * @return a {@link Mono} emitting the id of the built message
     */
    @Override
    public Mono<String> enqueue(String queueName, Object message) {
        this.validateBasic(queueName, message);
        return this.pushReactiveMessage(queueName, message, null, null);
    }

    /**
     * Validates the arguments, then pushes the message under the given id with the unique flag off.
     *
     * @return a {@link Mono} emitting {@link Boolean#TRUE} once the enqueue result has emitted
     */
    @Override
    public Mono<Boolean> enqueue(String queueName, String messageId, Object message) {
        this.validateWithId(queueName, messageId, message);
        return this.pushReactiveWithMessageId(queueName, messageId, message, null, null, false);
    }

    /**
     * Validates the arguments, then pushes the message under the given id with the unique flag on.
     *
     * @return a {@link Mono} emitting {@link Boolean#TRUE} once the enqueue result has emitted
     */
    @Override
    public Mono<Boolean> enqueueUnique(String queueName, String messageId, Object message) {
        this.validateWithId(queueName, messageId, message);
        return this.pushReactiveWithMessageId(queueName, messageId, message, null, null, true);
    }

    /**
     * Validates the arguments (including the retry count), then pushes the message with that retry count.
     *
     * @return a {@link Mono} emitting the id of the built message
     */
    @Override
    public Mono<String> enqueueWithRetry(String queueName, Object message, int retryCount) {
        this.validateBasic(queueName, message);
        Validator.validateRetryCount(retryCount);
        return this.pushReactiveMessage(queueName, message, retryCount, null);
    }

    /**
     * Validates the arguments (including the retry count), then pushes the message under the given
     * id with that retry count.
     *
     * @return a {@link Mono} emitting {@link Boolean#TRUE} once the enqueue result has emitted
     */
    @Override
    public Mono<Boolean> enqueueWithRetry(String queueName, String messageId, Object message, int retryCount) {
        this.validateWithId(queueName, messageId, message);
        Validator.validateRetryCount(retryCount);
        return this.pushReactiveWithMessageId(queueName, messageId, message, retryCount, null, false);
    }

    /**
     * Validates the arguments, then pushes the message to the queue name that
     * {@link PriorityUtils#getQueueNameForPriority} derives from the queue name and priority.
     *
     * @return a {@link Mono} emitting the id of the built message
     */
    @Override
    public Mono<String> enqueueWithPriority(String queueName, String priority, Object message) {
        Validator.validateQueue(queueName);
        Validator.validatePriority(priority);
        Validator.validateMessage(message);
        return this.pushReactiveMessage(PriorityUtils.getQueueNameForPriority(queueName, priority), message, null, null);
    }

    /**
     * Validates the arguments, then pushes the message under the given id to the queue name that
     * {@link PriorityUtils#getQueueNameForPriority} derives from the queue name and priority.
     *
     * @return a {@link Mono} emitting {@link Boolean#TRUE} once the enqueue result has emitted
     */
    @Override
    public Mono<Boolean> enqueueWithPriority(String queueName, String priority, String messageId, Object message) {
        this.validateWithId(queueName, messageId, message);
        Validator.validatePriority(priority);
        return this.pushReactiveWithMessageId(PriorityUtils.getQueueNameForPriority(queueName, priority), messageId, message, null, null, false);
    }

    /**
     * Validates the arguments (including the delay), then pushes the message with that delay.
     *
     * @return a {@link Mono} emitting the id of the built message
     */
    @Override
    public Mono<String> enqueueIn(String queueName, Object message, long delayInMilliSecs) {
        this.validateBasic(queueName, message);
        Validator.validateDelay(delayInMilliSecs);
        return this.pushReactiveMessage(queueName, message, null, delayInMilliSecs);
    }

    /**
     * Validates the arguments (including the delay), then pushes the message under the given id
     * with that delay.
     *
     * @return a {@link Mono} emitting {@link Boolean#TRUE} once the enqueue result has emitted
     */
    @Override
    public Mono<Boolean> enqueueIn(String queueName, String messageId, Object message, long delayInMilliSecs) {
        this.validateWithId(queueName, messageId, message);
        Validator.validateDelay(delayInMilliSecs);
        return this.pushReactiveWithMessageId(queueName, messageId, message, null, delayInMilliSecs, false);
    }

    /**
     * Validates the arguments (including the delay), then pushes the message under the given id
     * with that delay and the unique flag on.
     *
     * @return a {@link Mono} emitting {@link Boolean#TRUE} once the enqueue result has emitted
     */
    @Override
    public Mono<Boolean> enqueueUniqueIn(String queueName, String messageId, Object message, long delayInMillisecond) {
        this.validateWithId(queueName, messageId, message);
        Validator.validateDelay(delayInMillisecond);
        return this.pushReactiveWithMessageId(queueName, messageId, message, null, delayInMillisecond, true);
    }

    /**
     * Validates the arguments (including retry count and delay), then pushes the message with both.
     *
     * @return a {@link Mono} emitting the id of the built message
     */
    @Override
    public Mono<String> enqueueInWithRetry(String queueName, Object message, int retryCount, long delayInMilliSecs) {
        this.validateBasic(queueName, message);
        Validator.validateRetryCount(retryCount);
        Validator.validateDelay(delayInMilliSecs);
        return this.pushReactiveMessage(queueName, message, retryCount, delayInMilliSecs);
    }

    /**
     * Validates the arguments (including retry count and delay), then pushes the message under the
     * given id with both.
     *
     * @return a {@link Mono} emitting {@link Boolean#TRUE} once the enqueue result has emitted
     */
    @Override
    public Mono<Boolean> enqueueInWithRetry(String queueName, String messageId, Object message, int retryCount, long delayInMilliSecs) {
        this.validateWithId(queueName, messageId, message);
        Validator.validateRetryCount(retryCount);
        Validator.validateDelay(delayInMilliSecs);
        return this.pushReactiveWithMessageId(queueName, messageId, message, retryCount, delayInMilliSecs, false);
    }

    /**
     * Validates the arguments (including the period), then pushes a periodic message.
     *
     * @return a {@link Mono} emitting the id of the built message
     */
    @Override
    public Mono<String> enqueuePeriodic(String queueName, Object message, long periodInMilliSeconds) {
        this.validateBasic(queueName, message);
        Validator.validatePeriod(periodInMilliSeconds);
        return this.pushReactivePeriodicMessage(queueName, message, periodInMilliSeconds);
    }

    /**
     * Validates the arguments (including the period), then pushes a periodic message under the given id.
     *
     * @return a {@link Mono} emitting {@link Boolean#TRUE} once the enqueue result has emitted
     */
    @Override
    public Mono<Boolean> enqueuePeriodic(String queueName, String messageId, Object message, long periodInMilliSeconds) {
        this.validateWithId(queueName, messageId, message);
        Validator.validatePeriod(periodInMilliSeconds);
        return this.pushReactivePeriodicMessageWithMessageId(queueName, messageId, message, periodInMilliSeconds);
    }

    /**
     * Strategy for creating an {@link RqueueMessage}; satisfied in this class by method references
     * to {@link RqueueMessageUtils}. Argument order: converter, queue name, message id, payload,
     * retry count, delay/period, headers.
     */
    @FunctionalInterface
    private static interface MessageBuilder {
        public RqueueMessage build(MessageConverter var1, String var2, String var3, Object var4, Integer var5, Long var6, MessageHeaders var7);
    }

    /**
     * NOTE(review): not referenced anywhere in this class; result conversion is done through
     * {@link Function} instead. Candidate for removal once confirmed unused.
     */
    @FunctionalInterface
    private static interface MonoConverter<T> {
        public T convert(Long var1, Boolean var2);
    }
}

