Skip to content

Commit

Permalink
Stop the queue on close
Browse files Browse the repository at this point in the history
  • Loading branch information
marcospassos committed Oct 17, 2024
1 parent 76dbce7 commit 077619e
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 4 deletions.
4 changes: 4 additions & 0 deletions src/channel/queuedChannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ export class QueuedChannel<T> implements OutputChannel<T> {
throw error;
}

if (this.closed) {
throw MessageDeliveryError.retryable('Connection deliberately closed.');
}

try {
const result = await this.channel.publish(message);

Expand Down
25 changes: 21 additions & 4 deletions test/channel/queuedChannel.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -314,13 +314,30 @@ describe('A queued channel', () => {
it('should close the output channel and wait for pending messages', async () => {
const outputChannel: OutputChannel<string> = {
close: jest.fn().mockResolvedValue(undefined),
publish: jest.fn().mockResolvedValue(undefined),
publish: jest.fn()
.mockImplementationOnce(
() => new Promise(resolve => {
setTimeout(resolve, 2);
}),
),
};
const channel = new QueuedChannel(outputChannel, new InMemoryQueue('foo'));

await expect(channel.flush()).resolves.toBeUndefined();
const queue = new InMemoryQueue();
const channel = new QueuedChannel(outputChannel, queue);

await channel.close();
const firstPromise = channel.publish('foo');
const secondPromise = channel.publish('bar');

const close = new Promise(resolve => { setTimeout(() => resolve(channel.close()), 1); });

await expect(firstPromise).resolves.toBeUndefined();
await expect(secondPromise).rejects.toThrowWithMessage(MessageDeliveryError, 'Connection deliberately closed.');
await expect(close).resolves.toBeUndefined();

expect(outputChannel.publish).toHaveBeenCalledTimes(1);

expect(queue.length()).toBe(1);
expect(queue.peek()).toBe('bar');

expect(outputChannel.close).toHaveBeenCalled();
});
Expand Down

0 comments on commit 077619e

Please sign in to comment.