Skip to content

Commit

Permalink
chore(java): rename MemoryBuffer read/write/put/getType with read/wri…
Browse files Browse the repository at this point in the history
…te/put/getTypeNumber (#1480)

This PR refactor `MemoryBuffer` by:
- Rename all methods in name for unsigned varint/varlong write/read
methods from write/readPositiveXXX.
- Rename all methods of get/put/read/writeType to type with number. For
example, rename `readInt` to `readInt32`
- Remove rebundant code in `MemoryBuffer`
- Rename unsafe method as `_unsafeXXX`
  • Loading branch information
chaokunyang authored Apr 12, 2024
1 parent 46db7e0 commit a9ad50a
Show file tree
Hide file tree
Showing 62 changed files with 1,829 additions and 2,098 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ jobs:
java-version: 8
distribution: 'temurin'
- name: Install fury java
run: cd java && mvn -T10 clean install -DskipTests && cd -
run: cd java && mvn -T10 --no-transfer-progress clean install -DskipTests && cd -
- name: Test
run: |
# Avoid sbt download jackson-databind error
Expand Down
4 changes: 0 additions & 4 deletions NOTICE
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@ The text of each license is the standard Apache 2.0 license.
java/fury-core/src/main/java/org/apache/fury/util/Platform.java
java/fury-format/src/main/java/org/apache/fury/format/vectorized/ArrowWriter.java

* flink (https://github.com/apache/flink)
Files:
java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java

* commons-io (https://github.com/apache/commons-io)
Files:
java/fury-core/src/main/java/org/apache/fury/io/ClassLoaderObjectInputStream.java
Expand Down
5 changes: 2 additions & 3 deletions cpp/fury/util/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class Buffer {

inline double GetDouble(uint32_t offset) { return Get<double>(offset); }

inline uint32_t PutPositiveVarInt32(uint32_t offset, int32_t value) {
inline uint32_t PutVarUint32(uint32_t offset, int32_t value) {
if (value >> 7 == 0) {
data_[offset] = (int8_t)value;
return 1;
Expand Down Expand Up @@ -164,8 +164,7 @@ class Buffer {
return 5;
}

inline int32_t GetPositiveVarInt32(uint32_t offset,
uint32_t *readBytesLength) {
inline int32_t GetVarUint32(uint32_t offset, uint32_t *readBytesLength) {
uint32_t position = offset;
int b = data_[position++];
int result = b & 0x7F;
Expand Down
30 changes: 15 additions & 15 deletions cpp/fury/util/buffer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,30 +38,30 @@ TEST(Buffer, ToString) {
EXPECT_EQ(buffer->Get<float>(0), f);
}

void checkPositiveVarInt(int32_t startOffset, std::shared_ptr<Buffer> buffer,
int32_t value, uint32_t bytesWritten) {
uint32_t actualBytesWritten = buffer->PutPositiveVarInt32(startOffset, value);
void checkVarUint32(int32_t startOffset, std::shared_ptr<Buffer> buffer,
int32_t value, uint32_t bytesWritten) {
uint32_t actualBytesWritten = buffer->PutVarUint32(startOffset, value);
EXPECT_EQ(actualBytesWritten, bytesWritten);
uint32_t readBytesLength;
int32_t varInt = buffer->GetPositiveVarInt32(startOffset, &readBytesLength);
int32_t varInt = buffer->GetVarUint32(startOffset, &readBytesLength);
EXPECT_EQ(value, varInt);
EXPECT_EQ(readBytesLength, bytesWritten);
}

TEST(Buffer, TestPositiveVarInt) {
TEST(Buffer, TestVarUint) {
std::shared_ptr<Buffer> buffer;
AllocateBuffer(64, &buffer);
for (int i = 0; i < 32; ++i) {
checkPositiveVarInt(i, buffer, 1, 1);
checkPositiveVarInt(i, buffer, 1 << 6, 1);
checkPositiveVarInt(i, buffer, 1 << 7, 2);
checkPositiveVarInt(i, buffer, 1 << 13, 2);
checkPositiveVarInt(i, buffer, 1 << 14, 3);
checkPositiveVarInt(i, buffer, 1 << 20, 3);
checkPositiveVarInt(i, buffer, 1 << 21, 4);
checkPositiveVarInt(i, buffer, 1 << 27, 4);
checkPositiveVarInt(i, buffer, 1 << 28, 5);
checkPositiveVarInt(i, buffer, 1 << 30, 5);
checkVarUint32(i, buffer, 1, 1);
checkVarUint32(i, buffer, 1 << 6, 1);
checkVarUint32(i, buffer, 1 << 7, 2);
checkVarUint32(i, buffer, 1 << 13, 2);
checkVarUint32(i, buffer, 1 << 14, 3);
checkVarUint32(i, buffer, 1 << 20, 3);
checkVarUint32(i, buffer, 1 << 21, 4);
checkVarUint32(i, buffer, 1 << 27, 4);
checkVarUint32(i, buffer, 1 << 28, 5);
checkVarUint32(i, buffer, 1 << 30, 5);
}
}

Expand Down
8 changes: 4 additions & 4 deletions docs/guide/java_object_graph_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,13 +203,13 @@ class FooSerializer extends Serializer<Foo> {

@Override
public void write(MemoryBuffer buffer, Foo value) {
buffer.writeLong(value.f1);
buffer.writeInt64(value.f1);
}

@Override
public Foo read(MemoryBuffer buffer) {
Foo foo = new Foo();
foo.f1 = buffer.readLong();
foo.f1 = buffer.readInt64();
return foo;
}
}
Expand Down Expand Up @@ -381,13 +381,13 @@ But if you do want to upgrade fury for better performance and smaller size, you
using code like following to keep binary compatibility:
```java
MemoryBuffer buffer = xxx;
buffer.writeVarInt(2);
buffer.writeVarInt32(2);
fury.serialize(buffer, obj);
```
Then for deserialization, you need:
```java
MemoryBuffer buffer = xxx;
int furyVersion = buffer.readVarInt()
int furyVersion = buffer.readVarInt32()
Fury fury = getFury(furyVersion);
fury.deserialize(buffer);
```
Expand Down
2 changes: 1 addition & 1 deletion go/fury/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func (b *ByteBuffer) PutInt32(index int, value int32) {
binary.LittleEndian.PutUint32(b.data[index:], uint32(value))
}

// WriteVarInt32 WritePositiveVarInt writes a 1-5 byte int, returns the number of bytes written.
// WriteVarInt32 WriteVarUint writes a 1-5 byte int, returns the number of bytes written.
func (b *ByteBuffer) WriteVarInt32(value int32) int8 {
if value>>7 == 0 {
b.grow(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public Object serializeList() {
for (int i = 0; i < size; i++) {
Integer o = list.get(i);
if (o != null) {
buffer.writeVarInt(o);
buffer.writeVarInt32(o);
}
}
return buffer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ public Object bufferUnsafeReadShort(MemoryState state) {
buffer1.readerIndex(0);
int size = buffer1.size();
for (int i = 0; i < size / 2; i++) {
x += buffer1.readShort();
x += buffer1.readInt16();
}
return x;
}
Expand All @@ -233,7 +233,7 @@ public Object bufferUnsafeReadInt(MemoryState state) {
buffer1.readerIndex(0);
int size = buffer1.size();
for (int i = 0; i < size / 4; i++) {
x += buffer1.readInt();
x += buffer1.readInt32();
}
return x;
}
Expand Down
64 changes: 32 additions & 32 deletions java/fury-core/src/main/java/org/apache/fury/Fury.java
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ private void resetBuffer() {
private void write(MemoryBuffer buffer, Object obj) {
if (config.shareMetaContext()) {
int startOffset = buffer.writerIndex();
buffer.writeInt(-1); // preserve 4-byte for nativeObjects start offsets.
buffer.writeInt32(-1); // preserve 4-byte for nativeObjects start offsets.
writeRef(buffer, obj);
buffer.putInt(startOffset, buffer.writerIndex());
classResolver.writeClassDefs(buffer);
Expand All @@ -320,8 +320,8 @@ private void write(MemoryBuffer buffer, Object obj) {

private void xserializeInternal(MemoryBuffer buffer, Object obj) {
int startOffset = buffer.writerIndex();
buffer.writeInt(-1); // preserve 4-byte for nativeObjects start offsets.
buffer.writeInt(-1); // preserve 4-byte for nativeObjects size
buffer.writeInt32(-1); // preserve 4-byte for nativeObjects start offsets.
buffer.writeInt32(-1); // preserve 4-byte for nativeObjects size
xwriteRef(buffer, obj);
buffer.putInt(startOffset, buffer.writerIndex());
buffer.putInt(startOffset + 4, nativeObjects.size());
Expand Down Expand Up @@ -485,7 +485,7 @@ public <T> void xwriteNonRef(MemoryBuffer buffer, T obj, Serializer<T> serialize
serializer = classResolver.getSerializer(cls);
}
short typeId = serializer.getXtypeId();
buffer.writeShort(typeId);
buffer.writeInt16(typeId);
if (typeId != NOT_SUPPORT_CROSS_LANGUAGE) {
if (typeId == FURY_TYPE_TAG_ID) {
classResolver.xwriteTypeTag(buffer, cls);
Expand All @@ -503,7 +503,7 @@ public <T> void xwriteNonRef(MemoryBuffer buffer, T obj, Serializer<T> serialize
// fields/objects deserialization will use wrong reference id since we skip opaque objects
// deserialization.
// So we stash native objects and serialize all those object at the last.
buffer.writePositiveVarInt(nativeObjects.size());
buffer.writeVarUint32(nativeObjects.size());
nativeObjects.add(obj);
}
depth--;
Expand All @@ -522,23 +522,23 @@ private void writeData(MemoryBuffer buffer, ClassInfo classInfo, Object obj) {
buffer.writeChar((Character) obj);
break;
case ClassResolver.SHORT_CLASS_ID:
buffer.writeShort((Short) obj);
buffer.writeInt16((Short) obj);
break;
case ClassResolver.INTEGER_CLASS_ID:
if (compressInt) {
buffer.writeVarInt((Integer) obj);
buffer.writeVarInt32((Integer) obj);
} else {
buffer.writeInt((Integer) obj);
buffer.writeInt32((Integer) obj);
}
break;
case ClassResolver.FLOAT_CLASS_ID:
buffer.writeFloat((Float) obj);
buffer.writeFloat32((Float) obj);
break;
case ClassResolver.LONG_CLASS_ID:
LongSerializer.writeLong(buffer, (Long) obj, longEncoding);
LongSerializer.writeInt64(buffer, (Long) obj, longEncoding);
break;
case ClassResolver.DOUBLE_CLASS_ID:
buffer.writeDouble((Double) obj);
buffer.writeFloat64((Double) obj);
break;
case ClassResolver.STRING_CLASS_ID:
stringSerializer.writeJavaString(buffer, (String) obj);
Expand All @@ -560,9 +560,9 @@ public void writeBufferObject(MemoryBuffer buffer, BufferObject bufferObject) {
// efficient
// TODO(chaokunyang) Remove branch when other languages support aligned varint.
if (language == Language.JAVA) {
buffer.writePositiveVarIntAligned(totalBytes);
buffer.writeVarUint32Aligned(totalBytes);
} else {
buffer.writePositiveVarInt(totalBytes);
buffer.writeVarUint32(totalBytes);
}
int writerIndex = buffer.writerIndex();
buffer.ensure(writerIndex + bufferObject.totalBytes());
Expand All @@ -584,9 +584,9 @@ public void writeBufferObject(
// efficient
// TODO(chaokunyang) Remove branch when other languages support aligned varint.
if (language == Language.JAVA) {
buffer.writePositiveVarIntAligned(totalBytes);
buffer.writeVarUint32Aligned(totalBytes);
} else {
buffer.writePositiveVarInt(totalBytes);
buffer.writeVarUint32(totalBytes);
}
bufferObject.writeTo(buffer);
} else {
Expand All @@ -600,9 +600,9 @@ public MemoryBuffer readBufferObject(MemoryBuffer buffer) {
int size;
// TODO(chaokunyang) Remove branch when other languages support aligned varint.
if (language == Language.JAVA) {
size = buffer.readPositiveAlignedVarInt();
size = buffer.readAlignedVarUint();
} else {
size = buffer.readPositiveVarInt();
size = buffer.readVarUint32();
}
MemoryBuffer slice = buffer.slice(buffer.readerIndex(), size);
buffer.readerIndex(buffer.readerIndex() + size);
Expand Down Expand Up @@ -666,12 +666,12 @@ public String readJavaString(MemoryBuffer buffer) {
return stringSerializer.readJavaString(buffer);
}

public void writeLong(MemoryBuffer buffer, long value) {
LongSerializer.writeLong(buffer, value, longEncoding);
public void writeInt64(MemoryBuffer buffer, long value) {
LongSerializer.writeInt64(buffer, value, longEncoding);
}

public long readLong(MemoryBuffer buffer) {
return LongSerializer.readLong(buffer, longEncoding);
public long readInt64(MemoryBuffer buffer) {
return LongSerializer.readInt64(buffer, longEncoding);
}

@Override
Expand Down Expand Up @@ -797,8 +797,8 @@ private RuntimeException handleReadFailed(Throwable t) {

private Object xdeserializeInternal(MemoryBuffer buffer) {
Object obj;
int nativeObjectsStartOffset = buffer.readInt();
int nativeObjectsSize = buffer.readInt();
int nativeObjectsStartOffset = buffer.readInt32();
int nativeObjectsSize = buffer.readInt32();
int endReaderIndex = nativeObjectsStartOffset;
if (peerLanguage == Language.JAVA) {
int readerIndex = buffer.readerIndex();
Expand Down Expand Up @@ -903,19 +903,19 @@ private Object readDataInternal(MemoryBuffer buffer, ClassInfo classInfo) {
case ClassResolver.CHAR_CLASS_ID:
return buffer.readChar();
case ClassResolver.SHORT_CLASS_ID:
return buffer.readShort();
return buffer.readInt16();
case ClassResolver.INTEGER_CLASS_ID:
if (compressInt) {
return buffer.readVarInt();
return buffer.readVarInt32();
} else {
return buffer.readInt();
return buffer.readInt32();
}
case ClassResolver.FLOAT_CLASS_ID:
return buffer.readFloat();
return buffer.readFloat32();
case ClassResolver.LONG_CLASS_ID:
return LongSerializer.readLong(buffer, longEncoding);
return LongSerializer.readInt64(buffer, longEncoding);
case ClassResolver.DOUBLE_CLASS_ID:
return buffer.readDouble();
return buffer.readFloat64();
case ClassResolver.STRING_CLASS_ID:
return stringSerializer.readJavaString(buffer);
// TODO(add fastpath for other types)
Expand Down Expand Up @@ -970,7 +970,7 @@ public Object xreadRefByNullableSerializer(MemoryBuffer buffer, Serializer<?> se

public Object xreadNonRef(MemoryBuffer buffer, Serializer<?> serializer) {
depth++;
short typeId = buffer.readShort();
short typeId = buffer.readInt16();
ClassResolver classResolver = this.classResolver;
if (typeId != NOT_SUPPORT_CROSS_LANGUAGE) {
Class<?> cls = null;
Expand Down Expand Up @@ -1000,7 +1000,7 @@ public Object xreadNonRef(MemoryBuffer buffer, Serializer<?> serializer) {
return o;
} else {
String className = classResolver.xreadClassName(buffer);
int ordinal = buffer.readPositiveVarInt();
int ordinal = buffer.readVarUint32();
if (peerLanguage != Language.JAVA) {
return OpaqueObjects.of(peerLanguage, className, ordinal);
} else {
Expand Down Expand Up @@ -1028,7 +1028,7 @@ public void serializeJavaObject(MemoryBuffer buffer, Object obj) {
}
if (config.shareMetaContext()) {
int startOffset = buffer.writerIndex();
buffer.writeInt(-1); // preserve 4-byte for nativeObjects start offsets.
buffer.writeInt32(-1); // preserve 4-byte for nativeObjects start offsets.
if (!refResolver.writeRefOrNull(buffer, obj)) {
ClassInfo classInfo = classResolver.getOrUpdateClassInfo(obj.getClass());
writeData(buffer, classInfo, obj);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,16 +370,16 @@ private Expression serializeForNotNull(
} else if (clz == char.class || clz == Character.class) {
return new Invoke(buffer, "writeChar", inputObject);
} else if (clz == short.class || clz == Short.class) {
return new Invoke(buffer, "writeShort", inputObject);
return new Invoke(buffer, "writeInt16", inputObject);
} else if (clz == int.class || clz == Integer.class) {
String func = fury.compressInt() ? "writeVarInt" : "writeInt";
String func = fury.compressInt() ? "writeVarInt32" : "writeInt32";
return new Invoke(buffer, func, inputObject);
} else if (clz == long.class || clz == Long.class) {
return LongSerializer.writeLong(buffer, inputObject, fury.longEncoding(), true);
return LongSerializer.writeInt64(buffer, inputObject, fury.longEncoding(), true);
} else if (clz == float.class || clz == Float.class) {
return new Invoke(buffer, "writeFloat", inputObject);
return new Invoke(buffer, "writeFloat32", inputObject);
} else if (clz == double.class || clz == Double.class) {
return new Invoke(buffer, "writeDouble", inputObject);
return new Invoke(buffer, "writeFloat64", inputObject);
} else {
throw new IllegalStateException("impossible");
}
Expand Down Expand Up @@ -1135,15 +1135,15 @@ protected Expression deserializeForNotNull(
} else if (cls == char.class || cls == Character.class) {
return readChar(buffer);
} else if (cls == short.class || cls == Short.class) {
return readShort(buffer);
return readInt16(buffer);
} else if (cls == int.class || cls == Integer.class) {
return fury.compressInt() ? readVarInt(buffer) : readInt(buffer);
return fury.compressInt() ? readVarInt32(buffer) : readInt32(buffer);
} else if (cls == long.class || cls == Long.class) {
return LongSerializer.readLong(buffer, fury.longEncoding());
return LongSerializer.readInt64(buffer, fury.longEncoding());
} else if (cls == float.class || cls == Float.class) {
return readFloat(buffer);
return readFloat32(buffer);
} else if (cls == double.class || cls == Double.class) {
return readDouble(buffer);
return readFloat64(buffer);
} else {
throw new IllegalStateException("impossible");
}
Expand Down
Loading

0 comments on commit a9ad50a

Please sign in to comment.