package com.azure.messaging.servicebus.implementation;

import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.exception.AmqpResponseCode;
import com.azure.core.amqp.exception.SessionErrorContext;
import com.azure.core.amqp.implementation.ExceptionUtil;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.RequestResponseChannel;
import com.azure.core.amqp.implementation.RequestResponseUtils;
import com.azure.core.amqp.implementation.TokenManager;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.ServiceBusMessage;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.models.ReceiveMode;
import java.nio.BufferOverflowException;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.message.Message;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/azure/messaging/servicebus/implementation/ManagementChannel.class */
public class ManagementChannel implements ServiceBusManagementNode {
    private final MessageSerializer messageSerializer;
    private final TokenManager tokenManager;
    private final Duration operationTimeout;
    private final Mono<RequestResponseChannel> createRequestResponse;
    private final String fullyQualifiedNamespace;
    private final ClientLogger logger;
    private final String entityPath;
    private final AtomicLong lastPeekedSequenceNumber = new AtomicLong();
    private volatile boolean isDisposed;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ManagementChannel(Mono<RequestResponseChannel> mono, String str, String str2, TokenManager tokenManager, MessageSerializer messageSerializer, Duration duration) {
        this.createRequestResponse = mono;
        this.fullyQualifiedNamespace = str;
        this.logger = new ClientLogger(String.format("%s<%s>", ManagementChannel.class, str2));
        this.entityPath = (String) Objects.requireNonNull(str2, "'entityPath' cannot be null.");
        this.messageSerializer = (MessageSerializer) Objects.requireNonNull(messageSerializer, "'messageSerializer' cannot be null.");
        this.tokenManager = (TokenManager) Objects.requireNonNull(tokenManager, "'tokenManager' cannot be null.");
        this.operationTimeout = duration;
    }

    @Override // com.azure.messaging.servicebus.implementation.ServiceBusManagementNode
    public Mono<Void> updateDisposition(String str, DispositionStatus dispositionStatus, String str2, String str3, Map<String, Object> map) {
        UUID fromString = UUID.fromString(str);
        return isAuthorized("com.microsoft:update-disposition").then(this.createRequestResponse.flatMap(requestResponseChannel -> {
            return requestResponseChannel.sendWithAck(createDispositionMessage(new UUID[]{fromString}, dispositionStatus, null, null, null, requestResponseChannel.getReceiveLinkName()));
        }).flatMap(message -> {
            int responseStatusCode = RequestResponseUtils.getResponseStatusCode(message);
            return AmqpResponseCode.fromValue(responseStatusCode) == AmqpResponseCode.OK ? Mono.empty() : Mono.error(ExceptionUtil.amqpResponseCodeToException(responseStatusCode, "", getErrorContext()));
        }));
    }

    @Override // com.azure.messaging.servicebus.implementation.ServiceBusManagementNode
    public Mono<ServiceBusReceivedMessage> peek(long j) {
        return peek(j, 1, null).last();
    }

    @Override // com.azure.messaging.servicebus.implementation.ServiceBusManagementNode
    public Flux<ServiceBusReceivedMessage> peekBatch(int i) {
        return peek(this.lastPeekedSequenceNumber.get() + 1, i, null);
    }

    @Override // com.azure.messaging.servicebus.implementation.ServiceBusManagementNode
    public Flux<ServiceBusReceivedMessage> peekBatch(int i, long j) {
        return peek(j, i, null);
    }

    @Override // com.azure.messaging.servicebus.implementation.ServiceBusManagementNode
    public Mono<ServiceBusReceivedMessage> receiveDeferredMessage(ReceiveMode receiveMode, long j) {
        return receiveDeferredMessageBatch(receiveMode, null, j).next();
    }

    @Override // com.azure.messaging.servicebus.implementation.ServiceBusManagementNode
    public Flux<ServiceBusReceivedMessage> receiveDeferredMessageBatch(ReceiveMode receiveMode, long... jArr) {
        return receiveDeferredMessageBatch(receiveMode, null, jArr);
    }

    @Override // com.azure.messaging.servicebus.implementation.ServiceBusManagementNode
    public Mono<ServiceBusReceivedMessage> peek() {
        return peek(this.lastPeekedSequenceNumber.get() + 1);
    }

    private Flux<ServiceBusReceivedMessage> peek(long j, int i, UUID uuid) {
        return isAuthorized("com.microsoft:peek-message").thenMany(this.createRequestResponse.flatMap(requestResponseChannel -> {
            Message createManagementMessage = createManagementMessage("com.microsoft:peek-message", requestResponseChannel.getReceiveLinkName());
            HashMap hashMap = new HashMap();
            hashMap.put("from-sequence-number", Long.valueOf(j));
            hashMap.put("message-count", Integer.valueOf(i));
            if (!Objects.isNull(uuid)) {
                hashMap.put("session-id", uuid);
            }
            createManagementMessage.setBody(new AmqpValue(hashMap));
            return requestResponseChannel.sendWithAck(createManagementMessage);
        }).flatMapMany(message -> {
            List deserializeList = this.messageSerializer.deserializeList(message, ServiceBusReceivedMessage.class);
            if (deserializeList.size() > 0) {
                ServiceBusReceivedMessage serviceBusReceivedMessage = (ServiceBusReceivedMessage) deserializeList.get(deserializeList.size() - 1);
                this.logger.info("Setting last peeked sequence number: {}", new Object[]{Long.valueOf(serviceBusReceivedMessage.getSequenceNumber())});
                if (serviceBusReceivedMessage.getSequenceNumber() > 0) {
                    this.lastPeekedSequenceNumber.set(serviceBusReceivedMessage.getSequenceNumber());
                }
            }
            return Flux.fromIterable(deserializeList);
        }));
    }

    private Flux<ServiceBusReceivedMessage> receiveDeferredMessageBatch(ReceiveMode receiveMode, UUID uuid, long... jArr) {
        return isAuthorized("com.microsoft:receive-by-sequence-number").thenMany(this.createRequestResponse.flatMap(requestResponseChannel -> {
            Message createManagementMessage = createManagementMessage("com.microsoft:receive-by-sequence-number", requestResponseChannel.getReceiveLinkName());
            HashMap hashMap = new HashMap();
            hashMap.put("sequence-numbers", Arrays.stream(jArr).boxed().toArray(i -> {
                return new Long[i];
            }));
            hashMap.put("receiver-settle-mode", UnsignedInteger.valueOf(receiveMode == ReceiveMode.RECEIVE_AND_DELETE ? 0 : 1));
            if (!Objects.isNull(uuid)) {
                hashMap.put("session-id", uuid);
            }
            createManagementMessage.setBody(new AmqpValue(hashMap));
            return requestResponseChannel.sendWithAck(createManagementMessage);
        }).flatMapMany(message -> {
            List deserializeList = this.messageSerializer.deserializeList(message, ServiceBusReceivedMessage.class);
            if (deserializeList.size() > 0) {
                ServiceBusReceivedMessage serviceBusReceivedMessage = (ServiceBusReceivedMessage) deserializeList.get(deserializeList.size() - 1);
                this.logger.info("Setting last peeked sequence number: {}", new Object[]{Long.valueOf(serviceBusReceivedMessage.getSequenceNumber())});
                if (serviceBusReceivedMessage.getSequenceNumber() > 0) {
                    this.lastPeekedSequenceNumber.set(serviceBusReceivedMessage.getSequenceNumber());
                }
            }
            return Flux.fromIterable(deserializeList);
        }));
    }

    private Mono<Void> isAuthorized(String str) {
        return this.tokenManager.getAuthorizationResults().next().flatMap(amqpResponseCode -> {
            return amqpResponseCode != AmqpResponseCode.ACCEPTED ? Mono.error(new AmqpException(false, String.format("User does not have authorization to perform operation [%s] on entity [%s]", str, this.entityPath), getErrorContext())) : Mono.empty();
        });
    }

    private Message createDispositionMessage(UUID[] uuidArr, DispositionStatus dispositionStatus, String str, String str2, Map<String, Object> map, String str3) {
        this.logger.verbose("Update disposition of deliveries '{}' to '{}' on entity '{}', session '{}'", new Object[]{Arrays.toString(uuidArr), dispositionStatus, this.entityPath, "n/a"});
        Message createManagementMessage = createManagementMessage("com.microsoft:update-disposition", str3);
        HashMap hashMap = new HashMap();
        hashMap.put("lock-tokens", uuidArr);
        hashMap.put("disposition-status", dispositionStatus.getValue());
        if (str != null) {
            hashMap.put("deadletter-reason", str);
        }
        if (str2 != null) {
            hashMap.put("deadletter-description", str2);
        }
        if (map != null && map.size() > 0) {
            hashMap.put("properties-to-modify", map);
        }
        createManagementMessage.setBody(new AmqpValue(hashMap));
        return createManagementMessage;
    }

    @Override // com.azure.messaging.servicebus.implementation.ServiceBusManagementNode
    public Mono<Instant> renewMessageLock(UUID uuid) {
        return isAuthorized("com.microsoft:peek-message").then(this.createRequestResponse.flatMap(requestResponseChannel -> {
            Message createManagementMessage = createManagementMessage("com.microsoft:renew-lock", requestResponseChannel.getReceiveLinkName());
            createManagementMessage.setBody(new AmqpValue(Collections.singletonMap("lock-tokens", new UUID[]{uuid})));
            return requestResponseChannel.sendWithAck(createManagementMessage);
        }).map(message -> {
            if (RequestResponseUtils.getResponseStatusCode(message) != AmqpResponseCode.OK.getValue()) {
                throw this.logger.logExceptionAsError(new AmqpException(false, String.format("Could not renew the lock for lock token: '%s'.", uuid.toString()), getErrorContext()));
            }
            List deserializeList = this.messageSerializer.deserializeList(message, Instant.class);
            if (CoreUtils.isNullOrEmpty(deserializeList)) {
                throw this.logger.logExceptionAsError(new AmqpException(false, String.format("Service bus response empty. Could not renew message with lock token: '%s'.", uuid.toString()), getErrorContext()));
            }
            return (Instant) deserializeList.get(0);
        }));
    }

    private Message createManagementMessage(String str, String str2) {
        Duration adjustServerTimeout = MessageUtils.adjustServerTimeout(this.operationTimeout);
        HashMap hashMap = new HashMap();
        hashMap.put("operation", str);
        hashMap.put("com.microsoft:server-timeout", Long.valueOf(adjustServerTimeout.toMillis()));
        if (str2 != null && !str2.isEmpty()) {
            hashMap.put("associated-link-name", str2);
        }
        Message message = Proton.message();
        message.setApplicationProperties(new ApplicationProperties(hashMap));
        return message;
    }

    private Map<String, Object> createScheduleMessgeAmqpValue(ServiceBusMessage serviceBusMessage, int i) {
        Message serialize = this.messageSerializer.serialize(serviceBusMessage);
        int min = Math.min(this.messageSerializer.getSize(serialize) + 512, i);
        byte[] bArr = new byte[min];
        try {
            int encode = serialize.encode(bArr, 0, min);
            HashMap hashMap = new HashMap();
            hashMap.put("message", new Binary(bArr, 0, encode));
            hashMap.put("message-id", serialize.getMessageId());
            LinkedList linkedList = new LinkedList();
            linkedList.add(hashMap);
            HashMap hashMap2 = new HashMap();
            hashMap2.put("messages", linkedList);
            return hashMap2;
        } catch (BufferOverflowException e) {
            throw this.logger.logExceptionAsWarning(new AmqpException(false, AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED, String.format(Locale.US, "Error sending. Size of the payload exceeded maximum message size: %s kb", Integer.valueOf(i / 1024)), e, getErrorContext()));
        }
    }

    private AmqpErrorContext getErrorContext() {
        return new SessionErrorContext(this.fullyQualifiedNamespace, this.entityPath);
    }

    @Override // com.azure.messaging.servicebus.implementation.ServiceBusManagementNode
    public Mono<Void> cancelScheduledMessage(long j) {
        return isAuthorized("com.microsoft:cancel-scheduled-message").then(this.createRequestResponse.flatMap(requestResponseChannel -> {
            Message createManagementMessage = createManagementMessage("com.microsoft:cancel-scheduled-message", requestResponseChannel.getReceiveLinkName());
            createManagementMessage.setBody(new AmqpValue(Collections.singletonMap("sequence-numbers", new Long[]{Long.valueOf(j)})));
            return requestResponseChannel.sendWithAck(createManagementMessage);
        }).map(message -> {
            return RequestResponseUtils.getResponseStatusCode(message) == AmqpResponseCode.OK.getValue() ? Mono.empty() : Mono.error(new AmqpException(false, "Could not cancel scheduled message with sequence number " + j, getErrorContext()));
        })).then();
    }

    @Override // com.azure.messaging.servicebus.implementation.ServiceBusManagementNode
    public Mono<Long> schedule(ServiceBusMessage serviceBusMessage, Instant instant, int i) {
        serviceBusMessage.setScheduledEnqueueTime(instant);
        return isAuthorized("com.microsoft:schedule-message").then(this.createRequestResponse.flatMap(requestResponseChannel -> {
            Message createManagementMessage = createManagementMessage("com.microsoft:schedule-message", requestResponseChannel.getReceiveLinkName());
            createManagementMessage.setBody(new AmqpValue(createScheduleMessgeAmqpValue(serviceBusMessage, i)));
            return requestResponseChannel.sendWithAck(createManagementMessage);
        }).map(message -> {
            if (RequestResponseUtils.getResponseStatusCode(message) != AmqpResponseCode.OK.getValue()) {
                throw this.logger.logExceptionAsError(new AmqpException(false, String.format("Could not schedule message with message id: '%s'.", serviceBusMessage.getMessageId()), getErrorContext()));
            }
            List deserializeList = this.messageSerializer.deserializeList(message, Long.class);
            if (CoreUtils.isNullOrEmpty(deserializeList)) {
                throw this.logger.logExceptionAsError(new AmqpException(false, String.format("Service bus response empty. Could not schedule message with message id: '%s'.", serviceBusMessage.getMessageId()), getErrorContext()));
            }
            return (Long) deserializeList.get(0);
        }));
    }

    @Override // com.azure.messaging.servicebus.implementation.ServiceBusManagementNode, java.lang.AutoCloseable
    public void close() {
        if (this.isDisposed) {
            return;
        }
        this.isDisposed = true;
        this.tokenManager.close();
    }
}
