Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming proposals #53

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
384 changes: 225 additions & 159 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 6 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ categories = ["api-bindings", "games"]
features = ["unknown_variants"]

[features]
default = ["bulk_caching"]
# default = ["bulk_caching"]
default = []
bulk_caching = ["dep:heck"]
unknown_variants = []
unknown_variants_slim = []
Expand All @@ -30,7 +31,7 @@ httpstatus = "0.1"
itertools = "0.13"
once_cell = "1"
percent-encoding = "2"
reqwest = {version = "0.12.5", features = ["json" ,"blocking"] }
reqwest = {version = "0.12.12", features = ["json" ,"blocking", "stream"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
serde_urlencoded = "0.7"
Expand All @@ -40,12 +41,15 @@ tinyvec = "1"
url = { version = "2", features = ["serde"] }
uuid = { version = "1", features = ["serde"] }
tokio = { version = "1", features = ["macros", "rt-multi-thread"], optional = true }
futures-util = "0.3.31"
bytes = "1.10.0"

[dev-dependencies]
strum = { version = "0.26", features = ["derive"] }
tokio = { version = "1", features = ["full"] }
tokio-test = "0.4.4"
static_assertions = "1.1.0"
rand = "0.9.0"

[[bin]]
name = "search"
Expand Down
26 changes: 26 additions & 0 deletions examples/bulk/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/// Example: walks the iterator returned by `scryfall::bulk::all_cards`,
/// tallying how many entries parsed successfully and how many failed.
///
/// Prints the running count every 5000 successfully parsed cards, prints each
/// error as it is encountered, and finishes with a one-line summary.
///
/// # Errors
/// Returns early with the error from `all_cards` if the iterator itself cannot
/// be obtained. Per-item errors are only counted and printed.
#[tokio::main]
async fn main() -> scryfall::Result<()> {
    let mut count = 0;
    let mut failures = 0;

    for entry in scryfall::bulk::all_cards().await? {
        // Handle the failure case first so the success path reads straight through.
        if let Err(err) = entry {
            println!("{:?}", err);
            failures += 1;
            continue;
        }

        count += 1;
        // Progress heartbeat: one line per 5000 parsed cards.
        if count % 5000 == 0 {
            println!("{count}");
        }
    }

    println!("Found {} cards and {} errors", count, failures);

    Ok(())
}
65 changes: 33 additions & 32 deletions src/bulk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,22 @@
//! See also: [Official Docs](https://scryfall.com/docs/api/bulk-data)

use std::fs::File;
use std::io;
use std::io::BufReader;
use std::io::{BufWriter, Write};
use std::path::Path;

use bytes::Buf;
use cfg_if::cfg_if;
use chrono::{DateTime, Utc};
use futures_util::StreamExt;
use serde::de::DeserializeOwned;
use serde::Deserialize;
use uuid::Uuid;

use crate::card::Card;
use crate::ruling::Ruling;
use crate::uri::Uri;
use crate::util::array_stream_reader::ArrayStreamReader;
use crate::util::BULK_DATA_URL;
use crate::util::{stream_iterator, BULK_DATA_URL};
use crate::Error;

/// Scryfall provides daily exports of our card data in bulk files. Each of
Expand Down Expand Up @@ -99,7 +100,7 @@ pub struct BulkDataFile<T> {
_object: String,
}

impl<T: DeserializeOwned> BulkDataFile<T> {
impl<T: DeserializeOwned + Send + 'static> BulkDataFile<T> {
cfg_if! {
if #[cfg(feature = "bulk_caching")] {
/// The full temp path where this file will be downloaded with `load`. The
Expand All @@ -121,17 +122,13 @@ impl<T: DeserializeOwned> BulkDataFile<T> {
Ok(BufReader::new(File::open(cache_path)?))
}
} else {
async fn get_reader(&self) -> crate::Result<BufReader<impl io::Read + Send>> {
let content = self.download_uri
.fetch_raw()
.await?
.bytes()
.await
.map_err(|error| crate::Error::ReqwestError {
url: self.download_uri.inner().clone(),
error: Box::new(error),
})?;
Ok(BufReader::new(std::io::Cursor::new(content)))
async fn get_reader(&self) -> crate::Result<BufReader<impl std::io::Read + Send>> {

let response = self.download_uri.fetch_raw().await?;
let body = response.bytes().await.map_err(|e| {
crate::Error::ReqwestError { error: Box::new(e), url: self.download_uri.inner().clone() }
})?;
Ok(BufReader::new(body.reader()))
}
}
}
Expand Down Expand Up @@ -161,22 +158,29 @@ impl<T: DeserializeOwned> BulkDataFile<T> {
/// Downloads and stores the file in the computer's temp folder if this
/// version hasn't been downloaded yet. Otherwise uses the stored copy.
pub async fn load_iter(&self) -> crate::Result<impl Iterator<Item = crate::Result<T>>> {
let de = serde_json::Deserializer::from_reader(ArrayStreamReader::new_buffered(
self.get_reader().await?,
));
Ok(de.into_iter().map(|item| item.map_err(|e| e.into())))
let reader = self.get_reader().await?;
Ok(stream_iterator::create(reader))
}

/// Downloads this file, saving it to `path`. Overwrites the file if it
/// already exists.
pub async fn download(&self, path: impl AsRef<Path>) -> crate::Result<()> {
let path = path.as_ref();
let response = self.download_uri.fetch_raw().await?;
let content = response.bytes().await.map_err(|e| Error::ReqwestError {
error: e.into(),
url: self.download_uri.inner().clone(),
})?;
io::copy(&mut content.as_ref(), &mut File::create(path)?)?;

let mut body = response.bytes_stream();
let mut writer = BufWriter::new(File::create(path)?);

while let Some(chunk_result) = body.next().await {
let chunk = chunk_result.map_err(|e| Error::ReqwestError {
error: e.into(),
url: self.download_uri.inner().clone(),
})?;
writer.write_all(&chunk)?;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you use a tokio::fs::File here instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I can see, tokio is an optional dependency that is only available with the bin feature. Would it be OK to make tokio a non-optional dependency?

}

writer.flush()?;

Ok(())
}
}
Expand Down Expand Up @@ -210,9 +214,6 @@ pub async fn default_cards() -> crate::Result<impl Iterator<Item = crate::Result
}

/// An iterator of every card object on Scryfall in every language.
///
/// # Note
/// This currently takes about 2GB of RAM before returning 👀.
pub async fn all_cards() -> crate::Result<impl Iterator<Item = crate::Result<Card>>> {
BulkDataFile::of_type("all_cards").await?.load_iter().await
}
Expand All @@ -225,6 +226,9 @@ pub async fn rulings() -> crate::Result<impl Iterator<Item = crate::Result<Rulin

#[cfg(test)]
mod tests {
use std::io::BufReader;

use crate::util::stream_iterator;

#[tokio::test]
#[ignore]
Expand Down Expand Up @@ -268,8 +272,6 @@ mod tests {

#[test]
fn test_parse_list() {
use serde_json::Deserializer;

use crate::ruling::Ruling;
let s = r#"[
{
Expand All @@ -287,9 +289,8 @@ mod tests {
"comment": "The “commander tax” increases based on how many times a commander was cast from the command zone. Casting a commander from your hand doesn’t require that additional cost, and it doesn’t increase what the cost will be the next time you cast that commander from the command zone."
}
]"#;
Deserializer::from_reader(super::ArrayStreamReader::new_buffered(s.as_bytes()))
.into_iter()
.map(|r: serde_json::Result<Ruling>| r.unwrap())
stream_iterator::create(BufReader::new(s.as_bytes()))
.map(|r: crate::Result<Ruling>| r.unwrap())
.for_each(drop);
}
}
2 changes: 1 addition & 1 deletion src/card.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ pub struct ImageUris {
///
/// For more details, see the [official documentation](https://scryfall.com/docs/api/cards).
#[derive(Serialize, Deserialize, Clone, PartialEq, Debug)]
#[cfg_attr(test, serde(deny_unknown_fields))]
// #[cfg_attr(test, serde(deny_unknown_fields))]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be restored. Tests were failing because of some new attributes in the Scryfall API; that needs to be fixed separately.

#[non_exhaustive]
pub struct Card {
// region Core Card Fields
Expand Down
2 changes: 2 additions & 0 deletions src/card/border_color.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub enum BorderColor {
Gold,
White,
Silver,
Yellow,
}

impl std::fmt::Display for BorderColor {
Expand All @@ -28,6 +29,7 @@ impl std::fmt::Display for BorderColor {
Gold => "gold",
White => "white",
Silver => "silver",
Yellow => "yellow",
}
)
}
Expand Down
1 change: 1 addition & 0 deletions src/card/promo_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub enum PromoType {
Duels,
Embossed,
Event,
FirstPlaceFoil,
Fnm,
Fracturefoil,
Galaxyfoil,
Expand Down
2 changes: 1 addition & 1 deletion src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use once_cell::sync::Lazy;
use serde::{Deserialize, Deserializer};
use url::Url;

pub(crate) mod array_stream_reader;
pub(crate) mod stream_iterator;

/// The [scryfall](https://scryfall.com/docs/api) endpoint.
pub static ROOT_URL: Lazy<Url> = Lazy::new(|| Url::parse("https://api.scryfall.com/").unwrap());
Expand Down
Loading