Skip to content

Commit

Permalink
refactor(storage-manager): separate creation/start of StorageService
Browse files Browse the repository at this point in the history
This commit separates creating the `StorageService` from starting it.

This change is motivated by the Replication feature: when performing the
initial alignment we want to delay the Storage from answering queries
until after the initial alignment has been performed.

In order to have this functionality we need to be able to dissociate
creating the `StorageService` from starting it.

As the method `start_storage_queryable_subscriber` takes ownership of
the `StorageService`, it became mandatory to first create the
`StorageService`, then start the Replication and lastly start the
Storage. Because of this, as the Replication code was inside a task, the
code to create and start the Storage was also moved inside the task.

* plugins/zenoh-plugin-storage-manager/src/replication/service.rs:
  - Take a reference over the `StorageService` structure as it is only
    needed before spawning the different Replication tasks. The
    StorageService is still needed to call `process_sample`.
  - Clone the `Arc` of the underlying Storage before spawning the
    Replication tasks.

* plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs:
  - Move the logging until starting the Storage.
  - Move the code starting the Storage inside the task.
  - Start the `StorageService` after having started the
    `ReplicationService`.

* plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs:
  - Renamed the function `start` to `new` as it now only creates an
    instance of the `StorageService`.
  - Removed the parameter `rx` from the call to `new` as it no longer
    also starts it.
  - Removed the call to `start_storage_queryable_subscriber` from `new`.
  - Changed the visibility of the method
    `start_storage_queryable_subscriber` to `pub(crate)` as it is called
    from outside the `service` module.
  - Added logging information before the Storage "loop" is started (to
    help confirm, with the logs, the order in which the different
    elements are started).

Signed-off-by: Julien Loudet <[email protected]>
  • Loading branch information
J-Loudet committed Sep 24, 2024
1 parent a753d80 commit 9ab1130
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl ReplicationService {
/// received.
pub async fn spawn_start(
zenoh_session: Arc<Session>,
storage_service: StorageService,
storage_service: &StorageService,
storage_key_expr: OwnedKeyExpr,
replication_log: Arc<RwLock<LogLatest>>,
latest_updates: Arc<RwLock<LatestUpdates>>,
Expand Down Expand Up @@ -98,13 +98,15 @@ impl ReplicationService {
);
}

let storage = storage_service.storage.clone();

tokio::task::spawn(async move {
let replication = Replication {
zenoh_session,
replication_log,
storage_key_expr,
latest_updates,
storage: storage_service.storage.clone(),
storage,
};

let replication_service = Self {
Expand Down
67 changes: 34 additions & 33 deletions plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,8 @@ pub(crate) async fn create_and_start_storage(
let storage_name = parts[7];
let name = format!("{uuid}/{storage_name}");

tracing::trace!("Start storage '{}' on keyexpr '{}'", name, config.key_expr);

let (tx, rx_storage) = tokio::sync::broadcast::channel(1);
let rx_replication = tx.subscribe();

let mut entries = match storage.get_all_entries().await {
Ok(entries) => entries
Expand Down Expand Up @@ -111,50 +110,52 @@ pub(crate) async fn create_and_start_storage(
let latest_updates = Arc::new(RwLock::new(latest_updates));

let storage = Arc::new(Mutex::new(storage));
let storage_service = StorageService::start(
zenoh_session.clone(),
config.clone(),
&name,
storage,
capability,
rx_storage,
CacheLatest::new(latest_updates.clone(), replication_log.clone()),
)
.await;

// Testing if the `replication_log` is set is equivalent to testing if the `replication` is
// set: the `replication_log` is only set when the latter is.
if let Some(replication_log) = replication_log {
let rx_replication = tx.subscribe();

// NOTE Although the function `ReplicationService::spawn_start` spawns its own tasks, we
// still need to call it within a dedicated task because the Zenoh routing tables are
// populated only after the plugins have been loaded.
//
// If we don't wait for the routing tables to be populated the initial alignment
// (i.e. querying any Storage on the network handling the same key expression), will
// never work.
//
// TODO Do we really want to perform such an initial alignment? Because this query will
// target any Storage that matches the same key expression, regardless of if they have
// been configured to be replicated.
tokio::task::spawn(async move {

// NOTE The StorageService method `start_storage_queryable_subscriber` does not spawn its own
// task to loop/wait on the Subscriber and Queryable it creates. Thus we spawn the task
// here.
//
// Doing so also allows us to return early from the creation of the Storage, creation which
// blocks populating the routing tables.
//
// TODO Do we really want to perform such an initial alignment? Because this query will
// target any Storage that matches the same key expression, regardless of if they have
// been configured to be replicated.
tokio::task::spawn(async move {
let storage_service = StorageService::new(
zenoh_session.clone(),
config.clone(),
&name,
storage,
capability,
CacheLatest::new(latest_updates.clone(), replication_log.clone()),
)
.await;

// Testing if the `replication_log` is set is equivalent to testing if the `replication` is
// set: the `replication_log` is only set when the latter is.
if let Some(replication_log) = replication_log {
tracing::debug!(
"Starting replication of storage '{}' on keyexpr '{}'",
name,
config.key_expr,
);

ReplicationService::spawn_start(
zenoh_session,
storage_service,
&storage_service,
config.key_expr,
replication_log,
latest_updates,
rx_replication,
)
.await;
});
}
}

storage_service
.start_storage_queryable_subscriber(rx_storage)
.await;
});

Ok(tx)
}
15 changes: 8 additions & 7 deletions plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,12 @@ pub struct StorageService {
}

impl StorageService {
pub async fn start(
pub async fn new(
session: Arc<Session>,
config: StorageConfig,
name: &str,
storage: Arc<Mutex<Box<dyn zenoh_backend_traits::Storage>>>,
capability: Capability,
rx: Receiver<StorageMessage>,
cache_latest: CacheLatest,
) -> Self {
let storage_service = StorageService {
Expand Down Expand Up @@ -117,15 +116,11 @@ impl StorageService {
}
}
}
storage_service
.clone()
.start_storage_queryable_subscriber(rx)
.await;

storage_service
}

async fn start_storage_queryable_subscriber(self, mut rx: Receiver<StorageMessage>) {
pub(crate) async fn start_storage_queryable_subscriber(self, mut rx: Receiver<StorageMessage>) {
// start periodic GC event
let t = Timer::default();

Expand Down Expand Up @@ -173,6 +168,12 @@ impl StorageService {
}
};

tracing::debug!(
"Starting storage '{}' on keyexpr '{}'",
self.name,
storage_key_expr
);

tokio::task::spawn(async move {
loop {
tokio::select!(
Expand Down

0 comments on commit 9ab1130

Please sign in to comment.