Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove bitcore, make app/query log config options #12

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions bitsocketd/bit.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ const zmq = require("zeromq")
const mingo = require("mingo")
const bcode = require("../bcode")
const jq = require("../bigjq")
const defaults = { host: "127.0.0.1", port: 28339 }
const defaults = { host: "127.0.0.1", port: 28339, logs: 'dev' }
const init = function(config) {
let sock = zmq.socket("sub")
let host = (config.host ? config.host : defaults.host)
let port = (config.port ? config.port : defaults.port)
let logs = (config.logs ? config.logs : defaults.logs)
let connections = config.connections
sock.connect("tcp://" + host + ":" + port)
sock.subscribe("mempool")
Expand All @@ -29,12 +30,16 @@ const init = function(config) {
let result
try {
if (encoded.r && encoded.r.f) {
result = await jq.run(encoded.r.f, [decoded])
if(logs = 'dev') {
result = await jq.run(encoded.r.f, [decoded], true)
} else {
result = await jq.run(encoded.r.f, [decoded], false)
}
} else {
result = [decoded]
}
} catch (e) {
console.log("Error", e)
console.error("Error", e)
}
connection.res.sseSend({ type: "mempool", data: result })
}
Expand All @@ -60,13 +65,17 @@ const init = function(config) {
let result
try {
if (encoded.r && encoded.r.f) {
result = await jq.run(encoded.r.f, [decoded])
if(logs == 'dev') {
result = await jq.run(encoded.r.f, [decoded], true)
} else {
result = await jq.run(encoded.r.f, [decoded], false)
}
} else {
result = decoded
}
transformed.push(result)
} catch (e) {
console.log("Error", e)
console.error("Error", e)
}
}
connection.res.sseSend({
Expand Down
57 changes: 22 additions & 35 deletions bitsocketd/socket.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
const cors = require("cors")
const express = require("express")
const ip = require("ip")
const bch = require("bitcore-lib-cash")
const { v4: uuid } = require('uuid')
const defaults = { port: 3001 }
const init = function(config) {
let app = (config.app ? config.app : express())
let logs = (config.logs ? config.logs : 'dev')
let connections = config.connections
app.use(cors())
app.use(function (req, res, next) {
Expand All @@ -18,66 +19,53 @@ const init = function(config) {
})
res.sseSend({ type: "open", data: [] })
}
res.sseSend = function(data) {
res.write("data: " + JSON.stringify(data) + "\n\n")
}
res.sseHeartbeat = function() {
res.write(":heartbeat" + "\n\n")
}
res.sseSend = function(data) {res.write("data: " + JSON.stringify(data) + "\n\n")}
res.sseHeartbeat = function() {res.write(":heartbeat" + "\n\n")}
next()
})
app.get("/s", async function(req, res) {
try {
let query = {
"v": 3, "q": { "find": {} }
}

// bitcoin address as fingerprint
const privateKey = new bch.PrivateKey()
const fingerprint = privateKey.toAddress().toString()

const fingerprint = uuid()
res.$fingerprint = fingerprint
connections.pool[fingerprint] = { res: res, query: query }
console.log("## Opening connection from: " + fingerprint)
console.log(JSON.stringify(req.headers, null, 2))

console.log("## Opening: " + fingerprint + " - (" + Object.keys(connections.pool).length + " total)")
if (logs == 'dev') console.log(JSON.stringify(req.headers, null, 2))
req.on("close", function() {
console.log("## Closing connection from: " + res.$fingerprint)
console.log(JSON.stringify(req.headers, null, 2))
console.log("## Closing: " + res.$fingerprint + " - (" + Object.keys(connections.pool).length + " total)")
delete connections.pool[res.$fingerprint]
console.log(".. Pool size is now", Object.keys(connections.pool).length)
if (logs == 'dev') console.log(JSON.stringify(req.headers, null, 2))
})
} catch (e) {
console.log(e)
console.error(e)
}
})
app.get(/^\/s\/(.+)/, async function(req, res) {
try {
let b64 = req.params[0]

// bitcoin address as fingerprint
const privateKey = new bch.PrivateKey()
const fingerprint = privateKey.toAddress().toString()

const fingerprint = uuid()
res.sseSetup()
let json = Buffer.from(b64, "base64").toString()
let query = JSON.parse(json)
if (! query.q) {
query.q = {};
query.q = {}
}

res.$fingerprint = fingerprint
connections.pool[fingerprint] = { res: res, query: query }

console.log("## Opening connection from: " + fingerprint)
console.log(JSON.stringify(req.headers, null, 2))
console.log("## Opening: " + fingerprint + " - (" + Object.keys(connections.pool).length + " total)")
if (logs == 'dev') console.log(JSON.stringify(req.headers, null, 2))
req.on("close", function() {
console.log("## Closing connection from: " + res.$fingerprint)
console.log(JSON.stringify(req.headers, null, 2))
console.log("## Closing: " + res.$fingerprint + " - (" + Object.keys(connections.pool).length + " total)")
delete connections.pool[res.$fingerprint]
console.log(".. Pool size is now", Object.keys(connections.pool).length)
if (logs == 'dev') console.log(JSON.stringify(req.headers, null, 2))
})
} catch (e) {
console.log(e)
console.error(e)
}
})
// if no express app was passed in, need to bootstrap.
Expand All @@ -91,19 +79,18 @@ const init = function(config) {
console.log("#")
console.log(`# API Endpoint: ${ip.address()}:${port}/s`)
console.log("#")
console.log("# Learn more at https://bitsocket.org")
console.log("# Learn more at https://fountainhead.cash")
console.log("#")
console.log("######################################################################################")
})
}

// set up heartbeat
setInterval(function() {
console.log('## Sending heartbeat to ' + Object.keys(connections.pool).length);
console.log('## Sending heartbeat to ' + Object.keys(connections.pool).length)
Object.keys(connections.pool).forEach(async function(key) {
let connection = connections.pool[key]
connection.res.sseHeartbeat()
});
}, (config.heartbeat ? config.heartbeat : 10) * 1000); // every N seconds
})
}, (config.heartbeat ? config.heartbeat : 10) * 1000) // every N seconds
}
module.exports = { init: init }
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
"node-jq": "^1.6.0",
"sinon": "^7.3.2",
"traverse": "^0.6.6",
"uuid": "^8.3.0",
"zeromq": "^5.1.0"
},
"devDependencies": {
Expand Down