Skip to content

Commit

Permalink
add a new option "ip_worker_name_format" (default format will like "1…
Browse files Browse the repository at this point in the history
…92x168x1x23")
  • Loading branch information
SwimmingTiger committed Oct 13, 2020
1 parent 68fa1bb commit f7a5684
Show file tree
Hide file tree
Showing 8 changed files with 180 additions and 170 deletions.
54 changes: 19 additions & 35 deletions src/Server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -501,11 +501,7 @@ StratumSession::~StratumSession() {

void StratumSession::setWorkerName(const string &fullName) {
if (server_->useIpAsWorkerName()) {
// get source IP address
char saddrBuffer[INET_ADDRSTRLEN];
evutil_inet_ntop(AF_INET, &saddr_, saddrBuffer, INET_ADDRSTRLEN);

workerName_ = saddrBuffer;
workerName_ = Strings::FormatIP(saddr_.s_addr, server_->ipWorkerNameFormat());
return;
}

Expand Down Expand Up @@ -547,8 +543,8 @@ void StratumSession::recvData(struct evbuffer *buf) {
}

/////////////////////////////////// StratumServer //////////////////////////////
StratumServer::StratumServer(const string &listenIP, const uint16_t listenPort)
: listenIP_(listenIP), listenPort_(listenPort)
StratumServer::StratumServer(const AgentConf &conf)
: conf_(conf)
{
upSessions_ .resize(kUpSessionCount_, NULL);
upSessionCount_.resize(kUpSessionCount_, 0);
Expand Down Expand Up @@ -591,14 +587,6 @@ void StratumServer::stop() {
event_base_loopexit(base_, NULL);
}

void StratumServer::addUpPool(const std::vector<PoolConf> &poolConfs) {
upPools_ = poolConfs;

for (const auto &pool : upPools_) {
LOG(INFO) << "add pool: " << pool.host_ << ":" << pool.port_ << ", subaccount name: " << pool.upPoolUserName_ << std::endl;
}
}

UpStratumClient * StratumServer::createUpSession(int8_t idx) {
UpStratumClient *up = createUpClient(idx, this);
if (!up->connect()) {
Expand All @@ -607,25 +595,21 @@ UpStratumClient * StratumServer::createUpSession(int8_t idx) {
return up;
}

bool StratumServer::run(bool alwaysKeepDownconn, bool disconnectWhenLostAsicBoost,
bool useIpAsWorkerName, bool submitResponseFromServer,
const string &fixedWorkerName) {
alwaysKeepDownconn_ = alwaysKeepDownconn;
disconnectWhenLostAsicBoost_ = disconnectWhenLostAsicBoost;
useIpAsWorkerName_ = useIpAsWorkerName;
submitResponseFromServer_ = submitResponseFromServer;
fixedWorkerName_ = fixedWorkerName;
bool StratumServer::run() {
for (const auto &pool : conf_.pools_) {
LOG(INFO) << "add pool: " << pool.host_ << ":" << pool.port_ << ", subaccount name: " << pool.upPoolUserName_ << std::endl;
}

if (!fixedWorkerName_.empty()) {
LOG(INFO) << "[OPTION] Fixed worker name enabled, all worker name will be replaced to " << fixedWorkerName_ << " on the server.";
if (!conf_.fixedWorkerName_.empty()) {
LOG(INFO) << "[OPTION] Fixed worker name enabled, all worker name will be replaced to " << conf_.fixedWorkerName_ << " on the server.";
}

if (running_) {
return false;
}
running_ = true;

if (upPools_.size() == 0) {
if (conf_.pools_.size() == 0) {
return false;
}

Expand Down Expand Up @@ -694,10 +678,10 @@ bool StratumServer::run(bool alwaysKeepDownconn, bool disconnectWhenLostAsicBoos
struct sockaddr_in sin;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_port = htons(listenPort_);
sin.sin_port = htons(conf_.listenPort_);
sin.sin_addr.s_addr = htonl(INADDR_ANY);
if (evutil_inet_pton(AF_INET, listenIP_.c_str(), &sin.sin_addr) == 0) {
LOG(ERROR) << "invalid ip: " << listenIP_ << std::endl;
if (evutil_inet_pton(AF_INET, conf_.listenIP_.c_str(), &sin.sin_addr) == 0) {
LOG(ERROR) << "invalid ip: " << conf_.listenIP_ << std::endl;
return false;
}

Expand All @@ -709,11 +693,11 @@ bool StratumServer::run(bool alwaysKeepDownconn, bool disconnectWhenLostAsicBoos
-1,
(struct sockaddr*)&sin, sizeof(sin));
if(!listener_) {
LOG(ERROR) << "cannot create listener: " << listenIP_ << ":" << listenPort_ << std::endl;
LOG(ERROR) << "cannot create listener: " << conf_.listenIP_ << ":" << conf_.listenPort_ << std::endl;
return false;
}

LOG(INFO) << "startup is successful, listening: " << listenIP_ << ":" << listenPort_ << std::endl;
LOG(INFO) << "startup is successful, listening: " << conf_.listenIP_ << ":" << conf_.listenPort_ << std::endl;

assert(base_ != NULL);
event_base_dispatch(base_);
Expand Down Expand Up @@ -776,7 +760,7 @@ void StratumServer::checkUpSessions() {
continue;
}

if (alwaysKeepDownconn_ || upSessions_[i]->reconnectCount_ < upPools_.size()) {
if (conf_.alwaysKeepDownconn_ || upSessions_[i]->reconnectCount_ < conf_.pools_.size()) {
upSessions_[i]->reconnect();
continue;
}
Expand Down Expand Up @@ -1008,7 +992,7 @@ void StratumServer::upEventCallback(struct bufferevent *bev,
LOG(ERROR) << "unhandled events from pool server: " << events << std::endl;
}

if (server->alwaysKeepDownconn_ || up->reconnectCount_ < server->upPools_.size()) {
if (server->conf_.alwaysKeepDownconn_ || up->reconnectCount_ < server->conf_.pools_.size()) {
up->reconnect();
return;
}
Expand Down Expand Up @@ -1081,7 +1065,7 @@ UpStratumClient *StratumServer::findUpSession() {
}

// Provide an unavailable connection if no available connection can be found
if (upSession == nullptr && alwaysKeepDownconn_) {
if (upSession == nullptr && conf_.alwaysKeepDownconn_) {
for (size_t i = 0; i < upSessions_.size(); i++) {
if (upSessions_[i] == nullptr) {
continue;
Expand All @@ -1108,7 +1092,7 @@ void StratumServer::registerWorker(StratumSession *downSession) {
//
// | magic_number(1) | cmd(1) | len (2) | session_id(2) | clientAgent | worker_name |
//
string workerName = fixedWorkerName_.empty() ? downSession->workerName() : fixedWorkerName_;
string workerName = conf_.fixedWorkerName_.empty() ? downSession->workerName() : conf_.fixedWorkerName_;

uint16_t len = 0;
len += (1+1+2+2); // magic_num, cmd, len, session_id
Expand Down
32 changes: 11 additions & 21 deletions src/Server.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,10 +211,6 @@ class StratumServer {
static const int8_t kUpSessionCount_ = 5; // MAX is 127
bool running_ = false;

string listenIP_;
uint16_t listenPort_ = 0;
vector<PoolConf> upPools_;

struct event *upEvTimer_ = nullptr;

// libevent2
Expand All @@ -240,29 +236,25 @@ class StratumServer {

// down stream connections
vector<StratumSession *> downSessions_;
bool alwaysKeepDownconn_ = false;
bool disconnectWhenLostAsicBoost_ = false;
bool useIpAsWorkerName_ = false;
bool submitResponseFromServer_ = false;
string fixedWorkerName_;

AgentConf conf_;

public:
SessionIDManager sessionIDManager_;

public:
StratumServer(const string &listenIP, const uint16_t listenPort);
StratumServer(const AgentConf &conf);
virtual ~StratumServer();

UpStratumClient *createUpSession(const int8_t idx);

void addUpPool(const std::vector<PoolConf> &poolConfs);

inline const vector<PoolConf>& getUpPools() { return upPools_; }
inline const vector<PoolConf>& getUpPools() { return conf_.pools_; }
inline struct event_base* getEventBase() { return base_; }
inline bool disconnectWhenLostAsicBoost() { return disconnectWhenLostAsicBoost_; }
inline bool useIpAsWorkerName() { return useIpAsWorkerName_; }
inline bool submitResponseFromServer() { return submitResponseFromServer_; }
inline const string &fixedWorkerName() { return fixedWorkerName_; }
inline bool disconnectWhenLostAsicBoost() { return conf_.disconnectWhenLostAsicBoost_; }
inline bool useIpAsWorkerName() { return conf_.useIpAsWorkerName_; }
inline string ipWorkerNameFormat() { return conf_.ipWorkerNameFormat_; }
inline bool submitResponseFromServer() { return conf_.submitResponseFromServer_; }
inline const string &fixedWorkerName() { return conf_.fixedWorkerName_; }

void addDownConnection (StratumSession *conn);
void removeDownConnection(StratumSession *conn);
Expand Down Expand Up @@ -296,10 +288,8 @@ class StratumServer {
void registerWorker (UpStratumClient *upSession);
void registerWorker (StratumSession *downSession);
void unRegisterWorker(StratumSession *downSession);

bool run(bool alwaysKeepDownconn, bool disconnectWhenLostAsicBoost,
bool useIpAsWorkerName, bool submitResponseFromServer,
const string &fixedWorkerName);

bool run();
void stop();
};

Expand Down
94 changes: 72 additions & 22 deletions src/Utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include <iostream>
#include <iomanip>

#include <event2/util.h>

#if defined(SUPPORT_GLOG) && defined(GLOG_TO_STDOUT)
void GLogToStdout::send(google::LogSeverity severity, const char* full_filename,
const char* base_filename, int line, const struct ::tm* tm_time,
Expand Down Expand Up @@ -89,6 +91,31 @@ void Strings::Append(string & dest, const char * fmt, ...) {
}
}

string Strings::ReplaceAll(std::string str, const std::string& from, const std::string& to) {
size_t start_pos = 0;
while((start_pos = str.find(from, start_pos)) != std::string::npos) {
str.replace(start_pos, from.length(), to);
start_pos += to.length(); // Handles case where 'to' is a substring of 'from'
}
return str;
}

string Strings::FormatIP(uint32_t ipv4Int, string format) {
union IPv4Arr {
uint32_t ipv4Int;
uint8_t parts[4];
} ipv4Arr;

ipv4Arr.ipv4Int = std::move(ipv4Int);

format = Strings::ReplaceAll(format, string("{1}"), std::to_string(ipv4Arr.parts[0]));
format = Strings::ReplaceAll(format, string("{2}"), std::to_string(ipv4Arr.parts[1]));
format = Strings::ReplaceAll(format, string("{3}"), std::to_string(ipv4Arr.parts[2]));
format = Strings::ReplaceAll(format, string("{4}"), std::to_string(ipv4Arr.parts[3]));

return format;
}

string getJsonStr(const char *c,const jsmntok_t *t) {
if (t == NULL || t->end <= t->start)
return "";
Expand All @@ -105,12 +132,7 @@ int jsoneq(const char *json, jsmntok_t *tok, const char *s) {
return -1;
}

bool parseConfJson(const string &jsonStr,
string &agentType, string &listenIP, string &listenPort,
std::vector<PoolConf> &poolConfs,
bool &alwaysKeepDownconn, bool &disconnectWhenLostAsicBoost,
bool &useIpAsWorkerName, bool &submitResponseFromServer,
string &fixedWorkerName) {
bool parseConfJson(const string &jsonStr, AgentConf &conf) {
jsmn_parser p;
jsmn_init(&p);
jsmntok_t t[64]; // we expect no more than 64 tokens
Expand All @@ -128,15 +150,15 @@ bool parseConfJson(const string &jsonStr,

for (int i = 1; i < r; i++) {
if (jsoneq(c, &t[i], "agent_type") == 0) {
agentType = getJsonStr(c, &t[i+1]);
conf.agentType_ = getJsonStr(c, &t[i+1]);
i++;
}
if (jsoneq(c, &t[i], "agent_listen_ip") == 0) {
listenIP = getJsonStr(c, &t[i+1]);
conf.listenIP_ = getJsonStr(c, &t[i+1]);
i++;
}
else if (jsoneq(c, &t[i], "agent_listen_port") == 0) {
listenPort = getJsonStr(c, &t[i+1]);
conf.listenPort_ = (uint16_t)strtoul(getJsonStr(c, &t[i+1]).c_str(), NULL, 10);
i++;
}
else if (jsoneq(c, &t[i], "pools") == 0) {
Expand All @@ -159,51 +181,79 @@ bool parseConfJson(const string &jsonStr,
return false;
}

PoolConf conf;
conf.host_ = getJsonStr(c, &t[idx + 1]);
conf.port_ = (uint16_t)strtoul(getJsonStr(c, &t[idx + 2]).c_str(), NULL, 10);
conf.upPoolUserName_= getJsonStr(c, &t[idx + 3]);
PoolConf pool;
pool.host_ = getJsonStr(c, &t[idx + 1]);
pool.port_ = (uint16_t)strtoul(getJsonStr(c, &t[idx + 2]).c_str(), NULL, 10);
pool.upPoolUserName_ = getJsonStr(c, &t[idx + 3]);

poolConfs.push_back(conf);
conf.pools_.push_back(pool);
}
i += poolCount * 4;
}
else if (jsoneq(c, &t[i], "always_keep_downconn") == 0) {
string opt = getJsonStr(c, &t[i+1]);
std::transform(opt.begin(), opt.end(), opt.begin(), ::tolower);
alwaysKeepDownconn = (opt == "true");
conf.alwaysKeepDownconn_ = (opt == "true");
i++;
}
else if (jsoneq(c, &t[i], "disconnect_when_lost_asicboost") == 0) {
string opt = getJsonStr(c, &t[i + 1]);
std::transform(opt.begin(), opt.end(), opt.begin(), ::tolower);
disconnectWhenLostAsicBoost = (opt == "true");
conf.disconnectWhenLostAsicBoost_ = (opt == "true");
i++;
}
else if (jsoneq(c, &t[i], "use_ip_as_worker_name") == 0) {
string opt = getJsonStr(c, &t[i + 1]);
std::transform(opt.begin(), opt.end(), opt.begin(), ::tolower);
useIpAsWorkerName = (opt == "true");
conf.useIpAsWorkerName_ = (opt == "true");
i++;
}
else if (jsoneq(c, &t[i], "ip_worker_name_format") == 0) {
conf.ipWorkerNameFormat_ = getJsonStr(c, &t[i+1]);
i++;
}
else if (jsoneq(c, &t[i], "submit_response_from_server") == 0) {
string opt = getJsonStr(c, &t[i + 1]);
std::transform(opt.begin(), opt.end(), opt.begin(), ::tolower);
submitResponseFromServer = (opt == "true");
conf.submitResponseFromServer_ = (opt == "true");
i++;
}
else if (jsoneq(c, &t[i], "fixed_worker_name") == 0) {
fixedWorkerName = getJsonStr(c, &t[i+1]);
conf.fixedWorkerName_ = getJsonStr(c, &t[i+1]);
i++;
}
}

// check parametes
if (listenIP.length() && listenPort.length() && poolConfs.size()) {
return true;
if (conf.listenIP_.empty()) {
LOG(ERROR) << "[conf] agent_listen_ip cannot be empty.";
return false;
}

if (conf.listenPort_ == 0) {
LOG(ERROR) << "[conf] agent_listen_port must be between 1 and 65535.";
return false;
}

if (conf.pools_.empty()) {
LOG(ERROR) << "[conf] pools cannot be empty.";
return false;
}

for (size_t i=0; i<conf.pools_.size(); i++) {
const auto &pool = conf.pools_[i];
if (pool.host_.empty()) {
LOG(ERROR) << "[conf] the host of pool " << i << " cannot be empty.";
return false;
}
if (pool.port_ == 0) {
LOG(ERROR) << "[conf] the port of pool " << i << " (" << pool.host_
<< ") must be between 1 and 65535.";
return false;
}
}

return false;
return true;
}

const char *splitNotify(const string &line, int number) {
Expand Down
Loading

0 comments on commit f7a5684

Please sign in to comment.