Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: pikiwidb support multi-raft cluster #442

Merged
merged 19 commits into from
Oct 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions etc/conf/pikiwidb.conf
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ logfile stdout
# Set the number of databases. The default database is DB 0, you can select
# a different one on a per-connection basis using SELECT <dbid> where
# dbid is a number between 0 and 'databases'-1
databases 16
databases 2
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

数据库数量的重大变更

数据库数量从16减少到2是一个显著的变化。

请考虑以下几点:

  1. 这种减少可能会影响系统的整体容量和数据组织方式。
  2. 确保这个变更与多raft集群的实现目标相一致。
  3. 建议在文档中说明这个变更的原因和潜在影响,以便其他开发者和用户理解。

您能否解释一下为什么选择减少到2个数据库?这个决定是基于什么考虑做出的?


################################ SNAPSHOTTING #################################
#
Expand Down Expand Up @@ -343,6 +343,6 @@ rocksdb-ttl-second 604800
rocksdb-periodic-second 259200;

############################### RAFT ###############################
use-raft no
use-raft yes
# Braft relies on brpc to communicate via the default port number plus the port offset
raft-port-offset 10
41 changes: 41 additions & 0 deletions ppd/main.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (c) 2023-present, OpenAtom Foundation, Inc. All rights reserved.
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*/

#include "brpc/server.h"
#include "butil/errno.h"
#include "gflags/gflags.h"
#include "spdlog/spdlog.h"

#include "pd_service.h"

DEFINE_int32(port, 8080, "Port of rpc server");
DEFINE_int32(idle_timeout_s, 60,
"Connection will be closed if there is no "
"read/write operations during the last `idle_timeout_s`");
DEFINE_int32(max_concurrency, 0, "Limit of request processing in parallel");

int main(int argc, char* argv[]) {
GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
brpc::Server server;
PlacementDriverServiceImpl service;
if (server.AddService(&service, brpc::SERVER_OWNS_SERVICE) != 0) {
spdlog::error("Failed to add service for: {}", berror());
return -1;
}

brpc::ServerOptions options;
options.idle_timeout_sec = FLAGS_idle_timeout_s;
options.max_concurrency = FLAGS_max_concurrency;

// 启动服务
if (server.Start(FLAGS_port, &options) != 0) {
spdlog::error("Failed to start server for: {}", berror());
return -1;
}

server.RunUntilAskedToQuit();
}
93 changes: 93 additions & 0 deletions ppd/pd.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
syntax = "proto3";
package pikiwidb;
option cc_generic_services = true;

message Peer {
string group_id = 1;
int32 cluster_idx = 2;
};

message GetClusterInfoRequest {
};

message GetClusterInfoResponse {
bool success = 1;
repeated Store store = 2;
};

message Store {
int64 store_id = 1;
string ip = 2;
int32 port = 3;
StoreState state = 4;
repeated Region region = 5;
};

message Region {
int64 region_id = 1;
string start_key = 2;
string end_key = 3;
repeated RegionEpoch region_epoch = 4;
repeated Peer peers = 5;
};

message RegionEpoch {
int64 conf_change_ver = 1; // conf change version
int64 region_ver = 2; // region version (split or merge)
};

enum StoreState {
UP = 0;
OFFLINE = 1;
TOMBSTONE = 2;
};

message RegionOptions {
string start_key = 1;
string end_key = 2;
int32 max_data_size = 3;
};

message CreateAllRegionsRequest {
int64 regions_count = 1;
int32 region_peers_count = 2;
repeated RegionOptions regionOptions = 3;
longfar-ncy marked this conversation as resolved.
Show resolved Hide resolved
};

message CreateAllRegionsResponse {
bool success = 1;
};

message DeleteAllRegionsRequest {
};

message DeleteAllRegionsResponse {
bool success = 1;
};

message AddStoreRequest {
string ip = 1;
int32 port = 2;
};

message AddStoreResponse {
bool success = 1;
optional int64 store_id = 2;
optional string redirect = 3;
};

message RemoveStoreRequest {
int64 store_id = 1;
};

message RemoveStoreResponse {
bool success = 1;
};

service PlacementDriverService {
rpc CreateAllRegions(CreateAllRegionsRequest) returns (CreateAllRegionsResponse);
rpc DeleteAllRegions(DeleteAllRegionsRequest) returns (DeleteAllRegionsResponse);
rpc AddStore(AddStoreRequest) returns (AddStoreResponse);
rpc RemoveStore(RemoveStoreRequest) returns (RemoveStoreResponse);
rpc GetClusterInfo(GetClusterInfoRequest) returns (GetClusterInfoResponse);
};
61 changes: 61 additions & 0 deletions ppd/pd_service.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright (c) 2023-present, OpenAtom Foundation, Inc. All rights reserved.
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*/

#include "pd_service.h"

#include "pd_server.h"
#include "spdlog/spdlog.h"

namespace pikiwidb {
void PlacementDriverServiceImpl::CreateAllRegions(::google::protobuf::RpcController* controller,
const ::pikiwidb::CreateAllRegionsRequest* request,
::pikiwidb::CreateAllRegionsResponse* response,
::google::protobuf::Closure* done) {}

void PlacementDriverServiceImpl::DeleteAllRegions(::google::protobuf::RpcController* controller,
const ::pikiwidb::DeleteAllRegionsRequest* request,
::pikiwidb::DeleteAllRegionsResponse* response,
::google::protobuf::Closure* done) {}

void PlacementDriverServiceImpl::AddStore(::google::protobuf::RpcController* controller,
const ::pikiwidb::AddStoreRequest* request,
::pikiwidb::AddStoreResponse* response, ::google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
auto [success, store_id] = PDSERVER.AddStore(request->ip(), request->port());
if (!success) {
response->set_success(false);
return;
}

response->set_success(true);
response->set_store_id(store_id);
spdlog::info("add store success: {}", store_id);
}

void PlacementDriverServiceImpl::RemoveStore(::google::protobuf::RpcController* controller,
const ::pikiwidb::RemoveStoreRequest* request,
::pikiwidb::RemoveStoreResponse* response,
::google::protobuf::Closure* done) {}

void PlacementDriverServiceImpl::GetClusterInfo(::google::protobuf::RpcController* controller,
const ::pikiwidb::GetClusterInfoRequest* request,
::pikiwidb::GetClusterInfoResponse* response,
::google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
PDSERVER.GetClusterInfo(response);
}

void PlacementDriverServiceImpl::OpenPDScheduling(::google::protobuf::RpcController* controller,
const ::pikiwidb::OpenPDSchedulingRequest* request,
::pikiwidb::OpenPDSchedulingResponse* response,
::google::protobuf::Closure* done) {}

void PlacementDriverServiceImpl::ClosePDScheduling(::google::protobuf::RpcController* controller,
const ::pikiwidb::ClosePDSchedulingRequest* request,
::pikiwidb::ClosePDSchedulingResponse* response,
::google::protobuf::Closure* done) {}
} // namespace pikiwidb
44 changes: 44 additions & 0 deletions ppd/pd_service.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (c) 2023-present, OpenAtom Foundation, Inc. All rights reserved.
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*/
Comment on lines +1 to +6
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

请更新版权声明的年份和组织名称

文件顶部的版权声明可能需要更新,以反映当前的年份和正确的组织名称。


#pragma once

#include "pd.pb.h"

namespace pikiwidb {

class PlacementDriverServiceImpl : public PlacementDriverService {
public:
PlacementDriverServiceImpl() = default;

void CreateAllRegions(::google::protobuf::RpcController* controller,
const ::pikiwidb::CreateAllRegionsRequest* request,
::pikiwidb::CreateAllRegionsResponse* response, ::google::protobuf::Closure* done) override;

void DeleteAllRegions(::google::protobuf::RpcController* controller,
const ::pikiwidb::DeleteAllRegionsRequest* request,
::pikiwidb::DeleteAllRegionsResponse* response, ::google::protobuf::Closure* done) override;

void AddStore(::google::protobuf::RpcController* controller, const ::pikiwidb::AddStoreRequest* request,
::pikiwidb::AddStoreResponse* response, ::google::protobuf::Closure* done) override;

void RemoveStore(::google::protobuf::RpcController* controller, const ::pikiwidb::RemoveStoreRequest* request,
::pikiwidb::RemoveStoreResponse* response, ::google::protobuf::Closure* done) override;

void GetClusterInfo(::google::protobuf::RpcController* controller, const ::pikiwidb::GetClusterInfoRequest* request,
::pikiwidb::GetClusterInfoResponse* response, ::google::protobuf::Closure* done) override;

void OpenPDScheduling(::google::protobuf::RpcController* controller,
const ::pikiwidb::OpenPDSchedulingRequest* request,
::pikiwidb::OpenPDSchedulingResponse* response, ::google::protobuf::Closure* done) override;

void ClosePDScheduling(::google::protobuf::RpcController* controller,
const ::pikiwidb::ClosePDSchedulingRequest* request,
::pikiwidb::ClosePDSchedulingResponse* response, ::google::protobuf::Closure* done) override;
};

} // namespace pikiwidb
39 changes: 39 additions & 0 deletions pproxy/main.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2023-present, OpenAtom Foundation, Inc. All rights reserved.
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*/

#include "brpc/server.h"
#include "gflags/gflags.h"
#include "spdlog/spdlog.h"

#include "proxy_service.h"

DEFINE_int32(port, 8080, "Port of rpc server");
DEFINE_int32(idle_timeout_s, 60,
"Connection will be closed if there is no "
"read/write operations during the last `idle_timeout_s`");
DEFINE_int32(max_concurrency, 0, "Limit of request processing in parallel");

int main(int argc, char* argv[]) {
GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
brpc::Server server;
ProxyServiceImpl service;
if (server.AddService(&service, brpc::SERVER_OWNS_SERVICE) != 0) {
spdlog::error("Failed to add service for: {}", berror());
return -1;
}

brpc::ServerOptions options;
options.idle_timeout_sec = FLAGS_idle_timeout_s;
options.max_concurrency = FLAGS_max_concurrency;

if (server.Start(FLAGS_port, &options) != 0) {
spdlog::error("Failed to start server for: {}", berror());
return -1;
}

server.RunUntilAskedToQuit();
}
26 changes: 26 additions & 0 deletions pproxy/proxy.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
syntax = "proto3";
package pikiwidb.proxy;
option cc_generic_services = true;

message RunCommandRequest {
string command = 1;
}

message RunCommandResponse {
string output = 1;
}
message GetRouteInfoRequest {
}
message GetRouteInfoResponse {
message RouteInfo {
string group_id = 1;
string endpoint = 2;
int32 role = 3;
}
repeated RouteInfo infos = 1;
}

service ProxyService {
rpc RunCommand(RunCommandRequest) returns (RunCommandResponse);
rpc GetRouteInfo(GetRouteInfoRequest) returns (GetRouteInfoResponse);
}
65 changes: 65 additions & 0 deletions pproxy/proxy_service.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright (c) 2023-present, OpenAtom Foundation, Inc. All rights reserved.
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*/

#include "proxy_service.h"

#include <array>
#include <memory>
#include <string>

namespace pikiwidb::proxy {
void ProxyServiceImpl::RunCommand(::google::protobuf::RpcController* cntl,
const pikiwidb::proxy::RunCommandRequest* request,
pikiwidb::proxy::RunCommandResponse* response, ::google::protobuf::Closure* done) {
std::string command = request->command(); // 检查命令是否在白名单中

if (!IsCommandAllowed(command)) {
response->set_error("Command not allowed");
done->Run();
return;
}

std::string output = ExecuteCommand(command);
if (output.empty()) {
response->set_error("Command execution failed");
} else {
response->set_output(output);
}
done->Run();
}

void ProxyServiceImpl::GetRouteINfo(::google::protobuf::RpcController* cntl,
const pikiwidb::proxy::GetRouteInfoRequest* request,
pikiwidb::proxy::GetRouteInfoResponse* response,
::google::protobuf::Closure* done) {}

std::string ProxyServiceImpl::ExecuteCommand(const std::string& command) {
if (!IsCommandAllowed(command)) {
return "Command not allowed";
}

std::array<char, 128> buffer;
std::string result;
std::unique_ptr<FILE, decltype(&pclose)> pipe(popen(command.c_str(), "r"), pclose);
if (!pipe) {
return "Failed to execute command";
}

while (true) {
if (fgets(buffer.data(), buffer.size(), pipe.get()) == nullptr) {
if (feof(pipe.get())) {
break;
} else {
return "Error reading command output";
}
}
result += buffer.data();
}
return result;
}

} // namespace pikiwidb::proxy
Loading
Loading