Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: refactor and add unit tests for AppendRowsStream and AppendRowsFuture #889

Merged
merged 1 commit into from
Feb 12, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
369 changes: 239 additions & 130 deletions tests/unit/test_writer_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import queue
import time
import unittest
from unittest import mock

from google.api_core import exceptions
Expand All @@ -27,134 +29,241 @@
REQUEST_TEMPLATE = gapic_types.AppendRowsRequest()


@pytest.fixture(scope="module")
def module_under_test():
from google.cloud.bigquery_storage_v1 import writer

return writer


def test_constructor_and_default_state(module_under_test):
mock_client = mock.create_autospec(big_query_write.BigQueryWriteClient)
manager = module_under_test.AppendRowsStream(mock_client, REQUEST_TEMPLATE)

# Public state
assert manager.is_active is False

# Private state
assert manager._client is mock_client


def test_close_before_open(module_under_test):
mock_client = mock.create_autospec(big_query_write.BigQueryWriteClient)
manager = module_under_test.AppendRowsStream(mock_client, REQUEST_TEMPLATE)
manager.close()
with pytest.raises(bqstorage_exceptions.StreamClosedError):
manager.send(object())


@mock.patch("google.api_core.bidi.BidiRpc", autospec=True)
@mock.patch("google.api_core.bidi.BackgroundConsumer", autospec=True)
def test_initial_send(background_consumer, bidi_rpc, module_under_test):
mock_client = mock.create_autospec(big_query_write.BigQueryWriteClient)
request_template = gapic_types.AppendRowsRequest(
write_stream="stream-name-from-REQUEST_TEMPLATE",
offset=0,
proto_rows=gapic_types.AppendRowsRequest.ProtoData(
writer_schema=gapic_types.ProtoSchema(
proto_descriptor=descriptor_pb2.DescriptorProto()
)
),
)
manager = module_under_test.AppendRowsStream(mock_client, request_template)
type(bidi_rpc.return_value).is_active = mock.PropertyMock(
return_value=(False, True)
)
proto_rows = gapic_types.ProtoRows()
proto_rows.serialized_rows.append(b"hello, world")
initial_request = gapic_types.AppendRowsRequest(
write_stream="this-is-a-stream-resource-path",
offset=42,
proto_rows=gapic_types.AppendRowsRequest.ProtoData(rows=proto_rows),
)

future = manager.send(initial_request)

assert isinstance(future, module_under_test.AppendRowsFuture)
background_consumer.assert_called_once_with(manager._rpc, manager._on_response)
background_consumer.return_value.start.assert_called_once()
assert manager._consumer == background_consumer.return_value

# Make sure the request template and the first request are merged as
# expected. Needs to be especially careful that nested properties such as
# writer_schema and rows are merged as expected.
expected_request = gapic_types.AppendRowsRequest(
write_stream="this-is-a-stream-resource-path",
offset=42,
proto_rows=gapic_types.AppendRowsRequest.ProtoData(
writer_schema=gapic_types.ProtoSchema(
proto_descriptor=descriptor_pb2.DescriptorProto()
class TestAppendRowsStream(unittest.TestCase):
@staticmethod
def _get_target_class():
from google.cloud.bigquery_storage_v1.writer import AppendRowsStream

return AppendRowsStream

@staticmethod
def _make_mock_client():
return mock.create_autospec(big_query_write.BigQueryWriteClient)

def _make_one(self, *args, **kw):
return self._get_target_class()(*args, **kw)

def test_ctor_defaults(self):
mock_client = self._make_mock_client()
stream = self._make_one(mock_client, REQUEST_TEMPLATE)

assert stream._client is mock_client
assert stream._inital_request_template is REQUEST_TEMPLATE
assert stream._closed is False
assert not stream._close_callbacks
assert isinstance(stream._futures_queue, queue.Queue)
assert stream._futures_queue.empty()
assert stream._metadata == ()
assert stream._rpc is None
assert stream._stream_name is None
assert stream._consumer is None

def test_is_active(self):
mock_client = self._make_mock_client()
stream = self._make_one(mock_client, REQUEST_TEMPLATE)

assert stream.is_active is False

with mock.patch("google.api_core.bidi.BackgroundConsumer") as MockConsumer:
MockConsumer.return_value.is_active = True
stream._consumer = MockConsumer()
assert stream.is_active is True

def test_add_close_callback(self):
mock_client = self._make_mock_client()
stream = self._make_one(mock_client, REQUEST_TEMPLATE)

assert not stream._close_callbacks

callbacks = [lambda x: x + i for i in range(3)]
for item in callbacks:
stream.add_close_callback(item)

assert stream._close_callbacks == callbacks

def test_close_before_open(self):
mock_client = self._make_mock_client()
manager = self._make_one(mock_client, REQUEST_TEMPLATE)
manager.close()
with pytest.raises(bqstorage_exceptions.StreamClosedError):
manager.send(object())

@mock.patch("google.api_core.bidi.BidiRpc", autospec=True)
@mock.patch("google.api_core.bidi.BackgroundConsumer", autospec=True)
def test_initial_send(self, background_consumer, bidi_rpc):
from google.cloud.bigquery_storage_v1.writer import AppendRowsFuture

mock_client = self._make_mock_client()
request_template = gapic_types.AppendRowsRequest(
write_stream="stream-name-from-REQUEST_TEMPLATE",
offset=0,
proto_rows=gapic_types.AppendRowsRequest.ProtoData(
writer_schema=gapic_types.ProtoSchema(
proto_descriptor=descriptor_pb2.DescriptorProto()
)
),
rows=proto_rows,
),
trace_id=f"python-writer:{package_version.__version__}",
)
bidi_rpc.assert_called_once_with(
start_rpc=mock_client.append_rows,
initial_request=expected_request,
# Extra header is required to route requests to the correct location.
metadata=(
("x-goog-request-params", "write_stream=this-is-a-stream-resource-path"),
),
)

bidi_rpc.return_value.add_done_callback.assert_called_once_with(
manager._on_rpc_done
)
assert manager._rpc == bidi_rpc.return_value

manager._consumer.is_active = True
assert manager.is_active is True


@mock.patch("google.api_core.bidi.BidiRpc", autospec=True)
@mock.patch("google.api_core.bidi.BackgroundConsumer", autospec=True)
def test_initial_send_with_timeout(background_consumer, bidi_rpc, module_under_test):
mock_client = mock.create_autospec(big_query_write.BigQueryWriteClient)
manager = module_under_test.AppendRowsStream(mock_client, REQUEST_TEMPLATE)
type(bidi_rpc.return_value).is_active = mock.PropertyMock(return_value=False)
type(background_consumer.return_value).is_active = mock.PropertyMock(
return_value=False
)
initial_request = gapic_types.AppendRowsRequest(
write_stream="this-is-a-stream-resource-path"
)
now = time.monotonic()
later = now + module_under_test._DEFAULT_TIMEOUT + 1
with mock.patch.object(module_under_test.time, "sleep"), mock.patch.object(
module_under_test.time, "monotonic", mock.MagicMock(side_effect=(now, later))
), pytest.raises(exceptions.Unknown):
manager.send(initial_request)


def test_future_done_false(module_under_test):
mock_client = mock.create_autospec(big_query_write.BigQueryWriteClient)
manager = module_under_test.AppendRowsStream(mock_client, REQUEST_TEMPLATE)
future = module_under_test.AppendRowsFuture(manager)
assert not future.done()


def test_future_done_true_with_result(module_under_test):
mock_client = mock.create_autospec(big_query_write.BigQueryWriteClient)
manager = module_under_test.AppendRowsStream(mock_client, REQUEST_TEMPLATE)
future = module_under_test.AppendRowsFuture(manager)
future.set_result(object())
assert future.done()


def test_future_done_true_with_exception(module_under_test):
mock_client = mock.create_autospec(big_query_write.BigQueryWriteClient)
manager = module_under_test.AppendRowsStream(mock_client, REQUEST_TEMPLATE)
future = module_under_test.AppendRowsFuture(manager)
future.set_exception(ValueError())
assert future.done()
)
manager = self._make_one(mock_client, request_template)
type(bidi_rpc.return_value).is_active = mock.PropertyMock(
return_value=(False, True)
)
proto_rows = gapic_types.ProtoRows()
proto_rows.serialized_rows.append(b"hello, world")
initial_request = gapic_types.AppendRowsRequest(
write_stream="this-is-a-stream-resource-path",
offset=42,
proto_rows=gapic_types.AppendRowsRequest.ProtoData(rows=proto_rows),
)

future = manager.send(initial_request)

assert isinstance(future, AppendRowsFuture)
background_consumer.assert_called_once_with(manager._rpc, manager._on_response)
background_consumer.return_value.start.assert_called_once()
assert manager._consumer == background_consumer.return_value

# Make sure the request template and the first request are merged as
# expected. Needs to be especially careful that nested properties such as
# writer_schema and rows are merged as expected.
expected_request = gapic_types.AppendRowsRequest(
write_stream="this-is-a-stream-resource-path",
offset=42,
proto_rows=gapic_types.AppendRowsRequest.ProtoData(
writer_schema=gapic_types.ProtoSchema(
proto_descriptor=descriptor_pb2.DescriptorProto()
),
rows=proto_rows,
),
trace_id=f"python-writer:{package_version.__version__}",
)
bidi_rpc.assert_called_once_with(
start_rpc=mock_client.append_rows,
initial_request=expected_request,
# Extra header is required to route requests to the correct location.
metadata=(
(
"x-goog-request-params",
"write_stream=this-is-a-stream-resource-path",
),
),
)

bidi_rpc.return_value.add_done_callback.assert_called_once_with(
manager._on_rpc_done
)
assert manager._rpc == bidi_rpc.return_value

manager._consumer.is_active = True
assert manager.is_active is True

@mock.patch("google.api_core.bidi.BidiRpc", autospec=True)
@mock.patch("google.api_core.bidi.BackgroundConsumer", autospec=True)
def test_initial_send_with_timeout(self, background_consumer, bidi_rpc):
from google.cloud.bigquery_storage_v1 import writer

mock_client = self._make_mock_client()
manager = self._make_one(mock_client, REQUEST_TEMPLATE)
type(bidi_rpc.return_value).is_active = mock.PropertyMock(return_value=False)
type(background_consumer.return_value).is_active = mock.PropertyMock(
return_value=False
)
initial_request = gapic_types.AppendRowsRequest(
write_stream="this-is-a-stream-resource-path"
)
now = time.monotonic()
later = now + writer._DEFAULT_TIMEOUT + 1
with mock.patch.object(writer.time, "sleep"), mock.patch.object(
writer.time, "monotonic", mock.MagicMock(side_effect=(now, later))
), pytest.raises(exceptions.Unknown):
manager.send(initial_request)

@mock.patch("google.api_core.bidi.BidiRpc", autospec=True)
@mock.patch("google.api_core.bidi.BackgroundConsumer", autospec=True)
def test_send(self, background_consumer, bidi_rpc):
mock_client = self._make_mock_client()
stream = self._make_one(mock_client, REQUEST_TEMPLATE)
stream._consumer = background_consumer
stream._rpc = bidi_rpc

type(background_consumer.return_value).is_active = mock.PropertyMock(
return_value=True
)
request = gapic_types.AppendRowsRequest(
write_stream="this-is-a-stream-resource-path"
)
stream.send(request)

bidi_rpc.send.assert_called_once_with(request)
assert stream._futures_queue.qsize() == 1

@mock.patch("google.api_core.bidi.BidiRpc", autospec=True)
@mock.patch("google.api_core.bidi.BackgroundConsumer", autospec=True)
def test_close(self, background_consumer, bidi_rpc):
from google.cloud.bigquery_storage_v1 import writer

type(background_consumer.return_value).is_active = mock.PropertyMock(
return_value=True
)
mock_client = self._make_mock_client()
stream = self._make_one(mock_client, REQUEST_TEMPLATE)
stream._consumer = background_consumer
stream._rpc = bidi_rpc
futures = [writer.AppendRowsFuture(stream) for _ in range(3)]
for f in futures:
stream._futures_queue.put(f)
stream._close_callbacks = [mock.Mock() for _ in range(3)]
close_exception = Exception("test exception")

assert stream._closed is False

stream.close(reason=close_exception)

assert stream._closed is True
assert stream._consumer is None
assert stream._futures_queue.empty() is True
background_consumer.stop.assert_called_once()
bidi_rpc.close.assert_called_once()
for f in futures:
assert f.done() is True
with pytest.raises(Exception, match="test exception"):
f.result()

for callback in stream._close_callbacks:
callback.assert_called_once_with(stream, close_exception)


class TestAppendRowsFuture(unittest.TestCase):
@staticmethod
def _get_target_class():
from google.cloud.bigquery_storage_v1.writer import AppendRowsFuture

return AppendRowsFuture

def _make_one(self, *args, **kw):
return self._get_target_class()(*args, **kw)

def test_future_done_false(self):
from google.cloud.bigquery_storage_v1.writer import AppendRowsStream

mock_client = mock.create_autospec(big_query_write.BigQueryWriteClient)
manager = AppendRowsStream(mock_client, REQUEST_TEMPLATE)
future = self._make_one(manager)
assert not future.done()

def test_future_done_true_with_result(self):
from google.cloud.bigquery_storage_v1.writer import AppendRowsStream

mock_client = mock.create_autospec(big_query_write.BigQueryWriteClient)
manager = AppendRowsStream(mock_client, REQUEST_TEMPLATE)
future = self._make_one(manager)
future.set_result(object())
assert future.done()

def test_future_done_true_with_exception(self):
from google.cloud.bigquery_storage_v1.writer import AppendRowsStream

mock_client = mock.create_autospec(big_query_write.BigQueryWriteClient)
manager = AppendRowsStream(mock_client, REQUEST_TEMPLATE)
future = self._make_one(manager)
future.set_exception(ValueError())
assert future.done()