Skip to content

Commit

Permalink
delayed batch submitted event sync
Browse files Browse the repository at this point in the history
  • Loading branch information
Sulejman committed Sep 12, 2024
1 parent 2d1ff73 commit 15310f9
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 2 deletions.
30 changes: 28 additions & 2 deletions pkgs/helpers/prost/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,7 @@ func ProcessEvents(block *types.Block, contractABI abi.ABI) {
}
if event.DataMarketAddress.Hex() == config.SettingsObj.DataMarketAddress {
log.Debugf("Daily Task Completed at block %d: day: %s\n", block.Header().Number, event.DayId.String())
// get hash for tx log is coming from
txHash := vLog.TxHash.Hex()
// get receipt for tx log is coming from
receipt, err := Client.TransactionReceipt(context.Background(), common.HexToHash(txHash))
if err != nil {
clients.SendFailureNotification("ProcessEvents", fmt.Sprintf("Unable to fetch transaction receipt for tx %s: %s", txHash, err.Error()), time.Now().String(), "Medium")
Expand Down Expand Up @@ -121,6 +119,34 @@ func ProcessEvents(block *types.Block, contractABI abi.ABI) {
log.Errorln("Unable to set snapshot batch submitted in redis:", err.Error())
}
}
case contractABI.Events["DelayedBatchSubmitted"].ID.Hex():
event, err := Instance.ParseDelayedBatchSubmitted(vLog)
if err != nil {
clients.SendFailureNotification("DelayedBatchSubmittedEvent parse error", err.Error(), time.Now().String(), "High")
log.Errorln("Error unpacking DelayedBatchSubmittedEvent:", err)
continue
}
if event.DataMarketAddress.Hex() == config.SettingsObj.DataMarketAddress {
log.Debugf("Delayed Batch Submitted at block %d: epochId: %s batchId: %s\n", block.Header().Number, event.EpochId.String(), event.BatchId.String())
txHash := vLog.TxHash.Hex()
receipt, err := Client.TransactionReceipt(context.Background(), common.HexToHash(txHash))
if err != nil {
clients.SendFailureNotification("ProcessEvents", fmt.Sprintf("Unable to fetch transaction receipt for tx %s: %s", txHash, err.Error()), time.Now().String(), "Medium")
log.Errorln("Unable to fetch transaction receipt for tx", txHash, err.Error())
continue
}
receiptMarshalled, err := json.Marshal(receipt)
if err != nil {
clients.SendFailureNotification("ProcessEvents", fmt.Sprintf("Unable to marshal transaction receipt for tx %s: %s", txHash, err.Error()), time.Now().String(), "Medium")
log.Errorln("Unable to marshal transaction receipt for tx", txHash, err.Error())
continue
}
receiptString := string(receiptMarshalled)
if err = redis.Set(context.Background(), redis.ReceiptProcessed(txHash), receiptString, time.Hour); err != nil {
clients.SendFailureNotification("ProcessEvents", fmt.Sprintf("Unable to set delayed batch submitted in redis: %s", err.Error()), time.Now().String(), "High")
log.Errorln("Unable to set snapshot batch submitted in redis:", err.Error())
}
}
}

}
Expand Down
1 change: 1 addition & 0 deletions pkgs/helpers/prost/txManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func (tm *TxManager) GetTxReceipt(txHash common.Hash, identifier string) (*types
return err
}
log.Debugf("Fetched receipt for tx %s: %v", txHash.Hex(), receipt)
redis.Delete(context.Background(), redis.ReceiptProcessed(txHash.Hex()))
err = redis.RedisClient.Incr(context.Background(), redis.TransactionReceiptCountByEvent(identifier)).Err()
if err != nil {
clients.SendFailureNotification("GetTxReceipt", fmt.Sprintf("Failed to increment txreceipt count in Redis: %s", err.Error()), time.Now().String(), "Low")
Expand Down

0 comments on commit 15310f9

Please sign in to comment.