Skip to content

Commit

Permalink
reader/writer modes
Browse files Browse the repository at this point in the history
  • Loading branch information
akvlad committed Jun 11, 2024
1 parent 3647077 commit 85f293b
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 134 deletions.
3 changes: 3 additions & 0 deletions common.js
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,6 @@ module.exports.logType = boolEnv('DISTINGUISH_LOGS_METRICS') ? 1 : 0
module.exports.metricType = boolEnv('DISTINGUISH_LOGS_METRICS') ? 2 : 0

module.exports.bothType = 0

module.exports.writerMode = (process.env.MODE === 'writer' || !process.env.MODE) && !boolEnv('READONLY')
module.exports.readerMode = process.env.MODE === 'reader' || boolEnv('READONLY') || !process.env.MODE
140 changes: 70 additions & 70 deletions qryn_bun.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ import handlerTempoLabelV2 from './lib/handlers/tempo_v2_tags.js'
import handlerTempoLabelV2Values from './lib/handlers/tempo_v2_values.js'
import {init as pyroscopeInit } from './pyroscope/pyroscope.js'

import { boolEnv, readonly } from './common.js'
import { boolEnv, readonly, readerMode, writerMode } from './common.js'
import DATABASE, { init } from './lib/db/clickhouse.js'
import { startAlerting } from './lib/db/alerting/index.js'
import fs from 'fs'
Expand Down Expand Up @@ -110,61 +110,61 @@ export default async() => {
});

app.get('/hello', wrapper(handlerHello))
.get('/ready', wrapper(handlerHello))
.post('/loki/api/v1/push', wrapper(handlerPush, {
app.get('/ready', wrapper(handlerHello))
writerMode && app.post('/loki/api/v1/push', wrapper(handlerPush, {
'application/json': lokiPushJSONParser,
'application/x-protobuf': lokiPushProtoParser,
'*': lokiPushJSONParser
}))
.post('/:target/_doc', wrapper(handlerElasticPush, {
writerMode && app.post('/:target/_doc', wrapper(handlerElasticPush, {
'application/json': jsonParser,
'*': rawStringParser
}))
.post('/:target/_create/:id', wrapper(handlerElasticPush, {
writerMode && app.post('/:target/_create/:id', wrapper(handlerElasticPush, {
'application/json': jsonParser,
'*': rawStringParser
}))
.put('/:target/_doc/:id', wrapper(handlerElasticPush, {
writerMode && app.put('/:target/_doc/:id', wrapper(handlerElasticPush, {
'application/json': jsonParser,
'*': rawStringParser
}))
.put('/:target/_create/:id', wrapper(handlerElasticPush, {
writerMode && app.put('/:target/_create/:id', wrapper(handlerElasticPush, {
'application/json': jsonParser,
'*': rawStringParser
}))
.post('/_bulk', wrapper(handlerElasticBulk, {
writerMode && app.post('/_bulk', wrapper(handlerElasticBulk, {
'application/json': jsonParser,
'*': rawStringParser
}))
.post('/:target/_bulk', wrapper(handlerElasticBulk, {
writerMode && app.post('/:target/_bulk', wrapper(handlerElasticBulk, {
'application/json': jsonParser,
'*': rawStringParser
}))
.post('/tempo/api/push', wrapper(handlerTempoPush, {
writerMode && app.post('/tempo/api/push', wrapper(handlerTempoPush, {
'application/json': tempoPushParser,
'application/x-ndjson': tempoPushNDJSONParser,
'*': tempoPushParser
}))
.post('/tempo/spans', wrapper(handlerTempoPush, {
writerMode && app.post('/tempo/spans', wrapper(handlerTempoPush, {
'application/json': tempoPushParser,
'application/x-ndjson': tempoPushNDJSONParser,
'*': tempoPushParser
}))
.post('/api/v2/spans', wrapper(handlerTempoPush, {
writerMode && app.post('/api/v2/spans', wrapper(handlerTempoPush, {
'application/json': tempoPushParser,
'application/x-ndjson': tempoPushNDJSONParser,
'*': tempoPushParser
}))
.get('/api/traces/:traceId', wrapper(handlerTempoTraces))
.get('/api/traces/:traceId/:json', wrapper(handlerTempoTraces))
.get('/tempo/api/traces/:traceId', wrapper(handlerTempoTraces))
.get('/tempo/api/traces/:traceId/:json', wrapper(handlerTempoTraces))
.get('/api/echo', wrapper(handlerTempoEcho))
.get('/tempo/api/echo', wrapper(handlerTempoEcho))
.ws('/loki/api/v1/tail', wsWrapper(handlerTail))
.get('/config', () => new Response('not supported'))
.get('/metrics', () => new Response('not supported'))
.get('/influx/api/v2/write/health', () => new Response('ok'))
readerMode && app.get('/api/traces/:traceId', wrapper(handlerTempoTraces))
readerMode && app.get('/api/traces/:traceId/:json', wrapper(handlerTempoTraces))
readerMode && app.get('/tempo/api/traces/:traceId', wrapper(handlerTempoTraces))
readerMode && app.get('/tempo/api/traces/:traceId/:json', wrapper(handlerTempoTraces))
readerMode && app.get('/api/echo', wrapper(handlerTempoEcho))
readerMode && app.get('/tempo/api/echo', wrapper(handlerTempoEcho))
readerMode && app.ws('/loki/api/v1/tail', wsWrapper(handlerTail))
app.get('/config', () => new Response('not supported'))
app.get('/metrics', () => new Response('not supported'))
app.get('/influx/api/v2/write/health', () => new Response('ok'))


const fastify = {
Expand All @@ -182,74 +182,74 @@ export default async() => {
}
}

fastify.get('/api/search/tags', handlerTempoLabel)
fastify.get('/tempo/api/search/tags', handlerTempoLabel)
readerMode && fastify.get('/api/search/tags', handlerTempoLabel)
readerMode && fastify.get('/tempo/api/search/tags', handlerTempoLabel)

/* Tempo Tag Value Handler */
fastify.get('/api/search/tag/:name/values', handlerTempoLabelValues)
fastify.get('/tempo/api/search/tag/:name/values', handlerTempoLabelValues)
readerMode && fastify.get('/api/search/tag/:name/values', handlerTempoLabelValues)
readerMode && fastify.get('/tempo/api/search/tag/:name/values', handlerTempoLabelValues)

/* Tempo Traces Query Handler */
fastify.get('/api/search', handlerTempoSearch)
fastify.get('/tempo/api/search', handlerTempoSearch)
readerMode && fastify.get('/api/search', handlerTempoSearch)
readerMode && fastify.get('/tempo/api/search', handlerTempoSearch)

/* Tempo Echo Handler */
fastify.get('/api/echo', handlerTempoEcho)
fastify.get('/tempo/api/echo', handlerTempoEcho)

/* Telegraf HTTP Bulk handler */
fastify.post('/telegraf', handlerTelegraf, {
writerMode && fastify.post('/telegraf', handlerTelegraf, {
'*': jsonParser
})

/* Datadog Log Push Handler */
fastify.post('/api/v2/logs', handlerDatadogLogPush, {
writerMode && fastify.post('/api/v2/logs', handlerDatadogLogPush, {
'application/json': jsonParser,
'*': rawStringParser
})

/* Datadog Series Push Handler */

fastify.post('/api/v2/series', handlerDatadogSeriesPush, {
writerMode && fastify.post('/api/v2/series', handlerDatadogSeriesPush, {
'application/json': jsonParser,
'*': rawStringParser
})

/* Query Handler */

fastify.get('/loki/api/v1/query_range', handlerQueryRange)
readerMode && fastify.get('/loki/api/v1/query_range', handlerQueryRange)

/* Label Handlers */
/* Label Value Handler via query (test) */

fastify.get('/loki/api/v1/query', handlerQuery)
readerMode && fastify.get('/loki/api/v1/query', handlerQuery)

/* Label Handlers */
fastify.get('/loki/api/v1/label', handlerLabel)
fastify.get('/loki/api/v1/labels', handlerLabel)
readerMode && fastify.get('/loki/api/v1/label', handlerLabel)
readerMode && fastify.get('/loki/api/v1/labels', handlerLabel)

/* Label Value Handler */

fastify.get('/loki/api/v1/label/:name/values', handlerLabelValues)
readerMode && fastify.get('/loki/api/v1/label/:name/values', handlerLabelValues)

/* Series Handler - experimental support for both Loki and Prometheus */

fastify.get('/loki/api/v1/series', handlerSeries)
readerMode && fastify.get('/loki/api/v1/series', handlerSeries)

fastify.get('/api/v1/series', handlerPromSeries)
fastify.post('/api/v1/series', handlerPromSeries, {
readerMode && fastify.get('/api/v1/series', handlerPromSeries)
readerMode && fastify.post('/api/v1/series', handlerPromSeries, {
'application/x-www-form-urlencoded': wwwFormParser
})

/* ALERT MANAGER Handlers */
fastify.get('/api/prom/rules', handlerGetRules)
fastify.get('/api/prom/rules/:ns/:group', handlerGetGroup)
fastify.post('/api/prom/rules/:ns', handlerPostGroup, {
readerMode && fastify.get('/api/prom/rules', handlerGetRules)
readerMode && fastify.get('/api/prom/rules/:ns/:group', handlerGetGroup)
readerMode && fastify.post('/api/prom/rules/:ns', handlerPostGroup, {
'*': yamlParser
})
fastify.delete('/api/prom/rules/:ns/:group', handlerDelGroup)
fastify.delete('/api/prom/rules/:ns', handlerDelNS)
fastify.get('/prometheus/api/v1/rules', handlerPromGetRules)
readerMode && fastify.delete('/api/prom/rules/:ns/:group', handlerDelGroup)
readerMode && fastify.delete('/api/prom/rules/:ns', handlerDelNS)
readerMode && fastify.get('/prometheus/api/v1/rules', handlerPromGetRules)

/* PROMETHEUS REMOTE WRITE Handlers */
const remoteWritePaths = [
Expand All @@ -260,59 +260,59 @@ export default async() => {
'/api/prom/push'
]
for (const path of remoteWritePaths) {
fastify.post(path, promWriteHandler, {
writerMode && fastify.post(path, promWriteHandler, {
'application/x-protobuf': prometheusPushProtoParser,
'application/json': jsonParser,
'*': combinedParser(prometheusPushProtoParser, jsonParser)
})
fastify.get(path, handlerTempoEcho)
writerMode && fastify.get(path, handlerTempoEcho)
}

/* PROMQETHEUS API EMULATION */

fastify.post('/api/v1/query_range', handlerPromQueryRange, {
readerMode && fastify.post('/api/v1/query_range', handlerPromQueryRange, {
'application/x-www-form-urlencoded': wwwFormParser
})
fastify.get('/api/v1/query_range', handlerPromQueryRange)
readerMode && fastify.get('/api/v1/query_range', handlerPromQueryRange)

fastify.post('/api/v1/query', handlerPromQuery, {
readerMode && fastify.post('/api/v1/query', handlerPromQuery, {
'application/x-www-form-urlencoded': wwwFormParser
})
fastify.get('/api/v1/query', handlerPromQuery)
fastify.get('/api/v1/labels', handlerPromLabel) // piggyback on qryn labels
fastify.get('/api/v1/label/:name/values', handlerPromLabelValues) // piggyback on qryn values
fastify.post('/api/v1/labels', handlerPromLabel, {
readerMode && fastify.get('/api/v1/query', handlerPromQuery)
readerMode && fastify.get('/api/v1/labels', handlerPromLabel) // piggyback on qryn labels
readerMode && fastify.get('/api/v1/label/:name/values', handlerPromLabelValues) // piggyback on qryn values
readerMode && fastify.post('/api/v1/labels', handlerPromLabel, {
'*': rawStringParser
}) // piggyback on qryn labels
fastify.post('/api/v1/label/:name/values', handlerPromLabelValues, {
readerMode && fastify.post('/api/v1/label/:name/values', handlerPromLabelValues, {
'*': rawStringParser
}) // piggyback on qryn values

fastify.get('/api/v1/metadata', handlerPromDefault.misc) // default handler TBD
fastify.get('/api/v1/rules', handlerPromDefault.rules) // default handler TBD
fastify.get('/api/v1/query_exemplars', handlerPromDefault.misc) // default handler TBD
fastify.post('/api/v1/query_exemplars', handlerPromDefault.misc, {
readerMode && fastify.get('/api/v1/metadata', handlerPromDefault.misc) // default handler TBD
readerMode && fastify.get('/api/v1/rules', handlerPromDefault.rules) // default handler TBD
readerMode && fastify.get('/api/v1/query_exemplars', handlerPromDefault.misc) // default handler TBD
readerMode && fastify.post('/api/v1/query_exemplars', handlerPromDefault.misc, {
'application/x-www-form-urlencoded': wwwFormParser
}) // default handler TBD
fastify.get('/api/v1/format_query', handlerPromDefault.misc) // default handler TBD
fastify.post('/api/v1/format_query', handlerPromDefault.misc, {
readerMode && fastify.get('/api/v1/format_query', handlerPromDefault.misc) // default handler TBD
readerMode && fastify.post('/api/v1/format_query', handlerPromDefault.misc, {
'application/x-www-form-urlencoded': wwwFormParser
}) // default handler TBD
fastify.get('/api/v1/status/buildinfo', handlerPromDefault.buildinfo) // default handler TBD

/* NewRelic Log Handler */

fastify.post('/log/v1', handlerNewrelicLogPush, {
writerMode && fastify.post('/log/v1', handlerNewrelicLogPush, {
'text/plain': jsonParser,
'*': jsonParser
})

/* INFLUX WRITE Handlers */

fastify.post('/write', handlerInfluxWrite, {
writerMode && fastify.post('/write', handlerInfluxWrite, {
'*': rawStringParser
})
fastify.post('/influx/api/v2/write', handlerInfluxWrite, {
writerMode && fastify.post('/influx/api/v2/write', handlerInfluxWrite, {
'*': rawStringParser
})
/* INFLUX HEALTH Handlers */
Expand All @@ -321,16 +321,16 @@ export default async() => {
fastify.get('/influx/health', handlerInfluxHealth)


fastify.post('/v1/traces', handlerOTLPPush, {
writerMode && fastify.post('/v1/traces', handlerOTLPPush, {
'*': otlpPushProtoParser
})

fastify.get('/api/v2/search/tags', handlerTempoLabelV2)
fastify.get('/tempo/api/v2/search/tags', handlerTempoLabelV2)
fastify.get('/api/v2/search/tag/:name/values', handlerTempoLabelV2Values)
fastify.get('/tempo/api/v2/search/tag/:name/values', handlerTempoLabelV2Values)
readerMode && fastify.get('/api/v2/search/tags', handlerTempoLabelV2)
readerMode && fastify.get('/tempo/api/v2/search/tags', handlerTempoLabelV2)
readerMode && fastify.get('/api/v2/search/tag/:name/values', handlerTempoLabelV2Values)
readerMode && fastify.get('/tempo/api/v2/search/tag/:name/values', handlerTempoLabelV2Values)

pyroscopeInit(fastify)
readerMode && pyroscopeInit(fastify)

const serveView = fs.existsSync(path.join(__dirname, 'view/index.html'))
if (serveView) {
Expand Down
Loading

0 comments on commit 85f293b

Please sign in to comment.