Skip to content

Commit

Permalink
feat: add scrape state metrics (#1900)
Browse files Browse the repository at this point in the history
  • Loading branch information
catdogpandas authored Dec 16, 2024
1 parent 4251b49 commit 5f9ce93
Show file tree
Hide file tree
Showing 14 changed files with 215 additions and 15 deletions.
4 changes: 4 additions & 0 deletions core/common/http/AsynCurlRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ bool AsynCurlRunner::AddRequestToClient(unique_ptr<AsynHttpRequest>&& request) {

if (curl == nullptr) {
LOG_ERROR(sLogger, ("failed to send request", "failed to init curl handler")("request address", request.get()));
request->mResponse.SetNetworkStatus(CURLE_FAILED_INIT);
request->OnSendDone(request->mResponse);
return false;
}
Expand All @@ -106,6 +107,7 @@ bool AsynCurlRunner::AddRequestToClient(unique_ptr<AsynHttpRequest>&& request) {
LOG_ERROR(sLogger,
("failed to send request", "failed to add the easy curl handle to multi_handle")(
"errMsg", curl_multi_strerror(res))("request address", request.get()));
request->mResponse.SetNetworkStatus(CURLE_FAILED_INIT);
request->OnSendDone(request->mResponse);
curl_easy_cleanup(curl);
return false;
Expand Down Expand Up @@ -190,6 +192,7 @@ void AsynCurlRunner::HandleCompletedRequests(int& runningHandlers) {
case CURLE_OK: {
long statusCode = 0;
curl_easy_getinfo(handler, CURLINFO_RESPONSE_CODE, &statusCode);
request->mResponse.SetNetworkStatus(CURLE_OK);
request->mResponse.SetStatusCode(statusCode);
request->OnSendDone(request->mResponse);
LOG_DEBUG(sLogger,
Expand All @@ -214,6 +217,7 @@ void AsynCurlRunner::HandleCompletedRequests(int& runningHandlers) {
++runningHandlers;
requestReused = true;
} else {
request->mResponse.SetNetworkStatus(msg->data.result);
request->OnSendDone(request->mResponse);
LOG_DEBUG(
sLogger,
Expand Down
74 changes: 74 additions & 0 deletions core/common/http/HttpResponse.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

#pragma once

#include <curl/curl.h>

#include <cstdint>
#include <functional>
#include <map>
Expand All @@ -28,6 +30,24 @@ namespace logtail {

struct CurlTLS;

// Coarse, transport-level classification of a request outcome.
// HttpResponse::SetNetworkStatus folds libcurl's CURLcode values into these
// buckets. The numeric values are spelled out so that they stay stable if
// enumerators are ever reordered.
enum NetworkCode {
    Ok = 0,                 // no transport-level error
    ConnectionFailed = 1,   // could not connect to the peer
    RemoteAccessDenied = 2, // login or remote access was denied
    SSLConnectError = 3,    // SSL connect error
    SSLCertError = 4,       // certificate problem
    SSLOtherProblem = 5,    // any other SSL-related failure
    SendDataFailed = 6,     // sending data failed
    RecvDataFailed = 7,     // receiving data failed
    Timeout = 8,            // the operation timed out
    Other = 9               // everything without a dedicated bucket
};

// Transport-level outcome of a request: a coarse result code plus a
// human-readable description of it.
struct NetworkStatus {
// Coarse result category; a default-constructed status reports Ok.
NetworkCode mCode = NetworkCode::Ok;
// Descriptive text for the result. HttpResponse::SetNetworkStatus fills it
// from curl_easy_strerror(); it is empty until a status has been set.
std::string mMessage;
};

bool caseInsensitiveComp(const char lhs, const char rhs);

bool compareHeader(const std::string& lhs, const std::string& rhs);
Expand Down Expand Up @@ -76,8 +96,62 @@ class HttpResponse {

void SetStatusCode(int32_t code) { mStatusCode = code; }

// Records the transport-level result of a transfer on this response.
// The message is libcurl's own description of `code`; the code is reduced to
// one of the coarse NetworkCode buckets.
void SetNetworkStatus(CURLcode code) {
    // Bucket selection, written as a value-returning helper.
    // For the meaning of each CURLcode please refer to
    // https://curl.se/libcurl/c/libcurl-errors.html
    const auto classify = [](CURLcode curlCode) -> NetworkCode {
        switch (curlCode) {
            case CURLE_OK:
                return NetworkCode::Ok;

            case CURLE_COULDNT_CONNECT:
                return NetworkCode::ConnectionFailed;

            case CURLE_LOGIN_DENIED:
            case CURLE_REMOTE_ACCESS_DENIED:
                return NetworkCode::RemoteAccessDenied;

            case CURLE_OPERATION_TIMEDOUT:
                return NetworkCode::Timeout;

            case CURLE_SSL_CONNECT_ERROR:
                return NetworkCode::SSLConnectError;

            case CURLE_SSL_CERTPROBLEM:
            case CURLE_SSL_CACERT:
                return NetworkCode::SSLCertError;

            case CURLE_SEND_ERROR:
            case CURLE_SEND_FAIL_REWIND:
                return NetworkCode::SendDataFailed;

            case CURLE_RECV_ERROR:
                return NetworkCode::RecvDataFailed;

            // Remaining SSL-related failures share a single bucket.
            case CURLE_SSL_PINNEDPUBKEYNOTMATCH:
            case CURLE_SSL_INVALIDCERTSTATUS:
            case CURLE_SSL_CACERT_BADFILE:
            case CURLE_SSL_CIPHER:
            case CURLE_SSL_ENGINE_NOTFOUND:
            case CURLE_SSL_ENGINE_SETFAILED:
            case CURLE_USE_SSL_FAILED:
            case CURLE_SSL_ENGINE_INITFAILED:
            case CURLE_SSL_CRL_BADFILE:
            case CURLE_SSL_ISSUER_ERROR:
            case CURLE_SSL_SHUTDOWN_FAILED:
                return NetworkCode::SSLOtherProblem;

            // CURLE_FAILED_INIT is listed explicitly but is handled exactly
            // like every other code without a dedicated bucket.
            case CURLE_FAILED_INIT:
            default:
                return NetworkCode::Other;
        }
    };

    mNetworkStatus.mMessage = curl_easy_strerror(code);
    mNetworkStatus.mCode = classify(code);
}

// Returns the transport-level status last recorded by SetNetworkStatus (or the
// default-constructed status if none was recorded). The accessor is
// const-qualified so it can also be used through a const HttpResponse&.
const NetworkStatus& GetNetworkStatus() const { return mNetworkStatus; }

private:
int32_t mStatusCode = 0; // 0 means no response from server
NetworkStatus mNetworkStatus; // 0 means no error
std::map<std::string, std::string, decltype(compareHeader)*> mHeader;
std::unique_ptr<void, std::function<void(void*)>> mBody;
size_t (*mWriteCallback)(char*, size_t, size_t, void*) = nullptr;
Expand Down
1 change: 1 addition & 0 deletions core/models/PipelineEventGroup.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ enum class EventGroupMetaKey {
CONTAINER_IMAGE_NAME,
CONTAINER_IMAGE_ID,

PROMETHEUS_SCRAPE_STATE,
PROMETHEUS_SCRAPE_DURATION,
PROMETHEUS_SCRAPE_RESPONSE_SIZE,
PROMETHEUS_SAMPLES_SCRAPED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
#include "prometheus/Constants.h"

using namespace std;

DECLARE_FLAG_STRING(_pod_name_);

namespace logtail {

const string ProcessorPromRelabelMetricNative::sName = "processor_prom_relabel_metric_native";
Expand Down Expand Up @@ -192,6 +194,13 @@ void ProcessorPromRelabelMetricNative::AddAutoMetrics(PipelineEventGroup& metric
// up metric must be the last one
bool upState = StringTo<bool>(metricGroup.GetMetadata(EventGroupMetaKey::PROMETHEUS_UP_STATE).to_string());

if (metricGroup.HasMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_STATE)) {
auto scrapeState = metricGroup.GetMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_STATE);
AddMetric(metricGroup, prometheus::SCRAPE_STATE, 1.0 * upState, timestamp, nanoSec, targetTags);
auto& last = metricGroup.MutableEvents()[metricGroup.GetEvents().size() - 1];
last.Cast<MetricEvent>().SetTag(METRIC_LABEL_KEY_STATUS, scrapeState);
}

AddMetric(metricGroup, prometheus::UP, 1.0 * upState, timestamp, nanoSec, targetTags);
}

Expand Down
1 change: 1 addition & 0 deletions core/prometheus/Constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ const char* const PARAM_LABEL_NAME = "__param_";
const char* const LABELS = "labels";

// auto metrics
const char* const SCRAPE_STATE = "scrape_state";
const char* const SCRAPE_DURATION_SECONDS = "scrape_duration_seconds";
const char* const SCRAPE_RESPONSE_SIZE_BYTES = "scrape_response_size_bytes";
const char* const SCRAPE_SAMPLES_LIMIT = "scrape_samples_limit";
Expand Down
34 changes: 34 additions & 0 deletions core/prometheus/Utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <iomanip>

#include "common/StringTools.h"
#include "http/HttpResponse.h"
#include "models/StringView.h"

using namespace std;
Expand Down Expand Up @@ -148,4 +149,37 @@ uint64_t GetRandSleepMilliSec(const std::string& key, uint64_t intervalSeconds,
randSleep -= sleepOffset;
return randSleep;
}

namespace prom {

// Translates a transport-level NetworkCode into the scrape-state string.
//
// Returns "OK" for NetworkCode::Ok and an "ERR_*" string for every failure
// bucket. NetworkCode::Other, as well as any value that is not a declared
// enumerator, yields "ERR_UNKNOWN".
//
// Implemented as a switch rather than a function-local static map: there is no
// mutable shared state, no heap allocation, and a single dispatch instead of a
// count() followed by operator[].
std::string NetworkCodeToState(NetworkCode code) {
    switch (code) {
        case NetworkCode::Ok:
            return "OK";
        case NetworkCode::ConnectionFailed:
            return "ERR_CONN_FAILED";
        case NetworkCode::RemoteAccessDenied:
            return "ERR_ACCESS_DENIED";
        case NetworkCode::Timeout:
            return "ERR_TIMEOUT";
        case NetworkCode::SSLConnectError:
            return "ERR_SSL_CONN_ERR";
        case NetworkCode::SSLCertError:
            return "ERR_SSL_CERT_ERR";
        case NetworkCode::SSLOtherProblem:
            return "ERR_SSL_OTHER_PROBLEM";
        case NetworkCode::SendDataFailed:
            return "ERR_SEND_DATA_FAILED";
        case NetworkCode::RecvDataFailed:
            return "ERR_RECV_DATA_FAILED";
        case NetworkCode::Other:
            break;
    }
    // Deliberately no `default:` label above, so the compiler can warn when a
    // new enumerator is added without a mapping; unknown values land here.
    return "ERR_UNKNOWN";
}

// Translates an HTTP status code into the scrape-state string:
//   200            -> "OK"
//   code > 1000    -> "ERR_HTTP_UNKNOWN"
//   anything else  -> "ERR_HTTP_" followed by the code
std::string HttpCodeToState(uint64_t code) {
    if (code == 200) {
        return "OK";
    }
    // NOTE(review): the bound is exclusive, so 1000 itself (and 0) is still
    // rendered as "ERR_HTTP_<code>" -- confirm this is the intended cutoff.
    if (code > 1000) {
        return "ERR_HTTP_UNKNOWN";
    }
    std::string state = "ERR_HTTP_";
    state += ToString(code);
    return state;
}

} // namespace prom
} // namespace logtail
7 changes: 7 additions & 0 deletions core/prometheus/Utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include <string>

#include "common/http/HttpResponse.h"
#include "models/StringView.h"

namespace logtail {
Expand All @@ -19,4 +20,10 @@ void SplitStringView(const std::string& s, char delimiter, std::vector<StringVie
bool IsNumber(const std::string& str);

uint64_t GetRandSleepMilliSec(const std::string& key, uint64_t intervalSeconds, uint64_t currentMilliSeconds);

// Helpers that turn scrape results into scrape-state strings.
namespace prom {
// Returns "OK" for NetworkCode::Ok, a specific "ERR_*" string for each failure
// bucket, and "ERR_UNKNOWN" for NetworkCode::Other or an unrecognized value.
std::string NetworkCodeToState(NetworkCode code);
// Returns "OK" for 200, "ERR_HTTP_UNKNOWN" for codes greater than 1000, and
// "ERR_HTTP_<code>" for every other value.
std::string HttpCodeToState(uint64_t code);
}

} // namespace logtail
31 changes: 19 additions & 12 deletions core/prometheus/schedulers/ScrapeScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,18 +99,25 @@ void ScrapeScheduler::OnMetricResult(HttpResponse& response, uint64_t) {
mSelfMonitor->AddCounter(METRIC_PLUGIN_OUT_SIZE_BYTES, response.GetStatusCode(), responseBody.mRawSize);
mSelfMonitor->AddCounter(METRIC_PLUGIN_PROM_SCRAPE_TIME_MS, response.GetStatusCode(), scrapeDurationMilliSeconds);

const auto& networkStatus = response.GetNetworkStatus();
if (networkStatus.mCode != NetworkCode::Ok) {
// not 0 means curl error
mScrapeState = prom::NetworkCodeToState(networkStatus.mCode);
} else if (response.GetStatusCode() != 200) {
mScrapeState = prom::HttpCodeToState(response.GetStatusCode());
} else {
// 0 means success
mScrapeState = prom::NetworkCodeToState(NetworkCode::Ok);
}

mScrapeDurationSeconds = scrapeDurationMilliSeconds * sRate;
mScrapeResponseSizeBytes = responseBody.mRawSize;
mUpState = response.GetStatusCode() == 200;
if (response.GetStatusCode() != 200) {
mScrapeResponseSizeBytes = 0;
string headerStr;
for (const auto& [k, v] : mScrapeConfigPtr->mRequestHeaders) {
headerStr.append(k).append(":").append(v).append(";");
}
LOG_WARNING(
sLogger,
("scrape failed, status code", response.GetStatusCode())("target", mHash)("http header", headerStr));
LOG_WARNING(sLogger,
("scrape failed, status code",
response.GetStatusCode())("target", mHash)("curl msg", response.GetNetworkStatus().mMessage));
}
auto& eventGroup = responseBody.mEventGroup;

Expand All @@ -121,6 +128,7 @@ void ScrapeScheduler::OnMetricResult(HttpResponse& response, uint64_t) {
}

void ScrapeScheduler::SetAutoMetricMeta(PipelineEventGroup& eGroup) {
eGroup.SetMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_STATE, mScrapeState);
eGroup.SetMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_TIMESTAMP_MILLISEC, ToString(mScrapeTimestampMilliSec));
eGroup.SetMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_DURATION, ToString(mScrapeDurationSeconds));
eGroup.SetMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_RESPONSE_SIZE, ToString(mScrapeResponseSizeBytes));
Expand Down Expand Up @@ -244,11 +252,10 @@ void ScrapeScheduler::InitSelfMonitor(const MetricLabels& defaultLabels) {
MetricLabels labels = defaultLabels;
labels.emplace_back(METRIC_LABEL_KEY_INSTANCE, mInstance);

static const std::unordered_map<std::string, MetricType> sScrapeMetricKeys = {
{METRIC_PLUGIN_OUT_EVENTS_TOTAL, MetricType::METRIC_TYPE_COUNTER},
{METRIC_PLUGIN_OUT_SIZE_BYTES, MetricType::METRIC_TYPE_COUNTER},
{METRIC_PLUGIN_PROM_SCRAPE_TIME_MS, MetricType::METRIC_TYPE_COUNTER},
};
static const std::unordered_map<std::string, MetricType> sScrapeMetricKeys
= {{METRIC_PLUGIN_OUT_EVENTS_TOTAL, MetricType::METRIC_TYPE_COUNTER},
{METRIC_PLUGIN_OUT_SIZE_BYTES, MetricType::METRIC_TYPE_COUNTER},
{METRIC_PLUGIN_PROM_SCRAPE_TIME_MS, MetricType::METRIC_TYPE_COUNTER}};

mSelfMonitor->InitMetricManager(sScrapeMetricKeys, labels);

Expand Down
1 change: 1 addition & 0 deletions core/prometheus/schedulers/ScrapeScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ class ScrapeScheduler : public BaseScheduler {
size_t mInputIndex;

// auto metrics
std::string mScrapeState;
uint64_t mScrapeTimestampMilliSec = 0;
double mScrapeDurationSeconds = 0;
uint64_t mScrapeResponseSizeBytes = 0;
Expand Down
4 changes: 4 additions & 0 deletions core/unittest/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ target_link_libraries(timer_unittest ${UT_BASE_TARGET})
add_executable(curl_unittest http/CurlUnittest.cpp)
target_link_libraries(curl_unittest ${UT_BASE_TARGET})

add_executable(http_response_unittest http/HttpResponseUnittest.cpp)
target_link_libraries(http_response_unittest ${UT_BASE_TARGET})

include(GoogleTest)
gtest_discover_tests(common_simple_utils_unittest)
gtest_discover_tests(common_logfileoperator_unittest)
Expand All @@ -66,4 +69,5 @@ gtest_discover_tests(safe_queue_unittest)
gtest_discover_tests(http_request_timer_event_unittest)
gtest_discover_tests(timer_unittest)
gtest_discover_tests(curl_unittest)
gtest_discover_tests(http_response_unittest)

27 changes: 27 additions & 0 deletions core/unittest/common/http/HttpResponseUnittest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@

#include "common/http/HttpResponse.h"
#include "unittest/Unittest.h"

using namespace std;

namespace logtail {
// Test fixture for HttpResponse.
class HttpResponseUnittest : public ::testing::Test {
public:
// Checks that SetNetworkStatus maps CURLcode values to the expected NetworkCode.
void TestNetworkStatus();
};

// Sets several CURLcode values on one HttpResponse in turn and verifies the
// NetworkCode reported by GetNetworkStatus after each call.
void HttpResponseUnittest::TestNetworkStatus() {
HttpResponse resp;
// Success maps to Ok.
resp.SetNetworkStatus(CURLE_OK);
APSARA_TEST_EQUAL(resp.GetNetworkStatus().mCode, NetworkCode::Ok);

// A code with a dedicated bucket; it also overwrites the previous status.
resp.SetNetworkStatus(CURLE_RECV_ERROR);
APSARA_TEST_EQUAL(resp.GetNetworkStatus().mCode, NetworkCode::RecvDataFailed);

// CURLE_FAILED_INIT has no dedicated bucket and is reported as Other.
resp.SetNetworkStatus(CURLE_FAILED_INIT);
APSARA_TEST_EQUAL(resp.GetNetworkStatus().mCode, NetworkCode::Other);
}

UNIT_TEST_CASE(HttpResponseUnittest, TestNetworkStatus);
} // namespace logtail
UNIT_TEST_MAIN
Original file line number Diff line number Diff line change
Expand Up @@ -204,20 +204,25 @@ test_metric8{k1="v1", k3="v2", } 9.9410452992e+10 1715829785083
eventGroup.SetMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_DURATION, ToString(1.5));
eventGroup.SetMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_RESPONSE_SIZE, ToString(2325));
eventGroup.SetMetadata(EventGroupMetaKey::PROMETHEUS_UP_STATE, ToString(true));
eventGroup.SetMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_STATE, string("OK"));
eventGroup.SetTag(string("instance"), "localhost:8080");
eventGroup.SetTag(string("job"), "test_job");
processor.AddAutoMetrics(eventGroup);

APSARA_TEST_EQUAL((size_t)15, eventGroup.GetEvents().size());
APSARA_TEST_EQUAL((size_t)16, eventGroup.GetEvents().size());
APSARA_TEST_EQUAL(1.5, eventGroup.GetEvents().at(8).Cast<MetricEvent>().GetValue<UntypedSingleValue>()->mValue);
APSARA_TEST_EQUAL(2325, eventGroup.GetEvents().at(9).Cast<MetricEvent>().GetValue<UntypedSingleValue>()->mValue);
APSARA_TEST_EQUAL(1000, eventGroup.GetEvents().at(10).Cast<MetricEvent>().GetValue<UntypedSingleValue>()->mValue);
APSARA_TEST_EQUAL(8, eventGroup.GetEvents().at(11).Cast<MetricEvent>().GetValue<UntypedSingleValue>()->mValue);
APSARA_TEST_EQUAL(8, eventGroup.GetEvents().at(12).Cast<MetricEvent>().GetValue<UntypedSingleValue>()->mValue);
APSARA_TEST_EQUAL(15, eventGroup.GetEvents().at(13).Cast<MetricEvent>().GetValue<UntypedSingleValue>()->mValue);
// scrape_state
APSARA_TEST_EQUAL(1, eventGroup.GetEvents().at(14).Cast<MetricEvent>().GetValue<UntypedSingleValue>()->mValue);
APSARA_TEST_EQUAL("localhost:8080", eventGroup.GetEvents().at(14).Cast<MetricEvent>().GetTag("instance"));
APSARA_TEST_EQUAL("test_job", eventGroup.GetEvents().at(14).Cast<MetricEvent>().GetTag("job"));
APSARA_TEST_EQUAL("OK", eventGroup.GetEvents().at(14).Cast<MetricEvent>().GetTag("status"));
// up
APSARA_TEST_EQUAL(1, eventGroup.GetEvents().at(15).Cast<MetricEvent>().GetValue<UntypedSingleValue>()->mValue);
APSARA_TEST_EQUAL("localhost:8080", eventGroup.GetEvents().at(15).Cast<MetricEvent>().GetTag("instance"));
APSARA_TEST_EQUAL("test_job", eventGroup.GetEvents().at(15).Cast<MetricEvent>().GetTag("job"));
}

void ProcessorPromRelabelMetricNativeUnittest::TestHonorLabels() {
Expand Down
10 changes: 10 additions & 0 deletions core/unittest/prometheus/ScrapeSchedulerUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,22 @@ void ScrapeSchedulerUnittest::TestProcess() {
// if status code is not 200, no data will be processed
// but will continue running, sending self-monitoring metrics
httpResponse.SetStatusCode(503);
httpResponse.SetNetworkStatus(CURLE_OK);
event.OnMetricResult(httpResponse, 0);
APSARA_TEST_EQUAL(1UL, event.mItem.size());
event.mItem.clear();

httpResponse.GetBody<PromMetricResponseBody>()->mEventGroup = PipelineEventGroup(std::make_shared<SourceBuffer>());
httpResponse.SetStatusCode(503);
httpResponse.SetNetworkStatus(CURLE_COULDNT_CONNECT);
event.OnMetricResult(httpResponse, 0);
APSARA_TEST_EQUAL(event.mScrapeState, "ERR_CONN_FAILED");
APSARA_TEST_EQUAL(1UL, event.mItem.size());
event.mItem.clear();

httpResponse.GetBody<PromMetricResponseBody>()->mEventGroup = PipelineEventGroup(std::make_shared<SourceBuffer>());
httpResponse.SetStatusCode(200);
httpResponse.SetNetworkStatus(CURLE_OK);
string body1 = "# HELP go_gc_duration_seconds A summary of the pause duration of garbage collection cycles.\n"
"# TYPE go_gc_duration_seconds summary\n"
"go_gc_duration_seconds{quantile=\"0\"} 1.5531e-05\n"
Expand Down
Loading

0 comments on commit 5f9ce93

Please sign in to comment.