diff --git a/buildSrc/src/main/groovy/com.flipkart.varadhi.java-common-conventions.gradle b/buildSrc/src/main/groovy/com.flipkart.varadhi.java-common-conventions.gradle index 749bd7fc..b583b968 100644 --- a/buildSrc/src/main/groovy/com.flipkart.varadhi.java-common-conventions.gradle +++ b/buildSrc/src/main/groovy/com.flipkart.varadhi.java-common-conventions.gradle @@ -3,6 +3,7 @@ import org.gradle.api.tasks.testing.logging.TestExceptionFormat plugins { id 'idea' id 'java' + id 'jacoco' } group = projectGroupId @@ -33,12 +34,6 @@ sourceSets { } } -tasks.register('testE2E', Test) { - testClassesDirs = sourceSets.testE2E.output.classesDirs - classpath = sourceSets.testE2E.runtimeClasspath - outputs.upToDateWhen { false } //this will force testE2E to execute always on target invocation. -} - configurations { testE2EImplementation.extendsFrom testImplementation testE2ERuntime.extendsFrom testRuntime @@ -79,6 +74,7 @@ dependencies { implementation("io.micrometer:micrometer-registry-jmx:$micrometer_version") testImplementation('org.junit.jupiter:junit-jupiter:5.9.1') + testImplementation('org.mockito:mockito-junit-jupiter:5.4.0') testImplementation("io.vertx:vertx-junit5:$vertx_version") testE2EImplementation("javax.ws.rs:javax.ws.rs-api:$ws_rs_version") @@ -96,6 +92,7 @@ dependencies { // common test framework testImplementation('org.junit.jupiter:junit-jupiter') + testImplementation('org.mockito:mockito-junit-jupiter') } java { @@ -104,19 +101,32 @@ java { } } +tasks.register('testE2E', Test) { + testClassesDirs = sourceSets.testE2E.output.classesDirs + classpath = sourceSets.testE2E.runtimeClasspath + outputs.upToDateWhen { false } //this will force testE2E to execute always on target invocation. +} + +tasks.register('copyDependencies', Copy) { + into layout.buildDirectory.dir('dependencies') + from configurations.runtimeClasspath +} + tasks.withType(JavaCompile).configureEach { options.release.set(17) } - tasks.withType(JavaCompile).configureEach { options.compilerArgs.add("-parameters") } - -tasks.register('copyDependencies', Copy) { - into layout.buildDirectory.dir('dependencies') - from configurations.runtimeClasspath +tasks.withType(JacocoReport).configureEach { + reports { + html.required = true + } + getSourceDirectories().from(sourceSets.main.allSource.srcDirs) + getClassDirectories().from(sourceSets.main.output) + getExecutionData().from(layout.buildDirectory.files().findAll { it.name.endsWith('.exec') }) } tasks.withType(Test).configureEach { diff --git a/common/src/main/java/com/flipkart/varadhi/utils/YamlLoader.java b/common/src/main/java/com/flipkart/varadhi/utils/YamlLoader.java index 7058a9a3..bfe73238 100644 --- a/common/src/main/java/com/flipkart/varadhi/utils/YamlLoader.java +++ b/common/src/main/java/com/flipkart/varadhi/utils/YamlLoader.java @@ -3,7 +3,6 @@ import com.flipkart.varadhi.exceptions.VaradhiException; import org.yaml.snakeyaml.Yaml; -import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; @@ -12,7 +11,7 @@ public class YamlLoader { public static T loadConfig(String configFile, Class clazz) { try { return new Yaml().loadAs(Files.readString(Path.of(configFile)), clazz); - } catch (IOException e) { + } catch (Exception e) { throw new VaradhiException(String.format("Failed to load config file: %s as %s.", configFile, clazz), e); } } diff --git a/common/src/test/java/com/flipkart/varadhi/utils/JsonMapperTest.java b/common/src/test/java/com/flipkart/varadhi/utils/JsonMapperTest.java index d1816227..f9812369 100644 --- a/common/src/test/java/com/flipkart/varadhi/utils/JsonMapperTest.java +++ b/common/src/test/java/com/flipkart/varadhi/utils/JsonMapperTest.java @@ -2,8 +2,10 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.exc.InvalidDefinitionException; import com.fasterxml.jackson.databind.jsontype.NamedType; import com.flipkart.varadhi.exceptions.VaradhiException; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -39,6 +41,13 @@ public void testJsonDeserialize_InvalidPolymorphicData() { assertTrue(exception.getMessage().contains("Could not resolve type id 'InvalidType'")); } + @Test + public void testJsonSerializeFailure() { + CantSerialize obj = new CantSerialize(); + Exception exception = assertThrows(VaradhiException.class, () -> JsonMapper.jsonSerialize(obj)); + Assertions.assertEquals(InvalidDefinitionException.class, exception.getCause().getClass()); + } + @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@vehicleType") private abstract static class Vehicle { private String manufacturer; @@ -57,4 +66,12 @@ public Car(String manufacturer, String model, int year) { setManufacturer(manufacturer); } } + + private static class CantSerialize { + private int foobar; + + private void CantSerialize() { + + } + } } diff --git a/common/src/test/java/com/flipkart/varadhi/utils/YamlLoaderTest.java b/common/src/test/java/com/flipkart/varadhi/utils/YamlLoaderTest.java new file mode 100644 index 00000000..63ac1787 --- /dev/null +++ b/common/src/test/java/com/flipkart/varadhi/utils/YamlLoaderTest.java @@ -0,0 +1,69 @@ +package com.flipkart.varadhi.utils; + + +import com.flipkart.varadhi.exceptions.VaradhiException; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + +public class YamlLoaderTest { + + @TempDir + Path tempDir; + + @Test + public void testLoadConfig_ValidFile() throws IOException { + // Create a temporary YAML config file + Path configFile = tempDir.resolve("config.yaml"); + String yamlContent = "message: Hello, World!"; + Files.write(configFile, yamlContent.getBytes()); + + // Load the config using YamlLoader + Config config = YamlLoader.loadConfig(configFile.toString(), Config.class); + + // Verify the loaded config + Assertions.assertNotNull(config); + Assertions.assertEquals("Hello, World!", config.getMessage()); + } + + @Test + public void testLoadConfig_InvalidFile() { + // Non-existent file path + String configFile = "nonexistent.yaml"; + + // Verify that VaradhiException is thrown when loading the config + Assertions.assertThrows(VaradhiException.class, () -> { + YamlLoader.loadConfig(configFile, Config.class); + }); + } + + @Test + public void testLoadConfig_InvalidYamlContent() throws IOException { + // Create a temporary YAML config file with invalid content + Path configFile = tempDir.resolve("config.yaml"); + String yamlContent = "invalid_yaml_content"; + Files.write(configFile, yamlContent.getBytes()); + + // Verify that VaradhiException is thrown when loading the config + Assertions.assertThrows(VaradhiException.class, () -> { + YamlLoader.loadConfig(configFile.toString(), Config.class); + }); + } + + // Sample config class for testing + public static class Config { + private String message; + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + } +} diff --git a/core/build.gradle b/core/build.gradle index 86a8f1f6..6190d069 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -6,4 +6,5 @@ dependencies { implementation(project(':entities')) implementation(project(':common')) implementation('com.fasterxml.jackson.core:jackson-databind') + testImplementation(project(':pulsar')) } diff --git a/core/src/test/java/com/flipkart/varadhi/services/VaradhiTopicServiceTest.java b/core/src/test/java/com/flipkart/varadhi/services/VaradhiTopicServiceTest.java new file mode 100644 index 00000000..0e9cdf4f --- /dev/null +++ b/core/src/test/java/com/flipkart/varadhi/services/VaradhiTopicServiceTest.java @@ -0,0 +1,70 @@ +package com.flipkart.varadhi.services; + +import com.flipkart.varadhi.db.MetaStore; +import com.flipkart.varadhi.entities.*; +import com.flipkart.varadhi.exceptions.VaradhiException; +import com.flipkart.varadhi.pulsar.entities.PulsarStorageTopic; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.mockito.Mockito.*; + +public class VaradhiTopicServiceTest { + + StorageTopicFactory storageTopicFactory; + private VaradhiTopicFactory varadhiTopicFactory; + private StorageTopicService storageTopicService; + private MetaStore metaStore; + private VaradhiTopicService varadhiTopicService; + + @BeforeEach + public void setUp() { + storageTopicService = mock(StorageTopicService.class); + metaStore = mock(MetaStore.class); + storageTopicFactory = mock(StorageTopicFactory.class); + varadhiTopicFactory = spy(new VaradhiTopicFactory(storageTopicFactory)); + varadhiTopicService = new VaradhiTopicService(storageTopicService, metaStore); + //TODO::check it, not a circular dependency though + PulsarStorageTopic pTopic = new PulsarStorageTopic("public.default.testTopic.Main.local", 1); + doReturn(pTopic).when(storageTopicFactory).getTopic("public.default.testTopic.Main.local", null); + } + + @Test + public void createVaradhiTopic() { + TopicResource topicResource = new TopicResource("testTopic", 1, "testProject", true, false, null); + VaradhiTopic varadhiTopic = varadhiTopicFactory.get(topicResource); + varadhiTopicService.create(varadhiTopic); + verify(metaStore, times(1)).createVaradhiTopic(varadhiTopic); + StorageTopic st = varadhiTopic.getInternalTopics().get(InternalTopic.TopicKind.Main).getStorageTopic(); + verify(storageTopicService, times(1)).create(st); + verify(storageTopicFactory, times(1)).getTopic(st.getName(), null); + } + + @Test + public void createVaradhiTopicWhenMetaStoreFails() { + TopicResource topicResource = new TopicResource("testTopic", 1, "testProject", true, false, null); + VaradhiTopic varadhiTopic = varadhiTopicFactory.get(topicResource); + StorageTopic st = varadhiTopic.getInternalTopics().get(InternalTopic.TopicKind.Main).getStorageTopic(); + doThrow(new VaradhiException("Some error")).when(metaStore).createVaradhiTopic(varadhiTopic); + Exception exception = + Assertions.assertThrows(VaradhiException.class, () -> varadhiTopicService.create(varadhiTopic)); + verify(metaStore, times(1)).createVaradhiTopic(varadhiTopic); + verify(storageTopicService, never()).create(st); + Assertions.assertEquals(exception.getClass(), VaradhiException.class); + } + + @Test + public void createVaradhiTopicWhenStorageTopicServiceFails() { + TopicResource topicResource = new TopicResource("testTopic", 1, "testProject", true, false, null); + VaradhiTopic varadhiTopic = varadhiTopicFactory.get(topicResource); + StorageTopic st = varadhiTopic.getInternalTopics().get(InternalTopic.TopicKind.Main).getStorageTopic(); + doThrow(new VaradhiException("Some error")).when(storageTopicService).create(st); + Exception exception = + Assertions.assertThrows(VaradhiException.class, () -> varadhiTopicService.create(varadhiTopic)); + verify(metaStore, times(1)).createVaradhiTopic(varadhiTopic); + verify(storageTopicService, times(1)).create(st); + Assertions.assertEquals(exception.getClass(), VaradhiException.class); + } +} + diff --git a/entities/build.gradle b/entities/build.gradle index 387672bb..5113d44c 100644 --- a/entities/build.gradle +++ b/entities/build.gradle @@ -4,4 +4,5 @@ plugins { dependencies { implementation(project(':common')) implementation('com.fasterxml.jackson.core:jackson-databind:2.14.2') + testImplementation(project(':pulsar')) } diff --git a/entities/src/test/java/com/flipkart/varadhi/entities/VaradhiTopicFactoryTest.java b/entities/src/test/java/com/flipkart/varadhi/entities/VaradhiTopicFactoryTest.java new file mode 100644 index 00000000..61cd6e78 --- /dev/null +++ b/entities/src/test/java/com/flipkart/varadhi/entities/VaradhiTopicFactoryTest.java @@ -0,0 +1,37 @@ +package com.flipkart.varadhi.entities; + +import com.flipkart.varadhi.pulsar.entities.PulsarStorageTopic; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.mockito.Mockito.*; + +public class VaradhiTopicFactoryTest { + private VaradhiTopicFactory varadhiTopicFactory; + private StorageTopicFactory storageTopicFactory; + + @BeforeEach + public void setUp() { + storageTopicFactory = mock(StorageTopicFactory.class); + varadhiTopicFactory = new VaradhiTopicFactory(storageTopicFactory); + //TODO::check it, not a circular dependency though + PulsarStorageTopic pTopic = new PulsarStorageTopic("public.default.testTopic.Main.local", 1); + doReturn(pTopic).when(storageTopicFactory).getTopic("public.default.testTopic.Main.local", null); + } + + @Test + public void getTopic() { + TopicResource topicResource = new TopicResource("testTopic", 1, "testProject", true, false, null); + VaradhiTopic varadhiTopic = varadhiTopicFactory.get(topicResource); + Assertions.assertNotNull(varadhiTopic); + Assertions.assertEquals(1, varadhiTopic.getInternalTopics().size()); + InternalTopic it = varadhiTopic.getInternalTopics().get(InternalTopic.TopicKind.Main); + StorageTopic st = it.getStorageTopic(); + Assertions.assertEquals(it.getStatus(), InternalTopic.ProduceStatus.Active); + Assertions.assertEquals(it.getRegion(), "local"); + Assertions.assertNull(it.getSourceRegion()); + Assertions.assertNotNull(st); + verify(storageTopicFactory, times(1)).getTopic("public.default.testTopic.Main.local", null); + } +} diff --git a/lombok.config b/lombok.config new file mode 100644 index 00000000..e5340441 --- /dev/null +++ b/lombok.config @@ -0,0 +1,7 @@ +# This tells lombok this directory is the root, +# no need to look somewhere else for java code. +config.stopBubbling = true +# This will add the @lombok.Generated annotation +# to all the code generated by Lombok, +# so it can be excluded from coverage by jacoco. +lombok.addLombokGeneratedAnnotation = true diff --git a/pulsar/src/main/java/com/flipkart/varadhi/pulsar/PulsarStackProvider.java b/pulsar/src/main/java/com/flipkart/varadhi/pulsar/PulsarStackProvider.java index 75a75068..d428a843 100644 --- a/pulsar/src/main/java/com/flipkart/varadhi/pulsar/PulsarStackProvider.java +++ b/pulsar/src/main/java/com/flipkart/varadhi/pulsar/PulsarStackProvider.java @@ -5,6 +5,7 @@ import com.flipkart.varadhi.entities.StorageTopic; import com.flipkart.varadhi.entities.StorageTopicFactory; import com.flipkart.varadhi.exceptions.InvalidStateException; +import com.flipkart.varadhi.pulsar.config.PulsarClientOptions; import com.flipkart.varadhi.pulsar.config.PulsarConfig; import com.flipkart.varadhi.pulsar.entities.PulsarStorageTopic; import com.flipkart.varadhi.pulsar.entities.PulsarTopicFactory; @@ -13,12 +14,17 @@ import com.flipkart.varadhi.services.MessagingStackProvider; import com.flipkart.varadhi.services.StorageTopicService; import com.flipkart.varadhi.utils.YamlLoader; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.PulsarClientException; + +import java.util.concurrent.TimeUnit; public class PulsarStackProvider implements MessagingStackProvider { private PulsarTopicService pulsarTopicService; private PulsarTopicFactory pulsarTopicFactory; private volatile boolean initialised = false; + private TimeUnit timeUnit = TimeUnit.MILLISECONDS; public void init(MessagingStackOptions messagingStackOptions, ObjectMapper mapper) { if (!initialised) { @@ -27,7 +33,8 @@ public void init(MessagingStackOptions messagingStackOptions, ObjectMapper mappe PulsarConfig pulsarConfig = YamlLoader.loadConfig(messagingStackOptions.getConfigFile(), PulsarConfig.class); pulsarTopicFactory = new PulsarTopicFactory(); - pulsarTopicService = new PulsarTopicService(pulsarConfig.getPulsarClientOptions()); + PulsarAdmin pulsarAdmin = getPulsarAdminClient(pulsarConfig.getPulsarClientOptions()); + pulsarTopicService = new PulsarTopicService(pulsarAdmin); registerSubtypes(mapper); initialised = true; } @@ -52,4 +59,18 @@ public StorageTopicService getStorageTopicService() private void registerSubtypes(ObjectMapper mapper) { mapper.registerSubtypes(new NamedType(PulsarStorageTopic.class, "Pulsar")); } + + PulsarAdmin getPulsarAdminClient(PulsarClientOptions pulsarClientOptions) { + try { + //TODO::Add authentication to the pulsar clients. It should be optional however. + return PulsarAdmin.builder() + .serviceHttpUrl(pulsarClientOptions.getPulsarUrl()) + .connectionTimeout(pulsarClientOptions.getConnectTimeout(), timeUnit) + .requestTimeout(pulsarClientOptions.getRequestTimeout(), timeUnit) + .readTimeout(pulsarClientOptions.getReadTimeout(), timeUnit) + .build(); + } catch (PulsarClientException e) { + throw new RuntimeException(e); + } + } } diff --git a/pulsar/src/main/java/com/flipkart/varadhi/pulsar/clients/AdminClient.java b/pulsar/src/main/java/com/flipkart/varadhi/pulsar/clients/AdminClient.java index ebda595a..b39809b5 100644 --- a/pulsar/src/main/java/com/flipkart/varadhi/pulsar/clients/AdminClient.java +++ b/pulsar/src/main/java/com/flipkart/varadhi/pulsar/clients/AdminClient.java @@ -1,54 +1,30 @@ package com.flipkart.varadhi.pulsar.clients; import com.flipkart.varadhi.exceptions.VaradhiException; -import com.flipkart.varadhi.pulsar.config.PulsarClientOptions; import com.flipkart.varadhi.pulsar.entities.PulsarStorageTopic; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.api.PulsarClientException; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.TimeUnit; @Slf4j public class AdminClient { - PulsarAdmin admin; - TimeUnit tu = TimeUnit.MILLISECONDS; + private final PulsarAdmin pulsarAdmin; - //TODO::tenant and namespace should be assigned when creating PulsarStorageTopic. - String tenant = "public"; - String namespace = "default"; - String schema = "persistent"; - - public AdminClient(PulsarClientOptions pulsarClientOptions) { - try { - //TODO::Add authentication to the pulsar clients. It should be optional however. - this.admin = PulsarAdmin.builder() - .serviceHttpUrl(pulsarClientOptions.getPulsarUrl()) - .connectionTimeout(pulsarClientOptions.getConnectTimeout(), tu) - .requestTimeout(pulsarClientOptions.getRequestTimeout(), tu) - .readTimeout(pulsarClientOptions.getReadTimeout(), tu) - .build(); - } catch (PulsarClientException e) { - throw new RuntimeException(e); - } + public AdminClient(PulsarAdmin pulsarAdmin) { + this.pulsarAdmin = pulsarAdmin; } public void create(PulsarStorageTopic topic) { try { Map properties = new HashMap<>(); - //TODO::see properties are needed, if any. - String topicFqdn = getTopicFqdn(topic); - admin.topics().createPartitionedTopic(topicFqdn, topic.getPartitionCount(), properties); - log.info("Created the pulsar topic:{}", topicFqdn); + //TODO::see properties are needed, if any.; + pulsarAdmin.topics().createPartitionedTopic(topic.getFqdn(), topic.getPartitionCount(), properties); + log.info("Created the pulsar topic:{}", topic.getFqdn()); } catch (PulsarAdminException e) { throw new VaradhiException(e); } } - - private String getTopicFqdn(PulsarStorageTopic topic) { - return String.format("%s://%s/%s/%s", schema, tenant, namespace, topic.getName()); - } } diff --git a/pulsar/src/main/java/com/flipkart/varadhi/pulsar/entities/PulsarStorageTopic.java b/pulsar/src/main/java/com/flipkart/varadhi/pulsar/entities/PulsarStorageTopic.java index 88d5780b..293d6f6a 100644 --- a/pulsar/src/main/java/com/flipkart/varadhi/pulsar/entities/PulsarStorageTopic.java +++ b/pulsar/src/main/java/com/flipkart/varadhi/pulsar/entities/PulsarStorageTopic.java @@ -12,6 +12,9 @@ @EqualsAndHashCode(callSuper = true) @FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE) public class PulsarStorageTopic extends StorageTopic { + private static final String DEFAULT_TENANT = "public"; + private static final String DEFAULT_NAMESPACE = "default"; + private static final String TOPIC_SCHEMA = "persistent"; int partitionCount; String namespace; @@ -30,4 +33,9 @@ private static int getPartitionCount(CapacityPolicy capacityPolicy) { //This should be based on capacity planner for the underlying messaging stack. return 1; } + + public String getFqdn() { + //TODO::tenant and namespace should be assigned when creating PulsarStorageTopic. + return String.format("%s://%s/%s/%s", TOPIC_SCHEMA, DEFAULT_TENANT, DEFAULT_NAMESPACE, getName()); + } } diff --git a/pulsar/src/main/java/com/flipkart/varadhi/pulsar/services/PulsarTopicService.java b/pulsar/src/main/java/com/flipkart/varadhi/pulsar/services/PulsarTopicService.java index b7ab2391..a84c3688 100644 --- a/pulsar/src/main/java/com/flipkart/varadhi/pulsar/services/PulsarTopicService.java +++ b/pulsar/src/main/java/com/flipkart/varadhi/pulsar/services/PulsarTopicService.java @@ -1,18 +1,18 @@ package com.flipkart.varadhi.pulsar.services; import com.flipkart.varadhi.pulsar.clients.AdminClient; -import com.flipkart.varadhi.pulsar.config.PulsarClientOptions; import com.flipkart.varadhi.pulsar.entities.PulsarStorageTopic; import com.flipkart.varadhi.services.StorageTopicService; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.admin.PulsarAdmin; @Slf4j public class PulsarTopicService extends StorageTopicService { AdminClient adminClient; - public PulsarTopicService(PulsarClientOptions pulsarClientOptions) { - adminClient = new AdminClient(pulsarClientOptions); + public PulsarTopicService(PulsarAdmin pulsarAdmin) { + adminClient = new AdminClient(pulsarAdmin); } public void create(PulsarStorageTopic topic) { diff --git a/pulsar/src/test/java/com/flipkart/varadhi/pulsar/PulsarStackProviderTest.java b/pulsar/src/test/java/com/flipkart/varadhi/pulsar/PulsarStackProviderTest.java new file mode 100644 index 00000000..dff11d2a --- /dev/null +++ b/pulsar/src/test/java/com/flipkart/varadhi/pulsar/PulsarStackProviderTest.java @@ -0,0 +1,102 @@ +package com.flipkart.varadhi.pulsar; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.flipkart.varadhi.entities.StorageTopicFactory; +import com.flipkart.varadhi.exceptions.InvalidStateException; +import com.flipkart.varadhi.pulsar.config.PulsarClientOptions; +import com.flipkart.varadhi.pulsar.entities.PulsarStorageTopic; +import com.flipkart.varadhi.services.MessagingStackOptions; +import com.flipkart.varadhi.services.StorageTopicService; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.admin.Topics; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +public class PulsarStackProviderTest { + private PulsarStackProvider pulsarStackProvider; + private MessagingStackOptions messagingStackOptions; + private ObjectMapper objectMapper; + private PulsarAdmin pulsarAdmin; + + @TempDir + Path tempDir; + + @BeforeEach + public void setUp() throws IOException { + String yamlContent = + "pulsarClientOptions:\n pulsarUrl: \"http://127.0.0.1:8081\"\n connectTimeout: 2000\n readTimeout: 2000\n requestTimeout: 2000\n"; + Path configFile = tempDir.resolve("pulsarConfig.yaml"); + Files.write(configFile, yamlContent.getBytes()); + + messagingStackOptions = new MessagingStackOptions(); + messagingStackOptions.setConfigFile(configFile.toString()); + messagingStackOptions.setProviderClassName("com.flipkart.varadhi.pulsar.PulsarStackProvider"); + + objectMapper = mock(ObjectMapper.class); + pulsarAdmin = mock(PulsarAdmin.class); + pulsarStackProvider = spy(new PulsarStackProvider()); + doReturn(pulsarAdmin).when(pulsarStackProvider).getPulsarAdminClient(any(PulsarClientOptions.class)); + doNothing().when(objectMapper).registerSubtypes(new NamedType(PulsarStorageTopic.class, "Pulsar")); + } + + @Test + public void testInit() { + pulsarStackProvider.init(messagingStackOptions, objectMapper); + verify(objectMapper, times(1)).registerSubtypes(new NamedType(PulsarStorageTopic.class, "Pulsar")); + pulsarStackProvider.init(messagingStackOptions, objectMapper); + verify(objectMapper, times(1)).registerSubtypes(new NamedType(PulsarStorageTopic.class, "Pulsar"));// + } + + @Test + public void testGetStorageTopicFactory_NotInitialized() { + assertThrows(InvalidStateException.class, () -> pulsarStackProvider.getStorageTopicFactory()); + } + + @Test + public void testGetStorageTopicFactory_Initialized() { + pulsarStackProvider.init(messagingStackOptions, objectMapper); + StorageTopicFactory storageTopicFactory = pulsarStackProvider.getStorageTopicFactory(); + StorageTopicFactory storageTopicFactorySecond = + pulsarStackProvider.getStorageTopicFactory(); + Assertions.assertEquals(storageTopicFactory, storageTopicFactorySecond); + PulsarStorageTopic topic = storageTopicFactory.getTopic("foobar", null); + Assertions.assertTrue(topic.getFqdn().endsWith("/foobar")); + Assertions.assertEquals(1, topic.getPartitionCount()); + } + + @Test + public void testGetStorageTopicService_NotInitialized() { + assertThrows(InvalidStateException.class, () -> pulsarStackProvider.getStorageTopicService()); + } + + @Test + public void testGetStorageTopicService_Initialized() throws PulsarAdminException { + pulsarStackProvider.init(messagingStackOptions, objectMapper); + StorageTopicService storageTopicService = pulsarStackProvider.getStorageTopicService(); + StorageTopicService storageTopicServiceSecond = + pulsarStackProvider.getStorageTopicService(); + StorageTopicFactory storageTopicFactory = pulsarStackProvider.getStorageTopicFactory(); + Assertions.assertEquals(storageTopicService, storageTopicServiceSecond); + + Topics topics = mock(Topics.class); + doReturn(topics).when(pulsarAdmin).topics(); + doNothing().when(topics).createPartitionedTopic(anyString(), anyInt(), any(Map.class)); + PulsarStorageTopic pulsarStorageTopic = storageTopicFactory.getTopic("foobar", null); + storageTopicService.create(pulsarStorageTopic); + verify(topics, times(1)).createPartitionedTopic(anyString(), anyInt(), any(Map.class)); + } +} + diff --git a/pulsar/src/test/java/com/flipkart/varadhi/pulsar/clients/AdminClientTest.java b/pulsar/src/test/java/com/flipkart/varadhi/pulsar/clients/AdminClientTest.java new file mode 100644 index 00000000..10cbf4e0 --- /dev/null +++ b/pulsar/src/test/java/com/flipkart/varadhi/pulsar/clients/AdminClientTest.java @@ -0,0 +1,48 @@ +package com.flipkart.varadhi.pulsar.clients; + +import com.flipkart.varadhi.exceptions.VaradhiException; +import com.flipkart.varadhi.pulsar.entities.PulsarStorageTopic; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.admin.Topics; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.*; + +public class AdminClientTest { + private PulsarAdmin pulsarAdmin; + private Topics topics; + private AdminClient adminClient; + + @BeforeEach + public void setUp() { + pulsarAdmin = mock(PulsarAdmin.class); + topics = mock(Topics.class); + doReturn(topics).when(pulsarAdmin).topics(); + adminClient = spy(new AdminClient(pulsarAdmin)); + } + + @Test + public void testCreate() throws PulsarAdminException { + Map properties = new HashMap<>(); + PulsarStorageTopic topic = new PulsarStorageTopic("testTopic", 1); + doNothing().when(topics).createPartitionedTopic(anyString(), anyInt(), anyMap()); + adminClient.create(topic); + verify(topics, times(1)).createPartitionedTopic(eq(topic.getFqdn()), eq(1), eq(properties)); + } + + @Test + public void testCreate_PulsarAdminException() throws PulsarAdminException { + PulsarStorageTopic topic = new PulsarStorageTopic("testTopic", 1); + doThrow(PulsarAdminException.class).when(topics).createPartitionedTopic(anyString(), anyInt(), anyMap()); + assertThrows(VaradhiException.class, () -> adminClient.create(topic)); + verify(pulsarAdmin.topics(), times(1)).createPartitionedTopic(anyString(), anyInt(), anyMap()); + } +}