/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.producer;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.ExtendedSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.MockMetricsReporter;
import org.apache.kafka.test.MockPartitioner;
import org.apache.kafka.test.MockProducerInterceptor;
import org.apache.kafka.test.MockSerializer;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.support.membermodification.MemberModifier;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareOnlyThisForTest;
import org.powermock.modules.junit4.PowerMockRunner;

@RunWith(value=PowerMockRunner.class)
@PowerMockIgnore(value={"javax.management.*"})
public class KafkaProducerTest {
    @Test
    public void testConstructorWithSerializers() {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9000");
        new KafkaProducer(producerProps, (Serializer)new ByteArraySerializer(), (Serializer)new ByteArraySerializer()).close();
    }

    @Test(expected=ConfigException.class)
    public void testNoSerializerProvided() {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9000");
        new KafkaProducer(producerProps);
    }

    @Test
    public void testConstructorFailureCloseResource() {
        Properties props = new Properties();
        props.setProperty("client.id", "testConstructorClose");
        props.setProperty("bootstrap.servers", "some.invalid.hostname.foo.bar.local:9999");
        props.setProperty("metric.reporters", MockMetricsReporter.class.getName());
        int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
        int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
        try {
            KafkaProducer kafkaProducer = new KafkaProducer(props, (Serializer)new ByteArraySerializer(), (Serializer)new ByteArraySerializer());
        }
        catch (KafkaException e) {
            Assert.assertEquals((long)(oldInitCount + 1), (long)MockMetricsReporter.INIT_COUNT.get());
            Assert.assertEquals((long)(oldCloseCount + 1), (long)MockMetricsReporter.CLOSE_COUNT.get());
            Assert.assertEquals((Object)"Failed to construct kafka producer", (Object)e.getMessage());
            return;
        }
        Assert.fail((String)"should have caught an exception and returned");
    }

    @Test
    public void testSerializerClose() throws Exception {
        HashMap<String, String> configs = new HashMap<String, String>();
        configs.put("client.id", "testConstructorClose");
        configs.put("bootstrap.servers", "localhost:9999");
        configs.put("metric.reporters", MockMetricsReporter.class.getName());
        configs.put("security.protocol", "PLAINTEXT");
        int oldInitCount = MockSerializer.INIT_COUNT.get();
        int oldCloseCount = MockSerializer.CLOSE_COUNT.get();
        KafkaProducer producer = new KafkaProducer(configs, (Serializer)new MockSerializer(), (Serializer)new MockSerializer());
        Assert.assertEquals((long)(oldInitCount + 2), (long)MockSerializer.INIT_COUNT.get());
        Assert.assertEquals((long)oldCloseCount, (long)MockSerializer.CLOSE_COUNT.get());
        producer.close();
        Assert.assertEquals((long)(oldInitCount + 2), (long)MockSerializer.INIT_COUNT.get());
        Assert.assertEquals((long)(oldCloseCount + 2), (long)MockSerializer.CLOSE_COUNT.get());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testInterceptorConstructClose() throws Exception {
        try {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9999");
            props.setProperty("interceptor.classes", MockProducerInterceptor.class.getName());
            props.setProperty("mock.interceptor.append", "something");
            KafkaProducer producer = new KafkaProducer(props, (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
            Assert.assertEquals((long)1L, (long)MockProducerInterceptor.INIT_COUNT.get());
            Assert.assertEquals((long)0L, (long)MockProducerInterceptor.CLOSE_COUNT.get());
            Assert.assertNull((Object)MockProducerInterceptor.CLUSTER_META.get());
            producer.close();
            Assert.assertEquals((long)1L, (long)MockProducerInterceptor.INIT_COUNT.get());
            Assert.assertEquals((long)1L, (long)MockProducerInterceptor.CLOSE_COUNT.get());
        }
        finally {
            MockProducerInterceptor.resetCounters();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testPartitionerClose() throws Exception {
        try {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9999");
            props.setProperty("partitioner.class", MockPartitioner.class.getName());
            KafkaProducer producer = new KafkaProducer(props, (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
            Assert.assertEquals((long)1L, (long)MockPartitioner.INIT_COUNT.get());
            Assert.assertEquals((long)0L, (long)MockPartitioner.CLOSE_COUNT.get());
            producer.close();
            Assert.assertEquals((long)1L, (long)MockPartitioner.INIT_COUNT.get());
            Assert.assertEquals((long)1L, (long)MockPartitioner.CLOSE_COUNT.get());
        }
        finally {
            MockPartitioner.resetCounters();
        }
    }

    @Test
    public void testOsDefaultSocketBufferSizes() throws Exception {
        HashMap<String, Object> config = new HashMap<String, Object>();
        config.put("bootstrap.servers", "localhost:9999");
        config.put("send.buffer.bytes", -1);
        config.put("receive.buffer.bytes", -1);
        KafkaProducer producer = new KafkaProducer(config, (Serializer)new ByteArraySerializer(), (Serializer)new ByteArraySerializer());
        producer.close();
    }

    @Test(expected=KafkaException.class)
    public void testInvalidSocketSendBufferSize() throws Exception {
        HashMap<String, Object> config = new HashMap<String, Object>();
        config.put("bootstrap.servers", "localhost:9999");
        config.put("send.buffer.bytes", -2);
        new KafkaProducer(config, (Serializer)new ByteArraySerializer(), (Serializer)new ByteArraySerializer());
    }

    @Test(expected=KafkaException.class)
    public void testInvalidSocketReceiveBufferSize() throws Exception {
        HashMap<String, Object> config = new HashMap<String, Object>();
        config.put("bootstrap.servers", "localhost:9999");
        config.put("receive.buffer.bytes", -2);
        new KafkaProducer(config, (Serializer)new ByteArraySerializer(), (Serializer)new ByteArraySerializer());
    }

    @PrepareOnlyThisForTest(value={Metadata.class})
    @Test
    public void testMetadataFetch() throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9999");
        KafkaProducer producer = new KafkaProducer(props, (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
        Metadata metadata = (Metadata)PowerMock.createNiceMock(Metadata.class);
        MemberModifier.field(KafkaProducer.class, (String)"metadata").set(producer, metadata);
        String topic = "topic";
        ProducerRecord record = new ProducerRecord(topic, (Object)"value");
        List<Node> nodes = Collections.singletonList(new Node(0, "host1", 1000));
        Cluster emptyCluster = new Cluster(null, nodes, Collections.emptySet(), Collections.emptySet(), Collections.emptySet());
        Cluster cluster = new Cluster("dummy", Collections.singletonList(new Node(0, "host1", 1000)), Arrays.asList(new PartitionInfo(topic, 0, null, null, null)), Collections.emptySet(), Collections.emptySet());
        int refreshAttempts = 5;
        EasyMock.expect((Object)metadata.fetch()).andReturn((Object)emptyCluster).times(4);
        EasyMock.expect((Object)metadata.fetch()).andReturn((Object)cluster).once();
        EasyMock.expect((Object)metadata.fetch()).andThrow((Throwable)new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes();
        PowerMock.replay((Object[])new Object[]{metadata});
        producer.send(record);
        PowerMock.verify((Object[])new Object[]{metadata});
        PowerMock.reset((Object[])new Object[]{metadata});
        EasyMock.expect((Object)metadata.fetch()).andReturn((Object)cluster).once();
        EasyMock.expect((Object)metadata.fetch()).andThrow((Throwable)new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes();
        PowerMock.replay((Object[])new Object[]{metadata});
        producer.send(record, null);
        PowerMock.verify((Object[])new Object[]{metadata});
        PowerMock.reset((Object[])new Object[]{metadata});
        EasyMock.expect((Object)metadata.fetch()).andReturn((Object)cluster).once();
        EasyMock.expect((Object)metadata.fetch()).andThrow((Throwable)new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes();
        PowerMock.replay((Object[])new Object[]{metadata});
        producer.partitionsFor(topic);
        PowerMock.verify((Object[])new Object[]{metadata});
    }

    @PrepareOnlyThisForTest(value={Metadata.class})
    @Test
    public void testMetadataFetchOnStaleMetadata() throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9999");
        KafkaProducer producer = new KafkaProducer(props, (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
        Metadata metadata = (Metadata)PowerMock.createNiceMock(Metadata.class);
        MemberModifier.field(KafkaProducer.class, (String)"metadata").set(producer, metadata);
        String topic = "topic";
        ProducerRecord initialRecord = new ProducerRecord(topic, (Object)"value");
        ProducerRecord extendedRecord = new ProducerRecord(topic, Integer.valueOf(2), null, (Object)"value");
        List<Node> nodes = Collections.singletonList(new Node(0, "host1", 1000));
        Cluster emptyCluster = new Cluster(null, nodes, Collections.emptySet(), Collections.emptySet(), Collections.emptySet());
        Cluster initialCluster = new Cluster("dummy", Collections.singletonList(new Node(0, "host1", 1000)), Arrays.asList(new PartitionInfo(topic, 0, null, null, null)), Collections.emptySet(), Collections.emptySet());
        Cluster extendedCluster = new Cluster("dummy", Collections.singletonList(new Node(0, "host1", 1000)), Arrays.asList(new PartitionInfo(topic, 0, null, null, null), new PartitionInfo(topic, 1, null, null, null), new PartitionInfo(topic, 2, null, null, null)), Collections.emptySet(), Collections.emptySet());
        int refreshAttempts = 5;
        EasyMock.expect((Object)metadata.fetch()).andReturn((Object)emptyCluster).times(4);
        EasyMock.expect((Object)metadata.fetch()).andReturn((Object)initialCluster).once();
        EasyMock.expect((Object)metadata.fetch()).andThrow((Throwable)new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes();
        PowerMock.replay((Object[])new Object[]{metadata});
        producer.send(initialRecord);
        PowerMock.verify((Object[])new Object[]{metadata});
        PowerMock.reset((Object[])new Object[]{metadata});
        EasyMock.expect((Object)metadata.fetch()).andReturn((Object)initialCluster).once();
        EasyMock.expect((Object)metadata.fetch()).andThrow((Throwable)new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes();
        PowerMock.replay((Object[])new Object[]{metadata});
        producer.send(initialRecord, null);
        PowerMock.verify((Object[])new Object[]{metadata});
        PowerMock.reset((Object[])new Object[]{metadata});
        EasyMock.expect((Object)metadata.fetch()).andReturn((Object)initialCluster).once();
        EasyMock.expect((Object)metadata.fetch()).andReturn((Object)initialCluster).once();
        EasyMock.expect((Object)metadata.fetch()).andThrow((Throwable)new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes();
        PowerMock.replay((Object[])new Object[]{metadata});
        try {
            producer.send(extendedRecord, null);
            Assert.fail((String)"Expected KafkaException to be raised");
        }
        catch (KafkaException kafkaException) {
            // empty catch block
        }
        PowerMock.verify((Object[])new Object[]{metadata});
        PowerMock.reset((Object[])new Object[]{metadata});
        EasyMock.expect((Object)metadata.fetch()).andReturn((Object)initialCluster).once();
        EasyMock.expect((Object)metadata.fetch()).andReturn((Object)extendedCluster).once();
        EasyMock.expect((Object)metadata.fetch()).andThrow((Throwable)new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes();
        PowerMock.replay((Object[])new Object[]{metadata});
        producer.send(extendedRecord, null);
        PowerMock.verify((Object[])new Object[]{metadata});
    }

    @Test
    public void testTopicRefreshInMetadata() throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9999");
        props.setProperty("max.block.ms", "600000");
        KafkaProducer producer = new KafkaProducer(props, (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
        long refreshBackoffMs = 500L;
        long metadataExpireMs = 60000L;
        final Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true, true, new ClusterResourceListeners());
        final MockTime time = new MockTime();
        MemberModifier.field(KafkaProducer.class, (String)"metadata").set(producer, metadata);
        MemberModifier.field(KafkaProducer.class, (String)"time").set(producer, time);
        String topic = "topic";
        Thread t = new Thread(){

            @Override
            public void run() {
                long startTimeMs = System.currentTimeMillis();
                for (int i = 0; i < 10; ++i) {
                    while (!metadata.updateRequested() && System.currentTimeMillis() - startTimeMs < 1000L) {
                        1.yield();
                    }
                    metadata.update(Cluster.empty(), Collections.singleton("topic"), time.milliseconds());
                    time.sleep(60000L);
                }
            }
        };
        t.start();
        try {
            producer.partitionsFor("topic");
            Assert.fail((String)"Expect TimeoutException");
        }
        catch (TimeoutException timeoutException) {
            // empty catch block
        }
        Assert.assertTrue((String)"Topic should still exist in metadata", (boolean)metadata.containsTopic("topic"));
    }

    @PrepareOnlyThisForTest(value={Metadata.class})
    @Test
    public void testHeaders() throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9999");
        ExtendedSerializer keySerializer = (ExtendedSerializer)PowerMock.createNiceMock(ExtendedSerializer.class);
        ExtendedSerializer valueSerializer = (ExtendedSerializer)PowerMock.createNiceMock(ExtendedSerializer.class);
        KafkaProducer producer = new KafkaProducer(props, (Serializer)keySerializer, (Serializer)valueSerializer);
        Metadata metadata = (Metadata)PowerMock.createNiceMock(Metadata.class);
        MemberModifier.field(KafkaProducer.class, (String)"metadata").set(producer, metadata);
        String topic = "topic";
        List<Node> nodes = Collections.singletonList(new Node(0, "host1", 1000));
        Cluster cluster = new Cluster("dummy", Collections.singletonList(new Node(0, "host1", 1000)), Arrays.asList(new PartitionInfo(topic, 0, null, null, null)), Collections.emptySet(), Collections.emptySet());
        EasyMock.expect((Object)metadata.fetch()).andReturn((Object)cluster).anyTimes();
        PowerMock.replay((Object[])new Object[]{metadata});
        String value = "value";
        ProducerRecord record = new ProducerRecord(topic, (Object)value);
        EasyMock.expect((Object)keySerializer.serialize(topic, record.headers(), null)).andReturn(null).once();
        EasyMock.expect((Object)valueSerializer.serialize(topic, record.headers(), (Object)value)).andReturn((Object)value.getBytes()).once();
        PowerMock.replay((Object[])new Object[]{keySerializer});
        PowerMock.replay((Object[])new Object[]{valueSerializer});
        record.headers().add((Header)new RecordHeader("test", "header2".getBytes()));
        producer.send(record, null);
        try {
            record.headers().add((Header)new RecordHeader("test", "test".getBytes()));
            Assert.fail((String)"Expected IllegalStateException to be raised");
        }
        catch (IllegalStateException illegalStateException) {
            // empty catch block
        }
        Assert.assertTrue((boolean)Arrays.equals(record.headers().lastHeader("test").value(), "header2".getBytes()));
        PowerMock.verify((Object[])new Object[]{valueSerializer});
        PowerMock.verify((Object[])new Object[]{keySerializer});
    }

    @Test
    public void closeShouldBeIdempotent() {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9000");
        KafkaProducer producer = new KafkaProducer(producerProps, (Serializer)new ByteArraySerializer(), (Serializer)new ByteArraySerializer());
        producer.close();
        producer.close();
    }

    @Test
    public void testMetricConfigRecordingLevel() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9000");
        try (KafkaProducer producer = new KafkaProducer(props, (Serializer)new ByteArraySerializer(), (Serializer)new ByteArraySerializer());){
            Assert.assertEquals((Object)Sensor.RecordingLevel.INFO, (Object)producer.metrics.config().recordLevel());
        }
        props.put("metrics.recording.level", "DEBUG");
        producer = new KafkaProducer(props, (Serializer)new ByteArraySerializer(), (Serializer)new ByteArraySerializer());
        var3_3 = null;
        try {
            Assert.assertEquals((Object)Sensor.RecordingLevel.DEBUG, (Object)producer.metrics.config().recordLevel());
        }
        catch (Throwable throwable) {
            var3_3 = throwable;
            throw throwable;
        }
        finally {
            if (producer != null) {
                if (var3_3 != null) {
                    try {
                        producer.close();
                    }
                    catch (Throwable x2) {
                        var3_3.addSuppressed(x2);
                    }
                } else {
                    producer.close();
                }
            }
        }
    }

    @PrepareOnlyThisForTest(value={Metadata.class})
    @Test
    public void testInterceptorPartitionSetOnTooLargeRecord() throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9999");
        props.setProperty("max.request.size", "1");
        String topic = "topic";
        ProducerRecord record = new ProducerRecord(topic, (Object)"value");
        KafkaProducer producer = new KafkaProducer(props, (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
        Metadata metadata = (Metadata)PowerMock.createNiceMock(Metadata.class);
        MemberModifier.field(KafkaProducer.class, (String)"metadata").set(producer, metadata);
        Cluster cluster = new Cluster("dummy", Collections.singletonList(new Node(0, "host1", 1000)), Arrays.asList(new PartitionInfo(topic, 0, null, null, null)), Collections.emptySet(), Collections.emptySet());
        EasyMock.expect((Object)metadata.fetch()).andReturn((Object)cluster).once();
        ProducerInterceptors interceptors = (ProducerInterceptors)PowerMock.createMock(ProducerInterceptors.class);
        EasyMock.expect((Object)interceptors.onSend(record)).andReturn((Object)record);
        interceptors.onSendError((ProducerRecord)EasyMock.eq((Object)record), (TopicPartition)EasyMock.notNull(), (Exception)EasyMock.notNull());
        EasyMock.expectLastCall();
        MemberModifier.field(KafkaProducer.class, (String)"interceptors").set(producer, interceptors);
        PowerMock.replay((Object[])new Object[]{metadata});
        EasyMock.replay((Object[])new Object[]{interceptors});
        producer.send(record);
        EasyMock.verify((Object[])new Object[]{interceptors});
    }

    @Test
    public void testPartitionsForWithNullTopic() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9000");
        try (KafkaProducer producer = new KafkaProducer(props, (Serializer)new ByteArraySerializer(), (Serializer)new ByteArraySerializer());){
            producer.partitionsFor(null);
            Assert.fail((String)"Expected NullPointerException to be raised");
        }
        catch (NullPointerException nullPointerException) {
            // empty catch block
        }
    }

    @Test
    public void testBatchExpiryMs() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:2002");
        props.setProperty("request.timeout.ms", "5");
        try (KafkaProducer producerWithDefaultBatchExpiry = new KafkaProducer(props, (Serializer)new StringSerializer(), (Serializer)new StringSerializer());){
            Assert.assertEquals((long)5L, (long)producerWithDefaultBatchExpiry.batchExpiryMs());
        }
        props.setProperty("confluent.batch.expiry.ms", "10");
        var3_3 = null;
        try (KafkaProducer producerWithConfiguredBatchExpiry = new KafkaProducer(props, (Serializer)new StringSerializer(), (Serializer)new StringSerializer());){
            Assert.assertEquals((long)10L, (long)producerWithConfiguredBatchExpiry.batchExpiryMs());
        }
        catch (Throwable throwable) {
            var3_3 = throwable;
            throw throwable;
        }
    }
}

