Skip to content

Commit

Permalink
Merge pull request #41 from UCSD-E4E/36-feat-implement-data-upload
Browse files Browse the repository at this point in the history
feat: Implement data upload
  • Loading branch information
ntlhui authored Feb 10, 2024
2 parents 99058cd + 683d59b commit 850a5ec
Show file tree
Hide file tree
Showing 17 changed files with 227 additions and 102 deletions.
6 changes: 5 additions & 1 deletion CONTRIBUTING
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,8 @@ Where C system headers are used, prefer the C style header over the C++ style he
## Return Values
Prefer returning error codes.

Error codes may be defined by documentation or by enumeration.
Error codes may be defined by documentation or by enumeration.

If returning a true or false value, prefer using `bool`.

`assert` is strictly prohibited in releasable code.
158 changes: 114 additions & 44 deletions src/cellular/dataUpload.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "encoding/base85.h"
#include "encoding/base64.h"

#include "consts.hpp"
#include "system.hpp"
#include "product.hpp"
#include "sleepTask.hpp"
Expand All @@ -14,77 +15,146 @@

void DataUpload::init(void)
{
SF_OSAL_printf("Entering SYSTEM_STATE_DATA_UPLOAD\n");
SF_OSAL_printf("Entering SYSTEM_STATE_DATA_UPLOAD" __NL__);

this->initSuccess = 0;
sf::cloud::wait_connect(5000);
this->initSuccess = 1;
if (sf::cloud::wait_connect(SF_CELL_SIGNAL_TIMEOUT_MS))
{
this->initSuccess = 0;
}
}

STATES_e DataUpload::run(void)
STATES_e DataUpload::can_upload(void)
{
sf::cloud::wait_connect(5000);


uint8_t dataEncodeBuffer[DATA_UPLOAD_MAX_BLOCK_LEN];
char dataPublishBuffer[DATA_UPLOAD_MAX_UPLOAD_LEN];
char publishName[DU_PUBLISH_ID_NAME_LEN + 1];
int nBytesToEncode;
size_t nBytesToSend;

if (!this->initSuccess)
if (!pSystemDesc->pRecorder->hasData())
{
SF_OSAL_printf("Failed to init\n");
return STATE_DEEP_SLEEP;
}

memset(dataEncodeBuffer, 0, DATA_UPLOAD_MAX_BLOCK_LEN);
nBytesToEncode = pSystemDesc->pRecorder->getLastPacket(dataEncodeBuffer, DATA_UPLOAD_MAX_BLOCK_LEN, publishName, DU_PUBLISH_ID_NAME_LEN);
if (-1 == nBytesToEncode)
if (!sf::cloud::is_connected())
{
SF_OSAL_printf("Failed to retrive data\n");
return STATE_CLI;
if (sf::cloud::wait_connect(SF_CELL_SIGNAL_TIMEOUT_MS))
{
// Lost connection and failed to reconnect
return STATE_DEEP_SLEEP;
}
}

SF_OSAL_printf("Publish ID: %s\n", publishName);
if (nBytesToEncode % 4 != 0)
if (pSystemDesc->pWaterSensor->getCurrentStatus())
{
nBytesToEncode += 4 - (nBytesToEncode % 4);
// In water
return STATE_DEPLOYED;
}
SF_OSAL_printf("Got %d bytes to encode\n", nBytesToEncode);

memset(dataPublishBuffer, 0, DATA_UPLOAD_MAX_UPLOAD_LEN);
nBytesToSend = DATA_UPLOAD_MAX_UPLOAD_LEN;
urlsafe_b64_encode(dataEncodeBuffer, nBytesToEncode, dataPublishBuffer, &nBytesToSend);

SF_OSAL_printf("Got %u bytes to upload\n", nBytesToSend);

if (!Particle.connected())
if (pSystemDesc->pBattery->getVCell() < SF_BATTERY_UPLOAD_VOLTAGE)
{
// we're not connected! abort
SF_OSAL_printf("not connected :(");
return STATE_CLI;
return STATE_DEEP_SLEEP;
}
// Don't change current state, continue looping
return STATE_UPLOAD;
}

SF_OSAL_printf("Data: %s", dataPublishBuffer);
STATES_e DataUpload::run(void)
{
uint8_t binary_packet_buffer[SF_PACKET_SIZE];
char ascii_record_buffer[SF_RECORD_SIZE + 1];
char publishName[DU_PUBLISH_ID_NAME_LEN + 1];
int nBytesToEncode;
size_t nBytesToSend;
STATES_e next_state;
int retval;

bool sucsess = Particle.publish(publishName, dataPublishBuffer);
if (!sucsess)
if (!this->initSuccess)
{
SF_OSAL_printf("Failed to upload data!\n");
SF_OSAL_printf("Failed to init\n");
return STATE_DEEP_SLEEP;
}

SF_OSAL_printf("Uploaded record %s\n", dataPublishBuffer);
Particle.process();

pSystemDesc->pRecorder->popLastPacket(nBytesToEncode);
while ((next_state = can_upload()) == STATE_UPLOAD)
{
memset(binary_packet_buffer, 0, SF_PACKET_SIZE);
nBytesToEncode = pSystemDesc->pRecorder->getLastPacket(binary_packet_buffer, SF_PACKET_SIZE, publishName, DU_PUBLISH_ID_NAME_LEN);
switch (nBytesToEncode)
{
case -2:
// We already know that there is data, but we aren't able to retrieve
// it. This can indicate recorder failure.
FLOG_AddError(FLOG_UPL_OPEN_FAIL, 0);
return STATE_DEEP_SLEEP;
case -1:
case -3:
// Either active session (bug) or buffer overflow (bug)
SF_OSAL_printf("Failed to retrieve data: %d" __NL__, nBytesToEncode);
return STATE_CLI;
default:
break;
}

SF_OSAL_printf("Publish ID: %s" __NL__, publishName);

// 32-bit alignment? 4 byte alignment?
if (nBytesToEncode % 4 != 0)
{
nBytesToEncode += 4 - (nBytesToEncode % 4);
}
SF_OSAL_printf("Got %d bytes to encode" __NL__, nBytesToEncode);

// Encode
memset(ascii_record_buffer, 0, SF_RECORD_SIZE + 1);
nBytesToSend = SF_RECORD_SIZE + 1;
if (retval = urlsafe_b64_encode(binary_packet_buffer, nBytesToEncode, ascii_record_buffer, &nBytesToSend))
{
// size limit violation is bug
SF_OSAL_printf("Failed to encode: %d" __NL__, retval);
return STATE_CLI;
}

SF_OSAL_printf("Got %u bytes to upload" __NL__, nBytesToSend);

SF_OSAL_printf("Data: %s" __NL__, ascii_record_buffer);

switch ((retval = sf::cloud::publish_blob(publishName, ascii_record_buffer)))
{
default:
case sf::cloud::OVERSIZE_DATA:
case sf::cloud::OVERSIZE_NAME:
SF_OSAL_printf("Failed to publish: %d" __NL__, retval);
return STATE_CLI;
case sf::cloud::NOT_CONNECTED:
FLOG_AddError(FLOG_UPL_CONNECT_FAIL, 1);
return STATE_DEEP_SLEEP;
case sf::cloud::PUBLISH_FAIL:
FLOG_AddError(FLOG_UPL_PUB_FAIL, 0);
return STATE_DEEP_SLEEP;
case sf::cloud::SUCCESS:
break;
}

SF_OSAL_printf("Uploaded record" __NL__);

Particle.process();

switch (pSystemDesc->pRecorder->popLastPacket(nBytesToEncode))
{
case -1:
case -2:
// This is a bug
SF_OSAL_printf("Session failure" __NL__);
return STATE_CLI;
case 0:
break;
}
}
return next_state;

return STATE_CLI;
}

void DataUpload::exit(void)
{
Cellular.off();
if (sf::cloud::wait_disconnect(5000))
{
FLOG_AddError(FLOG_CELL_DISCONN_FAIL, 0);
}
}

STATES_e DataUpload::exitState(void)
Expand Down
21 changes: 3 additions & 18 deletions src/cellular/dataUpload.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,7 @@
* RGB LED state is handled by the system theme.
*/

#if SF_UPLOAD_ENCODING == SF_UPLOAD_BASE85
/**
* How many bytes to store chunks of data in on the SPI flash.
*
* 496 * 5/4 (base85 encoding compression rate) = 620 which is less than the 622
* bytes which is the maximum size of publish events.
*/
#define DATA_UPLOAD_MAX_BLOCK_LEN 496
#elif SF_UPLOAD_ENCODING == SF_UPLOAD_BASE64 || SF_UPLOAD_ENCODING == SF_UPLOAD_BASE64URL
/**
* How many bytes to store chunks of data in on the SPI flash.
*
* 466 * 4/3 (base64 encoding compression rate) = 621 which is less than the 622
* bytes which is the maximum size of publish events.
*/
#define DATA_UPLOAD_MAX_BLOCK_LEN 466
#endif


/**
* @brief Number of bytes to buffer for upload
Expand All @@ -38,7 +22,7 @@
* @brief Minimum time between publish
*
*/
#define DATA_UPLOAD_MIN_PUBLISH_TIME_MS 1000
#define DATA_UPLOAD_MIN_PUBLISH_TIME_MS SF_UPLOAD_MS_PER_TRANSMIT

/**
* @brief Publish Event Name Length (not including NULL terminator)
Expand All @@ -63,5 +47,6 @@ class DataUpload : public Task{
int initSuccess;
system_tick_t lastConnectTime;
STATES_e exitState(void);
STATES_e can_upload(void);
};
#endif
2 changes: 1 addition & 1 deletion src/cellular/deploy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ int Deployment::open(const char* const name, Deployment::State_e state)
::close(this->currentFile);
}
#ifdef DEP_DEBUG
SF_OSAL_printf("opening!");
SF_OSAL_printf("opening %s!" __NL__, name);
#endif
switch (state)
{
Expand Down
31 changes: 16 additions & 15 deletions src/cellular/recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -297,19 +297,19 @@ int Recorder::getLastPacket(void* pBuffer,
#ifdef REC_DEBUG
SF_OSAL_printf("Failed to open last session" __NL__);
#endif
return -1;
return -2;
}

current_length = session.getLength();
if (current_length % REC_MAX_PACKET_SIZE == 0)
if (current_length % SF_PACKET_SIZE == 0)
{
// only full packets available
bytes_to_read = REC_MAX_PACKET_SIZE;
bytes_to_read = SF_PACKET_SIZE;
}
else
{
// partial packet
bytes_to_read = current_length % REC_MAX_PACKET_SIZE;
bytes_to_read = current_length % SF_PACKET_SIZE;
}

#ifdef REC_DEBUG
Expand All @@ -321,14 +321,14 @@ int Recorder::getLastPacket(void* pBuffer,
#ifdef REC_DEBUG
SF_OSAL_printf("Buffer overflow!" __NL__);
#endif
return -1;
return -3;
}

newLength = current_length - bytes_to_read;
session.seek(newLength);
bytesRead = session.read(pBuffer, bytes_to_read);
snprintf((char*)pName, nameLen, "Sfin-%s-%s-%d", pSystemDesc->deviceID,
name, newLength / REC_MAX_PACKET_SIZE);
name, newLength / SF_PACKET_SIZE);
session.close();
return bytesRead;
}
Expand All @@ -337,7 +337,8 @@ int Recorder::getLastPacket(void* pBuffer,
* @brief Trims the last block with specified length from the recorder
*
* @param len Length of block to trim
* @return int 1 if successful, otherwise 0
* @return int 0 if successful, otherwise error code. -1 if another session is
* already open. -2 if previous session unopenable.
*/
int Recorder::popLastPacket(size_t len)
{
Expand All @@ -362,19 +363,19 @@ int Recorder::popLastPacket(size_t len)
#ifdef REC_DEBUG
SF_OSAL_printf("Failed to open last session" __NL__);
#endif
return -1;
return -2;
}

current_length = session.getLength();
if (current_length % REC_MAX_PACKET_SIZE == 0)
if (current_length % SF_PACKET_SIZE == 0)
{
// only full packets available
bytes_to_pop = REC_MAX_PACKET_SIZE;
bytes_to_pop = SF_PACKET_SIZE;
}
else
{
// partial packet
bytes_to_pop = current_length % REC_MAX_PACKET_SIZE;
bytes_to_pop = current_length % SF_PACKET_SIZE;
}

newLength = current_length - bytes_to_pop;
Expand All @@ -388,7 +389,7 @@ int Recorder::popLastPacket(size_t len)
session.truncate(newLength);
session.close();
}
return 1;
return 0;
}

int Recorder::setSessionTime(uint32_t session_time)
Expand Down Expand Up @@ -473,16 +474,16 @@ int Recorder::putBytes(const void* pData, size_t nBytes)
FLOG_AddError(FLOG_REC_SESSION_CLOSED, 0);
return 1;
}
if (nBytes > (REC_MAX_PACKET_SIZE - this->dataIdx))
if (nBytes > (SF_PACKET_SIZE - this->dataIdx))
{
// data will not fit, flush and clear
// pad 0
for (; this->dataIdx < REC_MAX_PACKET_SIZE; this->dataIdx++)
for (; this->dataIdx < SF_PACKET_SIZE; this->dataIdx++)
{
this->dataBuffer[this->dataIdx] = 0;
}
// SF_OSAL_printf("Flushing\n");
this->pSession->write(this->dataBuffer, REC_MAX_PACKET_SIZE);
this->pSession->write(this->dataBuffer, SF_PACKET_SIZE);

memset(this->dataBuffer, 0, REC_MEMORY_BUFFER_SIZE);
this->dataIdx = 0;
Expand Down
Loading

0 comments on commit 850a5ec

Please sign in to comment.