diff --git a/.gitignore b/.gitignore index 4fa4654fa0..b6c39d8a2d 100644 --- a/.gitignore +++ b/.gitignore @@ -52,3 +52,4 @@ gengen dockerfile coverage_unit.txt coverage_venus_shared.txt +venus-shared/blockstore/splitstore/test_data/base_583_bafy2bzaceazuutcexhvwkyyesszohrkjjzk2zgknasgs7bb7zgfnwghtnu5w2.db/LOCK diff --git a/app/node/env.go b/app/node/env.go index 1e7812f801..91e37f4843 100644 --- a/app/node/env.go +++ b/app/node/env.go @@ -7,6 +7,7 @@ import ( "github.com/filecoin-project/venus/app/submodule/storagenetworking" v1api "github.com/filecoin-project/venus/venus-shared/api/chain/v1" + "github.com/filecoin-project/venus/venus-shared/blockstore/splitstore" ) // Env is the environment for command API handlers. @@ -22,10 +23,11 @@ type Env struct { MingingAPI v1api.IMining MessagePoolAPI v1api.IMessagePool - MarketAPI v1api.IMarket - PaychAPI v1api.IPaychan - CommonAPI v1api.ICommon - EthAPI v1api.IETH + MarketAPI v1api.IMarket + PaychAPI v1api.IPaychan + CommonAPI v1api.ICommon + EthAPI v1api.IETH + SplitstoreAPI splitstore.Controller } var _ cmds.Environment = (*Env)(nil) diff --git a/app/node/node.go b/app/node/node.go index 6134eef64a..61012ba203 100644 --- a/app/node/node.go +++ b/app/node/node.go @@ -33,6 +33,7 @@ import ( _ "github.com/filecoin-project/venus/pkg/crypto/secp" // enable secp signatures metricsPKG "github.com/filecoin-project/venus/pkg/metrics" "github.com/filecoin-project/venus/pkg/repo" + "github.com/filecoin-project/venus/venus-shared/blockstore/splitstore" "github.com/ipfs-force-community/metrics" "github.com/ipfs-force-community/sophon-auth/jwtclient" cmds "github.com/ipfs/go-ipfs-cmds" @@ -379,7 +380,45 @@ func (node *Node) createServerEnv(ctx context.Context) *Env { MarketAPI: node.market.API(), CommonAPI: node.common, EthAPI: node.eth.API(), + SplitstoreAPI: &RepoKeeper{repo: node.repo}, } return &env } + +type RepoKeeper struct { + repo repo.Repo +} + +var _ splitstore.Controller = (*RepoKeeper)(nil) + +func (r *RepoKeeper) Rollback() error { + ds := r.repo.Datastore() + if ds == nil { + return fmt.Errorf("no blockstore found") + } + + rb, ok := ds.(splitstore.Controller) + if !ok { + return fmt.Errorf("split store was disabled") + } + err := rb.Rollback() + if err != nil { + return fmt.Errorf("rollback splitstore: %w", err) + } + + // rewrite config + cfg := r.repo.Config() + if cfg == nil { + return fmt.Errorf("no config found") + } + if cfg.Datastore.Type == "splitstore" { + cfg.Datastore.Type = "badgerds" + err = r.repo.ReplaceConfig(cfg) + if err != nil { + return fmt.Errorf("replace config: %w", err) + } + } + + return nil +} diff --git a/app/submodule/chain/chain_submodule.go b/app/submodule/chain/chain_submodule.go index 54c988d909..3186d6741b 100644 --- a/app/submodule/chain/chain_submodule.go +++ b/app/submodule/chain/chain_submodule.go @@ -20,6 +20,7 @@ import ( "github.com/filecoin-project/venus/pkg/vmsupport" v0api "github.com/filecoin-project/venus/venus-shared/api/chain/v0" v1api "github.com/filecoin-project/venus/venus-shared/api/chain/v1" + "github.com/filecoin-project/venus/venus-shared/blockstore/splitstore" "github.com/filecoin-project/venus/venus-shared/types" ) @@ -55,6 +56,7 @@ func NewChainSubmodule(ctx context.Context, ) (*ChainSubmodule, error) { repo := config.Repo() // initialize chain store + basebs := repo.Datastore() chainStore := chain.NewStore(repo.ChainDatastore(), repo.Datastore(), config.GenesisCid(), circulatiingSupplyCalculator) // drand genBlk, err := chainStore.GetGenesisBlock(context.TODO()) @@ -78,6 +80,11 @@ 
func NewChainSubmodule(ctx context.Context, waiter := chain.NewWaiter(chainStore, messageStore, config.Repo().Datastore(), cbor.NewCborStore(config.Repo().Datastore())) + // SubscribeHeadChanges for splitstore + if ss, ok := basebs.(*splitstore.Splitstore); ok { + chainStore.SubscribeHeadChanges(ss.HeadChange) + } + store := &ChainSubmodule{ ChainReader: chainStore, MessageStore: messageStore, diff --git a/app/submodule/mining/mining_api.go b/app/submodule/mining/mining_api.go index 880e14be0f..3a30e3e621 100644 --- a/app/submodule/mining/mining_api.go +++ b/app/submodule/mining/mining_api.go @@ -34,6 +34,11 @@ type MiningAPI struct { //nolint // MinerGetBaseInfo get current miner information func (miningAPI *MiningAPI) MinerGetBaseInfo(ctx context.Context, maddr address.Address, round abi.ChainEpoch, tsk types.TipSetKey) (*types.MiningBaseInfo, error) { + localLog := log.With( + "maddr", maddr, + "round", round, + ) + chainStore := miningAPI.Ming.ChainModule.ChainReader ts, err := chainStore.GetTipSet(ctx, tsk) if err != nil { @@ -79,6 +84,7 @@ func (miningAPI *MiningAPI) MinerGetBaseInfo(ctx context.Context, maddr address. return nil, fmt.Errorf("loading miner in current state: %v", err) } + localLog.Infof("miner actor(%s) not found at look back tipset %s", maddr, ts.Key()) return nil, nil } if err != nil { @@ -106,6 +112,7 @@ func (miningAPI *MiningAPI) MinerGetBaseInfo(ctx context.Context, maddr address. } if len(xsectors) == 0 { + localLog.Info("no sectors found for winning post") return nil, nil } diff --git a/app/submodule/mining/mining_submodule.go b/app/submodule/mining/mining_submodule.go index 6a9b3e035f..0812f30477 100644 --- a/app/submodule/mining/mining_submodule.go +++ b/app/submodule/mining/mining_submodule.go @@ -11,8 +11,12 @@ import ( "github.com/filecoin-project/venus/pkg/util/ffiwrapper" v0api "github.com/filecoin-project/venus/venus-shared/api/chain/v0" v1api "github.com/filecoin-project/venus/venus-shared/api/chain/v1" + + logging "github.com/ipfs/go-log/v2" ) +var log = logging.Logger("mining") + type miningConfig interface { Repo() repo.Repo Verifier() ffiwrapper.Verifier diff --git a/cmd/main.go b/cmd/main.go index fe14ccd37a..73ebbcb6ff 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -138,6 +138,7 @@ TOOL COMMANDS version - Show venus version information seed - Seal sectors for genesis miner fetch - Fetch proving parameters + splitstore - Manage splitstore `, }, Options: []cmds.Option{ @@ -167,21 +168,22 @@ var rootSubcmdsLocal = map[string]*cmds.Command{ // all top level commands, available on daemon. set during init() to avoid configuration loops. 
var rootSubcmdsDaemon = map[string]*cmds.Command{ - "chain": chainCmd, - "sync": syncCmd, - "drand": drandCmd, - "inspect": inspectCmd, - "log": logCmd, - "send": msgSendCmd, - "mpool": mpoolCmd, - "swarm": swarmCmd, - "wallet": walletCmd, - "version": versionCmd, - "state": stateCmd, - "miner": minerCmd, - "paych": paychCmd, - "info": infoCmd, - "evm": evmCmd, + "chain": chainCmd, + "sync": syncCmd, + "drand": drandCmd, + "inspect": inspectCmd, + "log": logCmd, + "send": msgSendCmd, + "mpool": mpoolCmd, + "swarm": swarmCmd, + "wallet": walletCmd, + "version": versionCmd, + "state": stateCmd, + "miner": minerCmd, + "paych": paychCmd, + "info": infoCmd, + "evm": evmCmd, + "splitstore": splitstoreCmd, } func init() { diff --git a/cmd/splitstore.go b/cmd/splitstore.go new file mode 100644 index 0000000000..13ba0c6e18 --- /dev/null +++ b/cmd/splitstore.go @@ -0,0 +1,30 @@ +package cmd + +import ( + "fmt" + + "github.com/filecoin-project/venus/app/node" + cmds "github.com/ipfs/go-ipfs-cmds" +) + +var splitstoreCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "Manage splitstore", + }, + Subcommands: map[string]*cmds.Command{ + "rollback": splitstoreRollbackCmd, + }, +} + +var splitstoreRollbackCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "Rollback splitstore to badger store", + }, + PreRun: func(req *cmds.Request, env cmds.Environment) error { + fmt.Println("It may take a while to transfer block ...") + return nil + }, + Run: func(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment) error { + return env.(*node.Env).SplitstoreAPI.Rollback() + }, +} diff --git a/main.go b/main.go index 4bf46db5e7..f213c719d1 100644 --- a/main.go +++ b/main.go @@ -31,6 +31,10 @@ func main() { _ = logging.SetLogLevel("pubsub", "error") _ = logging.SetLogLevel("relay", "error") _ = logging.SetLogLevel("dht/RtRefreshManager", "error") + // todo: remove it + _ = logging.SetLogLevel("splitstore", "debug") + _ = logging.SetLogLevel("chainsync.syncer", "debug") + } else { level, err := logging.LevelFromString(lvl) if err != nil { diff --git a/pkg/chain/waiter.go b/pkg/chain/waiter.go index 854e4c1451..13c90f6dcf 100644 --- a/pkg/chain/waiter.go +++ b/pkg/chain/waiter.go @@ -43,7 +43,6 @@ type Waiter struct { chainReader waiterChainReader messageProvider MessageProvider cst cbor.IpldStore - bs bstore.Blockstore Stmgr IStmgr } @@ -55,7 +54,6 @@ func NewWaiter(chainStore waiterChainReader, messages MessageProvider, bs bstore return &Waiter{ chainReader: chainStore, cst: cst, - bs: bs, messageProvider: messages, } } diff --git a/pkg/config/config.go b/pkg/config/config.go index a747ab9f83..d2b617cac4 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -17,6 +17,7 @@ import ( "github.com/pkg/errors" "github.com/filecoin-project/venus/pkg/constants" + "github.com/filecoin-project/venus/venus-shared/actors/policy" "github.com/filecoin-project/venus/venus-shared/types" ) @@ -78,8 +79,12 @@ func newDefaultAPIConfig() *APIConfig { // DatastoreConfig holds all the configuration options for the datastore. 
// TODO: use the advanced datastore configuration from ipfs type DatastoreConfig struct { - Type string `json:"type"` - Path string `json:"path"` + Type string `json:"type"` + Path string `json:"path"` + SplitstoreSize int64 `json:"splitstoreSize"` + SplitstoreCount int `json:"splitstoreCount"` + SplitstoreInitProtectEpoch int64 `json:"splitstoreInitProtectEpoch"` + SplitstoreSoftDelete bool `json:"splitstoreSoftDelete"` } // Validators hold the list of validation functions for each configuration @@ -93,8 +98,10 @@ var Validators = map[string]func(string, string) error{ func newDefaultDatastoreConfig() *DatastoreConfig { return &DatastoreConfig{ - Type: "badgerds", - Path: "badger", + Type: "badgerds", + Path: "badger", + SplitstoreSize: int64(5 * policy.ChainFinality), + SplitstoreCount: 3, } } diff --git a/pkg/consensus/processor.go b/pkg/consensus/processor.go index dd59dbd8dc..de507def34 100644 --- a/pkg/consensus/processor.go +++ b/pkg/consensus/processor.go @@ -11,6 +11,7 @@ import ( "github.com/filecoin-project/venus/pkg/fvm" "github.com/filecoin-project/venus/pkg/vm/vmcontext" "github.com/filecoin-project/venus/venus-shared/actors/builtin/reward" + "github.com/filecoin-project/venus/venus-shared/blockstore" "github.com/filecoin-project/go-address" amt4 "github.com/filecoin-project/go-amt-ipld/v4" @@ -95,6 +96,11 @@ func (p *DefaultProcessor) ApplyBlocks(ctx context.Context, ) makeVM := func(base cid.Cid, e abi.ChainEpoch, timestamp uint64) (vm.Interface, error) { + bs := vmOpts.Bsstore + if ts.Height() == 1185554 { + bs = blockstore.NewLogStore("./bs.log", bs) + // _, _ = bs.Has(ctx, base) + } vmOpt := vm.VmOption{ CircSupplyCalculator: vmOpts.CircSupplyCalculator, LookbackStateGetter: vmOpts.LookbackStateGetter, @@ -107,7 +113,7 @@ func (p *DefaultProcessor) ApplyBlocks(ctx context.Context, Timestamp: timestamp, GasPriceSchedule: vmOpts.GasPriceSchedule, PRoot: base, - Bsstore: vmOpts.Bsstore, + Bsstore: bs, SysCallsImpl: vmOpts.SysCallsImpl, TipSetGetter: vmOpts.TipSetGetter, Tracing: vmOpts.Tracing, diff --git a/pkg/fork/fork.go b/pkg/fork/fork.go index ccc15b68e2..d83a007666 100644 --- a/pkg/fork/fork.go +++ b/pkg/fork/fork.go @@ -60,7 +60,6 @@ import ( "github.com/filecoin-project/venus/pkg/config" "github.com/filecoin-project/venus/pkg/constants" - "github.com/filecoin-project/venus/pkg/repo" vmstate "github.com/filecoin-project/venus/pkg/state/tree" "github.com/filecoin-project/venus/venus-shared/actors" "github.com/filecoin-project/venus/venus-shared/actors/adt" @@ -569,8 +568,6 @@ type ChainFork struct { // upgrade param networkType types.NetworkType forkUpgrade *config.ForkUpgradeConfig - - metadataDs repo.Datastore } func NewChainFork(ctx context.Context, @@ -586,7 +583,6 @@ func NewChainFork(ctx context.Context, ipldstore: ipldstore, networkType: networkParams.NetworkType, forkUpgrade: networkParams.ForkUpgradeParam, - metadataDs: metadataDs, } // If we have upgrades, make sure they're in-order and make sense. 
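To make the new options concrete, here is a minimal sketch (not part of the patch) of a DatastoreConfig that switches a repo onto the splitstore; the field names and defaults come from the struct and newDefaultDatastoreConfig added above in pkg/config/config.go, and the epoch arithmetic is only illustrative.

package main

import (
	"fmt"

	"github.com/filecoin-project/venus/pkg/config"
	"github.com/filecoin-project/venus/venus-shared/actors/policy"
)

func main() {
	// Sketch only: enable the splitstore through the new DatastoreConfig fields.
	ds := config.DatastoreConfig{
		Type:                       "splitstore",                    // default is "badgerds"
		Path:                       "badger",                        // the existing badger store becomes the splitstore's init store
		SplitstoreSize:             int64(5 * policy.ChainFinality), // epochs covered by one layer
		SplitstoreCount:            3,                               // layers kept before the oldest is dropped
		SplitstoreInitProtectEpoch: 0,                               // 0 falls back to the splitstore's built-in protection window
		SplitstoreSoftDelete:       false,                           // true renames dropped layers to "*.db.del" instead of deleting them
	}
	fmt.Printf("%+v\n", ds)
}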
diff --git a/pkg/repo/fsrepo.go b/pkg/repo/fsrepo.go index c64e0c6322..0d32bf040a 100644 --- a/pkg/repo/fsrepo.go +++ b/pkg/repo/fsrepo.go @@ -10,9 +10,11 @@ import ( "sync" "time" + "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/venus/pkg/repo/fskeystore" blockstoreutil "github.com/filecoin-project/venus/venus-shared/blockstore" + "github.com/filecoin-project/venus/venus-shared/blockstore/splitstore" bstore "github.com/ipfs/boxo/blockstore" badgerds "github.com/ipfs/go-ds-badger2" @@ -36,6 +38,7 @@ const ( versionFilename = "version" walletDatastorePrefix = "wallet" chainDatastorePrefix = "chain" + splitstorePrefix = "splitstore" metaDatastorePrefix = "metadata" paychDatastorePrefix = "paych" snapshotFilenamePrefix = "snapshot" @@ -52,7 +55,7 @@ type FSRepo struct { // lk protects the config file lk sync.RWMutex - ds *blockstoreutil.BadgerBlockstore + ds blockstoreutil.Blockstore keystore fskeystore.Keystore walletDs Datastore chainDs Datastore @@ -65,6 +68,8 @@ type FSRepo struct { sqlPath string sqlErr error sqlOnce sync.Once + + closers []func() error } var _ Repo = (*FSRepo)(nil) @@ -209,6 +214,14 @@ func (r *FSRepo) loadFromDisk() error { return errors.Wrap(err, "failed to load config file") } + if err := r.openChainDatastore(); err != nil { + return errors.Wrap(err, "failed to open chain datastore") + } + + if err := r.openMetaDatastore(); err != nil { + return errors.Wrap(err, "failed to open metadata datastore") + } + if err := r.openDatastore(); err != nil { return errors.Wrap(err, "failed to open datastore") } @@ -221,14 +234,6 @@ func (r *FSRepo) loadFromDisk() error { return errors.Wrap(err, "failed to open wallet datastore") } - if err := r.openChainDatastore(); err != nil { - return errors.Wrap(err, "failed to open chain datastore") - } - - if err := r.openMetaDatastore(); err != nil { - return errors.Wrap(err, "failed to open metadata datastore") - } - if err := r.openPaychDataStore(); err != nil { return errors.Wrap(err, "failed to open paych datastore") } @@ -297,30 +302,13 @@ func (r *FSRepo) Keystore() fskeystore.Keystore { // Close closes the repo. 
func (r *FSRepo) Close() error { - if err := r.ds.Close(); err != nil { - return errors.Wrap(err, "failed to close datastore") - } - - if err := r.walletDs.Close(); err != nil { - return errors.Wrap(err, "failed to close wallet datastore") - } - - if err := r.chainDs.Close(); err != nil { - return errors.Wrap(err, "failed to close chain datastore") - } - - if err := r.metaDs.Close(); err != nil { - return errors.Wrap(err, "failed to close meta datastore") - } - - if err := r.paychDs.Close(); err != nil { - return errors.Wrap(err, "failed to close paych datastore") + // todo: use new way to close others + for _, closer := range r.closers { + if err := closer(); err != nil { + return fmt.Errorf("close fs repo: %w", err) + } } - /*if err := r.marketDs.Close(); err != nil { - return errors.Wrap(err, "failed to close market datastore") - }*/ - if err := r.removeAPIFile(); err != nil { return errors.Wrap(err, "error removing API file") } @@ -376,19 +364,40 @@ func (r *FSRepo) readVersion() (uint, error) { } func (r *FSRepo) openDatastore() error { + path := filepath.Join(r.path, Config.Datastore.Path) + opts, err := blockstoreutil.BadgerBlockstoreOptions(path, false) + if err != nil { + return err + } + opts.Prefix = bstore.BlockPrefix.String() + ds, err := blockstoreutil.Open(opts) + if err != nil { + return err + } + + r.closers = append(r.closers, ds.Close) + switch Config.Datastore.Type { case "badgerds": - path := filepath.Join(r.path, Config.Datastore.Path) - opts, err := blockstoreutil.BadgerBlockstoreOptions(path, false) - if err != nil { - return err + r.ds = ds + case "splitstore": + if r.chainDs == nil { + return fmt.Errorf("meta data store is nil") + } + + ssPath := filepath.Join(r.path, splitstorePrefix) + splitstore.SoftDelete = Config.Datastore.SplitstoreSoftDelete + opt := splitstore.Option{ + MaxLayerCount: Config.Datastore.SplitstoreCount, + LayerSize: abi.ChainEpoch(Config.Datastore.SplitstoreSize), + InitSyncProtect: abi.ChainEpoch(Config.Datastore.SplitstoreInitProtectEpoch), } - opts.Prefix = bstore.BlockPrefix.String() - ds, err := blockstoreutil.Open(opts) + splitstore, err := splitstore.NewSplitstore(ssPath, ds, opt) if err != nil { - return err + return fmt.Errorf("build splitstore: %w", err) } - r.ds = ds + + r.ds = splitstore default: return fmt.Errorf("unknown datastore type in config: %s", Config.Datastore.Type) } @@ -416,6 +425,7 @@ func (r *FSRepo) openChainDatastore() error { } r.chainDs = ds + r.closers = append(r.closers, ds.Close) return nil } @@ -427,6 +437,7 @@ func (r *FSRepo) openMetaDatastore() error { } r.metaDs = ds + r.closers = append(r.closers, ds.Close) return nil } @@ -437,6 +448,7 @@ func (r *FSRepo) openPaychDataStore() error { if err != nil { return err } + r.closers = append(r.closers, r.paychDs.Close) return nil } @@ -448,6 +460,7 @@ func (r *FSRepo) openWalletDatastore() error { } r.walletDs = ds + r.closers = append(r.closers, ds.Close) return nil } diff --git a/venus-shared/blockstore/badger.go b/venus-shared/blockstore/badger.go index 1c36566fce..6f8c436a94 100644 --- a/venus-shared/blockstore/badger.go +++ b/venus-shared/blockstore/badger.go @@ -478,6 +478,39 @@ func (b *BadgerBlockstore) HashOnRead(_ bool) { log.Warnf("called HashOnRead on badger blockstore; function not supported; ignoring") } +func (b *BadgerBlockstore) ForEachKey(f func(cid.Cid) error) error { + if atomic.LoadInt64(&b.state) != stateOpen { + return ErrBlockstoreClosed + } + + txn := b.DB.NewTransaction(false) + defer txn.Discard() + opts := 
badger.IteratorOptions{PrefetchSize: 100} + iter := txn.NewIterator(opts) + defer iter.Close() + + for iter.Rewind(); iter.Valid(); iter.Next() { + if atomic.LoadInt64(&b.state) != stateOpen { + // open iterators will run even after the database is closed... + return ErrBlockstoreClosed + } + k := iter.Item().Key() + // need to convert to key.Key using key.KeyFromDsKey. + bk, err := dshelp.BinaryFromDsKey(datastore.RawKey(string(k))) + if err != nil { + log.Warnf("error parsing key from binary: %s", err) + continue + } + cidKey := cid.NewCidV1(cid.Raw, bk) + err = f(cidKey) + if err != nil { + return err + } + } + + return nil +} + func (b *BadgerBlockstore) ConvertKey(cid cid.Cid) datastore.Key { key := dshelp.MultihashToDsKey(cid.Hash()) return b.keyTransform.ConvertKey(key) diff --git a/venus-shared/blockstore/log_store.go b/venus-shared/blockstore/log_store.go new file mode 100644 index 0000000000..c6573495f0 --- /dev/null +++ b/venus-shared/blockstore/log_store.go @@ -0,0 +1,102 @@ +package blockstore + +import ( + "context" + llog "log" + "os" + + blocks "github.com/ipfs/go-block-format" + cid "github.com/ipfs/go-cid" +) + +type LogStore struct { + logger *llog.Logger + bs Blockstore +} + +// DeleteMany implements Blockstore. +func (l *LogStore) DeleteMany(ctx context.Context, cids []cid.Cid) error { + for _, c := range cids { + l.log(c, "delete", "") + } + return l.bs.DeleteMany(ctx, cids) +} + +// Flush implements Blockstore. +func (l *LogStore) Flush(ctx context.Context) error { + l.logger.Println("flush") + return l.bs.Flush(ctx) +} + +// View implements Blockstore. +func (l *LogStore) View(ctx context.Context, cid cid.Cid, callback func([]byte) error) error { + l.log(cid, "view", "") + return l.bs.View(ctx, cid, callback) +} + +// AllKeysChan implements blockstore.Blockstore. +func (l *LogStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { + return l.bs.AllKeysChan(ctx) +} + +// DeleteBlock implements blockstore.Blockstore. +func (l *LogStore) DeleteBlock(ctx context.Context, c cid.Cid) error { + l.log(c, "delete", "") + return l.bs.DeleteBlock(ctx, c) +} + +// Get implements blockstore.Blockstore. +func (l *LogStore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) { + l.log(c, "get", "") + return l.bs.Get(ctx, c) +} + +// GetSize implements blockstore.Blockstore. +func (l *LogStore) GetSize(ctx context.Context, c cid.Cid) (int, error) { + l.log(c, "getsize", "") + return l.bs.GetSize(ctx, c) +} + +// Has implements blockstore.Blockstore. +func (l *LogStore) Has(ctx context.Context, c cid.Cid) (bool, error) { + l.log(c, "has", "") + return l.bs.Has(ctx, c) +} + +// HashOnRead implements blockstore.Blockstore. +func (l *LogStore) HashOnRead(enabled bool) { + l.bs.HashOnRead(enabled) +} + +// Put implements blockstore.Blockstore. +func (l *LogStore) Put(ctx context.Context, b blocks.Block) error { + l.log(b.Cid(), "put", "") + return l.bs.Put(ctx, b) +} + +// DeleteMany implements blockstore.Blockstore. 
+func (l *LogStore) PutMany(ctx context.Context, bs []blocks.Block) error { + for _, b := range bs { + l.log(b.Cid(), "put", "") + } + return l.bs.PutMany(ctx, bs) +} + +var _ Blockstore = (*LogStore)(nil) + +func NewLogStore(path string, bs Blockstore) *LogStore { + file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) + if err != nil { + panic(err) + } + logger := llog.New(file, "", llog.LstdFlags) + logger.Println("log store opened") + return &LogStore{ + logger: logger, + bs: bs, + } +} + +func (l *LogStore) log(c cid.Cid, op, msg string) { + l.logger.Printf("%s %s %s", c.String(), op, msg) +} diff --git a/venus-shared/blockstore/splitstore/compose_store.go b/venus-shared/blockstore/splitstore/compose_store.go new file mode 100644 index 0000000000..0ca0149d6a --- /dev/null +++ b/venus-shared/blockstore/splitstore/compose_store.go @@ -0,0 +1,248 @@ +package splitstore + +import ( + "context" + "fmt" + + "github.com/filecoin-project/venus/venus-shared/blockstore" + "github.com/filecoin-project/venus/venus-shared/logging" + blocks "github.com/ipfs/go-block-format" + cid "github.com/ipfs/go-cid" + ipld "github.com/ipfs/go-ipld-format" +) + +var log = logging.New("splitstore") + +// ComposeStore compose two block store into one +// Read: firstly from primary store, if not exist in primary will try secondary and rewrite to primary store +// Write: write to primary store only +// Delete: Delete from all store +type ComposeStore struct { + shouldSync bool + primary blockstore.Blockstore + secondary blockstore.Blockstore +} + +// NewComposeStore create a new ComposeStore with a list of blockstore +// low priority come first +func NewComposeStore(bs ...blockstore.Blockstore) blockstore.Blockstore { + switch len(bs) { + case 0: + return nil + case 1: + return bs[0] + } + return Compose(bs...) +} + +func Compose(bs ...blockstore.Blockstore) *ComposeStore { + switch len(bs) { + case 0: + return nil + case 1: + return &ComposeStore{ + shouldSync: false, + primary: bs[0], + secondary: bs[0], + } + case 2: + return &ComposeStore{ + shouldSync: true, + primary: bs[1], + secondary: bs[0], + } + } + + ret := &ComposeStore{ + shouldSync: false, + primary: bs[1], + secondary: bs[0], + } + for i := 2; i < len(bs); i++ { + ret = &ComposeStore{ + shouldSync: i == len(bs)-1, + primary: bs[i], + secondary: ret, + } + } + + return ret +} + +// AllKeysChan implements blockstore.Blockstore. +func (cs *ComposeStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { + ctx, cancel := context.WithCancel(ctx) + + primaryCh, err := cs.primary.AllKeysChan(ctx) + if err != nil { + cancel() + return nil, err + } + + secondaryCh, err := cs.secondary.AllKeysChan(ctx) + if err != nil { + cancel() + return nil, err + } + + seen := cid.NewSet() + ch := make(chan cid.Cid, 8) // buffer is arbitrary, just enough to avoid context switches + go func() { + defer cancel() + defer close(ch) + + for _, in := range []<-chan cid.Cid{primaryCh, secondaryCh} { + for c := range in { + // ensure we only emit each key once + if !seen.Visit(c) { + continue + } + + select { + case ch <- c: + case <-ctx.Done(): + return + } + } + } + }() + + return ch, nil +} + +// DeleteBlock implements blockstore.Blockstore. +func (cs *ComposeStore) DeleteBlock(ctx context.Context, c cid.Cid) error { + if err := cs.secondary.DeleteBlock(ctx, c); err != nil { + return err + } + return cs.primary.DeleteBlock(ctx, c) +} + +// DeleteMany implements blockstore.Blockstore. 
+func (cs *ComposeStore) DeleteMany(ctx context.Context, cids []cid.Cid) error { + // primary and secondly can be both incomplete + // don't try to batch delete + return fmt.Errorf("delete many not implemented on compose store; don't do this") +} + +// Flush implements blockstore.Blockstore. +func (cs *ComposeStore) Flush(ctx context.Context) error { + if err := cs.secondary.Flush(ctx); err != nil { + return err + } + return cs.primary.Flush(ctx) +} + +// Get implements blockstore.Blockstore. +func (cs *ComposeStore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) { + has, err := cs.primary.Has(ctx, c) + if err != nil { + return nil, err + } + if has { + return cs.primary.Get(ctx, c) + } + + b, err := cs.secondary.Get(ctx, c) + if err != nil { + return nil, err + } + + cs.sync(ctx, c, b) + + return b, nil +} + +// GetSize implements blockstore.Blockstore. +func (cs *ComposeStore) GetSize(ctx context.Context, c cid.Cid) (int, error) { + s, err := cs.primary.GetSize(ctx, c) + if err == nil { + return s, nil + } + if !ipld.IsNotFound(err) { + return 0, err + } + + cs.sync(ctx, c, nil) + return cs.secondary.GetSize(ctx, c) +} + +// Has implements blockstore.Blockstore. +func (cs *ComposeStore) Has(ctx context.Context, c cid.Cid) (bool, error) { + has, err := cs.primary.Has(ctx, c) + if err != nil { + return false, err + } + if !has { + has, err = cs.secondary.Has(ctx, c) + if err != nil { + return false, err + } + if has { + cs.sync(ctx, c, nil) + } + } + + return has, nil +} + +// HashOnRead implements blockstore.Blockstore. +func (cs *ComposeStore) HashOnRead(enabled bool) { + cs.primary.HashOnRead(enabled) + cs.secondary.HashOnRead(enabled) +} + +// Put implements blockstore.Blockstore. +func (cs *ComposeStore) Put(ctx context.Context, b blocks.Block) error { + return cs.primary.Put(ctx, b) +} + +// PutMany implements blockstore.Blockstore. +func (cs *ComposeStore) PutMany(ctx context.Context, bs []blocks.Block) error { + return cs.primary.PutMany(ctx, bs) +} + +// View implements blockstore.Blockstore. 
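+// View tries the primary store first; when the block is not found there it
+// schedules an asynchronous copy from the secondary store into the primary
+// store (see sync below) and then serves the callback from the secondary store.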
+func (cs *ComposeStore) View(ctx context.Context, c cid.Cid, cb func([]byte) error) error { + err := cs.primary.View(ctx, c, cb) + if err == nil { + return nil + } + if !ipld.IsNotFound(err) { + return err + } + cs.sync(ctx, c, nil) + return cs.secondary.View(ctx, c, cb) + +} + +// sync sync block from secondly to primary +func (cs *ComposeStore) sync(ctx context.Context, c cid.Cid, b blocks.Block) { + if !cs.shouldSync { + return + } + + go func() { + select { + case <-ctx.Done(): + return + default: + } + + if b == nil { + var err error + b, err = cs.secondary.Get(ctx, c) + if err != nil { + // it is ok to ignore the err + return + } + } + + err := cs.primary.Put(ctx, b) + if err != nil { + log.Warnf("put block(%s) to primary store: %w", b.Cid(), err) + } + }() +} + +var _ blockstore.Blockstore = (*ComposeStore)(nil) diff --git a/venus-shared/blockstore/splitstore/compose_store_test.go b/venus-shared/blockstore/splitstore/compose_store_test.go new file mode 100644 index 0000000000..dd4143f309 --- /dev/null +++ b/venus-shared/blockstore/splitstore/compose_store_test.go @@ -0,0 +1,380 @@ +package splitstore + +import ( + "context" + "path/filepath" + "testing" + "time" + + "github.com/filecoin-project/venus/venus-shared/blockstore" + blocks "github.com/ipfs/go-block-format" + cid "github.com/ipfs/go-cid" + ipld "github.com/ipfs/go-ipld-format" + "github.com/stretchr/testify/require" +) + +func TestNewComposeStore(t *testing.T) { + s := NewComposeStore(nil, nil) + require.True(t, s.(*ComposeStore).shouldSync) + + s = NewComposeStore(nil, nil, nil) + require.True(t, s.(*ComposeStore).shouldSync) + require.False(t, s.(*ComposeStore).secondary.(*ComposeStore).shouldSync) + require.Nil(t, s.(*ComposeStore).secondary.(*ComposeStore).secondary) +} + +func TestComposeStoreGet(t *testing.T) { + ctx := context.Background() + composeStore, primaryStore, secondaryStore, tertiaryStore := getBlockstore(t) + blocksInPrimary := []blocks.Block{ + newBlock("b1"), + newBlock("b2"), + newBlock("b3"), + } + blocksInSecondary := []blocks.Block{ + newBlock("b3"), + newBlock("b4"), + } + blocksInTertiary := []blocks.Block{ + newBlock("b4"), + newBlock("b5"), + } + blockNotExist := newBlock("b6") + + for _, b := range blocksInPrimary { + require.NoError(t, primaryStore.Put(ctx, b)) + } + for _, b := range blocksInSecondary { + require.NoError(t, secondaryStore.Put(ctx, b)) + } + for _, b := range blocksInTertiary { + require.NoError(t, tertiaryStore.Put(ctx, b)) + } + + t.Run("Get", func(t *testing.T) { + for _, b := range blocksInPrimary { + block, err := composeStore.Get(ctx, b.Cid()) + require.NoError(t, err) + require.Equal(t, b.RawData(), block.RawData()) + } + for _, b := range blocksInSecondary { + block, err := composeStore.Get(ctx, b.Cid()) + require.NoError(t, err) + require.Equal(t, b.RawData(), block.RawData()) + } + for _, b := range blocksInTertiary { + block, err := composeStore.Get(ctx, b.Cid()) + require.NoError(t, err) + require.Equal(t, b.RawData(), block.RawData()) + } + + _, err := composeStore.Get(ctx, blockNotExist.Cid()) + require.True(t, ipld.IsNotFound(err)) + + // test for sync + // wait for sync (switch goroutine) + time.Sleep(5 * time.Millisecond) + for _, b := range blocksInTertiary { + block, err := primaryStore.Get(ctx, b.Cid()) + require.NoError(t, err) + require.Equal(t, b.RawData(), block.RawData()) + } + }) +} + +func TestComposeStoreGetSize(t *testing.T) { + ctx := context.Background() + composeStore, primaryStore, secondaryStore, _ := getBlockstore(t) + blocksInPrimary := 
[]blocks.Block{ + newBlock("b1"), + newBlock("b2"), + newBlock("b3"), + } + blocksInSecondary := []blocks.Block{ + newBlock("b3"), + newBlock("b4"), + } + blockNotExist := newBlock("b5") + + for _, b := range blocksInPrimary { + require.NoError(t, primaryStore.Put(ctx, b)) + } + for _, b := range blocksInSecondary { + require.NoError(t, secondaryStore.Put(ctx, b)) + } + + t.Run("GetSize", func(t *testing.T) { + for _, b := range blocksInPrimary { + sz, err := composeStore.GetSize(ctx, b.Cid()) + require.NoError(t, err) + require.Equal(t, len(b.RawData()), sz) + } + for _, b := range blocksInSecondary { + sz, err := composeStore.GetSize(ctx, b.Cid()) + require.NoError(t, err) + require.Equal(t, len(b.RawData()), sz) + } + + _, err := composeStore.GetSize(ctx, blockNotExist.Cid()) + require.True(t, ipld.IsNotFound(err)) + + // test for sync + // wait for sync (switch goroutine) + time.Sleep(1 * time.Millisecond) + for _, b := range blocksInSecondary { + sz, err := primaryStore.GetSize(ctx, b.Cid()) + require.NoError(t, err) + require.Equal(t, len(b.RawData()), sz) + } + }) +} + +func TestComposeStoreView(t *testing.T) { + ctx := context.Background() + composeStore, primaryStore, secondaryStore, _ := getBlockstore(t) + blocksInPrimary := []blocks.Block{ + newBlock("b1"), + newBlock("b2"), + newBlock("b3"), + } + blocksInSecondary := []blocks.Block{ + newBlock("b3"), + newBlock("b4"), + } + blockNotExist := newBlock("b5") + + for _, b := range blocksInPrimary { + require.NoError(t, primaryStore.Put(ctx, b)) + } + for _, b := range blocksInSecondary { + require.NoError(t, secondaryStore.Put(ctx, b)) + } + + t.Run("View", func(t *testing.T) { + for _, b := range blocksInPrimary { + err := composeStore.View(ctx, b.Cid(), func(bByte []byte) error { + require.Equal(t, b.RawData(), bByte) + return nil + }) + require.NoError(t, err) + } + for _, b := range blocksInSecondary { + err := composeStore.View(ctx, b.Cid(), func(bByte []byte) error { + require.Equal(t, b.RawData(), bByte) + return nil + }) + require.NoError(t, err) + } + + err := composeStore.View(ctx, blockNotExist.Cid(), func(b []byte) error { + require.Nil(t, b) + return nil + }) + require.True(t, ipld.IsNotFound(err)) + + // test for sync + for _, b := range blocksInSecondary { + err := composeStore.View(ctx, b.Cid(), func(bByte []byte) error { + require.Equal(t, b.RawData(), bByte) + return nil + }) + require.NoError(t, err) + } + }) +} + +func TestComposeStoreHas(t *testing.T) { + ctx := context.Background() + composeStore, primaryStore, secondaryStore, _ := getBlockstore(t) + blocksInPrimary := []blocks.Block{ + newBlock("b1"), + newBlock("b2"), + newBlock("b3"), + } + blocksInSecondary := []blocks.Block{ + newBlock("b3"), + newBlock("b4"), + } + blockNotExist := newBlock("b5") + + for _, b := range blocksInPrimary { + require.NoError(t, primaryStore.Put(ctx, b)) + } + for _, b := range blocksInSecondary { + require.NoError(t, secondaryStore.Put(ctx, b)) + } + + t.Run("Has", func(t *testing.T) { + for _, b := range blocksInPrimary { + h, err := composeStore.Has(ctx, b.Cid()) + require.NoError(t, err) + require.True(t, h) + } + for _, b := range blocksInSecondary { + h, err := composeStore.Has(ctx, b.Cid()) + require.NoError(t, err) + require.True(t, h) + } + + h, err := composeStore.Has(ctx, blockNotExist.Cid()) + require.NoError(t, err) + require.False(t, h) + + // test for sync + // wait for sync (switch goroutine) + time.Sleep(1 * time.Millisecond) + for _, b := range blocksInSecondary { + h, err := primaryStore.Has(ctx, b.Cid()) + 
require.NoError(t, err) + require.True(t, h) + } + }) +} + +func TestComposeStoreAllKeysChan(t *testing.T) { + ctx := context.Background() + composeStore, primaryStore, secondaryStore, tertiaryStore := getBlockstore(t) + blocksInPrimary := []blocks.Block{ + newBlock("b1"), + newBlock("b2"), + newBlock("b3"), + } + blocksInSecondary := []blocks.Block{ + newBlock("b3"), + newBlock("b4"), + } + blocksInTertiary := []blocks.Block{ + newBlock("b5"), + newBlock("b6"), + } + + for _, b := range blocksInPrimary { + require.NoError(t, primaryStore.Put(ctx, b)) + } + for _, b := range blocksInSecondary { + require.NoError(t, secondaryStore.Put(ctx, b)) + } + for _, b := range blocksInTertiary { + require.NoError(t, tertiaryStore.Put(ctx, b)) + } + + t.Run("All keys chan", func(t *testing.T) { + ch, err := composeStore.AllKeysChan(ctx) + require.NoError(t, err) + require.NotNil(t, ch) + + cidGet := cid.NewSet() + for cid := range ch { + require.True(t, cidGet.Visit(cid)) + } + for _, b := range blocksInPrimary { + require.False(t, cidGet.Has(b.Cid())) + } + for _, b := range blocksInSecondary { + require.False(t, cidGet.Has(b.Cid())) + } + + require.Equal(t, 6, cidGet.Len()) + }) +} + +func TestComposeStorePut(t *testing.T) { + ctx := context.Background() + composeStore, primaryStore, secondaryStore, _ := getBlockstore(t) + blockNotExist := newBlock("b5") + + t.Run("Put", func(t *testing.T) { + require.NoError(t, composeStore.Put(ctx, blockNotExist)) + h, err := composeStore.Has(ctx, blockNotExist.Cid()) + require.NoError(t, err) + require.True(t, h) + + h, err = primaryStore.Has(ctx, blockNotExist.Cid()) + require.NoError(t, err) + require.True(t, h) + + h, err = secondaryStore.Has(ctx, blockNotExist.Cid()) + require.NoError(t, err) + require.False(t, h) + }) +} + +func TestComposeStoreDelete(t *testing.T) { + ctx := context.Background() + composeStore, primaryStore, secondaryStore, tertiaryStore := getBlockstore(t) + blocksInPrimary := []blocks.Block{ + newBlock("b1"), + newBlock("b2"), + newBlock("b3"), + } + blocksInSecondary := []blocks.Block{ + newBlock("b3"), + newBlock("b4"), + } + blocksInTertiary := []blocks.Block{ + newBlock("b3"), + newBlock("b6"), + } + blockNotExist := newBlock("b5") + + for _, b := range blocksInPrimary { + require.NoError(t, primaryStore.Put(ctx, b)) + } + for _, b := range blocksInSecondary { + require.NoError(t, secondaryStore.Put(ctx, b)) + } + for _, b := range blocksInTertiary { + require.NoError(t, tertiaryStore.Put(ctx, b)) + } + + t.Run("Delete", func(t *testing.T) { + for _, b := range blocksInPrimary { + err := composeStore.DeleteBlock(ctx, b.Cid()) + require.NoError(t, err) + + h, err := composeStore.Has(ctx, b.Cid()) + require.NoError(t, err) + require.False(t, h) + } + for _, b := range blocksInSecondary { + err := composeStore.DeleteBlock(ctx, b.Cid()) + require.NoError(t, err) + + h, err := composeStore.Has(ctx, b.Cid()) + require.NoError(t, err) + require.False(t, h) + } + + err := composeStore.DeleteBlock(ctx, blockNotExist.Cid()) + require.NoError(t, err) + }) +} + +func getBlockstore(t *testing.T) (compose, primary, secondary, tertiary blockstore.Blockstore) { + tempDir := t.TempDir() + + primaryPath := filepath.Join(tempDir, "primary") + secondaryPath := filepath.Join(tempDir, "secondary") + tertiaryPath := filepath.Join(tempDir, "tertiary") + + optPri, err := blockstore.BadgerBlockstoreOptions(primaryPath, false) + require.NoError(t, err) + dsPri, err := blockstore.Open(optPri) + require.NoError(t, err) + + optSnd, err := 
blockstore.BadgerBlockstoreOptions(secondaryPath, false) + require.NoError(t, err) + dsSnd, err := blockstore.Open(optSnd) + require.NoError(t, err) + + optTertiary, err := blockstore.BadgerBlockstoreOptions(tertiaryPath, false) + require.NoError(t, err) + dsTertiary, err := blockstore.Open(optTertiary) + require.NoError(t, err) + + return NewComposeStore(dsTertiary, dsSnd, dsPri), dsPri, dsSnd, dsTertiary +} + +func newBlock(s string) blocks.Block { + return blocks.NewBlock([]byte(s)) +} diff --git a/venus-shared/blockstore/splitstore/safe_sliece.go b/venus-shared/blockstore/splitstore/safe_sliece.go new file mode 100644 index 0000000000..e584b482f7 --- /dev/null +++ b/venus-shared/blockstore/splitstore/safe_sliece.go @@ -0,0 +1,131 @@ +package splitstore + +import "sync" + +type SafeSlice[T any] struct { + data []T + mux sync.RWMutex +} + +func (s *SafeSlice[T]) Append(v ...T) { + s.mux.Lock() + defer s.mux.Unlock() + s.data = append(s.data, v...) +} + +func (s *SafeSlice[T]) At(idx int) T { + s.mux.RLock() + defer s.mux.RUnlock() + return s.data[idx] +} + +func (s *SafeSlice[T]) Len() int { + s.mux.RLock() + defer s.mux.RUnlock() + return len(s.data) +} + +func (s *SafeSlice[T]) Prototype() []T { + s.mux.RLock() + defer s.mux.RUnlock() + return s.data +} + +func (s *SafeSlice[T]) First() T { + s.mux.RLock() + defer s.mux.RUnlock() + return s.data[0] +} + +func (s *SafeSlice[T]) Last() T { + s.mux.RLock() + defer s.mux.RUnlock() + return s.data[len(s.data)-1] +} + +func (s *SafeSlice[T]) Delete(idx int) { + s.mux.Lock() + defer s.mux.Unlock() + s.data = append(s.data[:idx], s.data[idx+1:]...) +} + +func (s *SafeSlice[T]) Slice(start, end int) *SafeSlice[T] { + s.mux.RLock() + defer s.mux.RUnlock() + return &SafeSlice[T]{ + data: s.data[start:end], + } +} + +// Range calls f sequentially for each element present in the slice. +// If f returns false, range stops the iteration. +// Should not modify the slice during the iteration. 
(modify the element is ok) +func (s *SafeSlice[T]) ForEach(f func(int, T) bool) { + s.mux.RLock() + defer s.mux.RUnlock() + for i, v := range s.data { + if !f(i, v) { + return + } + } +} + +func NewSafeSlice[T any](data []T) *SafeSlice[T] { + return &SafeSlice[T]{ + data: data, + } +} + +type InfinityChannel[T any] struct { + data []T + in chan T + out chan T +} + +func NewInfinityChannel[T any]() (in chan<- T, out <-chan T) { + + ch := &InfinityChannel[T]{ + data: make([]T, 0), + in: make(chan T), + out: make(chan T), + } + + notClosed := true + + go func() { + for { + if notClosed { + var v T + if len(ch.data) > 0 { + select { + case v, notClosed = <-ch.in: + if notClosed { + ch.data = append(ch.data, v) + } + case ch.out <- ch.data[0]: + ch.data = ch.data[1:] + } + } else { + v, notClosed = <-ch.in + ch.data = append(ch.data, v) + } + } else { + for _, v := range ch.data { + ch.out <- v + } + close(ch.out) + break + } + } + }() + + return ch.in, ch.out +} + +func Map[T, R any](data []T, f func(T) R) []R { + result := make([]R, len(data)) + for i, v := range data { + result[i] = f(v) + } + return result +} diff --git a/venus-shared/blockstore/splitstore/splitstore.go b/venus-shared/blockstore/splitstore/splitstore.go new file mode 100644 index 0000000000..18e3942bec --- /dev/null +++ b/venus-shared/blockstore/splitstore/splitstore.go @@ -0,0 +1,699 @@ +package splitstore + +import ( + "context" + "fmt" + "math" + "os" + "path/filepath" + "regexp" + "sort" + "strconv" + "sync" + + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/venus/venus-shared/actors/builtin" + "github.com/filecoin-project/venus/venus-shared/actors/policy" + "github.com/filecoin-project/venus/venus-shared/blockstore" + "github.com/filecoin-project/venus/venus-shared/types" + blocks "github.com/ipfs/go-block-format" + cid "github.com/ipfs/go-cid" + + cbor "github.com/ipfs/go-ipld-cbor" + + bstore "github.com/ipfs/boxo/blockstore" +) + +const ( + CheckPointDuration = 4 * policy.ChainFinality +) + +var SoftDelete = false + +type Closer interface { + Close() error +} + +type Cleaner interface { + Clean() error +} +type CleanFunc func() error + +func (c CleanFunc) Clean() error { + return c() +} + +var _ Cleaner = (*CleanFunc)(nil) + +type BaseKeeper interface { + Base() cid.Cid +} +type BaseFunc func() cid.Cid + +func (b BaseFunc) Base() cid.Cid { + return b() +} + +var _ BaseKeeper = (*BaseFunc)(nil) + +type Layer interface { + blockstore.Blockstore + Cleaner + BaseKeeper +} +type LayerImpl struct { + blockstore.Blockstore + Cleaner + BaseKeeper +} + +type Option struct { + MaxLayerCount int + LayerSize abi.ChainEpoch + InitSyncProtect abi.ChainEpoch +} + +type Controller interface { + Rollback() error +} + +type Splitstore struct { + path string + maxLayerCount int + layerSize abi.ChainEpoch + initSyncProtect abi.ChainEpoch + + initStore blockstore.Blockstore + layers *SafeSlice[Layer] + + epochToAppend abi.ChainEpoch + epochToDrop abi.ChainEpoch + + headTipsetKey cid.Cid + headSubscriber []chan<- cid.Cid + rollbackFlag bool + + checkPoint cid.Cid + + taskQue chan func() + + mux sync.RWMutex +} + +var _ blockstore.Blockstore = (*Splitstore)(nil) +var _ Controller = (*Splitstore)(nil) + +func NewSplitstore(path string, initStore blockstore.Blockstore, opts ...Option) (*Splitstore, error) { + opt := Option{ + MaxLayerCount: 3, + LayerSize: 3 * policy.ChainFinality, + InitSyncProtect: 3 * builtin.EpochsInDay, + } + if len(opts) > 1 { + return nil, fmt.Errorf("splitstore: too many options") + 
} + if len(opts) == 1 { + if opts[0].MaxLayerCount > 1 { + opt.MaxLayerCount = opts[0].MaxLayerCount + } else { + log.Warnf("splitstore: max layer count must greater than 1, use default value %d", opt.MaxLayerCount) + } + + if opts[0].LayerSize > policy.ChainFinality { + opt.LayerSize = opts[0].LayerSize + } else { + log.Warnf("splitstore: layer size must greater than chain finality, use default value %d", opt.LayerSize) + } + + opt.LayerSize = opts[0].LayerSize + + if opts[0].InitSyncProtect > 0 { + opt.InitSyncProtect = opts[0].InitSyncProtect + } + } + + // check path + stat, err := os.Stat(path) + if err != nil { + if os.IsNotExist(err) { + err = os.MkdirAll(path, 0755) + if err != nil { + return nil, err + } + } + } + if stat != nil && !stat.IsDir() { + return nil, fmt.Errorf("split store path %s is not a directory", path) + } + + ss := &Splitstore{ + path: path, + layers: NewSafeSlice([]Layer{}), + + // must greater than 1 + maxLayerCount: opt.MaxLayerCount, + layerSize: opt.LayerSize, + + epochToAppend: 0, + epochToDrop: math.MaxInt64, + + initStore: initStore, + initSyncProtect: opt.InitSyncProtect, + + taskQue: make(chan func(), 50), + } + + // scan for stores + bs, err := scan(path) + if err != nil { + return nil, err + } + + for i := range bs { + ss.layers.Append(bs[i]) + } + + // update epochToAppend + if ss.layers.Len() > 0 { + ctx := context.Background() + tskCid := ss.layers.Last().Base() + var tsk types.TipSetKey + err = ss.getCbor(ctx, tskCid, &tsk) + if err != nil { + return nil, fmt.Errorf("load store base tsk(%s): %w", tskCid, err) + } + ts, err := ss.getTipset(ctx, &tsk) + if err != nil { + return nil, fmt.Errorf("load store base tipset(%s): %w", tsk, err) + } + ss.epochToAppend = ts.Height() + ss.layerSize + } + + ss.dropRedundantLayer() + + ss.start() + log.Infof("load splitstore with %d layer", ss.layers.Len()) + return ss, nil +} + +// HeadChange subscribe to head change, schedule for new store and delete old +func (ss *Splitstore) HeadChange(_, apply []*types.TipSet) error { + ctx := context.Background() + + for _, ts := range apply { + height := ts.Height() + log := log.With("height", height) + var err error + tsk := ts.Key() + tskCid, err := tsk.Cid() + if err != nil { + log.Errorf("get tipset(%d) key: %s", height, err) + continue + } + ss.setHead(tskCid) + if ss.isRollback() { + // only update head tipset key after rollback + continue + } + + if height >= ss.epochToDrop && ss.layers.Len() > ss.maxLayerCount { + ss.epochToDrop = height + ss.layerSize + ss.dropRedundantLayer() + } + + if height >= ss.epochToAppend { + ss.epochToAppend = height + ss.layerSize + if ss.epochToDrop == math.MaxInt64 { + ss.epochToDrop = height + ss.layerSize/2 + ss.initSyncProtect + } + + // make sure tsk have been persisted + h, err := ss.Has(ctx, tskCid) + if err != nil { + log.Warnf("check tsk(%s) exist: %s", tskCid, err) + } + if !h { + tskBlock, err := tsk.ToStorageBlock() + if err != nil { + log.Errorf("tsk(%s) to storage block: %s", tskCid, err) + } + err = ss.Put(ctx, tskBlock) + if err != nil { + log.Errorf("persist tsk(%s): %s", tskCid, err) + } + } + + // snapshot store before append + snapStore := ss.composeStore() + + layer, err := newLayer(ss.path, int64(height), tskCid) + if err != nil { + log.Errorf("create new layer: %s", err) + continue + } + ss.layers.Append(layer) + log.Infof("append new layer base(%s)", tskCid.String()) + + // backup header to init store + err = backupHeader(ctx, tskCid, NewComposeStore(snapStore, ss.initStore), ss.initStore) + if err != nil { + 
log.Errorf("append new layer: backup header: %v", err) + } + } + } + return nil +} + +// AllKeysChan implements blockstore.Blockstore. +func (ss *Splitstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { + return ss.composeStore().AllKeysChan(ctx) +} + +// DeleteBlock implements blockstore.Blockstore. +func (ss *Splitstore) DeleteBlock(ctx context.Context, c cid.Cid) error { + return ss.composeStore().DeleteBlock(ctx, c) +} + +// DeleteMany implements blockstore.Blockstore. +func (ss *Splitstore) DeleteMany(ctx context.Context, cids []cid.Cid) error { + return ss.composeStore().DeleteMany(ctx, cids) +} + +// Flush implements blockstore.Blockstore. +func (ss *Splitstore) Flush(ctx context.Context) error { + return ss.composeStore().Flush(ctx) +} + +// Get implements blockstore.Blockstore. +func (ss *Splitstore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) { + return ss.composeStore().Get(ctx, c) +} + +// GetSize implements blockstore.Blockstore. +func (ss *Splitstore) GetSize(ctx context.Context, c cid.Cid) (int, error) { + return ss.composeStore().GetSize(ctx, c) +} + +// Has implements blockstore.Blockstore. +func (ss *Splitstore) Has(ctx context.Context, c cid.Cid) (bool, error) { + return ss.composeStore().Has(ctx, c) +} + +// HashOnRead implements blockstore.Blockstore. +func (ss *Splitstore) HashOnRead(enabled bool) { + ss.composeStore().HashOnRead(enabled) +} + +// Put implements blockstore.Blockstore. +func (ss *Splitstore) Put(ctx context.Context, b blocks.Block) error { + return ss.composeStore().Put(ctx, b) +} + +// PutMany implements blockstore.Blockstore. +func (ss *Splitstore) PutMany(ctx context.Context, bs []blocks.Block) error { + return ss.composeStore().PutMany(ctx, bs) +} + +// View implements blockstore.Blockstore. +func (ss *Splitstore) View(ctx context.Context, cid cid.Cid, callback func([]byte) error) error { + return ss.composeStore().View(ctx, cid, callback) +} + +// Close close all sub store +func (ss *Splitstore) Close() error { + ss.layers.ForEach(func(i int, store Layer) bool { + if closer, ok := store.(Closer); ok { + err := closer.Close() + if err != nil { + log.Errorf("close %dth store: %s", i, err) + } + } + return true + }) + + if ss.isRollback() { + // try best to clean all store + ss.layers.ForEach(func(i int, store Layer) bool { + err := store.Clean() + if err != nil { + bsCid := store.Base() + log.Errorf("clean store(%s) fail, try to clean manually: %w", bsCid, err) + } + return true + }) + } + return nil +} + +// Rollback rollback splitstore to init store: +// 1. redirect query to init store +// 2. disable store appending and dropping +// 3. transfer blocks back to init store +// 4. 
clean all stores +func (ss *Splitstore) Rollback() error { + ss.mux.Lock() + ss.rollbackFlag = true + ss.mux.Unlock() + + ctx := context.Background() + + // wait to next head + head := ss.nextHead() + + // transfer state to init store + currentHeight, err := ss.getTipsetHeight(ctx, head) + if err != nil { + return fmt.Errorf("get current height: %w", err) + } + targetHeight := currentHeight - CheckPointDuration + err = WalkUntil(ctx, ss.composeStore(), head, targetHeight) + if err != nil { + return fmt.Errorf("transfer state to init store: %w", err) + } + + // transfer block header to init store + err = backupHeader(ctx, head, ss.composeStore(), ss.initStore) + if err != nil { + return fmt.Errorf("backup header: %w", err) + } + + // drop all store + dropCount := ss.layers.Len() + for i := 0; i < dropCount; i++ { + storeToDrop := ss.layers.First() + // close and clean + if closer, ok := storeToDrop.(Closer); ok { + err := closer.Close() + if err != nil { + log.Errorf("close store(%s): %s", storeToDrop.Base(), err) + } + } + err := storeToDrop.Clean() + if err != nil { + log.Errorf("clean store(%s): %s", storeToDrop.Base(), err) + } + ss.layers.Delete(0) + } + + return nil +} + +// Len return the number of store layers +func (ss *Splitstore) Len() int { + return ss.layers.Len() +} + +func (ss *Splitstore) dropRedundantLayer() { + if ss.layers.Len() <= ss.maxLayerCount { + return + } + + // collect state, then close and clean + ss.taskQue <- func() { + if !(ss.layers.Len() > ss.maxLayerCount) { + log.Warnf("drop last layer: unexpected layer count(%d)", ss.layers.Len()) + return + } + + dropCount := ss.layers.Len() - ss.maxLayerCount + targetStore := ss.layers.At(ss.layers.Len() - 2) + bs := []blockstore.Blockstore{ss.initStore} + ss.layers.Slice(0, ss.layers.Len()-1).ForEach(func(i int, store Layer) bool { + bs = append(bs, store) + return true + }) + snapshot := NewComposeStore(bs...) + + if !ss.checkPoint.Equals(targetStore.Base()) { + var height abi.ChainEpoch + height, err := ss.getTipsetHeight(context.Background(), targetStore.Base()) + if err != nil { + log.Errorf("collect state: get tipset height: %s", err) + return + } + + // collect the last state of target store + err = WalkUntil(context.Background(), snapshot, targetStore.Base(), height-CheckPointDuration) + if err != nil { + log.Errorf("collect state: %s", err) + return + } + ss.checkPoint = targetStore.Base() + } + + for i := 0; i < dropCount; i++ { + storeToDrop := ss.layers.First() + // close and clean + if closer, ok := storeToDrop.(Closer); ok { + err := closer.Close() + if err != nil { + log.Errorf("close store(%s): %s", storeToDrop.Base(), err) + } + } + err := storeToDrop.Clean() + if err != nil { + log.Errorf("clean store(%s): %s", storeToDrop.Base(), err) + } + ss.layers.Delete(0) + } + } +} + +func (ss *Splitstore) composeStore() blockstore.Blockstore { + bs := make([]blockstore.Blockstore, 0, ss.layers.Len()) + ss.layers.ForEach(func(i int, store Layer) bool { + bs = append(bs, store) + return true + }) + + if ss.isRollback() { + bs = append(bs, ss.initStore) + } else { + bs = append([]blockstore.Blockstore{ss.initStore}, bs...) + } + + return NewComposeStore(bs...) 
+} + +func (ss *Splitstore) start() { + go func() { + for task := range ss.taskQue { + task() + } + }() +} + +func (ss *Splitstore) getCbor(ctx context.Context, c cid.Cid, out interface{}) error { + cst := cbor.NewCborStore(ss.composeStore()) + return cst.Get(ctx, c, out) +} + +func (ss *Splitstore) getTipsetKey(ctx context.Context, c cid.Cid) (*types.TipSetKey, error) { // nolint:unused + var tsk types.TipSetKey + err := ss.getCbor(ctx, c, &tsk) + if err != nil { + return nil, err + } + return &tsk, nil +} + +func (ss *Splitstore) getTipset(ctx context.Context, key *types.TipSetKey) (*types.TipSet, error) { + if key.IsEmpty() { + return nil, fmt.Errorf("get tipset: tipset key is empty") + } + + cids := key.Cids() + blks := make([]*types.BlockHeader, len(cids)) + for idx, c := range cids { + var blk types.BlockHeader + err := ss.getCbor(ctx, c, &blk) + if err != nil { + return nil, err + } + + blks[idx] = &blk + } + + ts, err := types.NewTipSet(blks) + if err != nil { + return nil, err + } + return ts, nil +} + +func (ss *Splitstore) getBlock(ctx context.Context, c cid.Cid) (*types.BlockHeader, error) { // nolint:unused + if !c.Defined() { + return nil, fmt.Errorf("get block: block cid is undefined") + } + + var blk types.BlockHeader + err := ss.getCbor(ctx, c, &blk) + if err != nil { + return nil, err + } + + return &blk, nil +} + +func (ss *Splitstore) getTipsetHeight(ctx context.Context, key cid.Cid) (abi.ChainEpoch, error) { + if !key.Defined() { + return 0, fmt.Errorf("get tipset: tipset key is undefined") + } + + var tsk types.TipSetKey + if err := ss.getCbor(ctx, key, &tsk); err != nil { + return 0, err + } + + if tsk.IsEmpty() { + return 0, fmt.Errorf("get tipset: tipset key is empty") + } + + cids := tsk.Cids() + var blk types.BlockHeader + if err := ss.getCbor(ctx, cids[0], &blk); err != nil { + return 0, err + } + + return blk.Height, nil +} + +func (ss *Splitstore) isRollback() bool { + ss.mux.RLock() + defer ss.mux.RUnlock() + return ss.rollbackFlag +} + +func (ss *Splitstore) currentHead() cid.Cid { // nolint:unused + ss.mux.RLock() + defer ss.mux.RUnlock() + return ss.headTipsetKey +} + +func (ss *Splitstore) setHead(tsk cid.Cid) { + ss.mux.Lock() + ss.headTipsetKey = tsk + subs := ss.headSubscriber + ss.headSubscriber = nil + ss.mux.Unlock() + + for _, sub := range subs { + sub <- tsk + } +} + +func (ss *Splitstore) nextHead() cid.Cid { + ch := make(chan cid.Cid, 1) + + ss.mux.Lock() + ss.headSubscriber = append(ss.headSubscriber, ch) + ss.mux.Unlock() + + ret := <-ch + + return ret +} + +// scan for stores from splitstore path +func scan(path string) ([]*LayerImpl, error) { + // path have been check before + entries, err := os.ReadDir(path) + if err != nil { + return nil, err + } + + var bs []*LayerImpl + heights := make(map[*LayerImpl]int64) + + // name of entry need to match '%d_%s' pattern + for _, entry := range entries { + if !entry.IsDir() { + continue + } + name := entry.Name() + + height, c, err := extractHeightAndCid(name) + if err != nil { + continue + } + + innerStore, err := newLayer(path, height, c) + if err != nil { + return nil, fmt.Errorf("scan splitstore: %w", err) + } + + bs = append(bs, innerStore) + heights[innerStore] = (height) + } + + // sort by height ASC + sort.Slice(bs, func(i, j int) bool { + return heights[bs[i]] < heights[bs[j]] + }) + + return bs, nil +} + +func newLayer(path string, height int64, c cid.Cid) (*LayerImpl, error) { + if !c.Defined() { + return nil, fmt.Errorf("new inner store: cid is undefined") + } + + var store 
blockstore.Blockstore + storePath := filepath.Join(path, fmt.Sprintf("base_%d_%s.db", height, c)) + + stat, err := os.Stat(storePath) + if os.IsNotExist(err) { + err = os.MkdirAll(storePath, 0755) + if err != nil { + return nil, fmt.Errorf("create store path(%s): %w", storePath, err) + } + } else if err != nil { + return nil, fmt.Errorf("check store path(%s): %w", storePath, err) + } + + if stat != nil && !stat.IsDir() { + return nil, fmt.Errorf("store path(%s) is not a directory", storePath) + } + + opt, _ := blockstore.BadgerBlockstoreOptions(storePath, false) + opt.Prefix = bstore.BlockPrefix.String() + store, err = blockstore.Open(opt) + if err != nil { + return nil, err + } + + return &LayerImpl{ + Blockstore: store, + Cleaner: CleanFunc(func() error { + if SoftDelete { + // soft delete: just rename with suffix '.del' + return os.Rename(storePath, storePath+".del") + } + return os.RemoveAll(storePath) + }), + BaseKeeper: BaseFunc(func() cid.Cid { + return c + }), + }, nil +} + +func extractHeightAndCid(s string) (int64, cid.Cid, error) { + re := regexp.MustCompile(`base_(\d+)_(\w+)\.db$`) + match := re.FindStringSubmatch(s) + if len(match) != 3 { + return 0, cid.Undef, fmt.Errorf("failed to extract height and cid from %s", s) + } + height, err := strconv.ParseInt(match[1], 10, 64) + if err != nil { + return 0, cid.Undef, err + } + if match[2] == "init" { + return height, cid.Undef, nil + } + c, err := cid.Parse(match[2]) + if err != nil { + return 0, cid.Undef, err + } + return height, c, nil +} diff --git a/venus-shared/blockstore/splitstore/splitstore_test.go b/venus-shared/blockstore/splitstore/splitstore_test.go new file mode 100644 index 0000000000..67ae59e13a --- /dev/null +++ b/venus-shared/blockstore/splitstore/splitstore_test.go @@ -0,0 +1,179 @@ +package splitstore + +import ( + "context" + "encoding/json" + "fmt" + "os" + "path/filepath" + "runtime" + "sync" + "testing" + + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/venus/venus-shared/types" + cid "github.com/ipfs/go-cid" + "github.com/stretchr/testify/require" +) + +func TestNewSplitstore(t *testing.T) { + ctx := context.Background() + tempDir := t.TempDir() + mockTipset := []*types.TipSet{ + newTipSet(100), + newTipSet(200), + newTipSet(300), + newTipSet(400), + newTipSet(500), + } + + initStore, err := openStore(tempDir + "/badger.db") + require.NoError(t, err) + + for i, ts := range mockTipset { + tsk := ts.Key() + tskCid, err := tsk.Cid() + require.NoError(t, err) + + s, err := newLayer(tempDir, int64(100+i*100), tskCid) + require.NoError(t, err) + + rawBlk, err := ts.Blocks()[0].ToStorageBlock() + require.NoError(t, err) + err = s.Put(ctx, rawBlk) + require.NoError(t, err) + + rawTsk, err := tsk.ToStorageBlock() + require.NoError(t, err) + err = s.Put(ctx, rawTsk) + require.NoError(t, err) + + if s, ok := s.Blockstore.(Closer); ok { + err := s.Close() + require.NoError(t, err) + } + } + + ss, err := NewSplitstore(tempDir, initStore) + require.NoError(t, err) + require.Equal(t, 5, ss.layers.Len()) +} + +func TestSethead(t *testing.T) { + tempDir := t.TempDir() + + ss, err := NewSplitstore(tempDir, nil) + require.NoError(t, err) + + var c1, c2 cid.Cid + c2 = cid.MustParse("bafy2bzacedqrlux7zaeaoka7b5udzwvdguzf3vqsgxglrdtakislgofte3ehi") + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + c1 = ss.nextHead() + }() + + runtime.Gosched() + ss.setHead(c2) + ss.setHead(c2) + + wg.Wait() + fmt.Println(c1, c2) + require.True(t, c1.Equals(c2)) +} + +func TestScan(t 
*testing.T) { + ctx := context.Background() + tempDir := t.TempDir() + mockTipset := []*types.TipSet{ + newTipSet(100), + newTipSet(200), + newTipSet(300), + } + + var tskCids []cid.Cid + for i, ts := range mockTipset { + tsk := ts.Key() + tskCid, err := tsk.Cid() + require.NoError(t, err) + tskCids = append(tskCids, tskCid) + + s, err := newLayer(tempDir, int64(100+i*100), tskCid) + require.NoError(t, err) + + rawBlk, err := ts.Blocks()[0].ToStorageBlock() + require.NoError(t, err) + err = s.Put(ctx, rawBlk) + require.NoError(t, err) + + rawTsk, err := tsk.ToStorageBlock() + require.NoError(t, err) + err = s.Put(ctx, rawTsk) + require.NoError(t, err) + + if s, ok := s.Blockstore.(Closer); ok { + err := s.Close() + require.NoError(t, err) + } + } + + // any.db will not be scanned in + err := os.MkdirAll(filepath.Join(tempDir, "any.db"), 0777) + require.NoError(t, err) + + bs, err := scan(tempDir) + require.NoError(t, err) + + t.Run("scan in", func(t *testing.T) { + require.Len(t, bs, len(mockTipset)) + + for i, c := range tskCids { + require.Equal(t, c, bs[i].Base()) + } + }) + + t.Run("clean up", func(t *testing.T) { + for i := range bs { + store := bs[i] + err := store.Clean() + require.NoError(t, err) + } + + bs, err = scan(tempDir) + require.NoError(t, err) + require.Len(t, bs, 0) + }) +} + +func TestExtractHeightAndCid(t *testing.T) { + h, _, err := extractHeightAndCid("base_10_bafy2bzacedyokdqa4mnkercuk5hcufi52w5q2xannm567ij2njiqovgwiicx6.db") + require.NoError(t, err) + require.Equal(t, int64(10), h) + + _, _, err = extractHeightAndCid("base_10_bafy2bzacedyokdqa4mnkercuk5hcufi52w5q2xannm567ij2njiqovgwiicx6") + require.Error(t, err) + + _, _, err = extractHeightAndCid("base_bafy2bzacedyokdqa4mnkercuk5hcufi52w5q2xannm567ij2njiqovgwiicx6") + require.Error(t, err) + + _, _, err = extractHeightAndCid("base_10_bafy2bzacedyokdqa4mnkercuk5hcufi52w5q2xannm567ij2njiqovgwiicx6.db.del") + require.Error(t, err) +} + +const blkRaw = 
`{"Miner":"t038057","Ticket":{"VRFProof":"kfggWR2GcEbfTuJ20hkAFNRbF7xusDuAQR7XwTjJ2/gc1rwIDmaXbSVxXe4j1njcCBoMhmlYIn9D/BLqQuIOayMHPYvDmOJGc9M27Hwg1UZkiuJmXji+iM/JBNYaOA61"},"ElectionProof":{"WinCount":1,"VRFProof":"tI7cWWM9sGsKc69N9DjN41glaO5Hg7r742H56FPzg7szbhTrxj8kw0OsiJzcPJdiAa6D5jZ1S2WKoLK7lwg2R5zYvCRwwWLGDiExqbqsvqmH5z/e6YGpaD7ghTPRH1SR"},"BeaconEntries":[{"Round":2118576,"Data":"rintMKcvVAslYpn9DcshDBmlPN6hUR+wjvVQSkkVUK5klx1kOSpcDvzODSc2wXFQA7BVbEcXJW/5KLoL0KHx2alLUWDOwxhsIQnAydXdZqG8G76nTIgogthfIMgSGdB2"}],"WinPoStProof":[{"PoStProof":3,"ProofBytes":"t0ZgPHFv0ao9fVZJ/fxbBrzATmOiIv9/IueSyAjtcqEpxqWViqchaaxnz1afwzAbhahpfZsGiGWyc408WYh7Q8u0Aa52KGPmUNtf3pAvxWfsUDMz9QUfhLZVg/p8/PUVC/O/E7RBNq4YPrRK5b6Q8PVwzIOxGOS14ge6ys8Htq+LfNJbcqY676qOYF4lzMuMtQIe3CxMSAEaEBfNpHhAEs83dO6vll9MZKzcXYpNWeqmMIz4xSdF18StQq9vL/Lo"}],"Parents":[{"/":"bafy2bzacecf4wtqz3kgumeowhdulejk3xbfzgibfyhs42x4vx2guqgudem2hg"},{"/":"bafy2bzacebkpxh2k63xreigl6a3ggdr2adwk67b4zw5dddckhqex2tmha6hee"},{"/":"bafy2bzacecor3xq4ykmhhrgq55rdo5w7up65elc4qwx5uwjy25ffynidskbxw"},{"/":"bafy2bzacedr2mztmef65fodqzvyjcdnsgpcjthstseinll4maqg24avnv7ljo"}],"ParentWeight":"21779626255","Height":1164251,"ParentStateRoot":{"/":"bafy2bzacecypgutbewmyop2wfuafvxt7dm7ew4u3ssy2p4rn457f6ynrj2i6a"},"ParentMessageReceipts":{"/":"bafy2bzaceaflsspsxuxew2y4g6o72wp5i2ewp3fcolga6n2plw3gycam7s4lg"},"Messages":{"/":"bafy2bzaceanux5ivzlxzvhqxtwc5vkktcfqepubwtwgv26dowzbl3rtgqk54k"},"BLSAggregate":{"Type":2,"Data":"lQg9jBfYhY2vvjB/RPlWg6i+MBTlH1u0lmdasiab5BigsKAuZSeLNlTGbdoVZhAsDUT59ZdGsMmueHjafygDUN2KLhZoChFf6LQHH42PTSXFlkRVHvmKVz9DDU03FLMB"},"Timestamp":1658988330,"BlockSig":{"Type":2,"Data":"rMOv2tXKqV5VDOq5IQ35cP0cCAzGmaugVr/g5JTrilhAn4LYK0h6ByPL5cX5ONzlDTx9+zYZFteIzaenirZhw7G510Lh0J8lbTLP5X2EX251rEA8dpkPZPcNylzN0r8X"},"ForkSignaling":0,"ParentBaseFee":"100"}` + +func newTipSet(height abi.ChainEpoch) *types.TipSet { + var blk types.BlockHeader + err := json.Unmarshal([]byte(blkRaw), &blk) + if err != nil { + panic(err) + } + blk.Height = height + + ts, err := types.NewTipSet([]*types.BlockHeader{&blk}) + if err != nil { + panic(err) + } + return ts +} diff --git a/venus-shared/blockstore/splitstore/test_data/base_583_bafy2bzaceazuutcexhvwkyyesszohrkjjzk2zgknasgs7bb7zgfnwghtnu5w2.db/000000.vlog b/venus-shared/blockstore/splitstore/test_data/base_583_bafy2bzaceazuutcexhvwkyyesszohrkjjzk2zgknasgs7bb7zgfnwghtnu5w2.db/000000.vlog new file mode 100644 index 0000000000..de398b2911 Binary files /dev/null and b/venus-shared/blockstore/splitstore/test_data/base_583_bafy2bzaceazuutcexhvwkyyesszohrkjjzk2zgknasgs7bb7zgfnwghtnu5w2.db/000000.vlog differ diff --git a/venus-shared/blockstore/splitstore/test_data/base_583_bafy2bzaceazuutcexhvwkyyesszohrkjjzk2zgknasgs7bb7zgfnwghtnu5w2.db/KEYREGISTRY b/venus-shared/blockstore/splitstore/test_data/base_583_bafy2bzaceazuutcexhvwkyyesszohrkjjzk2zgknasgs7bb7zgfnwghtnu5w2.db/KEYREGISTRY new file mode 100644 index 0000000000..21b49b1755 --- /dev/null +++ b/venus-shared/blockstore/splitstore/test_data/base_583_bafy2bzaceazuutcexhvwkyyesszohrkjjzk2zgknasgs7bb7zgfnwghtnu5w2.db/KEYREGISTRY @@ -0,0 +1 @@ +Q`aþÛ°+7³nP #/Hello Badger \ No newline at end of file diff --git a/venus-shared/blockstore/splitstore/test_data/base_583_bafy2bzaceazuutcexhvwkyyesszohrkjjzk2zgknasgs7bb7zgfnwghtnu5w2.db/MANIFEST b/venus-shared/blockstore/splitstore/test_data/base_583_bafy2bzaceazuutcexhvwkyyesszohrkjjzk2zgknasgs7bb7zgfnwghtnu5w2.db/MANIFEST new file mode 100644 index 0000000000..0b5596943f Binary files /dev/null and 
b/venus-shared/blockstore/splitstore/test_data/base_583_bafy2bzaceazuutcexhvwkyyesszohrkjjzk2zgknasgs7bb7zgfnwghtnu5w2.db/MANIFEST differ diff --git a/venus-shared/blockstore/splitstore/utils_test.go b/venus-shared/blockstore/splitstore/utils_test.go new file mode 100644 index 0000000000..07a08c43e5 --- /dev/null +++ b/venus-shared/blockstore/splitstore/utils_test.go @@ -0,0 +1,31 @@ +package splitstore + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestInfinityChannel(t *testing.T) { + in, out := NewInfinityChannel[int]() + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + i := 0 + + for v := range out { + require.Equal(t, i, v) + i++ + } + }() + + for i := 0; i < 100; i++ { + in <- i + } + + close(in) + wg.Wait() +} diff --git a/venus-shared/blockstore/splitstore/walk.go b/venus-shared/blockstore/splitstore/walk.go new file mode 100644 index 0000000000..5d45f81382 --- /dev/null +++ b/venus-shared/blockstore/splitstore/walk.go @@ -0,0 +1,302 @@ +package splitstore + +import ( + "bytes" + "context" + "fmt" + "runtime" + "sync" + "time" + + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/venus/venus-shared/blockstore" + "github.com/filecoin-project/venus/venus-shared/types" + cid "github.com/ipfs/go-cid" + cbor "github.com/ipfs/go-ipld-cbor" + cbg "github.com/whyrusleeping/cbor-gen" +) + +type Visitor interface { + // Visit return true means first visit, to continue walk + // it may be called concurrently + Visit(cid.Cid) bool + HandleError(cid.Cid, error) + Len() int + Cids() []cid.Cid + RegisterVisitHook(func(cid.Cid) bool) + RegisterErrorHook(func(cid.Cid, error)) +} + +type syncVisitor struct { + cidSet map[cid.Cid]struct{} + mutex sync.RWMutex + visitHooks []func(cid.Cid) bool + errorHooks []func(cid.Cid, error) +} + +var _ Visitor = (*syncVisitor)(nil) + +func (v *syncVisitor) Visit(c cid.Cid) bool { + { + // check if already visited + v.mutex.RLock() + _, ok := v.cidSet[c] + v.mutex.RUnlock() + if ok { + return false + } + for _, hook := range v.visitHooks { + if !hook(c) { + return false + } + } + } + + { + // mark as visited + v.mutex.Lock() + v.cidSet[c] = struct{}{} + v.mutex.Unlock() + } + + return true +} + +func (v *syncVisitor) HandleError(c cid.Cid, err error) { + for _, hook := range v.errorHooks { + hook(c, err) + } +} + +func (v *syncVisitor) Len() int { + v.mutex.Lock() + defer v.mutex.Unlock() + return len(v.cidSet) +} + +func (v *syncVisitor) Cids() []cid.Cid { + v.mutex.Lock() + defer v.mutex.Unlock() + cids := make([]cid.Cid, 0, len(v.cidSet)) + for c := range v.cidSet { + cids = append(cids, c) + } + return cids +} + +func (v *syncVisitor) RegisterVisitHook(hook func(cid.Cid) bool) { + v.visitHooks = append(v.visitHooks, hook) +} + +func (v *syncVisitor) RegisterErrorHook(hook func(cid.Cid, error)) { + v.errorHooks = append(v.errorHooks, hook) +} + +type Flag string + +const ( + FlagEnableDefaultErrorHandler Flag = "enable-default-error-handler" +) + +func NewSyncVisitor(flags ...Flag) Visitor { + v := &syncVisitor{ + cidSet: make(map[cid.Cid]struct{}), + } + + for _, flag := range flags { + switch flag { + case FlagEnableDefaultErrorHandler: + v.RegisterErrorHook(func(c cid.Cid, err error) { + log.Debugf("visit %s fail: %s", c, err) + }) + default: + log.Warnf("unknown flag: %s", flag) + } + } + + return v +} + +func WalkUntil(ctx context.Context, store blockstore.Blockstore, tipsetKey cid.Cid, targetEpoch abi.ChainEpoch) error { + log.Info("walk chain to sync state") + v := 
NewSyncVisitor() + return walkChain(ctx, store, tipsetKey, v, targetEpoch, true) +} + +func WalkHeader(ctx context.Context, store blockstore.Blockstore, tipsetKey cid.Cid, targetEpoch abi.ChainEpoch) error { + log.Info("walk chain to back up header") + v := NewSyncVisitor() + return walkChain(ctx, store, tipsetKey, v, targetEpoch, false) +} + +func walkChain(ctx context.Context, store blockstore.Blockstore, tipsetKey cid.Cid, v Visitor, targetEpoch abi.ChainEpoch, walkState bool) (err error) { + skipMessage := false + skipReceipts := false + + start := time.Now() + defer func() { + log.Infow("finish walk chain", "from", tipsetKey, "target", targetEpoch, "elapsed", time.Since(start), "visited", v.Len(), "error", err) + }() + + log.Infow("start walk chain", "from", tipsetKey, "target", targetEpoch) + + cst := cbor.NewCborStore(store) + var tsk types.TipSetKey + err = cst.Get(ctx, tipsetKey, &tsk) + if err != nil { + err = fmt.Errorf("get tipsetKey(%s): %w", tipsetKey, err) + return + } + var b types.BlockHeader + err = cst.Get(ctx, tsk.Cids()[0], &b) + if err != nil { + err = fmt.Errorf("get block(%s): %w", tsk.Cids()[0], err) + return + } + + blockToWalk := make([]cid.Cid, 0) + objectToWalk := make([]cid.Cid, 0) + + pushObject := func(c ...cid.Cid) { + if !walkState { + return + } + objectToWalk = append(objectToWalk, c...) + } + + blockToWalk = append(blockToWalk, tsk.Cids()...) + + blockCount := 0 + for len(blockToWalk) > 0 { + if ctx.Err() != nil { + return ctx.Err() + } + + bCid := blockToWalk[0] + blockToWalk = blockToWalk[1:] + + if !v.Visit(bCid) { + continue + } + + var b types.BlockHeader + err = cst.Get(ctx, bCid, &b) + if err != nil { + return err + } + + if b.Height%1000 == 0 { + log.Debugf("walking block(%s, %d)", bCid, b.Height) + } + + if b.Height < targetEpoch || b.Height == 0 { + if b.Height == 0 { + if len(b.Parents) != 1 { + err = fmt.Errorf("invalid genesis block seed(%v)", b.Parents) + return + } + // genesis block + objectToWalk = append(objectToWalk, b.Parents[0]) + } + } else { + if !skipMessage { + pushObject(b.Messages) + } + if !skipReceipts { + pushObject(b.ParentMessageReceipts) + } + + pushObject(b.ParentStateRoot) + + blockToWalk = append(blockToWalk, b.Parents...) 
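+			// the CID of the parents' TipSetKey is derived and pushed below as well,
+			// presumably so the key object itself (not just the headers it points to)
+			// is carried along with the state it references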
+ var tskCid cid.Cid + tskCid, err = types.NewTipSetKey(b.Parents...).Cid() + if err != nil { + return err + } + + // objectToWalk = append(objectToWalk, tskCid) + pushObject(tskCid) + } + blockCount++ + } + + log.Infof("walk chain visited %d block header, remain %d object to walk", blockCount, len(objectToWalk)) + + objectCh := make(chan cid.Cid, len(objectToWalk)) + for _, c := range objectToWalk { + objectCh <- c + } + close(objectCh) + + wg := sync.WaitGroup{} + walkObjectWorkerCount := runtime.NumCPU() / 2 + log.Debugf("start %d walk object worker", walkObjectWorkerCount) + wg.Add(walkObjectWorkerCount) + for i := 0; i < walkObjectWorkerCount; i++ { + go func() { + defer wg.Done() + for c := range objectCh { + walkObject(ctx, store, c, v) + } + }() + } + wg.Wait() + + return nil +} + +func walkObject(ctx context.Context, store blockstore.Blockstore, c cid.Cid, v Visitor) { + if !v.Visit(c) { + return + } + + // handle only dag-cbor which is the default cid codec of state types + if c.Prefix().Codec != cid.DagCBOR { + // should be exit + has, err := store.Has(ctx, c) + if err != nil { + v.HandleError(c, fmt.Errorf("check has(%s): %w", c, err)) + } + if !has { + v.HandleError(c, fmt.Errorf("object(%s) not found", c)) + } + return + } + + var links []cid.Cid + err := store.View(ctx, c, func(data []byte) error { + return cbg.ScanForLinks(bytes.NewReader(data), func(c cid.Cid) { + links = append(links, c) + }) + }) + if err != nil { + v.HandleError(c, fmt.Errorf("scan link for(cid: %s): %w", c, err)) + } + + for _, c := range links { + walkObject(ctx, store, c, v) + } + +} + +func backupHeader(ctx context.Context, from cid.Cid, src, dst blockstore.Blockstore) error { + log.Infow("backup header", "from", from) + v := NewSyncVisitor(FlagEnableDefaultErrorHandler) + v.RegisterVisitHook(func(c cid.Cid) bool { + has, err := dst.Has(ctx, c) + if err != nil { + log.Warnf("backup header: check %s whether exit in destination fail", c) + } + return !has + }) + + return walkChain(ctx, src, from, v, 1, false) +} + +func Once(f func()) func() { + var once sync.Once + return func() { + once.Do(f) + } +} diff --git a/venus-shared/blockstore/splitstore/walk_test.go b/venus-shared/blockstore/splitstore/walk_test.go new file mode 100644 index 0000000000..6ba061081f --- /dev/null +++ b/venus-shared/blockstore/splitstore/walk_test.go @@ -0,0 +1,82 @@ +package splitstore + +import ( + "context" + "testing" + + "github.com/filecoin-project/venus/venus-shared/blockstore" + "github.com/filecoin-project/venus/venus-shared/logging" + "github.com/filecoin-project/venus/venus-shared/types" + bstore "github.com/ipfs/boxo/blockstore" + "github.com/ipfs/go-cid" + cbor "github.com/ipfs/go-ipld-cbor" + "github.com/stretchr/testify/require" +) + +func init() { + err := logging.SetLogLevel("splitstore", "debug") + if err != nil { + panic(err) + } +} + +func TestWalk(t *testing.T) { + ctx := context.Background() + + log.Info("log level") + log.Debug("log level") + + badgerPath := "./test_data/base_583_bafy2bzaceazuutcexhvwkyyesszohrkjjzk2zgknasgs7bb7zgfnwghtnu5w2.db" + blockCid := cid.MustParse("bafy2bzaceazuutcexhvwkyyesszohrkjjzk2zgknasgs7bb7zgfnwghtnu5w2") + + ds, err := openStore(badgerPath) + require.NoError(t, err) + + cst := cbor.NewCborStore(ds) + + var b types.BlockHeader + err = cst.Get(ctx, blockCid, &b) + require.NoError(t, err) + + tsk := types.NewTipSetKey(blockCid) + require.False(t, tsk.IsEmpty()) + + tskCid, err := tsk.Cid() + require.NoError(t, err) + + err = WalkUntil(ctx, ds, tskCid, 10) + 
require.NoError(t, err)
+}
+
+func TestVisitor(t *testing.T) {
+	t.Run("visit duplicated cid", func(t *testing.T) {
+		v := NewSyncVisitor()
+		c, err := types.DefaultCidBuilder.Sum([]byte("duplicated_cid"))
+		require.NoError(t, err)
+		require.True(t, v.Visit(c))
+		require.False(t, v.Visit(c))
+		require.Equal(t, v.Len(), 1)
+		require.Len(t, v.Cids(), 1)
+		require.Equal(t, v.Cids()[0], c)
+	})
+
+	t.Run("test hook", func(t *testing.T) {
+		v := NewSyncVisitor()
+		c, err := types.DefaultCidBuilder.Sum([]byte("cids_reject"))
+		require.NoError(t, err)
+		v.RegisterVisitHook(func(a cid.Cid) bool {
+			return !a.Equals(c)
+		})
+
+		require.False(t, v.Visit(c))
+	})
+}
+
+func openStore(path string) (*blockstore.BadgerBlockstore, error) {
+	// check the error before touching the returned options
+	opt, err := blockstore.BadgerBlockstoreOptions(path, false)
+	if err != nil {
+		return nil, err
+	}
+	opt.Prefix = bstore.BlockPrefix.String()
+	return blockstore.Open(opt)
+}
diff --git a/venus-shared/logging/logger.go b/venus-shared/logging/logger.go
index 38fccea19c..59418f80f6 100644
--- a/venus-shared/logging/logger.go
+++ b/venus-shared/logging/logger.go
@@ -33,3 +33,7 @@ func LoggerFromContext(ctx context.Context, fallback *EventLogger) *TaggedLogger
 	return &fallback.SugaredLogger
 }
+
+func SetLogLevel(name, level string) error {
+	return logging.SetLogLevel(name, level)
+}
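For orientation, a minimal sketch of how the exported walk helpers added in walk.go might be driven from outside the package. It is not part of the changeset; bs, headKey, target and the Backfill wrapper are placeholder names supplied here for illustration, while WalkUntil and WalkHeader are the functions defined above.

package example

import (
	"context"

	"github.com/filecoin-project/go-state-types/abi"
	"github.com/filecoin-project/venus/venus-shared/blockstore"
	"github.com/filecoin-project/venus/venus-shared/blockstore/splitstore"
	"github.com/ipfs/go-cid"
)

// Backfill walks everything reachable from headKey down to target, then
// re-walks the header chain on its own. bs, headKey and target come from
// the caller; only WalkUntil and WalkHeader are defined by this diff.
func Backfill(ctx context.Context, bs blockstore.Blockstore, headKey cid.Cid, target abi.ChainEpoch) error {
	// headers, messages, receipts and state objects down to `target`
	if err := splitstore.WalkUntil(ctx, bs, headKey, target); err != nil {
		return err
	}
	// header-only pass back to epoch 1: the state walk is disabled inside
	// WalkHeader, so messages, receipts and state roots are not followed
	return splitstore.WalkHeader(ctx, bs, headKey, 1)
}

WalkUntil performs the full traversal used when syncing a layer's state, while WalkHeader repeats the same chain walk with state objects skipped, which is the mode backupHeader above relies on for the header chain.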