package com.couchbase.client.core.cnc;

import com.couchbase.client.core.cnc.EventBus;
import com.couchbase.client.core.deps.org.jctools.queues.QueueFactory;
import com.couchbase.client.core.deps.org.jctools.queues.spec.ConcurrentQueueSpec;
import java.io.PrintStream;
import java.time.Duration;
import java.util.Iterator;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

/* loaded from: input_file:com/couchbase/client/core/cnc/DefaultEventBus.class */
public class DefaultEventBus implements EventBus {
    private static final int DEFAULT_QUEUE_CAPACITY = 16384;
    private static final Duration DEFAULT_IDLE_SLEEP_DURATION = Duration.ofMillis(100);
    private final CopyOnWriteArraySet<Consumer<Event>> subscribers;
    private final Queue<Event> eventQueue;
    private final AtomicBoolean running;
    private final PrintStream errorLogging;
    private final String threadName;
    private final Duration idleSleepDuration;
    private final Scheduler scheduler;
    private volatile Thread runningThread;

    /* loaded from: input_file:com/couchbase/client/core/cnc/DefaultEventBus$Builder.class */
    public static class Builder {
        final Scheduler scheduler;
        int queueCapacity = 16384;
        Optional<PrintStream> errorLogging = Optional.of(System.err);
        String threadName = "cb-events";
        Duration idleSleepDuration = DefaultEventBus.DEFAULT_IDLE_SLEEP_DURATION;

        Builder(Scheduler scheduler) {
            this.scheduler = scheduler;
        }

        public Builder queueCapacity(int i) {
            this.queueCapacity = i;
            return this;
        }

        public Builder errorLogging(Optional<PrintStream> optional) {
            this.errorLogging = optional;
            return this;
        }

        public Builder threadName(String str) {
            this.threadName = str;
            return this;
        }

        public Builder idleSleepDuration(Duration duration) {
            this.idleSleepDuration = duration;
            return this;
        }

        public DefaultEventBus build() {
            return new DefaultEventBus(this);
        }
    }

    public static Builder builder(Scheduler scheduler) {
        return new Builder(scheduler);
    }

    public static DefaultEventBus create(Scheduler scheduler) {
        return builder(scheduler).build();
    }

    private DefaultEventBus(Builder builder) {
        this.scheduler = builder.scheduler;
        this.subscribers = new CopyOnWriteArraySet<>();
        this.running = new AtomicBoolean(false);
        this.eventQueue = QueueFactory.newQueue(ConcurrentQueueSpec.createBoundedMpsc(builder.queueCapacity));
        this.errorLogging = builder.errorLogging.orElse(null);
        this.threadName = builder.threadName;
        this.idleSleepDuration = builder.idleSleepDuration;
    }

    @Override // com.couchbase.client.core.cnc.EventBus
    public EventSubscription subscribe(Consumer<Event> consumer) {
        this.subscribers.add(consumer);
        return new EventSubscription(this, consumer);
    }

    @Override // com.couchbase.client.core.cnc.EventBus
    public void unsubscribe(EventSubscription eventSubscription) {
        this.subscribers.remove(eventSubscription.consumer());
    }

    @Override // com.couchbase.client.core.cnc.EventBus
    public EventBus.PublishResult publish(Event event) {
        if (!isRunning()) {
            return EventBus.PublishResult.SHUTDOWN;
        }
        if (this.eventQueue.offer(event)) {
            return EventBus.PublishResult.SUCCESS;
        }
        if (this.errorLogging != null) {
            this.errorLogging.println("Could not publish Event because the queue is full. " + event);
        }
        return EventBus.PublishResult.OVERLOADED;
    }

    @Override // com.couchbase.client.core.cnc.EventBus
    public Mono<Void> start() {
        return Mono.defer(() -> {
            if (this.running.compareAndSet(false, true)) {
                this.runningThread = new Thread(() -> {
                    long millis = this.idleSleepDuration.toMillis();
                    while (true) {
                        if (!isRunning() && this.eventQueue.isEmpty()) {
                            return;
                        }
                        Event poll = this.eventQueue.poll();
                        while (true) {
                            Event event = poll;
                            if (event != null) {
                                Iterator<Consumer<Event>> it = this.subscribers.iterator();
                                while (it.hasNext()) {
                                    try {
                                        it.next().accept(event);
                                    } catch (Throwable th) {
                                        if (this.errorLogging != null) {
                                            this.errorLogging.println("Exception caught in EventBus Consumer: " + th);
                                            th.printStackTrace();
                                        }
                                    }
                                }
                                poll = this.eventQueue.poll();
                            } else {
                                try {
                                    break;
                                } catch (InterruptedException e) {
                                }
                            }
                        }
                        if (isRunning()) {
                            Thread.sleep(millis);
                        }
                    }
                });
                this.runningThread.setDaemon(true);
                this.runningThread.setName(this.threadName);
                this.runningThread.start();
            }
            return Mono.empty();
        });
    }

    @Override // com.couchbase.client.core.cnc.EventBus
    public Mono<Void> stop(Duration duration) {
        return Mono.defer(() -> {
            if (this.running.compareAndSet(true, false)) {
                this.runningThread.interrupt();
            }
            return Mono.empty();
        }).then(Flux.interval(Duration.ofMillis(10L), this.scheduler).takeUntil(l -> {
            return !this.runningThread.isAlive();
        }).then()).timeout(duration, this.scheduler);
    }

    boolean isRunning() {
        return this.running.get();
    }

    boolean hasSubscribers() {
        return !this.subscribers.isEmpty();
    }
}
