Skip to content

Commit

Permalink
Digital Twin Graph
Browse files Browse the repository at this point in the history
  • Loading branch information
ashbeitz committed May 8, 2024
1 parent 0b5b675 commit 56ab4b9
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 143 deletions.
52 changes: 29 additions & 23 deletions core/module/digital_twin_graph/src/digital_twin_graph_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use core_protobuf_data_access::module::digital_twin_graph::v1::{
};
use core_protobuf_data_access::module::digital_twin_registry::v1::digital_twin_registry_client::DigitalTwinRegistryClient;
use core_protobuf_data_access::module::digital_twin_registry::v1::{
EndpointInfo, FindByInstanceIdRequest, FindByInstanceIdResponse, FindByModelIdRequest,
EntityAccessInfo, FindByInstanceIdRequest, FindByInstanceIdResponse, FindByModelIdRequest,
FindByModelIdResponse,
};
use log::{debug, warn};
Expand Down Expand Up @@ -76,7 +76,7 @@ impl DigitalTwinGraphImpl {
model_id: &str,
protocol: &str,
operations: &[String],
) -> Result<Vec<EndpointInfo>, tonic::Status> {
) -> Result<Vec<EntityAccessInfo>, tonic::Status> {
// Define the retry strategy.
let retry_strategy = ExponentialBackoff::from_millis(Self::BACKOFF_BASE_DURATION_IN_MILLIS)
.map(jitter) // add jitter to delays
Expand All @@ -99,11 +99,11 @@ impl DigitalTwinGraphImpl {
Ok(response
.entity_access_info_list
.iter()
.flat_map(|entity_access_info| entity_access_info.endpoint_info_list.clone())
.filter(|endpoint_info| {
endpoint_info.protocol == protocol
&& is_subset(operations, &endpoint_info.operations)
.filter(|entity_access_info| {
entity_access_info.protocol == protocol
&& is_subset(operations, &entity_access_info.operations)
})
.cloned()
.collect())
}

Expand All @@ -118,7 +118,7 @@ impl DigitalTwinGraphImpl {
instance_id: &str,
protocol: &str,
operations: &[String],
) -> Result<Vec<EndpointInfo>, tonic::Status> {
) -> Result<Vec<EntityAccessInfo>, tonic::Status> {
// Define the retry strategy.
let retry_strategy = ExponentialBackoff::from_millis(Self::BACKOFF_BASE_DURATION_IN_MILLIS)
.map(jitter) // add jitter to delays
Expand All @@ -142,11 +142,11 @@ impl DigitalTwinGraphImpl {
Ok(response
.entity_access_info_list
.iter()
.flat_map(|entity_access_info| entity_access_info.endpoint_info_list.clone())
.filter(|endpoint_info| {
endpoint_info.protocol == protocol
&& is_subset(operations, &endpoint_info.operations)
.filter(|entity_access_info| {
entity_access_info.protocol == protocol
&& is_subset(operations, &entity_access_info.operations)
})
.cloned()
.collect())
}

Expand Down Expand Up @@ -243,7 +243,7 @@ impl DigitalTwinGraph for DigitalTwinGraphImpl {
debug!("Received a find request for model id {model_id}");

// Retrieve the provider details.
let provider_endpoint_info_list = self
let provider_entity_access_info_list = self
.find_digital_twin_providers_with_model_id(
model_id.as_str(),
digital_twin_protocol::GRPC,
Expand All @@ -252,17 +252,23 @@ impl DigitalTwinGraph for DigitalTwinGraphImpl {
.await?;

// Build a map of instance id to its associated endpoint infos.
let instance_provider_map: std::collections::HashMap<String, Vec<EndpointInfo>> =
provider_endpoint_info_list
let instance_provider_map: std::collections::HashMap<String, Vec<EntityAccessInfo>> =
provider_entity_access_info_list
.iter()
.map(|provider_endpoint_info| {
(provider_endpoint_info.context.clone(), provider_endpoint_info.clone())
.map(|provider_entity_access_info| {
(
provider_entity_access_info.instance_id.clone(),
provider_entity_access_info.clone(),
)
})
.fold(
// fold is used to group the endpoint infos by instance id.
std::collections::HashMap::new(),
|mut accumulator, (instance_id, endpoint_info)| {
accumulator.entry(instance_id).or_insert_with(Vec::new).push(endpoint_info);
|mut accumulator, (instance_id, entity_access_info)| {
accumulator
.entry(instance_id)
.or_insert_with(Vec::new)
.push(entity_access_info);
accumulator
},
);
Expand All @@ -271,10 +277,10 @@ impl DigitalTwinGraph for DigitalTwinGraphImpl {

for instance_id in instance_provider_map.keys() {
// We will only use the first provider. For a high availability scenario, we can try multiple providers.
let provider_endpoint_info = &instance_provider_map[instance_id][0];
let provider_entity_access_info = &instance_provider_map[instance_id][0];

let provider_uri = provider_endpoint_info.uri.clone();
let instance_id = provider_endpoint_info.context.clone();
let provider_uri = provider_entity_access_info.uri.clone();
let instance_id = provider_entity_access_info.instance_id.clone();

let tx = self.tx.clone();
let mut rx = tx.subscribe();
Expand Down Expand Up @@ -347,7 +353,7 @@ impl DigitalTwinGraph for DigitalTwinGraphImpl {
let provider_endpoint_info = &provider_endpoint_info_list[0];

let provider_uri = provider_endpoint_info.uri.clone();
let instance_id = provider_endpoint_info.context.clone();
let instance_id = provider_endpoint_info.instance_id.clone();

let tx = self.tx.clone();
let mut rx = tx.subscribe();
Expand Down Expand Up @@ -429,7 +435,7 @@ impl DigitalTwinGraph for DigitalTwinGraphImpl {
let provider_endpoint_info = &provider_endpoint_info_list[0];

let provider_uri = provider_endpoint_info.uri.clone();
let instance_id = provider_endpoint_info.context.clone();
let instance_id = provider_endpoint_info.instance_id.clone();

let tx = self.tx.clone();
let mut rx = tx.subscribe();
Expand Down
108 changes: 44 additions & 64 deletions core/module/digital_twin_registry/src/digital_twin_registry_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ extern crate iref;

use core_protobuf_data_access::module::digital_twin_registry::v1::digital_twin_registry_server::DigitalTwinRegistry;
use core_protobuf_data_access::module::digital_twin_registry::v1::{
EndpointInfo, EntityAccessInfo, FindByInstanceIdRequest, FindByInstanceIdResponse,
FindByModelIdRequest, FindByModelIdResponse, RegisterRequest, RegisterResponse,
EntityAccessInfo, FindByInstanceIdRequest, FindByInstanceIdResponse, FindByModelIdRequest,
FindByModelIdResponse, RegisterRequest, RegisterResponse,
};
use log::{debug, info};
use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
Expand Down Expand Up @@ -69,43 +69,29 @@ impl DigitalTwinRegistry for DigitalTwinRegistryImpl {

debug!("Received a find_by_instance_id request for instance id {instance_id}");

let mut new_entity_access_info_list = Vec::<EntityAccessInfo>::new();
let mut matching_entity_access_info_list = Vec::<EntityAccessInfo>::new();

// This block controls the lifetime of the lock.
{
let lock: RwLockReadGuard<HashMap<String, Vec<EntityAccessInfo>>> =
self.entity_access_info_map.read();
for entity_access_info_list in lock.values() {
for entity_access_info in entity_access_info_list {
let mut instance_found: bool = false;
let mut new_endpoint_info_list: Vec<EndpointInfo> = Vec::new();
for endpoint_info_ in entity_access_info.endpoint_info_list.iter() {
if endpoint_info_.context == instance_id {
instance_found = true;
}
new_endpoint_info_list.push(endpoint_info_.clone());
}
if instance_found {
let new_entity_access_info = EntityAccessInfo {
name: entity_access_info.name.clone(),
id: entity_access_info.id.clone(),
description: entity_access_info.description.clone(),
endpoint_info_list: new_endpoint_info_list,
};
new_entity_access_info_list.push(new_entity_access_info);
if entity_access_info.instance_id == instance_id {
matching_entity_access_info_list.push(entity_access_info.clone());
}
}
}
}

if new_entity_access_info_list.is_empty() {
if matching_entity_access_info_list.is_empty() {
return Err(Status::not_found(
"Unable to find any entities with instance id {instance_id}",
));
}

let response =
FindByInstanceIdResponse { entity_access_info_list: new_entity_access_info_list };
FindByInstanceIdResponse { entity_access_info_list: matching_entity_access_info_list };

debug!("Completed the find_by_instance_id request.");

Expand All @@ -125,11 +111,14 @@ impl DigitalTwinRegistry for DigitalTwinRegistryImpl {
for entity_access_info in &request_inner.entity_access_info_list {
self.register_entity(entity_access_info.clone()).map_err(|e| {
Status::internal(format!(
"Failed to register the entity: {}, error: {}",
entity_access_info.id, e
"Failed to register the entity with instance id: {}, error: {}",
entity_access_info.instance_id, e
))
})?;
info!("Registered the entity: {}", entity_access_info.id);
info!(
"Registered the entity with model id: {} and instance id: {}",
entity_access_info.model_id, entity_access_info.instance_id
);
}

let response = RegisterResponse {};
Expand All @@ -150,18 +139,23 @@ impl DigitalTwinRegistryImpl {
{
let mut lock: RwLockWriteGuard<HashMap<String, Vec<EntityAccessInfo>>> =
self.entity_access_info_map.write();
let get_result = lock.get(&entity_access_info.id);
let get_result = lock.get(&entity_access_info.model_id);
match get_result {
Some(_) => {
info!(
"Registered another entity access info for entity {}",
&entity_access_info.id
&entity_access_info.model_id
);
lock.get_mut(&entity_access_info.id).unwrap().push(entity_access_info.clone());
lock.get_mut(&entity_access_info.model_id)
.unwrap()
.push(entity_access_info.clone());
}
None => {
info!("Registered entity {}", &entity_access_info.id);
lock.insert(entity_access_info.id.clone(), vec![entity_access_info.clone()]);
info!("Registered entity {}", &entity_access_info.model_id);
lock.insert(
entity_access_info.model_id.clone(),
vec![entity_access_info.clone()],
);
}
};
}
Expand All @@ -178,20 +172,16 @@ mod digital_twin_registry_impl_tests {
async fn find_by_model_id_test() {
let operations = vec![String::from("Subscribe"), String::from("Unsubscribe")];

let endpoint_info = EndpointInfo {
let entity_access_info = EntityAccessInfo {
provider_id: String::from("test-provider"),
instance_id: String::from("1234567890"),
model_id: String::from("dtmi:sdv:hvac:ambient_air_temperature;1"),
protocol: String::from("grpc"),
uri: String::from("http://[::1]:40010"), // Devskim: ignore DS137138
context: String::from("1234567890"),
context: String::from(""),
operations,
};

let entity_access_info = EntityAccessInfo {
name: String::from("AmbientAirTemperature"),
id: String::from("dtmi:sdv:hvac:ambient_air_temperature;1"),
description: String::from("Ambient air temperature"),
endpoint_info_list: vec![endpoint_info],
};

let entity_access_info_map = Arc::new(RwLock::new(HashMap::new()));

let digital_twin_registry_impl =
Expand All @@ -201,7 +191,7 @@ mod digital_twin_registry_impl_tests {
{
let mut lock: RwLockWriteGuard<HashMap<String, Vec<EntityAccessInfo>>> =
entity_access_info_map.write();
lock.insert(entity_access_info.id.clone(), vec![entity_access_info.clone()]);
lock.insert(entity_access_info.model_id.clone(), vec![entity_access_info.clone()]);
}

let request = tonic::Request::new(FindByModelIdRequest {
Expand All @@ -216,10 +206,9 @@ mod digital_twin_registry_impl_tests {

let response_entity_access_info = response_inner.entity_access_info_list[0].clone();

assert_eq!(response_entity_access_info.id, "dtmi:sdv:hvac:ambient_air_temperature;1");
assert_eq!(response_entity_access_info.endpoint_info_list.len(), 1);
assert_eq!(response_entity_access_info.model_id, "dtmi:sdv:hvac:ambient_air_temperature;1");
assert_eq!(
response_entity_access_info.endpoint_info_list[0].uri,
response_entity_access_info.uri,
"http://[::1]:40010" // Devskim: ignore DS137138
);
}
Expand All @@ -228,20 +217,16 @@ mod digital_twin_registry_impl_tests {
async fn find_by_instance_id_test() {
let operations = vec![String::from("Subscribe"), String::from("Unsubscribe")];

let endpoint_info = EndpointInfo {
let entity_access_info = EntityAccessInfo {
provider_id: String::from("test-provider"),
instance_id: String::from("1234567890"),
model_id: String::from("dtmi:sdv:hvac:ambient_air_temperature;1"),
protocol: String::from("grpc"),
uri: String::from("http://[::1]:40010"), // Devskim: ignore DS137138
context: String::from("1234567890"),
context: String::from(""),
operations,
};

let entity_access_info = EntityAccessInfo {
name: String::from("AmbientAirTemperature"),
id: String::from("dtmi:sdv:hvac:ambient_air_temperature;1"),
description: String::from("Ambient air temperature"),
endpoint_info_list: vec![endpoint_info],
};

let entity_access_info_map = Arc::new(RwLock::new(HashMap::new()));

let digital_twin_registry_impl =
Expand All @@ -251,7 +236,7 @@ mod digital_twin_registry_impl_tests {
{
let mut lock: RwLockWriteGuard<HashMap<String, Vec<EntityAccessInfo>>> =
entity_access_info_map.write();
lock.insert(entity_access_info.id.clone(), vec![entity_access_info.clone()]);
lock.insert(entity_access_info.model_id.clone(), vec![entity_access_info.clone()]);
}

let request = tonic::Request::new(FindByInstanceIdRequest {
Expand All @@ -266,30 +251,25 @@ mod digital_twin_registry_impl_tests {

let response_entity_access_info = response_inner.entity_access_info_list[0].clone();

assert_eq!(response_entity_access_info.endpoint_info_list[0].context, "1234567890");
assert_eq!(response_entity_access_info.endpoint_info_list.len(), 1);
assert_eq!(response_entity_access_info.instance_id, "1234567890");
assert_eq!(
response_entity_access_info.endpoint_info_list[0].uri,
response_entity_access_info.uri,
"http://[::1]:40010" // Devskim: ignore DS137138
);
}

#[tokio::test]
async fn register_test() {
let endpoint_info = EndpointInfo {
let entity_access_info = EntityAccessInfo {
provider_id: String::from("test-provider"),
instance_id: String::from("1234567890"),
model_id: String::from("dtmi:sdv:hvac:ambient_air_temperature;1"),
protocol: String::from("grpc"),
uri: String::from("http://[::1]:40010"), // Devskim: ignore DS137138
context: String::from("1234567890"),
context: String::from(""),
operations: vec![String::from("Subscribe"), String::from("Unsubscribe")],
};

let entity_access_info = EntityAccessInfo {
name: String::from("AmbientAirTemperature"),
id: String::from("dtmi:sdv:hvac:ambient_air_temperature;1"),
description: String::from("Ambient air temperature"),
endpoint_info_list: vec![endpoint_info],
};

let entity_access_info_map = Arc::new(RwLock::new(HashMap::new()));

let digital_twin_registry_impl =
Expand Down
Loading

0 comments on commit 56ab4b9

Please sign in to comment.