parser: better detection of CSV dialects

The previous implementation of CSV dialect detection was pretty poor, and often
resulted in incorrect delimiters. This introduces a new detection module that
works much better. For one thing, the delimiter search space is much more
reasonable, in that it no longer includes space characters. Also, detection now
accounts for the whole dialect instead of detecting delimiters and quote
characters in two separate functions: we look at each candidate dialect
(delimiter, quote, escape character, line ending) as a whole, assign it an
overall score, and return the dialect with the greatest score.

Scoring is based on two main factors: the number of rows that were
successfully parsed, and the consistency of columns between rows. The idea
behind the number of rows is that incorrect dialects will result in parse
errors, and thus a lower count of successfully parsed rows. A dialect that
fails to parse the entire sample can pretty trivially be ranked worse than one
that is able to parse all the content. The idea behind consistency is that the
correct dialect will result in rows having a relatively consistent number of
columns. We use the standard deviation of the column counts to measure this.
The score comparison also accounts for the average number of columns, and
prefers dialects resulting in a greater number of columns, as long as the
standard deviation isn't too much worse. These two factors (standard deviation
and average column count) are combined into an overall [coefficient of
variation](https://en.wikipedia.org/wiki/Coefficient_of_variation), and we
prefer scores having a lower coefficient.
psFried committed Jan 29, 2024
1 parent 8e7e460 commit f70f36a
Showing 17 changed files with 1,760 additions and 151 deletions.
1 change: 1 addition & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions crates/parser/Cargo.toml
@@ -31,6 +31,7 @@
 criterion = { workspace = true }
 csv = { workspace = true }
 encoding_rs = { workspace = true }
 flate2 = { workspace = true }
+itertools = { workspace = true }
 mime = { workspace = true }
 num-bigint = { workspace = true }
 protobuf = { workspace = true }
2 changes: 1 addition & 1 deletion crates/parser/src/config/character_separated.rs
@@ -95,7 +95,7 @@ impl EnumSelection for Escape {
     }
 }
 
-#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize, EnumIter)]
+#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize, EnumIter)]
 pub enum Quote {
     #[serde(rename = "\"")]
     DoubleQuote,
276 changes: 276 additions & 0 deletions crates/parser/src/format/character_separated/detection.rs
@@ -0,0 +1,276 @@
use bytes::Bytes;
use std::cmp::Ordering;

use super::new_read_builder;
use crate::character_separated::{Delimiter, Escape, LineEnding, Quote};

/// Represents the relative score given to a specific dialect for a given sequence of bytes.
#[derive(Debug, Copy, Clone, Default)]
pub struct DetectionScore {
    /// The number of rows having > 0 delimiters appearing in them
    row_count: usize,
    /// The average number of delimiters appearing in _all_ rows (including those with 0 delimiters)
    mean_row_score: f64,
    /// The standard deviation of the per-row delimiter counts
    row_score_stddev: f64,
}

impl DetectionScore {
    /// https://en.wikipedia.org/wiki/Coefficient_of_variation
    fn coefficient_of_variation(&self) -> f64 {
        if self.mean_row_score > 0.0 {
            self.row_score_stddev / self.mean_row_score
        } else {
            panic!("cannot compute coefficient_of_variation with 0 mean_row_score");
        }
    }
}

impl PartialEq for DetectionScore {
    fn eq(&self, other: &Self) -> bool {
        self.partial_cmp(other) == Some(std::cmp::Ordering::Equal)
    }
}

impl PartialOrd for DetectionScore {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        // Order first based on row count
        let row_cmp = self.row_count.cmp(&other.row_count);
        if !row_cmp.is_eq() {
            return Some(row_cmp);
        }

        if self.row_count == 0 {
            // if both have 0 row counts, then the other fields are irrelevant
            Some(Ordering::Equal)
        } else if self.mean_row_score == 0.0 || other.mean_row_score == 0.0 {
            // if either has a 0 mean row score, then the larger wins, or else they are equal
            self.mean_row_score.partial_cmp(&other.mean_row_score)
        } else {
            // Finally, use the reverse-order comparison of the coefficient of variation
            // (a lower coefficient means a higher score)
            self.coefficient_of_variation()
                .partial_cmp(&other.coefficient_of_variation())
                .map(std::cmp::Ordering::reverse)
        }
    }
}

/// The detected dialect of a CSV file
#[derive(Debug)]
pub struct Dialect {
    pub quote: Quote,
    pub delimiter: Delimiter,
    /// The score that was computed for this dialect
    pub score: DetectionScore,
}

/// Tries to detect the dialect of a CSV file, based on a prefix of the file provided in `peeked`.
/// Only the delimiter and quote are currently detected, though we may wish to support detecting the
/// escape character in the future. The values from the configuration are passed in, and
/// `line_separator` and `escape` are required. If `config_quote` or `config_delimiter` are Some,
/// then the search space will be limited to only those values, and they will be returned in the
/// detected dialect.
/// A dialect is always detected and returned, even though it may not be a very good fit. This reflects
/// the reality that even an incorrect dialect can usually at least result in a single column per line.
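///
/// Hypothetical usage (the argument values here are for illustration only):
/// `detect_dialect(LineEnding::CRLF, Escape::None, peeked, None, None)`, where
/// `peeked` is a `Bytes` prefix of the file, returns the winning quote and
/// delimiter along with the score that was computed for them.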
pub fn detect_dialect(
    line_separator: LineEnding,
    escape: Escape,
    peeked: Bytes,
    config_quote: Option<Quote>,
    config_delimiter: Option<Delimiter>,
) -> Dialect {
    let mut best_score = DetectionScore::default();
    let mut best_index = 0usize;
    let permutations = get_dialect_candidates(config_quote, config_delimiter);
    for (i, (quote, delimiter)) in permutations.iter().enumerate() {
        let score = compute_score(peeked.clone(), *quote, *delimiter, line_separator, escape);
        tracing::trace!(?quote, ?delimiter, ?score, "computed score for dialect");
        if score > best_score {
            best_score = score;
            best_index = i;
        }
    }

    Dialect {
        quote: permutations[best_index].0,
        delimiter: permutations[best_index].1,
        score: best_score,
    }
}

/// Returns a set of dialect options to use as candidates.
/// If either or both of `config_quote` or `config_delimiter` are Some,
/// then the returned candidates will all have that value. Otherwise, a
/// default set is used.
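/// With no configuration, this yields the 2 × 4 = 8 combinations of the
/// {double, single} quotes and {comma, pipe, tab, semicolon} delimiters below.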
fn get_dialect_candidates(
    config_quote: Option<Quote>,
    config_delimiter: Option<Delimiter>,
) -> Vec<(Quote, Delimiter)> {
    use itertools::Itertools;

    let all_quotes: Vec<Quote> = if let Some(q) = config_quote {
        vec![q]
    } else {
        vec![Quote::DoubleQuote, Quote::SingleQuote]
    };
    let all_delims: Vec<Delimiter> = if let Some(d) = config_delimiter {
        vec![d]
    } else {
        vec![
            Delimiter::Comma,
            Delimiter::Pipe,
            Delimiter::Tab,
            Delimiter::Semicolon,
        ]
    };

    all_quotes
        .into_iter()
        .cartesian_product(all_delims)
        .collect()
}

fn compute_score(
    peeked: Bytes,
    quote: Quote,
    delimiter: Delimiter,
    line_ending: LineEnding,
    escape: Escape,
) -> DetectionScore {
    use bytes::Buf;

    let mut builder = new_read_builder(line_ending, quote, delimiter, escape);
    builder.has_headers(false);
    let mut reader = builder.from_reader(peeked.reader());

    // Build a vec containing a count of the delimiters in each row. It's critical that we count
    // delimiters here instead of cells. Consider that _any_ candidate delimiter will work to parse
    // rows into a single column each. If we counted cells, we could easily end up with an obviously
    // incorrect (to humans) candidate delimiter that has a "perfect" score due to having a mean cell
    // count of 1 and a standard deviation of 0. Counting delimiters means that we give a 0 score to
    // each row that doesn't contain the delimiter.
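    // For example (hypothetical input): given the two lines `a,b` and `c,d`, the
    // candidate delimiter '|' never appears, so both rows score 0, while ','
    // scores 1 per row. Counting cells instead would have credited '|' with a
    // deceptively perfect one-column-per-row parse.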
    let mut row_scores = Vec::new();
    let mut record = csv::ByteRecord::new();
    // A count of rows having more than one column. Note that we still add a 0 to `row_scores` when
    // such a row is encountered, but we do not count them here. This count is used as the first-order
    // comparison of scores, and we want to give preference to dialects that result in a greater number
    // of rows that can actually be parsed correctly.
    let mut row_count = 0;
    while let Ok(more) = reader.read_byte_record(&mut record) {
        if !more {
            break;
        }
        let score = record.len().saturating_sub(1);
        if score > 0 {
            row_count += 1;
        }
        row_scores.push(score);
    }

    let (mean_row_score, row_score_stddev) = if row_count > 0 {
        let n_rows = row_scores.len() as f64;
        let sum = row_scores.iter().copied().sum::<usize>();
        let mean = (sum as f64) / n_rows;
        let variance_sum = row_scores
            .iter()
            .map(|score| {
                let diff = mean - (*score as f64);
                diff * diff
            })
            .sum::<f64>();

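        // This is the population standard deviation (dividing by n rather than
        // n - 1), which is fine here because scores are only ever compared
        // against each other.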
        let stddev = (variance_sum / n_rows).sqrt();
        (mean, stddev)
    } else {
        (0.0, 0.0)
    };
    DetectionScore {
        row_count,
        mean_row_score,
        row_score_stddev,
    }
}

#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn detection_score_comparison() {
        let test_cases = &[
            // Ordering is done first based on row count
            ((4, 30.5, 3.5), (5, 30.5, 3.5), Some(Ordering::Less)),
            ((99, 25.0, 12.0), (1, 1.0, 0.0), Some(Ordering::Greater)),
            ((2, 0.0, 0.0), (1, 0.0, 0.0), Some(Ordering::Greater)),
            ((0, 0.0, 0.0), (1, 1.0, 0.0), Some(Ordering::Less)),
            // If row counts are equal, reverse order based on the ratio of stddev/mean
            ((4, 30.0, 3.0), (4, 30.0, 3.0), Some(Ordering::Equal)),
            ((4, 30.0, 3.0), (4, 30.0, 3.0), Some(Ordering::Equal)),
            ((4, 3.0, 0.0), (4, 90.0, 14.5), Some(Ordering::Greater)),
            // Except for special cases when either mean row score is 0
            ((0, 0.0, 0.0), (0, 0.0, 0.0), Some(Ordering::Equal)),
            ((2, 0.0, 0.0), (2, 0.0, 0.0), Some(Ordering::Equal)),
            ((2, 0.0, 0.0), (2, 0.01, 1.0), Some(Ordering::Less)),
        ];

        for (a_tuple, b_tuple, expected_ord) in test_cases.into_iter() {
            let a = DetectionScore {
                row_count: a_tuple.0,
                mean_row_score: a_tuple.1,
                row_score_stddev: a_tuple.2,
            };
            let b = DetectionScore {
                row_count: b_tuple.0,
                mean_row_score: b_tuple.1,
                row_score_stddev: b_tuple.2,
            };
            let actual = a.partial_cmp(&b);
            assert_eq!(
                *expected_ord, actual,
                "a: {a:?}, b: {b:?}, expected: {expected_ord:?}, actual: {actual:?}"
            );
            let expected_rev = expected_ord.map(std::cmp::Ordering::reverse);
            let actual = b.partial_cmp(&a);
            assert_eq!(
                expected_rev, actual,
                "reversed case: a: {a:?}, b: {b:?}, expected: {expected_rev:?}, actual: {actual:?}"
            );
        }
    }

    #[test]
    fn dialect_detection() {
        #[derive(Debug, PartialEq, serde::Deserialize)]
        struct DetectionResult {
            quote: Quote,
            delimiter: Delimiter,
        }

        for result in std::fs::read_dir("src/format/character_separated/detection_cases").unwrap() {
            let entry = result.unwrap();

            let filename = entry.file_name();
            let path = entry.path();
            let content = bytes::Bytes::from(std::fs::read(path).unwrap());

            // The first line of each file must be a json object with the expected detection results
            let newline_idx = content.iter().position(|b| *b == b'\n').unwrap();
            let expect_json = content.slice(0..newline_idx);
            let csv = content.slice((newline_idx + 1)..);

            let expected: DetectionResult = serde_json::from_slice(&expect_json)
                .expect("failed to deserialize expected detection result");

            let dialect = detect_dialect(LineEnding::CRLF, Escape::None, csv, None, None);
            let actual = DetectionResult {
                quote: dialect.quote,
                delimiter: dialect.delimiter,
            };
            let score = dialect.score;

            assert_eq!(
                expected, actual,
                "detection failed for '{filename:?}', actual score was {score:?}"
            );
        }
    }
}
@@ -0,0 +1,6 @@
{"quote": "\"", "delimiter": ","}
,,,,,,,,,,,,,
id,fleet,country,ship_type,ship_class,name,description,start_date,sunk_date,sunk_report,losses,source,x,y
,0,46,5,,Blücher,,1939-09-20,1940-04-09,Sunk in the Battle of Drøbak Soundon 9 April 1940 ,,http://upload.wikimedia.org/wikipedia/commons/d/d5/Bundesarchiv_DVM_10_Bild-23-63-09%2C_Kreuzer_%22Bl%C3%BCcher%22.jpg,59.7,10.591667
,0,46,5,,Königsberg,,1929-04-17,1940-04-10,"Sunk 10 April 1940 at Bergen, Norway ",,http://upload.wikimedia.org/wikipedia/commons/4/40/K%C3%B6nigsberg_3.jpg,60.4,5.316667
,0,46,6,,Karlsruhe,,1929-11-06,1940-04-09,Damaged by torpedoes fired byHMS Truant and later sunk on 9 April 1940 ,,http://upload.wikimedia.org/wikipedia/commons/c/ca/Bundesarchiv_Bild_102-12746%2C_Kaiser-Wilhelm-Kanal%2C_Kreuzer_%22Karlsruhe%22.jpg ,57.574779,7.976074