/*
 * Decompiled with CFR 0.152.
 */
package kafka.common;

import java.io.Serializable;
import kafka.common.NotificationHandler;
import kafka.common.ZkNodeChangeNotificationListener;
import kafka.common.ZkNodeChangeNotificationListener$;
import kafka.security.auth.Group$;
import kafka.security.auth.Resource;
import kafka.security.auth.ResourceType;
import kafka.utils.TestUtils$;
import kafka.zk.LiteralAclChangeStore$;
import kafka.zk.LiteralAclStore$;
import kafka.zk.ZkAclChangeStore$;
import kafka.zk.ZooKeeperTestHarness;
import org.apache.kafka.common.resource.PatternType;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import scala.Function0;
import scala.Function1;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.java8.JFunction0;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0001\u0005\u001dc\u0001B\u0001\u0003\u0001\u001d\u0011AEW6O_\u0012,7\t[1oO\u0016tu\u000e^5gS\u000e\fG/[8o\u0019&\u001cH/\u001a8feR+7\u000f\u001e\u0006\u0003\u0007\u0011\taaY8n[>t'\"A\u0003\u0002\u000b-\fgm[1\u0004\u0001M\u0011\u0001\u0001\u0003\t\u0003\u00131i\u0011A\u0003\u0006\u0003\u0017\u0011\t!A_6\n\u00055Q!\u0001\u0006.p_.+W\r]3s)\u0016\u001cH\u000fS1s]\u0016\u001c8\u000fC\u0003\u0010\u0001\u0011\u0005\u0001#\u0001\u0004=S:LGO\u0010\u000b\u0002#A\u0011!\u0003A\u0007\u0002\u0005!9A\u0003\u0001b\u0001\n\u0013)\u0012AE2iC:<W-\u0012=qSJ\fG/[8o\u001bN,\u0012A\u0006\t\u0003/ii\u0011\u0001\u0007\u0006\u00023\u0005)1oY1mC&\u00111\u0004\u0007\u0002\u0004\u0013:$\bBB\u000f\u0001A\u0003%a#A\ndQ\u0006tw-Z#ya&\u0014\u0018\r^5p]6\u001b\b\u0005C\u0005 \u0001\u0001\u0007\t\u0019!C\u0005A\u0005!bn\u001c;jM&\u001c\u0017\r^5p]2K7\u000f^3oKJ,\u0012!\t\t\u0003%\tJ!a\t\u0002\u0003Ai[gj\u001c3f\u0007\"\fgnZ3O_RLg-[2bi&|g\u000eT5ti\u0016tWM\u001d\u0005\nK\u0001\u0001\r\u00111A\u0005\n\u0019\n\u0001D\\8uS\u001aL7-\u0019;j_:d\u0015n\u001d;f]\u0016\u0014x\fJ3r)\t9#\u0006\u0005\u0002\u0018Q%\u0011\u0011\u0006\u0007\u0002\u0005+:LG\u000fC\u0004,I\u0005\u0005\t\u0019A\u0011\u0002\u0007a$\u0013\u0007C\u0005.\u0001\u0001\u0007\t\u0011)Q\u0005C\u0005)bn\u001c;jM&\u001c\u0017\r^5p]2K7\u000f^3oKJ\u0004\u0003\"C\u0018\u0001\u0001\u0004\u0005\r\u0011\"\u00031\u0003Mqw\u000e^5gS\u000e\fG/[8o\u0011\u0006tG\r\\3s+\u0005\t\u0004C\u0001\u001a4\u001b\u0005\u0001a\u0001\u0002\u001b\u0001\tU\u0012q\u0003V3ti:{G/\u001b4jG\u0006$\u0018n\u001c8IC:$G.\u001a:\u0014\u0007M2\u0014\b\u0005\u0002\u0018o%\u0011\u0001\b\u0007\u0002\u0007\u0003:L(+\u001a4\u0011\u0005IQ\u0014BA\u001e\u0003\u0005Mqu\u000e^5gS\u000e\fG/[8o\u0011\u0006tG\r\\3s\u0011\u0015y1\u0007\"\u0001>)\u0005\t\u0004bB 
4\u0005\u0004%I\u0001Q\u0001\t[\u0016\u001c8/Y4fgV\t\u0011\tE\u0002C\u000f&k\u0011a\u0011\u0006\u0003\t\u0016\u000bq!\\;uC\ndWM\u0003\u0002G1\u0005Q1m\u001c7mK\u000e$\u0018n\u001c8\n\u0005!\u001b%aC!se\u0006L()\u001e4gKJ\u0004\"AS(\u000e\u0003-S!\u0001T'\u0002\t\u0005,H\u000f\u001b\u0006\u0003\u001d\u0012\t\u0001b]3dkJLG/_\u0005\u0003!.\u0013\u0001BU3t_V\u00148-\u001a\u0005\u0007%N\u0002\u000b\u0011B!\u0002\u00135,7o]1hKN\u0004\u0003b\u0002+4\u0001\u0004%I!V\u0001\ni\"\u0014xn^*ju\u0016,\u0012A\u0016\t\u0004/]3\u0012B\u0001-\u0019\u0005\u0019y\u0005\u000f^5p]\"9!l\ra\u0001\n\u0013Y\u0016!\u0004;ie><8+\u001b>f?\u0012*\u0017\u000f\u0006\u0002(9\"91&WA\u0001\u0002\u00041\u0006B\u000204A\u0003&a+\u0001\u0006uQJ|woU5{K\u0002B#!\u00181\u0011\u0005]\t\u0017B\u00012\u0019\u0005!1x\u000e\\1uS2,\u0007\"\u000234\t\u0003*\u0017a\u00059s_\u000e,7o\u001d(pi&4\u0017nY1uS>tGCA\u0014g\u0011\u001597\r1\u0001i\u0003Mqw\u000e^5gS\u000e\fG/[8o\u001b\u0016\u001c8/Y4f!\r9\u0012n[\u0005\u0003Ub\u0011Q!\u0011:sCf\u0004\"a\u00067\n\u00055D\"\u0001\u0002\"zi\u0016DQa\\\u001a\u0005\u0002A\f\u0001B]3dK&4X\r\u001a\u000b\u0002cB\u0019!O_%\u000f\u0005MDhB\u0001;x\u001b\u0005)(B\u0001<\u0007\u0003\u0019a$o\\8u}%\t\u0011$\u0003\u0002z1\u00059\u0001/Y2lC\u001e,\u0017BA>}\u0005\r\u0019V-\u001d\u0006\u0003sbAQA`\u001a\u0005\u0002}\fAb]3u)\"\u0014xn^*ju\u0016$2aJA\u0001\u0011\u0019\t\u0019! 
a\u0001-\u0005)\u0011N\u001c3fq\"Y\u0011q\u0001\u0001A\u0002\u0003\u0007I\u0011BA\u0005\u0003]qw\u000e^5gS\u000e\fG/[8o\u0011\u0006tG\r\\3s?\u0012*\u0017\u000fF\u0002(\u0003\u0017A\u0001bKA\u0003\u0003\u0003\u0005\r!\r\u0005\u000b\u0003\u001f\u0001\u0001\u0019!A!B\u0013\t\u0014\u0001\u00068pi&4\u0017nY1uS>t\u0007*\u00198eY\u0016\u0014\b\u0005C\u0004\u0002\u0014\u0001!\t%!\u0006\u0002\u000bM,G/\u00169\u0015\u0003\u001dBC!!\u0005\u0002\u001aA!\u00111DA\u0013\u001b\t\tiB\u0003\u0003\u0002 \u0005\u0005\u0012!\u00026v]&$(BAA\u0012\u0003\ry'oZ\u0005\u0005\u0003O\tiB\u0001\u0004CK\u001a|'/\u001a\u0005\b\u0003W\u0001A\u0011IA\u000b\u0003!!X-\u0019:E_^t\u0007\u0006BA\u0015\u0003_\u0001B!a\u0007\u00022%!\u00111GA\u000f\u0005\u0015\te\r^3s\u0011\u001d\t9\u0004\u0001C\u0001\u0003+\tq\u0003^3tiB\u0013xnY3tg:{G/\u001b4jG\u0006$\u0018n\u001c8)\t\u0005U\u00121\b\t\u0005\u00037\ti$\u0003\u0003\u0002@\u0005u!\u0001\u0002+fgRDq!a\u0011\u0001\t\u0003\t)\"\u0001\u0010uKN$8k^1mY><8\u000f\u0015:pG\u0016\u001c8o\u001c:Fq\u000e,\u0007\u000f^5p]\"\"\u0011\u0011IA\u001e\u0001")
public class ZkNodeChangeNotificationListenerTest
extends ZooKeeperTestHarness {
    /*
     * Tests for ZkNodeChangeNotificationListener.
     *
     * Each test wires a listener to the literal ACL change path, publishes ACL change
     * notifications through zkClient(), and then polls the recording handler until the
     * expected number of notifications has been observed.
     */

    /** Value passed to the listener constructor as its change-expiration argument. */
    private final int changeExpirationMs;
    /** Listener under test; created by each test and closed in {@link #tearDown()}. */
    private ZkNodeChangeNotificationListener notificationListener;
    /** Recording handler handed to the listener; recreated in {@link #setUp()}. */
    private TestNotificationHandler notificationHandler;

    private int changeExpirationMs() {
        return this.changeExpirationMs;
    }

    private ZkNodeChangeNotificationListener notificationListener() {
        return this.notificationListener;
    }

    private void notificationListener_$eq(ZkNodeChangeNotificationListener x$1) {
        this.notificationListener = x$1;
    }

    private TestNotificationHandler notificationHandler() {
        return this.notificationHandler;
    }

    private void notificationHandler_$eq(TestNotificationHandler x$1) {
        this.notificationHandler = x$1;
    }

    /**
     * Runs the harness set-up, creates the ACL paths and installs a fresh recording handler.
     */
    @Override
    @Before
    public void setUp() {
        super.setUp();
        this.zkClient().createAclPaths();
        this.notificationHandler_$eq(new TestNotificationHandler());
    }

    /**
     * Closes the listener if a test created one, then runs the harness tear-down.
     */
    @Override
    @After
    public void tearDown() {
        if (this.notificationListener() != null) {
            this.notificationListener().close();
        }
        super.tearDown();
    }

    /**
     * Publishes ten notifications (two individually, then eight in a batch) and waits until
     * the handler has recorded each of them.
     */
    @Test
    public void testProcessNotification() {
        Resource notificationMessage1 = new Resource((ResourceType)Group$.MODULE$, "messageA", PatternType.LITERAL);
        Resource notificationMessage2 = new Resource((ResourceType)Group$.MODULE$, "messageB", PatternType.LITERAL);
        this.notificationListener_$eq(this.newListener());
        this.notificationListener().init();

        this.zkClient().createAclChangeNotification(notificationMessage1);
        this.waitUntil((Function0<Object>)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> this.hasReceived(1, notificationMessage1), (Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Failed to send/process notification message in the timeout period.");

        this.zkClient().createAclChangeNotification(notificationMessage2);
        this.waitUntil((Function0<Object>)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> this.hasReceived(2, notificationMessage2), (Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Failed to send/process notification message in the timeout period.");

        // Messages 3..10 are published back to back; only the final count is checked.
        RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(3), 10).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> this.zkClient().createAclChangeNotification(new Resource((ResourceType)Group$.MODULE$, new StringBuilder(7).append("message").append(i).toString(), PatternType.LITERAL)));
        this.waitUntil((Function0<Object>)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> this.notificationHandler().received().size() == 10, (Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(64).append("Expected 10 invocations of processNotifications, but there were ").append(this.notificationHandler().received()).toString());
    }

    /**
     * Configures the handler to throw once it has recorded two messages, publishes three
     * notifications and waits until all three have been recorded, i.e. the exception thrown
     * while handling the second one did not stop delivery of the third.
     */
    @Test
    public void testSwallowsProcessorException() {
        this.notificationHandler().setThrowSize(2);
        this.notificationListener_$eq(this.newListener());
        this.notificationListener().init();

        this.zkClient().createAclChangeNotification(new Resource((ResourceType)Group$.MODULE$, "messageA", PatternType.LITERAL));
        this.zkClient().createAclChangeNotification(new Resource((ResourceType)Group$.MODULE$, "messageB", PatternType.LITERAL));
        this.zkClient().createAclChangeNotification(new Resource((ResourceType)Group$.MODULE$, "messageC", PatternType.LITERAL));

        // The failure message states the same count (3) that the condition waits for.
        this.waitUntil((Function0<Object>)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> this.notificationHandler().received().size() == 3, (Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(63).append("Expected 3 invocations of processNotifications, but there were ").append(this.notificationHandler().received()).toString());
    }

    /** Builds a listener on the literal ACL change path that reports to the current handler. */
    private ZkNodeChangeNotificationListener newListener() {
        return new ZkNodeChangeNotificationListener(this.zkClient(), LiteralAclChangeStore$.MODULE$.aclChangePath(), ZkAclChangeStore$.MODULE$.SequenceNumberPrefix(), (NotificationHandler)this.notificationHandler(), (long)this.changeExpirationMs(), ZkNodeChangeNotificationListener$.MODULE$.$lessinit$greater$default$6());
    }

    /** Delegates to TestUtils.waitUntilTrue, supplying its default values for arguments 3 to 5. */
    private void waitUntil(Function0<Object> condition, Function0<String> failureMessage) {
        TestUtils$.MODULE$.waitUntilTrue(condition, failureMessage, TestUtils$.MODULE$.waitUntilTrue$default$3(), TestUtils$.MODULE$.waitUntilTrue$default$4(), TestUtils$.MODULE$.waitUntilTrue$default$5());
    }

    /**
     * Returns true when the handler has recorded exactly {@code expectedCount} messages and
     * the most recent one equals {@code expectedLast} (null-safe comparison).
     */
    private boolean hasReceived(int expectedCount, Resource expectedLast) {
        Seq<Resource> received = this.notificationHandler().received();
        if (received.size() != expectedCount) {
            return false;
        }
        Object last = received.last();
        return last != null ? last.equals(expectedLast) : expectedLast == null;
    }

    public ZkNodeChangeNotificationListenerTest() {
        this.changeExpirationMs = 1000;
    }

    /**
     * NotificationHandler that records every decoded notification and can be told to throw
     * once a given number of messages has been recorded.
     */
    public class TestNotificationHandler
    implements NotificationHandler {
        // NOTE(review): presumably appended to by the listener while the test thread polls
        // received(); this buffer is not synchronized -- confirm this is acceptable.
        private final ArrayBuffer<Resource> messages;
        /** Message count at which processNotification throws; empty means never throw. */
        private volatile Option<Object> throwSize;

        private ArrayBuffer<Resource> messages() {
            return this.messages;
        }

        private Option<Object> throwSize() {
            return this.throwSize;
        }

        private void throwSize_$eq(Option<Object> x$1) {
            this.throwSize = x$1;
        }

        /**
         * Decodes the payload with the literal ACL change store and records it.
         *
         * @param notificationMessage raw notification payload
         * @throws RuntimeException if the number of recorded messages now equals the
         *         configured throw size; the message is recorded before the exception is thrown
         */
        public void processNotification(byte[] notificationMessage) {
            this.messages().$plus$eq((Object)LiteralAclStore$.MODULE$.changeStore().decode(notificationMessage));
            if (this.throwSize().contains((Object)BoxesRunTime.boxToInteger((int)this.messages().size()))) {
                throw new RuntimeException("Oh no, my processing failed!");
            }
        }

        /** Returns the recorded messages; this is the live backing buffer, not a copy. */
        public Seq<Resource> received() {
            return this.messages();
        }

        /**
         * Makes processNotification throw when the recorded message count equals {@code index}.
         *
         * @param index message count that triggers the exception
         */
        public void setThrowSize(int index) {
            this.throwSize_$eq((Option<Object>)Option$.MODULE$.apply((Object)BoxesRunTime.boxToInteger((int)index)));
        }

        public /* synthetic */ ZkNodeChangeNotificationListenerTest kafka$common$ZkNodeChangeNotificationListenerTest$TestNotificationHandler$$$outer() {
            return ZkNodeChangeNotificationListenerTest.this;
        }

        public TestNotificationHandler() {
            // No explicit null check on the enclosing instance: Java already rejects a null
            // outer reference when an inner class is instantiated.
            this.messages = (ArrayBuffer)ArrayBuffer$.MODULE$.empty();
            this.throwSize = Option$.MODULE$.empty();
        }
    }
}

