Skip to content

Commit

Permalink
cli: adjust index file creation in upload-bin
Browse files Browse the repository at this point in the history
In case of incomplete search result it will try to find each missed oid
and process it. In case of duplicates the first found will be in index
file.

Close #3647

Signed-off-by: Ekaterina Pavlova <[email protected]>
  • Loading branch information
AliceInHunterland committed Nov 1, 2024
1 parent 365bbe0 commit 35d1277
Showing 1 changed file with 165 additions and 87 deletions.
252 changes: 165 additions & 87 deletions cli/util/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,115 +354,193 @@ func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun
}

var (
errCh = make(chan error)
buffer = make([]byte, indexFileSize*oidSize)
oidCh = make(chan oid.ID, indexFileSize)
oidFetcherToProcessor = make(chan struct{}, indexFileSize)

buffer = make([]byte, indexFileSize*oidSize)
doneCh = make(chan struct{})
errCh = make(chan error)

Check warning on line 359 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L357-L359

Added lines #L357 - L359 were not covered by tests
emptyOid = make([]byte, oidSize)
)
defer close(oidCh)
for range maxParallelSearches {
go func() {
for id := range oidCh {
var obj *object.Object
errRetr := retry(func() error {
var errGetHead error
obj, errGetHead = p.ObjectHead(context.Background(), containerID, id, signer, client.PrmObjectHead{})
return errGetHead
})
if errRetr != nil {
select {
case errCh <- fmt.Errorf("failed to fetch object %s: %w", id.String(), errRetr):
default:

go func() {
defer close(doneCh)

// Main processing loop for each index file.
for i := existingIndexCount; i < expectedIndexCount; i++ {
// Start block parsing goroutines.
var (
// processedIndices is a mapping from position in buffer to the block index.
processedIndices sync.Map
wg sync.WaitGroup
oidCh = make(chan oid.ID, 2*maxParallelSearches)
)
wg.Add(maxParallelSearches)
for range maxParallelSearches {
go func() {
defer wg.Done()
for id := range oidCh {
var obj *object.Object
errRetr := retry(func() error {
var errGetHead error
obj, errGetHead = p.ObjectHead(context.Background(), containerID, id, signer, client.PrmObjectHead{})
return errGetHead
})
if errRetr != nil {
select {
case errCh <- fmt.Errorf("failed to fetch object %s: %w", id.String(), errRetr):
default:

Check warning on line 389 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L362-L389

Added lines #L362 - L389 were not covered by tests
}
return

Check warning on line 391 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L391

Added line #L391 was not covered by tests
}
blockIndex, err := getBlockIndex(obj, blockAttributeKey)
if err != nil {
select {
case errCh <- fmt.Errorf("failed to get block index from object %s: %w", id.String(), err):
default:

Check warning on line 397 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L393-L397

Added lines #L393 - L397 were not covered by tests
}
return

Check warning on line 399 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L399

Added line #L399 was not covered by tests
}
pos := uint(blockIndex) % indexFileSize
if _, ok := processedIndices.LoadOrStore(pos, blockIndex); !ok {
id.Encode(buffer[pos*oidSize:])
}

Check warning on line 404 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L401-L404

Added lines #L401 - L404 were not covered by tests
}
return
}
blockIndex, err := getBlockIndex(obj, blockAttributeKey)
if err != nil {
select {
case errCh <- fmt.Errorf("failed to get block index from object %s: %w", id.String(), err):
default:
}()
}

// Search for blocks within the index file range.
startIndex := i * indexFileSize
endIndex := startIndex + indexFileSize
objIDs := searchObjects(ctx.Context, p, containerID, account, blockAttributeKey, startIndex, endIndex, maxParallelSearches, errCh)
for id := range objIDs {
oidCh <- id
}
close(oidCh)
wg.Wait()
fmt.Fprintf(ctx.App.Writer, "Index file %d generated, checking for the missing blocks...\n", i)

// Check if there are empty OIDs in the generated index file. This may happen
// if searchObjects has returned not all blocks within the requested range, ref.
// #3645. In this case, retry the search for every missing object.
var count int

for idx := range indexFileSize {
if _, ok := processedIndices.Load(idx); !ok {
count++
objIDs = searchObjects(ctx.Context, p, containerID, account, blockAttributeKey, i*indexFileSize+idx, i*indexFileSize+idx+1, 1, errCh)
// Block object duplicates are allowed, we're OK with the first found result.
id, ok := <-objIDs
for range objIDs {

Check warning on line 431 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L410-L431

Added lines #L410 - L431 were not covered by tests
}
return
if !ok {
select {
case errCh <- fmt.Errorf("block %d is missing from the storage", i*indexFileSize+idx):
default:

Check warning on line 436 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L433-L436

Added lines #L433 - L436 were not covered by tests
}
return

Check warning on line 438 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L438

Added line #L438 was not covered by tests
}
processedIndices.Store(idx, id)
id.Encode(buffer[idx*oidSize:])

Check warning on line 441 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L440-L441

Added lines #L440 - L441 were not covered by tests
}
offset := (uint(blockIndex) % indexFileSize) * oidSize
id.Encode(buffer[offset:])
oidFetcherToProcessor <- struct{}{}
}
}()
}

for i := existingIndexCount; i < expectedIndexCount; i++ {
startIndex := i * indexFileSize
endIndex := startIndex + indexFileSize
go func() {
for j := int(startIndex); j < int(endIndex); j += searchBatchSize {
remaining := int(endIndex) - j
end := j + min(searchBatchSize, remaining)

prm = client.PrmObjectSearch{}
filters = object.NewSearchFilters()
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", j), object.MatchNumGE)
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", end), object.MatchNumLT)
prm.SetFilters(filters)
var objIDs []oid.ID
err := retry(func() error {
var errSearchIndex error
objIDs, errSearchIndex = neofs.ObjectSearch(ctx.Context, p, account.PrivateKey(), containerID.String(), prm)
return errSearchIndex
})
fmt.Fprintf(ctx.App.Writer, "%d missing block(s) processed for index file %d, uploading index file...\n", count, i)

Check warning on line 444 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L444

Added line #L444 was not covered by tests

if err != nil {
// Check if there are empty OIDs in the generated index file. If it happens at
// this stage, then there's a bug in the code.
for k := 0; k < len(buffer); k += oidSize {
if slices.Compare(buffer[k:k+oidSize], emptyOid) == 0 {

Check warning on line 449 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L446-L449

Added lines #L446 - L449 were not covered by tests
select {
case errCh <- fmt.Errorf("failed to search for objects from %d to %d for index file %d: %w", j, end, i, err):
case errCh <- fmt.Errorf("empty OID found in index file %d at position %d (block index %d)", i, k/oidSize, i*indexFileSize+uint(k/oidSize)):

Check warning on line 451 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L451

Added line #L451 was not covered by tests
default:
}
return
}

for _, id := range objIDs {
oidCh <- id
}
}
}()

var completed int
waitLoop:
for {
select {
case err := <-errCh:
return err
case <-oidFetcherToProcessor:
completed++
if completed == int(indexFileSize) {
break waitLoop
}
// Upload index file.
attrs := []object.Attribute{
*object.NewAttribute(attributeKey, strconv.Itoa(int(i))),
*object.NewAttribute("IndexSize", strconv.Itoa(int(indexFileSize))),

Check warning on line 461 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L459-L461

Added lines #L459 - L461 were not covered by tests
}
}
// Check if there are empty OIDs in the generated index file. If it happens at
// this stage, then there's a bug in the code.
for k := 0; k < len(buffer); k += oidSize {
if slices.Compare(buffer[k:k+oidSize], emptyOid) == 0 {
return fmt.Errorf("empty OID found in index file %d at position %d (block index %d)", i, k/oidSize, i+uint(k/oidSize))
err := retry(func() error {
return uploadObj(ctx.Context, p, signer, account.PrivateKey().GetScriptHash(), containerID, buffer, attrs, homomorphicHashingDisabled)
})
if err != nil {
select {
case errCh <- fmt.Errorf("failed to upload index file %d: %w", i, err):
default:

Check warning on line 469 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L463-L469

Added lines #L463 - L469 were not covered by tests
}
return

Check warning on line 471 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L471

Added line #L471 was not covered by tests
}
fmt.Fprintf(ctx.App.Writer, "Uploaded index file %d\n", i)
clear(buffer)

Check warning on line 474 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L473-L474

Added lines #L473 - L474 were not covered by tests
}
attrs := []object.Attribute{
*object.NewAttribute(attributeKey, strconv.Itoa(int(i))),
*object.NewAttribute("IndexSize", strconv.Itoa(int(indexFileSize))),
}
err := retry(func() error {
return uploadObj(ctx.Context, p, signer, account.PrivateKey().GetScriptHash(), containerID, buffer, attrs, homomorphicHashingDisabled)
})
if err != nil {
return fmt.Errorf("failed to upload index file %d: %w", i, err)
}
fmt.Fprintf(ctx.App.Writer, "Uploaded index file %d\n", i)
}()

select {
case err := <-errCh:
return err
case <-doneCh:

Check warning on line 481 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L478-L481

Added lines #L478 - L481 were not covered by tests
}

return nil
}

// searchObjects searches in parallel for objects with attribute GE startIndex and LT
// endIndex. It returns a buffered channel of resulting object IDs and closes it once
// OID search is finished. Errors are sent to errCh in a non-blocking way.
func searchObjects(ctx context.Context, p *pool.Pool, containerID cid.ID, account *wallet.Account, blockAttributeKey string, startIndex, endIndex uint, maxParallelSearches int, errCh chan error) chan oid.ID {
var res = make(chan oid.ID, 2*searchBatchSize)
go func() {
var wg sync.WaitGroup
defer close(res)

for i := int(startIndex); i < int(endIndex); i += searchBatchSize * maxParallelSearches {
for j := range maxParallelSearches {
start := i + j*searchBatchSize
end := start + searchBatchSize

if start >= int(endIndex) {
break

Check warning on line 502 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L490-L502

Added lines #L490 - L502 were not covered by tests
}
if end > int(endIndex) {
end = int(endIndex)
}

Check warning on line 506 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L504-L506

Added lines #L504 - L506 were not covered by tests

wg.Add(1)
go func(start, end int) {
defer wg.Done()

prm := client.PrmObjectSearch{}
filters := object.NewSearchFilters()
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", start), object.MatchNumGE)
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", end), object.MatchNumLT)
prm.SetFilters(filters)

var objIDs []oid.ID
err := retry(func() error {
var errBlockSearch error
objIDs, errBlockSearch = neofs.ObjectSearch(ctx, p, account.PrivateKey(), containerID.String(), prm)
return errBlockSearch
})
if err != nil {
select {
case errCh <- fmt.Errorf("failed to search for block(s) from %d to %d: %w", start, end, err):
default:

Check warning on line 527 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L508-L527

Added lines #L508 - L527 were not covered by tests
}
return

Check warning on line 529 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L529

Added line #L529 was not covered by tests
}

for _, id := range objIDs {
res <- id
}

Check warning on line 534 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L532-L534

Added lines #L532 - L534 were not covered by tests
}(start, end)
}
wg.Wait()

Check warning on line 537 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L537

Added line #L537 was not covered by tests
}
}()

return res

Check warning on line 541 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L541

Added line #L541 was not covered by tests
}

// uploadObj uploads object to the container using provided settings.
func uploadObj(ctx context.Context, p *pool.Pool, signer user.Signer, owner util.Uint160, containerID cid.ID, objData []byte, attrs []object.Attribute, homomorphicHashingDisabled bool) error {

Check warning on line 545 in cli/util/uploader.go

View check run for this annotation

Codecov / codecov/patch

cli/util/uploader.go#L545

Added line #L545 was not covered by tests
var (
Expand Down

0 comments on commit 35d1277

Please sign in to comment.