diff --git a/jsurfer-all/src/test/java/org/jsfr/json/FastJsonParesrTest.java b/jsurfer-all/src/test/java/org/jsfr/json/FastJsonParesrTest.java index 88e8476..8430e17 100644 --- a/jsurfer-all/src/test/java/org/jsfr/json/FastJsonParesrTest.java +++ b/jsurfer-all/src/test/java/org/jsfr/json/FastJsonParesrTest.java @@ -49,7 +49,7 @@ public void onValue(Object value, ParsingContext context) { context.pause(); } }).build(); - ResumableParser parser = surfer.getResumableParser(read("sample.json"), config); + ResumableParser parser = surfer.createResumableParser(read("sample.json"), config); assertFalse(parser.resume()); LOGGER.info("Start parsing"); parser.parse(); diff --git a/jsurfer-all/src/test/java/org/jsfr/json/GsonParserTest.java b/jsurfer-all/src/test/java/org/jsfr/json/GsonParserTest.java index d2c1bad..6ca60b0 100644 --- a/jsurfer-all/src/test/java/org/jsfr/json/GsonParserTest.java +++ b/jsurfer-all/src/test/java/org/jsfr/json/GsonParserTest.java @@ -114,7 +114,7 @@ public void onValue(Object value, ParsingContext context) { context.pause(); } }).build(); - ResumableParser parser = surfer.getResumableParser(read("sample.json"), config); + ResumableParser parser = surfer.createResumableParser(read("sample.json"), config); assertFalse(parser.resume()); LOGGER.info("Start parsing"); parser.parse(); diff --git a/jsurfer-all/src/test/java/org/jsfr/json/JacksonParserTest.java b/jsurfer-all/src/test/java/org/jsfr/json/JacksonParserTest.java index 9ec7ec2..774b269 100644 --- a/jsurfer-all/src/test/java/org/jsfr/json/JacksonParserTest.java +++ b/jsurfer-all/src/test/java/org/jsfr/json/JacksonParserTest.java @@ -40,6 +40,8 @@ import java.util.concurrent.atomic.AtomicLong; import static org.junit.Assert.*; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; /** * Created by Leo on 2015/3/30. @@ -117,7 +119,7 @@ public void onValue(Object value, ParsingContext context) { context.pause(); } }).build(); - ResumableParser parser = surfer.getResumableParser(read("sample.json"), config); + ResumableParser parser = surfer.createResumableParser(read("sample.json"), config); assertFalse(parser.resume()); LOGGER.info("Start parsing"); parser.parse(); @@ -129,4 +131,24 @@ public void onValue(Object value, ParsingContext context) { assertFalse(parser.resume()); } + @Test + public void testNonBlockingParser() throws Exception { + JsonPathListener mockListener = mock(JsonPathListener.class); + SurfingConfiguration config = surfer.configBuilder() + .bind("$['foo','bar']", mockListener) + .build(); + byte[] part1 = "{\"foo\": 12".getBytes("UTF-8"); + byte[] part2 = "34, \"bar\": \"ab".getBytes("UTF-8"); + byte[] part3 = "cd\"}".getBytes("UTF-8"); + + NonBlockingParser nonBlockingParser = surfer.createNonBlockingParser(config); + assertTrue(nonBlockingParser.feed(part1, 0, part1.length)); + assertTrue(nonBlockingParser.feed(part2, 0, part2.length)); + assertTrue(nonBlockingParser.feed(part3, 0, part3.length)); + nonBlockingParser.endOfInput(); + assertFalse(nonBlockingParser.feed(part1, 0, 100)); + verify(mockListener).onValue(eq(provider.primitive(1234L)), any(ParsingContext.class)); + verify(mockListener).onValue(eq(provider.primitive("abcd")), any(ParsingContext.class)); + } + } diff --git a/jsurfer-all/src/test/java/org/jsfr/json/JsonSurferTest.java b/jsurfer-all/src/test/java/org/jsfr/json/JsonSurferTest.java index d09e60f..4836edb 100644 --- a/jsurfer-all/src/test/java/org/jsfr/json/JsonSurferTest.java +++ b/jsurfer-all/src/test/java/org/jsfr/json/JsonSurferTest.java @@ -85,12 +85,12 @@ public void onValue(Object value, ParsingContext context) { assertEquals("bar", context.load("foo", String.class)); } }) - .bind("$.store.book[0]", new JsonPathListener() { - @Override - public void onValue(Object value, ParsingContext context) { - assertNull(context.load("foo", String.class)); - } - }).buildAndSurf(read("sample.json")); + .bind("$.store.book[0]", new JsonPathListener() { + @Override + public void onValue(Object value, ParsingContext context) { + assertNull(context.load("foo", String.class)); + } + }).buildAndSurf(read("sample.json")); } @Test @@ -328,7 +328,7 @@ protected Reader read(String resourceName) throws IOException { return new InputStreamReader(Resources.getResource(resourceName).openStream(), StandardCharsets.UTF_8); } - private String readAsString(String resourceName) throws IOException { + protected String readAsString(String resourceName) throws IOException { return Resources.toString(Resources.getResource(resourceName), StandardCharsets.UTF_8); } diff --git a/jsurfer-core/src/main/java/org/jsfr/json/JsonParserAdapter.java b/jsurfer-core/src/main/java/org/jsfr/json/JsonParserAdapter.java index 6f5e0b8..cf82a46 100644 --- a/jsurfer-core/src/main/java/org/jsfr/json/JsonParserAdapter.java +++ b/jsurfer-core/src/main/java/org/jsfr/json/JsonParserAdapter.java @@ -54,7 +54,7 @@ public interface JsonParserAdapter { * @param context Surfing context * @return Resumable Parser */ - ResumableParser createParser(Reader reader, SurfingContext context); + ResumableParser createResumableParser(Reader reader, SurfingContext context); /** * Create a resumable parser @@ -63,6 +63,14 @@ public interface JsonParserAdapter { * @param context Surfing context * @return Resumable parser */ - ResumableParser createParser(String json, SurfingContext context); + ResumableParser createResumableParser(String json, SurfingContext context); + + /** + * Create a NonBlockingParser + * + * @param context Surfing context + * @return NonBlockingParser + */ + NonBlockingParser createNonBlockingParser(SurfingContext context); } diff --git a/jsurfer-core/src/main/java/org/jsfr/json/JsonSurfer.java b/jsurfer-core/src/main/java/org/jsfr/json/JsonSurfer.java index 85148d6..072ebbc 100644 --- a/jsurfer-core/src/main/java/org/jsfr/json/JsonSurfer.java +++ b/jsurfer-core/src/main/java/org/jsfr/json/JsonSurfer.java @@ -85,7 +85,7 @@ public SurfingConfiguration.Builder configBuilder() { */ public Iterator iterator(Reader reader, JsonPath jsonPath) { SurfingContext context = createIteratorContext(jsonPath); - final ResumableParser resumableParser = jsonParserAdapter.createParser(reader, context); + final ResumableParser resumableParser = jsonParserAdapter.createResumableParser(reader, context); resumableParser.parse(); return createIterator(context, resumableParser); } @@ -99,7 +99,7 @@ public Iterator iterator(Reader reader, JsonPath jsonPath) { */ public Iterator iterator(String json, JsonPath jsonPath) { SurfingContext context = createIteratorContext(jsonPath); - final ResumableParser resumableParser = jsonParserAdapter.createParser(json, context); + final ResumableParser resumableParser = jsonParserAdapter.createResumableParser(json, context); resumableParser.parse(); return createIterator(context, resumableParser); } @@ -159,7 +159,7 @@ public void surf(String json, SurfingConfiguration configuration) { } /** - * @param json Json source + * @param json Json source * @param configuration SurfingConfiguration that holds JsonPath binding */ public void surf(Reader json, SurfingConfiguration configuration) { @@ -167,14 +167,19 @@ public void surf(Reader json, SurfingConfiguration configuration) { jsonParserAdapter.parse(json, new SurfingContext(configuration)); } - public ResumableParser getResumableParser(String json, SurfingConfiguration configuration) { + public ResumableParser createResumableParser(String json, SurfingConfiguration configuration) { ensureSetting(configuration); - return jsonParserAdapter.createParser(json, new SurfingContext(configuration)); + return jsonParserAdapter.createResumableParser(json, new SurfingContext(configuration)); } - public ResumableParser getResumableParser(Reader json, SurfingConfiguration configuration) { + public ResumableParser createResumableParser(Reader json, SurfingConfiguration configuration) { ensureSetting(configuration); - return jsonParserAdapter.createParser(json, new SurfingContext(configuration)); + return jsonParserAdapter.createResumableParser(json, new SurfingContext(configuration)); + } + + public NonBlockingParser createNonBlockingParser(SurfingConfiguration configuration) { + ensureSetting(configuration); + return jsonParserAdapter.createNonBlockingParser(new SurfingContext(configuration)); } /** diff --git a/jsurfer-core/src/main/java/org/jsfr/json/NonBlockingParser.java b/jsurfer-core/src/main/java/org/jsfr/json/NonBlockingParser.java new file mode 100644 index 0000000..2e12b3d --- /dev/null +++ b/jsurfer-core/src/main/java/org/jsfr/json/NonBlockingParser.java @@ -0,0 +1,23 @@ +package org.jsfr.json; + +/** + * Interface for non-blocking parsing + */ +public interface NonBlockingParser extends ResumableParser { + + /** + * Feed data and start or resume parsing immediately + * + * @param bytes bytes to feed + * @param start start position + * @param end end position + * @return true if all feed data is successfully consumed + */ + boolean feed(byte[] bytes, int start, int end); + + /** + * Called to notify parser the input ended + */ + void endOfInput(); + +} diff --git a/jsurfer-core/src/main/java/org/jsfr/json/ResumableParser.java b/jsurfer-core/src/main/java/org/jsfr/json/ResumableParser.java index 4016b67..178fcac 100644 --- a/jsurfer-core/src/main/java/org/jsfr/json/ResumableParser.java +++ b/jsurfer-core/src/main/java/org/jsfr/json/ResumableParser.java @@ -13,7 +13,7 @@ public interface ResumableParser { /** * Resume parsing. It should not be invoked before parse()! * - * @return true if parser is not stopped and in paused state before calling resume() + * @return true if parser was in paused state before invoking resume() */ boolean resume(); diff --git a/jsurfer-core/src/main/java/org/jsfr/json/SurfingContext.java b/jsurfer-core/src/main/java/org/jsfr/json/SurfingContext.java index 6247783..090b1e2 100644 --- a/jsurfer-core/src/main/java/org/jsfr/json/SurfingContext.java +++ b/jsurfer-core/src/main/java/org/jsfr/json/SurfingContext.java @@ -127,7 +127,7 @@ public boolean endJSON() { // clear resources currentPosition.clear(); currentPosition = null; - this.stopped = true; + this.stop(); return true; } @@ -276,13 +276,14 @@ public T load(String key, Class tClass) { return this.transientMap != null ? tClass.cast(this.transientMap.get(key)) : null; } - private boolean shouldBreak() { + public boolean shouldBreak() { return this.stopped || this.paused; } @Override public void stop() { this.stopped = true; + this.paused = false; } @Override diff --git a/jsurfer-fastjson/src/main/java/org/jsfr/json/FastJsonParser.java b/jsurfer-fastjson/src/main/java/org/jsfr/json/FastJsonParser.java index c3b23bc..35e8d27 100644 --- a/jsurfer-fastjson/src/main/java/org/jsfr/json/FastJsonParser.java +++ b/jsurfer-fastjson/src/main/java/org/jsfr/json/FastJsonParser.java @@ -35,7 +35,7 @@ public void parse() { @Override public boolean resume() { try { - if (context.isStopped() || !context.isPaused()) { + if (!context.isPaused()) { return false; } context.resume(); @@ -52,7 +52,7 @@ private void doParse() { try { String tempString = null; - while (!lexer.isEOF() && !context.isStopped() && !context.isPaused()) { + while (!lexer.isEOF() && !context.shouldBreak()) { lexer.nextToken(); int token = lexer.token(); //System.out.println("token: " + token); @@ -131,22 +131,27 @@ private FastJsonParser() { @Override public void parse(Reader reader, SurfingContext context) { - createParser(reader, context).parse(); + createResumableParser(reader, context).parse(); } @Override public void parse(String json, SurfingContext context) { - createParser(json, context).parse(); + createResumableParser(json, context).parse(); } @Override - public ResumableParser createParser(Reader reader, SurfingContext context) { + public ResumableParser createResumableParser(Reader reader, SurfingContext context) { return new FastJsonResumableParser(new JSONReaderScanner(reader), context, new StaticPrimitiveHolder()); } @Override - public ResumableParser createParser(String json, SurfingContext context) { + public ResumableParser createResumableParser(String json, SurfingContext context) { return new FastJsonResumableParser(new JSONScanner(json), context, new StaticPrimitiveHolder()); } + @Override + public NonBlockingParser createNonBlockingParser(SurfingContext context) { + throw new UnsupportedOperationException("Unsupported"); + } + } diff --git a/jsurfer-gson/src/main/java/org/jsfr/json/GsonParser.java b/jsurfer-gson/src/main/java/org/jsfr/json/GsonParser.java index 3003b9c..d41b1f7 100644 --- a/jsurfer-gson/src/main/java/org/jsfr/json/GsonParser.java +++ b/jsurfer-gson/src/main/java/org/jsfr/json/GsonParser.java @@ -60,7 +60,7 @@ public void parse() { @Override public boolean resume() { try { - if (context.isStopped() || !context.isPaused()) { + if (!context.isPaused()) { return false; } context.resume(); @@ -74,7 +74,7 @@ public boolean resume() { private void doParse() { try { - while (!context.isStopped() && !context.isPaused()) { + while (!context.shouldBreak()) { JsonToken token = jsonReader.peek(); switch (token) { case BEGIN_ARRAY: @@ -119,7 +119,7 @@ private void doParse() { break; case END_DOCUMENT: context.endJSON(); - return; + break; } } } catch (Exception e) { @@ -135,16 +135,16 @@ private GsonParser() { @Override public void parse(Reader reader, SurfingContext context) { - createParser(reader, context).parse(); + createResumableParser(reader, context).parse(); } @Override public void parse(String json, SurfingContext context) { - createParser(json, context).parse(); + createResumableParser(json, context).parse(); } @Override - public ResumableParser createParser(Reader reader, SurfingContext context) { + public ResumableParser createResumableParser(Reader reader, SurfingContext context) { final JsonReader jsonReader = new JsonReader(reader); final JsonProvider jsonProvider = context.getConfig().getJsonProvider(); @@ -197,8 +197,13 @@ public void doSkipValue() throws IOException { } @Override - public ResumableParser createParser(String json, SurfingContext context) { - return createParser(new StringReader(json), context); + public ResumableParser createResumableParser(String json, SurfingContext context) { + return createResumableParser(new StringReader(json), context); + } + + @Override + public NonBlockingParser createNonBlockingParser(SurfingContext context) { + throw new UnsupportedOperationException("Unsupported"); } } diff --git a/jsurfer-jackson/pom.xml b/jsurfer-jackson/pom.xml index af8a266..371c8a9 100644 --- a/jsurfer-jackson/pom.xml +++ b/jsurfer-jackson/pom.xml @@ -61,7 +61,7 @@ com.fasterxml.jackson.core jackson-databind - 2.8.2 + 2.9.0 diff --git a/jsurfer-jackson/src/main/java/org/jsfr/json/JacksonParser.java b/jsurfer-jackson/src/main/java/org/jsfr/json/JacksonParser.java index 70919f3..171a6f0 100644 --- a/jsurfer-jackson/src/main/java/org/jsfr/json/JacksonParser.java +++ b/jsurfer-jackson/src/main/java/org/jsfr/json/JacksonParser.java @@ -27,6 +27,7 @@ import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.core.json.async.NonBlockingJsonParser; import org.jsfr.json.provider.JsonProvider; import java.io.IOException; @@ -34,28 +35,92 @@ public class JacksonParser implements JsonParserAdapter { + private static class JacksonNonblockingParser extends JacksonResumableParser implements NonBlockingParser { + + private NonBlockingJsonParser nonBlockingJsonParser; + + JacksonNonblockingParser(NonBlockingJsonParser jsonParser, SurfingContext context) { + super(jsonParser, context); + this.nonBlockingJsonParser = jsonParser; + } + + @Override + public boolean feed(byte[] bytes, int start, int end) { + try { + if (nonBlockingJsonParser.needMoreInput() && !context.isStopped()) { + nonBlockingJsonParser.feedInput(bytes, start, end); + if (context.isPaused()) { + context.resume(); + doPare(); + } else { + parse(); + } + return true; + } + } catch (IOException e) { + context.getConfig().getErrorHandlingStrategy().handleParsingException(e); + } + return false; + } + + @Override + public void endOfInput() { + nonBlockingJsonParser.endOfInput(); + resume(); + } + + } + private static class JacksonResumableParser implements ResumableParser { private JsonParser jsonParser; - private SurfingContext context; + SurfingContext context; private AbstractPrimitiveHolder stringHolder; private AbstractPrimitiveHolder longHolder; private AbstractPrimitiveHolder doubleHolder; private StaticPrimitiveHolder staticHolder; - public JacksonResumableParser(JsonParser jsonParser, SurfingContext context, AbstractPrimitiveHolder stringHolder, AbstractPrimitiveHolder longHolder, AbstractPrimitiveHolder doubleHolder, StaticPrimitiveHolder staticHolder) { + JacksonResumableParser(final JsonParser jsonParser, SurfingContext context) { this.jsonParser = jsonParser; this.context = context; - this.stringHolder = stringHolder; - this.longHolder = longHolder; - this.doubleHolder = doubleHolder; - this.staticHolder = staticHolder; + final JsonProvider jsonProvider = context.getConfig().getJsonProvider(); + this.stringHolder = new AbstractPrimitiveHolder(context.getConfig()) { + @Override + public Object doGetValue() throws IOException { + return jsonProvider.primitive(jsonParser.getText()); + } + + @Override + public void doSkipValue() throws IOException { + } + }; + this.longHolder = new AbstractPrimitiveHolder(context.getConfig()) { + @Override + public Object doGetValue() throws IOException { + return jsonProvider.primitive(jsonParser.getLongValue()); + } + + @Override + public void doSkipValue() throws IOException { + } + }; + this.doubleHolder = new AbstractPrimitiveHolder(context.getConfig()) { + @Override + public Object doGetValue() throws IOException { + return jsonProvider.primitive(jsonParser.getDoubleValue()); + } + + @Override + public void doSkipValue() throws IOException { + } + }; + this.staticHolder = new StaticPrimitiveHolder(); } @Override public boolean resume() { try { - if (context.isStopped() || !context.isPaused()) { + if (!context.isPaused()) { return false; } context.resume(); @@ -77,18 +142,18 @@ public void parse() { } } - private void doPare() throws IOException { - final JsonProvider jsonProvider = context.getConfig().getJsonProvider(); - - while (!context.isStopped() && !context.isPaused()) { + void doPare() throws IOException { + JsonProvider jsonProvider = context.getConfig().getJsonProvider(); + while (!context.shouldBreak()) { JsonToken token = jsonParser.nextToken(); if (token == null) { context.endJSON(); - return; + break; } switch (token) { case NOT_AVAILABLE: - return; + context.pause(); + break; case START_OBJECT: context.startObject(); break; @@ -148,19 +213,19 @@ public JacksonParser(JsonFactory factory) { @Override public void parse(Reader reader, final SurfingContext context) { - createParser(reader, context).parse(); + createResumableParser(reader, context).parse(); } @Override public void parse(String json, SurfingContext context) { - createParser(json, context).parse(); + createResumableParser(json, context).parse(); } @Override - public ResumableParser createParser(Reader reader, SurfingContext context) { + public ResumableParser createResumableParser(Reader reader, SurfingContext context) { try { final JsonParser jp = this.factory.createParser(reader); - return createParser(jp, context); + return createResumableParser(jp, context); } catch (Exception e) { context.getConfig().getErrorHandlingStrategy().handleParsingException(e); return null; @@ -168,50 +233,29 @@ public ResumableParser createParser(Reader reader, SurfingContext context) { } @Override - public ResumableParser createParser(String json, SurfingContext context) { + public ResumableParser createResumableParser(String json, SurfingContext context) { try { final JsonParser jp = this.factory.createParser(json); - return createParser(jp, context); + return createResumableParser(jp, context); } catch (Exception e) { context.getConfig().getErrorHandlingStrategy().handleParsingException(e); return null; } } - private ResumableParser createParser(final JsonParser jp, SurfingContext context) { - final JsonProvider jsonProvider = context.getConfig().getJsonProvider(); - AbstractPrimitiveHolder stringHolder = new AbstractPrimitiveHolder(context.getConfig()) { - @Override - public Object doGetValue() throws IOException { - return jsonProvider.primitive(jp.getText()); - } - - @Override - public void doSkipValue() throws IOException { - } - }; - AbstractPrimitiveHolder longHolder = new AbstractPrimitiveHolder(context.getConfig()) { - @Override - public Object doGetValue() throws IOException { - return jsonProvider.primitive(jp.getLongValue()); - } - - @Override - public void doSkipValue() throws IOException { - } - }; - AbstractPrimitiveHolder doubleHolder = new AbstractPrimitiveHolder(context.getConfig()) { - @Override - public Object doGetValue() throws IOException { - return jsonProvider.primitive(jp.getDoubleValue()); - } + @Override + public NonBlockingParser createNonBlockingParser(SurfingContext context) { + try { + NonBlockingJsonParser jp = (NonBlockingJsonParser) factory.createNonBlockingByteArrayParser(); + return new JacksonNonblockingParser(jp, context); + } catch (IOException e) { + context.getConfig().getErrorHandlingStrategy().handleParsingException(e); + } + return null; + } - @Override - public void doSkipValue() throws IOException { - } - }; - StaticPrimitiveHolder staticPrimitiveHolder = new StaticPrimitiveHolder(); - return new JacksonResumableParser(jp, context, stringHolder, longHolder, doubleHolder, staticPrimitiveHolder); + private JacksonResumableParser createResumableParser(final JsonParser jp, SurfingContext context) { + return new JacksonResumableParser(jp, context); } } diff --git a/jsurfer-jsonsimple/src/main/java/org/jsfr/json/JsonSimpleParser.java b/jsurfer-jsonsimple/src/main/java/org/jsfr/json/JsonSimpleParser.java index 201f55c..d502af3 100644 --- a/jsurfer-jsonsimple/src/main/java/org/jsfr/json/JsonSimpleParser.java +++ b/jsurfer-jsonsimple/src/main/java/org/jsfr/json/JsonSimpleParser.java @@ -52,12 +52,17 @@ public void parse(String json, SurfingContext context) { } @Override - public ResumableParser createParser(Reader reader, SurfingContext context) { + public ResumableParser createResumableParser(Reader reader, SurfingContext context) { throw new UnsupportedOperationException("Unsupported"); } @Override - public ResumableParser createParser(String json, SurfingContext context) { + public ResumableParser createResumableParser(String json, SurfingContext context) { + throw new UnsupportedOperationException("Unsupported"); + } + + @Override + public NonBlockingParser createNonBlockingParser(SurfingContext context) { throw new UnsupportedOperationException("Unsupported"); }