From c663f87eb0c04a4dd60bde94921700da1e8ff228 Mon Sep 17 00:00:00 2001 From: Ash Beitz <8304894+ashbeitz@users.noreply.github.com> Date: Wed, 1 May 2024 10:27:03 -0700 Subject: [PATCH] Digital Twin Graph --- .../src/digital_twin_graph_impl.rs | 57 +++++++++++-------- samples/graph/consumer/src/main.rs | 3 + .../graph/seat_massager_provider/src/main.rs | 3 + .../src/request_impl.rs | 3 + .../graph/vehicle_core_provider/src/main.rs | 3 + .../vehicle_core_provider/src/request_impl.rs | 3 + 6 files changed, 47 insertions(+), 25 deletions(-) diff --git a/core/module/digital_twin_graph/src/digital_twin_graph_impl.rs b/core/module/digital_twin_graph/src/digital_twin_graph_impl.rs index 61c1aa9d..e535ae49 100644 --- a/core/module/digital_twin_graph/src/digital_twin_graph_impl.rs +++ b/core/module/digital_twin_graph/src/digital_twin_graph_impl.rs @@ -15,7 +15,7 @@ use core_protobuf_data_access::module::digital_twin_registry::v1::{ EndpointInfo, FindByInstanceIdRequest, FindByInstanceIdResponse, FindByModelIdRequest, FindByModelIdResponse, }; -use log::{debug, info, warn}; +use log::{debug, warn}; use std::sync::Arc; use tokio::sync::broadcast; use tokio::time::{sleep, timeout, Duration}; @@ -36,8 +36,13 @@ pub struct DigitalTwinGraphImpl { } impl DigitalTwinGraphImpl { + /// The base duration in milliseconds for the backoff strategy. const BACKOFF_BASE_DURATION_IN_MILLIS: u64 = 100; + + /// The maximum number of retries for the backoff strategy. const MAX_RETRIES: usize = 100; + + /// The timeout period in milliseconds for the backoff strategy. const TIMEOUT_PERIOD_IN_MILLIS: u64 = 5000; /// Create a new instance of a DigitalTwinGraphImpl. @@ -69,14 +74,14 @@ impl DigitalTwinGraphImpl { }) } - /// Use the Digital Twin Registery service to discover the endpoints for digital twin providers that support the specified model id, protocol and operations. - /// Note: This operation will be retried when there is a failure. + /// Use the Digital Twin Registery service to find the endpoints for digital twin providers that support + /// the specified model id, protocol and operations. /// /// # Arguments /// * `model_id` - The matching model id. /// * `protocol` - The required protocol. /// * `operations` - The required operations. - pub async fn discover_digital_twin_providers_with_model_id( + pub async fn find_digital_twin_providers_with_model_id( &self, model_id: &str, protocol: &str, @@ -112,14 +117,13 @@ impl DigitalTwinGraphImpl { .collect()) } - /// Use the Digital Twin Registry service to discover the endpoints for digital twin providers that support the specified instance id, protocol and operations. - /// Note: This operation will be retried when there is a failure. + /// Use the Digital Twin Registry service to find the endpoints for digital twin providers that support the specified instance id, protocol and operations. /// /// # Arguments /// * `instance_id` - The matching instance id. /// * `protocol` - The required protocol. /// * `operations` - The required operations. - pub async fn discover_digital_twin_providers_with_instance_id( + pub async fn find_digital_twin_providers_with_instance_id( &self, instance_id: &str, protocol: &str, @@ -174,7 +178,7 @@ impl DigitalTwinGraph for DigitalTwinGraphImpl { // Retrieve the provider details. let provider_endpoint_info_list = self - .discover_digital_twin_providers_with_model_id( + .find_digital_twin_providers_with_model_id( model_id.as_str(), digital_twin_protocol::GRPC, &[digital_twin_operation::GET.to_string()], @@ -182,6 +186,10 @@ impl DigitalTwinGraph for DigitalTwinGraphImpl { .await .map_err(tonic::Status::internal)?; + // TODO: Organize result into a map of instance id to associated endpoint infos. + // This will allow us to ensure that each instance's values only appears + // once in the response. We can also try other providers when one fails to respond. + let mut values = vec![]; for provider_endpoint_info in &provider_endpoint_info_list { @@ -228,7 +236,9 @@ impl DigitalTwinGraph for DigitalTwinGraphImpl { let mut answer_request: AnswerRequest = Default::default(); let mut attempts_after_failure = 0; while attempts_after_failure < Self::MAX_RETRIES { - match timeout(Duration::from_secs(5), rx.recv()).await { + match timeout(Duration::from_millis(Self::TIMEOUT_PERIOD_IN_MILLIS), rx.recv()) + .await + { Ok(Ok(request)) => { if ask_id == request.ask_id { // We have received the answer request that we are expecting. @@ -236,19 +246,18 @@ impl DigitalTwinGraph for DigitalTwinGraphImpl { break; } else { // Ignore this answer request, as it is not the one that we are expecting. - warn!("Received an unexpected answer request with ask_id '{}'. We will retry in a moment.", request.ask_id); // Immediately try again. This was not a failure, so we do not increment attempts_after_failure or sleep. continue; } } Ok(Err(error_message)) => { - warn!("Failed to receive the answer request. The error message is '{}'. We will retry in a moment.", error_message); + warn!("Failed to receive the answer request. The error message is '{}'. We may retry in a moment.", error_message); sleep(Duration::from_millis(Self::BACKOFF_BASE_DURATION_IN_MILLIS)).await; attempts_after_failure += 1; continue; } Err(error_message) => { - warn!("Failed to receive the answer request. The error message is '{}'. We will retry in a moment.", error_message); + warn!("Failed to receive the answer request. The error message is '{}'. We may retry in a moment.", error_message); sleep(Duration::from_millis(Self::BACKOFF_BASE_DURATION_IN_MILLIS)).await; attempts_after_failure += 1; continue; @@ -281,11 +290,11 @@ impl DigitalTwinGraph for DigitalTwinGraphImpl { let instance_id = get_request.instance_id; let member_path = get_request.member_path; - info!("Received a get request for instance id {instance_id}"); + debug!("Received a get request for instance id {instance_id}"); // Retrieve the provider details. let provider_endpoint_info_list = self - .discover_digital_twin_providers_with_instance_id( + .find_digital_twin_providers_with_instance_id( instance_id.as_str(), digital_twin_protocol::GRPC, &[digital_twin_operation::GET.to_string()], @@ -349,19 +358,18 @@ impl DigitalTwinGraph for DigitalTwinGraphImpl { break; } else { // Ignore this answer request, as it is not the one that we are expecting. - warn!("Received an unexpected answer request with ask_id '{}'. We will retry in a moment.", request.ask_id); // Immediately try again. This was not a failure, so we do not increment attempts_after_failure or sleep. continue; } } Ok(Err(error_message)) => { - warn!("Failed to receive the answer request. The error message is '{}'. We will retry in a moment.", error_message); + warn!("Failed to receive the answer request. The error message is '{}'. We may retry in a moment.", error_message); sleep(Duration::from_millis(Self::BACKOFF_BASE_DURATION_IN_MILLIS)).await; attempts_after_failure += 1; continue; } Err(error_message) => { - warn!("Failed to receive the answer request. The error message is '{}'. We will retry in a moment.", error_message); + warn!("Failed to receive the answer request. The error message is '{}'. We may retry in a moment.", error_message); sleep(Duration::from_millis(Self::BACKOFF_BASE_DURATION_IN_MILLIS)).await; attempts_after_failure += 1; continue; @@ -369,7 +377,7 @@ impl DigitalTwinGraph for DigitalTwinGraphImpl { } } - info!( + debug!( "Received an answer request. The ask_id is '{}'. The payload is '{}", answer_request.ask_id, answer_request.payload ); @@ -403,11 +411,11 @@ impl DigitalTwinGraph for DigitalTwinGraphImpl { let member_path = invoke_request.member_path; let request_payload = invoke_request.request_payload; - info!("Received an invoke request for instance id {instance_id}"); + debug!("Received an invoke request for instance id {instance_id}"); // Retrieve the provider details. let provider_endpoint_info_list = self - .discover_digital_twin_providers_with_instance_id( + .find_digital_twin_providers_with_instance_id( instance_id.as_str(), digital_twin_protocol::GRPC, &[digital_twin_operation::INVOKE.to_string()], @@ -464,7 +472,7 @@ impl DigitalTwinGraph for DigitalTwinGraphImpl { let mut attempts_after_failure = 0; const MAX_ATTEMPTS_AFTER_FAILURE: u8 = 10; while attempts_after_failure < MAX_ATTEMPTS_AFTER_FAILURE { - match timeout(Duration::from_secs(5), rx.recv()).await { + match timeout(Duration::from_millis(Self::TIMEOUT_PERIOD_IN_MILLIS), rx.recv()).await { Ok(Ok(request)) => { if ask_id == request.ask_id { // We have received the answer request that we are expecting. @@ -472,19 +480,18 @@ impl DigitalTwinGraph for DigitalTwinGraphImpl { break; } else { // Ignore this answer request, as it is not the one that we are expecting. - warn!("Received an unexpected answer request with ask_id '{}'. We will retry in a moment.", request.ask_id); // Immediately try again. This was not a failure, so we do not increment attempts_after_failure or sleep. continue; } } Ok(Err(error_message)) => { - warn!("Failed to receive the answer request. The error message is '{}'. We will retry in a moment.", error_message); + warn!("Failed to receive the answer request. The error message is '{}'. We may retry in a moment.", error_message); sleep(Duration::from_secs(1)).await; attempts_after_failure += 1; continue; } Err(error_message) => { - warn!("Failed to receive the answer request. The error message is '{}'. We will retry in a moment.", error_message); + warn!("Failed to receive the answer request. The error message is '{}'. We may retry in a moment.", error_message); sleep(Duration::from_secs(1)).await; attempts_after_failure += 1; continue; @@ -492,7 +499,7 @@ impl DigitalTwinGraph for DigitalTwinGraphImpl { } } - info!( + debug!( "Received an answer request. The ask_id is '{}'. The payload is '{}", answer_request.ask_id, answer_request.payload ); diff --git a/samples/graph/consumer/src/main.rs b/samples/graph/consumer/src/main.rs index 9a72564f..157ef3d5 100644 --- a/samples/graph/consumer/src/main.rs +++ b/samples/graph/consumer/src/main.rs @@ -15,7 +15,10 @@ use samples_protobuf_data_access::digital_twin_graph::v1::digital_twin_graph::{F use tokio_retry::Retry; use tokio_retry::strategy::{ExponentialBackoff, jitter}; +// The base duration in milliseconds for the exponential backoff strategy. const BACKOFF_BASE_DURATION_IN_MILLIS: u64 = 100; + +// The maximum number of retries for the exponential backoff strategy. const MAX_RETRIES: usize = 100; /// Connect to the digital twin graph service. diff --git a/samples/graph/seat_massager_provider/src/main.rs b/samples/graph/seat_massager_provider/src/main.rs index 40f95640..ac82cc50 100644 --- a/samples/graph/seat_massager_provider/src/main.rs +++ b/samples/graph/seat_massager_provider/src/main.rs @@ -25,7 +25,10 @@ use tokio_retry::strategy::{ExponentialBackoff, jitter}; use crate::request_impl::{InstanceData, ProviderState, RequestImpl}; +/// The base duration in milliseconds for the exponential backoff strategy. const BACKOFF_BASE_DURATION_IN_MILLIS: u64 = 100; + +/// The maximum number of retries. const MAX_RETRIES: usize = 100; /// Add an entry to the instance map. diff --git a/samples/graph/seat_massager_provider/src/request_impl.rs b/samples/graph/seat_massager_provider/src/request_impl.rs index b9ca1ea0..55929a59 100644 --- a/samples/graph/seat_massager_provider/src/request_impl.rs +++ b/samples/graph/seat_massager_provider/src/request_impl.rs @@ -44,7 +44,10 @@ pub struct RequestImpl { /// The implementation for the Request interface, which is used to handle requests from the consumer. impl RequestImpl { + /// The base duration for the exponential backoff strategy in milliseconds. const BACKOFF_BASE_DURATION_IN_MILLIS: u64 = 100; + + /// The maximum number of retries. const MAX_RETRIES: usize = 100; /// Get implementation. diff --git a/samples/graph/vehicle_core_provider/src/main.rs b/samples/graph/vehicle_core_provider/src/main.rs index 95a2ba74..8e1b14c4 100644 --- a/samples/graph/vehicle_core_provider/src/main.rs +++ b/samples/graph/vehicle_core_provider/src/main.rs @@ -25,7 +25,10 @@ use tokio_retry::strategy::{ExponentialBackoff, jitter}; use crate::request_impl::{InstanceData, ProviderState, RequestImpl}; +/// The base duration in milliseconds for the exponential backoff strategy. const BACKOFF_BASE_DURATION_IN_MILLIS: u64 = 100; + +/// The maximum number of retries. const MAX_RETRIES: usize = 100; /// Add an entry to the instance map. diff --git a/samples/graph/vehicle_core_provider/src/request_impl.rs b/samples/graph/vehicle_core_provider/src/request_impl.rs index f43ef7bb..8e19c7df 100644 --- a/samples/graph/vehicle_core_provider/src/request_impl.rs +++ b/samples/graph/vehicle_core_provider/src/request_impl.rs @@ -43,7 +43,10 @@ pub struct RequestImpl { /// The implementation for the Request interface, which is used to handle requests from the consumer. impl RequestImpl { + /// const BACKOFF_BASE_DURATION_IN_MILLIS: u64 = 100; + + /// const MAX_RETRIES: usize = 100; /// Get implementation.