Skip to content

Commit

Permalink
getting rid of the damn context stutter
Browse files Browse the repository at this point in the history
  • Loading branch information
mleku committed Jan 9, 2024
1 parent 20db6c3 commit ffe5414
Show file tree
Hide file tree
Showing 64 changed files with 448 additions and 388 deletions.
15 changes: 8 additions & 7 deletions cmd/algia/main.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand All @@ -14,6 +13,8 @@ import (
"sync"
"time"

"github.com/Hubmakerlabs/replicatr/pkg/context"

"github.com/Hubmakerlabs/replicatr/pkg/go-nostr/event"
"github.com/Hubmakerlabs/replicatr/pkg/go-nostr/filter"
"github.com/Hubmakerlabs/replicatr/pkg/go-nostr/keys"
Expand Down Expand Up @@ -150,7 +151,7 @@ func (cfg *Config) GetFollows(profile string) (map[string]Profile, error) {
mu.Unlock()
m := map[string]struct{}{}

cfg.Do(RelayPerms{Read: true}, func(ctx context.Context, rl *relays.Relay) bool {
cfg.Do(RelayPerms{Read: true}, func(ctx context.T, rl *relays.Relay) bool {
evs, e := rl.QuerySync(ctx, filter.T{Kinds: []int{event.KindContactList}, Authors: []string{pub}, Limit: 1})
if e != nil {
return true
Expand Down Expand Up @@ -194,7 +195,7 @@ func (cfg *Config) GetFollows(profile string) (map[string]Profile, error) {
}

// get follower's descriptions
cfg.Do(RelayPerms{Read: true}, func(ctx context.Context, rl *relays.Relay) bool {
cfg.Do(RelayPerms{Read: true}, func(ctx context.T, rl *relays.Relay) bool {
evs, e := rl.QuerySync(ctx, filter.T{
Kinds: []int{event.KindProfileMetadata},
Authors: follows[i:end], // Use the updated end index
Expand Down Expand Up @@ -225,7 +226,7 @@ func (cfg *Config) GetFollows(profile string) (map[string]Profile, error) {
}

// FindRelay is
func (cfg *Config) FindRelay(ctx context.Context, r RelayPerms) *relays.Relay {
func (cfg *Config) FindRelay(ctx context.T, r RelayPerms) *relays.Relay {
for k, v := range cfg.Relays {
if r.Write && !v.Write {
continue
Expand All @@ -252,9 +253,9 @@ func (cfg *Config) FindRelay(ctx context.Context, r RelayPerms) *relays.Relay {
}

// Do is
func (cfg *Config) Do(r RelayPerms, f func(context.Context, *relays.Relay) bool) {
func (cfg *Config) Do(r RelayPerms, f func(context.T, *relays.Relay) bool) {
var wg sync.WaitGroup
ctx := context.Background()
ctx := context.Bg()
for k, v := range cfg.Relays {
if r.Write && !v.Write {
continue
Expand Down Expand Up @@ -390,7 +391,7 @@ func (cfg *Config) Events(f filter.T) []*event.T {
var mu sync.Mutex
found := false
var m sync.Map
cfg.Do(RelayPerms{Read: true}, func(ctx context.Context, rl *relays.Relay) bool {
cfg.Do(RelayPerms{Read: true}, func(ctx context.T, rl *relays.Relay) bool {
mu.Lock()
if found {
mu.Unlock()
Expand Down
5 changes: 3 additions & 2 deletions cmd/algia/profile.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package main

import (
"context"
"encoding/json"
"errors"
"fmt"
"os"

"github.com/Hubmakerlabs/replicatr/pkg/context"

"github.com/Hubmakerlabs/replicatr/pkg/go-nostr/event"
"github.com/Hubmakerlabs/replicatr/pkg/go-nostr/filter"
"github.com/Hubmakerlabs/replicatr/pkg/go-nostr/keys"
Expand All @@ -21,7 +22,7 @@ func doProfile(cCtx *cli.Context) (e error) {
j := cCtx.Bool("json")

cfg := cCtx.App.Metadata["config"].(*Config)
rl := cfg.FindRelay(context.Background(), RelayPerms{Read: true})
rl := cfg.FindRelay(context.Bg(), RelayPerms{Read: true})
if rl == nil {
return errors.New("cannot connect relays")
}
Expand Down
31 changes: 16 additions & 15 deletions cmd/algia/timeline.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand All @@ -13,6 +12,8 @@ import (
"sync"
"sync/atomic"

"github.com/Hubmakerlabs/replicatr/pkg/context"

"github.com/Hubmakerlabs/replicatr/pkg/go-nostr/event"
"github.com/Hubmakerlabs/replicatr/pkg/go-nostr/filter"
"github.com/Hubmakerlabs/replicatr/pkg/go-nostr/filters"
Expand Down Expand Up @@ -217,7 +218,7 @@ func doDMPost(cCtx *cli.Context) (e error) {
}

var success atomic.Int64
cfg.Do(RelayPerms{Write: true}, func(ctx context.Context, rl *relays.Relay) bool {
cfg.Do(RelayPerms{Write: true}, func(ctx context.T, rl *relays.Relay) bool {
e := rl.Publish(ctx, ev)
if e != nil {
fmt.Fprintln(os.Stderr, rl.URL, e)
Expand Down Expand Up @@ -324,7 +325,7 @@ func doPost(cCtx *cli.Context) (e error) {
}

var success atomic.Int64
cfg.Do(RelayPerms{Write: true}, func(ctx context.Context, rl *relays.Relay) bool {
cfg.Do(RelayPerms{Write: true}, func(ctx context.T, rl *relays.Relay) bool {
e := rl.Publish(ctx, ev)
if e != nil {
fmt.Fprintln(os.Stderr, rl.URL, e)
Expand Down Expand Up @@ -425,7 +426,7 @@ func doReply(cCtx *cli.Context) (e error) {
}

var success atomic.Int64
cfg.Do(RelayPerms{Write: true}, func(ctx context.Context, rl *relays.Relay) bool {
cfg.Do(RelayPerms{Write: true}, func(ctx context.T, rl *relays.Relay) bool {
if !quote {
ev.Tags = ev.Tags.AppendUnique(tags.Tag{"e", id, rl.URL, "reply"})
} else {
Expand Down Expand Up @@ -488,7 +489,7 @@ func doRepost(cCtx *cli.Context) (e error) {
first.Store(true)

var success atomic.Int64
cfg.Do(RelayPerms{Write: true}, func(ctx context.Context, rl *relays.Relay) bool {
cfg.Do(RelayPerms{Write: true}, func(ctx context.T, rl *relays.Relay) bool {
if first.Load() {
evs, e := rl.QuerySync(ctx, f)
if e != nil {
Expand Down Expand Up @@ -543,7 +544,7 @@ func doUnrepost(cCtx *cli.Context) (e error) {
}
var repostID string
var mu sync.Mutex
cfg.Do(RelayPerms{Read: true}, func(ctx context.Context, rl *relays.Relay) bool {
cfg.Do(RelayPerms{Read: true}, func(ctx context.T, rl *relays.Relay) bool {
evs, e := rl.QuerySync(ctx, f)
if e != nil {
return true
Expand All @@ -565,7 +566,7 @@ func doUnrepost(cCtx *cli.Context) (e error) {
}

var success atomic.Int64
cfg.Do(RelayPerms{Write: true}, func(ctx context.Context, rl *relays.Relay) bool {
cfg.Do(RelayPerms{Write: true}, func(ctx context.T, rl *relays.Relay) bool {
e := rl.Publish(ctx, ev)
if e != nil {
fmt.Fprintln(os.Stderr, rl.URL, e)
Expand Down Expand Up @@ -631,7 +632,7 @@ func doLike(cCtx *cli.Context) (e error) {
first.Store(true)

var success atomic.Int64
cfg.Do(RelayPerms{Write: true}, func(ctx context.Context, rl *relays.Relay) bool {
cfg.Do(RelayPerms{Write: true}, func(ctx context.T, rl *relays.Relay) bool {
if first.Load() {
evs, e := rl.QuerySync(ctx, f)
if e != nil {
Expand Down Expand Up @@ -687,7 +688,7 @@ func doUnlike(cCtx *cli.Context) (e error) {
}
var likeID string
var mu sync.Mutex
cfg.Do(RelayPerms{Read: true}, func(ctx context.Context, rl *relays.Relay) bool {
cfg.Do(RelayPerms{Read: true}, func(ctx context.T, rl *relays.Relay) bool {
evs, e := rl.QuerySync(ctx, f)
if e != nil {
return true
Expand All @@ -709,7 +710,7 @@ func doUnlike(cCtx *cli.Context) (e error) {
}

var success atomic.Int64
cfg.Do(RelayPerms{Write: true}, func(ctx context.Context, rl *relays.Relay) bool {
cfg.Do(RelayPerms{Write: true}, func(ctx context.T, rl *relays.Relay) bool {
e := rl.Publish(ctx, ev)
if e != nil {
fmt.Fprintln(os.Stderr, rl.URL, e)
Expand Down Expand Up @@ -758,7 +759,7 @@ func doDelete(cCtx *cli.Context) (e error) {
}

var success atomic.Int64
cfg.Do(RelayPerms{Write: true}, func(ctx context.Context, rl *relays.Relay) bool {
cfg.Do(RelayPerms{Write: true}, func(ctx context.T, rl *relays.Relay) bool {
e := rl.Publish(ctx, ev)
if e != nil {
fmt.Fprintln(os.Stderr, rl.URL, e)
Expand Down Expand Up @@ -821,7 +822,7 @@ func doStream(cCtx *cli.Context) (e error) {

cfg := cCtx.App.Metadata["config"].(*Config)

rl := cfg.FindRelay(context.Background(), RelayPerms{Read: true})
rl := cfg.FindRelay(context.Bg(), RelayPerms{Read: true})
if rl == nil {
return errors.New("cannot connect relays")
}
Expand Down Expand Up @@ -859,7 +860,7 @@ func doStream(cCtx *cli.Context) (e error) {
Since: &since,
}

sub, e := rl.Subscribe(context.Background(), filters.T{ff})
sub, e := rl.Subscribe(context.Bg(), filters.T{ff})
if e != nil {
return e
}
Expand All @@ -879,7 +880,7 @@ func doStream(cCtx *cli.Context) (e error) {
if e := evr.Sign(sk); e != nil {
return e
}
cfg.Do(RelayPerms{Write: true}, func(ctx context.Context, rl *relays.Relay) bool {
cfg.Do(RelayPerms{Write: true}, func(ctx context.T, rl *relays.Relay) bool {
rl.Publish(ctx, evr)
return true
})
Expand Down Expand Up @@ -948,7 +949,7 @@ func postMsg(cCtx *cli.Context, msg string) (e error) {
}

var success atomic.Int64
cfg.Do(RelayPerms{Write: true}, func(ctx context.Context, rl *relays.Relay) bool {
cfg.Do(RelayPerms{Write: true}, func(ctx context.T, rl *relays.Relay) bool {
e := rl.Publish(ctx, ev)
if e != nil {
fmt.Fprintln(os.Stderr, rl.URL, e)
Expand Down
11 changes: 6 additions & 5 deletions cmd/algia/zap.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand All @@ -10,6 +9,8 @@ import (
"os"
"strings"

"github.com/Hubmakerlabs/replicatr/pkg/context"

"github.com/Hubmakerlabs/replicatr/pkg/go-nostr/event"
"github.com/Hubmakerlabs/replicatr/pkg/go-nostr/filter"
"github.com/Hubmakerlabs/replicatr/pkg/go-nostr/keys"
Expand Down Expand Up @@ -75,7 +76,7 @@ func pay(cfg *Config, invoice string) (e error) {
return e
}

rl, e := relays.RelayConnect(context.Background(), host)
rl, e := relays.RelayConnect(context.Bg(), host)
if e != nil {
return e
}
Expand Down Expand Up @@ -117,12 +118,12 @@ func pay(cfg *Config, invoice string) (e error) {
Kinds: []int{event.KindNWCWalletInfo, event.KindNWCWalletResponse, event.KindNWCWalletRequest},
Limit: 1,
}}
sub, e := rl.Subscribe(context.Background(), filters)
sub, e := rl.Subscribe(context.Bg(), filters)
if e != nil {
return e
}

e = rl.Publish(context.Background(), ev)
e = rl.Publish(context.Bg(), ev)
if e != nil {
return e
}
Expand All @@ -146,7 +147,7 @@ func pay(cfg *Config, invoice string) (e error) {

// ZapInfo is
func (cfg *Config) ZapInfo(pub string) (*Lnurlp, error) {
rl := cfg.FindRelay(context.Background(), RelayPerms{Read: true})
rl := cfg.FindRelay(context.Bg(), RelayPerms{Read: true})
if rl == nil {
return nil, errors.New("cannot connect relays")
}
Expand Down
7 changes: 4 additions & 3 deletions cmd/replicatrd/replicatr/listener.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
package replicatr

import (
"context"
"fmt"

"github.com/Hubmakerlabs/replicatr/pkg/context"

"github.com/Hubmakerlabs/replicatr/pkg/go-nostr/filter"
"github.com/Hubmakerlabs/replicatr/pkg/go-nostr/filters"
"github.com/puzpuzpuz/xsync/v2"
)

type Listener struct {
filters filters.T
cancel context.CancelCauseFunc
cancel context.C
}

var listeners = xsync.NewTypedMapOf[*WebSocket,
Expand Down Expand Up @@ -43,7 +44,7 @@ func GetListeningFilters() (respFilters filters.T) {
return
}

func SetListener(id string, ws *WebSocket, f filters.T, c context.CancelCauseFunc) {
func SetListener(id string, ws *WebSocket, f filters.T, c context.C) {
subs, _ := listeners.LoadOrCompute(ws, func() ListenerMap {
return xsync.NewMapOf[*Listener]()
})
Expand Down
4 changes: 2 additions & 2 deletions cmd/replicatrd/replicatr/policiesnip4.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package replicatr

import (
"context"
"github.com/Hubmakerlabs/replicatr/pkg/context"

"github.com/Hubmakerlabs/replicatr/pkg/nostr/filter"
"golang.org/x/exp/slices"
)

// RejectKind4Snoopers prevents reading NIP-04 messages from people not involved
// in the conversation.
func RejectKind4Snoopers(c context.Context, f *filter.T) (bool, string) {
func RejectKind4Snoopers(c context.T, f *filter.T) (bool, string) {
// prevent kind-4 events from being returned to unauthorized users, only
// when authentication is a thing
if !slices.Contains(f.Kinds, 4) {
Expand Down
29 changes: 15 additions & 14 deletions cmd/replicatrd/replicatr/relay.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package replicatr

import (
"context"
"net/http"
"os"
"time"

"github.com/Hubmakerlabs/replicatr/pkg/context"

"github.com/Hubmakerlabs/replicatr/pkg/go-nostr/event"
"github.com/Hubmakerlabs/replicatr/pkg/go-nostr/filter"
"github.com/Hubmakerlabs/replicatr/pkg/go-nostr/nip11"
Expand All @@ -26,17 +27,17 @@ const (

// function types used in the relay state
type (
RejectEvent func(c context.Context, event *event.T) (reject bool, msg string)
RejectFilter func(c context.Context, f *filter.T) (reject bool, msg string)
OverwriteFilter func(c context.Context, f *filter.T)
OverwriteDeletionOutcome func(c context.Context, target *event.T, del *event.T) (accept bool, msg string)
OverwriteResponseEvent func(c context.Context, ev *event.T)
Events func(c context.Context, ev *event.T) error
Hook func(c context.Context)
OverwriteRelayInformation func(c context.Context, r *http.Request, info *nip11.RelayInformationDocument) *nip11.RelayInformationDocument
QueryEvents func(c context.Context, f *filter.T) (eventC chan *event.T, e error)
CountEvents func(c context.Context, f *filter.T) (cnt int64, e error)
OnEventSaved func(c context.Context, ev *event.T)
RejectEvent func(c context.T, event *event.T) (reject bool, msg string)
RejectFilter func(c context.T, f *filter.T) (reject bool, msg string)
OverwriteFilter func(c context.T, f *filter.T)
OverwriteDeletionOutcome func(c context.T, target *event.T, del *event.T) (accept bool, msg string)
OverwriteResponseEvent func(c context.T, ev *event.T)
Events func(c context.T, ev *event.T) error
Hook func(c context.T)
OverwriteRelayInformation func(c context.T, r *http.Request, info *nip11.Info) *nip11.Info
QueryEvents func(c context.T, f *filter.T) (eventC chan *event.T, e error)
CountEvents func(c context.T, f *filter.T) (cnt int64, e error)
OnEventSaved func(c context.T, ev *event.T)
)

type Relay struct {
Expand All @@ -57,7 +58,7 @@ type Relay struct {
OnDisconnect []Hook
OnEventSaved []OnEventSaved
// editing info will affect
Info *nip11.RelayInformationDocument
Info *nip11.Info
*log2.Log
// for establishing websockets
upgrader websocket.Upgrader
Expand All @@ -77,7 +78,7 @@ type Relay struct {
func NewRelay(appName string) (r *Relay) {
r = &Relay{
Log: log2.New(os.Stderr, appName, 0),
Info: &nip11.RelayInformationDocument{
Info: &nip11.Info{
Software: "https://github.com/Hubmakerlabs/replicatr/cmd/replicatrd",
Version: "n/a",
SupportedNIPs: make([]int, 0),
Expand Down
Loading

0 comments on commit ffe5414

Please sign in to comment.