Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use BPF ringbuf to send collected data to user-space #10

Merged
merged 3 commits into from
May 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
# Prerequisites

- amd64 (x86_64)
- Linux Kernel 5.8+ since `gmon` uses [BPF ring buffer](https://nakryiko.com/posts/bpf-ringbuf/)

# Usage

Expand Down
24 changes: 7 additions & 17 deletions ebpf/bpf_x86_bpfel.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 23 additions & 16 deletions ebpf/c/gmon.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,18 @@ int runtime_newproc1(struct pt_regs *ctx) {
bpf_printk("%s:%d | failed to read stackid\n", __FILE__, __LINE__);
return 0;
}
struct newproc1_event_key key = {
.goroutine_id = goid,
.ktime = bpf_ktime_get_ns(),
};
struct newproc1_event event = {
.stack_id = stack_id,
};
bpf_map_update_elem(&newproc1_events, &key, &event, BPF_ANY);

struct event *ev;
ev = bpf_ringbuf_reserve(&events, sizeof(*ev), 0);
if (!ev) {
bpf_printk("%s:%d | failed to reserve ringbuf\n", __FILE__, __LINE__);
return 0;
}
ev->goroutine_id = goid;
ev->stack_id = stack_id;
ev->exit = false;
bpf_ringbuf_submit(ev, 0);

return 0;
}

Expand All @@ -65,14 +69,17 @@ int runtime_goexit1(struct pt_regs *ctx) {
return 0;
}

struct goexit1_event_key key = {
.goroutine_id = go_id,
.ktime = bpf_ktime_get_ns(),
};
struct goexit1_event event = {
.stack_id = stack_id,
};
bpf_map_update_elem(&goexit1_events, &key, &event, BPF_ANY);
struct event *ev;
ev = bpf_ringbuf_reserve(&events, sizeof(*ev), 0);
if (!ev) {
bpf_printk("%s:%d | failed to reserve ringbuf\n", __FILE__, __LINE__);
return 0;
}
ev->goroutine_id = go_id;
ev->stack_id = stack_id;
ev->exit = true;
bpf_ringbuf_submit(ev, 0);

return 0;
}

Expand Down
26 changes: 7 additions & 19 deletions ebpf/c/maps.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,24 @@
__type(value, _value_type); \
} _name SEC(".maps");

#define BPF_HASH(_name, _key_type, _value_type, _max_entries) \
BPF_MAP(_name, BPF_MAP_TYPE_HASH, _key_type, _value_type, _max_entries)

// stack traces: the value is 1 big byte array of the stack addresses
typedef __u64 stack_trace_t[MAX_STACK_DEPTH];
#define BPF_STACK_TRACE(_name, _max_entries) \
BPF_MAP(_name, BPF_MAP_TYPE_STACK_TRACE, u32, stack_trace_t, _max_entries)

BPF_STACK_TRACE(stack_addresses, MAX_STACK_ADDRESSES); // store stack traces

struct newproc1_event_key {
int64_t goroutine_id;
uint64_t ktime; // To make this struct unique
};
struct {
__uint(type, BPF_MAP_TYPE_RINGBUF);
__uint(max_entries, 1 << 24);
} events SEC(".maps");

struct newproc1_event {
int stack_id;
};

BPF_HASH(newproc1_events, struct newproc1_event_key, struct newproc1_event, 10240);

struct goexit1_event_key {
struct event {
int64_t goroutine_id;
uint64_t ktime; // To make this struct unique
};

struct goexit1_event {
int stack_id;
bool exit;
};

BPF_HASH(goexit1_events, struct goexit1_event_key, struct goexit1_event, 10240);
struct event *unused __attribute__((unused));

#endif /* __MAPS_H__ */
158 changes: 48 additions & 110 deletions ebpf/event_handler.go
Original file line number Diff line number Diff line change
@@ -1,56 +1,84 @@
package ebpf

import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"log/slog"
"runtime/trace"
"strconv"
"time"

"github.com/cilium/ebpf"
"github.com/cilium/ebpf/ringbuf"
"github.com/go-delve/delve/pkg/proc"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/keisku/gmon/bininfo"
)

type eventHandler struct {
goroutineQueue chan<- goroutine
objs *bpfObjects
biTranslator bininfo.Translator
reader *ringbuf.Reader
}

func (h *eventHandler) run(ctx context.Context) {
ticker := time.NewTicker(200 * time.Millisecond)
var event bpfEvent
stackIdCache, _ := lru.New[int32, struct{}](16)
for {
select {
case <-ctx.Done():
ticker.Stop()
return
case <-ticker.C:
ctx, task := trace.NewTask(ctx, "event_handler.handle")
trace.WithRegion(ctx, "event_handler.handle_newproc1", func() {
if err := h.handleNewproc1(); err != nil {
slog.Warn("Failed to handle newproc1", slog.Any("error", err))
}
})
trace.WithRegion(ctx, "event_handler.handle_goexit1", func() {
if err := h.handleGoexit1(); err != nil {
slog.Warn("Failed to handle goexit1", slog.Any("error", err))
}
})
task.End()
if err := h.readRecord(ctx, &event); err != nil {
if errors.Is(err, ringbuf.ErrClosed) {
slog.Debug("ring buffer is closed")
return
}
slog.Warn("Failed to read bpf ring buffer", slog.Any("error", err))
continue
}
stack, err := h.lookupStack(ctx, event.StackId)
if err != nil {
slog.Warn(err.Error())
continue
}
h.sendGoroutine(goroutine{
Id: event.GoroutineId,
ObservedAt: time.Now(),
Stack: stack,
Exit: event.Exit,
})
contains, _ := stackIdCache.ContainsOrAdd(event.StackId, struct{}{})
if !contains {
slog.Debug("delete stack_addresses", slog.Int("stack_id", int(event.StackId)))
if err := h.objs.StackAddresses.Delete(event.StackId); err != nil {
slog.Debug("Failed to delete stack_addresses", slog.Any("error", err))
}
}
}
}

func (h *eventHandler) readRecord(ctx context.Context, event *bpfEvent) error {
_, task := trace.NewTask(ctx, "event_handler.read_ring_buffer")
defer task.End()
record, err := h.reader.Read()
if err != nil {
return err
}
if err := binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, event); err != nil {
return fmt.Errorf("decode ring buffer record: %w", err)
}
return nil
}

// lookupStack is a copy of the function in tracee.
// https://github.com/aquasecurity/tracee/blob/f61866b4e2277d2a7dddc6cd77a67cd5a5da3b14/pkg/ebpf/events_pipeline.go#L642-L681
const maxStackDepth = 20

var stackFrameSize = (strconv.IntSize / 8)

func (h *eventHandler) lookupStack(stackId int32) ([]*proc.Function, error) {
func (h *eventHandler) lookupStack(ctx context.Context, stackId int32) ([]*proc.Function, error) {
_, task := trace.NewTask(ctx, "event_handler.lookup_stack")
defer task.End()
stackBytes, err := h.objs.StackAddresses.LookupBytes(stackId)
if err != nil {
return nil, fmt.Errorf("failed to lookup stack addresses: %w", err)
Expand Down Expand Up @@ -83,38 +111,6 @@ func (h *eventHandler) lookupStack(stackId int32) ([]*proc.Function, error) {
return stack[0:stackCounter], nil
}

func (h *eventHandler) handle(
stackAddrs, eventMap *ebpf.Map,
// stackIdSet is the set of stack_id to delete later.
// keysToDelete is the slice of eBPF map keys to delete later.
// keyLength holds the count of keys in keysToDelete to determine if BatchDelete is required.
processMap func(iter *ebpf.MapIterator, stackIdSet map[int32]struct{}) (keysToDelete any, keyLength int),
) error {
stackIdSetToDelete := make(map[int32]struct{})
mapIter := eventMap.Iterate()
keysToDelete, keyLength := processMap(mapIter, stackIdSetToDelete)
if err := mapIter.Err(); err != nil {
return fmt.Errorf("failed to iterate eBPF map: %w", err)
}
if 0 < keyLength {
if n, err := eventMap.BatchDelete(keysToDelete, nil); err == nil {
slog.Debug("Deleted eBPF map", slog.Int("deleted", n), slog.Int("expected", keyLength))
} else {
slog.Warn("Failed to delete eBPF map", slog.Any("err", err))
}
}
// Don't use BatchDelete for stack addresses because the opration is not supported.
// If we do it, we will see "batch delete: not supported" error.
for stackId := range stackIdSetToDelete {
if err := stackAddrs.Delete(stackId); err != nil {
slog.Debug("Failed to delete stack_addresses", slog.Any("error", err))
continue
}
slog.Debug("Deleted stack address map", slog.Int("stack_id", int(stackId)))
}
return nil
}

func (h *eventHandler) sendGoroutine(g goroutine) {
maxRetries := 3
retryInterval := 10 * time.Millisecond
Expand Down Expand Up @@ -145,61 +141,3 @@ func (h *eventHandler) sendGoroutine(g goroutine) {
}
}
}

func (h *eventHandler) handleNewproc1() error {
var key bpfNewproc1EventKey
var value bpfNewproc1Event
var keysToDelete []bpfNewproc1EventKey

return h.handle(
h.objs.StackAddresses,
h.objs.Newproc1Events,
func(mapIter *ebpf.MapIterator, stackIdSet map[int32]struct{}) (any, int) {
for mapIter.Next(&key, &value) {
stack, err := h.lookupStack(value.StackId)
if err != nil {
slog.Warn(err.Error())
continue
}
stackIdSet[value.StackId] = struct{}{}
keysToDelete = append(keysToDelete, key)
h.sendGoroutine(goroutine{
Id: key.GoroutineId,
ObservedAt: time.Now(),
Stack: stack,
Exit: false,
})
}
return keysToDelete, len(keysToDelete)
},
)
}

func (h *eventHandler) handleGoexit1() error {
var key bpfGoexit1EventKey
var value bpfGoexit1Event
var keysToDelete []bpfGoexit1EventKey

return h.handle(
h.objs.StackAddresses,
h.objs.Goexit1Events,
func(mapIter *ebpf.MapIterator, stackIdSet map[int32]struct{}) (any, int) {
for mapIter.Next(&key, &value) {
stack, err := h.lookupStack(value.StackId)
if err != nil {
slog.Warn(err.Error())
continue
}
stackIdSet[value.StackId] = struct{}{}
keysToDelete = append(keysToDelete, key)
h.sendGoroutine(goroutine{
Id: key.GoroutineId,
ObservedAt: time.Now(),
Stack: stack,
Exit: false,
})
}
return keysToDelete, len(keysToDelete)
},
)
}
Loading