/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.protocol.amqp.proton;

import java.util.Arrays;
import java.util.List;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPLargeMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPSecurityException;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonDeliveryHandler;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonInitializable;
import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASLResult;
import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Outcome;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.jboss.logging.Logger;

public class ProtonServerReceiverContext
extends ProtonInitializable
implements ProtonDeliveryHandler {
    // Class-wide logger.
    private static final Logger log = Logger.getLogger(ProtonServerReceiverContext.class);
    // Connection this receiver belongs to; used for handler-thread checks, scheduling and flushing.
    protected final AMQPConnectionContext connection;
    // Session context that owns this receiver; the receiver is removed from it on close.
    protected final AMQPSessionContext protonSession;
    // The proton link this context serves.
    protected final Receiver receiver;
    // Address messages are sent to; set in initialise() from the remote target
    // (a generated temporary queue name when the target is dynamic).
    protected SimpleString address;
    // Callback into the broker session; the code null-checks it in the constructor and in sessionSPIFlow().
    protected final AMQPSessionCallback sessionSPI;
    // Routing context handed to sessionSPI.serverSend(...) for every message on this link.
    final RoutingContext routingContext;
    // Task that tops up link credit (a FlowControlRunner built in the constructor).
    protected final Runnable creditRunnable;
    // Bound to sessionSPIFlow(); scheduled through connection.afterFlush(...) by flow().
    protected final Runnable spiFlow = this::sessionSPIFlow;
    // Copied from the protocol manager's isUseModifiedForTransientDeliveryErrors(); when true,
    // an address-full failure may be answered with Modified instead of Rejected.
    private final boolean useModified;
    // Incremented by incrementSettle(), decremented by settle(), and read by FlowControlRunner.
    private int pendingSettles = 0;
    // Value of connection.getAmqpCredits(); passed to the credit runnable as its refill level.
    private final int amqpCredits;
    // Value of connection.getAmqpLowCredits(); passed to the credit runnable as its threshold.
    private final int minCreditRefresh;
    // Available-bytes threshold at or above which a delivery is handled as a large message;
    // values <= 0 never trigger large-message handling in onMessage().
    private final int minLargeMessageSize;
    // Large message currently being assembled from delivery frames, or null when there is none.
    volatile AMQPLargeMessage currentLargeMessage;

    /**
     * Creates the task that replenishes link credit for a receiver.
     *
     * @param refill     credit level the task tops the link back up to
     * @param threshold  credit level at or below which a top-up is issued
     * @param receiver   link whose credit is managed
     * @param connection connection used for handler-thread checks and flushing
     * @param context    receiver context whose pending settlements are taken into
     *                   account, or {@code null} to ignore pending settlements
     * @return a new {@link FlowControlRunner} wrapping the given arguments
     */
    public static Runnable createCreditRunnable(int refill, int threshold, Receiver receiver, AMQPConnectionContext connection, ProtonServerReceiverContext context) {
        final Runnable creditTask = new FlowControlRunner(refill, threshold, receiver, connection, context);
        return creditTask;
    }

    /**
     * Creates a credit top-up task that is not tied to a receiver context, so
     * pending settlements are always counted as zero.
     *
     * @param refill     credit level the task tops the link back up to
     * @param threshold  credit level at or below which a top-up is issued
     * @param receiver   link whose credit is managed
     * @param connection connection used for handler-thread checks and flushing
     * @return a new credit top-up task
     */
    public static Runnable createCreditRunnable(int refill, int threshold, Receiver receiver, AMQPConnectionContext connection) {
        // Same as the five-argument overload with no context.
        return ProtonServerReceiverContext.createCreditRunnable(refill, threshold, receiver, connection, null);
    }

    /**
     * Tells whether the link credit has dropped far enough to need a top-up.
     *
     * @param credit    credit currently available on the link
     * @param pending   deliveries received but not yet settled
     * @param threshold low-credit mark
     * @return {@code true} when {@code credit} is at or below {@code threshold - pending}
     */
    public static boolean isBellowThreshold(int credit, int pending, int threshold) {
        // Unsettled deliveries lower the mark the credit is compared against.
        final int effectiveThreshold = threshold - pending;
        return credit <= effectiveThreshold;
    }

    /**
     * Computes how much credit to grant to bring the link back to its refill level.
     *
     * @param refill  target credit level
     * @param credits credit currently available on the link
     * @param pending deliveries received but not yet settled
     * @return {@code refill - credits - pending}; may be zero or negative
     */
    public static int calculatedUpdateRefill(int refill, int credits, int pending) {
        final int alreadyAccountedFor = credits + pending;
        return refill - alreadyAccountedFor;
    }

    public ProtonServerReceiverContext(AMQPSessionCallback sessionSPI, AMQPConnectionContext connection, AMQPSessionContext protonSession, Receiver receiver) {
        this.connection = connection;
        this.routingContext = new RoutingContextImpl(null).setDuplicateDetection(connection.getProtocolManager().isAmqpDuplicateDetection());
        this.protonSession = protonSession;
        this.receiver = receiver;
        this.sessionSPI = sessionSPI;
        this.amqpCredits = connection.getAmqpCredits();
        this.minCreditRefresh = connection.getAmqpLowCredits();
        this.creditRunnable = ProtonServerReceiverContext.createCreditRunnable(this.amqpCredits, this.minCreditRefresh, receiver, connection, this);
        this.useModified = this.connection.getProtocolManager().isUseModifiedForTransientDeliveryErrors();
        this.minLargeMessageSize = connection.getProtocolManager().getAmqpMinLargeMessageSize();
        if (sessionSPI != null) {
            sessionSPI.addCloseable(failed -> this.clearLargeMessage());
        }
    }

    /**
     * Discards the large message currently being assembled, if any.
     *
     * <p>The work is submitted through {@code connection.runNow(...)}. The backing
     * file is deleted, a deletion failure is logged rather than propagated, and
     * the reference is cleared in every case.
     */
    protected void clearLargeMessage() {
        this.connection.runNow(() -> {
            if (this.currentLargeMessage == null) {
                // Nothing in progress.
                return;
            }
            try {
                this.currentLargeMessage.deleteFile();
            }
            catch (Throwable deleteFailure) {
                ActiveMQServerLogger.LOGGER.errorDeletingLargeMessageFile(deleteFailure);
            }
            finally {
                this.currentLargeMessage = null;
            }
        });
    }

    /**
     * Reacts to a flow event on the link by re-evaluating credit.
     *
     * @param credits ignored by this implementation
     * @param drain   ignored by this implementation
     */
    @Override
    public void onFlow(int credits, boolean drain) {
        // Both arguments are unused; credit is recomputed from the link state.
        flow();
    }

    /**
     * Prepares the link for receiving messages.
     *
     * <p>Mirrors the remote sender settle mode, sets the receiver settle mode to
     * {@code FIRST}, and resolves the target address:
     * <ul>
     *   <li>dynamic target: generates a temporary queue name, creates the queue and
     *       writes the name back into the target;</li>
     *   <li>non-dynamic target with a non-empty address: checks that the address
     *       exists (auto-creating it if possible) and that the user may send to it.</li>
     * </ul>
     * If the peer asked for the delayed-delivery capability it is offered back.
     * Finally credit is scheduled via {@link #flow()}.
     *
     * @throws Exception on security failures, a missing address, or any other
     *                   failure, wrapped in the corresponding AMQP exception type
     */
    @Override
    public void initialise() throws Exception {
        super.initialise();

        final Target remoteTarget = (Target) this.receiver.getRemoteTarget();
        this.receiver.setSenderSettleMode(this.receiver.getRemoteSenderSettleMode());
        this.receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);

        if (remoteTarget != null) {
            if (remoteTarget.getDynamic()) {
                // Dynamic node: the broker picks the address and creates a temporary queue for it.
                this.address = SimpleString.toSimpleString(this.sessionSPI.tempQueueName());
                final RoutingType routingType = this.getRoutingType(remoteTarget.getCapabilities(), this.address);
                try {
                    this.sessionSPI.createTemporaryQueue(this.address, routingType);
                }
                catch (ActiveMQAMQPSecurityException alreadyAmqp) {
                    throw alreadyAmqp;
                }
                catch (ActiveMQSecurityException denied) {
                    throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingTempDestination(denied.getMessage());
                }
                catch (Exception other) {
                    throw new ActiveMQAMQPInternalErrorException(other.getMessage(), other);
                }
                remoteTarget.setAddress(this.address.toString());
            } else {
                this.address = SimpleString.toSimpleString(remoteTarget.getAddress());
                final boolean hasAddress = this.address != null && !this.address.isEmpty();
                if (hasAddress) {
                    final RoutingType routingType = this.getRoutingType(remoteTarget.getCapabilities(), this.address);

                    // Step 1: the address must exist or be auto-creatable.
                    try {
                        final boolean exists = this.sessionSPI.checkAddressAndAutocreateIfPossible(this.address, routingType);
                        if (!exists) {
                            throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
                        }
                    }
                    catch (ActiveMQAMQPNotFoundException notFound) {
                        throw notFound;
                    }
                    catch (Exception other) {
                        log.debug(other.getMessage(), other);
                        throw new ActiveMQAMQPInternalErrorException(other.getMessage(), other);
                    }

                    // Step 2: the authenticated user must be allowed to send to it.
                    final SecurityAuth sendAuth = new SecurityAuth(){

                        public String getUsername() {
                            final SASLResult sasl = ProtonServerReceiverContext.this.connection.getSASLResult();
                            return sasl != null ? sasl.getUser() : null;
                        }

                        public String getPassword() {
                            final SASLResult sasl = ProtonServerReceiverContext.this.connection.getSASLResult();
                            // Only a PLAIN SASL result carries a password.
                            if (sasl instanceof PlainSASLResult) {
                                return ((PlainSASLResult) sasl).getPassword();
                            }
                            return null;
                        }

                        public RemotingConnection getRemotingConnection() {
                            return ProtonServerReceiverContext.this.connection.connectionCallback.getProtonConnectionDelegate();
                        }

                        public String getSecurityDomain() {
                            return ProtonServerReceiverContext.this.connection.getProtocolManager().getSecurityDomain();
                        }
                    };
                    try {
                        this.sessionSPI.check(this.address, CheckType.SEND, sendAuth);
                    }
                    catch (ActiveMQSecurityException denied) {
                        throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingProducer(denied.getMessage());
                    }
                }
            }

            // Offer delayed delivery only when the peer asked for it.
            final Symbol[] desired = this.receiver.getRemoteDesiredCapabilities();
            if (desired != null && Arrays.asList(desired).contains(AmqpSupport.DELAYED_DELIVERY)) {
                this.receiver.setOfferedCapabilities(new Symbol[]{AmqpSupport.DELAYED_DELIVERY});
            }
        }

        this.flow();
    }

    /**
     * Resolves the routing type for an address using the capabilities of the
     * given receiver's remote target.
     *
     * @param receiver link whose remote target capabilities are inspected
     * @param address  address the routing type is resolved for
     * @return the routing type implied by the capabilities, or the address/default
     *         routing type when there is no target or no matching capability
     */
    public RoutingType getRoutingType(Receiver receiver, SimpleString address) {
        final Target remoteTarget = (Target) receiver.getRemoteTarget();
        Symbol[] capabilities = null;
        if (remoteTarget != null) {
            capabilities = remoteTarget.getCapabilities();
        }
        return this.getRoutingType(capabilities, address);
    }

    /**
     * Resolves the routing type for an address.
     *
     * <p>Resolution order: the first topic or queue capability in {@code symbols};
     * then {@code MULTICAST} if the address exists with exactly one routing type and
     * it is multicast; then the session's default routing type for the address;
     * finally the broker-wide default.
     *
     * @param symbols target capabilities, may be {@code null}
     * @param address address being resolved
     * @return the resolved routing type
     */
    private RoutingType getRoutingType(Symbol[] symbols, SimpleString address) {
        if (symbols != null) {
            for (Symbol capability : symbols) {
                final boolean isTopic = AmqpSupport.TEMP_TOPIC_CAPABILITY.equals(capability) || AmqpSupport.TOPIC_CAPABILITY.equals(capability);
                if (isTopic) {
                    return RoutingType.MULTICAST;
                }
                final boolean isQueue = AmqpSupport.TEMP_QUEUE_CAPABILITY.equals(capability) || AmqpSupport.QUEUE_CAPABILITY.equals(capability);
                if (isQueue) {
                    return RoutingType.ANYCAST;
                }
            }
        }

        // No capability hint: fall back to what the address itself says.
        final AddressInfo info = this.sessionSPI.getAddress(address);
        if (info != null && info.getRoutingTypes().size() == 1 && info.getRoutingType() == RoutingType.MULTICAST) {
            return RoutingType.MULTICAST;
        }

        final RoutingType sessionDefault = this.sessionSPI.getDefaultRoutingType(address);
        if (sessionDefault != null) {
            return sessionDefault;
        }
        return ActiveMQDefaultConfiguration.getDefaultRoutingType();
    }

    /**
     * Handles incoming data for a delivery on this link.
     *
     * <p>Aborted deliveries are discarded, partial deliveries are buffered (or
     * appended to a large message), and completed deliveries are turned into a
     * message and passed to {@code actualDelivery(...)}.
     *
     * @param delivery the delivery that has new data or changed state
     * @throws ActiveMQAMQPException any exception raised while processing is
     *         rethrown as an {@link ActiveMQAMQPInternalErrorException}
     */
    @Override
    public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
        // NOTE(review): presumably verifies the caller is on the connection's handler thread — confirm.
        this.connection.requireInHandler();
        Receiver receiver = (Receiver)delivery.getLink();
        // Only the link's current delivery is processed; anything else is ignored.
        if (receiver.current() != delivery) {
            return;
        }
        try {
            AMQPMessage message;
            if (delivery.isAborted()) {
                // Sender aborted the transfer: drop any partial large message, skip and settle
                // the delivery, and hand one credit back unless the link is draining.
                this.clearLargeMessage();
                receiver.advance();
                delivery.settle();
                if (!receiver.getDrain()) {
                    receiver.flow(1);
                }
                return;
            }
            if (delivery.isPartial()) {
                // With a NullStorageManager no large message is created; nothing is read here
                // until the delivery is complete.
                if (this.sessionSPI.getStorageManager() instanceof NullStorageManager) {
                    return;
                }
                if (this.currentLargeMessage == null) {
                    // Switch to large-message handling once enough bytes are available.
                    if (this.minLargeMessageSize > 0 && delivery.available() >= this.minLargeMessageSize) {
                        this.initializeCurrentLargeMessage(delivery, receiver);
                    }
                } else {
                    // Already assembling a large message: append the newly received bytes.
                    this.currentLargeMessage.addBytes(receiver.recv());
                }
                return;
            }
            // Delivery is complete. It may still qualify as a large message if it was not started
            // as one while partial.
            if (!(this.sessionSPI.getStorageManager() instanceof NullStorageManager) && this.currentLargeMessage == null && this.minLargeMessageSize > 0 && delivery.available() >= this.minLargeMessageSize) {
                this.initializeCurrentLargeMessage(delivery, receiver);
            }
            if (this.currentLargeMessage != null) {
                // Append whatever recv() still returns, finish parsing, and release the field so the
                // next delivery starts clean.
                this.currentLargeMessage.addBytes(receiver.recv());
                receiver.advance();
                this.currentLargeMessage.finishParse();
                message = this.currentLargeMessage;
                this.currentLargeMessage = null;
            } else {
                // Standard message: read the whole payload, then advance the link.
                ReadableBuffer data = receiver.recv();
                receiver.advance();
                message = this.sessionSPI.createStandardMessage(delivery, data);
            }
            // Look up the transaction when the sender attached a transactional state.
            Transaction tx = null;
            if (delivery.getRemoteState() instanceof TransactionalState) {
                TransactionalState txState = (TransactionalState)delivery.getRemoteState();
                tx = this.sessionSPI.getTransaction(txState.getTxnId(), false);
            }
            this.actualDelivery(message, delivery, receiver, tx);
        }
        catch (Exception e) {
            // Every failure is reported to the caller as an AMQP internal error, keeping the cause.
            throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
        }
    }

    /**
     * Starts assembling a large message for the given delivery.
     *
     * <p>Allocates an id from the storage manager, creates the large message,
     * parses its header from the bytes received so far, registers it with the
     * storage manager and then appends those bytes.
     *
     * @param delivery delivery whose message format is used
     * @param receiver link the initial bytes are read from
     * @throws Exception if id generation, header parsing, registration or the
     *                   write of the first bytes fails
     */
    private void initializeCurrentLargeMessage(Delivery delivery, Receiver receiver) throws Exception {
        final long messageId = this.sessionSPI.getStorageManager().generateID();
        this.currentLargeMessage = new AMQPLargeMessage(
            messageId,
            delivery.getMessageFormat(),
            null,
            this.sessionSPI.getCoreMessageObjectPools(),
            this.sessionSPI.getStorageManager());

        // The header is parsed before the message is registered and the bytes are stored.
        final ReadableBuffer firstChunk = receiver.recv();
        this.currentLargeMessage.parseHeader(firstChunk);
        this.sessionSPI.getStorageManager().largeMessageCreated(messageId, this.currentLargeMessage);
        this.currentLargeMessage.addBytes(firstChunk);
    }

    /**
     * Sends the message to the broker, turning any failure into a failed delivery.
     *
     * @param message  message to route
     * @param delivery delivery the message arrived on
     * @param receiver link the delivery belongs to
     * @param tx       transaction to send under, or {@code null}
     */
    private void actualDelivery(AMQPMessage message, Delivery delivery, Receiver receiver, Transaction tx) {
        try {
            this.sessionSPI.serverSend(this, tx, receiver, delivery, this.address, this.routingContext, message);
        }
        catch (Exception sendFailure) {
            // Log, then report the failure to the peer through the delivery's disposition.
            log.warn(sendFailure.getMessage(), sendFailure);
            this.deliveryFailed(delivery, receiver, sendFailure);
        }
    }

    /**
     * Reports a failed delivery to the peer.
     *
     * <p>Submitted through {@code connection.runNow(...)}: picks the delivery state
     * for the failure, applies it, settles the delivery and flushes the connection.
     *
     * @param delivery delivery that failed
     * @param receiver link whose local source decides which outcomes may be used
     * @param e        failure that caused the delivery to fail
     */
    public void deliveryFailed(Delivery delivery, Receiver receiver, Exception e) {
        this.connection.runNow(() -> {
            final Source linkSource = (Source) receiver.getSource();
            final DeliveryState failureState = this.determineDeliveryState(linkSource, this.useModified, e);
            delivery.disposition(failureState);
            this.settle(delivery);
            this.connection.flush();
        });
    }

    /**
     * Chooses the delivery state used to report a failed delivery.
     *
     * <p>Order of preference:
     * <ol>
     *   <li>{@code Modified} with delivery-failed set, when the failure is
     *       address-full, {@code useModified} is on, and the source supports
     *       {@code Modified} (explicitly or as effective default);</li>
     *   <li>{@code Rejected}, when the source supports it (explicitly or as
     *       effective default);</li>
     *   <li>the source's default outcome, when it is a {@link DeliveryState};</li>
     *   <li>{@code Rejected} as a last resort.</li>
     * </ol>
     *
     * @param source      local source of the link; may be {@code null}, in which case
     *                    no outcomes and no default outcome are assumed
     * @param useModified whether {@code Modified} may be used for address-full failures
     * @param e           failure being reported
     * @return the delivery state to apply to the failed delivery, never {@code null}
     */
    private DeliveryState determineDeliveryState(Source source, boolean useModified, Exception e) {
        // Tolerate a missing source, as outcomeSupported() already does; an NPE here
        // would leave the delivery without a disposition and unsettled.
        Outcome defaultOutcome = source != null ? this.getEffectiveDefaultOutcome(source) : null;

        if (this.isAddressFull(e) && useModified && (this.outcomeSupported(source, Modified.DESCRIPTOR_SYMBOL) || defaultOutcome instanceof Modified)) {
            Modified modified = new Modified();
            modified.setDeliveryFailed(Boolean.valueOf(true));
            return modified;
        }
        if (this.outcomeSupported(source, Rejected.DESCRIPTOR_SYMBOL) || defaultOutcome instanceof Rejected) {
            return this.createRejected(e);
        }
        // The declared default outcome is used here even when an outcomes list is present.
        if (source != null && source.getDefaultOutcome() instanceof DeliveryState) {
            return (DeliveryState)source.getDefaultOutcome();
        }
        return this.createRejected(e);
    }

    /**
     * Tells whether the failure is a broker "address full" condition.
     *
     * @param e failure to inspect
     * @return {@code true} when {@code e} is an {@link ActiveMQException} whose type
     *         is {@code ADDRESS_FULL}
     */
    private boolean isAddressFull(Exception e) {
        if (!(e instanceof ActiveMQException)) {
            return false;
        }
        final ActiveMQExceptionType failureType = ((ActiveMQException) e).getType();
        return ActiveMQExceptionType.ADDRESS_FULL.equals(failureType);
    }

    /**
     * Builds a {@code Rejected} outcome describing the failure.
     *
     * <p>The error condition is unauthorized-access for security failures,
     * resource-limit-exceeded for address-full failures, and the symbol
     * {@code failed} otherwise. The description is the exception message.
     *
     * @param e failure to describe
     * @return a new {@code Rejected} outcome carrying the error condition
     */
    private Rejected createRejected(Exception e) {
        final Symbol conditionName;
        if (e instanceof ActiveMQSecurityException) {
            conditionName = AmqpError.UNAUTHORIZED_ACCESS;
        } else if (this.isAddressFull(e)) {
            conditionName = AmqpError.RESOURCE_LIMIT_EXCEEDED;
        } else {
            conditionName = Symbol.valueOf("failed");
        }

        final ErrorCondition error = new ErrorCondition();
        error.setCondition(conditionName);
        error.setDescription(e.getMessage());

        final Rejected outcome = new Rejected();
        outcome.setError(error);
        return outcome;
    }

    /**
     * Closes this receiver: removes it from the session and, for a dynamic target
     * whose expiry policy is link-detach or session-end, removes the temporary
     * queue that was created for it.
     *
     * <p>Removing the temporary queue is best effort; a failure is logged at debug
     * level and does not abort the close.
     *
     * @param remoteLinkClose not used by this implementation
     * @throws ActiveMQAMQPException declared by the handler contract
     */
    @Override
    public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
        this.protonSession.removeReceiver(this.receiver);
        Target target = (Target)this.receiver.getRemoteTarget();
        if (target != null && target.getDynamic() && (target.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || target.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) {
            try {
                this.sessionSPI.removeTemporaryQueue(SimpleString.toSimpleString((String)target.getAddress()));
            }
            catch (Exception ignored) {
                // Deliberately not rethrown: the link is going away regardless. Leave a trace
                // so a lingering temporary queue can be diagnosed.
                log.debug("Failed to remove temporary queue " + target.getAddress() + " while closing receiver", ignored);
            }
        }
    }

    /**
     * Closes this receiver with an error condition.
     *
     * <p>The condition is set on the link, the receiver is closed as a local close,
     * and any large message still being assembled is discarded. The large message is
     * discarded even if closing throws, so its backing file is not left behind.
     *
     * @param condition error condition to report on the link
     * @throws ActiveMQAMQPException if closing the receiver fails
     */
    @Override
    public void close(ErrorCondition condition) throws ActiveMQAMQPException {
        this.receiver.setCondition(condition);
        try {
            this.close(false);
        }
        finally {
            // Always release the partially received large message, even on failure.
            this.clearLargeMessage();
        }
    }

    /**
     * Records one more delivery awaiting settlement.
     *
     * @return the number of pending settlements <em>before</em> this call
     */
    public int incrementSettle() {
        assert (this.pendingSettles >= 0);
        this.connection.requireInHandler();
        final int before = this.pendingSettles;
        this.pendingSettles = before + 1;
        return before;
    }

    /**
     * Settles a delivery and re-evaluates link credit.
     *
     * <p>The pending-settlement counter is decremented before the delivery is
     * settled.
     *
     * @param settlement delivery to settle
     */
    public void settle(Delivery settlement) {
        this.connection.requireInHandler();
        this.pendingSettles -= 1;
        assert (this.pendingSettles >= 0);
        settlement.settle();
        // One fewer pending settlement may make room for more credit.
        this.flow();
    }

    /**
     * Schedules a credit re-evaluation by passing {@code spiFlow} to
     * {@code connection.afterFlush(...)}.
     */
    public void flow() {
        final Runnable creditCheck = this.spiFlow;
        this.connection.afterFlush(creditCheck);
    }

    /**
     * Runs the credit top-up, through the session callback when there is one.
     *
     * <p>With a session callback the credit runnable is handed to
     * {@code sessionSPI.flow(address, creditRunnable)}; without one it is run
     * directly.
     */
    private void sessionSPIFlow() {
        this.connection.requireInHandler();
        if (this.sessionSPI == null) {
            this.creditRunnable.run();
            return;
        }
        this.sessionSPI.flow(this.address, this.creditRunnable);
    }

    /**
     * Asks the sender to drain the link, then flushes the connection.
     *
     * <p>Both steps are submitted through {@code connection.runNow(...)}.
     *
     * @param credits credit value passed to {@code receiver.drain(...)}
     */
    public void drain(int credits) {
        final Receiver link = this.receiver;
        final AMQPConnectionContext owner = this.connection;
        owner.runNow(() -> {
            link.drain(credits);
            owner.flush();
        });
    }

    /**
     * Delegates to {@code receiver.drained()}.
     *
     * @return the value reported by the underlying link
     */
    public int drained() {
        final int drainedCount = this.receiver.drained();
        return drainedCount;
    }

    /**
     * Delegates to {@code receiver.draining()}.
     *
     * @return {@code true} when the underlying link reports it is draining
     */
    public boolean isDraining() {
        final boolean draining = this.receiver.draining();
        return draining;
    }

    /**
     * Tells whether the source explicitly lists the given outcome.
     *
     * @param source  link source, may be {@code null}
     * @param outcome outcome descriptor symbol to look for
     * @return {@code true} when the source has an outcomes list containing
     *         {@code outcome}; {@code false} for a null source or null list
     */
    private boolean outcomeSupported(Source source, Symbol outcome) {
        if (source == null) {
            return false;
        }
        final Symbol[] listed = source.getOutcomes();
        if (listed == null) {
            return false;
        }
        return Arrays.asList(listed).contains(outcome);
    }

    /**
     * Returns the default outcome that applies when the source lists no outcomes.
     *
     * <p>The declared default outcome only counts when the outcomes list is absent
     * or empty; once the source lists outcomes explicitly, this returns {@code null}.
     *
     * @param source link source, may be {@code null}
     * @return the source's default outcome when no outcomes are listed; {@code null}
     *         when outcomes are listed or the source itself is {@code null}
     */
    private Outcome getEffectiveDefaultOutcome(Source source) {
        // A missing source has neither outcomes nor a default, matching outcomeSupported().
        if (source == null) {
            return null;
        }
        final Symbol[] outcomes = source.getOutcomes();
        if (outcomes == null || outcomes.length == 0) {
            return source.getDefaultOutcome();
        }
        return null;
    }

    /**
     * Task that tops up credit on a receiver link once the available credit,
     * adjusted for pending settlements, falls to the configured threshold.
     */
    static class FlowControlRunner
    implements Runnable {
        /** Credit level the link is topped back up to. */
        final int refill;
        /** Low-credit mark that triggers a top-up. */
        final int threshold;
        /** Link whose credit is managed. */
        final Receiver receiver;
        /** Connection used for handler checks, rescheduling and flushing. */
        final AMQPConnectionContext connection;
        /** Source of the pending-settlement count; {@code null} means zero pending. */
        final ProtonServerReceiverContext context;

        FlowControlRunner(int refill, int threshold, Receiver receiver, AMQPConnectionContext connection, ProtonServerReceiverContext context) {
            this.refill = refill;
            this.threshold = threshold;
            this.receiver = receiver;
            this.connection = connection;
            this.context = context;
        }

        /**
         * Grants credit if needed. When {@code connection.isHandler()} is false the
         * task re-submits itself through {@code connection.runLater(...)} and returns.
         */
        @Override
        public void run() {
            if (!this.connection.isHandler()) {
                this.connection.runLater(this);
                return;
            }
            this.connection.requireInHandler();

            final int pending = this.context == null ? 0 : this.context.pendingSettles;
            if (!ProtonServerReceiverContext.isBellowThreshold(this.receiver.getCredit(), pending, this.threshold)) {
                // Enough credit is still outstanding.
                return;
            }

            final int topUp = ProtonServerReceiverContext.calculatedUpdateRefill(this.refill, this.receiver.getCredit(), pending);
            if (topUp > 0) {
                this.receiver.flow(topUp);
                this.connection.instantFlush();
            }
        }
    }
}

