/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.dcp.highlevel.internal;

import com.couchbase.client.dcp.highlevel.DatabaseChangeListener;
import com.couchbase.client.dcp.highlevel.FlowControlMode;
import com.couchbase.client.dcp.highlevel.StreamFailure;
import com.couchbase.client.dcp.highlevel.internal.BlockingQueueConsumerOps;
import com.couchbase.client.dcp.highlevel.internal.DatabaseChangeConsumerOps;
import com.couchbase.client.dcp.highlevel.internal.DatabaseChangeEvent;
import com.couchbase.client.dcp.highlevel.internal.EventDispatcher;
import com.couchbase.client.dcp.highlevel.internal.SimpleThreadFactory;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An {@link EventDispatcher} that hands events to a {@link DatabaseChangeListener}
 * on a dedicated background thread.
 *
 * <p>Producers enqueue events via {@link #dispatch(DatabaseChangeEvent)}; the worker
 * thread created in the constructor takes events through the queue-ops wrapper and
 * dispatches each one to the listener. Exceptions thrown while dispatching are
 * reported to the listener's {@code onFailure} callback and do not stop the worker.
 */
public class AsyncEventDispatcher implements EventDispatcher {
    private static final Logger log = LoggerFactory.getLogger(AsyncEventDispatcher.class);

    // Shared by all dispatchers. The customizer passed to the factory marks the
    // thread as a daemon (presumably applied to every thread the factory creates).
    private static final ThreadFactory threadFactory = new SimpleThreadFactory("dcp-event-dispatch-", t -> t.setDaemon(true));

    // No capacity is given, so the deque is effectively unbounded and add() does not block.
    private final BlockingDeque<DatabaseChangeEvent> queue = new LinkedBlockingDeque<DatabaseChangeEvent>();
    private final BlockingQueueConsumerOps<DatabaseChangeEvent> queueOps;

    // Written by shutdownNow() and read by the worker loop, hence volatile.
    private volatile boolean shutdown;
    private final Thread thread;

    /**
     * Creates the dispatcher and immediately starts its worker thread.
     *
     * @param flowControlMode passed through to the queue consumer ops
     * @param consumer the listener that receives dispatched events and failures
     * @throws NullPointerException if {@code consumer} is null
     */
    public AsyncEventDispatcher(FlowControlMode flowControlMode, DatabaseChangeListener consumer) {
        Objects.requireNonNull(consumer, "consumer");
        this.queueOps = new DatabaseChangeConsumerOps(this.queue, flowControlMode);
        this.thread = threadFactory.newThread(() -> runDispatchLoop(consumer));
        // Started last so every field is assigned before the worker can observe this instance.
        this.thread.start();
    }

    /**
     * Worker loop: takes events and dispatches them to the listener until an
     * immediate shutdown is flagged, the thread is interrupted, or the graceful
     * shutdown poison pill is dispatched.
     */
    private void runDispatchLoop(DatabaseChangeListener consumer) {
        while (!this.shutdown) {
            try {
                this.queueOps.take().dispatch(consumer);
            } catch (GracefulShutdownPoisonPill ignored) {
                // Thrown only by the poison pill event enqueued by gracefulShutdown().
                log.info("High-level event dispatcher terminated due to graceful shutdown request.");
                return;
            } catch (InterruptedException ignored) {
                // The worker thread ends right here, so nothing further observes its interrupt status.
                log.info("High-level event dispatcher terminated due to interruption.");
                return;
            } catch (Throwable t) {
                reportListenerFailure(consumer, t);
            }
        }
    }

    /**
     * Logs a failure raised while dispatching and forwards it to the listener's
     * failure callback; a second failure from that callback is logged and dropped
     * so the dispatch loop keeps running.
     */
    private static void reportListenerFailure(DatabaseChangeListener consumer, Throwable t) {
        try {
            log.warn("Event listener threw exception.", t);
            // NOTE(review): -1 presumably means "no specific vbucket" -- confirm against StreamFailure.
            consumer.onFailure(new StreamFailure(-1, t));
        } catch (Throwable anotherFineMess) {
            log.error("Event listener error handler threw exception.", anotherFineMess);
        }
    }

    /**
     * Enqueues an event at the tail of the queue for asynchronous delivery.
     *
     * @param event the event to deliver to the listener
     */
    @Override
    public void dispatch(DatabaseChangeEvent event) {
        this.queue.add(event);
    }

    /**
     * Requests immediate termination: sets the shutdown flag and interrupts the
     * worker thread. Does not wait for the worker to exit; see
     * {@link #awaitTermination(Duration)}.
     */
    @Override
    public void shutdownNow() {
        this.shutdown = true;
        this.thread.interrupt();
    }

    /**
     * Requests termination by inserting a poison pill at the head of the deque.
     * The worker exits when it dispatches the pill.
     */
    @Override
    public void gracefulShutdown() {
        // NOTE(review): the pill goes to the FRONT of the deque, so (assuming the consumer
        // ops take from the head) events already queued behind it are not delivered -- confirm
        // this ordering is intended.
        this.queue.addFirst(AsyncEventDispatcher.poisonPill());
    }

    /**
     * Waits up to {@code timeout} for the worker thread to terminate.
     *
     * <p>A zero or negative timeout does not wait at all; it only reports the
     * current state.
     *
     * @param timeout the maximum time to wait
     * @return {@code true} if the worker thread is no longer alive
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override
    public boolean awaitTermination(Duration timeout) throws InterruptedException {
        // Thread.join(0) means "wait forever" and a negative argument throws, so
        // non-positive timeouts must never reach join().
        if (!timeout.isZero() && !timeout.isNegative()) {
            this.thread.join(AsyncEventDispatcher.toJoinMillis(timeout));
        }
        return !this.thread.isAlive();
    }

    /**
     * Converts a strictly positive duration to a millisecond value that is safe to
     * pass to {@link Thread#join(long)}: at least 1 (so a sub-millisecond timeout is
     * not mistaken for "forever") and clamped to {@code Long.MAX_VALUE} on overflow.
     */
    private static long toJoinMillis(Duration positiveTimeout) {
        long millis;
        try {
            millis = positiveTimeout.toMillis();
        } catch (ArithmeticException overflow) {
            // Duration too large to express in milliseconds; wait as long as possible.
            millis = Long.MAX_VALUE;
        }
        return Math.max(1L, millis);
    }

    /**
     * Creates the sentinel event whose dispatch throws
     * {@link GracefulShutdownPoisonPill}, signalling the worker loop to exit.
     */
    private static DatabaseChangeEvent poisonPill() {
        return new DatabaseChangeEvent() {

            @Override
            public void dispatch(DatabaseChangeListener listener) {
                throw new GracefulShutdownPoisonPill();
            }

            @Override
            public int getVbucket() {
                // The pill is not tied to a real partition.
                return -1;
            }
        };
    }

    /** Control-flow signal thrown by the poison pill event; never escapes this class. */
    private static class GracefulShutdownPoisonPill extends RuntimeException {
        private GracefulShutdownPoisonPill() {
        }
    }
}

