package org.nuxeo.runtime.stream.tests;

import java.time.Duration;
import javax.inject.Inject;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.StreamManager;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.LogRecord;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.runtime.management.api.ProbeStatus;
import org.nuxeo.runtime.stream.StreamProbe;
import org.nuxeo.runtime.stream.StreamService;
import org.nuxeo.runtime.test.runner.Deploy;
import org.nuxeo.runtime.test.runner.Deploys;
import org.nuxeo.runtime.test.runner.Features;
import org.nuxeo.runtime.test.runner.FeaturesRunner;
import org.nuxeo.runtime.test.runner.RuntimeFeature;

@RunWith(FeaturesRunner.class)
@Features({RuntimeFeature.class})
@Deploys({@Deploy({"org.nuxeo.runtime.stream"}), @Deploy({"org.nuxeo.runtime.stream:test-stream-contrib.xml"})})
/* loaded from: input_file:org/nuxeo/runtime/stream/tests/TestStreamService.class */
public class TestStreamService {

    @Inject
    public StreamService service;

    @Test
    public void testLogManagerAccess() {
        Assert.assertNotNull(this.service);
        Assert.assertNotNull(this.service.getLogManager("default"));
        Assert.assertNotNull(this.service.getLogManager("import"));
        try {
            this.service.getLogManager("unknown");
            Assert.fail("Expected exception");
        } catch (IllegalArgumentException e) {
        }
        try {
            this.service.getLogManager("customDisabled");
            Assert.fail("Expected exception");
        } catch (IllegalArgumentException e2) {
        }
        LogManager logManager = this.service.getLogManager("default");
        Assert.assertNotNull(logManager);
        logManager.exists("input");
        Assert.assertEquals(1L, logManager.size("input"));
    }

    @Test
    public void testBasicLogUsage() throws Exception {
        LogManager logManager = this.service.getLogManager("default");
        logManager.getAppender("myLog").append("a key", Record.of("a key", "a value".getBytes("UTF-8")));
        LogTailer createTailer = logManager.createTailer("myGroup", "myLog");
        try {
            LogRecord read = createTailer.read(Duration.ofSeconds(1L));
            Assert.assertNotNull(read);
            Assert.assertEquals("a key", read.message().getKey());
            Assert.assertEquals("a value", new String(read.message().getData(), "UTF-8"));
            if (createTailer != null) {
                createTailer.close();
            }
        } catch (Throwable th) {
            if (createTailer != null) {
                try {
                    createTailer.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testStreamProcessor() throws Exception {
        LogManager logManager = this.service.getLogManager("default");
        StreamManager streamManager = this.service.getStreamManager("default");
        LogTailer createTailer = logManager.createTailer("counter", "output");
        streamManager.append("input", Record.of("a key", "a value".getBytes("UTF-8")));
        streamManager.append("input", Record.of("skipMeNow", (byte[]) null));
        streamManager.append("input", Record.of("changeMeNow", (byte[]) null));
        LogRecord read = createTailer.read(Duration.ofSeconds(1L));
        Assert.assertNotNull("Record not found in output stream", read);
        Assert.assertEquals("a key", read.message().getKey());
        Assert.assertEquals("a value", new String(read.message().getData(), "UTF-8"));
        LogRecord read2 = createTailer.read(Duration.ofSeconds(1L));
        Assert.assertNotNull("Record not found in output stream", read2);
        Assert.assertEquals("changedNow", read2.message().getKey());
    }

    @Test
    public void testDisabledStreamProcessor() throws Exception {
        StreamManager streamManager = this.service.getStreamManager("default");
        try {
            streamManager.append("streamThatDoesNotExist", Record.of("key", (byte[]) null));
            Assert.fail("Expected exception");
        } catch (IllegalArgumentException e) {
        }
        try {
            streamManager.append("input2", Record.of("key", (byte[]) null));
            Assert.fail("Expected exception");
        } catch (IllegalArgumentException e2) {
        }
    }

    @Test
    public void testProbe() throws Exception {
        StreamManager streamManager = this.service.getStreamManager("default");
        StreamProbe streamProbe = new StreamProbe();
        streamProbe.reset();
        Assert.assertFalse(streamProbe.run().isFailure());
        try {
            streamManager.append("inputFailure", Record.of("key", (byte[]) null));
            Thread.sleep(500L);
            ProbeStatus run = streamProbe.run();
            Assert.assertFalse(run.toString(), run.isFailure());
            Thread.sleep(1500L);
            ProbeStatus run2 = streamProbe.run();
            Assert.assertTrue(run2.toString(), run2.isFailure());
            streamProbe.reset();
        } catch (Throwable th) {
            streamProbe.reset();
            throw th;
        }
    }
}
