Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow for concurrent cataloger parser calls #3266

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/syft/internal/commands/attest.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/wagoodman/go-progress"

"github.com/anchore/clio"
"github.com/anchore/go-sync"
"github.com/anchore/stereoscope"
"github.com/anchore/syft/cmd/syft/internal/options"
"github.com/anchore/syft/cmd/syft/internal/ui"
Expand Down Expand Up @@ -252,6 +253,8 @@ func generateSBOMForAttestation(ctx context.Context, id clio.Identification, opt
return nil, fmt.Errorf("attest requires use of an OCI registry directly, one or more of the specified sources is unsupported: %v", opts.From)
}

ctx = sync.SetContextExecutor(ctx, sync.NewExecutor(opts.Parallelism))

src, err := getSource(ctx, opts, userInput, stereoscope.RegistryTag)

if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions cmd/syft/internal/commands/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/anchore/clio"
"github.com/anchore/go-collections"
"github.com/anchore/go-sync"
"github.com/anchore/stereoscope"
"github.com/anchore/stereoscope/pkg/image"
"github.com/anchore/syft/cmd/syft/internal/options"
Expand Down Expand Up @@ -167,6 +168,8 @@ func validateArgs(cmd *cobra.Command, args []string, error string) error {
}

func runScan(ctx context.Context, id clio.Identification, opts *scanOptions, userInput string) error {
ctx = sync.SetContextExecutor(ctx, sync.NewExecutor(opts.Parallelism))

writer, err := opts.SBOMWriter()
if err != nil {
return err
Expand Down
3 changes: 2 additions & 1 deletion cmd/syft/internal/options/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package options

import (
"fmt"
"runtime"
"sort"
"strings"

Expand Down Expand Up @@ -72,7 +73,7 @@ func DefaultCatalog() Catalog {
File: defaultFileConfig(),
Relationships: defaultRelationshipsConfig(),
Source: defaultSourceConfig(),
Parallelism: 1,
Parallelism: runtime.NumCPU() * 3,
}
}

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ require (
github.com/BurntSushi/toml v1.4.0
github.com/OneOfOne/xxhash v1.2.8
github.com/adrg/xdg v0.5.0
github.com/anchore/go-sync v0.0.0-20240926143818-0345bfc976f9
github.com/magiconair/properties v1.8.7
golang.org/x/exp v0.0.0-20231108232855-2478ac86f678
)
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ github.com/anchore/go-macholibre v0.0.0-20220308212642-53e6d0aaf6fb h1:iDMnx6LIj
github.com/anchore/go-macholibre v0.0.0-20220308212642-53e6d0aaf6fb/go.mod h1:DmTY2Mfcv38hsHbG78xMiTDdxFtkHpgYNVDPsF2TgHk=
github.com/anchore/go-struct-converter v0.0.0-20221118182256-c68fdcfa2092 h1:aM1rlcoLz8y5B2r4tTLMiVTrMtpfY0O8EScKJxaSaEc=
github.com/anchore/go-struct-converter v0.0.0-20221118182256-c68fdcfa2092/go.mod h1:rYqSE9HbjzpHTI74vwPvae4ZVYZd1lue2ta6xHPdblA=
github.com/anchore/go-sync v0.0.0-20240926143818-0345bfc976f9 h1:CEONkqYICLuKgiIgHIcY9cxSDvQWMtDnIY01HSVCJhc=
github.com/anchore/go-sync v0.0.0-20240926143818-0345bfc976f9/go.mod h1:43zWHVYBx8GXzjjISSD4rMpACGBpUZmE3Yy+qUNnhn4=
github.com/anchore/go-testutils v0.0.0-20200925183923-d5f45b0d3c04 h1:VzprUTpc0vW0nnNKJfJieyH/TZ9UYAnTZs5/gHTdAe8=
github.com/anchore/go-testutils v0.0.0-20200925183923-d5f45b0d3c04/go.mod h1:6dK64g27Qi1qGQZ67gFmBFvEHScy0/C8qhQhNe5B5pQ=
github.com/anchore/go-version v1.2.2-0.20200701162849-18adb9c92b9b h1:e1bmaoJfZVsCYMrIZBpFxwV26CbsuoEh5muXD5I1Ods=
Expand Down
61 changes: 39 additions & 22 deletions internal/task/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import (
"context"
"fmt"
"runtime/debug"
"sync"
"time"

"github.com/hashicorp/go-multierror"

"github.com/anchore/go-sync"
"github.com/anchore/syft/internal/log"
"github.com/anchore/syft/internal/sbomsync"
"github.com/anchore/syft/syft/event/monitor"
Expand All @@ -35,31 +35,48 @@ func NewTaskExecutor(tasks []Task, numWorkers int) *Executor {
}

// Execute receives tasks from p.tasks until the channel is closed, runs each one
// through runTaskSafely, and reports failures to prog via SetError.
//
// NOTE(review): this span is a rendered diff without +/- markers, so two
// implementations are interleaved and the text does not compile as shown. It
// appears the sync.WaitGroup worker loop is the removed side and the
// sync.ContextExecutor / sync.NewCollector flow is the added side — confirm
// against the real file before editing.
func (p *Executor) Execute(ctx context.Context, resolver file.Resolver, s sbomsync.Builder, prog *monitor.CatalogerTaskProgress) error {
var errs error
wg := &sync.WaitGroup{}
for i := 0; i < p.numWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()

for {
tsk, ok := <-p.tasks
if !ok {
return
}

if err := runTaskSafely(ctx, tsk, resolver, s); err != nil {
errs = multierror.Append(errs, fmt.Errorf("failed to run task: %w", err))
prog.SetError(err)
}
// worker-loop variant: progress is incremented for every task, failed or not
prog.Increment()
// collector variant: the executor is taken from ctx rather than from p.numWorkers
exec := sync.ContextExecutor(ctx)

collector := sync.NewCollector[error](exec)

// run wraps a single task as a provider whose result is the task's error (nil on success)
run := func(tsk Task) sync.ProviderFunc[error] {
return func() error {
if err := runTaskSafely(ctx, tsk, resolver, s); err != nil {
prog.SetError(err)
return err
}
}()
// NOTE(review): in the collector variant a failed task returns before reaching
// Increment, unlike the worker-loop variant above — confirm this is intended.
prog.Increment()
return nil
}
}

// hand every task received from the channel to the collector
for {
tsk, ok := <-p.tasks
if !ok {
break
}

collector.Provide(run(tsk))
}

wg.Wait()
errs := collector.Collect()

if len(errs) == 0 {
return nil
}

// successful tasks contribute nil entries; keep only the real failures
var nonNilErrs []error
for _, err := range errs {
if err != nil {
nonNilErrs = append(nonNilErrs, err)
}
}

if len(nonNilErrs) == 0 {
return nil
}

return errs
return multierror.Append(nil, nonNilErrs...)
}

func runTaskSafely(ctx context.Context, t Task, resolver file.Resolver, s sbomsync.Builder) (err error) {
Expand Down
62 changes: 49 additions & 13 deletions syft/file/cataloger/filedigest/cataloger.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/dustin/go-humanize"

"github.com/anchore/go-sync"
stereoscopeFile "github.com/anchore/stereoscope/pkg/file"
"github.com/anchore/syft/internal"
"github.com/anchore/syft/internal/bus"
Expand All @@ -30,6 +31,12 @@ func NewCataloger(hashes []crypto.Hash) *Cataloger {
}
}

// result is the value produced for a single location by the provider that
// Cataloger.run builds; Catalog reads these back from the collector.
type result struct {
// coordinates is the key under which Catalog stores the digests
coordinates file.Coordinates
// digests are appended to the Catalog results entry for coordinates
digests []file.Digest
// err, when non-nil, makes Catalog log a warning and skip this result
err error
}

func (i *Cataloger) Catalog(ctx context.Context, resolver file.Resolver, coordinates ...file.Coordinates) (map[file.Coordinates][]file.Digest, error) {
results := make(map[file.Coordinates][]file.Digest)
var locations []file.Location
Expand All @@ -46,37 +53,66 @@ func (i *Cataloger) Catalog(ctx context.Context, resolver file.Resolver, coordin
}
}

exec := sync.ContextExecutor(ctx)

collector := sync.NewCollector[result](exec)

prog := catalogingProgress(int64(len(locations)))
for _, location := range locations {
result, err := i.catalogLocation(resolver, location)
collector.Provide(i.run(resolver, location, prog))
}

if errors.Is(err, ErrUndigestableFile) {
for _, r := range collector.Collect() {
if r.err != nil {
log.Warnf("failed to process file %q: %+v", r.coordinates.RealPath, r.err)
continue
}

results[r.coordinates] = append(results[r.coordinates], r.digests...)
}

log.Debugf("file digests cataloger processed %d files", prog.Current())

prog.AtomicStage.Set(fmt.Sprintf("%s files", humanize.Comma(prog.Current())))
prog.SetCompleted()

return results, nil
}

// run returns a provider that calls catalogLocation for a single location and
// packages the outcome as a result keyed by location.Coordinates.
//
// NOTE(review): this span is a rendered diff without +/- markers. The bare
// `continue`, the `return nil, fmt.Errorf(...)`, the `results[...] = result`
// assignment and the trailing Debugf/SetCompleted/return lines appear to be
// removed lines from the previous inline loop in Catalog — confirm against the
// real file before editing.
func (i *Cataloger) run(resolver file.Resolver, location file.Location, prog *monitor.CatalogerTaskProgress) sync.ProviderFunc[result] {
return func() result {
digests, err := i.catalogLocation(resolver, location)

// undigestable files yield a result carrying only coordinates (no digests, no error)
if errors.Is(err, ErrUndigestableFile) {
return result{
coordinates: location.Coordinates,
}
}

prog.AtomicStage.Set(location.Path())

// permission errors are logged at debug level and not reported through result.err
if internal.IsErrPathPermission(err) {
log.Debugf("file digests cataloger skipping %q: %+v", location.RealPath, err)
continue
return result{
coordinates: location.Coordinates,
}
}

// any other error is recorded on the progress monitor and carried in result.err
if err != nil {
prog.SetError(err)
return nil, fmt.Errorf("failed to process file %q: %w", location.RealPath, err)
return result{
coordinates: location.Coordinates,
err: fmt.Errorf("failed to process file %q: %w", location.RealPath, err),
}
}

prog.Increment()

results[location.Coordinates] = result
// err is nil here: every non-nil err has already returned in the branch above
return result{
coordinates: location.Coordinates,
digests: digests,
err: err,
}
}

log.Debugf("file digests cataloger processed %d files", prog.Current())

prog.AtomicStage.Set(fmt.Sprintf("%s files", humanize.Comma(prog.Current())))
prog.SetCompleted()

return results, nil
}

func (i *Cataloger) catalogLocation(resolver file.Resolver, location file.Location) ([]file.Digest, error) {
Expand Down
48 changes: 39 additions & 9 deletions syft/pkg/cataloger/generic/cataloger.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/anchore/go-logger"
"github.com/anchore/go-sync"
"github.com/anchore/syft/internal"
"github.com/anchore/syft/internal/log"
"github.com/anchore/syft/syft/artifact"
Expand Down Expand Up @@ -147,36 +148,65 @@ func (c *Cataloger) Name() string {
return c.upstreamCataloger
}

// parserResult is the value produced for a single parse request by the provider
// that Cataloger.run builds; Catalog merges these after collection.
type parserResult struct {
// packages are appended to the packages gathered by Catalog
packages []pkg.Package
// relationships are appended to the relationships gathered by Catalog
relationships []artifact.Relationship
// err is set when invokeParser fails; the visible Catalog code does not read it
err error
}

// Catalog is given an object to resolve file references and content, this function returns any discovered Packages after analyzing the catalog source.
// One provider is registered per request returned by c.selectFiles; the collected
// packages and relationships are merged and handed to c.process.
//
// NOTE(review): this span is a rendered diff without +/- markers; the
// `logger := log.Nested(...)` line appears to be a removed line (nothing in the
// visible added code reads it) — confirm against the real file before editing.
func (c *Cataloger) Catalog(ctx context.Context, resolver file.Resolver) ([]pkg.Package, []artifact.Relationship, error) {
var packages []pkg.Package
var relationships []artifact.Relationship

logger := log.Nested("cataloger", c.upstreamCataloger)

env := Environment{
// TODO: consider passing into the cataloger, this would affect the cataloger interface (and all implementations). This can be deferred until later.
LinuxRelease: linux.IdentifyRelease(resolver),
}

// the executor is taken from ctx
exec := sync.ContextExecutor(ctx)

collector := sync.NewCollector[parserResult](exec)

for _, req := range c.selectFiles(resolver) {
collector.Provide(c.run(ctx, resolver, req, env))
}

results := collector.Collect()

// only packages and relationships are merged; result.err is not inspected here
for _, result := range results {
packages = append(packages, result.packages...)
relationships = append(relationships, result.relationships...)
}

return c.process(ctx, resolver, packages, relationships, nil)
}

// run returns a provider that invokes the request's parser on its location and
// returns the discovered packages (with FoundBy set to c.upstreamCataloger) and
// relationships as a parserResult.
//
// NOTE(review): this span is a rendered diff without +/- markers. The
// invokeParser call that passes `logger`, the bare `continue`, the
// `for _, p := range` loop, the `relationships = append(...)` line and the
// trailing `return c.process(...)` appear to be removed lines from the previous
// inline loop in Catalog — confirm against the real file before editing.
func (c *Cataloger) run(ctx context.Context, resolver file.Resolver, req request, env Environment) sync.ProviderFunc[parserResult] {
return func() parserResult {
// the nested logger is created inside the provider, once per invocation
lgr := log.Nested("cataloger", c.upstreamCataloger)

location, parser := req.Location, req.Parser

// this trace line goes through the package-level log, not lgr
log.WithFields("path", location.RealPath).Trace("parsing file contents")

discoveredPackages, discoveredRelationships, err := invokeParser(ctx, resolver, location, logger, parser, &env)
discoveredPackages, discoveredRelationships, err := invokeParser(ctx, resolver, location, lgr, parser, &env)
if err != nil {
continue // logging is handled within invokeParser
// note: logging is handled within invokeParser
return parserResult{
err: err,
}
}

for _, p := range discoveredPackages {
p.FoundBy = c.upstreamCataloger
packages = append(packages, p)
// index-based loop: FoundBy is written into the slice elements themselves
for i := range discoveredPackages {
discoveredPackages[i].FoundBy = c.upstreamCataloger
}

relationships = append(relationships, discoveredRelationships...)
return parserResult{
packages: discoveredPackages,
relationships: discoveredRelationships,
}
}
return c.process(ctx, resolver, packages, relationships, nil)
}

func (c *Cataloger) process(ctx context.Context, resolver file.Resolver, pkgs []pkg.Package, rels []artifact.Relationship, err error) ([]pkg.Package, []artifact.Relationship, error) {
Expand Down
Loading