diff --git a/pkgs/helpers/clients/txClient.go b/pkgs/helpers/clients/txClient.go index c4629a7..ffffe8e 100644 --- a/pkgs/helpers/clients/txClient.go +++ b/pkgs/helpers/clients/txClient.go @@ -25,7 +25,6 @@ type SubmissionBatchSizeRequest struct { type SubmitSubmissionBatchRequest struct { DataMarketAddress string `json:"dataMarket"` BatchCid string `json:"batchCid"` - BatchId string `json:"batchId"` EpochId *big.Int `json:"epochId"` ProjectIds []string `json:"projectIds"` SnapshotCids []string `json:"snapshotCids"` @@ -76,11 +75,10 @@ func SendSubmissionBatchSize(epochId *big.Int, size int) error { return nil } -func SubmitSubmissionBatch(dataMarketAddress string, batchCid string, batchId string, epochId *big.Int, projectIds []string, snapshotCids []string, finalizedCidsRootHash string) error { +func SubmitSubmissionBatch(dataMarketAddress string, batchCid string, epochId *big.Int, projectIds []string, snapshotCids []string, finalizedCidsRootHash string) error { request := SubmitSubmissionBatchRequest{ DataMarketAddress: dataMarketAddress, BatchCid: batchCid, - BatchId: batchId, EpochId: epochId, ProjectIds: projectIds, SnapshotCids: snapshotCids, diff --git a/pkgs/helpers/ipfs/ipfs.go b/pkgs/helpers/ipfs/ipfs.go index b99cef1..807b378 100644 --- a/pkgs/helpers/ipfs/ipfs.go +++ b/pkgs/helpers/ipfs/ipfs.go @@ -17,7 +17,6 @@ var IPFSCon *shell.Shell // Batch represents your data structure type Batch struct { - ID *big.Int `json:"id"` SubmissionIds []string `json:"submissionIds"` Submissions []string `json:"submissions"` RootHash string `json:"roothash"` diff --git a/pkgs/helpers/merkle/merkle.go b/pkgs/helpers/merkle/merkle.go index 94a6686..49447eb 100644 --- a/pkgs/helpers/merkle/merkle.go +++ b/pkgs/helpers/merkle/merkle.go @@ -11,7 +11,6 @@ import ( "fmt" "math/big" "sort" - "strconv" "strings" "sync" "time" @@ -22,8 +21,6 @@ import ( "google.golang.org/protobuf/encoding/protojson" ) -var BatchId int - func UpdateMerkleTree(sortedData []string, tree *imt.IncrementalMerkleTree) (*imt.IncrementalMerkleTree, error) { for _, value := range sortedData { err := tree.AddLeaf([]byte(value)) @@ -101,7 +98,7 @@ func finalizeBatches(batchedKeys [][]string, epochId *big.Int, tree *imt.Increme localProjectValueFrequencies := make(map[string]map[string]int) for _, key := range batch { - val, err := redis.Get(context.Background(), key) // submission data is uuid.submission_json + val, err := redis.Get(context.Background(), key) // submission data is uuid.submission_json if err != nil { clients.SendFailureNotification("finalizeBatches", fmt.Sprintf("Error fetching data from redis: %s", err.Error()), time.Now().String(), "High") @@ -111,11 +108,11 @@ func finalizeBatches(batchedKeys [][]string, epochId *big.Int, tree *imt.Increme log.Debugln(fmt.Sprintf("Processing key %s and value %s", key, val)) - if len(val) == 0 { - clients.SendFailureNotification("finalizeBatches", fmt.Sprintf("Value has expired for key, not being counted in batch: %s", key), time.Now().String(), "High") - log.Errorln("Value has expired for key: ", key) - continue - } + if len(val) == 0 { + clients.SendFailureNotification("finalizeBatches", fmt.Sprintf("Value has expired for key, not being counted in batch: %s", key), time.Now().String(), "High") + log.Errorln("Value has expired for key: ", key) + continue + } parts := strings.Split(key, ".") if len(parts) != 3 { @@ -173,7 +170,7 @@ func finalizeBatches(batchedKeys [][]string, epochId *big.Int, tree *imt.Increme log.Debugln("PIDs and CIDs for epoch: ", epochId, pids, cids) - batchSubmission, err := BuildBatch(allIds, allData, BatchId, epochId, tree, pids, cids) + batchSubmission, err := BuildBatch(allIds, allData, epochId, tree, pids, cids) if err != nil { clients.SendFailureNotification("finalizeBatches", fmt.Sprintf("Batch building error: %s", err.Error()), time.Now().String(), "High") log.Errorln("Error storing the batch: ", err.Error()) @@ -182,24 +179,18 @@ func finalizeBatches(batchedKeys [][]string, epochId *big.Int, tree *imt.Increme mu.Lock() batchSubmissions = append(batchSubmissions, batchSubmission) - BatchId++ mu.Unlock() - log.Debugf("CID: %s Batch: %d", batchSubmission.Cid, BatchId-1) + log.Debugf("CID: %s Epoch: %s", batchSubmission.Cid, epochId.String()) }(batch) } wg.Wait() - ids := []string{} - for _, bs := range batchSubmissions { - ids = append(ids, bs.Batch.ID.String()) - } // Set finalized batches in redis for epochId logEntry := map[string]interface{}{ "epoch_id": epochId.String(), "finalized_batches_count": len(batchSubmissions), - "finalized_batch_ids": ids, "timestamp": time.Now().Unix(), } @@ -252,7 +243,7 @@ func arrangeKeysInBatches(keys []string) [][]string { return batches } -func BuildBatch(dataIds, data []string, id int, epochId *big.Int, tree *imt.IncrementalMerkleTree, pids, cids []string) (*ipfs.BatchSubmission, error) { +func BuildBatch(dataIds, data []string, epochId *big.Int, tree *imt.IncrementalMerkleTree, pids, cids []string) (*ipfs.BatchSubmission, error) { log.Debugln("Building batch for epoch: ", epochId.String()) var err error _, err = UpdateMerkleTree(dataIds, tree) @@ -260,32 +251,31 @@ func BuildBatch(dataIds, data []string, id int, epochId *big.Int, tree *imt.Incr return nil, err } roothash := GetRootHash(tree) - log.Debugln("RootHash for batch ", id, roothash) - batch := &ipfs.Batch{ID: big.NewInt(int64(id)), SubmissionIds: dataIds, Submissions: data, RootHash: roothash, Pids: pids, Cids: cids} + log.Debugln("RootHash for batch in epoch", epochId.String(), roothash) + batch := &ipfs.Batch{SubmissionIds: dataIds, Submissions: data, RootHash: roothash, Pids: pids, Cids: cids} if cid, err := ipfs.StoreOnIPFS(ipfs.IPFSCon, batch); err != nil { - clients.SendFailureNotification("Build Batch", fmt.Sprintf("Error storing batch %d on IPFS: %s", id, err.Error()), time.Now().String(), "High") - log.Errorf("Error storing batch on IPFS: %d", id) + clients.SendFailureNotification("Build Batch", fmt.Sprintf("Error storing batch on IPFS: %s", err.Error()), time.Now().String(), "High") + log.Errorf("Error storing batch on IPFS: %s", err.Error()) return nil, err } else { - log.Debugln("Stored cid for batch ", id, cid) + log.Debugln("Stored cid for batch ", cid) // Set batch building success for epochId logEntry := map[string]interface{}{ "epoch_id": epochId.String(), - "batch_id": id, "batch_cid": cid, "submissions_count": len(data), "submissions": data, "timestamp": time.Now().Unix(), } - if err = redis.SetProcessLog(context.Background(), redis.TriggeredProcessLog(pkgs.BuildBatch, strconv.Itoa(id)), logEntry, 4*time.Hour); err != nil { + if err = redis.SetProcessLog(context.Background(), redis.TriggeredProcessLog(pkgs.BuildBatch, roothash), logEntry, 4*time.Hour); err != nil { clients.SendFailureNotification("BuildBatch", err.Error(), time.Now().String(), "High") log.Errorln("BuildBatch process log error: ", err.Error()) } cidTree, _ := imt.New() if _, err := UpdateMerkleTree(batch.Cids, cidTree); err != nil { - clients.SendFailureNotification("Build Batch", fmt.Sprintf("Error updating merkle tree for batch %d: %s", id, err.Error()), time.Now().String(), "High") + clients.SendFailureNotification("Build Batch", fmt.Sprintf("Error updating merkle tree for batch with roothash %s: %s", roothash, err.Error()), time.Now().String(), "High") log.Errorln("Unable to get finalized root hash: ", err.Error()) return nil, err } diff --git a/pkgs/helpers/prost/contract.go b/pkgs/helpers/prost/contract.go index 60faddb..0212b38 100644 --- a/pkgs/helpers/prost/contract.go +++ b/pkgs/helpers/prost/contract.go @@ -5,7 +5,6 @@ import ( "collector/pkgs" "collector/pkgs/contract" "collector/pkgs/helpers/clients" - "collector/pkgs/helpers/merkle" "collector/pkgs/helpers/redis" "context" "fmt" @@ -73,12 +72,6 @@ func PopulateStateVars() { CurrentEpochID.Set(big.NewInt(0)) } - if output, err := MustQuery[*big.Int](context.Background(), func() (*big.Int, error) { - return Instance.CurrentBatchId(&bind.CallOpts{}, config.SettingsObj.DataMarketContractAddress) - }); err == nil { - merkle.BatchId = int(output.Int64()) - } - if output, err := MustQuery[*big.Int](context.Background(), func() (*big.Int, error) { return Instance.EpochsInADay(&bind.CallOpts{}, config.SettingsObj.DataMarketContractAddress) }); err == nil { diff --git a/pkgs/helpers/prost/processor.go b/pkgs/helpers/prost/processor.go index 77ebe73..1eabaac 100644 --- a/pkgs/helpers/prost/processor.go +++ b/pkgs/helpers/prost/processor.go @@ -215,11 +215,10 @@ func triggerCollectionFlow(epochID *big.Int, headers []string, day *big.Int) { } // now send the actual batches by looping through them for _, batch := range batchSubmissions { - log.Debugln("Submitting batch with CID against batch ID and epoch ID", batch.Cid, batch.Batch.ID.String(), epochID.String()) + log.Debugln("Submitting batch with CID against epoch ID", batch.Cid, epochID.String()) clients.SubmitSubmissionBatch( config.SettingsObj.DataMarketAddress, batch.Cid, - batch.Batch.ID.String(), epochID, batch.Batch.Pids, batch.Batch.Cids, @@ -229,20 +228,7 @@ func triggerCollectionFlow(epochID *big.Int, headers []string, day *big.Int) { time.Sleep(time.Duration(config.SettingsObj.BlockTime*500) * time.Millisecond) } redis.ResetCollectorDBSubmissions(context.Background(), epochID, headers) - // ensure all transactions were included after waiting for new block - // log.Debugln("Verifying all batch submissions") - // txManager.EnsureBatchSubmissionSuccess(epochID) - // if count, err := redis.Get(context.Background(), redis.TransactionReceiptCountByEvent(epochID.String())); count != "" { - // log.Debugf("Transaction receipt fetches for epoch %s: %s", epochID.String(), count) - // n, _ := strconv.Atoi(count) - // if n > len(batchSubmissions)*3 { // giving upto 3 retries per txn - // clients.SendFailureNotification("EnsureBatchSubmissionSuccess", fmt.Sprintf("Too many transaction receipts fetched for epoch %s: %s", epochID.String(), count), time.Now().String(), "Medium") - // log.Errorf("Too many transaction receipts fetched for epoch %s: %s", epochID.String(), count) - // } - // } else if err != nil { - // clients.SendFailureNotification("Redis error", err.Error(), time.Now().String(), "High") - // log.Errorln("Redis error: ", err.Error()) - // } + redis.Delete(context.Background(), redis.TransactionReceiptCountByEvent(epochID.String())) // txManager.EndBatchSubmissionsForEpoch(epochID) diff --git a/pkgs/helpers/prost/txManager.go b/pkgs/helpers/prost/txManager.go index 601b86c..0278f07 100644 --- a/pkgs/helpers/prost/txManager.go +++ b/pkgs/helpers/prost/txManager.go @@ -4,23 +4,22 @@ import ( "collector/config" "collector/pkgs" "collector/pkgs/helpers/clients" - "collector/pkgs/helpers/ipfs" "collector/pkgs/helpers/redis" - "collector/pkgs/helpers/utils" "context" "encoding/json" "errors" "fmt" - "github.com/cenkalti/backoff/v4" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" - log "github.com/sirupsen/logrus" "math/big" "regexp" "strconv" "strings" "sync" "time" + + "github.com/cenkalti/backoff/v4" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + log "github.com/sirupsen/logrus" ) var txManager *TxManager @@ -95,113 +94,6 @@ func (tm *TxManager) GetTxReceipt(txHash common.Hash, identifier string) (*types return receipt, err } -func (tm *TxManager) CommitSubmissionBatches(batchSubmissions []*ipfs.BatchSubmission) { - var wg sync.WaitGroup - accounts := []*Account{} - var requiredAccounts int - - batchDivision := len(batchSubmissions) / config.SettingsObj.PermissibleBatchesPerAccount - if extra := len(batchSubmissions) % config.SettingsObj.PermissibleBatchesPerAccount; extra == 0 { - requiredAccounts = batchDivision - } else { - requiredAccounts = batchDivision + 1 - } - - // try to get required free accounts - for i := 0; i < requiredAccounts; i++ { - if account := tm.accountHandler.GetFreeAccount(false); account != nil { - accounts = append(accounts, account) - } - } - - if len(accounts) == 0 { - log.Warnln("All accounts are occupied, waiting for one account and proceeding with batch submissions") - accounts = append(accounts, tm.accountHandler.GetFreeAccount(true)) - } - - // divide evenly among available accounts - batchesPerAccount := len(batchSubmissions) / len(accounts) - var begin, end int - // send asynchronous batch submissions - for i := 0; i < len(accounts); i++ { - account := accounts[i] - - begin = i * batchesPerAccount - if i == len(accounts)-1 { - end = len(batchSubmissions) - } else { - end = (i + 1) * batchesPerAccount - } - batch := batchSubmissions[begin:end] - - wg.Add(1) - - // Process the batch asynchronously - go func(acc *Account, submissions []*ipfs.BatchSubmission) { - defer wg.Done() - - // Commit each submission batch in the current batch - for _, batchSubmission := range submissions { - tm.CommitSubmissionBatch(acc, batchSubmission) - acc.UpdateAuth(1) - } - - // Release the account after processing the batch - tm.accountHandler.ReleaseAccount(acc) - }(account, batch) - } - - // Wait for all goroutines to complete - wg.Wait() -} - -func (tm *TxManager) CommitSubmissionBatch(account *Account, batchSubmission *ipfs.BatchSubmission) { - var tx *types.Transaction - multiplier := 1 - nonce := account.auth.Nonce.String() - var err error - operation := func() error { - tx, err = Instance.SubmitSubmissionBatch(account.auth, config.SettingsObj.DataMarketContractAddress, batchSubmission.Cid, batchSubmission.Batch.ID, batchSubmission.EpochId, batchSubmission.Batch.Pids, batchSubmission.Batch.Cids, [32]byte(batchSubmission.FinalizedCidsRootHash)) - if err != nil { - multiplier = account.HandleTransactionError(err, multiplier, batchSubmission.Batch.ID.String()) - nonce = account.auth.Nonce.String() - return err - } - return nil - } - if err = backoff.Retry(operation, backoff.WithMaxRetries(backoffInstance, 7)); err != nil { - clients.SendFailureNotification("CommitSubmissionBatch", fmt.Sprintf("Batch %s submission for epoch %s failed after max retries: %s", batchSubmission.Batch.ID.String(), batchSubmission.EpochId.String(), err.Error()), time.Now().String(), "High") - log.Debugf("Batch %s submission for epoch %s failed after max retries: ", batchSubmission.Batch.ID.String(), batchSubmission.EpochId.String()) - return - } - key := redis.BatchSubmissionKey(batchSubmission.Batch.ID.String(), nonce) - batchSubmissionBytes, err := json.Marshal(batchSubmission) - if err != nil { - clients.SendFailureNotification("CommitSubmissionBatch", fmt.Sprintf("Unable to marshal ipfsBatchSubmission: %s", err.Error()), time.Now().String(), "High") - log.Errorln("Unable to marshal ipfsBatchSubmission: ", err.Error()) - } - value := fmt.Sprintf("%s.%s", tx.Hash().Hex(), string(batchSubmissionBytes)) - set := redis.BatchSubmissionSetByEpoch(batchSubmission.EpochId.String()) - err = redis.SetSubmission(context.Background(), key, value, set, 5*time.Minute) - if err != nil { - clients.SendFailureNotification("Redis error", err.Error(), time.Now().String(), "High") - log.Debugln("Redis error: ", err.Error()) - } - log.Debugf("Successfully submitted batch %s with nonce %s, gasPrice %s, tx: %s\n", batchSubmission.Batch.ID.String(), nonce, account.auth.GasPrice.String(), tx.Hash().Hex()) - logEntry := map[string]interface{}{ - "epoch_id": batchSubmission.EpochId.String(), - "batch_id": batchSubmission.Batch.ID, - "tx_hash": tx.Hash().Hex(), - "signer": account.auth.From.Hex(), - "nonce": nonce, - "timestamp": time.Now().Unix(), - } - if err = redis.SetProcessLog(context.Background(), redis.TriggeredProcessLog(pkgs.CommitSubmissionBatch, batchSubmission.Batch.ID.String()), logEntry, 4*time.Hour); err != nil { - clients.SendFailureNotification("CommitSubmissionBatch", err.Error(), time.Now().String(), "High") - log.Errorf("CommitSubmissionBatch process log error: %s ", err.Error()) - } -} - func (tm *TxManager) BatchUpdateRewards(day *big.Int) []string { slotIds := []*big.Int{} submissions := []*big.Int{} @@ -468,160 +360,6 @@ func (tm *TxManager) EnsureRewardUpdateSuccess(day *big.Int) { } } -func (tm *TxManager) EnsureBatchSubmissionSuccess(epochID *big.Int) { - account := tm.accountHandler.GetFreeAccount(true) - defer tm.accountHandler.ReleaseAccount(account) - - txSet := redis.BatchSubmissionSetByEpoch(epochID.String()) - resubmissionIterations := 0 - for { - resubmissionIterations += 1 - keys := redis.RedisClient.SMembers(context.Background(), txSet).Val() - if len(keys) == 0 { - log.Debugln("No transactions remaining for epochID: ", epochID.String()) - if _, err := redis.RedisClient.Del(context.Background(), txSet).Result(); err != nil { - log.Errorf("Unable to delete transaction from redis: %s\n", err.Error()) - } - return - } - if resubmissionIterations > pkgs.MaxBatchRetries { - clients.SendFailureNotification("EnsureBatchSubmissionSuccess", fmt.Sprintf("Reached max retry iterations for epoch: %s", epochID.String()), time.Now().String(), "High") - log.Errorf("Reached max retry iterations for epoch: %s . Batches not confirmed: ", epochID.String()) - for _, key := range keys { - if _, err := redis.RedisClient.Del(context.Background(), key).Result(); err != nil { - log.Errorf("Unable to delete transaction from redis: %s\n", err.Error()) - } - if err := redis.RemoveFromSet(context.Background(), txSet, key); err != nil { - log.Errorf("Unable to delete transaction from transaction set: %s\n", err.Error()) - } - } - if _, err := redis.RedisClient.Del(context.Background(), txSet).Result(); err != nil { - log.Errorf("Unable to delete transaction from redis: %s\n", err.Error()) - } - return - } - log.Debugf("Fetched %d transactions for epoch %d", len(keys), epochID) - for _, key := range keys { - if value, err := redis.Get(context.Background(), key); err != nil { - clients.SendFailureNotification("EnsureBatchSubmissionSuccess", fmt.Sprintf("Unable to fetch value for key: %s\n", key), time.Now().String(), "High") - log.Errorf("Unable to fetch value for key: %s\n", key) - if _, err := redis.RedisClient.Del(context.Background(), key).Result(); err != nil { - log.Errorf("Unable to delete transaction from redis: %s\n", err.Error()) - } - if err = redis.RemoveFromSet(context.Background(), txSet, key); err != nil { - log.Errorf("Unable to delete transaction from transaction set: %s\n", err.Error()) - } - continue - } else { - vals := strings.Split(value, ".") - if len(vals) < 2 { - clients.SendFailureNotification("EnsureBatchSubmissionSuccess", fmt.Sprintf("txKey %s value in redis should have 2 parts: %s", key, value), time.Now().String(), "High") - log.Errorf("txKey %s value in redis should have 2 parts: %s", key, value) - if _, err := redis.RedisClient.Del(context.Background(), key).Result(); err != nil { - log.Errorf("Unable to delete transaction from redis: %s\n", err.Error()) - } - if err = redis.RemoveFromSet(context.Background(), txSet, key); err != nil { - log.Errorf("Unable to delete transaction from transaction set: %s\n", err.Error()) - } - continue - } - batchSubmission := &ipfs.BatchSubmission{} - tx := vals[0] - err = json.Unmarshal([]byte(vals[1]), batchSubmission) - if err != nil { - clients.SendFailureNotification("EnsureBatchSubmissionSuccess", fmt.Sprintf("Unable to unmarshal ipfsBatchSubmission %s: %s", vals[1], err.Error()), time.Now().String(), "High") - log.Errorln("Unable to unmarshal ipfsBatchSubmission: ", err.Error()) - if _, err := redis.RedisClient.Del(context.Background(), key).Result(); err != nil { - log.Errorf("Unable to delete transaction from redis: %s\n", err.Error()) - } - if err = redis.RemoveFromSet(context.Background(), txSet, key); err != nil { - log.Errorf("Unable to delete transaction from transaction set: %s\n", err.Error()) - } - continue - } - nonce := strings.Split(key, ".")[1] - multiplier := 1 - if receipt, err := tm.GetTxReceipt(common.HexToHash(tx), epochID.String()); err != nil { - if receipt != nil { - clients.SendFailureNotification("EnsureBatchSubmissionSuccess", fmt.Sprintf("GetTxReceipt error for %s : %s", tx, err.Error()), time.Now().String(), "Low") - log.Debugf("GetTxReceipt error for tx %s: %s", tx, err.Error()) - } - log.Errorf("Found unsuccessful transaction %s: %s, batchID: %d, nonce: %s", tx, err, batchSubmission.Batch.ID, nonce) - updatedNonce := account.auth.Nonce.String() - if err = account.UpdateGasPrice(context.Background(), multiplier); err != nil { - log.Debugln("Unable to update gas price: ", err.Error()) - } - var reTx *types.Transaction - operation := func() error { - reTx, err = Instance.SubmitSubmissionBatch(account.auth, config.SettingsObj.DataMarketContractAddress, batchSubmission.Cid, batchSubmission.Batch.ID, epochID, batchSubmission.Batch.Pids, batchSubmission.Batch.Cids, [32]byte(batchSubmission.FinalizedCidsRootHash)) - if err != nil { - multiplier = account.HandleTransactionError(err, multiplier, batchSubmission.Batch.ID.String()) - updatedNonce = account.auth.Nonce.String() - return err - } - return nil - } - if err = backoff.Retry(operation, backoff.WithMaxRetries(backoffInstance, 5)); err != nil { - clients.SendFailureNotification("EnsureBatchSubmissionSuccess", fmt.Sprintf("Resubmission for batch %s failed: %s", batchSubmission.Batch.ID.String(), err.Error()), time.Now().String(), "High") - log.Debugf("Resubmission for batch %s failed: ", batchSubmission.Batch.ID.String()) - return - } - txKey := redis.BatchSubmissionKey(batchSubmission.Batch.ID.String(), updatedNonce) - batchSubmissionBytes, err := json.Marshal(batchSubmission) - if err != nil { - clients.SendFailureNotification("EnsureBatchSubmissionSuccess", fmt.Sprintf("Unable to marshal ipfsBatchSubmission: %s", err.Error()), time.Now().String(), "High") - } - txValue := fmt.Sprintf("%s.%s", reTx.Hash().Hex(), string(batchSubmissionBytes)) - if err = redis.SetSubmission(context.Background(), txKey, txValue, txSet, time.Hour); err != nil { - clients.SendFailureNotification("Redis error", err.Error(), time.Now().String(), "High") - log.Errorln("Redis ipfs error: ", err.Error()) - return - } - logEntry := map[string]interface{}{ - "epoch_id": epochID.String(), - "batch_id": batchSubmission.Batch.ID.String(), - "tx_hash": reTx.Hash().Hex(), - "signer": account.auth.From.Hex(), - "nonce": updatedNonce, - "timestamp": time.Now().Unix(), - } - - existingLog, _ := redis.Get(context.Background(), redis.TriggeredProcessLog(pkgs.EnsureBatchSubmissionSuccess, batchSubmission.Batch.ID.String())) - if existingLog == "" { - redis.SetProcessLog(context.Background(), redis.TriggeredProcessLog(pkgs.EnsureBatchSubmissionSuccess, batchSubmission.Batch.ID.String()), logEntry, 4*time.Hour) - } else { // append to existing log entry with another log entry - existingEntries := make(map[string]interface{}) - err = json.Unmarshal([]byte(existingLog), &existingEntries) - if err != nil { - clients.SendFailureNotification("UpdateBatchResubmissionProcessLog", fmt.Sprintf("Unable to unmarshal log entry for resubmission of batch %s: %s", batchSubmission.Batch.ID.String(), err.Error()), time.Now().String(), "High") - log.Errorln("Unable to unmarshal log entry for resubmission of batch: ", batchSubmission.Batch.ID.String()) - redis.SetProcessLog(context.Background(), redis.TriggeredProcessLog(pkgs.EnsureBatchSubmissionSuccess, batchSubmission.Batch.ID.String()), logEntry, 4*time.Hour) - } else { - utils.AppendToLogEntry(existingEntries, "tx_hash", reTx.Hash().Hex()) - utils.AppendToLogEntry(existingEntries, "nonce", updatedNonce) - utils.AppendToLogEntry(existingEntries, "timestamp", time.Now().Unix()) - - redis.SetProcessLog(context.Background(), redis.TriggeredProcessLog(pkgs.EnsureBatchSubmissionSuccess, batchSubmission.Batch.ID.String()), existingEntries, 4*time.Hour) - } - } - } else { - if receipt != nil && receipt.Status == types.ReceiptStatusFailed { - clients.SendFailureNotification("EnsureBatchSubmissionSuccess", fmt.Sprintf(fmt.Sprintf("BatchSubmissionSuccess execution failed: %s", tx)), time.Now().String(), "Low") - log.Debugln("Fetched receipt for unsuccessful tx: ", tx) - } - } - if _, err := redis.RedisClient.Del(context.Background(), key).Result(); err != nil { - log.Errorf("Unable to delete transaction from redis: %s\n", err.Error()) - } - if err = redis.RemoveFromSet(context.Background(), txSet, key); err != nil { - log.Errorf("Unable to delete transaction from transaction set: %s\n", err.Error()) - } - } - } - time.Sleep(time.Second * time.Duration(config.SettingsObj.BlockTime*3)) - } -} - func extractNonceFromError(err string) (uint64, bool) { regex := regexp.MustCompile(`nonce too low: next nonce (\d+), tx nonce \d+`) matches := regex.FindStringSubmatch(err)