/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.shaded.org.jgroups.protocols;

import java.util.Objects;
import java.util.concurrent.locks.LockSupport;
import org.apache.activemq.artemis.shaded.org.jgroups.Address;
import org.apache.activemq.artemis.shaded.org.jgroups.Message;
import org.apache.activemq.artemis.shaded.org.jgroups.protocols.BaseBundler;
import org.apache.activemq.artemis.shaded.org.jgroups.protocols.TP;
import org.apache.activemq.artemis.shaded.org.jgroups.util.BiConsumer;
import org.apache.activemq.artemis.shaded.org.jgroups.util.RingBuffer;
import org.apache.activemq.artemis.shaded.org.jgroups.util.Runner;
import org.apache.activemq.artemis.shaded.org.jgroups.util.Util;

public class RingBufferBundler
extends BaseBundler {
    protected RingBuffer<Message> rb;
    protected Runner bundler_thread;
    protected int num_spins = 40;
    protected static final String THREAD_NAME = "RingBufferBundler";
    protected BiConsumer<Integer, Integer> wait_strategy = SPIN_PARK;
    protected int capacity;
    protected final Runnable run_function = new Runnable(){

        @Override
        public void run() {
            RingBufferBundler.this.readMessages();
        }
    };
    protected final Runnable stop_function = new Runnable(){

        @Override
        public void run() {
            RingBufferBundler.this.rb.clear();
        }
    };
    protected static final BiConsumer<Integer, Integer> SPIN = new BiConsumer<Integer, Integer>(){

        @Override
        public void accept(Integer ignore1, Integer ignore2) {
        }
    };
    protected static final BiConsumer<Integer, Integer> YIELD = new BiConsumer<Integer, Integer>(){

        @Override
        public void accept(Integer it, Integer spins) {
            Thread.yield();
        }
    };
    protected static final BiConsumer<Integer, Integer> PARK = new BiConsumer<Integer, Integer>(){

        @Override
        public void accept(Integer it, Integer spins) {
            LockSupport.parkNanos(1L);
        }
    };
    protected static final BiConsumer<Integer, Integer> SPIN_PARK = new BiConsumer<Integer, Integer>(){

        @Override
        public void accept(Integer it, Integer spins) {
            if (it < spins / 10) {
                // empty if block
            }
            LockSupport.parkNanos(1L);
        }
    };
    protected static final BiConsumer<Integer, Integer> SPIN_YIELD = new BiConsumer<Integer, Integer>(){

        @Override
        public void accept(Integer it, Integer spins) {
            if (it < spins / 10) {
                // empty if block
            }
            Thread.yield();
        }
    };

    public RingBufferBundler() {
    }

    protected RingBufferBundler(RingBuffer<Message> rb) {
        this.rb = rb;
        this.capacity = rb.capacity();
    }

    public RingBufferBundler(int capacity) {
        this(new RingBuffer<Message>(Message.class, RingBufferBundler.assertPositive(capacity, "bundler capacity cannot be " + capacity)));
    }

    public RingBuffer<Message> buf() {
        return this.rb;
    }

    public Thread getThread() {
        return this.bundler_thread.getThread();
    }

    @Override
    public int size() {
        return this.rb.size();
    }

    public int numSpins() {
        return this.num_spins;
    }

    public RingBufferBundler numSpins(int n) {
        this.num_spins = n;
        return this;
    }

    public String waitStrategy() {
        return RingBufferBundler.print(this.wait_strategy);
    }

    public RingBufferBundler waitStrategy(String st) {
        this.wait_strategy = this.createWaitStrategy(st, YIELD);
        return this;
    }

    @Override
    public void init(TP transport) {
        super.init(transport);
        this.checkForSharedTransport(transport);
        if (this.rb == null) {
            this.rb = new RingBuffer<Message>(Message.class, RingBufferBundler.assertPositive(transport.getBundlerCapacity(), "bundler capacity cannot be " + transport.getBundlerCapacity()));
            this.capacity = this.rb.capacity();
        }
        this.bundler_thread = new Runner(transport.getThreadFactory(), THREAD_NAME, this.run_function, this.stop_function);
    }

    @Override
    public void start() {
        this.bundler_thread.start();
    }

    @Override
    public void stop() {
        this.bundler_thread.stop();
    }

    @Override
    public void send(Message msg) throws Exception {
        this.rb.put(msg);
    }

    public void sendBundledMessages(Message[] buf, int read_index, int available_msgs) {
        int max_bundle_size = this.transport.getMaxBundleSize();
        byte[] cluster_name = this.transport.cluster_name.chars();
        int start = read_index;
        int end = this.index(start + available_msgs - 1);
        while (true) {
            Message msg;
            if ((msg = buf[start]) == null) {
                if (start == end) break;
                start = this.advance(start);
                continue;
            }
            Address dest = msg.dest();
            try {
                this.output.position(0);
                Util.writeMessageListHeader(dest, msg.src(), cluster_name, 1, this.output, dest == null);
                int size_pos = this.output.position() - 4;
                int num_msgs = this.marshalMessagesToSameDestination(dest, buf, start, end, max_bundle_size);
                if (num_msgs > 1) {
                    int current_pos = this.output.position();
                    this.output.position(size_pos);
                    this.output.writeInt(num_msgs);
                    this.output.position(current_pos);
                }
                this.transport.doSend(null, this.output.buffer(), 0, this.output.position(), dest);
                if (this.transport.statsEnabled()) {
                    this.transport.incrBatchesSent(num_msgs);
                }
            }
            catch (Exception ex) {
                this.log.error("failed to send message(s)", ex);
            }
            if (start == end) break;
            start = this.advance(start);
        }
    }

    protected int marshalMessagesToSameDestination(Address dest, Message[] buf, int start_index, int end_index, int max_bundle_size) throws Exception {
        int num_msgs = 0;
        int bytes = 0;
        while (true) {
            Message msg;
            if ((msg = buf[start_index]) != null && Objects.equals(dest, msg.dest())) {
                long size = msg.size();
                if ((long)bytes + size > (long)max_bundle_size) break;
                bytes = (int)((long)bytes + size);
                ++num_msgs;
                buf[start_index] = null;
                msg.writeToNoAddrs(msg.src(), this.output, this.transport.getId());
            }
            if (start_index == end_index) break;
            start_index = this.advance(start_index);
        }
        return num_msgs;
    }

    protected void readMessages() {
        try {
            int available_msgs = this.rb.waitForMessages(this.num_spins, this.wait_strategy);
            int read_index = this.rb.readIndexLockless();
            Message[] buf = this.rb.buf();
            this.sendBundledMessages(buf, read_index, available_msgs);
            this.rb.publishReadIndex(available_msgs);
        }
        catch (Throwable throwable) {
            // empty catch block
        }
    }

    protected final int advance(int index) {
        return index + 1 == this.capacity ? 0 : index + 1;
    }

    protected final int index(int idx) {
        return idx & this.capacity - 1;
    }

    protected static String print(BiConsumer<Integer, Integer> wait_strategy) {
        if (wait_strategy == null) {
            return null;
        }
        if (wait_strategy == SPIN) {
            return "spin";
        }
        if (wait_strategy == YIELD) {
            return "yield";
        }
        if (wait_strategy == PARK) {
            return "park";
        }
        if (wait_strategy == SPIN_PARK) {
            return "spin-park";
        }
        if (wait_strategy == SPIN_YIELD) {
            return "spin-yield";
        }
        return wait_strategy.getClass().getSimpleName();
    }

    protected BiConsumer<Integer, Integer> createWaitStrategy(String st, BiConsumer<Integer, Integer> default_wait_strategy) {
        if (st == null) {
            return default_wait_strategy != null ? default_wait_strategy : null;
        }
        switch (st) {
            case "spin": {
                this.wait_strategy = SPIN;
                return this.wait_strategy;
            }
            case "yield": {
                this.wait_strategy = YIELD;
                return this.wait_strategy;
            }
            case "park": {
                this.wait_strategy = PARK;
                return this.wait_strategy;
            }
            case "spin_park": 
            case "spin-park": {
                this.wait_strategy = SPIN_PARK;
                return this.wait_strategy;
            }
            case "spin_yield": 
            case "spin-yield": {
                this.wait_strategy = SPIN_YIELD;
                return this.wait_strategy;
            }
        }
        try {
            Class clazz = Util.loadClass(st, this.getClass());
            return (BiConsumer)clazz.newInstance();
        }
        catch (Throwable t) {
            this.log.error("failed creating wait_strategy " + st, t);
            return default_wait_strategy != null ? default_wait_strategy : null;
        }
    }

    protected static int assertPositive(int value, String message) {
        if (value <= 0) {
            throw new IllegalArgumentException(message);
        }
        return value;
    }
}

