From 35d12779d66b1cd557c2b36fc4a6a1aa4b2be28a Mon Sep 17 00:00:00 2001 From: Ekaterina Pavlova Date: Fri, 1 Nov 2024 19:50:05 +0300 Subject: [PATCH] cli: adjust index file creation in upload-bin In case of incomplete search result it will try to find each missed oid and process it. In case of duplicates the first found will be in index file. Close #3647 Signed-off-by: Ekaterina Pavlova --- cli/util/uploader.go | 252 ++++++++++++++++++++++++++++--------------- 1 file changed, 165 insertions(+), 87 deletions(-) diff --git a/cli/util/uploader.go b/cli/util/uploader.go index 520b04c7b3..8ef315ab20 100644 --- a/cli/util/uploader.go +++ b/cli/util/uploader.go @@ -354,115 +354,193 @@ func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun } var ( - errCh = make(chan error) - buffer = make([]byte, indexFileSize*oidSize) - oidCh = make(chan oid.ID, indexFileSize) - oidFetcherToProcessor = make(chan struct{}, indexFileSize) - + buffer = make([]byte, indexFileSize*oidSize) + doneCh = make(chan struct{}) + errCh = make(chan error) emptyOid = make([]byte, oidSize) ) - defer close(oidCh) - for range maxParallelSearches { - go func() { - for id := range oidCh { - var obj *object.Object - errRetr := retry(func() error { - var errGetHead error - obj, errGetHead = p.ObjectHead(context.Background(), containerID, id, signer, client.PrmObjectHead{}) - return errGetHead - }) - if errRetr != nil { - select { - case errCh <- fmt.Errorf("failed to fetch object %s: %w", id.String(), errRetr): - default: + + go func() { + defer close(doneCh) + + // Main processing loop for each index file. + for i := existingIndexCount; i < expectedIndexCount; i++ { + // Start block parsing goroutines. + var ( + // processedIndices is a mapping from position in buffer to the block index. + processedIndices sync.Map + wg sync.WaitGroup + oidCh = make(chan oid.ID, 2*maxParallelSearches) + ) + wg.Add(maxParallelSearches) + for range maxParallelSearches { + go func() { + defer wg.Done() + for id := range oidCh { + var obj *object.Object + errRetr := retry(func() error { + var errGetHead error + obj, errGetHead = p.ObjectHead(context.Background(), containerID, id, signer, client.PrmObjectHead{}) + return errGetHead + }) + if errRetr != nil { + select { + case errCh <- fmt.Errorf("failed to fetch object %s: %w", id.String(), errRetr): + default: + } + return + } + blockIndex, err := getBlockIndex(obj, blockAttributeKey) + if err != nil { + select { + case errCh <- fmt.Errorf("failed to get block index from object %s: %w", id.String(), err): + default: + } + return + } + pos := uint(blockIndex) % indexFileSize + if _, ok := processedIndices.LoadOrStore(pos, blockIndex); !ok { + id.Encode(buffer[pos*oidSize:]) + } } - return - } - blockIndex, err := getBlockIndex(obj, blockAttributeKey) - if err != nil { - select { - case errCh <- fmt.Errorf("failed to get block index from object %s: %w", id.String(), err): - default: + }() + } + + // Search for blocks within the index file range. + startIndex := i * indexFileSize + endIndex := startIndex + indexFileSize + objIDs := searchObjects(ctx.Context, p, containerID, account, blockAttributeKey, startIndex, endIndex, maxParallelSearches, errCh) + for id := range objIDs { + oidCh <- id + } + close(oidCh) + wg.Wait() + fmt.Fprintf(ctx.App.Writer, "Index file %d generated, checking for the missing blocks...\n", i) + + // Check if there are empty OIDs in the generated index file. This may happen + // if searchObjects has returned not all blocks within the requested range, ref. + // #3645. In this case, retry the search for every missing object. + var count int + + for idx := range indexFileSize { + if _, ok := processedIndices.Load(idx); !ok { + count++ + objIDs = searchObjects(ctx.Context, p, containerID, account, blockAttributeKey, i*indexFileSize+idx, i*indexFileSize+idx+1, 1, errCh) + // Block object duplicates are allowed, we're OK with the first found result. + id, ok := <-objIDs + for range objIDs { } - return + if !ok { + select { + case errCh <- fmt.Errorf("block %d is missing from the storage", i*indexFileSize+idx): + default: + } + return + } + processedIndices.Store(idx, id) + id.Encode(buffer[idx*oidSize:]) } - offset := (uint(blockIndex) % indexFileSize) * oidSize - id.Encode(buffer[offset:]) - oidFetcherToProcessor <- struct{}{} } - }() - } - - for i := existingIndexCount; i < expectedIndexCount; i++ { - startIndex := i * indexFileSize - endIndex := startIndex + indexFileSize - go func() { - for j := int(startIndex); j < int(endIndex); j += searchBatchSize { - remaining := int(endIndex) - j - end := j + min(searchBatchSize, remaining) - - prm = client.PrmObjectSearch{} - filters = object.NewSearchFilters() - filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", j), object.MatchNumGE) - filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", end), object.MatchNumLT) - prm.SetFilters(filters) - var objIDs []oid.ID - err := retry(func() error { - var errSearchIndex error - objIDs, errSearchIndex = neofs.ObjectSearch(ctx.Context, p, account.PrivateKey(), containerID.String(), prm) - return errSearchIndex - }) + fmt.Fprintf(ctx.App.Writer, "%d missing block(s) processed for index file %d, uploading index file...\n", count, i) - if err != nil { + // Check if there are empty OIDs in the generated index file. If it happens at + // this stage, then there's a bug in the code. + for k := 0; k < len(buffer); k += oidSize { + if slices.Compare(buffer[k:k+oidSize], emptyOid) == 0 { select { - case errCh <- fmt.Errorf("failed to search for objects from %d to %d for index file %d: %w", j, end, i, err): + case errCh <- fmt.Errorf("empty OID found in index file %d at position %d (block index %d)", i, k/oidSize, i*indexFileSize+uint(k/oidSize)): default: } return } - - for _, id := range objIDs { - oidCh <- id - } } - }() - var completed int - waitLoop: - for { - select { - case err := <-errCh: - return err - case <-oidFetcherToProcessor: - completed++ - if completed == int(indexFileSize) { - break waitLoop - } + // Upload index file. + attrs := []object.Attribute{ + *object.NewAttribute(attributeKey, strconv.Itoa(int(i))), + *object.NewAttribute("IndexSize", strconv.Itoa(int(indexFileSize))), } - } - // Check if there are empty OIDs in the generated index file. If it happens at - // this stage, then there's a bug in the code. - for k := 0; k < len(buffer); k += oidSize { - if slices.Compare(buffer[k:k+oidSize], emptyOid) == 0 { - return fmt.Errorf("empty OID found in index file %d at position %d (block index %d)", i, k/oidSize, i+uint(k/oidSize)) + err := retry(func() error { + return uploadObj(ctx.Context, p, signer, account.PrivateKey().GetScriptHash(), containerID, buffer, attrs, homomorphicHashingDisabled) + }) + if err != nil { + select { + case errCh <- fmt.Errorf("failed to upload index file %d: %w", i, err): + default: + } + return } + fmt.Fprintf(ctx.App.Writer, "Uploaded index file %d\n", i) + clear(buffer) } - attrs := []object.Attribute{ - *object.NewAttribute(attributeKey, strconv.Itoa(int(i))), - *object.NewAttribute("IndexSize", strconv.Itoa(int(indexFileSize))), - } - err := retry(func() error { - return uploadObj(ctx.Context, p, signer, account.PrivateKey().GetScriptHash(), containerID, buffer, attrs, homomorphicHashingDisabled) - }) - if err != nil { - return fmt.Errorf("failed to upload index file %d: %w", i, err) - } - fmt.Fprintf(ctx.App.Writer, "Uploaded index file %d\n", i) + }() + + select { + case err := <-errCh: + return err + case <-doneCh: } return nil } +// searchObjects searches in parallel for objects with attribute GE startIndex and LT +// endIndex. It returns a buffered channel of resulting object IDs and closes it once +// OID search is finished. Errors are sent to errCh in a non-blocking way. +func searchObjects(ctx context.Context, p *pool.Pool, containerID cid.ID, account *wallet.Account, blockAttributeKey string, startIndex, endIndex uint, maxParallelSearches int, errCh chan error) chan oid.ID { + var res = make(chan oid.ID, 2*searchBatchSize) + go func() { + var wg sync.WaitGroup + defer close(res) + + for i := int(startIndex); i < int(endIndex); i += searchBatchSize * maxParallelSearches { + for j := range maxParallelSearches { + start := i + j*searchBatchSize + end := start + searchBatchSize + + if start >= int(endIndex) { + break + } + if end > int(endIndex) { + end = int(endIndex) + } + + wg.Add(1) + go func(start, end int) { + defer wg.Done() + + prm := client.PrmObjectSearch{} + filters := object.NewSearchFilters() + filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", start), object.MatchNumGE) + filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", end), object.MatchNumLT) + prm.SetFilters(filters) + + var objIDs []oid.ID + err := retry(func() error { + var errBlockSearch error + objIDs, errBlockSearch = neofs.ObjectSearch(ctx, p, account.PrivateKey(), containerID.String(), prm) + return errBlockSearch + }) + if err != nil { + select { + case errCh <- fmt.Errorf("failed to search for block(s) from %d to %d: %w", start, end, err): + default: + } + return + } + + for _, id := range objIDs { + res <- id + } + }(start, end) + } + wg.Wait() + } + }() + + return res +} + // uploadObj uploads object to the container using provided settings. func uploadObj(ctx context.Context, p *pool.Pool, signer user.Signer, owner util.Uint160, containerID cid.ID, objData []byte, attrs []object.Attribute, homomorphicHashingDisabled bool) error { var (