diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go index aaade013fd..8b17fd1372 100644 --- a/pkg/local_object_storage/engine/evacuate.go +++ b/pkg/local_object_storage/engine/evacuate.go @@ -152,9 +152,9 @@ mainLoop: if _, ok := shardMap[shards[j].ID().String()]; ok { continue } - putDone, exists := e.putToShard(shards[j].hashedShard, j, shards[j].pool, addr, PutPrm{obj: getRes.Object()}) - if putDone || exists { - if putDone { + exists, err := e.putToShard(shards[j].hashedShard, j, shards[j].pool, addr, PutPrm{obj: getRes.Object()}) + if err == nil { + if !exists { e.log.Debug("object is moved to another shard", zap.String("from", sidList[n]), zap.Stringer("to", shards[j].ID()), diff --git a/pkg/local_object_storage/engine/put.go b/pkg/local_object_storage/engine/put.go index 025822ce81..4666298eea 100644 --- a/pkg/local_object_storage/engine/put.go +++ b/pkg/local_object_storage/engine/put.go @@ -2,6 +2,8 @@ package engine import ( "errors" + "fmt" + "time" "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" @@ -10,6 +12,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/util" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "github.com/panjf2000/ants/v2" "go.uber.org/zap" ) @@ -76,8 +79,14 @@ func (e *StorageEngine) put(prm PutPrm) (PutRes, error) { } finished := false + var bestShard hashedShard + var bestPool util.WorkerPool e.iterateOverSortedShards(addr, func(ind int, sh hashedShard) (stop bool) { + if ind == 0 { + bestShard = sh + } + e.mtx.RLock() pool, ok := e.shardPools[sh.ID().String()] e.mtx.RUnlock() @@ -86,28 +95,61 @@ func (e *StorageEngine) put(prm PutPrm) (PutRes, error) { return false } - putDone, exists := e.putToShard(sh, ind, pool, addr, prm) - finished = putDone || exists + exists, err := e.putToShard(sh, ind, pool, addr, prm) + finished = err == nil || exists return finished }) if !finished { - err = errPutShard + err = e.putToShardWithDeadLine(bestShard, 0, bestPool, addr, prm) + if err != nil { + e.log.Warn("last stand to put object to the best shard", + zap.Stringer("addr", addr), + zap.Stringer("shard", bestShard.ID()), + zap.Error(err)) + + return PutRes{}, errPutShard + } } - return PutRes{}, err + return PutRes{}, nil +} + +func (e *StorageEngine) putToShardWithDeadLine(sh hashedShard, ind int, pool util.WorkerPool, addr oid.Address, prm PutPrm) error { + const deadline = 30 * time.Second + timer := time.NewTimer(deadline) + defer timer.Stop() + + const putCooldown = 100 * time.Millisecond + ticker := time.NewTicker(putCooldown) + defer ticker.Stop() + + for { + select { + case <-timer.C: + return fmt.Errorf("could not put object within %s", deadline) + case <-ticker.C: + _, err := e.putToShard(sh, ind, pool, addr, prm) + if errors.Is(err, ants.ErrPoolOverload) { + ticker.Reset(putCooldown) + continue + } + + return err + } + } } // putToShard puts object to sh. -// First return value is true iff put has been successfully done. -// Second return value is true iff object already exists. -func (e *StorageEngine) putToShard(sh hashedShard, ind int, pool util.WorkerPool, addr oid.Address, prm PutPrm) (bool, bool) { - var putSuccess, alreadyExists bool +// Return value is true iff object already exists. +func (e *StorageEngine) putToShard(sh hashedShard, ind int, pool util.WorkerPool, addr oid.Address, prm PutPrm) (bool, error) { + var alreadyExists bool + var errGlobal error id := sh.ID() exitCh := make(chan struct{}) - if err := pool.Submit(func() { + err := pool.Submit(func() { defer close(exitCh) var existPrm shard.ExistsPrm @@ -124,8 +166,11 @@ func (e *StorageEngine) putToShard(sh hashedShard, ind int, pool util.WorkerPool // object is already found but // expired => do nothing with it alreadyExists = true + return } + errGlobal = err + return // this is not ErrAlreadyRemoved error so we can go to the next shard } @@ -159,6 +204,8 @@ func (e *StorageEngine) putToShard(sh hashedShard, ind int, pool util.WorkerPool _, err = sh.Put(putPrm) if err != nil { + errGlobal = err + if errors.Is(err, shard.ErrReadOnlyMode) || errors.Is(err, blobstor.ErrNoPlaceFound) || errors.Is(err, common.ErrReadOnly) || errors.Is(err, common.ErrNoSpace) { e.log.Warn("could not put object to shard", @@ -167,19 +214,20 @@ func (e *StorageEngine) putToShard(sh hashedShard, ind int, pool util.WorkerPool return } - e.reportShardError(sh, "could not put object to shard", err) + e.reportShardError(sh, "could not put object to shard", errGlobal) return } - - putSuccess = true - }); err != nil { + }) + if err != nil { e.log.Warn("object put: pool task submitting", zap.Stringer("shard", id), zap.Error(err)) close(exitCh) + + return false, err } <-exitCh - return putSuccess, alreadyExists + return alreadyExists, errGlobal } // Put writes provided object to local storage.