Skip to content

Commit

Permalink
Replace gogo protobuf with vtprotobuf (#81)
Browse files Browse the repository at this point in the history
* Replace gogo protobuf with vtprotobuf

    - Gogo protobuf is incompatible with grpc-go API/v2.
    - Replaced proto-gen-gogo-slick with proto-gen-go-grpc and
    proto-gen-go-vtproto plugins.
    - Modified python code generation script.

* commented Size asserts for vtprotobuf

* Custom codec with vtprotobuf helpers.

    - Third party libraries like pubsub require additional
      vtprotobuf helper functions to (un)marshall the messages
      and transmit on the wire as plain proto messages.

* removed Size asserts on presto column tests.
  • Loading branch information
tardunge authored Sep 17, 2021
1 parent 51560d2 commit b5523c5
Show file tree
Hide file tree
Showing 14 changed files with 8,030 additions and 8,908 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ testdata-*
.env
.envrc
logs/
*.venv
6 changes: 3 additions & 3 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
"files.eol": "\n",
}
{
"files.eol": "\n",
}
8 changes: 4 additions & 4 deletions client/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

setuptools.setup(
name="TalariaClient",
version="0.0.5",
version="0.0.6",
author="Chun Rong Phang",
author_email="crphang@gmail.com",
description="Talaria Client to ingest events to TalariaDB",
Expand All @@ -14,8 +14,8 @@
"Operating System :: OS Independent",
],
install_requires=[
"grpcio~=1.28.1",
"protobuf~=3.11.3"
"grpcio>=1.36.0",
"protobuf~=3.17.3"
],
python_requires='>=3.4',
python_requires='>=3.6',
)
337 changes: 201 additions & 136 deletions client/python/talaria_client/talaria_pb2.py

Large diffs are not rendered by default.

299 changes: 192 additions & 107 deletions client/python/talaria_client/talaria_pb2_grpc.py
Original file line number Diff line number Diff line change
@@ -1,138 +1,223 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc

from . import talaria_pb2 as talaria__pb2


class IngressStub(object):
"""---------------------------------------------------------------------------
Ingress Service
---------------------------------------------------------------------------
Ingress represents a Talaria ingress frontend.
"""
"""---------------------------------------------------------------------------
Ingress Service
---------------------------------------------------------------------------
Ingress represents a Talaria ingress frontend.
"""

def __init__(self, channel):
"""Constructor.
def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.Ingest = channel.unary_unary(
'/talaria.Ingress/Ingest',
request_serializer=talaria__pb2.IngestRequest.SerializeToString,
response_deserializer=talaria__pb2.IngestResponse.FromString,
)
Args:
channel: A grpc.Channel.
"""
self.Ingest = channel.unary_unary(
'/talaria.Ingress/Ingest',
request_serializer=talaria__pb2.IngestRequest.SerializeToString,
response_deserializer=talaria__pb2.IngestResponse.FromString,
)


class IngressServicer(object):
"""---------------------------------------------------------------------------
Ingress Service
---------------------------------------------------------------------------
Ingress represents a Talaria ingress frontend.
"""
"""---------------------------------------------------------------------------
Ingress Service
---------------------------------------------------------------------------
Ingress represents a Talaria ingress frontend.
"""

def Ingest(self, request, context):
# missing associated documentation comment in .proto file
pass
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def Ingest(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')


def add_IngressServicer_to_server(servicer, server):
rpc_method_handlers = {
'Ingest': grpc.unary_unary_rpc_method_handler(
servicer.Ingest,
request_deserializer=talaria__pb2.IngestRequest.FromString,
response_serializer=talaria__pb2.IngestResponse.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'talaria.Ingress', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))

rpc_method_handlers = {
'Ingest': grpc.unary_unary_rpc_method_handler(
servicer.Ingest,
request_deserializer=talaria__pb2.IngestRequest.FromString,
response_serializer=talaria__pb2.IngestResponse.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'talaria.Ingress', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))


# This class is part of an EXPERIMENTAL API.
class Ingress(object):
"""---------------------------------------------------------------------------
Ingress Service
---------------------------------------------------------------------------
Ingress represents a Talaria ingress frontend.
"""

class QueryStub(object):
"""---------------------------------------------------------------------------
Query Service
---------------------------------------------------------------------------
@staticmethod
def Ingest(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/talaria.Ingress/Ingest',
talaria__pb2.IngestRequest.SerializeToString,
talaria__pb2.IngestResponse.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)

Ingress represents a Talaria ingress frontend.
"""

def __init__(self, channel):
"""Constructor.
class QueryStub(object):
"""---------------------------------------------------------------------------
Query Service
---------------------------------------------------------------------------
Args:
channel: A grpc.Channel.
Ingress represents a Talaria ingress frontend.
"""
self.Describe = channel.unary_unary(
'/talaria.Query/Describe',
request_serializer=talaria__pb2.DescribeRequest.SerializeToString,
response_deserializer=talaria__pb2.DescribeResponse.FromString,
)
self.GetSplits = channel.unary_unary(
'/talaria.Query/GetSplits',
request_serializer=talaria__pb2.GetSplitsRequest.SerializeToString,
response_deserializer=talaria__pb2.GetSplitsResponse.FromString,
)
self.GetRows = channel.unary_unary(
'/talaria.Query/GetRows',
request_serializer=talaria__pb2.GetRowsRequest.SerializeToString,
response_deserializer=talaria__pb2.GetRowsResponse.FromString,
)

def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.Describe = channel.unary_unary(
'/talaria.Query/Describe',
request_serializer=talaria__pb2.DescribeRequest.SerializeToString,
response_deserializer=talaria__pb2.DescribeResponse.FromString,
)
self.GetSplits = channel.unary_unary(
'/talaria.Query/GetSplits',
request_serializer=talaria__pb2.GetSplitsRequest.SerializeToString,
response_deserializer=talaria__pb2.GetSplitsResponse.FromString,
)
self.GetRows = channel.unary_unary(
'/talaria.Query/GetRows',
request_serializer=talaria__pb2.GetRowsRequest.SerializeToString,
response_deserializer=talaria__pb2.GetRowsResponse.FromString,
)

class QueryServicer(object):
"""---------------------------------------------------------------------------
Query Service
---------------------------------------------------------------------------

Ingress represents a Talaria ingress frontend.
"""
class QueryServicer(object):
"""---------------------------------------------------------------------------
Query Service
---------------------------------------------------------------------------
def Describe(self, request, context):
"""Describe returns the list of schema/table combinations and the metadata
Ingress represents a Talaria ingress frontend.
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def GetSplits(self, request, context):
"""GetSplits returns the list of splits for a particular table/filter combination
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def Describe(self, request, context):
"""Describe returns the list of schema/table combinations and the metadata
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def GetRows(self, request, context):
"""GetRows returns the rows for a particular split
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def GetSplits(self, request, context):
"""GetSplits returns the list of splits for a particular table/filter combination
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def GetRows(self, request, context):
"""GetRows returns the rows for a particular split
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')


def add_QueryServicer_to_server(servicer, server):
rpc_method_handlers = {
'Describe': grpc.unary_unary_rpc_method_handler(
servicer.Describe,
request_deserializer=talaria__pb2.DescribeRequest.FromString,
response_serializer=talaria__pb2.DescribeResponse.SerializeToString,
),
'GetSplits': grpc.unary_unary_rpc_method_handler(
servicer.GetSplits,
request_deserializer=talaria__pb2.GetSplitsRequest.FromString,
response_serializer=talaria__pb2.GetSplitsResponse.SerializeToString,
),
'GetRows': grpc.unary_unary_rpc_method_handler(
servicer.GetRows,
request_deserializer=talaria__pb2.GetRowsRequest.FromString,
response_serializer=talaria__pb2.GetRowsResponse.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'talaria.Query', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))
rpc_method_handlers = {
'Describe': grpc.unary_unary_rpc_method_handler(
servicer.Describe,
request_deserializer=talaria__pb2.DescribeRequest.FromString,
response_serializer=talaria__pb2.DescribeResponse.SerializeToString,
),
'GetSplits': grpc.unary_unary_rpc_method_handler(
servicer.GetSplits,
request_deserializer=talaria__pb2.GetSplitsRequest.FromString,
response_serializer=talaria__pb2.GetSplitsResponse.SerializeToString,
),
'GetRows': grpc.unary_unary_rpc_method_handler(
servicer.GetRows,
request_deserializer=talaria__pb2.GetRowsRequest.FromString,
response_serializer=talaria__pb2.GetRowsResponse.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'talaria.Query', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))


# This class is part of an EXPERIMENTAL API.
class Query(object):
"""---------------------------------------------------------------------------
Query Service
---------------------------------------------------------------------------
Ingress represents a Talaria ingress frontend.
"""

@staticmethod
def Describe(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/talaria.Query/Describe',
talaria__pb2.DescribeRequest.SerializeToString,
talaria__pb2.DescribeResponse.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)

@staticmethod
def GetSplits(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/talaria.Query/GetSplits',
talaria__pb2.GetSplitsRequest.SerializeToString,
talaria__pb2.GetSplitsResponse.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)

@staticmethod
def GetRows(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/talaria.Query/GetRows',
talaria__pb2.GetRowsRequest.SerializeToString,
talaria__pb2.GetRowsResponse.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
Loading

0 comments on commit b5523c5

Please sign in to comment.