/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import com.yammer.metrics.core.Meter;
import java.io.Serializable;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import kafka.server.BaseRequestTest;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.utils.CoreUtils$;
import kafka.utils.MockTopicCreatePolicy;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.scalactic.source.Position;
import org.scalatest.Assertions$;
import scala.Function0;
import scala.Function1;
import scala.Predef;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.Map;
import scala.collection.Seq;
import scala.package$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;

@ScalaSignature(bytes="\u0006\u0005a2AAB\u0004\u0001\u0019!)\u0011\u0003\u0001C\u0001%!)A\u0003\u0001C!+!)a\u0005\u0001C\u0001O!)Q\u0007\u0001C\u0001O!)q\u0007\u0001C\u0001O\ta\u0002K]8ek\u000e,'+Z9vKN$x+\u001b;i!>d\u0017nY=UKN$(B\u0001\u0005\n\u0003\u0019\u0019XM\u001d<fe*\t!\"A\u0003lC\u001a\\\u0017m\u0001\u0001\u0014\u0005\u0001i\u0001C\u0001\b\u0010\u001b\u00059\u0011B\u0001\t\b\u0005=\u0011\u0015m]3SKF,Xm\u001d;UKN$\u0018A\u0002\u001fj]&$h\bF\u0001\u0014!\tq\u0001!A\fce>\\WM\u001d)s_B,'\u000f^=Pm\u0016\u0014(/\u001b3fgR\u0011a\u0003\b\t\u0003/ii\u0011\u0001\u0007\u0006\u00023\u0005)1oY1mC&\u00111\u0004\u0007\u0002\u0005+:LG\u000fC\u0003\u001e\u0005\u0001\u0007a$\u0001\u0006qe>\u0004XM\u001d;jKN\u0004\"a\b\u0013\u000e\u0003\u0001R!!\t\u0012\u0002\tU$\u0018\u000e\u001c\u0006\u0002G\u0005!!.\u0019<b\u0013\t)\u0003E\u0001\u0006Qe>\u0004XM\u001d;jKN\f\u0011\u0006^3tiB\u0013x\u000eZ;dKJ+\u0017/^3ti\u0006;\u0017-\u001b8ti:{g.\u0012=jgRLgn\u001a+pa&\u001cG#\u0001\f)\u0005\rI\u0003C\u0001\u00164\u001b\u0005Y#B\u0001\u0017.\u0003\r\t\u0007/\u001b\u0006\u0003]=\nqA[;qSR,'O\u0003\u00021c\u0005)!.\u001e8ji*\t!'A\u0002pe\u001eL!\u0001N\u0016\u0003\tQ+7\u000f^\u0001\u001ci\u0016\u001cH/Q;u_R{\u0007/[2De\u0016\fG/[8o\u001b\u0016$(/[2)\u0005\u0011I\u0013\u0001\u00073jg\u0006\u0014G.Z!vi>$v\u000e]5d\u0007J,\u0017\r^5p]\u0002")
public class ProduceRequestWithPolicyTest
extends BaseRequestTest {
    @Override
    public void brokerPropertyOverrides(Properties properties) {
        properties.setProperty(KafkaConfig$.MODULE$.CreateTopicPolicyClassNameProp(), MockTopicCreatePolicy.class.getName());
    }

    @Test
    public void testProduceRequestAgainstNonExistingTopic() {
        String goodTopic = "random-topic-that-should-be-auto-created";
        TestUtils$.MODULE$.generateAndProduceMessages(this.servers(), goodTopic, 1, -1);
        String badTopic = "disallowed-topic";
        ExecutionException exception = (ExecutionException)Assertions$.MODULE$.intercept((Function0 & Serializable)() -> TestUtils$.MODULE$.generateAndProduceMessages(this.servers(), badTopic, 1, -1), ClassTag$.MODULE$.apply(ExecutionException.class), new Position("ProduceRequestWithPolicyTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 47));
        Assertions.assertEquals(TopicAuthorizationException.class, exception.getCause().getClass());
    }

    @Test
    public void testAutoTopicCreationMetric() {
        String existingTopic = "topic1";
        String nonExistingtopic = "topic2";
        String nonExistingtopic2 = "topic3";
        String disallowedTopic = "disallowed-topic";
        Meter atcConfigAcceptedMeter = ((KafkaServer)this.servers().head()).dataPlaneRequestProcessor().requestChannel().atcConfigEnabledMeter();
        Meter atcConfigDeniedMeter = ((KafkaServer)this.servers().head()).dataPlaneRequestProcessor().requestChannel().atcConfigDisabledMeter();
        long successCnt = atcConfigAcceptedMeter.count();
        long failCnt = atcConfigDeniedMeter.count();
        TestUtils$.MODULE$.createTopic(this.zkClient(), existingTopic, (Map<Object, Seq<Object>>)((Map)Predef$.MODULE$.Map().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)0)), (Object)package$.MODULE$.Seq().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapIntArray(new int[]{0})))}))), this.servers());
        TestUtils$.MODULE$.generateAndProduceMessages(this.servers(), existingTopic, 1, -1);
        Assertions.assertEquals((long)0L, (long)(atcConfigAcceptedMeter.count() - successCnt));
        Assertions.assertEquals((long)0L, (long)(atcConfigDeniedMeter.count() - failCnt));
        successCnt = atcConfigAcceptedMeter.count();
        failCnt = atcConfigDeniedMeter.count();
        TestUtils$.MODULE$.generateAndProduceMessages(this.servers(), nonExistingtopic, 1, -1);
        Assertions.assertEquals((long)1L, (long)(atcConfigAcceptedMeter.count() - successCnt));
        Assertions.assertEquals((long)0L, (long)(atcConfigDeniedMeter.count() - failCnt));
        successCnt = atcConfigAcceptedMeter.count();
        failCnt = atcConfigDeniedMeter.count();
        Assertions$.MODULE$.intercept((Function0 & Serializable)() -> TestUtils$.MODULE$.generateAndProduceMessages(this.servers(), disallowedTopic, 1, -1), ClassTag$.MODULE$.apply(ExecutionException.class), new Position("ProduceRequestWithPolicyTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 80));
        Assertions.assertEquals((long)1L, (long)(atcConfigAcceptedMeter.count() - successCnt));
        Assertions.assertEquals((long)0L, (long)(atcConfigDeniedMeter.count() - failCnt));
        successCnt = atcConfigAcceptedMeter.count();
        failCnt = atcConfigDeniedMeter.count();
        TestUtils$.MODULE$.generateAndProduceMessages(this.servers(), existingTopic, 1, -1);
        Assertions.assertEquals((long)0L, (long)(atcConfigAcceptedMeter.count() - successCnt));
        Assertions.assertEquals((long)0L, (long)(atcConfigDeniedMeter.count() - failCnt));
        successCnt = atcConfigAcceptedMeter.count();
        atcConfigDeniedMeter.count();
        this.disableAutoTopicCreation();
        try {
            TestUtils$.MODULE$.generateAndProduceMessages(this.servers(), nonExistingtopic2, 1, -1);
            Assertions.fail((String)"producing to a non-existing topic should fail when auto-topic creation is disabled");
        }
        catch (ExecutionException executionException) {
            Assertions.assertTrue((boolean)(executionException.getCause() instanceof TimeoutException));
        }
        catch (Throwable throwable) {
            throwable.printStackTrace();
            Assertions.fail((String)"Should expect ExecutionException");
        }
        Assertions.assertEquals((long)0L, (long)(atcConfigAcceptedMeter.count() - successCnt));
        Assertions.assertTrue((atcConfigDeniedMeter.count() > 0L ? 1 : 0) != 0);
    }

    /*
     * WARNING - void declaration
     */
    public void disableAutoTopicCreation() {
        Properties props = CoreUtils$.MODULE$.propsWith(KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), Boolean.toString(false));
        try (ConfluentAdmin admin = this.createConfluentAdminClient(this.createConfluentAdminClient$default$1());){
            TestUtils$.MODULE$.incrementalAlterConfigs(this.servers(), (Admin)admin, props, false, AlterConfigOp.OpType.SET).all().get();
            long l = 100L;
            long waitUntilTrue_waitTimeMs = 3000L;
            long waitUntilTrue_startTime = System.currentTimeMillis();
            while (!ProduceRequestWithPolicyTest.$anonfun$disableAutoTopicCreation$1(this)) {
                void waitUntilTrue_pause;
                if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                    Assertions.fail((String)"Timed out waiting for config update to propagate");
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
            }
        }
    }

    public static final /* synthetic */ boolean $anonfun$disableAutoTopicCreation$2(KafkaServer x$1) {
        return BoxesRunTime.equals((Object)x$1.config().autoCreateTopicsEnable(), (Object)BoxesRunTime.boxToBoolean((boolean)false));
    }

    public static final /* synthetic */ boolean $anonfun$disableAutoTopicCreation$1(ProduceRequestWithPolicyTest $this) {
        return $this.servers().forall((Function1 & Serializable)x$1 -> BoxesRunTime.boxToBoolean((boolean)ProduceRequestWithPolicyTest.$anonfun$disableAutoTopicCreation$2(x$1)));
    }

    public static final /* synthetic */ String $anonfun$disableAutoTopicCreation$3() {
        return "Timed out waiting for config update to propagate";
    }
}

