package org.apache.qpid.client;

import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.qpid.AMQConnectionClosedException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQTimeoutException;
import org.apache.qpid.QpidException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverState;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.protocol.BlockingMethodFrameListener;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateWaiter;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.codec.ClientDecoder;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQProtocolHeaderException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ConnectionCloseOkBody;
import org.apache.qpid.framing.HeartbeatBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.protocol.ErrorCodes;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.TransportActivity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/qpid/client/AMQProtocolHandler.class */
/**
 * Client-side protocol handler that sits between the network transport and an {@link AMQConnection}.
 *
 * <p>Responsibilities visible in this class:
 * <ul>
 *   <li>decoding inbound bytes ({@link #received(ByteBuffer)}) and dispatching the resulting frames to the
 *       protocol session, the state manager and the registered method listeners;</li>
 *   <li>writing outbound frames ({@link #writeFrame(AMQDataBlock, boolean)}) and, for synchronous
 *       operations, blocking until a reply arrives
 *       ({@link #writeCommandFrameAndWaitForReply(AMQDataBlock, BlockingMethodFrameListener, long)});</li>
 *   <li>reacting to transport closure and transport errors ({@link #closed()}, {@link #exception(Throwable)}),
 *       including starting the failover thread when the connection permits failover;</li>
 *   <li>heartbeat handling through {@link #readerIdle()} and {@link #writerIdle()}.</li>
 * </ul>
 */
public class AMQProtocolHandler implements ExceptionHandlingByteBufferReceiver, TransportActivity {
    private static final Logger _logger = LoggerFactory.getLogger(AMQProtocolHandler.class);

    /**
     * Upper bound, in milliseconds, on how long {@link #blockUntilNotFailingOver()} waits for failover.
     * Read once from the system property {@code amqj.MaximumStateWait}; defaults to 30000.
     */
    private static final long MAXIMUM_STATE_WAIT_TIME = Long.parseLong(System.getProperty("amqj.MaximumStateWait", "30000"));

    /** Name of the fallback system property consulted when resolving {@link #DEFAULT_SYNC_TIMEOUT}. */
    private static final String AMQJ_DEFAULT_SYNCWRITE_TIMEOUT = "amqj.default_syncwrite_timeout";

    private final AMQConnection _connection;
    private final AMQProtocolSession _protocolSession;
    private AMQStateManager _stateManager;

    /** Non-null only while the failover thread is running; guarded by {@link #_failoverLatchChange}. */
    private CountDownLatch _failoverLatch;

    /**
     * Set by {@link #notifyFailoverStarting()} and cleared by {@link #failoverInProgress()}. While non-null,
     * new synchronous writes are rejected with this exception.
     */
    private FailoverException _lastFailoverException;

    /** Created in {@link #init(ConnectionSettings)}; {@code null} before that call. */
    private ClientDecoder _decoder;

    /** Protocol version taken from a {@link ProtocolInitiation} sent by the peer, if one was received. */
    private ProtocolVersion _suggestedProtocolVersion;

    private long _writtenBytes;
    private long _readBytes;
    private int _messageReceivedCount;
    private int _messagesOut;
    private NetworkConnection _network;
    private ByteBufferSender _sender;

    /**
     * Connection-level problem recorded by {@link #exception(Throwable)} while failover had not started;
     * consumed (and reset) by {@link #closed()}.
     */
    private Throwable _initialConnectionException;

    /** Listeners offered every inbound method and notified of errors. */
    private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<>();

    /** Guarded by this object's monitor, see {@link #getFailoverState()} and {@link #setFailoverState(FailoverState)}. */
    private FailoverState _failoverState = FailoverState.NOT_STARTED;

    private final Object _failoverLatchChange = new Object();

    /**
     * Default timeout for synchronous operations. Resolved from the system property named by
     * {@link ClientProperties#QPID_SYNC_OP_TIMEOUT}, then from {@code amqj.default_syncwrite_timeout}, and
     * finally from {@link ClientProperties#DEFAULT_FLOW_CONTROL_WAIT_FAILURE}.
     * NOTE(review): the unit is presumably milliseconds - confirm against BlockingMethodFrameListener.
     */
    private final long DEFAULT_SYNC_TIMEOUT = Long.getLong(ClientProperties.QPID_SYNC_OP_TIMEOUT, Long.getLong(AMQJ_DEFAULT_SYNCWRITE_TIMEOUT, ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_FAILURE)).longValue();

    private long _lastReadTime = System.currentTimeMillis();
    private long _lastWriteTime = System.currentTimeMillis();
    private HeartbeatListener _heartbeatListener = HeartbeatListener.DEFAULT;

    /** Counter used by {@link #generateQueueName()}; guarded by {@link #_queueIdLock}. */
    private int _queueId = 1;
    private final Object _queueIdLock = new Object();

    private final FailoverHandler _failoverHandler = new FailoverHandler(this);

    /**
     * Creates a handler bound to the given connection, together with its protocol session and an initial
     * state manager.
     *
     * @param connection the connection this handler serves
     */
    public AMQProtocolHandler(AMQConnection connection) {
        _connection = connection;
        _protocolSession = new AMQProtocolSession(this, _connection);
        _stateManager = new AMQStateManager(_protocolSession);
    }

    /**
     * Called when the transport has closed.
     *
     * <p>If the connection is already closed this only logs. Otherwise, when failover has not started yet,
     * the sender is closed and one of three things happens: failover is started (if allowed), the connection
     * is told it was disconnected (if it had been connected), or the connection is told that the initial
     * connection attempt failed. The connection callbacks are deliberately invoked after leaving the
     * synchronized section.
     */
    @Override
    public void closed() {
        if (_connection.isClosed()) {
            _logger.debug("Session closed called by client");
        } else {
            boolean disconnectedWithoutFailover = false;
            boolean initialConnectionFailed = false;
            Throwable initialException = null;
            synchronized (this) {
                _logger.debug("Session closed called with failover state {}", _failoverState);
                if (_failoverState == FailoverState.NOT_STARTED) {
                    try {
                        _sender.close();
                    } catch (Exception e) {
                        _logger.warn("Exception occurred on closing the sender", e);
                    }
                    if (_connection.failoverAllowed()) {
                        _failoverState = FailoverState.IN_PROGRESS;
                        _logger.debug("FAILOVER STARTING");
                        startFailoverThread();
                    } else if (_connection.isConnected()) {
                        disconnectedWithoutFailover = true;
                        if (_logger.isDebugEnabled()) {
                            _logger.debug("Failover not allowed by policy:{}", _connection.getFailoverPolicy());
                        }
                    } else {
                        initialConnectionFailed = true;
                        initialException = _initialConnectionException;
                        _logger.debug("We are in process of establishing the initial connection");
                    }
                    _initialConnectionException = null;
                } else {
                    _logger.debug("Not starting the failover thread as state currently {}", _failoverState);
                }
            }
            if (disconnectedWithoutFailover) {
                _connection.closed(new AMQDisconnectedException("Server closed connection and reconnection not permitted.", _stateManager.getLastException()));
            } else if (initialConnectionFailed) {
                // Prefer the transport-level cause; fall back to whatever the state manager recorded.
                if (initialException == null) {
                    initialException = _stateManager.getLastException();
                }
                _connection.exceptionReceived(new QpidException("Connection could not be established: " + (initialException == null ? "" : initialException.getMessage()), initialException));
            }
        }
        _logger.debug("Protocol Session [{}] closed", this);
    }

    /**
     * Starts the "Failover" thread unless the connection is already closed. The thread publishes a latch
     * (see {@link #getFailoverLatch()}) for the duration of the failover attempt, notifies frame listeners
     * and then runs the failover handler while holding all connection locks.
     *
     * @throws RuntimeException if the thread cannot be created
     */
    private void startFailoverThread() {
        if (_connection.isClosed()) {
            return;
        }
        final Thread failoverThread;
        try {
            failoverThread = Threading.getThreadFactory().createThread(new Runnable() {
                @Override
                public void run() {
                    if (Thread.currentThread().isDaemon()) {
                        throw new IllegalStateException("FailoverHandler must run on a non-daemon thread.");
                    }
                    final CountDownLatch latch = new CountDownLatch(1);
                    setFailoverLatch(latch);
                    try {
                        notifyFailoverStarting();
                        getConnection().doWithAllLocks(_failoverHandler);
                    } finally {
                        // Release waiters first, then withdraw the latch so later callers do not block.
                        latch.countDown();
                        setFailoverLatch(null);
                    }
                }
            });
            failoverThread.setName("Failover");
            failoverThread.setDaemon(false);
            failoverThread.start();
        } catch (Exception e) {
            throw new RuntimeException("Failed to create thread", e);
        }
    }

    /** Called when nothing has been read for the idle period: logs the missed heartbeat and closes the network. */
    @Override
    public void readerIdle() {
        _logger.debug("Protocol Session [{}] idle: reader", this);
        _logger.warn("Timed out while waiting for heartbeat from peer.");
        _network.close();
    }

    /** Called when nothing has been written for the idle period: sends a heartbeat frame and notifies the heartbeat listener. */
    @Override
    public void writerIdle() {
        _logger.debug("Protocol Session [{}] idle: writer", this);
        writeFrame(HeartbeatBody.FRAME);
        _heartbeatListener.heartbeatSent();
    }

    /**
     * Handles an exception raised by the transport or during frame processing.
     *
     * <p>Connection-level problems ({@link AMQConnectionClosedException}, {@link IOException},
     * {@link TransportException}) cause the network to be closed. What happens next depends on the failover
     * state: before failover has started, connection problems are remembered for {@link #closed()} and other
     * exceptions are passed to the connection; once failover has failed, all waiters are notified and the
     * connection is closed; in any other state the exception is only logged.
     *
     * @param cause the exception to handle
     */
    @Override
    public void exception(Throwable cause) {
        final boolean causeIsConnectionProblem = cause instanceof AMQConnectionClosedException
                || cause instanceof IOException
                || cause instanceof TransportException;
        if (causeIsConnectionProblem) {
            try {
                _network.close();
            } catch (Exception e) {
                // Best effort: the connection is already considered broken, so only record the failure.
                _logger.debug("Exception occurred on closing the network connection", e);
            }
        }

        final FailoverState state = getFailoverState();
        if (state == FailoverState.NOT_STARTED) {
            if (causeIsConnectionProblem) {
                _logger.info("Connection exception caught therefore going to attempt failover: " + cause, cause);
                _initialConnectionException = cause;
            } else {
                _connection.exceptionReceived(cause);
            }
        } else if (state == FailoverState.FAILED) {
            _logger.error("Exception caught by protocol handler: " + cause, cause);
            final AMQDisconnectedException disconnected = new AMQDisconnectedException("Failover could not re-establish connectivity: " + cause, cause);
            propagateExceptionToAllWaiters(disconnected);
            _connection.closed(disconnected);
        } else {
            _logger.warn("Exception caught by protocol handler: " + cause, cause);
        }
    }

    /**
     * Reports the exception to the state manager and to every registered frame listener.
     *
     * @param e the exception to propagate
     */
    public void propagateExceptionToAllWaiters(Exception e) {
        getStateManager().error(e);
        propagateExceptionToFrameListeners(e);
    }

    /**
     * Reports the exception to every registered frame listener.
     *
     * @param e the exception to propagate
     */
    public void propagateExceptionToFrameListeners(Exception e) {
        // Same monitor as the listener registration in writeCommandFrameAndWaitForReply.
        synchronized (_frameListeners) {
            for (AMQMethodListener listener : _frameListeners) {
                listener.error(e);
            }
        }
    }

    /**
     * Records that failover is about to start and notifies the frame listeners with a
     * {@link FailoverException}. The state manager is not notified here.
     */
    public void notifyFailoverStarting() {
        final FailoverException failoverException = new FailoverException("Failing over about to start");
        // Published under the listener monitor so it is ordered with listener registration.
        synchronized (_frameListeners) {
            _lastFailoverException = failoverException;
        }
        // Propagate the local instance: the field may be cleared concurrently by failoverInProgress().
        propagateExceptionToFrameListeners(failoverException);
    }

    /** Clears the pending failover exception so that synchronous writes are accepted again. */
    public void failoverInProgress() {
        _lastFailoverException = null;
    }

    /**
     * Decodes the received bytes and dispatches every decoded data block.
     *
     * <p>{@link AMQFrame}s are handed to their body for handling. A {@link ProtocolInitiation} records the
     * protocol version suggested by the peer and moves the state manager to
     * {@link AMQState#CONNECTION_CLOSED}. Any exception is logged, propagated to frame listeners and state
     * waiters, and passed to {@link #exception(Throwable)}. The list of processed methods is always cleared
     * afterwards.
     *
     * @param buffer the bytes received from the transport
     */
    @Override
    public void received(ByteBuffer buffer) {
        _readBytes += buffer.remaining();
        _lastReadTime = System.currentTimeMillis();
        final List<AMQDataBlock> dataBlocks = _protocolSession.getMethodProcessor().getProcessedMethods();
        try {
            _decoder.decodeBuffer(buffer);
            // Index-based loop over the size captured up front, as in the original implementation.
            final int size = dataBlocks.size();
            for (int index = 0; index < size; index++) {
                final AMQDataBlock dataBlock = dataBlocks.get(index);
                _logger.debug("RECV: {}", dataBlock);
                if (dataBlock instanceof AMQFrame) {
                    _messageReceivedCount++;
                    if (_messageReceivedCount % 1000 == 0 && _logger.isDebugEnabled()) {
                        _logger.debug("Received {} protocol messages", _messageReceivedCount);
                    }
                    final AMQFrame frame = (AMQFrame) dataBlock;
                    frame.getBodyFrame().handle(frame.getChannel(), _protocolSession);
                    _connection.bytesReceived(_readBytes);
                } else if (dataBlock instanceof ProtocolInitiation) {
                    try {
                        _suggestedProtocolVersion = ((ProtocolInitiation) dataBlock).checkVersion();
                        _logger.debug("Broker suggested using protocol version: {} ", _suggestedProtocolVersion);
                        _stateManager.changeState(AMQState.CONNECTION_CLOSED);
                    } catch (AMQProtocolHeaderException headerException) {
                        _stateManager.error(headerException);
                        throw headerException;
                    }
                }
            }
        } catch (Exception e) {
            _logger.error("Exception processing frame", e);
            propagateExceptionToFrameListeners(e);
            _stateManager.propagateExceptionToStateWaiters(e);
            exception(e);
        } finally {
            dataBlocks.clear();
        }
    }

    /**
     * Offers a received method body to the state manager and to every frame listener. If nobody processes
     * it, or if processing fails with a {@link QpidException}, the failure is propagated to frame
     * listeners and state waiters and then passed to {@link #exception(Throwable)}.
     *
     * @param channelId the channel the method arrived on
     * @param body the received body; must be an {@link AMQMethodBody}
     * @throws QpidException declared for callers; failures visible here are handled internally
     */
    public void methodBodyReceived(int channelId, AMQBody body) throws QpidException {
        final AMQMethodEvent event = new AMQMethodEvent(channelId, (AMQMethodBody) body);
        try {
            boolean processed = getStateManager().methodReceived(event);
            synchronized (_frameListeners) {
                for (AMQMethodListener listener : _frameListeners) {
                    // Every listener is offered the event, even after one has already processed it.
                    processed = listener.methodReceived(event) || processed;
                }
            }
            if (!processed) {
                throw new QpidException("AMQMethodEvent " + event + " was not processed by any listener.  Listeners:" + _frameListeners, null);
            }
        } catch (QpidException e) {
            propagateExceptionToFrameListeners(e);
            _stateManager.propagateExceptionToStateWaiters(e);
            exception(e);
        }
    }

    /**
     * Creates a waiter on the current state manager for the given states.
     *
     * @param states the states to wait for
     * @return the waiter created by the state manager
     * @throws QpidException if the state manager fails to create the waiter
     */
    public StateWaiter createWaiter(Set<AMQState> states) throws QpidException {
        return getStateManager().createWaiter(states);
    }

    /**
     * Writes the frame and flushes the sender.
     *
     * @param frame the frame to write
     */
    public void writeFrame(AMQDataBlock frame) {
        writeFrame(frame, true);
    }

    /**
     * Writes the frame to the sender, updates the write statistics and reports the total number of bytes
     * written to the connection.
     *
     * @param frame the frame to write
     * @param flush whether to flush the sender after writing
     */
    public synchronized void writeFrame(AMQDataBlock frame, boolean flush) {
        _lastWriteTime = System.currentTimeMillis();
        _writtenBytes += frame.getSize();
        frame.writePayload(_sender);
        if (flush) {
            _sender.flush();
        }
        _logger.debug("SEND: {}", frame);
        // The periodic log is keyed on the count before this frame, so it fires on frames 1, 1001, 2001, ...
        final int sentBeforeThisFrame = _messagesOut++;
        if (_logger.isDebugEnabled() && sentBeforeThisFrame % 1000 == 0) {
            _logger.debug("Sent {} protocol messages", _messagesOut);
        }
        _connection.bytesSent(_writtenBytes);
    }

    /**
     * Writes the frame and waits for the reply using the default synchronous timeout.
     *
     * @param frame the frame to write
     * @param listener the listener that recognises the reply
     * @return the event the listener was waiting for
     * @throws QpidException see {@link #writeCommandFrameAndWaitForReply(AMQDataBlock, BlockingMethodFrameListener, long)}
     * @throws FailoverException if failover is about to start or interrupts the wait
     */
    public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQDataBlock frame, BlockingMethodFrameListener listener) throws QpidException, FailoverException {
        return writeCommandFrameAndWaitForReply(frame, listener, DEFAULT_SYNC_TIMEOUT);
    }

    /**
     * Registers the listener, writes the frame and blocks until the listener receives its reply. The
     * listener is always deregistered before returning or throwing.
     *
     * @param frame the frame to write
     * @param listener the listener that recognises the reply
     * @param timeout how long to wait for the reply; {@code -1} selects the default synchronous timeout
     * @return the event the listener was waiting for
     * @throws QpidException if the connection is closed or closing and the state manager holds a previous
     *         exception, or if waiting for the reply fails
     * @throws FailoverException if failover is about to start or interrupts the wait
     */
    public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQDataBlock frame, BlockingMethodFrameListener listener, long timeout) throws QpidException, FailoverException {
        try {
            synchronized (_frameListeners) {
                // Read the field once: failoverInProgress() may clear it without holding this monitor.
                final FailoverException pendingFailover = _lastFailoverException;
                if (pendingFailover != null) {
                    throw pendingFailover;
                }
                final AMQState currentState = _stateManager.getCurrentState();
                if (currentState == AMQState.CONNECTION_CLOSED || currentState == AMQState.CONNECTION_CLOSING) {
                    final Exception lastException = _stateManager.getLastException();
                    if (lastException != null) {
                        if (lastException instanceof QpidException) {
                            throw ((QpidException) lastException).cloneForCurrentThread();
                        }
                        throw new AMQException(ErrorCodes.INTERNAL_ERROR, lastException.getMessage(), lastException);
                    }
                }
                _frameListeners.add(listener);
            }
            writeFrame(frame);
            return listener.blockForFrame(timeout == -1 ? DEFAULT_SYNC_TIMEOUT : timeout);
        } finally {
            _frameListeners.remove(listener);
        }
    }

    /**
     * Writes the frame and waits, using the default synchronous timeout, for a reply of the given class.
     *
     * @param frame the frame to write
     * @param responseClass the class of the expected reply
     * @return the reply event
     * @throws QpidException see {@link #writeCommandFrameAndWaitForReply(AMQDataBlock, BlockingMethodFrameListener, long)}
     * @throws FailoverException if failover is about to start or interrupts the wait
     */
    public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass) throws QpidException, FailoverException {
        return syncWrite(frame, responseClass, DEFAULT_SYNC_TIMEOUT);
    }

    /**
     * Writes the frame and waits for a reply of the given class on the frame's channel.
     *
     * @param frame the frame to write
     * @param responseClass the class of the expected reply
     * @param timeout how long to wait for the reply; {@code -1} selects the default synchronous timeout
     * @return the reply event
     * @throws QpidException see {@link #writeCommandFrameAndWaitForReply(AMQDataBlock, BlockingMethodFrameListener, long)}
     * @throws FailoverException if failover is about to start or interrupts the wait
     */
    public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws QpidException, FailoverException {
        final SpecificMethodFrameListener listener = new SpecificMethodFrameListener(frame.getChannel(), responseClass, getConnectionDetails());
        return writeCommandFrameAndWaitForReply(frame, listener, timeout);
    }

    /**
     * @return the local and remote addresses of the network connection, joined by {@code "-"}
     */
    public String getConnectionDetails() {
        return getLocalAddress() + "-" + getRemoteAddress();
    }

    /**
     * Delegates closing of the session to the protocol session.
     *
     * @param session the session to close
     * @throws QpidException if the protocol session fails to close it
     */
    public void closeSession(AMQSession session) throws QpidException {
        _protocolSession.closeSession(session);
    }

    /**
     * Closes the connection if it is currently open: sends a connection-close and waits for the close-ok,
     * then closes the network connection. A timeout or a failover interruption while waiting is logged and
     * otherwise ignored. The network connection is closed exactly once, whatever the outcome of the
     * handshake. Nothing happens if the state is not {@link AMQState#CONNECTION_OPEN}.
     *
     * @param timeout how long to wait for the close-ok; {@code -1} selects the default synchronous timeout
     * @throws QpidException if the close handshake fails for a reason other than a timeout
     */
    public void closeConnection(long timeout) throws QpidException {
        if (!getStateManager().getCurrentState().equals(AMQState.CONNECTION_OPEN)) {
            return;
        }
        try {
            final AMQFrame closeFrame = _protocolSession.getMethodRegistry()
                    .createConnectionCloseBody(ErrorCodes.REPLY_SUCCESS, new AMQShortString("JMS client is closing the connection."), 0, 0)
                    .generateFrame(0);
            syncWrite(closeFrame, ConnectionCloseOkBody.class, timeout);
        } catch (AMQTimeoutException e) {
            _logger.debug("Timeout on sending connection close : " + e);
        } catch (FailoverException e) {
            _logger.debug("FailoverException interrupted connection close, ignoring as connection closed anyway.");
        } finally {
            _network.close();
        }
    }

    /** @return the total number of bytes received so far */
    public long getReadBytes() {
        return _readBytes;
    }

    /** @return the total size of the frames written so far */
    public long getWrittenBytes() {
        return _writtenBytes;
    }

    /**
     * Blocks while a failover attempt is running, for at most {@code amqj.MaximumStateWait} milliseconds.
     * Returns immediately when no failover is running. A timeout is only logged.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void blockUntilNotFailingOver() throws InterruptedException {
        final CountDownLatch latch = getFailoverLatch();
        if (latch != null && !latch.await(MAXIMUM_STATE_WAIT_TIME, TimeUnit.MILLISECONDS)) {
            _logger.debug("Timed out after waiting {}ms for failover to complete.", MAXIMUM_STATE_WAIT_TIME);
        }
    }

    /**
     * Generates a queue name of the form {@code tmp_<local address>_<counter>}. The characters
     * {@code . / : ;} in the local address are replaced by underscores and runs of underscores are
     * collapsed to one.
     *
     * @return the generated queue name
     */
    public String generateQueueName() {
        final int id;
        synchronized (_queueIdLock) {
            id = _queueId++;
        }
        final String localAddress = getLocalAddress().toString().replaceAll("[./:;]", "_");
        final String queueName = "tmp_" + localAddress + "_" + id;
        return queueName.replaceAll("_+", "_");
    }

    /** @return the latch of the running failover attempt, or {@code null} if none has been set */
    public CountDownLatch getFailoverLatch() {
        synchronized (_failoverLatchChange) {
            return _failoverLatch;
        }
    }

    /**
     * @param failoverLatch the latch of the running failover attempt, or {@code null} to clear it
     */
    public void setFailoverLatch(CountDownLatch failoverLatch) {
        synchronized (_failoverLatchChange) {
            _failoverLatch = failoverLatch;
        }
    }

    /** @return the connection this handler serves */
    public AMQConnection getConnection() {
        return _connection;
    }

    /** @return the current state manager */
    public AMQStateManager getStateManager() {
        return _stateManager;
    }

    /**
     * Replaces the state manager and binds it to this handler's protocol session.
     *
     * @param stateManager the new state manager
     */
    public void setStateManager(AMQStateManager stateManager) {
        _stateManager = stateManager;
        _stateManager.setProtocolSession(_protocolSession);
    }

    /** @return the protocol session owned by this handler */
    public AMQProtocolSession getProtocolSession() {
        return _protocolSession;
    }

    /** @return the current failover state */
    synchronized FailoverState getFailoverState() {
        return _failoverState;
    }

    /**
     * @param failoverState the new failover state
     */
    public synchronized void setFailoverState(FailoverState failoverState) {
        _failoverState = failoverState;
    }

    /** @return the method registry of the protocol session */
    public MethodRegistry getMethodRegistry() {
        return _protocolSession.getMethodRegistry();
    }

    /** @return the protocol version of the protocol session */
    public ProtocolVersion getProtocolVersion() {
        return _protocolSession.getProtocolVersion();
    }

    /** @return the remote address of the network connection */
    public SocketAddress getRemoteAddress() {
        return _network.getRemoteAddress();
    }

    /** @return the local address of the network connection */
    public SocketAddress getLocalAddress() {
        return _network.getLocalAddress();
    }

    /**
     * Installs the network connection, using the connection's own sender.
     *
     * @param network the network connection to use
     */
    public void setNetworkConnection(NetworkConnection network) {
        setNetworkConnection(network, network.getSender());
    }

    /**
     * Installs the network connection and the sender, and hands the sender to the protocol session.
     *
     * @param network the network connection to use
     * @param sender the sender that outgoing frames are written to
     */
    public void setNetworkConnection(NetworkConnection network, ByteBufferSender sender) {
        _network = network;
        _sender = sender;
        _protocolSession.setSender(sender);
    }

    /** @return the {@link System#currentTimeMillis()} value recorded by the most recent {@link #received(ByteBuffer)} call */
    @Override
    public long getLastReadTime() {
        return _lastReadTime;
    }

    /** @return the {@link System#currentTimeMillis()} value recorded by the most recent frame write */
    @Override
    public long getLastWriteTime() {
        return _lastWriteTime;
    }

    /** @return the sender that outgoing frames are written to */
    protected ByteBufferSender getSender() {
        return _sender;
    }

    /** @return the network connection currently in use */
    public NetworkConnection getNetworkConnection() {
        return _network;
    }

    /** @return the protocol version suggested by the peer, or {@code null} if none was received */
    public ProtocolVersion getSuggestedProtocolVersion() {
        return _suggestedProtocolVersion;
    }

    /**
     * @param heartbeatListener the listener to notify of heartbeats; {@code null} restores
     *        {@link HeartbeatListener#DEFAULT}
     */
    public void setHeartbeatListener(HeartbeatListener heartbeatListener) {
        _heartbeatListener = heartbeatListener == null ? HeartbeatListener.DEFAULT : heartbeatListener;
    }

    /** Notifies the heartbeat listener that a heartbeat was received. */
    public void heartbeatBodyReceived() {
        _heartbeatListener.heartbeatReceived();
    }

    /**
     * Sets the maximum frame size on the decoder. A value of {@code 0}, or any value above
     * {@link Integer#MAX_VALUE}, is mapped to {@link Integer#MAX_VALUE}.
     *
     * @param frameMax the maximum frame size
     */
    public void setMaxFrameSize(long frameMax) {
        final boolean useMaximum = frameMax == 0 || frameMax > Integer.MAX_VALUE;
        _decoder.setMaxFrameSize(useMaximum ? Integer.MAX_VALUE : (int) frameMax);
    }

    /**
     * Creates the decoder and initialises the protocol session with the given settings.
     *
     * @param settings the connection settings
     */
    public void init(ConnectionSettings settings) {
        _decoder = new ClientDecoder(_protocolSession.getMethodProcessor());
        _protocolSession.init(settings);
    }

    /** @return the default timeout used for synchronous operations */
    public long getDefaultTimeout() {
        return DEFAULT_SYNC_TIMEOUT;
    }
}
