Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[LoadTesting] Code changes to unblock load testing #819

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions api/poktroll/application/types.pulsar.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 14 additions & 9 deletions load-testing/loadtest_manifest_example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,25 @@ is_ephemeral_chain: false

# testnet_node is the URL of the node that the load test will use to query the
# chain and submit transactions.
testnet_node: https://devnet-sophon-validator-rpc.poktroll.com
testnet_node: http://149.28.12.228:26657
# testnet_node: https://testnet-validated-validator-rpc.poktroll.com

# The service ID to request relays from.
service_id: "anvil"
service_id: "protocol-anvil"

# The address of the account that will be used to fund the application accounts
# so that they can stake on the network.
funding_account_address: pokt1awtlw5sjmw2f5lgj8ekdkaqezphgz88rdk93sk # address for faucet account
funding_account_address: pokt1v3mcrj0h2zfekyf2n8m369x4v9wfvdm34hecd9 # address for faucet account

# In non-ephemeral chains, the gateways are identified by their address.
gateways:
- address: pokt15vzxjqklzjtlz7lahe8z2dfe9nm5vxwwmscne4
exposed_url: https://devnet-sophon-gateway-1.poktroll.com
- address: pokt15w3fhfyc0lttv7r585e2ncpf6t2kl9uh8rsnyz
exposed_url: https://devnet-sophon-gateway-2.poktroll.com
- address: pokt1zhmkkd0rh788mc9prfq0m2h88t9ge0j83gnxya
exposed_url: https://devnet-sophon-gateway-3.poktroll.com
- address: pokt18anvrjyxvh2mc3agmxxhd047yna6ccfy2y97cs
exposed_url: https://testnet-gateway-1.us-nj.poktroll.com
- address: pokt1gdwgchpz56uyr2m2g29gvuu3lc8fe3649ew2am
exposed_url: https://testnet-gateway-2.us-nj.poktroll.com
- address: pokt1mm0xn9kxu789las3d4ehfndq3vudnujpdmkpjl
exposed_url: https://testnet-gateway-3.us-nj.poktroll.com
- address: pokt13an9w8plrpvf0vq2jywcc4xc5ggz3tkhmlj4uq
exposed_url: https://testnet-gateway-4.us-nj.poktroll.com
- address: pokt1y0pawkkgd8659n2mq6r80pgu50xkuzn5s62nfj
exposed_url: https://testnet-gateway-5.us-nj.poktroll.com
10 changes: 5 additions & 5 deletions load-testing/tests/relays_stress.feature
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@ Feature: Loading gateway server with relays

Scenario: Incrementing the number of relays and actors
Given localnet is running
And a rate of "1" relay requests per second is sent per application
And a rate of "10" relay requests per second is sent per application
And the following initial actors are staked:
| actor | count |
| application | 4 |
| application | 100 |
| gateway | 1 |
| supplier | 1 |
And more actors are staked as follows:
| actor | actor inc amount | blocks per inc | max actors |
| application | 4 | 10 | 12 |
| gateway | 1 | 10 | 3 |
| supplier | 1 | 10 | 3 |
| application | 50 | 10 | 250 |
| gateway | 1 | 10 | 5 |
| supplier | 1 | 10 | 5 |
When a load of concurrent relay requests are sent from the applications
Then the correct pairs count of claim and proof messages should be committed on-chain
37 changes: 22 additions & 15 deletions pkg/appgateserver/endpoint_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,10 @@ import (
sharedtypes "github.com/pokt-network/poktroll/x/shared/types"
)

// getRelayerUrl returns the next relayer endpoint to use for the given serviceId and rpcType.
// NB: This is a naive implementation of the endpoint selection strategy.
// It is intentionally kept simple for the sake of a clear example, and future
// optimizations (i.e. quality of service implementations) are left as an exercise
// to gateways.
func (app *appGateServer) getRelayerUrl(
rpcType sharedtypes.RPCType,
func (app *appGateServer) getMatchingEndpoints(
sessionFilter shannonsdk.SessionFilter,
requestUrlStr string,
) (supplierEndpoint shannonsdk.Endpoint, err error) {
// AppGateServer uses the custom getRelayerUrl instead of leveraging the SDK's
// filter to select the next endpoint to use.
// This is because it needs to maintain the state of the last selected endpoint
// and have a view on the original request URL to determine the next endpoint.
// This behavior is specific to the AppGateServer and needed by clients that
// need to instrument the endpoint selection strategy, such as the Load testing tool.
rpcType sharedtypes.RPCType,
) ([]shannonsdk.Endpoint, error) {
endpoints, err := sessionFilter.AllEndpoints()
if err != nil {
return nil, err
Expand All @@ -47,6 +35,25 @@ func (app *appGateServer) getRelayerUrl(
return nil, ErrAppGateNoRelayEndpoints
}

return matchingRPCTypeEndpoints, nil
}

// getRelayerUrl returns the next relayer endpoint to use for the given serviceId and rpcType.
// NB: This is a naive implementation of the endpoint selection strategy.
// It is intentionally kept simple for the sake of a clear example, and future
// optimizations (i.e. quality of service implementations) are left as an exercise
// to gateways.
func (app *appGateServer) getRelayerUrl(
requestUrlStr string,
matchingRPCTypeEndpoints []shannonsdk.Endpoint,
) (supplierEndpoint shannonsdk.Endpoint, err error) {
// AppGateServer uses the custom getRelayerUrl instead of leveraging the SDK's
// filter to select the next endpoint to use.
// This is because it needs to maintain the state of the last selected endpoint
// and have a view on the original request URL to determine the next endpoint.
// This behavior is specific to the AppGateServer and needed by clients that
// need to instrument the endpoint selection strategy, such as the Load testing tool.

// Protect the endpointSelectionIndex update from concurrent relay requests.
app.endpointSelectionIndexMu.Lock()
defer app.endpointSelectionIndexMu.Unlock()
Expand Down
8 changes: 7 additions & 1 deletion pkg/appgateserver/sdkadapter/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,9 @@ func (shannonSDK *ShannonSDK) SendRelay(
// for the given appAddress and serviceId.
func (shannonSDK *ShannonSDK) GetSessionSupplierEndpoints(
ctx context.Context,
currentHeight int64,
appAddress, serviceId string,
) (*shannonsdk.SessionFilter, error) {
currentHeight := shannonSDK.blockClient.LastBlock(ctx).Height()
session, err := shannonSDK.sessionClient.GetSession(ctx, appAddress, serviceId, currentHeight)
okdas marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
Expand All @@ -143,3 +143,9 @@ func (shannonSDK *ShannonSDK) GetSessionSupplierEndpoints(

return filteredSession, nil
}

func (shannonSDK *ShannonSDK) GetHeight(
ctx context.Context,
) int64 {
return shannonSDK.blockClient.LastBlock(ctx).Height()
}
252 changes: 252 additions & 0 deletions pkg/appgateserver/sdkadapter/sdk_tmp._go_
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
package sdkadapter

import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"sync"
"time"

"cosmossdk.io/depinject"
cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types"
shannonsdk "github.com/pokt-network/shannon-sdk"

"github.com/pokt-network/poktroll/pkg/client"
"github.com/pokt-network/poktroll/pkg/client/query"
"github.com/pokt-network/poktroll/x/service/types"
)

// ShannonSDK is a wrapper around the Shannon SDK used by the AppGateServer
// to encapsulate the SDK's functionality and dependencies.
type ShannonSDK struct {
blockClient client.BlockClient
sessionClient client.SessionQueryClient
appClient client.ApplicationQueryClient
accountClient client.AccountQueryClient
relayClient *http.Client
signer *shannonsdk.Signer
}

// sessionCache stores session results to reduce redundant queries.
type sessionCache struct {
mu sync.RWMutex
cache map[string]*cachedSession
}

// cachedSession represents a cached session data.
type cachedSession struct {
filter *shannonsdk.SessionFilter
height int64
}

// sessionCacheInstance is a global instance of the session cache.
var sessionCacheInstance = sessionCache{
cache: make(map[string]*cachedSession),
}

// heightCache caches the latest block height to reduce queries.
type heightCache struct {
mu sync.RWMutex
cachedHeight int64
lastUpdated time.Time
invalidationDur time.Duration
}

// blockHeightCache is a global instance of the height cache with a default 1-second invalidation duration.
var blockHeightCache = heightCache{
invalidationDur: time.Second,
}

// cacheKey generates a unique key for the session cache.
func cacheKey(appAddress, serviceId string, height int64) string {
return fmt.Sprintf("%s-%s-%d", appAddress, serviceId, height)
}

// cleanupCache removes outdated entries from the session cache.
func (c *sessionCache) cleanupCache(currentHeight int64) {
c.mu.Lock()
defer c.mu.Unlock()

for key, entry := range c.cache {
if entry.height < currentHeight {
delete(c.cache, key)
}
}
}

func (shannonSDK *ShannonSDK) GetCachedHeight(ctx context.Context) (int64, error) {
blockHeightCache.mu.RLock()
if blockHeightCache.cachedHeight != 0 && time.Since(blockHeightCache.lastUpdated) < blockHeightCache.invalidationDur {
cachedHeight := blockHeightCache.cachedHeight
blockHeightCache.mu.RUnlock()
return cachedHeight, nil
}
blockHeightCache.mu.RUnlock()

blockHeightCache.mu.Lock()
defer blockHeightCache.mu.Unlock()

// Double-check the condition after acquiring the write lock
if blockHeightCache.cachedHeight != 0 && time.Since(blockHeightCache.lastUpdated) < blockHeightCache.invalidationDur {
return blockHeightCache.cachedHeight, nil
}

lastBlock := shannonSDK.blockClient.LastBlock(ctx)
if lastBlock == nil {
return 0, fmt.Errorf("failed to get last block")
}

newHeight := lastBlock.Height()
blockHeightCache.cachedHeight = newHeight
blockHeightCache.lastUpdated = time.Now()

return newHeight, nil
}

// NewShannonSDK creates a new ShannonSDK instance with the given signing key and dependencies.
func NewShannonSDK(
ctx context.Context,
signingKey cryptotypes.PrivKey,
deps depinject.Config,
) (*ShannonSDK, error) {
sessionClient, err := query.NewSessionQuerier(deps)
if err != nil {
return nil, fmt.Errorf("failed to create session querier: %w", err)
}

accountClient, err := query.NewAccountQuerier(deps)
if err != nil {
return nil, fmt.Errorf("failed to create account querier: %w", err)
}

appClient, err := query.NewApplicationQuerier(deps)
if err != nil {
return nil, fmt.Errorf("failed to create application querier: %w", err)
}

var blockClient client.BlockClient
if err = depinject.Inject(deps, &blockClient); err != nil {
return nil, fmt.Errorf("failed to inject block client: %w", err)
}

signer, err := NewSigner(signingKey)
if err != nil {
return nil, fmt.Errorf("failed to create signer: %w", err)
}

return &ShannonSDK{
blockClient: blockClient,
sessionClient: sessionClient,
accountClient: accountClient,
appClient: appClient,
relayClient: http.DefaultClient,
signer: signer,
}, nil
}

// SendRelay builds and sends a relay request to the given endpoint.
func (shannonSDK *ShannonSDK) SendRelay(
ctx context.Context,
appAddress string,
endpoint shannonsdk.Endpoint,
requestBz []byte,
) (*types.RelayResponse, error) {
relayRequest, err := shannonsdk.BuildRelayRequest(endpoint, requestBz)
if err != nil {
return nil, fmt.Errorf("failed to build relay request: %w", err)
}

application, err := shannonSDK.appClient.GetApplication(ctx, appAddress)
if err != nil {
return nil, fmt.Errorf("failed to get application: %w", err)
}

appRing := shannonsdk.ApplicationRing{
PublicKeyFetcher: shannonSDK.accountClient,
Application: application,
}

if _, err = shannonSDK.signer.Sign(ctx, relayRequest, appRing); err != nil {
return nil, fmt.Errorf("failed to sign relay request: %w", err)
}

relayRequestBz, err := relayRequest.Marshal()
if err != nil {
return nil, fmt.Errorf("failed to marshal relay request: %w", err)
}

response, err := shannonSDK.relayClient.Post(
endpoint.Endpoint().Url,
"application/json",
bytes.NewReader(relayRequestBz),
)
if err != nil {
return nil, fmt.Errorf("failed to send relay request: %w", err)
}
defer response.Body.Close()

responseBodyBz, err := io.ReadAll(response.Body)
if err != nil {
return nil, fmt.Errorf("failed to read response body: %w", err)
}

return shannonsdk.ValidateRelayResponse(
ctx,
endpoint.Supplier(),
responseBodyBz,
shannonSDK.accountClient,
)
}

// GetSessionSupplierEndpoints returns the current session's supplier endpoints,
// using a cached result if available.
func (shannonSDK *ShannonSDK) GetSessionSupplierEndpoints(
ctx context.Context,
appAddress, serviceId string,
) (*shannonsdk.SessionFilter, error) {
currentHeight, err := shannonSDK.GetCachedHeight(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get current height: %w", err)
}

key := cacheKey(appAddress, serviceId, currentHeight)

sessionCacheInstance.mu.RLock()
if result, found := sessionCacheInstance.cache[key]; found {
sessionCacheInstance.mu.RUnlock()
return result.filter, nil
}
sessionCacheInstance.mu.RUnlock()

session, err := shannonSDK.sessionClient.GetSession(ctx, appAddress, serviceId, currentHeight)
if err != nil {
return nil, fmt.Errorf("failed to get session: %w", err)
}

// Create a deep copy of the session to ensure immutability
sessionCopy := *session

filteredSession := &shannonsdk.SessionFilter{
Session: &sessionCopy, // Store a pointer to our copy
}

sessionCacheInstance.mu.Lock()
sessionCacheInstance.cache[key] = &cachedSession{
filter: filteredSession,
height: currentHeight,
}
sessionCacheInstance.mu.Unlock()

sessionCacheInstance.cleanupCache(currentHeight)

return filteredSession, nil
}

// SetHeightCacheInvalidationDuration allows configuration of the block height cache invalidation duration.
func SetHeightCacheInvalidationDuration(duration time.Duration) {
blockHeightCache.mu.Lock()
defer blockHeightCache.mu.Unlock()
blockHeightCache.invalidationDur = duration
}
Loading