From edd910f88fe8d7be3e2623ead64b8c8afb4bb078 Mon Sep 17 00:00:00 2001 From: Alex Goodman Date: Tue, 1 Oct 2024 10:18:44 -0400 Subject: [PATCH] [wip] more concurrent catalogers Signed-off-by: Alex Goodman --- cmd/syft/internal/commands/attest.go | 3 + cmd/syft/internal/commands/scan.go | 3 + cmd/syft/internal/options/catalog.go | 3 +- go.mod | 1 + go.sum | 2 + internal/task/executor.go | 61 ++++++++++++-------- syft/file/cataloger/filedigest/cataloger.go | 62 ++++++++++++++++----- syft/pkg/cataloger/generic/cataloger.go | 48 +++++++++++++--- 8 files changed, 138 insertions(+), 45 deletions(-) diff --git a/cmd/syft/internal/commands/attest.go b/cmd/syft/internal/commands/attest.go index fa3b5ceef52..c8ef599ff8d 100644 --- a/cmd/syft/internal/commands/attest.go +++ b/cmd/syft/internal/commands/attest.go @@ -13,6 +13,7 @@ import ( "github.com/wagoodman/go-progress" "github.com/anchore/clio" + "github.com/anchore/go-sync" "github.com/anchore/stereoscope" "github.com/anchore/syft/cmd/syft/internal/options" "github.com/anchore/syft/cmd/syft/internal/ui" @@ -252,6 +253,8 @@ func generateSBOMForAttestation(ctx context.Context, id clio.Identification, opt return nil, fmt.Errorf("attest requires use of an OCI registry directly, one or more of the specified sources is unsupported: %v", opts.From) } + ctx = sync.SetContextExecutor(ctx, sync.NewExecutor(opts.Parallelism)) + src, err := getSource(ctx, opts, userInput, stereoscope.RegistryTag) if err != nil { diff --git a/cmd/syft/internal/commands/scan.go b/cmd/syft/internal/commands/scan.go index 48fcc8c1ebc..ae01c3a8ccd 100644 --- a/cmd/syft/internal/commands/scan.go +++ b/cmd/syft/internal/commands/scan.go @@ -14,6 +14,7 @@ import ( "github.com/anchore/clio" "github.com/anchore/go-collections" + "github.com/anchore/go-sync" "github.com/anchore/stereoscope" "github.com/anchore/stereoscope/pkg/image" "github.com/anchore/syft/cmd/syft/internal/options" @@ -167,6 +168,8 @@ func validateArgs(cmd *cobra.Command, args []string, error string) error { } func runScan(ctx context.Context, id clio.Identification, opts *scanOptions, userInput string) error { + ctx = sync.SetContextExecutor(ctx, sync.NewExecutor(opts.Parallelism)) + writer, err := opts.SBOMWriter() if err != nil { return err diff --git a/cmd/syft/internal/options/catalog.go b/cmd/syft/internal/options/catalog.go index ec75d392f91..ce182d7f0e9 100644 --- a/cmd/syft/internal/options/catalog.go +++ b/cmd/syft/internal/options/catalog.go @@ -2,6 +2,7 @@ package options import ( "fmt" + "runtime" "sort" "strings" @@ -72,7 +73,7 @@ func DefaultCatalog() Catalog { File: defaultFileConfig(), Relationships: defaultRelationshipsConfig(), Source: defaultSourceConfig(), - Parallelism: 1, + Parallelism: runtime.NumCPU() * 3, } } diff --git a/go.mod b/go.mod index 8cd36e216f4..78eb5c1778d 100644 --- a/go.mod +++ b/go.mod @@ -90,6 +90,7 @@ require ( github.com/BurntSushi/toml v1.4.0 github.com/OneOfOne/xxhash v1.2.8 github.com/adrg/xdg v0.5.0 + github.com/anchore/go-sync v0.0.0-20240926143818-0345bfc976f9 github.com/magiconair/properties v1.8.7 golang.org/x/exp v0.0.0-20231108232855-2478ac86f678 ) diff --git a/go.sum b/go.sum index 16e38c9860a..6b035d84555 100644 --- a/go.sum +++ b/go.sum @@ -109,6 +109,8 @@ github.com/anchore/go-macholibre v0.0.0-20220308212642-53e6d0aaf6fb h1:iDMnx6LIj github.com/anchore/go-macholibre v0.0.0-20220308212642-53e6d0aaf6fb/go.mod h1:DmTY2Mfcv38hsHbG78xMiTDdxFtkHpgYNVDPsF2TgHk= github.com/anchore/go-struct-converter v0.0.0-20221118182256-c68fdcfa2092 h1:aM1rlcoLz8y5B2r4tTLMiVTrMtpfY0O8EScKJxaSaEc= github.com/anchore/go-struct-converter v0.0.0-20221118182256-c68fdcfa2092/go.mod h1:rYqSE9HbjzpHTI74vwPvae4ZVYZd1lue2ta6xHPdblA= +github.com/anchore/go-sync v0.0.0-20240926143818-0345bfc976f9 h1:CEONkqYICLuKgiIgHIcY9cxSDvQWMtDnIY01HSVCJhc= +github.com/anchore/go-sync v0.0.0-20240926143818-0345bfc976f9/go.mod h1:43zWHVYBx8GXzjjISSD4rMpACGBpUZmE3Yy+qUNnhn4= github.com/anchore/go-testutils v0.0.0-20200925183923-d5f45b0d3c04 h1:VzprUTpc0vW0nnNKJfJieyH/TZ9UYAnTZs5/gHTdAe8= github.com/anchore/go-testutils v0.0.0-20200925183923-d5f45b0d3c04/go.mod h1:6dK64g27Qi1qGQZ67gFmBFvEHScy0/C8qhQhNe5B5pQ= github.com/anchore/go-version v1.2.2-0.20200701162849-18adb9c92b9b h1:e1bmaoJfZVsCYMrIZBpFxwV26CbsuoEh5muXD5I1Ods= diff --git a/internal/task/executor.go b/internal/task/executor.go index 899796424be..9dabba47535 100644 --- a/internal/task/executor.go +++ b/internal/task/executor.go @@ -4,11 +4,11 @@ import ( "context" "fmt" "runtime/debug" - "sync" "time" "github.com/hashicorp/go-multierror" + "github.com/anchore/go-sync" "github.com/anchore/syft/internal/log" "github.com/anchore/syft/internal/sbomsync" "github.com/anchore/syft/syft/event/monitor" @@ -35,31 +35,48 @@ func NewTaskExecutor(tasks []Task, numWorkers int) *Executor { } func (p *Executor) Execute(ctx context.Context, resolver file.Resolver, s sbomsync.Builder, prog *monitor.CatalogerTaskProgress) error { - var errs error - wg := &sync.WaitGroup{} - for i := 0; i < p.numWorkers; i++ { - wg.Add(1) - go func() { - defer wg.Done() - - for { - tsk, ok := <-p.tasks - if !ok { - return - } - - if err := runTaskSafely(ctx, tsk, resolver, s); err != nil { - errs = multierror.Append(errs, fmt.Errorf("failed to run task: %w", err)) - prog.SetError(err) - } - prog.Increment() + exec := sync.ContextExecutor(ctx) + + collector := sync.NewCollector[error](exec) + + run := func(tsk Task) sync.ProviderFunc[error] { + return func() error { + if err := runTaskSafely(ctx, tsk, resolver, s); err != nil { + prog.SetError(err) + return err } - }() + prog.Increment() + return nil + } + } + + for { + tsk, ok := <-p.tasks + if !ok { + break + } + + collector.Provide(run(tsk)) } - wg.Wait() + errs := collector.Collect() + + if len(errs) == 0 { + return nil + } + + var nonNilErrs []error + for _, err := range errs { + if err != nil { + nonNilErrs = append(nonNilErrs, err) + } + } + + if len(nonNilErrs) == 0 { + return nil + } - return errs + return multierror.Append(nil, nonNilErrs...) } func runTaskSafely(ctx context.Context, t Task, resolver file.Resolver, s sbomsync.Builder) (err error) { diff --git a/syft/file/cataloger/filedigest/cataloger.go b/syft/file/cataloger/filedigest/cataloger.go index e5d8c347e96..15d310e70df 100644 --- a/syft/file/cataloger/filedigest/cataloger.go +++ b/syft/file/cataloger/filedigest/cataloger.go @@ -8,6 +8,7 @@ import ( "github.com/dustin/go-humanize" + "github.com/anchore/go-sync" stereoscopeFile "github.com/anchore/stereoscope/pkg/file" "github.com/anchore/syft/internal" "github.com/anchore/syft/internal/bus" @@ -30,6 +31,12 @@ func NewCataloger(hashes []crypto.Hash) *Cataloger { } } +type result struct { + coordinates file.Coordinates + digests []file.Digest + err error +} + func (i *Cataloger) Catalog(ctx context.Context, resolver file.Resolver, coordinates ...file.Coordinates) (map[file.Coordinates][]file.Digest, error) { results := make(map[file.Coordinates][]file.Digest) var locations []file.Location @@ -46,37 +53,66 @@ func (i *Cataloger) Catalog(ctx context.Context, resolver file.Resolver, coordin } } + exec := sync.ContextExecutor(ctx) + + collector := sync.NewCollector[result](exec) + prog := catalogingProgress(int64(len(locations))) for _, location := range locations { - result, err := i.catalogLocation(resolver, location) + collector.Provide(i.run(resolver, location, prog)) + } - if errors.Is(err, ErrUndigestableFile) { + for _, r := range collector.Collect() { + if r.err != nil { + log.Warnf("failed to process file %q: %+v", r.coordinates.RealPath, r.err) continue } + results[r.coordinates] = append(results[r.coordinates], r.digests...) + } + + log.Debugf("file digests cataloger processed %d files", prog.Current()) + + prog.AtomicStage.Set(fmt.Sprintf("%s files", humanize.Comma(prog.Current()))) + prog.SetCompleted() + + return results, nil +} + +func (i *Cataloger) run(resolver file.Resolver, location file.Location, prog *monitor.CatalogerTaskProgress) sync.ProviderFunc[result] { + return func() result { + digests, err := i.catalogLocation(resolver, location) + + if errors.Is(err, ErrUndigestableFile) { + return result{ + coordinates: location.Coordinates, + } + } + prog.AtomicStage.Set(location.Path()) if internal.IsErrPathPermission(err) { log.Debugf("file digests cataloger skipping %q: %+v", location.RealPath, err) - continue + return result{ + coordinates: location.Coordinates, + } } if err != nil { prog.SetError(err) - return nil, fmt.Errorf("failed to process file %q: %w", location.RealPath, err) + return result{ + coordinates: location.Coordinates, + err: fmt.Errorf("failed to process file %q: %w", location.RealPath, err), + } } prog.Increment() - - results[location.Coordinates] = result + return result{ + coordinates: location.Coordinates, + digests: digests, + err: err, + } } - - log.Debugf("file digests cataloger processed %d files", prog.Current()) - - prog.AtomicStage.Set(fmt.Sprintf("%s files", humanize.Comma(prog.Current()))) - prog.SetCompleted() - - return results, nil } func (i *Cataloger) catalogLocation(resolver file.Resolver, location file.Location) ([]file.Digest, error) { diff --git a/syft/pkg/cataloger/generic/cataloger.go b/syft/pkg/cataloger/generic/cataloger.go index 49e1b47b936..bf084a51c15 100644 --- a/syft/pkg/cataloger/generic/cataloger.go +++ b/syft/pkg/cataloger/generic/cataloger.go @@ -4,6 +4,7 @@ import ( "context" "github.com/anchore/go-logger" + "github.com/anchore/go-sync" "github.com/anchore/syft/internal" "github.com/anchore/syft/internal/log" "github.com/anchore/syft/syft/artifact" @@ -147,36 +148,65 @@ func (c *Cataloger) Name() string { return c.upstreamCataloger } +type parserResult struct { + packages []pkg.Package + relationships []artifact.Relationship + err error +} + // Catalog is given an object to resolve file references and content, this function returns any discovered Packages after analyzing the catalog source. func (c *Cataloger) Catalog(ctx context.Context, resolver file.Resolver) ([]pkg.Package, []artifact.Relationship, error) { var packages []pkg.Package var relationships []artifact.Relationship - logger := log.Nested("cataloger", c.upstreamCataloger) - env := Environment{ // TODO: consider passing into the cataloger, this would affect the cataloger interface (and all implementations). This can be deferred until later. LinuxRelease: linux.IdentifyRelease(resolver), } + exec := sync.ContextExecutor(ctx) + + collector := sync.NewCollector[parserResult](exec) + for _, req := range c.selectFiles(resolver) { + collector.Provide(c.run(ctx, resolver, req, env)) + } + + results := collector.Collect() + + for _, result := range results { + packages = append(packages, result.packages...) + relationships = append(relationships, result.relationships...) + } + + return c.process(ctx, resolver, packages, relationships, nil) +} + +func (c *Cataloger) run(ctx context.Context, resolver file.Resolver, req request, env Environment) sync.ProviderFunc[parserResult] { + return func() parserResult { + lgr := log.Nested("cataloger", c.upstreamCataloger) + location, parser := req.Location, req.Parser log.WithFields("path", location.RealPath).Trace("parsing file contents") - discoveredPackages, discoveredRelationships, err := invokeParser(ctx, resolver, location, logger, parser, &env) + discoveredPackages, discoveredRelationships, err := invokeParser(ctx, resolver, location, lgr, parser, &env) if err != nil { - continue // logging is handled within invokeParser + // note: logging is handled within invokeParser + return parserResult{ + err: err, + } } - for _, p := range discoveredPackages { - p.FoundBy = c.upstreamCataloger - packages = append(packages, p) + for i := range discoveredPackages { + discoveredPackages[i].FoundBy = c.upstreamCataloger } - relationships = append(relationships, discoveredRelationships...) + return parserResult{ + packages: discoveredPackages, + relationships: discoveredRelationships, + } } - return c.process(ctx, resolver, packages, relationships, nil) } func (c *Cataloger) process(ctx context.Context, resolver file.Resolver, pkgs []pkg.Package, rels []artifact.Relationship, err error) ([]pkg.Package, []artifact.Relationship, error) {