/*
 * Decompiled with CFR 0.152.
 */
package org.apache.jackrabbit.oak.segment.azure.queue;

import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.jackrabbit.oak.segment.azure.AzureSegmentArchiveEntry;
import org.apache.jackrabbit.oak.segment.azure.queue.SegmentWriteAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SegmentWriteQueue
implements Closeable {
    public static final int THREADS = Integer.getInteger("oak.segment.azure.threads", 5);
    private static final int QUEUE_SIZE = Integer.getInteger("oak.segment.org.apache.jackrabbit.oak.segment.azure.queue", 20);
    private static final Logger log = LoggerFactory.getLogger(SegmentWriteQueue.class);
    private final BlockingDeque<SegmentWriteAction> queue;
    private final Map<UUID, SegmentWriteAction> segmentsByUUID;
    private final ExecutorService executor;
    private final ReadWriteLock flushLock;
    private final SegmentConsumer writer;
    private volatile boolean shutdown;
    private final Object brokenMonitor = new Object();
    private volatile boolean broken;

    public SegmentWriteQueue(SegmentConsumer writer) {
        this(writer, QUEUE_SIZE, THREADS);
    }

    SegmentWriteQueue(SegmentConsumer writer, int queueSize, int threadNo) {
        this.writer = writer;
        this.segmentsByUUID = new ConcurrentHashMap<UUID, SegmentWriteAction>();
        this.flushLock = new ReentrantReadWriteLock();
        this.queue = new LinkedBlockingDeque<SegmentWriteAction>(queueSize);
        this.executor = Executors.newFixedThreadPool(threadNo + 1);
        for (int i = 0; i < threadNo; ++i) {
            this.executor.submit(this::mainLoop);
        }
        this.executor.submit(this::emergencyLoop);
    }

    private void mainLoop() {
        while (!this.shutdown) {
            try {
                this.waitWhileBroken();
                if (this.shutdown) break;
                this.consume();
            }
            catch (SegmentConsumeException e) {
                SegmentWriteAction segment = e.segment;
                log.error("Can't persist the segment {}", (Object)segment.getUuid(), (Object)e.getCause());
                try {
                    this.queue.put(segment);
                }
                catch (InterruptedException e1) {
                    log.error("Can't re-add the segment {} to the queue. It'll be dropped.", (Object)segment.getUuid(), (Object)e1);
                }
            }
        }
    }

    private void consume() throws SegmentConsumeException {
        SegmentWriteAction segment = null;
        try {
            segment = this.queue.poll(100L, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            log.error("Poll from queue interrupted", (Throwable)e);
        }
        if (segment != null) {
            this.consume(segment);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void consume(SegmentWriteAction segment) throws SegmentConsumeException {
        try {
            segment.passTo(this.writer);
        }
        catch (IOException e) {
            this.setBroken(true);
            throw new SegmentConsumeException(segment, e);
        }
        Map<UUID, SegmentWriteAction> map = this.segmentsByUUID;
        synchronized (map) {
            this.segmentsByUUID.remove(segment.getUuid());
            this.segmentsByUUID.notifyAll();
        }
        this.setBroken(false);
    }

    private void emergencyLoop() {
        while (!this.shutdown) {
            this.waitUntilBroken();
            if (this.shutdown) break;
            boolean success = false;
            SegmentWriteAction segmentToRetry = null;
            do {
                try {
                    if (segmentToRetry == null) {
                        this.consume();
                    } else {
                        this.consume(segmentToRetry);
                    }
                    success = true;
                }
                catch (SegmentConsumeException e) {
                    segmentToRetry = e.segment;
                    log.error("Can't persist the segment {}", (Object)segmentToRetry.getUuid(), (Object)e.getCause());
                    try {
                        Thread.sleep(1000L);
                    }
                    catch (InterruptedException e1) {
                        log.warn("Interrupted", (Throwable)e);
                    }
                    if (!this.shutdown) continue;
                    log.error("Shutdown initiated. The segment {} will be dropped.", (Object)segmentToRetry.getUuid());
                }
            } while (!success && !this.shutdown);
        }
    }

    public void addToQueue(AzureSegmentArchiveEntry indexEntry, byte[] data, int offset, int size) throws IOException {
        this.waitWhileBroken();
        if (this.shutdown) {
            throw new IllegalStateException("Can't accept the new segment - shutdown in progress");
        }
        SegmentWriteAction action = new SegmentWriteAction(indexEntry, data, offset, size);
        this.flushLock.readLock().lock();
        try {
            this.segmentsByUUID.put(action.getUuid(), action);
            if (!this.queue.offer(action, 1L, TimeUnit.MINUTES)) {
                this.segmentsByUUID.remove(action.getUuid());
                throw new IOException("Can't add segment to the queue");
            }
        }
        catch (InterruptedException e) {
            throw new IOException(e);
        }
        finally {
            this.flushLock.readLock().unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void flush() throws IOException {
        this.flushLock.writeLock().lock();
        try {
            Map<UUID, SegmentWriteAction> map = this.segmentsByUUID;
            synchronized (map) {
                long start = System.currentTimeMillis();
                while (!this.segmentsByUUID.isEmpty()) {
                    this.segmentsByUUID.wait(100L);
                    if (System.currentTimeMillis() - start <= TimeUnit.MINUTES.toMillis(1L)) continue;
                    log.error("Can't flush the queue in 1 minute. Queue: {}. Segment map: {}", this.queue, this.segmentsByUUID);
                    start = System.currentTimeMillis();
                }
            }
        }
        catch (InterruptedException e) {
            throw new IOException(e);
        }
        finally {
            this.flushLock.writeLock().unlock();
        }
    }

    public SegmentWriteAction read(UUID id) {
        return this.segmentsByUUID.get(id);
    }

    @Override
    public void close() throws IOException {
        this.shutdown = true;
        try {
            this.executor.shutdown();
            if (!this.executor.awaitTermination(1L, TimeUnit.MINUTES)) {
                throw new IOException("The write wasn't able to shut down clearly");
            }
        }
        catch (InterruptedException e) {
            throw new IOException(e);
        }
    }

    public boolean isEmpty() {
        return this.segmentsByUUID.isEmpty();
    }

    boolean isBroken() {
        return this.broken;
    }

    int getSize() {
        return this.queue.size();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void setBroken(boolean broken) {
        Object object = this.brokenMonitor;
        synchronized (object) {
            this.broken = broken;
            this.brokenMonitor.notifyAll();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void waitWhileBroken() {
        if (!this.broken) {
            return;
        }
        Object object = this.brokenMonitor;
        synchronized (object) {
            while (this.broken && !this.shutdown) {
                try {
                    this.brokenMonitor.wait(100L);
                }
                catch (InterruptedException e) {
                    log.warn("Interrupted", (Throwable)e);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void waitUntilBroken() {
        if (this.broken) {
            return;
        }
        Object object = this.brokenMonitor;
        synchronized (object) {
            while (!this.broken && !this.shutdown) {
                try {
                    this.brokenMonitor.wait(100L);
                }
                catch (InterruptedException e) {
                    log.warn("Interrupted", (Throwable)e);
                }
            }
        }
    }

    public static class SegmentConsumeException
    extends Exception {
        private final SegmentWriteAction segment;

        public SegmentConsumeException(SegmentWriteAction segment, IOException cause) {
            super(cause);
            this.segment = segment;
        }
    }

    public static interface SegmentConsumer {
        public void consume(AzureSegmentArchiveEntry var1, byte[] var2, int var3, int var4) throws IOException;
    }
}

