Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: analytics observers #13

Merged
merged 2 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ http = []
metrics = ["dep:metrics", "future/metrics", "alloc/metrics", "http/metrics"]
profiler = ["alloc/profiler"]

[workspace.dependencies]
aws-sdk-s3 = "1.13"

[dependencies]
alloc = { path = "./crates/alloc", optional = true }
analytics = { path = "./crates/analytics", optional = true }
Expand Down
4 changes: 3 additions & 1 deletion crates/analytics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ version = "0.1.0"
edition = "2021"

[dependencies]
future = { path = "../future" }

async-trait = "0.1"
tokio = { version = "1", default-features = false, features = ["rt", "rt-multi-thread", "sync", "time", "macros"] }
tracing = "0.1"
Expand All @@ -14,7 +16,7 @@ anyhow = "1"
tap = "1.0"

chrono = { version = "0.4" }
aws-sdk-s3 = "0.31"
aws-sdk-s3.workspace = true
bytes = "1.5"
parquet = { git = "https://github.com/WalletConnect/arrow-rs.git", rev = "99a1cc3", default-features = false, features = ["flate2"] }
parquet_derive = { git = "https://github.com/WalletConnect/arrow-rs.git", rev = "99a1cc3" }
201 changes: 201 additions & 0 deletions crates/analytics/src/collectors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
use {
crate::{AnalyticsEvent, Batch, BatchFactory, Collector, Exporter},
std::{marker::PhantomData, pin::pin, time::Duration},
tokio::sync::{mpsc, mpsc::error::TrySendError},
};

/// Errors internal to the batch-export event loop. These are never returned
/// to callers of the public API; they are only surfaced via `tracing` logs
/// (or terminate the background task).
#[derive(Debug, thiserror::Error)]
enum InternalError {
    /// Batch creation or serialization failure, carrying the stringified
    /// source error.
    #[error("Batch error: {0}")]
    Batch(String),

    /// Exporter failure, carrying the stringified source error.
    #[error("Export error: {0}")]
    Export(String),

    // NOTE(review): this variant is produced from a tokio `JoinError` on the
    // `spawn_blocking` serialization task — i.e. the task panicked or was
    // cancelled — not necessarily from serialization logic itself.
    #[error("Serialization failed")]
    Serialization,
}

/// Errors returned to callers when submitting data through
/// [`Collector::collect`].
#[derive(Debug, thiserror::Error)]
pub enum CollectionError {
    /// The bounded data queue was full; the submitted event was dropped.
    #[error("Data channel overflow")]
    DataChannelOverflow,

    /// The receiving half of the data queue is gone, i.e. the background
    /// event loop has shut down.
    #[error("Data channel closed")]
    DataChannelClosed,
}

impl<T> From<TrySendError<T>> for CollectionError {
    /// Maps a queue submission failure onto the public error type: a full
    /// queue becomes [`CollectionError::DataChannelOverflow`], a dropped
    /// receiver becomes [`CollectionError::DataChannelClosed`].
    fn from(val: TrySendError<T>) -> Self {
        if matches!(val, TrySendError::Full(_)) {
            Self::DataChannelOverflow
        } else {
            Self::DataChannelClosed
        }
    }
}

/// Tuning parameters for [`BatchCollector`].
pub struct CollectorConfig {
    /// Data collection queue capacity. Overflowing the queue would cause excess
    /// data to be dropped.
    pub data_queue_capacity: usize,

    /// Maximum interval between batch data exports.
    pub export_interval: Duration,
}

impl Default for CollectorConfig {
    /// Defaults to an 8192-entry queue and a 5-minute export interval.
    fn default() -> Self {
        Self {
            data_queue_capacity: 8192,
            export_interval: Duration::from_secs(300), // 5 minutes
        }
    }
}

/// A collector that buffers events of type `T` into batches on a background
/// task, exporting them when a batch fills up or on a timer.
pub struct BatchCollector<T> {
    /// Sending half of the bounded queue feeding the background event loop.
    data_tx: mpsc::Sender<T>,
}

impl<T> BatchCollector<T>
where
    T: AnalyticsEvent,
{
    /// Creates the collector and spawns its background event loop.
    ///
    /// Events submitted via [`Collector::collect`] travel over a bounded
    /// channel of capacity `config.data_queue_capacity` into the loop, which
    /// batches them with `batch_factory` and ships them with `exporter`.
    /// If the loop exits with an error, a warning is logged and the
    /// background task terminates.
    pub fn new<B, E>(config: CollectorConfig, batch_factory: B, exporter: E) -> Self
    where
        B: BatchFactory<T>,
        B::Error: std::error::Error,
        E: Exporter,
    {
        let (tx, rx) = mpsc::channel(config.data_queue_capacity);

        // Pure struct construction; building it before the spawn is
        // equivalent to building it inside the task.
        let event_loop = EventLoop::new(batch_factory, exporter, config);

        tokio::spawn(async move {
            if let Err(err) = event_loop.run(rx).await {
                tracing::warn!(?err, "analytics event loop failed");
            }
        });

        Self { data_tx: tx }
    }
}

impl<T> Collector<T> for BatchCollector<T>
where
    T: AnalyticsEvent,
{
    type Error = CollectionError;

    /// Enqueues `data` for batching without blocking. Fails with
    /// [`CollectionError::DataChannelOverflow`] when the queue is full and
    /// [`CollectionError::DataChannelClosed`] when the background loop has
    /// shut down.
    fn collect(&self, data: T) -> Result<(), Self::Error> {
        Ok(self.data_tx.try_send(data)?)
    }
}

/// State owned by the background worker task: the batch factory, the
/// exporter, and the collector configuration.
struct EventLoop<T, B, E> {
    batch_factory: B,
    exporter: E,
    config: CollectorConfig,
    /// Ties the event type `T` to this struct without storing any `T`.
    _marker: PhantomData<T>,
}

impl<T, B, E> EventLoop<T, B, E>
where
    T: AnalyticsEvent,
    B: BatchFactory<T>,
    B::Error: std::error::Error,
    E: Exporter,
    E::Error: std::error::Error,
{
    /// Bundles the factory, exporter and config into a ready-to-run loop.
    fn new(batch_factory: B, exporter: E, config: CollectorConfig) -> Self {
        Self {
            batch_factory,
            exporter,
            config,
            _marker: PhantomData,
        }
    }

    /// Drives the batching loop until the data channel closes.
    ///
    /// Each received event is pushed into the current batch. The batch is
    /// exported when it reaches capacity or when `config.export_interval`
    /// elapses without an export (the interval is reset after each export).
    /// Returns an error only when a replacement batch cannot be created,
    /// which terminates the loop.
    async fn run(self, data_rx: mpsc::Receiver<T>) -> Result<(), InternalError> {
        let mut data_rx = pin!(data_rx);
        // NOTE(review): tokio `interval` completes its first tick
        // immediately; that first export is a no-op since the batch starts
        // empty.
        let mut export_interval = pin!(tokio::time::interval(self.config.export_interval));

        let mut current_batch = self
            .batch_factory
            .create()
            .map_err(|err| InternalError::Batch(err.to_string()))?;

        loop {
            tokio::select! {
                data = data_rx.recv() => match data {
                    Some(data) => {
                        if let Err(err) = current_batch.push(data) {
                            tracing::warn!(?err, "failed to push data to batch");

                            // Data push error is considered transient, so try to replace the
                            // broken batch and continue. If we can't create a new batch, exit
                            // the event loop with an error. Note that both the failed event
                            // and any data already in the broken batch are dropped here,
                            // not exported.
                            self.replace_batch(&mut current_batch)?;
                            export_interval.reset();

                            continue;
                        }

                        // Export the batch if it's at capacity.
                        if current_batch.is_full() {
                            self.export_batch(&mut current_batch)?;
                            export_interval.reset();
                        }
                    },

                    // The transmitter has been dropped. Export current batch and shutdown.
                    None => {
                        return self.export_batch(&mut current_batch);
                    },
                },

                _ = export_interval.tick() => {
                    self.export_batch(&mut current_batch)?;
                }
            }
        }
    }

    /// Swaps `current_batch` for a freshly created batch and returns the old
    /// one, which the caller may export or drop. Fails if the factory cannot
    /// produce a replacement.
    fn replace_batch(&self, current_batch: &mut B::Batch) -> Result<B::Batch, InternalError> {
        let next_batch = self
            .batch_factory
            .create()
            .map_err(|err| InternalError::Batch(err.to_string()))?;

        Ok(std::mem::replace(current_batch, next_batch))
    }

    /// Takes the current batch (no-op if empty), replaces it with a fresh
    /// one, then serializes and exports the old batch on a detached task.
    /// Serialization and export failures are logged, not propagated; only a
    /// failure to create the replacement batch is returned.
    fn export_batch(&self, current_batch: &mut B::Batch) -> Result<(), InternalError> {
        if current_batch.is_empty() {
            return Ok(());
        }

        let current_batch = self.replace_batch(current_batch)?;
        let exporter = self.exporter.clone();

        tokio::spawn(async move {
            let result = async {
                // Serialization runs on the blocking thread pool. A
                // `JoinError` here (the blocking task panicked or was
                // cancelled) maps to `InternalError::Serialization`.
                let data = tokio::task::spawn_blocking(move || current_batch.serialize())
                    .await
                    .map_err(|_| InternalError::Serialization)?
                    .map_err(|err| InternalError::Batch(err.to_string()))?;

                exporter
                    .export(data)
                    .await
                    .map_err(|err| InternalError::Export(err.to_string()))
            }
            .await;

            if let Err(err) = result {
                tracing::warn!(?err, "failed to export batch data");
            }
        });

        Ok(())
    }
}
Loading
Loading