Skip to content

Commit

Permalink
add unit tests for TxMsgAttempts and debug
Browse files Browse the repository at this point in the history
  • Loading branch information
matth-x committed Jul 31, 2024
1 parent 3b12c23 commit a15feb4
Show file tree
Hide file tree
Showing 7 changed files with 381 additions and 26 deletions.
1 change: 1 addition & 0 deletions src/MicroOcpp/Core/RequestQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ bool VolatileRequestQueue::pushRequestBack(std::unique_ptr<Request> request) {

if (strcmp(requests[index]->getOperationType(), "StatusNotification")!= 0)
{
i++;
continue;
}
auto old_status_notification = static_cast<Ocpp16::StatusNotification*>(requests[index]->getOperation());
Expand Down
2 changes: 1 addition & 1 deletion src/MicroOcpp/Core/RequestQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#endif

#ifndef MO_NUM_REQUEST_QUEUES
#define MO_NUM_REQUEST_QUEUES 3
#define MO_NUM_REQUEST_QUEUES 5
#endif

namespace MicroOcpp {
Expand Down
115 changes: 92 additions & 23 deletions src/MicroOcpp/Model/ConnectorBase/Connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,19 +67,13 @@ Connector::Connector(Context& context, std::shared_ptr<FilesystemAdapter> filesy

txStartOnPowerPathClosedBool = declareConfiguration<bool>(MO_CONFIG_EXT_PREFIX "TxStartOnPowerPathClosed", false);

transactionMessageAttemptsInt = declareConfiguration<int>("TransactionMessageAttempts", 2);
transactionMessageAttemptsInt = declareConfiguration<int>("TransactionMessageAttempts", 3);
transactionMessageRetryIntervalInt = declareConfiguration<int>("TransactionMessageRetryInterval", 60);

if (!availabilityBool) {
MO_DBG_ERR("Cannot declare availabilityBool");
}

if (model.getTransactionStore()) {
transaction = model.getTransactionStore()->getLatestTransaction(connectorId);
} else {
MO_DBG_ERR("must initialize TxStore before Connector");
}

char txFnamePrefix [30];
snprintf(txFnamePrefix, sizeof(txFnamePrefix), "tx-%u-", connectorId);
size_t txFnamePrefixLen = strlen(txFnamePrefix);
Expand Down Expand Up @@ -112,12 +106,21 @@ Connector::Connector(Context& context, std::shared_ptr<FilesystemAdapter> filesy
txNrBegin = parsedTxNr;
}
}

MO_DBG_DEBUG("found %s%u.jsn - Internal range from %u to %u (exclusive)", txFnamePrefix, parsedTxNr, txNrBegin, txNrBack);
}
return 0;
});

MO_DBG_DEBUG("found %u transactions for connector %u. Internal range from %u to %u (exclusive)", (txNrBack + MAX_TX_CNT - txNrBegin) % MAX_TX_CNT, connectorId, txNrBegin, txNrBack);
txNrFront = txNrBegin;

if (model.getTransactionStore()) {
unsigned int txNrLatest = (txNrBack + MAX_TX_CNT - 1) % MAX_TX_CNT; //txNr of the most recent tx on flash
transaction = model.getTransactionStore()->getTransaction(connectorId, txNrLatest); //returns nullptr if txNrLatest does not exist on flash
} else {
MO_DBG_ERR("must initialize TxStore before Connector");
}
}

Connector::~Connector() {
Expand Down Expand Up @@ -259,20 +262,26 @@ void Connector::loop() {
}

if (removed) {
if (txNrFront == txNrBack) {
txNrFront = transaction->getTxNr();
}
txNrBack = transaction->getTxNr(); //roll back creation of last tx entry
}

MO_DBG_DEBUG("collect aborted or silent transaction %u-%u %s", connectorId, transaction->getTxNr(), removed ? "" : "failure");
MO_DBG_VERBOSE("txNrBegin=%u, txNrFront=%u, txNrBack=%u", txNrBegin, txNrFront, txNrBack);
transaction = nullptr;
}

if (transaction && transaction->isAborted()) {
MO_DBG_DEBUG("collect aborted transaction %u-%u", connectorId, transaction->getTxNr());
MO_DBG_VERBOSE("txNrBegin=%u, txNrFront=%u, txNrBack=%u", txNrBegin, txNrFront, txNrBack);
transaction = nullptr;
}

if (transaction && transaction->getStopSync().isRequested()) {
MO_DBG_DEBUG("collect obsolete transaction %u-%u", connectorId, transaction->getTxNr());
MO_DBG_VERBOSE("txNrBegin=%u, txNrFront=%u, txNrBack=%u", txNrBegin, txNrFront, txNrBack);
transaction = nullptr;
}

Expand Down Expand Up @@ -541,7 +550,7 @@ std::shared_ptr<Transaction> Connector::allocateTransaction() {

//clean possible aborted tx
unsigned int txr = txNrBack;
unsigned int txSize = (txNrBack + MAX_TX_CNT - txNrFront) % MAX_TX_CNT;
unsigned int txSize = (txNrBack + MAX_TX_CNT - txNrBegin) % MAX_TX_CNT;
for (unsigned int i = 0; i < txSize; i++) {
txr = (txr + MAX_TX_CNT - 1) % MAX_TX_CNT; //decrement by 1

Expand All @@ -557,6 +566,9 @@ std::shared_ptr<Transaction> Connector::allocateTransaction() {
removed &= model.getTransactionStore()->remove(connectorId, txr);
}
if (removed) {
if (txNrFront == txNrBack) {
txNrFront = txr;
}
txNrBack = txr;
MO_DBG_WARN("deleted dangling silent or aborted tx for new transaction");
} else {
Expand All @@ -569,18 +581,18 @@ std::shared_ptr<Transaction> Connector::allocateTransaction() {
}
}

txSize = (txNrBack + MAX_TX_CNT - txNrFront) % MAX_TX_CNT; //refresh after cleaning txs
txSize = (txNrBack + MAX_TX_CNT - txNrBegin) % MAX_TX_CNT; //refresh after cleaning txs

//try to create new transaction
if (txSize < MO_TXRECORD_SIZE) {
tx = model.getTransactionStore()->createTransaction(connectorId);
tx = model.getTransactionStore()->createTransaction(connectorId, txNrBack);
}

if (!tx) {
//could not create transaction - now, try to replace tx history entry

unsigned int txl = txNrBegin;
txSize = (txNrBack + MAX_TX_CNT - txNrFront) % MAX_TX_CNT;
txSize = (txNrBack + MAX_TX_CNT - txNrBegin) % MAX_TX_CNT;

for (unsigned int i = 0; i < txSize; i++) {

Expand All @@ -593,7 +605,7 @@ std::shared_ptr<Transaction> Connector::allocateTransaction() {

auto txhist = model.getTransactionStore()->getTransaction(connectorId, txl);
//oldest entry, now check if it's history and can be removed or corrupted entry
if (!txhist || txhist->isCompleted() || txhist->isAborted()) {
if (!txhist || txhist->isCompleted() || txhist->isAborted() || (txhist->isSilent() && txhist->getStopSync().isRequested())) {
//yes, remove
bool removed = true;
if (auto mService = model.getMeteringService()) {
Expand All @@ -604,9 +616,13 @@ std::shared_ptr<Transaction> Connector::allocateTransaction() {
}
if (removed) {
txNrBegin = (txl + 1) % MAX_TX_CNT;
if (txNrFront == txl) {
txNrFront = txNrBegin;
}
MO_DBG_DEBUG("deleted tx history entry for new transaction");
MO_DBG_VERBOSE("txNrBegin=%u, txNrFront=%u, txNrBack=%u", txNrBegin, txNrFront, txNrBack);

tx = model.getTransactionStore()->createTransaction(connectorId);
tx = model.getTransactionStore()->createTransaction(connectorId, txNrBack);
} else {
MO_DBG_ERR("memory corruption");
break;
Expand All @@ -626,7 +642,7 @@ std::shared_ptr<Transaction> Connector::allocateTransaction() {
//couldn't create normal transaction -> check if to start charging without real transaction
if (silentOfflineTransactionsBool && silentOfflineTransactionsBool->getBool()) {
//try to handle charging session without sending StartTx or StopTx to the server
tx = model.getTransactionStore()->createTransaction(connectorId, true);
tx = model.getTransactionStore()->createTransaction(connectorId, txNrBack, true);

if (tx) {
MO_DBG_DEBUG("created silent transaction");
Expand All @@ -643,6 +659,12 @@ std::shared_ptr<Transaction> Connector::allocateTransaction() {
}
}

if (tx) {
txNrBack = (txNrBack + 1) % MAX_TX_CNT;
MO_DBG_DEBUG("advance txNrBack %u-%u", connectorId, txNrBack);
MO_DBG_VERBOSE("txNrBegin=%u, txNrFront=%u, txNrBack=%u", txNrBegin, txNrFront, txNrBack);
}

return tx;
}

Expand Down Expand Up @@ -1062,17 +1084,32 @@ unsigned int Connector::getFrontRequestOpNr() {

unsigned int txSize = (txNrBack + MAX_TX_CNT - txNrFront) % MAX_TX_CNT;

if (transactionFront && txSize == 0) {
//catch edge case where txBack has been rolled back and txFront was equal to txBack
MO_DBG_DEBUG("collect front transaction %u-%u after tx rollback", connectorId, transactionFront->getTxNr());
MO_DBG_VERBOSE("txNrBegin=%u, txNrFront=%u, txNrBack=%u", txNrBegin, txNrFront, txNrBack);
transactionFront = nullptr;
}

for (unsigned int i = 0; i < txSize; i++) {

if (!transactionFront) {
transactionFront = model.getTransactionStore()->getTransaction(connectorId, txNrFront);

#if MO_DBG_LEVEL >= MO_DL_VERBOSE
if (transactionFront)
{
MO_DBG_VERBOSE("load front transaction %u-%u", connectorId, transactionFront->getTxNr());
}
#endif
}

if (transactionFront && (transactionFront->isAborted() || transactionFront->isCompleted() || transactionFront->isSilent())) {
//advance front
MO_DBG_DEBUG("collect front transaction %u-%u", connectorId, transactionFront->getTxNr());
transactionFront.reset();
transactionFront = nullptr;
txNrFront = (txNrFront + 1) % MAX_TX_CNT;
MO_DBG_VERBOSE("txNrBegin=%u, txNrFront=%u, txNrBack=%u", txNrBegin, txNrFront, txNrBack);
} else {
//front is accurate. Done here
break;
Expand Down Expand Up @@ -1108,8 +1145,8 @@ std::unique_ptr<Request> Connector::fetchFrontRequest() {
cancelStartTx = true;
}

if ((int)transactionFront->getStartSync().getAttemptNr() > transactionMessageAttemptsInt->getInt()) {
MO_DBG_INFO("exceeded TransactionMessageAttempts. Discard transaction");
if ((int)transactionFront->getStartSync().getAttemptNr() >= transactionMessageAttemptsInt->getInt()) {
MO_DBG_WARN("exceeded TransactionMessageAttempts. Discard transaction");

cancelStartTx = true;
}
Expand Down Expand Up @@ -1139,7 +1176,6 @@ std::unique_ptr<Request> Connector::fetchFrontRequest() {
transactionFront->commit();

auto startTx = makeRequest(new Ocpp16::StartTransaction(model, transactionFront));
startTx->setTimeout(0);
startTx->setOnReceiveConfListener([this] (JsonObject response) {
//fetch authorization status from StartTransaction.conf() for user notification

Expand All @@ -1148,15 +1184,32 @@ std::unique_ptr<Request> Connector::fetchFrontRequest() {
updateTxNotification(TxNotification::DeAuthorized);
}
});
auto transactionFront_capture = transactionFront;
startTx->setOnAbortListener([this, transactionFront_capture] () {
//shortcut to the attemptNr check above. Relevant if other operations block the queue while this StartTx is timing out
if (transactionFront_capture && (int)transactionFront_capture->getStartSync().getAttemptNr() >= transactionMessageAttemptsInt->getInt()) {
MO_DBG_WARN("exceeded TransactionMessageAttempts. Discard transaction");

transactionFront_capture->setSilent();
transactionFront_capture->setInactive();
transactionFront_capture->commit();

//clean up possible tx records
if (auto mSerivce = model.getMeteringService()) {
mSerivce->removeTxMeterData(connectorId, transactionFront_capture->getTxNr());
}
//next getFrontRequestOpNr() call will collect transactionFront
}
});

return startTx;
}

if (transactionFront->getStopSync().isRequested() && !transactionFront->getStopSync().isConfirmed()) {
//send StopTx?

if ((int)transactionFront->getStopSync().getAttemptNr() > transactionMessageAttemptsInt->getInt()) {
MO_DBG_INFO("exceeded TransactionMessageAttempts. Discard transaction");
if ((int)transactionFront->getStopSync().getAttemptNr() >= transactionMessageAttemptsInt->getInt()) {
MO_DBG_WARN("exceeded TransactionMessageAttempts. Discard transaction");

transactionFront->setSilent();

Expand Down Expand Up @@ -1188,11 +1241,27 @@ std::unique_ptr<Request> Connector::fetchFrontRequest() {
std::unique_ptr<Request> stopTx;

if (stopTxData) {
stopTx = makeRequest(new Ocpp16::StopTransaction(model, std::move(transactionFront), stopTxData->retrieveStopTxData()));
stopTx = makeRequest(new Ocpp16::StopTransaction(model, transactionFront, stopTxData->retrieveStopTxData()));
} else {
stopTx = makeRequest(new Ocpp16::StopTransaction(model, std::move(transactionFront)));
stopTx = makeRequest(new Ocpp16::StopTransaction(model, transactionFront));
}
stopTx->setTimeout(0);
auto transactionFront_capture = transactionFront;
stopTx->setOnAbortListener([this, transactionFront_capture] () {
//shortcut to the attemptNr check above. Relevant if other operations block the queue while this StopTx is timing out
if ((int)transactionFront_capture->getStopSync().getAttemptNr() >= transactionMessageAttemptsInt->getInt()) {
MO_DBG_WARN("exceeded TransactionMessageAttempts. Discard transaction");

transactionFront_capture->setSilent();
transactionFront_capture->setInactive();
transactionFront_capture->commit();

//clean up possible tx records
if (auto mSerivce = model.getMeteringService()) {
mSerivce->removeTxMeterData(connectorId, transactionFront_capture->getTxNr());
}
//next getFrontRequestOpNr() call will collect transactionFront
}
});

return stopTx;
}
Expand Down
6 changes: 6 additions & 0 deletions src/MicroOcpp/Model/ConnectorBase/Connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@
#include <functional>
#include <memory>

#ifndef MO_TXRECORD_SIZE
#define MO_TXRECORD_SIZE 4 //no. of tx to hold on flash storage
#endif

#define MAX_TX_CNT 100000U //upper limit of txNr (internal usage). Must be at least 2*MO_TXRECORD_SIZE+1

#ifndef MO_REPORT_NOERROR
#define MO_REPORT_NOERROR 0
#endif
Expand Down
Loading

0 comments on commit a15feb4

Please sign in to comment.