Bearer token support (#4)
* Support Bearer tokens using a different mutex and queue, remove redundant takeGlobal call on queue

* Add LRU support for bearer queues

* Document bearer tokens on readme

* Prevent queue creation and global hash calculations if the request is going to be routed to another node

* Fix some bad mutex locking, add k6 test

* More mutex work
germanoeich authored Apr 12, 2022
1 parent e563ff1 commit d43ecc1
Showing 11 changed files with 190 additions and 67 deletions.
1 change: 0 additions & 1 deletion .github/workflows/docker-publish-tags.yml
Original file line number Diff line number Diff line change
@@ -22,7 +22,6 @@ env:

jobs:
build:

runs-on: ubuntu-latest
permissions:
contents: read
3 changes: 2 additions & 1 deletion .gitignore
@@ -3,4 +3,5 @@ nirn-proxy.*
nirn-proxy
.env
*.txt
*.log
*.log
k6_tests/node_modules
5 changes: 5 additions & 0 deletions CONFIG.md
@@ -48,6 +48,11 @@ If using Kubernetes, create a headless service and use it here for easy clusteri

Example: `nirn-headless.default.svc.cluster.local` or `nirn.mydomain.com`

##### MAX_BEARER_COUNT
The maximum size of the bearer token queue map. Internally, bearer queues are stored in an LRU map, and this env var sets the maximum number of entries in that map.
Requests are never interrupted midway, even when an entry is evicted. A small LRU size may cause increased 429s if a bearer token still has many requests queued and fires another one after its queue is evicted.
Default: 1024
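The eviction behaviour described above can be sketched with a minimal hand-rolled LRU. This is an illustration only, not the proxy's implementation (which uses hashicorp/golang-lru); `bearerLRU`, `entry`, and the string stand-in for a queue are all invented for the example:

```go
package main

import (
	"container/list"
	"fmt"
)

// bearerLRU is a hypothetical sketch of an LRU map for bearer queues.
// Keys are token hashes; on eviction the queue would be destroyed, but
// nothing here cancels a request that is already being processed.
type bearerLRU struct {
	max   int
	order *list.List               // front = most recently used
	items map[uint64]*list.Element // token hash -> list node
}

type entry struct {
	key   uint64
	queue string // stand-in for a real request queue
}

func newBearerLRU(max int) *bearerLRU {
	return &bearerLRU{max: max, order: list.New(), items: map[uint64]*list.Element{}}
}

// get returns the queue for a token hash, marking it recently used.
func (l *bearerLRU) get(key uint64) (string, bool) {
	if el, ok := l.items[key]; ok {
		l.order.MoveToFront(el)
		return el.Value.(*entry).queue, true
	}
	return "", false
}

// put inserts a queue, evicting the least recently used entry if full.
func (l *bearerLRU) put(key uint64, queue string) (evictedKey uint64, evicted bool) {
	if el, exists := l.items[key]; exists {
		l.order.MoveToFront(el)
		el.Value.(*entry).queue = queue
		return 0, false
	}
	if l.order.Len() >= l.max {
		oldest := l.order.Back()
		l.order.Remove(oldest)
		old := oldest.Value.(*entry)
		delete(l.items, old.key)
		// a real implementation would destroy the queue here;
		// in-flight requests are left to finish on their own
		evictedKey, evicted = old.key, true
	}
	l.items[key] = l.order.PushFront(&entry{key: key, queue: queue})
	return evictedKey, evicted
}

func main() {
	l := newBearerLRU(2)
	l.put(1, "q1")
	l.put(2, "q2")
	l.get(1) // touch key 1 so key 2 becomes least recently used
	ev, ok := l.put(3, "q3")
	fmt.Println(ev, ok) // 2 true
}
```

With `MAX_BEARER_COUNT` playing the role of `max`, a busy bearer token whose queue is evicted simply gets a fresh queue on its next request, which is why a small map size can translate into extra 429s.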

## Unstable env vars
A collection of env vars that may be removed at any time, mainly used when Discord introduces new behaviour on its edge API versions

7 changes: 5 additions & 2 deletions README.md
@@ -11,6 +11,7 @@ It is designed to be minimally invasive and exploits common library patterns to
- Works with any API version (Also supports using two or more versions for the same bot)
- Small resource footprint
- Works with webhooks
- Works with Bearer tokens
- Prometheus metrics exported out of the box
- No hardcoded routes, therefore no need of updates for new routes introduced by Discord

@@ -56,8 +57,6 @@ The proxy may return a 408 Request Timeout if Discord takes more than $REQUEST_T

The ratelimiting only works with `X-RateLimit-Precision` set to `seconds`. If you are using Discord API v8+, that is the only possible behaviour. For users on v6 or v7, please refer to your library docs for information on which precision it uses and how to change it to seconds.

Bearer tokens should work, however this was not at all tested and is not the main use case for this project

### Why?

As projects grow, it's desirable to break them into multiple pieces, each responsible for its own domain. Discord provides gateway sharding on their end, but REST can get tricky once you start moving logic out of the shards themselves and lose the guild affinity that shards inherently have. A centralized place for handling ratelimits is therefore a must to prevent Cloudflare bans and avoidable 429s. At the time this project was created, there was no alternative that fully satisfied our requirements, such as multi-bot support. We are also early adopters of Discord features, so we need a proxy that supports new routes without us having to manually update it. Thus, this project was born.
@@ -99,6 +98,10 @@ Global ratelimits are handled by a single node on the cluster, however this affi

The best deployment strategy for the cluster is to kill nodes one at a time, preferably with the replacement node already up.

### Bearer Tokens

Bearer tokens are first-class citizens. They are treated differently from bot tokens: while bot queues are long-lived and never evicted, Bearer queues are placed in an LRU map and are spread across the cluster by their token hash instead of by the path hash. This provides a more even spread of bearer queues across nodes in the cluster. In addition, Bearer globals are always handled locally. You can control how many bearer queues are kept alive at any time with the MAX_BEARER_COUNT env var.
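The spread described above can be illustrated with a small sketch. This is not the proxy's actual cluster-routing code (which is built on memberlist); `ownerIndex` is an invented helper, and FNV is used here only as an example hash:

```go
package main

import (
	"fmt"
	"hash/fnv"
	"strings"
)

// ownerIndex is a hypothetical helper that picks which of n nodes owns
// a request. Bot requests spread by route path; Bearer requests spread
// by the Authorization token itself, so one user's queues land on one
// node regardless of which routes they hit.
func ownerIndex(auth, path string, nodes int) int {
	key := path
	if strings.HasPrefix(auth, "Bearer") {
		key = auth // bearer traffic keys on the token, not the path
	}
	h := fnv.New64a()
	h.Write([]byte(key))
	return int(h.Sum64() % uint64(nodes))
}

func main() {
	// The same bearer token maps to the same node for any path.
	a := ownerIndex("Bearer abc123", "/api/v9/gateway", 4)
	b := ownerIndex("Bearer abc123", "/api/v9/users/@me", 4)
	fmt.Println(a == b) // true
}
```

Because many distinct bearer tokens hash to many distinct keys, this keys-per-token scheme spreads queues more evenly than path hashing, where a handful of hot routes would concentrate on a few nodes.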

### Profiling

The proxy can be profiled at runtime by enabling the ENABLE_PPROF flag and browsing to `http://ip:7654/debug/pprof/`
1 change: 1 addition & 0 deletions go.mod
@@ -5,6 +5,7 @@ go 1.16
require (
github.com/Clever/leakybucket v1.2.0
github.com/bwmarrin/snowflake v0.3.0 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/memberlist v0.3.1 // indirect
github.com/joho/godotenv v1.4.0 // indirect
github.com/prometheus/client_golang v1.11.0
2 changes: 2 additions & 0 deletions go.sum
@@ -64,6 +64,8 @@ github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerX
github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/memberlist v0.3.1 h1:MXgUXLqva1QvpVEDQW1IQLG0wivQAtmFlHRQ+1vWZfM=
github.com/hashicorp/memberlist v0.3.1/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
21 changes: 21 additions & 0 deletions k6_tests/loadtest1.js
@@ -0,0 +1,21 @@
import http from 'k6/http';
import {check, sleep} from 'k6';
export const options = {
noConnectionReuse: true,
vus: 50,
iterations: 50
};

export default function() {
const params = {
headers: { 'Authorization': __ENV.TOKEN },
};
let res = http.get('http://localhost:8080/api/v9/gateway', params);
check(res, { 'success': (r) => r.status >= 200 && r.status < 400 });

let res2 = http.get('http://localhost:8080/api/v9/guilds/203039963636301824', params);
check(res2, { 'success': (r) => r.status >= 200 && r.status < 400 });

let res3 = http.get('http://localhost:8080/api/v9/guilds/203039963636301824/channels', params);
check(res3, { 'success': (r) => r.status >= 200 && r.status < 400 });
}
5 changes: 5 additions & 0 deletions lib/discord.go
@@ -10,6 +10,7 @@ import (
"math"
"net"
"net/http"
"strings"
"time"
)

@@ -79,6 +80,10 @@ func GetBotGlobalLimit(token string) (uint, error) {
return math.MaxUint32, nil
}

if strings.HasPrefix(token, "Bearer") {
return 50, nil
}

bot, err := doDiscordReq(context.Background(), "/api/v9/gateway/bot", "GET", nil, map[string][]string{"Authorization": {token}}, "")

if err != nil {
75 changes: 37 additions & 38 deletions lib/queue.go
@@ -39,6 +39,7 @@ type RequestQueue struct {
identifier string
isTokenInvalid *int64
botLimit uint
queueType QueueType
}


@@ -66,16 +67,27 @@ func NewRequestQueue(processor func(ctx context.Context, item *QueueItem) (*http
return nil, err
}

user, err := GetBotUser(token)
if err != nil && token != "" {
return nil, err
queueType := NoAuth
var user *BotUserResponse
if !strings.HasPrefix(token, "Bearer") {
user, err = GetBotUser(token)
if err != nil && token != "" {
return nil, err
}
} else {
queueType = Bearer
}

identifier := "NoAuth"
if user != nil {
queueType = Bot
identifier = user.Username + "#" + user.Discrim
}

if queueType == Bearer {
identifier = "Bearer"
}

ret := &RequestQueue{
queues: make(map[uint64]*QueueChannel),
processor: processor,
@@ -85,15 +97,31 @@
user: user,
identifier: identifier,
isTokenInvalid: new(int64),
botLimit: limit,
botLimit: limit,
queueType: queueType,
}

logger.WithFields(logrus.Fields{ "globalLimit": limit, "identifier": identifier, "bufferSize": bufferSize }).Info("Created new queue")
if queueType != Bearer {
logger.WithFields(logrus.Fields{"globalLimit": limit, "identifier": identifier, "bufferSize": bufferSize}).Info("Created new queue")
// Only sweep bot queues; bearer queues get completely destroyed and hold far fewer endpoints
go ret.tickSweep()
} else {
logger.WithFields(logrus.Fields{"globalLimit": limit, "identifier": identifier, "bufferSize": bufferSize}).Debug("Created new bearer queue")
}

go ret.tickSweep()
return ret, nil
}

func (q *RequestQueue) destroy() {
q.Lock()
defer q.Unlock()
logger.Debug("Destroying queue")
for _, val := range q.queues {
close(val.ch)
}
q.queues = nil
}

func (q *RequestQueue) sweep() {
q.Lock()
defer q.Unlock()
@@ -124,11 +152,13 @@ func (q *RequestQueue) Queue(req *http.Request, res *http.ResponseWriter, path s
"method": req.Method,
}).Trace("Inbound request")

q.Lock()
ch := q.getQueueChannel(path, pathHash)

doneChan := make(chan *http.Response)
errChan := make(chan error)
ch.ch <- &QueueItem{req, res, doneChan, errChan }
q.Unlock()
select {
case resp := <-doneChan:
return path, resp, nil
@@ -138,8 +168,6 @@
}

func (q *RequestQueue) getQueueChannel(path string, pathHash uint64) *QueueChannel {
q.Lock()
defer q.Unlock()
t := time.Now()
ch, ok := q.queues[pathHash]
if !ok {
@@ -206,34 +234,6 @@ func parseHeaders(headers *http.Header) (int64, int64, time.Duration, bool, erro
return limitParsed, remainingParsed, reset, isGlobal, nil
}

func (q *RequestQueue) takeGlobal(path string) {
takeGlobal:
waitTime := atomic.LoadInt64(q.globalLockedUntil)

if waitTime > 0 {
logger.WithFields(logrus.Fields{
"bucket": path,
"waitTime": waitTime,
}).Trace("Waiting for existing global to clear")
time.Sleep(time.Until(time.Unix(0, waitTime)))
sw := atomic.CompareAndSwapInt64(q.globalLockedUntil, waitTime, 0)
if sw {
logger.Info("Unlocked global bucket")
}
}

_, err := q.globalBucket.Add(1)
if err != nil {
reset := q.globalBucket.Reset()
logger.WithFields(logrus.Fields{
"bucket": path,
"waitTime": time.Until(reset),
}).Trace("Failed to grab global token, sleeping for a bit")
time.Sleep(time.Until(reset))
goto takeGlobal
}
}

func return404webhook(item *QueueItem) {
res := *item.Res
res.WriteHeader(404)
@@ -286,7 +286,6 @@ func (q *RequestQueue) subscribe(ch *QueueChannel, path string, pathHash uint64)
continue
}

q.takeGlobal(path)

if atomic.LoadInt64(q.isTokenInvalid) > 0 {
return401(item)
@@ -345,7 +344,7 @@ func (q *RequestQueue) subscribe(ch *QueueChannel, path string, pathHash uint64)
ret404 = true
}

if resp.StatusCode == 401 && !isInteraction(item.Req.URL.String()) && q.identifier != "NoAuth" {
if resp.StatusCode == 401 && !isInteraction(item.Req.URL.String()) && q.queueType != NoAuth {
// Permanently lock this queue
logger.WithFields(logrus.Fields{
"bucket": path,
