Skip to content
This repository has been archived by the owner on Feb 11, 2020. It is now read-only.

Commit

Permalink
Merge pull request #4 from mcollina/master
Browse files Browse the repository at this point in the history
update
  • Loading branch information
behrad committed Jan 29, 2016
2 parents 08809ab + 08a5b2c commit 0db960d
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 60 deletions.
5 changes: 2 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@ addons:
node_js:
- 0.10
- 0.12
- iojs-v1
- iojs-v2
- iojs-v3
- 4
- 5
services:
- redis-server
- mongodb
Expand Down
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ Mosca   [![Build Status](https://travis-ci.org/mcollina/mosca.png
* Various storage options for QoS 1 offline packets, and subscriptions.
* As fast as it is possible.
* Usable inside ANY other Node.js app.
* Supports Node.js v0.10 and v0.12.
* Supports io.js v1.x and v2.x and v3.x (please do not use v3.1.0)
* version 1.0.0+ targets node v5, v4 and v0.12, with partial support for node v0.10.

##Quickstart

Expand Down
10 changes: 10 additions & 0 deletions lib/persistence/memory.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,16 @@ function MemoryPersistence(options, callback) {

util.inherits(MemoryPersistence, LevelUpPersistence);

MemoryPersistence.prototype.close = function(cb) {

MemDOWN.clearGlobalStore();

this._streams.forEach(function(stream) {
stream.destroy();
});
this.db.close(cb);
};

/**
* Export it as a module
*
Expand Down
19 changes: 14 additions & 5 deletions lib/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,11 @@ function Server(opts, callback) {

if (validationResult.errors.length > 0) {
var errMessage = validationResult.errors[0].message;
callback(new Error(errMessage));
if (callback) {
callback(new Error(errMessage));
} else {
throw new Error(errMessage);
}
}

modernOpts = options.populate(modernOpts);
Expand Down Expand Up @@ -155,16 +159,21 @@ function Server(opts, callback) {
// each Server has a dummy id for logging purposes
this.id = this.modernOpts.id || shortid.generate();

this.ascoltatore = this.modernOpts.ascoltatore || ascoltatori.build(this.modernOpts.backend);
this.ascoltatore.on("error", this.emit.bind(this));

// initialize servers list
this.servers = [];

async.series([

// async.series: wait for ascoltatore
function (done) {
that.ascoltatore.on("ready", done);

if(that.modernOpts.ascoltatore) {
that.ascoltatore = that.modernOpts.ascoltatore;
done();
}
else {
that.ascoltatore = ascoltatori.build(that.modernOpts.backend, done);
}
},

// async.series: wait for persistence
Expand Down
51 changes: 27 additions & 24 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "mosca",
"version": "0.32.1",
"version": "1.0.2",
"description": "MQTT broker as a module",
"main": "index.js",
"bin": {
Expand Down Expand Up @@ -29,6 +29,10 @@
"bugs": {
"url": "http://github.com/mcollina/mosca/issues"
},
"engines": {
"node": ">= 0.12",
"iojs": "< 4"
},
"keywords": [
"mqtt",
"mqtt server",
Expand Down Expand Up @@ -66,40 +70,39 @@
"underscore": "^1.7.0"
},
"dependencies": {
"ascoltatori": "~0.21.1",
"async": "~0.9.0",
"brfs": "1.3.0",
"bunyan": "^1.4.0",
"commander": "~2.3.0",
"ascoltatori": "^2.0.0",
"async": "~1.5.2",
"brfs": "~1.4.2",
"bunyan": "^1.5.1",
"commander": "~2.9.0",
"deepcopy": "^0.3.3",
"extend": "^2.0.0",
"json-buffer": "~2.0.7",
"jsonschema": "^1.0.0",
"level-sublevel": "^6.4.6",
"levelup": "^1.2.1",
"lru-cache": "~2.5.0",
"memdown": "~0.10.2",
"minimatch": "~1.0.0",
"json-buffer": "~2.0.11",
"jsonschema": "^1.0.3",
"level-sublevel": "^6.5.2",
"levelup": "^1.3.1",
"lru-cache": "~4.0.0",
"memdown": "~1.1.1",
"minimatch": "~3.0.0",
"moment": "~2.8.0",
"moving-average": "0.0.5",
"mqtt": "^1.3.5",
"mqtt": "^1.6.3",
"mqtt-connection": "^2.1.1",
"msgpack5": "^1.3.0",
"pbkdf2-password": "^1.0.0",
"qlobber": "~0.5.0",
"msgpack5": "^3.3.0",
"pbkdf2-password": "^1.1.0",
"qlobber": "~0.5.3",
"retimer": "^1.0.1",
"shortid": "^2.1.3",
"st": "~0.5.1",
"shortid": "^2.2.4",
"st": "~1.1.0",
"uuid": "^2.0.1",
"websocket-stream": "~2.0.2"
},
"optionalDependencies": {
"leveldown": "~1.4.1",
"nan": "~2.0.5",
"zmq": "~2.12.0",
"leveldown": "~1.4.3",
"zmq": "~2.14.0",
"amqp": "~0.2.4",
"redis": "~0.12.1",
"redis": "~2.4.2",
"hiredis": "^0.4.1",
"mongodb": "~2.0.33"
"mongodb": "~2.1.4"
}
}
46 changes: 21 additions & 25 deletions test/abstract_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,6 @@ module.exports = function(moscaSettings, createConnection) {
}
}

instance.once("published", function(packet) {
expect(packet.topic).to.be.equal("$SYS/" + instance.id + "/new/subscribes");
var payload = JSON.parse( packet.payload.toString() );
publishedClientId = payload.clientId;
expect(payload.topic).to.be.equal('hello');
verify();
});

buildAndConnect(d, function(client) {
var messageId = Math.floor(65535 * Math.random());
var subscriptions = [{
Expand All @@ -147,9 +139,14 @@ module.exports = function(moscaSettings, createConnection) {
}
];

connectedClient = client;
connectedClient = client;

client.on("suback", function(packet) {
instance.once("published", function(packet) {
expect(packet.topic).to.be.equal("$SYS/" + instance.id + "/new/subscribes");
var payload = JSON.parse( packet.payload.toString() );
publishedClientId = payload.clientId;
expect(payload.topic).to.be.equal('hello');
verify();
client.disconnect();
});

Expand All @@ -173,17 +170,6 @@ module.exports = function(moscaSettings, createConnection) {
}
}

instance.once("published", function(packet) {
expect(packet.topic).to.be.equal("$SYS/" + instance.id + "/new/subscribes");
instance.once("published", function(packet) {
expect(packet.topic).to.be.equal("$SYS/" + instance.id + "/new/unsubscribes");
var payload = JSON.parse( packet.payload.toString() );
expect(payload.topic).to.be.equal('hello');
publishedClientId = payload.clientId;
verify();
});
});

buildAndConnect(d, function(client) {
var messageId = Math.floor(65535 * Math.random());
var subscriptions = [{
Expand All @@ -193,15 +179,25 @@ module.exports = function(moscaSettings, createConnection) {

connectedClient = client;

client.on("unsuback", function(packet) {
client.disconnect();
});
instance.once("published", function(packet) {

expect(packet.topic).to.be.equal("$SYS/" + instance.id + "/new/subscribes");

client.on("suback", function(packet) {
client.unsubscribe({
unsubscriptions: ["hello"],
messageId: messageId
});

instance.once("published", function(packet) {

expect(packet.topic).to.be.equal("$SYS/" + instance.id + "/new/unsubscribes");
var payload = JSON.parse( packet.payload.toString() );
expect(payload.topic).to.be.equal('hello');
publishedClientId = payload.clientId;
verify();

client.disconnect();
});
});

client.subscribe({
Expand Down
2 changes: 1 addition & 1 deletion test/persistence/abstract.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ module.exports = function(create, buildOpts) {
this.instance.storeRetained(packet, done);
});

it("should lookup retain messages and not matching", function(done) {
it("should lookup retain messages and not match", function(done) {
this.instance.lookupRetained("hello", function(err, results) {
expect(results).to.eql([]);
done();
Expand Down

0 comments on commit 0db960d

Please sign in to comment.