Skip to content

Commit

Permalink
Distributed lock and store calendar_events UUID as binary in MySQL (#…
Browse files Browse the repository at this point in the history
…20277)

#19352

Fix for code review comment:
#20156 (comment)

Also includes changes from #20252

# Checklist for submitter

If some of the following don't apply, delete the relevant line.

<!-- Note that API documentation changes are now addressed by the
product design team. -->

- [x] Added/updated tests
- [x] If database migrations are included, checked table schema to
confirm autoupdate
- For database migrations:
- [x] Checked schema for all modified tables for columns that will
auto-update timestamps during migration.
- [x] Confirmed that updating the timestamps is acceptable, and will not
cause unwanted side effects.
- [x] Ensured the correct collation is explicitly set for character
columns (`COLLATE utf8mb4_unicode_ci`).
- [x] Manual QA for all new/changed functionality
  • Loading branch information
getvictor authored Jul 10, 2024
1 parent 886ab90 commit 7bcd61a
Show file tree
Hide file tree
Showing 21 changed files with 964 additions and 79 deletions.
6 changes: 5 additions & 1 deletion cmd/fleet/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/fleetdm/fleet/v4/server/pubsub"
"github.com/fleetdm/fleet/v4/server/service"
"github.com/fleetdm/fleet/v4/server/service/async"
"github.com/fleetdm/fleet/v4/server/service/redis_lock"
"github.com/fleetdm/fleet/v4/server/service/redis_policy_set"
"github.com/fleetdm/fleet/v4/server/sso"
"github.com/fleetdm/fleet/v4/server/version"
Expand Down Expand Up @@ -691,6 +692,7 @@ the way that the Fleet server works.
}

var softwareInstallStore fleet.SoftwareInstallerStore
var distributedLock fleet.Lock
if license.IsPremium() {
profileMatcher := apple_mdm.NewProfileMatcher(redisPool)
if config.S3.SoftwareInstallersBucket != "" {
Expand Down Expand Up @@ -718,6 +720,7 @@ the way that the Fleet server works.
}
}

distributedLock = redis_lock.NewLock(redisPool)
svc, err = eeservice.NewService(
svc,
ds,
Expand All @@ -730,6 +733,7 @@ the way that the Fleet server works.
ssoSessionStore,
profileMatcher,
softwareInstallStore,
distributedLock,
)
if err != nil {
initFatal(err, "initial Fleet Premium service")
Expand Down Expand Up @@ -870,7 +874,7 @@ the way that the Fleet server works.
} else {
config.Calendar.Periodicity = 5 * time.Minute
}
return cron.NewCalendarSchedule(ctx, instanceID, ds, config.Calendar, logger)
return cron.NewCalendarSchedule(ctx, instanceID, ds, distributedLock, config.Calendar, logger)
},
); err != nil {
initFatal(err, "failed to register calendar schedule")
Expand Down
220 changes: 214 additions & 6 deletions ee/server/service/calendar.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,24 @@ package service
import (
"context"
"fmt"
"sync"

"github.com/fleetdm/fleet/v4/server/authz"
"github.com/fleetdm/fleet/v4/server/contexts/ctxerr"
"github.com/fleetdm/fleet/v4/server/fleet"
"github.com/fleetdm/fleet/v4/server/service/calendar"
"github.com/go-kit/log/level"
"sync"
"github.com/google/uuid"
)

var asyncCalendarProcessing bool
var asyncMutex sync.Mutex

func (svc *Service) CalendarWebhook(ctx context.Context, eventUUID string, channelID string, resourceState string) error {

// We don't want the sender to cancel the context since we want to make sure we process the webhook.
ctx = context.WithoutCancel(ctx)

appConfig, err := svc.ds.AppConfig(ctx)
if err != nil {
return fmt.Errorf("load app config: %w", err)
Expand All @@ -36,9 +44,9 @@ func (svc *Service) CalendarWebhook(ctx context.Context, eventUUID string, chann
svc.authz.SkipAuthorization(ctx)
if fleet.IsNotFound(err) {
// We could try to stop the channel callbacks here, but that may not be secure since we don't know if the request is legitimate
level.Warn(svc.logger).Log("msg", "Received calendar callback, but did not find corresponding event in database", "event_uuid",
level.Info(svc.logger).Log("msg", "Received calendar callback, but did not find corresponding event in database", "event_uuid",
eventUUID, "channel_id", channelID)
return err
return nil
}
return err
}
Expand All @@ -47,7 +55,7 @@ func (svc *Service) CalendarWebhook(ctx context.Context, eventUUID string, chann
return fmt.Errorf("calendar event %s has no team ID", eventUUID)
}

localConfig := &calendar.CalendarConfig{
localConfig := &calendar.Config{
GoogleCalendarIntegration: *googleCalendarIntegrationConfig,
ServerURL: appConfig.ServerSettings.ServerURL,
}
Expand All @@ -63,6 +71,60 @@ func (svc *Service) CalendarWebhook(ctx context.Context, eventUUID string, chann
return authz.ForbiddenWithInternal(fmt.Sprintf("calendar channel ID mismatch: %s != %s", savedChannelID, channelID), nil, nil, nil)
}

lockValue, reserved, err := svc.getCalendarLock(ctx, eventUUID, true)
if err != nil {
return err
}
// If lock has been reserved by cron, we will need to re-process this event in case the calendar event was changed after the cron job read it.
if lockValue == "" && !reserved {
// We did not get a lock, so there is nothing to do here
return nil
}

if !reserved {
unlocked := false
defer func() {
if !unlocked {
svc.releaseCalendarLock(ctx, eventUUID, lockValue)
}
}()

// Remove event from the queue so that we don't process this event again.
// Note: This item can be added back to the queue while we are processing it.
err = svc.distributedLock.RemoveFromSet(ctx, calendar.QueueKey, eventUUID)
if err != nil {
return ctxerr.Wrap(ctx, err, "remove calendar event from queue")
}

err = svc.processCalendarEvent(ctx, eventDetails, googleCalendarIntegrationConfig, userCalendar)
if err != nil {
return err
}
svc.releaseCalendarLock(ctx, eventUUID, lockValue)
unlocked = true
}

// Now, we need to check if there are any events in the queue that need to be re-processed.
asyncMutex.Lock()
defer asyncMutex.Unlock()
if !asyncCalendarProcessing {
eventIDs, err := svc.distributedLock.GetSet(ctx, calendar.QueueKey)
if err != nil {
return ctxerr.Wrap(ctx, err, "get calendar event queue")
}
if len(eventIDs) > 0 {
asyncCalendarProcessing = true
go svc.processCalendarAsync(ctx, eventIDs)
}
return nil
}

return nil
}

func (svc *Service) processCalendarEvent(ctx context.Context, eventDetails *fleet.CalendarEventDetails,
googleCalendarIntegrationConfig *fleet.GoogleCalendarIntegration, userCalendar fleet.UserCalendar) error {

genBodyFn := func(conflict bool) (body string, ok bool, err error) {

// This function is called when a new event is being created.
Expand Down Expand Up @@ -113,7 +175,7 @@ func (svc *Service) CalendarWebhook(ctx context.Context, eventUUID string, chann
return calendar.GenerateCalendarEventBody(ctx, svc.ds, team.Name, host, &sync.Map{}, conflict, svc.logger), true, nil
}

err = userCalendar.Configure(eventDetails.Email)
err := userCalendar.Configure(eventDetails.Email)
if err != nil {
return ctxerr.Wrap(ctx, err, "configure calendar")
}
Expand All @@ -124,10 +186,156 @@ func (svc *Service) CalendarWebhook(ctx context.Context, eventUUID string, chann
if updated && event != nil {
// Event was updated, so we need to save it
_, err = svc.ds.CreateOrUpdateCalendarEvent(ctx, event.UUID, event.Email, event.StartTime, event.EndTime, event.Data,
event.TimeZone, eventDetails.ID, fleet.CalendarWebhookStatusNone)
event.TimeZone, eventDetails.HostID, fleet.CalendarWebhookStatusNone)
if err != nil {
return ctxerr.Wrap(ctx, err, "create or update calendar event")
}
}

return nil
}

// releaseCalendarLock releases the distributed lock for the given calendar event by
// calling ReleaseLock on the key calendar.LockKeyPrefix+eventUUID with lockValue.
//
// This is best-effort: nothing is returned to the caller and failures are only logged.
// Exactly one log entry is written per failed release (an error-level entry when
// ReleaseLock returns an error, otherwise a warning when it reports the lock was not
// released), and each entry carries the event UUID so it can be correlated.
func (svc *Service) releaseCalendarLock(ctx context.Context, eventUUID string, lockValue string) {
	ok, err := svc.distributedLock.ReleaseLock(ctx, calendar.LockKeyPrefix+eventUUID, lockValue)
	switch {
	case err != nil:
		// Report the failure once, together with the underlying error.
		level.Error(svc.logger).Log("msg", "Failed to release calendar lock", "event_uuid", eventUUID, "err", err)
	case !ok:
		// If the lock was not released, it will expire on its own.
		level.Warn(svc.logger).Log("msg", "Failed to release calendar lock", "event_uuid", eventUUID)
	}
}

// getCalendarLock tries to obtain the distributed lock for a calendar event.
//
// It first reads calendar.ReservedLockKeyPrefix+eventUUID. If a value is present the
// lock is treated as reserved and no attempt is made to acquire it. Otherwise it tries
// to acquire calendar.LockKeyPrefix+eventUUID using a freshly generated UUID as the
// lock value.
//
// When addToQueue is true and the lock is unavailable (reserved, or the acquire attempt
// failed), the event UUID is added to the calendar.QueueKey set to indicate that the
// event needs to be re-processed. For the non-reserved case the acquire is then retried
// once, in case the lock was released while the event was being added to the set.
//
// Returns:
//   - lockValue: the value the lock was acquired with, or "" if this call does NOT
//     hold the lock. Callers rely on a non-empty value meaning "lock held".
//   - reserved: true when the reserved key exists (lockValue is "" in that case).
//   - err: a wrapped error from the underlying lock/set operations; lockValue and
//     reserved are zero values when err is non-nil.
func (svc *Service) getCalendarLock(ctx context.Context, eventUUID string, addToQueue bool) (lockValue string, reserved bool, err error) {
	// Check if lock has been reserved, which means we can't have it.
	reservedValue, err := svc.distributedLock.Get(ctx, calendar.ReservedLockKeyPrefix+eventUUID)
	if err != nil {
		return "", false, ctxerr.Wrap(ctx, err, "get calendar reserved lock")
	}
	if reservedValue != nil {
		if addToQueue {
			// Record that the event must be re-processed once the reservation is gone.
			if err = svc.distributedLock.AddToSet(ctx, calendar.QueueKey, eventUUID); err != nil {
				return "", false, ctxerr.Wrap(ctx, err, "add calendar event to queue")
			}
		}
		// We flag the lock as reserved.
		return "", true, nil
	}

	// Try to acquire the lock.
	lockKey := calendar.LockKeyPrefix + eventUUID
	lockValue = uuid.New().String()
	lockAcquired, err := svc.distributedLock.AcquireLock(ctx, lockKey, lockValue, 0)
	if err != nil {
		return "", false, ctxerr.Wrap(ctx, err, "acquire calendar lock")
	}
	if lockAcquired {
		return lockValue, false, nil
	}
	if !addToQueue {
		// The lock is held elsewhere and the caller did not ask to queue the event.
		// Return an empty lock value so the caller does not believe it owns the lock.
		return "", false, nil
	}

	// Could not acquire lock, so we are already processing this event. In this case, we add the event to
	// the queue (actually a set) to indicate that we need to re-process the event.
	if err = svc.distributedLock.AddToSet(ctx, calendar.QueueKey, eventUUID); err != nil {
		return "", false, ctxerr.Wrap(ctx, err, "add calendar event to queue")
	}

	// Try to acquire the lock again in case it was released while we were adding the event to the queue.
	lockAcquired, err = svc.distributedLock.AcquireLock(ctx, lockKey, lockValue, 0)
	if err != nil {
		return "", false, ctxerr.Wrap(ctx, err, "acquire calendar lock again")
	}
	if !lockAcquired {
		// We could not acquire the lock, so we are done here.
		return "", false, nil
	}
	return lockValue, false, nil
}

// processCalendarAsync works through batches of queued calendar event UUIDs, starting
// with eventIDs. After each batch it re-reads the calendar.QueueKey set and continues
// with whatever it returns, stopping when the batch is empty, when processing an event
// reports failure, or when the set cannot be read.
//
// On exit it clears the package-level asyncCalendarProcessing flag under asyncMutex.
func (svc *Service) processCalendarAsync(ctx context.Context, eventIDs []string) {
	// Always reset the "in progress" flag, regardless of how this function returns.
	defer func() {
		asyncMutex.Lock()
		defer asyncMutex.Unlock()
		asyncCalendarProcessing = false
	}()

	for pending := eventIDs; len(pending) > 0; {
		for i := range pending {
			if !svc.processCalendarEventAsync(ctx, pending[i]) {
				return
			}
		}

		// Now we check whether there are any more events in the queue.
		next, err := svc.distributedLock.GetSet(ctx, calendar.QueueKey)
		if err != nil {
			level.Error(svc.logger).Log("msg", "Failed to get calendar event queue", "err", err)
			return
		}
		pending = next
	}
}

// processCalendarEventAsync processes a single queued calendar event identified by
// eventUUID. It takes the event's calendar lock (without re-queueing), removes the
// event from the calendar.QueueKey set, reloads the app config and the event details,
// and then hands the event to processCalendarEvent.
//
// It returns true when the caller may continue with further events (including the cases
// where there was nothing to do: no lock obtained, no Google Calendar integration
// configured, or the event no longer exists). It returns false after logging an error.
func (svc *Service) processCalendarEventAsync(ctx context.Context, eventUUID string) bool {
	heldLock, _, lockErr := svc.getCalendarLock(ctx, eventUUID, false)
	switch {
	case lockErr != nil:
		level.Error(svc.logger).Log("msg", "Failed to get calendar lock", "err", lockErr)
		return false
	case heldLock == "":
		// We did not get a lock, so there is nothing to do here
		return true
	}
	defer svc.releaseCalendarLock(ctx, eventUUID, heldLock)

	// Remove event from the queue so that we don't process this event again.
	// Note: This item can be added back to the queue while we are processing it.
	if rmErr := svc.distributedLock.RemoveFromSet(ctx, calendar.QueueKey, eventUUID); rmErr != nil {
		level.Error(svc.logger).Log("msg", "Failed to remove calendar event from queue", "err", rmErr)
		return false
	}

	cfg, cfgErr := svc.ds.AppConfig(ctx)
	if cfgErr != nil {
		level.Error(svc.logger).Log("msg", "Failed to load app config", "err", cfgErr)
		return false
	}

	integrations := cfg.Integrations.GoogleCalendar
	if len(integrations) == 0 {
		// Google Calendar integration is not configured
		return true
	}
	googleCfg := integrations[0]

	details, detailsErr := svc.ds.GetCalendarEventDetailsByUUID(ctx, eventUUID)
	switch {
	case detailsErr == nil:
		// Event found; continue below.
	case fleet.IsNotFound(detailsErr):
		// We found this event when the callback initially came in. So the event may have been removed or re-created since then.
		return true
	default:
		level.Error(svc.logger).Log("msg", "Failed to get calendar event details", "err", detailsErr)
		return false
	}
	if details.TeamID == nil {
		// Should not happen
		level.Error(svc.logger).Log("msg", "Calendar event has no team ID", "uuid", eventUUID)
		return false
	}

	userCalendar := calendar.CreateUserCalendarFromConfig(ctx, &calendar.Config{
		GoogleCalendarIntegration: *googleCfg,
		ServerURL:                 cfg.ServerSettings.ServerURL,
	}, svc.logger)

	if procErr := svc.processCalendarEvent(ctx, details, googleCfg, userCalendar); procErr != nil {
		level.Error(svc.logger).Log("msg", "Failed to process calendar event", "err", procErr)
		return false
	}
	return true
}
1 change: 1 addition & 0 deletions ee/server/service/mdm_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func setupMockDatastorePremiumService() (*mock.Store, *eeservice.Service, contex
nil,
nil,
nil,
nil,
)
if err != nil {
panic(err)
Expand Down
3 changes: 3 additions & 0 deletions ee/server/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Service struct {
depService *apple_mdm.DEPService
profileMatcher fleet.ProfileMatcher
softwareInstallStore fleet.SoftwareInstallerStore
distributedLock fleet.Lock
}

func NewService(
Expand All @@ -42,6 +43,7 @@ func NewService(
sso sso.SessionStore,
profileMatcher fleet.ProfileMatcher,
softwareInstallStore fleet.SoftwareInstallerStore,
distributedLock fleet.Lock,
) (*Service, error) {
authorizer, err := authz.NewAuthorizer()
if err != nil {
Expand All @@ -61,6 +63,7 @@ func NewService(
depService: apple_mdm.NewDEPService(ds, depStorage, logger),
profileMatcher: profileMatcher,
softwareInstallStore: softwareInstallStore,
distributedLock: distributedLock,
}

// Override methods that can't be easily overridden via
Expand Down
Loading

0 comments on commit 7bcd61a

Please sign in to comment.