Skip to content

Commit

Permalink
series request impl
Browse files Browse the repository at this point in the history
  • Loading branch information
akvlad committed Jul 17, 2024
1 parent 2b444a2 commit 7aaa225
Showing 1 changed file with 161 additions and 1 deletion.
162 changes: 161 additions & 1 deletion pyroscope/pyroscope.js
Original file line number Diff line number Diff line change
Expand Up @@ -564,14 +564,174 @@ const selectMergeProfile = async (req, res) => {
return res.code(200).send(Buffer.from(response))
}

const series = async (req, res) => {
const _req = req.body
const fromTimeSec = Math.floor(req.getStart && req.getStart()
? parseInt(req.getStart()) / 1000
: Date.now() / 1000 - HISTORY_TIMESPAN)
const toTimeSec = Math.floor(req.getEnd && req.getEnd()
? parseInt(req.getEnd()) / 1000
: Date.now() / 1000)
const dist = clusterName ? '_dist' : ''
const promises = []
for (const labelSelector of _req.getMatchersList() || []) {
const specialMatchers = getSpecialMatchers(labelSelector)
const specialClauses = specialMatchersQuery(specialMatchers.matchers)
const serviceNameSelector = serviceNameSelectorQuery(labelSelector)
const idxReq = (new Sql.Select())
.select(new Sql.Raw('fingerprint'))
.from(`${DATABASE_NAME()}.profiles_series_gin`)
.where(
Sql.And(
serviceNameSelector,
Sql.Gte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(fromTimeSec)}))`)),
Sql.Lte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(toTimeSec)}))`))
)
)
labelSelectorQuery(idxReq, specialMatchers.query)
const withIdxReq = (new Sql.With('idx', idxReq, !!clusterName))
const labelsReq = (new Sql.Select())
.with(withIdxReq)
.select(
['tags', 'tags'],
['type_id', 'type_id'],
['sample_types_units', '_sample_types_units'])
.from([`${DATABASE_NAME()}.profiles_series${dist}`, 'p'])
.join('p.sample_types_units', 'array')
.where(
Sql.And(
serviceNameSelector,
specialClauses,
Sql.Gte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(fromTimeSec)}))`)),
Sql.Lte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(toTimeSec)}))`)),
new Sql.In('p.fingerprint', 'IN', new Sql.WithReference(withIdxReq))
)
)
console.log(labelsReq.toString())
promises.push(clickhouse.rawRequest(labelsReq.toString() + ' FORMAT JSON', null, DATABASE_NAME()))
}
const resp = await Promise.all(promises)
const response = new messages.SeriesResponse()
const labelsSet = []
resp.forEach(_res => {
for (const row of _res.data.data) {
const labels = new types.Labels()
const _labels = []
for (const tag of row.tags) {
const pair = new types.LabelPair()
pair.setName(tag[0])
pair.setValue(tag[1])
_labels.push(pair)
}
const typeId = row.type_id.split(':')
const _pair = (name, val) => {
const pair = new types.LabelPair()
pair.setName(name)
pair.setValue(val)
return pair
}
_labels.push(
_pair('__name__', typeId[0]),
_pair('__period_type__', typeId[1]),
_pair('__period_unit__', typeId[2]),
_pair('__sample_type__', row._sample_types_units[0]),
_pair('__sample_unit__', row._sample_types_units[1]),
_pair('__profile_type__',
`${typeId[0]}:${row._sample_types_units[0]}:${row._sample_types_units[1]}:${typeId[1]}:${typeId[2]}`)
)
labels.setLabelsList(_labels)
labelsSet.push(labels)
}
})
response.setLabelsSetList(labelsSet)
return res.code(200).send(Buffer.from(response.serializeBinary()))
}

/**
*
* @param query {string}
*/
const getSpecialMatchers = (query) => {
if (query.length <= 2) {
return []
}
const res = {}
for (const name of
['__name__', '__period_type__', '__period_unit__', '__sample_type__', '__sample_unit__', '__profile_type__']) {
console.log(`${name}\\s*(=~|!~|=|!=)\\s*("([^"]|\\\\.)+"),*`)
const re = new RegExp(`${name}\\s*(=~|!~|=|!=)\\s*("([^"]|\\\\.)+"),*`, 'g')
const pair = re.exec(query)
if (pair) {
res[name] = [pair[1], JSON.parse(pair[2])]
}
query = query.replaceAll(re, '')
}
query = query.replaceAll(/,\s*}$/g, '}')
return {
matchers: res,
query: query
}
}

const matcherClause = (field, matcher) => {
let valRul
const val = matcher[1]
switch (matcher[0]) {
case '=':
valRul = Sql.Eq(new Sql.Raw(field), Sql.val(val))
break
case '!=':
valRul = Sql.Ne(new Sql.Raw(field), Sql.val(val))
break
case '=~':
valRul = Sql.Eq(new Sql.Raw(`match(${(new Sql.Raw(field)).toString()}, ${Sql.quoteVal(val)})`), 1)
break
case '!~':
valRul = Sql.Ne(new Sql.Raw(`match(${(new Sql.Raw(field)).toString()}, ${Sql.quoteVal(val)})`), 1)
}
return valRul
}

const specialMatchersQuery = (matchers) => {
const clauses = []
if (matchers.__name__) {
clauses.push(matcherClause("splitByChar(':', type_id)[1]", matchers.__name__))
}
if (matchers.__period_type__) {
clauses.push(matcherClause("splitByChar(':', type_id)[2]", matchers.__period_type__))
}
if (matchers.__period_unit__) {
clauses.push(matcherClause("splitByChar(':', type_id)[3]", matchers.__period_unit__))
}
if (matchers.__sample_type__) {
clauses.push(matcherClause('_sample_types_units.1', matchers.__sample_type__))
}
if (matchers.__sample_unit__) {
clauses.push(matcherClause('_sample_types_units.2', matchers.__sample_unit__))
}
if (matchers.__profile_type__) {
clauses.push(matcherClause(
"format('{}:{}:{}:{}:{}', (splitByChar(':', type_id) as _parts)[1], _sample_types_units.1, _sample_types_units.2, _parts[2], _parts[3])",
matchers.__profile_type__))
}
if (clauses.length === 0) {
return Sql.Eq(new Sql.Raw('1'), 1)
}
if (clauses.length === 1) {
return clauses[0]
}
return new Sql.And(...clauses)
}

module.exports.init = (fastify) => {
const fns = {
profileTypes: profileTypesHandler,
labelNames: labelNames,
labelValues: labelValues,
selectMergeStacktraces: selectMergeStacktraces,
selectSeries: selectSeries,
selectMergeProfile: selectMergeProfile
selectMergeProfile: selectMergeProfile,
series: series
}
for (const name of Object.keys(fns)) {
fastify.post(services.QuerierServiceService[name].path, (req, res) => {
Expand Down

0 comments on commit 7aaa225

Please sign in to comment.