Skip to content

Commit

Permalink
feat: introduce the special Pipe collect_to_vec to use for test
Browse files Browse the repository at this point in the history
  • Loading branch information
davidB committed Dec 9, 2024
1 parent 55678c6 commit a703cbe
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 1 deletion.
2 changes: 2 additions & 0 deletions cdviz-collector/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ pub(crate) enum Error {
ConfigReader(figment::Error),
CloudEventBuilder(cloudevents::event::EventBuilderError),
CloudEventMessage(cloudevents::message::Error),
// MutexPoisoned(std::sync::PoisonError<std::sync::MutexGuard<'static, Message>>),
// MutexPoisoned<T>(std::sync::PoisonError<T>),
// ConfigTomlError(toml::de::Error),
MultiHash(multihash::Error),
#[display("{txt}")]
Expand Down
53 changes: 53 additions & 0 deletions cdviz-collector/src/pipes/collect_to_vec.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#[allow(dead_code)]
use super::Pipe;
use crate::errors::{Error, Result};
use serde::{Deserialize, Serialize};
use std::{
fmt::Debug,
sync::{Arc, Mutex},
};

#[derive(Debug, Deserialize, Serialize)]
pub(crate) struct Config {}

type Buffer<I> = Arc<Mutex<Vec<I>>>;
pub struct Processor<I> {
// TODO do we need to use Mutex?
buffer: Buffer<I>,
}

impl<I> Pipe for Processor<I> {
type Input = I;
fn send(&mut self, input: Self::Input) -> Result<()> {
//.lock().unwrap() if mutex
self.buffer.lock().map_err(|err| Error::from(err.to_string()))?.push(input);
Ok(())
}
}

pub struct Collector<I> {
buffer: Buffer<I>,
}

impl<I> Collector<I>
where
I: Clone,
{
#[allow(dead_code)] // mainly use in tests
pub fn new() -> Self {
Self { buffer: Arc::new(Mutex::new(vec![])) }
}

#[allow(dead_code)] // mainly use in tests
pub fn create_pipe(&self) -> Processor<I> {
Processor { buffer: Arc::clone(&self.buffer) }
}

#[allow(dead_code)] // mainly use in tests
pub fn try_into_iter(&self) -> Result<std::vec::IntoIter<I>> {
self.buffer
.lock()
.map_err(|err| Error::from(err.to_string()))
.map(|buffer| buffer.clone().into_iter())
}
}
1 change: 1 addition & 0 deletions cdviz-collector/src/pipes/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::errors::Result;
pub mod collect_to_vec;
pub mod discard_all;
pub mod log;
pub mod passthrough;
Expand Down
2 changes: 1 addition & 1 deletion cdviz-collector/src/sources/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub(crate) fn start(_name: &str, config: Config, tx: Sender<Message>) -> JoinHan
})
}

#[derive(Debug, Deserialize, Serialize, Default)]
#[derive(Debug, Clone, Deserialize, Serialize, Default, PartialEq, Eq)]
pub struct EventSource {
pub metadata: Value,
pub header: HashMap<String, String>,
Expand Down

0 comments on commit a703cbe

Please sign in to comment.