Commit

Merge pull request #552 from Cluas/fix-timestamp
fix: timestamp error caused the return of a large number of profiles
akvlad authored Aug 27, 2024
2 parents 87a42e6 + 1683efa commit c378f5a
Showing 1 changed file with 36 additions and 13 deletions.
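Why the old default over-selected: the previous fallback computed the lower bound as Date.now() / 1000 - HISTORY_TIMESPAN, converting to epoch seconds first and then subtracting a constant that, judging by the corrected expression, is defined in milliseconds. That pushes the default "from" roughly 1000 times further into the past than intended, so the query matched far more profiles than the configured history window. The old code also read getStart/getEnd from req rather than from the decoded body (_req), so the broken fallback could be taken even when the client supplied a range. A minimal standalone sketch of the unit mismatch, assuming HISTORY_TIMESPAN is a millisecond constant (its real value lives elsewhere in pyroscope.js; the one below is only illustrative):

// Assumed for illustration: a one-day history window, in milliseconds.
const HISTORY_TIMESPAN = 24 * 3600 * 1000

// Old fallback: mixes units (epoch seconds minus milliseconds), so the
// window becomes about 1000x wider than intended, ~2.7 years instead of one day.
const fromOld = Math.floor(Date.now() / 1000 - HISTORY_TIMESPAN)

// Fixed fallback: subtract in milliseconds first, then convert to seconds.
const fromNew = Math.floor((Date.now() - HISTORY_TIMESPAN) / 1000)

console.log((fromNew - fromOld) / 86400) // ~999 extra days of history selected by the old code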
49 changes: 36 additions & 13 deletions pyroscope/pyroscope.js
@@ -120,15 +120,20 @@ const selectSeries = async (req, res) => {
   return selectSeriesImpl(fromTimeSec, toTimeSec, req.body)
 }
 
+let mergeRequestsCounter = 0
+const mergeRequestsLimit = 10
+
 const selectMergeProfile = async (req, res) => {
   const ctx = newCtxIdx()
   try {
     const _req = req.body
-    const fromTimeSec = Math.floor(req.getStart && req.getStart()
-      ? parseInt(req.getStart()) / 1000
-      : Date.now() / 1000 - HISTORY_TIMESPAN)
-    const toTimeSec = Math.floor(req.getEnd && req.getEnd()
-      ? parseInt(req.getEnd()) / 1000
+    const fromTimeSec =
+      Math.floor(_req && _req.getStart
+        ? parseInt(_req.getStart()) / 1000
+        : (Date.now() - HISTORY_TIMESPAN) / 1000)
+    const toTimeSec =
+      Math.floor(_req && _req.getEnd
+        ? parseInt(_req.getEnd()) / 1000
       : Date.now() / 1000)
     let typeID = _req.getProfileTypeid && _req.getProfileTypeid()
     if (!typeID) {
@@ -166,15 +171,18 @@ const selectMergeProfile = async (req, res) => {
   )
   labelSelectorQuery(idxReq, labelSelector)
   const withIdxReq = (new Sql.With('idx', idxReq, !!clusterName))
-  const mainReq = (new Sql.Select())
+  let mainReq = (new Sql.Select())
     .with(withIdxReq)
     .select([new Sql.Raw('payload'), 'payload'])
     .from([`${DATABASE_NAME()}.profiles${dist}`, 'p'])
     .where(Sql.And(
       new Sql.In('p.fingerprint', 'IN', new Sql.WithReference(withIdxReq)),
       Sql.Gte('p.timestamp_ns', new Sql.Raw(`${fromTimeSec}000000000`)),
       Sql.Lt('p.timestamp_ns', new Sql.Raw(`${toTimeSec}000000000`))))
-    .orderBy(new Sql.Raw('timestamp_ns'))
+    .orderBy([new Sql.Raw('timestamp_ns'), 'DESC'], [new Sql.Raw('p.fingerprint'), 'ASC'])
+  if (process.env.ADVANCED_PROFILES_MERGE_LIMIT) {
+    mainReq = mainReq.limit(parseInt(process.env.ADVANCED_PROFILES_MERGE_LIMIT))
+  }
   const approxReq = (new Sql.Select())
     .select(
       [new Sql.Raw('sum(length(payload))'), 'size'],
@@ -196,13 +204,28 @@

   for (let i = 0; i < chunksCount; i++) {
     promises.push((async (i) => {
+      // eslint-disable-next-line no-unmodified-loop-condition
+      while (mergeRequestsCounter >= mergeRequestsLimit) {
+        await (new Promise((resolve) => setTimeout(resolve, 50)))
+      }
       logger.debug(`Processing chunk ${i}`)
-      const profiles = await clickhouse.rawRequest(mainReq.toString() + ` LIMIT ${chunkSize} OFFSET ${i * chunkSize} FORMAT RowBinary`,
-        null,
-        DATABASE_NAME(),
-        {
-          responseType: 'arraybuffer'
-        })
+      mergeRequestsCounter++
+      let profiles = null
+      try {
+        let end = i * chunkSize + chunkSize
+        if (process.env.ADVANCED_PROFILES_MERGE_LIMIT && end > process.env.ADVANCED_PROFILES_MERGE_LIMIT) {
+          end = process.env.ADVANCED_PROFILES_MERGE_LIMIT
+        }
+        mainReq.limit(end - i * chunkSize, i * chunkSize)
+        profiles = await clickhouse.rawRequest(mainReq.toString() + ' FORMAT RowBinary',
+          null,
+          DATABASE_NAME(),
+          {
+            responseType: 'arraybuffer'
+          })
+      } finally {
+        mergeRequestsCounter--
+      }
       const binData = Uint8Array.from(profiles.data)
       logger.debug(`Chunk ${i} - ${binData.length} bytes`)
       const start = process.hrtime.bigint()
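The last hunk also adds a crude concurrency gate around the chunk downloads: a module-level counter is polled every 50 ms until a slot is free, incremented before the ClickHouse request and released in a finally block. A minimal standalone sketch of that polling-gate pattern, with mergeRequestsCounter, mergeRequestsLimit and the 50 ms interval taken from the diff and everything else (fetchChunk, the chunk count) purely illustrative; note it is a best-effort cap, since several waiters woken by the same poll can briefly overshoot the limit:

// Best-effort cap on concurrent chunk downloads, mirroring the patch.
let mergeRequestsCounter = 0
const mergeRequestsLimit = 10

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms))

// fetchChunk stands in for the real clickhouse.rawRequest call; illustrative only.
async function fetchChunk (i) {
  await sleep(100)
  return `chunk ${i}`
}

async function throttledFetch (i) {
  // Wait for a free slot, polling every 50 ms as in the patch.
  while (mergeRequestsCounter >= mergeRequestsLimit) {
    await sleep(50)
  }
  mergeRequestsCounter++
  try {
    return await fetchChunk(i)
  } finally {
    // Released even if the request throws, so slots are never leaked.
    mergeRequestsCounter--
  }
}

Promise.all(Array.from({ length: 25 }, (_, i) => throttledFetch(i)))
  .then((results) => console.log(results.length, 'chunks fetched'))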

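Paging changes in the last hunk as well: instead of appending a LIMIT ... OFFSET ... string to the SQL, the patch computes each chunk's window with the query builder's limit(limit, offset) form and clamps it so it never runs past ADVANCED_PROFILES_MERGE_LIMIT when that variable is set, complementing the overall cap applied to mainReq in the second hunk. A small sketch of just that window arithmetic, with the SQL builder replaced by a plain object so it runs on its own:

// Per-chunk window computation from the patch, expressed as plain arithmetic.
function chunkWindow (i, chunkSize, mergeLimit) {
  let end = i * chunkSize + chunkSize
  if (mergeLimit && end > mergeLimit) {
    end = mergeLimit // never page past the configured overall cap
  }
  return { limit: end - i * chunkSize, offset: i * chunkSize }
}

// Example: chunkSize = 100 with an overall cap of 250 rows.
console.log(chunkWindow(0, 100, 250)) // { limit: 100, offset: 0 }
console.log(chunkWindow(1, 100, 250)) // { limit: 100, offset: 100 }
console.log(chunkWindow(2, 100, 250)) // { limit: 50, offset: 200 }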