-
Notifications
You must be signed in to change notification settings - Fork 79
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
This command is used for keep container with blocks for blockfetcher updated. Close #3578 Signed-off-by: Ekaterina Pavlova <[email protected]>
- Loading branch information
1 parent
9a38360
commit 13e075a
Showing
2 changed files
with
401 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,364 @@ | ||
package server | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"fmt" | ||
"log" | ||
"strconv" | ||
"sync" | ||
"time" | ||
|
||
"github.com/google/uuid" | ||
"github.com/nspcc-dev/neo-go/cli/cmdargs" | ||
"github.com/nspcc-dev/neo-go/cli/options" | ||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" | ||
"github.com/nspcc-dev/neo-go/pkg/io" | ||
"github.com/nspcc-dev/neo-go/pkg/rpcclient" | ||
"github.com/nspcc-dev/neo-go/pkg/services/oracle/neofs" | ||
"github.com/nspcc-dev/neo-go/pkg/wallet" | ||
"github.com/nspcc-dev/neofs-sdk-go/client" | ||
cid "github.com/nspcc-dev/neofs-sdk-go/container/id" | ||
neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa" | ||
"github.com/nspcc-dev/neofs-sdk-go/object" | ||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id" | ||
"github.com/nspcc-dev/neofs-sdk-go/object/slicer" | ||
"github.com/nspcc-dev/neofs-sdk-go/session" | ||
"github.com/nspcc-dev/neofs-sdk-go/user" | ||
"github.com/urfave/cli/v2" | ||
) | ||
|
||
const ( | ||
searchBatchSize = 10000 // Number of objects to search in a batch for finding max block in container. | ||
maxParallelSearches = 40 // Control the number of concurrent searches for index files generation. | ||
indexFileSize = 128000 // Size of each index file. | ||
) | ||
|
||
func downloadPut(ctx *cli.Context) error { | ||
if err := cmdargs.EnsureNone(ctx); err != nil { | ||
return err | ||
} | ||
|
||
rpcNeoFS := ctx.String("rpc-neofs") | ||
containerIDStr := ctx.String("container") | ||
attribute := ctx.String("block-attribute") | ||
indexFileAttribute := ctx.String("index-attribute") | ||
acc, _, err := options.GetAccFromContext(ctx) | ||
if err != nil { | ||
return fmt.Errorf("failed to load wallet: %w", err) | ||
} | ||
|
||
var containerID cid.ID | ||
if err = containerID.DecodeString(containerIDStr); err != nil { | ||
return fmt.Errorf("failed to decode container ID: %w", err) | ||
} | ||
|
||
clientSDK, err := neofs.GetSDKClient(context.Background(), rpcNeoFS, 10*time.Minute) | ||
if err != nil { | ||
return fmt.Errorf("failed to create NeoFS client: %w", err) | ||
} | ||
defer clientSDK.Close() | ||
|
||
sessionToken, err := createSessionToken(*acc, containerID) | ||
if err != nil { | ||
return fmt.Errorf("failed to create session token: %w", err) | ||
} | ||
|
||
endpoint := ctx.String(options.RPCEndpointFlag) | ||
rpcClient, err := rpcclient.New(ctx.Context, endpoint, rpcclient.Options{ | ||
DialTimeout: 50 * time.Second, | ||
RequestTimeout: 50 * time.Second, | ||
}) | ||
if err != nil { | ||
return fmt.Errorf("failed to create RPC client: %w", err) | ||
} | ||
err = rpcClient.Init() | ||
if err != nil { | ||
return fmt.Errorf("failed to initialize RPC client: %w", err) | ||
} | ||
|
||
currentBlockHeight, err := rpcClient.GetBlockCount() | ||
if err != nil { | ||
return fmt.Errorf("failed to get current block height from RPC: %w", err) | ||
} | ||
log.Println("Current block height:", currentBlockHeight) | ||
|
||
maxBlockIndex, err := fetchMaxBlockIndex(ctx.Context, clientSDK, containerID, acc.PrivateKey(), uint(currentBlockHeight), attribute) | ||
if err != nil { | ||
return fmt.Errorf("failed to fetch max block index from container: %w", err) | ||
} | ||
log.Println("Max block index in NeoFS:", maxBlockIndex) | ||
if maxBlockIndex == 0 { | ||
maxBlockIndex = -1 | ||
} | ||
|
||
if maxBlockIndex > int(currentBlockHeight) { | ||
return fmt.Errorf("no new blocks to upload. Max index in NeoFS: %d, current height: %d", maxBlockIndex, currentBlockHeight) | ||
} | ||
|
||
for blockIndex := maxBlockIndex + 1; blockIndex <= int(currentBlockHeight); blockIndex++ { | ||
blockData, err := fetchBlockData(rpcClient, uint(blockIndex)) | ||
if err != nil { | ||
return fmt.Errorf("failed to fetch block %d: %w", blockIndex, err) | ||
} | ||
|
||
err = uploadBObjWithSlicer(ctx.Context, clientSDK, *acc, containerID, blockData, attribute, strconv.Itoa(blockIndex), sessionToken) | ||
if err != nil { | ||
return fmt.Errorf("failed to upload block %d: %w", blockIndex, err) | ||
} | ||
|
||
if blockIndex%1000 == 0 { | ||
log.Printf("Successfully uploaded block: %d", blockIndex) | ||
} | ||
} | ||
|
||
err = updateIndexFiles(ctx, clientSDK, containerID, *acc, uint(currentBlockHeight), indexFileAttribute, attribute) | ||
if err != nil { | ||
return fmt.Errorf("failed to update index files after upload: %w", err) | ||
} | ||
|
||
log.Println("Upload completed successfully.") | ||
return nil | ||
} | ||
|
||
// fetchMaxBlockIndex searches for the maximum block index in the container. | ||
func fetchMaxBlockIndex(ctx context.Context, clientSDK *client.Client, containerID cid.ID, priv *keys.PrivateKey, currentHeight uint, attributeKey string) (int, error) { | ||
height := int(currentHeight) | ||
var ( | ||
finalResult int | ||
finalErr error | ||
) | ||
|
||
searchBatch := func(start, end int) (int, bool, error) { | ||
prm := client.PrmObjectSearch{} | ||
filters := object.NewSearchFilters() | ||
filters.AddFilter(attributeKey, fmt.Sprintf("%d", start), object.MatchNumGE) | ||
filters.AddFilter(attributeKey, fmt.Sprintf("%d", end), object.MatchNumLE) | ||
prm.SetFilters(filters) | ||
|
||
objectIDs, err := neofs.ObjectSearch(ctx, clientSDK, priv, containerID.String(), prm) | ||
if err != nil { | ||
return 0, false, fmt.Errorf("failed to search objects from %d to %d: %w", start, end, err) | ||
} | ||
|
||
numOIDs := len(objectIDs) | ||
log.Printf("Found %d blocks between %d and %d", numOIDs, start, end) | ||
|
||
if numOIDs == 0 { | ||
return 0, false, nil // Keep searching. | ||
} | ||
|
||
// Return the max block index found in this batch | ||
maxInBatch := start + numOIDs - 1 | ||
|
||
// Stop immediately after finding the first non-empty batch | ||
return maxInBatch, true, nil | ||
} | ||
|
||
for height >= 0 { | ||
startIndex := height - searchBatchSize + 1 | ||
if startIndex < 0 { | ||
startIndex = 0 | ||
} | ||
|
||
maxInBatch, foundNonEmptyBatch, err := searchBatch(startIndex, height) | ||
if err != nil { | ||
finalErr = err | ||
break | ||
} | ||
|
||
if maxInBatch > finalResult { | ||
finalResult = maxInBatch | ||
} | ||
|
||
// Stop the search as soon as we find any valid blocks | ||
if foundNonEmptyBatch { | ||
break | ||
} | ||
|
||
height -= searchBatchSize | ||
} | ||
|
||
if finalErr != nil { | ||
return 0, finalErr | ||
} | ||
|
||
return finalResult, nil | ||
} | ||
|
||
func fetchBlockData(rpcClient *rpcclient.Client, index uint) ([]byte, error) { | ||
if index%1000 == 0 { | ||
log.Println("Fetching block", index) | ||
} | ||
block, err := rpcClient.GetBlockByIndex(uint32(index)) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to fetch block %d: %w", index, err) | ||
} | ||
|
||
var buf bytes.Buffer | ||
bw := io.NewBinWriterFromIO(&buf) | ||
|
||
block.EncodeBinary(bw) | ||
|
||
if bw.Err != nil { | ||
return nil, fmt.Errorf("failed to encode block %d: %w", index, bw.Err) | ||
} | ||
|
||
return buf.Bytes(), nil | ||
} | ||
|
||
func uploadBObjWithSlicer(ctx context.Context, clientSDK *client.Client, account wallet.Account, containerID cid.ID, objData []byte, attributeKey, attributeValue string, sessionToken *session.Object) error { | ||
signer := user.NewAutoIDSignerRFC6979(account.PrivateKey().PrivateKey) | ||
var ownerID user.ID | ||
ownerID.SetScriptHash(account.PrivateKey().GetScriptHash()) | ||
|
||
slc, err := slicer.New(ctx, clientSDK, signer, containerID, ownerID, sessionToken) | ||
if err != nil { | ||
return fmt.Errorf("failed to create slicer: %w", err) | ||
} | ||
|
||
attrs := []object.Attribute{ | ||
*object.NewAttribute(attributeKey, attributeValue), | ||
} | ||
|
||
_, err = slc.Put(ctx, bytes.NewReader(objData), attrs) | ||
if err != nil { | ||
return fmt.Errorf("failed to slice and upload block data: %w", err) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// updateIndexFiles updates the index files in the container. | ||
func updateIndexFiles(ctx *cli.Context, clientSDK *client.Client, containerID cid.ID, account wallet.Account, currentHeight uint, attributeKey string, blockAttributeKey string) error { | ||
log.Println("Updating index files...") | ||
|
||
prm := client.PrmObjectSearch{} | ||
filters := object.NewSearchFilters() | ||
filters.AddFilter(attributeKey, fmt.Sprintf("%d", 0), object.MatchNumGE) | ||
prm.SetFilters(filters) | ||
|
||
objectIDs, err := neofs.ObjectSearch(ctx.Context, clientSDK, account.PrivateKey(), containerID.String(), prm) | ||
if err != nil { | ||
return fmt.Errorf("no OIDs found for index files: %w", err) | ||
} | ||
|
||
existingIndexCount := uint(len(objectIDs)) | ||
expectedIndexCount := currentHeight / uint(indexFileSize) | ||
|
||
if existingIndexCount == expectedIndexCount { | ||
return nil | ||
} | ||
|
||
var buf bytes.Buffer | ||
fileIndex := int(existingIndexCount) | ||
|
||
ctxWithCancel, cancel := context.WithCancel(ctx.Context) | ||
defer cancel() | ||
|
||
workerPool := make(chan struct{}, maxParallelSearches) | ||
errCh := make(chan error, 1) | ||
wg := sync.WaitGroup{} | ||
|
||
for i := existingIndexCount; i < expectedIndexCount; i++ { | ||
startIndex := i * indexFileSize | ||
endIndex := startIndex + indexFileSize | ||
if endIndex > currentHeight { | ||
return fmt.Errorf("end index %d exceeds current height %d", endIndex, currentHeight) | ||
} | ||
|
||
blockOids := make([]oid.ID, indexFileSize) | ||
for j := startIndex; j < endIndex; j++ { | ||
select { | ||
case <-ctxWithCancel.Done(): | ||
return <-errCh | ||
default: | ||
} | ||
|
||
wg.Add(1) | ||
workerPool <- struct{}{} | ||
|
||
go func(index uint) { | ||
defer wg.Done() | ||
defer func() { <-workerPool }() | ||
|
||
oidBlock, err := searchBlockOID(ctx, clientSDK, account, containerID, blockAttributeKey, index) | ||
if err != nil { | ||
select { | ||
case errCh <- fmt.Errorf("failed to search for block OID at index %d: %w", index, err): | ||
cancel() | ||
default: | ||
} | ||
return | ||
} | ||
blockOids[index-startIndex] = oidBlock | ||
if index%1000 == 0 { | ||
log.Println("Found block OID", index) | ||
} | ||
}(j) | ||
} | ||
wg.Wait() | ||
|
||
select { | ||
case err := <-errCh: | ||
return err | ||
default: | ||
} | ||
|
||
for _, oidBlock := range blockOids { | ||
oidBytes := make([]byte, 32) | ||
oidBlock.Encode(oidBytes) | ||
buf.Write(oidBytes) | ||
} | ||
err = uploadBObjWithSlicer(ctx.Context, clientSDK, account, containerID, buf.Bytes(), attributeKey, strconv.Itoa(fileIndex), nil) | ||
if err != nil { | ||
return fmt.Errorf("failed to upload index file %d: %w", fileIndex, err) | ||
} | ||
log.Printf("Uploaded index file %d", fileIndex) | ||
|
||
fileIndex++ | ||
buf.Reset() | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// searchBlockOID function to search for the OID of a specific block by index. | ||
func searchBlockOID(ctx *cli.Context, clientSDK *client.Client, account wallet.Account, containerID cid.ID, blockAttributeKey string, blockIndex uint) (oid.ID, error) { | ||
prm := client.PrmObjectSearch{} | ||
filters := object.NewSearchFilters() | ||
filters.AddFilter(blockAttributeKey, fmt.Sprintf("%d", blockIndex), object.MatchStringEqual) | ||
prm.SetFilters(filters) | ||
|
||
objectIDs, err := neofs.ObjectSearch(ctx.Context, clientSDK, account.PrivateKey(), containerID.String(), prm) | ||
if err != nil || len(objectIDs) == 0 { | ||
return oid.ID{}, fmt.Errorf("no OIDs found for block index %d", blockIndex) | ||
} | ||
if len(objectIDs) > 1 { | ||
log.Println("Multiple OIDs found for block index", blockIndex) | ||
} | ||
return objectIDs[0], nil | ||
} | ||
|
||
func createSessionToken(account wallet.Account, containerID cid.ID) (*session.Object, error) { | ||
sessionToken := session.Object{} | ||
|
||
sessionID := uuid.New() | ||
sessionToken.SetID(sessionID) | ||
|
||
pubKey := account.PublicKey() | ||
authKey := neofsecdsa.PublicKey(*pubKey) | ||
sessionToken.SetAuthKey(&authKey) | ||
|
||
sessionToken.BindContainer(containerID) | ||
sessionToken.ForVerb(session.VerbObjectPut) | ||
|
||
sessionToken.SetExp(uint64(time.Now().Add(100 * time.Minute).Unix())) | ||
|
||
signer := user.NewAutoIDSignerRFC6979(account.PrivateKey().PrivateKey) | ||
if err := sessionToken.Sign(signer); err != nil { | ||
return nil, fmt.Errorf("failed to sign session token: %w", err) | ||
} | ||
|
||
return &sessionToken, nil | ||
} |
Oops, something went wrong.