Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Have location tracking in mobile-config track #907

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ tokio-util = "0"
uuid = { version = "1", features = ["v4", "serde"] }
tower-http = { version = "0", features = ["trace"] }
derive_builder = "0"
csv = "*"

[patch.crates-io]
# v0.7.0-alpha.3
Expand Down
2 changes: 1 addition & 1 deletion file_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ tracing = { workspace = true }
chrono = { workspace = true }
helium-proto = {workspace = true}
helium-crypto = {workspace = true}
csv = "*"
csv = {workspace = true}
http = {workspace = true}
aws-config = "0.51"
aws-sdk-s3 = "0.21"
Expand Down
1 change: 1 addition & 0 deletions mobile_config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ task-manager = { path = "../task_manager" }
solana-sdk = { workspace = true }
custom-tracing = { path = "../custom_tracing", features = ["grpc"] }
coverage-map = { path = "../coverage_map" }
csv = {workspace = true}

[dev-dependencies]
rand = { workspace = true }
Expand Down
3 changes: 3 additions & 0 deletions mobile_config/migrations/8_mob_rad_track_add_location.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ALTER TABLE mobile_radio_tracker
ADD COLUMN asserted_location NUMERIC,
ADD COLUMN asserted_location_changed_at TIMESTAMPTZ;
38 changes: 27 additions & 11 deletions mobile_config/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,16 @@ use helium_proto::services::{
sub_dao::SubDaoServer,
};
use mobile_config::{
admin_service::AdminService, authorization_service::AuthorizationService,
carrier_service::CarrierService, entity_service::EntityService,
gateway_service::GatewayService, hex_boosting_service::HexBoostingService, key_cache::KeyCache,
mobile_radio_tracker::MobileRadioTracker, settings::Settings, sub_dao_service::SubDaoService,
admin_service::AdminService,
authorization_service::AuthorizationService,
carrier_service::CarrierService,
entity_service::EntityService,
gateway_service::GatewayService,
hex_boosting_service::HexBoostingService,
key_cache::KeyCache,
mobile_radio_tracker::{migrate_mobile_tracker_locations, MobileRadioTracker},
settings::Settings,
sub_dao_service::SubDaoService,
};
use std::{net::SocketAddr, path::PathBuf, time::Duration};
use task_manager::{ManagedTask, TaskManager};
Expand All @@ -36,21 +42,31 @@ pub struct Cli {
impl Cli {
pub async fn run(self) -> Result<()> {
let settings = Settings::new(self.config)?;
self.cmd.run(settings).await

match self.cmd {
Cmd::Server(daemon) => daemon.run(&settings).await,
Cmd::MigrateMobileTracker(csv_file) => {
let mobile_config_pool = settings.database.connect("mobile-config-store").await?;
let metadata_pool = settings.metadata.connect("mobile-config-metadata").await?;
sqlx::migrate!().run(&mobile_config_pool).await?;
migrate_mobile_tracker_locations(mobile_config_pool, metadata_pool, &csv_file.path)
.await?;
Ok(())
}
}
}
}

#[derive(Debug, clap::Subcommand)]
pub enum Cmd {
Server(Daemon),
// Oneshot command to migrate location data for mobile tracker
MigrateMobileTracker(CsvFile),
}

impl Cmd {
pub async fn run(&self, settings: Settings) -> Result<()> {
match self {
Self::Server(cmd) => cmd.run(&settings).await,
}
}
#[derive(Debug, clap::Args)]
pub struct CsvFile {
pub path: String,
}

#[derive(Debug, clap::Args)]
Expand Down
188 changes: 180 additions & 8 deletions mobile_config/src/mobile_radio_tracker.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{collections::HashMap, time::Duration};
use std::{collections::HashMap, str::FromStr, time::Duration};

use chrono::{DateTime, Utc};
use csv::Reader;
use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt};
use sqlx::{Pool, Postgres, QueryBuilder};
use task_manager::ManagedTask;
Expand Down Expand Up @@ -77,6 +78,8 @@ pub struct TrackedMobileRadio {
pub hash: String,
pub last_changed_at: DateTime<Utc>,
pub last_checked_at: DateTime<Utc>,
pub asserted_location: Option<i64>,
pub asserted_location_changed_at: Option<DateTime<Utc>>,
}

impl TrackedMobileRadio {
Expand All @@ -86,17 +89,23 @@ impl TrackedMobileRadio {
hash: radio.hash(),
last_changed_at: radio.refreshed_at,
last_checked_at: Utc::now(),
asserted_location: radio.location,
asserted_location_changed_at: None,
}
}

fn update_from_radio(mut self, radio: &MobileRadio) -> Self {
let new_hash = radio.hash();
self.last_checked_at = Utc::now();
if self.hash != new_hash {
self.hash = new_hash;
self.last_changed_at = radio.refreshed_at;
}
if self.asserted_location != radio.location {
self.asserted_location = radio.location;
self.asserted_location_changed_at = Some(radio.refreshed_at);
}

self.last_checked_at = Utc::now();
self
}
}
Expand Down Expand Up @@ -131,7 +140,7 @@ impl MobileRadioTracker {
}

async fn run(self, mut shutdown: triggered::Listener) -> anyhow::Result<()> {
tracing::info!("starting");
tracing::info!("starting with interval: {:?}", self.interval);
let mut interval = tokio::time::interval(self.interval);

loop {
Expand Down Expand Up @@ -192,7 +201,9 @@ pub async fn get_tracked_radios(
entity_key,
hash,
last_changed_at,
last_checked_at
last_checked_at,
asserted_location::bigint,
asserted_location_changed_at
FROM mobile_radio_tracker
"#,
)
Expand Down Expand Up @@ -244,24 +255,28 @@ async fn update_tracked_radios(
) -> anyhow::Result<()> {
let mut txn = pool.begin().await?;

const BATCH_SIZE: usize = (u16::MAX / 4) as usize;
const BATCH_SIZE: usize = (u16::MAX / 6) as usize;

for chunk in tracked_radios.chunks(BATCH_SIZE) {
QueryBuilder::new(
"INSERT INTO mobile_radio_tracker(entity_key, hash, last_changed_at, last_checked_at)",
"INSERT INTO mobile_radio_tracker(entity_key, hash, last_changed_at, last_checked_at, asserted_location, asserted_location_changed_at)",
)
.push_values(chunk, |mut b, tracked_radio| {
b.push_bind(&tracked_radio.entity_key)
.push_bind(&tracked_radio.hash)
.push_bind(tracked_radio.last_changed_at)
.push_bind(tracked_radio.last_checked_at);
.push_bind(tracked_radio.last_checked_at)
.push_bind(tracked_radio.asserted_location)
.push_bind(tracked_radio.asserted_location_changed_at);
})
.push(
r#"
ON CONFLICT (entity_key) DO UPDATE SET
hash = EXCLUDED.hash,
last_changed_at = EXCLUDED.last_changed_at,
last_checked_at = EXCLUDED.last_checked_at
last_checked_at = EXCLUDED.last_checked_at,
asserted_location = EXCLUDED.asserted_location,
asserted_location_changed_at = EXCLUDED.asserted_location_changed_at
"#,
)
.build()
Expand All @@ -274,6 +289,106 @@ async fn update_tracked_radios(
Ok(())
}

// This function can be removed after migration is done.
// 1. Fill mobile_radio_tracker asserted_location from mobile_hotspot_infos
// 2. Read data from csv report. Fill mobile_radio_tracker.asserted_location_changed_at if location from csv and in mobile_hotspot_infos table matches
pub async fn migrate_mobile_tracker_locations(
mobile_config_pool: Pool<Postgres>,
metadata_pool: Pool<Postgres>,
csv_file_path: &str,
) -> anyhow::Result<()> {
// 1. Fill mobile_radio_tracker asserted_location from mobile_hotspot_infos
// get_all_mobile_radios
tracing::info!("Exporting data from mobile_hotspot_infos");
let mobile_infos = get_all_mobile_radios(&metadata_pool)
.filter(|v| futures::future::ready(v.location.is_some()))
.filter(|v| {
futures::future::ready(
v.num_location_asserts.is_some() && v.num_location_asserts.unwrap() > 0,
)
})
.collect::<Vec<_>>()
.await;

let mut txn = mobile_config_pool.begin().await?;

const BATCH_SIZE: usize = (u16::MAX / 3) as usize;

for chunk in mobile_infos.chunks(BATCH_SIZE) {
let mut query_builder = QueryBuilder::new(
"UPDATE mobile_radio_tracker AS mrt SET asserted_location = data.location
FROM ( ",
);

query_builder.push_values(chunk, |mut builder, mob_info| {
builder
.push_bind(mob_info.location)
.push_bind(&mob_info.entity_key);
});

query_builder.push(
") AS data(location, entity_key)
WHERE mrt.entity_key = data.entity_key",
);

let built = query_builder.build();
built.execute(&mut txn).await?;
}

// 2. Read data from csv report. Fill mobile_radio_tracker if and only if location from csv and in mobile_hotspot_infos table matches
let mobile_infos_map: HashMap<_, _> = mobile_infos
.into_iter()
.map(|v| (bs58::encode(v.entity_key.clone()).into_string(), v.location))
.collect();
tracing::info!("Exporting data from CSV");
let mut rdr = Reader::from_path(csv_file_path)?;
let headers = rdr.headers().unwrap().clone();
let pub_key_idx = headers.iter().position(|h| h == "public_key").unwrap();
let location_idx = headers.iter().position(|h| h == "h3").unwrap();
let time_idx = headers.iter().position(|h| h == "time").unwrap();
let mut mobile_infos_to_update: Vec<(EntityKey, DateTime<Utc>)> = vec![];

for record in rdr.records() {
let record = record?;
let pub_key: &str = record.get(pub_key_idx).unwrap();

if let Some(v) = mobile_infos_map.get(&String::from(pub_key)) {
let loc = i64::from_str_radix(record.get(location_idx).unwrap(), 16);
if v.unwrap() == loc.unwrap() {
let date: &str = record.get(time_idx).unwrap();
let date_time: DateTime<Utc> = DateTime::from_str(date).unwrap();
let entity_key = bs58::decode(pub_key.to_string()).into_vec()?;

mobile_infos_to_update.push((entity_key, date_time));
}
}
}

tracing::info!("Updating asserted_location_changed_at in db");
for chunk in mobile_infos_to_update.chunks(BATCH_SIZE) {
let mut query_builder = QueryBuilder::new(
"UPDATE mobile_radio_tracker AS mrt SET asserted_location_changed_at = data.asserted_location_changed_at
FROM ( ",
);

query_builder.push_values(chunk, |mut builder, mob_info| {
builder.push_bind(&mob_info.0).push_bind(mob_info.1);
});

query_builder.push(
") AS data(entity_key, asserted_location_changed_at)
WHERE mrt.entity_key = data.entity_key",
);

let built = query_builder.build();
built.execute(&mut txn).await?;
}

txn.commit().await?;

Ok(())
}

#[cfg(test)]
mod tests {

Expand Down Expand Up @@ -324,6 +439,63 @@ mod tests {
assert_eq!(radio.hash(), result[0].hash);
}

#[tokio::test]
async fn last_asserted_location_will_not_updated_if_nothing_changes() {
// location None
let mut radio = mobile_radio(vec![1, 2, 3]);
radio.location = None;
let tracked_radio = TrackedMobileRadio::new(&radio);
let mut tracked_radios = HashMap::new();
tracked_radios.insert(tracked_radio.entity_key.clone(), tracked_radio);

let result = identify_changes(stream::iter(vec![radio.clone()]), tracked_radios).await;

assert!(result[0].asserted_location_changed_at.is_none());
assert!(result[0].asserted_location.is_none());

// location is 1
let mut radio = mobile_radio(vec![1, 2, 3]);
radio.location = Some(1);
let tracked_radio = TrackedMobileRadio::new(&radio);
let mut tracked_radios = HashMap::new();
tracked_radios.insert(tracked_radio.entity_key.clone(), tracked_radio);

let result = identify_changes(stream::iter(vec![radio.clone()]), tracked_radios).await;
assert!(result[0].asserted_location_changed_at.is_none());
assert_eq!(result[0].asserted_location, Some(1));
}

#[tokio::test]
async fn will_update_last_asserted_location_changed_at_when_location_changes() {
let mut radio = mobile_radio(vec![1, 2, 3]);
radio.location = None;
let tracked_radio = TrackedMobileRadio::new(&radio);
radio.location = Some(1);

let mut tracked_radios = HashMap::new();
tracked_radios.insert(tracked_radio.entity_key.clone(), tracked_radio);

let result = identify_changes(stream::iter(vec![radio.clone()]), tracked_radios).await;

assert_eq!(
result[0].asserted_location_changed_at.unwrap(),
result[0].last_changed_at
);
assert_eq!(result[0].asserted_location.unwrap(), 1);

let tracked_radio = TrackedMobileRadio::new(&radio);
radio.location = Some(2);
let mut tracked_radios = HashMap::new();
tracked_radios.insert(tracked_radio.entity_key.clone(), tracked_radio);
let result = identify_changes(stream::iter(vec![radio.clone()]), tracked_radios).await;

assert_eq!(
result[0].asserted_location_changed_at.unwrap(),
result[0].last_changed_at,
);
assert_eq!(result[0].asserted_location.unwrap(), 2);
}

fn mobile_radio(entity_key: EntityKey) -> MobileRadio {
MobileRadio {
entity_key,
Expand Down