Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update inference span for botocore sagemaker #93

Merged
merged 2 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/monocle_apptrace/botocore/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import os

from monocle_apptrace.utils import get_wrapper_methods_config

# Package root (the monocle_apptrace directory); both config paths below are
# resolved relative to it.
parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))

# Botocore wrapper-method configuration loaded from the metamodel JSON map.
# Reuse parent_dir instead of recomputing the identical abspath expression.
BOTOCORE_METHODS = get_wrapper_methods_config(
    wrapper_methods_config_path=os.path.join(
        parent_dir, 'metamodel', 'maps', 'botocore_methods.json'),
    attributes_config_base_path=parent_dir)


Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
{
"type": "inference",
"attributes": [
[
{
"_comment": "provider type and inference endpoint",
"attribute": "type",
"accessor": "lambda arguments:'inference.aws_sagemaker'"
},
{
"attribute": "inference_endpoint",
"accessor": "lambda arguments: resolve_from_alias(arguments['instance'].__dict__, ['azure_endpoint', 'api_base']) or arguments['instance'].meta.endpoint_url"
}
],
[
{
"_comment": "LLM Model",
"attribute": "name",
"accessor": "lambda arguments: resolve_from_alias(arguments['instance'].__dict__, ['model', 'model_name']) or arguments['kwargs'].get('EndpointName', '')"
},
{
"attribute": "type",
"accessor": "lambda arguments: 'model.llm.' + (resolve_from_alias(arguments['instance'].__dict__, ['model', 'model_name']) or arguments['kwargs'].get('EndpointName', ''))"
}
]
]
}
13 changes: 13 additions & 0 deletions src/monocle_apptrace/metamodel/maps/botocore_methods.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{
"wrapper_methods": [
{
"package": "botocore.client",
"object": "ClientCreator",
"method": "create_client",
"wrapper_package": "wrap_common",
"wrapper_method": "task_wrapper",
"skip_span": true,
"output_processor": ["metamodel/maps/attributes/inference/botocore_entities.json"]
}
]
}
35 changes: 26 additions & 9 deletions src/monocle_apptrace/wrap_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from monocle_apptrace.utils import set_attribute, get_vectorstore_deployment
from monocle_apptrace.utils import get_fully_qualified_class_name, get_nested_value
from monocle_apptrace.message_processing import extract_messages, extract_assistant_message
from functools import wraps

logger = logging.getLogger(__name__)
WORKFLOW_TYPE_KEY = "workflow_type"
Expand Down Expand Up @@ -69,6 +70,10 @@ def task_wrapper(tracer: Tracer, to_wrap, wrapped, instance, args, kwargs):
workflow_input = get_workflow_input(args, inputs)
set_attribute(DATA_INPUT_KEY, workflow_input)

if to_wrap.get('skip_span'):
return_value = wrapped(*args, **kwargs)
botocore_processor(tracer, to_wrap, wrapped, instance, args, kwargs, return_value)
return return_value

with tracer.start_as_current_span(name) as span:
pre_task_processing(to_wrap, instance, args, span)
Expand All @@ -78,6 +83,21 @@ def task_wrapper(tracer: Tracer, to_wrap, wrapped, instance, args, kwargs):

return return_value

def botocore_processor(tracer, to_wrap, wrapped, instance, args, kwargs, return_value):
    """Post-process a botocore ``ClientCreator.create_client`` call.

    When the created client is a ``sagemaker-runtime`` client, its
    ``invoke_endpoint`` method is replaced with an instrumented wrapper so
    each endpoint invocation produces an inference span.

    ``service_name`` may arrive either as the first positional argument of
    ``create_client`` or as a keyword; the original code only checked
    ``kwargs`` and missed positional calls.
    """
    service_name = kwargs.get("service_name")
    if service_name is None and args:
        service_name = args[0]
    if service_name == "sagemaker-runtime":
        return_value.invoke_endpoint = _instrumented_endpoint_invoke(
            to_wrap, return_value, return_value.invoke_endpoint, tracer)

def _instrumented_endpoint_invoke(to_wrap, instance, fn, tracer):
    """Return a traced replacement for a client's ``invoke_endpoint``.

    Each call runs inside a "botocore-sagemaker-invoke-endpoint" span, and the
    span attributes are populated from the configured output processor.
    """
    @wraps(fn)
    def traced_invoke(*args, **kwargs):
        with tracer.start_as_current_span("botocore-sagemaker-invoke-endpoint") as span:
            response = fn(*args, **kwargs)
            process_span(to_wrap, span, instance=instance, args=args,
                         kwargs=kwargs, return_value=response)
            return response

    return traced_invoke

def get_workflow_input(args, inputs):
if args is not None and len(args) > 0:
for value in args[0].values():
Expand All @@ -95,29 +115,25 @@ def process_span(to_wrap, span, instance, args, kwargs, return_value):
# Check if the output_processor is a valid JSON (in Python, that means it's a dictionary)
instance_args = {}
set_provider_name(instance, instance_args)
span_index = 1
span_index = 0
if is_root_span(span):
span_index += set_workflow_attributes(to_wrap, span, span_index)
span_index += set_app_hosting_identifier_attribute(span, span_index)
span_index += set_workflow_attributes(to_wrap, span, span_index+1)
span_index += set_app_hosting_identifier_attribute(span, span_index+1)
if 'output_processor' in to_wrap:
output_processor=to_wrap['output_processor']
if isinstance(output_processor, dict) and len(output_processor) > 0:
if 'type' in output_processor:
span.set_attribute("span.type", output_processor['type'])
else:
logger.warning("type of span not found or incorrect written in entity json")
count = 0
if 'attributes' in output_processor:
count = len(output_processor["attributes"])
span.set_attribute("entity.count", count)
span_index = 1
for processors in output_processor["attributes"]:
for processor in processors:
attribute = processor.get('attribute')
accessor = processor.get('accessor')

if attribute and accessor:
attribute_name = f"entity.{span_index}.{attribute}"
attribute_name = f"entity.{span_index+1}.{attribute}"
try:
arguments = {"instance":instance, "args":args, "kwargs":kwargs, "output":return_value}
result = eval(accessor)(arguments)
Expand All @@ -130,7 +146,6 @@ def process_span(to_wrap, span, instance, args, kwargs, return_value):
span_index += 1
else:
logger.warning("attributes not found or incorrect written in entity json")
span.set_attribute("span.count", count)
if 'events' in output_processor:
events = output_processor['events']
accessor_mapping = {
Expand All @@ -156,6 +171,8 @@ def process_span(to_wrap, span, instance, args, kwargs, return_value):

else:
logger.warning("empty or entities json is not in correct format")
if span_index > 0:
span.set_attribute("entity.count", span_index)

def set_workflow_attributes(to_wrap, span: Span, span_index):
return_value = 1
Expand Down
3 changes: 2 additions & 1 deletion src/monocle_apptrace/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from monocle_apptrace.haystack import HAYSTACK_METHODS
from monocle_apptrace.langchain import LANGCHAIN_METHODS
from monocle_apptrace.llamaindex import LLAMAINDEX_METHODS
from monocle_apptrace.botocore import BOTOCORE_METHODS
from monocle_apptrace.wrap_common import task_wrapper

# pylint: disable=too-few-public-methods
Expand All @@ -23,4 +24,4 @@ def __init__(

self.wrapper = wrapper

INBUILT_METHODS_LIST = LANGCHAIN_METHODS + LLAMAINDEX_METHODS + HAYSTACK_METHODS
INBUILT_METHODS_LIST = LANGCHAIN_METHODS + LLAMAINDEX_METHODS + HAYSTACK_METHODS + BOTOCORE_METHODS
145 changes: 145 additions & 0 deletions tests/sagemaker_sample.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
from opentelemetry import trace
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from monocle_apptrace.instrumentor import setup_monocle_telemetry
setup_monocle_telemetry(
workflow_name="sagemaker_workflow_1",
span_processors=[BatchSpanProcessor(ConsoleSpanExporter())],
wrapper_methods=[])

# Continue with your code
from langchain_community.vectorstores import OpenSearchVectorSearch
import boto3
from requests_aws4auth import AWS4Auth

from langchain_community.embeddings import SagemakerEndpointEmbeddings
from opensearchpy import RequestsHttpConnection
import os
import json
from typing import Dict, List

from langchain_community.embeddings.sagemaker_endpoint import EmbeddingsContentHandler

def produce_response(query):
    """Answer *query* via the SageMaker LLM endpoint (retrieval step disabled)."""
    # Vector-store lookup intentionally commented out in this sample:
    #similar_documents = search_similar_documents_opensearch(query)
    return produce_llm_response(query)


def produce_llm_response(query):
    """Send *query* to the SageMaker QA endpoint and return its 'answer' field.

    Builds a JSON payload (fixed instruction context + the user question),
    invokes the endpoint via the sagemaker-runtime client, then decodes and
    parses the JSON response body.
    """
    # This client creation is what monocle's botocore wrapper hooks.
    client = boto3.client('sagemaker-runtime', region_name='us-east-1')

    endpoint_name = "okahu-sagemaker-rag-qa-ep"  # Your endpoint name.
    content_type = "application/json"  # The MIME type of the input data in the request body.
    accept = "application/json"  # The desired MIME type of the inference in the response.

    data = {
        "context": """You are an assistant for question-answering tasks. \
Use the following pieces of retrieved context to answer the question. \
If you don't know the answer, just say that you don't know. \
Use three sentences maximum and keep the answer concise.\
""" ,
        "question": query
    }

    # invoke_endpoint is the call the instrumentation wraps; it should emit a
    # "botocore-sagemaker-invoke-endpoint" span (see sample trace below).
    response = client.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType=content_type,
        Accept=accept,
        Body=json.dumps(data)
    )

    # 'Body' is a streaming object; read it fully before decoding.
    content = response['Body'].read()

    # Print the content
    response_str = content.decode('utf-8')
    print(f"The response provided by the endpoint: {response_str}")

    answer = json.loads(response_str)["answer"]
    return answer


def build_context(similar_documents):
    """Build the prompt context from retrieved documents.

    Returns a context block listing the documents when any were retrieved,
    otherwise an instruction telling the model that nothing relevant was
    found and that it should decline politely.
    """
    # Pythonic truthiness instead of len(...) > 0.
    if similar_documents:
        documents_concatenated = "-------------END OF DOCUMENT-------------".join(similar_documents)
        return f"""Based on embedding lookup, we've found these documents to be the most relevant from the knowledge
    base: {documents_concatenated}"""
    else:
        return "We couldn't locate any documents that would be relevant for this question. Please apologize politely " \
               "and say that you don't know the answer if this is not something you can answer on your own."


def search_similar_documents_opensearch(query):
    """Similarity-search the OpenSearch 'embeddings' index for *query*.

    Embeds the query with a SageMaker embedding endpoint, then returns the
    page content of the matching documents.
    """
    region = 'us-east-1'
    service = 'aoss'
    opensearch_url = os.environ['OPENSEARCH_ENDPOINT_URL']
    index_name = "embeddings"  # Your index name

    handler = ContentHandler()
    embeddings = SagemakerEndpointEmbeddings(
        endpoint_name="okahu-sagemaker-rag-embedding-ep",
        region_name=region,
        content_handler=handler)

    # Sign OpenSearch Serverless requests with the current AWS credentials.
    credentials = boto3.Session().get_credentials()
    auth = AWS4Auth(credentials.access_key, credentials.secret_key, region,
                    service, session_token=credentials.token)

    vector_store = OpenSearchVectorSearch(
        opensearch_url=opensearch_url,
        index_name=index_name,
        embedding_function=embeddings,
        http_auth=auth,
        use_ssl=True,
        verify_certs=True,
        ssl_assert_hostname=True,
        ssl_show_warn=True,
        connection_class=RequestsHttpConnection
    )
    matches = vector_store.similarity_search(query)
    print(f"Retrieved docs: {matches}")
    return [doc.page_content for doc in matches]


class ContentHandler(EmbeddingsContentHandler):
    """Serialize requests to / responses from the SageMaker embedding endpoint."""

    content_type = "application/json"
    accepts = "application/json"

    def transform_input(self, inputs: List[str], model_kwargs: Dict) -> bytes:
        """Encode *inputs* plus any model kwargs as a UTF-8 JSON payload."""
        payload = {"text_inputs": inputs, **model_kwargs}
        return json.dumps(payload).encode("utf-8")

    def transform_output(self, output: bytes) -> List[List[float]]:
        """Decode the endpoint's JSON body and return its 'embedding' matrix."""
        body = output.read().decode("utf-8")
        return json.loads(body)["embedding"]


# Demo invocation; the expected span output is shown in the comment block below.
produce_response("hello")

# {
# "name": "botocore-sagemaker-invoke-endpoint",
# "context": {
# "trace_id": "0x74c550d05bd44bd4bc7791230f2838c1",
# "span_id": "0x12636d1179add9a6",
# "trace_state": "[]"
# },
# "kind": "SpanKind.INTERNAL",
# "parent_id": null,
# "start_time": "2024-12-05T09:42:08.229975Z",
# "end_time": "2024-12-05T09:42:10.207668Z",
# "status": {
# "status_code": "UNSET"
# },
# "attributes": {
# "entity.1.name": "sagemaker_workflow_1",
# "entity.1.type": "workflow.generic",
# "span.type": "inference",
# "entity.2.type": "inference.aws_sagemaker",
# "entity.2.inference_endpoint": "https://runtime.sagemaker.us-east-1.amazonaws.com",
# "entity.3.name": "okahu-sagemaker-rag-qa-ep",
# "entity.3.type": "model.llm.okahu-sagemaker-rag-qa-ep",
# "entity.count": 3
# },
# "events": [],
# "links": [],
# "resource": {
# "attributes": {
# "service.name": "sagemaker_workflow_1"
# },
# "schema_url": ""
# }
# }
Loading