Skip to content

Commit

Permalink
Implement missing methods in PulsarMessage (#152)
Browse files Browse the repository at this point in the history
* implement missing methods in Pulsar Message

* use a lazy init approach for parsing request headers
  • Loading branch information
aayustark007-fk authored Jun 12, 2024
1 parent d77831a commit 55fda41
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -1,18 +1,39 @@
package com.flipkart.varadhi.pulsar.entities;

import com.flipkart.varadhi.exceptions.NotImplementedException;
import com.flipkart.varadhi.entities.StandardHeaders;
import com.flipkart.varadhi.pulsar.util.PropertyHelper;
import com.flipkart.varadhi.spi.services.PolledMessage;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import lombok.RequiredArgsConstructor;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.impl.MessageIdImpl;

import java.util.List;
import java.util.Map;

@RequiredArgsConstructor
public class PulsarMessage implements PolledMessage<PulsarOffset> {

private final Message<byte[]> msg;
private ArrayListMultimap<String, String> requestHeaders = null;

public PulsarMessage(Message<byte[]> msg) {
this.msg = msg;
}

private ArrayListMultimap<String, String> requestHeaders() {
if (requestHeaders == null) {
requestHeaders = computeRequestHeaders();
}
return requestHeaders;
}

private ArrayListMultimap<String, String> computeRequestHeaders() {
ArrayListMultimap<String, String> headers = ArrayListMultimap.create();
for (Map.Entry<String, String> entry : msg.getProperties().entrySet()) {
headers.putAll(entry.getKey(), PropertyHelper.decodePropertyValues(entry.getValue()));
}
return headers;
}

@Override
public String getTopicName() {
Expand All @@ -31,27 +52,27 @@ public PulsarOffset getOffset() {

@Override
public String getMessageId() {
throw new NotImplementedException("fetching message Id from pulsar message");
return getHeader(StandardHeaders.MESSAGE_ID);
}

@Override
public String getGroupId() {
throw new NotImplementedException("fetching group id from pulsar message");
return getHeader(StandardHeaders.GROUP_ID);
}

@Override
public boolean hasHeader(String key) {
throw new NotImplementedException("checking header from pulsar message");
return requestHeaders().containsKey(key);
}

@Override
public String getHeader(String key) {
throw new NotImplementedException("fetching header from pulsar message");
return requestHeaders().get(key).get(0);
}

@Override
public List<String> getHeaders(String key) {
throw new NotImplementedException("fetching headers from pulsar message");
return requestHeaders().get(key);
}

@Override
Expand All @@ -61,7 +82,7 @@ public byte[] getPayload() {

@Override
public Multimap<String, String> getRequestHeaders() {
throw new NotImplementedException("get all headers from pulsar message");
return requestHeaders();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.flipkart.varadhi.spi.services.PolledMessage;
import com.flipkart.varadhi.spi.services.PolledMessages;
import jakarta.annotation.Nonnull;
import lombok.RequiredArgsConstructor;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Messages;
Expand All @@ -19,6 +20,7 @@ public int getCount() {
}

@Override
@Nonnull
public Iterator<PolledMessage<PulsarOffset>> iterator() {
Iterator<Message<byte[]>> it = msgs.iterator();
return new Iterator<>() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package com.flipkart.varadhi.pulsar.entities;

import com.flipkart.varadhi.entities.Message;
import com.flipkart.varadhi.entities.ProducerMessage;
import com.flipkart.varadhi.entities.StandardHeaders;
import com.flipkart.varadhi.pulsar.util.PropertyHelper;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.nio.charset.StandardCharsets;
import java.util.List;

class PulsarMessageTest {

@Test
void testPulsarMessagesEqualsProducerMessage() {
// test request headers
Multimap<String, String> requestHeaders = ArrayListMultimap.create();
requestHeaders.put("header1", "value1");
requestHeaders.put(StandardHeaders.MESSAGE_ID, "msgId");
requestHeaders.put(StandardHeaders.GROUP_ID, "grpId");
requestHeaders.putAll("header2", List.of("value2", "value3"));

// now create the producer message
Message producerMessage = new ProducerMessage("message".getBytes(StandardCharsets.UTF_8), requestHeaders);

// create produce path message builder
TypedMessageBuilder<byte[]> messageBuilder = new TypedMessageBuilderImpl<>(null, Schema.BYTES)
.key("key").value(producerMessage.getPayload());
producerMessage.getRequestHeaders().asMap()
.forEach((key, values) -> messageBuilder.property(key, PropertyHelper.encodePropertyValues(values)));

// create pulsar message, which is the message that is consumed by the consumer
PulsarMessage pulsarMessage =
new PulsarMessage(((TypedMessageBuilderImpl<byte[]>) messageBuilder).getMessage());

// pulsar message and producer message should match
Assertions.assertEquals(producerMessage.getMessageId(), pulsarMessage.getMessageId());
Assertions.assertEquals(producerMessage.getGroupId(), producerMessage.getGroupId());

Assertions.assertEquals(producerMessage.getRequestHeaders().size(), pulsarMessage.getRequestHeaders().size());
producerMessage.getRequestHeaders().asMap().forEach((key, values) -> {
Assertions.assertTrue(pulsarMessage.getRequestHeaders().containsKey(key));
Assertions.assertEquals(values, pulsarMessage.getRequestHeaders().get(key));
});

}


}

0 comments on commit 55fda41

Please sign in to comment.