/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.driver;

import io.aeron.driver.Configuration;
import io.aeron.driver.DriverConductor;
import io.aeron.driver.DriverManagedResource;
import io.aeron.driver.FlowControl;
import io.aeron.driver.NetworkPublicationPadding3;
import io.aeron.driver.RetransmitHandler;
import io.aeron.driver.RetransmitSender;
import io.aeron.driver.buffer.RawLog;
import io.aeron.driver.media.SendChannelEndpoint;
import io.aeron.driver.status.SystemCounterDescriptor;
import io.aeron.driver.status.SystemCounters;
import io.aeron.logbuffer.LogBufferDescriptor;
import io.aeron.logbuffer.LogBufferPartition;
import io.aeron.logbuffer.LogBufferUnblocker;
import io.aeron.logbuffer.TermScanner;
import io.aeron.protocol.DataHeaderFlyweight;
import io.aeron.protocol.SetupFlyweight;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import org.agrona.concurrent.EpochClock;
import org.agrona.concurrent.NanoClock;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.status.AtomicCounter;
import org.agrona.concurrent.status.Position;

/**
 * Publication that sends the contents of a {@link RawLog} over a {@link SendChannelEndpoint}.
 *
 * <p>The class scans term buffers for available frames and sends them, emits setup and
 * heartbeat frames when their timeouts elapse, services retransmit requests, and keeps a
 * reference count that drives its end-of-life handling in {@link #onTimeEvent}.
 *
 * <p>Several mutable fields used here ({@code senderPositionLimit}, {@code timeOfLastSetup},
 * {@code timeOfLastSendOrHeartbeat}, {@code shouldSendSetupFrame}, {@code trackSenderLimits},
 * {@code refCount}, {@code timeOfLastActivity}, {@code lastSenderPosition}) are not declared in
 * this class; they are inherited through {@link NetworkPublicationPadding3}.
 */
public class NetworkPublication
extends NetworkPublicationPadding3
implements RetransmitSender,
DriverManagedResource {
    /** Length in bytes of the heartbeat frame buffer and the expected size of a heartbeat send. */
    private static final int HEARTBEAT_FRAME_LENGTH = 32;
    /** Length in bytes of the setup frame buffer and the expected size of a setup send. */
    private static final int SETUP_FRAME_LENGTH = 40;
    /**
     * Partition status value that makes {@link #cleanLogBuffer()} clean a partition.
     * NOTE(review): presumably the "needs cleaning" status defined by the log buffer
     * descriptor - confirm.
     */
    private static final int PARTITION_STATUS_TO_CLEAN = 1;

    private final int positionBitsToShift;
    private final int initialTermId;
    private final int termLengthMask;
    private final int mtuLength;
    private final int termWindowLength;
    // Set once the first sender position limit has been applied; gates the publisher limit
    // and stops further setup frames.
    private volatile boolean hasStatusMessageBeenReceived = false;
    private boolean reachedEndOfLife = false;
    private final LogBufferPartition[] logPartitions;
    private final ByteBuffer[] sendBuffers;
    private final Position publisherLimit;
    private final Position senderPosition;
    private final SendChannelEndpoint channelEndpoint;
    private final ByteBuffer heartbeatFrameBuffer = ByteBuffer.allocateDirect(HEARTBEAT_FRAME_LENGTH);
    private final DataHeaderFlyweight dataHeader = new DataHeaderFlyweight(this.heartbeatFrameBuffer);
    private final ByteBuffer setupFrameBuffer = ByteBuffer.allocateDirect(SETUP_FRAME_LENGTH);
    private final SetupFlyweight setupHeader = new SetupFlyweight(this.setupFrameBuffer);
    private final FlowControl flowControl;
    private final EpochClock epochClock;
    private final RetransmitHandler retransmitHandler;
    private final RawLog rawLog;
    private final AtomicCounter heartbeatsSent;
    private final AtomicCounter retransmitsSent;
    private final AtomicCounter senderFlowControlLimits;
    private final AtomicCounter dataPacketShortSends;
    private final AtomicCounter setupMessageShortSends;

    /**
     * Creates a publication over the given log and channel endpoint.
     *
     * @param channelEndpoint   endpoint used for every send.
     * @param nanoClock         clock used to seed the setup and heartbeat timestamps.
     * @param epochClock        clock used to timestamp received status messages.
     * @param rawLog            log supplying the partitions, send buffers and term length.
     * @param publisherLimit    position proposed to in {@link #updatePublishersLimit()}.
     * @param senderPosition    position advanced as data is sent.
     * @param sessionId         session id written into the setup and heartbeat frames.
     * @param streamId          stream id written into the setup and heartbeat frames.
     * @param initialTermId     initial term id of the stream.
     * @param mtuLength         maximum number of bytes scanned for a single send.
     * @param systemCounters    source of the counters incremented by this publication.
     * @param flowControl       strategy that yields the sender position limit.
     * @param retransmitHandler handler that NAKs and retransmit timeouts are delegated to.
     */
    public NetworkPublication(SendChannelEndpoint channelEndpoint, NanoClock nanoClock, EpochClock epochClock, RawLog rawLog, Position publisherLimit, Position senderPosition, int sessionId, int streamId, int initialTermId, int mtuLength, SystemCounters systemCounters, FlowControl flowControl, RetransmitHandler retransmitHandler) {
        this.channelEndpoint = channelEndpoint;
        this.rawLog = rawLog;
        this.epochClock = epochClock;
        this.senderPosition = senderPosition;
        this.flowControl = flowControl;
        this.retransmitHandler = retransmitHandler;
        this.publisherLimit = publisherLimit;
        this.mtuLength = mtuLength;
        this.initialTermId = initialTermId;

        this.heartbeatsSent = systemCounters.get(SystemCounterDescriptor.HEARTBEATS_SENT);
        this.dataPacketShortSends = systemCounters.get(SystemCounterDescriptor.DATA_PACKET_SHORT_SENDS);
        this.retransmitsSent = systemCounters.get(SystemCounterDescriptor.RETRANSMITS_SENT);
        this.senderFlowControlLimits = systemCounters.get(SystemCounterDescriptor.SENDER_FLOW_CONTROL_LIMITS);
        this.setupMessageShortSends = systemCounters.get(SystemCounterDescriptor.SETUP_MESSAGE_SHORT_SENDS);

        this.logPartitions = rawLog.partitions();
        this.sendBuffers = rawLog.sliceTerms();

        int termLength = rawLog.termLength();
        this.termLengthMask = termLength - 1;
        flowControl.initialize(initialTermId, termLength);

        // Back-date both timestamps by just over their timeout so the first setup and
        // heartbeat checks find the timeout already elapsed.
        this.timeOfLastSendOrHeartbeat = nanoClock.nanoTime() - Configuration.PUBLICATION_HEARTBEAT_TIMEOUT_NS - 1L;
        this.timeOfLastSetup = nanoClock.nanoTime() - Configuration.PUBLICATION_SETUP_TIMEOUT_NS - 1L;

        this.positionBitsToShift = Integer.numberOfTrailingZeros(termLength);
        this.termWindowLength = Configuration.publicationTermWindowLength(termLength);

        this.initSetupFrame(initialTermId, termLength, sessionId, streamId);
        this.initHeartBeatFrame(sessionId, streamId);
    }

    /**
     * Closes the raw log, then the publisher limit, then the sender position.
     */
    public void close() {
        this.rawLog.close();
        this.publisherLimit.close();
        this.senderPosition.close();
    }

    /**
     * Performs one send cycle: an optional setup frame, then data, then (only when no data
     * was sent) a heartbeat check and a flow control idle update, and finally retransmit
     * timeout processing.
     *
     * @param now current time, compared against the setup and heartbeat timeouts.
     * @return number of data bytes sent in this cycle; 0 when nothing was sent.
     */
    public int send(long now) {
        long senderPosition = this.senderPosition.get();
        int activeTermId = LogBufferDescriptor.computeTermIdFromPosition(senderPosition, this.positionBitsToShift, this.initialTermId);
        int termOffset = (int)senderPosition & this.termLengthMask;

        if (this.shouldSendSetupFrame) {
            this.setupMessageCheck(now, activeTermId, termOffset);
        }

        int bytesSent = this.sendData(now, senderPosition, termOffset);
        if (0 == bytesSent) {
            this.heartbeatMessageCheck(now, activeTermId, termOffset);
            this.senderPositionLimit = this.flowControl.onIdle(now);
        }

        this.retransmitHandler.processTimeouts(now, this);
        return bytesSent;
    }

    /**
     * @return the channel endpoint this publication sends through.
     */
    public SendChannelEndpoint sendChannelEndpoint() {
        return this.channelEndpoint;
    }

    /**
     * @return the session id as stored in the heartbeat frame header.
     */
    public int sessionId() {
        return this.dataHeader.sessionId();
    }

    /**
     * @return the stream id as stored in the heartbeat frame header.
     */
    public int streamId() {
        return this.dataHeader.streamId();
    }

    /**
     * Sets the sender position limit and records that a limit has now been supplied.
     *
     * @param positionLimit new upper bound for the sender position.
     */
    void senderPositionLimit(long positionLimit) {
        this.senderPositionLimit = positionLimit;
        // Check before writing so the volatile store happens only on the first call.
        if (!this.hasStatusMessageBeenReceived) {
            this.hasStatusMessageBeenReceived = true;
        }
    }

    /**
     * Cleans every log partition whose status equals {@link #PARTITION_STATUS_TO_CLEAN}.
     *
     * @return 1 if at least one partition was cleaned, otherwise 0.
     */
    int cleanLogBuffer() {
        int workCount = 0;
        for (LogBufferPartition partition : this.logPartitions) {
            if (partition.status() == PARTITION_STATUS_TO_CLEAN) {
                partition.clean();
                workCount = 1;
            }
        }
        return workCount;
    }

    /**
     * Retransmits frames from the requested term starting at {@code termOffset}.
     *
     * <p>Requests are honoured only for the term the sender position is currently in or the
     * one immediately before it; anything else is ignored. Frames are sent in scans of at
     * most {@code mtuLength} bytes until {@code length} bytes (including any padding reported
     * by the scan) have been covered, no further frames are available, or a send is short.
     * The first scan is always attempted, even when {@code length} is not positive.
     *
     * @param termId     term to retransmit from.
     * @param termOffset offset within the term at which to start.
     * @param length     number of bytes requested for retransmission.
     */
    @Override
    public void resend(int termId, int termOffset, int length) {
        long senderPosition = this.senderPosition.get();
        int activeTermId = LogBufferDescriptor.computeTermIdFromPosition(senderPosition, this.positionBitsToShift, this.initialTermId);
        if (termId != activeTermId && termId != activeTermId - 1) {
            return;
        }

        int activeIndex = LogBufferDescriptor.indexByTerm(this.initialTermId, termId);
        UnsafeBuffer termBuffer = this.logPartitions[activeIndex].termBuffer();
        ByteBuffer sendBuffer = this.sendBuffers[activeIndex];

        int remainingBytes = length;
        int offset = termOffset;
        do {
            long scanOutcome = TermScanner.scanForAvailability(termBuffer, offset, this.mtuLength);
            int available = TermScanner.available(scanOutcome);
            if (available <= 0) {
                break;
            }

            sendBuffer.limit(offset + available).position(offset);
            if (available != this.channelEndpoint.send(sendBuffer)) {
                this.dataPacketShortSends.orderedIncrement();
                break;
            }

            int bytesSent = available + TermScanner.padding(scanOutcome);
            offset += bytesSent;
            remainingBytes -= bytesSent;
        // Stop once the requested length is covered; previously the loop ignored
        // remainingBytes and kept resending until the term ran out of available frames.
        } while (remainingBytes > 0);

        this.retransmitsSent.orderedIncrement();
    }

    /**
     * Requests that setup frames be sent from subsequent {@link #send(long)} calls.
     */
    public void triggerSendSetupFrame() {
        this.shouldSendSetupFrame = true;
    }

    /**
     * @return the raw log backing this publication.
     */
    RawLog rawLog() {
        return this.rawLog;
    }

    /**
     * @return the id of the publisher limit position.
     */
    int publisherLimitId() {
        return this.publisherLimit.id();
    }

    /**
     * Proposes a new publisher limit: the sender position plus the term window once a sender
     * position limit has been received, and 0 before that.
     *
     * @return 1 if the proposal was accepted by the publisher limit, otherwise 0.
     */
    int updatePublishersLimit() {
        long candidatePublisherLimit = this.hasStatusMessageBeenReceived
            ? this.senderPosition.getVolatile() + (long)this.termWindowLength
            : 0L;
        return this.publisherLimit.proposeMaxOrdered(candidatePublisherLimit) ? 1 : 0;
    }

    /**
     * Delegates a received NAK to the retransmit handler, with this publication as the
     * retransmit sender.
     *
     * @param termId     term the NAK refers to.
     * @param termOffset offset within the term the NAK refers to.
     * @param length     length of the range the NAK refers to.
     */
    public void onNak(int termId, int termOffset, int length) {
        this.retransmitHandler.onNak(termId, termOffset, length, this);
    }

    /**
     * Passes a status message to flow control, adopts the position it returns as the sender
     * position limit, and stores the epoch clock time in the log meta data.
     *
     * @param termId               term id carried by the status message.
     * @param termOffset           term offset carried by the status message.
     * @param receiverWindowLength receiver window length carried by the status message.
     * @param srcAddress           address the status message came from.
     */
    public void onStatusMessage(int termId, int termOffset, int receiverWindowLength, InetSocketAddress srcAddress) {
        long position = this.flowControl.onStatusMessage(termId, termOffset, receiverWindowLength, srcAddress);
        this.senderPositionLimit(position);

        long now = this.epochClock.time();
        LogBufferDescriptor.timeOfLastStatusMessage(this.rawLog.logMetaData(), now);
    }

    /**
     * Sends at most one scan of available frames from the sender position, bounded by the
     * smaller of the flow control window and the MTU.
     *
     * @param now            current time, recorded as the last send time on success.
     * @param senderPosition current sender position.
     * @param termOffset     offset of the sender position within its term.
     * @return bytes of frame data sent (excluding padding); 0 if nothing was sent.
     */
    private int sendData(long now, long senderPosition, int termOffset) {
        int bytesSent = 0;
        int availableWindow = (int)(this.senderPositionLimit - senderPosition);

        if (availableWindow > 0) {
            int scanLimit = Math.min(availableWindow, this.mtuLength);
            int activeIndex = LogBufferDescriptor.indexByPosition(senderPosition, this.positionBitsToShift);
            long scanOutcome = TermScanner.scanForAvailability(this.logPartitions[activeIndex].termBuffer(), termOffset, scanLimit);
            int available = TermScanner.available(scanOutcome);

            if (available > 0) {
                ByteBuffer sendBuffer = this.sendBuffers[activeIndex];
                sendBuffer.limit(termOffset + available).position(termOffset);

                if (available == this.channelEndpoint.send(sendBuffer)) {
                    this.timeOfLastSendOrHeartbeat = now;
                    this.trackSenderLimits = true;
                    bytesSent = available;
                    // The position also advances past padding, which is not counted as sent.
                    this.senderPosition.setOrdered(senderPosition + (long)bytesSent + (long)TermScanner.padding(scanOutcome));
                } else {
                    this.dataPacketShortSends.orderedIncrement();
                }
            }
        } else if (this.trackSenderLimits) {
            // Count the limit once per stall: the flag is re-armed by the next successful send.
            this.trackSenderLimits = false;
            this.senderFlowControlLimits.orderedIncrement();
        }

        return bytesSent;
    }

    /**
     * Sends a setup frame if the setup timeout has elapsed since the previous one.
     *
     * @param now          current time.
     * @param activeTermId term id to place in the frame.
     * @param termOffset   term offset to place in the frame.
     */
    private void setupMessageCheck(long now, int activeTermId, int termOffset) {
        if (now > this.timeOfLastSetup + Configuration.PUBLICATION_SETUP_TIMEOUT_NS) {
            this.setupFrameBuffer.clear();
            this.setupHeader.activeTermId(activeTermId).termOffset(termOffset);

            int bytesSent = this.channelEndpoint.send(this.setupFrameBuffer);
            if (SETUP_FRAME_LENGTH != bytesSent) {
                this.setupMessageShortSends.orderedIncrement();
            }

            // Timestamps are updated even after a short send.
            this.timeOfLastSetup = now;
            this.timeOfLastSendOrHeartbeat = now;

            if (this.hasStatusMessageBeenReceived) {
                this.shouldSendSetupFrame = false;
            }
        }
    }

    /**
     * Sends a heartbeat frame if the heartbeat timeout has elapsed since the last send or
     * heartbeat.
     *
     * @param now          current time.
     * @param activeTermId term id to place in the frame.
     * @param termOffset   term offset to place in the frame.
     */
    private void heartbeatMessageCheck(long now, int activeTermId, int termOffset) {
        if (now > this.timeOfLastSendOrHeartbeat + Configuration.PUBLICATION_HEARTBEAT_TIMEOUT_NS) {
            this.heartbeatFrameBuffer.clear();
            this.dataHeader.termId(activeTermId).termOffset(termOffset);

            int bytesSent = this.channelEndpoint.send(this.heartbeatFrameBuffer);
            if (HEARTBEAT_FRAME_LENGTH != bytesSent) {
                this.dataPacketShortSends.orderedIncrement();
            }

            // The heartbeat is counted and the timestamp updated even after a short send.
            this.heartbeatsSent.orderedIncrement();
            this.timeOfLastSendOrHeartbeat = now;
        }
    }

    /**
     * Populates the fields of the setup frame that do not change between sends.
     *
     * @param activeTermId initial value for the active term id field.
     * @param termLength   term length to advertise.
     * @param sessionId    session id of the stream.
     * @param streamId     stream id of the stream.
     */
    private void initSetupFrame(int activeTermId, int termLength, int sessionId, int streamId) {
        // NOTE(review): header type 5 is presumably the setup frame type - confirm.
        this.setupHeader
            .sessionId(sessionId)
            .streamId(streamId)
            .initialTermId(this.initialTermId)
            .activeTermId(activeTermId)
            .termOffset(0)
            .termLength(termLength)
            .mtuLength(this.mtuLength)
            .ttl(this.channelEndpoint.multicastTtl())
            .version((short)0)
            .flags((short)0)
            .headerType(5)
            .frameLength(SETUP_FRAME_LENGTH);
    }

    /**
     * Populates the fields of the heartbeat frame that do not change between sends. The
     * frame length field is set to 0.
     *
     * @param sessionId session id of the stream.
     * @param streamId  stream id of the stream.
     */
    private void initHeartBeatFrame(int sessionId, int streamId) {
        // NOTE(review): flags -64 is 0xC0 and header type 1 is presumably the data frame
        // type - confirm both against the protocol definitions.
        this.dataHeader
            .sessionId(sessionId)
            .streamId(streamId)
            .version((short)0)
            .flags((short)-64)
            .headerType(1)
            .frameLength(0);
    }

    /**
     * Reports whether the reference count is zero, refreshing the activity timestamp as a
     * side effect: it moves to {@code now} whenever references remain or the sender position
     * has changed since the previous call.
     *
     * @param now current time.
     * @return true if the reference count is zero.
     */
    private boolean isUnreferencedAndPotentiallyInactive(long now) {
        if (0 != this.refCount) {
            this.timeOfLastActivity = now;
            return false;
        }

        long senderPosition = this.senderPosition.getVolatile();
        if (senderPosition != this.lastSenderPosition) {
            this.timeOfLastActivity = now;
        }
        this.lastSenderPosition = senderPosition;
        return true;
    }

    /**
     * Marks the publication as having reached end of life, and asks the conductor to clean
     * it up, once it is unreferenced and has shown no activity for longer than the linger
     * period.
     *
     * @param time      current time.
     * @param conductor conductor asked to clean up this publication.
     */
    @Override
    public void onTimeEvent(long time, DriverConductor conductor) {
        if (this.isUnreferencedAndPotentiallyInactive(time) && time > this.timeOfLastActivity + Configuration.PUBLICATION_LINGER_NS) {
            this.reachedEndOfLife = true;
            conductor.cleanupPublication(this);
        }
    }

    /**
     * @return true once {@link #onTimeEvent} has determined the publication is finished.
     */
    @Override
    public boolean hasReachedEndOfLife() {
        return this.reachedEndOfLife;
    }

    /**
     * No-op: the supplied time is ignored. The value returned by
     * {@link #timeOfLastStateChange()} is maintained through the activity tracking instead.
     *
     * @param time ignored.
     */
    public void timeOfLastStateChange(long time) {
    }

    /**
     * @return the time of last activity recorded for this publication.
     */
    public long timeOfLastStateChange() {
        return this.timeOfLastActivity;
    }

    /**
     * No-op.
     */
    public void delete() {
    }

    /**
     * Decrements the reference count and removes this publication from its channel endpoint
     * when the count reaches zero.
     *
     * @return the reference count after the decrement.
     */
    @Override
    public int decRef() {
        int count = --this.refCount;
        if (0 == count) {
            this.channelEndpoint.removePublication(this);
        }
        return count;
    }

    /**
     * Increments the reference count.
     *
     * @return the reference count after the increment.
     */
    @Override
    public int incRef() {
        return ++this.refCount;
    }

    /**
     * Computes the producer position from the raw tail of the active partition, using the
     * initial term id stored in the log meta data.
     *
     * @return the producer position.
     */
    @Override
    public long producerPosition() {
        UnsafeBuffer logMetaDataBuffer = this.rawLog.logMetaData();
        int initialTermId = LogBufferDescriptor.initialTermId(logMetaDataBuffer);
        int activePartitionIndex = LogBufferDescriptor.activePartitionIndex(logMetaDataBuffer);
        long rawTail = this.logPartitions[activePartitionIndex].rawTailVolatile();
        int termOffset = LogBufferDescriptor.termOffset(rawTail, (long)this.rawLog.termLength());
        return LogBufferDescriptor.computePosition(LogBufferDescriptor.termId(rawTail), termOffset, this.positionBitsToShift, initialTermId);
    }

    /**
     * @return the sender position, read with volatile semantics.
     */
    @Override
    public long consumerPosition() {
        return this.senderPosition.getVolatile();
    }

    /**
     * Attempts to unblock the log at the current consumer position.
     *
     * @return the result reported by the log buffer unblocker.
     */
    @Override
    public boolean unblockAtConsumerPosition() {
        return LogBufferUnblocker.unblock(this.logPartitions, this.rawLog.logMetaData(), this.consumerPosition());
    }
}

