package org.apache.kafka.clients.producer;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Properties;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.AbstractConfigTest;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.ExtendedSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.MockMetricsReporter;
import org.apache.kafka.test.MockPartitioner;
import org.apache.kafka.test.MockProducerInterceptor;
import org.apache.kafka.test.MockSerializer;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.support.membermodification.MemberModifier;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareOnlyThisForTest;
import org.powermock.modules.junit4.PowerMockRunner;

@RunWith(PowerMockRunner.class)
@PowerMockIgnore({"javax.management.*"})
/* loaded from: input_file:org/apache/kafka/clients/producer/KafkaProducerTest.class */
public class KafkaProducerTest {
    @Test
    public void testConstructorWithSerializers() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9000");
        new KafkaProducer(properties, new ByteArraySerializer(), new ByteArraySerializer()).close();
    }

    @Test(expected = ConfigException.class)
    public void testNoSerializerProvided() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9000");
        new KafkaProducer(properties);
    }

    @Test
    public void testConstructorFailureCloseResource() {
        Properties properties = new Properties();
        properties.setProperty("client.id", "testConstructorClose");
        properties.setProperty("bootstrap.servers", "some.invalid.hostname.foo.bar.local:9999");
        properties.setProperty(AbstractConfigTest.TestConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
        int i = MockMetricsReporter.INIT_COUNT.get();
        int i2 = MockMetricsReporter.CLOSE_COUNT.get();
        try {
            new KafkaProducer(properties, new ByteArraySerializer(), new ByteArraySerializer());
            Assert.fail("should have caught an exception and returned");
        } catch (KafkaException e) {
            Assert.assertEquals(i + 1, MockMetricsReporter.INIT_COUNT.get());
            Assert.assertEquals(i2 + 1, MockMetricsReporter.CLOSE_COUNT.get());
            Assert.assertEquals("Failed to construct kafka producer", e.getMessage());
        }
    }

    @Test
    public void testSerializerClose() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put("client.id", "testConstructorClose");
        hashMap.put("bootstrap.servers", "localhost:9999");
        hashMap.put(AbstractConfigTest.TestConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
        hashMap.put("security.protocol", "PLAINTEXT");
        int i = MockSerializer.INIT_COUNT.get();
        int i2 = MockSerializer.CLOSE_COUNT.get();
        KafkaProducer kafkaProducer = new KafkaProducer(hashMap, new MockSerializer(), new MockSerializer());
        Assert.assertEquals(i + 2, MockSerializer.INIT_COUNT.get());
        Assert.assertEquals(i2, MockSerializer.CLOSE_COUNT.get());
        kafkaProducer.close();
        Assert.assertEquals(i + 2, MockSerializer.INIT_COUNT.get());
        Assert.assertEquals(i2 + 2, MockSerializer.CLOSE_COUNT.get());
    }

    @Test
    public void testInterceptorConstructClose() throws Exception {
        try {
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "localhost:9999");
            properties.setProperty("interceptor.classes", MockProducerInterceptor.class.getName());
            properties.setProperty(MockProducerInterceptor.APPEND_STRING_PROP, "something");
            KafkaProducer kafkaProducer = new KafkaProducer(properties, new StringSerializer(), new StringSerializer());
            Assert.assertEquals(1L, MockProducerInterceptor.INIT_COUNT.get());
            Assert.assertEquals(0L, MockProducerInterceptor.CLOSE_COUNT.get());
            Assert.assertNull(MockProducerInterceptor.CLUSTER_META.get());
            kafkaProducer.close();
            Assert.assertEquals(1L, MockProducerInterceptor.INIT_COUNT.get());
            Assert.assertEquals(1L, MockProducerInterceptor.CLOSE_COUNT.get());
        } finally {
            MockProducerInterceptor.resetCounters();
        }
    }

    @Test
    public void testPartitionerClose() throws Exception {
        try {
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "localhost:9999");
            properties.setProperty("partitioner.class", MockPartitioner.class.getName());
            KafkaProducer kafkaProducer = new KafkaProducer(properties, new StringSerializer(), new StringSerializer());
            Assert.assertEquals(1L, MockPartitioner.INIT_COUNT.get());
            Assert.assertEquals(0L, MockPartitioner.CLOSE_COUNT.get());
            kafkaProducer.close();
            Assert.assertEquals(1L, MockPartitioner.INIT_COUNT.get());
            Assert.assertEquals(1L, MockPartitioner.CLOSE_COUNT.get());
        } finally {
            MockPartitioner.resetCounters();
        }
    }

    @Test
    public void testOsDefaultSocketBufferSizes() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put("bootstrap.servers", "localhost:9999");
        hashMap.put("send.buffer.bytes", -1);
        hashMap.put("receive.buffer.bytes", -1);
        new KafkaProducer(hashMap, new ByteArraySerializer(), new ByteArraySerializer()).close();
    }

    @Test(expected = KafkaException.class)
    public void testInvalidSocketSendBufferSize() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put("bootstrap.servers", "localhost:9999");
        hashMap.put("send.buffer.bytes", -2);
        new KafkaProducer(hashMap, new ByteArraySerializer(), new ByteArraySerializer());
    }

    @Test(expected = KafkaException.class)
    public void testInvalidSocketReceiveBufferSize() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put("bootstrap.servers", "localhost:9999");
        hashMap.put("receive.buffer.bytes", -2);
        new KafkaProducer(hashMap, new ByteArraySerializer(), new ByteArraySerializer());
    }

    @PrepareOnlyThisForTest({Metadata.class})
    @Test
    public void testMetadataFetch() throws Exception {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9999");
        KafkaProducer kafkaProducer = new KafkaProducer(properties, new StringSerializer(), new StringSerializer());
        Metadata metadata = (Metadata) PowerMock.createNiceMock(Metadata.class);
        MemberModifier.field(KafkaProducer.class, "metadata").set(kafkaProducer, metadata);
        ProducerRecord producerRecord = new ProducerRecord("topic", "value");
        Cluster cluster = new Cluster((String) null, Collections.singletonList(new Node(0, "host1", 1000)), Collections.emptySet(), Collections.emptySet(), Collections.emptySet());
        Cluster cluster2 = new Cluster("dummy", Collections.singletonList(new Node(0, "host1", 1000)), Arrays.asList(new PartitionInfo("topic", 0, (Node) null, (Node[]) null, (Node[]) null)), Collections.emptySet(), Collections.emptySet());
        EasyMock.expect(metadata.fetch()).andReturn(cluster).times(4);
        EasyMock.expect(metadata.fetch()).andReturn(cluster2).once();
        EasyMock.expect(metadata.fetch()).andThrow(new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes();
        PowerMock.replay(new Object[]{metadata});
        kafkaProducer.send(producerRecord);
        PowerMock.verify(new Object[]{metadata});
        PowerMock.reset(new Object[]{metadata});
        EasyMock.expect(metadata.fetch()).andReturn(cluster2).once();
        EasyMock.expect(metadata.fetch()).andThrow(new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes();
        PowerMock.replay(new Object[]{metadata});
        kafkaProducer.send(producerRecord, (Callback) null);
        PowerMock.verify(new Object[]{metadata});
        PowerMock.reset(new Object[]{metadata});
        EasyMock.expect(metadata.fetch()).andReturn(cluster2).once();
        EasyMock.expect(metadata.fetch()).andThrow(new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes();
        PowerMock.replay(new Object[]{metadata});
        kafkaProducer.partitionsFor("topic");
        PowerMock.verify(new Object[]{metadata});
    }

    @PrepareOnlyThisForTest({Metadata.class})
    @Test
    public void testMetadataFetchOnStaleMetadata() throws Exception {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9999");
        KafkaProducer kafkaProducer = new KafkaProducer(properties, new StringSerializer(), new StringSerializer());
        Metadata metadata = (Metadata) PowerMock.createNiceMock(Metadata.class);
        MemberModifier.field(KafkaProducer.class, "metadata").set(kafkaProducer, metadata);
        ProducerRecord producerRecord = new ProducerRecord("topic", "value");
        ProducerRecord producerRecord2 = new ProducerRecord("topic", 2, (Object) null, "value");
        Cluster cluster = new Cluster((String) null, Collections.singletonList(new Node(0, "host1", 1000)), Collections.emptySet(), Collections.emptySet(), Collections.emptySet());
        Cluster cluster2 = new Cluster("dummy", Collections.singletonList(new Node(0, "host1", 1000)), Arrays.asList(new PartitionInfo("topic", 0, (Node) null, (Node[]) null, (Node[]) null)), Collections.emptySet(), Collections.emptySet());
        Cluster cluster3 = new Cluster("dummy", Collections.singletonList(new Node(0, "host1", 1000)), Arrays.asList(new PartitionInfo("topic", 0, (Node) null, (Node[]) null, (Node[]) null), new PartitionInfo("topic", 1, (Node) null, (Node[]) null, (Node[]) null), new PartitionInfo("topic", 2, (Node) null, (Node[]) null, (Node[]) null)), Collections.emptySet(), Collections.emptySet());
        EasyMock.expect(metadata.fetch()).andReturn(cluster).times(4);
        EasyMock.expect(metadata.fetch()).andReturn(cluster2).once();
        EasyMock.expect(metadata.fetch()).andThrow(new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes();
        PowerMock.replay(new Object[]{metadata});
        kafkaProducer.send(producerRecord);
        PowerMock.verify(new Object[]{metadata});
        PowerMock.reset(new Object[]{metadata});
        EasyMock.expect(metadata.fetch()).andReturn(cluster2).once();
        EasyMock.expect(metadata.fetch()).andThrow(new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes();
        PowerMock.replay(new Object[]{metadata});
        kafkaProducer.send(producerRecord, (Callback) null);
        PowerMock.verify(new Object[]{metadata});
        PowerMock.reset(new Object[]{metadata});
        EasyMock.expect(metadata.fetch()).andReturn(cluster2).once();
        EasyMock.expect(metadata.fetch()).andReturn(cluster2).once();
        EasyMock.expect(metadata.fetch()).andThrow(new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes();
        PowerMock.replay(new Object[]{metadata});
        try {
            kafkaProducer.send(producerRecord2, (Callback) null);
            Assert.fail("Expected KafkaException to be raised");
        } catch (KafkaException e) {
        }
        PowerMock.verify(new Object[]{metadata});
        PowerMock.reset(new Object[]{metadata});
        EasyMock.expect(metadata.fetch()).andReturn(cluster2).once();
        EasyMock.expect(metadata.fetch()).andReturn(cluster3).once();
        EasyMock.expect(metadata.fetch()).andThrow(new IllegalStateException("Unexpected call to metadata.fetch()")).anyTimes();
        PowerMock.replay(new Object[]{metadata});
        kafkaProducer.send(producerRecord2, (Callback) null);
        PowerMock.verify(new Object[]{metadata});
    }

    @Test
    public void testTopicRefreshInMetadata() throws Exception {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9999");
        properties.setProperty("max.block.ms", "600000");
        KafkaProducer kafkaProducer = new KafkaProducer(properties, new StringSerializer(), new StringSerializer());
        final Metadata metadata = new Metadata(500L, 60000L, true, true, new ClusterResourceListeners());
        final MockTime mockTime = new MockTime();
        MemberModifier.field(KafkaProducer.class, "metadata").set(kafkaProducer, metadata);
        MemberModifier.field(KafkaProducer.class, "time").set(kafkaProducer, mockTime);
        new Thread() { // from class: org.apache.kafka.clients.producer.KafkaProducerTest.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                long currentTimeMillis = System.currentTimeMillis();
                for (int i = 0; i < 10; i++) {
                    while (!metadata.updateRequested() && System.currentTimeMillis() - currentTimeMillis < 1000) {
                        yield();
                    }
                    metadata.update(Cluster.empty(), Collections.singleton("topic"), mockTime.milliseconds());
                    mockTime.sleep(60000L);
                }
            }
        }.start();
        try {
            kafkaProducer.partitionsFor("topic");
            Assert.fail("Expect TimeoutException");
        } catch (TimeoutException e) {
        }
        Assert.assertTrue("Topic should still exist in metadata", metadata.containsTopic("topic"));
    }

    @PrepareOnlyThisForTest({Metadata.class})
    @Test
    public void testHeaders() throws Exception {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9999");
        ExtendedSerializer extendedSerializer = (ExtendedSerializer) PowerMock.createNiceMock(ExtendedSerializer.class);
        ExtendedSerializer extendedSerializer2 = (ExtendedSerializer) PowerMock.createNiceMock(ExtendedSerializer.class);
        KafkaProducer kafkaProducer = new KafkaProducer(properties, extendedSerializer, extendedSerializer2);
        Metadata metadata = (Metadata) PowerMock.createNiceMock(Metadata.class);
        MemberModifier.field(KafkaProducer.class, "metadata").set(kafkaProducer, metadata);
        Collections.singletonList(new Node(0, "host1", 1000));
        EasyMock.expect(metadata.fetch()).andReturn(new Cluster("dummy", Collections.singletonList(new Node(0, "host1", 1000)), Arrays.asList(new PartitionInfo("topic", 0, (Node) null, (Node[]) null, (Node[]) null)), Collections.emptySet(), Collections.emptySet())).anyTimes();
        PowerMock.replay(new Object[]{metadata});
        ProducerRecord producerRecord = new ProducerRecord("topic", "value");
        EasyMock.expect(extendedSerializer.serialize("topic", producerRecord.headers(), (Object) null)).andReturn((Object) null).once();
        EasyMock.expect(extendedSerializer2.serialize("topic", producerRecord.headers(), "value")).andReturn("value".getBytes()).once();
        PowerMock.replay(new Object[]{extendedSerializer});
        PowerMock.replay(new Object[]{extendedSerializer2});
        producerRecord.headers().add(new RecordHeader("test", "header2".getBytes()));
        kafkaProducer.send(producerRecord, (Callback) null);
        try {
            producerRecord.headers().add(new RecordHeader("test", "test".getBytes()));
            Assert.fail("Expected IllegalStateException to be raised");
        } catch (IllegalStateException e) {
        }
        Assert.assertTrue(Arrays.equals(producerRecord.headers().lastHeader("test").value(), "header2".getBytes()));
        PowerMock.verify(new Object[]{extendedSerializer2});
        PowerMock.verify(new Object[]{extendedSerializer});
    }

    @Test
    public void closeShouldBeIdempotent() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9000");
        KafkaProducer kafkaProducer = new KafkaProducer(properties, new ByteArraySerializer(), new ByteArraySerializer());
        kafkaProducer.close();
        kafkaProducer.close();
    }

    @Test
    public void testMetricConfigRecordingLevel() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9000");
        KafkaProducer kafkaProducer = new KafkaProducer(properties, new ByteArraySerializer(), new ByteArraySerializer());
        Throwable th = null;
        try {
            try {
                Assert.assertEquals(Sensor.RecordingLevel.INFO, kafkaProducer.metrics.config().recordLevel());
                if (kafkaProducer != null) {
                    if (0 != 0) {
                        try {
                            kafkaProducer.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        kafkaProducer.close();
                    }
                }
                properties.put("metrics.recording.level", "DEBUG");
                KafkaProducer kafkaProducer2 = new KafkaProducer(properties, new ByteArraySerializer(), new ByteArraySerializer());
                Throwable th3 = null;
                try {
                    Assert.assertEquals(Sensor.RecordingLevel.DEBUG, kafkaProducer2.metrics.config().recordLevel());
                    if (kafkaProducer2 != null) {
                        if (0 == 0) {
                            kafkaProducer2.close();
                            return;
                        }
                        try {
                            kafkaProducer2.close();
                        } catch (Throwable th4) {
                            th3.addSuppressed(th4);
                        }
                    }
                } catch (Throwable th5) {
                    if (kafkaProducer2 != null) {
                        if (0 != 0) {
                            try {
                                kafkaProducer2.close();
                            } catch (Throwable th6) {
                                th3.addSuppressed(th6);
                            }
                        } else {
                            kafkaProducer2.close();
                        }
                    }
                    throw th5;
                }
            } catch (Throwable th7) {
                th = th7;
                throw th7;
            }
        } catch (Throwable th8) {
            if (kafkaProducer != null) {
                if (th != null) {
                    try {
                        kafkaProducer.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    kafkaProducer.close();
                }
            }
            throw th8;
        }
    }

    @PrepareOnlyThisForTest({Metadata.class})
    @Test
    public void testInterceptorPartitionSetOnTooLargeRecord() throws Exception {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9999");
        properties.setProperty("max.request.size", "1");
        ProducerRecord producerRecord = new ProducerRecord("topic", "value");
        KafkaProducer kafkaProducer = new KafkaProducer(properties, new StringSerializer(), new StringSerializer());
        Metadata metadata = (Metadata) PowerMock.createNiceMock(Metadata.class);
        MemberModifier.field(KafkaProducer.class, "metadata").set(kafkaProducer, metadata);
        EasyMock.expect(metadata.fetch()).andReturn(new Cluster("dummy", Collections.singletonList(new Node(0, "host1", 1000)), Arrays.asList(new PartitionInfo("topic", 0, (Node) null, (Node[]) null, (Node[]) null)), Collections.emptySet(), Collections.emptySet())).once();
        ProducerInterceptors producerInterceptors = (ProducerInterceptors) PowerMock.createMock(ProducerInterceptors.class);
        EasyMock.expect(producerInterceptors.onSend(producerRecord)).andReturn(producerRecord);
        producerInterceptors.onSendError((ProducerRecord) EasyMock.eq(producerRecord), (TopicPartition) EasyMock.notNull(), (Exception) EasyMock.notNull());
        EasyMock.expectLastCall();
        MemberModifier.field(KafkaProducer.class, "interceptors").set(kafkaProducer, producerInterceptors);
        PowerMock.replay(new Object[]{metadata});
        EasyMock.replay(new Object[]{producerInterceptors});
        kafkaProducer.send(producerRecord);
        EasyMock.verify(new Object[]{producerInterceptors});
    }

    @Test
    public void testPartitionsForWithNullTopic() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9000");
        try {
            KafkaProducer kafkaProducer = new KafkaProducer(properties, new ByteArraySerializer(), new ByteArraySerializer());
            Throwable th = null;
            try {
                try {
                    kafkaProducer.partitionsFor((String) null);
                    Assert.fail("Expected NullPointerException to be raised");
                    if (kafkaProducer != null) {
                        if (0 != 0) {
                            try {
                                kafkaProducer.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            kafkaProducer.close();
                        }
                    }
                } finally {
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (NullPointerException e) {
        }
    }

    @Test
    public void testBatchExpiryMs() {
        Throwable th;
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:2002");
        properties.setProperty("request.timeout.ms", "5");
        KafkaProducer kafkaProducer = new KafkaProducer(properties, new StringSerializer(), new StringSerializer());
        Throwable th2 = null;
        try {
            try {
                Assert.assertEquals(5L, kafkaProducer.batchExpiryMs());
                if (kafkaProducer != null) {
                    if (0 != 0) {
                        try {
                            kafkaProducer.close();
                        } catch (Throwable th3) {
                            th2.addSuppressed(th3);
                        }
                    } else {
                        kafkaProducer.close();
                    }
                }
                properties.setProperty("confluent.batch.expiry.ms", "10");
                kafkaProducer = new KafkaProducer(properties, new StringSerializer(), new StringSerializer());
                th = null;
            } catch (Throwable th4) {
                th2 = th4;
                throw th4;
            }
            try {
                try {
                    Assert.assertEquals(10L, kafkaProducer.batchExpiryMs());
                    if (kafkaProducer != null) {
                        if (0 == 0) {
                            kafkaProducer.close();
                            return;
                        }
                        try {
                            kafkaProducer.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    }
                } catch (Throwable th6) {
                    th = th6;
                    throw th6;
                }
            } finally {
            }
        } finally {
        }
    }
}
