Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support streaming large JSON arrays and objects #1091

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Prev Previous commit
Next Next commit
some code cleanup
  • Loading branch information
luxalpa committed Dec 13, 2023
commit 2cf1c7d8ca8fa498c4973285f2a5e2dc96b2b8f2
229 changes: 122 additions & 107 deletions src/de.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2472,7 +2472,7 @@ where

//////////////////////////////////////////////////////////////////////////////

#[derive(Eq, PartialEq)]
#[derive(Eq, PartialEq, Clone, Copy)]
enum ContainerKind {
Array,
Object,
Expand All @@ -2492,8 +2492,8 @@ struct IterativeBaseDeserializer<'de, R> {
de: Deserializer<R>,
lifetime: PhantomData<&'de ()>,
nesting_change: Vec<NestingCommand>,
cur_resolved_level: usize,
cur_expected_level: usize,
cur_level: usize,
future_level: usize,
}

impl<'de, R> IterativeBaseDeserializer<'de, R>
Expand All @@ -2508,122 +2508,109 @@ where
kind: initial_kind,
direction: NestingDirection::Enter,
}],
cur_resolved_level: 0,
cur_expected_level: 1,
cur_level: 0,
future_level: 1,
}
}

fn next_value<T: de::Deserialize<'de>>(&mut self) -> Option<Result<T>> {
Some(de::Deserialize::deserialize(&mut self.de))
}

// This is an associated function instead of using `self` due to the mutable borrow on the caller.
fn nest_enter(
de: &mut Deserializer<R>,
cur_nesting: &mut usize,
kind: ContainerKind,
last_entered: bool,
) -> Result<()> {
// We don't want a preceding comma if we just entered a container.
// In the case of cur_nesting == 0, we are before the top-level container, and we also don't want commas there!
let needs_comma = *cur_nesting != 0 && !last_entered;
if needs_comma {
match de.parse_whitespace() {
tri!(match de.parse_whitespace() {
Ok(Some(b',')) => {
de.eat_char();
Ok(())
}
Ok(Some(_)) => {
return Err(de.peek_error(ErrorCode::Message("Expected Comma".into())))
}
Ok(None) => return Err(de.peek_error(ErrorCode::EofWhileParsingValue)),
Err(e) => return Err(e),
}
Ok(Some(_)) => Err(de.peek_error(ErrorCode::ExpectedComma)),
Ok(None) => Err(de.peek_error(ErrorCode::EofWhileParsingValue)),
Err(e) => Err(e),
});
}

match de.parse_whitespace() {
Ok(Some(b'[')) => {
if !matches!(kind, ContainerKind::Array) {
// TODO: ErrorCode
return Err(de.peek_error(ErrorCode::Message(
"Unexpected value while nesting 1".into(),
)));
}
}
Ok(Some(b'{')) => {
if !matches!(kind, ContainerKind::Object) {
// TODO: ErrorCode
return Err(de.peek_error(ErrorCode::Message(
"Unexpected value while nesting 2".into(),
)));
// ensure we find the correct opening token ('[' or '{')
tri!(match de.parse_whitespace() {
Err(e) => Err(e),
Ok(None) => Err(de.peek_error(ErrorCode::EofWhileParsingValue)),
Ok(Some(token)) => match (token, kind) {
(b'[', ContainerKind::Array) | (b'{', ContainerKind::Object) => {
de.eat_char();
Ok(())
}
}
Ok(None) => return Err(de.peek_error(ErrorCode::EofWhileParsingValue)),
Err(e) => return Err(e),
Ok(Some(_)) => {
// TODO: ErrorCode
return Err(de.peek_error(ErrorCode::Message(
"Unexpected value while nesting 3".into(),
)));
}
}
de.eat_char();
_ => Err(de.peek_error(match kind {
ContainerKind::Array => ErrorCode::ExpectedListStart,
ContainerKind::Object => ErrorCode::ExpectedObjectStart,
})),
},
});

*cur_nesting += 1;

Ok(())
}

// This is an associated function instead of using `self` due to the mutable borrow on the caller.
fn nest_leave(
de: &mut Deserializer<R>,
cur_nesting: &mut usize,
kind: ContainerKind,
) -> Result<()> {
match de.parse_whitespace() {
Ok(Some(b']')) => {
if !matches!(kind, ContainerKind::Array) {
// TODO: ErrorCode
return Err(de.peek_error(ErrorCode::Message(
"Unexpected value while unnesting".into(),
)));
}
}
Ok(Some(b'}')) => {
if !matches!(kind, ContainerKind::Object) {
// TODO: ErrorCode
return Err(de.peek_error(ErrorCode::Message(
"Unexpected value while unnesting".into(),
)));
// ensure we find the correct closing token (']' or '}')
tri!(match de.parse_whitespace() {
Err(e) => Err(e),
Ok(None) => Err(de.peek_error(ErrorCode::EofWhileParsingValue)),
Ok(Some(token)) => match (token, kind) {
(b']', ContainerKind::Array) | (b'}', ContainerKind::Object) => {
de.eat_char();
Ok(())
}
}
Ok(None) => return Err(de.peek_error(ErrorCode::EofWhileParsingValue)),
Err(e) => return Err(e),
Ok(Some(_)) => {
// TODO: ErrorCode
return Err(de.peek_error(ErrorCode::Message(
"Unexpected value while unnesting".into(),
)));
}
}
de.eat_char();
_ => Err(de.peek_error(match kind {
ContainerKind::Array => ErrorCode::ExpectedListEnd,
ContainerKind::Object => ErrorCode::ExpectedObjectEnd,
})),
},
});

*cur_nesting -= 1;

if *cur_nesting == 0 {
// Anything that comes after we left the top-level container is an error.
return de.end();
}

Ok(())
}

fn adjust_nesting(&mut self) -> Result<bool> {
// Applies all the queued up nesting commands. Afterwards, current_level and future_level should
// be the same.
fn resolve_nesting(&mut self) -> Result<bool> {
// When we just entered a container, the next character must not be a comma.
// Otherwise, it must be a comma.
let mut last_entered = false;
for cmd in self.nesting_change.drain(..) {
last_entered = match cmd.direction {
NestingDirection::Enter => {
Self::nest_enter(
tri!(Self::nest_enter(
&mut self.de,
&mut self.cur_resolved_level,
&mut self.cur_level,
cmd.kind,
last_entered,
)?;
last_entered
));
true
}
NestingDirection::Leave => {
Self::nest_leave(&mut self.de, &mut self.cur_resolved_level, cmd.kind)?;
tri!(Self::nest_leave(
&mut self.de,
&mut self.cur_level,
cmd.kind
));
false
}
};
Expand All @@ -2632,36 +2619,42 @@ where
Ok(last_entered)
}

// Gets the actual JSON value that was requested
fn next_value<T: de::Deserialize<'de>>(&mut self) -> Option<Result<T>> {
Some(de::Deserialize::deserialize(&mut self.de))
}

// Due to the way the lifetimes work, we can be sure that at this point only an array is expected
fn next_arr_val<T: de::Deserialize<'de>>(
&mut self,
expected_level: usize,
) -> Option<Result<T>> {
let entered = match self.adjust_nesting() {
Ok(entered) => entered,
// First, we need to resolve all the nesting that was queued up before.
let needs_comma = match self.resolve_nesting() {
Ok(entered) => !entered,
Err(e) => return Some(Err(e)),
};

if self.cur_resolved_level == 0 || self.cur_resolved_level != expected_level {
if self.cur_level == 0 || self.cur_level != expected_level {
// We already ran `end()` at this point during `nest_leave`
dbg!(self.cur_resolved_level, expected_level);
return None;
}

match self.de.parse_whitespace() {
Ok(None) => Some(Err(self.de.peek_error(ErrorCode::EofWhileParsingValue))),
Ok(Some(b']')) => {
self.de.eat_char();
self.cur_resolved_level -= 1;
self.cur_expected_level -= 1;
if self.cur_resolved_level == 0 {
self.cur_level -= 1;
self.future_level -= 1;
if self.cur_level == 0 {
return match self.de.end() {
Ok(()) => None,
Err(e) => Some(Err(e)),
};
}
None
}
Ok(Some(b',')) if !entered => {
Ok(Some(b',')) if needs_comma => {
self.de.eat_char();

match self.de.parse_whitespace() {
Expand All @@ -2671,28 +2664,45 @@ where
Err(e) => Some(Err(e)),
}
}
Ok(Some(_)) if entered => self.next_value(),
Ok(Some(_)) => Some(Err(self.de.peek_error(ErrorCode::ExpectedSomeValue))),
Ok(Some(_)) => {
if !needs_comma {
self.next_value()
} else {
Some(Err(self.de.peek_error(ErrorCode::ExpectedSomeValue)))
}
}
Err(e) => Some(Err(e)),
}
}

fn next_array<'a>(&'a mut self) -> SubArrayDeserializer<'de, 'a, R>
fn queue_enter_array<'new_parent>(
&'new_parent mut self,
) -> SubArrayDeserializer<'de, 'new_parent, R>
where
'de: 'a,
'de: 'new_parent,
{
self.nesting_change.push(NestingCommand {
kind: ContainerKind::Array,
direction: NestingDirection::Enter,
});

self.cur_expected_level += 1;
self.future_level += 1;

SubArrayDeserializer {
expected_level: self.cur_expected_level,
expected_level: self.future_level,
base: self,
}
}

fn queue_leave(&mut self, expected_level: usize, kind: ContainerKind) {
if self.future_level == expected_level {
self.future_level -= 1;
self.nesting_change.push(NestingCommand {
kind,
direction: NestingDirection::Leave,
});
}
}
}

/// A streaming JSON array deserializer.
Expand Down Expand Up @@ -2746,11 +2756,13 @@ where
}

/// Some docs TODO
pub fn next_array<'a>(&'a mut self) -> SubArrayDeserializer<'de, 'a, R>
pub fn next_array<'new_parent>(
&'new_parent mut self,
) -> SubArrayDeserializer<'de, 'new_parent, R>
where
'de: 'a,
'de: 'new_parent,
{
self.base.next_array()
self.base.queue_enter_array()
}
}

Expand All @@ -2763,38 +2775,41 @@ impl<'de, R: read::Read<'de>> From<Deserializer<R>> for ArrayDeserializer<'de, R
}

/// docs TODO
pub struct SubArrayDeserializer<'de1, 'de2, R> {
base: &'de2 mut IterativeBaseDeserializer<'de1, R>,
pub struct SubArrayDeserializer<'de, 'parent, R>
where
R: read::Read<'de>,
{
base: &'parent mut IterativeBaseDeserializer<'de, R>,
expected_level: usize,
}

impl<'de1, 'de2, R> SubArrayDeserializer<'de1, 'de2, R>
impl<'de, 'parent, R> SubArrayDeserializer<'de, 'parent, R>
where
R: read::Read<'de1>,
R: read::Read<'de>,
{
/// Return the next element from the array. Returns None if there are no more elements.
pub fn next<T: de::Deserialize<'de1>>(&mut self) -> Option<Result<T>> {
pub fn next<T: de::Deserialize<'de>>(&mut self) -> Option<Result<T>> {
self.base.next_arr_val(self.expected_level)
}

/// Some docs TODO
pub fn next_array<'a>(&'a mut self) -> SubArrayDeserializer<'de1, 'a, R>
pub fn next_array<'new_parent>(
&'new_parent mut self,
) -> SubArrayDeserializer<'de, 'new_parent, R>
where
'de1: 'a,
'de: 'new_parent,
{
self.base.next_array()
self.base.queue_enter_array()
}
}

impl<'de1, 'de2, R> Drop for SubArrayDeserializer<'de1, 'de2, R> {
impl<'de, 'parent, R> Drop for SubArrayDeserializer<'de, 'parent, R>
where
R: read::Read<'de>,
{
fn drop(&mut self) {
if self.base.cur_expected_level == self.expected_level {
self.base.cur_expected_level -= 1;
self.base.nesting_change.push(NestingCommand {
kind: ContainerKind::Array,
direction: NestingDirection::Leave,
});
}
self.base
.queue_leave(self.expected_level, ContainerKind::Array);
}
}

Expand Down
Loading