/*
 * Decompiled with CFR 0.152.
 */
package kafka.controller;

import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Histogram;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.Timer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import kafka.controller.AutoPreferredReplicaLeaderElection$;
import kafka.controller.BrokerChange$;
import kafka.controller.ControllerEvent;
import kafka.controller.ControllerEventManager;
import kafka.controller.ControllerEventProcessor;
import kafka.controller.ControllerState;
import kafka.controller.ControllerStats;
import kafka.controller.TopicChange$;
import kafka.controller.UpdateMetadataResponseReceived;
import kafka.utils.TestUtils$;
import org.apache.kafka.common.message.UpdateMetadataResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.UpdateMetadataResponse;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import scala.Function0;
import scala.Function1;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.collection.JavaConverters$;
import scala.collection.MapLike;
import scala.collection.Seq;
import scala.collection.TraversableOnce;
import scala.collection.mutable.Set;
import scala.collection.mutable.Set$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.Nothing$;

@ScalaSignature(bytes="\u0006\u0001\u001d4A!\u0001\u0002\u0001\u000f\tQ2i\u001c8ue>dG.\u001a:Fm\u0016tG/T1oC\u001e,'\u000fV3ti*\u00111\u0001B\u0001\u000bG>tGO]8mY\u0016\u0014(\"A\u0003\u0002\u000b-\fgm[1\u0004\u0001M\u0011\u0001\u0001\u0003\t\u0003\u00131i\u0011A\u0003\u0006\u0002\u0017\u0005)1oY1mC&\u0011QB\u0003\u0002\u0007\u0003:L(+\u001a4\t\u000b=\u0001A\u0011\u0001\t\u0002\rqJg.\u001b;?)\u0005\t\u0002C\u0001\n\u0001\u001b\u0005\u0011\u0001\"\u0003\u000b\u0001\u0001\u0004\u0005\r\u0011\"\u0003\u0016\u0003Y\u0019wN\u001c;s_2dWM]#wK:$X*\u00198bO\u0016\u0014X#\u0001\f\u0011\u0005I9\u0012B\u0001\r\u0003\u0005Y\u0019uN\u001c;s_2dWM]#wK:$X*\u00198bO\u0016\u0014\b\"\u0003\u000e\u0001\u0001\u0004\u0005\r\u0011\"\u0003\u001c\u0003i\u0019wN\u001c;s_2dWM]#wK:$X*\u00198bO\u0016\u0014x\fJ3r)\tar\u0004\u0005\u0002\n;%\u0011aD\u0003\u0002\u0005+:LG\u000fC\u0004!3\u0005\u0005\t\u0019\u0001\f\u0002\u0007a$\u0013\u0007\u0003\u0004#\u0001\u0001\u0006KAF\u0001\u0018G>tGO]8mY\u0016\u0014XI^3oi6\u000bg.Y4fe\u0002BQ\u0001\n\u0001\u0005\u0002\u0015\n\u0001\u0002^3be\u0012{wO\u001c\u000b\u00029!\u00121e\n\t\u0003Q5j\u0011!\u000b\u0006\u0003U-\nQA[;oSRT\u0011\u0001L\u0001\u0004_J<\u0017B\u0001\u0018*\u0005\u0015\te\r^3s\u0011\u0015\u0001\u0004\u0001\"\u0001&\u0003e!Xm\u001d;NKR\u0014\u0018nY:DY\u0016\fg.\u001a3P]\u000ecwn]3)\u0005=\u0012\u0004C\u0001\u00154\u0013\t!\u0014F\u0001\u0003UKN$\b\"\u0002\u001c\u0001\t\u0003)\u0013a\u0007;fgR,e/\u001a8u/&$\bn\\;u%\u0006$X-T3ue&\u001c7\u000f\u000b\u00026e!)\u0011\b\u0001C\u0001K\u0005\u0011B/Z:u\u000bZ,g\u000e^)vKV,G+[7fQ\tA$\u0007C\u0003=\u0001\u0011\u0005Q%A\nuKN$8+^2dKN\u001ch-\u001e7Fm\u0016tG\u000f\u000b\u0002<e!)q\b\u0001C\u0001K\u0005aB/Z:u\u000bZ,g\u000e\u001e+iCR$\u0006N]8xg\u0016C8-\u001a9uS>t\u0007F\u0001 3\u0011\u0015\u0011\u0005\u0001\"\u0003D\u0003\u0015\u0019\u0007.Z2l)\u0011aB)\u0014*\t\u000b\u0015\u000b\u0005\u0019\u0001$\u0002\u00155,GO]5d\u001d\u0006lW\r\u0005\u0002H\u0015:\u0011\u0011\u0002S\u0005\u0003\u0013*\ta\u0001\u0015:fI\u00164\u0017BA&M\u0005\u0019\u0019FO]5oO*\u0011\u0011J\u0003\u0005\u0006\u001d\u0006\u0003\raT\u0001\u0006KZ,g\u000e\u001e\t\u0003%AK!!\u0015\u0002\u0003\u001f\r{g\u000e\u001e:pY2,'/\u0012<f]RDQaU!A\u0002Q\u000bAAZ;oGB\u0019\u0011\"\u0016\u000f\n\u0005YS!!\u0003$v]\u000e$\u0018n\u001c81\u0011\u0015A\u0006\u0001\"\u0003Z\u0003\u0015!\u0018.\\3s)\tQf\r\u0005\u0002\\I6\tAL\u0003\u0002^=\u0006!1m\u001c:f\u0015\ty\u0006-A\u0004nKR\u0014\u0018nY:\u000b\u0005\u0005\u0014\u0017AB=b[6,'OC\u0001d\u0003\r\u0019w.\\\u0005\u0003Kr\u0013Q\u0001V5nKJDQ!R,A\u0002\u0019\u0003")
public class ControllerEventManagerTest {
    private ControllerEventManager kafka$controller$ControllerEventManagerTest$$controllerEventManager;

    public ControllerEventManager kafka$controller$ControllerEventManagerTest$$controllerEventManager() {
        return this.kafka$controller$ControllerEventManagerTest$$controllerEventManager;
    }

    private void kafka$controller$ControllerEventManagerTest$$controllerEventManager_$eq(ControllerEventManager x$1) {
        this.kafka$controller$ControllerEventManagerTest$$controllerEventManager = x$1;
    }

    @After
    public void tearDown() {
        if (this.kafka$controller$ControllerEventManagerTest$$controllerEventManager() != null) {
            this.kafka$controller$ControllerEventManagerTest$$controllerEventManager().close();
        }
    }

    @Test
    public void testMetricsCleanedOnClose() {
        MockTime time = new MockTime();
        ControllerStats controllerStats = new ControllerStats();
        ControllerEventProcessor eventProcessor = new ControllerEventProcessor(this){

            public void process(ControllerEvent event) {
            }

            public void preempt(ControllerEvent event) {
            }
        };
        this.kafka$controller$ControllerEventManagerTest$$controllerEventManager_$eq(new ControllerEventManager(0, eventProcessor, (Time)time, controllerStats.rateAndTimeMetrics()));
        this.kafka$controller$ControllerEventManagerTest$$controllerEventManager().start();
        Assert.assertTrue((boolean)this.allEventManagerMetrics$1().nonEmpty());
        this.kafka$controller$ControllerEventManagerTest$$controllerEventManager().close();
        Assert.assertTrue((boolean)this.allEventManagerMetrics$1().isEmpty());
    }

    @Test
    public void testEventWithoutRateMetrics() {
        MockTime time = new MockTime();
        ControllerStats controllerStats = new ControllerStats();
        Set processedEvents = Set$.MODULE$.empty();
        ControllerEventProcessor eventProcessor = new ControllerEventProcessor(this, processedEvents){
            private final Set processedEvents$1;

            public void process(ControllerEvent event) {
                this.processedEvents$1.$plus$eq((Object)event);
            }

            public void preempt(ControllerEvent event) {
            }
            {
                this.processedEvents$1 = processedEvents$1;
            }
        };
        this.kafka$controller$ControllerEventManagerTest$$controllerEventManager_$eq(new ControllerEventManager(0, eventProcessor, (Time)time, controllerStats.rateAndTimeMetrics()));
        this.kafka$controller$ControllerEventManagerTest$$controllerEventManager().start();
        UpdateMetadataResponse updateMetadataResponse = new UpdateMetadataResponse(new UpdateMetadataResponseData().setErrorCode(Errors.NONE.code()));
        UpdateMetadataResponseReceived updateMetadataResponseEvent = new UpdateMetadataResponseReceived(updateMetadataResponse, 1);
        this.kafka$controller$ControllerEventManagerTest$$controllerEventManager().put((ControllerEvent)updateMetadataResponseEvent);
        TestUtils$.MODULE$.waitUntilTrue((Function0<Object>)new Serializable(this, processedEvents){
            public static final long serialVersionUID = 0L;
            private final Set processedEvents$1;

            public final boolean apply() {
                return this.apply$mcZ$sp();
            }

            public boolean apply$mcZ$sp() {
                return this.processedEvents$1.size() == 1;
            }
            {
                this.processedEvents$1 = processedEvents$1;
            }
        }, (Function0<String>)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final String apply() {
                return "Failed to process expected event before timing out";
            }
        }, TestUtils$.MODULE$.waitUntilTrue$default$3(), TestUtils$.MODULE$.waitUntilTrue$default$4());
        Assert.assertEquals((Object)updateMetadataResponseEvent, (Object)processedEvents.head());
    }

    @Test
    public void testEventQueueTime() {
        String metricName = "kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs";
        ControllerStats controllerStats = new ControllerStats();
        MockTime time = new MockTime();
        CountDownLatch latch = new CountDownLatch(1);
        AtomicInteger processedEvents = new AtomicInteger();
        ControllerEventProcessor eventProcessor = new ControllerEventProcessor(this, time, latch, processedEvents){
            private final MockTime time$1;
            private final CountDownLatch latch$1;
            private final AtomicInteger processedEvents$2;

            public void process(ControllerEvent event) {
                this.latch$1.await();
                this.time$1.sleep(500L);
                this.processedEvents$2.incrementAndGet();
            }

            public void preempt(ControllerEvent event) {
            }
            {
                this.time$1 = time$1;
                this.latch$1 = latch$1;
                this.processedEvents$2 = processedEvents$2;
            }
        };
        Assert.assertTrue((boolean)((MapLike)JavaConverters$.MODULE$.mapAsScalaMapConverter(Metrics.defaultRegistry().allMetrics()).asScala()).filterKeys((Function1)new Serializable(this, metricName){
            public static final long serialVersionUID = 0L;
            private final String metricName$1;

            public final boolean apply(MetricName x$2) {
                String string = x$2.getMBeanName();
                String string2 = this.metricName$1;
                return !(string != null ? !string.equals(string2) : string2 != null);
            }
            {
                this.metricName$1 = metricName$1;
            }
        }).values().isEmpty());
        this.kafka$controller$ControllerEventManagerTest$$controllerEventManager_$eq(new ControllerEventManager(0, eventProcessor, (Time)time, controllerStats.rateAndTimeMetrics()));
        this.kafka$controller$ControllerEventManagerTest$$controllerEventManager().start();
        this.kafka$controller$ControllerEventManagerTest$$controllerEventManager().put((ControllerEvent)TopicChange$.MODULE$);
        this.kafka$controller$ControllerEventManagerTest$$controllerEventManager().put((ControllerEvent)TopicChange$.MODULE$);
        latch.countDown();
        TestUtils$.MODULE$.waitUntilTrue((Function0<Object>)new Serializable(this, processedEvents){
            public static final long serialVersionUID = 0L;
            private final AtomicInteger processedEvents$2;

            public final boolean apply() {
                return this.apply$mcZ$sp();
            }

            public boolean apply$mcZ$sp() {
                return this.processedEvents$2.get() == 2;
            }
            {
                this.processedEvents$2 = processedEvents$2;
            }
        }, (Function0<String>)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final String apply() {
                return "Timed out waiting for processing of all events";
            }
        }, TestUtils$.MODULE$.waitUntilTrue$default$3(), TestUtils$.MODULE$.waitUntilTrue$default$4());
        Histogram queueTimeHistogram = (Histogram)((MapLike)JavaConverters$.MODULE$.mapAsScalaMapConverter(Metrics.defaultRegistry().allMetrics()).asScala()).filterKeys((Function1)new Serializable(this, metricName){
            public static final long serialVersionUID = 0L;
            private final String metricName$1;

            public final boolean apply(MetricName x$3) {
                String string = x$3.getMBeanName();
                String string2 = this.metricName$1;
                return !(string != null ? !string.equals(string2) : string2 != null);
            }
            {
                this.metricName$1 = metricName$1;
            }
        }).values().headOption().getOrElse((Function0)new Serializable(this, metricName){
            public static final long serialVersionUID = 0L;
            private final String metricName$1;

            public final void apply() {
                this.apply$mcV$sp();
            }

            public void apply$mcV$sp() {
                Assert.fail((String)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Unable to find metric ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.metricName$1})));
            }
            {
                this.metricName$1 = metricName$1;
            }
        });
        Assert.assertEquals((long)2L, (long)queueTimeHistogram.count());
        Assert.assertEquals((double)0.0, (double)queueTimeHistogram.min(), (double)0.01);
        Assert.assertEquals((double)500.0, (double)queueTimeHistogram.max(), (double)0.01);
    }

    @Test
    public void testSuccessfulEvent() {
        this.check("kafka.controller:type=ControllerStats,name=AutoLeaderBalanceRateAndTimeMs", (ControllerEvent)AutoPreferredReplicaLeaderElection$.MODULE$, (Function0<BoxedUnit>)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final void apply() {
                this.apply$mcV$sp();
            }

            public void apply$mcV$sp() {
            }
        });
    }

    @Test
    public void testEventThatThrowsException() {
        this.check("kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs", (ControllerEvent)BrokerChange$.MODULE$, (Function0<BoxedUnit>)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final Nothing$ apply() {
                throw new NullPointerException();
            }
        });
    }

    private void check(String metricName, ControllerEvent event, Function0<BoxedUnit> func) {
        ControllerStats controllerStats = new ControllerStats();
        AtomicInteger eventProcessedListenerCount = new AtomicInteger();
        CountDownLatch latch = new CountDownLatch(1);
        ControllerEventProcessor eventProcessor = new ControllerEventProcessor(this, func, eventProcessedListenerCount, latch){
            private final Function0 func$1;
            private final AtomicInteger eventProcessedListenerCount$1;
            private final CountDownLatch latch$2;

            public void process(ControllerEvent event) {
                this.latch$2.await();
                this.eventProcessedListenerCount$1.incrementAndGet();
                this.func$1.apply$mcV$sp();
            }

            public void preempt(ControllerEvent event) {
            }
            {
                this.func$1 = func$1;
                this.eventProcessedListenerCount$1 = eventProcessedListenerCount$1;
                this.latch$2 = latch$2;
            }
        };
        this.kafka$controller$ControllerEventManagerTest$$controllerEventManager_$eq(new ControllerEventManager(0, eventProcessor, (Time)new MockTime(), controllerStats.rateAndTimeMetrics()));
        this.kafka$controller$ControllerEventManagerTest$$controllerEventManager().start();
        long initialTimerCount = this.timer(metricName).count();
        this.kafka$controller$ControllerEventManagerTest$$controllerEventManager().put(event);
        TestUtils$.MODULE$.waitUntilTrue((Function0<Object>)new Serializable(this, event){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ ControllerEventManagerTest $outer;
            private final ControllerEvent event$1;

            public final boolean apply() {
                return this.apply$mcZ$sp();
            }

            public boolean apply$mcZ$sp() {
                ControllerState controllerState = this.$outer.kafka$controller$ControllerEventManagerTest$$controllerEventManager().state();
                ControllerState controllerState2 = this.event$1.state();
                return !(controllerState != null ? !controllerState.equals(controllerState2) : controllerState2 != null);
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
                this.event$1 = event$1;
            }
        }, (Function0<String>)new Serializable(this, event){
            public static final long serialVersionUID = 0L;
            private final ControllerEvent event$1;

            public final String apply() {
                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Controller state is not ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.event$1.state()}));
            }
            {
                this.event$1 = event$1;
            }
        }, TestUtils$.MODULE$.waitUntilTrue$default$3(), TestUtils$.MODULE$.waitUntilTrue$default$4());
        latch.countDown();
        TestUtils$.MODULE$.waitUntilTrue((Function0<Object>)new Serializable(this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ ControllerEventManagerTest $outer;

            public final boolean apply() {
                return this.apply$mcZ$sp();
            }

            public boolean apply$mcZ$sp() {
                ControllerState controllerState = this.$outer.kafka$controller$ControllerEventManagerTest$$controllerEventManager().state();
                ControllerState.Idle$ idle$ = ControllerState.Idle$.MODULE$;
                return !(controllerState != null ? !controllerState.equals(idle$) : idle$ != null);
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
            }
        }, (Function0<String>)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final String apply() {
                return "Controller state has not changed back to Idle";
            }
        }, TestUtils$.MODULE$.waitUntilTrue$default$3(), TestUtils$.MODULE$.waitUntilTrue$default$4());
        Assert.assertEquals((long)1L, (long)eventProcessedListenerCount.get());
        Assert.assertEquals((String)"Timer has not been updated", (long)(initialTimerCount + 1L), (long)this.timer(metricName).count());
    }

    private Timer timer(String metricName) {
        return (Timer)((MapLike)JavaConverters$.MODULE$.mapAsScalaMapConverter(Metrics.defaultRegistry().allMetrics()).asScala()).filterKeys((Function1)new Serializable(this, metricName){
            public static final long serialVersionUID = 0L;
            private final String metricName$2;

            public final boolean apply(MetricName x$4) {
                String string = x$4.getMBeanName();
                String string2 = this.metricName$2;
                return !(string != null ? !string.equals(string2) : string2 != null);
            }
            {
                this.metricName$2 = metricName$2;
            }
        }).values().headOption().getOrElse((Function0)new Serializable(this, metricName){
            public static final long serialVersionUID = 0L;
            private final String metricName$2;

            public final void apply() {
                this.apply$mcV$sp();
            }

            public void apply$mcV$sp() {
                Assert.fail((String)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Unable to find metric ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.metricName$2})));
            }
            {
                this.metricName$2 = metricName$2;
            }
        });
    }

    private final scala.collection.immutable.Set allEventManagerMetrics$1() {
        return ((TraversableOnce)((MapLike)JavaConverters$.MODULE$.mapAsScalaMapConverter(Metrics.defaultRegistry().allMetrics()).asScala()).keySet().filter((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final boolean apply(MetricName x$1) {
                return x$1.getMBeanName().startsWith("kafka.controller:type=ControllerEventManager");
            }
        })).toSet();
    }
}

