Skip to content

Commit

Permalink
Merge branch 'non-blocking-parser'
Browse files Browse the repository at this point in the history
  • Loading branch information
wanglingsong committed Aug 11, 2017
2 parents bfdbe11 + cb16db3 commit ca67375
Show file tree
Hide file tree
Showing 14 changed files with 209 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void onValue(Object value, ParsingContext context) {
context.pause();
}
}).build();
ResumableParser parser = surfer.getResumableParser(read("sample.json"), config);
ResumableParser parser = surfer.createResumableParser(read("sample.json"), config);
assertFalse(parser.resume());
LOGGER.info("Start parsing");
parser.parse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void onValue(Object value, ParsingContext context) {
context.pause();
}
}).build();
ResumableParser parser = surfer.getResumableParser(read("sample.json"), config);
ResumableParser parser = surfer.createResumableParser(read("sample.json"), config);
assertFalse(parser.resume());
LOGGER.info("Start parsing");
parser.parse();
Expand Down
24 changes: 23 additions & 1 deletion jsurfer-all/src/test/java/org/jsfr/json/JacksonParserTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import java.util.concurrent.atomic.AtomicLong;

import static org.junit.Assert.*;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*;

/**
* Created by Leo on 2015/3/30.
Expand Down Expand Up @@ -117,7 +119,7 @@ public void onValue(Object value, ParsingContext context) {
context.pause();
}
}).build();
ResumableParser parser = surfer.getResumableParser(read("sample.json"), config);
ResumableParser parser = surfer.createResumableParser(read("sample.json"), config);
assertFalse(parser.resume());
LOGGER.info("Start parsing");
parser.parse();
Expand All @@ -129,4 +131,24 @@ public void onValue(Object value, ParsingContext context) {
assertFalse(parser.resume());
}

@Test
public void testNonBlockingParser() throws Exception {
JsonPathListener mockListener = mock(JsonPathListener.class);
SurfingConfiguration config = surfer.configBuilder()
.bind("$['foo','bar']", mockListener)
.build();
byte[] part1 = "{\"foo\": 12".getBytes("UTF-8");
byte[] part2 = "34, \"bar\": \"ab".getBytes("UTF-8");
byte[] part3 = "cd\"}".getBytes("UTF-8");

NonBlockingParser nonBlockingParser = surfer.createNonBlockingParser(config);
assertTrue(nonBlockingParser.feed(part1, 0, part1.length));
assertTrue(nonBlockingParser.feed(part2, 0, part2.length));
assertTrue(nonBlockingParser.feed(part3, 0, part3.length));
nonBlockingParser.endOfInput();
assertFalse(nonBlockingParser.feed(part1, 0, 100));
verify(mockListener).onValue(eq(provider.primitive(1234L)), any(ParsingContext.class));
verify(mockListener).onValue(eq(provider.primitive("abcd")), any(ParsingContext.class));
}

}
14 changes: 7 additions & 7 deletions jsurfer-all/src/test/java/org/jsfr/json/JsonSurferTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,12 @@ public void onValue(Object value, ParsingContext context) {
assertEquals("bar", context.load("foo", String.class));
}
})
.bind("$.store.book[0]", new JsonPathListener() {
@Override
public void onValue(Object value, ParsingContext context) {
assertNull(context.load("foo", String.class));
}
}).buildAndSurf(read("sample.json"));
.bind("$.store.book[0]", new JsonPathListener() {
@Override
public void onValue(Object value, ParsingContext context) {
assertNull(context.load("foo", String.class));
}
}).buildAndSurf(read("sample.json"));
}

@Test
Expand Down Expand Up @@ -328,7 +328,7 @@ protected Reader read(String resourceName) throws IOException {
return new InputStreamReader(Resources.getResource(resourceName).openStream(), StandardCharsets.UTF_8);
}

private String readAsString(String resourceName) throws IOException {
protected String readAsString(String resourceName) throws IOException {
return Resources.toString(Resources.getResource(resourceName), StandardCharsets.UTF_8);
}

Expand Down
12 changes: 10 additions & 2 deletions jsurfer-core/src/main/java/org/jsfr/json/JsonParserAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public interface JsonParserAdapter {
* @param context Surfing context
* @return Resumable Parser
*/
ResumableParser createParser(Reader reader, SurfingContext context);
ResumableParser createResumableParser(Reader reader, SurfingContext context);

/**
* Create a resumable parser
Expand All @@ -63,6 +63,14 @@ public interface JsonParserAdapter {
* @param context Surfing context
* @return Resumable parser
*/
ResumableParser createParser(String json, SurfingContext context);
ResumableParser createResumableParser(String json, SurfingContext context);

/**
* Create a NonBlockingParser
*
* @param context Surfing context
* @return NonBlockingParser
*/
NonBlockingParser createNonBlockingParser(SurfingContext context);

}
19 changes: 12 additions & 7 deletions jsurfer-core/src/main/java/org/jsfr/json/JsonSurfer.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public SurfingConfiguration.Builder configBuilder() {
*/
public Iterator<Object> iterator(Reader reader, JsonPath jsonPath) {
SurfingContext context = createIteratorContext(jsonPath);
final ResumableParser resumableParser = jsonParserAdapter.createParser(reader, context);
final ResumableParser resumableParser = jsonParserAdapter.createResumableParser(reader, context);
resumableParser.parse();
return createIterator(context, resumableParser);
}
Expand All @@ -99,7 +99,7 @@ public Iterator<Object> iterator(Reader reader, JsonPath jsonPath) {
*/
public Iterator<Object> iterator(String json, JsonPath jsonPath) {
SurfingContext context = createIteratorContext(jsonPath);
final ResumableParser resumableParser = jsonParserAdapter.createParser(json, context);
final ResumableParser resumableParser = jsonParserAdapter.createResumableParser(json, context);
resumableParser.parse();
return createIterator(context, resumableParser);
}
Expand Down Expand Up @@ -159,22 +159,27 @@ public void surf(String json, SurfingConfiguration configuration) {
}

/**
* @param json Json source
* @param json Json source
* @param configuration SurfingConfiguration that holds JsonPath binding
*/
public void surf(Reader json, SurfingConfiguration configuration) {
ensureSetting(configuration);
jsonParserAdapter.parse(json, new SurfingContext(configuration));
}

public ResumableParser getResumableParser(String json, SurfingConfiguration configuration) {
public ResumableParser createResumableParser(String json, SurfingConfiguration configuration) {
ensureSetting(configuration);
return jsonParserAdapter.createParser(json, new SurfingContext(configuration));
return jsonParserAdapter.createResumableParser(json, new SurfingContext(configuration));
}

public ResumableParser getResumableParser(Reader json, SurfingConfiguration configuration) {
public ResumableParser createResumableParser(Reader json, SurfingConfiguration configuration) {
ensureSetting(configuration);
return jsonParserAdapter.createParser(json, new SurfingContext(configuration));
return jsonParserAdapter.createResumableParser(json, new SurfingContext(configuration));
}

public NonBlockingParser createNonBlockingParser(SurfingConfiguration configuration) {
ensureSetting(configuration);
return jsonParserAdapter.createNonBlockingParser(new SurfingContext(configuration));
}

/**
Expand Down
23 changes: 23 additions & 0 deletions jsurfer-core/src/main/java/org/jsfr/json/NonBlockingParser.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.jsfr.json;

/**
* Interface for non-blocking parsing
*/
public interface NonBlockingParser extends ResumableParser {

/**
* Feed data and start or resume parsing immediately
*
* @param bytes bytes to feed
* @param start start position
* @param end end position
* @return true if all feed data is successfully consumed
*/
boolean feed(byte[] bytes, int start, int end);

/**
* Called to notify parser the input ended
*/
void endOfInput();

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public interface ResumableParser {
/**
* Resume parsing. It should not be invoked before parse()!
*
* @return true if parser is not stopped and in paused state before calling resume()
* @return true if parser was in paused state before invoking resume()
*/
boolean resume();

Expand Down
5 changes: 3 additions & 2 deletions jsurfer-core/src/main/java/org/jsfr/json/SurfingContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public boolean endJSON() {
// clear resources
currentPosition.clear();
currentPosition = null;
this.stopped = true;
this.stop();
return true;
}

Expand Down Expand Up @@ -276,13 +276,14 @@ public <T> T load(String key, Class<T> tClass) {
return this.transientMap != null ? tClass.cast(this.transientMap.get(key)) : null;
}

private boolean shouldBreak() {
public boolean shouldBreak() {
return this.stopped || this.paused;
}

@Override
public void stop() {
this.stopped = true;
this.paused = false;
}

@Override
Expand Down
17 changes: 11 additions & 6 deletions jsurfer-fastjson/src/main/java/org/jsfr/json/FastJsonParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void parse() {
@Override
public boolean resume() {
try {
if (context.isStopped() || !context.isPaused()) {
if (!context.isPaused()) {
return false;
}
context.resume();
Expand All @@ -52,7 +52,7 @@ private void doParse() {
try {
String tempString = null;

while (!lexer.isEOF() && !context.isStopped() && !context.isPaused()) {
while (!lexer.isEOF() && !context.shouldBreak()) {
lexer.nextToken();
int token = lexer.token();
//System.out.println("token: " + token);
Expand Down Expand Up @@ -131,22 +131,27 @@ private FastJsonParser() {

@Override
public void parse(Reader reader, SurfingContext context) {
createParser(reader, context).parse();
createResumableParser(reader, context).parse();
}

@Override
public void parse(String json, SurfingContext context) {
createParser(json, context).parse();
createResumableParser(json, context).parse();
}

@Override
public ResumableParser createParser(Reader reader, SurfingContext context) {
public ResumableParser createResumableParser(Reader reader, SurfingContext context) {
return new FastJsonResumableParser(new JSONReaderScanner(reader), context, new StaticPrimitiveHolder());
}

@Override
public ResumableParser createParser(String json, SurfingContext context) {
public ResumableParser createResumableParser(String json, SurfingContext context) {
return new FastJsonResumableParser(new JSONScanner(json), context, new StaticPrimitiveHolder());
}

@Override
public NonBlockingParser createNonBlockingParser(SurfingContext context) {
throw new UnsupportedOperationException("Unsupported");
}

}
21 changes: 13 additions & 8 deletions jsurfer-gson/src/main/java/org/jsfr/json/GsonParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void parse() {
@Override
public boolean resume() {
try {
if (context.isStopped() || !context.isPaused()) {
if (!context.isPaused()) {
return false;
}
context.resume();
Expand All @@ -74,7 +74,7 @@ public boolean resume() {

private void doParse() {
try {
while (!context.isStopped() && !context.isPaused()) {
while (!context.shouldBreak()) {
JsonToken token = jsonReader.peek();
switch (token) {
case BEGIN_ARRAY:
Expand Down Expand Up @@ -119,7 +119,7 @@ private void doParse() {
break;
case END_DOCUMENT:
context.endJSON();
return;
break;
}
}
} catch (Exception e) {
Expand All @@ -135,16 +135,16 @@ private GsonParser() {

@Override
public void parse(Reader reader, SurfingContext context) {
createParser(reader, context).parse();
createResumableParser(reader, context).parse();
}

@Override
public void parse(String json, SurfingContext context) {
createParser(json, context).parse();
createResumableParser(json, context).parse();
}

@Override
public ResumableParser createParser(Reader reader, SurfingContext context) {
public ResumableParser createResumableParser(Reader reader, SurfingContext context) {
final JsonReader jsonReader = new JsonReader(reader);
final JsonProvider jsonProvider = context.getConfig().getJsonProvider();

Expand Down Expand Up @@ -197,8 +197,13 @@ public void doSkipValue() throws IOException {
}

@Override
public ResumableParser createParser(String json, SurfingContext context) {
return createParser(new StringReader(json), context);
public ResumableParser createResumableParser(String json, SurfingContext context) {
return createResumableParser(new StringReader(json), context);
}

@Override
public NonBlockingParser createNonBlockingParser(SurfingContext context) {
throw new UnsupportedOperationException("Unsupported");
}

}
2 changes: 1 addition & 1 deletion jsurfer-jackson/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.8.2</version>
<version>2.9.0</version>
</dependency>
</dependencies>

Expand Down
Loading

0 comments on commit ca67375

Please sign in to comment.