-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce protobuf serialization/deserialization support for transport requests/responses #9737
Introduce protobuf serialization/deserialization support for transport requests/responses #9737
Conversation
Compatibility status:Checks if related components are compatible with change e6885cb Incompatible componentsIncompatible components: [https://github.com/opensearch-project/asynchronous-search.git] Skipped componentsCompatible componentsCompatible components: [https://github.com/opensearch-project/security.git, https://github.com/opensearch-project/alerting.git, https://github.com/opensearch-project/index-management.git, https://github.com/opensearch-project/anomaly-detection.git, https://github.com/opensearch-project/sql.git, https://github.com/opensearch-project/job-scheduler.git, https://github.com/opensearch-project/observability.git, https://github.com/opensearch-project/common-utils.git, https://github.com/opensearch-project/k-nn.git, https://github.com/opensearch-project/reporting.git, https://github.com/opensearch-project/cross-cluster-replication.git, https://github.com/opensearch-project/security-analytics.git, https://github.com/opensearch-project/custom-codecs.git, https://github.com/opensearch-project/performance-analyzer.git, https://github.com/opensearch-project/ml-commons.git, https://github.com/opensearch-project/performance-analyzer-rca.git, https://github.com/opensearch-project/opensearch-oci-object-storage.git, https://github.com/opensearch-project/geospatial.git, https://github.com/opensearch-project/notifications.git, https://github.com/opensearch-project/neural-search.git] |
Gradle Check (Jenkins) Run Completed with:
|
I like it. A couple of ideas/comments.
|
@dbwiddis will have lots of opinions here ;) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great work so far. Some comments/questions in-line.
/** | ||
* Implementers can be written to a {@linkplain StreamOutput} and read from a {@linkplain StreamInput}. This allows them to be "thrown | ||
* across the wire" using OpenSearch's internal protocol. If the implementer also implements equals and hashCode then a copy made by | ||
* serializing and deserializing must be equal and have the same hashCode. It isn't required that such a copy be entirely unchanged. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You missed swapping StreamOut(In)put to Out(In)putStream.
Also I'm really confused about the equals/hashCode bit (copied from Writeable
) in the context of a protobuf byte stream (see next comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is my bad! I will fix the comments.
libs/core/src/main/java/org/opensearch/core/common/io/stream/ProtobufWriteable.java
Show resolved
Hide resolved
@@ -70,6 +76,7 @@ public TaskId(String nodeId, long id) { | |||
private TaskId() { | |||
nodeId = ""; | |||
id = -1; | |||
taskIdProto = TaskIdProto.TaskId.newBuilder().setNodeId(nodeId).setId(id).build(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This makes little sense to build a whole protobuf object to contain what we currently encode as a single 0x00
byte. We're assigning this to the EMPTY_TASK_ID
constant, can this object just be left null, and if we encounter a 0 byte when we get to this object, we don't need to deserialize a proto object.
Related, TaskId is one of several informational bits OpenSearch communicates that are separate from the writeable content generated by TransportRequest implementations. We also have thread context request and response headers, features, and the action name. It seems we lose a lot by serializing all of these individually vs. combining all "header" content in a single protobuf byte stream. If we are going to keep them as separate pieces, we probably should continue to keep the 1-byte representations they currently have when they are empty (e.g., empty map, empty list, empty string all "write" a single byte (Vint) 0).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See my above comment of how the outbound message will also be a proto message which will contain header, thread context request, response headers, features and action name within a single proto object.
} else { | ||
nodeId = ""; | ||
id = -1L; | ||
taskIdProto = TaskIdProto.TaskId.newBuilder().setNodeId(nodeId).setId(id).build(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See above on serializing "empty" information vs. never even looking at this object.
* {@linkplain byte[]} so we can return the {@link #EMPTY_TASK_ID} without allocating. | ||
*/ | ||
public static TaskId readFromBytes(byte[] in) throws IOException { | ||
TaskIdProto.TaskId taskIdProto = TaskIdProto.TaskId.parseFrom(in); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This approach forces us to always parse a bytestream for the "most common" empty task ID.
Can we have a single byte boolean (basically the "OptionalWriteable" implementation) here which directly shortcuts to EMPTY_TASK_ID
instead of always deserializing some number of bytes > 1?
@@ -116,6 +142,11 @@ public void writeTo(StreamOutput out) throws IOException { | |||
out.writeLong(id); | |||
} | |||
|
|||
@Override | |||
public void writeTo(OutputStream out) throws IOException { | |||
out.write(this.taskIdProto.toByteArray()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
May want to consider special-casing the empty-node-id = 0 case here if you implement it per my earlier comments.
|
||
message TaskId { | ||
string nodeId = 1; | ||
int64 id = 2; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you choose to serialize the task ID instead of using the "0" byte, consider making these optional
parts of the message with the defaults from the empty string/-1, to accomplish the same behavior but permit serializing "nothing" rather than a string and int64 (I'm curious what a "nothing" byte stream looks like.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That makes sense! We can make the nodeId
optional in the proto message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can make the nodeId optional in the proto message.
Note this is an alternative to the "0" shortcut which I think is a better option. Is it allowed to have all the values optional? What is the byte size of a protobuffer with a missing nodeId and a single -1 long, which would be the constant we'd store and use as an alternative to "0"?
Thank you @dbwiddis! I have added a few responses, LMK what you think. I will keep refining the PR :) |
Thank you @dblock for your comments!
LMK what you think :) |
Gradle Check (Jenkins) Run Completed with:
|
7f6a1b6
to
fb1edf8
Compare
Compatibility status:Checks if related components are compatible with change 7f6a1b6 Incompatible componentsIncompatible components: [https://github.com/opensearch-project/security.git, https://github.com/opensearch-project/security-analytics.git, https://github.com/opensearch-project/asynchronous-search.git] Skipped componentsCompatible componentsCompatible components: [https://github.com/opensearch-project/geospatial.git, https://github.com/opensearch-project/notifications.git, https://github.com/opensearch-project/neural-search.git, https://github.com/opensearch-project/index-management.git, https://github.com/opensearch-project/sql.git, https://github.com/opensearch-project/observability.git, https://github.com/opensearch-project/job-scheduler.git, https://github.com/opensearch-project/opensearch-oci-object-storage.git, https://github.com/opensearch-project/k-nn.git, https://github.com/opensearch-project/cross-cluster-replication.git, https://github.com/opensearch-project/alerting.git, https://github.com/opensearch-project/anomaly-detection.git, https://github.com/opensearch-project/ml-commons.git, https://github.com/opensearch-project/performance-analyzer.git, https://github.com/opensearch-project/performance-analyzer-rca.git, https://github.com/opensearch-project/common-utils.git, https://github.com/opensearch-project/reporting.git] |
Compatibility status:Checks if related components are compatible with change fb1edf8 Incompatible componentsIncompatible components: [https://github.com/opensearch-project/security.git, https://github.com/opensearch-project/asynchronous-search.git, https://github.com/opensearch-project/security-analytics.git] Skipped componentsCompatible componentsCompatible components: [https://github.com/opensearch-project/alerting.git, https://github.com/opensearch-project/index-management.git, https://github.com/opensearch-project/sql.git, https://github.com/opensearch-project/anomaly-detection.git, https://github.com/opensearch-project/job-scheduler.git, https://github.com/opensearch-project/observability.git, https://github.com/opensearch-project/common-utils.git, https://github.com/opensearch-project/k-nn.git, https://github.com/opensearch-project/reporting.git, https://github.com/opensearch-project/cross-cluster-replication.git, https://github.com/opensearch-project/geospatial.git, https://github.com/opensearch-project/performance-analyzer.git, https://github.com/opensearch-project/notifications.git, https://github.com/opensearch-project/ml-commons.git, https://github.com/opensearch-project/performance-analyzer-rca.git, https://github.com/opensearch-project/neural-search.git, https://github.com/opensearch-project/opensearch-oci-object-storage.git] |
Gradle Check (Jenkins) Run Completed with:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the clarifications. Down to some summary comments.
libs/core/src/main/java/org/opensearch/core/common/io/stream/ProtobufWriteable.java
Show resolved
Hide resolved
|
||
message TaskId { | ||
string nodeId = 1; | ||
int64 id = 2; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can make the nodeId optional in the proto message.
Note this is an alternative to the "0" shortcut which I think is a better option. Is it allowed to have all the values optional? What is the byte size of a protobuffer with a missing nodeId and a single -1 long, which would be the constant we'd store and use as an alternative to "0"?
Signed-off-by: Vacha Shah <[email protected]>
Signed-off-by: Vacha Shah <[email protected]>
…ments Signed-off-by: Vacha Shah <[email protected]>
Signed-off-by: Vacha Shah <[email protected]>
Signed-off-by: Vacha Shah <[email protected]>
fb1edf8
to
e6885cb
Compare
@dbwiddis I updated the code to represent the empty taskId using an empty byte array for protobuf taskId as discussed. Let me know what you think. |
Gradle Check (Jenkins) Run Completed with:
|
Codecov Report
@@ Coverage Diff @@
## main #9737 +/- ##
============================================
+ Coverage 71.10% 71.16% +0.06%
- Complexity 58070 58136 +66
============================================
Files 4824 4824
Lines 273918 273949 +31
Branches 39918 39919 +1
============================================
+ Hits 194768 194963 +195
+ Misses 62802 62660 -142
+ Partials 16348 16326 -22
|
*/ | ||
public static TaskId readFromBytes(byte[] in) throws IOException { | ||
TaskIdProto.TaskId taskIdProto = TaskIdProto.TaskId.parseFrom(in); | ||
if (!taskIdProto.hasNodeId()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you help me understand this line of code as I don't see where it's defined in Java, so it may be a generated class.
My thought was that we would read/write a byte array with a VInt length before it, and the array itself gets parsed with protobuf. If the length is 0 then we skip reading anything and return empty.
If the entire TaskId was optional (see optionalWriteable) this would essentially be the same thing.
But this looks like we are still reading some number of bytes and just determining whether the nodeId index is present; we'd still have 2 bytes for the int even if 0 bytes for the string.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The hasNodeId
method comes from the generated class from protobuf which checks if the optional field nodeId is set or not. For line 132, I kept the same logic as readFromStream
where if the nodeId is present, it creates a TaskId out of it else it just returns an empty taskId.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think from what I understand, we would be able to achieve the same with keeping both nodeId and id both are optional in the proto message, so similar to the readFromStream
where it reads the string first as the nodeId and determines if empty taskId should be returned. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems reasonable. Sorry to be so into the details here, but my main concern has been replacing a single-byte solution with anything more complex. If making both nodeId and id optional results in a 1-byte buffer that works great.
I'm still not clear on how protobuf communicates the total bytes, though. Looking at #6960 I see the pattern parseFrom(in.readByteArray())
which implies a length VInt followed by that number of bytes.
I don't see any equivalent here, or how if everything is optional it provides any signal to the reader that there's nothing to read. Does protobuf assume reading to end-of-stream? In that case, reading TaskId would spill over into the TransportMessage. Or is there an end-of-stream marker?
TLDR: the savings from fewer bytes on the variable length long need to outweigh the extra bytes for field indices and length bytes and any other overhead to make serializing TaskId worth it, and I'm not sure of those numbers here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Totally makes sense! I was thinking about this and we are only writing the bytes when the nodeId is present (https://github.com/opensearch-project/OpenSearch/pull/9737/files#diff-440ec1fd7eb50816eb2d1ec1eb201229b810efcf1939a9953afb0025ffca2f32R154) so I am thinking when we read the bytes, the byte array will have both nodeId and id or nothing at all. Would that not be similar to the current implementation? I might be totally misunderstanding it so would love to know your thoughts.
For a proto message structure, protobuf can deserialize it without any problem. I get your concern that currently only TaskId is the proto message here while TransportMessage is not but that implementation will come in the upcoming PRs where ClusterStateRequest (for example) which is a type of TransportMessage will also be a proto and all of them will be part of a proto message as the OutboundMessage which is sent over the wire. So, currently TaskId looks like a separate proto in the middle of a stream but ultimately it will be part of the OutboundMessageProto with all the bytes and protobuf will deserialize it into various parts.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Totally makes sense! I was thinking about this and we are only writing the bytes when the nodeId is present (https://github.com/opensearch-project/OpenSearch/pull/9737/files#diff-440ec1fd7eb50816eb2d1ec1eb201229b810efcf1939a9953afb0025ffca2f32R154) so I am thinking when we read the bytes, the byte array will have both nodeId and id or nothing at all. Would that not be similar to the current implementation? I might be totally misunderstanding it so would love to know your thoughts.
The issue we're facing here here is that the TaskId is in the middle of the byte stream. StreamInput
needs to know where TaskID starts and ends so it can move on to read the bytes for the TransportRequest
.
Take for example one of the simplest TransportRequests, the initial TCP Handshake Request:
OpenSearch/server/src/main/java/org/opensearch/transport/TransportHandshaker.java
Lines 192 to 198 in 5b864c0
static final class HandshakeRequest extends TransportRequest { | |
private final Version version; | |
HandshakeRequest(Version version) { | |
this.version = version; | |
} |
Here's what the byte stream looks like (non-protobuf):
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 45 53 00 00 00 31 00 00 00 00 00 00 00 08 08 08 |ES...1..........|
|00000010| 20 0b 83 00 00 00 1a 00 00 00 16 69 6e 74 65 72 | ..........inter|
|00000020| 6e 61 6c 3a 74 63 70 2f 68 61 6e 64 73 68 61 6b |nal:tcp/handshak|
|00000030| 65 00 04 a3 8e b7 41 |e.....A |
+--------+-------------------------------------------------+----------------+
The first 49 bytes (all the way up to the 65
(e) starting the last row) are all part of the OutboundMessageRequest
header. So we are receiving the last six bytes that we need to parse into a TransportMessage
.
The last 5 bytes (04 a3 8e b7 41
) are the Writeable
payload for that HandshakeRequest
class linked above (see readBytesReference()
, it's a length byte followed by the 4 bytes encoding the version).
That single 00
byte in between the headers and payload is the Task ID. It's the length of the ID string (0) which shortcuts the 8-byte id. This is good; when writing if it's "Empty" we just write a 0 byte; when reading when we see a 0 byte we just skip it.
If you "write nothing" how will Protobuf know not to parse it? There has to be at lease some way to indicate where the TaskID protobuf bytes start and end before we go on to read those last bytes encoding the message.
By the time you get to where hasNodeId()
works, you've already tried to parse the message into a protobuf, but you don't know where it ends and the "version" in this handshake begins.
I think you need to do something similar to this implementation:
OpenSearch/server/src/main/java/org/opensearch/extensions/rest/RegisterRestActionsRequest.java
Lines 38 to 47 in 5b864c0
public RegisterRestActionsRequest(StreamInput in) throws IOException { | |
super(in); | |
request = RegisterRestActions.parseFrom(in.readByteArray()); | |
} | |
@Override | |
public void writeTo(StreamOutput out) throws IOException { | |
super.writeTo(out); | |
out.writeByteArray(request.toByteArray()); | |
} |
This reads/writes the protobuf bytes wrapped inside a ByteArray
. You can read the byte array and if it is StreamInput.EMPTY_BYTE_ARRAY
then you can just return the EMPTY_TASK_ID
. Otherwise, you take the byte array and call parseFrom()
on it.
This preserves the 1-byte "empty" behavior we currently have and gives us a slightly shorter (due to variable length task id long) set of bytes (plus the length byte) in the case that we actually have a Task ID.
* | ||
* @opensearch.internal | ||
*/ | ||
public interface ProtobufWriteable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we have this writeable extend Writeable
interface?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with @Bukhtawar. These changes should be least intrusive and should use existing interfaces. It would enable us to experiment with other protocols as well.
|
||
/** | ||
* Task id that consists of node id and id of the task on the node | ||
* | ||
* @opensearch.api | ||
*/ | ||
@PublicApi(since = "1.0.0") | ||
public final class TaskId implements Writeable { | ||
public final class TaskId implements Writeable, ProtobufWriteable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks confusing, why do we need two interfaces?
I discussed with Dan and I am closing this PR in favor of a new one with starting top down from the POC #9097 as it will make better sense as a PR and give proper context. We also discussed that we can leave TaskId alone for now and focus on the larger request/response classes to get immediate benefits. |
Description
This is the first PR for adding support for protobuf serialization/deserialization coming from the POC and draft PR #9097. The idea is to add incremental changes (bottom-up) for the support to use protobuf for cat nodes API. In this PR, the following changes are included:
ProtobufWriteable
- A version of Writeable where the reader reads from a byte array and writer writes to a java OutputStream. This is needed to serialize and deserialize to and from proto messages. There will be other changes in the OutboundHandler and InboundHandler to transfer bytes related to proto messages over the wire.TransportRequest
andTransportResponse
support ProtobufWriteable.TaskId
is converted into a proto message.TaskIdTests
are moved tolibs/core
module sinceTaskId
has been moved there.Related Issues
#6844
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.