Skip to content

Commit

Permalink
Merge pull request #25 from PreetamJinka/catena
Browse files Browse the repository at this point in the history
Revision 5
  • Loading branch information
Preetam committed Feb 9, 2015
2 parents 327be1e + 71b570e commit 8ea5920
Show file tree
Hide file tree
Showing 15 changed files with 368 additions and 167 deletions.
3 changes: 0 additions & 3 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@
[submodule "net/proto"]
path = net/proto
url = [email protected]:PreetamJinka/proto.git
[submodule "state/series/bolt"]
path = state/series/bolt
url = [email protected]:boltdb/bolt.git
[submodule "net/snmp"]
path = net/snmp
url = [email protected]:PreetamJinka/snmp.git
70 changes: 64 additions & 6 deletions api/hosts.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ import (
"encoding/json"
"log"
"net/http"
"strconv"
"strings"
"time"

"github.com/PreetamJinka/catena"
"github.com/PreetamJinka/cistern/state/metrics"
"github.com/PreetamJinka/cistern/state/series"
)
Expand Down Expand Up @@ -46,15 +49,70 @@ func metricStates(reg *metrics.HostRegistry) http.HandlerFunc {

func metricSeries(engine *series.Engine) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
log.Printf("[http] serving %s", r.URL)

enc := json.NewEncoder(w)
w.Header().Add("Access-Control-Allow-Origin", "*")
w.Header().Add("Content-Type", "application/json")

host := r.URL.Query().Get("host")
metric := r.URL.Query().Get("metric")
log.Printf("[http] serving %s", r.URL)

points := engine.Query(host, metric)
enc := json.NewEncoder(w)

enc.Encode(points)
hostsString := r.URL.Query().Get("hosts")
metricsString := r.URL.Query().Get("metrics")
startString := r.URL.Query().Get("start")
endString := r.URL.Query().Get("end")

hosts := strings.Split(hostsString, ",")
metrics := strings.Split(metricsString, ",")

start := int64(0)
end := int64(0)

now := time.Now().Unix()

var err error

if startString != "" {
start, err = strconv.ParseInt(startString, 10, 64)
if err != nil {
panic(err)
}
} else {
start = now - 3600
}

if start < 0 {
start = start + now
}

if endString != "" {
end, err = strconv.ParseInt(endString, 10, 64)
if err != nil {
panic(err)
}
} else {
end = now
}

if end < 0 {
end = end + now
}

queryDescs := []catena.QueryDesc{}

for _, host := range hosts {
for _, metric := range metrics {
queryDescs = append(queryDescs, catena.QueryDesc{
Source: host,
Metric: metric,
Start: start,
End: end,
})
}
}

resp := engine.Query(queryDescs)

enc.Encode(resp)
})
}
103 changes: 57 additions & 46 deletions cistern.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
package main

import (
"encoding/json"
"flag"
"fmt"
"log"
"net"
"os"
"runtime"

"github.com/PreetamJinka/udpchan"

"github.com/PreetamJinka/cistern/api"
"github.com/PreetamJinka/cistern/config"
"github.com/PreetamJinka/cistern/decode"
"github.com/PreetamJinka/cistern/net/snmp"
"github.com/PreetamJinka/cistern/device"
"github.com/PreetamJinka/cistern/pipeline"
"github.com/PreetamJinka/cistern/state/metrics"
"github.com/PreetamJinka/cistern/state/series"
Expand All @@ -19,75 +24,81 @@ var (
sflowListenAddr = ":6343"
apiListenAddr = ":8080"
configFile = "/opt/cistern/config.json"

descOid = snmp.MustParseOID(".1.3.6.1.2.1.1.1.0")
hostnameOid = snmp.MustParseOID(".1.3.6.1.2.1.1.5.0")
)

func main() {

runtime.GOMAXPROCS(runtime.NumCPU())

// Flags
flag.StringVar(&sflowListenAddr, "sflow-listen-addr", sflowListenAddr, "listen address for sFlow datagrams")
flag.StringVar(&apiListenAddr, "api-listen-addr", apiListenAddr, "listen address for HTTP API server")
flag.StringVar(&configFile, "config", configFile, "configuration file")
showVersion := flag.Bool("version", false, "Show version")
flag.Parse()

log.SetFlags(log.Lshortfile | log.Lmicroseconds)

if *showVersion {
fmt.Println("Cistern version", version)
os.Exit(0)
}

log.Printf("Cistern version %s starting", version)

log.Printf("Loading configuration file at %s", configFile)
log.Printf("Attempting to load configuration file at %s", configFile)

conf, err := config.Load(configFile)
if err != nil {
log.Print(err)
log.Printf("Could not load configuration: %v", err)
}

for _, device := range conf.SNMPDevices {
// TODO: refactor this part out
go func(dev config.SNMPEntry) {
session, err := snmp.NewSession(dev.Address, dev.User, dev.AuthPassphrase, dev.PrivPassphrase)
if err != nil {
log.Println(err)
return
}
// Log the loaded config
confBytes, err := json.MarshalIndent(conf, " ", " ")
if err != nil {
log.Println("Could not log config:", err)
} else {
log.Println("\n " + string(confBytes))
}

err = session.Discover()
if err != nil {
log.Printf("[SNMP] Discovery failed for %s", dev.Address)
return
}
registry, err := device.NewRegistry()
if err != nil {
log.Fatal(err)
}

resp, err := session.Get(descOid)
if err != nil {
log.Printf("[SNMP] Get desc failed for %s", dev.Address)
return
}
for _, dev := range conf.Devices {

deviceDesc := ""
ip := net.ParseIP(dev.IP)
// Add a device to the registry
registryDev := registry.LookupOrAdd(ip)

if vbinds := resp.Varbinds(); len(vbinds) > 0 {
deviceDesc, err = vbinds[0].GetStringValue()
if err != nil {
log.Printf("[SNMP] Did not get a string value for device description for %s", dev.Address)
return
}
if dev.SNMP != nil {
// We have an SNMP config
addr := ip.String()
if ip.To4() == nil {
// IPv6 addresses need to be surrounded
// with `[` and `]`.
addr = "[" + addr + "]"
}

resp, err = session.Get(hostnameOid)
if err != nil {
log.Printf("[SNMP] Get hostname failed for %s", dev.Address)
return
port := 161

if dev.SNMP.Port != 0 {
port = dev.SNMP.Port
}

deviceHostname := ""
addr = fmt.Sprintf("%s:%d", addr, port)

if vbinds := resp.Varbinds(); len(vbinds) > 0 {
deviceHostname, err = vbinds[0].GetStringValue()
if err != nil {
log.Printf("[SNMP] Did not get a string value for device hostname for %s", dev.Address)
return
}
}
err = registry.SetDeviceSNMP(ip, addr, dev.SNMP.User, dev.SNMP.AuthPassphrase, dev.SNMP.PrivPassphrase)
if err == nil {
log.Println("Successfully created SNMP session with", addr)
log.Println("Starting device discovery")

log.Printf("[SNMP] Discovery\n at %s [%s]:\n %s", dev.Address, deviceHostname, deviceDesc)
}(device)
registryDev.Discover()
} else {
log.Printf("SNMP session creation failed for %v: %v", addr, err)
}
}
}

// start listening
Expand Down Expand Up @@ -129,7 +140,7 @@ func main() {

go LogDiagnostics(hostRegistry)

engine, err := series.NewEngine("/tmp/cistern.db")
engine, err := series.NewEngine("/tmp/cistern/series")
if err != nil {
log.Fatal(err)
}
Expand Down
14 changes: 13 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,19 @@ import (
)

type Configuration struct {
SNMPDevices []SNMPEntry `json:"snmpDevices"`
Devices []ConfigDevice `json:"devices"`
}

type ConfigDevice struct {
IP string `json:"ip"`
SNMP *ConfigDeviceSNMP `json:"snmp"`
}

type ConfigDeviceSNMP struct {
Port int `json:"port"`
User string `json:"user"`
AuthPassphrase string `json:"auth"`
PrivPassphrase string `json:"priv"`
}

type SNMPEntry struct {
Expand Down
92 changes: 92 additions & 0 deletions device/device.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package device

import (
"log"
"net"
"sync"

"github.com/PreetamJinka/cistern/net/snmp"
)

type deviceType int

const (
TypeUnknown deviceType = 0

TypeNetwork deviceType = 1 << (iota - 1)
TypeLinux
)

var (
descOid = snmp.MustParseOID(".1.3.6.1.2.1.1.1.0")
hostnameOid = snmp.MustParseOID(".1.3.6.1.2.1.1.5.0")
)

// A Device is an entity that sends flows or
// makes information available via SNMP.
type Device struct {
hostname string
desc string
ip net.IP
snmpSession *snmp.Session
}

// NewDevice returns a new Device with the given IP.
func NewDevice(address net.IP) *Device {
return &Device{
ip: address,
}
}

func (d *Device) Discover() {
wg := sync.WaitGroup{}

wg.Add(2)
go func() {
defer wg.Done()

// Discover hostname
getRes, err := d.snmpSession.Get(hostnameOid)
if err != nil {
log.Printf("[SNMP %v] Could not get hostname: %v", d.ip, err)
return
}

if vbinds := getRes.Varbinds(); len(vbinds) > 0 {
hostnameStr, err := vbinds[0].GetStringValue()
if err != nil {
log.Printf("[SNMP %v] Invalid GetResponse for hostname: %v", d.ip, err)
return
}

d.hostname = hostnameStr

log.Printf("[SNMP %v] Discovered hostname %s", d.ip, hostnameStr)
}
}()

go func() {
defer wg.Done()

// Discover description
getRes, err := d.snmpSession.Get(descOid)
if err != nil {
log.Printf("[SNMP %v] Could not get description: %v", d.ip, err)
return
}

if vbinds := getRes.Varbinds(); len(vbinds) > 0 {
descStr, err := vbinds[0].GetStringValue()
if err != nil {
log.Printf("[SNMP %v] Invalid GetResponse for description: %v", d.ip, err)
return
}

d.desc = descStr

log.Printf("[SNMP %v] Discovered desc %s", d.ip, descStr)
}
}()

wg.Wait()
}
9 changes: 9 additions & 0 deletions device/device_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package device

import (
"testing"
)

func TestDevice1(t *testing.T) {

}
Loading

0 comments on commit 8ea5920

Please sign in to comment.