Skip to content

Commit

Permalink
Merge pull request #1 from CortexFoundation/dev
Browse files Browse the repository at this point in the history
multiply services switch added
  • Loading branch information
ucwong authored Jul 6, 2023
2 parents 6b68681 + a7d4833 commit 2337b4d
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 3 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,5 @@ Make sure config the right cortex RPC service endpoint
```
go run cmd/main.go
```
<img width="1416" alt="image" src="https://github.com/CortexFoundation/robot/assets/22344498/9fea7065-9ffe-4076-a840-5dc218247543">

1 change: 1 addition & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func main() {
if m, err := robot.New(cfg, true, false, false, nil); err != nil {
panic(err)
} else {
m.SwitchService(robot.SRV_PRINT)
if err := m.Start(); err != nil {
log.Error("start failed", "err", err)
panic(err)
Expand Down
22 changes: 22 additions & 0 deletions doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright 2023 The CortexTheseus Authors
// This file is part of the CortexTheseus library.
//
// The CortexTheseus library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The CortexTheseus library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the CortexTheseus library. If not, see <http://www.gnu.org/licenses/>.

package robot

const (
SRV_MODEL = 0
SRV_PRINT = 99
)
57 changes: 54 additions & 3 deletions monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ type Monitor struct {
lock sync.RWMutex

callback chan any

srv atomic.Int32
}

// NewMonitor creates a new instance of monitor.
Expand Down Expand Up @@ -116,6 +118,7 @@ func New(flag *params.Config, cache, compress, listen bool, callback chan any) (
//start: mclock.Now(),
}

// TODO https://github.com/ucwong/golang-kv
if fs_, err := backend.NewChainDB(flag); err != nil {
log.Error("file storage failed", "err", err)
return nil, err
Expand All @@ -137,6 +140,8 @@ func New(flag *params.Config, cache, compress, listen bool, callback chan any) (

m.mode = flag.Mode

m.srv.Store(SRV_MODEL)

/*torrents, _ := fs.initTorrents()
if m.mode != params.LAZY {
for k, v := range torrents {
Expand Down Expand Up @@ -222,7 +227,7 @@ func (m *Monitor) indexCheck() error {
// return err
//}
}
log.Warn("Fs storage is reloading ...", "name", m.ckp.Name, "number", checkpoint.TfsCheckPoint, "version", common.BytesToHash(version), "checkpoint", checkpoint.TfsRoot, "blocks", len(m.fs.Blocks()), "files", len(m.fs.Files()), "txs", m.fs.Txs(), "lastNumber", m.lastNumber.Load(), "last in db", m.fs.LastListenBlockNumber())
log.Warn("Fs storage is reloading ...", "name", m.ckp.Name, "number", checkpoint.TfsCheckPoint, "version", common.BytesToHash(version), "checkpoint", checkpoint.TfsRoot, "blocks", len(m.fs.Blocks()), "files", len(m.fs.Files()), "txs", m.fs.Txs(), "lastNumber", m.lastNumber.Load(), "last", m.fs.LastListenBlockNumber())
} else {
log.Info("Fs storage version check passed", "name", m.ckp.Name, "number", checkpoint.TfsCheckPoint, "version", common.BytesToHash(version), "blocks", len(m.fs.Blocks()), "files", len(m.fs.Files()), "txs", m.fs.Txs())
}
Expand Down Expand Up @@ -646,7 +651,7 @@ func (m *Monitor) syncLatestBlock() {
elapsed := time.Duration(mclock.Now()) - time.Duration(m.start)
log.Info("Finish sync, listener will be paused", "current", m.currentNumber.Load(), "elapsed", common.PrettyDuration(elapsed), "progress", progress, "end", end, "last", m.lastNumber.Load())
//return
timer.Reset(time.Millisecond * 1000 * 180)
timer.Reset(time.Millisecond * 1000 * 60)
end = false
continue
}
Expand Down Expand Up @@ -682,6 +687,10 @@ func (m *Monitor) currentBlock() (uint64, error) {
}

func (m *Monitor) skip(i uint64) bool {
if m.srv.Load() != SRV_MODEL {
return false
}

if len(m.ckp.Skips) == 0 || i > m.ckp.Skips[len(m.ckp.Skips)-1].To || i < m.ckp.Skips[0].From {
return false
}
Expand Down Expand Up @@ -753,6 +762,8 @@ func (m *Monitor) syncLastBlock() uint64 {
m.lastNumber.Store(i - 1)
return 0
}

// batch blocks operation according service category
for _, rpcBlock := range blocks {
if err := m.solve(rpcBlock); err != nil {
m.lastNumber.Store(i - 1)
Expand Down Expand Up @@ -808,7 +819,48 @@ func (m *Monitor) syncLastBlock() uint64 {
return uint64(maxNumber - minNumber)
}

// solve block from node
func (m *Monitor) solve(block *types.Block) error {
switch m.srv.Load() {
case SRV_MODEL:
return m.forModelService(block)
//case 1:
// return m.forExplorerService(block) // others service, explorer, exchange, zkp, nft, etc.
//case 2:
// return m.forExchangeService(block)
case SRV_PRINT:
return m.forPrintService(block)
default:
return errors.New("no block operation service found")
}
}

func (m *Monitor) SwitchService(srv int) error {
m.srv.Store(int32(srv))
return nil
}

// only for examples
func (m *Monitor) forExplorerService(block *types.Block) error {
return errors.New("not support")
}

func (m *Monitor) forExchangeService(block *types.Block) error {
return errors.New("not support")
}

func (m *Monitor) forPrintService(block *types.Block) error {
log.Info("Block print", "num", block.Number, "hash", block.Hash, "txs", len(block.Txs))
if len(block.Txs) > 0 {
for _, t := range block.Txs {
log.Info("Tx print", "hash", t.Hash, "amount", t.Amount, "gas", t.GasLimit, "receipt", t.Recipient, "payload", t.Payload)
}
}
m.fs.Anchor(block.Number)
return nil
}

func (m *Monitor) forModelService(block *types.Block) error {
i := block.Number
if i%65536 == 0 {
defer func() {
Expand Down Expand Up @@ -836,7 +888,6 @@ func (m *Monitor) solve(block *types.Block) error {
log.Debug("Seal fs record", "number", i, "record", record, "root", m.fs.Root().Hex(), "blocks", len(m.fs.Blocks()), "txs", m.fs.Txs(), "files", len(m.fs.Files()), "ckp", m.fs.CheckPoint())
} else {
if m.fs.LastListenBlockNumber() < i {
//m.fs.LastListenBlockNumber = i
m.fs.Anchor(i)
}

Expand Down

0 comments on commit 2337b4d

Please sign in to comment.