Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trying some things but it's blowing up a bit. #48

Open
wants to merge 1 commit into
base: feature/growing-message-sequences
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,6 @@ public void testIDLElementWithFastRTPSGenCPPCode() throws IOException
SerializedPayload cppPayload = new SerializedPayload(type.getTypeSize());
cppPayload.getData().put(IDLElementTestCPPData.cppData);
cppPayload.getData().flip();
cppPayload.setLength(IDLElementTestCPPData.cppData.length);
type.deserialize(cppPayload, cppElement);

assertArrayEquals(IDLElementTestCPPData.cppData, javadata);
Expand Down
19 changes: 17 additions & 2 deletions ihmc-pub-sub/src/main/java/us/ihmc/idl/CDR.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public void serialize(SerializedPayload payload)
public static void writeEncapsulation(SerializedPayload payload)
{
ByteBuffer buf = payload.getData();
payload.ensureCapacity(buf.position() + 1 + encapsulation_size + 2);
//Write encapsulation
buf.put((byte) 0x0);
buf.put((byte) payload.getEncapsulation());
Expand All @@ -81,7 +82,6 @@ public static void readEncapsulation(SerializedPayload payload)
public void finishSerialize()
{
buf.flip();
payload.setLength(buf.limit());
}

public void finishDeserialize()
Expand All @@ -90,6 +90,8 @@ public void finishDeserialize()

public static int getTypeSize(int elementTypeSize)
{
if (elementTypeSize == 0) // unbounded
return 0;
return elementTypeSize + encapsulation_size;
}

Expand All @@ -105,6 +107,7 @@ public short read_type_1()
public void write_type_1(short val)
{
align(2);
payload.ensureCapacity(buf.position() + 2);
buf.putShort(val);
}

Expand All @@ -120,6 +123,7 @@ public int read_type_2()
public void write_type_2(int val)
{
align(4);
payload.ensureCapacity(buf.position() + 4);
buf.putInt(val);
}

Expand Down Expand Up @@ -171,6 +175,7 @@ public float read_type_5()
public void write_type_5(float val)
{
align(4);
payload.ensureCapacity(buf.position() + 4);
buf.putFloat(val);
}

Expand All @@ -186,6 +191,7 @@ public double read_type_6()
public void write_type_6(double val)
{
align(8);
payload.ensureCapacity(buf.position() + 8);
buf.putDouble(val);
}

Expand Down Expand Up @@ -213,6 +219,7 @@ public char read_type_8()

public void write_type_8(char val)
{
payload.ensureCapacity(buf.position() + 1);
buf.put((byte) val);
}

Expand All @@ -226,6 +233,7 @@ public byte read_type_9()

public void write_type_9(byte val)
{
payload.ensureCapacity(buf.position() + 1);
buf.put(val);
}

Expand Down Expand Up @@ -273,6 +281,8 @@ public void read_type_d(StringBuilder res)
public void write_type_d(StringBuilder str)
{
write_type_2(str.length() + 1);

payload.ensureCapacity(buf.position() + str.length() + 1);
for (int i = 0; i < str.length(); i++)
{
buf.put((byte) str.charAt(i));
Expand Down Expand Up @@ -308,6 +318,7 @@ public void write_type_e(IDLSequence seq)
int length = seq.size();
write_type_2(length);

payload.ensureCapacity(buf.position() + length);
if (seq instanceof IDLSequence.Byte byteSeq) // faster copy
{
buf.put(buf.position(), byteSeq.getBuffer(), 0, length);
Expand Down Expand Up @@ -342,6 +353,7 @@ public long read_type_11()
public void write_type_11(long val)
{
align(8);
payload.ensureCapacity(buf.position() + 8);
buf.putLong(val);
}

Expand Down Expand Up @@ -400,6 +412,7 @@ public void read_type_15(StringBuilder res)
public void write_type_15(StringBuilder str)
{
write_type_2(str.length());
payload.ensureCapacity(buf.position() + str.length());
for (int i = 0; i < str.length(); i++)
{
buf.putChar(str.charAt(i));
Expand All @@ -413,7 +426,9 @@ public int align(int byteBoundary)

if (adv != 0)
{
buf.position(position + encapsulation_size + (byteBoundary - adv));
int newPosition = position + encapsulation_size + (byteBoundary - adv);
payload.ensureCapacity(newPosition);
buf.position(newPosition);
}

return adv;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,24 @@ public class SerializedPayload
public static final short PL_CDR_LE = 0x0003;

private short encapsulation;
private int length;
private final ByteBuffer data;
private int max_size;
private final boolean unbounded;
private ByteBuffer data;
private int pos;

/**
* Constructor
*
* @param maxSize maximum size of the serialized data
* @param typeSize bounded size or 0 for unbounded types
*/
public SerializedPayload(int maxSize)
public SerializedPayload(int typeSize)
{
this.max_size = maxSize;
this.data = ByteBuffer.allocateDirect(maxSize);
unbounded = typeSize == 0;

if (unbounded)
data = ByteBuffer.allocate(8);
else
data = ByteBuffer.allocate(typeSize);

setEncapsulation(CDR_LE);
}

Expand Down Expand Up @@ -81,21 +85,9 @@ public void setEncapsulation(short encapsulation)
*/
public int getLength()
{
return length;
}

public void setLength(int length)
{
this.length = length;
return data.limit();
}

/**
* @return Maximum size of the payload
*/
public int getMax_size()
{
return max_size;
}
/**
*
* @return Position when reading.
Expand All @@ -110,6 +102,17 @@ public void setPos(int pos)
this.pos = pos;
}

public void ensureCapacity(int size)
{
if (unbounded)
{
if (data.capacity() < size)
{
data = ByteBuffer.allocate(data.capacity() * 2);
}
}
}

public ByteBuffer getData()
{
return data;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,6 @@ public synchronized int get_no_subscribers(String target_topic)

synchronized void registerType(TopicDataType<?> topicDataType) throws IllegalArgumentException
{
if (topicDataType.getTypeSize() <= 0)
{
throw new IllegalArgumentException("Registered type must have maximum byte size > 0");
}

if (topicDataType.getName().isEmpty())
{
throw new IllegalArgumentException("Registered type must have a name");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package us.ihmc.pubsub.impl.fastRTPS;

import us.ihmc.idl.CDR;
import us.ihmc.log.LogTools;
import us.ihmc.pubsub.TopicDataType;
import us.ihmc.pubsub.attributes.SubscriberAttributes;
import us.ihmc.pubsub.common.*;
Expand Down Expand Up @@ -100,14 +100,6 @@ private void preparePayload(short encapsulation, int dataLength)
{
payload.getData().clear();
payload.setEncapsulation(encapsulation);

// Compatibility for older versions of FastRTPS that do not include encapsulation in the payload size
if(CDR.getTypeSize(dataLength) <= payload.getMax_size())
{
dataLength = CDR.getTypeSize(dataLength);
}

payload.setLength(dataLength);
payload.getData().limit(dataLength);
}

Expand Down Expand Up @@ -193,33 +185,39 @@ public boolean readNextData(T data, SampleInfo info)
System.err.println("This subscriber has been removed from the domain");
return false;
}
if(impl.readnextData(payload.getData().capacity(), payload.getData(), sampleInfoMarshaller))

if (info == null)
{
if (info != null)
LogTools.error("Info is null");
return false;
}
else
{
if (impl.readnextData(payload.getData().capacity(), payload.getData(), sampleInfoMarshaller))
{
updateSampleInfo(sampleInfoMarshaller, info, keyBuffer);
preparePayload(sampleInfoMarshaller.getEncapsulation(), sampleInfoMarshaller.getDataLength());
try
{
currentMessageSize = info.getDataLength();
if (currentMessageSize > largestMessageSize)
largestMessageSize = currentMessageSize;
cumulativePayloadBytes += currentMessageSize;

payload.ensureCapacity((int) currentMessageSize);
topicDataType.deserialize(payload, data);
}
catch (IOException e)
{
e.printStackTrace();
return false;
}
return true;
}
preparePayload(sampleInfoMarshaller.getEncapsulation(), sampleInfoMarshaller.getDataLength());
try
{
currentMessageSize = payload.getLength();
if (payload.getLength() > largestMessageSize)
largestMessageSize = payload.getLength();
cumulativePayloadBytes += payload.getLength();

topicDataType.deserialize(payload, data);
}
catch (IOException e)
else
{
e.printStackTrace();
return false;
}
return true;
}
else
{
return false;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,12 @@ public void serialize(ByteBuffer data, SerializedPayload serializedPayload) thro
CDR.writeEncapsulation(serializedPayload);

ByteBuffer target = serializedPayload.getData();
serializedPayload.ensureCapacity(4 + data.remaining());
target.putInt(data.remaining());
target.put(data);
serializedPayload.setLength(align(target.position()));
int alignedSize = align(target.position());
serializedPayload.ensureCapacity(alignedSize);
serializedPayload.getData().limit(alignedSize);
}

@Override
Expand Down
Loading