Skip to content

Commit

Permalink
Fetch logs instead of receipts
Browse files Browse the repository at this point in the history
  • Loading branch information
Sulejman committed Aug 19, 2024
1 parent e87d4e7 commit b196fad
Showing 1 changed file with 30 additions and 24 deletions.
54 changes: 30 additions & 24 deletions pkgs/helpers/prost/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,30 @@ import (
"collector/pkgs/helpers/redis"
"context"
"fmt"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"math/big"
"strconv"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rpc"
log "github.com/sirupsen/logrus"
)

func ProcessEvents(block *types.Block, contractABI abi.ABI) {
var receipts []*types.Receipt
var logs []types.Log
var err error

hash := block.Hash()
filterQuery := ethereum.FilterQuery{
BlockHash: &hash,
Addresses: []common.Address{common.HexToAddress(config.SettingsObj.DataMarketAddress)},
}

operation := func() error {
receipts, err = Client.BlockReceipts(context.Background(), rpc.BlockNumberOrHashWithNumber(rpc.BlockNumber(block.Number().Int64())))
logs, err = Client.FilterLogs(context.Background(), filterQuery)
return err
}

Expand All @@ -32,29 +40,27 @@ func ProcessEvents(block *types.Block, contractABI abi.ABI) {
clients.SendFailureNotification("ProcessEvents", fmt.Sprintf("Error fetching block receipts: %s", err.Error()), time.Now().String(), "High")
return
}
for _, receipt := range receipts {
for _, vLog := range receipt.Logs {
if vLog.Address.Hex() != config.SettingsObj.ContractAddress {
for _, vLog := range logs {
if vLog.Address.Hex() != config.SettingsObj.ContractAddress {
continue
}
switch vLog.Topics[0].Hex() {
case contractABI.Events["EpochReleased"].ID.Hex():
event, err := Instance.ParseEpochReleased(vLog)
if err != nil {
clients.SendFailureNotification("EpochRelease parse error", err.Error(), time.Now().String(), "High")
log.Errorln("Error unpacking epochReleased event:", err)
continue
}
switch vLog.Topics[0].Hex() {
case contractABI.Events["EpochReleased"].ID.Hex():
event, err := Instance.ParseEpochReleased(*vLog)
if err != nil {
clients.SendFailureNotification("EpochRelease parse error", err.Error(), time.Now().String(), "High")
log.Errorln("Error unpacking epochReleased event:", err)
continue
}
if event.DataMarketAddress.Hex() == config.SettingsObj.DataMarketAddress {
log.Debugf("Epoch Released at block %d: %s\n", block.Header().Number, event.EpochId.String())
if CurrentEpochID.Cmp(event.EpochId) < 0 {
CurrentEpochID = event.EpochId
submissionLimit := UpdateSubmissionLimit(new(big.Int).Set(block.Number()))
go processEpoch(event.EpochId, submissionLimit, block)
if err = redis.Set(context.Background(), pkgs.CurrentEpoch, CurrentEpochID.String(), 0); err != nil {
clients.SendFailureNotification("ProcessEvents", fmt.Sprintf("Unable to update current epoch in redis: %s", err.Error()), time.Now().String(), "High")
log.Errorln("Unable to update current epoch in redis:", err.Error())
}
if event.DataMarketAddress.Hex() == config.SettingsObj.DataMarketAddress {
log.Debugf("Epoch Released at block %d: %s\n", block.Header().Number, event.EpochId.String())
if CurrentEpochID.Cmp(event.EpochId) < 0 {
CurrentEpochID = event.EpochId
submissionLimit := UpdateSubmissionLimit(new(big.Int).Set(block.Number()))
go processEpoch(event.EpochId, submissionLimit, block)
if err = redis.Set(context.Background(), pkgs.CurrentEpoch, CurrentEpochID.String(), 0); err != nil {
clients.SendFailureNotification("ProcessEvents", fmt.Sprintf("Unable to update current epoch in redis: %s", err.Error()), time.Now().String(), "High")
log.Errorln("Unable to update current epoch in redis:", err.Error())
}
}
}
Expand Down

0 comments on commit b196fad

Please sign in to comment.