/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.tcp;

import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import net.openhft.chronicle.Chronicle;
import net.openhft.chronicle.Excerpt;
import net.openhft.chronicle.ExcerptAppender;
import net.openhft.chronicle.ExcerptCommon;
import net.openhft.chronicle.ExcerptTailer;
import net.openhft.chronicle.IndexedChronicle;
import net.openhft.chronicle.VanillaChronicle;
import net.openhft.chronicle.tcp.ChronicleSourceConfig;
import net.openhft.chronicle.tcp.ChronicleSourceSocketHandler;
import net.openhft.chronicle.tcp.ChronicleTcp;
import net.openhft.chronicle.tools.WrappedExcerpt;
import net.openhft.lang.model.constraints.NotNull;
import net.openhft.lang.thread.NamedThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link Chronicle} that delegates storage to a wrapped chronicle and additionally serves the
 * wrapped chronicle's entries to TCP clients.
 *
 * <p>On construction a non-blocking {@link ServerSocketChannel} is bound to the requested address
 * and an acceptor task is started on a cached thread pool. Every accepted connection gets its own
 * handler task (see {@link #createSocketHandler(SocketChannel)}). Excerpts created through this
 * class wake the handlers whenever {@code finish()} is called on them.
 *
 * <p>Each entry is framed on the wire as a 4-byte size, an 8-byte index and then the payload.
 */
public class ChronicleSource implements Chronicle {
    /** Bytes written in front of every entry: {@code putInt(size)} followed by {@code putLong(index)}. */
    private static final int HEADER_SIZE = 12;
    /** Subscription index for which the vanilla handler rewinds its tailer with {@code toStart()}. */
    private static final long SUBSCRIBE_FROM_START = -1L;
    /** Subscription index for which the handlers position their tailer with {@code toEnd()}. */
    private static final long SUBSCRIBE_FROM_END = -2L;
    /** For this long after the last {@link #pauseReset()}, {@link #pause()} returns without blocking. */
    private static final long BUSY_WAIT_TIME_NS = 100000L;
    /** Upper bound, in milliseconds, that {@link #close()} waits for the worker threads. */
    private static final long SHUTDOWN_TIMEOUT_MS = 10000L;

    @NotNull
    private final Chronicle chronicle;
    private final ChronicleSourceConfig config;
    private final ServerSocketChannel server;
    private final Selector selector;
    @NotNull
    private final String name;
    @NotNull
    private final ExecutorService service;
    private final Logger logger;
    /** Monitor the handlers wait on in {@link #pause()} and appenders notify on finish. */
    private final Object notifier = new Object();
    private final int maxMessages;
    private volatile boolean closed = false;
    // NOTE(review): written and read by several handler threads without synchronisation; it only
    // steers the busy-wait heuristic in pause(), so a stale read merely changes when a thread blocks.
    private long lastUnpausedNS = 0L;

    /**
     * Creates a source listening on {@code port} with the default configuration.
     *
     * @throws IOException if the server socket or selector cannot be set up
     */
    public ChronicleSource(@NotNull Chronicle chronicle, int port) throws IOException {
        this(chronicle, ChronicleSourceConfig.DEFAULT, new InetSocketAddress(port));
    }

    /**
     * Creates a source listening on {@code address} with the default configuration.
     *
     * @throws IOException if the server socket or selector cannot be set up
     */
    public ChronicleSource(@NotNull Chronicle chronicle, @NotNull InetSocketAddress address) throws IOException {
        this(chronicle, ChronicleSourceConfig.DEFAULT, address);
    }

    /**
     * Creates a source listening on {@code port} with the given configuration.
     *
     * @throws IOException if the server socket or selector cannot be set up
     */
    public ChronicleSource(@NotNull Chronicle chronicle, @NotNull ChronicleSourceConfig config, int port) throws IOException {
        this(chronicle, config, new InetSocketAddress(port));
    }

    /**
     * Binds the server socket, registers it for accept events and starts the acceptor task.
     *
     * @param chronicle the chronicle whose entries are published
     * @param config    supplies the heartbeat interval and the per-write message limit
     * @param address   the local address to listen on
     * @throws IOException if the server socket or selector cannot be set up; any channel or
     *                     selector opened up to that point is closed again before the exception
     *                     propagates
     */
    public ChronicleSource(@NotNull Chronicle chronicle, @NotNull ChronicleSourceConfig config, @NotNull InetSocketAddress address) throws IOException {
        this.chronicle = chronicle;
        this.config = config;
        // Assigned before the acceptor starts so that handler threads never observe the default 0.
        this.maxMessages = config.maxMessages();
        this.name = chronicle.name() + "@" + address.getPort();
        this.logger = LoggerFactory.getLogger(this.getClass().getName() + "." + this.name);

        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        Selector acceptSelector = null;
        boolean listening = false;
        try {
            serverChannel.socket().setReuseAddress(true);
            serverChannel.socket().bind(address);
            serverChannel.configureBlocking(false);
            acceptSelector = Selector.open();
            serverChannel.register(acceptSelector, SelectionKey.OP_ACCEPT);
            listening = true;
        } finally {
            if (!listening) {
                // e.g. the port is already in use: do not leave the half-initialised channel open.
                closeQuietly(acceptSelector);
                closeQuietly(serverChannel);
            }
        }
        this.server = serverChannel;
        this.selector = acceptSelector;
        this.service = Executors.newCachedThreadPool(new NamedThreadFactory(this.name, Boolean.TRUE));
        this.service.execute(new Acceptor());
    }

    /** Closes {@code resource} if it is non-null, discarding any {@link IOException}. */
    private static void closeQuietly(Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException ignored) {
            // Best-effort cleanup on an error path; the primary failure is what gets reported.
        }
    }

    /** Returns the name of the wrapped chronicle. */
    @Override
    public String name() {
        return this.chronicle.name();
    }

    /** Returns the local port the server socket is bound to. */
    public int getLocalPort() {
        return this.server.socket().getLocalPort();
    }

    /** Delegates to the wrapped chronicle. */
    @Override
    public long lastWrittenIndex() {
        return this.chronicle.lastWrittenIndex();
    }

    /** Delegates to the wrapped chronicle. */
    @Override
    public long size() {
        return this.chronicle.size();
    }

    /** Delegates to the wrapped chronicle. */
    @Override
    public void clear() {
        this.chronicle.clear();
    }

    /**
     * Marks this source as closed, closes the wrapped chronicle and the server socket, interrupts
     * the worker threads and waits up to {@value #SHUTDOWN_TIMEOUT_MS} ms for them to finish, then
     * closes the selector.
     *
     * <p>Every step runs even when an earlier one fails; failures are logged, not thrown. If the
     * calling thread is interrupted while waiting, its interrupt status is restored.
     */
    @Override
    public void close() {
        this.closed = true;
        try {
            try {
                this.chronicle.close();
            } finally {
                // Release the listening port even when closing the chronicle fails.
                this.server.close();
            }
        } catch (IOException e) {
            this.logger.warn("Error closing server port", e);
        } finally {
            this.service.shutdownNow();
            try {
                this.service.awaitTermination(SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            } catch (InterruptedException ie) {
                this.logger.warn("Error shutting down service threads", ie);
                Thread.currentThread().interrupt();
            }
            try {
                this.selector.close();
            } catch (IOException e) {
                this.logger.warn("Error closing selector", e);
            }
        }
    }

    /** Creates an excerpt on the wrapped chronicle that wakes the session handlers on finish. */
    @Override
    public Excerpt createExcerpt() throws IOException {
        return new SourceExcerpt(this.chronicle.createExcerpt());
    }

    /** Creates a tailer on the wrapped chronicle that wakes the session handlers on finish. */
    @Override
    public ExcerptTailer createTailer() throws IOException {
        return new SourceExcerpt(this.chronicle.createTailer());
    }

    /** Creates an appender on the wrapped chronicle that wakes the session handlers on finish. */
    @Override
    public ExcerptAppender createAppender() throws IOException {
        return new SourceExcerpt(this.chronicle.createAppender());
    }

    /** Forwards to {@code VanillaChronicle.checkCounts}; does nothing for other chronicle types. */
    protected void checkCounts(int min, int max) {
        if (this.chronicle instanceof VanillaChronicle) {
            ((VanillaChronicle) this.chronicle).checkCounts(min, max);
        }
    }

    /** Records the current time as the start of a new busy-wait window for {@link #pause()}. */
    protected void pauseReset() {
        this.lastUnpausedNS = System.nanoTime();
    }

    /**
     * Blocks the calling handler until an appender finishes an excerpt or half the configured
     * heartbeat interval elapses. Returns immediately while still inside the busy-wait window
     * that started at the last {@link #pauseReset()}.
     */
    protected void pause() {
        // nanoTime values must be compared by difference; the absolute values may overflow.
        if (System.nanoTime() - this.lastUnpausedNS < BUSY_WAIT_TIME_NS) {
            return;
        }
        try {
            synchronized (this.notifier) {
                this.notifier.wait(this.config.heartbeatInterval() / 2L);
            }
        } catch (InterruptedException ie) {
            // Deliberately swallowed (see the log message): callers re-check closed() themselves.
            this.logger.warn("Interrupt ignored");
        }
    }

    /** Wakes every handler currently blocked in {@link #pause()}. */
    protected void wakeSessionHandlers() {
        synchronized (this.notifier) {
            this.notifier.notifyAll();
        }
    }

    /**
     * Creates the task that serves one accepted connection, choosing the handler that matches the
     * type of the wrapped chronicle.
     *
     * @throws IOException if the handler cannot be created
     */
    protected Runnable createSocketHandler(SocketChannel channel) throws IOException {
        if (this.chronicle instanceof IndexedChronicle) {
            return new IndexedSocketHandler(channel);
        }
        return new VanillaSocketHandler(channel);
    }

    protected Chronicle chronicle() {
        return this.chronicle;
    }

    protected ChronicleSourceConfig config() {
        return this.config;
    }

    protected boolean closed() {
        return this.closed;
    }

    /**
     * Handler used when the wrapped chronicle is not an {@link IndexedChronicle}. It walks the
     * chronicle with {@code tailer.nextIndex()} and only seeks to an explicit index when
     * {@link #nextIndex} is {@code false}.
     */
    private final class VanillaSocketHandler extends ChronicleSourceSocketHandler {
        /** {@code true}: advance with {@code nextIndex()}; {@code false}: seek to {@link #index} first. */
        private boolean nextIndex;
        private long index;

        public VanillaSocketHandler(SocketChannel socket) throws IOException {
            super(ChronicleSource.this, socket, ChronicleSource.this.logger);
            this.nextIndex = true;
            this.index = -1L;
        }

        /**
         * Positions the tailer according to the index carried by the subscribe command and replies
         * with that index.
         *
         * @return always {@code false}
         */
        @Override
        protected boolean handleSubscribe(SelectionKey key) throws IOException {
            this.index = this.command.data();
            if (this.index == ChronicleSource.SUBSCRIBE_FROM_START) {
                this.nextIndex = true;
                this.tailer = this.tailer.toStart();
            } else if (this.index == ChronicleSource.SUBSCRIBE_FROM_END) {
                this.nextIndex = false;
                this.tailer = this.tailer.toEnd();
                this.index = this.tailer.index();
            } else {
                this.nextIndex = false;
            }
            // NOTE(review): -126 is presumably a protocol marker defined elsewhere - confirm.
            this.sendSizeAndIndex(-126, this.index);
            key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            return false;
        }

        /**
         * Sends the next entry, followed by as many further entries as fit into the buffer (at
         * most {@code maxMessages} per call). Entries larger than half the buffer are sent on
         * their own, in buffer-sized chunks.
         *
         * @return {@code false} if no entry was available, {@code true} if data was written
         * @throws EOFException if the buffer still holds unsent bytes after writing
         */
        @Override
        protected boolean publishData() throws IOException {
            if (this.nextIndex) {
                if (!this.tailer.nextIndex()) {
                    ChronicleSource.this.pause();
                    // NOTE(review): once the source is closed this falls through without a
                    // successful nextIndex(); kept as in the original - confirm the intent.
                    if (!ChronicleSource.this.closed && !this.tailer.nextIndex()) {
                        return false;
                    }
                }
            } else {
                if (!this.tailer.index(this.index)) {
                    return false;
                }
                this.nextIndex = true;
            }
            ChronicleSource.this.pauseReset();

            long size = this.tailer.capacity();
            long framedSize = size + ChronicleSource.HEADER_SIZE;
            this.buffer.clear();
            this.buffer.putInt((int) size);
            this.buffer.putLong(this.tailer.index());
            if (size > (long) (this.buffer.capacity() / 2)) {
                this.sendOversizedEntry(framedSize);
            } else {
                this.buffer.limit((int) framedSize);
                this.tailer.read(this.buffer);
                this.appendFollowingEntries();
                this.buffer.flip();
                ChronicleTcp.writeAll(this.socket, this.buffer);
            }
            if (this.buffer.remaining() > 0) {
                throw new EOFException("Failed to send index=" + this.tailer.index());
            }
            return true;
        }

        /**
         * Writes the current entry in buffer-sized chunks. On entry the buffer already holds the
         * header at positions 0..{@code HEADER_SIZE - 1}.
         *
         * @param remaining header plus payload bytes still to be sent
         */
        private void sendOversizedEntry(long remaining) throws IOException {
            boolean headerChunk = true;
            while (remaining > 0L) {
                if (!headerChunk) {
                    // Start each later chunk from an empty buffer so stale bytes are not resent.
                    this.buffer.clear();
                }
                headerChunk = false;
                this.buffer.limit((int) Math.min(remaining, (long) this.buffer.capacity()));
                this.tailer.read(this.buffer);
                this.buffer.flip();
                // Measure before writing: assumes writeAll drains the buffer - TODO confirm.
                int staged = this.buffer.remaining();
                ChronicleTcp.writeAll(this.socket, this.buffer);
                if (staged == 0 || this.buffer.remaining() > 0) {
                    throw new EOFException("Failed to send index=" + this.tailer.index());
                }
                remaining -= staged;
            }
        }

        /**
         * Appends further entries to the buffer while they fit and the per-call limit is not
         * reached. The buffer must already contain the first entry.
         */
        private void appendFollowingEntries() throws IOException {
            int count = 1;
            while (count++ < ChronicleSource.this.maxMessages) {
                if (!this.tailer.nextIndex()) {
                    // Nothing new yet; the original retries until the message limit is used up.
                    continue;
                }
                if (this.tailer.wasPadding()) {
                    throw new AssertionError("Entry should not be padding - remove");
                }
                long free = (long) (this.buffer.capacity() - this.buffer.position());
                if (this.tailer.capacity() + ChronicleSource.HEADER_SIZE >= free) {
                    // The tailer has already advanced onto this entry. Remember its index so the
                    // next publishData() seeks back to it instead of stepping past it.
                    this.index = this.tailer.index();
                    this.nextIndex = false;
                    break;
                }
                int entrySize = (int) this.tailer.capacity();
                this.buffer.limit(this.buffer.position() + entrySize + ChronicleSource.HEADER_SIZE);
                this.buffer.putInt(entrySize);
                this.buffer.putLong(this.tailer.index());
                this.tailer.read(this.buffer);
            }
        }
    }

    /**
     * Handler used when the wrapped chronicle is an {@link IndexedChronicle}. It tracks the next
     * index to send itself and seeks the tailer with {@code tailer.index(long)}.
     */
    private final class IndexedSocketHandler extends ChronicleSourceSocketHandler {
        /** Index of the entry the next {@link #publishData()} call starts from. */
        private long index;

        public IndexedSocketHandler(SocketChannel socket) throws IOException {
            super(ChronicleSource.this, socket, ChronicleSource.this.logger);
            this.index = -1L;
        }

        /**
         * Records the index carried by the subscribe command (resolving "from end" to the
         * tailer's end index) and replies with it.
         *
         * @return always {@code false}
         */
        @Override
        protected boolean handleSubscribe(SelectionKey key) throws IOException {
            this.index = this.command.data();
            if (this.index == ChronicleSource.SUBSCRIBE_FROM_END) {
                this.index = this.tailer.toEnd().index();
            }
            // NOTE(review): -126 is presumably a protocol marker defined elsewhere - confirm.
            this.sendSizeAndIndex(-126, this.index);
            key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            return false;
        }

        /**
         * Sends the entry at {@link #index}, followed by as many further entries as fit into the
         * buffer (at most {@code maxMessages} per call). Entries larger than half the buffer are
         * sent on their own, in buffer-sized chunks.
         *
         * @return {@code false} if no entry was available, {@code true} if data was written
         * @throws EOFException if the buffer still holds unsent bytes after writing
         */
        @Override
        protected boolean publishData() throws IOException {
            if (!this.tailer.index(this.index)) {
                if (this.tailer.wasPadding()) {
                    if (this.index >= 0L) {
                        // NOTE(review): -127 presumably tells the client about a padding entry - confirm.
                        this.sendSizeAndIndex(-127, this.tailer.index());
                    }
                    ++this.index;
                }
                ChronicleSource.this.pause();
                // NOTE(review): once the source is closed this falls through without a
                // successful seek; kept as in the original - confirm the intent.
                if (!ChronicleSource.this.closed && !this.tailer.index(this.index)) {
                    return false;
                }
            }
            ChronicleSource.this.pauseReset();

            long size = this.tailer.capacity();
            long framedSize = size + ChronicleSource.HEADER_SIZE;
            this.buffer.clear();
            this.buffer.putInt((int) size);
            this.buffer.putLong(this.tailer.index());
            if (size > (long) (this.buffer.capacity() / 2)) {
                this.sendOversizedEntry(framedSize);
            } else {
                this.buffer.limit((int) framedSize);
                this.tailer.read(this.buffer);
                this.appendFollowingEntries();
                this.buffer.flip();
                ChronicleTcp.writeAll(this.socket, this.buffer);
            }
            if (this.buffer.remaining() > 0) {
                throw new EOFException("Failed to send index=" + this.index);
            }
            ++this.index;
            return true;
        }

        /**
         * Writes the current entry in buffer-sized chunks. On entry the buffer already holds the
         * header at positions 0..{@code HEADER_SIZE - 1}.
         *
         * @param remaining header plus payload bytes still to be sent
         */
        private void sendOversizedEntry(long remaining) throws IOException {
            boolean headerChunk = true;
            while (remaining > 0L) {
                if (!headerChunk) {
                    // Start each later chunk from an empty buffer so stale bytes are not resent.
                    this.buffer.clear();
                }
                headerChunk = false;
                this.buffer.limit((int) Math.min(remaining, (long) this.buffer.capacity()));
                this.tailer.read(this.buffer);
                this.buffer.flip();
                // Measure before writing: assumes writeAll drains the buffer - TODO confirm.
                int staged = this.buffer.remaining();
                ChronicleTcp.writeAll(this.socket, this.buffer);
                if (staged == 0 || this.buffer.remaining() > 0) {
                    throw new EOFException("Failed to send index=" + this.index);
                }
                remaining -= staged;
            }
        }

        /**
         * Appends the entries after {@link #index} to the buffer while they fit and the per-call
         * limit is not reached, advancing {@link #index} past every entry it consumes. When an
         * entry does not fit, {@link #index} is left alone so that the caller's final increment
         * lands on that entry.
         */
        private void appendFollowingEntries() throws IOException {
            int count = 1;
            while (this.tailer.index(this.index + 1L) && count++ < ChronicleSource.this.maxMessages) {
                if (this.tailer.wasPadding()) {
                    ++this.index;
                    continue;
                }
                long free = (long) (this.buffer.capacity() - this.buffer.position());
                if (this.tailer.capacity() + ChronicleSource.HEADER_SIZE >= free) {
                    break;
                }
                int entrySize = (int) this.tailer.capacity();
                this.buffer.limit(this.buffer.position() + entrySize + ChronicleSource.HEADER_SIZE);
                this.buffer.putInt(entrySize);
                this.buffer.putLong(this.tailer.index());
                this.tailer.read(this.buffer);
                ++this.index;
            }
        }
    }

    /** Excerpt wrapper that wakes the session handlers whenever an excerpt is finished. */
    private final class SourceExcerpt extends WrappedExcerpt {
        public SourceExcerpt(ExcerptCommon excerptCommon) {
            super(excerptCommon);
        }

        @Override
        public void finish() {
            super.finish();
            ChronicleSource.this.wakeSessionHandlers();
        }
    }

    /**
     * Accept loop: waits on the selector and hands every accepted connection to a new handler
     * task. When the loop ends, for whatever reason, the executor is shut down.
     */
    private final class Acceptor implements Runnable {
        private Acceptor() {
        }

        @Override
        public void run() {
            Thread.currentThread().setName(ChronicleSource.this.name + "-acceptor");
            try {
                while (!ChronicleSource.this.closed) {
                    ChronicleSource.this.selector.select();
                    // Only the selected-key set reflects this select() call; it has to be emptied
                    // by hand, otherwise a stale "acceptable" state is seen on the next wake-up.
                    Set<SelectionKey> readyKeys = ChronicleSource.this.selector.selectedKeys();
                    for (SelectionKey key : readyKeys) {
                        if (key.isValid() && key.isAcceptable()) {
                            this.acceptConnection();
                        }
                    }
                    readyKeys.clear();
                }
            } catch (ClosedSelectorException e) {
                // close() may close the selector between the closed check and select().
                if (!ChronicleSource.this.closed) {
                    ChronicleSource.this.logger.warn("Acceptor dying", e);
                }
            } catch (IOException e) {
                if (!ChronicleSource.this.closed) {
                    ChronicleSource.this.logger.warn("Acceptor dying", e);
                }
            } finally {
                ChronicleSource.this.service.shutdown();
                ChronicleSource.this.logger.info("Acceptor loop ended");
            }
        }

        /**
         * Accepts one pending connection, if any, and starts a handler for it. A failure while
         * setting up an individual connection is logged and the connection is closed; it does not
         * end the accept loop.
         *
         * @throws IOException if {@code accept()} itself fails, e.g. because the server socket
         *                     has been closed
         */
        private void acceptConnection() throws IOException {
            SocketChannel socket = ChronicleSource.this.server.accept();
            if (socket == null) {
                // The server channel is non-blocking: no connection is pending any more.
                return;
            }
            boolean handedOff = false;
            try {
                socket.configureBlocking(true);
                ChronicleSource.this.logger.info("Accepted connection from: {}", socket.getRemoteAddress());
                ChronicleSource.this.service.execute(ChronicleSource.this.createSocketHandler(socket));
                handedOff = true;
            } catch (IOException e) {
                if (!ChronicleSource.this.closed) {
                    ChronicleSource.this.logger.warn("Failed to set up connection", e);
                }
            } finally {
                if (!handedOff) {
                    closeQuietly(socket);
                }
            }
        }
    }
}

