/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.stream.impl;

import java.lang.invoke.MethodHandles;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.BaseStream;
import java.util.stream.Stream;
import org.infinispan.commons.util.CloseableIterator;
import org.infinispan.commons.util.Closeables;
import org.infinispan.commons.util.Util;
import org.infinispan.commons.util.concurrent.ConcurrentHashSet;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
import org.infinispan.remoting.transport.Address;
import org.infinispan.stream.impl.intops.IntermediateOperation;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

@Listener(observation=Listener.Observation.POST)
public class IteratorHandler {
    private static final Log log = LogFactory.getLog(MethodHandles.lookup().lookupClass());
    private static final boolean trace = log.isTraceEnabled();
    private final Map<Object, CloseableIterator<?>> currentRequests = new ConcurrentHashMap();
    private final Map<Address, Set<Object>> ownerRequests = new ConcurrentHashMap<Address, Set<Object>>();
    @Inject
    private EmbeddedCacheManager manager;

    @ViewChanged
    public void viewChange(ViewChangedEvent event) {
        List<Address> newMembers = event.getNewMembers();
        Iterator<Map.Entry<Address, Set<Object>>> iter = this.ownerRequests.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<Address, Set<Object>> entry = iter.next();
            Address owner = entry.getKey();
            if (newMembers.contains(owner)) continue;
            Set<Object> ids = entry.getValue();
            if (!ids.isEmpty()) {
                log.tracef("View changed and no longer contains %s, closing %s iterators", owner, ids);
                ids.forEach(this::closeIterator);
            }
            iter.remove();
        }
    }

    @Start
    public void start() {
        this.manager.addListener(this);
    }

    @Stop
    public void stop() {
        this.manager.removeListener(this);
    }

    public <K, V, E> OnCloseIterator<E> start(Address origin, Supplier<Stream<CacheEntry<K, V>>> streamSupplier, Iterable<IntermediateOperation> intOps, Object requestId) {
        if (trace) {
            log.tracef("Iterator requested from %s using requestId %s", origin, requestId);
        }
        BaseStream stream = streamSupplier.get();
        for (IntermediateOperation intOp : intOps) {
            stream = intOp.perform(stream);
        }
        IteratorCloser iter = new IteratorCloser(Closeables.iterator((BaseStream)stream));
        iter.onClose(() -> this.closeIterator(origin, requestId));
        this.currentRequests.put(requestId, iter);
        Set ids = this.ownerRequests.computeIfAbsent(origin, k -> new ConcurrentHashSet());
        ids.add(requestId);
        return iter;
    }

    public <E> CloseableIterator<E> getIterator(Object requestId) {
        CloseableIterator<?> closeableIterator = this.currentRequests.get(requestId);
        if (closeableIterator == null) {
            throw new IllegalStateException("Iterator for requestId " + requestId + " doesn't exist!");
        }
        if (trace) {
            log.tracef("Iterator retrieved using requestId %s", requestId);
        }
        return closeableIterator;
    }

    public int openIterators() {
        return this.currentRequests.size();
    }

    public void closeIterator(Address origin, Object requestId) {
        Set<Object> ids = this.ownerRequests.get(origin);
        if (ids != null) {
            ids.remove(requestId);
        }
        this.closeIterator(requestId);
    }

    private void closeIterator(Object requestId) {
        CloseableIterator<?> closeableIterator = this.currentRequests.remove(requestId);
        if (closeableIterator != null) {
            if (trace) {
                log.tracef("Closing iterator using requestId %s", requestId);
            }
            closeableIterator.close();
        }
    }

    private class IteratorCloser<E>
    implements OnCloseIterator<E> {
        private final CloseableIterator<E> closeableIterator;
        private volatile Runnable closeRunnable;

        IteratorCloser(CloseableIterator<E> closeableIterator) {
            this.closeableIterator = closeableIterator;
        }

        public boolean hasNext() {
            boolean hasNext = this.closeableIterator.hasNext();
            if (!hasNext) {
                this.close();
            }
            return hasNext;
        }

        public E next() {
            this.hasNext();
            return (E)this.closeableIterator.next();
        }

        public void forEachRemaining(Consumer<? super E> action) {
            this.closeableIterator.forEachRemaining(action);
            this.close();
        }

        public void close() {
            this.closeableIterator.close();
            Runnable onClose = this.closeRunnable;
            if (onClose != null) {
                this.closeRunnable = null;
                onClose.run();
            }
        }

        @Override
        public IteratorCloser<E> onClose(Runnable closeHandler) {
            this.closeRunnable = this.closeRunnable == null ? closeHandler : Util.composeWithExceptions((Runnable)this.closeRunnable, (Runnable)closeHandler);
            return this;
        }
    }

    public static interface OnCloseIterator<E>
    extends CloseableIterator<E> {
        public OnCloseIterator<E> onClose(Runnable var1);
    }
}

