diff --git a/ihmc-pub-sub-generator/src/test/java/us/ihmc/pubsub/test/IDLElementTestTest.java b/ihmc-pub-sub-generator/src/test/java/us/ihmc/pubsub/test/IDLElementTestTest.java index 4710de16..3a213c55 100644 --- a/ihmc-pub-sub-generator/src/test/java/us/ihmc/pubsub/test/IDLElementTestTest.java +++ b/ihmc-pub-sub-generator/src/test/java/us/ihmc/pubsub/test/IDLElementTestTest.java @@ -351,7 +351,6 @@ public void testIDLElementWithFastRTPSGenCPPCode() throws IOException SerializedPayload cppPayload = new SerializedPayload(type.getTypeSize()); cppPayload.getData().put(IDLElementTestCPPData.cppData); cppPayload.getData().flip(); - cppPayload.setLength(IDLElementTestCPPData.cppData.length); type.deserialize(cppPayload, cppElement); assertArrayEquals(IDLElementTestCPPData.cppData, javadata); diff --git a/ihmc-pub-sub/src/main/java/us/ihmc/idl/CDR.java b/ihmc-pub-sub/src/main/java/us/ihmc/idl/CDR.java index 427148a9..5033e738 100644 --- a/ihmc-pub-sub/src/main/java/us/ihmc/idl/CDR.java +++ b/ihmc-pub-sub/src/main/java/us/ihmc/idl/CDR.java @@ -56,6 +56,7 @@ public void serialize(SerializedPayload payload) public static void writeEncapsulation(SerializedPayload payload) { ByteBuffer buf = payload.getData(); + payload.ensureCapacity(buf.position() + 1 + encapsulation_size + 2); //Write encapsulation buf.put((byte) 0x0); buf.put((byte) payload.getEncapsulation()); @@ -81,7 +82,6 @@ public static void readEncapsulation(SerializedPayload payload) public void finishSerialize() { buf.flip(); - payload.setLength(buf.limit()); } public void finishDeserialize() @@ -90,6 +90,8 @@ public void finishDeserialize() public static int getTypeSize(int elementTypeSize) { + if (elementTypeSize == 0) // unbounded + return 0; return elementTypeSize + encapsulation_size; } @@ -105,6 +107,7 @@ public short read_type_1() public void write_type_1(short val) { align(2); + payload.ensureCapacity(buf.position() + 2); buf.putShort(val); } @@ -120,6 +123,7 @@ public int read_type_2() public void write_type_2(int val) { align(4); + payload.ensureCapacity(buf.position() + 4); buf.putInt(val); } @@ -171,6 +175,7 @@ public float read_type_5() public void write_type_5(float val) { align(4); + payload.ensureCapacity(buf.position() + 4); buf.putFloat(val); } @@ -186,6 +191,7 @@ public double read_type_6() public void write_type_6(double val) { align(8); + payload.ensureCapacity(buf.position() + 8); buf.putDouble(val); } @@ -213,6 +219,7 @@ public char read_type_8() public void write_type_8(char val) { + payload.ensureCapacity(buf.position() + 1); buf.put((byte) val); } @@ -226,6 +233,7 @@ public byte read_type_9() public void write_type_9(byte val) { + payload.ensureCapacity(buf.position() + 1); buf.put(val); } @@ -273,6 +281,8 @@ public void read_type_d(StringBuilder res) public void write_type_d(StringBuilder str) { write_type_2(str.length() + 1); + + payload.ensureCapacity(buf.position() + str.length() + 1); for (int i = 0; i < str.length(); i++) { buf.put((byte) str.charAt(i)); @@ -308,6 +318,7 @@ public void write_type_e(IDLSequence seq) int length = seq.size(); write_type_2(length); + payload.ensureCapacity(buf.position() + length); if (seq instanceof IDLSequence.Byte byteSeq) // faster copy { buf.put(buf.position(), byteSeq.getBuffer(), 0, length); @@ -342,6 +353,7 @@ public long read_type_11() public void write_type_11(long val) { align(8); + payload.ensureCapacity(buf.position() + 8); buf.putLong(val); } @@ -400,6 +412,7 @@ public void read_type_15(StringBuilder res) public void write_type_15(StringBuilder str) { write_type_2(str.length()); + payload.ensureCapacity(buf.position() + str.length()); for (int i = 0; i < str.length(); i++) { buf.putChar(str.charAt(i)); @@ -413,7 +426,9 @@ public int align(int byteBoundary) if (adv != 0) { - buf.position(position + encapsulation_size + (byteBoundary - adv)); + int newPosition = position + encapsulation_size + (byteBoundary - adv); + payload.ensureCapacity(newPosition); + buf.position(newPosition); } return adv; diff --git a/ihmc-pub-sub/src/main/java/us/ihmc/pubsub/common/SerializedPayload.java b/ihmc-pub-sub/src/main/java/us/ihmc/pubsub/common/SerializedPayload.java index a2864d71..2d5ff108 100644 --- a/ihmc-pub-sub/src/main/java/us/ihmc/pubsub/common/SerializedPayload.java +++ b/ihmc-pub-sub/src/main/java/us/ihmc/pubsub/common/SerializedPayload.java @@ -32,20 +32,24 @@ public class SerializedPayload public static final short PL_CDR_LE = 0x0003; private short encapsulation; - private int length; - private final ByteBuffer data; - private int max_size; + private final boolean unbounded; + private ByteBuffer data; private int pos; /** * Constructor * - * @param maxSize maximum size of the serialized data + * @param typeSize bounded size or 0 for unbounded types */ - public SerializedPayload(int maxSize) + public SerializedPayload(int typeSize) { - this.max_size = maxSize; - this.data = ByteBuffer.allocateDirect(maxSize); + unbounded = typeSize == 0; + + if (unbounded) + data = ByteBuffer.allocate(8); + else + data = ByteBuffer.allocate(typeSize); + setEncapsulation(CDR_LE); } @@ -81,21 +85,9 @@ public void setEncapsulation(short encapsulation) */ public int getLength() { - return length; - } - - public void setLength(int length) - { - this.length = length; + return data.limit(); } - /** - * @return Maximum size of the payload - */ - public int getMax_size() - { - return max_size; - } /** * * @return Position when reading. @@ -110,6 +102,17 @@ public void setPos(int pos) this.pos = pos; } + public void ensureCapacity(int size) + { + if (unbounded) + { + if (data.capacity() < size) + { + data = ByteBuffer.allocate(data.capacity() * 2); + } + } + } + public ByteBuffer getData() { return data; diff --git a/ihmc-pub-sub/src/main/java/us/ihmc/pubsub/impl/fastRTPS/FastRTPSParticipant.java b/ihmc-pub-sub/src/main/java/us/ihmc/pubsub/impl/fastRTPS/FastRTPSParticipant.java index e52c8aaa..c1972958 100644 --- a/ihmc-pub-sub/src/main/java/us/ihmc/pubsub/impl/fastRTPS/FastRTPSParticipant.java +++ b/ihmc-pub-sub/src/main/java/us/ihmc/pubsub/impl/fastRTPS/FastRTPSParticipant.java @@ -151,11 +151,6 @@ public synchronized int get_no_subscribers(String target_topic) synchronized void registerType(TopicDataType topicDataType) throws IllegalArgumentException { - if (topicDataType.getTypeSize() <= 0) - { - throw new IllegalArgumentException("Registered type must have maximum byte size > 0"); - } - if (topicDataType.getName().isEmpty()) { throw new IllegalArgumentException("Registered type must have a name"); diff --git a/ihmc-pub-sub/src/main/java/us/ihmc/pubsub/impl/fastRTPS/FastRTPSSubscriber.java b/ihmc-pub-sub/src/main/java/us/ihmc/pubsub/impl/fastRTPS/FastRTPSSubscriber.java index 42c31780..1e565bbf 100644 --- a/ihmc-pub-sub/src/main/java/us/ihmc/pubsub/impl/fastRTPS/FastRTPSSubscriber.java +++ b/ihmc-pub-sub/src/main/java/us/ihmc/pubsub/impl/fastRTPS/FastRTPSSubscriber.java @@ -15,7 +15,7 @@ */ package us.ihmc.pubsub.impl.fastRTPS; -import us.ihmc.idl.CDR; +import us.ihmc.log.LogTools; import us.ihmc.pubsub.TopicDataType; import us.ihmc.pubsub.attributes.SubscriberAttributes; import us.ihmc.pubsub.common.*; @@ -100,14 +100,6 @@ private void preparePayload(short encapsulation, int dataLength) { payload.getData().clear(); payload.setEncapsulation(encapsulation); - - // Compatibility for older versions of FastRTPS that do not include encapsulation in the payload size - if(CDR.getTypeSize(dataLength) <= payload.getMax_size()) - { - dataLength = CDR.getTypeSize(dataLength); - } - - payload.setLength(dataLength); payload.getData().limit(dataLength); } @@ -193,33 +185,39 @@ public boolean readNextData(T data, SampleInfo info) System.err.println("This subscriber has been removed from the domain"); return false; } - - if(impl.readnextData(payload.getData().capacity(), payload.getData(), sampleInfoMarshaller)) + + if (info == null) { - if (info != null) + LogTools.error("Info is null"); + return false; + } + else + { + if (impl.readnextData(payload.getData().capacity(), payload.getData(), sampleInfoMarshaller)) { updateSampleInfo(sampleInfoMarshaller, info, keyBuffer); + preparePayload(sampleInfoMarshaller.getEncapsulation(), sampleInfoMarshaller.getDataLength()); + try + { + currentMessageSize = info.getDataLength(); + if (currentMessageSize > largestMessageSize) + largestMessageSize = currentMessageSize; + cumulativePayloadBytes += currentMessageSize; + + payload.ensureCapacity((int) currentMessageSize); + topicDataType.deserialize(payload, data); + } + catch (IOException e) + { + e.printStackTrace(); + return false; + } + return true; } - preparePayload(sampleInfoMarshaller.getEncapsulation(), sampleInfoMarshaller.getDataLength()); - try - { - currentMessageSize = payload.getLength(); - if (payload.getLength() > largestMessageSize) - largestMessageSize = payload.getLength(); - cumulativePayloadBytes += payload.getLength(); - - topicDataType.deserialize(payload, data); - } - catch (IOException e) + else { - e.printStackTrace(); return false; } - return true; - } - else - { - return false; } } } diff --git a/ihmc-pub-sub/src/main/java/us/ihmc/pubsub/types/ByteBufferPubSubType.java b/ihmc-pub-sub/src/main/java/us/ihmc/pubsub/types/ByteBufferPubSubType.java index 4e86bc2a..89a5e503 100644 --- a/ihmc-pub-sub/src/main/java/us/ihmc/pubsub/types/ByteBufferPubSubType.java +++ b/ihmc-pub-sub/src/main/java/us/ihmc/pubsub/types/ByteBufferPubSubType.java @@ -83,9 +83,12 @@ public void serialize(ByteBuffer data, SerializedPayload serializedPayload) thro CDR.writeEncapsulation(serializedPayload); ByteBuffer target = serializedPayload.getData(); + serializedPayload.ensureCapacity(4 + data.remaining()); target.putInt(data.remaining()); target.put(data); - serializedPayload.setLength(align(target.position())); + int alignedSize = align(target.position()); + serializedPayload.ensureCapacity(alignedSize); + serializedPayload.getData().limit(alignedSize); } @Override