Skip to content
This repository has been archived by the owner on Feb 11, 2020. It is now read-only.

Commit

Permalink
Properly handle un-subscription, fixes #405
Browse files Browse the repository at this point in the history
  • Loading branch information
behrad committed Feb 6, 2016
1 parent 0db960d commit f24ad43
Showing 1 changed file with 29 additions and 4 deletions.
33 changes: 29 additions & 4 deletions lib/persistence/redis.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ function RedisPersistence(options, callback) {
this._closing = false;
this._closed = false;

var newSub = function(key, cb, retried) {
var newSub = function(key, unsubs, retried, cb) {
that._client.get(key, function(err, result) {
if (err) {
if (cb) {
Expand All @@ -98,7 +98,7 @@ function RedisPersistence(options, callback) {

if (!result || typeof subs !== 'object') {
if (!retried) {
setTimeout(newSub.bind(null, key, cb, true), 500);
setTimeout(newSub.bind(null, key, unsubs, true, cb), 500);
}
return;
}
Expand All @@ -109,6 +109,12 @@ function RedisPersistence(options, callback) {
}
});

if( unsubs ) {
unsubs.forEach(function(sub) {
that._subMatcher.remove(sub, id);
});
}

var redisError = null;
var redisVersions = that._client.server_info.versions;
if ( (redisVersions[0] * 1000 + redisVersions[1]) < 2006 ) {
Expand All @@ -135,7 +141,7 @@ function RedisPersistence(options, callback) {
}
var parsed = JSON.parse(message);
if (parsed.process !== that._id) {
newSub(parsed.key);
newSub(parsed.key, parsed.unsubs);
}
});

Expand All @@ -147,7 +153,9 @@ function RedisPersistence(options, callback) {
if (err) {
return callback && callback(err, that);
}
async.each(keys, newSub, function(err) {
async.each(keys, function(k,next){
newSub(k,null,false,next);
}, function(err) {
if (callback) {
callback(err, that);
}
Expand Down Expand Up @@ -252,6 +260,23 @@ RedisPersistence.prototype.storeSubscriptions = function(client, cb) {
});

var op = this._client.multi()
.get(clientSubKey, function(err, currentSubs){
if( err || !currentSubs ) {
return;
}
currentSubs = JSON.parse( currentSubs );
var unsubs = Object.keys(currentSubs).filter(function(topic){
return !subscriptions[topic];
});
unsubs.forEach(function(topic) {
that._subMatcher.remove(topic, client.id);
});
that._client.publish(that.options.channel, JSON.stringify({
key: clientSubKey,
unsubs: unsubs,
process: that._id
}));
})
.set(clientSubKey, JSON.stringify(subscriptions))
.publish(this.options.channel, JSON.stringify({
key: clientSubKey,
Expand Down

0 comments on commit f24ad43

Please sign in to comment.