From 55fda410e13f5e980b2d97dc9351f93ffc725d89 Mon Sep 17 00:00:00 2001 From: Aayush Gupta <135004899+aayustark007-fk@users.noreply.github.com> Date: Wed, 12 Jun 2024 19:13:32 +0530 Subject: [PATCH] Implement missing methods in PulsarMessage (#152) * implement missing methods in Pulsar Message * use a lazy init approach for parsing request headers --- .../pulsar/entities/PulsarMessage.java | 39 ++++++++++--- .../pulsar/entities/PulsarMessages.java | 2 + .../pulsar/entities/PulsarMessageTest.java | 55 +++++++++++++++++++ 3 files changed, 87 insertions(+), 9 deletions(-) create mode 100644 pulsar/src/test/java/com/flipkart/varadhi/pulsar/entities/PulsarMessageTest.java diff --git a/pulsar/src/main/java/com/flipkart/varadhi/pulsar/entities/PulsarMessage.java b/pulsar/src/main/java/com/flipkart/varadhi/pulsar/entities/PulsarMessage.java index 24203ba9..239dc226 100644 --- a/pulsar/src/main/java/com/flipkart/varadhi/pulsar/entities/PulsarMessage.java +++ b/pulsar/src/main/java/com/flipkart/varadhi/pulsar/entities/PulsarMessage.java @@ -1,18 +1,39 @@ package com.flipkart.varadhi.pulsar.entities; -import com.flipkart.varadhi.exceptions.NotImplementedException; +import com.flipkart.varadhi.entities.StandardHeaders; +import com.flipkart.varadhi.pulsar.util.PropertyHelper; import com.flipkart.varadhi.spi.services.PolledMessage; +import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Multimap; -import lombok.RequiredArgsConstructor; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.impl.MessageIdImpl; import java.util.List; +import java.util.Map; -@RequiredArgsConstructor public class PulsarMessage implements PolledMessage { private final Message msg; + private ArrayListMultimap requestHeaders = null; + + public PulsarMessage(Message msg) { + this.msg = msg; + } + + private ArrayListMultimap requestHeaders() { + if (requestHeaders == null) { + requestHeaders = computeRequestHeaders(); + } + return requestHeaders; + } + + private ArrayListMultimap computeRequestHeaders() { + ArrayListMultimap headers = ArrayListMultimap.create(); + for (Map.Entry entry : msg.getProperties().entrySet()) { + headers.putAll(entry.getKey(), PropertyHelper.decodePropertyValues(entry.getValue())); + } + return headers; + } @Override public String getTopicName() { @@ -31,27 +52,27 @@ public PulsarOffset getOffset() { @Override public String getMessageId() { - throw new NotImplementedException("fetching message Id from pulsar message"); + return getHeader(StandardHeaders.MESSAGE_ID); } @Override public String getGroupId() { - throw new NotImplementedException("fetching group id from pulsar message"); + return getHeader(StandardHeaders.GROUP_ID); } @Override public boolean hasHeader(String key) { - throw new NotImplementedException("checking header from pulsar message"); + return requestHeaders().containsKey(key); } @Override public String getHeader(String key) { - throw new NotImplementedException("fetching header from pulsar message"); + return requestHeaders().get(key).get(0); } @Override public List getHeaders(String key) { - throw new NotImplementedException("fetching headers from pulsar message"); + return requestHeaders().get(key); } @Override @@ -61,7 +82,7 @@ public byte[] getPayload() { @Override public Multimap getRequestHeaders() { - throw new NotImplementedException("get all headers from pulsar message"); + return requestHeaders(); } @Override diff --git a/pulsar/src/main/java/com/flipkart/varadhi/pulsar/entities/PulsarMessages.java b/pulsar/src/main/java/com/flipkart/varadhi/pulsar/entities/PulsarMessages.java index 3a9c1cc4..941c1284 100644 --- a/pulsar/src/main/java/com/flipkart/varadhi/pulsar/entities/PulsarMessages.java +++ b/pulsar/src/main/java/com/flipkart/varadhi/pulsar/entities/PulsarMessages.java @@ -2,6 +2,7 @@ import com.flipkart.varadhi.spi.services.PolledMessage; import com.flipkart.varadhi.spi.services.PolledMessages; +import jakarta.annotation.Nonnull; import lombok.RequiredArgsConstructor; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Messages; @@ -19,6 +20,7 @@ public int getCount() { } @Override + @Nonnull public Iterator> iterator() { Iterator> it = msgs.iterator(); return new Iterator<>() { diff --git a/pulsar/src/test/java/com/flipkart/varadhi/pulsar/entities/PulsarMessageTest.java b/pulsar/src/test/java/com/flipkart/varadhi/pulsar/entities/PulsarMessageTest.java new file mode 100644 index 00000000..187daebf --- /dev/null +++ b/pulsar/src/test/java/com/flipkart/varadhi/pulsar/entities/PulsarMessageTest.java @@ -0,0 +1,55 @@ +package com.flipkart.varadhi.pulsar.entities; + +import com.flipkart.varadhi.entities.Message; +import com.flipkart.varadhi.entities.ProducerMessage; +import com.flipkart.varadhi.entities.StandardHeaders; +import com.flipkart.varadhi.pulsar.util.PropertyHelper; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.apache.pulsar.client.impl.TypedMessageBuilderImpl; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; +import java.util.List; + +class PulsarMessageTest { + + @Test + void testPulsarMessagesEqualsProducerMessage() { + // test request headers + Multimap requestHeaders = ArrayListMultimap.create(); + requestHeaders.put("header1", "value1"); + requestHeaders.put(StandardHeaders.MESSAGE_ID, "msgId"); + requestHeaders.put(StandardHeaders.GROUP_ID, "grpId"); + requestHeaders.putAll("header2", List.of("value2", "value3")); + + // now create the producer message + Message producerMessage = new ProducerMessage("message".getBytes(StandardCharsets.UTF_8), requestHeaders); + + // create produce path message builder + TypedMessageBuilder messageBuilder = new TypedMessageBuilderImpl<>(null, Schema.BYTES) + .key("key").value(producerMessage.getPayload()); + producerMessage.getRequestHeaders().asMap() + .forEach((key, values) -> messageBuilder.property(key, PropertyHelper.encodePropertyValues(values))); + + // create pulsar message, which is the message that is consumed by the consumer + PulsarMessage pulsarMessage = + new PulsarMessage(((TypedMessageBuilderImpl) messageBuilder).getMessage()); + + // pulsar message and producer message should match + Assertions.assertEquals(producerMessage.getMessageId(), pulsarMessage.getMessageId()); + Assertions.assertEquals(producerMessage.getGroupId(), producerMessage.getGroupId()); + + Assertions.assertEquals(producerMessage.getRequestHeaders().size(), pulsarMessage.getRequestHeaders().size()); + producerMessage.getRequestHeaders().asMap().forEach((key, values) -> { + Assertions.assertTrue(pulsarMessage.getRequestHeaders().containsKey(key)); + Assertions.assertEquals(values, pulsarMessage.getRequestHeaders().get(key)); + }); + + } + + +}