Skip to content

Commit

Permalink
ctk tail: Add basic utility command for tailing a database table
Browse files Browse the repository at this point in the history
... and optionally following the tail.
  • Loading branch information
amotl committed Dec 18, 2024
1 parent 3f86f7d commit 428e75b
Show file tree
Hide file tree
Showing 12 changed files with 294 additions and 3 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
defer `polars` to `cratedb-toolkit[io]`.
- Fixed `cratedb-wtf record` about too large values of `ulimit_hard`
- Improved `ctk shell` to also talk to CrateDB standalone databases
- Added basic utility command `ctk tail`, for tailing a database
table, and optionally following the tail

## 2024/10/13 v0.0.29
- MongoDB: Added Zyp transformations to the CDC subsystem,
Expand Down
2 changes: 2 additions & 0 deletions cratedb_toolkit/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from .adapter.rockset.cli import cli as rockset_cli
from .cfr.cli import cli as cfr_cli
from .cluster.cli import cli as cloud_cli
from .cmd.tail.cli import cli as tail_cli
from .io.cli import cli as io_cli
from .job.cli import cli_list_jobs
from .query.cli import cli as query_cli
Expand All @@ -28,5 +29,6 @@ def cli(ctx: click.Context, verbose: bool, debug: bool):
cli.add_command(query_cli, name="query")
cli.add_command(rockset_cli, name="rockset")
cli.add_command(shell_cli, name="shell")
cli.add_command(tail_cli, name="tail")
cli.add_command(wtf_cli, name="wtf")
cli.add_command(cli_list_jobs)
Empty file added cratedb_toolkit/cmd/__init__.py
Empty file.
Empty file.
52 changes: 52 additions & 0 deletions cratedb_toolkit/cmd/tail/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import logging
import sys

import click

from cratedb_toolkit.cmd.tail.main import TableTailer
from cratedb_toolkit.model import TableAddress
from cratedb_toolkit.util import DatabaseAdapter
from cratedb_toolkit.util.cli import boot_click

logger = logging.getLogger(__name__)

# Reusable click option carrying the database address. Besides the command-line
# flag, the value can be supplied via the `CRATEDB_SQLALCHEMY_URL` environment variable.
cratedb_sqlalchemy_option = click.option(
"--cratedb-sqlalchemy-url", envvar="CRATEDB_SQLALCHEMY_URL", type=str, required=False, help="CrateDB SQLAlchemy URL"
)


@click.command()
@cratedb_sqlalchemy_option
@click.option(
    "--lines", "-n", type=int, required=False, default=10, help="Displays n last lines of the input. Default: 10"
)
@click.option(
    "--format",
    "format_",
    type=str,
    required=False,
    help="Select output format: log, log-pretty, json, yaml. Default: log for `sys.jobs_log`, json otherwise",
)
@click.option("--follow", "-f", is_flag=True, required=False, help="Follow new records added, by polling the table")
@click.option(
    "--interval",
    type=float,
    required=False,
    default=0.1,
    help="Polling interval in seconds, used together with --follow. Default: 0.1",
)
@click.option("--verbose", is_flag=True, required=False, help="Turn on logging")
@click.option("--debug", is_flag=True, required=False, help="Turn on logging with debug level")
@click.argument("resource", nargs=-1, type=click.UNPROCESSED)
@click.version_option()
@click.pass_context
def cli(
    ctx: click.Context,
    cratedb_sqlalchemy_url: str,
    resource: tuple,
    lines: int,
    format_: str,
    follow: bool,
    verbose: bool,
    debug: bool,
    interval: float = 0.1,
):
    """
    A polling tail implementation for database tables.
    """
    if not cratedb_sqlalchemy_url:
        logger.error("Unable to operate without database address")
        sys.exit(1)
    # `nargs=-1` also accepts zero arguments, in which case `resource` is an
    # empty tuple. Bail out early instead of failing with an `IndexError` below.
    if not resource:
        logger.error("Unable to operate without table name")
        sys.exit(1)
    boot_click(ctx, verbose, debug)
    adapter = DatabaseAdapter(dburi=cratedb_sqlalchemy_url)
    # TODO: Tail multiple tables.
    if len(resource) > 1:
        raise NotImplementedError("`ctk tail` currently implements tailing a single table only")
    tt = TableTailer(
        db=adapter,
        resource=TableAddress.from_string(resource[0]),
        interval=interval,
        format=format_,
    )
    tt.start(lines=lines, follow=follow)
134 changes: 134 additions & 0 deletions cratedb_toolkit/cmd/tail/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import datetime as dt
import logging
import sys
import time
import typing as t

import attr
import colorlog
import orjson
import sqlparse
import yaml

from cratedb_toolkit.model import TableAddress
from cratedb_toolkit.util import DatabaseAdapter

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


@attr.define
class SysJobsLog:
    """
    Represent a single record in CrateDB's `sys.jobs_log` table.

    The `started` and `ended` attributes are handled as epoch timestamps
    in milliseconds.
    """

    id: str  # noqa: A003
    started: int
    ended: int
    classification: t.Dict[str, t.Any]
    stmt: str
    # Falsy when the statement succeeded, see `label` and `to_log`.
    error: str
    node: t.Dict[str, t.Any]
    username: str

    @property
    def template(self) -> str:
        """Format string used by `to_log` to render a single log line."""
        return "{timestamp} [{duration}] {label:17s}: {message} SQL: {sql:50s}"

    @property
    def label(self):
        """Colorized severity label: red `ERROR` if the record has an error, green `INFO` otherwise."""
        red = colorlog.escape_codes.escape_codes["red"]
        green = colorlog.escape_codes.escape_codes["green"]
        reset = colorlog.escape_codes.escape_codes["reset"]
        if self.error:
            return f"{red}ERROR{reset}"
        return f"{green}INFO{reset}"

    @property
    def duration(self) -> int:
        """Elapsed time between `started` and `ended`, in milliseconds."""
        return self.ended - self.started

    @property
    def started_iso(self) -> str:
        """Start time as `YYYY-MM-DD HH:MM:SS.mmm`, in local time."""
        started = dt.datetime.fromtimestamp(self.started / 1000)
        # Request millisecond precision explicitly. Slicing `str(datetime)` would
        # cut off the seconds whenever the timestamp has no fractional part.
        return started.isoformat(sep=" ", timespec="milliseconds")

    @property
    def duration_iso(self) -> str:
        """Elapsed time rendered like `H:MM:SS.ffffff`."""
        # `duration` is a difference of millisecond timestamps, so it must not
        # be interpreted as seconds.
        return str(dt.timedelta(milliseconds=self.duration))

    @property
    def classification_str(self) -> str:
        """Statement classification rendered as `<type>: <label>,<label>,...`."""
        type_ = self.classification.get("type")
        labels = ",".join(self.classification.get("labels", []))
        return f"{type_}: {labels}"

    def to_log(self, format: str):  # noqa: A002
        """
        Render the record as a log line.

        :param format: Name of the output format. When it contains `pretty`,
            the SQL statement is reindented with upper-cased keywords, and
            starts on a line of its own.
        :return: The formatted log line, without trailing newline.
        """
        sql = self.stmt
        if "pretty" in format:
            sql = "\n" + sqlparse.format(sql, reindent=True, keyword_case="upper")
        item = {
            "timestamp": self.started_iso,
            "duration": self.duration_iso,
            "label": self.label,
            "sql": sql,
            "message": self.error or "Success",
        }
        return self.template.format(**item)


@attr.define
class TableTailer:
    """
    Tail a table, optionally following its tail for new records.

    Records are written to stdout, using one of the output formats `log`
    (any name starting with `log`, only for `sys.jobs_log`), `json`, or `yaml`.
    When no format is given, `log` is selected for `sys.jobs_log`, and `json`
    for any other table.
    """

    db: DatabaseAdapter
    resource: TableAddress
    # Pause between two polls when following; handed to `time.sleep`, i.e. seconds.
    interval: float = 0.1
    format: t.Optional[str] = None

    def __attrs_post_init__(self):
        # Flag the adapter as "internal". For `sys.jobs_log`, `start` filters
        # on `db.internal_tag` to keep this program's own statements out.
        self.db.internal = True
        if not self.format:
            self.format = "log" if self.resource.fullname == "sys.jobs_log" else "json"

    def start(self, lines: int = 10, follow: bool = False):
        """
        Print the last `lines` records of the table, optionally polling for more.

        :param lines: Number of trailing records to print initially.
        :param follow: When true, query the table again every `interval`
            seconds and print records beyond those already seen. In this
            mode, the method does not return.
        :return: The records of the query, when not following.
        :raises NotImplementedError: For an unknown output format, or when
            log output is requested for a table other than `sys.jobs_log`.
        """
        name = self.resource.fullname
        constraint = "1 = 1"
        if name == "sys.jobs_log":
            constraint = f"stmt NOT LIKE '%{self.db.internal_tag}'"
        total = self.db.count_records(name, where=constraint)
        offset = max(total - lines, 0)
        while True:
            # NOTE(review): There is no ORDER BY clause, so "tail" relies on the
            # database returning rows in a stable order -- confirm.
            sql = f"SELECT * FROM {name} WHERE {constraint} OFFSET {offset}"  # noqa: S608
            result = self.db.run_sql(sql, records=True)
            for item in result:
                sys.stdout.write(self._render(item))
            # Flush after each batch, otherwise output piped into another
            # program is held back in the buffer while following.
            sys.stdout.flush()
            if not follow:
                return result
            offset += len(result)
            time.sleep(self.interval)

    def _render(self, item: t.Dict[str, t.Any]) -> str:
        """Serialize a single record to text in the selected format, including the trailing newline."""
        if self.format and self.format.startswith("log"):
            if self.resource.fullname != "sys.jobs_log":
                raise NotImplementedError(
                    "Log output only implemented for `sys.jobs_log`, use `--format={json,yaml}`"
                )
            return SysJobsLog(**item).to_log(format=self.format) + "\n"
        if self.format == "json":
            return orjson.dumps(item).decode("utf-8") + "\n"
        if self.format == "yaml":
            return "---\n" + yaml.dump(item) + "\n"
        raise NotImplementedError(f"Output format not implemented: {self.format}")
13 changes: 10 additions & 3 deletions cratedb_toolkit/util/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@ class DatabaseAdapter:
Wrap SQLAlchemy connection to database.
"""

def __init__(self, dburi: str, echo: bool = False):
internal_tag = " -- ctk"

def __init__(self, dburi: str, echo: bool = False, internal: bool = False):
self.dburi = dburi
self.internal = internal
self.engine = sa.create_engine(self.dburi, echo=echo)
# TODO: Make that go away.
logger.debug(f"Connecting to CrateDB: {dburi}")
Expand Down Expand Up @@ -119,6 +122,8 @@ def run_sql_real(self, sql: str, parameters: t.Mapping[str, str] = None, records
results = []
with self.engine.connect() as connection:
for statement in sqlparse.split(sql):
if self.internal:
statement += self.internal_tag
result = connection.execute(sa.text(statement), parameters)
data: t.Any
if records:
Expand All @@ -134,11 +139,13 @@ def run_sql_real(self, sql: str, parameters: t.Mapping[str, str] = None, records
else:
return results

def count_records(self, name: str, errors: Literal["raise", "ignore"] = "raise"):
def count_records(self, name: str, errors: Literal["raise", "ignore"] = "raise", where: str = ""):
"""
Return number of records in table.
"""
sql = f"SELECT COUNT(*) AS count FROM {self.quote_relation_name(name)};" # noqa: S608
sql = f"SELECT COUNT(*) AS count FROM {self.quote_relation_name(name)}" # noqa: S608
if where:
sql += f" WHERE {where}"
try:
results = self.run_sql(sql=sql)
except ProgrammingError as ex:
Expand Down
46 changes: 46 additions & 0 deletions doc/cmd/index.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Utility Commands

(tail)=
## ctk tail

`ctk tail` displays the most recent records of a database table.
It also provides special decoding options for the `sys.jobs_log` table.

:::{rubric} Synopsis
:::
```shell
ctk tail -n 3 sys.summits
```

:::{rubric} Options
:::
You can combine `ctk tail`'s JSON and YAML output with programs like `jq` and `yq`.
```shell
ctk tail -n 3 sys.summits --format=json | jq
ctk tail -n 3 sys.summits --format=yaml | yq
```
Optionally poll the table for new records by using the `--follow` option.
```shell
ctk tail -n 3 doc.mytable --follow
```

:::{rubric} Decoder for `sys.jobs_log`
:::
`ctk tail` provides a special decoder when processing records of the `sys.jobs_log`
table. The default output format `--format=log` prints records in a concise
single-line formatting.
```shell
ctk tail -n 3 sys.jobs_log
```
The `--format=log-pretty` option reindents the SQL statements and upper-cases
their keywords, which makes them easy to copy and paste. Together with the
`--follow` option, this is convenient for ad hoc tracing of SQL statements processed by CrateDB.
```shell
ctk tail -n 3 sys.jobs_log --follow --format=log-pretty
```

:::{warning}
Because `ctk tail` works by submitting SQL commands to CrateDB, using its `--follow`
option will spam the `sys.jobs_log` with additional entries. The default interval
is 0.1 seconds, and can be changed using the `--interval` option.
:::
1 change: 1 addition & 0 deletions doc/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
install
datasets
cmd/index
adapter/index
io/index
query/index
Expand Down
12 changes: 12 additions & 0 deletions doc/wtf/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ Define CrateDB database cluster address.
export CRATEDB_SQLALCHEMY_URL=crate://localhost/
```


### One shot commands
Display system and database cluster information.
```shell
cratedb-wtf info
Expand All @@ -32,6 +34,16 @@ Display database cluster log messages.
cratedb-wtf logs
```

Display the most recent entries of the `sys.jobs_log` table,
optionally polling it for updates by adding `--follow`.
For more information, see [](#tail).
```shell
ctk tail -n 3 sys.jobs_log
```


### Data collectors

Collect and display job statistics.
```shell
cratedb-wtf job-statistics collect
Expand Down
Empty file added tests/cmd/__init__.py
Empty file.
35 changes: 35 additions & 0 deletions tests/cmd/test_tail.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import pytest

from cratedb_toolkit.cmd.tail.main import TableTailer
from cratedb_toolkit.model import TableAddress


def test_tail_sys_summits_default(cratedb):
    """Tailing `sys.summits` without an explicit format returns the requested number of records."""
    address = TableAddress.from_string("sys.summits")
    tailer = TableTailer(db=cratedb.database, resource=address)
    records = tailer.start(lines=42)
    assert len(records) == 42


def test_tail_sys_summits_format_log(cratedb):
    """Requesting `log` output for `sys.summits` is rejected with `NotImplementedError`."""
    address = TableAddress.from_string("sys.summits")
    tailer = TableTailer(db=cratedb.database, resource=address, format="log")
    with pytest.raises(NotImplementedError) as excinfo:
        tailer.start(lines=2)
    assert excinfo.match("Log output only implemented for `sys.jobs_log`.*")


def test_tail_sys_summits_format_yaml(cratedb):
    """Tailing `sys.summits` with `yaml` output returns the requested number of records."""
    address = TableAddress.from_string("sys.summits")
    tailer = TableTailer(db=cratedb.database, resource=address, format="yaml")
    records = tailer.start(lines=2)
    assert len(records) == 2


def test_tail_sys_jobs_log_default(cratedb):
    """Tailing `sys.jobs_log` without an explicit format returns the requested number of records."""
    address = TableAddress.from_string("sys.jobs_log")
    tailer = TableTailer(db=cratedb.database, resource=address)
    records = tailer.start(lines=2)
    assert len(records) == 2


def test_tail_sys_jobs_log_format_json(cratedb):
    """Tailing `sys.jobs_log` with `json` output returns the requested number of records."""
    address = TableAddress.from_string("sys.jobs_log")
    tailer = TableTailer(db=cratedb.database, resource=address, format="json")
    records = tailer.start(lines=2)
    assert len(records) == 2

0 comments on commit 428e75b

Please sign in to comment.