Skip to content

Commit

Permalink
allow overriding correlator. enables ability to not use .queues file …
Browse files Browse the repository at this point in the history
…in case manually providing all subscription queue names. improved some logging and listening+subscribed queue events
  • Loading branch information
Matt Walters committed Apr 11, 2017
1 parent a72bd27 commit cb0729f
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 10 deletions.
20 changes: 16 additions & 4 deletions bus/rabbitmq/bus.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ function RabbitMQBus (options, implOpts) {

this.assertQueuesOnFirstSend = (options.assertQueuesOnFirstSend === undefined) ? true : options.assertQueuesOnFirstSend;
this.channels = [];
this.correlator = new Correlator(options);
this.correlator = options.correlator || new Correlator(options);
this.delayOnStartup = options.delayOnStartup || 10;
this.exchangeName = options.exchangeName;
this.formatter = json;
Expand Down Expand Up @@ -105,7 +105,9 @@ util.inherits(RabbitMQBus, Bus);

RabbitMQBus.prototype.listen = function listen (queueName, options, callback) {

this.log('listen on queue %s', queueName);
var self = this;

this.log('listen on queue %j', queueName);

if (typeof options === "function") {
callback = options;
Expand All @@ -120,7 +122,11 @@ RabbitMQBus.prototype.listen = function listen (queueName, options, callback) {

if (this.queues[options.queueName] === undefined) {
this.log('creating queue %s', options.queueName);
this.queues[options.queueName] = new Queue(options);
var queue = new Queue(options);
queue.on('listening', function () {
self.emit('listening', queue);
});
this.queues[options.queueName] = queue;
}

this.queues[options.queueName].listen(callback, options);
Expand Down Expand Up @@ -214,6 +220,8 @@ RabbitMQBus.prototype.subscribe = function subscribe (queueName, options, callba
options = {};
}

this.log('subscribe on queue %j', queueName);

var handle = null;
function _unsubscribe (options) {
handle.unsubscribe(options);
Expand All @@ -227,7 +235,11 @@ RabbitMQBus.prototype.subscribe = function subscribe (queueName, options, callba

if (this.pubsubqueues[options.queueName] === undefined) {
this.log('creating pusubqueue %s', options.queueName);
this.pubsubqueues[options.queueName] = new PubSubQueue(options);
var pubSubQueue = new PubSubQueue(options);
pubSubQueue.on('subscribed', function () {
self.emit('subscribed', pubSubQueue);
});
this.pubsubqueues[options.queueName] = pubSubQueue;
}

handle = this.pubsubqueues[options.queueName].subscribe(options, callback);
Expand Down
12 changes: 7 additions & 5 deletions bus/rabbitmq/pubsubqueue.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,13 @@ PubSubQueue.prototype.publish = function publish (event, options, cb) {

PubSubQueue.prototype.subscribe = function subscribe (options, callback) {
var self = this;
var listening = false;
var subscribed = false;
var subscription = null;

this.log('subscribing to queue %j with routingKey %j', this.queueName, this.routingKey);

function _unsubscribe (cb) {
if (listening) {
if (subscribed) {
// should we prevent multiple cancel calls?
self.listenChannel
.cancel(subscription.consumerTag)
Expand All @@ -84,7 +86,7 @@ PubSubQueue.prototype.subscribe = function subscribe (options, callback) {
}
});
} else {
self.on('listening', _unsubscribe.bind(this, cb));
self.on('subscribed', _unsubscribe.bind(this, cb));
}
}

Expand Down Expand Up @@ -119,9 +121,9 @@ PubSubQueue.prototype.subscribe = function subscribe (options, callback) {
});
}, { noAck: ! self.ack })
.then(function (ok) {
listening = true;
subscribed = true;
subscription = { consumerTag: ok.consumerTag };
self.emit('listening');
self.emit('subscribed');
});
}

Expand Down
2 changes: 1 addition & 1 deletion bus/rabbitmq/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ Queue.prototype.listen = function listen (callback, options) {

var self = this;

this.log('listening to queue %s', this.queueName);
this.log('listening to queue %j', this.queueName);

if ( ! this.initialized) {
return this.on('ready', listen.bind(this, callback, options));
Expand Down

0 comments on commit cb0729f

Please sign in to comment.