-
Notifications
You must be signed in to change notification settings - Fork 5
/
receiver.go
77 lines (68 loc) · 2.12 KB
/
receiver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package mqtt // import "gosrc.io/mqtt"
import (
"io"
"log"
)
type receiver struct {
// Connection to read the MQTT data from
conn io.Reader
// sender is the struct managing the go routine to send
sender sender
// Channel to send back message received (PUBLISH control packets) to the client using the library
messageChannel chan<- Message
// Channel to send back QOS packet (acks) to the internal client process.
qosChannel chan<- QOSResponse
}
// Receiver actually need:
// - Net.conn
// - Sender (to send ack packet when packets requiring acks are received)
// - Error send channel to trigger teardown
// - MessageSendChannel to dispatch messages to client
// Returns teardown channel used to notify when the receiver terminates.
func spawnReceiver(conn io.Reader, messageChannel chan<- Message, s sender) <-chan QOSResponse {
qosChannel := make(chan QOSResponse)
go receiverLoop(conn, qosChannel, messageChannel, s)
return qosChannel
}
// Receive, decode and dispatch messages to the message channel
func receiverLoop(conn io.Reader, qosChannel chan<- QOSResponse, message chan<- Message, s sender) {
var p Marshaller
var err error
Loop:
for {
if p, err = PacketRead(conn); err != nil {
if err == io.EOF {
log.Printf("Connection closed\n")
}
log.Printf("packet read error: %q\n", err)
break Loop
}
// fmt.Printf("Received: %+v\n", p)
sendAckIfNeeded(p, s)
// Only broadcast message back to client when we receive publish packets
switch packetType := p.(type) {
case PublishPacket:
m := Message{}
m.Topic = packetType.Topic
m.Payload = packetType.Payload
message <- m // TODO Back pressure. We may block on processing message if client does not read fast enough. Make sure we can quit.
default:
if ResponsePacket, ok := p.(QOSResponse); ok {
qosChannel <- ResponsePacket
}
}
}
// Loop ended, send receiver close signal
close(qosChannel)
}
// Send acks if needed, depending on packet QOS
func sendAckIfNeeded(pkt Marshaller, s sender) {
switch p := pkt.(type) {
case PublishPacket:
if p.Qos == 1 {
puback := PubAckPacket{ID: p.ID}
buf := puback.Marshall()
s.send(buf)
}
}
}