Skip to content

Commit

Permalink
improved shutdown and metadata refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
glaslos committed Oct 22, 2023
1 parent 3de2326 commit 42f9004
Show file tree
Hide file tree
Showing 22 changed files with 134 additions and 180 deletions.
5 changes: 2 additions & 3 deletions app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,7 @@ func main() {
}
exitMtx.Lock()
fmt.Println("\nshutting down...")
if err := gtn.Shutdown(); err != nil {
log.Fatal(err)
}
gtn.Shutdown()
}
defer exit()

Expand All @@ -78,6 +76,7 @@ func main() {
go func() {
<-sig
exit()
fmt.Println("\nleaving...")
os.Exit(0)
}()

Expand Down
20 changes: 10 additions & 10 deletions connection/conntable.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,49 +50,49 @@ type Metadata struct {
}

type ConnTable struct {
table map[CKey]*Metadata
table map[CKey]Metadata
mtx sync.RWMutex
}

func New() *ConnTable {
ct := &ConnTable{
table: make(map[CKey]*Metadata, 1024),
table: make(map[CKey]Metadata, 1024),
}
return ct
}

// RegisterConn a connection in the table
func (t *ConnTable) RegisterConn(conn net.Conn, rule *rules.Rule) (*Metadata, error) {
func (t *ConnTable) RegisterConn(conn net.Conn, rule *rules.Rule) (Metadata, error) {
srcIP, srcPort, err := net.SplitHostPort(conn.RemoteAddr().String())
if err != nil {
return nil, fmt.Errorf("failed to split remote address: %w", err)
return Metadata{}, fmt.Errorf("failed to split remote address: %w", err)
}

_, dstPort, err := net.SplitHostPort(conn.LocalAddr().String())
if err != nil {
return nil, fmt.Errorf("failed to split local address: %w", err)
return Metadata{}, fmt.Errorf("failed to split local address: %w", err)
}
port, err := strconv.Atoi(dstPort)
if err != nil {
return nil, fmt.Errorf("failed to parse dstPort: %w", err)
return Metadata{}, fmt.Errorf("failed to parse dstPort: %w", err)
}
return t.Register(srcIP, srcPort, uint16(port), rule)
}

// Register a connection in the table
func (t *ConnTable) Register(srcIP, srcPort string, dstPort uint16, rule *rules.Rule) (*Metadata, error) {
func (t *ConnTable) Register(srcIP, srcPort string, dstPort uint16, rule *rules.Rule) (Metadata, error) {
t.mtx.Lock()
defer t.mtx.Unlock()

ck, err := NewConnKeyByString(srcIP, srcPort)
if err != nil {
return nil, err
return Metadata{}, err
}
if md, ok := t.table[ck]; ok {
return md, nil
}

md := &Metadata{
md := Metadata{
Added: time.Now(),
TargetPort: dstPort,
Rule: rule,
Expand All @@ -115,7 +115,7 @@ func (t *ConnTable) FlushOlderThan(s time.Duration) {
}

// TODO: what happens when I return a *Metadata and then FlushOlderThan() deletes it?
func (t *ConnTable) Get(ck CKey) *Metadata {
func (t *ConnTable) Get(ck CKey) Metadata {
t.mtx.RLock()
defer t.mtx.RUnlock()
return t.table[ck]
Expand Down
44 changes: 26 additions & 18 deletions glutton.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package glutton

import (
"context"
"errors"
"fmt"
"io"
"net"
Expand Down Expand Up @@ -181,6 +180,11 @@ func (g *Glutton) Start() error {
go g.udpListen()

for {
select {
case <-g.ctx.Done():
return nil
default:
}
conn, err := g.Server.tcpListener.Accept()
if err != nil {
return err
Expand All @@ -194,18 +198,21 @@ func (g *Glutton) Start() error {
rule = &rules.Rule{Target: "default"}
}

if _, err := g.connTable.RegisterConn(conn, rule); err != nil {
md, err := g.connTable.RegisterConn(conn, rule)
if err != nil {
return err
}

g.Logger.Debug("new connection", zap.String("addr", conn.LocalAddr().String()), zap.String("handler", rule.Target))

if err := g.UpdateConnectionTimeout(g.ctx, conn); err != nil {
g.Logger.Error("failed to set connection timeout", zap.Error(err))
}

if hfunc, ok := g.tcpProtocolHandlers[rule.Target]; ok {
go func() {
if err := hfunc(g.ctx, conn); err != nil {
g.Logger.Error("failed to handle TCP connection", zap.Error(err))
if err := hfunc(g.ctx, conn, md); err != nil {
g.Logger.Error("failed to handle TCP connection", zap.Error(err), zap.String("handler", rule.Target))
}
}()
}
Expand Down Expand Up @@ -257,24 +264,21 @@ func (g *Glutton) UpdateConnectionTimeout(ctx context.Context, conn net.Conn) er
}

// ConnectionByFlow returns connection metadata by connection key
func (g *Glutton) ConnectionByFlow(ckey [2]uint64) *connection.Metadata {
func (g *Glutton) ConnectionByFlow(ckey [2]uint64) connection.Metadata {
return g.connTable.Get(ckey)
}

// MetadataByConnection returns connection metadata by connection
func (g *Glutton) MetadataByConnection(conn net.Conn) (*connection.Metadata, error) {
func (g *Glutton) MetadataByConnection(conn net.Conn) (connection.Metadata, error) {
host, port, err := net.SplitHostPort(conn.RemoteAddr().String())
if err != nil {
return nil, fmt.Errorf("faild to split remote address: %w", err)
return connection.Metadata{}, fmt.Errorf("faild to split remote address: %w", err)
}
ckey, err := connection.NewConnKeyByString(host, port)
if err != nil {
return nil, err
return connection.Metadata{}, err
}
md := g.ConnectionByFlow(ckey)
if md == nil {
return nil, errors.New("not found")
}
return md, nil
}

Expand All @@ -285,15 +289,15 @@ func (g *Glutton) sanitizePayload(payload []byte) []byte {
return payload
}

func (g *Glutton) ProduceTCP(handler string, conn net.Conn, md *connection.Metadata, payload []byte, decoded interface{}) error {
func (g *Glutton) ProduceTCP(handler string, conn net.Conn, md connection.Metadata, payload []byte, decoded interface{}) error {
if g.Producer != nil {
payload = g.sanitizePayload(payload)
return g.Producer.LogTCP(handler, conn, md, payload, decoded)
}
return nil
}

func (g *Glutton) ProduceUDP(handler string, srcAddr, dstAddr *net.UDPAddr, md *connection.Metadata, payload []byte, decoded interface{}) error {
func (g *Glutton) ProduceUDP(handler string, srcAddr, dstAddr *net.UDPAddr, md connection.Metadata, payload []byte, decoded interface{}) error {
if g.Producer != nil {
payload = g.sanitizePayload(payload)
return g.Producer.LogUDP("udp", srcAddr, dstAddr, md, payload, decoded)
Expand All @@ -302,19 +306,23 @@ func (g *Glutton) ProduceUDP(handler string, srcAddr, dstAddr *net.UDPAddr, md *
}

// Shutdown the packet processor
func (g *Glutton) Shutdown() error {
func (g *Glutton) Shutdown() {
defer g.Logger.Sync()
g.cancel() // close all connection

g.Logger.Info("Shutting down listeners")
if err := g.Server.Shutdown(); err != nil {
g.Logger.Error("failed to shutdown server", zap.Error(err))
}

if err := flushTProxyIPTables(viper.GetString("interface"), g.publicAddrs[0].String(), "tcp", uint32(g.Server.tcpPort)); err != nil {
return err
g.Logger.Error("failed to drop tcp iptables", zap.Error(err))
}
if err := flushTProxyIPTables(viper.GetString("interface"), g.publicAddrs[0].String(), "udp", uint32(g.Server.udpPort)); err != nil {
return err
g.Logger.Error("failed to drop udp iptables", zap.Error(err))
}

g.Logger.Info("Shutting down processor")
return g.Server.Shutdown()
g.Logger.Info("All done")
}

func (g *Glutton) applyRules(network string, srcAddr, dstAddr net.Addr) (*rules.Rule, error) {
Expand Down
24 changes: 10 additions & 14 deletions producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type Event struct {
Decoded interface{} `json:"decoded,omitempty"`
}

func makeEventTCP(handler string, conn net.Conn, md *connection.Metadata, payload []byte, decoded interface{}, sensorID string) (*Event, error) {
func makeEventTCP(handler string, conn net.Conn, md connection.Metadata, payload []byte, decoded interface{}, sensorID string) (*Event, error) {
host, port, err := net.SplitHostPort(conn.RemoteAddr().String())
if err != nil {
return nil, err
Expand All @@ -62,22 +62,20 @@ func makeEventTCP(handler string, conn net.Conn, md *connection.Metadata, payloa
Transport: "tcp",
SrcHost: host,
SrcPort: port,
DstPort: uint16(md.TargetPort),
SensorID: sensorID,
Handler: handler,
Payload: base64.StdEncoding.EncodeToString(payload),
Scanner: scannerName,
Decoded: decoded,
}
if md != nil {
event.DstPort = uint16(md.TargetPort)
if md.Rule != nil {
event.Rule = md.Rule.String()
}
if md.Rule != nil {
event.Rule = md.Rule.String()
}
return &event, nil
}

func makeEventUDP(handler string, srcAddr, dstAddr *net.UDPAddr, md *connection.Metadata, payload []byte, decoded interface{}, sensorID string) (*Event, error) {
func makeEventUDP(handler string, srcAddr, dstAddr *net.UDPAddr, md connection.Metadata, payload []byte, decoded interface{}, sensorID string) (*Event, error) {
_, scannerName, err := scanner.IsScanner(net.ParseIP(srcAddr.IP.String()))
if err != nil {
return nil, err
Expand All @@ -88,17 +86,15 @@ func makeEventUDP(handler string, srcAddr, dstAddr *net.UDPAddr, md *connection.
Transport: "udp",
SrcHost: srcAddr.IP.String(),
SrcPort: strconv.Itoa(int(srcAddr.AddrPort().Port())),
DstPort: uint16(md.TargetPort),
SensorID: sensorID,
Handler: handler,
Payload: base64.StdEncoding.EncodeToString(payload),
Scanner: scannerName,
Decoded: decoded,
}
if md != nil {
event.DstPort = uint16(md.TargetPort)
if md.Rule != nil {
event.Rule = md.Rule.String()
}
if md.Rule != nil {
event.Rule = md.Rule.String()
}
return &event, nil
}
Expand Down Expand Up @@ -131,7 +127,7 @@ func New(sensorID string) (*Producer, error) {
}

// LogTCP is a meta caller for all producers
func (p *Producer) LogTCP(handler string, conn net.Conn, md *connection.Metadata, payload []byte, decoded interface{}) error {
func (p *Producer) LogTCP(handler string, conn net.Conn, md connection.Metadata, payload []byte, decoded interface{}) error {
event, err := makeEventTCP(handler, conn, md, payload, decoded, p.sensorID)
if err != nil {
return err
Expand All @@ -150,7 +146,7 @@ func (p *Producer) LogTCP(handler string, conn net.Conn, md *connection.Metadata
}

// LogUDP is a meta caller for all producers
func (p *Producer) LogUDP(handler string, srcAddr, dstAddr *net.UDPAddr, md *connection.Metadata, payload []byte, decoded interface{}) error {
func (p *Producer) LogUDP(handler string, srcAddr, dstAddr *net.UDPAddr, md connection.Metadata, payload []byte, decoded interface{}) error {
event, err := makeEventUDP(handler, srcAddr, dstAddr, md, payload, decoded, p.sensorID)
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions producer/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ func TestProducerLog(t *testing.T) {

viper.Set("producers.http.remote", svr.URL)

err = p.LogTCP("test", conn, &md, []byte{123}, nil)
err = p.LogTCP("test", conn, md, []byte{123}, nil)
require.NoError(t, err)

err = p.LogUDP("test", &net.UDPAddr{}, &net.UDPAddr{}, &md, []byte{123}, nil)
err = p.LogUDP("test", &net.UDPAddr{}, &net.UDPAddr{}, md, []byte{123}, nil)
require.NoError(t, err)
}
8 changes: 4 additions & 4 deletions protocols/interfaces/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ type Logger interface {
}

type Honeypot interface {
ProduceTCP(protocol string, conn net.Conn, md *connection.Metadata, payload []byte, decoded interface{}) error
ProduceUDP(handler string, srcAddr, dstAddr *net.UDPAddr, md *connection.Metadata, payload []byte, decoded interface{}) error
ConnectionByFlow([2]uint64) *connection.Metadata
ProduceTCP(protocol string, conn net.Conn, md connection.Metadata, payload []byte, decoded interface{}) error
ProduceUDP(handler string, srcAddr, dstAddr *net.UDPAddr, md connection.Metadata, payload []byte, decoded interface{}) error
ConnectionByFlow([2]uint64) connection.Metadata
UpdateConnectionTimeout(ctx context.Context, conn net.Conn) error
MetadataByConnection(net.Conn) (*connection.Metadata, error)
MetadataByConnection(net.Conn) (connection.Metadata, error)
}
Loading

0 comments on commit 42f9004

Please sign in to comment.