Skip to content

Commit

Permalink
Opentelemetry stream fix (envoyproxy#31844)
Browse files Browse the repository at this point in the history
The OpenTelemetry exporter was previously using client-side streaming for exporting trace data. This appears to work fine with the reference OpenTelemetry collector, but some collectors (DataPrepper for one example) instead seem to expect unary RPC calls for trace exports (in accordance with the OpenTelemetry specification). It seems that Data Prepper appears to rely specifically on the end-stream flag arriving in order for it to properly process the trace exports.

Risk Level: medium
Testing: no new tests

Fixes envoyproxy#31497 

Signed-off-by: Ashish Banerjee <[email protected]>
  • Loading branch information
ashishb-solo authored Jan 17, 2024
1 parent 8dcb316 commit f520e8e
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 17 deletions.
3 changes: 3 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ minor_behavior_changes:

bug_fixes:
# *Changes expected to improve the state of the world and are unlikely to have negative effects*
- area: tracers
change: |
use unary RPC calls for OpenTelemetry trace exports, rather than client-side streaming connections.
removed_config_or_runtime:
# *Normally occurs at the end of the* :ref:`deprecation period <deprecated>`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class OpenTelemetryGrpcTraceExporterClient : Logger::Loggable<Logger::Id::tracin
if (stream_->stream_->isAboveWriteBufferHighWatermark()) {
return false;
}
stream_->stream_->sendMessage(request, false);
stream_->stream_->sendMessage(request, true);
} else {
stream_.reset();
}
Expand Down
32 changes: 16 additions & 16 deletions test/extensions/tracers/opentelemetry/grpc_trace_exporter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,24 @@ class OpenTelemetryGrpcTraceExporterTest : public testing::Test {
opentelemetry::proto::collector::trace::v1::ExportTraceServiceResponse>;

OpenTelemetryGrpcTraceExporterTest() : async_client_(new Grpc::MockAsyncClient) {
expectStreamStart();
expectTraceExportStart();
}

void expectStreamStart() {
void expectTraceExportStart() {
EXPECT_CALL(*async_client_, startRaw(_, _, _, _))
.WillOnce(
Invoke([this](absl::string_view, absl::string_view, Grpc::RawAsyncStreamCallbacks& cbs,
const Http::AsyncClient::StreamOptions&) {
this->callbacks_ = dynamic_cast<TraceCallbacks*>(&cbs);
return &this->stream_;
return &this->conn_;
}));
}

void expectStreamMessage(const std::string& expected_message_yaml) {
void expectTraceExportMessage(const std::string& expected_message_yaml) {
opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest expected_message;
TestUtility::loadFromYaml(expected_message_yaml, expected_message);
EXPECT_CALL(stream_, isAboveWriteBufferHighWatermark()).WillOnce(Return(false));
EXPECT_CALL(stream_, sendMessageRaw_(_, false))
EXPECT_CALL(conn_, isAboveWriteBufferHighWatermark()).WillOnce(Return(false));
EXPECT_CALL(conn_, sendMessageRaw_(_, true))
.WillOnce(Invoke([expected_message](Buffer::InstancePtr& request, bool) {
opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest message;
Buffer::ZeroCopyInputStreamImpl request_stream(std::move(request));
Expand All @@ -52,14 +52,14 @@ class OpenTelemetryGrpcTraceExporterTest : public testing::Test {

protected:
Grpc::MockAsyncClient* async_client_;
Grpc::MockAsyncStream stream_;
Grpc::MockAsyncStream conn_;
TraceCallbacks* callbacks_;
};

TEST_F(OpenTelemetryGrpcTraceExporterTest, CreateExporterAndExportSpan) {
OpenTelemetryGrpcTraceExporter exporter(Grpc::RawAsyncClientPtr{async_client_});

expectStreamMessage(R"EOF(
expectTraceExportMessage(R"EOF(
resource_spans:
scope_spans:
- spans:
Expand All @@ -75,8 +75,8 @@ TEST_F(OpenTelemetryGrpcTraceExporterTest, CreateExporterAndExportSpan) {
TEST_F(OpenTelemetryGrpcTraceExporterTest, NoExportWithHighWatermark) {
OpenTelemetryGrpcTraceExporter exporter(Grpc::RawAsyncClientPtr{async_client_});

EXPECT_CALL(stream_, isAboveWriteBufferHighWatermark()).WillOnce(Return(true));
EXPECT_CALL(stream_, sendMessageRaw_(_, false)).Times(0);
EXPECT_CALL(conn_, isAboveWriteBufferHighWatermark()).WillOnce(Return(true));
EXPECT_CALL(conn_, sendMessageRaw_(_, false)).Times(0);
opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest request;
opentelemetry::proto::trace::v1::Span span;
span.set_name("tests");
Expand All @@ -93,25 +93,25 @@ TEST_F(OpenTelemetryGrpcTraceExporterTest, ExportWithRemoteClose) {
- name: "test"
)EOF";

expectStreamMessage(request_yaml);
expectTraceExportMessage(request_yaml);
opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest request;
opentelemetry::proto::trace::v1::Span span;
span.set_name("test");
*request.add_resource_spans()->add_scope_spans()->add_spans() = span;
EXPECT_TRUE(exporter.log(request));

// Close the stream, now that we've created it.
// Terminate the request, now that we've created it.
callbacks_->onRemoteClose(Grpc::Status::Internal, "bad");

// Second call should make a new stream.
expectStreamStart();
expectStreamMessage(request_yaml);
// Second call should make a new request.
expectTraceExportStart();
expectTraceExportMessage(request_yaml);
EXPECT_TRUE(exporter.log(request));
}

TEST_F(OpenTelemetryGrpcTraceExporterTest, ExportWithNoopCallbacks) {
OpenTelemetryGrpcTraceExporter exporter(Grpc::RawAsyncClientPtr{async_client_});
expectStreamMessage(R"EOF(
expectTraceExportMessage(R"EOF(
resource_spans:
scope_spans:
- spans:
Expand Down

0 comments on commit f520e8e

Please sign in to comment.