feat: topic secondary index (#19)
- Added secondary index partition for topics
- Write to index in `Store::append` as well, using write batch
- Index key uses topic + 0xFF + frame ID as encoding (see the sketch below)
- Removed mut condition from `Store::append`
- Use topic index in `Store::head`
- Added basic `head` test
- Change `Store::head` to only need topic as &str (no heap allocation)
marvin-j97 authored Oct 28, 2024
1 parent 93e791d commit 691902a
Showing 6 changed files with 114 additions and 50 deletions.
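The heart of this change: next to the primary "stream" partition (keyed by frame ID), a second "idx_topic" partition stores keys of the form `topic bytes + 0xFF + frame ID` with empty values, and `Store::append` writes both entries in one fjall write batch. Because 0xFF can never occur in valid UTF-8, the delimiter cannot collide with topic bytes, and because SCRU128 IDs are time-ordered, the last key under a topic's prefix is that topic's head. Below is a rough, self-contained sketch of the key layout and lookup, not the store's actual implementation: it uses `std` BTreeMaps as stand-ins for the two fjall partitions, and the names `FrameId`, `append`, `head`, and `topic_index_key` merely echo the diff that follows.

```rust
use std::collections::BTreeMap;

// Hypothetical stand-ins: a 16-byte id plays the role of the SCRU128 frame id,
// `frames` plays the "stream" partition and `index` the "idx_topic" partition.
type FrameId = [u8; 16];

/// Secondary-index key: topic bytes + 0xFF + frame id bytes.
/// 0xFF never occurs in valid UTF-8, so it cleanly separates the two parts.
fn topic_index_key(topic: &str, id: &FrameId) -> Vec<u8> {
    let mut key = Vec::with_capacity(topic.len() + 1 + id.len());
    key.extend_from_slice(topic.as_bytes());
    key.push(0xFF);
    key.extend_from_slice(id);
    key
}

/// Mirrors the idea behind the batched write in `Store::append`: every frame is
/// written to the primary map and to the topic index together (fjall gets
/// atomicity from a write batch; plain maps have none, so this is illustrative only).
fn append(
    frames: &mut BTreeMap<FrameId, String>,
    index: &mut BTreeMap<Vec<u8>, ()>,
    topic: &str,
    id: FrameId,
) {
    frames.insert(id, format!("frame on '{topic}'"));
    index.insert(topic_index_key(topic, &id), ());
}

/// Latest frame for a topic: scan only the keys that start with `topic + 0xFF`
/// and take the last one -- ids are time-ordered, so the highest key is the head.
fn head(index: &BTreeMap<Vec<u8>, ()>, topic: &str) -> Option<FrameId> {
    let mut prefix = topic.as_bytes().to_vec();
    prefix.push(0xFF);

    index
        .range(prefix.clone()..)
        .take_while(|(key, _)| key.starts_with(&prefix))
        .last()
        .map(|(key, _)| {
            let mut id = [0u8; 16];
            id.copy_from_slice(&key[prefix.len()..]);
            id
        })
}

fn main() {
    let mut frames = BTreeMap::new();
    let mut index = BTreeMap::new();

    // Append two frames to "hello" and one to "hallo".
    append(&mut frames, &mut index, "hello", [1u8; 16]);
    append(&mut frames, &mut index, "hallo", [2u8; 16]);
    append(&mut frames, &mut index, "hello", [3u8; 16]);

    // head() never touches other topics' keys, only the "<topic>\xFF..." range.
    assert_eq!(head(&index, "hello"), Some([3u8; 16]));
    assert_eq!(head(&index, "hallo"), Some([2u8; 16]));
    println!("head lookups resolved via the secondary index");
}
```

In the diff itself, `Store::head` does the equivalent scan with `self.topic_index.prefix(prefix).rev()` and then joins back to the primary partition by frame ID, while the keyspace write batch in `Store::append` keeps the frame and its index entry together.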
4 changes: 2 additions & 2 deletions src/api.rs
@@ -149,7 +149,7 @@ async fn handle(
handle_pipe_post(&mut store, engine, pool.clone(), id, req.into_body()).await
}

- Routes::HeadGet(topic) => response_frame_or_404(store.head(topic)),
+ Routes::HeadGet(topic) => response_frame_or_404(store.head(&topic)),

Routes::NotFound => response_404(),
}
@@ -309,7 +309,7 @@ async fn handle_pipe_post(
}

pub async fn serve(
- mut store: Store,
+ store: Store,
engine: nu::Engine,
pool: ThreadPool,
expose: Option<String>,
22 changes: 11 additions & 11 deletions src/handlers.rs
@@ -83,15 +83,15 @@ impl HandlerTask {
}

async fn spawn(
- mut store: Store,
+ store: Store,
handler: HandlerTask,
pool: ThreadPool,
) -> Result<tokio::sync::mpsc::Sender<bool>, Error> {
let (tx_command, _rx_command) = tokio::sync::mpsc::channel(1);

let last_id: Option<Scru128Id> = if let Some(start) = handler.meta.start.as_ref() {
match start {
- StartDefinition::Head { head } => store.head(head.to_string()).map(|frame| frame.id),
+ StartDefinition::Head { head } => store.head(head).map(|frame| frame.id),
}
} else {
None
@@ -110,7 +110,7 @@ async fn spawn(
let mut recver = store.read(options).await;

{
- let mut store = store.clone();
+ let store = store.clone();
let mut handler = handler.clone();
let mut generated_frames = std::collections::HashSet::new();

@@ -139,9 +139,9 @@ async fn spawn(

let value = execute_and_get_result(&pool, handler.clone(), frame.clone()).await;
if handler.meta.stateful.unwrap_or(false) {
- handle_result_stateful(&mut store, &mut handler, &frame, value).await;
+ handle_result_stateful(&store, &mut handler, &frame, value).await;
} else if let Some(frame_id) =
- handle_result_stateless(&mut store, &handler, &frame, value).await
+ handle_result_stateless(&store, &handler, &frame, value).await
{
generated_frames.insert(frame_id);
}
@@ -208,7 +208,7 @@ fn execute_handler(handler: HandlerTask, frame: &Frame) -> Result<Value, Error>
}

async fn handle_result_stateful(
- store: &mut Store,
+ store: &Store,
handler: &mut HandlerTask,
frame: &Frame,
value: Value,
@@ -242,7 +242,7 @@ async fn handle_result_stateful(
}

async fn handle_result_stateless(
- store: &mut Store,
+ store: &Store,
handler: &HandlerTask,
frame: &Frame,
value: Value,
@@ -330,7 +330,7 @@ mod tests {
#[tokio::test]
async fn test_serve_stateless() {
let temp_dir = TempDir::new().unwrap();
- let mut store = Store::new(temp_dir.into_path()).await;
+ let store = Store::new(temp_dir.into_path()).await;
let pool = ThreadPool::new(4);
let engine = nu::Engine::new(store.clone()).unwrap();

@@ -397,7 +397,7 @@ mod tests {
#[tokio::test]
async fn test_serve_stateful() {
let temp_dir = TempDir::new().unwrap();
- let mut store = Store::new(temp_dir.into_path()).await;
+ let store = Store::new(temp_dir.into_path()).await;
let pool = ThreadPool::new(4);
let engine = nu::Engine::new(store.clone()).unwrap();

@@ -478,7 +478,7 @@ mod tests {
#[tokio::test]
async fn test_handler_update() {
let temp_dir = TempDir::new().unwrap();
- let mut store = Store::new(temp_dir.into_path()).await;
+ let store = Store::new(temp_dir.into_path()).await;
let pool = ThreadPool::new(4);
let engine = nu::Engine::new(store.clone()).unwrap();

@@ -604,7 +604,7 @@ mod tests {
// This test is to ensure that a handler does not process its own output
async fn test_handler_stateless_no_self_loop() {
let temp_dir = TempDir::new().unwrap();
- let mut store = Store::new(temp_dir.into_path()).await;
+ let store = Store::new(temp_dir.into_path()).await;
let pool = ThreadPool::new(4);
let engine = nu::Engine::new(store.clone()).unwrap();

4 changes: 2 additions & 2 deletions src/http.rs
@@ -58,7 +58,7 @@ type BoxError = Box<dyn std::error::Error + Send + Sync>;
type HTTPResult = Result<hyper::Response<BoxBody<Bytes, BoxError>>, BoxError>;

async fn handle(
- mut store: Store,
+ store: Store,
req: hyper::Request<hyper::body::Incoming>,
addr: Option<SocketAddr>,
connection_id: Scru128Id,
@@ -234,7 +234,7 @@ pub async fn serve(
let mut streams = active_streams.lock().await;
if let Some(requests) = streams.untrack_connection(&connection_id) {
for request_id in requests {
- let mut store = store.clone();
+ let store = store.clone();
let _ = store
.append(
Frame::with_topic("http.disconnect")
2 changes: 1 addition & 1 deletion src/nu/commands/append_command.rs
@@ -57,7 +57,7 @@ impl Command for AppendCommand {
) -> Result<PipelineData, ShellError> {
let span = call.head;

- let mut store = self.store.clone();
+ let store = self.store.clone();

let topic: String = call.req(engine_state, stack, 0)?;
let meta: Option<Value> = call.get_flag(engine_state, stack, "meta")?;
122 changes: 93 additions & 29 deletions src/store.rs
@@ -12,7 +12,7 @@ use serde::{Deserialize, Deserializer, Serialize};

use fjall::{Config, Keyspace, PartitionCreateOptions, PartitionHandle};

- #[derive(PartialEq, Serialize, Deserialize, Clone, Default, bon::Builder)]
+ #[derive(PartialEq, Eq, Serialize, Deserialize, Clone, Default, bon::Builder)]
#[builder(start_fn = with_topic)]
pub struct Frame {
#[builder(start_fn, into)]
@@ -38,7 +38,7 @@ impl fmt::Debug for Frame {
}
}

- #[derive(Default, PartialEq, Clone, Debug, Deserialize, Serialize)]
+ #[derive(Default, PartialEq, Eq, Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum TTL {
#[default]
@@ -141,8 +141,11 @@ pub enum FollowOption {
#[derive(Clone)]
pub struct Store {
pub path: PathBuf,

keyspace: Keyspace,
- pub partition: PartitionHandle,
+ frame_partition: PartitionHandle,
+ topic_index: PartitionHandle,

broadcast_tx: broadcast::Sender<Frame>,
}

@@ -151,16 +154,23 @@ impl Store {
let config = Config::new(path.join("fjall"));
let keyspace = config.open().unwrap();

- let partition = keyspace
+ let frame_partition = keyspace
.open_partition("stream", PartitionCreateOptions::default())
.unwrap();

+ let topic_index = keyspace
+ .open_partition("idx_topic", PartitionCreateOptions::default())
+ .unwrap();

let (broadcast_tx, _) = broadcast::channel(1024);

Store {
path,

keyspace,
- partition,
+ frame_partition,
+ topic_index,

broadcast_tx,
}
}
@@ -188,7 +198,7 @@ impl Store {

// Clone these for the background thread
let tx_clone = tx.clone();
- let partition = self.partition.clone();
+ let partition = self.frame_partition.clone();
let options_clone = options.clone();
let should_follow_clone = should_follow;

@@ -314,26 +324,50 @@ impl Store {
}

pub fn get(&self, id: &Scru128Id) -> Option<Frame> {
- let res = self.partition.get(id.to_bytes()).unwrap();
+ let res = self.frame_partition.get(id.to_bytes()).unwrap();
res.map(|value| serde_json::from_slice(&value).unwrap())
}

- pub fn head(&self, topic: String) -> Option<Frame> {
- // Iterate over the partition in reverse order
- let range: (Bound<Vec<u8>>, Bound<Vec<u8>>) = (Bound::Unbounded, Bound::Unbounded);
- self.partition.range(range).rev().find_map(|record| {
- let (_, value) = record.ok()?;
- let frame: Frame = serde_json::from_slice(&value).ok()?;
- if frame.topic == topic {
- Some(frame)
- } else {
- None
- }
- })
+ pub fn head(&self, topic: &str) -> Option<Frame> {
+ let mut prefix = Vec::with_capacity(topic.len() + 1);
+ prefix.extend(topic.as_bytes());
+ prefix.push(0xFF);
+
+ for kv in self.topic_index.prefix(prefix).rev() {
+ let (k, _) = kv.unwrap();
+ let frame_id = k.split(|&c| c == 0xFF).nth(1).unwrap();
+
+ // Join back to "primary index"
+ if let Some(value) = self.frame_partition.get(frame_id).unwrap() {
+ let frame: Frame = serde_json::from_slice(&value).unwrap();
+ return Some(frame);
+ };
+ }
+
+ None
}

+ /// Formats a key for the topic secondary index
+ fn topic_index_key(frame: &Frame) -> Vec<u8> {
+ // We use a 0xFF as delimiter, because
+ // 0xFF cannot appear in a valid UTF-8 sequence
+ let mut v = Vec::with_capacity(frame.id.as_bytes().len() + 1 + frame.topic.len());
+ v.extend(frame.topic.as_bytes());
+ v.push(0xFF);
+ v.extend(frame.id.as_bytes());
+ v
+ }

pub fn remove(&self, id: &Scru128Id) -> Result<(), fjall::Error> {
- self.partition.remove(id.to_bytes())
+ let Some(frame) = self.get(id) else {
+ // Already deleted
+ return Ok(());
+ };
+
+ let mut batch = self.keyspace.batch();
+ batch.remove(&self.frame_partition, id.to_bytes());
+ batch.remove(&self.topic_index, Self::topic_index_key(&frame));
+ batch.commit()
}

pub async fn cas_reader(&self, hash: ssri::Integrity) -> cacache::Result<cacache::Reader> {
@@ -354,14 +388,18 @@ impl Store {
cacache::read_hash(&self.path.join("cacache"), hash).await
}

- pub async fn append(&mut self, frame: Frame) -> Frame {
+ pub async fn append(&self, frame: Frame) -> Frame {
let mut frame = frame;
frame.id = scru128::new();

// only store the frame if it's not ephemeral
if frame.ttl != Some(TTL::Ephemeral) {
let encoded: Vec<u8> = serde_json::to_vec(&frame).unwrap();
- self.partition.insert(frame.id.to_bytes(), encoded).unwrap();
+
+ let mut batch = self.keyspace.batch();
+ batch.insert(&self.frame_partition, frame.id.to_bytes(), encoded);
+ batch.insert(&self.topic_index, Self::topic_index_key(&frame), b"");
+ batch.commit().unwrap();
self.keyspace.persist(fjall::PersistMode::SyncAll).unwrap();
}

@@ -409,6 +447,32 @@ mod tests_read_options {
expected: ReadOptions,
}

+ #[tokio::test]
+ async fn test_topic_index() -> Result<(), crate::error::Error> {
+ let folder = tempfile::tempdir()?;
+
+ let store = Store::new(folder.path().to_path_buf()).await;
+
+ let frame1 = Frame {
+ id: scru128::new(),
+ topic: "hello".to_owned(),
+ ..Default::default()
+ };
+ let frame1 = store.append(frame1).await;
+
+ let frame2 = Frame {
+ id: scru128::new(),
+ topic: "hallo".to_owned(),
+ ..Default::default()
+ };
+ let frame2 = store.append(frame2).await;
+
+ assert_eq!(Some(frame1), store.head("hello"));
+ assert_eq!(Some(frame2), store.head("hallo"));
+
+ Ok(())
+ }

#[test]
fn test_from_query() {
let test_cases = [
@@ -474,7 +538,7 @@ mod tests_store {
#[tokio::test]
async fn test_get() {
let temp_dir = TempDir::new().unwrap();
- let mut store = Store::new(temp_dir.into_path()).await;
+ let store = Store::new(temp_dir.into_path()).await;
let meta = serde_json::json!({"key": "value"});
let frame = store
.append(Frame::with_topic("stream").meta(meta).build())
@@ -486,7 +550,7 @@ mod tests_store {
#[tokio::test]
async fn test_follow() {
let temp_dir = TempDir::new().unwrap();
- let mut store = Store::new(temp_dir.into_path()).await;
+ let store = Store::new(temp_dir.into_path()).await;

// Append two initial clips
let f1 = store.append(Frame::with_topic("stream").build()).await;
@@ -535,12 +599,12 @@ mod tests_store {
#[tokio::test]
async fn test_stream_basics() {
let temp_dir = TempDir::new().unwrap();
- let mut store = Store::new(temp_dir.into_path()).await;
+ let store = Store::new(temp_dir.into_path()).await;

let f1 = store.append(Frame::with_topic("/stream").build()).await;
let f2 = store.append(Frame::with_topic("/stream").build()).await;

assert_eq!(store.head("/stream".to_string()), Some(f2.clone()));
assert_eq!(store.head("/stream"), Some(f2.clone()));

let recver = store.read(ReadOptions::default()).await;
assert_eq!(
@@ -564,7 +628,7 @@ mod tests_store {
#[tokio::test]
async fn test_read_limit_nofollow() {
let temp_dir = tempfile::tempdir().unwrap();
- let mut store = Store::new(temp_dir.path().to_path_buf()).await;
+ let store = Store::new(temp_dir.path().to_path_buf()).await;

// Add 3 items
let frame1 = store.append(Frame::with_topic("test").build()).await;
@@ -586,7 +650,7 @@ mod tests_store {
#[tokio::test]
async fn test_read_follow_limit_after_subscribe() {
let temp_dir = tempfile::tempdir().unwrap();
- let mut store = Store::new(temp_dir.path().to_path_buf()).await;
+ let store = Store::new(temp_dir.path().to_path_buf()).await;

// Add 1 item
let frame1 = store.append(Frame::with_topic("test").build()).await;
@@ -622,7 +686,7 @@ mod tests_store {
#[tokio::test]
async fn test_read_follow_limit_processing_history() {
let temp_dir = tempfile::tempdir().unwrap();
- let mut store = Store::new(temp_dir.path().to_path_buf()).await;
+ let store = Store::new(temp_dir.path().to_path_buf()).await;

// Create 5 records upfront
let frame1 = store.append(Frame::with_topic("test").build()).await;