We use a shared consumer approach. After the consumer receives a message, I take the messageId, convert it to a string, and pass it along through the ETL process. Once the ETL process completes, I want to deserialize that string back into a Pulsar MessageId object and then use it for acknowledgement. The weird issue is that the acknowledge call completes without error, but the backlog count on the topic does not decrease and the topic's messageAckCount does not increment, so the message does not appear to be actually acknowledged. I also verified that the same consumer instance is used both when pulling the message and when acknowledging it. One more weird thing I found: if I acknowledge the message immediately after pulling it, that is, if I call await consumer.acknowledgeId(messageId); right after await consumer.receive(400);, then it works fine. I am not able to understand Pulsar's behavior here. Can someone explain what I am doing wrong? By the way, I am using "pulsar-client": "^1.11.0" for Node.
Acknowledgement logic (the try block inside acknowledge_message)
try {
  // Convert the base64 string back to a Buffer
  const messageIdBuffer = Buffer.from(messageIdStr, 'base64');
  // Deserialize the Buffer to get the MessageId object
  const messageId = Pulsar.MessageId.deserialize(messageIdBuffer);
  // Re-serialize the MessageId to a base64 string
  const reSerializedMessageIdStr = messageId.serialize().toString('base64');
  // Compare the original and re-serialized MessageId strings
  if (messageIdStr === reSerializedMessageIdStr) {
    console.info('MessageId deserialized and re-serialized correctly.');
  } else {
    console.warn('Mismatch in MessageId serialization/deserialization.');
  }
  await consumer.acknowledgeId(messageId);
  return true;
} catch (error) {
  console.error('Acknowledgment failed:', error);
  return false;
}
Message pulling logic, triggered with a GraphQL query
async pull_message(
  namespace: string,
  topic: string,
  subscription_key: string,
) {
  const consumer = await this.ensure_consumer(
    namespace,
    topic,
    subscription_key,
  );
  // Log which consumer instance handles the pull
  console.info(
    `pull_message using consumerId ${(consumer as any).consumerId}`,
  );
  try {
    // Wait up to 400 ms for the next message
    const message = await consumer.receive(400);
    // ... (rest of the method is omitted in the post)
  }
}
Message Acknowledgement logic.
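async acknowledge_message(
  namespace: string,
  topic: string,
  subscription_key: string,
  messageIdStr: string,
) {
  const consumer = await this.ensure_consumer(
    namespace,
    topic,
    subscription_key,
  );
  // Log which consumer instance handles the acknowledgement
  console.info(
    `acknowledge_message using consumerId ${(consumer as any).consumerId}`,
  );
  // ... (the try block that deserializes messageIdStr and calls
  // consumer.acknowledgeId is omitted in this listing)
}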
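async ensure_consumer(namespace, topic, subscription_key) {
  // Key that identifies the consumer for this tenant/namespace/topic
  const stream_key =
    `${byterat_iris_apollo_stream_one_tenant}/${namespace}/${topic}`;
  // ... (rest of the method is omitted in the post)
}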
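Since the immediate acknowledgement with the original MessageId object works, one thing that could be tried is to keep that original object in memory and pass only a lookup key through the ETL process, so that nothing has to be deserialized at acknowledgement time. Below is a minimal sketch of that idea. PendingAckRegistry, track and take are hypothetical names, not part of pulsar-client, and the sketch assumes the pull and the acknowledgement happen in the same process, as described above. It is not a confirmed fix for the behavior in question.

```typescript
// Hypothetical helper (not part of pulsar-client): remembers the original
// id object under its base64-serialized form so the ack path can reuse it.
type Base64Serializable = {
  serialize(): { toString(encoding: 'base64'): string };
};

class PendingAckRegistry<T extends Base64Serializable> {
  private readonly pending = new Map<string, T>();

  // Store the received id and return the key to pass through the ETL.
  track(id: T): string {
    const key = id.serialize().toString('base64');
    this.pending.set(key, id);
    return key;
  }

  // Return the original id object for a key and forget it;
  // returns undefined if the key was never tracked or was already taken.
  take(key: string): T | undefined {
    const id = this.pending.get(key);
    this.pending.delete(key);
    return id;
  }

  // Number of ids still waiting for acknowledgement.
  get size(): number {
    return this.pending.size;
  }
}
```

In pull_message the key would come from registry.track(message.getMessageId()), and in acknowledge_message the original object would be fetched with registry.take(messageIdStr) and passed to consumer.acknowledgeId. Entries for messages that are never acknowledged stay in the map, so a real version would need some expiry.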