-
Notifications
You must be signed in to change notification settings - Fork 0
/
subscriber.go
98 lines (85 loc) · 2.38 KB
/
subscriber.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package gaudius
import (
"context"
"math/big"
"time"
"github.com/alecsavvy/gaudius/gen/contracts"
"github.com/ethereum/go-ethereum/common"
)
// ScannerParams configures the block range handed to EventScanner.
// Both bounds are optional pointers; a nil pointer means "not set".
type ScannerParams struct {
	// StartBlock is the block number to begin with.
	// Leave nil to start at the current block.
	StartBlock *uint64

	// EndBlock is the block number at which the subscription is meant to end.
	// Leave nil to continue endlessly.
	EndBlock *uint64
}
// EventScanner streams the ManageEntity events found in the transaction logs
// of consecutive blocks, walking the chain one block at a time.
//
// params may be nil. When params.StartBlock is set the scan begins at that
// block; otherwise it begins one block behind the current chain head. When
// params.EndBlock is set the scan finishes once that block has been processed
// (inclusive); otherwise it keeps following the chain as new blocks appear.
//
// The first return value delivers the events and is closed when the scan
// ends. The second return value is the stop channel: close it (preferred) or
// send a single value on it to end the scan. The stop request is honoured
// between blocks, while waiting for a block or receipt, and while a send to
// the event channel is pending, so an abandoned consumer cannot strand the
// scanner goroutine.
//
// Logs that cannot be parsed as a ManageEntity event are skipped, so
// consumers never receive a nil event. Blocks and receipts that cannot be
// fetched yet are retried every two seconds until they become available or
// the scan is stopped.
func (sdk *AudiusSdk) EventScanner(params *ScannerParams) (<-chan *contracts.EntityManagerManageEntity, chan bool) {
	const (
		// retryDelay gives the chain time to get ahead of the scanner.
		retryDelay = 2 * time.Second
		// blockDelay offsets the scanner from the next block being mined.
		blockDelay = 500 * time.Millisecond
	)

	ctx := context.TODO()
	ch := make(chan *contracts.EntityManagerManageEntity)
	stopSignal := make(chan bool)

	// pause sleeps for d and reports false if a stop was requested meanwhile.
	pause := func(d time.Duration) bool {
		select {
		case <-stopSignal:
			return false
		case <-time.After(d):
			return true
		}
	}

	// startFromHead returns the block just behind the current chain head,
	// clamped at zero so a fresh chain does not underflow.
	startFromHead := func() (uint64, bool) {
		head, err := sdk.AcdcClient.BlockByNumber(ctx, nil)
		if err != nil || head == nil {
			return 0, false
		}
		if n := head.NumberU64(); n > 0 {
			return n - 1, true
		}
		return 0, true
	}

	// Resolve the starting block synchronously so "current block" refers to
	// the moment of the call. If the head lookup fails it is retried inside
	// the goroutine instead of dereferencing a nil block here.
	var (
		start    uint64
		resolved bool
	)
	if params != nil && params.StartBlock != nil {
		start, resolved = *params.StartBlock, true
	} else {
		start, resolved = startFromHead()
	}

	// Copy the end bound so later mutation by the caller cannot affect the scan.
	var end uint64
	bounded := params != nil && params.EndBlock != nil
	if bounded {
		end = *params.EndBlock
	}

	go func() {
		defer close(ch)

		for !resolved {
			if !pause(retryDelay) {
				return
			}
			start, resolved = startFromHead()
		}
		if bounded && start > end {
			return
		}

		for current := start; ; current++ {
			// Check for a stop request before every block, not just once.
			select {
			case <-stopSignal:
				return
			default:
			}

			// SetUint64 avoids the sign flip an int64 conversion could cause.
			number := new(big.Int).SetUint64(current)
			block, err := sdk.AcdcClient.BlockByNumber(ctx, number)
			for err != nil || block == nil {
				// The block is most likely not mined yet; wait and retry.
				if !pause(retryDelay) {
					return
				}
				block, err = sdk.AcdcClient.BlockByNumber(ctx, number)
			}

			for _, tx := range block.Transactions() {
				if tx == nil {
					continue
				}

				receipt, err := sdk.AcdcClient.TransactionReceipt(ctx, tx.Hash())
				for err != nil || receipt == nil {
					if !pause(retryDelay) {
						return
					}
					receipt, err = sdk.AcdcClient.TransactionReceipt(ctx, tx.Hash())
				}

				for _, entry := range receipt.Logs {
					if entry == nil {
						continue
					}
					event, err := sdk.EntityManager.ParseManageEntity(*entry)
					if err != nil || event == nil {
						// Not a ManageEntity log; nothing to deliver.
						continue
					}
					select {
					case ch <- event:
					case <-stopSignal:
						return
					}
				}
			}

			if bounded && current == end {
				return
			}
			if !pause(blockDelay) {
				return
			}
		}
	}()

	return ch, stopSignal
}
// EventSubscriber opens a live event stream by calling EventScanner with nil
// params, i.e. without an explicit start or end block. It returns the
// scanner's event channel and stop channel unchanged.
func (sdk *AudiusSdk) EventSubscriber() (<-chan *contracts.EntityManagerManageEntity, chan bool) {
	events, stop := sdk.EventScanner(nil)
	return events, stop
}
// TxSubscriber scans events through EventScanner(sp) and delivers the first
// event whose transaction hash equals tx.
//
// The returned channel yields at most one event and is then closed. It is
// also closed, without a value, if the scanner's channel closes before a
// matching event is seen. The channel has a buffer of one so the result can
// be delivered even if the caller is not receiving yet.
func (sdk *AudiusSdk) TxSubscriber(sp *ScannerParams, tx common.Hash) <-chan *contracts.EntityManagerManageEntity {
	ch := make(chan *contracts.EntityManagerManageEntity, 1)
	scanner, stopper := sdk.EventScanner(sp)

	go func() {
		delivered := false
		for event := range scanner {
			// After delivery keep draining so the scanner is never left
			// blocked on a send; nil events carry no transaction hash.
			if delivered || event == nil || event.Raw.TxHash != tx {
				continue
			}

			// currently we only have one log per tx, so the first match is
			// the result. The flag guarantees both channels are closed once.
			delivered = true
			close(stopper)
			ch <- event
			close(ch)
		}
		if !delivered {
			// Scanner ended without a match; release any waiting receiver.
			close(ch)
		}
	}()

	return ch
}