Skip to content

Commit

Permalink
⭐️ replace dque with pdque (#951)
Browse files Browse the repository at this point in the history
* replace dque with pdque

* add test

* add pdque into cnspec directly

* clean up

* restore go.mod

* go mod tidy

* fix license

* clean up

* clean up

* better concurrency test

* refactor with require.NoError

* improve tests

* clean up

* go fmt

* test fix?

* fix tests in github action

* test tests

* test tests

* go fmt

* test tests

* clean up

* let New create the directory
  • Loading branch information
mariuskimmina authored Nov 23, 2023
1 parent 746b03e commit ce1a460
Show file tree
Hide file tree
Showing 6 changed files with 800 additions and 22 deletions.
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ require (
github.com/hashicorp/go-multierror v1.1.1
github.com/hashicorp/go-plugin v1.6.0 // indirect
github.com/hashicorp/go-version v1.6.0
github.com/joncrlsn/dque v0.0.0-20211108142734-c2ef48c5192a
github.com/jstemmer/go-junit-report/v2 v2.1.0
github.com/mattn/go-isatty v0.0.20
github.com/mitchellh/mapstructure v1.5.0
Expand Down Expand Up @@ -324,7 +323,7 @@ require (
github.com/sivchari/tenv v1.7.1 // indirect
github.com/sonatard/noctx v0.0.2 // indirect
github.com/sourcegraph/go-diff v0.7.0 // indirect
github.com/spf13/afero v1.10.0 // indirect
github.com/spf13/afero v1.10.0
github.com/spf13/cast v1.5.1 // indirect
github.com/ssgreg/nlreturn/v2 v2.2.1 // indirect
github.com/stbenjam/no-sprintf-host-port v0.1.1 // indirect
Expand Down
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,6 @@ github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJA
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 h1:ZpnhV/YsD2/4cESfV5+Hoeu/iUR3ruzNvZ+yQfO03a0=
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2/go.mod h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gofrs/flock v0.7.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU=
github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw=
github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU=
github.com/gofrs/uuid v4.4.0+incompatible h1:3qXRTX8/NbyulANqlc0lchS1gqAVxRgsuW1YrTJupqA=
Expand Down Expand Up @@ -549,8 +548,6 @@ github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9Y
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/joncrlsn/dque v0.0.0-20211108142734-c2ef48c5192a h1:sfe532Ipn7GX0V6mHdynBk393rDmqgI0QmjLK7ct7TU=
github.com/joncrlsn/dque v0.0.0-20211108142734-c2ef48c5192a/go.mod h1:dNKs71rs2VJGBAmttu7fouEsRQlRjxy0p1Sx+T5wbpY=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
Expand Down Expand Up @@ -590,7 +587,6 @@ github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8=
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
Expand Down
28 changes: 12 additions & 16 deletions policy/scan/disk_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,23 @@ import (
"sync"
"time"

"github.com/joncrlsn/dque"
"github.com/rs/zerolog/log"
"go.mondoo.com/cnspec/v9/policy/scan/pdque"
"google.golang.org/protobuf/proto"
)

type diskQueueConfig struct {
dir string
filename string
segmentSize int
sync bool
dir string
filename string
maxSize int
sync bool
}

var defaultDqueConfig = diskQueueConfig{
dir: "/tmp/cnspec-queue", // TODO: consider configurable path
filename: "disk-queue",
segmentSize: 500,
sync: false,
dir: "/tmp/cnspec-queue", // TODO: consider configurable path
filename: "disk-queue",
maxSize: 500,
sync: false,
}

// queueMsg is the being stored in disk queue
Expand All @@ -40,7 +40,7 @@ type queuePayload struct {
}

type diskQueueClient struct {
queue *dque.DQue
queue *pdque.Queue
once sync.Once
wg sync.WaitGroup
entries chan Job
Expand Down Expand Up @@ -68,15 +68,11 @@ func newDqueClient(config diskQueueConfig, handler func(job *Job)) (*diskQueueCl
return nil, fmt.Errorf("cannot create queue directory: %s", err)
}

q.queue, err = dque.NewOrOpen(config.filename, config.dir, config.segmentSize, diskQueueEntryBuilder)
q.queue, err = pdque.NewOrOpen(config.filename, config.dir, config.maxSize, diskQueueEntryBuilder)
if err != nil {
return nil, err
}

if !config.sync {
_ = q.queue.TurboOn()
}

q.entries = make(chan Job)

q.wg.Add(2)
Expand Down Expand Up @@ -127,7 +123,7 @@ func (c *diskQueueClient) popper() {
entry, err := c.queue.DequeueBlock()
if err != nil {
switch err {
case dque.ErrQueueClosed:
case pdque.ErrQueueClosed:
return
default:
log.Error().Err(err).Msg("could not pop job from disk queue")
Expand Down
71 changes: 71 additions & 0 deletions policy/scan/disk_queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright (c) Mondoo, Inc.
// SPDX-License-Identifier: BUSL-1.1

package scan

import (
"os"
"testing"

"go.mondoo.com/cnquery/v9/providers-sdk/v1/inventory"
)

func TestDiskQueueClient_EnqueueDequeue(t *testing.T) {
tempDir, err := os.MkdirTemp("", "testdir")
if err != nil {
t.Fatalf("Failed to create temp dir: %v", err)
}
defer os.RemoveAll(tempDir) // Clean up

// Update the configuration to use the temporary directory
testConfig := defaultDqueConfig
testConfig.dir = tempDir

completionChannel := make(chan struct{}, 50) // Channel to signal job completion

handler := func(job *Job) {
completionChannel <- struct{}{} // Signal completion
}

client, err := newDqueClient(testConfig, handler)
if err != nil {
t.Fatalf("Failed to create diskQueueClient: %v", err)
}
defer client.Stop()

// Test Enqueue
testJob := &Job{
Inventory: &inventory.Inventory{
Spec: &inventory.InventorySpec{
Assets: []*inventory.Asset{
{
Connections: []*inventory.Config{
{
Type: "k8s",
Options: map[string]string{
"path": "./testdata/2pods.yaml",
},
Discover: &inventory.Discovery{
Targets: []string{"auto"},
},
},
},
ManagedBy: "mondoo-operator-123",
},
},
},
},
}
for i := 0; i < 50; i++ {
client.Channel() <- *testJob
}

for i := 0; i < 50; i++ {
<-completionChannel
}

// Verify that all jobs have been processed
if len(completionChannel) != 0 {
t.Errorf("Expected handler to be called 50 times, but was called %d times", 50-len(completionChannel))
}
}
Loading

0 comments on commit ce1a460

Please sign in to comment.