Skip to content

Commit

Permalink
SHM subsystem: Rust (#823)
Browse files Browse the repository at this point in the history
* add watchdog codebase

* integrated and tested POC

* Update shm.rs

* WIP: implemented buffer headers in separate SHM segment, made buffer generation support

* - refactored POSIX shm wrapper
- generation works

* - use posix_shm module for data segment in SharedMemoryManager
- use numeric ID instead of string ID for SHM buffer identification - this feature speeds up segment lookups and reduces wire overhead
- remove unnecessary fields from SharedMemoryManager
- fix clippy warnings
- added comments

* WIP on SHM

* Fix clippy, additional improvements

* Implement watchdog periodic tasks with optional realtime scheduling

* [skip ci] WIP on SHM provider\client API

* [skip ci] WIP on SHM API

* [skip ci] Big WIP on SHM API integration

* [skip ci] working SHM (tests passing)

* [skip ci] WIP on API

* [skip ci] WIP on SHM API

* WIP on API

* - added SharedMemoryReader to Session and Runtime building API
- support ProtocolID exchange in establish
- convert buffer based on supported protocol ids

* [skip ci] correct shm establish logic

* Remove SharedMemoryFactory

* [skip ci]
- WIP to brush-up the API, eliminate some API flaws
- SHM provider is now conceptually made thread-safe for better flexibility

* [skip ci] Final updates to SHM provider API

* ZSlice: safe mutable acces to inner contents and support for copy-on-write and it's elision as an optimization for SHM buffers

* [skip ci]
- added ZSliceMut and it's functionality
- documented public SHM APIs
- brush-up for SHM API
- hide SharedMemoryBuf from API and return ZSlice instead
- add is_valid() for ZSlice that checks generation counter on SHM

* [skip ci]
- ZSliceMut API changed
- ZSliceBuffer: different API for shared-memory feature
- Hide unnecessary APIs from pub in SharedMemoryReader
- Fix default alignment calculation for AllocAlignment
- Expose necessary SHM API in zenoh crate
- Brush-up examples
- Fix shmbuf to rawbuf conversion

* [skip ci]
- ignore some tests because they need to be run globally sequentially
- transit shared-memory feature to zenoh-buffers in zenoh-transport

* Solved additive feature problem

* - remove dependency in zenoh-buffers
- make periodic_task compile on win

* fix tests

* - refine buld system to optimize workspace dependencies
- fix posix shm segment size estimation

* Update shm.rs

* - replace async-std with tokio in zenoh-shm

* fix examples

* ooops

* ignore test with too long SHM segment id for MacOS

* lower test complexity to adopt runner capabilities

* ignore test with too long SHM segment id for MacOS

* - use md5 hashes to shorten SHM segments OS ids (need for MacOS)
- enable 128 bit ids in tests

* use crc instead of md5

* - get rid of allocator-api2 as work for Allocator is postponed for a while
- allow shared-memory feature for macOS and Win in CI (without unixpipe transport)
- move deps into workspace
- add documentation

* move from 'const ID: ProtocolID' to trait interface to support both static (Rust API) and dynamic (other languages API) protocol ID setting

* support compile-time and runtime ProtocolID setting

* Add more tests to dedicated execution

* [skip ci] add more *concurrent tests to dedicated execution

* - more SHM API docs
- document all SHM API as "unstable"
- hide all SHM API behind "unstable" feature
- some API brush-up
- improve CI for SHM

* exclude test_default_features for SHM tests

* Move test_helpers.rs into tests to follow the guideline of integration tests.

* update doc

* Eliminate zenoh-buffers -> zenoh-shm dependency to illustarte the problem

* fix: Add disabled by default `shared-memory` feature in zenoh-shm

* [skip ci] client storage interface fix

* [skip ci] fix map method interface for SharedMemoryProvider

* brush-up some things after merge

* PR review fixes

* [skip ci] add shm feature to zenoh-ext

* [skip ci] oops

* [skip ci] add shared-memory feature traversing for zenoh-shm dependants

* rename ZSliceShm to ZSliceShmMut

* - fix build
- support SharedMemoryClient sharing

* [skip ci]
- support SharedMemoryClient sharing
- add way to build SharedMemoryClientStorage from slice of clients

* [skip ci]
- add way to build SharedMemoryClientStorage from slice of clients AND default client set

* - remove ZSliceMut API

* - stabby fixed for 1.72
- build with zenoh's mainline toolchain 1.72

* fix doctests

* - ZSliceShm

* Support ZSliceShm and ZSliceShmMut in Payload

* - optimize ZSlicBuffer trait
- add full ZSliceShm support to Payload
- add partial ZSliceShmMut support to Payload
- remove a lot of unnecessary code

* fix code format errors

* - SHM buffer API evolution
- Payload API for SHM buffers

* [skip ci] polish Payload API for SHM

* move SHM Buffer API to separate "slice"module

* Improve SHM Buffer API concept

* Update payload.rs test

* fixes after merge

* build fixes

* - fix recursion error for in SHM buffer API
- add som docs
- fix Payload test causing stack overflow :)

* - implement trait API for SHM buffers
- extend SHM buf Payload API test
- add missing DerefMut to zsliceshmmut

* Fix merge

* Rework Serialize trait

* Impl &mut T for Serialize/Deserialize. Fix valgrind CI.

* Update commons/zenoh-shm/src/lib.rs

* Revert wrong change on log

* Update zenoh/src/bytes.rs

* Fix use

* Fix use

* Review fixes

* fix recursive call

* Update commons/zenoh-shm/src/api/provider/types.rs

* unstable for shm slice traits

* Add more #[zenoh_macros::unstable_doc]

* SHM establishment reorg

* add missing ztimeout! in tests

---------

Co-authored-by: yuanyuyuan <[email protected]>
Co-authored-by: Mahmoud Mazouz <[email protected]>
Co-authored-by: Luca Cominardi <[email protected]>
  • Loading branch information
4 people authored Apr 19, 2024
1 parent 43d280f commit 3711d45
Show file tree
Hide file tree
Showing 128 changed files with 7,727 additions and 1,394 deletions.
8 changes: 7 additions & 1 deletion .config/nextest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,15 @@ slow-timeout = { period = "60s", terminate-after = 2 }
filter = """
test(=zenoh_session_unicast) |
test(=zenoh_session_multicast) |
test(=zenoh_unicity_p2p) |
test(=zenoh_unicity_brokered) |
test(=transport_tcp_intermittent) |
test(=transport_tcp_intermittent_for_lowlatency_transport) |
test(=three_node_combination)
test(=three_node_combination) |
test(=watchdog_alloc_concurrent) |
test(=header_check_memory_concurrent) |
test(=header_link_concurrent) |
test(=header_link_failure_concurrent)
"""
threads-required = 'num-cpus'
slow-timeout = { period = "60s", terminate-after = 6 }
9 changes: 8 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ jobs:
- name: Clippy unstable targets
run: cargo +stable clippy --all-targets --features unstable -- --deny warnings

- name: Clippy shared memory without unstable
run: cargo +stable clippy --all-targets --features shared-memory -- --deny warnings

- name: Clippy all features
if: ${{ matrix.os == 'ubuntu-latest' || matrix.os == 'macOS-latest' }}
run: cargo +stable clippy --all-targets --all-features -- --deny warnings
Expand Down Expand Up @@ -92,8 +95,12 @@ jobs:
run: cargo nextest run --exclude zenoh-examples --exclude zenoh-plugin-example --workspace

- name: Run tests with SHM
if: ${{ matrix.os == 'macOS-latest' || matrix.os == 'windows-latest' }}
run: cargo nextest run -F shared-memory -F unstable -E 'not (test(test_default_features))' --exclude zenoh-examples --exclude zenoh-plugin-example --workspace

- name: Run tests with SHM + unixpipe
if: ${{ matrix.os == 'ubuntu-latest' }}
run: cargo nextest run -F shared-memory -F transport_unixpipe -p zenoh-transport
run: cargo nextest run -F shared-memory -F unstable -F transport_unixpipe -E 'not (test(test_default_features))' --exclude zenoh-examples --exclude zenoh-plugin-example --workspace

- name: Check for feature leaks
if: ${{ matrix.os == 'ubuntu-latest' }}
Expand Down
132 changes: 129 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,11 @@ lazy_static = "1.4.0"
libc = "0.2.139"
libloading = "0.8"
tracing = "0.1"
lockfree = "0.5"
lz4_flex = "0.11"
nix = { version = "0.27", features = ["fs"] }
num_cpus = "1.16.0"
num-traits = { version = "0.2.17", default-features = false }
ordered-float = "4.1.1"
panic-message = "0.3.0"
paste = "1.0.12"
Expand Down Expand Up @@ -146,6 +148,7 @@ serde_cbor = "0.11.2"
serde_json = "1.0.114"
serde-pickle = "1.1.1"
serde_yaml = "0.9.19"
stabby = "4.0.5"
sha3 = "0.10.6"
shared_memory = "0.12.4"
shellexpand = "3.0.0"
Expand All @@ -159,6 +162,7 @@ tokio-util = "0.7.10"
tokio-tungstenite = "0.21"
tokio-rustls = "0.25.0"
# tokio-vsock = see: io/zenoh-links/zenoh-link-vsock/Cargo.toml (workspaces does not support platform dependent dependencies)
thread-priority = "0.15"
console-subscriber = "0.2"
typenum = "1.16.0"
uhlc = { version = "0.7.0", default-features = false } # Default features are disabled due to usage in no_std crates
Expand Down
9 changes: 3 additions & 6 deletions ci/valgrind-check/src/queryable_get/bin/z_queryable_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,19 @@ async fn main() {

let _z = zenoh_runtime::ZRuntimePoolGuard;

let queryable_key_expr = KeyExpr::try_from("test/valgrind/data").unwrap();
let queryable_key_expr = keyexpr::new("test/valgrind/data").unwrap();
let get_selector = Selector::try_from("test/valgrind/**").unwrap();

println!("Declaring Queryable on '{queryable_key_expr}'...");
let queryable_session = zenoh::open(Config::default()).res().await.unwrap();
let _queryable = queryable_session
.declare_queryable(&queryable_key_expr.clone())
.declare_queryable(queryable_key_expr)
.callback(move |query| {
println!(">> Handling query '{}'", query.selector());
let queryable_key_expr = queryable_key_expr.clone();
zenoh_runtime::ZRuntime::Application.block_in_place(async move {
query
.reply(
queryable_key_expr,
query.value().unwrap().payload().clone(),
)
.reply(queryable_key_expr, query.value().unwrap().payload().clone())
.res()
.await
.unwrap();
Expand Down
Loading

0 comments on commit 3711d45

Please sign in to comment.