From 73c930e0d578c803cb99c3dd3700744d7209be36 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9my=20Abi-Khalil?= Date: Fri, 6 Dec 2024 15:02:32 +0100 Subject: [PATCH 01/17] Refs #NAV-3691 - In progress, integration of otlp. --- source/jormungandr/jormungandr/__init__.py | 2 + source/jormungandr/jormungandr/api.py | 27 ++ .../autocomplete/abstract_autocomplete.py | 2 + .../jormungandr/equipments/sytral.py | 2 + source/jormungandr/jormungandr/exceptions.py | 3 + .../external_services/external_service.py | 2 + source/jormungandr/jormungandr/otlp.py | 261 ++++++++++++++++++ .../bss/common_bss_provider.py | 2 + .../car/common_car_park_provider.py | 2 + .../realtime_schedule/realtime_proxy.py | 4 + .../jormungandr/scenarios/distributed.py | 16 +- .../jormungandr/scenarios/new_default.py | 6 +- .../ridesharing/ridesharing_service.py | 4 + .../ridesharing_service_manager.py | 6 +- source/jormungandr/jormungandr/schedule.py | 15 +- .../street_network/street_network.py | 2 + source/jormungandr/jormungandr/utils.py | 2 + 17 files changed, 342 insertions(+), 16 deletions(-) create mode 100644 source/jormungandr/jormungandr/otlp.py diff --git a/source/jormungandr/jormungandr/__init__.py b/source/jormungandr/jormungandr/__init__.py index 9d8c8ece20..b0c6bf0269 100644 --- a/source/jormungandr/jormungandr/__init__.py +++ b/source/jormungandr/jormungandr/__init__.py @@ -47,6 +47,8 @@ if app.config.get(str('PATCH_WITH_GEVENT_SOCKET'), False): init.patch_http(patch_level=app.config.get(str('PATCH_WITH_GEVENT_SOCKET_LEVEL'), "socket")) +from jormungandr import otlp + from jormungandr import new_relic new_relic.init(app.config.get(str('NEWRELIC_CONFIG_PATH'), None)) diff --git a/source/jormungandr/jormungandr/api.py b/source/jormungandr/jormungandr/api.py index 5ec030498e..a37cd8c3af 100644 --- a/source/jormungandr/jormungandr/api.py +++ b/source/jormungandr/jormungandr/api.py @@ -44,6 +44,7 @@ from jormungandr.authentication import get_user, get_token, get_app_name, get_used_coverages from jormungandr._version import __version__ import six +from jormungandr.otlp import otlp_instance @rest_api.representation("text/jsonp") @@ -126,6 +127,32 @@ def add_info_newrelic(response, *args, **kwargs): return response +@app.after_request +def record_request_call_to_otlp(response, *args, **kwargs): + try: + token = get_token() + user = get_user(token=token, abort_if_no_token=False) if token else None + user_id = str(user.id) if user else "unknown" + token_name = get_app_name(token) if user else "unknown" + version = __version__ + coverages = get_used_coverages() + coverage = coverages[0] if coverages else "unknown" + labels = { + "token": token, + "user_id": user_id, + "token_name": token_name, + "version": version, + "coverage": coverage, + "status": response.status_code, + } + otlp_instance.send_request_call_metrics(labels) + except: + logger = logging.getLogger(__name__) + logger.exception('error while reporting to otlp:') + + return response + + # If modules are configured, then load and run them if 'MODULES' in rest_api.app.config: rest_api.module_loader = ModulesLoader(rest_api) diff --git a/source/jormungandr/jormungandr/autocomplete/abstract_autocomplete.py b/source/jormungandr/jormungandr/autocomplete/abstract_autocomplete.py index 8e80a725fd..e58f28c304 100644 --- a/source/jormungandr/jormungandr/autocomplete/abstract_autocomplete.py +++ b/source/jormungandr/jormungandr/autocomplete/abstract_autocomplete.py @@ -34,6 +34,7 @@ from jormungandr.exceptions import UnknownObject, TechnicalError, log_exception import six from jormungandr.new_relic import record_custom_event +from jormungandr.otlp import otlp_instance class AutocompleteError(RuntimeError): @@ -79,6 +80,7 @@ def record_status(self, status, exc=None): if exc is not None: data["cause"] = str(exc) record_custom_event('autocomplete_status', data) + otlp_instance.send_event_metric('autocomplete_status', data) def get_object_by_uri(self, uri, request_id=None, instances=None, current_datetime=None): """ diff --git a/source/jormungandr/jormungandr/equipments/sytral.py b/source/jormungandr/jormungandr/equipments/sytral.py index c85f2047b8..99a7a08e40 100644 --- a/source/jormungandr/jormungandr/equipments/sytral.py +++ b/source/jormungandr/jormungandr/equipments/sytral.py @@ -30,6 +30,7 @@ from __future__ import absolute_import, print_function, unicode_literals, division from jormungandr import cache, app, new_relic +from jormungandr.otlp import otlp_instance from navitiacommon import type_pb2 from dateutil import parser from jormungandr.utils import date_to_timestamp, PY3 @@ -118,6 +119,7 @@ def record_call(self, status, **kwargs): params = {'parking_system_id': "SytralRT", 'dataset': "sytral", 'status': status} params.update(kwargs) new_relic.record_custom_event('parking_status', params) + otlp_instance.send_event_metric('parking_status', params) def _fill_equipment_details(self, equipment_form_web_service, equipment_details): equipment_details.id = equipment_form_web_service['id'] diff --git a/source/jormungandr/jormungandr/exceptions.py b/source/jormungandr/jormungandr/exceptions.py index 403aedc150..bffbb349a1 100644 --- a/source/jormungandr/jormungandr/exceptions.py +++ b/source/jormungandr/jormungandr/exceptions.py @@ -32,6 +32,7 @@ from werkzeug.exceptions import HTTPException import logging from jormungandr.new_relic import record_exception +from jormungandr.otlp import otlp_instance __all__ = [ "RegionNotFound", @@ -166,9 +167,11 @@ def log_exception(sender, exception, **extra): logger.debug(error) if exception.code >= 500: record_exception() + otlp_instance.record_exception(exception) else: logger.exception(error) record_exception() + otlp_instance.record_exception(exception) class StatManagerError(RuntimeError): diff --git a/source/jormungandr/jormungandr/external_services/external_service.py b/source/jormungandr/jormungandr/external_services/external_service.py index 872ce63900..e0172f9731 100644 --- a/source/jormungandr/jormungandr/external_services/external_service.py +++ b/source/jormungandr/jormungandr/external_services/external_service.py @@ -36,6 +36,7 @@ import logging import requests as requests from six.moves.urllib.parse import urlencode +from jormungandr.otlp import otlp_instance class ExternalServiceError(RuntimeError): @@ -79,6 +80,7 @@ def record_call(self, url, status, **kwargs): params = {'external_service_id': "Forseti", 'status': status, 'external_service_url': url} params.update(kwargs) new_relic.record_custom_event('external_service_status', params) + otlp_instance.send_event_metric('external_service_status', params) @abc.abstractmethod def get_response(self, arguments): diff --git a/source/jormungandr/jormungandr/otlp.py b/source/jormungandr/jormungandr/otlp.py new file mode 100644 index 0000000000..b90ded4621 --- /dev/null +++ b/source/jormungandr/jormungandr/otlp.py @@ -0,0 +1,261 @@ +# coding=utf-8 + +# Copyright (c) 2001, Canal TP and/or its affiliates. All rights reserved. +# +# This file is part of Navitia, +# the software to build cool stuff with public transport. +# +# Hope you'll enjoy and contribute to this project, +# powered by Hove (www.hove.com). +# Help us simplify mobility and open public transport: +# a non ending quest to the responsive locomotion way of traveling! +# +# LICENCE: This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +# +# Stay tuned using +# twitter @navitia +# [matrix] channel #navitia:matrix.org (https://app.element.io/#/room/#navitia:matrix.org) +# https://groups.google.com/d/forum/navitia +# www.navitia.io + +import logging +from typing import Dict + +from flask import request + +from opentelemetry.sdk.resources import SERVICE_NAME, Resource +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter +from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter +from opentelemetry.trace.status import Status, StatusCode + +from opentelemetry import trace, metrics +import os + + +class OtlpMeta(type): + _instance: Dict = {} + + def __call__(cls, *args, **kwds): + if cls not in cls._instance: + cls._instance[cls] = super().__call__(*args, **kwds) + + return cls._instance[cls] + + +class Otlp(metaclass=OtlpMeta): + __service_name = "jormungandr" + __request_call_labels = {} + + def __init__(self) -> None: + self.__log = logging.getLogger(__name__) + + try: + self.__environment = os.getenv("ENVIRONMENT", "local") + self.__resource = Resource( + attributes={ + SERVICE_NAME: self.__service_name, + } + ) + self.__init_tracer() + self.__init_meter() + + self.__declare_counters() + self.__declare_histograms() + except Exception: + self.__log.exception("failure while initializing otlp") + self._tracer = None + self._meter = None + + def __init_tracer(self): + trace_exporter = OTLPSpanExporter() + span_processor = BatchSpanProcessor(trace_exporter) + trace_provider = TracerProvider(resource=self.__resource) + + trace_provider.add_span_processor(span_processor) + trace.set_tracer_provider(trace_provider) + + self._tracer = trace.get_tracer(__name__) + + def __init_meter(self): + exporter = OTLPMetricExporter() + reader = PeriodicExportingMetricReader(exporter) + provider = MeterProvider(resource=self.__resource, metric_readers=[reader]) + + metrics.set_meter_provider(provider) + + self._meter = metrics.get_meter(__name__) + + def __declare_counters(self) -> None: + self.__jormungandr_exception = self._meter.create_counter( + name="jormungandr_exception", description="Count exception" + ) + self.__jormungandr_request_call = self._meter.create_counter( + name="jormungandr_request_call", description="Count request call" + ) + self.__jormungandr_event = self._meter.create_counter( + name="jormungandr_event", description="Count event" + ) + + def __declare_histograms(self) -> None: + pass + + def get_tracer(self) -> trace.Tracer: + return self._tracer + + def record_exception(self, exception: BaseException, attributes: Dict = {}) -> None: + """ + record the exception currently handled to otlp + """ + if self._tracer: + # TODO: Can remove this and use directly request.id below ? + try: + navitia_request_id = request.id + except RuntimeError: + self.__log.exception("failure while getting request id. We are outside of a flask context :(") + navitia_request_id = 42 + + try: + span = trace.get_current_span() + span.set_attribute("navitia_request_id", str(navitia_request_id)) + for key, value in attributes.items(): + span.set_attribute(key, value) + span.set_status(Status(StatusCode.ERROR, "Exception")) + span.record_exception(exception) + except Exception: + self.__log.exception("failure while reporting to otlp (with trace)") + + if self._meter: + try: + self.__jormungandr_exception.add( + 1, + { + "exception_type": type(exception).__name__, + "status": Status(StatusCode.ERROR, "Exception"), + }, + ) + except Exception: + self.__log.exception("failure while reporting to otlp (with meter)") + + def send_request_call_metrics(self, labels=None) -> None: + if labels: + self.record_request_call_labels(labels) + + self.__request_call_labels["environment"] = self.__environment + + self.__jormungandr_request_call.add(1, self.__request_call_labels) + self.__request_call_labels.clear() + + def record_request_call_labels(self, labels: Dict) -> None: + self.__request_call_labels.update(labels) + + def record_request_call_label(self, label_name: str, label_value: str) -> None: + self.__request_call_labels[label_name] = label_value + + def send_event_metric(self, event_type: str, labels: Dict = {}) -> None: + labels["environment"] = self.__environment + labels["event_type"] = event_type + self.__jormungandr_event.add(1, labels) + + # def send_distributed_event(self, call_name: str, group_name: str, status) -> None: + # labels = { + # "service": service_name, + # "call": call_name, + # "group": group_name, + # "status": status, + # "az": os.getenv('JORMUNGANDR_DEPLOYMENT_AZ', "unknown"), + # } + + +otlp_instance = Otlp() + + +def __get_common_event_params(service_name, call_name, status="ok"): + return { + "service": service_name, + "call": call_name, + "status": status, + "az": os.getenv('JORMUNGANDR_DEPLOYMENT_AZ', "unknown"), + } + + +# # Using existing library of opentelemetry without instrumenting +# from opentelemetry.sdk.resources import SERVICE_NAME, Resource + +# # Metrics part +# from opentelemetry import metrics +# from opentelemetry.sdk.metrics import MeterProvider +# from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter +# from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader + +# # Traces Part +# from opentelemetry import trace +# from opentelemetry.sdk.trace import TracerProvider +# from opentelemetry.sdk.trace.export import BatchSpanProcessor +# from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter + +# # OpenTelemetry configuration + +# resource = Resource( +# attributes={ +# SERVICE_NAME: "my_jormungandr_service", +# } +# ) + +# # Metrics configuration +# exporter = OTLPMetricExporter() +# reader = PeriodicExportingMetricReader(exporter) + +# provider = MeterProvider(resource=resource, metric_readers=[reader]) +# metrics.set_meter_provider(provider) + +# meter = metrics.get_meter("my_jormungandr_meter") +# my_metric_counter = meter.create_counter(name="my_jormungandr_from_otlp_lib", description="An example counter") +# my_metric_histogram = meter.create_histogram( +# name="my_jormungandr_histogram_from_otlp_lib", description="An example histogram" +# ) + +# my_metric_counter.add(1, {"environment": "local"}) + + +# # Trace configuration +# trace_exporter = OTLPSpanExporter() + +# span_processor = BatchSpanProcessor(trace_exporter) + +# trace_provider = TracerProvider(resource=resource) +# trace_provider.add_span_processor(span_processor) +# trace.set_tracer_provider(trace_provider) + +# tracer = trace.get_tracer(__name__) + + +# my_trace = trace.get_tracer("my_jormungandr_tracer") + + +# @app.route("/my_trace") +# def call_http(): +# logging.info("-----------------------------------------------------") +# my_metric_counter.add(1, {"environment": "local"}) +# my_metric_histogram.record(0.42, {"environment": "local"}) + +# with tracer.start_as_current_span("example-span") as span: +# span.set_attribute("example-attribute", "example-value") +# print("Doing some work...") +# logging.info("-----------------------------------------------------") + +# return 'good ?' diff --git a/source/jormungandr/jormungandr/parking_space_availability/bss/common_bss_provider.py b/source/jormungandr/jormungandr/parking_space_availability/bss/common_bss_provider.py index 54b070516b..98f0246a19 100644 --- a/source/jormungandr/jormungandr/parking_space_availability/bss/common_bss_provider.py +++ b/source/jormungandr/jormungandr/parking_space_availability/bss/common_bss_provider.py @@ -27,6 +27,7 @@ # https://groups.google.com/d/forum/navitia # www.navitia.io from jormungandr import new_relic +from jormungandr.otlp import otlp_instance from jormungandr.parking_space_availability import AbstractParkingPlacesProvider from abc import abstractmethod from jormungandr.parking_space_availability.bss.stands import Stands, StandsStatus @@ -59,3 +60,4 @@ def record_call(self, status, **kwargs): params = {'bss_system_id': six.text_type(self.network), 'status': status} params.update(kwargs) new_relic.record_custom_event('bss_status', params) + otlp_instance.record_custom_event('bss_status', params) diff --git a/source/jormungandr/jormungandr/parking_space_availability/car/common_car_park_provider.py b/source/jormungandr/jormungandr/parking_space_availability/car/common_car_park_provider.py index ef06972d31..8a5ca004f6 100644 --- a/source/jormungandr/jormungandr/parking_space_availability/car/common_car_park_provider.py +++ b/source/jormungandr/jormungandr/parking_space_availability/car/common_car_park_provider.py @@ -35,6 +35,7 @@ import requests as requests from jormungandr import cache, app, new_relic +from jormungandr.otlp import otlp_instance from jormungandr.parking_space_availability import AbstractParkingPlacesProvider from jormungandr.ptref import FeedPublisher @@ -120,3 +121,4 @@ def record_call(self, status, **kwargs): params = {'parking_system_id': self.provider_name, 'dataset': self.dataset, 'status': status} params.update(kwargs) new_relic.record_custom_event('parking_status', params) + otlp_instance.record_custom_event('parking_status', params) diff --git a/source/jormungandr/jormungandr/realtime_schedule/realtime_proxy.py b/source/jormungandr/jormungandr/realtime_schedule/realtime_proxy.py index 1288025919..2107c934d7 100644 --- a/source/jormungandr/jormungandr/realtime_schedule/realtime_proxy.py +++ b/source/jormungandr/jormungandr/realtime_schedule/realtime_proxy.py @@ -35,6 +35,7 @@ from jormungandr.utils import timestamp_to_datetime, record_external_failure from jormungandr.utils import date_to_timestamp, pb_del_if from jormungandr import new_relic +from jormungandr.otlp import otlp_instance from navitiacommon import type_pb2 import datetime import hashlib @@ -281,6 +282,7 @@ def record_internal_failure(self, message, comment=None): if comment is not None: params['comment'] = comment new_relic.record_custom_event('realtime_internal_failure', params) + otlp_instance.send_event_metric('realtime_internal_failure', params) def record_call(self, status, **kwargs): """ @@ -289,6 +291,7 @@ def record_call(self, status, **kwargs): params = {'realtime_system_id': six.text_type(self.rt_system_id), 'status': status} params.update(kwargs) new_relic.record_custom_event('realtime_status', params) + otlp_instance.send_event_metric('realtime_status', params) def record_additional_info(self, status, **kwargs): """ @@ -297,6 +300,7 @@ def record_additional_info(self, status, **kwargs): params = {'realtime_system_id': six.text_type(self.rt_system_id), 'status': status} params.update(kwargs) new_relic.record_custom_event('realtime_proxy_additional_info', params) + otlp_instance.send_event_metric('realtime_proxy_additional_info', params) @cache.memoize(app.config.get(str('CACHE_CONFIGURATION'), {}).get(str('TIMEOUT_PTOBJECTS'), 600)) def _get_direction(self, line_uri, object_code, default_value): diff --git a/source/jormungandr/jormungandr/scenarios/distributed.py b/source/jormungandr/jormungandr/scenarios/distributed.py index 43575590c7..2ab2ec112f 100644 --- a/source/jormungandr/jormungandr/scenarios/distributed.py +++ b/source/jormungandr/jormungandr/scenarios/distributed.py @@ -53,6 +53,7 @@ updated_common_journey_request_with_default, ) from jormungandr.new_relic import record_custom_parameter +from jormungandr.otlp import otlp_instance from navitiacommon import response_pb2, type_pb2 from flask_restful import abort from .helper_classes.timer_logger_helper import timed_logger @@ -460,7 +461,7 @@ def _compute_isochrone_common( pt_journey_pool = PtJourneyPool(**pt_journey_args) res = [] - for (dep_mode, arr_mode, future_pt_journey) in pt_journey_pool: + for dep_mode, arr_mode, future_pt_journey in pt_journey_pool: logger.debug("waiting for pt journey starts with %s and ends with %s", dep_mode, arr_mode) pt_journeys = wait_and_get_pt_journeys(future_pt_journey, False) if pt_journeys: @@ -499,6 +500,7 @@ def call_kraken( context=None, ): record_custom_parameter('scenario', 'distributed') + otlp_instance.record_request_call_label('scenario', 'distributed') logger = logging.getLogger(__name__) """ All spawned futures must be started(if they're not yet started) when leaving the scope. @@ -590,13 +592,13 @@ def graphical_isochrones(self, request, instance): if request.get("max_duration") is None: request["max_duration"] = max(request["boundary_duration[]"], key=int) if request.get('additional_time_after_first_section_taxi') is None: - request[ - 'additional_time_after_first_section_taxi' - ] = instance.additional_time_after_first_section_taxi + request['additional_time_after_first_section_taxi'] = ( + instance.additional_time_after_first_section_taxi + ) if request.get('additional_time_before_last_section_taxi') is None: - request[ - 'additional_time_before_last_section_taxi' - ] = instance.additional_time_after_first_section_taxi + request['additional_time_before_last_section_taxi'] = ( + instance.additional_time_after_first_section_taxi + ) if request.get('on_street_bike_parking_duration') is None: request['on_street_bike_parking_duration'] = instance.on_street_bike_parking_duration diff --git a/source/jormungandr/jormungandr/scenarios/new_default.py b/source/jormungandr/jormungandr/scenarios/new_default.py index 681dadf331..e980c1c68b 100644 --- a/source/jormungandr/jormungandr/scenarios/new_default.py +++ b/source/jormungandr/jormungandr/scenarios/new_default.py @@ -98,6 +98,7 @@ from jormungandr.autocomplete.geocodejson import GeocodeJson from jormungandr import global_autocomplete from jormungandr.new_relic import record_custom_parameter +from jormungandr.otlp import otlp_instance from jormungandr import fallback_modes from six.moves import filter @@ -604,7 +605,7 @@ def _build_candidate_pool_and_sections_set(journeys): candidates_pool = list() idx_of_jrny_must_keep = list() - for (i, jrny) in enumerate(journeys): + for i, jrny in enumerate(journeys): if set(jrny.tags) & set(JOURNEY_TAGS_TO_RETAIN) or jrny.type in set(JOURNEY_TYPES_TO_RETAIN): idx_of_jrny_must_keep.append(i) sections_set |= set([_get_section_id(s) for s in jrny.sections if s.type in SECTION_TYPES_TO_RETAIN]) @@ -972,6 +973,8 @@ def tag_reliable_journeys(responses): "physical_mode:LongDistanceTrain", ] reliable_fallback_modes = [response_pb2.Bike, response_pb2.Walking] + + # returns true if : # - a journey has at least one public transport section # - all public transport sections use reliable physical modes @@ -1564,6 +1567,7 @@ def call_kraken( # TODO: handle min_alternative_journeys # TODO: call first bss|bss and do not call walking|walking if no bss in first results record_custom_parameter('scenario', 'new_default') + otlp_instance.record_request_call_label('scenario', 'new_default') resp = [] logger = logging.getLogger(__name__) futures = [] diff --git a/source/jormungandr/jormungandr/scenarios/ridesharing/ridesharing_service.py b/source/jormungandr/jormungandr/scenarios/ridesharing/ridesharing_service.py index d128b03564..18b5759060 100644 --- a/source/jormungandr/jormungandr/scenarios/ridesharing/ridesharing_service.py +++ b/source/jormungandr/jormungandr/scenarios/ridesharing/ridesharing_service.py @@ -32,6 +32,7 @@ import abc import six from jormungandr import new_relic +from jormungandr.otlp import otlp_instance from jormungandr.utils import decode_polyline from navitiacommon import type_pb2 from collections import namedtuple @@ -170,6 +171,7 @@ def record_internal_failure(self, message): 'ridesharing_service_url': self.service_url, } new_relic.record_custom_event('ridesharing_internal_failure', params) + otlp_instance.send_event_metric('ridesharing_internal_failure', params) def record_call(self, status, **kwargs): """ @@ -182,6 +184,7 @@ def record_call(self, status, **kwargs): } params.update(kwargs) new_relic.record_custom_event('ridesharing_status', params) + otlp_instance.send_event_metric('ridesharing_status', params) def record_additional_info(self, status, **kwargs): """ @@ -194,6 +197,7 @@ def record_additional_info(self, status, **kwargs): } params.update(kwargs) new_relic.record_custom_event('ridesharing_proxy_additional_info', params) + otlp_instance.send_event_metric('ridesharing_proxy_additional_info', params) def __eq__(self, other): return all( diff --git a/source/jormungandr/jormungandr/scenarios/ridesharing/ridesharing_service_manager.py b/source/jormungandr/jormungandr/scenarios/ridesharing/ridesharing_service_manager.py index cf21c4a5ba..1924d7d1ad 100644 --- a/source/jormungandr/jormungandr/scenarios/ridesharing/ridesharing_service_manager.py +++ b/source/jormungandr/jormungandr/scenarios/ridesharing/ridesharing_service_manager.py @@ -33,6 +33,7 @@ from jormungandr import utils import six from jormungandr import new_relic +from jormungandr.otlp import otlp_instance from jormungandr import app from jormungandr.scenarios import journey_filter from jormungandr.scenarios.ridesharing.ridesharing_journey import Gender @@ -286,7 +287,10 @@ def build_ridesharing_journeys(self, from_pt_obj, to_pt_obj, request_dates, inst self.logger.exception( 'Error while retrieving ridesharing ads and feed_publishers from %s to %s: {}', from_str, to_str ) - new_relic.record_custom_event('ridesharing_internal_failure', {'message': str(e)}) + params = {'message': str(e)} + new_relic.record_custom_event('ridesharing_internal_failure', params) + otlp_instance.send_event_metric('ridesharing_internal_failure') + otlp_instance.record_exception(e, params) rsjs = [] fps = [] diff --git a/source/jormungandr/jormungandr/schedule.py b/source/jormungandr/jormungandr/schedule.py index a562208583..09f0f16db2 100644 --- a/source/jormungandr/jormungandr/schedule.py +++ b/source/jormungandr/jormungandr/schedule.py @@ -39,6 +39,8 @@ from navitiacommon import type_pb2, request_pb2, response_pb2 from copy import deepcopy from jormungandr import new_relic +from jormungandr.otlp import otlp_instance +from jormungandr.otlp import otlp_instance import gevent import gevent.pool @@ -213,9 +215,9 @@ def _get_realtime_proxy(self, route_point): rt_system = self.instance.realtime_proxy_manager.get(rt_system_code) if not rt_system: log.info('impossible to find {}, no realtime added'.format(rt_system_code)) - new_relic.record_custom_event( - 'realtime_internal_failure', {'rt_system_id': rt_system_code, 'message': 'no handler found'} - ) + params = {'rt_system_id': rt_system_code, 'message': 'no handler found'} + new_relic.record_custom_event('realtime_internal_failure', params) + otlp_instance.send_event_metric('realtime_internal_failure', params) return None return rt_system @@ -237,10 +239,9 @@ def _get_next_realtime_passages(self, rt_system, route_point, request): log.exception( 'failure while requesting next passages to external RT system {}'.format(rt_system.rt_system_id) ) - new_relic.record_custom_event( - 'realtime_internal_failure', - {'rt_system_id': six.text_type(rt_system.rt_system_id), 'message': str(e)}, - ) + params = {'rt_system_id': six.text_type(rt_system.rt_system_id), 'message': str(e)} + new_relic.record_custom_event('realtime_internal_failure', params) + otlp_instance.send_event_metric('realtime_internal_failure', params) if next_rt_passages is None: log.debug('no next passages, using base schedule') diff --git a/source/jormungandr/jormungandr/street_network/street_network.py b/source/jormungandr/jormungandr/street_network/street_network.py index 8ff4343ec6..c146b64580 100644 --- a/source/jormungandr/jormungandr/street_network/street_network.py +++ b/source/jormungandr/jormungandr/street_network/street_network.py @@ -31,6 +31,7 @@ from __future__ import absolute_import, print_function, unicode_literals, division from jormungandr import utils, new_relic +from jormungandr.otlp import otlp_instance import abc from enum import Enum @@ -155,6 +156,7 @@ def record_call(self, status, **kwargs): params = {'streetnetwork_id': six.text_type(self.sn_system_id), 'status': status} params.update(kwargs) new_relic.record_custom_event('streetnetwork', params) + otlp_instance.send_event_metric('streetnetwork', params) def _add_feed_publisher(self, resp): sn_feed = self.feed_publisher() diff --git a/source/jormungandr/jormungandr/utils.py b/source/jormungandr/jormungandr/utils.py index 1e76ec4f94..d99001a414 100644 --- a/source/jormungandr/jormungandr/utils.py +++ b/source/jormungandr/jormungandr/utils.py @@ -41,6 +41,7 @@ from jormungandr.exceptions import ConfigException, UnableToParse, InvalidArguments from six.moves.urllib.parse import urlparse from jormungandr import new_relic, app +from jormungandr.otlp import otlp_instance from six.moves import zip, range from jormungandr.exceptions import TechnicalError from flask import request, g @@ -636,6 +637,7 @@ def get_first_pt_section(journey): def record_external_failure(message, connector_type, connector_name): params = {'{}_system_id'.format(connector_type): six.text_type(connector_name), 'message': message} new_relic.record_custom_event('{}_external_failure'.format(connector_type), params) + otlp_instance.send_event_metric('{}_external_failure'.format(connector_type), params) def decode_polyline(encoded, precision=6): From 084d5c95cb0795036f03dd61c654f253c49e896d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9my=20Abi-Khalil?= Date: Fri, 6 Dec 2024 16:33:02 +0100 Subject: [PATCH 02/17] Refs #NAV-3691 - In progress, integration of otlp. distributedEvent part. --- source/jormungandr/jormungandr/new_relic.py | 9 +- source/jormungandr/jormungandr/otlp.py | 162 ++++++-------------- 2 files changed, 55 insertions(+), 116 deletions(-) diff --git a/source/jormungandr/jormungandr/new_relic.py b/source/jormungandr/jormungandr/new_relic.py index 203a04fad1..61d7102649 100644 --- a/source/jormungandr/jormungandr/new_relic.py +++ b/source/jormungandr/jormungandr/new_relic.py @@ -36,6 +36,7 @@ from typing import Text, Callable from contextlib import contextmanager from jormungandr import app +from jormungandr.otlp import otlp_instance try: from newrelic import agent @@ -130,9 +131,10 @@ def get_common_event_params(service_name, call_name, status="ok"): } +# TODO: Move this function into otlp.py when we will remove newrelic def distributedEvent(call_name, group_name): """ - Custom event that we publish to New Relic for distributed scenario + Custom event that we publish to New Relic and Grafana for distributed scenario """ def wrap(func): @@ -148,6 +150,7 @@ def wrapper(obj, service, *args, **kwargs): except Exception as e: event_params["status"] = "failed" event_params.update({"exception": e}) + otlp_instance.record_exception(e, event_params) raise duration = timeit.default_timer() - start_time @@ -156,6 +159,10 @@ def wrapper(obj, service, *args, **kwargs): # Send the custom event to newrelic ! record_custom_event("distributed", event_params) + event_params.pop("duration") + otlp_instance.send_event_metric("distributed", event_params) + otlp_instance.send_distributed_duration_metric(event_params, duration) + return result return wrapper diff --git a/source/jormungandr/jormungandr/otlp.py b/source/jormungandr/jormungandr/otlp.py index b90ded4621..77afd13ddc 100644 --- a/source/jormungandr/jormungandr/otlp.py +++ b/source/jormungandr/jormungandr/otlp.py @@ -112,46 +112,54 @@ def __declare_counters(self) -> None: ) def __declare_histograms(self) -> None: - pass + self.__jormungandr_distributed_duration = self._meter.create_histogram( + name="jormungandr_distributed_duration", description="Distributed scenario duration" + ) def get_tracer(self) -> trace.Tracer: return self._tracer + def __record_exception_trace(self, exception: BaseException, attributes: Dict = {}) -> None: + # TODO: Can remove this and use directly request.id below ? Check if it works on SBX (remove this before merge !) + try: + navitia_request_id = request.id + except RuntimeError: + self.__log.exception("failure while getting request id. We are outside of a flask context :(") + navitia_request_id = 42 + + try: + span = trace.get_current_span() + span.set_attribute("navitia_request_id", str(navitia_request_id)) + for key, value in attributes.items(): + span.set_attribute(key, value) + span.set_status(Status(StatusCode.ERROR, "Exception")) + span.record_exception(exception) + except Exception: + self.__log.exception("failure while reporting to otlp (with trace)") + + def __record_exception_meter(self, exception: BaseException) -> None: + try: + self.__jormungandr_exception.add( + 1, + { + "exception_type": type(exception).__name__, + "status": Status(StatusCode.ERROR, "Exception"), + }, + ) + except Exception: + self.__log.exception("failure while reporting to otlp (with meter)") + def record_exception(self, exception: BaseException, attributes: Dict = {}) -> None: - """ - record the exception currently handled to otlp - """ if self._tracer: - # TODO: Can remove this and use directly request.id below ? - try: - navitia_request_id = request.id - except RuntimeError: - self.__log.exception("failure while getting request id. We are outside of a flask context :(") - navitia_request_id = 42 - - try: - span = trace.get_current_span() - span.set_attribute("navitia_request_id", str(navitia_request_id)) - for key, value in attributes.items(): - span.set_attribute(key, value) - span.set_status(Status(StatusCode.ERROR, "Exception")) - span.record_exception(exception) - except Exception: - self.__log.exception("failure while reporting to otlp (with trace)") + self.__record_exception_trace(exception, attributes) if self._meter: - try: - self.__jormungandr_exception.add( - 1, - { - "exception_type": type(exception).__name__, - "status": Status(StatusCode.ERROR, "Exception"), - }, - ) - except Exception: - self.__log.exception("failure while reporting to otlp (with meter)") + self.__record_exception_meter(exception) def send_request_call_metrics(self, labels=None) -> None: + if not self._meter: + return + if labels: self.record_request_call_labels(labels) @@ -167,95 +175,19 @@ def record_request_call_label(self, label_name: str, label_value: str) -> None: self.__request_call_labels[label_name] = label_value def send_event_metric(self, event_type: str, labels: Dict = {}) -> None: + if not self._meter: + return + labels["environment"] = self.__environment labels["event_type"] = event_type self.__jormungandr_event.add(1, labels) - # def send_distributed_event(self, call_name: str, group_name: str, status) -> None: - # labels = { - # "service": service_name, - # "call": call_name, - # "group": group_name, - # "status": status, - # "az": os.getenv('JORMUNGANDR_DEPLOYMENT_AZ', "unknown"), - # } - - -otlp_instance = Otlp() - - -def __get_common_event_params(service_name, call_name, status="ok"): - return { - "service": service_name, - "call": call_name, - "status": status, - "az": os.getenv('JORMUNGANDR_DEPLOYMENT_AZ', "unknown"), - } - - -# # Using existing library of opentelemetry without instrumenting -# from opentelemetry.sdk.resources import SERVICE_NAME, Resource - -# # Metrics part -# from opentelemetry import metrics -# from opentelemetry.sdk.metrics import MeterProvider -# from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter -# from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader - -# # Traces Part -# from opentelemetry import trace -# from opentelemetry.sdk.trace import TracerProvider -# from opentelemetry.sdk.trace.export import BatchSpanProcessor -# from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter - -# # OpenTelemetry configuration - -# resource = Resource( -# attributes={ -# SERVICE_NAME: "my_jormungandr_service", -# } -# ) - -# # Metrics configuration -# exporter = OTLPMetricExporter() -# reader = PeriodicExportingMetricReader(exporter) - -# provider = MeterProvider(resource=resource, metric_readers=[reader]) -# metrics.set_meter_provider(provider) - -# meter = metrics.get_meter("my_jormungandr_meter") -# my_metric_counter = meter.create_counter(name="my_jormungandr_from_otlp_lib", description="An example counter") -# my_metric_histogram = meter.create_histogram( -# name="my_jormungandr_histogram_from_otlp_lib", description="An example histogram" -# ) - -# my_metric_counter.add(1, {"environment": "local"}) - - -# # Trace configuration -# trace_exporter = OTLPSpanExporter() - -# span_processor = BatchSpanProcessor(trace_exporter) - -# trace_provider = TracerProvider(resource=resource) -# trace_provider.add_span_processor(span_processor) -# trace.set_tracer_provider(trace_provider) - -# tracer = trace.get_tracer(__name__) - - -# my_trace = trace.get_tracer("my_jormungandr_tracer") - + def send_distributed_duration_metric(self, labels, duration) -> None: + if self._meter: + return -# @app.route("/my_trace") -# def call_http(): -# logging.info("-----------------------------------------------------") -# my_metric_counter.add(1, {"environment": "local"}) -# my_metric_histogram.record(0.42, {"environment": "local"}) + labels["environment"] = self.__environment + self.__jormungandr_distributed_duration.record(duration, labels) -# with tracer.start_as_current_span("example-span") as span: -# span.set_attribute("example-attribute", "example-value") -# print("Doing some work...") -# logging.info("-----------------------------------------------------") -# return 'good ?' +otlp_instance = Otlp() From 2e77e065b76478c05b6890375eb481fc3ad0fdc6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9my=20Abi-Khalil?= Date: Mon, 9 Dec 2024 09:52:52 +0100 Subject: [PATCH 03/17] Refs #NAV-3691 - Add streetnetwork call & stat manage event metrics. --- source/jormungandr/jormungandr/new_relic.py | 9 ++++++++- source/jormungandr/jormungandr/otlp.py | 10 ++++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/source/jormungandr/jormungandr/new_relic.py b/source/jormungandr/jormungandr/new_relic.py index 61d7102649..7bc457984b 100644 --- a/source/jormungandr/jormungandr/new_relic.py +++ b/source/jormungandr/jormungandr/new_relic.py @@ -131,7 +131,7 @@ def get_common_event_params(service_name, call_name, status="ok"): } -# TODO: Move this function into otlp.py when we will remove newrelic +# TODO: Update and move this function into otlp.py when we will remove newrelic def distributedEvent(call_name, group_name): """ Custom event that we publish to New Relic and Grafana for distributed scenario @@ -186,6 +186,7 @@ def record_streetnetwork_call(call_name, connector_name, mode, coverage_name): except Exception as e: event_params["status"] = "failed" event_params.update({"exception": e}) + otlp_instance.record_exception(e, event_params) raise duration = timeit.default_timer() - start_time @@ -193,6 +194,9 @@ def record_streetnetwork_call(call_name, connector_name, mode, coverage_name): # Send the custom event to newrelic ! record_custom_event(newrelic_service_name, event_params) + # Send metrics to otlp + otlp_instance.send_event_metric(newrelic_service_name, event_params) + otlp_instance.send_streetnetwork_call_duration_metric(event_params, duration) def statManagerEvent(call_name, group_name): @@ -212,6 +216,7 @@ def wrapper(obj, service, *args, **kwargs): except Exception as e: event_params["status"] = "failed" event_params.update({"reason": str(e)}) + otlp_instance.record_exception(e, event_params) raise finally: duration = timeit.default_timer() - start_time @@ -219,6 +224,8 @@ def wrapper(obj, service, *args, **kwargs): # Send the custom event to newrelic ! record_custom_event("stat_manager", event_params) + # Send metrics to otlp + otlp_instance.send_event_metric("stat_manager", event_params) return wrapper diff --git a/source/jormungandr/jormungandr/otlp.py b/source/jormungandr/jormungandr/otlp.py index 77afd13ddc..b3c2a16c10 100644 --- a/source/jormungandr/jormungandr/otlp.py +++ b/source/jormungandr/jormungandr/otlp.py @@ -115,6 +115,9 @@ def __declare_histograms(self) -> None: self.__jormungandr_distributed_duration = self._meter.create_histogram( name="jormungandr_distributed_duration", description="Distributed scenario duration" ) + self.__jormungandr_streetnetwork_call_duration = self._meter.create_histogram( + name="jormungandr_streetnetwork_call_duration", description="Streetnetwork call duration" + ) def get_tracer(self) -> trace.Tracer: return self._tracer @@ -189,5 +192,12 @@ def send_distributed_duration_metric(self, labels, duration) -> None: labels["environment"] = self.__environment self.__jormungandr_distributed_duration.record(duration, labels) + def send_streetnetwork_call_duration_metric(self, labels, duration) -> None: + if self._meter: + return + + labels["environment"] = self.__environment + self.__jormungandr_streetnetwork_call_duration.record(duration, labels) + otlp_instance = Otlp() From c73f084583910c0dd55f39f4b009ec93b470edc3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9my=20Abi-Khalil?= Date: Mon, 9 Dec 2024 16:44:40 +0100 Subject: [PATCH 04/17] Refs #NAV-3691 - Fix opentelemetry-exporter-otlp-proto-http version (depending with proto lib). --- source/jormungandr/requirements.txt | 3 +++ 1 file changed, 3 insertions(+) diff --git a/source/jormungandr/requirements.txt b/source/jormungandr/requirements.txt index ee6b8921ed..6a25fd7b7a 100644 --- a/source/jormungandr/requirements.txt +++ b/source/jormungandr/requirements.txt @@ -59,3 +59,6 @@ greenlet==0.4.15 ; python_version <= "3.6" pickle-mixin==1.0.2 Flask-Caching==1.7.2 boto3==1.28.82 +opentelemetry-api==1.28.2 +opentelemetry-sdk==1.28.2 +opentelemetry-exporter-otlp-proto-http==1.15.0 From 7924b123c1e3823b1df8f9f4356bb0aa3c3a4913 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9my=20Abi-Khalil?= Date: Wed, 11 Dec 2024 14:43:44 +0100 Subject: [PATCH 05/17] Refs #NAV 3691 - Clean and adjust some metrics. --- source/jormungandr/jormungandr/api.py | 17 +++-- .../autocomplete/abstract_autocomplete.py | 2 +- .../jormungandr/equipments/sytral.py | 2 +- source/jormungandr/jormungandr/exceptions.py | 16 +++++ .../external_services/external_service.py | 2 +- .../jormungandr/interfaces/v1/Journeys.py | 1 - source/jormungandr/jormungandr/new_relic.py | 13 ++-- source/jormungandr/jormungandr/otlp.py | 69 +++++++++---------- .../realtime_schedule/realtime_proxy.py | 6 +- .../ridesharing/ridesharing_service.py | 6 +- .../ridesharing_service_manager.py | 2 +- source/jormungandr/jormungandr/schedule.py | 4 +- .../street_network/street_network.py | 2 +- source/jormungandr/jormungandr/utils.py | 2 +- 14 files changed, 80 insertions(+), 64 deletions(-) diff --git a/source/jormungandr/jormungandr/api.py b/source/jormungandr/jormungandr/api.py index a37cd8c3af..d03c4f992a 100644 --- a/source/jormungandr/jormungandr/api.py +++ b/source/jormungandr/jormungandr/api.py @@ -33,7 +33,7 @@ from __future__ import absolute_import, print_function, unicode_literals, division import importlib from flask_restful.representations import json -from flask import request, make_response, abort +from flask import request, make_response, abort, g from jormungandr import rest_api, app, i_manager from jormungandr.index import index from jormungandr.modules_loader import ModulesLoader @@ -45,6 +45,7 @@ from jormungandr._version import __version__ import six from jormungandr.otlp import otlp_instance +import time @rest_api.representation("text/jsonp") @@ -127,13 +128,21 @@ def add_info_newrelic(response, *args, **kwargs): return response +@app.before_request +def set_request_id(): + otlp_instance.record_request_call_label("api", request.endpoint) + g.start = time.time() + + @app.after_request def record_request_call_to_otlp(response, *args, **kwargs): try: - token = get_token() + duration = time.time() - g.start + token = get_token() if get_token() else "unknown" user = get_user(token=token, abort_if_no_token=False) if token else None user_id = str(user.id) if user else "unknown" - token_name = get_app_name(token) if user else "unknown" + token_name = get_app_name(token) + token_name = token_name if token_name else "unknown" version = __version__ coverages = get_used_coverages() coverage = coverages[0] if coverages else "unknown" @@ -145,7 +154,7 @@ def record_request_call_to_otlp(response, *args, **kwargs): "coverage": coverage, "status": response.status_code, } - otlp_instance.send_request_call_metrics(labels) + otlp_instance.send_request_call_metrics(duration, labels) except: logger = logging.getLogger(__name__) logger.exception('error while reporting to otlp:') diff --git a/source/jormungandr/jormungandr/autocomplete/abstract_autocomplete.py b/source/jormungandr/jormungandr/autocomplete/abstract_autocomplete.py index e58f28c304..0a682af7ee 100644 --- a/source/jormungandr/jormungandr/autocomplete/abstract_autocomplete.py +++ b/source/jormungandr/jormungandr/autocomplete/abstract_autocomplete.py @@ -80,7 +80,7 @@ def record_status(self, status, exc=None): if exc is not None: data["cause"] = str(exc) record_custom_event('autocomplete_status', data) - otlp_instance.send_event_metric('autocomplete_status', data) + otlp_instance.send_event_metrics('autocomplete_status', data) def get_object_by_uri(self, uri, request_id=None, instances=None, current_datetime=None): """ diff --git a/source/jormungandr/jormungandr/equipments/sytral.py b/source/jormungandr/jormungandr/equipments/sytral.py index 99a7a08e40..0d64428613 100644 --- a/source/jormungandr/jormungandr/equipments/sytral.py +++ b/source/jormungandr/jormungandr/equipments/sytral.py @@ -119,7 +119,7 @@ def record_call(self, status, **kwargs): params = {'parking_system_id': "SytralRT", 'dataset': "sytral", 'status': status} params.update(kwargs) new_relic.record_custom_event('parking_status', params) - otlp_instance.send_event_metric('parking_status', params) + otlp_instance.send_event_metrics('parking_status', params) def _fill_equipment_details(self, equipment_form_web_service, equipment_details): equipment_details.id = equipment_form_web_service['id'] diff --git a/source/jormungandr/jormungandr/exceptions.py b/source/jormungandr/jormungandr/exceptions.py index bffbb349a1..7f4dd9d11e 100644 --- a/source/jormungandr/jormungandr/exceptions.py +++ b/source/jormungandr/jormungandr/exceptions.py @@ -33,6 +33,7 @@ import logging from jormungandr.new_relic import record_exception from jormungandr.otlp import otlp_instance +from typing import Dict __all__ = [ "RegionNotFound", @@ -50,6 +51,10 @@ def format_error(code, message): return error +def format_otlp_error(data: Dict) -> Dict: + return {"error_id": data["error"]["id"], "error_message": data["error"]["message"]} + + class RegionNotFound(HTTPException): def __init__(self, region=None, lon=None, lat=None, object_id=None, custom_msg=None): super(RegionNotFound, self).__init__() @@ -77,6 +82,7 @@ def __init__(self, region=None, lon=None, lat=None, object_id=None, custom_msg=N self.data = format_error("unknown_object", "Invalid id : {id}".format(id=object_id)) else: self.data = format_error("unknown_object", "Unable to parse region") + otlp_instance.record_exception(self, format_otlp_error(self.data)) def __str__(self): return repr(self.data['message']) @@ -88,6 +94,7 @@ def __init__(self, region, path): error = 'The region {} is dead'.format(region) self.data = format_error("dead_socket", error) self.code = 503 + otlp_instance.record_exception(self, format_otlp_error(self.data)) class ApiNotFound(HTTPException): @@ -96,6 +103,7 @@ def __init__(self, api): error = 'The api {} doesn\'t exist'.format(api) self.data = format_error("unknown_object", error) self.code = 404 + otlp_instance.record_exception(self, format_otlp_error(self.data)) class UnknownObject(HTTPException): @@ -104,6 +112,7 @@ def __init__(self, msg): error = 'The object {} doesn\'t exist'.format(msg) self.data = format_error("unknown_object", error) self.code = 404 + otlp_instance.record_exception(self, format_otlp_error(self.data)) class InvalidArguments(HTTPException): @@ -111,6 +120,7 @@ def __init__(self, arg): super(InvalidArguments, self).__init__() self.data = format_error("unknown_object", "Invalid arguments " + arg) self.code = 400 + otlp_instance.record_exception(self, format_otlp_error(self.data)) class UnableToParse(HTTPException): @@ -118,6 +128,7 @@ def __init__(self, msg): super(UnableToParse, self).__init__() self.data = format_error("unable_to_parse", msg) self.code = 400 + otlp_instance.record_exception(self, format_otlp_error(self.data)) class TechnicalError(HTTPException): @@ -125,6 +136,7 @@ def __init__(self, msg): super(TechnicalError, self).__init__() self.data = format_error("technical_error", msg) self.code = 500 + otlp_instance.record_exception(self, format_otlp_error(self.data)) # Only used by geovelo streetnetwork @@ -133,6 +145,7 @@ def __init__(self, msg): super(GeoveloTechnicalError, self).__init__() self.data = format_error("technical_error", msg) self.code = 500 + otlp_instance.record_exception(self, format_otlp_error(self.data)) class HandimapTechnicalError(HTTPException): @@ -140,6 +153,7 @@ def __init__(self, msg): super(HandimapTechnicalError, self).__init__() self.data = format_error("technical_error", msg) self.code = 500 + otlp_instance.record_exception(self, format_otlp_error(self.data)) class AndyamoTechnicalError(HTTPException): @@ -147,6 +161,7 @@ def __init__(self, msg): super(AndyamoTechnicalError, self).__init__() self.data = format_error("technical_error", msg) self.code = 500 + otlp_instance.record_exception(self, format_otlp_error(self.data)) class ConfigException(Exception): @@ -154,6 +169,7 @@ def __init__(self, arg): super(ConfigException, self).__init__(arg) self.data = format_error("config_exception", "Invalid config " + arg) self.code = 400 + otlp_instance.record_exception(self, format_otlp_error(self.data)) def log_exception(sender, exception, **extra): diff --git a/source/jormungandr/jormungandr/external_services/external_service.py b/source/jormungandr/jormungandr/external_services/external_service.py index e0172f9731..d4fe42cd81 100644 --- a/source/jormungandr/jormungandr/external_services/external_service.py +++ b/source/jormungandr/jormungandr/external_services/external_service.py @@ -80,7 +80,7 @@ def record_call(self, url, status, **kwargs): params = {'external_service_id': "Forseti", 'status': status, 'external_service_url': url} params.update(kwargs) new_relic.record_custom_event('external_service_status', params) - otlp_instance.send_event_metric('external_service_status', params) + otlp_instance.send_event_metrics('external_service_status', params) @abc.abstractmethod def get_response(self, arguments): diff --git a/source/jormungandr/jormungandr/interfaces/v1/Journeys.py b/source/jormungandr/jormungandr/interfaces/v1/Journeys.py index ad92d55014..5bc31595b8 100644 --- a/source/jormungandr/jormungandr/interfaces/v1/Journeys.py +++ b/source/jormungandr/jormungandr/interfaces/v1/Journeys.py @@ -484,7 +484,6 @@ def wrapper(*args, **kwargs): class Journeys(JourneyCommon): def __init__(self): # journeys must have a custom authentication process - super(Journeys, self).__init__(output_type_serializer=api.JourneysSerializer) parser_get = self.parsers["get"] diff --git a/source/jormungandr/jormungandr/new_relic.py b/source/jormungandr/jormungandr/new_relic.py index 7bc457984b..189a3f006d 100644 --- a/source/jormungandr/jormungandr/new_relic.py +++ b/source/jormungandr/jormungandr/new_relic.py @@ -158,10 +158,8 @@ def wrapper(obj, service, *args, **kwargs): # Send the custom event to newrelic ! record_custom_event("distributed", event_params) - - event_params.pop("duration") - otlp_instance.send_event_metric("distributed", event_params) - otlp_instance.send_distributed_duration_metric(event_params, duration) + # Send metrics to otlp + otlp_instance.send_event_metrics("distributed", event_params) return result @@ -170,6 +168,7 @@ def wrapper(obj, service, *args, **kwargs): return wrap +# TODO: Update and move this function into otlp.py when we will remove newrelic @contextmanager def record_streetnetwork_call(call_name, connector_name, mode, coverage_name): """ @@ -195,10 +194,10 @@ def record_streetnetwork_call(call_name, connector_name, mode, coverage_name): # Send the custom event to newrelic ! record_custom_event(newrelic_service_name, event_params) # Send metrics to otlp - otlp_instance.send_event_metric(newrelic_service_name, event_params) - otlp_instance.send_streetnetwork_call_duration_metric(event_params, duration) + otlp_instance.send_event_metrics(newrelic_service_name, event_params) +# TODO: Update and move this function into otlp.py when we will remove newrelic def statManagerEvent(call_name, group_name): """ Custom event that we publish to New Relic for stat_manager @@ -225,7 +224,7 @@ def wrapper(obj, service, *args, **kwargs): # Send the custom event to newrelic ! record_custom_event("stat_manager", event_params) # Send metrics to otlp - otlp_instance.send_event_metric("stat_manager", event_params) + otlp_instance.send_event_metrics("stat_manager", event_params) return wrapper diff --git a/source/jormungandr/jormungandr/otlp.py b/source/jormungandr/jormungandr/otlp.py index b3c2a16c10..5baac71f8b 100644 --- a/source/jormungandr/jormungandr/otlp.py +++ b/source/jormungandr/jormungandr/otlp.py @@ -29,6 +29,7 @@ # https://groups.google.com/d/forum/navitia # www.navitia.io +import os import logging from typing import Dict @@ -44,7 +45,6 @@ from opentelemetry.trace.status import Status, StatusCode from opentelemetry import trace, metrics -import os class OtlpMeta(type): @@ -59,13 +59,14 @@ def __call__(cls, *args, **kwds): class Otlp(metaclass=OtlpMeta): __service_name = "jormungandr" + __platform = "unknown" __request_call_labels = {} - def __init__(self) -> None: + def __init__(self, platform: str) -> None: self.__log = logging.getLogger(__name__) try: - self.__environment = os.getenv("ENVIRONMENT", "local") + self.__platform = platform + " (Python)" self.__resource = Resource( attributes={ SERVICE_NAME: self.__service_name, @@ -112,11 +113,11 @@ def __declare_counters(self) -> None: ) def __declare_histograms(self) -> None: - self.__jormungandr_distributed_duration = self._meter.create_histogram( - name="jormungandr_distributed_duration", description="Distributed scenario duration" + self.__jormungandr_event_duration = self._meter.create_histogram( + name="jormungandr_event_duration", description="Event duration" ) - self.__jormungandr_streetnetwork_call_duration = self._meter.create_histogram( - name="jormungandr_streetnetwork_call_duration", description="Streetnetwork call duration" + self.__jormungandr_request_call_duration = self._meter.create_histogram( + name="jormungandr_request_call_duration", description="Request call duration", unit="s" ) def get_tracer(self) -> trace.Tracer: @@ -131,12 +132,15 @@ def __record_exception_trace(self, exception: BaseException, attributes: Dict = navitia_request_id = 42 try: - span = trace.get_current_span() - span.set_attribute("navitia_request_id", str(navitia_request_id)) - for key, value in attributes.items(): - span.set_attribute(key, value) - span.set_status(Status(StatusCode.ERROR, "Exception")) - span.record_exception(exception) + with self._tracer.start_as_current_span("exception") as span: + span.set_attribute("navitia_request_id", str(navitia_request_id)) + for key, value in attributes.items(): + span.set_attribute(key, value) + for key, value in self.__request_call_labels.items(): + span.set_attribute(key, value) + span.set_status(Status(StatusCode.ERROR, "Exception")) + span.record_exception(exception) + except Exception: self.__log.exception("failure while reporting to otlp (with trace)") @@ -144,10 +148,7 @@ def __record_exception_meter(self, exception: BaseException) -> None: try: self.__jormungandr_exception.add( 1, - { - "exception_type": type(exception).__name__, - "status": Status(StatusCode.ERROR, "Exception"), - }, + {"exception_type": type(exception).__name__}, ) except Exception: self.__log.exception("failure while reporting to otlp (with meter)") @@ -159,16 +160,17 @@ def record_exception(self, exception: BaseException, attributes: Dict = {}) -> N if self._meter: self.__record_exception_meter(exception) - def send_request_call_metrics(self, labels=None) -> None: + def send_request_call_metrics(self, duration, labels=None) -> None: if not self._meter: return if labels: self.record_request_call_labels(labels) - self.__request_call_labels["environment"] = self.__environment - - self.__jormungandr_request_call.add(1, self.__request_call_labels) + self.record_request_call_label("platform", self.__platform) + labels = self.__request_call_labels.copy() + self.__jormungandr_request_call.add(1, labels) + self.__jormungandr_request_call_duration.record(duration, labels) self.__request_call_labels.clear() def record_request_call_labels(self, labels: Dict) -> None: @@ -177,27 +179,18 @@ def record_request_call_labels(self, labels: Dict) -> None: def record_request_call_label(self, label_name: str, label_value: str) -> None: self.__request_call_labels[label_name] = label_value - def send_event_metric(self, event_type: str, labels: Dict = {}) -> None: + def send_event_metrics(self, event_type: str, labels: Dict = {}) -> None: if not self._meter: return - labels["environment"] = self.__environment + labels["platform"] = self.__platform labels["event_type"] = event_type + if "navitia_request_id" in labels: + labels.pop("navitia_request_id") + if "duration" in labels: + duration = labels.pop("duration", None) + self.__jormungandr_event_duration.record(duration, labels) self.__jormungandr_event.add(1, labels) - def send_distributed_duration_metric(self, labels, duration) -> None: - if self._meter: - return - - labels["environment"] = self.__environment - self.__jormungandr_distributed_duration.record(duration, labels) - - def send_streetnetwork_call_duration_metric(self, labels, duration) -> None: - if self._meter: - return - - labels["environment"] = self.__environment - self.__jormungandr_streetnetwork_call_duration.record(duration, labels) - -otlp_instance = Otlp() +otlp_instance = Otlp(os.getenv("OTEL_PLATFORM")) diff --git a/source/jormungandr/jormungandr/realtime_schedule/realtime_proxy.py b/source/jormungandr/jormungandr/realtime_schedule/realtime_proxy.py index 2107c934d7..6fa3deb911 100644 --- a/source/jormungandr/jormungandr/realtime_schedule/realtime_proxy.py +++ b/source/jormungandr/jormungandr/realtime_schedule/realtime_proxy.py @@ -282,7 +282,7 @@ def record_internal_failure(self, message, comment=None): if comment is not None: params['comment'] = comment new_relic.record_custom_event('realtime_internal_failure', params) - otlp_instance.send_event_metric('realtime_internal_failure', params) + otlp_instance.send_event_metrics('realtime_internal_failure', params) def record_call(self, status, **kwargs): """ @@ -291,7 +291,7 @@ def record_call(self, status, **kwargs): params = {'realtime_system_id': six.text_type(self.rt_system_id), 'status': status} params.update(kwargs) new_relic.record_custom_event('realtime_status', params) - otlp_instance.send_event_metric('realtime_status', params) + otlp_instance.send_event_metrics('realtime_status', params) def record_additional_info(self, status, **kwargs): """ @@ -300,7 +300,7 @@ def record_additional_info(self, status, **kwargs): params = {'realtime_system_id': six.text_type(self.rt_system_id), 'status': status} params.update(kwargs) new_relic.record_custom_event('realtime_proxy_additional_info', params) - otlp_instance.send_event_metric('realtime_proxy_additional_info', params) + otlp_instance.send_event_metrics('realtime_proxy_additional_info', params) @cache.memoize(app.config.get(str('CACHE_CONFIGURATION'), {}).get(str('TIMEOUT_PTOBJECTS'), 600)) def _get_direction(self, line_uri, object_code, default_value): diff --git a/source/jormungandr/jormungandr/scenarios/ridesharing/ridesharing_service.py b/source/jormungandr/jormungandr/scenarios/ridesharing/ridesharing_service.py index 18b5759060..e6438400d6 100644 --- a/source/jormungandr/jormungandr/scenarios/ridesharing/ridesharing_service.py +++ b/source/jormungandr/jormungandr/scenarios/ridesharing/ridesharing_service.py @@ -171,7 +171,7 @@ def record_internal_failure(self, message): 'ridesharing_service_url': self.service_url, } new_relic.record_custom_event('ridesharing_internal_failure', params) - otlp_instance.send_event_metric('ridesharing_internal_failure', params) + otlp_instance.send_event_metrics('ridesharing_internal_failure', params) def record_call(self, status, **kwargs): """ @@ -184,7 +184,7 @@ def record_call(self, status, **kwargs): } params.update(kwargs) new_relic.record_custom_event('ridesharing_status', params) - otlp_instance.send_event_metric('ridesharing_status', params) + otlp_instance.send_event_metrics('ridesharing_status', params) def record_additional_info(self, status, **kwargs): """ @@ -197,7 +197,7 @@ def record_additional_info(self, status, **kwargs): } params.update(kwargs) new_relic.record_custom_event('ridesharing_proxy_additional_info', params) - otlp_instance.send_event_metric('ridesharing_proxy_additional_info', params) + otlp_instance.send_event_metrics('ridesharing_proxy_additional_info', params) def __eq__(self, other): return all( diff --git a/source/jormungandr/jormungandr/scenarios/ridesharing/ridesharing_service_manager.py b/source/jormungandr/jormungandr/scenarios/ridesharing/ridesharing_service_manager.py index 1924d7d1ad..010da92819 100644 --- a/source/jormungandr/jormungandr/scenarios/ridesharing/ridesharing_service_manager.py +++ b/source/jormungandr/jormungandr/scenarios/ridesharing/ridesharing_service_manager.py @@ -289,7 +289,7 @@ def build_ridesharing_journeys(self, from_pt_obj, to_pt_obj, request_dates, inst ) params = {'message': str(e)} new_relic.record_custom_event('ridesharing_internal_failure', params) - otlp_instance.send_event_metric('ridesharing_internal_failure') + otlp_instance.send_event_metrics('ridesharing_internal_failure') otlp_instance.record_exception(e, params) rsjs = [] fps = [] diff --git a/source/jormungandr/jormungandr/schedule.py b/source/jormungandr/jormungandr/schedule.py index 09f0f16db2..1151d1c95b 100644 --- a/source/jormungandr/jormungandr/schedule.py +++ b/source/jormungandr/jormungandr/schedule.py @@ -217,7 +217,7 @@ def _get_realtime_proxy(self, route_point): log.info('impossible to find {}, no realtime added'.format(rt_system_code)) params = {'rt_system_id': rt_system_code, 'message': 'no handler found'} new_relic.record_custom_event('realtime_internal_failure', params) - otlp_instance.send_event_metric('realtime_internal_failure', params) + otlp_instance.send_event_metrics('realtime_internal_failure', params) return None return rt_system @@ -241,7 +241,7 @@ def _get_next_realtime_passages(self, rt_system, route_point, request): ) params = {'rt_system_id': six.text_type(rt_system.rt_system_id), 'message': str(e)} new_relic.record_custom_event('realtime_internal_failure', params) - otlp_instance.send_event_metric('realtime_internal_failure', params) + otlp_instance.send_event_metrics('realtime_internal_failure', params) if next_rt_passages is None: log.debug('no next passages, using base schedule') diff --git a/source/jormungandr/jormungandr/street_network/street_network.py b/source/jormungandr/jormungandr/street_network/street_network.py index c146b64580..d314c37a41 100644 --- a/source/jormungandr/jormungandr/street_network/street_network.py +++ b/source/jormungandr/jormungandr/street_network/street_network.py @@ -156,7 +156,7 @@ def record_call(self, status, **kwargs): params = {'streetnetwork_id': six.text_type(self.sn_system_id), 'status': status} params.update(kwargs) new_relic.record_custom_event('streetnetwork', params) - otlp_instance.send_event_metric('streetnetwork', params) + otlp_instance.send_event_metrics('streetnetwork', params) def _add_feed_publisher(self, resp): sn_feed = self.feed_publisher() diff --git a/source/jormungandr/jormungandr/utils.py b/source/jormungandr/jormungandr/utils.py index d99001a414..05d355f964 100644 --- a/source/jormungandr/jormungandr/utils.py +++ b/source/jormungandr/jormungandr/utils.py @@ -637,7 +637,7 @@ def get_first_pt_section(journey): def record_external_failure(message, connector_type, connector_name): params = {'{}_system_id'.format(connector_type): six.text_type(connector_name), 'message': message} new_relic.record_custom_event('{}_external_failure'.format(connector_type), params) - otlp_instance.send_event_metric('{}_external_failure'.format(connector_type), params) + otlp_instance.send_event_metrics('{}_external_failure'.format(connector_type), params) def decode_polyline(encoded, precision=6): From 885b6f126dce73e955d340b6b74df06f03cfaaaf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9my=20Abi-Khalil?= Date: Wed, 11 Dec 2024 16:22:28 +0100 Subject: [PATCH 06/17] Refs #NAV 3691 - Clean and adjust some metrics (for another dashboard). --- source/jormungandr/jormungandr/api.py | 12 +++++----- source/jormungandr/jormungandr/otlp.py | 22 +++++++++---------- .../jormungandr/scenarios/distributed.py | 2 +- .../jormungandr/scenarios/new_default.py | 2 +- 4 files changed, 19 insertions(+), 19 deletions(-) diff --git a/source/jormungandr/jormungandr/api.py b/source/jormungandr/jormungandr/api.py index d03c4f992a..a309a62451 100644 --- a/source/jormungandr/jormungandr/api.py +++ b/source/jormungandr/jormungandr/api.py @@ -130,7 +130,12 @@ def add_info_newrelic(response, *args, **kwargs): @app.before_request def set_request_id(): - otlp_instance.record_request_call_label("api", request.endpoint) + otlp_instance.record_label("api", request.endpoint) + otlp_instance.record_label("version", __version__) + coverages = get_used_coverages() + coverage = coverages[0] if coverages else "unknown" + otlp_instance.record_label("coverage", coverage) + g.start = time.time() @@ -143,15 +148,10 @@ def record_request_call_to_otlp(response, *args, **kwargs): user_id = str(user.id) if user else "unknown" token_name = get_app_name(token) token_name = token_name if token_name else "unknown" - version = __version__ - coverages = get_used_coverages() - coverage = coverages[0] if coverages else "unknown" labels = { "token": token, "user_id": user_id, "token_name": token_name, - "version": version, - "coverage": coverage, "status": response.status_code, } otlp_instance.send_request_call_metrics(duration, labels) diff --git a/source/jormungandr/jormungandr/otlp.py b/source/jormungandr/jormungandr/otlp.py index 5baac71f8b..73849f0875 100644 --- a/source/jormungandr/jormungandr/otlp.py +++ b/source/jormungandr/jormungandr/otlp.py @@ -60,7 +60,7 @@ def __call__(cls, *args, **kwds): class Otlp(metaclass=OtlpMeta): __service_name = "jormungandr" __platform = "unknown" - __request_call_labels = {} + __labels = {} def __init__(self, platform: str) -> None: self.__log = logging.getLogger(__name__) @@ -136,7 +136,7 @@ def __record_exception_trace(self, exception: BaseException, attributes: Dict = span.set_attribute("navitia_request_id", str(navitia_request_id)) for key, value in attributes.items(): span.set_attribute(key, value) - for key, value in self.__request_call_labels.items(): + for key, value in self.__labels.items(): span.set_attribute(key, value) span.set_status(Status(StatusCode.ERROR, "Exception")) span.record_exception(exception) @@ -165,26 +165,26 @@ def send_request_call_metrics(self, duration, labels=None) -> None: return if labels: - self.record_request_call_labels(labels) + self.record_labels(labels) - self.record_request_call_label("platform", self.__platform) - labels = self.__request_call_labels.copy() + self.record_label("platform", self.__platform) + labels = self.__labels.copy() self.__jormungandr_request_call.add(1, labels) self.__jormungandr_request_call_duration.record(duration, labels) - self.__request_call_labels.clear() + self.__labels.clear() - def record_request_call_labels(self, labels: Dict) -> None: - self.__request_call_labels.update(labels) + def record_labels(self, labels: Dict) -> None: + self.__labels.update(labels) - def record_request_call_label(self, label_name: str, label_value: str) -> None: - self.__request_call_labels[label_name] = label_value + def record_label(self, label_name: str, label_value: str) -> None: + self.__labels[label_name] = label_value def send_event_metrics(self, event_type: str, labels: Dict = {}) -> None: if not self._meter: return - labels["platform"] = self.__platform labels["event_type"] = event_type + labels.update(self.__labels) if "navitia_request_id" in labels: labels.pop("navitia_request_id") if "duration" in labels: diff --git a/source/jormungandr/jormungandr/scenarios/distributed.py b/source/jormungandr/jormungandr/scenarios/distributed.py index 2ab2ec112f..a7c6c26506 100644 --- a/source/jormungandr/jormungandr/scenarios/distributed.py +++ b/source/jormungandr/jormungandr/scenarios/distributed.py @@ -500,7 +500,7 @@ def call_kraken( context=None, ): record_custom_parameter('scenario', 'distributed') - otlp_instance.record_request_call_label('scenario', 'distributed') + otlp_instance.record_label('scenario', 'distributed') logger = logging.getLogger(__name__) """ All spawned futures must be started(if they're not yet started) when leaving the scope. diff --git a/source/jormungandr/jormungandr/scenarios/new_default.py b/source/jormungandr/jormungandr/scenarios/new_default.py index e980c1c68b..e003f8f8f9 100644 --- a/source/jormungandr/jormungandr/scenarios/new_default.py +++ b/source/jormungandr/jormungandr/scenarios/new_default.py @@ -1567,7 +1567,7 @@ def call_kraken( # TODO: handle min_alternative_journeys # TODO: call first bss|bss and do not call walking|walking if no bss in first results record_custom_parameter('scenario', 'new_default') - otlp_instance.record_request_call_label('scenario', 'new_default') + otlp_instance.record_label('scenario', 'new_default') resp = [] logger = logging.getLogger(__name__) futures = [] From b1321344b58348a8a1e228e20c3e5a3f2512e954 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9my=20Abi-Khalil?= Date: Tue, 17 Dec 2024 10:42:08 +0100 Subject: [PATCH 07/17] Refs #NAV-3691 - Fix 401 error --- source/jormungandr/jormungandr/api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/jormungandr/jormungandr/api.py b/source/jormungandr/jormungandr/api.py index a309a62451..9d3ea1a412 100644 --- a/source/jormungandr/jormungandr/api.py +++ b/source/jormungandr/jormungandr/api.py @@ -144,7 +144,7 @@ def record_request_call_to_otlp(response, *args, **kwargs): try: duration = time.time() - g.start token = get_token() if get_token() else "unknown" - user = get_user(token=token, abort_if_no_token=False) if token else None + user = get_user(token=token, abort_if_no_token=False) if token != "unknown" else None user_id = str(user.id) if user else "unknown" token_name = get_app_name(token) token_name = token_name if token_name else "unknown" From 9a15e67895639653269678f56a2212324118f41f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9my=20Abi-Khalil?= Date: Mon, 23 Dec 2024 17:02:05 +0100 Subject: [PATCH 08/17] Refs #NAV-3691 - Add try except into before and after request call --- source/jormungandr/jormungandr/api.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/source/jormungandr/jormungandr/api.py b/source/jormungandr/jormungandr/api.py index 9d3ea1a412..cd123989e3 100644 --- a/source/jormungandr/jormungandr/api.py +++ b/source/jormungandr/jormungandr/api.py @@ -130,13 +130,17 @@ def add_info_newrelic(response, *args, **kwargs): @app.before_request def set_request_id(): - otlp_instance.record_label("api", request.endpoint) - otlp_instance.record_label("version", __version__) - coverages = get_used_coverages() - coverage = coverages[0] if coverages else "unknown" - otlp_instance.record_label("coverage", coverage) + try: + g.start = time.time() - g.start = time.time() + otlp_instance.record_label("api", request.endpoint) + otlp_instance.record_label("version", __version__) + coverages = get_used_coverages() + coverage = coverages[0] if coverages else "unknown" + otlp_instance.record_label("coverage", coverage) + except: + logger = logging.getLogger(__name__) + logger.exception('error while reporting to otlp from app.before_request') @app.after_request @@ -157,7 +161,7 @@ def record_request_call_to_otlp(response, *args, **kwargs): otlp_instance.send_request_call_metrics(duration, labels) except: logger = logging.getLogger(__name__) - logger.exception('error while reporting to otlp:') + logger.exception('error while reporting to otlp from app.after_request') return response From d14a00199073cfc28ebfa8cbf40185cfe5c25bdf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9my=20Abi-Khalil?= Date: Wed, 22 Jan 2025 11:55:34 +0100 Subject: [PATCH 09/17] Refs #NAV-3691 - Update otlp service (add account label) --- source/jormungandr/jormungandr/otlp.py | 93 ++++++++++++++++++-------- 1 file changed, 64 insertions(+), 29 deletions(-) diff --git a/source/jormungandr/jormungandr/otlp.py b/source/jormungandr/jormungandr/otlp.py index 73849f0875..dc45dddfc3 100644 --- a/source/jormungandr/jormungandr/otlp.py +++ b/source/jormungandr/jormungandr/otlp.py @@ -32,7 +32,6 @@ import os import logging from typing import Dict - from flask import request from opentelemetry.sdk.resources import SERVICE_NAME, Resource @@ -43,7 +42,6 @@ from opentelemetry.sdk.trace.export import BatchSpanProcessor from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter from opentelemetry.trace.status import Status, StatusCode - from opentelemetry import trace, metrics @@ -60,13 +58,21 @@ def __call__(cls, *args, **kwds): class Otlp(metaclass=OtlpMeta): __service_name = "jormungandr" __platform = "unknown" + __account = "unknown" __labels = {} - def __init__(self, platform: str) -> None: + def __init__(self, platform: str, account: str) -> None: self.__log = logging.getLogger(__name__) + self._tracer = None + self._meter = None + + if not os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT"): + self.__log.info("OTLP not configured. Disabling otlp.") + return try: self.__platform = platform + " (Python)" + self.__account = account self.__resource = Resource( attributes={ SERVICE_NAME: self.__service_name, @@ -78,7 +84,7 @@ def __init__(self, platform: str) -> None: self.__declare_counters() self.__declare_histograms() except Exception: - self.__log.exception("failure while initializing otlp") + self.__log.exception("Failure while initializing otlp. Disabling otlp.") self._tracer = None self._meter = None @@ -123,20 +129,53 @@ def __declare_histograms(self) -> None: def get_tracer(self) -> trace.Tracer: return self._tracer - def __record_exception_trace(self, exception: BaseException, attributes: Dict = {}) -> None: - # TODO: Can remove this and use directly request.id below ? Check if it works on SBX (remove this before merge !) - try: - navitia_request_id = request.id - except RuntimeError: - self.__log.exception("failure while getting request id. We are outside of a flask context :(") - navitia_request_id = 42 + def __get_request_id(self) -> str: + if not request: + return "unknown" + + return str(request.id) + + def __should_ignore(self) -> bool: + ignore_paths = ["/status", "/"] + + return request.path in ignore_paths + + def __get_labels(self) -> Dict: + if self.__get_request_id() not in self.__labels: + self.__labels[self.__get_request_id()] = self.__generate_default_labels() + + return self.__labels[self.__get_request_id()] + + def record_labels(self, labels: Dict) -> None: + if self.__should_ignore(): + return + + self.__get_labels().update(labels) + def record_label(self, label_name: str, label_value: str) -> None: + if self.__should_ignore(): + return + + self.__get_labels()[label_name] = label_value + + def __clear_labels(self) -> None: + self.__get_labels().clear() + self.__labels.pop(self.__get_request_id(), None) + + def __generate_default_labels(self) -> Dict: + return { + "event_type": "unknown", + "platform": self.__platform, + "account": self.__account, + } + + def __record_exception_trace(self, exception: BaseException, attributes: Dict = {}) -> None: try: with self._tracer.start_as_current_span("exception") as span: - span.set_attribute("navitia_request_id", str(navitia_request_id)) + span.set_attribute("navitia_request_id", self.__get_request_id()) for key, value in attributes.items(): span.set_attribute(key, value) - for key, value in self.__labels.items(): + for key, value in self.__get_labels().items(): span.set_attribute(key, value) span.set_status(Status(StatusCode.ERROR, "Exception")) span.record_exception(exception) @@ -146,10 +185,11 @@ def __record_exception_trace(self, exception: BaseException, attributes: Dict = def __record_exception_meter(self, exception: BaseException) -> None: try: - self.__jormungandr_exception.add( - 1, - {"exception_type": type(exception).__name__}, - ) + labels = {"exception_type": type(exception).__name__} + for key, value in self.__get_labels().items(): + labels[key] = value + + self.__jormungandr_exception.add(1, labels) except Exception: self.__log.exception("failure while reporting to otlp (with meter)") @@ -168,29 +208,24 @@ def send_request_call_metrics(self, duration, labels=None) -> None: self.record_labels(labels) self.record_label("platform", self.__platform) - labels = self.__labels.copy() + labels = self.__get_labels().copy() self.__jormungandr_request_call.add(1, labels) self.__jormungandr_request_call_duration.record(duration, labels) - self.__labels.clear() - - def record_labels(self, labels: Dict) -> None: - self.__labels.update(labels) - - def record_label(self, label_name: str, label_value: str) -> None: - self.__labels[label_name] = label_value + self.__clear_labels() def send_event_metrics(self, event_type: str, labels: Dict = {}) -> None: if not self._meter: return - labels["platform"] = self.__platform - labels["event_type"] = event_type - labels.update(self.__labels) + + labels = self.__get_labels().copy() + labels.update("event_type", event_type) if "navitia_request_id" in labels: labels.pop("navitia_request_id") if "duration" in labels: duration = labels.pop("duration", None) self.__jormungandr_event_duration.record(duration, labels) self.__jormungandr_event.add(1, labels) + self.__clear_labels() -otlp_instance = Otlp(os.getenv("OTEL_PLATFORM")) +otlp_instance = Otlp(os.getenv("OTEL_PLATFORM"), os.getenv("OTEL_ACCOUNT")) From fad62d0334f2fb85a9b731b1a5a5890ba24d5404 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9my=20Abi-Khalil?= Date: Thu, 23 Jan 2025 15:01:10 +0100 Subject: [PATCH 10/17] Fix minor syntax --- .../jormungandr/jormungandr/scenarios/distributed.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/source/jormungandr/jormungandr/scenarios/distributed.py b/source/jormungandr/jormungandr/scenarios/distributed.py index a7c6c26506..15adde44ff 100644 --- a/source/jormungandr/jormungandr/scenarios/distributed.py +++ b/source/jormungandr/jormungandr/scenarios/distributed.py @@ -592,13 +592,13 @@ def graphical_isochrones(self, request, instance): if request.get("max_duration") is None: request["max_duration"] = max(request["boundary_duration[]"], key=int) if request.get('additional_time_after_first_section_taxi') is None: - request['additional_time_after_first_section_taxi'] = ( - instance.additional_time_after_first_section_taxi - ) + request[ + 'additional_time_after_first_section_taxi' + ] = instance.additional_time_after_first_section_taxi if request.get('additional_time_before_last_section_taxi') is None: - request['additional_time_before_last_section_taxi'] = ( - instance.additional_time_after_first_section_taxi - ) + request[ + 'additional_time_before_last_section_taxi' + ] = instance.additional_time_after_first_section_taxi if request.get('on_street_bike_parking_duration') is None: request['on_street_bike_parking_duration'] = instance.on_street_bike_parking_duration From c5e1a0d921f59ce2b9414058fcbea27c05d91b8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9my=20Abi-Khalil?= Date: Fri, 24 Jan 2025 14:57:40 +0100 Subject: [PATCH 11/17] Refs #NAV-3691 - Update missing otlp call. --- source/jormungandr/jormungandr/exceptions.py | 2 +- source/jormungandr/jormungandr/otlp.py | 10 +++++----- .../bss/common_bss_provider.py | 2 +- .../car/common_car_park_provider.py | 2 +- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/source/jormungandr/jormungandr/exceptions.py b/source/jormungandr/jormungandr/exceptions.py index 7f4dd9d11e..fe3ebd64a0 100644 --- a/source/jormungandr/jormungandr/exceptions.py +++ b/source/jormungandr/jormungandr/exceptions.py @@ -46,7 +46,7 @@ ] -def format_error(code, message): +def format_error(code: str, message: str) -> Dict: error = {"error": {"id": code, "message": message}, "message": message} return error diff --git a/source/jormungandr/jormungandr/otlp.py b/source/jormungandr/jormungandr/otlp.py index dc45dddfc3..fd7a071ee8 100644 --- a/source/jormungandr/jormungandr/otlp.py +++ b/source/jormungandr/jormungandr/otlp.py @@ -56,10 +56,10 @@ def __call__(cls, *args, **kwds): class Otlp(metaclass=OtlpMeta): - __service_name = "jormungandr" - __platform = "unknown" - __account = "unknown" - __labels = {} + __service_name: str = "jormungandr" + __platform: str = "unknown" + __account: str = "unknown" + __labels: Dict = {} def __init__(self, platform: str, account: str) -> None: self.__log = logging.getLogger(__name__) @@ -218,7 +218,7 @@ def send_event_metrics(self, event_type: str, labels: Dict = {}) -> None: return labels = self.__get_labels().copy() - labels.update("event_type", event_type) + labels["event_type"] = event_type if "navitia_request_id" in labels: labels.pop("navitia_request_id") if "duration" in labels: diff --git a/source/jormungandr/jormungandr/parking_space_availability/bss/common_bss_provider.py b/source/jormungandr/jormungandr/parking_space_availability/bss/common_bss_provider.py index 98f0246a19..436468e77e 100644 --- a/source/jormungandr/jormungandr/parking_space_availability/bss/common_bss_provider.py +++ b/source/jormungandr/jormungandr/parking_space_availability/bss/common_bss_provider.py @@ -60,4 +60,4 @@ def record_call(self, status, **kwargs): params = {'bss_system_id': six.text_type(self.network), 'status': status} params.update(kwargs) new_relic.record_custom_event('bss_status', params) - otlp_instance.record_custom_event('bss_status', params) + otlp_instance.send_event_metrics('bss_status', params) diff --git a/source/jormungandr/jormungandr/parking_space_availability/car/common_car_park_provider.py b/source/jormungandr/jormungandr/parking_space_availability/car/common_car_park_provider.py index 8a5ca004f6..51f652565b 100644 --- a/source/jormungandr/jormungandr/parking_space_availability/car/common_car_park_provider.py +++ b/source/jormungandr/jormungandr/parking_space_availability/car/common_car_park_provider.py @@ -121,4 +121,4 @@ def record_call(self, status, **kwargs): params = {'parking_system_id': self.provider_name, 'dataset': self.dataset, 'status': status} params.update(kwargs) new_relic.record_custom_event('parking_status', params) - otlp_instance.record_custom_event('parking_status', params) + otlp_instance.send_event_metrics('parking_status', params) From 0528e10b24bf4c74deca28fa35d5ac08e57f4509 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9my=20Abi-Khalil?= Date: Wed, 29 Jan 2025 11:27:06 +0100 Subject: [PATCH 12/17] Refs #NAV-3691 - Fix event metrics. --- source/jormungandr/jormungandr/api.py | 28 +++++++++---------- .../autocomplete/abstract_autocomplete.py | 1 - source/jormungandr/jormungandr/otlp.py | 26 +++++++++-------- 3 files changed, 28 insertions(+), 27 deletions(-) diff --git a/source/jormungandr/jormungandr/api.py b/source/jormungandr/jormungandr/api.py index cd123989e3..38de445a89 100644 --- a/source/jormungandr/jormungandr/api.py +++ b/source/jormungandr/jormungandr/api.py @@ -128,21 +128,6 @@ def add_info_newrelic(response, *args, **kwargs): return response -@app.before_request -def set_request_id(): - try: - g.start = time.time() - - otlp_instance.record_label("api", request.endpoint) - otlp_instance.record_label("version", __version__) - coverages = get_used_coverages() - coverage = coverages[0] if coverages else "unknown" - otlp_instance.record_label("coverage", coverage) - except: - logger = logging.getLogger(__name__) - logger.exception('error while reporting to otlp from app.before_request') - - @app.after_request def record_request_call_to_otlp(response, *args, **kwargs): try: @@ -165,6 +150,19 @@ def record_request_call_to_otlp(response, *args, **kwargs): return response +@app.before_request +def set_request_id(): + try: + g.start = time.time() + + otlp_instance.record_label("api", request.endpoint) + otlp_instance.record_label("version", __version__) + coverages = ", ".join(sorted(get_used_coverages())) if get_used_coverages() else "unknown" + otlp_instance.record_label("coverage", coverages) + except: + logger = logging.getLogger(__name__) + logger.exception('error while reporting to otlp from app.before_request') + # If modules are configured, then load and run them if 'MODULES' in rest_api.app.config: diff --git a/source/jormungandr/jormungandr/autocomplete/abstract_autocomplete.py b/source/jormungandr/jormungandr/autocomplete/abstract_autocomplete.py index 0a682af7ee..e143896104 100644 --- a/source/jormungandr/jormungandr/autocomplete/abstract_autocomplete.py +++ b/source/jormungandr/jormungandr/autocomplete/abstract_autocomplete.py @@ -36,7 +36,6 @@ from jormungandr.new_relic import record_custom_event from jormungandr.otlp import otlp_instance - class AutocompleteError(RuntimeError): pass diff --git a/source/jormungandr/jormungandr/otlp.py b/source/jormungandr/jormungandr/otlp.py index fd7a071ee8..00cd595a59 100644 --- a/source/jormungandr/jormungandr/otlp.py +++ b/source/jormungandr/jormungandr/otlp.py @@ -141,10 +141,12 @@ def __should_ignore(self) -> bool: return request.path in ignore_paths def __get_labels(self) -> Dict: - if self.__get_request_id() not in self.__labels: - self.__labels[self.__get_request_id()] = self.__generate_default_labels() + request_id = self.__get_request_id() - return self.__labels[self.__get_request_id()] + if request_id not in self.__labels: + self.__labels[request_id] = self.__generate_default_labels() + + return self.__labels[request_id] def record_labels(self, labels: Dict) -> None: if self.__should_ignore(): @@ -164,7 +166,8 @@ def __clear_labels(self) -> None: def __generate_default_labels(self) -> Dict: return { - "event_type": "unknown", + "coverage": "unknown", + "api": "unknown", "platform": self.__platform, "account": self.__account, } @@ -179,7 +182,6 @@ def __record_exception_trace(self, exception: BaseException, attributes: Dict = span.set_attribute(key, value) span.set_status(Status(StatusCode.ERROR, "Exception")) span.record_exception(exception) - except Exception: self.__log.exception("failure while reporting to otlp (with trace)") @@ -206,26 +208,28 @@ def send_request_call_metrics(self, duration, labels=None) -> None: if labels: self.record_labels(labels) - - self.record_label("platform", self.__platform) labels = self.__get_labels().copy() self.__jormungandr_request_call.add(1, labels) self.__jormungandr_request_call_duration.record(duration, labels) self.__clear_labels() - def send_event_metrics(self, event_type: str, labels: Dict = {}) -> None: + def send_event_metrics(self, event_type: str, params: Dict = {}) -> None: if not self._meter: return - labels = self.__get_labels().copy() - labels["event_type"] = event_type + labels = { + "platform": self.__platform, + "account": self.__account, + "event_type": event_type + } + labels.update(params) + if "navitia_request_id" in labels: labels.pop("navitia_request_id") if "duration" in labels: duration = labels.pop("duration", None) self.__jormungandr_event_duration.record(duration, labels) self.__jormungandr_event.add(1, labels) - self.__clear_labels() otlp_instance = Otlp(os.getenv("OTEL_PLATFORM"), os.getenv("OTEL_ACCOUNT")) From 76451bbf95453252428150eab346df3915039a53 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9my=20Abi-Khalil?= Date: Wed, 29 Jan 2025 15:53:26 +0100 Subject: [PATCH 13/17] Refs #NAV-3691 - Fix black syntax --- source/jormungandr/jormungandr/api.py | 3 ++- .../jormungandr/autocomplete/abstract_autocomplete.py | 1 + source/jormungandr/jormungandr/otlp.py | 9 +++------ 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/source/jormungandr/jormungandr/api.py b/source/jormungandr/jormungandr/api.py index 38de445a89..fc9aad16a9 100644 --- a/source/jormungandr/jormungandr/api.py +++ b/source/jormungandr/jormungandr/api.py @@ -150,6 +150,7 @@ def record_request_call_to_otlp(response, *args, **kwargs): return response + @app.before_request def set_request_id(): try: @@ -157,7 +158,7 @@ def set_request_id(): otlp_instance.record_label("api", request.endpoint) otlp_instance.record_label("version", __version__) - coverages = ", ".join(sorted(get_used_coverages())) if get_used_coverages() else "unknown" + coverages = ", ".join(sorted(get_used_coverages())) if get_used_coverages() else "unknown" otlp_instance.record_label("coverage", coverages) except: logger = logging.getLogger(__name__) diff --git a/source/jormungandr/jormungandr/autocomplete/abstract_autocomplete.py b/source/jormungandr/jormungandr/autocomplete/abstract_autocomplete.py index e143896104..0a682af7ee 100644 --- a/source/jormungandr/jormungandr/autocomplete/abstract_autocomplete.py +++ b/source/jormungandr/jormungandr/autocomplete/abstract_autocomplete.py @@ -36,6 +36,7 @@ from jormungandr.new_relic import record_custom_event from jormungandr.otlp import otlp_instance + class AutocompleteError(RuntimeError): pass diff --git a/source/jormungandr/jormungandr/otlp.py b/source/jormungandr/jormungandr/otlp.py index 00cd595a59..1351dd1b2f 100644 --- a/source/jormungandr/jormungandr/otlp.py +++ b/source/jormungandr/jormungandr/otlp.py @@ -65,8 +65,9 @@ def __init__(self, platform: str, account: str) -> None: self.__log = logging.getLogger(__name__) self._tracer = None self._meter = None + otel_exporter_otlp_endpoint = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "") - if not os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT"): + if otel_exporter_otlp_endpoint == "": self.__log.info("OTLP not configured. Disabling otlp.") return @@ -217,11 +218,7 @@ def send_event_metrics(self, event_type: str, params: Dict = {}) -> None: if not self._meter: return - labels = { - "platform": self.__platform, - "account": self.__account, - "event_type": event_type - } + labels = {"platform": self.__platform, "account": self.__account, "event_type": event_type} labels.update(params) if "navitia_request_id" in labels: From 232745e4ff2112e55c6dd2d70637bf2a7d11b40f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9my=20Abi-Khalil?= Date: Wed, 29 Jan 2025 16:05:28 +0100 Subject: [PATCH 14/17] Refs #NAV-3691 - Fix mypy check. --- source/jormungandr/jormungandr/exceptions.py | 4 ++-- source/jormungandr/jormungandr/otlp.py | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/source/jormungandr/jormungandr/exceptions.py b/source/jormungandr/jormungandr/exceptions.py index fe3ebd64a0..8ef33023be 100644 --- a/source/jormungandr/jormungandr/exceptions.py +++ b/source/jormungandr/jormungandr/exceptions.py @@ -46,12 +46,12 @@ ] -def format_error(code: str, message: str) -> Dict: +def format_error(code, message): error = {"error": {"id": code, "message": message}, "message": message} return error -def format_otlp_error(data: Dict) -> Dict: +def format_otlp_error(data): return {"error_id": data["error"]["id"], "error_message": data["error"]["message"]} diff --git a/source/jormungandr/jormungandr/otlp.py b/source/jormungandr/jormungandr/otlp.py index 1351dd1b2f..db2d20c6eb 100644 --- a/source/jormungandr/jormungandr/otlp.py +++ b/source/jormungandr/jormungandr/otlp.py @@ -120,6 +120,8 @@ def __declare_counters(self) -> None: ) def __declare_histograms(self) -> None: + self._meter = metrics.get_meter(__name__) + self.__jormungandr_event_duration = self._meter.create_histogram( name="jormungandr_event_duration", description="Event duration" ) From 8847298c2a2fc1e0e5e212df267ccaa51e9f8bd0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9my=20Abi-Khalil?= Date: Wed, 29 Jan 2025 16:43:12 +0100 Subject: [PATCH 15/17] Refs #NAV-3691 - Update pre commit configuration for otlp.py. --- .pre-commit-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 08c62871d1..82c2fc5b69 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -33,4 +33,4 @@ repos: args: [--ignore-missing-imports, --py2, --follow-imports, skip] files: source language_version: python3.9 - exclude: /monitor/|/third_party/|/tests/|/sql/|env\.py$|setup\.py$ + exclude: /monitor/|/third_party/|/tests/|/sql/|env\.py$|setup\.py|otlp\.py$ From 01ed9adca6f3a851161b25257c6efd2cbf39097c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9my=20Abi-Khalil?= Date: Fri, 7 Feb 2025 08:56:57 +0100 Subject: [PATCH 16/17] Refs #NAV-3691 - Remove metaclass for otlp object. --- source/jormungandr/jormungandr/otlp.py | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/source/jormungandr/jormungandr/otlp.py b/source/jormungandr/jormungandr/otlp.py index db2d20c6eb..2a9ca86a04 100644 --- a/source/jormungandr/jormungandr/otlp.py +++ b/source/jormungandr/jormungandr/otlp.py @@ -45,17 +45,7 @@ from opentelemetry import trace, metrics -class OtlpMeta(type): - _instance: Dict = {} - - def __call__(cls, *args, **kwds): - if cls not in cls._instance: - cls._instance[cls] = super().__call__(*args, **kwds) - - return cls._instance[cls] - - -class Otlp(metaclass=OtlpMeta): +class Otlp: __service_name: str = "jormungandr" __platform: str = "unknown" __account: str = "unknown" From e64b0fd9a890f49431413a3408b0280a2fbd8809 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9my=20Abi-Khalil?= Date: Fri, 7 Feb 2025 10:56:37 +0100 Subject: [PATCH 17/17] Refs #NAV-3691 - Update record label of coverages. --- source/jormungandr/jormungandr/api.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/source/jormungandr/jormungandr/api.py b/source/jormungandr/jormungandr/api.py index fc9aad16a9..2b8720a310 100644 --- a/source/jormungandr/jormungandr/api.py +++ b/source/jormungandr/jormungandr/api.py @@ -128,6 +128,12 @@ def add_info_newrelic(response, *args, **kwargs): return response +def __get_otlp_coverages_label(): + used_coverages = get_used_coverages() + + return ", ".join(sorted(used_coverages)) if used_coverages else "unknown" + + @app.after_request def record_request_call_to_otlp(response, *args, **kwargs): try: @@ -142,6 +148,7 @@ def record_request_call_to_otlp(response, *args, **kwargs): "user_id": user_id, "token_name": token_name, "status": response.status_code, + "coverages": __get_otlp_coverages_label(), } otlp_instance.send_request_call_metrics(duration, labels) except: @@ -158,8 +165,7 @@ def set_request_id(): otlp_instance.record_label("api", request.endpoint) otlp_instance.record_label("version", __version__) - coverages = ", ".join(sorted(get_used_coverages())) if get_used_coverages() else "unknown" - otlp_instance.record_label("coverage", coverages) + otlp_instance.record_label("coverage", __get_otlp_coverages_label()) except: logger = logging.getLogger(__name__) logger.exception('error while reporting to otlp from app.before_request')