-
Notifications
You must be signed in to change notification settings - Fork 1
/
mflow.go
81 lines (68 loc) · 2.1 KB
/
mflow.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package main
import (
"encoding/json"
"fmt"
"sort"
"strings"
log "github.com/Sirupsen/logrus"
prom "github.com/prometheus/client_golang/prometheus"
)
// labelNameValue is one label name/value pair, as carried in the
// "labelNameValuePairs" array of a metric update event.
type labelNameValue struct {
// Name is the label name, decoded from the "name" JSON key.
Name string `json:"name"`
// Value is the label value, decoded from the "value" JSON key.
Value string `json:"value"`
}
// metricUpdateEvent is the decoded form of one incoming JSON message;
// handleIncomingMessage unmarshals raw message bytes into it.
type metricUpdateEvent struct {
// Name is the full metric name. toMetricInfo splits it on "_" and treats
// the last segment as the aggregation type.
Name string `json:"name"`
// LabelNameValuePairs holds the labels attached to this update.
LabelNameValuePairs []labelNameValue `json:"labelNameValuePairs"`
// Value is the numeric value carried by the update.
Value float64 `json:"value"`
}
// byLabelName implements sort.Interface over a slice of label pairs,
// ordering them by ascending label name.
type byLabelName []labelNameValue

// Len reports how many label pairs the slice holds.
func (pairs byLabelName) Len() int {
	return len(pairs)
}

// Swap exchanges the label pairs at positions i and j.
func (pairs byLabelName) Swap(i, j int) {
	pairs[i], pairs[j] = pairs[j], pairs[i]
}

// Less reports whether the pair at i has a name that sorts strictly before
// the name of the pair at j.
func (pairs byLabelName) Less(i, j int) bool {
	left, right := pairs[i].Name, pairs[j].Name
	return left < right
}
// sortedLabelNames returns the label names found in labelNameValuePairs,
// in ascending lexical order.
//
// The input slice is not modified: the names are copied out first and only
// the copy is sorted, so callers never see their pairs reordered as a side
// effect. The result is always non-nil and has exactly one entry per input
// pair (duplicate names are kept).
func sortedLabelNames(labelNameValuePairs []labelNameValue) []string {
	names := make([]string, len(labelNameValuePairs))
	for i := range labelNameValuePairs {
		names[i] = labelNameValuePairs[i].Name
	}
	sort.Strings(names)
	return names
}
// toPrometheusLabels builds a prom.Labels map from the given label pairs,
// keyed by label name. If a name occurs more than once, the value of its
// last occurrence is the one kept. The result is never nil.
func toPrometheusLabels(labelNameValuePairs []labelNameValue) prom.Labels {
	result := map[string]string{}
	for idx := range labelNameValuePairs {
		pair := labelNameValuePairs[idx]
		result[pair.Name] = pair.Value
	}
	return result
}
// toMetricInfo converts a decoded metric update event into a metricInfo
// stamped with ts.
//
// event.Name must have the form "<name>_<type>": everything before the last
// underscore becomes the metric name and everything after it becomes the
// aggregation type. The label names are stored sorted, and the labels are
// stored as a Prometheus label map.
//
// An error is returned, and no metricInfo is produced, when event.Name
// contains no underscore, ends in an underscore (empty type, e.g.
// "requests_"), or has nothing before its last underscore (empty name, e.g.
// "_counter").
func toMetricInfo(event *metricUpdateEvent, ts int64) (*metricInfo, error) {
	// Only the last underscore separates name from type; earlier ones are
	// part of the metric name itself.
	sep := strings.LastIndex(event.Name, "_")
	if sep < 0 || sep == len(event.Name)-1 {
		return nil, fmt.Errorf("Metric %s does not have type suffix", event.Name)
	}
	if sep == 0 {
		return nil, fmt.Errorf("Metric %s does not have a name before its type suffix", event.Name)
	}
	return &metricInfo{
		name:       event.Name[:sep],
		aggrType:   event.Name[sep+1:],
		value:      event.Value,
		labelNames: sortedLabelNames(event.LabelNameValuePairs),
		labels:     toPrometheusLabels(event.LabelNameValuePairs),
		timestamp:  ts,
	}, nil
}
// handleIncomingMessage decodes one raw JSON message into a metric update
// event, converts it into a metricInfo stamped with msgTs, and sends the
// result on minfoChan.
//
// A message that cannot be unmarshalled, or whose event cannot be converted,
// is logged at warning level together with its raw payload and dropped;
// nothing is sent on the channel for it.
func handleIncomingMessage(minfoChan chan *metricInfo,
	msgData []byte, msgTs int64) {
	var update metricUpdateEvent
	if decodeErr := json.Unmarshal(msgData, &update); decodeErr != nil {
		log.Warnf("Failed to unmarshal message: %v (orig: %v)", decodeErr, string(msgData))
		return
	}

	info, convErr := toMetricInfo(&update, msgTs)
	if convErr != nil {
		log.Warnf("Failed to parse metric update event: %v (orig: %v)",
			convErr, string(msgData))
		return
	}

	minfoChan <- info
}