From 9a60f815a254fbc2f1610d2fe095e2495e601561 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Mon, 26 Feb 2024 23:55:15 +0300 Subject: [PATCH] wip Signed-off-by: Pavel Karpy --- cmd/neofs-node/object.go | 102 ++++++++++++++---- go.mod | 14 +-- go.sum | 28 ++--- pkg/core/object/fmt.go | 73 ++++++++++++- pkg/local_object_storage/metabase/VERSION.md | 4 + .../metabase/containers.go | 6 ++ pkg/local_object_storage/metabase/delete.go | 2 + pkg/local_object_storage/metabase/get.go | 16 +++ pkg/local_object_storage/metabase/put.go | 40 ++++++- pkg/local_object_storage/metabase/select.go | 3 + .../metabase/select_test.go | 11 ++ pkg/local_object_storage/metabase/util.go | 13 ++- pkg/local_object_storage/util/splitinfo.go | 4 + pkg/services/object/acl/acl.go | 10 ++ pkg/services/object/acl/eacl/v2/headers.go | 46 +++++++- pkg/services/object/acl/eacl/v2/opts.go | 6 ++ pkg/services/object/acl/v2/service.go | 4 +- pkg/services/object/get/assemble.go | 7 ++ pkg/services/object/get/assemble_v2split.go | 62 +++++++++++ pkg/services/object/get/exec.go | 4 + pkg/services/object/put/service.go | 6 ++ pkg/services/object/split/verify.go | 82 ++++++++++++++ 22 files changed, 487 insertions(+), 56 deletions(-) create mode 100644 pkg/services/object/get/assemble_v2split.go create mode 100644 pkg/services/object/split/verify.go diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index ba671507f3f..807a6081b74 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" + lru "github.com/hashicorp/golang-lru/v2" "github.com/nspcc-dev/neofs-api-go/v2/object" objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" replicatorconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/replicator" @@ -29,6 +30,7 @@ import ( putsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/put/v2" searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search" searchsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/search/v2" + "github.com/nspcc-dev/neofs-node/pkg/services/object/split" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" "github.com/nspcc-dev/neofs-node/pkg/services/policer" @@ -238,6 +240,26 @@ func initObjectService(c *cfg) { } } + sGet := getsvc.New( + getsvc.WithLogger(c.log), + getsvc.WithLocalStorageEngine(ls), + getsvc.WithClientConstructor(coreConstructor), + getsvc.WithTraverserGenerator( + traverseGen.WithTraverseOptions( + placement.SuccessAfter(1), + ), + ), + getsvc.WithNetMapSource(c.netMapSource), + getsvc.WithKeyStorage(keyStorage), + ) + + *c.cfgObject.getSvc = *sGet // need smth better + + sGetV2 := getsvcV2.NewService( + getsvcV2.WithInternalService(sGet), + getsvcV2.WithKeyStorage(keyStorage), + ) + sPut := putsvc.NewService( putsvc.WithKeyStorage(keyStorage), putsvc.WithClientConstructor(putConstructor), @@ -249,6 +271,7 @@ func initObjectService(c *cfg) { putsvc.WithNetworkState(c.cfgNetmap.state), putsvc.WithWorkerPools(c.cfgObject.pool.putRemote, c.cfgObject.pool.putLocal), putsvc.WithLogger(c.log), + putsvc.WithSplitChainVerifier(split.NewVerifier(sGet)), ) sPutV2 := putsvcV2.NewService( @@ -274,26 +297,6 @@ func initObjectService(c *cfg) { searchsvcV2.WithKeyStorage(keyStorage), ) - sGet := getsvc.New( - getsvc.WithLogger(c.log), - getsvc.WithLocalStorageEngine(ls), - getsvc.WithClientConstructor(coreConstructor), - getsvc.WithTraverserGenerator( - traverseGen.WithTraverseOptions( - placement.SuccessAfter(1), - ), - ), - getsvc.WithNetMapSource(c.netMapSource), - getsvc.WithKeyStorage(keyStorage), - ) - - *c.cfgObject.getSvc = *sGet // need smth better - - sGetV2 := getsvcV2.NewService( - getsvcV2.WithInternalService(sGet), - getsvcV2.WithKeyStorage(keyStorage), - ) - sDelete := deletesvc.New( deletesvc.WithLogger(c.log), deletesvc.WithHeadService(sGet), @@ -326,6 +329,13 @@ func initObjectService(c *cfg) { }, ) + // cachedFirstObjectsNumber is a total cached objects number; the V2 split scheme + // expects the first part of the chain to hold a user-defined header of the original + // object which should be treated as a header to use for the eACL rules check; so + // every object part in every chain will try to refer to the first part, so caching + // should help a lot here + const cachedFirstObjectsNumber = 1000 + aclSvc := v2.New( v2.WithLogger(c.log), v2.WithIRFetcher(newCachedIRFetcher(irFetcher)), @@ -339,7 +349,8 @@ func initObjectService(c *cfg) { SetNetmapState(c.cfgNetmap.state). SetEACLSource(c.cfgObject.eaclSource). SetValidator(eaclSDK.NewValidator()). - SetLocalStorage(ls), + SetLocalStorage(ls). + SetHeaderSource(cachedHeaderSource(sGet, cachedFirstObjectsNumber)), ), ), ) @@ -590,3 +601,52 @@ func (e engineWithoutNotifications) Lock(locker oid.Address, toLock []oid.ID) er func (e engineWithoutNotifications) Put(o *objectSDK.Object) error { return engine.Put(e.engine, o) } + +func cachedHeaderSource(getSvc *getsvc.Service, cacheSize int) headerSource { + hs := headerSource{getsvc: getSvc} + + if cacheSize > 0 { + hs.cache, _ = lru.New[oid.Address, *objectSDK.Object](cacheSize) + } + + return hs +} + +type headerSource struct { + getsvc *getsvc.Service + cache *lru.Cache[oid.Address, *objectSDK.Object] +} + +type headerWriter struct { + h *objectSDK.Object +} + +func (h *headerWriter) WriteHeader(o *objectSDK.Object) error { + h.h = o + return nil +} + +func (h headerSource) Head(address oid.Address) (*objectSDK.Object, error) { + if h.cache != nil { + head, ok := h.cache.Get(address) + if ok { + return head, nil + } + } + + var hw headerWriter + + // no custom common prms since a caller is expected to be a container + // participant so no additional headers, access tokens, etc + var prm getsvc.HeadPrm + prm.SetHeaderWriter(&hw) + prm.WithAddress(address) + prm.WithRawFlag(true) + + err := h.getsvc.Head(context.Background(), prm) + if err != nil { + return nil, fmt.Errorf("reading header: %w", err) + } + + return hw.h, nil +} diff --git a/go.mod b/go.mod index b878d8c0f49..cebde4b23f2 100644 --- a/go.mod +++ b/go.mod @@ -7,9 +7,9 @@ require ( github.com/chzyer/readline v1.5.1 github.com/flynn-archive/go-shlex v0.0.0-20150515145356-3f9db97f8568 github.com/google/go-github/v39 v39.2.0 - github.com/google/uuid v1.3.1 + github.com/google/uuid v1.6.0 github.com/hashicorp/golang-lru/v2 v2.0.7 - github.com/klauspost/compress v1.17.2 + github.com/klauspost/compress v1.17.6 github.com/mitchellh/go-homedir v1.1.0 github.com/mr-tron/base58 v1.2.0 github.com/multiformats/go-multiaddr v0.12.0 @@ -17,9 +17,9 @@ require ( github.com/nspcc-dev/hrw/v2 v2.0.0 github.com/nspcc-dev/locode-db v0.5.0 github.com/nspcc-dev/neo-go v0.105.1 - github.com/nspcc-dev/neofs-api-go/v2 v2.14.0 + github.com/nspcc-dev/neofs-api-go/v2 v2.14.1-0.20240213170208-cfca09b5acbe github.com/nspcc-dev/neofs-contract v0.19.1 - github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.11.0.20240130073207-03ed6db7e1cd + github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.11.0.20240215154601-e1a82c5e589c github.com/nspcc-dev/tzhash v1.7.1 github.com/olekukonko/tablewriter v0.0.5 github.com/panjf2000/ants/v2 v2.8.2 @@ -33,8 +33,8 @@ require ( go.uber.org/zap v1.26.0 golang.org/x/exp v0.0.0-20231006140011-7918f672742d golang.org/x/sync v0.3.0 - golang.org/x/sys v0.15.0 - golang.org/x/term v0.15.0 + golang.org/x/sys v0.17.0 + golang.org/x/term v0.17.0 google.golang.org/grpc v1.59.0 google.golang.org/protobuf v1.31.0 gopkg.in/yaml.v3 v3.0.1 @@ -97,7 +97,7 @@ require ( github.com/twmb/murmur3 v1.1.8 // indirect github.com/urfave/cli v1.22.5 // indirect go.uber.org/multierr v1.10.0 // indirect - golang.org/x/crypto v0.17.0 // indirect + golang.org/x/crypto v0.19.0 // indirect golang.org/x/net v0.17.0 // indirect golang.org/x/text v0.14.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 // indirect diff --git a/go.sum b/go.sum index 6f213fdd73e..589b2a1a227 100644 --- a/go.sum +++ b/go.sum @@ -159,8 +159,8 @@ github.com/google/pprof v0.0.0-20201218002935-b9804c9f04c2/go.mod h1:kpwsk12EmLe github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/subcommands v1.2.0/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3yTrtFlrHVk= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= -github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8/go.mod h1:dvDLG8qkwmyD9a/MJJN3XJcT3xFxOKAvTZGvuZmac9g= @@ -186,8 +186,8 @@ github.com/ipfs/go-cid v0.3.2/go.mod h1:gQ8pKqT/sUxGY+tIwy1RPpAojYu7jAyCp5Tz1svo github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= -github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.6 h1:60eq2E/jlfwQXtvZEeBUYADs+BwKBWURIY+Gj2eRGjI= +github.com/klauspost/compress v1.17.6/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.3 h1:sxCkb+qR91z4vsqw4vGGZlDgPz3G7gjaLyK3V8y70BU= github.com/klauspost/cpuid/v2 v2.2.3/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY= @@ -251,14 +251,14 @@ github.com/nspcc-dev/neo-go v0.105.1 h1:r0b2yIwLBi+ARBKU94gHL9oTFEB/XMJ0YlS2HN9Q github.com/nspcc-dev/neo-go v0.105.1/go.mod h1:GNh0cRALV/cuj+/xg2ZHDsrFbqcInqG7jjhqsLEnlNc= github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20231127165613-b35f351f0ba0 h1:N+dMIBmteXjJpkH6UZ7HmNftuFxkqszfGLbhsEctnv0= github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20231127165613-b35f351f0ba0/go.mod h1:J/Mk6+nKeKSW4wygkZQFLQ6SkLOSGX5Ga0RuuuktEag= -github.com/nspcc-dev/neofs-api-go/v2 v2.14.0 h1:jhuN8Ldqz7WApvUJRFY0bjRXE1R3iCkboMX5QVZhHVk= -github.com/nspcc-dev/neofs-api-go/v2 v2.14.0/go.mod h1:DRIr0Ic1s+6QgdqmNFNLIqMqd7lNMJfYwkczlm1hDtM= +github.com/nspcc-dev/neofs-api-go/v2 v2.14.1-0.20240213170208-cfca09b5acbe h1:Hoq88+PWS6tNnX4Y0jxE0C8wvxPI8UlVnCs2ZJDEy4Y= +github.com/nspcc-dev/neofs-api-go/v2 v2.14.1-0.20240213170208-cfca09b5acbe/go.mod h1:eaffSBIGhXUIMYvRBGXmlgQRLyyCWlzOft9jGYlqwrw= github.com/nspcc-dev/neofs-contract v0.19.1 h1:U1Uh+MlzfkalO0kRJ2pADZyHrmAOroC6KLFjdWnTNR0= github.com/nspcc-dev/neofs-contract v0.19.1/go.mod h1:ZOGouuwuHpgvYkx/LCGufGncIzEUhYEO18LL4cWEbyw= github.com/nspcc-dev/neofs-crypto v0.4.0 h1:5LlrUAM5O0k1+sH/sktBtrgfWtq1pgpDs09fZo+KYi4= github.com/nspcc-dev/neofs-crypto v0.4.0/go.mod h1:6XJ8kbXgOfevbI2WMruOtI+qUJXNwSGM/E9eClXxPHs= -github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.11.0.20240130073207-03ed6db7e1cd h1:kRIn6i7BTa55ae4cH+UcqRfH//XC20mSC4E9WcWxkmM= -github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.11.0.20240130073207-03ed6db7e1cd/go.mod h1:2PKUuH7kQaAmQ/USBgmiD/k08ssnSvayor6JAFhrC1c= +github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.11.0.20240215154601-e1a82c5e589c h1:t5Ta7JGqhYiAP3I56U91xX8kmq8HQtX3kHusISPyzas= +github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.11.0.20240215154601-e1a82c5e589c/go.mod h1:GED7qcgut188OQYY7lak/cGN8DHtjqQbrXdSt/82QNE= github.com/nspcc-dev/rfc6979 v0.2.0 h1:3e1WNxrN60/6N0DW7+UYisLeZJyfqZTNOjeV/toYvOE= github.com/nspcc-dev/rfc6979 v0.2.0/go.mod h1:exhIh1PdpDC5vQmyEsGvc4YDM/lyQp/452QxGq/UEso= github.com/nspcc-dev/tzhash v1.7.1 h1:6zmexLqdTF/ssbUAh7XJS7RxgKWaw28kdNpE/4UFdEU= @@ -365,8 +365,8 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= -golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= +golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo= +golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -508,11 +508,11 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= -golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y= +golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/term v0.15.0 h1:y/Oo/a/q3IXu26lQgl04j/gjuBDOBlx7X6Om1j2CPW4= -golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0= +golang.org/x/term v0.17.0 h1:mkTF7LCd6WGJNL3K1Ad7kwxNfYAW6a8a8QqtMblp/4U= +golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/pkg/core/object/fmt.go b/pkg/core/object/fmt.go index c533e0c040d..8719ae1bb44 100644 --- a/pkg/core/object/fmt.go +++ b/pkg/core/object/fmt.go @@ -1,6 +1,7 @@ package object import ( + "context" "errors" "fmt" "strconv" @@ -23,6 +24,7 @@ type FormatValidatorOption func(*cfg) type cfg struct { netState netmap.State e LockSource + sv SplitVerifier } // DeleteHandler is an interface of delete queue processor. @@ -49,6 +51,12 @@ type Locker interface { Lock(idCnr cid.ID, locker oid.ID, locked []oid.ID) error } +// todo +type SplitVerifier interface { + // todo + VerifySplit(context.Context, oid.Address, []object.MeasuredObject) error +} + var errNilObject = errors.New("object is nil") var errNilID = errors.New("missing identifier") @@ -103,6 +111,41 @@ func (v *FormatValidator) Validate(obj *object.Object, unprepared bool) error { return err } + _, firstSet := obj.FirstID() + splitID := obj.SplitID() + par := obj.Parent() + + if obj.HasParent() { + if splitID != nil { + // V1 split + if firstSet { + return errors.New("v1 split: first ID object is set") + } + } else { + // V2 split + + if !firstSet { + // first part only + if obj.Parent() == nil { + return errors.New("v2 split: first object part does not have parent header") + } + } else { + // 2nd+ parts + + typ := obj.Type() + + // link object only + if typ == object.TypeLink && (par == nil || par.Signature() == nil) { + return errors.New("v2 split: incorrect link object's parent header") + } + + if _, hasPrevious := obj.PreviousID(); typ != object.TypeLink && !hasPrevious { + return errors.New("v2 split: middle part does not have previous object ID") + } + } + } + } + if err := v.checkAttributes(obj); err != nil { return fmt.Errorf("invalid attributes: %w", err) } @@ -121,9 +164,9 @@ func (v *FormatValidator) Validate(obj *object.Object, unprepared bool) error { } } - if obj = obj.Parent(); obj != nil { + if par != nil { // Parent object already exists. - return v.Validate(obj, false) + return v.Validate(obj, firstSet) } return nil @@ -161,6 +204,7 @@ func (v *FormatValidator) validateSignatureKey(obj *object.Object) error { // is one of: // - object.TypeTombstone; // - object.TypeStorageGroup; +// - object.TypeLink; // - object.TypeLock. type ContentMeta struct { typ object.Type @@ -191,6 +235,24 @@ func (v *FormatValidator) ValidateContent(o *object.Object) (ContentMeta, error) switch o.Type() { case object.TypeRegular: // ignore regular objects, they do not need payload formatting + case object.TypeLink: + if len(o.Payload()) == 0 { + return ContentMeta{}, fmt.Errorf("(%T) empty payload in the link object", v) + } + + var testLink object.Link + + err := o.ReadLink(&testLink) + if err != nil { + return ContentMeta{}, fmt.Errorf("reading link object's payload: %w", err) + } + + AddressOf(o) + + err = v.sv.VerifySplit(context.Background(), AddressOf(o), testLink.Objects()) + if err != nil { + return ContentMeta{}, fmt.Errorf("link object's split chain verification: %w", err) + } case object.TypeTombstone: if len(o.Payload()) == 0 { return ContentMeta{}, fmt.Errorf("(%T) empty payload in tombstone", v) @@ -392,3 +454,10 @@ func WithLockSource(e LockSource) FormatValidatorOption { c.e = e } } + +// todo +func WithSplitVerifier(sv SplitVerifier) FormatValidatorOption { + return func(c *cfg) { + c.sv = sv + } +} diff --git a/pkg/local_object_storage/metabase/VERSION.md b/pkg/local_object_storage/metabase/VERSION.md index 6750900ce38..971033a41e0 100644 --- a/pkg/local_object_storage/metabase/VERSION.md +++ b/pkg/local_object_storage/metabase/VERSION.md @@ -57,6 +57,10 @@ Numbers stand for a single byte value. - Name: container ID + `9` - Key: object ID - Value: marshaled object +- Buckets containing object or LINK type + - Name: container ID + `18` + - Key: object ID + - Value: marshaled object - Buckets mapping objects to the storage ID they are stored in - Name: container ID + `10` - Key: object ID diff --git a/pkg/local_object_storage/metabase/containers.go b/pkg/local_object_storage/metabase/containers.go index b037e8b7a3f..2c95b0cfa19 100644 --- a/pkg/local_object_storage/metabase/containers.go +++ b/pkg/local_object_storage/metabase/containers.go @@ -184,6 +184,12 @@ func (db *DB) DeleteContainer(cID cid.ID) error { return fmt.Errorf("root object's bucket cleanup: %w", err) } + // Link objects + err = tx.DeleteBucket(linkObjectsBucketName(cID, buff)) + if err != nil && !errors.Is(err, bbolt.ErrBucketNotFound) { + return fmt.Errorf("link objects' bucket cleanup: %w", err) + } + // indexes err = tx.DeleteBucket(ownerBucketName(cID, buff)) diff --git a/pkg/local_object_storage/metabase/delete.go b/pkg/local_object_storage/metabase/delete.go index 75a4a55ad90..fed46acab34 100644 --- a/pkg/local_object_storage/metabase/delete.go +++ b/pkg/local_object_storage/metabase/delete.go @@ -336,6 +336,8 @@ func delUniqueIndexes(tx *bbolt.Tx, obj *objectSDK.Object, isParent bool) error bucketName = storageGroupBucketName(cnr, bucketName) case objectSDK.TypeLock: bucketName = bucketNameLockers(cnr, bucketName) + case objectSDK.TypeLink: + bucketName = linkObjectsBucketName(cnr, bucketName) default: return ErrUnknownObjectType } diff --git a/pkg/local_object_storage/metabase/get.go b/pkg/local_object_storage/metabase/get.go index d41d971b9e4..f1a8c251283 100644 --- a/pkg/local_object_storage/metabase/get.go +++ b/pkg/local_object_storage/metabase/get.go @@ -109,6 +109,12 @@ func (db *DB) get(tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw b return obj, obj.Unmarshal(data) } + // if not found then check in link objects index + data = getFromBucket(tx, linkObjectsBucketName(cnr, bucketName), key) + if len(data) != 0 { + return obj, obj.Unmarshal(data) + } + // if not found then check if object is a virtual return getVirtualObject(tx, cnr, key, raw) } @@ -246,6 +252,16 @@ func listContainerObjects(tx *bbolt.Tx, cID cid.ID, unique map[oid.ID]struct{}, return nil } + // link objects + bktInit := tx.Bucket(linkObjectsBucketName(cID, buff)) + err = expandObjectsFromBucket(bktInit, unique, limit) + if err != nil { + return fmt.Errorf("link objects iteration: %w", err) + } + if len(unique) >= limit { + return nil + } + bktSmall := tx.Bucket(smallBucketName(cID, buff)) err = expandObjectsFromBucket(bktSmall, unique, limit) if err != nil { diff --git a/pkg/local_object_storage/metabase/put.go b/pkg/local_object_storage/metabase/put.go index ad8999e31e5..42a7bca56d5 100644 --- a/pkg/local_object_storage/metabase/put.go +++ b/pkg/local_object_storage/metabase/put.go @@ -187,6 +187,8 @@ func putUniqueIndexes( bucketName = storageGroupBucketName(cnr, bucketName) case objectSDK.TypeLock: bucketName = bucketNameLockers(cnr, bucketName) + case objectSDK.TypeLink: + bucketName = linkObjectsBucketName(cnr, bucketName) default: return ErrUnknownObjectType } @@ -498,6 +500,10 @@ func splitInfoFromObject(obj *objectSDK.Object) (*objectSDK.SplitInfo, error) { info := objectSDK.NewSplitInfo() info.SetSplitID(obj.SplitID()) + if firstID, set := obj.FirstID(); set { + info.SetFirstPart(firstID) + } + switch { case isLinkObject(obj): id, ok := obj.ID() @@ -520,14 +526,38 @@ func splitInfoFromObject(obj *objectSDK.Object) (*objectSDK.SplitInfo, error) { return info, nil } -// isLinkObject returns true if object contains parent header and list -// of children. +// isLinkObject returns true if +// V1: object contains parent header and list +// +// of children +// +// V2: object is LINK typed. func isLinkObject(obj *objectSDK.Object) bool { + // V2 split + if obj.Type() == objectSDK.TypeLink { + return true + } + + // V1 split return len(obj.Children()) > 0 && obj.Parent() != nil } -// isLastObject returns true if object contains only parent header without list -// of children. +// isLastObject returns true if an object has parent and +// V1: object has children in the object's header +// V2: there is no split ID, object's type is LINK, and it has first part's ID func isLastObject(obj *objectSDK.Object) bool { - return len(obj.Children()) == 0 && obj.Parent() != nil + par := obj.Parent() + if par == nil { + return false + } + + _, hasFirstObjID := obj.FirstID() + + // V2 split + if obj.SplitID() == nil && (obj.Type() != objectSDK.TypeLink && hasFirstObjID) { + return true + } + + // V1 split + return len(obj.Children()) == 0 } diff --git a/pkg/local_object_storage/metabase/select.go b/pkg/local_object_storage/metabase/select.go index 9be6c291c6d..57b4f5d03e8 100644 --- a/pkg/local_object_storage/metabase/select.go +++ b/pkg/local_object_storage/metabase/select.go @@ -144,6 +144,7 @@ func (db *DB) selectAll(tx *bbolt.Tx, cnr cid.ID, to map[string]int) { selectAllFromBucket(tx, storageGroupBucketName(cnr, bucketName), to, 0) selectAllFromBucket(tx, parentBucketName(cnr, bucketName), to, 0) selectAllFromBucket(tx, bucketNameLockers(cnr, bucketName), to, 0) + selectAllFromBucket(tx, linkObjectsBucketName(cnr, bucketName), to, 0) } // selectAllFromBucket goes through all keys in bucket and adds them in a @@ -198,6 +199,7 @@ func (db *DB) selectFastFilter( selectAllFromBucket(tx, tombstoneBucketName(cnr, bucketName), to, fNum) selectAllFromBucket(tx, storageGroupBucketName(cnr, bucketName), to, fNum) selectAllFromBucket(tx, bucketNameLockers(cnr, bucketName), to, fNum) + selectAllFromBucket(tx, linkObjectsBucketName(cnr, bucketName), to, fNum) default: // user attribute bucketName := attributeBucketName(cnr, f.Header(), bucketName) @@ -214,6 +216,7 @@ var mBucketNaming = map[string][]func(cid.ID, []byte) []byte{ object.TypeTombstone.EncodeToString(): {tombstoneBucketName}, object.TypeStorageGroup.EncodeToString(): {storageGroupBucketName}, object.TypeLock.EncodeToString(): {bucketNameLockers}, + object.TypeLink.EncodeToString(): {linkObjectsBucketName}, } func allBucketNames(cnr cid.ID) (names [][]byte) { diff --git a/pkg/local_object_storage/metabase/select_test.go b/pkg/local_object_storage/metabase/select_test.go index df2b058ca97..b0555c8afca 100644 --- a/pkg/local_object_storage/metabase/select_test.go +++ b/pkg/local_object_storage/metabase/select_test.go @@ -548,6 +548,17 @@ func TestDB_SelectObjectID(t *testing.T) { fs = objectSDK.SearchFilters{} fs.AddObjectIDFilter(objectSDK.MatchStringNotEqual, id) + _, err = metaGet(db, object.AddressOf(regular), false) + require.NoError(t, err) + _, err = metaGet(db, object.AddressOf(parent), false) + require.NoError(t, err) + _, err = metaGet(db, object.AddressOf(sg), false) + require.NoError(t, err) + _, err = metaGet(db, object.AddressOf(ts), false) + require.NoError(t, err) + _, err = metaGet(db, object.AddressOf(lock), false) + require.NoError(t, err) + testSelect(t, db, cnr, fs, object.AddressOf(regular), object.AddressOf(parent), diff --git a/pkg/local_object_storage/metabase/util.go b/pkg/local_object_storage/metabase/util.go index ff77fe00208..4c0822dfaba 100644 --- a/pkg/local_object_storage/metabase/util.go +++ b/pkg/local_object_storage/metabase/util.go @@ -120,6 +120,11 @@ const ( // Key: container ID // Value: dummy value garbageContainersPrefix + + // linkObjectsPrefix is used for prefixing buckets containing objects of LINK type. + // Key: object ID + // Value: marshaled object + linkObjectsPrefix ) const ( @@ -157,6 +162,11 @@ func smallBucketName(cnr cid.ID, key []byte) []byte { return bucketName(cnr, smallPrefix, key) } +// linkObjectsBucketName returns link objects bucket key (`17`). +func linkObjectsBucketName(cnr cid.ID, key []byte) []byte { + return bucketName(cnr, linkObjectsPrefix, key) +} + // attributeBucketName returns _attr_. func attributeBucketName(cnr cid.ID, attributeKey string, key []byte) []byte { key[0] = userAttributePrefix @@ -240,7 +250,7 @@ func firstIrregularObjectType(tx *bbolt.Tx, idCnr cid.ID, objs ...[]byte) object panic("empty object list in firstIrregularObjectType") } - var keys [3][1 + cidSize]byte + var keys [4][1 + cidSize]byte irregularTypeBuckets := [...]struct { typ object.Type @@ -249,6 +259,7 @@ func firstIrregularObjectType(tx *bbolt.Tx, idCnr cid.ID, objs ...[]byte) object {object.TypeTombstone, tombstoneBucketName(idCnr, keys[0][:])}, {object.TypeStorageGroup, storageGroupBucketName(idCnr, keys[1][:])}, {object.TypeLock, bucketNameLockers(idCnr, keys[2][:])}, + {object.TypeLink, linkObjectsBucketName(idCnr, keys[3][:])}, } for i := range objs { diff --git a/pkg/local_object_storage/util/splitinfo.go b/pkg/local_object_storage/util/splitinfo.go index fcb97af1efb..5311cf2950e 100644 --- a/pkg/local_object_storage/util/splitinfo.go +++ b/pkg/local_object_storage/util/splitinfo.go @@ -17,5 +17,9 @@ func MergeSplitInfo(from, to *object.SplitInfo) *object.SplitInfo { to.SetLink(link) } + if init, ok := from.FirstPart(); ok { + to.SetFirstPart(init) + } + return to } diff --git a/pkg/services/object/acl/acl.go b/pkg/services/object/acl/acl.go index 5360b06228b..423f7360ac0 100644 --- a/pkg/services/object/acl/acl.go +++ b/pkg/services/object/acl/acl.go @@ -25,6 +25,7 @@ type CheckerPrm struct { validator *eaclSDK.Validator localStorage *engine.StorageEngine state netmap.State + headerSource eaclV2.HeaderSource } func (c *CheckerPrm) SetEACLSource(v container.EACLSource) *CheckerPrm { @@ -47,6 +48,11 @@ func (c *CheckerPrm) SetNetmapState(v netmap.State) *CheckerPrm { return c } +func (c *CheckerPrm) SetHeaderSource(hs eaclV2.HeaderSource) *CheckerPrm { + c.headerSource = hs + return c +} + // Checker implements v2.ACLChecker interfaces and provides // ACL/eACL validation functionality. type Checker struct { @@ -54,6 +60,7 @@ type Checker struct { validator *eaclSDK.Validator localStorage *engine.StorageEngine state netmap.State + headerSource eaclV2.HeaderSource } // Various EACL check errors. @@ -79,12 +86,14 @@ func NewChecker(prm *CheckerPrm) *Checker { panicOnNil("EACLValidator", prm.validator) panicOnNil("LocalStorageEngine", prm.localStorage) panicOnNil("NetmapState", prm.state) + panicOnNil("HeaderSource", prm.headerSource) return &Checker{ eaclSrc: prm.eaclSrc, validator: prm.validator, localStorage: prm.localStorage, state: prm.state, + headerSource: prm.headerSource, } } @@ -154,6 +163,7 @@ func (c *Checker) CheckEACL(msg any, reqInfo v2.RequestInfo) error { eaclV2.WithLocalObjectStorage(c.localStorage), eaclV2.WithCID(cnr), eaclV2.WithOID(reqInfo.ObjectID()), + eaclV2.WithHeaderSource(c.headerSource), ) if req, ok := msg.(eaclV2.Request); ok { diff --git a/pkg/services/object/acl/eacl/v2/headers.go b/pkg/services/object/acl/eacl/v2/headers.go index 54d3569ac5f..a82f515bd7c 100644 --- a/pkg/services/object/acl/eacl/v2/headers.go +++ b/pkg/services/object/acl/eacl/v2/headers.go @@ -18,7 +18,8 @@ import ( type Option func(*cfg) type cfg struct { - storage ObjectStorage + storage ObjectStorage + headerSource HeaderSource msg xHeaderSource @@ -30,6 +31,12 @@ type ObjectStorage interface { Head(oid.Address) (*object.Object, error) } +// todo +type HeaderSource interface { + // todo + Head(oid.Address) (*object.Object, error) +} + type Request interface { GetMetaHeader() *session.RequestMetaHeader } @@ -129,11 +136,40 @@ func (h *cfg) readObjectHeaders(dst *headerSource) error { dst.objectHeaders = addressHeaders(h.cnr, h.obj) case *objectV2.PutRequest: if v, ok := req.GetBody().GetObjectPart().(*objectV2.PutObjectPartInit); ok { - oV2 := new(objectV2.Object) - oV2.SetObjectID(v.GetObjectID()) - oV2.SetHeader(v.GetHeader()) + if v.GetHeader().GetSplit() != nil { + // V1 split scheme, only the received object's header + // can be checked + oV2 := new(objectV2.Object) + oV2.SetObjectID(v.GetObjectID()) + oV2.SetHeader(v.GetHeader()) + + dst.objectHeaders = headersFromObject(object.NewFromV2(oV2), h.cnr, h.obj) + + break + } + + if first := v.GetHeader().GetSplit().GetFirst(); first != nil { + // that is an object part from the V2 split scheme, check + // the original object header instead + + var firstID oid.ID + err := firstID.ReadFromV2(*first) + if err != nil { + return fmt.Errorf("converting first object ID: %w", err) + } + + var addr oid.Address + addr.SetObject(firstID) + addr.SetContainer(h.cnr) + + firstObject, err := h.headerSource.Head(addr) + if err != nil { + return fmt.Errorf("fetching first object header: %w", err) + } + + dst.objectHeaders = headersFromObject(firstObject.Parent(), h.cnr, h.obj) + } - dst.objectHeaders = headersFromObject(object.NewFromV2(oV2), h.cnr, h.obj) } case *objectV2.SearchRequest: cnrV2 := req.GetBody().GetContainerID() diff --git a/pkg/services/object/acl/eacl/v2/opts.go b/pkg/services/object/acl/eacl/v2/opts.go index 4a653757fec..5e6c286f045 100644 --- a/pkg/services/object/acl/eacl/v2/opts.go +++ b/pkg/services/object/acl/eacl/v2/opts.go @@ -20,6 +20,12 @@ func WithLocalObjectStorage(v *engine.StorageEngine) Option { } } +func WithHeaderSource(hs HeaderSource) Option { + return func(c *cfg) { + c.headerSource = hs + } +} + func WithServiceRequest(v Request) Option { return func(c *cfg) { c.msg = requestXHeaderSource{ diff --git a/pkg/services/object/acl/v2/service.go b/pkg/services/object/acl/v2/service.go index 298b1a8f7ba..066e5d58a9b 100644 --- a/pkg/services/object/acl/v2/service.go +++ b/pkg/services/object/acl/v2/service.go @@ -455,7 +455,9 @@ func (p putStreamBasicChecker) Send(request *objectV2.PutRequest) error { return err } - idV2 := part.GetHeader().GetOwnerID() + header := part.GetHeader() + + idV2 := header.GetOwnerID() if idV2 == nil { return errors.New("missing object owner") } diff --git a/pkg/services/object/get/assemble.go b/pkg/services/object/get/assemble.go index b627846d941..cfc72c93a0c 100644 --- a/pkg/services/object/get/assemble.go +++ b/pkg/services/object/get/assemble.go @@ -44,6 +44,13 @@ func (exec *execCtx) assemble() { } } + if splitInfo.SplitID() == nil { + exec.log.Debug("handling V2 split") + exec.processV2Split(splitInfo) + } + + exec.log.Debug("handling V1 split") + prev, children := exec.initFromChild(childID) if len(children) > 0 { diff --git a/pkg/services/object/get/assemble_v2split.go b/pkg/services/object/get/assemble_v2split.go new file mode 100644 index 00000000000..1372e490400 --- /dev/null +++ b/pkg/services/object/get/assemble_v2split.go @@ -0,0 +1,62 @@ +package getsvc + +import ( + "errors" + + objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "go.uber.org/zap" +) + +func (exec *execCtx) processV2Split(si *objectSDK.SplitInfo) { + if _, set := si.FirstPart(); !set { + exec.log.Debug("no first ID found in V2 split") + exec.err = errors.New("v2 split without first object ID") + + return + } + + linkID, set := si.Link() + if set && exec.processV2Link(linkID) { + return + } +} + +func (exec *execCtx) processV2Link(linkID oid.ID) bool { + linkObj, ok := exec.getChild(linkID, nil, true) + if !ok { + exec.log.Debug("failed to read link object") + return false + } + + exec.collectedObject = linkObj.Parent() + + var link objectSDK.Link + err := linkObj.ReadLink(&link) + if err != nil { + exec.log.Debug("failed to parse link object", zap.Error(err)) + return false + } + + if exec.ctxRange() == nil { + if ok := exec.writeCollectedHeader(); ok { + exec.overtakePayloadDirectly(measuredObjsToIDs(link.Objects()), nil, true) + return true + } + + exec.log.Debug("failed to write parent header") + + return true + } + + return false +} + +func measuredObjsToIDs(mm []objectSDK.MeasuredObject) []oid.ID { + res := make([]oid.ID, 0, len(mm)) + for i := range mm { + res = append(res, mm[i].ObjectID()) + } + + return res +} diff --git a/pkg/services/object/get/exec.go b/pkg/services/object/get/exec.go index 7a1b5fa9206..581404d888a 100644 --- a/pkg/services/object/get/exec.go +++ b/pkg/services/object/get/exec.go @@ -298,6 +298,10 @@ func mergeSplitInfo(dst, src *objectSDK.SplitInfo) { dst.SetLink(link) } + if first, ok := src.FirstPart(); ok { + dst.SetFirstPart(first) + } + if splitID := src.SplitID(); splitID != nil { dst.SetSplitID(splitID) } diff --git a/pkg/services/object/put/service.go b/pkg/services/object/put/service.go index 48067f6524d..f6d14ccef14 100644 --- a/pkg/services/object/put/service.go +++ b/pkg/services/object/put/service.go @@ -136,6 +136,12 @@ func WithNetworkState(v netmap.State) Option { } } +func WithSplitChainVerifier(sv object.SplitVerifier) Option { + return func(c *cfg) { + c.fmtValidatorOpts = append(c.fmtValidatorOpts, object.WithSplitVerifier(sv)) + } +} + func WithClientConstructor(v ClientConstructor) Option { return func(c *cfg) { c.clientConstructor = v diff --git a/pkg/services/object/split/verify.go b/pkg/services/object/split/verify.go new file mode 100644 index 00000000000..0341c4db37d --- /dev/null +++ b/pkg/services/object/split/verify.go @@ -0,0 +1,82 @@ +package split + +import ( + "context" + "fmt" + + getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get" + "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "golang.org/x/sync/errgroup" +) + +// todo +func NewVerifier(get *getsvc.Service) *Verifier { + return &Verifier{ + get: get, + } +} + +// todo +type Verifier struct { + get *getsvc.Service +} + +// todo +func (v *Verifier) VerifySplit(ctx context.Context, firstObject oid.Address, childrenFromLink []object.MeasuredObject) error { + // can be limited, depends on the expected big objects payload length + var wg errgroup.Group + + for _, measuredObject := range childrenFromLink { + wg.Go(func() error { + return v.verifySinglePart(ctx, firstObject, measuredObject) + }) + } + + return wg.Wait() +} + +type headerWriter struct { + h *object.Object +} + +func (w headerWriter) WriteHeader(o *object.Object) error { + w.h = o + return nil +} + +func (v *Verifier) verifySinglePart(ctx context.Context, firstObject oid.Address, objToCheck object.MeasuredObject) error { + id := firstObject.Object() + + var hw headerWriter + + // no custom common prms since a caller is expected to be a container + // participant so no additional headers, access tokens, etc + var prm getsvc.HeadPrm + prm.SetHeaderWriter(&hw) + prm.WithAddress(firstObject) + prm.WithRawFlag(true) + + err := v.get.Head(ctx, prm) + if err != nil { + return fmt.Errorf("reading %s header: %w", firstObject, err) + } + + idRead, has := hw.h.FirstID() + if !has { + return readObjectErr(firstObject, "object that does not have first object's ID") + } + if idRead != id { + return readObjectErr(firstObject, fmt.Sprintf("its first object is unknown: got: %s, want: %s", idRead, id)) + } + + if sizeRead := uint32(hw.h.PayloadSize()); sizeRead != objToCheck.ObjectSize() { + return readObjectErr(firstObject, fmt.Sprintf("its size differs: got: %d, want: %d", sizeRead, objToCheck.ObjectSize())) + } + + return nil +} + +func readObjectErr(a oid.Address, text string) error { + return fmt.Errorf("read %s object: %s", a, text) +}