/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.driver.media;

import io.aeron.driver.EventLog;
import io.aeron.driver.MediaDriver;
import io.aeron.driver.NetworkPublication;
import io.aeron.driver.media.UdpChannel;
import io.aeron.driver.media.UdpChannelTransport;
import io.aeron.driver.media.UdpDestinationTracker;
import io.aeron.driver.status.ChannelEndpointStatus;
import io.aeron.driver.status.SystemCounterDescriptor;
import io.aeron.protocol.NakFlyweight;
import io.aeron.protocol.RttMeasurementFlyweight;
import io.aeron.protocol.StatusMessageFlyweight;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.PortUnreachableException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.TimeUnit;
import org.agrona.LangUtil;
import org.agrona.collections.BiInt2ObjectMap;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.status.AtomicCounter;

/**
 * UDP channel endpoint used for sending. It writes outgoing datagrams and routes incoming status,
 * NAK and RTT measurement messages to the {@link NetworkPublication} registered under the
 * message's session id and stream id.
 * <p>
 * When the channel has explicit control, sends are delegated to a {@link UdpDestinationTracker};
 * otherwise datagrams are written directly to the send datagram channel.
 * <p>
 * No synchronisation is performed in this class; the reference count and the publication map are
 * plain state. NOTE(review): presumably all calls are confined to a single thread - confirm
 * against the callers.
 */
@EventLog
public class SendChannelEndpoint extends UdpChannelTransport {
    /** Timeout handed to the destination tracker when the control mode is not {@code manual}. */
    private static final long DESTINATION_TIMEOUT = TimeUnit.SECONDS.toNanos(5L);

    // NOTE(review): the values below presumably mirror constants declared on ChannelEndpointStatus
    // and StatusMessageFlyweight - confirm and reference those directly if they are available.

    /** Status indicator value required by {@link #indicateActive()} before activation. */
    private static final long STATUS_INITIALIZING = 0L;

    /** Status indicator value written by {@link #indicateActive()}. */
    private static final long STATUS_ACTIVE = 1L;

    /** Status indicator value written by {@link #closeStatusIndicator()} just before closing it. */
    private static final long STATUS_CLOSING = 2L;

    /** Flag bit in a status message that causes a setup frame to be triggered. */
    private static final int SEND_SETUP_FLAG = 0x80;

    private int refCount = 0;
    private final BiInt2ObjectMap<NetworkPublication> publicationBySessionAndStreamId = new BiInt2ObjectMap<>();
    /** Tracker for multiple destinations; {@code null} when the channel has no explicit control. */
    private final UdpDestinationTracker multiDestinationTracker;
    private final AtomicCounter statusMessagesReceived;
    private final AtomicCounter nakMessagesReceived;
    private final AtomicCounter statusIndicator;

    /**
     * Creates the endpoint for the given channel.
     * <p>
     * If the channel has explicit control a destination tracker is created: with only the
     * {@link #presend} hook when the {@code control-mode} URI parameter is {@code manual}, otherwise
     * with the context's nano clock, the hook and {@link #DESTINATION_TIMEOUT}.
     *
     * @param udpChannel      channel this endpoint sends on.
     * @param statusIndicator counter holding the status of this endpoint.
     * @param context         driver context supplying the error log, system counters and nano clock.
     */
    public SendChannelEndpoint(UdpChannel udpChannel, AtomicCounter statusIndicator, MediaDriver.Context context) {
        super(
            udpChannel,
            udpChannel.remoteControl(),
            udpChannel.localControl(),
            // The remote data address is only handed to the transport when there is no explicit control.
            udpChannel.hasExplicitControl() ? null : udpChannel.remoteData(),
            context.errorLog(),
            context.systemCounters().get(SystemCounterDescriptor.INVALID_PACKETS));

        this.nakMessagesReceived = context.systemCounters().get(SystemCounterDescriptor.NAK_MESSAGES_RECEIVED);
        this.statusMessagesReceived = context.systemCounters().get(SystemCounterDescriptor.STATUS_MESSAGES_RECEIVED);
        this.statusIndicator = statusIndicator;

        if (udpChannel.hasExplicitControl()) {
            final String controlMode = udpChannel.channelUri().get("control-mode");
            this.multiDestinationTracker = "manual".equals(controlMode)
                ? new UdpDestinationTracker(this::presend)
                : new UdpDestinationTracker(context.nanoClock(), this::presend, DESTINATION_TIMEOUT);
        } else {
            this.multiDestinationTracker = null;
        }
    }

    /**
     * Decrements the reference count.
     *
     * @return the reference count after the decrement.
     */
    public int decRef() {
        return --refCount;
    }

    /**
     * Increments the reference count.
     *
     * @return the reference count after the increment.
     */
    public int incRef() {
        return ++refCount;
    }

    /**
     * Opens the datagram channel, passing this endpoint's status indicator to the transport.
     */
    public void openChannel() {
        openDatagramChannel(statusIndicator);
    }

    /**
     * @return the original URI string of the underlying channel.
     */
    public String originalUriString() {
        return udpChannel().originalUriString();
    }

    /**
     * Marks the endpoint as active by writing {@link #STATUS_ACTIVE} to the status indicator.
     *
     * @throws IllegalStateException if the status indicator does not currently hold
     *                               {@link #STATUS_INITIALIZING}.
     */
    public void indicateActive() {
        final long currentStatus = statusIndicator.get();
        if (STATUS_INITIALIZING != currentStatus) {
            throw new IllegalStateException(
                "Channel cannot be registered unless INITIALIZING: status=" + ChannelEndpointStatus.status(currentStatus));
        }

        statusIndicator.setOrdered(STATUS_ACTIVE);
    }

    /**
     * Writes {@link #STATUS_CLOSING} to the status indicator and closes it. Does nothing if the
     * indicator is already closed.
     */
    public void closeStatusIndicator() {
        if (statusIndicator.isClosed()) {
            return;
        }

        statusIndicator.setOrdered(STATUS_CLOSING);
        statusIndicator.close();
    }

    /**
     * @return {@code true} when the reference count is zero and the status indicator has not yet
     * been closed.
     */
    public boolean shouldBeClosed() {
        return 0 == refCount && !statusIndicator.isClosed();
    }

    /**
     * Registers a publication so that incoming messages carrying its session id and stream id are
     * routed to it.
     *
     * @param publication publication to register.
     */
    public void registerForSend(NetworkPublication publication) {
        publicationBySessionAndStreamId.put(publication.sessionId(), publication.streamId(), publication);
    }

    /**
     * Removes the registration for the publication's session id and stream id.
     *
     * @param publication publication to unregister.
     */
    public void unregisterForSend(NetworkPublication publication) {
        publicationBySessionAndStreamId.remove(publication.sessionId(), publication.streamId());
    }

    /**
     * Sends the buffer, either through the destination tracker when one exists or by writing it to
     * the send datagram channel after invoking {@link #presend}.
     *
     * @param buffer data to send.
     * @return the value returned by the tracker or by the channel write; zero if the write failed
     * with a {@link PortUnreachableException} or {@link ClosedChannelException}.
     */
    public int send(ByteBuffer buffer) {
        if (null != multiDestinationTracker) {
            return multiDestinationTracker.sendToDestinations(sendDatagramChannel, buffer);
        }

        int bytesSent = 0;
        try {
            presend(buffer, connectAddress);
            bytesSent = sendDatagramChannel.write(buffer);
        } catch (PortUnreachableException | ClosedChannelException ignore) {
            // Deliberately ignored: an unreachable port or an already closed channel is reported
            // to the caller as zero bytes sent rather than as a failure.
        } catch (IOException ex) {
            LangUtil.rethrowUnchecked(ex);
        }

        return bytesSent;
    }

    /**
     * Hook invoked with the buffer and destination address before a direct write in
     * {@link #send(ByteBuffer)}; it is also supplied to the destination tracker at construction.
     * This implementation does nothing.
     *
     * @param buffer  data about to be sent.
     * @param address destination address.
     */
    protected void presend(ByteBuffer buffer, InetSocketAddress address) {
    }

    /**
     * Handles a received status message.
     * <p>
     * With a destination tracker present the tracker is told of the activity, and a message with
     * session id 0, stream id 0 and the setup flag set triggers a setup frame on every registered
     * publication. Independently, if a publication is registered for the message's session id and
     * stream id it is either asked to send a setup frame (flag set) or given the status message.
     * The status message counter is incremented once for each of those two paths that is taken.
     * <p>
     * NOTE(review): if a publication were registered under session id 0 and stream id 0, a single
     * message could take both paths - confirm whether that is intended.
     *
     * @param msg        flyweight over the status message.
     * @param buffer     buffer containing the message (not used here).
     * @param length     length of the message (not used here).
     * @param srcAddress address the message was received from.
     */
    public void onStatusMessage(StatusMessageFlyweight msg, UnsafeBuffer buffer, int length, InetSocketAddress srcAddress) {
        final NetworkPublication publication = publicationBySessionAndStreamId.get(msg.sessionId(), msg.streamId());

        if (null != multiDestinationTracker) {
            multiDestinationTracker.destinationActivity(msg, srcAddress);

            if (0 == msg.sessionId() && 0 == msg.streamId() && isSendSetupFlagSet(msg)) {
                publicationBySessionAndStreamId.forEach(NetworkPublication::triggerSendSetupFrame);
                statusMessagesReceived.orderedIncrement();
            }
        }

        if (null == publication) {
            return;
        }

        if (isSendSetupFlagSet(msg)) {
            publication.triggerSendSetupFrame();
        } else {
            publication.onStatusMessage(msg, srcAddress);
        }
        statusMessagesReceived.orderedIncrement();
    }

    /**
     * Handles a received NAK by forwarding its term id, term offset and length to the publication
     * registered for the message's session id and stream id, then incrementing the NAK counter.
     * Messages with no matching publication are dropped without counting.
     *
     * @param msg        flyweight over the NAK message.
     * @param buffer     buffer containing the message (not used here).
     * @param length     length of the message (not used here).
     * @param srcAddress address the message was received from (not used here).
     */
    public void onNakMessage(NakFlyweight msg, UnsafeBuffer buffer, int length, InetSocketAddress srcAddress) {
        final NetworkPublication publication = publicationBySessionAndStreamId.get(msg.sessionId(), msg.streamId());
        if (null == publication) {
            return;
        }

        publication.onNak(msg.termId(), msg.termOffset(), msg.length());
        nakMessagesReceived.orderedIncrement();
    }

    /**
     * Handles a received RTT measurement by passing it to the publication registered for the
     * message's session id and stream id, if any.
     *
     * @param msg        flyweight over the RTT measurement message.
     * @param buffer     buffer containing the message (not used here).
     * @param length     length of the message (not used here).
     * @param srcAddress address the message was received from.
     */
    public void onRttMeasurement(RttMeasurementFlyweight msg, UnsafeBuffer buffer, int length, InetSocketAddress srcAddress) {
        final NetworkPublication publication = publicationBySessionAndStreamId.get(msg.sessionId(), msg.streamId());
        if (null != publication) {
            publication.onRttMeasurement(msg, srcAddress);
        }
    }

    /**
     * Checks that destinations may be managed manually on this endpoint.
     *
     * @throws IllegalArgumentException if there is no destination tracker or the tracker is not in
     *                                  manual control mode.
     */
    public void validateAllowsManualControl() {
        final boolean manualControlAllowed =
            null != multiDestinationTracker && multiDestinationTracker.isManualControlMode();
        if (!manualControlAllowed) {
            throw new IllegalArgumentException("Control channel does not allow manual control");
        }
    }

    /**
     * Adds a destination to the destination tracker.
     *
     * @param address destination to add.
     * @throws IllegalStateException if this endpoint has no destination tracker.
     */
    public void addDestination(InetSocketAddress address) {
        requireDestinationTracker().addDestination(address);
    }

    /**
     * Removes a destination from the destination tracker.
     *
     * @param address destination to remove.
     * @throws IllegalStateException if this endpoint has no destination tracker.
     */
    public void removeDestination(InetSocketAddress address) {
        requireDestinationTracker().removeDestination(address);
    }

    /** Returns whether the setup flag bit is set in the flags of the status message. */
    private static boolean isSendSetupFlagSet(StatusMessageFlyweight msg) {
        return SEND_SETUP_FLAG == (msg.flags() & SEND_SETUP_FLAG);
    }

    /** Returns the destination tracker, failing with a descriptive error when none was created. */
    private UdpDestinationTracker requireDestinationTracker() {
        if (null == multiDestinationTracker) {
            throw new IllegalStateException(
                "Channel has no explicit control so destinations cannot be managed: " + originalUriString());
        }

        return multiDestinationTracker;
    }
}

