From 68e2b8ba6990b0c4cb6badb6dcc0349edbca26f9 Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Fri, 28 Jun 2024 07:12:23 +0200 Subject: [PATCH] Upgrade azure-eventhub to input v2 API (#39511) * Rename client to pipelineClient for clarity * Rename azure to eventHubMetadata for clarity * Move remaining code out of eph.go * Remove erroneous error check * Update tests Switch from the v1 Outlet to the v2 PipelineClient * Simplify run() It seems we only need to wait for the context to be done here. * Update NOTICE * Clarify handler tradeoffs The existing input version does not handle publishing acks from the Beats pipeline. The input API v1 does not seem to offer this feature. With the transition to the input API v2, we have acks management. However, the legacy event hub SDK internally updates the checkpoint info after a successful handler call, and does not seem to offer hooks for acks management. Since the new modern SDK offers better checkpoint management, we keep the current behavior intact, and we'll implement ACKs in the event hub input v2. --------- Co-authored-by: Tiago Queiroz --- NOTICE.txt | 104 +++-- go.mod | 17 +- go.sum | 41 +- x-pack/filebeat/include/list.go | 1 - x-pack/filebeat/input/azureeventhub/eph.go | 114 ------ .../filebeat/input/azureeventhub/eph_test.go | 48 --- x-pack/filebeat/input/azureeventhub/input.go | 287 ++------------ .../input/azureeventhub/input_test.go | 129 ++++--- .../filebeat/input/azureeventhub/metrics.go | 3 +- .../input/azureeventhub/metrics_test.go | 26 +- .../input/azureeventhub/sanitization_test.go | 10 +- .../filebeat/input/azureeventhub/v1_input.go | 359 ++++++++++++++++++ .../input/default-inputs/inputs_other.go | 2 + 13 files changed, 565 insertions(+), 576 deletions(-) delete mode 100644 x-pack/filebeat/input/azureeventhub/eph.go delete mode 100644 x-pack/filebeat/input/azureeventhub/eph_test.go create mode 100644 x-pack/filebeat/input/azureeventhub/v1_input.go diff --git a/NOTICE.txt b/NOTICE.txt index 562764723fe..5a04ef0ca55 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -1742,11 +1742,11 @@ SOFTWARE. -------------------------------------------------------------------------------- Dependency : github.com/Azure/azure-sdk-for-go/sdk/azcore -Version: v1.9.0 +Version: v1.11.1 Licence type (autodetected): MIT -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/!azure/azure-sdk-for-go/sdk/azcore@v1.9.0/LICENSE.txt: +Contents of probable licence file $GOMODCACHE/github.com/!azure/azure-sdk-for-go/sdk/azcore@v1.11.1/LICENSE.txt: MIT License @@ -1773,11 +1773,11 @@ SOFTWARE -------------------------------------------------------------------------------- Dependency : github.com/Azure/azure-sdk-for-go/sdk/azidentity -Version: v1.4.0 +Version: v1.5.2 Licence type (autodetected): MIT -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/!azure/azure-sdk-for-go/sdk/azidentity@v1.4.0/LICENSE.txt: +Contents of probable licence file $GOMODCACHE/github.com/!azure/azure-sdk-for-go/sdk/azidentity@v1.5.2/LICENSE.txt: MIT License @@ -1954,11 +1954,11 @@ SOFTWARE. -------------------------------------------------------------------------------- Dependency : github.com/Azure/azure-sdk-for-go/sdk/storage/azblob -Version: v1.0.0 +Version: v1.3.2 Licence type (autodetected): MIT -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/!azure/azure-sdk-for-go/sdk/storage/azblob@v1.0.0/LICENSE.txt: +Contents of probable licence file $GOMODCACHE/github.com/!azure/azure-sdk-for-go/sdk/storage/azblob@v1.3.2/LICENSE.txt: MIT License @@ -17511,11 +17511,11 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -------------------------------------------------------------------------------- Dependency : github.com/golang-jwt/jwt/v5 -Version: v5.0.0 +Version: v5.2.1 Licence type (autodetected): MIT -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/golang-jwt/jwt/v5@v5.0.0/LICENSE: +Contents of probable licence file $GOMODCACHE/github.com/golang-jwt/jwt/v5@v5.2.1/LICENSE: Copyright (c) 2012 Dave Grijalva Copyright (c) 2021 golang-jwt maintainers @@ -29001,11 +29001,11 @@ Contents of probable licence file $GOMODCACHE/github.com/!azure/azure-pipeline-g -------------------------------------------------------------------------------- Dependency : github.com/Azure/azure-sdk-for-go/sdk/internal -Version: v1.5.0 +Version: v1.7.0 Licence type (autodetected): MIT -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/!azure/azure-sdk-for-go/sdk/internal@v1.5.0/LICENSE.txt: +Contents of probable licence file $GOMODCACHE/github.com/!azure/azure-sdk-for-go/sdk/internal@v1.7.0/LICENSE.txt: MIT License @@ -29122,13 +29122,43 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +-------------------------------------------------------------------------------- +Dependency : github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage +Version: v1.5.0 +Licence type (autodetected): MIT +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/github.com/!azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage@v1.5.0/LICENSE.txt: + +MIT License + +Copyright (c) Microsoft Corporation. All rights reserved. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + -------------------------------------------------------------------------------- Dependency : github.com/Azure/go-amqp -Version: v1.0.0 +Version: v1.0.5 Licence type (autodetected): MIT -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/!azure/go-amqp@v1.0.0/LICENSE: +Contents of probable licence file $GOMODCACHE/github.com/!azure/go-amqp@v1.0.5/LICENSE: MIT License @@ -30826,11 +30856,11 @@ SOFTWARE. -------------------------------------------------------------------------------- Dependency : github.com/AzureAD/microsoft-authentication-library-for-go -Version: v1.1.1 +Version: v1.2.2 Licence type (autodetected): MIT -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/!azure!a!d/microsoft-authentication-library-for-go@v1.1.1/LICENSE: +Contents of probable licence file $GOMODCACHE/github.com/!azure!a!d/microsoft-authentication-library-for-go@v1.2.2/LICENSE: MIT License @@ -36984,40 +37014,6 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. --------------------------------------------------------------------------------- -Dependency : github.com/dnaeon/go-vcr -Version: v1.2.0 -Licence type (autodetected): BSD-2-Clause --------------------------------------------------------------------------------- - -Contents of probable licence file $GOMODCACHE/github.com/dnaeon/go-vcr@v1.2.0/LICENSE: - -Copyright (c) 2015-2016 Marin Atanasov Nikolov -All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions -are met: - - 1. Redistributions of source code must retain the above copyright - notice, this list of conditions and the following disclaimer - in this position and unchanged. - 2. Redistributions in binary form must reproduce the above copyright - notice, this list of conditions and the following disclaimer in the - documentation and/or other materials provided with the distribution. - -THIS SOFTWARE IS PROVIDED BY THE AUTHOR(S) ``AS IS'' AND ANY EXPRESS OR -IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES -OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. -IN NO EVENT SHALL THE AUTHOR(S) BE LIABLE FOR ANY DIRECT, INDIRECT, -INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT -NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF -THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - - -------------------------------------------------------------------------------- Dependency : github.com/dnephin/pflag Version: v1.0.7 @@ -45608,11 +45604,11 @@ SOFTWARE. -------------------------------------------------------------------------------- Dependency : github.com/joho/godotenv -Version: v1.3.0 +Version: v1.5.1 Licence type (autodetected): MIT -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/joho/godotenv@v1.3.0/LICENCE: +Contents of probable licence file $GOMODCACHE/github.com/joho/godotenv@v1.5.1/LICENCE: Copyright (c) 2013 John Barton @@ -49585,11 +49581,11 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -------------------------------------------------------------------------------- Dependency : github.com/pkg/browser -Version: v0.0.0-20210911075715-681adbf594b8 +Version: v0.0.0-20240102092130-5ac0b6a4141c Licence type (autodetected): BSD-2-Clause -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/pkg/browser@v0.0.0-20210911075715-681adbf594b8/LICENSE: +Contents of probable licence file $GOMODCACHE/github.com/pkg/browser@v0.0.0-20240102092130-5ac0b6a4141c/LICENSE: Copyright (c) 2014, Dave Cheney All rights reserved. @@ -49899,11 +49895,11 @@ Contents of probable licence file $GOMODCACHE/github.com/prometheus/client_golan -------------------------------------------------------------------------------- Dependency : github.com/rogpeppe/go-internal -Version: v1.11.0 +Version: v1.12.0 Licence type (autodetected): BSD-3-Clause -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/rogpeppe/go-internal@v1.11.0/LICENSE: +Contents of probable licence file $GOMODCACHE/github.com/rogpeppe/go-internal@v1.12.0/LICENSE: Copyright (c) 2018 The Go Authors. All rights reserved. diff --git a/go.mod b/go.mod index 9d71206e92c..6bf9204f493 100644 --- a/go.mod +++ b/go.mod @@ -180,14 +180,14 @@ require ( cloud.google.com/go v0.110.8 cloud.google.com/go/compute v1.23.0 cloud.google.com/go/redis v1.13.1 - github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.0 - github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.4.0 + github.com/Azure/azure-sdk-for-go/sdk/azcore v1.11.1 + github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.2 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/consumption/armconsumption v1.1.0 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/containerservice/armcontainerservice/v4 v4.6.0 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/costmanagement/armcostmanagement v1.1.0 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor v0.8.0 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.1.1 - github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0 + github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.2 github.com/Azure/go-autorest/autorest/adal v0.9.21 github.com/apache/arrow/go/v14 v14.0.2 github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.1 @@ -209,7 +209,7 @@ require ( github.com/elastic/toutoumomoma v0.0.0-20221026030040-594ef30cb640 github.com/foxcpp/go-mockdns v0.0.0-20201212160233-ede2f9158d15 github.com/go-ldap/ldap/v3 v3.4.6 - github.com/golang-jwt/jwt/v5 v5.0.0 + github.com/golang-jwt/jwt/v5 v5.2.1 github.com/google/cel-go v0.19.0 github.com/googleapis/gax-go/v2 v2.12.0 github.com/gorilla/handlers v1.5.1 @@ -240,15 +240,15 @@ require ( github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 // indirect github.com/Azure/azure-amqp-common-go/v4 v4.2.0 // indirect github.com/Azure/azure-pipeline-go v0.2.3 // indirect - github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.0 // indirect - github.com/Azure/go-amqp v1.0.0 // indirect + github.com/Azure/azure-sdk-for-go/sdk/internal v1.7.0 // indirect + github.com/Azure/go-amqp v1.0.5 // indirect github.com/Azure/go-autorest v14.2.0+incompatible // indirect github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect github.com/Azure/go-autorest/autorest/validation v0.3.1 // indirect github.com/Azure/go-autorest/logger v0.2.1 // indirect github.com/Azure/go-autorest/tracing v0.6.0 // indirect github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 // indirect - github.com/AzureAD/microsoft-authentication-library-for-go v1.1.1 // indirect + github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 // indirect github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect github.com/andybalholm/brotli v1.0.5 // indirect github.com/antlr4-go/antlr/v4 v4.13.0 // indirect @@ -320,6 +320,7 @@ require ( github.com/jcmturner/gokrb5/v8 v8.4.2 // indirect github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect + github.com/joho/godotenv v1.5.1 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/karrick/godirwalk v1.17.0 // indirect @@ -348,7 +349,7 @@ require ( github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.0.2 // indirect github.com/pierrec/lz4 v2.6.0+incompatible // indirect - github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect + github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/prometheus/client_golang v1.11.1 // indirect github.com/rootless-containers/rootlesskit v1.1.0 // indirect diff --git a/go.sum b/go.sum index ed2a429b647..12ad7d6a691 100644 --- a/go.sum +++ b/go.sum @@ -96,18 +96,18 @@ github.com/Azure/azure-sdk-for-go/sdk/azcore v0.19.0/go.mod h1:h6H6c8enJmmocHUbL github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.3.0/go.mod h1:tZoQYdDZNOiIjdSn0dVWVfl0NEPGOJqVLzSrcFk4Is0= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.4.0/go.mod h1:ON4tFdPTwRcgWEaVDrN3584Ef+b7GgSJaXxe5fW9t4M= -github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.0 h1:fb8kj/Dh4CSwgsOzHeZY4Xh68cFVbzXx+ONXGMY//4w= -github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.0/go.mod h1:uReU2sSxZExRPBAg3qKzmAucSi51+SP1OhohieR821Q= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.11.1 h1:E+OJmp2tPvt1W+amx48v1eqbjDYsgN+RzP4q16yV5eM= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.11.1/go.mod h1:a6xsAQUZg+VsS3TJ05SRp524Hs4pZ/AeFSr5ENf0Yjo= github.com/Azure/azure-sdk-for-go/sdk/azidentity v0.11.0/go.mod h1:HcM1YX14R7CJcghJGOYCgdezslRSVzqwLf/q+4Y2r/0= github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0/go.mod h1:bhXu1AjYL+wutSL/kpSq6s7733q2Rb0yuot9Zgfqa/0= -github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.4.0 h1:BMAjVKJM0U/CYF27gA0ZMmXGkOcvfFtD0oHVZ1TIPRI= -github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.4.0/go.mod h1:1fXstnBMas5kzG+S3q8UoJcmyU6nUeunJcMDHcRYHhs= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.2 h1:FDif4R1+UUR+00q6wquyX90K7A8dN+R5E8GEadoP7sU= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.2/go.mod h1:aiYBYui4BJ/BJCAIKs92XiPyQfTaBWqvHujDwKb6CBU= github.com/Azure/azure-sdk-for-go/sdk/internal v0.7.0/go.mod h1:yqy467j36fJxcRV2TzfVZ1pCb5vxm4BtZPUdYWe/Xo8= github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w= github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.1/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w= github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w= -github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.0 h1:d81/ng9rET2YqdVkVwkb6EXeRrLJIwyGnJcAlAWKwhs= -github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.0/go.mod h1:s4kgfzA0covAXNicZHDMN58jExvcng2mC/DepXiF1EI= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.7.0 h1:rTfKOCZGy5ViVrlA74ZPE99a+SgoEE2K/yg3RyW9dFA= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.7.0/go.mod h1:4OG6tQ9EOP/MT0NMjDlRzWoVFxfu9rN9B2X+tlSVktg= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/containerservice/armcontainerservice/v4 v4.6.0 h1:AAIdAyPkFff6XTct2lQCxOWN/+LnA41S7kIkzKaMbyE= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/containerservice/armcontainerservice/v4 v4.6.0/go.mod h1:noQIdW75SiQFB3mSFJBr4iRRH83S9skaFiBv4C0uEs0= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/costmanagement/armcostmanagement v1.1.0 h1:1MRED2aeLx/BPHC23XRtr8Mk6zcc70HNRYPQ73R0gHw= @@ -122,12 +122,15 @@ github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor v0.8.0 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor v0.8.0/go.mod h1:kzRLpzzlw6eBUXE7eBw3oqfmKR/kxaHOk4+h9sAe6Yo= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.1.1 h1:7CBQ+Ei8SP2c6ydQTGCCrS35bDxgTMfoP2miAwK++OU= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.1.1/go.mod h1:c/wcGeGx5FUPbM/JltUYHZcKmigwyVLJlDq+4HdtXaw= -github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0 h1:u/LLAOFgsMv7HmNL4Qufg58y+qElGOt5qv0z1mURkRY= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.5.0 h1:AifHbc4mg0x9zW52WOpKbsHaDKuRhlI7TVl47thgQ70= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.5.0/go.mod h1:T5RfihdXtBDxt1Ch2wobif3TvzTdumDy29kahv6AV9A= github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0/go.mod h1:2e8rMJtl2+2j+HXbTBwnyGpm5Nou7KhvSfxOq8JpTag= +github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.2 h1:YUUxeiOWgdAQE3pXt2H7QXzZs0q8UBjgRbl56qo8GYM= +github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.2/go.mod h1:dmXQgZuiSubAecswZE+Sm8jkvEa7kQgTPVRvwL/nd0E= github.com/Azure/azure-storage-blob-go v0.15.0 h1:rXtgp8tN1p29GvpGgfJetavIG0V7OgcSXPpwp3tx6qk= github.com/Azure/azure-storage-blob-go v0.15.0/go.mod h1:vbjsVbX0dlxnRc4FFMPsS9BsJWPcne7GB7onqlPvz58= -github.com/Azure/go-amqp v1.0.0 h1:QfCugi1M+4F2JDTRgVnRw7PYXLXZ9hmqk3+9+oJh3OA= -github.com/Azure/go-amqp v1.0.0/go.mod h1:+bg0x3ce5+Q3ahCEXnCsGG3ETpDQe3MEVnOuT2ywPwc= +github.com/Azure/go-amqp v1.0.5 h1:po5+ljlcNSU8xtapHTe8gIc8yHxCzC03E8afH2g1ftU= +github.com/Azure/go-amqp v1.0.5/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE= github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8= github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0= github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= @@ -182,8 +185,8 @@ github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBp github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 h1:mFRzDkZVAjdal+s7s0MwaRv9igoPqLRdzOLzw/8Xvq8= github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358/go.mod h1:chxPXzSsl7ZWRAuOIE23GDNzjWuZquvFlgA8xmpunjU= github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1/go.mod h1:Vt9sXTKwMyGcOxSmLDMnGPgqsUg7m8pe215qMLrDXw4= -github.com/AzureAD/microsoft-authentication-library-for-go v1.1.1 h1:WpB/QDNLpMw72xHJc34BNNykqSOeEJDAWkhf0u12/Jk= -github.com/AzureAD/microsoft-authentication-library-for-go v1.1.1/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI= +github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 h1:XHOnouVk1mxXfQidrMEnLlPk9UMeRtyBTnEFtxkV0kU= +github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= @@ -495,7 +498,6 @@ github.com/dlclark/regexp2 v1.4.0 h1:F1rxgk7p4uKjwIQxBs9oAXe5CqrXlCduYEJvrF4u93E github.com/dlclark/regexp2 v1.4.0/go.mod h1:2pZnwuY/m+8K6iRw6wQdMtk+rH5tNGR1i55kozfMjCc= github.com/dnaeon/go-vcr v1.0.1/go.mod h1:aBB1+wY4s93YsC3HHjMBMrwTj2R9FHDzUr9KyGc8n1E= github.com/dnaeon/go-vcr v1.1.0/go.mod h1:M7tiix8f0r6mKKJ3Yq/kqU1OYf3MnfmBWVbPx/yU9ko= -github.com/dnaeon/go-vcr v1.2.0 h1:zHCHvJYTMh1N7xnV7zf1m1GPBF9Ad0Jk/whtQ1663qI= github.com/dnaeon/go-vcr v1.2.0/go.mod h1:R4UdLID7HZT3taECzJs4YgbbH6PIGXB6W/sc5OLb6RQ= github.com/dnephin/pflag v1.0.7 h1:oxONGlWxhmUct0YzKTgrpQv9AUA1wtPBn7zuSjJqptk= github.com/dnephin/pflag v1.0.7/go.mod h1:uxE91IoWURlOiTUIA8Mq5ZZkAv3dPUfZNaT80Zm7OQE= @@ -866,8 +868,8 @@ github.com/golang-jwt/jwt/v4 v4.0.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzw github.com/golang-jwt/jwt/v4 v4.2.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg= github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOWzg= github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= -github.com/golang-jwt/jwt/v5 v5.0.0 h1:1n1XNM9hk7O9mnQoNBGolZvzebBQ7p93ULHRc28XJUE= -github.com/golang-jwt/jwt/v5 v5.0.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= +github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk= +github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe h1:lXe2qZdvpiX5WZkZR4hgp4KJVfY3nMkvmwbVkpv1rVY= github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= github.com/golang-sql/sqlexp v0.1.0 h1:ZCD6MBpcuOVfGVqsEmY5/4FtYiKz6tSyUv9LPEDei6A= @@ -1145,8 +1147,9 @@ github.com/jmoiron/sqlx v1.3.1 h1:aLN7YINNZ7cYOPK3QC83dbM6KT0NMqVMw961TqrejlE= github.com/jmoiron/sqlx v1.3.1/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ= github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 h1:rp+c0RAYOWj8l6qbCUTSiRLG/iKnW3K3/QfPPuSsBt4= github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901/go.mod h1:Z86h9688Y0wesXCyonoVr47MasHilkuLMqGhRZ4Hpak= -github.com/joho/godotenv v1.3.0 h1:Zjp+RcGpHhGlrMbJzXTrZZPrWj+1vfm90La1wgB6Bhc= github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg= +github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= +github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/jonboulle/clockwork v0.2.2 h1:UOGuzwb1PwsrDAObMuhUnj0p5ULPj8V/xJ7Kx9qUBdQ= github.com/jonboulle/clockwork v0.2.2/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8= @@ -1429,8 +1432,9 @@ github.com/pierrre/gotestcover v0.0.0-20160517101806-924dca7d15f0 h1:i5VIxp6QB8o github.com/pierrre/gotestcover v0.0.0-20160517101806-924dca7d15f0/go.mod h1:4xpMLz7RBWyB+ElzHu8Llua96TRCB3YwX+l5EP1wmHk= github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4/go.mod h1:4OwLy04Bl9Ef3GJJCoec+30X3LQs/0/m4HFRt/2LUSA= github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4/go.mod h1:N6UoU20jOqggOuDwUaBQpluzLNDqif3kq9z2wpdYEfQ= -github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 h1:KoWmjvw+nsYOo29YJK9vDA65RGE3NrOnUtO7a+RF9HU= github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI= +github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ= +github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -1511,8 +1515,8 @@ github.com/rogpeppe/go-internal v1.2.2/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= -github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= -github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= +github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= +github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/rootless-containers/rootlesskit v1.1.0 h1:cRaRIYxY8oce4eE/zeAUZhgKu/4tU1p9YHN4+suwV7M= github.com/rootless-containers/rootlesskit v1.1.0/go.mod h1:H+o9ndNe7tS91WqU0/+vpvc+VaCd7TCIWaJjnV0ujUo= github.com/rs/cors v1.6.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= @@ -2080,6 +2084,7 @@ golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220829200755-d48e67d00261/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/x-pack/filebeat/include/list.go b/x-pack/filebeat/include/list.go index fcca6f27de8..5e2cc02a4c9 100644 --- a/x-pack/filebeat/include/list.go +++ b/x-pack/filebeat/include/list.go @@ -15,7 +15,6 @@ import ( // Import packages that perform 'func init()'. _ "github.com/elastic/beats/v7/x-pack/filebeat/input/awscloudwatch" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/awss3" - _ "github.com/elastic/beats/v7/x-pack/filebeat/input/azureeventhub" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/cometd" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/etw" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/gcppubsub" diff --git a/x-pack/filebeat/input/azureeventhub/eph.go b/x-pack/filebeat/input/azureeventhub/eph.go deleted file mode 100644 index f6981b4882e..00000000000 --- a/x-pack/filebeat/input/azureeventhub/eph.go +++ /dev/null @@ -1,114 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -//go:build !aix - -package azureeventhub - -import ( - "context" - "errors" - "fmt" - - eventhub "github.com/Azure/azure-event-hubs-go/v3" - "github.com/Azure/azure-event-hubs-go/v3/eph" - "github.com/Azure/azure-event-hubs-go/v3/storage" - "github.com/Azure/azure-storage-blob-go/azblob" - "github.com/Azure/go-autorest/autorest/azure" -) - -// users can select from one of the already defined azure cloud envs -var environments = map[string]azure.Environment{ - azure.ChinaCloud.ResourceManagerEndpoint: azure.ChinaCloud, - azure.GermanCloud.ResourceManagerEndpoint: azure.GermanCloud, - azure.PublicCloud.ResourceManagerEndpoint: azure.PublicCloud, - azure.USGovernmentCloud.ResourceManagerEndpoint: azure.USGovernmentCloud, -} - -// runWithEPH will consume ingested events using the Event Processor Host (EPH). -// -// To learn more, check the following resources: -// - https://github.com/Azure/azure-event-hubs-go#event-processor-host -// - https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-event-processor-host -func (a *azureInput) runWithEPH() error { - // create a new Azure Storage Leaser / Checkpointer - cred, err := azblob.NewSharedKeyCredential(a.config.SAName, a.config.SAKey) - if err != nil { - return err - } - env, err := getAzureEnvironment(a.config.OverrideEnvironment) - if err != nil { - return err - } - leaserCheckpointer, err := storage.NewStorageLeaserCheckpointer(cred, a.config.SAName, a.config.SAContainer, env) - if err != nil { - a.log.Errorw("error creating storage leaser checkpointer", "error", err) - return err - } - - // adding a nil EventProcessorHostOption will break the code, - // this is why a condition is added and a.processor is assigned. - if a.config.ConsumerGroup != "" { - a.processor, err = eph.NewFromConnectionString( - a.workerCtx, - fmt.Sprintf("%s%s%s", a.config.ConnectionString, eventHubConnector, a.config.EventHubName), - leaserCheckpointer, - leaserCheckpointer, - eph.WithConsumerGroup(a.config.ConsumerGroup), - eph.WithNoBanner()) - } else { - a.processor, err = eph.NewFromConnectionString( - a.workerCtx, - fmt.Sprintf("%s%s%s", a.config.ConnectionString, eventHubConnector, a.config.EventHubName), - leaserCheckpointer, - leaserCheckpointer, - eph.WithNoBanner()) - } - if err != nil { - a.log.Errorw("error creating processor", "error", err) - return err - } - - // register a message handler -- many can be registered - handlerID, err := a.processor.RegisterHandler(a.workerCtx, - func(c context.Context, e *eventhub.Event) error { - var onEventErr error - // partitionID is not yet mapped in the azure-eventhub sdk - ok := a.processEvents(e, "") - if !ok { - onEventErr = errors.New("OnEvent function returned false. Stopping input worker") - a.log.Error(onEventErr.Error()) - a.Stop() - } - return onEventErr - }) - if err != nil { - a.log.Errorw("error registering handler", "error", err) - return err - } - a.log.Infof("handler id: %q is registered\n", handlerID) - - // Start handling messages from all of the partitions balancing across - // multiple consumers. - // The processor can be stopped by calling `Close()` on the processor. - err = a.processor.StartNonBlocking(a.workerCtx) - if err != nil { - a.log.Errorw("error starting the processor", "error", err) - return err - } - - return nil -} - -func getAzureEnvironment(overrideResManager string) (azure.Environment, error) { - // if no override is set then the azure public cloud is used - if overrideResManager == "" || overrideResManager == "" { - return azure.PublicCloud, nil - } - if env, ok := environments[overrideResManager]; ok { - return env, nil - } - // can retrieve hybrid env from the resource manager endpoint - return azure.EnvironmentFromURL(overrideResManager) -} diff --git a/x-pack/filebeat/input/azureeventhub/eph_test.go b/x-pack/filebeat/input/azureeventhub/eph_test.go deleted file mode 100644 index 86922d29559..00000000000 --- a/x-pack/filebeat/input/azureeventhub/eph_test.go +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -//go:build !aix - -package azureeventhub - -import ( - "testing" - - "github.com/Azure/go-autorest/autorest/azure" - - "github.com/stretchr/testify/assert" -) - -var invalidConfig = azureInputConfig{ - SAKey: "invalid_key", - SAName: "storage", - SAContainer: ephContainerName, - ConnectionString: "invalid_connection_string", - ConsumerGroup: "$Default", -} - -func TestRunWithEPH(t *testing.T) { - input := azureInput{config: invalidConfig} - // decoding error when key is invalid - err := input.runWithEPH() - assert.Error(t, err, '7') -} - -func TestGetAzureEnvironment(t *testing.T) { - resMan := "" - env, err := getAzureEnvironment(resMan) - assert.NoError(t, err) - assert.Equal(t, env, azure.PublicCloud) - resMan = "https://management.microsoftazure.de/" - env, err = getAzureEnvironment(resMan) - assert.NoError(t, err) - assert.Equal(t, env, azure.GermanCloud) - resMan = "http://management.invalidhybrid.com/" - _, err = getAzureEnvironment(resMan) - assert.Errorf(t, err, "invalid character 'F' looking for beginning of value") - resMan = "" - env, err = getAzureEnvironment(resMan) - assert.NoError(t, err) - assert.Equal(t, env, azure.PublicCloud) -} diff --git a/x-pack/filebeat/input/azureeventhub/input.go b/x-pack/filebeat/input/azureeventhub/input.go index bc2244925e6..a12083f2abe 100644 --- a/x-pack/filebeat/input/azureeventhub/input.go +++ b/x-pack/filebeat/input/azureeventhub/input.go @@ -7,23 +7,18 @@ package azureeventhub import ( - "context" - "encoding/json" "fmt" "strings" - "sync" - "time" - eventhub "github.com/Azure/azure-event-hubs-go/v3" - "github.com/Azure/azure-event-hubs-go/v3/eph" - "github.com/mitchellh/hashstructure" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" + "github.com/Azure/go-autorest/autorest/azure" - "github.com/elastic/beats/v7/filebeat/channel" - "github.com/elastic/beats/v7/filebeat/input" + v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/feature" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" - "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/go-concert/unison" ) const ( @@ -32,262 +27,60 @@ const ( inputName = "azure-eventhub" ) -func init() { - err := input.Register(inputName, NewInput) - if err != nil { - panic(fmt.Errorf("failed to register %v input: %w", inputName, err)) - } +var environments = map[string]azure.Environment{ + azure.ChinaCloud.ResourceManagerEndpoint: azure.ChinaCloud, + azure.GermanCloud.ResourceManagerEndpoint: azure.GermanCloud, + azure.PublicCloud.ResourceManagerEndpoint: azure.PublicCloud, + azure.USGovernmentCloud.ResourceManagerEndpoint: azure.USGovernmentCloud, } -// configID computes a unique ID for the input configuration. -// -// It is used to identify the input in the registry and to detect -// changes in the configuration. +// Plugin returns the Azure Event Hub input plugin. // -// We will remove this function as we upgrade the input to the -// v2 API (there is an ID in the v2 context). -func configID(config *conf.C) (string, error) { - var tmp struct { - ID string `config:"id"` - } - if err := config.Unpack(&tmp); err != nil { - return "", fmt.Errorf("error extracting ID: %w", err) - } - if tmp.ID != "" { - return tmp.ID, nil - } - - var h map[string]interface{} - _ = config.Unpack(&h) - id, err := hashstructure.Hash(h, nil) - if err != nil { - return "", fmt.Errorf("can not compute ID from configuration: %w", err) +// Required register the plugin loader for the +// input API v2. +func Plugin(log *logp.Logger) v2.Plugin { + return v2.Plugin{ + Name: inputName, + Stability: feature.Stable, + Deprecated: false, + Info: "Collect logs from Azure Event Hub", + Manager: &eventHubInputManager{ + log: log, + }, } +} - return fmt.Sprintf("%16X", id), nil +// eventHubInputManager is the manager for the Azure Event Hub input. +// +// It is responsible for creating new instances of the input, according +// to the configuration provided. +type eventHubInputManager struct { + log *logp.Logger } -// azureInput struct for the azure-eventhub input -type azureInput struct { - config azureInputConfig // azure-eventhub configuration - context input.Context - outlet channel.Outleter - log *logp.Logger // logging info and error messages - workerCtx context.Context // worker goroutine context. It's cancelled when the input stops or the worker exits. - workerCancel context.CancelFunc // used to signal that the worker should stop. - workerOnce sync.Once // guarantees that the worker goroutine is only started once. - processor *eph.EventProcessorHost // eph will be assigned if users have enabled the option - id string // ID of the input; used to identify the input in the input metrics registry only, and will be removed once the input is migrated to v2. - metrics *inputMetrics // Metrics for the input. +func (m *eventHubInputManager) Init(unison.Group) error { + return nil } -// NewInput creates a new azure-eventhub input -func NewInput( - cfg *conf.C, - connector channel.Connector, - inputContext input.Context, -) (input.Input, error) { +func (m *eventHubInputManager) Create(cfg *conf.C) (v2.Input, error) { var config azureInputConfig if err := cfg.Unpack(&config); err != nil { return nil, fmt.Errorf("reading %s input config: %w", inputName, err) } - // Since this is a v1 input, we need to set the ID manually. - // - // We need an ID to identify the input in the input metrics - // registry. - // - // This is a temporary workaround until we migrate the input to v2. - inputId, err := configID(cfg) - if err != nil { - return nil, err - } - - inputCtx, cancelInputCtx := context.WithCancel(context.Background()) - go func() { - defer cancelInputCtx() - select { - case <-inputContext.Done: - case <-inputCtx.Done(): - } - }() - - // If the input ever needs to be made restartable, then context would need - // to be recreated with each restart. - workerCtx, workerCancel := context.WithCancel(inputCtx) - - in := azureInput{ - id: inputId, - config: config, - log: logp.NewLogger(fmt.Sprintf("%s input", inputName)).With("connection string", stripConnectionString(config.ConnectionString)), - context: inputContext, - workerCtx: workerCtx, - workerCancel: workerCancel, - } - out, err := connector.Connect(cfg) - if err != nil { - return nil, err - } - in.outlet = out - in.log.Infof("Initialized %s input.", inputName) - - return &in, nil + return newEventHubInputV1(config, m.log) } -// Run starts the `azure-eventhub` input and then returns. -// -// The first invocation will start an input worker. All subsequent -// invocations will be no-ops. -// -// The input worker will continue fetching data from the event hub until -// the input Runner calls the `Stop()` method. -func (a *azureInput) Run() { - // `Run` is invoked periodically by the input Runner. The `sync.Once` - // guarantees that we only start the worker once during the first - // invocation. - a.workerOnce.Do(func() { - a.log.Infof("%s input worker is starting.", inputName) - - // We set up the metrics in the `Run()` method and tear them down - // in the `Stop()` method. - // - // The factory method `NewInput` is not a viable solution because - // the Runner invokes it during the configuration check without - // calling the `Stop()` function; this causes panics - // due to multiple metrics registrations. - a.metrics = newInputMetrics(a.id, nil) - - err := a.runWithEPH() - if err != nil { - a.log.Errorw("error starting the input worker", "error", err) - return - } - a.log.Infof("%s input worker has started.", inputName) +func createPipelineClient(pipeline beat.Pipeline) (beat.Client, error) { + return pipeline.ConnectWith(beat.ClientConfig{ + Processing: beat.ProcessingConfig{ + // This input only produces events with basic types so normalization + // is not required. + EventNormalization: to.Ptr(false), + }, }) } -// Stop stops `azure-eventhub` input. -func (a *azureInput) Stop() { - a.log.Infof("%s input worker is stopping.", inputName) - if a.processor != nil { - // Tells the processor to stop processing events and release all - // resources (like scheduler, leaser, checkpointer, and client). - err := a.processor.Close(context.Background()) - if err != nil { - a.log.Errorw("error while closing eventhostprocessor", "error", err) - } - } - - if a.metrics != nil { - a.metrics.Close() - } - - a.workerCancel() - a.log.Infof("%s input worker has stopped.", inputName) -} - -// Wait stop the current server -func (a *azureInput) Wait() { - a.Stop() -} - -func (a *azureInput) processEvents(event *eventhub.Event, partitionID string) bool { - processingStartTime := time.Now() - azure := mapstr.M{ - // partitionID is only mapped in the non-eph option which is not available yet, this field will be temporary unavailable - //"partition_id": partitionID, - "eventhub": a.config.EventHubName, - "consumer_group": a.config.ConsumerGroup, - } - - // update the input metrics - a.metrics.receivedMessages.Inc() - a.metrics.receivedBytes.Add(uint64(len(event.Data))) - - records := a.parseMultipleRecords(event.Data) - - for _, record := range records { - _, _ = azure.Put("offset", event.SystemProperties.Offset) - _, _ = azure.Put("sequence_number", event.SystemProperties.SequenceNumber) - _, _ = azure.Put("enqueued_time", event.SystemProperties.EnqueuedTime) - ok := a.outlet.OnEvent(beat.Event{ - // this is the default value for the @timestamp field; usually the ingest - // pipeline replaces it with a value in the payload. - Timestamp: processingStartTime, - Fields: mapstr.M{ - "message": record, - "azure": azure, - }, - Private: event.Data, - }) - if !ok { - a.metrics.processingTime.Update(time.Since(processingStartTime).Nanoseconds()) - return ok - } - - a.metrics.sentEvents.Inc() - } - - a.metrics.processedMessages.Inc() - a.metrics.processingTime.Update(time.Since(processingStartTime).Nanoseconds()) - - return true -} - -// parseMultipleRecords will try to split the message into multiple ones based on the group field provided by the configuration -func (a *azureInput) parseMultipleRecords(bMessage []byte) []string { - var mapObject map[string][]interface{} - var messages []string - - // Clean up the message for known issues [1] where Azure services produce malformed JSON documents. - // Sanitization occurs if options are available and the message contains an invalid JSON. - // - // [1]: https://learn.microsoft.com/en-us/answers/questions/1001797/invalid-json-logs-produced-for-function-apps - if len(a.config.SanitizeOptions) != 0 && !json.Valid(bMessage) { - bMessage = sanitize(bMessage, a.config.SanitizeOptions...) - a.metrics.sanitizedMessages.Inc() - } - - // check if the message is a "records" object containing a list of events - err := json.Unmarshal(bMessage, &mapObject) - if err == nil { - if len(mapObject[expandEventListFromField]) > 0 { - for _, ms := range mapObject[expandEventListFromField] { - js, err := json.Marshal(ms) - if err == nil { - messages = append(messages, string(js)) - a.metrics.receivedEvents.Inc() - } else { - a.log.Errorw(fmt.Sprintf("serializing message %s", ms), "error", err) - } - } - } - } else { - a.log.Debugf("deserializing multiple messages to a `records` object returning error: %s", err) - // in some cases the message is an array - var arrayObject []interface{} - err = json.Unmarshal(bMessage, &arrayObject) - if err != nil { - // return entire message - a.log.Debugf("deserializing multiple messages to an array returning error: %s", err) - a.metrics.decodeErrors.Inc() - return []string{string(bMessage)} - } - - for _, ms := range arrayObject { - js, err := json.Marshal(ms) - if err == nil { - messages = append(messages, string(js)) - a.metrics.receivedEvents.Inc() - } else { - a.log.Errorw(fmt.Sprintf("serializing message %s", ms), "error", err) - } - } - } - - return messages -} - // Strip connection string to remove sensitive information // A connection string should look like this: // Endpoint=sb://dummynamespace.servicebus.windows.net/;SharedAccessKeyName=DummyAccessKeyName;SharedAccessKey=5dOntTRytoC24opYThisAsit3is2B+OGY1US/fuL3ly= diff --git a/x-pack/filebeat/input/azureeventhub/input_test.go b/x-pack/filebeat/input/azureeventhub/input_test.go index a5fab488dfc..64f0a1b68e6 100644 --- a/x-pack/filebeat/input/azureeventhub/input_test.go +++ b/x-pack/filebeat/input/azureeventhub/input_test.go @@ -12,20 +12,19 @@ import ( "testing" "time" + "github.com/Azure/go-autorest/autorest/azure" + "github.com/elastic/elastic-agent-libs/logp" - "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/elastic-agent-libs/monitoring" eventhub "github.com/Azure/azure-event-hubs-go/v3" "github.com/stretchr/testify/assert" - "github.com/elastic/beats/v7/filebeat/channel" - "github.com/elastic/beats/v7/filebeat/input/inputtest" "github.com/elastic/beats/v7/libbeat/beat" - conf "github.com/elastic/elastic-agent-libs/config" ) -var config = azureInputConfig{ +var defaultTestConfig = azureInputConfig{ SAKey: "", SAName: "", SAContainer: ephContainerName, @@ -33,21 +32,38 @@ var config = azureInputConfig{ ConsumerGroup: "", } +func TestGetAzureEnvironment(t *testing.T) { + resMan := "" + env, err := getAzureEnvironment(resMan) + assert.NoError(t, err) + assert.Equal(t, env, azure.PublicCloud) + resMan = "https://management.microsoftazure.de/" + env, err = getAzureEnvironment(resMan) + assert.NoError(t, err) + assert.Equal(t, env, azure.GermanCloud) + resMan = "http://management.invalidhybrid.com/" + _, err = getAzureEnvironment(resMan) + assert.Errorf(t, err, "invalid character 'F' looking for beginning of value") + resMan = "" + env, err = getAzureEnvironment(resMan) + assert.NoError(t, err) + assert.Equal(t, env, azure.PublicCloud) +} + func TestProcessEvents(t *testing.T) { - // Stub outlet for receiving events generated by the input. - o := &stubOutleter{} - out, err := newStubOutlet(o) - if err != nil { - t.Fatal(err) - } + log := logp.NewLogger(fmt.Sprintf("%s test for input", inputName)) + reg := monitoring.NewRegistry() metrics := newInputMetrics("test", reg) defer metrics.Close() - input := azureInput{ - config: config, - metrics: metrics, - outlet: out, + fakePipelineClient := fakeClient{} + + input := eventHubInputV1{ + config: defaultTestConfig, + log: log, + metrics: metrics, + pipelineClient: &fakePipelineClient, } var sn int64 = 12 now := time.Now() @@ -67,12 +83,10 @@ func TestProcessEvents(t *testing.T) { Data: []byte(msg), SystemProperties: &properties, } - ok := input.processEvents(&ev, "0") - if !ok { - t.Fatal("OnEvent function returned false") - } - assert.Equal(t, len(o.Events), 1) - message, err := o.Events[0].Fields.GetValue("message") + input.processEvents(&ev) + + assert.Equal(t, len(fakePipelineClient.publishedEvents), 1) + message, err := fakePipelineClient.publishedEvents[0].Fields.GetValue("message") if err != nil { t.Fatal(err) } @@ -94,12 +108,16 @@ func TestParseMultipleRecords(t *testing.T) { metrics := newInputMetrics("test", reg) defer metrics.Close() - input := azureInput{ - metrics: metrics, - log: logp.NewLogger(fmt.Sprintf("%s test for input", inputName)), + fakePipelineClient := fakeClient{} + + input := eventHubInputV1{ + config: azureInputConfig{}, + log: logp.NewLogger(fmt.Sprintf("%s test for input", inputName)), + metrics: metrics, + pipelineClient: &fakePipelineClient, } - messages := input.parseMultipleRecords([]byte(msg)) + messages := input.unpackRecords([]byte(msg)) assert.NotNil(t, messages) assert.Equal(t, len(messages), 3) for _, ms := range messages { @@ -110,7 +128,7 @@ func TestParseMultipleRecords(t *testing.T) { msg1 := "[{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," + "{\"test\":\"this is 2nd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," + "{\"test\":\"this is 3rd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}]" - messages = input.parseMultipleRecords([]byte(msg1)) + messages = input.unpackRecords([]byte(msg1)) assert.NotNil(t, messages) assert.Equal(t, len(messages), 3) for _, ms := range messages { @@ -119,7 +137,7 @@ func TestParseMultipleRecords(t *testing.T) { // one event only msg2 := "{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}" - messages = input.parseMultipleRecords([]byte(msg2)) + messages = input.unpackRecords([]byte(msg2)) assert.NotNil(t, messages) assert.Equal(t, len(messages), 1) for _, ms := range messages { @@ -127,15 +145,16 @@ func TestParseMultipleRecords(t *testing.T) { } } -func TestNewInputDone(t *testing.T) { - config := mapstr.M{ - "connection_string": "Endpoint=sb://something", - "eventhub": "insights-operational-logs", - "storage_account": "someaccount", - "storage_account_key": "secret", - } - inputtest.AssertNotStartedInputCanBeDone(t, NewInput, &config) -} +//func TestNewInputDone(t *testing.T) { +// log := logp.NewLogger(fmt.Sprintf("%s test for input", inputName)) +// config := mapstr.M{ +// "connection_string": "Endpoint=sb://something", +// "eventhub": "insights-operational-logs", +// "storage_account": "someaccount", +// "storage_account_key": "secret", +// } +// inputtest.AssertNotStartedInputCanBeDone(t, NewInput, &config) +//} func TestStripConnectionString(t *testing.T) { tests := []struct { @@ -161,36 +180,22 @@ func TestStripConnectionString(t *testing.T) { } } -type stubOutleter struct { +// ackClient is a fake beat.Client that ACKs the published messages. +type fakeClient struct { sync.Mutex - cond *sync.Cond - done bool - Events []beat.Event + publishedEvents []beat.Event } -func newStubOutlet(stub *stubOutleter) (channel.Outleter, error) { - stub.cond = sync.NewCond(stub) - defer stub.Close() +func (c *fakeClient) Close() error { return nil } - connector := channel.ConnectorFunc(func(_ *conf.C, _ beat.ClientConfig) (channel.Outleter, error) { - return stub, nil - }) - return connector.ConnectWith(nil, beat.ClientConfig{ - Processing: beat.ProcessingConfig{}, - }) +func (c *fakeClient) Publish(event beat.Event) { + c.Lock() + defer c.Unlock() + c.publishedEvents = append(c.publishedEvents, event) } -func (o *stubOutleter) Close() error { - o.Lock() - defer o.Unlock() - o.done = true - return nil -} -func (o *stubOutleter) Done() <-chan struct{} { return nil } -func (o *stubOutleter) OnEvent(event beat.Event) bool { - o.Lock() - defer o.Unlock() - o.Events = append(o.Events, event) - o.cond.Broadcast() - return o.done +func (c *fakeClient) PublishAll(event []beat.Event) { + for _, e := range event { + c.Publish(e) + } } diff --git a/x-pack/filebeat/input/azureeventhub/metrics.go b/x-pack/filebeat/input/azureeventhub/metrics.go index 8aeabc57265..efef262d2c0 100644 --- a/x-pack/filebeat/input/azureeventhub/metrics.go +++ b/x-pack/filebeat/input/azureeventhub/metrics.go @@ -3,7 +3,6 @@ // you may not use this file except in compliance with the Elastic License. //go:build !aix -// +build !aix package azureeventhub @@ -15,7 +14,7 @@ import ( "github.com/elastic/elastic-agent-libs/monitoring/adapter" ) -// newInputMetrics creates a new `*inputMetrics` to track metrics. +// newInputMetrics creates a new `*inputMetrics` to track input metrics. func newInputMetrics(id string, parentRegistry *monitoring.Registry) *inputMetrics { reg, unregister := inputmon.NewInputRegistry(inputName, id, parentRegistry) inputMetrics := inputMetrics{ diff --git a/x-pack/filebeat/input/azureeventhub/metrics_test.go b/x-pack/filebeat/input/azureeventhub/metrics_test.go index b6730c34956..52b9f008f5c 100644 --- a/x-pack/filebeat/input/azureeventhub/metrics_test.go +++ b/x-pack/filebeat/input/azureeventhub/metrics_test.go @@ -117,18 +117,13 @@ func TestInputMetricsEventsReceived(t *testing.T) { reg := monitoring.NewRegistry() metrics := newInputMetrics("test", reg) - // Stub outlet for receiving events generated by the input. - o := &stubOutleter{} - out, err := newStubOutlet(o) - if err != nil { - t.Fatal(err) - } + fakeClient := fakeClient{} - input := azureInput{ - config: inputConfig, - metrics: metrics, - outlet: out, - log: log, + input := eventHubInputV1{ + config: inputConfig, + metrics: metrics, + pipelineClient: &fakeClient, + log: log, } ev := eventhub.Event{ @@ -136,13 +131,10 @@ func TestInputMetricsEventsReceived(t *testing.T) { SystemProperties: &properties, } - ok := input.processEvents(&ev, "0") - if !ok { - t.Fatal("OnEvent function returned false") - } + input.processEvents(&ev) - if ok := assert.Equal(t, len(tc.expectedRecords), len(o.Events)); ok { - for i, e := range o.Events { + if ok := assert.Equal(t, len(tc.expectedRecords), len(fakeClient.publishedEvents)); ok { + for i, e := range fakeClient.publishedEvents { msg, err := e.Fields.GetValue("message") if err != nil { t.Fatal(err) diff --git a/x-pack/filebeat/input/azureeventhub/sanitization_test.go b/x-pack/filebeat/input/azureeventhub/sanitization_test.go index f4d072f5036..6e2645c40d7 100644 --- a/x-pack/filebeat/input/azureeventhub/sanitization_test.go +++ b/x-pack/filebeat/input/azureeventhub/sanitization_test.go @@ -3,7 +3,6 @@ // you may not use this file except in compliance with the Elastic License. //go:build !aix -// +build !aix package azureeventhub @@ -31,15 +30,16 @@ func TestParseMultipleRecordsSanitization(t *testing.T) { metrics := newInputMetrics("test", reg) defer metrics.Close() - input := azureInput{ + input := eventHubInputV1{ config: azureInputConfig{ SanitizeOptions: []string{"SINGLE_QUOTES", "NEW_LINES"}, }, - metrics: metrics, - log: logp.NewLogger(fmt.Sprintf("%s test for input", inputName)), + log: logp.NewLogger(fmt.Sprintf("%s test for input", inputName)), + metrics: metrics, + pipelineClient: &fakeClient{}, } - messages := input.parseMultipleRecords([]byte(msg)) + messages := input.unpackRecords([]byte(msg)) assert.NotNil(t, messages) assert.Equal(t, len(messages), 3) for _, ms := range messages { diff --git a/x-pack/filebeat/input/azureeventhub/v1_input.go b/x-pack/filebeat/input/azureeventhub/v1_input.go new file mode 100644 index 00000000000..43573c812b6 --- /dev/null +++ b/x-pack/filebeat/input/azureeventhub/v1_input.go @@ -0,0 +1,359 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build !aix + +package azureeventhub + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/Azure/go-autorest/autorest/azure" + + eventhub "github.com/Azure/azure-event-hubs-go/v3" + "github.com/Azure/azure-event-hubs-go/v3/eph" + "github.com/Azure/azure-event-hubs-go/v3/storage" + "github.com/Azure/azure-storage-blob-go/azblob" + + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/mapstr" +) + +// eventHubInputV1 is the Azure Event Hub input V1. +// +// This input uses the Azure Event Hub SDK v3 (legacy). +type eventHubInputV1 struct { + config azureInputConfig + log *logp.Logger + metrics *inputMetrics + processor *eph.EventProcessorHost + pipelineClient beat.Client +} + +// newEventHubInputV1 creates a new instance of the Azure Event Hub input V1. +// This input uses the Azure Event Hub SDK v3 (legacy). +func newEventHubInputV1(config azureInputConfig, logger *logp.Logger) (v2.Input, error) { + log := logger. + Named(inputName). + With( + "connection string", stripConnectionString(config.ConnectionString), + ) + + return &eventHubInputV1{ + config: config, + log: log, + }, nil +} + +func (in *eventHubInputV1) Name() string { + return inputName +} + +func (in *eventHubInputV1) Test(v2.TestContext) error { + return nil +} + +func (in *eventHubInputV1) Run( + inputContext v2.Context, + pipeline beat.Pipeline, +) error { + var err error + + // Create pipelineClient for publishing events. + in.pipelineClient, err = createPipelineClient(pipeline) + if err != nil { + return fmt.Errorf("failed to create pipeline pipelineClient: %w", err) + } + defer in.pipelineClient.Close() + + // Setup input metrics + in.metrics = newInputMetrics(inputContext.ID, nil) + defer in.metrics.Close() + + ctx := v2.GoContextFromCanceler(inputContext.Cancelation) + + // Initialize the input components + // in preparation for the main run loop. + err = in.setup(ctx) + if err != nil { + return err + } + + // Start the main run loop + err = in.run(ctx) + if err != nil { + in.log.Errorw("error running input", "error", err) + return err + } + + return nil +} + +// setup initializes the input components. +// +// The main components are: +// 1. Azure Storage Leaser / Checkpointer +// 2. Event Processor Host +// 3. Message handler +func (in *eventHubInputV1) setup(ctx context.Context) error { + + // ---------------------------------------------------- + // 1 — Create a new Azure Storage Leaser / Checkpointer + // ---------------------------------------------------- + + cred, err := azblob.NewSharedKeyCredential(in.config.SAName, in.config.SAKey) + if err != nil { + return err + } + + env, err := getAzureEnvironment(in.config.OverrideEnvironment) + if err != nil { + return err + } + + leaserCheckpointer, err := storage.NewStorageLeaserCheckpointer(cred, in.config.SAName, in.config.SAContainer, env) + if err != nil { + in.log.Errorw("error creating storage leaser checkpointer", "error", err) + return err + } + + in.log.Infof("storage leaser checkpointer created for container %q", in.config.SAContainer) + + // ------------------------------------------------ + // 2 — Create a new event processor host + // ------------------------------------------------ + + // adding a nil EventProcessorHostOption will break the code, + // this is why a condition is added and a.processor is assigned. + if in.config.ConsumerGroup != "" { + in.processor, err = eph.NewFromConnectionString( + ctx, + fmt.Sprintf("%s%s%s", in.config.ConnectionString, eventHubConnector, in.config.EventHubName), + leaserCheckpointer, + leaserCheckpointer, + eph.WithConsumerGroup(in.config.ConsumerGroup), + eph.WithNoBanner()) + } else { + in.processor, err = eph.NewFromConnectionString( + ctx, + fmt.Sprintf("%s%s%s", in.config.ConnectionString, eventHubConnector, in.config.EventHubName), + leaserCheckpointer, + leaserCheckpointer, + eph.WithNoBanner()) + } + if err != nil { + in.log.Errorw("error creating processor", "error", err) + return err + } + + in.log.Infof("event processor host created for event hub %q", in.config.EventHubName) + + // ------------------------------------------------ + // 3 — Register a message handler + // ------------------------------------------------ + + // register a message handler -- many can be registered + handlerID, err := in.processor.RegisterHandler(ctx, func(c context.Context, e *eventhub.Event) error { + + // Take the event message from the event hub, + // creates and publishes one (or more) events + // to the beats pipeline. + in.processEvents(e) + + // Why is this function always returning no error? + // + // The legacy SDK does not offer hooks to control + // checkpointing (it internally updates the checkpoint + // info after a successful handler execution). + // + // So we are keeping the existing behaviour (do not + // handle publish acks). + // + // On shutdown, Filebeat stops the input, waits for + // the output to process all the events in the queue. + return nil + }) + if err != nil { + in.log.Errorw("error registering handler", "error", err) + return err + } + + in.log.Infof("handler id: %q is registered\n", handlerID) + + return nil +} + +func (in *eventHubInputV1) run(ctx context.Context) error { + // Start handling messages from all the partitions balancing across + // multiple consumers. + // The processor can be stopped by calling `Close()` on the processor. + + // The `Start()` function is not an option because + // it waits for an `os.Interrupt` signal to stop + // the processor. + err := in.processor.StartNonBlocking(ctx) + if err != nil { + in.log.Errorw("error starting the processor", "error", err) + return err + } + defer func() { + in.log.Infof("%s input worker is stopping.", inputName) + err := in.processor.Close(context.Background()) + if err != nil { + in.log.Errorw("error while closing eventhostprocessor", "error", err) + } + in.log.Infof("%s input worker has stopped.", inputName) + }() + + in.log.Infof("%s input worker has started.", inputName) + + // wait for the context to be done + <-ctx.Done() + + return ctx.Err() +} + +func (in *eventHubInputV1) processEvents(event *eventhub.Event) { + processingStartTime := time.Now() + eventHubMetadata := mapstr.M{ + // The `partition_id` is not available in the + // current version of the SDK. + "eventhub": in.config.EventHubName, + "consumer_group": in.config.ConsumerGroup, + } + + // update the input metrics + in.metrics.receivedMessages.Inc() + in.metrics.receivedBytes.Add(uint64(len(event.Data))) + + records := in.unpackRecords(event.Data) + + for _, record := range records { + _, _ = eventHubMetadata.Put("offset", event.SystemProperties.Offset) + _, _ = eventHubMetadata.Put("sequence_number", event.SystemProperties.SequenceNumber) + _, _ = eventHubMetadata.Put("enqueued_time", event.SystemProperties.EnqueuedTime) + + event := beat.Event{ + // We set the timestamp to the processing + // start time as default value. + // + // Usually, the ingest pipeline replaces it + // with a value in the payload. + Timestamp: processingStartTime, + Fields: mapstr.M{ + "message": record, + "azure": eventHubMetadata, + }, + Private: event.Data, + } + + in.pipelineClient.Publish(event) + + in.metrics.sentEvents.Inc() + } + + in.metrics.processedMessages.Inc() + in.metrics.processingTime.Update(time.Since(processingStartTime).Nanoseconds()) +} + +// unpackRecords will try to split the message into multiple ones based on +// the group field provided by the configuration. +// +// `unpackRecords()` supports two types of messages: +// +// 1. A message with an object with a `records` +// field containing a list of events. +// 2. A message with a single event. +// +// (1) Here is an example of a message containing an object with +// a `records` field: +// +// { +// "records": [ +// { +// "time": "2019-12-17T13:43:44.4946995Z", +// "test": "this is some message" +// } +// ] +// } +// +// (2) Here is an example of a message with a single event: +// +// { +// "time": "2019-12-17T13:43:44.4946995Z", +// "test": "this is some message" +// } +// +// The Diagnostic Settings uses the single object with `records` +// fields (1) when exporting data from an Azure service to an +// event hub. This is the most common case. +func (in *eventHubInputV1) unpackRecords(bMessage []byte) []string { + var mapObject map[string][]interface{} + var messages []string + + // Clean up the message for known issues [1] where Azure services produce malformed JSON documents. + // Sanitization occurs if options are available and the message contains an invalid JSON. + // + // [1]: https://learn.microsoft.com/en-us/answers/questions/1001797/invalid-json-logs-produced-for-function-apps + if len(in.config.SanitizeOptions) != 0 && !json.Valid(bMessage) { + bMessage = sanitize(bMessage, in.config.SanitizeOptions...) + in.metrics.sanitizedMessages.Inc() + } + + // check if the message is a "records" object containing a list of events + err := json.Unmarshal(bMessage, &mapObject) + if err == nil { + if len(mapObject[expandEventListFromField]) > 0 { + for _, ms := range mapObject[expandEventListFromField] { + js, err := json.Marshal(ms) + if err == nil { + messages = append(messages, string(js)) + in.metrics.receivedEvents.Inc() + } else { + in.log.Errorw(fmt.Sprintf("serializing message %s", ms), "error", err) + } + } + } + } else { + in.log.Debugf("deserializing multiple messages to a `records` object returning error: %s", err) + // in some cases the message is an array + var arrayObject []interface{} + err = json.Unmarshal(bMessage, &arrayObject) + if err != nil { + // return entire message + in.log.Debugf("deserializing multiple messages to an array returning error: %s", err) + in.metrics.decodeErrors.Inc() + return []string{string(bMessage)} + } + + for _, ms := range arrayObject { + js, err := json.Marshal(ms) + if err == nil { + messages = append(messages, string(js)) + in.metrics.receivedEvents.Inc() + } else { + in.log.Errorw(fmt.Sprintf("serializing message %s", ms), "error", err) + } + } + } + + return messages +} + +func getAzureEnvironment(overrideResManager string) (azure.Environment, error) { + // if no override is set then the azure public cloud is used + if overrideResManager == "" || overrideResManager == "" { + return azure.PublicCloud, nil + } + if env, ok := environments[overrideResManager]; ok { + return env, nil + } + // can retrieve hybrid env from the resource manager endpoint + return azure.EnvironmentFromURL(overrideResManager) +} diff --git a/x-pack/filebeat/input/default-inputs/inputs_other.go b/x-pack/filebeat/input/default-inputs/inputs_other.go index 2fa63535dbb..6c7708d7c0f 100644 --- a/x-pack/filebeat/input/default-inputs/inputs_other.go +++ b/x-pack/filebeat/input/default-inputs/inputs_other.go @@ -13,6 +13,7 @@ import ( "github.com/elastic/beats/v7/x-pack/filebeat/input/awscloudwatch" "github.com/elastic/beats/v7/x-pack/filebeat/input/awss3" "github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage" + "github.com/elastic/beats/v7/x-pack/filebeat/input/azureeventhub" "github.com/elastic/beats/v7/x-pack/filebeat/input/benchmark" "github.com/elastic/beats/v7/x-pack/filebeat/input/cel" "github.com/elastic/beats/v7/x-pack/filebeat/input/cloudfoundry" @@ -31,6 +32,7 @@ import ( func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2.Plugin { return []v2.Plugin{ azureblobstorage.Plugin(log, store), + azureeventhub.Plugin(log), cel.Plugin(log, store), cloudfoundry.Plugin(), entityanalytics.Plugin(log),