From 616393d7dc2e4a25e47b109b9bd654e24a555916 Mon Sep 17 00:00:00 2001 From: Ekaterina Pavlova Date: Mon, 12 Aug 2024 09:54:19 +0300 Subject: [PATCH] services: add new service for fetching blocks from NeoFS Close #3496 Signed-off-by: Ekaterina Pavlova --- cli/server/server.go | 9 +- internal/fakechain/fakechain.go | 6 + pkg/config/application_config.go | 1 + pkg/config/neofs.go | 14 ++ pkg/core/storage/dbconfig/store_config.go | 11 + pkg/network/server.go | 15 ++ pkg/network/server_config.go | 4 + pkg/services/blockfetcher/blockfetcher.go | 214 ++++++++++++++++++ .../blockfetcher/blockfetcher_test.go | 44 ++++ .../services/blockfetcher}/dump.go | 16 +- .../services/blockfetcher}/dump_test.go | 2 +- pkg/services/oracle/neofs/neofs.go | 66 ++++++ pkg/services/oracle/neofs/neofs_test.go | 24 ++ 13 files changed, 413 insertions(+), 13 deletions(-) create mode 100644 pkg/config/neofs.go create mode 100644 pkg/services/blockfetcher/blockfetcher.go create mode 100644 pkg/services/blockfetcher/blockfetcher_test.go rename {cli/server => pkg/services/blockfetcher}/dump.go (88%) rename {cli/server => pkg/services/blockfetcher}/dump_test.go (96%) diff --git a/cli/server/server.go b/cli/server/server.go index 67787ced9b..ceda4901c7 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -22,6 +22,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/network" + "github.com/nspcc-dev/neo-go/pkg/services/blockfetcher" "github.com/nspcc-dev/neo-go/pkg/services/metrics" "github.com/nspcc-dev/neo-go/pkg/services/notary" "github.com/nspcc-dev/neo-go/pkg/services/oracle" @@ -287,9 +288,9 @@ func restoreDB(ctx *cli.Context) error { gctx := newGraceContext() var lastIndex uint32 - dump := newDump() + dump := blockfetcher.NewDump() defer func() { - _ = dump.tryPersist(dumpDir, lastIndex) + _ = dump.TryPersist(dumpDir, lastIndex) }() var f = func(b *block.Block) error { @@ -312,10 +313,10 @@ func restoreDB(ctx *cli.Context) error { if batch == nil && b.Index == 0 { return nil } - dump.add(b.Index, batch) + dump.Add(b.Index, batch) lastIndex = b.Index if b.Index%1000 == 0 { - if err := dump.tryPersist(dumpDir, b.Index); err != nil { + if err := dump.TryPersist(dumpDir, b.Index); err != nil { return fmt.Errorf("can't dump storage to file: %w", err) } } diff --git a/internal/fakechain/fakechain.go b/internal/fakechain/fakechain.go index a2553b61a1..5420bef155 100644 --- a/internal/fakechain/fakechain.go +++ b/internal/fakechain/fakechain.go @@ -13,6 +13,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/mpt" "github.com/nspcc-dev/neo-go/pkg/core/native" "github.com/nspcc-dev/neo-go/pkg/core/state" + "github.com/nspcc-dev/neo-go/pkg/core/storage" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/crypto/hash" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" @@ -405,6 +406,11 @@ func (chain *FakeChain) UnsubscribeFromTransactions(ch chan *transaction.Transac panic("TODO") } +// LastBatch returns last persisted storage batch. +func (chain *FakeChain) LastBatch() *storage.MemBatch { + panic("TODO") +} + // AddBlock implements the StateSync interface. func (s *FakeStateSync) AddBlock(block *block.Block) error { panic("TODO") diff --git a/pkg/config/application_config.go b/pkg/config/application_config.go index 2e94961d7b..75bb2e9bb3 100644 --- a/pkg/config/application_config.go +++ b/pkg/config/application_config.go @@ -29,6 +29,7 @@ type ApplicationConfiguration struct { Oracle OracleConfiguration `yaml:"Oracle"` P2PNotary P2PNotary `yaml:"P2PNotary"` StateRoot StateRoot `yaml:"StateRoot"` + NeoFS NeoFS `yaml:"NeoFS"` } // EqualsButServices returns true when the o is the same as a except for services diff --git a/pkg/config/neofs.go b/pkg/config/neofs.go new file mode 100644 index 0000000000..1d8f97f619 --- /dev/null +++ b/pkg/config/neofs.go @@ -0,0 +1,14 @@ +package config + +import "time" + +// NeoFS represents the configuration for the blockfetcher service. +type ( + NeoFS struct { + Nodes []string `yaml:"Nodes"` + Timeout time.Duration `yaml:"Timeout"` + ContainerID string `yaml:"ContainerID"` + DumpDir string `yaml:"DumpDir"` + Restore bool `yaml:"Restore"` + } +) diff --git a/pkg/core/storage/dbconfig/store_config.go b/pkg/core/storage/dbconfig/store_config.go index 3d23c63589..fd4108bebc 100644 --- a/pkg/core/storage/dbconfig/store_config.go +++ b/pkg/core/storage/dbconfig/store_config.go @@ -22,3 +22,14 @@ type ( ReadOnly bool `yaml:"ReadOnly"` } ) + +// FilePath returns the file path for the DB. In case "inmemory" DB is used, it returns an empty string. +func (db DBConfiguration) FilePath() string { + switch db.Type { + case "boltdb": + return db.BoltDBOptions.FilePath + case "leveldb": + return db.LevelDBOptions.DataDirectoryPath + } + return "" +} diff --git a/pkg/network/server.go b/pkg/network/server.go index 34da851b52..65158b3705 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -21,6 +21,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/mempool" "github.com/nspcc-dev/neo-go/pkg/core/mempoolevent" "github.com/nspcc-dev/neo-go/pkg/core/mpt" + "github.com/nspcc-dev/neo-go/pkg/core/storage" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/encoding/address" "github.com/nspcc-dev/neo-go/pkg/io" @@ -28,6 +29,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/network/capability" "github.com/nspcc-dev/neo-go/pkg/network/extpool" "github.com/nspcc-dev/neo-go/pkg/network/payload" + "github.com/nspcc-dev/neo-go/pkg/services/blockfetcher" "github.com/nspcc-dev/neo-go/pkg/util" "go.uber.org/zap" ) @@ -77,6 +79,7 @@ type ( RegisterPostBlock(f func(func(*transaction.Transaction, *mempool.Pool, bool) bool, *mempool.Pool, *block.Block)) SubscribeForBlocks(ch chan *block.Block) UnsubscribeFromBlocks(ch chan *block.Block) + LastBatch() *storage.MemBatch } // Service is a service abstraction (oracle, state root, consensus, etc). @@ -107,6 +110,7 @@ type ( notaryRequestPool *mempool.Pool extensiblePool *extpool.Pool notaryFeer NotaryFeer + blockFetcher *blockfetcher.Service serviceLock sync.RWMutex services map[string]Service @@ -220,6 +224,8 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy s.bSyncQueue = bqueue.New(s.stateSync, log, nil, updateBlockQueueLenMetric) + s.blockFetcher = blockfetcher.New(chain, s.NeoFSCfg, log) + if s.MinPeers < 0 { s.log.Info("bad MinPeers configured, using the default value", zap.Int("configured", s.MinPeers), @@ -295,6 +301,15 @@ func (s *Server) Start() { go s.relayBlocksLoop() go s.bQueue.Run() go s.bSyncQueue.Run() + if s.ServerConfig.NeoFSCfg.Restore { + done := make(chan struct{}) + s.blockFetcher.Start( + func() { + s.log.Info("BlockFetcher service finished") + close(done) + }) + <-done + } for _, tr := range s.transports { go tr.Accept() } diff --git a/pkg/network/server_config.go b/pkg/network/server_config.go index c0f1e727f6..2da10968e0 100644 --- a/pkg/network/server_config.go +++ b/pkg/network/server_config.go @@ -76,6 +76,9 @@ type ( // BroadcastFactor is the factor (0-100) for fan-out optimization. BroadcastFactor int + + // NeoFSCfg is NeoFS configuration. + NeoFSCfg config.NeoFS } ) @@ -107,6 +110,7 @@ func NewServerConfig(cfg config.Config) (ServerConfig, error) { StateRootCfg: appConfig.StateRoot, ExtensiblePoolSize: appConfig.P2P.ExtensiblePoolSize, BroadcastFactor: appConfig.P2P.BroadcastFactor, + NeoFSCfg: appConfig.NeoFS, } return c, nil } diff --git a/pkg/services/blockfetcher/blockfetcher.go b/pkg/services/blockfetcher/blockfetcher.go new file mode 100644 index 0000000000..7bb4ff8873 --- /dev/null +++ b/pkg/services/blockfetcher/blockfetcher.go @@ -0,0 +1,214 @@ +package blockfetcher + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/url" + "time" + + "github.com/nspcc-dev/neo-go/pkg/config" + "github.com/nspcc-dev/neo-go/pkg/core/block" + "github.com/nspcc-dev/neo-go/pkg/core/chaindump" + "github.com/nspcc-dev/neo-go/pkg/core/storage" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + gio "github.com/nspcc-dev/neo-go/pkg/io" + "github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs" + "github.com/nspcc-dev/neo-go/pkg/util" + "github.com/nspcc-dev/neofs-sdk-go/client" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "go.uber.org/zap" +) + +// Ledger is an interface to Blockchain sufficient for Service. +type Ledger interface { + LastBatch() *storage.MemBatch + AddBlock(block *block.Block) error + GetBlock(hash util.Uint256) (*block.Block, error) + GetConfig() config.Blockchain + GetHeaderHash(u uint32) util.Uint256 +} + +type Service struct { + chain Ledger + log *zap.Logger + client *client.Client + quit chan bool + containerID cid.ID + Timeout time.Duration + Nodes []string + dumpDir string +} + +// ConfigObject represents the configuration object in NeoFS. +type ConfigObject struct { + HashOIDs []string `json:"hash_oid"` + BlockOIDs []string `json:"block_oid"` + Height uint32 `json:"height"` + Timestamp int64 `json:"timestamp"` + Step uint32 `json:"step"` +} + +// New creates a new BlockFetcherService. +func New(chain Ledger, cfg config.NeoFS, logger *zap.Logger) *Service { + neofsClient, err := client.New(client.PrmInit{}) + if err != nil { + logger.Error("Failed to create NeoFS client", zap.Error(err)) + return nil + } + var containerID cid.ID + err = containerID.DecodeString(cfg.ContainerID) + if err != nil { + logger.Error("Failed to decode container ID", zap.Error(err)) + return nil + } + return &Service{ + chain: chain, + log: logger, + client: neofsClient, + quit: make(chan bool), + dumpDir: cfg.DumpDir, + containerID: containerID, + Nodes: cfg.Nodes, + Timeout: cfg.Timeout, + } +} + +// Name implements the core.Service interface. +func (bfs *Service) Name() string { + return "BlockFetcherService" +} + +// Start implements the core.Service interface. +func (bfs *Service) Start(done func()) { + bfs.log.Info("Starting Block Fetcher Service") + go func() { + err := bfs.fetchData() + if err != nil { + close(bfs.quit) + return + } + done() + }() +} + +// Shutdown implements the core.Service interface. +func (bfs *Service) Shutdown() { + bfs.log.Info("Shutting down Block Fetcher Service") + close(bfs.quit) +} + +func (bfs *Service) fetchData() error { + select { + case <-bfs.quit: + return nil + default: + prm := client.PrmObjectSearch{} + filters := object.NewSearchFilters() + filters.AddFilter("type", "config", object.MatchStringEqual) + prm.SetFilters(filters) + + configOid, err := bfs.search(prm) + if err != nil { + bfs.log.Error("Failed to fetch object IDs", zap.Error(err)) + return err + } + cfg, err := bfs.get(configOid[0].String()) + if err != nil { + bfs.log.Error("Failed to fetch config object", zap.Error(err)) + return err + } + var configObj ConfigObject + err = json.Unmarshal(cfg, &configObj) + if err != nil { + bfs.log.Error("Failed to unmarshal configuration data", zap.Error(err)) + return err + } + + for _, HashOID := range configObj.HashOIDs { + _, err = bfs.get(HashOID) + if err != nil { + bfs.log.Error("Failed to fetch hash", zap.Error(err)) + return err + } + } + + for _, blockOID := range configObj.BlockOIDs { + data, err := bfs.get(blockOID) + if err != nil { + bfs.log.Error(fmt.Sprintf("Failed to fetch block %s", blockOID), zap.Error(err)) + return err + } + err = bfs.ProcessBlock(data, configObj.Step) + if err != nil { + bfs.log.Error(fmt.Sprintf("Failed to process block %s", blockOID), zap.Error(err)) + return err + } + } + } + close(bfs.quit) + return nil +} + +func (bfs *Service) ProcessBlock(data []byte, count uint32) error { + br := gio.NewBinReaderFromBuf(data) + dump := NewDump() + var lastIndex uint32 + + err := chaindump.Restore(bfs.chain, br, 0, count, func(b *block.Block) error { + batch := bfs.chain.LastBatch() + if batch != nil { + dump.Add(b.Index, batch) + lastIndex = b.Index + if b.Index%1000 == 0 { + if err := dump.TryPersist(bfs.dumpDir, lastIndex); err != nil { + return fmt.Errorf("can't dump storage to file: %w", err) + } + } + } + return nil + }) + if err != nil { + return fmt.Errorf("failed to restore blocks: %w", err) + } + + if err = dump.TryPersist(bfs.dumpDir, lastIndex); err != nil { + return fmt.Errorf("final persistence failed: %w", err) + } + return nil +} + +func (bfs *Service) get(oid string) ([]byte, error) { + privateKey, err := keys.NewPrivateKey() + if err != nil { + return nil, err + } + ctx, cancel := context.WithTimeout(context.Background(), bfs.Timeout) + defer cancel() + u, err := url.Parse(fmt.Sprintf("neofs:%s/%s", bfs.containerID, oid)) + if err != nil { + return nil, err + } + rc, err := neofs.GetWithClient(ctx, bfs.client, privateKey, u, bfs.Nodes[0]) + if err != nil { + return nil, err + } + data, err := io.ReadAll(rc) + if err != nil { + return nil, err + } + return data, nil +} + +func (bfs *Service) search(prm client.PrmObjectSearch) ([]oid.ID, error) { + privateKey, err := keys.NewPrivateKey() + if err != nil { + return nil, err + } + ctx, cancel := context.WithTimeout(context.Background(), bfs.Timeout) + defer cancel() + return neofs.ObjectSearch(ctx, bfs.client, privateKey, bfs.containerID, bfs.Nodes[0], prm) +} diff --git a/pkg/services/blockfetcher/blockfetcher_test.go b/pkg/services/blockfetcher/blockfetcher_test.go new file mode 100644 index 0000000000..0a844d88db --- /dev/null +++ b/pkg/services/blockfetcher/blockfetcher_test.go @@ -0,0 +1,44 @@ +package blockfetcher_test + +import ( + "testing" + + "github.com/nspcc-dev/neo-go/internal/basicchain" + "github.com/nspcc-dev/neo-go/pkg/config" + "github.com/nspcc-dev/neo-go/pkg/core/chaindump" + gio "github.com/nspcc-dev/neo-go/pkg/io" + "github.com/nspcc-dev/neo-go/pkg/neotest" + "github.com/nspcc-dev/neo-go/pkg/neotest/chain" + "github.com/nspcc-dev/neo-go/pkg/services/blockfetcher" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" +) + +func TestProcessBlock(t *testing.T) { + bc, validators, committee := chain.NewMultiWithCustomConfig(t, + func(c *config.Blockchain) { + c.P2PSigExtensions = true + }) + e := neotest.NewExecutor(t, bc, validators, committee) + + basicchain.Init(t, "../../../", e) + require.True(t, bc.BlockHeight() > 5) + + w := gio.NewBufBinWriter() + require.NoError(t, chaindump.Dump(bc, w.BinWriter, 0, bc.BlockHeight()+1)) + require.NoError(t, w.Err) + buf := w.Bytes() + bc2, _, _ := chain.NewMultiWithCustomConfig(t, func(c *config.Blockchain) { + c.P2PSigExtensions = true + }) + cfg := config.NeoFS{ + ContainerID: "9iVfUg8aDHKjPC4LhQXEkVUM4HDkR7UCXYLs8NQwYfSG", + Nodes: []string{"https://test"}, + Timeout: 10, + DumpDir: "./", + } + serv := blockfetcher.New(bc2, cfg, zaptest.NewLogger(t)) + err := serv.ProcessBlock(buf, bc.BlockHeight()+1) + require.NoError(t, err) + require.Equal(t, bc.BlockHeight(), bc2.BlockHeight()) +} diff --git a/cli/server/dump.go b/pkg/services/blockfetcher/dump.go similarity index 88% rename from cli/server/dump.go rename to pkg/services/blockfetcher/dump.go index d7b6a18034..215de23981 100644 --- a/cli/server/dump.go +++ b/pkg/services/blockfetcher/dump.go @@ -1,4 +1,4 @@ -package server +package blockfetcher import ( "encoding/json" @@ -10,7 +10,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/storage/dboper" ) -type dump []blockDump +type Dump []blockDump type blockDump struct { Block uint32 `json:"block"` @@ -18,11 +18,11 @@ type blockDump struct { Storage []dboper.Operation `json:"storage"` } -func newDump() *dump { - return new(dump) +func NewDump() *Dump { + return new(Dump) } -func (d *dump) add(index uint32, batch *storage.MemBatch) { +func (d *Dump) Add(index uint32, batch *storage.MemBatch) { ops := storage.BatchToOperations(batch) *d = append(*d, blockDump{ Block: index, @@ -31,7 +31,7 @@ func (d *dump) add(index uint32, batch *storage.MemBatch) { }) } -func (d *dump) tryPersist(prefix string, index uint32) error { +func (d *Dump) TryPersist(prefix string, index uint32) error { if len(*d) == 0 { return nil } @@ -62,12 +62,12 @@ func (d *dump) tryPersist(prefix string, index uint32) error { return nil } -func readFile(path string) (*dump, error) { +func readFile(path string) (*Dump, error) { data, err := os.ReadFile(path) if err != nil { return nil, err } - d := newDump() + d := NewDump() if err := json.Unmarshal(data, d); err != nil { return nil, err } diff --git a/cli/server/dump_test.go b/pkg/services/blockfetcher/dump_test.go similarity index 96% rename from cli/server/dump_test.go rename to pkg/services/blockfetcher/dump_test.go index 74877e44b0..8e919d30b8 100644 --- a/cli/server/dump_test.go +++ b/pkg/services/blockfetcher/dump_test.go @@ -1,4 +1,4 @@ -package server +package blockfetcher import ( "path/filepath" diff --git a/pkg/services/oracle/neofs/neofs.go b/pkg/services/oracle/neofs/neofs.go index 27351ba8b2..a133be2591 100644 --- a/pkg/services/oracle/neofs/neofs.go +++ b/pkg/services/oracle/neofs/neofs.go @@ -82,6 +82,41 @@ func Get(ctx context.Context, priv *keys.PrivateKey, u *url.URL, addr string) (i return res, err } +// GetWithClient returns a neofs object from the provided url using the provided client. +// URI scheme is "neofs://". +// If Command is not provided, full object is requested. +func GetWithClient(ctx context.Context, c *client.Client, priv *keys.PrivateKey, u *url.URL, addr string) (io.ReadCloser, error) { + objectAddr, ps, err := parseNeoFSURL(u) + if err != nil { + return nil, err + } + var ( + res = clientCloseWrapper{c: c} + prmd client.PrmDial + ) + prmd.SetServerURI(addr) + prmd.SetContext(ctx) + err = c.Dial(prmd) //nolint:contextcheck // contextcheck: Function `Dial->Balance->SendUnary->Init->setNeoFSAPIServer` should pass the context parameter + if err != nil { + return res, err + } + + var s = user.NewAutoIDSignerRFC6979(priv.PrivateKey) + switch { + case len(ps) == 0 || ps[0] == "": // Get request + res.ReadCloser, err = getPayload(ctx, s, c, objectAddr) + case ps[0] == rangeCmd: + res.ReadCloser, err = getRange(ctx, s, c, objectAddr, ps[1:]...) + case ps[0] == headerCmd: + res.ReadCloser, err = getHeader(ctx, s, c, objectAddr) + case ps[0] == hashCmd: + res.ReadCloser, err = getHash(ctx, s, c, objectAddr, ps[1:]...) + default: + err = ErrInvalidCommand + } + return res, err +} + type clientCloseWrapper struct { io.ReadCloser c *client.Client @@ -220,3 +255,34 @@ func parseRange(s string) (*object.Range, error) { r.SetLength(length) return r, nil } + +// ObjectSearch returns a list of object IDs from the provided container. +func ObjectSearch(ctx context.Context, c *client.Client, priv *keys.PrivateKey, containerID cid.ID, addr string, prm client.PrmObjectSearch) ([]oid.ID, error) { + var ( + prmd client.PrmDial + s = user.NewAutoIDSignerRFC6979(priv.PrivateKey) + objectIDs []oid.ID + ) + + prmd.SetServerURI(addr) + prmd.SetContext(ctx) + err := c.Dial(prmd) + if err != nil { + return nil, err + } + + reader, err := c.ObjectSearchInit(ctx, containerID, s, prm) + if err != nil { + return nil, fmt.Errorf("failed to initiate object search: %w", err) + } + defer reader.Close() + + err = reader.Iterate(func(oid oid.ID) bool { + objectIDs = append(objectIDs, oid) + return false + }) + if err != nil { + return nil, fmt.Errorf("error during object ID iteration: %w", err) + } + return objectIDs, nil +} diff --git a/pkg/services/oracle/neofs/neofs_test.go b/pkg/services/oracle/neofs/neofs_test.go index aa3308a32e..03eeaea755 100644 --- a/pkg/services/oracle/neofs/neofs_test.go +++ b/pkg/services/oracle/neofs/neofs_test.go @@ -1,9 +1,14 @@ package neofs import ( + "context" "net/url" "testing" + "time" + "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "github.com/nspcc-dev/neofs-sdk-go/client" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/stretchr/testify/require" ) @@ -72,3 +77,22 @@ func TestParseNeoFSURL(t *testing.T) { }) } } + +func TestFetchListFromNeoFS(t *testing.T) { + privateKey, err := keys.NewPrivateKey() + require.NoError(t, err) + + cStr := "9iVfUg8aDHKjPC4LhQXEkVUM4HDkR7UCXYLs8NQwYfSG" + var containerID cid.ID + if err := containerID.DecodeString(cStr); err != nil { + require.NoError(t, err) + } + neofsClient, err := client.New(client.PrmInit{}) + require.NoError(t, err) + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + prm := client.PrmObjectSearch{} + _, err = ObjectSearch(ctx, neofsClient, privateKey, containerID, "st1.t5.fs.neo.org:8080", prm) + require.NoError(t, err) +}