diff --git a/app/client/cli/cache/session.go b/app/client/cli/cache/session.go new file mode 100644 index 000000000..6412459fa --- /dev/null +++ b/app/client/cli/cache/session.go @@ -0,0 +1,87 @@ +package cache + +// TODO: add a TTL for cached sessions, since we know the sessions' length +import ( + "encoding/json" + "errors" + "fmt" + + "github.com/pokt-network/pocket/persistence/kvstore" + "github.com/pokt-network/pocket/rpc" +) + +var errSessionNotFound = errors.New("session not found in cache") + +// SessionCache defines the set of methods used to interact with the client-side session cache +type SessionCache interface { + Get(appAddr, chain string) (*rpc.Session, error) + Set(session *rpc.Session) error + Stop() error +} + +// sessionCache stores and retrieves sessions for application+relaychain pairs +// +// It uses a key-value store as backing storage +type sessionCache struct { + // store is the local store for cached sessions + store kvstore.KVStore +} + +// NewSessionCache returns a session cache backed by a kvstore using the provided database path. +func NewSessionCache(databasePath string) (SessionCache, error) { + store, err := kvstore.NewKVStore(databasePath) + if err != nil { + return nil, fmt.Errorf("Error initializing key-value store using path %s: %w", databasePath, err) + } + + return &sessionCache{ + store: store, + }, nil +} + +// Get returns the cached session, if found, for an app+chain combination. +// The caller is responsible to verify that the returned session is valid for the current block height. +// Get is NOT safe to use concurrently +// DISCUSS: do we need concurrency here? +func (s *sessionCache) Get(appAddr, chain string) (*rpc.Session, error) { + key := sessionKey(appAddr, chain) + bz, err := s.store.Get(key) + if err != nil { + return nil, fmt.Errorf("error getting session from the store: %s %w", err.Error(), errSessionNotFound) + } + + var session rpc.Session + if err := json.Unmarshal(bz, &session); err != nil { + return nil, fmt.Errorf("error unmarshalling session from store: %w", err) + } + + return &session, nil +} + +// Set stores the provided session in the cache with the key being the app+chain combination. +// For each app+chain combination, a single session will be stored. Subsequent calls to Set will overwrite the entry for the provided app and chain. +// Set is NOT safe to use concurrently +func (s *sessionCache) Set(session *rpc.Session) error { + bz, err := json.Marshal(*session) + if err != nil { + return fmt.Errorf("error marshalling session for app: %s, chain: %s, session height: %d: %w", session.Application.Address, session.Chain, session.SessionHeight, err) + } + + key := sessionKey(session.Application.Address, session.Chain) + if err := s.store.Set(key, bz); err != nil { + return fmt.Errorf("error storing session for app: %s, chain: %s, session height: %d in the cache: %w", session.Application.Address, session.Chain, session.SessionHeight, err) + } + return nil +} + +// Stop call stop on the backing store. No calls should be made to Get or Set after calling Stop. +func (s *sessionCache) Stop() error { + return s.store.Stop() +} + +// sessionKey returns a key to get/set a session, based on application's address and the relay chain. +// +// The height is not used as part of the key, because for each app+chain combination only one session, i.e. the current one, is of interest. +func sessionKey(appAddr, chain string) []byte { + return []byte(fmt.Sprintf("%s-%s", appAddr, chain)) +} diff --git a/app/client/cli/cache/session_test.go b/app/client/cli/cache/session_test.go new file mode 100644 index 000000000..4b5afbaec --- /dev/null +++ b/app/client/cli/cache/session_test.go @@ -0,0 +1,75 @@ +package cache + +import ( + "os" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/pokt-network/pocket/rpc" +) + +func TestGet(t *testing.T) { + const ( + app1 = "app1Addr" + relaychainEth = "ETH-Goerli" + numSessionBlocks = 4 + sessionHeight = 8 + sessionNumber = 2 + ) + + session1 := &rpc.Session{ + Application: rpc.ProtocolActor{ + ActorType: rpc.Application, + Address: "app1Addr", + Chains: []string{relaychainEth}, + }, + Chain: relaychainEth, + NumSessionBlocks: numSessionBlocks, + SessionHeight: sessionHeight, + SessionNumber: sessionNumber, + } + + testCases := []struct { + name string + cacheContents []*rpc.Session + app string + chain string + expected *rpc.Session + expectedErr error + }{ + { + name: "Return cached session", + cacheContents: []*rpc.Session{session1}, + app: app1, + chain: relaychainEth, + expected: session1, + }, + { + name: "Error returned for session not found in cache", + app: "foo", + chain: relaychainEth, + expectedErr: errSessionNotFound, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + dbPath, err := os.MkdirTemp("", "cacheStoragePath") + require.NoError(t, err) + defer os.RemoveAll(dbPath) + + cache, err := NewSessionCache(dbPath) + require.NoError(t, err) + + for _, s := range tc.cacheContents { + err := cache.Set(s) + require.NoError(t, err) + } + + got, err := cache.Get(tc.app, tc.chain) + require.ErrorIs(t, err, tc.expectedErr) + require.EqualValues(t, tc.expected, got) + }) + } +} diff --git a/app/client/cli/servicer.go b/app/client/cli/servicer.go index 5787d2d7e..0ed35dff4 100644 --- a/app/client/cli/servicer.go +++ b/app/client/cli/servicer.go @@ -4,19 +4,39 @@ import ( "context" "encoding/hex" "encoding/json" + "errors" "fmt" "net/http" "github.com/spf13/cobra" + "github.com/pokt-network/pocket/app/client/cli/cache" "github.com/pokt-network/pocket/app/client/cli/flags" + "github.com/pokt-network/pocket/logger" "github.com/pokt-network/pocket/rpc" coreTypes "github.com/pokt-network/pocket/shared/core/types" "github.com/pokt-network/pocket/shared/crypto" ) +// IMPROVE: make this configurable +const sessionCacheDBPath = "/tmp" + +var ( + errNoSessionCache = errors.New("session cache not set up") + errSessionNotFoundInCache = errors.New("session not found in cache") + errNoMatchingSessionInCache = errors.New("no session matching the requested height found in cache") + + sessionCache cache.SessionCache +) + func init() { rootCmd.AddCommand(NewServicerCommand()) + + var err error + sessionCache, err = cache.NewSessionCache(sessionCacheDBPath) + if err != nil { + logger.Global.Warn().Err(err).Msg("failed to initialize session cache") + } } func NewServicerCommand() *cobra.Command { @@ -52,6 +72,12 @@ Will prompt the user for the *application* account passphrase`, Aliases: []string{}, Args: cobra.ExactArgs(4), RunE: func(cmd *cobra.Command, args []string) error { + defer func() { + if err := sessionCache.Stop(); err != nil { + logger.Global.Warn().Err(err).Msg("failed to stop session cache") + } + }() + applicationAddr := args[0] servicerAddr := args[1] chain := args[2] @@ -115,6 +141,25 @@ func validateServicer(session *rpc.Session, servicerAddress string) (*rpc.Protoc return nil, fmt.Errorf("Error getting servicer: address %s does not match any servicers in the session %d", servicerAddress, session.SessionNumber) } +// getSessionFromCache uses the client-side session cache to fetch a session for app+chain combination at the provided height, if one has already been retrieved and cached. +func getSessionFromCache(c cache.SessionCache, appAddress, chain string, height int64) (*rpc.Session, error) { + if c == nil { + return nil, errNoSessionCache + } + + session, err := c.Get(appAddress, chain) + if err != nil { + return nil, fmt.Errorf("%w: %s", errSessionNotFoundInCache, err.Error()) + } + + // verify the cached session matches the provided height + if height >= session.SessionHeight && height < session.SessionHeight+session.NumSessionBlocks { + return session, nil + } + + return nil, errNoMatchingSessionInCache +} + func getCurrentSession(ctx context.Context, appAddress, chain string) (*rpc.Session, error) { // CONSIDERATION: passing 0 as the height value to get the current session seems more optimal than this. currentHeight, err := getCurrentHeight(ctx) @@ -122,6 +167,11 @@ func getCurrentSession(ctx context.Context, appAddress, chain string) (*rpc.Sess return nil, fmt.Errorf("Error getting current session: %w", err) } + session, err := getSessionFromCache(sessionCache, appAddress, chain, currentHeight) + if err == nil { + return session, nil + } + req := rpc.SessionRequest{ AppAddress: appAddress, Chain: chain, @@ -148,7 +198,17 @@ func getCurrentSession(ctx context.Context, appAddress, chain string) (*rpc.Sess return nil, fmt.Errorf("Error getting current session: Unexpected response %v", resp) } - return resp.JSON200, nil + session = resp.JSON200 + if sessionCache == nil { + logger.Global.Warn().Msg("session cache not available: cannot cache the retrieved session") + return session, nil + } + + if err := sessionCache.Set(session); err != nil { + logger.Global.Warn().Err(err).Msg("failed to store session in cache") + } + + return session, nil } // REFACTOR: reuse this function in all the query commands diff --git a/app/client/cli/servicer_test.go b/app/client/cli/servicer_test.go new file mode 100644 index 000000000..cd84a1e87 --- /dev/null +++ b/app/client/cli/servicer_test.go @@ -0,0 +1,88 @@ +package cli + +import ( + "os" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/pokt-network/pocket/app/client/cli/cache" + "github.com/pokt-network/pocket/rpc" +) + +const ( + testRelaychainEth = "ETH-Goerli" + testSessionHeight = 8 + testCurrentHeight = 9 +) + +func TestGetSessionFromCache(t *testing.T) { + const app1Addr = "app1Addr" + + testCases := []struct { + name string + cachedSessions []*rpc.Session + expected *rpc.Session + expectedErr error + }{ + { + name: "cached session is returned", + cachedSessions: []*rpc.Session{testSession(app1Addr, testSessionHeight)}, + expected: testSession(app1Addr, testSessionHeight), + }, + { + name: "nil session cache returns an error", + expectedErr: errNoSessionCache, + }, + { + name: "session not found in cache", + cachedSessions: []*rpc.Session{testSession("foo", testSessionHeight)}, + expectedErr: errSessionNotFoundInCache, + }, + { + name: "cached session does not match the provided height", + cachedSessions: []*rpc.Session{testSession(app1Addr, 9999999)}, + expectedErr: errNoMatchingSessionInCache, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + var c cache.SessionCache + // prepare cache with test session for this unit test + if len(tc.cachedSessions) > 0 { + dbPath, err := os.MkdirTemp("", "cliCacheStoragePath") + require.NoError(t, err) + defer os.RemoveAll(dbPath) + + c, err = cache.NewSessionCache(dbPath) + require.NoError(t, err) + + for _, s := range tc.cachedSessions { + err := c.Set(s) + require.NoError(t, err) + } + } + + got, err := getSessionFromCache(c, app1Addr, testRelaychainEth, testCurrentHeight) + require.ErrorIs(t, err, tc.expectedErr) + require.EqualValues(t, tc.expected, got) + }) + } +} + +func testSession(appAddr string, height int64) *rpc.Session { + const numSessionBlocks = 4 + + return &rpc.Session{ + Application: rpc.ProtocolActor{ + ActorType: rpc.Application, + Address: appAddr, + Chains: []string{testRelaychainEth}, + }, + Chain: testRelaychainEth, + NumSessionBlocks: numSessionBlocks, + SessionHeight: height, + SessionNumber: (height / numSessionBlocks), // assumes numSessionBlocks never changed + } +}