Skip to content

Commit

Permalink
Merge pull request #260 from newrelic/cciutea/fix_storer_file_collision
Browse files Browse the repository at this point in the history
feat: add unique path for storage file
  • Loading branch information
cristianciutea authored Apr 6, 2021
2 parents 30ec2d6 + 2e12863 commit 3f45fc5
Show file tree
Hide file tree
Showing 10 changed files with 298 additions and 10 deletions.
7 changes: 7 additions & 0 deletions data/metric/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ func AddCustomAttributes(metricSet *Set, customAttributes []attribute.Attribute)
}
}

// AddNamespaceAttributes add attributes to MetricSet namespace.
func (ms *Set) AddNamespaceAttributes(attributes ...attribute.Attribute) {
for _, attr := range attributes {
ms.nsAttributes = append(ms.nsAttributes, attr)
}
}

// SetMetric adds a metric to the Set object or updates the metric value if the metric already exists.
// It calculates elapsed difference for RATE and DELTA types.
func (ms *Set) SetMetric(name string, value interface{}, sourceType SourceType) (err error) {
Expand Down
17 changes: 17 additions & 0 deletions data/metric/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,23 @@ func TestSet_SetMetricCachesRateAndDeltas(t *testing.T) {
}
}

func TestSet_SetMetricCache_NameSpaceSpecialChars(t *testing.T) {
storer := persist.NewInMemoryStore()

ms := NewSet("some-event-type", storer, attribute.Attr("::==::::==", "::==::::"))
err := ms.SetMetric("test", 3, DELTA)
assert.NoError(t, err)

nameSpace := ms.namespace("test")

assert.Equal(t, "::==::::====::==::::::test", nameSpace)
var v interface{}
_, err = storer.Get(nameSpace, &v)
assert.NoError(t, err)

assert.Equal(t, 3.0, v)
}

func TestSet_SetMetricsRatesAndDeltas(t *testing.T) {
var testCases = []struct {
sourceType SourceType
Expand Down
7 changes: 6 additions & 1 deletion integration/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,14 @@ func (e *Entity) SameAs(b *Entity) bool {

// NewMetricSet returns a new instance of Set with its sample attached to the integration.
func (e *Entity) NewMetricSet(eventType string, nameSpacingAttributes ...attribute.Attribute) *metric.Set {

s := metric.NewSet(eventType, e.storer, nameSpacingAttributes...)

if e.Metadata != nil {
if key, err := e.Metadata.Key(); err == nil {
s.AddNamespaceAttributes(attribute.Attr("entityKey", key.String()))
}
}

if len(e.customAttributes) > 0 {
metric.AddCustomAttributes(s, e.customAttributes)
}
Expand Down
3 changes: 2 additions & 1 deletion integration/entity_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ func (a IDAttributes) Less(i, j int) bool {

func (a *IDAttributes) removeEmptyAndDuplicates() {

var uniques IDAttributes
uniques := make(IDAttributes, 0)

var prev IDAttribute
for i, attr := range *a {
if prev.Key != attr.Key && attr.Key != "" {
Expand Down
19 changes: 17 additions & 2 deletions integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package integration

import (
"bytes"
"crypto/md5"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -99,8 +100,14 @@ func New(name, version string, opts ...Option) (i *Integration, err error) {
}

if i.storer == nil {
var err error
i.storer, err = persist.NewFileStore(persist.DefaultPath(i.Name), i.logger, persist.DefaultTTL)
storePath, err := persist.NewStorePath(i.Name, i.CreateUniqueID(), i.logger, persist.DefaultTTL)
if err != nil {
return nil, fmt.Errorf("can't create temporary directory for store: %s", err)
}

storePath.CleanOldFiles()

i.storer, err = persist.NewFileStore(storePath.GetFilePath(), i.logger, persist.DefaultTTL)
if err != nil {
return nil, fmt.Errorf("can't create store: %s", err)
}
Expand Down Expand Up @@ -234,6 +241,14 @@ func (i *Integration) Logger() log.Logger {
return i.logger
}

// CreateUniqueID will generate an md5 string from integration arguments
// to unique identify the integration instance.
func (i *Integration) CreateUniqueID() string {
h := md5.New()
h.Write([]byte(fmt.Sprintf("%v", i.args)))
return fmt.Sprintf("%x", h.Sum(nil))
}

// toJSON serializes integration as JSON. If the pretty attribute is
// set to true, the JSON will be indented for easy reading.
func (i *Integration) toJSON(pretty bool) (output []byte, err error) {
Expand Down
32 changes: 32 additions & 0 deletions integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,38 @@ func TestIntegration_CreateLocalAndRemoteEntities(t *testing.T) {
assert.NotEqual(t, remote, nil)
}

func TestIntegration_CreateUniqueID_Default(t *testing.T) {
type argumentList struct {
sdk_args.DefaultArgumentList
StatusURL string `default:"http://127.0.0.1/status" help:"NGINX status URL. If you are using ngx_http_api_module be sure to include the full path ending with the API version number"`
}
flag.CommandLine = flag.NewFlagSet("cmd", flag.ContinueOnError)

var al argumentList

i, err := New("testIntegration", "0.0.0", Args(&al))
assert.NoError(t, err)

assert.Equal(t, i.CreateUniqueID(), "3071eb6863e28435e6c7e0c2bbe55ecd")
}

func TestIntegration_CreateUniqueID_EnvironmentVar(t *testing.T) {
type argumentList struct {
sdk_args.DefaultArgumentList
StatusURL string `default:"http://127.0.0.1/status" help:"NGINX status URL. If you are using ngx_http_api_module be sure to include the full path ending with the API version number"`
}
var al argumentList
flag.CommandLine = flag.NewFlagSet("cmd", flag.ContinueOnError)

os.Setenv("STATUS_URL", "bar")
defer os.Clearenv()

i, err := New("testIntegration", "0.0.0", Args(&al))
assert.NoError(t, err)

assert.Equal(t, i.CreateUniqueID(), "2d998100982b7de9b4e446c85c3bed78")
}

type testWriter struct {
testFunc func([]byte)
}
Expand Down
5 changes: 4 additions & 1 deletion integration/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,10 @@ func TestItStoresOnDiskByDefault(t *testing.T) {
assert.NoError(t, i.Publish())

// assert data has been flushed to disk
c, err := persist.NewFileStore(persist.DefaultPath(integrationName), log.NewStdErr(true), persist.DefaultTTL)
storePath, err := persist.NewStorePath(i.Name, i.CreateUniqueID(), i.logger, persist.DefaultTTL)
assert.NoError(t, err)

c, err := persist.NewFileStore(storePath.GetFilePath(), log.NewStdErr(true), persist.DefaultTTL)
assert.NoError(t, err)

var v float64
Expand Down
104 changes: 104 additions & 0 deletions persist/store_path.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package persist

import (
"fmt"
"github.com/newrelic/infra-integrations-sdk/log"
"os"
"path/filepath"
"time"
)

const (
storeFileTemplate = "%s-%s.json"
)

// StorePath will handle the location for the persistence.
type StorePath interface {
GetFilePath() string
CleanOldFiles()
}

// storePath will handle the location for the persistence.
type storePath struct {
dir string
integrationName string
integrationID string
ilog log.Logger
ttl time.Duration
}

// NewStorePath create a new instance of StorePath
func NewStorePath(integrationName, integrationID string, ilog log.Logger, ttl time.Duration) (StorePath, error) {
if integrationName == "" {
return nil, fmt.Errorf("integration name not specified")
}

if integrationID == "" {
return nil, fmt.Errorf("integration id not specified")
}

if ttl == 0 {
ttl = DefaultTTL
}

return &storePath{
dir: tmpIntegrationDir(),
integrationName: integrationName,
integrationID: integrationID,
ilog: ilog,
ttl: ttl,
}, nil
}

// GetFilePath will return the file for storing integration state.
func (t *storePath) GetFilePath() string {
return filepath.Join(t.dir, fmt.Sprintf(storeFileTemplate, t.integrationName, t.integrationID))
}

// CleanOldFiles will remove all old files created by this integration.
func (t *storePath) CleanOldFiles() {
files, err := t.findOldFiles()
if err != nil {
t.ilog.Debugf("failed to cleanup old files: %v", err)
return
}

for _, file := range files {
t.ilog.Debugf("removing store file (%s)", file)
err := os.Remove(file)
if err != nil {
t.ilog.Debugf("failed to remove store file (%s): %v", file, err)
continue
}
}
}

// glob returns the pattern for finding all files for the same integration name.
func (t *storePath) glob() string {
return filepath.Join(t.dir, fmt.Sprintf(storeFileTemplate, t.integrationName, "*"))
}

func (t *storePath) findOldFiles() ([]string, error) {
var result []string
// List all files by pattern: /tmp/nr-integrations/com.newrelic.nginx-*.json
files, err := filepath.Glob(t.glob())
if err != nil {
return nil, err
}
for _, file := range files {
if file == t.GetFilePath() {
continue
}

fileStat, err := os.Stat(file)
if err != nil {
continue
}

if now().Sub(fileStat.ModTime()) > t.ttl {
t.ilog.Debugf("store file (%s) is older than %v", fileStat.Name(), t.ttl)
result = append(result, file)
}
}
return result, nil
}
98 changes: 98 additions & 0 deletions persist/store_path_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package persist

import (
"github.com/newrelic/infra-integrations-sdk/log"
"github.com/stretchr/testify/assert"
"os"
"path/filepath"
"testing"
"time"
)

var tmpDir string

func setupTestCase(t *testing.T) func(t *testing.T) {
t.Log("setup test case")

assert.NoError(t, os.RemoveAll(filepath.Join(os.TempDir(), integrationsDir)))
tmpDir = tmpIntegrationDir()

files := []struct {
name string
lastMod time.Duration
}{
{
name: "com.newrelic.fake-a.json",
lastMod: 1 * time.Second,
},
{
name: "com.newrelic.fake-b.json",
lastMod: 80 * time.Second,
},
{
name: "com.newrelic.fake-c.json",
lastMod: 80 * time.Second,
},
{
name: "com.newrelic.flex-b.json",
lastMod: 80 * time.Second,
},
}

for _, file := range files {
f, err := os.Create(filepath.Join(tmpDir, file.name))
assert.NoError(t, err)

lastChanged := time.Now().Local().Add(-file.lastMod)
err = os.Chtimes(f.Name(), lastChanged, lastChanged)
assert.NoError(t, err)
}

return func(t *testing.T) {
t.Log("teardown test case")
assert.NoError(t, os.RemoveAll(tmpDir))
}
}

func TestStorePath_CleanOldFiles(t *testing.T) {

// GIVEN a tmp directory with multiple files
tearDownFn := setupTestCase(t)
defer tearDownFn(t)

// WHEN new store file is generated
newPath, err := NewStorePath("com.newrelic.fake", "c", log.Discard, 1*time.Minute)
assert.NoError(t, err)

// THEN only old files with different integration ID are removed
newPath.CleanOldFiles()

files, err := filepath.Glob(filepath.Join(tmpDir, "*"))
assert.NoError(t, err)

expected := []string{
filepath.Join(tmpDir, "com.newrelic.fake-a.json"),
filepath.Join(tmpDir, "com.newrelic.fake-c.json"),
filepath.Join(tmpDir, "com.newrelic.flex-b.json"),
}
assert.Equal(t, expected, files)
}

func TestStorePath_GetFilePath(t *testing.T) {
storeFile, err := NewStorePath("com.newrelic.fake", "c", log.Discard, 1*time.Minute)
assert.NoError(t, err)

expected := filepath.Join(tmpIntegrationDir(), "com.newrelic.fake-c.json")
assert.Equal(t, expected, storeFile.GetFilePath())
}

func TestStorePath_glob(t *testing.T) {
storeFile, err := NewStorePath("com.newrelic.fake", "c", log.Discard, 1*time.Minute)
assert.NoError(t, err)

tmp, ok := storeFile.(*storePath)
assert.True(t, ok)

expected := filepath.Join(tmpIntegrationDir(), "com.newrelic.fake-*.json")
assert.Equal(t, expected, tmp.glob())
}
16 changes: 11 additions & 5 deletions persist/storer.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,16 +75,22 @@ func SetNow(newNow func() time.Time) {
now = newNow
}

// DefaultPath returns a default folder/filename path to a Storer for an integration from the given name. The name of
// DefaultPath returns a default folder/filename dir to a Storer for an integration from the given name. The name of
// the file will be the name of the integration with the .json extension.
func DefaultPath(integrationName string) string {
dir := tmpIntegrationDir()
file := filepath.Join(dir, integrationName+".json")

return file
}

func tmpIntegrationDir() string {
dir := filepath.Join(os.TempDir(), integrationsDir)
baseDir := path.Join(dir, integrationName+".json")
// Create integrations Storer directory
if os.MkdirAll(dir, dirFilePerm) != nil {
baseDir = os.TempDir()
dir = os.TempDir()
}
return baseDir
return dir
}

// NewInMemoryStore will create and initialize an in-memory Storer (not persistent).
Expand All @@ -96,7 +102,7 @@ func NewInMemoryStore() Storer {
}
}

// NewFileStore returns a disk-backed Storer using the provided file path
// NewFileStore returns a disk-backed Storer using the provided file dir
func NewFileStore(storagePath string, ilog log.Logger, ttl time.Duration) (Storer, error) {
ms := NewInMemoryStore().(*inMemoryStore)

Expand Down

0 comments on commit 3f45fc5

Please sign in to comment.