/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.samples.archive;

import io.aeron.Aeron;
import io.aeron.FragmentAssembler;
import io.aeron.Subscription;
import io.aeron.archive.Archive;
import io.aeron.archive.ArchivingMediaDriver;
import io.aeron.archive.client.AeronArchive;
import io.aeron.driver.MediaDriver;
import io.aeron.logbuffer.FragmentHandler;
import io.aeron.logbuffer.Header;
import io.aeron.samples.SampleConfiguration;
import io.aeron.samples.archive.TestUtil;
import java.io.File;
import org.agrona.BufferUtil;
import org.agrona.CloseHelper;
import org.agrona.DirectBuffer;
import org.agrona.collections.MutableLong;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.console.ContinueBarrier;

/**
 * Sample that launches an embedded {@link ArchivingMediaDriver}, makes a recording and then
 * replays that recording repeatedly, printing the observed replay throughput after each pass.
 * <p>
 * The instance owns the media driver, the {@link Aeron} client and the {@link AeronArchive}
 * client it creates; all three are released by {@link #close()}.
 */
public class EmbeddedReplayThroughput
implements AutoCloseable {
    private static final int REPLAY_STREAM_ID = 101;
    private static final String REPLAY_URI = "aeron:udp?endpoint=127.0.0.1:54326";
    private static final long NUMBER_OF_MESSAGES = SampleConfiguration.NUMBER_OF_MESSAGES;
    private static final int MESSAGE_LENGTH = SampleConfiguration.MESSAGE_LENGTH;
    private static final int FRAGMENT_COUNT_LIMIT = SampleConfiguration.FRAGMENT_COUNT_LIMIT;
    private static final int STREAM_ID = SampleConfiguration.STREAM_ID;
    private static final String CHANNEL = SampleConfiguration.CHANNEL;
    /** Number of bytes in one megabyte, used when reporting data rates. */
    private static final double MEGABYTE = 1048576.0;
    /** Nanoseconds per millisecond, used to convert {@link System#nanoTime()} deltas. */
    private static final long NANOS_PER_MILLI = 1_000_000L;

    private ArchivingMediaDriver archivingMediaDriver;
    private Aeron aeron;
    private AeronArchive aeronArchive;
    // Direct buffer of MESSAGE_LENGTH bytes allocated with an alignment of 32.
    // NOTE(review): not referenced by any visible code; presumably used by the body of
    // makeRecording(), which was lost in decompilation - confirm before removing.
    private final UnsafeBuffer buffer = new UnsafeBuffer(BufferUtil.allocateDirectAligned(MESSAGE_LENGTH, 32));
    private final FragmentHandler fragmentHandler = new FragmentAssembler(this::onMessage);
    /** Count of messages seen so far in the current replay; also the next expected message value. */
    private int messageCount;

    /**
     * Makes a recording, looks it up in the archive and then replays it in a loop, printing the
     * throughput of each replay until the user declines to run again.
     *
     * @param args names of properties files passed to {@link MediaDriver#loadPropertiesFiles(String[])}.
     * @throws Exception if the sample is interrupted or fails.
     */
    public static void main(String[] args) throws Exception {
        MediaDriver.loadPropertiesFiles(args);
        try (EmbeddedReplayThroughput test = new EmbeddedReplayThroughput()) {
            System.out.println("Making a recording for playback...");
            long recordingLength = test.makeRecording();
            Thread.sleep(10L);
            System.out.println("Finding the recording...");
            long recordingId = test.findRecordingId(CHANNEL, STREAM_ID);
            ContinueBarrier barrier = new ContinueBarrier("Execute again?");
            do {
                System.out.printf("Replaying %,d messages%n", NUMBER_OF_MESSAGES);
                // nanoTime is monotonic, so the measured duration cannot go backwards
                // if the wall clock is adjusted during the replay.
                long startNs = System.nanoTime();
                test.replayRecording(recordingLength, recordingId);
                // Clamp to 1 ms so a very fast replay cannot cause a division by zero below.
                long durationMs = Math.max(1L, (System.nanoTime() - startNs) / NANOS_PER_MILLI);
                double dataRate = (double)recordingLength * 1000.0 / (double)durationMs / MEGABYTE;
                double recordingMb = (double)recordingLength / MEGABYTE;
                // Multiply before dividing so the rate is not truncated to a multiple of 1000.
                long msgRate = NUMBER_OF_MESSAGES * 1000L / durationMs;
                System.out.println("Performance inclusive of replay request and connection setup:");
                System.out.printf("Replayed %.02f MB @ %.02f MB/s - %,d msg/sec%n", recordingMb, dataRate, msgRate);
            } while (barrier.await());
        }
    }

    /**
     * Launches the embedded archiving media driver and connects the Aeron and archive clients.
     * <p>
     * When the configured archive directory name is the literal {@code "archive"} a temporary
     * directory from {@link TestUtil#createTempDir()} is used instead. If either client fails to
     * connect, everything launched so far is released before the failure is rethrown.
     */
    public EmbeddedReplayThroughput() {
        String archiveDirName = Archive.Configuration.archiveDirName();
        File archiveDir = "archive".equals(archiveDirName) ? TestUtil.createTempDir() : new File(archiveDirName);
        this.archivingMediaDriver = ArchivingMediaDriver.launch(new MediaDriver.Context().dirsDeleteOnStart(true), new Archive.Context().archiveDir(archiveDir));
        try {
            this.aeron = Aeron.connect();
            this.aeronArchive = AeronArchive.connect(new AeronArchive.Context().aeron(this.aeron));
        } catch (RuntimeException | Error ex) {
            // try-with-resources never calls close() when construction fails, so clean up here.
            try {
                this.releaseResources();
            } catch (RuntimeException cleanupEx) {
                ex.addSuppressed(cleanupEx);
            }
            throw ex;
        }
    }

    /**
     * Closes the archive client, the Aeron client and the archiving media driver, then deletes
     * the archive and Aeron directories.
     */
    @Override
    public void close() {
        this.releaseResources();
    }

    /**
     * Releases everything this instance launched or connected. Tolerates clients that were never
     * created, so it can also be used when construction fails part way through.
     */
    private void releaseResources() {
        CloseHelper.close(this.aeronArchive);
        // The Aeron client was created here and handed to the archive context, so it is closed
        // explicitly rather than relying on the archive client to do it.
        CloseHelper.close(this.aeron);
        CloseHelper.close(this.archivingMediaDriver);
        this.archivingMediaDriver.archive().context().deleteArchiveDirectory();
        this.archivingMediaDriver.mediaDriver().context().deleteAeronDirectory();
    }

    /**
     * Fragment callback that checks each replayed message carries the next expected sequence
     * value, read as an {@code int} at {@code offset}, and then advances the expected value.
     *
     * @param buffer containing the message.
     * @param offset at which the message begins.
     * @param length of the message in bytes (unused).
     * @param header of the message (unused).
     * @throws IllegalStateException if the value read differs from the expected message count.
     */
    public void onMessage(DirectBuffer buffer, int offset, int length, Header header) {
        int count = buffer.getInt(offset);
        if (count != this.messageCount) {
            throw new IllegalStateException("Invalid message count=" + count + " @ " + this.messageCount);
        }
        ++this.messageCount;
    }

    /**
     * Placeholder for the method that makes the recording to be replayed.
     * <p>
     * NOTE(review): the real body was lost - CFR 0.152 failed to decompile this method with
     * {@code org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once}. Until the
     * body is restored from the original sources this method always throws, so {@link #main}
     * cannot get past the recording step.
     *
     * @return presumably the length of the recording in bytes, given how {@link #main} uses the
     *         result - confirm against the original source.
     * @throws IllegalStateException always, because the implementation is missing.
     */
    private long makeRecording() {
        throw new IllegalStateException("Decompilation failed");
    }

    /**
     * Replays a recording from position 0 and polls it until {@code NUMBER_OF_MESSAGES} messages
     * have been received or the replay subscription loses its images.
     *
     * @param recordingLength number of bytes to replay.
     * @param recordingId     identity of the recording to replay.
     */
    private void replayRecording(long recordingLength, long recordingId) {
        try (Subscription subscription = this.aeronArchive.replay(recordingId, 0L, recordingLength, REPLAY_URI, REPLAY_STREAM_ID)) {
            // Wait for the replay image to become available before counting messages.
            while (subscription.hasNoImages()) {
                Thread.yield();
            }
            this.messageCount = 0;
            while ((long)this.messageCount < NUMBER_OF_MESSAGES) {
                int fragments = subscription.poll(this.fragmentHandler, FRAGMENT_COUNT_LIMIT);
                if (0 == fragments) {
                    // Nothing was polled: either the stream has ended early or data is pending.
                    if (subscription.hasNoImages()) {
                        System.out.println("Unexpected end of stream at message count: " + this.messageCount);
                        break;
                    }
                    Thread.yield();
                }
            }
        }
    }

    /**
     * Looks up the recording for the given channel and stream id in the archive.
     *
     * @param expectedChannel  channel URI the recording was made on.
     * @param expectedStreamId stream id the recording was made on.
     * @return the id of the matching recording.
     * @throws IllegalStateException unless exactly one matching recording is found.
     */
    private long findRecordingId(String expectedChannel, int expectedStreamId) {
        MutableLong foundRecordingId = new MutableLong();
        // Lists at most 10 descriptors starting from recording id 0; only the id is captured.
        int recordingsFound = this.aeronArchive.listRecordingsForUri(0L, 10, expectedChannel, expectedStreamId, (controlSessionId, correlationId, recordingId, startTimestamp, stopTimestamp, startPosition, stopPosition, initialTermId, segmentFileLength, termBufferLength, mtuLength, sessionId, streamId, strippedChannel, originalChannel, sourceIdentity) -> foundRecordingId.set(recordingId));
        if (1 != recordingsFound) {
            throw new IllegalStateException("Should have been one recording");
        }
        return foundRecordingId.get();
    }
}

