diff --git a/.github/workflows/bump_version.yml b/.github/workflows/build_release.yml similarity index 58% rename from .github/workflows/bump_version.yml rename to .github/workflows/build_release.yml index 7fa0222e..923de627 100644 --- a/.github/workflows/bump_version.yml +++ b/.github/workflows/build_release.yml @@ -1,39 +1,30 @@ -name: 'Build Multi-Arch' +name: 'CI+CD' on: - push: - branches: - - 'master' - paths-ignore: - - '**.md' - - '**.yml' - - '**.yaml' + release: + types: [created] + workflow_dispatch: + inputs: + TAG_NAME: + description: 'Release Version Tag (0.0.0)' + required: true jobs: - bump-version: - name: 'Bump Version on master & Publish' + build: + name: 'Build & Publish' runs-on: ubuntu-latest steps: - name: 'Checkout source code' - uses: 'actions/checkout@v2' + uses: 'actions/checkout@v3' with: ref: ${{ github.ref }} - - name: 'cat package.json' - run: cat ./package.json - - name: 'Automated Version Bump' - id: version-bump - uses: 'phips28/gh-action-bump-version@master' - with: - tag-prefix: 'v' - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - - name: 'cat package.json' - run: cat ./package.json - - name: 'Output Step' - env: - NEW_TAG: ${{ steps.version-bump.outputs.newTag }} - run: echo "new tag $NEW_TAG" + - uses: MYXOMOPX/modify-pkg-json@1.0.1 + id: setcmnver + with: + target: ./package.json + action: "set_version" + argument: "${{ github.event.inputs.TAG_NAME || github.event.release.tag_name }}" - name: Check NPM secret presence id: checksecrets shell: bash @@ -45,15 +36,16 @@ jobs: fi env: SECRET: ${{ secrets.NPM_TOKEN }} - - uses: actions/setup-node@v1 + - uses: actions/setup-node@v4.0.0 if: ${{ steps.checksecrets.outputs.secretspresent }} with: - node-version: 14 + node-version: 18 - name: Publish to NPM if: ${{ steps.checksecrets.outputs.secretspresent }} run: | npm config set //registry.npmjs.org/:_authToken ${NPM_TOKEN} npm install + npm audit --fix npm publish --access public env: NPM_TOKEN: ${{ secrets.NPM_TOKEN }} @@ -84,18 +76,31 @@ jobs: with: username: ${{ secrets.DOCKERHUB_USERNAME }} password: ${{ secrets.DOCKERHUB_TOKEN }} + - name: Build and push to Docker Hub if: ${{ steps.checkdocker.outputs.secretspresent }} - id: docker_build uses: docker/build-push-action@v3.0.0 with: platforms: linux/amd64, linux/arm64 push: true tags: | qxip/qryn:latest - qxip/qryn:${{ steps.version-bump.outputs.newTag }} + qxip/qryn:${{ github.event.inputs.TAG_NAME || github.event.release.tag_name }} qxip/cloki:latest - qxip/cloki:${{ steps.version-bump.outputs.newTag }} + qxip/cloki:${{ github.event.inputs.TAG_NAME || github.event.release.tag_name }} + + - name: Build and push to Docker Hub (bun) + if: ${{ steps.checkdocker.outputs.secretspresent }} + uses: docker/build-push-action@v3.0.0 + with: + platforms: linux/amd64, linux/arm64 + file: ./Dockerfile_bun + push: true + tags: | + qxip/qryn:bun + qxip/qryn:${{ github.event.inputs.TAG_NAME || github.event.release.tag_name }}-bun + qxip/cloki:bun + qxip/cloki:${{ github.event.inputs.TAG_NAME || github.event.release.tag_name }}-bun - name: Log in to the GHCR registry uses: docker/login-action@v2.0.0 @@ -108,7 +113,18 @@ jobs: uses: docker/build-push-action@v3.0.0 with: platforms: linux/amd64, linux/arm64 + file: ./Dockerfile push: true tags: | ghcr.io/metrico/qryn:latest - ghcr.io/metrico/qryn:${{ steps.version-bump.outputs.newTag }} + ghcr.io/metrico/qryn:${{ github.event.inputs.TAG_NAME || github.event.release.tag_name }} + + - name: Build and push to GHCR (bun) + uses: docker/build-push-action@v3.0.0 + with: + platforms: linux/amd64, linux/arm64 + file: ./Dockerfile_bun + push: true + tags: | + ghcr.io/metrico/qryn:bun + ghcr.io/metrico/qryn:${{ github.event.inputs.TAG_NAME || github.event.release.tag_name }}-bun diff --git a/.github/workflows/bump_version_beta.yml b/.github/workflows/bump_version_beta.yml deleted file mode 100644 index 96cab544..00000000 --- a/.github/workflows/bump_version_beta.yml +++ /dev/null @@ -1,95 +0,0 @@ -name: 'Bump & Publish' - -on: - push: - branches: - - 'beta' - paths-ignore: - - '**.md' - - '**.yml' - - '**.yaml' - -jobs: - bump-version: - name: 'Bump Version on master & Publish' - runs-on: ubuntu-latest - - steps: - - name: 'Checkout source code' - uses: 'actions/checkout@v2' - with: - ref: ${{ github.ref }} - - name: 'cat package.json' - run: cat ./package.json - - name: 'Automated Version Bump' - id: version-bump - uses: 'phips28/gh-action-bump-version@master' - with: - tag-prefix: 'v' - tag-suffix: '-beta' - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - - name: 'cat package.json' - run: cat ./package.json - - name: 'Output Step' - env: - NEW_TAG: ${{ steps.version-bump.outputs.newTag }} - run: echo "new tag $NEW_TAG" - - name: Check NPM secret presence - id: checksecrets - shell: bash - run: | - if [ "$SECRET" == "" ]; then - echo ::set-output name=secretspresent::false - else - echo ::set-output name=secretspresent::true - fi - env: - SECRET: ${{ secrets.NPM_TOKEN }} - - uses: actions/setup-node@v1 - if: ${{ steps.checksecrets.outputs.secretspresent }} - with: - node-version: 14 - - name: Publish to NPM - if: ${{ steps.checksecrets.outputs.secretspresent }} - run: | - npm config set //registry.npmjs.org/:_authToken ${NPM_TOKEN} - npm install - npm publish --access public - env: - NPM_TOKEN: ${{ secrets.NPM_TOKEN }} - - - name: Check Docker secret presence - id: checkdocker - shell: bash - run: | - if [ "$SECRET" == "" ]; then - echo ::set-output name=secretspresent::false - else - echo ::set-output name=secretspresent::true - fi - env: - SECRET: ${{ secrets.DOCKERHUB_TOKEN }} - - name: Set up Docker QEMU - if: ${{ steps.checkdocker.outputs.secretspresent }} - uses: docker/setup-qemu-action@v1 - - name: Set up Docker Buildx - if: ${{ steps.checkdocker.outputs.secretspresent }} - uses: docker/setup-buildx-action@v1 - - name: Login to DockerHub - if: ${{ steps.checkdocker.outputs.secretspresent }} - uses: docker/login-action@v1 - with: - username: ${{ secrets.DOCKERHUB_USERNAME }} - password: ${{ secrets.DOCKERHUB_TOKEN }} - - name: Build and push - if: ${{ steps.checkdocker.outputs.secretspresent }} - id: docker_build - uses: docker/build-push-action@v2 - with: - push: true - tags: | - qxip/qryn:latest - qxip/qryn:${{ steps.version-bump.outputs.newTag }} - qxip/cloki:latest - qxip/cloki:${{ steps.version-bump.outputs.newTag }} diff --git a/.github/workflows/ghcr_push.yml b/.github/workflows/ghcr_push.yml deleted file mode 100644 index cfb1811b..00000000 --- a/.github/workflows/ghcr_push.yml +++ /dev/null @@ -1,41 +0,0 @@ -name: ghcr push - -on: - push: - branches: ['main', 'master', 'beta'] - paths-ignore: ['**/*.md', '**/*.yml'] - -env: - REGISTRY: ghcr.io - IMAGE_NAME: ${{ github.repository }} - -jobs: - build-and-push-image: - runs-on: ubuntu-latest - permissions: - contents: read - packages: write - - steps: - - name: Checkout repository - uses: actions/checkout@v3 - - - name: get-npm-version - id: package-version - uses: martinbeentjes/npm-get-version-action@main - - - name: Log in to the Container registry - uses: docker/login-action@v2.0.0 - with: - registry: ${{ env.REGISTRY }} - username: ${{ github.actor }} - password: ${{ secrets.GITHUB_TOKEN }} - - - name: Build and push - uses: docker/build-push-action@v3.0.0 - with: - context: . - push: true - tags: | - ghcr.io/metrico/qryn:latest - ghcr.io/metrico/qryn:${{ steps.package-version.outputs.current-version}} diff --git a/.github/workflows/node-clickhouse.js.yml b/.github/workflows/node-clickhouse.js.yml index 0fee32e5..9e5fdf41 100644 --- a/.github/workflows/node-clickhouse.js.yml +++ b/.github/workflows/node-clickhouse.js.yml @@ -23,7 +23,7 @@ jobs: strategy: matrix: - node-version: [18, 16.x] + node-version: [18, 16.x, 20] # See supported Node.js release schedule at https://nodejs.org/en/about/releases/ steps: @@ -43,4 +43,4 @@ jobs: CLICKHOUSE_TSDB: loki INTEGRATION_E2E: 1 CLOKI_EXT_URL: 127.0.0.1:3100 - run: node qryn.js >/dev/stdout & npm run test --forceExit + run: node qryn.mjs >/dev/stdout & npm run test --forceExit diff --git a/.github/workflows/npm-clickhouse.yml b/.github/workflows/npm-clickhouse.yml deleted file mode 100644 index d5e140c8..00000000 --- a/.github/workflows/npm-clickhouse.yml +++ /dev/null @@ -1,33 +0,0 @@ -# This workflow will validate qryn using nodejs + clickhouse + npm without the git repository sources - -name: QRYN NPM CI - -on: - push: - branches: [ master, beta ] - pull_request: - branches: [ master, beta ] - -jobs: - build: - - runs-on: ubuntu-latest - - strategy: - matrix: - node-version: [14.x] - # See supported Node.js release schedule at https://nodejs.org/en/about/releases/ - - steps: - - uses: EpicStep/clickhouse-github-action@v1.0.0 - - name: Use Node.js ${{ matrix.node-version }} - uses: actions/setup-node@v2 - with: - node-version: ${{ matrix.node-version }} - - run: npm install --unsafe-perm -g qryn - - env: - CLICKHOUSE_DB: loki - DEBUG: true - run: cd $(dirname $(readlink -f `which qryn`)) && (timeout 10s qryn || ( [[ $? -eq 124 ]] && exit 0 )) -# - run: npm install -g jest -# - run: cd $(dirname $(readlink -f `which qryn`)) && npm run test diff --git a/.gitignore b/.gitignore index f838c061..c2bb70cb 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,8 @@ node_modules /test/e2e/ /lib/influx/.idea/ /lib/influx/influx.iml +/wasm_parts/_vendor.zip +/wasm_parts/.idea/ +/wasm_parts/vendor/ +/wasm_parts/main.wasm +/wasm_parts/wasm_parts.iml diff --git a/Dockerfile_bun b/Dockerfile_bun new file mode 100644 index 00000000..827398a0 --- /dev/null +++ b/Dockerfile_bun @@ -0,0 +1,15 @@ +# qryn bun builder +FROM oven/bun:alpine + +# BUILD FORCE +ENV BUILD 20231027 +ENV PORT 3100 + +COPY . /app +WORKDIR /app +RUN rm -rf package-lock.json +RUN bun install + +# Expose Ports +EXPOSE 3100 +CMD [ "bun", "qryn.mjs" ] diff --git a/README.md b/README.md index c555f51d..7711d6b0 100644 --- a/README.md +++ b/README.md @@ -4,53 +4,59 @@ [![Build Status](https://github.com/metrico/qryn/actions/workflows/bump_version.yml/badge.svg)](https://github.com/metrico/qryn/actions/workflows/bump_version.yml) ![CodeQL](https://github.com/lmangani/cLoki/workflows/CodeQL/badge.svg) - - Matrix - - - -# [qryn.dev](https://qryn.dev) :cloud: [qryn.cloud](https://qryn.cloud) :heart: -> ... it's pronounced /ˈkwɪr..ɪŋ/ or just querying +made in Ukraine ![image](https://user-images.githubusercontent.com/1423657/232089970-c4536f16-5967-4051-85a5-8ad94fcde67c.png) +# [qryn 3.x](https://qryn.dev) :cloud: [qryn.cloud](https://qryn.cloud) -:rocket: **qryn** is a _drop-in Grafana compatible_ **polyglot observability** framework
-- **Logs, Metrics and Traces** living happily together. Drop-in compatible with multiple vendors formats. -- Native [LogQL/PromQL/TempoQL APIs](https://qryn.cloud) support for [querying](https://github.com/lmangani/qryn/wiki/LogQL-for-Beginners), [processing](https://github.com/lmangani/qryn/wiki/LogQL-Supported-Queries), [tracing](https://github.com/lmangani/qryn/wiki/Tempo-Tracing) and [alerting](https://github.com/lmangani/qryn/wiki/Ruler---Alerts) [^2] in [Grafana](http://docs.grafana.org/features/explore/) [^3] -- Search, filter and extract metrics from _logs, events, spans and traces_ using familiar languages. _SQL Optional_. -- Ingestion [APIs](https://qryn.metrico.in/#/support) transparently compatible with [Opentelemetry, Loki, Prometheus, InfluxDB, Elastic](https://qryn.dev) _and [more](https://github.com/metrico/otel-collector)_ -- Ready to use with popular Agents such as [Promtail, Grafana-Agent, Vector, Logstash, Telegraf](https://qryn.metrico.in/#/ingestion) _and more_ -- Built in [Explore UI](https://github.com/metrico/cloki-view) and [CLI](https://github.com/lmangani/vLogQL) for querying supported datasources -- Designed for edge _(js/bun/wasm)_ and core/backend deployments _(golang/rust)_. -- Total data control. Compatible with [ClickHouse](https://clickhouse.com/) or [InfluxDB IOx](https://influxdata.com) with S3 object storage. +:rocket: _polyglot, lighweight, multi-standard drop-in_ **observability** framework for _**Logs, Metrics and Traces**_
-:rocket: **qryn.cloud** is the _supercharged_ **qryn** version developed in _go_ with additional _functionality, speed and features!_
+> ... it's pronounced /ˈkwɪr..ɪŋ/ or just _querying_ + +- **Polyglot**: Use **LogQL, PromQL**, and **TempoQL** languages to query, process and alert _any data_ +- **Lightweight**: Powered by **ClickHouse** OLAP Engine + **Bun** the _fast, all-in-one_ JavaScript runtime +- **Voracious**: Ingestion compatible with **Opentelemetry, Loki, Prometheus, Influx, Datadog, Elastic** _+ more_ +- **Versatile**: Explore data with qryn's built-in **Explore UI** and **CLI** or _native_ **Grafana** compatibility +- **Secure**: Retain total control of data, using **ClickHouse** or **InfluxDB IOx** with **S3** object storage +- **Unmetered**: Unlimited FOSS deployments or **qryn.cloud** option with advanced features and performance +- **Indepentent**: Designed to be a stand-alone, all-in-one _Loki, Prometheus, Tempo_ drop-in alternative
+## 🚀 [Get Started](https://qryn.metrico.in/#/installation) + +* Setup & Deploy **qryn** _OSS_ using the [documentation](https://qryn.metrico.in/#/installation) and get help in our [Matrix room](https://matrix.to/#/#qryn:matrix.org) :octocat: +* No time? Use [qryn.cloud](https://qryn.cloud) and get polyglot in just minutes! Drop-in LGTM alternative ☁️ +
-## 🚀 [Get Started](https://qryn.metrico.in/#/installation) +
-:octocat: Get qryn OSS up and running on-prem in no time using the [Documentation](https://qryn.metrico.in/#/installation) or join our [Matrix Room](https://matrix.to/#/#qryn:matrix.org) +## Features -☁️ Create a free account on [qryn.cloud](https://qryn.cloud) and go straight to production at any scale with **polyglot confidence**. +💡 _**qryn** independently implements popular observability standards, protocols and query languages:_ +
+ +### 📚 OpenTelemetry + +⚡ **qryn** is officially integrated with [opentelemetry](https://github.com/metrico/otel-collector) supports _any log, trace or metric format_ + +Ingested data can be queried using any of the avialable qryn APIs _(LogQL, PromQL, TraceQL)_
-## Supported Features +### 📚 Loki + LogQL -### 📚 OpenTelemetry -qryn fully supports opentelemetry and comes with a powerful [otel-collector](https://github.com/metrico/otel-collector) distribution supporting _any log, trace or metric format_ and writing directly to ClickHouse _qryn tables_ ready to be consumed through any query API. +> Any Loki compatible client or application can be used with qryn out of the box + +⚡ **qryn** implements the [Loki API](https://github.com/lmangani/qryn/wiki/LogQL-Supported-Queries) for transparent compatibility with **[LogQL](https://grafana.com/docs/loki/latest/query/)** clients
-### 📚 LogQL -qryn implements a complete [LogQL API](https://github.com/lmangani/qryn/wiki/LogQL-Supported-Queries) to provide transparent compatibility with Loki clients
The Grafana Loki datasource can be used to natively browse and query _logs_ and display extracted _timeseries_
@@ -59,11 +65,16 @@ The Grafana Loki datasource can be used to natively browse and query _logs_ and :tada: _No plugins needed_ +
-### 📈 Prometheus -qryn implements a complete [Prometheus API](https://github.com/lmangani/qryn/wiki/LogQL-Supported-Queries) to provide transparent compatibility with Prometheus clients
-The Grafana Prometheus datasource can be used to natively browse and query _metrics_ and display extracted _timeseries_
+### 📈 Prometheus + PromQL + +> Any Prometheus compatible client or application can be used with qryn out of the box + +⚡ **qryn** implements the [Prometheus API](https://prometheus.io/docs/prometheus/latest/querying/api/) for transparent **[PromQL](https://prometheus.io/docs/prometheus/latest/querying/basics/)** compatibility using WASM 🏆
+ +The Grafana Prometheus datasource can be used to natively to query _metrics_ and display _timeseries_
@@ -71,11 +82,16 @@ The Grafana Prometheus datasource can be used to natively browse and query _metr :tada: _No plugins needed_ +
-### 🕛 Tempo -qryn implements the [Tempo API](https://github.com/lmangani/qryn/wiki/LogQL-Supported-Queries) to provide transparent compatibility with Tempo/OTLP clients.
-The Tempo datasource can be used to natively query _traces_ including _beta search_ and _service graphs_
+### 🕛 Tempo + TraceQL + +⚡ **qryn** implements the [Tempo API](https://github.com/lmangani/qryn/wiki/LogQL-Supported-Queries) for transparent compatibility with **[TraceQL](https://grafana.com/docs/tempo/latest/traceql/)** clients.
+ +> Any Tempo/Opentelemetry compatible client or application can be used with qryn out of the box + +The Tempo datasource can be used to natively query _traces_ including _**TraceQL**_ and supporting _service graphs_
@@ -85,8 +101,20 @@ The Tempo datasource can be used to natively query _traces_ including _beta sear
-### ↔️ Correlation -Data correlation made simple with dynamic **links** between _logs, metrics and traces_ +### 📚 Other Vendors + +**qryn** can ingest data using the [InfluxDB, DataDog, Elastic](https://qryn.metrico.in/#/support) and other vendors. + + +
+ +With **qryn** and **grafana** everything _just works_ right out of the box: + +- Native datasource support without any plugin or extension +- Advanced Correlation between Logs, Metrics and Traces +- Service Graphs and Service Status Panels, and all the cool features + +
@@ -94,7 +122,9 @@ Data correlation made simple with dynamic **links** between _logs, metrics and t
-### :eye: View +
+ +### :eye: Explore View No Grafana? No Problem. **qryn** ships with **view** - it's own lightweight data exploration tool @@ -121,9 +151,9 @@ Whether it's code, documentation or grammar, we ❤️ all contributions. Not su     [![Contributors for @metrico/qryn](https://contributors-img.web.app/image?repo=lmangani/cloki)](https://github.com/metrico/qryn/graphs/contributors) -[![Stargazers repo roster for @metrico/qryn](https://reporoster.com/stars/metrico/qryn)](https://github.com/metrico/qryn/stargazers) +[![Stargazers repo roster for @metrico/qryn](https://bytecrank.com/nastyox/reporoster/php/stargazersSVG.php?user=metrico&repo=qryn)](https://github.com/metrico/qryn/stargazers) -[![Forkers repo roster for @metrico/qryn](https://reporoster.com/forks/metrico/qryn)](https://github.com/metrico/qryn/network/members) +[![Forkers repo roster for @metrico/qryn](https://bytecrank.com/nastyox/reporoster/php/forkersSVG.php?user=metrico&repo=qryn)](https://github.com/metrico/qryn/network/members) #### License diff --git a/common.js b/common.js index acf354a2..5936707d 100644 --- a/common.js +++ b/common.js @@ -125,3 +125,13 @@ module.exports.isCustomSamplesOrderingRule = () => { module.exports.CORS = process.env.CORS_ALLOW_ORIGIN || '*' module.exports.clusterName = process.env.CLUSTER_NAME + +module.exports.readonly = process.env.READONLY || false + +module.exports.bun = () => { + try { + return Bun + } catch (err) { + return false + } +} diff --git a/docker/docker-compose-centos.yml b/docker/docker-compose-centos.yml index 7a142343..9beb607f 100644 --- a/docker/docker-compose-centos.yml +++ b/docker/docker-compose-centos.yml @@ -39,4 +39,4 @@ services: container_name: centos volumes: - ../:/opt/qryn - entrypoint: bash -c 'cd ~ ; cp -rf /opt/qryn . ; cd qryn; ls -la ; rm -rf node_modules ; npm install ; CLICKHOUSE_DB=loki CLICKHOUSE_TSDB=loki INTEGRATION_E2E=1 CLICKHOUSE_SERVER=clickhouse-seed node qryn.js' + entrypoint: bash -c 'cd ~ ; cp -rf /opt/qryn . ; cd qryn; ls -la ; rm -rf node_modules ; npm install ; CLICKHOUSE_DB=loki CLICKHOUSE_TSDB=loki INTEGRATION_E2E=1 CLICKHOUSE_SERVER=clickhouse-seed node qryn.mjs' diff --git a/lib/bun_wrapper.js b/lib/bun_wrapper.js new file mode 100644 index 00000000..6fe38a2c --- /dev/null +++ b/lib/bun_wrapper.js @@ -0,0 +1,168 @@ +const { Transform } = require('stream') +const log = require('./logger') +const { EventEmitter } = require('events') + +class BodyStream extends Transform { + _transform (chunk, encoding, callback) { + callback(null, chunk) + } + + once (event, listerer) { + const self = this + const _listener = (e) => { + listerer(e) + self.removeListener(event, _listener) + } + this.on(event, _listener) + } +} + +const wrapper = (handler, parsers) => { + /** + * @param ctx {Request} + */ + const res = async (ctx, server) => { + let response = '' + let status = 200 + let reqBody = '' + let headers = {} + log.info(`${ctx.url}`) + + const stream = new BodyStream() + setTimeout(async () => { + if (!ctx.body) { + stream.end() + return + } + for await (const chunk of ctx.body) { + stream.write(chunk) + } + stream.end() + }) + const req = { + headers: Object.fromEntries(ctx.headers.entries()), + raw: stream, + log: log, + params: ctx.params || {}, + query: {} + } + for (const [key, value] of (new URL(ctx.url)).searchParams) { + if (!(key in req.query)) { + req.query[key] = value + continue + } + req.query[key] = Array.isArray(req.query[key]) + ? [...req.query[key], value] + : [req.query[key], value] + } + const res = { + send: (msg) => { + response = msg + }, + code: (code) => { + status = code + return res + }, + header: (key, value) => { + headers[key] = value + return res + }, + headers: (hdrs) => { + headers = { ...headers, ...hdrs } + return res + } + } + + if (parsers) { + const contentType = (ctx.headers.get('Content-Type') || '') + let ok = false + for (const [type, parser] of Object.entries(parsers)) { + if (type !== '*' && contentType.indexOf(type) > -1) { + log.debug(`parsing ${type}`) + reqBody = await parser(req, stream) + ok = true + log.debug(`parsing ${type} ok`) + } + } + if (!ok && parsers['*']) { + log.debug('parsing *') + reqBody = await parsers['*'](req, stream) + ok = true + log.debug('parsing * ok') + } + if (!ok) { + throw new Error('undefined content type ' + contentType) + } + } + + req.body = reqBody || stream + + let result = handler(req, res) + if (result && result.then) { + result = await result + } + if (result && result.on) { + response = '' + result.on('data', (d) => { + response += d + }) + await new Promise((resolve, reject) => { + result.on('end', resolve) + result.on('error', reject) + result.on('close', resolve) + }) + result = null + } + if (result) { + response = result + } + if (response instanceof Object && typeof response !== 'string' && !Buffer.isBuffer(response)) { + response = JSON.stringify(response) + } + return new Response(response, { status: status, headers: headers }) + } + return res +} + +const wsWrapper = (handler) => { + /** + * @param ctx {Request} + */ + const res = { + open: async (ctx, server) => { + const req = { + headers: Object.fromEntries(ctx.data.ctx.headers.entries()), + log: log, + query: {} + } + for (const [key, value] of (new URL(ctx.data.ctx.url)).searchParams) { + if (!(key in req.query)) { + req.query[key] = value + continue + } + req.query[key] = Array.isArray(req.query[key]) + ? [...req.query[key], value] + : [req.query[key], value] + } + + ctx.closeEmitter = new EventEmitter() + ctx.closeEmitter.send = ctx.send.bind(ctx) + + const ws = { + socket: ctx.closeEmitter + } + + const result = handler(ws, { query: req.query }) + if (result && result.then) { + await result + } + }, + close: (ctx) => { ctx.closeEmitter.emit('close') } + } + return res +} + +module.exports = { + wrapper, + wsWrapper +} diff --git a/lib/db/clickhouse.js b/lib/db/clickhouse.js index 6e7ab8bf..a704fe5f 100644 --- a/lib/db/clickhouse.js +++ b/lib/db/clickhouse.js @@ -17,14 +17,6 @@ const dist = clusterName ? '_dist' : '' /* DB Helper */ const ClickHouse = require('@apla/clickhouse') -const clickhouseOptions = { - host: process.env.CLICKHOUSE_SERVER || 'localhost', - port: process.env.CLICKHOUSE_PORT || 8123, - auth: process.env.CLICKHOUSE_AUTH || 'default:', - protocol: process.env.CLICKHOUSE_PROTO ? process.env.CLICKHOUSE_PROTO + ':' : 'http:', - readonly: !!process.env.READONLY, - queryOptions: { database: process.env.CLICKHOUSE_DB || 'cloki' } -} const transpiler = require('../../parser/transpiler') const rotationLabels = process.env.LABELS_DAYS || 7 @@ -33,9 +25,9 @@ const axios = require('axios') const { samplesTableName, samplesReadTableName } = UTILS const path = require('path') const { Transform } = require('stream') -const { CORS } = require('../../common') - -const protocol = process.env.CLICKHOUSE_PROTO || 'http' +const { CORS, bun } = require('../../common') +const clickhouseOptions = require('./clickhouse_options').databaseOptions +const { getClickhouseUrl } = require('./clickhouse_options') // External Storage Policy for Tables (S3, MINIO) const storagePolicy = process.env.STORAGE_POLICY || false @@ -76,7 +68,8 @@ const conveyor = { let throttler = null const resolvers = {} const rejectors = {} -if (isMainThread) { +let first = false +if (isMainThread && !bun()) { throttler = new Worker(path.join(__dirname, 'throttler.js')) throttler.on('message', (msg) => { switch (msg.status) { @@ -90,8 +83,29 @@ if (isMainThread) { delete resolvers[msg.id] delete rejectors[msg.id] }) +} else if (isMainThread && !first) { + first = true + const _throttler = require('./throttler') + throttler = { + on: _throttler.on, + postMessage: _throttler.postMessage, + removeAllListeners: _throttler.removeAllListeners, + terminate: _throttler.terminate + } + _throttler.init() + throttler.on('message', (msg) => { + switch (msg.status) { + case 'ok': + resolvers[msg.id]() + break + case 'err': + rejectors[msg.id](new Error('Database push error')) + break + } + delete resolvers[msg.id] + delete rejectors[msg.id] + }) } - // timeSeriesv2Throttler.start(); /* Cache Helper */ @@ -348,10 +362,6 @@ function pushOTLP (traces) { }) } -function getClickhouseUrl () { - return `${protocol}://${clickhouseOptions.auth}@${clickhouseOptions.host}:${clickhouseOptions.port}` -} - /** * @param query {{ * query: string, @@ -455,6 +465,7 @@ const queryTempoScanV2 = async function (query) { } const limit = query.limit ? `LIMIT ${parseInt(query.limit)}` : '' const sql = `${select} ${from} WHERE ${where.join(' AND ')} ORDER BY timestamp_ns DESC ${limit} FORMAT JSON` + console.log(sql) const resp = await rawRequest(sql, null, process.env.CLICKHOUSE_DB || 'cloki') return resp.data.data ? resp.data.data : JSON.parse(resp.data).data } @@ -815,7 +826,13 @@ const outputTempoSearch = async (dataStream, res) => { */ const preprocessStream = (rawStream, processors) => { let dStream = StringStream.from(rawStream.data).lines().endWith(JSON.stringify({ EOF: true })) - .map(chunk => chunk ? JSON.parse(chunk) : ({}), DataStream) + .map(chunk => { + try { + return chunk ? JSON.parse(chunk) : ({}) + } catch (e) { + return {} + } + }, DataStream) .map(chunk => { try { if (!chunk || !chunk.labels) { @@ -1333,15 +1350,16 @@ const samplesReadTable = { * @param query {string} * @param data {string | Buffer | Uint8Array} * @param database {string} + * @param config {Object?} * @returns {Promise>} */ -const rawRequest = (query, data, database) => { +const rawRequest = (query, data, database, config) => { const getParams = [ (database ? `database=${encodeURIComponent(database)}` : null), (data ? `query=${encodeURIComponent(query)}` : null) ].filter(p => p) const url = `${getClickhouseUrl()}/${getParams.length ? `?${getParams.join('&')}` : ''}` - return axios.post(url, data || query) + return axios.post(url, data || query, config) } /** diff --git a/lib/db/clickhouse_options.js b/lib/db/clickhouse_options.js new file mode 100644 index 00000000..2db4510b --- /dev/null +++ b/lib/db/clickhouse_options.js @@ -0,0 +1,22 @@ +const UTILS = require('../utils') +const { samplesTableName, samplesReadTableName } = UTILS + +const clickhouseOptions = { + host: process.env.CLICKHOUSE_SERVER || 'localhost', + port: process.env.CLICKHOUSE_PORT || 8123, + auth: process.env.CLICKHOUSE_AUTH || 'default:', + protocol: process.env.CLICKHOUSE_PROTO ? process.env.CLICKHOUSE_PROTO + ':' : 'http:', + readonly: !!process.env.READONLY, + queryOptions: { database: process.env.CLICKHOUSE_DB || 'cloki' } +} + +function getClickhouseUrl () { + return `${clickhouseOptions.protocol}//${clickhouseOptions.auth}@${clickhouseOptions.host}:${clickhouseOptions.port}` +} + +module.exports = { + samplesTableName, + samplesReadTableName, + getClickhouseUrl, + databaseOptions: clickhouseOptions +} diff --git a/lib/db/throttler.js b/lib/db/throttler.js index 21a3250c..8d84495e 100644 --- a/lib/db/throttler.js +++ b/lib/db/throttler.js @@ -1,10 +1,11 @@ const { isMainThread, parentPort } = require('worker_threads') const axios = require('axios') -const { getClickhouseUrl, samplesTableName } = require('./clickhouse') -const clickhouseOptions = require('./clickhouse').databaseOptions +const { getClickhouseUrl, samplesTableName } = require('./clickhouse_options') +const clickhouseOptions = require('./clickhouse_options').databaseOptions const logger = require('../logger') const clusterName = require('../../common').clusterName const dist = clusterName ? '_dist' : '' +const { EventEmitter } = require('events') const axiosError = async (err) => { try { @@ -71,14 +72,45 @@ const tracesThottler = new TimeoutThrottler( (trace_id, span_id, parent_id, name, timestamp_ns, duration_ns, service_name, payload_type, payload, tags) FORMAT JSONEachRow`) -if (isMainThread) { - module.exports = { - samplesThrottler, - timeSeriesThrottler, - TimeoutThrottler +const emitter = new EventEmitter() +let on = true +const postMessage = message => { + const genericRequest = (throttler) => { + throttler.queue.push(message.data) + throttler.resolvers.push(() => { + if (isMainThread) { + emitter.emit('message', { status: 'ok', id: message.id }) + return + } + parentPort.postMessage({ status: 'ok', id: message.id }) + }) + throttler.rejects.push(() => { + if (isMainThread) { + emitter.emit('message', { status: 'err', id: message.id }) + return + } + parentPort.postMessage({ status: 'err', id: message.id }) + }) } -} else { - let on = true + switch (message.type) { + case 'end': + on = false + if (!isMainThread) { + parentPort.removeAllListeners('message') + } + break + case 'values': + genericRequest(samplesThrottler) + break + case 'labels': + genericRequest(timeSeriesThrottler) + break + case 'traces': + genericRequest(tracesThottler) + } +} + +const init = () => { setTimeout(async () => { // eslint-disable-next-line no-unmodified-loop-condition while (on) { @@ -96,29 +128,25 @@ if (isMainThread) { } } }, 0) - parentPort.on('message', message => { - const genericRequest = (throttler) => { - throttler.queue.push(message.data) - throttler.resolvers.push(() => { - parentPort.postMessage({ status: 'ok', id: message.id }) - }) - throttler.rejects.push(() => { - parentPort.postMessage({ status: 'err', id: message.id }) - }) - } - switch (message.type) { - case 'end': - on = false - parentPort.removeAllListeners('message') - break - case 'values': - genericRequest(samplesThrottler) - break - case 'labels': - genericRequest(timeSeriesThrottler) - break - case 'traces': - genericRequest(tracesThottler) +} + +if (isMainThread) { + module.exports = { + samplesThrottler, + timeSeriesThrottler, + tracesThottler, + TimeoutThrottler, + postMessage, + on: emitter.on.bind(emitter), + removeAllListeners: emitter.removeAllListeners.bind(emitter), + init, + terminate: () => { + postMessage({ type: 'end' }) } + } +} else { + init() + parentPort.on('message', message => { + postMessage(message) }) } diff --git a/lib/db/zipkin.js b/lib/db/zipkin.js index 2837c727..3920dcfc 100644 --- a/lib/db/zipkin.js +++ b/lib/db/zipkin.js @@ -26,7 +26,13 @@ module.exports = class { * @returns {string} */ toJson () { - return JSON.stringify(this, (k, val) => typeof val === 'bigint' ? val.toString() : val) + const res = { + ...this, + timestamp_ns: this.timestamp_ns.toString(), + duration_ns: this.duration_ns.toString() + } + return JSON.stringify(res) + //return JSON.stringify(this, (k, val) => typeof val === 'bigint' ? val.toString() : val) } /** diff --git a/lib/handlers/404.js b/lib/handlers/404.js index bf1cb337..daba3363 100644 --- a/lib/handlers/404.js +++ b/lib/handlers/404.js @@ -1,6 +1,6 @@ function handler (req, res) { req.log.debug('unsupported', req.url) - return res.send('404 Not Supported') + return res.code(404).send('404 Not Supported') } module.exports = handler diff --git a/lib/handlers/datadog_log_push.js b/lib/handlers/datadog_log_push.js index cbb89883..a1cd8677 100644 --- a/lib/handlers/datadog_log_push.js +++ b/lib/handlers/datadog_log_push.js @@ -18,6 +18,11 @@ */ const stringify = require('../utils').stringify +const DATABASE = require('../db/clickhouse') +const { bulk_labels, bulk, labels } = DATABASE.cache +const { fingerPrint } = require('../utils') +const { readonly } = require('../../common') + const tagsToObject = (data, delimiter = ',') => Object.fromEntries(data.split(',').map(v => { const fields = v.split(':') @@ -25,13 +30,12 @@ const tagsToObject = (data, delimiter = ',') => })) async function handler (req, res) { - const self = this req.log.debug('Datadog Log Index Request') if (!req.body) { req.log.error('No Request Body or Target!') return res.code(400).send('{"status":400, "error": { "reason": "No Request Body" } }') } - if (this.readonly) { + if (readonly) { req.log.error('Readonly! No push support.') return res.code(400).send('{"status":400, "error": { "reason": "Read Only Mode" } }') } @@ -69,9 +73,9 @@ async function handler (req, res) { } // Calculate Fingerprint const strJson = stringify(JSONLabels) - finger = self.fingerPrint(strJson) + finger = fingerPrint(strJson) // Store Fingerprint - promises.push(self.bulk_labels.add([[ + promises.push(bulk_labels.add([[ new Date().toISOString().split('T')[0], finger, strJson, @@ -79,8 +83,8 @@ async function handler (req, res) { ]])) for (const key in JSONLabels) { req.log.debug({ key, data: JSONLabels[key] }, 'Storing label') - self.labels.add('_LABELS_', key) - self.labels.add(key, JSONLabels[key]) + labels.add('_LABELS_', key) + labels.add(key, JSONLabels[key]) } } catch (err) { req.log.error({ err }, 'failed ingesting datadog log') @@ -94,7 +98,7 @@ async function handler (req, res) { stream.message ] req.log.debug({ finger, values }, 'store') - promises.push(self.bulk.add([values])) + promises.push(bulk.add([values])) }) } await Promise.all(promises) diff --git a/lib/handlers/datadog_series_push.js b/lib/handlers/datadog_series_push.js index f7f92420..58cf1863 100644 --- a/lib/handlers/datadog_series_push.js +++ b/lib/handlers/datadog_series_push.js @@ -25,16 +25,19 @@ */ const stringify = require('../utils').stringify +const DATABASE = require('../db/clickhouse') +const { bulk_labels, bulk, labels } = DATABASE.cache +const { fingerPrint } = require('../utils') +const { readonly } = require('../../common') async function handler (req, res) { - const self = this req.log.debug('Datadog Series Index Request') if (!req.body) { req.log.error('No Request Body!') res.code(500).send() return } - if (this.readonly) { + if (readonly) { req.log.error('Readonly! No push support.') res.code(500).send() return @@ -63,18 +66,18 @@ async function handler (req, res) { } // Calculate Fingerprint const strJson = stringify(JSONLabels) - finger = self.fingerPrint(strJson) - self.labels.add(finger.toString(), stream.labels) + finger = fingerPrint(strJson) + labels.add(finger.toString(), stream.labels) // Store Fingerprint - promises.push(self.bulk_labels.add([[ + promises.push(bulk_labels.add([[ new Date().toISOString().split('T')[0], finger, strJson, JSONLabels.__name__ || 'undefined' ]])) for (const key in JSONLabels) { - self.labels.add('_LABELS_', key) - self.labels.add(key, JSONLabels[key]) + labels.add('_LABELS_', key) + labels.add(key, JSONLabels[key]) } } catch (err) { req.log.error({ err }) @@ -97,7 +100,7 @@ async function handler (req, res) { entry.value, JSONLabels.__name__ || 'undefined' ] - promises.push(self.bulk.add([values])) + promises.push(bulk.add([values])) }) } }) diff --git a/lib/handlers/elastic_bulk.js b/lib/handlers/elastic_bulk.js index afa3a418..f7668539 100644 --- a/lib/handlers/elastic_bulk.js +++ b/lib/handlers/elastic_bulk.js @@ -8,15 +8,18 @@ const { asyncLogError } = require('../../common') const stringify = require('../utils').stringify +const DATABASE = require('../db/clickhouse') +const { bulk_labels, bulk, labels } = DATABASE.cache +const { fingerPrint } = require('../utils') +const { readonly } = require('../../common') async function handler (req, res) { - const self = this req.log.debug('ELASTIC Bulk Request') if (!req.body) { asyncLogError('No Request Body or Target!' + req.body, req.log) return res.code(400).send('{"status":400, "error": { "reason": "No Request Body" } }') } - if (this.readonly) { + if (readonly) { asyncLogError('Readonly! No push support.', req.log) return res.code(400).send('{"status":400, "error": { "reason": "Read Only Mode" } }') } @@ -38,6 +41,9 @@ async function handler (req, res) { const promises = [] if (streams) { streams.forEach(function (stream) { + if (!stream) { + return + } try { stream = JSON.parse(stream) } catch (err) { asyncLogError(err, req.log); return }; @@ -67,10 +73,10 @@ async function handler (req, res) { } // Calculate Fingerprint const strJson = stringify(JSONLabels) - finger = self.fingerPrint(strJson) + finger = fingerPrint(strJson) req.log.debug({ JSONLabels, finger }, 'LABELS FINGERPRINT') // Store Fingerprint - promises.push(self.bulk_labels.add([[ + promises.push(bulk_labels.add([[ new Date().toISOString().split('T')[0], finger, strJson, @@ -78,8 +84,8 @@ async function handler (req, res) { ]])) for (const key in JSONLabels) { req.log.debug({ key, data: JSONLabels[key] }, 'Storing label') - self.labels.add('_LABELS_', key) - self.labels.add(key, JSONLabels[key]) + labels.add('_LABELS_', key) + labels.add(key, JSONLabels[key]) } } catch (err) { asyncLogError(err, req.log) @@ -93,7 +99,7 @@ async function handler (req, res) { JSON.stringify(stream) || stream ] req.log.debug({ finger, values }, 'store') - promises.push(self.bulk.add([values])) + promises.push(bulk.add([values])) // Reset State, Expect Command lastTags = false diff --git a/lib/handlers/elastic_index.js b/lib/handlers/elastic_index.js index 19528092..ee314c45 100644 --- a/lib/handlers/elastic_index.js +++ b/lib/handlers/elastic_index.js @@ -11,15 +11,19 @@ const { asyncLogError } = require('../../common') const stringify = require('../utils').stringify +const DATABASE = require('../db/clickhouse') +const { bulk_labels, bulk, labels } = DATABASE.cache +const { fingerPrint } = require('../utils') +const { readonly } = require('../../common') + async function handler (req, res) { - const self = this req.log.debug('ELASTIC Index Request') if (!req.body || !req.params.target) { asyncLogError('No Request Body or Target!', req.log) return res.code(400).send('{"status":400, "error": { "reason": "No Request Body" } }') } - if (this.readonly) { + if (readonly) { asyncLogError('Readonly! No push support.', req.log) return res.code(400).send('{"status":400, "error": { "reason": "Read Only Mode" } }') } @@ -57,9 +61,9 @@ async function handler (req, res) { } // Calculate Fingerprint const strJson = stringify(JSONLabels) - finger = self.fingerPrint(strJson) + finger = fingerPrint(strJson) // Store Fingerprint - promises.push(self.bulk_labels.add([[ + promises.push(bulk_labels.add([[ new Date().toISOString().split('T')[0], finger, strJson, @@ -67,8 +71,8 @@ async function handler (req, res) { ]])) for (const key in JSONLabels) { req.log.debug({ key, data: JSONLabels[key] }, 'Storing label') - self.labels.add('_LABELS_', key) - self.labels.add(key, JSONLabels[key]) + labels.add('_LABELS_', key) + labels.add(key, JSONLabels[key]) } } catch (err) { asyncLogError(err, req.log) @@ -87,7 +91,7 @@ async function handler (req, res) { JSON.stringify(stream) || stream ] req.log.debug({ finger, values }, 'store') - promises.push(self.bulk.add([values])) + promises.push(bulk.add([values])) }) } await Promise.all(promises) diff --git a/lib/handlers/influx_write.js b/lib/handlers/influx_write.js index 5563f8a9..42a93103 100644 --- a/lib/handlers/influx_write.js +++ b/lib/handlers/influx_write.js @@ -39,14 +39,17 @@ const stringify = require('../utils').stringify const influxParser = require('../influx') const { asyncLogError, errors } = require('../../common') +const DATABASE = require('../db/clickhouse') +const { bulk_labels, bulk, labels } = DATABASE.cache +const { fingerPrint } = require('../utils') +const { readonly } = require('../../common') async function handler (req, res) { - const self = this if (!req.body && !req.body.metrics) { asyncLogError('No Request Body!', req.log) return } - if (self.readonly) { + if (readonly) { asyncLogError('Readonly! No push support.', req.log) return res.code(500).send('') } @@ -75,10 +78,10 @@ async function handler (req, res) { } // Calculate Fingerprint const strLabels = stringify(Object.fromEntries(Object.entries(JSONLabels).sort())) - finger = self.fingerPrint(strLabels) - self.labels.add(finger.toString(), stream.labels) + finger = fingerPrint(strLabels) + labels.add(finger.toString(), stream.labels) // Store Fingerprint - self.bulk_labels.add([[ + bulk_labels.add([[ new Date().toISOString().split('T')[0], finger, strLabels, @@ -86,8 +89,8 @@ async function handler (req, res) { ]]) for (const key in JSONLabels) { // req.log.debug({ key, data: JSONLabels[key] }, 'Storing label'); - self.labels.add('_LABELS_', key) - self.labels.add(key, JSONLabels[key]) + labels.add('_LABELS_', key) + labels.add(key, JSONLabels[key]) } } catch (err) { asyncLogError(err, req.log) @@ -111,7 +114,7 @@ async function handler (req, res) { value || 0, key || '' ] - self.bulk.add([values]) + bulk.add([values]) } /* logs or syslog */ } else if (stream.measurement === 'syslog' || JSONFields.message) { @@ -123,7 +126,7 @@ async function handler (req, res) { null, JSONFields.message ] - self.bulk.add([values]) + bulk.add([values]) } }) } diff --git a/lib/handlers/newrelic_log_push.js b/lib/handlers/newrelic_log_push.js index c4b6fb6a..dda46c96 100644 --- a/lib/handlers/newrelic_log_push.js +++ b/lib/handlers/newrelic_log_push.js @@ -31,15 +31,18 @@ const { QrynBadRequest } = require('./errors') const stringify = require('../utils').stringify +const DATABASE = require('../db/clickhouse') +const { bulk_labels, bulk, labels } = DATABASE.cache +const { fingerPrint } = require('../utils') +const { readonly } = require('../../common') async function handler (req, res) { - const self = this req.log.debug('NewRelic Log Index Request') if (!req.body) { req.log.error('No Request Body') throw new QrynBadRequest('No request body') } - if (this.readonly) { + if (readonly) { req.log.error('Readonly! No push support.') throw new QrynBadRequest('Read only mode') } @@ -77,12 +80,12 @@ async function handler (req, res) { // Calculate Fingerprint const strJson = stringify(JSONLabels) - finger = self.fingerPrint(strJson) + finger = fingerPrint(strJson) // Store Fingerprint for (const key in JSONLabels) { req.log.debug({ key, data: JSONLabels[key] }, 'Storing label') - self.labels.add('_LABELS_', key) - self.labels.add(key, JSONLabels[key]) + labels.add('_LABELS_', key) + labels.add(key, JSONLabels[key]) } const dates = {} @@ -99,11 +102,11 @@ async function handler (req, res) { null, log.message ] - promises.push(self.bulk.add([values])) + promises.push(bulk.add([values])) }) } for (const d of Object.keys(dates)) { - promises.push(self.bulk_labels.add([[ + promises.push(bulk_labels.add([[ d, finger, strJson, diff --git a/lib/handlers/otlp_push.js b/lib/handlers/otlp_push.js index 1c93d30d..73a62d1e 100644 --- a/lib/handlers/otlp_push.js +++ b/lib/handlers/otlp_push.js @@ -17,16 +17,9 @@ }] */ -const { Transform } = require('stream') const { asyncLogError } = require('../../common') - -function handleOne (req, streams, promises) { - const self = this - streams.on('data', function (stream) { - stream = stream.value - promises.push(self.pushZipkin([stream])) - }) -} +const { pushOTLP } = require('../db/clickhouse') +const { readonly } = require('../../common') async function handler (req, res) { req.log.debug('POST /tempo/api/push') @@ -34,7 +27,7 @@ async function handler (req, res) { asyncLogError('No Request Body!', req.log) return res.code(500).send() } - if (this.readonly) { + if (readonly) { asyncLogError('Readonly! No push support.', req.log) return res.code(500).send() } @@ -53,7 +46,7 @@ async function handler (req, res) { spans.push.apply(spans, scope.spans) } } - await this.pushOTLP(spans) + await pushOTLP(spans) return res.code(200).send('OK') } diff --git a/lib/handlers/prom_push.js b/lib/handlers/prom_push.js index 9fcf36ae..b841b2fe 100644 --- a/lib/handlers/prom_push.js +++ b/lib/handlers/prom_push.js @@ -13,6 +13,10 @@ */ const { asyncLogError } = require('../../common') const stringify = require('../utils').stringify +const DATABASE = require('../db/clickhouse') +const { bulk_labels, bulk, labels } = DATABASE.cache +const { fingerPrint } = require('../utils') +const { readonly } = require('../../common') async function handler (req, res) { const self = this @@ -21,7 +25,7 @@ async function handler (req, res) { asyncLogError('No Request Body!', req.log) return res.code(500).send() } - if (this.readonly) { + if (readonly) { asyncLogError('Readonly! No push support.', req.log) return res.code(500).send() } @@ -41,14 +45,12 @@ async function handler (req, res) { }, {}) // Calculate Fingerprint const strJson = stringify(JSONLabels) - finger = self.fingerPrint(strJson) - req.log.debug({ labels: stream.labels, finger }, 'LABELS FINGERPRINT') - self.labels.add(finger.toString(), stream.labels) + finger = fingerPrint(strJson) + labels.add(finger.toString(), stream.labels) const dates = {} if (stream.samples) { stream.samples.forEach(function (entry) { - req.log.debug({ entry, finger }, 'BULK ROW') if ( !entry && !entry.timestamp && @@ -67,20 +69,20 @@ async function handler (req, res) { dates[ new Date(parseInt((ts / BigInt('1000000')).toString())).toISOString().split('T')[0] ] = 1 - promises.push(self.bulk.add([values])) + promises.push(bulk.add([values])) }) } for (const d of Object.keys(dates)) { // Store Fingerprint - promises.push(self.bulk_labels.add([[ + promises.push(bulk_labels.add([[ d, finger, strJson, JSONLabels.__name__ || 'undefined' ]])) for (const key in JSONLabels) { - self.labels.add('_LABELS_', key) - self.labels.add(key, JSONLabels[key]) + labels.add('_LABELS_', key) + labels.add(key, JSONLabels[key]) } } } catch (err) { diff --git a/lib/handlers/prom_query.js b/lib/handlers/prom_query.js index 91a1ec70..59935bef 100644 --- a/lib/handlers/prom_query.js +++ b/lib/handlers/prom_query.js @@ -1,12 +1,11 @@ /* Emulated PromQL Query Handler */ -const { p2l } = require('@qxip/promql2logql'); const { asyncLogError, CORS } = require('../../common') +const { instantQuery } = require('../../promql') const empty = '{"status" : "success", "data" : {"resultType" : "scalar", "result" : []}}'; // to be removed const test = () => `{"status" : "success", "data" : {"resultType" : "scalar", "result" : [${Math.floor(Date.now() / 1000)}, "2"]}}`; // to be removed const exec = (val) => `{"status" : "success", "data" : {"resultType" : "scalar", "result" : [${Math.floor(Date.now() / 1000)}, val]}}`; // to be removed - async function handler (req, res) { req.log.debug('GET /loki/api/v1/query') const resp = { @@ -24,34 +23,23 @@ async function handler (req, res) { } if (req.query.query === '1+1') { return res.status(200).send(test()) - } - else if (!isNaN(parseInt(req.query.query))) { + } else if (!isNaN(parseInt(req.query.query))) { return res.status(200).send(exec(parseInt(req.query.query))) } /* remove newlines */ req.query.query = req.query.query.replace(/\n/g, ' ') + req.query.time = req.query.time ? parseInt(req.query.time) * 1000 : Date.now() /* transpile to logql */ try { - req.query.query = p2l(req.query.query) - } catch(e) { - asyncLogError({ e }, req.log) - return res.send(empty) - } - /* scan fingerprints */ - /* TODO: handle time tag + direction + limit to govern the query output */ - try { - const response = await this.instantQueryScan( - req.query - ) - res.code(200) - res.headers({ - 'Content-Type': 'application/json', - 'Access-Control-Allow-Origin': CORS - }) - return response + const response = await instantQuery(req.query.query, req.query.time) + return res.code(200) + .headers({ + 'Content-Type': 'application/json', + 'Access-Control-Allow-Origin': CORS + }).send(response) } catch (err) { asyncLogError(err, req.log) - return res.send(empty) + return res.code(500).send(JSON.stringify({ status: 'error', error: err.message })) } } diff --git a/lib/handlers/prom_query_range.js b/lib/handlers/prom_query_range.js index 74cd3460..df37c1ab 100644 --- a/lib/handlers/prom_query_range.js +++ b/lib/handlers/prom_query_range.js @@ -9,48 +9,19 @@ regexp: a regex to filter the returned results, will eventually be rolled into the query language */ -const { p2l } = require('@qxip/promql2logql') -const { asyncLogError, CORS } = require('../../common') +const { rangeQuery } = require('../../promql/index') async function handler (req, res) { req.log.debug('GET /api/v1/query_range') - const resp = { - status: "success", - data: { - resultType: "vector", - result: [] - } - } - if (req.method === 'POST') { - req.query = req.body - } - if (!req.query.query) { - return res.send(resp) - } - /* remove newlines */ - req.query.query = req.query.query.replace(/\n/g, ' ') - if (!req.query.query) { - return res.code(400).send('invalid query') - } - // Convert PromQL to LogQL and execute + const startMs = parseInt(req.query.start) * 1000 || Date.now() - 60000 + const endMs = parseInt(req.query.end) * 1000 || Date.now() + const stepMs = parseInt(req.query.step) * 1000 || 15000 + const query = req.query.query try { - req.query.query = p2l(req.query.query) - const response = await this.scanFingerprints( - { - ...req.query, - start: parseInt(req.query.start) * 1e9, - end: parseInt(req.query.end) * 1e9 - } - ) - res.code(200) - res.headers({ - 'Content-Type': 'application/json', - 'Access-Control-Allow-Origin': CORS - }) - return response + const result = await rangeQuery(query, startMs, endMs, stepMs) + return res.code(200).send(result) } catch (err) { - asyncLogError(err, req.log) - return res.send(resp) + return res.code(500).send(JSON.stringify({ status: 'error', error: err.message })) } } diff --git a/lib/handlers/prom_series.js b/lib/handlers/prom_series.js index 50f2c995..d6862b7d 100644 --- a/lib/handlers/prom_series.js +++ b/lib/handlers/prom_series.js @@ -1,21 +1,8 @@ const { scanSeries } = require('../db/clickhouse') const { CORS } = require('../../common') -const { Compiler } = require('bnf') const { isArray } = require('handlebars-helpers/lib/array') const { QrynError } = require('./errors') - -const promqlSeriesBnf = ` - ::= | "{" "}" | "{" [] "}" -label ::= ( | "_") *( | "." | "_" | ) -operator ::= "=~" | "!~" | "!=" | "=" -quoted_str ::= () | () | | -metric_name ::= label -label_selector ::=