From a6ae2566b227c484707fe344ace29ba3bb0ab05a Mon Sep 17 00:00:00 2001
From: Alan Shaw
Date: Wed, 30 Oct 2024 12:08:20 +0000
Subject: [PATCH] refactor: multiple tests

Extract the shared test setup into helpers.go, move query result
printing into an internal printer package, have the bootstrap service
starters return teardown functions, and restructure the upload round
trip as a sub-test of TestTheNetwork so that multiple tests can run
against freshly bootstrapped services. Also add a blackhole Redis
client so the indexing service can be constructed with caching
disabled.

---
 go.mod                                    |   7 +-
 go.sum                                    |   4 +-
 helpers.go                                | 288 +++++++++++++++++++
 internal/bootstrap/bootstrap.go           |  74 +++--
 internal/printer/queryresults.go          |  90 ++++++
 internal/redis/blackhole.go               |  36 +++
 internal/redis/{redisclient.go => map.go} |   0
 internal/testutil/gen.go                  |  22 +-
 internal/upload/service.go                |  10 +
 main_test.go                              | 323 +++------------------
 10 files changed, 540 insertions(+), 314 deletions(-)
 create mode 100644 helpers.go
 create mode 100644 internal/printer/queryresults.go
 create mode 100644 internal/redis/blackhole.go
 rename internal/redis/{redisclient.go => map.go} (100%)

diff --git a/go.mod b/go.mod
index 8464434..f9340a9 100644
--- a/go.mod
+++ b/go.mod
@@ -4,6 +4,7 @@ go 1.23.2
 
 require (
 	github.com/alanshaw/storetheindex v0.0.0-20241026220359-15f172e24dcc
+	github.com/ipfs/go-bitswap v0.11.0
 	github.com/ipfs/go-cid v0.4.1
 	github.com/ipfs/go-datastore v0.6.0
 	github.com/ipfs/go-log/v2 v2.5.1
@@ -17,9 +18,10 @@
 	github.com/redis/go-redis/v9 v9.6.1
 	github.com/storacha/go-capabilities v0.0.0-20241028080752-22b41240aa67
 	github.com/storacha/go-ucanto v0.1.1-0.20241028163940-34de8cd912bb
-	github.com/storacha/indexing-service v0.0.2-0.20241029205304-9cbbc12f3269
+	github.com/storacha/indexing-service v0.0.2-0.20241030071325-6725d07e355c
 	github.com/storacha/storage v0.0.1-0.20241029210319-746c528fb2fc
 	github.com/stretchr/testify v1.9.0
+	github.com/ucan-wg/go-ucan v0.0.0-20240916120445-37f52863156c
 )
 
 require (
@@ -57,6 +59,7 @@
 	github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect
 	github.com/containerd/cgroups v1.1.0 // indirect
 	github.com/coreos/go-systemd/v22 v22.5.0 // indirect
+	github.com/cskr/pubsub v1.0.2 // indirect
 	github.com/davecgh/go-spew v1.1.1 // indirect
 	github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect
 	github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 // indirect
@@ -95,6 +98,7 @@
 	github.com/ipfs/go-blockservice v0.5.2 // indirect
 	github.com/ipfs/go-ds-flatfs v0.5.1 // indirect
 	github.com/ipfs/go-ipfs-blockstore v1.3.1 // indirect
+	github.com/ipfs/go-ipfs-delay v0.0.1 // indirect
 	github.com/ipfs/go-ipfs-ds-help v1.1.1 // indirect
 	github.com/ipfs/go-ipfs-exchange-interface v0.2.1 // indirect
 	github.com/ipfs/go-ipfs-util v0.0.3 // indirect
@@ -180,7 +184,6 @@
 	github.com/storacha/go-metadata v0.0.0-20241021141939-f94d93dcda78 // indirect
 	github.com/storacha/ipni-publisher v0.0.0-20241029210117-3286a26a47a8 // indirect
 	github.com/twmb/murmur3 v1.1.6 // indirect
-	github.com/ucan-wg/go-ucan v0.0.0-20240916120445-37f52863156c // indirect
 	github.com/whyrusleeping/cbor v0.0.0-20171005072247-63513f603b11 // indirect
 	github.com/whyrusleeping/cbor-gen v0.1.2 // indirect
 	github.com/wlynxg/anet v0.0.5 // indirect
diff --git a/go.sum b/go.sum
index 31ec808..8641934 100644
--- a/go.sum
+++ b/go.sum
@@ -810,8 +810,8 @@ github.com/storacha/go-metadata v0.0.0-20241021141939-f94d93dcda78 h1:NAti9hMLMo
 github.com/storacha/go-metadata v0.0.0-20241021141939-f94d93dcda78/go.mod h1:DcwwQnyFuTk531cKD9sUkQg/gzpwKaIqH9I7oZYyeRc=
 github.com/storacha/go-ucanto v0.1.1-0.20241028163940-34de8cd912bb h1:lFwFtMjgt162ot9pnu230haLjRQ1rqwJOIAyywOqAX8=
 github.com/storacha/go-ucanto v0.1.1-0.20241028163940-34de8cd912bb/go.mod h1:7ba9jAgqmwlF/JfyFUQcGV07uiYNlmJNu8qH4hHtrJk=
-github.com/storacha/indexing-service v0.0.2-0.20241029205304-9cbbc12f3269 
h1:fTeL0y5LrKTyBR5JEfbM3TsINIxjFxBf6vSq2g7vrUw= -github.com/storacha/indexing-service v0.0.2-0.20241029205304-9cbbc12f3269/go.mod h1:HXMul+TKGkPL44K63zsk/EodegKiaFC5NXgafCNb+Ps= +github.com/storacha/indexing-service v0.0.2-0.20241030071325-6725d07e355c h1:21tx8rWH5n6DMXb/dKZwjsXinoUTHmDV/pSGKE+c3Mg= +github.com/storacha/indexing-service v0.0.2-0.20241030071325-6725d07e355c/go.mod h1:PzhVpNxtYwiPBZl6ph5nSNVTDtKZfGuHnvo1e0b39cw= github.com/storacha/ipni-publisher v0.0.0-20241029210117-3286a26a47a8 h1:QnLy9Y1x5NL1b8NFj71i+0bN+alPgjdcR0SMozXNeLU= github.com/storacha/ipni-publisher v0.0.0-20241029210117-3286a26a47a8/go.mod h1:fEuGSF5WMaOSAyDQCYAvow6Y+YKzpXczEk3A+H+s1fQ= github.com/storacha/storage v0.0.1-0.20241029210319-746c528fb2fc h1:yW7Y0sAHseq28o2o7HP4ioFKpzGLil7m+iLrGsD/Sw0= diff --git a/helpers.go b/helpers.go new file mode 100644 index 0000000..61a0f74 --- /dev/null +++ b/helpers.go @@ -0,0 +1,288 @@ +package main + +import ( + "bytes" + "context" + "fmt" + "io" + "net/http" + "net/url" + "slices" + "testing" + + "github.com/ipfs/go-cid" + "github.com/ipld/go-ipld-prime" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/multiformats/go-multicodec" + "github.com/multiformats/go-multihash" + "github.com/storacha/go-capabilities/pkg/assert" + "github.com/storacha/go-capabilities/pkg/blob" + "github.com/storacha/go-capabilities/pkg/claim" + "github.com/storacha/go-ucanto/core/dag/blockstore" + "github.com/storacha/go-ucanto/core/delegation" + "github.com/storacha/go-ucanto/did" + "github.com/storacha/go-ucanto/principal" + "github.com/storacha/go-ucanto/ucan" + "github.com/storacha/indexing-service/pkg/blobindex" + "github.com/storacha/indexing-service/pkg/client" + "github.com/storacha/indexing-service/pkg/types" + "github.com/storacha/testthenetwork/internal/bootstrap" + "github.com/storacha/testthenetwork/internal/digestutil" + "github.com/storacha/testthenetwork/internal/testutil" + "github.com/storacha/testthenetwork/internal/upload" + "github.com/stretchr/testify/require" +) + +func generateURLs(t *testing.T) (ipniFindURL, ipniAnnounceURL, storageURL, indexingURL url.URL) { + ipniFindURL = testutil.RandomLocalURL(t) + ipniAnnounceURL = testutil.RandomLocalURL(t) + storageURL = testutil.RandomLocalURL(t) + indexingURL = testutil.RandomLocalURL(t) + return +} + +func generateIdentities(t *testing.T) (storageID, indexingID, uploadID, aliceID principal.Signer) { + storageID = testutil.RandomSigner(t) + indexingID = testutil.RandomSigner(t) + uploadID = testutil.RandomSigner(t) + aliceID = testutil.RandomSigner(t) + return +} + +func generateProofs(t *testing.T, storageID, indexingID principal.Signer, uploadID, aliceID ucan.Principal) (storageIndexingProof, uploadStorageProof, aliceIndexingProof delegation.Proof) { + // proof storage node can invoke on indexing service + storageIndexingProof = delegation.FromDelegation( + testutil.Must( + delegation.Delegate( + indexingID, + storageID, + []ucan.Capability[ucan.NoCaveats]{ + ucan.NewCapability( + claim.CacheAbility, + indexingID.DID().String(), + ucan.NoCaveats{}, + ), + }, + ), + )(t), + ) + // proof upload service can invoke on storage node + uploadStorageProof = delegation.FromDelegation( + testutil.Must( + delegation.Delegate( + storageID, + uploadID, + []ucan.Capability[ucan.NoCaveats]{ + ucan.NewCapability( + blob.AllocateAbility, + storageID.DID().String(), + ucan.NoCaveats{}, + ), + ucan.NewCapability( + blob.AcceptAbility, + storageID.DID().String(), + ucan.NoCaveats{}, + ), + }, + ), + )(t), + ) + // proof alice 
can invoke on indexing service
+	aliceIndexingProof = delegation.FromDelegation(
+		testutil.Must(
+			delegation.Delegate(
+				indexingID,
+				aliceID,
+				[]ucan.Capability[ucan.NoCaveats]{
+					ucan.NewCapability(
+						assert.EqualsAbility,
+						indexingID.DID().String(),
+						ucan.NoCaveats{},
+					),
+					ucan.NewCapability(
+						assert.IndexAbility,
+						indexingID.DID().String(),
+						ucan.NoCaveats{},
+					),
+				},
+			),
+		)(t),
+	)
+	return
+}
+
+func startServices(
+	t *testing.T,
+	ipniFindURL, ipniAnnounceURL url.URL,
+	storageID principal.Signer,
+	storageURL url.URL,
+	storageIndexingProof delegation.Proof,
+	indexingID principal.Signer,
+	indexingURL url.URL,
+	indexingNoCache bool,
+	uploadID principal.Signer,
+	uploadStorageProof delegation.Proof,
+) (*upload.UploadService, *client.Client) {
+	fmt.Println("→ starting IPNI service")
+	closeIPNI := bootstrap.StartIPNIService(t, ipniAnnounceURL, ipniFindURL)
+	t.Cleanup(closeIPNI)
+	fmt.Printf("✔ IPNI find and announce services running at %s and %s\n", ipniFindURL.String(), ipniAnnounceURL.String())
+
+	fmt.Println("→ starting indexing service")
+	closeIndexing := bootstrap.StartIndexingService(t, indexingID, indexingURL, ipniFindURL, ipniAnnounceURL, indexingNoCache)
+	t.Cleanup(closeIndexing)
+	fmt.Printf("✔ indexing service (%s) running at %s\n", indexingID.DID(), indexingURL.String())
+
+	fmt.Println("→ starting storage node")
+	closeStorage := bootstrap.StartStorageNode(t, storageID, storageURL, ipniAnnounceURL, indexingID, indexingURL, storageIndexingProof)
+	t.Cleanup(closeStorage)
+	fmt.Printf("✔ storage node (%s) running at %s\n", storageID.DID(), storageURL.String())
+
+	fmt.Println("→ creating indexing service client")
+	indexingClient, err := client.New(indexingID, indexingURL)
+	require.NoError(t, err)
+	fmt.Printf("✔ indexing service client created\n")
+
+	fmt.Println("→ creating upload service")
+	uploadService := upload.NewService(t, upload.Config{
+		ID:             uploadID,
+		StorageNodeID:  storageID,
+		StorageNodeURL: storageURL,
+		StorageProof:   uploadStorageProof,
+	})
+	fmt.Printf("✔ upload service (%s) created\n", uploadID.DID())
+
+	return uploadService, indexingClient
+}
+
+func generateContent(t *testing.T, size int) (ipld.Link, multihash.Multihash, multihash.Multihash, []byte) {
+	fmt.Println("→ generating content")
+	root, rootDigest, digest, data := testutil.RandomCAR(t, size)
+	fmt.Printf("✔ generation success, root: %s\n", root.String())
+	return root, rootDigest, digest, data
+}
+
+func putBlob(t *testing.T, location url.URL, headers http.Header, data []byte) {
+	fmt.Printf("→ performing http/put to %s\n", location.String())
+	req, err := http.NewRequest("PUT", location.String(), bytes.NewReader(data))
+	require.NoError(t, err)
+	req.Header = headers
+
+	res, err := http.DefaultClient.Do(req)
+	require.NoError(t, err)
+	require.Equal(t, http.StatusCreated, res.StatusCode)
+	fmt.Println("✔ http/put success")
+}
+
+func decodeLocationCommitmentCaveats(t *testing.T, claim delegation.Delegation) assert.LocationCaveats {
+	fmt.Println("→ decoding location commitment")
+	nb, err := assert.LocationCaveatsReader.Read(claim.Capabilities()[0].Nb())
+	require.NoError(t, err)
+	fmt.Printf("✔ decode success - %s @ %s\n", claim.Capabilities()[0].Can(), nb.Location[0].String())
+	return nb
+}
+
+func fetchBlob(t *testing.T, location url.URL) ([]byte, multihash.Multihash) {
+	fmt.Printf("→ fetching blob from %s\n", location.String())
+	res, err := http.Get(location.String())
+	require.NoError(t, err)
+	require.Equal(t, http.StatusOK, res.StatusCode)
+	data, err := io.ReadAll(res.Body)
+	require.NoError(t, err)
+	digest, err := multihash.Sum(data, multihash.SHA2_256, -1)
+	require.NoError(t, err)
+	fmt.Println("✔ fetch success")
+	return data, digest
+}
+
+func generateIndex(t *testing.T, content ipld.Link, carBytes []byte) (blobindex.ShardedDagIndexView, multihash.Multihash, ipld.Link, []byte) {
+	fmt.Println("→ generating index")
+	index, err := blobindex.FromShardArchives(content, [][]byte{carBytes})
+	require.NoError(t, err)
+	archiveBytes, err := io.ReadAll(testutil.Must(index.Archive())(t))
+	require.NoError(t, err)
+	digest, err := multihash.Sum(archiveBytes, multihash.SHA2_256, -1)
+	require.NoError(t, err)
+	link := cidlink.Link{Cid: cid.NewCidV1(uint64(multicodec.Car), digest)}
+	fmt.Printf("✔ index created: %s (%s)\n", link.String(), digestutil.Format(digest))
+	return index, digest, link, archiveBytes
+}
+
+func collectIndexes(t *testing.T, result types.QueryResult) []blobindex.ShardedDagIndexView {
+	br, err := blockstore.NewBlockReader(blockstore.WithBlocksIterator(result.Blocks()))
+	require.NoError(t, err)
+
+	var indexes []blobindex.ShardedDagIndexView
+	for _, link := range result.Indexes() {
+		b, ok, err := br.Get(link)
+		require.NoError(t, err)
+		require.True(t, ok)
+
+		index, err := blobindex.Extract(bytes.NewReader(b.Bytes()))
+		require.NoError(t, err)
+		indexes = append(indexes, index)
+	}
+	return indexes
+}
+
+func collectClaims(t *testing.T, result types.QueryResult) []delegation.Delegation {
+	br, err := blockstore.NewBlockReader(blockstore.WithBlocksIterator(result.Blocks()))
+	require.NoError(t, err)
+
+	var claims []delegation.Delegation
+	for _, link := range result.Claims() {
+		claim, err := delegation.NewDelegationView(link, br)
+		require.NoError(t, err)
+		claims = append(claims, claim)
+	}
+	return claims
+}
+
+func requireContainsIndexClaim(t *testing.T, claims []delegation.Delegation, content ipld.Link, index ipld.Link) {
+	require.True(t, slices.ContainsFunc(claims, func(claim delegation.Delegation) bool {
+		cap := claim.Capabilities()[0]
+		if cap.Can() != assert.IndexAbility {
+			return false
+		}
+		nb, err := assert.IndexCaveatsReader.Read(cap.Nb())
+		require.NoError(t, err)
+		return nb.Content == content && nb.Index == index
+	}))
+}
+
+func requireContainsLocationCommitment(t *testing.T, claims []delegation.Delegation, content multihash.Multihash, space did.DID) {
+	require.True(t, slices.ContainsFunc(claims, func(claim delegation.Delegation) bool {
+		cap := claim.Capabilities()[0]
+		if cap.Can() != assert.LocationAbility {
+			return false
+		}
+		nb, err := assert.LocationCaveatsReader.Read(cap.Nb())
+		require.NoError(t, err)
+		return bytes.Equal(nb.Content.Hash(), content) && nb.Space == space
+	}))
+}
+
+func publishIndexClaim(t *testing.T, indexingClient *client.Client, issuer principal.Signer, proof delegation.Proof, content ipld.Link, index ipld.Link) {
+	fmt.Printf("→ performing assert/index with %s\n", index.String())
+	err := indexingClient.PublishIndexClaim(context.Background(), issuer, assert.IndexCaveats{
+		Content: content,
+		Index:   index,
+	}, delegation.WithProof(proof))
+	require.NoError(t, err)
+	fmt.Println("✔ assert/index success")
+}
+
+func queryClaims(t *testing.T, indexingClient *client.Client, digest multihash.Multihash, space did.DID) types.QueryResult {
+	fmt.Printf("→ performing query for %s\n", digestutil.Format(digest))
+	var match types.Match
+	if space != did.Undef {
+		match.Subject = append(match.Subject, space)
+	}
+	result, err := indexingClient.QueryClaims(context.Background(), 
types.Query{ + Hashes: []multihash.Multihash{digest}, + Match: match, + }) + require.NoError(t, err) + fmt.Println("✔ query success") + return result +} diff --git a/internal/bootstrap/bootstrap.go b/internal/bootstrap/bootstrap.go index e858858..11130cb 100644 --- a/internal/bootstrap/bootstrap.go +++ b/internal/bootstrap/bootstrap.go @@ -3,6 +3,7 @@ package bootstrap import ( "context" "fmt" + "net/http" "net/url" "testing" "time" @@ -37,7 +38,7 @@ func StartIPNIService( t *testing.T, announceURL url.URL, findURL url.URL, -) { +) func() { indexerCore := engine.New(memory.New()) reg, err := registry.New( @@ -66,19 +67,31 @@ func StartIPNIService( ingSvr, err := httpingest.New(announceAddr, indexerCore, ing, reg) require.NoError(t, err) + var ingStartErr error go func() { - err = ingSvr.Start() + ingStartErr = ingSvr.Start() }() findAddr := fmt.Sprintf("%s:%s", findURL.Hostname(), findURL.Port()) findSvr, err := httpfind.New(findAddr, indexerCore, reg) + var findStartErr error go func() { - err = findSvr.Start() + findStartErr = findSvr.Start() }() time.Sleep(time.Millisecond * 100) - require.NoError(t, err) + require.NoError(t, ingStartErr) + require.NoError(t, findStartErr) + + return func() { + ingSvr.Close() + ing.Close() + findSvr.Close() + reg.Close() + indexerCore.Close() + p2pHost.Close() + } } func StartIndexingService( @@ -87,7 +100,8 @@ func StartIndexingService( publicURL url.URL, indexerURL url.URL, directAnnounceURL url.URL, -) { + noCache bool, +) func() { privKey, err := crypto.UnmarshalEd25519PrivateKey(id.Raw()) require.NoError(t, err) @@ -103,14 +117,27 @@ func StartIndexingService( PublisherListenAddr: fmt.Sprintf("%s:%s", publisherListenURL.Hostname(), publisherListenURL.Port()), PublisherAnnounceAddrs: []string{announceAddr.String()}, } - indexer, err := construct.Construct( - cfg, - construct.WithStartIPNIServer(true), - construct.WithDatastore(dssync.MutexWrap(datastore.NewMapDatastore())), - construct.WithProvidersClient(rsync.MutexWrap(redis.NewMapRedis())), - construct.WithClaimsClient(rsync.MutexWrap(redis.NewMapRedis())), - construct.WithIndexesClient(rsync.MutexWrap(redis.NewMapRedis())), - ) + + var indexer construct.Service + if noCache { + indexer, err = construct.Construct( + cfg, + construct.WithStartIPNIServer(true), + construct.WithDatastore(dssync.MutexWrap(datastore.NewMapDatastore())), + construct.WithProvidersClient(redis.NewBlackholeRedis()), + construct.WithClaimsClient(redis.NewBlackholeRedis()), + construct.WithIndexesClient(redis.NewBlackholeRedis()), + ) + } else { + indexer, err = construct.Construct( + cfg, + construct.WithStartIPNIServer(true), + construct.WithDatastore(dssync.MutexWrap(datastore.NewMapDatastore())), + construct.WithProvidersClient(rsync.MutexWrap(redis.NewMapRedis())), + construct.WithClaimsClient(rsync.MutexWrap(redis.NewMapRedis())), + construct.WithIndexesClient(rsync.MutexWrap(redis.NewMapRedis())), + ) + } require.NoError(t, err) err = indexer.Startup(context.Background()) @@ -123,6 +150,10 @@ func StartIndexingService( time.Sleep(time.Millisecond * 100) require.NoError(t, err) + + return func() { + indexer.Shutdown(context.Background()) + } } func StartStorageNode( @@ -133,7 +164,7 @@ func StartStorageNode( indexingServiceDID ucan.Principal, indexingServiceURL url.URL, indexingServiceProof delegation.Proof, -) { +) func() { svc, err := storage.New( storage.WithIdentity(id), storage.WithBlobstore(blobstore.NewMapBlobstore()), @@ -147,11 +178,22 @@ func StartStorageNode( ) require.NoError(t, err) + srvMux, err := 
server.NewServer(svc) + require.NoError(t, err) + + httpServer := &http.Server{ + Addr: fmt.Sprintf("%s:%s", publicURL.Hostname(), publicURL.Port()), + Handler: srvMux, + } + go func() { - addr := fmt.Sprintf("%s:%s", publicURL.Hostname(), publicURL.Port()) - err = server.ListenAndServe(addr, svc) + err = httpServer.ListenAndServe() }() time.Sleep(time.Millisecond * 100) require.NoError(t, err) + + return func() { + httpServer.Close() + } } diff --git a/internal/printer/queryresults.go b/internal/printer/queryresults.go new file mode 100644 index 0000000..e0485be --- /dev/null +++ b/internal/printer/queryresults.go @@ -0,0 +1,90 @@ +package printer + +import ( + "bytes" + "fmt" + "testing" + + "github.com/storacha/go-capabilities/pkg/assert" + "github.com/storacha/go-ucanto/core/dag/blockstore" + "github.com/storacha/go-ucanto/core/delegation" + "github.com/storacha/indexing-service/pkg/blobindex" + "github.com/storacha/indexing-service/pkg/types" + "github.com/storacha/testthenetwork/internal/digestutil" + "github.com/stretchr/testify/require" +) + +func PrintQueryResults(t *testing.T, results types.QueryResult) { + br, err := blockstore.NewBlockReader(blockstore.WithBlocksIterator(results.Blocks())) + require.NoError(t, err) + + fmt.Println("") + fmt.Println("# Query Results") + fmt.Println("") + fmt.Printf("## Claims (%d)\n", len(results.Claims())) + fmt.Println("") + i := 1 + for _, link := range results.Claims() { + fmt.Printf("%d. %s\n", i, link.String()) + claim, err := delegation.NewDelegationView(link, br) + require.NoError(t, err) + fmt.Printf("\tIssuer: %s\n", claim.Issuer().DID()) + fmt.Printf("\tAudience: %s\n", claim.Audience().DID()) + cap := claim.Capabilities()[0] + fmt.Printf("\tCan: %s\n", cap.Can()) + fmt.Printf("\tWith: %s\n", cap.With()) + switch cap.Can() { + case assert.LocationAbility: + fmt.Println("\tCaveats:") + nb, err := assert.LocationCaveatsReader.Read(cap.Nb()) + require.NoError(t, err) + fmt.Printf("\t\tSpace: %s\n", nb.Space) + fmt.Printf("\t\tContent: %s\n", digestutil.Format(nb.Content.Hash())) + fmt.Printf("\t\tLocation: %s\n", &nb.Location[0]) + if nb.Range != nil { + if nb.Range.Length != nil { + fmt.Printf("\t\tRange: %d-%d\n", nb.Range.Offset, nb.Range.Offset+*nb.Range.Length) + } else { + fmt.Printf("\t\tRange: %d-\n", nb.Range.Offset) + } + } + case assert.IndexAbility: + fmt.Println("\tCaveats:") + nb, err := assert.IndexCaveatsReader.Read(cap.Nb()) + require.NoError(t, err) + fmt.Printf("\t\tContent: %s\n", nb.Content.String()) + fmt.Printf("\t\tIndex: %s\n", nb.Index.String()) + case assert.EqualsAbility: + fmt.Println("\tCaveats:") + nb, err := assert.EqualsCaveatsReader.Read(cap.Nb()) + require.NoError(t, err) + fmt.Printf("\t\tContent: %s\n", digestutil.Format(nb.Content.Hash())) + fmt.Printf("\t\tEquals: %s\n", nb.Equals.String()) + } + fmt.Println("") + i++ + } + fmt.Printf("## Indexes (%d)\n", len(results.Indexes())) + fmt.Println("") + i = 1 + for _, link := range results.Indexes() { + fmt.Printf("%d. 
%s\n", i, link.String())
+		b, ok, err := br.Get(link)
+		require.NoError(t, err)
+		require.True(t, ok)
+
+		index, err := blobindex.Extract(bytes.NewReader(b.Bytes()))
+		require.NoError(t, err)
+
+		fmt.Printf("\tContent: %s\n", index.Content().String())
+		fmt.Printf("\tShards (%d):\n", index.Shards().Size())
+		for shard, slices := range index.Shards().Iterator() {
+			fmt.Printf("\t\t%s\n", digestutil.Format(shard))
+			for slice, position := range slices.Iterator() {
+				fmt.Printf("\t\t\t%s %d-%d\n", digestutil.Format(slice), position.Offset, position.Offset+position.Length)
+			}
+		}
+		fmt.Println("")
+		i++
+	}
+}
diff --git a/internal/redis/blackhole.go b/internal/redis/blackhole.go
new file mode 100644
index 0000000..efbe86b
--- /dev/null
+++ b/internal/redis/blackhole.go
@@ -0,0 +1,36 @@
+package redis
+
+import (
+	"context"
+	"time"
+
+	goredis "github.com/redis/go-redis/v9"
+	"github.com/storacha/indexing-service/pkg/redis"
+)
+
+// BlackholeRedis is a client that does not retain any data.
+type BlackholeRedis struct{}
+
+var _ redis.Client = (*BlackholeRedis)(nil)
+
+func NewBlackholeRedis() *BlackholeRedis {
+	return &BlackholeRedis{}
+}
+
+func (bh *BlackholeRedis) Expire(ctx context.Context, key string, expiration time.Duration) *goredis.BoolCmd {
+	return goredis.NewBoolCmd(ctx, nil)
+}
+
+func (bh *BlackholeRedis) Get(ctx context.Context, key string) *goredis.StringCmd {
+	cmd := goredis.NewStringCmd(ctx, nil)
+	cmd.SetErr(goredis.Nil)
+	return cmd
+}
+
+func (bh *BlackholeRedis) Persist(ctx context.Context, key string) *goredis.BoolCmd {
+	return goredis.NewBoolCmd(ctx, nil)
+}
+
+func (bh *BlackholeRedis) Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *goredis.StatusCmd {
+	return goredis.NewStatusCmd(ctx, nil)
+}
diff --git a/internal/redis/redisclient.go b/internal/redis/map.go
similarity index 100%
rename from internal/redis/redisclient.go
rename to internal/redis/map.go
diff --git a/internal/testutil/gen.go b/internal/testutil/gen.go
index 9719bd7..0f30172 100644
--- a/internal/testutil/gen.go
+++ b/internal/testutil/gen.go
@@ -20,9 +20,9 @@
 )
 
 // RandomCAR creates a CAR with a single block of random bytes of the specified
-// size. It returns the link of the root block, the hash of the CAR itself and
-// the bytes of the CAR.
-func RandomCAR(t *testing.T, size int) (ipld.Link, multihash.Multihash, []byte) {
+// size. It returns the link of the root block, the hash of the root block, the
+// hash of the CAR itself and the bytes of the CAR.
+func RandomCAR(t *testing.T, size int) (ipld.Link, multihash.Multihash, multihash.Multihash, []byte) {
 	digest, bytes := RandomBytes(t, size)
 	root := cidlink.Link{Cid: cid.NewCidV1(cid.Raw, digest)}
 	r := car.Encode([]ipld.Link{root}, func(yield func(block.Block, error) bool) {
@@ -32,7 +32,7 @@ func RandomCAR(t *testing.T, size int) (ipld.Link, multihash.Multihash, []byte)
 	require.NoError(t, err)
 	carDigest, err := multihash.Sum(carBytes, multihash.SHA2_256, -1)
 	require.NoError(t, err)
-	return root, carDigest, carBytes
+	return root, digest, carDigest, carBytes
 }
 
 func RandomBytes(t *testing.T, size int) (multihash.Multihash, []byte) {
@@ -54,8 +54,20 @@ func RandomSigner(t *testing.T) principal.Signer {
 	return id
 }
 
+var assignedPorts = map[int]struct{}{}
+
+// RandomLocalURL returns a URL with a free local port. The port will not be
+// reused for another URL until test cleanup, even if no service is bound to it.
 
func RandomLocalURL(t *testing.T) url.URL { - port := GetFreePort(t) + var port int + for { + port = GetFreePort(t) + if _, ok := assignedPorts[port]; !ok { + assignedPorts[port] = struct{}{} + t.Cleanup(func() { delete(assignedPorts, port) }) + break + } + } pubURL, err := url.Parse(fmt.Sprintf("http://127.0.0.1:%d", port)) require.NoError(t, err) return *pubURL diff --git a/internal/upload/service.go b/internal/upload/service.go index 81e9325..701a0e4 100644 --- a/internal/upload/service.go +++ b/internal/upload/service.go @@ -1,6 +1,7 @@ package upload import ( + "fmt" "net/http" "net/url" "testing" @@ -19,6 +20,7 @@ import ( "github.com/storacha/go-ucanto/principal" uhttp "github.com/storacha/go-ucanto/transport/http" "github.com/storacha/go-ucanto/ucan" + "github.com/storacha/testthenetwork/internal/digestutil" "github.com/storacha/testthenetwork/internal/testutil" "github.com/stretchr/testify/require" ) @@ -34,6 +36,8 @@ type Config struct { StorageProof delegation.Proof } +// UploadService simulates actions taken by the upload service in response to +// client invocations. type UploadService struct { cfg Config conn client.Connection @@ -44,6 +48,9 @@ type UploadService struct { // upload address if required (i.e. it may be nil if the storage node already // has the blob). func (s *UploadService) BlobAdd(t *testing.T, space did.DID, digest multihash.Multihash, size uint64) *blob.Address { + fmt.Printf("→ performing blob/add with %s\n", digestutil.Format(digest)) + defer fmt.Println("✔ blob/add success") + inv, err := blob.Allocate.Invoke( s.cfg.ID, s.cfg.StorageNodeID, @@ -99,6 +106,9 @@ func (s *UploadService) BlobAdd(t *testing.T, space did.DID, digest multihash.Mu // from the client. It sends a blob/accept invocation to the storage node and // returns the location commitment. 
func (s *UploadService) ConcludeHTTPPut(t *testing.T, space did.DID, digest multihash.Multihash, size uint64, expires uint64) delegation.Delegation { + fmt.Println("→ performing ucan/conclude for http/put") + defer fmt.Println("✔ ucan/conclude success") + inv, err := blob.Accept.Invoke( s.cfg.ID, s.cfg.StorageNodeID, diff --git a/main_test.go b/main_test.go index c324bf9..0e77b4e 100644 --- a/main_test.go +++ b/main_test.go @@ -1,313 +1,58 @@ package main import ( - "bytes" - "context" - "fmt" - "io" - "net/http" - "net/url" "testing" - "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" - "github.com/ipld/go-ipld-prime" - cidlink "github.com/ipld/go-ipld-prime/linking/cid" - "github.com/multiformats/go-multicodec" - "github.com/multiformats/go-multihash" - "github.com/storacha/go-capabilities/pkg/assert" - "github.com/storacha/go-capabilities/pkg/blob" - "github.com/storacha/go-capabilities/pkg/claim" - "github.com/storacha/go-ucanto/core/dag/blockstore" - "github.com/storacha/go-ucanto/core/delegation" - "github.com/storacha/go-ucanto/core/ipld/block" - "github.com/storacha/go-ucanto/ucan" - "github.com/storacha/indexing-service/pkg/blobindex" - "github.com/storacha/indexing-service/pkg/client" - "github.com/storacha/indexing-service/pkg/types" - "github.com/storacha/testthenetwork/internal/bootstrap" - "github.com/storacha/testthenetwork/internal/digestutil" + "github.com/storacha/go-ucanto/did" + "github.com/storacha/testthenetwork/internal/printer" "github.com/storacha/testthenetwork/internal/testutil" - "github.com/storacha/testthenetwork/internal/upload" "github.com/stretchr/testify/require" ) -func TestUpload(t *testing.T) { +func TestTheNetwork(t *testing.T) { logging.SetLogLevel("*", "warn") - // Start IPNI //////////////////////////////////////////////////////////////// - ipniFindURL := testutil.RandomLocalURL(t) - ipniAnnounceURL := testutil.RandomLocalURL(t) + t.Run("round trip", func(t *testing.T) { + storageID, indexingID, uploadID, aliceID := generateIdentities(t) + ipniFindURL, ipniAnnounceURL, storageURL, indexingURL := generateURLs(t) + storageIndexingProof, uploadStorageProof, aliceIndexingProof := generateProofs(t, storageID, indexingID, uploadID, aliceID) + uploadService, indexingClient := startServices(t, ipniFindURL, ipniAnnounceURL, storageID, storageURL, storageIndexingProof, indexingID, indexingURL, false, uploadID, uploadStorageProof) - fmt.Println("→ starting IPNI service") - bootstrap.StartIPNIService(t, ipniAnnounceURL, ipniFindURL) - fmt.Printf("✔ IPNI find and announce services running at %s and %s\n", ipniFindURL.String(), ipniAnnounceURL.String()) + space := testutil.RandomPrincipal(t).DID() + root, rootDigest, digest, data := generateContent(t, 256) - // Start Indexing Service //////////////////////////////////////////////////// - indexingID := testutil.RandomSigner(t) - indexingURL := testutil.RandomLocalURL(t) - - fmt.Println("→ starting indexing service") - bootstrap.StartIndexingService(t, indexingID, indexingURL, ipniFindURL, ipniAnnounceURL) - fmt.Printf("✔ indexing service (%s) running at %s\n", indexingID.DID(), indexingURL.String()) - - fmt.Println("→ creating indexing service client") - indexingClient, err := client.New(indexingID, indexingURL) - require.NoError(t, err) - fmt.Printf("✔ indexing service client created\n") - - // Start Storage Node //////////////////////////////////////////////////////// - storageID := testutil.RandomSigner(t) - storageURL := testutil.RandomLocalURL(t) - // proof storage node can invoke on indexing 
service - storageIndexingProof := delegation.FromDelegation( - testutil.Must( - delegation.Delegate( - indexingID, - storageID, - []ucan.Capability[ucan.NoCaveats]{ - ucan.NewCapability( - claim.CacheAbility, - indexingID.DID().String(), - ucan.NoCaveats{}, - ), - }, - ), - )(t), - ) - fmt.Println("→ starting storage node") - bootstrap.StartStorageNode(t, storageID, storageURL, ipniAnnounceURL, indexingID, indexingURL, storageIndexingProof) - fmt.Printf("✔ storage node (%s) running at %s\n", storageID.DID(), storageURL.String()) - - // Start Upload Service ////////////////////////////////////////////////////// - uploadID := testutil.RandomSigner(t) - // proof upload service can invoke on storage node - uploadStorageProof := delegation.FromDelegation( - testutil.Must( - delegation.Delegate( - storageID, - uploadID, - []ucan.Capability[ucan.NoCaveats]{ - ucan.NewCapability( - blob.AllocateAbility, - storageID.DID().String(), - ucan.NoCaveats{}, - ), - ucan.NewCapability( - blob.AcceptAbility, - storageID.DID().String(), - ucan.NoCaveats{}, - ), - }, - ), - )(t), - ) - - // Alice ///////////////////////////////////////////////////////////////////// - aliceID := testutil.RandomSigner(t) - // proof upload service can invoke on indexing service - aliceIndexingProof := delegation.FromDelegation( - testutil.Must( - delegation.Delegate( - indexingID, - aliceID, - []ucan.Capability[ucan.NoCaveats]{ - ucan.NewCapability( - assert.EqualsAbility, - indexingID.DID().String(), - ucan.NoCaveats{}, - ), - ucan.NewCapability( - assert.IndexAbility, - indexingID.DID().String(), - ucan.NoCaveats{}, - ), - }, - ), - )(t), - ) - - fmt.Println("→ creating upload service") - uploadService := upload.NewService(t, upload.Config{ - ID: uploadID, - StorageNodeID: storageID, - StorageNodeURL: storageURL, - StorageProof: uploadStorageProof, - }) - fmt.Printf("✔ upload service (%s) created\n", uploadID.DID()) - - space := testutil.RandomPrincipal(t).DID() - - fmt.Println("→ generating content") - root, digest, data := testutil.RandomCAR(t, 256) - rootDigest := digestutil.ExtractDigest(root) - size := uint64(len(data)) - fmt.Printf("✔ generation success, root: %s\n", root.String()) - - fmt.Printf("→ performing blob/add with %s\n", digestutil.Format(digest)) - address := uploadService.BlobAdd(t, space, digest, size) - fmt.Println("✔ blob/add success") - - if address != nil { - fmt.Printf("→ performing http/put to %s\n", address.URL.String()) - putBlob(t, address.URL, address.Headers, data) - fmt.Println("✔ http/put success") - } - - fmt.Println("→ performing ucan/conclude for http/put") - claim := uploadService.ConcludeHTTPPut(t, space, digest, size, address.Expires) - fmt.Println("✔ ucan/conclude success") - - fmt.Println("→ decoding location commitment") - nb, rerr := assert.LocationCaveatsReader.Read(claim.Capabilities()[0].Nb()) - require.NoError(t, rerr) - fmt.Printf("✔ decode success - %s @ %s\n", claim.Capabilities()[0].Can(), nb.Location[0].String()) - - fmt.Println("→ fetching blob") - blobBytes := fetchBlob(t, nb.Location[0]) - blobDigest, err := multihash.Sum(blobBytes, multihash.SHA2_256, -1) - require.NoError(t, err) - require.Equal(t, digest, blobDigest) - fmt.Println("✔ fetch success") - - fmt.Println("→ creating index") - index, err := blobindex.FromShardArchives(root, [][]byte{blobBytes}) - require.NoError(t, err) - indexData, err := io.ReadAll(testutil.Must(index.Archive())(t)) - require.NoError(t, err) - indexSize := uint64(len(indexData)) - indexDigest, err := multihash.Sum(indexData, 
multihash.SHA2_256, -1) - require.NoError(t, err) - indexLink := cidlink.Link{Cid: cid.NewCidV1(uint64(multicodec.Car), indexDigest)} - fmt.Printf("✔ index created: %s (%s)\n", indexLink.String(), digestutil.Format(indexDigest)) - - fmt.Printf("→ performing index blob/add with %s\n", digestutil.Format(indexDigest)) - address = uploadService.BlobAdd(t, space, indexDigest, indexSize) - fmt.Println("✔ index blob/add success") - - if address != nil { - fmt.Printf("→ performing index http/put to %s\n", address.URL.String()) - putBlob(t, address.URL, address.Headers, indexData) - fmt.Println("✔ index http/put success") - } - - fmt.Println("→ performing index ucan/conclude for http/put") - uploadService.ConcludeHTTPPut(t, space, indexDigest, indexSize, address.Expires) - fmt.Println("✔ index ucan/conclude success") + address := uploadService.BlobAdd(t, space, digest, uint64(len(data))) + if address != nil { + putBlob(t, address.URL, address.Headers, data) + } + claim := uploadService.ConcludeHTTPPut(t, space, digest, uint64(len(data)), address.Expires) - fmt.Printf("→ performing assert/index with %s\n", indexLink.String()) - err = indexingClient.PublishIndexClaim(context.Background(), aliceID, assert.IndexCaveats{ - Content: root, - Index: indexLink, - }, delegation.WithProof(aliceIndexingProof)) - require.NoError(t, err) - fmt.Println("✔ assert/index success") + nb := decodeLocationCommitmentCaveats(t, claim) - fmt.Printf("→ performing query for %s (%s)\n", root.String(), digestutil.Format(rootDigest)) - res, err := indexingClient.QueryClaims(context.Background(), types.Query{ - Hashes: []multihash.Multihash{rootDigest}, - }) - require.NoError(t, err) - fmt.Println("✔ query success") + blobBytes, blobDigest := fetchBlob(t, nb.Location[0]) + require.Equal(t, digest, blobDigest) - printQueryResults(t, res) -} + _, indexDigest, indexLink, indexData := generateIndex(t, root, blobBytes) -func printQueryResults(t *testing.T, results types.QueryResult) { - blocks := map[ipld.Link]block.Block{} - for b, err := range results.Blocks() { - require.NoError(t, err) - blocks[b.Link()] = b - } - br, err := blockstore.NewBlockReader(blockstore.WithBlocksIterator(results.Blocks())) - require.NoError(t, err) - - fmt.Println("") - fmt.Println("# Query Results") - fmt.Println("") - fmt.Printf("## Claims (%d)\n", len(results.Claims())) - fmt.Println("") - i := 1 - for _, link := range results.Claims() { - fmt.Printf("%d. 
%s\n", i, link.String()) - claim, err := delegation.NewDelegationView(link, br) - require.NoError(t, err) - fmt.Printf("\tIssuer: %s\n", claim.Issuer().DID()) - fmt.Printf("\tAudience: %s\n", claim.Audience().DID()) - cap := claim.Capabilities()[0] - fmt.Printf("\tCan: %s\n", cap.Can()) - fmt.Printf("\tWith: %s\n", cap.With()) - switch cap.Can() { - case assert.LocationAbility: - fmt.Println("\tCaveats:") - nb, err := assert.LocationCaveatsReader.Read(cap.Nb()) - require.NoError(t, err) - fmt.Printf("\t\tSpace: %s\n", nb.Space) - fmt.Printf("\t\tContent: %s\n", digestutil.Format(nb.Content.Hash())) - fmt.Printf("\t\tLocation: %s\n", &nb.Location[0]) - if nb.Range != nil { - if nb.Range.Length != nil { - fmt.Printf("\t\tRange: %d-%d\n", nb.Range.Offset, nb.Range.Offset+*nb.Range.Length) - } else { - fmt.Printf("\t\tRange: %d-\n", nb.Range.Offset) - } - } - case assert.IndexAbility: - fmt.Println("\tCaveats:") - nb, err := assert.IndexCaveatsReader.Read(cap.Nb()) - require.NoError(t, err) - fmt.Printf("\t\tContent: %s\n", nb.Content.String()) - fmt.Printf("\t\tIndex: %s\n", nb.Index.String()) - case assert.EqualsAbility: - fmt.Println("\tCaveats:") - nb, err := assert.EqualsCaveatsReader.Read(cap.Nb()) - require.NoError(t, err) - fmt.Printf("\t\tContent: %s\n", digestutil.Format(nb.Content.Hash())) - fmt.Printf("\t\tEquals: %s\n", nb.Equals.String()) + address = uploadService.BlobAdd(t, space, indexDigest, uint64(len(indexData))) + if address != nil { + putBlob(t, address.URL, address.Headers, indexData) } - fmt.Println("") - i++ - } - fmt.Printf("## Indexes (%d)\n", len(results.Indexes())) - fmt.Println("") - i = 1 - for _, link := range results.Indexes() { - fmt.Printf("%d. %s\n", i, link.String()) - b, ok, err := br.Get(link) - require.NoError(t, err) - require.True(t, ok) - - index, err := blobindex.Extract(bytes.NewReader(b.Bytes())) - require.NoError(t, err) + uploadService.ConcludeHTTPPut(t, space, indexDigest, uint64(len(indexData)), address.Expires) - fmt.Printf("\tContent: %s\n", index.Content().String()) - fmt.Printf("\tShards (%d):\n", index.Shards().Size()) - for shard, slices := range index.Shards().Iterator() { - fmt.Printf("\t\t%s\n", digestutil.Format(shard)) - for slice, position := range slices.Iterator() { - fmt.Printf("\t\t\t%s %d-%d\n", digestutil.Format(slice), position.Offset, position.Offset+position.Length) - } - } - fmt.Println("") - i++ - } -} + publishIndexClaim(t, indexingClient, aliceID, aliceIndexingProof, root, indexLink) -func putBlob(t *testing.T, location url.URL, headers http.Header, data []byte) { - req, err := http.NewRequest("PUT", location.String(), bytes.NewReader(data)) - require.NoError(t, err) - req.Header = headers + result := queryClaims(t, indexingClient, rootDigest, did.Undef) + printer.PrintQueryResults(t, result) - res, err := http.DefaultClient.Do(req) - require.NoError(t, err) - require.Equal(t, http.StatusCreated, res.StatusCode) -} + indexes := collectIndexes(t, result) + require.Len(t, indexes, 1) + require.Equal(t, indexLink, result.Indexes()[0]) // should be the index we generated -func fetchBlob(t *testing.T, location url.URL) []byte { - req, err := http.Get(location.String()) - require.NoError(t, err) - require.Equal(t, http.StatusOK, req.StatusCode) - data, err := io.ReadAll(req.Body) - require.NoError(t, err) - return data + claims := collectClaims(t, result) + requireContainsIndexClaim(t, claims, root, indexLink) // find an index claim for our root + requireContainsLocationCommitment(t, claims, indexDigest, space) // find a 
location commitment for the index + requireContainsLocationCommitment(t, claims, blobDigest, space) // find a location commitment for the shard + }) }