Skip to content

Commit

Permalink
fix: parallelize inventory verification (#64)
Browse files Browse the repository at this point in the history
Signed-off-by: Ramkumar Chinchani <[email protected]>
  • Loading branch information
rchincha authored Feb 22, 2024
1 parent d4616be commit 6fc173d
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 18 deletions.
61 changes: 61 additions & 0 deletions pkg/fs/tpool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package fs

import (
"sync"
)

type ThreadPool interface {
// Add adds a task for the threadpool to consume
Add(taskfn func() error)

// Done finishes the threadpool
Done() error
}

type threadpool struct {
wg sync.WaitGroup
n int
backlog int
tasks chan func() error
err error
}

func NewThreadPool(n, backlog int) *threadpool {
pool := &threadpool{n: n, backlog: backlog, tasks: make(chan func() error, backlog)}

for i := 0; i < n; i++ {
// start the runners
pool.wg.Add(1)
go pool.runner()
}

return pool
}

func (pool *threadpool) Add(f func() error) {
pool.tasks <- f
}

func (pool *threadpool) runner() {
defer pool.wg.Done()

for {
taskfn, ok := <-pool.tasks
if !ok {
// no more tasks
return
}

// ignore failures, save error
if err := taskfn(); err != nil {
pool.err = err
}
}
}

func (pool *threadpool) Done() error {
close(pool.tasks)
pool.wg.Wait()

return pool.err
}
61 changes: 43 additions & 18 deletions pkg/fs/verify.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"fmt"
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"sync/atomic"

"github.com/rs/zerolog/log"
"sigs.k8s.io/bom/pkg/spdx"
Expand Down Expand Up @@ -92,7 +94,11 @@ func Verify(input, inventory, missing string) error {

mdoc := bom.NewDocument("", "")
mdoc.Name = "missing-files-document"
mcount := 0

var mcount atomic.Uint64

backlog := 1024
tpool := NewThreadPool(runtime.NumCPU(), backlog)

for _, entry := range inv.Entries {
mode, err := strconv.ParseInt(entry.Mode, 8, 32)
Expand All @@ -106,28 +112,47 @@ func Verify(input, inventory, missing string) error {
continue
}

if err := checkBOM(input, entry.Path); err != nil {
log.Error().Err(err).Str("path", entry.Path).Msg("inventory verify failed")
tpool.Add(
func(entry Entry) func() error {
taskfn := func() error {
if err := checkBOM(input, entry.Path); err != nil {
log.Error().Err(err).Str("path", entry.Path).Interface("entry", entry).Msg("inventory verify failed")

mcount++
mcount.Add(1)

sfile := spdx.NewFile()
sfile.SetEntity(
&spdx.Entity{
Name: entry.Path,
Checksum: map[string]string{"SHA256": strings.Split(entry.Checksum, ":")[1]},
},
)
sfile := spdx.NewFile()
sfile.SetEntity(
&spdx.Entity{
Name: entry.Path,
Checksum: map[string]string{"SHA256": strings.Split(entry.Checksum, ":")[1]},
},
)

if err := mdoc.AddFile(sfile); err != nil {
log.Error().Err(err).Msg("unable to add file to package")
if err := mdoc.AddFile(sfile); err != nil {
log.Error().Err(err).Msg("unable to add file to package")

return err
}
}
return err
}
}

return nil
}

return taskfn
}(entry),
)
}

// finish with the threadpool
if err := tpool.Done(); err != nil {
log.Error().Err(err).Msg("threadpool failed")

return err
}

if mcount != 0 {
count := mcount.Load()

if count != 0 {
if missing != "" {
if err := bom.WriteDocument(mdoc, missing); err != nil {
log.Error().Err(err).Str("path", missing).Msg("unable to writing missing entries")
Expand All @@ -136,7 +161,7 @@ func Verify(input, inventory, missing string) error {
}
}

return fmt.Errorf("%w: %d entries missing", errors.ErrIncomplete, mcount)
return fmt.Errorf("%w: %d entries missing", errors.ErrIncomplete, count)
}

return nil
Expand Down

0 comments on commit 6fc173d

Please sign in to comment.