diff --git a/.gitmodules b/.gitmodules deleted file mode 100644 index c98cd54..0000000 --- a/.gitmodules +++ /dev/null @@ -1,9 +0,0 @@ -[submodule "proto/sflow"] - path = net/sflow - url = git://github.com/PreetamJinka/sflow.git -[submodule "net/proto"] - path = net/proto - url = git@github.com:PreetamJinka/proto.git -[submodule "net/snmp"] - path = net/snmp - url = git@github.com:PreetamJinka/snmp.git diff --git a/api/api.go b/api/api.go index ab27312..1363f87 100644 --- a/api/api.go +++ b/api/api.go @@ -1,30 +1,138 @@ package api import ( + "encoding/json" + "net" "net/http" - "github.com/PreetamJinka/cistern/state/metrics" + "github.com/VividCortex/siesta" + + "github.com/PreetamJinka/cistern/device" + "github.com/PreetamJinka/cistern/state/flows" "github.com/PreetamJinka/cistern/state/series" ) -type ApiServer struct { - addr string - hostRegistry *metrics.HostRegistry - engine *series.Engine +type APIServer struct { + addr string + deviceRegistry *device.Registry + seriesEngine *series.Engine } -func NewApiServer(address string, reg *metrics.HostRegistry, engine *series.Engine) *ApiServer { - return &ApiServer{ - addr: address, - hostRegistry: reg, - engine: engine, +func NewAPIServer(address string, deviceRegistry *device.Registry, seriesEngine *series.Engine) *APIServer { + return &APIServer{ + addr: address, + deviceRegistry: deviceRegistry, + seriesEngine: seriesEngine, } } -func (s *ApiServer) Run() { - http.Handle("/hosts", hostStatus(s.hostRegistry)) - http.Handle("/metrics", hostMetrics(s.hostRegistry)) - http.Handle("/metricstates", metricStates(s.hostRegistry)) - http.Handle("/metricseries", metricSeries(s.engine)) +func (s *APIServer) Run() { + service := siesta.NewService("/") + + service.AddPre(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + }) + + service.AddPost(func(c siesta.Context, w http.ResponseWriter, r *http.Request, q func()) { + resp := c.Get(responseKey) + err, _ := c.Get(errorKey).(string) + + if resp == nil && err == "" { + return + } + + enc := json.NewEncoder(w) + enc.Encode(APIResponse{ + Data: resp, + Error: err, + }) + }) + + service.Route("GET", "/", "Default page", func(c siesta.Context, w http.ResponseWriter, r *http.Request) { + c.Set(responseKey, "Welcome to the Cistern API!") + }) + + service.Route("GET", "/devices", "Lists sources", func(c siesta.Context, w http.ResponseWriter, r *http.Request) { + type ipHostname struct { + IP string `json:"ip"` + Hostname string `json:"hostname,omitempty"` + } + + devices := []ipHostname{} + + for _, dev := range s.deviceRegistry.Devices() { + devices = append(devices, ipHostname{ + IP: dev.IP().String(), + Hostname: dev.Hostname(), + }) + } + + c.Set(responseKey, devices) + }) + + service.Route("GET", "/devices/:device/metrics", + "Lists metrics for a device", + func(c siesta.Context, w http.ResponseWriter, r *http.Request) { + var params siesta.Params + device := params.String("device", "", "Device name") + err := params.Parse(r.Form) + if err != nil { + c.Set(errorKey, err.Error()) + return + } + + address := net.ParseIP(*device) + dev, present := s.deviceRegistry.Lookup(address) + if !present { + c.Set(errorKey, "device not found") + return + } + + c.Set(responseKey, dev.Metrics()) + }) + + service.Route("GET", "/devices/:device/flows", + "Lists top flows for a device", + func(c siesta.Context, w http.ResponseWriter, r *http.Request) { + var params siesta.Params + device := params.String("device", "", "Device name") + err := params.Parse(r.Form) + if err != nil { + c.Set(errorKey, err.Error()) + return + } + + address := net.ParseIP(*device) + dev, present := s.deviceRegistry.Lookup(address) + if !present { + c.Set(errorKey, "device not found") + return + } + + type flowsResponse struct { + ByBytes []flows.Flow `json:"byBytes"` + ByPackets []flows.Flow `json:"byPackets"` + } + + topTalkers := dev.TopTalkers() + if topTalkers == nil { + c.Set(errorKey, "No active flows") + return + } + + resp := flowsResponse{ + ByBytes: topTalkers.ByBytes(), + ByPackets: topTalkers.ByPackets(), + } + + c.Set(responseKey, resp) + }) + + service.Route("GET", "/series/query", + "Lists metrics for a device", + s.querySeriesRoute()) + + http.Handle("/", service) + go http.ListenAndServe(s.addr, nil) } diff --git a/api/context_keys.go b/api/context_keys.go new file mode 100644 index 0000000..38a1068 --- /dev/null +++ b/api/context_keys.go @@ -0,0 +1,6 @@ +package api + +const ( + responseKey = "response" + errorKey = "error" +) diff --git a/api/hosts.go b/api/hosts.go deleted file mode 100644 index b699f86..0000000 --- a/api/hosts.go +++ /dev/null @@ -1,118 +0,0 @@ -package api - -import ( - "encoding/json" - "log" - "net/http" - "strconv" - "strings" - "time" - - "github.com/PreetamJinka/catena" - "github.com/PreetamJinka/cistern/state/metrics" - "github.com/PreetamJinka/cistern/state/series" -) - -func hostStatus(reg *metrics.HostRegistry) http.HandlerFunc { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - log.Printf("[http] serving %s", r.URL) - - enc := json.NewEncoder(w) - - enc.Encode(reg.Hosts()) - }) -} - -func hostMetrics(reg *metrics.HostRegistry) http.HandlerFunc { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - log.Printf("[http] serving %s", r.URL) - - enc := json.NewEncoder(w) - - host := r.URL.Query().Get("host") - enc.Encode(reg.Metrics(host)) - }) -} - -func metricStates(reg *metrics.HostRegistry) http.HandlerFunc { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - log.Printf("[http] serving %s", r.URL) - - enc := json.NewEncoder(w) - - host := r.URL.Query().Get("host") - metrics := strings.Split(r.URL.Query().Get("metrics"), ",") - - enc.Encode(reg.MetricStates(host, metrics...)) - }) -} - -func metricSeries(engine *series.Engine) http.HandlerFunc { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - - w.Header().Add("Access-Control-Allow-Origin", "*") - w.Header().Add("Content-Type", "application/json") - - log.Printf("[http] serving %s", r.URL) - - enc := json.NewEncoder(w) - - hostsString := r.URL.Query().Get("hosts") - metricsString := r.URL.Query().Get("metrics") - startString := r.URL.Query().Get("start") - endString := r.URL.Query().Get("end") - - hosts := strings.Split(hostsString, ",") - metrics := strings.Split(metricsString, ",") - - start := int64(0) - end := int64(0) - - now := time.Now().Unix() - - var err error - - if startString != "" { - start, err = strconv.ParseInt(startString, 10, 64) - if err != nil { - panic(err) - } - } else { - start = now - 3600 - } - - if start < 0 { - start = start + now - } - - if endString != "" { - end, err = strconv.ParseInt(endString, 10, 64) - if err != nil { - panic(err) - } - } else { - end = now - } - - if end < 0 { - end = end + now - } - - queryDescs := []catena.QueryDesc{} - - for _, host := range hosts { - for _, metric := range metrics { - queryDescs = append(queryDescs, catena.QueryDesc{ - Source: host, - Metric: metric, - Start: start, - End: end, - }) - } - } - - resp := engine.Query(queryDescs) - - enc.Encode(resp) - }) -} diff --git a/api/response.go b/api/response.go new file mode 100644 index 0000000..a484942 --- /dev/null +++ b/api/response.go @@ -0,0 +1,6 @@ +package api + +type APIResponse struct { + Data interface{} `json:"data,omitempty"` + Error string `json:"error,omitempty"` +} diff --git a/api/series_query.go b/api/series_query.go new file mode 100644 index 0000000..345f81b --- /dev/null +++ b/api/series_query.go @@ -0,0 +1,82 @@ +package api + +import ( + "encoding/json" + "net/http" + "time" + + "github.com/PreetamJinka/catena" + "github.com/VividCortex/siesta" +) + +func (s *APIServer) querySeriesRoute() func(siesta.Context, http.ResponseWriter, *http.Request) { + return func(c siesta.Context, w http.ResponseWriter, r *http.Request) { + var params siesta.Params + downsample := params.Int64("downsample", 0, "A downsample value of averages N points at a time") + err := params.Parse(r.Form) + if err != nil { + c.Set(errorKey, err.Error()) + return + } + + var descs []catena.QueryDesc + + dec := json.NewDecoder(r.Body) + err = dec.Decode(&descs) + if err != nil { + c.Set(errorKey, err.Error()) + return + } + + now := time.Now().Unix() + for i, desc := range descs { + if desc.Start <= 0 { + desc.Start += now + } + + if desc.End <= 0 { + desc.End += now + } + + descs[i] = desc + } + + resp := s.seriesEngine.Query(descs) + + if *downsample <= 1 { + c.Set(responseKey, resp) + return + } + + for i, series := range resp.Series { + pointIndex := 0 + seenPoints := 1 + currentPartition := series.Points[0].Timestamp / *downsample + for j, p := range series.Points { + if j == 0 { + continue + } + + if p.Timestamp / *downsample == currentPartition { + series.Points[pointIndex].Value += p.Value + seenPoints++ + } else { + currentPartition = p.Timestamp / *downsample + series.Points[pointIndex].Value /= float64(seenPoints) + pointIndex++ + seenPoints = 1 + series.Points[pointIndex] = p + } + + if j == len(series.Points) { + series.Points[pointIndex].Value /= float64(seenPoints) + } + } + + series.Points = series.Points[:pointIndex] + resp.Series[i] = series + } + + c.Set(responseKey, resp) + } +} diff --git a/cistern.go b/cistern.go index 273f87c..9a1297b 100644 --- a/cistern.go +++ b/cistern.go @@ -15,8 +15,6 @@ import ( "github.com/PreetamJinka/cistern/config" "github.com/PreetamJinka/cistern/decode" "github.com/PreetamJinka/cistern/device" - "github.com/PreetamJinka/cistern/pipeline" - "github.com/PreetamJinka/cistern/state/metrics" "github.com/PreetamJinka/cistern/state/series" ) @@ -35,6 +33,8 @@ func main() { flag.StringVar(&apiListenAddr, "api-listen-addr", apiListenAddr, "listen address for HTTP API server") flag.StringVar(&configFile, "config", configFile, "configuration file") showVersion := flag.Bool("version", false, "Show version") + showLicense := flag.Bool("license", false, "Show software licenses") + showConfig := flag.Bool("show-config", false, "Show loaded config file") flag.Parse() log.SetFlags(log.Lshortfile | log.Lmicroseconds) @@ -44,6 +44,11 @@ func main() { os.Exit(0) } + if *showLicense { + fmt.Println(license) + os.Exit(0) + } + log.Printf("Cistern version %s starting", version) log.Printf("Attempting to load configuration file at %s", configFile) @@ -58,10 +63,17 @@ func main() { if err != nil { log.Println("Could not log config:", err) } else { - log.Println("\n " + string(confBytes)) + if *showConfig { + log.Println("\n " + string(confBytes)) + } + } + + engine, err := series.NewEngine("/tmp/cistern/series") + if err != nil { + log.Fatal(err) } - registry, err := device.NewRegistry() + registry, err := device.NewRegistry(engine.Inbound) if err != nil { log.Fatal(err) } @@ -113,43 +125,18 @@ func main() { sflowDecoder := decode.NewSflowDecoder(c, 16) sflowDecoder.Run() - hostRegistry := metrics.NewHostRegistry() - - processingPipeline := &pipeline.Pipeline{} - processingPipeline.Add(pipeline.NewHostProcessor(hostRegistry)) - processingPipeline.Add(pipeline.NewGenericInterfaceCountersProcessor(hostRegistry)) - processingPipeline.Add(pipeline.NewRawPacketProcessor(hostRegistry)) - - pipelineMessages := make(chan pipeline.Message, 16) - // TODO: refactor this part out go func() { for datagram := range sflowDecoder.Outbound() { - source := datagram.IpAddress.String() - - for _, sample := range datagram.Samples { - for _, record := range sample.GetRecords() { - pipelineMessages <- pipeline.Message{ - Source: source, - Record: record, - } - } - } + source := datagram.IpAddress + + registryDev := registry.LookupOrAdd(source) + registryDev.Inbound <- datagram } }() - processingPipeline.Run(pipelineMessages) - - go LogDiagnostics(hostRegistry) - - engine, err := series.NewEngine("/tmp/cistern/series") - if err != nil { - log.Fatal(err) - } - - api := api.NewApiServer(apiListenAddr, hostRegistry, engine) - api.Run() - log.Printf("started API server listening on %s", apiListenAddr) - go hostRegistry.RunSnapshotter(engine) + apiServer := api.NewAPIServer(apiListenAddr, registry, engine) + apiServer.Run() + log.Printf("API server started on %s", apiListenAddr) // make sure we don't exit <-make(chan struct{}) diff --git a/decode/sflow.go b/decode/sflow.go index 3ae6d46..01505a4 100644 --- a/decode/sflow.go +++ b/decode/sflow.go @@ -1,7 +1,7 @@ package decode import ( - "github.com/PreetamJinka/cistern/net/sflow" + "github.com/PreetamJinka/sflow" "bytes" "log" diff --git a/device/device.go b/device/device.go index 5ed1322..48e36a4 100644 --- a/device/device.go +++ b/device/device.go @@ -4,8 +4,14 @@ import ( "log" "net" "sync" + "time" - "github.com/PreetamJinka/cistern/net/snmp" + "github.com/PreetamJinka/sflow" + "github.com/PreetamJinka/snmp" + + "github.com/PreetamJinka/cistern/state/flows" + "github.com/PreetamJinka/cistern/state/metrics" + "github.com/PreetamJinka/cistern/state/series" ) type deviceType int @@ -15,6 +21,7 @@ const ( TypeNetwork deviceType = 1 << (iota - 1) TypeLinux + TypeBSD ) var ( @@ -25,17 +32,33 @@ var ( // A Device is an entity that sends flows or // makes information available via SNMP. type Device struct { - hostname string - desc string - ip net.IP - snmpSession *snmp.Session + hostname string + desc string + ip net.IP + deviceType deviceType + + snmpSession *snmp.Session + metricRegistry *metrics.MetricRegistry + topTalkers *flows.TopTalkers + + Inbound chan sflow.Datagram + outbound chan series.Observation } // NewDevice returns a new Device with the given IP. -func NewDevice(address net.IP) *Device { - return &Device{ +func NewDevice(address net.IP, outboundObservations chan series.Observation) *Device { + dev := &Device{ ip: address, + + metricRegistry: metrics.NewMetricRegistry(), + + Inbound: make(chan sflow.Datagram), + outbound: outboundObservations, } + + go dev.handleFlows() + + return dev } func (d *Device) Discover() { @@ -90,3 +113,62 @@ func (d *Device) Discover() { wg.Wait() } + +func (d *Device) Metrics() []metrics.MetricDefinition { + return d.metricRegistry.Metrics() +} + +func (d *Device) IP() net.IP { + return d.ip +} + +func (d *Device) Hostname() string { + return d.hostname +} + +func (d *Device) TopTalkers() *flows.TopTalkers { + return d.topTalkers +} + +func (d *Device) handleFlows() { + + log.Printf("[Device %v] Handling flows", d.ip) + + for dgram := range d.Inbound { + for _, sample := range dgram.Samples { + for _, record := range sample.GetRecords() { + d.processFlowRecord(record) + } + } + } +} + +func (d *Device) processFlowRecord(r sflow.Record) { + switch r.(type) { + case sflow.HostCpuCounters: + d.processHostCPUCounters(r.(sflow.HostCpuCounters)) + case sflow.HostMemoryCounters: + d.processHostMemoryCounters(r.(sflow.HostMemoryCounters)) + case sflow.HostDiskCounters: + d.processHostDiskCounters(r.(sflow.HostDiskCounters)) + case sflow.HostNetCounters: + d.processHostNetCounters(r.(sflow.HostNetCounters)) + case sflow.GenericInterfaceCounters: + d.processGenericInterfaceCounters(r.(sflow.GenericInterfaceCounters)) + case sflow.RawPacketFlow: + if d.topTalkers == nil { + d.topTalkers = flows.NewTopTalkers(time.Second * 30) + } + d.processRawPacketFlow(r.(sflow.RawPacketFlow)) + } +} + +func (d *Device) updateAndEmit(metric string, metricType metrics.MetricType, v interface{}) { + value := d.metricRegistry.Update(metric, metricType, v) + d.outbound <- series.Observation{ + Source: d.ip.String(), + Metric: metric, + Timestamp: time.Now().Unix(), + Value: float64(value), + } +} diff --git a/device/device_test.go b/device/device_test.go deleted file mode 100644 index 2526360..0000000 --- a/device/device_test.go +++ /dev/null @@ -1,9 +0,0 @@ -package device - -import ( - "testing" -) - -func TestDevice1(t *testing.T) { - -} diff --git a/device/flow_records.go b/device/flow_records.go new file mode 100644 index 0000000..675c0a8 --- /dev/null +++ b/device/flow_records.go @@ -0,0 +1,167 @@ +package device + +import ( + "fmt" + "log" + "net" + + "github.com/PreetamJinka/proto" + "github.com/PreetamJinka/sflow" + + "github.com/PreetamJinka/cistern/state/metrics" +) + +func (d *Device) processHostCPUCounters(c sflow.HostCpuCounters) { + d.updateAndEmit("cpu.user", metrics.TypeDerivative, c.CpuUser) + d.updateAndEmit("cpu.nice", metrics.TypeDerivative, c.CpuNice) + d.updateAndEmit("cpu.sys", metrics.TypeDerivative, c.CpuSys) + d.updateAndEmit("cpu.idle", metrics.TypeDerivative, c.CpuIdle) + d.updateAndEmit("cpu.wio", metrics.TypeDerivative, c.CpuWio) + d.updateAndEmit("cpu.intr", metrics.TypeDerivative, c.CpuIntr) + d.updateAndEmit("cpu.softintr", metrics.TypeDerivative, c.CpuSoftIntr) +} + +func (d *Device) processHostMemoryCounters(c sflow.HostMemoryCounters) { + d.updateAndEmit("mem.total", metrics.TypeGauge, c.Total) + d.updateAndEmit("mem.free", metrics.TypeGauge, c.Free) + d.updateAndEmit("mem.shared", metrics.TypeGauge, c.Shared) + d.updateAndEmit("mem.buffers", metrics.TypeGauge, c.Buffers) + d.updateAndEmit("mem.cached", metrics.TypeGauge, c.Cached) + d.updateAndEmit("mem.swap_total", metrics.TypeGauge, c.SwapTotal) + d.updateAndEmit("mem.swap_free", metrics.TypeGauge, c.SwapFree) + + d.updateAndEmit("mem.page_in", metrics.TypeDerivative, c.PageIn) + d.updateAndEmit("mem.page_out", metrics.TypeDerivative, c.PageOut) + d.updateAndEmit("mem.swap_in", metrics.TypeDerivative, c.SwapIn) + d.updateAndEmit("mem.swap_out", metrics.TypeDerivative, c.SwapOut) +} + +func (d *Device) processHostDiskCounters(c sflow.HostDiskCounters) { + d.updateAndEmit("disk.total", metrics.TypeGauge, c.Total) + d.updateAndEmit("disk.free", metrics.TypeGauge, c.Free) + d.updateAndEmit("disk.max_used", metrics.TypeGauge, c.MaxUsedPercent) + + d.updateAndEmit("disk.reads", metrics.TypeDerivative, c.Reads) + d.updateAndEmit("disk.bytes_read", metrics.TypeDerivative, c.BytesRead) + d.updateAndEmit("disk.read_time", metrics.TypeDerivative, c.ReadTime) + + d.updateAndEmit("disk.writes", metrics.TypeDerivative, c.Writes) + d.updateAndEmit("disk.bytes_written", metrics.TypeDerivative, c.BytesWritten) + d.updateAndEmit("disk.write_time", metrics.TypeDerivative, c.WriteTime) +} + +func (d *Device) processHostNetCounters(c sflow.HostNetCounters) { + d.updateAndEmit("net.bytes_in", metrics.TypeDerivative, c.BytesIn) + d.updateAndEmit("net.packets_in", metrics.TypeDerivative, c.PacketsIn) + d.updateAndEmit("net.errs_in", metrics.TypeDerivative, c.ErrsIn) + d.updateAndEmit("net.drops_in", metrics.TypeDerivative, c.DropsIn) + + d.updateAndEmit("net.bytes_out", metrics.TypeDerivative, c.BytesOut) + d.updateAndEmit("net.packets_out", metrics.TypeDerivative, c.PacketsOut) + d.updateAndEmit("net.errs_out", metrics.TypeDerivative, c.ErrsOut) + d.updateAndEmit("net.drops_out", metrics.TypeDerivative, c.DropsOut) +} + +func (d *Device) processGenericInterfaceCounters(c sflow.GenericInterfaceCounters) { + d.updateAndEmit(fmt.Sprintf("if%d.%s", c.Index, "octets_in"), metrics.TypeDerivative, c.InOctets) + d.updateAndEmit(fmt.Sprintf("if%d.%s", c.Index, "unicast_packets_in"), metrics.TypeDerivative, c.InUcastPkts) + d.updateAndEmit(fmt.Sprintf("if%d.%s", c.Index, "multicast_packets_in"), metrics.TypeDerivative, c.InMulticastPkts) + d.updateAndEmit(fmt.Sprintf("if%d.%s", c.Index, "broadcast_packets_in"), metrics.TypeDerivative, c.InBroadcastPkts) + d.updateAndEmit(fmt.Sprintf("if%d.%s", c.Index, "errors_in"), metrics.TypeDerivative, c.InErrors) + d.updateAndEmit(fmt.Sprintf("if%d.%s", c.Index, "unknown_proto_in"), metrics.TypeDerivative, c.InUnknownProtos) + + d.updateAndEmit(fmt.Sprintf("if%d.%s", c.Index, "octets_out"), metrics.TypeDerivative, c.OutOctets) + d.updateAndEmit(fmt.Sprintf("if%d.%s", c.Index, "unicast_packets_out"), metrics.TypeDerivative, c.OutUcastPkts) + d.updateAndEmit(fmt.Sprintf("if%d.%s", c.Index, "multicast_packets_out"), metrics.TypeDerivative, c.OutMulticastPkts) + d.updateAndEmit(fmt.Sprintf("if%d.%s", c.Index, "broadcast_packets_out"), metrics.TypeDerivative, c.OutBroadcastPkts) + d.updateAndEmit(fmt.Sprintf("if%d.%s", c.Index, "errors_out"), metrics.TypeDerivative, c.OutErrors) +} + +func (d *Device) processRawPacketFlow(c sflow.RawPacketFlow) { + sampleBytes := c.Header + + ethernetPacket, err := proto.DecodeEthernet(sampleBytes) + if err != nil { + log.Println("DecodeEthernet:", err) + return + } + + var ( + protocol uint8 + protocolStr = "" + + sourceAddr net.IP + destAddr net.IP + + sourcePort uint16 + destPort uint16 + + length uint16 + ) + + var ipPayload []byte + + switch ethernetPacket.EtherType { + case 0x0800: + ipv4Packet, err := proto.DecodeIPv4(ethernetPacket.Payload) + if err != nil { + log.Println("DecodeIPv4:", err) + return + } + + sourceAddr = ipv4Packet.Source + destAddr = ipv4Packet.Destination + ipPayload = ipv4Packet.Payload + + protocol = ipv4Packet.Protocol + + length = ipv4Packet.Length + + case 0x86dd: + ipv6Packet, err := proto.DecodeIPv6(ethernetPacket.Payload) + if err != nil { + log.Println("DecodeIPv6:", err) + return + } + + sourceAddr = ipv6Packet.Source + destAddr = ipv6Packet.Destination + ipPayload = ipv6Packet.Payload + + protocol = ipv6Packet.NextHeader + + length = ipv6Packet.Length + } + + switch protocol { + case 0x6: + tcpPacket, err := proto.DecodeTCP(ipPayload) + if err != nil { + log.Println("DecodeTCP:", err) + return + } + + sourcePort = tcpPacket.SourcePort + destPort = tcpPacket.DestinationPort + + protocolStr = "TCP" + + case 0x11: + udpPacket, err := proto.DecodeUDP(ipPayload) + if err != nil { + log.Println("DecodeUDP:", err) + return + } + + sourcePort = udpPacket.SourcePort + destPort = udpPacket.DestinationPort + + protocolStr = "UDP" + } + + if sourcePort+destPort > 0 { + d.topTalkers.Update(protocolStr, sourceAddr, destAddr, int(sourcePort), int(destPort), int(length)) + log.Printf("[Packet flow] [%s] %v:%d -> %v:%d (%d bytes)", + protocolStr, sourceAddr, sourcePort, destAddr, destPort, length) + } +} diff --git a/device/registry.go b/device/registry.go index 2f21bf7..42d5644 100644 --- a/device/registry.go +++ b/device/registry.go @@ -4,30 +4,35 @@ import ( "net" "sync" - "github.com/PreetamJinka/cistern/net/snmp" + "github.com/PreetamJinka/snmp" + + "github.com/PreetamJinka/cistern/state/series" ) // A Registry is a device registry. type Registry struct { - // devices is a map from IP address string to Device. - devices map[string]*Device + // devices is a map from IP address (max 16 bytes) string to Device. + devices map[[16]byte]*Device sessionManager *snmp.SessionManager + outbound chan series.Observation + sync.Mutex } // NewRegistry creates a new device registry. -func NewRegistry() (*Registry, error) { +func NewRegistry(outbound chan series.Observation) (*Registry, error) { sessionManager, err := snmp.NewSessionManager() if err != nil { return nil, err } return &Registry{ - devices: map[string]*Device{}, + devices: map[[16]byte]*Device{}, sessionManager: sessionManager, + outbound: outbound, }, nil } @@ -51,7 +56,10 @@ func (r *Registry) NumDevices() int { } func (r *Registry) Lookup(address net.IP) (*Device, bool) { - dev, present := r.devices[address.String()] + byteAddr := [16]byte{} + copy(byteAddr[:], address.To16()) + + dev, present := r.devices[byteAddr] return dev, present } @@ -65,8 +73,12 @@ func (r *Registry) LookupOrAdd(address net.IP) *Device { return dev } - dev = NewDevice(address) - r.devices[address.String()] = dev + dev = NewDevice(address, r.outbound) + + byteAddr := [16]byte{} + copy(byteAddr[:], address.To16()) + + r.devices[byteAddr] = dev return dev } diff --git a/diagnostic.go b/diagnostic.go deleted file mode 100644 index 7397a95..0000000 --- a/diagnostic.go +++ /dev/null @@ -1,20 +0,0 @@ -package main - -import ( - "log" - "time" - - "github.com/PreetamJinka/cistern/state/metrics" -) - -func LogDiagnostics(hostRegistry *metrics.HostRegistry) { - log.Println("logging diagnostics") - - for _ = range time.Tick(30 * time.Second) { - hosts := hostRegistry.Hosts() - - log.Printf(`[DIAGNOSTIC] Num hosts: %d - %v`, len(hosts), hosts) - - } -} diff --git a/license.go b/license.go new file mode 100644 index 0000000..2530a6e --- /dev/null +++ b/license.go @@ -0,0 +1,143 @@ +package main + +const license = ` +Copyright (c) 2014 Preetam Jinka +All rights reserved. + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, this +list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright notice, +this list of conditions and the following disclaimer in the documentation and/or +other materials provided with the distribution. + +3. Neither the name of the copyright holder nor the names of its contributors +may be used to endorse or promote products derived from this software without +specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR +ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +Cistern may contain open source software with the associated licenses. + +Go: + Copyright (c) 2012 The Go Authors. All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are + met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above + copyright notice, this list of conditions and the following disclaimer + in the documentation and/or other materials provided with the + distribution. + * Neither the name of Google Inc. nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +PreetamJinka/catena: + Copyright (c) 2015 Preetam Jinka + All rights reserved. + + Redistribution and use in source and binary forms, with or without modification, + are permitted provided that the following conditions are met: + + 1. Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + + 2. Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation and/or + other materials provided with the distribution. + + 3. Neither the name of the copyright holder nor the names of its contributors + may be used to endorse or promote products derived from this software without + specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR + ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +PreetamJinka/proto, PreetamJinka/sflow, PreetamJinka/snmp: + Copyright (c) 2014 Preetam Jinka + All rights reserved. + + Redistribution and use in source and binary forms, with or without modification, + are permitted provided that the following conditions are met: + + 1. Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + + 2. Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation and/or + other materials provided with the distribution. + + 3. Neither the name of the copyright holder nor the names of its contributors + may be used to endorse or promote products derived from this software without + specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR + ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +VividCortex/siesta: + The MIT License (MIT) + + Copyright (c) 2014 VividCortex + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE. +` diff --git a/net/proto b/net/proto deleted file mode 160000 index 8b06b0a..0000000 --- a/net/proto +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 8b06b0a6501c3e76bba1c9d7a70a9d9631a89f6a diff --git a/net/sflow b/net/sflow deleted file mode 160000 index f9e46b7..0000000 --- a/net/sflow +++ /dev/null @@ -1 +0,0 @@ -Subproject commit f9e46b71ca5cc52ecc2b64d5d9f67c88dc51b257 diff --git a/net/snmp b/net/snmp deleted file mode 160000 index 11ce5b4..0000000 --- a/net/snmp +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 11ce5b4dde2fb77fb244c3c70f6e881715a65bd3 diff --git a/pipeline/blackholeProcessor.go b/pipeline/blackholeProcessor.go deleted file mode 100644 index 7658795..0000000 --- a/pipeline/blackholeProcessor.go +++ /dev/null @@ -1,19 +0,0 @@ -package pipeline - -type BlackholeProcessor struct { - inbound chan Message -} - -func (b *BlackholeProcessor) SetInbound(inbound chan Message) { - b.inbound = inbound -} - -func (b *BlackholeProcessor) Process() { - for _ = range b.inbound { - // poof - } -} - -func (b *BlackholeProcessor) Outbound() chan Message { - return nil -} diff --git a/pipeline/genericInterfaceProcessor.go b/pipeline/genericInterfaceProcessor.go deleted file mode 100644 index 5fbc386..0000000 --- a/pipeline/genericInterfaceProcessor.go +++ /dev/null @@ -1,61 +0,0 @@ -package pipeline - -import ( - "fmt" - - "github.com/PreetamJinka/cistern/net/sflow" - "github.com/PreetamJinka/cistern/state/metrics" -) - -type GenericInterfaceCountersProcessor struct { - reg *metrics.HostRegistry - inbound chan Message - outbound chan Message -} - -func NewGenericInterfaceCountersProcessor(reg *metrics.HostRegistry) *GenericInterfaceCountersProcessor { - return &GenericInterfaceCountersProcessor{ - reg: reg, - outbound: make(chan Message, 4), - } -} - -func (p *GenericInterfaceCountersProcessor) SetInbound(inbound chan Message) { - p.inbound = inbound -} - -func (p *GenericInterfaceCountersProcessor) Outbound() chan Message { - return p.outbound -} - -func (p *GenericInterfaceCountersProcessor) Process() { - for message := range p.inbound { - - record := message.Record - registryKey := message.Source - - switch record.(type) { - case sflow.GenericInterfaceCounters: - c := record.(sflow.GenericInterfaceCounters) - - p.reg.Insert(registryKey, fmt.Sprintf("if%d.%s", c.Index, "octets_in"), metrics.TypeDerivative, c.InOctets) - p.reg.Insert(registryKey, fmt.Sprintf("if%d.%s", c.Index, "unicast_packets_in"), metrics.TypeDerivative, c.InUcastPkts) - p.reg.Insert(registryKey, fmt.Sprintf("if%d.%s", c.Index, "multicast_packets_in"), metrics.TypeDerivative, c.InMulticastPkts) - p.reg.Insert(registryKey, fmt.Sprintf("if%d.%s", c.Index, "broadcast_packets_in"), metrics.TypeDerivative, c.InBroadcastPkts) - p.reg.Insert(registryKey, fmt.Sprintf("if%d.%s", c.Index, "errors_in"), metrics.TypeDerivative, c.InErrors) - p.reg.Insert(registryKey, fmt.Sprintf("if%d.%s", c.Index, "unknown_proto_in"), metrics.TypeDerivative, c.InUnknownProtos) - - p.reg.Insert(registryKey, fmt.Sprintf("if%d.%s", c.Index, "octets_out"), metrics.TypeDerivative, c.OutOctets) - p.reg.Insert(registryKey, fmt.Sprintf("if%d.%s", c.Index, "unicast_packets_out"), metrics.TypeDerivative, c.OutUcastPkts) - p.reg.Insert(registryKey, fmt.Sprintf("if%d.%s", c.Index, "multicast_packets_out"), metrics.TypeDerivative, c.OutMulticastPkts) - p.reg.Insert(registryKey, fmt.Sprintf("if%d.%s", c.Index, "broadcast_packets_out"), metrics.TypeDerivative, c.OutBroadcastPkts) - p.reg.Insert(registryKey, fmt.Sprintf("if%d.%s", c.Index, "errors_out"), metrics.TypeDerivative, c.OutErrors) - - default: - select { - case p.outbound <- message: - default: - } - } - } -} diff --git a/pipeline/hostProcessor.go b/pipeline/hostProcessor.go deleted file mode 100644 index cc7d661..0000000 --- a/pipeline/hostProcessor.go +++ /dev/null @@ -1,97 +0,0 @@ -package pipeline - -import ( - "github.com/PreetamJinka/cistern/net/sflow" - "github.com/PreetamJinka/cistern/state/metrics" -) - -type HostProcessor struct { - reg *metrics.HostRegistry - inbound chan Message - outbound chan Message -} - -func NewHostProcessor(reg *metrics.HostRegistry) *HostProcessor { - return &HostProcessor{ - reg: reg, - outbound: make(chan Message, 4), - } -} - -func (h *HostProcessor) SetInbound(inbound chan Message) { - h.inbound = inbound -} - -func (h *HostProcessor) Outbound() chan Message { - return h.outbound -} - -func (h *HostProcessor) Process() { - for message := range h.inbound { - record := message.Record - registryKey := message.Source - - switch record.(type) { - case sflow.HostCpuCounters: - c := record.(sflow.HostCpuCounters) - - h.reg.Insert(registryKey, "cpu.user", metrics.TypeDerivative, c.CpuUser) - h.reg.Insert(registryKey, "cpu.nice", metrics.TypeDerivative, c.CpuNice) - h.reg.Insert(registryKey, "cpu.sys", metrics.TypeDerivative, c.CpuSys) - h.reg.Insert(registryKey, "cpu.idle", metrics.TypeDerivative, c.CpuIdle) - h.reg.Insert(registryKey, "cpu.wio", metrics.TypeDerivative, c.CpuWio) - h.reg.Insert(registryKey, "cpu.intr", metrics.TypeDerivative, c.CpuIntr) - h.reg.Insert(registryKey, "cpu.softintr", metrics.TypeDerivative, c.CpuSoftIntr) - - case sflow.HostMemoryCounters: - m := record.(sflow.HostMemoryCounters) - - h.reg.Insert(registryKey, "mem.total", metrics.TypeGauge, m.Total) - h.reg.Insert(registryKey, "mem.free", metrics.TypeGauge, m.Free) - h.reg.Insert(registryKey, "mem.shared", metrics.TypeGauge, m.Shared) - h.reg.Insert(registryKey, "mem.buffers", metrics.TypeGauge, m.Buffers) - h.reg.Insert(registryKey, "mem.cached", metrics.TypeGauge, m.Cached) - h.reg.Insert(registryKey, "mem.swap_total", metrics.TypeGauge, m.SwapTotal) - h.reg.Insert(registryKey, "mem.swap_free", metrics.TypeGauge, m.SwapFree) - - h.reg.Insert(registryKey, "mem.page_in", metrics.TypeDerivative, m.PageIn) - h.reg.Insert(registryKey, "mem.page_out", metrics.TypeDerivative, m.PageOut) - h.reg.Insert(registryKey, "mem.swap_in", metrics.TypeDerivative, m.SwapIn) - h.reg.Insert(registryKey, "mem.swap_out", metrics.TypeDerivative, m.SwapOut) - - case sflow.HostDiskCounters: - d := record.(sflow.HostDiskCounters) - - h.reg.Insert(registryKey, "disk.total", metrics.TypeGauge, d.Total) - h.reg.Insert(registryKey, "disk.free", metrics.TypeGauge, d.Free) - h.reg.Insert(registryKey, "disk.max_used", metrics.TypeGauge, d.MaxUsedPercent) - - h.reg.Insert(registryKey, "disk.reads", metrics.TypeDerivative, d.Reads) - h.reg.Insert(registryKey, "disk.bytes_read", metrics.TypeDerivative, d.BytesRead) - h.reg.Insert(registryKey, "disk.read_time", metrics.TypeDerivative, d.ReadTime) - - h.reg.Insert(registryKey, "disk.writes", metrics.TypeDerivative, d.Writes) - h.reg.Insert(registryKey, "disk.bytes_written", metrics.TypeDerivative, d.BytesWritten) - h.reg.Insert(registryKey, "disk.write_time", metrics.TypeDerivative, d.WriteTime) - - case sflow.HostNetCounters: - n := record.(sflow.HostNetCounters) - - h.reg.Insert(registryKey, "net.bytes_in", metrics.TypeDerivative, n.BytesIn) - h.reg.Insert(registryKey, "net.packets_in", metrics.TypeDerivative, n.PacketsIn) - h.reg.Insert(registryKey, "net.errs_in", metrics.TypeDerivative, n.ErrsIn) - h.reg.Insert(registryKey, "net.drops_in", metrics.TypeDerivative, n.DropsIn) - - h.reg.Insert(registryKey, "net.bytes_out", metrics.TypeDerivative, n.BytesOut) - h.reg.Insert(registryKey, "net.packets_out", metrics.TypeDerivative, n.PacketsOut) - h.reg.Insert(registryKey, "net.errs_out", metrics.TypeDerivative, n.ErrsOut) - h.reg.Insert(registryKey, "net.drops_out", metrics.TypeDerivative, n.DropsOut) - - default: - select { - case h.outbound <- message: - default: - } - } - } -} diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go deleted file mode 100644 index 0357837..0000000 --- a/pipeline/pipeline.go +++ /dev/null @@ -1,37 +0,0 @@ -package pipeline - -import ( - "log" - - "github.com/PreetamJinka/cistern/net/sflow" -) - -type Pipeline struct { - processors []PipelineProcessor -} - -func (p *Pipeline) Add(proc PipelineProcessor) { - p.processors = append(p.processors, proc) -} - -func (p *Pipeline) Run(inbound chan Message) { - log.Println("starting pipeline") - for _, proc := range p.processors { - proc.SetInbound(inbound) - inbound = proc.Outbound() - go proc.Process() - } - - go (&BlackholeProcessor{inbound: inbound}).Process() -} - -type Message struct { - Source string - Record sflow.Record -} - -type PipelineProcessor interface { - Process() - SetInbound(chan Message) - Outbound() chan Message -} diff --git a/pipeline/rawPacketProcessor.go b/pipeline/rawPacketProcessor.go deleted file mode 100644 index c72ebff..0000000 --- a/pipeline/rawPacketProcessor.go +++ /dev/null @@ -1,114 +0,0 @@ -package pipeline - -import ( - "log" - "net" - - "github.com/PreetamJinka/cistern/net/proto" - "github.com/PreetamJinka/cistern/net/sflow" - "github.com/PreetamJinka/cistern/state/metrics" -) - -type RawPacketProcessor struct { - reg *metrics.HostRegistry - inbound chan Message - outbound chan Message -} - -func NewRawPacketProcessor(reg *metrics.HostRegistry) *RawPacketProcessor { - return &RawPacketProcessor{ - reg: reg, - outbound: make(chan Message, 16), - } -} - -func (p *RawPacketProcessor) SetInbound(inbound chan Message) { - p.inbound = inbound -} - -func (p *RawPacketProcessor) Outbound() chan Message { - return p.outbound -} - -func (p *RawPacketProcessor) Process() { - for message := range p.inbound { - record := message.Record - - switch record.(type) { - case sflow.RawPacketFlow: - log.Println("received raw packet flow record") - - rawFlow := record.(sflow.RawPacketFlow) - sampleBytes := rawFlow.Header - - ethernetPacket := proto.DecodeEthernet(sampleBytes) - - var ( - protocol uint8 - protocolStr = "" - - sourceAddr net.IP - destAddr net.IP - - sourcePort uint16 - destPort uint16 - - length uint16 - ) - - var ipPayload []byte - - switch ethernetPacket.EtherType { - case 0x0800: - ipv4Packet := proto.DecodeIPv4(ethernetPacket.Payload) - - sourceAddr = ipv4Packet.Source - destAddr = ipv4Packet.Destination - ipPayload = ipv4Packet.Payload - - protocol = ipv4Packet.Protocol - - length = ipv4Packet.Length - - case 0x86dd: - ipv6Packet := proto.DecodeIPv6(ethernetPacket.Payload) - - sourceAddr = ipv6Packet.Source - destAddr = ipv6Packet.Destination - ipPayload = ipv6Packet.Payload - - protocol = ipv6Packet.NextHeader - - length = ipv6Packet.Length - } - - switch protocol { - case 0x6: - tcpPacket := proto.DecodeTCP(ipPayload) - - sourcePort = tcpPacket.SourcePort - destPort = tcpPacket.DestinationPort - - protocolStr = "TCP" - - case 0x11: - udpPacket := proto.DecodeUDP(ipPayload) - - sourcePort = udpPacket.SourcePort - destPort = udpPacket.DestinationPort - - protocolStr = "UDP" - } - - if sourcePort+destPort > 0 { - log.Printf("[%s] %v:%d -> %v:%d (%d bytes)", protocolStr, sourceAddr, sourcePort, destAddr, destPort, length) - } - - default: - select { - case p.outbound <- message: - default: - } - } - } -} diff --git a/state/flows/flows.go b/state/flows/flows.go new file mode 100644 index 0000000..ce22405 --- /dev/null +++ b/state/flows/flows.go @@ -0,0 +1,189 @@ +package flows + +import ( + "bytes" + "net" + "sort" + "sync" + "time" + + "github.com/PreetamJinka/cistern/state/metrics" +) + +const ( + FlowProtocolTCP = "TCP" + FlowProtocolUDP = "UDP" + + alpha float32 = 1 +) + +type Flow struct { + Source net.IP `json:"source"` + Destination net.IP `json:"destination"` + SourcePort int `json:"sourcePort"` + DestinationPort int `json:"destinationPort"` + + Protocol string `json:"protocol"` + + bytesDerivative metrics.MetricState + packetsDerivative metrics.MetricState + + bytes uint64 + packets uint64 + + BytesPerSecond float32 `json:"bytesPerSecond"` + PacketsPerSecond float32 `json:"packetsPerSecond"` + + lastUpdated time.Time +} + +type flowKey struct { + protocol string + addr1 [16]byte + port1 int + addr2 [16]byte + port2 int +} + +func toFlowKey(protocol string, source, destination net.IP, sourcePort, destPort int) flowKey { + source = source.To16() + destination = destination.To16() + + key := flowKey{ + protocol: protocol, + } + + if bytes.Compare([]byte(source), []byte(destination)) < 0 { + copy(key.addr1[:], source) + copy(key.addr2[:], destination) + key.port1 = sourcePort + key.port2 = destPort + return key + } + + copy(key.addr1[:], destination) + copy(key.addr2[:], source) + key.port1 = destPort + key.port2 = sourcePort + return key +} + +type TopTalkers struct { + flows map[flowKey]Flow + + lock sync.Mutex +} + +func NewTopTalkers(window time.Duration) *TopTalkers { + toptalkers := &TopTalkers{ + flows: map[flowKey]Flow{}, + } + + go func() { + for _ = range time.Tick(window) { + toptalkers.compact(window) + } + }() + + return toptalkers +} + +func (t *TopTalkers) compact(window time.Duration) { + t.lock.Lock() + defer t.lock.Unlock() + + timeout := time.Now().Add(-window) + + for key, flow := range t.flows { + if flow.lastUpdated.Before(timeout) { + delete(t.flows, key) + } + } +} + +func (t *TopTalkers) Update(protocol string, + source, destination net.IP, sourcePort, destPort int, bytes int) { + t.lock.Lock() + defer t.lock.Unlock() + + key := toFlowKey(protocol, source, destination, sourcePort, destPort) + var flow Flow + + var present bool + if flow, present = t.flows[key]; !present { + flow.Source = source + flow.Destination = destination + flow.SourcePort = sourcePort + flow.DestinationPort = destPort + flow.bytesDerivative = metrics.DerivativeState{} + flow.packetsDerivative = metrics.DerivativeState{} + flow.Protocol = protocol + } + + seenBytes := uint64(bytes) + flow.bytes + flow.bytesDerivative = flow.bytesDerivative.Update(seenBytes) + flow.BytesPerSecond = flow.bytesDerivative.Value() + flow.bytes = seenBytes + + seenPackets := flow.packets + 1 + flow.packetsDerivative = flow.packetsDerivative.Update(seenPackets) + flow.PacketsPerSecond = flow.packetsDerivative.Value() + flow.packets = seenPackets + + flow.lastUpdated = time.Now() + t.flows[key] = flow +} + +func (t *TopTalkers) ByBytes() []Flow { + t.lock.Lock() + defer t.lock.Unlock() + + flows := make([]Flow, 0, len(t.flows)) + for _, flow := range t.flows { + flows = append(flows, flow) + } + + sort.Sort(sort.Reverse(byBytes(flows))) + return flows +} + +func (t *TopTalkers) ByPackets() []Flow { + t.lock.Lock() + defer t.lock.Unlock() + + flows := make([]Flow, 0, len(t.flows)) + for _, flow := range t.flows { + flows = append(flows, flow) + } + + sort.Sort(sort.Reverse(byPackets(flows))) + return flows +} + +type byBytes []Flow + +func (f byBytes) Len() int { + return len(f) +} + +func (f byBytes) Less(i, j int) bool { + return f[i].BytesPerSecond < f[j].BytesPerSecond +} + +func (f byBytes) Swap(i, j int) { + f[i], f[j] = f[j], f[i] +} + +type byPackets []Flow + +func (f byPackets) Len() int { + return len(f) +} + +func (f byPackets) Less(i, j int) bool { + return f[i].PacketsPerSecond < f[j].PacketsPerSecond +} + +func (f byPackets) Swap(i, j int) { + f[i], f[j] = f[j], f[i] +} diff --git a/state/metrics/hostRegistry.go b/state/metrics/hostRegistry.go deleted file mode 100644 index 355fbaa..0000000 --- a/state/metrics/hostRegistry.go +++ /dev/null @@ -1,116 +0,0 @@ -package metrics - -import ( - "errors" - "sync" - "time" - - "github.com/PreetamJinka/catena" - "github.com/PreetamJinka/cistern/state/series" -) - -var ( - ErrUnknownHost = errors.New("metrics: unknown host") -) - -type HostRegistry struct { - lock sync.RWMutex - hosts map[string]*MetricRegistry -} - -func NewHostRegistry() *HostRegistry { - return &HostRegistry{ - lock: sync.RWMutex{}, - hosts: make(map[string]*MetricRegistry), - } -} - -func (h *HostRegistry) Insert(host string, metric string, metricType MetricType, value interface{}) error { - h.lock.Lock() - defer h.lock.Unlock() - - metricRegistry, present := h.hosts[host] - if !present { - h.hosts[host] = NewMetricRegistry() - metricRegistry = h.hosts[host] - } - - return metricRegistry.Update(metric, metricType, value) -} - -// Hosts returns a slice of all of the hosts listed -// in the registry -func (h *HostRegistry) Hosts() []string { - h.lock.RLock() - defer h.lock.RUnlock() - - hosts := []string{} - - for host := range h.hosts { - hosts = append(hosts, host) - } - - return hosts -} - -// Metrics returns a slice of all of the metrics -// for a specific host -func (h *HostRegistry) Metrics(host string) []string { - h.lock.RLock() - defer h.lock.RUnlock() - - metrics := []string{} - - if metricReg := h.hosts[host]; metricReg != nil { - for metric := range h.hosts[host].metrics { - metrics = append(metrics, metric) - } - } - - return metrics -} - -func (h *HostRegistry) MetricStates(host string, metrics ...string) []float32 { - h.lock.RLock() - defer h.lock.RUnlock() - - values := []float32{} - - if metricReg := h.hosts[host]; metricReg != nil { - for _, metric := range metrics { - values = append(values, metricReg.Get(metric)) - } - } - - return values -} - -func (h *HostRegistry) RunSnapshotter(engine *series.Engine) { - now := time.Now() - - <-time.After(now.Add(time.Minute).Truncate(time.Minute).Sub(now)) - - for now := range time.Tick(time.Minute) { - h.lock.RLock() - - rows := catena.Rows{} - - for host, metricReg := range h.hosts { - for metric, metricState := range metricReg.metrics { - metricVal := metricState.Value() - if metricVal == metricVal { - rows = append(rows, catena.Row{ - Source: host, - Metric: metric, - Timestamp: now.Unix(), - Value: float64(metricVal), - }) - } - } - } - - h.lock.RUnlock() - - engine.InsertRows(rows) - } -} diff --git a/state/metrics/metricRegistry.go b/state/metrics/metricRegistry.go index bc0585e..c81826a 100644 --- a/state/metrics/metricRegistry.go +++ b/state/metrics/metricRegistry.go @@ -4,6 +4,11 @@ import ( "math" ) +type MetricDefinition struct { + Name string `json:"name"` + Type MetricType `json:"type"` +} + type MetricRegistry struct { metrics map[string]MetricState } @@ -14,7 +19,7 @@ func NewMetricRegistry() *MetricRegistry { } } -func (m *MetricRegistry) Update(metric string, metricType MetricType, value interface{}) error { +func (m *MetricRegistry) Update(metric string, metricType MetricType, value interface{}) float32 { state, present := m.metrics[metric] if !present { switch metricType { @@ -27,7 +32,7 @@ func (m *MetricRegistry) Update(metric string, metricType MetricType, value inte m.metrics[metric] = state.Update(value) - return nil + return state.Value() } func (m *MetricRegistry) Get(metric string) float32 { @@ -38,3 +43,16 @@ func (m *MetricRegistry) Get(metric string) float32 { return state.Value() } + +func (m *MetricRegistry) Metrics() []MetricDefinition { + metrics := []MetricDefinition{} + + for metric, state := range m.metrics { + metrics = append(metrics, MetricDefinition{ + Name: metric, + Type: state.Type(), + }) + } + + return metrics +} diff --git a/state/series/engine.go b/state/series/engine.go index ee621d6..4350885 100644 --- a/state/series/engine.go +++ b/state/series/engine.go @@ -1,11 +1,15 @@ package series import ( + "log" + "time" + "github.com/PreetamJinka/catena" ) type Engine struct { *catena.DB + Inbound chan Observation } func NewEngine(baseDir string) (*Engine, error) { @@ -18,7 +22,49 @@ func NewEngine(baseDir string) (*Engine, error) { } } - return &Engine{ - db, - }, nil + engine := &Engine{ + DB: db, + Inbound: make(chan Observation, 512), + } + + go engine.handleInbound() + + return engine, nil +} + +func (engine *Engine) handleInbound() { + log.Println("[Series engine] Handling inbound observations") + + var wait = time.After(time.Second * 5) + var buffer = make([]Observation, 0, 512) + + for { + select { + case <-wait: + if len(buffer) > 0 { + engine.writeObservations(buffer) + buffer = buffer[:0] + } + + wait = time.After(time.Second * 5) + + case obs := <-engine.Inbound: + buffer = append(buffer, obs) + if len(buffer) == 512 { + engine.writeObservations(buffer) + buffer = buffer[:0] + wait = time.After(time.Second * 5) + } + } + } +} + +func (engine *Engine) writeObservations(obs []Observation) { + log.Printf("[Series engine] Writing %d observations", len(obs)) + + rows := make(catena.Rows, len(obs)) + for i, observation := range obs { + rows[i] = catena.Row(observation) + } + engine.DB.InsertRows(rows) } diff --git a/state/series/observation.go b/state/series/observation.go new file mode 100644 index 0000000..4661962 --- /dev/null +++ b/state/series/observation.go @@ -0,0 +1,6 @@ +package series + +import "github.com/PreetamJinka/catena" + +type Observation catena.Row +type Observations catena.Rows diff --git a/version.go b/version.go index e09062f..6f83df5 100644 --- a/version.go +++ b/version.go @@ -1,3 +1,3 @@ package main -const version = "0.0.1-rev5" +const version = "0.0.1-rev6"