package org.apache.activemq.artemis.shaded.org.jgroups.protocols;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;
import org.apache.activemq.artemis.shaded.org.jgroups.Address;
import org.apache.activemq.artemis.shaded.org.jgroups.Message;
import org.apache.activemq.artemis.shaded.org.jgroups.NullAddress;
import org.apache.activemq.artemis.shaded.org.jgroups.View;
import org.apache.activemq.artemis.shaded.org.jgroups.annotations.ManagedAttribute;
import org.apache.activemq.artemis.shaded.org.jgroups.annotations.Property;
import org.apache.activemq.artemis.shaded.org.jgroups.conf.AttributeType;
import org.apache.activemq.artemis.shaded.org.jgroups.logging.Log;
import org.apache.activemq.artemis.shaded.org.jgroups.util.ByteArrayDataOutputStream;
import org.apache.activemq.artemis.shaded.org.jgroups.util.FastArray;
import org.apache.activemq.artemis.shaded.org.jgroups.util.Profiler;
import org.apache.activemq.artemis.shaded.org.jgroups.util.Util;

/* loaded from: input_file:org/apache/activemq/artemis/shaded/org/jgroups/protocols/PerDestinationBundler.class */
public class PerDestinationBundler implements Bundler {
    /** Transport used for the actual sends; assigned in {@code init(TP)}. */
    protected TP transport;
    /** Logger taken from the transport in {@code init(TP)}. */
    protected Log log;
    /** Local address; read from the transport in {@code start()} and used as the default message source. */
    protected Address local_addr;
    /** Map key standing in for a {@code null} destination; always compared by identity ({@code ==}). */
    protected static final Address NULL = new NullAddress();
    /** Name handed to the transport's thread factory for every per-destination sender thread. */
    protected static final String THREAD_NAME = "pd-bundler";

    // Flush threshold per destination; also used (plus 5 bytes) to size each SendBuffer's output stream.
    @Property(name = "max_size", type = AttributeType.BYTES, description = "Maximum number of bytes for messages to be queued (per destination) until they are sent")
    protected int max_size = 64000;

    // Increased by the list size in SendBuffer.sendMessages().
    @ManagedAttribute(description = "Total number of messages sent (single and batches)", type = AttributeType.SCALAR)
    protected final LongAdder total_msgs_sent = new LongAdder();

    // Incremented in SendBuffer.sendSingleMessage() after a successful send.
    @ManagedAttribute(description = "Number of single messages sent", type = AttributeType.SCALAR)
    protected final LongAdder num_single_msgs_sent = new LongAdder();

    // Incremented in SendBuffer.sendMessageList() after a successful send.
    @ManagedAttribute(description = "Number of batches sent", type = AttributeType.SCALAR)
    protected final LongAdder num_batches_sent = new LongAdder();

    // Incremented in SendBuffer.run() when the queue has been drained and buffered messages are flushed.
    @ManagedAttribute(description = "Number of batches sent because no more messages were available", type = AttributeType.SCALAR)
    protected final LongAdder num_send_due_to_no_msgs = new LongAdder();

    // Incremented in SendBuffer.addAndSendIfSizeExceeded() when adding a message would reach max_size.
    @ManagedAttribute(description = "Number of batches sent because the queue was full", type = AttributeType.SCALAR)
    protected final LongAdder num_sends_due_to_max_size = new LongAdder();
    /** One SendBuffer per destination address; entries are created in {@code send(Message)} and {@code viewChange(View)}. */
    protected final Map<Address, SendBuffer> dests = Util.createConcurrentMap();

    // Started/stopped around SendBuffer.sendMessageList().
    @ManagedAttribute
    final Profiler p_send_msg_list = new Profiler();

    // Started/stopped around SendBuffer.sendSingleMessage().
    @ManagedAttribute
    final Profiler p_send_single_msg = new Profiler();

    // Started/stopped around SendBuffer.sendMessages().
    @ManagedAttribute
    final Profiler p_send_msgs = new Profiler();

    /* loaded from: input_file:org/apache/activemq/artemis/shaded/org/jgroups/protocols/PerDestinationBundler$SendBuffer.class */
    protected class SendBuffer implements Runnable {
        /** Destination key this buffer serves (may be the outer {@code NULL} placeholder). */
        private final Address dest;
        /** Reusable serialization buffer; rewound to position 0 before every send. */
        private final ByteArrayDataOutputStream output;
        /** Thread created by {@code start()} that executes {@code run()}; interrupted by {@code stop()}. */
        private volatile Thread bundler_thread;
        /** Sum of {@code Message.size()} over the messages currently held in {@code msgs}. */
        private long count;
        /** Messages accumulated for the next batch. */
        protected final FastArray<Message> msgs = new FastArray<Message>(16).increment(10);
        /** Held while {@code size()} reads {@code msgs}; no other method of this class acquires it. */
        private final Lock lock = new ReentrantLock(false);
        /** Hand-off queue between callers of {@code send(Message)} and the sender thread; {@code put} blocks when full. */
        private final BlockingQueue<Message> queue = new ArrayBlockingQueue<>(8192);
        /** Scratch list that {@code run()} drains {@code queue} into. */
        private final List<Message> remove_queue = new ArrayList<>(1024);
        /** Loop flag for {@code run()}; starts out true, so the first {@code start()} passes through {@code stop()}. */
        private volatile boolean running = true;

        /**
         * Creates the buffer for a single destination.
         *
         * @param address destination key served by this buffer
         */
        protected SendBuffer(Address address) {
            dest = address;
            // Sized once from the bundler's current max_size, plus 5 extra bytes.
            output = new ByteArrayDataOutputStream(max_size + 5);
        }

        /**
         * Sender loop: blocks for the next queued message, then keeps draining the queue into the
         * current batch (flushing whenever the size limit is reached) and finally flushes whatever
         * is left once the queue is empty. Runs until {@code running} is cleared.
         */
        @Override // java.lang.Runnable
        public void run() {
            while (this.running) {
                try {
                    // take() blocks until a message is available and never returns null
                    addAndSendIfSizeExceeded(this.queue.take());
                    while (true) {
                        this.remove_queue.clear();
                        if (this.queue.drainTo(this.remove_queue) <= 0) {
                            break;
                        }
                        for (int i = 0; i < this.remove_queue.size(); i++) {
                            addAndSendIfSizeExceeded(this.remove_queue.get(i));
                        }
                    }
                    // queue is empty: send what has been accumulated so far
                    if (this.count > 0) {
                        sendBundledMessages();
                        PerDestinationBundler.this.num_send_due_to_no_msgs.increment();
                    }
                } catch (InterruptedException ignored) {
                    // stop() interrupts this thread after clearing 'running'; the loop condition decides
                    // whether to exit. The flag is deliberately not re-set: if 'running' is still true,
                    // a set interrupt flag would make every subsequent take() fail immediately.
                } catch (Throwable th) {
                    // keep the sender thread alive, but do not hide the failure
                    PerDestinationBundler.this.log.error("%s: bundler thread for %s failed: %s", PerDestinationBundler.this.local_addr, this.dest, th);
                }
            }
        }

        /**
         * Adds a message to the current batch, first flushing the batch if adding the message
         * would bring the accumulated size to {@code max_size} or beyond.
         *
         * @param message the message to buffer
         */
        protected void addAndSendIfSizeExceeded(Message message) {
            int size = message.size();
            // Flush (and count a max-size send) only when something is actually buffered: with an empty
            // batch nothing would be sent, so a single oversized message must not bump the counter.
            if (!this.msgs.isEmpty() && this.count + size >= PerDestinationBundler.this.max_size) {
                sendBundledMessages();
                PerDestinationBundler.this.num_sends_due_to_max_size.increment();
            }
            addMessage(message, size);
        }

        /**
         * Appends a message to the current batch and adds its size to the running byte count.
         *
         * @param message the message to append
         * @param i       the size of {@code message} as computed by the caller
         */
        protected void addMessage(Message message, int i) {
            // Pass the message itself: casting it to FastArray<Message> would target the
            // array-appending overload and fail for ordinary messages.
            this.msgs.add(message);
            this.count += i;
        }

        /**
         * Sends everything accumulated in {@code msgs} to this buffer's destination, then
         * empties the batch and resets the byte counter.
         */
        protected void sendBundledMessages() {
            sendBatch(dest, msgs);
            // batch has been handed off: start over with an empty list and a zero byte count
            msgs.clear(false);
            count = 0L;
        }

        /**
         * Creates a sender thread via the transport's thread factory and starts it.
         * A buffer that is still flagged as running is stopped first.
         *
         * NOTE(review): stop() only interrupts the previous thread and does not wait for it to exit
         * before 'running' is set back to true - confirm a restart cannot leave two threads on one queue.
         *
         * @return this buffer, to allow call chaining
         */
        public SendBuffer start() {
            if (running) {
                stop();
            }
            Thread worker = transport.getThreadFactory().newThread(this, THREAD_NAME);
            bundler_thread = worker;
            running = true;
            worker.start();
            return this;
        }

        /**
         * Clears the running flag and interrupts the sender thread, if one was created.
         */
        public void stop() {
            running = false;
            Thread worker = bundler_thread;
            if (worker == null) {
                return; // never started
            }
            worker.interrupt();
        }

        /**
         * Asks the transport's thread factory to rename this buffer's sender thread,
         * using {@code THREAD_NAME} as the base name.
         */
        public void renameThread() {
            transport.getThreadFactory().renameThread(THREAD_NAME, bundler_thread);
        }

        /**
         * Hands a message to the sender thread through the bounded queue.
         *
         * @param message the message to enqueue
         * @throws Exception if the enqueue fails, e.g. the caller is interrupted while waiting for space
         */
        protected void send(Message message) throws Exception {
            // put() waits for free capacity instead of rejecting the message
            queue.put(message);
        }

        /**
         * Sends a non-empty list of messages; an empty list is ignored.
         *
         * @param address   destination key; the {@code NULL} placeholder is translated to {@code null}
         * @param fastArray the messages to send
         */
        protected void sendBatch(Address address, FastArray<Message> fastArray) {
            if (!fastArray.isEmpty()) {
                Address target = address;
                if (address == NULL) {
                    // the placeholder only exists as a map key; the send itself uses a null destination
                    target = null;
                }
                sendMessages(target, local_addr, fastArray);
            }
        }

        /**
         * Serializes one message into the reusable output buffer and sends it through the transport.
         * Failures are logged at error level and not propagated; the profiler is stopped on every path.
         *
         * @param address destination passed to the transport ({@code null} is allowed)
         * @param message the message to send
         */
        protected void sendSingleMessage(Address address, Message message) {
            p_send_single_msg.start();
            try {
                output.position(0);
                boolean destIsNull = address == null;
                Util.writeMessage(message, output, destIsNull);
                transport.doSend(output.buffer(), 0, output.position(), address);
                transport.getMessageStats().incrNumSingleMsgsSent();
                num_single_msgs_sent.increment();
            } catch (Throwable failure) {
                log.error("%s: failed sending message to %s: %s", local_addr, address, failure);
            } finally {
                p_send_single_msg.stop();
            }
        }

        /**
         * Serializes a list of messages as one batch into the reusable output buffer and sends it
         * through the transport. Failures are logged at trace level and not propagated.
         *
         * @param address   destination passed to the transport ({@code null} is allowed)
         * @param address2  source address written into the batch
         * @param fastArray the messages to send
         */
        protected void sendMessageList(Address address, Address address2, FastArray<Message> fastArray) {
            p_send_msg_list.start();
            // the buffer is rewound before entering the guarded section
            output.position(0);
            try {
                boolean destIsNull = address == null;
                Util.writeMessageList(address, address2, transport.cluster_name.chars(), fastArray, output, destIsNull);
                transport.doSend(output.buffer(), 0, output.position(), address);
                transport.getMessageStats().incrNumBatchesSent();
                num_batches_sent.increment();
            } catch (Throwable failure) {
                log.trace(Util.getMessage("FailureSendingMsgBundle"), transport.getAddress(), failure);
            } finally {
                p_send_msg_list.stop();
            }
        }

        /**
         * Sends the given messages: a single message is sent on its own, two or more are sent as
         * one batch. An empty list is a no-op. Failures are logged at trace level and not propagated.
         *
         * @param address   destination passed on to the send methods ({@code null} is allowed)
         * @param address2  source address used when sending a batch
         * @param fastArray the messages to send
         */
        protected void sendMessages(Address address, Address address2, FastArray<Message> fastArray) {
            PerDestinationBundler.this.p_send_msgs.start();
            try {
                int size = fastArray.size();
                if (size == 0) {
                    return;
                }
                if (size == 1) {
                    sendSingleMessage(address, fastArray.get(0));
                } else {
                    sendMessageList(address, address2, fastArray);
                }
                // NOTE(review): the send methods above log and swallow their own failures, so this
                // total also counts messages whose send failed - confirm that is intended.
                PerDestinationBundler.this.total_msgs_sent.add(size);
            } catch (Throwable th) {
                PerDestinationBundler.this.log.trace(Util.getMessage("FailureSendingMsgBundle"), PerDestinationBundler.this.transport.getAddress(), th);
            } finally {
                // single stop per start, covering the early return, success and failure paths
                PerDestinationBundler.this.p_send_msgs.stop();
            }
        }

        /**
         * @return the number of currently buffered messages, formatted as {@code "<n> msgs"}
         */
        public String toString() {
            int buffered = size();
            return String.format("%d msgs", buffered);
        }

        /**
         * Reads the number of messages in the current batch while holding {@code lock}.
         *
         * @return the number of messages in {@code msgs}
         */
        protected int size() {
            final int buffered;
            lock.lock();
            try {
                buffered = msgs.size();
            } finally {
                lock.unlock();
            }
            return buffered;
        }
    }

    /**
     * @return the number of buffered messages summed over all destinations
     */
    @Override // org.apache.activemq.artemis.shaded.org.jgroups.protocols.Bundler
    public int size() {
        return dests.values().stream().mapToInt(SendBuffer::size).sum();
    }

    /**
     * Always returns -1.
     * NOTE(review): presumably signals "not applicable", since each destination has its own
     * queue - confirm against the Bundler contract.
     */
    @Override // org.apache.activemq.artemis.shaded.org.jgroups.protocols.Bundler
    public int getQueueSize() {
        return -1;
    }

    /**
     * @return the per-destination size threshold ({@code max_size}) at which a batch is flushed
     */
    @Override // org.apache.activemq.artemis.shaded.org.jgroups.protocols.Bundler
    public int getMaxSize() {
        return this.max_size;
    }

    /**
     * Sets the per-destination flush threshold.
     * Existing SendBuffers created their output stream from the value in effect at construction
     * time and are not re-created here; whether that stream grows on demand is not visible in this class.
     *
     * @param i the new threshold in bytes
     * @return this bundler, to allow call chaining
     */
    @Override // org.apache.activemq.artemis.shaded.org.jgroups.protocols.Bundler
    public Bundler setMaxSize(int i) {
        this.max_size = i;
        return this;
    }

    /**
     * Computes the average number of messages per batch: messages that were not sent as single
     * messages, divided by the number of batches.
     *
     * @return the average batch size, or 0.0 if no batches or no messages have been sent
     */
    @ManagedAttribute(description = "Average number of messages in an BatchMessage")
    public double avgBatchSize() {
        long batches = this.num_batches_sent.sum();
        long total = this.total_msgs_sent.sum();
        long singles = this.num_single_msgs_sent.sum();
        if (batches == 0 || total == 0) {
            return 0.0d;
        }
        // divide in floating point; long/long would truncate the fractional part of the average
        return (total - singles) / (double) batches;
    }

    /**
     * Resets every counter and profiler maintained by this bundler.
     */
    @Override // org.apache.activemq.artemis.shaded.org.jgroups.protocols.Bundler
    public void resetStats() {
        // all five counters, including num_send_due_to_no_msgs
        Stream.of(this.total_msgs_sent, this.num_batches_sent, this.num_single_msgs_sent,
                this.num_send_due_to_no_msgs, this.num_sends_due_to_max_size).forEach(LongAdder::reset);
        this.p_send_msg_list.reset();
        this.p_send_single_msg.reset();
        this.p_send_msgs.reset();
    }

    /**
     * Binds this bundler to its transport and picks up the transport's logger.
     *
     * @param tp the transport; must not be {@code null}
     * @throws NullPointerException if {@code tp} is {@code null}
     */
    @Override // org.apache.activemq.artemis.shaded.org.jgroups.protocols.Bundler
    public void init(TP tp) {
        TP checked = Objects.requireNonNull(tp);
        transport = checked;
        log = checked.getLog();
    }

    /**
     * Reads the local address from the transport and starts every existing send buffer.
     *
     * @throws NullPointerException if the transport has no address yet
     */
    @Override // org.apache.activemq.artemis.shaded.org.jgroups.protocols.Bundler
    public void start() {
        Address addr = transport.getAddress();
        local_addr = Objects.requireNonNull(addr);
        for (SendBuffer buffer : dests.values()) {
            buffer.start();
        }
    }

    /**
     * Stops every send buffer; the buffers stay registered in {@code dests}.
     */
    @Override // org.apache.activemq.artemis.shaded.org.jgroups.protocols.Bundler
    public void stop() {
        for (SendBuffer buffer : dests.values()) {
            buffer.stop();
        }
    }

    /**
     * Queues a message on the send buffer of its destination, creating and starting that buffer
     * on first use. A message without a source gets the local address as source.
     *
     * @param message the message to send
     * @throws Exception if enqueuing the message fails
     */
    @Override // org.apache.activemq.artemis.shaded.org.jgroups.protocols.Bundler
    public void send(Message message) throws Exception {
        if (message.getSrc() == null) {
            message.setSrc(local_addr);
        }
        // a null destination is keyed by the NULL placeholder
        Address key = NULL;
        if (message.dest() != null) {
            key = message.dest();
        }
        SendBuffer buffer = dests.computeIfAbsent(key, k -> new SendBuffer(k).start());
        buffer.send(message);
    }

    /**
     * Aligns the per-destination send buffers with a new view: every member gets a started
     * buffer, and buffers of destinations that are no longer members are removed and stopped.
     * The buffer keyed by the {@code NULL} placeholder is always kept.
     *
     * @param view the new view; a view without a member list is ignored
     */
    @Override // org.apache.activemq.artemis.shaded.org.jgroups.protocols.Bundler
    public void viewChange(View view) {
        List<Address> members = view.getMembers();
        if (members == null) {
            return;
        }
        // computeIfAbsent only constructs (and starts) a buffer when it is actually stored, so a
        // concurrent insert can no longer leave a started-but-discarded buffer behind
        for (Address member : members) {
            this.dests.computeIfAbsent(member, addr -> new SendBuffer(addr).start());
        }
        // collect first, then remove, so the key set is not modified while it is being iterated
        List<Address> departed = new ArrayList<>();
        for (Address dest : this.dests.keySet()) {
            if (dest != NULL && !members.contains(dest)) {
                departed.add(dest);
            }
        }
        for (Address dest : departed) {
            SendBuffer removed = this.dests.remove(dest);
            if (removed != null) {
                // without this the buffer's sender thread would stay blocked on its queue forever;
                // messages still queued for the departed member are dropped
                removed.stop();
            }
        }
    }
}
