diff --git a/src/client.cc b/src/client.cc index 0920b9160..8b3a1fbf8 100644 --- a/src/client.cc +++ b/src/client.cc @@ -625,9 +625,9 @@ void PClient::TransferToSlaveThreads() { // } } -void PClient::AddCurrentToMonitor() { +void PClient::AddToMonitor() { std::unique_lock guard(monitors_mutex); - monitors.insert(std::static_pointer_cast(s_current->shared_from_this())); + monitors.insert(weak_from_this()); } void PClient::FeedMonitors(const std::vector& params) { diff --git a/src/client.h b/src/client.h index 5079ed7b4..3da74f70b 100644 --- a/src/client.h +++ b/src/client.h @@ -250,8 +250,8 @@ class PClient : public std::enable_shared_from_this, public CmdRes { void SetSlaveInfo(); PSlaveInfo* GetSlaveInfo() const { return slave_info_.get(); } void TransferToSlaveThreads(); + void AddToMonitor(); - static void AddCurrentToMonitor(); static void FeedMonitors(const std::vector& params); void SetAuth() { auth_ = true; } diff --git a/src/cmd_admin.cc b/src/cmd_admin.cc index 70fd7a42b..be7872747 100644 --- a/src/cmd_admin.cc +++ b/src/cmd_admin.cc @@ -644,4 +644,14 @@ void SortCmd::InitialArgument() { get_patterns_.clear(); ret_.clear(); } +MonitorCmd::MonitorCmd(const std::string& name, int arity) + : BaseCmd(name, arity, kCmdFlagsReadonly | kCmdFlagsAdmin, kAclCategoryAdmin) {} + +bool MonitorCmd::DoInitial(PClient* client) { return true; } + +void MonitorCmd::DoCmd(PClient* client) { + client->AddToMonitor(); + client->SetRes(CmdRes::kOK); +} + } // namespace pikiwidb diff --git a/src/cmd_admin.h b/src/cmd_admin.h index 1b9fe4f4c..c79c0561e 100644 --- a/src/cmd_admin.h +++ b/src/cmd_admin.h @@ -23,6 +23,7 @@ const std::vector debugHelps = {"DEBUG [ [value] " Crash the server simulating an out-of-memory error."}; namespace pikiwidb { +const std::string kCmdNameMonitor = "monitor"; class CmdConfig : public BaseCmdGroup { public: @@ -213,6 +214,17 @@ class CmdDebugSegfault : public BaseCmd { void DoCmd(PClient* client) override; }; +class MonitorCmd : public BaseCmd { + public: + MonitorCmd(const std::string& name, int arity); + + protected: + bool DoInitial(PClient* client) override; + + private: + void DoCmd(PClient* client) override; +}; + class SortCmd : public BaseCmd { public: SortCmd(const std::string& name, int16_t arity); diff --git a/src/cmd_table_manager.cc b/src/cmd_table_manager.cc index f335de729..8739f4383 100644 --- a/src/cmd_table_manager.cc +++ b/src/cmd_table_manager.cc @@ -61,6 +61,7 @@ void CmdTableManager::InitCmdTable() { ADD_SUBCOMMAND(Debug, OOM, 2); ADD_SUBCOMMAND(Debug, Segfault, 2); ADD_COMMAND(Sort, -2); + ADD_COMMAND(Monitor, 1); // server ADD_COMMAND(Flushdb, 1); diff --git a/tests/admin_test.go b/tests/admin_test.go index 7e2c6c957..5200c2679 100644 --- a/tests/admin_test.go +++ b/tests/admin_test.go @@ -11,6 +11,7 @@ import ( "context" "log" "strconv" + "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -251,4 +252,34 @@ var _ = Describe("Admin", Ordered, func() { del2 := client.Del(ctx, "list2") Expect(del2.Err()).NotTo(HaveOccurred()) }) + + It("should monitor", Label("monitor"), func() { + ress := make(chan string) + client1 := s.NewClient() + mn := client1.Monitor(ctx, ress) + mn.Start() + // Wait for the Redis server to be in monitoring mode. + time.Sleep(100 * time.Millisecond) + client.Set(ctx, "foo", "bar", 0) + client.Set(ctx, "bar", "baz", 0) + client.Set(ctx, "bap", 8, 0) + client.Get(ctx, "bap") + lst := []string{} + for i := 0; i < 5; i++ { + s := <-ress + lst = append(lst, s) + } + mn.Stop() + Expect(lst[0]).To(ContainSubstring("OK")) + Expect(lst[2]).To(ContainSubstring(`"set foo bar"`)) + Expect(lst[3]).To(ContainSubstring(`"set bar baz"`)) + Expect(lst[4]).To(ContainSubstring(`"set bap 8"`)) + + err := client1.Close() + if err != nil { + log.Println("Close monitor client conn fail.", err.Error()) + return + } + + }) })