forked from stith/gorelp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.go
155 lines (133 loc) · 2.92 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package relp
import (
"bufio"
"bytes"
"fmt"
"io"
"log"
"net"
)
// Server holds the state of a RELP listener.
//
// Values are created by NewServer, which fills in every field except done.
type Server struct {
// MessageChannel emits messages as they are received. NewServer creates it
// unbuffered, so the connection handler blocks on each "syslog" message
// until a consumer receives it.
MessageChannel chan ServerMessage
// AutoAck, when true, makes the connection handler acknowledge each
// "syslog" message itself right after sending it on MessageChannel.
AutoAck bool
// listener is the TCP listener on which client connections are accepted.
listener net.Listener
// done is read by the accept loop after each accepted connection to decide
// whether it should stop.
done bool
}
// ServerMessage is a Message received by the server, together with the
// bookkeeping needed to acknowledge it.
type ServerMessage struct {
// Message is the embedded protocol message; its fields (such as Txn and
// Command) are promoted onto ServerMessage.
Message
// Acked is true if the message has been acked. Ack sets it after the
// acknowledgement has been sent successfully.
Acked bool
// source is the connection the message arrived on. It is used internally
// for acking: Ack writes the response to it, and does nothing when it is nil.
source net.Conn
}
// NewServer starts a RELP server listening on TCP host:port, launches the
// accept loop in a background goroutine, and returns the Server.
//
// host may be empty (listen on all interfaces), a host name, an IPv4 address,
// or an IPv6 literal either bare ("::1") or already bracketed ("[::1]").
// autoAck is stored in Server.AutoAck.
//
// On failure to listen, the zero Server and the error from net.Listen are
// returned; no goroutine is started and no channel is created.
func NewServer(host string, port int, autoAck bool) (server Server, err error) {
	// JoinHostPort brackets bare IPv6 literals, which plain "%s:%d" formatting
	// turns into an unparsable address such as "::1:514".
	addr := net.JoinHostPort(host, fmt.Sprint(port))
	if len(host) > 0 && host[0] == '[' {
		// The caller bracketed the literal already; JoinHostPort would
		// bracket it a second time, so keep the plain form.
		addr = fmt.Sprintf("%s:%d", host, port)
	}

	ln, listenErr := net.Listen("tcp", addr)
	if listenErr != nil {
		return Server{}, listenErr
	}

	server = Server{
		MessageChannel: make(chan ServerMessage),
		AutoAck:        autoAck,
		listener:       ln,
	}

	// The accept loop receives its own copy of the Server value; the listener
	// and the channel are shared between the copies.
	go acceptConnections(server)
	return server, nil
}
// Close stops listening for connections and closes the message channel.
//
// An error from closing the listener is logged rather than returned, because
// the method has no return value. Close must be called at most once: closing
// MessageChannel a second time panics.
//
// NOTE(review): connection handlers that are still running are not stopped
// here; one that later sends on the closed MessageChannel would panic.
func (s Server) Close() {
	// NOTE(review): the receiver is a value, so this store lands on a copy and
	// is not visible to the accept loop. The loop actually stops because
	// Accept fails once the listener is closed.
	s.done = true

	if err := s.listener.Close(); err != nil {
		log.Println("Error closing listener:", err)
	}
	close(s.MessageChannel)
}
// handleConnection serves one client connection until the client sends
// "close", the stream ends, or a read/write error occurs. The connection is
// closed when the function returns.
//
// Per received command:
//   - "open":   replies "200 OK" followed by defaultOffer.
//   - "syslog": sends the message on server.MessageChannel (blocking until it
//     is received) and, when server.AutoAck is set, acknowledges it.
//   - "close":  returns without sending a reply.
//   - anything else: replies "500 ERR".
func handleConnection(conn net.Conn, server Server) {
	defer conn.Close()

	reader := bufio.NewReader(conn)
	// Scratch buffer handed to Message.send; reset after every command.
	buffer := new(bytes.Buffer)

	var message ServerMessage
	for {
		var err error
		message.Message, err = readMessage(reader)
		message.Acked = false
		if err != nil {
			if err != io.EOF {
				// Retrying a failed read on the same connection would fail
				// again immediately and spin forever, so give up on it.
				log.Println(err)
				return
			}
			// End of stream. If nothing was parsed there is nothing left to
			// answer; a message delivered together with EOF is still handled
			// below, and the next read then ends the loop.
			if message.Command == "" {
				return
			}
		}
		message.source = conn

		response := Message{
			Txn:     message.Txn,
			Command: "rsp",
		}

		switch message.Command {
		case "open":
			response.Data = []byte("200 OK\n" + defaultOffer)
			if _, err := response.send(buffer, message.source); err != nil {
				log.Println(err)
				return
			}
		case "syslog":
			// The consumer receives a copy taken before any ack, so its
			// Acked field is false even when AutoAck acknowledges below.
			server.MessageChannel <- message
			if server.AutoAck {
				if err := message.Ack(); err != nil {
					log.Println("Error sending syslog ok:", err)
					return
				}
			}
		case "close":
			fmt.Println("Got a close, closing!")
			return
		default:
			log.Println("Got unknown command:", message.Command)
			response.Data = []byte("500 ERR")
			if _, err := response.send(buffer, message.source); err != nil {
				log.Println("Error sending 500 ERR:", err)
				return
			}
		}
		buffer.Reset()
	}
}
// acceptConnections is the accept loop: it hands every accepted connection to
// its own handleConnection goroutine. The loop ends on the first Accept error
// or once server.done is observed to be true after an iteration.
func acceptConnections(server Server) {
	for {
		switch client, acceptErr := server.listener.Accept(); {
		case acceptErr != nil:
			// Any Accept failure ends the loop without logging.
			return
		case client != nil:
			go handleConnection(client, server)
		}

		if server.done {
			return
		}
	}
}
// Ack acknowledges the message by sending an "rsp" reply carrying "200 OK"
// and the message's Txn over the connection the message arrived on.
//
// It returns an error if the message was already acknowledged or if sending
// the reply fails; in the latter case Acked stays false. A message with no
// source connection is a no-op that returns nil and leaves Acked unchanged.
func (m *ServerMessage) Ack() (err error) {
	switch {
	case m.Acked:
		return fmt.Errorf("Called Ack on already-acknowledged message %d.", m.Txn)
	case m.source == nil:
		// The source connection is gone, so there is nobody to answer.
		return nil
	}

	reply := Message{
		Txn:     m.Txn,
		Command: "rsp",
		Data:    []byte("200 OK"),
	}
	if _, err = reply.send(&bytes.Buffer{}, m.source); err == nil {
		m.Acked = true
	}
	return err
}