/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.shaded.org.jgroups.protocols;

import java.io.DataInput;
import java.io.DataOutput;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.activemq.artemis.shaded.org.jgroups.Address;
import org.apache.activemq.artemis.shaded.org.jgroups.Event;
import org.apache.activemq.artemis.shaded.org.jgroups.Header;
import org.apache.activemq.artemis.shaded.org.jgroups.Message;
import org.apache.activemq.artemis.shaded.org.jgroups.View;
import org.apache.activemq.artemis.shaded.org.jgroups.annotations.MBean;
import org.apache.activemq.artemis.shaded.org.jgroups.annotations.ManagedAttribute;
import org.apache.activemq.artemis.shaded.org.jgroups.annotations.ManagedOperation;
import org.apache.activemq.artemis.shaded.org.jgroups.annotations.Property;
import org.apache.activemq.artemis.shaded.org.jgroups.conf.PropertyConverters;
import org.apache.activemq.artemis.shaded.org.jgroups.stack.Protocol;
import org.apache.activemq.artemis.shaded.org.jgroups.util.AgeOutCache;
import org.apache.activemq.artemis.shaded.org.jgroups.util.Bits;
import org.apache.activemq.artemis.shaded.org.jgroups.util.Filter;
import org.apache.activemq.artemis.shaded.org.jgroups.util.MessageBatch;
import org.apache.activemq.artemis.shaded.org.jgroups.util.SeqnoList;
import org.apache.activemq.artemis.shaded.org.jgroups.util.Table;
import org.apache.activemq.artemis.shaded.org.jgroups.util.TimeScheduler;
import org.apache.activemq.artemis.shaded.org.jgroups.util.Util;

@Deprecated
@MBean(description="Reliable unicast layer")
public class UNICAST2
extends Protocol
implements AgeOutCache.Handler<Address> {
    /** Constant 1; the message paths in this class compare sequence numbers against the literal {@code 1L}. */
    public static final long DEFAULT_FIRST_SEQNO = 1L;

    // ---------------------------------------------------------------- configuration properties
    /** Deprecated timeout list; returned by {@link #getTimeout()}, while {@link #setTimeout(int[])} ignores its argument. */
    @Deprecated
    protected int[] timeout = new int[]{400, 800, 1600, 3200};
    @Deprecated
    @Property(description="The first value (in milliseconds) to use in the exponential backoff. Enabled if greater than 0", deprecatedMessage="Not used anymore")
    protected int exponential_backoff = 300;
    @Property(description="Max number of messages to be removed from a NakReceiverWindow. This property might get removed anytime, so don't use it !")
    protected int max_msg_batch_size = 500;
    /** Validated in {@link #init()}: must be greater than 0. */
    @Property(description="Max number of bytes before a stability message is sent to the sender")
    protected long max_bytes = 10000000L;
    /** The periodic stable task is only started in {@link #start()} when this is greater than 0. */
    @Property(description="Max number of milliseconds before a stability message is sent to the sender(s)")
    protected long stable_interval = 60000L;
    /** Validated in {@link #init()}: must be at least 1. */
    @Property(description="Max number of STABLE messages sent for the same highest_received seqno. A value < 1 is invalid")
    protected int max_stable_msgs = 5;
    @Property(description="Number of rows of the matrix in the retransmission table (only for experts)", writable=false)
    protected int xmit_table_num_rows = 100;
    @Property(description="Number of elements of a row of the matrix in the retransmission table (only for experts). The capacity of the matrix is xmit_table_num_rows * xmit_table_msgs_per_row", writable=false)
    protected int xmit_table_msgs_per_row = 2000;
    @Property(description="Resize factor of the matrix in the retransmission table (only for experts)", writable=false)
    protected double xmit_table_resize_factor = 1.2;
    @Property(description="Number of milliseconds after which the matrix in the retransmission table is compacted (only for experts)", writable=false)
    protected long xmit_table_max_compaction_time = 600000L;
    @Deprecated
    @Property(description="If enabled, the removal of a message from the retransmission table causes an automatic purge (only for experts)", writable=false, deprecatedMessage="not used anymore")
    protected boolean xmit_table_automatic_purging = true;
    @Property(description="Whether to use the old retransmitter which retransmits individual messages or the new one which uses ranges of retransmitted messages. Default is true. Note that this property will be removed in 3.0; it is only used to switch back to the old (and proven) retransmitter mechanism if issues occur")
    protected boolean use_range_based_retransmitter = true;
    @Property(description="If true, trashes warnings about retransmission messages not found in the xmit_table (used for testing)")
    protected boolean log_not_found_msgs = true;
    /** The connection reaper is only started in {@link #start()} when this is greater than 0. */
    @Property(description="Time (in milliseconds) after which an idle incoming or outgoing connection is closed. The connection will get re-established when used again. 0 disables connection reaping")
    protected long conn_expiry_timeout = 0L;
    /** Carries no {@code @Property} here; the annotation sits on {@link #setMaxRetransmitTime(long)}. */
    protected long max_retransmit_time = 60000L;
    @Property(description="Interval (in milliseconds) at which missing messages (from all retransmit buffers) are retransmitted")
    protected long xmit_interval = 1000L;

    // ---------------------------------------------------------------- statistics
    // Plain int counters incremented with ++ on the message paths; reset in resetStats().
    @ManagedAttribute(description="Number of messages sent")
    protected int num_messages_sent = 0;
    @ManagedAttribute(description="Number of messages received")
    protected int num_messages_received = 0;

    // ---------------------------------------------------------------- runtime state
    /** Outgoing connections, keyed by destination address. */
    protected final ConcurrentMap<Address, SenderEntry> send_table = Util.createConcurrentMap();
    /** Incoming connections, keyed by sender address. */
    protected final ConcurrentMap<Address, ReceiverEntry> recv_table = Util.createConcurrentMap();
    /** Handle of the retransmit task; see {@link #isXmitTaskRunning()}. */
    protected Future<?> xmit_task;
    /** Cleared in {@link #stop()} and trimmed to the current view's members on a view change. */
    protected final Map<Address, Long> xmit_task_map = new HashMap<Address, Long>();
    protected final ReentrantLock recv_table_lock = new ReentrantLock();
    /** Current view members; replaced wholesale on each view change. */
    protected volatile List<Address> members = new ArrayList<Address>(11);
    protected Address local_addr = null;
    /** Obtained from the transport in {@link #start()}. */
    protected TimeScheduler timer = null;
    /** Set in {@link #start()}, cleared in {@link #stop()}; unicast messages sent while false are discarded. */
    protected volatile boolean running = false;
    protected short last_conn_id = 0;
    /** Only created in {@link #start()} when {@code max_retransmit_time > 0}; otherwise stays null. */
    protected AgeOutCache<Address> cache = null;
    protected Future<?> stable_task_future = null;
    protected Future<?> connection_reaper;
    /** Accepts non-null messages that carry the DONT_LOOPBACK transient flag. */
    protected static final Filter<Message> dont_loopback_filter = new Filter<Message>(){

        @Override
        public boolean accept(Message msg) {
            return msg != null && msg.isTransientFlagSet(Message.TransientFlag.DONT_LOOPBACK);
        }
    };
    // Retransmission counters; reset in resetStats().
    @ManagedAttribute(description="Number of retransmit requests received")
    protected final AtomicLong xmit_reqs_received = new AtomicLong(0L);
    @ManagedAttribute(description="Number of retransmit requests sent")
    protected final AtomicLong xmit_reqs_sent = new AtomicLong(0L);
    @ManagedAttribute(description="Number of retransmit responses sent")
    protected final AtomicLong xmit_rsps_sent = new AtomicLong(0L);

    /**
     * Returns the deprecated timeout array.
     *
     * @return the {@code timeout} field itself (no defensive copy is made)
     */
    @Deprecated
    public int[] getTimeout() {
        final int[] configured = this.timeout;
        return configured;
    }

    /**
     * Deprecated property setter kept so that a {@code timeout} property can still be supplied.
     * The body is empty: the value is discarded and the {@code timeout} field is left untouched.
     *
     * @param val ignored
     */
    @Deprecated
    @Property(name="timeout", converter=PropertyConverters.IntegerArray.class, description="list of timeouts", deprecatedMessage="not used anymore")
    public void setTimeout(int[] val) {
    }

    /**
     * Updates {@code max_msg_batch_size}. Values below 1 are silently ignored and the
     * previous setting is kept.
     *
     * @param size the new batch size; only applied when it is at least 1
     */
    public void setMaxMessageBatchSize(int size) {
        if (size < 1) {
            return;
        }
        this.max_msg_batch_size = size;
    }

    /**
     * Returns the local address as text.
     *
     * @return {@code local_addr.toString()}, or the literal string {@code "null"} when no
     *         local address has been set
     */
    @ManagedAttribute
    public String getLocalAddress() {
        if (this.local_addr == null) {
            return "null";
        }
        return this.local_addr.toString();
    }

    /**
     * Returns the current member list formatted by {@code Util.printListWithDelimiter}
     * with a comma as delimiter.
     *
     * @return the formatted member list
     */
    @ManagedAttribute
    public String getMembers() {
        final String delimiter = ",";
        return Util.printListWithDelimiter(this.members, delimiter);
    }

    /**
     * @return the number of entries currently in {@code send_table}
     */
    @ManagedAttribute(description="Returns the number of outgoing (send) connections")
    public int getNumSendConnections() {
        final int outgoing = this.send_table.size();
        return outgoing;
    }

    /**
     * @return the number of entries currently in {@code recv_table}
     */
    @ManagedAttribute(description="Returns the number of incoming (receive) connections")
    public int getNumReceiveConnections() {
        final int incoming = this.recv_table.size();
        return incoming;
    }

    /**
     * @return the sum of {@link #getNumReceiveConnections()} and {@link #getNumSendConnections()}
     */
    @ManagedAttribute(description="Returns the total number of outgoing (send) and incoming (receive) connections")
    public int getNumConnections() {
        final int incoming = this.getNumReceiveConnections();
        final int outgoing = this.getNumSendConnections();
        return incoming + outgoing;
    }

    /**
     * Builds a textual dump of all send and receive connections, one
     * {@code key: value} line per entry. A section (including its heading) is omitted
     * when the corresponding table is empty.
     *
     * @return the dump, or an empty string when both tables are empty
     */
    @ManagedOperation
    public String printConnections() {
        final StringBuilder out = new StringBuilder();
        if (!this.send_table.isEmpty()) {
            out.append("send connections:\n");
            for (Map.Entry<?, ?> conn : this.send_table.entrySet()) {
                out.append(conn.getKey());
                out.append(": ");
                out.append(conn.getValue());
                out.append("\n");
            }
        }
        if (!this.recv_table.isEmpty()) {
            out.append("\nreceive connections:\n");
            for (Map.Entry<?, ?> conn : this.recv_table.entrySet()) {
                out.append(conn.getKey());
                out.append(": ");
                out.append(conn.getValue());
                out.append("\n");
            }
        }
        return out.toString();
    }

    /**
     * Tells whether the send connection to {@code target} is flagged as established.
     *
     * @param target the destination to look up; may be null
     * @return {@code true} only when a sender entry exists for {@code target} and its
     *         {@code connEstablished()} returns true; {@code false} for a null target
     */
    public boolean connectionEstablished(Address target) {
        if (target == null) {
            return false;
        }
        SenderEntry entry = (SenderEntry)this.send_table.get(target);
        if (entry == null) {
            return false;
        }
        return entry.connEstablished();
    }

    /**
     * @return {@code true} when a connection reaper future exists and is not yet done
     */
    @ManagedAttribute(description="Whether the ConnectionReaper task is running")
    public boolean isConnectionReaperRunning() {
        if (this.connection_reaper == null) {
            return false;
        }
        return !this.connection_reaper.isDone();
    }

    /**
     * Sums {@code received_msgs.size()} over all receiver entries, skipping entries
     * whose table is null.
     *
     * @return the accumulated size
     */
    @ManagedAttribute(description="Total number of undelivered messages in all receive windows")
    public long getXmitTableUndeliveredMessages() {
        long total = 0L;
        for (ReceiverEntry rcv : this.recv_table.values()) {
            if (rcv.received_msgs != null) {
                total += (long)rcv.received_msgs.size();
            }
        }
        return total;
    }

    /**
     * Sums {@code received_msgs.getNumMissing()} over all receiver entries, skipping
     * entries whose table is null.
     *
     * @return the accumulated number of missing messages
     */
    @ManagedAttribute(description="Total number of missing messages in all receive windows")
    public long getXmitTableMissingMessages() {
        long total = 0L;
        for (ReceiverEntry rcv : this.recv_table.values()) {
            if (rcv.received_msgs != null) {
                total += (long)rcv.received_msgs.getNumMissing();
            }
        }
        return total;
    }

    /**
     * Adds up {@code getNumCompactions()} of every non-null receive table and every
     * non-null send table.
     *
     * @return the accumulated count
     */
    @ManagedAttribute(description="Number of compactions in all (receive and send) windows")
    public int getXmitTableNumCompactions() {
        int total = 0;
        for (ReceiverEntry rcv : this.recv_table.values()) {
            if (rcv.received_msgs != null) {
                total += rcv.received_msgs.getNumCompactions();
            }
        }
        for (SenderEntry snd : this.send_table.values()) {
            if (snd.sent_msgs != null) {
                total += snd.sent_msgs.getNumCompactions();
            }
        }
        return total;
    }

    /**
     * Adds up {@code getNumMoves()} of every non-null receive table and every
     * non-null send table.
     *
     * @return the accumulated count
     */
    @ManagedAttribute(description="Number of moves in all (receive and send) windows")
    public int getXmitTableNumMoves() {
        int total = 0;
        for (ReceiverEntry rcv : this.recv_table.values()) {
            if (rcv.received_msgs != null) {
                total += rcv.received_msgs.getNumMoves();
            }
        }
        for (SenderEntry snd : this.send_table.values()) {
            if (snd.sent_msgs != null) {
                total += snd.sent_msgs.getNumMoves();
            }
        }
        return total;
    }

    /**
     * Adds up {@code getNumResizes()} of every non-null receive table and every
     * non-null send table.
     *
     * @return the accumulated count
     */
    @ManagedAttribute(description="Number of resizes in all (receive and send) windows")
    public int getXmitTableNumResizes() {
        int total = 0;
        for (ReceiverEntry rcv : this.recv_table.values()) {
            if (rcv.received_msgs != null) {
                total += rcv.received_msgs.getNumResizes();
            }
        }
        for (SenderEntry snd : this.send_table.values()) {
            if (snd.sent_msgs != null) {
                total += snd.sent_msgs.getNumResizes();
            }
        }
        return total;
    }

    /**
     * Adds up {@code getNumPurges()} of every non-null receive table and every
     * non-null send table.
     *
     * @return the accumulated count
     */
    @ManagedAttribute(description="Number of purges in all (receive and send) windows")
    public int getXmitTableNumPurges() {
        int total = 0;
        for (ReceiverEntry rcv : this.recv_table.values()) {
            if (rcv.received_msgs != null) {
                total += rcv.received_msgs.getNumPurges();
            }
        }
        for (SenderEntry snd : this.send_table.values()) {
            if (snd.sent_msgs != null) {
                total += snd.sent_msgs.getNumPurges();
            }
        }
        return total;
    }

    /**
     * Dumps the receive table of every incoming connection, one line per sender,
     * preceded by a line holding the local address.
     *
     * <p>An entry whose table is null is printed as {@code n/a} instead of throwing a
     * {@link NullPointerException}; the other receive-table accessors in this class
     * likewise tolerate a null {@code received_msgs}.
     *
     * @return the formatted dump
     */
    @ManagedOperation(description="Prints the contents of the receive windows for all members")
    public String printReceiveWindowMessages() {
        StringBuilder ret = new StringBuilder(this.local_addr + ":\n");
        for (Map.Entry<Address, ReceiverEntry> entry : this.recv_table.entrySet()) {
            ReceiverEntry val = entry.getValue();
            Table<Message> buf = val != null ? val.received_msgs : null;
            ret.append(entry.getKey()).append(": ").append(buf != null ? buf.toString() : "n/a").append('\n');
        }
        return ret.toString();
    }

    /**
     * Dumps the send table of every outgoing connection, one line per destination,
     * preceded by a line holding the local address.
     *
     * <p>An entry whose table is null is printed as {@code n/a} instead of throwing a
     * {@link NullPointerException}; the other send-table accessors in this class
     * likewise tolerate a null {@code sent_msgs}.
     *
     * @return the formatted dump
     */
    @ManagedOperation(description="Prints the contents of the send windows for all members")
    public String printSendWindowMessages() {
        StringBuilder ret = new StringBuilder(this.local_addr + ":\n");
        for (Map.Entry<Address, SenderEntry> entry : this.send_table.entrySet()) {
            SenderEntry val = entry.getValue();
            Table<Message> buf = val != null ? val.sent_msgs : null;
            ret.append(entry.getKey()).append(": ").append(buf != null ? buf.toString() : "n/a").append('\n');
        }
        return ret.toString();
    }

    /**
     * @return {@code true} when a retransmit task future exists and is not yet done
     */
    @ManagedAttribute(description="Is the retransmit task running")
    public boolean isXmitTaskRunning() {
        if (this.xmit_task == null) {
            return false;
        }
        return !this.xmit_task.isDone();
    }

    /**
     * @return the current value of {@code max_retransmit_time}
     */
    public long getMaxRetransmitTime() {
        final long limit = this.max_retransmit_time;
        return limit;
    }

    /**
     * Stores the new maximum retransmit time and, when an age-out cache already exists
     * and the value is positive, forwards it to the cache as its timeout.
     *
     * @param max_retransmit_time the new value; a non-positive value is stored but not
     *                            pushed to the cache
     */
    @Property(description="Max number of milliseconds we try to retransmit a message to any given member. After that, the connection is removed. Any new connection to that member will start with seqno #1 again. 0 disables this")
    public void setMaxRetransmitTime(long max_retransmit_time) {
        this.max_retransmit_time = max_retransmit_time;
        if (this.cache == null || max_retransmit_time <= 0L) {
            return;
        }
        this.cache.setTimeout(max_retransmit_time);
    }

    /**
     * @return the size of the age-out cache, or 0 when no cache exists
     */
    @ManagedAttribute
    public int getAgeOutCacheSize() {
        if (this.cache == null) {
            return 0;
        }
        return this.cache.size();
    }

    /**
     * @return the age-out cache's {@code toString()}, or {@code "n/a"} when no cache exists
     */
    @ManagedOperation
    public String printAgeOutCache() {
        if (this.cache == null) {
            return "n/a";
        }
        return this.cache.toString();
    }

    /**
     * @return the age-out cache, which is null unless {@code start()} created one
     */
    public AgeOutCache<Address> getAgeOutCache() {
        final AgeOutCache<Address> current = this.cache;
        return current;
    }

    /**
     * @param dest the destination address to look up
     * @return {@code true} when {@code send_table} has an entry for {@code dest}
     */
    public boolean hasSendConnectionTo(Address dest) {
        final boolean present = this.send_table.containsKey(dest);
        return present;
    }

    /**
     * Zeroes the sent/received message counters and the three retransmission counters.
     */
    @Override
    public void resetStats() {
        // Chained assignment runs right-to-left: received is cleared first, then sent.
        this.num_messages_sent = this.num_messages_received = 0;
        final long zero = 0L;
        this.xmit_reqs_received.set(zero);
        this.xmit_reqs_sent.set(zero);
        this.xmit_rsps_sent.set(zero);
    }

    /**
     * @return the timer currently held by this protocol; may be null before {@code start()}
     */
    public TimeScheduler getTimer() {
        final TimeScheduler scheduler = this.timer;
        return scheduler;
    }

    /**
     * Replaces the timer held by this protocol. Note that {@code start()} assigns the
     * transport's timer to the same field, overwriting a value set here beforehand.
     *
     * @param timer the timer to store
     */
    public void setTimer(TimeScheduler timer) {
        this.timer = timer;
    }

    /**
     * Runs the superclass initialisation and then validates the configuration.
     *
     * @throws IllegalArgumentException when {@code max_stable_msgs} is below 1 or
     *                                  {@code max_bytes} is not positive
     * @throws Exception                whatever {@code super.init()} throws
     */
    @Override
    public void init() throws Exception {
        super.init();
        final boolean stableCountInvalid = this.max_stable_msgs < 1;
        if (stableCountInvalid) {
            throw new IllegalArgumentException("max_stable_msgs ( " + this.max_stable_msgs + ") must be > 0");
        }
        final boolean byteLimitInvalid = this.max_bytes <= 0L;
        if (byteLimitInvalid) {
            throw new IllegalArgumentException("max_bytes has to be > 0");
        }
    }

    /**
     * Starts the protocol: fetches the timer from the transport, optionally creates the
     * age-out cache, flags the protocol as running and schedules the background tasks.
     *
     * @throws Exception when the transport supplies no timer
     */
    @Override
    public void start() throws Exception {
        this.timer = this.getTransport().getTimer();
        if (null == this.timer) {
            throw new Exception("timer is null");
        }
        // The age-out cache only exists when a positive maximum retransmit time is configured.
        final boolean ageOutEnabled = this.max_retransmit_time > 0L;
        if (ageOutEnabled) {
            this.cache = new AgeOutCache(this.timer, this.max_retransmit_time, this);
        }
        this.running = true;
        final boolean stableTaskEnabled = this.stable_interval > 0L;
        if (stableTaskEnabled) {
            this.startStableTask();
        }
        final boolean reaperEnabled = this.conn_expiry_timeout > 0L;
        if (reaperEnabled) {
            this.startConnectionReaper();
        }
        // The retransmit task is started unconditionally.
        this.startRetransmitTask();
    }

    /**
     * Stops the protocol. The running flag is cleared first, then the stable task, the
     * connection reaper and the retransmit task are stopped, the retransmit bookkeeping
     * map is cleared and finally all connections are removed.
     */
    @Override
    public void stop() {
        // Cleared first: down() discards unicast messages while this flag is false.
        this.running = false;
        this.stopStableTask();
        this.stopConnectionReaper();
        this.stopRetransmitTask();
        this.xmit_task_map.clear();
        // removeAllConnections() still sends STABLE messages before it clears the receive table.
        this.removeAllConnections();
    }

    /**
     * Handles an event travelling up the stack.
     *
     * <p>Only events of type 1 (presumably {@code Event.MSG} - confirm against the Event
     * constants) are inspected. A message is consumed here (and {@code null} returned)
     * when it has a destination, lacks the NO_RELIABILITY flag and carries this
     * protocol's header; every other event is forwarded to {@code up_prot}.
     *
     * @param evt the event
     * @return {@code null} for messages handled by this protocol, otherwise the result of
     *         {@code up_prot.up(evt)}
     */
    @Override
    public Object up(Event evt) {
        switch (evt.getType()) {
            case 1: {
                Unicast2Header hdr;
                Message msg = (Message)evt.getArg();
                // This break leaves the OUTER switch, so messages that are not ours are passed up below.
                if (msg.getDest() == null || msg.isFlagSet(Message.Flag.NO_RELIABILITY) || (hdr = (Unicast2Header)msg.getHeader(this.id)) == null) break;
                Address sender = msg.getSrc();
                // Breaks inside this inner switch only leave the inner switch and reach the "return null" after it.
                switch (hdr.type) {
                    case 0: {
                        // Data message: an ACK is sent only when the message was handled and is flagged as first.
                        if (this.handleDataReceived(sender, hdr.seqno, hdr.conn_id, hdr.first, msg, evt) && hdr.first) {
                            this.sendAck(sender, hdr.seqno, hdr.conn_id);
                        }
                        return null;
                    }
                    case 1: {
                        // Retransmission request: the payload is a serialized SeqnoList.
                        try {
                            SeqnoList seqnos = Util.streamableFromBuffer(SeqnoList.class, msg.getRawBuffer(), msg.getOffset(), msg.getLength());
                            this.handleXmitRequest(sender, seqnos);
                        }
                        catch (Exception e) {
                            this.log.error("failed deserializing retransmission list", e);
                        }
                        break;
                    }
                    case 2: {
                        // Request to resend the first message of the connection.
                        this.handleResendingOfFirstMessage(sender, hdr.seqno);
                        break;
                    }
                    case 4: {
                        // ACK: marks the send connection as established when the connection IDs match.
                        SenderEntry entry;
                        if (this.log.isTraceEnabled()) {
                            this.log.trace(this.local_addr + " <-- ACK(" + sender + "," + hdr.seqno + " [conn_id=" + hdr.conn_id + "])");
                        }
                        if ((entry = (SenderEntry)this.send_table.get(msg.getSrc())) == null) break;
                        if (entry.send_conn_id != hdr.conn_id) {
                            if (!this.log.isTraceEnabled()) break;
                            this.log.trace(this.local_addr + ": ACK from " + sender + " is discarded as the connection IDs don't match: " + "my conn-id=" + entry.send_conn_id + ", hdr.conn_id=" + hdr.conn_id);
                            break;
                        }
                        entry.connEstablished(true);
                        break;
                    }
                    case 3: {
                        // STABLE message from the receiving side.
                        this.stable(msg.getSrc(), hdr.conn_id, hdr.seqno, hdr.high_seqno);
                        break;
                    }
                    default: {
                        this.log.error(Util.getMessage("UnicastHeaderType"), hdr.type);
                    }
                }
                // Anything carrying our header is consumed here and never passed further up.
                return null;
            }
        }
        return this.up_prot.up(evt);
    }

    /**
     * Handles a message batch travelling up the stack.
     *
     * <p>Batches without a destination are passed up untouched. Otherwise every message
     * that carries this protocol's header (and is not flagged NO_RELIABILITY) is removed
     * from the batch: messages whose header type is not 0 are dispatched individually
     * through {@link #up(Event)}, while type-0 messages are grouped by connection ID and
     * handed to {@code handleBatchReceived}. Whatever remains in the batch is passed up.
     *
     * @param batch the incoming batch
     */
    @Override
    public void up(MessageBatch batch) {
        if (batch.dest() == null) {
            this.up_prot.up(batch);
            return;
        }
        final int capacity = batch.size();
        final TreeMap<Short, List<Message>> by_conn = new TreeMap<Short, List<Message>>();
        for (Message msg : batch) {
            if (msg == null || msg.isFlagSet(Message.Flag.NO_RELIABILITY)) {
                continue;
            }
            Unicast2Header hdr = (Unicast2Header)msg.getHeader(this.id);
            if (hdr == null) {
                continue;
            }
            batch.remove(msg);
            if (hdr.type != 0) {
                this.up(new Event(1, msg));
                continue;
            }
            List<Message> bucket = by_conn.get(hdr.conn_id);
            if (bucket == null) {
                // Sized for the whole batch, captured before any message was removed.
                bucket = new ArrayList<Message>(capacity);
                by_conn.put(hdr.conn_id, bucket);
            }
            bucket.add(msg);
        }
        if (!by_conn.isEmpty()) {
            this.handleBatchReceived(batch.sender(), by_conn);
        }
        if (!batch.isEmpty()) {
            this.up_prot.up(batch);
        }
    }

    /**
     * Handles an event travelling down the stack.
     *
     * <ul>
     * <li>Type 1 (presumably {@code Event.MSG} - confirm): a message with a destination and
     *     without the NO_RELIABILITY flag gets a data header with the next sequence number of
     *     the connection to its destination, is stored in that connection's send table and is
     *     then passed down. Such messages are dropped while the protocol is not running.</li>
     * <li>Type 6 (the argument is a {@code View}): connections to addresses that are no longer
     *     members are removed.</li>
     * <li>Type 8: the argument is stored as the local address.</li>
     * </ul>
     * All events that are not returned early are forwarded to {@code down_prot}.
     *
     * @param evt the event
     * @return {@code null} when a unicast message is discarded because the protocol is not
     *         running, otherwise the result of {@code down_prot.down(evt)}
     */
    @Override
    public Object down(Event evt) {
        switch (evt.getType()) {
            case 1: {
                Message msg = (Message)evt.getArg();
                Address dst = msg.getDest();
                // Messages without a destination or flagged NO_RELIABILITY bypass this protocol.
                if (dst == null || msg.isFlagSet(Message.Flag.NO_RELIABILITY)) break;
                if (!this.running) {
                    if (this.log.isTraceEnabled()) {
                        this.log.trace(this.local_addr + ": discarded message as start() has not yet been called, message: " + msg);
                    }
                    return null;
                }
                SenderEntry entry = (SenderEntry)this.send_table.get(dst);
                if (entry == null) {
                    // Create the connection lazily; putIfAbsent settles a race with another sender thread.
                    entry = new SenderEntry(this.getNewConnectionId());
                    SenderEntry existing = this.send_table.putIfAbsent(dst, entry);
                    if (existing != null) {
                        entry = existing;
                    } else {
                        if (this.log.isTraceEnabled()) {
                            this.log.trace(this.local_addr + ": created connection to " + dst + " (conn_id=" + entry.send_conn_id + ")");
                        }
                        // Only connections to non-members are registered with the age-out cache.
                        if (this.cache != null && !this.members.contains(dst)) {
                            this.cache.add(dst);
                        }
                    }
                }
                boolean dont_loopback_set = msg.isTransientFlagSet(Message.TransientFlag.DONT_LOOPBACK) && dst.equals(this.local_addr);
                short send_conn_id = entry.send_conn_id;
                long seqno = entry.sent_msgs_seqno.getAndIncrement();
                long sleep = 10L;
                // Retry loop: on any Throwable, sleep (delay doubles up to 5000) and retry while still running.
                while (true) {
                    try {
                        // The third argument flags the message as first when its seqno is 1.
                        msg.putHeader(this.id, Unicast2Header.createDataHeader(seqno, send_conn_id, seqno == 1L));
                        entry.sent_msgs.add(seqno, msg, dont_loopback_set ? dont_loopback_filter : null);
                        if (this.conn_expiry_timeout > 0L) {
                            entry.update();
                        }
                        // Success without DONT_LOOPBACK: leave the loop immediately.
                        if (!dont_loopback_set) break;
                        entry.sent_msgs.purge(entry.sent_msgs.getHighestDeliverable());
                    }
                    catch (Throwable t) {
                        if (!this.running) break;
                        Util.sleep(sleep);
                        sleep = Math.min(5000L, sleep * 2L);
                        if (this.running) continue;
                    }
                    // Reached after a successful DONT_LOOPBACK purge, or after a failure once running is false.
                    break;
                }
                if (this.log.isTraceEnabled()) {
                    StringBuilder sb = new StringBuilder();
                    sb.append(this.local_addr).append(" --> DATA(").append(dst).append(": #").append(seqno).append(", conn_id=").append(send_conn_id);
                    if (seqno == 1L) {
                        sb.append(", first");
                    }
                    sb.append(')');
                    this.log.trace(sb);
                }
                ++this.num_messages_sent;
                return this.down_prot.down(evt);
            }
            case 6: {
                View view = (View)evt.getArg();
                List<Address> new_members = view.getMembers();
                // Start with every address we hold a connection for, then subtract the new members.
                HashSet non_members = new HashSet(this.send_table.keySet());
                non_members.addAll(this.recv_table.keySet());
                this.members = new_members;
                non_members.removeAll(new_members);
                if (this.cache != null) {
                    this.cache.removeAll(new_members);
                }
                if (!non_members.isEmpty()) {
                    if (this.log.isTraceEnabled()) {
                        this.log.trace(this.local_addr + ": removing non members " + non_members);
                    }
                    for (Address non_mbr : non_members) {
                        this.removeConnection(non_mbr);
                    }
                }
                this.xmit_task_map.keySet().retainAll(view.getMembers());
                break;
            }
            case 8: {
                this.local_addr = (Address)evt.getArg();
            }
        }
        return this.down_prot.down(evt);
    }

    /**
     * Processes a STABLE message from {@code sender}: purges the send table up to
     * {@code hd} and re-sends the messages in the range {@code hr..highest} still held
     * in the table when the table's highest seqno is greater than {@code hr}.
     *
     * <p>Nothing happens when no send connection (or no send table) exists for
     * {@code sender}, or when the connection ID does not match the one of the current
     * send connection.
     *
     * @param sender  the member that sent the STABLE message
     * @param conn_id the connection ID carried by the STABLE message
     * @param hd      first seqno of the STABLE message; the table is purged up to it
     * @param hr      second seqno of the STABLE message; re-sending starts from it
     */
    protected void stable(Address sender, short conn_id, long hd, long hr) {
        SenderEntry entry = (SenderEntry)this.send_table.get(sender);
        Table<Message> win = entry != null ? entry.sent_msgs : null;
        if (win == null) {
            return;
        }
        // Evaluated once so that neither trace message is assembled when tracing is off.
        final boolean trace = this.log.isTraceEnabled();
        if (trace) {
            this.log.trace(this.local_addr + " <-- STABLE(" + sender + ": " + hd + "-" + hr + ", conn_id=" + conn_id + ")");
        }
        if (entry.send_conn_id != conn_id) {
            if (trace) {
                this.log.trace(this.local_addr + ": my conn_id (" + entry.send_conn_id + ") != received conn_id (" + conn_id + "); discarding STABLE message !");
            }
            return;
        }
        win.purge(hd, true);
        long win_hr = win.getHighestReceived();
        if (win_hr > hr) {
            for (long seqno = hr; seqno <= win_hr; ++seqno) {
                Message msg = win.get(seqno);
                if (msg == null) continue;
                // The message is passed down again unchanged.
                this.down_prot.down(new Event(1, msg));
            }
        }
    }

    /**
     * Sends a STABLE message to the sender of every receive connection, carrying the two
     * values of the receive table's digest.
     *
     * <p>While the second digest value stays equal to the one recorded on the entry, at
     * most {@code max_stable_msgs} STABLE messages are sent for it; a changed value
     * resets the per-entry counter to 1.
     */
    @ManagedOperation(description="Sends a STABLE message to all senders. This causes message purging and potential retransmissions from senders")
    public void sendStableMessages() {
        for (Map.Entry<Address, ReceiverEntry> conn : this.recv_table.entrySet()) {
            final ReceiverEntry rcv = conn.getValue();
            if (rcv == null) {
                continue;
            }
            final Table<Message> win = rcv.received_msgs;
            if (win == null) {
                continue;
            }
            final Address dest = conn.getKey();
            final long[] digest = win.getDigest();
            final long low = digest[0];
            final long high = digest[1];
            if (rcv.last_highest != high) {
                rcv.last_highest = high;
                rcv.num_stable_msgs = 1;
            } else if (rcv.num_stable_msgs < this.max_stable_msgs) {
                ++rcv.num_stable_msgs;
            } else {
                // Limit reached for an unchanged highest seqno: stay silent.
                continue;
            }
            this.sendStableMessage(dest, rcv.recv_conn_id, low, high);
        }
    }

    /**
     * Builds a STABLE message (flags OOB and INTERNAL) for {@code dest} and passes it down.
     *
     * @param dest    the member to notify
     * @param conn_id the connection ID to put into the header
     * @param hd      first seqno placed in the header
     * @param hr      second seqno placed in the header
     */
    protected void sendStableMessage(Address dest, short conn_id, long hd, long hr) {
        final Message stable_msg = new Message(dest)
            .setFlag(Message.Flag.OOB, Message.Flag.INTERNAL)
            .putHeader(this.id, Unicast2Header.createStableHeader(conn_id, hd, hr));
        if (this.log.isTraceEnabled()) {
            final String text = this.local_addr + " --> STABLE(" + dest + ": " + hd + "-" + hr + ", conn_id=" + conn_id + ")";
            this.log.trace(text);
        }
        final Event down_evt = new Event(1, stable_msg);
        this.down_prot.down(down_evt);
    }

    /**
     * Schedules the periodic task that calls {@link #sendStableMessages()} every
     * {@code stable_interval} milliseconds. Does nothing when a previously scheduled
     * task exists and is not yet done.
     */
    protected void startStableTask() {
        final boolean alreadyScheduled = this.stable_task_future != null && !this.stable_task_future.isDone();
        if (alreadyScheduled) {
            return;
        }
        Runnable stable_task = new Runnable(){

            @Override
            public void run() {
                // Failures are logged and swallowed inside the task itself.
                try {
                    UNICAST2.this.sendStableMessages();
                }
                catch (Throwable t) {
                    UNICAST2.this.log.error(Util.getMessage("SendingOfSTABLEMessagesFailed"), t);
                }
            }

            public String toString() {
                return UNICAST2.class.getSimpleName() + ": StableTask (interval=" + UNICAST2.this.stable_interval + " ms)";
            }
        };
        this.stable_task_future = this.timer.scheduleWithFixedDelay(stable_task, this.stable_interval, this.stable_interval, TimeUnit.MILLISECONDS);
        if (this.log.isTraceEnabled()) {
            this.log.trace(this.local_addr + ": stable task started (interval=" + this.stable_interval + ")");
        }
    }

    /**
     * Cancels the stable task (without interrupting a run in progress) and forgets its
     * future. Does nothing when no task was scheduled.
     */
    protected void stopStableTask() {
        if (this.stable_task_future == null) {
            return;
        }
        this.stable_task_future.cancel(false);
        this.stable_task_future = null;
    }

    /**
     * Schedules a {@code ConnectionReaper} with {@code conn_expiry_timeout} as both
     * initial delay and period, unless a previously scheduled reaper is still active.
     */
    protected synchronized void startConnectionReaper() {
        final boolean active = this.connection_reaper != null && !this.connection_reaper.isDone();
        if (active) {
            return;
        }
        this.connection_reaper = this.timer.scheduleWithFixedDelay(new ConnectionReaper(), this.conn_expiry_timeout, this.conn_expiry_timeout, TimeUnit.MILLISECONDS);
    }

    /**
     * Cancels the connection reaper without interrupting a run in progress. The future
     * reference is kept, so it reports done afterwards.
     */
    protected synchronized void stopConnectionReaper() {
        if (this.connection_reaper == null) {
            return;
        }
        this.connection_reaper.cancel(false);
    }

    /**
     * Removes both the send and the receive connection for {@code mbr}; the send
     * connection is removed first.
     *
     * @param mbr the address whose connections are removed
     */
    public void removeConnection(Address mbr) {
        this.removeSendConnection(mbr);
        this.removeReceiveConnection(mbr);
    }

    /**
     * Drops the send-table entry for {@code mbr}, if there is one.
     *
     * @param mbr the destination whose send connection is removed
     */
    public void removeSendConnection(Address mbr) {
        this.send_table.remove(mbr);
    }

    /**
     * Removes the receive connection for {@code mbr}. When the removed entry has a
     * receive table, a final STABLE message with that table's digest is sent to
     * {@code mbr} before the entry is reset.
     *
     * @param mbr the sender whose receive connection is removed
     */
    public void removeReceiveConnection(Address mbr) {
        final ReceiverEntry removed = (ReceiverEntry)this.recv_table.remove(mbr);
        if (removed == null) {
            return;
        }
        final Table<Message> win = removed.received_msgs;
        if (win != null) {
            final long[] digest = win.getDigest();
            this.sendStableMessage(mbr, removed.recv_conn_id, digest[0], digest[1]);
        }
        removed.reset();
    }

    /**
     * Removes every connection: clears the send table, sends STABLE messages for the
     * receive connections, resets each receiver entry and finally clears the receive table.
     */
    @ManagedOperation(description="Trashes all connections to other nodes. This is only used for testing")
    public void removeAllConnections() {
        this.send_table.clear();
        // STABLE messages go out before the receiver entries are reset and dropped.
        this.sendStableMessages();
        for (ReceiverEntry rcv : this.recv_table.values()) {
            rcv.reset();
        }
        this.recv_table.clear();
    }

    /**
     * Sends a retransmission request for the given seqnos to {@code sender} as an
     * OOB/INTERNAL message whose payload is the serialized list, then adds the number
     * of requested seqnos to {@code xmit_reqs_sent}.
     *
     * @param missing the seqnos to request
     * @param sender  the member asked to retransmit
     */
    public void retransmit(SeqnoList missing, Address sender) {
        final Unicast2Header xmit_hdr = Unicast2Header.createXmitReqHeader();
        final Message request = new Message(sender)
            .setBuffer(Util.streamableToBuffer(missing))
            .setFlag(Message.Flag.OOB, Message.Flag.INTERNAL);
        if (this.log.isTraceEnabled()) {
            this.log.trace(this.local_addr + ": sending XMIT_REQ (" + missing + ") to " + sender);
        }
        request.putHeader(this.id, xmit_hdr);
        this.down_prot.down(new Event(1, request));
        this.xmit_reqs_sent.addAndGet(missing.size());
    }

    /**
     * {@code AgeOutCache.Handler} callback: removes the connection to {@code key}.
     * A null key is ignored.
     *
     * @param key the expired address
     */
    @Override
    public void expired(Address key) {
        if (key == null) {
            return;
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug(this.local_addr + ": removing connection to " + key + " because it expired");
        }
        this.removeConnection(key);
    }

    /**
     * Handles a single data message received from {@code sender}.
     *
     * <p>The message is added to the receive table of the matching connection. If it
     * was newly added and has a non-empty payload, its length is fed to
     * {@code incrementStable}, and a STABLE message is sent when that returns true.
     * Newly added OOB messages are passed up immediately; afterwards
     * {@code removeAndPassUp} is invoked for the table.
     *
     * @param sender  the sending member
     * @param seqno   the sequence number of the message
     * @param conn_id the connection ID from the header
     * @param first   whether the header flags this as the first message
     * @param msg     the message
     * @param evt     the event that carried the message; passed up for OOB messages
     * @return {@code false} when no receiver entry could be obtained, {@code true} otherwise
     */
    protected boolean handleDataReceived(Address sender, long seqno, short conn_id, boolean first, Message msg, Event evt) {
        if (this.log.isTraceEnabled()) {
            StringBuilder text = new StringBuilder();
            text.append(this.local_addr).append(" <-- DATA(").append(sender).append(": #").append(seqno);
            if (conn_id != 0) {
                text.append(", conn_id=").append(conn_id);
            }
            if (first) {
                text.append(", first");
            }
            text.append(')');
            this.log.trace(text);
        }
        final ReceiverEntry entry = this.getReceiverEntry(sender, seqno, first, conn_id);
        if (entry == null) {
            return false;
        }
        if (this.conn_expiry_timeout > 0L) {
            entry.update();
        }
        final Table<Message> win = entry.received_msgs;
        final boolean added = win.add(seqno, msg);
        ++this.num_messages_received;
        if (added) {
            // Only newly added messages with a payload count towards the stable accounting.
            final int len = msg.getLength();
            if (len > 0 && entry.incrementStable(len)) {
                final long[] digest = win.getDigest();
                this.sendStableMessage(sender, entry.recv_conn_id, digest[0], digest[1]);
            }
        }
        if (msg.isFlagSet(Message.Flag.OOB) && added) {
            try {
                this.up_prot.up(evt);
            }
            catch (Throwable t) {
                this.log.error("couldn't deliver OOB message %s: %s", msg, t);
            }
        }
        this.removeAndPassUp(win, sender);
        return true;
    }

    /**
     * Handles a batch of DATA messages from {@code sender}, grouped by connection id.
     * Each message is added to the matching receive table; first messages are ACKed and
     * OOB messages are passed up immediately. After all groups have been processed the
     * sender's current receive table is drained.
     *
     * @param sender address the batch came from
     * @param map    messages of the batch keyed by the connection id of their header
     */
    protected void handleBatchReceived(Address sender, Map<Short, List<Message>> map) {
        for (Map.Entry<Short, List<Message>> group : map.entrySet()) {
            List<Message> messages = group.getValue();
            if (this.log.isTraceEnabled()) {
                StringBuilder trace = new StringBuilder();
                trace.append(this.local_addr).append(" <-- DATA(").append(sender).append(": ").append(this.printMessageList(messages)).append(')');
                this.log.trace(trace);
            }

            short conn_id = group.getKey();
            ReceiverEntry entry = null;
            Table<Message> window = null;
            boolean anyAdded = false;
            int totalLength = 0;

            for (Message msg : messages) {
                Unicast2Header hdr = (Unicast2Header)msg.getHeader(this.id);
                // entry deliberately tracks the most recent lookup, including a failed one
                entry = this.getReceiverEntry(sender, hdr.seqno, hdr.first, conn_id);
                if (entry == null) {
                    continue;
                }
                window = entry.received_msgs;
                boolean msgAdded = window.add(hdr.seqno, msg);
                anyAdded |= msgAdded;
                ++this.num_messages_received;
                totalLength += msg.getLength();
                if (hdr.first && msgAdded) {
                    this.sendAck(sender, hdr.seqno, hdr.conn_id);
                }
                if (msg.isFlagSet(Message.Flag.OOB) && msgAdded) {
                    try {
                        this.up_prot.up(new Event(1, msg));
                    }
                    catch (Throwable t) {
                        this.log.error("couldn't deliver OOB message %s: %s", msg, t);
                    }
                }
            }

            if (entry != null && this.conn_expiry_timeout > 0L) {
                entry.update();
            }
            if (anyAdded && totalLength > 0 && entry != null && entry.incrementStable(totalLength) && window != null) {
                long[] digest = window.getDigest();
                this.sendStableMessage(sender, entry.recv_conn_id, digest[0], digest[1]);
            }
        }

        ReceiverEntry current = (ReceiverEntry)this.recv_table.get(sender);
        if (current != null && current.received_msgs != null) {
            this.removeAndPassUp(current.received_msgs, sender);
        }
    }

    /**
     * Renders the seqno range of a message list for trace output, e.g. {@code #3 - #7}.
     * The upper bound is only printed when the list holds more than one message; messages
     * that are {@code null} or carry no header of this protocol are skipped.
     *
     * @param list the messages to describe
     * @return the formatted range, possibly empty
     */
    protected String printMessageList(List<Message> list) {
        StringBuilder out = new StringBuilder();
        int count = list.size();

        Message head = null;
        if (count > 0) {
            head = list.get(0);
        }
        Message tail = null;
        if (count > 1) {
            tail = list.get(count - 1);
        }

        if (head != null) {
            Unicast2Header headHdr = (Unicast2Header)head.getHeader(this.id);
            if (headHdr != null) {
                out.append("#").append(headHdr.seqno);
            }
        }
        if (tail != null) {
            Unicast2Header tailHdr = (Unicast2Header)tail.getHeader(this.id);
            if (tailHdr != null) {
                out.append(" - #").append(tailHdr.seqno);
            }
        }
        return out.toString();
    }

    /**
     * Drains deliverable messages from {@code win} and passes them up the stack in batches
     * of at most {@code max_msg_batch_size} messages.
     *
     * Only one thread drains a given table at a time: the table's processing flag is
     * claimed with a compare-and-set, and a caller that loses the race returns immediately.
     *
     * @param win    receive table to drain
     * @param sender address the messages in {@code win} came from; used as the batch sender
     */
    protected void removeAndPassUp(Table<Message> win, Address sender) {
        AtomicBoolean processing = win.getProcessing();
        // Another thread is already draining this table; leave the work to it.
        if (!processing.compareAndSet(false, true)) {
            return;
        }
        // True once the loop exits via the "nothing left" path below; in that case the
        // finally block does not touch the flag.
        // NOTE(review): presumably removeMany() clears `processing` itself when it finds
        // nothing to remove - confirm against Table.removeMany.
        boolean released_processing = false;
        try {
            while (true) {
                List<Message> msgs;
                if ((msgs = win.removeMany(processing, true, this.max_msg_batch_size)) == null || msgs.isEmpty()) {
                    released_processing = true;
                    return;
                }
                MessageBatch batch = new MessageBatch(this.local_addr, sender, null, false, msgs);
                // OOB messages were already passed up when they were received
                // (see handleDataReceived / handleBatchReceived), so drop them here.
                for (Message msg_to_deliver : batch) {
                    if (!msg_to_deliver.isFlagSet(Message.Flag.OOB)) continue;
                    batch.remove(msg_to_deliver);
                }
                try {
                    if (this.log.isTraceEnabled()) {
                        Message first = batch.first();
                        Message last = batch.last();
                        StringBuilder sb = new StringBuilder(this.local_addr + ": delivering");
                        if (first != null && last != null) {
                            Unicast2Header hdr1 = (Unicast2Header)first.getHeader(this.id);
                            Unicast2Header hdr2 = (Unicast2Header)last.getHeader(this.id);
                            sb.append(" #").append(hdr1.seqno).append(" - #").append(hdr2.seqno);
                        }
                        sb.append(" (" + batch.size()).append(" messages)");
                        this.log.trace(sb);
                    }
                    this.up_prot.up(batch);
                }
                catch (Throwable t) {
                    // A failing batch is logged and the loop moves on to the next one.
                    this.log.error(Util.getMessage("FailedToDeliverBatch"), batch, t);
                }
            }
        }
        finally {
            // Reached with released_processing == false only on an abnormal exit from the
            // loop; release the flag so that later calls can drain the table again.
            if (!released_processing) {
                processing.set(false);
            }
        }
    }

    /**
     * Looks up the receiver entry for {@code sender} that matches {@code conn_id}.
     *
     * If the existing entry already matches, it is returned without locking. Otherwise,
     * under {@code recv_table_lock}:
     * - for a first message, a missing entry is created and an entry with a different
     *   connection id is replaced by a new one;
     * - for any other message without a matching entry, the lock is released, the sender
     *   is asked for its first seqno and {@code null} is returned.
     *
     * @param sender  address the message came from
     * @param seqno   sequence number of the message
     * @param first   whether the message is marked as the first of its connection
     * @param conn_id connection id of the message
     * @return the matching entry, or {@code null} if a SEND_FIRST_SEQNO request was sent
     */
    protected ReceiverEntry getReceiverEntry(Address sender, long seqno, boolean first, short conn_id) {
        ReceiverEntry current = (ReceiverEntry)this.recv_table.get(sender);
        if (current != null && current.recv_conn_id == conn_id) {
            return current;
        }

        this.recv_table_lock.lock();
        try {
            // Re-read under the lock: the table may have changed since the check above.
            current = (ReceiverEntry)this.recv_table.get(sender);

            if (!first) {
                if (current == null || current.recv_conn_id != conn_id) {
                    // Release the lock before sending; the finally block sees it is no longer held.
                    this.recv_table_lock.unlock();
                    this.sendRequestForFirstSeqno(sender, seqno);
                    return null;
                }
                return current;
            }

            if (current == null) {
                return this.getOrCreateReceiverEntry(sender, seqno, conn_id);
            }
            if (conn_id != current.recv_conn_id) {
                if (this.log.isTraceEnabled()) {
                    this.log.trace(this.local_addr + ": conn_id=" + conn_id + " != " + current.recv_conn_id + "; resetting receiver window");
                }
                this.recv_table.remove(sender);
                return this.getOrCreateReceiverEntry(sender, seqno, conn_id);
            }
            return current;
        }
        finally {
            if (this.recv_table_lock.isHeldByCurrentThread()) {
                this.recv_table_lock.unlock();
            }
        }
    }

    /**
     * Registers a new receiver entry for {@code sender} unless one is already present.
     * The new receive table is created with {@code seqno - 1} as its offset.
     *
     * @param sender  address to create the entry for
     * @param seqno   sequence number of the message that triggered the creation
     * @param conn_id connection id stored in the new entry
     * @return the entry now mapped to {@code sender}: the existing one if present,
     *         otherwise the newly created one
     */
    protected ReceiverEntry getOrCreateReceiverEntry(Address sender, long seqno, short conn_id) {
        ReceiverEntry fresh = new ReceiverEntry(
            new Table<Message>(this.xmit_table_num_rows, this.xmit_table_msgs_per_row, seqno - 1L, this.xmit_table_resize_factor, this.xmit_table_max_compaction_time),
            conn_id);
        ReceiverEntry existing = this.recv_table.putIfAbsent(sender, fresh);
        if (existing == null) {
            if (this.log.isTraceEnabled()) {
                this.log.trace(this.local_addr + ": created receiver window for " + sender + " at seqno=#" + seqno + " for conn-id=" + conn_id);
            }
            return fresh;
        }
        return existing;
    }

    /**
     * Serves a retransmission request: every requested seqno still present in the send
     * table for {@code sender} is sent down again. Seqnos that are no longer present are
     * skipped, with an optional warning.
     *
     * @param sender  the member asking for retransmission
     * @param missing the seqnos the requester is missing
     */
    protected void handleXmitRequest(Address sender, SeqnoList missing) {
        if (this.log.isTraceEnabled()) {
            this.log.trace(new StringBuilder().append(this.local_addr).append(" <-- XMIT(").append(sender).append(": #").append(missing).append(')'));
        }
        SenderEntry entry = (SenderEntry)this.send_table.get(sender);
        this.xmit_reqs_received.addAndGet(missing.size());
        if (entry == null || entry.sent_msgs == null) {
            return;
        }

        Table<Message> sent = entry.sent_msgs;
        for (long seqno : missing) {
            Message msg = sent.get(seqno);
            if (msg != null) {
                this.down_prot.down(new Event(1, msg));
                this.xmit_rsps_sent.incrementAndGet();
                continue;
            }
            // Warn only when enabled, the request is not from ourselves, and the seqno is
            // above the table's low mark.
            boolean warn = this.log.isWarnEnabled() && this.log_not_found_msgs && !this.local_addr.equals(sender) && seqno > sent.getLow();
            if (warn) {
                StringBuilder text = new StringBuilder();
                text.append(this.local_addr + ": (requester=").append(sender);
                text.append(") message ").append(sender).append("::").append(seqno);
                text.append(" not found in retransmission table of ").append(sender);
                text.append(":\n").append(sent);
                this.log.warn(text.toString());
            }
        }
    }

    /**
     * Answers a SEND_FIRST_SEQNO request by resending every message still held in the send
     * table for {@code sender}, from just above the table's low mark up to {@code seqno}.
     * The first message found is sent as a copy whose header is flagged {@code first};
     * the remaining ones are resent unchanged.
     *
     * @param sender the member that asked for the first seqno
     * @param seqno  highest seqno to resend (inclusive)
     */
    protected void handleResendingOfFirstMessage(Address sender, long seqno) {
        if (this.log.isTraceEnabled()) {
            this.log.trace(this.local_addr + " <-- SEND_FIRST_SEQNO(" + sender + "," + seqno + ")");
        }
        SenderEntry entry = (SenderEntry)this.send_table.get(sender);
        Table<Message> sent = entry != null ? entry.sent_msgs : null;
        if (sent == null) {
            if (this.log.isTraceEnabled()) {
                this.log.trace(this.local_addr + ": sender window for " + sender + " not found");
            }
            return;
        }

        boolean firstSent = false;
        for (long current = sent.getLow() + 1L; current <= seqno; ++current) {
            Message stored = sent.get(current);
            if (stored == null) {
                continue;
            }
            Message outgoing = stored;
            if (!firstSent) {
                firstSent = true;
                // Flag a copy rather than the stored message so the table entry stays untouched.
                outgoing = stored.copy();
                Unicast2Header flagged = ((Unicast2Header)outgoing.getHeader(this.id)).copy();
                flagged.first = true;
                outgoing.putHeader(this.id, flagged);
            }
            this.down_prot.down(new Event(1, outgoing));
        }
    }

    /**
     * Schedules a {@link RetransmitTask} at a fixed delay of {@code xmit_interval} ms,
     * unless a previously scheduled task is still active.
     */
    protected void startRetransmitTask() {
        boolean active = this.xmit_task != null && !this.xmit_task.isDone();
        if (active) {
            return;
        }
        this.xmit_task = this.timer.scheduleWithFixedDelay(new RetransmitTask(), 0L, this.xmit_interval, TimeUnit.MILLISECONDS);
    }

    /**
     * Cancels the retransmit task (interrupting it if running) and clears the reference.
     * Does nothing when no task is set.
     */
    protected void stopRetransmitTask() {
        if (this.xmit_task == null) {
            return;
        }
        this.xmit_task.cancel(true);
        this.xmit_task = null;
    }

    /**
     * Hands out the current connection id and advances the counter, wrapping to 0 once it
     * reaches {@code Short.MAX_VALUE} (or if it is negative).
     *
     * @return the connection id that was current before the counter advanced
     */
    protected synchronized short getNewConnectionId() {
        short issued = this.last_conn_id;
        if (this.last_conn_id >= Short.MAX_VALUE || this.last_conn_id < 0) {
            this.last_conn_id = (short)0;
        } else {
            this.last_conn_id = (short)(this.last_conn_id + 1);
        }
        return issued;
    }

    /**
     * Sends an OOB/INTERNAL SEND_FIRST_SEQNO request to {@code dest}.
     *
     * @param dest           the member to ask
     * @param seqno_received the seqno of the message that triggered the request
     */
    protected void sendRequestForFirstSeqno(Address dest, long seqno_received) {
        Unicast2Header request = Unicast2Header.createSendFirstSeqnoHeader(seqno_received);
        Message outgoing = new Message(dest).setFlag(Message.Flag.OOB, Message.Flag.INTERNAL);
        outgoing.putHeader(this.id, request);
        if (this.log.isTraceEnabled()) {
            this.log.trace(this.local_addr + " --> SEND_FIRST_SEQNO(" + dest + "," + seqno_received + ")");
        }
        this.down_prot.down(new Event(1, outgoing));
    }

    /**
     * Sends an OOB/INTERNAL ACK for {@code seqno} on connection {@code conn_id} to {@code dest}.
     *
     * @param dest    the member to acknowledge
     * @param seqno   the acknowledged seqno
     * @param conn_id the connection id the ACK refers to
     */
    protected void sendAck(Address dest, long seqno, short conn_id) {
        Unicast2Header ack = Unicast2Header.createAckHeader(seqno, conn_id);
        Message outgoing = new Message(dest).setFlag(Message.Flag.OOB, Message.Flag.INTERNAL).putHeader(this.id, ack);
        if (this.log.isTraceEnabled()) {
            this.log.trace(this.local_addr + " --> ACK(" + dest + "," + seqno + " [conn_id=" + conn_id + "])");
        }
        this.down_prot.down(new Event(1, outgoing));
    }

    /**
     * Removes every send and receive connection whose age is at least
     * {@code conn_expiry_timeout} ms. Does nothing when the timeout is not positive.
     */
    @ManagedOperation(description="Closes connections that have been idle for more than conn_expiry_timeout ms")
    public void reapIdleConnections() {
        if (this.conn_expiry_timeout <= 0L) {
            return;
        }

        for (Map.Entry<?, ?> sendEntry : this.send_table.entrySet()) {
            long age = ((SenderEntry)sendEntry.getValue()).age();
            if (age >= this.conn_expiry_timeout) {
                this.removeSendConnection((Address)sendEntry.getKey());
                if (this.log.isDebugEnabled()) {
                    this.log.debug(this.local_addr + ": removed expired connection for " + sendEntry.getKey() + " (" + age + " ms old) from send_table");
                }
            }
        }

        for (Map.Entry<?, ?> recvEntry : this.recv_table.entrySet()) {
            long age = ((ReceiverEntry)recvEntry.getValue()).age();
            if (age >= this.conn_expiry_timeout) {
                this.removeReceiveConnection((Address)recvEntry.getKey());
                if (this.log.isDebugEnabled()) {
                    this.log.debug(this.local_addr + ": removed expired connection for " + recvEntry.getKey() + " (" + age + " ms old) from recv_table");
                }
            }
        }
    }

    /**
     * Runs one retransmission pass.
     *
     * Receive side: for each sender with gaps in its receive table, the highest missing
     * seqno is remembered in {@code xmit_task_map}. On the first pass that sees gaps for a
     * sender nothing is requested; on later passes only seqnos up to the previously
     * remembered value are requested. Senders without gaps are dropped from the map.
     *
     * Send side: for each connection that has not been acknowledged yet, the first message
     * in the send table is resent, flagged as {@code first}.
     */
    @ManagedOperation(description="Triggers the retransmission task, asking all senders for missing messages")
    public void triggerXmit() {
        for (Map.Entry<?, ?> recv : this.recv_table.entrySet()) {
            Address target = (Address)recv.getKey();
            ReceiverEntry val = (ReceiverEntry)recv.getValue();
            Table<Message> buf = val != null ? val.received_msgs : null;

            SeqnoList missing = null;
            if (buf != null && buf.getNumMissing() > 0) {
                missing = buf.getMissing();
            }
            if (missing == null) {
                // No gaps for this sender: forget any remembered seqno.
                if (!this.xmit_task_map.isEmpty()) {
                    this.xmit_task_map.remove(target);
                }
                continue;
            }

            long highest = missing.getLast();
            Long prev_seqno = this.xmit_task_map.get(target);
            if (prev_seqno == null) {
                // First time gaps are seen for this sender: just remember the high mark.
                this.xmit_task_map.put(target, highest);
                continue;
            }
            missing.removeHigherThan(prev_seqno);
            if (highest > prev_seqno) {
                this.xmit_task_map.put(target, highest);
            }
            if (!missing.isEmpty()) {
                this.retransmit(missing, target);
            }
        }

        for (SenderEntry pending : this.send_table.values()) {
            if (pending.connEstablished()) {
                continue;
            }
            Message msg = pending.getFirstMessage();
            if (msg == null) {
                continue;
            }
            Unicast2Header hdr = (Unicast2Header)msg.getHeader(this.id);
            Message outgoing = msg;
            if (!hdr.first) {
                // Flag a copy so the message stored in the send table is left unchanged.
                outgoing = msg.copy();
                hdr = (Unicast2Header)outgoing.getHeader(this.id);
                Unicast2Header newhdr = hdr.copy();
                newhdr.first = true;
                outgoing.putHeader(this.id, newhdr);
            }
            if (this.log.isTraceEnabled()) {
                this.log.trace(this.local_addr + ": resending first message " + hdr.seqno + " to " + msg.getDest());
            }
            this.down_prot.down(new Event(1, outgoing));
        }
    }

    /** Periodic task that runs one retransmission pass via {@link UNICAST2#triggerXmit()}. */
    protected class RetransmitTask
    implements Runnable {
        protected RetransmitTask() {
        }

        @Override
        public void run() {
            UNICAST2.this.triggerXmit();
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder(UNICAST2.class.getSimpleName());
            text.append(": RetransmitTask (interval=").append(UNICAST2.this.xmit_interval).append(" ms)");
            return text.toString();
        }
    }

    /** Task that closes idle connections via {@link UNICAST2#reapIdleConnections()}. */
    protected class ConnectionReaper
    implements Runnable {
        protected ConnectionReaper() {
        }

        @Override
        public void run() {
            UNICAST2.this.reapIdleConnections();
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder(UNICAST2.class.getSimpleName());
            text.append(": ConnectionReaper (interval=").append(UNICAST2.this.conn_expiry_timeout).append(" ms)");
            return text.toString();
        }
    }

    /**
     * Receive-side state kept per sender: the table of received messages, the connection
     * id it belongs to, a byte counter used to decide when to send a STABLE message, and a
     * last-used timestamp for idle-connection reaping.
     */
    protected final class ReceiverEntry {
        protected final Table<Message> received_msgs;
        protected final short recv_conn_id;
        /** Bytes received since the counter was last reset; updated under {@link #lock} in incrementStable. */
        protected int received_bytes = 0;
        /** Wall-clock time (ms) of the last {@link #update()}. */
        protected final AtomicLong timestamp = new AtomicLong(0L);
        protected final Lock lock = new ReentrantLock();
        protected long last_highest = -1L;
        protected int num_stable_msgs = 0;

        public ReceiverEntry(Table<Message> received_msgs, short recv_conn_id) {
            this.received_msgs = received_msgs;
            this.recv_conn_id = recv_conn_id;
            this.update();
        }

        /**
         * Adds {@code len} to the received-bytes counter and reports whether the
         * {@code max_bytes} threshold has been reached. When it has, the counter is reset.
         *
         * @param len number of bytes just received
         * @return {@code true} if the threshold was reached and the counter was reset
         */
        boolean incrementStable(int len) {
            this.lock.lock();
            try {
                // Sum in long: an int addition could overflow before the comparison
                // with max_bytes and hide that the threshold was crossed.
                long total = (long)this.received_bytes + len;
                if (total >= UNICAST2.this.max_bytes) {
                    this.received_bytes = 0;
                    return true;
                }
                // Clamp so the int counter cannot wrap when max_bytes exceeds the int range.
                this.received_bytes = (int)Math.min(total, (long)Integer.MAX_VALUE);
                return false;
            }
            finally {
                this.lock.unlock();
            }
        }

        /** Resets the byte counter and the stable-message bookkeeping to their initial values. */
        void reset() {
            this.received_bytes = 0;
            this.last_highest = -1L;
            this.num_stable_msgs = 0;
        }

        /** Marks the entry as used now. */
        void update() {
            this.timestamp.set(System.currentTimeMillis());
        }

        /** @return milliseconds elapsed since the last {@link #update()} */
        long age() {
            return System.currentTimeMillis() - this.timestamp.longValue();
        }

        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();
            if (this.received_msgs != null) {
                sb.append(this.received_msgs).append(", ");
            }
            sb.append("recv_conn_id=" + this.recv_conn_id).append(" (" + this.age() + " ms old)");
            return sb.toString();
        }
    }

    /**
     * Send-side state kept per destination: the table of sent messages, the next seqno
     * counter, the connection id, a last-used timestamp, and whether the connection has
     * been acknowledged by the receiver.
     */
    protected final class SenderEntry {
        final Table<Message> sent_msgs;
        final AtomicLong sent_msgs_seqno = new AtomicLong(1L);
        final short send_conn_id;
        protected final AtomicLong timestamp = new AtomicLong(0L);
        protected volatile boolean ack_received;

        public SenderEntry(short send_conn_id) {
            this.send_conn_id = send_conn_id;
            this.sent_msgs = new Table<Message>(UNICAST2.this.xmit_table_num_rows, UNICAST2.this.xmit_table_msgs_per_row, 0L, UNICAST2.this.xmit_table_resize_factor, UNICAST2.this.xmit_table_max_compaction_time);
            this.update();
        }

        /** Marks the entry as used now. */
        protected void update() {
            long now = System.currentTimeMillis();
            this.timestamp.set(now);
        }

        /** @return milliseconds elapsed since the last {@link #update()} */
        protected long age() {
            long last = this.timestamp.longValue();
            return System.currentTimeMillis() - last;
        }

        /** @return whether an ACK has been recorded for this connection */
        protected boolean connEstablished() {
            return this.ack_received;
        }

        /**
         * Records whether the connection has been acknowledged.
         *
         * @return this entry, for chaining
         */
        protected SenderEntry connEstablished(boolean flag) {
            this.ack_received = flag;
            return this;
        }

        /** @return the message stored just above the send table's low mark, or {@code null} if absent */
        protected Message getFirstMessage() {
            long firstSeqno = this.sent_msgs.getLow() + 1L;
            return this.sent_msgs.get(firstSeqno);
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder();
            if (this.sent_msgs != null) {
                text.append(this.sent_msgs).append(", ");
            }
            String ackState = this.ack_received ? "acked" : "not acked";
            text.append("send_conn_id=").append(this.send_conn_id);
            text.append(" (").append(this.age()).append(" ms old) [");
            text.append(ackState).append("])");
            return text.toString();
        }
    }

    /**
     * Header attached to every UNICAST2 message. The {@link #type} byte selects which of
     * the remaining fields are meaningful and serialized:
     * <ul>
     *   <li>{@link #DATA}: seqno, conn_id, first</li>
     *   <li>{@link #XMIT_REQ}: no further fields</li>
     *   <li>{@link #SEND_FIRST_SEQNO}: seqno</li>
     *   <li>{@link #STABLE}: seqno (low), high_seqno, conn_id</li>
     *   <li>{@link #ACK}: seqno, conn_id</li>
     * </ul>
     */
    public static class Unicast2Header
    extends Header {
        public static final byte DATA = 0;
        public static final byte XMIT_REQ = 1;
        public static final byte SEND_FIRST_SEQNO = 2;
        public static final byte STABLE = 3;
        public static final byte ACK = 4;
        byte type;
        long seqno;
        long high_seqno;
        short conn_id;
        boolean first;

        /** No-arg constructor; fields are populated afterwards by {@link #readFrom(DataInput)}. */
        public Unicast2Header() {
        }

        /** Creates a DATA header. */
        public static Unicast2Header createDataHeader(long seqno, short conn_id, boolean first) {
            return new Unicast2Header(DATA, seqno, 0L, conn_id, first);
        }

        /** Creates an XMIT_REQ header; it carries no fields of its own. */
        public static Unicast2Header createXmitReqHeader() {
            return new Unicast2Header(XMIT_REQ);
        }

        /**
         * Creates a STABLE header covering the range {@code [low, high]}.
         *
         * @throws IllegalArgumentException if {@code low > high}
         */
        public static Unicast2Header createStableHeader(short conn_id, long low, long high) {
            if (low > high) {
                throw new IllegalArgumentException("low (" + low + ") needs to be <= high (" + high + ")");
            }
            Unicast2Header retval = new Unicast2Header(STABLE, low);
            retval.high_seqno = high;
            retval.conn_id = conn_id;
            return retval;
        }

        /** Creates a SEND_FIRST_SEQNO header carrying the seqno that triggered the request. */
        public static Unicast2Header createSendFirstSeqnoHeader(long seqno_received) {
            return new Unicast2Header(SEND_FIRST_SEQNO, seqno_received);
        }

        /** Creates an ACK header for {@code acked_seqno} on connection {@code conn_id}. */
        public static Unicast2Header createAckHeader(long acked_seqno, short conn_id) {
            Unicast2Header retval = new Unicast2Header(ACK, acked_seqno);
            retval.conn_id = conn_id;
            return retval;
        }

        protected Unicast2Header(byte type) {
            this.type = type;
        }

        protected Unicast2Header(byte type, long seqno) {
            this.type = type;
            this.seqno = seqno;
        }

        protected Unicast2Header(byte type, long seqno, long high, short conn_id, boolean first) {
            this.type = type;
            this.seqno = seqno;
            this.high_seqno = high;
            this.conn_id = conn_id;
            this.first = first;
        }

        public byte getType() {
            return this.type;
        }

        public long getSeqno() {
            return this.seqno;
        }

        public long getHighSeqno() {
            return this.high_seqno;
        }

        public short getConnId() {
            return this.conn_id;
        }

        public boolean isFirst() {
            return this.first;
        }

        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append(Unicast2Header.type2Str(this.type)).append(", seqno=").append(this.seqno);
            if (this.conn_id != 0) {
                sb.append(", conn_id=").append(this.conn_id);
            }
            if (this.first) {
                sb.append(", first");
            }
            return sb.toString();
        }

        /**
         * @param t a header type byte
         * @return the symbolic name of {@code t}, or {@code "<unknown>"} if it is not one of the declared types
         */
        public static String type2Str(byte t) {
            switch (t) {
                case DATA: {
                    return "DATA";
                }
                case XMIT_REQ: {
                    return "XMIT_REQ";
                }
                case SEND_FIRST_SEQNO: {
                    return "SEND_FIRST_SEQNO";
                }
                case STABLE: {
                    return "STABLE";
                }
                case ACK: {
                    return "ACK";
                }
            }
            return "<unknown>";
        }

        /**
         * @return the serialized size in bytes; must stay in step with {@link #writeTo(DataOutput)}
         */
        @Override
        public final int size() {
            int retval = 1; // the type byte
            switch (this.type) {
                case DATA: {
                    // seqno + conn_id (short, 2 bytes) + first (boolean, 1 byte)
                    retval += Bits.size(this.seqno) + 2 + 1;
                    break;
                }
                case XMIT_REQ: {
                    break;
                }
                case STABLE: {
                    // seqno/high_seqno sequence + conn_id (short, 2 bytes)
                    retval += Bits.size(this.seqno, this.high_seqno) + 2;
                    break;
                }
                case SEND_FIRST_SEQNO: {
                    retval += Bits.size(this.seqno);
                    break;
                }
                case ACK: {
                    // seqno + conn_id (short, 2 bytes)
                    retval += Bits.size(this.seqno) + 2;
                }
            }
            return retval;
        }

        /** @return a new header with the same type, seqnos, connection id and first flag */
        public Unicast2Header copy() {
            return new Unicast2Header(this.type, this.seqno, this.high_seqno, this.conn_id, this.first);
        }

        /**
         * Writes the type byte followed by the fields relevant to that type. For a type that
         * is not one of the declared constants only the type byte is written.
         */
        @Override
        public void writeTo(DataOutput out) throws Exception {
            out.writeByte(this.type);
            switch (this.type) {
                case DATA: {
                    Bits.writeLong(this.seqno, out);
                    out.writeShort(this.conn_id);
                    out.writeBoolean(this.first);
                    break;
                }
                case XMIT_REQ: {
                    break;
                }
                case STABLE: {
                    Bits.writeLongSequence(this.seqno, this.high_seqno, out);
                    out.writeShort(this.conn_id);
                    break;
                }
                case SEND_FIRST_SEQNO: {
                    Bits.writeLong(this.seqno, out);
                    break;
                }
                case ACK: {
                    Bits.writeLong(this.seqno, out);
                    out.writeShort(this.conn_id);
                }
            }
        }

        /**
         * Reads the type byte and then the fields relevant to that type, mirroring
         * {@link #writeTo(DataOutput)}. For an unrecognised type nothing further is read.
         */
        @Override
        public void readFrom(DataInput in) throws Exception {
            this.type = in.readByte();
            switch (this.type) {
                case DATA: {
                    this.seqno = Bits.readLong(in);
                    this.conn_id = in.readShort();
                    this.first = in.readBoolean();
                    break;
                }
                case XMIT_REQ: {
                    break;
                }
                case STABLE: {
                    long[] seqnos = Bits.readLongSequence(in);
                    this.seqno = seqnos[0];
                    this.high_seqno = seqnos[1];
                    this.conn_id = in.readShort();
                    break;
                }
                case SEND_FIRST_SEQNO: {
                    this.seqno = Bits.readLong(in);
                    break;
                }
                case ACK: {
                    this.seqno = Bits.readLong(in);
                    this.conn_id = in.readShort();
                }
            }
        }
    }
}

