Skip to content

Commit

Permalink
observer monitors the latest signers and keepers stats (#89)
Browse files Browse the repository at this point in the history
* Save keeper and signer stats

* Add api

* Move conversation id to config

* Should send monitor message to observer

* observer boot blaze

* Fix msg key

* Improve json

* fix the observer stats api url

* remove some unused code

---------

Co-authored-by: Cedric Fung <[email protected]>
  • Loading branch information
hundredark and cedricfung authored Sep 20, 2024
1 parent a35c370 commit 3cfa157
Show file tree
Hide file tree
Showing 11 changed files with 462 additions and 12 deletions.
7 changes: 4 additions & 3 deletions apps/ethereum/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ func VerifyAssetKey(assetKey string) error {

func BuildChainAssetId(base, asset string) string {
h := md5.New()
io.WriteString(h, base)
io.WriteString(h, asset)
_, _ = io.WriteString(h, base)
_, _ = io.WriteString(h, asset)
sum := h.Sum(nil)
sum[6] = (sum[6] & 0x0f) | 0x30
sum[8] = (sum[8] & 0x3f) | 0x80
Expand All @@ -132,7 +132,8 @@ func HashMessageForSignature(msg string) []byte {
return hash.Bytes()
}

// TODO cross-chain deposits might be lost, which are sended from emtpy address and not included in the block traces in polygon
// TODO cross-chain deposits might be lost, which are sended from emtpy address
// and not included in the block traces in polygon pos
func LoopBlockTraces(chain byte, chainId string, traces []*RPCBlockCallTrace, blockTxs []*RPCTransaction) []*Transfer {
txs := []*RPCTransaction{}
for _, tx := range blockTxs {
Expand Down
13 changes: 8 additions & 5 deletions cmd/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func MonitorSigner(ctx context.Context, mdb *mtg.SQLite3Store, store *signer.SQL
logger.Verbosef("Monitor.bundleSignerState() => %v", err)
continue
}
postMessages(ctx, store, conv, conf.MTG, msg)
postMessages(ctx, store, conv, conf.MTG, msg, conf.ObserverUserId)
time.Sleep(30 * time.Minute)
}
}
Expand Down Expand Up @@ -128,7 +128,7 @@ func MonitorKeeper(ctx context.Context, mdb *mtg.SQLite3Store, store *kstore.SQL
logger.Verbosef("Monitor.bundleKeeperState() => %v", err)
continue
}
postMessages(ctx, store, conv, conf.MTG, msg)
postMessages(ctx, store, conv, conf.MTG, msg, conf.ObserverUserId)
time.Sleep(30 * time.Minute)
}
}
Expand Down Expand Up @@ -227,7 +227,7 @@ func bundleKeeperState(ctx context.Context, mdb *mtg.SQLite3Store, store *kstore
return state, nil
}

func postMessages(ctx context.Context, store UserStore, conv *bot.Conversation, conf *mtg.Configuration, msg string) {
func postMessages(ctx context.Context, store UserStore, conv *bot.Conversation, conf *mtg.Configuration, msg, observer string) {
app := conf.App
var messages []*bot.MessageRequest
for i := range conv.Participants {
Expand All @@ -236,7 +236,7 @@ func postMessages(ctx context.Context, store UserStore, conv *bot.Conversation,
continue
}
u, err := fetchConversationUser(ctx, store, s.UserId, conf)
if err != nil || checkBot(u) {
if err != nil || checkBot(u, observer) {
logger.Verbosef("Monitor.fetchConversationUser(%s) => %v %v", s.UserId, u, err)
continue
}
Expand Down Expand Up @@ -282,7 +282,10 @@ func fetchConversationUser(ctx context.Context, store UserStore, id string, conf
return u, err
}

func checkBot(u *bot.User) bool {
func checkBot(u *bot.User, observer string) bool {
if u.UserId == observer {
return false
}
id, err := strconv.ParseInt(u.IdentityNumber, 10, 64)
if err != nil {
panic(u.IdentityNumber)
Expand Down
4 changes: 4 additions & 0 deletions config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ store-dir = "/tmp/safe/signer"
messenger-conversation-id = ""
# the mixin messenger group for monitor messages
monitor-conversation-id = ""
# the observer aggregates the monitor messages
observer-user-id = "observer-id"
# the mpc threshold is recommended to be 2/3 of the mtg members count
threshold = 2
# a shared ed25519 private key to do ecdh with the keeper
Expand Down Expand Up @@ -115,6 +117,8 @@ private-key = "c56d95ec2d09ff5e0975ec0a667cc6cc5f03046935b329fc9f6fb2c3c8500109"
timestamp = 1721930640000000000
keeper-store-dir = "/tmp/safe/keeper"
keeper-public-key = "b6db9ab1f558a8dc064adae960df412b7513c3b02483d3b905ab0eed097dd29d"
# the mixin messenger group for monitor messages
monitor-conversation-id = ""
asset-id = "90f4351b-29b6-3b47-8b41-7efcec3c6672"
custom-key-price-asset-id = "31d2ea9c-95eb-3355-b65b-ba096853bc18"
custom-key-price-amount = "10"
Expand Down
192 changes: 192 additions & 0 deletions observer/blaze.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
package observer

import (
"context"
"encoding/base64"
"encoding/json"
"strings"
"time"

"github.com/MixinNetwork/bot-api-go-client/v3"
"github.com/MixinNetwork/mixin/logger"
)

const (
NodeTypeKeeper = "keeper"
NodeTypeSigner = "signer"
)

type AppInfo struct {
Version string `json:"binary_version"`

//keeper
SignerBitcoinKeys string `json:"signer_bitcoin_keys,omitempty"`
SignerEthereumKeys string `json:"signer_ethereum_keys,omitempty"`
ObserverBitcoinKeys string `json:"observer_bitcoin_keys,omitempty"`
ObserverEthereumKeys string `json:"observer_ethereum_keys,omitempty"`
InitialTxs string `json:"initial_transactions,omitempty"`
PendingTxs string `json:"pending_transactions,omitempty"`
DoneTxs string `json:"done_transactions,omitempty"`
FailedTxs string `json:"failed_transactions,omitempty"`

// signer
InitialSessions string `json:"initial_sessions,omitempty"`
PendingSessions string `json:"pending_sessions,omitempty"`
FinalSessions string `json:"final_sessions,omitempty"`
GeneratedKeys string `json:"generated_keys,omitempty"`
}

type MtgInfo struct {
InitialTxs string `json:"initial_transactions"`
SignedTxs string `json:"signed_transactions"`
SnapshotTxs string `json:"snapshot_transactions"`
MSKTOutputs string `json:"mskt_outputs"`

// keeper
LatestRequest string `json:"latest_request,omitempty"`
BitcoinHeight string `json:"bitcoin_height,omitempty"`
XINOutputs string `json:"xin_outputs,omitempty"`

// signer
MSSTOutputs string `json:"msst_outputs,omitempty"`
}

type StatsInfo struct {
Type string `json:"type"`
Mtg MtgInfo `json:"mtg"`
App AppInfo `json:"app"`
}

func (s *StatsInfo) String() string {
b, err := json.Marshal(s)
if err != nil {
panic(err)
}
return string(b)
}

type mixinBlazeHandler func(ctx context.Context, msg bot.MessageView, clientID string) error

func (f mixinBlazeHandler) OnMessage(ctx context.Context, msg bot.MessageView, clientID string) error {
return f(ctx, msg, clientID)
}

func (f mixinBlazeHandler) OnAckReceipt(ctx context.Context, msg bot.MessageView, clientID string) error {
return nil
}

func (f mixinBlazeHandler) SyncAck() bool {
return true
}

func (node *Node) Blaze(ctx context.Context) {
mixin := node.safeUser()
handler := func(ctx context.Context, botMsg bot.MessageView, clientID string) error {
err := node.handleMessage(ctx, botMsg)
if err != nil {
logger.Printf("blaze.handleMessage() => %v", err)
}
return err
}
for {
client := bot.NewBlazeClient(mixin.UserId, mixin.SessionId, mixin.SessionPrivateKey)
err := client.Loop(ctx, mixinBlazeHandler(handler))
if err != nil {
logger.Printf("client.Loop() => %#v", err)
}
time.Sleep(time.Second)
}
}

func (node *Node) handleMessage(ctx context.Context, bm bot.MessageView) error {
if bm.ConversationId != node.conf.MonitorConversaionId {
return nil
}
if bm.Category != bot.MessageCategoryPlainText {
return nil
}
stats := parseNodeStats(bm.DataBase64)
if stats == nil {
return nil
}
return node.store.UpsertNodeStats(ctx, bm.UserId, stats.Type, stats.String())
}

func parseNodeStats(dataBase64 string) *StatsInfo {
rb, err := base64.RawURLEncoding.DecodeString(dataBase64)
if err != nil {
return nil
}
msg := string(rb)
lines := strings.Split(msg, "\n")

stats := &StatsInfo{
Mtg: MtgInfo{},
App: AppInfo{},
}
switch {
case strings.HasPrefix(msg, "🧱🧱🧱🧱🧱 Keeper 🧱🧱🧱🧱🧱"):
stats.Type = NodeTypeKeeper
case strings.HasPrefix(msg, "📋📋📋📋📋 Signer 📋📋📋📋📋"):
stats.Type = NodeTypeSigner
default:
return nil
}

for _, line := range lines {
if line == "" {
continue
}
items := strings.Split(line, ":")
if len(items) != 2 {
continue
}
key, value := strings.TrimSpace(items[0])[5:], strings.TrimSpace(items[1])
switch key {
case "Latest request":
stats.Mtg.LatestRequest = value
case "Bitcoin height":
stats.Mtg.BitcoinHeight = value
case "Initial Transactions":
if stats.Type == NodeTypeSigner {
stats.Mtg.InitialTxs = value
} else {
stats.App.InitialTxs = value
}
case "Signed Transactions":
stats.Mtg.SignedTxs = value
case "Snapshot Transactions":
stats.Mtg.SnapshotTxs = value
case "XIN Outputs":
stats.Mtg.XINOutputs = value
case "MSKT Outputs":
stats.Mtg.MSKTOutputs = value
case "MSST Outputs":
stats.Mtg.MSSTOutputs = value
case "Signer Bitcoin keys":
stats.App.SignerBitcoinKeys = value
case "Signer Ethereum keys":
stats.App.SignerEthereumKeys = value
case "Observer Bitcoin keys":
stats.App.ObserverBitcoinKeys = value
case "Pending Transactions":
stats.App.PendingTxs = value
case "Done Transactions":
stats.App.DoneTxs = value
case "Failed Transactions":
stats.App.FailedTxs = value
case "Initial sessions":
stats.App.InitialSessions = value
case "Pending sessions":
stats.App.PendingSessions = value
case "Final sessions":
stats.App.FinalSessions = value
case "Generated keys":
stats.App.GeneratedKeys = value
case "Binary version":
stats.App.Version = value
}
}

return stats
}
46 changes: 43 additions & 3 deletions observer/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ func (node *Node) StartHTTP(version, readme string) {
router.GET("/", node.httpIndex)
router.GET("/favicon.ico", node.httpFavicon)
router.GET("/chains", node.httpListChains)
router.GET("/signers", node.httpListSigners)
router.GET("/keepers", node.httpListKeepers)
router.GET("/deposits", node.httpListDeposits)
router.GET("/recoveries", node.httpListRecoveries)
router.GET("/recoveries/:id", node.httpGetRecovery)
Expand All @@ -121,7 +123,7 @@ func (node *Node) httpIndex(w http.ResponseWriter, r *http.Request, params map[s
if r.Host == "safe.mixin.dev" {
w.Header().Set("Content-Type", "text/html; charset=utf-8")
w.WriteHeader(http.StatusOK)
w.Write([]byte(GUIDE))
_, _ = w.Write([]byte(GUIDE))
return
}

Expand Down Expand Up @@ -156,7 +158,7 @@ func (node *Node) httpIndex(w http.ResponseWriter, r *http.Request, params map[s
func (node *Node) httpFavicon(w http.ResponseWriter, r *http.Request, params map[string]string) {
w.Header().Set("Content-Type", "image/vnd.microsoft.icon")
w.WriteHeader(http.StatusOK)
w.Write(FAVICON)
_, _ = w.Write(FAVICON)
}

func (node *Node) httpListChains(w http.ResponseWriter, r *http.Request, params map[string]string) {
Expand Down Expand Up @@ -215,6 +217,44 @@ func (node *Node) httpListChains(w http.ResponseWriter, r *http.Request, params
common.RenderJSON(w, r, http.StatusOK, cs)
}

func (node *Node) httpListSigners(w http.ResponseWriter, r *http.Request, params map[string]string) {
node.httpListNodes(w, r, NodeTypeSigner)
}

func (node *Node) httpListKeepers(w http.ResponseWriter, r *http.Request, params map[string]string) {
node.httpListNodes(w, r, NodeTypeKeeper)
}

func (node *Node) httpListNodes(w http.ResponseWriter, r *http.Request, typ string) {
switch typ {
case NodeTypeKeeper, NodeTypeSigner:
default:
panic(typ)
}
nss, err := node.store.ListNodeStats(r.Context(), typ)
if err != nil {
common.RenderError(w, r, err)
return
}
var ns []map[string]any
for _, n := range nss {
stats, err := n.getStats()
if err != nil {
common.RenderError(w, r, err)
return
}
mp := map[string]any{
"id": n.AppId,
"type": n.Type,
"app": stats.App,
"mtg": stats.Mtg,
"updated_at": n.UpdatedAt,
}
ns = append(ns, mp)
}
common.RenderJSON(w, r, http.StatusOK, ns)
}

func (node *Node) httpListDeposits(w http.ResponseWriter, r *http.Request, params map[string]string) {
holder := r.URL.Query().Get("holder")
chain, _ := strconv.ParseInt(r.URL.Query().Get("chain"), 10, 64)
Expand Down Expand Up @@ -685,7 +725,7 @@ func (node *Node) viewDeposits(ctx context.Context, deposits []*Deposit, sent ma
return view
}

func (node *Node) viewRecoveries(ctx context.Context, recoveries []*Recovery) []map[string]any {
func (node *Node) viewRecoveries(_ context.Context, recoveries []*Recovery) []map[string]any {
view := make([]map[string]any, 0)
for _, r := range recoveries {
rm := map[string]any{
Expand Down
1 change: 1 addition & 0 deletions observer/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type Configuration struct {
PrivateKey string `toml:"private-key"`
Timestamp int64 `toml:"timestamp"`
KeeperStoreDir string `toml:"keeper-store-dir"`
MonitorConversaionId string `toml:"monitor-conversation-id"`
KeeperPublicKey string `toml:"keeper-public-key"`
AssetId string `toml:"asset-id"`
CustomKeyPriceAssetId string `toml:"custom-key-price-asset-id"`
Expand Down
1 change: 1 addition & 0 deletions observer/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func (node *Node) Boot(ctx context.Context) {
go node.safeKeyLoop(ctx, common.SafeChainEthereum)
go node.mixinWithdrawalsLoop(ctx)
go node.sendAccountApprovals(ctx)
go node.Blaze(ctx)
node.snapshotsLoop(ctx)
}

Expand Down
Loading

0 comments on commit 3cfa157

Please sign in to comment.