Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: pikiwidb support multi-raft cluster #442

Merged
merged 19 commits into from
Oct 26, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions etc/conf/pikiwidb.conf
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ logfile stdout
# Set the number of databases. The default database is DB 0, you can select
# a different one on a per-connection basis using SELECT <dbid> where
# dbid is a number between 0 and 'databases'-1
databases 16
databases 2
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

数据库数量的重大变更

数据库数量从16减少到2是一个显著的变化。

请考虑以下几点:

  1. 这种减少可能会影响系统的整体容量和数据组织方式。
  2. 确保这个变更与多raft集群的实现目标相一致。
  3. 建议在文档中说明这个变更的原因和潜在影响,以便其他开发者和用户理解。

您能否解释一下为什么选择减少到2个数据库?这个决定是基于什么考虑做出的?


################################ SNAPSHOTTING #################################
#
Expand Down Expand Up @@ -343,6 +343,6 @@ rocksdb-ttl-second 604800
rocksdb-periodic-second 259200;

############################### RAFT ###############################
use-raft no
use-raft yes
# Braft relies on brpc to communicate via the default port number plus the port offset
raft-port-offset 10
27 changes: 27 additions & 0 deletions ppd/main.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright (c) 2023-present, OpenAtom Foundation, Inc. All rights reserved.
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*/

#include "pd_service.h"

int main(int argc, char* argv[]) {
brpc::Server server;

PlacementDriverServiceImpl service;
if (server.AddService(&service, brpc::SERVER_OWNS_SERVICE) != 0) {
fprintf(stderr, "Fail to add service!\n");
return -1;
}
longfar-ncy marked this conversation as resolved.
Show resolved Hide resolved

// 启动服务
if (server.Start(8080, nullptr) != 0) {
longfar-ncy marked this conversation as resolved.
Show resolved Hide resolved
fprintf(stderr, "Fail to start server!\n");
return -1;
}

server.RunUntilAskedToQuit();
return 0;
}
longfar-ncy marked this conversation as resolved.
Show resolved Hide resolved
84 changes: 84 additions & 0 deletions ppd/pd.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
syntax="proto3";
Copy link

@coderabbitai coderabbitai bot Oct 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

修正语法错误:在syntax声明中添加空格

Protocol Buffers的语法声明需要在等号两侧添加空格。正确的写法是syntax = "proto3";

修复建议:

-syntax="proto3";
+syntax = "proto3";
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
syntax="proto3";
syntax = "proto3";

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

格式修改下

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dingxiaoshuai123,感谢您修改格式!

^(๑˃̵ᴗ˂̵)و/


If you found this review helpful, would you consider giving us a shout-out on X?
Thank you for using CodeRabbit!

package pikiwidb;
option cc_generic_services = true;

import "store.proto";

message GetClusterInfoRequest {
};

message GetClusterInfoResponse {
bool success = 1;
repeated Store store = 2;
};

message Store {
int64 store_id = 1;
string ip = 2;
int32 port = 3;
StoreState state = 4;
repeated Region region = 5;
};

message Region {
int64 region_id = 1;
optional string start_key = 2;
optional string end_key = 3;
Copy link

@coderabbitai coderabbitai bot Oct 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

移除不必要的optional关键字

在proto3中,标量类型默认是可选的,无需使用optional关键字。

修复建议:

-optional string start_key = 2;
-optional string end_key = 3;
+string start_key = 2;
+string end_key = 3;
-optional int64 store_id = 2;
-optional string redirect = 3;
+int64 store_id = 2;
+string redirect = 3;

Also applies to: 66-67

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

proto3 中确实默认了很多关键字,按照兔子的改法改一下吧

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dingxiaoshuai123,好的,请按照建议进行修改。

(_/)
( ^.^)
(> < )


If you found this review helpful, would you consider giving us a shout-out on X?
Thank you for using CodeRabbit!

repeated RegionEpoch region_epoch = 4;
repeated Peer peers = 5;
};

message RegionEpoch {
int64 conf_change_ver = 1; // conf change version
int64 region_ver = 2; // region version (split or merge)
};

enum StoreState {
UP = 0;
OFFLINE = 1;
TOMBSTONE = 2;
};

message CreateAllRegionsRequest {
int64 regions_count = 1;
int32 region_peers_count = 2;
repeated RegionOptions regionOptions = 3;
longfar-ncy marked this conversation as resolved.
Show resolved Hide resolved
};

message CreateAllRegionsResponse {
bool success = 1;
};

message DeleteAllRegionsRequest {
};

message DeleteAllRegionsResponse {
bool success = 1;
};

message AddStoreRequest {
string ip = 1;
int32 port = 2;
};

message AddStoreResponse {
bool success = 1;
optional int64 store_id = 2;
optional string redirect = 3;
};

message RemoveStoreRequest {
int64 store_id = 1;
};

message RemoveStoreResponse {
bool success = 1;
};

service PlacementDriverService {
rpc CreateAllRegions(CreateAllRegionsRequest) returns (CreateAllRegionsResponse);
rpc DeleteAllRegions(DeleteAllRegionsRequest) returns (DeleteAllRegionsResponse);
rpc AddStore(AddStoreRequest) returns (AddStoreResponse);
rpc RemoveStore(RemoveStoreRequest) returns (RemoveStoreResponse);
rpc GetClusterInfo(GetClusterInfoRequest) returns (GetClusterInfoResponse);
};
59 changes: 59 additions & 0 deletions ppd/pd_service.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright (c) 2023-present, OpenAtom Foundation, Inc. All rights reserved.
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*/

#include "pd_service.h"

#include "pd_server.h"

namespace pikiwidb {
void PlacementDriverServiceImpl::CreateAllRegions(::google::protobuf::RpcController* controller,
const ::pikiwidb::CreateAllRegionsRequest* request,
::pikiwidb::CreateAllRegionsResponse* response,
::google::protobuf::Closure* done) {}

void PlacementDriverServiceImpl::DeleteAllRegions(::google::protobuf::RpcController* controller,
const ::pikiwidb::DeleteAllRegionsRequest* request,
::pikiwidb::DeleteAllRegionsResponse* response,
::google::protobuf::Closure* done) {}

void PlacementDriverServiceImpl::AddStore(::google::protobuf::RpcController* controller,
const ::pikiwidb::AddStoreRequest* request,
::pikiwidb::AddStoreResponse* response, ::google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
auto [success, store_id] = PDSERVER.AddStore(request->ip(), request->port());
if (!success) {
response->set_success(false);
return;
}

response->set_success(true);
response->set_store_id(store_id);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

AddStore 方法中添加请求参数的验证

为了提高系统的健壮性和安全性,建议在处理请求之前验证传入的 ipport 参数的有效性。例如,检查 IP 地址的格式是否正确,端口号是否在有效范围内。

AddStore 方法中添加日志记录

为了方便故障排查和系统监控,建议在成功添加 Store 后记录相关信息,例如新添加的 store_id 以及对应的 IP 和端口。


void PlacementDriverServiceImpl::RemoveStore(::google::protobuf::RpcController* controller,
const ::pikiwidb::RemoveStoreRequest* request,
::pikiwidb::RemoveStoreResponse* response,
::google::protobuf::Closure* done) {}

void PlacementDriverServiceImpl::GetClusterInfo(::google::protobuf::RpcController* controller,
const ::pikiwidb::GetClusterInfoRequest* request,
::pikiwidb::GetClusterInfoResponse* response,
::google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
PDSERVER.GetClusterInfo(response);
}

void PlacementDriverServiceImpl::OpenPDScheduling(::google::protobuf::RpcController* controller,
const ::pikiwidb::OpenPDSchedulingRequest* request,
::pikiwidb::OpenPDSchedulingResponse* response,
::google::protobuf::Closure* done) {}

void PlacementDriverServiceImpl::ClosePDScheduling(::google::protobuf::RpcController* controller,
const ::pikiwidb::ClosePDSchedulingRequest* request,
::pikiwidb::ClosePDSchedulingResponse* response,
::google::protobuf::Closure* done) {}
} // namespace pikiwidb
44 changes: 44 additions & 0 deletions ppd/pd_service.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (c) 2023-present, OpenAtom Foundation, Inc. All rights reserved.
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*/
Comment on lines +1 to +6
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

请更新版权声明的年份和组织名称

文件顶部的版权声明可能需要更新,以反映当前的年份和正确的组织名称。


#pragma once

#include "pd.pb.h"

namespace pikiwidb {

class PlacementDriverServiceImpl : public PlacementDriverService {
public:
PlacementDriverServiceImpl() = default;

void CreateAllRegions(::google::protobuf::RpcController* controller,
const ::pikiwidb::CreateAllRegionsRequest* request,
::pikiwidb::CreateAllRegionsResponse* response, ::google::protobuf::Closure* done) override;

void DeleteAllRegions(::google::protobuf::RpcController* controller,
const ::pikiwidb::DeleteAllRegionsRequest* request,
::pikiwidb::DeleteAllRegionsResponse* response, ::google::protobuf::Closure* done) override;

void AddStore(::google::protobuf::RpcController* controller, const ::pikiwidb::AddStoreRequest* request,
::pikiwidb::AddStoreResponse* response, ::google::protobuf::Closure* done) override;

void RemoveStore(::google::protobuf::RpcController* controller, const ::pikiwidb::RemoveStoreRequest* request,
::pikiwidb::RemoveStoreResponse* response, ::google::protobuf::Closure* done) override;

void GetClusterInfo(::google::protobuf::RpcController* controller, const ::pikiwidb::GetClusterInfoRequest* request,
::pikiwidb::GetClusterInfoResponse* response, ::google::protobuf::Closure* done) override;

void OpenPDScheduling(::google::protobuf::RpcController* controller,
const ::pikiwidb::OpenPDSchedulingRequest* request,
::pikiwidb::OpenPDSchedulingResponse* response, ::google::protobuf::Closure* done) override;

void ClosePDScheduling(::google::protobuf::RpcController* controller,
const ::pikiwidb::ClosePDSchedulingRequest* request,
::pikiwidb::ClosePDSchedulingResponse* response, ::google::protobuf::Closure* done) override;
};

} // namespace pikiwidb
27 changes: 27 additions & 0 deletions pproxy/main.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright (c) 2023-present, OpenAtom Foundation, Inc. All rights reserved.
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*/

#include "proxy_service.h"

int main(int argc, char* argv[]) {
brpc::Server server;

ProxyServiceImpl service;
if (server.AddService(&service, brpc::SERVER_OWNS_SERVICE) != 0) {
fprintf(stderr, "Fail to add service!\n");
return -1;
}

// 启动服务
if (server.Start(8080, nullptr) != 0) {
fprintf(stderr, "Fail to start server!\n");
return -1;
}
longfar-ncy marked this conversation as resolved.
Show resolved Hide resolved

server.RunUntilAskedToQuit();
return 0;
}
28 changes: 28 additions & 0 deletions pproxy/proxy.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
syntax = "proto3";
package pikiwidb.proxy;
option cc_generic_services = true;

// 定义请求和响应
message RunCommandRequest {
string command = 1;
}

message RunCommandResponse {
string output = 1;
}
message GetRouteInfoRequest {
}
message GetRouteInfoResponse {
message RouteInfo {
string group_id = 1;
string endpoint = 2;
int32 role = 3;
}
repeated RouteInfo infos = 1;
}

// 定义服务
longfar-ncy marked this conversation as resolved.
Show resolved Hide resolved
service ProxyService {
rpc RunCommand(RunCommandRequest) returns (RunCommandResponse);
rpc GetRouteInfo(GetRouteInfoRequest) returns (GetRouteInfoResponse);
}
42 changes: 42 additions & 0 deletions pproxy/proxy_service.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2023-present, OpenAtom Foundation, Inc. All rights reserved.
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*/

#include "proxy_service.h"

namespace pikiwidb::proxy {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

缺少必要的标准库头文件

pproxy/proxy_service.cc 文件中使用了 std::stringstd::arraystd::unique_ptr,但未包含相应的头文件。建议添加以下标准库头文件:

#include <string>
#include <array>
#include <memory>
🔗 Analysis chain

验证头文件包含是否完整

文件头部包含了版权信息和 proxy_service.h,这是好的做法。但是,考虑到文件中使用了 std::stringstd::arraystd::unique_ptr,建议添加相应的标准库头文件。

请运行以下脚本来验证是否需要额外的头文件:

根据脚本结果,可能需要添加以下头文件:

#include <string>
#include <array>
#include <memory>
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# 描述:检查是否需要额外的头文件

# 测试:搜索 proxy_service.h 文件中的头文件包含
echo "Checking proxy_service.h for includes:"
rg '#include' pproxy/proxy_service.h

# 测试:搜索当前文件中使用的 STL 类型
echo "\nChecking for STL types usage in proxy_service.cc:"
rg '\b(string|array|unique_ptr)\b' pproxy/proxy_service.cc

Length of output: 628

void ProxyServiceImpl::RunCommand(::google::protobuf::RpcController* cntl,
const pikiwidb::proxy::RunCommandRequest* request,
pikiwidb::proxy::RunCommandResponse* response, ::google::protobuf::Closure* done) {
std::string command = request->command();
std::string output = ExecuteCommand(command);

response->set_output(output);

done->Run();
}
longfar-ncy marked this conversation as resolved.
Show resolved Hide resolved
void ProxyServiceImpl::GetRouteINfo(::google::protobuf::RpcController* cntl,
const pikiwidb::proxy::GetRouteInfoRequest* request,
pikiwidb::proxy::GetRouteInfoResponse* response,
::google::protobuf::Closure* done) {
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

修复 GetRouteInfo 方法的拼写错误并实现功能

当前 GetRouteInfo 方法存在以下问题:

  1. 方法名拼写错误:GetRouteINfo 应为 GetRouteInfo
  2. 方法体为空,缺少实现。

请进行以下修改:

  1. 更正方法名拼写。
  2. 实现 GetRouteInfo 方法的功能。

示例修复:

void ProxyServiceImpl::GetRouteInfo(::google::protobuf::RpcController* cntl,
                                    const pikiwidb::proxy::GetRouteInfoRequest* request,
                                    pikiwidb::proxy::GetRouteInfoResponse* response,
                                    ::google::protobuf::Closure* done) {
  // TODO: 实现获取路由信息的逻辑
  // 示例:
  // auto route_info = GetCurrentRouteInfo();
  // response->set_route_info(route_info);

  done->Run();
}

请确保实现获取实际路由信息的逻辑。


std::string ProxyServiceImpl::ExecuteCommand(const std::string& command) {
std::array<char, 128> buffer;
std::string result;

// 使用 popen 执行命令
std::unique_ptr<FILE, decltype(&pclose)> pipe(popen(command.c_str(), "r"), pclose);
if (!pipe) {
return "popen() failed!";
}
while (fgets(buffer.data(), buffer.size(), pipe.get()) != nullptr) {
result += buffer.data();
}
return result;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

改进 ExecuteCommand 方法的安全性和健壮性

ExecuteCommand 方法存在以下问题:

  1. 安全性:使用 popen 直接执行shell命令存在潜在的安全风险。
  2. 错误处理:缺少对 fgets 失败的处理。
  3. 资源管理:虽然使用了 unique_ptr,但可以进一步改进。

建议进行以下改进:

  1. 考虑使用更安全的命令执行方式,如通过 execve 执行白名单中的命令。
  2. 添加输入验证和清理逻辑。
  3. 改进错误处理和资源管理。

示例改进:

std::string ProxyServiceImpl::ExecuteCommand(const std::string& command) {
  if (!IsCommandAllowed(command)) {
    return "Command not allowed";
  }

  std::array<char, 128> buffer;
  std::string result;

  std::unique_ptr<FILE, decltype(&pclose)> pipe(popen(command.c_str(), "r"), pclose);
  if (!pipe) {
    return "Failed to execute command";
  }

  while (true) {
    if (fgets(buffer.data(), buffer.size(), pipe.get()) == nullptr) {
      if (feof(pipe.get())) {
        break;  // 正常结束
      } else {
        return "Error reading command output";
      }
    }
    result += buffer.data();
  }

  return result;
}

请确保实现 IsCommandAllowed 函数来验证命令的安全性,并考虑使用更安全的命令执行方式。


} // namespace pikiwidb::proxy
21 changes: 21 additions & 0 deletions pproxy/proxy_service.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright (c) 2023-present, OpenAtom Foundation, Inc. All rights reserved.
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*/

#pragma once

#include "proxy.pb.h"

class ProxyServiceImpl : public ProxyService {
public:
void RunCommand(::google::protobuf::RpcController* cntl, const pikiwidb::proxy::RunCommandRequest* request,
pikiwidb::proxy::RunCommandResponse* response, ::google::protobuf::Closure* done) override;
void GetRouteInfo(::google::protobuf::RpcController* cntl, const pikiwidb::proxy::GetRouteInfoRequest* request,
pikiwidb::proxy::GetRouteInfoResponse* response, ::google::protobuf::Closure* done) override;

private:
std::string ExecuteCommand(const std::string& command);
};
Loading
Loading