diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 7d3d09d1e78..1a8fd0a0088 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -223,6 +223,7 @@ CHANGELOG* /x-pack/metricbeat/module/iis @elastic/obs-infraobs-integrations /x-pack/metricbeat/module/istio/ @elastic/obs-cloudnative-monitoring /x-pack/metricbeat/module/mssql @elastic/obs-infraobs-integrations +/x-pack/metricbeat/module/openai @elastic/obs-infraobs-integrations /x-pack/metricbeat/module/oracle @elastic/obs-infraobs-integrations /x-pack/metricbeat/module/panw @elastic/obs-infraobs-integrations /x-pack/metricbeat/module/prometheus/ @elastic/obs-cloudnative-monitoring diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 74d832bc4b6..1a46b04d9bf 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -369,6 +369,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - The environment variable `BEATS_AZURE_EVENTHUB_INPUT_TRACING_ENABLED: true` enables internal logs tracer for the azure-eventhub input. {issue}41931[41931] {pull}41932[41932] - The Filestream input now uses the `fingerprint` file identity by default. The state from files are automatically migrated if the previous file identity was `native` (the default) or `path`. If the `file_identity` is explicitly set, there is no change in behaviour. {issue}40197[40197] {pull}41762[41762] - Rate limiting operability improvements in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41977[41977] +- Added default values in the streaming input for websocket retries and put a cap on retry wait time to be less than or equal to the maximum defined wait time. {pull}42012[42012] *Auditbeat* @@ -421,6 +422,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Add support for region/zone for Vertex AI service in GCP module {pull}41551[41551] - Add support for location label as an optional configuration parameter in GCP metrics metricset. 
{issue}41550[41550] {pull}41626[41626] - Added `tier_preference`, `creation_date` and `version` fields to the `elasticsearch.index` metricset. {pull}41944[41944] +- Add `use_performance_counters` to collect CPU metrics using performance counters on Windows for `system/cpu` and `system/core` {pull}41965[41965] *Metricbeat* - Add benchmark module {pull}41801[41801] diff --git a/NOTICE.txt b/NOTICE.txt index dd599cb5cb1..ae812a0cbcc 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -12699,11 +12699,11 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-l -------------------------------------------------------------------------------- Dependency : github.com/elastic/elastic-agent-system-metrics -Version: v0.11.4 +Version: v0.11.5 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-system-metrics@v0.11.4/LICENSE.txt: +Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-system-metrics@v0.11.5/LICENSE.txt: Apache License Version 2.0, January 2004 diff --git a/go.mod b/go.mod index 2e25c7c9de0..77abb4800d1 100644 --- a/go.mod +++ b/go.mod @@ -178,7 +178,7 @@ require ( github.com/elastic/ebpfevents v0.6.0 github.com/elastic/elastic-agent-autodiscover v0.9.0 github.com/elastic/elastic-agent-libs v0.17.4 - github.com/elastic/elastic-agent-system-metrics v0.11.4 + github.com/elastic/elastic-agent-system-metrics v0.11.5 github.com/elastic/go-elasticsearch/v8 v8.14.0 github.com/elastic/go-quark v0.2.0 github.com/elastic/go-sfdc v0.0.0-20241010131323-8e176480d727 diff --git a/go.sum b/go.sum index cbf4b3eab9e..2e45c13cbd3 100644 --- a/go.sum +++ b/go.sum @@ -324,8 +324,8 @@ github.com/elastic/elastic-agent-client/v7 v7.15.0 h1:nDB7v8TBoNuD6IIzC3z7Q0y+7b github.com/elastic/elastic-agent-client/v7 v7.15.0/go.mod h1:6h+f9QdIr3GO2ODC0Y8+aEXRwzbA5W4eV4dd/67z7nI= 
github.com/elastic/elastic-agent-libs v0.17.4 h1:kWK5Kn2EQjM97yHqbeXv+cFAIti4IiI9Qj8huM+lZzE= github.com/elastic/elastic-agent-libs v0.17.4/go.mod h1:5CR02awPrBr+tfmjBBK+JI+dMmHNQjpVY24J0wjbC7M= -github.com/elastic/elastic-agent-system-metrics v0.11.4 h1:Z/8CML5RKvGpi6/QUFok1K3EriBAv2kUAXnsk8hCifk= -github.com/elastic/elastic-agent-system-metrics v0.11.4/go.mod h1:TTW2ysv78uHBQ68hG8TXiaX1m6f29ZHgGWb8XONYsU8= +github.com/elastic/elastic-agent-system-metrics v0.11.5 h1:JSjXFEn8uYZ9hoC/GxZNMgJ622UoP96sjYP/49/Uvuo= +github.com/elastic/elastic-agent-system-metrics v0.11.5/go.mod h1:nzkrGajQA29YNcfP62gfzhxX9an3/xdQ3RmfQNw9YTI= github.com/elastic/elastic-transport-go/v8 v8.6.0 h1:Y2S/FBjx1LlCv5m6pWAF2kDJAHoSjSRSJCApolgfthA= github.com/elastic/elastic-transport-go/v8 v8.6.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk= github.com/elastic/fsevents v0.0.0-20181029231046-e1d381a4d270 h1:cWPqxlPtir4RoQVCpGSRXmLqjEHpJKbR60rxh1nQZY4= diff --git a/metricbeat/docs/fields.asciidoc b/metricbeat/docs/fields.asciidoc index 967c20f14cf..103bdd71835 100644 --- a/metricbeat/docs/fields.asciidoc +++ b/metricbeat/docs/fields.asciidoc @@ -68,6 +68,7 @@ grouped in the following categories: * <> * <> * <> +* <> * <> * <> * <> @@ -56706,6 +56707,372 @@ type: long -- +[[exported-fields-openai]] +== openai fields + +openai module + + + +[float] +=== openai + + + + +[float] +=== usage + +OpenAI API usage metrics and statistics + + + +*`openai.usage.organization_id`*:: ++ +-- +Organization identifier + +type: keyword + +-- + +*`openai.usage.organization_name`*:: ++ +-- +Organization name + +type: keyword + +-- + +*`openai.usage.api_key_id`*:: ++ +-- +API key identifier + +type: keyword + +-- + +*`openai.usage.api_key_name`*:: ++ +-- +API key name + +type: keyword + +-- + +*`openai.usage.api_key_redacted`*:: ++ +-- +Redacted API key + +type: keyword + +-- + +*`openai.usage.api_key_type`*:: ++ +-- +Type of API key + +type: keyword + +-- + +*`openai.usage.project_id`*:: ++ +-- +Project 
identifier + +type: keyword + +-- + +*`openai.usage.project_name`*:: ++ +-- +Project name + +type: keyword + +-- + +[float] +=== data + +General usage data metrics + + + +*`openai.usage.data.requests_total`*:: ++ +-- +Number of requests made + +type: long + +-- + +*`openai.usage.data.operation`*:: ++ +-- +Operation type + +type: keyword + +-- + +*`openai.usage.data.snapshot_id`*:: ++ +-- +Snapshot identifier + +type: keyword + +-- + +*`openai.usage.data.context_tokens_total`*:: ++ +-- +Total number of context tokens used + +type: long + +-- + +*`openai.usage.data.generated_tokens_total`*:: ++ +-- +Total number of generated tokens + +type: long + +-- + +*`openai.usage.data.cached_context_tokens_total`*:: ++ +-- +Total number of cached context tokens + +type: long + +-- + +*`openai.usage.data.email`*:: ++ +-- +User email + +type: keyword + +-- + +*`openai.usage.data.request_type`*:: ++ +-- +Type of request + +type: keyword + +-- + +[float] +=== dalle + +DALL-E API usage metrics + + + +*`openai.usage.dalle.num_images`*:: ++ +-- +Number of images generated + +type: long + +-- + +*`openai.usage.dalle.requests_total`*:: ++ +-- +Number of requests + +type: long + +-- + +*`openai.usage.dalle.image_size`*:: ++ +-- +Size of generated images + +type: keyword + +-- + +*`openai.usage.dalle.operation`*:: ++ +-- +Operation type + +type: keyword + +-- + +*`openai.usage.dalle.user_id`*:: ++ +-- +User identifier + +type: keyword + +-- + +*`openai.usage.dalle.model_id`*:: ++ +-- +Model identifier + +type: keyword + +-- + +[float] +=== whisper + +Whisper API usage metrics + + + +*`openai.usage.whisper.model_id`*:: ++ +-- +Model identifier + +type: keyword + +-- + +*`openai.usage.whisper.num_seconds`*:: ++ +-- +Number of seconds processed + +type: long + +-- + +*`openai.usage.whisper.requests_total`*:: ++ +-- +Number of requests + +type: long + +-- + +*`openai.usage.whisper.user_id`*:: ++ +-- +User identifier + +type: keyword + +-- + +[float] +=== tts + +Text-to-Speech API usage 
metrics + + + +*`openai.usage.tts.model_id`*:: ++ +-- +Model identifier + +type: keyword + +-- + +*`openai.usage.tts.num_characters`*:: ++ +-- +Number of characters processed + +type: long + +-- + +*`openai.usage.tts.requests_total`*:: ++ +-- +Number of requests + +type: long + +-- + +*`openai.usage.tts.user_id`*:: ++ +-- +User identifier + +type: keyword + +-- + +[float] +=== ft_data + +Fine-tuning data metrics + + + +*`openai.usage.ft_data.original`*:: ++ +-- +Raw fine-tuning data + +type: object + +-- + +[float] +=== assistant_code_interpreter + +Assistant Code Interpreter usage metrics + + + +*`openai.usage.assistant_code_interpreter.original`*:: ++ +-- +Raw assistant code interpreter data + +type: object + +-- + +[float] +=== retrieval_storage + +Retrieval storage usage metrics + + + +*`openai.usage.retrieval_storage.original`*:: ++ +-- +Raw retrieval storage data + +type: object + +-- + [[exported-fields-openmetrics]] == Openmetrics fields diff --git a/metricbeat/docs/modules/openai.asciidoc b/metricbeat/docs/modules/openai.asciidoc new file mode 100644 index 00000000000..ce5fd3e0ccc --- /dev/null +++ b/metricbeat/docs/modules/openai.asciidoc @@ -0,0 +1,75 @@ +//// +This file is generated! See scripts/mage/docs_collector.go +//// + +:modulename: openai +:edit_url: https://github.com/elastic/beats/edit/main/x-pack/metricbeat/module/openai/_meta/docs.asciidoc + + +[[metricbeat-module-openai]] +[role="xpack"] +== openai module + +beta[] + +This is the openai module. + + + +:edit_url: + +[float] +=== Example configuration + +The openai module supports the standard configuration options that are described +in <>. 
Here is an example configuration: + +[source,yaml] +---- +metricbeat.modules: +- module: openai + metricsets: ["usage"] + enabled: false + period: 1h + + # # Project API Keys - Multiple API keys can be specified for different projects + # api_keys: + # - key: "api_key1" + # - key: "api_key2" + + # # API Configuration + # ## Base URL for the OpenAI usage API endpoint + # api_url: "https://api.openai.com/v1/usage" + # ## Custom headers to be included in API requests + # headers: + # - "k1: v1" + # - "k2: v2" + ## Rate Limiting Configuration + # rate_limit: + # limit: 12 # seconds between requests + # burst: 1 # max concurrent requests + # ## Request Timeout Duration + # timeout: 30s + + # # Data Collection Configuration + # collection: + # ## Number of days to look back when collecting usage data + # lookback_days: 30 + # ## Whether to collect usage data in realtime. Defaults to false as how + # # OpenAI usage data is collected will end up adding duplicate data to ES + # # and also making it harder to do analytics. Best approach is to avoid + # # realtime collection and collect only up to the last day (in UTC). So, there's + # # at most 24h delay. + # realtime: false +---- + +[float] +=== Metricsets + +The following metricsets are available: + +* <> + +include::openai/usage.asciidoc[] + +:edit_url!: diff --git a/metricbeat/docs/modules/openai/usage.asciidoc b/metricbeat/docs/modules/openai/usage.asciidoc new file mode 100644 index 00000000000..69d0ba313d9 --- /dev/null +++ b/metricbeat/docs/modules/openai/usage.asciidoc @@ -0,0 +1,29 @@ +//// +This file is generated! 
See scripts/mage/docs_collector.go +//// +:edit_url: https://github.com/elastic/beats/edit/main/x-pack/metricbeat/module/openai/usage/_meta/docs.asciidoc + + +[[metricbeat-metricset-openai-usage]] +[role="xpack"] +=== openai usage metricset + +beta[] + +include::../../../../x-pack/metricbeat/module/openai/usage/_meta/docs.asciidoc[] + + +:edit_url: + +==== Fields + +For a description of each field in the metricset, see the +<> section. + +Here is an example document generated by this metricset: + +[source,json] +---- +include::../../../../x-pack/metricbeat/module/openai/usage/_meta/data.json[] +---- +:edit_url!: \ No newline at end of file diff --git a/metricbeat/docs/modules/system.asciidoc b/metricbeat/docs/modules/system.asciidoc index 2fc3930d844..164abe5c91e 100644 --- a/metricbeat/docs/modules/system.asciidoc +++ b/metricbeat/docs/modules/system.asciidoc @@ -265,6 +265,11 @@ metricbeat.modules: # Filter systemd services based on a name pattern #service.pattern_filter: ["ssh*", "nfs*"] + + # This option enables the use of performance counters to collect data for cpu/core metricset. + # Only effective for Windows. + # You should use this option if running beats on machines with more than 64 cores. + #use_performance_counters: true ---- [float] diff --git a/metricbeat/docs/modules_list.asciidoc b/metricbeat/docs/modules_list.asciidoc index abf04650f2c..ae81c25e5fc 100644 --- a/metricbeat/docs/modules_list.asciidoc +++ b/metricbeat/docs/modules_list.asciidoc @@ -242,6 +242,8 @@ This file is generated! 
See scripts/mage/docs_collector.go |<> |<> |image:./images/icon-yes.png[Prebuilt dashboards are available] | .1+| .1+| |<> +|<> beta[] |image:./images/icon-no.png[No prebuilt dashboards] | +.1+| .1+| |<> beta[] |<> beta[] |image:./images/icon-no.png[No prebuilt dashboards] | .1+| .1+| |<> beta[] |<> |image:./images/icon-yes.png[Prebuilt dashboards are available] | @@ -384,6 +386,7 @@ include::modules/munin.asciidoc[] include::modules/mysql.asciidoc[] include::modules/nats.asciidoc[] include::modules/nginx.asciidoc[] +include::modules/openai.asciidoc[] include::modules/openmetrics.asciidoc[] include::modules/oracle.asciidoc[] include::modules/panw.asciidoc[] diff --git a/metricbeat/metricbeat.reference.yml b/metricbeat/metricbeat.reference.yml index f1412be4b6f..6cb4352b87d 100644 --- a/metricbeat/metricbeat.reference.yml +++ b/metricbeat/metricbeat.reference.yml @@ -142,6 +142,11 @@ metricbeat.modules: # Filter systemd services based on a name pattern #service.pattern_filter: ["ssh*", "nfs*"] + # This option enables the use of performance counters to collect data for cpu/core metricset. + # Only effective for Windows. + # You should use this option if running beats on machins with more than 64 cores. + #use_performance_counters: true + #------------------------------ Aerospike Module ------------------------------ - module: aerospike metricsets: ["namespace"] diff --git a/metricbeat/module/system/_meta/config.reference.yml b/metricbeat/module/system/_meta/config.reference.yml index 974df87cb0b..777a6444eca 100644 --- a/metricbeat/module/system/_meta/config.reference.yml +++ b/metricbeat/module/system/_meta/config.reference.yml @@ -81,3 +81,8 @@ # Filter systemd services based on a name pattern #service.pattern_filter: ["ssh*", "nfs*"] + + # This option enables the use of performance counters to collect data for cpu/core metricset. + # Only effective for Windows. + # You should use this option if running beats on machins with more than 64 cores. 
+ #use_performance_counters: true diff --git a/metricbeat/module/system/core/_meta/docs.asciidoc b/metricbeat/module/system/core/_meta/docs.asciidoc index e70e55f0db7..751c4759a9c 100644 --- a/metricbeat/module/system/core/_meta/docs.asciidoc +++ b/metricbeat/module/system/core/_meta/docs.asciidoc @@ -14,6 +14,10 @@ This metricset is available on: *`core.metrics`*:: This option controls what metrics are reported for each CPU core. The value is a list and two metric types are supported - `percentages` and `ticks`. The default value is `core.metrics: [percentages]`. +*`use_performance_counters`*:: This option enables the use of performance counters to +collect data for the CPU/core metricset. It is only effective on Windows. +You should use this option if running beats on machines with more than 64 cores. +The default value is `use_performance_counters: true` + [source,yaml] ---- @@ -21,4 +25,5 @@ metricbeat.modules: - module: system metricsets: [core] core.metrics: [percentages, ticks] + #use_performance_counters: true ---- diff --git a/metricbeat/module/system/core/config.go b/metricbeat/module/system/core/config.go index 8ac1d2f9575..940a2328216 100644 --- a/metricbeat/module/system/core/config.go +++ b/metricbeat/module/system/core/config.go @@ -34,8 +34,9 @@ const ( // Config for the system core metricset. type Config struct { - Metrics []string `config:"core.metrics"` - CPUTicks *bool `config:"cpu_ticks"` // Deprecated. + Metrics []string `config:"core.metrics"` + CPUTicks *bool `config:"cpu_ticks"` // Deprecated. + UserPerformanceCounters bool `config:"use_performance_counters"` } // Validate validates the core config. 
@@ -65,5 +66,6 @@ func (c Config) Validate() (metrics.MetricOpts, error) { } var defaultConfig = Config{ - Metrics: []string{percentages}, + Metrics: []string{percentages}, + UserPerformanceCounters: true, } diff --git a/metricbeat/module/system/core/core.go b/metricbeat/module/system/core/core.go index 1bf2f3f3a3d..fc5e7b9e394 100644 --- a/metricbeat/module/system/core/core.go +++ b/metricbeat/module/system/core/core.go @@ -41,6 +41,7 @@ type MetricSet struct { mb.BaseMetricSet opts metrics.MetricOpts cores *metrics.Monitor + sys resolve.Resolver } // New returns a new core MetricSet. @@ -58,11 +59,25 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { if config.CPUTicks != nil && *config.CPUTicks { config.Metrics = append(config.Metrics, "ticks") } - sys := base.Module().(resolve.Resolver) + sys, ok := base.Module().(resolve.Resolver) + if !ok { + return nil, fmt.Errorf("unexpected module type: %T", base.Module()) + } + + cpuOpts := make([]metrics.OptionFunc, 0) + if config.UserPerformanceCounters { + cpuOpts = append(cpuOpts, metrics.WithWindowsPerformanceCounter()) + } + cpu, err := metrics.New(sys, cpuOpts...) 
+ if err != nil { + return nil, fmt.Errorf("error initializing system.cpu metricset: %w", err) + } + return &MetricSet{ BaseMetricSet: base, opts: opts, - cores: metrics.New(sys), + cores: cpu, + sys: sys, }, nil } @@ -109,6 +124,5 @@ func (m *MetricSet) Diagnostics() []diagnostics.DiagnosticSetup { } func (m *MetricSet) getDiagData() []byte { - sys := m.BaseMetricSet.Module().(resolve.Resolver) - return diagnostics.GetRawFileOrErrorString(sys, "/proc/stat") + return diagnostics.GetRawFileOrErrorString(m.sys, "/proc/stat") } diff --git a/metricbeat/module/system/cpu/_meta/docs.asciidoc b/metricbeat/module/system/cpu/_meta/docs.asciidoc index fb83c41e500..b0d9da58459 100644 --- a/metricbeat/module/system/cpu/_meta/docs.asciidoc +++ b/metricbeat/module/system/cpu/_meta/docs.asciidoc @@ -15,6 +15,10 @@ This metricset is available on: is a list and three metric types are supported - `percentages`, `normalized_percentages`, and `ticks`. The default value is `cpu.metrics: [percentages]`. +*`use_performance_counters`*:: This option enables the use of performance counters to +collect data for the CPU/core metricset. It is only effective on Windows. +You should use this option if running beats on machines with more than 64 cores. +The default value is `use_performance_counters: true` + [source,yaml] ---- @@ -22,4 +26,5 @@ metricbeat.modules: - module: system metricsets: [cpu] cpu.metrics: [percentages, normalized_percentages, ticks] + #use_performance_counters: true ---- diff --git a/metricbeat/module/system/cpu/config.go b/metricbeat/module/system/cpu/config.go index ef9d78fe0ce..7cfffed57a5 100644 --- a/metricbeat/module/system/cpu/config.go +++ b/metricbeat/module/system/cpu/config.go @@ -35,8 +35,9 @@ const ( // Config for the system cpu metricset. type Config struct { - Metrics []string `config:"cpu.metrics"` - CPUTicks *bool `config:"cpu_ticks"` // Deprecated. 
+ UserPerformanceCounters bool `config:"use_performance_counters"` } // Validate validates the cpu config. @@ -69,5 +70,6 @@ func (c Config) Validate() (metrics.MetricOpts, error) { } var defaultConfig = Config{ - Metrics: []string{percentages, normalizedPercentages}, + Metrics: []string{percentages, normalizedPercentages}, + UserPerformanceCounters: true, } diff --git a/metricbeat/module/system/cpu/cpu.go b/metricbeat/module/system/cpu/cpu.go index 8eb06c2427b..ace37b809e8 100644 --- a/metricbeat/module/system/cpu/cpu.go +++ b/metricbeat/module/system/cpu/cpu.go @@ -44,6 +44,7 @@ type MetricSet struct { mb.BaseMetricSet opts metrics.MetricOpts cpu *metrics.Monitor + sys resolve.Resolver } // New is a mb.MetricSetFactory that returns a cpu.MetricSet. @@ -61,11 +62,25 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { if config.CPUTicks != nil && *config.CPUTicks { config.Metrics = append(config.Metrics, "ticks") } - sys := base.Module().(resolve.Resolver) + sys, ok := base.Module().(resolve.Resolver) + if !ok { + return nil, fmt.Errorf("unexpected module type: %T", base.Module()) + } + + cpuOpts := make([]metrics.OptionFunc, 0) + if config.UserPerformanceCounters { + cpuOpts = append(cpuOpts, metrics.WithWindowsPerformanceCounter()) + } + cpu, err := metrics.New(sys, cpuOpts...) 
+ + if err != nil { + return nil, fmt.Errorf("error initializing system.cpu metricset: %w", err) + } return &MetricSet{ BaseMetricSet: base, opts: opts, - cpu: metrics.New(sys), + cpu: cpu, + sys: sys, }, nil } @@ -125,13 +140,11 @@ func (m *MetricSet) Diagnostics() []diagnostics.DiagnosticSetup { } func (m *MetricSet) fetchRawCPU() []byte { - sys := m.BaseMetricSet.Module().(resolve.Resolver) - return diagnostics.GetRawFileOrErrorString(sys, "/proc/stat") + return diagnostics.GetRawFileOrErrorString(m.sys, "/proc/stat") } func (m *MetricSet) fetchCPUInfo() []byte { - sys := m.BaseMetricSet.Module().(resolve.Resolver) - return diagnostics.GetRawFileOrErrorString(sys, "/proc/cpuinfo") + return diagnostics.GetRawFileOrErrorString(m.sys, "/proc/cpuinfo") } // copyFieldsOrDefault copies the field specified by key to the given map. It will diff --git a/x-pack/agentbeat/agentbeat.spec.yml b/x-pack/agentbeat/agentbeat.spec.yml index a2af5ce6bf8..5b6c33e4ceb 100644 --- a/x-pack/agentbeat/agentbeat.spec.yml +++ b/x-pack/agentbeat/agentbeat.spec.yml @@ -507,6 +507,11 @@ inputs: platforms: *platforms outputs: *outputs command: *metricbeat_command + - name: openai/metrics + description: "OpenAI metrics" + platforms: *platforms + outputs: *outputs + command: *metricbeat_command - name: panw/metrics description: "Palo Alto Networks metrics" platforms: *platforms diff --git a/x-pack/filebeat/docs/inputs/input-streaming.asciidoc b/x-pack/filebeat/docs/inputs/input-streaming.asciidoc index 9a6f67e5bc4..7f07fb4954f 100644 --- a/x-pack/filebeat/docs/inputs/input-streaming.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-streaming.asciidoc @@ -337,17 +337,17 @@ filebeat.inputs: [float] ==== `retry.max_attempts` -The maximum number of times the input should attempt to reconnect to the streaming data source in the event of a connection failure. The default value is `nil` which means no retries will be attempted. 
+The maximum number of times the input should attempt to reconnect to the streaming data source in the event of a connection failure. The default value is `5` which means a maximum of 5 retries will be attempted. [float] ==== `retry.wait_min` -The minimum time to wait between retries. This ensures that retries are spaced out enough to give the system time to recover or resolve transient issues, rather than bombarding the system with rapid retries. For example, `wait_min` might be set to 1 second, meaning that even if the calculated backoff is less than this, the client will wait at least 1 second before retrying. +The minimum time to wait between retries. This ensures that retries are spaced out enough to give the system time to recover or resolve transient issues, rather than bombarding the system with rapid retries. For example, `wait_min` might be set to 1 second, meaning that even if the calculated backoff is less than this, the client will wait at least 1 second before retrying. The default value is `1` second. [float] ==== `retry.wait_max` -The maximum time to wait between retries. This prevents the retry mechanism from becoming too slow, ensuring that the client does not wait indefinitely between retries. This is crucial in systems where timeouts or user experience are critical. For example, `wait_max` might be set to 10 seconds, meaning that even if the calculated backoff is greater than this, the client will wait at most 10 seconds before retrying. +The maximum time to wait between retries. This prevents the retry mechanism from becoming too slow, ensuring that the client does not wait indefinitely between retries. This is crucial in systems where timeouts or user experience are critical. For example, `wait_max` might be set to 10 seconds, meaning that even if the calculated backoff is greater than this, the client will wait at most 10 seconds before retrying. The default value is `30` seconds. 
[float] === `timeout` diff --git a/x-pack/filebeat/input/streaming/config.go b/x-pack/filebeat/input/streaming/config.go index 6ccaf0c7349..eea8c2afc70 100644 --- a/x-pack/filebeat/input/streaming/config.go +++ b/x-pack/filebeat/input/streaming/config.go @@ -171,5 +171,10 @@ func defaultConfig() config { Transport: httpcommon.HTTPTransportSettings{ Timeout: 180 * time.Second, }, + Retry: &retry{ + MaxAttempts: 5, + WaitMin: 1 * time.Second, + WaitMax: 30 * time.Second, + }, } } diff --git a/x-pack/filebeat/input/streaming/websocket.go b/x-pack/filebeat/input/streaming/websocket.go index 8a78757f0e9..8a8cf76fe36 100644 --- a/x-pack/filebeat/input/streaming/websocket.go +++ b/x-pack/filebeat/input/streaming/websocket.go @@ -261,6 +261,10 @@ func calculateWaitTime(waitMin, waitMax time.Duration, attempt int) time.Duratio jitter := rand.Float64() * maxJitter waitTime := time.Duration(backoff + jitter) + // caps the wait time to the maximum wait time + if waitTime > waitMax { + waitTime = waitMax + } return waitTime } diff --git a/x-pack/metricbeat/include/list.go b/x-pack/metricbeat/include/list.go index b9b426cf0ad..af3b1e9425d 100644 --- a/x-pack/metricbeat/include/list.go +++ b/x-pack/metricbeat/include/list.go @@ -55,6 +55,8 @@ import ( _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/mssql" _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/mssql/performance" _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/mssql/transaction_log" + _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/openai" + _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/openai/usage" _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/oracle" _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/oracle/performance" _ "github.com/elastic/beats/v7/x-pack/metricbeat/module/oracle/sysmetric" diff --git a/x-pack/metricbeat/metricbeat.reference.yml b/x-pack/metricbeat/metricbeat.reference.yml index 8c4250caece..1f1f410ce5a 100644 --- 
a/x-pack/metricbeat/metricbeat.reference.yml +++ b/x-pack/metricbeat/metricbeat.reference.yml @@ -142,6 +142,11 @@ metricbeat.modules: # Filter systemd services based on a name pattern #service.pattern_filter: ["ssh*", "nfs*"] + # This option enables the use of performance counters to collect data for cpu/core metricset. + # Only effective for Windows. + # You should use this option if running beats on machines with more than 64 cores. + #use_performance_counters: true + #------------------------------- ActiveMQ Module ------------------------------- - module: activemq metricsets: ['broker', 'queue', 'topic'] @@ -1279,6 +1284,42 @@ metricbeat.modules: # Path to server status. Default nginx_status server_status_path: "nginx_status" +#-------------------------------- Openai Module -------------------------------- +- module: openai + metricsets: ["usage"] + enabled: false + period: 1h + + # # Project API Keys - Multiple API keys can be specified for different projects + # api_keys: + # - key: "api_key1" + # - key: "api_key2" + + # # API Configuration + # ## Base URL for the OpenAI usage API endpoint + # api_url: "https://api.openai.com/v1/usage" + # ## Custom headers to be included in API requests + # headers: + # - "k1: v1" + # - "k2: v2" + ## Rate Limiting Configuration + # rate_limit: + # limit: 12 # seconds between requests + # burst: 1 # max concurrent requests + # ## Request Timeout Duration + # timeout: 30s + + # # Data Collection Configuration + # collection: + # ## Number of days to look back when collecting usage data + # lookback_days: 30 + # ## Whether to collect usage data in realtime. Defaults to false as how + # # OpenAI usage data is collected will end up adding duplicate data to ES + # # and also making it harder to do analytics. Best approach is to avoid + # # realtime collection and collect only up to the last day (in UTC). So, there's + # # at most 24h delay. 
+ # realtime: false + #----------------------------- Openmetrics Module ----------------------------- - module: openmetrics metricsets: ['collector'] diff --git a/x-pack/metricbeat/module/openai/_meta/config.yml b/x-pack/metricbeat/module/openai/_meta/config.yml new file mode 100644 index 00000000000..a34fd7b183d --- /dev/null +++ b/x-pack/metricbeat/module/openai/_meta/config.yml @@ -0,0 +1,34 @@ +- module: openai + metricsets: ["usage"] + enabled: false + period: 1h + + # # Project API Keys - Multiple API keys can be specified for different projects + # api_keys: + # - key: "api_key1" + # - key: "api_key2" + + # # API Configuration + # ## Base URL for the OpenAI usage API endpoint + # api_url: "https://api.openai.com/v1/usage" + # ## Custom headers to be included in API requests + # headers: + # - "k1: v1" + # - "k2: v2" + ## Rate Limiting Configuration + # rate_limit: + # limit: 12 # seconds between requests + # burst: 1 # max concurrent requests + # ## Request Timeout Duration + # timeout: 30s + + # # Data Collection Configuration + # collection: + # ## Number of days to look back when collecting usage data + # lookback_days: 30 + # ## Whether to collect usage data in realtime. Defaults to false as how + # # OpenAI usage data is collected will end up adding duplicate data to ES + # # and also making it harder to do analytics. Best approach is to avoid + # # realtime collection and collect only up to the last day (in UTC). So, there's + # # at most 24h delay. + # realtime: false diff --git a/x-pack/metricbeat/module/openai/_meta/docs.asciidoc b/x-pack/metricbeat/module/openai/_meta/docs.asciidoc new file mode 100644 index 00000000000..744909c7a1a --- /dev/null +++ b/x-pack/metricbeat/module/openai/_meta/docs.asciidoc @@ -0,0 +1,2 @@ +This is the openai module. 
+ diff --git a/x-pack/metricbeat/module/openai/_meta/fields.yml b/x-pack/metricbeat/module/openai/_meta/fields.yml new file mode 100644 index 00000000000..d514eb010f1 --- /dev/null +++ b/x-pack/metricbeat/module/openai/_meta/fields.yml @@ -0,0 +1,10 @@ +- key: openai + title: "openai" + release: beta + description: > + openai module + fields: + - name: openai + type: group + description: > + fields: diff --git a/x-pack/metricbeat/module/openai/doc.go b/x-pack/metricbeat/module/openai/doc.go new file mode 100644 index 00000000000..5f2f07fb0bc --- /dev/null +++ b/x-pack/metricbeat/module/openai/doc.go @@ -0,0 +1,6 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// Package openai is a Metricbeat module that contains MetricSets. +package openai diff --git a/x-pack/metricbeat/module/openai/fields.go b/x-pack/metricbeat/module/openai/fields.go new file mode 100644 index 00000000000..de875dbdaab --- /dev/null +++ b/x-pack/metricbeat/module/openai/fields.go @@ -0,0 +1,23 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// Code generated by beats/dev-tools/cmd/asset/asset.go - DO NOT EDIT. + +package openai + +import ( + "github.com/elastic/beats/v7/libbeat/asset" +) + +func init() { + if err := asset.SetFields("metricbeat", "openai", asset.ModuleFieldsPri, AssetOpenai); err != nil { + panic(err) + } +} + +// AssetOpenai returns asset data. +// This is the base64 encoded zlib format compressed contents of module/openai. 
+func AssetOpenai() string { + return "eJzsWMtu4zgQvPsrGtnL7sHYuw8BjOwDAbKbIMlgjgIttmWOKVJDtuI4Xz+gJNp6UHZsyRMEGB5FsqpZrK6YmcIatzPQGSomJgAkSOIMrsoPVxMAgxKZxRkskNgEgKONjchIaDWD6wkAVLsh1TyXOAFYCpTczoq5KSiWYo3BDdpmOIPE6DyrvgRQmzh1rNyyBHdfQ3ButAv3I0hVjvsM1fwW5g+3JQekSEbEFpjiYImRsCRiW9vTLtGN3+BGp6lWsGAWqxXACIzWBBJfUDaW7xQyCVPijbnCIsEba/wZ17jdaNOea5zovgYDgqMisRRojlO6T6OQdoA8HctEtMbtgMO5m1nj9ti5PNGQI3mqo6cxyFlMePaZHqv9nvEgmUM9l+h5myHo5UGezOhvGNOAK3ooEY5dkScackWeqoHRbcVMolv/582KUdXXnDUiYV9WZ6IvXTrFXLcmAf5FhYbJGqXPk9bSUIjUizL4PUdLNiJNTHaW+RKlVklgslHl/3m6QONc4EEhZbx9A43YNkVX97KG76lDfO+BIGDhPZ9VLLMrHTDgyYxPFVSfE+u0sVaErxSRXqMaQ+dnhwBqp3ZFACUB5LaTF/tiksI3hPxy5ewoqoL6hWHxCnl0eX0KnpZMvVVhykQ//zv98cWi6UFqNV4odU+m8+lbYfbl1V/zu7vp3yBSlxnVNbmuCSXHPrSkDIfoGalVFdD5DXRiZqk8jYpDdG/xvLwqwfbO/Zi07GUtyouseBtslCfxhs0e7dHxozI6t2hGyOei/96RzanmKEfg+8/hhAhbHfh1JWyGBmyGGK+mpKdFJB1qwE25ZawW9BUM7cGLK9cmdE1vMdaKj9X1FZr7uRijPfRX82Pa/sK90LLmM76SM+RTYc2DlqRO0WfbscX6KV0Zr5hxrywzljH3gL+8WXhzzrlwG5gsnzqOw8Lvhm3AkjYswT+CRl1SNOaj6x+hcEq5EioZ8uTSRiRCHbgrvXBPz8B0ORGdJPEj28CyVXhY593/A6wVlpiiKNYcI6EITWaQxvsjNPcMcKM5wu2eYVD3/3xhd1KBkwpqUr1DZ+MOiS9MRpWJx5L30QP77vhkqppO/YWYPwIAAP//Z/rS7Q==" +} diff --git a/x-pack/metricbeat/module/openai/usage/_meta/data.json b/x-pack/metricbeat/module/openai/usage/_meta/data.json new file mode 100644 index 00000000000..da78c73e036 --- /dev/null +++ b/x-pack/metricbeat/module/openai/usage/_meta/data.json @@ -0,0 +1,37 @@ +{ + "@timestamp": "2017-10-12T08:05:34.853Z", + "event": { + "dataset": "openai.usage", + "duration": 115000, + "module": "openai" + }, + "metricset": { + "name": "usage", + "period": 10000 + }, + "openai": { + "usage": { + "api_key_id": null, + "api_key_name": null, + "api_key_redacted": null, + "api_key_type": null, + "data": { + "cached_context_tokens_total": 0, + "context_tokens_total": 118, + "email": null, + "generated_tokens_total": 35, + "operation": "completion-realtime", + "request_type": "", + "requests_total": 1, + "snapshot_id": 
"gpt-4o-realtime-preview-2024-10-01" + }, + "organization_id": "org-dummy", + "organization_name": "Personal", + "project_id": null, + "project_name": null + } + }, + "service": { + "type": "openai" + } +} \ No newline at end of file diff --git a/x-pack/metricbeat/module/openai/usage/_meta/docs.asciidoc b/x-pack/metricbeat/module/openai/usage/_meta/docs.asciidoc new file mode 100644 index 00000000000..dc88baf82a0 --- /dev/null +++ b/x-pack/metricbeat/module/openai/usage/_meta/docs.asciidoc @@ -0,0 +1 @@ +This is the usage metricset of the module openai. diff --git a/x-pack/metricbeat/module/openai/usage/_meta/fields.yml b/x-pack/metricbeat/module/openai/usage/_meta/fields.yml new file mode 100644 index 00000000000..c25fe30c17b --- /dev/null +++ b/x-pack/metricbeat/module/openai/usage/_meta/fields.yml @@ -0,0 +1,156 @@ +- name: usage + type: group + release: beta + description: > + OpenAI API usage metrics and statistics + fields: + # Common base fields at root level + - name: organization_id + type: keyword + description: Organization identifier + - name: organization_name + type: keyword + description: Organization name + - name: api_key_id + type: keyword + description: API key identifier + - name: api_key_name + type: keyword + description: API key name + - name: api_key_redacted + type: keyword + description: Redacted API key + - name: api_key_type + type: keyword + description: Type of API key + - name: project_id + type: keyword + description: Project identifier + - name: project_name + type: keyword + description: Project name + + # Completion/Chat usage data + - name: data + type: group + description: > + General usage data metrics + fields: + - name: requests_total + type: long + description: Number of requests made + - name: operation + type: keyword + description: Operation type + - name: snapshot_id + type: keyword + description: Snapshot identifier + - name: context_tokens_total + type: long + description: Total number of context tokens used + - name: 
generated_tokens_total + type: long + description: Total number of generated tokens + - name: cached_context_tokens_total + type: long + description: Total number of cached context tokens + - name: email + type: keyword + description: User email + - name: request_type + type: keyword + description: Type of request + + # DALL-E image generation metrics + - name: dalle + type: group + description: > + DALL-E API usage metrics + fields: + - name: num_images + type: long + description: Number of images generated + - name: requests_total + type: long + description: Number of requests + - name: image_size + type: keyword + description: Size of generated images + - name: operation + type: keyword + description: Operation type + - name: user_id + type: keyword + description: User identifier + - name: model_id + type: keyword + description: Model identifier + + # Whisper speech-to-text metrics + - name: whisper + type: group + description: > + Whisper API usage metrics + fields: + - name: model_id + type: keyword + description: Model identifier + - name: num_seconds + type: long + description: Number of seconds processed + - name: requests_total + type: long + description: Number of requests + - name: user_id + type: keyword + description: User identifier + + # Text-to-Speech metrics + - name: tts + type: group + description: > + Text-to-Speech API usage metrics + fields: + - name: model_id + type: keyword + description: Model identifier + - name: num_characters + type: long + description: Number of characters processed + - name: requests_total + type: long + description: Number of requests + - name: user_id + type: keyword + description: User identifier + + # Additional data types (raw storage) + - name: ft_data + type: group + description: > + Fine-tuning data metrics + fields: + - name: original + type: object + object_type: keyword + description: Raw fine-tuning data + + - name: assistant_code_interpreter + type: group + description: > + Assistant Code Interpreter usage 
metrics + fields: + - name: original + type: object + object_type: keyword + description: Raw assistant code interpreter data + + - name: retrieval_storage + type: group + description: > + Retrieval storage usage metrics + fields: + - name: original + type: object + object_type: keyword + description: Raw retrieval storage data diff --git a/x-pack/metricbeat/module/openai/usage/client.go b/x-pack/metricbeat/module/openai/usage/client.go new file mode 100644 index 00000000000..3995386c30e --- /dev/null +++ b/x-pack/metricbeat/module/openai/usage/client.go @@ -0,0 +1,71 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package usage + +import ( + "fmt" + "net/http" + "time" + + "golang.org/x/time/rate" + + "github.com/elastic/elastic-agent-libs/logp" +) + +// RLHTTPClient wraps the standard http.Client with a rate limiter to control API request frequency. +type RLHTTPClient struct { + client *http.Client + logger *logp.Logger + Ratelimiter *rate.Limiter +} + +// Do executes an HTTP request while respecting rate limits. +// It waits for rate limit token before proceeding with the request. +// Returns the HTTP response and any error encountered. 
+func (c *RLHTTPClient) Do(req *http.Request) (*http.Response, error) { + start := time.Now() + + c.logger.Debug("Waiting for rate limit token") + + err := c.Ratelimiter.Wait(req.Context()) + if err != nil { + return nil, fmt.Errorf("failed to acquire rate limit token: %w", err) + } + + c.logger.Debug("Rate limit token acquired") + + waitDuration := time.Since(start) + + if waitDuration > time.Minute { + c.logger.Infof("Rate limit wait exceeded threshold: %v", waitDuration) + } + + resp, err := c.client.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to execute request: %w", err) + } + + return resp, nil +} + +// newClient creates a new rate-limited HTTP client with specified rate limiter and timeout. +func newClient(logger *logp.Logger, rl *rate.Limiter, timeout time.Duration) *RLHTTPClient { + transport := &http.Transport{ + MaxIdleConns: 100, + MaxIdleConnsPerHost: 100, + IdleConnTimeout: 90 * time.Second, + } + + client := &http.Client{ + Timeout: timeout, + Transport: transport, + } + + return &RLHTTPClient{ + client: client, + logger: logger, + Ratelimiter: rl, + } +} diff --git a/x-pack/metricbeat/module/openai/usage/config.go b/x-pack/metricbeat/module/openai/usage/config.go new file mode 100644 index 00000000000..f930a013747 --- /dev/null +++ b/x-pack/metricbeat/module/openai/usage/config.go @@ -0,0 +1,94 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package usage + +import ( + "errors" + "fmt" + "net/url" + "time" +) + +type Config struct { + APIKeys []apiKeyConfig `config:"api_keys" validate:"required"` + APIURL string `config:"api_url" validate:"required"` + Headers []string `config:"headers"` + RateLimit *rateLimitConfig `config:"rate_limit"` + Timeout time.Duration `config:"timeout" validate:"required"` + Collection collectionConfig `config:"collection"` +} + +type rateLimitConfig struct { + Limit *int `config:"limit" validate:"required"` + Burst *int `config:"burst" validate:"required"` +} + +type apiKeyConfig struct { + Key string `config:"key" validate:"required"` +} + +type collectionConfig struct { + LookbackDays int `config:"lookback_days"` + Realtime bool `config:"realtime"` +} + +func defaultConfig() Config { + return Config{ + APIURL: "https://api.openai.com/v1/usage", + Timeout: 30 * time.Second, + RateLimit: &rateLimitConfig{ + Limit: ptr(12), + Burst: ptr(1), + }, + Collection: collectionConfig{ + LookbackDays: 0, // 0 days + Realtime: false, // avoid realtime collection by default + }, + } +} + +func (c *Config) Validate() error { + var errs []error + + if len(c.APIKeys) == 0 { + errs = append(errs, errors.New("at least one API key must be configured")) + } + if c.APIURL == "" { + errs = append(errs, errors.New("api_url cannot be empty")) + } else { + _, err := url.ParseRequestURI(c.APIURL) + if err != nil { + errs = append(errs, fmt.Errorf("invalid api_url format: %w", err)) + } + } + if c.RateLimit == nil { + errs = append(errs, errors.New("rate_limit must be configured")) + } else { + if c.RateLimit.Limit == nil { + errs = append(errs, errors.New("rate_limit.limit must be configured")) + } + if c.RateLimit.Burst == nil { + errs = append(errs, errors.New("rate_limit.burst must be configured")) + } + } + if c.Timeout <= 0 { + errs = append(errs, errors.New("timeout must be greater than 0")) + } + if c.Collection.LookbackDays < 0 { + errs = append(errs, errors.New("lookback_days must be >= 
0")) + } + + for i, apiKey := range c.APIKeys { + if apiKey.Key == "" { + errs = append(errs, fmt.Errorf("API key at position %d cannot be empty", i)) + } + } + + if len(errs) > 0 { + return fmt.Errorf("validation failed: %w", errors.Join(errs...)) + } + + return nil +} diff --git a/x-pack/metricbeat/module/openai/usage/errors.go b/x-pack/metricbeat/module/openai/usage/errors.go new file mode 100644 index 00000000000..9b8b793767c --- /dev/null +++ b/x-pack/metricbeat/module/openai/usage/errors.go @@ -0,0 +1,10 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package usage + +import "errors" + +// ErrNoState indicates no previous state exists for the given API key +var ErrNoState = errors.New("no previous state found") diff --git a/x-pack/metricbeat/module/openai/usage/helper.go b/x-pack/metricbeat/module/openai/usage/helper.go new file mode 100644 index 00000000000..992c025df8c --- /dev/null +++ b/x-pack/metricbeat/module/openai/usage/helper.go @@ -0,0 +1,30 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package usage + +import "strings" + +// dateFormatForStateStore is used to parse and format dates in the YYYY-MM-DD format +const dateFormatForStateStore = "2006-01-02" + +func ptr[T any](value T) *T { + return &value +} + +func processHeaders(headers []string) map[string]string { + headersMap := make(map[string]string, len(headers)) + for _, header := range headers { + parts := strings.SplitN(header, ":", 2) + if len(parts) != 2 { + continue + } + k, v := strings.TrimSpace(parts[0]), strings.TrimSpace(parts[1]) + if k == "" || v == "" { + continue + } + headersMap[k] = v + } + return headersMap +} diff --git a/x-pack/metricbeat/module/openai/usage/persistcache.go b/x-pack/metricbeat/module/openai/usage/persistcache.go new file mode 100644 index 00000000000..51a8d6fb0f8 --- /dev/null +++ b/x-pack/metricbeat/module/openai/usage/persistcache.go @@ -0,0 +1,187 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package usage + +import ( + "crypto/sha256" + "encoding/hex" + "errors" + "fmt" + "os" + "path" + "strings" + "sync" + "time" +) + +// stateManager handles the storage and retrieval of state data with key hashing and caching capabilities +type stateManager struct { + mu sync.RWMutex + store *stateStore + keyPrefix string + hashCache sync.Map +} + +// stateStore handles persistence of key-value pairs using the filesystem +type stateStore struct { + Dir string // Base directory for storing state files + mu sync.RWMutex +} + +// newStateManager creates a new state manager instance with the given storage path +func newStateManager(storePath string) (*stateManager, error) { + if strings.TrimSpace(storePath) == "" { + return nil, errors.New("empty path provided") + } + + store, err := newStateStore(storePath) + if err != nil { + return nil, fmt.Errorf("create state store: %w", err) + } + + return &stateManager{ + mu: sync.RWMutex{}, + store: store, + keyPrefix: "state_", + hashCache: sync.Map{}, + }, nil +} + +// newStateStore creates a new state store instance at the specified path +func newStateStore(path string) (*stateStore, error) { + if err := os.MkdirAll(path, 0o755); err != nil { + return nil, fmt.Errorf("creating state directory: %w", err) + } + return &stateStore{ + Dir: path, + }, nil +} + +// getStatePath builds the full file path for a given state key +func (s *stateStore) getStatePath(name string) string { + return path.Join(s.Dir, name) +} + +// Put stores a value in a file named by the key +func (s *stateStore) Put(key, value string) error { + s.mu.Lock() + defer s.mu.Unlock() + + filePath := s.getStatePath(key) + + // In case the file already exists, file is truncated. 
+ f, err := os.Create(filePath) + if err != nil { + return fmt.Errorf("creating state file: %w", err) + } + defer f.Close() + + _, err = f.WriteString(value) + if err != nil { + return fmt.Errorf("writing value to state file: %w", err) + } + + if err = f.Sync(); err != nil { + return fmt.Errorf("syncing state file: %w", err) + } + + return nil +} + +// Get retrieves the value stored in the file named by the key +func (s *stateStore) Get(key string) (string, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + filePath := s.getStatePath(key) + data, err := os.ReadFile(filePath) + if err != nil { + return "", fmt.Errorf("reading state file: %w", err) + } + return string(data), nil +} + +// Has checks if a state exists for the given key +func (s *stateStore) Has(key string) bool { + s.mu.RLock() + defer s.mu.RUnlock() + + filePath := s.getStatePath(key) + _, err := os.Stat(filePath) + return err == nil +} + +// Remove deletes the state file for the given key +func (s *stateStore) Remove(key string) error { + s.mu.Lock() + defer s.mu.Unlock() + + filePath := s.getStatePath(key) + if err := os.Remove(filePath); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("removing state file: %w", err) + } + return nil +} + +// Clear removes all state files by deleting and recreating the state directory +func (s *stateStore) Clear() error { + s.mu.Lock() + defer s.mu.Unlock() + + if err := os.RemoveAll(s.Dir); err != nil { + return fmt.Errorf("clearing state directory: %w", err) + } + return os.MkdirAll(s.Dir, 0o755) +} + +// GetLastProcessedDate retrieves and parses the last processed date for a given API key +func (s *stateManager) GetLastProcessedDate(apiKey string) (time.Time, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + stateKey := s.GetStateKey(apiKey) + + if !s.store.Has(stateKey) { + return time.Time{}, ErrNoState + } + + dateStr, err := s.store.Get(stateKey) + if err != nil { + return time.Time{}, fmt.Errorf("get state: %w", err) + } + + return 
time.Parse(dateFormatForStateStore, dateStr) +} + +// SaveState saves the last processed date for a given API key +func (s *stateManager) SaveState(apiKey, dateStr string) error { + s.mu.Lock() + defer s.mu.Unlock() + + stateKey := s.GetStateKey(apiKey) + return s.store.Put(stateKey, dateStr) +} + +// hashKey generates and caches a SHA-256 hash of the provided API key +func (s *stateManager) hashKey(apiKey string) string { + // Check cache first to avoid recomputing hashes + if hashedKey, ok := s.hashCache.Load(apiKey); ok { + return hashedKey.(string) + } + + // Generate SHA-256 hash and hex encode for safe filename usage + hasher := sha256.New() + _, _ = hasher.Write([]byte(apiKey)) + hashedKey := hex.EncodeToString(hasher.Sum(nil)) + + // Cache the computed hash for future lookups + s.hashCache.Store(apiKey, hashedKey) + return hashedKey +} + +// GetStateKey generates a unique state key for a given API key +func (s *stateManager) GetStateKey(apiKey string) string { + return s.keyPrefix + s.hashKey(apiKey) +} diff --git a/x-pack/metricbeat/module/openai/usage/schema.go b/x-pack/metricbeat/module/openai/usage/schema.go new file mode 100644 index 00000000000..64f22393e59 --- /dev/null +++ b/x-pack/metricbeat/module/openai/usage/schema.go @@ -0,0 +1,69 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package usage + +type BaseData struct { + OrganizationID string `json:"organization_id"` + OrganizationName string `json:"organization_name"` + ApiKeyID *string `json:"api_key_id"` + ApiKeyName *string `json:"api_key_name"` + ApiKeyRedacted *string `json:"api_key_redacted"` + ApiKeyType *string `json:"api_key_type"` + ProjectID *string `json:"project_id"` + ProjectName *string `json:"project_name"` +} + +type UsageResponse struct { + Object string `json:"object"` + Data []UsageData `json:"data"` + FtData []interface{} `json:"ft_data"` + DalleApiData []DalleData `json:"dalle_api_data"` + WhisperApiData []WhisperData `json:"whisper_api_data"` + TtsApiData []TtsData `json:"tts_api_data"` + AssistantCodeInterpreterData []interface{} `json:"assistant_code_interpreter_data"` + RetrievalStorageData []interface{} `json:"retrieval_storage_data"` +} + +type UsageData struct { + BaseData + AggregationTimestamp int64 `json:"aggregation_timestamp"` + NRequests int `json:"n_requests"` + Operation string `json:"operation"` + SnapshotID string `json:"snapshot_id"` + NContextTokensTotal int `json:"n_context_tokens_total"` + NGeneratedTokensTotal int `json:"n_generated_tokens_total"` + Email *string `json:"email"` + RequestType string `json:"request_type"` + NCachedContextTokensTotal int `json:"n_cached_context_tokens_total"` +} + +type DalleData struct { + BaseData + Timestamp int64 `json:"timestamp"` + NumImages int `json:"num_images"` + NumRequests int `json:"num_requests"` + ImageSize string `json:"image_size"` + Operation string `json:"operation"` + ModelID string `json:"model_id"` + UserID string `json:"user_id"` +} + +type WhisperData struct { + BaseData + Timestamp int64 `json:"timestamp"` + ModelID string `json:"model_id"` + NumSeconds int `json:"num_seconds"` + NumRequests int `json:"num_requests"` + UserID string `json:"user_id"` +} + +type TtsData struct { + BaseData + Timestamp int64 `json:"timestamp"` + ModelID string `json:"model_id"` + NumCharacters int 
`json:"num_characters"` + NumRequests int `json:"num_requests"` + UserID string `json:"user_id"` +} diff --git a/x-pack/metricbeat/module/openai/usage/usage.go b/x-pack/metricbeat/module/openai/usage/usage.go new file mode 100644 index 00000000000..86dbe76ce81 --- /dev/null +++ b/x-pack/metricbeat/module/openai/usage/usage.go @@ -0,0 +1,387 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package usage + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "net" + "net/http" + "path" + "time" + + "golang.org/x/sync/errgroup" + "golang.org/x/time/rate" + + "github.com/elastic/beats/v7/libbeat/common/cfgwarn" + "github.com/elastic/beats/v7/metricbeat/mb" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/elastic-agent-libs/paths" +) + +// init registers the MetricSet with the central registry as soon as the program +// starts. The New function will be called later to instantiate an instance of +// the MetricSet for each host is defined in the module's configuration. After the +// MetricSet has been created then Fetch will begin to be called periodically. +func init() { + mb.Registry.MustAddMetricSet("openai", "usage", New) +} + +// MetricSet holds any configuration or state information. It must implement +// the mb.MetricSet interface. And this is best achieved by embedding +// mb.BaseMetricSet because it implements all of the required mb.MetricSet +// interface methods except for Fetch. +type MetricSet struct { + mb.BaseMetricSet + httpClient *RLHTTPClient + logger *logp.Logger + config Config + report mb.ReporterV2 + stateManager *stateManager + headers map[string]string +} + +// New creates a new instance of the MetricSet. 
New is responsible for unpacking +// any MetricSet specific configuration options if there are any. +func New(base mb.BaseMetricSet) (mb.MetricSet, error) { + cfgwarn.Beta("The openai usage metricset is beta.") + + config := defaultConfig() + if err := base.Module().UnpackConfig(&config); err != nil { + return nil, err + } + + if err := config.Validate(); err != nil { + return nil, err + } + + sm, err := newStateManager(paths.Resolve(paths.Data, path.Join("state", base.Module().Name(), base.Name()))) + if err != nil { + return nil, fmt.Errorf("create state manager: %w", err) + } + + logger := logp.NewLogger("openai.usage") + + httpClient := newClient( + logger, + rate.NewLimiter( + rate.Every(time.Duration(*config.RateLimit.Limit)*time.Second), + *config.RateLimit.Burst, + ), + config.Timeout, + ) + + return &MetricSet{ + BaseMetricSet: base, + httpClient: httpClient, + logger: logger, + config: config, + stateManager: sm, + headers: processHeaders(config.Headers), + }, nil +} + +// Fetch collects OpenAI API usage data for the configured time range. +// +// The collection process: +// 1. Determines the time range based on realtime/non-realtime configuration +// 2. Calculates start date using configured lookback days +// 3. Fetches usage data for each day in the range +// 4. Reports collected metrics through the mb.ReporterV2 +func (m *MetricSet) Fetch(report mb.ReporterV2) error { + endDate := time.Now().UTC().Truncate(time.Hour * 24) // truncate to day as we only collect daily data + + if !m.config.Collection.Realtime { + // If we're not collecting realtime data, then just pull until + // yesterday (in UTC). + endDate = endDate.AddDate(0, 0, -1) + } + + startDate := endDate.AddDate(0, 0, -m.config.Collection.LookbackDays) + + m.report = report + return m.fetchDateRange(startDate, endDate, m.httpClient) +} + +// fetchDateRange retrieves OpenAI API usage data for each configured API key within a date range. +// +// For each API key: +// 1. 
Retrieves last processed date from state store +// 2. Adjusts collection range to avoid duplicates +// 3. Collects daily usage data +// 4. Updates state store with latest processed date +// 5. Handles errors per day without failing entire range +func (m *MetricSet) fetchDateRange(startDate, endDate time.Time, httpClient *RLHTTPClient) error { + g, ctx := errgroup.WithContext(context.TODO()) + + for i := range m.config.APIKeys { + apiKey := m.config.APIKeys[i] + apiKeyIdx := i + 1 + g.Go(func() error { + lastProcessedDate, err := m.stateManager.GetLastProcessedDate(apiKey.Key) + if err == nil { + currentStartDate := lastProcessedDate.AddDate(0, 0, 1) + if currentStartDate.After(endDate) { + m.logger.Infof("Skipping API key #%d as current start date (%s) is after end date (%s)", apiKeyIdx, currentStartDate, endDate) + return nil + } + startDate = currentStartDate + } + + m.logger.Debugf("Fetching data for API key #%d from %s to %s", apiKeyIdx, startDate, endDate) + + for d := startDate; !d.After(endDate); d = d.AddDate(0, 0, 1) { + select { + case <-ctx.Done(): + return ctx.Err() + default: + dateStr := d.Format(dateFormatForStateStore) + if err := m.fetchSingleDay(apiKeyIdx, dateStr, apiKey.Key, httpClient); err != nil { + // If there's an error, log it and continue to the next day. + // In this case, we are not saving the state. + m.logger.Errorf("Error fetching data (api key #%d) for date %s: %v", apiKeyIdx, dateStr, err) + continue + } + if err := m.stateManager.SaveState(apiKey.Key, dateStr); err != nil { + m.logger.Errorf("Error storing state for API key: %v at index %d", err, apiKeyIdx) + } + } + } + return nil + }) + } + + if err := g.Wait(); err != nil { + m.logger.Errorf("Error fetching data: %v", err) + } + + return nil +} + +// fetchSingleDay retrieves usage data for a specific date and API key. 
+func (m *MetricSet) fetchSingleDay(apiKeyIdx int, dateStr, apiKey string, httpClient *RLHTTPClient) error { + req, err := m.createRequest(dateStr, apiKey) + if err != nil { + return fmt.Errorf("error creating request: %w", err) + } + + resp, err := httpClient.Do(req) + if err != nil { + var netErr net.Error + if errors.As(err, &netErr) && netErr.Timeout() { + return fmt.Errorf("request timed out with configured timeout: %v and error: %w", m.config.Timeout, err) + } + return fmt.Errorf("error making request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("error response from API: status=%s", resp.Status) + } + + return m.processResponse(apiKeyIdx, resp, dateStr) +} + +// createRequest builds an HTTP request for the OpenAI usage API. +func (m *MetricSet) createRequest(dateStr, apiKey string) (*http.Request, error) { + req, err := http.NewRequestWithContext(context.TODO(), http.MethodGet, m.config.APIURL, nil) + if err != nil { + return nil, fmt.Errorf("error creating request: %w", err) + } + + q := req.URL.Query() + q.Add("date", dateStr) + req.URL.RawQuery = q.Encode() + + req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", apiKey)) + for key, value := range m.headers { + req.Header.Add(key, value) + } + + return req, nil +} + +// processResponse handles the API response and processes the usage data. +func (m *MetricSet) processResponse(apiKeyIdx int, resp *http.Response, dateStr string) error { + var usageResponse UsageResponse + if err := json.NewDecoder(resp.Body).Decode(&usageResponse); err != nil { + return fmt.Errorf("error decoding response: %w", err) + } + + m.logger.Infof("Fetching usage metrics (api key #%d) for date: %s", apiKeyIdx, dateStr) + + m.processUsageData(usageResponse.Data) + m.processDalleData(usageResponse.DalleApiData) + m.processWhisperData(usageResponse.WhisperApiData) + m.processTTSData(usageResponse.TtsApiData) + + // Process additional data. 
+ // + // NOTE(shmsr): During testing, could not get the usage data for the following + // and found no documentation, example responses, etc. That's why let's store them + // as it is so that we can use processors later on to process them as needed. + m.processFTData(usageResponse.FtData) + m.processAssistantCodeInterpreterData(usageResponse.AssistantCodeInterpreterData) + m.processRetrievalStorageData(usageResponse.RetrievalStorageData) + + return nil +} + +func getBaseFields(data BaseData) mapstr.M { + return mapstr.M{ + "organization_id": data.OrganizationID, + "organization_name": data.OrganizationName, + "api_key_id": data.ApiKeyID, + "api_key_name": data.ApiKeyName, + "api_key_redacted": data.ApiKeyRedacted, + "api_key_type": data.ApiKeyType, + "project_id": data.ProjectID, + "project_name": data.ProjectName, + } +} + +func (m *MetricSet) processUsageData(data []UsageData) { + events := make([]mb.Event, 0, len(data)) + for _, usage := range data { + event := mb.Event{ + Timestamp: time.Unix(usage.AggregationTimestamp, 0).UTC(), // epoch time to time.Time (UTC) + MetricSetFields: mapstr.M{ + "data": mapstr.M{ + "requests_total": usage.NRequests, + "operation": usage.Operation, + "snapshot_id": usage.SnapshotID, + "context_tokens_total": usage.NContextTokensTotal, + "generated_tokens_total": usage.NGeneratedTokensTotal, + "email": usage.Email, + "request_type": usage.RequestType, + "cached_context_tokens_total": usage.NCachedContextTokensTotal, + }, + }, + } + event.MetricSetFields.DeepUpdate(getBaseFields(usage.BaseData)) + events = append(events, event) + } + m.processEvents(events) +} + +func (m *MetricSet) processDalleData(data []DalleData) { + events := make([]mb.Event, 0, len(data)) + for _, dalle := range data { + event := mb.Event{ + Timestamp: time.Unix(dalle.Timestamp, 0).UTC(), // epoch time to time.Time (UTC) + MetricSetFields: mapstr.M{ + "dalle": mapstr.M{ + "num_images": dalle.NumImages, + "requests_total": dalle.NumRequests, + "image_size": 
dalle.ImageSize, + "operation": dalle.Operation, + "user_id": dalle.UserID, + "model_id": dalle.ModelID, + }, + }, + } + event.MetricSetFields.DeepUpdate(getBaseFields(dalle.BaseData)) + events = append(events, event) + } + m.processEvents(events) +} + +func (m *MetricSet) processWhisperData(data []WhisperData) { + events := make([]mb.Event, 0, len(data)) + for _, whisper := range data { + event := mb.Event{ + Timestamp: time.Unix(whisper.Timestamp, 0).UTC(), // epoch time to time.Time (UTC) + MetricSetFields: mapstr.M{ + "whisper": mapstr.M{ + "model_id": whisper.ModelID, + "num_seconds": whisper.NumSeconds, + "requests_total": whisper.NumRequests, + "user_id": whisper.UserID, + }, + }, + } + event.MetricSetFields.DeepUpdate(getBaseFields(whisper.BaseData)) + events = append(events, event) + } + m.processEvents(events) +} + +func (m *MetricSet) processTTSData(data []TtsData) { + events := make([]mb.Event, 0, len(data)) + for _, tts := range data { + event := mb.Event{ + Timestamp: time.Unix(tts.Timestamp, 0).UTC(), // epoch time to time.Time (UTC) + MetricSetFields: mapstr.M{ + "tts": mapstr.M{ + "model_id": tts.ModelID, + "num_characters": tts.NumCharacters, + "requests_total": tts.NumRequests, + "user_id": tts.UserID, + }, + }, + } + event.MetricSetFields.DeepUpdate(getBaseFields(tts.BaseData)) + events = append(events, event) + } + + m.processEvents(events) +} + +func (m *MetricSet) processFTData(data []interface{}) { + events := make([]mb.Event, 0, len(data)) + for _, ft := range data { + event := mb.Event{ + MetricSetFields: mapstr.M{ + "ft_data": mapstr.M{ + "original": ft, + }, + }, + } + events = append(events, event) + } + m.processEvents(events) +} + +func (m *MetricSet) processAssistantCodeInterpreterData(data []interface{}) { + events := make([]mb.Event, 0, len(data)) + for _, aci := range data { + event := mb.Event{ + MetricSetFields: mapstr.M{ + "assistant_code_interpreter": mapstr.M{ + "original": aci, + }, + }, + } + events = append(events, event) 
+ } + m.processEvents(events) +} + +func (m *MetricSet) processRetrievalStorageData(data []interface{}) { + events := make([]mb.Event, 0, len(data)) + for _, rs := range data { + event := mb.Event{ + MetricSetFields: mapstr.M{ + "retrieval_storage": mapstr.M{ + "original": rs, + }, + }, + } + events = append(events, event) + } + m.processEvents(events) +} + +func (m *MetricSet) processEvents(events []mb.Event) { + if len(events) == 0 { + return + } + for i := range events { + m.report.Event(events[i]) + } +} diff --git a/x-pack/metricbeat/module/openai/usage/usage_integration_test.go b/x-pack/metricbeat/module/openai/usage/usage_integration_test.go new file mode 100644 index 00000000000..8cb84241995 --- /dev/null +++ b/x-pack/metricbeat/module/openai/usage/usage_integration_test.go @@ -0,0 +1,482 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +//go:build integration + +package usage + +import ( + "fmt" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/metricbeat/mb" + mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" + "github.com/elastic/elastic-agent-libs/mapstr" +) + +func TestFetch(t *testing.T) { + apiKey := time.Now().String() // to generate a unique API key everytime to ignore the stateStore + usagePath := "/usage" + server := initServer(usagePath, apiKey) + defer server.Close() + + tests := []struct { + name string + expected mb.Event + }{ + { + name: "tc: #1", + expected: mb.Event{ + RootFields: nil, + ModuleFields: nil, + MetricSetFields: mapstr.M{ + "api_key_id": (*string)(nil), + "api_key_name": (*string)(nil), + "api_key_redacted": (*string)(nil), + "api_key_type": (*string)(nil), + "data": mapstr.M{ + "email": (*string)(nil), + "cached_context_tokens_total": 0, + "context_tokens_total": 118, + "generated_tokens_total": 35, + "requests_total": 1, + "operation": "completion-realtime", + "request_type": "", + "snapshot_id": "gpt-4o-realtime-preview-2024-10-01", + }, + "organization_id": "org-dummy", + "organization_name": "Personal", + "project_id": (*string)(nil), + "project_name": (*string)(nil), + }, + Index: "", + ID: "", + Namespace: "", + Timestamp: time.Date(2024, time.November, 4, 5, 1, 0, 0, time.UTC), + Error: error(nil), + Host: "", + Service: "", + Took: 0, + Period: 0, + DisableTimeSeries: false, + }, + }, + { + name: "tc: #2", + expected: mb.Event{ + RootFields: nil, + ModuleFields: nil, + MetricSetFields: mapstr.M{ + "api_key_id": (*string)(nil), + "api_key_name": (*string)(nil), + "api_key_redacted": (*string)(nil), + "api_key_type": (*string)(nil), + "data": mapstr.M{ + "email": (*string)(nil), + "cached_context_tokens_total": 0, + "context_tokens_total": 31, + "generated_tokens_total": 12, + "requests_total": 1, + "operation": "completion", + "request_type": "", + "snapshot_id": 
"gpt-4o-2024-08-06", + }, + "organization_id": "org-dummy", + "organization_name": "Personal", + "project_id": (*string)(nil), + "project_name": (*string)(nil), + }, + Index: "", + ID: "", + Namespace: "", + Timestamp: time.Date(2024, time.November, 4, 5, 1, 0, 0, time.UTC), + Error: error(nil), + Host: "", + Service: "", + Took: 0, + Period: 0, + DisableTimeSeries: false, + }, + }, + { + name: "tc: #3", + expected: mb.Event{ + RootFields: nil, + ModuleFields: nil, + MetricSetFields: mapstr.M{ + "api_key_id": (*string)(nil), + "api_key_name": (*string)(nil), + "api_key_redacted": (*string)(nil), + "api_key_type": (*string)(nil), + "data": mapstr.M{ + "email": (*string)(nil), + "cached_context_tokens_total": 0, + "context_tokens_total": 13, + "generated_tokens_total": 9, + "requests_total": 1, + "operation": "completion", + "request_type": "", + "snapshot_id": "ft:gpt-3.5-turbo-0125:personal:yay-renew:APjjyG8E:ckpt-step-84", + }, + "organization_id": "org-dummy", + "organization_name": "Personal", + "project_id": (*string)(nil), + "project_name": (*string)(nil), + }, + Index: "", + ID: "", + Namespace: "", + Timestamp: time.Date(2024, time.November, 4, 5, 19, 0, 0, time.UTC), + Error: error(nil), + Host: "", + Service: "", + Took: 0, + Period: 0, + DisableTimeSeries: false, + }, + }, + { + name: "tc: #4", + expected: mb.Event{ + RootFields: nil, + ModuleFields: nil, + MetricSetFields: mapstr.M{ + "api_key_id": ptr("key_sha_id_random"), + "api_key_name": ptr("project_key"), + "api_key_redacted": ptr("sk-...zkA"), + "api_key_type": ptr("organization"), + "dalle": mapstr.M{ + "image_size": "1024x1024", + "model_id": "dall-e-3", + "num_images": 1, + "requests_total": 1, + "operation": "generations", + "user_id": "hello-test@elastic.co", + }, + "organization_id": "org-dummy", + "organization_name": "Personal", + "project_id": ptr("Default Project"), + "project_name": ptr("Default Project"), + }, + Index: "", + ID: "", + Namespace: "", + Timestamp: time.Date(2024, 
time.November, 4, 5, 1, 0, 0, time.UTC), + Error: error(nil), + Host: "", + Service: "", + Took: 0, + Period: 0, + DisableTimeSeries: false, + }, + }, + { + name: "tc: #5", + expected: mb.Event{ + RootFields: nil, + ModuleFields: nil, + MetricSetFields: mapstr.M{ + "api_key_id": (*string)(nil), + "api_key_name": (*string)(nil), + "api_key_redacted": (*string)(nil), + "api_key_type": (*string)(nil), + "whisper": mapstr.M{ + "model_id": "whisper-1", + "requests_total": 1, + "num_seconds": 2, + "user_id": "", + }, + "organization_id": "org-dummy", + "organization_name": "Personal", + "project_id": (*string)(nil), + "project_name": (*string)(nil), + }, + Index: "", + ID: "", + Namespace: "", + Timestamp: time.Date(2024, time.November, 4, 5, 1, 0, 0, time.UTC), + Error: error(nil), + Host: "", + Service: "", + Took: 0, + Period: 0, + DisableTimeSeries: false, + }, + }, + { + name: "tc: #6", + expected: mb.Event{ + RootFields: nil, + ModuleFields: nil, + MetricSetFields: mapstr.M{ + "api_key_id": ptr("key_fake_id"), + "api_key_name": ptr("project_key"), + "api_key_redacted": ptr("sk-...zkA"), + "api_key_type": ptr("organization"), + "tts": mapstr.M{ + "model_id": "tts-1", + "num_characters": 90, + "requests_total": 2, + "user_id": "hello-test@elastic.co", + }, + "organization_id": "org-fake", + "organization_name": "Personal", + "project_id": ptr("Default Project"), + "project_name": ptr("Default Project"), + }, + Index: "", + ID: "", + Namespace: "", + Timestamp: time.Date(2024, time.September, 4, 0, 0, 0, 0, time.UTC), + Error: error(nil), + Host: "", + Service: "", + Took: 0, + Period: 0, + DisableTimeSeries: false, + }, + }, + { + name: "tc: #7", + expected: mb.Event{ + RootFields: nil, + ModuleFields: nil, + MetricSetFields: mapstr.M{ + "api_key_id": ptr("key_fake_id"), + "api_key_name": ptr("fake_key"), + "api_key_redacted": ptr("sk-...FIA"), + "api_key_type": ptr("project"), + "tts": mapstr.M{ + "model_id": "tts-1", + "num_characters": 45, + "requests_total": 1, + 
"user_id": "hello-test@elastic.co", + }, + "organization_id": "org-fake", + "organization_name": "Personal", + "project_id": ptr("proj_fake_id"), + "project_name": ptr("fake_proj"), + }, + Index: "", + ID: "", + Namespace: "", + Timestamp: time.Date(2024, time.September, 5, 0, 0, 0, 0, time.UTC), + Error: error(nil), + Host: "", + Service: "", + Took: 0, + Period: 0, + DisableTimeSeries: false, + }, + }, + } + + f := mbtest.NewReportingMetricSetV2Error(t, getConfig(server.URL+"/usage", apiKey)) + events, errs := mbtest.ReportingFetchV2Error(f) + + require.Empty(t, errs, "Expected no errors") + require.NotEmpty(t, events, "Expected events to be returned") + require.Equal(t, len(tests), len(events)) + + for i, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require.Equal(t, tt.expected, events[i]) + }) + } +} + +func TestData(t *testing.T) { + apiKey := time.Now().String() // to generate a unique API key everytime + usagePath := "/usage" + server := initServer(usagePath, apiKey) + defer server.Close() + + f := mbtest.NewReportingMetricSetV2Error(t, getConfig(server.URL+"/usage", apiKey)) + + err := mbtest.WriteEventsReporterV2Error(f, t, "") + require.NoError(t, err, "Writing events should not return an error") +} + +func getConfig(url, apiKey string) map[string]interface{} { + return map[string]interface{}{ + "module": "openai", + "metricsets": []string{"usage"}, + "enabled": true, + "period": "24h", + "api_url": url, + "api_keys": []map[string]interface{}{ + {"key": apiKey}, + }, + "rate_limit": map[string]interface{}{ + "limit": 60, + "burst": 5, + }, + "collection": map[string]interface{}{ + "lookback_days": 0, + "realtime": false, + }, + } +} + +func initServer(endpoint, api_key string) *httptest.Server { + data := []byte(`{ + "object": "list", + "data": [ + { + "organization_id": "org-dummy", + "organization_name": "Personal", + "aggregation_timestamp": 1730696460, + "n_requests": 1, + "operation": "completion-realtime", + "snapshot_id": 
"gpt-4o-realtime-preview-2024-10-01", + "n_context_tokens_total": 118, + "n_generated_tokens_total": 35, + "email": null, + "api_key_id": null, + "api_key_name": null, + "api_key_redacted": null, + "api_key_type": null, + "project_id": null, + "project_name": null, + "request_type": "", + "n_cached_context_tokens_total": 0 + }, + { + "organization_id": "org-dummy", + "organization_name": "Personal", + "aggregation_timestamp": 1730696460, + "n_requests": 1, + "operation": "completion", + "snapshot_id": "gpt-4o-2024-08-06", + "n_context_tokens_total": 31, + "n_generated_tokens_total": 12, + "email": null, + "api_key_id": null, + "api_key_name": null, + "api_key_redacted": null, + "api_key_type": null, + "project_id": null, + "project_name": null, + "request_type": "", + "n_cached_context_tokens_total": 0 + }, + { + "organization_id": "org-dummy", + "organization_name": "Personal", + "aggregation_timestamp": 1730697540, + "n_requests": 1, + "operation": "completion", + "snapshot_id": "ft:gpt-3.5-turbo-0125:personal:yay-renew:APjjyG8E:ckpt-step-84", + "n_context_tokens_total": 13, + "n_generated_tokens_total": 9, + "email": null, + "api_key_id": null, + "api_key_name": null, + "api_key_redacted": null, + "api_key_type": null, + "project_id": null, + "project_name": null, + "request_type": "", + "n_cached_context_tokens_total": 0 + } + ], + "ft_data": [], + "dalle_api_data": [ + { + "timestamp": 1730696460, + "num_images": 1, + "num_requests": 1, + "image_size": "1024x1024", + "operation": "generations", + "user_id": "hello-test@elastic.co", + "organization_id": "org-dummy", + "api_key_id": "key_sha_id_random", + "api_key_name": "project_key", + "api_key_redacted": "sk-...zkA", + "api_key_type": "organization", + "organization_name": "Personal", + "model_id": "dall-e-3", + "project_id": "Default Project", + "project_name": "Default Project" + } + ], + "whisper_api_data": [ + { + "timestamp": 1730696460, + "model_id": "whisper-1", + "num_seconds": 2, + "num_requests": 1, 
+ "user_id": null, + "organization_id": "org-dummy", + "api_key_id": null, + "api_key_name": null, + "api_key_redacted": null, + "api_key_type": null, + "organization_name": "Personal", + "project_id": null, + "project_name": null + } + ], + "tts_api_data": [ + { + "timestamp": 1725408000, + "model_id": "tts-1", + "num_characters": 90, + "num_requests": 2, + "user_id": "hello-test@elastic.co", + "organization_id": "org-fake", + "api_key_id": "key_fake_id", + "api_key_name": "project_key", + "api_key_redacted": "sk-...zkA", + "api_key_type": "organization", + "organization_name": "Personal", + "project_id": "Default Project", + "project_name": "Default Project" + }, + { + "timestamp": 1725494400, + "model_id": "tts-1", + "num_characters": 45, + "num_requests": 1, + "user_id": "hello-test@elastic.co", + "organization_id": "org-fake", + "api_key_id": "key_fake_id", + "api_key_name": "fake_key", + "api_key_redacted": "sk-...FIA", + "api_key_type": "project", + "organization_name": "Personal", + "project_id": "proj_fake_id", + "project_name": "fake_proj" + } + ], + "assistant_code_interpreter_data": [], + "retrieval_storage_data": [] +}`) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Validate Bearer token + authHeader := r.Header.Get("Authorization") + expectedToken := fmt.Sprintf("Bearer %s", api_key) + + if authHeader != expectedToken { + w.WriteHeader(http.StatusUnauthorized) + return + } + + // If it doesn't match the expected endpoint, return 404 + if r.URL.Path != endpoint { + w.WriteHeader(http.StatusNotFound) + return + } + + w.WriteHeader(http.StatusOK) + _, _ = w.Write(data) + })) + return server +} diff --git a/x-pack/metricbeat/modules.d/openai.yml.disabled b/x-pack/metricbeat/modules.d/openai.yml.disabled new file mode 100644 index 00000000000..6a881ae8641 --- /dev/null +++ b/x-pack/metricbeat/modules.d/openai.yml.disabled @@ -0,0 +1,37 @@ +# Module: openai +# Docs: 
https://www.elastic.co/guide/en/beats/metricbeat/main/metricbeat-module-openai.html + +- module: openai + metricsets: ["usage"] + enabled: false + period: 1h + + # # Project API Keys - Multiple API keys can be specified for different projects + # api_keys: + # - key: "api_key1" + # - key: "api_key2" + + # # API Configuration + # ## Base URL for the OpenAI usage API endpoint + # api_url: "https://api.openai.com/v1/usage" + # ## Custom headers to be included in API requests + # headers: + # - "k1: v1" + # - "k2: v2" + # ## Rate Limiting Configuration + # rate_limit: + # limit: 12 # seconds between requests + # burst: 1 # max concurrent requests + # ## Request Timeout Duration + # timeout: 30s + + # # Data Collection Configuration + # collection: + # ## Number of days to look back when collecting usage data + # lookback_days: 30 + # ## Whether to collect usage data in realtime. Defaults to false because of + # # how OpenAI usage data is collected, realtime collection would add duplicate + # # data to ES and also make it harder to do analytics. The best approach is to + # # avoid realtime collection and collect only up to the last day (in UTC). So, + # # there's at most a 24h delay. + # realtime: false