FluxPipe is an experimental stand-alone Flux API for serverless workers and embedded datasources
Execute your Flux scripts locally, in serverless functions or anywhere else - decoupled from the data and database.
Fluxpipe runs at 141,6Km/h* and is compatible with InfluxDB 3.0 / IOx, ClickHouse, Grafana and beyond!
InfluxDB Flux is a lightweight scripting language for querying databases and working with data. 1
Need a Flux introduction? Check out the official page, documentation or "3 Minutes to Flux" guide.
Try our serverless demo or launch your own instance to instantly fall in love with Flux.
![](https://user-images.githubusercontent.com/1423657/236479471-a1cb0484-dfd3-4dc2-8d62-121debd7faa3.png)
Download a binary release, pull the Docker image, or build from source.
curl -fsSL github.com/metrico/fluxpipe/releases/latest/download/fluxpipe-server -O \
&& chmod +x fluxpipe-server
./fluxpipe-server -port 8086
Run with -h for a full list of parameters.
docker pull ghcr.io/metrico/fluxpipe:latest
docker run -ti --rm -p 8086:8086 ghcr.io/metrico/fluxpipe:latest
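When the container is started from a script (for example detached with -d instead of -ti), it can help to wait until the API answers before sending real queries. The following is only a sketch: the function name wait_for_fluxpipe and the FLUXPIPE_URL variable are illustrative and not part of Fluxpipe, and it assumes curl is installed and that the /api/v2/query endpoint returns a success status for a trivial generate script.

```shell
# Hypothetical helper: poll the query endpoint until it answers or the attempts run out.
# FLUXPIPE_URL is an assumed variable name; it defaults to the port published above.
wait_for_fluxpipe() {
  url="${FLUXPIPE_URL:-http://localhost:8086}/api/v2/query"
  attempts="${1:-30}"
  i=0
  while [ "$i" -lt "$attempts" ]; do
    # -f makes curl return non-zero on HTTP errors as well as connection failures.
    if curl -fsS -XPOST "$url" \
         -H 'Accept: application/csv' \
         -H 'Content-type: application/vnd.flux' \
         -d 'import g "generate" g.from(start: 2022-04-01T00:00:00Z, stop: 2022-04-01T00:03:00Z, count: 3, fn: (n) => n)' \
         >/dev/null 2>&1; then
      return 0
    fi
    i=$((i + 1))
    sleep 1
  done
  return 1
}
```

Calling wait_for_fluxpipe 10 tries up to ten times, one second apart, and returns non-zero if the server never responds.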
💡 Check out the scripts folder for working examples
Fluxpipe embeds a playground interface to instantly execute queries (borrowed from ClickHouse 2)
Fluxpipe serves a simple REST API loosely compatible with existing flux integrations and clients
Grafana Flux 1
Fluxpipe is compatible with the native Grafana InfluxDB/Flux datasource (url + organization fields are required!)
You can query InfluxDB 3.0 IOx with raw SQL using the native sql.from handler:
import "sql"
sql.from(
driverName: "influxdb-iox",
dataSourceName: "iox://iox-server:443/qryn_logs",
query: "SELECT level, sender, body FROM logs WHERE body LIKE '%DELETE%' limit 10",
)
You can query InfluxDB 3.0 IOx with Flux using the iox.from handler:
import "contrib/qxip/iox"
iox.from(
bucket: "test",
host: "eu-central-1-1.aws.cloud2.influxdata.com:443",
token: "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
limit: "10",
columns: "time, level, sender",
table: "logs",
start: -100d,
)
_time:time level:string sender:string
------------------------------ ---------------------- ----------------------
2023-08-31T00:00:00.091362490Z info logtest
2023-08-31T00:00:00.091380034Z info logtest
2023-08-31T00:00:00.091381374Z info logtest
2023-08-31T00:00:00.091382470Z info logtest
2023-08-31T00:00:00.091383354Z info logtest
...
You can write data back to InfluxDB 3.0 IOx using the to and wideTo functions:
import "contrib/qxip/iox"
import "influxdata/influxdb"
iox.from(
bucket: "qxip",
host: "eu-central-1-1.aws.cloud2.influxdata.com:443",
token: "",
limit: "10",
table: "machine_data",
start: -2d,
)
|> range(start: -2d)
|> aggregateWindow(every: 5s, fn: mean, column: "load", createEmpty: false)
|> set(key: "_measurement", value: "downsampled")
|> wideTo(
bucket: "qxip",
host: "https://eu-central-1-1.aws.cloud2.influxdata.com",
token: "",
orgID: "6a841c0c08328fb1"
)
import "contrib/qxip/clickhouse"
clickhouse.query(
url: "https://[email protected]",
query: "SELECT database, total_rows FROM tables WHERE total_rows > 0"
)
|> rename(columns: {database: "_value", total_rows: "_data"})
|> keep(columns: ["_value","_data"])
import "contrib/qxip/logql"
option logql.defaultURL = "http://qryn:3100"
logql.query_range(
query: "rate({job=\"dummy-server\"}[5m])",
start: v.timeRangeStart,
end: v.timeRangeStop
)
|> map(fn: (r) => ({r with _time: time(v: uint(v: r.timestamp_ns)), _value: float(v: r.value) }))
|> drop(columns: ["timestamp_ns", "value"])
|> sort(columns: ["_time"])
|> group(columns: ["labels"])
Usage with curl
curl -XPOST localhost:8086/api/v2/query -sS \
-H 'Accept:application/csv' \
-H 'Content-type:application/vnd.flux' \
-d 'import g "generate" g.from(start: 2022-04-01T00:00:00Z, stop: 2022-04-01T00:03:00Z, count: 3, fn: (n) => n)'
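For multi-line scripts it can be easier to keep the Flux code in a file. The following is a minimal sketch of a shell wrapper around the same endpoint and headers; the function name flux_query and the FLUXPIPE_URL variable are illustrative and not part of Fluxpipe. It uses curl's --data-binary option because -d strips newlines when reading from a file, which would break scripts that contain line comments.

```shell
# Hypothetical helper: POST a Flux script to the Fluxpipe query endpoint.
# FLUXPIPE_URL is an assumed variable name; it defaults to the local server.
flux_query() {
  # $1 is a path to a .flux file; with no argument the script is read from stdin.
  curl -sS -XPOST "${FLUXPIPE_URL:-http://localhost:8086}/api/v2/query" \
    -H 'Accept: application/csv' \
    -H 'Content-type: application/vnd.flux' \
    --data-binary "@${1:--}"
}
```

With the function loaded, flux_query scripts/csv.flux sends a file, and piping a script into flux_query with no argument sends it from stdin.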
Fluxpipe builds Flux with EnvironmentSecretService, which lets Flux scripts read system environment variables as secrets.
import "influxdata/influxdb/secrets"
key = secrets.get(key: "ENV_SECRET")
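As a rough illustration of how the environment lookup can be exercised, the sketch below sets ENV_SECRET for a single process and feeds a short script to the binary through its -stdin mode. The function name secret_length and the FLUXPIPE_BIN variable are hypothetical, and the script assumes the standard array and strings packages are available in this build.

```shell
# Hypothetical helper: run a small Flux script with ENV_SECRET set for that one process.
# FLUXPIPE_BIN is an assumed variable pointing at the downloaded binary.
secret_length() {
  # The VAR=value prefix places ENV_SECRET in the environment of this command only.
  ENV_SECRET="$1" "${FLUXPIPE_BIN:-./fluxpipe-server}" -stdin <<'EOF'
import "array"
import "strings"
import "influxdata/influxdb/secrets"

key = secrets.get(key: "ENV_SECRET")

// Emit only the length so the secret itself never reaches the output.
array.from(rows: [{secret_length: strings.strlen(v: key)}])
EOF
}
```

Printing only the length is a deliberate choice here: it confirms the variable was read without echoing its value into logs.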
Fluxpipe can be used as a command-line tool and stdin pipeline processor
echo 'import g "generate" g.from(start: 2022-04-01T00:00:00Z, stop: 2022-04-01T00:03:00Z, count: 5, fn: (n) => 1)' \
| ./fluxpipe-server -stdin
cat scripts/csv.flux | ./fluxpipe-server -stdin
cat scripts/sql.flux | ./fluxpipe-server -stdin
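The same pattern extends to a whole folder of scripts. This is a sketch only: run_flux_dir and FLUXPIPE_BIN are hypothetical names, and it assumes the binary exits with a non-zero status when a script fails, which is not confirmed here.

```shell
# Hypothetical helper: run every .flux file in a directory through the stdin pipeline.
# FLUXPIPE_BIN is an assumed variable pointing at the downloaded binary.
run_flux_dir() {
  dir="${1:-scripts}"
  failed=0
  for script in "$dir"/*.flux; do
    [ -e "$script" ] || continue   # skip the literal pattern when nothing matches
    echo "== $script"
    # Assumption: a failed script makes the binary return a non-zero exit status.
    "${FLUXPIPE_BIN:-./fluxpipe-server}" -stdin < "$script" || failed=$((failed + 1))
  done
  echo "failed: $failed"
  [ "$failed" -eq 0 ]
}
```

Running run_flux_dir scripts prints each script's output under a header line and returns non-zero if any script failed, so it can be used as a quick smoke test.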
Configure your Grafana instance with our public demo endpoint (limited resources)
Flux(pipe) is built using the InfluxCommunity/flux fork, which contains additional features and contributions.
All the standard and additional functions available in Fluxpipe are included in the Flux community documentation
- Fluxlib
- parser
- executor
- Contribs
- contrib/qxip/clickhouse
- contrib/qxip/logql
- contrib/qxip/hash
- ENV secrets
- STDIN pipeline
- HTTP api
- plaintext
- json support
- web playground