package com.atlassian.audit.broker;

import com.atlassian.audit.api.AuditConsumer;
import com.atlassian.audit.entity.AuditEntity;
import com.atlassian.audit.event.AuditConsumerAddedEvent;
import com.atlassian.audit.event.AuditConsumerRemovedEvent;
import com.atlassian.event.api.EventListener;
import com.atlassian.event.api.EventPublisher;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;

@ThreadSafe
/* loaded from: input_file:WEB-INF/atlassian-bundled-plugins/atlassian-audit-plugin-1.15.1.jar:com/atlassian/audit/broker/ScatterAuditBroker.class */
public class ScatterAuditBroker implements InternalAuditBroker, InitializingBean, DisposableBean {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ScatterAuditBroker.class);
    private final EventPublisher eventPublisher;
    private final AuditPolicy auditPolicy;
    private final AuditEntityRejectionHandler rejectAuditEntityHandler;
    private final AuditConsumerExceptionHandler exceptionHandler;
    private final ConcurrentHashMap<AuditConsumer, ConsumerRegistration> consumerRegistry = new ConcurrentHashMap<>();
    private final int defaultConsumerBufferSize;
    private final int defaultConsumerBatchSize;

    /* JADX INFO: Access modifiers changed from: private */
    @ThreadSafe
    /* loaded from: input_file:WEB-INF/atlassian-bundled-plugins/atlassian-audit-plugin-1.15.1.jar:com/atlassian/audit/broker/ScatterAuditBroker$ConsumerQueue.class */
    public static final class ConsumerQueue {
        private final AuditConsumer auditConsumer;
        private final BlockingQueue<AuditEntity> queue;
        private final int batchSize;
        private final Consumer<List<AuditEntity>> rejectionHandler;

        ConsumerQueue(AuditConsumer auditConsumer, BlockingQueue<AuditEntity> blockingQueue, int i, Consumer<List<AuditEntity>> consumer) {
            this.auditConsumer = auditConsumer;
            this.queue = (BlockingQueue) Objects.requireNonNull(blockingQueue);
            this.batchSize = i;
            this.rejectionHandler = (Consumer) Objects.requireNonNull(consumer);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void offer(AuditEntity auditEntity) {
            while (!this.queue.offer(auditEntity)) {
                discardOldestEntities();
            }
            ScatterAuditBroker.log.trace("#offer auditConsumer={}, entity={}", this.auditConsumer, auditEntity);
        }

        void clear() {
            this.queue.clear();
        }

        List<AuditEntity> take() throws InterruptedException {
            AuditEntity poll;
            ArrayList arrayList = new ArrayList(this.batchSize);
            arrayList.add(this.queue.take());
            while (arrayList.size() < this.batchSize && (poll = this.queue.poll()) != null) {
                arrayList.add(poll);
            }
            return arrayList;
        }

        List<AuditEntity> poll() {
            AuditEntity poll;
            ArrayList arrayList = new ArrayList(this.batchSize);
            while (arrayList.size() < this.batchSize && (poll = this.queue.poll()) != null) {
                arrayList.add(poll);
            }
            return arrayList;
        }

        private void discardOldestEntities() {
            AuditEntity poll;
            ArrayList arrayList = new ArrayList(this.batchSize);
            for (int i = 0; i < this.batchSize && (poll = this.queue.poll()) != null; i++) {
                arrayList.add(poll);
            }
            ScatterAuditBroker.log.trace("#discardOldestEntities auditConsumer={}, batchSize={}, batch={}", this.auditConsumer, Integer.valueOf(this.batchSize), arrayList);
            this.rejectionHandler.accept(arrayList);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/atlassian-bundled-plugins/atlassian-audit-plugin-1.15.1.jar:com/atlassian/audit/broker/ScatterAuditBroker$ConsumerRegistration.class */
    public static class ConsumerRegistration {
        private final ConsumerQueue queue;
        private final ConsumerThread thread;

        private ConsumerRegistration(ConsumerQueue consumerQueue, ConsumerThread consumerThread) {
            this.queue = (ConsumerQueue) Objects.requireNonNull(consumerQueue);
            this.thread = (ConsumerThread) Objects.requireNonNull(consumerThread);
        }

        ConsumerThread getThread() {
            return this.thread;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/atlassian-bundled-plugins/atlassian-audit-plugin-1.15.1.jar:com/atlassian/audit/broker/ScatterAuditBroker$ConsumerThread.class */
    public final class ConsumerThread extends Thread {
        private final AtomicBoolean running;
        private final AuditConsumer consumer;
        private final ConsumerQueue queue;
        private final BiConsumer<RuntimeException, List<AuditEntity>> exceptionHandler;

        ConsumerThread(ConsumerQueue consumerQueue, AuditConsumer auditConsumer, BiConsumer<RuntimeException, List<AuditEntity>> biConsumer) {
            super("audit-broker-consumer-thread-" + Integer.toHexString(ScatterAuditBroker.this.hashCode()) + "-consumer-" + Integer.toHexString(auditConsumer.hashCode()));
            this.running = new AtomicBoolean(false);
            this.queue = (ConsumerQueue) Objects.requireNonNull(consumerQueue);
            this.consumer = (AuditConsumer) Objects.requireNonNull(auditConsumer);
            this.exceptionHandler = (BiConsumer) Objects.requireNonNull(biConsumer);
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (!isInterrupted()) {
                try {
                    try {
                        List<AuditEntity> take = this.queue.take();
                        ScatterAuditBroker.log.trace("#run batch={}", take);
                        processBatch(take);
                    } catch (InterruptedException e) {
                    }
                } catch (Throwable th) {
                    ScatterAuditBroker.log.error("#run ConsumerThread killed by an uncaught throwable", th);
                    throw th;
                }
            }
            ScatterAuditBroker.log.trace("#run ConsumerThread interrupted, consumer={}, queue={}", this.consumer, this.queue.queue);
        }

        @Override // java.lang.Thread
        public void start() {
            if (this.running.compareAndSet(false, true)) {
                super.start();
            }
        }

        public void shutdown() {
            if (this.running.compareAndSet(true, false)) {
                interrupt();
                drainQueue();
            }
        }

        public void shutdownNow() {
            if (this.running.compareAndSet(true, false)) {
                interrupt();
            }
            this.queue.clear();
            this.running.set(false);
        }

        private void drainQueue() {
            while (true) {
                List<AuditEntity> poll = this.queue.poll();
                if (poll.isEmpty()) {
                    return;
                }
                ScatterAuditBroker.log.trace("#drainQueue batch={}", poll);
                processBatch(poll);
            }
        }

        private void processBatch(List<AuditEntity> list) {
            try {
                this.consumer.accept(list);
            } catch (RuntimeException e) {
                this.exceptionHandler.accept(e, list);
            }
        }
    }

    public ScatterAuditBroker(EventPublisher eventPublisher, AuditPolicy auditPolicy, AuditEntityRejectionHandler auditEntityRejectionHandler, AuditConsumerExceptionHandler auditConsumerExceptionHandler, int i, int i2) {
        this.eventPublisher = eventPublisher;
        this.defaultConsumerBatchSize = i2;
        this.defaultConsumerBufferSize = i;
        this.auditPolicy = (AuditPolicy) Objects.requireNonNull(auditPolicy);
        this.rejectAuditEntityHandler = (AuditEntityRejectionHandler) Objects.requireNonNull(auditEntityRejectionHandler);
        this.exceptionHandler = (AuditConsumerExceptionHandler) Objects.requireNonNull(auditConsumerExceptionHandler);
    }

    @Override // org.springframework.beans.factory.InitializingBean
    public void afterPropertiesSet() {
        this.eventPublisher.register(this);
    }

    @Override // org.springframework.beans.factory.DisposableBean
    public void destroy() {
        shutdown();
    }

    public synchronized void shutdown() {
        this.consumerRegistry.values().forEach(consumerRegistration -> {
            consumerRegistration.getThread().shutdown();
        });
        waitForTermination();
    }

    public synchronized void shutdownNow() {
        this.consumerRegistry.values().forEach(consumerRegistration -> {
            consumerRegistration.getThread().shutdownNow();
        });
        waitForTermination();
    }

    @EventListener
    public void onAuditConsumerAdded(AuditConsumerAddedEvent auditConsumerAddedEvent) {
        addConsumer(auditConsumerAddedEvent.getConsumerService(), this.defaultConsumerBufferSize, this.defaultConsumerBatchSize);
    }

    @EventListener
    public void onAuditConsumerRemoved(AuditConsumerRemovedEvent auditConsumerRemovedEvent) {
        removeConsumer(auditConsumerRemovedEvent.getConsumerService(), false);
    }

    public void addConsumer(AuditConsumer auditConsumer, int i, int i2) {
        ConsumerQueue consumerQueue = new ConsumerQueue(auditConsumer, new ArrayBlockingQueue(i), i2, list -> {
            this.rejectAuditEntityHandler.reject(this, auditConsumer, list);
        });
        ConsumerThread consumerThread = new ConsumerThread(consumerQueue, auditConsumer, (runtimeException, list2) -> {
            this.exceptionHandler.handle(auditConsumer, runtimeException, list2);
        });
        log.trace("#addConsumer consumer={}, bufferSize={}, batchSize={}, thread={}", auditConsumer, Integer.valueOf(i), Integer.valueOf(i2), consumerThread.getName());
        this.consumerRegistry.put(auditConsumer, new ConsumerRegistration(consumerQueue, consumerThread));
        consumerThread.start();
    }

    public void removeConsumer(AuditConsumer auditConsumer, boolean z) {
        ConsumerRegistration remove = this.consumerRegistry.remove(auditConsumer);
        log.trace("#removeConsumer consumer={}, force={}, registration={}", auditConsumer, Boolean.valueOf(z), remove);
        if (remove != null) {
            if (z) {
                remove.getThread().shutdownNow();
            } else {
                remove.getThread().shutdown();
            }
        }
    }

    @Override // com.atlassian.audit.broker.InternalAuditBroker
    public void audit(@Nonnull AuditEntity auditEntity) {
        Objects.requireNonNull(auditEntity, "entity");
        if (!this.auditPolicy.pass(auditEntity)) {
            log.trace("#audit auditPolicy.pass=false, entity={}", auditEntity);
            return;
        }
        if (log.isTraceEnabled()) {
            log.trace("#audit auditPolicy.pass=true, entity={}, enabledConsumers={}", auditEntity, getIsConsumerEnabledMap());
        }
        this.consumerRegistry.entrySet().stream().filter(entry -> {
            return ((AuditConsumer) entry.getKey()).isEnabled();
        }).forEach(entry2 -> {
            ((ConsumerRegistration) entry2.getValue()).queue.offer(auditEntity);
        });
    }

    private Map<AuditConsumer, Boolean> getIsConsumerEnabledMap() {
        return (Map) this.consumerRegistry.keySet().stream().collect(Collectors.toMap(auditConsumer -> {
            return auditConsumer;
        }, (v0) -> {
            return v0.isEnabled();
        }));
    }

    private void waitForTermination() {
        this.consumerRegistry.values().forEach(consumerRegistration -> {
            try {
                consumerRegistration.thread.join();
            } catch (InterruptedException e) {
            }
        });
    }
}
