Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Devkelley/find agemo w chariott #53

Merged
merged 10 commits into from
Oct 17, 2023
113 changes: 112 additions & 1 deletion core/common/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,21 @@
// SPDX-License-Identifier: MIT

use config::{Config, File, FileFormat};
use log::debug;
use core_protobuf_data_access::chariott::service_discovery::core::v1::{
service_registry_client::ServiceRegistryClient, DiscoverRequest,
};
use log::{debug, info};
use serde_derive::Deserialize;
use std::future::Future;
use tokio::time::{sleep, Duration};
use tonic::{Request, Status};

#[derive(Debug, Deserialize)]
pub struct ServiceIdentifier {
pub namespace: String,
pub name: String,
pub version: String,
}

/// Load the settings.
///
Expand Down Expand Up @@ -64,6 +76,105 @@ where
last_error
}

/// Use Chariott to discover a service.
///
/// # Arguments
/// * `chariott_uri` - Chariott's URI.
/// * `namespace` - The service's namespace.
/// * `name` - The service's name.
/// * `version` - The service's version.
/// # `communication_kind` - The service's communication kind.
/// # `communication_reference` - The service's communication reference.
pub async fn discover_service_using_chariott(
chariott_uri: &str,
namespace: &str,
name: &str,
version: &str,
communication_kind: &str,
devkelley marked this conversation as resolved.
Show resolved Hide resolved
communication_reference: &str,
) -> Result<String, Status> {
let mut client = ServiceRegistryClient::connect(chariott_uri.to_string())
.await
.map_err(|e| Status::internal(e.to_string()))?;

let request = Request::new(DiscoverRequest {
namespace: namespace.to_string(),
name: name.to_string(),
version: version.to_string(),
});

let response = client.discover(request).await?;

let service = response.into_inner().service.ok_or_else(|| Status::not_found("Did not find a service in Chariott with namespace '{namespace}', name '{name}' and version {version}"))?;

if service.communication_kind != communication_kind
&& service.communication_reference != communication_reference
{
return Err(Status::not_found(
devkelley marked this conversation as resolved.
Show resolved Hide resolved
"Did not find a service in Chariott with namespace '{namespace}', name '{name}' and version {version} that has communication kind '{communication_kind} and communication_reference '{communication_reference}''",
));
}

Ok(service.uri)
}

/// Get a service's URI from settings or from Chariott.
/// Will first try to use the URI defined in the service's settings file. If that is not set, will
/// call Chariott to obtain it.
///
/// # Arguments
/// * `service_uri` - Optional, desired service's URI.
/// * `chariott_uri` - Optional, Chariott's URI.
/// * `service_identifier` - Optional, The service's identifiers (name, namespace, version).
/// # `communication_kind` - Optional, The service's communication kind.
/// # `communication_reference` - Optional, The service's communication reference.
pub async fn get_service_uri(
service_uri: Option<String>,
chariott_uri: Option<String>,
service_identifier: Option<ServiceIdentifier>,
communication_kind: &str,
devkelley marked this conversation as resolved.
Show resolved Hide resolved
communication_reference: &str,
) -> Result<String, Status> {
let result = match service_uri {
Some(value) => {
info!("URI set in settings.");
value
}
None => match chariott_uri {
Some(value) => {
info!("Retrieving URI from Chariott.");

let service_identifier = service_identifier.ok_or_else(|| Status::invalid_argument("The settings file must set the service_identifier when the chariott_uri is set."))?;

execute_with_retry(
30,
Duration::from_secs(1),
|| {
discover_service_using_chariott(
&value,
&service_identifier.namespace,
&service_identifier.name,
&service_identifier.version,
communication_kind,
communication_reference,
)
},
Some(format!(
"Attempting to discover service '{}' with chariott.",
service_identifier.name
)),
)
.await?
}
None => Err(Status::invalid_argument(
"The settings file must set the chariott_uri when the service_uri is not set.",
))?,
},
};

Ok(result)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
14 changes: 9 additions & 5 deletions core/invehicle-digital-twin/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use core_protobuf_data_access::chariott::service_discovery::core::v1::{
};
use core_protobuf_data_access::invehicle_digital_twin::v1::invehicle_digital_twin_server::InvehicleDigitalTwinServer;
use env_logger::{Builder, Target};
use futures::Future;
use log::{debug, error, info, LevelFilter};
use parking_lot::RwLock;
use std::boxed::Box;
Expand Down Expand Up @@ -91,10 +90,10 @@ async fn register_invehicle_digital_twin_service_with_chariott(
/// 5. Call and return from the block `.add_module()` on the server with the updated middleware and
/// module.
#[allow(unused_assignments, unused_mut)] // Necessary when no extra modules are built.
fn build_server_and_serve<S>(
async fn build_server_and_serve<S>(
addr: SocketAddr,
base_service: S,
) -> impl Future<Output = Result<(), tonic::transport::Error>>
) -> Result<(), Box<dyn std::error::Error>>
where
S: Service<http::Request<Body>, Response = http::Response<BoxBody>, Error = Infallible>
+ NamedService
Expand All @@ -109,7 +108,10 @@ where
// (1) Adds the Managed Subscribe module to the service.
let server = {
// (2) Initialize the Managed Subscribe module, which implements GrpcModule.
let managed_subscribe_module = ManagedSubscribeModule::new();
let managed_subscribe_module = ManagedSubscribeModule::new().await.map_err(|error| {
error!("Unable to create Managed Subscribe module.");
error
})?;

// (3) Create interceptor layer to be added to the server.
let managed_subscribe_layer =
Expand All @@ -119,6 +121,8 @@ where
let current_middleware = server.middleware.clone();
let new_middleware = current_middleware.layer(managed_subscribe_layer);

info!("Initialized Managed Subscribe module.");

// (5) Add the module with the updated middleware stack to the server.
server.add_module(new_middleware, Box::new(managed_subscribe_module))
};
Expand All @@ -127,7 +131,7 @@ where
let builder = server.construct_server().add_service(base_service);

// Start the server.
builder.serve(addr)
builder.serve(addr).await.map_err(|error| error.into())
}

#[tokio::main]
Expand Down
34 changes: 22 additions & 12 deletions core/module/managed_subscribe/src/managed_subscribe_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use core_protobuf_data_access::module::managed_subscribe::v1::{
};

use common::grpc_module::GrpcModule;
use common::utils::{execute_with_retry, load_settings};
use common::utils::{execute_with_retry, get_service_uri, load_settings, ServiceIdentifier};
use log::{debug, error, info};
use parking_lot::RwLock;
use serde_derive::Deserialize;
Expand All @@ -37,6 +37,9 @@ use super::managed_subscribe_interceptor::ManagedSubscribeInterceptor;
const CONFIG_FILENAME: &str = "managed_subscribe_settings";
const SERVICE_PROTOCOL: &str = "grpc";

const MANAGED_SUBSCRIBE_COMMUNICATION_KIND: &str = "mqtt_v5";
const MANAGED_SUBSCRIBE_COMMUNICATION_REFERENCE: &str = "pubsub.v1.pubsub.proto";

// Managed Subscribe action constants.
const PUBLISH_ACTION: &str = "PUBLISH";
const STOP_PUBLISH_ACTION: &str = "STOP_PUBLISH";
Expand All @@ -61,8 +64,9 @@ pub enum TopicAction {
#[derive(Debug, Deserialize)]
pub struct ConfigSettings {
pub base_authority: String,
pub managed_subscribe_uri: String,
pub managed_subscribe_uri: Option<String>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider using an enum for these new config entries so that it is truly mutually exclusive

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to an enum, please take a look

pub chariott_uri: Option<String>,
pub managed_subscribe_service_identifier: Option<ServiceIdentifier>,
}

#[derive(Clone, Debug)]
Expand All @@ -73,28 +77,34 @@ pub struct ManagedSubscribeModule {
pub store: Arc<RwLock<ManagedSubscribeStore>>,
}

impl Default for ManagedSubscribeModule {
fn default() -> Self {
Self::new()
}
}

impl ManagedSubscribeModule {
/// Creates a new managed subscribe module object.
pub fn new() -> Self {
pub async fn new() -> Result<Self, Status> {
// Get module information from the configuration settings.
let config = load_settings::<ConfigSettings>(CONFIG_FILENAME);
let endpoint = config.base_authority;
let service_uri = format!("http://{endpoint}"); // Devskim: ignore DS137138

let store = Arc::new(RwLock::new(ManagedSubscribeStore::new()));

ManagedSubscribeModule {
managed_subscribe_uri: config.managed_subscribe_uri,
info!("Getting Managed Subscribe URI.");

// Get the uri of the managed subscribe service from settings or Chariott.
let managed_subscribe_uri = get_service_uri(
config.managed_subscribe_uri,
config.chariott_uri,
config.managed_subscribe_service_identifier,
MANAGED_SUBSCRIBE_COMMUNICATION_KIND,
MANAGED_SUBSCRIBE_COMMUNICATION_REFERENCE,
)
.await?;

Ok(ManagedSubscribeModule {
managed_subscribe_uri,
service_uri,
service_protocol: SERVICE_PROTOCOL.to_string(),
store,
}
})
}

/// Creates a new managed subscribe interceptor that shares data with the current instance of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,16 @@
base_authority: <<value>>

# The URI that the Managed Subscribe service listens on for requests.
# Comment out this setting if you wish to use Chariott to discover the service.
managed_subscribe_uri: <<value>>

# The URI that the Chariott service listens on for requests.
# If you wish to use Chariott to discover Agemo, then uncomment this setting.
# If you wish to use Chariott to discover the Managed Subscribe service, then uncomment this setting.
# chariott_uri: <<value>>

# The service identifier for the Managed Subscribe service. This will be used when calling Chariott.
# This setting should be uncommented when the chariott_uri setting is uncommented.
# managed_subscribe_service_identifier:
# namespace: <<value>>
# name: <<value>>
# version: <<value>>
3 changes: 3 additions & 0 deletions samples/managed_subscribe/.accepted_words.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@ https
Ibeji
InVehicle
invehicle
md
MQTT
pubsub
repo
sdv
svg
TopicManagementCB
uri
Expand Down
74 changes: 62 additions & 12 deletions samples/managed_subscribe/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,38 @@ disconnects it will stop publishing to that dynamically generated topic.

### Setup

1. Create the four config files with the following contents, if they are not already there:<br><br>
---- consumer_settings.yaml ----<br>
`invehicle_digital_twin_uri: "http://0.0.0.0:5010"`<br><br>
---- invehicle_digital_twin_settings.yaml ----<br>
`invehicle_digital_twin_authority: "0.0.0.0:5010"`<br><br>
---- managed_subscribe_settings.yaml ----<br>
`base_authority: "0.0.0.0:5010"`<br>
`managed_subscribe_uri: "http://0.0.0.0:50051"`<br><br>
---- provider_settings.yaml ----<br>
`invehicle_digital_twin_uri: "http://0.0.0.0:5010"`<br><br>

1. Build the invehicle_digital_twin service with the `managed_subscribe` feature enabled.
1. Create the four config files with the following contents, if they are not already there:

---- consumer_settings.yaml ----

```yaml
invehicle_digital_twin_uri: "http://0.0.0.0:5010"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is out of scope for this PR, but I think it would be very helpful to have sample configs for these so that users don't have to manually create them. might be worth creating an issue to track

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should have a standardized config process for all of our services if possible. I will look into getting an issue created.

```

---- invehicle_digital_twin_settings.yaml ----

```yaml
invehicle_digital_twin_authority: "0.0.0.0:5010"
```

---- managed_subscribe_settings.yaml ----

```yaml
base_authority: "0.0.0.0:5010"
managed_subscribe_uri: "http://0.0.0.0:50051"
```

---- provider_settings.yaml ----

```yaml
invehicle_digital_twin_uri: "http://0.0.0.0:5010"
```

1. Build the project with the `managed_subscribe` feature enabled.

```shell
cargo build --features managed_subscribe
```

### Running the Sample

Expand Down Expand Up @@ -59,6 +79,36 @@ where you are running the demo.<br><br>
1. To shutdown, use control-c on the consumer first. This will show the topic thread being shutdown
in the provider. Then control-c the other windows when you wish to stop the demo.

#### Running with Chariott

If you want to use Chariott with this sample:

1. Update the following settings:

---- invehicle_digital_twin_settings.yaml ----

```yaml
invehicle_digital_twin_authority: "0.0.0.0:5010"
chariott_uri: "http://0.0.0.0:50000"
```

---- managed_subscribe_settings.yaml ----

```yaml
base_authority: "0.0.0.0:5010"
chariott_uri: "http://0.0.0.0:50000"
managed_subscribe_service_identifier:
namespace: "sdv.pubsub"
name: "dynamic.pubsub"
version: "0.1.0"
```

1. Ensure Chariott is [running](../../README.md#using-chariott).

1. Start Agemo with [Chariott enabled](https://github.com/eclipse-chariott/Agemo/tree/main/pub-sub-service#running-with-chariott).

1. Follow the [Running the Sample Steps](#steps) as normal.

### Managed Subscribe Module

The managed subscribe module utilizes the [Agemo](https://github.com/eclipse-chariott/Agemo)
Expand Down