Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add basic utility command ctk tail, for tailing a database table and optionally following the tail #330

Merged
merged 3 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
defer `polars` to `cratedb-toolkit[io]`.
- Fixed `cratedb-wtf record` about too large values of `ulimit_hard`
- Improved `ctk shell` to also talk to CrateDB standalone databases
- Added basic utility command `ctk tail`, for tailing a database
table, and optionally following the tail

## 2024/10/13 v0.0.29
- MongoDB: Added Zyp transformations to the CDC subsystem,
Expand Down
2 changes: 2 additions & 0 deletions cratedb_toolkit/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from .adapter.rockset.cli import cli as rockset_cli
from .cfr.cli import cli as cfr_cli
from .cluster.cli import cli as cloud_cli
from .cmd.tail.cli import cli as tail_cli
from .io.cli import cli as io_cli
from .job.cli import cli_list_jobs
from .query.cli import cli as query_cli
Expand All @@ -28,5 +29,6 @@ def cli(ctx: click.Context, verbose: bool, debug: bool):
cli.add_command(query_cli, name="query")
cli.add_command(rockset_cli, name="rockset")
cli.add_command(shell_cli, name="shell")
cli.add_command(tail_cli, name="tail")
cli.add_command(wtf_cli, name="wtf")
cli.add_command(cli_list_jobs)
Empty file added cratedb_toolkit/cmd/__init__.py
Empty file.
Empty file.
56 changes: 56 additions & 0 deletions cratedb_toolkit/cmd/tail/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import logging
import sys

import click

from cratedb_toolkit.cmd.tail.main import TableTailer
from cratedb_toolkit.model import TableAddress
from cratedb_toolkit.util import DatabaseAdapter
from cratedb_toolkit.util.cli import boot_click

logger = logging.getLogger(__name__)

cratedb_sqlalchemy_option = click.option(
"--cratedb-sqlalchemy-url", envvar="CRATEDB_SQLALCHEMY_URL", type=str, required=False, help="CrateDB SQLAlchemy URL"
)


@click.command()
@cratedb_sqlalchemy_option
@click.option(
"--lines", "-n", type=int, required=False, default=10, help="Displays n last lines of the input. Default: 10"
)
@click.option("--format", "format_", type=str, required=False, help="Select output format. Default: log / jsonl")
@click.option("--follow", "-f", is_flag=True, required=False, help="Follow new records added, by polling the table")
@click.option(
"--interval", "-i", type=float, required=False, help="When following the tail, poll each N seconds. Default: 0.5"
)
@click.option("--verbose", is_flag=True, required=False, help="Turn on logging")
@click.option("--debug", is_flag=True, required=False, help="Turn on logging with debug level")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like the --interval option is not defined and passed through?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice spot. Thanks!

Copy link
Member Author

@amotl amotl Dec 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed per 9a60d62. Also added 476d4f2.

Comment on lines +20 to +29
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason not to use click default option settings for all default values instead of later on inside the code?

Copy link
Member Author

@amotl amotl Dec 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the defaults would be applied to click, they wouldn't apply when using ctk's Python APIs in your own programs, that's why I decided to put them there, also to avoid redundancy declaring them on both sides.

@click.argument("resource", nargs=-1, type=click.UNPROCESSED)
@click.version_option()
@click.pass_context
def cli(
ctx: click.Context,
cratedb_sqlalchemy_url: str,
resource: str,
lines: int,
format_: str,
follow: bool,
interval: float,
verbose: bool,
debug: bool,
):
"""
A polling tail implementation for database tables.
"""
if not cratedb_sqlalchemy_url:
logger.error("Unable to operate without database address")
sys.exit(1)
boot_click(ctx, verbose, debug)
adapter = DatabaseAdapter(dburi=cratedb_sqlalchemy_url)

Check warning on line 51 in cratedb_toolkit/cmd/tail/cli.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cmd/tail/cli.py#L47-L51

Added lines #L47 - L51 were not covered by tests
# TODO: Tail multiple tables.
if len(resource) > 1:
raise NotImplementedError("`ctk tail` currently implements tailing a single table only")
tt = TableTailer(db=adapter, resource=TableAddress.from_string(resource[0]), interval=interval, format=format_)
tt.start(lines=lines, follow=follow)

Check warning on line 56 in cratedb_toolkit/cmd/tail/cli.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cmd/tail/cli.py#L53-L56

Added lines #L53 - L56 were not covered by tests
136 changes: 136 additions & 0 deletions cratedb_toolkit/cmd/tail/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import datetime as dt
import logging
import sys
import time
import typing as t

import attr
import colorlog
import orjson
import sqlparse
import yaml

from cratedb_toolkit.model import TableAddress
from cratedb_toolkit.util import DatabaseAdapter

logger = logging.getLogger(__name__)


@attr.define
class SysJobsLog:
"""
Represent a single record in CrateDB's `sys.jobs_log` table.
"""

id: str
started: int
ended: int
classification: t.Dict[str, t.Any]
stmt: str
error: str
node: t.Dict[str, t.Any]
username: str

@property
def template(self) -> str:
return "{timestamp} [{duration}] {label:17s}: {message} SQL: {sql:50s}"

@property
def label(self):
red = colorlog.escape_codes.escape_codes["red"]
green = colorlog.escape_codes.escape_codes["green"]
reset = colorlog.escape_codes.escape_codes["reset"]
if self.error:
return f"{red}ERROR{reset}"

Check warning on line 44 in cratedb_toolkit/cmd/tail/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cmd/tail/main.py#L44

Added line #L44 was not covered by tests
else:
return f"{green}INFO{reset}"

@property
def duration(self) -> int:
return self.ended - self.started

@property
def started_iso(self) -> str:
return str(dt.datetime.fromtimestamp(self.started / 1000))[:-3]

@property
def duration_iso(self) -> str:
d = dt.timedelta(seconds=self.duration)
return str(d)

@property
def classification_str(self) -> str:
type_ = self.classification.get("type")
labels = ",".join(self.classification.get("labels", []))
return f"{type_}: {labels}"

Check warning on line 65 in cratedb_toolkit/cmd/tail/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cmd/tail/main.py#L63-L65

Added lines #L63 - L65 were not covered by tests

def to_log(self, format: str): # noqa: A002
sql = self.stmt
if "pretty" in format:
sql = "\n" + sqlparse.format(sql, reindent=True, keyword_case="upper")

Check warning on line 70 in cratedb_toolkit/cmd/tail/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cmd/tail/main.py#L70

Added line #L70 was not covered by tests
item = {
"timestamp": self.started_iso,
"duration": self.duration_iso,
"label": self.label,
"sql": sql,
"message": self.error or "Success",
}
return self.template.format(**item)


@attr.define
class TableTailer:
"""
Tail a table, optionally following its tail for new records.
"""

db: DatabaseAdapter
resource: TableAddress
interval: t.Optional[float] = None
format: t.Optional[str] = None

def __attrs_post_init__(self):
self.db.internal = True
if self.interval is None:
self.interval = 0.5
if not self.format:
if self.resource.fullname == "sys.jobs_log":
self.format = "log"
else:
self.format = "json"

def start(self, lines: int = 10, follow: bool = False):
name = self.resource.fullname
constraint = "1 = 1"
if self.resource.fullname == "sys.jobs_log":
constraint = f"stmt NOT LIKE '%{self.db.internal_tag}'"
total = self.db.count_records(name, where=constraint)
offset = total - lines
if offset < 0:
offset = 0

Check warning on line 110 in cratedb_toolkit/cmd/tail/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cmd/tail/main.py#L110

Added line #L110 was not covered by tests
while True:
sql = f"SELECT * FROM {name} WHERE {constraint} OFFSET {offset}" # noqa: S608
result = self.db.run_sql(sql, records=True)
for item in result:
if self.format and self.format.startswith("log"):
if self.resource.fullname == "sys.jobs_log":
record = SysJobsLog(**item)
sys.stdout.write(record.to_log(format=self.format))
sys.stdout.write("\n")
else:
raise NotImplementedError(
"Log output only implemented for `sys.jobs_log`, use `--format={json,yaml}"
)
elif self.format == "json":
sys.stdout.write(orjson.dumps(item).decode("utf-8"))
sys.stdout.write("\n")
elif self.format == "yaml":
sys.stdout.write("---\n")
sys.stdout.write(yaml.dump(item))
sys.stdout.write("\n")
else:
raise NotImplementedError(f"Output format not implemented: {self.format}")

Check warning on line 132 in cratedb_toolkit/cmd/tail/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cmd/tail/main.py#L132

Added line #L132 was not covered by tests
if not follow:
return result
offset += len(result)
time.sleep(t.cast(float, self.interval))

Check warning on line 136 in cratedb_toolkit/cmd/tail/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/cmd/tail/main.py#L135-L136

Added lines #L135 - L136 were not covered by tests
13 changes: 10 additions & 3 deletions cratedb_toolkit/util/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@
Wrap SQLAlchemy connection to database.
"""

def __init__(self, dburi: str, echo: bool = False):
internal_tag = " -- ctk"

Check warning on line 41 in cratedb_toolkit/util/database.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/util/database.py#L41

Added line #L41 was not covered by tests

def __init__(self, dburi: str, echo: bool = False, internal: bool = False):

Check warning on line 43 in cratedb_toolkit/util/database.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/util/database.py#L43

Added line #L43 was not covered by tests
self.dburi = dburi
self.internal = internal
self.engine = sa.create_engine(self.dburi, echo=echo)
# TODO: Make that go away.
logger.debug(f"Connecting to CrateDB: {dburi}")
Expand Down Expand Up @@ -119,6 +122,8 @@
results = []
with self.engine.connect() as connection:
for statement in sqlparse.split(sql):
if self.internal:
statement += self.internal_tag
result = connection.execute(sa.text(statement), parameters)
data: t.Any
if records:
Expand All @@ -134,11 +139,13 @@
else:
return results

def count_records(self, name: str, errors: Literal["raise", "ignore"] = "raise"):
def count_records(self, name: str, errors: Literal["raise", "ignore"] = "raise", where: str = ""):

Check warning on line 142 in cratedb_toolkit/util/database.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/util/database.py#L142

Added line #L142 was not covered by tests
"""
Return number of records in table.
"""
sql = f"SELECT COUNT(*) AS count FROM {self.quote_relation_name(name)};" # noqa: S608
sql = f"SELECT COUNT(*) AS count FROM {self.quote_relation_name(name)}" # noqa: S608
if where:
sql += f" WHERE {where}"
try:
results = self.run_sql(sql=sql)
except ProgrammingError as ex:
Expand Down
46 changes: 46 additions & 0 deletions doc/cmd/index.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Utility Commands

(tail)=
## ctk tail

`ctk tail` displays the most recent records of a database table.
It also provides special decoding options for the `sys.jobs_log` table.

:::{rubric} Synopsis
:::
```shell
ctk tail -n 3 sys.summits
```

:::{rubric} Options
:::
You can combine `ctk tail`'s JSON and YAML output with programs like `jq` and `yq`.
```shell
ctk tail -n 3 sys.summits --format=json | jq
ctk tail -n 3 sys.summits --format=yaml | yq
```
Optionally poll the table for new records by using the `--follow` option.
```shell
ctk tail -n 3 doc.mytable --follow
```

:::{rubric} Decoder for `sys.jobs_log`
:::
`ctk tail` provides a special decoder when processing records of the `sys.jobs_log`
table. The default output format `--format=log` prints records in a concise
single-line formatting.
```shell
ctk tail -n 3 sys.jobs_log
```
The `--format=log-pretty` option will format the SQL statements for optimal
copy/paste procedures. Together with the `--follow` option, this provides
optimal support for ad hoc tracing of SQL statements processed by CrateDB.
```shell
ctk tail -n 3 sys.jobs_log --follow --format=log-pretty
```

:::{warning}
Because `ctk tail` works by submitting SQL commands to CrateDB, using its `--follow`
option will spam the `sys.jobs_log` with additional entries. The default interval
is 0.1 seconds, and can be changed using the `--interval` option.
:::
1 change: 1 addition & 0 deletions doc/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

install
datasets
cmd/index
adapter/index
io/index
query/index
Expand Down
12 changes: 12 additions & 0 deletions doc/wtf/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ Define CrateDB database cluster address.
export CRATEDB_SQLALCHEMY_URL=crate://localhost/
```


### One shot commands
Display system and database cluster information.
```shell
cratedb-wtf info
Expand All @@ -32,6 +34,16 @@ Display database cluster log messages.
cratedb-wtf logs
```

Display the most recent entries of the `sys.jobs_log` table,
optionally polling it for updates by adding `--follow`.
For more information, see [](#tail).
```shell
ctk tail -n 3 sys.jobs_log
```


### Data collectors

Collect and display job statistics.
```shell
cratedb-wtf job-statistics collect
Expand Down
Empty file added tests/cmd/__init__.py
Empty file.
35 changes: 35 additions & 0 deletions tests/cmd/test_tail.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import pytest

from cratedb_toolkit.cmd.tail.main import TableTailer
from cratedb_toolkit.model import TableAddress


def test_tail_sys_summits_default(cratedb):
tt = TableTailer(db=cratedb.database, resource=TableAddress.from_string("sys.summits"))
results = tt.start(lines=42)
assert len(results) == 42


def test_tail_sys_summits_format_log(cratedb):
tt = TableTailer(db=cratedb.database, resource=TableAddress.from_string("sys.summits"), format="log")
with pytest.raises(NotImplementedError) as ex:
tt.start(lines=2)
assert ex.match("Log output only implemented for `sys.jobs_log`.*")


def test_tail_sys_summits_format_yaml(cratedb):
tt = TableTailer(db=cratedb.database, resource=TableAddress.from_string("sys.summits"), format="yaml")
results = tt.start(lines=2)
assert len(results) == 2


def test_tail_sys_jobs_log_default(cratedb):
tt = TableTailer(db=cratedb.database, resource=TableAddress.from_string("sys.jobs_log"))
results = tt.start(lines=2)
assert len(results) == 2


def test_tail_sys_jobs_log_format_json(cratedb):
tt = TableTailer(db=cratedb.database, resource=TableAddress.from_string("sys.jobs_log"), format="json")
results = tt.start(lines=2)
assert len(results) == 2
Loading