/*
 * Decompiled with CFR 0.152.
 */
package com.rabbitmq.client.amqp.impl;

import com.rabbitmq.client.amqp.AmqpException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs per-client tasks one at a time on a single thread obtained from an
 * {@link ExecutorService}.
 *
 * <p>Each {@link Client} created by {@link #register(Supplier)} owns a state object. Tasks
 * submitted from other threads are placed on a bounded queue (capacity 1000) and executed by
 * the loop thread, which keeps the current state of every client in a map keyed by client id.
 * A task that returns {@code null} removes the client's state from that map.
 *
 * <p>Tasks submitted from the loop thread itself are executed inline (see
 * {@link #submit(Client, UnaryOperator)}).
 */
final class EventLoop implements AutoCloseable {

    /** Upper bound for both enqueueing a task and waiting for its completion. */
    private static final Duration TIMEOUT = Duration.ofSeconds(60L);
    private static final Logger LOGGER = LoggerFactory.getLogger(EventLoop.class);
    /** Source of client ids; shared by all event loop instances. */
    private static final AtomicLong CLIENT_ID_SEQUENCE = new AtomicLong();

    private final AtomicBoolean closed = new AtomicBoolean(false);
    /** Handle on the loop task, used to interrupt it on {@link #close()}. */
    private final Future<?> loop;
    /** Thread currently running the loop; used to detect re-entrant submissions. */
    private final AtomicReference<Thread> loopThread = new AtomicReference<>();
    private final BlockingQueue<ClientTaskContext<?>> taskQueue = new LinkedBlockingQueue<>(1000);

    /**
     * Starts the loop on the given executor and waits up to 10 seconds for it to be running.
     *
     * @param executorService executor that provides the loop thread
     * @throws IllegalStateException if the loop did not start within 10 seconds
     * @throws AmqpException if the calling thread is interrupted while waiting
     */
    EventLoop(ExecutorService executorService) {
        CountDownLatch loopThreadSetLatch = new CountDownLatch(1);
        this.loop = executorService.submit(() -> {
            this.loopThread.set(Thread.currentThread());
            loopThreadSetLatch.countDown();
            // Only ever touched by the loop thread, so a plain HashMap is enough.
            Map<Long, Object> states = new HashMap<>();
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    // Poll with a timeout so the interruption flag is re-checked regularly.
                    ClientTaskContext<?> context = this.taskQueue.poll(1000L, TimeUnit.MILLISECONDS);
                    if (context != null) {
                        process(context, states);
                    }
                } catch (InterruptedException e) {
                    LOGGER.debug("Event loop has been interrupted.");
                    return;
                } catch (Exception e) {
                    // A failing task must not terminate the loop.
                    LOGGER.warn("Error during processing of topology recording task", e);
                }
            }
        });
        boolean started = false;
        try {
            if (!loopThreadSetLatch.await(10L, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Recording topology loop could not start");
            }
            started = true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AmqpException("Error while creating recording topology listener", e);
        } finally {
            if (!started) {
                // Construction failed: do not leave the loop task behind in the executor.
                this.loop.cancel(true);
            }
        }
    }

    /**
     * Applies a queued task to the state currently recorded for its client and stores the
     * result, or removes the entry when the task returns {@code null}.
     */
    private static <S> void process(ClientTaskContext<S> context, Map<Long, Object> states) {
        long clientId = context.client.id;
        // Safe as long as ids are unique per client: the entry for a given id is only ever
        // written with values produced by tasks of that same client.
        @SuppressWarnings("unchecked")
        S current = (S) states.get(clientId);
        S next = context.task.apply(current);
        if (next == null) {
            states.remove(clientId);
        } else {
            states.put(clientId, next);
        }
    }

    /**
     * Creates a client and submits a task that initializes its state from the supplier.
     *
     * @param stateSupplier provides the initial state; invoked by the task, not by this method
     *     directly
     * @param <S> type of the client state
     * @return the new client
     * @throws IllegalStateException if the event loop is closed
     */
    <S> Client<S> register(Supplier<S> stateSupplier) {
        Client<S> client = new Client<>(this);
        this.submit(client, ignored -> {
            S state = stateSupplier.get();
            client.stateReference.set(state);
            return state;
        });
        return client;
    }

    /**
     * Runs a task for the given client and waits for it to finish.
     *
     * <p>When called from the loop thread the task is applied inline to the state held in the
     * client's state reference; its return value is discarded and any exception it throws
     * propagates to the caller. Otherwise the task is enqueued and the caller blocks until it
     * has run, for at most {@link #TIMEOUT}; a task that does not complete in time is only
     * reported with a warning.
     *
     * @throws IllegalStateException if the event loop is closed
     * @throws AmqpException if enqueueing times out or the calling thread is interrupted
     */
    private <ST> void submit(Client<ST> client, UnaryOperator<ST> task) {
        if (this.closed.get()) {
            throw new IllegalStateException("Event loop is closed");
        }
        if (Thread.currentThread().equals(this.loopThread.get())) {
            // Already on the loop thread: enqueueing and waiting here would block the very
            // thread that is supposed to run the task.
            task.apply(client.stateReference.get());
            return;
        }
        CountDownLatch latch = new CountDownLatch(1);
        ClientTaskContext<ST> context = new ClientTaskContext<>(client, state -> {
            try {
                return task.apply(state);
            } catch (Exception e) {
                LOGGER.info("Error during task", e);
                // Keep the previous state: returning null would make the loop drop the
                // client's state and hand null to every subsequent task.
                return state;
            } finally {
                latch.countDown();
            }
        });
        try {
            boolean added = this.taskQueue.offer(context, TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
            if (!added) {
                throw new AmqpException("Enqueueing of task timed out", new Object[0]);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AmqpException("Task enqueueing has been interrupted", e);
        }
        try {
            boolean completed = latch.await(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
            if (!completed) {
                LOGGER.warn("Event loop task did not complete in {} second(s), queue size is {}", TIMEOUT.toSeconds(), this.taskQueue.size());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AmqpException("Topology task processing has been interrupted", e);
        }
    }

    /** Marks the loop as closed and interrupts the loop task. Subsequent calls do nothing. */
    @Override
    public void close() {
        if (this.closed.compareAndSet(false, true)) {
            this.loop.cancel(true);
        }
    }

    /** A task paired with the client whose state it operates on. */
    private static class ClientTaskContext<S> {
        private final Client<S> client;
        private final UnaryOperator<S> task;

        private ClientTaskContext(Client<S> client, UnaryOperator<S> task) {
            this.client = client;
            this.task = task;
        }
    }

    /**
     * Handle used to run tasks against one piece of state on the event loop.
     *
     * @param <S> type of the state
     */
    static class Client<S> implements AutoCloseable {
        private final long id;
        /** Set by the registration task; read by {@link #state()} and by inline tasks. */
        private final AtomicReference<S> stateReference = new AtomicReference<>();
        private final EventLoop loop;
        private final AtomicBoolean closed = new AtomicBoolean(false);

        private Client(EventLoop loop) {
            this.id = CLIENT_ID_SEQUENCE.getAndIncrement();
            this.loop = loop;
        }

        /**
         * Runs the task against this client's state on the event loop; the state object itself
         * is kept as the client's state afterwards.
         *
         * @throws IllegalStateException if this client or the event loop is closed
         * @throws AmqpException if enqueueing times out or the calling thread is interrupted
         */
        void submit(Consumer<S> task) {
            if (this.closed.get()) {
                throw new IllegalStateException("Event loop is closed");
            }
            this.loop.submit(this, s -> {
                task.accept(s);
                return s;
            });
        }

        /**
         * Closes this client and asks the loop to drop its state. Subsequent calls do nothing.
         * If the event loop is already closed there is no running loop left to clean up, so
         * nothing is submitted.
         */
        @Override
        public void close() {
            if (this.closed.compareAndSet(false, true) && !this.loop.closed.get()) {
                // A null result tells the loop to remove this client's state.
                this.loop.submit(this, s -> null);
            }
        }

        /** Returns the state recorded at registration, or {@code null} if none was set. */
        S state() {
            return this.stateReference.get();
        }
    }
}

