Skip to content

Commit

Permalink
feat(compress+split): integrate compress/split writer from oscar-io
Browse files Browse the repository at this point in the history
  • Loading branch information
Uinelj committed Aug 8, 2023
1 parent 8b25ab5 commit d378c22
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 26 deletions.
25 changes: 15 additions & 10 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
[package]
name = "ungoliant"
version = "2.0.0"
authors = ["Julien Abadji <[email protected]>, Pedro J. Ortiz <[email protected]>"]
authors = [
"Julien Abadji <[email protected]>, Pedro J. Ortiz <[email protected]>",
]
edition = "2021"
description = "The pipeline for the OSCAR corpus."
license = "Apache-2.0"
Expand All @@ -11,8 +13,12 @@ repository = "https://github.com/oscar-project/ungoliant"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
reqwest = { version = "0.11", default-features=false, features = ["rustls-tls", "blocking", "stream"] }
flate2 = { version = "1.0.20"}
reqwest = { version = "0.11", default-features = false, features = [
"rustls-tls",
"blocking",
"stream",
] }
flate2 = { version = "1.0.20" }
futures-core = "0.3"
futures-util = "0.3"
futures = "0.3"
Expand All @@ -21,8 +27,8 @@ env_logger = "0.8.3"
log = "0.4.14"
itertools = "0.10.0"
tokio = { version = "1", features = ["full"] }
tokio-util = {version="0.6.6", features=["compat"]}
warc = {version="0.3.0", features=["with_serde"]}
tokio-util = { version = "0.6.6", features = ["compat"] }
warc = { version = "0.3.0", features = ["with_serde"] }
ut1_blocklist = "0.3.0"
fasttext = "0.7.6"
bytes = "1"
Expand All @@ -42,14 +48,13 @@ unicode-script = "0.5.4"
unicode-segmentation = "1.8.0"
csv = "1.1.6"
unic-ucd = "0.9.0"
oxilangtag = {version="0.1.3", features=["serde"]}
oxilangtag = { version = "0.1.3", features = ["serde"] }
language-tags = "0.3.2"
lazy_static = "1.4.0"
oscar-io = "0.2.2"
#tlsh = {git="https://github.com/Uinelj/tlsh-rs", branch="fix-q3-panic"}
oscar-io = "0.4.0"
tlsh-fixed = "0.1.1"

ctclib-pp = {version="0.2.0", optional=true}
ctclib-pp = { version = "0.2.0", optional = true }


[features]
Expand All @@ -60,7 +65,7 @@ rand_distr = "0.4.2"
sha-1 = "0.9"
criterion = "0.3"
serial_test = "0.5.1"
tempfile="3.2.0"
tempfile = "3.2.0"
test-log = "0.2.11"

[[bench]]
Expand Down
9 changes: 9 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,4 +184,13 @@ pub struct Pipeline {
help = "Optional path to kenlm folder. for the language xx, you have to have a xx.binary file."
)]
pub kenlms_path: Option<PathBuf>,

#[structopt(
help = "Split size (in MBytes). Default: No splitting",
long = "split_size"
)]
pub split: Option<u64>,

#[structopt(short = "c", long = "comp", help = "Enables zstd compression")]
pub comp: bool,
}
33 changes: 26 additions & 7 deletions src/io/langfiles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use oscar_io::v3::{Writer, WriterTrait};
type LanguageMap = HashMap<LanguageTag<String>, Arc<Mutex<Writer>>>;
pub struct LangFilesDoc {
writers: Arc<RwLock<LanguageMap>>,
comp: bool,
dst: PathBuf,
part_size_bytes: Option<u64>,
}
Expand Down Expand Up @@ -76,20 +77,27 @@ impl LangFilesDoc {
/// Also keep in mind that [Self::close_meta] has to be called once every write is done.
///
// [Self::close_meta] could be integrated in an `impl Drop`
pub fn new(dst: &Path, part_size_bytes: Option<u64>) -> Self {
pub fn new(dst: &Path, part_size_bytes: Option<u64>, comp: bool) -> Self {
    // Start from an empty, shareable language -> writer map.
    let writers = Arc::new(RwLock::new(HashMap::new()));
    // Keep an owned copy of the destination so the struct does not borrow from the caller.
    let dst = dst.to_path_buf();

    Self {
        writers,
        comp,
        dst,
        part_size_bytes,
    }
}

fn new_writer(
dst: &Path,
lang: LanguageTag<String>,
part_size_bytes: Option<u64>,
comp: bool,
) -> Result<Arc<Mutex<Writer>>, Error> {
let w = Writer::new(dst, lang, part_size_bytes)?;
let comp = if comp {
Some(oscar_io::v3::Comp::Zstd { level: 0 })

Check warning on line 96 in src/io/langfiles.rs

View check run for this annotation

Codecov / codecov/patch

src/io/langfiles.rs#L96

Added line #L96 was not covered by tests
} else {
None
};
let w = Writer::new(dst, lang, part_size_bytes, comp)?;

Ok(Arc::new(Mutex::new(w)))
}
Expand All @@ -115,6 +123,7 @@ impl LangFilesDoc {
&self.dst,
k.clone(),
self.part_size_bytes,
self.comp,
)?);

info!("{k}: Done");
Expand All @@ -127,6 +136,16 @@ impl LangFilesDoc {
) -> std::sync::RwLockReadGuard<HashMap<LanguageTag<String>, Arc<Mutex<Writer>>>> {
self.writers.read().unwrap()
}

/// Flushes every writer currently registered in the language map.
///
/// Each writer's mutex is acquired with a blocking `lock`, so a writer that is momentarily
/// held elsewhere is waited for instead of aborting the whole flush.
///
/// # Errors
/// Returns the first error reported by a writer's `flush`; writers after it are not flushed.
///
/// # Panics
/// Panics if the writer map lock or one of the writer mutexes is poisoned.
pub fn flush_all(&self) -> Result<(), Error> {
    let writers = self.writers.read().expect("writer map lock poisoned");

    for writer in writers.values() {
        // `try_lock().unwrap()` would panic on mere contention; waiting is the safe choice here.
        let mut guard = writer.lock().expect("writer mutex poisoned");
        guard.flush()?;
    }

    Ok(())
}
}

#[cfg(test)]
Expand All @@ -146,13 +165,13 @@ mod tests {
#[test]
fn init_doc() {
    // Smoke test: building a LangFilesDoc over a fresh temporary directory, with no split size
    // and compression disabled, must succeed without panicking.
    let dst = tempdir().unwrap();
    let _: LangFilesDoc = LangFilesDoc::new(dst.path(), None, false);
}

#[test]
fn test_contains() {
let dst = tempdir().unwrap();
let lf: LangFilesDoc = LangFilesDoc::new(dst.path(), None);
let lf: LangFilesDoc = LangFilesDoc::new(dst.path(), None, false);
let language = LanguageTag::parse("fr".to_string()).unwrap();

assert!(!lf.contains(&language));
Expand All @@ -165,7 +184,7 @@ mod tests {
#[test]
fn write_one_doc() {
let dst = tempdir().unwrap();
let lf: LangFilesDoc = LangFilesDoc::new(dst.path(), None);
let lf: LangFilesDoc = LangFilesDoc::new(dst.path(), None, false);

let content = "Hello!".to_string();

Expand Down Expand Up @@ -194,10 +213,10 @@ mod tests {

if let Ok(mut w) = w.try_lock() {
w.write(docs.to_vec()).unwrap();
w.flush().unwrap();
}

let mut read_path = PathBuf::from(dst.path());
read_path.push("en_meta.jsonl");
read_path.push("en.jsonl");

let b = File::open(read_path).unwrap();
let doc_from_file: Document = serde_json::from_reader(b).unwrap();
Expand Down
13 changes: 11 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,17 @@ async fn main() -> Result<(), error::Error> {

cli::Ungoliant::Pipeline(p) => {
let mut schema_filepath = p.dst.clone();
let p =
pipelines::OscarDocNew::new(p.src, p.dst, p.lid_path, p.blocklist, p.kenlms_path);

// TODO: implement `From<cli::Pipeline>` for `pipelines::OscarDocNew` to replace this field-by-field call?
let p = pipelines::OscarDocNew::new(
p.src,
p.dst,
p.lid_path,
p.blocklist,
p.kenlms_path,
p.split.map(|size_mbytes| size_mbytes * 1_000_000),
p.comp,
);
p.run()?;

schema_filepath.push("metadata_schema.json");
Expand Down
12 changes: 11 additions & 1 deletion src/pipelines/oscardoc/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ pub struct OscarDoc {
lid_path: PathBuf,
blocklist: Option<PathBuf>,
kenlms_path: Option<PathBuf>,
split: Option<u64>, // in bytes
comp: bool,
}

impl OscarDoc {
Expand All @@ -67,6 +69,8 @@ impl OscarDoc {
lid_path: PathBuf,
blocklist: Option<PathBuf>,
kenlms_path: Option<PathBuf>,
split: Option<u64>,
comp: bool,
) -> Self {
if blocklist.is_none() {
warn!("No blocklist folder specified! No adult content tagging will be done.");
Expand All @@ -79,6 +83,8 @@ impl OscarDoc {
lid_path,
blocklist,
kenlms_path,
split,
comp,
}
}

Expand Down Expand Up @@ -448,7 +454,7 @@ impl Pipeline<()> for OscarDoc {
// ourselves.
let results = results.enumerate().par_bridge();

let langfiles = LangFilesDoc::new(&self.dst, None);
let langfiles = LangFilesDoc::new(&self.dst, self.split, self.comp);

Check warning on line 457 in src/pipelines/oscardoc/pipeline.rs

View check run for this annotation

Codecov / codecov/patch

src/pipelines/oscardoc/pipeline.rs#L457

Added line #L457 was not covered by tests
#[cfg(feature = "kenlm")]
let kenlms = if let Some(kenlms_path) = &self.kenlms_path {
if !kenlms_path.is_dir() {
Expand Down Expand Up @@ -510,6 +516,10 @@ impl Pipeline<()> for OscarDoc {
}
});

// flush writers
info!("Flushing writers");
langfiles.flush_all()?;
info!("Done");

Check warning on line 522 in src/pipelines/oscardoc/pipeline.rs

View check run for this annotation

Codecov / codecov/patch

src/pipelines/oscardoc/pipeline.rs#L520-L522

Added lines #L520 - L522 were not covered by tests
Ok(())
}
}
2 changes: 1 addition & 1 deletion src/processing/rebuild.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ impl<'a> Rebuilder<'a> {
}

// create mutex
let wr = Arc::new(Mutex::new(Writer::new(self.dst, self.lang, None)?));
let wr = Arc::new(Mutex::new(Writer::new(self.dst, self.lang, None, None)?));

Check warning on line 280 in src/processing/rebuild.rs

View check run for this annotation

Codecov / codecov/patch

src/processing/rebuild.rs#L280

Added line #L280 was not covered by tests

// iterate over shard results
let errors: Vec<Result<(), Error>> = sr
Expand Down
8 changes: 3 additions & 5 deletions tests/oscardoc_rebuild.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ fn gen_corpus() {
let kenlm = Path::new("res/kenlm/").to_path_buf();

//TODO test with custom blocklists
let pipeline = OscarDoc::new(src, dst, lid, Some(bl), Some(kenlm));
let pipeline = OscarDoc::new(src, dst, lid, Some(bl), Some(kenlm), None, false);
pipeline.run().expect(
"Ensure to have shards in res/shards, lid.176.bin at root and blocklist at res/blocklist",
);
Expand Down Expand Up @@ -52,14 +52,12 @@ fn check_rebuild() {
rb.run().unwrap();

// open source corpus, store documents and order them by record id
let f = File::open(&src_corpus).unwrap();
let doc_reader_source = oscar_io::oscar_doc::Reader::new(BufReader::new(f));
let doc_reader_source = oscar_io::v3::Reader::from_path(&src_corpus).unwrap();
let mut docs_source = doc_reader_source.map(|x| x.unwrap()).collect::<Vec<_>>();
docs_source.sort_unstable_by(|a, b| get_record_id(a).cmp(&get_record_id(b)));
// open rebuilt corpus
dst.push("fr_meta.jsonl");
let f = File::open(&dst).unwrap();
let doc_reader_rebuild = oscar_io::oscar_doc::Reader::new(BufReader::new(f));
let doc_reader_rebuild = oscar_io::v3::Reader::from_path(&dst).unwrap();
let mut docs_rebuild = doc_reader_rebuild.map(|x| x.unwrap()).collect::<Vec<_>>();
docs_rebuild.sort_unstable_by(|a, b| get_record_id(a).cmp(&get_record_id(b)));

Expand Down

0 comments on commit d378c22

Please sign in to comment.