package org.nuxeo.runtime.stream.tests;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import javax.inject.Inject;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.log.LogAppender;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.LogRecord;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.runtime.stream.StreamService;
import org.nuxeo.runtime.test.runner.Deploy;
import org.nuxeo.runtime.test.runner.Deploys;
import org.nuxeo.runtime.test.runner.Features;
import org.nuxeo.runtime.test.runner.FeaturesRunner;
import org.nuxeo.runtime.test.runner.RuntimeFeature;

/**
 * Runtime tests for {@link StreamService}.
 *
 * <p>The tests deploy the {@code org.nuxeo.runtime.stream} bundle together with the
 * {@code test-stream-contrib.xml} contribution and then exercise the log managers exposed by the
 * injected service: looking them up by name, appending and tailing a record, and reading a record
 * from the {@code output} log after one was appended to the {@code input} log.
 */
@RunWith(FeaturesRunner.class)
@Features({RuntimeFeature.class})
@Deploys({@Deploy({"org.nuxeo.runtime.stream"}), @Deploy({"org.nuxeo.runtime.stream:test-stream-contrib.xml"})})
public class TestStreamService {

    /** Key used for every record appended by these tests. */
    private static final String SAMPLE_KEY = "a key";

    /** Payload (as text) used for every record appended by these tests. */
    private static final String SAMPLE_VALUE = "a value";

    /** Service under test, injected by the test runner. */
    @Inject
    public StreamService service;

    /**
     * Checks that the {@code default} and {@code import} log managers can be looked up, that an
     * unknown name is rejected with an {@link IllegalArgumentException}, and that the
     * {@code input} log of the default manager exists with a size of 1.
     */
    @Test
    public void testLogManagerAccess() {
        Assert.assertNotNull(this.service);
        Assert.assertNotNull(this.service.getLogManager("default"));
        Assert.assertNotNull(this.service.getLogManager("import"));
        try {
            this.service.getLogManager("unknown");
            Assert.fail("Expected exception");
        } catch (IllegalArgumentException expected) {
            // Expected: no log manager is registered under that name.
        }
        // Looking up the same manager a second time must still succeed.
        LogManager logManager = this.service.getLogManager("default");
        Assert.assertNotNull(logManager);
        // The result of exists() used to be discarded; assert it so a missing log is reported
        // as such instead of surfacing indirectly through the size check below.
        Assert.assertTrue("Log 'input' should exist", logManager.exists("input"));
        Assert.assertEquals(1L, logManager.size("input"));
    }

    /**
     * Appends one record to {@code myLog} and reads it back through a tailer of group
     * {@code myGroup}, checking both key and payload.
     *
     * @throws Exception if the read is interrupted or the log access fails
     */
    @Test
    public void testBasicLogUsage() throws Exception {
        LogManager logManager = this.service.getLogManager("default");
        LogAppender<Record> appender = logManager.getAppender("myLog");
        appender.append(SAMPLE_KEY, sampleRecord());
        // try-with-resources closes the tailer on both the success and the failure path.
        try (LogTailer<Record> tailer = logManager.createTailer("myGroup", "myLog")) {
            LogRecord<Record> logRecord = tailer.read(Duration.ofSeconds(1L));
            Assert.assertNotNull(logRecord);
            assertSampleRecord(logRecord);
        }
    }

    /**
     * Appends one record to the {@code input} log and expects to read a record with the same key
     * and payload from the {@code output} log within one second.
     *
     * <p>NOTE(review): presumably a stream processor declared in {@code test-stream-contrib.xml}
     * forwards records from {@code input} to {@code output} - confirm against the contribution.
     *
     * @throws Exception if the read is interrupted or the log access fails
     */
    @Test
    public void testStreamProcessor() throws Exception {
        LogManager logManager = this.service.getLogManager("default");
        LogAppender<Record> appender = logManager.getAppender("input");
        // The tailer is opened before appending, as in the original test, and is now closed
        // when the test ends instead of being leaked.
        try (LogTailer<Record> tailer = logManager.createTailer("counter", "output")) {
            appender.append(SAMPLE_KEY, sampleRecord());
            LogRecord<Record> logRecord = tailer.read(Duration.ofSeconds(1L));
            Assert.assertNotNull("Record not found in output stream", logRecord);
            assertSampleRecord(logRecord);
        }
    }

    /** Builds the record appended by the tests: the sample key with the UTF-8 encoded sample value. */
    private static Record sampleRecord() {
        return Record.of(SAMPLE_KEY, SAMPLE_VALUE.getBytes(StandardCharsets.UTF_8));
    }

    /** Asserts that the given log record carries the sample key and the sample payload. */
    private static void assertSampleRecord(LogRecord<Record> logRecord) {
        Record message = logRecord.message();
        Assert.assertEquals(SAMPLE_KEY, message.getKey());
        Assert.assertEquals(SAMPLE_VALUE, new String(message.getData(), StandardCharsets.UTF_8));
    }
}
