From 9267d120d8634362a771287032190026abebba73 Mon Sep 17 00:00:00 2001 From: Jack Kelly Date: Tue, 30 Jan 2024 17:34:36 +0000 Subject: [PATCH] About to have separate locks for each field. Might have forgotten to git add the appropriate files earlier, sorry! #30 --- src/io_uring_thread.rs | 12 ++++++++ src/operation_future.rs | 67 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 79 insertions(+) create mode 100644 src/io_uring_thread.rs create mode 100644 src/operation_future.rs diff --git a/src/io_uring_thread.rs b/src/io_uring_thread.rs new file mode 100644 index 0000000..8094882 --- /dev/null +++ b/src/io_uring_thread.rs @@ -0,0 +1,12 @@ +use std::{ + sync::{mpsc::Sender, Arc, RwLock}, + thread::JoinHandle, +}; + +use crate::operation_future::SharedState; + +#[derive(Debug)] +pub(crate) struct WorkerThread { + pub(crate) handle: JoinHandle<()>, + pub(crate) sender: Sender>>, // Channel to send ops to the worker thread +} diff --git a/src/operation_future.rs b/src/operation_future.rs new file mode 100644 index 0000000..0c841ee --- /dev/null +++ b/src/operation_future.rs @@ -0,0 +1,67 @@ +use std::{ + future::Future, + pin::Pin, + sync::{Arc, RwLock}, + task::{Context, Poll, Waker}, +}; + +use bytes::Bytes; +use object_store::{path::Path, Result}; + +// One enum variant per `ObjectStore` method. +#[derive(Debug)] +pub(crate) enum Operation { + Get { location: Path }, +} + +#[derive(Debug)] +pub(crate) struct OperationFuture { + pub(crate) shared_state: Arc>, +} + +/// Shared state between the future and the waiting thread. Adapted from: +/// https://rust-lang.github.io/async-book/02_execution/03_wakeups.html#applied-build-a-timer +#[derive(Debug)] +pub(crate) struct SharedState { + result: Option>, + waker: Option, + operation: Operation, +} + +/// Adapted from: +/// https://rust-lang.github.io/async-book/02_execution/03_wakeups.html#applied-build-a-timer +impl Future for OperationFuture { + type Output = Result; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut shared_state = self.shared_state.write().unwrap(); + if shared_state.result.is_some() { + Poll::Ready(shared_state.result.take().unwrap()) + } else { + shared_state.waker = Some(cx.waker().clone()); + Poll::Pending + } + } +} + +/// Adapted from: +/// https://rust-lang.github.io/async-book/02_execution/03_wakeups.html#applied-build-a-timer +impl OperationFuture { + pub fn new(operation: Operation) -> Self { + let shared_state = Arc::new(RwLock::new(SharedState { + result: None, + waker: None, + operation, + })); + + Self { shared_state } + } +} +impl SharedState { + pub fn set_result_and_wake(&mut self, result: Result) { + let mut shared_state = self.shared_state.write().unwrap(); + shared_state.result = Some(result); + if let Some(waker) = shared_state.waker.take() { + waker.wake() + } + } +}