Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use awkernel_sync for Mutex and RwLock #5

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
371 changes: 366 additions & 5 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ mio-extras = "2.0.6"
rand = { version = "0.8", default-features = false, features = ["small_rng"] }
serde = { version = "1.0", default-features = false, features = ["derive"] }
socket2 = "0.5"
speedy = { git = "https://github.com/ytakano/speedy.git", default-features = false, features = [
speedy = { git = "https://github.com/koute/speedy", default-features = false, features = [
"speedy-derive",
"alloc",
] }
Expand All @@ -35,6 +35,7 @@ cdr = "0.2.4"
chrono = { version = "0.4", optional = true }
thiserror = "1.0"
serde_repr = "0.1.18"
awkernel_sync = { git = "https://github.com/tier4/awkernel_sync.git" }

[dev-dependencies]
clap = { version = "4.5", features = ["derive"] }
Expand All @@ -43,4 +44,6 @@ serde_derive = "1"

[features]
default = ["std"]
std = ["dep:chrono"]
std = ["dep:chrono", "awkernel_sync/std"]
awkernel_x86 = ["awkernel_sync/x86"]
awkernel_aarch64 = ["awkernel_sync/aarch64"]
6 changes: 3 additions & 3 deletions src/dds/datareader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ use crate::dds::{qos::DataReaderQosPolicies, subscriber::Subscriber, topic::Topi
use crate::discovery::structure::cdr::deserialize;
use crate::rtps::{cache::HistoryCache, reader::DataReaderStatusChanged};
use alloc::sync::Arc;
use awkernel_sync::rwlock::RwLock;
use core::marker::PhantomData;
use mio_extras::channel as mio_channel;
use mio_v06::{event::Evented, Poll, PollOpt, Ready, Token};
use serde::Deserialize;
use std::io;
use std::sync::RwLock;

/// DDS DataReader
pub struct DataReader<D: for<'de> Deserialize<'de>> {
Expand Down Expand Up @@ -47,7 +47,7 @@ impl<D: for<'de> Deserialize<'de>> DataReader<D> {
}

fn get_data(&self) -> Vec<D> {
let mut hc = self.rhc.write().expect("couldn't read ReaderHistoryCache");
let mut hc = self.rhc.write();
let changes = hc.get_alive_changes();
for c in changes.iter() {
hc.remove_change(c);
Expand All @@ -62,7 +62,7 @@ impl<D: for<'de> Deserialize<'de>> DataReader<D> {
v
}
fn _remove_changes(&self) {
let mut hc = self.rhc.write().expect("couldn't write ReaderHistoryCache");
let mut hc = self.rhc.write();
hc.remove_notalive_changes();
}
pub fn get_qos(&self) -> DataReaderQosPolicies {
Expand Down
136 changes: 40 additions & 96 deletions src/dds/participant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ use core::sync::atomic::{AtomicU32, Ordering};
use mio_extras::channel as mio_channel;
use mio_v06::net::UdpSocket;
use rand::rngs::SmallRng;
use std::sync::{Mutex, RwLock};
use std::thread::{self, Builder};

use awkernel_sync::{mcs::MCSNode, mutex::Mutex, rwlock::RwLock};

/// DDS DomainParticipant
///
/// factory for the Publisher, Subscriber and Topic.
Expand All @@ -43,10 +44,8 @@ pub struct DomainParticipant {

impl RTPSEntity for DomainParticipant {
fn guid(&self) -> GUID {
self.inner
.lock()
.expect("couldn't lock DomainParticipantDisc")
.guid()
let mut node = MCSNode::new();
self.inner.lock(&mut node).guid()
}
}

Expand Down Expand Up @@ -91,15 +90,15 @@ impl DomainParticipant {
dp
}
pub fn create_publisher(&self, qos: PublisherQos) -> Publisher {
let mut node = MCSNode::new();
self.inner
.lock()
.expect("couldn't lock DomainParticipantDisc")
.lock(&mut node)
.create_publisher(self.clone(), qos)
}
pub fn create_subscriber(&self, qos: SubscriberQos) -> Subscriber {
let mut node = MCSNode::new();
self.inner
.lock()
.expect("couldn't lock DomainParticipantDisc")
.lock(&mut node)
.create_subscriber(self.clone(), qos)
}
pub fn create_topic(
Expand All @@ -109,64 +108,46 @@ impl DomainParticipant {
kind: TopicKind,
qos: TopicQos,
) -> Topic {
let mut node = MCSNode::new();
self.inner
.lock()
.expect("couldn't lock DomainParticipantDisc")
.lock(&mut node)
.create_topic(self.clone(), name, type_desc, kind, qos)
}
pub fn domain_id(&self) -> u16 {
self.inner
.lock()
.expect("couldn't lock DomainParticipantDisc")
.domain_id()
let mut node = MCSNode::new();
self.inner.lock(&mut node).domain_id()
}
pub fn participant_id(&self) -> u16 {
self.inner
.lock()
.expect("couldn't lock DomainParticipantDisc")
.participant_id()
let mut node = MCSNode::new();
self.inner.lock(&mut node).participant_id()
}
pub(crate) fn gen_entity_key(&self) -> [u8; 3] {
self.inner
.lock()
.expect("couldn't lock DomainParticipantDisc")
.gen_entity_key()
let mut node = MCSNode::new();
self.inner.lock(&mut node).gen_entity_key()
}
pub fn get_default_publisher_qos(&self) -> PublisherQosPolicies {
self.inner
.lock()
.expect("couldn't lock DomainParticipantDisc")
.get_default_publisher_qos()
let mut node = MCSNode::new();
self.inner.lock(&mut node).get_default_publisher_qos()
}
pub fn set_default_publisher_qos(&mut self, qos: PublisherQosPolicies) {
self.inner
.lock()
.expect("couldn't lock DomainParticipantDisc")
.set_default_publisher_qos(qos);
let mut node = MCSNode::new();
self.inner.lock(&mut node).set_default_publisher_qos(qos);
}
pub fn get_default_subscriber_qos(&self) -> SubscriberQosPolicies {
self.inner
.lock()
.expect("couldn't lock DomainParticipantDisc")
.get_default_subscriber_qos()
let mut node = MCSNode::new();
self.inner.lock(&mut node).get_default_subscriber_qos()
}
pub fn set_default_subscriber_qos(&mut self, qos: SubscriberQosPolicies) {
self.inner
.lock()
.expect("couldn't lock DomainParticipantDisc")
.set_default_subscriber_qos(qos);
let mut node = MCSNode::new();
self.inner.lock(&mut node).set_default_subscriber_qos(qos);
}
pub fn get_default_topic_qos(&self) -> TopicQosPolicies {
self.inner
.lock()
.expect("couldn't lock DomainParticipantDisc")
.get_default_topic_qos()
let mut node = MCSNode::new();
self.inner.lock(&mut node).get_default_topic_qos()
}
pub fn set_default_topic_qos(&mut self, qos: TopicQosPolicies) {
self.inner
.lock()
.expect("couldn't lock DomainParticipantDisc")
.set_default_topic_qos(qos);
let mut node = MCSNode::new();
self.inner.lock(&mut node).set_default_topic_qos(qos);
}
}

Expand Down Expand Up @@ -198,16 +179,10 @@ impl DomainParticipantDisc {
}
}
pub fn create_publisher(&self, dp: DomainParticipant, qos: PublisherQos) -> Publisher {
self.inner
.read()
.expect("couldn't read lock DomainParticipantInnet")
.create_publisher(dp, qos)
self.inner.read().create_publisher(dp, qos)
}
pub fn create_subscriber(&self, dp: DomainParticipant, qos: SubscriberQos) -> Subscriber {
self.inner
.read()
.expect("couldn't read lock DomainParticipantInnet")
.create_subscriber(dp, qos)
self.inner.read().create_subscriber(dp, qos)
}
pub fn create_topic(
&self,
Expand All @@ -219,62 +194,34 @@ impl DomainParticipantDisc {
) -> Topic {
self.inner
.read()
.expect("couldn't read lock DomainParticipantInnet")
.create_topic(dp, name, type_desc, kind, qos)
}
pub fn domain_id(&self) -> u16 {
self.inner
.read()
.expect("couldn't read lock DomainParticipantInnet")
.domain_id
self.inner.read().domain_id
}
pub fn participant_id(&self) -> u16 {
self.inner
.read()
.expect("couldn't read lock DomainParticipantInnet")
.participant_id
self.inner.read().participant_id
}
pub fn gen_entity_key(&self) -> [u8; 3] {
self.inner
.read()
.expect("couldn't read lock DomainParticipantInnet")
.gen_entity_key()
self.inner.read().gen_entity_key()
}
pub fn get_default_publisher_qos(&self) -> PublisherQosPolicies {
self.inner
.read()
.expect("couldn't read lock DomainParticipantInnet")
.get_default_publisher_qos()
self.inner.read().get_default_publisher_qos()
}
pub fn set_default_publisher_qos(&mut self, qos: PublisherQosPolicies) {
self.inner
.write()
.expect("couldn't write lock DomainParticipantInnet")
.set_default_publisher_qos(qos);
self.inner.write().set_default_publisher_qos(qos);
}
pub fn get_default_subscriber_qos(&self) -> SubscriberQosPolicies {
self.inner
.read()
.expect("couldn't read lock DomainParticipantInnet")
.get_default_subscriber_qos()
self.inner.read().get_default_subscriber_qos()
}
pub fn set_default_subscriber_qos(&mut self, qos: SubscriberQosPolicies) {
self.inner
.write()
.expect("couldn't write lock DomainParticipantInnet")
.set_default_subscriber_qos(qos);
self.inner.write().set_default_subscriber_qos(qos);
}
pub fn get_default_topic_qos(&self) -> TopicQosPolicies {
self.inner
.read()
.expect("couldn't read lock DomainParticipantInnet")
.get_default_topic_qos()
self.inner.read().get_default_topic_qos()
}
pub fn set_default_topic_qos(&mut self, qos: TopicQosPolicies) {
self.inner
.write()
.expect("couldn't write lock DomainParticipantInnet")
.set_default_topic_qos(qos);
self.inner.write().set_default_topic_qos(qos);
}
}
impl Drop for DomainParticipantDisc {
Expand All @@ -287,10 +234,7 @@ impl Drop for DomainParticipantDisc {

impl RTPSEntity for DomainParticipantDisc {
fn guid(&self) -> GUID {
self.inner
.read()
.expect("couldn't read lock DomainParticipantInnet")
.my_guid
self.inner.read().my_guid
}
}

Expand Down
31 changes: 6 additions & 25 deletions src/dds/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use crate::network::net_util::{usertraffic_multicast_port, usertraffic_unicast_p
use crate::rtps::writer::{DataWriterStatusChanged, WriterCmd, WriterIngredients};
use crate::structure::{Duration, EntityId, EntityKind, RTPSEntity, GUID};
use alloc::sync::Arc;
use awkernel_sync::rwlock::RwLock;
use mio_extras::channel as mio_channel;
use std::sync::RwLock;

/// DDS Publisher
///
Expand Down Expand Up @@ -60,7 +60,6 @@ impl Publisher {
) -> DataWriter<D> {
self.inner
.read()
.expect("couldn't read lock InnerPublisher")
.create_datawriter(qos, topic, self.clone())
}

Expand All @@ -72,42 +71,24 @@ impl Publisher {
) -> DataWriter<D> {
self.inner
.read()
.expect("couldn't read lock InnerPublisher")
.create_datawriter_with_entityid(qos, topic, self.clone(), entity_id)
}

pub fn get_qos(&self) -> PublisherQosPolicies {
self.inner
.read()
.expect("couldn't read lock InnerPublisher")
.get_qos()
self.inner.read().get_qos()
}
pub fn set_qos(&mut self, qos: PublisherQosPolicies) {
self.inner
.write()
.expect("couldn't write lock InnerPublisher")
.set_qos(qos);
self.inner.write().set_qos(qos);
}

pub fn domain_participant(&self) -> DomainParticipant {
self.inner
.read()
.expect("couldn't read lock InnerPublisher")
.dp
.clone()
self.inner.read().dp.clone()
}
pub fn get_default_datawriter_qos(&self) -> DataWriterQosPolicies {
self.inner
.read()
.expect("couldn't read lock InnerPublisher")
.default_dw_qos
.clone()
self.inner.read().default_dw_qos.clone()
}
pub fn set_default_datawriter_qos(&mut self, qos: DataWriterQosPolicies) {
self.inner
.write()
.expect("couldn't write lock InnerPublisher")
.default_dw_qos = qos;
self.inner.write().default_dw_qos = qos;
}
}

Expand Down
Loading