cli: adjust index file creation in upload-bin
In case of an incomplete search result, try to find each missed OID
and process it. In case of duplicates, the first one found is put into
the index file.

Close #3647

Signed-off-by: Ekaterina Pavlova <[email protected]>
AliceInHunterland committed Nov 1, 2024
1 parent b57a22c commit 861df72
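
The retry-and-pick-first behaviour described in the commit message comes down to a small channel idiom: take the first OID a search yields, drain the rest, and treat a channel that closes without yielding anything as a missing block. A minimal, hypothetical sketch of that idiom (firstFound and the string IDs are illustrative, not the commit's API):

```go
package main

import "fmt"

// firstFound returns the first ID received from ids and drains any
// duplicates so the producing goroutine can finish; ok is false when
// the channel is closed without yielding anything (a missing block).
func firstFound(ids <-chan string) (id string, ok bool) {
	id, ok = <-ids
	for range ids {
		// Duplicates are allowed; the first found result wins.
	}
	return id, ok
}

func main() {
	ids := make(chan string, 2)
	ids <- "oid-A" // first copy of the block object
	ids <- "oid-B" // duplicate of the same block
	close(ids)

	if id, ok := firstFound(ids); ok {
		fmt.Println("using", id)
	} else {
		fmt.Println("block is missing from the storage")
	}
}
```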
Showing 1 changed file with 174 additions and 94 deletions.

cli/util/uploader.go
@@ -12,7 +12,6 @@ import (
 	"github.com/nspcc-dev/neo-go/cli/cmdargs"
 	"github.com/nspcc-dev/neo-go/cli/options"
 	"github.com/nspcc-dev/neo-go/pkg/core/block"
-	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
 	"github.com/nspcc-dev/neo-go/pkg/io"
 	"github.com/nspcc-dev/neo-go/pkg/rpcclient"
 	"github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs"
@@ -136,7 +135,7 @@ func uploadBin(ctx *cli.Context) error {
 	}
 	fmt.Fprintln(ctx.App.Writer, "Chain block height:", currentBlockHeight)
 
-	oldestMissingBlockIndex, errBlock := fetchLatestMissingBlockIndex(ctx.Context, p, containerID, acc.PrivateKey(), attr, int(currentBlockHeight), maxParallelSearches)
+	oldestMissingBlockIndex, errBlock := fetchLatestMissingBlockIndex(ctx.Context, p, containerID, acc, attr, int(currentBlockHeight), maxParallelSearches)
 	if errBlock != nil {
 		return cli.Exit(fmt.Errorf("failed to fetch the oldest missing block index from container: %w", err), 1)
 	}
@@ -149,7 +148,7 @@ func uploadBin(ctx *cli.Context) error {
 		}
 	}
 
-	err = uploadIndexFiles(ctx, p, containerID, *acc, signer, uint(currentBlockHeight), attr, homomorphicHashingDisabled, maxParallelSearches)
+	err = uploadIndexFiles(ctx, p, containerID, acc, signer, uint(currentBlockHeight), attr, homomorphicHashingDisabled, maxParallelSearches)
 	if err != nil {
 		return cli.Exit(fmt.Errorf("failed to upload index files: %w", err), 1)
 	}
@@ -183,11 +182,12 @@ type searchResult struct {
 // fetchLatestMissingBlockIndex searches the container for the latest full batch of blocks
 // starting from the currentHeight and going backwards. It returns the index of first block
 // in the next batch.
-func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID cid.ID, priv *keys.PrivateKey, attributeKey string, currentHeight int, maxParallelSearches int) (int, error) {
+func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID cid.ID, acc *wallet.Account, attributeKey string, currentHeight int, maxParallelSearches int) (int, error) {
 	var (
 		wg              sync.WaitGroup
 		numBatches      = currentHeight/searchBatchSize + 1
 		emptyBatchFound bool
+		errCh           = make(chan error)
 	)
 
 	for batch := numBatches; batch > -maxParallelSearches; batch -= maxParallelSearches {
@@ -206,20 +206,18 @@ func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID
 			wg.Add(1)
 			go func(i, startIndex, endIndex int) {
 				defer wg.Done()
-
-				prm := client.PrmObjectSearch{}
-				filters := object.NewSearchFilters()
-				filters.AddFilter(attributeKey, fmt.Sprintf("%d", startIndex), object.MatchNumGE)
-				filters.AddFilter(attributeKey, fmt.Sprintf("%d", endIndex), object.MatchNumLT)
-				prm.SetFilters(filters)
+				objCh := searchObjects(ctx, p, containerID, acc, attributeKey, uint(startIndex), uint(endIndex), 1, errCh)
 				var (
-					objectIDs []oid.ID
+					objectIDs = make([]oid.ID, 0, searchBatchSize)
 					err       error
 				)
-				err = retry(func() error {
-					objectIDs, err = neofs.ObjectSearch(ctx, p, priv, containerID.String(), prm)
-					return err
-				})
+				for id := range objCh {
+					objectIDs = append(objectIDs, id)
+				}
+				select {
+				case err = <-errCh:
+				default:
+				}
 				results[i] = searchResult{startIndex: startIndex, endIndex: endIndex, numOIDs: len(objectIDs), err: err}
 			}(i, startIndex, endIndex)
 		}
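
The hunk above routes all search failures through a shared errCh: workers send errors with a non-blocking select, and the collector polls the channel with a non-blocking receive. A standalone sketch of that idiom; the one-slot buffer is an assumption of this sketch so it runs without a concurrent receiver (the committed code uses an unbuffered channel):

```go
package main

import (
	"errors"
	"fmt"
)

// reportOnce delivers err only if errCh can accept it right now;
// otherwise the error is dropped so the worker never blocks.
func reportOnce(errCh chan error, err error) {
	select {
	case errCh <- err:
	default:
	}
}

// pollErr grabs a pending error, if any, without waiting.
func pollErr(errCh chan error) error {
	select {
	case err := <-errCh:
		return err
	default:
		return nil
	}
}

func main() {
	errCh := make(chan error, 1) // buffered here so the example is self-contained
	fmt.Println(pollErr(errCh))  // <nil>, nothing reported yet

	reportOnce(errCh, errors.New("object search failed"))
	reportOnce(errCh, errors.New("this one is dropped"))

	fmt.Println(pollErr(errCh)) // object search failed
}
```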
@@ -326,7 +324,7 @@ func uploadBlocks(ctx *cli.Context, p *pool.Pool, rpc *rpcclient.Client, signer
 }
 
 // uploadIndexFiles uploads missing index files to the container.
-func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, account wallet.Account, signer user.Signer, currentHeight uint, blockAttributeKey string, homomorphicHashingDisabled bool, maxParallelSearches int) error {
+func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, account *wallet.Account, signer user.Signer, currentHeight uint, blockAttributeKey string, homomorphicHashingDisabled bool, maxParallelSearches int) error {
 	attributeKey := ctx.String("index-attribute")
 	indexFileSize := ctx.Uint("index-file-size")
 	fmt.Fprintln(ctx.App.Writer, "Uploading index files...")
@@ -354,114 +352,196 @@ func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun
 	}
 
 	var (
-		errCh                 = make(chan error)
-		buffer                = make([]byte, indexFileSize*oidSize)
-		oidCh                 = make(chan oid.ID, indexFileSize)
-		oidFetcherToProcessor = make(chan struct{}, indexFileSize)
+		buffer   = make([]byte, indexFileSize*oidSize)
+		doneCh   = make(chan struct{})
+		errCh    = make(chan error)
 		emptyOid = make([]byte, oidSize)
 	)
-	defer close(oidCh)
-	for range maxParallelSearches {
-		go func() {
-			for id := range oidCh {
-				var obj *object.Object
-				errRetr := retry(func() error {
-					var errGetHead error
-					obj, errGetHead = p.ObjectHead(context.Background(), containerID, id, signer, client.PrmObjectHead{})
-					return errGetHead
-				})
-				if errRetr != nil {
-					select {
-					case errCh <- fmt.Errorf("failed to fetch object %s: %w", id.String(), errRetr):
-					default:
-					}
-					return
-				}
-				blockIndex, err := getBlockIndex(obj, blockAttributeKey)
-				if err != nil {
-					select {
-					case errCh <- fmt.Errorf("failed to get block index from object %s: %w", id.String(), err):
-					default:
-					}
-					return
-				}
-				offset := (uint(blockIndex) % indexFileSize) * oidSize
-				id.Encode(buffer[offset:])
-				oidFetcherToProcessor <- struct{}{}
-			}
-		}()
-	}
-
-	for i := existingIndexCount; i < expectedIndexCount; i++ {
-		startIndex := i * indexFileSize
-		endIndex := startIndex + indexFileSize
-		go func() {
-			for j := int(startIndex); j < int(endIndex); j += searchBatchSize {
-				remaining := int(endIndex) - j
-				end := j + min(searchBatchSize, remaining)
-
-				prm := client.PrmObjectSearch{}
-				filters := object.NewSearchFilters()
-				filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", j), object.MatchNumGE)
-				filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", end), object.MatchNumLT)
-				prm.SetFilters(filters)
-
-				var objIDs []oid.ID
-				err := retry(func() error {
-					var errSearchIndex error
-					objIDs, errSearchIndex = neofs.ObjectSearch(ctx.Context, p, account.PrivateKey(), containerID.String(), prm)
-					return errSearchIndex
-				})
-
-				if err != nil {
-					select {
-					case errCh <- fmt.Errorf("failed to search for objects from %d to %d for index file %d: %w", j, end, i, err):
-					default:
-					}
-					return
-				}
-
-				for _, id := range objIDs {
-					oidCh <- id
-				}
-			}
-		}()
-
-		var completed int
-	waitLoop:
-		for {
-			select {
-			case err := <-errCh:
-				return err
-			case <-oidFetcherToProcessor:
-				completed++
-				if completed == int(indexFileSize) {
-					break waitLoop
-				}
-			}
-		}
-		// Check if there are any empty oids in the created index file.
-		// This could happen if object payload is empty ->
-		// attribute is not set correctly -> empty oid is added to the index file.
-		for k := 0; k < len(buffer); k += oidSize {
-			if slices.Compare(buffer[k:k+oidSize], emptyOid) == 0 {
-				return fmt.Errorf("empty oid found in index file %d at position %d (block index %d)", i, k/oidSize, i+uint(k/oidSize))
-			}
-		}
-		attrs := []object.Attribute{
-			*object.NewAttribute(attributeKey, strconv.Itoa(int(i))),
-			*object.NewAttribute("IndexSize", strconv.Itoa(int(indexFileSize))),
-		}
-		err := retry(func() error {
-			return uploadObj(ctx.Context, p, signer, account.PrivateKey().GetScriptHash(), containerID, buffer, attrs, homomorphicHashingDisabled)
-		})
-		if err != nil {
-			return fmt.Errorf("failed to upload index file %d: %w", i, err)
-		}
-		fmt.Fprintf(ctx.App.Writer, "Uploaded index file %d\n", i)
-	}
-
-	return nil
+	go func() {
+		defer close(doneCh)
+
+		// Main processing loop for each index file.
+		for i := existingIndexCount; i < expectedIndexCount; i++ {
+			// Start block parsing goroutines.
+			var (
+				// processedIndices is a mapping from position in buffer to the block index.
+				processedIndices sync.Map
+				wg               sync.WaitGroup
+				oidCh            = make(chan oid.ID, 2*maxParallelSearches)
+			)
+			wg.Add(maxParallelSearches)
+			for range maxParallelSearches {
+				go func() {
+					defer wg.Done()
+					for id := range oidCh {
+						var obj *object.Object
+						errRetr := retry(func() error {
+							var errGetHead error
+							obj, errGetHead = p.ObjectHead(context.Background(), containerID, id, signer, client.PrmObjectHead{})
+							return errGetHead
+						})
+						if errRetr != nil {
+							select {
+							case errCh <- fmt.Errorf("failed to fetch object %s: %w", id.String(), errRetr):
+							default:
+							}
+							return
+						}
+						blockIndex, err := getBlockIndex(obj, blockAttributeKey)
+						if err != nil {
+							select {
+							case errCh <- fmt.Errorf("failed to get block index from object %s: %w", id.String(), err):
+							default:
+							}
+							return
+						}
+						pos := uint(blockIndex) % indexFileSize
+						if _, ok := processedIndices.LoadOrStore(pos, blockIndex); !ok {
+							id.Encode(buffer[pos*oidSize:])
+						}
+					}
+				}()
+			}
+
+			// Search for blocks within the index file range.
+			startIndex := i * indexFileSize
+			endIndex := startIndex + indexFileSize
+			objIDs := searchObjects(ctx.Context, p, containerID, account, blockAttributeKey, startIndex, endIndex, maxParallelSearches, errCh)
+			for id := range objIDs {
+				oidCh <- id
+			}
+			close(oidCh)
+			wg.Wait()
+			fmt.Fprintf(ctx.App.Writer, "Index file %d generated, checking for the missing blocks...\n", i)
+
+			// Check if there are empty OIDs in the generated index file. This may happen
+			// if searchObjects has returned not all blocks within the requested range, ref.
+			// #3645. In this case, retry the search for every missing object.
+			var (
+				count int
+			)
+			for idx := range indexFileSize {
+				if _, ok := processedIndices.Load(idx); !ok {
+					count++
+					objIDs = searchObjects(ctx.Context, p, containerID, account, blockAttributeKey, i*indexFileSize+idx, i*indexFileSize+idx+1, 1, errCh)
+					// Block object duplicates are allowed, we're OK with the first found result.
+					id, ok := <-objIDs
+					for range objIDs {
+					}
+					if !ok {
+						select {
+						case errCh <- fmt.Errorf("block %d is missing from the storage", i*indexFileSize+idx):
+						default:
+						}
+						return
+					}
+					processedIndices.Store(idx, id)
+					id.Encode(buffer[idx*oidSize:])
+				}
+			}
+			fmt.Fprintf(ctx.App.Writer, "%d missing block(s) processed for index file %d, uploading index file...\n", count, i)
+
+			// Check if there are empty OIDs in the generated index file. If it happens at
+			// this stage, then there's a bug in the code.
+			for k := 0; k < len(buffer); k += oidSize {
+				if slices.Compare(buffer[k:k+oidSize], emptyOid) == 0 {
+					select {
+					case errCh <- fmt.Errorf("empty OID found in index file %d at position %d (block index %d)", i, k/oidSize, i*indexFileSize+uint(k/oidSize)):
+					default:
+					}
+					return
+				}
+			}
+
+			// Upload index file.
+			attrs := []object.Attribute{
+				*object.NewAttribute(attributeKey, strconv.Itoa(int(i))),
+				*object.NewAttribute("IndexSize", strconv.Itoa(int(indexFileSize))),
+			}
+			err := retry(func() error {
+				return uploadObj(ctx.Context, p, signer, account.PrivateKey().GetScriptHash(), containerID, buffer, attrs, homomorphicHashingDisabled)
+			})
+			if err != nil {
+				select {
+				case errCh <- fmt.Errorf("failed to upload index file %d: %w", i, err):
+				default:
+				}
+				return
+			}
+			fmt.Fprintf(ctx.App.Writer, "Uploaded index file %d\n", i)
+			clear(buffer)
+		}
+	}()
+
+	select {
+	case err := <-errCh:
+		return err
+	case <-doneCh:
+	}
+
+	return nil
+}
+
+// searchObjects searches in parallel for objects with attribute GE startIndex and LT
+// endIndex. It returns a buffered channel of resulting object IDs and closes it once
+// OID search is finished. Errors are sent to errCh in a non-blocking way.
+func searchObjects(ctx context.Context, p *pool.Pool, containerID cid.ID, account *wallet.Account, blockAttributeKey string, startIndex, endIndex uint, maxParallelSearches int, errCh chan error) chan oid.ID {
+	var (
+		res = make(chan oid.ID, 2*searchBatchSize)
+		wg  sync.WaitGroup
+	)
+
+	for j := int(startIndex); j < int(endIndex); j += searchBatchSize * maxParallelSearches {
+		for i := range maxParallelSearches {
+			start := j + i*searchBatchSize
+			end := start + searchBatchSize
+
+			if start >= int(endIndex) {
+				break
+			}
+			if end > int(endIndex) {
+				end = int(endIndex)
+			}
+
+			wg.Add(1)
+			go func(start, end int) {
+				defer wg.Done()
+
+				prm := client.PrmObjectSearch{}
+				filters := object.NewSearchFilters()
+				filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", start), object.MatchNumGE)
+				filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", end), object.MatchNumLT)
+				prm.SetFilters(filters)
+
+				var objIDs []oid.ID
+				err := retry(func() error {
+					var errBlockSearch error
+					objIDs, errBlockSearch = neofs.ObjectSearch(ctx, p, account.PrivateKey(), containerID.String(), prm)
+					return errBlockSearch
+				})
+				if err != nil {
+					select {
+					case errCh <- fmt.Errorf("failed to search for block(s) from %d to %d: %w", start, end, err):
+					default:
+					}
+					return
+				}
+
+				for _, id := range objIDs {
+					res <- id
+				}
+			}(start, end)
+		}
+		wg.Wait()
+	}
+
+	go func() {
+		wg.Wait()
+		close(res)
+	}()
+
+	return res
 }

// uploadObj uploads object to the container using provided settings.
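
In the rewritten uploadIndexFiles above, all processing happens in one goroutine that closes doneCh when it finishes, while the caller blocks on either errCh or doneCh. A self-contained sketch of that completion idiom follows; the one-slot error buffer and the final drain are assumptions of this sketch rather than the committed code (which uses an unbuffered errCh):

```go
package main

import (
	"errors"
	"fmt"
)

// run launches the work in a goroutine and waits for an error or for
// doneCh to be closed, preferring a buffered error over success.
func run(fail bool) error {
	var (
		doneCh = make(chan struct{})
		errCh  = make(chan error, 1)
	)
	go func() {
		defer close(doneCh)
		if fail {
			errCh <- errors.New("index file generation failed")
			return
		}
		// ... generate and upload index files here ...
	}()

	select {
	case err := <-errCh:
		return err
	case <-doneCh:
		// Catch an error that arrived just before doneCh was closed.
		select {
		case err := <-errCh:
			return err
		default:
		}
	}
	return nil
}

func main() {
	fmt.Println(run(false)) // <nil>
	fmt.Println(run(true))  // index file generation failed
}
```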
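The new searchObjects helper hands results back on a channel that is closed once every parallel search has finished, so callers simply range over it. A toy stand-in for that consumption pattern (searchRange is hypothetical, not the commit's function):

```go
package main

import "fmt"

// searchRange is a hypothetical stand-in for a channel-returning search:
// it streams IDs for [start, end) into a buffered channel and closes it
// when the producing goroutine is done.
func searchRange(start, end int) <-chan int {
	res := make(chan int, 8)
	go func() {
		defer close(res)
		for i := start; i < end; i++ {
			res <- i
		}
	}()
	return res
}

func main() {
	var ids []int
	for id := range searchRange(0, 5) { // termination is signalled by close
		ids = append(ids, id)
	}
	fmt.Println(ids) // [0 1 2 3 4]
}
```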
