VER: Release 0.11.2 #27

Merged · 4 commits · Jun 25, 2024
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,12 @@
# Changelog

+## 0.11.2 - 2024-06-25
+
+#### Enhancements
+- Added `historical::timeseries::get_range_to_file` method to persist the data stream to
+  a given path before returning an `AsyncDbnDecoder`
+- Upgraded DBN version to 0.18.2
+
## 0.11.1 - 2024-06-11

#### Enhancements
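As a usage illustration of the new method (not part of this diff): a minimal sketch that streams a day of trades to disk and then reads the records back from the persisted file. The dataset code, symbols, and output path are placeholder values, and constructing the client via `HistoricalClient::builder().key_from_env()` assumes the `DATABENTO_API_KEY` environment variable is set.

```rust
use databento::historical::timeseries::GetRangeToFileParams;
use databento::HistoricalClient;
use dbn::{record::TradeMsg, SType, Schema};
use time::macros::datetime;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Reads the API key from the DATABENTO_API_KEY environment variable.
    let mut client = HistoricalClient::builder().key_from_env()?.build()?;
    let mut decoder = client
        .timeseries()
        .get_range_to_file(
            &GetRangeToFileParams::builder()
                .dataset("IFEU.IMPACT") // placeholder dataset code
                .schema(Schema::Trades)
                .symbols(vec!["BRN.FUT"]) // placeholder parent symbol
                .stype_in(SType::Parent)
                .date_time_range((
                    datetime!(2024-05-17 00:00 UTC),
                    datetime!(2024-05-18 00:00 UTC),
                ))
                .path("brn-trades.dbn.zst") // the stream is persisted here
                .build(),
        )
        .await?;
    // The returned decoder reads back from the file rather than the network.
    while let Some(trade) = decoder.decode_record::<TradeMsg>().await? {
        println!("{trade:?}");
    }
    Ok(())
}
```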
7 changes: 4 additions & 3 deletions Cargo.toml
@@ -1,7 +1,7 @@
[package]
name = "databento"
authors = ["Databento <[email protected]>"]
version = "0.11.1"
version = "0.11.2"
edition = "2021"
repository = "https://github.com/databento/databento-rs"
description = "Official Databento client library"
@@ -24,7 +24,7 @@ live = ["dep:hex", "dep:sha2", "tokio/net"]

[dependencies]
# binary encoding
dbn = { version = "0.18.1", features = ["async", "serde"] }
dbn = { version = "0.18.2", features = ["async", "serde"] }
# Async stream trait
futures = { version = "0.3", optional = true }
# Used for Live authentication
@@ -45,6 +45,7 @@ tokio-util = { version = "0.7", features = ["io"], optional = true }
typed-builder = "0.18"

[dev-dependencies]
env_logger = "0.11.2"
env_logger = "0.11.3"
tempfile = "3.10.1"
tokio = { version = "1.38", features = ["full"] }
wiremock = "0.6"
217 changes: 201 additions & 16 deletions src/historical/timeseries.rs
@@ -1,11 +1,15 @@
//! The historical timeseries API.

-use std::num::NonZeroU64;
+use std::{num::NonZeroU64, path::PathBuf};

-use dbn::{Compression, Encoding, SType, Schema, VersionUpgradePolicy};
-use futures::TryStreamExt;
+use dbn::{encode::AsyncDbnEncoder, Compression, Encoding, SType, Schema, VersionUpgradePolicy};
+use futures::{Stream, TryStreamExt};
use reqwest::{header::ACCEPT, RequestBuilder};
-use tokio::io::AsyncReadExt;
+use tokio::{
+    fs::File,
+    io::{AsyncReadExt, AsyncWriteExt, BufWriter},
+};
+use tokio_util::{bytes::Bytes, io::StreamReader};
use typed_builder::TypedBuilder;

use crate::Symbols;
@@ -34,17 +38,79 @@ impl TimeseriesClient<'_> {
&mut self,
params: &GetRangeParams,
) -> crate::Result<AsyncDbnDecoder<impl AsyncReadExt>> {
let reader = self
.get_range_impl(
&params.dataset,
params.schema,
params.stype_in,
params.stype_out,
&params.symbols,
&params.date_time_range,
params.limit,
)
.await?;
let mut decoder: AsyncDbnDecoder<_> = AsyncDbnDecoder::with_zstd_buffer(reader).await?;
decoder.set_upgrade_policy(params.upgrade_policy);
Ok(decoder)
}

/// Makes a streaming request for timeseries data from Databento.
///
/// This method returns a stream decoder. For larger requests, consider using
/// [`BatchClient::submit_job()`](super::batch::BatchClient::submit_job()).
///
/// # Errors
/// This function returns an error when it fails to communicate with the Databento API
/// or the API indicates there's an issue with the request. An error will also be returned
/// if it fails to create a new file at `path`.
pub async fn get_range_to_file(
&mut self,
params: &GetRangeToFileParams,
) -> crate::Result<AsyncDbnDecoder<impl AsyncReadExt>> {
let reader = self
.get_range_impl(
&params.dataset,
params.schema,
params.stype_in,
params.stype_out,
&params.symbols,
&params.date_time_range,
params.limit,
)
.await?;
let mut http_decoder = AsyncDbnDecoder::with_zstd_buffer(reader).await?;
http_decoder.set_upgrade_policy(params.upgrade_policy);
let file = BufWriter::new(File::create(&params.path).await?);
let mut encoder = AsyncDbnEncoder::with_zstd(file, http_decoder.metadata()).await?;
while let Some(rec_ref) = http_decoder.decode_record_ref().await? {
encoder.encode_record_ref(rec_ref).await?;
}
encoder.get_mut().shutdown().await?;
Ok(AsyncDbnDecoder::from_zstd_file(&params.path).await?)
}

#[allow(clippy::too_many_arguments)] // private method
async fn get_range_impl(
&mut self,
dataset: &str,
schema: Schema,
stype_in: SType,
stype_out: SType,
symbols: &Symbols,
date_time_range: &DateTimeRange,
limit: Option<NonZeroU64>,
) -> crate::Result<StreamReader<impl Stream<Item = std::io::Result<Bytes>>, Bytes>> {
let mut form = vec![
("dataset", params.dataset.to_string()),
("schema", params.schema.to_string()),
("dataset", dataset.to_owned()),
("schema", schema.to_string()),
("encoding", Encoding::Dbn.to_string()),
("compression", Compression::ZStd.to_string()),
("stype_in", params.stype_in.to_string()),
("stype_out", params.stype_out.to_string()),
("symbols", params.symbols.to_api_string()),
("stype_in", stype_in.to_string()),
("stype_out", stype_out.to_string()),
("symbols", symbols.to_api_string()),
];
params.date_time_range.add_to_form(&mut form);
if let Some(limit) = params.limit {
date_time_range.add_to_form(&mut form);
if let Some(limit) = limit {
form.push(("limit", limit.to_string()));
}
let resp = self
@@ -59,10 +125,7 @@ impl TimeseriesClient<'_> {
            .error_for_status()?
            .bytes_stream()
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));
-        let reader = tokio_util::io::StreamReader::new(stream);
-        let mut decoder: AsyncDbnDecoder<_> = AsyncDbnDecoder::with_zstd_buffer(reader).await?;
-        decoder.set_upgrade_policy(params.upgrade_policy);
-        Ok(decoder)
+        Ok(tokio_util::io::StreamReader::new(stream))
    }

    fn post(&mut self, slug: &str) -> crate::Result<RequestBuilder> {
@@ -101,9 +164,76 @@ pub struct GetRangeParams {
    pub upgrade_policy: VersionUpgradePolicy,
}

+/// The parameters for [`TimeseriesClient::get_range_to_file()`]. Use
+/// [`GetRangeToFileParams::builder()`] to get a builder type with all the preset defaults.
+#[derive(Debug, Clone, TypedBuilder, PartialEq, Eq)]
+pub struct GetRangeToFileParams {
+    /// The dataset code.
+    #[builder(setter(transform = |dt: impl ToString| dt.to_string()))]
+    pub dataset: String,
+    /// The symbols to filter for.
+    #[builder(setter(into))]
+    pub symbols: Symbols,
+    /// The data record schema.
+    pub schema: Schema,
+    /// The request time range.
+    #[builder(setter(into))]
+    pub date_time_range: DateTimeRange,
+    /// The symbology type of the input `symbols`. Defaults to
+    /// [`RawSymbol`](dbn::enums::SType::RawSymbol).
+    #[builder(default = SType::RawSymbol)]
+    pub stype_in: SType,
+    /// The symbology type of the output `symbols`. Defaults to
+    /// [`InstrumentId`](dbn::enums::SType::InstrumentId).
+    #[builder(default = SType::InstrumentId)]
+    pub stype_out: SType,
+    /// The optional maximum number of records to return. Defaults to no limit.
+    #[builder(default)]
+    pub limit: Option<NonZeroU64>,
+    /// How to decode DBN from prior versions. Defaults to upgrade.
+    #[builder(default = VersionUpgradePolicy::Upgrade)]
+    pub upgrade_policy: VersionUpgradePolicy,
+    /// The file path to persist the stream data to.
+    #[builder(default, setter(transform = |p: impl Into<PathBuf>| p.into()))]
+    pub path: PathBuf,
+}
+
+impl From<GetRangeToFileParams> for GetRangeParams {
+    fn from(value: GetRangeToFileParams) -> Self {
+        Self {
+            dataset: value.dataset,
+            symbols: value.symbols,
+            schema: value.schema,
+            date_time_range: value.date_time_range,
+            stype_in: value.stype_in,
+            stype_out: value.stype_out,
+            limit: value.limit,
+            upgrade_policy: value.upgrade_policy,
+        }
+    }
+}
+
+impl GetRangeParams {
+    /// Converts these parameters into a request that will be persisted to a file
+    /// at `path`. Used in conjunction with [`TimeseriesClient::get_range_to_file()`].
+    pub fn with_path(self, path: impl Into<PathBuf>) -> GetRangeToFileParams {
+        GetRangeToFileParams {
+            dataset: self.dataset,
+            symbols: self.symbols,
+            schema: self.schema,
+            date_time_range: self.date_time_range,
+            stype_in: self.stype_in,
+            stype_out: self.stype_out,
+            limit: self.limit,
+            upgrade_policy: self.upgrade_policy,
+            path: path.into(),
+        }
+    }
+}
+
#[cfg(test)]
mod tests {
-    use dbn::record::TradeMsg;
+    use dbn::{record::TradeMsg, Dataset};
    use reqwest::StatusCode;
    use time::macros::datetime;
    use wiremock::{
@@ -169,4 +299,59 @@ mod tests {
        decoder.decode_record::<TradeMsg>().await.unwrap().unwrap();
        assert!(decoder.decode_record::<TradeMsg>().await.unwrap().is_none());
    }
+
+    #[tokio::test]
+    async fn test_get_range_to_file() {
+        const START: time::OffsetDateTime = datetime!(2024-05-17 00:00 UTC);
+        const END: time::OffsetDateTime = datetime!(2024-05-18 00:00 UTC);
+        const SCHEMA: Schema = Schema::Trades;
+        const DATASET: &str = Dataset::IfeuImpact.as_str();
+
+        let mock_server = MockServer::start().await;
+        let temp_dir = tempfile::TempDir::new().unwrap();
+        let bytes = tokio::fs::read(zst_test_data_path(SCHEMA)).await.unwrap();
+        Mock::given(method("POST"))
+            .and(basic_auth(API_KEY, ""))
+            .and(path(format!("/v{API_VERSION}/timeseries.get_range")))
+            .and(body_contains("dataset", DATASET))
+            .and(body_contains("schema", "trades"))
+            .and(body_contains("symbols", "BRN.FUT"))
+            .and(body_contains(
+                "start",
+                START.unix_timestamp_nanos().to_string(),
+            ))
+            .and(body_contains("end", END.unix_timestamp_nanos().to_string()))
+            .and(body_contains("stype_in", "parent"))
+            // default
+            .and(body_contains("stype_out", "instrument_id"))
+            .respond_with(ResponseTemplate::new(StatusCode::OK.as_u16()).set_body_bytes(bytes))
+            .mount(&mock_server)
+            .await;
+        let mut target = HistoricalClient::with_url(
+            mock_server.uri(),
+            API_KEY.to_owned(),
+            HistoricalGateway::Bo1,
+        )
+        .unwrap();
+        let path = temp_dir.path().join("test.dbn.zst");
+        let mut decoder = target
+            .timeseries()
+            .get_range_to_file(
+                &GetRangeToFileParams::builder()
+                    .dataset(DATASET)
+                    .schema(SCHEMA)
+                    .symbols(vec!["BRN.FUT"])
+                    .stype_in(SType::Parent)
+                    .date_time_range((START, END))
+                    .path(path.clone())
+                    .build(),
+            )
+            .await
+            .unwrap();
+        assert_eq!(decoder.metadata().schema.unwrap(), SCHEMA);
+        // Two records
+        decoder.decode_record::<TradeMsg>().await.unwrap().unwrap();
+        decoder.decode_record::<TradeMsg>().await.unwrap().unwrap();
+        assert!(decoder.decode_record::<TradeMsg>().await.unwrap().is_none());
+    }
}
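Side note on the `with_path` helper added above: call sites that already construct `GetRangeParams` can convert them into file-backed parameters without rebuilding every field. A small sketch under the same placeholder dataset and symbols as the test:

```rust
use databento::historical::timeseries::{GetRangeParams, GetRangeToFileParams};
use dbn::Schema;
use time::macros::datetime;

// Reuses an existing streaming request as a file-persisted one.
fn to_file_params() -> GetRangeToFileParams {
    let params = GetRangeParams::builder()
        .dataset("IFEU.IMPACT") // placeholder dataset code
        .schema(Schema::Trades)
        .symbols(vec!["BRN.FUT"]) // placeholder symbols
        .date_time_range((
            datetime!(2024-05-17 00:00 UTC),
            datetime!(2024-05-18 00:00 UTC),
        ))
        .build();
    // `with_path` copies every field and adds the output path.
    params.with_path("brn-trades.dbn.zst")
}
```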
2 changes: 1 addition & 1 deletion src/lib.rs
@@ -2,7 +2,7 @@
//! It provides clients for fast, safe streaming of both real-time and historical market data through
//! similar interfaces.
//! The library is built on top of the tokio asynchronous runtime and
-//! [Databento's efficient binary encoding](https://databento.com/docs/knowledge-base/new-users/dbn-encoding).
+//! [Databento's efficient binary encoding](https://databento.com/docs/standards-and-conventions/databento-binary-encoding).
//!
//! You can find getting started tutorials, full API method documentation, examples
//! with output on the [Databento docs site](https://databento.com/docs/?historical=rust&live=rust).
2 changes: 1 addition & 1 deletion src/live.rs
@@ -32,7 +32,7 @@ pub struct Subscription {
    #[builder(default, setter(strip_option))]
    pub start: Option<OffsetDateTime>,
    #[doc(hidden)]
-    /// Reserved for future use.
+    /// Request subscription with snapshot. Defaults to `false`. Conflicts with the `start` parameter.
    #[builder(setter(strip_bool))]
    pub use_snapshot: bool,
}
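A sketch of how the newly documented `use_snapshot` flag is set: because the field uses `strip_bool`, the generated builder exposes an argument-less `use_snapshot()`. The schema, symbols, and `stype_in` values below are placeholder assumptions, and `start` must be left unset since the two conflict.

```rust
use databento::live::Subscription;
use dbn::{SType, Schema};

// Builds a live subscription that requests an initial snapshot.
fn snapshot_subscription() -> Subscription {
    Subscription::builder()
        .symbols(vec!["BRN.FUT"]) // placeholder symbols
        .schema(Schema::Mbo)
        .stype_in(SType::Parent)
        // No argument: `strip_bool` generates a flag-style setter.
        .use_snapshot()
        .build()
}
```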