From f24ad43a4cf8206e8b3ac4c597001fc0486ba9c9 Mon Sep 17 00:00:00 2001 From: Behrad Date: Sat, 6 Feb 2016 09:07:16 +0330 Subject: [PATCH] Properly handle un-subscription, fixes #405 --- lib/persistence/redis.js | 33 +++++++++++++++++++++++++++++---- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/lib/persistence/redis.js b/lib/persistence/redis.js index 1d6d179..f17c925 100644 --- a/lib/persistence/redis.js +++ b/lib/persistence/redis.js @@ -83,7 +83,7 @@ function RedisPersistence(options, callback) { this._closing = false; this._closed = false; - var newSub = function(key, cb, retried) { + var newSub = function(key, unsubs, retried, cb) { that._client.get(key, function(err, result) { if (err) { if (cb) { @@ -98,7 +98,7 @@ function RedisPersistence(options, callback) { if (!result || typeof subs !== 'object') { if (!retried) { - setTimeout(newSub.bind(null, key, cb, true), 500); + setTimeout(newSub.bind(null, key, unsubs, true, cb), 500); } return; } @@ -109,6 +109,12 @@ function RedisPersistence(options, callback) { } }); + if( unsubs ) { + unsubs.forEach(function(sub) { + that._subMatcher.remove(sub, id); + }); + } + var redisError = null; var redisVersions = that._client.server_info.versions; if ( (redisVersions[0] * 1000 + redisVersions[1]) < 2006 ) { @@ -135,7 +141,7 @@ function RedisPersistence(options, callback) { } var parsed = JSON.parse(message); if (parsed.process !== that._id) { - newSub(parsed.key); + newSub(parsed.key, parsed.unsubs); } }); @@ -147,7 +153,9 @@ function RedisPersistence(options, callback) { if (err) { return callback && callback(err, that); } - async.each(keys, newSub, function(err) { + async.each(keys, function(k,next){ + newSub(k,null,false,next); + }, function(err) { if (callback) { callback(err, that); } @@ -252,6 +260,23 @@ RedisPersistence.prototype.storeSubscriptions = function(client, cb) { }); var op = this._client.multi() + .get(clientSubKey, function(err, currentSubs){ + if( err || !currentSubs ) { + return; + } + currentSubs = JSON.parse( currentSubs ); + var unsubs = Object.keys(currentSubs).filter(function(topic){ + return !subscriptions[topic]; + }); + unsubs.forEach(function(topic) { + that._subMatcher.remove(topic, client.id); + }); + that._client.publish(that.options.channel, JSON.stringify({ + key: clientSubKey, + unsubs: unsubs, + process: that._id + })); + }) .set(clientSubKey, JSON.stringify(subscriptions)) .publish(this.options.channel, JSON.stringify({ key: clientSubKey,