/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.tcp;

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import net.openhft.chronicle.ExcerptTailer;
import net.openhft.chronicle.tcp.ChronicleSource;
import net.openhft.chronicle.tcp.ChronicleSourceConfig;
import net.openhft.chronicle.tcp.ChronicleTcp;
import net.openhft.lang.model.constraints.NotNull;
import org.slf4j.Logger;

/**
 * Base class for the per-connection worker of a {@link ChronicleSource}.
 *
 * <p>Each instance owns one non-blocking {@link SocketChannel} and a private
 * {@link Selector}. {@link #run()} registers the channel for reads and then
 * dispatches readable/writable keys to {@link #onRead(SelectionKey)} and
 * {@link #onWrite(SelectionKey)} until the owning chronicle reports closed or
 * an error occurs. Subclasses supply {@link #publishData()} and may override
 * {@link #handleSubscribe(SelectionKey)}.
 *
 * <p>The wire values {@code -128} and {@code -126} used below are kept as
 * literals; NOTE(review): they presumably correspond to named constants in
 * {@code ChronicleTcp} - confirm and replace.
 */
public abstract class ChronicleSourceSocketHandler implements Runnable {
    private final ChronicleSource chronicle;
    private final ChronicleSourceConfig config;
    private final Logger logger;
    protected final SocketChannel socket;
    protected final Selector selector;
    protected final ByteBuffer buffer;
    protected ExcerptTailer tailer;
    /**
     * Despite the name this holds a deadline: the time (ms since epoch) at or
     * after which {@link #onWrite(SelectionKey)} emits a heartbeat frame.
     */
    protected long lastHeartbeat;
    protected ChronicleTcp.Command command;

    /**
     * Creates the handler, a tailer on {@code chronicle}, and a selector, and
     * switches {@code socket} to non-blocking mode with TCP_NODELAY enabled.
     *
     * @param chronicle source whose config and tailer are used
     * @param socket    accepted client channel
     * @param logger    logger used for connection diagnostics
     * @throws IOException if configuring the socket or opening the selector fails
     */
    protected ChronicleSourceSocketHandler(@NotNull ChronicleSource chronicle, @NotNull SocketChannel socket, @NotNull Logger logger) throws IOException {
        this.chronicle = chronicle;
        this.config = this.chronicle.config();
        this.logger = logger;
        this.tailer = this.chronicle.createTailer();
        this.buffer = ChronicleTcp.createBuffer(1, ByteOrder.nativeOrder());
        this.lastHeartbeat = 0L;
        this.command = new ChronicleTcp.Command();
        this.socket = socket;
        this.socket.configureBlocking(false);
        this.socket.socket().setSendBufferSize(this.config.minBufferSize());
        this.socket.socket().setTcpNoDelay(true);
        this.selector = Selector.open();
    }

    /**
     * Runs the select loop until the chronicle is closed or an exception
     * escapes a handler. On failure (while the chronicle is still open) the
     * cause is logged and the socket is closed. The tailer and the selector
     * are always released on exit.
     */
    @Override
    public void run() {
        try {
            this.socket.register(this.selector, SelectionKey.OP_READ);
            while (!this.chronicle.closed()) {
                if (this.selector.select(this.config.selectTimeout()) <= 0) {
                    continue;
                }
                this.dispatch(this.selector.selectedKeys());
            }
        } catch (Exception e) {
            // Errors raised while the chronicle is shutting down are expected; stay quiet.
            if (!this.chronicle.closed()) {
                this.logFailure(e);
                this.closeSocket();
            }
        } finally {
            this.releaseResources();
        }
    }

    /**
     * Hands each selected key to the matching handler. A handler returning
     * {@code false} discards the remaining selected keys for this round.
     */
    private void dispatch(Set<SelectionKey> keys) throws IOException {
        for (Iterator<SelectionKey> it = keys.iterator(); it.hasNext(); ) {
            SelectionKey key = it.next();
            boolean handled;
            if (key.isReadable()) {
                handled = this.onRead(key);
            } else if (key.isWritable()) {
                handled = this.onWrite(key);
            } else {
                continue;
            }
            if (!handled) {
                keys.clear();
                return;
            }
            it.remove();
        }
    }

    /** Logs why the connection ended, distinguishing peer-initiated disconnects. */
    private void logFailure(Exception e) {
        String msg = e.getMessage();
        boolean closedByPeer = msg != null
                && (msg.contains("reset by peer") || msg.contains("Broken pipe") || msg.contains("was aborted by"));
        if (closedByPeer) {
            this.logger.info("Connection {} closed from the other end: {}", this.socket, msg);
        } else {
            this.logger.info("Connection {} died", this.socket, e);
        }
    }

    /** Closes the socket if it is still open, logging (not propagating) any failure. */
    private void closeSocket() {
        try {
            if (this.socket.isOpen()) {
                this.socket.close();
            }
        } catch (IOException ioe) {
            this.logger.warn("Error closing connection {}", this.socket, ioe);
        }
    }

    /** Closes the tailer and the selector opened by the constructor. */
    private void releaseResources() {
        try {
            if (this.tailer != null) {
                this.tailer.close();
            }
        } finally {
            try {
                // Closing an already-closed selector is a no-op, so this is safe after onRead's EOF path.
                this.selector.close();
            } catch (IOException ioe) {
                this.logger.warn("Error closing selector for connection {}", this.socket, ioe);
            }
        }
    }

    /** Sets the heartbeat deadline to the current time plus the configured heartbeat interval. */
    protected void setLastHeartbeat() {
        this.lastHeartbeat = System.currentTimeMillis() + this.config.heartbeatInterval();
    }

    /**
     * Sets the heartbeat deadline relative to a caller-supplied timestamp.
     *
     * @param from base time in milliseconds
     */
    protected void setLastHeartbeat(long from) {
        this.lastHeartbeat = from + this.config.heartbeatInterval();
    }

    /**
     * Writes a frame consisting of an int {@code size} followed by a long
     * {@code index} to the socket and pushes the heartbeat deadline forward.
     *
     * @param size  value written as the leading int
     * @param index value written as the following long
     * @throws IOException if the write fails
     */
    protected void sendSizeAndIndex(int size, long index) throws IOException {
        this.buffer.clear();
        this.buffer.putInt(size);
        this.buffer.putLong(index);
        this.buffer.flip();
        ChronicleTcp.writeAll(this.socket, this.buffer);
        this.setLastHeartbeat();
    }

    /**
     * Hook invoked when a subscribe command is read; the default does nothing.
     *
     * @param key the key the command was read from
     * @return {@code true} to keep processing the remaining selected keys
     * @throws IOException if handling fails
     */
    protected boolean handleSubscribe(SelectionKey key) throws IOException {
        return true;
    }

    /**
     * Answers a query command. Positions the tailer at the index carried by
     * the command and, if a following index becomes available before the
     * heartbeat interval elapses, replies with {@code (-126, thatIndex)};
     * otherwise replies with {@code (-128, 0)}.
     *
     * @param key the key the command was read from
     * @return always {@code true}
     * @throws IOException if writing the reply fails
     */
    protected boolean handleQuery(SelectionKey key) throws IOException {
        if (!this.tailer.index(this.command.data())) {
            this.sendSizeAndIndex(-128, 0L);
            return true;
        }
        this.setLastHeartbeat(System.currentTimeMillis());
        do {
            if (this.tailer.nextIndex()) {
                this.sendSizeAndIndex(-126, this.tailer.index());
                return true;
            }
            // The clock is re-read on every pass so the wait actually times out.
        } while (System.currentTimeMillis() < this.lastHeartbeat);
        this.sendSizeAndIndex(-128, 0L);
        return true;
    }

    /**
     * Reads one command from the socket and routes it to
     * {@link #handleSubscribe(SelectionKey)} or {@link #handleQuery(SelectionKey)}.
     *
     * @param key the readable key
     * @return the handler's result
     * @throws IOException on an unknown action or I/O failure; on
     *                     {@link EOFException} the key's selector is closed
     *                     before the exception is rethrown
     */
    protected boolean onRead(SelectionKey key) throws IOException {
        try {
            this.command.read(this.socket);
            if (this.command.isSubscribe()) {
                return this.handleSubscribe(key);
            }
            if (this.command.isQuery()) {
                return this.handleQuery(key);
            }
            throw new IOException("Unknown action received (" + this.command.action() + ")");
        } catch (EOFException e) {
            key.selector().close();
            throw e;
        }
    }

    /**
     * Publishes pending data; if nothing was published and the heartbeat
     * deadline has passed, sends a {@code (-128, 0)} frame instead. Does
     * nothing once the chronicle is closed.
     *
     * @param key the writable key
     * @return always {@code true}
     * @throws IOException if publishing or writing fails
     */
    protected boolean onWrite(SelectionKey key) throws IOException {
        long now = System.currentTimeMillis();
        if (!this.chronicle.closed() && !this.publishData() && this.lastHeartbeat <= now) {
            this.sendSizeAndIndex(-128, 0L);
        }
        return true;
    }

    /**
     * Pushes available data to the client.
     *
     * @return {@code true} if something was published; {@code false} lets
     *         {@link #onWrite(SelectionKey)} consider sending a heartbeat
     * @throws IOException if writing fails
     */
    protected abstract boolean publishData() throws IOException;
}

