-
Notifications
You must be signed in to change notification settings - Fork 0
/
state_map_connection.go
101 lines (86 loc) · 2.57 KB
/
state_map_connection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package stagelinq
import (
"encoding/json"
"net"
"strings"
)
// State represents a received state value.
//
// A State is built for every stateEmitMessage read by a StateMapConnection
// and is delivered on the channel returned by StateC.
type State struct {
	// Name is copied from the Name field of the emit message.
	Name string
	// Value holds the JSON object decoded from the JSON field of the emit message.
	Value map[string]interface{}
}
// StateMapConnection provides functionality to communicate with the StateMap data source.
//
// Instances are created by NewStateMapConnection, which also starts the
// goroutine that feeds errC and stateC.
type StateMapConnection struct {
	// conn is the message-level connection used to write subscription
	// requests and to read incoming messages.
	conn *messageConnection
	// errC carries the error that stopped the reader goroutine. It is created
	// with a buffer of one and is closed right after that error has been sent.
	errC chan error
	// stateC carries the decoded State values. It is closed when the reader
	// goroutine exits.
	stateC chan *State
}
// stateMapConnectionMessageSet is the message set handed to
// newMessageConnection for StateMap connections. It is built from a single
// stateEmitMessage prototype, which is also the only message type the reader
// goroutine in NewStateMapConnection handles.
// NOTE(review): presumably this set determines which incoming messages the
// connection is able to parse - confirm against newDeviceConnMessageSet.
var stateMapConnectionMessageSet = newDeviceConnMessageSet([]message{
	&stateEmitMessage{},
})
// NewStateMapConnection wraps an existing network connection and returns a StateMapConnection, providing the functionality to subscribe to and receive changes of state values.
// You need to pass the token that you have announced for your own device on the network.
//
// Before returning, the local TCP port is announced for the "StateMap"
// service over the connection. If that write fails, the error is returned,
// no StateMapConnection is created and no reader goroutine is started; conn
// is not closed and remains the caller's responsibility.
//
// On success a goroutine is started that reads messages until the first read
// or JSON decoding error. That error is delivered on ErrorC, after which both
// ErrorC and StateC are closed. The goroutine blocks while a decoded state
// has not been received from StateC, so consumers must keep draining it.
func NewStateMapConnection(conn net.Conn, token Token) (smc *StateMapConnection, err error) {
	msgConn := newMessageConnection(conn, stateMapConnectionMessageSet)

	// Before we do anything else, we announce our TCP source port in-protocol.
	// I have observed SoundSwitch and Resolume doing this, don't know what the purpose is though.
	err = msgConn.WriteMessage(&serviceAnnouncementMessage{
		tokenPrefixedMessage: tokenPrefixedMessage{
			Token: token,
		},
		Service: "StateMap",
		Port:    uint16(getPort(conn.LocalAddr())),
	})
	if err != nil {
		// Without a successful announcement there is no point in starting the
		// reader; report the failure instead of silently dropping it.
		return nil, err
	}

	stateMapConn := &StateMapConnection{
		conn:   msgConn,
		errC:   make(chan error, 1),
		stateC: make(chan *State, 1),
	}

	go func() {
		var readErr error
		defer func() {
			if readErr != nil {
				// errC has a buffer of one and receives exactly one value, so
				// this send cannot block even when nobody is listening.
				stateMapConn.errC <- readErr
				close(stateMapConn.errC)
			}
			close(stateMapConn.stateC)
		}()

		for {
			var msg message
			msg, readErr = msgConn.ReadMessage()
			if readErr != nil {
				return
			}

			// Messages of any other type are ignored.
			emit, ok := msg.(*stateEmitMessage)
			if !ok {
				continue
			}

			state := &State{Name: emit.Name}
			readErr = json.NewDecoder(strings.NewReader(emit.JSON)).Decode(&state.Value)
			if readErr != nil {
				return
			}
			stateMapConn.stateC <- state
		}
	}()

	return stateMapConn, nil
}
// Subscribe asks the StagelinQ device to report changes of the state value
// named by event. It returns the error, if any, from writing the subscription
// request to the connection.
func (smc *StateMapConnection) Subscribe(event string) error {
	// TODO - check what to do with the int field in the state subscribe message, what is that?
	request := &stateSubscribeMessage{Name: event}
	return smc.conn.WriteMessage(request)
}
// StateC exposes the receive-only channel on which this connection delivers
// state changes.
func (smc *StateMapConnection) StateC() <-chan *State {
	var states <-chan *State = smc.stateC
	return states
}
// ErrorC exposes the receive-only channel on which connection errors are
// reported for this connection.
func (smc *StateMapConnection) ErrorC() <-chan error {
	var errs <-chan error = smc.errC
	return errs
}