diff --git a/cmd/algia/main.go b/cmd/algia/main.go index 4dcf98ba..61df6a25 100644 --- a/cmd/algia/main.go +++ b/cmd/algia/main.go @@ -1,7 +1,6 @@ package main import ( - "context" "encoding/json" "errors" "fmt" @@ -14,6 +13,8 @@ import ( "sync" "time" + "github.com/Hubmakerlabs/replicatr/pkg/context" + "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/event" "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/filter" "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/keys" @@ -150,7 +151,7 @@ func (cfg *Config) GetFollows(profile string) (map[string]Profile, error) { mu.Unlock() m := map[string]struct{}{} - cfg.Do(RelayPerms{Read: true}, func(ctx context.Context, rl *relays.Relay) bool { + cfg.Do(RelayPerms{Read: true}, func(ctx context.T, rl *relays.Relay) bool { evs, e := rl.QuerySync(ctx, filter.T{Kinds: []int{event.KindContactList}, Authors: []string{pub}, Limit: 1}) if e != nil { return true @@ -194,7 +195,7 @@ func (cfg *Config) GetFollows(profile string) (map[string]Profile, error) { } // get follower's descriptions - cfg.Do(RelayPerms{Read: true}, func(ctx context.Context, rl *relays.Relay) bool { + cfg.Do(RelayPerms{Read: true}, func(ctx context.T, rl *relays.Relay) bool { evs, e := rl.QuerySync(ctx, filter.T{ Kinds: []int{event.KindProfileMetadata}, Authors: follows[i:end], // Use the updated end index @@ -225,7 +226,7 @@ func (cfg *Config) GetFollows(profile string) (map[string]Profile, error) { } // FindRelay is -func (cfg *Config) FindRelay(ctx context.Context, r RelayPerms) *relays.Relay { +func (cfg *Config) FindRelay(ctx context.T, r RelayPerms) *relays.Relay { for k, v := range cfg.Relays { if r.Write && !v.Write { continue @@ -252,9 +253,9 @@ func (cfg *Config) FindRelay(ctx context.Context, r RelayPerms) *relays.Relay { } // Do is -func (cfg *Config) Do(r RelayPerms, f func(context.Context, *relays.Relay) bool) { +func (cfg *Config) Do(r RelayPerms, f func(context.T, *relays.Relay) bool) { var wg sync.WaitGroup - ctx := context.Background() + ctx := context.Bg() for k, v := range cfg.Relays { if r.Write && !v.Write { continue @@ -390,7 +391,7 @@ func (cfg *Config) Events(f filter.T) []*event.T { var mu sync.Mutex found := false var m sync.Map - cfg.Do(RelayPerms{Read: true}, func(ctx context.Context, rl *relays.Relay) bool { + cfg.Do(RelayPerms{Read: true}, func(ctx context.T, rl *relays.Relay) bool { mu.Lock() if found { mu.Unlock() diff --git a/cmd/algia/profile.go b/cmd/algia/profile.go index 86f56227..2e14120e 100644 --- a/cmd/algia/profile.go +++ b/cmd/algia/profile.go @@ -1,12 +1,13 @@ package main import ( - "context" "encoding/json" "errors" "fmt" "os" + "github.com/Hubmakerlabs/replicatr/pkg/context" + "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/event" "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/filter" "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/keys" @@ -21,7 +22,7 @@ func doProfile(cCtx *cli.Context) (e error) { j := cCtx.Bool("json") cfg := cCtx.App.Metadata["config"].(*Config) - rl := cfg.FindRelay(context.Background(), RelayPerms{Read: true}) + rl := cfg.FindRelay(context.Bg(), RelayPerms{Read: true}) if rl == nil { return errors.New("cannot connect relays") } diff --git a/cmd/algia/timeline.go b/cmd/algia/timeline.go index 03d190af..da36efa8 100644 --- a/cmd/algia/timeline.go +++ b/cmd/algia/timeline.go @@ -1,7 +1,6 @@ package main import ( - "context" "encoding/json" "errors" "fmt" @@ -13,6 +12,8 @@ import ( "sync" "sync/atomic" + "github.com/Hubmakerlabs/replicatr/pkg/context" + "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/event" "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/filter" "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/filters" @@ -217,7 +218,7 @@ func doDMPost(cCtx *cli.Context) (e error) { } var success atomic.Int64 - cfg.Do(RelayPerms{Write: true}, func(ctx context.Context, rl *relays.Relay) bool { + cfg.Do(RelayPerms{Write: true}, func(ctx context.T, rl *relays.Relay) bool { e := rl.Publish(ctx, ev) if e != nil { fmt.Fprintln(os.Stderr, rl.URL, e) @@ -324,7 +325,7 @@ func doPost(cCtx *cli.Context) (e error) { } var success atomic.Int64 - cfg.Do(RelayPerms{Write: true}, func(ctx context.Context, rl *relays.Relay) bool { + cfg.Do(RelayPerms{Write: true}, func(ctx context.T, rl *relays.Relay) bool { e := rl.Publish(ctx, ev) if e != nil { fmt.Fprintln(os.Stderr, rl.URL, e) @@ -425,7 +426,7 @@ func doReply(cCtx *cli.Context) (e error) { } var success atomic.Int64 - cfg.Do(RelayPerms{Write: true}, func(ctx context.Context, rl *relays.Relay) bool { + cfg.Do(RelayPerms{Write: true}, func(ctx context.T, rl *relays.Relay) bool { if !quote { ev.Tags = ev.Tags.AppendUnique(tags.Tag{"e", id, rl.URL, "reply"}) } else { @@ -488,7 +489,7 @@ func doRepost(cCtx *cli.Context) (e error) { first.Store(true) var success atomic.Int64 - cfg.Do(RelayPerms{Write: true}, func(ctx context.Context, rl *relays.Relay) bool { + cfg.Do(RelayPerms{Write: true}, func(ctx context.T, rl *relays.Relay) bool { if first.Load() { evs, e := rl.QuerySync(ctx, f) if e != nil { @@ -543,7 +544,7 @@ func doUnrepost(cCtx *cli.Context) (e error) { } var repostID string var mu sync.Mutex - cfg.Do(RelayPerms{Read: true}, func(ctx context.Context, rl *relays.Relay) bool { + cfg.Do(RelayPerms{Read: true}, func(ctx context.T, rl *relays.Relay) bool { evs, e := rl.QuerySync(ctx, f) if e != nil { return true @@ -565,7 +566,7 @@ func doUnrepost(cCtx *cli.Context) (e error) { } var success atomic.Int64 - cfg.Do(RelayPerms{Write: true}, func(ctx context.Context, rl *relays.Relay) bool { + cfg.Do(RelayPerms{Write: true}, func(ctx context.T, rl *relays.Relay) bool { e := rl.Publish(ctx, ev) if e != nil { fmt.Fprintln(os.Stderr, rl.URL, e) @@ -631,7 +632,7 @@ func doLike(cCtx *cli.Context) (e error) { first.Store(true) var success atomic.Int64 - cfg.Do(RelayPerms{Write: true}, func(ctx context.Context, rl *relays.Relay) bool { + cfg.Do(RelayPerms{Write: true}, func(ctx context.T, rl *relays.Relay) bool { if first.Load() { evs, e := rl.QuerySync(ctx, f) if e != nil { @@ -687,7 +688,7 @@ func doUnlike(cCtx *cli.Context) (e error) { } var likeID string var mu sync.Mutex - cfg.Do(RelayPerms{Read: true}, func(ctx context.Context, rl *relays.Relay) bool { + cfg.Do(RelayPerms{Read: true}, func(ctx context.T, rl *relays.Relay) bool { evs, e := rl.QuerySync(ctx, f) if e != nil { return true @@ -709,7 +710,7 @@ func doUnlike(cCtx *cli.Context) (e error) { } var success atomic.Int64 - cfg.Do(RelayPerms{Write: true}, func(ctx context.Context, rl *relays.Relay) bool { + cfg.Do(RelayPerms{Write: true}, func(ctx context.T, rl *relays.Relay) bool { e := rl.Publish(ctx, ev) if e != nil { fmt.Fprintln(os.Stderr, rl.URL, e) @@ -758,7 +759,7 @@ func doDelete(cCtx *cli.Context) (e error) { } var success atomic.Int64 - cfg.Do(RelayPerms{Write: true}, func(ctx context.Context, rl *relays.Relay) bool { + cfg.Do(RelayPerms{Write: true}, func(ctx context.T, rl *relays.Relay) bool { e := rl.Publish(ctx, ev) if e != nil { fmt.Fprintln(os.Stderr, rl.URL, e) @@ -821,7 +822,7 @@ func doStream(cCtx *cli.Context) (e error) { cfg := cCtx.App.Metadata["config"].(*Config) - rl := cfg.FindRelay(context.Background(), RelayPerms{Read: true}) + rl := cfg.FindRelay(context.Bg(), RelayPerms{Read: true}) if rl == nil { return errors.New("cannot connect relays") } @@ -859,7 +860,7 @@ func doStream(cCtx *cli.Context) (e error) { Since: &since, } - sub, e := rl.Subscribe(context.Background(), filters.T{ff}) + sub, e := rl.Subscribe(context.Bg(), filters.T{ff}) if e != nil { return e } @@ -879,7 +880,7 @@ func doStream(cCtx *cli.Context) (e error) { if e := evr.Sign(sk); e != nil { return e } - cfg.Do(RelayPerms{Write: true}, func(ctx context.Context, rl *relays.Relay) bool { + cfg.Do(RelayPerms{Write: true}, func(ctx context.T, rl *relays.Relay) bool { rl.Publish(ctx, evr) return true }) @@ -948,7 +949,7 @@ func postMsg(cCtx *cli.Context, msg string) (e error) { } var success atomic.Int64 - cfg.Do(RelayPerms{Write: true}, func(ctx context.Context, rl *relays.Relay) bool { + cfg.Do(RelayPerms{Write: true}, func(ctx context.T, rl *relays.Relay) bool { e := rl.Publish(ctx, ev) if e != nil { fmt.Fprintln(os.Stderr, rl.URL, e) diff --git a/cmd/algia/zap.go b/cmd/algia/zap.go index 57f7dca5..c5fc8373 100644 --- a/cmd/algia/zap.go +++ b/cmd/algia/zap.go @@ -1,7 +1,6 @@ package main import ( - "context" "encoding/json" "errors" "fmt" @@ -10,6 +9,8 @@ import ( "os" "strings" + "github.com/Hubmakerlabs/replicatr/pkg/context" + "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/event" "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/filter" "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/keys" @@ -75,7 +76,7 @@ func pay(cfg *Config, invoice string) (e error) { return e } - rl, e := relays.RelayConnect(context.Background(), host) + rl, e := relays.RelayConnect(context.Bg(), host) if e != nil { return e } @@ -117,12 +118,12 @@ func pay(cfg *Config, invoice string) (e error) { Kinds: []int{event.KindNWCWalletInfo, event.KindNWCWalletResponse, event.KindNWCWalletRequest}, Limit: 1, }} - sub, e := rl.Subscribe(context.Background(), filters) + sub, e := rl.Subscribe(context.Bg(), filters) if e != nil { return e } - e = rl.Publish(context.Background(), ev) + e = rl.Publish(context.Bg(), ev) if e != nil { return e } @@ -146,7 +147,7 @@ func pay(cfg *Config, invoice string) (e error) { // ZapInfo is func (cfg *Config) ZapInfo(pub string) (*Lnurlp, error) { - rl := cfg.FindRelay(context.Background(), RelayPerms{Read: true}) + rl := cfg.FindRelay(context.Bg(), RelayPerms{Read: true}) if rl == nil { return nil, errors.New("cannot connect relays") } diff --git a/cmd/replicatrd/replicatr/listener.go b/cmd/replicatrd/replicatr/listener.go index 19ea433b..19cf3acb 100644 --- a/cmd/replicatrd/replicatr/listener.go +++ b/cmd/replicatrd/replicatr/listener.go @@ -1,9 +1,10 @@ package replicatr import ( - "context" "fmt" + "github.com/Hubmakerlabs/replicatr/pkg/context" + "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/filter" "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/filters" "github.com/puzpuzpuz/xsync/v2" @@ -11,7 +12,7 @@ import ( type Listener struct { filters filters.T - cancel context.CancelCauseFunc + cancel context.C } var listeners = xsync.NewTypedMapOf[*WebSocket, @@ -43,7 +44,7 @@ func GetListeningFilters() (respFilters filters.T) { return } -func SetListener(id string, ws *WebSocket, f filters.T, c context.CancelCauseFunc) { +func SetListener(id string, ws *WebSocket, f filters.T, c context.C) { subs, _ := listeners.LoadOrCompute(ws, func() ListenerMap { return xsync.NewMapOf[*Listener]() }) diff --git a/cmd/replicatrd/replicatr/policiesnip4.go b/cmd/replicatrd/replicatr/policiesnip4.go index ef6ac8d7..9ebf328d 100644 --- a/cmd/replicatrd/replicatr/policiesnip4.go +++ b/cmd/replicatrd/replicatr/policiesnip4.go @@ -1,7 +1,7 @@ package replicatr import ( - "context" + "github.com/Hubmakerlabs/replicatr/pkg/context" "github.com/Hubmakerlabs/replicatr/pkg/nostr/filter" "golang.org/x/exp/slices" @@ -9,7 +9,7 @@ import ( // RejectKind4Snoopers prevents reading NIP-04 messages from people not involved // in the conversation. -func RejectKind4Snoopers(c context.Context, f *filter.T) (bool, string) { +func RejectKind4Snoopers(c context.T, f *filter.T) (bool, string) { // prevent kind-4 events from being returned to unauthorized users, only // when authentication is a thing if !slices.Contains(f.Kinds, 4) { diff --git a/cmd/replicatrd/replicatr/relay.go b/cmd/replicatrd/replicatr/relay.go index 0ea2b05e..8d2c5a22 100644 --- a/cmd/replicatrd/replicatr/relay.go +++ b/cmd/replicatrd/replicatr/relay.go @@ -1,11 +1,12 @@ package replicatr import ( - "context" "net/http" "os" "time" + "github.com/Hubmakerlabs/replicatr/pkg/context" + "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/event" "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/filter" "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/nip11" @@ -26,17 +27,17 @@ const ( // function types used in the relay state type ( - RejectEvent func(c context.Context, event *event.T) (reject bool, msg string) - RejectFilter func(c context.Context, f *filter.T) (reject bool, msg string) - OverwriteFilter func(c context.Context, f *filter.T) - OverwriteDeletionOutcome func(c context.Context, target *event.T, del *event.T) (accept bool, msg string) - OverwriteResponseEvent func(c context.Context, ev *event.T) - Events func(c context.Context, ev *event.T) error - Hook func(c context.Context) - OverwriteRelayInformation func(c context.Context, r *http.Request, info *nip11.RelayInformationDocument) *nip11.RelayInformationDocument - QueryEvents func(c context.Context, f *filter.T) (eventC chan *event.T, e error) - CountEvents func(c context.Context, f *filter.T) (cnt int64, e error) - OnEventSaved func(c context.Context, ev *event.T) + RejectEvent func(c context.T, event *event.T) (reject bool, msg string) + RejectFilter func(c context.T, f *filter.T) (reject bool, msg string) + OverwriteFilter func(c context.T, f *filter.T) + OverwriteDeletionOutcome func(c context.T, target *event.T, del *event.T) (accept bool, msg string) + OverwriteResponseEvent func(c context.T, ev *event.T) + Events func(c context.T, ev *event.T) error + Hook func(c context.T) + OverwriteRelayInformation func(c context.T, r *http.Request, info *nip11.Info) *nip11.Info + QueryEvents func(c context.T, f *filter.T) (eventC chan *event.T, e error) + CountEvents func(c context.T, f *filter.T) (cnt int64, e error) + OnEventSaved func(c context.T, ev *event.T) ) type Relay struct { @@ -57,7 +58,7 @@ type Relay struct { OnDisconnect []Hook OnEventSaved []OnEventSaved // editing info will affect - Info *nip11.RelayInformationDocument + Info *nip11.Info *log2.Log // for establishing websockets upgrader websocket.Upgrader @@ -77,7 +78,7 @@ type Relay struct { func NewRelay(appName string) (r *Relay) { r = &Relay{ Log: log2.New(os.Stderr, appName, 0), - Info: &nip11.RelayInformationDocument{ + Info: &nip11.Info{ Software: "https://github.com/Hubmakerlabs/replicatr/cmd/replicatrd", Version: "n/a", SupportedNIPs: make([]int, 0), diff --git a/cmd/replicatrd/replicatr/websockethandler.go b/cmd/replicatrd/replicatr/websockethandler.go index 70951b53..0b6a1f55 100644 --- a/cmd/replicatrd/replicatr/websockethandler.go +++ b/cmd/replicatrd/replicatr/websockethandler.go @@ -1,7 +1,6 @@ package replicatr import ( - "context" "crypto/rand" "crypto/sha256" "errors" @@ -10,6 +9,8 @@ import ( "sync" "time" + "github.com/Hubmakerlabs/replicatr/pkg/context" + "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/OK" "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/auth" "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/closed" @@ -43,9 +44,9 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { Request: r, Challenge: hex.Enc(challenge), } - c, cancel := context.WithCancel( - context.WithValue( - context.Background(), + c, cancel := context.Cancel( + context.Value( + context.Bg(), wsKey, ws, ), ) @@ -65,7 +66,7 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { go rl.websocketWatcher(c, kill, ticker, ws) } -func (rl *Relay) websocketProcessMessages(message []byte, c context.Context, ws *WebSocket) { +func (rl *Relay) websocketProcessMessages(message []byte, c context.T, ws *WebSocket) { var e error env := envelope.ParseMessage(message) if env == nil { @@ -144,9 +145,9 @@ func (rl *Relay) websocketProcessMessages(message []byte, c context.Context, ws wg := sync.WaitGroup{} wg.Add(len(env.T)) // a context just for the "stored events" request handler - reqCtx, cancelReqCtx := context.WithCancelCause(c) + reqCtx, cancelReqCtx := context.CancelCause(c) // expose subscription id in the context - reqCtx = context.WithValue(reqCtx, subscriptionIdKey, env.SubscriptionID) + reqCtx = context.Value(reqCtx, subscriptionIdKey, env.SubscriptionID) // handle each filter separately -- dispatching events as they're loaded from databases for _, f := range env.T { e = rl.handleFilter(reqCtx, env.SubscriptionID, &wg, ws, &f) @@ -198,7 +199,7 @@ func (rl *Relay) websocketProcessMessages(message []byte, c context.Context, ws } } -func (rl *Relay) websocketReadMessages(c context.Context, kill func(), +func (rl *Relay) websocketReadMessages(c context.T, kill func(), ws *WebSocket, conn *websocket.Conn, r *http.Request) { defer kill() @@ -238,7 +239,7 @@ func (rl *Relay) websocketReadMessages(c context.Context, kill func(), } } -func (rl *Relay) websocketWatcher(c context.Context, kill func(), t *time.Ticker, ws *WebSocket) { +func (rl *Relay) websocketWatcher(c context.T, kill func(), t *time.Ticker, ws *WebSocket) { var e error defer kill() for { diff --git a/pkg/context/context.go b/pkg/context/context.go index cf2c6c6a..c20c79ab 100644 --- a/pkg/context/context.go +++ b/pkg/context/context.go @@ -1,9 +1,22 @@ package context -import "context" +import ( + "context" +) type ( T = context.Context F = context.CancelFunc C = context.CancelCauseFunc ) + +var ( + Bg = context.Background + Cancel = context.WithCancel + Timeout = context.WithTimeout + TimeoutCause = context.WithTimeoutCause + TODO = context.TODO + Value = context.WithValue + CancelCause = context.WithCancelCause + Canceled = context.Canceled +) diff --git a/pkg/eventstore/badger/count.go b/pkg/eventstore/badger/count.go index 48f45b24..202f76f8 100644 --- a/pkg/eventstore/badger/count.go +++ b/pkg/eventstore/badger/count.go @@ -1,17 +1,18 @@ package badger import ( - "context" "encoding/binary" "errors" + "github.com/Hubmakerlabs/replicatr/pkg/context" + nostr_binary "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/binary" "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/event" "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/filter" "github.com/dgraph-io/badger/v4" ) -func (b *BadgerBackend) CountEvents(ctx context.Context, f *filter.T) (int64, error) { +func (b *BadgerBackend) CountEvents(ctx context.T, f *filter.T) (int64, error) { var count int64 = 0 queries, extraFilter, since, e := prepareQueries(f) diff --git a/pkg/eventstore/badger/delete.go b/pkg/eventstore/badger/delete.go index 0ab6301b..a0e974e0 100644 --- a/pkg/eventstore/badger/delete.go +++ b/pkg/eventstore/badger/delete.go @@ -1,7 +1,7 @@ package badger import ( - "context" + "github.com/Hubmakerlabs/replicatr/pkg/context" "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/event" "github.com/Hubmakerlabs/replicatr/pkg/hex" @@ -10,7 +10,7 @@ import ( var serialDelete uint32 = 0 -func (b *BadgerBackend) DeleteEvent(ctx context.Context, evt *event.T) (e error) { +func (b *BadgerBackend) DeleteEvent(ctx context.T, evt *event.T) (e error) { deletionHappened := false e = b.Update(func(txn *badger.Txn) (e error) { diff --git a/pkg/eventstore/badger/query.go b/pkg/eventstore/badger/query.go index 7d3a9f15..2ff9b664 100644 --- a/pkg/eventstore/badger/query.go +++ b/pkg/eventstore/badger/query.go @@ -2,10 +2,11 @@ package badger import ( "container/heap" - "context" "encoding/binary" "fmt" + "github.com/Hubmakerlabs/replicatr/pkg/context" + nostr_binary "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/binary" "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/event" "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/filter" @@ -26,7 +27,7 @@ type queryEvent struct { query int } -func (b BadgerBackend) QueryEvents(ctx context.Context, f *filter.T) (chan *event.T, error) { +func (b BadgerBackend) QueryEvents(ctx context.T, f *filter.T) (chan *event.T, error) { ch := make(chan *event.T) queries, extraFilter, since, e := prepareQueries(f) diff --git a/pkg/eventstore/badger/save.go b/pkg/eventstore/badger/save.go index 5d0cc6ab..9c540c28 100644 --- a/pkg/eventstore/badger/save.go +++ b/pkg/eventstore/badger/save.go @@ -1,7 +1,7 @@ package badger import ( - "context" + "github.com/Hubmakerlabs/replicatr/pkg/context" "github.com/Hubmakerlabs/replicatr/pkg/eventstore" nostr_binary "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/binary" @@ -10,7 +10,7 @@ import ( "github.com/dgraph-io/badger/v4" ) -func (b *BadgerBackend) SaveEvent(ctx context.Context, evt *event.T) (e error) { +func (b *BadgerBackend) SaveEvent(ctx context.T, evt *event.T) (e error) { return b.Update(func(txn *badger.Txn) (e error) { // query event by id to ensure we don't save duplicates id, _ := hex.Dec(evt.ID) diff --git a/pkg/eventstore/relay_interface.go b/pkg/eventstore/relay_interface.go index 4b79322b..57eadf73 100644 --- a/pkg/eventstore/relay_interface.go +++ b/pkg/eventstore/relay_interface.go @@ -1,9 +1,10 @@ package eventstore import ( - "context" "fmt" + "github.com/Hubmakerlabs/replicatr/pkg/context" + "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/event" "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/filter" "github.com/Hubmakerlabs/replicatr/pkg/nostr/relay" @@ -11,8 +12,8 @@ import ( // RelayInterface is a wrapper thing that unifies Store and nostr.Relay under a common API. type RelayInterface interface { - Publish(ctx context.Context, evt *event.T) error - QuerySync(ctx context.Context, f *filter.T, opts ...relay.SubscriptionOption) ([]*event.T, error) + Publish(ctx context.T, evt *event.T) error + QuerySync(ctx context.T, f *filter.T, opts ...relay.SubscriptionOption) ([]*event.T, error) } type RelayWrapper struct { @@ -21,7 +22,7 @@ type RelayWrapper struct { var _ RelayInterface = (*RelayWrapper)(nil) -func (w RelayWrapper) Publish(ctx context.Context, evt *event.T) (e error) { +func (w RelayWrapper) Publish(ctx context.T, evt *event.T) (e error) { if 20000 <= evt.Kind && evt.Kind < 30000 { // do not store ephemeral events return nil @@ -59,7 +60,7 @@ func (w RelayWrapper) Publish(ctx context.Context, evt *event.T) (e error) { return nil } -func (w RelayWrapper) QuerySync(ctx context.Context, f *filter.T, opts ...relay.SubscriptionOption) ([]*event.T, error) { +func (w RelayWrapper) QuerySync(ctx context.T, f *filter.T, opts ...relay.SubscriptionOption) ([]*event.T, error) { ch, e := w.Store.QueryEvents(ctx, f) if e != nil { return nil, fmt.Errorf("failed to query: %w", e) diff --git a/pkg/eventstore/store.go b/pkg/eventstore/store.go index b7d5e896..28cfad18 100644 --- a/pkg/eventstore/store.go +++ b/pkg/eventstore/store.go @@ -1,7 +1,7 @@ package eventstore import ( - "context" + "github.com/Hubmakerlabs/replicatr/pkg/context" "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/event" "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/filter" @@ -19,9 +19,9 @@ type Store interface { // QueryEvents is invoked upon a client's REQ as described in NIP-01. // it should return a channel with the events as they're recovered from a database. // the channel should be closed after the events are all delivered. - QueryEvents(context.Context, *filter.T) (chan *event.T, error) + QueryEvents(context.T, *filter.T) (chan *event.T, error) // DeleteEvent is used to handle deletion events, as per NIP-09. - DeleteEvent(context.Context, *event.T) error + DeleteEvent(context.T, *event.T) error // SaveEvent is called once Relay.AcceptEvent reports true. - SaveEvent(context.Context, *event.T) error + SaveEvent(context.T, *event.T) error } diff --git a/pkg/go-nostr/connect/connect.go b/pkg/go-nostr/connect/connect.go index c519c222..d8e80aca 100644 --- a/pkg/go-nostr/connect/connect.go +++ b/pkg/go-nostr/connect/connect.go @@ -3,13 +3,14 @@ package connect import ( "bytes" "compress/flate" - "context" "errors" "fmt" "io" "net" "net/http" + "github.com/Hubmakerlabs/replicatr/pkg/context" + log2 "github.com/Hubmakerlabs/replicatr/pkg/log" "github.com/gobwas/httphead" "github.com/gobwas/ws" @@ -30,7 +31,7 @@ type Connection struct { msgState *wsflate.MessageState } -func NewConnection(ctx context.Context, url string, requestHeader http.Header) (*Connection, error) { +func NewConnection(ctx context.T, url string, requestHeader http.Header) (*Connection, error) { dialer := ws.Dialer{ Header: ws.HandshakeHeaderHTTP(requestHeader), Extensions: []httphead.Option{ @@ -124,7 +125,7 @@ func (c *Connection) WriteMessage(data []byte) (e error) { return nil } -func (c *Connection) ReadMessage(ctx context.Context, buf io.Writer) (e error) { +func (c *Connection) ReadMessage(ctx context.T, buf io.Writer) (e error) { for { select { case <-ctx.Done(): diff --git a/pkg/go-nostr/connection/connection.go b/pkg/go-nostr/connection/connection.go index 1072041a..cf473e6e 100644 --- a/pkg/go-nostr/connection/connection.go +++ b/pkg/go-nostr/connection/connection.go @@ -3,13 +3,14 @@ package connection import ( "bytes" "compress/flate" - "context" "errors" "fmt" "io" "net" "net/http" + "github.com/Hubmakerlabs/replicatr/pkg/context" + "github.com/gobwas/httphead" "github.com/gobwas/ws" "github.com/gobwas/ws/wsflate" @@ -28,7 +29,7 @@ type Connection struct { msgStateW *wsflate.MessageState } -func NewConnection(ctx context.Context, url string, requestHeader http.Header) (*Connection, error) { +func NewConnection(ctx context.T, url string, requestHeader http.Header) (*Connection, error) { dialer := ws.Dialer{ Header: ws.HandshakeHeaderHTTP(requestHeader), Extensions: []httphead.Option{ @@ -126,7 +127,7 @@ func (c *Connection) WriteMessage(data []byte) error { return nil } -func (c *Connection) ReadMessage(ctx context.Context, buf io.Writer) error { +func (c *Connection) ReadMessage(ctx context.T, buf io.Writer) error { for { select { case <-ctx.Done(): diff --git a/pkg/go-nostr/example/example.go b/pkg/go-nostr/example/example.go index 8b447c6f..6e4b8366 100644 --- a/pkg/go-nostr/example/example.go +++ b/pkg/go-nostr/example/example.go @@ -1,7 +1,6 @@ package main import ( - "context" "encoding/json" "fmt" "io" @@ -9,6 +8,8 @@ import ( "strings" "time" + "github.com/Hubmakerlabs/replicatr/pkg/context" + "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/event" "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/filter" filters2 "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/filters" @@ -19,7 +20,7 @@ import ( ) func main() { - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + ctx, cancel := context.Timeout(context.Bg(), 3*time.Second) // connect to relay url := "wss://nostr.zebedee.cloud" @@ -122,7 +123,7 @@ func main() { ev.Content = strings.TrimSpace(content) ev.Sign(sk) for _, url := range []string{"wss://nostr.zebedee.cloud"} { - ctx := context.WithValue(context.Background(), "url", url) + ctx := context.Value(context.Bg(), "url", url) rl, e := relays.RelayConnect(ctx, url) if e != nil { fmt.Println(e) diff --git a/pkg/go-nostr/filter/filter_test.go b/pkg/go-nostr/filter/filter_test.go index 781e5eb7..6e306f0a 100644 --- a/pkg/go-nostr/filter/filter_test.go +++ b/pkg/go-nostr/filter/filter_test.go @@ -43,12 +43,12 @@ func TestFilterMarshal(t *testing.T) { func TestFilterMatchingLive(t *testing.T) { var f T - var evt event.T + var ev event.T json.Unmarshal([]byte(`{"kinds":[1],"authors":["a8171781fd9e90ede3ea44ddca5d3abf828fe8eedeb0f3abb0dd3e563562e1fc","1d80e5588de010d137a67c42b03717595f5f510e73e42cfc48f31bae91844d59","ed4ca520e9929dfe9efdadf4011b53d30afd0678a09aa026927e60e7a45d9244"],"since":1677033299}`), &f) - json.Unmarshal([]byte(`{"id":"5a127c9c931f392f6afc7fdb74e8be01c34035314735a6b97d2cf360d13cfb94","pubkey":"1d80e5588de010d137a67c42b03717595f5f510e73e42cfc48f31bae91844d59","created_at":1677033299,"kind":1,"tags":[["t","japan"]],"content":"If you like my art,I'd appreciate a coin or two!!\nZap is welcome!! Thanks.\n\n\n#japan #bitcoin #art #bananaart\nhttps://void.cat/d/CgM1bzDgHUCtiNNwfX9ajY.webp","sig":"828497508487ca1e374f6b4f2bba7487bc09fccd5cc0d1baa82846a944f8c5766918abf5878a580f1e6615de91f5b57a32e34c42ee2747c983aaf47dbf2a0255"}`), &evt) + json.Unmarshal([]byte(`{"id":"5a127c9c931f392f6afc7fdb74e8be01c34035314735a6b97d2cf360d13cfb94","pubkey":"1d80e5588de010d137a67c42b03717595f5f510e73e42cfc48f31bae91844d59","created_at":1677033299,"kind":1,"tags":[["t","japan"]],"content":"If you like my art,I'd appreciate a coin or two!!\nZap is welcome!! Thanks.\n\n\n#japan #bitcoin #art #bananaart\nhttps://void.cat/d/CgM1bzDgHUCtiNNwfX9ajY.webp","sig":"828497508487ca1e374f6b4f2bba7487bc09fccd5cc0d1baa82846a944f8c5766918abf5878a580f1e6615de91f5b57a32e34c42ee2747c983aaf47dbf2a0255"}`), &ev) - if !f.Matches(&evt) { + if !f.Matches(&ev) { t.Error("live filter should match") } } diff --git a/pkg/go-nostr/nip05/nip05.go b/pkg/go-nostr/nip05/nip05.go index 2d4239e8..e909d87a 100644 --- a/pkg/go-nostr/nip05/nip05.go +++ b/pkg/go-nostr/nip05/nip05.go @@ -1,12 +1,13 @@ package nip05 import ( - "context" "encoding/json" "fmt" "net/http" "strings" + "github.com/Hubmakerlabs/replicatr/pkg/context" + "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/pointers" "github.com/Hubmakerlabs/replicatr/pkg/hex" ) @@ -21,7 +22,7 @@ type WellKnownResponse struct { Relays key2RelaysMap `json:"relays"` // NIP-35 } -func QueryIdentifier(ctx context.Context, fullname string) (*pointers.ProfilePointer, error) { +func QueryIdentifier(ctx context.T, fullname string) (*pointers.ProfilePointer, error) { spl := strings.Split(fullname, "@") var name, domain string diff --git a/pkg/go-nostr/nip11/fetch.go b/pkg/go-nostr/nip11/fetch.go index 7d0cc89a..4fbd7240 100644 --- a/pkg/go-nostr/nip11/fetch.go +++ b/pkg/go-nostr/nip11/fetch.go @@ -1,21 +1,22 @@ package nip11 import ( - "context" "encoding/json" "fmt" "net/http" "net/url" "strings" "time" + + "github.com/Hubmakerlabs/replicatr/pkg/context" ) -// Fetch fetches the NIP-11 RelayInformationDocument. -func Fetch(ctx context.Context, u string) (info *RelayInformationDocument, e error) { +// Fetch fetches the NIP-11 Info. +func Fetch(ctx context.T, u string) (info *Info, e error) { if _, ok := ctx.Deadline(); !ok { // if no timeout is set, force it to 7 seconds - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, 7*time.Second) + var cancel context.F + ctx, cancel = context.Timeout(ctx, 7*time.Second) defer cancel() } @@ -46,7 +47,7 @@ func Fetch(ctx context.Context, u string) (info *RelayInformationDocument, e err } defer resp.Body.Close() - info = &RelayInformationDocument{} + info = &Info{} if e := json.NewDecoder(resp.Body).Decode(info); e != nil { return nil, fmt.Errorf("invalid json: %w", e) } diff --git a/pkg/go-nostr/nip11/nip11_test.go b/pkg/go-nostr/nip11/nip11_test.go index a1a0a891..e5d3559a 100644 --- a/pkg/go-nostr/nip11/nip11_test.go +++ b/pkg/go-nostr/nip11/nip11_test.go @@ -3,7 +3,7 @@ package nip11 import "testing" func TestAddSupportedNIP(t *testing.T) { - info := RelayInformationDocument{} + info := Info{} info.AddSupportedNIP(12) info.AddSupportedNIP(12) info.AddSupportedNIP(13) diff --git a/pkg/go-nostr/nip11/types.go b/pkg/go-nostr/nip11/types.go index 523985ac..208e5b31 100644 --- a/pkg/go-nostr/nip11/types.go +++ b/pkg/go-nostr/nip11/types.go @@ -2,7 +2,7 @@ package nip11 import "golang.org/x/exp/slices" -type RelayInformationDocument struct { +type Info struct { Name string `json:"name"` Description string `json:"description"` PubKey string `json:"pubkey"` @@ -11,17 +11,17 @@ type RelayInformationDocument struct { Software string `json:"software"` Version string `json:"version"` - Limitation *RelayLimitationDocument `json:"limitation,omitempty"` - RelayCountries []string `json:"relay_countries,omitempty"` + Limitation *Limits `json:"limitation,omitempty"` + RelayCountries []string `json:"relay_countries,omitempty"` LanguageTags []string `json:"language_tags,omitempty"` Tags []string `json:"tags,omitempty"` PostingPolicy string `json:"posting_policy,omitempty"` - PaymentsURL string `json:"payments_url,omitempty"` - Fees *RelayFeesDocument `json:"fees,omitempty"` - Icon string `json:"icon"` + PaymentsURL string `json:"payments_url,omitempty"` + Fees *Fees `json:"fees,omitempty"` + Icon string `json:"icon"` } -func (info *RelayInformationDocument) AddSupportedNIP(number int) { +func (info *Info) AddSupportedNIP(number int) { idx, exists := slices.BinarySearch(info.SupportedNIPs, number) if exists { return @@ -32,7 +32,7 @@ func (info *RelayInformationDocument) AddSupportedNIP(number int) { info.SupportedNIPs[idx] = number } -type RelayLimitationDocument struct { +type Limits struct { MaxMessageLength int `json:"max_message_length,omitempty"` MaxSubscriptions int `json:"max_subscriptions,omitempty"` MaxFilters int `json:"max_filters,omitempty"` @@ -46,7 +46,7 @@ type RelayLimitationDocument struct { RestrictedWrites bool `json:"restricted_writes"` } -type RelayFeesDocument struct { +type Fees struct { Admission []struct { Amount int `json:"amount"` Unit string `json:"unit"` diff --git a/pkg/go-nostr/nip46/signer.go b/pkg/go-nostr/nip46/signer.go index e6b127a7..48b2c657 100644 --- a/pkg/go-nostr/nip46/signer.go +++ b/pkg/go-nostr/nip46/signer.go @@ -46,7 +46,7 @@ func (s Session) MakeResponse( requester string, result string, e error, -) (resp Response, evt event.T, error error) { +) (resp Response, ev event.T, err error) { if e != nil { resp = Response{ ID: id, @@ -60,17 +60,18 @@ func (s Session) MakeResponse( } jresp, _ := json.Marshal(resp) - ciphertext, e := nip04.Encrypt(string(jresp), s.SharedKey) + var ciphertext string + ciphertext, e = nip04.Encrypt(string(jresp), s.SharedKey) if e != nil { - return resp, evt, fmt.Errorf("failed to encrypt result: %w", e) + return resp, ev, fmt.Errorf("failed to encrypt result: %w", e) } - evt.Content = ciphertext + ev.Content = ciphertext - evt.CreatedAt = timestamp.Now() - evt.Kind = event.KindNostrConnect - evt.Tags = tags.Tags{tags.Tag{"p", requester}} + ev.CreatedAt = timestamp.Now() + ev.Kind = event.KindNostrConnect + ev.Tags = tags.Tags{tags.Tag{"p", requester}} - return resp, evt, nil + return resp, ev, nil } type Signer struct { diff --git a/pkg/go-nostr/pools/pool.go b/pkg/go-nostr/pools/pool.go index b8e3e54e..2f372a87 100644 --- a/pkg/go-nostr/pools/pool.go +++ b/pkg/go-nostr/pools/pool.go @@ -1,13 +1,14 @@ package pools import ( - "context" "fmt" "log" "strings" "sync" "time" + "github.com/Hubmakerlabs/replicatr/pkg/context" + "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/event" "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/filter" "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/filters" @@ -23,10 +24,10 @@ const ( type SimplePool struct { Relays *xsync.MapOf[string, *relays.Relay] - Context context.Context + Context context.T authHandler func(*event.T) error - cancel context.CancelFunc + cancel context.F } type IncomingEvent struct { @@ -39,8 +40,8 @@ type PoolOption interface { Apply(*SimplePool) } -func NewSimplePool(ctx context.Context, opts ...PoolOption) *SimplePool { - ctx, cancel := context.WithCancel(ctx) +func NewSimplePool(ctx context.T, opts ...PoolOption) *SimplePool { + ctx, cancel := context.Cancel(ctx) pool := &SimplePool{ Relays: xsync.NewMapOf[*relays.Relay](), @@ -80,7 +81,7 @@ func (pool *SimplePool) EnsureRelay(url string) (*relays.Relay, error) { } else { var e error // we use this ctx here so when the pool dies everything dies - ctx, cancel := context.WithTimeout(pool.Context, time.Second*15) + ctx, cancel := context.Timeout(pool.Context, time.Second*15) defer cancel() if rl, e = relays.RelayConnect(ctx, nm); e != nil { return nil, fmt.Errorf("failed to connect: %w", e) @@ -93,17 +94,17 @@ func (pool *SimplePool) EnsureRelay(url string) (*relays.Relay, error) { // SubMany opens a subscription with the given filters to multiple relays // the subscriptions only end when the context is canceled -func (pool *SimplePool) SubMany(ctx context.Context, urls []string, filters filters.T) chan IncomingEvent { +func (pool *SimplePool) SubMany(ctx context.T, urls []string, filters filters.T) chan IncomingEvent { return pool.subMany(ctx, urls, filters, true) } // SubManyNonUnique is like SubMany, but returns duplicate events if they come from different relays -func (pool *SimplePool) SubManyNonUnique(ctx context.Context, urls []string, filters filters.T) chan IncomingEvent { +func (pool *SimplePool) SubManyNonUnique(ctx context.T, urls []string, filters filters.T) chan IncomingEvent { return pool.subMany(ctx, urls, filters, false) } -func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters filters.T, unique bool) chan IncomingEvent { - ctx, cancel := context.WithCancel(ctx) +func (pool *SimplePool) subMany(ctx context.T, urls []string, filters filters.T, unique bool) chan IncomingEvent { + ctx, cancel := context.Cancel(ctx) _ = cancel // do this so `go vet` will stop complaining events := make(chan IncomingEvent) seenAlready := xsync.NewMapOf[timestamp.T]() @@ -215,17 +216,17 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters filt } // SubManyEose is like SubMany, but it stops subscriptions and closes the channel when gets a EOSE -func (pool *SimplePool) SubManyEose(ctx context.Context, urls []string, filters filters.T) chan IncomingEvent { +func (pool *SimplePool) SubManyEose(ctx context.T, urls []string, filters filters.T) chan IncomingEvent { return pool.subManyEose(ctx, urls, filters, true) } // SubManyEoseNonUnique is like SubManyEose, but returns duplicate events if they come from different relays -func (pool *SimplePool) SubManyEoseNonUnique(ctx context.Context, urls []string, filters filters.T) chan IncomingEvent { +func (pool *SimplePool) SubManyEoseNonUnique(ctx context.T, urls []string, filters filters.T) chan IncomingEvent { return pool.subManyEose(ctx, urls, filters, false) } -func (pool *SimplePool) subManyEose(ctx context.Context, urls []string, filters filters.T, unique bool) chan IncomingEvent { - ctx, cancel := context.WithCancel(ctx) +func (pool *SimplePool) subManyEose(ctx context.T, urls []string, filters filters.T, unique bool) chan IncomingEvent { + ctx, cancel := context.Cancel(ctx) events := make(chan IncomingEvent) seenAlready := xsync.NewMapOf[bool]() @@ -299,8 +300,8 @@ func (pool *SimplePool) subManyEose(ctx context.Context, urls []string, filters } // QuerySingle returns the first event returned by the first relay, cancels everything else. -func (pool *SimplePool) QuerySingle(ctx context.Context, urls []string, f filter.T) *IncomingEvent { - ctx, cancel := context.WithCancel(ctx) +func (pool *SimplePool) QuerySingle(ctx context.T, urls []string, f filter.T) *IncomingEvent { + ctx, cancel := context.Cancel(ctx) defer cancel() for ievt := range pool.SubManyEose(ctx, urls, filters.T{f}) { return &ievt diff --git a/pkg/go-nostr/relays/relay.go b/pkg/go-nostr/relays/relay.go index 7bfc48db..721d9a52 100644 --- a/pkg/go-nostr/relays/relay.go +++ b/pkg/go-nostr/relays/relay.go @@ -2,7 +2,6 @@ package relays import ( "bytes" - "context" "fmt" "log" "net/http" @@ -10,6 +9,8 @@ import ( "sync/atomic" "time" + "github.com/Hubmakerlabs/replicatr/pkg/context" + "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/OK" "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/auth" "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/closed" @@ -44,8 +45,8 @@ type Relay struct { Subscriptions *xsync.MapOf[string, *Subscription] ConnectionError error - connectionContext context.Context // will be canceled when the connection closes - connectionContextCancel context.CancelFunc + connectionContext context.T // will be canceled when the connection closes + connectionContextCancel context.F challenge string // NIP-42 challenge, we only keep the last notices chan string // NIP-01 NOTICEs @@ -64,8 +65,8 @@ type writeRequest struct { } // NewRelay returns a new relay. The relay connection will be closed when the context is canceled. -func NewRelay(ctx context.Context, url string, opts ...RelayOption) *Relay { - ctx, cancel := context.WithCancel(ctx) +func NewRelay(ctx context.T, url string, opts ...RelayOption) *Relay { + ctx, cancel := context.Cancel(ctx) r := &Relay{ URL: normalize.URL(url), connectionContext: ctx, @@ -94,13 +95,14 @@ func NewRelay(ctx context.Context, url string, opts ...RelayOption) *Relay { // RelayConnect returns a relay object connected to url. // Once successfully connected, cancelling ctx has no effect. // To close the connection, call r.Close(). -func RelayConnect(ctx context.Context, url string, opts ...RelayOption) (*Relay, error) { - r := NewRelay(context.Background(), url, opts...) +func RelayConnect(ctx context.T, url string, opts ...RelayOption) (*Relay, error) { + r := NewRelay(context.Bg(), url, opts...) e := r.Connect(ctx) return r, e } // When instantiating relay connections, some options may be passed. + // RelayOption is the type of the argument passed for that. type RelayOption interface { IsRelayOption() @@ -120,7 +122,7 @@ func (r *Relay) String() string { } // Context retrieves the context that is associated with this relay connection. -func (r *Relay) Context() context.Context { return r.connectionContext } +func (r *Relay) Context() context.T { return r.connectionContext } // IsConnected returns true if the connection to this relay seems to be active. func (r *Relay) IsConnected() bool { return r.connectionContext.Err() == nil } @@ -133,7 +135,7 @@ func (r *Relay) IsConnected() bool { return r.connectionContext.Err() == nil } // The underlying relay connection will use a background context. If you want to // pass a custom context to the underlying relay connection, use NewRelay() and // then Relay.Connect(). -func (r *Relay) Connect(ctx context.Context) error { +func (r *Relay) Connect(ctx context.T) error { if r.connectionContext == nil || r.Subscriptions == nil { return fmt.Errorf("relay must be initialized with a call to NewRelay()") } @@ -144,8 +146,8 @@ func (r *Relay) Connect(ctx context.Context) error { if _, ok := ctx.Deadline(); !ok { // if no timeout is set, force it to 7 seconds - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, 7*time.Second) + var cancel context.F + ctx, cancel = context.Timeout(ctx, 7*time.Second) defer cancel() } @@ -296,12 +298,12 @@ func (r *Relay) Write(msg []byte) <-chan error { } // Publish sends an "EVENT" command to the relay r as in NIP-01 and waits for an OK response. -func (r *Relay) Publish(ctx context.Context, evt event.T) error { - return r.publish(ctx, evt.ID, &event.Envelope{T: evt}) +func (r *Relay) Publish(ctx context.T, ev event.T) error { + return r.publish(ctx, ev.ID, &event.Envelope{T: ev}) } // Auth sends an "AUTH" command client->relay as in NIP-42 and waits for an OK response. -func (r *Relay) Auth(ctx context.Context, sign func(event *event.T) error) error { +func (r *Relay) Auth(ctx context.T, sign func(ev *event.T) error) error { authEvent := event.T{ CreatedAt: timestamp.Now(), Kind: event.KindClientAuthentication, @@ -319,17 +321,17 @@ func (r *Relay) Auth(ctx context.Context, sign func(event *event.T) error) error } // publish can be used both for EVENT and for AUTH -func (r *Relay) publish(ctx context.Context, id string, env envelopes.E) error { +func (r *Relay) publish(ctx context.T, id string, env envelopes.E) error { var e error - var cancel context.CancelFunc + var cancel context.F if _, ok := ctx.Deadline(); !ok { // if no timeout is set, force it to 7 seconds - ctx, cancel = context.WithTimeoutCause(ctx, 7*time.Second, fmt.Errorf("given up waiting for an OK")) + ctx, cancel = context.TimeoutCause(ctx, 7*time.Second, fmt.Errorf("given up waiting for an OK")) defer cancel() } else { // otherwise make the context cancellable so we can stop everything upon receiving an "OK" - ctx, cancel = context.WithCancel(ctx) + ctx, cancel = context.Cancel(ctx) defer cancel() } @@ -370,9 +372,9 @@ func (r *Relay) publish(ctx context.Context, id string, env envelopes.E) error { // Events are returned through the channel sub.Events. // The subscription is closed when context ctx is cancelled ("CLOSE" in NIP-01). // -// Remember to cancel subscriptions, either by calling `.Unsub()` on them or ensuring their `context.Context` will be canceled at some point. +// Remember to cancel subscriptions, either by calling `.Unsub()` on them or ensuring their `context.T` will be canceled at some point. // Failure to do that will result in a huge number of halted goroutines being created. -func (r *Relay) Subscribe(ctx context.Context, filters filters.T, opts ...SubscriptionOption) (*Subscription, error) { +func (r *Relay) Subscribe(ctx context.T, filters filters.T, opts ...SubscriptionOption) (*Subscription, error) { sub := r.PrepareSubscription(ctx, filters, opts...) if e := sub.Fire(); e != nil { @@ -384,15 +386,15 @@ func (r *Relay) Subscribe(ctx context.Context, filters filters.T, opts ...Subscr // PrepareSubscription creates a subscription, but doesn't fire it. // -// Remember to cancel subscriptions, either by calling `.Unsub()` on them or ensuring their `context.Context` will be canceled at some point. +// Remember to cancel subscriptions, either by calling `.Unsub()` on them or ensuring their `context.T` will be canceled at some point. // Failure to do that will result in a huge number of halted goroutines being created. -func (r *Relay) PrepareSubscription(ctx context.Context, filters filters.T, opts ...SubscriptionOption) *Subscription { +func (r *Relay) PrepareSubscription(ctx context.T, filters filters.T, opts ...SubscriptionOption) *Subscription { if r.Connection == nil { panic(fmt.Errorf("must call .Connect() first before calling .Subscribe()")) } current := subscriptionIDCounter.Add(1) - ctx, cancel := context.WithCancel(ctx) + ctx, cancel := context.Cancel(ctx) sub := &Subscription{ Relay: r, @@ -421,7 +423,7 @@ func (r *Relay) PrepareSubscription(ctx context.Context, filters filters.T, opts return sub } -func (r *Relay) QuerySync(ctx context.Context, f filter.T, opts ...SubscriptionOption) ([]*event.T, error) { +func (r *Relay) QuerySync(ctx context.T, f filter.T, opts ...SubscriptionOption) ([]*event.T, error) { sub, e := r.Subscribe(ctx, filters.T{f}, opts...) if e != nil { return nil, e @@ -431,8 +433,8 @@ func (r *Relay) QuerySync(ctx context.Context, f filter.T, opts ...SubscriptionO if _, ok := ctx.Deadline(); !ok { // if no timeout is set, force it to 7 seconds - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, 7*time.Second) + var cancel context.F + ctx, cancel = context.Timeout(ctx, 7*time.Second) defer cancel() } @@ -453,7 +455,7 @@ func (r *Relay) QuerySync(ctx context.Context, f filter.T, opts ...SubscriptionO } } -func (r *Relay) Count(ctx context.Context, filters filters.T, opts ...SubscriptionOption) (int64, error) { +func (r *Relay) Count(ctx context.T, filters filters.T, opts ...SubscriptionOption) (int64, error) { sub := r.PrepareSubscription(ctx, filters, opts...) sub.countResult = make(chan int64) @@ -465,8 +467,8 @@ func (r *Relay) Count(ctx context.Context, filters filters.T, opts ...Subscripti if _, ok := ctx.Deadline(); !ok { // if no timeout is set, force it to 7 seconds - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, 7*time.Second) + var cancel context.F + ctx, cancel = context.Timeout(ctx, 7*time.Second) defer cancel() } diff --git a/pkg/go-nostr/relays/relay_test.go b/pkg/go-nostr/relays/relay_test.go index f85fe357..486367f3 100644 --- a/pkg/go-nostr/relays/relay_test.go +++ b/pkg/go-nostr/relays/relay_test.go @@ -2,7 +2,6 @@ package relays import ( "bytes" - "context" "encoding/json" "errors" "io" @@ -12,6 +11,8 @@ import ( "testing" "time" + "github.com/Hubmakerlabs/replicatr/pkg/context" + "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/event" "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/filter" "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/keys" @@ -61,7 +62,7 @@ func TestPublish(t *testing.T) { // connect a client and send the text note rl := MustRelayConnect(ws.URL) - e := rl.Publish(context.Background(), textNote) + e := rl.Publish(context.Bg(), textNote) if e != nil { t.Errorf("publish should have succeeded") } @@ -90,7 +91,7 @@ func TestPublishBlocked(t *testing.T) { // connect a client and send a text note rl := MustRelayConnect(ws.URL) - e := rl.Publish(context.Background(), textNote) + e := rl.Publish(context.Bg(), textNote) if e == nil { t.Errorf("should have failed to publish") } @@ -112,7 +113,7 @@ func TestPublishWriteFailed(t *testing.T) { rl := MustRelayConnect(ws.URL) // Force brief period of time so that publish always fails on closed socket. time.Sleep(1 * time.Millisecond) - e := rl.Publish(context.Background(), textNote) + e := rl.Publish(context.Bg(), textNote) if e == nil { t.Errorf("should have failed to publish") } @@ -131,7 +132,7 @@ func TestConnectContext(t *testing.T) { defer ws.Close() // relay client - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + ctx, cancel := context.Timeout(context.Bg(), 3*time.Second) defer cancel() r, e := RelayConnect(ctx, ws.URL) if e != nil { @@ -152,7 +153,7 @@ func TestConnectContextCanceled(t *testing.T) { defer ws.Close() // relay client - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.Cancel(context.Bg()) cancel() // make ctx expired _, e := RelayConnect(ctx, ws.URL) if !errors.Is(e, context.Canceled) { @@ -167,9 +168,9 @@ func TestConnectWithOrigin(t *testing.T) { defer ws.Close() // relay client - r := NewRelay(context.Background(), normalize.URL(ws.URL)) + r := NewRelay(context.Bg(), normalize.URL(ws.URL)) r.RequestHeader = http.Header{"origin": {"https://example.com"}} - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + ctx, cancel := context.Timeout(context.Bg(), 3*time.Second) defer cancel() e := r.Connect(ctx) if e != nil { @@ -215,11 +216,11 @@ func parseEventMessage(t *testing.T, raw []json.RawMessage) event.T { if typ != "EVENT" { t.Errorf("typ = %q; want EVENT", typ) } - var evt event.T - if e := json.Unmarshal(raw[1], &evt); e != nil { + var ev event.T + if e := json.Unmarshal(raw[1], &ev); e != nil { t.Errorf("json.Unmarshal(`%s`): %v", string(raw[1]), e) } - return evt + return ev } func parseSubscriptionMessage(t *testing.T, raw []json.RawMessage) (subid string, filters []filter.T) { diff --git a/pkg/go-nostr/relays/relays.go b/pkg/go-nostr/relays/relays.go index adf6dee3..7e991553 100644 --- a/pkg/go-nostr/relays/relays.go +++ b/pkg/go-nostr/relays/relays.go @@ -1,11 +1,11 @@ package relays import ( - "context" + "github.com/Hubmakerlabs/replicatr/pkg/context" ) func MustRelayConnect(url string) *Relay { - rl, e := RelayConnect(context.Background(), url) + rl, e := RelayConnect(context.Bg(), url) if e != nil { panic(e.Error()) } diff --git a/pkg/go-nostr/relays/relays_test.go b/pkg/go-nostr/relays/relays_test.go index 5e37b01e..ffb1dfaa 100644 --- a/pkg/go-nostr/relays/relays_test.go +++ b/pkg/go-nostr/relays/relays_test.go @@ -1,10 +1,11 @@ package relays import ( - "context" "testing" "time" + "github.com/Hubmakerlabs/replicatr/pkg/context" + "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/eose" "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/event" "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/filter" @@ -15,7 +16,7 @@ func TestEOSEMadness(t *testing.T) { rl := MustRelayConnect(eose.RELAY) defer rl.Close() - sub, e := rl.Subscribe(context.Background(), filters.T{ + sub, e := rl.Subscribe(context.Bg(), filters.T{ {Kinds: []int{event.KindTextNote}, Limit: 2}, }) if e != nil { @@ -62,7 +63,7 @@ func TestCount(t *testing.T) { rl := MustRelayConnect(RELAY) defer rl.Close() - count, e := rl.Count(context.Background(), filters.T{ + count, e := rl.Count(context.Bg(), filters.T{ {Kinds: []int{event.KindContactList}, Tags: filter.TagMap{"p": []string{"3bf0c63fcb93463407af97a5e5ee64fa883d107ef9e558472c4eb9aaaefa459d"}}}, }) if e != nil { diff --git a/pkg/go-nostr/relays/subscription.go b/pkg/go-nostr/relays/subscription.go index 46a3051b..50fdae01 100644 --- a/pkg/go-nostr/relays/subscription.go +++ b/pkg/go-nostr/relays/subscription.go @@ -1,12 +1,13 @@ package relays import ( - "context" "fmt" "strconv" "sync" "sync/atomic" + "github.com/Hubmakerlabs/replicatr/pkg/context" + close2 "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/closer" "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/count" "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/event" @@ -36,12 +37,12 @@ type Subscription struct { ClosedReason chan string // Context will be .Done() when the subscription ends - Context context.Context + Context context.T live atomic.Bool eosed atomic.Bool closed atomic.Bool - cancel context.CancelFunc + cancel context.F // this keeps track of the events we've received before the EOSE that we must dispatch before // closing the EndOfStoredEvents channel @@ -153,7 +154,7 @@ func (sub *Subscription) Close() { // Sub sets sub.T and then calls sub.Fire(ctx). // The subscription will be closed if the context expires. -func (sub *Subscription) Sub(_ context.Context, filters filters.T) { +func (sub *Subscription) Sub(_ context.T, filters filters.T) { sub.Filters = filters sub.Fire() } diff --git a/pkg/go-nostr/relays/subscription_test.go b/pkg/go-nostr/relays/subscription_test.go index 6b5b82ee..b625d841 100644 --- a/pkg/go-nostr/relays/subscription_test.go +++ b/pkg/go-nostr/relays/subscription_test.go @@ -1,12 +1,13 @@ package relays import ( - "context" "fmt" "sync/atomic" "testing" "time" + "github.com/Hubmakerlabs/replicatr/pkg/context" + "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/event" "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/filter" "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/filters" @@ -19,7 +20,7 @@ func TestSubscribeBasic(t *testing.T) { rl := MustRelayConnect(RELAY) defer rl.Close() - sub, e := rl.Subscribe(context.Background(), filters.T{{Kinds: []int{event.KindTextNote}, Limit: 2}}) + sub, e := rl.Subscribe(context.Bg(), filters.T{{Kinds: []int{event.KindTextNote}, Limit: 2}}) if e != nil { t.Fatalf("subscription failed: %v", e) return @@ -60,7 +61,7 @@ func TestNestedSubscriptions(t *testing.T) { n := atomic.Uint32{} // fetch 2 replies to a note - sub, e := rl.Subscribe(context.Background(), filters.T{{Kinds: []int{event.KindTextNote}, Tags: filter.TagMap{"e": []string{"0e34a74f8547e3b95d52a2543719b109fd0312aba144e2ef95cba043f42fe8c5"}}, Limit: 3}}) + sub, e := rl.Subscribe(context.Bg(), filters.T{{Kinds: []int{event.KindTextNote}, Tags: filter.TagMap{"e": []string{"0e34a74f8547e3b95d52a2543719b109fd0312aba144e2ef95cba043f42fe8c5"}}, Limit: 3}}) if e != nil { t.Fatalf("subscription 1 failed: %v", e) return @@ -70,7 +71,7 @@ func TestNestedSubscriptions(t *testing.T) { select { case evt := <-sub.Events: // now fetch author of this - sub, e := rl.Subscribe(context.Background(), filters.T{{Kinds: []int{event.KindProfileMetadata}, Authors: []string{evt.PubKey}, Limit: 1}}) + sub, e := rl.Subscribe(context.Bg(), filters.T{{Kinds: []int{event.KindProfileMetadata}, Authors: []string{evt.PubKey}, Limit: 1}}) if e != nil { t.Fatalf("subscription 2 failed: %v", e) return @@ -80,7 +81,7 @@ func TestNestedSubscriptions(t *testing.T) { select { case <-sub.Events: // do another subscription here in "sync" mode, just so we're sure things are not blocking - rl.QuerySync(context.Background(), filter.T{Limit: 1}) + rl.QuerySync(context.Bg(), filter.T{Limit: 1}) n.Add(1) if n.Load() == 3 { diff --git a/pkg/nostr-sdk/input.go b/pkg/nostr-sdk/input.go index f5adc6d5..5c444509 100644 --- a/pkg/nostr-sdk/input.go +++ b/pkg/nostr-sdk/input.go @@ -1,7 +1,7 @@ package sdk import ( - "context" + "github.com/Hubmakerlabs/replicatr/pkg/context" "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/nip05" "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/nip19" @@ -10,7 +10,7 @@ import ( ) // InputToProfile turns any npub/nprofile/hex/nip05 input into a ProfilePointer (or nil). -func InputToProfile(ctx context.Context, input string) *pointers.ProfilePointer { +func InputToProfile(ctx context.T, input string) *pointers.ProfilePointer { // handle if it is a hex string if len(input) == 64 { if _, e := hex.Dec(input); e == nil { diff --git a/pkg/nostr-sdk/metadata.go b/pkg/nostr-sdk/metadata.go index 7f9b5a0f..a842a5bb 100644 --- a/pkg/nostr-sdk/metadata.go +++ b/pkg/nostr-sdk/metadata.go @@ -1,10 +1,11 @@ package sdk import ( - "context" "encoding/json" "fmt" + "github.com/Hubmakerlabs/replicatr/pkg/context" + "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/event" "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/filters" "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/nip19" @@ -31,7 +32,7 @@ func (p ProfileMetadata) Npub() string { return v } -func (p ProfileMetadata) Nprofile(ctx context.Context, sys *System, nrelays int) string { +func (p ProfileMetadata) Nprofile(ctx context.T, sys *System, nrelays int) string { v, _ := nip19.EncodeProfile(p.PubKey, sys.FetchOutboxRelays(ctx, p.PubKey)) return v } @@ -47,8 +48,8 @@ func (p ProfileMetadata) ShortName() string { return npub[0:7] + "…" + npub[58:] } -func FetchProfileMetadata(ctx context.Context, pool *pools.SimplePool, pubkey string, relays ...string) ProfileMetadata { - ctx, cancel := context.WithCancel(ctx) +func FetchProfileMetadata(ctx context.T, pool *pools.SimplePool, pubkey string, relays ...string) ProfileMetadata { + ctx, cancel := context.Cancel(ctx) defer cancel() ch := pool.SubManyEose(ctx, relays, filters.T{ diff --git a/pkg/nostr-sdk/outbox.go b/pkg/nostr-sdk/outbox.go index 0254915d..8b6c78fe 100644 --- a/pkg/nostr-sdk/outbox.go +++ b/pkg/nostr-sdk/outbox.go @@ -1,16 +1,17 @@ package sdk import ( - "context" "fmt" "sync" + "github.com/Hubmakerlabs/replicatr/pkg/context" + "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/filter" "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/relays" ) func (sys *System) ExpandQueriesByAuthorAndRelays( - ctx context.Context, + ctx context.T, f filter.T, ) (map[*relays.Relay]filter.T, error) { n := len(f.Authors) diff --git a/pkg/nostr-sdk/relays.go b/pkg/nostr-sdk/relays.go index 76c5babd..c78b6736 100644 --- a/pkg/nostr-sdk/relays.go +++ b/pkg/nostr-sdk/relays.go @@ -1,9 +1,10 @@ package sdk import ( - "context" "encoding/json" + "github.com/Hubmakerlabs/replicatr/pkg/context" + "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/event" "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/filters" "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/pools" @@ -17,8 +18,8 @@ type Relay struct { Outbox bool } -func FetchRelaysForPubkey(ctx context.Context, pool *pools.SimplePool, pubkey string, relays ...string) []Relay { - ctx, cancel := context.WithCancel(ctx) +func FetchRelaysForPubkey(ctx context.T, pool *pools.SimplePool, pubkey string, relays ...string) []Relay { + ctx, cancel := context.Cancel(ctx) defer cancel() ch := pool.SubManyEose(ctx, relays, filters.T{ diff --git a/pkg/nostr-sdk/system.go b/pkg/nostr-sdk/system.go index d1264227..eae13fb3 100644 --- a/pkg/nostr-sdk/system.go +++ b/pkg/nostr-sdk/system.go @@ -1,11 +1,12 @@ package sdk import ( - "context" "fmt" "sync" "time" + "github.com/Hubmakerlabs/replicatr/pkg/context" + "github.com/Hubmakerlabs/replicatr/pkg/eventstore" "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/event" "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/filter" @@ -30,12 +31,12 @@ func (sys System) StoreRelay() eventstore.RelayInterface { return eventstore.RelayWrapper{Store: sys.Store} } -func (sys System) FetchRelays(ctx context.Context, pubkey string) []Relay { +func (sys System) FetchRelays(ctx context.T, pubkey string) []Relay { if v, ok := sys.RelaysCache.Get(pubkey); ok { return v } - ctx, cancel := context.WithTimeout(ctx, time.Second*5) + ctx, cancel := context.Timeout(ctx, time.Second*5) defer cancel() res := FetchRelaysForPubkey(ctx, sys.Pool, pubkey, sys.RelayListRelays...) @@ -43,7 +44,7 @@ func (sys System) FetchRelays(ctx context.Context, pubkey string) []Relay { return res } -func (sys System) FetchOutboxRelays(ctx context.Context, pubkey string) []string { +func (sys System) FetchOutboxRelays(ctx context.T, pubkey string) []string { relays := sys.FetchRelays(ctx, pubkey) result := make([]string, 0, len(relays)) for _, rl := range relays { @@ -56,13 +57,13 @@ func (sys System) FetchOutboxRelays(ctx context.Context, pubkey string) []string // FetchProfileMetadata fetches metadata for a given user from the local cache, or from the local store, // or, failing these, from the target user's defined outbox relays -- then caches the result. -func (sys System) FetchProfileMetadata(ctx context.Context, pubkey string) ProfileMetadata { +func (sys System) FetchProfileMetadata(ctx context.T, pubkey string) ProfileMetadata { pm, _ := sys.fetchProfileMetadata(ctx, pubkey) return pm } // FetchOrStoreProfileMetadata is like FetchProfileMetadata, but also saves the result to the sys.Store -func (sys System) FetchOrStoreProfileMetadata(ctx context.Context, pubkey string) ProfileMetadata { +func (sys System) FetchOrStoreProfileMetadata(ctx context.T, pubkey string) ProfileMetadata { pm, fromInternal := sys.fetchProfileMetadata(ctx, pubkey) if !fromInternal { sys.StoreRelay().Publish(ctx, pm.Event) @@ -70,7 +71,7 @@ func (sys System) FetchOrStoreProfileMetadata(ctx context.Context, pubkey string return pm } -func (sys System) fetchProfileMetadata(ctx context.Context, pubkey string) (pm ProfileMetadata, fromInternal bool) { +func (sys System) fetchProfileMetadata(ctx context.T, pubkey string) (pm ProfileMetadata, fromInternal bool) { if v, ok := sys.MetadataCache.Get(pubkey); ok { return v, true } @@ -87,11 +88,11 @@ func (sys System) fetchProfileMetadata(ctx context.Context, pubkey string) (pm P } } - ctxRelays, cancel := context.WithTimeout(ctx, time.Second*2) + ctxRelays, cancel := context.Timeout(ctx, time.Second*2) relays := sys.FetchOutboxRelays(ctxRelays, pubkey) cancel() - ctx, cancel = context.WithTimeout(ctx, time.Second*3) + ctx, cancel = context.Timeout(ctx, time.Second*3) res := FetchProfileMetadata(ctx, sys.Pool, pubkey, append(relays, sys.MetadataRelays...)...) cancel() @@ -100,7 +101,7 @@ func (sys System) fetchProfileMetadata(ctx context.Context, pubkey string) (pm P } // FetchUserEvents fetches events from each users' outbox relays, grouping queries when possible. -func (sys System) FetchUserEvents(ctx context.Context, filt filter.T) (map[string][]*event.T, error) { +func (sys System) FetchUserEvents(ctx context.T, filt filter.T) (map[string][]*event.T, error) { filters, e := sys.ExpandQueriesByAuthorAndRelays(ctx, filt) if e != nil { return nil, fmt.Errorf("failed to expand queries: %w", e) diff --git a/pkg/nostr/binary/binary_test.go b/pkg/nostr/binary/binary_test.go index fe928807..0b78670c 100644 --- a/pkg/nostr/binary/binary_test.go +++ b/pkg/nostr/binary/binary_test.go @@ -55,12 +55,12 @@ func TestBinaryEncode(t *testing.T) { if e != nil { t.Fatalf("failed to encode binary: %s", e) } - var evt event.T - if e := Unmarshal(bevt, &evt); e != nil { + var ev event.T + if e := Unmarshal(bevt, &ev); e != nil { t.Fatalf("error unmarshalling binary: %s", e) } checkParsedCorrectly(t, &pevt, jevt) - checkParsedCorrectly(t, &evt, jevt) + checkParsedCorrectly(t, &ev, jevt) } } diff --git a/pkg/nostr/nip5/nip05.go b/pkg/nostr/nip5/nip05.go index e624dc49..f63d15a3 100644 --- a/pkg/nostr/nip5/nip05.go +++ b/pkg/nostr/nip5/nip05.go @@ -1,12 +1,13 @@ package nip5 import ( - "context" "encoding/json" "fmt" "net/http" "strings" + "github.com/Hubmakerlabs/replicatr/pkg/context" + "github.com/Hubmakerlabs/replicatr/pkg/hex" log2 "github.com/Hubmakerlabs/replicatr/pkg/log" "github.com/Hubmakerlabs/replicatr/pkg/nostr/pointers" @@ -24,7 +25,7 @@ type WellKnownResponse struct { Relays key2RelaysMap `json:"relays"` // NIP-35 } -func QueryIdentifier(ctx context.Context, fullname string) (pp *pointers.Profile, e error) { +func QueryIdentifier(ctx context.T, fullname string) (pp *pointers.Profile, e error) { spl := strings.Split(fullname, "@") var name, domain string switch len(spl) { diff --git a/pkg/nostr/relay/relay.go b/pkg/nostr/relay/relay.go index 96c9be59..059922cb 100644 --- a/pkg/nostr/relay/relay.go +++ b/pkg/nostr/relay/relay.go @@ -2,13 +2,14 @@ package relay import ( "bytes" - "context" "fmt" "net/http" "sync" "sync/atomic" "time" + "github.com/Hubmakerlabs/replicatr/pkg/context" + "github.com/Hubmakerlabs/replicatr/pkg/go-nostr/connect" "github.com/Hubmakerlabs/replicatr/pkg/nostr/enveloper" "github.com/Hubmakerlabs/replicatr/pkg/nostr/envelopes" @@ -48,7 +49,7 @@ var _ Option = (WithNoticeHandler)(nil) // WithAuthHandler takes an auth event and expects it to be signed. when not // given, AUTH messages from relays are ignored. -type WithAuthHandler func(ctx context.Context, authEvent *event.T) (ok bool) +type WithAuthHandler func(ctx context.T, authEvent *event.T) (ok bool) func (_ WithAuthHandler) IsRelayOption() {} @@ -81,8 +82,8 @@ type Relay struct { Connection *connect.Connection Subscriptions *xsync.MapOf[string, *Subscription] Err error - ctx context.Context // will be canceled when the connection closes - cancel context.CancelFunc + ctx context.T // will be canceled when the connection closes + cancel context.F challenges chan string // NIP-42 challenges notices chan string // NIP-01 NOTICEs okCallbacks *xsync.MapOf[string, func(bool, *string)] @@ -101,8 +102,8 @@ type WriteRequest struct { // New returns a new relay. The relay connection will be closed when the context // is canceled. -func New(ctx context.Context, url string, opts ...Option) (r *Relay) { - ctx, cancel := context.WithCancel(ctx) +func New(ctx context.T, url string, opts ...Option) (r *Relay) { + ctx, cancel := context.Cancel(ctx) r = &Relay{ URL: normalize.URL(url), ctx: ctx, @@ -151,10 +152,10 @@ func New(ctx context.Context, url string, opts ...Option) (r *Relay) { // RelayConnect returns a relay object connected to url. Once successfully // connected, cancelling ctx has no effect. To close the connection, call // r.Close(). -func RelayConnect(ctx context.Context, url string, +func RelayConnect(ctx context.T, url string, opts ...Option) (r *Relay, e error) { - r = New(context.Background(), url, opts...) + r = New(context.Bg(), url, opts...) e = r.Connect(ctx) return } @@ -165,7 +166,7 @@ func RelayConnect(ctx context.Context, url string, func (r *Relay) String() string { return r.URL } // Context retrieves the context that is associated with this relay connection. -func (r *Relay) Context() context.Context { return r.ctx } +func (r *Relay) Context() context.T { return r.ctx } // IsConnected returns true if the connection to this relay seems to be active. func (r *Relay) IsConnected() bool { return r.ctx.Err() == nil } @@ -178,7 +179,7 @@ func (r *Relay) IsConnected() bool { return r.ctx.Err() == nil } // The underlying relay connection will use a background context. If you want to // pass a custom context to the underlying relay connection, use New() and then // Relay.Connect(). -func (r *Relay) Connect(ctx context.Context) (e error) { +func (r *Relay) Connect(ctx context.T) (e error) { if r.ctx == nil || r.Subscriptions == nil { return fmt.Errorf("relay must be initialized with a call to New()") } @@ -187,8 +188,8 @@ func (r *Relay) Connect(ctx context.Context) (e error) { } if _, ok := ctx.Deadline(); !ok { // if no timeout is set, set it to 7 seconds - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, 7*time.Second) + var cancel context.F + ctx, cancel = context.Timeout(ctx, 7*time.Second) defer cancel() } var conn *connect.Connection @@ -336,19 +337,19 @@ func (r *Relay) Write(msg []byte) <-chan error { // Publish sends an "EVENT" command to the relay r as in NIP-01. Status can be: // success, failed, or sent (no response from relay before ctx times out). -func (r *Relay) Publish(ctx context.Context, evt *event.T) (s Status, e error) { +func (r *Relay) Publish(ctx context.T, evt *event.T) (s Status, e error) { s = PublishStatusFailed // data races on status variable without this mutex var mu sync.Mutex if _, ok := ctx.Deadline(); !ok { // if no timeout is set, force it to 7 seconds - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, 7*time.Second) + var cancel context.F + ctx, cancel = context.Timeout(ctx, 7*time.Second) defer cancel() } // make it cancellable so we can stop everything upon receiving an "OK" - var cancel context.CancelFunc - ctx, cancel = context.WithCancel(ctx) + var cancel context.F + ctx, cancel = context.Cancel(ctx) defer cancel() // listen for an OK callback okCallback := func(ok bool, msg *string) { @@ -395,19 +396,19 @@ func (r *Relay) Publish(ctx context.Context, evt *event.T) (s Status, e error) { // // Status can be: success, failed, or sent (no response from relay before ctx // times out). -func (r *Relay) Auth(ctx context.Context, event *event.T) (s Status, e error) { +func (r *Relay) Auth(ctx context.T, event *event.T) (s Status, e error) { s = PublishStatusFailed // data races on s variable without this mutex var mu sync.Mutex if _, ok := ctx.Deadline(); !ok { // if no timeout is set, force it to 3 seconds - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, 3*time.Second) + var cancel context.F + ctx, cancel = context.Timeout(ctx, 3*time.Second) defer cancel() } // make it cancellable so we can stop everything upon receiving an "OK" - var cancel context.CancelFunc - ctx, cancel = context.WithCancel(ctx) + var cancel context.F + ctx, cancel = context.Cancel(ctx) defer cancel() // listen for an OK callback okCallback := func(ok bool, msg *string) { @@ -452,9 +453,9 @@ func (r *Relay) Auth(ctx context.Context, event *event.T) (s Status, e error) { // context ctx is cancelled ("CLOSE" in NIP-01). // // Remember to Cancel subscriptions, either by calling `.Unsub()` on them or -// ensuring their `context.Context` will be canceled at some point. Failure to +// ensuring their `context.T` will be canceled at some point. Failure to // do that will result in a huge number of halted goroutines being created. -func (r *Relay) Subscribe(ctx context.Context, filters filters.T, +func (r *Relay) Subscribe(ctx context.T, filters filters.T, opts ...SubscriptionOption) (s *Subscription, e error) { s = r.PrepareSubscription(ctx, filters, opts...) @@ -467,16 +468,16 @@ func (r *Relay) Subscribe(ctx context.Context, filters filters.T, // PrepareSubscription creates a subscription, but doesn't fire it. // // Remember to Cancel subscriptions, either by calling `.Unsub()` on them or -// ensuring their `context.Context` will be canceled at some point. Failure to +// ensuring their `context.T` will be canceled at some point. Failure to // do that will result in a huge number of halted goroutines being created. -func (r *Relay) PrepareSubscription(ctx context.Context, filters filters.T, +func (r *Relay) PrepareSubscription(ctx context.T, filters filters.T, opts ...SubscriptionOption) (s *Subscription) { if r.Connection == nil { panic(fmt.Errorf("must call .Connect() first before calling .Subscribe()")) } current := r.subscriptionIDCounter.Add(1) - ctx, cancel := context.WithCancel(ctx) + ctx, cancel := context.Cancel(ctx) s = &Subscription{ Relay: r, Context: ctx, @@ -499,7 +500,7 @@ func (r *Relay) PrepareSubscription(ctx context.Context, filters filters.T, return } -func (r *Relay) QuerySync(ctx context.Context, f *filter.T, +func (r *Relay) QuerySync(ctx context.T, f *filter.T, opts ...SubscriptionOption) (evs []*event.T, e error) { var sub *Subscription @@ -509,8 +510,8 @@ func (r *Relay) QuerySync(ctx context.Context, f *filter.T, defer sub.Unsub() if _, ok := ctx.Deadline(); !ok { // if no timeout is set, force it to 7 seconds - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, 7*time.Second) + var cancel context.F + ctx, cancel = context.Timeout(ctx, 7*time.Second) defer cancel() } for { @@ -529,7 +530,7 @@ func (r *Relay) QuerySync(ctx context.Context, f *filter.T, } } -func (r *Relay) Count(ctx context.Context, filters filters.T, +func (r *Relay) Count(ctx context.T, filters filters.T, opts ...SubscriptionOption) (c int64, e error) { sub := r.PrepareSubscription(ctx, filters, opts...) @@ -540,8 +541,8 @@ func (r *Relay) Count(ctx context.Context, filters filters.T, defer sub.Unsub() if _, ok := ctx.Deadline(); !ok { // if no timeout is set, force it to 7 seconds - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, 7*time.Second) + var cancel context.F + ctx, cancel = context.Timeout(ctx, 7*time.Second) defer cancel() } for { diff --git a/pkg/nostr/relay/relay_test.go b/pkg/nostr/relay/relay_test.go index 674405cf..d58451ab 100644 --- a/pkg/nostr/relay/relay_test.go +++ b/pkg/nostr/relay/relay_test.go @@ -2,7 +2,6 @@ package relay import ( "bytes" - "context" "crypto/rand" "encoding/hex" "encoding/json" @@ -15,6 +14,9 @@ import ( "testing" "time" + "github.com/Hubmakerlabs/replicatr/pkg/context" + + btcec "github.com/Hubmakerlabs/replicatr/pkg/ec" "github.com/Hubmakerlabs/replicatr/pkg/nostr/event" "github.com/Hubmakerlabs/replicatr/pkg/nostr/filter" "github.com/Hubmakerlabs/replicatr/pkg/nostr/filters" @@ -23,7 +25,6 @@ import ( "github.com/Hubmakerlabs/replicatr/pkg/nostr/normalize" "github.com/Hubmakerlabs/replicatr/pkg/nostr/tags" "github.com/Hubmakerlabs/replicatr/pkg/nostr/timestamp" - btcec "github.com/Hubmakerlabs/replicatr/pkg/ec" "golang.org/x/net/websocket" ) @@ -68,7 +69,7 @@ func TestPublish(t *testing.T) { // connect a client and send the text note rl := MustRelayConnect(ws.URL) - status, _ := rl.Publish(context.Background(), textNote) + status, _ := rl.Publish(context.Bg(), textNote) if status != PublishStatusSucceeded { t.Errorf("published status is %d, not %d", status, PublishStatusSucceeded) } @@ -98,7 +99,7 @@ func TestPublishBlocked(t *testing.T) { // connect a client and send a text note rl := MustRelayConnect(ws.URL) - status, _ := rl.Publish(context.Background(), textNote) + status, _ := rl.Publish(context.Bg(), textNote) if status != PublishStatusFailed { t.Errorf("published status is %d, not %d", status, PublishStatusFailed) } @@ -120,7 +121,7 @@ func TestPublishWriteFailed(t *testing.T) { rl := MustRelayConnect(ws.URL) // Force brief period of time so that publish always fails on closed socket. time.Sleep(1 * time.Millisecond) - status, e := rl.Publish(context.Background(), textNote) + status, e := rl.Publish(context.Bg(), textNote) if status != PublishStatusFailed { t.Errorf("published status is %d, not %d, err: %v", status, PublishStatusFailed, e) } @@ -139,7 +140,7 @@ func TestConnectContext(t *testing.T) { defer ws.Close() // relay client - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + ctx, cancel := context.Timeout(context.Bg(), 3*time.Second) defer cancel() r, e := RelayConnect(ctx, ws.URL) if e != nil { @@ -160,7 +161,7 @@ func TestConnectContextCanceled(t *testing.T) { defer ws.Close() // relay client - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.Cancel(context.Bg()) cancel() // make ctx expired _, e := RelayConnect(ctx, ws.URL) if !errors.Is(e, context.Canceled) { @@ -175,9 +176,9 @@ func TestConnectWithOrigin(t *testing.T) { defer ws.Close() // relay client - r := New(context.Background(), normalize.URL(ws.URL)) + r := New(context.Bg(), normalize.URL(ws.URL)) r.RequestHeader = http.Header{"origin": {"https://example.com"}} - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + ctx, cancel := context.Timeout(context.Bg(), 3*time.Second) defer cancel() e := r.Connect(ctx) if e != nil { @@ -214,7 +215,7 @@ func makeKeyPair(t *testing.T) (priv, pub string) { } func MustRelayConnect(url string) *Relay { - rl, e := RelayConnect(context.Background(), url) + rl, e := RelayConnect(context.Bg(), url) if e != nil { panic(e.Error()) } diff --git a/pkg/nostr/relay/subscription.go b/pkg/nostr/relay/subscription.go index 510cc249..67cb0dd8 100644 --- a/pkg/nostr/relay/subscription.go +++ b/pkg/nostr/relay/subscription.go @@ -1,12 +1,13 @@ package relay import ( - "context" "fmt" "strconv" "sync" "sync/atomic" + "github.com/Hubmakerlabs/replicatr/pkg/context" + close2 "github.com/Hubmakerlabs/replicatr/pkg/nostr/envelopes/closer" "github.com/Hubmakerlabs/replicatr/pkg/nostr/envelopes/countrequest" "github.com/Hubmakerlabs/replicatr/pkg/nostr/envelopes/req" @@ -29,10 +30,10 @@ type Subscription struct { // the EndOfStoredEvents channel gets closed when an EOSE comes for that subscription EndOfStoredEvents chan struct{} // Context will be .Done() when the subscription ends - Context context.Context + Context context.T Live atomic.Bool Eosed atomic.Bool - Cancel context.CancelFunc + Cancel context.F // this keeps track of the events we've received before the EOSE that we // must dispatch before closing the EndOfStoredEvents channel Storedwg sync.WaitGroup @@ -133,7 +134,7 @@ func (sub *Subscription) Close() { // Sub sets sub.T and then calls sub.Fire(ctx). // The subscription will be closed if the context expires. -func (sub *Subscription) Sub(_ context.Context, filters filters.T) { +func (sub *Subscription) Sub(_ context.T, filters filters.T) { sub.Filters = filters if e := sub.Fire(); log.Fail(e) { } diff --git a/pkg/nostr/relay/subscription_test.go b/pkg/nostr/relay/subscription_test.go index 0ddc4940..bad544a8 100644 --- a/pkg/nostr/relay/subscription_test.go +++ b/pkg/nostr/relay/subscription_test.go @@ -1,12 +1,13 @@ package relay import ( - "context" "fmt" "sync/atomic" "testing" "time" + "github.com/Hubmakerlabs/replicatr/pkg/context" + "github.com/Hubmakerlabs/replicatr/pkg/nostr/filter" "github.com/Hubmakerlabs/replicatr/pkg/nostr/filters" "github.com/Hubmakerlabs/replicatr/pkg/nostr/kind" @@ -26,7 +27,7 @@ func TestSubscribe(t *testing.T) { rl := MustRelayConnect(RELAY) defer rl.Close() - sub, e := rl.Subscribe(context.Background(), + sub, e := rl.Subscribe(context.Bg(), filters.T{ {Kinds: kinds.T{kind.TextNote}, Limit: 2}, }) @@ -70,7 +71,7 @@ func TestNestedSubscriptions(t *testing.T) { n := atomic.Uint32{} // fetch 2 replies to a note - sub, e := rl.Subscribe(context.Background(), filters.T{{ + sub, e := rl.Subscribe(context.Bg(), filters.T{{ Kinds: kinds.T{kind.TextNote}, Tags: filter.TagMap{"e": []string{"0e34a74f8547e3b95d52a2543719b109fd0312aba144e2ef95cba043f42fe8c5"}}, Limit: 3, @@ -84,7 +85,7 @@ func TestNestedSubscriptions(t *testing.T) { select { case event := <-sub.Events: // now fetch author of this - sub, e := rl.Subscribe(context.Background(), filters.T{{Kinds: kinds.T{kind.SetMetadata}, Authors: []string{event.PubKey}, Limit: 1}}) + sub, e := rl.Subscribe(context.Bg(), filters.T{{Kinds: kinds.T{kind.SetMetadata}, Authors: []string{event.PubKey}, Limit: 1}}) if e != nil { t.Errorf("subscription 2 failed: %v", e) return @@ -94,7 +95,7 @@ func TestNestedSubscriptions(t *testing.T) { select { case <-sub.Events: // do another subscription here in "sync" mode, just so we're sure things are not blocking - evs, e := rl.QuerySync(context.Background(), &filter.T{Limit: 1}) + evs, e := rl.QuerySync(context.Bg(), &filter.T{Limit: 1}) if log.Fail(e) { } diff --git a/pkg/nostr/sdk/input.go b/pkg/nostr/sdk/input.go index a7424230..7a79c22f 100644 --- a/pkg/nostr/sdk/input.go +++ b/pkg/nostr/sdk/input.go @@ -1,7 +1,7 @@ package sdk import ( - "context" + "github.com/Hubmakerlabs/replicatr/pkg/context" "github.com/Hubmakerlabs/replicatr/pkg/hex" "github.com/Hubmakerlabs/replicatr/pkg/nostr/eventid" @@ -13,7 +13,7 @@ import ( // InputToProfile turns any npub/nprofile/hex/nip5 input into a ProfilePointer // (or nil). -func InputToProfile(ctx context.Context, input string) (pp *pointers.Profile) { +func InputToProfile(ctx context.T, input string) (pp *pointers.Profile) { var e error // handle if it is a hex string if len(input) == 64 { diff --git a/pkg/nostr/sdk/metadata.go b/pkg/nostr/sdk/metadata.go index 49bcac40..4b53faf0 100644 --- a/pkg/nostr/sdk/metadata.go +++ b/pkg/nostr/sdk/metadata.go @@ -1,10 +1,11 @@ package sdk import ( - "context" "encoding/json" "fmt" + "github.com/Hubmakerlabs/replicatr/pkg/context" + "github.com/Hubmakerlabs/replicatr/pkg/nostr/event" "github.com/Hubmakerlabs/replicatr/pkg/nostr/filters" "github.com/Hubmakerlabs/replicatr/pkg/nostr/kind" @@ -36,7 +37,7 @@ func (p ProfileMetadata) Npub() string { return v } -func (p ProfileMetadata) Nprofile(ctx context.Context, sys *System, +func (p ProfileMetadata) Nprofile(ctx context.T, sys *System, nrelays int) string { v, e := nip19.EncodeProfile(p.PubKey, @@ -56,10 +57,10 @@ func (p ProfileMetadata) ShortName() string { return npub[0:7] + "…" + npub[58:] } -func FetchProfileMetadata(ctx context.Context, pool *pool.SimplePool, +func FetchProfileMetadata(ctx context.T, pool *pool.SimplePool, pubkey string, relays ...string) (pm *ProfileMetadata) { - ctx, cancel := context.WithCancel(ctx) + ctx, cancel := context.Cancel(ctx) defer cancel() ch := pool.SubManyEose(ctx, relays, filters.T{ { diff --git a/pkg/nostr/sdk/outbox.go b/pkg/nostr/sdk/outbox.go index da0363e7..34bb8488 100644 --- a/pkg/nostr/sdk/outbox.go +++ b/pkg/nostr/sdk/outbox.go @@ -1,16 +1,17 @@ package sdk import ( - "context" "fmt" "sync" + "github.com/Hubmakerlabs/replicatr/pkg/context" + "github.com/Hubmakerlabs/replicatr/pkg/nostr/filter" "github.com/Hubmakerlabs/replicatr/pkg/nostr/relay" ) func (s *System) ExpandQueriesByAuthorAndRelays( - ctx context.Context, + ctx context.T, f *filter.T, ) (filters map[*relay.Relay]*filter.T, e error) { diff --git a/pkg/nostr/sdk/relays.go b/pkg/nostr/sdk/relays.go index 51d54331..33036f8b 100644 --- a/pkg/nostr/sdk/relays.go +++ b/pkg/nostr/sdk/relays.go @@ -1,11 +1,12 @@ package sdk import ( - "context" "encoding/json" "net/url" "strings" + "github.com/Hubmakerlabs/replicatr/pkg/context" + "github.com/Hubmakerlabs/replicatr/pkg/nostr/event" "github.com/Hubmakerlabs/replicatr/pkg/nostr/filters" "github.com/Hubmakerlabs/replicatr/pkg/nostr/kind" @@ -20,8 +21,8 @@ type Relay struct { Outbox bool } -func FetchRelaysForPubkey(ctx context.Context, pool *pool.SimplePool, pubkey string, relays ...string) (r []Relay) { - ctx, cancel := context.WithCancel(ctx) +func FetchRelaysForPubkey(ctx context.T, pool *pool.SimplePool, pubkey string, relays ...string) (r []Relay) { + ctx, cancel := context.Cancel(ctx) defer cancel() ch := pool.SubManyEose(ctx, relays, filters.T{ { diff --git a/pkg/nostr/sdk/system.go b/pkg/nostr/sdk/system.go index 6fb203c1..5a2ca9a9 100644 --- a/pkg/nostr/sdk/system.go +++ b/pkg/nostr/sdk/system.go @@ -1,11 +1,12 @@ package sdk import ( - "context" "fmt" "sync" "time" + "github.com/Hubmakerlabs/replicatr/pkg/context" + "github.com/Hubmakerlabs/replicatr/pkg/nostr/event" "github.com/Hubmakerlabs/replicatr/pkg/nostr/filter" "github.com/Hubmakerlabs/replicatr/pkg/nostr/filters" @@ -32,18 +33,18 @@ func (s *System) StoreRelay() eventstore.RelayInterface { return eventstore.RelayWrapper{Store: s.Store} } -func (s *System) FetchRelays(ctx context.Context, pubkey string) []Relay { +func (s *System) FetchRelays(ctx context.T, pubkey string) []Relay { if v, ok := s.RelaysCache.Get(pubkey); ok { return v } - ctx, cancel := context.WithTimeout(ctx, time.Second*5) + ctx, cancel := context.Timeout(ctx, time.Second*5) defer cancel() res := FetchRelaysForPubkey(ctx, s.Pool, pubkey, s.RelayListRelays...) s.RelaysCache.SetWithTTL(pubkey, res, time.Hour*6) return res } -func (s *System) FetchOutboxRelays(ctx context.Context, pubkey string) []string { +func (s *System) FetchOutboxRelays(ctx context.T, pubkey string) []string { relays := s.FetchRelays(ctx, pubkey) result := make([]string, 0, len(relays)) for _, rl := range relays { @@ -56,7 +57,7 @@ func (s *System) FetchOutboxRelays(ctx context.Context, pubkey string) []string // FetchProfileMetadata fetches metadata for a given user from the local cache, or from the local store, // or, failing these, from the target user's defined outbox relays -- then caches the result. -func (s *System) FetchProfileMetadata(ctx context.Context, +func (s *System) FetchProfileMetadata(ctx context.T, pubkey string) *ProfileMetadata { pm, _ := s.fetchProfileMetadata(ctx, pubkey) @@ -65,7 +66,7 @@ func (s *System) FetchProfileMetadata(ctx context.Context, // FetchOrStoreProfileMetadata is like FetchProfileMetadata, but also saves the // result to the sys.Store -func (s *System) FetchOrStoreProfileMetadata(ctx context.Context, +func (s *System) FetchOrStoreProfileMetadata(ctx context.T, pubkey string) *ProfileMetadata { pm, fromInternal := s.fetchProfileMetadata(ctx, pubkey) @@ -75,7 +76,7 @@ func (s *System) FetchOrStoreProfileMetadata(ctx context.Context, return pm } -func (s *System) fetchProfileMetadata(ctx context.Context, +func (s *System) fetchProfileMetadata(ctx context.T, pubkey string) (pm *ProfileMetadata, fromInternal bool) { if v, ok := s.MetadataCache.Get(pubkey); ok { @@ -94,10 +95,10 @@ func (s *System) fetchProfileMetadata(ctx context.Context, } } } - ctxRelays, cancel := context.WithTimeout(ctx, time.Second*2) + ctxRelays, cancel := context.Timeout(ctx, time.Second*2) relays := s.FetchOutboxRelays(ctxRelays, pubkey) cancel() - ctx, cancel = context.WithTimeout(ctx, time.Second*3) + ctx, cancel = context.Timeout(ctx, time.Second*3) res := FetchProfileMetadata(ctx, s.Pool, pubkey, append(relays, s.MetadataRelays...)...) cancel() s.MetadataCache.SetWithTTL(pubkey, res, time.Hour*6) @@ -106,7 +107,7 @@ func (s *System) fetchProfileMetadata(ctx context.Context, // FetchUserEvents fetches events from each users' outbox relays, grouping // queries when possible. -func (s *System) FetchUserEvents(ctx context.Context, +func (s *System) FetchUserEvents(ctx context.T, f *filter.T) (r map[string][]*event.T, e error) { var ff map[*relay.Relay]*filter.T diff --git a/pkg/pool/pool.go b/pkg/pool/pool.go index 4fcd4396..8016f3dd 100644 --- a/pkg/pool/pool.go +++ b/pkg/pool/pool.go @@ -1,11 +1,12 @@ package pool import ( - "context" "fmt" "sync" "time" + "github.com/Hubmakerlabs/replicatr/pkg/context" + log2 "github.com/Hubmakerlabs/replicatr/pkg/log" "github.com/Hubmakerlabs/replicatr/pkg/nostr/event" "github.com/Hubmakerlabs/replicatr/pkg/nostr/filter" @@ -31,9 +32,9 @@ func namedLock(name string) (unlock func()) { type SimplePool struct { Relays map[string]*relay.Relay - Context context.Context + Context context.T - cancel context.CancelFunc + cancel context.F } type IncomingEvent struct { @@ -41,8 +42,8 @@ type IncomingEvent struct { Relay *relay.Relay } -func NewSimplePool(ctx context.Context) *SimplePool { - ctx, cancel := context.WithCancel(ctx) +func NewSimplePool(ctx context.T) *SimplePool { + ctx, cancel := context.Cancel(ctx) return &SimplePool{ Relays: make(map[string]*relay.Relay), @@ -64,7 +65,7 @@ func (p *SimplePool) EnsureRelay(url string) (*relay.Relay, error) { } else { var e error // we use this ctx here so when the pool dies everything dies - ctx, cancel := context.WithTimeout(p.Context, time.Second*15) + ctx, cancel := context.Timeout(p.Context, time.Second*15) defer cancel() if rl, e = relay.RelayConnect(ctx, nm); e != nil { return nil, fmt.Errorf("failed to connect: %w", e) @@ -77,16 +78,16 @@ func (p *SimplePool) EnsureRelay(url string) (*relay.Relay, error) { // SubMany opens a subscription with the given filters to multiple relays // the subscriptions only end when the context is canceled -func (p *SimplePool) SubMany(ctx context.Context, urls []string, filters filters.T, unique bool) chan IncomingEvent { +func (p *SimplePool) SubMany(ctx context.T, urls []string, filters filters.T, unique bool) chan IncomingEvent { return p.subMany(ctx, urls, filters, true) } // SubManyNonUnique is like SubMany, but returns duplicate events if they come from different relays -func (p *SimplePool) SubManyNonUnique(ctx context.Context, urls []string, filters filters.T, unique bool) chan IncomingEvent { +func (p *SimplePool) SubManyNonUnique(ctx context.T, urls []string, filters filters.T, unique bool) chan IncomingEvent { return p.subMany(ctx, urls, filters, false) } -func (p *SimplePool) subMany(ctx context.Context, urls []string, filters filters.T, unique bool) chan IncomingEvent { +func (p *SimplePool) subMany(ctx context.T, urls []string, filters filters.T, unique bool) chan IncomingEvent { events := make(chan IncomingEvent) seenAlready := xsync.NewMapOf[bool]() @@ -130,17 +131,17 @@ func (p *SimplePool) subMany(ctx context.Context, urls []string, filters filters } // SubManyEose is like SubMany, but it stops subscriptions and closes the channel when gets a EOSE -func (p *SimplePool) SubManyEose(ctx context.Context, urls []string, filters filters.T) chan IncomingEvent { +func (p *SimplePool) SubManyEose(ctx context.T, urls []string, filters filters.T) chan IncomingEvent { return p.subManyEose(ctx, urls, filters, true) } // SubManyEoseNonUnique is like SubManyEose, but returns duplicate events if they come from different relays -func (p *SimplePool) SubManyEoseNonUnique(ctx context.Context, urls []string, filters filters.T) chan IncomingEvent { +func (p *SimplePool) SubManyEoseNonUnique(ctx context.T, urls []string, filters filters.T) chan IncomingEvent { return p.subManyEose(ctx, urls, filters, false) } -func (p *SimplePool) subManyEose(ctx context.Context, urls []string, filters filters.T, unique bool) chan IncomingEvent { - ctx, cancel := context.WithCancel(ctx) +func (p *SimplePool) subManyEose(ctx context.T, urls []string, filters filters.T, unique bool) chan IncomingEvent { + ctx, cancel := context.Cancel(ctx) events := make(chan IncomingEvent) seenAlready := xsync.NewMapOf[bool]() @@ -200,8 +201,8 @@ func (p *SimplePool) subManyEose(ctx context.Context, urls []string, filters fil } // QuerySingle returns the first event returned by the first relay, cancels everything else. -func (p *SimplePool) QuerySingle(ctx context.Context, urls []string, f *filter.T) *IncomingEvent { - ctx, cancel := context.WithCancel(ctx) +func (p *SimplePool) QuerySingle(ctx context.T, urls []string, f *filter.T) *IncomingEvent { + ctx, cancel := context.Cancel(ctx) defer cancel() for ievt := range p.SubManyEose(ctx, urls, filters.T{f}) { return &ievt diff --git a/pkg/relay/add-event.go b/pkg/relay/add-event.go index c2986bf2..ad2caf52 100644 --- a/pkg/relay/add-event.go +++ b/pkg/relay/add-event.go @@ -1,10 +1,11 @@ package relay import ( - "context" "errors" "fmt" + "github.com/Hubmakerlabs/replicatr/pkg/context" + "github.com/Hubmakerlabs/replicatr/pkg/nostr/envelopes/OK" "github.com/Hubmakerlabs/replicatr/pkg/nostr/event" "github.com/Hubmakerlabs/replicatr/pkg/nostr/filter" @@ -13,7 +14,7 @@ import ( "github.com/Hubmakerlabs/replicatr/pkg/relay/eventstore" ) -func (rl *Relay) AddEvent(ctx context.Context, evt *event.T) (e error) { +func (rl *Relay) AddEvent(ctx context.T, evt *event.T) (e error) { if evt == nil { return errors.New("error: event is nil") } @@ -90,7 +91,7 @@ func (rl *Relay) AddEvent(ctx context.Context, evt *event.T) (e error) { return nil } -func (rl *Relay) handleDeleteRequest(ctx context.Context, evt *event.T) (e error) { +func (rl *Relay) handleDeleteRequest(ctx context.T, evt *event.T) (e error) { var ch chan *event.T // event deletion -- nip09 for _, tag := range evt.Tags { diff --git a/pkg/relay/eventstore/badger/count.go b/pkg/relay/eventstore/badger/count.go index ff4029de..cd3b7d2a 100644 --- a/pkg/relay/eventstore/badger/count.go +++ b/pkg/relay/eventstore/badger/count.go @@ -1,17 +1,18 @@ package badger import ( - "context" "encoding/binary" "errors" + "github.com/Hubmakerlabs/replicatr/pkg/context" + nostr_binary "github.com/Hubmakerlabs/replicatr/pkg/nostr/binary" "github.com/Hubmakerlabs/replicatr/pkg/nostr/event" "github.com/Hubmakerlabs/replicatr/pkg/nostr/filter" "github.com/dgraph-io/badger/v4" ) -func (b *Backend) CountEvents(ctx context.Context, f *filter.T) (c int64, e error) { +func (b *Backend) CountEvents(ctx context.T, f *filter.T) (c int64, e error) { var queries []query var extraFilter *filter.T var since uint32 diff --git a/pkg/relay/eventstore/badger/delete.go b/pkg/relay/eventstore/badger/delete.go index f161287e..de817569 100644 --- a/pkg/relay/eventstore/badger/delete.go +++ b/pkg/relay/eventstore/badger/delete.go @@ -1,16 +1,17 @@ package badger import ( - "context" "errors" + "github.com/Hubmakerlabs/replicatr/pkg/context" + "github.com/Hubmakerlabs/replicatr/pkg/nostr/event" "github.com/dgraph-io/badger/v4" ) var serialDelete uint32 = 0 -func (b *Backend) DeleteEvent(ctx context.Context, evt *event.T) (e error) { +func (b *Backend) DeleteEvent(ctx context.T, evt *event.T) (e error) { deletionHappened := false e = b.Update(func(txn *badger.Txn) (e error) { diff --git a/pkg/relay/eventstore/badger/query.go b/pkg/relay/eventstore/badger/query.go index 2860b473..f543ecac 100644 --- a/pkg/relay/eventstore/badger/query.go +++ b/pkg/relay/eventstore/badger/query.go @@ -2,11 +2,12 @@ package badger import ( "container/heap" - "context" "encoding/binary" "errors" "fmt" + "github.com/Hubmakerlabs/replicatr/pkg/context" + "github.com/Hubmakerlabs/replicatr/pkg/hex" nostr_binary "github.com/Hubmakerlabs/replicatr/pkg/nostr/binary" "github.com/Hubmakerlabs/replicatr/pkg/nostr/event" @@ -138,7 +139,7 @@ func (b *Backend) Q(queries []query, since uint32, extraFilter *filter.T, f *fil } } -func (b *Backend) QueryEvents(ctx context.Context, f *filter.T) (evChan chan *event.T, e error) { +func (b *Backend) QueryEvents(ctx context.T, f *filter.T) (evChan chan *event.T, e error) { evChan = make(chan *event.T) var queries []query var extraFilter *filter.T diff --git a/pkg/relay/eventstore/badger/save.go b/pkg/relay/eventstore/badger/save.go index 513e7f70..8b116ea6 100644 --- a/pkg/relay/eventstore/badger/save.go +++ b/pkg/relay/eventstore/badger/save.go @@ -1,7 +1,7 @@ package badger import ( - "context" + "github.com/Hubmakerlabs/replicatr/pkg/context" nostr_binary "github.com/Hubmakerlabs/replicatr/pkg/nostr/binary" "github.com/Hubmakerlabs/replicatr/pkg/nostr/event" @@ -9,7 +9,7 @@ import ( "github.com/dgraph-io/badger/v4" ) -func (b *Backend) SaveEvent(ctx context.Context, evt *event.T) (e error) { +func (b *Backend) SaveEvent(ctx context.T, evt *event.T) (e error) { return b.Update(func(txn *badger.Txn) (e error) { // query event by id to ensure we don't save duplicates id := evt.ID.Bytes() diff --git a/pkg/relay/eventstore/relay_interface.go b/pkg/relay/eventstore/relay_interface.go index c6a9f569..8f9baa56 100644 --- a/pkg/relay/eventstore/relay_interface.go +++ b/pkg/relay/eventstore/relay_interface.go @@ -1,10 +1,11 @@ package eventstore import ( - "context" "errors" "fmt" + "github.com/Hubmakerlabs/replicatr/pkg/context" + "github.com/Hubmakerlabs/replicatr/pkg/nostr/event" "github.com/Hubmakerlabs/replicatr/pkg/nostr/filter" "github.com/Hubmakerlabs/replicatr/pkg/nostr/kinds" @@ -12,8 +13,8 @@ import ( // RelayInterface is a wrapper thing that unifies Store and nostr.Relay under a common API. type RelayInterface interface { - Publish(ctx context.Context, event event.T) error - QuerySync(ctx context.Context, f *filter.T, opts ...SubscriptionOption) ([]*event.T, error) + Publish(ctx context.T, event event.T) error + QuerySync(ctx context.T, f *filter.T, opts ...SubscriptionOption) ([]*event.T, error) } // SubscriptionOption is the type of the argument passed for that. @@ -37,37 +38,37 @@ type RelayWrapper struct { // compile time interface check var _ RelayInterface = (*RelayWrapper)(nil) -func (w RelayWrapper) Publish(ctx context.Context, evt event.T) (e error) { - if evt.Kind.IsEphemeral() { +func (w RelayWrapper) Publish(ctx context.T, ev event.T) (e error) { + if ev.Kind.IsEphemeral() { // do not store ephemeral events return nil - } else if evt.Kind.IsReplaceable() { + } else if ev.Kind.IsReplaceable() { // replaceable event, delete before storing var ch chan *event.T - ch, e = w.Store.QueryEvents(ctx, &filter.T{Authors: []string{evt.PubKey}, Kinds: kinds.T{evt.Kind}}) + ch, e = w.Store.QueryEvents(ctx, &filter.T{Authors: []string{ev.PubKey}, Kinds: kinds.T{ev.Kind}}) if log.Fail(e) { return fmt.Errorf("failed to query before replacing: %w", e) } - if previous := <-ch; previous != nil && isOlder(previous, &evt) { + if previous := <-ch; previous != nil && isOlder(previous, &ev) { if e = w.Store.DeleteEvent(ctx, previous); log.Fail(e) { return fmt.Errorf("failed to delete event for replacing: %w", e) } } - } else if evt.Kind.IsParameterizedReplaceable() { + } else if ev.Kind.IsParameterizedReplaceable() { // parameterized replaceable event, delete before storing - d := evt.Tags.GetFirst([]string{"d", ""}) + d := ev.Tags.GetFirst([]string{"d", ""}) if d != nil { var ch chan *event.T ch, e = w.Store.QueryEvents(ctx, &filter.T{ - Authors: []string{evt.PubKey}, - Kinds: kinds.T{evt.Kind}, + Authors: []string{ev.PubKey}, + Kinds: kinds.T{ev.Kind}, Tags: filter.TagMap{"d": []string{d.Value()}}, }) if log.Fail(e) { return fmt.Errorf( "failed to query before parameterized replacing: %w", e) } - if previous := <-ch; previous != nil && isOlder(previous, &evt) { + if previous := <-ch; previous != nil && isOlder(previous, &ev) { if e = w.Store.DeleteEvent(ctx, previous); log.Fail(e) { return fmt.Errorf( "failed to delete event for parameterized replacing: %w", e) @@ -75,13 +76,13 @@ func (w RelayWrapper) Publish(ctx context.Context, evt event.T) (e error) { } } } - if e = w.SaveEvent(ctx, &evt); log.Fail(e) && !errors.Is(e, ErrDupEvent) { + if e = w.SaveEvent(ctx, &ev); log.Fail(e) && !errors.Is(e, ErrDupEvent) { return fmt.Errorf("failed to save: %w", e) } return nil } -func (w RelayWrapper) QuerySync(ctx context.Context, f *filter.T, +func (w RelayWrapper) QuerySync(ctx context.T, f *filter.T, opts ...SubscriptionOption) (evs []*event.T, e error) { var ch chan *event.T if ch, e = w.Store.QueryEvents(ctx, f); log.E.Chk(e) { diff --git a/pkg/relay/eventstore/store.go b/pkg/relay/eventstore/store.go index a4f14355..a890f845 100644 --- a/pkg/relay/eventstore/store.go +++ b/pkg/relay/eventstore/store.go @@ -1,7 +1,7 @@ package eventstore import ( - "context" + "github.com/Hubmakerlabs/replicatr/pkg/context" "github.com/Hubmakerlabs/replicatr/pkg/nostr/event" "github.com/Hubmakerlabs/replicatr/pkg/nostr/filter" @@ -19,9 +19,9 @@ type Store interface { // QueryEvents is invoked upon a client's REQ as described in NIP-01. // it should return a channel with the events as they're recovered from a database. // the channel should be closed after the events are all delivered. - QueryEvents(context.Context, *filter.T) (chan *event.T, error) + QueryEvents(context.T, *filter.T) (chan *event.T, error) // DeleteEvent is used to handle deletion events, as per NIP-09. - DeleteEvent(context.Context, *event.T) error + DeleteEvent(context.T, *event.T) error // SaveEvent is called once Relay.AcceptEvent reports true. - SaveEvent(context.Context, *event.T) error + SaveEvent(context.T, *event.T) error } diff --git a/pkg/relay/handlers.go b/pkg/relay/handlers.go index cb8a77f8..d9533477 100644 --- a/pkg/relay/handlers.go +++ b/pkg/relay/handlers.go @@ -1,7 +1,6 @@ package relay import ( - "context" "crypto/rand" "encoding/hex" "encoding/json" @@ -11,6 +10,8 @@ import ( "sync" "time" + "github.com/Hubmakerlabs/replicatr/pkg/context" + "github.com/Hubmakerlabs/replicatr/pkg/nostr/enveloper" "github.com/Hubmakerlabs/replicatr/pkg/nostr/envelopes" "github.com/Hubmakerlabs/replicatr/pkg/nostr/envelopes/OK" @@ -66,9 +67,9 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { Challenge: hex.EncodeToString(challenge), Authed: make(chan struct{}), } - ctx, cancel := context.WithCancel( - context.WithValue( - context.Background(), + ctx, cancel := context.Cancel( + context.Value( + context.Bg(), WebsocketContextKey, ws, ), ) @@ -195,9 +196,9 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) { ee := sync.WaitGroup{} ee.Add(len(env.T)) // a context just for the "stored events" request handler - reqCtx, cancelReqCtx := context.WithCancelCause(ctx) + reqCtx, cancelReqCtx := context.CancelCause(ctx) // expose subscription id in the context - reqCtx = context.WithValue(reqCtx, SubscriptionIDContextKey, + reqCtx = context.Value(reqCtx, SubscriptionIDContextKey, env.SubscriptionID) // handle each filter separately -- dispatching events as // they're loaded from databases diff --git a/pkg/relay/listener.go b/pkg/relay/listener.go index c1f97bb6..b0c8b50a 100644 --- a/pkg/relay/listener.go +++ b/pkg/relay/listener.go @@ -1,9 +1,10 @@ package relay import ( - "context" "fmt" + "github.com/Hubmakerlabs/replicatr/pkg/context" + log2 "github.com/Hubmakerlabs/replicatr/pkg/log" event2 "github.com/Hubmakerlabs/replicatr/pkg/nostr/envelopes/event" "github.com/Hubmakerlabs/replicatr/pkg/nostr/event" @@ -17,7 +18,7 @@ var log = log2.GetStd() type Listener struct { filters filters2.T - cancel context.CancelCauseFunc + cancel context.C } var listeners = xsync.NewTypedMapOf[*WebSocket, *xsync.MapOf[string, *Listener]](pointerHasher[WebSocket]) @@ -49,7 +50,7 @@ func GetListeningFilters() filters2.T { } func setListener(id subscriptionid.T, ws *WebSocket, - filters filters2.T, cancel context.CancelCauseFunc) { + filters filters2.T, cancel context.C) { subs, _ := listeners.LoadOrCompute(ws, func() *xsync.MapOf[string, *Listener] { diff --git a/pkg/relay/policies/events.go b/pkg/relay/policies/events.go index 9b7a9f50..812a9744 100644 --- a/pkg/relay/policies/events.go +++ b/pkg/relay/policies/events.go @@ -1,7 +1,7 @@ package policies import ( - "context" + "github.com/Hubmakerlabs/replicatr/pkg/context" "github.com/Hubmakerlabs/replicatr/pkg/nostr/event" "github.com/Hubmakerlabs/replicatr/pkg/nostr/kind" @@ -16,7 +16,7 @@ import ( // // If ignoreKinds is given this restriction will not apply to these kinds (useful for allowing a bigger). // If onlyKinds is given then all other kinds will be ignored. -func PreventTooManyIndexableTags(max int, ignoreKinds kinds.T, onlyKinds kinds.T) func(context.Context, *event.T) (bool, string) { +func PreventTooManyIndexableTags(max int, ignoreKinds kinds.T, onlyKinds kinds.T) func(context.T, *event.T) (bool, string) { ignore := func(kind kind.T) bool { return false } if len(ignoreKinds) > 0 { ignore = func(kind kind.T) bool { @@ -31,7 +31,7 @@ func PreventTooManyIndexableTags(max int, ignoreKinds kinds.T, onlyKinds kinds.T } } - return func(ctx context.Context, event *event.T) (reject bool, msg string) { + return func(ctx context.T, event *event.T) (reject bool, msg string) { if ignore(event.Kind) { return false, "" } @@ -50,8 +50,8 @@ func PreventTooManyIndexableTags(max int, ignoreKinds kinds.T, onlyKinds kinds.T } // PreventLargeTags rejects events that have indexable tag values greater than maxTagValueLen. -func PreventLargeTags(maxTagValueLen int) func(context.Context, *event.T) (bool, string) { - return func(ctx context.Context, event *event.T) (reject bool, msg string) { +func PreventLargeTags(maxTagValueLen int) func(context.T, *event.T) (bool, string) { + return func(ctx context.T, event *event.T) (reject bool, msg string) { for _, tag := range event.Tags { if len(tag) > 1 && len(tag[0]) == 1 { if len(tag[1]) > maxTagValueLen { @@ -67,7 +67,7 @@ func PreventLargeTags(maxTagValueLen int) func(context.Context, *event.T) (bool, // any events with kinds different than the specified ones. // // todo: this range minimum doesn't look like it would ever change from zero -func RestrictToSpecifiedKinds(kinds ...kind.T) func(context.Context, *event.T) (bool, string) { +func RestrictToSpecifiedKinds(kinds ...kind.T) func(context.T, *event.T) (bool, string) { var maximum, minimum kind.T for _, kind := range kinds { if kind > maximum { @@ -79,7 +79,7 @@ func RestrictToSpecifiedKinds(kinds ...kind.T) func(context.Context, *event.T) ( } } - return func(ctx context.Context, event *event.T) (reject bool, msg string) { + return func(ctx context.T, event *event.T) (reject bool, msg string) { // these are cheap and very questionable optimizations, but they exist for a reason: // we would have to ensure that the kind number is within the bounds of a uint16 anyway if event.Kind > maximum { @@ -97,8 +97,8 @@ func RestrictToSpecifiedKinds(kinds ...kind.T) func(context.Context, *event.T) ( } } -func PreventTimestampsInThePast(thresholdSeconds timestamp.T) func(context.Context, *event.T) (bool, string) { - return func(ctx context.Context, event *event.T) (reject bool, msg string) { +func PreventTimestampsInThePast(thresholdSeconds timestamp.T) func(context.T, *event.T) (bool, string) { + return func(ctx context.T, event *event.T) (reject bool, msg string) { if timestamp.Now()-event.CreatedAt > thresholdSeconds { return true, "event too old" } @@ -106,8 +106,8 @@ func PreventTimestampsInThePast(thresholdSeconds timestamp.T) func(context.Conte } } -func PreventTimestampsInTheFuture(thresholdSeconds timestamp.T) func(context.Context, *event.T) (bool, string) { - return func(ctx context.Context, event *event.T) (reject bool, msg string) { +func PreventTimestampsInTheFuture(thresholdSeconds timestamp.T) func(context.T, *event.T) (bool, string) { + return func(ctx context.T, event *event.T) (reject bool, msg string) { if event.CreatedAt-timestamp.Now() > thresholdSeconds { return true, "event too much in the future" } diff --git a/pkg/relay/policies/filters.go b/pkg/relay/policies/filters.go index cce0a076..11e4595a 100644 --- a/pkg/relay/policies/filters.go +++ b/pkg/relay/policies/filters.go @@ -1,7 +1,7 @@ package policies import ( - "context" + "github.com/Hubmakerlabs/replicatr/pkg/context" "github.com/Hubmakerlabs/replicatr/pkg/nostr/filter" kinds2 "github.com/Hubmakerlabs/replicatr/pkg/nostr/kinds" @@ -10,7 +10,7 @@ import ( ) // NoComplexFilters disallows filters with more than 2 tags. -func NoComplexFilters(ctx context.Context, f filter.T) (reject bool, msg string) { +func NoComplexFilters(ctx context.T, f filter.T) (reject bool, msg string) { items := len(f.Tags) + len(f.Kinds) if items > 4 && len(f.Tags) > 2 { @@ -21,7 +21,7 @@ func NoComplexFilters(ctx context.Context, f filter.T) (reject bool, msg string) } // NoEmptyFilters disallows filters that don't have at least a tag, a kind, an author or an id. -func NoEmptyFilters(ctx context.Context, f filter.T) (reject bool, msg string) { +func NoEmptyFilters(ctx context.T, f filter.T) (reject bool, msg string) { c := len(f.Kinds) + len(f.IDs) + len(f.Authors) for _, tagItems := range f.Tags { c += len(tagItems) @@ -34,24 +34,24 @@ func NoEmptyFilters(ctx context.Context, f filter.T) (reject bool, msg string) { // AntiSyncBots tries to prevent people from syncing kind:1s from this relay to else by always // requiring an author parameter at least. -func AntiSyncBots(ctx context.Context, f filter.T) (reject bool, msg string) { +func AntiSyncBots(ctx context.T, f filter.T) (reject bool, msg string) { return (len(f.Kinds) == 0 || slices.Contains(f.Kinds, 1)) && len(f.Authors) == 0, "an author must be specified to get their kind:1 notes" } -func NoSearchQueries(ctx context.Context, f filter.T) (reject bool, msg string) { +func NoSearchQueries(ctx context.T, f filter.T) (reject bool, msg string) { if f.Search != "" { return true, "search is not supported" } return false, "" } -func RemoveSearchQueries(ctx context.Context, f *filter.T) { +func RemoveSearchQueries(ctx context.T, f *filter.T) { f.Search = "" } -func RemoveAllButKinds(kinds ...uint16) func(context.Context, *filter.T) { - return func(ctx context.Context, f *filter.T) { +func RemoveAllButKinds(kinds ...uint16) func(context.T, *filter.T) { + return func(ctx context.T, f *filter.T) { if n := len(f.Kinds); n > 0 { newKinds := make(kinds2.T, 0, n) for i := 0; i < n; i++ { @@ -64,8 +64,8 @@ func RemoveAllButKinds(kinds ...uint16) func(context.Context, *filter.T) { } } -func RemoveAllButTags(tagNames ...string) func(context.Context, *filter.T) { - return func(ctx context.Context, f *filter.T) { +func RemoveAllButTags(tagNames ...string) func(context.T, *filter.T) { + return func(ctx context.T, f *filter.T) { for tagName := range f.Tags { if !slices.Contains(tagNames, tagName) { delete(f.Tags, tagName) diff --git a/pkg/relay/policies/nip04.go b/pkg/relay/policies/nip04.go index cee7c091..d4102d21 100644 --- a/pkg/relay/policies/nip04.go +++ b/pkg/relay/policies/nip04.go @@ -1,7 +1,7 @@ package policies import ( - "context" + "github.com/Hubmakerlabs/replicatr/pkg/context" "github.com/Hubmakerlabs/replicatr/pkg/nostr/filter" "github.com/Hubmakerlabs/replicatr/pkg/relay" @@ -10,7 +10,7 @@ import ( // RejectKind04Snoopers prevents reading NIP-04 messages from people not // involved in the conversation. -func RejectKind04Snoopers(ctx context.Context, f filter.T) (bool, string) { +func RejectKind04Snoopers(ctx context.T, f filter.T) (bool, string) { // prevent kind-4 events from being returned to unauthed users, // only when authentication is a thing if !slices.Contains(f.Kinds, 4) { diff --git a/pkg/relay/relay.go b/pkg/relay/relay.go index f684124f..3aedb107 100644 --- a/pkg/relay/relay.go +++ b/pkg/relay/relay.go @@ -1,11 +1,12 @@ package relay import ( - "context" "net/http" "os" "time" + "github.com/Hubmakerlabs/replicatr/pkg/context" + log2 "github.com/Hubmakerlabs/replicatr/pkg/log" "github.com/Hubmakerlabs/replicatr/pkg/nostr/event" "github.com/Hubmakerlabs/replicatr/pkg/nostr/filter" @@ -44,22 +45,22 @@ func New() *Relay { type Relay struct { ServiceURL string - RejectEvent []func(ctx context.Context, ev *event.T) (reject bool, msg string) - RejectFilter []func(ctx context.Context, f *filter.T) (reject bool, msg string) - RejectCountFilter []func(ctx context.Context, f *filter.T) (reject bool, msg string) - OverwriteDeletionOutcome []func(ctx context.Context, target *event.T, deletion *event.T) (acceptDeletion bool, msg string) - OverwriteResponseEvent []func(ctx context.Context, ev *event.T) - OverwriteFilter []func(ctx context.Context, f *filter.T) - OverwriteCountFilter []func(ctx context.Context, f *filter.T) - OverwriteRelayInformation []func(ctx context.Context, r *http.Request, info *relayinfo.T) *relayinfo.T - StoreEvent []func(ctx context.Context, ev *event.T) error - DeleteEvent []func(ctx context.Context, ev *event.T) error - QueryEvents []func(ctx context.Context, f *filter.T) (chan *event.T, error) - CountEvents []func(ctx context.Context, f *filter.T) (int64, error) - OnAuth []func(ctx context.Context, pubkey string) - OnConnect []func(ctx context.Context) - OnDisconnect []func(ctx context.Context) - OnEventSaved []func(ctx context.Context, ev *event.T) + RejectEvent []func(ctx context.T, ev *event.T) (reject bool, msg string) + RejectFilter []func(ctx context.T, f *filter.T) (reject bool, msg string) + RejectCountFilter []func(ctx context.T, f *filter.T) (reject bool, msg string) + OverwriteDeletionOutcome []func(ctx context.T, target *event.T, deletion *event.T) (acceptDeletion bool, msg string) + OverwriteResponseEvent []func(ctx context.T, ev *event.T) + OverwriteFilter []func(ctx context.T, f *filter.T) + OverwriteCountFilter []func(ctx context.T, f *filter.T) + OverwriteRelayInformation []func(ctx context.T, r *http.Request, info *relayinfo.T) *relayinfo.T + StoreEvent []func(ctx context.T, ev *event.T) error + DeleteEvent []func(ctx context.T, ev *event.T) error + QueryEvents []func(ctx context.T, f *filter.T) (chan *event.T, error) + CountEvents []func(ctx context.T, f *filter.T) (int64, error) + OnAuth []func(ctx context.T, pubkey string) + OnConnect []func(ctx context.T) + OnDisconnect []func(ctx context.T) + OnEventSaved []func(ctx context.T, ev *event.T) // editing info will affect Info *relayinfo.T diff --git a/pkg/relay/serve-req.go b/pkg/relay/serve-req.go index 81cb9303..2b931eae 100644 --- a/pkg/relay/serve-req.go +++ b/pkg/relay/serve-req.go @@ -1,10 +1,11 @@ package relay import ( - "context" "errors" "sync" + "github.com/Hubmakerlabs/replicatr/pkg/context" + "github.com/Hubmakerlabs/replicatr/pkg/nostr/envelopes/OK" event2 "github.com/Hubmakerlabs/replicatr/pkg/nostr/envelopes/event" "github.com/Hubmakerlabs/replicatr/pkg/nostr/envelopes/notice" @@ -13,7 +14,7 @@ import ( "github.com/Hubmakerlabs/replicatr/pkg/nostr/subscriptionid" ) -func (rl *Relay) handleRequest(ctx context.Context, id subscriptionid.T, +func (rl *Relay) handleRequest(ctx context.T, id subscriptionid.T, eose *sync.WaitGroup, ws *WebSocket, f *filter.T) (e error) { defer eose.Done() @@ -58,7 +59,7 @@ func (rl *Relay) handleRequest(ctx context.Context, id subscriptionid.T, return nil } -func (rl *Relay) handleCountRequest(ctx context.Context, ws *WebSocket, f *filter.T) int64 { +func (rl *Relay) handleCountRequest(ctx context.T, ws *WebSocket, f *filter.T) int64 { // overwrite the filter (for example, to eliminate some kinds or tags that // we know we don't support) for _, ovw := range rl.OverwriteCountFilter { diff --git a/pkg/relay/start.go b/pkg/relay/start.go index 6961286a..f9a18f8a 100644 --- a/pkg/relay/start.go +++ b/pkg/relay/start.go @@ -1,13 +1,14 @@ package relay import ( - "context" "errors" "net" "net/http" "strconv" "time" + "github.com/Hubmakerlabs/replicatr/pkg/context" + "github.com/fasthttp/websocket" "github.com/rs/cors" ) @@ -45,7 +46,7 @@ func (rl *Relay) Start(host string, port int, started ...chan bool) (e error) { } // Shutdown sends a websocket close control message to all connected clients. -func (rl *Relay) Shutdown(ctx context.Context) { +func (rl *Relay) Shutdown(ctx context.T) { rl.E.Chk(rl.httpServer.Shutdown(ctx)) rl.clients.Range(func(conn *websocket.Conn, _ struct{}) bool { rl.E.Chk(conn.WriteControl(websocket.CloseMessage, nil, time.Now().Add(time.Second))) diff --git a/pkg/relay/utils.go b/pkg/relay/utils.go index c71f2384..fd7d5736 100644 --- a/pkg/relay/utils.go +++ b/pkg/relay/utils.go @@ -1,7 +1,7 @@ package relay import ( - "context" + "github.com/Hubmakerlabs/replicatr/pkg/context" "github.com/Hubmakerlabs/replicatr/pkg/nostr/envelopes/auth" "github.com/Hubmakerlabs/replicatr/pkg/nostr/filter" @@ -13,28 +13,28 @@ const ( SubscriptionIDContextKey ) -func RequestAuth(ctx context.Context) { +func RequestAuth(ctx context.T) { ws := GetConnection(ctx) log.D.Chk(ws.WriteJSON(auth.Challenge{Challenge: ws.Challenge})) } -func GetConnection(ctx context.Context) *WebSocket { +func GetConnection(ctx context.T) *WebSocket { return ctx.Value(WebsocketContextKey).(*WebSocket) } -func GetAuthed(ctx context.Context) string { +func GetAuthed(ctx context.T) string { return GetConnection(ctx).AuthedPublicKey } -func GetIP(ctx context.Context) string { +func GetIP(ctx context.T) string { return xff.GetRemoteAddr(GetConnection(ctx).Request) } -func GetSubscriptionID(ctx context.Context) string { +func GetSubscriptionID(ctx context.T) string { return ctx.Value(SubscriptionIDContextKey).(string) } -func GetOpenSubscriptions(ctx context.Context) (res []*filter.T) { +func GetOpenSubscriptions(ctx context.T) (res []*filter.T) { if subs, ok := listeners.Load(GetConnection(ctx)); ok { res = make([]*filter.T, 0, listeners.Size()*2) subs.Range(func(_ string, sub *Listener) bool {