package org.nuxeo.runtime.stream.tests;

import io.dropwizard.metrics5.Gauge;
import io.dropwizard.metrics5.MetricName;
import io.dropwizard.metrics5.MetricRegistry;
import io.dropwizard.metrics5.SharedMetricRegistries;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.SortedMap;
import javax.inject.Inject;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.nuxeo.lib.stream.computation.ComputationMetadataMapping;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.StreamManager;
import org.nuxeo.lib.stream.computation.StreamProcessor;
import org.nuxeo.lib.stream.computation.internals.ComputationContextImpl;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.LogRecord;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.Name;
import org.nuxeo.runtime.management.api.ProbeStatus;
import org.nuxeo.runtime.stream.RuntimeStreamFeature;
import org.nuxeo.runtime.stream.StreamMetricsComputation;
import org.nuxeo.runtime.stream.StreamProbe;
import org.nuxeo.runtime.stream.StreamService;
import org.nuxeo.runtime.test.runner.Deploy;
import org.nuxeo.runtime.test.runner.Features;
import org.nuxeo.runtime.test.runner.FeaturesRunner;

@RunWith(FeaturesRunner.class)
@Deploy({"org.nuxeo.runtime.stream:test-stream-contrib.xml"})
@Features({RuntimeStreamFeature.class})
/* loaded from: input_file:org/nuxeo/runtime/stream/tests/TestStreamService.class */
public class TestStreamService {

    @Inject
    public StreamService service;

    @Test
    public void testLogManagerAccess() {
        Assert.assertNotNull(this.service);
        LogManager logManager = this.service.getLogManager();
        Assert.assertNotNull(logManager);
        Assert.assertNotNull(this.service.getStreamManager());
        Assert.assertTrue(logManager.exists(Name.ofUrn("input")));
        Assert.assertTrue(logManager.exists(Name.ofUrn("output")));
        Assert.assertTrue(logManager.exists(Name.ofUrn("myLog")));
        Assert.assertTrue(logManager.exists(Name.ofUrn("cq/cq-foo")));
        Assert.assertFalse(logManager.exists(Name.ofUrn("aLogThatShouldNotBeCreated")));
        Assert.assertFalse(logManager.exists(Name.ofUrn("unexisting")));
    }

    @Test
    public void testBasicLogUsage() throws Exception {
        LogManager logManager = this.service.getLogManager();
        Name ofUrn = Name.ofUrn("myLog");
        logManager.getAppender(ofUrn).append("a key", Record.of("a key", "a value".getBytes("UTF-8")));
        LogTailer createTailer = logManager.createTailer(Name.ofUrn("myGroup"), ofUrn);
        try {
            LogRecord read = createTailer.read(Duration.ofSeconds(1L));
            Assert.assertNotNull(read);
            Assert.assertEquals("a key", read.message().getKey());
            Assert.assertEquals("a value", new String(read.message().getData(), "UTF-8"));
            if (createTailer != null) {
                createTailer.close();
            }
        } catch (Throwable th) {
            if (createTailer != null) {
                try {
                    createTailer.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testStreamProcessor() throws Exception {
        LogManager logManager = this.service.getLogManager();
        StreamManager streamManager = this.service.getStreamManager();
        LogTailer createTailer = logManager.createTailer(Name.ofUrn("counter"), Name.ofUrn("output"));
        streamManager.append("input", Record.of("a key", "a value".getBytes("UTF-8")));
        streamManager.append("input", Record.of("skipMeNow", (byte[]) null));
        streamManager.append("input", Record.of("changeMeNow", (byte[]) null));
        LogRecord read = createTailer.read(Duration.ofSeconds(1L));
        Assert.assertNotNull("Record not found in output stream", read);
        Assert.assertEquals("a key", read.message().getKey());
        Assert.assertEquals("a value", new String(read.message().getData(), "UTF-8"));
        LogRecord read2 = createTailer.read(Duration.ofSeconds(1L));
        Assert.assertNotNull("Record not found in output stream", read2);
        Assert.assertEquals("changedNow", read2.message().getKey());
    }

    @Test
    public void testDisabledStreamProcessor() throws Exception {
        StreamManager streamManager = this.service.getStreamManager();
        try {
            streamManager.append("streamThatDoesNotExist", Record.of("key", (byte[]) null));
            Assert.fail("Expected exception");
        } catch (IllegalArgumentException e) {
        }
        try {
            streamManager.append("input2", Record.of("key", (byte[]) null));
            Assert.fail("Expected exception");
        } catch (IllegalArgumentException e2) {
        }
    }

    @Test
    public void testDefaultPartitions() throws Exception {
        Assert.assertTrue(this.service.getLogManager().exists(Name.ofUrn("s1")));
        Assert.assertEquals(2L, r0.size(r0));
    }

    @Test
    public void testRegisterAndExternalStream() throws Exception {
        LogManager logManager = this.service.getLogManager();
        Assert.assertTrue(logManager.exists(Name.ofUrn("input3")));
        Assert.assertTrue(logManager.exists(Name.ofUrn("output3")));
        Assert.assertTrue(logManager.exists(Name.ofUrn("registerInput")));
        Assert.assertFalse(logManager.exists(Name.ofUrn("externalOutput")));
        LogTailer createTailer = logManager.createTailer(Name.ofUrn("test"), Name.ofUrn("output3"));
        createTailer.toEnd();
        StreamManager streamManager = this.service.getStreamManager();
        streamManager.append("input3", Record.of("key", (byte[]) null));
        Assert.assertNull(createTailer.read(Duration.ofSeconds(1L)));
        StreamProcessor createStreamProcessor = streamManager.createStreamProcessor("registerProcessor");
        Assert.assertNotNull(createStreamProcessor);
        createStreamProcessor.start();
        createStreamProcessor.waitForAssignments(Duration.ofSeconds(10L));
        createStreamProcessor.drainAndStop(Duration.ofSeconds(5L));
        Assert.assertEquals("key", createTailer.read(Duration.ofSeconds(1L)).message().getKey());
    }

    @Test
    public void testProbe() throws Exception {
        StreamManager streamManager = this.service.getStreamManager();
        StreamProbe streamProbe = new StreamProbe();
        streamProbe.reset();
        Assert.assertFalse(streamProbe.run().isFailure());
        try {
            streamManager.append("inputFailure", Record.of("key", (byte[]) null));
            Thread.sleep(500L);
            ProbeStatus run = streamProbe.run();
            Assert.assertFalse(run.toString(), run.isFailure());
            Thread.sleep(1500L);
            ProbeStatus run2 = streamProbe.run();
            Assert.assertTrue(run2.toString(), run2.isFailure());
            streamProbe.reset();
        } catch (Throwable th) {
            streamProbe.reset();
            throw th;
        }
    }

    @Test
    public void testStreamMetrics() {
        MetricRegistry orCreate = SharedMetricRegistries.getOrCreate("org.nuxeo.runtime.metrics.MetricsService");
        Assert.assertTrue(orCreate.getGauges((metricName, metric) -> {
            return metricName.getKey().startsWith("nuxeo.streams.global");
        }).isEmpty());
        StreamMetricsComputation streamMetricsComputation = new StreamMetricsComputation(Duration.ofMinutes(1L), (List) null);
        ComputationContextImpl computationContextImpl = new ComputationContextImpl(new ComputationMetadataMapping(streamMetricsComputation.metadata(), Collections.emptyMap()));
        Assert.assertFalse(computationContextImpl.isSpareComputation());
        streamMetricsComputation.init(computationContextImpl);
        streamMetricsComputation.processTimer(computationContextImpl, "tracker", 0L);
        SortedMap gauges = orCreate.getGauges((metricName2, metric2) -> {
            return metricName2.getKey().startsWith("nuxeo.streams.global");
        });
        Assert.assertFalse(gauges.isEmpty());
        Gauge gauge = (Gauge) gauges.get(MetricName.build(new String[]{"nuxeo.streams.global.stream.group.end"}).tagged(new String[]{"stream", "input"}).tagged(new String[]{"group", "myComputation"}));
        Assert.assertNotNull(gauge);
        Assert.assertNotNull(gauge.getValue());
        streamMetricsComputation.destroy();
        Assert.assertTrue(orCreate.getGauges((metricName3, metric3) -> {
            return metricName3.getKey().startsWith("nuxeo.streams.global");
        }).isEmpty());
    }
}
