package net.openhft.chronicle.engine.pubsub;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import net.openhft.chronicle.bytes.BytesStore;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.threads.EventLoop;
import net.openhft.chronicle.core.threads.InvalidEventHandlerException;
import net.openhft.chronicle.engine.api.pubsub.InvalidSubscriberException;
import net.openhft.chronicle.engine.api.pubsub.Subscriber;
import net.openhft.chronicle.engine.api.tree.Asset;
import net.openhft.chronicle.engine.api.tree.AssetNotFoundException;
import net.openhft.chronicle.engine.api.tree.RequestContext;
import net.openhft.chronicle.engine.query.Filter;
import net.openhft.chronicle.engine.tree.ChronicleQueueView;
import net.openhft.chronicle.engine.tree.QueueView;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/chronicle-engine-1.14.22.jar:net/openhft/chronicle/engine/pubsub/QueueSimpleSubscription.class */
public class QueueSimpleSubscription<E> implements SimpleSubscription<E> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) QueueSimpleSubscription.class);
    private final Map<Subscriber<E>, AtomicBoolean> subscribers = new ConcurrentHashMap();
    private final Function<Object, E> valueReader;

    @NotNull
    private final ChronicleQueueView<?, E> chronicleQueue;

    @NotNull
    private final EventLoop eventLoop;
    private final String topic;

    public QueueSimpleSubscription(Function<Object, E> function, @NotNull Asset asset, String str) {
        this.valueReader = function;
        this.topic = str;
        this.chronicleQueue = (ChronicleQueueView) asset.acquireView(QueueView.class);
        this.eventLoop = (EventLoop) asset.acquireView(EventLoop.class);
    }

    @Override // net.openhft.chronicle.engine.api.pubsub.SubscriptionCollection
    public void registerSubscriber(@NotNull RequestContext requestContext, @NotNull Subscriber<E> subscriber, @NotNull Filter<E> filter) {
        registerSubscriber(false, 0, (Subscriber) subscriber);
    }

    public void registerSubscriber(boolean z, int i, @NotNull Subscriber<E> subscriber) throws AssetNotFoundException {
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        this.subscribers.put(subscriber, atomicBoolean);
        QueueView.Tailer<?, E> tailer = this.chronicleQueue.tailer();
        this.eventLoop.addHandler(() -> {
            if (atomicBoolean.get()) {
                throw new InvalidEventHandlerException();
            }
            QueueView.Excerpt read = tailer.read();
            if (read == null) {
                return false;
            }
            try {
                if (!this.topic.equals(read.topic().toString())) {
                    return true;
                }
                subscriber.onMessage(read.message());
                return true;
            } catch (InvalidSubscriberException e) {
                atomicBoolean.set(true);
                return true;
            }
        });
    }

    @Override // net.openhft.chronicle.engine.api.pubsub.SubscriptionCollection
    public void unregisterSubscriber(Subscriber subscriber) {
        AtomicBoolean remove = this.subscribers.remove(subscriber);
        if (remove != null) {
            remove.set(true);
        }
    }

    @Override // net.openhft.chronicle.engine.api.pubsub.SubscriptionCollection
    public int keySubscriberCount() {
        return subscriberCount();
    }

    @Override // net.openhft.chronicle.engine.api.pubsub.SubscriptionCollection
    public int entrySubscriberCount() {
        return 0;
    }

    @Override // net.openhft.chronicle.engine.api.pubsub.SubscriptionCollection
    public int topicSubscriberCount() {
        return 0;
    }

    @Override // net.openhft.chronicle.engine.api.pubsub.SubscriptionCollection
    public int subscriberCount() {
        return this.subscribers.size();
    }

    @Override // net.openhft.chronicle.engine.pubsub.SimpleSubscription
    public void notifyMessage(Object obj) {
        try {
            Object apply = obj instanceof BytesStore ? this.valueReader.apply(obj) : obj;
        } catch (ClassCastException e) {
            if (LOG.isDebugEnabled()) {
                Jvm.debug().on(getClass(), "Is " + this.valueReader + " the correct ValueReader?");
            }
            throw e;
        }
    }

    @Override // net.openhft.chronicle.core.io.Closeable, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        Iterator<Subscriber<E>> it = this.subscribers.keySet().iterator();
        while (it.hasNext()) {
            try {
                it.next().onEndOfSubscription();
            } catch (Exception e) {
                Jvm.debug().on(getClass(), e);
            }
        }
    }
}
