Skip to content

Commit

Permalink
Handle feed deletes
Browse files Browse the repository at this point in the history
  • Loading branch information
jarrel-b committed Aug 25, 2023
1 parent 5925e32 commit 243105e
Showing 1 changed file with 61 additions and 48 deletions.
109 changes: 61 additions & 48 deletions publicapi/feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"database/sql"
"fmt"
"math"
"sort"
"sync"
"time"

Expand All @@ -18,6 +19,7 @@ import (
db "github.com/mikeydub/go-gallery/db/gen/coredb"
"github.com/mikeydub/go-gallery/graphql/dataloader"
"github.com/mikeydub/go-gallery/graphql/model"
"github.com/mikeydub/go-gallery/service/logger"
"github.com/mikeydub/go-gallery/service/persist"
"github.com/mikeydub/go-gallery/service/recommend"
"github.com/mikeydub/go-gallery/service/recommend/userpref"
Expand Down Expand Up @@ -317,7 +319,7 @@ type feedParams struct {
FetchFrom time.Duration
}

func fetchFeedEntities(ctx context.Context, queries *db.Queries, p feedParams) ([]db.FeedEntityScore, error) {
func fetchFeedEntityScores(ctx context.Context, queries *db.Queries, p feedParams) ([]db.FeedEntityScore, error) {
var q db.GetFeedEntityScoresParams

q.IncludeViewer = true
Expand Down Expand Up @@ -369,7 +371,7 @@ func (api FeedAPI) TrendingFeed(ctx context.Context, before *string, after *stri

if !hasCursors {
calcFunc := func(ctx context.Context) ([]persist.FeedEntityType, []persist.DBID, error) {
trendData, err := fetchFeedEntities(ctx, api.queries, feedParams{
trendData, err := fetchFeedEntityScores(ctx, api.queries, feedParams{
IncludePosts: includePosts,
IncludeEvents: true,
ExcludeActions: []persist.Action{persist.ActionUserCreated, persist.ActionUserFollowedUsers},
Expand Down Expand Up @@ -472,7 +474,7 @@ func (api FeedAPI) CuratedFeed(ctx context.Context, before, after *string, first
}

if !hasCursors {
trendData, err := fetchFeedEntities(ctx, api.queries, feedParams{
trendData, err := fetchFeedEntityScores(ctx, api.queries, feedParams{
IncludePosts: includePosts,
IncludeEvents: !includePosts,
ExcludeUserID: userID,
Expand Down Expand Up @@ -647,73 +649,84 @@ func loadFeedEntities(ctx context.Context, d *dataloader.Loaders, typs []persist
if len(typs) != len(ids) {
panic("length of types and ids must be equal")
}

entities := make([]any, len(ids))
feedEventIDs := make([]persist.DBID, 0, len(ids))
postIDs := make([]persist.DBID, 0, len(ids))
errored := make([]int, 0)
idToPosition := make(map[persist.DBID]int, len(ids))
eventsFetch := make([]persist.DBID, 0, len(ids))
postsFetch := make([]persist.DBID, 0, len(ids))
eventsDone := make(chan bool)
postsDone := make(chan bool)
eventsErr := make(chan int)
postsErr := make(chan int)

for i := 0; i < len(typs); i++ {
id := ids[i]
idToPosition[id] = i
switch persist.FeedEntityType(typs[i]) {
case persist.FeedEventTypeTag:
feedEventIDs = append(feedEventIDs, ids[i])
eventsFetch = append(eventsFetch, id)
case persist.PostTypeTag:
postIDs = append(postIDs, ids[i])
postsFetch = append(postsFetch, id)
default:
return nil, fmt.Errorf("unknown feed entity type %d", typs[i])
logger.For(ctx).Warnf("unknown feed entity type %d", typs[i])
}
}

incomingFeedEvents := make(chan []db.FeedEvent)
incomingFeedPosts := make(chan []db.Post)
incomingErrors := make(chan error)

go func() {
feedEvents, errs := d.FeedEventByFeedEventID.LoadAll(feedEventIDs)
for _, err := range errs {
if err != nil {
incomingErrors <- err
return
batchResults, batchErrs := d.FeedEventByFeedEventID.LoadAll(eventsFetch)
for i := 0; i < len(batchResults); i++ {
pos := idToPosition[eventsFetch[i]]
err := batchErrs[i]
entities[pos] = batchResults[i]
if err != nil && !util.ErrorAs[persist.ErrFeedEventNotFoundByID](err) {
logger.For(ctx).Errorf("failed to fetch event %s: %s", eventsFetch[i], err)
eventsErr <- pos
}
}
incomingFeedEvents <- feedEvents
close(eventsDone)
close(eventsErr)
}()

go func() {
feedPosts, errs := d.PostByPostID.LoadAll(postIDs)
for _, err := range errs {
if err != nil {
incomingErrors <- err
return
batchResults, batchErrs := d.PostByPostID.LoadAll(postsFetch)
for i := 0; i < len(batchResults); i++ {
pos := idToPosition[postsFetch[i]]
err := batchErrs[i]
entities[pos] = batchResults[i]
if err != nil && !util.ErrorAs[persist.ErrPostNotFoundByID](err) {
logger.For(ctx).Errorf("failed to fetch post %s: %s", postsFetch[i], err)
postsErr <- pos
}
}
incomingFeedPosts <- feedPosts
close(postsDone)
close(postsErr)
}()

for i := 0; i < 2; i++ {
select {
case feedEvents := <-incomingFeedEvents:
idsToFeedEvents := make(map[persist.DBID]db.FeedEvent, len(feedEvents))
for _, evt := range feedEvents {
idsToFeedEvents[evt.ID] = evt
}
for pos := range eventsErr {
errored = append(errored, pos)
}
for pos := range postsErr {
errored = append(errored, pos)
}

for j, id := range ids {
if it, ok := idsToFeedEvents[id]; ok {
entities[j] = it
}
}
case feedPosts := <-incomingFeedPosts:
idsToFeedPosts := make(map[persist.DBID]db.Post, len(feedPosts))
for _, evt := range feedPosts {
idsToFeedPosts[evt.ID] = evt
}
<-eventsDone
<-postsDone

for j, id := range ids {
if it, ok := idsToFeedPosts[id]; ok {
entities[j] = it
}
}
case err := <-incomingErrors:
return nil, err
// Sort in descending order
sort.Slice(errored, func(i, j int) bool { return errored[i] > errored[j] })

// Filter out errored entities
for _, pos := range errored {
if pos == 0 {
entities = entities[1:]
continue
}
if pos == len(entities)-1 {
entities = entities[:pos]
continue
}
entities = append(entities[:pos], entities[pos+1:]...)
}

return entities, nil
Expand Down

0 comments on commit 243105e

Please sign in to comment.