/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.samples.archive;

import io.aeron.Aeron;
import io.aeron.ExclusivePublication;
import io.aeron.Subscription;
import io.aeron.archive.Archive;
import io.aeron.archive.ArchivingMediaDriver;
import io.aeron.archive.client.AeronArchive;
import io.aeron.archive.client.RecordingEventsAdapter;
import io.aeron.archive.client.RecordingEventsListener;
import io.aeron.archive.codecs.SourceLocation;
import io.aeron.driver.MediaDriver;
import io.aeron.samples.SampleConfiguration;
import io.aeron.samples.archive.TestUtil;
import java.io.File;
import org.agrona.BufferUtil;
import org.agrona.CloseHelper;
import org.agrona.concurrent.BackoffIdleStrategy;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.console.ContinueBarrier;

/**
 * Sample that launches an embedded {@link ArchivingMediaDriver}, starts a local recording of
 * {@link #CHANNEL}/{@link #STREAM_ID}, streams {@link #NUMBER_OF_MESSAGES} messages through an
 * exclusive publication and prints the recording throughput once the archive reports progress
 * up to the final publication position.
 *
 * <p>Two background threads are started by the constructor: one polls the archive's recording
 * events and one drains the recorded stream. Both run until {@link #stop()} or {@link #close()}
 * is called.
 */
public class EmbeddedRecordingThroughput implements AutoCloseable, RecordingEventsListener {
    private static final long NUMBER_OF_MESSAGES = SampleConfiguration.NUMBER_OF_MESSAGES;
    private static final int MESSAGE_LENGTH = SampleConfiguration.MESSAGE_LENGTH;
    private static final int FRAGMENT_COUNT_LIMIT = SampleConfiguration.FRAGMENT_COUNT_LIMIT;
    private static final int STREAM_ID = SampleConfiguration.STREAM_ID;
    private static final String CHANNEL = SampleConfiguration.CHANNEL;
    private static final double BYTES_PER_MB = 1048576.0;

    private final ArchivingMediaDriver archivingMediaDriver;
    private final Aeron aeron;
    private final AeronArchive aeronArchive;
    /** Reusable message buffer; only its first 4 bytes (the message index) are rewritten per message. */
    private final UnsafeBuffer buffer = new UnsafeBuffer(BufferUtil.allocateDirectAligned(MESSAGE_LENGTH, 32));
    private final Thread recordingEventsThread;
    private final Thread consumerThread;
    /** Wall-clock time at which the current streaming run began; written by the publisher thread. */
    private volatile long recordingStartTimeMs;
    /** Final publication position of the current run, or -1 while the run is still streaming. */
    private volatile long stopPosition;
    /** Cleared to ask both background polling threads to exit their loops. */
    private volatile boolean isRunning = true;

    /**
     * Runs the sample: starts the recording, then streams batches of messages until the user
     * declines to continue.
     *
     * @param args property file names passed to {@link MediaDriver#loadPropertiesFiles(String[])}.
     * @throws Exception if the calling thread is interrupted or the sample fails.
     */
    public static void main(String[] args) throws Exception {
        MediaDriver.loadPropertiesFiles(args);
        try (EmbeddedRecordingThroughput test = new EmbeddedRecordingThroughput()) {
            // NOTE(review): presumably gives the background threads time to add their
            // subscriptions before recording starts - confirm.
            Thread.sleep(1000L);
            test.startRecording();
            ContinueBarrier barrier = new ContinueBarrier("Execute again?");
            do {
                test.streamMessagesForRecording();
                Thread.sleep(10L);
            } while (barrier.await());
            test.stop();
        }
    }

    /**
     * Launches the archiving media driver, connects the Aeron and archive clients and starts the
     * recording-events poller and consumer threads.
     *
     * <p>When the configured archive directory name is the literal {@code "archive"} a temporary
     * directory from {@link TestUtil#createTempDir()} is used instead.
     */
    public EmbeddedRecordingThroughput() {
        String archiveDirName = Archive.Configuration.archiveDirName();
        File archiveDir = "archive".equals(archiveDirName) ? TestUtil.createTempDir() : new File(archiveDirName);

        this.archivingMediaDriver = ArchivingMediaDriver.launch(
            new MediaDriver.Context().dirsDeleteOnStart(true),
            new Archive.Context().archiveDir(archiveDir));
        this.aeron = Aeron.connect();
        this.aeronArchive = AeronArchive.connect(new AeronArchive.Context().aeron(this.aeron));

        this.recordingEventsThread = new Thread(this::runRecordingEventPoller);
        this.recordingEventsThread.setName("recording-events-poller");
        this.recordingEventsThread.start();

        this.consumerThread = new Thread(this::runConsumer);
        this.consumerThread.setName("consumer-thread");
        this.consumerThread.start();
    }

    /**
     * Stops the background threads if they are still running, closes the archive client, the
     * Aeron client and the archiving media driver, then deletes the archive and Aeron directories.
     */
    @Override
    public void close() {
        // The pollers use the Aeron client, so make sure they have finished before it is closed.
        // This also covers the path where main() fails before stop() was reached.
        this.isRunning = false;
        try {
            this.recordingEventsThread.join();
            this.consumerThread.join();
        } catch (InterruptedException ex) {
            // close() cannot propagate the interrupt; restore the flag and carry on releasing resources.
            Thread.currentThread().interrupt();
        }

        CloseHelper.close(this.aeronArchive);
        // The Aeron client was created by this class and only handed to the archive context,
        // so it is closed here explicitly rather than relying on the archive client to do so.
        CloseHelper.close(this.aeron);
        CloseHelper.close(this.archivingMediaDriver);
        this.archivingMediaDriver.archive().context().deleteArchiveDirectory();
        this.archivingMediaDriver.mediaDriver().context().deleteAeronDirectory();
    }

    /** Prints the id of the recording that has started. */
    @Override
    public void onStart(long recordingId, long startPosition, int sessionId, int streamId, String channel, String sourceIdentity) {
        System.out.println("Recording started for id: " + recordingId);
    }

    /**
     * Prints the throughput report when the recorded position reaches the final position of the
     * current streaming run.
     *
     * <p>NOTE(review): {@link #stopPosition} is only published after the last message has been
     * offered, so a progress event for the final position that arrives before that assignment is
     * not matched and no report is printed for that run.
     *
     * @param recordingId   id of the recording making progress.
     * @param startPosition position at which the recording started.
     * @param position      position the recording has reached.
     */
    @Override
    public void onProgress(long recordingId, long startPosition, long position) {
        if (position != this.stopPosition) {
            return;
        }

        // Clamp to 1 ms so a run that completes within the clock resolution cannot divide by zero.
        long durationMs = Math.max(1L, System.currentTimeMillis() - this.recordingStartTimeMs);
        long recordingLength = position - startPosition;
        double dataRate = (double)recordingLength * 1000.0 / (double)durationMs / BYTES_PER_MB;
        double recordingMb = (double)recordingLength / BYTES_PER_MB;
        long msgRate = messagesPerSecond(NUMBER_OF_MESSAGES, durationMs);
        System.out.printf("Recorded %.02f MB @ %.02f MB/s - %,d msg/sec%n", recordingMb, dataRate, msgRate);
    }

    /** No action is taken when a recording stops. */
    @Override
    public void onStop(long recordingId, long startPosition, long stopPosition) {
    }

    /**
     * Publishes {@link #NUMBER_OF_MESSAGES} messages of {@link #MESSAGE_LENGTH} bytes on a fresh
     * exclusive publication, yielding while the publication rejects an offer, and then records
     * the final publication position so that {@link #onProgress(long, long, long)} can report.
     */
    public void streamMessagesForRecording() {
        try (ExclusivePublication publication = this.aeron.addExclusivePublication(CHANNEL, STREAM_ID)) {
            // -1 never equals a reported position, so no report is printed mid-run.
            this.stopPosition = -1L;
            this.recordingStartTimeMs = System.currentTimeMillis();

            // A long counter keeps the loop finite even if the configured count exceeds Integer.MAX_VALUE.
            for (long i = 0; i < NUMBER_OF_MESSAGES; i++) {
                this.buffer.putInt(0, (int)i);
                while (publication.offer(this.buffer, 0, MESSAGE_LENGTH) < 0L) {
                    Thread.yield();
                }
            }

            this.stopPosition = publication.position();
        }
    }

    /**
     * Signals both background threads to stop and waits for them to finish.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting.
     */
    public void stop() throws InterruptedException {
        this.isRunning = false;
        this.recordingEventsThread.join();
        this.consumerThread.join();
    }

    /** Asks the archive to start recording {@link #CHANNEL}/{@link #STREAM_ID} from a local source. */
    public void startRecording() {
        this.aeronArchive.startRecording(CHANNEL, STREAM_ID, SourceLocation.LOCAL);
    }

    /**
     * Computes a whole-number message rate, multiplying before dividing so that the integer
     * division does not discard the sub-millisecond part of the rate.
     *
     * @param messageCount number of messages sent.
     * @param durationMs   elapsed time in milliseconds; must be positive.
     * @return messages per second, truncated towards zero.
     */
    private static long messagesPerSecond(long messageCount, long durationMs) {
        return messageCount * 1000L / durationMs;
    }

    /** Thread body: polls the archive's recording events, dispatching them to this listener, until stopped. */
    private void runRecordingEventPoller() {
        try (Subscription subscription = this.aeron.addSubscription(
            AeronArchive.Configuration.recordingEventsChannel(),
            AeronArchive.Configuration.recordingEventsStreamId())) {
            BackoffIdleStrategy idleStrategy = new BackoffIdleStrategy(10L, 100L, 1L, 1L);
            RecordingEventsAdapter recordingEventsAdapter = new RecordingEventsAdapter(this, subscription, FRAGMENT_COUNT_LIMIT);

            while (this.isRunning) {
                idleStrategy.idle(recordingEventsAdapter.poll());
            }
        }
    }

    /** Thread body: drains the recorded stream with a no-op fragment handler until stopped. */
    private void runConsumer() {
        try (Subscription subscription = this.aeron.addSubscription(CHANNEL, STREAM_ID)) {
            BackoffIdleStrategy idleStrategy = new BackoffIdleStrategy(10L, 100L, 1L, 1L);

            while (this.isRunning) {
                idleStrategy.idle(subscription.poll(TestUtil.NOOP_FRAGMENT_HANDLER, FRAGMENT_COUNT_LIMIT));
            }
        }
    }
}

