Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BlobUploader utilities to enable handling of large data in instrumentation #3122

Draft
wants to merge 21 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
4341b2e
Start of BlobUploader code in Python client library.
michaelsafyan Dec 19, 2024
924cd37
Implement the adaptor, add comments.
michaelsafyan Dec 20, 2024
84fe250
Implement the GCS uploader.
michaelsafyan Dec 20, 2024
dba3aea
Merge branch 'open-telemetry:main' into blob_upload
michaelsafyan Jan 7, 2025
8cd6ce1
Merge branch 'open-telemetry:main' into blob_upload
michaelsafyan Jan 8, 2025
9906a13
Began adding tests.
michaelsafyan Jan 8, 2025
8a4362e
Upload current snapshot.
michaelsafyan Jan 8, 2025
1667374
Add dependencies.
michaelsafyan Jan 8, 2025
41b7eea
Add more tests and fix some of the code that wasn't working.
michaelsafyan Jan 9, 2025
2b51a15
Completed writing unit tests for functionality implemented so far.
michaelsafyan Jan 13, 2025
410099a
Merge branch 'open-telemetry:main' into blob_upload
michaelsafyan Jan 13, 2025
d147a79
Merge branch 'open-telemetry:main' into blob_upload
michaelsafyan Jan 14, 2025
0a3430e
Add license comments and documentation.
michaelsafyan Jan 14, 2025
587e61e
Remove redundant explicit inheritance from object per review comment.
michaelsafyan Jan 14, 2025
c25a6b8
Format with ruff.
michaelsafyan Jan 14, 2025
a7bb5f5
Address additional ruff checks that could not be automatically fixed.
michaelsafyan Jan 14, 2025
7f88a2b
Apply suggestions from code review
michaelsafyan Jan 21, 2025
2ce62f6
Merge branch 'open-telemetry:main' into blob_upload
michaelsafyan Feb 7, 2025
ef648de
Merge branch 'open-telemetry:main' into blob_upload
michaelsafyan Feb 11, 2025
6965a44
Merge branch 'open-telemetry:main' into blob_upload
michaelsafyan Feb 12, 2025
ab94250
Merge branch 'open-telemetry:main' into blob_upload
michaelsafyan Feb 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add more tests and fix some of the code that wasn't working.
  • Loading branch information
michaelsafyan committed Jan 9, 2025
commit 41b7eead616012e437085b764ad65b6746ef2ab2
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import base64
from typing import Dict, Optional
import json

from types import MappingProxyType as _frozendict
from typing import Mapping, Dict, Optional


class Blob(object):
Expand All @@ -17,7 +20,7 @@ def __init__(
self,
raw_bytes: bytes,
content_type: Optional[str] = None,
labels: Optional[Dict[str, str]] = None,
labels: Optional[Mapping[str, str]] = None,
):
"""Initialize the blob with an explicit set of properties.

Expand All @@ -26,12 +29,18 @@ def __init__(
content_type: the MIME type describing the type of data in the payload
labels: additional key/value data about the Blob
"""
self._raw_bytes = _raw_bytes
self._raw_bytes = raw_bytes
self._content_type = content_type
self._labels = labels or {}
self._labels = {}
if labels is not None:
if isinstance(labels, dict):
self._labels.update(labels)
else:
for k in labels:
self._labels[k] = labels[k]

@staticmethod
def from_data_uri(cls, uri: str, labels: Optional[dict] = None) -> "Blob":
def from_data_uri(uri: str, labels: Optional[dict] = None) -> "Blob":
Copy link
Contributor

@samuelcolvin samuelcolvin Jan 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be easier to extend if it were a classmethod that returned `cls(raw_bytes, content_type=content_type, labels=labels)`.

Alternatively, if this class shouldn't be subclassed, it should be marked as final.

"""Instantiate a blob from a 'data:...' URI.

Args:
Expand Down Expand Up @@ -67,10 +76,7 @@ def from_data_uri(cls, uri: str, labels: Optional[dict] = None) -> "Blob":
assert remaining.startswith("base64,")
base64_len = len("base64,")
base64_encoded_content = remaining[base64_len:]
try:
raw_bytes = base64.standard_b64decode(base64_encoded_content)
except ValueError:
raw_bytes = base64.urlsafe_b64decode(base64_encoded_content)
raw_bytes = base64.b64decode(base64_encoded_content)
return Blob(raw_bytes, content_type=content_type, labels=labels)

@property
Expand All @@ -84,6 +90,23 @@ def content_type(self) -> Optional[str]:
return self._content_type

@property
def labels(self) -> Dict[str, str]:
def labels(self) -> Mapping[str, str]:
"""Returns the key/value metadata of this Blob."""
return _frozendict(self._labels)

def __eq__(self, o):
    """Compares this blob with another object.

    Args:
      o: the object to compare against.

    Returns:
      True or False when 'o' is a Blob, depending on whether the raw bytes,
      content type, and labels all match. NotImplemented for any other
      type, which lets Python try the other operand's comparison before
      settling on its default result.
    """
    if not isinstance(o, Blob):
        return NotImplemented
    return (
        self.raw_bytes == o.raw_bytes
        and self.content_type == o.content_type
        and self.labels == o.labels
    )

def __repr__(self):
    """Renders the blob as a constructor-like string.

    The content type is included only when it is not None, and the labels
    only when there is at least one. Labels are serialized as JSON with
    sorted keys.
    """
    parts = [repr(self._raw_bytes)]
    content_type = self._content_type
    if content_type is not None:
        parts.append(f'content_type={content_type!r}')
    labels = self._labels
    if labels:
        parts.append(f'labels={json.dumps(labels, sort_keys=True)}')
    return f"Blob({', '.join(parts)})"
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,6 @@ def from_buffer(self, raw_bytes, mime=True):

def detect_content_type(raw_bytes: bytes) -> str:
    """Attempts to infer the content type of the specified data.

    Args:
      raw_bytes: the payload to inspect.

    Returns:
      'application/octet-stream' when the payload is empty; otherwise the
      result of the module-level detector's 'from_buffer(..., mime=True)'.
    """
    if raw_bytes:
        return _module.from_buffer(raw_bytes, mime=True)
    # Nothing to inspect, so report the generic binary type.
    return 'application/octet-stream'
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from opentelemetry.instrumentation._blobupload.api.blob_uploader import (
BlobUploader,
)
from opentelemetry.instrumentation._blobupload.api.constants import NOT_UPLOADED


_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -42,7 +44,7 @@ def get_blob_uploader(self, use_case: Optional[str]) -> BlobUploader:
if use_case:
use_case_formatted = use_case
_logger.warning(
"No BlobUploaderProvider configured; returning a no-op for use case {}".format(
"No BlobUploaderProvider configured; returning a no-op for use case \"{}\". Use 'set_blob_uploader_provider()' to configure.".format(
use_case_formatted
)
)
Expand All @@ -52,10 +54,12 @@ def get_blob_uploader(self, use_case: Optional[str]) -> BlobUploader:
_blob_uploader_provider = _DefaultBlobUploaderProvider()


def set_blob_uploader_provider(provider: BlobUploaderProvider):
def set_blob_uploader_provider(provider: BlobUploaderProvider) -> BlobUploaderProvider:
    """Allows configuring the behavior of 'get_blob_uploader'.

    Args:
      provider: the provider to install as the module-wide default.

    Returns:
      The provider that was installed before this call.
    """
    global _blob_uploader_provider
    # Swap in one step: remember the outgoing provider, install the new one.
    previous, _blob_uploader_provider = _blob_uploader_provider, provider
    return previous


def get_blob_uploader(use_case: Optional[str] = None) -> BlobUploader:
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
from opentelemetry.instrumentation._blobupload.backend.google.gcs._gcs_impl import GcsBlobUploader

# '__all__' must contain the exported names as strings. Listing the objects
# themselves makes 'from <package> import *' fail with a TypeError.
__all__ = [
    "GcsBlobUploader",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
"""Exposes API methods to callers from the package name."""

from opentelemetry.instrumentation._blobupload.utils.simple_blob_uploader_adaptor import blob_uploader_from_simple_blob_uploader
from opentelemetry.instrumentation._blobupload.utils.simple_blob_uploader import SimpleBlobUploader

# '__all__' must contain the exported names as strings. Listing the objects
# themselves makes 'from <package> import *' fail with a TypeError.
__all__ = [
    "blob_uploader_from_simple_blob_uploader",
    "SimpleBlobUploader",
]
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
"""Defines a simple, synchronous interface for providing a backend implementation."""

import abc

class SimpleBlobUploader(ABC):
from opentelemetry.instrumentation._blobupload.api import Blob

class SimpleBlobUploader(abc.ABC):
"""Pure abstract base class of a backend implementation that is synchronous."""

@abstractmethod
def generate_destination_uri(self, blob: Blob) -> str:
@abc.abstractmethod
def generate_destination_uri(self, blob: Blob) -> str:
"""Generates a URI of where the blob will get written.

Args:
Expand All @@ -16,8 +19,8 @@ def generate_destination_uri(self, blob: Blob) -> str:
"""
raise NotImplementedError('SimpleBlobUploader.generate_destination_uri')

@abstractmethod
def upload_sync(self, uri: str, blob: Blob):
@abc.abstractmethod
def upload_sync(self, uri: str, blob: Blob):
"""Synchronously writes the blob to the specified destination URI.

Args:
Expand All @@ -28,4 +31,4 @@ def upload_sync(self, uri: str, blob: Blob):
Effects:
Attempts to upload/write the Blob to the specified destination URI.
"""
raise NotImplementedError('SimpleBlobUploader.upload_sync')
raise NotImplementedError('SimpleBlobUploader.upload_sync')
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
import atexit
import logging

from concurrent.futures import Executor, ProcessPoolExecutor
from typing import Optional
from concurrent.futures import Executor, ThreadPoolExecutor

from opentelemetry.instrumentation._blobupload.api import Blob
from opentelemetry.instrumentation._blobupload.api import BlobUploader
from opentelemetry.instrumentation._blobupload.api import detect_content_type
from opentelemetry.instrumentation._blobupload.api import (
Blob,
BlobUploader,
detect_content_type)
from opentelemetry.instrumentation._blobupload.utils.simple_blob_uploader import SimpleBlobUploader


_logger = logging.getLogger(__name__)


def _with_content_type(blob: Blob) -> Blob:
"""Returns a variant of the Blob with the content type auto-detected if needed."""
if blob.content_type is not None:
Expand All @@ -16,7 +22,7 @@ def _with_content_type(blob: Blob) -> Blob:
return Blob(blob.raw_bytes, content_type=content_type, labels=blob.labels)


def _UploadAction(object):
class _UploadAction(object):
"""Represents the work to be done in the background to upload a blob."""

def __init__(self, simple_uploader, uri, blob):
Expand All @@ -25,7 +31,11 @@ def __init__(self, simple_uploader, uri, blob):
self._blob = blob

def __call__(self):
    """Uploads the blob once, logging any failure instead of raising it.

    Effects:
      Calls 'upload_sync' on the wrapped uploader with the stored URI and
      blob. Ordinary exceptions are logged with their traceback and then
      suppressed.
    """
    _logger.debug('Uploading blob to "{}".'.format(self._uri))
    try:
        self._simple_uploader.upload_sync(self._uri, self._blob)
    except Exception:
        # Catch 'Exception' rather than using a bare 'except:' so that
        # KeyboardInterrupt/SystemExit still propagate. 'exception()' logs
        # at ERROR level and appends the traceback, so the cause of the
        # failure is not lost.
        _logger.exception('Failed to upload blob to "{}".'.format(self._uri))


def _create_default_executor_no_cleanup():
Expand All @@ -37,14 +47,16 @@ def _create_default_executor_no_cleanup():
# It is because of this potential future enhancement, that we
# have moved this logic into a separate function despite it
# being currently logically quite simple.
return ProcessPoolExecutor()
_logger.debug('Creating thread pool executor')
return ThreadPoolExecutor()


def _create_default_executor():
    """Creates an executor and registers appropriate cleanup.

    Returns:
      The newly created executor. Its 'shutdown()' is registered with
      'atexit' before it is returned.
    """
    executor = _create_default_executor_no_cleanup()

    def _shutdown_at_exit():
        executor.shutdown()

    _logger.debug('Registering cleanup for the pool')
    atexit.register(_shutdown_at_exit)
    return executor

Expand All @@ -58,7 +70,10 @@ def _get_or_create_default_executor():
"""Return or lazily instantiate a shared default executor."""
global _default_executor
if _default_executor is None:
_logger.debug('No existing executor found; creating one lazily.')
_default_executor = _create_default_executor()
else:
_logger.debug('Reusing existing executor.')
return _default_executor


Expand All @@ -79,6 +94,7 @@ def upload_async(self, blob: Blob) -> str:
return uri

def _do_in_background(self, action):
    """Submits 'action' to this object's executor.

    Args:
      action: the callable to submit; it is passed to the executor
        without any arguments.
    """
    _logger.debug('Scheduling background upload.')
    executor = self._executor
    executor.submit(action)


Expand Down
108 changes: 108 additions & 0 deletions opentelemetry-instrumentation/tests/_blobupload/api/test_blob.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
#! /usr/bin/env python3

if __name__ == "__main__":
import sys
sys.path.append("../../../src")

import base64
import unittest


from opentelemetry.instrumentation._blobupload.api import Blob


class TestBlob(unittest.TestCase):
    """Tests for Blob construction, 'from_data_uri' parsing, and accessors."""

    def _assert_no_labels(self, blob):
        """Checks that the blob exposes an empty (but non-None) label mapping."""
        self.assertIsNotNone(blob.labels)
        self.assertEqual(len(blob.labels), 0)

    @staticmethod
    def _data_uri(payload, mime_type, encode=base64.b64encode):
        """Builds a base64 'data:' URI for the payload using the given encoder."""
        encoded = encode(payload).decode()
        return f'data:{mime_type};base64,{encoded}'

    def test_construction_with_just_bytes(self):
        payload = b'some string'
        blob = Blob(payload)
        self.assertEqual(blob.raw_bytes, payload)
        self.assertIsNone(blob.content_type)
        self._assert_no_labels(blob)

    def test_construction_with_bytes_and_content_type(self):
        payload = b'some string'
        mime_type = 'text/plain'
        blob = Blob(payload, content_type=mime_type)
        self.assertEqual(blob.raw_bytes, payload)
        self.assertEqual(blob.content_type, mime_type)
        self._assert_no_labels(blob)

    def test_construction_with_bytes_and_labels(self):
        payload = b'some string'
        expected_labels = {'key1': 'value1', 'key2': 'value2'}
        blob = Blob(payload, labels=expected_labels)
        self.assertEqual(blob.raw_bytes, payload)
        self.assertIsNone(blob.content_type)
        self.assert_labels_equal(blob.labels, expected_labels)

    def test_construction_with_all_fields(self):
        payload = b'some string'
        mime_type = 'text/plain'
        expected_labels = {'key1': 'value1', 'key2': 'value2'}
        blob = Blob(payload, content_type=mime_type, labels=expected_labels)
        self.assertEqual(blob.raw_bytes, payload)
        self.assertEqual(blob.content_type, mime_type)
        self.assert_labels_equal(blob.labels, expected_labels)

    def test_from_data_uri_without_labels(self):
        payload = b'some string'
        mime_type = 'text/plain'
        blob = Blob.from_data_uri(self._data_uri(payload, mime_type))
        self.assertEqual(blob.raw_bytes, payload)
        self.assertEqual(blob.content_type, mime_type)
        self._assert_no_labels(blob)

    def test_from_data_uri_with_labels(self):
        payload = b'some string'
        mime_type = 'text/plain'
        expected_labels = {'key1': 'value1', 'key2': 'value2'}
        blob = Blob.from_data_uri(
            self._data_uri(payload, mime_type), labels=expected_labels
        )
        self.assertEqual(blob.raw_bytes, payload)
        self.assertEqual(blob.content_type, mime_type)
        self.assert_labels_equal(blob.labels, expected_labels)

    def test_from_data_uri_with_valid_standard_base64(self):
        payload = b'some string'
        mime_type = 'text/plain'
        uri = self._data_uri(payload, mime_type, base64.standard_b64encode)
        blob = Blob.from_data_uri(uri)
        self.assertEqual(blob.raw_bytes, payload)
        self.assertEqual(blob.content_type, mime_type)

    def test_from_data_uri_with_valid_websafe_base64(self):
        payload = b'some string'
        mime_type = 'text/plain'
        uri = self._data_uri(payload, mime_type, base64.urlsafe_b64encode)
        blob = Blob.from_data_uri(uri)
        self.assertEqual(blob.raw_bytes, payload)
        self.assertEqual(blob.content_type, mime_type)

    def test_from_data_uri_with_non_data_uri_content(self):
        not_a_data_uri = 'not a valid data uri'
        with self.assertRaisesRegex(ValueError, 'expected "data:" prefix'):
            Blob.from_data_uri(not_a_data_uri)

    def test_from_data_uri_with_non_base64_content(self):
        percent_encoded_uri = 'data:text/plain,validifpercentencoded'
        with self.assertRaisesRegex(ValueError, 'expected ";base64," section'):
            Blob.from_data_uri(percent_encoded_uri)

    def assert_labels_equal(self, a, b):
        """Checks that two label mappings have the same size, keys, and values."""
        size_a, size_b = len(a), len(b)
        self.assertEqual(
            size_a,
            size_b,
            msg=f'Different sizes: {size_a} vs {size_b}; a={a}, b={b}',
        )
        for key in a:
            self.assertTrue(key in b, msg=f'Key {key} found in a but not b')
            value_a, value_b = a[key], b[key]
            self.assertEqual(
                value_a,
                value_b,
                msg=f'Values for key {key} different for a vs b: {value_a} vs {value_b}',
            )


if __name__ == "__main__":
unittest.main()
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

if __name__ == "__main__":
import sys

sys.path.append("../../../src")

import io
Expand All @@ -25,6 +24,12 @@ def create_test_image(format):


class TestContentType(unittest.TestCase):

def test_handles_empty_correctly(self):
    """An empty payload is reported as 'application/octet-stream'."""
    detected = detect_content_type(b"")
    self.assertEqual(detected, "application/octet-stream")

def test_detects_plaintext(self):
input = "this is just regular text"
output = detect_content_type(input.encode())
Expand Down
Loading
Loading