From 38f9f72fa62217d9740f43428eced6088b6b5cf7 Mon Sep 17 00:00:00 2001 From: Carter Green Date: Mon, 17 Jun 2024 18:03:53 -0500 Subject: [PATCH 1/4] ADD: Add Rust `timeseries.get_range_to_file` --- CHANGELOG.md | 6 + Cargo.toml | 1 + src/historical/timeseries.rs | 217 ++++++++++++++++++++++++++++++++--- 3 files changed, 208 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e8d03b6..48fa746 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## 0.11.2 - TBD + +#### Enhancements +- Added `historical::timeseries::get_range_to_file` method to persist the data stream to + a given path before returning an `AsyncDbnDecoder` + ## 0.11.1 - 2024-06-11 #### Enhancements diff --git a/Cargo.toml b/Cargo.toml index 7bbab2a..d2b4ad1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,5 +46,6 @@ typed-builder = "0.18" [dev-dependencies] env_logger = "0.11.2" +tempfile = "3.10.1" tokio = { version = "1.38", features = ["full"] } wiremock = "0.6" diff --git a/src/historical/timeseries.rs b/src/historical/timeseries.rs index 19995c0..13a9a59 100644 --- a/src/historical/timeseries.rs +++ b/src/historical/timeseries.rs @@ -1,11 +1,15 @@ //! The historical timeseries API. 
-use std::num::NonZeroU64; +use std::{num::NonZeroU64, path::PathBuf}; -use dbn::{Compression, Encoding, SType, Schema, VersionUpgradePolicy}; -use futures::TryStreamExt; +use dbn::{encode::AsyncDbnEncoder, Compression, Encoding, SType, Schema, VersionUpgradePolicy}; +use futures::{Stream, TryStreamExt}; use reqwest::{header::ACCEPT, RequestBuilder}; -use tokio::io::AsyncReadExt; +use tokio::{ + fs::File, + io::{AsyncReadExt, AsyncWriteExt, BufWriter}, +}; +use tokio_util::{bytes::Bytes, io::StreamReader}; use typed_builder::TypedBuilder; use crate::Symbols; @@ -34,17 +38,79 @@ impl TimeseriesClient<'_> { &mut self, params: &GetRangeParams, ) -> crate::Result<AsyncDbnDecoder<impl AsyncReadExt>> { + let reader = self + .get_range_impl( + &params.dataset, + params.schema, + params.stype_in, + params.stype_out, + &params.symbols, + &params.date_time_range, + params.limit, + ) + .await?; + let mut decoder: AsyncDbnDecoder<_> = AsyncDbnDecoder::with_zstd_buffer(reader).await?; + decoder.set_upgrade_policy(params.upgrade_policy); + Ok(decoder) + } + + /// Makes a streaming request for timeseries data from Databento. + /// + /// This method returns a stream decoder. For larger requests, consider using + /// [`BatchClient::submit_job()`](super::batch::BatchClient::submit_job()). + /// + /// # Errors + /// This function returns an error when it fails to communicate with the Databento API + /// or the API indicates there's an issue with the request. An error will also be returned + /// if it fails to create a new file at `path`.
+ pub async fn get_range_to_file( + &mut self, + params: &GetRangeToFileParams, + ) -> crate::Result<AsyncDbnDecoder<impl AsyncReadExt>> { + let reader = self + .get_range_impl( + &params.dataset, + params.schema, + params.stype_in, + params.stype_out, + &params.symbols, + &params.date_time_range, + params.limit, + ) + .await?; + let mut http_decoder = AsyncDbnDecoder::with_zstd_buffer(reader).await?; + http_decoder.set_upgrade_policy(params.upgrade_policy); + let file = BufWriter::new(File::create(&params.path).await?); + let mut encoder = AsyncDbnEncoder::with_zstd(file, http_decoder.metadata()).await?; + while let Some(rec_ref) = http_decoder.decode_record_ref().await? { + encoder.encode_record_ref(rec_ref).await?; + } + encoder.get_mut().shutdown().await?; + Ok(AsyncDbnDecoder::from_zstd_file(&params.path).await?) + } + + #[allow(clippy::too_many_arguments)] // private method + async fn get_range_impl( + &mut self, + dataset: &str, + schema: Schema, + stype_in: SType, + stype_out: SType, + symbols: &Symbols, + date_time_range: &DateTimeRange, + limit: Option<NonZeroU64>, + ) -> crate::Result<StreamReader<impl Stream<Item = std::io::Result<Bytes>>, Bytes>> { let mut form = vec![ - ("dataset", params.dataset.to_string()), - ("schema", params.schema.to_string()), + ("dataset", dataset.to_owned()), + ("schema", schema.to_string()), ("encoding", Encoding::Dbn.to_string()), ("compression", Compression::ZStd.to_string()), - ("stype_in", params.stype_in.to_string()), - ("stype_out", params.stype_out.to_string()), - ("symbols", params.symbols.to_api_string()), + ("stype_in", stype_in.to_string()), + ("stype_out", stype_out.to_string()), + ("symbols", symbols.to_api_string()), ]; - params.date_time_range.add_to_form(&mut form); - if let Some(limit) = params.limit { + date_time_range.add_to_form(&mut form); + if let Some(limit) = limit { form.push(("limit", limit.to_string())); } let resp = self @@ -59,10 +125,7 @@ impl TimeseriesClient<'_> { .error_for_status()?
.bytes_stream() .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)); - let reader = tokio_util::io::StreamReader::new(stream); - let mut decoder: AsyncDbnDecoder<_> = AsyncDbnDecoder::with_zstd_buffer(reader).await?; - decoder.set_upgrade_policy(params.upgrade_policy); - Ok(decoder) + Ok(tokio_util::io::StreamReader::new(stream)) } fn post(&mut self, slug: &str) -> crate::Result<RequestBuilder> { @@ -101,9 +164,76 @@ pub struct GetRangeParams { pub upgrade_policy: VersionUpgradePolicy, } +/// The parameters for [`TimeseriesClient::get_range_to_file()`]. Use +/// [`GetRangeToFileParams::builder()`] to get a builder type with all the preset defaults. +#[derive(Debug, Clone, TypedBuilder, PartialEq, Eq)] +pub struct GetRangeToFileParams { + /// The dataset code. + #[builder(setter(transform = |dt: impl ToString| dt.to_string()))] + pub dataset: String, + /// The symbols to filter for. + #[builder(setter(into))] + pub symbols: Symbols, + /// The data record schema. + pub schema: Schema, + /// The request time range. + #[builder(setter(into))] + pub date_time_range: DateTimeRange, + /// The symbology type of the input `symbols`. Defaults to + /// [`RawSymbol`](dbn::enums::SType::RawSymbol). + #[builder(default = SType::RawSymbol)] + pub stype_in: SType, + /// The symbology type of the output `symbols`. Defaults to + /// [`InstrumentId`](dbn::enums::SType::InstrumentId). + #[builder(default = SType::InstrumentId)] + pub stype_out: SType, + /// The optional maximum number of records to return. Defaults to no limit. + #[builder(default)] + pub limit: Option<NonZeroU64>, + /// How to decode DBN from prior versions. Defaults to upgrade. + #[builder(default = VersionUpgradePolicy::Upgrade)] + pub upgrade_policy: VersionUpgradePolicy, + /// The file path to persist the stream data to.
+ #[builder(default, setter(transform = |p: impl Into<PathBuf>| p.into()))] + pub path: PathBuf, +} + +impl From<GetRangeToFileParams> for GetRangeParams { + fn from(value: GetRangeToFileParams) -> Self { + Self { + dataset: value.dataset, + symbols: value.symbols, + schema: value.schema, + date_time_range: value.date_time_range, + stype_in: value.stype_in, + stype_out: value.stype_out, + limit: value.limit, + upgrade_policy: value.upgrade_policy, + } + } +} + +impl GetRangeParams { + /// Converts these parameters into a request that will be persisted to a file + /// at `path`. Used in conjunction with [`TimeseriesClient::get_range_to_file()`]. + pub fn with_path(self, path: impl Into<PathBuf>) -> GetRangeToFileParams { + GetRangeToFileParams { + dataset: self.dataset, + symbols: self.symbols, + schema: self.schema, + date_time_range: self.date_time_range, + stype_in: self.stype_in, + stype_out: self.stype_out, + limit: self.limit, + upgrade_policy: self.upgrade_policy, + path: path.into(), + } + } +} + #[cfg(test)] mod tests { - use dbn::record::TradeMsg; + use dbn::{record::TradeMsg, Dataset}; use reqwest::StatusCode; use time::macros::datetime; use wiremock::{ @@ -169,4 +299,59 @@ mod tests { decoder.decode_record::<TradeMsg>().await.unwrap().unwrap(); assert!(decoder.decode_record::<TradeMsg>().await.unwrap().is_none()); } + + #[tokio::test] + async fn test_get_range_to_file() { + const START: time::OffsetDateTime = datetime!(2024 - 05 - 17 00:00 UTC); + const END: time::OffsetDateTime = datetime!(2024 - 05 - 18 00:00 UTC); + const SCHEMA: Schema = Schema::Trades; + const DATASET: &str = Dataset::IfeuImpact.as_str(); + + let mock_server = MockServer::start().await; + let temp_dir = tempfile::TempDir::new().unwrap(); + let bytes = tokio::fs::read(zst_test_data_path(SCHEMA)).await.unwrap(); + Mock::given(method("POST")) + .and(basic_auth(API_KEY, "")) + .and(path(format!("/v{API_VERSION}/timeseries.get_range"))) + .and(body_contains("dataset", DATASET)) + .and(body_contains("schema", "trades")) +
.and(body_contains("symbols", "BRN.FUT")) + .and(body_contains( + "start", + START.unix_timestamp_nanos().to_string(), + )) + .and(body_contains("end", END.unix_timestamp_nanos().to_string())) + // // default + .and(body_contains("stype_in", "parent")) + .and(body_contains("stype_out", "instrument_id")) + .respond_with(ResponseTemplate::new(StatusCode::OK.as_u16()).set_body_bytes(bytes)) + .mount(&mock_server) + .await; + let mut target = HistoricalClient::with_url( + mock_server.uri(), + API_KEY.to_owned(), + HistoricalGateway::Bo1, + ) + .unwrap(); + let path = temp_dir.path().join("test.dbn.zst"); + let mut decoder = target + .timeseries() + .get_range_to_file( + &GetRangeToFileParams::builder() + .dataset(DATASET) + .schema(SCHEMA) + .symbols(vec!["BRN.FUT"]) + .stype_in(SType::Parent) + .date_time_range((START, END)) + .path(path.clone()) + .build(), + ) + .await + .unwrap(); + assert_eq!(decoder.metadata().schema.unwrap(), SCHEMA); + // Two records + decoder.decode_record::<TradeMsg>().await.unwrap().unwrap(); + decoder.decode_record::<TradeMsg>().await.unwrap().unwrap(); + assert!(decoder.decode_record::<TradeMsg>().await.unwrap().is_none()); + } } From 04de68897631d855cbcc9a196fbf3eef754fce72 Mon Sep 17 00:00:00 2001 From: Roberto Mesado Date: Mon, 24 Jun 2024 17:24:12 -0700 Subject: [PATCH 2/4] MOD: Update docs links throughout codebase --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index fbd2a14..cbd47b6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,7 +2,7 @@ //! It provides clients for fast, safe streaming of both real-time and historical market data through //! similar interfaces. //! The library is built on top of the tokio asynchronous runtime and -//! [Databento's efficient binary encoding](https://databento.com/docs/knowledge-base/new-users/dbn-encoding). +//! [Databento's efficient binary encoding](https://databento.com/docs/standards-and-conventions/databento-binary-encoding). //! //!
You can find getting started tutorials, full API method documentation, examples //! with output on the [Databento docs site](https://databento.com/docs/?historical=rust&live=rust). From 7639ec4e498d91dd7557ad592d4cf2d2b362cc12 Mon Sep 17 00:00:00 2001 From: Carter Green Date: Tue, 25 Jun 2024 09:42:15 -0500 Subject: [PATCH 3/4] VER: Release 0.11.2 --- CHANGELOG.md | 3 ++- Cargo.toml | 6 +++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 48fa746..045fb1c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,10 +1,11 @@ # Changelog -## 0.11.2 - TBD +## 0.11.2 - 2024-06-25 #### Enhancements - Added `historical::timeseries::get_range_to_file` method to persist the data stream to a given path before returning an `AsyncDbnDecoder` +- Upgraded DBN version to 0.18.2 ## 0.11.1 - 2024-06-11 diff --git a/Cargo.toml b/Cargo.toml index d2b4ad1..37a278c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "databento" authors = ["Databento "] -version = "0.11.1" +version = "0.11.2" edition = "2021" repository = "https://github.com/databento/databento-rs" description = "Official Databento client library" @@ -24,7 +24,7 @@ live = ["dep:hex", "dep:sha2", "tokio/net"] [dependencies] # binary encoding -dbn = { version = "0.18.1", features = ["async", "serde"] } +dbn = { version = "0.18.2", features = ["async", "serde"] } # Async stream trait futures = { version = "0.3", optional = true } # Used for Live authentication @@ -45,7 +45,7 @@ tokio-util = { version = "0.7", features = ["io"], optional = true } typed-builder = "0.18" [dev-dependencies] -env_logger = "0.11.2" +env_logger = "0.11.3" tempfile = "3.10.1" tokio = { version = "1.38", features = ["full"] } wiremock = "0.6" From 3a5ec2e9ea99dcbfe5b610e27f972a41ef45e03a Mon Sep 17 00:00:00 2001 From: Vinicius Livramento Date: Tue, 25 Jun 2024 18:46:09 +0100 Subject: [PATCH 4/4] ADD: Adding public docs for live snapshot --- src/live.rs | 2 +- 1 file changed, 1 
insertion(+), 1 deletion(-) diff --git a/src/live.rs b/src/live.rs index 259a75d..37a8d80 100644 --- a/src/live.rs +++ b/src/live.rs @@ -32,7 +32,7 @@ pub struct Subscription { #[builder(default, setter(strip_option))] pub start: Option, #[doc(hidden)] - /// Reserved for future use. + /// Request subscription with snapshot. Defaults to `false`. Conflicts with the `start` parameter. #[builder(setter(strip_bool))] pub use_snapshot: bool, }