Skip to content

Commit

Permalink
Table Loader: Add capability to load InfluxDB Line Protocol (ILP) files
Browse files Browse the repository at this point in the history
Synopsis:

  ctk load table "file://influxdb-export.lp"
  • Loading branch information
amotl committed Dec 23, 2024
1 parent a85172b commit 3172163
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
- Improved `ctk shell` to also talk to CrateDB standalone databases
- Added basic utility command `ctk tail`, for tailing a database
table, and optionally following the tail
- Table Loader: Added capability to load InfluxDB Line Protocol (ILP) files

## 2024/10/13 v0.0.29
- MongoDB: Added Zyp transformations to the CDC subsystem,
Expand Down
6 changes: 5 additions & 1 deletion cratedb_toolkit/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,11 @@ def load_table(
logger.error("Data loading failed or incomplete")
return False

elif source_url_obj.scheme.startswith("influxdb"):
elif (
source_url_obj.scheme.startswith("influxdb")
or resource.url.endswith(".lp")
or resource.url.endswith(".lp.gz")
):
from cratedb_toolkit.io.influxdb import influxdb_copy

http_scheme = "http"
Expand Down
10 changes: 10 additions & 0 deletions cratedb_toolkit/io/influxdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from influxio.core import copy

from cratedb_toolkit.model import DatabaseAddress

logger = logging.getLogger(__name__)


Expand All @@ -12,6 +14,14 @@ def influxdb_copy(source_url, target_url, progress: bool = False):
export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo
ctk load table influxdb2://example:token@localhost:8086/testdrive/demo
"""

# Sanity checks.
target_address = DatabaseAddress.from_string(target_url)
url, table_address = target_address.decode()
if table_address.table is None:
raise ValueError("Table name is missing. Please adjust CrateDB database URL.")

# Invoke copy operation.
logger.info("Running InfluxDB copy")
copy(source_url, target_url, progress=progress)
return True
30 changes: 25 additions & 5 deletions doc/io/influxdb/loader.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@
# InfluxDB Table Loader

## About
Load data from InfluxDB into CrateDB using a one-stop command
`ctk load table influxdb2://...`, in order to facilitate convenient
Load data from InfluxDB into CrateDB using one-stop commands
`ctk load table ...`, in order to facilitate convenient
data transfers to be used within data pipelines or ad hoc operations.

## Synopsis
- Load from InfluxDB server: `ctk load table influxdb2://...`
- Load from InfluxDB line protocol: `ctk load table file://observations.lp`

## Details
The InfluxDB table loader is based on the [influxio] package. Please also check
its documentation to learn about more of its capabilities, supporting you when
Expand All @@ -18,7 +22,13 @@ pip install --upgrade 'cratedb-toolkit[influxdb]'

## Usage

### Workstation
Prepare subsequent commands by defining the database address of your
CrateDB database cluster.
```shell
export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo
```

### InfluxDB 2 API

An exemplary walkthrough, copying data from InfluxDB to CrateDB, both services
expected to be listening on `localhost`.
Expand All @@ -37,13 +47,11 @@ influx query "from(bucket:\"${INFLUX_BUCKET_NAME}\") |> range(start:-100y)"

Transfer data from InfluxDB bucket/measurement into CrateDB schema/table.
```shell
export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo
ctk load table influxdb2://example:token@localhost:8086/testdrive/demo
```

Query data in CrateDB.
```shell
export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo
ctk shell --command "SELECT * FROM testdrive.demo;"
ctk show table "testdrive.demo"
```
Expand All @@ -52,6 +60,18 @@ ctk show table "testdrive.demo"
- More convenient table querying.
:::

### InfluxDB Line protocol file (ILP)

Import ILP file from local filesystem.
```shell
ctk load table "file://influxdb-export.lp"
```

Import ILP file from a remote resource.
```shell
ctk load table \
"https://github.com/influxdata/influxdb2-sample-data/raw/master/air-sensor-data/air-sensor-data.lp"
```

### Cloud

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ full = [
]
influxdb = [
"cratedb-toolkit[io]",
"influxio>=0.4,<1",
"influxio>=0.5,<1",
]
io = [
"cr8",
Expand Down
40 changes: 40 additions & 0 deletions tests/io/influxdb/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,46 @@
from influxio.adapter import InfluxDbApiAdapter


def test_line_protocol_load_table_success(caplog, cratedb, needs_sqlalchemy2):
"""
CLI test: Invoke `ctk load table` for InfluxDB line protocol (ILP) file.
"""
ilp_url = "https://github.com/influxdata/influxdb2-sample-data/raw/master/noaa-ndbc-data/latest-observations.lp"
cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo"

# Run transfer command.
runner = CliRunner(env={"CRATEDB_SQLALCHEMY_URL": cratedb_url})
result = runner.invoke(
cli,
args=f"load table {ilp_url}",
catch_exceptions=False,
)
assert result.exit_code == 0

# Verify data in target database.
assert cratedb.database.table_exists("testdrive.demo") is True
assert cratedb.database.refresh_table("testdrive.demo") is True
assert cratedb.database.count_records("testdrive.demo") >= 500


def test_line_protocol_load_table_wrong_cratedb_url_failure(caplog, cratedb, needs_sqlalchemy2):
"""
CLI test: Invoke `ctk load table` for InfluxDB line protocol (ILP) file.
"""
ilp_url = "https://github.com/influxdata/influxdb2-sample-data/raw/master/noaa-ndbc-data/latest-observations.lp"
cratedb_url = f"{cratedb.get_connection_url()}/testdrive"

# Run transfer command.
runner = CliRunner(env={"CRATEDB_SQLALCHEMY_URL": cratedb_url})
with pytest.raises(ValueError) as ex:
runner.invoke(
cli,
args=f"load table {ilp_url}",
catch_exceptions=False,
)
assert ex.match("Table name is missing. Please adjust CrateDB database URL.")


def test_influxdb2_load_table(caplog, cratedb, influxdb, needs_sqlalchemy2):
"""
CLI test: Invoke `ctk load table` for InfluxDB.
Expand Down

0 comments on commit 3172163

Please sign in to comment.