Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

build okahu exporter and added test cases for okahu exporter #56

Merged
merged 5 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/monocle_apptrace/exporters/exporter_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from abc import ABC, abstractmethod
import logging
from typing import Callable

logger = logging.getLogger(__name__)

class ExportTaskProcessor(ABC):

@abstractmethod
def start(self):
return

@abstractmethod
def stop(self):
return

@abstractmethod
def queue_task(self, async_task: Callable[[Callable, any], any] = None, args: any = None):
return
117 changes: 117 additions & 0 deletions src/monocle_apptrace/exporters/okahu/okahu_exporter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import json
import logging
import os
from typing import Callable, Optional, Sequence
import requests
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult, ConsoleSpanExporter
from requests.exceptions import ReadTimeout

from monocle_apptrace.exporters.exporter_processor import ExportTaskProcessor

REQUESTS_SUCCESS_STATUS_CODES = (200, 202)
OKAHU_PROD_INGEST_ENDPOINT = "https://ingest.okahu.co/api/v1/trace/ingest"

logger = logging.getLogger(__name__)


class OkahuSpanExporter(SpanExporter):
def __init__(
self,
endpoint: Optional[str] = None,
timeout: Optional[int] = None,
session: Optional[requests.Session] = None,
task_processor: ExportTaskProcessor = None
):
"""Okahu exporter."""
okahu_endpoint: str = os.environ.get("OKAHU_INGESTION_ENDPOINT", OKAHU_PROD_INGEST_ENDPOINT)
self.endpoint = endpoint or okahu_endpoint
api_key: str = os.environ.get("OKAHU_API_KEY")
self._closed = False
if not api_key:
logger.warning("OKAHU_API_KEY not set. Using ConsoleSpanExporter instead.")
self.exporter = ConsoleSpanExporter()
return
self.timeout = timeout or 15
self.session = session or requests.Session()
self.session.headers.update(
{"Content-Type": "application/json", "x-api-key": api_key}
)

self.task_processor = task_processor or None
if task_processor is not None:
task_processor.start()

def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
# After the call to Shutdown subsequent calls to Export are
# not allowed and should return a Failure result
if not hasattr(self, 'session'):
return self.exporter.export(spans)

if self._closed:
logger.warning("Exporter already shutdown, ignoring batch")
return SpanExportResult.FAILUREencoder
if len(spans) == 0:
return

span_list = {
"batch": []
}

# append the batch object with all the spans object
for span in spans:
# create a object from serialized span
obj = json.loads(span.to_json())
if obj["parent_id"] is None:
obj["parent_id"] = "None"
else:
obj["parent_id"] = remove_0x_from_start(obj["parent_id"])
if obj["context"] is not None:
obj["context"]["trace_id"] = remove_0x_from_start(obj["context"]["trace_id"])
obj["context"]["span_id"] = remove_0x_from_start(obj["context"]["span_id"])
span_list["batch"].append(obj)

def send_spans_to_okahu(span_list_local=None):
try:
result = self.session.post(
url=self.endpoint,
data=json.dumps(span_list_local),
timeout=self.timeout,
)
if result.status_code not in REQUESTS_SUCCESS_STATUS_CODES:
logger.error(
"Traces cannot be uploaded; status code: %s, message %s",
result.status_code,
result.text,
)
return SpanExportResult.FAILURE
logger.warning("spans successfully exported to okahu")
return SpanExportResult.SUCCESS
except ReadTimeout as e:
logger.warning("Trace export timed out: %s", str(e))
return SpanExportResult.FAILURE

# if async task function is present, then push the request to asnc task

if self.task_processor is not None and callable(self.task_processor.queue_task):
self.task_processor.queue_task(send_spans_to_okahu, span_list)
return SpanExportResult.SUCCESS
return send_spans_to_okahu(span_list)

def shutdown(self) -> None:
if self._closed:
logger.warning("Exporter already shutdown, ignoring call")
return
if hasattr(self, 'session'):
self.session.close()
self._closed = True

def force_flush(self, timeout_millis: int = 30000) -> bool:
return True


# only removes the first occurrence of 0x from the string
def remove_0x_from_start(my_str: str):
if my_str.startswith("0x"):
return my_str.replace("0x", "", 1)
return my_str
90 changes: 90 additions & 0 deletions tests/okahu_exporter_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import unittest
from unittest.mock import patch, MagicMock
from monocle_apptrace.exporters.okahu.okahu_exporter import OkahuSpanExporter
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SpanExportResult
from requests.exceptions import ReadTimeout
from opentelemetry.sdk.trace import ReadableSpan
import json
class TestOkahuSpanExporter(unittest.TestCase):

@patch.dict('os.environ', {}, clear=True)
def test_default_to_console_exporter(self):
"""Test that it defaults to ConsoleSpanExporter when no API key is set."""
exporter = OkahuSpanExporter()
self.assertIsInstance(exporter.exporter, ConsoleSpanExporter)
self.assertEqual(exporter.endpoint, "https://ingest.okahu.co/api/v1/trace/ingest")

@patch.dict('os.environ', {'OKAHU_API_KEY': 'test-api-key'})
@patch('monocle_apptrace.exporters.okahu.exporter.requests.Session')
def test_okahu_exporter_with_api_key(self, mock_session):
"""Test that OkahuSpanExporter is used when an API key is set."""
mock_session_instance = MagicMock()
mock_session.return_value = mock_session_instance
mock_post = mock_session_instance.post
mock_post.return_value.status_code = 200

mock_span = MagicMock(spec=ReadableSpan)
mock_span.to_json.return_value = json.dumps({
"parent_id": "0x123456",
"context": {
"trace_id": "0xabcdef",
"span_id": "0x654321"
}
})
spans = [mock_span]
exporter = OkahuSpanExporter()
exporter.export(spans)
mock_post.assert_called_once()

@patch.dict('os.environ', {'OKAHU_API_KEY': 'test-api-key'})
@patch('monocle_apptrace.exporters.okahu.exporter.requests.Session')
def test_export_success(self, mock_session):
mock_session_instance = MagicMock()
mock_session.return_value = mock_session_instance
mock_session_instance.post.return_value.status_code = 200
exporter = OkahuSpanExporter()
mock_span = MagicMock()
mock_span.to_json.return_value = '{"parent_id": null, "context": {"trace_id": "0x123", "span_id": "0x456"}}'

result = exporter.export([mock_span])
self.assertEqual(result, SpanExportResult.SUCCESS)

mock_session_instance.post.assert_called_once_with(
url=exporter.endpoint,
data='{"batch": [{"parent_id": "None", "context": {"trace_id": "123", "span_id": "456"}}]}',
timeout=15
)

@patch.dict('os.environ', {'OKAHU_API_KEY': 'test-api-key'})
@patch('monocle_apptrace.exporters.okahu.exporter.requests.Session')
def test_export_failure(self, mock_session):
"""Test exporting spans with an error response from Okahu."""
mock_session_instance = MagicMock()
mock_session.return_value = mock_session_instance
mock_session_instance.post.return_value.status_code = 500

exporter = OkahuSpanExporter()
mock_span = MagicMock()
mock_span.to_json.return_value = '{"parent_id": null, "context": {"trace_id": "0x123", "span_id": "0x456"}}'

result = exporter.export([mock_span])
self.assertEqual(result, SpanExportResult.FAILURE)

@patch.dict('os.environ', {'OKAHU_API_KEY': 'test-api-key'})
@patch('monocle_apptrace.exporters.okahu.exporter.requests.Session')
def test_export_timeout(self, mock_session):
"""Test exporting spans with a timeout."""
mock_session_instance = MagicMock()
mock_session.return_value = mock_session_instance
mock_session_instance.post.side_effect = ReadTimeout

exporter = OkahuSpanExporter()
mock_span = MagicMock()
mock_span.to_json.return_value = '{"parent_id": null, "context": {"trace_id": "0x123", "span_id": "0x456"}}'

result = exporter.export([mock_span])
self.assertEqual(result, SpanExportResult.FAILURE)


if __name__ == '__main__':
unittest.main()
Loading