Skip to content

Commit

Permalink
Merge pull request #26 from monocle2ai/kshitiz/added_linting
Browse files Browse the repository at this point in the history
Added 100% linting
  • Loading branch information
kshitiz-okahu authored Aug 12, 2024
2 parents f766fe9 + 1104834 commit ed593f9
Show file tree
Hide file tree
Showing 26 changed files with 138 additions and 135 deletions.
33 changes: 33 additions & 0 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@


name: lint

on:
push:
branches: [ "main" ]
pull_request:
# The branches below must be a subset of the branches above
branches: [ "main" ]
schedule:
- cron: '18 8 * * 0'

jobs:
tox:
name: Run pylint
runs-on: ubuntu-latest
permissions:
contents: read
security-events: write
actions: read # only required for a private repository by github/codeql-action/upload-sarif to get the Action run status
steps:
- name: Checkout code
uses: actions/checkout@v4

- name: Install pylint
run: |
pip install .
pip install pylint
- name: Run pylint tests
run: pylint --fail-under=10 src
continue-on-error: true
3 changes: 3 additions & 0 deletions .pylintrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[MASTER]

disable=unused-argument,too-many-arguments,missing-function-docstring,line-too-long,missing-module-docstring,too-many-instance-attributes,broad-exception-caught,missing-class-docstring,bare-except,logging-fstring-interpolation
4 changes: 2 additions & 2 deletions Monocle_User_Guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,13 @@ setup_monocle_telemetry(
wrapper_methods=[
WrapperMethod(
package="langchain.schema.runnable",
object="RunnableParallel",
object_name="RunnableParallel",
method="invoke",
span_name="langchain.workflow",
wrapper=task_wrapper),
WrapperMethod(
package="langchain.schema.runnable",
object="RunnableParallel",
object_name="RunnableParallel",
method="ainvoke",
span_name="langchain.workflow",
wrapper=atask_wrapper)
Expand Down
4 changes: 2 additions & 2 deletions src/monocle_apptrace/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,13 @@ setup_monocle_telemetry(
wrapper_methods=[
WrapperMethod(
package="langchain.schema.runnable",
object="RunnableParallel",
object_name="RunnableParallel",
method="invoke",
span_name="langchain.workflow",
wrapper=task_wrapper),
WrapperMethod(
package="langchain.schema.runnable",
object="RunnableParallel",
object_name="RunnableParallel",
method="ainvoke",
span_name="langchain.workflow",
wrapper=atask_wrapper)
Expand Down
2 changes: 0 additions & 2 deletions src/monocle_apptrace/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +0,0 @@


17 changes: 10 additions & 7 deletions src/monocle_apptrace/exporters/file_exporter.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
#pylint: disable=consider-using-with

from os import linesep, path
from io import TextIOWrapper
from datetime import datetime
from typing import Optional, Callable, Sequence
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.resources import SERVICE_NAME

DEFAULT_FILE_PREFIX:str = "monocle_trace_"
DEFAULT_TIME_FORMAT:str = "%Y-%m-%d_%H.%M.%S"

class FileSpanExporter(SpanExporter):
DEFAULT_FILE_PREFIX:str = "monocle_trace_"
DEFAULT_TIME_FORMAT:str = "%Y-%m-%d_%H.%M.%S"
current_trace_id: int = None
current_file_path: str = None

Expand Down Expand Up @@ -44,17 +47,17 @@ def rotate_file(self, trace_name:str, trace_id:int) -> None:
self.current_file_path = path.join(self.output_path,
self.file_prefix + trace_name + "_" + hex(trace_id) + "_"
+ datetime.now().strftime(self.time_format) + ".json")
self.out_handle = open(self.current_file_path, "w")
self.out_handle = open(self.current_file_path, "w", encoding='UTF-8')
self.current_trace_id = trace_id

def force_flush(self, timeout_millis: int = 30000) -> bool:
self.out_handle.flush()
return True

def reset_handle(self) -> None:
if self.out_handle != None:
if self.out_handle is not None:
self.out_handle.close()
self.out_handle = None

def shutdown(self) -> None:
self.reset_handle()
self.reset_handle()
2 changes: 1 addition & 1 deletion src/monocle_apptrace/haystack/wrap_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def wrap(tracer, to_wrap, wrapped, instance, args, kwargs):
workflow_name = span.resource.attributes.get("service.name")
span.set_attribute("workflow_name",workflow_name)
span.set_attribute("workflow_type", WORKFLOW_TYPE_MAP["haystack"])

response = wrapped(*args, **kwargs)

return response
10 changes: 1 addition & 9 deletions src/monocle_apptrace/haystack/wrap_openai.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@


import logging
from opentelemetry import context as context_api
from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY
Expand All @@ -17,9 +15,7 @@ def _set_input_attributes(span, kwargs, instance, args):

if 'model' in instance.__dict__:
model_name = instance.__dict__.get("model")
set_span_attribute(span, "openai_model_name", model_name)

return
set_span_attribute(span, "model_name", model_name)

@dont_throw
def _set_response_attributes(span, response):
Expand All @@ -39,14 +35,10 @@ def wrap_openai(tracer, to_wrap, wrapped, instance, args, kwargs):
with tracer.start_as_current_span("haystack.openai") as span:
if span.is_recording():
_set_input_attributes(span, kwargs, instance, args)



response = wrapped(*args, **kwargs)

if response:
if span.is_recording():
_set_response_attributes(span, response)


return response
15 changes: 6 additions & 9 deletions src/monocle_apptrace/haystack/wrap_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@


import logging
from opentelemetry import context as context_api
from opentelemetry.context import attach, set_value
Expand All @@ -18,13 +16,12 @@ def wrap(tracer, to_wrap, wrapped, instance, args, kwargs):
name = "haystack_pipeline"
attach(set_value("workflow_name", name))
inputs = set()
input = get_workflow_input(args, inputs)
workflow_input = get_workflow_input(args, inputs)

with tracer.start_as_current_span(f"{name}.workflow") as span:
span.set_attribute(PROMPT_INPUT_KEY, input)
span.set_attribute(PROMPT_INPUT_KEY, workflow_input)
workflow_name = span.resource.attributes.get("service.name")
set_workflow_attributes(span, workflow_name)

response = wrapped(*args, **kwargs)
set_workflow_output(span, response)
return response
Expand All @@ -37,12 +34,12 @@ def get_workflow_input(args, inputs):
for value in args[0].values():
for text in value.values():
inputs.add(text)
input: str = ""

workflow_input: str = ""

for input_str in inputs:
input = input + input_str
return input
workflow_input = workflow_input + input_str
return workflow_input

def set_workflow_attributes(span, workflow_name):
span.set_attribute("workflow_name",workflow_name)
Expand Down
51 changes: 23 additions & 28 deletions src/monocle_apptrace/instrumentor.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,26 @@
from opentelemetry.sdk.trace.export import BatchSpanProcessor, SpanProcessor
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry import trace
from opentelemetry.context import get_value, attach, set_value
from monocle_apptrace.wrap_common import CONTEXT_PROPERTIES_KEY
from monocle_apptrace.wrapper import INBUILT_METHODS_LIST, WrapperMethod
from monocle_apptrace.exporters.file_exporter import FileSpanExporter
from opentelemetry.context import get_value, attach, set_value


logger = logging.getLogger(__name__)

_instruments = ("langchain >= 0.0.346",)

class MonocleInstrumentor(BaseInstrumentor):

workflow_name: str = ""
user_wrapper_methods: list[WrapperMethod] = []
instrumented_method_list: list[object] = []

def __init__(
self,
user_wrapper_methods: list[WrapperMethod] = []) -> None:
self.user_wrapper_methods = user_wrapper_methods
user_wrapper_methods: list[WrapperMethod] = None) -> None:
self.user_wrapper_methods = user_wrapper_methods or []
super().__init__()

def instrumentation_dependencies(self) -> Collection[str]:
Expand Down Expand Up @@ -64,11 +64,11 @@ def _instrument(self, **kwargs):
self.instrumented_method_list.append(wrapped_method)
except Exception as ex:
if wrapped_method in user_method_list:
logger.error(f"""_instrument wrap Exception: {str(ex)}
logger.error(f"""_instrument wrap Exception: {str(ex)}
for package: {wrap_package},
object:{wrap_object},
method:{wrap_method}""")


def _uninstrument(self, **kwargs):
for wrapped_method in self.instrumented_method_list:
Expand All @@ -81,34 +81,33 @@ def _uninstrument(self, **kwargs):
wrap_method,
)
except Exception as ex:
logger.error(f"""_instrument unwrap Exception: {str(ex)}
logger.error(f"""_instrument unwrap Exception: {str(ex)}
for package: {wrap_package},
object:{wrap_object},
method:{wrap_method}""")


def setup_monocle_telemetry(
workflow_name: str,
span_processors: List[SpanProcessor] =
[BatchSpanProcessor(FileSpanExporter())],
wrapper_methods: List[WrapperMethod] = []):
span_processors: List[SpanProcessor] = None,
wrapper_methods: List[WrapperMethod] = None):
resource = Resource(attributes={
SERVICE_NAME: workflow_name
})
traceProvider = TracerProvider(resource=resource)
tracerProviderDefault = trace.get_tracer_provider()
providerType = type(tracerProviderDefault).__name__
isProxyProvider = "Proxy" in providerType
span_processors = span_processors or [BatchSpanProcessor(FileSpanExporter())]
trace_provider = TracerProvider(resource=resource)
tracer_provider_default = trace.get_tracer_provider()
provider_type = type(tracer_provider_default).__name__
is_proxy_provider = "Proxy" in provider_type
for processor in span_processors:
processor.on_start = on_processor_start
if not isProxyProvider:
tracerProviderDefault.add_span_processor(processor)
if not is_proxy_provider:
tracer_provider_default.add_span_processor(processor)
else :
traceProvider.add_span_processor(processor)
if isProxyProvider :
trace.set_tracer_provider(traceProvider)
instrumentor = MonocleInstrumentor(user_wrapper_methods=wrapper_methods)
instrumentor.app_name = workflow_name
trace_provider.add_span_processor(processor)
if is_proxy_provider :
trace.set_tracer_provider(trace_provider)
instrumentor = MonocleInstrumentor(user_wrapper_methods=wrapper_methods or [])
# instrumentor.app_name = workflow_name
if not instrumentor.is_instrumented_by_opentelemetry:
instrumentor.instrument()

Expand All @@ -119,11 +118,7 @@ def on_processor_start(span: Span, parent_context):
for key, value in context_properties.items():
span.set_attribute(
f"{CONTEXT_PROPERTIES_KEY}.{key}", value
)
)

def set_context_properties(properties: dict) -> None:
attach(set_value(CONTEXT_PROPERTIES_KEY, properties))




3 changes: 2 additions & 1 deletion src/monocle_apptrace/llamaindex/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@

#pylint: disable=protected-access
import os
from monocle_apptrace.utils import load_wrapper_from_config

def get_llm_span_name_for_openai(instance):
if (hasattr(instance, "_is_azure_client")
if (hasattr(instance, "_is_azure_client")
and callable(getattr(instance, "_is_azure_client"))
and instance._is_azure_client()):
return "llamaindex.azure_openai"
Expand Down
28 changes: 9 additions & 19 deletions src/monocle_apptrace/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,10 @@
import json
from importlib import import_module

logger = logging.getLogger(__name__)

class Config:
exception_logger = None

def set_span_attribute(span, name, value):
if value is not None:
if value != "":
span.set_attribute(name, value)
return


def dont_throw(func):
"""
Expand All @@ -23,15 +16,12 @@ def dont_throw(func):
"""
# Obtain a logger specific to the function's module
logger = logging.getLogger(func.__module__)

# pylint: disable=inconsistent-return-statements
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
logger.warning("Failed to execute %s, error: %s", func.__name__, str(e))
if Config.exception_logger:
Config.exception_logger(e)

except Exception as ex:
logger.warning("Failed to execute %s, error: %s", func.__name__, str(ex))
return wrapper

def with_tracer_wrapper(func):
Expand All @@ -45,17 +35,17 @@ def wrapper(wrapped, instance, args, kwargs):

return _with_tracer

def resolve_from_alias(map, alias):
def resolve_from_alias(my_map, alias):
"""Find a alias that is not none from list of aliases"""

for i in alias:
if i in map.keys():
return map[i]
if i in my_map.keys():
return my_map[i]
return None

def load_wrapper_from_config(config_file_path:str, module_name:str=None):
def load_wrapper_from_config(config_file_path: str, module_name: str = None):
wrapper_methods = []
with open(config_file_path) as config_file:
with open(config_file_path, encoding='UTF-8') as config_file:
json_data = json.load(config_file)
wrapper_methods = json_data["wrapper_methods"]
for wrapper_method in wrapper_methods:
Expand All @@ -69,4 +59,4 @@ def load_wrapper_from_config(config_file_path:str, module_name:str=None):

def get_wrapper_method(package_name: str, method_name: str):
wrapper_module = import_module("monocle_apptrace." + package_name)
return getattr(wrapper_module, method_name)
return getattr(wrapper_module, method_name)
Loading

0 comments on commit ed593f9

Please sign in to comment.