From 62c041758343900cee6970a4bc6a80b4fb1d182f Mon Sep 17 00:00:00 2001 From: subham sarkar Date: Fri, 13 Dec 2024 16:16:05 +0530 Subject: [PATCH] x-pack/metricbeat/module/openai: Add new module (#41516) (cherry picked from commit 93b018a8362139eeb6b81bde9c43374c7ad1cd46) --- .github/CODEOWNERS | 1 + metricbeat/docs/fields.asciidoc | 367 +++++++++++++ metricbeat/docs/modules/openai.asciidoc | 75 +++ metricbeat/docs/modules/openai/usage.asciidoc | 29 ++ metricbeat/docs/modules_list.asciidoc | 3 + x-pack/agentbeat/agentbeat.spec.yml | 5 + x-pack/metricbeat/include/list.go | 2 + x-pack/metricbeat/metricbeat.reference.yml | 36 ++ .../metricbeat/module/openai/_meta/config.yml | 34 ++ .../module/openai/_meta/docs.asciidoc | 2 + .../metricbeat/module/openai/_meta/fields.yml | 10 + x-pack/metricbeat/module/openai/doc.go | 6 + x-pack/metricbeat/module/openai/fields.go | 23 + .../module/openai/usage/_meta/data.json | 37 ++ .../module/openai/usage/_meta/docs.asciidoc | 1 + .../module/openai/usage/_meta/fields.yml | 156 ++++++ .../metricbeat/module/openai/usage/client.go | 71 +++ .../metricbeat/module/openai/usage/config.go | 94 ++++ .../metricbeat/module/openai/usage/errors.go | 10 + .../metricbeat/module/openai/usage/helper.go | 30 ++ .../module/openai/usage/persistcache.go | 187 +++++++ .../metricbeat/module/openai/usage/schema.go | 69 +++ .../metricbeat/module/openai/usage/usage.go | 387 ++++++++++++++ .../openai/usage/usage_integration_test.go | 482 ++++++++++++++++++ .../metricbeat/modules.d/openai.yml.disabled | 37 ++ 25 files changed, 2154 insertions(+) create mode 100644 metricbeat/docs/modules/openai.asciidoc create mode 100644 metricbeat/docs/modules/openai/usage.asciidoc create mode 100644 x-pack/metricbeat/module/openai/_meta/config.yml create mode 100644 x-pack/metricbeat/module/openai/_meta/docs.asciidoc create mode 100644 x-pack/metricbeat/module/openai/_meta/fields.yml create mode 100644 x-pack/metricbeat/module/openai/doc.go create mode 100644 x-pack/metricbeat/module/openai/fields.go create mode 100644 x-pack/metricbeat/module/openai/usage/_meta/data.json create mode 100644 x-pack/metricbeat/module/openai/usage/_meta/docs.asciidoc create mode 100644 x-pack/metricbeat/module/openai/usage/_meta/fields.yml create mode 100644 x-pack/metricbeat/module/openai/usage/client.go create mode 100644 x-pack/metricbeat/module/openai/usage/config.go create mode 100644 x-pack/metricbeat/module/openai/usage/errors.go create mode 100644 x-pack/metricbeat/module/openai/usage/helper.go create mode 100644 x-pack/metricbeat/module/openai/usage/persistcache.go create mode 100644 x-pack/metricbeat/module/openai/usage/schema.go create mode 100644 x-pack/metricbeat/module/openai/usage/usage.go create mode 100644 x-pack/metricbeat/module/openai/usage/usage_integration_test.go create mode 100644 x-pack/metricbeat/modules.d/openai.yml.disabled diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 7d3d09d1e78..1a8fd0a0088 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -223,6 +223,7 @@ CHANGELOG* /x-pack/metricbeat/module/iis @elastic/obs-infraobs-integrations /x-pack/metricbeat/module/istio/ @elastic/obs-cloudnative-monitoring /x-pack/metricbeat/module/mssql @elastic/obs-infraobs-integrations +/x-pack/metricbeat/module/openai @elastic/obs-infraobs-integrations /x-pack/metricbeat/module/oracle @elastic/obs-infraobs-integrations /x-pack/metricbeat/module/panw @elastic/obs-infraobs-integrations /x-pack/metricbeat/module/prometheus/ @elastic/obs-cloudnative-monitoring diff --git a/metricbeat/docs/fields.asciidoc b/metricbeat/docs/fields.asciidoc index 967c20f14cf..103bdd71835 100644 --- a/metricbeat/docs/fields.asciidoc +++ b/metricbeat/docs/fields.asciidoc @@ -68,6 +68,7 @@ grouped in the following categories: * <> * <> * <> +* <> * <> * <> * <> @@ -56706,6 +56707,372 @@ type: long -- +[[exported-fields-openai]] +== openai fields + +openai module + + + +[float] +=== openai + + + + +[float] +=== usage + +OpenAI API usage metrics and statistics + + + +*`openai.usage.organization_id`*:: ++ +-- +Organization identifier + +type: keyword + +-- + +*`openai.usage.organization_name`*:: ++ +-- +Organization name + +type: keyword + +-- + +*`openai.usage.api_key_id`*:: ++ +-- +API key identifier + +type: keyword + +-- + +*`openai.usage.api_key_name`*:: ++ +-- +API key name + +type: keyword + +-- + +*`openai.usage.api_key_redacted`*:: ++ +-- +Redacted API key + +type: keyword + +-- + +*`openai.usage.api_key_type`*:: ++ +-- +Type of API key + +type: keyword + +-- + +*`openai.usage.project_id`*:: ++ +-- +Project identifier + +type: keyword + +-- + +*`openai.usage.project_name`*:: ++ +-- +Project name + +type: keyword + +-- + +[float] +=== data + +General usage data metrics + + + +*`openai.usage.data.requests_total`*:: ++ +-- +Number of requests made + +type: long + +-- + +*`openai.usage.data.operation`*:: ++ +-- +Operation type + +type: keyword + +-- + +*`openai.usage.data.snapshot_id`*:: ++ +-- +Snapshot identifier + +type: keyword + +-- + +*`openai.usage.data.context_tokens_total`*:: ++ +-- +Total number of context tokens used + +type: long + +-- + +*`openai.usage.data.generated_tokens_total`*:: ++ +-- +Total number of generated tokens + +type: long + +-- + +*`openai.usage.data.cached_context_tokens_total`*:: ++ +-- +Total number of cached context tokens + +type: long + +-- + +*`openai.usage.data.email`*:: ++ +-- +User email + +type: keyword + +-- + +*`openai.usage.data.request_type`*:: ++ +-- +Type of request + +type: keyword + +-- + +[float] +=== dalle + +DALL-E API usage metrics + + + +*`openai.usage.dalle.num_images`*:: ++ +-- +Number of images generated + +type: long + +-- + +*`openai.usage.dalle.requests_total`*:: ++ +-- +Number of requests + +type: long + +-- + +*`openai.usage.dalle.image_size`*:: ++ +-- +Size of generated images + +type: keyword + +-- + +*`openai.usage.dalle.operation`*:: ++ +-- +Operation type + +type: keyword + +-- + +*`openai.usage.dalle.user_id`*:: ++ +-- +User identifier + +type: keyword + +-- + +*`openai.usage.dalle.model_id`*:: ++ +-- +Model identifier + +type: keyword + +-- + +[float] +=== whisper + +Whisper API usage metrics + + + +*`openai.usage.whisper.model_id`*:: ++ +-- +Model identifier + +type: keyword + +-- + +*`openai.usage.whisper.num_seconds`*:: ++ +-- +Number of seconds processed + +type: long + +-- + +*`openai.usage.whisper.requests_total`*:: ++ +-- +Number of requests + +type: long + +-- + +*`openai.usage.whisper.user_id`*:: ++ +-- +User identifier + +type: keyword + +-- + +[float] +=== tts + +Text-to-Speech API usage metrics + + + +*`openai.usage.tts.model_id`*:: ++ +-- +Model identifier + +type: keyword + +-- + +*`openai.usage.tts.num_characters`*:: ++ +-- +Number of characters processed + +type: long + +-- + +*`openai.usage.tts.requests_total`*:: ++ +-- +Number of requests + +type: long + +-- + +*`openai.usage.tts.user_id`*:: ++ +-- +User identifier + +type: keyword + +-- + +[float] +=== ft_data + +Fine-tuning data metrics + + + +*`openai.usage.ft_data.original`*:: ++ +-- +Raw fine-tuning data + +type: object + +-- + +[float] +=== assistant_code_interpreter + +Assistant Code Interpreter usage metrics + + + +*`openai.usage.assistant_code_interpreter.original`*:: ++ +-- +Raw assistant code interpreter data + +type: object + +-- + +[float] +=== retrieval_storage + +Retrieval storage usage metrics + + + +*`openai.usage.retrieval_storage.original`*:: ++ +-- +Raw retrieval storage data + +type: object + +-- + [[exported-fields-openmetrics]] == Openmetrics fields diff --git a/metricbeat/docs/modules/openai.asciidoc b/metricbeat/docs/modules/openai.asciidoc new file mode 100644 index 00000000000..ce5fd3e0ccc --- /dev/null +++ b/metricbeat/docs/modules/openai.asciidoc @@ -0,0 +1,75 @@ +//// +This file is generated! See scripts/mage/docs_collector.go +//// + +:modulename: openai +:edit_url: https://github.com/elastic/beats/edit/main/x-pack/metricbeat/module/openai/_meta/docs.asciidoc + + +[[metricbeat-module-openai]] +[role="xpack"] +== openai module + +beta[] + +This is the openai module. + + + +:edit_url: + +[float] +=== Example configuration + +The openai module supports the standard configuration options that are described +in <>. Here is an example configuration: + +[source,yaml] +---- +metricbeat.modules: +- module: openai + metricsets: ["usage"] + enabled: false + period: 1h + + # # Project API Keys - Multiple API keys can be specified for different projects + # api_keys: + # - key: "api_key1" + # - key: "api_key2" + + # # API Configuration + # ## Base URL for the OpenAI usage API endpoint + # api_url: "https://api.openai.com/v1/usage" + # ## Custom headers to be included in API requests + # headers: + # - "k1: v1" + # - "k2: v2" + ## Rate Limiting Configuration + # rate_limit: + # limit: 12 # seconds between requests + # burst: 1 # max concurrent requests + # ## Request Timeout Duration + # timeout: 30s + + # # Data Collection Configuration + # collection: + # ## Number of days to look back when collecting usage data + # lookback_days: 30 + # ## Whether to collect usage data in realtime. Defaults to false as how + # # OpenAI usage data is collected will end up adding duplicate data to ES + # # and also making it harder to do analytics. Best approach is to avoid + # # realtime collection and collect only upto last day (in UTC). So, there's + # # at most 24h delay. + # realtime: false +---- + +[float] +=== Metricsets + +The following metricsets are available: + +* <> + +include::openai/usage.asciidoc[] + +:edit_url!: diff --git a/metricbeat/docs/modules/openai/usage.asciidoc b/metricbeat/docs/modules/openai/usage.asciidoc new file mode 100644 index 00000000000..69d0ba313d9 --- /dev/null +++ b/metricbeat/docs/modules/openai/usage.asciidoc @@ -0,0 +1,29 @@ +//// +This file is generated! See scripts/mage/docs_collector.go +//// +:edit_url: https://github.com/elastic/beats/edit/main/x-pack/metricbeat/module/openai/usage/_meta/docs.asciidoc + + +[[metricbeat-metricset-openai-usage]] +[role="xpack"] +=== openai usage metricset + +beta[] + +include::../../../../x-pack/metricbeat/module/openai/usage/_meta/docs.asciidoc[] + + +:edit_url: + +==== Fields + +For a description of each field in the metricset, see the +<> section. + +Here is an example document generated by this metricset: + +[source,json] +---- +include::../../../../x-pack/metricbeat/module/openai/usage/_meta/data.json[] +---- +:edit_url!: \ No newline at end of file diff --git a/metricbeat/docs/modules_list.asciidoc b/metricbeat/docs/modules_list.asciidoc index abf04650f2c..ae81c25e5fc 100644 --- a/metricbeat/docs/modules_list.asciidoc +++ b/metricbeat/docs/modules_list.asciidoc @@ -242,6 +242,8 @@ This file is generated! See scripts/mage/docs_collector.go |<> |<> |image:./images/icon-yes.png[Prebuilt dashboards are available] | .1+| .1+| |<> +|<> beta[] |image:./images/icon-no.png[No prebuilt dashboards] | +.1+| .1+| |<> beta[] |<> beta[] |image:./images/icon-no.png[No prebuilt dashboards] | .1+| .1+| |<> beta[] |<> |image:./images/icon-yes.png[Prebuilt dashboards are available] | @@ -384,6 +386,7 @@ include::modules/munin.asciidoc[] include::modules/mysql.asciidoc[] include::modules/nats.asciidoc[] include::modules/nginx.asciidoc[] +include::modules/openai.asciidoc[] include::modules/openmetrics.asciidoc[] include::modules/oracle.asciidoc[] include::modules/panw.asciidoc[] diff --git a/x-pack/agentbeat/agentbeat.spec.yml b/x-pack/agentbeat/agentbeat.spec.yml index a2af5ce6bf8..5b6c33e4ceb 100644 --- a/x-pack/agentbeat/agentbeat.spec.yml +++ b/x-pack/agentbeat/agentbeat.spec.yml @@ -507,6 +507,11 @@ inputs: platforms: *platforms outputs: *outputs command: *metricbeat_command + - name: openai/metrics + description: "OpenAI metrics" + platforms: *platforms + outputs: *outputs + command: *metricbeat_command - name: panw/metrics description: "Palo Alto Networks metrics" platforms: *platforms diff --git a/x-pack/metricbeat/include/list.go b/x-pack/metricbeat/include/list.go index b9b426cf0ad..af3b1e9425d 100644 --- a/x-pack/metricbeat/include/list.go +++ b/x-pack/metricbeat/include/list.go @@ -55,6 +55,8 @@ import ( _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/mssql" _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/mssql/performance" _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/mssql/transaction_log" + _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/openai" + _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/openai/usage" _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/oracle" _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/oracle/performance" _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/oracle/sysmetric" diff --git a/x-pack/metricbeat/metricbeat.reference.yml b/x-pack/metricbeat/metricbeat.reference.yml index 8c4250caece..a3dad62e8a6 100644 --- a/x-pack/metricbeat/metricbeat.reference.yml +++ b/x-pack/metricbeat/metricbeat.reference.yml @@ -1279,6 +1279,42 @@ metricbeat.modules: # Path to server status. Default nginx_status server_status_path: "nginx_status" +#-------------------------------- Openai Module -------------------------------- +- module: openai + metricsets: ["usage"] + enabled: false + period: 1h + + # # Project API Keys - Multiple API keys can be specified for different projects + # api_keys: + # - key: "api_key1" + # - key: "api_key2" + + # # API Configuration + # ## Base URL for the OpenAI usage API endpoint + # api_url: "https://api.openai.com/v1/usage" + # ## Custom headers to be included in API requests + # headers: + # - "k1: v1" + # - "k2: v2" + ## Rate Limiting Configuration + # rate_limit: + # limit: 12 # seconds between requests + # burst: 1 # max concurrent requests + # ## Request Timeout Duration + # timeout: 30s + + # # Data Collection Configuration + # collection: + # ## Number of days to look back when collecting usage data + # lookback_days: 30 + # ## Whether to collect usage data in realtime. Defaults to false as how + # # OpenAI usage data is collected will end up adding duplicate data to ES + # # and also making it harder to do analytics. Best approach is to avoid + # # realtime collection and collect only upto last day (in UTC). So, there's + # # at most 24h delay. + # realtime: false + #----------------------------- Openmetrics Module ----------------------------- - module: openmetrics metricsets: ['collector'] diff --git a/x-pack/metricbeat/module/openai/_meta/config.yml b/x-pack/metricbeat/module/openai/_meta/config.yml new file mode 100644 index 00000000000..a34fd7b183d --- /dev/null +++ b/x-pack/metricbeat/module/openai/_meta/config.yml @@ -0,0 +1,34 @@ +- module: openai + metricsets: ["usage"] + enabled: false + period: 1h + + # # Project API Keys - Multiple API keys can be specified for different projects + # api_keys: + # - key: "api_key1" + # - key: "api_key2" + + # # API Configuration + # ## Base URL for the OpenAI usage API endpoint + # api_url: "https://api.openai.com/v1/usage" + # ## Custom headers to be included in API requests + # headers: + # - "k1: v1" + # - "k2: v2" + ## Rate Limiting Configuration + # rate_limit: + # limit: 12 # seconds between requests + # burst: 1 # max concurrent requests + # ## Request Timeout Duration + # timeout: 30s + + # # Data Collection Configuration + # collection: + # ## Number of days to look back when collecting usage data + # lookback_days: 30 + # ## Whether to collect usage data in realtime. Defaults to false as how + # # OpenAI usage data is collected will end up adding duplicate data to ES + # # and also making it harder to do analytics. Best approach is to avoid + # # realtime collection and collect only upto last day (in UTC). So, there's + # # at most 24h delay. + # realtime: false diff --git a/x-pack/metricbeat/module/openai/_meta/docs.asciidoc b/x-pack/metricbeat/module/openai/_meta/docs.asciidoc new file mode 100644 index 00000000000..744909c7a1a --- /dev/null +++ b/x-pack/metricbeat/module/openai/_meta/docs.asciidoc @@ -0,0 +1,2 @@ +This is the openai module. + diff --git a/x-pack/metricbeat/module/openai/_meta/fields.yml b/x-pack/metricbeat/module/openai/_meta/fields.yml new file mode 100644 index 00000000000..d514eb010f1 --- /dev/null +++ b/x-pack/metricbeat/module/openai/_meta/fields.yml @@ -0,0 +1,10 @@ +- key: openai + title: "openai" + release: beta + description: > + openai module + fields: + - name: openai + type: group + description: > + fields: diff --git a/x-pack/metricbeat/module/openai/doc.go b/x-pack/metricbeat/module/openai/doc.go new file mode 100644 index 00000000000..5f2f07fb0bc --- /dev/null +++ b/x-pack/metricbeat/module/openai/doc.go @@ -0,0 +1,6 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// Package openai is a Metricbeat module that contains MetricSets. +package openai diff --git a/x-pack/metricbeat/module/openai/fields.go b/x-pack/metricbeat/module/openai/fields.go new file mode 100644 index 00000000000..de875dbdaab --- /dev/null +++ b/x-pack/metricbeat/module/openai/fields.go @@ -0,0 +1,23 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// Code generated by beats/dev-tools/cmd/asset/asset.go - DO NOT EDIT. + +package openai + +import ( + "github.com/elastic/beats/v7/libbeat/asset" +) + +func init() { + if err := asset.SetFields("metricbeat", "openai", asset.ModuleFieldsPri, AssetOpenai); err != nil { + panic(err) + } +} + +// AssetOpenai returns asset data. +// This is the base64 encoded zlib format compressed contents of module/openai. +func AssetOpenai() string { + return "eJzsWMtu4zgQvPsrGtnL7sHYuw8BjOwDAbKbIMlgjgIttmWOKVJDtuI4Xz+gJNp6UHZsyRMEGB5FsqpZrK6YmcIatzPQGSomJgAkSOIMrsoPVxMAgxKZxRkskNgEgKONjchIaDWD6wkAVLsh1TyXOAFYCpTczoq5KSiWYo3BDdpmOIPE6DyrvgRQmzh1rNyyBHdfQ3ButAv3I0hVjvsM1fwW5g+3JQekSEbEFpjiYImRsCRiW9vTLtGN3+BGp6lWsGAWqxXACIzWBBJfUDaW7xQyCVPijbnCIsEba/wZ17jdaNOea5zovgYDgqMisRRojlO6T6OQdoA8HctEtMbtgMO5m1nj9ti5PNGQI3mqo6cxyFlMePaZHqv9nvEgmUM9l+h5myHo5UGezOhvGNOAK3ooEY5dkScackWeqoHRbcVMolv/582KUdXXnDUiYV9WZ6IvXTrFXLcmAf5FhYbJGqXPk9bSUIjUizL4PUdLNiJNTHaW+RKlVklgslHl/3m6QONc4EEhZbx9A43YNkVX97KG76lDfO+BIGDhPZ9VLLMrHTDgyYxPFVSfE+u0sVaErxSRXqMaQ+dnhwBqp3ZFACUB5LaTF/tiksI3hPxy5ewoqoL6hWHxCnl0eX0KnpZMvVVhykQ//zv98cWi6UFqNV4odU+m8+lbYfbl1V/zu7vp3yBSlxnVNbmuCSXHPrSkDIfoGalVFdD5DXRiZqk8jYpDdG/xvLwqwfbO/Zi07GUtyouseBtslCfxhs0e7dHxozI6t2hGyOei/96RzanmKEfg+8/hhAhbHfh1JWyGBmyGGK+mpKdFJB1qwE25ZawW9BUM7cGLK9cmdE1vMdaKj9X1FZr7uRijPfRX82Pa/sK90LLmM76SM+RTYc2DlqRO0WfbscX6KV0Zr5hxrywzljH3gL+8WXhzzrlwG5gsnzqOw8Lvhm3AkjYswT+CRl1SNOaj6x+hcEq5EioZ8uTSRiRCHbgrvXBPz8B0ORGdJPEj28CyVXhY593/A6wVlpiiKNYcI6EITWaQxvsjNPcMcKM5wu2eYVD3/3xhd1KBkwpqUr1DZ+MOiS9MRpWJx5L30QP77vhkqppO/YWYPwIAAP//Z/rS7Q==" +} diff --git a/x-pack/metricbeat/module/openai/usage/_meta/data.json b/x-pack/metricbeat/module/openai/usage/_meta/data.json new file mode 100644 index 00000000000..da78c73e036 --- /dev/null +++ b/x-pack/metricbeat/module/openai/usage/_meta/data.json @@ -0,0 +1,37 @@ +{ + "@timestamp": "2017-10-12T08:05:34.853Z", + "event": { + "dataset": "openai.usage", + "duration": 115000, + "module": "openai" + }, + "metricset": { + "name": "usage", + "period": 10000 + }, + "openai": { + "usage": { + "api_key_id": null, + "api_key_name": null, + "api_key_redacted": null, + "api_key_type": null, + "data": { + "cached_context_tokens_total": 0, + "context_tokens_total": 118, + "email": null, + "generated_tokens_total": 35, + "operation": "completion-realtime", + "request_type": "", + "requests_total": 1, + "snapshot_id": "gpt-4o-realtime-preview-2024-10-01" + }, + "organization_id": "org-dummy", + "organization_name": "Personal", + "project_id": null, + "project_name": null + } + }, + "service": { + "type": "openai" + } +} \ No newline at end of file diff --git a/x-pack/metricbeat/module/openai/usage/_meta/docs.asciidoc b/x-pack/metricbeat/module/openai/usage/_meta/docs.asciidoc new file mode 100644 index 00000000000..dc88baf82a0 --- /dev/null +++ b/x-pack/metricbeat/module/openai/usage/_meta/docs.asciidoc @@ -0,0 +1 @@ +This is the usage metricset of the module openai. diff --git a/x-pack/metricbeat/module/openai/usage/_meta/fields.yml b/x-pack/metricbeat/module/openai/usage/_meta/fields.yml new file mode 100644 index 00000000000..c25fe30c17b --- /dev/null +++ b/x-pack/metricbeat/module/openai/usage/_meta/fields.yml @@ -0,0 +1,156 @@ +- name: usage + type: group + release: beta + description: > + OpenAI API usage metrics and statistics + fields: + # Common base fields at root level + - name: organization_id + type: keyword + description: Organization identifier + - name: organization_name + type: keyword + description: Organization name + - name: api_key_id + type: keyword + description: API key identifier + - name: api_key_name + type: keyword + description: API key name + - name: api_key_redacted + type: keyword + description: Redacted API key + - name: api_key_type + type: keyword + description: Type of API key + - name: project_id + type: keyword + description: Project identifier + - name: project_name + type: keyword + description: Project name + + # Completion/Chat usage data + - name: data + type: group + description: > + General usage data metrics + fields: + - name: requests_total + type: long + description: Number of requests made + - name: operation + type: keyword + description: Operation type + - name: snapshot_id + type: keyword + description: Snapshot identifier + - name: context_tokens_total + type: long + description: Total number of context tokens used + - name: generated_tokens_total + type: long + description: Total number of generated tokens + - name: cached_context_tokens_total + type: long + description: Total number of cached context tokens + - name: email + type: keyword + description: User email + - name: request_type + type: keyword + description: Type of request + + # DALL-E image generation metrics + - name: dalle + type: group + description: > + DALL-E API usage metrics + fields: + - name: num_images + type: long + description: Number of images generated + - name: requests_total + type: long + description: Number of requests + - name: image_size + type: keyword + description: Size of generated images + - name: operation + type: keyword + description: Operation type + - name: user_id + type: keyword + description: User identifier + - name: model_id + type: keyword + description: Model identifier + + # Whisper speech-to-text metrics + - name: whisper + type: group + description: > + Whisper API usage metrics + fields: + - name: model_id + type: keyword + description: Model identifier + - name: num_seconds + type: long + description: Number of seconds processed + - name: requests_total + type: long + description: Number of requests + - name: user_id + type: keyword + description: User identifier + + # Text-to-Speech metrics + - name: tts + type: group + description: > + Text-to-Speech API usage metrics + fields: + - name: model_id + type: keyword + description: Model identifier + - name: num_characters + type: long + description: Number of characters processed + - name: requests_total + type: long + description: Number of requests + - name: user_id + type: keyword + description: User identifier + + # Additional data types (raw storage) + - name: ft_data + type: group + description: > + Fine-tuning data metrics + fields: + - name: original + type: object + object_type: keyword + description: Raw fine-tuning data + + - name: assistant_code_interpreter + type: group + description: > + Assistant Code Interpreter usage metrics + fields: + - name: original + type: object + object_type: keyword + description: Raw assistant code interpreter data + + - name: retrieval_storage + type: group + description: > + Retrieval storage usage metrics + fields: + - name: original + type: object + object_type: keyword + description: Raw retrieval storage data diff --git a/x-pack/metricbeat/module/openai/usage/client.go b/x-pack/metricbeat/module/openai/usage/client.go new file mode 100644 index 00000000000..3995386c30e --- /dev/null +++ b/x-pack/metricbeat/module/openai/usage/client.go @@ -0,0 +1,71 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package usage + +import ( + "fmt" + "net/http" + "time" + + "golang.org/x/time/rate" + + "github.com/elastic/elastic-agent-libs/logp" +) + +// RLHTTPClient wraps the standard http.Client with a rate limiter to control API request frequency. +type RLHTTPClient struct { + client *http.Client + logger *logp.Logger + Ratelimiter *rate.Limiter +} + +// Do executes an HTTP request while respecting rate limits. +// It waits for rate limit token before proceeding with the request. +// Returns the HTTP response and any error encountered. +func (c *RLHTTPClient) Do(req *http.Request) (*http.Response, error) { + start := time.Now() + + c.logger.Debug("Waiting for rate limit token") + + err := c.Ratelimiter.Wait(req.Context()) + if err != nil { + return nil, fmt.Errorf("failed to acquire rate limit token: %w", err) + } + + c.logger.Debug("Rate limit token acquired") + + waitDuration := time.Since(start) + + if waitDuration > time.Minute { + c.logger.Infof("Rate limit wait exceeded threshold: %v", waitDuration) + } + + resp, err := c.client.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to execute request: %w", err) + } + + return resp, nil +} + +// newClient creates a new rate-limited HTTP client with specified rate limiter and timeout. +func newClient(logger *logp.Logger, rl *rate.Limiter, timeout time.Duration) *RLHTTPClient { + transport := &http.Transport{ + MaxIdleConns: 100, + MaxIdleConnsPerHost: 100, + IdleConnTimeout: 90 * time.Second, + } + + client := &http.Client{ + Timeout: timeout, + Transport: transport, + } + + return &RLHTTPClient{ + client: client, + logger: logger, + Ratelimiter: rl, + } +} diff --git a/x-pack/metricbeat/module/openai/usage/config.go b/x-pack/metricbeat/module/openai/usage/config.go new file mode 100644 index 00000000000..f930a013747 --- /dev/null +++ b/x-pack/metricbeat/module/openai/usage/config.go @@ -0,0 +1,94 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package usage + +import ( + "errors" + "fmt" + "net/url" + "time" +) + +type Config struct { + APIKeys []apiKeyConfig `config:"api_keys" validate:"required"` + APIURL string `config:"api_url" validate:"required"` + Headers []string `config:"headers"` + RateLimit *rateLimitConfig `config:"rate_limit"` + Timeout time.Duration `config:"timeout" validate:"required"` + Collection collectionConfig `config:"collection"` +} + +type rateLimitConfig struct { + Limit *int `config:"limit" validate:"required"` + Burst *int `config:"burst" validate:"required"` +} + +type apiKeyConfig struct { + Key string `config:"key" validate:"required"` +} + +type collectionConfig struct { + LookbackDays int `config:"lookback_days"` + Realtime bool `config:"realtime"` +} + +func defaultConfig() Config { + return Config{ + APIURL: "https://api.openai.com/v1/usage", + Timeout: 30 * time.Second, + RateLimit: &rateLimitConfig{ + Limit: ptr(12), + Burst: ptr(1), + }, + Collection: collectionConfig{ + LookbackDays: 0, // 0 days + Realtime: false, // avoid realtime collection by default + }, + } +} + +func (c *Config) Validate() error { + var errs []error + + if len(c.APIKeys) == 0 { + errs = append(errs, errors.New("at least one API key must be configured")) + } + if c.APIURL == "" { + errs = append(errs, errors.New("api_url cannot be empty")) + } else { + _, err := url.ParseRequestURI(c.APIURL) + if err != nil { + errs = append(errs, fmt.Errorf("invalid api_url format: %w", err)) + } + } + if c.RateLimit == nil { + errs = append(errs, errors.New("rate_limit must be configured")) + } else { + if c.RateLimit.Limit == nil { + errs = append(errs, errors.New("rate_limit.limit must be configured")) + } + if c.RateLimit.Burst == nil { + errs = append(errs, errors.New("rate_limit.burst must be configured")) + } + } + if c.Timeout <= 0 { + errs = append(errs, errors.New("timeout must be greater than 0")) + } + if c.Collection.LookbackDays < 0 { + errs = append(errs, errors.New("lookback_days must be >= 0")) + } + + for i, apiKey := range c.APIKeys { + if apiKey.Key == "" { + errs = append(errs, fmt.Errorf("API key at position %d cannot be empty", i)) + } + } + + if len(errs) > 0 { + return fmt.Errorf("validation failed: %w", errors.Join(errs...)) + } + + return nil +} diff --git a/x-pack/metricbeat/module/openai/usage/errors.go b/x-pack/metricbeat/module/openai/usage/errors.go new file mode 100644 index 00000000000..9b8b793767c --- /dev/null +++ b/x-pack/metricbeat/module/openai/usage/errors.go @@ -0,0 +1,10 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package usage + +import "errors" + +// ErrNoState indicates no previous state exists for the given API key +var ErrNoState = errors.New("no previous state found") diff --git a/x-pack/metricbeat/module/openai/usage/helper.go b/x-pack/metricbeat/module/openai/usage/helper.go new file mode 100644 index 00000000000..992c025df8c --- /dev/null +++ b/x-pack/metricbeat/module/openai/usage/helper.go @@ -0,0 +1,30 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package usage + +import "strings" + +// dateFormatForStateStore is used to parse and format dates in the YYYY-MM-DD format +const dateFormatForStateStore = "2006-01-02" + +func ptr[T any](value T) *T { + return &value +} + +func processHeaders(headers []string) map[string]string { + headersMap := make(map[string]string, len(headers)) + for _, header := range headers { + parts := strings.SplitN(header, ":", 2) + if len(parts) != 2 { + continue + } + k, v := strings.TrimSpace(parts[0]), strings.TrimSpace(parts[1]) + if k == "" || v == "" { + continue + } + headersMap[k] = v + } + return headersMap +} diff --git a/x-pack/metricbeat/module/openai/usage/persistcache.go b/x-pack/metricbeat/module/openai/usage/persistcache.go new file mode 100644 index 00000000000..51a8d6fb0f8 --- /dev/null +++ b/x-pack/metricbeat/module/openai/usage/persistcache.go @@ -0,0 +1,187 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package usage + +import ( + "crypto/sha256" + "encoding/hex" + "errors" + "fmt" + "os" + "path" + "strings" + "sync" + "time" +) + +// stateManager handles the storage and retrieval of state data with key hashing and caching capabilities +type stateManager struct { + mu sync.RWMutex + store *stateStore + keyPrefix string + hashCache sync.Map +} + +// stateStore handles persistence of key-value pairs using the filesystem +type stateStore struct { + Dir string // Base directory for storing state files + mu sync.RWMutex +} + +// newStateManager creates a new state manager instance with the given storage path +func newStateManager(storePath string) (*stateManager, error) { + if strings.TrimSpace(storePath) == "" { + return nil, errors.New("empty path provided") + } + + store, err := newStateStore(storePath) + if err != nil { + return nil, fmt.Errorf("create state store: %w", err) + } + + return &stateManager{ + mu: sync.RWMutex{}, + store: store, + keyPrefix: "state_", + hashCache: sync.Map{}, + }, nil +} + +// newStateStore creates a new state store instance at the specified path +func newStateStore(path string) (*stateStore, error) { + if err := os.MkdirAll(path, 0o755); err != nil { + return nil, fmt.Errorf("creating state directory: %w", err) + } + return &stateStore{ + Dir: path, + }, nil +} + +// getStatePath builds the full file path for a given state key +func (s *stateStore) getStatePath(name string) string { + return path.Join(s.Dir, name) +} + +// Put stores a value in a file named by the key +func (s *stateStore) Put(key, value string) error { + s.mu.Lock() + defer s.mu.Unlock() + + filePath := s.getStatePath(key) + + // In case the file already exists, file is truncated. + f, err := os.Create(filePath) + if err != nil { + return fmt.Errorf("creating state file: %w", err) + } + defer f.Close() + + _, err = f.WriteString(value) + if err != nil { + return fmt.Errorf("writing value to state file: %w", err) + } + + if err = f.Sync(); err != nil { + return fmt.Errorf("syncing state file: %w", err) + } + + return nil +} + +// Get retrieves the value stored in the file named by the key +func (s *stateStore) Get(key string) (string, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + filePath := s.getStatePath(key) + data, err := os.ReadFile(filePath) + if err != nil { + return "", fmt.Errorf("reading state file: %w", err) + } + return string(data), nil +} + +// Has checks if a state exists for the given key +func (s *stateStore) Has(key string) bool { + s.mu.RLock() + defer s.mu.RUnlock() + + filePath := s.getStatePath(key) + _, err := os.Stat(filePath) + return err == nil +} + +// Remove deletes the state file for the given key +func (s *stateStore) Remove(key string) error { + s.mu.Lock() + defer s.mu.Unlock() + + filePath := s.getStatePath(key) + if err := os.Remove(filePath); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("removing state file: %w", err) + } + return nil +} + +// Clear removes all state files by deleting and recreating the state directory +func (s *stateStore) Clear() error { + s.mu.Lock() + defer s.mu.Unlock() + + if err := os.RemoveAll(s.Dir); err != nil { + return fmt.Errorf("clearing state directory: %w", err) + } + return os.MkdirAll(s.Dir, 0o755) +} + +// GetLastProcessedDate retrieves and parses the last processed date for a given API key +func (s *stateManager) GetLastProcessedDate(apiKey string) (time.Time, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + stateKey := s.GetStateKey(apiKey) + + if !s.store.Has(stateKey) { + return time.Time{}, ErrNoState + } + + dateStr, err := s.store.Get(stateKey) + if err != nil { + return time.Time{}, fmt.Errorf("get state: %w", err) + } + + return time.Parse(dateFormatForStateStore, dateStr) +} + +// SaveState saves the last processed date for a given API key +func (s *stateManager) SaveState(apiKey, dateStr string) error { + s.mu.Lock() + defer s.mu.Unlock() + + stateKey := s.GetStateKey(apiKey) + return s.store.Put(stateKey, dateStr) +} + +// hashKey generates and caches a SHA-256 hash of the provided API key +func (s *stateManager) hashKey(apiKey string) string { + // Check cache first to avoid recomputing hashes + if hashedKey, ok := s.hashCache.Load(apiKey); ok { + return hashedKey.(string) + } + + // Generate SHA-256 hash and hex encode for safe filename usage + hasher := sha256.New() + _, _ = hasher.Write([]byte(apiKey)) + hashedKey := hex.EncodeToString(hasher.Sum(nil)) + + // Cache the computed hash for future lookups + s.hashCache.Store(apiKey, hashedKey) + return hashedKey +} + +// GetStateKey generates a unique state key for a given API key +func (s *stateManager) GetStateKey(apiKey string) string { + return s.keyPrefix + s.hashKey(apiKey) +} diff --git a/x-pack/metricbeat/module/openai/usage/schema.go b/x-pack/metricbeat/module/openai/usage/schema.go new file mode 100644 index 00000000000..64f22393e59 --- /dev/null +++ b/x-pack/metricbeat/module/openai/usage/schema.go @@ -0,0 +1,69 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package usage + +type BaseData struct { + OrganizationID string `json:"organization_id"` + OrganizationName string `json:"organization_name"` + ApiKeyID *string `json:"api_key_id"` + ApiKeyName *string `json:"api_key_name"` + ApiKeyRedacted *string `json:"api_key_redacted"` + ApiKeyType *string `json:"api_key_type"` + ProjectID *string `json:"project_id"` + ProjectName *string `json:"project_name"` +} + +type UsageResponse struct { + Object string `json:"object"` + Data []UsageData `json:"data"` + FtData []interface{} `json:"ft_data"` + DalleApiData []DalleData `json:"dalle_api_data"` + WhisperApiData []WhisperData `json:"whisper_api_data"` + TtsApiData []TtsData `json:"tts_api_data"` + AssistantCodeInterpreterData []interface{} `json:"assistant_code_interpreter_data"` + RetrievalStorageData []interface{} `json:"retrieval_storage_data"` +} + +type UsageData struct { + BaseData + AggregationTimestamp int64 `json:"aggregation_timestamp"` + NRequests int `json:"n_requests"` + Operation string `json:"operation"` + SnapshotID string `json:"snapshot_id"` + NContextTokensTotal int `json:"n_context_tokens_total"` + NGeneratedTokensTotal int `json:"n_generated_tokens_total"` + Email *string `json:"email"` + RequestType string `json:"request_type"` + NCachedContextTokensTotal int `json:"n_cached_context_tokens_total"` +} + +type DalleData struct { + BaseData + Timestamp int64 `json:"timestamp"` + NumImages int `json:"num_images"` + NumRequests int `json:"num_requests"` + ImageSize string `json:"image_size"` + Operation string `json:"operation"` + ModelID string `json:"model_id"` + UserID string `json:"user_id"` +} + +type WhisperData struct { + BaseData + Timestamp int64 `json:"timestamp"` + ModelID string `json:"model_id"` + NumSeconds int `json:"num_seconds"` + NumRequests int `json:"num_requests"` + UserID string `json:"user_id"` +} + +type TtsData struct { + BaseData + Timestamp int64 `json:"timestamp"` + ModelID string `json:"model_id"` + NumCharacters int `json:"num_characters"` + NumRequests int `json:"num_requests"` + UserID string `json:"user_id"` +} diff --git a/x-pack/metricbeat/module/openai/usage/usage.go b/x-pack/metricbeat/module/openai/usage/usage.go new file mode 100644 index 00000000000..86dbe76ce81 --- /dev/null +++ b/x-pack/metricbeat/module/openai/usage/usage.go @@ -0,0 +1,387 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package usage + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "net" + "net/http" + "path" + "time" + + "golang.org/x/sync/errgroup" + "golang.org/x/time/rate" + + "github.com/elastic/beats/v7/libbeat/common/cfgwarn" + "github.com/elastic/beats/v7/metricbeat/mb" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/elastic-agent-libs/paths" +) + +// init registers the MetricSet with the central registry as soon as the program +// starts. The New function will be called later to instantiate an instance of +// the MetricSet for each host is defined in the module's configuration. After the +// MetricSet has been created then Fetch will begin to be called periodically. +func init() { + mb.Registry.MustAddMetricSet("openai", "usage", New) +} + +// MetricSet holds any configuration or state information. It must implement +// the mb.MetricSet interface. And this is best achieved by embedding +// mb.BaseMetricSet because it implements all of the required mb.MetricSet +// interface methods except for Fetch. +type MetricSet struct { + mb.BaseMetricSet + httpClient *RLHTTPClient + logger *logp.Logger + config Config + report mb.ReporterV2 + stateManager *stateManager + headers map[string]string +} + +// New creates a new instance of the MetricSet. New is responsible for unpacking +// any MetricSet specific configuration options if there are any. +func New(base mb.BaseMetricSet) (mb.MetricSet, error) { + cfgwarn.Beta("The openai usage metricset is beta.") + + config := defaultConfig() + if err := base.Module().UnpackConfig(&config); err != nil { + return nil, err + } + + if err := config.Validate(); err != nil { + return nil, err + } + + sm, err := newStateManager(paths.Resolve(paths.Data, path.Join("state", base.Module().Name(), base.Name()))) + if err != nil { + return nil, fmt.Errorf("create state manager: %w", err) + } + + logger := logp.NewLogger("openai.usage") + + httpClient := newClient( + logger, + rate.NewLimiter( + rate.Every(time.Duration(*config.RateLimit.Limit)*time.Second), + *config.RateLimit.Burst, + ), + config.Timeout, + ) + + return &MetricSet{ + BaseMetricSet: base, + httpClient: httpClient, + logger: logger, + config: config, + stateManager: sm, + headers: processHeaders(config.Headers), + }, nil +} + +// Fetch collects OpenAI API usage data for the configured time range. +// +// The collection process: +// 1. Determines the time range based on realtime/non-realtime configuration +// 2. Calculates start date using configured lookback days +// 3. Fetches usage data for each day in the range +// 4. Reports collected metrics through the mb.ReporterV2 +func (m *MetricSet) Fetch(report mb.ReporterV2) error { + endDate := time.Now().UTC().Truncate(time.Hour * 24) // truncate to day as we only collect daily data + + if !m.config.Collection.Realtime { + // If we're not collecting realtime data, then just pull until + // yesterday (in UTC). + endDate = endDate.AddDate(0, 0, -1) + } + + startDate := endDate.AddDate(0, 0, -m.config.Collection.LookbackDays) + + m.report = report + return m.fetchDateRange(startDate, endDate, m.httpClient) +} + +// fetchDateRange retrieves OpenAI API usage data for each configured API key within a date range. +// +// For each API key: +// 1. Retrieves last processed date from state store +// 2. Adjusts collection range to avoid duplicates +// 3. Collects daily usage data +// 4. Updates state store with latest processed date +// 5. Handles errors per day without failing entire range +func (m *MetricSet) fetchDateRange(startDate, endDate time.Time, httpClient *RLHTTPClient) error { + g, ctx := errgroup.WithContext(context.TODO()) + + for i := range m.config.APIKeys { + apiKey := m.config.APIKeys[i] + apiKeyIdx := i + 1 + g.Go(func() error { + lastProcessedDate, err := m.stateManager.GetLastProcessedDate(apiKey.Key) + if err == nil { + currentStartDate := lastProcessedDate.AddDate(0, 0, 1) + if currentStartDate.After(endDate) { + m.logger.Infof("Skipping API key #%d as current start date (%s) is after end date (%s)", apiKeyIdx, currentStartDate, endDate) + return nil + } + startDate = currentStartDate + } + + m.logger.Debugf("Fetching data for API key #%d from %s to %s", apiKeyIdx, startDate, endDate) + + for d := startDate; !d.After(endDate); d = d.AddDate(0, 0, 1) { + select { + case <-ctx.Done(): + return ctx.Err() + default: + dateStr := d.Format(dateFormatForStateStore) + if err := m.fetchSingleDay(apiKeyIdx, dateStr, apiKey.Key, httpClient); err != nil { + // If there's an error, log it and continue to the next day. + // In this case, we are not saving the state. + m.logger.Errorf("Error fetching data (api key #%d) for date %s: %v", apiKeyIdx, dateStr, err) + continue + } + if err := m.stateManager.SaveState(apiKey.Key, dateStr); err != nil { + m.logger.Errorf("Error storing state for API key: %v at index %d", err, apiKeyIdx) + } + } + } + return nil + }) + } + + if err := g.Wait(); err != nil { + m.logger.Errorf("Error fetching data: %v", err) + } + + return nil +} + +// fetchSingleDay retrieves usage data for a specific date and API key. +func (m *MetricSet) fetchSingleDay(apiKeyIdx int, dateStr, apiKey string, httpClient *RLHTTPClient) error { + req, err := m.createRequest(dateStr, apiKey) + if err != nil { + return fmt.Errorf("error creating request: %w", err) + } + + resp, err := httpClient.Do(req) + if err != nil { + var netErr net.Error + if errors.As(err, &netErr) && netErr.Timeout() { + return fmt.Errorf("request timed out with configured timeout: %v and error: %w", m.config.Timeout, err) + } + return fmt.Errorf("error making request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("error response from API: status=%s", resp.Status) + } + + return m.processResponse(apiKeyIdx, resp, dateStr) +} + +// createRequest builds an HTTP request for the OpenAI usage API. +func (m *MetricSet) createRequest(dateStr, apiKey string) (*http.Request, error) { + req, err := http.NewRequestWithContext(context.TODO(), http.MethodGet, m.config.APIURL, nil) + if err != nil { + return nil, fmt.Errorf("error creating request: %w", err) + } + + q := req.URL.Query() + q.Add("date", dateStr) + req.URL.RawQuery = q.Encode() + + req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", apiKey)) + for key, value := range m.headers { + req.Header.Add(key, value) + } + + return req, nil +} + +// processResponse handles the API response and processes the usage data. +func (m *MetricSet) processResponse(apiKeyIdx int, resp *http.Response, dateStr string) error { + var usageResponse UsageResponse + if err := json.NewDecoder(resp.Body).Decode(&usageResponse); err != nil { + return fmt.Errorf("error decoding response: %w", err) + } + + m.logger.Infof("Fetching usage metrics (api key #%d) for date: %s", apiKeyIdx, dateStr) + + m.processUsageData(usageResponse.Data) + m.processDalleData(usageResponse.DalleApiData) + m.processWhisperData(usageResponse.WhisperApiData) + m.processTTSData(usageResponse.TtsApiData) + + // Process additional data. + // + // NOTE(shmsr): During testing, could not get the usage data for the following + // and found no documentation, example responses, etc. That's why let's store them + // as it is so that we can use processors later on to process them as needed. + m.processFTData(usageResponse.FtData) + m.processAssistantCodeInterpreterData(usageResponse.AssistantCodeInterpreterData) + m.processRetrievalStorageData(usageResponse.RetrievalStorageData) + + return nil +} + +func getBaseFields(data BaseData) mapstr.M { + return mapstr.M{ + "organization_id": data.OrganizationID, + "organization_name": data.OrganizationName, + "api_key_id": data.ApiKeyID, + "api_key_name": data.ApiKeyName, + "api_key_redacted": data.ApiKeyRedacted, + "api_key_type": data.ApiKeyType, + "project_id": data.ProjectID, + "project_name": data.ProjectName, + } +} + +func (m *MetricSet) processUsageData(data []UsageData) { + events := make([]mb.Event, 0, len(data)) + for _, usage := range data { + event := mb.Event{ + Timestamp: time.Unix(usage.AggregationTimestamp, 0).UTC(), // epoch time to time.Time (UTC) + MetricSetFields: mapstr.M{ + "data": mapstr.M{ + "requests_total": usage.NRequests, + "operation": usage.Operation, + "snapshot_id": usage.SnapshotID, + "context_tokens_total": usage.NContextTokensTotal, + "generated_tokens_total": usage.NGeneratedTokensTotal, + "email": usage.Email, + "request_type": usage.RequestType, + "cached_context_tokens_total": usage.NCachedContextTokensTotal, + }, + }, + } + event.MetricSetFields.DeepUpdate(getBaseFields(usage.BaseData)) + events = append(events, event) + } + m.processEvents(events) +} + +func (m *MetricSet) processDalleData(data []DalleData) { + events := make([]mb.Event, 0, len(data)) + for _, dalle := range data { + event := mb.Event{ + Timestamp: time.Unix(dalle.Timestamp, 0).UTC(), // epoch time to time.Time (UTC) + MetricSetFields: mapstr.M{ + "dalle": mapstr.M{ + "num_images": dalle.NumImages, + "requests_total": dalle.NumRequests, + "image_size": dalle.ImageSize, + "operation": dalle.Operation, + "user_id": dalle.UserID, + "model_id": dalle.ModelID, + }, + }, + } + event.MetricSetFields.DeepUpdate(getBaseFields(dalle.BaseData)) + events = append(events, event) + } + m.processEvents(events) +} + +func (m *MetricSet) processWhisperData(data []WhisperData) { + events := make([]mb.Event, 0, len(data)) + for _, whisper := range data { + event := mb.Event{ + Timestamp: time.Unix(whisper.Timestamp, 0).UTC(), // epoch time to time.Time (UTC) + MetricSetFields: mapstr.M{ + "whisper": mapstr.M{ + "model_id": whisper.ModelID, + "num_seconds": whisper.NumSeconds, + "requests_total": whisper.NumRequests, + "user_id": whisper.UserID, + }, + }, + } + event.MetricSetFields.DeepUpdate(getBaseFields(whisper.BaseData)) + events = append(events, event) + } + m.processEvents(events) +} + +func (m *MetricSet) processTTSData(data []TtsData) { + events := make([]mb.Event, 0, len(data)) + for _, tts := range data { + event := mb.Event{ + Timestamp: time.Unix(tts.Timestamp, 0).UTC(), // epoch time to time.Time (UTC) + MetricSetFields: mapstr.M{ + "tts": mapstr.M{ + "model_id": tts.ModelID, + "num_characters": tts.NumCharacters, + "requests_total": tts.NumRequests, + "user_id": tts.UserID, + }, + }, + } + event.MetricSetFields.DeepUpdate(getBaseFields(tts.BaseData)) + events = append(events, event) + } + + m.processEvents(events) +} + +func (m *MetricSet) processFTData(data []interface{}) { + events := make([]mb.Event, 0, len(data)) + for _, ft := range data { + event := mb.Event{ + MetricSetFields: mapstr.M{ + "ft_data": mapstr.M{ + "original": ft, + }, + }, + } + events = append(events, event) + } + m.processEvents(events) +} + +func (m *MetricSet) processAssistantCodeInterpreterData(data []interface{}) { + events := make([]mb.Event, 0, len(data)) + for _, aci := range data { + event := mb.Event{ + MetricSetFields: mapstr.M{ + "assistant_code_interpreter": mapstr.M{ + "original": aci, + }, + }, + } + events = append(events, event) + } + m.processEvents(events) +} + +func (m *MetricSet) processRetrievalStorageData(data []interface{}) { + events := make([]mb.Event, 0, len(data)) + for _, rs := range data { + event := mb.Event{ + MetricSetFields: mapstr.M{ + "retrieval_storage": mapstr.M{ + "original": rs, + }, + }, + } + events = append(events, event) + } + m.processEvents(events) +} + +func (m *MetricSet) processEvents(events []mb.Event) { + if len(events) == 0 { + return + } + for i := range events { + m.report.Event(events[i]) + } +} diff --git a/x-pack/metricbeat/module/openai/usage/usage_integration_test.go b/x-pack/metricbeat/module/openai/usage/usage_integration_test.go new file mode 100644 index 00000000000..8cb84241995 --- /dev/null +++ b/x-pack/metricbeat/module/openai/usage/usage_integration_test.go @@ -0,0 +1,482 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build integration + +package usage + +import ( + "fmt" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/metricbeat/mb" + mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" + "github.com/elastic/elastic-agent-libs/mapstr" +) + +func TestFetch(t *testing.T) { + apiKey := time.Now().String() // to generate a unique API key everytime to ignore the stateStore + usagePath := "/usage" + server := initServer(usagePath, apiKey) + defer server.Close() + + tests := []struct { + name string + expected mb.Event + }{ + { + name: "tc: #1", + expected: mb.Event{ + RootFields: nil, + ModuleFields: nil, + MetricSetFields: mapstr.M{ + "api_key_id": (*string)(nil), + "api_key_name": (*string)(nil), + "api_key_redacted": (*string)(nil), + "api_key_type": (*string)(nil), + "data": mapstr.M{ + "email": (*string)(nil), + "cached_context_tokens_total": 0, + "context_tokens_total": 118, + "generated_tokens_total": 35, + "requests_total": 1, + "operation": "completion-realtime", + "request_type": "", + "snapshot_id": "gpt-4o-realtime-preview-2024-10-01", + }, + "organization_id": "org-dummy", + "organization_name": "Personal", + "project_id": (*string)(nil), + "project_name": (*string)(nil), + }, + Index: "", + ID: "", + Namespace: "", + Timestamp: time.Date(2024, time.November, 4, 5, 1, 0, 0, time.UTC), + Error: error(nil), + Host: "", + Service: "", + Took: 0, + Period: 0, + DisableTimeSeries: false, + }, + }, + { + name: "tc: #2", + expected: mb.Event{ + RootFields: nil, + ModuleFields: nil, + MetricSetFields: mapstr.M{ + "api_key_id": (*string)(nil), + "api_key_name": (*string)(nil), + "api_key_redacted": (*string)(nil), + "api_key_type": (*string)(nil), + "data": mapstr.M{ + "email": (*string)(nil), + "cached_context_tokens_total": 0, + "context_tokens_total": 31, + "generated_tokens_total": 12, + "requests_total": 1, + "operation": "completion", + "request_type": "", + "snapshot_id": "gpt-4o-2024-08-06", + }, + "organization_id": "org-dummy", + "organization_name": "Personal", + "project_id": (*string)(nil), + "project_name": (*string)(nil), + }, + Index: "", + ID: "", + Namespace: "", + Timestamp: time.Date(2024, time.November, 4, 5, 1, 0, 0, time.UTC), + Error: error(nil), + Host: "", + Service: "", + Took: 0, + Period: 0, + DisableTimeSeries: false, + }, + }, + { + name: "tc: #3", + expected: mb.Event{ + RootFields: nil, + ModuleFields: nil, + MetricSetFields: mapstr.M{ + "api_key_id": (*string)(nil), + "api_key_name": (*string)(nil), + "api_key_redacted": (*string)(nil), + "api_key_type": (*string)(nil), + "data": mapstr.M{ + "email": (*string)(nil), + "cached_context_tokens_total": 0, + "context_tokens_total": 13, + "generated_tokens_total": 9, + "requests_total": 1, + "operation": "completion", + "request_type": "", + "snapshot_id": "ft:gpt-3.5-turbo-0125:personal:yay-renew:APjjyG8E:ckpt-step-84", + }, + "organization_id": "org-dummy", + "organization_name": "Personal", + "project_id": (*string)(nil), + "project_name": (*string)(nil), + }, + Index: "", + ID: "", + Namespace: "", + Timestamp: time.Date(2024, time.November, 4, 5, 19, 0, 0, time.UTC), + Error: error(nil), + Host: "", + Service: "", + Took: 0, + Period: 0, + DisableTimeSeries: false, + }, + }, + { + name: "tc: #4", + expected: mb.Event{ + RootFields: nil, + ModuleFields: nil, + MetricSetFields: mapstr.M{ + "api_key_id": ptr("key_sha_id_random"), + "api_key_name": ptr("project_key"), + "api_key_redacted": ptr("sk-...zkA"), + "api_key_type": ptr("organization"), + "dalle": mapstr.M{ + "image_size": "1024x1024", + "model_id": "dall-e-3", + "num_images": 1, + "requests_total": 1, + "operation": "generations", + "user_id": "hello-test@elastic.co", + }, + "organization_id": "org-dummy", + "organization_name": "Personal", + "project_id": ptr("Default Project"), + "project_name": ptr("Default Project"), + }, + Index: "", + ID: "", + Namespace: "", + Timestamp: time.Date(2024, time.November, 4, 5, 1, 0, 0, time.UTC), + Error: error(nil), + Host: "", + Service: "", + Took: 0, + Period: 0, + DisableTimeSeries: false, + }, + }, + { + name: "tc: #5", + expected: mb.Event{ + RootFields: nil, + ModuleFields: nil, + MetricSetFields: mapstr.M{ + "api_key_id": (*string)(nil), + "api_key_name": (*string)(nil), + "api_key_redacted": (*string)(nil), + "api_key_type": (*string)(nil), + "whisper": mapstr.M{ + "model_id": "whisper-1", + "requests_total": 1, + "num_seconds": 2, + "user_id": "", + }, + "organization_id": "org-dummy", + "organization_name": "Personal", + "project_id": (*string)(nil), + "project_name": (*string)(nil), + }, + Index: "", + ID: "", + Namespace: "", + Timestamp: time.Date(2024, time.November, 4, 5, 1, 0, 0, time.UTC), + Error: error(nil), + Host: "", + Service: "", + Took: 0, + Period: 0, + DisableTimeSeries: false, + }, + }, + { + name: "tc: #6", + expected: mb.Event{ + RootFields: nil, + ModuleFields: nil, + MetricSetFields: mapstr.M{ + "api_key_id": ptr("key_fake_id"), + "api_key_name": ptr("project_key"), + "api_key_redacted": ptr("sk-...zkA"), + "api_key_type": ptr("organization"), + "tts": mapstr.M{ + "model_id": "tts-1", + "num_characters": 90, + "requests_total": 2, + "user_id": "hello-test@elastic.co", + }, + "organization_id": "org-fake", + "organization_name": "Personal", + "project_id": ptr("Default Project"), + "project_name": ptr("Default Project"), + }, + Index: "", + ID: "", + Namespace: "", + Timestamp: time.Date(2024, time.September, 4, 0, 0, 0, 0, time.UTC), + Error: error(nil), + Host: "", + Service: "", + Took: 0, + Period: 0, + DisableTimeSeries: false, + }, + }, + { + name: "tc: #7", + expected: mb.Event{ + RootFields: nil, + ModuleFields: nil, + MetricSetFields: mapstr.M{ + "api_key_id": ptr("key_fake_id"), + "api_key_name": ptr("fake_key"), + "api_key_redacted": ptr("sk-...FIA"), + "api_key_type": ptr("project"), + "tts": mapstr.M{ + "model_id": "tts-1", + "num_characters": 45, + "requests_total": 1, + "user_id": "hello-test@elastic.co", + }, + "organization_id": "org-fake", + "organization_name": "Personal", + "project_id": ptr("proj_fake_id"), + "project_name": ptr("fake_proj"), + }, + Index: "", + ID: "", + Namespace: "", + Timestamp: time.Date(2024, time.September, 5, 0, 0, 0, 0, time.UTC), + Error: error(nil), + Host: "", + Service: "", + Took: 0, + Period: 0, + DisableTimeSeries: false, + }, + }, + } + + f := mbtest.NewReportingMetricSetV2Error(t, getConfig(server.URL+"/usage", apiKey)) + events, errs := mbtest.ReportingFetchV2Error(f) + + require.Empty(t, errs, "Expected no errors") + require.NotEmpty(t, events, "Expected events to be returned") + require.Equal(t, len(tests), len(events)) + + for i, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require.Equal(t, tt.expected, events[i]) + }) + } +} + +func TestData(t *testing.T) { + apiKey := time.Now().String() // to generate a unique API key everytime + usagePath := "/usage" + server := initServer(usagePath, apiKey) + defer server.Close() + + f := mbtest.NewReportingMetricSetV2Error(t, getConfig(server.URL+"/usage", apiKey)) + + err := mbtest.WriteEventsReporterV2Error(f, t, "") + require.NoError(t, err, "Writing events should not return an error") +} + +func getConfig(url, apiKey string) map[string]interface{} { + return map[string]interface{}{ + "module": "openai", + "metricsets": []string{"usage"}, + "enabled": true, + "period": "24h", + "api_url": url, + "api_keys": []map[string]interface{}{ + {"key": apiKey}, + }, + "rate_limit": map[string]interface{}{ + "limit": 60, + "burst": 5, + }, + "collection": map[string]interface{}{ + "lookback_days": 0, + "realtime": false, + }, + } +} + +func initServer(endpoint, api_key string) *httptest.Server { + data := []byte(`{ + "object": "list", + "data": [ + { + "organization_id": "org-dummy", + "organization_name": "Personal", + "aggregation_timestamp": 1730696460, + "n_requests": 1, + "operation": "completion-realtime", + "snapshot_id": "gpt-4o-realtime-preview-2024-10-01", + "n_context_tokens_total": 118, + "n_generated_tokens_total": 35, + "email": null, + "api_key_id": null, + "api_key_name": null, + "api_key_redacted": null, + "api_key_type": null, + "project_id": null, + "project_name": null, + "request_type": "", + "n_cached_context_tokens_total": 0 + }, + { + "organization_id": "org-dummy", + "organization_name": "Personal", + "aggregation_timestamp": 1730696460, + "n_requests": 1, + "operation": "completion", + "snapshot_id": "gpt-4o-2024-08-06", + "n_context_tokens_total": 31, + "n_generated_tokens_total": 12, + "email": null, + "api_key_id": null, + "api_key_name": null, + "api_key_redacted": null, + "api_key_type": null, + "project_id": null, + "project_name": null, + "request_type": "", + "n_cached_context_tokens_total": 0 + }, + { + "organization_id": "org-dummy", + "organization_name": "Personal", + "aggregation_timestamp": 1730697540, + "n_requests": 1, + "operation": "completion", + "snapshot_id": "ft:gpt-3.5-turbo-0125:personal:yay-renew:APjjyG8E:ckpt-step-84", + "n_context_tokens_total": 13, + "n_generated_tokens_total": 9, + "email": null, + "api_key_id": null, + "api_key_name": null, + "api_key_redacted": null, + "api_key_type": null, + "project_id": null, + "project_name": null, + "request_type": "", + "n_cached_context_tokens_total": 0 + } + ], + "ft_data": [], + "dalle_api_data": [ + { + "timestamp": 1730696460, + "num_images": 1, + "num_requests": 1, + "image_size": "1024x1024", + "operation": "generations", + "user_id": "hello-test@elastic.co", + "organization_id": "org-dummy", + "api_key_id": "key_sha_id_random", + "api_key_name": "project_key", + "api_key_redacted": "sk-...zkA", + "api_key_type": "organization", + "organization_name": "Personal", + "model_id": "dall-e-3", + "project_id": "Default Project", + "project_name": "Default Project" + } + ], + "whisper_api_data": [ + { + "timestamp": 1730696460, + "model_id": "whisper-1", + "num_seconds": 2, + "num_requests": 1, + "user_id": null, + "organization_id": "org-dummy", + "api_key_id": null, + "api_key_name": null, + "api_key_redacted": null, + "api_key_type": null, + "organization_name": "Personal", + "project_id": null, + "project_name": null + } + ], + "tts_api_data": [ + { + "timestamp": 1725408000, + "model_id": "tts-1", + "num_characters": 90, + "num_requests": 2, + "user_id": "hello-test@elastic.co", + "organization_id": "org-fake", + "api_key_id": "key_fake_id", + "api_key_name": "project_key", + "api_key_redacted": "sk-...zkA", + "api_key_type": "organization", + "organization_name": "Personal", + "project_id": "Default Project", + "project_name": "Default Project" + }, + { + "timestamp": 1725494400, + "model_id": "tts-1", + "num_characters": 45, + "num_requests": 1, + "user_id": "hello-test@elastic.co", + "organization_id": "org-fake", + "api_key_id": "key_fake_id", + "api_key_name": "fake_key", + "api_key_redacted": "sk-...FIA", + "api_key_type": "project", + "organization_name": "Personal", + "project_id": "proj_fake_id", + "project_name": "fake_proj" + } + ], + "assistant_code_interpreter_data": [], + "retrieval_storage_data": [] +}`) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Validate Bearer token + authHeader := r.Header.Get("Authorization") + expectedToken := fmt.Sprintf("Bearer %s", api_key) + + if authHeader != expectedToken { + w.WriteHeader(http.StatusUnauthorized) + return + } + + // If it doesn't match the expected endpoint, return 404 + if r.URL.Path != endpoint { + w.WriteHeader(http.StatusNotFound) + return + } + + w.WriteHeader(http.StatusOK) + _, _ = w.Write(data) + })) + return server +} diff --git a/x-pack/metricbeat/modules.d/openai.yml.disabled b/x-pack/metricbeat/modules.d/openai.yml.disabled new file mode 100644 index 00000000000..6a881ae8641 --- /dev/null +++ b/x-pack/metricbeat/modules.d/openai.yml.disabled @@ -0,0 +1,37 @@ +# Module: openai +# Docs: https://www.elastic.co/guide/en/beats/metricbeat/main/metricbeat-module-openai.html + +- module: openai + metricsets: ["usage"] + enabled: false + period: 1h + + # # Project API Keys - Multiple API keys can be specified for different projects + # api_keys: + # - key: "api_key1" + # - key: "api_key2" + + # # API Configuration + # ## Base URL for the OpenAI usage API endpoint + # api_url: "https://api.openai.com/v1/usage" + # ## Custom headers to be included in API requests + # headers: + # - "k1: v1" + # - "k2: v2" + ## Rate Limiting Configuration + # rate_limit: + # limit: 12 # seconds between requests + # burst: 1 # max concurrent requests + # ## Request Timeout Duration + # timeout: 30s + + # # Data Collection Configuration + # collection: + # ## Number of days to look back when collecting usage data + # lookback_days: 30 + # ## Whether to collect usage data in realtime. Defaults to false as how + # # OpenAI usage data is collected will end up adding duplicate data to ES + # # and also making it harder to do analytics. Best approach is to avoid + # # realtime collection and collect only upto last day (in UTC). So, there's + # # at most 24h delay. + # realtime: false