diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index b34b9fb93..d2e72b6eb 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -15,3 +15,4 @@ /pkg/timeutil/ @jmank88 /pkg/values/ @smartcontractkit/keystone /pkg/workflows/ @smartcontractkit/keystone +/pkg/beholder/ @smartcontractkit/realtime diff --git a/.mockery.yaml b/.mockery.yaml index 01391f6bd..95cf7acdf 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -32,3 +32,6 @@ packages: interfaces: CapabilitiesRegistry: Relayer: + github.com/smartcontractkit/chainlink-common/pkg/beholder/internal: + interfaces: + OTLPExporter: diff --git a/go.mod b/go.mod index 94065c9cb..9c210662f 100644 --- a/go.mod +++ b/go.mod @@ -1,12 +1,15 @@ module github.com/smartcontractkit/chainlink-common -go 1.21 +go 1.22 + +toolchain go1.22.6 require ( github.com/confluentinc/confluent-kafka-go/v2 v2.3.0 github.com/dominikbraun/graph v0.23.0 github.com/fxamacker/cbor/v2 v2.5.0 github.com/go-json-experiment/json v0.0.0-20231102232822-2e55bd4e08b0 + github.com/go-playground/validator/v10 v10.22.0 github.com/google/go-cmp v0.6.0 github.com/google/uuid v1.6.0 github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1 @@ -29,8 +32,16 @@ require ( github.com/stretchr/testify v1.9.0 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.52.0 go.opentelemetry.io/otel v1.28.0 + go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.0.0-20240801233905-f7977e064c9c + go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.28.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.28.0 + go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.4.0 + go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.28.0 + go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.28.0 + go.opentelemetry.io/otel/log v0.4.0 go.opentelemetry.io/otel/sdk v1.28.0 + go.opentelemetry.io/otel/sdk/log v0.4.0 + go.opentelemetry.io/otel/sdk/metric v1.28.0 go.opentelemetry.io/otel/trace v1.28.0 go.uber.org/goleak v1.3.0 go.uber.org/zap v1.27.0 @@ -50,13 +61,17 @@ require ( github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/fatih/color v1.14.1 // indirect + github.com/gabriel-vasile/mimetype v1.4.3 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect + github.com/go-playground/locales v0.14.1 // indirect + github.com/go-playground/universal-translator v0.18.1 // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect; indirec github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 // indirect - github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.21.0 // indirect github.com/hashicorp/yamux v0.1.1 // indirect + github.com/leodido/go-urn v1.4.0 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.17 // indirect @@ -74,15 +89,15 @@ require ( github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect github.com/x448/float16 v0.8.4 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 // indirect - go.opentelemetry.io/otel/metric v1.28.0 // indirect + go.opentelemetry.io/otel/metric v1.28.0 go.opentelemetry.io/proto/otlp v1.3.1 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.27.0 // indirect golang.org/x/sync v0.7.0 // indirect golang.org/x/sys v0.22.0 // indirect golang.org/x/text v0.16.0 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20240711142825-46eb208f015d // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240711142825-46eb208f015d // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20240730163845-b1a4ccb954bf // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 3b3797a39..5f25f0974 100644 --- a/go.sum +++ b/go.sum @@ -52,6 +52,8 @@ github.com/fatih/color v1.14.1 h1:qfhVLaG5s+nCROl1zJsZRxFeYrHLqWroPOQ8BWiNb4w= github.com/fatih/color v1.14.1/go.mod h1:2oHN61fhTpgcxD3TSWCgKDiH1+x4OiDVVGH8WlgGZGg= github.com/fxamacker/cbor/v2 v2.5.0 h1:oHsG0V/Q6E/wqTS2O1Cozzsy69nqCiguo5Q1a1ADivE= github.com/fxamacker/cbor/v2 v2.5.0/go.mod h1:TA1xS00nchWmaBnEIxPSE5oHLuJBAVvqrtAnWBwBCVo= +github.com/gabriel-vasile/mimetype v1.4.3 h1:in2uUcidCuFcDKtdcBxlR0rJ1+fsokWf+uqxgUFjbI0= +github.com/gabriel-vasile/mimetype v1.4.3/go.mod h1:d8uq/6HKRL6CGdk+aubisF/M5GcPfT7nKyLpA0lbSSk= github.com/go-json-experiment/json v0.0.0-20231102232822-2e55bd4e08b0 h1:ymLjT4f35nQbASLnvxEde4XOBL+Sn7rFuV+FOJqkljg= github.com/go-json-experiment/json v0.0.0-20231102232822-2e55bd4e08b0/go.mod h1:6daplAwHHGbUGib4990V3Il26O0OC4aRyvewaaAihaA= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= @@ -59,6 +61,14 @@ github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s= +github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= +github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA= +github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY= +github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY= +github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY= +github.com/go-playground/validator/v10 v10.22.0 h1:k6HsTZ0sTnROkhS//R0O+55JgM8C4Bx7ia+JlgcnOao= +github.com/go-playground/validator/v10 v10.22.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM= github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= @@ -100,8 +110,8 @@ github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1 h1:qnpS github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1/go.mod h1:lXGCsh6c22WGtjr+qGHj1otzZpV/1kwTMAqkwZsnWRU= github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 h1:pRhl55Yx1eC7BZ1N+BBWwnKaMyD8uC+34TLdndZMAKk= github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0/go.mod h1:XKMd7iuf/RGPSMJ/U4HP0zS2Z9Fh8Ps9a+6X26m/tmI= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 h1:bkypFPDjIYGfCYD5mRBvpqxfYX1YCS1PXdKYWi8FsN0= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0/go.mod h1:P+Lt/0by1T8bfcF3z737NnSbmxQAppXMRziHUxPOC8k= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.21.0 h1:CWyXh/jylQWp2dtiV33mY4iSSp6yf4lmn+c7/tN+ObI= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.21.0/go.mod h1:nCLIt0w3Ept2NwF8ThLmrppXsfT07oC8k0XNDxd8sVU= github.com/hashicorp/consul/sdk v0.16.0 h1:SE9m0W6DEfgIVCJX7xU+iv/hUl4m/nxqMTnCdMxDpJ8= github.com/hashicorp/consul/sdk v0.16.0/go.mod h1:7pxqqhqoaPqnBnzXD1StKed62LqJeClzVsUEy85Zr0A= github.com/hashicorp/go-hclog v1.5.0 h1:bI2ocEMgcVlz55Oj1xZNBsVi900c7II+fWDyV9o+13c= @@ -124,6 +134,8 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= +github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/linkedin/goavro/v2 v2.9.7/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA= @@ -229,14 +241,30 @@ go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.5 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.52.0/go.mod h1:BMsdeOxN04K0L5FNUBfjFdvwWGNe/rkmSwH4Aelu/X0= go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo= go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4= +go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.0.0-20240801233905-f7977e064c9c h1:cj2Wk3ZNyGRDmQqjK0QJdS+teMQnvv7+uTiKNqnSjTo= +go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.0.0-20240801233905-f7977e064c9c/go.mod h1:I011FHqumHsou3RhN8DH/ZnkwFRS1rNyq4EAnu+2Twc= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.28.0 h1:U2guen0GhqH8o/G2un8f/aG/y++OuW6MyCo6hT9prXk= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.28.0/go.mod h1:yeGZANgEcpdx/WK0IvvRFC+2oLiMS2u4L/0Rj2M2Qr0= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 h1:3Q/xZUyC1BBkualc9ROb4G8qkH90LXEIICcs5zv1OYY= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0/go.mod h1:s75jGIWA9OfCMzF0xr+ZgfrB5FEbbV7UuYo32ahUiFI= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.28.0 h1:R3X6ZXmNPRR8ul6i3WgFURCHzaXjHdm0karRG/+dj3s= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.28.0/go.mod h1:QWFXnDavXWwMx2EEcZsf3yxgEKAqsxQ+Syjp+seyInw= +go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.4.0 h1:0MH3f8lZrflbUWXVxyBg/zviDFdGE062uKh5+fu8Vv0= +go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.4.0/go.mod h1:Vh68vYiHY5mPdekTr0ox0sALsqjoVy0w3Os278yX5SQ= +go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.28.0 h1:BJee2iLkfRfl9lc7aFmBwkWxY/RI1RDdXepSF6y8TPE= +go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.28.0/go.mod h1:DIzlHs3DRscCIBU3Y9YSzPfScwnYnzfnCd4g8zA7bZc= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.28.0 h1:EVSnY9JbEEW92bEkIYOVMw4q1WJxIAGoFTrtYOzWuRQ= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.28.0/go.mod h1:Ea1N1QQryNXpCD0I1fdLibBAIpQuBkznMmkdKrapk1Y= +go.opentelemetry.io/otel/log v0.4.0 h1:/vZ+3Utqh18e8TPjuc3ecg284078KWrR8BRz+PQAj3o= +go.opentelemetry.io/otel/log v0.4.0/go.mod h1:DhGnQvky7pHy82MIRV43iXh3FlKN8UUKftn0KbLOq6I= go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6bOeuA5Q= go.opentelemetry.io/otel/metric v1.28.0/go.mod h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s= go.opentelemetry.io/otel/sdk v1.28.0 h1:b9d7hIry8yZsgtbmM0DKyPWMMUMlK9NEKuIG4aBqWyE= go.opentelemetry.io/otel/sdk v1.28.0/go.mod h1:oYj7ClPUA7Iw3m+r7GeEjz0qckQRJK2B8zjcZEfu7Pg= +go.opentelemetry.io/otel/sdk/log v0.4.0 h1:1mMI22L82zLqf6KtkjrRy5BbagOTWdJsqMY/HSqILAA= +go.opentelemetry.io/otel/sdk/log v0.4.0/go.mod h1:AYJ9FVF0hNOgAVzUG/ybg/QttnXhUePWAupmCqtdESo= +go.opentelemetry.io/otel/sdk/metric v1.28.0 h1:OkuaKgKrgAbYrrY0t92c+cC+2F6hsFNnCQArXCKlg08= +go.opentelemetry.io/otel/sdk/metric v1.28.0/go.mod h1:cWPjykihLAPvXKi4iZc1dpER3Jdq2Z0YLse3moQUCpg= go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g= go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI= go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0= @@ -330,10 +358,10 @@ google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoA google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/genproto v0.0.0-20210401141331-865547bb08e2/go.mod h1:9lPAdzaEmUacj36I+k7YKbEc5CXzPIeORRgDAUOu28A= -google.golang.org/genproto/googleapis/api v0.0.0-20240711142825-46eb208f015d h1:kHjw/5UfflP/L5EbledDrcG4C2597RtymmGRZvHiCuY= -google.golang.org/genproto/googleapis/api v0.0.0-20240711142825-46eb208f015d/go.mod h1:mw8MG/Qz5wfgYr6VqVCiZcHe/GJEfI+oGGDCohaVgB0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240711142825-46eb208f015d h1:JU0iKnSg02Gmb5ZdV8nYsKEKsP6o/FGVWTrw4i1DA9A= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240711142825-46eb208f015d/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY= +google.golang.org/genproto/googleapis/api v0.0.0-20240730163845-b1a4ccb954bf h1:GillM0Ef0pkZPIB+5iO6SDK+4T9pf6TpaYR6ICD5rVE= +google.golang.org/genproto/googleapis/api v0.0.0-20240730163845-b1a4ccb954bf/go.mod h1:OFMYQFHJ4TM3JRlWDZhJbZfra2uqc3WLBZiaaqP4DtU= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf h1:liao9UHurZLtiEwBgT9LMOnKYsHze6eA6w1KQCMVN2Q= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= diff --git a/pkg/beholder/client.go b/pkg/beholder/client.go new file mode 100644 index 000000000..cdebed97a --- /dev/null +++ b/pkg/beholder/client.go @@ -0,0 +1,332 @@ +package beholder + +import ( + "context" + "errors" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + otellog "go.opentelemetry.io/otel/log" + otelglobal "go.opentelemetry.io/otel/log/global" + otelmetric "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/propagation" + sdklog "go.opentelemetry.io/otel/sdk/log" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + sdkresource "go.opentelemetry.io/otel/sdk/resource" + "go.opentelemetry.io/otel/sdk/trace" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + oteltrace "go.opentelemetry.io/otel/trace" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" +) + +type Emitter interface { + // Sends message with bytes and attributes to OTel Collector + Emit(ctx context.Context, body []byte, attrs map[string]any) error + // Sends message to OTel Collector + EmitMessage(ctx context.Context, m Message) error +} +type Client interface { + Logger() otellog.Logger + Tracer() oteltrace.Tracer + Meter() otelmetric.Meter + Emitter() Emitter + Close() error +} + +var _ Client = (*otelClient)(nil) + +type messageEmitter struct { + exporter sdklog.Exporter + messageLogger otellog.Logger +} + +type otelClient struct { + config Config + // Logger + logger otellog.Logger + // Tracer + tracer oteltrace.Tracer + // Meter + meter otelmetric.Meter + // Message Emitter + emitter Emitter + // Graceful shutdown for tracer, meter, logger providers + closeFunc func() error +} + +func NewClient( + config Config, + logger otellog.Logger, + tracer oteltrace.Tracer, + meter otelmetric.Meter, + emitter Emitter, + onClose func() error, +) Client { + return &otelClient{ + config: config, + logger: logger, + tracer: tracer, + meter: meter, + emitter: emitter, + closeFunc: onClose, + } +} + +// NewOtelClient creates a new BeholderClient with OTel exporter +func NewOtelClient(cfg Config, errorHandler errorHandlerFunc) (Client, error) { + factory := func(ctx context.Context, options ...otlploggrpc.Option) (sdklog.Exporter, error) { + return otlploggrpc.New(ctx, options...) + } + return newOtelClient(cfg, errorHandler, factory) +} + +// Used for testing to override the default exporter +type otlploggrpcFactory func(ctx context.Context, options ...otlploggrpc.Option) (sdklog.Exporter, error) + +func newOtelClient(cfg Config, errorHandler errorHandlerFunc, otlploggrpcNew otlploggrpcFactory) (Client, error) { + ctx := context.Background() + baseResource, err := newOtelResource(cfg) + if err != nil { + return nil, err + } + creds := insecure.NewCredentials() + if !cfg.InsecureConnection && cfg.CACertFile != "" { + creds, err = credentials.NewClientTLSFromFile(cfg.CACertFile, "") + if err != nil { + return nil, err + } + } + sharedLogExporter, err := otlploggrpcNew( + ctx, + otlploggrpc.WithTLSCredentials(creds), + otlploggrpc.WithEndpoint(cfg.OtelExporterGRPCEndpoint), + ) + if err != nil { + return nil, err + } + + // Logger + loggerProcessor := sdklog.NewBatchProcessor( + sharedLogExporter, + sdklog.WithExportTimeout(cfg.LogExportTimeout), // Default is 30s + ) + loggerAttributes := []attribute.KeyValue{ + attribute.String("beholder_data_type", "zap_log_message"), + } + loggerResource, err := sdkresource.Merge( + sdkresource.NewSchemaless(loggerAttributes...), + baseResource, + ) + if err != nil { + return nil, err + } + loggerProvider := sdklog.NewLoggerProvider( + sdklog.WithResource(loggerResource), + sdklog.WithProcessor(loggerProcessor), + ) + logger := loggerProvider.Logger(cfg.PackageName) + + // Set global logger provider + otelglobal.SetLoggerProvider(loggerProvider) + + // Tracer + tracerProvider, err := newTracerProvider(cfg, baseResource, creds) + if err != nil { + return nil, err + } + tracer := tracerProvider.Tracer(cfg.PackageName) + otel.SetTracerProvider(tracerProvider) + otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})) + + // Meter + meterProvider, err := newMeterProvider(cfg, baseResource, creds) + if err != nil { + return nil, err + } + meter := meterProvider.Meter(cfg.PackageName) + otel.SetMeterProvider(meterProvider) + + // Message Emitter + messageLogProcessor := sdklog.NewBatchProcessor( + sharedLogExporter, + sdklog.WithExportTimeout(cfg.EmitterExportTimeout), // Default is 30s + ) + messageAttributes := []attribute.KeyValue{ + attribute.String("beholder_data_type", "custom_message"), + } + messageLoggerResource, err := sdkresource.Merge( + sdkresource.NewSchemaless(messageAttributes...), + baseResource, + ) + if err != nil { + return nil, err + } + messageLoggerProvider := sdklog.NewLoggerProvider( + sdklog.WithResource(messageLoggerResource), + sdklog.WithProcessor(messageLogProcessor), + ) + messageLogger := messageLoggerProvider.Logger(cfg.PackageName) + messageEmitter := newMessageEmitter(sharedLogExporter, messageLogger) + + setOtelErrorHandler(errorHandler) + + onClose := closeFunc(ctx, loggerProvider, messageLoggerProvider, tracerProvider, meterProvider) + + client := NewClient(cfg, logger, tracer, meter, messageEmitter, onClose) + + return client, nil +} + +type errorHandlerFunc func(err error) + +// Sets the global error handler for OpenTelemetry +func setOtelErrorHandler(h errorHandlerFunc) { + otel.SetErrorHandler(otel.ErrorHandlerFunc(h)) +} + +func newOtelResource(cfg Config) (resource *sdkresource.Resource, err error) { + extraResources, err := sdkresource.New( + context.Background(), + sdkresource.WithOS(), + sdkresource.WithContainer(), + sdkresource.WithHost(), + ) + if err != nil { + return nil, err + } + resource, err = sdkresource.Merge( + sdkresource.Default(), + extraResources, + ) + if err != nil { + return nil, err + } + // Add custom resource attributes + attrs := make([]attribute.KeyValue, 0, len(cfg.ResourceAttributes)) + for k, v := range cfg.ResourceAttributes { + attrs = append(attrs, attribute.String(k, v)) + } + resource, err = sdkresource.Merge( + sdkresource.NewSchemaless(attrs...), + resource, + ) + if err != nil { + return nil, err + } + return +} + +func newMessageEmitter( + exporter sdklog.Exporter, + messageLogger otellog.Logger, +) Emitter { + return messageEmitter{ + exporter: exporter, + messageLogger: messageLogger, + } +} + +// Emits logs the message, but does not wait for the message to be processed. +// Open question: what are pros/cons for using use map[]any vs use otellog.KeyValue +func (e messageEmitter) Emit(ctx context.Context, body []byte, attrs map[string]any) error { + message := NewMessage(body, attrs) + if err := message.Validate(); err != nil { + return err + } + e.messageLogger.Emit(ctx, message.OtelRecord()) + return nil +} + +func (e messageEmitter) EmitMessage(ctx context.Context, message Message) error { + if err := message.Validate(); err != nil { + return err + } + e.messageLogger.Emit(ctx, message.OtelRecord()) + return nil +} + +func (b *otelClient) Logger() otellog.Logger { + return b.logger +} + +func (b *otelClient) Tracer() oteltrace.Tracer { + return b.tracer +} + +func (b *otelClient) Meter() otelmetric.Meter { + return b.meter +} +func (b *otelClient) Emitter() Emitter { + return b.emitter +} + +func (b *otelClient) Close() error { + if b.closeFunc != nil { + return b.closeFunc() + } + return nil +} + +type otelProvider interface { + Shutdown(ctx context.Context) error +} + +// Returns function that finalizes all providers +func closeFunc(ctx context.Context, providers ...otelProvider) func() error { + return func() (err error) { + for _, provider := range providers { + err = errors.Join(err, provider.Shutdown(ctx)) + } + return + } +} + +func newTracerProvider(config Config, resource *sdkresource.Resource, creds credentials.TransportCredentials) (*sdktrace.TracerProvider, error) { + ctx := context.Background() + + exporter, err := otlptracegrpc.New(ctx, + otlptracegrpc.WithTLSCredentials(creds), + otlptracegrpc.WithEndpoint(config.OtelExporterGRPCEndpoint), + ) + if err != nil { + return nil, err + } + tp := sdktrace.NewTracerProvider( + sdktrace.WithBatcher(exporter, + trace.WithBatchTimeout(config.TraceBatchTimeout)), // Default is 5s + sdktrace.WithResource(resource), + sdktrace.WithSampler( + sdktrace.ParentBased( + sdktrace.TraceIDRatioBased(config.TraceSampleRate), + ), + ), + ) + return tp, nil +} + +func newMeterProvider(config Config, resource *sdkresource.Resource, creds credentials.TransportCredentials) (*sdkmetric.MeterProvider, error) { + ctx := context.Background() + + exporter, err := otlpmetricgrpc.New( + ctx, + otlpmetricgrpc.WithTLSCredentials(creds), + otlpmetricgrpc.WithEndpoint(config.OtelExporterGRPCEndpoint), + ) + if err != nil { + return nil, err + } + + mp := sdkmetric.NewMeterProvider( + sdkmetric.WithReader( + sdkmetric.NewPeriodicReader( + exporter, + sdkmetric.WithInterval(config.MetricReaderInterval), // Default is 10s + )), + sdkmetric.WithResource(resource), + ) + return mp, nil +} diff --git a/pkg/beholder/client_test.go b/pkg/beholder/client_test.go new file mode 100644 index 000000000..02f8e56fd --- /dev/null +++ b/pkg/beholder/client_test.go @@ -0,0 +1,273 @@ +package beholder + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc" + otellog "go.opentelemetry.io/otel/log" + sdklog "go.opentelemetry.io/otel/sdk/log" + + "github.com/smartcontractkit/chainlink-common/pkg/beholder/internal/mocks" +) + +type MockExporter struct { + mock.Mock + sdklog.Exporter +} + +func (m *MockExporter) Export(ctx context.Context, records []sdklog.Record) error { + args := m.Called(ctx, records) + return args.Error(0) +} + +func (m *MockExporter) Shutdown(ctx context.Context) error { + args := m.Called(ctx) + return args.Error(0) +} + +func (m *MockExporter) ForceFlush(ctx context.Context) error { + args := m.Called(ctx) + return args.Error(0) +} + +func TestClient(t *testing.T) { + defaultCustomAttributes := func() map[string]any { + return map[string]any{ + "int_key_1": 123, + "int64_key_1": int64(123), + "int32_key_1": int32(123), + "str_key_1": "str_val_1", + "bool_key_1": true, + "float_key_1": 123.456, + "byte_key_1": []byte("byte_val_1"), + "str_slice_key_1": []string{"str_val_1", "str_val_2"}, + "nil_key_1": nil, + "beholder_data_schema": "/schemas/ids/1001", // Required field, URI + } + } + defaultMessageBody := []byte("body bytes") + + tests := []struct { + name string + makeCustomAttributes func() map[string]any + messageBody []byte + messageCount int + exporterMockErrorCount int + exporterOutputExpected bool + messageGenerator func(client Client, messageBody []byte, customAttributes map[string]any) + }{ + { + name: "Test Emit", + makeCustomAttributes: defaultCustomAttributes, + messageBody: defaultMessageBody, + messageCount: 10, + exporterMockErrorCount: 0, + exporterOutputExpected: true, + messageGenerator: func(client Client, messageBody []byte, customAttributes map[string]any) { + err := client.Emitter().Emit(context.Background(), messageBody, customAttributes) + assert.NoError(t, err) + }, + }, { + name: "Test EmitMessage", + makeCustomAttributes: defaultCustomAttributes, + messageBody: defaultMessageBody, + messageCount: 10, + exporterMockErrorCount: 0, + exporterOutputExpected: true, + messageGenerator: func(client Client, messageBody []byte, customAttributes map[string]any) { + message := NewMessage(messageBody, customAttributes) + err := client.Emitter().EmitMessage(context.Background(), message) + assert.NoError(t, err) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + exporterMock := mocks.NewOTLPExporter(t) + defer exporterMock.AssertExpectations(t) + + otelErrorHandler := func(err error) { + t.Fatalf("otel error: %v", err) + } + // Override exporter factory which is used by BeholderClient + exporterFactory := func(context.Context, ...otlploggrpc.Option) (sdklog.Exporter, error) { + return exporterMock, nil + } + client, err := newOtelClient(DefaultConfig(), otelErrorHandler, exporterFactory) + if err != nil { + t.Fatalf("Error creating beholder client: %v", err) + } + // Number of messages to emit + done := make(chan struct{}, 1) + + // Simulate exporter error if configured + if tc.exporterMockErrorCount > 0 { + exporterMock.On("Export", mock.Anything, mock.Anything).Return(fmt.Errorf("an error occurred")).Times(tc.exporterMockErrorCount) + } + + customAttributes := tc.makeCustomAttributes() + + if tc.exporterOutputExpected { + exporterMock.On("Export", mock.Anything, mock.Anything).Return(nil).Once(). + Run(func(args mock.Arguments) { + assert.IsType(t, args.Get(1), []sdklog.Record{}, "Record type mismatch") + records := args.Get(1).([]sdklog.Record) + assert.Equal(t, tc.messageCount, len(records), "Record count mismatch") + record := records[0] + assert.Equal(t, tc.messageBody, record.Body().AsBytes(), "Record body mismatch") + actualAttributeKeys := map[string]struct{}{} + record.WalkAttributes(func(kv otellog.KeyValue) bool { + key := kv.Key + actualAttributeKeys[key] = struct{}{} + expectedValue, ok := customAttributes[key] + if !ok { + t.Fatalf("Record attribute key not found: %s", key) + } + expectedKv := OtelAttr(key, expectedValue) + equal := kv.Value.Equal(expectedKv.Value) + assert.True(t, equal, fmt.Sprintf("Record attributes mismatch for key %v", key)) + return true + }) + for key := range customAttributes { + if _, ok := actualAttributeKeys[key]; !ok { + t.Fatalf("Record attribute key not found: %s", key) + } + } + done <- struct{}{} + }) + } + for i := 0; i < tc.messageCount; i++ { + go tc.messageGenerator(client, tc.messageBody, customAttributes) + } + + select { + case <-done: + case <-time.After(10 * time.Second): + t.Fatalf("Timed out waiting for messages to be emitted") + } + }) + } +} + +func TestEmitterMessageValidation(t *testing.T) { + getEmitter := func(exporterMock *mocks.OTLPExporter) Emitter { + client, err := newOtelClient( + DefaultConfig(), + func(err error) { t.Fatalf("otel error: %v", err) }, + // Override exporter factory which is used by BeholderClient + func(context.Context, ...otlploggrpc.Option) (sdklog.Exporter, error) { + return exporterMock, nil + }, + ) + assert.NoError(t, err) + return client.Emitter() + } + + for _, tc := range []struct { + name string + attrs Attributes + exporterCalledTimes int + expectedError string + }{ + { + name: "Missing required attribute", + attrs: Attributes{ + "key": "value", + }, + exporterCalledTimes: 0, + expectedError: "'Metadata.BeholderDataSchema' Error:Field validation for 'BeholderDataSchema' failed on the 'required' tag", + }, + { + name: "Invalid URI", + attrs: Attributes{ + "beholder_data_schema": "example-schema", + }, + exporterCalledTimes: 0, + expectedError: "'Metadata.BeholderDataSchema' Error:Field validation for 'BeholderDataSchema' failed on the 'uri' tag", + }, + { + name: "Valid URI", + exporterCalledTimes: 1, + attrs: Attributes{ + "beholder_data_schema": "/example-schema/versions/1", + }, + expectedError: "", + }, + } { + t.Run(tc.name, func(t *testing.T) { + setupMock := func(exporterMock *mocks.OTLPExporter) (*mocks.OTLPExporter, <-chan struct{}) { + done := make(chan struct{}, tc.exporterCalledTimes) + if tc.exporterCalledTimes > 0 { + exporterMock.On("Export", mock.Anything, mock.Anything).Return(nil).Times(tc.exporterCalledTimes). + Run(func(args mock.Arguments) { + done <- struct{}{} + }) + } + return exporterMock, done + } + + assertError := func(err error, expected string) { + if tc.expectedError != "" { + assert.ErrorContains(t, err, expected) + } else { + assert.NoError(t, err) + } + } + + assertMock := func(exporterMock *mocks.OTLPExporter) { + if tc.exporterCalledTimes > 0 { + exporterMock.AssertExpectations(t) + } else { + exporterMock.AssertNotCalled(t, "Export") + } + } + + waitUntilSent := func(done <-chan struct{}) { + for range tc.exporterCalledTimes { + select { + case <-done: + case <-time.After(10 * time.Second): + t.Fatalf("Timed out waiting for messages to be emitted") + } + } + } + + setupTest := func() (emitter Emitter, message Message, assertExpectations func(err error)) { + exporterMock, done := setupMock(mocks.NewOTLPExporter(t)) + emitter = getEmitter(exporterMock) + message = NewMessage([]byte("test"), tc.attrs) + + assertExpectations = func(err error) { + assertError(err, tc.expectedError) + if err == nil { + waitUntilSent(done) + } + assertMock(exporterMock) + } + return + } + + t.Run("Emitter.EmitMessage", func(t *testing.T) { + emitter, message, assertExpectations := setupTest() + + err := emitter.EmitMessage(context.Background(), message) + + assertExpectations(err) + }) + + t.Run("Emitter.Emit", func(t *testing.T) { + emitter, message, assertExpectations := setupTest() + + err := emitter.Emit(context.Background(), message.Body, tc.attrs) + + assertExpectations(err) + }) + }) + } +} diff --git a/pkg/beholder/config.go b/pkg/beholder/config.go new file mode 100644 index 000000000..a7f7dd0b9 --- /dev/null +++ b/pkg/beholder/config.go @@ -0,0 +1,58 @@ +package beholder + +import ( + "time" + + otelattr "go.opentelemetry.io/otel/attribute" +) + +type Config struct { + InsecureConnection bool + CACertFile string + OtelExporterGRPCEndpoint string + + PackageName string + // OTel Resource + ResourceAttributes map[string]string + // EventEmitter + EmitterExportTimeout time.Duration + // OTel Trace + TraceSampleRate float64 + TraceBatchTimeout time.Duration + // OTel Metric + MetricReaderInterval time.Duration + // OTel Log + LogExportTimeout time.Duration +} + +var defaultOtelAttributes = map[string]string{ + "package_name": "beholder", +} + +func DefaultConfig() Config { + return Config{ + InsecureConnection: true, + CACertFile: "", + OtelExporterGRPCEndpoint: "localhost:4317", + PackageName: "beholder", + // Resource + ResourceAttributes: defaultOtelAttributes, + // EventEmitter + EmitterExportTimeout: 1 * time.Second, + // Trace + TraceSampleRate: 1, + TraceBatchTimeout: 1 * time.Second, + // Metric + MetricReaderInterval: 1 * time.Second, + // Log + LogExportTimeout: 1 * time.Second, + } +} + +func (c Config) Attributes() []otelattr.KeyValue { + attrs := make([]otelattr.KeyValue, 0, len(c.ResourceAttributes)) + for k, v := range c.ResourceAttributes { + attrs = append(attrs, otelattr.String(k, v)) + } + return attrs +} diff --git a/pkg/beholder/config_test.go b/pkg/beholder/config_test.go new file mode 100644 index 000000000..523376f60 --- /dev/null +++ b/pkg/beholder/config_test.go @@ -0,0 +1,38 @@ +package beholder_test + +import ( + "fmt" + "time" + + beholder "github.com/smartcontractkit/chainlink-common/pkg/beholder" +) + +const ( + packageName = "beholder" +) + +func ExampleConfig() { + config := beholder.Config{ + InsecureConnection: true, + CACertFile: "", + OtelExporterGRPCEndpoint: "localhost:4317", + PackageName: packageName, + // Resource + ResourceAttributes: map[string]string{ + "package_name": packageName, + "sender": "beholdeclient", + }, + // EventEmitter + EmitterExportTimeout: 1 * time.Second, + // Trace + TraceSampleRate: 1, + TraceBatchTimeout: 1 * time.Second, + // Metric + MetricReaderInterval: 1 * time.Second, + // Log + LogExportTimeout: 1 * time.Second, + } + fmt.Printf("%+v", config) + // Output: + // {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 PackageName:beholder ResourceAttributes:map[package_name:beholder sender:beholdeclient] EmitterExportTimeout:1s TraceSampleRate:1 TraceBatchTimeout:1s MetricReaderInterval:1s LogExportTimeout:1s} +} diff --git a/pkg/beholder/example_test.go b/pkg/beholder/example_test.go new file mode 100644 index 000000000..26043c93c --- /dev/null +++ b/pkg/beholder/example_test.go @@ -0,0 +1,157 @@ +package beholder_test + +import ( + "context" + "fmt" + "log" + "sync" + "time" + + otelattribute "go.opentelemetry.io/otel/attribute" + otellog "go.opentelemetry.io/otel/log" + oteltrace "go.opentelemetry.io/otel/trace" + "google.golang.org/protobuf/proto" + + "github.com/smartcontractkit/chainlink-common/pkg/beholder" + "github.com/smartcontractkit/chainlink-common/pkg/beholder/global" + "github.com/smartcontractkit/chainlink-common/pkg/beholder/pb" +) + +func ExampleClient() { + ctx := context.Background() + + client, err := beholder.NewOtelClient(beholder.DefaultConfig(), errorHandler) + if err != nil { + log.Fatalf("Error creating beholder client: %v", err) + } + var wg sync.WaitGroup + for i := range 3 { + wg.Add(1) + fmt.Printf("Emitting message %d\n", i) + go func(i int) { + // Create message metadata + metadata := beholder.Metadata{ + DonID: "test_don_id", + NetworkName: []string{"test_network"}, + NetworkChainID: "test_chain_id", + BeholderDataSchema: "/custom-message/versions/1", + } + // Create custom message + customMessage := beholder.Message{ + // Set protobuf message bytes as body + Body: newMessageBytes(i), + // Set metadata attributes + Attrs: metadata.Attributes().Add( + // Add custom attributes + "timestamp", time.Now().Unix(), + "sender", "example-client", + ), + } + // Get message emitter + em := client.Emitter() + // Emit custom message + err := em.EmitMessage(ctx, customMessage) + if err != nil { + log.Fatalf("Error emitting message: %v", err) + } + wg.Done() + }(i) + } + wg.Wait() + // Output: + // Emitting message 0 + // Emitting message 1 + // Emitting message 2 +} + +func newMessageBytes(i int) []byte { + // Create protobuf message + customMessagePb := &pb.CustomMessage{} + customMessagePb.BoolVal = true + customMessagePb.IntVal = int64(i) + customMessagePb.FloatVal = float32(i) + customMessagePb.StringVal = fmt.Sprintf("string-value-%d", i) + customMessagePb.BytesVal = []byte{byte(i)} + // Encode protobuf message + customMessageBytes, err := proto.Marshal(customMessagePb) + if err != nil { + log.Fatalf("Error encoding message: %v", err) + } + return customMessageBytes +} + +func errorHandler(e error) { + if e != nil { + log.Fatalf("otel error: %v", e) + } +} + +func asseetNoError(err error) { + if err != nil { + panic(err) + } +} + +func ExampleEmitter() { + ctx := context.Background() + // Initialize beholder client + c, err := beholder.NewOtelClient(beholder.DefaultConfig(), asseetNoError) + if err != nil { + log.Fatalf("Error creating beholder client: %v", err) + } + var client beholder.Client = c + + // Set global client so it will be accessible from anywhere through beholder/global functions + global.SetClient(&client) + // After that you can use global functions to get logger, tracer, meter, messageEmitter + logger, tracer, meter, messageEmitter := global.Logger(), global.Tracer(), global.Meter(), global.Emitter() + + fmt.Println("Emit otel log record") + logger.Emit(ctx, otellog.Record{}) + + fmt.Println("Create trace span") + ctx, span := tracer.Start(ctx, "ExampleGlobalClient", oteltrace.WithAttributes(otelattribute.String("key", "value"))) + defer span.End() + + fmt.Println("Create metric counter") + counter, _ := meter.Int64Counter("global_counter") + counter.Add(ctx, 1) + + fmt.Println("Emit custom message") + err = messageEmitter.Emit(ctx, []byte("test"), beholder.Attributes{ + "key": "value", + "beholder_data_schema": "/test/versions/1", + }) + if err != nil { + log.Fatalf("Error emitting message: %v", err) + } + // Output: + // Emit otel log record + // Create trace span + // Create metric counter + // Emit custom message +} + +func ExampleBootstrap() { + beholderConfig := beholder.DefaultConfig() + + // Bootstrap Beholder Client + err := global.Bootstrap(beholderConfig, errorHandler) + if err != nil { + log.Fatalf("Error bootstrapping Beholder: %v", err) + } + + payloadBytes := newMessageBytes(0) + + // Emit custom message + for range 3 { + err := global.Emit(context.Background(), payloadBytes, beholder.Attributes{ + "beholder_data_type": "custom_message", + "foo": "bar", + }) + if err != nil { + log.Printf("Error emitting message: %v", err) + } + } + // Output: +} diff --git a/pkg/beholder/global/global.go b/pkg/beholder/global/global.go new file mode 100644 index 000000000..350d3353e --- /dev/null +++ b/pkg/beholder/global/global.go @@ -0,0 +1,78 @@ +package global + +import ( + "context" + "sync/atomic" + + otellog "go.opentelemetry.io/otel/log" + otelmetric "go.opentelemetry.io/otel/metric" + oteltrace "go.opentelemetry.io/otel/trace" + + "github.com/smartcontractkit/chainlink-common/pkg/beholder" +) + +// Pointer to the global Beholder Client +var globalBeholderClient = defaultBeholderClient() + +// SetClient sets the global Beholder Client +func SetClient(client *beholder.Client) { + globalBeholderClient.Store(client) +} + +// Returns the global Beholder Client +// Its thread-safe and can be used concurrently +func GetClient() beholder.Client { + ptr := globalBeholderClient.Load() + return *ptr +} + +func Logger() otellog.Logger { + return GetClient().Logger() +} + +func Tracer() oteltrace.Tracer { + return GetClient().Tracer() +} + +func Meter() otelmetric.Meter { + return GetClient().Meter() +} + +func Emitter() beholder.Emitter { + return GetClient().Emitter() +} + +func SpanFromContext(ctx context.Context) oteltrace.Span { + return oteltrace.SpanFromContext(ctx) +} + +func defaultBeholderClient() *atomic.Pointer[beholder.Client] { + ptr := &atomic.Pointer[beholder.Client]{} + client := beholder.NewNoopClient() + ptr.Store(&client) + return ptr +} + +func EmitMessage(ctx context.Context, message beholder.Message) error { + return Emitter().EmitMessage(ctx, message) +} + +func Emit(ctx context.Context, body []byte, attrs beholder.Attributes) error { + return Emitter().Emit(ctx, body, attrs) +} + +func Bootstrap(cfg beholder.Config, errorHandler func(error)) error { + // Initialize beholder client + c, err := beholder.NewOtelClient(cfg, errorHandler) + if err != nil { + return err + } + var client beholder.Client = c + // Set global client so it will be accessible from anywhere through beholder/global functions + SetClient(&client) + return nil +} + +func NewConfig() beholder.Config { + return beholder.DefaultConfig() +} diff --git a/pkg/beholder/global/global_test.go b/pkg/beholder/global/global_test.go new file mode 100644 index 000000000..e5c3cefbe --- /dev/null +++ b/pkg/beholder/global/global_test.go @@ -0,0 +1,58 @@ +package global_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + + otelattribute "go.opentelemetry.io/otel/attribute" + otellog "go.opentelemetry.io/otel/log" + otellognoop "go.opentelemetry.io/otel/log/noop" + otelmetricnoop "go.opentelemetry.io/otel/metric/noop" + oteltrace "go.opentelemetry.io/otel/trace" + oteltracenoop "go.opentelemetry.io/otel/trace/noop" + + "github.com/smartcontractkit/chainlink-common/pkg/beholder" + "github.com/smartcontractkit/chainlink-common/pkg/beholder/global" +) + +func TestGlobal(t *testing.T) { + // Get global logger, tracer, meter, messageEmitter + // If not initialized with global.SetClient will return noop client + logger, tracer, meter, messageEmitter := global.Logger(), global.Tracer(), global.Meter(), global.Emitter() + noopClient := beholder.NewNoopClient() + assert.IsType(t, otellognoop.Logger{}, logger) + assert.IsType(t, oteltracenoop.Tracer{}, tracer) + assert.IsType(t, otelmetricnoop.Meter{}, meter) + expectedMessageEmitter := beholder.NewNoopClient().Emitter() + assert.IsType(t, expectedMessageEmitter, messageEmitter) + + assert.IsType(t, noopClient, global.GetClient()) + assert.NotSame(t, noopClient, global.GetClient()) + + // Set global client so it will be accessible from anywhere through beholder/global functions + var client beholder.Client = noopClient + global.SetClient(&client) + assert.Same(t, noopClient, global.GetClient()) + + // After that use global functions to get logger, tracer, meter, messageEmitter + logger, tracer, meter, messageEmitter = global.Logger(), global.Tracer(), global.Meter(), global.Emitter() + + // Emit otel log record + logger.Emit(context.Background(), otellog.Record{}) + + // Create trace span + ctx, span := tracer.Start(context.Background(), "ExampleGlobalClient", oteltrace.WithAttributes(otelattribute.String("key", "value"))) + defer span.End() + + // Create metric counter + counter, _ := meter.Int64Counter("global_counter") + counter.Add(context.Background(), 1) + + // Emit custom message + err := messageEmitter.Emit(ctx, []byte("test"), beholder.Attributes{"key": "value"}) + if err != nil { + t.Fatalf("Error emitting message: %v", err) + } +} diff --git a/pkg/beholder/internal/exporter.go b/pkg/beholder/internal/exporter.go new file mode 100644 index 000000000..271077a5c --- /dev/null +++ b/pkg/beholder/internal/exporter.go @@ -0,0 +1,18 @@ +package internal + +import ( + "context" + + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc" + sdklog "go.opentelemetry.io/otel/sdk/log" +) + +var _ sdklog.Exporter = (*otlploggrpc.Exporter)(nil) +var _ OTLPExporter = (*otlploggrpc.Exporter)(nil) + +// Copy of sdklog.Exporter interface, used for mocking +type OTLPExporter interface { + Export(ctx context.Context, records []sdklog.Record) error + Shutdown(ctx context.Context) error + ForceFlush(ctx context.Context) error +} diff --git a/pkg/beholder/internal/mocks/otlp_exporter.go b/pkg/beholder/internal/mocks/otlp_exporter.go new file mode 100644 index 000000000..c5feb8aa2 --- /dev/null +++ b/pkg/beholder/internal/mocks/otlp_exporter.go @@ -0,0 +1,177 @@ +// Code generated by mockery v2.43.2. DO NOT EDIT. + +package mocks + +import ( + context "context" + + log "go.opentelemetry.io/otel/sdk/log" + + mock "github.com/stretchr/testify/mock" +) + +// OTLPExporter is an autogenerated mock type for the OTLPExporter type +type OTLPExporter struct { + mock.Mock +} + +type OTLPExporter_Expecter struct { + mock *mock.Mock +} + +func (_m *OTLPExporter) EXPECT() *OTLPExporter_Expecter { + return &OTLPExporter_Expecter{mock: &_m.Mock} +} + +// Export provides a mock function with given fields: ctx, records +func (_m *OTLPExporter) Export(ctx context.Context, records []log.Record) error { + ret := _m.Called(ctx, records) + + if len(ret) == 0 { + panic("no return value specified for Export") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, []log.Record) error); ok { + r0 = rf(ctx, records) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// OTLPExporter_Export_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Export' +type OTLPExporter_Export_Call struct { + *mock.Call +} + +// Export is a helper method to define mock.On call +// - ctx context.Context +// - records []log.Record +func (_e *OTLPExporter_Expecter) Export(ctx interface{}, records interface{}) *OTLPExporter_Export_Call { + return &OTLPExporter_Export_Call{Call: _e.mock.On("Export", ctx, records)} +} + +func (_c *OTLPExporter_Export_Call) Run(run func(ctx context.Context, records []log.Record)) *OTLPExporter_Export_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].([]log.Record)) + }) + return _c +} + +func (_c *OTLPExporter_Export_Call) Return(_a0 error) *OTLPExporter_Export_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *OTLPExporter_Export_Call) RunAndReturn(run func(context.Context, []log.Record) error) *OTLPExporter_Export_Call { + _c.Call.Return(run) + return _c +} + +// ForceFlush provides a mock function with given fields: ctx +func (_m *OTLPExporter) ForceFlush(ctx context.Context) error { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for ForceFlush") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// OTLPExporter_ForceFlush_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ForceFlush' +type OTLPExporter_ForceFlush_Call struct { + *mock.Call +} + +// ForceFlush is a helper method to define mock.On call +// - ctx context.Context +func (_e *OTLPExporter_Expecter) ForceFlush(ctx interface{}) *OTLPExporter_ForceFlush_Call { + return &OTLPExporter_ForceFlush_Call{Call: _e.mock.On("ForceFlush", ctx)} +} + +func (_c *OTLPExporter_ForceFlush_Call) Run(run func(ctx context.Context)) *OTLPExporter_ForceFlush_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *OTLPExporter_ForceFlush_Call) Return(_a0 error) *OTLPExporter_ForceFlush_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *OTLPExporter_ForceFlush_Call) RunAndReturn(run func(context.Context) error) *OTLPExporter_ForceFlush_Call { + _c.Call.Return(run) + return _c +} + +// Shutdown provides a mock function with given fields: ctx +func (_m *OTLPExporter) Shutdown(ctx context.Context) error { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for Shutdown") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// OTLPExporter_Shutdown_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Shutdown' +type OTLPExporter_Shutdown_Call struct { + *mock.Call +} + +// Shutdown is a helper method to define mock.On call +// - ctx context.Context +func (_e *OTLPExporter_Expecter) Shutdown(ctx interface{}) *OTLPExporter_Shutdown_Call { + return &OTLPExporter_Shutdown_Call{Call: _e.mock.On("Shutdown", ctx)} +} + +func (_c *OTLPExporter_Shutdown_Call) Run(run func(ctx context.Context)) *OTLPExporter_Shutdown_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *OTLPExporter_Shutdown_Call) Return(_a0 error) *OTLPExporter_Shutdown_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *OTLPExporter_Shutdown_Call) RunAndReturn(run func(context.Context) error) *OTLPExporter_Shutdown_Call { + _c.Call.Return(run) + return _c +} + +// NewOTLPExporter creates a new instance of OTLPExporter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewOTLPExporter(t interface { + mock.TestingT + Cleanup(func()) +}) *OTLPExporter { + mock := &OTLPExporter{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/beholder/message.go b/pkg/beholder/message.go new file mode 100644 index 000000000..d6e7964af --- /dev/null +++ b/pkg/beholder/message.go @@ -0,0 +1,256 @@ +package beholder + +import ( + "fmt" + "reflect" + + "github.com/go-playground/validator/v10" + "go.opentelemetry.io/otel/attribute" + otellog "go.opentelemetry.io/otel/log" + otelsdklog "go.opentelemetry.io/otel/sdk/log" +) + +type Message struct { + Attrs map[string]any + Body []byte +} + +type Metadata struct { + // REQUIRED FIELDS + // Schema Registry URI to fetch schema + BeholderDataSchema string `validate:"required,uri"` + + // OPTIONAL FIELDS + // The version of the CL node. + NodeVersion string + // mTLS public key for the node operator. This is used as an identity key but with the added benefit of being able to provide signatures. + NodeCsaKey string + // Signature from CSA private key. + NodeCsaSignature string + DonID string + // The RDD network name the CL node is operating with. + NetworkName []string + WorkflowID string + WorkflowName string + WorkflowOwnerAddress string + // Hash of the workflow spec. + WorkflowSpecID string + // The unique execution of a workflow. + WorkflowExecutionID string + // The address for the contract. + CapabilityContractAddress string + CapabilityID string + CapabilityVersion string + CapabilityName string + NetworkChainID string +} + +func (m Metadata) Attributes() Attributes { + attrs := make(Attributes, reflect.ValueOf(m).NumField()) + attrs["node_version"] = m.NodeVersion + attrs["node_csa_key"] = m.NodeCsaKey + attrs["node_csa_signature"] = m.NodeCsaSignature + attrs["don_id"] = m.DonID + attrs["network_name"] = m.NetworkName + attrs["workflow_id"] = m.WorkflowID + attrs["workflow_name"] = m.WorkflowName + attrs["workflow_owner_address"] = m.WorkflowOwnerAddress + attrs["workflow_spec_id"] = m.WorkflowSpecID + attrs["workflow_execution_id"] = m.WorkflowExecutionID + attrs["beholder_data_schema"] = m.BeholderDataSchema + attrs["capability_contract_address"] = m.CapabilityContractAddress + attrs["capability_id"] = m.CapabilityID + attrs["capability_version"] = m.CapabilityVersion + attrs["capability_name"] = m.CapabilityName + attrs["network_chain_id"] = m.NetworkChainID + return attrs +} + +type Attributes map[string]any + +func NewAttributes(args ...any) Attributes { + attrs := make(Attributes, len(args)/2) + attrs.Add(args...) + return attrs +} + +func (a Attributes) Add(args ...any) Attributes { + for i := 1; i < len(args); i += 2 { + if key, ok := args[i-1].(string); ok { + val := args[i] + a[key] = val + } + } + return a +} + +func NewMessage(body []byte, attrs Attributes) Message { + return Message{ + Body: body, + Attrs: attrs, + } +} + +func (e *Message) AddAttributes(attrs Attributes) { + if e.Attrs == nil { + e.Attrs = make(map[string]any, len(attrs)) + } + for k, v := range attrs { + e.Attrs[k] = v + } +} + +func (e *Message) AddOtelAttributes(attrs ...attribute.KeyValue) { + if e.Attrs == nil { + e.Attrs = make(map[string]any, len(attrs)) + } + for _, v := range attrs { + e.Attrs[string(v.Key)] = v.Value + } +} + +func (e *Message) OtelRecord() otellog.Record { + return newRecord(e.Body, e.Attrs) +} + +func (e *Message) SdkOtelRecord() otelsdklog.Record { + return newSdkRecord(e.Body, e.Attrs) +} + +func (e *Message) Copy() Message { + attrs := make(Attributes, len(e.Attrs)) + for k, v := range e.Attrs { + attrs[k] = v + } + c := Message{ + Attrs: attrs, + } + if e.Body != nil { + c.Body = make([]byte, len(e.Body)) + copy(c.Body, e.Body) + } + return c +} + +// Creates otellog.Record from body and attributes +func newRecord(body []byte, attrs map[string]any) otellog.Record { + otelRecord := otellog.Record{} + if body != nil { + otelRecord.SetBody(otellog.BytesValue(body)) + } + for k, v := range attrs { + otelRecord.AddAttributes(OtelAttr(k, v)) + } + return otelRecord +} + +// Creates otelsdklog.Record from body and attributes +// NOTE: internal function otelsdklog.newRecord returns value not pointer +func newSdkRecord(body []byte, attrs map[string]any) otelsdklog.Record { + sdkRecord := otelsdklog.Record{} + if body != nil { + sdkRecord.SetBody(otellog.BytesValue(body)) + } + for k, v := range attrs { + sdkRecord.AddAttributes(OtelAttr(k, v)) + } + return sdkRecord +} + +func OtelAttr(key string, value any) otellog.KeyValue { + switch v := value.(type) { + case string: + return otellog.String(key, v) + case []string: + vals := make([]otellog.Value, 0, len(v)) + for _, s := range v { + vals = append(vals, otellog.StringValue(s)) + } + return otellog.Slice(key, vals...) + case int64: + return otellog.Int64(key, v) + case int: + return otellog.Int(key, v) + case float64: + return otellog.Float64(key, v) + case bool: + return otellog.Bool(key, v) + case []byte: + return otellog.Bytes(key, v) + case nil: + return otellog.Empty(key) + case otellog.Value: + return otellog.KeyValue{Key: key, Value: v} + case attribute.Value: + return OtelAttr(key, v.AsInterface()) + default: + return otellog.String(key, fmt.Sprintf("", v)) + } +} + +func (e Message) String() string { + return fmt.Sprintf("Message{Attrs: %v, Body: %v}", e.Attrs, e.Body) +} + +// Sets metadata fields from attributes +func (m *Metadata) FromAttributes(attrs Attributes) *Metadata { + for k, v := range attrs { + switch k { + case "node_version": + m.NodeVersion = v.(string) + case "node_csa_key": + m.NodeCsaKey = v.(string) + case "node_csa_signature": + m.NodeCsaSignature = v.(string) + case "don_id": + m.DonID = v.(string) + case "network_name": + m.NetworkName = v.([]string) + case "workflow_id": + m.WorkflowID = v.(string) + case "workflow_name": + m.WorkflowName = v.(string) + case "workflow_owner_address": + m.WorkflowOwnerAddress = v.(string) + case "workflow_spec_id": + m.WorkflowSpecID = v.(string) + case "workflow_execution_id": + m.WorkflowExecutionID = v.(string) + case "beholder_data_schema": + m.BeholderDataSchema = v.(string) + case "capability_contract_address": + m.CapabilityContractAddress = v.(string) + case "capability_id": + m.CapabilityID = v.(string) + case "capability_version": + m.CapabilityVersion = v.(string) + case "capability_name": + m.CapabilityName = v.(string) + case "network_chain_id": + m.NetworkChainID = v.(string) + } + } + return m +} + +func NewMetadata(attrs Attributes) *Metadata { + m := &Metadata{} + m.FromAttributes(attrs) + return m +} + +func (m *Metadata) Validate() error { + validate := validator.New() + return validate.Struct(m) +} + +func (e Message) Validate() error { + if e.Body == nil { + return fmt.Errorf("message body is required") + } + if len(e.Attrs) == 0 { + return fmt.Errorf("message attributes are required") + } + metadata := NewMetadata(e.Attrs) + return metadata.Validate() +} diff --git a/pkg/beholder/message_test.go b/pkg/beholder/message_test.go new file mode 100644 index 000000000..a547af5b0 --- /dev/null +++ b/pkg/beholder/message_test.go @@ -0,0 +1,131 @@ +package beholder_test + +import ( + "fmt" + "testing" + + "github.com/go-playground/validator/v10" + "github.com/stretchr/testify/assert" + + "github.com/smartcontractkit/chainlink-common/pkg/beholder" +) + +func ExampleMessage() { + // Create message with body and attributes + e1 := beholder.NewMessage([]byte{1}, beholder.Attributes{"key_string": "value"}) + fmt.Println("#1", e1) + // Create attributes + additionalAttributes := beholder.Attributes{ + "key_string": "new value", + "key_int32": int32(1), + } + // Add more attributes + additionalAttributes.Add( + "key_string", "updated value", // this will overrider previous value + "key_int32", int32(2), + "key3", true, + ) + // Add attributes to message + e1.AddAttributes(additionalAttributes) + fmt.Println("#2", e1) + // Create empty message struct + e2 := beholder.Message{} + fmt.Println("#3", e2) + // Add attributes to message + e2.AddAttributes(beholder.Attributes{"key_int": 1}) + fmt.Println("#4", e2) + // Update attribute key_int + e2.AddAttributes(beholder.Attributes{"key_int": 2}) + fmt.Println("#5", e2) + // Set message body + e2.Body = []byte("0123") + fmt.Println("#6", e2) + // Reset attributes + e2.Attrs = beholder.Attributes{} + fmt.Println("#7", e2) + // Reset body + e2.Body = nil + fmt.Println("#8", e2) + // Shalow copy of message + e3 := beholder.NewMessage(e1.Body, e1.Attrs) + fmt.Println("#9", e3) + e1.Body[0] = byte(2) // Wil mutate e3 + fmt.Println("#10", e3) + // Deep copy + e4 := e1.Copy() + fmt.Println("#11", e4) + e1.Body[0] = byte(3) // Should not mutate e4 + fmt.Println("#12", e4) + // Output: + // #1 Message{Attrs: map[key_string:value], Body: [1]} + // #2 Message{Attrs: map[key3:true key_int32:2 key_string:updated value], Body: [1]} + // #3 Message{Attrs: map[], Body: []} + // #4 Message{Attrs: map[key_int:1], Body: []} + // #5 Message{Attrs: map[key_int:2], Body: []} + // #6 Message{Attrs: map[key_int:2], Body: [48 49 50 51]} + // #7 Message{Attrs: map[], Body: [48 49 50 51]} + // #8 Message{Attrs: map[], Body: []} + // #9 Message{Attrs: map[key3:true key_int32:2 key_string:updated value], Body: [1]} + // #10 Message{Attrs: map[key3:true key_int32:2 key_string:updated value], Body: [2]} + // #11 Message{Attrs: map[key3:true key_int32:2 key_string:updated value], Body: [2]} + // #12 Message{Attrs: map[key3:true key_int32:2 key_string:updated value], Body: [2]} +} + +func testMetadata() beholder.Metadata { + return beholder.Metadata{ + NodeVersion: "v1.0.0", + NodeCsaKey: "test_key", + NodeCsaSignature: "test_signature", + DonID: "test_don_id", + NetworkName: []string{"test_network"}, + WorkflowID: "test_workflow_id", + WorkflowName: "test_workflow_name", + WorkflowOwnerAddress: "test_owner_address", + WorkflowSpecID: "test_spec_id", + WorkflowExecutionID: "test_execution_id", + BeholderDataSchema: "/schemas/ids/test_schema", // required field, URI + CapabilityContractAddress: "test_contract_address", + CapabilityID: "test_capability_id", + CapabilityVersion: "test_capability_version", + CapabilityName: "test_capability_name", + NetworkChainID: "test_chain_id", + } +} +func ExampleMetadata() { + m := testMetadata() + fmt.Println(m) + fmt.Println(m.Attributes()) + // Output: + // {/schemas/ids/test_schema v1.0.0 test_key test_signature test_don_id [test_network] test_workflow_id test_workflow_name test_owner_address test_spec_id test_execution_id test_contract_address test_capability_id test_capability_version test_capability_name test_chain_id} + // map[beholder_data_schema:/schemas/ids/test_schema capability_contract_address:test_contract_address capability_id:test_capability_id capability_name:test_capability_name capability_version:test_capability_version don_id:test_don_id network_chain_id:test_chain_id network_name:[test_network] node_csa_key:test_key node_csa_signature:test_signature node_version:v1.0.0 workflow_execution_id:test_execution_id workflow_id:test_workflow_id workflow_name:test_workflow_name workflow_owner_address:test_owner_address workflow_spec_id:test_spec_id] +} + +func ExampleValidate() { + validate := validator.New() + + metadata := beholder.Metadata{} + if err := validate.Struct(metadata); err != nil { + fmt.Println(err) + } + metadata.BeholderDataSchema = "example.proto" + if err := validate.Struct(metadata); err != nil { + fmt.Println(err) + } + metadata.BeholderDataSchema = "/schemas/ids/test_schema" + if err := validate.Struct(metadata); err != nil { + fmt.Println(err) + } else { + fmt.Println("Metadata is valid") + } + // Output: + // Key: 'Metadata.BeholderDataSchema' Error:Field validation for 'BeholderDataSchema' failed on the 'required' tag + // Key: 'Metadata.BeholderDataSchema' Error:Field validation for 'BeholderDataSchema' failed on the 'uri' tag + // Metadata is valid +} + +func TestAttributesConversion(t *testing.T) { + expected := testMetadata() + attrs := expected.Attributes() + actual := beholder.NewMetadata(attrs) + assert.Equal(t, expected, *actual) +} diff --git a/pkg/beholder/noop.go b/pkg/beholder/noop.go new file mode 100644 index 000000000..3df416934 --- /dev/null +++ b/pkg/beholder/noop.go @@ -0,0 +1,91 @@ +package beholder + +import ( + "context" + "fmt" + "time" + + "go.opentelemetry.io/otel/exporters/stdout/stdoutlog" + "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric" + "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" + otellognoop "go.opentelemetry.io/otel/log/noop" + otelmetricnoop "go.opentelemetry.io/otel/metric/noop" + sdklog "go.opentelemetry.io/otel/sdk/log" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + oteltracenoop "go.opentelemetry.io/otel/trace/noop" +) + +// Default client to fallback when is is not initialized properly +func NewNoopClient() Client { + cfg := DefaultConfig() + // Logger + loggerProvider := otellognoop.NewLoggerProvider() + logger := loggerProvider.Logger(cfg.PackageName) + // Tracer + tracerProvider := oteltracenoop.NewTracerProvider() + tracer := tracerProvider.Tracer(cfg.PackageName) + + // Meter + meterProvider := otelmetricnoop.NewMeterProvider() + meter := meterProvider.Meter(cfg.PackageName) + + // MessageEmitter + messageEmitter := noopMessageEmitter{} + + onClose := func() error { return nil } + + client := NewClient(cfg, logger, tracer, meter, messageEmitter, onClose) + + return client +} + +// NewStdoutClient creates a new BeholderClient with stdout exporters +// Use for testing and debugging +// Also this client is used as a noop client when otel exporter is not initialized properly +func NewStdoutClient() Client { + cfg := DefaultConfig() + // Logger + loggerExporter, _ := stdoutlog.New(stdoutlog.WithoutTimestamps()) // stdoutlog.New() never returns an error + loggerProvider := sdklog.NewLoggerProvider(sdklog.WithProcessor(sdklog.NewSimpleProcessor(loggerExporter))) + logger := loggerProvider.Logger(cfg.PackageName) + setOtelErrorHandler(func(err error) { + fmt.Printf("OTel error %s", err) + }) + + // Tracer + traceExporter, _ := stdouttrace.New() // stdouttrace.New() never returns an error + tracerProvider := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor( + sdktrace.NewSimpleSpanProcessor(traceExporter), + )) + tracer := tracerProvider.Tracer(cfg.PackageName) + + // Meter + metricExporter, _ := stdoutmetric.New() // stdoutmetric.New() never returns an error + meterProvider := sdkmetric.NewMeterProvider( + sdkmetric.WithReader( + sdkmetric.NewPeriodicReader( + metricExporter, + sdkmetric.WithInterval(time.Second), // Default is 10s + )), + ) + meter := meterProvider.Meter(cfg.PackageName) + + // MessageEmitter + messageEmitter := newMessageEmitter(loggerExporter, logger) + + onClose := closeFunc(context.Background(), loggerProvider, tracerProvider, meterProvider) + + client := NewClient(cfg, logger, tracer, meter, messageEmitter, onClose) + + return client +} + +type noopMessageEmitter struct{} + +func (noopMessageEmitter) Emit(ctx context.Context, body []byte, attrs map[string]any) error { + return nil +} +func (noopMessageEmitter) EmitMessage(ctx context.Context, message Message) error { + return nil +} diff --git a/pkg/beholder/pb/example.pb.go b/pkg/beholder/pb/example.pb.go new file mode 100644 index 000000000..9d8914468 --- /dev/null +++ b/pkg/beholder/pb/example.pb.go @@ -0,0 +1,185 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.31.0 +// protoc v4.25.1 +// source: example.proto + +package pb + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// Used for testing +type CustomMessage struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + BoolVal bool `protobuf:"varint,1,opt,name=bool_val,json=boolVal,proto3" json:"bool_val,omitempty"` + IntVal int64 `protobuf:"varint,2,opt,name=int_val,json=intVal,proto3" json:"int_val,omitempty"` + FloatVal float32 `protobuf:"fixed32,3,opt,name=float_val,json=floatVal,proto3" json:"float_val,omitempty"` + StringVal string `protobuf:"bytes,4,opt,name=string_val,json=stringVal,proto3" json:"string_val,omitempty"` + BytesVal []byte `protobuf:"bytes,5,opt,name=bytes_val,json=bytesVal,proto3" json:"bytes_val,omitempty"` +} + +func (x *CustomMessage) Reset() { + *x = CustomMessage{} + if protoimpl.UnsafeEnabled { + mi := &file_example_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CustomMessage) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CustomMessage) ProtoMessage() {} + +func (x *CustomMessage) ProtoReflect() protoreflect.Message { + mi := &file_example_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CustomMessage.ProtoReflect.Descriptor instead. +func (*CustomMessage) Descriptor() ([]byte, []int) { + return file_example_proto_rawDescGZIP(), []int{0} +} + +func (x *CustomMessage) GetBoolVal() bool { + if x != nil { + return x.BoolVal + } + return false +} + +func (x *CustomMessage) GetIntVal() int64 { + if x != nil { + return x.IntVal + } + return 0 +} + +func (x *CustomMessage) GetFloatVal() float32 { + if x != nil { + return x.FloatVal + } + return 0 +} + +func (x *CustomMessage) GetStringVal() string { + if x != nil { + return x.StringVal + } + return "" +} + +func (x *CustomMessage) GetBytesVal() []byte { + if x != nil { + return x.BytesVal + } + return nil +} + +var File_example_proto protoreflect.FileDescriptor + +var file_example_proto_rawDesc = []byte{ + 0x0a, 0x0d, 0x65, 0x78, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, + 0x02, 0x70, 0x62, 0x22, 0x9c, 0x01, 0x0a, 0x0d, 0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x4d, 0x65, + 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x19, 0x0a, 0x08, 0x62, 0x6f, 0x6f, 0x6c, 0x5f, 0x76, 0x61, + 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x62, 0x6f, 0x6f, 0x6c, 0x56, 0x61, 0x6c, + 0x12, 0x17, 0x0a, 0x07, 0x69, 0x6e, 0x74, 0x5f, 0x76, 0x61, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x03, 0x52, 0x06, 0x69, 0x6e, 0x74, 0x56, 0x61, 0x6c, 0x12, 0x1b, 0x0a, 0x09, 0x66, 0x6c, 0x6f, + 0x61, 0x74, 0x5f, 0x76, 0x61, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x02, 0x52, 0x08, 0x66, 0x6c, + 0x6f, 0x61, 0x74, 0x56, 0x61, 0x6c, 0x12, 0x1d, 0x0a, 0x0a, 0x73, 0x74, 0x72, 0x69, 0x6e, 0x67, + 0x5f, 0x76, 0x61, 0x6c, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x73, 0x74, 0x72, 0x69, + 0x6e, 0x67, 0x56, 0x61, 0x6c, 0x12, 0x1b, 0x0a, 0x09, 0x62, 0x79, 0x74, 0x65, 0x73, 0x5f, 0x76, + 0x61, 0x6c, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x08, 0x62, 0x79, 0x74, 0x65, 0x73, 0x56, + 0x61, 0x6c, 0x42, 0x3e, 0x5a, 0x3c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, + 0x2f, 0x73, 0x6d, 0x61, 0x72, 0x74, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x61, 0x63, 0x74, 0x6b, 0x69, + 0x74, 0x2f, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x6c, 0x69, 0x6e, 0x6b, 0x2d, 0x63, 0x6f, 0x6d, 0x6d, + 0x6f, 0x6e, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x62, 0x65, 0x68, 0x6f, 0x6c, 0x64, 0x65, 0x72, 0x2f, + 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_example_proto_rawDescOnce sync.Once + file_example_proto_rawDescData = file_example_proto_rawDesc +) + +func file_example_proto_rawDescGZIP() []byte { + file_example_proto_rawDescOnce.Do(func() { + file_example_proto_rawDescData = protoimpl.X.CompressGZIP(file_example_proto_rawDescData) + }) + return file_example_proto_rawDescData +} + +var file_example_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_example_proto_goTypes = []interface{}{ + (*CustomMessage)(nil), // 0: pb.CustomMessage +} +var file_example_proto_depIdxs = []int32{ + 0, // [0:0] is the sub-list for method output_type + 0, // [0:0] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_example_proto_init() } +func file_example_proto_init() { + if File_example_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_example_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CustomMessage); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_example_proto_rawDesc, + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_example_proto_goTypes, + DependencyIndexes: file_example_proto_depIdxs, + MessageInfos: file_example_proto_msgTypes, + }.Build() + File_example_proto = out.File + file_example_proto_rawDesc = nil + file_example_proto_goTypes = nil + file_example_proto_depIdxs = nil +} diff --git a/pkg/beholder/pb/example.proto b/pkg/beholder/pb/example.proto new file mode 100644 index 000000000..55d1a9b7d --- /dev/null +++ b/pkg/beholder/pb/example.proto @@ -0,0 +1,14 @@ +syntax = "proto3"; + +option go_package = "github.com/smartcontractkit/chainlink-common/pkg/beholder/pb"; + +package pb; + +// Used for testing +message CustomMessage { + bool bool_val=1; + int64 int_val=2; + float float_val=3; + string string_val=4; + bytes bytes_val=5; +} \ No newline at end of file diff --git a/pkg/beholder/pb/generate.go b/pkg/beholder/pb/generate.go new file mode 100644 index 000000000..697c33637 --- /dev/null +++ b/pkg/beholder/pb/generate.go @@ -0,0 +1,3 @@ +//go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative example.proto + +package pb