Skip to content

Commit

Permalink
stream info: extendable response flag support (envoyproxy#31719)
Browse files Browse the repository at this point in the history
* stream info: extendable stream info support

Signed-off-by: wbpcode <[email protected]>

* fix building

Signed-off-by: wbpcode <[email protected]>

* added legacy response flags

Signed-off-by: wbpcode <[email protected]>

* minor update

Signed-off-by: wbpcode <[email protected]>

* fix unit test and compiling

Signed-off-by: wbpcode <[email protected]>

* add custom flag test

Signed-off-by: wbpcode <[email protected]>

* fix tests

Signed-off-by: wbpcode <[email protected]>

* minor update

Signed-off-by: wbpcode <[email protected]>

* address comments and refactor the flag underlying type

Signed-off-by: wbpcode <[email protected]>

* minor update

Signed-off-by: wbpcode <[email protected]>

* fix test

Signed-off-by: wbpcode <[email protected]>

* fix test

Signed-off-by: wbpcode <[email protected]>

---------

Signed-off-by: wbpcode <[email protected]>
  • Loading branch information
code authored Jan 30, 2024
1 parent c951d31 commit b34d122
Show file tree
Hide file tree
Showing 20 changed files with 415 additions and 140 deletions.
6 changes: 3 additions & 3 deletions contrib/generic_proxy/filters/network/source/stats.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ CodeOrFlags::CodeOrFlags(Server::Configuration::ServerFactoryContext& context)
code_stat_names_.push_back(pool_.add(std::to_string(i)));
}

for (const auto& flag : StreamInfo::ResponseFlagUtils::ALL_RESPONSE_STRINGS_FLAGS) {
for (const auto& flag : StreamInfo::ResponseFlagUtils::CORE_RESPONSE_FLAGS) {
flag_stat_names_.emplace(flag.second, pool_.add(flag.first.short_string_));
}

Expand All @@ -42,13 +42,13 @@ Stats::StatName CodeOrFlags::statNameFromFlag(StreamInfo::ResponseFlag flag) con

absl::InlinedVector<StreamInfo::ResponseFlag, 2>
getResponseFlags(const StreamInfo::StreamInfo& info) {
if (info.responseFlags() == 0) {
if (!info.hasAnyResponseFlag()) {
return {};
}

absl::InlinedVector<StreamInfo::ResponseFlag, 2> flags;

for (const auto& flag : StreamInfo::ResponseFlagUtils::ALL_RESPONSE_STRINGS_FLAGS) {
for (const auto& flag : StreamInfo::ResponseFlagUtils::CORE_RESPONSE_FLAGS) {
if (info.hasResponseFlag(flag.second)) {
flags.push_back(flag.second);
}
Expand Down
115 changes: 75 additions & 40 deletions envoy/stream_info/stream_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,67 +36,103 @@ using ClusterInfoConstSharedPtr = std::shared_ptr<const ClusterInfo>;

namespace StreamInfo {

enum ResponseFlag {
enum ResponseFlag : uint16_t {
// Local server healthcheck failed.
FailedLocalHealthCheck = 0x1,
FailedLocalHealthCheck,
// No healthy upstream.
NoHealthyUpstream = 0x2,
NoHealthyUpstream,
// Request timeout on upstream.
UpstreamRequestTimeout = 0x4,
UpstreamRequestTimeout,
// Local codec level reset was sent on the stream.
LocalReset = 0x8,
LocalReset,
// Remote codec level reset was received on the stream.
UpstreamRemoteReset = 0x10,
UpstreamRemoteReset,
// Local reset by a connection pool due to an initial connection failure.
UpstreamConnectionFailure = 0x20,
UpstreamConnectionFailure,
// If the stream was locally reset due to connection termination.
UpstreamConnectionTermination = 0x40,
UpstreamConnectionTermination,
// The stream was reset because of a resource overflow.
UpstreamOverflow = 0x80,
UpstreamOverflow,
// No route found for a given request.
NoRouteFound = 0x100,
NoRouteFound,
// Request was delayed before proxying.
DelayInjected = 0x200,
DelayInjected,
// Abort with error code was injected.
FaultInjected = 0x400,
FaultInjected,
// Request was ratelimited locally by rate limit filter.
RateLimited = 0x800,
RateLimited,
// Request was unauthorized by external authorization service.
UnauthorizedExternalService = 0x1000,
UnauthorizedExternalService,
// Unable to call Ratelimit service.
RateLimitServiceError = 0x2000,
RateLimitServiceError,
// If the stream was reset due to a downstream connection termination.
DownstreamConnectionTermination = 0x4000,
DownstreamConnectionTermination,
// Exceeded upstream retry limit.
UpstreamRetryLimitExceeded = 0x8000,
UpstreamRetryLimitExceeded,
// Request hit the stream idle timeout, triggering a 408.
StreamIdleTimeout = 0x10000,
StreamIdleTimeout,
// Request specified x-envoy-* header values that failed strict header checks.
InvalidEnvoyRequestHeaders = 0x20000,
InvalidEnvoyRequestHeaders,
// Downstream request had an HTTP protocol error
DownstreamProtocolError = 0x40000,
DownstreamProtocolError,
// Upstream request reached to user defined max stream duration.
UpstreamMaxStreamDurationReached = 0x80000,
UpstreamMaxStreamDurationReached,
// True if the response was served from an Envoy cache filter.
ResponseFromCacheFilter = 0x100000,
ResponseFromCacheFilter,
// Filter config was not received within the permitted warming deadline.
NoFilterConfigFound = 0x200000,
NoFilterConfigFound,
// Request or connection exceeded the downstream connection duration.
DurationTimeout = 0x400000,
DurationTimeout,
// Upstream response had an HTTP protocol error
UpstreamProtocolError = 0x800000,
UpstreamProtocolError,
// No cluster found for a given request.
NoClusterFound = 0x1000000,
NoClusterFound,
// Overload Manager terminated the stream.
OverloadManager = 0x2000000,
OverloadManager,
// DNS resolution failed.
DnsResolutionFailed = 0x4000000,
DnsResolutionFailed,
// Drop certain percentage of overloaded traffic.
DropOverLoad = 0x8000000,
DropOverLoad,
// ATTENTION: MAKE SURE THIS REMAINS EQUAL TO THE LAST FLAG.
LastFlag = DropOverLoad,
};

class ResponseFlagUtils;

// TODO(wbpcode): rename the ExtendedResponseFlag to ResponseFlag and legacy
// ResponseFlag to CoreResponseFlag.
class ExtendedResponseFlag {
public:
ExtendedResponseFlag() = default;

/**
* Construct a response flag from the core response flag enum. The integer
* value of the enum is used as the raw integer value of the flag.
* @param flag the core response flag enum.
*/
ExtendedResponseFlag(ResponseFlag flag) : raw_value_(flag) {}

/**
* Get the raw integer value of the flag.
* @return uint16_t the raw integer value.
*/
uint16_t value() const { return raw_value_; }

bool operator==(const ExtendedResponseFlag& other) const {
return raw_value_ == other.raw_value_;
}

private:
friend class ResponseFlagUtils;

// This private constructor is used to create extended response flags from
// uint16_t values. This can only be used by ResponseFlagUtils to ensure
// only validated values are used.
ExtendedResponseFlag(uint16_t value) : raw_value_(value) {}

uint16_t raw_value_{};
};

/**
* Constants for the response code details field of StreamInfo for details sent
* by core (non-extension) code.
Expand Down Expand Up @@ -596,7 +632,7 @@ class StreamInfo {
* @param response_flag the response flag. Each filter can set independent response flags. The
* flags are accumulated.
*/
virtual void setResponseFlag(ResponseFlag response_flag) PURE;
virtual void setResponseFlag(ExtendedResponseFlag response_flag) PURE;

/**
* @param code the HTTP response code to set for this request.
Expand All @@ -617,13 +653,6 @@ class StreamInfo {
virtual void
setConnectionTerminationDetails(absl::string_view connection_termination_details) PURE;

/**
* @param response_flags the response_flags to intersect with.
* @return true if the intersection of the response_flags argument and the currently set response
* flags is non-empty.
*/
virtual bool intersectResponseFlags(uint64_t response_flags) const PURE;

/**
* @return std::string& the name of the route. The name is get from the route() and it is
* empty if there is no route.
Expand Down Expand Up @@ -758,17 +787,23 @@ class StreamInfo {
/**
* @return whether response flag is set or not.
*/
virtual bool hasResponseFlag(ResponseFlag response_flag) const PURE;
virtual bool hasResponseFlag(ExtendedResponseFlag response_flag) const PURE;

/**
* @return whether any response flag is set or not.
*/
virtual bool hasAnyResponseFlag() const PURE;

/**
* @return response flags encoded as an integer.
* @return all response flags that are set.
*/
virtual absl::Span<const ExtendedResponseFlag> responseFlags() const PURE;

/**
* @return response flags encoded as an integer. Every bit of the integer is used to represent a
* flag. Only flags that are declared in the enum ResponseFlag type are supported.
*/
virtual uint64_t responseFlags() const PURE;
virtual uint64_t legacyResponseFlags() const PURE;

/**
* @return whether the request is a health check request or not.
Expand Down
2 changes: 1 addition & 1 deletion mobile/library/common/stream_info/extra_stream_info.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ void setFinalStreamIntel(StreamInfo& stream_info, TimeSource& time_source,
final_intel.sent_byte_count = stream_info.getUpstreamBytesMeter()->wireBytesSent();
final_intel.received_byte_count = stream_info.getUpstreamBytesMeter()->wireBytesReceived();
}
final_intel.response_flags = stream_info.responseFlags();
final_intel.response_flags = stream_info.legacyResponseFlags();
}

bool isStreamIdleTimeout(const StreamInfo& stream_info) {
Expand Down
26 changes: 20 additions & 6 deletions source/common/access_log/access_log_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -215,20 +215,34 @@ bool HeaderFilter::evaluate(const Formatter::HttpFormatterContext& context,
}

ResponseFlagFilter::ResponseFlagFilter(
const envoy::config::accesslog::v3::ResponseFlagFilter& config) {
const envoy::config::accesslog::v3::ResponseFlagFilter& config)
: has_configured_flags_(!config.flags().empty()) {

// Preallocate the vector to avoid frequent heap allocations.
configured_flags_.resize(StreamInfo::ResponseFlagUtils::responseFlagsVec().size(), false);
for (int i = 0; i < config.flags_size(); i++) {
absl::optional<StreamInfo::ResponseFlag> response_flag =
StreamInfo::ResponseFlagUtils::toResponseFlag(config.flags(i));
auto response_flag = StreamInfo::ResponseFlagUtils::toResponseFlag(config.flags(i));
// The config has been validated. Therefore, every flag in the config will have a mapping.
ASSERT(response_flag.has_value());
configured_flags_ |= response_flag.value();

// The vector is allocated with the size of the response flags vec. Therefore, the index
// should always be valid.
ASSERT(response_flag.value().value() < configured_flags_.size());

configured_flags_[response_flag.value().value()] = true;
}
}

bool ResponseFlagFilter::evaluate(const Formatter::HttpFormatterContext&,
const StreamInfo::StreamInfo& info) const {
if (configured_flags_ != 0) {
return info.intersectResponseFlags(configured_flags_);
if (has_configured_flags_) {
for (const auto flag : info.responseFlags()) {
ASSERT(flag.value() < configured_flags_.size());
if (configured_flags_[flag.value()]) {
return true;
}
}
return false;
}
return info.hasAnyResponseFlag();
}
Expand Down
3 changes: 2 additions & 1 deletion source/common/access_log/access_log_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ class ResponseFlagFilter : public Filter {
const StreamInfo::StreamInfo& info) const override;

private:
uint64_t configured_flags_{};
const bool has_configured_flags_{};
std::vector<bool> configured_flags_{};
};

/**
Expand Down
6 changes: 5 additions & 1 deletion source/common/stream_info/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ envoy_cc_library(
deps = [
":filter_state_lib",
":stream_id_provider_lib",
":utility_lib",
"//envoy/http:request_id_extension_interface",
"//envoy/stream_info:stream_info_interface",
"//source/common/common:assert_lib",
Expand All @@ -37,7 +38,10 @@ envoy_cc_library(
name = "utility_lib",
srcs = ["utility.cc"],
hdrs = ["utility.h"],
external_deps = ["abseil_optional"],
external_deps = [
"abseil_optional",
"abseil_node_hash_map",
],
deps = [
"//envoy/common:time_interface",
"//envoy/http:codes_interface",
Expand Down
34 changes: 26 additions & 8 deletions source/common/stream_info/stream_info_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "source/common/runtime/runtime_features.h"
#include "source/common/stream_info/filter_state_impl.h"
#include "source/common/stream_info/stream_id_provider_impl.h"
#include "source/common/stream_info/utility.h"

#include "absl/strings/str_replace.h"

Expand Down Expand Up @@ -231,17 +232,31 @@ struct StreamInfoImpl : public StreamInfo {

uint64_t bytesSent() const override { return bytes_sent_; }

void setResponseFlag(ResponseFlag response_flag) override { response_flags_ |= response_flag; }
void setResponseFlag(ExtendedResponseFlag flag) override {
ASSERT(flag.value() < ResponseFlagUtils::responseFlagsVec().size());
if (!hasResponseFlag(flag)) {
response_flags_.push_back(flag);
}
}

bool intersectResponseFlags(uint64_t response_flags) const override {
return (response_flags_ & response_flags) != 0;
bool hasResponseFlag(ExtendedResponseFlag flag) const override {
return std::find(response_flags_.begin(), response_flags_.end(), flag) != response_flags_.end();
}

bool hasResponseFlag(ResponseFlag flag) const override { return response_flags_ & flag; }
bool hasAnyResponseFlag() const override { return !response_flags_.empty(); }

bool hasAnyResponseFlag() const override { return response_flags_ != 0; }
absl::Span<const ExtendedResponseFlag> responseFlags() const override { return response_flags_; }

uint64_t responseFlags() const override { return response_flags_; }
uint64_t legacyResponseFlags() const override {
uint64_t legacy_flags = 0;
for (ExtendedResponseFlag flag : response_flags_) {
if (flag.value() <= static_cast<uint16_t>(ResponseFlag::LastFlag)) {
ASSERT(flag.value() < 64, "Legacy response flag out of range");
legacy_flags |= (1UL << flag.value());
}
}
return legacy_flags;
}

const std::string& getRouteName() const override {
return route_ != nullptr ? route_->routeName() : EMPTY_STRING;
Expand Down Expand Up @@ -374,7 +389,10 @@ struct StreamInfoImpl : public StreamInfo {
// derive final time from other info's complete duration and start time.
final_time_ = info.startTimeMonotonic() + info.requestComplete().value();
}
response_flags_ = info.responseFlags();
response_flags_.clear();
auto other_response_flags = info.responseFlags();
response_flags_.insert(response_flags_.end(), other_response_flags.begin(),
other_response_flags.end());
health_check_request_ = info.healthCheck();
route_ = info.route();
metadata_ = info.dynamicMetadata();
Expand Down Expand Up @@ -423,7 +441,7 @@ struct StreamInfoImpl : public StreamInfo {
public:
absl::optional<std::string> response_code_details_;
absl::optional<std::string> connection_termination_details_;
uint64_t response_flags_{};
absl::InlinedVector<ExtendedResponseFlag, 4> response_flags_{};
bool health_check_request_{};
Router::RouteConstSharedPtr route_;
envoy::config::core::v3::Metadata metadata_{};
Expand Down
Loading

0 comments on commit b34d122

Please sign in to comment.