Skip to content

Commit

Permalink
Joining geometries and metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartlynn committed Apr 29, 2024
1 parent ad0492f commit e27cae5
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 10 deletions.
22 changes: 14 additions & 8 deletions src/geo.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::collections::HashMap;
use flatgeobuf::{ FeatureProperties, HttpFgbReader, geozero};
use geozero::ToWkt;
use anyhow::Result;
use anyhow::{Context, Result};
use polars::{frame::DataFrame, prelude::NamedFrom, series::Series};
use crate::data_request_spec::BBox;

/// Function to request geometries from a remotly hosted FGB
Expand All @@ -11,8 +12,7 @@ use crate::data_request_spec::BBox;
/// `bbox`: an optional bounding box to filter the features by
///
/// Returns: a Result object containing a vector of (geometry, properties).
pub async fn get_geometries(file_url:&str, bbox:Option<&BBox>) -> Result<Vec<(String, HashMap<String,String>)>>{
println!("Getting geometries");
pub async fn get_geometries(file_url:&str, bbox:Option<&BBox>) -> Result<DataFrame>{
let fgb = HttpFgbReader::open(file_url)
.await?;

Expand All @@ -25,15 +25,21 @@ pub async fn get_geometries(file_url:&str, bbox:Option<&BBox>) -> Result<Vec<(St
.select_all()
.await?
};

let mut features = Vec::new();

let mut geoms: Vec<String> = vec![];
let mut ids: Vec<String> = vec![];

while let Some(feature) = fgb.next().await? {
let props = feature.properties()?;
let geom = feature.to_wkt()?;
features.push((geom, props));
geoms.push(feature.to_wkt()?);
let id = props.get("AFFGEOID").with_context(|| "failed to get id")?;
ids.push(id.clone());
}
Ok(features)

let ids = Series::new("GEOID",ids);
let geoms = Series::new("geometry", geoms);
let result = DataFrame::new(vec![ids,geoms])?;
Ok(result)
}

#[cfg(test)]
Expand Down
7 changes: 5 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use anyhow::Result;
use data_request_spec::DataRequestSpec;
use metadata::{load_metadata, SourceDataRelease};
use parquet::{get_metrics, MetricRequest};
use polars::frame::DataFrame;
use polars::{frame::DataFrame, prelude::DataFrameJoinOps};
use tokio::try_join;

use crate::geo::get_geometries;
Expand Down Expand Up @@ -35,6 +35,9 @@ impl Popgetter {
// try_from requires us to have the errors from all futures be the same.
// We use anyhow to get it back properly
let (metrics,geoms) = try_join!(async move { metrics.await.map_err(anyhow::Error::from)}, geoms)?;
metrics

let result =metrics?.left_join(&geoms,["GEO_ID"],["GEOID"])?;
Ok(result)
}
}

1 change: 1 addition & 0 deletions src/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ fn get_metrics_from_file(
pub fn get_metrics(metrics: &[MetricRequest], geo_ids: Option<&[&str]>) -> Result<DataFrame> {
let file_list: HashSet<String> = metrics.iter().map(|m| m.file.clone()).collect();

// TODO Can we do this async so we can be downloading results from each file together?
let dfs: Result<Vec<DataFrame>> = file_list
.iter()
.map(|file_url| {
Expand Down

0 comments on commit e27cae5

Please sign in to comment.