Skip to content

Commit

Permalink
feat: supports subpath routing (alibaba#2026)
Browse files Browse the repository at this point in the history
* support subpath

* clang format && add unittest

* clang format
  • Loading branch information
KayzzzZ authored Jan 16, 2025
1 parent a605b1c commit 0885bb8
Show file tree
Hide file tree
Showing 11 changed files with 500 additions and 80 deletions.
76 changes: 52 additions & 24 deletions core/plugin/flusher/sls/DiskBufferWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -770,6 +770,7 @@ bool DiskBufferWriter::SendToBufferFile(SenderQueueItem* dataPtr) {
bufferMeta.set_shardhashkey(data->mShardHashKey);
bufferMeta.set_compresstype(ConvertCompressType(flusher->GetCompressType()));
bufferMeta.set_telemetrytype(flusher->mTelemetryType);
bufferMeta.set_subpath(flusher->GetSubpath());
#ifdef __ENTERPRISE__
bufferMeta.set_endpointmode(GetEndpointMode(flusher->mEndpointMode));
#endif
Expand Down Expand Up @@ -866,30 +867,57 @@ SLSResponse DiskBufferWriter::SendBufferFileData(const sls_logs::LogtailBufferMe
} else {
dataType = RawDataType::EVENT_GROUP;
}
if (bufferMeta.has_telemetrytype() && bufferMeta.telemetrytype() == sls_logs::SLS_TELEMETRY_TYPE_METRICS) {
return PostMetricStoreLogs(accessKeyId,
accessKeySecret,
type,
host,
httpsFlag,
bufferMeta.project(),
bufferMeta.logstore(),
GetSLSCompressTypeString(bufferMeta.compresstype()),
logData,
bufferMeta.rawsize());
} else {
return PostLogStoreLogs(accessKeyId,
accessKeySecret,
type,
host,
httpsFlag,
bufferMeta.project(),
bufferMeta.logstore(),
GetSLSCompressTypeString(bufferMeta.compresstype()),
dataType,
logData,
bufferMeta.rawsize(),
bufferMeta.has_shardhashkey() ? bufferMeta.shardhashkey() : "");

auto telemetryType
= bufferMeta.has_telemetrytype() ? bufferMeta.telemetrytype() : sls_logs::SLS_TELEMETRY_TYPE_LOGS;
switch (telemetryType) {
case sls_logs::SLS_TELEMETRY_TYPE_LOGS:
return PostLogStoreLogs(accessKeyId,
accessKeySecret,
type,
host,
httpsFlag,
bufferMeta.project(),
bufferMeta.logstore(),
GetSLSCompressTypeString(bufferMeta.compresstype()),
dataType,
logData,
bufferMeta.rawsize(),
bufferMeta.has_shardhashkey() ? bufferMeta.shardhashkey() : "");
case sls_logs::SLS_TELEMETRY_TYPE_METRICS:
return PostMetricStoreLogs(accessKeyId,
accessKeySecret,
type,
host,
httpsFlag,
bufferMeta.project(),
bufferMeta.logstore(),
GetSLSCompressTypeString(bufferMeta.compresstype()),
logData,
bufferMeta.rawsize());
case sls_logs::SLS_TELEMETRY_TYPE_APM_METRICS:
case sls_logs::SLS_TELEMETRY_TYPE_APM_TRACES:
case sls_logs::SLS_TELEMETRY_TYPE_APM_AGENTINFOS:
return PostAPMBackendLogs(accessKeyId,
accessKeySecret,
type,
host,
httpsFlag,
bufferMeta.project(),
bufferMeta.logstore(),
GetSLSCompressTypeString(bufferMeta.compresstype()),
dataType,
logData,
bufferMeta.rawsize(),
bufferMeta.subpath());
default: {
// should not happen
LOG_ERROR(sLogger, ("Unhandled telemetry type", " should not happen"));
SLSResponse response;
response.mErrorCode = LOGE_REQUEST_ERROR;
response.mErrorMsg = "Unhandled telemetry type";
return response;
}
}
}

Expand Down
155 changes: 101 additions & 54 deletions core/plugin/flusher/sls/FlusherSLS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -294,16 +294,57 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline
mContext->GetRegion());
}

// TelemetryType
string telemetryType;
if (!GetOptionalStringParam(config, "TelemetryType", telemetryType, errorMsg)) {
PARAM_WARNING_DEFAULT(mContext->GetLogger(),
mContext->GetAlarm(),
errorMsg,
"logs",
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
} else if (telemetryType == "metrics") {
// TelemetryType set to metrics
mTelemetryType = BOOL_FLAG(enable_metricstore_channel) ? sls_logs::SLS_TELEMETRY_TYPE_METRICS
: sls_logs::SLS_TELEMETRY_TYPE_LOGS;
} else if (telemetryType == "arms_agentinfo") {
mSubpath = APM_AGENTINFOS_URL;
mTelemetryType = sls_logs::SLS_TELEMETRY_TYPE_APM_AGENTINFOS;
} else if (telemetryType == "arms_metrics") {
mSubpath = APM_METRICS_URL;
mTelemetryType = sls_logs::SLS_TELEMETRY_TYPE_APM_METRICS;
} else if (telemetryType == "arms_traces") {
mSubpath = APM_TRACES_URL;
mTelemetryType = sls_logs::SLS_TELEMETRY_TYPE_APM_TRACES;
} else if (!telemetryType.empty() && telemetryType != "logs") {
// TelemetryType invalid
PARAM_WARNING_DEFAULT(mContext->GetLogger(),
mContext->GetAlarm(),
"string param TelemetryType is not valid",
"logs",
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
}

// Logstore
if (!GetMandatoryStringParam(config, "Logstore", mLogstore, errorMsg)) {
PARAM_ERROR_RETURN(mContext->GetLogger(),
mContext->GetAlarm(),
errorMsg,
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
if (mTelemetryType == sls_logs::SLS_TELEMETRY_TYPE_LOGS || mTelemetryType == sls_logs::SLS_TELEMETRY_TYPE_METRICS) {
// log and metric
if (!GetMandatoryStringParam(config, "Logstore", mLogstore, errorMsg)) {
PARAM_ERROR_RETURN(mContext->GetLogger(),
mContext->GetAlarm(),
errorMsg,
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
}
}

// Region
Expand Down Expand Up @@ -409,32 +450,6 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline
}
#endif

// TelemetryType
string telemetryType;
if (!GetOptionalStringParam(config, "TelemetryType", telemetryType, errorMsg)) {
PARAM_WARNING_DEFAULT(mContext->GetLogger(),
mContext->GetAlarm(),
errorMsg,
"logs",
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
} else if (telemetryType == "metrics") {
mTelemetryType = BOOL_FLAG(enable_metricstore_channel) ? sls_logs::SLS_TELEMETRY_TYPE_METRICS
: sls_logs::SLS_TELEMETRY_TYPE_LOGS;
} else if (!telemetryType.empty() && telemetryType != "logs") {
PARAM_WARNING_DEFAULT(mContext->GetLogger(),
mContext->GetAlarm(),
"string param TelemetryType is not valid",
"logs",
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
}

// Batch
const char* key = "Batch";
Expand Down Expand Up @@ -465,25 +480,17 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline
}

// ShardHashKeys
if (!GetOptionalListParam<string>(config, "ShardHashKeys", mShardHashKeys, errorMsg)) {
PARAM_WARNING_IGNORE(mContext->GetLogger(),
mContext->GetAlarm(),
errorMsg,
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
} else if (!mShardHashKeys.empty() && mContext->IsExactlyOnceEnabled()) {
mShardHashKeys.clear();
PARAM_WARNING_IGNORE(mContext->GetLogger(),
mContext->GetAlarm(),
"exactly once enabled when ShardHashKeys is not empty",
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
if (mTelemetryType == sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_LOGS && !mContext->IsExactlyOnceEnabled()) {
if (!GetOptionalListParam<string>(config, "ShardHashKeys", mShardHashKeys, errorMsg)) {
PARAM_WARNING_IGNORE(mContext->GetLogger(),
mContext->GetAlarm(),
errorMsg,
sName,
mContext->GetConfigName(),
mContext->GetProjectName(),
mContext->GetLogstoreName(),
mContext->GetRegion());
}
}

DefaultFlushStrategyOptions strategy{
Expand Down Expand Up @@ -667,6 +674,11 @@ bool FlusherSLS::BuildRequest(SenderQueueItem* item, unique_ptr<HttpSinkRequest>
case sls_logs::SLS_TELEMETRY_TYPE_METRICS:
req = CreatePostMetricStoreLogsRequest(accessKeyId, accessKeySecret, type, data);
break;
case sls_logs::SLS_TELEMETRY_TYPE_APM_AGENTINFOS:
case sls_logs::SLS_TELEMETRY_TYPE_APM_METRICS:
case sls_logs::SLS_TELEMETRY_TYPE_APM_TRACES:
req = CreatePostAPMBackendRequest(accessKeyId, accessKeySecret, type, data, mSubpath);
break;
default:
break;
}
Expand Down Expand Up @@ -1245,6 +1257,41 @@ unique_ptr<HttpSinkRequest> FlusherSLS::CreatePostMetricStoreLogsRequest(const s
1);
}

unique_ptr<HttpSinkRequest> FlusherSLS::CreatePostAPMBackendRequest(const string& accessKeyId,
const string& accessKeySecret,
SLSClientManager::AuthType type,
SLSSenderQueueItem* item,
const std::string& subPath) const {
string query;
map<string, string> header;
PreparePostAPMBackendRequest(accessKeyId,
accessKeySecret,
type,
item->mCurrentHost,
item->mRealIpFlag,
mProject,
item->mLogstore,
CompressTypeToString(mCompressor->GetCompressType()),
item->mType,
item->mData,
item->mRawSize,
mSubpath,
query,
header);
bool httpsFlag = SLSClientManager::GetInstance()->UsingHttps(mRegion);
return make_unique<HttpSinkRequest>(HTTP_POST,
httpsFlag,
item->mCurrentHost,
httpsFlag ? 443 : 80,
subPath,
"",
header,
item->mData,
item,
INT32_FLAG(default_http_request_timeout_sec),
1);
}

sls_logs::SlsCompressType ConvertCompressType(CompressType type) {
sls_logs::SlsCompressType compressType = sls_logs::SLS_CMP_NONE;
switch (type) {
Expand Down
9 changes: 9 additions & 0 deletions core/plugin/flusher/sls/FlusherSLS.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ class FlusherSLS : public HttpFlusher {
// for use of Go pipeline and shennong
bool Send(std::string&& data, const std::string& shardHashKey, const std::string& logstore = "");

std::string GetSubpath() const { return mSubpath; }

std::string mProject;
std::string mLogstore;
std::string mRegion;
Expand Down Expand Up @@ -130,6 +132,13 @@ class FlusherSLS : public HttpFlusher {
const std::string& accessKeySecret,
SLSClientManager::AuthType type,
SLSSenderQueueItem* item) const;
std::unique_ptr<HttpSinkRequest> CreatePostAPMBackendRequest(const std::string& accessKeyId,
const std::string& accessKeySecret,
SLSClientManager::AuthType type,
SLSSenderQueueItem* item,
const std::string& subPath) const;

std::string mSubpath;

Batcher<SLSEventBatchStatus> mBatcher;
std::unique_ptr<EventGroupSerializer> mGroupSerializer;
Expand Down
79 changes: 79 additions & 0 deletions core/plugin/flusher/sls/SLSClientManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,50 @@ void PreparePostMetricStoreLogsRequest(const string& accessKeyId,
header[AUTHORIZATION] = LOG_HEADSIGNATURE_PREFIX + accessKeyId + ':' + signature;
}

void PreparePostAPMBackendRequest(const string& accessKeyId,
const string& accessKeySecret,
SLSClientManager::AuthType type,
const string& host,
bool isHostIp,
const string& project,
const string& logstore,
const string& compressType,
RawDataType dataType,
const string& body,
size_t rawSize,
const string& path,
string& query,
map<string, string>& header) {
if (isHostIp) {
header[HOST] = project + "." + host;
} else {
header[HOST] = host;
}
header[USER_AGENT] = SLSClientManager::GetInstance()->GetUserAgent();
header[DATE] = GetDateString();
header[CONTENT_TYPE] = TYPE_LOG_PROTOBUF;
header[CONTENT_LENGTH] = to_string(body.size());
header[CONTENT_MD5] = CalcMD5(body);
header[X_LOG_APIVERSION] = LOG_API_VERSION;
header[X_LOG_SIGNATUREMETHOD] = HMAC_SHA1;
if (!compressType.empty()) {
header[X_LOG_COMPRESSTYPE] = compressType;
}
if (dataType == RawDataType::EVENT_GROUP) {
header[X_LOG_BODYRAWSIZE] = to_string(rawSize);
} else {
header[X_LOG_BODYRAWSIZE] = to_string(body.size());
header[X_LOG_MODE] = LOG_MODE_BATCH_GROUP;
}
if (type == SLSClientManager::AuthType::ANONYMOUS) {
header[X_LOG_KEYPROVIDER] = MD5_SHA1_SALT_KEYPROVIDER;
}

map<string, string> parameterList;
string signature = GetUrlSignature(HTTP_POST, path, header, parameterList, body, accessKeySecret);
header[AUTHORIZATION] = LOG_HEADSIGNATURE_PREFIX + accessKeyId + ':' + signature;
}

SLSResponse PostLogStoreLogs(const string& accessKeyId,
const string& accessKeySecret,
SLSClientManager::AuthType type,
Expand Down Expand Up @@ -303,6 +347,41 @@ SLSResponse PostMetricStoreLogs(const string& accessKeyId,
return ParseHttpResponse(response);
}

SLSResponse PostAPMBackendLogs(const string& accessKeyId,
const string& accessKeySecret,
SLSClientManager::AuthType type,
const string& host,
bool httpsFlag,
const string& project,
const string& logstore,
const string& compressType,
RawDataType dataType,
const string& body,
size_t rawSize,
const std::string& subpath) {
string query;
map<string, string> header;
PreparePostAPMBackendRequest(accessKeyId,
accessKeySecret,
type,
host,
false, // sync request always uses vip
project,
logstore,
compressType,
dataType,
body,
rawSize,
subpath,
query,
header);
HttpResponse response;
SendHttpRequest(
make_unique<HttpRequest>(HTTP_POST, httpsFlag, host, httpsFlag ? 443 : 80, subpath, "", header, body),
response);
return ParseHttpResponse(response);
}

SLSResponse PutWebTracking(const string& host,
bool httpsFlag,
const string& logstore,
Expand Down
Loading

0 comments on commit 0885bb8

Please sign in to comment.