package io.confluent.kafka.schemaregistry.storage;

import io.confluent.kafka.schemaregistry.ClusterTestHarness;
import io.confluent.kafka.schemaregistry.id.IncrementalIdGenerator;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException;
import io.confluent.kafka.schemaregistry.storage.serialization.SchemaRegistrySerializer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/kafka/schemaregistry/storage/KafkaStoreTest.class */
public class KafkaStoreTest extends ClusterTestHarness {
    /** Wait limit, in seconds, for admin-client calls; the topic tests in this class wait 60 seconds. */
    private static final int ADMIN_TIMEOUT_SEC = 60;
    /** Logger for this test class. */
    private static final Logger log = LoggerFactory.getLogger(KafkaStoreTest.class);
    /** Partition 0 of the KafkaStore topic; the message-handler tests look up checkpoints by this key. */
    private static final TopicPartition tp = new TopicPartition(ClusterTestHarness.KAFKASTORE_TOPIC, 0);

    /**
     * Creates and initializes a store against the harness broker list and closes it again;
     * the test passes when neither step throws.
     */
    @Test
    public void testInitialization() throws Exception {
        KafkaStore<String, String> store = StoreUtils.createAndInitKafkaStoreInstance(this.brokerList);
        store.close();
    }

    /**
     * Verifies that calling {@code init()} on an already initialized store is rejected
     * with a {@link StoreInitializationException}.
     */
    @Test
    public void testDoubleInitialization() throws Exception {
        // Create the store outside assertThrows: if the first initialization fails,
        // that must surface as a test error rather than satisfy the assertion.
        KafkaStore<String, String> store = StoreUtils.createAndInitKafkaStoreInstance(this.brokerList);
        try {
            // Only the second, intentional init() call is expected to throw.
            Assertions.assertThrows(StoreInitializationException.class, store::init);
        } finally {
            store.close();
        }
    }

    /**
     * Writes a single key/value pair and checks that reading the key back yields the
     * value that was written.
     */
    @Test
    public void testSimplePut() throws Exception {
        KafkaStore<String, String> store = StoreUtils.createAndInitKafkaStoreInstance(this.brokerList);
        try {
            store.put("Kafka", "Rocks");
            Assertions.assertEquals("Rocks", store.get("Kafka"), "Retrieved value should match entered value");
        } finally {
            // Close exactly once, whether or not the put/get/assert above succeeded.
            store.close();
        }
    }

    /**
     * Writes a value, closes the store, then builds a second store on top of the same
     * in-memory cache and checks that the value can still be read.
     *
     * <p>Store failures are rethrown as {@link RuntimeException} with a message naming
     * the failed operation.
     */
    @Test
    public void testSimpleGetAfterFailure() throws Exception {
        InMemoryCache<String, String> cache = new InMemoryCache<>(StringSerializer.INSTANCE);

        // Phase 1: write and read back through the first store instance.
        KafkaStore<String, String> store = StoreUtils.createAndInitKafkaStoreInstance(this.brokerList, cache);
        try {
            try {
                store.put("Kafka", "Rocks");
            } catch (StoreException e) {
                throw new RuntimeException("Kafka store put(Kafka, Rocks) operation failed", e);
            }
            String fetched;
            try {
                fetched = store.get("Kafka");
            } catch (StoreException e) {
                throw new RuntimeException("Kafka store get(Kafka) operation failed", e);
            }
            Assertions.assertEquals("Rocks", fetched, "Retrieved value should match entered value");
        } finally {
            // Always release the first instance, even when an assertion fails.
            store.close();
        }

        // Phase 2: recreate the store over the same cache and read the value again.
        store = StoreUtils.createAndInitKafkaStoreInstance(this.brokerList, cache);
        try {
            String fetchedAfterRestart;
            try {
                fetchedAfterRestart = store.get("Kafka");
            } catch (StoreException e) {
                throw new RuntimeException("Kafka store get(Kafka) operation failed", e);
            }
            Assertions.assertEquals("Rocks", fetchedAfterRestart, "Retrieved value should match entered value");
        } finally {
            store.close();
        }
    }

    /**
     * Writes a value, confirms it is readable, deletes the key and confirms that a
     * subsequent read returns {@code null}.
     *
     * <p>Store failures are rethrown as {@link RuntimeException} with a message naming
     * the failed operation.
     */
    @Test
    public void testSimpleDelete() throws Exception {
        KafkaStore<String, String> store = StoreUtils.createAndInitKafkaStoreInstance(this.brokerList);
        try {
            try {
                store.put("Kafka", "Rocks");
            } catch (StoreException e) {
                throw new RuntimeException("Kafka store put(Kafka, Rocks) operation failed", e);
            }

            String beforeDelete;
            try {
                beforeDelete = store.get("Kafka");
            } catch (StoreException e) {
                throw new RuntimeException("Kafka store get(Kafka) operation failed", e);
            }
            Assertions.assertEquals("Rocks", beforeDelete, "Retrieved value should match entered value");

            try {
                store.delete("Kafka");
            } catch (StoreException e) {
                throw new RuntimeException("Kafka store delete(Kafka) operation failed", e);
            }

            String afterDelete;
            try {
                afterDelete = store.get("Kafka");
            } catch (StoreException e) {
                throw new RuntimeException("Kafka store get(Kafka) operation failed", e);
            }
            Assertions.assertNull(afterDelete, "Value should have been deleted");
        } finally {
            // Single close on every path; a failing close is no longer retried.
            store.close();
        }
    }

    /**
     * Writes and deletes a key, closes the store, then builds a second store over the
     * same in-memory cache and checks that the key is still absent.
     *
     * <p>Store failures are rethrown as {@link RuntimeException} with a message naming
     * the failed operation.
     */
    @Test
    public void testDeleteAfterRestart() throws Exception {
        InMemoryCache<String, String> cache = new InMemoryCache<>(StringSerializer.INSTANCE);

        // Phase 1: put, verify, delete and verify through the first store instance.
        KafkaStore<String, String> store = StoreUtils.createAndInitKafkaStoreInstance(this.brokerList, cache);
        try {
            try {
                store.put("Kafka", "Rocks");
            } catch (StoreException e) {
                throw new RuntimeException("Kafka store put(Kafka, Rocks) operation failed", e);
            }

            String beforeDelete;
            try {
                beforeDelete = store.get("Kafka");
            } catch (StoreException e) {
                throw new RuntimeException("Kafka store get(Kafka) operation failed", e);
            }
            Assertions.assertEquals("Rocks", beforeDelete, "Retrieved value should match entered value");

            try {
                store.delete("Kafka");
            } catch (StoreException e) {
                throw new RuntimeException("Kafka store delete(Kafka) operation failed", e);
            }

            String afterDelete;
            try {
                afterDelete = store.get("Kafka");
            } catch (StoreException e) {
                throw new RuntimeException("Kafka store get(Kafka) operation failed", e);
            }
            Assertions.assertNull(afterDelete, "Value should have been deleted");
        } finally {
            store.close();
        }

        // Phase 2: recreate the store over the same cache. Each instance is closed
        // in its own finally so that a failed re-creation never re-closes the first one.
        store = StoreUtils.createAndInitKafkaStoreInstance(this.brokerList, cache);
        try {
            String afterRestart;
            try {
                afterRestart = store.get("Kafka");
            } catch (StoreException e) {
                throw new RuntimeException("Kafka store get(Kafka) operation failed", e);
            }
            Assertions.assertNull(afterRestart, "Value should have been deleted");
        } finally {
            store.close();
        }
    }

    /**
     * Verifies that a {@code kafkastore.group.id} setting is passed through to the
     * {@code group.id} consumer property of the store's reader thread.
     */
    @Test
    public void testCustomGroupIdConfig() throws Exception {
        InMemoryCache<String, String> cache = new InMemoryCache<>(StringSerializer.INSTANCE);
        Properties storeProps = new Properties();
        storeProps.put("kafkastore.group.id", "test-group-id");

        KafkaStore<String, String> store =
            StoreUtils.createAndInitKafkaStoreInstance(this.brokerList, cache, storeProps);
        try {
            // Expected value first, so a failure message labels the two sides correctly.
            Assertions.assertEquals(
                "test-group-id",
                store.getKafkaStoreReaderThread().getConsumerProperty("group.id"));
        } finally {
            // The store was initialized above and must be released.
            store.close();
        }
    }

    /**
     * Verifies that, with no group id configured, the reader thread's {@code group.id}
     * consumer property starts with {@code "schema-registry-"}.
     */
    @Test
    public void testDefaultGroupIdConfig() throws Exception {
        InMemoryCache<String, String> cache = new InMemoryCache<>(StringSerializer.INSTANCE);

        KafkaStore<String, String> store =
            StoreUtils.createAndInitKafkaStoreInstance(this.brokerList, cache, new Properties());
        try {
            String groupId = store.getKafkaStoreReaderThread().getConsumerProperty("group.id");
            Assertions.assertTrue(groupId.startsWith("schema-registry-"));
        } finally {
            // The store was initialized above and must be released.
            store.close();
        }
    }

    /**
     * Pre-creates the KafkaStore topic with {@code cleanup.policy=delete} and verifies
     * that initializing a store on top of it fails with a
     * {@link StoreInitializationException}.
     */
    @Test
    public void testMandatoryCompactionPolicy() throws Exception {
        Map<String, String> topicConfigs = new HashMap<>();
        topicConfigs.put("cleanup.policy", "delete");
        NewTopic topic = new NewTopic(ClusterTestHarness.KAFKASTORE_TOPIC, 1, (short) 1);
        topic.configs(topicConfigs);

        Properties adminProps = new Properties();
        adminProps.put("bootstrap.servers", this.brokerList);
        // Topic setup is done outside assertThrows so that only the store
        // initialization can satisfy the expected exception.
        try (AdminClient admin = AdminClient.create(adminProps)) {
            admin.createTopics(Collections.singletonList(topic)).all().get(ADMIN_TIMEOUT_SEC, TimeUnit.SECONDS);
        }

        InMemoryCache<String, String> cache = new InMemoryCache<>(StringSerializer.INSTANCE);
        Properties storeProps = new Properties();
        Assertions.assertThrows(StoreInitializationException.class, () ->
            // close() only runs if initialization unexpectedly succeeds, so a
            // failing test does not leave a running store behind.
            StoreUtils.createAndInitKafkaStoreInstance(this.brokerList, cache, storeProps).close());
    }

    /**
     * Pre-creates the KafkaStore topic with three partitions (and
     * {@code cleanup.policy=compact}) and verifies that initializing a store on top of
     * it fails with a {@link StoreInitializationException}.
     */
    @Test
    public void testTooManyPartitions() throws Exception {
        Map<String, String> topicConfigs = new HashMap<>();
        topicConfigs.put("cleanup.policy", "compact");
        NewTopic topic = new NewTopic(ClusterTestHarness.KAFKASTORE_TOPIC, 3, (short) 1);
        topic.configs(topicConfigs);

        Properties adminProps = new Properties();
        adminProps.put("bootstrap.servers", this.brokerList);
        // Topic setup is done outside assertThrows so that only the store
        // initialization can satisfy the expected exception.
        try (AdminClient admin = AdminClient.create(adminProps)) {
            admin.createTopics(Collections.singletonList(topic)).all().get(ADMIN_TIMEOUT_SEC, TimeUnit.SECONDS);
        }

        InMemoryCache<String, String> cache = new InMemoryCache<>(StringSerializer.INSTANCE);
        Properties storeProps = new Properties();
        Assertions.assertThrows(StoreInitializationException.class, () ->
            // close() only runs if initialization unexpectedly succeeds, so a
            // failing test does not leave a running store behind.
            StoreUtils.createAndInitKafkaStoreInstance(this.brokerList, cache, storeProps).close());
    }

    /**
     * Initializes a store with two {@code kafkastore.topic.config.*} settings and
     * verifies, via the admin client, that the KafkaStore topic reports
     * {@code delete.retention.ms} and {@code segment.ms} as {@code "10000"}.
     */
    @Test
    public void testTopicAdditionalConfigs() throws Exception {
        Properties storeProps = new Properties();
        storeProps.put("kafkastore.topic.config.delete.retention.ms", "10000");
        storeProps.put("kafkastore.topic.config.segment.ms", "10000");
        InMemoryCache<String, String> cache = new InMemoryCache<>(StringSerializer.INSTANCE);

        KafkaStore<String, String> store =
            StoreUtils.createAndInitKafkaStoreInstance(this.brokerList, cache, storeProps);
        try {
            Properties adminProps = new Properties();
            adminProps.put("bootstrap.servers", this.brokerList);
            ConfigResource topicResource =
                new ConfigResource(ConfigResource.Type.TOPIC, ClusterTestHarness.KAFKASTORE_TOPIC);

            // Fetch the topic configuration, then release the admin client before asserting.
            Map<ConfigResource, Config> described;
            try (AdminClient admin = AdminClient.create(adminProps)) {
                described = admin.describeConfigs(Collections.singleton(topicResource))
                    .all()
                    .get(ADMIN_TIMEOUT_SEC, TimeUnit.SECONDS);
            }

            Config topicConfig = described.get(topicResource);
            Assertions.assertNotNull(topicConfig.get("delete.retention.ms"));
            Assertions.assertEquals("10000", topicConfig.get("delete.retention.ms").value());
            Assertions.assertNotNull(topicConfig.get("segment.ms"));
            Assertions.assertEquals("10000", topicConfig.get("segment.ms").value());
        } finally {
            // The store was initialized above and must be released.
            store.close();
        }
    }

    /**
     * With {@code ssl.endpoint.identification.algorithm} left unset, the registry's
     * hostname verifier is expected to accept an empty host name with a null session.
     */
    @Test
    public void testGetAlwaysTrueHostnameVerifierWhenSslEndpointIdentificationAlgorithmIsNotSet() throws Exception {
        Properties registryProps = new Properties();
        registryProps.put("kafkastore.bootstrap.servers", this.brokerList);
        registryProps.put("kafkastore.topic", ClusterTestHarness.KAFKASTORE_TOPIC);

        SchemaRegistryConfig config = new SchemaRegistryConfig(registryProps);
        KafkaSchemaRegistry registry = new KafkaSchemaRegistry(config, new SchemaRegistrySerializer());

        boolean accepted = registry.getHostnameVerifier().verify("", null);
        Assertions.assertTrue(accepted);
    }

    /**
     * With {@code ssl.endpoint.identification.algorithm} set to {@code "none"}, the
     * registry's hostname verifier is expected to accept an empty host name with a
     * null session.
     */
    @Test
    public void testGetAlwaysTrueHostnameVerifierWhenSslEndpointIdentificationAlgorithmIsNone() throws Exception {
        Properties registryProps = new Properties();
        registryProps.put("kafkastore.bootstrap.servers", this.brokerList);
        registryProps.put("kafkastore.topic", ClusterTestHarness.KAFKASTORE_TOPIC);
        registryProps.put("ssl.endpoint.identification.algorithm", "none");

        SchemaRegistryConfig config = new SchemaRegistryConfig(registryProps);
        KafkaSchemaRegistry registry = new KafkaSchemaRegistry(config, new SchemaRegistrySerializer());

        boolean accepted = registry.getHostnameVerifier().verify("", null);
        Assertions.assertTrue(accepted);
    }

    /**
     * With {@code ssl.endpoint.identification.algorithm} set to the empty string, the
     * registry's hostname verifier is expected to accept an empty host name with a
     * null session.
     */
    @Test
    public void testGetAlwaysTrueHostnameVerifierWhenSslEndpointIdentificationAlgorithmIsEmptyString() throws Exception {
        Properties registryProps = new Properties();
        registryProps.put("kafkastore.bootstrap.servers", this.brokerList);
        registryProps.put("kafkastore.topic", ClusterTestHarness.KAFKASTORE_TOPIC);
        registryProps.put("ssl.endpoint.identification.algorithm", "");

        SchemaRegistryConfig config = new SchemaRegistryConfig(registryProps);
        KafkaSchemaRegistry registry = new KafkaSchemaRegistry(config, new SchemaRegistrySerializer());

        boolean accepted = registry.getHostnameVerifier().verify("", null);
        Assertions.assertTrue(accepted);
    }

    /**
     * With {@code ssl.endpoint.identification.algorithm} set to {@code "https"}, the
     * registry is expected to expose no hostname verifier at all ({@code null}).
     */
    @Test
    public void testGetNullHostnameVerifierWhenSslEndpointIdentificationAlgorithmIsHttps() throws Exception {
        Properties registryProps = new Properties();
        registryProps.put("kafkastore.bootstrap.servers", this.brokerList);
        registryProps.put("kafkastore.topic", ClusterTestHarness.KAFKASTORE_TOPIC);
        registryProps.put("ssl.endpoint.identification.algorithm", "https");

        SchemaRegistryConfig config = new SchemaRegistryConfig(registryProps);
        KafkaSchemaRegistry registry = new KafkaSchemaRegistry(config, new SchemaRegistrySerializer());

        Assertions.assertNull(registry.getHostnameVerifier());
    }

    /**
     * Puts two schema values that share id 100 but carry different schema strings
     * (under two different subjects) and expects exactly one key to be present in the
     * store afterwards.
     */
    @Test
    public void testKafkaStoreMessageHandlerSameIdDifferentSchema() throws Exception {
        Properties registryProps = new Properties();
        registryProps.put("kafkastore.bootstrap.servers", this.brokerList);
        registryProps.put("kafkastore.topic", ClusterTestHarness.KAFKASTORE_TOPIC);
        KafkaSchemaRegistry registry =
            new KafkaSchemaRegistry(new SchemaRegistryConfig(registryProps), new SchemaRegistrySerializer());

        KafkaStore<SchemaRegistryKey, SchemaRegistryValue> store = registry.kafkaStore;
        store.init();
        try {
            store.put(new SchemaKey("subject", 1), new SchemaValue("subject", 1, 100, "schemaString", false));
            store.put(new SchemaKey("subject2", 1), new SchemaValue("subject2", 1, 100, "schemaString2", false));

            int keyCount = 0;
            // try-with-resources closes the iterator even if hasNext()/next() throws.
            try (CloseableIterator<SchemaRegistryKey> keys = store.getAllKeys()) {
                while (keys.hasNext()) {
                    keys.next();
                    keyCount++;
                }
            }
            Assertions.assertEquals(1, keyCount);
        } finally {
            // The store was initialized above and must be released.
            store.close();
        }
    }

    /**
     * Puts two schema values that share id 100 and the same schema string (under two
     * different subjects) and expects both keys to be present in the store afterwards.
     */
    @Test
    public void testKafkaStoreMessageHandlerSameIdSameSchema() throws Exception {
        Properties registryProps = new Properties();
        registryProps.put("kafkastore.bootstrap.servers", this.brokerList);
        registryProps.put("kafkastore.topic", ClusterTestHarness.KAFKASTORE_TOPIC);
        KafkaSchemaRegistry registry =
            new KafkaSchemaRegistry(new SchemaRegistryConfig(registryProps), new SchemaRegistrySerializer());

        KafkaStore<SchemaRegistryKey, SchemaRegistryValue> store = registry.kafkaStore;
        store.init();
        try {
            store.put(new SchemaKey("subject", 1), new SchemaValue("subject", 1, 100, "schemaString", false));
            store.put(new SchemaKey("subject2", 1), new SchemaValue("subject2", 1, 100, "schemaString", false));

            int keyCount = 0;
            // try-with-resources closes the iterator even if hasNext()/next() throws.
            try (CloseableIterator<SchemaRegistryKey> keys = store.getAllKeys()) {
                while (keys.hasNext()) {
                    keys.next();
                    keyCount++;
                }
            }
            Assertions.assertEquals(2, keyCount);
        } finally {
            // The store was initialized above and must be released.
            store.close();
        }
    }

    /**
     * Puts a schema value with id 100, overwrites it with the same value whose last
     * constructor flag is {@code true} (the "Deleted" case in the test name), then puts
     * a different schema string under the same id for another subject. Exactly one key
     * is expected to be present in the store afterwards.
     */
    @Test
    public void testKafkaStoreMessageHandlerSameIdDifferentDeletedSchema() throws Exception {
        Properties registryProps = new Properties();
        registryProps.put("kafkastore.bootstrap.servers", this.brokerList);
        registryProps.put("kafkastore.topic", ClusterTestHarness.KAFKASTORE_TOPIC);
        KafkaSchemaRegistry registry =
            new KafkaSchemaRegistry(new SchemaRegistryConfig(registryProps), new SchemaRegistrySerializer());

        KafkaStore<SchemaRegistryKey, SchemaRegistryValue> store = registry.kafkaStore;
        store.init();
        try {
            store.put(new SchemaKey("subject", 1), new SchemaValue("subject", 1, 100, "schemaString", false));
            store.put(new SchemaKey("subject", 1), new SchemaValue("subject", 1, 100, "schemaString", true));
            store.put(new SchemaKey("subject2", 1), new SchemaValue("subject2", 1, 100, "schemaString2", false));

            int keyCount = 0;
            // try-with-resources closes the iterator even if hasNext()/next() throws.
            try (CloseableIterator<SchemaRegistryKey> keys = store.getAllKeys()) {
                while (keys.hasNext()) {
                    keys.next();
                    keyCount++;
                }
            }
            Assertions.assertEquals(1, keyCount);
        } finally {
            // The store was initialized above and must be released.
            store.close();
        }
    }

    /**
     * Puts a schema value with id 100, overwrites it with the same value whose last
     * constructor flag is {@code true} (the "Deleted" case in the test name), then puts
     * the same schema string under the same id for another subject. Both keys are
     * expected to be present in the store afterwards.
     */
    @Test
    public void testKafkaStoreMessageHandlerSameIdSameDeletedSchema() throws Exception {
        Properties registryProps = new Properties();
        registryProps.put("kafkastore.bootstrap.servers", this.brokerList);
        registryProps.put("kafkastore.topic", ClusterTestHarness.KAFKASTORE_TOPIC);
        KafkaSchemaRegistry registry =
            new KafkaSchemaRegistry(new SchemaRegistryConfig(registryProps), new SchemaRegistrySerializer());

        KafkaStore<SchemaRegistryKey, SchemaRegistryValue> store = registry.kafkaStore;
        store.init();
        try {
            store.put(new SchemaKey("subject", 1), new SchemaValue("subject", 1, 100, "schemaString", false));
            store.put(new SchemaKey("subject", 1), new SchemaValue("subject", 1, 100, "schemaString", true));
            store.put(new SchemaKey("subject2", 1), new SchemaValue("subject2", 1, 100, "schemaString", false));

            int keyCount = 0;
            // try-with-resources closes the iterator even if hasNext()/next() throws.
            try (CloseableIterator<SchemaRegistryKey> keys = store.getAllKeys()) {
                while (keys.hasNext()) {
                    keys.next();
                    keyCount++;
                }
            }
            Assertions.assertEquals(2, keyCount);
        } finally {
            // The store was initialized above and must be released.
            store.close();
        }
    }

    /**
     * Feeds the message handler a {@code DeleteSubjectKey} update whose new and old
     * values are both {@code null} at offset 0, then expects the checkpoint for the
     * topic partition to be 1.
     */
    @Test
    public void testKafkaStoreMessageHandlerDeleteSubjectKeyNullValue() throws Exception {
        Properties registryProps = new Properties();
        registryProps.put("kafkastore.bootstrap.servers", this.brokerList);
        registryProps.put("kafkastore.topic", ClusterTestHarness.KAFKASTORE_TOPIC);
        SchemaRegistryConfig config = new SchemaRegistryConfig(registryProps);
        KafkaSchemaRegistry registry = new KafkaSchemaRegistry(config, new SchemaRegistrySerializer());

        InMemoryCache cache = new InMemoryCache(new SchemaRegistrySerializer());
        cache.init();
        IncrementalIdGenerator idGenerator = new IncrementalIdGenerator(registry);
        KafkaStoreMessageHandler handler = new KafkaStoreMessageHandler(registry, cache, idGenerator);

        // Typed null so the call binds to the same handleUpdate signature as before.
        SchemaRegistryValue absent = null;
        handler.handleUpdate(new DeleteSubjectKey("test"), absent, absent, tp, 0L, 0L);

        Long checkpointed = (Long) handler.checkpoint(1).get(tp);
        Assertions.assertEquals(1L, checkpointed);
    }

    /**
     * Feeds the message handler a {@code ClearSubjectKey} update whose new and old
     * values are both {@code null} at offset 0, then expects the checkpoint for the
     * topic partition to be 1.
     */
    @Test
    public void testKafkaStoreMessageHandlerClearSubjectKeyNullValue() throws Exception {
        Properties registryProps = new Properties();
        registryProps.put("kafkastore.bootstrap.servers", this.brokerList);
        registryProps.put("kafkastore.topic", ClusterTestHarness.KAFKASTORE_TOPIC);
        SchemaRegistryConfig config = new SchemaRegistryConfig(registryProps);
        KafkaSchemaRegistry registry = new KafkaSchemaRegistry(config, new SchemaRegistrySerializer());

        InMemoryCache cache = new InMemoryCache(new SchemaRegistrySerializer());
        cache.init();
        IncrementalIdGenerator idGenerator = new IncrementalIdGenerator(registry);
        KafkaStoreMessageHandler handler = new KafkaStoreMessageHandler(registry, cache, idGenerator);

        // Typed null so the call binds to the same handleUpdate signature as before.
        SchemaRegistryValue absent = null;
        handler.handleUpdate(new ClearSubjectKey("test"), absent, absent, tp, 0L, 0L);

        Long checkpointed = (Long) handler.checkpoint(1).get(tp);
        Assertions.assertEquals(1L, checkpointed);
    }
}
