package software.chronicle.enterprise.queue.replication;

import java.util.Objects;
import java.util.concurrent.TimeUnit;
import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.core.annotation.UsedViaReflection;
import net.openhft.chronicle.core.pool.ClassAliasPool;
import net.openhft.chronicle.network.cluster.AbstractSubHandler;
import net.openhft.chronicle.network.cluster.WritableSubHandler;
import net.openhft.chronicle.network.connection.CoreFields;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.impl.ExcerptContext;
import net.openhft.chronicle.queue.impl.RollingChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueExcerpts;
import net.openhft.chronicle.wire.Demarshallable;
import net.openhft.chronicle.wire.DocumentContext;
import net.openhft.chronicle.wire.Wire;
import net.openhft.chronicle.wire.WireIn;
import net.openhft.chronicle.wire.WireOut;
import net.openhft.chronicle.wire.WireType;
import net.openhft.chronicle.wire.Wires;
import net.openhft.chronicle.wire.WriteMarshallable;
import net.openhft.chronicle.wire.YamlLogging;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/chronicle-queue-enterprise-1.4.27.jar:software/chronicle/enterprise/queue/replication/SourceReplicationHandler.class */
/**
 * Cluster sub-handler that first accepts replication events from the remote side and then, once
 * the remote side signals end-of-stream, streams excerpts of the local queue back out.
 *
 * <p>Life-cycle as implemented below:
 * <ol>
 *   <li>{@link #onInitialize(WireOut)} builds the queue for {@code queueName}, acquires the queue
 *       lock and an appender.</li>
 *   <li>While not yet handshaken, {@link #onRead(WireIn, WireOut)} applies every {@code "re"} event
 *       to the local queue at the index carried by the event. An {@code "eos"} event releases the
 *       queue lock, creates a tailer and marks the handler as handshaken.</li>
 *   <li>Once handshaken, the {@link #writer()} emits queue excerpts whose index is at least
 *       {@code nextIndexRequired} as {@code "re"} events, and {@link #onRead(WireIn, WireOut)}
 *       processes acknowledgements (only when {@code acknowledgement} is enabled).</li>
 * </ol>
 *
 * <p>NOTE(review): the handler keeps unsynchronised mutable state; it presumably relies on being
 * driven by a single event-loop thread - confirm against the caller.
 */
public class SourceReplicationHandler extends AbstractSubHandler<QueueClusterNetworkContext> implements Demarshallable, WriteMarshallable, WritableSubHandler {
    private static final Logger LOG = LoggerFactory.getLogger(SourceReplicationHandler.class);

    /** The writer stops adding events once the outbound buffer holds at least this many readable bytes. */
    private static final long OUTPUT_BATCH_LIMIT_BYTES = 32768L;

    /** Interval, in seconds, between two throughput log lines. */
    private static final long STATS_INTERVAL_SECONDS = 10L;

    private RollingChronicleQueue chronicleQueue;
    private final String queueName;
    private final WireType wireType;
    private final boolean acknowledgement;
    private long nextIndexRequired;
    private ExcerptAppender appender;
    private ExcerptTailer tailer;
    private final QueueReplicationEvent queueReplicationEvent = new QueueReplicationEvent();

    /**
     * Start of the current statistics window, in SECONDS since the epoch. It must use the same unit
     * as the value it is compared against in {@link #recordEventSent()}; initialising it in
     * milliseconds meant the throughput line was never logged.
     */
    private long second = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());

    /** Number of events written since the start of the current statistics window. */
    private long messagesReadPerSecond = 0;

    /** Set when the tailer had no wire; the tailer is re-created on the next writer invocation. */
    private boolean noQueueFileExists = false;

    /** Becomes {@code true} once the {@code "eos"} event has been read. */
    private boolean handshaken = false;

    /** Writer handed out by {@link #writer()}; delegates to {@link #writePendingExcerpts(WireOut)}. */
    @NotNull
    private final WriteMarshallable out = this::writePendingExcerpts;

    static {
        ClassAliasPool.CLASS_ALIASES.addAlias(QueueReplicationEvent.class, SourceReplicationHandler.class);
    }

    /**
     * Demarshalling constructor: reads the fields in the order written by
     * {@link #writeMarshallable(WireOut)}.
     *
     * @param wireIn wire positioned at the serialised handler
     */
    @UsedViaReflection
    private SourceReplicationHandler(@NotNull WireIn wireIn) {
        this.queueName = wireIn.read(() -> "queueName").text();
        this.wireType = (WireType) wireIn.read(() -> "wireType").asEnum(WireType.class);
        this.acknowledgement = wireIn.read(() -> "acknowledgement").bool();
        this.nextIndexRequired = wireIn.read(() -> "nextIndexRequired").int64();
    }

    /**
     * @param queueName         name of the queue, resolved to a path through the network context
     * @param wireType          wire type used to build the queue
     * @param acknowledgement   whether acknowledgements read after the handshake are processed
     * @param nextIndexRequired lowest queue index that will be written out; must not be negative
     * @throws NullPointerException if {@code queueName} is {@code null}
     */
    @UsedViaReflection
    public SourceReplicationHandler(@NotNull String queueName, @NotNull WireType wireType, boolean acknowledgement, long nextIndexRequired) {
        Objects.requireNonNull(queueName);
        assert nextIndexRequired >= 0;
        this.queueName = queueName;
        this.wireType = wireType;
        this.nextIndexRequired = nextIndexRequired;
        this.acknowledgement = acknowledgement;
    }

    /**
     * Serialises the four configuration fields; {@code nextIndexRequired} is written with
     * {@code int64_0x}.
     */
    @Override // net.openhft.chronicle.wire.WriteMarshallable
    public void writeMarshallable(@NotNull WireOut wireOut) {
        assert this.nextIndexRequired >= 0;
        wireOut.write(() -> "queueName").text(this.queueName);
        wireOut.write(() -> "wireType").asEnum(this.wireType);
        wireOut.write(() -> "acknowledgement").bool(Boolean.valueOf(this.acknowledgement));
        wireOut.write(() -> "nextIndexRequired").int64_0x(this.nextIndexRequired);
    }

    /**
     * Builds the queue, acquires its queue lock and an appender. The lock is released again when the
     * {@code "eos"} event is read in {@link #onRead(WireIn, WireOut)}.
     */
    @Override // net.openhft.chronicle.network.api.session.SubHandler
    public void onInitialize(@NotNull WireOut wireOut) {
        LOG.info("Replication source start up: IN PROGRESS. Locking the queue queueName=" + this.queueName);
        this.chronicleQueue = getChronicleQueue();
        this.chronicleQueue.queueLock().acquireLock();
        this.appender = this.chronicleQueue.acquireAppender();
    }

    /**
     * Handles one inbound message. After the handshake only acknowledgements are processed (and only
     * when enabled); before it, {@code "re"} and {@code "eos"} events are handled and any other
     * event name is ignored.
     */
    @Override // net.openhft.chronicle.network.cluster.AbstractSubHandler, net.openhft.chronicle.network.api.session.SubHandler
    public void onRead(@NotNull WireIn wireIn, @NotNull WireOut wireOut) {
        if (this.handshaken) {
            if (this.acknowledgement) {
                readAcknowledgement(wireIn);
            }
            return;
        }

        StringBuilder eventName = Wires.acquireStringBuilder();
        wireIn.readEventName(eventName);
        if ("re".contentEquals(eventName)) {
            applyReplicationEvent(wireIn);
        } else if ("eos".contentEquals(eventName)) {
            completeHandshake();
        }
    }

    @Override // net.openhft.chronicle.network.cluster.WritableSubHandler
    @NotNull
    public WriteMarshallable writer() {
        return this.out;
    }

    /** Passes the acknowledged index to the tailer and, if a stats listener exists, reports latency. */
    private void readAcknowledgement(@NotNull WireIn wireIn) {
        long acknowledgedIndex = wireIn.read(() -> "idx").int64();
        ((SingleChronicleQueueExcerpts.StoreTailer) this.tailer).lastAcknowledgedIndexReplicated(acknowledgedIndex);
        if (nc().networkStatsListener() != null) {
            nc().networkStatsListener().onRoundTripLatency(System.nanoTime() - wireIn.read(() -> "ns").int64());
        }
    }

    /** Reads one replication event and writes its payload to the local queue at the event's index. */
    private void applyReplicationEvent(@NotNull WireIn wireIn) {
        this.queueReplicationEvent.readMarshallable(wireIn);
        ((SingleChronicleQueueExcerpts.InternalAppender) this.appender)
                .writeBytes(this.queueReplicationEvent.index(), this.queueReplicationEvent.payload());
    }

    /** Drops the appender, releases the queue lock, creates the tailer and marks the handshake done. */
    private void completeHandshake() {
        this.appender = null;
        this.chronicleQueue.queueLock().unlock();
        this.tailer = this.chronicleQueue.createTailer();
        this.handshaken = true;
        LOG.info("Replication source start up: DONE. Unlocking the queue queueName=" + this.queueName);
    }

    /**
     * Copies excerpts from the tailer into {@code wireOut} as {@code "re"} events until the outbound
     * buffer holds {@link #OUTPUT_BATCH_LIMIT_BYTES} readable bytes, the tailer has nothing more to
     * read, an empty excerpt is met, or the handler has been closed. Does nothing before the
     * handshake. The first event written by an invocation is preceded by a meta-data document
     * carrying this handler's {@code cid}.
     */
    private void writePendingExcerpts(WireOut wireOut) {
        if (!this.handshaken) {
            return;
        }
        if (this.noQueueFileExists) {
            this.tailer = this.chronicleQueue.createTailer();
            this.noQueueFileExists = false;
        }

        boolean cidPending = true;
        while (wireOut.bytes().readRemaining() < OUTPUT_BATCH_LIMIT_BYTES) {
            if (((ExcerptContext) this.tailer).wire() == null) {
                // Remember to re-create the tailer on the next invocation.
                this.noQueueFileExists = true;
                return;
            }

            // try-with-resources closes the read context exactly once on every exit path.
            try (DocumentContext readingDocument = this.tailer.readingDocument()) {
                if (closable().isClosed() || !readingDocument.isPresent()) {
                    return;
                }

                long index = this.tailer.index();
                if (index < this.nextIndexRequired) {
                    // Excerpt is below the requested start index: skip it.
                    continue;
                }

                Bytes<?> payload = readingDocument.wire().bytes();
                if (payload.isEmpty()) {
                    return;
                }

                this.queueReplicationEvent.index(index);
                this.queueReplicationEvent.payload(payload);

                if (cidPending) {
                    writeCidHeader(wireOut);
                    cidPending = false;
                }
                writeReplicationEvent(wireOut);

                recordEventSent();
                if (YamlLogging.showServerWrites()) {
                    LOG.info("server wrote:\n" + Wires.fromSizePrefixedBlobs((Wire) wireOut));
                }
            }
        }
    }

    /** Writes the meta-data document that carries this handler's {@code cid}. */
    private void writeCidHeader(WireOut wireOut) {
        try (DocumentContext metaDocument = wireOut.writingDocument(true)) {
            metaDocument.wire().write(CoreFields.cid).int64(cid());
        }
    }

    /** Writes the currently populated {@link #queueReplicationEvent} as a {@code "re"} data document. */
    private void writeReplicationEvent(WireOut wireOut) {
        try (DocumentContext dataDocument = wireOut.writingDocument(false)) {
            Wire wire = dataDocument.wire();
            wire.writeEventName("re");
            this.queueReplicationEvent.writeMarshallable(wire);
        }
    }

    /**
     * Counts one sent event and logs the average rate once per {@link #STATS_INTERVAL_SECONDS}.
     * The rate divides by the nominal interval, not the actually elapsed time.
     */
    private void recordEventSent() {
        this.messagesReadPerSecond++;
        long nowSeconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
        if (nowSeconds >= this.second + STATS_INTERVAL_SECONDS) {
            this.second = nowSeconds;
            LOG.info("replicationEvents send per second=" + (this.messagesReadPerSecond / STATS_INTERVAL_SECONDS));
            this.messagesReadPerSecond = 0L;
        }
    }

    /** Builds the queue located at the network context's source path for {@code queueName}. */
    private RollingChronicleQueue getChronicleQueue() {
        return SingleChronicleQueueBuilder.builder(nc().getSourcePath(this.queueName), this.wireType).build();
    }
}
