/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.runtime.stream.tests;

import io.dropwizard.metrics5.Gauge;
import io.dropwizard.metrics5.MetricName;
import io.dropwizard.metrics5.MetricRegistry;
import io.dropwizard.metrics5.SharedMetricRegistries;
import java.io.Externalizable;
import java.time.Duration;
import java.util.Collections;
import java.util.SortedMap;
import javax.inject.Inject;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.nuxeo.lib.stream.computation.ComputationContext;
import org.nuxeo.lib.stream.computation.ComputationMetadataMapping;
import org.nuxeo.lib.stream.computation.Record;
import org.nuxeo.lib.stream.computation.StreamManager;
import org.nuxeo.lib.stream.computation.StreamProcessor;
import org.nuxeo.lib.stream.computation.internals.ComputationContextImpl;
import org.nuxeo.lib.stream.log.LogAppender;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.LogRecord;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.Name;
import org.nuxeo.runtime.management.api.ProbeStatus;
import org.nuxeo.runtime.stream.RuntimeStreamFeature;
import org.nuxeo.runtime.stream.StreamMetricsComputation;
import org.nuxeo.runtime.stream.StreamProbe;
import org.nuxeo.runtime.stream.StreamService;
import org.nuxeo.runtime.test.runner.Deploy;
import org.nuxeo.runtime.test.runner.Features;
import org.nuxeo.runtime.test.runner.FeaturesRunner;

@RunWith(value=FeaturesRunner.class)
@Features(value={RuntimeStreamFeature.class})
@Deploy(value={"org.nuxeo.runtime.stream:test-stream-contrib.xml"})
public class TestStreamService {
    @Inject
    public StreamService service;

    @Test
    public void testLogManagerAccess() {
        Assert.assertNotNull((Object)this.service);
        LogManager manager = this.service.getLogManager();
        Assert.assertNotNull((Object)manager);
        Assert.assertNotNull((Object)this.service.getStreamManager());
        Assert.assertTrue((boolean)manager.exists(Name.ofUrn((String)"input")));
        Assert.assertTrue((boolean)manager.exists(Name.ofUrn((String)"output")));
        Assert.assertTrue((boolean)manager.exists(Name.ofUrn((String)"myLog")));
        Assert.assertTrue((boolean)manager.exists(Name.ofUrn((String)"cq/cq-foo")));
        Assert.assertFalse((boolean)manager.exists(Name.ofUrn((String)"aLogThatShouldNotBeCreated")));
        Assert.assertFalse((boolean)manager.exists(Name.ofUrn((String)"unexisting")));
    }

    @Test
    public void testBasicLogUsage() throws Exception {
        LogManager manager = this.service.getLogManager();
        Name logName = Name.ofUrn((String)"myLog");
        String key = "a key";
        String value = "a value";
        LogAppender appender = manager.getAppender(logName);
        appender.append(key, (Externalizable)Record.of((String)key, (byte[])value.getBytes("UTF-8")));
        try (LogTailer tailer = manager.createTailer(Name.ofUrn((String)"myGroup"), logName);){
            LogRecord logRecord = tailer.read(Duration.ofSeconds(1L));
            Assert.assertNotNull((Object)logRecord);
            Assert.assertEquals((Object)key, (Object)((Record)logRecord.message()).getKey());
            Assert.assertEquals((Object)value, (Object)new String(((Record)logRecord.message()).getData(), "UTF-8"));
        }
    }

    @Test
    public void testStreamProcessor() throws Exception {
        LogManager manager = this.service.getLogManager();
        StreamManager streamManager = this.service.getStreamManager();
        LogTailer tailer = manager.createTailer(Name.ofUrn((String)"counter"), Name.ofUrn((String)"output"));
        String key = "a key";
        String value = "a value";
        streamManager.append("input", Record.of((String)key, (byte[])value.getBytes("UTF-8")));
        streamManager.append("input", Record.of((String)"skipMeNow", null));
        streamManager.append("input", Record.of((String)"changeMeNow", null));
        LogRecord logRecord = tailer.read(Duration.ofSeconds(1L));
        Assert.assertNotNull((String)"Record not found in output stream", (Object)logRecord);
        Assert.assertEquals((Object)key, (Object)((Record)logRecord.message()).getKey());
        Assert.assertEquals((Object)value, (Object)new String(((Record)logRecord.message()).getData(), "UTF-8"));
        logRecord = tailer.read(Duration.ofSeconds(1L));
        Assert.assertNotNull((String)"Record not found in output stream", (Object)logRecord);
        Assert.assertEquals((Object)"changedNow", (Object)((Record)logRecord.message()).getKey());
    }

    @Test
    public void testDisabledStreamProcessor() throws Exception {
        StreamManager streamManager = this.service.getStreamManager();
        try {
            streamManager.append("streamThatDoesNotExist", Record.of((String)"key", null));
            Assert.fail((String)"Expected exception");
        }
        catch (IllegalArgumentException illegalArgumentException) {
            // empty catch block
        }
        try {
            streamManager.append("input2", Record.of((String)"key", null));
            Assert.fail((String)"Expected exception");
        }
        catch (IllegalArgumentException illegalArgumentException) {
            // empty catch block
        }
    }

    @Test
    public void testDefaultPartitions() throws Exception {
        LogManager manager = this.service.getLogManager();
        Name streamName = Name.ofUrn((String)"s1");
        Assert.assertTrue((boolean)manager.exists(streamName));
        Assert.assertEquals((long)2L, (long)manager.size(streamName));
    }

    @Test
    public void testRegisterAndExternalStream() throws Exception {
        LogManager manager = this.service.getLogManager();
        Assert.assertTrue((boolean)manager.exists(Name.ofUrn((String)"input3")));
        Assert.assertTrue((boolean)manager.exists(Name.ofUrn((String)"output3")));
        Assert.assertTrue((boolean)manager.exists(Name.ofUrn((String)"registerInput")));
        Assert.assertFalse((boolean)manager.exists(Name.ofUrn((String)"externalOutput")));
        LogTailer tailer = manager.createTailer(Name.ofUrn((String)"test"), Name.ofUrn((String)"output3"));
        tailer.toEnd();
        StreamManager streamManager = this.service.getStreamManager();
        streamManager.append("input3", Record.of((String)"key", null));
        Assert.assertNull((Object)tailer.read(Duration.ofSeconds(1L)));
        StreamProcessor processor = streamManager.createStreamProcessor("registerProcessor");
        Assert.assertNotNull((Object)processor);
        processor.start();
        processor.waitForAssignments(Duration.ofSeconds(10L));
        processor.drainAndStop(Duration.ofSeconds(5L));
        Assert.assertEquals((Object)"key", (Object)((Record)tailer.read(Duration.ofSeconds(1L)).message()).getKey());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testProbe() throws Exception {
        StreamManager streamManager = this.service.getStreamManager();
        StreamProbe probe = new StreamProbe();
        probe.reset();
        ProbeStatus status = probe.run();
        Assert.assertFalse((boolean)status.isFailure());
        try {
            streamManager.append("inputFailure", Record.of((String)"key", null));
            Thread.sleep(500L);
            status = probe.run();
            Assert.assertFalse((String)status.toString(), (boolean)status.isFailure());
            Thread.sleep(1500L);
            status = probe.run();
            Assert.assertTrue((String)status.toString(), (boolean)status.isFailure());
        }
        finally {
            probe.reset();
        }
    }

    @Test
    public void testStreamMetrics() {
        MetricRegistry registry = SharedMetricRegistries.getOrCreate((String)"org.nuxeo.runtime.metrics.MetricsService");
        SortedMap gauges = registry.getGauges((name, metric) -> name.getKey().startsWith("nuxeo.streams.global"));
        Assert.assertTrue((boolean)gauges.isEmpty());
        StreamMetricsComputation comp = new StreamMetricsComputation(Duration.ofMinutes(1L), null);
        ComputationContextImpl context = new ComputationContextImpl(new ComputationMetadataMapping(comp.metadata(), Collections.emptyMap()));
        Assert.assertFalse((boolean)context.isSpareComputation());
        comp.init((ComputationContext)context);
        comp.processTimer((ComputationContext)context, "tracker", 0L);
        gauges = registry.getGauges((name, metric) -> name.getKey().startsWith("nuxeo.streams.global"));
        Assert.assertFalse((boolean)gauges.isEmpty());
        Gauge gauge = (Gauge)gauges.get(MetricName.build((String[])new String[]{"nuxeo.streams.global.stream.group.end"}).tagged(new String[]{"stream", "input"}).tagged(new String[]{"group", "myComputation"}));
        Assert.assertNotNull((Object)gauge);
        Assert.assertNotNull((Object)gauge.getValue());
        comp.destroy();
        gauges = registry.getGauges((name, metric) -> name.getKey().startsWith("nuxeo.streams.global"));
        Assert.assertTrue((boolean)gauges.isEmpty());
    }
}

