Skip to content

Commit

Permalink
Digital Twin Graph
Browse files Browse the repository at this point in the history
  • Loading branch information
ashbeitz committed May 1, 2024
1 parent 0aa2baf commit c663f87
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 25 deletions.
57 changes: 32 additions & 25 deletions core/module/digital_twin_graph/src/digital_twin_graph_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use core_protobuf_data_access::module::digital_twin_registry::v1::{
EndpointInfo, FindByInstanceIdRequest, FindByInstanceIdResponse, FindByModelIdRequest,
FindByModelIdResponse,
};
use log::{debug, info, warn};
use log::{debug, warn};
use std::sync::Arc;
use tokio::sync::broadcast;
use tokio::time::{sleep, timeout, Duration};
Expand All @@ -36,8 +36,13 @@ pub struct DigitalTwinGraphImpl {
}

impl DigitalTwinGraphImpl {
/// The base duration in milliseconds for the backoff strategy.
const BACKOFF_BASE_DURATION_IN_MILLIS: u64 = 100;

/// The maximum number of retries for the backoff strategy.
const MAX_RETRIES: usize = 100;

/// The timeout period in milliseconds for the backoff strategy.
const TIMEOUT_PERIOD_IN_MILLIS: u64 = 5000;

/// Create a new instance of a DigitalTwinGraphImpl.
Expand Down Expand Up @@ -69,14 +74,14 @@ impl DigitalTwinGraphImpl {
})
}

/// Use the Digital Twin Registery service to discover the endpoints for digital twin providers that support the specified model id, protocol and operations.
/// Note: This operation will be retried when there is a failure.
/// Use the Digital Twin Registery service to find the endpoints for digital twin providers that support
/// the specified model id, protocol and operations.
///
/// # Arguments
/// * `model_id` - The matching model id.
/// * `protocol` - The required protocol.
/// * `operations` - The required operations.
pub async fn discover_digital_twin_providers_with_model_id(
pub async fn find_digital_twin_providers_with_model_id(
&self,
model_id: &str,
protocol: &str,
Expand Down Expand Up @@ -112,14 +117,13 @@ impl DigitalTwinGraphImpl {
.collect())
}

/// Use the Digital Twin Registry service to discover the endpoints for digital twin providers that support the specified instance id, protocol and operations.
/// Note: This operation will be retried when there is a failure.
/// Use the Digital Twin Registry service to find the endpoints for digital twin providers that support the specified instance id, protocol and operations.
///
/// # Arguments
/// * `instance_id` - The matching instance id.
/// * `protocol` - The required protocol.
/// * `operations` - The required operations.
pub async fn discover_digital_twin_providers_with_instance_id(
pub async fn find_digital_twin_providers_with_instance_id(
&self,
instance_id: &str,
protocol: &str,
Expand Down Expand Up @@ -174,14 +178,18 @@ impl DigitalTwinGraph for DigitalTwinGraphImpl {

// Retrieve the provider details.
let provider_endpoint_info_list = self
.discover_digital_twin_providers_with_model_id(
.find_digital_twin_providers_with_model_id(
model_id.as_str(),
digital_twin_protocol::GRPC,
&[digital_twin_operation::GET.to_string()],
)
.await
.map_err(tonic::Status::internal)?;

// TODO: Organize result into a map of instance id to associated endpoint infos.

Check notice

Code scanning / devskim

A "TODO" or similar was left in source code, possibly indicating incomplete functionality Note

Suspicious comment
// This will allow us to ensure that each instance's values only appears
// once in the response. We can also try other providers when one fails to respond.

let mut values = vec![];

for provider_endpoint_info in &provider_endpoint_info_list {
Expand Down Expand Up @@ -228,27 +236,28 @@ impl DigitalTwinGraph for DigitalTwinGraphImpl {
let mut answer_request: AnswerRequest = Default::default();
let mut attempts_after_failure = 0;
while attempts_after_failure < Self::MAX_RETRIES {
match timeout(Duration::from_secs(5), rx.recv()).await {
match timeout(Duration::from_millis(Self::TIMEOUT_PERIOD_IN_MILLIS), rx.recv())
.await
{
Ok(Ok(request)) => {
if ask_id == request.ask_id {
// We have received the answer request that we are expecting.
answer_request = request;
break;
} else {
// Ignore this answer request, as it is not the one that we are expecting.
warn!("Received an unexpected answer request with ask_id '{}'. We will retry in a moment.", request.ask_id);
// Immediately try again. This was not a failure, so we do not increment attempts_after_failure or sleep.
continue;
}
}
Ok(Err(error_message)) => {
warn!("Failed to receive the answer request. The error message is '{}'. We will retry in a moment.", error_message);
warn!("Failed to receive the answer request. The error message is '{}'. We may retry in a moment.", error_message);
sleep(Duration::from_millis(Self::BACKOFF_BASE_DURATION_IN_MILLIS)).await;
attempts_after_failure += 1;
continue;
}
Err(error_message) => {
warn!("Failed to receive the answer request. The error message is '{}'. We will retry in a moment.", error_message);
warn!("Failed to receive the answer request. The error message is '{}'. We may retry in a moment.", error_message);
sleep(Duration::from_millis(Self::BACKOFF_BASE_DURATION_IN_MILLIS)).await;
attempts_after_failure += 1;
continue;
Expand Down Expand Up @@ -281,11 +290,11 @@ impl DigitalTwinGraph for DigitalTwinGraphImpl {
let instance_id = get_request.instance_id;
let member_path = get_request.member_path;

info!("Received a get request for instance id {instance_id}");
debug!("Received a get request for instance id {instance_id}");

// Retrieve the provider details.
let provider_endpoint_info_list = self
.discover_digital_twin_providers_with_instance_id(
.find_digital_twin_providers_with_instance_id(
instance_id.as_str(),
digital_twin_protocol::GRPC,
&[digital_twin_operation::GET.to_string()],
Expand Down Expand Up @@ -349,27 +358,26 @@ impl DigitalTwinGraph for DigitalTwinGraphImpl {
break;
} else {
// Ignore this answer request, as it is not the one that we are expecting.
warn!("Received an unexpected answer request with ask_id '{}'. We will retry in a moment.", request.ask_id);
// Immediately try again. This was not a failure, so we do not increment attempts_after_failure or sleep.
continue;
}
}
Ok(Err(error_message)) => {
warn!("Failed to receive the answer request. The error message is '{}'. We will retry in a moment.", error_message);
warn!("Failed to receive the answer request. The error message is '{}'. We may retry in a moment.", error_message);
sleep(Duration::from_millis(Self::BACKOFF_BASE_DURATION_IN_MILLIS)).await;
attempts_after_failure += 1;
continue;
}
Err(error_message) => {
warn!("Failed to receive the answer request. The error message is '{}'. We will retry in a moment.", error_message);
warn!("Failed to receive the answer request. The error message is '{}'. We may retry in a moment.", error_message);
sleep(Duration::from_millis(Self::BACKOFF_BASE_DURATION_IN_MILLIS)).await;
attempts_after_failure += 1;
continue;
}
}
}

info!(
debug!(
"Received an answer request. The ask_id is '{}'. The payload is '{}",
answer_request.ask_id, answer_request.payload
);
Expand Down Expand Up @@ -403,11 +411,11 @@ impl DigitalTwinGraph for DigitalTwinGraphImpl {
let member_path = invoke_request.member_path;
let request_payload = invoke_request.request_payload;

info!("Received an invoke request for instance id {instance_id}");
debug!("Received an invoke request for instance id {instance_id}");

// Retrieve the provider details.
let provider_endpoint_info_list = self
.discover_digital_twin_providers_with_instance_id(
.find_digital_twin_providers_with_instance_id(
instance_id.as_str(),
digital_twin_protocol::GRPC,
&[digital_twin_operation::INVOKE.to_string()],
Expand Down Expand Up @@ -464,35 +472,34 @@ impl DigitalTwinGraph for DigitalTwinGraphImpl {
let mut attempts_after_failure = 0;
const MAX_ATTEMPTS_AFTER_FAILURE: u8 = 10;
while attempts_after_failure < MAX_ATTEMPTS_AFTER_FAILURE {
match timeout(Duration::from_secs(5), rx.recv()).await {
match timeout(Duration::from_millis(Self::TIMEOUT_PERIOD_IN_MILLIS), rx.recv()).await {
Ok(Ok(request)) => {
if ask_id == request.ask_id {
// We have received the answer request that we are expecting.
answer_request = request;
break;
} else {
// Ignore this answer request, as it is not the one that we are expecting.
warn!("Received an unexpected answer request with ask_id '{}'. We will retry in a moment.", request.ask_id);
// Immediately try again. This was not a failure, so we do not increment attempts_after_failure or sleep.
continue;
}
}
Ok(Err(error_message)) => {
warn!("Failed to receive the answer request. The error message is '{}'. We will retry in a moment.", error_message);
warn!("Failed to receive the answer request. The error message is '{}'. We may retry in a moment.", error_message);
sleep(Duration::from_secs(1)).await;
attempts_after_failure += 1;
continue;
}
Err(error_message) => {
warn!("Failed to receive the answer request. The error message is '{}'. We will retry in a moment.", error_message);
warn!("Failed to receive the answer request. The error message is '{}'. We may retry in a moment.", error_message);
sleep(Duration::from_secs(1)).await;
attempts_after_failure += 1;
continue;
}
}
}

info!(
debug!(
"Received an answer request. The ask_id is '{}'. The payload is '{}",
answer_request.ask_id, answer_request.payload
);
Expand Down
3 changes: 3 additions & 0 deletions samples/graph/consumer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ use samples_protobuf_data_access::digital_twin_graph::v1::digital_twin_graph::{F
use tokio_retry::Retry;
use tokio_retry::strategy::{ExponentialBackoff, jitter};

// The base duration in milliseconds for the exponential backoff strategy.
const BACKOFF_BASE_DURATION_IN_MILLIS: u64 = 100;

// The maximum number of retries for the exponential backoff strategy.
const MAX_RETRIES: usize = 100;

/// Connect to the digital twin graph service.
Expand Down
3 changes: 3 additions & 0 deletions samples/graph/seat_massager_provider/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ use tokio_retry::strategy::{ExponentialBackoff, jitter};

use crate::request_impl::{InstanceData, ProviderState, RequestImpl};

/// The base duration in milliseconds for the exponential backoff strategy.
const BACKOFF_BASE_DURATION_IN_MILLIS: u64 = 100;

/// The maximum number of retries.
const MAX_RETRIES: usize = 100;

/// Add an entry to the instance map.
Expand Down
3 changes: 3 additions & 0 deletions samples/graph/seat_massager_provider/src/request_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ pub struct RequestImpl {

/// The implementation for the Request interface, which is used to handle requests from the consumer.
impl RequestImpl {
/// The base duration for the exponential backoff strategy in milliseconds.
const BACKOFF_BASE_DURATION_IN_MILLIS: u64 = 100;

/// The maximum number of retries.
const MAX_RETRIES: usize = 100;

/// Get implementation.
Expand Down
3 changes: 3 additions & 0 deletions samples/graph/vehicle_core_provider/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ use tokio_retry::strategy::{ExponentialBackoff, jitter};

use crate::request_impl::{InstanceData, ProviderState, RequestImpl};

/// The base duration in milliseconds for the exponential backoff strategy.
const BACKOFF_BASE_DURATION_IN_MILLIS: u64 = 100;

/// The maximum number of retries.
const MAX_RETRIES: usize = 100;

/// Add an entry to the instance map.
Expand Down
3 changes: 3 additions & 0 deletions samples/graph/vehicle_core_provider/src/request_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ pub struct RequestImpl {

/// The implementation for the Request interface, which is used to handle requests from the consumer.
impl RequestImpl {
///
const BACKOFF_BASE_DURATION_IN_MILLIS: u64 = 100;

///
const MAX_RETRIES: usize = 100;

/// Get implementation.
Expand Down

0 comments on commit c663f87

Please sign in to comment.