From 89a6bbfcc2abdf79576f95c6e9f6fe8a56c3a2de Mon Sep 17 00:00:00 2001 From: Preetam Jinka Date: Thu, 1 Jan 2015 10:31:00 -0500 Subject: [PATCH 1/4] Add WIP of v0.0.1-rev5 --- cistern.go | 94 ++++++++++++++++++----------------- config/config.go | 14 +++++- device/device.go | 107 ++++++++++++++++++++++++++++++++++++++++ device/device_test.go | 9 ++++ device/registry.go | 61 +++++++++++++++++++++++ device/registry_test.go | 17 +++++++ net/sflow | 2 +- net/snmp | 2 +- version.go | 2 +- 9 files changed, 259 insertions(+), 49 deletions(-) create mode 100644 device/device.go create mode 100644 device/device_test.go create mode 100644 device/registry.go create mode 100644 device/registry_test.go diff --git a/cistern.go b/cistern.go index 429cb9c..68aed9e 100644 --- a/cistern.go +++ b/cistern.go @@ -1,15 +1,19 @@ package main import ( + "encoding/json" "flag" + "fmt" "log" + "net" + "os" "github.com/PreetamJinka/udpchan" "github.com/PreetamJinka/cistern/api" "github.com/PreetamJinka/cistern/config" "github.com/PreetamJinka/cistern/decode" - "github.com/PreetamJinka/cistern/net/snmp" + "github.com/PreetamJinka/cistern/device" "github.com/PreetamJinka/cistern/pipeline" "github.com/PreetamJinka/cistern/state/metrics" "github.com/PreetamJinka/cistern/state/series" @@ -19,75 +23,75 @@ var ( sflowListenAddr = ":6343" apiListenAddr = ":8080" configFile = "/opt/cistern/config.json" - - descOid = snmp.MustParseOID(".1.3.6.1.2.1.1.1.0") - hostnameOid = snmp.MustParseOID(".1.3.6.1.2.1.1.5.0") ) func main() { + + // Flags flag.StringVar(&sflowListenAddr, "sflow-listen-addr", sflowListenAddr, "listen address for sFlow datagrams") flag.StringVar(&apiListenAddr, "api-listen-addr", apiListenAddr, "listen address for HTTP API server") flag.StringVar(&configFile, "config", configFile, "configuration file") + showVersion := flag.Bool("version", false, "Show version") flag.Parse() + log.SetFlags(log.Lshortfile | log.Lmicroseconds) + + if *showVersion { + fmt.Println("Cistern version", version) + os.Exit(0) + } + log.Printf("Cistern version %s starting", version) - log.Printf("Loading configuration file at %s", configFile) + log.Printf("Attempting to load configuration file at %s", configFile) conf, err := config.Load(configFile) if err != nil { - log.Print(err) + log.Printf("Could not load configuration: %v", err) } - for _, device := range conf.SNMPDevices { - // TODO: refactor this part out - go func(dev config.SNMPEntry) { - session, err := snmp.NewSession(dev.Address, dev.User, dev.AuthPassphrase, dev.PrivPassphrase) - if err != nil { - log.Println(err) - return - } + // Log the loaded config + confBytes, err := json.MarshalIndent(conf, " ", " ") + if err != nil { + log.Println("Could not log config:", err) + } else { + log.Println("\n " + string(confBytes)) + } - err = session.Discover() - if err != nil { - log.Printf("[SNMP] Discovery failed for %s", dev.Address) - return - } + registry := device.NewRegistry() + for _, dev := range conf.Devices { + + ip := net.ParseIP(dev.IP) + // Add a device to the registry + registryDev := registry.LookupOrAdd(ip) - resp, err := session.Get(descOid) - if err != nil { - log.Printf("[SNMP] Get desc failed for %s", dev.Address) - return + if dev.SNMP != nil { + // We have an SNMP config + addr := ip.String() + if ip.To4() == nil { + // IPv6 addresses need to be surrounded + // with `[` and `]`. + addr = "[" + addr + "]" } - deviceDesc := "" + port := 161 - if vbinds := resp.Varbinds(); len(vbinds) > 0 { - deviceDesc, err = vbinds[0].GetStringValue() - if err != nil { - log.Printf("[SNMP] Did not get a string value for device description for %s", dev.Address) - return - } + if dev.SNMP.Port != 0 { + port = dev.SNMP.Port } - resp, err = session.Get(hostnameOid) - if err != nil { - log.Printf("[SNMP] Get hostname failed for %s", dev.Address) - return - } + addr = fmt.Sprintf("%s:%d", addr, port) - deviceHostname := "" + err = registryDev.SetSNMP(addr, dev.SNMP.User, dev.SNMP.AuthPassphrase, dev.SNMP.PrivPassphrase) + if err == nil { + log.Println("Successfully created SNMP session with", addr) + log.Println("Starting device discovery") - if vbinds := resp.Varbinds(); len(vbinds) > 0 { - deviceHostname, err = vbinds[0].GetStringValue() - if err != nil { - log.Printf("[SNMP] Did not get a string value for device hostname for %s", dev.Address) - return - } + registryDev.Discover() + } else { + log.Printf("SNMP session creation failed for %v: %v", addr, err) } - - log.Printf("[SNMP] Discovery\n at %s [%s]:\n %s", dev.Address, deviceHostname, deviceDesc) - }(device) + } } // start listening diff --git a/config/config.go b/config/config.go index 08d1de1..818c310 100644 --- a/config/config.go +++ b/config/config.go @@ -6,7 +6,19 @@ import ( ) type Configuration struct { - SNMPDevices []SNMPEntry `json:"snmpDevices"` + Devices []ConfigDevice `json:"devices"` +} + +type ConfigDevice struct { + IP string `json:"ip"` + SNMP *ConfigDeviceSNMP `json:"snmp"` +} + +type ConfigDeviceSNMP struct { + Port int `json:"port"` + User string `json:"user"` + AuthPassphrase string `json:"auth"` + PrivPassphrase string `json:"priv"` } type SNMPEntry struct { diff --git a/device/device.go b/device/device.go new file mode 100644 index 0000000..921380c --- /dev/null +++ b/device/device.go @@ -0,0 +1,107 @@ +package device + +import ( + "log" + "net" + "sync" + + "github.com/PreetamJinka/cistern/net/snmp" +) + +type deviceType int + +const ( + TypeUnknown deviceType = 0 + + TypeNetwork deviceType = 1 << (iota - 1) + TypeLinux +) + +var ( + descOid = snmp.MustParseOID(".1.3.6.1.2.1.1.1.0") + hostnameOid = snmp.MustParseOID(".1.3.6.1.2.1.1.5.0") +) + +// A Device is an entity that sends flows or +// makes information available via SNMP. +type Device struct { + hostname string + desc string + ip net.IP + snmpSession *snmp.Session +} + +// NewDevice returns a new Device with the given IP. +func NewDevice(address net.IP) *Device { + return &Device{ + ip: address, + } +} + +func (d *Device) SetSNMP(address, user, auth, priv string) error { + sess, err := snmp.NewSession(address, user, auth, priv) + if err != nil { + return err + } + + err = sess.Discover() + if err != nil { + return err + } + + d.snmpSession = sess + return nil +} + +func (d *Device) Discover() { + wg := sync.WaitGroup{} + + wg.Add(2) + go func() { + defer wg.Done() + + // Discover hostname + getRes, err := d.snmpSession.Get(hostnameOid) + if err != nil { + log.Printf("[SNMP %v] Could not get hostname: %v", d.ip, err) + return + } + + if vbinds := getRes.Varbinds(); len(vbinds) > 0 { + hostnameStr, err := vbinds[0].GetStringValue() + if err != nil { + log.Printf("[SNMP %v] Invalid GetResponse for hostname: %v", d.ip, err) + return + } + + d.hostname = hostnameStr + + log.Printf("[SNMP %v] Discovered hostname %s", d.ip, hostnameStr) + } + }() + + go func() { + defer wg.Done() + + // Discover description + getRes, err := d.snmpSession.Get(descOid) + if err != nil { + log.Printf("[SNMP %v] Could not get description: %v", d.ip, err) + return + } + + if vbinds := getRes.Varbinds(); len(vbinds) > 0 { + descStr, err := vbinds[0].GetStringValue() + if err != nil { + log.Printf("[SNMP %v] Invalid GetResponse for description: %v", d.ip, err) + return + } + + d.desc = descStr + + log.Printf("[SNMP %v] Discovered desc %s", d.ip, descStr) + } + }() + + wg.Wait() +} diff --git a/device/device_test.go b/device/device_test.go new file mode 100644 index 0000000..2526360 --- /dev/null +++ b/device/device_test.go @@ -0,0 +1,9 @@ +package device + +import ( + "testing" +) + +func TestDevice1(t *testing.T) { + +} diff --git a/device/registry.go b/device/registry.go new file mode 100644 index 0000000..4b4dc22 --- /dev/null +++ b/device/registry.go @@ -0,0 +1,61 @@ +package device + +import ( + "net" + "sync" +) + +// A Registry is a device registry. +type Registry struct { + + // devices is a map from IP address string to Device. + devices map[string]*Device + + sync.Mutex +} + +// NewRegistry creates a new device registry. +func NewRegistry() *Registry { + return &Registry{ + devices: map[string]*Device{}, + } +} + +// Devices returns a slice of devices in the registry. +func (r *Registry) Devices() []*Device { + r.Lock() + defer r.Unlock() + + devices := []*Device{} + + for _, device := range r.devices { + devices = append(devices, device) + } + + return devices +} + +// NumDevices returns the number of devices in the registry. +func (r *Registry) NumDevices() int { + return len(r.devices) +} + +func (r *Registry) Lookup(address net.IP) (*Device, bool) { + dev, present := r.devices[address.String()] + + return dev, present +} + +func (r *Registry) LookupOrAdd(address net.IP) *Device { + r.Lock() + defer r.Unlock() + + dev, present := r.Lookup(address) + if present { + return dev + } + + dev = NewDevice(address) + r.devices[address.String()] = dev + return dev +} diff --git a/device/registry_test.go b/device/registry_test.go new file mode 100644 index 0000000..f05c119 --- /dev/null +++ b/device/registry_test.go @@ -0,0 +1,17 @@ +package device + +import ( + "net" + "testing" +) + +func TestRegistry(t *testing.T) { + reg := NewRegistry() + + t.Log(reg.Lookup(net.ParseIP("127.0.0.1"))) + + dev := NewDevice(net.ParseIP("127.0.0.1")) + + reg.devices[dev.ip.String()] = dev + t.Log(reg.Lookup(net.ParseIP("127.0.0.1"))) +} diff --git a/net/sflow b/net/sflow index 7b8125f..f9e46b7 160000 --- a/net/sflow +++ b/net/sflow @@ -1 +1 @@ -Subproject commit 7b8125fe90b65d1e5c4a680b656cb7797ee9e7b4 +Subproject commit f9e46b71ca5cc52ecc2b64d5d9f67c88dc51b257 diff --git a/net/snmp b/net/snmp index e69491b..1377da8 160000 --- a/net/snmp +++ b/net/snmp @@ -1 +1 @@ -Subproject commit e69491be0103a67b00fc4e1f0aecf23f836b848d +Subproject commit 1377da800f05c1f74a02a0685c3c34cb344258ba diff --git a/version.go b/version.go index c13feb8..e09062f 100644 --- a/version.go +++ b/version.go @@ -1,3 +1,3 @@ package main -const version = "0.0.1-rev4" +const version = "0.0.1-rev5" From bb6ea91c9d34dbbcc7db5a84ecc26a91f3de6207 Mon Sep 17 00:00:00 2001 From: Preetam Jinka Date: Sat, 10 Jan 2015 12:02:23 -0500 Subject: [PATCH 2/4] Update SNMP --- cistern.go | 8 ++++++-- device/device.go | 15 --------------- device/registry.go | 33 ++++++++++++++++++++++++++++++--- net/snmp | 2 +- 4 files changed, 37 insertions(+), 21 deletions(-) diff --git a/cistern.go b/cistern.go index 68aed9e..8153449 100644 --- a/cistern.go +++ b/cistern.go @@ -58,7 +58,11 @@ func main() { log.Println("\n " + string(confBytes)) } - registry := device.NewRegistry() + registry, err := device.NewRegistry() + if err != nil { + log.Fatal(err) + } + for _, dev := range conf.Devices { ip := net.ParseIP(dev.IP) @@ -82,7 +86,7 @@ func main() { addr = fmt.Sprintf("%s:%d", addr, port) - err = registryDev.SetSNMP(addr, dev.SNMP.User, dev.SNMP.AuthPassphrase, dev.SNMP.PrivPassphrase) + err = registry.SetDeviceSNMP(ip, addr, dev.SNMP.User, dev.SNMP.AuthPassphrase, dev.SNMP.PrivPassphrase) if err == nil { log.Println("Successfully created SNMP session with", addr) log.Println("Starting device discovery") diff --git a/device/device.go b/device/device.go index 921380c..5ed1322 100644 --- a/device/device.go +++ b/device/device.go @@ -38,21 +38,6 @@ func NewDevice(address net.IP) *Device { } } -func (d *Device) SetSNMP(address, user, auth, priv string) error { - sess, err := snmp.NewSession(address, user, auth, priv) - if err != nil { - return err - } - - err = sess.Discover() - if err != nil { - return err - } - - d.snmpSession = sess - return nil -} - func (d *Device) Discover() { wg := sync.WaitGroup{} diff --git a/device/registry.go b/device/registry.go index 4b4dc22..2f21bf7 100644 --- a/device/registry.go +++ b/device/registry.go @@ -3,6 +3,8 @@ package device import ( "net" "sync" + + "github.com/PreetamJinka/cistern/net/snmp" ) // A Registry is a device registry. @@ -11,14 +13,22 @@ type Registry struct { // devices is a map from IP address string to Device. devices map[string]*Device + sessionManager *snmp.SessionManager + sync.Mutex } // NewRegistry creates a new device registry. -func NewRegistry() *Registry { - return &Registry{ - devices: map[string]*Device{}, +func NewRegistry() (*Registry, error) { + sessionManager, err := snmp.NewSessionManager() + if err != nil { + return nil, err } + + return &Registry{ + devices: map[string]*Device{}, + sessionManager: sessionManager, + }, nil } // Devices returns a slice of devices in the registry. @@ -59,3 +69,20 @@ func (r *Registry) LookupOrAdd(address net.IP) *Device { r.devices[address.String()] = dev return dev } + +func (r *Registry) SetDeviceSNMP(deviceAddress net.IP, snmpAddr, user, auth, priv string) error { + d := r.LookupOrAdd(deviceAddress) + + sess, err := r.sessionManager.NewSession(snmpAddr, user, auth, priv) + if err != nil { + return err + } + + err = sess.Discover() + if err != nil { + return err + } + + d.snmpSession = sess + return nil +} diff --git a/net/snmp b/net/snmp index 1377da8..11ce5b4 160000 --- a/net/snmp +++ b/net/snmp @@ -1 +1 @@ -Subproject commit 1377da800f05c1f74a02a0685c3c34cb344258ba +Subproject commit 11ce5b4dde2fb77fb244c3c70f6e881715a65bd3 From 5b404e11ae797e8f30c4ca97af2f03ffed507e84 Mon Sep 17 00:00:00 2001 From: Preetam Jinka Date: Sat, 10 Jan 2015 12:06:28 -0500 Subject: [PATCH 3/4] remove Bolt --- .gitmodules | 3 --- state/series/bolt | 1 - 2 files changed, 4 deletions(-) delete mode 160000 state/series/bolt diff --git a/.gitmodules b/.gitmodules index 4a56c17..c98cd54 100644 --- a/.gitmodules +++ b/.gitmodules @@ -4,9 +4,6 @@ [submodule "net/proto"] path = net/proto url = git@github.com:PreetamJinka/proto.git -[submodule "state/series/bolt"] - path = state/series/bolt - url = git@github.com:boltdb/bolt.git [submodule "net/snmp"] path = net/snmp url = git@github.com:PreetamJinka/snmp.git diff --git a/state/series/bolt b/state/series/bolt deleted file mode 160000 index 293da01..0000000 --- a/state/series/bolt +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 293da013aec93d4d96c702cc5ddf1aae17ef0bc3 From 71b570ed8ea5db8b889d695ed8dfd0a5dfd389fe Mon Sep 17 00:00:00 2001 From: Preetam Jinka Date: Wed, 21 Jan 2015 13:18:00 -0500 Subject: [PATCH 4/4] Use catena --- api/hosts.go | 70 ++++++++++++++++++++++--- cistern.go | 5 +- state/metrics/hostRegistry.go | 16 +++++- state/series/engine.go | 99 ++++------------------------------- state/series/engine_test.go | 17 ------ 5 files changed, 93 insertions(+), 114 deletions(-) delete mode 100644 state/series/engine_test.go diff --git a/api/hosts.go b/api/hosts.go index d098477..b699f86 100644 --- a/api/hosts.go +++ b/api/hosts.go @@ -4,8 +4,11 @@ import ( "encoding/json" "log" "net/http" + "strconv" "strings" + "time" + "github.com/PreetamJinka/catena" "github.com/PreetamJinka/cistern/state/metrics" "github.com/PreetamJinka/cistern/state/series" ) @@ -46,15 +49,70 @@ func metricStates(reg *metrics.HostRegistry) http.HandlerFunc { func metricSeries(engine *series.Engine) http.HandlerFunc { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - log.Printf("[http] serving %s", r.URL) - enc := json.NewEncoder(w) + w.Header().Add("Access-Control-Allow-Origin", "*") + w.Header().Add("Content-Type", "application/json") - host := r.URL.Query().Get("host") - metric := r.URL.Query().Get("metric") + log.Printf("[http] serving %s", r.URL) - points := engine.Query(host, metric) + enc := json.NewEncoder(w) - enc.Encode(points) + hostsString := r.URL.Query().Get("hosts") + metricsString := r.URL.Query().Get("metrics") + startString := r.URL.Query().Get("start") + endString := r.URL.Query().Get("end") + + hosts := strings.Split(hostsString, ",") + metrics := strings.Split(metricsString, ",") + + start := int64(0) + end := int64(0) + + now := time.Now().Unix() + + var err error + + if startString != "" { + start, err = strconv.ParseInt(startString, 10, 64) + if err != nil { + panic(err) + } + } else { + start = now - 3600 + } + + if start < 0 { + start = start + now + } + + if endString != "" { + end, err = strconv.ParseInt(endString, 10, 64) + if err != nil { + panic(err) + } + } else { + end = now + } + + if end < 0 { + end = end + now + } + + queryDescs := []catena.QueryDesc{} + + for _, host := range hosts { + for _, metric := range metrics { + queryDescs = append(queryDescs, catena.QueryDesc{ + Source: host, + Metric: metric, + Start: start, + End: end, + }) + } + } + + resp := engine.Query(queryDescs) + + enc.Encode(resp) }) } diff --git a/cistern.go b/cistern.go index 8153449..273f87c 100644 --- a/cistern.go +++ b/cistern.go @@ -7,6 +7,7 @@ import ( "log" "net" "os" + "runtime" "github.com/PreetamJinka/udpchan" @@ -27,6 +28,8 @@ var ( func main() { + runtime.GOMAXPROCS(runtime.NumCPU()) + // Flags flag.StringVar(&sflowListenAddr, "sflow-listen-addr", sflowListenAddr, "listen address for sFlow datagrams") flag.StringVar(&apiListenAddr, "api-listen-addr", apiListenAddr, "listen address for HTTP API server") @@ -137,7 +140,7 @@ func main() { go LogDiagnostics(hostRegistry) - engine, err := series.NewEngine("/tmp/cistern.db") + engine, err := series.NewEngine("/tmp/cistern/series") if err != nil { log.Fatal(err) } diff --git a/state/metrics/hostRegistry.go b/state/metrics/hostRegistry.go index 97de08f..355fbaa 100644 --- a/state/metrics/hostRegistry.go +++ b/state/metrics/hostRegistry.go @@ -5,6 +5,7 @@ import ( "sync" "time" + "github.com/PreetamJinka/catena" "github.com/PreetamJinka/cistern/state/series" ) @@ -85,18 +86,31 @@ func (h *HostRegistry) MetricStates(host string, metrics ...string) []float32 { } func (h *HostRegistry) RunSnapshotter(engine *series.Engine) { + now := time.Now() + + <-time.After(now.Add(time.Minute).Truncate(time.Minute).Sub(now)) + for now := range time.Tick(time.Minute) { h.lock.RLock() + rows := catena.Rows{} + for host, metricReg := range h.hosts { for metric, metricState := range metricReg.metrics { metricVal := metricState.Value() if metricVal == metricVal { - engine.InsertPoint(host, metric, now, metricVal) + rows = append(rows, catena.Row{ + Source: host, + Metric: metric, + Timestamp: now.Unix(), + Value: float64(metricVal), + }) } } } h.lock.RUnlock() + + engine.InsertRows(rows) } } diff --git a/state/series/engine.go b/state/series/engine.go index 86a0eb3..ee621d6 100644 --- a/state/series/engine.go +++ b/state/series/engine.go @@ -1,103 +1,24 @@ package series import ( - "bytes" - "encoding/binary" - "errors" - "time" - - "github.com/PreetamJinka/cistern/state/series/bolt" + "github.com/PreetamJinka/catena" ) type Engine struct { - db *bolt.DB + *catena.DB } -func NewEngine(filename string) (*Engine, error) { - db, err := bolt.Open(filename, 0666, nil) +func NewEngine(baseDir string) (*Engine, error) { + db, err := catena.OpenDB(baseDir) + if err != nil { - return nil, err + db, err = catena.NewDB(baseDir) + if err != nil { + return nil, err + } } return &Engine{ - db: db, + db, }, nil } - -func (e *Engine) InsertPoint(host, metric string, ts time.Time, value float32) error { - return e.db.Update(func(tx *bolt.Tx) error { - hostBucket, err := tx.CreateBucketIfNotExists([]byte(host)) - if err != nil { - return err - } - - metricBucket, err := hostBucket.CreateBucketIfNotExists([]byte(metric)) - if err != nil { - return err - } - - key := &bytes.Buffer{} - val := &bytes.Buffer{} - - err = binary.Write(key, binary.BigEndian, uint64(ts.Unix())) - if err != nil { - return err - } - - err = binary.Write(val, binary.LittleEndian, value) - if err != nil { - return err - } - - return metricBucket.Put(key.Bytes(), val.Bytes()) - }) -} - -type point struct { - Timestamp int64 `json:"ts"` - Value float32 `json:"value"` -} - -func (e *Engine) Query(host, metric string) []point { - result := []point{} - - e.db.View(func(tx *bolt.Tx) error { - hostBucket := tx.Bucket([]byte(host)) - - if hostBucket == nil { - return errors.New("unknown host") - } - - metricBucket := hostBucket.Bucket([]byte(metric)) - if metricBucket == nil { - return errors.New("unknown metric") - } - - c := metricBucket.Cursor() - for k, v := c.First(); k != nil; k, v = c.Next() { - ts, val := decodeKeyValue(k, v) - - result = append(result, point{ - Timestamp: ts.Unix(), - Value: val, - }) - } - - return nil - }) - - return result -} - -func decodeKeyValue(key, value []byte) (time.Time, float32) { - var ts time.Time - - var unixTime uint64 - binary.Read(bytes.NewReader(key), binary.BigEndian, &unixTime) - ts = time.Unix(int64(unixTime), 0) - - var val float32 - binary.Read(bytes.NewReader(value), binary.LittleEndian, &val) - - return ts, val -} diff --git a/state/series/engine_test.go b/state/series/engine_test.go deleted file mode 100644 index 670e2c4..0000000 --- a/state/series/engine_test.go +++ /dev/null @@ -1,17 +0,0 @@ -package series - -import ( - "os" - "testing" -) - -func TestEngine(t *testing.T) { - os.RemoveAll("/tmp/series.db") - - engine, err := NewEngine("/tmp/series.db") - if err != nil { - t.Fatal(err) - } - - t.Log(engine.db.Stats()) -}