/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.tcp;

import java.io.EOFException;
import java.io.IOException;
import java.io.StreamCorruptedException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.SocketChannel;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import net.openhft.chronicle.Chronicle;
import net.openhft.chronicle.Excerpt;
import net.openhft.chronicle.ExcerptAppender;
import net.openhft.chronicle.ExcerptCommon;
import net.openhft.chronicle.ExcerptComparator;
import net.openhft.chronicle.ExcerptTailer;
import net.openhft.chronicle.IndexedChronicle;
import net.openhft.chronicle.VanillaChronicle;
import net.openhft.chronicle.tcp.ChronicleSinkConfig;
import net.openhft.chronicle.tcp.ChronicleTcp;
import net.openhft.chronicle.tools.WrappedExcerpt;
import net.openhft.lang.io.NativeBytes;
import net.openhft.lang.model.constraints.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.nio.ch.DirectBuffer;

public class ChronicleSink
implements Chronicle {
    @NotNull
    private final Chronicle chronicle;
    @NotNull
    private final ChronicleSinkConfig config;
    @NotNull
    private final InetSocketAddress address;
    private final List<ExcerptCommon> excerpts;
    private final Logger logger;
    private volatile boolean closed = false;
    private final boolean isLocal;

    public ChronicleSink(String hostname, int port) throws IOException {
        this(null, ChronicleSinkConfig.DEFAULT, new InetSocketAddress(hostname, port));
    }

    public ChronicleSink(@NotNull Chronicle chronicle, String hostname, int port) throws IOException {
        this(chronicle, ChronicleSinkConfig.DEFAULT, new InetSocketAddress(hostname, port));
    }

    public ChronicleSink(@NotNull Chronicle chronicle, @NotNull ChronicleSinkConfig config, String hostname, int port) throws IOException {
        this(chronicle, config, new InetSocketAddress(hostname, port));
    }

    public ChronicleSink(@NotNull ChronicleSinkConfig config, String hostname, int port) throws IOException {
        this(null, config, new InetSocketAddress(hostname, port));
    }

    public ChronicleSink(@NotNull InetSocketAddress address) throws IOException {
        this(null, ChronicleSinkConfig.DEFAULT, address);
    }

    public ChronicleSink(@NotNull Chronicle chronicle, @NotNull InetSocketAddress address) throws IOException {
        this(chronicle, ChronicleSinkConfig.DEFAULT, address);
    }

    public ChronicleSink(@NotNull ChronicleSinkConfig config, @NotNull InetSocketAddress address) throws IOException {
        this(null, config, address);
    }

    public ChronicleSink(@NotNull Chronicle chronicle, @NotNull ChronicleSinkConfig config, @NotNull InetSocketAddress address) throws IOException {
        this.chronicle = chronicle;
        this.config = config;
        this.address = address;
        this.logger = LoggerFactory.getLogger((String)(this.getClass().getName() + '.' + address.getHostName() + '@' + address.getPort()));
        this.excerpts = Collections.synchronizedList(new LinkedList());
        this.isLocal = config.sharedChronicle() && ChronicleTcp.isLocalhost(this.address.getAddress());
    }

    @Override
    public String name() {
        return this.chronicle.name();
    }

    @Override
    @NotNull
    public synchronized Excerpt createExcerpt() throws IOException {
        Excerpt excerpt = null;
        excerpt = this.chronicle == null ? new VolatileExcerpt() : (Excerpt)this.createPersistedExcerpt();
        if (excerpt != null) {
            this.excerpts.add(excerpt);
        }
        return excerpt;
    }

    @Override
    @NotNull
    public synchronized ExcerptTailer createTailer() throws IOException {
        ExcerptTailer excerpt = null;
        excerpt = this.chronicle == null ? new VolatileExcerptTailer() : this.createPersistedExcerpt();
        if (excerpt != null) {
            this.excerpts.add(excerpt);
        }
        return excerpt;
    }

    @Override
    @NotNull
    public ExcerptAppender createAppender() throws IOException {
        throw new UnsupportedOperationException();
    }

    @Override
    public long lastWrittenIndex() {
        return this.chronicle != null ? this.chronicle.lastWrittenIndex() : -1L;
    }

    @Override
    public long size() {
        return this.chronicle != null ? this.chronicle.size() : 0L;
    }

    @Override
    public void clear() {
        if (this.chronicle != null) {
            this.chronicle.clear();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public synchronized void close() {
        if (!this.closed) {
            this.closed = true;
            List<ExcerptCommon> list = this.excerpts;
            synchronized (list) {
                for (ExcerptCommon excerpt : this.excerpts) {
                    excerpt.close();
                }
            }
            try {
                if (this.chronicle != null) {
                    this.chronicle.close();
                }
            }
            catch (IOException e) {
                this.logger.warn("Error closing Sink", (Throwable)e);
            }
        }
    }

    public void checkCounts(int min, int max) {
        if (this.chronicle instanceof VanillaChronicle) {
            ((VanillaChronicle)this.chronicle).checkCounts(min, max);
        }
    }

    private ExcerptTailer createPersistedExcerpt() throws IOException {
        if (!this.excerpts.isEmpty()) {
            throw new UnsupportedOperationException("An Excerpt has already been created");
        }
        if (this.isLocal) {
            return this.chronicle instanceof IndexedChronicle ? new PersistentIndexedLocalSinkExcerpt(this.chronicle.createTailer()) : new PersistentVanillaLocalSinkExcerpt(this.chronicle.createTailer());
        }
        return this.chronicle instanceof IndexedChronicle ? new PersistentIndexedSinkExcerpt(this.chronicle.createTailer()) : new PersistentVanillaSinkExcerpt(this.chronicle.createTailer());
    }

    protected Chronicle chronicle() {
        return this.chronicle;
    }

    protected boolean closed() {
        return this.closed;
    }

    protected ChronicleSinkConfig config() {
        return this.config;
    }

    private class VolatileExcerpt
    extends VolatileExcerptTailer
    implements Excerpt {
        private VolatileExcerpt() {
        }

        @Override
        public long findMatch(@NotNull ExcerptComparator comparator) {
            throw new UnsupportedOperationException();
        }

        @Override
        public void findRange(@NotNull long[] startEnd, @NotNull ExcerptComparator comparator) {
            throw new UnsupportedOperationException();
        }

        @Override
        public Excerpt toStart() {
            super.toStart();
            return this;
        }

        @Override
        public Excerpt toEnd() {
            super.toEnd();
            return this;
        }
    }

    private class VolatileExcerptTailer
    extends NativeBytes
    implements ExcerptTailer {
        private final Logger logger;
        private long index;
        private int lastSize;
        private final ByteBuffer buffer;
        private final SinkConnector connector;

        public VolatileExcerptTailer() {
            super(NO_PAGE, NO_PAGE);
            this.index = -1L;
            this.lastSize = 0;
            this.logger = LoggerFactory.getLogger((String)(this.getClass().getName() + "@" + ChronicleSink.this.address.toString()));
            this.connector = new SinkConnector();
            this.buffer = this.connector.buffer();
            this.startAddr = ((DirectBuffer)((Object)this.buffer)).address();
            this.capacityAddr = this.startAddr + (long)ChronicleSink.this.config.minBufferSize();
        }

        @Override
        public void finish() {
            if (!this.isFinished()) {
                if (this.lastSize > 0) {
                    this.buffer.position(this.buffer.position() + this.lastSize);
                }
                super.finish();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public synchronized void close() {
            try {
                this.connector.close();
            }
            catch (IOException e) {
                this.logger.warn("Error closing socket", (Throwable)e);
            }
            List list = ChronicleSink.this.excerpts;
            synchronized (list) {
                ChronicleSink.this.excerpts.remove(this);
            }
        }

        @Override
        public boolean wasPadding() {
            return false;
        }

        @Override
        public long index() {
            return this.index;
        }

        @Override
        public long lastWrittenIndex() {
            return this.index();
        }

        @Override
        public Chronicle chronicle() {
            return ChronicleSink.this;
        }

        @Override
        public ExcerptTailer toStart() {
            this.index(-1L);
            return this;
        }

        @Override
        public ExcerptTailer toEnd() {
            this.index(-2L);
            return this;
        }

        @Override
        public boolean index(long index) {
            this.index = index;
            this.lastSize = 0;
            try {
                ChronicleTcp.Command command;
                if (!this.connector.isOpen()) {
                    this.connector.open();
                }
                if ((command = ChronicleTcp.Command.make(1L, this.index)).write(this.connector.channel)) {
                    while (this.connector.read(12)) {
                        int receivedSize = this.buffer.getInt();
                        long receivedIndex = this.buffer.getLong();
                        switch (receivedSize) {
                            case -126: {
                                if (index == -1L) {
                                    return receivedIndex == -1L;
                                }
                                if (index == -2L) {
                                    return this.advanceIndex();
                                }
                                return index == receivedIndex ? this.advanceIndex() : false;
                            }
                            case -128: 
                            case -127: {
                                return false;
                            }
                        }
                        if (this.buffer.remaining() < receivedSize) continue;
                        this.buffer.position(this.buffer.position() + receivedSize);
                    }
                }
            }
            catch (IOException e) {
                this.logger.warn("", (Throwable)e);
            }
            return false;
        }

        @Override
        public boolean nextIndex() {
            try {
                if (!this.connector.isOpen()) {
                    return this.index(this.index);
                }
                if (!this.connector.read(20)) {
                    return false;
                }
                int excerptSize = this.buffer.getInt();
                long receivedIndex = this.buffer.getLong();
                switch (excerptSize) {
                    case -128: 
                    case -127: {
                        return false;
                    }
                    case -126: {
                        return false;
                    }
                }
                if (excerptSize > 0x8000000 || excerptSize < 0) {
                    throw new StreamCorruptedException("Size was " + excerptSize);
                }
                if (this.buffer.remaining() < excerptSize && !this.connector.read(this.buffer.remaining() - excerptSize)) {
                    return false;
                }
                this.index = receivedIndex;
                this.positionAddr = this.startAddr + (long)this.buffer.position();
                this.limitAddr = this.startAddr + (long)this.buffer.limit();
                this.lastSize = excerptSize;
                this.finished = false;
            }
            catch (IOException e) {
                this.close();
                return false;
            }
            return true;
        }

        protected boolean advanceIndex() throws IOException {
            if (this.nextIndex()) {
                this.finish();
                return true;
            }
            return false;
        }
    }

    private final class PersistentVanillaSinkExcerpt
    extends AbstractPersistentSinkExcerpt<VanillaChronicle> {
        @NotNull
        private final VanillaChronicle.VanillaAppender appender;

        public PersistentVanillaSinkExcerpt(ExcerptCommon excerpt) throws IOException {
            super(excerpt);
            this.appender = ((VanillaChronicle)this.chronicleImpl).createAppender();
        }

        @Override
        protected long lastLocalIndex() {
            return ((VanillaChronicle)this.chronicleImpl).lastIndex();
        }

        @Override
        protected boolean readNextExcerpt() {
            try {
                if (!ChronicleSink.this.closed && !this.connector.read(12, 20)) {
                    return false;
                }
                int size = this.buffer.getInt();
                long scIndex = this.buffer.getLong();
                switch (size) {
                    case -128: {
                        return false;
                    }
                    case -127: {
                        return false;
                    }
                    case -126: {
                        return this.readNextExcerpt();
                    }
                }
                if (size > 0x8000000 || size < 0) {
                    throw new StreamCorruptedException("size was " + size);
                }
                if (this.lastLocalIndex != scIndex) {
                    int cycle = (int)(scIndex >>> ((VanillaChronicle)this.chronicleImpl).getEntriesForCycleBits());
                    this.appender.startExcerpt(size, cycle);
                    long remaining = size;
                    int limit = this.buffer.limit();
                    int size2 = (int)Math.min((long)this.buffer.remaining(), remaining);
                    remaining -= (long)size2;
                    this.buffer.limit(this.buffer.position() + size2);
                    this.appender.write(this.buffer);
                    this.buffer.limit(limit);
                    while (remaining > 0L) {
                        this.buffer.clear();
                        int size3 = (int)Math.min((long)this.buffer.capacity(), remaining);
                        this.buffer.limit(size3);
                        this.connector.read();
                        this.buffer.flip();
                        remaining -= (long)this.buffer.remaining();
                        this.appender.write(this.buffer);
                    }
                } else {
                    this.buffer.position(this.buffer.position() + size);
                    return this.readNextExcerpt();
                }
                this.appender.finish();
            }
            catch (IOException e) {
                ChronicleSink.this.logger.info("Lost connection to {} retrying ", (Object)ChronicleSink.this.address, (Object)e);
                try {
                    this.connector.close();
                }
                catch (IOException iOException) {
                    // empty catch block
                }
            }
            return true;
        }
    }

    private class PersistentVanillaLocalSinkExcerpt
    extends AbstractPersistentSinkExcerpt<VanillaChronicle> {
        public PersistentVanillaLocalSinkExcerpt(ExcerptCommon excerpt) throws IOException {
            super(excerpt);
        }

        @Override
        protected long lastLocalIndex() {
            return ((VanillaChronicle)this.chronicleImpl).lastIndex();
        }

        @Override
        protected boolean readNextExcerpt() {
            try {
                if (!ChronicleSink.this.closed) {
                    ChronicleTcp.Command.makeAndSend(2L, this.lastLocalIndex(), this.connector.channel);
                    if (this.connector.read(12)) {
                        int size = this.buffer.getInt();
                        long scIndex = this.buffer.getLong();
                        switch (size) {
                            case -128: {
                                return false;
                            }
                            case -127: {
                                return false;
                            }
                            case -126: {
                                return true;
                            }
                        }
                    }
                }
            }
            catch (IOException e) {
                ChronicleSink.this.logger.info("Lost connection to {} retrying", (Object)ChronicleSink.this.address, (Object)e);
                try {
                    this.connector.close();
                }
                catch (IOException iOException) {
                    // empty catch block
                }
            }
            return false;
        }
    }

    private final class PersistentIndexedSinkExcerpt
    extends AbstractPersistentSinkExcerpt<IndexedChronicle> {
        @NotNull
        private final ExcerptAppender appender;

        public PersistentIndexedSinkExcerpt(ExcerptCommon excerpt) throws IOException {
            super(excerpt);
            this.appender = ChronicleSink.this.chronicle.createAppender();
        }

        @Override
        protected long lastLocalIndex() {
            return ((IndexedChronicle)this.chronicleImpl).lastWrittenIndex();
        }

        @Override
        protected boolean readNextExcerpt() {
            try {
                if (!ChronicleSink.this.closed && !this.connector.read(12, 20)) {
                    return false;
                }
                int size = this.buffer.getInt();
                long scIndex = this.buffer.getLong();
                switch (size) {
                    case -128: {
                        return false;
                    }
                    case -127: {
                        this.appender.startExcerpt(((IndexedChronicle)ChronicleSink.this.chronicle).config().dataBlockSize() - 1);
                        return true;
                    }
                    case -126: {
                        return this.readNextExcerpt();
                    }
                }
                if (size > 0x8000000 || size < 0) {
                    throw new StreamCorruptedException("size was " + size);
                }
                if (scIndex != ChronicleSink.this.chronicle.size()) {
                    throw new StreamCorruptedException("Expected index " + ChronicleSink.this.chronicle.size() + " but got " + scIndex);
                }
                this.appender.startExcerpt(size);
                long remaining = size;
                int limit = this.buffer.limit();
                int size2 = (int)Math.min((long)this.buffer.remaining(), remaining);
                remaining -= (long)size2;
                this.buffer.limit(this.buffer.position() + size2);
                this.appender.write(this.buffer);
                this.buffer.limit(limit);
                while (remaining > 0L) {
                    this.buffer.clear();
                    int size3 = (int)Math.min((long)this.buffer.capacity(), remaining);
                    this.buffer.limit(size3);
                    this.connector.read();
                    this.buffer.flip();
                    remaining -= (long)this.buffer.remaining();
                    this.appender.write(this.buffer);
                }
                this.appender.finish();
            }
            catch (IOException e) {
                ChronicleSink.this.logger.info("Lost connection to {} retrying", (Object)ChronicleSink.this.address, (Object)e);
                try {
                    this.connector.close();
                }
                catch (IOException iOException) {
                    // empty catch block
                }
            }
            return true;
        }
    }

    private class PersistentIndexedLocalSinkExcerpt
    extends AbstractPersistentSinkExcerpt<IndexedChronicle> {
        public PersistentIndexedLocalSinkExcerpt(ExcerptCommon excerpt) throws IOException {
            super(excerpt);
        }

        @Override
        protected long lastLocalIndex() {
            return ((IndexedChronicle)this.chronicleImpl).lastWrittenIndex();
        }

        @Override
        protected boolean readNextExcerpt() {
            try {
                if (!ChronicleSink.this.closed) {
                    ChronicleTcp.Command.makeAndSend(2L, this.lastLocalIndex(), this.connector.channel);
                    if (this.connector.read(12)) {
                        int size = this.buffer.getInt();
                        long scIndex = this.buffer.getLong();
                        switch (size) {
                            case -128: {
                                return false;
                            }
                            case -127: {
                                return false;
                            }
                            case -126: {
                                return true;
                            }
                        }
                    }
                }
            }
            catch (IOException e) {
                ChronicleSink.this.logger.info("Lost connection to {} retrying", (Object)ChronicleSink.this.address, (Object)e);
                try {
                    this.connector.close();
                }
                catch (IOException iOException) {
                    // empty catch block
                }
            }
            return false;
        }
    }

    private abstract class AbstractPersistentSinkExcerpt<T extends Chronicle>
    extends WrappedExcerpt {
        protected final T chronicleImpl;
        protected final ByteBuffer buffer;
        protected final SinkConnector connector;
        protected long lastLocalIndex;

        public AbstractPersistentSinkExcerpt(ExcerptCommon excerptCommon) {
            super(excerptCommon);
            this.chronicleImpl = ChronicleSink.this.chronicle;
            this.connector = new SinkConnector();
            this.buffer = this.connector.buffer();
            this.lastLocalIndex = -1L;
        }

        @Override
        public boolean nextIndex() {
            return super.nextIndex() || this.readNext() && super.nextIndex();
        }

        @Override
        public boolean index(long index) throws IndexOutOfBoundsException {
            return super.index(index) || index >= 0L && this.readNext() && super.index(index);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public synchronized void close() {
            try {
                this.connector.close();
            }
            catch (IOException e) {
                ChronicleSink.this.logger.warn("Error closing socket", (Throwable)e);
            }
            if (!ChronicleSink.this.closed) {
                List list = ChronicleSink.this.excerpts;
                synchronized (list) {
                    ChronicleSink.this.excerpts.remove(this);
                }
            }
            super.close();
        }

        protected boolean readNext() {
            if (!this.connector.isOpen()) {
                if (this.connector.open()) {
                    if (!ChronicleSink.this.isLocal) {
                        ChronicleTcp.Command command = ChronicleTcp.Command.make(1L, this.lastLocalIndex());
                        try {
                            command.write(this.connector.channel);
                            this.lastLocalIndex = command.data();
                        }
                        catch (IOException e) {
                            return false;
                        }
                    }
                } else {
                    return false;
                }
            }
            return this.connector.isOpen() && this.readNextExcerpt();
        }

        protected abstract long lastLocalIndex();

        protected abstract boolean readNextExcerpt();
    }

    private final class SinkConnector {
        private final ByteBuffer buffer;
        private SocketChannel channel = null;

        public SinkConnector() {
            this.buffer = ChronicleTcp.createBuffer(ChronicleSink.this.config.minBufferSize(), ByteOrder.nativeOrder());
        }

        public ByteBuffer buffer() {
            return this.buffer;
        }

        public void close() throws IOException {
            if (this.channel != null) {
                this.channel.close();
                this.channel = null;
            }
        }

        public boolean open() {
            while (!ChronicleSink.this.closed) {
                try {
                    this.buffer.clear();
                    this.buffer.limit(0);
                    this.channel = SocketChannel.open(ChronicleSink.this.address);
                    this.channel.socket().setTcpNoDelay(true);
                    this.channel.socket().setReceiveBufferSize(ChronicleSink.this.config.minBufferSize());
                    ChronicleSink.this.logger.info("Connected to " + ChronicleSink.this.address);
                    return true;
                }
                catch (IOException e) {
                    ChronicleSink.this.logger.info("Failed to connect to {}, retrying", (Object)ChronicleSink.this.address, (Object)e);
                    try {
                        Thread.sleep(ChronicleSink.this.config.reconnectDelay());
                    }
                    catch (InterruptedException e2) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
            return false;
        }

        public boolean isOpen() {
            return !ChronicleSink.this.closed && this.channel != null && this.channel.isOpen();
        }

        public boolean isClosed() {
            return !this.isOpen();
        }

        public boolean write(ByteBuffer buffer) {
            try {
                ChronicleTcp.writeAllOrEOF(this.channel, buffer);
            }
            catch (IOException e) {
                return false;
            }
            return true;
        }

        public boolean read() throws IOException {
            if (this.channel.read(this.buffer) < 0) {
                throw new EOFException();
            }
            return true;
        }

        public boolean read(int size) throws IOException {
            return this.read(size, size);
        }

        public boolean read(int threshod, int size) throws IOException {
            if (!ChronicleSink.this.closed && this.buffer.remaining() < threshod) {
                if (this.buffer.remaining() == 0) {
                    this.buffer.clear();
                } else {
                    this.buffer.compact();
                }
                while (this.buffer.position() < size) {
                    if (this.channel.read(this.buffer) >= 0) continue;
                    this.channel.close();
                    return false;
                }
                this.buffer.flip();
            }
            return !ChronicleSink.this.closed;
        }
    }
}

