diff --git a/cli/util/uploader.go b/cli/util/uploader.go
index d716d9b092..7414241aed 100644
--- a/cli/util/uploader.go
+++ b/cli/util/uploader.go
@@ -110,6 +110,7 @@ func uploadBin(ctx *cli.Context) error {
 		return cli.Exit(fmt.Errorf("failed to get network info: %w", err), 1)
 	}
 	homomorphicHashingDisabled := net.HomomorphicHashingDisabled()
+	var containerObj container.Container
 	err = retry(func() error {
 		containerObj, err = p.ContainerGet(ctx.Context, containerID, client.PrmContainerGet{})
@@ -126,7 +127,7 @@ func uploadBin(ctx *cli.Context) error {
 	}
 	magic := strconv.Itoa(int(v.Protocol.Network))
 	if containerMagic != magic {
-		return cli.Exit(fmt.Sprintf("Container magic %s does not match the network magic %s", containerMagic, magic), 1)
+		return cli.Exit(fmt.Sprintf("container magic %s does not match the network magic %s", containerMagic, magic), 1)
 	}
 
 	currentBlockHeight, err := rpc.GetBlockCount()
@@ -135,27 +136,22 @@ func uploadBin(ctx *cli.Context) error {
 	}
 	fmt.Fprintln(ctx.App.Writer, "Chain block height:", currentBlockHeight)
 
-	if !ctx.Bool("skip-blocks-uploading") {
-		lastMissingBlockIndex, errBlock := fetchLatestMissingBlockIndex(ctx.Context, p, containerID, acc.PrivateKey(), attr, int(currentBlockHeight), maxParallelSearches)
-		if errBlock != nil {
-			return cli.Exit(fmt.Errorf("failed to fetch the latest missing block index from container: %w", err), 1)
-		}
-
-		fmt.Fprintln(ctx.App.Writer, "First block of latest incomplete batch uploaded to NeoFS container:", lastMissingBlockIndex)
+	oldestMissingBlockIndex, errBlock := fetchLatestMissingBlockIndex(ctx.Context, p, containerID, acc.PrivateKey(), attr, int(currentBlockHeight), maxParallelSearches)
+	if errBlock != nil {
+		return cli.Exit(fmt.Errorf("failed to fetch the oldest missing block index from container: %w", errBlock), 1)
+	}
+	fmt.Fprintln(ctx.App.Writer, "First block of latest incomplete batch uploaded to NeoFS container:", oldestMissingBlockIndex)
 
-		if lastMissingBlockIndex > int(currentBlockHeight) {
-			fmt.Fprintf(ctx.App.Writer, "No new blocks to upload. Need to upload starting from %d, current height %d\n", lastMissingBlockIndex, currentBlockHeight)
-			return nil
-		}
-		err = uploadBlocks(ctx, p, rpc, signer, containerID, acc, attr, lastMissingBlockIndex, uint(currentBlockHeight), homomorphicHashingDisabled, numWorkers)
+	if !ctx.Bool("skip-blocks-uploading") {
+		err = uploadBlocks(ctx, p, rpc, signer, containerID, acc, attr, oldestMissingBlockIndex, uint(currentBlockHeight), homomorphicHashingDisabled, numWorkers)
 		if err != nil {
-			return cli.Exit(fmt.Errorf("failed to upload blocks to NeoFS container: %w", err), 1)
+			return cli.Exit(fmt.Errorf("failed to upload blocks: %w", err), 1)
 		}
 	}
-	err = updateIndexFiles(ctx, p, containerID, *acc, signer, uint(currentBlockHeight), attr, homomorphicHashingDisabled, maxParallelSearches)
+	err = uploadIndexFiles(ctx, p, containerID, *acc, signer, uint(currentBlockHeight), attr, homomorphicHashingDisabled, maxParallelSearches)
 	if err != nil {
-		return cli.Exit(fmt.Errorf("failed to update index files after upload: %w", err), 1)
+		return cli.Exit(fmt.Errorf("failed to upload index files: %w", err), 1)
 	}
 	return nil
 }
@@ -184,8 +180,9 @@ type searchResult struct {
 	err       error
 }
 
-// fetchLatestMissingBlockIndex searches the container for the last full block batch,
-// starting from the currentHeight and going backwards.
+// fetchLatestMissingBlockIndex searches the container for the latest full batch of blocks
+// starting from the currentHeight and going backwards. It returns the index of the first
+// block in the next batch.
 func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID cid.ID, priv *keys.PrivateKey, attributeKey string, currentHeight int, maxParallelSearches int) (int, error) {
 	var (
 		wg sync.WaitGroup
@@ -230,7 +227,7 @@ func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID
 
 	for i := len(results) - 1; i >= 0; i-- {
 		if results[i].err != nil {
-			return 0, fmt.Errorf("search of index files failed for batch with indexes from %d to %d: %w", results[i].startIndex, results[i].endIndex-1, results[i].err)
+			return 0, fmt.Errorf("blocks search failed for batch with indexes from %d to %d: %w", results[i].startIndex, results[i].endIndex-1, results[i].err)
 		}
 		if results[i].numOIDs < searchBatchSize {
 			emptyBatchFound = true
@@ -244,11 +241,15 @@ func fetchLatestMissingBlockIndex(ctx context.Context, p *pool.Pool, containerID
 	return 0, nil
 }
 
-func uploadBlocks(ctx *cli.Context, p *pool.Pool, rpc *rpcclient.Client, signer user.Signer, containerID cid.ID, acc *wallet.Account, attr string, lastMissingBlockIndex int, currentBlockHeight uint, homomorphicHashingDisabled bool, numWorkers int) error {
-	for batchStart := lastMissingBlockIndex; batchStart <= int(currentBlockHeight); batchStart += searchBatchSize {
+func uploadBlocks(ctx *cli.Context, p *pool.Pool, rpc *rpcclient.Client, signer user.Signer, containerID cid.ID, acc *wallet.Account, attr string, oldestMissingBlockIndex int, currentBlockHeight uint, homomorphicHashingDisabled bool, numWorkers int) error {
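+	// If the oldest missing block is already above the current chain height,
+	// there is nothing to upload.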
+	if oldestMissingBlockIndex > int(currentBlockHeight) {
+		fmt.Fprintf(ctx.App.Writer, "No new blocks to upload. Need to upload starting from %d, current height %d\n", oldestMissingBlockIndex, currentBlockHeight)
+		return nil
+	}
+	for batchStart := oldestMissingBlockIndex; batchStart <= int(currentBlockHeight); batchStart += searchBatchSize {
 		var (
 			batchEnd = min(batchStart+searchBatchSize, int(currentBlockHeight)+1)
-			errorCh  = make(chan error)
+			errCh    = make(chan error)
 			doneCh   = make(chan struct{})
 			wg       sync.WaitGroup
 		)
@@ -269,7 +270,7 @@ func uploadBlocks(ctx *cli.Context, p *pool.Pool, rpc *rpcclient.Client, signer
 			})
 			if errGet != nil {
 				select {
-				case errorCh <- errGet:
+				case errCh <- errGet:
 				default:
 				}
 				return
@@ -278,7 +279,10 @@ func uploadBlocks(ctx *cli.Context, p *pool.Pool, rpc *rpcclient.Client, signer
 			bw := io.NewBufBinWriter()
 			blk.EncodeBinary(bw.BinWriter)
 			if bw.Err != nil {
-				errorCh <- fmt.Errorf("failed to encode block %d: %w", blockIndex, bw.Err)
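+				// Signal the failure without blocking: if an error has already
+				// been delivered, this one is dropped.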
+				select {
+				case errCh <- fmt.Errorf("failed to encode block %d: %w", blockIndex, bw.Err):
+				default:
+				}
 				return
 			}
 			attrs := []object.Attribute{
@@ -295,7 +299,7 @@ func uploadBlocks(ctx *cli.Context, p *pool.Pool, rpc *rpcclient.Client, signer
 			})
 			if errRetr != nil {
 				select {
-				case errorCh <- errRetr:
+				case errCh <- errRetr:
 				default:
 				}
 				return
@@ -310,7 +314,7 @@ func uploadBlocks(ctx *cli.Context, p *pool.Pool, rpc *rpcclient.Client, signer
 		}()
 
 		select {
-		case err := <-errorCh:
+		case err := <-errCh:
 			return fmt.Errorf("upload error: %w", err)
 		case <-doneCh:
 		}
@@ -320,11 +324,11 @@ func uploadBlocks(ctx *cli.Context, p *pool.Pool, rpc *rpcclient.Client, signer
 	return nil
 }
 
-// updateIndexFiles updates the index files in the container.
-func updateIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, account wallet.Account, signer user.Signer, currentHeight uint, blockAttributeKey string, homomorphicHashingDisabled bool, maxParallelSearches int) error {
+// uploadIndexFiles uploads missing index files to the container.
+func uploadIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, account wallet.Account, signer user.Signer, currentHeight uint, blockAttributeKey string, homomorphicHashingDisabled bool, maxParallelSearches int) error {
 	attributeKey := ctx.String("index-attribute")
 	indexFileSize := ctx.Uint("index-file-size")
-	fmt.Fprintln(ctx.App.Writer, "Updating index files...")
+	fmt.Fprintln(ctx.App.Writer, "Uploading index files...")
 
 	prm := client.PrmObjectSearch{}
 	filters := object.NewSearchFilters()
@@ -338,129 +342,201 @@ func updateIndexFiles(ctx *cli.Context, p *pool.Pool, containerID cid.ID, accoun
 		return errSearchIndex
 	})
 	if errSearch != nil {
-		return fmt.Errorf("search of index files failed: %w", errSearch)
+		return fmt.Errorf("index files search failed: %w", errSearch)
 	}
 
 	existingIndexCount := uint(len(objectIDs))
 	expectedIndexCount := currentHeight / indexFileSize
-
 	if existingIndexCount >= expectedIndexCount {
 		fmt.Fprintf(ctx.App.Writer, "Index files are up to date. Existing: %d, expected: %d\n", existingIndexCount, expectedIndexCount)
 		return nil
 	}
+
 	var (
-		errCh                 = make(chan error)
-		buffer                = make([]byte, indexFileSize*oidSize)
-		oidCh                 = make(chan oid.ID, indexFileSize)
-		oidFetcherToProcessor = make(chan struct{}, indexFileSize)
-
-		emptyOid = make([]byte, oidSize)
+		// processedIndices is a mapping from position in buffer to the block index.
+		processedIndices sync.Map
+		buffer           = make([]byte, indexFileSize*oidSize)
+		doneCh           = make(chan struct{})
+		errCh            = make(chan error)
+		emptyOid         = make([]byte, oidSize)
 	)
-	defer close(oidCh)
-	for range maxParallelSearches {
-		go func() {
-			for id := range oidCh {
-				var obj *object.Object
-				errRetr := retry(func() error {
-					var errGetHead error
-					obj, errGetHead = p.ObjectHead(context.Background(), containerID, id, signer, client.PrmObjectHead{})
-					return errGetHead
-				})
-				if errRetr != nil {
-					select {
-					case errCh <- fmt.Errorf("failed to fetch object %s: %w", id.String(), errRetr):
-					default:
-					}
-					return
-				}
-				blockIndex, err := getBlockIndex(obj, blockAttributeKey)
-				if err != nil {
-					select {
-					case errCh <- fmt.Errorf("failed to get block index from object %s: %w", id.String(), err):
-					default:
-					}
-					return
-				}
-				offset := (uint(blockIndex) % indexFileSize) * oidSize
-				id.Encode(buffer[offset:])
-				oidFetcherToProcessor <- struct{}{}
-			}
-		}()
-	}
-
-	for i := existingIndexCount; i < expectedIndexCount; i++ {
-		startIndex := i * indexFileSize
-		endIndex := startIndex + indexFileSize
-		go func() {
-			for j := int(startIndex); j < int(endIndex); j += searchBatchSize {
-				remaining := int(endIndex) - j
-				end := j + min(searchBatchSize, remaining)
-
-				prm = client.PrmObjectSearch{}
-				filters = object.NewSearchFilters()
-				filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", j), object.MatchNumGE)
-				filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", end), object.MatchNumLT)
-				prm.SetFilters(filters)
-				var objIDs []oid.ID
-				err := retry(func() error {
-					var errSearchIndex error
-					objIDs, errSearchIndex = neofs.ObjectSearch(ctx.Context, p, account.PrivateKey(), containerID.String(), prm)
-					return errSearchIndex
-				})
-
-				if err != nil {
-					select {
-					case errCh <- fmt.Errorf("failed to search for objects from %d to %d for index file %d: %w", j, end, i, err):
-					default:
-					}
-					return
-				}
-
-				for _, id := range objIDs {
-					oidCh <- id
-				}
-			}
-		}()
-
-		var completed int
-	waitLoop:
-		for {
-			select {
-			case err := <-errCh:
-				return err
-			case <-oidFetcherToProcessor:
-				completed++
-				if completed == int(indexFileSize) {
-					break waitLoop
-				}
-			}
-		}
-		// Check if there are any empty oids in the created index file.
-		// This could happen if object payload is empty ->
-		// attribute is not set correctly -> empty oid is added to the index file.
-		for k := 0; k < len(buffer); k += oidSize {
-			if slices.Compare(buffer[k:k+oidSize], emptyOid) == 0 {
-				return fmt.Errorf("empty oid found in index file %d at position %d (block index %d)", i, k/oidSize, i+uint(k/oidSize))
-			}
-		}
-		attrs := []object.Attribute{
-			*object.NewAttribute(attributeKey, strconv.Itoa(int(i))),
-			*object.NewAttribute("IndexSize", strconv.Itoa(int(indexFileSize))),
-		}
-		err := retry(func() error {
-			return uploadObj(ctx.Context, p, signer, account.PrivateKey().GetScriptHash(), containerID, buffer, attrs, homomorphicHashingDisabled)
-		})
-		if err != nil {
-			return fmt.Errorf("failed to upload index file %d: %w", i, err)
-		}
-		fmt.Fprintf(ctx.App.Writer, "Uploaded index file %d\n", i)
-	}
+
+	go func() {
+		defer close(doneCh)
+
+		// Main processing loop for each index file.
+		for i := existingIndexCount; i < expectedIndexCount; i++ {
+			// Start block parsing goroutines.
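+			// Each worker HEADs the objects coming through oidCh, reads the block
+			// index attribute and records the OID at the block's slot in the buffer,
+			// deduplicating via processedIndices.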
+			var (
+				wg    sync.WaitGroup
+				oidCh = make(chan oid.ID, 2*maxParallelSearches)
+			)
+			wg.Add(maxParallelSearches)
+			for range maxParallelSearches {
+				go func() {
+					defer wg.Done()
+					for id := range oidCh {
+						var obj *object.Object
+						errRetr := retry(func() error {
+							var errGetHead error
+							obj, errGetHead = p.ObjectHead(context.Background(), containerID, id, signer, client.PrmObjectHead{})
+							return errGetHead
+						})
+						if errRetr != nil {
+							select {
+							case errCh <- fmt.Errorf("failed to fetch object %s: %w", id.String(), errRetr):
+							default:
+							}
+							return
+						}
+						blockIndex, err := getBlockIndex(obj, blockAttributeKey)
+						if err != nil {
+							select {
+							case errCh <- fmt.Errorf("failed to get block index from object %s: %w", id.String(), err):
+							default:
+							}
+							return
+						}
+						pos := uint(blockIndex) % indexFileSize
+						if _, ok := processedIndices.LoadOrStore(pos, blockIndex); !ok {
+							id.Encode(buffer[pos*oidSize:])
+						}
+					}
+				}()
+			}
+
+			// Search for blocks within the index file range.
+			startIndex := i * indexFileSize
+			endIndex := startIndex + indexFileSize
+			objIDs := searchObjects(ctx.Context, p, containerID, account, blockAttributeKey, startIndex, endIndex, maxParallelSearches, errCh)
+			for id := range objIDs {
+				oidCh <- id
+			}
+			close(oidCh)
+			wg.Wait()
+			fmt.Fprintf(ctx.App.Writer, "Index file %d generated, checking for the missing blocks...\n", i)
+
+			// Check if there are empty OIDs in the generated index file. This may happen
+			// if searchObjects hasn't returned all the blocks within the requested range,
+			// ref. #3645. In this case, retry the search for every missing object.
+			var count int
+			for idx := range indexFileSize {
+				if _, ok := processedIndices.Load(idx); !ok {
+					count++
+					objIDs = searchObjects(ctx.Context, p, containerID, account, blockAttributeKey, i*indexFileSize+idx, i*indexFileSize+idx+1, 1, errCh)
+					// Block object duplicates are allowed, we're OK with the first found result.
+					id, ok := <-objIDs
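+					// Drain the rest of the channel so the search routine behind it can finish.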
+					for range objIDs {
+					}
+					if !ok {
+						select {
+						case errCh <- fmt.Errorf("block %d is missing from the storage", i*indexFileSize+idx):
+						default:
+						}
+						return
+					}
+					processedIndices.Store(idx, id)
+					id.Encode(buffer[idx*oidSize:])
+				}
+			}
+			fmt.Fprintf(ctx.App.Writer, "%d missing block(s) processed for index file %d, uploading index file...\n", count, i)
+
+			// Check if there are empty OIDs in the generated index file. If it happens at
+			// this stage, then there's a bug in the code.
+			for k := 0; k < len(buffer); k += oidSize {
+				if slices.Compare(buffer[k:k+oidSize], emptyOid) == 0 {
+					select {
+					case errCh <- fmt.Errorf("empty OID found in index file %d at position %d (block index %d)", i, k/oidSize, i*indexFileSize+uint(k/oidSize)):
+					default:
+					}
+					return
+				}
+			}
+
+			// Upload index file.
+			attrs := []object.Attribute{
+				*object.NewAttribute(attributeKey, strconv.Itoa(int(i))),
+				*object.NewAttribute("IndexSize", strconv.Itoa(int(indexFileSize))),
+			}
+			err := retry(func() error {
+				return uploadObj(ctx.Context, p, signer, account.PrivateKey().GetScriptHash(), containerID, buffer, attrs, homomorphicHashingDisabled)
+			})
+			if err != nil {
+				select {
+				case errCh <- fmt.Errorf("failed to upload index file %d: %w", i, err):
+				default:
+				}
+				return
+			}
+			fmt.Fprintf(ctx.App.Writer, "Uploaded index file %d\n", i)
+			clear(buffer)
+		}
+	}()
+
+	select {
+	case err := <-errCh:
+		return err
+	case <-doneCh:
+	}
+
 	return nil
 }
 
+// searchObjects searches in parallel for objects with attribute GE startIndex and LT
+// endIndex. It returns a buffered channel of resulting object IDs and closes it once
+// OID search is finished. Errors are sent to errCh in a non-blocking way.
+func searchObjects(ctx context.Context, p *pool.Pool, containerID cid.ID, account wallet.Account, blockAttributeKey string, startIndex, endIndex uint, maxParallelSearches int, errCh chan error) chan oid.ID {
+	var (
+		res = make(chan oid.ID, 2*searchBatchSize)
+		wg  sync.WaitGroup
+	)
+
+	for j := int(startIndex); j < int(endIndex); j += searchBatchSize {
+		wg.Add(1)
+		go func(j int) {
+			defer wg.Done()
+
+			remaining := int(endIndex) - j
+			end := j + min(searchBatchSize, remaining)
+
+			prm := client.PrmObjectSearch{}
+			filters := object.NewSearchFilters()
+			filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", j), object.MatchNumGE)
+			filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", end), object.MatchNumLT)
+			prm.SetFilters(filters)
+
+			var objIDs []oid.ID
+			err := retry(func() error {
+				var errBlockSearch error
+				objIDs, errBlockSearch = neofs.ObjectSearch(ctx, p, account.PrivateKey(), containerID.String(), prm)
+				return errBlockSearch
+			})
+			if err != nil {
+				select {
+				case errCh <- fmt.Errorf("failed to search for block(s) from %d to %d: %w", j, end, err):
+				default:
+				}
+				return
+			}
+
+			for _, id := range objIDs {
+				res <- id
+			}
+		}(j)
+	}
+
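+	// Close the result channel only after all the search routines are done.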
+	go func() {
+		wg.Wait()
+		close(res)
+	}()
+
+	return res
+}
+
-// uploadObj uploads the block to the container using the pool.
-func uploadObj(ctx context.Context, p *pool.Pool, signer user.Signer, owner util.Uint160, containerID cid.ID, objData []byte, attrs []object.Attribute, HomomorphicHashingDisabled bool) error {
+// uploadObj uploads object to the container using provided settings.
+func uploadObj(ctx context.Context, p *pool.Pool, signer user.Signer, owner util.Uint160, containerID cid.ID, objData []byte, attrs []object.Attribute, homomorphicHashingDisabled bool) error {
 	var (
 		ownerID user.ID
 		hdr     object.Object
@@ -479,7 +555,7 @@ func uploadObj(ctx context.Context, p *pool.Pool, signer user.Signer, owner util
 	hdr.SetCreationEpoch(1)
 	v.SetMajor(1)
 	hdr.SetVersion(v)
-	if !HomomorphicHashingDisabled {
+	if !homomorphicHashingDisabled {
 		checksum.Calculate(&chHomomorphic, checksum.TZ, objData)
 		hdr.SetPayloadHomomorphicHash(chHomomorphic)
 	}