package org.apache.activemq.artemis.protocol.amqp.proton.transaction;

import java.nio.ByteBuffer;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonDeliveryHandler;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.transaction.Declare;
import org.apache.qpid.proton.amqp.transaction.Declared;
import org.apache.qpid.proton.amqp.transaction.Discharge;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.jboss.logging.Logger;

/* loaded from: input_file:org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.class */
public class ProtonTransactionHandler implements ProtonDeliveryHandler {
    private static final Logger log = Logger.getLogger(ProtonTransactionHandler.class);
    private final int amqpCredit;
    private final int amqpLowMark;
    private Transaction currentTx;
    final AMQPSessionCallback sessionSPI;
    final AMQPConnectionContext connection;
    private final ByteBuffer DECODE_BUFFER = ByteBuffer.allocate(64);

    public ProtonTransactionHandler(AMQPSessionCallback aMQPSessionCallback, AMQPConnectionContext aMQPConnectionContext) {
        this.sessionSPI = aMQPSessionCallback;
        this.connection = aMQPConnectionContext;
        this.amqpCredit = aMQPConnectionContext.getAmqpCredits();
        this.amqpLowMark = aMQPConnectionContext.getAmqpLowCredits();
        this.sessionSPI.setTransactionHandler(this);
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.proton.ProtonDeliveryHandler
    public void onMessage(final Delivery delivery) throws ActiveMQAMQPException {
        try {
            Receiver link = delivery.getLink();
            if (delivery.isReadable()) {
                if (link.getCredit() < this.amqpLowMark) {
                    link.flow(this.amqpCredit);
                }
                ByteBuffer allocate = delivery.available() > this.DECODE_BUFFER.capacity() ? ByteBuffer.allocate(delivery.available()) : (ByteBuffer) this.DECODE_BUFFER.clear();
                allocate.limit(link.recv(allocate.array(), allocate.arrayOffset(), allocate.capacity()));
                link.advance();
                Object value = decodeMessage(allocate).getBody().getValue();
                if (value instanceof Declare) {
                    Binary newTransaction = this.sessionSPI.newTransaction();
                    final Declared declared = new Declared();
                    declared.setTxnId(newTransaction);
                    this.currentTx = this.sessionSPI.getTransaction(newTransaction, false);
                    this.sessionSPI.afterIO(new IOCallback() { // from class: org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionHandler.1
                        public void done() {
                            AMQPConnectionContext aMQPConnectionContext = ProtonTransactionHandler.this.connection;
                            Delivery delivery2 = delivery;
                            Declared declared2 = declared;
                            aMQPConnectionContext.runLater(() -> {
                                delivery2.settle();
                                delivery2.disposition(declared2);
                                ProtonTransactionHandler.this.connection.flush();
                            });
                        }

                        public void onError(int i, String str) {
                            ProtonTransactionHandler.this.currentTx = null;
                        }
                    });
                } else if (value instanceof Discharge) {
                    Discharge discharge = (Discharge) value;
                    ProtonTransactionImpl transaction = this.sessionSPI.getTransaction(discharge.getTxnId(), true);
                    transaction.discharge();
                    IOCallback iOCallback = new IOCallback() { // from class: org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionHandler.2
                        public void done() {
                            AMQPConnectionContext aMQPConnectionContext = ProtonTransactionHandler.this.connection;
                            Delivery delivery2 = delivery;
                            aMQPConnectionContext.runLater(() -> {
                                delivery2.settle();
                                delivery2.disposition(new Accepted());
                                ProtonTransactionHandler.this.currentTx = null;
                                ProtonTransactionHandler.this.connection.flush();
                            });
                        }

                        public void onError(int i, String str) {
                        }
                    };
                    if (discharge.getFail().booleanValue()) {
                        this.sessionSPI.withinSessionExecutor(() -> {
                            try {
                                transaction.rollback();
                                this.sessionSPI.afterIO(iOCallback);
                            } catch (Throwable th) {
                                txError(delivery, th);
                            }
                        });
                    } else {
                        this.sessionSPI.withinSessionExecutor(() -> {
                            try {
                                transaction.commit();
                                this.sessionSPI.afterIO(iOCallback);
                            } catch (Throwable th) {
                                txError(delivery, th);
                            }
                        });
                    }
                }
            }
        } catch (ActiveMQAMQPException e) {
            txError(delivery, e);
        } catch (Throwable th) {
            txError(delivery, th);
        }
    }

    private void txError(Delivery delivery, Throwable th) {
        log.warn(th.getMessage(), th);
        this.connection.runNow(() -> {
            delivery.settle();
            if (th instanceof ActiveMQAMQPException) {
                delivery.disposition(createRejected(((ActiveMQAMQPException) th).getAmqpError(), th.getMessage()));
            } else {
                delivery.disposition(createRejected(Symbol.getSymbol("failed"), th.getMessage()));
            }
            this.connection.flush();
        });
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.proton.ProtonDeliveryHandler
    public void onFlow(int i, boolean z) {
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.proton.ProtonDeliveryHandler
    public void close(boolean z) throws ActiveMQAMQPException {
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.proton.ProtonDeliveryHandler
    public void close(ErrorCondition errorCondition) throws ActiveMQAMQPException {
    }

    private Rejected createRejected(Symbol symbol, String str) {
        Rejected rejected = new Rejected();
        ErrorCondition errorCondition = new ErrorCondition();
        errorCondition.setCondition(symbol);
        errorCondition.setDescription(str);
        rejected.setError(errorCondition);
        return rejected;
    }

    private MessageImpl decodeMessage(ByteBuffer byteBuffer) {
        MessageImpl create = Message.Factory.create();
        create.decode(byteBuffer);
        return create;
    }

    public Transaction getCurrentTransaction() {
        return this.currentTx;
    }
}
