diff --git a/internal/worker_runner.go b/internal/worker_runner.go index ed46b03..9ba784a 100644 --- a/internal/worker_runner.go +++ b/internal/worker_runner.go @@ -1,6 +1,7 @@ package internal import ( + "context" "fmt" "sync" "time" @@ -45,13 +46,16 @@ type workerRunner struct { func NewWorkerRunner(addr *common.Address, loggerSettingsSvc loggers.TemplateStore, dataSender network.DataSender, logger zerolog.Logger, fpRunner FingerprintRunner, pids *models.TemplatePIDs, settings *models.TemplateDeviceSettings, device Device) WorkerRunner { - signalsQueue := &SignalsQueue{lastTimeChecked: make(map[string]time.Time)} + signalsQueue := &SignalsQueue{lastTimeChecked: make(map[string]time.Time), failureCount: make(map[string]int)} // Interval for sending status payload to cloud. Status payload contains obd signals and non-obd signals. interval := 20 * time.Second return &workerRunner{ethAddr: addr, loggerSettingsSvc: loggerSettingsSvc, dataSender: dataSender, logger: logger, fingerprintRunner: fpRunner, pids: pids, deviceSettings: settings, signalsQueue: signalsQueue, sendPayloadInterval: interval, device: device} } +// Max failures allowed for a PID before sending an error to the cloud +const maxPidFailures = 10 + // Run sends a signed status payload every X seconds, that may or may not contain OBD signals. // It also has a continuous loop that checks voltage compared to template settings to make sure ok to query OBD. // It will query the VIN once on startup and send a fingerprint payload (only once per Run). @@ -307,16 +311,30 @@ func (wr *workerRunner) queryOBD() { if lastEnqueuedTime, ok := wr.signalsQueue.lastEnqueuedTime(request.Name); ok { // if interval is 0, then we only query once at the device startup if request.IntervalSeconds == 0 { - continue + if wr.signalsQueue.failureCount[request.Name] == 0 { + continue + } } if int(time.Since(lastEnqueuedTime).Seconds()) < request.IntervalSeconds { continue } } + // check if we have failed to query this pid too many times + if wr.signalsQueue.failureCount[request.Name] > maxPidFailures { + continue + } + // execute the pid obdResp, ts, err := commands.RequestPIDRaw(&wr.logger, wr.device.UnitID, request) if err != nil { wr.logger.Err(err).Msg("failed to query obd pid") + wr.signalsQueue.IncrementFailureCount(request.Name) + wr.signalsQueue.lastTimeChecked[request.Name] = time.Now() + // if we failed too many times, we should send an error to the cloud + if wr.signalsQueue.failureCount[request.Name] > maxPidFailures { + wr.logger.Err(err).Ctx(context.WithValue(context.Background(), LogToMqtt, "true")). + Msgf("failed to query pid too many times: %+v", request) + } continue } // future: new formula type that could work for proprietary PIDs and could support text, int or float @@ -335,6 +353,8 @@ func (wr *workerRunner) queryOBD() { continue } + // reset the failure count + wr.signalsQueue.failureCount[request.Name] = 0 wr.signalsQueue.Enqueue(models.SignalData{ Timestamp: ts.UnixMilli(), Name: request.Name, @@ -368,6 +388,7 @@ func (wr *workerRunner) isOkToQueryOBD() (bool, api.PowerStatusResponse) { type SignalsQueue struct { signals []models.SignalData lastTimeChecked map[string]time.Time + failureCount map[string]int sync.RWMutex } @@ -394,3 +415,9 @@ func (sq *SignalsQueue) Dequeue() []models.SignalData { sq.signals = []models.SignalData{} return signals } + +func (sq *SignalsQueue) IncrementFailureCount(requestName string) { + sq.Lock() + defer sq.Unlock() + sq.failureCount[requestName]++ +} diff --git a/internal/worker_runner_test.go b/internal/worker_runner_test.go index 6669784..2221611 100644 --- a/internal/worker_runner_test.go +++ b/internal/worker_runner_test.go @@ -494,6 +494,180 @@ func Test_workerRunner_Run_sendSignalsWithDifferentInterval(t *testing.T) { wr.Stop() } +func Test_workerRunner_Run_failedToQueryPidTooManyTimes(t *testing.T) { + // when + httpmock.Activate() + defer httpmock.DeactivateAndReset() + const autoPiBaseURL = "http://192.168.4.1:9000" + + unitID := uuid.New() + + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + vl, ds, ts, ls := mockComponents(mockCtrl, unitID) + + // mock power status resp + psPath := fmt.Sprintf("/dongle/%s/execute_raw/", unitID) + httpmock.RegisterResponder(http.MethodPost, autoPiBaseURL+psPath, + httpmock.NewStringResponder(200, `{"spm": {"last_trigger": {"up": "volt_change"}, "battery": {"voltage": 13.3}}}`)) + + // mock obd resp + path := fmt.Sprintf("/dongle/%s/execute_raw", unitID) + httpmock.RegisterResponder(http.MethodPost, autoPiBaseURL+path, + func(req *http.Request) (*http.Response, error) { + // Read the request body + bodyBytes, err := io.ReadAll(req.Body) + if err != nil { + return httpmock.NewStringResponse(500, ""), err + } + // Convert the body bytes to string + bodyString := string(bodyBytes) + + // Match the request body + if strings.Contains(bodyString, "obd.query fuellevel") { + return httpmock.NewStringResponse(500, `{"error":"Failed to calculate formula: invalid syntax (, line 1)"}`), nil + } else if strings.Contains(bodyString, "obd.query foo") { + return httpmock.NewStringResponse(500, `{"error":"Failed to calculate formula: invalid syntax (, line 1)"}`), nil + } + return httpmock.NewStringResponse(200, `{"value": "7e803412f6700000000", "_stamp": "2024-02-29T17:17:30.534861"}`), nil + }, + ) + + expectOnMocks(ts, vl, unitID, ds, 2) + + // Initialize workerRunner here with mocked dependencies + requests := []models.PIDRequest{ + { + Name: "fuellevel", + IntervalSeconds: 1, + Formula: "dbc:31|8@0+ (0.392156862745098,0) [0|100] \"%\"", + }, + { + Name: "foo", + IntervalSeconds: 0, + Formula: "dbc:31|8@0+ (0.392156862745098,0) [0|100] \"%\"", + }, + } + + wr := createWorkerRunner(ts, ds, ls, unitID) + wr.pids.Requests = requests + wr.sendPayloadInterval = 10 * time.Second + wr.stop = make(chan bool) + wr.logger = zerolog.New(os.Stdout).With().Timestamp().Str("app", "edge-network").Logger() + + // assert data sender is called without fuel level signal + ds.EXPECT().SendDeviceStatusData(gomock.Any()).Times(3).Do(func(data models.DeviceStatusData) { + assert.Equal(t, 7, len(data.Vehicle.Signals)) + }).Return(nil) + ds.EXPECT().SendDeviceNetworkData(gomock.Any()).Times(3).Do(func(data models.DeviceNetworkData) { + assert.NotNil(t, data.Cell) + }).Return(nil) + + // then the data sender should be called twice + go wr.Run() + time.Sleep(25 * time.Second) + assert.Equal(t, 11, wr.signalsQueue.failureCount["fuellevel"]) + assert.Equal(t, 11, wr.signalsQueue.failureCount["foo"]) + wr.Stop() +} + +func Test_workerRunner_Run_failedToQueryPidButRecover(t *testing.T) { + // when + httpmock.Activate() + defer httpmock.DeactivateAndReset() + const autoPiBaseURL = "http://192.168.4.1:9000" + + unitID := uuid.New() + + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + vl, ds, ts, ls := mockComponents(mockCtrl, unitID) + + // mock power status resp + psPath := fmt.Sprintf("/dongle/%s/execute_raw/", unitID) + httpmock.RegisterResponder(http.MethodPost, autoPiBaseURL+psPath, + httpmock.NewStringResponder(200, `{"spm": {"last_trigger": {"up": "volt_change"}, "battery": {"voltage": 13.3}}}`)) + + // mock obd resp + path := fmt.Sprintf("/dongle/%s/execute_raw", unitID) + var count int + httpmock.RegisterResponder(http.MethodPost, autoPiBaseURL+path, + func(req *http.Request) (*http.Response, error) { + // Read the request body + bodyBytes, err := io.ReadAll(req.Body) + if err != nil { + return httpmock.NewStringResponse(500, ""), err + } + // Convert the body bytes to string + bodyString := string(bodyBytes) + + // Match the request body + if strings.Contains(bodyString, "obd.query fuellevel") && count < 10 { + count++ + return httpmock.NewStringResponse(500, `{"error":"Failed to calculate formula: invalid syntax (, line 1)"}`), nil + } else if strings.Contains(bodyString, "obd.query foo") && count < 10 { + count++ + return httpmock.NewStringResponse(500, `{"error":"Failed to calculate formula: invalid syntax (, line 1)"}`), nil + } + return httpmock.NewStringResponse(200, `{"value": "7e803412f6700000000", "_stamp": "2024-02-29T17:17:30.534861"}`), nil + }, + ) + + expectOnMocks(ts, vl, unitID, ds, 2) + + // Initialize workerRunner here with mocked dependencies + requests := []models.PIDRequest{ + { + Name: "fuellevel", + IntervalSeconds: 1, + Formula: "dbc:31|8@0+ (0.392156862745098,0) [0|100] \"%\"", + }, + { + Name: "foo", + IntervalSeconds: 0, + Formula: "dbc:31|8@0+ (0.392156862745098,0) [0|100] \"%\"", + }, + } + + wr := createWorkerRunner(ts, ds, ls, unitID) + wr.pids.Requests = requests + wr.sendPayloadInterval = 10 * time.Second + wr.stop = make(chan bool) + wr.logger = zerolog.New(os.Stdout).With().Timestamp().Str("app", "edge-network").Logger() + + // assert data sender is called without fuel level signal + ds.EXPECT().SendDeviceStatusData(gomock.Any()).Times(1).Do(func(data models.DeviceStatusData) { + assert.Equal(t, 7, len(data.Vehicle.Signals)) + }).Return(nil) + ds.EXPECT().SendDeviceStatusData(gomock.Any()).Times(1).Do(func(data models.DeviceStatusData) { + assert.Equal(t, 9, len(data.Vehicle.Signals)) + }).Return(nil) + ds.EXPECT().SendDeviceStatusData(gomock.Any()).Times(1).Do(func(data models.DeviceStatusData) { + assert.Equal(t, 12, len(data.Vehicle.Signals)) + found := false + for _, signal := range data.Vehicle.Signals { + if signal.Name == "foo" { + found = true + break + } + } + assert.Falsef(t, found, "foo signal should not be present in the signals") + }).Return(nil) + ds.EXPECT().SendDeviceNetworkData(gomock.Any()).Times(3).Do(func(data models.DeviceNetworkData) { + assert.NotNil(t, data.Cell) + }).Return(nil) + + // then the data sender should be called twice + go wr.Run() + time.Sleep(25 * time.Second) + // failure counter should be reset after success query + assert.Equal(t, 0, wr.signalsQueue.failureCount["fuellevel"]) + assert.Equal(t, 0, wr.signalsQueue.failureCount["foo"]) + wr.Stop() +} + func mockComponents(mockCtrl *gomock.Controller, unitID uuid.UUID) (*mock_loggers.MockVINLogger, *mock_network.MockDataSender, *mock_loggers.MockTemplateStore, FingerprintRunner) { vl := mock_loggers.NewMockVINLogger(mockCtrl) ds := mock_network.NewMockDataSender(mockCtrl) @@ -525,7 +699,7 @@ func createWorkerRunner(ts *mock_loggers.MockTemplateStore, ds *mock_network.Moc UnitID: unitID, }, pids: &models.TemplatePIDs{Requests: nil, TemplateName: "test", Version: "1.0"}, - signalsQueue: &SignalsQueue{lastTimeChecked: make(map[string]time.Time)}, + signalsQueue: &SignalsQueue{lastTimeChecked: make(map[string]time.Time), failureCount: make(map[string]int)}, } return wr }