/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.driver;

import io.aeron.Aeron;
import io.aeron.driver.Configuration;
import io.aeron.driver.DriverConductor;
import io.aeron.driver.DriverManagedResource;
import io.aeron.driver.FlowControl;
import io.aeron.driver.NetworkPublicationPadding3;
import io.aeron.driver.NetworkPublicationThreadLocals;
import io.aeron.driver.RetransmitHandler;
import io.aeron.driver.RetransmitSender;
import io.aeron.driver.Subscribable;
import io.aeron.driver.buffer.RawLog;
import io.aeron.driver.media.SendChannelEndpoint;
import io.aeron.driver.status.SystemCounterDescriptor;
import io.aeron.driver.status.SystemCounters;
import io.aeron.logbuffer.LogBufferDescriptor;
import io.aeron.logbuffer.LogBufferUnblocker;
import io.aeron.logbuffer.TermScanner;
import io.aeron.protocol.DataHeaderFlyweight;
import io.aeron.protocol.RttMeasurementFlyweight;
import io.aeron.protocol.SetupFlyweight;
import io.aeron.protocol.StatusMessageFlyweight;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import org.agrona.collections.ArrayUtil;
import org.agrona.concurrent.EpochClock;
import org.agrona.concurrent.NanoClock;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.status.AtomicCounter;
import org.agrona.concurrent.status.Position;
import org.agrona.concurrent.status.ReadablePosition;

public class NetworkPublication
extends NetworkPublicationPadding3
implements RetransmitSender,
DriverManagedResource,
Subscribable {
    private final long registrationId;
    private final long unblockTimeoutNs;
    private final int positionBitsToShift;
    private final int initialTermId;
    private final int termBufferLength;
    private final int termLengthMask;
    private final int mtuLength;
    private final int termWindowLength;
    private final int sessionId;
    private final int streamId;
    private final boolean isExclusive;
    private volatile boolean isConnected;
    private volatile boolean hasSenderReleased;
    private volatile boolean isEndOfStream;
    private Status status = Status.ACTIVE;
    private final UnsafeBuffer[] termBuffers;
    private final ByteBuffer[] sendBuffers;
    private final Position publisherLimit;
    private final Position senderPosition;
    private final Position senderLimit;
    private final SendChannelEndpoint channelEndpoint;
    private final ByteBuffer heartbeatBuffer;
    private final DataHeaderFlyweight heartbeatDataHeader;
    private final ByteBuffer setupBuffer;
    private final SetupFlyweight setupHeader;
    private final ByteBuffer rttMeasurementBuffer;
    private final RttMeasurementFlyweight rttMeasurementHeader;
    private final FlowControl flowControl;
    private final NanoClock nanoClock;
    private final EpochClock epochClock;
    private final RetransmitHandler retransmitHandler;
    private final UnsafeBuffer metaDataBuffer;
    private final RawLog rawLog;
    private final AtomicCounter heartbeatsSent;
    private final AtomicCounter retransmitsSent;
    private final AtomicCounter senderFlowControlLimits;
    private final AtomicCounter shortSends;
    private final AtomicCounter unblockedPublications;

    public NetworkPublication(long registrationId, SendChannelEndpoint channelEndpoint, NanoClock nanoClock, EpochClock epochClock, RawLog rawLog, Position publisherLimit, Position senderPosition, Position senderLimit, int sessionId, int streamId, int initialTermId, int mtuLength, SystemCounters systemCounters, FlowControl flowControl, RetransmitHandler retransmitHandler, NetworkPublicationThreadLocals threadLocals, long unblockTimeoutNs, boolean isExclusive) {
        int termLength;
        this.registrationId = registrationId;
        this.unblockTimeoutNs = unblockTimeoutNs;
        this.channelEndpoint = channelEndpoint;
        this.rawLog = rawLog;
        this.nanoClock = nanoClock;
        this.epochClock = epochClock;
        this.senderPosition = senderPosition;
        this.senderLimit = senderLimit;
        this.flowControl = flowControl;
        this.retransmitHandler = retransmitHandler;
        this.publisherLimit = publisherLimit;
        this.mtuLength = mtuLength;
        this.initialTermId = initialTermId;
        this.sessionId = sessionId;
        this.streamId = streamId;
        this.isExclusive = isExclusive;
        this.metaDataBuffer = rawLog.metaData();
        this.setupBuffer = threadLocals.setupBuffer();
        this.setupHeader = threadLocals.setupHeader();
        this.heartbeatBuffer = threadLocals.heartbeatBuffer();
        this.heartbeatDataHeader = threadLocals.heartbeatDataHeader();
        this.rttMeasurementBuffer = threadLocals.rttMeasurementBuffer();
        this.rttMeasurementHeader = threadLocals.rttMeasurementHeader();
        this.heartbeatsSent = systemCounters.get(SystemCounterDescriptor.HEARTBEATS_SENT);
        this.shortSends = systemCounters.get(SystemCounterDescriptor.SHORT_SENDS);
        this.retransmitsSent = systemCounters.get(SystemCounterDescriptor.RETRANSMITS_SENT);
        this.senderFlowControlLimits = systemCounters.get(SystemCounterDescriptor.SENDER_FLOW_CONTROL_LIMITS);
        this.unblockedPublications = systemCounters.get(SystemCounterDescriptor.UNBLOCKED_PUBLICATIONS);
        this.termBuffers = rawLog.termBuffers();
        this.sendBuffers = rawLog.sliceTerms();
        this.termBufferLength = termLength = rawLog.termLength();
        this.termLengthMask = termLength - 1;
        flowControl.initialize(initialTermId, termLength);
        long nowNs = nanoClock.nanoTime();
        this.timeOfLastSendOrHeartbeatNs = nowNs - Configuration.PUBLICATION_HEARTBEAT_TIMEOUT_NS - 1L;
        this.timeOfLastSetupNs = nowNs - Configuration.PUBLICATION_SETUP_TIMEOUT_NS - 1L;
        this.positionBitsToShift = Integer.numberOfTrailingZeros(termLength);
        this.termWindowLength = Configuration.publicationTermWindowLength(termLength);
        this.cleanPosition = this.lastSenderPosition = senderPosition.get();
        this.timeOfLastActivityNs = nowNs;
    }

    public void close() {
        this.publisherLimit.close();
        this.senderPosition.close();
        this.senderLimit.close();
        for (ReadablePosition position : this.spyPositions) {
            position.close();
        }
        this.rawLog.close();
    }

    public int mtuLength() {
        return this.mtuLength;
    }

    public long registrationId() {
        return this.registrationId;
    }

    public boolean isExclusive() {
        return this.isExclusive;
    }

    public int send(long nowNs) {
        int bytesSent;
        long senderPosition = this.senderPosition.get();
        int activeTermId = LogBufferDescriptor.computeTermIdFromPosition(senderPosition, this.positionBitsToShift, this.initialTermId);
        int termOffset = (int)senderPosition & this.termLengthMask;
        if (this.shouldSendSetupFrame) {
            this.setupMessageCheck(nowNs, activeTermId, termOffset);
        }
        if (0 == (bytesSent = this.sendData(nowNs, senderPosition, termOffset))) {
            boolean isEndOfStream = this.isEndOfStream;
            bytesSent = this.heartbeatMessageCheck(nowNs, activeTermId, termOffset, isEndOfStream);
            this.senderLimit.setOrdered(this.flowControl.onIdle(nowNs, this.senderLimit.get(), this.senderPosition.get(), isEndOfStream));
        }
        this.retransmitHandler.processTimeouts(nowNs, this);
        return bytesSent;
    }

    public SendChannelEndpoint channelEndpoint() {
        return this.channelEndpoint;
    }

    public int sessionId() {
        return this.sessionId;
    }

    public int streamId() {
        return this.streamId;
    }

    @Override
    public void resend(int termId, int termOffset, int length) {
        long senderPosition = this.senderPosition.get();
        long resendPosition = LogBufferDescriptor.computePosition(termId, termOffset, this.positionBitsToShift, this.initialTermId);
        if (resendPosition < senderPosition && resendPosition >= senderPosition - (long)this.rawLog.termLength()) {
            long scanOutcome;
            int available;
            int activeIndex = LogBufferDescriptor.indexByPosition(resendPosition, this.positionBitsToShift);
            UnsafeBuffer termBuffer = this.termBuffers[activeIndex];
            ByteBuffer sendBuffer = this.sendBuffers[activeIndex];
            int remainingBytes = length;
            int bytesSent = 0;
            int offset = termOffset;
            while ((available = TermScanner.available(scanOutcome = TermScanner.scanForAvailability(termBuffer, offset += bytesSent, Math.min(this.mtuLength, remainingBytes)))) > 0) {
                sendBuffer.limit(offset + available).position(offset);
                if (available != this.channelEndpoint.send(sendBuffer)) {
                    this.shortSends.increment();
                    break;
                }
                bytesSent = available + TermScanner.padding(scanOutcome);
                if ((remainingBytes -= bytesSent) > 0) continue;
            }
            this.retransmitsSent.orderedIncrement();
        }
    }

    public void triggerSendSetupFrame() {
        this.shouldSendSetupFrame = true;
    }

    @Override
    public void addSubscriber(ReadablePosition spyPosition) {
        this.spyPositions = ArrayUtil.add(this.spyPositions, spyPosition);
    }

    @Override
    public void removeSubscriber(ReadablePosition spyPosition) {
        this.spyPositions = ArrayUtil.remove(this.spyPositions, spyPosition);
        spyPosition.close();
    }

    public void onNak(int termId, int termOffset, int length) {
        this.retransmitHandler.onNak(termId, termOffset, length, this.termBufferLength, this);
    }

    public void onStatusMessage(StatusMessageFlyweight msg, InetSocketAddress srcAddress) {
        LogBufferDescriptor.timeOfLastStatusMessage(this.metaDataBuffer, this.epochClock.time());
        if (!this.isConnected) {
            this.isConnected = true;
        }
        this.senderLimit.setOrdered(this.flowControl.onStatusMessage(msg, srcAddress, this.senderLimit.get(), this.initialTermId, this.positionBitsToShift, this.nanoClock.nanoTime()));
    }

    public void onRttMeasurement(RttMeasurementFlyweight msg, InetSocketAddress srcAddress) {
        if (128 == (msg.flags() & 0x80)) {
            this.rttMeasurementHeader.receiverId(msg.receiverId()).echoTimestampNs(msg.echoTimestampNs()).receptionDelta(0L).sessionId(this.sessionId).streamId(this.streamId).flags((short)0);
            int bytesSent = this.channelEndpoint.send(this.rttMeasurementBuffer);
            if (40 != bytesSent) {
                this.shortSends.increment();
            }
        }
    }

    RawLog rawLog() {
        return this.rawLog;
    }

    int publisherLimitId() {
        return this.publisherLimit.id();
    }

    int updatePublisherLimit() {
        int workCount = 0;
        long senderPosition = this.senderPosition.getVolatile();
        if (this.isConnected) {
            long proposedPublisherLimit;
            long minConsumerPosition = senderPosition;
            if (this.spyPositions.length > 0) {
                for (ReadablePosition spyPosition : this.spyPositions) {
                    minConsumerPosition = Math.min(minConsumerPosition, spyPosition.getVolatile());
                }
            }
            if (this.publisherLimit.proposeMaxOrdered(proposedPublisherLimit = minConsumerPosition + (long)this.termWindowLength)) {
                this.cleanBuffer(proposedPublisherLimit);
                workCount = 1;
            }
        } else if (this.publisherLimit.get() > senderPosition) {
            this.publisherLimit.setOrdered(senderPosition);
        }
        return workCount;
    }

    boolean hasSpies() {
        return this.spyPositions.length > 0;
    }

    private int sendData(long nowNs, long senderPosition, int termOffset) {
        int bytesSent = 0;
        int availableWindow = (int)(this.senderLimit.get() - senderPosition);
        if (availableWindow > 0) {
            int scanLimit = Math.min(availableWindow, this.mtuLength);
            int activeIndex = LogBufferDescriptor.indexByPosition(senderPosition, this.positionBitsToShift);
            long scanOutcome = TermScanner.scanForAvailability(this.termBuffers[activeIndex], termOffset, scanLimit);
            int available = TermScanner.available(scanOutcome);
            if (available > 0) {
                ByteBuffer sendBuffer = this.sendBuffers[activeIndex];
                sendBuffer.limit(termOffset + available).position(termOffset);
                if (available == this.channelEndpoint.send(sendBuffer)) {
                    this.timeOfLastSendOrHeartbeatNs = nowNs;
                    this.trackSenderLimits = true;
                    bytesSent = available;
                    this.senderPosition.setOrdered(senderPosition + (long)bytesSent + (long)TermScanner.padding(scanOutcome));
                } else {
                    this.shortSends.increment();
                }
            }
        } else if (this.trackSenderLimits) {
            this.trackSenderLimits = false;
            this.senderFlowControlLimits.orderedIncrement();
        }
        return bytesSent;
    }

    private void setupMessageCheck(long nowNs, int activeTermId, int termOffset) {
        if (nowNs > this.timeOfLastSetupNs + Configuration.PUBLICATION_SETUP_TIMEOUT_NS) {
            this.setupBuffer.clear();
            this.setupHeader.activeTermId(activeTermId).termOffset(termOffset).sessionId(this.sessionId).streamId(this.streamId).initialTermId(this.initialTermId).termLength(this.termBufferLength).mtuLength(this.mtuLength).ttl(this.channelEndpoint.multicastTtl());
            int bytesSent = this.channelEndpoint.send(this.setupBuffer);
            if (40 != bytesSent) {
                this.shortSends.increment();
            }
            this.timeOfLastSetupNs = nowNs;
            this.timeOfLastSendOrHeartbeatNs = nowNs;
            if (this.isConnected) {
                this.shouldSendSetupFrame = false;
            }
        }
    }

    private int heartbeatMessageCheck(long nowNs, int activeTermId, int termOffset, boolean isEndOfStream) {
        int bytesSent = 0;
        if (nowNs > this.timeOfLastSendOrHeartbeatNs + Configuration.PUBLICATION_HEARTBEAT_TIMEOUT_NS) {
            this.heartbeatBuffer.clear();
            this.heartbeatDataHeader.sessionId(this.sessionId).streamId(this.streamId).termId(activeTermId).termOffset(termOffset);
            if (isEndOfStream) {
                this.heartbeatDataHeader.flags((short)-32);
            } else {
                this.heartbeatDataHeader.flags((short)-64);
            }
            bytesSent = this.channelEndpoint.send(this.heartbeatBuffer);
            if (32 != bytesSent) {
                this.shortSends.increment();
            }
            this.heartbeatsSent.orderedIncrement();
            this.timeOfLastSendOrHeartbeatNs = nowNs;
        }
        return bytesSent;
    }

    private void cleanBuffer(long publisherLimit) {
        long cleanPosition = this.cleanPosition;
        long dirtyRange = publisherLimit - cleanPosition;
        int bufferCapacity = this.termBufferLength;
        int reservedRange = bufferCapacity * 2;
        if (dirtyRange > (long)reservedRange) {
            UnsafeBuffer dirtyTerm = this.termBuffers[LogBufferDescriptor.indexByPosition(cleanPosition, this.positionBitsToShift)];
            int termOffset = (int)cleanPosition & this.termLengthMask;
            int bytesForCleaning = (int)(dirtyRange - (long)reservedRange);
            int length = Math.min(bytesForCleaning, bufferCapacity - termOffset);
            dirtyTerm.setMemory(termOffset, length, (byte)0);
            this.cleanPosition = cleanPosition + (long)length;
        }
    }

    private void checkForBlockedPublisher(long timeNs, long senderPosition) {
        if (senderPosition == this.lastSenderPosition && this.producerPosition() > senderPosition) {
            if (timeNs > this.timeOfLastActivityNs + this.unblockTimeoutNs && LogBufferUnblocker.unblock(this.termBuffers, this.metaDataBuffer, senderPosition)) {
                this.unblockedPublications.orderedIncrement();
            }
        } else {
            this.timeOfLastActivityNs = timeNs;
            this.lastSenderPosition = senderPosition;
        }
    }

    private boolean spiesFinishedConsuming(DriverConductor conductor, long eosPosition) {
        if (this.spyPositions.length > 0) {
            for (ReadablePosition spyPosition : this.spyPositions) {
                if (spyPosition.getVolatile() >= eosPosition) continue;
                return false;
            }
            conductor.cleanupSpies(this);
            for (ReadablePosition position : this.spyPositions) {
                position.close();
            }
            this.spyPositions = EMPTY_POSITIONS;
        }
        return true;
    }

    private void updateConnectedStatus(long timeMs) {
        if (this.isConnected && timeMs > LogBufferDescriptor.timeOfLastStatusMessage(this.metaDataBuffer) + Aeron.PUBLICATION_CONNECTION_TIMEOUT_MS) {
            this.isConnected = false;
        }
    }

    @Override
    public void onTimeEvent(long timeNs, long timeMs, DriverConductor conductor) {
        this.updateConnectedStatus(timeMs);
        switch (this.status) {
            case ACTIVE: {
                this.checkForBlockedPublisher(timeNs, this.consumerPosition());
                break;
            }
            case DRAINING: {
                long consumerPosition = this.consumerPosition();
                if (this.producerPosition() > consumerPosition) {
                    if (LogBufferUnblocker.unblock(this.termBuffers, this.metaDataBuffer, consumerPosition)) {
                        this.unblockedPublications.orderedIncrement();
                        this.timeOfLastActivityNs = timeNs;
                        break;
                    }
                    if (this.isConnected) {
                        break;
                    }
                } else {
                    this.isEndOfStream = true;
                }
                if (!this.spiesFinishedConsuming(conductor, consumerPosition)) break;
                this.timeOfLastActivityNs = timeNs;
                this.status = Status.LINGER;
                break;
            }
            case LINGER: {
                if (timeNs <= this.timeOfLastActivityNs + Configuration.PUBLICATION_LINGER_NS) break;
                conductor.cleanupPublication(this);
                this.status = Status.CLOSING;
            }
        }
    }

    @Override
    public boolean hasReachedEndOfLife() {
        return this.hasSenderReleased;
    }

    @Override
    public void timeOfLastStateChange(long time) {
    }

    @Override
    public long timeOfLastStateChange() {
        return this.timeOfLastActivityNs;
    }

    @Override
    public void delete() {
        this.close();
    }

    @Override
    public int decRef() {
        int count;
        if (0 == (count = --this.refCount)) {
            this.status = Status.DRAINING;
            this.channelEndpoint.decRef();
            this.timeOfLastActivityNs = this.nanoClock.nanoTime();
            if (this.consumerPosition() >= this.producerPosition()) {
                this.isEndOfStream = true;
            }
        }
        return count;
    }

    @Override
    public int incRef() {
        return ++this.refCount;
    }

    Status status() {
        return this.status;
    }

    void senderRelease() {
        this.hasSenderReleased = true;
    }

    long producerPosition() {
        long rawTail = LogBufferDescriptor.rawTailVolatile(this.metaDataBuffer);
        int termOffset = LogBufferDescriptor.termOffset(rawTail, this.termBufferLength);
        return LogBufferDescriptor.computePosition(LogBufferDescriptor.termId(rawTail), termOffset, this.positionBitsToShift, this.initialTermId);
    }

    long consumerPosition() {
        return this.senderPosition.getVolatile();
    }

    public static enum Status {
        ACTIVE,
        DRAINING,
        LINGER,
        CLOSING;

    }
}

