/*
 * Decompiled with CFR 0.152.
 */
package com.solacesystems.jcsmp.impl;

import com.solacesystems.jcsmp.InvalidOperationException;
import com.solacesystems.jcsmp.JCSMPException;
import com.solacesystems.jcsmp.JCSMPProducerEventHandler;
import com.solacesystems.jcsmp.JCSMPProperties;
import com.solacesystems.jcsmp.JCSMPStreamingPublishEventHandler;
import com.solacesystems.jcsmp.ProducerEvent;
import com.solacesystems.jcsmp.ProducerFlowProperties;
import com.solacesystems.jcsmp.impl.ADManager;
import com.solacesystems.jcsmp.impl.ContextImpl;
import com.solacesystems.jcsmp.impl.JCSMPXMLMessage;
import com.solacesystems.jcsmp.impl.JCSMPXMLMessageProducer;
import com.solacesystems.jcsmp.impl.MsgIdInfo;
import com.solacesystems.jcsmp.impl.TransactionSizeExceededException;
import com.solacesystems.jcsmp.impl.flow.ProducerEventArgsImpl;
import com.solacesystems.jcsmp.impl.queues.ConditionalBoundedMessageQueue;
import com.solacesystems.jcsmp.impl.queues.ProcessElementsTask;
import com.solacesystems.jcsmp.impl.timers.PubRetransmitTimedTask;
import com.solacesystems.jcsmp.protocol.SeqNumAllocator;
import com.solacesystems.jcsmp.protocol.impl.SeqNum63bAllocator;
import com.solacesystems.jcsmp.protocol.nio.impl.ProducerErrorNotification;
import com.solacesystems.jcsmp.protocol.nio.impl.ProducerResponseNotification;
import com.solacesystems.jcsmp.statistics.StatType;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.Callable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class PubADManager
extends ADManager {
    // Declared here but neither read nor written in the visible part of this class.
    protected static int instanceCount = 0;
    // Delay handed to the timer queue in startADTimer(); filled from the "pub_ack_time"
    // property by getNewADManager(). -1 until configured. (Presumably milliseconds — confirm.)
    public int pub_Ack_Time = -1;
    // Capacity passed to the unacked-message queue in initMessageQueue(); assigned through
    // setPub_Ack_Window_Size(). -1 until configured.
    public int pub_Ack_Window_Size = -1;
    // Window size taken from ProducerFlowProperties.getWindowSize() in getNewADManager().
    public int configured_Pub_Ack_Window_Size = -1;
    // Filled from the "max_resends" property by getNewADManager(); not read in this class.
    public int max_Resends = -1;
    // Filled from ProducerFlowProperties.isRtrWindowedAck() by getNewADManager().
    public boolean rtr_Windowed_Ack = true;
    // Ack event mode handed to ProcessWindowedAckTask, which compares it against
    // "SUPPORTED_ACK_EVENT_MODE_WINDOWED".
    public String ack_Event_Mode = "SUPPORTED_ACK_EVENT_MODE_PER_MSG";
    // Flow id used in log/event text; -1 until assigned by code outside this class.
    public volatile long flow_Id = -1L;
    // Not referenced in the visible part of this class; -1 until assigned elsewhere.
    public volatile long pub_Id = -1L;
    // Cleared to null by resetAdFlow().
    public String flow_Name = null;
    // Last non-negative ack id passed to processWindowedAck(); written only, never read here.
    private long _dbg_lastMsgIdAck = 0L;
    // Source of message ids for published messages (see setMessageIdParamsOnPubMessage()).
    public volatile SeqNumAllocator idAllocator = new SeqNum63bAllocator("PubFlow");
    // Class logger.
    private static final Log Trace = LogFactory.getLog(PubADManager.class);
    // Queue of published messages awaiting acknowledgement; created lazily by initMessageQueue().
    ConditionalBoundedMessageQueue _msgQueue;
    // Producer this manager serves; set once in the constructor.
    JCSMPXMLMessageProducer _producer;
    // Task scheduled by startADTimer(); (re)created in reInit().
    PubRetransmitTimedTask _pubAckTask;

    /**
     * Creates a publisher AD manager bound to the given producer.
     *
     * @param producer the message producer this manager serves
     * @param context  the context forwarded to the {@code ADManager} superclass
     */
    public PubADManager(JCSMPXMLMessageProducer producer, ContextImpl context) {
        super(context);
        _producer = producer;
        // The first value drawn from the allocator is deliberately discarded.
        idAllocator.getNext63b();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Re-initializes this manager: runs the superclass re-initialization, clears an existing
     * ack timer while holding the ack-timer lock, and installs a fresh retransmit task.
     */
    @Override
    public void reInit() {
        super.reInit();
        if (Trace.isDebugEnabled()) {
            Trace.debug("PUBADMGR_ReInit");
        }
        synchronized (_ackTimerLock) {
            // Only clear when a timer object actually exists.
            if (_ackTimer != null) {
                clearADTimer();
            }
        }
        _pubAckTask = new PubRetransmitTimedTask(this);
    }

    /**
     * @return the producer this manager was constructed with
     */
    public JCSMPXMLMessageProducer getMessageProducer() {
        return _producer;
    }

    /**
     * Sets the publisher ack window size.
     *
     * <p>When the producer is transacted and its transacted session is not XA, the requested
     * value is ignored and the window is fixed at 256.
     *
     * @param pub_Ack_Window_Size the requested window size
     */
    public void setPub_Ack_Window_Size(int pub_Ack_Window_Size) {
        boolean nonXaTransacted = this._producer.isTransacted() && !this._producer.getTransactedSession().isXA();
        if (nonXaTransacted) {
            this.pub_Ack_Window_Size = 256;
        } else {
            this.pub_Ack_Window_Size = pub_Ack_Window_Size;
        }
    }

    /**
     * Resets the flow bookkeeping: clears the flow name and zeroes the start, last-sent and
     * last-acked message ids.
     */
    public void resetAdFlow() {
        flow_Name = null;
        startMessageId = 0L;
        lastMessageIdSent = 0L;
        lastMessageIdAcked = 0L;
    }

    /**
     * @return the current publisher ack window size ({@code -1} if never set)
     */
    public int getPub_Ack_Window_Size() {
        return pub_Ack_Window_Size;
    }

    /**
     * Stores the windowed-ack flag.
     *
     * @param rtr_Windowed_Ack the new flag value
     */
    public void setRtr_Windowed_Ack(boolean rtr_Windowed_Ack) {
        // Parameter shadows the field, so the qualifier is required.
        this.rtr_Windowed_Ack = rtr_Windowed_Ack;
    }

    /**
     * @return the stored windowed-ack flag
     */
    public boolean isRtr_Windowed_Ack() {
        return rtr_Windowed_Ack;
    }

    /**
     * Stamps a message with the next id from the allocator.
     *
     * <p>The message receives the new id, has its "new message id required" flag cleared, and
     * records the manager's current last-sent id as its previous message id.
     *
     * @param message the message to stamp
     * @return the id that was assigned to the message
     */
    public long setMessageIdParamsOnPubMessage(JCSMPXMLMessage message) {
        final long assignedId = idAllocator.getNext63b();
        message.setMessageIdLong(assignedId);
        message.setNewMsgIdRequired(false);
        message.setPrevMessageId(lastMessageIdSent);
        return assignedId;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Renumbers the queued (unacked) messages relative to the last id the peer acknowledged.
     *
     * <p>If a message queue exists, it is asked to renumber its messages starting from
     * {@code respLastIdAcked}. When at least one message was renumbered, a
     * {@code REPUBLISH_UNACKED_MESSAGES} event is delivered to the producer event handler
     * (if one is registered) and the same text is logged at info level.
     *
     * <p>Regardless of whether the above throws, the last-sent id is set to
     * {@code respLastIdAcked + numMsgs} and the id allocator is positioned one past that.
     *
     * @param respLastIdAcked the last message id acknowledged according to the response
     */
    public void renumberMessageIdParamsOnPubMessages(long respLastIdAcked) {
        int numMsgs = 0;
        if (Trace.isDebugEnabled()) {
            // Plain concatenation: the text must not be interpreted as a format string.
            Trace.debug((Object)("AD pub flow message renumbering: flowId=" + this.flow_Id + "; respLastIdAcked=" + respLastIdAcked));
        }
        try {
            if (this._msgQueue != null) {
                numMsgs = this._msgQueue.msgIdRenumbering(respLastIdAcked);
            }
            if (numMsgs > 0) {
                JCSMPProducerEventHandler eventHandler = this._producer.getProducerEventHandler();
                String infoStr = "Unknown Publisher Flow (flowId=" + this.flow_Id + ") recovered: " + numMsgs + " messages renumbered and resent (lastMessageIdSent =" + respLastIdAcked + ")";
                if (eventHandler != null) {
                    ProducerEventArgsImpl event = new ProducerEventArgsImpl(ProducerEvent.REPUBLISH_UNACKED_MESSAGES, infoStr, null, 0, numMsgs);
                    eventHandler.handleEvent(event);
                }
                if (Trace.isInfoEnabled()) {
                    Trace.info((Object)infoStr);
                }
            }
        }
        finally {
            // Always resynchronize the id bookkeeping, even if the handler callback threw.
            this.lastMessageIdSent = respLastIdAcked + (long)numMsgs;
            this.idAllocator.setToNoCheck(respLastIdAcked + (long)numMsgs + 1L);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Schedules the publisher ack timer if it is not already active.
     *
     * <p>The timer is only started when the producer has no transacted session, or when that
     * session is XA and expects acks; otherwise this method returns immediately. Scheduling
     * happens under the ack-timer lock: a missing timer is created, an inactive one is
     * rescheduled by passing it back to the timer queue, and an active one is left alone.
     *
     * <p>The debug-enabled flag is sampled once so the debug message is built and emitted
     * consistently even if the log level changes while this method runs.
     */
    @Override
    public void startADTimer() {
        boolean timerApplies = this._producer.getTransactedSession() == null
                || (this._producer.getTransactedSession().isXA() && this._producer.getTransactedSession().getExpectsAcks());
        if (!timerApplies) {
            return;
        }
        this.validateIsInitialized();
        final boolean debug = Trace.isDebugEnabled();
        String logmsg = "Starting pub ad timer: ";
        synchronized (this._ackTimerLock) {
            if (this._ackTimer == null || !this._ackTimer.isActive()) {
                if (this._ackTimer == null) {
                    this._ackTimer = this._timerQueue.schedule_relative(this.pub_Ack_Time, this._pubAckTask);
                } else {
                    // Reuse the existing (inactive) timer object.
                    this._ackTimer = this._timerQueue.schedule_relative(this.pub_Ack_Time, this._pubAckTask, this._ackTimer);
                }
                if (debug) {
                    logmsg = logmsg + "scheduled new timer in " + this.pub_Ack_Time;
                }
            } else if (debug) {
                logmsg = logmsg + "already scheduled in " + (this._ackTimer.getTimeout() - System.currentTimeMillis()) + ", do nothing";
            }
        }
        if (debug) {
            Trace.debug((Object)logmsg);
        }
    }

    /**
     * Creates the unacked-message queue on first use and activates it.
     *
     * <p>The queue is sized with the current ack window size. For a transacted, non-XA
     * producer the queue is additionally given a callback that throws
     * {@code TransactionSizeExceededException}; otherwise no callback is supplied.
     * (When the queue invokes that callback is decided by the queue class, not visible here.)
     */
    public void initMessageQueue() {
        validateIsInitialized();
        if (_msgQueue == null) {
            if (Trace.isDebugEnabled()) {
                Trace.debug(String.format("Init message queue: size=%s", pub_Ack_Window_Size));
            }
            Callable<Object> actionOnEnqueueOnClose = null;
            if (_producer.isTransacted() && !_producer.getTransactedSession().isXA()) {
                actionOnEnqueueOnClose = new Callable<Object>() {

                    @Override
                    public Object call() throws Exception {
                        throw new TransactionSizeExceededException("Too many messages in transaction.");
                    }
                };
            }
            _msgQueue = new ConditionalBoundedMessageQueue(pub_Ack_Window_Size, actionOnEnqueueOnClose);
        }
        // Activation happens on every call, including when the queue already existed.
        _msgQueue.activate();
    }

    /**
     * Deactivates the message queue and returns every queued message to the pool.
     *
     * <p>Does nothing beyond the initialization check when no queue exists. A
     * {@code JCSMPException} raised while draining is logged as a warning and not rethrown.
     */
    public void clearMessageQueue() {
        validateIsInitialized();
        if (_msgQueue == null) {
            return;
        }
        _msgQueue.deactivate();
        ReturnUnackedMgsToPoolTask drainTask = new ReturnUnackedMgsToPoolTask(_msgQueue);
        try {
            int returned = _msgQueue.processElements(drainTask);
            if (Trace.isDebugEnabled()) {
                Trace.debug(String.format("Return %s AD messages to pool", returned));
            }
        } catch (JCSMPException ex) {
            Trace.warn("Unexpected exception occurred while returning AD msgs to pool", ex);
        }
    }

    /**
     * Suspends the message queue if one has been created.
     */
    public void suspendMsgQueue() {
        validateIsInitialized();
        if (_msgQueue == null) {
            return;
        }
        _msgQueue.suspend();
    }

    /**
     * Resumes the message queue if one has been created.
     */
    public void resumeMsgQueue() {
        validateIsInitialized();
        if (_msgQueue == null) {
            return;
        }
        _msgQueue.resume();
    }

    /**
     * Processes a windowed acknowledgement up to and including {@code msgId}.
     *
     * <p>A negative id is rejected immediately. Otherwise the id is recorded for debugging and
     * a {@code ProcessWindowedAckTask} is run against the message queue.
     *
     * @param msgId the acknowledged message id
     * @return {@code true} if the task reported processing at least one message;
     *         {@code false} for a negative id or when nothing was processed
     * @throws JCSMPException if processing the queue elements fails
     */
    public boolean processWindowedAck(long msgId) throws JCSMPException {
        validateIsInitialized();
        if (msgId < 0L) {
            return false;
        }
        _dbg_lastMsgIdAck = msgId;
        if (Trace.isDebugEnabled()) {
            Trace.debug(String.format("Processing windowed ack ackid=%s, ackEventCode=%s", msgId, ack_Event_Mode));
        }
        JCSMPStreamingPublishEventHandler streamingHandler = null;
        try {
            streamingHandler = _producer.getStreamingCallbackHandler();
        } catch (InvalidOperationException ignored) {
            // Deliberately ignored: the task is then run with a null handler.
        }
        ProcessWindowedAckTask task = new ProcessWindowedAckTask(_msgQueue, _producer, streamingHandler, msgId, ack_Event_Mode);
        return _msgQueue.processElements(task) > 0;
    }

    /**
     * Processes an error acknowledgement for {@code msgId}.
     *
     * <p>All messages before {@code msgId} are first acknowledged normally (via
     * {@code processWindowedAck(msgId - 1)}), then a {@code ProcessWindowedAckErrorTask}
     * is run against the message queue for {@code msgId} itself.
     *
     * @param msgId the id of the message that was rejected
     * @param e     the exception describing the rejection
     * @return {@code true} if the error task reported a non-zero count
     * @throws JCSMPException if processing the queue elements fails
     */
    public boolean processWindowedAckError(long msgId, JCSMPException e) throws JCSMPException {
        validateIsInitialized();
        if (Trace.isDebugEnabled()) {
            Trace.debug(String.format("Processing windowed error ackid=%s, ackEventCode=%s", msgId, ack_Event_Mode));
        }
        processWindowedAck(msgId - 1L);
        JCSMPStreamingPublishEventHandler streamingHandler = null;
        try {
            streamingHandler = _producer.getStreamingCallbackHandler();
        } catch (InvalidOperationException ignored) {
            // Deliberately ignored: the task is then run with a null handler.
        }
        ProcessWindowedAckErrorTask errorTask = new ProcessWindowedAckErrorTask(_msgQueue, _producer, streamingHandler, msgId, e);
        return _msgQueue.processElements(errorTask) > 0;
    }

    /**
     * Processes an error acknowledgement for {@code msgId} on a transacted flow.
     *
     * <p>Messages before {@code msgId} are acknowledged normally, then a
     * {@code ProcessTransactedWindowedAckErrorTask} is run with the current last-sent id as
     * its upper bound. The exception argument is not used by this method.
     *
     * @param msgId the id of the message that was rejected
     * @param e     the exception describing the rejection (unused here)
     * @return {@code true} if the task removed at least one message
     * @throws JCSMPException if processing the queue elements fails
     */
    public boolean transactedProcessWindowedAckError(long msgId, JCSMPException e) throws JCSMPException {
        validateIsInitialized();
        if (Trace.isDebugEnabled()) {
            Trace.debug(String.format("Processing windowed error ackid=%s, ackEventCode=%s", msgId, ack_Event_Mode));
        }
        processWindowedAck(msgId - 1L);
        ProcessTransactedWindowedAckErrorTask errorTask = new ProcessTransactedWindowedAckErrorTask(_msgQueue, lastMessageIdSent);
        final int removed = _msgQueue.processElements(errorTask);
        if (Trace.isDebugEnabled()) {
            Trace.debug("Message queue is empty: " + _msgQueue.isEmpty());
        }
        return removed > 0;
    }

    /**
     * Asks the producer to retransmit AD messages immediately, passing {@code null} and
     * {@code false} to {@code handleRetransmitADMsgs}.
     */
    public void doRetransmitNow() {
        validateIsInitialized();
        _producer.handleRetransmitADMsgs(null, false);
    }

    /**
     * Enqueues a message on the unacked-message queue, letting the queue update the message's
     * id through this manager.
     *
     * <p>If the calling thread is interrupted while enqueueing, the interruption is logged as a
     * warning and the thread's interrupt status is restored so that callers further up the
     * stack can still observe it.
     *
     * @param msg the message to enqueue
     * @throws InvalidOperationException as declared by the original signature
     */
    public void enqueueMsgWithIdUpdate(JCSMPXMLMessage msg) throws InvalidOperationException {
        this.validateIsInitialized();
        try {
            this._msgQueue.queueMsgWithIdUpdate(msg, this);
        }
        catch (InterruptedException e) {
            Trace.warn((Object)e);
            // Catching InterruptedException clears the flag; put it back.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * @return whether the message queue reports itself as full
     */
    public boolean isQueueFull() {
        validateIsInitialized();
        final boolean full = _msgQueue.isFull();
        return full;
    }

    /**
     * @return whether the message queue reports itself as empty
     */
    public boolean isQueueEmpty() {
        validateIsInitialized();
        final boolean empty = _msgQueue.isEmpty();
        return empty;
    }

    /**
     * Delegates to the message queue's {@code waitUntilEmpty()}.
     *
     * @throws InterruptedException if the queue's wait is interrupted
     */
    public void waitUntilQueueEmpty() throws InterruptedException {
        validateIsInitialized();
        _msgQueue.waitUntilEmpty();
    }

    /**
     * @return the size currently reported by the message queue
     */
    public int getQueueUsedSize() {
        validateIsInitialized();
        final int used = _msgQueue.size();
        return used;
    }

    /**
     * Copies the queued (unacked) messages into the supplied list.
     *
     * <p>Does nothing when no message queue exists.
     *
     * @param adMsgs destination list handed to the queue's {@code copyTo}
     */
    public void copyUnackedADMsgs(LinkedList<JCSMPXMLMessage> adMsgs) {
        if (_msgQueue == null) {
            return;
        }
        final int copied = _msgQueue.copyTo(adMsgs);
        if (Trace.isDebugEnabled()) {
            Trace.debug(String.format("Copied %s AD messages from unacked list", copied));
        }
    }

    /**
     * Creates and configures a new {@code PubADManager}.
     *
     * <p>The ack time and max resends come from the {@code "pub_ack_time"} and
     * {@code "max_resends"} session properties; the configured window size, windowed-ack flag
     * and ack event mode come from the flow properties. Note that only
     * {@code configured_Pub_Ack_Window_Size} is assigned here; {@code pub_Ack_Window_Size}
     * keeps its default until {@code setPub_Ack_Window_Size} is called, so the debug line
     * below prints that default.
     *
     * <p>Missing properties are only checked by an {@code assert}; with assertions disabled a
     * missing value surfaces as a {@code NullPointerException} on unboxing.
     *
     * @param producer the producer the manager will serve
     * @param props    session properties supplying ack time and max resends
     * @param context  context forwarded to the manager's constructor
     * @param fprop    producer flow properties supplying window size, windowed-ack flag and ack mode
     * @return the configured manager
     */
    public static PubADManager getNewADManager(JCSMPXMLMessageProducer producer, JCSMPProperties props, ContextImpl context, ProducerFlowProperties fprop) {
        PubADManager set = new PubADManager(producer, context);
        Integer iPubAckTime = props.getIntegerProperty("pub_ack_time");
        Integer iPubAckWindowSz = fprop.getWindowSize();
        Integer iMaxResends = props.getIntegerProperty("max_resends");
        assert (iPubAckTime != null && iPubAckWindowSz != null && iMaxResends != null) : "Property error: missing windowed ack property.";
        set.pub_Ack_Time = iPubAckTime;
        set.configured_Pub_Ack_Window_Size = iPubAckWindowSz;
        set.rtr_Windowed_Ack = fprop.isRtrWindowedAck();
        set.max_Resends = iMaxResends;
        set.ack_Event_Mode = fprop.getAckEventMode();
        // Guarded like every other debug statement in this class, so the message is only
        // formatted when it will actually be logged.
        if (Trace.isDebugEnabled()) {
            Trace.debug((Object)("Created Pub AD Manager: " + set.toString()));
        }
        return set;
    }

    /**
     * @return a one-line summary of the ack time, ack window size, max resends and ack event mode
     */
    @Override
    public String toString() {
        return String.format("Pub_Ack_Time=%s  Pub_Ack_Window_Size=%s  Max_Resends=%s Ack_Event_Mode=%s", pub_Ack_Time, pub_Ack_Window_Size, max_Resends, ack_Event_Mode);
    }

    /**
     * Queue task that drains a message queue, returning each polled message to its pool.
     */
    public static class ReturnUnackedMgsToPoolTask
    implements ProcessElementsTask {
        private final ConditionalBoundedMessageQueue queue;

        /**
         * @param queue the queue this task drains
         */
        public ReturnUnackedMgsToPoolTask(ConditionalBoundedMessageQueue queue) {
            this.queue = queue;
        }

        /**
         * @return the queue this task operates on
         */
        public ConditionalBoundedMessageQueue getQueueToProcess() {
            return queue;
        }

        /**
         * Polls the queue until it reports empty, returning every non-null message to the pool.
         *
         * @return the number of messages returned to the pool
         * @throws JCSMPException as declared by the original signature
         */
        public int process() throws JCSMPException {
            int returned = 0;
            while (!queue.isEmpty()) {
                JCSMPXMLMessage polled = queue.poll();
                if (polled != null) {
                    polled.returnMessageToPool();
                    returned++;
                }
            }
            return returned;
        }
    }

    /**
     * Queue task for transacted flows that removes every leading message whose id is at most
     * a given bound and releases it.
     */
    public class ProcessTransactedWindowedAckErrorTask
    implements ProcessElementsTask {
        private final ConditionalBoundedMessageQueue queue;
        private final long msgIdToAck;

        /**
         * @param queue      the queue to process
         * @param msgIdToAck inclusive upper bound on the ids to remove
         */
        public ProcessTransactedWindowedAckErrorTask(ConditionalBoundedMessageQueue queue, long msgIdToAck) {
            this.queue = queue;
            this.msgIdToAck = msgIdToAck;
        }

        /**
         * @return the queue this task operates on
         */
        public ConditionalBoundedMessageQueue getQueueToProcess() {
            return queue;
        }

        /**
         * Walks the queue from its head, removing messages until one has id {@code -1} or an
         * id greater than the bound. A removed message that is being retransmitted is only
         * marked safe to release; any other is given its release callout.
         *
         * @return the number of messages removed
         * @throws JCSMPException as declared by the original signature
         */
        public int process() throws JCSMPException {
            int removed = 0;
            Iterator<JCSMPXMLMessage> it = queue.iterator();
            while (it.hasNext()) {
                JCSMPXMLMessage current = it.next();
                long currentId = current.getMessageIdLong();
                if (currentId == -1L || currentId > msgIdToAck) {
                    // Stop at the first message outside the acknowledged range.
                    break;
                }
                it.remove();
                if (current.isRetransmitting()) {
                    current.setSafeToRelease(true);
                } else {
                    current.callout_ad_release_opportunity();
                }
                removed++;
            }
            return removed;
        }
    }

    public class ProcessWindowedAckErrorTask
    implements ProcessElementsTask {
        private ConditionalBoundedMessageQueue queue;
        private JCSMPXMLMessageProducer producer;
        private JCSMPStreamingPublishEventHandler spHandler;
        private long msgIdToAck = -1L;
        private JCSMPException e;

        public ProcessWindowedAckErrorTask(ConditionalBoundedMessageQueue queue, JCSMPXMLMessageProducer producer, JCSMPStreamingPublishEventHandler spHandler, long msgIdToAck, JCSMPException e) {
            this.queue = queue;
            this.producer = producer;
            this.spHandler = spHandler;
            this.msgIdToAck = msgIdToAck;
            this.e = e;
        }

        public ConditionalBoundedMessageQueue getQueueToProcess() {
            return this.queue;
        }

        public int process() throws JCSMPException {
            JCSMPXMLMessage msg = this.queue.peek();
            long curMsgId = -1L;
            if (msg != null && (curMsgId = msg.getMessageIdLong()) != -1L) {
                if (curMsgId == this.msgIdToAck) {
                    Object corrKey = msg.getCorrelationKey();
                    this.queue.remove(msg);
                    this.producer.getSessionStats().incStat(StatType.RELIABLE_MSGS_SENT_CONFIRMED);
                    if (Trace.isDebugEnabled()) {
                        Trace.debug((Object)String.format("Windowed error for: msg=%s", curMsgId));
                    }
                    if (!msg.isRetransmitting()) {
                        msg.callout_ad_release_opportunity();
                    } else {
                        msg.setSafeToRelease(true);
                    }
                    if (this.spHandler != null) {
                        if (this.producer.getTransactedSession() == null) {
                            PubADManager.this.context.getProducerDispatcher().enqueueNotification(new ProducerErrorNotification(this.spHandler, new MsgIdInfo(this.msgIdToAck, corrKey), this.e, System.currentTimeMillis(), this.producer, true));
                        }
                    } else {
                        throw this.e;
                    }
                }
                return 1;
            }
            return 0;
        }
    }

    /**
     * Queue task that acknowledges every leading message whose id is at most a given id and
     * collects the corresponding notifications for the streaming publish handler.
     */
    public class ProcessWindowedAckTask
    implements ProcessElementsTask {
        private ConditionalBoundedMessageQueue queue;
        private JCSMPXMLMessageProducer producer;
        private JCSMPStreamingPublishEventHandler spHandler;
        private long msgIdToAck = -1L;
        private String ackEventMode;

        /**
         * @param queue        the queue to process
         * @param producer     the producer whose stats and transacted session are consulted
         * @param spHandler    streaming publish handler to notify, or {@code null}
         * @param msgIdToAck   inclusive upper bound on the ids to acknowledge
         * @param ackEventMode ack event mode; {@code null} is treated like any non-windowed mode
         */
        public ProcessWindowedAckTask(ConditionalBoundedMessageQueue queue, JCSMPXMLMessageProducer producer, JCSMPStreamingPublishEventHandler spHandler, long msgIdToAck, String ackEventMode) {
            this.queue = queue;
            this.producer = producer;
            this.spHandler = spHandler;
            this.msgIdToAck = msgIdToAck;
            this.ackEventMode = ackEventMode;
        }

        /**
         * @return the queue this task operates on
         */
        public ConditionalBoundedMessageQueue getQueueToProcess() {
            return this.queue;
        }

        /**
         * Walks the queue from its head, removing messages until one has id {@code -1} or an
         * id greater than the acknowledged id. With a handler present, each removed message
         * is added to the notification, except in windowed mode where only the message whose
         * id equals the acknowledged id is added. The notification is dispatched only for
         * non-transacted producers, and the confirmed-messages stat is increased by the
         * number of messages removed.
         *
         * @return the number of messages removed
         * @throws JCSMPException as declared by the original signature
         */
        public int process() throws JCSMPException {
            int count = 0;
            if (Trace.isDebugEnabled()) {
                Trace.debug((Object)String.format("Windowed ack for: msg=%s", this.msgIdToAck));
            }
            // Constant-first equals: a null mode must not raise NullPointerException mid-iteration.
            final boolean windowedMode = "SUPPORTED_ACK_EVENT_MODE_WINDOWED".equals(this.ackEventMode);
            ProducerResponseNotification notif = new ProducerResponseNotification(this.spHandler, this.producer);
            Iterator<JCSMPXMLMessage> iter = this.queue.iterator();
            while (iter.hasNext()) {
                JCSMPXMLMessage msg = iter.next();
                long curMsgId = msg.getMessageIdLong();
                if (curMsgId == -1L || curMsgId > this.msgIdToAck) {
                    // Stop at the first message outside the acknowledged range.
                    break;
                }
                iter.remove();
                if (this.spHandler != null && (!windowedMode || curMsgId == this.msgIdToAck)) {
                    notif.addMsgInfo(new MsgIdInfo(curMsgId, msg.getCorrelationKey()));
                }
                if (!msg.isRetransmitting()) {
                    msg.callout_ad_release_opportunity();
                } else {
                    msg.setSafeToRelease(true);
                }
                ++count;
            }
            if (this.spHandler != null && this.producer.getTransactedSession() == null) {
                PubADManager.this.context.getProducerDispatcher().enqueueNotification(notif);
            }
            this.producer.getSessionStats().incStat(StatType.RELIABLE_MSGS_SENT_CONFIRMED, count);
            return count;
        }
    }
}

