Skip to content

Commit

Permalink
feat: update kit_last_seen on ingesting measurement
Browse files Browse the repository at this point in the history
  • Loading branch information
tomcur committed Mar 18, 2024
1 parent a110522 commit ea6f0af
Showing 1 changed file with 21 additions and 0 deletions.
21 changes: 21 additions & 0 deletions astroplant-mqtt-ingest/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub(crate) struct Db {
get_config_and_kit: Statement,
insert_raw_measurement: Statement,
insert_aggregate_measurement: Statement,
upsert_kit_last_seen: Statement,
}

const GET_CONFIG_AND_KIT: &str = "
Expand All @@ -46,6 +47,13 @@ const INSERT_AGGREGATE_MEASUREMENT: &str = "
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
";

const UPSERT_KIT_LAST_SEEN: &str = "
INSERT INTO kit_last_seen (kit_id, datetime_last_seen)
VALUES ($1, NOW())
ON CONFLICT (kit_id) DO UPDATE
SET datetime_last_seen = EXCLUDED.datetime_last_seen
";

impl Db {
pub(crate) async fn new(client: Client) -> anyhow::Result<Self> {
let get_config_and_kit = client
Expand Down Expand Up @@ -83,12 +91,17 @@ impl Db {
)
.await?;

let upsert_kit_last_seen = client
.prepare_typed(UPSERT_KIT_LAST_SEEN, &[Type::INT4])
.await?;

let db = Self {
config_cache: RefCell::new(HashMap::new()),
client,
get_config_and_kit,
insert_raw_measurement,
insert_aggregate_measurement,
upsert_kit_last_seen,
};
Ok(db)
}
Expand Down Expand Up @@ -171,6 +184,10 @@ impl Db {
)
.await?;

self.client
.query(&self.upsert_kit_last_seen, &[&config.kit_id])
.await?;

tracing::trace!(
"Inserted raw measurement {} of kit {} and peripheral {}",
raw.id,
Expand Down Expand Up @@ -211,6 +228,10 @@ impl Db {
)
.await?;

self.client
.query(&self.upsert_kit_last_seen, &[&config.kit_id])
.await?;

tracing::trace!(
"Inserted aggregate measurement {} of kit {} and peripheral {}",
raw.id,
Expand Down

0 comments on commit ea6f0af

Please sign in to comment.