Skip to content

Commit

Permalink
Finish migration
Browse files Browse the repository at this point in the history
  • Loading branch information
kurotych committed Feb 5, 2025
1 parent 05bcb21 commit aa74962
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 10 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ tokio-util = "0"
uuid = { version = "1", features = ["v4", "serde"] }
tower-http = { version = "0", features = ["trace"] }
derive_builder = "0"
csv = "*"

[patch.crates-io]
# v0.7.0-alpha.3
Expand Down
2 changes: 1 addition & 1 deletion file_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ tracing = { workspace = true }
chrono = { workspace = true }
helium-proto = {workspace = true}
helium-crypto = {workspace = true}
csv = "*"
csv = {workspace = true}
http = {workspace = true}
aws-config = "0.51"
aws-sdk-s3 = "0.21"
Expand Down
1 change: 1 addition & 0 deletions mobile_config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ task-manager = { path = "../task_manager" }
solana-sdk = { workspace = true }
custom-tracing = { path = "../custom_tracing", features = ["grpc"] }
coverage-map = { path = "../coverage_map" }
csv = {workspace = true}

[dev-dependencies]
rand = { workspace = true }
Expand Down
1 change: 0 additions & 1 deletion mobile_config/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ impl Cli {
let mobile_config_pool = settings.database.connect("mobile-config-store").await?;
let metadata_pool = settings.metadata.connect("mobile-config-metadata").await?;
sqlx::migrate!().run(&mobile_config_pool).await?;
dbg!(&csv_file.path);
migrate_mobile_tracker_locations(mobile_config_pool, metadata_pool, &csv_file.path)
.await?;
Ok(())
Expand Down
70 changes: 62 additions & 8 deletions mobile_config/src/mobile_radio_tracker.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use std::{collections::HashMap, time::Duration};
use std::{collections::HashMap, str::FromStr, time::Duration};

use chrono::{DateTime, Utc};
use csv::Reader;
use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt};
use sqlx::{Pool, Postgres, QueryBuilder};
use task_manager::ManagedTask;

use crate::gateway_info::db::all_info_stream;

type EntityKey = Vec<u8>;

#[derive(Debug, Clone, sqlx::FromRow)]
Expand Down Expand Up @@ -290,19 +289,24 @@ async fn update_tracked_radios(
Ok(())
}

// This function can be removed after migration is done.
// 1. Fill mobile_radio_tracker asserted_location from mobile_hotspot_infos
// 2. Update mobile_tracker.asserted_location_changed_at from mobile_hotspot_infos for num_location_assert = 1
// 3. Read data from csv report. Fill mobile_radio_tracker if and only if location from csv and in mobile_hotspot_infos table matches
// 2. Read data from csv report. Fill mobile_radio_tracker if and only if location from csv and in mobile_hotspot_infos table matches
pub async fn migrate_mobile_tracker_locations(
mobile_config_pool: Pool<Postgres>,
metadata_pool: Pool<Postgres>,
_csv_file_path: &str,
csv_file_path: &str,
) -> anyhow::Result<()> {
// 1. Fill mobile_radio_tracker asserted_location from mobile_hotspot_infos
// get_all_mobile_radios
// let mobiles = get_all_mobile_radios(&metadata_pool).awa;
tracing::info!("Exporting data from mobile_hotspot_infos");
let mobile_infos = get_all_mobile_radios(&metadata_pool)
.filter(|v| futures::future::ready(v.location.is_some()))
.filter(|v| {
futures::future::ready(
v.num_location_asserts.is_some() && v.num_location_asserts.unwrap() > 0,
)
})
.collect::<Vec<_>>()
.await;

Expand All @@ -318,7 +322,7 @@ pub async fn migrate_mobile_tracker_locations(

query_builder.push_values(chunk, |mut builder, mob_info| {
builder
.push_bind(&mob_info.location)
.push_bind(mob_info.location)
.push_bind(&mob_info.entity_key);
});

Expand All @@ -330,6 +334,56 @@ pub async fn migrate_mobile_tracker_locations(
let built = query_builder.build();
built.execute(&mut txn).await?;
}

// 2. Read data from csv report. Fill mobile_radio_tracker if and only if location from csv and in mobile_hotspot_infos table matches
let mobile_infos_map: HashMap<_, _> = mobile_infos
.into_iter()
.map(|v| (bs58::encode(v.entity_key.clone()).into_string(), v.location))
.collect();
tracing::info!("Exporting data from CSV");
let mut rdr = Reader::from_path(csv_file_path)?;
let headers = rdr.headers().unwrap().clone();
let pub_key_idx = headers.iter().position(|h| h == "public_key").unwrap();
let location_idx = headers.iter().position(|h| h == "h3").unwrap();
let time_idx = headers.iter().position(|h| h == "time").unwrap();
let mut mobile_infos_to_update: Vec<(EntityKey, DateTime<Utc>)> = vec![];

for record in rdr.records() {
let record = record?;
let pub_key: &str = record.get(pub_key_idx).unwrap();

if let Some(v) = mobile_infos_map.get(&String::from(pub_key)) {
let loc = i64::from_str_radix(record.get(location_idx).unwrap(), 16);
if v.unwrap() == loc.unwrap() {
let date: &str = record.get(time_idx).unwrap();
let date_time: DateTime<Utc> = DateTime::from_str(date).unwrap();
let entity_key = bs58::decode(pub_key.to_string()).into_vec()?;

mobile_infos_to_update.push((entity_key, date_time));
}
}
}

tracing::info!("Updating asserted_location_changed_at in db");
for chunk in mobile_infos_to_update.chunks(BATCH_SIZE) {
let mut query_builder = QueryBuilder::new(
"UPDATE mobile_radio_tracker AS mrt SET asserted_location_changed_at = data.asserted_location_changed_at
FROM ( ",
);

query_builder.push_values(chunk, |mut builder, mob_info| {
builder.push_bind(&mob_info.0).push_bind(mob_info.1);
});

query_builder.push(
") AS data(entity_key, asserted_location_changed_at)
WHERE mrt.entity_key = data.entity_key",
);

let built = query_builder.build();
built.execute(&mut txn).await?;
}

txn.commit().await?;

Ok(())
Expand Down

0 comments on commit aa74962

Please sign in to comment.