forked from jerry73204/realsense-rust
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathframe_queue.rs
142 lines (124 loc) · 4.17 KB
/
frame_queue.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
//! Defines the queue type of frames.
use crate::{
base::DEFAULT_TIMEOUT,
common::*,
error::{Error as RsError, ErrorChecker, Result},
frame::{AnyFrame, Frame, GenericFrameEx},
frame_kind::FrameKind,
};
/// The queue of frames.
#[derive(Debug)]
pub struct FrameQueue {
pub(crate) ptr: NonNull<sys::rs2_frame_queue>,
}
impl FrameQueue {
/// Creates an instance with given capacity.
pub fn with_capacity(capacity: usize) -> Result<Self> {
let queue = unsafe {
let mut checker = ErrorChecker::new();
let ptr = sys::rs2_create_frame_queue(capacity as c_int, checker.inner_mut_ptr());
checker.check()?;
Self::from_raw(ptr)
};
Ok(queue)
}
/// Push a frame to the queue.
pub fn enqueue<Kind>(&mut self, frame: Frame<Kind>)
where
Kind: FrameKind,
{
unsafe {
sys::rs2_enqueue_frame(frame.ptr.as_ptr(), self.ptr.cast::<c_void>().as_ptr());
}
}
/// Pops a frame from queue.
///
/// The method blocks until a frame is available.
pub fn wait(&mut self, timeout: Option<Duration>) -> Result<AnyFrame> {
let timeout_ms = timeout.unwrap_or(DEFAULT_TIMEOUT).as_millis() as c_uint;
let frame = loop {
let mut checker = ErrorChecker::new();
let ptr = unsafe {
sys::rs2_wait_for_frame(self.ptr.as_ptr(), timeout_ms, checker.inner_mut_ptr())
};
match (timeout, checker.check()) {
(None, Err(RsError::Timeout(..))) => continue,
tuple => {
let (_, result) = tuple;
result?;
}
}
let frame = unsafe { Frame::from_raw(ptr) };
break frame;
};
Ok(frame)
}
/// Wait for frame asynchronously. It is analogous to [FrameQueue::wait]
pub async fn wait_async(&mut self, timeout: Option<Duration>) -> Result<AnyFrame> {
let timeout_ms = timeout
.map(|duration| duration.as_millis() as c_uint)
.unwrap_or(sys::RS2_DEFAULT_TIMEOUT as c_uint);
let (tx, rx) = futures::channel::oneshot::channel();
let queue_ptr = AtomicPtr::new(self.ptr.as_ptr());
thread::spawn(move || {
let result = unsafe {
loop {
let mut checker = ErrorChecker::new();
let ptr = sys::rs2_wait_for_frame(
queue_ptr.load(Ordering::Relaxed),
timeout_ms,
checker.inner_mut_ptr(),
);
let result = match (timeout, checker.check()) {
(None, Err(RsError::Timeout(..))) => continue,
(_, result) => result.map(|_| Frame::from_raw(ptr)),
};
break result;
}
};
let _ = tx.send(result);
});
let frame = rx.await.unwrap()?;
Ok(frame)
}
/// Try to pop a frame and returns immediately.
pub fn try_wait(&mut self) -> Result<Option<AnyFrame>> {
unsafe {
let mut checker = ErrorChecker::new();
let mut ptr: *mut sys::rs2_frame = ptr::null_mut();
let ret = sys::rs2_poll_for_frame(
self.ptr.as_ptr(),
&mut ptr as *mut _,
checker.inner_mut_ptr(),
);
checker.check()?;
if ret != 0 {
let frame = Frame::from_raw(ptr);
Ok(Some(frame))
} else {
Ok(None)
}
}
}
pub fn into_raw(self) -> *mut sys::rs2_frame_queue {
let ptr = self.ptr;
mem::forget(self);
ptr.as_ptr()
}
pub unsafe fn from_raw(ptr: *mut sys::rs2_frame_queue) -> Self {
Self {
ptr: NonNull::new(ptr).unwrap(),
}
}
pub(crate) unsafe fn unsafe_clone(&self) -> Self {
Self { ptr: self.ptr }
}
}
impl Drop for FrameQueue {
fn drop(&mut self) {
unsafe {
sys::rs2_delete_frame_queue(self.ptr.as_ptr());
}
}
}
unsafe impl Send for FrameQueue {}