Skip to content

Commit

Permalink
Fix message body corruption on republish (#218)
Browse files Browse the repository at this point in the history
  • Loading branch information
JatinSanghvi authored Oct 17, 2022
1 parent bbd4a43 commit b07eae9
Showing 1 changed file with 21 additions and 15 deletions.
36 changes: 21 additions & 15 deletions extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,38 +85,44 @@ public Task StartAsync(CancellationToken cancellationToken)
this.rabbitMQModel.BasicQos(0, this.prefetchCount, false); // Non zero prefetchSize doesn't work (tested upto 5.2.0) and will throw NOT_IMPLEMENTED exception
this.consumer = new EventingBasicConsumer(this.rabbitMQModel.Model);

this.consumer.Received += async (model, ea) =>
this.consumer.Received += async (model, args) =>
{
using Activity activity = StartActivity(ea);
var input = new TriggeredFunctionData() { TriggerValue = ea };
// The RabbitMQ client rents an array from the ArrayPool to hold a copy of the message body, and passes it
// to the listener. Once all event handlers are executed, the array is returned back to the pool so that the
// memory can be reused for future messages for that connection. However, since our event handler is async,
// the very first await statement i.e. the call to TryExecuteAsync below causes the event handler invocation
// to complete and lets the RabbitMQ client release the memory. This led to message body corruption when the
// message is republished (see: https://github.com/Azure/azure-functions-rabbitmq-extension/issues/211).
//
// We chose to copy the message body instead of having a new 'args' object as there is only one event
// handler registered for the consumer so there should be no side-effects.
args.Body = args.Body.ToArray();
using Activity activity = StartActivity(args);
var input = new TriggeredFunctionData() { TriggerValue = args };
FunctionResult result = await this.executor.TryExecuteAsync(input, cancellationToken).ConfigureAwait(false);
if (!result.Succeeded)
{
ea.BasicProperties.Headers ??= new Dictionary<string, object>();
ea.BasicProperties.Headers.TryGetValue(RequeueCountHeaderName, out object headerValue);
args.BasicProperties.Headers ??= new Dictionary<string, object>();
args.BasicProperties.Headers.TryGetValue(RequeueCountHeaderName, out object headerValue);
int requeueCount = Convert.ToInt32(headerValue, CultureInfo.InvariantCulture) + 1;
if (requeueCount >= 5)
{
// Add message to dead letter exchange.
this.logger.LogDebug("Requeue count exceeded: rejecting message");
this.rabbitMQModel.BasicReject(ea.DeliveryTag, false);
this.rabbitMQModel.BasicReject(args.DeliveryTag, false);
return;
}
this.logger.LogDebug("Republishing message");
ea.BasicProperties.Headers[RequeueCountHeaderName] = requeueCount;
// RabbitMQ client library seems to be reusing the memory pointed by 'ea.Body' for subsequent
// message-received events. This led to https://github.com/Azure/azure-functions-rabbitmq-extension/issues/211.
// Hence, pass a copy of 'ea.Body' to method 'BasicPublish' instead of the object itself to prevent
// sharing of the memory and possibility of memory corruption.
this.rabbitMQModel.BasicPublish(exchange: string.Empty, routingKey: this.queueName, ea.BasicProperties, ea.Body.ToArray());
args.BasicProperties.Headers[RequeueCountHeaderName] = requeueCount;
this.rabbitMQModel.BasicPublish(exchange: string.Empty, routingKey: this.queueName, args.BasicProperties, args.Body);
}
this.rabbitMQModel.BasicAck(ea.DeliveryTag, false);
this.rabbitMQModel.BasicAck(args.DeliveryTag, false);
};

this.consumerTag = this.rabbitMQModel.BasicConsume(queue: this.queueName, autoAck: false, consumer: this.consumer);
Expand Down

0 comments on commit b07eae9

Please sign in to comment.