/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.archive;

import io.aeron.Aeron;
import io.aeron.ChannelUri;
import io.aeron.ChannelUriStringBuilder;
import io.aeron.ExclusivePublication;
import io.aeron.Image;
import io.aeron.Publication;
import io.aeron.Subscription;
import io.aeron.archive.Archive;
import io.aeron.archive.Catalog;
import io.aeron.archive.ControlResponseProxy;
import io.aeron.archive.ControlSession;
import io.aeron.archive.ControlSessionDemuxer;
import io.aeron.archive.ListRecordingsForUriSession;
import io.aeron.archive.ListRecordingsSession;
import io.aeron.archive.RecordingEventsProxy;
import io.aeron.archive.RecordingSession;
import io.aeron.archive.ReplaySession;
import io.aeron.archive.Session;
import io.aeron.archive.SessionWorker;
import io.aeron.archive.client.AeronArchive;
import io.aeron.archive.codecs.ControlResponseCode;
import io.aeron.archive.codecs.RecordingDescriptorDecoder;
import io.aeron.archive.codecs.RecordingDescriptorEncoder;
import io.aeron.archive.codecs.SourceLocation;
import java.io.File;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.OpenOption;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
import org.agrona.CloseHelper;
import org.agrona.UnsafeAccess;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.concurrent.AgentInvoker;
import org.agrona.concurrent.EpochClock;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.status.AtomicCounter;
import org.agrona.concurrent.status.CountersManager;

/**
 * Conductor of the archive.
 * <p>
 * It accepts control connections, starts and stops recording subscriptions, creates replay and
 * listing sessions, and hands recording and replay sessions to the workers supplied by subclasses
 * through {@link #newRecorder()} and {@link #newReplayer()}.
 */
abstract class ArchiveConductor extends SessionWorker<Session> {
    private static final int CONTROL_TERM_LENGTH = AeronArchive.Configuration.controlTermBufferLength();
    private static final int CONTROL_MTU = AeronArchive.Configuration.controlMtuLength();

    /** Prefix put in front of the stripped channel when a local UDP source is recorded. */
    private static final String SPY_PREFIX = "aeron-spy:";

    /**
     * Second argument passed to {@code CountersManager.newCounter} for recording position counters.
     * NOTE(review): presumably the counter type id; confirm against the Agrona API in use.
     */
    private static final int RECORDING_POSITION_TYPE_ID = 100;

    /** Shared builder; it is cleared and reused on every call to {@link #strippedChannelBuilder(String)}. */
    private final ChannelUriStringBuilder channelBuilder = new ChannelUriStringBuilder();
    private final Long2ObjectHashMap<ReplaySession> replaySessionByIdMap = new Long2ObjectHashMap<>();
    private final Long2ObjectHashMap<RecordingSession> recordingSessionByIdMap = new Long2ObjectHashMap<>();
    private final Long2ObjectHashMap<AtomicCounter> recordingPositionByIdMap = new Long2ObjectHashMap<>();
    /** Recording subscriptions keyed by {@link #makeKey(int, String)}. */
    private final Map<String, Subscription> subscriptionMap = new HashMap<>();
    private final ReplayPublicationSupplier newReplayPublication = this::newReplayPublication;
    /** Buffer handed to the listing sessions created by this conductor. */
    private final UnsafeBuffer descriptorBuffer = new UnsafeBuffer();
    private final RecordingDescriptorDecoder recordingDescriptorDecoder = new RecordingDescriptorDecoder();
    private final RecordingDescriptorEncoder recordingDescriptorEncoder = new RecordingDescriptorEncoder();
    private final Aeron aeron;
    private final AgentInvoker aeronAgentInvoker;
    /** May be {@code null}; {@link #preWork()} checks before invoking it. */
    private final AgentInvoker driverAgentInvoker;
    private final EpochClock epochClock;
    private final File archiveDir;
    /** {@code null} when the file sync level is not positive or the directory could not be opened. */
    private final FileChannel archiveDirChannel;
    private final Catalog catalog;
    private final RecordingEventsProxy recordingEventsProxy;
    private final int maxConcurrentRecordings;
    private final int maxConcurrentReplays;
    private final CountersManager countersManager;
    protected final Archive.Context ctx;
    protected final ControlResponseProxy controlResponseProxy;
    /** Assigned in {@link #onStart()}. */
    protected SessionWorker<ReplaySession> replayer;
    /** Assigned in {@link #onStart()}. */
    protected SessionWorker<RecordingSession> recorder;
    /** Next replay session id; starts at a random int and is incremented per replay. */
    private long replaySessionId = ThreadLocalRandom.current().nextInt();
    /** Next control session id; starts at a random int and is incremented per control session. */
    private long controlSessionId = ThreadLocalRandom.current().nextInt();

    /**
     * Creates the conductor, subscribes to the control channel, and creates the recording events
     * proxy and the catalog.
     *
     * @param aeron client used for all subscriptions and publications; its conductor agent invoker
     *              must not be {@code null}.
     * @param ctx   archive configuration.
     * @throws NullPointerException if {@code aeron.conductorAgentInvoker()} returns {@code null}.
     */
    ArchiveConductor(Aeron aeron, Archive.Context ctx) {
        super("archive-conductor", ctx.countedErrorHandler());
        this.aeron = aeron;
        this.ctx = ctx;
        this.aeronAgentInvoker = Objects.requireNonNull(
            aeron.conductorAgentInvoker(), "An aeron invoker should be present in the context");
        this.maxConcurrentRecordings = ctx.maxConcurrentRecordings();
        this.maxConcurrentReplays = ctx.maxConcurrentReplays();
        this.epochClock = ctx.epochClock();
        this.driverAgentInvoker = ctx.mediaDriverAgentInvoker();
        this.archiveDir = ctx.archiveDir();

        int fileSyncLevel = ctx.fileSyncLevel();
        this.archiveDirChannel = channelForDirectorySync(this.archiveDir, fileSyncLevel);
        this.controlResponseProxy = new ControlResponseProxy();

        // The returned subscription is not retained; each new image is routed to onControlConnection.
        aeron.addSubscription(ctx.controlChannel(), ctx.controlStreamId(), this::onControlConnection, null);

        Publication recordingEventsPublication =
            aeron.addPublication(ctx.recordingEventsChannel(), ctx.recordingEventsStreamId());
        this.recordingEventsProxy = new RecordingEventsProxy(ctx.idleStrategy(), recordingEventsPublication);
        this.catalog = new Catalog(this.archiveDir, this.archiveDirChannel, fileSyncLevel, this.epochClock);
        this.countersManager = ctx.countersManager();
    }

    /**
     * Creates the replayer and then the recorder via the subclass factory methods.
     */
    @Override
    public void onStart() {
        replayer = newReplayer();
        recorder = newRecorder();
    }

    /**
     * @return the worker that recording sessions are added to.
     */
    protected abstract SessionWorker<RecordingSession> newRecorder();

    /**
     * @return the worker that replay sessions are added to.
     */
    protected abstract SessionWorker<ReplaySession> newReplayer();

    /**
     * Closes the session workers, then closes and forgets every recording subscription.
     */
    @Override
    protected final void preSessionsClose() {
        closeSessionWorkers();
        for (Subscription subscription : subscriptionMap.values()) {
            subscription.close();
        }
        subscriptionMap.clear();
    }

    /**
     * Closes the recorder and replayer workers; invoked first by {@link #preSessionsClose()}.
     */
    protected abstract void closeSessionWorkers();

    /**
     * Quietly closes the catalog, the archive directory channel and both agent invokers, in that order.
     */
    @Override
    protected void postSessionsClose() {
        CloseHelper.quietClose(catalog);
        CloseHelper.quietClose(archiveDirChannel);
        CloseHelper.quietClose(aeronAgentInvoker);
        CloseHelper.quietClose(driverAgentInvoker);
    }

    /**
     * Invokes the media driver invoker (when present) and then the Aeron client invoker.
     *
     * @return the sum of the work counts reported by the invokers.
     */
    @Override
    protected int preWork() {
        int workCount = 0;
        if (null != driverAgentInvoker) {
            workCount += driverAgentInvoker.invoke();
        }
        workCount += aeronAgentInvoker.invoke();
        return workCount;
    }

    /**
     * Image handler of the control subscription: adds a demuxer session for the new image.
     */
    private void onControlConnection(Image image) {
        addSession(new ControlSessionDemuxer(image, this));
    }

    /**
     * Stops recording for a stream by closing the matching recording subscription.
     * <p>
     * Sends an OK response when a subscription was found and closed, otherwise an ERROR response.
     * Any exception is reported to the error handler and answered with an ERROR response.
     *
     * @param correlationId  correlation id echoed in the response.
     * @param controlSession session the response is sent on.
     * @param streamId       stream id of the recording subscription.
     * @param channel        channel of the recording subscription; it is stripped before lookup.
     */
    void stopRecording(long correlationId, ControlSession controlSession, int streamId, String channel) {
        try {
            String key = makeKey(streamId, strippedChannelBuilder(channel).build());
            Subscription oldSubscription = subscriptionMap.remove(key);
            if (oldSubscription == null) {
                controlSession.sendResponse(correlationId, ControlResponseCode.ERROR, "No recording subscription found for: " + key, controlResponseProxy);
                return;
            }
            oldSubscription.close();
            controlSession.sendOkResponse(correlationId, controlResponseProxy);
        }
        catch (Exception ex) {
            errorHandler.onError(ex);
            controlSession.sendResponse(correlationId, ControlResponseCode.ERROR, ex.getMessage(), controlResponseProxy);
        }
    }

    /**
     * Adds a recording subscription for a stream unless one already exists for the same key.
     * <p>
     * The request is rejected with an ERROR response when the number of recording sessions has
     * reached {@code maxConcurrentRecordings}, or when a subscription already exists for the key.
     * Every image that becomes available on the new subscription starts an image recording.
     *
     * @param correlationId   correlation id echoed in the response.
     * @param controlSession  session the response is sent on.
     * @param streamId        stream id to subscribe to.
     * @param originalChannel channel as requested by the client.
     * @param sourceLocation  when {@code LOCAL} and the channel contains {@code "udp"}, the
     *                        subscription channel is prefixed with {@code "aeron-spy:"}.
     */
    void startRecordingSubscription(long correlationId, ControlSession controlSession, int streamId, String originalChannel, SourceLocation sourceLocation) {
        if (recordingSessionByIdMap.size() >= maxConcurrentRecordings) {
            controlSession.sendResponse(correlationId, ControlResponseCode.ERROR, "Max concurrent recordings reached: " + maxConcurrentRecordings, controlResponseProxy);
            return;
        }
        try {
            String strippedChannel = strippedChannelBuilder(originalChannel).build();
            String key = makeKey(streamId, strippedChannel);
            if (subscriptionMap.get(key) != null) {
                controlSession.sendResponse(correlationId, ControlResponseCode.ERROR, "Recording already setup for subscription: " + key, controlResponseProxy);
                return;
            }

            boolean isLocalUdp = originalChannel.contains("udp") && sourceLocation == SourceLocation.LOCAL;
            String channel = isLocalUdp ? SPY_PREFIX + strippedChannel : strippedChannel;
            Subscription subscription = aeron.addSubscription(
                channel, streamId, image -> startImageRecording(strippedChannel, originalChannel, image), null);
            subscriptionMap.put(key, subscription);
            controlSession.sendOkResponse(correlationId, controlResponseProxy);
        }
        catch (Exception ex) {
            errorHandler.onError(ex);
            controlSession.sendResponse(correlationId, ControlResponseCode.ERROR, ex.getMessage(), controlResponseProxy);
        }
    }

    /**
     * Creates a session that lists recordings starting from an id.
     *
     * @param correlationId  correlation id of the request.
     * @param fromId         recording id to start from.
     * @param count          number of recordings requested.
     * @param controlSession session the listing belongs to.
     * @return the new session; it is not added to any worker here.
     */
    ListRecordingsSession newListRecordingsSession(long correlationId, long fromId, int count, ControlSession controlSession) {
        return new ListRecordingsSession(
            correlationId, fromId, count, catalog, controlResponseProxy, controlSession, descriptorBuffer);
    }

    /**
     * Creates a session that lists recordings for a channel and stream id.
     *
     * @param correlationId   correlation id of the request.
     * @param fromRecordingId recording id to start from.
     * @param count           number of recordings requested.
     * @param streamId        stream id to match.
     * @param channel         channel to match.
     * @param controlSession  session the listing belongs to.
     * @return the new session; it is not added to any worker here.
     */
    ListRecordingsForUriSession newListRecordingsForUriSession(long correlationId, long fromRecordingId, int count, int streamId, String channel, ControlSession controlSession) {
        return new ListRecordingsForUriSession(
            correlationId,
            fromRecordingId,
            count,
            channel,
            streamId,
            catalog,
            controlResponseProxy,
            controlSession,
            descriptorBuffer,
            recordingDescriptorDecoder);
    }

    /**
     * Creates a replay session for a recording and adds it to the replayer.
     * <p>
     * An ERROR response is sent, and nothing is created, when the number of replay sessions has
     * reached {@code maxConcurrentReplays} or when the catalog has no descriptor for the recording.
     *
     * @param correlationId  correlation id of the request.
     * @param controlSession session that requested the replay.
     * @param recordingId    id of the recording to replay.
     * @param position       position to replay from.
     * @param length         length to replay.
     * @param replayStreamId stream id for the replay publication.
     * @param replayChannel  channel for the replay publication.
     */
    void startReplay(long correlationId, ControlSession controlSession, long recordingId, long position, long length, int replayStreamId, String replayChannel) {
        if (replaySessionByIdMap.size() >= maxConcurrentReplays) {
            controlSession.sendResponse(correlationId, ControlResponseCode.ERROR, "Max concurrent replays reached: " + maxConcurrentReplays, controlResponseProxy);
            return;
        }

        // Named differently from the descriptorBuffer field to avoid shadowing it.
        UnsafeBuffer recordingDescriptor = catalog.wrapDescriptor(recordingId);
        if (recordingDescriptor == null) {
            controlSession.sendResponse(correlationId, ControlResponseCode.ERROR, "Unknown recording : " + recordingId, controlResponseProxy);
            return;
        }

        long newId = replaySessionId++;
        // The position counter lookup yields null when no recording session is registered for the id.
        AtomicCounter recordingPosition = recordingPositionByIdMap.get(recordingId);
        ReplaySession replaySession = new ReplaySession(
            position,
            length,
            this.newReplayPublication,
            controlSession,
            archiveDir,
            controlResponseProxy,
            newId,
            correlationId,
            epochClock,
            replayChannel,
            replayStreamId,
            recordingDescriptor,
            recordingPosition);
        replaySessionByIdMap.put(newId, replaySession);
        replayer.addSession(replaySession);
    }

    /**
     * Creates a control session with a response publication and adds it to this worker.
     * <p>
     * When the channel does not contain {@code "term-length"}, the response channel is rebuilt from
     * the stripped channel with the configured control term length and MTU. Otherwise the channel
     * is used as given.
     *
     * @param correlationId correlation id of the connect request.
     * @param streamId      stream id of the response publication.
     * @param channel       channel of the response publication.
     * @param demuxer       demuxer the session is associated with.
     * @return the new control session.
     */
    ControlSession newControlSession(long correlationId, int streamId, String channel, ControlSessionDemuxer demuxer) {
        String controlChannel;
        if (channel.contains("term-length")) {
            controlChannel = channel;
        } else {
            controlChannel = strippedChannelBuilder(channel)
                .termLength(CONTROL_TERM_LENGTH)
                .mtu(CONTROL_MTU)
                .build();
        }

        Publication publication = aeron.addPublication(controlChannel, streamId);
        ControlSession controlSession = new ControlSession(
            controlSessionId++, correlationId, demuxer, publication, this, epochClock, controlResponseProxy);
        addSession(controlSession);
        return controlSession;
    }

    /**
     * Builds the key used for {@code subscriptionMap}: {@code "<streamId>:<strippedChannel>"}.
     *
     * @param streamId        stream id of the subscription.
     * @param strippedChannel channel after stripping.
     * @return the map key, which is also shown in error responses.
     */
    private static String makeKey(int streamId, String strippedChannel) {
        // The separator must be a String: a char (or its code point 58) would be numerically added
        // to streamId before concatenation, producing e.g. "68..." for stream 10.
        return streamId + ":" + strippedChannel;
    }

    /**
     * Parses a channel and resets the shared builder to hold only its media, endpoint, interface
     * and control endpoint.
     *
     * @param channel channel URI to parse.
     * @return the shared builder instance; the next call to this method clears it again, so build
     *         or finish configuring it before calling again.
     */
    ChannelUriStringBuilder strippedChannelBuilder(String channel) {
        ChannelUri channelUri = ChannelUri.parse(channel);
        channelBuilder
            .clear()
            .media(channelUri.media())
            .endpoint(channelUri.get("endpoint"))
            .networkInterface(channelUri.get("interface"))
            .controlEndpoint(channelUri.get("control"));
        return channelBuilder;
    }

    /**
     * Image handler of a recording subscription: registers a new recording in the catalog, creates
     * its position counter and recording session, and adds the session to the recorder.
     *
     * @param strippedChannel stripped form of the recorded channel.
     * @param originalChannel channel as requested by the client.
     * @param image           image that became available.
     * @throws IllegalStateException when the number of recording sessions is at least twice
     *                               {@code maxConcurrentRecordings}.
     */
    private void startImageRecording(String strippedChannel, String originalChannel, Image image) {
        if (recordingSessionByIdMap.size() >= 2 * maxConcurrentRecordings) {
            throw new IllegalStateException("Too many recordings, can't record: '" + originalChannel + ":" + image.subscription().streamId() + "'");
        }

        int sessionId = image.sessionId();
        int streamId = image.subscription().streamId();
        String sourceIdentity = image.sourceIdentity();
        int termBufferLength = image.termBufferLength();
        int mtuLength = image.mtuLength();
        int initialTermId = image.initialTermId();
        long startPosition = image.joinPosition();

        long recordingId = catalog.addNewRecording(
            startPosition,
            epochClock.time(),
            initialTermId,
            ctx.segmentFileLength(),
            termBufferLength,
            mtuLength,
            sessionId,
            streamId,
            strippedChannel,
            originalChannel,
            sourceIdentity);

        AtomicCounter position = newRecordingPositionCounter(recordingId, sessionId, streamId, strippedChannel);
        RecordingSession session = new RecordingSession(
            recordingId,
            catalog.wrapDescriptor(recordingId),
            recordingEventsProxy,
            strippedChannel,
            image,
            position,
            archiveDirChannel,
            ctx);

        recordingSessionByIdMap.put(recordingId, session);
        recordingPositionByIdMap.put(recordingId, position);
        recorder.addSession(session);
    }

    /**
     * Unregisters and closes a recording session, then records its stop position and stop timestamp
     * in the recording descriptor and closes its position counter.
     * <p>
     * NOTE(review): the maps are keyed by recording id on insertion and looked up here by
     * {@code session.sessionId()}; this assumes the two are the same value, so confirm against
     * {@code RecordingSession}. A missing counter would fail on {@code position.get()}.
     *
     * @param session recording session to close.
     */
    void closeRecordingSession(RecordingSession session) {
        recordingSessionByIdMap.remove(session.sessionId());
        closeSession(session);
        AtomicCounter position = recordingPositionByIdMap.remove(session.sessionId());

        Catalog.wrapDescriptorEncoder(recordingDescriptorEncoder, session.descriptorBuffer());
        recordingDescriptorEncoder.stopPosition(position.get());
        recordingDescriptorEncoder.stopTimestamp(epochClock.time());
        // Order the descriptor stores before the counter is closed.
        UnsafeAccess.UNSAFE.storeFence();
        position.close();
    }

    /**
     * Unregisters and closes a replay session.
     *
     * @param session replay session to close.
     */
    void closeReplaySession(ReplaySession session) {
        replaySessionByIdMap.remove(session.sessionId());
        closeSession(session);
    }

    /**
     * Adds an exclusive publication positioned so that a replay starts at {@code fromPosition}.
     * <p>
     * The term id is {@code fromPosition / termBufferLength + initialTermId} and the term offset is
     * {@code fromPosition % termBufferLength}. Both are set on the stripped replay channel together
     * with the MTU, term length and initial term id.
     *
     * @param replayChannel    channel requested for the replay.
     * @param replayStreamId   stream id requested for the replay.
     * @param fromPosition     position the replay starts from.
     * @param mtuLength        MTU length for the publication.
     * @param initialTermId    initial term id for the publication.
     * @param termBufferLength term buffer length for the publication.
     * @return the new exclusive publication.
     */
    private ExclusivePublication newReplayPublication(String replayChannel, int replayStreamId, long fromPosition, int mtuLength, int initialTermId, int termBufferLength) {
        int termId = (int)(fromPosition / (long)termBufferLength + (long)initialTermId);
        int termOffset = (int)(fromPosition % (long)termBufferLength);
        String channel = strippedChannelBuilder(replayChannel)
            .mtu(mtuLength)
            .termLength(termBufferLength)
            .initialTermId(initialTermId)
            .termId(termId)
            .termOffset(termOffset)
            .build();
        return aeron.addExclusivePublication(channel, replayStreamId);
    }

    /**
     * Opens a channel on the archive directory when the file sync level is positive.
     *
     * @param directory     directory to open.
     * @param fileSyncLevel configured sync level; a channel is only opened when it is above zero.
     * @return the open channel, or {@code null} when the level is not positive or the open failed.
     */
    private static FileChannel channelForDirectorySync(File directory, int fileSyncLevel) {
        if (fileSyncLevel > 0) {
            try {
                return FileChannel.open(directory.toPath());
            }
            catch (IOException ignore) {
                // Best effort: a failure to open the directory is deliberately ignored and null is
                // returned, exactly as when directory sync is not requested.
            }
        }
        return null;
    }

    /**
     * Allocates the position counter for a recording. The label is
     * {@code "rec-pos: <recordingId> <sessionId> <streamId> <strippedChannel>"} and the recording
     * id is written at offset 0 of the counter's key buffer.
     *
     * @param recordingId     id of the recording.
     * @param sessionId       session id of the recorded image.
     * @param streamId        stream id of the recorded image.
     * @param strippedChannel stripped channel of the recording.
     * @return the new counter.
     */
    private AtomicCounter newRecordingPositionCounter(long recordingId, int sessionId, int streamId, String strippedChannel) {
        String label = "rec-pos: " + recordingId + ' ' + sessionId + ' ' + streamId + ' ' + strippedChannel;
        return countersManager.newCounter(
            label, RECORDING_POSITION_TYPE_ID, buffer -> buffer.putLong(0, recordingId));
    }

    /**
     * Factory for the exclusive publication a replay session publishes on.
     */
    @FunctionalInterface
    interface ReplayPublicationSupplier {
        /**
         * Creates the publication for a replay.
         *
         * @param replayChannel    channel requested for the replay.
         * @param replayStreamId   stream id requested for the replay.
         * @param fromPosition     position the replay starts from.
         * @param mtuLength        MTU length for the publication.
         * @param initialTermId    initial term id for the publication.
         * @param termBufferLength term buffer length for the publication.
         * @return the new exclusive publication.
         */
        ExclusivePublication newReplayPublication(String replayChannel, int replayStreamId, long fromPosition, int mtuLength, int initialTermId, int termBufferLength);
    }
}

