Skip to content

Commit

Permalink
Merge pull request #1644 from weichou1229/on-change-threshold
Browse files Browse the repository at this point in the history
feat: Implement AutoEvent onChangeThreshold
  • Loading branch information
cloudxxx8 authored Nov 1, 2024
2 parents a0938cd + 44a1270 commit a2adfdc
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 22 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ go 1.23
require (
github.com/OneOfOne/xxhash v1.2.8
github.com/edgexfoundry/go-mod-bootstrap/v4 v4.0.0-dev.2
github.com/edgexfoundry/go-mod-core-contracts/v4 v4.0.0-dev.2
github.com/edgexfoundry/go-mod-core-contracts/v4 v4.0.0-dev.3
github.com/edgexfoundry/go-mod-messaging/v4 v4.0.0-dev.3
github.com/google/uuid v1.6.0
github.com/hashicorp/go-multierror v1.1.1
github.com/labstack/echo/v4 v4.12.0
github.com/panjf2000/ants/v2 v2.10.0
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
github.com/spf13/cast v1.7.0
github.com/stretchr/testify v1.9.0
golang.org/x/net v0.30.0
gopkg.in/yaml.v3 v3.0.1
Expand Down Expand Up @@ -98,7 +99,6 @@ require (
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/speps/go-hashids v2.0.0+incompatible // indirect
github.com/spf13/cast v1.7.0 // indirect
github.com/spiffe/go-spiffe/v2 v2.4.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ github.com/edgexfoundry/go-mod-bootstrap/v4 v4.0.0-dev.2 h1:T5iCk8PqEdrzgnz6G9xt
github.com/edgexfoundry/go-mod-bootstrap/v4 v4.0.0-dev.2/go.mod h1:54WyXiygNbIfITqLVGXU8nOatZh7pMAyO3NQXOuPr5s=
github.com/edgexfoundry/go-mod-configuration/v4 v4.0.0-dev.3 h1:3SdjghkEqos8AySKmz+ehjmI1HP/EmnRaFwNTf0rbyc=
github.com/edgexfoundry/go-mod-configuration/v4 v4.0.0-dev.3/go.mod h1:s/pjxzTfqbsH1s4KyvefhOYmVNc9RvK6sI4x4SGI8Tk=
github.com/edgexfoundry/go-mod-core-contracts/v4 v4.0.0-dev.2 h1:BEJKSvyW+dMTW/yzEKWjs0tGUZnMkFPYX4eypyoG0IY=
github.com/edgexfoundry/go-mod-core-contracts/v4 v4.0.0-dev.2/go.mod h1:I3EG+Tg/gcVSUJ+IJDuvVKFISnRu8oQtMXqltE1rzT8=
github.com/edgexfoundry/go-mod-core-contracts/v4 v4.0.0-dev.3 h1:BYdXlS/dLNegB+kT+qKbDgsXv/NhSrigMpomLNl9N5Q=
github.com/edgexfoundry/go-mod-core-contracts/v4 v4.0.0-dev.3/go.mod h1:I3EG+Tg/gcVSUJ+IJDuvVKFISnRu8oQtMXqltE1rzT8=
github.com/edgexfoundry/go-mod-messaging/v4 v4.0.0-dev.3 h1:FRpec371q4CnRBol0E4utB0BHZLVu146JtCAhau9ujQ=
github.com/edgexfoundry/go-mod-messaging/v4 v4.0.0-dev.3/go.mod h1:eAmCHilZWXL0skB9Frnm2kZTeY81sF6xKOmePoWKTNE=
github.com/edgexfoundry/go-mod-registry/v4 v4.0.0-dev.2 h1:iHu8JPpmrEOrIZdv0iYW69FlMmkyal/FpbXtC3pHt2c=
Expand Down
50 changes: 32 additions & 18 deletions internal/autoevent/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package autoevent
import (
"context"
"fmt"
"math"
"sync"
"time"

Expand All @@ -24,17 +25,20 @@ import (

"github.com/edgexfoundry/device-sdk-go/v4/internal/application"
sdkCommon "github.com/edgexfoundry/device-sdk-go/v4/internal/common"

"github.com/spf13/cast"
)

type Executor struct {
deviceName string
sourceName string
onChange bool
lastReadings map[string]interface{}
duration time.Duration
stop bool
mutex *sync.Mutex
pool *ants.Pool
deviceName string
sourceName string
onChange bool
onChangeThreshold float64
lastReadings map[string]interface{}
duration time.Duration
stop bool
mutex *sync.Mutex
pool *ants.Pool
}

// Run triggers this Executor executes the handler for the event source periodically
Expand All @@ -61,7 +65,7 @@ func (e *Executor) Run(ctx context.Context, wg *sync.WaitGroup, buffer chan bool
if evt != nil {
if e.onChange {
if e.compareReadings(evt.Readings) {
lc.Debugf("AutoEvent - readings are the same as previous one")
lc.Debugf("AutoEvent - source '%s' readings are the same as previous one", e.sourceName)
continue
}
}
Expand Down Expand Up @@ -109,13 +113,22 @@ func (e *Executor) compareReadings(readings []dtos.BaseReading) bool {
var result = true
for _, reading := range readings {
if lastReading, ok := e.lastReadings[reading.ResourceName]; ok {
if reading.ValueType == common.ValueTypeBinary {
switch reading.ValueType {
case common.ValueTypeBinary:
checksum := xxhash.Checksum64(reading.BinaryValue)
if lastReading != checksum {
e.lastReadings[reading.ResourceName] = checksum
result = false
}
} else {
case common.ValueTypeUint8, common.ValueTypeUint16, common.ValueTypeUint32, common.ValueTypeUint64,
common.ValueTypeInt8, common.ValueTypeInt16, common.ValueTypeInt32, common.ValueTypeInt64,
common.ValueTypeFloat32, common.ValueTypeFloat64:
t := cast.ToFloat64(lastReading) - cast.ToFloat64(reading.Value)
if math.Abs(t) > e.onChangeThreshold {
e.lastReadings[reading.ResourceName] = reading.Value
result = false
}
default:
if lastReading != reading.Value {
e.lastReadings[reading.ResourceName] = reading.Value
result = false
Expand Down Expand Up @@ -155,12 +168,13 @@ func NewExecutor(deviceName string, ae models.AutoEvent, pool *ants.Pool) (*Exec
}

return &Executor{
deviceName: deviceName,
sourceName: ae.SourceName,
onChange: ae.OnChange,
duration: duration,
stop: false,
mutex: &sync.Mutex{},
pool: pool,
deviceName: deviceName,
sourceName: ae.SourceName,
onChange: ae.OnChange,
onChangeThreshold: ae.OnChangeThreshold,
duration: duration,
stop: false,
mutex: &sync.Mutex{},
pool: pool,
}, nil
}
56 changes: 56 additions & 0 deletions internal/autoevent/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,59 @@ func TestCompareReadings(t *testing.T) {
})
}
}

func TestOnChangeThreshold(t *testing.T) {
deviceName := "testDevice"
resourceName := "testResource"
profileName := "testProfile"
autoEvent := models.AutoEvent{SourceName: resourceName, OnChange: true, Interval: "500ms"}
pool, err := ants.NewPool(runtime.GOMAXPROCS(0), ants.WithNonblocking(true))
require.NoError(t, err)
e, err := NewExecutor(deviceName, autoEvent, pool)
require.NoError(t, err)

tests := []struct {
name string
valueType string
lastReadingValue any
currentReadingValue any
onChangeThreshold float64
expectUnchanged bool
}{
{"float32 unchanged is true", common.ValueTypeFloat32, float32(0), float32(0.01), 0.01, true},
{"float32 unchanged is false", common.ValueTypeFloat32, float32(0), float32(0.02), 0.01, false},
{"float64 unchanged is true", common.ValueTypeFloat64, float64(0), float64(0.01), 0.01, true},
{"float64 unchanged is false", common.ValueTypeFloat64, float64(0), float64(0.02), 0.01, false},
{"uint8 unchanged is true", common.ValueTypeUint8, uint8(0), uint8(1), 1, true},
{"uint8 unchanged is false", common.ValueTypeUint8, uint8(0), uint8(2), 1, false},
{"uint16 unchanged is true", common.ValueTypeUint16, uint16(0), uint16(1), 1, true},
{"uint16 unchanged is false", common.ValueTypeUint16, uint16(0), uint16(2), 1, false},
{"uint32 unchanged is true", common.ValueTypeUint32, uint32(0), uint32(1), 1, true},
{"uint32 unchanged is false", common.ValueTypeUint32, uint32(0), uint32(2), 1, false},
{"uint64 unchanged is true", common.ValueTypeUint64, uint64(0), uint64(1), 1, true},
{"uint64 unchanged is false", common.ValueTypeUint64, uint64(0), uint64(2), 1, false},
{"int8 unchanged is true", common.ValueTypeInt8, int8(0), int8(1), 1, true},
{"int8 unchanged is false", common.ValueTypeInt8, int8(0), int8(2), 1, false},
{"int16 unchanged is true", common.ValueTypeInt16, int16(0), int16(1), 1, true},
{"int16 unchanged is false", common.ValueTypeInt16, int16(0), int16(2), 1, false},
{"int32 unchanged is true", common.ValueTypeInt32, int32(0), int32(1), 1, true},
{"int32 unchanged is false", common.ValueTypeInt32, int32(0), int32(2), 1, false},
{"int64 unchanged is true", common.ValueTypeInt64, int64(0), int64(1), 1, true},
{"int64 unchanged is false", common.ValueTypeInt64, int64(0), int64(2), 1, false},
}

for _, testCase := range tests {
t.Run(testCase.name, func(t *testing.T) {
lastReading, err := dtos.NewSimpleReading(profileName, deviceName, resourceName, testCase.valueType, testCase.lastReadingValue)
require.NoError(t, err)
currentReading, err := dtos.NewSimpleReading(profileName, deviceName, resourceName, testCase.valueType, testCase.currentReadingValue)
require.NoError(t, err)
e.lastReadings = map[string]any{lastReading.ResourceName: lastReading.Value}
e.onChangeThreshold = testCase.onChangeThreshold

res := e.compareReadings([]dtos.BaseReading{currentReading})

assert.Equal(t, testCase.expectUnchanged, res, "compareReading result not as expected")
})
}
}

0 comments on commit a2adfdc

Please sign in to comment.