diff --git a/changelogs/current.yaml b/changelogs/current.yaml index 9ecf0d6e48ce..ba316d2be54e 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -8,6 +8,9 @@ minor_behavior_changes: bug_fixes: # *Changes expected to improve the state of the world and are unlikely to have negative effects* +- area: tracers + change: | + use unary RPC calls for OpenTelemetry trace exports, rather than client-side streaming connections. removed_config_or_runtime: # *Normally occurs at the end of the* :ref:`deprecation period ` diff --git a/source/extensions/tracers/opentelemetry/grpc_trace_exporter.h b/source/extensions/tracers/opentelemetry/grpc_trace_exporter.h index ecfc4baaa27e..b35823b851ba 100644 --- a/source/extensions/tracers/opentelemetry/grpc_trace_exporter.h +++ b/source/extensions/tracers/opentelemetry/grpc_trace_exporter.h @@ -69,7 +69,7 @@ class OpenTelemetryGrpcTraceExporterClient : Logger::Loggablestream_->isAboveWriteBufferHighWatermark()) { return false; } - stream_->stream_->sendMessage(request, false); + stream_->stream_->sendMessage(request, true); } else { stream_.reset(); } diff --git a/test/extensions/tracers/opentelemetry/grpc_trace_exporter_test.cc b/test/extensions/tracers/opentelemetry/grpc_trace_exporter_test.cc index e4da244ae3c6..84db740d5ebf 100644 --- a/test/extensions/tracers/opentelemetry/grpc_trace_exporter_test.cc +++ b/test/extensions/tracers/opentelemetry/grpc_trace_exporter_test.cc @@ -24,24 +24,24 @@ class OpenTelemetryGrpcTraceExporterTest : public testing::Test { opentelemetry::proto::collector::trace::v1::ExportTraceServiceResponse>; OpenTelemetryGrpcTraceExporterTest() : async_client_(new Grpc::MockAsyncClient) { - expectStreamStart(); + expectTraceExportStart(); } - void expectStreamStart() { + void expectTraceExportStart() { EXPECT_CALL(*async_client_, startRaw(_, _, _, _)) .WillOnce( Invoke([this](absl::string_view, absl::string_view, Grpc::RawAsyncStreamCallbacks& cbs, const Http::AsyncClient::StreamOptions&) { this->callbacks_ = dynamic_cast(&cbs); - return &this->stream_; + return &this->conn_; })); } - void expectStreamMessage(const std::string& expected_message_yaml) { + void expectTraceExportMessage(const std::string& expected_message_yaml) { opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest expected_message; TestUtility::loadFromYaml(expected_message_yaml, expected_message); - EXPECT_CALL(stream_, isAboveWriteBufferHighWatermark()).WillOnce(Return(false)); - EXPECT_CALL(stream_, sendMessageRaw_(_, false)) + EXPECT_CALL(conn_, isAboveWriteBufferHighWatermark()).WillOnce(Return(false)); + EXPECT_CALL(conn_, sendMessageRaw_(_, true)) .WillOnce(Invoke([expected_message](Buffer::InstancePtr& request, bool) { opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest message; Buffer::ZeroCopyInputStreamImpl request_stream(std::move(request)); @@ -52,14 +52,14 @@ class OpenTelemetryGrpcTraceExporterTest : public testing::Test { protected: Grpc::MockAsyncClient* async_client_; - Grpc::MockAsyncStream stream_; + Grpc::MockAsyncStream conn_; TraceCallbacks* callbacks_; }; TEST_F(OpenTelemetryGrpcTraceExporterTest, CreateExporterAndExportSpan) { OpenTelemetryGrpcTraceExporter exporter(Grpc::RawAsyncClientPtr{async_client_}); - expectStreamMessage(R"EOF( + expectTraceExportMessage(R"EOF( resource_spans: scope_spans: - spans: @@ -75,8 +75,8 @@ TEST_F(OpenTelemetryGrpcTraceExporterTest, CreateExporterAndExportSpan) { TEST_F(OpenTelemetryGrpcTraceExporterTest, NoExportWithHighWatermark) { OpenTelemetryGrpcTraceExporter exporter(Grpc::RawAsyncClientPtr{async_client_}); - EXPECT_CALL(stream_, isAboveWriteBufferHighWatermark()).WillOnce(Return(true)); - EXPECT_CALL(stream_, sendMessageRaw_(_, false)).Times(0); + EXPECT_CALL(conn_, isAboveWriteBufferHighWatermark()).WillOnce(Return(true)); + EXPECT_CALL(conn_, sendMessageRaw_(_, false)).Times(0); opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest request; opentelemetry::proto::trace::v1::Span span; span.set_name("tests"); @@ -93,25 +93,25 @@ TEST_F(OpenTelemetryGrpcTraceExporterTest, ExportWithRemoteClose) { - name: "test" )EOF"; - expectStreamMessage(request_yaml); + expectTraceExportMessage(request_yaml); opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest request; opentelemetry::proto::trace::v1::Span span; span.set_name("test"); *request.add_resource_spans()->add_scope_spans()->add_spans() = span; EXPECT_TRUE(exporter.log(request)); - // Close the stream, now that we've created it. + // Terminate the request, now that we've created it. callbacks_->onRemoteClose(Grpc::Status::Internal, "bad"); - // Second call should make a new stream. - expectStreamStart(); - expectStreamMessage(request_yaml); + // Second call should make a new request. + expectTraceExportStart(); + expectTraceExportMessage(request_yaml); EXPECT_TRUE(exporter.log(request)); } TEST_F(OpenTelemetryGrpcTraceExporterTest, ExportWithNoopCallbacks) { OpenTelemetryGrpcTraceExporter exporter(Grpc::RawAsyncClientPtr{async_client_}); - expectStreamMessage(R"EOF( + expectTraceExportMessage(R"EOF( resource_spans: scope_spans: - spans: