original, String key) {
+ this.original = original;
+ this.key = key;
+ }
+
+ protected Header makeNext() {
+ while (original.hasNext()) {
+ Header header = original.next();
+ if (!header.key().equals(key)) {
+ continue;
+ }
+ return header;
+ }
+ return this.allDone();
+ }
+
+ @Override
+ public boolean hasNext() {
+ switch (state) {
+ case FAILED:
+ throw new IllegalStateException("Iterator is in failed state");
+ case DONE:
+ return false;
+ case READY:
+ return true;
+ default:
+ return maybeComputeNext();
+ }
+ }
+
+ @Override
+ public Header next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ state = State.NOT_READY;
+ if (next == null) {
+ throw new IllegalStateException("Expected item but none found.");
+ }
+ return next;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Removal not supported");
+ }
+
+
+
+
+ public Header peek() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ return next;
+ }
+
+ protected Header allDone() {
+ state = State.DONE;
+ return null;
+ }
+
+
+ private Boolean maybeComputeNext() {
+ state = State.FAILED;
+ next = makeNext();
+ if (state == State.DONE) {
+ return false;
+ } else {
+ state = State.READY;
+ return true;
+ }
+ }
+ }
+}
diff --git a/connector/src/main/java/io/openmessaging/connector/api/header/Header.java b/connector/src/main/java/io/openmessaging/connector/api/header/Header.java
new file mode 100644
index 0000000..647d439
--- /dev/null
+++ b/connector/src/main/java/io/openmessaging/connector/api/header/Header.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.connector.api.header;
+
+import io.openmessaging.connector.api.data.Meta;
+import io.openmessaging.connector.api.data.MetaAndData;
+
+/**
+ * A {@link Header} is a key-value pair, and multiple headers can be included with the key, value, and timestamp in each RocketMQ message.
+ * The data contains both the meta information and the value object.
+ *
+ * This is an immutable interface.
+ */
+public interface Header {
+
+ /**
+ * Get the header's value as deserialized by Connect's header converter.
+ *
+ * @return
+ */
+ MetaAndData data();
+
+ /**
+ * The header's key, which is not necessarily unique within the set of headers on a RocketMQ message.
+ *
+ * @return the header's key; never null
+ */
+ String key();
+
+ /**
+ * Return a new {@link Header} object that has the same key but with the supplied value.
+ *
+ * @param meta the meta for the new value; may be null
+ * @param value the new value
+ * @return the new {@link Header}; never null
+ */
+ Header with(Meta meta, Object value);
+
+ /**
+ * Return a new {@link Header} object that has the same meta and value but with the supplied key.
+ *
+ * @param key the key for the new header; may not be null
+ * @return the new {@link Header}; never null
+ */
+ Header rename(String key);
+}
diff --git a/connector/src/main/java/io/openmessaging/connector/api/header/Headers.java b/connector/src/main/java/io/openmessaging/connector/api/header/Headers.java
new file mode 100644
index 0000000..de72637
--- /dev/null
+++ b/connector/src/main/java/io/openmessaging/connector/api/header/Headers.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.connector.api.header;
+
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.Map;
+
+import io.openmessaging.connector.api.data.Meta;
+import io.openmessaging.connector.api.data.Struct;
+
+/**
+ * A mutable ordered collection of {@link Header} objects.
+ * Note that multiple headers shouldn't have the same {@link Header#key() key}.
+ */
+public interface Headers extends Iterable {
+
+ /**
+ * Get the number of headers in this object.
+ *
+ * @return the number of headers; never negative
+ */
+ int size();
+
+ /**
+ * Determine whether this object has no headers.
+ *
+ * @return true if there are no headers, or false if there is at least one header
+ */
+ boolean isEmpty();
+
+ /**
+ * Get the collection of {@link Header} objects whose {@link Header#key() keys} all match the specified key.
+ *
+ * @param key the key; may not be null
+ * @return the iterator over headers with the specified key; may be null if there are no headers with the
+ * specified key
+ */
+ Header findHeader(String key);
+
+ /**
+ * Get the map of {@link Header} objects.
+ *
+ * @return the map of headers
+ */
+ Map toMap();
+
+ /**
+ * Add the given {@link Header} to this collection.
+ *
+ * @param header the header; may not be null
+ * @return this object to facilitate chaining multiple methods; never null
+ */
+ Headers add(Header header);
+
+ /**
+ * Add to this collection a {@link Header} with the given key and value.
+ *
+ * @param key the header's key; may not be null
+ * @param value the header's value; may be null
+ * @param meta the meta for the header's value; may not be null if the value is not null
+ * @return this object to facilitate chaining multiple methods; never null
+ */
+ Headers add(String key, Meta meta, Object value);
+
+ /**
+ * Add to this collection a {@link Header} with the given key and value.
+ *
+ * @param key the header's key; may not be null
+ * @param value the header's value; may be null
+ * @return this object to facilitate chaining multiple methods; never null
+ */
+ Headers addString(String key, String value);
+
+ /**
+ * Add to this collection a {@link Header} with the given key and value.
+ *
+ * @param key the header's key; may not be null
+ * @param value the header's value; may be null
+ * @return this object to facilitate chaining multiple methods; never null
+ */
+ Headers addBoolean(String key, boolean value);
+
+ /**
+ * Add to this collection a {@link Header} with the given key and value.
+ *
+ * @param key the header's key; may not be null
+ * @param value the header's value; may be null
+ * @return this object to facilitate chaining multiple methods; never null
+ */
+ Headers addByte(String key, byte value);
+
+ /**
+ * Add to this collection a {@link Header} with the given key and value.
+ *
+ * @param key the header's key; may not be null
+ * @param value the header's value; may be null
+ * @return this object to facilitate chaining multiple methods; never null
+ */
+ Headers addShort(String key, short value);
+
+ /**
+ * Add to this collection a {@link Header} with the given key and value.
+ *
+ * @param key the header's key; may not be null
+ * @param value the header's value; may be null
+ * @return this object to facilitate chaining multiple methods; never null
+ */
+ Headers addInt(String key, int value);
+
+ /**
+ * Add to this collection a {@link Header} with the given key and value.
+ *
+ * @param key the header's key; may not be null
+ * @param value the header's value; may be null
+ * @return this object to facilitate chaining multiple methods; never null
+ */
+ Headers addLong(String key, long value);
+
+ /**
+ * Add to this collection a {@link Header} with the given key and value.
+ *
+ * @param key the header's key; may not be null
+ * @param value the header's value; may be null
+ * @return this object to facilitate chaining multiple methods; never null
+ */
+ Headers addFloat(String key, float value);
+
+ /**
+ * Add to this collection a {@link Header} with the given key and value.
+ *
+ * @param key the header's key; may not be null
+ * @param value the header's value; may be null
+ * @return this object to facilitate chaining multiple methods; never null
+ */
+ Headers addDouble(String key, double value);
+
+ /**
+ * Add to this collection a {@link Header} with the given key and value.
+ *
+ * @param key the header's key; may not be null
+ * @param value the header's value; may be null
+ * @return this object to facilitate chaining multiple methods; never null
+ */
+ Headers addBytes(String key, byte[] value);
+
+ /**
+ * Add to this collection a {@link Header} with the given key and value.
+ *
+ * @param key the header's key; may not be null
+ * @param value the header's value; may be null
+ * @param meta the meta describing the list value; may not be null
+ * @return this object to facilitate chaining multiple methods; never null
+ * @throws Exception if the header's value is invalid
+ */
+ Headers addList(String key, List> value, Meta meta);
+
+ /**
+ * Add to this collection a {@link Header} with the given key and value.
+ *
+ * @param key the header's key; may not be null
+ * @param value the header's value; may be null
+ * @param meta the meta describing the map value; may not be null
+ * @return this object to facilitate chaining multiple methods; never null
+ * @throws Exception if the header's value is invalid
+ */
+ Headers addMap(String key, Map, ?> value, Meta meta);
+
+ /**
+ * Add to this collection a {@link Header} with the given key and value.
+ *
+ * @param key the header's key; may not be null
+ * @param value the header's value; may be null
+ * @return this object to facilitate chaining multiple methods; never null
+ * @throws Exception if the header's value is invalid
+ */
+ Headers addStruct(String key, Struct value);
+
+ /**
+ * Add to this collection a {@link Header} with the given key and value.
+ *
+ * @param key the header's key; may not be null
+ * @param value the header's value; may be null
+ * @return this object to facilitate chaining multiple methods; never null
+ */
+ Headers addDecimal(String key, BigDecimal value);
+
+ /**
+ * Add to this collection a {@link Header} with the given key and value.
+ *
+ * @param key the header's key; may not be null
+ * @param value the header's value; may be null
+ * @return this object to facilitate chaining multiple methods; never null
+ */
+ Headers addDate(String key, java.util.Date value);
+
+ /**
+ * Add to this collection a {@link Header} with the given key and value.
+ *
+ * @param key the header's key; may not be null
+ * @param value the header's value; may be null
+ * @return this object to facilitate chaining multiple methods; never null
+ */
+ Headers addTime(String key, java.util.Date value);
+
+ /**
+ * Add to this collection a {@link Header} with the given key and value.
+ *
+ * @param key the header's key; may not be null
+ * @param value the header's value; may be null
+ * @return this object to facilitate chaining multiple methods; never null
+ */
+ Headers addTimestamp(String key, java.util.Date value);
+
+ /**
+ * Removes all {@link Header} objects whose {@link Header#key() key} matches the specified key.
+ *
+ * @param key the key; may not be null
+ * @return this object to facilitate chaining multiple methods; never null
+ */
+ Headers remove(String key);
+
+ /**
+ * Removes all headers from this object.
+ *
+ * @return this object to facilitate chaining multiple methods; never null
+ */
+ Headers clear();
+
+ /**
+ * Create a copy of this {@link Headers} object. The new copy will contain all of the same {@link Header} objects as
+ * this object.
+ *
+ * @return the copy; never null
+ */
+ Headers duplicate();
+
+}
diff --git a/connector/src/main/test/io/openmessaging/connector/api/data/HeadersTest.java b/connector/src/main/test/io/openmessaging/connector/api/data/HeadersTest.java
new file mode 100644
index 0000000..3ac4c4a
--- /dev/null
+++ b/connector/src/main/test/io/openmessaging/connector/api/data/HeadersTest.java
@@ -0,0 +1,49 @@
+package io.openmessaging.connector.api.data;
+
+import java.math.BigDecimal;
+import java.util.Map;
+
+import io.openmessaging.connector.api.header.DataHeader;
+import io.openmessaging.connector.api.header.DataHeaders;
+import io.openmessaging.connector.api.header.Header;
+import io.openmessaging.connector.api.header.Headers;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class HeadersTest {
+
+
+ @Test
+ public void commonTest(){
+ Meta strMeta = MetaBuilder.string().name("myStrMeta").build();
+ Header header = new DataHeader("header1", strMeta, "headerValue1");
+ //System.out.println("construct header: " + header);
+ assertNotNull(header);
+
+ Headers headers = new DataHeaders();
+ headers.add(header)
+ .add("header1", strMeta, "headerValue2")
+ .addBoolean("header2", true)
+ .addDecimal("header3 ", BigDecimal.ONE)
+ ;
+ //System.out.println("construct headers: " + headers);
+ assertNotNull(headers);
+
+ Header findHeader1 = headers.findHeader("header1");
+ //System.out.println("except header named header1, real is: " + findHeader1);
+ assertEquals(findHeader1.key(), "header1");
+ findHeader1.rename("header2");
+
+ Header findHeader2 = headers.findHeader("header2");
+ //System.out.println("except header valued TRUE, real is: " + findHeader2);
+ assertEquals(findHeader2.key(), "header2");
+
+ Map headerMap = headers.toMap();
+ //System.out.println("show header Map: " + headerMap);
+ assertNotNull(headerMap);
+ headerMap.put("header3", header);
+ }
+
+}
diff --git a/connector/src/main/test/io/openmessaging/connector/api/data/MetaAndDataTest.java b/connector/src/main/test/io/openmessaging/connector/api/data/MetaAndDataTest.java
new file mode 100644
index 0000000..1de0448
--- /dev/null
+++ b/connector/src/main/test/io/openmessaging/connector/api/data/MetaAndDataTest.java
@@ -0,0 +1,105 @@
+package io.openmessaging.connector.api.data;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+public class MetaAndDataTest {
+
+
+ @Test
+ public void structParserTest(){
+ Meta structMeta = MetaBuilder.struct()
+ .name("structMeta")
+ .field("f1", Meta.STRING_META)
+ .field("f2", Meta.INT32_META)
+ .field("f3", MetaBuilder.array(Meta.STRING_META).build())
+ .build();
+ MetaAndData structMetaAndData = new MetaAndData(
+ structMeta
+ );
+
+ structMetaAndData.putData("f1", "asdf");
+ structMetaAndData.putData("f2", 32);
+ structMetaAndData.putData("f3", new ArrayList(){{
+ add("a");
+ add("b");
+ }});
+
+ String jsonStr = structMetaAndData.convertToString();
+ assertNotNull(jsonStr);
+ //System.out.println(jsonStr);
+
+ // MAP MetaAndData
+ MetaAndData newMetaAndData = MetaAndData.getMetaDataFromString(jsonStr);
+ assertNotNull(newMetaAndData);
+ //System.out.println(newMetaAndData);
+
+ // MAP TO STRUCT
+ Struct struct = newMetaAndData.convertToStruct();
+ assertNotNull(struct);
+ //System.out.println(struct);
+ }
+
+ @Test
+ public void listParserTest(){
+ String str = "[1, 2, 3, \"four\"]";
+ MetaAndData result = MetaAndData.getMetaDataFromString(str);
+
+ List> list = (List>) result.getData();
+ //System.out.println(list);
+ assertEquals(4, list.size());
+ assertEquals(1, ((Number) list.get(0)).intValue());
+ assertEquals(2, ((Number) list.get(1)).intValue());
+ assertEquals(3, ((Number) list.get(2)).intValue());
+ assertEquals("four", list.get(3));
+ }
+
+ @Test
+ public void commonTest(){
+ MetaAndData metaAndData = new MetaAndData(Decimal.builder(10).build(), BigDecimal.ONE);
+ assertNotNull(metaAndData);
+
+ BigDecimal de = metaAndData.convertToDecimal(10);
+ assertEquals(de, BigDecimal.ONE);
+ assertNotEquals(de, 1);
+
+ BigDecimal de2 = metaAndData.convertToDecimal(1);
+ assertEquals(de2, BigDecimal.ONE);
+
+ metaAndData.putData(BigDecimal.TEN);
+ assertEquals(metaAndData.convertToDecimal(10), BigDecimal.TEN);
+
+ assertEquals((BigDecimal)metaAndData.getData(), BigDecimal.TEN);
+
+ assertNull(metaAndData.inferMeta());
+ }
+
+ @Test
+ public void stringTest(){
+ MetaAndData metaAndData = new MetaAndData(Meta.STRING_META, "message value 中文信息");
+ assertNotNull(metaAndData);
+
+ String msg = metaAndData.convertToString();
+ assertEquals(msg, "message value 中文信息");
+
+ Exception ex = null;
+ try{
+ metaAndData.convertToDouble();
+ }catch (Exception e){
+ ex = e;
+ }
+ assertNotNull(ex);
+ }
+
+
+
+
+}
diff --git a/connector/src/main/test/io/openmessaging/connector/api/data/MetaTest.java b/connector/src/main/test/io/openmessaging/connector/api/data/MetaTest.java
new file mode 100644
index 0000000..9420e63
--- /dev/null
+++ b/connector/src/main/test/io/openmessaging/connector/api/data/MetaTest.java
@@ -0,0 +1,508 @@
+package io.openmessaging.connector.api.data;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class MetaTest {
+
+ @Test
+ public void staticMethod(){
+ Type type = Meta.getMetaType(String.class);
+ assertEquals(type, Type.STRING);
+ type = Meta.getMetaType(Struct.class);
+ assertEquals(type, Type.STRUCT);
+ type = Meta.getMetaType(Map.class);
+ assertEquals(type, Type.MAP);
+ type = Meta.getMetaType(ArrayList.class);
+ assertEquals(type, Type.ARRAY);
+
+
+
+ Meta.validateValue(Meta.BYTES_META, "kfc".getBytes());
+ Exception error = null;
+ try{
+ // Throw exception for the value of STRING do not match the meta of BYTES .
+ Meta.validateValue(Meta.BYTES_META, "kfc");
+ }catch (Exception e){
+ error = e;
+ }
+ assertNotNull(error);
+
+ Meta.validateValue("name", Meta.STRING_META, "fasd");
+
+ List list = new ArrayList<>();
+ list.add("a");
+ list.add("b");
+ list.add("c");
+ Object obj = list;
+ Meta.validateValue(MetaBuilder.array(Meta.STRING_META).build(), obj);
+
+ error = null;
+ try{
+ // Throw exception for the value of ARRAY do not match the meta of BYTES .
+ Meta.validateValue(MetaBuilder.array(Meta.BYTES_META).build(), obj);
+ }catch (Exception e){
+ error = e;
+ }
+ assertNotNull(error);
+ }
+
+ @Test
+ public void int8(){
+ System.out.println("================INT8================");
+ Meta meta1 = MetaBuilder.int8()
+ .name("build1")
+ .version(100)
+ .dataSource("datasource1")
+ .parameter("p1","v1")
+ .parameter("p2", "v2")
+ .build();
+ System.out.println(meta1);
+
+ Meta meta2 = MetaBuilder.int8()
+ .name("build2")
+ .dataSource("datasource1")
+ .build();
+ System.out.println(meta2);
+
+ Meta meta3 = MetaBuilder.int8()
+ .dataSource("datasource1")
+ .build();
+ System.out.println(meta3);
+
+ Meta meta4 = MetaBuilder.int8()
+ .build();
+ System.out.println(meta4);
+
+ Meta meta = new MetaBase(Type.INT8, "abc", 1,"dataSource", null);
+ System.out.println(meta);
+
+ System.out.println(Meta.INT8_META);
+ }
+
+ @Test
+ public void int16(){
+ System.out.println("================INT16================");
+ Meta meta1 = MetaBuilder.int16()
+ .name("build1")
+ .version(100)
+ .dataSource("datasource1")
+ .parameter("p1","v1")
+ .parameter("p2", "v2")
+ .build();
+ System.out.println(meta1);
+
+ Meta meta2 = MetaBuilder.int16()
+ .name("build2")
+ .dataSource("datasource1")
+ .build();
+ System.out.println(meta2);
+
+ Meta meta3 = MetaBuilder.int16()
+ .dataSource("datasource1")
+ .build();
+ System.out.println(meta3);
+
+ Meta meta4 = MetaBuilder.int16()
+ .build();
+ System.out.println(meta4);
+
+ Meta meta = new MetaBase(Type.INT16, "abc", 1, "dataSource", null);
+ System.out.println(meta);
+
+ System.out.println(Meta.INT16_META);
+ }
+
+
+ @Test
+ public void int32(){
+ System.out.println("================INT32================");
+ Meta meta1 = MetaBuilder.int32()
+ .name("build1")
+ .version(100)
+ .dataSource("datasource1")
+ .parameter("p1","v1")
+ .parameter("p2", "v2")
+ .build();
+ System.out.println(meta1);
+
+ Meta meta2 = MetaBuilder.int32()
+ .name("build2")
+ .dataSource("datasource1")
+ .build();
+ System.out.println(meta2);
+
+ Meta meta3 = MetaBuilder.int32()
+ .dataSource("datasource1")
+ .build();
+ System.out.println(meta3);
+
+ Meta meta4 = MetaBuilder.int32()
+ .build();
+ System.out.println(meta4);
+
+ Meta meta = new MetaBase(Type.INT32, "abc", 1, "dataSource", null);
+ System.out.println(meta);
+
+ System.out.println(Meta.INT32_META);
+ }
+
+
+
+ @Test
+ public void int64(){
+ System.out.println("================INT64================");
+ Meta meta1 = MetaBuilder.int64()
+ .name("build1")
+ .version(100)
+ .dataSource("datasource1")
+ .parameter("p1","v1")
+ .parameter("p2", "v2")
+ .build();
+ System.out.println(meta1);
+
+ Meta meta2 = MetaBuilder.int64()
+ .name("build2")
+ .dataSource("datasource1")
+ .build();
+ System.out.println(meta2);
+
+ Meta meta3 = MetaBuilder.int64()
+ .dataSource("datasource1")
+ .build();
+ System.out.println(meta3);
+
+ Meta meta4 = MetaBuilder.int64()
+ .build();
+ System.out.println(meta4);
+
+ Meta meta = new MetaBase(Type.INT64, "abc", 1, "dataSource", null);
+ System.out.println(meta);
+
+ System.out.println(Meta.INT64_META);
+ }
+
+
+
+
+ @Test
+ public void float32(){
+ System.out.println("================FLOAT32================");
+ Meta meta1 = MetaBuilder.float32()
+ .name("build1")
+ .version(100)
+ .dataSource("datasource1")
+ .parameter("p1","v1")
+ .parameter("p2", "v2")
+ .build();
+ System.out.println(meta1);
+
+ Meta meta2 = MetaBuilder.float32()
+ .name("build2")
+ .dataSource("datasource1")
+ .build();
+ System.out.println(meta2);
+
+ Meta meta3 = MetaBuilder.float32()
+ .dataSource("datasource1")
+ .build();
+ System.out.println(meta3);
+
+ Meta meta4 = MetaBuilder.float32()
+ .build();
+ System.out.println(meta4);
+
+ Meta meta = new MetaBase(Type.FLOAT32, "abc", 1, "dataSource", null);
+ System.out.println(meta);
+
+ System.out.println(Meta.FLOAT32_META);
+ }
+
+
+ @Test
+ public void float64(){
+ System.out.println("================FLOAT64================");
+ Meta meta1 = MetaBuilder.float64()
+ .name("build1")
+ .version(100)
+ .dataSource("datasource1")
+ .parameter("p1","v1")
+ .parameter("p2", "v2")
+ .build();
+ System.out.println(meta1);
+
+ Meta meta2 = MetaBuilder.float64()
+ .name("build2")
+ .dataSource("datasource1")
+ .build();
+ System.out.println(meta2);
+
+ Meta meta3 = MetaBuilder.float64()
+ .dataSource("datasource1")
+ .build();
+ System.out.println(meta3);
+
+ Meta meta4 = MetaBuilder.float64()
+ .build();
+ System.out.println(meta4);
+
+ Meta meta = new MetaBase(Type.FLOAT64, "abc", 1, "dataSource", null);
+ System.out.println(meta);
+
+ System.out.println(Meta.FLOAT64_META);
+ }
+
+
+ @Test
+ public void bool(){
+ System.out.println("================BOOLEAN================");
+ Meta meta1 = MetaBuilder.bool()
+ .name("build1")
+ .version(100)
+ .dataSource("datasource1")
+ .parameter("p1","v1")
+ .parameter("p2", "v2")
+ .build();
+ System.out.println(meta1);
+
+ Meta meta2 = MetaBuilder.bool()
+ .name("build2")
+ .dataSource("datasource1")
+ .build();
+ System.out.println(meta2);
+
+ Meta meta3 = MetaBuilder.bool()
+ .dataSource("datasource1")
+ .build();
+ System.out.println(meta3);
+
+ Meta meta4 = MetaBuilder.bool()
+ .build();
+ System.out.println(meta4);
+
+ Meta meta = new MetaBase(Type.BOOLEAN, "abc", 1, "dataSource", null);
+ System.out.println(meta);
+
+ System.out.println(Meta.BOOLEAN_META);
+ }
+
+
+ @Test
+ public void str(){
+ System.out.println("================STRING================");
+ Meta meta1 = MetaBuilder.string()
+ .name("build1")
+ .version(100)
+ .dataSource("datasource1")
+ .parameter("p1","v1")
+ .parameter("p2", "v2")
+ .build();
+ System.out.println(meta1);
+
+ Meta meta2 = MetaBuilder.string()
+ .name("build2")
+ .dataSource("datasource1")
+ .build();
+ System.out.println(meta2);
+
+ Meta meta3 = MetaBuilder.string()
+ .dataSource("datasource1")
+ .build();
+ System.out.println(meta3);
+
+ Meta meta4 = MetaBuilder.string()
+ .build();
+ System.out.println(meta4);
+
+ Meta meta = new MetaBase(Type.STRING, "abc", 1, "dataSource", null);
+ System.out.println(meta);
+
+ System.out.println(Meta.STRING_META);
+ }
+
+
+ @Test
+ public void bytes(){
+ System.out.println("================BYTES================");
+ Meta meta1 = MetaBuilder.bytes()
+ .name("build1")
+ .version(100)
+ .dataSource("datasource1")
+ .parameter("p1","v1")
+ .parameter("p2", "v2")
+ .build();
+ System.out.println(meta1);
+
+ Meta meta2 = MetaBuilder.bytes()
+ .name("build2")
+ .dataSource("datasource1")
+ .build();
+ System.out.println(meta2);
+
+ Meta meta3 = MetaBuilder.bytes()
+ .dataSource("datasource1")
+ .build();
+ System.out.println(meta3);
+
+ Meta meta4 = MetaBuilder.bytes()
+ .build();
+ System.out.println(meta4);
+
+ Meta meta = new MetaBase(Type.BYTES, "abc", 1, "dataSource", null);
+ System.out.println(meta);
+
+ System.out.println(Meta.BYTES_META);
+ }
+
+
+ @Test
+ public void array(){
+ System.out.println("================ARRAY================");
+ Meta meta1 = MetaBuilder.array(Meta.STRING_META)
+ .name("build1")
+ .version(100)
+ .dataSource("datasource1")
+ .parameter("p1","v1")
+ .parameter("p2", "v2")
+ .build();
+ System.out.println(meta1);
+
+ Meta meta2 = MetaBuilder.array(Meta.STRING_META)
+ .name("build2")
+ .dataSource("datasource1")
+ .build();
+ System.out.println(meta2);
+
+ Meta meta3 = MetaBuilder.array(Meta.STRING_META)
+ .dataSource("datasource1")
+ .build();
+ System.out.println(meta3);
+
+ Meta meta4 = MetaBuilder.array(Meta.STRING_META)
+ .build();
+ System.out.println(meta4);
+
+ Meta meta = new MetaArray(Type.ARRAY, "abc", 1, "dataSource",
+ null, Meta.STRING_META);
+ System.out.println(meta);
+
+ try{
+ Meta metaError = new MetaMap(Type.MAP, "abc", 1, "dataSource",
+ null, Meta.STRING_META, Meta.BYTES_META);
+ }catch (Exception e){
+ System.out.println("error for new MataMap but Type is not ARRAY.");
+ }
+ }
+
+
+ @Test
+ public void map(){
+ System.out.println("================MAP================");
+ Meta meta1 = MetaBuilder.map(Meta.STRING_META, Meta.BYTES_META)
+ .name("build1")
+ .version(100)
+ .dataSource("datasource1")
+ .parameter("p1","v1")
+ .parameter("p2", "v2")
+ .build();
+ System.out.println(meta1);
+
+ Meta meta2 = MetaBuilder.map(Meta.STRING_META, Meta.BYTES_META)
+ .name("build2")
+ .dataSource("datasource1")
+ .build();
+ System.out.println(meta2);
+
+ Meta meta3 = MetaBuilder.map(Meta.STRING_META, Meta.BYTES_META)
+ .dataSource("datasource1")
+ .build();
+ System.out.println(meta3);
+
+ Meta meta4 = MetaBuilder.map(Meta.STRING_META, Meta.BYTES_META)
+ .build();
+ System.out.println(meta4);
+
+ Meta meta = new MetaMap(Type.MAP, "abc", 1, "dataSource",
+ null, Meta.STRING_META, Meta.BYTES_META);
+ System.out.println(meta);
+
+ try{
+ Meta metaError = new MetaMap(Type.ARRAY, "abc", 1, "dataSource",
+ null, Meta.STRING_META, Meta.BYTES_META);
+ }catch (Exception e){
+ System.out.println("error for new MataMap but Type is not MAP.");
+ }
+
+ }
+
+
+ @Test
+ public void struct(){
+ System.out.println("================STRUCT================");
+ Meta meta1 = MetaBuilder.struct()
+ .name("build1")
+ .version(100)
+ .dataSource("datasource1")
+ .parameter("p1","v1")
+ .parameter("p2", "v2")
+ .field("f1", Meta.STRING_META)
+ .field("f2", Meta.BOOLEAN_META)
+ .field("f3", Meta.FLOAT32_META)
+ .build();
+ System.out.println(meta1);
+
+ Meta meta2 = MetaBuilder.struct()
+ .name("build2")
+ .dataSource("datasource1")
+ .field("f1", Meta.STRING_META)
+ .field("f2", Meta.BOOLEAN_META)
+ .field("f3", Meta.FLOAT32_META)
+ .build();
+ System.out.println(meta2);
+
+ Meta meta3 = MetaBuilder.struct()
+ .dataSource("datasource1")
+ .field("f1", Meta.STRING_META)
+ .field("f2", Meta.BOOLEAN_META)
+ .field("f3", Meta.FLOAT32_META)
+ .build();
+ System.out.println(meta3);
+
+ Meta meta4 = MetaBuilder.struct()
+ .field("f1", Meta.STRING_META)
+ .field("f2", Meta.BOOLEAN_META)
+ .field("f3", Meta.FLOAT32_META)
+ .build();
+ System.out.println(meta4);
+
+ Meta meta5 = MetaBuilder.struct()
+ .build();
+ System.out.println(meta5);
+
+ int size = 10;
+ List fields = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ Field field = new Field(i, "f"+i, Meta.STRING_META);
+ fields.add(field);
+ }
+ Meta meta = new MetaStruct(Type.STRUCT, "abc", 1, "dataSource",
+ null, fields);
+ System.out.println(meta);
+
+ try{
+ Meta metaError = new MetaStruct(Type.MAP, "abc", 1, "dataSource",
+ null, fields);
+ }catch (Exception e){
+ System.out.println("error for new MetaStruct but Type is not STRUCT.");
+ }
+
+ }
+
+}
diff --git a/connector/src/main/test/io/openmessaging/connector/api/data/SinkDataEntryTest.java b/connector/src/main/test/io/openmessaging/connector/api/data/SinkDataEntryTest.java
new file mode 100644
index 0000000..628e4be
--- /dev/null
+++ b/connector/src/main/test/io/openmessaging/connector/api/data/SinkDataEntryTest.java
@@ -0,0 +1,117 @@
+package io.openmessaging.connector.api.data;
+
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class SinkDataEntryTest {
+
+ /**
+ * simple test for kv data
+ * SET myset asldfjsaldjglas PX 360000
+ */
+ @Test
+ public void testKV() {
+ String command = "SET";
+ String key = "myset";
+ String value = "asldfjsaldjglas";
+ long px = 360000;
+
+ DataEntryBuilder builder =
+ DataEntryBuilder.newDataEntryBuilder()
+ .keyMeta(Meta.STRING_META)
+ .keyData(key)
+ .valueMeta(Meta.STRING_META)
+ .valueData(value)
+ .header("REDIS_COMMAND", command)
+ .header("PX", px);
+
+ SinkDataEntry sinkDataEntry = builder.buildSinkDataEntry(
+ 1500L
+ );
+
+ assertNotNull(sinkDataEntry);
+ assertEquals(sinkDataEntry.getKey().convertToString(), "myset");
+ assertEquals(sinkDataEntry.getValue().convertToString(), "asldfjsaldjglas");
+ assertEquals(sinkDataEntry.getHeaders().findHeader("REDIS_COMMAND").data().convertToString(), "SET");
+ assertTrue(sinkDataEntry.getHeaders().toMap().get("PX").data().convertToLong() == 360000);
+ }
+
+ /**
+ * simple test for table data
+ */
+ @Test
+ public void testTable() {
+ // construct table meta
+ Meta tableMeta = MetaBuilder.struct()
+ .field("id", Meta.INT64_META)
+ .field("name", Meta.STRING_META)
+ .field("age", Meta.INT16_META)
+ .field("score", Meta.INT32_META)
+ .field("isNB", Meta.BOOLEAN_META)
+ .build();
+
+ // construct sourceDataEntry
+ DataEntryBuilder builder =
+ DataEntryBuilder.newDataEntryBuilder(tableMeta)
+ .valueData("id", 1L)
+ .valueData("name", "小红")
+ .valueData("age", (short)17)
+ .valueData("score", 99)
+ .valueData("isNB", true);
+
+ SinkDataEntry sinkDataEntry = builder.buildSinkDataEntry(1500L);
+
+ //System.out.println(sinkDataEntry);
+ assertNotNull(sinkDataEntry);
+ assertTrue(sinkDataEntry.getValue().convertToStruct().getInt64("id") == 1L);
+ assertEquals(sinkDataEntry.getValue().convertToStruct().getString("name"), "小红");
+ assertTrue(sinkDataEntry.getValue().convertToStruct().getInt16("age") == (short)17);
+ assertTrue(sinkDataEntry.getValue().convertToStruct().getInt32("score") == 99);
+ assertTrue(sinkDataEntry.getValue().convertToStruct().getBoolean("isNB"));
+ }
+
+ /**
+ * test data for type of struct
+ *
+ * @return
+ */
+ @Test
+ public void testStruct() {
+ // 1. construct meta
+
+ DataEntryBuilder structDataEntry = new DataEntryBuilder(
+ Meta.STRING_META,
+ MetaBuilder.struct()
+ .name("myStruct")
+ .field("field_string", Meta.STRING_META)
+ .field("field_int32", Meta.INT32_META)
+ .parameter("parameter", "sadfsadf")
+ .build()
+ );
+
+ // 2. construct data
+
+ structDataEntry
+ .queue("last_queue")
+ .queueId(1)
+ .timestamp(System.currentTimeMillis())
+ .entryType(EntryType.UPDATE)
+ .keyData("schema_data_lalala")
+ .valueData("field_string", "nihao")
+ .valueData("field_int32", 321)
+ .header("int_header", 1)
+ ;
+
+ // 3. construct dataEntry
+
+ SinkDataEntry sinkDataEntry = structDataEntry.buildSinkDataEntry(1500L);
+
+ //System.out.println(sinkDataEntry);
+ assertNotNull(sinkDataEntry);
+ }
+
+}
diff --git a/connector/src/main/test/io/openmessaging/connector/api/data/SourceDataEntryTest.java b/connector/src/main/test/io/openmessaging/connector/api/data/SourceDataEntryTest.java
new file mode 100644
index 0000000..5f3eb7b
--- /dev/null
+++ b/connector/src/main/test/io/openmessaging/connector/api/data/SourceDataEntryTest.java
@@ -0,0 +1,128 @@
+package io.openmessaging.connector.api.data;
+
+
+import java.nio.ByteBuffer;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class SourceDataEntryTest {
+
+ /**
+ * simple test for kv data
+ * SET myset asldfjsaldjglas PX 360000
+ */
+ @Test
+ public void testKV(){
+ String command = "SET";
+ String key = "myset";
+ String value = "asldfjsaldjglas";
+ long px = 360000;
+
+
+ DataEntryBuilder builder =
+ DataEntryBuilder.newDataEntryBuilder()
+ .keyMeta(Meta.STRING_META)
+ .keyData(key)
+ .valueMeta(Meta.STRING_META)
+ .valueData(value)
+ .header("REDIS_COMMAND", command)
+ .header("PX", px);
+
+ SourceDataEntry sourceDataEntry = builder.buildSourceDataEntry(
+ ByteBuffer.wrap("partition1".getBytes()),
+ ByteBuffer.wrap("1098".getBytes())
+ );
+
+ assertNotNull(sourceDataEntry);
+ assertEquals(sourceDataEntry.getKey().convertToString(), "myset");
+ assertEquals(sourceDataEntry.getValue().convertToString(), "asldfjsaldjglas");
+ assertEquals(sourceDataEntry.getHeaders().findHeader("REDIS_COMMAND").data().convertToString(), "SET");
+ assertTrue(sourceDataEntry.getHeaders().toMap().get("PX").data().convertToLong() == 360000);
+ //System.out.println(sourceDataEntry);
+ }
+
+ /**
+ * simple test for table data
+ */
+ @Test
+ public void testTable(){
+ // construct table meta
+ Meta tableMeta = MetaBuilder.struct()
+ .field("id", Meta.INT64_META)
+ .field("name", Meta.STRING_META)
+ .field("age", Meta.INT16_META)
+ .field("score", Meta.INT32_META)
+ .field("isNB", Meta.BOOLEAN_META)
+ .build();
+
+ // construct sourceDataEntry
+ DataEntryBuilder builder =
+ DataEntryBuilder.newDataEntryBuilder(tableMeta)
+ .valueData("id", 1L)
+ .valueData("name", "小红")
+ .valueData("age", (short)17)
+ .valueData("score", 99)
+ .valueData("isNB", true);
+
+ SourceDataEntry sourceDataEntry = builder.buildSourceDataEntry(
+ ByteBuffer.wrap("partition1".getBytes()),
+ ByteBuffer.wrap("1098".getBytes())
+ );
+
+ assertNotNull(sourceDataEntry);
+ assertNotNull(sourceDataEntry);
+ assertTrue(sourceDataEntry.getValue().convertToStruct().getInt64("id") == 1L);
+ assertEquals(sourceDataEntry.getValue().convertToStruct().getString("name"), "小红");
+ assertTrue(sourceDataEntry.getValue().convertToStruct().getInt16("age") == (short)17);
+ assertTrue(sourceDataEntry.getValue().convertToStruct().getInt32("score") == 99);
+ assertTrue(sourceDataEntry.getValue().convertToStruct().getBoolean("isNB"));
+ //System.out.println(sourceDataEntry);
+ }
+
+ /**
+ * test data for type of struct
+ *
+ * @return
+ */
+ @Test
+ public void testStruct(){
+ // 1. construct meta
+
+ DataEntryBuilder structDataEntry = new DataEntryBuilder(
+ Meta.STRING_META,
+ MetaBuilder.struct()
+ .name("myStruct")
+ .field("field_string", Meta.STRING_META)
+ .field("field_int32", Meta.INT32_META)
+ .parameter("parameter", "sadfsadf")
+ .build()
+ );
+
+ // 2. construct data
+
+ structDataEntry
+ .queue("last_queue")
+ .queueId(1)
+ .timestamp(System.currentTimeMillis())
+ .entryType(EntryType.UPDATE)
+ .keyData("schema_data_lalala")
+ .valueData("field_string", "nihao")
+ .valueData("field_int32", 321)
+ .header("int_header", 1)
+ ;
+
+ // 3. construct dataEntry
+
+ SourceDataEntry sourceDataEntry = structDataEntry.buildSourceDataEntry(null, null);
+
+ assertNotNull(sourceDataEntry);
+ //System.out.println(sourceDataEntry);
+ }
+
+
+
+}
diff --git a/pom.xml b/pom.xml
index 9559e01..b6bd1b8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -138,6 +138,12 @@
openmessaging-api
0.3.1-alpha
+
+ junit
+ junit
+ 4.12
+ test
+
\ No newline at end of file