Refactor event subscriber #636

Merged · 6 commits · Oct 30, 2024
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
@@ -1 +1 @@
* @janezpodhostnik @peterargue @m-Peter @zhangchiqing @ramtinms
* @janezpodhostnik @peterargue @m-Peter @zhangchiqing
6 changes: 3 additions & 3 deletions bootstrap/bootstrap.go
@@ -117,11 +117,11 @@ func (b *Bootstrap) StartEventIngestion(ctx context.Context) error {
Msg("indexing cadence height information")

// create event subscriber
subscriber := ingestion.NewRPCSubscriber(
subscriber := ingestion.NewRPCEventSubscriber(
b.logger,
b.client,
b.config.HeartbeatInterval,
b.config.FlowNetworkID,
b.logger,
latestCadenceHeight,
)

// initialize event ingestion engine
1 change: 0 additions & 1 deletion cmd/run/cmd.go
@@ -268,7 +268,6 @@ func init() {
Cmd.Flags().Float64Var(&cfg.StreamLimit, "stream-limit", 10, "Rate-limits the events sent to the client within one second")
Cmd.Flags().Uint64Var(&cfg.RateLimit, "rate-limit", 50, "Rate-limit requests per second made by the client over any protocol (ws/http)")
Cmd.Flags().StringVar(&cfg.AddressHeader, "address-header", "", "Address header that contains the client IP, this is useful when the server is behind a proxy that sets the source IP of the client. Leave empty if no proxy is used.")
Cmd.Flags().Uint64Var(&cfg.HeartbeatInterval, "heartbeat-interval", 100, "Heartbeat interval for AN event subscription")
Cmd.Flags().UintVar(&cfg.CacheSize, "script-cache-size", 10000, "Cache size used for script execution in items kept in cache")
Cmd.Flags().IntVar(&streamTimeout, "stream-timeout", 3, "Defines the timeout in seconds the server waits for the event to be sent to the client")
Cmd.Flags().Uint64Var(&forceStartHeight, "force-start-height", 0, "Force set starting Cadence height. WARNING: This should only be used locally or for testing, never in production.")
2 changes: 0 additions & 2 deletions config/config.go
@@ -74,8 +74,6 @@ type Config struct {
FilterExpiry time.Duration
// ForceStartCadenceHeight will force set the starting Cadence height, this should be only used for testing or locally.
ForceStartCadenceHeight uint64
// HeartbeatInterval sets custom heartbeat interval for events
HeartbeatInterval uint64
// TracesBucketName sets the GCP bucket name where transaction traces are being stored.
TracesBucketName string
// TracesEnabled sets whether the node is supporting transaction traces.
14 changes: 4 additions & 10 deletions services/ingestion/engine.go
@@ -98,25 +98,19 @@ func (e *Engine) Stop() {
// drops.
// All other errors are unexpected.
func (e *Engine) Run(ctx context.Context) error {
latestCadence, err := e.blocks.LatestCadenceHeight()
if err != nil {
return fmt.Errorf("failed to get latest cadence height: %w", err)
}

e.log.Info().Uint64("start-cadence-height", latestCadence).Msg("starting ingestion")
e.log.Info().Msg("starting ingestion")

e.MarkReady()

for events := range e.subscriber.Subscribe(ctx, latestCadence) {
for events := range e.subscriber.Subscribe(ctx) {
if events.Err != nil {
return fmt.Errorf(
"failure in event subscription at height %d, with: %w",
latestCadence,
"failure in event subscription with: %w",
events.Err,
)
}

err = e.processEvents(events.Events)
err := e.processEvents(events.Events)
if err != nil {
e.log.Error().Err(err).Msg("failed to process EVM events")
return err
21 changes: 10 additions & 11 deletions services/ingestion/engine_test.go
@@ -56,8 +56,8 @@ func TestSerialBlockIngestion(t *testing.T) {

subscriber := &mocks.EventSubscriber{}
subscriber.
On("Subscribe", mock.Anything, mock.AnythingOfType("uint64")).
Return(func(ctx context.Context, latest uint64) <-chan models.BlockEvents {
On("Subscribe", mock.Anything).
Return(func(ctx context.Context) <-chan models.BlockEvents {
return eventsChan
})

@@ -136,8 +136,8 @@ func TestSerialBlockIngestion(t *testing.T) {
eventsChan := make(chan models.BlockEvents)
subscriber := &mocks.EventSubscriber{}
subscriber.
On("Subscribe", mock.Anything, mock.AnythingOfType("uint64")).
Return(func(ctx context.Context, latest uint64) <-chan models.BlockEvents {
On("Subscribe", mock.Anything).
Return(func(ctx context.Context) <-chan models.BlockEvents {
return eventsChan
})

@@ -246,8 +246,8 @@ func TestBlockAndTransactionIngestion(t *testing.T) {
eventsChan := make(chan models.BlockEvents)
subscriber := &mocks.EventSubscriber{}
subscriber.
On("Subscribe", mock.Anything, mock.AnythingOfType("uint64")).
Return(func(ctx context.Context, latest uint64) <-chan models.BlockEvents {
On("Subscribe", mock.Anything).
Return(func(ctx context.Context) <-chan models.BlockEvents {
return eventsChan
})

@@ -349,8 +349,8 @@ func TestBlockAndTransactionIngestion(t *testing.T) {
eventsChan := make(chan models.BlockEvents)
subscriber := &mocks.EventSubscriber{}
subscriber.
On("Subscribe", mock.Anything, mock.AnythingOfType("uint64")).
Return(func(ctx context.Context, latest uint64) <-chan models.BlockEvents {
On("Subscribe", mock.Anything).
Return(func(ctx context.Context) <-chan models.BlockEvents {
return eventsChan
})

@@ -448,9 +448,8 @@ func TestBlockAndTransactionIngestion(t *testing.T) {
eventsChan := make(chan models.BlockEvents)
subscriber := &mocks.EventSubscriber{}
subscriber.
On("Subscribe", mock.Anything, mock.AnythingOfType("uint64")).
Return(func(ctx context.Context, latest uint64) <-chan models.BlockEvents {
assert.Equal(t, latestCadenceHeight, latest)
On("Subscribe", mock.Anything).
Return(func(ctx context.Context) <-chan models.BlockEvents {
return eventsChan
}).
Once()
@@ -24,33 +24,35 @@ type EventSubscriber interface {
//
// The BlockEvents type will contain an optional error in case
// the error happens, the consumer of the chanel should handle it.
Subscribe(ctx context.Context, height uint64) <-chan models.BlockEvents
Subscribe(ctx context.Context) <-chan models.BlockEvents
}

var _ EventSubscriber = &RPCSubscriber{}
var _ EventSubscriber = &RPCEventSubscriber{}

type RPCSubscriber struct {
client *requester.CrossSporkClient
chain flowGo.ChainID
heartbeatInterval uint64
logger zerolog.Logger
type RPCEventSubscriber struct {
logger zerolog.Logger

client *requester.CrossSporkClient
chain flowGo.ChainID
height uint64

recovery bool
recoveredEvents []flow.Event
}

func NewRPCSubscriber(
func NewRPCEventSubscriber(
logger zerolog.Logger,
client *requester.CrossSporkClient,
heartbeatInterval uint64,
chainID flowGo.ChainID,
logger zerolog.Logger,
) *RPCSubscriber {
startHeight uint64,
) *RPCEventSubscriber {
logger = logger.With().Str("component", "subscriber").Logger()
return &RPCSubscriber{
client: client,
heartbeatInterval: heartbeatInterval,
chain: chainID,
logger: logger,
return &RPCEventSubscriber{
logger: logger,

client: client,
chain: chainID,
height: startHeight,
}
}

@@ -59,58 +61,58 @@ func NewRPCSubscriber(
// to listen all new events in the current spork.
//
// If error is encountered during backfill the subscription will end and the response chanel will be closed.
func (r *RPCSubscriber) Subscribe(ctx context.Context, height uint64) <-chan models.BlockEvents {
events := make(chan models.BlockEvents)
func (r *RPCEventSubscriber) Subscribe(ctx context.Context) <-chan models.BlockEvents {
eventsChan := make(chan models.BlockEvents)

go func() {
defer func() {
close(events)
close(eventsChan)
}()

// if the height is from the previous spork, backfill all the events from previous sporks first
if r.client.IsPastSpork(height) {
// if the height is from the previous spork, backfill all the eventsChan from previous sporks first
if r.client.IsPastSpork(r.height) {
r.logger.Info().
Uint64("height", height).
Uint64("height", r.height).
Msg("height found in previous spork, starting to backfill")

// backfill all the missed events, handling of context cancellation is done by the producer
for ev := range r.backfill(ctx, height) {
events <- ev
for ev := range r.backfill(ctx, r.height) {
eventsChan <- ev

if ev.Err != nil {
return
}

// keep updating height, so after we are done back-filling
// it will be at the first height in the current spork
height = ev.Events.CadenceHeight()
r.height = ev.Events.CadenceHeight()
}

// after back-filling is done, increment height by one,
// so we start with the height in the current spork
height = height + 1
r.height = r.height + 1
}

r.logger.Info().
Uint64("next-height", height).
Uint64("next-height", r.height).
Msg("backfilling done, subscribe for live data")

// subscribe in the current spork, handling of context cancellation is done by the producer
for ev := range r.subscribe(ctx, height, access.WithHeartbeatInterval(r.heartbeatInterval)) {
events <- ev
for ev := range r.subscribe(ctx, r.height) {
eventsChan <- ev
}

r.logger.Warn().Msg("ended subscription for events")
}()

return events
return eventsChan
}

// subscribe to events by the provided height and handle any errors.
//
// Subscribing to EVM specific events and handle any disconnection errors
// as well as context cancellations.
func (r *RPCSubscriber) subscribe(ctx context.Context, height uint64, opts ...access.SubscribeOption) <-chan models.BlockEvents {
func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-chan models.BlockEvents {
eventsChan := make(chan models.BlockEvents)

_, err := r.client.GetBlockHeaderByHeight(ctx, height)
@@ -120,7 +122,13 @@ func (r *RPCSubscriber) subscribe(ctx context.Context, height uint64, opts ...ac
return eventsChan
}

eventStream, errChan, err := r.client.SubscribeEventsByBlockHeight(ctx, height, r.blocksFilter(), opts...)
// we always use heartbeat interval of 1 to have the least amount of delay from the access node
eventStream, errChan, err := r.client.SubscribeEventsByBlockHeight(
ctx,
height,
blocksFilter(r.chain),
access.WithHeartbeatInterval(1),
)
Comment on lines +125 to +131

🛠️ Refactor suggestion

Consider making heartbeat interval configurable.

The heartbeat interval is hardcoded to 1 for minimal delay. Consider making this configurable through RPCEventSubscriberConfig to allow for different trade-offs between latency and resource usage in different environments.

 type RPCEventSubscriberConfig struct {
+  HeartbeatInterval uint // defaults to 1 for minimal delay
 }

 func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-chan models.BlockEvents {
   // ...
   eventStream, errChan, err := r.client.SubscribeEventsByBlockHeight(
     ctx,
     height,
     blocksFilter(r.chain),
-    access.WithHeartbeatInterval(1),
+    access.WithHeartbeatInterval(r.config.HeartbeatInterval),
   )
   // ...
 }

Committable suggestion was skipped due to low confidence.
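
For illustration only, here is a minimal sketch of what such a configuration could look like; RPCEventSubscriberConfig, its field, and the default constructor below are the reviewer's proposal plus assumed names, not code that exists in this PR:

package ingestion

// RPCEventSubscriberConfig is a hypothetical options struct, following the
// review suggestion above; it is not part of the merged code.
type RPCEventSubscriberConfig struct {
	// HeartbeatInterval would be forwarded to access.WithHeartbeatInterval.
	// A value of 1 mirrors the current hardcoded setting (lowest latency);
	// larger values trade latency for fewer heartbeat messages from the
	// Access node.
	HeartbeatInterval uint64
}

// DefaultRPCEventSubscriberConfig keeps today's behavior.
func DefaultRPCEventSubscriberConfig() RPCEventSubscriberConfig {
	return RPCEventSubscriberConfig{HeartbeatInterval: 1}
}

NewRPCEventSubscriber could then accept this struct and pass cfg.HeartbeatInterval wherever the literal 1 is used today.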

if err != nil {
eventsChan <- models.NewBlockEventsError(
fmt.Errorf("failed to subscribe to events by block height: %d, with: %w", height, err),
@@ -187,12 +195,12 @@ func (r *RPCSubscriber) subscribe(ctx context.Context, height uint64, opts ...ac
// and check for each event it receives whether we reached the end, if we reach the end it will increase
// the height by one (next height), and check if we are still in previous sporks, if so repeat everything,
// otherwise return.
func (r *RPCSubscriber) backfill(ctx context.Context, height uint64) <-chan models.BlockEvents {
events := make(chan models.BlockEvents)
func (r *RPCEventSubscriber) backfill(ctx context.Context, height uint64) <-chan models.BlockEvents {
eventsChan := make(chan models.BlockEvents)

go func() {
defer func() {
close(events)
close(eventsChan)
}()

for {
@@ -207,7 +215,7 @@ func (r *RPCSubscriber) backfill(ctx context.Context, height uint64) <-chan mode

latestHeight, err := r.client.GetLatestHeightForSpork(ctx, height)
if err != nil {
events <- models.NewBlockEventsError(err)
eventsChan <- models.NewBlockEventsError(err)
return
}

@@ -216,8 +224,8 @@ func (r *RPCSubscriber) backfill(ctx context.Context, height uint64) <-chan mode
Uint64("last-spork-height", latestHeight).
Msg("backfilling spork")

for ev := range r.subscribe(ctx, height, access.WithHeartbeatInterval(1)) {
events <- ev
for ev := range r.subscribe(ctx, height) {
eventsChan <- ev

if ev.Err != nil {
return
@@ -238,48 +246,22 @@ func (r *RPCSubscriber) backfill(ctx context.Context, height uint64) <-chan mode
}
}()

return events
}

// blockFilter define events we subscribe to:
// A.{evm}.EVM.BlockExecuted and A.{evm}.EVM.TransactionExecuted,
// where {evm} is EVM deployed contract address, which depends on the chain ID we configure.
func (r *RPCSubscriber) blocksFilter() flow.EventFilter {
evmAddress := common.Address(systemcontracts.SystemContractsForChain(r.chain).EVMContract.Address)

blockExecutedEvent := common.NewAddressLocation(
nil,
evmAddress,
string(events.EventTypeBlockExecuted),
).ID()

transactionExecutedEvent := common.NewAddressLocation(
nil,
evmAddress,
string(events.EventTypeTransactionExecuted),
).ID()

return flow.EventFilter{
EventTypes: []string{
blockExecutedEvent,
transactionExecutedEvent,
},
}
return eventsChan
}

// fetchMissingData is used as a backup mechanism for fetching EVM-related
// events, when the event streaming API returns an inconsistent response.
// An inconsistent response could be an EVM block that references EVM
// transactions which are not present in the response. It falls back
// to using grpc requests instead of streaming.
func (r *RPCSubscriber) fetchMissingData(
func (r *RPCEventSubscriber) fetchMissingData(
ctx context.Context,
blockEvents flow.BlockEvents,
) models.BlockEvents {
// remove existing events
blockEvents.Events = nil

for _, eventType := range r.blocksFilter().EventTypes {
for _, eventType := range blocksFilter(r.chain).EventTypes {
recoveredEvents, err := r.client.GetEventsForHeightRange(
ctx,
eventType,
Expand Down Expand Up @@ -309,7 +291,7 @@ func (r *RPCSubscriber) fetchMissingData(
// accumulateEventsMissingBlock will keep receiving transaction events until it can produce a valid
// EVM block event containing a block and transactions. At that point it will reset the recovery mode
// and return the valid block events.
func (r *RPCSubscriber) accumulateEventsMissingBlock(events flow.BlockEvents) models.BlockEvents {
func (r *RPCEventSubscriber) accumulateEventsMissingBlock(events flow.BlockEvents) models.BlockEvents {
r.recoveredEvents = append(r.recoveredEvents, events.Events...)
events.Events = r.recoveredEvents

@@ -329,7 +311,7 @@ func (r *RPCSubscriber) accumulateEventsMissingBlock(events flow.BlockEvents) mo
// in which case we might miss one of the events (missing transaction), or it can be
// due to a failure from the system transaction which commits an EVM block, which results
// in missing EVM block event but present transactions.
func (r *RPCSubscriber) recover(
func (r *RPCEventSubscriber) recover(
ctx context.Context,
events flow.BlockEvents,
err error,
@@ -349,3 +331,29 @@ func (r *RPCSubscriber) recover(

return models.NewBlockEventsError(err)
}

// blockFilter define events we subscribe to:
// A.{evm}.EVM.BlockExecuted and A.{evm}.EVM.TransactionExecuted,
// where {evm} is EVM deployed contract address, which depends on the chain ID we configure.
func blocksFilter(chainId flowGo.ChainID) flow.EventFilter {
evmAddress := common.Address(systemcontracts.SystemContractsForChain(chainId).EVMContract.Address)

blockExecutedEvent := common.NewAddressLocation(
nil,
evmAddress,
string(events.EventTypeBlockExecuted),
).ID()

transactionExecutedEvent := common.NewAddressLocation(
nil,
evmAddress,
string(events.EventTypeTransactionExecuted),
).ID()

return flow.EventFilter{
EventTypes: []string{
blockExecutedEvent,
transactionExecutedEvent,
},
}
}
Comment on lines +341 to +359

🛠️ Refactor suggestion

Simplify the blocksFilter function

The blocksFilter function constructs event type IDs using common.NewAddressLocation, which can be streamlined. Consider simplifying the creation of event type identifiers for better readability and maintainability.

Apply the following diff to simplify the code:

 func blocksFilter(chainId flowGo.ChainID) flow.EventFilter {
-    evmAddress := common.Address(systemcontracts.SystemContractsForChain(chainId).EVMContract.Address)
-
-    blockExecutedEvent := common.NewAddressLocation(
-        nil,
-        evmAddress,
-        string(events.EventTypeBlockExecuted),
-    ).ID()
-
-    transactionExecutedEvent := common.NewAddressLocation(
-        nil,
-        evmAddress,
-        string(events.EventTypeTransactionExecuted),
-    ).ID()
+    evmAddress := systemcontracts.
+        SystemContractsForChain(chainId).
+        EVMContract.
+        Address.
+        String()
+
+    blockExecutedEvent := fmt.Sprintf(
+        "A.%s.%s",
+        evmAddress,
+        events.EventTypeBlockExecuted,
+    )
+
+    transactionExecutedEvent := fmt.Sprintf(
+        "A.%s.%s",
+        evmAddress,
+        events.EventTypeTransactionExecuted,
+    )

     return flow.EventFilter{
         EventTypes: []string{
             blockExecutedEvent,
             transactionExecutedEvent,
         },
     }
 }

This refactoring enhances clarity by directly formatting the event type strings.
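
As a quick sanity check on the suggested formatting, the resulting identifiers take the shape A.<evm-address>.EVM.BlockExecuted and A.<evm-address>.EVM.TransactionExecuted, matching the blocksFilter doc comment. A tiny standalone sketch (the address below is a placeholder, not a real deployment address):

package main

import "fmt"

func main() {
	// Placeholder address; in the gateway the real value comes from
	// systemcontracts.SystemContractsForChain(chainID).EVMContract.Address.
	evmAddress := "0000000000000001"

	fmt.Printf("A.%s.EVM.BlockExecuted\n", evmAddress)
	fmt.Printf("A.%s.EVM.TransactionExecuted\n", evmAddress)
}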

