Skip to content

Commit

Permalink
[METRICS] Ignore unsupported type on serializing BeanObject (#1697)
Browse files Browse the repository at this point in the history
  • Loading branch information
chinghongfang authored May 6, 2023
1 parent 668d5a4 commit 47afc52
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 91 deletions.
156 changes: 87 additions & 69 deletions common/src/main/java/org/astraea/common/ByteUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.astraea.common;

import com.google.protobuf.InvalidProtocolBufferException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
Expand Down Expand Up @@ -162,12 +163,21 @@ public static byte[] toBytes(boolean value) {
return new byte[] {0};
}

/** Serialize BeanObject by protocol buffer. */
/** Serialize BeanObject by protocol buffer. The unsupported value will be ignored. */
public static byte[] toBytes(BeanObject value) {
var beanBuilder = BeanObjectOuterClass.BeanObject.newBuilder();
beanBuilder.setDomain(value.domainName());
beanBuilder.putAllProperties(value.properties());
value.attributes().forEach((key, val) -> beanBuilder.putAttributes(key, primitive(val)));
value
.attributes()
.forEach(
(key, val) -> {
try {
beanBuilder.putAttributes(key, primitive(val));
} catch (SerializationException ignore) {
// Bean attribute may contain non-primitive value. e.g. TimeUnit, Byte.
}
});
return beanBuilder.build().toByteArray();
}

Expand Down Expand Up @@ -294,75 +304,83 @@ public static byte[] readBytes(ByteBuffer buffer, int size) {
// ---------------------------------ProtoBuf Object------------------------------------------- //

/** Deserialize to BeanObject with protocol buffer */
public static BeanObject readBeanObject(byte[] bytes) {
// Pack InvalidProtocolBufferException thrown by protoBuf
var outerBean = Utils.packException(() -> BeanObjectOuterClass.BeanObject.parseFrom(bytes));
return new BeanObject(
outerBean.getDomain(),
outerBean.getPropertiesMap(),
outerBean.getAttributesMap().entrySet().stream()
.collect(
Collectors.toUnmodifiableMap(
Map.Entry::getKey, e -> Objects.requireNonNull(toObject(e.getValue())))));
public static BeanObject readBeanObject(byte[] bytes) throws SerializationException {
try {
var outerBean = BeanObjectOuterClass.BeanObject.parseFrom(bytes);
return new BeanObject(
outerBean.getDomain(),
outerBean.getPropertiesMap(),
outerBean.getAttributesMap().entrySet().stream()
.collect(
Collectors.toUnmodifiableMap(
Map.Entry::getKey, e -> Objects.requireNonNull(toObject(e.getValue())))));
} catch (InvalidProtocolBufferException ex) {
// Pack exception thrown by protoBuf to Serialization exception.
throw new SerializationException(ex);
}
}

/** Deserialize to ClusterInfo with protocol buffer */
public static ClusterInfo readClusterInfo(byte[] bytes) {
var outerClusterInfo =
Utils.packException(() -> ClusterInfoOuterClass.ClusterInfo.parseFrom(bytes));
return ClusterInfo.of(
outerClusterInfo.getClusterId(),
outerClusterInfo.getNodeInfoList().stream()
.map(nodeInfo -> NodeInfo.of(nodeInfo.getId(), nodeInfo.getHost(), nodeInfo.getPort()))
.collect(Collectors.toList()),
outerClusterInfo.getTopicList().stream()
.map(
protoTopic ->
new Topic() {
@Override
public String name() {
return protoTopic.getName();
}

@Override
public Config config() {
return Config.of(protoTopic.getConfigMap());
}

@Override
public boolean internal() {
return protoTopic.getInternal();
}

@Override
public Set<TopicPartition> topicPartitions() {
return protoTopic.getPartitionList().stream()
.map(tp -> TopicPartition.of(protoTopic.getName(), tp))
.collect(Collectors.toSet());
}
})
.collect(Collectors.toMap(Topic::name, Function.identity())),
outerClusterInfo.getReplicaList().stream()
.map(
replica ->
Replica.builder()
.topic(replica.getTopic())
.partition(replica.getPartition())
.nodeInfo(
NodeInfo.of(
replica.getNodeInfo().getId(),
replica.getNodeInfo().getHost(),
replica.getNodeInfo().getPort()))
.lag(replica.getLag())
.size(replica.getSize())
.isLeader(replica.getIsLeader())
.isSync(replica.getIsSync())
.isFuture(replica.getIsFuture())
.isOffline(replica.getIsOffline())
.isPreferredLeader(replica.getIsPreferredLeader())
.path(replica.getPath())
.build())
.collect(Collectors.toList()));
try {
var outerClusterInfo = ClusterInfoOuterClass.ClusterInfo.parseFrom(bytes);
return ClusterInfo.of(
outerClusterInfo.getClusterId(),
outerClusterInfo.getNodeInfoList().stream()
.map(
nodeInfo -> NodeInfo.of(nodeInfo.getId(), nodeInfo.getHost(), nodeInfo.getPort()))
.collect(Collectors.toList()),
outerClusterInfo.getTopicList().stream()
.map(
protoTopic ->
new Topic() {
@Override
public String name() {
return protoTopic.getName();
}

@Override
public Config config() {
return Config.of(protoTopic.getConfigMap());
}

@Override
public boolean internal() {
return protoTopic.getInternal();
}

@Override
public Set<TopicPartition> topicPartitions() {
return protoTopic.getPartitionList().stream()
.map(tp -> TopicPartition.of(protoTopic.getName(), tp))
.collect(Collectors.toSet());
}
})
.collect(Collectors.toMap(Topic::name, Function.identity())),
outerClusterInfo.getReplicaList().stream()
.map(
replica ->
Replica.builder()
.topic(replica.getTopic())
.partition(replica.getPartition())
.nodeInfo(
NodeInfo.of(
replica.getNodeInfo().getId(),
replica.getNodeInfo().getHost(),
replica.getNodeInfo().getPort()))
.lag(replica.getLag())
.size(replica.getSize())
.isLeader(replica.getIsLeader())
.isSync(replica.getIsSync())
.isFuture(replica.getIsFuture())
.isOffline(replica.getIsOffline())
.isPreferredLeader(replica.getIsPreferredLeader())
.path(replica.getPath())
.build())
.collect(Collectors.toList()));
} catch (InvalidProtocolBufferException ex) {
throw new SerializationException(ex);
}
}

// --------------------------------ProtoBuf Primitive----------------------------------------- //
Expand All @@ -371,7 +389,7 @@ public Set<TopicPartition> topicPartitions() {
* Convert java primitive type to "one of" protocol buffer primitive type. There are no "short"
* and "char" in Protocol Buffers. Use "int" and "String" instead.
*/
private static PrimitiveOuterClass.Primitive primitive(Object v) {
private static PrimitiveOuterClass.Primitive primitive(Object v) throws SerializationException {
if (v instanceof Integer)
return PrimitiveOuterClass.Primitive.newBuilder().setInt((int) v).build();
else if (v instanceof Long)
Expand All @@ -385,7 +403,7 @@ else if (v instanceof Boolean)
else if (v instanceof String)
return PrimitiveOuterClass.Primitive.newBuilder().setStr(v.toString()).build();
else
throw new IllegalArgumentException(
throw new SerializationException(
"Type "
+ v.getClass()
+ " is not supported. Please use Integer, Long, Float, Double, Boolean, String instead.");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.common;

public class SerializationException extends IllegalArgumentException {
public SerializationException(Exception ex) {
super(ex);
}

public SerializationException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.zip.GZIPInputStream;
import org.astraea.common.ByteUtils;
import org.astraea.common.Header;
import org.astraea.common.Utils;
import org.astraea.common.SerializationException;
import org.astraea.common.consumer.Record;
import org.astraea.common.generated.RecordOuterClass;

Expand Down Expand Up @@ -58,24 +58,27 @@ public Record<byte[], byte[]> next() {

/** Parsed message if successful, or null if the stream is at EOF. */
private static Record<byte[], byte[]> readRecord(InputStream inputStream) {
var outerRecord =
Utils.packException(() -> RecordOuterClass.Record.parseDelimitedFrom(inputStream));
// inputStream reaches EOF
if (outerRecord == null) return null;
return Record.builder()
.topic(outerRecord.getTopic())
.headers(
outerRecord.getHeadersList().stream()
.map(header -> new Header(header.getKey(), header.getValue().toByteArray()))
.toList())
.key(outerRecord.getKey().toByteArray())
.value(outerRecord.getValue().toByteArray())
.offset(outerRecord.getOffset())
.timestamp(outerRecord.getTimestamp())
.partition(outerRecord.getPartition())
.serializedKeySize(outerRecord.getKey().size())
.serializedValueSize(outerRecord.getValue().size())
.build();
try {
var outerRecord = RecordOuterClass.Record.parseDelimitedFrom(inputStream);
// inputStream reaches EOF
if (outerRecord == null) return null;
return Record.builder()
.topic(outerRecord.getTopic())
.headers(
outerRecord.getHeadersList().stream()
.map(header -> new Header(header.getKey(), header.getValue().toByteArray()))
.toList())
.key(outerRecord.getKey().toByteArray())
.value(outerRecord.getValue().toByteArray())
.offset(outerRecord.getOffset())
.timestamp(outerRecord.getTimestamp())
.partition(outerRecord.getPartition())
.serializedKeySize(outerRecord.getKey().size())
.serializedValueSize(outerRecord.getValue().size())
.build();
} catch (IOException e) {
throw new SerializationException(e);
}
}

private InputStream fs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.List;
import java.util.Map;
import org.astraea.common.SerializationException;
import org.astraea.common.consumer.Deserializer;
import org.astraea.common.metrics.BeanObject;
import org.astraea.common.producer.Serializer;
Expand Down Expand Up @@ -63,10 +64,23 @@ public void testSerializationDeserialization() {
public void testUnsupportedType() {
var domain = "domain";
var properties = Map.of("name", "wrongType");
var attributes = Map.of("map", (Object) Map.of("k", "v"));
var attributes = Map.of("unsupportedType", new Object());

var bean = new BeanObject(domain, properties, attributes);
var serializedBean = Serializer.BEAN_OBJECT.serialize("ignore", List.of(), bean);
var deserializedBean =
Deserializer.BEAN_OBJECT.deserialize("ignore", List.of(), serializedBean);
// The "map" attribute should be ignored on serialization
Assertions.assertNotNull(bean.attributes().get("unsupportedType"));
Assertions.assertNull(deserializedBean.attributes().get("unsupportedType"));
Assertions.assertNotEquals(bean, deserializedBean);
}

@Test
public void testInvalidBytes() {
byte[] malformed = new byte[5];
Assertions.assertThrows(
IllegalArgumentException.class,
() -> Serializer.BEAN_OBJECT.serialize("ignore", List.of(), bean));
SerializationException.class,
() -> Deserializer.BEAN_OBJECT.deserialize("ignore", List.of(), malformed));
}
}

0 comments on commit 47afc52

Please sign in to comment.