Skip to content

Commit

Permalink
Merge pull request open-horizon#4025 from MaxMcAdam/anax-3989
Browse files Browse the repository at this point in the history
Issue 3989 - AgentImageCleanup: Subworker maintains a list of images …
  • Loading branch information
LiilyZhang authored Apr 18, 2024
2 parents c89b9aa + ab3a708 commit 5737c30
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 0 deletions.
19 changes: 19 additions & 0 deletions nodemanagement/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package nodemanagement

import (
"fmt"
"github.com/open-horizon/anax/containermessage"
"github.com/open-horizon/anax/events"
)

Expand Down Expand Up @@ -142,3 +143,21 @@ func NewNmpStatusChangeCommand(msg *events.ExchangeChangeMessage) *NmpStatusChan
Msg: msg,
}
}

type ImageFetchedCommand struct {
DeploymentDescription *containermessage.DeploymentDescription
}

func NewImageFetchedCommand(dd *containermessage.DeploymentDescription) *ImageFetchedCommand {
return &ImageFetchedCommand{
DeploymentDescription: dd,
}
}

func (i ImageFetchedCommand) String() string {
return fmt.Sprintf("DeploymentDescription: %v", i.DeploymentDescription)
}

func (i ImageFetchedCommand) ShortString() string {
return i.String()
}
18 changes: 18 additions & 0 deletions nodemanagement/node_management_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,15 @@ func (n *NodeManagementWorker) DownloadComplete(cmd *NMPDownloadCompleteCommand)
}
}

func (n *NodeManagementWorker) HandleImageFetchedCommand(cmd *ImageFetchedCommand) {
for _, svc := range cmd.DeploymentDescription.Services {
imgRec := persistence.NewServiceImageUsage(svc.Image)
if err := persistence.SaveOrUpdateServiceImage(n.db, imgRec); err != nil {
glog.Errorf(nmwlog(fmt.Sprintf("Failed to save image download record to db: %v", err)))
}
}
}

func (n *NodeManagementWorker) CommandHandler(command worker.Command) bool {
glog.Infof(nmwlog(fmt.Sprintf("Handling command %v", command)))
switch command.(type) {
Expand Down Expand Up @@ -330,6 +339,9 @@ func (n *NodeManagementWorker) CommandHandler(command worker.Command) bool {
n.HandleAgentFilesVersionChange(cmd)
case *NmpStatusChangeCommand:
n.HandleNmpStatusReset()
case *ImageFetchedCommand:
cmd := command.(*ImageFetchedCommand)
n.HandleImageFetchedCommand(cmd)
default:
return false
}
Expand Down Expand Up @@ -520,6 +532,12 @@ func (n *NodeManagementWorker) NewEvent(incoming events.Message) {
case events.CHANGE_NMP_STATUS:
n.Commands <- NewNmpStatusChangeCommand(msg)
}
case *events.ImageFetchMessage:
msg, _ := incoming.(*events.ImageFetchMessage)
switch msg.Event().Id {
case events.IMAGE_FETCHED:
n.Commands <- NewImageFetchedCommand(msg.DeploymentDescription)
}
}
}

Expand Down
98 changes: 98 additions & 0 deletions persistence/service_images.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package persistence

import (
"encoding/json"
"fmt"
"github.com/boltdb/bolt"
"regexp"
"time"
)

// service image table name
const SERVICE_IMAGES = "service_images"

type ServiceImageUsage struct {
ImageId string `json:"image_id"`
TimeLastUsed uint64 `json:"time_last_used"`
}

func NewServiceImageUsage(imageId string) *ServiceImageUsage {
return &ServiceImageUsage{ImageId: imageId, TimeLastUsed: uint64(time.Now().Unix())}
}

func (s ServiceImageUsage) String() string {
return fmt.Sprintf("ImageId: %v, "+
"TimeLastUsed: %v",
s.ImageId, s.TimeLastUsed)
}

func (s ServiceImageUsage) ShortString() string {
return s.String()
}

// save or update the given image info
// image use info is keyed with ImageName:ImageTag
func SaveOrUpdateServiceImage(db *bolt.DB, serviceImage *ServiceImageUsage) error {
writeErr := db.Update(func(tx *bolt.Tx) error {
if bucket, err := tx.CreateBucketIfNotExists([]byte(SERVICE_IMAGES)); err != nil {
return err
} else if serial, err := json.Marshal(serviceImage); err != nil {
return fmt.Errorf("Failed to serialize service image usage: %v", err)
} else {
return bucket.Put([]byte(serviceImage.ImageId), serial)
}
})

return writeErr
}

func DeleteServiceImage(db *bolt.DB, imageId string) error {
return db.Update(func(tx *bolt.Tx) error {
if bucket, err := tx.CreateBucketIfNotExists([]byte(SERVICE_IMAGES)); err != nil {
return err
} else if err := bucket.Delete([]byte(imageId)); err != nil {
return fmt.Errorf("Unable to delete service image usage record for %v: %v.", imageId, err)
}
return nil
})
}

func FindServiceImageUsageWithFilters(db *bolt.DB, filters []IUFilter) ([]ServiceImageUsage, error) {
imgUsages := make([]ServiceImageUsage, 0)

readErr := db.View(func(tx *bolt.Tx) error {
if bucket := tx.Bucket([]byte(SERVICE_IMAGES)); bucket != nil {
bucket.ForEach(func(k, v []byte) error {
imgRec := ServiceImageUsage{}
if err := json.Unmarshal(v, &imgRec); err != nil {
return fmt.Errorf("Unable to deserialize service image usage record %v: %v", k, err)
} else {
exclude := false
for _, filter := range filters {
if !filter(imgRec) {
exclude = true
}
}
if !exclude {
imgUsages = append(imgUsages, imgRec)
}
}
return nil
})
}
return nil
})

return imgUsages, readErr
}

type IUFilter func(ServiceImageUsage) bool

func ImageNameRegexFilter(imageId string) IUFilter {
regEx, err := regexp.Compile(imageId)
if err != nil {
return func(iu ServiceImageUsage) bool { return false }
}

return func(iu ServiceImageUsage) bool { return regEx.Match([]byte(iu.ImageId)) }
}

0 comments on commit 5737c30

Please sign in to comment.