Skip to content

Commit

Permalink
Merge pull request #5 from tier4/awkernel_sync2
Browse files Browse the repository at this point in the history
fix: use awkernel_sync for Mutex and RwLock
  • Loading branch information
tomiy-0x62 authored Dec 26, 2024
2 parents 60f360b + 1705167 commit 01d0115
Show file tree
Hide file tree
Showing 12 changed files with 441 additions and 186 deletions.
371 changes: 366 additions & 5 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ mio-extras = "2.0.6"
rand = { version = "0.8", default-features = false, features = ["small_rng"] }
serde = { version = "1.0", default-features = false, features = ["derive"] }
socket2 = "0.5"
speedy = { git = "https://github.com/ytakano/speedy.git", default-features = false, features = [
speedy = { git = "https://github.com/koute/speedy", default-features = false, features = [
"speedy-derive",
"alloc",
] }
Expand All @@ -35,6 +35,7 @@ cdr = "0.2.4"
chrono = { version = "0.4", optional = true }
thiserror = "1.0"
serde_repr = "0.1.18"
awkernel_sync = { git = "https://github.com/tier4/awkernel_sync.git" }

[dev-dependencies]
clap = { version = "4.5", features = ["derive"] }
Expand All @@ -43,4 +44,6 @@ serde_derive = "1"

[features]
default = ["std"]
std = ["dep:chrono"]
std = ["dep:chrono", "awkernel_sync/std"]
awkernel_x86 = ["awkernel_sync/x86"]
awkernel_aarch64 = ["awkernel_sync/aarch64"]
6 changes: 3 additions & 3 deletions src/dds/datareader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ use crate::dds::{qos::DataReaderQosPolicies, subscriber::Subscriber, topic::Topi
use crate::discovery::structure::cdr::deserialize;
use crate::rtps::{cache::HistoryCache, reader::DataReaderStatusChanged};
use alloc::sync::Arc;
use awkernel_sync::rwlock::RwLock;
use core::marker::PhantomData;
use mio_extras::channel as mio_channel;
use mio_v06::{event::Evented, Poll, PollOpt, Ready, Token};
use serde::Deserialize;
use std::io;
use std::sync::RwLock;

/// DDS DataReader
pub struct DataReader<D: for<'de> Deserialize<'de>> {
Expand Down Expand Up @@ -47,7 +47,7 @@ impl<D: for<'de> Deserialize<'de>> DataReader<D> {
}

fn get_data(&self) -> Vec<D> {
let mut hc = self.rhc.write().expect("couldn't read ReaderHistoryCache");
let mut hc = self.rhc.write();
let changes = hc.get_alive_changes();
for c in changes.iter() {
hc.remove_change(c);
Expand All @@ -62,7 +62,7 @@ impl<D: for<'de> Deserialize<'de>> DataReader<D> {
v
}
fn _remove_changes(&self) {
let mut hc = self.rhc.write().expect("couldn't write ReaderHistoryCache");
let mut hc = self.rhc.write();
hc.remove_notalive_changes();
}
pub fn get_qos(&self) -> DataReaderQosPolicies {
Expand Down
136 changes: 40 additions & 96 deletions src/dds/participant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ use core::sync::atomic::{AtomicU32, Ordering};
use mio_extras::channel as mio_channel;
use mio_v06::net::UdpSocket;
use rand::rngs::SmallRng;
use std::sync::{Mutex, RwLock};
use std::thread::{self, Builder};

use awkernel_sync::{mcs::MCSNode, mutex::Mutex, rwlock::RwLock};

/// DDS DomainParticipant
///
/// factory for the Publisher, Subscriber and Topic.
Expand All @@ -43,10 +44,8 @@ pub struct DomainParticipant {

impl RTPSEntity for DomainParticipant {
fn guid(&self) -> GUID {
self.inner
.lock()
.expect("couldn't lock DomainParticipantDisc")
.guid()
let mut node = MCSNode::new();
self.inner.lock(&mut node).guid()
}
}

Expand Down Expand Up @@ -91,15 +90,15 @@ impl DomainParticipant {
dp
}
pub fn create_publisher(&self, qos: PublisherQos) -> Publisher {
let mut node = MCSNode::new();
self.inner
.lock()
.expect("couldn't lock DomainParticipantDisc")
.lock(&mut node)
.create_publisher(self.clone(), qos)
}
pub fn create_subscriber(&self, qos: SubscriberQos) -> Subscriber {
let mut node = MCSNode::new();
self.inner
.lock()
.expect("couldn't lock DomainParticipantDisc")
.lock(&mut node)
.create_subscriber(self.clone(), qos)
}
pub fn create_topic(
Expand All @@ -109,64 +108,46 @@ impl DomainParticipant {
kind: TopicKind,
qos: TopicQos,
) -> Topic {
let mut node = MCSNode::new();
self.inner
.lock()
.expect("couldn't lock DomainParticipantDisc")
.lock(&mut node)
.create_topic(self.clone(), name, type_desc, kind, qos)
}
pub fn domain_id(&self) -> u16 {
self.inner
.lock()
.expect("couldn't lock DomainParticipantDisc")
.domain_id()
let mut node = MCSNode::new();
self.inner.lock(&mut node).domain_id()
}
pub fn participant_id(&self) -> u16 {
self.inner
.lock()
.expect("couldn't lock DomainParticipantDisc")
.participant_id()
let mut node = MCSNode::new();
self.inner.lock(&mut node).participant_id()
}
pub(crate) fn gen_entity_key(&self) -> [u8; 3] {
self.inner
.lock()
.expect("couldn't lock DomainParticipantDisc")
.gen_entity_key()
let mut node = MCSNode::new();
self.inner.lock(&mut node).gen_entity_key()
}
pub fn get_default_publisher_qos(&self) -> PublisherQosPolicies {
self.inner
.lock()
.expect("couldn't lock DomainParticipantDisc")
.get_default_publisher_qos()
let mut node = MCSNode::new();
self.inner.lock(&mut node).get_default_publisher_qos()
}
pub fn set_default_publisher_qos(&mut self, qos: PublisherQosPolicies) {
self.inner
.lock()
.expect("couldn't lock DomainParticipantDisc")
.set_default_publisher_qos(qos);
let mut node = MCSNode::new();
self.inner.lock(&mut node).set_default_publisher_qos(qos);
}
pub fn get_default_subscriber_qos(&self) -> SubscriberQosPolicies {
self.inner
.lock()
.expect("couldn't lock DomainParticipantDisc")
.get_default_subscriber_qos()
let mut node = MCSNode::new();
self.inner.lock(&mut node).get_default_subscriber_qos()
}
pub fn set_default_subscriber_qos(&mut self, qos: SubscriberQosPolicies) {
self.inner
.lock()
.expect("couldn't lock DomainParticipantDisc")
.set_default_subscriber_qos(qos);
let mut node = MCSNode::new();
self.inner.lock(&mut node).set_default_subscriber_qos(qos);
}
pub fn get_default_topic_qos(&self) -> TopicQosPolicies {
self.inner
.lock()
.expect("couldn't lock DomainParticipantDisc")
.get_default_topic_qos()
let mut node = MCSNode::new();
self.inner.lock(&mut node).get_default_topic_qos()
}
pub fn set_default_topic_qos(&mut self, qos: TopicQosPolicies) {
self.inner
.lock()
.expect("couldn't lock DomainParticipantDisc")
.set_default_topic_qos(qos);
let mut node = MCSNode::new();
self.inner.lock(&mut node).set_default_topic_qos(qos);
}
}

Expand Down Expand Up @@ -198,16 +179,10 @@ impl DomainParticipantDisc {
}
}
pub fn create_publisher(&self, dp: DomainParticipant, qos: PublisherQos) -> Publisher {
self.inner
.read()
.expect("couldn't read lock DomainParticipantInnet")
.create_publisher(dp, qos)
self.inner.read().create_publisher(dp, qos)
}
pub fn create_subscriber(&self, dp: DomainParticipant, qos: SubscriberQos) -> Subscriber {
self.inner
.read()
.expect("couldn't read lock DomainParticipantInnet")
.create_subscriber(dp, qos)
self.inner.read().create_subscriber(dp, qos)
}
pub fn create_topic(
&self,
Expand All @@ -219,62 +194,34 @@ impl DomainParticipantDisc {
) -> Topic {
self.inner
.read()
.expect("couldn't read lock DomainParticipantInnet")
.create_topic(dp, name, type_desc, kind, qos)
}
pub fn domain_id(&self) -> u16 {
self.inner
.read()
.expect("couldn't read lock DomainParticipantInnet")
.domain_id
self.inner.read().domain_id
}
pub fn participant_id(&self) -> u16 {
self.inner
.read()
.expect("couldn't read lock DomainParticipantInnet")
.participant_id
self.inner.read().participant_id
}
pub fn gen_entity_key(&self) -> [u8; 3] {
self.inner
.read()
.expect("couldn't read lock DomainParticipantInnet")
.gen_entity_key()
self.inner.read().gen_entity_key()
}
pub fn get_default_publisher_qos(&self) -> PublisherQosPolicies {
self.inner
.read()
.expect("couldn't read lock DomainParticipantInnet")
.get_default_publisher_qos()
self.inner.read().get_default_publisher_qos()
}
pub fn set_default_publisher_qos(&mut self, qos: PublisherQosPolicies) {
self.inner
.write()
.expect("couldn't write lock DomainParticipantInnet")
.set_default_publisher_qos(qos);
self.inner.write().set_default_publisher_qos(qos);
}
pub fn get_default_subscriber_qos(&self) -> SubscriberQosPolicies {
self.inner
.read()
.expect("couldn't read lock DomainParticipantInnet")
.get_default_subscriber_qos()
self.inner.read().get_default_subscriber_qos()
}
pub fn set_default_subscriber_qos(&mut self, qos: SubscriberQosPolicies) {
self.inner
.write()
.expect("couldn't write lock DomainParticipantInnet")
.set_default_subscriber_qos(qos);
self.inner.write().set_default_subscriber_qos(qos);
}
pub fn get_default_topic_qos(&self) -> TopicQosPolicies {
self.inner
.read()
.expect("couldn't read lock DomainParticipantInnet")
.get_default_topic_qos()
self.inner.read().get_default_topic_qos()
}
pub fn set_default_topic_qos(&mut self, qos: TopicQosPolicies) {
self.inner
.write()
.expect("couldn't write lock DomainParticipantInnet")
.set_default_topic_qos(qos);
self.inner.write().set_default_topic_qos(qos);
}
}
impl Drop for DomainParticipantDisc {
Expand All @@ -287,10 +234,7 @@ impl Drop for DomainParticipantDisc {

impl RTPSEntity for DomainParticipantDisc {
fn guid(&self) -> GUID {
self.inner
.read()
.expect("couldn't read lock DomainParticipantInnet")
.my_guid
self.inner.read().my_guid
}
}

Expand Down
31 changes: 6 additions & 25 deletions src/dds/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use crate::network::net_util::{usertraffic_multicast_port, usertraffic_unicast_p
use crate::rtps::writer::{DataWriterStatusChanged, WriterCmd, WriterIngredients};
use crate::structure::{Duration, EntityId, EntityKind, RTPSEntity, GUID};
use alloc::sync::Arc;
use awkernel_sync::rwlock::RwLock;
use mio_extras::channel as mio_channel;
use std::sync::RwLock;

/// DDS Publisher
///
Expand Down Expand Up @@ -60,7 +60,6 @@ impl Publisher {
) -> DataWriter<D> {
self.inner
.read()
.expect("couldn't read lock InnerPublisher")
.create_datawriter(qos, topic, self.clone())
}

Expand All @@ -72,42 +71,24 @@ impl Publisher {
) -> DataWriter<D> {
self.inner
.read()
.expect("couldn't read lock InnerPublisher")
.create_datawriter_with_entityid(qos, topic, self.clone(), entity_id)
}

pub fn get_qos(&self) -> PublisherQosPolicies {
self.inner
.read()
.expect("couldn't read lock InnerPublisher")
.get_qos()
self.inner.read().get_qos()
}
pub fn set_qos(&mut self, qos: PublisherQosPolicies) {
self.inner
.write()
.expect("couldn't write lock InnerPublisher")
.set_qos(qos);
self.inner.write().set_qos(qos);
}

pub fn domain_participant(&self) -> DomainParticipant {
self.inner
.read()
.expect("couldn't read lock InnerPublisher")
.dp
.clone()
self.inner.read().dp.clone()
}
pub fn get_default_datawriter_qos(&self) -> DataWriterQosPolicies {
self.inner
.read()
.expect("couldn't read lock InnerPublisher")
.default_dw_qos
.clone()
self.inner.read().default_dw_qos.clone()
}
pub fn set_default_datawriter_qos(&mut self, qos: DataWriterQosPolicies) {
self.inner
.write()
.expect("couldn't write lock InnerPublisher")
.default_dw_qos = qos;
self.inner.write().default_dw_qos = qos;
}
}

Expand Down
Loading

0 comments on commit 01d0115

Please sign in to comment.