/*
 * Decompiled with CFR 0.152.
 */
package com.rabbitmq.client.amqp.impl;

import com.rabbitmq.client.amqp.AmqpException;
import com.rabbitmq.client.amqp.Consumer;
import com.rabbitmq.client.amqp.Message;
import com.rabbitmq.client.amqp.Publisher;
import com.rabbitmq.client.amqp.Responder;
import com.rabbitmq.client.amqp.impl.AmqpConnection;
import com.rabbitmq.client.amqp.impl.RequestResponseSupport;
import com.rabbitmq.client.amqp.impl.RetryUtils;
import com.rabbitmq.client.amqp.impl.Utils;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Responder} implementation built on a {@link Consumer} for the request queue and a
 * {@link Publisher} for the replies.
 *
 * <p>For each incoming request the configured {@link Responder.Handler} is invoked, the reply is
 * addressed to the request's {@code reply-to} value, passed through the reply post-processor
 * together with the extracted correlation ID, and published. The request is accepted when
 * processing completes without an exception and discarded otherwise.
 */
final class AmqpResponder
implements Responder {
    private static final Logger LOGGER = LoggerFactory.getLogger(AmqpResponder.class);
    /** Publisher callback that ignores the publishing outcome. */
    private static final Publisher.Callback NO_OP_CALLBACK = ctx -> {};
    /**
     * Decides whether sending a reply is retried: only for invalid-state exceptions that are not
     * "resource closed" exceptions.
     */
    private static final Predicate<Exception> RESPONSE_SENDING_EXCEPTION_PREDICATE = ex -> ex instanceof AmqpException.AmqpResourceInvalidStateException && !(ex instanceof AmqpException.AmqpResourceClosedException);
    /** Wait times handed to {@link RetryUtils#callAndMaybeRetry} when sending a reply. */
    private static final List<Duration> RESPONSE_SENDING_RETRY_WAIT_TIMES = List.of(Duration.ofSeconds(1L), Duration.ofSeconds(3L), Duration.ofSeconds(5L), Duration.ofSeconds(10L));
    private final AmqpConnection connection;
    private final Publisher publisher;
    private final Consumer consumer;
    private final Function<Message, Object> correlationIdExtractor;
    private final BiFunction<Message, Object, Message> replyPostProcessor;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final Duration closeTimeout;

    /**
     * Creates the responder: builds the reply publisher, then the consumer on the request queue.
     *
     * <p>When the builder provides no correlation ID extractor, the request's message ID is used.
     * When it provides no reply post-processor, the default sets the correlation ID on a non-null
     * reply.
     *
     * @param builder source of the connection, handler, request queue, close timeout and the
     *     optional correlation ID extractor and reply post-processor
     */
    AmqpResponder(RequestResponseSupport.AmqpResponderBuilder builder) {
        this.connection = builder.connection();
        this.closeTimeout = builder.closeTimeout();
        Responder.Handler handler = builder.handler();
        this.publisher = this.connection.publisherBuilder().build();
        Responder.Context context = new Responder.Context(){

            /**
             * Checks the reply-to queue of the request through the management API.
             *
             * @return {@code false} only when the queue lookup reports that the entity does not
             *     exist; {@code true} when no queue name is extracted from the reply-to address,
             *     when the lookup succeeds, or when it fails for any other reason
             */
            @Override
            public boolean isRequesterAlive(Message message) {
                String replyToAddr = message.replyTo();
                String replyToQueue = Utils.extractQueueName(replyToAddr);
                if (replyToQueue == null) {
                    return true;
                }
                try {
                    AmqpResponder.this.connection.management().queueInfo(replyToQueue);
                }
                catch (AmqpException.AmqpEntityDoesNotExistException e) {
                    return false;
                }
                catch (Exception e) {
                    // An inconclusive check is not treated as "requester gone".
                    if (LOGGER.isWarnEnabled()) {
                        LOGGER.warn("Error while checking reply queue '{}' ({}): {}", replyToQueue, replyToAddr, e.getMessage());
                    }
                }
                return true;
            }

            @Override
            public Message message() {
                return AmqpResponder.this.publisher.message();
            }

            @Override
            public Message message(byte[] body) {
                return AmqpResponder.this.publisher.message(body);
            }
        };
        Function<Message, Object> configuredExtractor = builder.correlationIdExtractor();
        this.correlationIdExtractor = configuredExtractor == null ? Message::messageId : configuredExtractor;
        BiFunction<Message, Object, Message> configuredPostProcessor = builder.replyPostProcessor();
        this.replyPostProcessor = configuredPostProcessor == null ? (msg, corrId) -> {
            if (msg != null) {
                msg.correlationId(corrId);
            }
            return msg;
        } : configuredPostProcessor;
        this.consumer = this.connection.consumerBuilder().queue(builder.requestQueue()).messageHandler((ctx, msg) -> {
            // Stays null in the log below if the failure happens before the ID is extracted.
            Object correlationId = null;
            try {
                Message reply = handler.handle(context, msg);
                String replyToAddr = msg.replyTo();
                if (reply != null && replyToAddr != null) {
                    reply.to(replyToAddr);
                }
                correlationId = this.correlationIdExtractor.apply(msg);
                reply = this.replyPostProcessor.apply(reply, correlationId);
                // No reply or no destination: nothing to send, the request is still accepted.
                if (reply != null && reply.to() != null) {
                    this.sendReply(reply);
                }
                ctx.accept();
            }
            catch (Exception e) {
                LOGGER.info("Error while processing request (correlation ID {}): {}", correlationId, e.getMessage());
                ctx.discard();
            }
        }).build();
    }

    /**
     * Pauses the underlying consumer.
     *
     * @throws AmqpException.AmqpResourceClosedException if the responder is closed
     */
    @Override
    public void pause() {
        this.checkOpen();
        this.consumer.pause();
    }

    /**
     * Unpauses the underlying consumer.
     *
     * @throws AmqpException.AmqpResourceClosedException if the responder is closed
     */
    @Override
    public void unpause() {
        this.checkOpen();
        this.consumer.unpause();
    }

    /**
     * Closes the responder; only the first call has an effect.
     *
     * <p>The responder is removed from its connection, then unsettled messages are waited for (up
     * to the close timeout), then the consumer and the publisher are closed. Each step is
     * best-effort: a failure is logged and the remaining steps still run.
     */
    @Override
    public void close() {
        if (this.closed.compareAndSet(false, true)) {
            this.connection.removeResponder(this);
            try {
                this.maybeWaitForUnsettledMessages();
            }
            catch (Exception e) {
                LOGGER.warn("Error while waiting for unsettled messages in responder: {}", e.getMessage());
            }
            try {
                long unsettledMessageCount = this.consumer.unsettledMessageCount();
                if (unsettledMessageCount > 0L) {
                    LOGGER.info("Closing responder with {} unsettled message(s)", unsettledMessageCount);
                }
                this.consumer.close();
            }
            catch (Exception e) {
                LOGGER.warn("Error while closing responder consumer: {}", e.getMessage());
            }
            try {
                this.publisher.close();
            }
            catch (Exception e) {
                LOGGER.warn("Error while closing responder publisher: {}", e.getMessage());
            }
        }
    }

    /**
     * Publishes a reply through {@link RetryUtils#callAndMaybeRetry} with the reply retry
     * predicate and wait times. A failure is logged and not propagated to the caller.
     *
     * @param reply the reply to publish
     */
    private void sendReply(Message reply) {
        try {
            RetryUtils.callAndMaybeRetry(() -> {
                this.publisher.publish(reply, NO_OP_CALLBACK);
                return null;
            }, RESPONSE_SENDING_EXCEPTION_PREDICATE, RESPONSE_SENDING_RETRY_WAIT_TIMES, "Responder Response", new Object[0]);
        }
        catch (Exception e) {
            LOGGER.info("Error while processing request: {}", e.getMessage());
        }
    }

    /**
     * Polls the consumer until it has no unsettled messages or the close timeout has been waited.
     * Does nothing when the close timeout is not positive. Stops early, with the interrupt flag
     * restored, if the thread is interrupted.
     */
    private void maybeWaitForUnsettledMessages() {
        if (this.closeTimeout.toNanos() > 0L) {
            Duration waited = Duration.ZERO;
            Duration waitStep = Duration.ofMillis(10L);
            while (this.consumer.unsettledMessageCount() > 0L && waited.compareTo(this.closeTimeout) < 0) {
                try {
                    // Sleep exactly the accounted step, so the total wait honors closeTimeout.
                    Thread.sleep(waitStep.toMillis());
                    waited = waited.plus(waitStep);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }

    /**
     * @throws AmqpException.AmqpResourceClosedException if the responder has been closed
     */
    private void checkOpen() {
        if (this.closed.get()) {
            throw new AmqpException.AmqpResourceClosedException("Responder is closed");
        }
    }
}

