Skip to content

Commit

Permalink
#fix: pure virtual method called issue;
Browse files Browse the repository at this point in the history
  • Loading branch information
akvlad committed Oct 6, 2023
1 parent 228e17d commit f4bc4d3
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 50 deletions.
8 changes: 8 additions & 0 deletions common.js
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,11 @@ module.exports.CORS = process.env.CORS_ALLOW_ORIGIN || '*'
module.exports.clusterName = process.env.CLUSTER_NAME

module.exports.readonly = process.env.READONLY || false

module.exports.bun = () => {
try {
return Bun
} catch (err) {
return false
}
}
44 changes: 27 additions & 17 deletions lib/db/clickhouse.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,6 @@ const dist = clusterName ? '_dist' : ''

/* DB Helper */
const ClickHouse = require('@apla/clickhouse')
const clickhouseOptions = {
host: process.env.CLICKHOUSE_SERVER || 'localhost',
port: process.env.CLICKHOUSE_PORT || 8123,
auth: process.env.CLICKHOUSE_AUTH || 'default:',
protocol: process.env.CLICKHOUSE_PROTO ? process.env.CLICKHOUSE_PROTO + ':' : 'http:',
readonly: !!process.env.READONLY,
queryOptions: { database: process.env.CLICKHOUSE_DB || 'cloki' }
}

const transpiler = require('../../parser/transpiler')
const rotationLabels = process.env.LABELS_DAYS || 7
Expand All @@ -33,9 +25,9 @@ const axios = require('axios')
const { samplesTableName, samplesReadTableName } = UTILS
const path = require('path')
const { Transform } = require('stream')
const { CORS } = require('../../common')

const protocol = process.env.CLICKHOUSE_PROTO || 'http'
const { CORS, bun } = require('../../common')
const clickhouseOptions = require('./clickhouse_options').databaseOptions
const { getClickhouseUrl } = require('./clickhouse_options')

// External Storage Policy for Tables (S3, MINIO)
const storagePolicy = process.env.STORAGE_POLICY || false
Expand Down Expand Up @@ -76,7 +68,8 @@ const conveyor = {
let throttler = null
const resolvers = {}
const rejectors = {}
if (isMainThread) {
let first = false
if (isMainThread && !bun()) {
throttler = new Worker(path.join(__dirname, 'throttler.js'))
throttler.on('message', (msg) => {
switch (msg.status) {
Expand All @@ -90,8 +83,29 @@ if (isMainThread) {
delete resolvers[msg.id]
delete rejectors[msg.id]
})
} else if (isMainThread && !first) {
first = true
const _throttler = require('./throttler')
throttler = {
on: _throttler.on,
postMessage: _throttler.postMessage,
removeAllListeners: _throttler.removeAllListeners,
terminate: _throttler.terminate
}
_throttler.init()
throttler.on('message', (msg) => {
switch (msg.status) {
case 'ok':
resolvers[msg.id]()
break
case 'err':
rejectors[msg.id](new Error('Database push error'))
break
}
delete resolvers[msg.id]
delete rejectors[msg.id]
})
}

// timeSeriesv2Throttler.start();

/* Cache Helper */
Expand Down Expand Up @@ -348,10 +362,6 @@ function pushOTLP (traces) {
})
}

function getClickhouseUrl () {
return `${protocol}://${clickhouseOptions.auth}@${clickhouseOptions.host}:${clickhouseOptions.port}`
}

/**
* @param query {{
* query: string,
Expand Down
22 changes: 22 additions & 0 deletions lib/db/clickhouse_options.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
const UTILS = require('../utils')
const { samplesTableName, samplesReadTableName } = UTILS

const clickhouseOptions = {
host: process.env.CLICKHOUSE_SERVER || 'localhost',
port: process.env.CLICKHOUSE_PORT || 8123,
auth: process.env.CLICKHOUSE_AUTH || 'default:',
protocol: process.env.CLICKHOUSE_PROTO ? process.env.CLICKHOUSE_PROTO + ':' : 'http:',
readonly: !!process.env.READONLY,
queryOptions: { database: process.env.CLICKHOUSE_DB || 'cloki' }
}

function getClickhouseUrl () {
return `${clickhouseOptions.protocol}//${clickhouseOptions.auth}@${clickhouseOptions.host}:${clickhouseOptions.port}`
}

module.exports = {
samplesTableName,
samplesReadTableName,
getClickhouseUrl,
databaseOptions: clickhouseOptions
}
92 changes: 60 additions & 32 deletions lib/db/throttler.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
const { isMainThread, parentPort } = require('worker_threads')
const axios = require('axios')
const { getClickhouseUrl, samplesTableName } = require('./clickhouse')
const clickhouseOptions = require('./clickhouse').databaseOptions
const { getClickhouseUrl, samplesTableName } = require('./clickhouse_options')
const clickhouseOptions = require('./clickhouse_options').databaseOptions
const logger = require('../logger')
const clusterName = require('../../common').clusterName
const dist = clusterName ? '_dist' : ''
const { EventEmitter } = require('events')

const axiosError = async (err) => {
try {
Expand Down Expand Up @@ -71,14 +72,45 @@ const tracesThottler = new TimeoutThrottler(
(trace_id, span_id, parent_id, name, timestamp_ns, duration_ns, service_name, payload_type, payload, tags)
FORMAT JSONEachRow`)

if (isMainThread) {
module.exports = {
samplesThrottler,
timeSeriesThrottler,
TimeoutThrottler
const emitter = new EventEmitter()
let on = true
const postMessage = message => {
const genericRequest = (throttler) => {
throttler.queue.push(message.data)
throttler.resolvers.push(() => {
if (isMainThread) {
emitter.emit('message', { status: 'ok', id: message.id })
return
}
parentPort.postMessage({ status: 'ok', id: message.id })
})
throttler.rejects.push(() => {
if (isMainThread) {
emitter.emit('message', { status: 'err', id: message.id })
return
}
parentPort.postMessage({ status: 'err', id: message.id })
})
}
} else {
let on = true
switch (message.type) {
case 'end':
on = false
if (!isMainThread) {
parentPort.removeAllListeners('message')
}
break
case 'values':
genericRequest(samplesThrottler)
break
case 'labels':
genericRequest(timeSeriesThrottler)
break
case 'traces':
genericRequest(tracesThottler)
}
}

const init = () => {
setTimeout(async () => {
// eslint-disable-next-line no-unmodified-loop-condition
while (on) {
Expand All @@ -96,29 +128,25 @@ if (isMainThread) {
}
}
}, 0)
parentPort.on('message', message => {
const genericRequest = (throttler) => {
throttler.queue.push(message.data)
throttler.resolvers.push(() => {
parentPort.postMessage({ status: 'ok', id: message.id })
})
throttler.rejects.push(() => {
parentPort.postMessage({ status: 'err', id: message.id })
})
}
switch (message.type) {
case 'end':
on = false
parentPort.removeAllListeners('message')
break
case 'values':
genericRequest(samplesThrottler)
break
case 'labels':
genericRequest(timeSeriesThrottler)
break
case 'traces':
genericRequest(tracesThottler)
}

if (isMainThread) {
module.exports = {
samplesThrottler,
timeSeriesThrottler,
tracesThottler,
TimeoutThrottler,
postMessage,
on: emitter.on.bind(emitter),
removeAllListeners: emitter.removeAllListeners.bind(emitter),
init,
terminate: () => {
postMessage({ type: 'end' })
}
}
} else {
init()
parentPort.on('message', message => {
postMessage(message)
})
}
3 changes: 2 additions & 1 deletion qryn.mjs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {init, bun} from './qryn_node_wrapper.js'
import {init} from './qryn_node_wrapper.js'
import {bun} from './common.js'
import bunInit from './qryn_bun.mjs'

if (bun()) {
Expand Down

0 comments on commit f4bc4d3

Please sign in to comment.