diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index a99314f76..988c6a22e 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -98,7 +98,7 @@ jobs: cancel-in-progress: true services: postgres: - image: postgres + image: postgres:14.9-alpine env: POSTGRES_PASSWORD: postgres options: >- @@ -208,4 +208,4 @@ jobs: PACKAGECLOUD_API_KEY: ${{ secrets.PACKAGECLOUD_API_KEY }} run: | chmod +x ./.github/scripts/make_debian.sh - ./.github/scripts/make_debian.sh \ No newline at end of file + ./.github/scripts/make_debian.sh diff --git a/mobile_verifier/src/heartbeats/last_location.rs b/mobile_verifier/src/heartbeats/last_location.rs index cb6e45c76..1c478c370 100644 --- a/mobile_verifier/src/heartbeats/last_location.rs +++ b/mobile_verifier/src/heartbeats/last_location.rs @@ -5,7 +5,9 @@ use helium_crypto::PublicKeyBinary; use retainer::Cache; use sqlx::PgPool; -#[derive(sqlx::FromRow, Copy, Clone)] +use super::Heartbeat; + +#[derive(Debug, sqlx::FromRow, Copy, Clone, PartialEq)] pub struct LastLocation { pub location_validation_timestamp: DateTime, pub latest_timestamp: DateTime, @@ -28,9 +30,31 @@ impl LastLocation { } } - /// Calculates the duration from now in which last_valid_timestamp is 12 hours old - pub fn duration_to_expiration(&self) -> Duration { - ((self.latest_timestamp + Duration::hours(12)) - Utc::now()).max(Duration::zero()) + pub fn from_heartbeat( + heartbeat: &Heartbeat, + location_validation_timestamp: DateTime, + ) -> Self { + Self::new( + location_validation_timestamp, + heartbeat.timestamp, + heartbeat.lat, + heartbeat.lon, + ) + } + + fn still_valid(&self, heartbeat_timestamp: DateTime) -> bool { + let diff = heartbeat_timestamp - self.location_validation_timestamp; + diff <= Duration::hours(24) + } + + fn cache_expiration_duration(&self) -> Option { + // A validation_timestamp is valid for 24 hours past itself, + // but could still be in the past + let until = self.location_validation_timestamp + Duration::hours(24); + let diff = until - Utc::now(); + + // Converting to_ std() with a negative Duration casts to None + diff.to_std().ok() } } @@ -56,9 +80,36 @@ impl LocationCache { } } - async fn fetch_from_db_and_set( + pub async fn set(&self, hotspot: &PublicKeyBinary, last_location: LastLocation) { + self.cache_last_location(hotspot, Some(last_location)).await; + } + + pub async fn get( &self, hotspot: &PublicKeyBinary, + heartbeat_timestamp: DateTime, + ) -> anyhow::Result> { + let location = match self.locations.get(hotspot).await { + Some(last_location) => { + // The value may still be cached according to the system clock + // but not valid based on the time of the heartbeat in question. + let last = *last_location; + last.filter(|l| l.still_valid(heartbeat_timestamp)) + } + None => { + let last = self.fetch_from_db(hotspot, heartbeat_timestamp).await?; + self.cache_last_location(hotspot, last).await; + last + } + }; + + Ok(location) + } + + async fn fetch_from_db( + &self, + hotspot: &PublicKeyBinary, + heartbeat_timestamp: DateTime, ) -> anyhow::Result> { let last_location: Option = sqlx::query_as( r#" @@ -67,58 +118,203 @@ impl LocationCache { WHERE location_validation_timestamp IS NOT NULL AND latest_timestamp >= $1 AND hotspot_key = $2 + AND $3 - location_validation_timestamp <= INTERVAL '24 hours' ORDER BY latest_timestamp DESC LIMIT 1 "#, ) - .bind(Utc::now() - Duration::hours(12)) + .bind(Utc::now() - Duration::hours(24)) .bind(hotspot) + .bind(heartbeat_timestamp) .fetch_optional(&self.pool) .await?; - self.locations - .insert( - hotspot.clone(), - last_location, - last_location - .map(|x| x.duration_to_expiration()) - .unwrap_or_else(|| Duration::days(365)) - .to_std()?, - ) - .await; + Ok(last_location) } - pub async fn fetch_last_location( + async fn cache_last_location( &self, hotspot: &PublicKeyBinary, - ) -> anyhow::Result> { - Ok( - if let Some(last_location) = self.locations.get(hotspot).await { - *last_location - } else { - self.fetch_from_db_and_set(hotspot).await? - }, - ) + last_location: Option, + ) { + match location_with_expiration(last_location) { + Some((last, cache_duration)) => { + self.locations + .insert(hotspot.clone(), Some(last), cache_duration) + .await; + } + None => { + self.locations + .insert(hotspot.clone(), None, Duration::days(365).to_std().unwrap()) + .await; + } + } } - pub async fn set_last_location( - &self, + /// Only used for testing. + pub async fn delete_last_location(&self, hotspot: &PublicKeyBinary) { + self.locations.remove(hotspot).await; + } +} + +fn location_with_expiration( + last_location: Option, +) -> Option<(LastLocation, std::time::Duration)> { + let last = last_location?; + let cache_duration = last.cache_expiration_duration()?; + Some((last, cache_duration)) +} + +#[cfg(test)] +mod tests { + use super::*; + + use chrono::{Duration, DurationRound, Utc}; + use helium_crypto::PublicKeyBinary; + use sqlx::PgPool; + use uuid::Uuid; + + // Make sure test timestamps and DB timestamps have the same granularity. + // 6 decimal places. + fn nanos_trunc(ts: DateTime) -> DateTime { + ts.duration_trunc(Duration::nanoseconds(1000)).unwrap() + } + fn hour_trunc(ts: DateTime) -> DateTime { + ts.duration_trunc(Duration::hours(1)).unwrap() + } + + async fn insert_heartbeat( + pool: &PgPool, hotspot: &PublicKeyBinary, - last_location: LastLocation, + received_timestamp: DateTime, + validation_timestamp: DateTime, ) -> anyhow::Result<()> { - let duration_to_expiration = last_location.duration_to_expiration(); - self.locations - .insert( - hotspot.clone(), - Some(last_location), - duration_to_expiration.to_std()?, - ) - .await; + sqlx::query( + r#" + INSERT INTO wifi_heartbeats + ( + hotspot_key, location_validation_timestamp, latest_timestamp, + truncated_timestamp, coverage_object, + + -- hardcoded values + lat, lon, cell_type, distance_to_asserted, location_trust_score_multiplier + ) + VALUES + ( + $1, $2, $3, $4, $5, + + -- harcoded values + 0.0, 0.0, 'novagenericwifiindoor', 0, 1000 + ) + "#, + ) + .bind(hotspot) + .bind(nanos_trunc(validation_timestamp)) + .bind(nanos_trunc(received_timestamp)) + .bind(hour_trunc(received_timestamp)) + .bind(Uuid::new_v4()) + .execute(pool) + .await?; + Ok(()) } - /// Only used for testing. - pub async fn delete_last_location(&self, hotspot: &PublicKeyBinary) { - self.locations.remove(hotspot).await; + fn test_last_location( + latest_timestamp: DateTime, + location_validation_timestamp: DateTime, + ) -> LastLocation { + LastLocation { + location_validation_timestamp: nanos_trunc(location_validation_timestamp), + latest_timestamp: nanos_trunc(latest_timestamp), + lat: 0.0, + lon: 0.0, + } + } + + #[sqlx::test] + async fn test_invalid_validation_timestamp(pool: PgPool) -> anyhow::Result<()> { + let now = Utc::now(); + + let hotspot_one = PublicKeyBinary::from(vec![1]); + let stale_timestamp = now - Duration::hours(24) - Duration::seconds(1); + insert_heartbeat(&pool, &hotspot_one, now, stale_timestamp).await?; + + let hotspot_two = PublicKeyBinary::from(vec![2]); + let limit_timestamp = now - Duration::hours(24); + insert_heartbeat(&pool, &hotspot_two, now, limit_timestamp).await?; + + let hotspot_three = PublicKeyBinary::from(vec![3]); + let good_timestamp = now - Duration::hours(12); + insert_heartbeat(&pool, &hotspot_three, now, good_timestamp).await?; + + let cache = LocationCache::new(&pool); + assert_eq!( + None, + cache.get(&hotspot_one, now).await?, + "Invalid timestamp current" + ); + assert_eq!( + Some(test_last_location(now, limit_timestamp)), + cache.get(&hotspot_two, now).await?, + "Limit timestamp current" + ); + assert_eq!( + Some(test_last_location(now, good_timestamp)), + cache.get(&hotspot_three, now).await?, + "Good timestamp current" + ); + + // Moving an 1 day into the future should invalidate all timestamps + // regardless of what has already been cached. + let future = now + Duration::days(1); + assert_eq!( + None, + cache.get(&hotspot_one, future).await?, + "Invalid timestamp future" + ); + assert_eq!( + None, + cache.get(&hotspot_two, future).await?, + "Limit timestamp future" + ); + assert_eq!( + None, + cache.get(&hotspot_three, future).await?, + "Good timestamp future" + ); + + Ok(()) + } + + #[sqlx::test] + async fn will_not_cache_invalid_validation_timestamps(pool: PgPool) -> anyhow::Result<()> { + let cache = LocationCache::new(&pool); + + let now = Utc::now(); + let validation_timestamp = now - Duration::hours(25); + + let hotspot = PublicKeyBinary::from(vec![1]); + let invalid_location = test_last_location(now, validation_timestamp); + cache.set(&hotspot, invalid_location).await; + + assert_eq!(None, cache.get(&hotspot, now).await?); + + Ok(()) + } + + #[sqlx::test] + async fn will_cache_valid_validation_timestamps(pool: PgPool) -> anyhow::Result<()> { + let cache = LocationCache::new(&pool); + + let now = Utc::now(); + let validation_timestamp = now - Duration::hours(12); + + let hotspot = PublicKeyBinary::from(vec![1]); + let valid_location = test_last_location(now, validation_timestamp); + cache.set(&hotspot, valid_location).await; + + assert_eq!(Some(valid_location), cache.get(&hotspot, now).await?); + + Ok(()) } } diff --git a/mobile_verifier/src/heartbeats/mod.rs b/mobile_verifier/src/heartbeats/mod.rs index 989f9d187..0f6aed3b2 100644 --- a/mobile_verifier/src/heartbeats/mod.rs +++ b/mobile_verifier/src/heartbeats/mod.rs @@ -263,6 +263,11 @@ impl From for Heartbeat { impl From for Heartbeat { fn from(value: WifiHeartbeatIngestReport) -> Self { + let received_timestamp = value.received_timestamp; + let location_validation_timestamp = value + .report + .location_validation_timestamp + .filter(|ts| received_timestamp.signed_duration_since(ts) <= Duration::hours(24)); Self { hb_type: HbType::Wifi, coverage_object: value.report.coverage_object(), @@ -271,9 +276,9 @@ impl From for Heartbeat { operation_mode: value.report.operation_mode, lat: value.report.lat, lon: value.report.lon, - location_validation_timestamp: value.report.location_validation_timestamp, + location_validation_timestamp, location_source: value.report.location_source, - timestamp: value.received_timestamp, + timestamp: received_timestamp, } } } @@ -519,7 +524,7 @@ impl ValidatedHeartbeat { let is_valid = match heartbeat.location_validation_timestamp { None => { if let Some(last_location) = last_location_cache - .fetch_last_location(&heartbeat.hotspot_key) + .get(&heartbeat.hotspot_key, heartbeat.timestamp) .await? { heartbeat.lat = last_location.lat; @@ -535,16 +540,14 @@ impl ValidatedHeartbeat { } Some(location_validation_timestamp) => { last_location_cache - .set_last_location( + .set( &heartbeat.hotspot_key, - LastLocation::new( + LastLocation::from_heartbeat( + &heartbeat, location_validation_timestamp, - heartbeat.timestamp, - heartbeat.lat, - heartbeat.lon, ), ) - .await?; + .await; true } }; @@ -785,6 +788,7 @@ mod test { use crate::seniority::SeniorityUpdateAction; use super::*; + use file_store::wifi_heartbeat::WifiHeartbeat; use proto::SeniorityUpdateReason::*; fn heartbeat(timestamp: DateTime, coverage_object: Uuid) -> ValidatedHeartbeat { @@ -970,4 +974,46 @@ mod test { assert_eq!(seniority_action.action, SeniorityUpdateAction::NoAction); Ok(()) } + + #[test] + fn wifi_heartbeat_cannot_contain_validation_timestamp() { + fn make_report( + received: DateTime, + valid_at: Option>, + ) -> WifiHeartbeatIngestReport { + WifiHeartbeatIngestReport { + received_timestamp: received, + report: WifiHeartbeat { + pubkey: PublicKeyBinary::from(vec![0]), + lat: 0.0, + lon: 0.0, + operation_mode: true, + location_validation_timestamp: valid_at, + coverage_object: vec![], + timestamp: Utc::now(), + location_source: LocationSource::Skyhook, + }, + } + } + + let received = Utc::now(); + + // validation timestamp is invalid after 24 hours + let valid_at = received - Duration::hours(25); + let hb = Heartbeat::from(make_report(received, Some(valid_at))); + assert_eq!(None, hb.location_validation_timestamp); + + // 24 hours is inclusive + let valid_at = received - Duration::hours(24); + let hb = Heartbeat::from(make_report(received, Some(valid_at))); + assert_eq!(Some(valid_at), hb.location_validation_timestamp); + + // sanity check + let valid_at = received - Duration::hours(0); + let hb = Heartbeat::from(make_report(received, Some(valid_at))); + assert_eq!(Some(valid_at), hb.location_validation_timestamp); + + let hb = Heartbeat::from(make_report(received, None)); + assert_eq!(None, hb.location_validation_timestamp); + } } diff --git a/mobile_verifier/tests/integrations/last_location.rs b/mobile_verifier/tests/integrations/last_location.rs index c04d0678d..6a204e0a3 100644 --- a/mobile_verifier/tests/integrations/last_location.rs +++ b/mobile_verifier/tests/integrations/last_location.rs @@ -159,7 +159,7 @@ async fn heartbeat_will_use_last_good_location_from_db(pool: PgPool) -> anyhow:: } #[sqlx::test] -async fn heartbeat_does_not_use_last_good_location_when_more_than_12_hours( +async fn heartbeat_does_not_use_last_good_location_when_more_than_24_hours( pool: PgPool, ) -> anyhow::Result<()> { let hotspot = PublicKeyBinary::from_str(PUB_KEY)?; @@ -173,10 +173,13 @@ async fn heartbeat_does_not_use_last_good_location_when_more_than_12_hours( let coverage_object = coverage_object(&hotspot, &mut transaction).await?; transaction.commit().await?; + let location_validation_timestamp = Utc::now(); + let validated_heartbeat_1 = ValidatedHeartbeat::validate( heartbeat(&hotspot, &coverage_object) - .location_validation_timestamp(Utc::now()) - .timestamp(Utc::now() - Duration::hours(12) - Duration::seconds(1)) + .location_validation_timestamp(location_validation_timestamp) + // within the 24 hour window of validation timestamp + .timestamp(location_validation_timestamp - Duration::hours(24) + Duration::seconds(1)) .build(), &GatewayClientAllOwnersValid, &coverage_objects, @@ -194,6 +197,8 @@ async fn heartbeat_does_not_use_last_good_location_when_more_than_12_hours( let validated_heartbeat_2 = ValidatedHeartbeat::validate( heartbeat(&hotspot, &coverage_object) + // 24 hours past validation timestamp + .timestamp(location_validation_timestamp + Duration::hours(24) + Duration::seconds(1)) .latlng((0.0, 0.0)) .build(), &GatewayClientAllOwnersValid, diff --git a/mobile_verifier/tests/integrations/modeled_coverage.rs b/mobile_verifier/tests/integrations/modeled_coverage.rs index 3181b220b..eaddc2ff6 100644 --- a/mobile_verifier/tests/integrations/modeled_coverage.rs +++ b/mobile_verifier/tests/integrations/modeled_coverage.rs @@ -1476,7 +1476,7 @@ async fn ensure_lower_trust_score_for_distant_heartbeats(pool: PgPool) -> anyhow lon: latlng.lng(), lat: latlng.lat(), timestamp: DateTime::::MIN_UTC, - location_validation_timestamp: Some(DateTime::::MIN_UTC), + location_validation_timestamp: Some(Utc::now() - Duration::hours(23)), operation_mode: true, coverage_object: Vec::from(coverage_object_uuid.into_bytes()), location_source: LocationSource::Skyhook,