From 56ac516c03b35f61420ff8df3c0b2729c1d88090 Mon Sep 17 00:00:00 2001 From: "koo.taejin" Date: Thu, 19 Jan 2023 16:08:30 +0900 Subject: [PATCH 1/2] [#691] Improve memory usage via reuse InputStreamBufferInput --- .../msgpack/core/buffer/ArrayBufferInput.java | 11 +++- .../msgpack/core/buffer/ByteBufferInput.java | 3 +- .../core/buffer/ChannelBufferInput.java | 4 +- .../core/buffer/InputStreamBufferInput.java | 4 +- .../core/buffer/MessageBufferInput.java | 4 +- .../buffer/SequenceMessageBufferInput.java | 7 ++- .../msgpack/core/MessageUnpackerTest.scala | 5 +- .../msgpack/core/buffer/ByteStringTest.scala | 4 +- .../dataformat/MessageBufferInputLocator.java | 23 +++++++ .../MessageBufferInputProvider.java | 23 +++++++ .../MessageBufferInputRegistry.java | 46 ++++++++++++++ .../jackson/dataformat/MessagePackParser.java | 63 ++++++++++++++----- .../msgpack/jackson/dataformat/Triple.java | 49 +++++++++++++++ 13 files changed, 218 insertions(+), 28 deletions(-) create mode 100644 msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessageBufferInputLocator.java create mode 100644 msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessageBufferInputProvider.java create mode 100644 msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessageBufferInputRegistry.java create mode 100644 msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/Triple.java diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/ArrayBufferInput.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/ArrayBufferInput.java index 5c6454e69..b8166ed2c 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/buffer/ArrayBufferInput.java +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/ArrayBufferInput.java @@ -21,7 +21,7 @@ * MessageBufferInput adapter for byte arrays */ public class ArrayBufferInput - implements MessageBufferInput + implements MessageBufferInput { private MessageBuffer buffer; private boolean isEmpty; @@ -66,9 +66,14 @@ public MessageBuffer reset(MessageBuffer buf) return old; } - public void reset(byte[] arr) + @Override + public byte[] reset(byte[] arr) { - reset(MessageBuffer.wrap(checkNotNull(arr, "input array is null"))); + final MessageBuffer messageBuffer = reset(MessageBuffer.wrap(checkNotNull(arr, "input array is null"))); + if (messageBuffer == null) { + return null; + } + return messageBuffer.array(); } public void reset(byte[] arr, int offset, int len) diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/ByteBufferInput.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/ByteBufferInput.java index fd0311b83..0cb5d4238 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/buffer/ByteBufferInput.java +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/ByteBufferInput.java @@ -23,7 +23,7 @@ * {@link MessageBufferInput} adapter for {@link java.nio.ByteBuffer} */ public class ByteBufferInput - implements MessageBufferInput + implements MessageBufferInput { private ByteBuffer input; private boolean isRead = false; @@ -39,6 +39,7 @@ public ByteBufferInput(ByteBuffer input) * @param input new buffer * @return the old buffer */ + @Override public ByteBuffer reset(ByteBuffer input) { ByteBuffer old = this.input; diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/ChannelBufferInput.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/ChannelBufferInput.java index e8d7c1de8..ea636240e 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/buffer/ChannelBufferInput.java +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/ChannelBufferInput.java @@ -26,7 +26,7 @@ * {@link MessageBufferInput} adapter for {@link java.nio.channels.ReadableByteChannel} */ public class ChannelBufferInput - implements MessageBufferInput + implements MessageBufferInput { private ReadableByteChannel channel; private final MessageBuffer buffer; @@ -49,8 +49,8 @@ public ChannelBufferInput(ReadableByteChannel channel, int bufferSize) * @param channel new channel * @return the old resource */ + @Override public ReadableByteChannel reset(ReadableByteChannel channel) - throws IOException { ReadableByteChannel old = this.channel; this.channel = channel; diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/InputStreamBufferInput.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/InputStreamBufferInput.java index d605fec3a..39d93309a 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/buffer/InputStreamBufferInput.java +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/InputStreamBufferInput.java @@ -26,7 +26,7 @@ * {@link MessageBufferInput} adapter for {@link InputStream} */ public class InputStreamBufferInput - implements MessageBufferInput + implements MessageBufferInput { private InputStream in; private final byte[] buffer; @@ -60,8 +60,8 @@ public InputStreamBufferInput(InputStream in, int bufferSize) * @param in new stream * @return the old resource */ + @Override public InputStream reset(InputStream in) - throws IOException { InputStream old = this.in; this.in = in; diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferInput.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferInput.java index 77f7a06f6..716933928 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferInput.java +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferInput.java @@ -24,7 +24,7 @@ * A MessageBufferInput implementation has control of lifecycle of the memory so that it can reuse previously * allocated memory, use memory pools, or use memory-mapped files. */ -public interface MessageBufferInput +public interface MessageBufferInput extends Closeable { /** @@ -40,6 +40,8 @@ public interface MessageBufferInput MessageBuffer next() throws IOException; + T reset(T input); + /** * Closes the input. *

diff --git a/msgpack-core/src/test/java/org/msgpack/core/buffer/SequenceMessageBufferInput.java b/msgpack-core/src/test/java/org/msgpack/core/buffer/SequenceMessageBufferInput.java index 10b91d20a..6b11915ea 100644 --- a/msgpack-core/src/test/java/org/msgpack/core/buffer/SequenceMessageBufferInput.java +++ b/msgpack-core/src/test/java/org/msgpack/core/buffer/SequenceMessageBufferInput.java @@ -24,7 +24,7 @@ * {@link MessageBufferInput} adapter for {@link MessageBufferInput} Enumeration */ public class SequenceMessageBufferInput - implements MessageBufferInput + implements MessageBufferInput { private Enumeration sequence; private MessageBufferInput input; @@ -54,6 +54,11 @@ public MessageBuffer next() throws IOException return buffer; } + @Override + public Void reset(Void input) { + throw new UnsupportedOperationException("reset"); + } + private void nextInput() throws IOException { if (input != null) { diff --git a/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala b/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala index 3ea5e911d..1fb2ae4d1 100644 --- a/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala +++ b/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala @@ -29,7 +29,7 @@ import scala.jdk.CollectionConverters._ import scala.util.Random object MessageUnpackerTest { - class SplitMessageBufferInput(array: Array[Array[Byte]]) extends MessageBufferInput { + class SplitMessageBufferInput(array: Array[Array[Byte]]) extends MessageBufferInput[Void] { var cursor = 0 override def next(): MessageBuffer = { if (cursor < array.length) { @@ -41,6 +41,9 @@ object MessageUnpackerTest { } } + + override def reset(input: Void): Void = throw new UnsupportedOperationException("reset") + override def close(): Unit = {} } } diff --git a/msgpack-core/src/test/scala/org/msgpack/core/buffer/ByteStringTest.scala b/msgpack-core/src/test/scala/org/msgpack/core/buffer/ByteStringTest.scala index 42872fc44..109566973 100644 --- a/msgpack-core/src/test/scala/org/msgpack/core/buffer/ByteStringTest.scala +++ b/msgpack-core/src/test/scala/org/msgpack/core/buffer/ByteStringTest.scala @@ -26,7 +26,7 @@ class ByteStringTest extends AirSpec { private val byteString = ByteString(createMessagePackData(_.packString(unpackedString))) private def unpackString(messageBuffer: MessageBuffer) = { - val input = new MessageBufferInput { + val input = new MessageBufferInput[Void] { private var isRead = false @@ -38,6 +38,8 @@ class ByteStringTest extends AirSpec { messageBuffer } override def close(): Unit = {} + + override def reset(input: Void): Void = throw new UnsupportedOperationException("reset") } MessagePack.newDefaultUnpacker(input).unpackString() diff --git a/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessageBufferInputLocator.java b/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessageBufferInputLocator.java new file mode 100644 index 000000000..a431a4e0a --- /dev/null +++ b/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessageBufferInputLocator.java @@ -0,0 +1,23 @@ +// +// MessagePack for Java +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +package org.msgpack.jackson.dataformat; + +import org.msgpack.core.buffer.MessageBufferInput; + +public interface MessageBufferInputLocator +{ + MessageBufferInput get(Class clazz); +} diff --git a/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessageBufferInputProvider.java b/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessageBufferInputProvider.java new file mode 100644 index 000000000..96ce91ab8 --- /dev/null +++ b/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessageBufferInputProvider.java @@ -0,0 +1,23 @@ +// +// MessagePack for Java +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +package org.msgpack.jackson.dataformat; + +import org.msgpack.core.buffer.MessageBufferInput; + +interface MessageBufferInputProvider +{ + MessageBufferInput provide(); +} diff --git a/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessageBufferInputRegistry.java b/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessageBufferInputRegistry.java new file mode 100644 index 000000000..06f3409c3 --- /dev/null +++ b/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessageBufferInputRegistry.java @@ -0,0 +1,46 @@ +// +// MessagePack for Java +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +package org.msgpack.jackson.dataformat; + +import org.msgpack.core.buffer.MessageBufferInput; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +public class MessageBufferInputRegistry implements MessageBufferInputLocator +{ + private final Map messageBufferInputMap = new HashMap<>(1); + + @Override + public MessageBufferInput get(Class clazz) + { + return messageBufferInputMap.get(clazz); + } + + public boolean register(Class clazz, MessageBufferInputProvider provider) + { + Objects.requireNonNull(clazz, "clazz"); + Objects.requireNonNull(provider, "provider"); + + if (messageBufferInputMap.containsKey(clazz)) { + return false; + } + + messageBufferInputMap.put(clazz, provider.provide()); + return true; + } +} diff --git a/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessagePackParser.java b/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessagePackParser.java index 2a95b69a0..9f40b0d11 100644 --- a/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessagePackParser.java +++ b/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessagePackParser.java @@ -47,8 +47,8 @@ public class MessagePackParser extends ParserMinimalBase { - private static final ThreadLocal> messageUnpackerHolder = - new ThreadLocal>(); + private static final ThreadLocal> reuseObjectHolder = + new ThreadLocal<>(); private final MessageUnpacker messageUnpacker; private static final BigInteger LONG_MIN = BigInteger.valueOf((long) Long.MIN_VALUE); @@ -126,11 +126,17 @@ public MessagePackParser( IOContext ctxt, int features, ObjectCodec objectCodec, - InputStream in, + final InputStream in, boolean reuseResourceInParser) throws IOException { - this(ctxt, features, new InputStreamBufferInput(in), objectCodec, in, reuseResourceInParser); + this(ctxt, features, new MessageBufferInputProvider() { + @Override + public MessageBufferInput provide() + { + return new InputStreamBufferInput(in); + } + }, objectCodec, in, reuseResourceInParser); } public MessagePackParser(IOContext ctxt, int features, ObjectCodec objectCodec, byte[] bytes) @@ -143,16 +149,22 @@ public MessagePackParser( IOContext ctxt, int features, ObjectCodec objectCodec, - byte[] bytes, + final byte[] bytes, boolean reuseResourceInParser) throws IOException { - this(ctxt, features, new ArrayBufferInput(bytes), objectCodec, bytes, reuseResourceInParser); + this(ctxt, features, new MessageBufferInputProvider() { + @Override + public MessageBufferInput provide() + { + return new ArrayBufferInput(bytes); + } + }, objectCodec, bytes, reuseResourceInParser); } private MessagePackParser(IOContext ctxt, int features, - MessageBufferInput input, + MessageBufferInputProvider bufferInputProvider, ObjectCodec objectCodec, Object src, boolean reuseResourceInParser) @@ -167,7 +179,7 @@ private MessagePackParser(IOContext ctxt, parsingContext = JsonReadContext.createRootContext(dups); this.reuseResourceInParser = reuseResourceInParser; if (!reuseResourceInParser) { - this.messageUnpacker = MessagePack.newDefaultUnpacker(input); + this.messageUnpacker = MessagePack.newDefaultUnpacker(bufferInputProvider.provide()); return; } else { @@ -175,21 +187,40 @@ private MessagePackParser(IOContext ctxt, } MessageUnpacker messageUnpacker; - Tuple messageUnpackerTuple = messageUnpackerHolder.get(); - if (messageUnpackerTuple == null) { - messageUnpacker = MessagePack.newDefaultUnpacker(input); + MessageBufferInputLocator messageBufferInputLocator; + Triple messageUnpackerTriple = reuseObjectHolder.get(); + if (messageUnpackerTriple == null) { + final MessageBufferInputRegistry messageBufferInputRegistry = new MessageBufferInputRegistry(); + messageBufferInputRegistry.register(src.getClass(), bufferInputProvider); + messageBufferInputLocator = messageBufferInputRegistry; + messageUnpacker = MessagePack.newDefaultUnpacker(messageBufferInputRegistry.get(src.getClass())); } else { // Considering to reuse InputStream with JsonParser.Feature.AUTO_CLOSE_SOURCE, // MessagePackParser needs to use the MessageUnpacker that has the same InputStream // since it has buffer which has loaded the InputStream data ahead. // However, it needs to call MessageUnpacker#reset when the source is different from the previous one. - if (isEnabled(JsonParser.Feature.AUTO_CLOSE_SOURCE) || messageUnpackerTuple.first() != src) { - messageUnpackerTuple.second().reset(input); + if (isEnabled(JsonParser.Feature.AUTO_CLOSE_SOURCE) || messageUnpackerTriple.first() != src) { + final MessageBufferInputLocator bufferInputLocator = messageUnpackerTriple.third(); + MessageBufferInput messageBufferInput = bufferInputLocator.get(src.getClass()); + if (messageBufferInput != null) { + messageBufferInput.reset(src); + } + else { + if (bufferInputLocator instanceof MessageBufferInputRegistry) { + ((MessageBufferInputRegistry) bufferInputLocator).register(src.getClass(), bufferInputProvider); + messageBufferInput = bufferInputLocator.get(src.getClass()); + } + else { + messageBufferInput = bufferInputProvider.provide(); + } + } + messageUnpackerTriple.second().reset(messageBufferInput); } - messageUnpacker = messageUnpackerTuple.second(); + messageUnpacker = messageUnpackerTriple.second(); + messageBufferInputLocator = messageUnpackerTriple.third(); } - messageUnpackerHolder.set(new Tuple(src, messageUnpacker)); + reuseObjectHolder.set(new Triple(src, messageUnpacker, messageBufferInputLocator)); } public void setExtensionTypeCustomDeserializers(ExtensionTypeCustomDeserializers extTypeCustomDesers) @@ -690,7 +721,7 @@ private MessageUnpacker getMessageUnpacker() return this.messageUnpacker; } - Tuple messageUnpackerTuple = messageUnpackerHolder.get(); + Triple messageUnpackerTuple = reuseObjectHolder.get(); if (messageUnpackerTuple == null) { throw new IllegalStateException("messageUnpacker is null"); } diff --git a/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/Triple.java b/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/Triple.java new file mode 100644 index 000000000..db630ae4a --- /dev/null +++ b/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/Triple.java @@ -0,0 +1,49 @@ +// +// MessagePack for Java +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +package org.msgpack.jackson.dataformat; + +/** + * Created by komamitsu on 5/28/15. + */ +public class Triple +{ + private final F first; + private final S second; + + private final T third; + + public Triple(F first, S second, T third) + { + this.first = first; + this.second = second; + this.third = third; + } + + public F first() + { + return first; + } + + public S second() + { + return second; + } + + public T third() + { + return third; + } +} From ac9d28aa43438f303752d54bb0bdb87a29b48e20 Mon Sep 17 00:00:00 2001 From: Mitsunori Komatsu Date: Sat, 21 Jan 2023 14:26:28 +0900 Subject: [PATCH 2/2] Refactor the original code --- .../msgpack/core/buffer/ArrayBufferInput.java | 11 +- .../msgpack/core/buffer/ByteBufferInput.java | 3 +- .../core/buffer/ChannelBufferInput.java | 4 +- .../core/buffer/InputStreamBufferInput.java | 4 +- .../core/buffer/MessageBufferInput.java | 4 +- .../buffer/SequenceMessageBufferInput.java | 7 +- .../msgpack/core/MessageUnpackerTest.scala | 5 +- .../msgpack/core/buffer/ByteStringTest.scala | 4 +- .../dataformat/MessageBufferInputLocator.java | 23 ---- .../MessageBufferInputProvider.java | 23 ---- .../MessageBufferInputRegistry.java | 46 ------- .../jackson/dataformat/MessagePackParser.java | 116 ++++++++++-------- .../msgpack/jackson/dataformat/Triple.java | 4 - .../org/msgpack/jackson/dataformat/Tuple.java | 3 - 14 files changed, 80 insertions(+), 177 deletions(-) delete mode 100644 msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessageBufferInputLocator.java delete mode 100644 msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessageBufferInputProvider.java delete mode 100644 msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessageBufferInputRegistry.java diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/ArrayBufferInput.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/ArrayBufferInput.java index b8166ed2c..5c6454e69 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/buffer/ArrayBufferInput.java +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/ArrayBufferInput.java @@ -21,7 +21,7 @@ * MessageBufferInput adapter for byte arrays */ public class ArrayBufferInput - implements MessageBufferInput + implements MessageBufferInput { private MessageBuffer buffer; private boolean isEmpty; @@ -66,14 +66,9 @@ public MessageBuffer reset(MessageBuffer buf) return old; } - @Override - public byte[] reset(byte[] arr) + public void reset(byte[] arr) { - final MessageBuffer messageBuffer = reset(MessageBuffer.wrap(checkNotNull(arr, "input array is null"))); - if (messageBuffer == null) { - return null; - } - return messageBuffer.array(); + reset(MessageBuffer.wrap(checkNotNull(arr, "input array is null"))); } public void reset(byte[] arr, int offset, int len) diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/ByteBufferInput.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/ByteBufferInput.java index 0cb5d4238..fd0311b83 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/buffer/ByteBufferInput.java +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/ByteBufferInput.java @@ -23,7 +23,7 @@ * {@link MessageBufferInput} adapter for {@link java.nio.ByteBuffer} */ public class ByteBufferInput - implements MessageBufferInput + implements MessageBufferInput { private ByteBuffer input; private boolean isRead = false; @@ -39,7 +39,6 @@ public ByteBufferInput(ByteBuffer input) * @param input new buffer * @return the old buffer */ - @Override public ByteBuffer reset(ByteBuffer input) { ByteBuffer old = this.input; diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/ChannelBufferInput.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/ChannelBufferInput.java index ea636240e..e8d7c1de8 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/buffer/ChannelBufferInput.java +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/ChannelBufferInput.java @@ -26,7 +26,7 @@ * {@link MessageBufferInput} adapter for {@link java.nio.channels.ReadableByteChannel} */ public class ChannelBufferInput - implements MessageBufferInput + implements MessageBufferInput { private ReadableByteChannel channel; private final MessageBuffer buffer; @@ -49,8 +49,8 @@ public ChannelBufferInput(ReadableByteChannel channel, int bufferSize) * @param channel new channel * @return the old resource */ - @Override public ReadableByteChannel reset(ReadableByteChannel channel) + throws IOException { ReadableByteChannel old = this.channel; this.channel = channel; diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/InputStreamBufferInput.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/InputStreamBufferInput.java index 39d93309a..d605fec3a 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/buffer/InputStreamBufferInput.java +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/InputStreamBufferInput.java @@ -26,7 +26,7 @@ * {@link MessageBufferInput} adapter for {@link InputStream} */ public class InputStreamBufferInput - implements MessageBufferInput + implements MessageBufferInput { private InputStream in; private final byte[] buffer; @@ -60,8 +60,8 @@ public InputStreamBufferInput(InputStream in, int bufferSize) * @param in new stream * @return the old resource */ - @Override public InputStream reset(InputStream in) + throws IOException { InputStream old = this.in; this.in = in; diff --git a/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferInput.java b/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferInput.java index 716933928..77f7a06f6 100644 --- a/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferInput.java +++ b/msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBufferInput.java @@ -24,7 +24,7 @@ * A MessageBufferInput implementation has control of lifecycle of the memory so that it can reuse previously * allocated memory, use memory pools, or use memory-mapped files. */ -public interface MessageBufferInput +public interface MessageBufferInput extends Closeable { /** @@ -40,8 +40,6 @@ public interface MessageBufferInput MessageBuffer next() throws IOException; - T reset(T input); - /** * Closes the input. *

diff --git a/msgpack-core/src/test/java/org/msgpack/core/buffer/SequenceMessageBufferInput.java b/msgpack-core/src/test/java/org/msgpack/core/buffer/SequenceMessageBufferInput.java index 6b11915ea..10b91d20a 100644 --- a/msgpack-core/src/test/java/org/msgpack/core/buffer/SequenceMessageBufferInput.java +++ b/msgpack-core/src/test/java/org/msgpack/core/buffer/SequenceMessageBufferInput.java @@ -24,7 +24,7 @@ * {@link MessageBufferInput} adapter for {@link MessageBufferInput} Enumeration */ public class SequenceMessageBufferInput - implements MessageBufferInput + implements MessageBufferInput { private Enumeration sequence; private MessageBufferInput input; @@ -54,11 +54,6 @@ public MessageBuffer next() throws IOException return buffer; } - @Override - public Void reset(Void input) { - throw new UnsupportedOperationException("reset"); - } - private void nextInput() throws IOException { if (input != null) { diff --git a/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala b/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala index 1fb2ae4d1..3ea5e911d 100644 --- a/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala +++ b/msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala @@ -29,7 +29,7 @@ import scala.jdk.CollectionConverters._ import scala.util.Random object MessageUnpackerTest { - class SplitMessageBufferInput(array: Array[Array[Byte]]) extends MessageBufferInput[Void] { + class SplitMessageBufferInput(array: Array[Array[Byte]]) extends MessageBufferInput { var cursor = 0 override def next(): MessageBuffer = { if (cursor < array.length) { @@ -41,9 +41,6 @@ object MessageUnpackerTest { } } - - override def reset(input: Void): Void = throw new UnsupportedOperationException("reset") - override def close(): Unit = {} } } diff --git a/msgpack-core/src/test/scala/org/msgpack/core/buffer/ByteStringTest.scala b/msgpack-core/src/test/scala/org/msgpack/core/buffer/ByteStringTest.scala index 109566973..42872fc44 100644 --- a/msgpack-core/src/test/scala/org/msgpack/core/buffer/ByteStringTest.scala +++ b/msgpack-core/src/test/scala/org/msgpack/core/buffer/ByteStringTest.scala @@ -26,7 +26,7 @@ class ByteStringTest extends AirSpec { private val byteString = ByteString(createMessagePackData(_.packString(unpackedString))) private def unpackString(messageBuffer: MessageBuffer) = { - val input = new MessageBufferInput[Void] { + val input = new MessageBufferInput { private var isRead = false @@ -38,8 +38,6 @@ class ByteStringTest extends AirSpec { messageBuffer } override def close(): Unit = {} - - override def reset(input: Void): Void = throw new UnsupportedOperationException("reset") } MessagePack.newDefaultUnpacker(input).unpackString() diff --git a/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessageBufferInputLocator.java b/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessageBufferInputLocator.java deleted file mode 100644 index a431a4e0a..000000000 --- a/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessageBufferInputLocator.java +++ /dev/null @@ -1,23 +0,0 @@ -// -// MessagePack for Java -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -package org.msgpack.jackson.dataformat; - -import org.msgpack.core.buffer.MessageBufferInput; - -public interface MessageBufferInputLocator -{ - MessageBufferInput get(Class clazz); -} diff --git a/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessageBufferInputProvider.java b/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessageBufferInputProvider.java deleted file mode 100644 index 96ce91ab8..000000000 --- a/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessageBufferInputProvider.java +++ /dev/null @@ -1,23 +0,0 @@ -// -// MessagePack for Java -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -package org.msgpack.jackson.dataformat; - -import org.msgpack.core.buffer.MessageBufferInput; - -interface MessageBufferInputProvider -{ - MessageBufferInput provide(); -} diff --git a/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessageBufferInputRegistry.java b/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessageBufferInputRegistry.java deleted file mode 100644 index 06f3409c3..000000000 --- a/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessageBufferInputRegistry.java +++ /dev/null @@ -1,46 +0,0 @@ -// -// MessagePack for Java -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -package org.msgpack.jackson.dataformat; - -import org.msgpack.core.buffer.MessageBufferInput; - -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; - -public class MessageBufferInputRegistry implements MessageBufferInputLocator -{ - private final Map messageBufferInputMap = new HashMap<>(1); - - @Override - public MessageBufferInput get(Class clazz) - { - return messageBufferInputMap.get(clazz); - } - - public boolean register(Class clazz, MessageBufferInputProvider provider) - { - Objects.requireNonNull(clazz, "clazz"); - Objects.requireNonNull(provider, "provider"); - - if (messageBufferInputMap.containsKey(clazz)) { - return false; - } - - messageBufferInputMap.put(clazz, provider.provide()); - return true; - } -} diff --git a/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessagePackParser.java b/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessagePackParser.java index 9f40b0d11..b2e0dfef6 100644 --- a/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessagePackParser.java +++ b/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/MessagePackParser.java @@ -33,6 +33,7 @@ import org.msgpack.core.MessageFormat; import org.msgpack.core.MessagePack; import org.msgpack.core.MessageUnpacker; +import org.msgpack.core.annotations.Nullable; import org.msgpack.core.buffer.ArrayBufferInput; import org.msgpack.core.buffer.InputStreamBufferInput; import org.msgpack.core.buffer.MessageBufferInput; @@ -47,7 +48,7 @@ public class MessagePackParser extends ParserMinimalBase { - private static final ThreadLocal> reuseObjectHolder = + private static final ThreadLocal> reuseObjectHolder = new ThreadLocal<>(); private final MessageUnpacker messageUnpacker; @@ -126,17 +127,11 @@ public MessagePackParser( IOContext ctxt, int features, ObjectCodec objectCodec, - final InputStream in, + InputStream in, boolean reuseResourceInParser) throws IOException { - this(ctxt, features, new MessageBufferInputProvider() { - @Override - public MessageBufferInput provide() - { - return new InputStreamBufferInput(in); - } - }, objectCodec, in, reuseResourceInParser); + this(ctxt, features, null, in, objectCodec, in, reuseResourceInParser); } public MessagePackParser(IOContext ctxt, int features, ObjectCodec objectCodec, byte[] bytes) @@ -149,22 +144,57 @@ public MessagePackParser( IOContext ctxt, int features, ObjectCodec objectCodec, - final byte[] bytes, + byte[] bytes, boolean reuseResourceInParser) throws IOException { - this(ctxt, features, new MessageBufferInputProvider() { - @Override - public MessageBufferInput provide() - { - return new ArrayBufferInput(bytes); - } - }, objectCodec, bytes, reuseResourceInParser); + this(ctxt, features, bytes, null, objectCodec, bytes, reuseResourceInParser); + } + + private MessageBufferInput createMessageBufferInput( + // Either of `bytes` or `in` is available + @Nullable byte[] bytes, + @Nullable InputStream in) + { + if (bytes != null) { + return new ArrayBufferInput(bytes); + } + else if (in != null) { + return new InputStreamBufferInput(in); + } + else { + throw new IllegalArgumentException("The both `bytes` and `in` are null"); + } + } + + private MessageBufferInput resetOrRecreateMessageBufferInput( + MessageBufferInput messageBufferInput, + // Either of `bytes` or `in` is available + @Nullable byte[] bytes, + @Nullable InputStream in) + throws IOException + { + // TODO: Revisit here + messageBufferInput.close(); + if (messageBufferInput instanceof ArrayBufferInput && bytes != null) { + ((ArrayBufferInput) messageBufferInput).reset(bytes); + } + else if (messageBufferInput instanceof InputStreamBufferInput && in != null) { + ((InputStreamBufferInput) messageBufferInput).reset(in); + } + else { + // The existing MessageBufferInput type doesn't match the new source type. + // Recreate MessageBufferInput instance. + return createMessageBufferInput(bytes, in); + } + return messageBufferInput; } private MessagePackParser(IOContext ctxt, int features, - MessageBufferInputProvider bufferInputProvider, + // Either of `bytes` or `in` is available + @Nullable byte[] bytes, + @Nullable InputStream in, ObjectCodec objectCodec, Object src, boolean reuseResourceInParser) @@ -179,7 +209,7 @@ private MessagePackParser(IOContext ctxt, parsingContext = JsonReadContext.createRootContext(dups); this.reuseResourceInParser = reuseResourceInParser; if (!reuseResourceInParser) { - this.messageUnpacker = MessagePack.newDefaultUnpacker(bufferInputProvider.provide()); + this.messageUnpacker = MessagePack.newDefaultUnpacker(createMessageBufferInput(bytes, in)); return; } else { @@ -187,40 +217,30 @@ private MessagePackParser(IOContext ctxt, } MessageUnpacker messageUnpacker; - MessageBufferInputLocator messageBufferInputLocator; - Triple messageUnpackerTriple = reuseObjectHolder.get(); - if (messageUnpackerTriple == null) { - final MessageBufferInputRegistry messageBufferInputRegistry = new MessageBufferInputRegistry(); - messageBufferInputRegistry.register(src.getClass(), bufferInputProvider); - messageBufferInputLocator = messageBufferInputRegistry; - messageUnpacker = MessagePack.newDefaultUnpacker(messageBufferInputRegistry.get(src.getClass())); + MessageBufferInput messageBufferInput; + Triple messageUnpackerResource = reuseObjectHolder.get(); + if (messageUnpackerResource == null) { + messageBufferInput = createMessageBufferInput(bytes, in); + messageUnpacker = MessagePack.newDefaultUnpacker(messageBufferInput); } else { // Considering to reuse InputStream with JsonParser.Feature.AUTO_CLOSE_SOURCE, // MessagePackParser needs to use the MessageUnpacker that has the same InputStream // since it has buffer which has loaded the InputStream data ahead. // However, it needs to call MessageUnpacker#reset when the source is different from the previous one. - if (isEnabled(JsonParser.Feature.AUTO_CLOSE_SOURCE) || messageUnpackerTriple.first() != src) { - final MessageBufferInputLocator bufferInputLocator = messageUnpackerTriple.third(); - MessageBufferInput messageBufferInput = bufferInputLocator.get(src.getClass()); - if (messageBufferInput != null) { - messageBufferInput.reset(src); - } - else { - if (bufferInputLocator instanceof MessageBufferInputRegistry) { - ((MessageBufferInputRegistry) bufferInputLocator).register(src.getClass(), bufferInputProvider); - messageBufferInput = bufferInputLocator.get(src.getClass()); - } - else { - messageBufferInput = bufferInputProvider.provide(); - } - } - messageUnpackerTriple.second().reset(messageBufferInput); + if (isEnabled(Feature.AUTO_CLOSE_SOURCE) || messageUnpackerResource.first() != src) { + messageBufferInput = messageUnpackerResource.third(); + messageUnpacker = messageUnpackerResource.second(); + + messageBufferInput = resetOrRecreateMessageBufferInput(messageBufferInput, bytes, in); + messageUnpacker.reset(messageBufferInput); + } + else { + messageBufferInput = messageUnpackerResource.third(); + messageUnpacker = messageUnpackerResource.second(); } - messageUnpacker = messageUnpackerTriple.second(); - messageBufferInputLocator = messageUnpackerTriple.third(); } - reuseObjectHolder.set(new Triple(src, messageUnpacker, messageBufferInputLocator)); + reuseObjectHolder.set(new Triple<>(src, messageUnpacker, messageBufferInput)); } public void setExtensionTypeCustomDeserializers(ExtensionTypeCustomDeserializers extTypeCustomDesers) @@ -721,10 +741,10 @@ private MessageUnpacker getMessageUnpacker() return this.messageUnpacker; } - Triple messageUnpackerTuple = reuseObjectHolder.get(); - if (messageUnpackerTuple == null) { + Triple reuseObject = reuseObjectHolder.get(); + if (reuseObject == null) { throw new IllegalStateException("messageUnpacker is null"); } - return messageUnpackerTuple.second(); + return reuseObject.second(); } } diff --git a/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/Triple.java b/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/Triple.java index db630ae4a..3f35b4678 100644 --- a/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/Triple.java +++ b/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/Triple.java @@ -15,14 +15,10 @@ // package org.msgpack.jackson.dataformat; -/** - * Created by komamitsu on 5/28/15. - */ public class Triple { private final F first; private final S second; - private final T third; public Triple(F first, S second, T third) diff --git a/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/Tuple.java b/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/Tuple.java index 1a252739f..b0f720d86 100644 --- a/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/Tuple.java +++ b/msgpack-jackson/src/main/java/org/msgpack/jackson/dataformat/Tuple.java @@ -15,9 +15,6 @@ // package org.msgpack.jackson.dataformat; -/** - * Created by komamitsu on 5/28/15. - */ public class Tuple { private final F first;