diff --git a/common/src/main/java/org/astraea/common/ByteUtils.java b/common/src/main/java/org/astraea/common/ByteUtils.java index 719f6ae49e..b7db5b56ee 100644 --- a/common/src/main/java/org/astraea/common/ByteUtils.java +++ b/common/src/main/java/org/astraea/common/ByteUtils.java @@ -16,6 +16,7 @@ */ package org.astraea.common; +import com.google.protobuf.InvalidProtocolBufferException; import java.io.InputStream; import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; @@ -162,12 +163,21 @@ public static byte[] toBytes(boolean value) { return new byte[] {0}; } - /** Serialize BeanObject by protocol buffer. */ + /** Serialize BeanObject by protocol buffer. The unsupported value will be ignored. */ public static byte[] toBytes(BeanObject value) { var beanBuilder = BeanObjectOuterClass.BeanObject.newBuilder(); beanBuilder.setDomain(value.domainName()); beanBuilder.putAllProperties(value.properties()); - value.attributes().forEach((key, val) -> beanBuilder.putAttributes(key, primitive(val))); + value + .attributes() + .forEach( + (key, val) -> { + try { + beanBuilder.putAttributes(key, primitive(val)); + } catch (SerializationException ignore) { + // Bean attribute may contain non-primitive value. e.g. TimeUnit, Byte. + } + }); return beanBuilder.build().toByteArray(); } @@ -294,75 +304,83 @@ public static byte[] readBytes(ByteBuffer buffer, int size) { // ---------------------------------ProtoBuf Object------------------------------------------- // /** Deserialize to BeanObject with protocol buffer */ - public static BeanObject readBeanObject(byte[] bytes) { - // Pack InvalidProtocolBufferException thrown by protoBuf - var outerBean = Utils.packException(() -> BeanObjectOuterClass.BeanObject.parseFrom(bytes)); - return new BeanObject( - outerBean.getDomain(), - outerBean.getPropertiesMap(), - outerBean.getAttributesMap().entrySet().stream() - .collect( - Collectors.toUnmodifiableMap( - Map.Entry::getKey, e -> Objects.requireNonNull(toObject(e.getValue()))))); + public static BeanObject readBeanObject(byte[] bytes) throws SerializationException { + try { + var outerBean = BeanObjectOuterClass.BeanObject.parseFrom(bytes); + return new BeanObject( + outerBean.getDomain(), + outerBean.getPropertiesMap(), + outerBean.getAttributesMap().entrySet().stream() + .collect( + Collectors.toUnmodifiableMap( + Map.Entry::getKey, e -> Objects.requireNonNull(toObject(e.getValue()))))); + } catch (InvalidProtocolBufferException ex) { + // Pack exception thrown by protoBuf to Serialization exception. + throw new SerializationException(ex); + } } /** Deserialize to ClusterInfo with protocol buffer */ public static ClusterInfo readClusterInfo(byte[] bytes) { - var outerClusterInfo = - Utils.packException(() -> ClusterInfoOuterClass.ClusterInfo.parseFrom(bytes)); - return ClusterInfo.of( - outerClusterInfo.getClusterId(), - outerClusterInfo.getNodeInfoList().stream() - .map(nodeInfo -> NodeInfo.of(nodeInfo.getId(), nodeInfo.getHost(), nodeInfo.getPort())) - .collect(Collectors.toList()), - outerClusterInfo.getTopicList().stream() - .map( - protoTopic -> - new Topic() { - @Override - public String name() { - return protoTopic.getName(); - } - - @Override - public Config config() { - return Config.of(protoTopic.getConfigMap()); - } - - @Override - public boolean internal() { - return protoTopic.getInternal(); - } - - @Override - public Set topicPartitions() { - return protoTopic.getPartitionList().stream() - .map(tp -> TopicPartition.of(protoTopic.getName(), tp)) - .collect(Collectors.toSet()); - } - }) - .collect(Collectors.toMap(Topic::name, Function.identity())), - outerClusterInfo.getReplicaList().stream() - .map( - replica -> - Replica.builder() - .topic(replica.getTopic()) - .partition(replica.getPartition()) - .nodeInfo( - NodeInfo.of( - replica.getNodeInfo().getId(), - replica.getNodeInfo().getHost(), - replica.getNodeInfo().getPort())) - .lag(replica.getLag()) - .size(replica.getSize()) - .isLeader(replica.getIsLeader()) - .isSync(replica.getIsSync()) - .isFuture(replica.getIsFuture()) - .isOffline(replica.getIsOffline()) - .isPreferredLeader(replica.getIsPreferredLeader()) - .path(replica.getPath()) - .build()) - .collect(Collectors.toList())); + try { + var outerClusterInfo = ClusterInfoOuterClass.ClusterInfo.parseFrom(bytes); + return ClusterInfo.of( + outerClusterInfo.getClusterId(), + outerClusterInfo.getNodeInfoList().stream() + .map( + nodeInfo -> NodeInfo.of(nodeInfo.getId(), nodeInfo.getHost(), nodeInfo.getPort())) + .collect(Collectors.toList()), + outerClusterInfo.getTopicList().stream() + .map( + protoTopic -> + new Topic() { + @Override + public String name() { + return protoTopic.getName(); + } + + @Override + public Config config() { + return Config.of(protoTopic.getConfigMap()); + } + + @Override + public boolean internal() { + return protoTopic.getInternal(); + } + + @Override + public Set topicPartitions() { + return protoTopic.getPartitionList().stream() + .map(tp -> TopicPartition.of(protoTopic.getName(), tp)) + .collect(Collectors.toSet()); + } + }) + .collect(Collectors.toMap(Topic::name, Function.identity())), + outerClusterInfo.getReplicaList().stream() + .map( + replica -> + Replica.builder() + .topic(replica.getTopic()) + .partition(replica.getPartition()) + .nodeInfo( + NodeInfo.of( + replica.getNodeInfo().getId(), + replica.getNodeInfo().getHost(), + replica.getNodeInfo().getPort())) + .lag(replica.getLag()) + .size(replica.getSize()) + .isLeader(replica.getIsLeader()) + .isSync(replica.getIsSync()) + .isFuture(replica.getIsFuture()) + .isOffline(replica.getIsOffline()) + .isPreferredLeader(replica.getIsPreferredLeader()) + .path(replica.getPath()) + .build()) + .collect(Collectors.toList())); + } catch (InvalidProtocolBufferException ex) { + throw new SerializationException(ex); + } } // --------------------------------ProtoBuf Primitive----------------------------------------- // @@ -371,7 +389,7 @@ public Set topicPartitions() { * Convert java primitive type to "one of" protocol buffer primitive type. There are no "short" * and "char" in Protocol Buffers. Use "int" and "String" instead. */ - private static PrimitiveOuterClass.Primitive primitive(Object v) { + private static PrimitiveOuterClass.Primitive primitive(Object v) throws SerializationException { if (v instanceof Integer) return PrimitiveOuterClass.Primitive.newBuilder().setInt((int) v).build(); else if (v instanceof Long) @@ -385,7 +403,7 @@ else if (v instanceof Boolean) else if (v instanceof String) return PrimitiveOuterClass.Primitive.newBuilder().setStr(v.toString()).build(); else - throw new IllegalArgumentException( + throw new SerializationException( "Type " + v.getClass() + " is not supported. Please use Integer, Long, Float, Double, Boolean, String instead."); diff --git a/common/src/main/java/org/astraea/common/SerializationException.java b/common/src/main/java/org/astraea/common/SerializationException.java new file mode 100644 index 0000000000..13506c2e8a --- /dev/null +++ b/common/src/main/java/org/astraea/common/SerializationException.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.common; + +public class SerializationException extends IllegalArgumentException { + public SerializationException(Exception ex) { + super(ex); + } + + public SerializationException(String message) { + super(message); + } +} diff --git a/common/src/main/java/org/astraea/common/backup/RecordReaderBuilder.java b/common/src/main/java/org/astraea/common/backup/RecordReaderBuilder.java index 802dc5041e..53924d3281 100644 --- a/common/src/main/java/org/astraea/common/backup/RecordReaderBuilder.java +++ b/common/src/main/java/org/astraea/common/backup/RecordReaderBuilder.java @@ -24,7 +24,7 @@ import java.util.zip.GZIPInputStream; import org.astraea.common.ByteUtils; import org.astraea.common.Header; -import org.astraea.common.Utils; +import org.astraea.common.SerializationException; import org.astraea.common.consumer.Record; import org.astraea.common.generated.RecordOuterClass; @@ -58,24 +58,27 @@ public Record next() { /** Parsed message if successful, or null if the stream is at EOF. */ private static Record readRecord(InputStream inputStream) { - var outerRecord = - Utils.packException(() -> RecordOuterClass.Record.parseDelimitedFrom(inputStream)); - // inputStream reaches EOF - if (outerRecord == null) return null; - return Record.builder() - .topic(outerRecord.getTopic()) - .headers( - outerRecord.getHeadersList().stream() - .map(header -> new Header(header.getKey(), header.getValue().toByteArray())) - .toList()) - .key(outerRecord.getKey().toByteArray()) - .value(outerRecord.getValue().toByteArray()) - .offset(outerRecord.getOffset()) - .timestamp(outerRecord.getTimestamp()) - .partition(outerRecord.getPartition()) - .serializedKeySize(outerRecord.getKey().size()) - .serializedValueSize(outerRecord.getValue().size()) - .build(); + try { + var outerRecord = RecordOuterClass.Record.parseDelimitedFrom(inputStream); + // inputStream reaches EOF + if (outerRecord == null) return null; + return Record.builder() + .topic(outerRecord.getTopic()) + .headers( + outerRecord.getHeadersList().stream() + .map(header -> new Header(header.getKey(), header.getValue().toByteArray())) + .toList()) + .key(outerRecord.getKey().toByteArray()) + .value(outerRecord.getValue().toByteArray()) + .offset(outerRecord.getOffset()) + .timestamp(outerRecord.getTimestamp()) + .partition(outerRecord.getPartition()) + .serializedKeySize(outerRecord.getKey().size()) + .serializedValueSize(outerRecord.getValue().size()) + .build(); + } catch (IOException e) { + throw new SerializationException(e); + } } private InputStream fs; diff --git a/common/src/test/java/org/astraea/common/serializer/BeanObjectSerializerTest.java b/common/src/test/java/org/astraea/common/serializer/BeanObjectSerializerTest.java index 003faca58d..a486ad4ace 100644 --- a/common/src/test/java/org/astraea/common/serializer/BeanObjectSerializerTest.java +++ b/common/src/test/java/org/astraea/common/serializer/BeanObjectSerializerTest.java @@ -18,6 +18,7 @@ import java.util.List; import java.util.Map; +import org.astraea.common.SerializationException; import org.astraea.common.consumer.Deserializer; import org.astraea.common.metrics.BeanObject; import org.astraea.common.producer.Serializer; @@ -63,10 +64,23 @@ public void testSerializationDeserialization() { public void testUnsupportedType() { var domain = "domain"; var properties = Map.of("name", "wrongType"); - var attributes = Map.of("map", (Object) Map.of("k", "v")); + var attributes = Map.of("unsupportedType", new Object()); + var bean = new BeanObject(domain, properties, attributes); + var serializedBean = Serializer.BEAN_OBJECT.serialize("ignore", List.of(), bean); + var deserializedBean = + Deserializer.BEAN_OBJECT.deserialize("ignore", List.of(), serializedBean); + // The "map" attribute should be ignored on serialization + Assertions.assertNotNull(bean.attributes().get("unsupportedType")); + Assertions.assertNull(deserializedBean.attributes().get("unsupportedType")); + Assertions.assertNotEquals(bean, deserializedBean); + } + + @Test + public void testInvalidBytes() { + byte[] malformed = new byte[5]; Assertions.assertThrows( - IllegalArgumentException.class, - () -> Serializer.BEAN_OBJECT.serialize("ignore", List.of(), bean)); + SerializationException.class, + () -> Deserializer.BEAN_OBJECT.deserialize("ignore", List.of(), malformed)); } }