/*
 * Decompiled with CFR 0.152.
 */
package kafka.common;

import java.io.Serializable;
import kafka.common.NotificationHandler;
import kafka.common.ZkNodeChangeNotificationListener;
import kafka.server.QuorumTestHarness;
import kafka.zk.LiteralAclChangeStore$;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import scala.Function1;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0005\u0005=c\u0001\u0002\r\u001a\u0001yAQ!\n\u0001\u0005\u0002\u0019Bq!\u000b\u0001C\u0002\u0013%!\u0006\u0003\u00042\u0001\u0001\u0006Ia\u000b\u0005\ne\u0001\u0001\r\u00111A\u0005\nMB\u0011b\u000e\u0001A\u0002\u0003\u0007I\u0011\u0002\u001d\t\u0013y\u0002\u0001\u0019!A!B\u0013!\u0004\"C \u0001\u0001\u0004\u0005\r\u0011\"\u0003A\u0011-\tI\u0001\u0001a\u0001\u0002\u0004%I!a\u0003\t\u0015\u0005=\u0001\u00011A\u0001B\u0003&\u0011\tC\u0004\u0002\u0012\u0001!\t%a\u0005\t\u000f\u0005U\u0002\u0001\"\u0011\u00028!9\u0011\u0011\t\u0001\u0005\u0002\u0005]\u0002bBA&\u0001\u0011\u0005\u0011q\u0007\u0004\u0005\u0007\u0002!A\tC\u0003&\u001d\u0011\u00051\nC\u0004M\u001d\t\u0007I\u0011B'\t\r\tt\u0001\u0015!\u0003O\u0011\u001d\u0019g\u00021A\u0005\n\u0011Dq\u0001\u001b\bA\u0002\u0013%\u0011\u000e\u0003\u0004l\u001d\u0001\u0006K!\u001a\u0005\u0006a:!\t%\u001d\u0005\u0006u:!\ta\u001f\u0005\b\u0003\u0003qA\u0011AA\u0002\u0005\u0011R6NT8eK\u000eC\u0017M\\4f\u001d>$\u0018NZ5dCRLwN\u001c'jgR,g.\u001a:UKN$(B\u0001\u000e\u001c\u0003\u0019\u0019w.\\7p]*\tA$A\u0003lC\u001a\\\u0017m\u0001\u0001\u0014\u0005\u0001y\u0002C\u0001\u0011$\u001b\u0005\t#B\u0001\u0012\u001c\u0003\u0019\u0019XM\u001d<fe&\u0011A%\t\u0002\u0012#V|'/^7UKN$\b*\u0019:oKN\u001c\u0018A\u0002\u001fj]&$h\bF\u0001(!\tA\u0003!D\u0001\u001a\u0003I\u0019\u0007.\u00198hK\u0016C\b/\u001b:bi&|g.T:\u0016\u0003-\u0002\"\u0001L\u0018\u000e\u00035R\u0011AL\u0001\u0006g\u000e\fG.Y\u0005\u0003a5\u00121!\u00138u\u0003M\u0019\u0007.\u00198hK\u0016C\b/\u001b:bi&|g.T:!\u0003Qqw\u000e^5gS\u000e\fG/[8o\u0019&\u001cH/\u001a8feV\tA\u0007\u0005\u0002)k%\u0011a'\u0007\u0002!5.tu\u000eZ3DQ\u0006tw-\u001a(pi&4\u0017nY1uS>tG*[:uK:,'/\u0001\ro_RLg-[2bi&|g\u000eT5ti\u0016tWM]0%KF$\"!\u000f\u001f\u0011\u00051R\u0014BA\u001e.\u0005\u0011)f.\u001b;\t\u000fu*\u0011\u0011!a\u0001i\u0005\u0019\u0001\u0010J\u0019\u0002+9|G/\u001b4jG\u0006$\u0018n\u001c8MSN$XM\\3sA\u0005\u0019bn\u001c;jM&\u001c\u0017\r^5p]\"\u000bg\u000e\u001a7feV\t\u0011\t\u0005\u0002C\u001d5\t\u0001AA\fUKN$hj\u001c;jM&\u001c\u0017\r^5p]\"\u000bg\u000e\u001a7feN\u0019a\"\u0012%\u0011\u000512\u0015BA$.\u0005\u0019\te.\u001f*fMB\u0011\u0001&S\u0005\u0003\u0015f\u00111CT8uS\u001aL7-\u0019;j_:D\u0015M\u001c3mKJ$\u0012!Q\u0001\t[\u0016\u001c8/Y4fgV\ta\nE\u0002P)Zk\u0011\u0001\u0015\u0006\u0003#J\u000bq!\\;uC\ndWM\u0003\u0002T[\u0005Q1m\u001c7mK\u000e$\u0018n\u001c8\n\u0005U\u0003&aC!se\u0006L()\u001e4gKJ\u0004\"a\u00161\u000e\u0003aS!!\u0017.\u0002\u0011I,7o\\;sG\u0016T!AG.\u000b\u0005qa&BA/_\u0003\u0019\t\u0007/Y2iK*\tq,A\u0002pe\u001eL!!\u0019-\u0003\u001fI+7o\\;sG\u0016\u0004\u0016\r\u001e;fe:\f\u0011\"\\3tg\u0006<Wm\u001d\u0011\u0002\u0013QD'o\\<TSj,W#A3\u0011\u0007127&\u0003\u0002h[\t1q\n\u001d;j_:\fQ\u0002\u001e5s_^\u001c\u0016N_3`I\u0015\fHCA\u001dk\u0011\u001di4#!AA\u0002\u0015\f!\u0002\u001e5s_^\u001c\u0016N_3!Q\t!R\u000e\u0005\u0002-]&\u0011q.\f\u0002\tm>d\u0017\r^5mK\u0006\u0019\u0002O]8dKN\u001chj\u001c;jM&\u001c\u0017\r^5p]R\u0011\u0011H\u001d\u0005\u0006gV\u0001\r\u0001^\u0001\u0014]>$\u0018NZ5dCRLwN\\'fgN\fw-\u001a\t\u0004YU<\u0018B\u0001<.\u0005\u0015\t%O]1z!\ta\u00030\u0003\u0002z[\t!!)\u001f;f\u0003!\u0011XmY3jm\u0016$G#\u0001?\u0011\u0007uth+D\u0001S\u0013\ty(KA\u0002TKF\fAb]3u)\"\u0014xn^*ju\u0016$2!OA\u0003\u0011\u0019\t9a\u0006a\u0001W\u0005)\u0011N\u001c3fq\u00069bn\u001c;jM&\u001c\u0017\r^5p]\"\u000bg\u000e\u001a7fe~#S-\u001d\u000b\u0004s\u00055\u0001bB\u001f\t\u0003\u0003\u0005\r!Q\u0001\u0015]>$\u0018NZ5dCRLwN\u001c%b]\u0012dWM\u001d\u0011\u0002\u000bM,G/\u00169\u0015\u0007e\n)\u0002C\u0004\u0002\u0018)\u0001\r!!\u0007\u0002\u0011Q,7\u000f^%oM>\u0004B!a\u0007\u0002*5\u0011\u0011Q\u0004\u0006\u0005\u0003?\t\t#A\u0002ba&TA!a\t\u0002&\u00059!.\u001e9ji\u0016\u0014(bAA\u0014=\u0006)!.\u001e8ji&!\u00111FA\u000f\u0005!!Vm\u001d;J]\u001a|\u0007f\u0001\u0006\u00020A!\u00111DA\u0019\u0013\u0011\t\u0019$!\b\u0003\u0015\t+gm\u001c:f\u000b\u0006\u001c\u0007.\u0001\u0005uK\u0006\u0014Hi\\<o)\u0005I\u0004fA\u0006\u0002<A!\u00111DA\u001f\u0013\u0011\ty$!\b\u0003\u0013\u00053G/\u001a:FC\u000eD\u0017a\u0006;fgR\u0004&o\\2fgNtu\u000e^5gS\u000e\fG/[8oQ\ra\u0011Q\t\t\u0005\u00037\t9%\u0003\u0003\u0002J\u0005u!\u0001\u0002+fgR\fa\u0004^3tiN;\u0018\r\u001c7poN\u0004&o\\2fgN|'/\u0012=dKB$\u0018n\u001c8)\u00075\t)\u0005")
public class ZkNodeChangeNotificationListenerTest
extends QuorumTestHarness {
    private final int changeExpirationMs;
    private ZkNodeChangeNotificationListener notificationListener;
    private TestNotificationHandler notificationHandler;

    private int changeExpirationMs() {
        return this.changeExpirationMs;
    }

    private ZkNodeChangeNotificationListener notificationListener() {
        return this.notificationListener;
    }

    private void notificationListener_$eq(ZkNodeChangeNotificationListener x$1) {
        this.notificationListener = x$1;
    }

    private TestNotificationHandler notificationHandler() {
        return this.notificationHandler;
    }

    private void notificationHandler_$eq(TestNotificationHandler x$1) {
        this.notificationHandler = x$1;
    }

    @Override
    @BeforeEach
    public void setUp(TestInfo testInfo) {
        super.setUp(testInfo);
        this.zkClient().createAclPaths();
        this.notificationHandler_$eq(new TestNotificationHandler());
    }

    @Override
    @AfterEach
    public void tearDown() {
        if (this.notificationListener() != null) {
            this.notificationListener().close();
        }
        super.tearDown();
    }

    @Test
    public void testProcessNotification() {
        ResourcePattern notificationMessage1 = new ResourcePattern(ResourceType.GROUP, "messageA", PatternType.LITERAL);
        ResourcePattern notificationMessage2 = new ResourcePattern(ResourceType.GROUP, "messageB", PatternType.LITERAL);
        this.notificationListener_$eq(new ZkNodeChangeNotificationListener(this.zkClient(), LiteralAclChangeStore$.MODULE$.aclChangePath(), "acl_changes_", (NotificationHandler)this.notificationHandler(), (long)this.changeExpirationMs(), Time.SYSTEM));
        this.notificationListener().init();
        this.zkClient().createAclChangeNotification(notificationMessage1);
        long l = 15000L;
        long l2 = 100L;
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!ZkNodeChangeNotificationListenerTest.$anonfun$testProcessNotification$1(this, notificationMessage1)) {
            if (System.currentTimeMillis() > waitUntilTrue_startTime + l) {
                Assertions.fail((String)"Failed to send/process notification message in the timeout period.");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(l), l2));
        }
        this.zkClient().createAclChangeNotification(notificationMessage2);
        long l3 = 15000L;
        long l4 = 100L;
        long waitUntilTrue_startTime2 = System.currentTimeMillis();
        while (!ZkNodeChangeNotificationListenerTest.$anonfun$testProcessNotification$3(this, notificationMessage2)) {
            if (System.currentTimeMillis() > waitUntilTrue_startTime2 + l3) {
                Assertions.fail((String)"Failed to send/process notification message in the timeout period.");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(l3), l4));
        }
        RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(3), 10).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable)i -> this.zkClient().createAclChangeNotification(new ResourcePattern(ResourceType.GROUP, new StringBuilder(7).append("message").append(i).toString(), PatternType.LITERAL)));
        long l5 = 15000L;
        long l6 = 100L;
        long waitUntilTrue_startTime3 = System.currentTimeMillis();
        while (!ZkNodeChangeNotificationListenerTest.$anonfun$testProcessNotification$6(this)) {
            if (System.currentTimeMillis() > waitUntilTrue_startTime3 + l5) {
                Assertions.fail((String)ZkNodeChangeNotificationListenerTest.$anonfun$testProcessNotification$7(this));
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(l5), l6));
        }
    }

    @Test
    public void testSwallowsProcessorException() {
        this.notificationHandler().setThrowSize(2);
        this.notificationListener_$eq(new ZkNodeChangeNotificationListener(this.zkClient(), LiteralAclChangeStore$.MODULE$.aclChangePath(), "acl_changes_", (NotificationHandler)this.notificationHandler(), (long)this.changeExpirationMs(), Time.SYSTEM));
        this.notificationListener().init();
        this.zkClient().createAclChangeNotification(new ResourcePattern(ResourceType.GROUP, "messageA", PatternType.LITERAL));
        this.zkClient().createAclChangeNotification(new ResourcePattern(ResourceType.GROUP, "messageB", PatternType.LITERAL));
        this.zkClient().createAclChangeNotification(new ResourcePattern(ResourceType.GROUP, "messageC", PatternType.LITERAL));
        long l = 15000L;
        long l2 = 100L;
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!ZkNodeChangeNotificationListenerTest.$anonfun$testSwallowsProcessorException$1(this)) {
            if (System.currentTimeMillis() > waitUntilTrue_startTime + l) {
                Assertions.fail((String)ZkNodeChangeNotificationListenerTest.$anonfun$testSwallowsProcessorException$2(this));
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(l), l2));
        }
    }

    public static final /* synthetic */ boolean $anonfun$testProcessNotification$1(ZkNodeChangeNotificationListenerTest $this, ResourcePattern notificationMessage1$1) {
        if ($this.notificationHandler().received().size() == 1) {
            Object object = $this.notificationHandler().received().last();
            if (!(object != null ? !object.equals(notificationMessage1$1) : notificationMessage1$1 != null)) {
                return true;
            }
        }
        return false;
    }

    public static final /* synthetic */ String $anonfun$testProcessNotification$2() {
        return "Failed to send/process notification message in the timeout period.";
    }

    public static final /* synthetic */ boolean $anonfun$testProcessNotification$3(ZkNodeChangeNotificationListenerTest $this, ResourcePattern notificationMessage2$1) {
        if ($this.notificationHandler().received().size() == 2) {
            Object object = $this.notificationHandler().received().last();
            if (!(object != null ? !object.equals(notificationMessage2$1) : notificationMessage2$1 != null)) {
                return true;
            }
        }
        return false;
    }

    public static final /* synthetic */ String $anonfun$testProcessNotification$4() {
        return "Failed to send/process notification message in the timeout period.";
    }

    public static final /* synthetic */ boolean $anonfun$testProcessNotification$6(ZkNodeChangeNotificationListenerTest $this) {
        return $this.notificationHandler().received().size() == 10;
    }

    public static final /* synthetic */ String $anonfun$testProcessNotification$7(ZkNodeChangeNotificationListenerTest $this) {
        return new StringBuilder(64).append("Expected 10 invocations of processNotifications, but there were ").append($this.notificationHandler().received()).toString();
    }

    public static final /* synthetic */ boolean $anonfun$testSwallowsProcessorException$1(ZkNodeChangeNotificationListenerTest $this) {
        return $this.notificationHandler().received().size() == 3;
    }

    public static final /* synthetic */ String $anonfun$testSwallowsProcessorException$2(ZkNodeChangeNotificationListenerTest $this) {
        return new StringBuilder(63).append("Expected 2 invocations of processNotifications, but there were ").append($this.notificationHandler().received()).toString();
    }

    public ZkNodeChangeNotificationListenerTest() {
        this.changeExpirationMs = 1000;
    }

    private class TestNotificationHandler
    implements NotificationHandler {
        private final ArrayBuffer<ResourcePattern> messages;
        private volatile Option<Object> throwSize;

        private ArrayBuffer<ResourcePattern> messages() {
            return this.messages;
        }

        private Option<Object> throwSize() {
            return this.throwSize;
        }

        private void throwSize_$eq(Option<Object> x$1) {
            this.throwSize = x$1;
        }

        public void processNotification(byte[] notificationMessage) {
            this.messages().$plus$eq((Object)LiteralAclChangeStore$.MODULE$.decode(notificationMessage));
            if (this.throwSize().contains((Object)BoxesRunTime.boxToInteger((int)this.messages().size()))) {
                throw new RuntimeException("Oh no, my processing failed!");
            }
        }

        public Seq<ResourcePattern> received() {
            return this.messages();
        }

        public void setThrowSize(int index) {
            this.throwSize_$eq((Option<Object>)Option$.MODULE$.apply((Object)BoxesRunTime.boxToInteger((int)index)));
        }

        public /* synthetic */ ZkNodeChangeNotificationListenerTest kafka$common$ZkNodeChangeNotificationListenerTest$TestNotificationHandler$$$outer() {
            return ZkNodeChangeNotificationListenerTest.this;
        }

        public TestNotificationHandler() {
            if (ZkNodeChangeNotificationListenerTest.this == null) {
                throw null;
            }
            this.messages = ArrayBuffer$.MODULE$.empty();
            this.throwSize = Option$.MODULE$.empty();
        }
    }
}

