diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2c9ddc3e..2ad8949a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -21,10 +21,10 @@ jobs: - name: Set up Go uses: actions/setup-go@v3 with: - go-version: "1.20" + go-version: 1.21 - - name: Build - run: go build -v ./... + - name: Build all modules + run: make build - - name: Test - run: go test -v ./... + - name: Test all modules + run: make test diff --git a/Makefile b/Makefile index aeaa1660..a123bd0b 100644 --- a/Makefile +++ b/Makefile @@ -111,6 +111,7 @@ BUILDER := builder .PHONY: genotelarrowcol genotelarrowcol: + $(GOCMD) install go.opentelemetry.io/collector/cmd/builder@latest $(BUILDER) --skip-compilation --config collector/cmd/otelarrowcol/build.yaml --output-path collector/cmd/otelarrowcol .PHONY: otelarrowcol diff --git a/collector/cmd/otelarrowcol/go.mod b/collector/cmd/otelarrowcol/go.mod index 9b326d51..4f1cbc64 100644 --- a/collector/cmd/otelarrowcol/go.mod +++ b/collector/cmd/otelarrowcol/go.mod @@ -2,7 +2,9 @@ module github.com/open-telemetry/otel-arrow/collector/cmd/otelarrowcol -go 1.20 +go 1.21 + +toolchain go1.21.3 require ( github.com/lightstep/telemetry-generator/generatorreceiver v0.15.0 diff --git a/collector/cmd/otelarrowcol/go.sum b/collector/cmd/otelarrowcol/go.sum index 50abaa0a..c6b9efc0 100644 --- a/collector/cmd/otelarrowcol/go.sum +++ b/collector/cmd/otelarrowcol/go.sum @@ -133,11 +133,13 @@ cloud.google.com/go/compute v1.15.1/go.mod h1:bjjoF/NtFUrkD/urWfdHaKuOPDR5nWIs63 cloud.google.com/go/compute v1.18.0/go.mod h1:1X7yHxec2Ga+Ss6jPyjxRxpu2uu7PLgsOVXvgU0yacs= cloud.google.com/go/compute v1.19.0/go.mod h1:rikpw2y+UMidAe9tISo04EHNOIf42RLYF/q8Bs93scU= cloud.google.com/go/compute v1.23.0 h1:tP41Zoavr8ptEqaW6j+LQOnyBBhO7OkOMAGrgLopTwY= +cloud.google.com/go/compute v1.23.0/go.mod h1:4tCnrn48xsqlwSAiLf1HXMQk8CONslYbdiEZc9FEIbM= cloud.google.com/go/compute/metadata v0.1.0/go.mod h1:Z1VN+bulIf6bt4P/C37K4DyZYZEXYonfTBHHFPO/4UU= cloud.google.com/go/compute/metadata v0.2.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k= cloud.google.com/go/compute/metadata v0.2.1/go.mod h1:jgHgmJd2RKBGzXqF5LR2EZMGxBkeanZ9wwa75XHJgOM= cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA= cloud.google.com/go/compute/metadata v0.2.4-0.20230617002413-005d2dfb6b68 h1:aRVqY1p2IJaBGStWMsQMpkAa83cPkCDLl80eOj0Rbz4= +cloud.google.com/go/compute/metadata v0.2.4-0.20230617002413-005d2dfb6b68/go.mod h1:1a3eRNYX12fs5UABBIXS8HXVvQbX9hRB/RkEBPORpe8= cloud.google.com/go/contactcenterinsights v1.3.0/go.mod h1:Eu2oemoePuEFc/xKFPjbTuPSj0fYJcPls9TFlPNnHHY= cloud.google.com/go/contactcenterinsights v1.4.0/go.mod h1:L2YzkGbPsv+vMQMCADxJoT9YiTTnSEd6fEvCeHTYVck= cloud.google.com/go/contactcenterinsights v1.6.0/go.mod h1:IIDlT6CLcDoyv79kDv8iWxMSTZhLxSCofVV5W6YFM/w= @@ -647,6 +649,7 @@ github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx2 github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= github.com/boombuler/barcode v1.0.1/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= github.com/brianvoe/gofakeit/v6 v6.17.0 h1:obbQTJeHfktJtiZzq0Q1bEpsNUs+yHrYlPVWt7BtmJ4= +github.com/brianvoe/gofakeit/v6 v6.17.0/go.mod h1:Ow6qC71xtwm79anlwKRlWZW6zVq9D2XHE4QSSMP/rU8= github.com/btcsuite/btcd v0.20.1-beta/go.mod h1:wVuoA8VJLEcwgqHBwHmzLRazpKxTv13Px/pDuV7OomQ= github.com/btcsuite/btcd v0.22.0-beta.0.20220111032746-97732e52810c/go.mod h1:tjmYdS6MLJ5/s0Fj4DbLgSbDHbEqLJrtnHecBFkdz5M= github.com/btcsuite/btcd v0.23.0/go.mod h1:0QJIIN1wwIXF/3G/m87gIwGniDMDQqjVn4SZgnFpsYY= @@ -707,6 +710,7 @@ github.com/cncf/xds/go v0.0.0-20220314180256-7f1daf1720fc/go.mod h1:eXthEFrGJvWH github.com/cncf/xds/go v0.0.0-20230105202645-06c439db220b/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20230310173818-32f1caf87195/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4 h1:/inchEIKaYC1Akx+H+gqO04wryn5h75LSazbRlnya1k= +github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cockroachdb/datadriven v1.0.0/go.mod h1:5Ib8Meh+jk1RlHIXej6Pzevx/NLlNvQB9pmSBZErGA4= github.com/cockroachdb/datadriven v1.0.2/go.mod h1:a9RdTaap04u637JoCzcUoIcDmvwSUtcUFtT/C3kJlTU= github.com/cockroachdb/datadriven v1.0.3-0.20230801171734-e384cf455877/go.mod h1:a9RdTaap04u637JoCzcUoIcDmvwSUtcUFtT/C3kJlTU= @@ -805,6 +809,7 @@ github.com/envoyproxy/protoc-gen-validate v0.6.7/go.mod h1:dyJXwwfPK2VSqiB9Klm1J github.com/envoyproxy/protoc-gen-validate v0.9.1/go.mod h1:OKNgG7TCp5pF4d6XftA0++PMirau2/yoOwVac3AbF2w= github.com/envoyproxy/protoc-gen-validate v0.10.0/go.mod h1:DRjgyB0I43LtJapqN6NiRwroiAU2PaFuvk/vjgh61ss= github.com/envoyproxy/protoc-gen-validate v1.0.2 h1:QkIBuU5k+x7/QXPvPPnWXWlCdaBFApVqftFV6k087DA= +github.com/envoyproxy/protoc-gen-validate v1.0.2/go.mod h1:GpiZQP3dDbg4JouG/NNS7QWXpgx6x8QiMKdmN72jogE= github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw= github.com/ethereum/c-kzg-4844 v0.3.1/go.mod h1:VewdlzQmpT5QSrVhbBuGoCdFJkpaJlO1aQputP83wc0= github.com/ethereum/go-ethereum v1.10.26/go.mod h1:EYFyF19u3ezGLD4RqOkLq+ZCXzYbLoNDdZlMt7kyKFg= @@ -930,6 +935,7 @@ github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfU github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4= github.com/golang/glog v1.1.0/go.mod h1:pfYeQZ3JWZoXTV5sFc986z3HTpwQs9At6P4ImfuP3NQ= github.com/golang/glog v1.1.2 h1:DVjP2PbBOzHyzA+dn3WhHIq4NdVu3Q+pvivFICf/7fo= +github.com/golang/glog v1.1.2/go.mod h1:zR+okUeTbrL6EL3xHUDxZuEtGv04p5shwip1+mL/rLQ= github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -939,6 +945,7 @@ github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4er github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.5.0/go.mod h1:CWnOUgYIOo4TcNZ0wHX3YZCqsaM1I1Jvs6v3mP3KVu8= github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= +github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -1349,6 +1356,7 @@ github.com/open-telemetry/opentelemetry-collector-contrib/extension/headerssette github.com/open-telemetry/opentelemetry-collector-contrib/extension/pprofextension v0.88.0 h1:52tD8EO8/tTsouMlRUlNfToPG7zPifJkbcsTmcCIScY= github.com/open-telemetry/opentelemetry-collector-contrib/extension/pprofextension v0.88.0/go.mod h1:qygQpMj8Xk4J9YQMgsvC5raRRfNqkkPJRFZ6ZFPL7MI= github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.88.0 h1:ornGkT2YBY/8W4kcVnErFehd6NUHqUW8g36DG7+3tCQ= +github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.88.0/go.mod h1:l2gdVngRvmSczRunw8WWun/mmkUkLuDSua2cWzalqrM= github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/opentracing/opentracing-go v1.0.3-0.20180606204148-bd9c31933947/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= @@ -1441,6 +1449,7 @@ github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6po github.com/rogpeppe/go-internal v1.8.1/go.mod h1:JeRgkft04UBgHMgCIwADu4Pn6Mtm5d4nPKWu0nJ5d+o= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= github.com/rs/cors v1.10.1 h1:L0uuZVXIKlI1SShY2nhFfo44TYvDPQ1w4oFkUJNfhyo= github.com/rs/cors v1.10.1/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= @@ -1658,6 +1667,7 @@ go.opentelemetry.io/collector/extension v0.88.0/go.mod h1:5wPlOyWtVJcZS9CMhFUnuR go.opentelemetry.io/collector/extension/auth v0.88.0 h1:4vjVCWwkh3uGqClM32jbmUDGzZ22lK4IdoSTd5xTu/s= go.opentelemetry.io/collector/extension/auth v0.88.0/go.mod h1:IcwiPhj6ZGTaZ7WVRVvl97uyw9NShsVqcTRLtXddpK0= go.opentelemetry.io/collector/extension/zpagesextension v0.88.0 h1:cpkwzjhq6jfkVq3ltUl9wdb/8RrWbn0utHTCU3K5Mhc= +go.opentelemetry.io/collector/extension/zpagesextension v0.88.0/go.mod h1:8LPmV8UkQgDAfNaAizQqLzYnYibzQv81eBGKv0Mk6wU= go.opentelemetry.io/collector/featuregate v1.0.0-rcv0017 h1:DtJQalPXMWQqT6jd2LZ1oKrOfLJJRCi+rh2LKnkj4Zo= go.opentelemetry.io/collector/featuregate v1.0.0-rcv0017/go.mod h1:fLmJMf1AoHttkF8p5oJAc4o5ZpHu8yO5XYJ7gbLCLzo= go.opentelemetry.io/collector/otelcol v0.88.0 h1:f2eRVLJY66w9WFj5iT1Tg6Qxtlljagov9v8TPStuK2g= @@ -1671,6 +1681,7 @@ go.opentelemetry.io/collector/processor/batchprocessor v0.88.0/go.mod h1:SQhHxRc go.opentelemetry.io/collector/receiver v0.88.0 h1:MPvVAFOfjl0+Ylka7so8QoK8T2Za2471rv5t3sqbbSY= go.opentelemetry.io/collector/receiver v0.88.0/go.mod h1:MIZ6jPPZ+I8XibZm6I3RAn9h7Wcy2ZJsPmtXd2BLr60= go.opentelemetry.io/collector/receiver/otlpreceiver v0.88.0 h1:Sh+Y4RB6C7ZQkMPBvaSSKMWIYYtItahWnq1DJ5ddTW4= +go.opentelemetry.io/collector/receiver/otlpreceiver v0.88.0/go.mod h1:Qr01GQKUmFnbgQApFhq4LNuS+n0YURfJfgLiaydwZRc= go.opentelemetry.io/collector/semconv v0.88.0 h1:8TVP4hYaUC87S6CCLKNoSxsUE0ChldE4vqotvNHHUnE= go.opentelemetry.io/collector/semconv v0.88.0/go.mod h1:j/8THcqVxFna1FpvA2zYIsUperEtOaRaqoLYIN4doWw= go.opentelemetry.io/collector/service v0.88.0 h1:KSue2w94Tb2xjenlm+SC2y2g87hdhFJeHMT9pEshKAE= @@ -1682,6 +1693,7 @@ go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.45.0/go.mod h1: go.opentelemetry.io/contrib/propagators/b3 v1.20.0 h1:Yty9Vs4F3D6/liF1o6FNt0PvN85h/BJJ6DQKJ3nrcM0= go.opentelemetry.io/contrib/propagators/b3 v1.20.0/go.mod h1:On4VgbkqYL18kbJlWsa18+cMNe6rYpBnPi1ARI/BrsU= go.opentelemetry.io/contrib/zpages v0.45.0 h1:jIwHHGoWzJoZdbIUtWdErjL85Gni6BignnAFqDtMRL4= +go.opentelemetry.io/contrib/zpages v0.45.0/go.mod h1:4mIdA5hqH6hEx9sZgV50qKfQO8aIYolUZboHmz+G7vw= go.opentelemetry.io/otel v1.19.0 h1:MuS/TNf4/j4IXsZuJegVzI1cwut7Qc00344rgH7p8bs= go.opentelemetry.io/otel v1.19.0/go.mod h1:i0QyjOq3UPoTzff0PJB2N66fb4S0+rSbSB15/oyH9fY= go.opentelemetry.io/otel/bridge/opencensus v0.42.0 h1:QvC+bcZkWMphWPiVqRQygMj6M0/3TOuJEO+erRA7kI8= @@ -1722,6 +1734,7 @@ go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/automaxprocs v1.5.2/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0= go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= +go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= @@ -1923,6 +1936,7 @@ golang.org/x/oauth2 v0.5.0/go.mod h1:9/XBHVqLaWO3/BRHs5jbpYCnOZVjj5V0ndyaAM7KB4I golang.org/x/oauth2 v0.6.0/go.mod h1:ycmewcwgD4Rpr3eZJLSB4Kyyljb3qDh40vJ8STE5HKw= golang.org/x/oauth2 v0.7.0/go.mod h1:hPLQkd9LyjfXTiRohC/41GhcFqxisoUQ99sCUOHO9x4= golang.org/x/oauth2 v0.11.0 h1:vPL4xzxBM4niKCW6g9whtaWVXTJf1U5e4aZxxFx/gbU= +golang.org/x/oauth2 v0.11.0/go.mod h1:LdF7O/8bLR/qWK9DrpXmbHLTouvRHK0SgJl0GmDBchk= golang.org/x/perf v0.0.0-20230113213139-801c7ef9e5c5/go.mod h1:UBKtEnL8aqnd+0JHqZ+2qoMDwtuy6cYhhKNoHLBiTQc= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -2301,6 +2315,7 @@ google.golang.org/genproto v0.0.0-20230331144136-dcfb400f0633/go.mod h1:UUQDJDOl google.golang.org/genproto v0.0.0-20230403163135-c38d8f061ccd/go.mod h1:UUQDJDOlWu4KYeJZffbWgBkS1YFobzKbLVfK69pe0Ak= google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU= google.golang.org/genproto v0.0.0-20231002182017-d307bd883b97 h1:SeZZZx0cP0fqUyA+oRzP9k7cSwJlvDFiROO72uwD6i0= +google.golang.org/genproto v0.0.0-20231002182017-d307bd883b97/go.mod h1:t1VqOqqvce95G3hIDCT5FeO3YUc6Q4Oe24L/+rNMxRk= google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d h1:DoPTO70H+bcDXcd39vOqb2viZxgqeBeSGtZ55yZU4/Q= google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d/go.mod h1:KjSP20unUpOx5kyQUFa7k4OJg0qeJ7DEZflGDu2p6Bk= google.golang.org/genproto/googleapis/rpc v0.0.0-20231012201019-e917dd12ba7a h1:a2MQQVoTo96JC9PMGtGBymLp7+/RzpFc2yX/9WfFg1c= diff --git a/collector/go.mod b/collector/go.mod index 2212b6dc..1f091da9 100644 --- a/collector/go.mod +++ b/collector/go.mod @@ -1,6 +1,6 @@ module github.com/open-telemetry/otel-arrow/collector -go 1.20 +go 1.21 require ( contrib.go.opencensus.io/exporter/prometheus v0.4.2 @@ -41,7 +41,6 @@ require ( go.opentelemetry.io/otel/metric v1.19.0 go.opentelemetry.io/otel/sdk v1.19.0 go.opentelemetry.io/otel/sdk/metric v1.19.0 - go.opentelemetry.io/otel/trace v1.19.0 go.opentelemetry.io/proto/otlp v1.0.0 go.uber.org/multierr v1.11.0 go.uber.org/zap v1.26.0 @@ -114,6 +113,7 @@ require ( go.opentelemetry.io/collector/featuregate v1.0.0-rcv0017 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.45.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.45.0 // indirect + go.opentelemetry.io/otel/trace v1.19.0 // indirect golang.org/x/crypto v0.14.0 // indirect golang.org/x/mod v0.11.0 // indirect golang.org/x/sys v0.13.0 // indirect diff --git a/collector/go.sum b/collector/go.sum index 7ab1e329..b14e075d 100644 --- a/collector/go.sum +++ b/collector/go.sum @@ -133,11 +133,13 @@ cloud.google.com/go/compute v1.15.1/go.mod h1:bjjoF/NtFUrkD/urWfdHaKuOPDR5nWIs63 cloud.google.com/go/compute v1.18.0/go.mod h1:1X7yHxec2Ga+Ss6jPyjxRxpu2uu7PLgsOVXvgU0yacs= cloud.google.com/go/compute v1.19.0/go.mod h1:rikpw2y+UMidAe9tISo04EHNOIf42RLYF/q8Bs93scU= cloud.google.com/go/compute v1.23.0 h1:tP41Zoavr8ptEqaW6j+LQOnyBBhO7OkOMAGrgLopTwY= +cloud.google.com/go/compute v1.23.0/go.mod h1:4tCnrn48xsqlwSAiLf1HXMQk8CONslYbdiEZc9FEIbM= cloud.google.com/go/compute/metadata v0.1.0/go.mod h1:Z1VN+bulIf6bt4P/C37K4DyZYZEXYonfTBHHFPO/4UU= cloud.google.com/go/compute/metadata v0.2.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k= cloud.google.com/go/compute/metadata v0.2.1/go.mod h1:jgHgmJd2RKBGzXqF5LR2EZMGxBkeanZ9wwa75XHJgOM= cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA= cloud.google.com/go/compute/metadata v0.2.4-0.20230617002413-005d2dfb6b68 h1:aRVqY1p2IJaBGStWMsQMpkAa83cPkCDLl80eOj0Rbz4= +cloud.google.com/go/compute/metadata v0.2.4-0.20230617002413-005d2dfb6b68/go.mod h1:1a3eRNYX12fs5UABBIXS8HXVvQbX9hRB/RkEBPORpe8= cloud.google.com/go/contactcenterinsights v1.3.0/go.mod h1:Eu2oemoePuEFc/xKFPjbTuPSj0fYJcPls9TFlPNnHHY= cloud.google.com/go/contactcenterinsights v1.4.0/go.mod h1:L2YzkGbPsv+vMQMCADxJoT9YiTTnSEd6fEvCeHTYVck= cloud.google.com/go/contactcenterinsights v1.6.0/go.mod h1:IIDlT6CLcDoyv79kDv8iWxMSTZhLxSCofVV5W6YFM/w= @@ -645,6 +647,7 @@ github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx2 github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= github.com/boombuler/barcode v1.0.1/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= github.com/brianvoe/gofakeit/v6 v6.17.0 h1:obbQTJeHfktJtiZzq0Q1bEpsNUs+yHrYlPVWt7BtmJ4= +github.com/brianvoe/gofakeit/v6 v6.17.0/go.mod h1:Ow6qC71xtwm79anlwKRlWZW6zVq9D2XHE4QSSMP/rU8= github.com/btcsuite/btcd v0.20.1-beta/go.mod h1:wVuoA8VJLEcwgqHBwHmzLRazpKxTv13Px/pDuV7OomQ= github.com/btcsuite/btcd v0.22.0-beta.0.20220111032746-97732e52810c/go.mod h1:tjmYdS6MLJ5/s0Fj4DbLgSbDHbEqLJrtnHecBFkdz5M= github.com/btcsuite/btcd v0.23.0/go.mod h1:0QJIIN1wwIXF/3G/m87gIwGniDMDQqjVn4SZgnFpsYY= @@ -705,6 +708,7 @@ github.com/cncf/xds/go v0.0.0-20220314180256-7f1daf1720fc/go.mod h1:eXthEFrGJvWH github.com/cncf/xds/go v0.0.0-20230105202645-06c439db220b/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20230310173818-32f1caf87195/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4 h1:/inchEIKaYC1Akx+H+gqO04wryn5h75LSazbRlnya1k= +github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cockroachdb/datadriven v1.0.0/go.mod h1:5Ib8Meh+jk1RlHIXej6Pzevx/NLlNvQB9pmSBZErGA4= github.com/cockroachdb/datadriven v1.0.2/go.mod h1:a9RdTaap04u637JoCzcUoIcDmvwSUtcUFtT/C3kJlTU= github.com/cockroachdb/datadriven v1.0.3-0.20230801171734-e384cf455877/go.mod h1:a9RdTaap04u637JoCzcUoIcDmvwSUtcUFtT/C3kJlTU= @@ -803,6 +807,7 @@ github.com/envoyproxy/protoc-gen-validate v0.6.7/go.mod h1:dyJXwwfPK2VSqiB9Klm1J github.com/envoyproxy/protoc-gen-validate v0.9.1/go.mod h1:OKNgG7TCp5pF4d6XftA0++PMirau2/yoOwVac3AbF2w= github.com/envoyproxy/protoc-gen-validate v0.10.0/go.mod h1:DRjgyB0I43LtJapqN6NiRwroiAU2PaFuvk/vjgh61ss= github.com/envoyproxy/protoc-gen-validate v1.0.2 h1:QkIBuU5k+x7/QXPvPPnWXWlCdaBFApVqftFV6k087DA= +github.com/envoyproxy/protoc-gen-validate v1.0.2/go.mod h1:GpiZQP3dDbg4JouG/NNS7QWXpgx6x8QiMKdmN72jogE= github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw= github.com/ethereum/c-kzg-4844 v0.3.1/go.mod h1:VewdlzQmpT5QSrVhbBuGoCdFJkpaJlO1aQputP83wc0= github.com/ethereum/go-ethereum v1.10.26/go.mod h1:EYFyF19u3ezGLD4RqOkLq+ZCXzYbLoNDdZlMt7kyKFg= @@ -927,6 +932,7 @@ github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfU github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4= github.com/golang/glog v1.1.0/go.mod h1:pfYeQZ3JWZoXTV5sFc986z3HTpwQs9At6P4ImfuP3NQ= github.com/golang/glog v1.1.2 h1:DVjP2PbBOzHyzA+dn3WhHIq4NdVu3Q+pvivFICf/7fo= +github.com/golang/glog v1.1.2/go.mod h1:zR+okUeTbrL6EL3xHUDxZuEtGv04p5shwip1+mL/rLQ= github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -1003,6 +1009,7 @@ github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= +github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/enterprise-certificate-proxy v0.0.0-20220520183353-fd19c99a87aa/go.mod h1:17drOmN3MwGY7t0e+Ei9b45FFGA3fBs3x36SsCg1hq8= github.com/googleapis/enterprise-certificate-proxy v0.1.0/go.mod h1:17drOmN3MwGY7t0e+Ei9b45FFGA3fBs3x36SsCg1hq8= github.com/googleapis/enterprise-certificate-proxy v0.2.0/go.mod h1:8C0jb7/mgJe/9KK8Lm7X9ctZC2t60YyIpYEI16jx0Qg= @@ -1423,6 +1430,7 @@ github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6po github.com/rogpeppe/go-internal v1.8.1/go.mod h1:JeRgkft04UBgHMgCIwADu4Pn6Mtm5d4nPKWu0nJ5d+o= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= github.com/rs/cors v1.10.1 h1:L0uuZVXIKlI1SShY2nhFfo44TYvDPQ1w4oFkUJNfhyo= github.com/rs/cors v1.10.1/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= @@ -1658,6 +1666,7 @@ go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/automaxprocs v1.5.2/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0= go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= +go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= @@ -1859,6 +1868,7 @@ golang.org/x/oauth2 v0.5.0/go.mod h1:9/XBHVqLaWO3/BRHs5jbpYCnOZVjj5V0ndyaAM7KB4I golang.org/x/oauth2 v0.6.0/go.mod h1:ycmewcwgD4Rpr3eZJLSB4Kyyljb3qDh40vJ8STE5HKw= golang.org/x/oauth2 v0.7.0/go.mod h1:hPLQkd9LyjfXTiRohC/41GhcFqxisoUQ99sCUOHO9x4= golang.org/x/oauth2 v0.11.0 h1:vPL4xzxBM4niKCW6g9whtaWVXTJf1U5e4aZxxFx/gbU= +golang.org/x/oauth2 v0.11.0/go.mod h1:LdF7O/8bLR/qWK9DrpXmbHLTouvRHK0SgJl0GmDBchk= golang.org/x/perf v0.0.0-20230113213139-801c7ef9e5c5/go.mod h1:UBKtEnL8aqnd+0JHqZ+2qoMDwtuy6cYhhKNoHLBiTQc= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -2089,6 +2099,7 @@ gonum.org/v1/gonum v0.8.2/go.mod h1:oe/vMfY3deqTw+1EZJhuvEW2iwGF1bW9wwu7XCu0+v0= gonum.org/v1/gonum v0.9.3/go.mod h1:TZumC3NeyVQskjXqmyWt4S3bINhy7B4eYwW69EbyX+0= gonum.org/v1/gonum v0.11.0/go.mod h1:fSG4YDCxxUZQJ7rKsQrj0gMOg00Il0Z96/qMA4bVQhA= gonum.org/v1/gonum v0.14.0 h1:2NiG67LD1tEH0D7kM+ps2V+fXmsAnpUeec7n8tcr4S0= +gonum.org/v1/gonum v0.14.0/go.mod h1:AoWeoz0becf9QMWtE8iWXNXc27fK4fNeHNf/oMejGfU= gonum.org/v1/netlib v0.0.0-20181029234149-ec6d1f5cefe6/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw= gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw= gonum.org/v1/plot v0.0.0-20190515093506-e2840ee46a6b/go.mod h1:Wt8AAjI+ypCyYX3nZBvf6cAIx93T+c/OS2HFAYskSZc= @@ -2237,6 +2248,7 @@ google.golang.org/genproto v0.0.0-20230331144136-dcfb400f0633/go.mod h1:UUQDJDOl google.golang.org/genproto v0.0.0-20230403163135-c38d8f061ccd/go.mod h1:UUQDJDOlWu4KYeJZffbWgBkS1YFobzKbLVfK69pe0Ak= google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU= google.golang.org/genproto v0.0.0-20231002182017-d307bd883b97 h1:SeZZZx0cP0fqUyA+oRzP9k7cSwJlvDFiROO72uwD6i0= +google.golang.org/genproto v0.0.0-20231002182017-d307bd883b97/go.mod h1:t1VqOqqvce95G3hIDCT5FeO3YUc6Q4Oe24L/+rNMxRk= google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d h1:DoPTO70H+bcDXcd39vOqb2viZxgqeBeSGtZ55yZU4/Q= google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d/go.mod h1:KjSP20unUpOx5kyQUFa7k4OJg0qeJ7DEZflGDu2p6Bk= google.golang.org/genproto/googleapis/rpc v0.0.0-20231012201019-e917dd12ba7a h1:a2MQQVoTo96JC9PMGtGBymLp7+/RzpFc2yX/9WfFg1c= diff --git a/collector/processor/batchprocessor/batch_processor.go b/collector/processor/batchprocessor/batch_processor.go index 2c2203e3..f1ec0b95 100644 --- a/collector/processor/batchprocessor/batch_processor.go +++ b/collector/processor/batchprocessor/batch_processor.go @@ -14,6 +14,7 @@ import ( "time" "go.opentelemetry.io/otel/attribute" + "go.uber.org/multierr" "go.uber.org/zap" "go.opentelemetry.io/collector/client" @@ -26,8 +27,10 @@ import ( "go.opentelemetry.io/collector/processor" ) -// errTooManyBatchers is returned when the MetadataCardinalityLimit has been reached. -var errTooManyBatchers = consumererror.NewPermanent(errors.New("too many batcher metadata-value combinations")) +var ( + // errTooManyBatchers is returned when the MetadataCardinalityLimit has been reached. + errTooManyBatchers = consumererror.NewPermanent(errors.New("too many batcher metadata-value combinations")) +) // batch_processor is a component that accepts spans and metrics, places them // into batches and sends downstream. @@ -86,11 +89,25 @@ type shard struct { timer *time.Timer // newItem is used to receive data items from producers. - newItem chan any + newItem chan dataItem // batch is an in-flight data item containing one of the // underlying data types. batch batch + + pending []pendingItem + + totalSent int +} + +type pendingItem struct { + numItems int + respCh chan error +} + +type dataItem struct { + data any + responseCh chan error } // batch is an interface generalizing the individual signal types. @@ -105,6 +122,44 @@ type batch interface { add(item any) } +// partialError is useful when a producer adds items that are split +// between multiple batches. This signals that producers should continue +// waiting until a completeError is received. +type partialError struct { + err error +} + +func (pe partialError) Error() string { + if pe.err == nil { + return "" + } + return fmt.Sprintf("batch partial error: %s", pe.err.Error()) +} + +func (pe partialError) Unwrap() error { + return pe.err +} + +func isPartialError(err error) bool { + return errors.Is(err, partialError{err: errors.Unwrap(err)}) +} + +type completeError struct { + err error +} + +func (ce completeError) Error() string { + if ce.err == nil { + return "" + } + return fmt.Sprintf("batch complete error: %s", ce.err.Error()) + +} + +func (ce completeError) Unwrap() error { + return ce.err +} + var _ consumer.Traces = (*batchProcessor)(nil) var _ consumer.Metrics = (*batchProcessor)(nil) var _ consumer.Logs = (*batchProcessor)(nil) @@ -152,10 +207,11 @@ func (bp *batchProcessor) newShard(md map[string][]string) *shard { }) b := &shard{ processor: bp, - newItem: make(chan any, runtime.NumCPU()), + newItem: make(chan dataItem, runtime.NumCPU()), exportCtx: exportCtx, batch: bp.batchFunc(), } + b.processor.goroutines.Add(1) go b.start() return b @@ -205,11 +261,11 @@ func (b *shard) start() { if b.batch.itemCount() > 0 { // TODO: Set a timeout on sendTraces or // make it cancellable using the context that Shutdown gets as a parameter - b.sendItems(triggerTimeout) + b.sendItems(triggerShutdown) } return case item := <-b.newItem: - if item == nil { + if item.data == nil { continue } b.processItem(item) @@ -222,12 +278,26 @@ func (b *shard) start() { } } -func (b *shard) processItem(item any) { - b.batch.add(item) +func (b *shard) processItem(item dataItem) { + before := b.batch.itemCount() + b.batch.add(item.data) + after := b.batch.itemCount() + + totalItems := after - before + b.pending = append(b.pending, pendingItem{ + numItems: totalItems, + respCh: item.responseCh, + }) + + b.flushItems() +} + +func (b *shard) flushItems() { sent := false + for b.batch.itemCount() > 0 && (!b.hasTimer() || b.batch.itemCount() >= b.processor.sendBatchSize) { - sent = true b.sendItems(triggerBatchSize) + sent = true } if sent { @@ -254,6 +324,31 @@ func (b *shard) resetTimer() { func (b *shard) sendItems(trigger trigger) { sent, bytes, err := b.batch.export(b.exportCtx, b.processor.sendBatchMaxSize, b.processor.telemetry.detailed) + + numItemsBefore := b.totalSent + numItemsAfter := b.totalSent + sent + // The current batch can contain items from several different producers. Ensure each producer gets a response back. + for len(b.pending) > 0 && numItemsBefore < numItemsAfter { + // Waiter only had some items in the current batch + if numItemsBefore + b.pending[0].numItems > numItemsAfter { + b.pending[0].numItems -= (numItemsAfter - numItemsBefore) + b.pending[0].respCh <- partialError{err: err} + numItemsBefore = numItemsAfter + } else { // waiter gets a complete response. + numItemsBefore += b.pending[0].numItems + b.pending[0].respCh <- completeError{err: err} + + // complete response sent so b.pending[0] can be popped from queue. + if len(b.pending) > 1 { + b.pending = b.pending[1:] + } else { + b.pending = []pendingItem{} + } + } + } + + b.totalSent = numItemsAfter + if err != nil { b.processor.logger.Warn("Sender failed", zap.Error(err)) } else { @@ -267,8 +362,39 @@ type singleShardBatcher struct { batcher *shard } -func (sb *singleShardBatcher) consume(_ context.Context, data any) error { - sb.batcher.newItem <- data +func (sb *singleShardBatcher) consume(ctx context.Context, data any) error { + respCh := make(chan error, 1) + // TODO: add a semaphore to only write to channel if sizeof(data) keeps + // us below some configured inflight byte limit. + item := dataItem{ + data: data, + responseCh: respCh, + } + select { + case <-ctx.Done(): + return ctx.Err() + case sb.batcher.newItem <- item: + } + var err error + + for { + select { + case newErr := <-respCh: + // nil response might be wrapped as an error. + if errors.Unwrap(newErr) != nil { + err = multierr.Append(err, newErr) + } + + if isPartialError(newErr) { + continue + } + + return err + case <-ctx.Done(): + err = multierr.Append(err, ctx.Err()) + return err + } + } return nil } @@ -288,6 +414,7 @@ type multiShardBatcher struct { } func (mb *multiShardBatcher) consume(ctx context.Context, data any) error { + respCh := make(chan error, 1) // Get each metadata key value, form the corresponding // attribute set for use as a map lookup key. info := client.FromContext(ctx) @@ -324,10 +451,43 @@ func (mb *multiShardBatcher) consume(ctx context.Context, data any) error { } mb.lock.Unlock() } - b.(*shard).newItem <- data + + item := dataItem{ + data: data, + responseCh: respCh, + } + select { + case <-ctx.Done(): + return ctx.Err() + case b.(*shard).newItem <- item: + } + + var err error + for { + select { + case newErr := <-respCh: + // nil response might be wrapped as an error. + if errors.Unwrap(newErr) != nil { + err = multierr.Append(err, newErr) + } + + if isPartialError(newErr) { + continue + } + + return err + case <-ctx.Done(): + err = multierr.Append(err, ctx.Err()) + return err + } + } return nil } +func recordBatchError(err error) error { + return fmt.Errorf("Batch contained errors: %w", err) +} + func (mb *multiShardBatcher) currentMetadataCardinality() int { mb.lock.Lock() defer mb.lock.Unlock() @@ -378,6 +538,7 @@ func newBatchTraces(nextConsumer consumer.Traces) *batchTraces { // add updates current batchTraces by adding new TraceData object func (bt *batchTraces) add(item any) { td := item.(ptrace.Traces) + newSpanCount := td.SpanCount() if newSpanCount == 0 { return @@ -502,4 +663,4 @@ func (bl *batchLogs) add(item any) { } bl.logCount += newLogsCount ld.ResourceLogs().MoveAndAppendTo(bl.logData.ResourceLogs()) -} \ No newline at end of file +} diff --git a/collector/processor/batchprocessor/batch_processor_test.go b/collector/processor/batchprocessor/batch_processor_test.go index 558d871a..e4b8b55f 100644 --- a/collector/processor/batchprocessor/batch_processor_test.go +++ b/collector/processor/batchprocessor/batch_processor_test.go @@ -5,6 +5,7 @@ package batchprocessor import ( "context" + "errors" "fmt" "math" "sync" @@ -14,17 +15,18 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/open-telemetry/otel-arrow/collector/processor/batchprocessor/testdata" "go.opentelemetry.io/collector/client" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/processor/processortest" - "github.com/open-telemetry/otel-arrow/collector/processor/batchprocessor/testdata" ) func TestProcessorShutdown(t *testing.T) { @@ -82,15 +84,17 @@ func TestBatchProcessorSpansDelivered(t *testing.T) { sink := new(consumertest.TracesSink) cfg := createDefaultConfig().(*Config) cfg.SendBatchSize = 128 + cfg.Timeout = 5 * time.Second creationSet := processortest.NewNopCreateSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newBatchTracesProcessor(creationSet, sink, cfg, false) + batcher, err := newBatchTracesProcessor(creationSet, sink, cfg, true) require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) requestCount := 1000 spansPerRequest := 100 sentResourceSpans := ptrace.NewTraces().ResourceSpans() + var wg sync.WaitGroup for requestNum := 0; requestNum < requestCount; requestNum++ { td := testdata.GenerateTraces(spansPerRequest) spans := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans() @@ -98,13 +102,24 @@ func TestBatchProcessorSpansDelivered(t *testing.T) { spans.At(spanIndex).SetName(getTestSpanName(requestNum, spanIndex)) } td.ResourceSpans().At(0).CopyTo(sentResourceSpans.AppendEmpty()) - assert.NoError(t, batcher.ConsumeTraces(context.Background(), td)) + // ConsumeTraces is a blocking function and should be run in a go routine + // until batch size reached to unblock. + wg.Add(1) + go func() { + assert.NoError(t, batcher.ConsumeTraces(context.Background(), td)) + wg.Done() + }() } // Added to test logic that check for empty resources. td := ptrace.NewTraces() - assert.NoError(t, batcher.ConsumeTraces(context.Background(), td)) + wg.Add(1) + go func() { + assert.NoError(t, batcher.ConsumeTraces(context.Background(), td)) + wg.Done() + }() + wg.Wait() require.NoError(t, batcher.Shutdown(context.Background())) require.Equal(t, requestCount*spansPerRequest, sink.SpanCount()) @@ -125,35 +140,39 @@ func TestBatchProcessorSpansDeliveredEnforceBatchSize(t *testing.T) { cfg := createDefaultConfig().(*Config) cfg.SendBatchSize = 128 cfg.SendBatchMaxSize = 130 + cfg.Timeout = 2 * time.Second creationSet := processortest.NewNopCreateSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newBatchTracesProcessor(creationSet, sink, cfg, false) + batcher, err := newBatchTracesProcessor(creationSet, sink, cfg, true) require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) requestCount := 1000 spansPerRequest := 150 + var wg sync.WaitGroup for requestNum := 0; requestNum < requestCount; requestNum++ { td := testdata.GenerateTraces(spansPerRequest) spans := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans() for spanIndex := 0; spanIndex < spansPerRequest; spanIndex++ { spans.At(spanIndex).SetName(getTestSpanName(requestNum, spanIndex)) } - assert.NoError(t, batcher.ConsumeTraces(context.Background(), td)) + wg.Add(1) + go func() { + assert.NoError(t, batcher.ConsumeTraces(context.Background(), td)) + wg.Done() + }() } // Added to test logic that check for empty resources. td := ptrace.NewTraces() - require.NoError(t, batcher.ConsumeTraces(context.Background(), td)) - - // wait for all spans to be reported - for { - if sink.SpanCount() == requestCount*spansPerRequest { - break - } - <-time.After(cfg.Timeout) - } - + wg.Add(1) + go func() { + require.NoError(t, batcher.ConsumeTraces(context.Background(), td)) + wg.Done() + }() + + // shutdown will flush any remaining spans + wg.Wait() require.NoError(t, batcher.Shutdown(context.Background())) require.Equal(t, requestCount*spansPerRequest, sink.SpanCount()) @@ -174,7 +193,7 @@ func testBatchProcessorSentBySize(t *testing.T, tel testTelemetry, useOtel bool) cfg := createDefaultConfig().(*Config) sendBatchSize := 20 cfg.SendBatchSize = uint32(sendBatchSize) - cfg.Timeout = 500 * time.Millisecond + cfg.Timeout = 5 * time.Second creationSet := tel.NewProcessorCreateSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchTracesProcessor(creationSet, sink, cfg, useOtel) @@ -186,12 +205,18 @@ func testBatchProcessorSentBySize(t *testing.T, tel testTelemetry, useOtel bool) start := time.Now() sizeSum := 0 + var wg sync.WaitGroup for requestNum := 0; requestNum < requestCount; requestNum++ { td := testdata.GenerateTraces(spansPerRequest) sizeSum += sizer.TracesSize(td) - assert.NoError(t, batcher.ConsumeTraces(context.Background(), td)) + wg.Add(1) + go func() { + assert.NoError(t, batcher.ConsumeTraces(context.Background(), td)) + wg.Done() + }() } + wg.Wait() require.NoError(t, batcher.Shutdown(context.Background())) elapsed := time.Since(start) @@ -230,7 +255,7 @@ func testBatchProcessorSentBySizeWithMaxSize(t *testing.T, tel testTelemetry, us sendBatchMaxSize := 37 cfg.SendBatchSize = uint32(sendBatchSize) cfg.SendBatchMaxSize = uint32(sendBatchMaxSize) - cfg.Timeout = 500 * time.Millisecond + cfg.Timeout = 5 * time.Second creationSet := tel.NewProcessorCreateSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchTracesProcessor(creationSet, sink, cfg, useOtel) @@ -241,17 +266,20 @@ func testBatchProcessorSentBySizeWithMaxSize(t *testing.T, tel testTelemetry, us spansPerRequest := 500 totalSpans := requestCount * spansPerRequest - start := time.Now() + var wg sync.WaitGroup for requestNum := 0; requestNum < requestCount; requestNum++ { td := testdata.GenerateTraces(spansPerRequest) - assert.NoError(t, batcher.ConsumeTraces(context.Background(), td)) + // this should be a noerr but need to separate triggerTimeout from triggerShutdown + wg.Add(1) + go func() { + assert.NoError(t, batcher.ConsumeTraces(context.Background(), td)) + wg.Done() + }() } + wg.Wait() require.NoError(t, batcher.Shutdown(context.Background())) - elapsed := time.Since(start) - require.LessOrEqual(t, elapsed.Nanoseconds(), cfg.Timeout.Nanoseconds()) - // The max batch size is not a divisor of the total number of spans expectedBatchesNum := int(math.Ceil(float64(totalSpans) / float64(sendBatchMaxSize))) @@ -263,7 +291,6 @@ func testBatchProcessorSentBySizeWithMaxSize(t *testing.T, tel testTelemetry, us sendCount: float64(expectedBatchesNum), sendSizeSum: float64(sink.SpanCount()), sizeTrigger: math.Floor(float64(totalSpans) / float64(sendBatchMaxSize)), - timeoutTrigger: 1, }) } @@ -276,31 +303,27 @@ func TestBatchProcessorSentByTimeout(t *testing.T) { requestCount := 5 spansPerRequest := 10 - start := time.Now() creationSet := processortest.NewNopCreateSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newBatchTracesProcessor(creationSet, sink, cfg, false) + batcher, err := newBatchTracesProcessor(creationSet, sink, cfg, true) require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) + var wg sync.WaitGroup + start := time.Now() for requestNum := 0; requestNum < requestCount; requestNum++ { td := testdata.GenerateTraces(spansPerRequest) - assert.NoError(t, batcher.ConsumeTraces(context.Background(), td)) - } - - // Wait for at least one batch to be sent. - for { - if sink.SpanCount() != 0 { - break - } - <-time.After(cfg.Timeout) + wg.Add(1) + go func() { + assert.NoError(t, batcher.ConsumeTraces(context.Background(), td)) + wg.Done() + }() } + wg.Wait() elapsed := time.Since(start) require.LessOrEqual(t, cfg.Timeout.Nanoseconds(), elapsed.Nanoseconds()) - - // This should not change the results in the sink, verified by the expectedBatchesNum require.NoError(t, batcher.Shutdown(context.Background())) expectedBatchesNum := 1 @@ -327,17 +350,23 @@ func TestBatchProcessorTraceSendWhenClosing(t *testing.T) { creationSet := processortest.NewNopCreateSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newBatchTracesProcessor(creationSet, sink, &cfg, false) + batcher, err := newBatchTracesProcessor(creationSet, sink, &cfg, true) require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) requestCount := 10 spansPerRequest := 10 + var wg sync.WaitGroup for requestNum := 0; requestNum < requestCount; requestNum++ { td := testdata.GenerateTraces(spansPerRequest) - assert.NoError(t, batcher.ConsumeTraces(context.Background(), td)) + wg.Add(1) + go func() { + assert.NoError(t, batcher.ConsumeTraces(context.Background(), td)) + wg.Done() + }() } + wg.Wait() require.NoError(t, batcher.Shutdown(context.Background())) require.Equal(t, requestCount*spansPerRequest, sink.SpanCount()) @@ -358,12 +387,13 @@ func TestBatchMetricProcessor_ReceivingData(t *testing.T) { creationSet := processortest.NewNopCreateSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg, false) + batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg, true) require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) sentResourceMetrics := pmetric.NewMetrics().ResourceMetrics() + var wg sync.WaitGroup for requestNum := 0; requestNum < requestCount; requestNum++ { md := testdata.GenerateMetrics(metricsPerRequest) metrics := md.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics() @@ -371,13 +401,22 @@ func TestBatchMetricProcessor_ReceivingData(t *testing.T) { metrics.At(metricIndex).SetName(getTestMetricName(requestNum, metricIndex)) } md.ResourceMetrics().At(0).CopyTo(sentResourceMetrics.AppendEmpty()) - assert.NoError(t, batcher.ConsumeMetrics(context.Background(), md)) + wg.Add(1) + go func() { + assert.NoError(t, batcher.ConsumeMetrics(context.Background(), md)) + wg.Done() + }() } // Added to test case with empty resources sent. md := pmetric.NewMetrics() - assert.NoError(t, batcher.ConsumeMetrics(context.Background(), md)) + wg.Add(1) + go func() { + assert.NoError(t, batcher.ConsumeMetrics(context.Background(), md)) + wg.Done() + }() + wg.Wait() require.NoError(t, batcher.Shutdown(context.Background())) require.Equal(t, requestCount*2*metricsPerRequest, sink.DataPointCount()) @@ -403,7 +442,7 @@ func testBatchMetricProcessorBatchSize(t *testing.T, tel testTelemetry, useOtel // Instantiate the batch processor with low config values to test data // gets sent through the processor. cfg := Config{ - Timeout: 100 * time.Millisecond, + Timeout: 2 * time.Second, SendBatchSize: 50, } @@ -419,18 +458,20 @@ func testBatchMetricProcessorBatchSize(t *testing.T, tel testTelemetry, useOtel require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) - start := time.Now() size := 0 + var wg sync.WaitGroup for requestNum := 0; requestNum < requestCount; requestNum++ { md := testdata.GenerateMetrics(metricsPerRequest) size += sizer.MetricsSize(md) - assert.NoError(t, batcher.ConsumeMetrics(context.Background(), md)) + wg.Add(1) + go func() { + assert.NoError(t, batcher.ConsumeMetrics(context.Background(), md)) + wg.Done() + }() } + wg.Wait() require.NoError(t, batcher.Shutdown(context.Background())) - elapsed := time.Since(start) - require.LessOrEqual(t, elapsed.Nanoseconds(), cfg.Timeout.Nanoseconds()) - expectedBatchesNum := requestCount * dataPointsPerRequest / int(cfg.SendBatchSize) expectedBatchingFactor := int(cfg.SendBatchSize) / dataPointsPerRequest @@ -464,7 +505,7 @@ func TestBatchMetrics_UnevenBatchMaxSize(t *testing.T) { batchMetrics.add(md) require.Equal(t, dataPointsPerMetric*metricsCount, batchMetrics.dataPointCount) - sent, _, sendErr := batchMetrics.export(ctx, sendBatchMaxSize, false) + sent, _, sendErr := batchMetrics.export(ctx, sendBatchMaxSize, true) require.NoError(t, sendErr) require.Equal(t, sendBatchMaxSize, sent) remainingDataPointCount := metricsCount*dataPointsPerMetric - sendBatchMaxSize @@ -482,28 +523,24 @@ func TestBatchMetricsProcessor_Timeout(t *testing.T) { creationSet := processortest.NewNopCreateSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg, false) + batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg, true) require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) + var wg sync.WaitGroup start := time.Now() for requestNum := 0; requestNum < requestCount; requestNum++ { md := testdata.GenerateMetrics(metricsPerRequest) - assert.NoError(t, batcher.ConsumeMetrics(context.Background(), md)) - } - - // Wait for at least one batch to be sent. - for { - if sink.DataPointCount() != 0 { - break - } - <-time.After(cfg.Timeout) + wg.Add(1) + go func() { + assert.NoError(t, batcher.ConsumeMetrics(context.Background(), md)) + wg.Done() + }() } + wg.Wait() elapsed := time.Since(start) require.LessOrEqual(t, cfg.Timeout.Nanoseconds(), elapsed.Nanoseconds()) - - // This should not change the results in the sink, verified by the expectedBatchesNum require.NoError(t, batcher.Shutdown(context.Background())) expectedBatchesNum := 1 @@ -531,15 +568,21 @@ func TestBatchMetricProcessor_Shutdown(t *testing.T) { creationSet := processortest.NewNopCreateSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg, false) + batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg, true) require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) + var wg sync.WaitGroup for requestNum := 0; requestNum < requestCount; requestNum++ { md := testdata.GenerateMetrics(metricsPerRequest) - assert.NoError(t, batcher.ConsumeMetrics(context.Background(), md)) + wg.Add(1) + go func() { + assert.NoError(t, batcher.ConsumeMetrics(context.Background(), md)) + wg.Done() + }() } + wg.Wait() require.NoError(t, batcher.Shutdown(context.Background())) require.Equal(t, requestCount*2*metricsPerRequest, sink.DataPointCount()) @@ -630,7 +673,7 @@ func runMetricsProcessorBenchmark(b *testing.B, cfg Config) { creationSet := processortest.NewNopCreateSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed metricsPerRequest := 1000 - batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg, false) + batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg, true) require.NoError(b, err) require.NoError(b, batcher.Start(ctx, componenttest.NewNopHost())) @@ -677,12 +720,13 @@ func TestBatchLogProcessor_ReceivingData(t *testing.T) { creationSet := processortest.NewNopCreateSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg, false) + batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg, true) require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) sentResourceLogs := plog.NewLogs().ResourceLogs() + var wg sync.WaitGroup for requestNum := 0; requestNum < requestCount; requestNum++ { ld := testdata.GenerateLogs(logsPerRequest) logs := ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords() @@ -690,13 +734,22 @@ func TestBatchLogProcessor_ReceivingData(t *testing.T) { logs.At(logIndex).SetSeverityText(getTestLogSeverityText(requestNum, logIndex)) } ld.ResourceLogs().At(0).CopyTo(sentResourceLogs.AppendEmpty()) - assert.NoError(t, batcher.ConsumeLogs(context.Background(), ld)) + wg.Add(1) + go func() { + assert.NoError(t, batcher.ConsumeLogs(context.Background(), ld)) + wg.Done() + }() } // Added to test case with empty resources sent. ld := plog.NewLogs() - assert.NoError(t, batcher.ConsumeLogs(context.Background(), ld)) + wg.Add(1) + go func() { + assert.NoError(t, batcher.ConsumeLogs(context.Background(), ld)) + wg.Done() + }() + wg.Wait() require.NoError(t, batcher.Shutdown(context.Background())) require.Equal(t, requestCount*logsPerRequest, sink.LogRecordCount()) @@ -722,7 +775,7 @@ func testBatchLogProcessorBatchSize(t *testing.T, tel testTelemetry, useOtel boo // Instantiate the batch processor with low config values to test data // gets sent through the processor. cfg := Config{ - Timeout: 100 * time.Millisecond, + Timeout: 2 * time.Second, SendBatchSize: 50, } @@ -736,18 +789,20 @@ func testBatchLogProcessorBatchSize(t *testing.T, tel testTelemetry, useOtel boo require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) - start := time.Now() size := 0 + var wg sync.WaitGroup for requestNum := 0; requestNum < requestCount; requestNum++ { ld := testdata.GenerateLogs(logsPerRequest) size += sizer.LogsSize(ld) - assert.NoError(t, batcher.ConsumeLogs(context.Background(), ld)) + wg.Add(1) + go func() { + assert.NoError(t, batcher.ConsumeLogs(context.Background(), ld)) + wg.Done() + }() } + wg.Wait() require.NoError(t, batcher.Shutdown(context.Background())) - elapsed := time.Since(start) - require.LessOrEqual(t, elapsed.Nanoseconds(), cfg.Timeout.Nanoseconds()) - expectedBatchesNum := requestCount * logsPerRequest / int(cfg.SendBatchSize) expectedBatchingFactor := int(cfg.SendBatchSize) / logsPerRequest @@ -771,7 +826,7 @@ func testBatchLogProcessorBatchSize(t *testing.T, tel testTelemetry, useOtel boo func TestBatchLogsProcessor_Timeout(t *testing.T) { cfg := Config{ - Timeout: 100 * time.Millisecond, + Timeout: 3 * time.Second, SendBatchSize: 100, } requestCount := 5 @@ -780,28 +835,24 @@ func TestBatchLogsProcessor_Timeout(t *testing.T) { creationSet := processortest.NewNopCreateSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg, false) + batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg, true) require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) + var wg sync.WaitGroup start := time.Now() for requestNum := 0; requestNum < requestCount; requestNum++ { ld := testdata.GenerateLogs(logsPerRequest) - assert.NoError(t, batcher.ConsumeLogs(context.Background(), ld)) - } - - // Wait for at least one batch to be sent. - for { - if sink.LogRecordCount() != 0 { - break - } - <-time.After(cfg.Timeout) + wg.Add(1) + go func() { + assert.NoError(t, batcher.ConsumeLogs(context.Background(), ld)) + wg.Done() + }() } + wg.Wait() elapsed := time.Since(start) require.LessOrEqual(t, cfg.Timeout.Nanoseconds(), elapsed.Nanoseconds()) - - // This should not change the results in the sink, verified by the expectedBatchesNum require.NoError(t, batcher.Shutdown(context.Background())) expectedBatchesNum := 1 @@ -829,15 +880,21 @@ func TestBatchLogProcessor_Shutdown(t *testing.T) { creationSet := processortest.NewNopCreateSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg, false) + batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg, true) require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) + var wg sync.WaitGroup for requestNum := 0; requestNum < requestCount; requestNum++ { ld := testdata.GenerateLogs(logsPerRequest) - assert.NoError(t, batcher.ConsumeLogs(context.Background(), ld)) + wg.Add(1) + go func() { + assert.NoError(t, batcher.ConsumeLogs(context.Background(), ld)) + wg.Done() + }() } + wg.Wait() require.NoError(t, batcher.Shutdown(context.Background())) require.Equal(t, requestCount*logsPerRequest, sink.LogRecordCount()) @@ -869,7 +926,38 @@ func logsReceivedBySeverityText(lds []plog.Logs) map[string]plog.LogRecord { func TestShutdown(t *testing.T) { factory := NewFactory() - processortest.VerifyShutdown(t, factory, factory.CreateDefaultConfig()) + verifyTracesDoesNotProduceAfterShutdown(t, factory, factory.CreateDefaultConfig()) +} +func verifyTracesDoesNotProduceAfterShutdown(t *testing.T, factory processor.Factory, cfg component.Config) { + // Create a proc and output its produce to a sink. + nextSink := new(consumertest.TracesSink) + proc, err := factory.CreateTracesProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, nextSink) + if err != nil { + if errors.Is(err, component.ErrDataTypeIsNotSupported) { + return + } + require.NoError(t, err) + } + assert.NoError(t, proc.Start(context.Background(), componenttest.NewNopHost())) + + // Send some traces to the proc. + const generatedCount = 10 + var wg sync.WaitGroup + for i := 0; i < generatedCount; i++ { + wg.Add(1) + go func() { + assert.NoError(t, proc.ConsumeTraces(context.Background(), testdata.GenerateTraces(1))) + wg.Done() + }() + } + + // Now shutdown the proc. + wg.Wait() + assert.NoError(t, proc.Shutdown(context.Background())) + + // The Shutdown() is done. It means the proc must have sent everything we + // gave it to the next sink. + assert.EqualValues(t, generatedCount, nextSink.SpanCount()) } type metadataTracesSink struct { @@ -904,11 +992,11 @@ func TestBatchProcessorSpansBatchedByMetadata(t *testing.T) { } cfg := createDefaultConfig().(*Config) cfg.SendBatchSize = 1000 - cfg.Timeout = 10 * time.Minute + cfg.Timeout = 1 * time.Second cfg.MetadataKeys = []string{"token1", "token2"} creationSet := processortest.NewNopCreateSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newBatchTracesProcessor(creationSet, sink, cfg, false) + batcher, err := newBatchTracesProcessor(creationSet, sink, cfg, true) require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) @@ -947,6 +1035,7 @@ func TestBatchProcessorSpansBatchedByMetadata(t *testing.T) { requestCount := 1000 spansPerRequest := 33 sentResourceSpans := ptrace.NewTraces().ResourceSpans() + var wg sync.WaitGroup for requestNum := 0; requestNum < requestCount; requestNum++ { td := testdata.GenerateTraces(spansPerRequest) spans := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans() @@ -957,9 +1046,14 @@ func TestBatchProcessorSpansBatchedByMetadata(t *testing.T) { // use round-robin to assign context. num := requestNum % len(callCtxs) expectByContext[num] += spansPerRequest - assert.NoError(t, batcher.ConsumeTraces(callCtxs[num], td)) + wg.Add(1) + go func() { + assert.NoError(t, batcher.ConsumeTraces(callCtxs[num], td)) + wg.Done() + }() } + wg.Wait() require.NoError(t, batcher.Shutdown(context.Background())) // The following tests are the same as TestBatchProcessorSpansDelivered(). @@ -1000,12 +1094,14 @@ func TestBatchProcessorMetadataCardinalityLimit(t *testing.T) { cfg := createDefaultConfig().(*Config) cfg.MetadataKeys = []string{"token"} cfg.MetadataCardinalityLimit = cardLimit + cfg.Timeout = 1 * time.Second creationSet := processortest.NewNopCreateSettings() - batcher, err := newBatchTracesProcessor(creationSet, sink, cfg, false) + batcher, err := newBatchTracesProcessor(creationSet, sink, cfg, true) require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) bg := context.Background() + var wg sync.WaitGroup for requestNum := 0; requestNum < cardLimit; requestNum++ { td := testdata.GenerateTraces(1) ctx := client.NewContext(bg, client.Info{ @@ -1014,21 +1110,29 @@ func TestBatchProcessorMetadataCardinalityLimit(t *testing.T) { }), }) - assert.NoError(t, batcher.ConsumeTraces(ctx, td)) + wg.Add(1) + go func() { + assert.NoError(t, batcher.ConsumeTraces(ctx, td)) + wg.Done() + }() } - td := testdata.GenerateTraces(1) + wg.Wait() + td := testdata.GenerateTraces(2) ctx := client.NewContext(bg, client.Info{ Metadata: client.NewMetadata(map[string][]string{ "token": {"limit_exceeded"}, }), }) - err = batcher.ConsumeTraces(ctx, td) - assert.Error(t, err) - assert.True(t, consumererror.IsPermanent(err)) - assert.Contains(t, err.Error(), "too many") + wg.Add(1) + go func() { + err := batcher.ConsumeTraces(ctx, td) + assert.ErrorIs(t, err, errTooManyBatchers) + wg.Done() + }() + wg.Wait() require.NoError(t, batcher.Shutdown(context.Background())) } @@ -1044,7 +1148,7 @@ func TestBatchZeroConfig(t *testing.T) { sink := new(consumertest.LogsSink) creationSet := processortest.NewNopCreateSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg, false) + batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg, true) require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) @@ -1084,7 +1188,7 @@ func TestBatchSplitOnly(t *testing.T) { sink := new(consumertest.LogsSink) creationSet := processortest.NewNopCreateSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg, false) + batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg, true) require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) @@ -1104,4 +1208,4 @@ func TestBatchSplitOnly(t *testing.T) { for _, ld := range receivedMds { require.Equal(t, maxBatch, ld.LogRecordCount()) } -} \ No newline at end of file +} diff --git a/collector/processor/batchprocessor/config.go b/collector/processor/batchprocessor/config.go index d6112f98..6a2f8664 100644 --- a/collector/processor/batchprocessor/config.go +++ b/collector/processor/batchprocessor/config.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package batchprocessor // import "github.com/open-telemetry/otel-arrow/collector/processor/batchprocessor +package batchprocessor // import "github.com/open-telemetry/otel-arrow/collector/processor/batchprocessor" import ( "errors" diff --git a/collector/processor/batchprocessor/factory.go b/collector/processor/batchprocessor/factory.go index 716f47f6..eb98bf39 100644 --- a/collector/processor/batchprocessor/factory.go +++ b/collector/processor/batchprocessor/factory.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package batchprocessor // import "github.com/open-telemetry/otel-arrow/collector/processor/batchprocessor +package batchprocessor // import "github.com/open-telemetry/otel-arrow/collector/processor/batchprocessor" import ( "context" diff --git a/collector/processor/batchprocessor/metrics.go b/collector/processor/batchprocessor/metrics.go index c644d10f..10475b35 100644 --- a/collector/processor/batchprocessor/metrics.go +++ b/collector/processor/batchprocessor/metrics.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package batchprocessor // import "github.com/open-telemetry/otel-arrow/collector/processor/batchprocessor +package batchprocessor // import "github.com/open-telemetry/otel-arrow/collector/processor/batchprocessor" import ( "context" @@ -35,6 +35,7 @@ type trigger int const ( triggerTimeout trigger = iota triggerBatchSize + triggerShutdown ) func init() { diff --git a/collector/processor/batchprocessor/metrics_test.go b/collector/processor/batchprocessor/metrics_test.go index 089123ce..4a825543 100644 --- a/collector/processor/batchprocessor/metrics_test.go +++ b/collector/processor/batchprocessor/metrics_test.go @@ -64,10 +64,6 @@ type expectedMetrics struct { } func telemetryTest(t *testing.T, testFunc func(t *testing.T, tel testTelemetry, useOtel bool)) { - t.Run("WithOC", func(t *testing.T) { - testFunc(t, setupTelemetry(t, false), false) - }) - t.Run("WithOTel", func(t *testing.T) { testFunc(t, setupTelemetry(t, true), true) }) diff --git a/collector/processor/batchprocessor/splitlogs.go b/collector/processor/batchprocessor/splitlogs.go index 80b30889..ede243e9 100644 --- a/collector/processor/batchprocessor/splitlogs.go +++ b/collector/processor/batchprocessor/splitlogs.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package batchprocessor // import "github.com/open-telemetry/otel-arrow/collector/processor/batchprocessor +package batchprocessor // import "github.com/open-telemetry/otel-arrow/collector/processor/batchprocessor" import ( "go.opentelemetry.io/collector/pdata/plog" diff --git a/collector/processor/batchprocessor/splitmetrics.go b/collector/processor/batchprocessor/splitmetrics.go index bf654059..d9e7d509 100644 --- a/collector/processor/batchprocessor/splitmetrics.go +++ b/collector/processor/batchprocessor/splitmetrics.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package batchprocessor // import "github.com/open-telemetry/otel-arrow/collector/processor/batchprocessor +package batchprocessor // import "github.com/open-telemetry/otel-arrow/collector/processor/batchprocessor" import ( "go.opentelemetry.io/collector/pdata/pmetric" diff --git a/go.mod b/go.mod index 800d393b..9620fe16 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/open-telemetry/otel-arrow -go 1.20 +go 1.21 require ( github.com/HdrHistogram/hdrhistogram-go v1.1.2 diff --git a/go.sum b/go.sum index 87662115..c54a4235 100644 --- a/go.sum +++ b/go.sum @@ -3,6 +3,7 @@ github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym github.com/HdrHistogram/hdrhistogram-go v1.1.2 h1:5IcZpTvzydCQeHzK4Ef/D5rrSqwxob0t8PQPMybUNFM= github.com/HdrHistogram/hdrhistogram-go v1.1.2/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo= github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c h1:RGWPOewvKIROun94nF7v2cua9qP+thov/7M50KEoeSU= +github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod h1:X0CRv0ky0k6m906ixxpzmDRLvX58TFUKS2eePweuyxk= github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= @@ -49,8 +50,10 @@ github.com/google/flatbuffers v2.0.8+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6 github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= +github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes= @@ -63,6 +66,7 @@ github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQs github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -88,6 +92,7 @@ github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFu github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -147,6 +152,7 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= +golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -194,6 +200,7 @@ google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=