Skip to content

Commit

Permalink
SM-823: if obd.query fails, it keeps retrying that one too many times (
Browse files Browse the repository at this point in the history
…#75)

* - check if we have failed to query this pid too many times
-if we failed too many times, we should send an error to the cloud

* - fixed linter

* - added more logging

* - add retries to pids with IntervalInSec==0
- updated tests
  • Loading branch information
zakharenkodmytro authored May 24, 2024
1 parent a706c07 commit 9d56c28
Show file tree
Hide file tree
Showing 2 changed files with 204 additions and 3 deletions.
31 changes: 29 additions & 2 deletions internal/worker_runner.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package internal

import (
"context"
"fmt"
"sync"
"time"
Expand Down Expand Up @@ -45,13 +46,16 @@ type workerRunner struct {
func NewWorkerRunner(addr *common.Address, loggerSettingsSvc loggers.TemplateStore,
dataSender network.DataSender, logger zerolog.Logger, fpRunner FingerprintRunner,
pids *models.TemplatePIDs, settings *models.TemplateDeviceSettings, device Device) WorkerRunner {
signalsQueue := &SignalsQueue{lastTimeChecked: make(map[string]time.Time)}
signalsQueue := &SignalsQueue{lastTimeChecked: make(map[string]time.Time), failureCount: make(map[string]int)}
// Interval for sending status payload to cloud. Status payload contains obd signals and non-obd signals.
interval := 20 * time.Second
return &workerRunner{ethAddr: addr, loggerSettingsSvc: loggerSettingsSvc,
dataSender: dataSender, logger: logger, fingerprintRunner: fpRunner, pids: pids, deviceSettings: settings, signalsQueue: signalsQueue, sendPayloadInterval: interval, device: device}
}

// Max failures allowed for a PID before sending an error to the cloud
const maxPidFailures = 10

// Run sends a signed status payload every X seconds, that may or may not contain OBD signals.
// It also has a continuous loop that checks voltage compared to template settings to make sure ok to query OBD.
// It will query the VIN once on startup and send a fingerprint payload (only once per Run).
Expand Down Expand Up @@ -307,16 +311,30 @@ func (wr *workerRunner) queryOBD() {
if lastEnqueuedTime, ok := wr.signalsQueue.lastEnqueuedTime(request.Name); ok {
// if interval is 0, then we only query once at the device startup
if request.IntervalSeconds == 0 {
continue
if wr.signalsQueue.failureCount[request.Name] == 0 {
continue
}
}
if int(time.Since(lastEnqueuedTime).Seconds()) < request.IntervalSeconds {
continue
}
}
// check if we have failed to query this pid too many times
if wr.signalsQueue.failureCount[request.Name] > maxPidFailures {
continue
}

// execute the pid
obdResp, ts, err := commands.RequestPIDRaw(&wr.logger, wr.device.UnitID, request)
if err != nil {
wr.logger.Err(err).Msg("failed to query obd pid")
wr.signalsQueue.IncrementFailureCount(request.Name)
wr.signalsQueue.lastTimeChecked[request.Name] = time.Now()
// if we failed too many times, we should send an error to the cloud
if wr.signalsQueue.failureCount[request.Name] > maxPidFailures {
wr.logger.Err(err).Ctx(context.WithValue(context.Background(), LogToMqtt, "true")).
Msgf("failed to query pid too many times: %+v", request)
}
continue
}
// future: new formula type that could work for proprietary PIDs and could support text, int or float
Expand All @@ -335,6 +353,8 @@ func (wr *workerRunner) queryOBD() {
continue
}

// reset the failure count
wr.signalsQueue.failureCount[request.Name] = 0
wr.signalsQueue.Enqueue(models.SignalData{
Timestamp: ts.UnixMilli(),
Name: request.Name,
Expand Down Expand Up @@ -368,6 +388,7 @@ func (wr *workerRunner) isOkToQueryOBD() (bool, api.PowerStatusResponse) {
type SignalsQueue struct {
signals []models.SignalData
lastTimeChecked map[string]time.Time
failureCount map[string]int
sync.RWMutex
}

Expand All @@ -394,3 +415,9 @@ func (sq *SignalsQueue) Dequeue() []models.SignalData {
sq.signals = []models.SignalData{}
return signals
}

func (sq *SignalsQueue) IncrementFailureCount(requestName string) {
sq.Lock()
defer sq.Unlock()
sq.failureCount[requestName]++
}
176 changes: 175 additions & 1 deletion internal/worker_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,180 @@ func Test_workerRunner_Run_sendSignalsWithDifferentInterval(t *testing.T) {
wr.Stop()
}

func Test_workerRunner_Run_failedToQueryPidTooManyTimes(t *testing.T) {
// when
httpmock.Activate()
defer httpmock.DeactivateAndReset()
const autoPiBaseURL = "http://192.168.4.1:9000"

unitID := uuid.New()

mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

vl, ds, ts, ls := mockComponents(mockCtrl, unitID)

// mock power status resp
psPath := fmt.Sprintf("/dongle/%s/execute_raw/", unitID)
httpmock.RegisterResponder(http.MethodPost, autoPiBaseURL+psPath,
httpmock.NewStringResponder(200, `{"spm": {"last_trigger": {"up": "volt_change"}, "battery": {"voltage": 13.3}}}`))

// mock obd resp
path := fmt.Sprintf("/dongle/%s/execute_raw", unitID)
httpmock.RegisterResponder(http.MethodPost, autoPiBaseURL+path,
func(req *http.Request) (*http.Response, error) {
// Read the request body
bodyBytes, err := io.ReadAll(req.Body)
if err != nil {
return httpmock.NewStringResponse(500, ""), err
}
// Convert the body bytes to string
bodyString := string(bodyBytes)

// Match the request body
if strings.Contains(bodyString, "obd.query fuellevel") {
return httpmock.NewStringResponse(500, `{"error":"Failed to calculate formula: invalid syntax (<string>, line 1)"}`), nil
} else if strings.Contains(bodyString, "obd.query foo") {
return httpmock.NewStringResponse(500, `{"error":"Failed to calculate formula: invalid syntax (<string>, line 1)"}`), nil
}
return httpmock.NewStringResponse(200, `{"value": "7e803412f6700000000", "_stamp": "2024-02-29T17:17:30.534861"}`), nil
},
)

expectOnMocks(ts, vl, unitID, ds, 2)

// Initialize workerRunner here with mocked dependencies
requests := []models.PIDRequest{
{
Name: "fuellevel",
IntervalSeconds: 1,
Formula: "dbc:31|8@0+ (0.392156862745098,0) [0|100] \"%\"",
},
{
Name: "foo",
IntervalSeconds: 0,
Formula: "dbc:31|8@0+ (0.392156862745098,0) [0|100] \"%\"",
},
}

wr := createWorkerRunner(ts, ds, ls, unitID)
wr.pids.Requests = requests
wr.sendPayloadInterval = 10 * time.Second
wr.stop = make(chan bool)
wr.logger = zerolog.New(os.Stdout).With().Timestamp().Str("app", "edge-network").Logger()

// assert data sender is called without fuel level signal
ds.EXPECT().SendDeviceStatusData(gomock.Any()).Times(3).Do(func(data models.DeviceStatusData) {
assert.Equal(t, 7, len(data.Vehicle.Signals))
}).Return(nil)
ds.EXPECT().SendDeviceNetworkData(gomock.Any()).Times(3).Do(func(data models.DeviceNetworkData) {
assert.NotNil(t, data.Cell)
}).Return(nil)

// then the data sender should be called twice
go wr.Run()
time.Sleep(25 * time.Second)
assert.Equal(t, 11, wr.signalsQueue.failureCount["fuellevel"])
assert.Equal(t, 11, wr.signalsQueue.failureCount["foo"])
wr.Stop()
}

func Test_workerRunner_Run_failedToQueryPidButRecover(t *testing.T) {
// when
httpmock.Activate()
defer httpmock.DeactivateAndReset()
const autoPiBaseURL = "http://192.168.4.1:9000"

unitID := uuid.New()

mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()

vl, ds, ts, ls := mockComponents(mockCtrl, unitID)

// mock power status resp
psPath := fmt.Sprintf("/dongle/%s/execute_raw/", unitID)
httpmock.RegisterResponder(http.MethodPost, autoPiBaseURL+psPath,
httpmock.NewStringResponder(200, `{"spm": {"last_trigger": {"up": "volt_change"}, "battery": {"voltage": 13.3}}}`))

// mock obd resp
path := fmt.Sprintf("/dongle/%s/execute_raw", unitID)
var count int
httpmock.RegisterResponder(http.MethodPost, autoPiBaseURL+path,
func(req *http.Request) (*http.Response, error) {
// Read the request body
bodyBytes, err := io.ReadAll(req.Body)
if err != nil {
return httpmock.NewStringResponse(500, ""), err
}
// Convert the body bytes to string
bodyString := string(bodyBytes)

// Match the request body
if strings.Contains(bodyString, "obd.query fuellevel") && count < 10 {
count++
return httpmock.NewStringResponse(500, `{"error":"Failed to calculate formula: invalid syntax (<string>, line 1)"}`), nil
} else if strings.Contains(bodyString, "obd.query foo") && count < 10 {
count++
return httpmock.NewStringResponse(500, `{"error":"Failed to calculate formula: invalid syntax (<string>, line 1)"}`), nil
}
return httpmock.NewStringResponse(200, `{"value": "7e803412f6700000000", "_stamp": "2024-02-29T17:17:30.534861"}`), nil
},
)

expectOnMocks(ts, vl, unitID, ds, 2)

// Initialize workerRunner here with mocked dependencies
requests := []models.PIDRequest{
{
Name: "fuellevel",
IntervalSeconds: 1,
Formula: "dbc:31|8@0+ (0.392156862745098,0) [0|100] \"%\"",
},
{
Name: "foo",
IntervalSeconds: 0,
Formula: "dbc:31|8@0+ (0.392156862745098,0) [0|100] \"%\"",
},
}

wr := createWorkerRunner(ts, ds, ls, unitID)
wr.pids.Requests = requests
wr.sendPayloadInterval = 10 * time.Second
wr.stop = make(chan bool)
wr.logger = zerolog.New(os.Stdout).With().Timestamp().Str("app", "edge-network").Logger()

// assert data sender is called without fuel level signal
ds.EXPECT().SendDeviceStatusData(gomock.Any()).Times(1).Do(func(data models.DeviceStatusData) {
assert.Equal(t, 7, len(data.Vehicle.Signals))
}).Return(nil)
ds.EXPECT().SendDeviceStatusData(gomock.Any()).Times(1).Do(func(data models.DeviceStatusData) {
assert.Equal(t, 9, len(data.Vehicle.Signals))
}).Return(nil)
ds.EXPECT().SendDeviceStatusData(gomock.Any()).Times(1).Do(func(data models.DeviceStatusData) {
assert.Equal(t, 12, len(data.Vehicle.Signals))
found := false
for _, signal := range data.Vehicle.Signals {
if signal.Name == "foo" {
found = true
break
}
}
assert.Falsef(t, found, "foo signal should not be present in the signals")
}).Return(nil)
ds.EXPECT().SendDeviceNetworkData(gomock.Any()).Times(3).Do(func(data models.DeviceNetworkData) {
assert.NotNil(t, data.Cell)
}).Return(nil)

// then the data sender should be called twice
go wr.Run()
time.Sleep(25 * time.Second)
// failure counter should be reset after success query
assert.Equal(t, 0, wr.signalsQueue.failureCount["fuellevel"])
assert.Equal(t, 0, wr.signalsQueue.failureCount["foo"])
wr.Stop()
}

func mockComponents(mockCtrl *gomock.Controller, unitID uuid.UUID) (*mock_loggers.MockVINLogger, *mock_network.MockDataSender, *mock_loggers.MockTemplateStore, FingerprintRunner) {
vl := mock_loggers.NewMockVINLogger(mockCtrl)
ds := mock_network.NewMockDataSender(mockCtrl)
Expand Down Expand Up @@ -525,7 +699,7 @@ func createWorkerRunner(ts *mock_loggers.MockTemplateStore, ds *mock_network.Moc
UnitID: unitID,
},
pids: &models.TemplatePIDs{Requests: nil, TemplateName: "test", Version: "1.0"},
signalsQueue: &SignalsQueue{lastTimeChecked: make(map[string]time.Time)},
signalsQueue: &SignalsQueue{lastTimeChecked: make(map[string]time.Time), failureCount: make(map[string]int)},
}
return wr
}

0 comments on commit 9d56c28

Please sign in to comment.