Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Makefile: use CXXFLAGS += to allowing add user flags #20

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion pink/Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
CLEAN_FILES = # deliberately empty, so we can append below.
CXX=g++
LDFLAGS= -lpthread -lrt
CXXFLAGS= -g -std=c++11 -fno-builtin-memcmp -msse -msse4.2 -pipe -fPIC
CXXFLAGS += -g -std=c++11 -fno-builtin-memcmp -msse -msse4.2 -pipe -fPIC
PROFILING_FLAGS=-pg
ARFLAGS = rs
OPT=
Expand Down
2 changes: 1 addition & 1 deletion pink/examples/myredis_cli.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class MyClientHandle : public pink::ClientHandle {
int DeleteWorkerSpecificData(void* data) const override {
return 0;
}
void DestConnectFailedHandle(std::string ip_port, std::string reason) const override {
void DestConnectFailedHandle(const std::string& ip_port, const std::string& reason) const override {
}
};

Expand Down
4 changes: 2 additions & 2 deletions pink/include/backend_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class BackendHandle {
/*
* DestConnectFailedHandle(...) will run the invoker's logic when socket connect failed
*/
virtual void DestConnectFailedHandle(std::string ip_port, std::string reason) const {
virtual void DestConnectFailedHandle(const std::string& ip_port, const std::string& reason) const {
UNUSED(ip_port);
UNUSED(reason);
}
Expand Down Expand Up @@ -131,7 +131,7 @@ class BackendThread : public Thread {
void CloseFd(const int fd);
void CleanUpConnRemaining(const int fd);
void DoCronTask();
void NotifyWrite(const std::string ip_port);
void NotifyWrite(const std::string& ip_port);
void NotifyWrite(const int fd);
void NotifyClose(const int fd);
void ProcessNotifyEvents(const PinkFiredEvent* pfe);
Expand Down
3 changes: 2 additions & 1 deletion pink/include/bg_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ struct TimerItem {
}
};

class BGThread : public Thread {
class BGThread final : public Thread {
public:
explicit BGThread(int full = 100000) :
Thread::Thread(),
Expand All @@ -39,6 +39,7 @@ class BGThread : public Thread {
}

virtual ~BGThread() {
// call virtual in destructor, BGThread must be final
StopThread();
}

Expand Down
4 changes: 2 additions & 2 deletions pink/include/client_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class ClientHandle {
/*
* DestConnectFailedHandle(...) will run the invoker's logic when socket connect failed
*/
virtual void DestConnectFailedHandle(std::string ip_port, std::string reason) const {
virtual void DestConnectFailedHandle(const std::string& ip_port, const std::string& reason) const {
UNUSED(ip_port);
UNUSED(reason);
}
Expand Down Expand Up @@ -130,7 +130,7 @@ class ClientThread : public Thread {
void CloseFd(int fd, const std::string& ip_port);
void CleanUpConnRemaining(const std::string& ip_port);
void DoCronTask();
void NotifyWrite(const std::string ip_port);
void NotifyWrite(const std::string& ip_port);
void ProcessNotifyEvents(const PinkFiredEvent* pfe);

int keepalive_timeout_;
Expand Down
2 changes: 1 addition & 1 deletion pink/include/pink_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,11 @@ class PinkConn : public std::enable_shared_from_this<PinkConn> {

private:
int fd_;
int flags_;
std::string ip_port_;
bool is_reply_;
bool is_writable_;
struct timeval last_interaction_;
int flags_;

#ifdef __ENABLE_SSL
SSL* ssl_;
Expand Down
14 changes: 7 additions & 7 deletions pink/include/pink_pubsub.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ class PubSubThread : public Thread {

int Publish(const std::string& channel, const std::string& msg);

void Subscribe(std::shared_ptr<PinkConn> conn,
void Subscribe(const std::shared_ptr<PinkConn>& conn,
const std::vector<std::string>& channels,
const bool pattern,
std::vector<std::pair<std::string, int>>* result);

int UnSubscribe(std::shared_ptr<PinkConn> conn,
int UnSubscribe(const std::shared_ptr<PinkConn>& conn,
const std::vector<std::string>& channels,
const bool pattern,
std::vector<std::pair<std::string, int>>* result);
Expand All @@ -61,17 +61,17 @@ class PubSubThread : public Thread {
int PubSubNumPat();

// Move out from pubsub thread
void MoveConnOut(std::shared_ptr<PinkConn> conn);
void MoveConnOut(const std::shared_ptr<PinkConn>& conn);
// Move into pubsub thread
void MoveConnIn(std::shared_ptr<PinkConn> conn, const NotifyType& notify_type);
void MoveConnIn(const std::shared_ptr<PinkConn>& conn, NotifyType notify_type);

enum ReadyState {
kNotReady,
kReady,
};

struct ConnHandle {
ConnHandle(std::shared_ptr<PinkConn> pc, ReadyState state = kNotReady)
ConnHandle(const std::shared_ptr<PinkConn>& pc, ReadyState state = kNotReady)
: conn(pc), ready_state(state) { }
void UpdateReadyState(const ReadyState& state);
bool IsReady();
Expand All @@ -84,9 +84,9 @@ class PubSubThread : public Thread {
bool IsReady(int fd);

private:
void RemoveConn(std::shared_ptr<PinkConn> conn);
void RemoveConn(const std::shared_ptr<PinkConn>& conn);

int ClientChannelSize(std::shared_ptr<PinkConn> conn);
int ClientChannelSize(const std::shared_ptr<PinkConn>& conn);

int msg_pfd_[2];
bool should_exit_;
Expand Down
4 changes: 1 addition & 3 deletions pink/include/pink_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,11 @@ class Thread {
}

protected:
std::atomic<bool> should_stop_;

private:
static void* RunThread(void* arg);
virtual void *ThreadMain() = 0;

slash::Mutex running_mu_;
std::atomic<bool> should_stop_;
bool running_;
pthread_t thread_id_;
std::string thread_name_;
Expand Down
2 changes: 1 addition & 1 deletion pink/include/redis_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class RedisConn: public PinkConn {

private:
static int ParserDealMessageCb(RedisParser* parser, const RedisCmdArgsType& argv);
static int ParserCompleteCb(RedisParser* parser, const std::vector<RedisCmdArgsType>& argvs);
static int ParserCompleteCb(RedisParser* parser, std::vector<RedisCmdArgsType>&& argvs);
ReadStatus ParseRedisParserStatus(RedisParserStatus status);

HandleType handle_type_;
Expand Down
2 changes: 1 addition & 1 deletion pink/include/redis_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class RedisParser;

typedef std::vector<std::string> RedisCmdArgsType;
typedef int (*RedisParserDataCb) (RedisParser*, const RedisCmdArgsType&);
typedef int (*RedisParserMultiDataCb) (RedisParser*, const std::vector<RedisCmdArgsType>&);
typedef int (*RedisParserMultiDataCb) (RedisParser*, std::vector<RedisCmdArgsType>&&);
typedef int (*RedisParserCb) (RedisParser*);
typedef int RedisParserType;

Expand Down
2 changes: 1 addition & 1 deletion pink/include/server_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ class ServerThread : public Thread {
// Move out from server thread
virtual std::shared_ptr<PinkConn> MoveConnOut(int fd) = 0;
// Move into server thread
virtual void MoveConnIn(std::shared_ptr<PinkConn> conn, const NotifyType& type) = 0;
virtual void MoveConnIn(const std::shared_ptr<PinkConn>& conn, NotifyType type) = 0;

virtual void KillAllConns() = 0;
virtual bool KillConn(const std::string& ip_port) = 0;
Expand Down
31 changes: 3 additions & 28 deletions pink/include/thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,24 +41,7 @@ struct TimeTask {
class ThreadPool {

public:
class Worker {
public:
explicit Worker(ThreadPool* tp) : start_(false), thread_pool_(tp) {};
static void* WorkerMain(void* arg);

int start();
int stop();
private:
pthread_t thread_id_;
std::atomic<bool> start_;
ThreadPool* const thread_pool_;
std::string worker_name_;
/*
* No allowed copy and copy assign
*/
Worker(const Worker&);
void operator=(const Worker&);
};
class Worker;

explicit ThreadPool(size_t worker_num,
size_t max_queue_size,
Expand All @@ -79,26 +62,18 @@ class ThreadPool {
std::string thread_pool_name();

private:
void runInThread();

size_t worker_num_;
size_t max_queue_size_;
std::string thread_pool_name_;
std::queue<Task> queue_;
std::priority_queue<TimeTask> time_queue_;
std::vector<Worker*> workers_;
std::atomic<bool> running_;
std::atomic<bool> should_stop_;

slash::Mutex mu_;
slash::CondVar rsignal_;
slash::CondVar wsignal_;

/*
* No allowed copy and copy assign
*/
ThreadPool(const ThreadPool&);
void operator=(const ThreadPool&);
ThreadPool(const ThreadPool&) = delete;
void operator=(const ThreadPool&) = delete;
};

} // namespace pink
Expand Down
10 changes: 5 additions & 5 deletions pink/src/backend_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ void BackendThread::InternalDebugPrint() {
const std::vector<std::string>& tmp = to_send.second;
for (const auto& tmp_to_send : tmp) {
UNUSED(tmp_to_send);
log_info("%s %s\n", to_send.first.c_str(), tmp_to_send.c_str());
log_info("%d %s\n", to_send.first, tmp_to_send.c_str());
}
}
}
Expand All @@ -297,21 +297,21 @@ void BackendThread::InternalDebugPrint() {
log_info("___________________________________\n");
}

void BackendThread::NotifyWrite(const std::string ip_port) {
void BackendThread::NotifyWrite(const std::string& ip_port) {
// put fd = 0, cause this lib user doesnt need to know which fd to write to
// we will check fd by checking ipport_conns_
PinkItem ti(0, ip_port, kNotiWrite);
pink_epoll_->Register(ti, true);
pink_epoll_->Register(std::move(ti), true);
}

void BackendThread::NotifyWrite(const int fd) {
PinkItem ti(fd, "", kNotiWrite);
pink_epoll_->Register(ti, true);
pink_epoll_->Register(std::move(ti), true);
}

void BackendThread::NotifyClose(const int fd) {
PinkItem ti(fd, "", kNotiClose);
pink_epoll_->Register(ti, true);
pink_epoll_->Register(std::move(ti), true);
}

void BackendThread::ProcessNotifyEvents(const PinkFiredEvent* pfe) {
Expand Down
9 changes: 4 additions & 5 deletions pink/src/client_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -298,11 +298,11 @@ void ClientThread::InternalDebugPrint() {
log_info("___________________________________\n");
}

void ClientThread::NotifyWrite(const std::string ip_port) {
void ClientThread::NotifyWrite(const std::string& ip_port) {
// put fd = 0, cause this lib user doesnt need to know which fd to write to
// we will check fd by checking ipport_conns_
PinkItem ti(0, ip_port, kNotiWrite);
pink_epoll_->Register(ti, true);
pink_epoll_->Register(std::move(ti), true);
}


Expand All @@ -315,7 +315,7 @@ void ClientThread::ProcessNotifyEvents(const PinkFiredEvent* pfe) {
} else {
for (int32_t idx = 0; idx < nread; ++idx) {
PinkItem ti = pink_epoll_->notify_queue_pop();
std::string ip_port = ti.ip_port();
const std::string& ip_port = ti.ip_port();
int fd = ti.fd();
if (ti.notify_type() == kNotiWrite) {
if (ipport_conns_.find(ip_port) == ipport_conns_.end()) {
Expand All @@ -326,8 +326,7 @@ void ClientThread::ProcessNotifyEvents(const PinkFiredEvent* pfe) {
}
Status s = ScheduleConnect(ip, port);
if (!s.ok()) {
std::string ip_port = ip + ":" + std::to_string(port);
handle_->DestConnectFailedHandle(ip_port, s.ToString());
handle_->DestConnectFailedHandle(ip + ":" + std::to_string(port), s.ToString());
log_info("Ip %s, port %d Connect err %s\n", ip.c_str(), port, s.ToString().c_str());
continue;
}
Expand Down
4 changes: 2 additions & 2 deletions pink/src/dispatch_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ std::shared_ptr<PinkConn> DispatchThread::MoveConnOut(int fd) {
return nullptr;
}

void DispatchThread::MoveConnIn(std::shared_ptr<PinkConn> conn, const NotifyType& type) {
void DispatchThread::MoveConnIn(const std::shared_ptr<PinkConn>& conn, NotifyType type) {
WorkerThread* worker_thread = worker_thread_[last_thread_];
bool success = worker_thread->MoveConnIn(conn, type, true);
if (success) {
Expand Down Expand Up @@ -167,7 +167,7 @@ void DispatchThread::HandleNewConn(
bool find = false;
for (int cnt = 0; cnt < work_num_; cnt++) {
WorkerThread* worker_thread = worker_thread_[next_thread];
find = worker_thread->MoveConnIn(ti, false);
find = worker_thread->MoveConnIn(std::move(ti), false);
if (find) {
last_thread_ = (next_thread + 1) % work_num_;
log_info("find worker(%d), refresh the last_thread_ to %d",
Expand Down
2 changes: 1 addition & 1 deletion pink/src/dispatch_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class DispatchThread : public ServerThread {

virtual std::shared_ptr<PinkConn> MoveConnOut(int fd) override;

virtual void MoveConnIn(std::shared_ptr<PinkConn> conn, const NotifyType& type) override;
virtual void MoveConnIn(const std::shared_ptr<PinkConn>& conn, NotifyType type) override;

virtual void KillAllConns() override;

Expand Down
2 changes: 1 addition & 1 deletion pink/src/holy_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class HolyThread: public ServerThread {

virtual std::shared_ptr<PinkConn> MoveConnOut(int fd) override;

virtual void MoveConnIn(std::shared_ptr<PinkConn> conn, const NotifyType& type) override { }
virtual void MoveConnIn(const std::shared_ptr<PinkConn>& conn, NotifyType) override { }

virtual void KillAllConns() override;

Expand Down
4 changes: 2 additions & 2 deletions pink/src/pb_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -201,12 +201,12 @@ void PbConn::TryResizeBuffer() {

void PbConn::NotifyWrite() {
pink::PinkItem ti(fd(), ip_port(), pink::kNotiWrite);
pink_epoll()->Register(ti, true);
pink_epoll()->Register(std::move(ti), true);
}

void PbConn::NotifyClose() {
pink::PinkItem ti(fd(), ip_port(), pink::kNotiClose);
pink_epoll()->Register(ti, true);
pink_epoll()->Register(std::move(ti), true);
}

} // namespace pink
1 change: 1 addition & 0 deletions pink/src/pink_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ PinkConn::PinkConn(const int fd,
#endif
thread_(thread),
pink_epoll_(pink_epoll) {
flags_ = 0;
gettimeofday(&last_interaction_, nullptr);
}

Expand Down
9 changes: 4 additions & 5 deletions pink/src/pink_epoll.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,13 @@ int PinkEpoll::PinkDelEvent(const int fd) {
return epoll_ctl(epfd_, EPOLL_CTL_DEL, fd, &ee);
}

bool PinkEpoll::Register(const PinkItem& it, bool force) {
bool PinkEpoll::Register(PinkItem&& it, bool force) {
bool success = false;
notify_queue_protector_.Lock();
if (force ||
queue_limit_ == kUnlimitedQueue ||
notify_queue_.size() < static_cast<size_t>(queue_limit_)) {
notify_queue_.push(it);
notify_queue_.push(std::move(it));
success = true;
}
notify_queue_protector_.Unlock();
Expand All @@ -93,12 +93,11 @@ bool PinkEpoll::Register(const PinkItem& it, bool force) {
}

PinkItem PinkEpoll::notify_queue_pop() {
PinkItem it;
notify_queue_protector_.Lock();
it = notify_queue_.front();
PinkItem it = std::move(notify_queue_.front());
notify_queue_.pop();
notify_queue_protector_.Unlock();
return it;
return std::move(it);
}

int PinkEpoll::PinkPoll(const int timeout) {
Expand Down
2 changes: 1 addition & 1 deletion pink/src/pink_epoll.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class PinkEpoll {
}
PinkItem notify_queue_pop();

bool Register(const PinkItem& it, bool force);
bool Register(PinkItem&& it, bool force);
bool Deregister(const PinkItem& it) { return false; }

private:
Expand Down
Loading