/*
 * Decompiled with CFR 0.152.
 */
package com.rabbitmq.client.amqp.impl;

import com.rabbitmq.client.amqp.AmqpException;
import com.rabbitmq.client.amqp.Management;
import com.rabbitmq.client.amqp.Message;
import com.rabbitmq.client.amqp.Publisher;
import com.rabbitmq.client.amqp.Requester;
import com.rabbitmq.client.amqp.impl.AmqpConnection;
import com.rabbitmq.client.amqp.impl.AmqpConsumer;
import com.rabbitmq.client.amqp.impl.AmqpConsumerBuilder;
import com.rabbitmq.client.amqp.impl.AmqpPublisherBuilder;
import com.rabbitmq.client.amqp.impl.Clock;
import com.rabbitmq.client.amqp.impl.DefaultAddressBuilder;
import com.rabbitmq.client.amqp.impl.RequestResponseSupport;
import com.rabbitmq.client.amqp.impl.Utils;
import java.time.Duration;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Requester} implementation that publishes requests through a {@link Publisher} and
 * matches the replies received by an {@link AmqpConsumer} to pending requests by correlation ID.
 *
 * <p>Pending requests live in a concurrent map keyed by correlation ID. A task scheduled on the
 * connection's scheduled executor periodically fails the requests that are older than the
 * configured request timeout.
 */
final class AmqpRequester implements Requester {
    private static final Logger LOGGER = LoggerFactory.getLogger(AmqpRequester.class);
    /** Publisher callback that ignores the outcome of the request publishing. */
    private static final Publisher.Callback NO_OP_CALLBACK = ctx -> {};
    private final AmqpConnection connection;
    private final Clock clock;
    private final Publisher publisher;
    private final AmqpConsumer consumer;
    /** Requests waiting for a reply, keyed by correlation ID. */
    private final Map<Object, OutstandingRequest> outstandingRequests = new ConcurrentHashMap<Object, OutstandingRequest>();
    private final Supplier<Object> correlationIdSupplier;
    private final BiFunction<Message, Object, Message> requestPostProcessor;
    private final Function<Message, Object> correlationIdExtractor;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final Duration requestTimeout;
    private final ScheduledFuture<?> requestTimeoutFuture;

    /**
     * Creates the requester: builds the request publisher, resolves the reply destination,
     * builds the reply consumer and schedules the request timeout task.
     *
     * <p>Reply destination: the queue configured on the builder if any; otherwise direct
     * reply-to when the connection reports it as supported; otherwise a newly declared
     * exclusive, auto-delete queue.
     *
     * @param builder the builder holding the connection and the requester settings
     */
    AmqpRequester(RequestResponseSupport.AmqpRequesterBuilder builder) {
        this.connection = builder.connection();
        this.clock = this.connection.clock();
        AmqpPublisherBuilder publisherBuilder = (AmqpPublisherBuilder)this.connection.publisherBuilder();
        ((DefaultAddressBuilder)((Object)builder.requestAddress())).copyTo(publisherBuilder.addressBuilder());
        this.publisher = publisherBuilder.build();

        String replyTo = builder.replyToQueue();
        // Direct reply-to is only considered when no explicit reply-to queue is configured.
        boolean directReplyTo = replyTo == null && this.connection.directReplyToSupported();
        if (replyTo == null && !directReplyTo) {
            Management.QueueInfo queueInfo = this.connection.management().queue().exclusive(true).autoDelete(true).declare();
            replyTo = queueInfo.name();
        }

        this.correlationIdExtractor = builder.correlationIdExtractor() == null ? Message::correlationId : builder.correlationIdExtractor();
        AmqpConsumerBuilder consumerBuilder = (AmqpConsumerBuilder)this.connection.consumerBuilder();
        // Log the actual decision, not just whether the connection supports the feature.
        LOGGER.debug("Using direct reply-to: {}", directReplyTo);
        this.consumer = (AmqpConsumer)consumerBuilder.directReplyTo(directReplyTo).queue(replyTo).messageHandler((ctx, msg) -> {
            ctx.accept();
            // Replies without a matching pending request are ignored.
            OutstandingRequest request = this.outstandingRequests.remove(this.correlationIdExtractor.apply(msg));
            if (request != null) {
                request.future.complete(msg);
            }
        }).build();

        if (builder.correlationIdSupplier() == null) {
            // Default correlation IDs: "<random UUID>-<sequence number>".
            String correlationIdPrefix = UUID.randomUUID().toString();
            AtomicLong correlationIdSequence = new AtomicLong();
            this.correlationIdSupplier = () -> correlationIdPrefix + "-" + correlationIdSequence.getAndIncrement();
        } else {
            this.correlationIdSupplier = builder.correlationIdSupplier();
        }

        if (builder.requestPostProcessor() == null) {
            // Default post-processing: set the reply-to address and use the correlation ID as message ID.
            if (directReplyTo) {
                this.requestPostProcessor = (request, correlationId) -> request.replyTo(this.consumer.directReplyToAddress()).messageId(correlationId);
            } else {
                DefaultAddressBuilder<?> addressBuilder = Utils.addressBuilder();
                addressBuilder.queue(replyTo);
                String replyToAddress = addressBuilder.address();
                this.requestPostProcessor = (request, correlationId) -> request.replyTo(replyToAddress).messageId(correlationId);
            }
        } else {
            this.requestPostProcessor = builder.requestPostProcessor();
        }

        this.requestTimeout = builder.requestTimeout();
        long timeoutMillis = this.requestTimeout.toMillis();
        Runnable requestTimeoutTask = this.requestTimeoutTask();
        this.requestTimeoutFuture = this.connection.scheduledExecutorService().scheduleAtFixedRate(() -> {
            // An exception escaping a fixed-rate task suppresses its subsequent executions,
            // so failures are logged and swallowed here.
            try {
                requestTimeoutTask.run();
            }
            catch (Exception e) {
                LOGGER.info("Error during request timeout task: {}", (Object)e.getMessage());
            }
        }, timeoutMillis, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Creates a message by delegating to the underlying publisher.
     *
     * @return the message created by the publisher
     */
    @Override
    public Message message() {
        return this.publisher.message();
    }

    /**
     * Creates a message with the given body by delegating to the underlying publisher.
     *
     * @param body the message body
     * @return the message created by the publisher
     */
    @Override
    public Message message(byte[] body) {
        return this.publisher.message(body);
    }

    /**
     * Sends a request and returns a future for its reply.
     *
     * <p>The future completes with the reply message, or exceptionally with an
     * {@link AmqpException} when the request times out or the requester is closed.
     *
     * @param message the request message
     * @return the future completed with the reply
     * @throws AmqpException if the requester is closed
     */
    @Override
    public CompletableFuture<Message> publish(Message message) {
        this.checkOpen();
        Object correlationId = this.correlationIdSupplier.get();
        Message outgoing = this.requestPostProcessor.apply(message, correlationId);
        long time = this.clock.time();
        CompletableFuture<Message> future = new CompletableFuture<Message>();
        OutstandingRequest pending = new OutstandingRequest(future, time);
        this.outstandingRequests.put(correlationId, pending);
        // close() may have run between checkOpen() and the registration above; in that case
        // nothing would ever complete this request (the timeout task is cancelled), so bail out.
        if (this.closed.get()) {
            this.outstandingRequests.remove(correlationId, pending);
            throw new AmqpException("Requester is closed", new Object[0]);
        }
        try {
            this.publisher.publish(outgoing, NO_OP_CALLBACK);
        }
        catch (RuntimeException e) {
            // The caller never receives the future, so do not keep the entry until the timeout.
            this.outstandingRequests.remove(correlationId, pending);
            throw e;
        }
        return future;
    }

    /**
     * Closes the requester; only the first call has an effect.
     *
     * <p>Unregisters the requester from the connection, cancels the timeout task, closes the
     * publisher and the consumer (failures are logged, not propagated) and fails every pending
     * request with an {@link AmqpException}.
     */
    @Override
    public void close() {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        this.connection.removeRequester(this);
        this.requestTimeoutFuture.cancel(true);
        try {
            this.publisher.close();
        }
        catch (Exception e) {
            LOGGER.warn("Error while closing requester publisher: {}", (Object)e.getMessage());
        }
        try {
            this.consumer.close();
        }
        catch (Exception e) {
            LOGGER.warn("Error while closing requester consumer: {}", (Object)e.getMessage());
        }
        // Drain the map while failing the requests so that no entry is retained after close.
        Iterator<OutstandingRequest> iterator = this.outstandingRequests.values().iterator();
        while (iterator.hasNext()) {
            OutstandingRequest request = iterator.next();
            iterator.remove();
            request.future.completeExceptionally(new AmqpException("Requester is closed", new Object[0]));
        }
    }

    /**
     * Builds the task that prunes timed out requests.
     *
     * <p>Each run removes the pending requests whose registration time is older than the
     * current clock time minus the request timeout (in nanoseconds) and completes their
     * futures exceptionally. The run stops early if the executing thread is interrupted.
     *
     * @return the pruning task
     */
    Runnable requestTimeoutTask() {
        return () -> {
            long limit = this.clock.time() - this.requestTimeout.toNanos();
            Iterator<OutstandingRequest> iterator = this.outstandingRequests.values().iterator();
            while (iterator.hasNext()) {
                if (Thread.currentThread().isInterrupted()) {
                    return;
                }
                OutstandingRequest request = iterator.next();
                if (request.time >= limit) {
                    // Not timed out yet.
                    continue;
                }
                try {
                    iterator.remove();
                }
                catch (Exception e) {
                    LOGGER.warn("Error while pruning timed out request: {}", (Object)e.getMessage());
                }
                request.future.completeExceptionally(new AmqpException("Request timed out", new Object[0]));
            }
        };
    }

    /**
     * Fails if the requester has been closed.
     *
     * @throws AmqpException if {@link #close()} has been called
     */
    private void checkOpen() {
        if (this.closed.get()) {
            throw new AmqpException("Requester is closed", new Object[0]);
        }
    }

    /** A pending request: the future to complete and the clock time at which it was registered. */
    private static class OutstandingRequest {
        private final CompletableFuture<Message> future;
        private final long time;

        private OutstandingRequest(CompletableFuture<Message> future, long time) {
            this.future = future;
            this.time = time;
        }
    }
}

