Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(HStore): bypass server for HStore writing (BETA) #588

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package org.apache.hugegraph.serializer.direct;

import org.apache.hugegraph.driver.HugeClient;
import org.apache.hugegraph.serializer.direct.struct.HugeType;
import org.apache.hugegraph.serializer.direct.util.BytesBuffer;
import org.apache.hugegraph.serializer.direct.util.GraphSchema;
import org.apache.hugegraph.serializer.direct.util.IdGenerator;
import org.apache.hugegraph.structure.GraphElement;
import org.apache.hugegraph.structure.graph.Edge;
import org.apache.hugegraph.structure.schema.PropertyKey;

import java.util.Map;

/**
* Directly implement BinarySerializer to generate point-edge Key-Value
*/
public class HStoreSerializer {

private HugeClient client;
private GraphSchema graphSchema;

public HStoreSerializer(HugeClient client) {
this.client = client;
this.graphSchema = new GraphSchema(client);
}

Check warning on line 42 in hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HStoreSerializer.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HStoreSerializer.java#L39-L42

Added lines #L39 - L42 were not covered by tests

public byte[] getKeyBytes(GraphElement e) {
byte[] array = null;

Check warning on line 45 in hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HStoreSerializer.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HStoreSerializer.java#L45

Added line #L45 was not covered by tests
if (e.type() == "vertex" && e.id() != null) {
BytesBuffer buffer = BytesBuffer.allocate(2 + 1 + e.id().toString().length());
buffer.writeId(IdGenerator.of(e.id()));
array = buffer.bytes();

Check warning on line 49 in hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HStoreSerializer.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HStoreSerializer.java#L47-L49

Added lines #L47 - L49 were not covered by tests
} else if (e.type() == "edge") {
BytesBuffer buffer = BytesBuffer.allocate(BytesBuffer.BUF_EDGE_ID);
Edge edge = (Edge)e;
buffer.writeId(IdGenerator.of(edge.sourceId()));
buffer.write(HugeType.EDGE_OUT.code());
buffer.writeId(IdGenerator.of(graphSchema.getEdgeLabel(e.label()).id()));
buffer.writeStringWithEnding("");
buffer.writeId(IdGenerator.of(edge.targetId()));
array = buffer.bytes();

Check warning on line 58 in hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HStoreSerializer.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HStoreSerializer.java#L51-L58

Added lines #L51 - L58 were not covered by tests
}
return array;

Check warning on line 60 in hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HStoreSerializer.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HStoreSerializer.java#L60

Added line #L60 was not covered by tests
}

public byte[] getKeyBytesSwitchDirection(GraphElement e) {
byte[] array = null;
BytesBuffer buffer = BytesBuffer.allocate(BytesBuffer.BUF_EDGE_ID);
Edge edge = (Edge)e;
buffer.writeId(IdGenerator.of(edge.targetId()));
buffer.write(HugeType.EDGE_IN.code());
buffer.writeId(IdGenerator.of(graphSchema.getEdgeLabel(e.label()).id()));
buffer.writeStringWithEnding("");
buffer.writeId(IdGenerator.of(edge.sourceId()));
array = buffer.bytes();
return array;

Check warning on line 73 in hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HStoreSerializer.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HStoreSerializer.java#L64-L73

Added lines #L64 - L73 were not covered by tests
}

public byte[] getValueBytes(GraphElement e) {
byte[] array = null;

Check warning on line 77 in hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HStoreSerializer.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HStoreSerializer.java#L77

Added line #L77 was not covered by tests
if (e.type() == "vertex") {
int propsCount = e.properties().size();
BytesBuffer buffer = BytesBuffer.allocate(8 + 16 * propsCount);
buffer.writeId(IdGenerator.of(graphSchema.getVertexLabel(e.label()).id()));
buffer.writeVInt(propsCount);

Check warning on line 82 in hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HStoreSerializer.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HStoreSerializer.java#L79-L82

Added lines #L79 - L82 were not covered by tests
for (Map.Entry<String, Object> entry : e.properties().entrySet()) {
PropertyKey propertyKey = graphSchema.getPropertyKey(entry.getKey());
buffer.writeVInt(propertyKey.id().intValue());
buffer.writeProperty(propertyKey.dataType(),entry.getValue());
}
array = buffer.bytes();

Check warning on line 88 in hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HStoreSerializer.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HStoreSerializer.java#L84-L88

Added lines #L84 - L88 were not covered by tests
} else if (e.type() == "edge") {
int propsCount = e.properties().size();
BytesBuffer buffer = BytesBuffer.allocate(4 + 16 * propsCount);
buffer.writeVInt(propsCount);

Check warning on line 92 in hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HStoreSerializer.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HStoreSerializer.java#L90-L92

Added lines #L90 - L92 were not covered by tests
for (Map.Entry<String, Object> entry : e.properties().entrySet()) {
PropertyKey propertyKey = graphSchema.getPropertyKey(entry.getKey());
buffer.writeVInt(propertyKey.id().intValue());
buffer.writeProperty(propertyKey.dataType(),entry.getValue());
}
array = buffer.bytes();

Check warning on line 98 in hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HStoreSerializer.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HStoreSerializer.java#L94-L98

Added lines #L94 - L98 were not covered by tests
}
return array;

Check warning on line 100 in hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HStoreSerializer.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HStoreSerializer.java#L100

Added line #L100 was not covered by tests
}

public byte[] getOwnerKeyBytes(GraphElement e) {
byte[] array = null;

Check warning on line 104 in hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HStoreSerializer.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HStoreSerializer.java#L104

Added line #L104 was not covered by tests
if (e.type() == "vertex" && e.id() != null) {
array = IdGenerator.of(e.id()).asBytes();

Check warning on line 106 in hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HStoreSerializer.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HStoreSerializer.java#L106

Added line #L106 was not covered by tests
} else if (e.type() == "edge") {
Edge edge = (Edge)e;
array = IdGenerator.of(edge.sourceId()).asBytes();

Check warning on line 109 in hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HStoreSerializer.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HStoreSerializer.java#L108-L109

Added lines #L108 - L109 were not covered by tests
}
return array;

Check warning on line 111 in hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HStoreSerializer.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HStoreSerializer.java#L111

Added line #L111 was not covered by tests
}

public void close() {
this.client.close();
}

Check warning on line 116 in hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HStoreSerializer.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/HStoreSerializer.java#L115-L116

Added lines #L115 - L116 were not covered by tests

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@
import org.apache.hugegraph.driver.HugeClient;
import org.apache.hugegraph.driver.SchemaManager;
import org.apache.hugegraph.serializer.direct.HBaseSerializer;
import org.apache.hugegraph.serializer.direct.HStoreSerializer;
import org.apache.hugegraph.serializer.direct.RocksDBSerializer;
import org.apache.hugegraph.structure.graph.Edge;
import org.apache.hugegraph.structure.graph.Vertex;

/**
* This class is a demo for rocksdb/HBase put(rowkey, values) which use Client-Side's graph struct
* This class is a demo for Rocksdb/HBase/HStore put(rowkey, values) which use Client-Side's graph struct
* And we don't need to construct the graph element, just use it and transfer them to bytes array
* instead of json format
*/
Expand All @@ -39,7 +40,8 @@
static HugeClient client;
boolean bypassServer = true;
RocksDBSerializer ser;
HBaseSerializer hBaseSer;
HStoreSerializer hStoreSerializer;
HBaseSerializer HBaseSer;

public static void main(String[] args) {
BytesDemo ins = new BytesDemo();
Expand Down Expand Up @@ -96,7 +98,7 @@
.ifNotExist()
.create();

hBaseSer = new HBaseSerializer(client, vertexLogicPartitions, edgeLogicPartitions);
HBaseSer = new HBaseSerializer(client, vertexLogicPartitions, edgeLogicPartitions);

Check warning on line 101 in hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/reuse/BytesDemo.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/reuse/BytesDemo.java#L101

Added line #L101 was not covered by tests
writeGraphElements();

client.close();
Expand Down Expand Up @@ -146,14 +148,14 @@
* */
void writeDirectly(List<Vertex> vertices, List<Edge> edges) {
for (Vertex vertex : vertices) {
byte[] rowkey = hBaseSer.getKeyBytes(vertex);
byte[] values = hBaseSer.getValueBytes(vertex);
byte[] rowkey = HBaseSer.getKeyBytes(vertex);
byte[] values = HBaseSer.getValueBytes(vertex);

Check warning on line 152 in hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/reuse/BytesDemo.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/reuse/BytesDemo.java#L151-L152

Added lines #L151 - L152 were not covered by tests
sendRpcToHBase("vertex", rowkey, values);
}

for (Edge edge : edges) {
byte[] rowkey = hBaseSer.getKeyBytes(edge);
byte[] values = hBaseSer.getValueBytes(edge);
byte[] rowkey = HBaseSer.getKeyBytes(edge);
byte[] values = HBaseSer.getValueBytes(edge);

Check warning on line 158 in hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/reuse/BytesDemo.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/reuse/BytesDemo.java#L157-L158

Added lines #L157 - L158 were not covered by tests
sendRpcToHBase("edge", rowkey, values);
}
}
Expand Down Expand Up @@ -183,8 +185,18 @@
return flag;
}

boolean sendRpcToHStore(String type, byte[] rowkey, byte[] values) {
boolean flag = false;

Check warning on line 189 in hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/reuse/BytesDemo.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/reuse/BytesDemo.java#L189

Added line #L189 was not covered by tests
try {
flag = put(type, rowkey, values);
} catch (IOException e) {
e.printStackTrace();
}
return flag;

Check warning on line 195 in hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/reuse/BytesDemo.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-client/src/main/java/org/apache/hugegraph/serializer/direct/reuse/BytesDemo.java#L191-L195

Added lines #L191 - L195 were not covered by tests
}

boolean put(String type, byte[] rowkey, byte[] values) throws IOException {
// TODO: put to HBase
// TODO: put to HBase/HStore
return true;
}
}
Loading