diff --git a/NOTICE.txt b/NOTICE.txt index 562764723fe5..5a04ef0ca55a 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -1742,11 +1742,11 @@ SOFTWARE. -------------------------------------------------------------------------------- Dependency : github.com/Azure/azure-sdk-for-go/sdk/azcore -Version: v1.9.0 +Version: v1.11.1 Licence type (autodetected): MIT -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/!azure/azure-sdk-for-go/sdk/azcore@v1.9.0/LICENSE.txt: +Contents of probable licence file $GOMODCACHE/github.com/!azure/azure-sdk-for-go/sdk/azcore@v1.11.1/LICENSE.txt: MIT License @@ -1773,11 +1773,11 @@ SOFTWARE -------------------------------------------------------------------------------- Dependency : github.com/Azure/azure-sdk-for-go/sdk/azidentity -Version: v1.4.0 +Version: v1.5.2 Licence type (autodetected): MIT -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/!azure/azure-sdk-for-go/sdk/azidentity@v1.4.0/LICENSE.txt: +Contents of probable licence file $GOMODCACHE/github.com/!azure/azure-sdk-for-go/sdk/azidentity@v1.5.2/LICENSE.txt: MIT License @@ -1954,11 +1954,11 @@ SOFTWARE. -------------------------------------------------------------------------------- Dependency : github.com/Azure/azure-sdk-for-go/sdk/storage/azblob -Version: v1.0.0 +Version: v1.3.2 Licence type (autodetected): MIT -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/!azure/azure-sdk-for-go/sdk/storage/azblob@v1.0.0/LICENSE.txt: +Contents of probable licence file $GOMODCACHE/github.com/!azure/azure-sdk-for-go/sdk/storage/azblob@v1.3.2/LICENSE.txt: MIT License @@ -17511,11 +17511,11 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -------------------------------------------------------------------------------- Dependency : github.com/golang-jwt/jwt/v5 -Version: v5.0.0 +Version: v5.2.1 Licence type (autodetected): MIT -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/golang-jwt/jwt/v5@v5.0.0/LICENSE: +Contents of probable licence file $GOMODCACHE/github.com/golang-jwt/jwt/v5@v5.2.1/LICENSE: Copyright (c) 2012 Dave Grijalva Copyright (c) 2021 golang-jwt maintainers @@ -29001,11 +29001,11 @@ Contents of probable licence file $GOMODCACHE/github.com/!azure/azure-pipeline-g -------------------------------------------------------------------------------- Dependency : github.com/Azure/azure-sdk-for-go/sdk/internal -Version: v1.5.0 +Version: v1.7.0 Licence type (autodetected): MIT -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/!azure/azure-sdk-for-go/sdk/internal@v1.5.0/LICENSE.txt: +Contents of probable licence file $GOMODCACHE/github.com/!azure/azure-sdk-for-go/sdk/internal@v1.7.0/LICENSE.txt: MIT License @@ -29122,13 +29122,43 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +-------------------------------------------------------------------------------- +Dependency : github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage +Version: v1.5.0 +Licence type (autodetected): MIT +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/github.com/!azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage@v1.5.0/LICENSE.txt: + +MIT License + +Copyright (c) Microsoft Corporation. All rights reserved. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + -------------------------------------------------------------------------------- Dependency : github.com/Azure/go-amqp -Version: v1.0.0 +Version: v1.0.5 Licence type (autodetected): MIT -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/!azure/go-amqp@v1.0.0/LICENSE: +Contents of probable licence file $GOMODCACHE/github.com/!azure/go-amqp@v1.0.5/LICENSE: MIT License @@ -30826,11 +30856,11 @@ SOFTWARE. -------------------------------------------------------------------------------- Dependency : github.com/AzureAD/microsoft-authentication-library-for-go -Version: v1.1.1 +Version: v1.2.2 Licence type (autodetected): MIT -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/!azure!a!d/microsoft-authentication-library-for-go@v1.1.1/LICENSE: +Contents of probable licence file $GOMODCACHE/github.com/!azure!a!d/microsoft-authentication-library-for-go@v1.2.2/LICENSE: MIT License @@ -36984,40 +37014,6 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. --------------------------------------------------------------------------------- -Dependency : github.com/dnaeon/go-vcr -Version: v1.2.0 -Licence type (autodetected): BSD-2-Clause --------------------------------------------------------------------------------- - -Contents of probable licence file $GOMODCACHE/github.com/dnaeon/go-vcr@v1.2.0/LICENSE: - -Copyright (c) 2015-2016 Marin Atanasov Nikolov -All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions -are met: - - 1. Redistributions of source code must retain the above copyright - notice, this list of conditions and the following disclaimer - in this position and unchanged. - 2. Redistributions in binary form must reproduce the above copyright - notice, this list of conditions and the following disclaimer in the - documentation and/or other materials provided with the distribution. - -THIS SOFTWARE IS PROVIDED BY THE AUTHOR(S) ``AS IS'' AND ANY EXPRESS OR -IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES -OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. -IN NO EVENT SHALL THE AUTHOR(S) BE LIABLE FOR ANY DIRECT, INDIRECT, -INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT -NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF -THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - - -------------------------------------------------------------------------------- Dependency : github.com/dnephin/pflag Version: v1.0.7 @@ -45608,11 +45604,11 @@ SOFTWARE. -------------------------------------------------------------------------------- Dependency : github.com/joho/godotenv -Version: v1.3.0 +Version: v1.5.1 Licence type (autodetected): MIT -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/joho/godotenv@v1.3.0/LICENCE: +Contents of probable licence file $GOMODCACHE/github.com/joho/godotenv@v1.5.1/LICENCE: Copyright (c) 2013 John Barton @@ -49585,11 +49581,11 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -------------------------------------------------------------------------------- Dependency : github.com/pkg/browser -Version: v0.0.0-20210911075715-681adbf594b8 +Version: v0.0.0-20240102092130-5ac0b6a4141c Licence type (autodetected): BSD-2-Clause -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/pkg/browser@v0.0.0-20210911075715-681adbf594b8/LICENSE: +Contents of probable licence file $GOMODCACHE/github.com/pkg/browser@v0.0.0-20240102092130-5ac0b6a4141c/LICENSE: Copyright (c) 2014, Dave Cheney All rights reserved. @@ -49899,11 +49895,11 @@ Contents of probable licence file $GOMODCACHE/github.com/prometheus/client_golan -------------------------------------------------------------------------------- Dependency : github.com/rogpeppe/go-internal -Version: v1.11.0 +Version: v1.12.0 Licence type (autodetected): BSD-3-Clause -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/rogpeppe/go-internal@v1.11.0/LICENSE: +Contents of probable licence file $GOMODCACHE/github.com/rogpeppe/go-internal@v1.12.0/LICENSE: Copyright (c) 2018 The Go Authors. All rights reserved. diff --git a/go.mod b/go.mod index 9d71206e92c7..6bf9204f4935 100644 --- a/go.mod +++ b/go.mod @@ -180,14 +180,14 @@ require ( cloud.google.com/go v0.110.8 cloud.google.com/go/compute v1.23.0 cloud.google.com/go/redis v1.13.1 - github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.0 - github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.4.0 + github.com/Azure/azure-sdk-for-go/sdk/azcore v1.11.1 + github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.2 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/consumption/armconsumption v1.1.0 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/containerservice/armcontainerservice/v4 v4.6.0 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/costmanagement/armcostmanagement v1.1.0 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor v0.8.0 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.1.1 - github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0 + github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.2 github.com/Azure/go-autorest/autorest/adal v0.9.21 github.com/apache/arrow/go/v14 v14.0.2 github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.1 @@ -209,7 +209,7 @@ require ( github.com/elastic/toutoumomoma v0.0.0-20221026030040-594ef30cb640 github.com/foxcpp/go-mockdns v0.0.0-20201212160233-ede2f9158d15 github.com/go-ldap/ldap/v3 v3.4.6 - github.com/golang-jwt/jwt/v5 v5.0.0 + github.com/golang-jwt/jwt/v5 v5.2.1 github.com/google/cel-go v0.19.0 github.com/googleapis/gax-go/v2 v2.12.0 github.com/gorilla/handlers v1.5.1 @@ -240,15 +240,15 @@ require ( github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 // indirect github.com/Azure/azure-amqp-common-go/v4 v4.2.0 // indirect github.com/Azure/azure-pipeline-go v0.2.3 // indirect - github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.0 // indirect - github.com/Azure/go-amqp v1.0.0 // indirect + github.com/Azure/azure-sdk-for-go/sdk/internal v1.7.0 // indirect + github.com/Azure/go-amqp v1.0.5 // indirect github.com/Azure/go-autorest v14.2.0+incompatible // indirect github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect github.com/Azure/go-autorest/autorest/validation v0.3.1 // indirect github.com/Azure/go-autorest/logger v0.2.1 // indirect github.com/Azure/go-autorest/tracing v0.6.0 // indirect github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 // indirect - github.com/AzureAD/microsoft-authentication-library-for-go v1.1.1 // indirect + github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 // indirect github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect github.com/andybalholm/brotli v1.0.5 // indirect github.com/antlr4-go/antlr/v4 v4.13.0 // indirect @@ -320,6 +320,7 @@ require ( github.com/jcmturner/gokrb5/v8 v8.4.2 // indirect github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect + github.com/joho/godotenv v1.5.1 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/karrick/godirwalk v1.17.0 // indirect @@ -348,7 +349,7 @@ require ( github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.0.2 // indirect github.com/pierrec/lz4 v2.6.0+incompatible // indirect - github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect + github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/prometheus/client_golang v1.11.1 // indirect github.com/rootless-containers/rootlesskit v1.1.0 // indirect diff --git a/go.sum b/go.sum index ed2a429b6476..12ad7d6a691c 100644 --- a/go.sum +++ b/go.sum @@ -96,18 +96,18 @@ github.com/Azure/azure-sdk-for-go/sdk/azcore v0.19.0/go.mod h1:h6H6c8enJmmocHUbL github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.3.0/go.mod h1:tZoQYdDZNOiIjdSn0dVWVfl0NEPGOJqVLzSrcFk4Is0= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.4.0/go.mod h1:ON4tFdPTwRcgWEaVDrN3584Ef+b7GgSJaXxe5fW9t4M= -github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.0 h1:fb8kj/Dh4CSwgsOzHeZY4Xh68cFVbzXx+ONXGMY//4w= -github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.0/go.mod h1:uReU2sSxZExRPBAg3qKzmAucSi51+SP1OhohieR821Q= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.11.1 h1:E+OJmp2tPvt1W+amx48v1eqbjDYsgN+RzP4q16yV5eM= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.11.1/go.mod h1:a6xsAQUZg+VsS3TJ05SRp524Hs4pZ/AeFSr5ENf0Yjo= github.com/Azure/azure-sdk-for-go/sdk/azidentity v0.11.0/go.mod h1:HcM1YX14R7CJcghJGOYCgdezslRSVzqwLf/q+4Y2r/0= github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0/go.mod h1:bhXu1AjYL+wutSL/kpSq6s7733q2Rb0yuot9Zgfqa/0= -github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.4.0 h1:BMAjVKJM0U/CYF27gA0ZMmXGkOcvfFtD0oHVZ1TIPRI= -github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.4.0/go.mod h1:1fXstnBMas5kzG+S3q8UoJcmyU6nUeunJcMDHcRYHhs= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.2 h1:FDif4R1+UUR+00q6wquyX90K7A8dN+R5E8GEadoP7sU= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.2/go.mod h1:aiYBYui4BJ/BJCAIKs92XiPyQfTaBWqvHujDwKb6CBU= github.com/Azure/azure-sdk-for-go/sdk/internal v0.7.0/go.mod h1:yqy467j36fJxcRV2TzfVZ1pCb5vxm4BtZPUdYWe/Xo8= github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w= github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.1/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w= github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.2/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w= -github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.0 h1:d81/ng9rET2YqdVkVwkb6EXeRrLJIwyGnJcAlAWKwhs= -github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.0/go.mod h1:s4kgfzA0covAXNicZHDMN58jExvcng2mC/DepXiF1EI= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.7.0 h1:rTfKOCZGy5ViVrlA74ZPE99a+SgoEE2K/yg3RyW9dFA= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.7.0/go.mod h1:4OG6tQ9EOP/MT0NMjDlRzWoVFxfu9rN9B2X+tlSVktg= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/containerservice/armcontainerservice/v4 v4.6.0 h1:AAIdAyPkFff6XTct2lQCxOWN/+LnA41S7kIkzKaMbyE= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/containerservice/armcontainerservice/v4 v4.6.0/go.mod h1:noQIdW75SiQFB3mSFJBr4iRRH83S9skaFiBv4C0uEs0= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/costmanagement/armcostmanagement v1.1.0 h1:1MRED2aeLx/BPHC23XRtr8Mk6zcc70HNRYPQ73R0gHw= @@ -122,12 +122,15 @@ github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor v0.8.0 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor v0.8.0/go.mod h1:kzRLpzzlw6eBUXE7eBw3oqfmKR/kxaHOk4+h9sAe6Yo= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.1.1 h1:7CBQ+Ei8SP2c6ydQTGCCrS35bDxgTMfoP2miAwK++OU= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.1.1/go.mod h1:c/wcGeGx5FUPbM/JltUYHZcKmigwyVLJlDq+4HdtXaw= -github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0 h1:u/LLAOFgsMv7HmNL4Qufg58y+qElGOt5qv0z1mURkRY= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.5.0 h1:AifHbc4mg0x9zW52WOpKbsHaDKuRhlI7TVl47thgQ70= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.5.0/go.mod h1:T5RfihdXtBDxt1Ch2wobif3TvzTdumDy29kahv6AV9A= github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0/go.mod h1:2e8rMJtl2+2j+HXbTBwnyGpm5Nou7KhvSfxOq8JpTag= +github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.2 h1:YUUxeiOWgdAQE3pXt2H7QXzZs0q8UBjgRbl56qo8GYM= +github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.2/go.mod h1:dmXQgZuiSubAecswZE+Sm8jkvEa7kQgTPVRvwL/nd0E= github.com/Azure/azure-storage-blob-go v0.15.0 h1:rXtgp8tN1p29GvpGgfJetavIG0V7OgcSXPpwp3tx6qk= github.com/Azure/azure-storage-blob-go v0.15.0/go.mod h1:vbjsVbX0dlxnRc4FFMPsS9BsJWPcne7GB7onqlPvz58= -github.com/Azure/go-amqp v1.0.0 h1:QfCugi1M+4F2JDTRgVnRw7PYXLXZ9hmqk3+9+oJh3OA= -github.com/Azure/go-amqp v1.0.0/go.mod h1:+bg0x3ce5+Q3ahCEXnCsGG3ETpDQe3MEVnOuT2ywPwc= +github.com/Azure/go-amqp v1.0.5 h1:po5+ljlcNSU8xtapHTe8gIc8yHxCzC03E8afH2g1ftU= +github.com/Azure/go-amqp v1.0.5/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE= github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8= github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0= github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= @@ -182,8 +185,8 @@ github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBp github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 h1:mFRzDkZVAjdal+s7s0MwaRv9igoPqLRdzOLzw/8Xvq8= github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358/go.mod h1:chxPXzSsl7ZWRAuOIE23GDNzjWuZquvFlgA8xmpunjU= github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1/go.mod h1:Vt9sXTKwMyGcOxSmLDMnGPgqsUg7m8pe215qMLrDXw4= -github.com/AzureAD/microsoft-authentication-library-for-go v1.1.1 h1:WpB/QDNLpMw72xHJc34BNNykqSOeEJDAWkhf0u12/Jk= -github.com/AzureAD/microsoft-authentication-library-for-go v1.1.1/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI= +github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 h1:XHOnouVk1mxXfQidrMEnLlPk9UMeRtyBTnEFtxkV0kU= +github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= @@ -495,7 +498,6 @@ github.com/dlclark/regexp2 v1.4.0 h1:F1rxgk7p4uKjwIQxBs9oAXe5CqrXlCduYEJvrF4u93E github.com/dlclark/regexp2 v1.4.0/go.mod h1:2pZnwuY/m+8K6iRw6wQdMtk+rH5tNGR1i55kozfMjCc= github.com/dnaeon/go-vcr v1.0.1/go.mod h1:aBB1+wY4s93YsC3HHjMBMrwTj2R9FHDzUr9KyGc8n1E= github.com/dnaeon/go-vcr v1.1.0/go.mod h1:M7tiix8f0r6mKKJ3Yq/kqU1OYf3MnfmBWVbPx/yU9ko= -github.com/dnaeon/go-vcr v1.2.0 h1:zHCHvJYTMh1N7xnV7zf1m1GPBF9Ad0Jk/whtQ1663qI= github.com/dnaeon/go-vcr v1.2.0/go.mod h1:R4UdLID7HZT3taECzJs4YgbbH6PIGXB6W/sc5OLb6RQ= github.com/dnephin/pflag v1.0.7 h1:oxONGlWxhmUct0YzKTgrpQv9AUA1wtPBn7zuSjJqptk= github.com/dnephin/pflag v1.0.7/go.mod h1:uxE91IoWURlOiTUIA8Mq5ZZkAv3dPUfZNaT80Zm7OQE= @@ -866,8 +868,8 @@ github.com/golang-jwt/jwt/v4 v4.0.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzw github.com/golang-jwt/jwt/v4 v4.2.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg= github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOWzg= github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= -github.com/golang-jwt/jwt/v5 v5.0.0 h1:1n1XNM9hk7O9mnQoNBGolZvzebBQ7p93ULHRc28XJUE= -github.com/golang-jwt/jwt/v5 v5.0.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= +github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk= +github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe h1:lXe2qZdvpiX5WZkZR4hgp4KJVfY3nMkvmwbVkpv1rVY= github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= github.com/golang-sql/sqlexp v0.1.0 h1:ZCD6MBpcuOVfGVqsEmY5/4FtYiKz6tSyUv9LPEDei6A= @@ -1145,8 +1147,9 @@ github.com/jmoiron/sqlx v1.3.1 h1:aLN7YINNZ7cYOPK3QC83dbM6KT0NMqVMw961TqrejlE= github.com/jmoiron/sqlx v1.3.1/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ= github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 h1:rp+c0RAYOWj8l6qbCUTSiRLG/iKnW3K3/QfPPuSsBt4= github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901/go.mod h1:Z86h9688Y0wesXCyonoVr47MasHilkuLMqGhRZ4Hpak= -github.com/joho/godotenv v1.3.0 h1:Zjp+RcGpHhGlrMbJzXTrZZPrWj+1vfm90La1wgB6Bhc= github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg= +github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= +github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/jonboulle/clockwork v0.2.2 h1:UOGuzwb1PwsrDAObMuhUnj0p5ULPj8V/xJ7Kx9qUBdQ= github.com/jonboulle/clockwork v0.2.2/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8= @@ -1429,8 +1432,9 @@ github.com/pierrre/gotestcover v0.0.0-20160517101806-924dca7d15f0 h1:i5VIxp6QB8o github.com/pierrre/gotestcover v0.0.0-20160517101806-924dca7d15f0/go.mod h1:4xpMLz7RBWyB+ElzHu8Llua96TRCB3YwX+l5EP1wmHk= github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4/go.mod h1:4OwLy04Bl9Ef3GJJCoec+30X3LQs/0/m4HFRt/2LUSA= github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4/go.mod h1:N6UoU20jOqggOuDwUaBQpluzLNDqif3kq9z2wpdYEfQ= -github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 h1:KoWmjvw+nsYOo29YJK9vDA65RGE3NrOnUtO7a+RF9HU= github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI= +github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ= +github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -1511,8 +1515,8 @@ github.com/rogpeppe/go-internal v1.2.2/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= -github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= -github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= +github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= +github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/rootless-containers/rootlesskit v1.1.0 h1:cRaRIYxY8oce4eE/zeAUZhgKu/4tU1p9YHN4+suwV7M= github.com/rootless-containers/rootlesskit v1.1.0/go.mod h1:H+o9ndNe7tS91WqU0/+vpvc+VaCd7TCIWaJjnV0ujUo= github.com/rs/cors v1.6.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= @@ -2080,6 +2084,7 @@ golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220829200755-d48e67d00261/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/x-pack/filebeat/include/list.go b/x-pack/filebeat/include/list.go index fcca6f27de86..5e2cc02a4c94 100644 --- a/x-pack/filebeat/include/list.go +++ b/x-pack/filebeat/include/list.go @@ -15,7 +15,6 @@ import ( // Import packages that perform 'func init()'. _ "github.com/elastic/beats/v7/x-pack/filebeat/input/awscloudwatch" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/awss3" - _ "github.com/elastic/beats/v7/x-pack/filebeat/input/azureeventhub" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/cometd" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/etw" _ "github.com/elastic/beats/v7/x-pack/filebeat/input/gcppubsub" diff --git a/x-pack/filebeat/input/azureeventhub/eph.go b/x-pack/filebeat/input/azureeventhub/eph.go deleted file mode 100644 index f6981b4882e5..000000000000 --- a/x-pack/filebeat/input/azureeventhub/eph.go +++ /dev/null @@ -1,114 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -//go:build !aix - -package azureeventhub - -import ( - "context" - "errors" - "fmt" - - eventhub "github.com/Azure/azure-event-hubs-go/v3" - "github.com/Azure/azure-event-hubs-go/v3/eph" - "github.com/Azure/azure-event-hubs-go/v3/storage" - "github.com/Azure/azure-storage-blob-go/azblob" - "github.com/Azure/go-autorest/autorest/azure" -) - -// users can select from one of the already defined azure cloud envs -var environments = map[string]azure.Environment{ - azure.ChinaCloud.ResourceManagerEndpoint: azure.ChinaCloud, - azure.GermanCloud.ResourceManagerEndpoint: azure.GermanCloud, - azure.PublicCloud.ResourceManagerEndpoint: azure.PublicCloud, - azure.USGovernmentCloud.ResourceManagerEndpoint: azure.USGovernmentCloud, -} - -// runWithEPH will consume ingested events using the Event Processor Host (EPH). -// -// To learn more, check the following resources: -// - https://github.com/Azure/azure-event-hubs-go#event-processor-host -// - https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-event-processor-host -func (a *azureInput) runWithEPH() error { - // create a new Azure Storage Leaser / Checkpointer - cred, err := azblob.NewSharedKeyCredential(a.config.SAName, a.config.SAKey) - if err != nil { - return err - } - env, err := getAzureEnvironment(a.config.OverrideEnvironment) - if err != nil { - return err - } - leaserCheckpointer, err := storage.NewStorageLeaserCheckpointer(cred, a.config.SAName, a.config.SAContainer, env) - if err != nil { - a.log.Errorw("error creating storage leaser checkpointer", "error", err) - return err - } - - // adding a nil EventProcessorHostOption will break the code, - // this is why a condition is added and a.processor is assigned. - if a.config.ConsumerGroup != "" { - a.processor, err = eph.NewFromConnectionString( - a.workerCtx, - fmt.Sprintf("%s%s%s", a.config.ConnectionString, eventHubConnector, a.config.EventHubName), - leaserCheckpointer, - leaserCheckpointer, - eph.WithConsumerGroup(a.config.ConsumerGroup), - eph.WithNoBanner()) - } else { - a.processor, err = eph.NewFromConnectionString( - a.workerCtx, - fmt.Sprintf("%s%s%s", a.config.ConnectionString, eventHubConnector, a.config.EventHubName), - leaserCheckpointer, - leaserCheckpointer, - eph.WithNoBanner()) - } - if err != nil { - a.log.Errorw("error creating processor", "error", err) - return err - } - - // register a message handler -- many can be registered - handlerID, err := a.processor.RegisterHandler(a.workerCtx, - func(c context.Context, e *eventhub.Event) error { - var onEventErr error - // partitionID is not yet mapped in the azure-eventhub sdk - ok := a.processEvents(e, "") - if !ok { - onEventErr = errors.New("OnEvent function returned false. Stopping input worker") - a.log.Error(onEventErr.Error()) - a.Stop() - } - return onEventErr - }) - if err != nil { - a.log.Errorw("error registering handler", "error", err) - return err - } - a.log.Infof("handler id: %q is registered\n", handlerID) - - // Start handling messages from all of the partitions balancing across - // multiple consumers. - // The processor can be stopped by calling `Close()` on the processor. - err = a.processor.StartNonBlocking(a.workerCtx) - if err != nil { - a.log.Errorw("error starting the processor", "error", err) - return err - } - - return nil -} - -func getAzureEnvironment(overrideResManager string) (azure.Environment, error) { - // if no override is set then the azure public cloud is used - if overrideResManager == "" || overrideResManager == "" { - return azure.PublicCloud, nil - } - if env, ok := environments[overrideResManager]; ok { - return env, nil - } - // can retrieve hybrid env from the resource manager endpoint - return azure.EnvironmentFromURL(overrideResManager) -} diff --git a/x-pack/filebeat/input/azureeventhub/eph_test.go b/x-pack/filebeat/input/azureeventhub/eph_test.go deleted file mode 100644 index 86922d295597..000000000000 --- a/x-pack/filebeat/input/azureeventhub/eph_test.go +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -//go:build !aix - -package azureeventhub - -import ( - "testing" - - "github.com/Azure/go-autorest/autorest/azure" - - "github.com/stretchr/testify/assert" -) - -var invalidConfig = azureInputConfig{ - SAKey: "invalid_key", - SAName: "storage", - SAContainer: ephContainerName, - ConnectionString: "invalid_connection_string", - ConsumerGroup: "$Default", -} - -func TestRunWithEPH(t *testing.T) { - input := azureInput{config: invalidConfig} - // decoding error when key is invalid - err := input.runWithEPH() - assert.Error(t, err, '7') -} - -func TestGetAzureEnvironment(t *testing.T) { - resMan := "" - env, err := getAzureEnvironment(resMan) - assert.NoError(t, err) - assert.Equal(t, env, azure.PublicCloud) - resMan = "https://management.microsoftazure.de/" - env, err = getAzureEnvironment(resMan) - assert.NoError(t, err) - assert.Equal(t, env, azure.GermanCloud) - resMan = "http://management.invalidhybrid.com/" - _, err = getAzureEnvironment(resMan) - assert.Errorf(t, err, "invalid character 'F' looking for beginning of value") - resMan = "" - env, err = getAzureEnvironment(resMan) - assert.NoError(t, err) - assert.Equal(t, env, azure.PublicCloud) -} diff --git a/x-pack/filebeat/input/azureeventhub/input.go b/x-pack/filebeat/input/azureeventhub/input.go index bc2244925e61..a12083f2abec 100644 --- a/x-pack/filebeat/input/azureeventhub/input.go +++ b/x-pack/filebeat/input/azureeventhub/input.go @@ -7,23 +7,18 @@ package azureeventhub import ( - "context" - "encoding/json" "fmt" "strings" - "sync" - "time" - eventhub "github.com/Azure/azure-event-hubs-go/v3" - "github.com/Azure/azure-event-hubs-go/v3/eph" - "github.com/mitchellh/hashstructure" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" + "github.com/Azure/go-autorest/autorest/azure" - "github.com/elastic/beats/v7/filebeat/channel" - "github.com/elastic/beats/v7/filebeat/input" + v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/feature" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" - "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/go-concert/unison" ) const ( @@ -32,262 +27,60 @@ const ( inputName = "azure-eventhub" ) -func init() { - err := input.Register(inputName, NewInput) - if err != nil { - panic(fmt.Errorf("failed to register %v input: %w", inputName, err)) - } +var environments = map[string]azure.Environment{ + azure.ChinaCloud.ResourceManagerEndpoint: azure.ChinaCloud, + azure.GermanCloud.ResourceManagerEndpoint: azure.GermanCloud, + azure.PublicCloud.ResourceManagerEndpoint: azure.PublicCloud, + azure.USGovernmentCloud.ResourceManagerEndpoint: azure.USGovernmentCloud, } -// configID computes a unique ID for the input configuration. -// -// It is used to identify the input in the registry and to detect -// changes in the configuration. +// Plugin returns the Azure Event Hub input plugin. // -// We will remove this function as we upgrade the input to the -// v2 API (there is an ID in the v2 context). -func configID(config *conf.C) (string, error) { - var tmp struct { - ID string `config:"id"` - } - if err := config.Unpack(&tmp); err != nil { - return "", fmt.Errorf("error extracting ID: %w", err) - } - if tmp.ID != "" { - return tmp.ID, nil - } - - var h map[string]interface{} - _ = config.Unpack(&h) - id, err := hashstructure.Hash(h, nil) - if err != nil { - return "", fmt.Errorf("can not compute ID from configuration: %w", err) +// Required register the plugin loader for the +// input API v2. +func Plugin(log *logp.Logger) v2.Plugin { + return v2.Plugin{ + Name: inputName, + Stability: feature.Stable, + Deprecated: false, + Info: "Collect logs from Azure Event Hub", + Manager: &eventHubInputManager{ + log: log, + }, } +} - return fmt.Sprintf("%16X", id), nil +// eventHubInputManager is the manager for the Azure Event Hub input. +// +// It is responsible for creating new instances of the input, according +// to the configuration provided. +type eventHubInputManager struct { + log *logp.Logger } -// azureInput struct for the azure-eventhub input -type azureInput struct { - config azureInputConfig // azure-eventhub configuration - context input.Context - outlet channel.Outleter - log *logp.Logger // logging info and error messages - workerCtx context.Context // worker goroutine context. It's cancelled when the input stops or the worker exits. - workerCancel context.CancelFunc // used to signal that the worker should stop. - workerOnce sync.Once // guarantees that the worker goroutine is only started once. - processor *eph.EventProcessorHost // eph will be assigned if users have enabled the option - id string // ID of the input; used to identify the input in the input metrics registry only, and will be removed once the input is migrated to v2. - metrics *inputMetrics // Metrics for the input. +func (m *eventHubInputManager) Init(unison.Group) error { + return nil } -// NewInput creates a new azure-eventhub input -func NewInput( - cfg *conf.C, - connector channel.Connector, - inputContext input.Context, -) (input.Input, error) { +func (m *eventHubInputManager) Create(cfg *conf.C) (v2.Input, error) { var config azureInputConfig if err := cfg.Unpack(&config); err != nil { return nil, fmt.Errorf("reading %s input config: %w", inputName, err) } - // Since this is a v1 input, we need to set the ID manually. - // - // We need an ID to identify the input in the input metrics - // registry. - // - // This is a temporary workaround until we migrate the input to v2. - inputId, err := configID(cfg) - if err != nil { - return nil, err - } - - inputCtx, cancelInputCtx := context.WithCancel(context.Background()) - go func() { - defer cancelInputCtx() - select { - case <-inputContext.Done: - case <-inputCtx.Done(): - } - }() - - // If the input ever needs to be made restartable, then context would need - // to be recreated with each restart. - workerCtx, workerCancel := context.WithCancel(inputCtx) - - in := azureInput{ - id: inputId, - config: config, - log: logp.NewLogger(fmt.Sprintf("%s input", inputName)).With("connection string", stripConnectionString(config.ConnectionString)), - context: inputContext, - workerCtx: workerCtx, - workerCancel: workerCancel, - } - out, err := connector.Connect(cfg) - if err != nil { - return nil, err - } - in.outlet = out - in.log.Infof("Initialized %s input.", inputName) - - return &in, nil + return newEventHubInputV1(config, m.log) } -// Run starts the `azure-eventhub` input and then returns. -// -// The first invocation will start an input worker. All subsequent -// invocations will be no-ops. -// -// The input worker will continue fetching data from the event hub until -// the input Runner calls the `Stop()` method. -func (a *azureInput) Run() { - // `Run` is invoked periodically by the input Runner. The `sync.Once` - // guarantees that we only start the worker once during the first - // invocation. - a.workerOnce.Do(func() { - a.log.Infof("%s input worker is starting.", inputName) - - // We set up the metrics in the `Run()` method and tear them down - // in the `Stop()` method. - // - // The factory method `NewInput` is not a viable solution because - // the Runner invokes it during the configuration check without - // calling the `Stop()` function; this causes panics - // due to multiple metrics registrations. - a.metrics = newInputMetrics(a.id, nil) - - err := a.runWithEPH() - if err != nil { - a.log.Errorw("error starting the input worker", "error", err) - return - } - a.log.Infof("%s input worker has started.", inputName) +func createPipelineClient(pipeline beat.Pipeline) (beat.Client, error) { + return pipeline.ConnectWith(beat.ClientConfig{ + Processing: beat.ProcessingConfig{ + // This input only produces events with basic types so normalization + // is not required. + EventNormalization: to.Ptr(false), + }, }) } -// Stop stops `azure-eventhub` input. -func (a *azureInput) Stop() { - a.log.Infof("%s input worker is stopping.", inputName) - if a.processor != nil { - // Tells the processor to stop processing events and release all - // resources (like scheduler, leaser, checkpointer, and client). - err := a.processor.Close(context.Background()) - if err != nil { - a.log.Errorw("error while closing eventhostprocessor", "error", err) - } - } - - if a.metrics != nil { - a.metrics.Close() - } - - a.workerCancel() - a.log.Infof("%s input worker has stopped.", inputName) -} - -// Wait stop the current server -func (a *azureInput) Wait() { - a.Stop() -} - -func (a *azureInput) processEvents(event *eventhub.Event, partitionID string) bool { - processingStartTime := time.Now() - azure := mapstr.M{ - // partitionID is only mapped in the non-eph option which is not available yet, this field will be temporary unavailable - //"partition_id": partitionID, - "eventhub": a.config.EventHubName, - "consumer_group": a.config.ConsumerGroup, - } - - // update the input metrics - a.metrics.receivedMessages.Inc() - a.metrics.receivedBytes.Add(uint64(len(event.Data))) - - records := a.parseMultipleRecords(event.Data) - - for _, record := range records { - _, _ = azure.Put("offset", event.SystemProperties.Offset) - _, _ = azure.Put("sequence_number", event.SystemProperties.SequenceNumber) - _, _ = azure.Put("enqueued_time", event.SystemProperties.EnqueuedTime) - ok := a.outlet.OnEvent(beat.Event{ - // this is the default value for the @timestamp field; usually the ingest - // pipeline replaces it with a value in the payload. - Timestamp: processingStartTime, - Fields: mapstr.M{ - "message": record, - "azure": azure, - }, - Private: event.Data, - }) - if !ok { - a.metrics.processingTime.Update(time.Since(processingStartTime).Nanoseconds()) - return ok - } - - a.metrics.sentEvents.Inc() - } - - a.metrics.processedMessages.Inc() - a.metrics.processingTime.Update(time.Since(processingStartTime).Nanoseconds()) - - return true -} - -// parseMultipleRecords will try to split the message into multiple ones based on the group field provided by the configuration -func (a *azureInput) parseMultipleRecords(bMessage []byte) []string { - var mapObject map[string][]interface{} - var messages []string - - // Clean up the message for known issues [1] where Azure services produce malformed JSON documents. - // Sanitization occurs if options are available and the message contains an invalid JSON. - // - // [1]: https://learn.microsoft.com/en-us/answers/questions/1001797/invalid-json-logs-produced-for-function-apps - if len(a.config.SanitizeOptions) != 0 && !json.Valid(bMessage) { - bMessage = sanitize(bMessage, a.config.SanitizeOptions...) - a.metrics.sanitizedMessages.Inc() - } - - // check if the message is a "records" object containing a list of events - err := json.Unmarshal(bMessage, &mapObject) - if err == nil { - if len(mapObject[expandEventListFromField]) > 0 { - for _, ms := range mapObject[expandEventListFromField] { - js, err := json.Marshal(ms) - if err == nil { - messages = append(messages, string(js)) - a.metrics.receivedEvents.Inc() - } else { - a.log.Errorw(fmt.Sprintf("serializing message %s", ms), "error", err) - } - } - } - } else { - a.log.Debugf("deserializing multiple messages to a `records` object returning error: %s", err) - // in some cases the message is an array - var arrayObject []interface{} - err = json.Unmarshal(bMessage, &arrayObject) - if err != nil { - // return entire message - a.log.Debugf("deserializing multiple messages to an array returning error: %s", err) - a.metrics.decodeErrors.Inc() - return []string{string(bMessage)} - } - - for _, ms := range arrayObject { - js, err := json.Marshal(ms) - if err == nil { - messages = append(messages, string(js)) - a.metrics.receivedEvents.Inc() - } else { - a.log.Errorw(fmt.Sprintf("serializing message %s", ms), "error", err) - } - } - } - - return messages -} - // Strip connection string to remove sensitive information // A connection string should look like this: // Endpoint=sb://dummynamespace.servicebus.windows.net/;SharedAccessKeyName=DummyAccessKeyName;SharedAccessKey=5dOntTRytoC24opYThisAsit3is2B+OGY1US/fuL3ly= diff --git a/x-pack/filebeat/input/azureeventhub/input_test.go b/x-pack/filebeat/input/azureeventhub/input_test.go index a5fab488dfc3..64f0a1b68e6f 100644 --- a/x-pack/filebeat/input/azureeventhub/input_test.go +++ b/x-pack/filebeat/input/azureeventhub/input_test.go @@ -12,20 +12,19 @@ import ( "testing" "time" + "github.com/Azure/go-autorest/autorest/azure" + "github.com/elastic/elastic-agent-libs/logp" - "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/elastic-agent-libs/monitoring" eventhub "github.com/Azure/azure-event-hubs-go/v3" "github.com/stretchr/testify/assert" - "github.com/elastic/beats/v7/filebeat/channel" - "github.com/elastic/beats/v7/filebeat/input/inputtest" "github.com/elastic/beats/v7/libbeat/beat" - conf "github.com/elastic/elastic-agent-libs/config" ) -var config = azureInputConfig{ +var defaultTestConfig = azureInputConfig{ SAKey: "", SAName: "", SAContainer: ephContainerName, @@ -33,21 +32,38 @@ var config = azureInputConfig{ ConsumerGroup: "", } +func TestGetAzureEnvironment(t *testing.T) { + resMan := "" + env, err := getAzureEnvironment(resMan) + assert.NoError(t, err) + assert.Equal(t, env, azure.PublicCloud) + resMan = "https://management.microsoftazure.de/" + env, err = getAzureEnvironment(resMan) + assert.NoError(t, err) + assert.Equal(t, env, azure.GermanCloud) + resMan = "http://management.invalidhybrid.com/" + _, err = getAzureEnvironment(resMan) + assert.Errorf(t, err, "invalid character 'F' looking for beginning of value") + resMan = "" + env, err = getAzureEnvironment(resMan) + assert.NoError(t, err) + assert.Equal(t, env, azure.PublicCloud) +} + func TestProcessEvents(t *testing.T) { - // Stub outlet for receiving events generated by the input. - o := &stubOutleter{} - out, err := newStubOutlet(o) - if err != nil { - t.Fatal(err) - } + log := logp.NewLogger(fmt.Sprintf("%s test for input", inputName)) + reg := monitoring.NewRegistry() metrics := newInputMetrics("test", reg) defer metrics.Close() - input := azureInput{ - config: config, - metrics: metrics, - outlet: out, + fakePipelineClient := fakeClient{} + + input := eventHubInputV1{ + config: defaultTestConfig, + log: log, + metrics: metrics, + pipelineClient: &fakePipelineClient, } var sn int64 = 12 now := time.Now() @@ -67,12 +83,10 @@ func TestProcessEvents(t *testing.T) { Data: []byte(msg), SystemProperties: &properties, } - ok := input.processEvents(&ev, "0") - if !ok { - t.Fatal("OnEvent function returned false") - } - assert.Equal(t, len(o.Events), 1) - message, err := o.Events[0].Fields.GetValue("message") + input.processEvents(&ev) + + assert.Equal(t, len(fakePipelineClient.publishedEvents), 1) + message, err := fakePipelineClient.publishedEvents[0].Fields.GetValue("message") if err != nil { t.Fatal(err) } @@ -94,12 +108,16 @@ func TestParseMultipleRecords(t *testing.T) { metrics := newInputMetrics("test", reg) defer metrics.Close() - input := azureInput{ - metrics: metrics, - log: logp.NewLogger(fmt.Sprintf("%s test for input", inputName)), + fakePipelineClient := fakeClient{} + + input := eventHubInputV1{ + config: azureInputConfig{}, + log: logp.NewLogger(fmt.Sprintf("%s test for input", inputName)), + metrics: metrics, + pipelineClient: &fakePipelineClient, } - messages := input.parseMultipleRecords([]byte(msg)) + messages := input.unpackRecords([]byte(msg)) assert.NotNil(t, messages) assert.Equal(t, len(messages), 3) for _, ms := range messages { @@ -110,7 +128,7 @@ func TestParseMultipleRecords(t *testing.T) { msg1 := "[{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," + "{\"test\":\"this is 2nd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," + "{\"test\":\"this is 3rd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}]" - messages = input.parseMultipleRecords([]byte(msg1)) + messages = input.unpackRecords([]byte(msg1)) assert.NotNil(t, messages) assert.Equal(t, len(messages), 3) for _, ms := range messages { @@ -119,7 +137,7 @@ func TestParseMultipleRecords(t *testing.T) { // one event only msg2 := "{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}" - messages = input.parseMultipleRecords([]byte(msg2)) + messages = input.unpackRecords([]byte(msg2)) assert.NotNil(t, messages) assert.Equal(t, len(messages), 1) for _, ms := range messages { @@ -127,15 +145,16 @@ func TestParseMultipleRecords(t *testing.T) { } } -func TestNewInputDone(t *testing.T) { - config := mapstr.M{ - "connection_string": "Endpoint=sb://something", - "eventhub": "insights-operational-logs", - "storage_account": "someaccount", - "storage_account_key": "secret", - } - inputtest.AssertNotStartedInputCanBeDone(t, NewInput, &config) -} +//func TestNewInputDone(t *testing.T) { +// log := logp.NewLogger(fmt.Sprintf("%s test for input", inputName)) +// config := mapstr.M{ +// "connection_string": "Endpoint=sb://something", +// "eventhub": "insights-operational-logs", +// "storage_account": "someaccount", +// "storage_account_key": "secret", +// } +// inputtest.AssertNotStartedInputCanBeDone(t, NewInput, &config) +//} func TestStripConnectionString(t *testing.T) { tests := []struct { @@ -161,36 +180,22 @@ func TestStripConnectionString(t *testing.T) { } } -type stubOutleter struct { +// ackClient is a fake beat.Client that ACKs the published messages. +type fakeClient struct { sync.Mutex - cond *sync.Cond - done bool - Events []beat.Event + publishedEvents []beat.Event } -func newStubOutlet(stub *stubOutleter) (channel.Outleter, error) { - stub.cond = sync.NewCond(stub) - defer stub.Close() +func (c *fakeClient) Close() error { return nil } - connector := channel.ConnectorFunc(func(_ *conf.C, _ beat.ClientConfig) (channel.Outleter, error) { - return stub, nil - }) - return connector.ConnectWith(nil, beat.ClientConfig{ - Processing: beat.ProcessingConfig{}, - }) +func (c *fakeClient) Publish(event beat.Event) { + c.Lock() + defer c.Unlock() + c.publishedEvents = append(c.publishedEvents, event) } -func (o *stubOutleter) Close() error { - o.Lock() - defer o.Unlock() - o.done = true - return nil -} -func (o *stubOutleter) Done() <-chan struct{} { return nil } -func (o *stubOutleter) OnEvent(event beat.Event) bool { - o.Lock() - defer o.Unlock() - o.Events = append(o.Events, event) - o.cond.Broadcast() - return o.done +func (c *fakeClient) PublishAll(event []beat.Event) { + for _, e := range event { + c.Publish(e) + } } diff --git a/x-pack/filebeat/input/azureeventhub/metrics.go b/x-pack/filebeat/input/azureeventhub/metrics.go index 8aeabc57265f..efef262d2c01 100644 --- a/x-pack/filebeat/input/azureeventhub/metrics.go +++ b/x-pack/filebeat/input/azureeventhub/metrics.go @@ -3,7 +3,6 @@ // you may not use this file except in compliance with the Elastic License. //go:build !aix -// +build !aix package azureeventhub @@ -15,7 +14,7 @@ import ( "github.com/elastic/elastic-agent-libs/monitoring/adapter" ) -// newInputMetrics creates a new `*inputMetrics` to track metrics. +// newInputMetrics creates a new `*inputMetrics` to track input metrics. func newInputMetrics(id string, parentRegistry *monitoring.Registry) *inputMetrics { reg, unregister := inputmon.NewInputRegistry(inputName, id, parentRegistry) inputMetrics := inputMetrics{ diff --git a/x-pack/filebeat/input/azureeventhub/metrics_test.go b/x-pack/filebeat/input/azureeventhub/metrics_test.go index b6730c349566..52b9f008f5c7 100644 --- a/x-pack/filebeat/input/azureeventhub/metrics_test.go +++ b/x-pack/filebeat/input/azureeventhub/metrics_test.go @@ -117,18 +117,13 @@ func TestInputMetricsEventsReceived(t *testing.T) { reg := monitoring.NewRegistry() metrics := newInputMetrics("test", reg) - // Stub outlet for receiving events generated by the input. - o := &stubOutleter{} - out, err := newStubOutlet(o) - if err != nil { - t.Fatal(err) - } + fakeClient := fakeClient{} - input := azureInput{ - config: inputConfig, - metrics: metrics, - outlet: out, - log: log, + input := eventHubInputV1{ + config: inputConfig, + metrics: metrics, + pipelineClient: &fakeClient, + log: log, } ev := eventhub.Event{ @@ -136,13 +131,10 @@ func TestInputMetricsEventsReceived(t *testing.T) { SystemProperties: &properties, } - ok := input.processEvents(&ev, "0") - if !ok { - t.Fatal("OnEvent function returned false") - } + input.processEvents(&ev) - if ok := assert.Equal(t, len(tc.expectedRecords), len(o.Events)); ok { - for i, e := range o.Events { + if ok := assert.Equal(t, len(tc.expectedRecords), len(fakeClient.publishedEvents)); ok { + for i, e := range fakeClient.publishedEvents { msg, err := e.Fields.GetValue("message") if err != nil { t.Fatal(err) diff --git a/x-pack/filebeat/input/azureeventhub/sanitization_test.go b/x-pack/filebeat/input/azureeventhub/sanitization_test.go index f4d072f50363..6e2645c40d74 100644 --- a/x-pack/filebeat/input/azureeventhub/sanitization_test.go +++ b/x-pack/filebeat/input/azureeventhub/sanitization_test.go @@ -3,7 +3,6 @@ // you may not use this file except in compliance with the Elastic License. //go:build !aix -// +build !aix package azureeventhub @@ -31,15 +30,16 @@ func TestParseMultipleRecordsSanitization(t *testing.T) { metrics := newInputMetrics("test", reg) defer metrics.Close() - input := azureInput{ + input := eventHubInputV1{ config: azureInputConfig{ SanitizeOptions: []string{"SINGLE_QUOTES", "NEW_LINES"}, }, - metrics: metrics, - log: logp.NewLogger(fmt.Sprintf("%s test for input", inputName)), + log: logp.NewLogger(fmt.Sprintf("%s test for input", inputName)), + metrics: metrics, + pipelineClient: &fakeClient{}, } - messages := input.parseMultipleRecords([]byte(msg)) + messages := input.unpackRecords([]byte(msg)) assert.NotNil(t, messages) assert.Equal(t, len(messages), 3) for _, ms := range messages { diff --git a/x-pack/filebeat/input/azureeventhub/v1_input.go b/x-pack/filebeat/input/azureeventhub/v1_input.go new file mode 100644 index 000000000000..43573c812b64 --- /dev/null +++ b/x-pack/filebeat/input/azureeventhub/v1_input.go @@ -0,0 +1,359 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build !aix + +package azureeventhub + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/Azure/go-autorest/autorest/azure" + + eventhub "github.com/Azure/azure-event-hubs-go/v3" + "github.com/Azure/azure-event-hubs-go/v3/eph" + "github.com/Azure/azure-event-hubs-go/v3/storage" + "github.com/Azure/azure-storage-blob-go/azblob" + + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/mapstr" +) + +// eventHubInputV1 is the Azure Event Hub input V1. +// +// This input uses the Azure Event Hub SDK v3 (legacy). +type eventHubInputV1 struct { + config azureInputConfig + log *logp.Logger + metrics *inputMetrics + processor *eph.EventProcessorHost + pipelineClient beat.Client +} + +// newEventHubInputV1 creates a new instance of the Azure Event Hub input V1. +// This input uses the Azure Event Hub SDK v3 (legacy). +func newEventHubInputV1(config azureInputConfig, logger *logp.Logger) (v2.Input, error) { + log := logger. + Named(inputName). + With( + "connection string", stripConnectionString(config.ConnectionString), + ) + + return &eventHubInputV1{ + config: config, + log: log, + }, nil +} + +func (in *eventHubInputV1) Name() string { + return inputName +} + +func (in *eventHubInputV1) Test(v2.TestContext) error { + return nil +} + +func (in *eventHubInputV1) Run( + inputContext v2.Context, + pipeline beat.Pipeline, +) error { + var err error + + // Create pipelineClient for publishing events. + in.pipelineClient, err = createPipelineClient(pipeline) + if err != nil { + return fmt.Errorf("failed to create pipeline pipelineClient: %w", err) + } + defer in.pipelineClient.Close() + + // Setup input metrics + in.metrics = newInputMetrics(inputContext.ID, nil) + defer in.metrics.Close() + + ctx := v2.GoContextFromCanceler(inputContext.Cancelation) + + // Initialize the input components + // in preparation for the main run loop. + err = in.setup(ctx) + if err != nil { + return err + } + + // Start the main run loop + err = in.run(ctx) + if err != nil { + in.log.Errorw("error running input", "error", err) + return err + } + + return nil +} + +// setup initializes the input components. +// +// The main components are: +// 1. Azure Storage Leaser / Checkpointer +// 2. Event Processor Host +// 3. Message handler +func (in *eventHubInputV1) setup(ctx context.Context) error { + + // ---------------------------------------------------- + // 1 — Create a new Azure Storage Leaser / Checkpointer + // ---------------------------------------------------- + + cred, err := azblob.NewSharedKeyCredential(in.config.SAName, in.config.SAKey) + if err != nil { + return err + } + + env, err := getAzureEnvironment(in.config.OverrideEnvironment) + if err != nil { + return err + } + + leaserCheckpointer, err := storage.NewStorageLeaserCheckpointer(cred, in.config.SAName, in.config.SAContainer, env) + if err != nil { + in.log.Errorw("error creating storage leaser checkpointer", "error", err) + return err + } + + in.log.Infof("storage leaser checkpointer created for container %q", in.config.SAContainer) + + // ------------------------------------------------ + // 2 — Create a new event processor host + // ------------------------------------------------ + + // adding a nil EventProcessorHostOption will break the code, + // this is why a condition is added and a.processor is assigned. + if in.config.ConsumerGroup != "" { + in.processor, err = eph.NewFromConnectionString( + ctx, + fmt.Sprintf("%s%s%s", in.config.ConnectionString, eventHubConnector, in.config.EventHubName), + leaserCheckpointer, + leaserCheckpointer, + eph.WithConsumerGroup(in.config.ConsumerGroup), + eph.WithNoBanner()) + } else { + in.processor, err = eph.NewFromConnectionString( + ctx, + fmt.Sprintf("%s%s%s", in.config.ConnectionString, eventHubConnector, in.config.EventHubName), + leaserCheckpointer, + leaserCheckpointer, + eph.WithNoBanner()) + } + if err != nil { + in.log.Errorw("error creating processor", "error", err) + return err + } + + in.log.Infof("event processor host created for event hub %q", in.config.EventHubName) + + // ------------------------------------------------ + // 3 — Register a message handler + // ------------------------------------------------ + + // register a message handler -- many can be registered + handlerID, err := in.processor.RegisterHandler(ctx, func(c context.Context, e *eventhub.Event) error { + + // Take the event message from the event hub, + // creates and publishes one (or more) events + // to the beats pipeline. + in.processEvents(e) + + // Why is this function always returning no error? + // + // The legacy SDK does not offer hooks to control + // checkpointing (it internally updates the checkpoint + // info after a successful handler execution). + // + // So we are keeping the existing behaviour (do not + // handle publish acks). + // + // On shutdown, Filebeat stops the input, waits for + // the output to process all the events in the queue. + return nil + }) + if err != nil { + in.log.Errorw("error registering handler", "error", err) + return err + } + + in.log.Infof("handler id: %q is registered\n", handlerID) + + return nil +} + +func (in *eventHubInputV1) run(ctx context.Context) error { + // Start handling messages from all the partitions balancing across + // multiple consumers. + // The processor can be stopped by calling `Close()` on the processor. + + // The `Start()` function is not an option because + // it waits for an `os.Interrupt` signal to stop + // the processor. + err := in.processor.StartNonBlocking(ctx) + if err != nil { + in.log.Errorw("error starting the processor", "error", err) + return err + } + defer func() { + in.log.Infof("%s input worker is stopping.", inputName) + err := in.processor.Close(context.Background()) + if err != nil { + in.log.Errorw("error while closing eventhostprocessor", "error", err) + } + in.log.Infof("%s input worker has stopped.", inputName) + }() + + in.log.Infof("%s input worker has started.", inputName) + + // wait for the context to be done + <-ctx.Done() + + return ctx.Err() +} + +func (in *eventHubInputV1) processEvents(event *eventhub.Event) { + processingStartTime := time.Now() + eventHubMetadata := mapstr.M{ + // The `partition_id` is not available in the + // current version of the SDK. + "eventhub": in.config.EventHubName, + "consumer_group": in.config.ConsumerGroup, + } + + // update the input metrics + in.metrics.receivedMessages.Inc() + in.metrics.receivedBytes.Add(uint64(len(event.Data))) + + records := in.unpackRecords(event.Data) + + for _, record := range records { + _, _ = eventHubMetadata.Put("offset", event.SystemProperties.Offset) + _, _ = eventHubMetadata.Put("sequence_number", event.SystemProperties.SequenceNumber) + _, _ = eventHubMetadata.Put("enqueued_time", event.SystemProperties.EnqueuedTime) + + event := beat.Event{ + // We set the timestamp to the processing + // start time as default value. + // + // Usually, the ingest pipeline replaces it + // with a value in the payload. + Timestamp: processingStartTime, + Fields: mapstr.M{ + "message": record, + "azure": eventHubMetadata, + }, + Private: event.Data, + } + + in.pipelineClient.Publish(event) + + in.metrics.sentEvents.Inc() + } + + in.metrics.processedMessages.Inc() + in.metrics.processingTime.Update(time.Since(processingStartTime).Nanoseconds()) +} + +// unpackRecords will try to split the message into multiple ones based on +// the group field provided by the configuration. +// +// `unpackRecords()` supports two types of messages: +// +// 1. A message with an object with a `records` +// field containing a list of events. +// 2. A message with a single event. +// +// (1) Here is an example of a message containing an object with +// a `records` field: +// +// { +// "records": [ +// { +// "time": "2019-12-17T13:43:44.4946995Z", +// "test": "this is some message" +// } +// ] +// } +// +// (2) Here is an example of a message with a single event: +// +// { +// "time": "2019-12-17T13:43:44.4946995Z", +// "test": "this is some message" +// } +// +// The Diagnostic Settings uses the single object with `records` +// fields (1) when exporting data from an Azure service to an +// event hub. This is the most common case. +func (in *eventHubInputV1) unpackRecords(bMessage []byte) []string { + var mapObject map[string][]interface{} + var messages []string + + // Clean up the message for known issues [1] where Azure services produce malformed JSON documents. + // Sanitization occurs if options are available and the message contains an invalid JSON. + // + // [1]: https://learn.microsoft.com/en-us/answers/questions/1001797/invalid-json-logs-produced-for-function-apps + if len(in.config.SanitizeOptions) != 0 && !json.Valid(bMessage) { + bMessage = sanitize(bMessage, in.config.SanitizeOptions...) + in.metrics.sanitizedMessages.Inc() + } + + // check if the message is a "records" object containing a list of events + err := json.Unmarshal(bMessage, &mapObject) + if err == nil { + if len(mapObject[expandEventListFromField]) > 0 { + for _, ms := range mapObject[expandEventListFromField] { + js, err := json.Marshal(ms) + if err == nil { + messages = append(messages, string(js)) + in.metrics.receivedEvents.Inc() + } else { + in.log.Errorw(fmt.Sprintf("serializing message %s", ms), "error", err) + } + } + } + } else { + in.log.Debugf("deserializing multiple messages to a `records` object returning error: %s", err) + // in some cases the message is an array + var arrayObject []interface{} + err = json.Unmarshal(bMessage, &arrayObject) + if err != nil { + // return entire message + in.log.Debugf("deserializing multiple messages to an array returning error: %s", err) + in.metrics.decodeErrors.Inc() + return []string{string(bMessage)} + } + + for _, ms := range arrayObject { + js, err := json.Marshal(ms) + if err == nil { + messages = append(messages, string(js)) + in.metrics.receivedEvents.Inc() + } else { + in.log.Errorw(fmt.Sprintf("serializing message %s", ms), "error", err) + } + } + } + + return messages +} + +func getAzureEnvironment(overrideResManager string) (azure.Environment, error) { + // if no override is set then the azure public cloud is used + if overrideResManager == "" || overrideResManager == "" { + return azure.PublicCloud, nil + } + if env, ok := environments[overrideResManager]; ok { + return env, nil + } + // can retrieve hybrid env from the resource manager endpoint + return azure.EnvironmentFromURL(overrideResManager) +} diff --git a/x-pack/filebeat/input/default-inputs/inputs_other.go b/x-pack/filebeat/input/default-inputs/inputs_other.go index 2fa63535dbbe..6c7708d7c0f4 100644 --- a/x-pack/filebeat/input/default-inputs/inputs_other.go +++ b/x-pack/filebeat/input/default-inputs/inputs_other.go @@ -13,6 +13,7 @@ import ( "github.com/elastic/beats/v7/x-pack/filebeat/input/awscloudwatch" "github.com/elastic/beats/v7/x-pack/filebeat/input/awss3" "github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage" + "github.com/elastic/beats/v7/x-pack/filebeat/input/azureeventhub" "github.com/elastic/beats/v7/x-pack/filebeat/input/benchmark" "github.com/elastic/beats/v7/x-pack/filebeat/input/cel" "github.com/elastic/beats/v7/x-pack/filebeat/input/cloudfoundry" @@ -31,6 +32,7 @@ import ( func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2.Plugin { return []v2.Plugin{ azureblobstorage.Plugin(log, store), + azureeventhub.Plugin(log), cel.Plugin(log, store), cloudfoundry.Plugin(), entityanalytics.Plugin(log),