-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add basic utility command ctk tail
, for tailing a database table and optionally following the tail
#330
Add basic utility command ctk tail
, for tailing a database table and optionally following the tail
#330
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
import logging | ||
import sys | ||
|
||
import click | ||
|
||
from cratedb_toolkit.cmd.tail.main import TableTailer | ||
from cratedb_toolkit.model import TableAddress | ||
from cratedb_toolkit.util import DatabaseAdapter | ||
from cratedb_toolkit.util.cli import boot_click | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
cratedb_sqlalchemy_option = click.option( | ||
"--cratedb-sqlalchemy-url", envvar="CRATEDB_SQLALCHEMY_URL", type=str, required=False, help="CrateDB SQLAlchemy URL" | ||
) | ||
|
||
|
||
@click.command() | ||
@cratedb_sqlalchemy_option | ||
@click.option( | ||
"--lines", "-n", type=int, required=False, default=10, help="Displays n last lines of the input. Default: 10" | ||
) | ||
@click.option("--format", "format_", type=str, required=False, help="Select output format. Default: log / jsonl") | ||
@click.option("--follow", "-f", is_flag=True, required=False, help="Follow new records added, by polling the table") | ||
@click.option( | ||
"--interval", "-i", type=float, required=False, help="When following the tail, poll each N seconds. Default: 0.5" | ||
) | ||
@click.option("--verbose", is_flag=True, required=False, help="Turn on logging") | ||
@click.option("--debug", is_flag=True, required=False, help="Turn on logging with debug level") | ||
Comment on lines
+20
to
+29
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Any reason not to use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the defaults would be applied to click, they wouldn't apply when using ctk's Python APIs in your own programs, that's why I decided to put them there, also to avoid redundancy declaring them on both sides. |
||
@click.argument("resource", nargs=-1, type=click.UNPROCESSED) | ||
@click.version_option() | ||
@click.pass_context | ||
def cli( | ||
ctx: click.Context, | ||
cratedb_sqlalchemy_url: str, | ||
resource: str, | ||
lines: int, | ||
format_: str, | ||
follow: bool, | ||
interval: float, | ||
verbose: bool, | ||
debug: bool, | ||
): | ||
""" | ||
A polling tail implementation for database tables. | ||
""" | ||
if not cratedb_sqlalchemy_url: | ||
logger.error("Unable to operate without database address") | ||
sys.exit(1) | ||
boot_click(ctx, verbose, debug) | ||
adapter = DatabaseAdapter(dburi=cratedb_sqlalchemy_url) | ||
# TODO: Tail multiple tables. | ||
if len(resource) > 1: | ||
raise NotImplementedError("`ctk tail` currently implements tailing a single table only") | ||
tt = TableTailer(db=adapter, resource=TableAddress.from_string(resource[0]), interval=interval, format=format_) | ||
tt.start(lines=lines, follow=follow) | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,136 @@ | ||
import datetime as dt | ||
import logging | ||
import sys | ||
import time | ||
import typing as t | ||
|
||
import attr | ||
import colorlog | ||
import orjson | ||
import sqlparse | ||
import yaml | ||
|
||
from cratedb_toolkit.model import TableAddress | ||
from cratedb_toolkit.util import DatabaseAdapter | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
@attr.define | ||
class SysJobsLog: | ||
""" | ||
Represent a single record in CrateDB's `sys.jobs_log` table. | ||
""" | ||
|
||
id: str | ||
started: int | ||
ended: int | ||
classification: t.Dict[str, t.Any] | ||
stmt: str | ||
error: str | ||
node: t.Dict[str, t.Any] | ||
username: str | ||
|
||
@property | ||
def template(self) -> str: | ||
return "{timestamp} [{duration}] {label:17s}: {message} SQL: {sql:50s}" | ||
|
||
@property | ||
def label(self): | ||
red = colorlog.escape_codes.escape_codes["red"] | ||
green = colorlog.escape_codes.escape_codes["green"] | ||
reset = colorlog.escape_codes.escape_codes["reset"] | ||
if self.error: | ||
return f"{red}ERROR{reset}" | ||
else: | ||
return f"{green}INFO{reset}" | ||
|
||
@property | ||
def duration(self) -> int: | ||
return self.ended - self.started | ||
|
||
@property | ||
def started_iso(self) -> str: | ||
return str(dt.datetime.fromtimestamp(self.started / 1000))[:-3] | ||
|
||
@property | ||
def duration_iso(self) -> str: | ||
d = dt.timedelta(seconds=self.duration) | ||
return str(d) | ||
|
||
@property | ||
def classification_str(self) -> str: | ||
type_ = self.classification.get("type") | ||
labels = ",".join(self.classification.get("labels", [])) | ||
return f"{type_}: {labels}" | ||
|
||
def to_log(self, format: str): # noqa: A002 | ||
sql = self.stmt | ||
if "pretty" in format: | ||
sql = "\n" + sqlparse.format(sql, reindent=True, keyword_case="upper") | ||
item = { | ||
"timestamp": self.started_iso, | ||
"duration": self.duration_iso, | ||
"label": self.label, | ||
"sql": sql, | ||
"message": self.error or "Success", | ||
} | ||
return self.template.format(**item) | ||
|
||
|
||
@attr.define | ||
class TableTailer: | ||
""" | ||
Tail a table, optionally following its tail for new records. | ||
""" | ||
|
||
db: DatabaseAdapter | ||
resource: TableAddress | ||
interval: t.Optional[float] = None | ||
format: t.Optional[str] = None | ||
|
||
def __attrs_post_init__(self): | ||
self.db.internal = True | ||
if self.interval is None: | ||
self.interval = 0.5 | ||
if not self.format: | ||
if self.resource.fullname == "sys.jobs_log": | ||
self.format = "log" | ||
else: | ||
self.format = "json" | ||
|
||
def start(self, lines: int = 10, follow: bool = False): | ||
name = self.resource.fullname | ||
constraint = "1 = 1" | ||
if self.resource.fullname == "sys.jobs_log": | ||
constraint = f"stmt NOT LIKE '%{self.db.internal_tag}'" | ||
total = self.db.count_records(name, where=constraint) | ||
offset = total - lines | ||
if offset < 0: | ||
offset = 0 | ||
while True: | ||
sql = f"SELECT * FROM {name} WHERE {constraint} OFFSET {offset}" # noqa: S608 | ||
result = self.db.run_sql(sql, records=True) | ||
for item in result: | ||
if self.format and self.format.startswith("log"): | ||
if self.resource.fullname == "sys.jobs_log": | ||
record = SysJobsLog(**item) | ||
sys.stdout.write(record.to_log(format=self.format)) | ||
sys.stdout.write("\n") | ||
else: | ||
raise NotImplementedError( | ||
"Log output only implemented for `sys.jobs_log`, use `--format={json,yaml}" | ||
) | ||
elif self.format == "json": | ||
sys.stdout.write(orjson.dumps(item).decode("utf-8")) | ||
sys.stdout.write("\n") | ||
elif self.format == "yaml": | ||
sys.stdout.write("---\n") | ||
sys.stdout.write(yaml.dump(item)) | ||
sys.stdout.write("\n") | ||
else: | ||
raise NotImplementedError(f"Output format not implemented: {self.format}") | ||
if not follow: | ||
return result | ||
offset += len(result) | ||
time.sleep(t.cast(float, self.interval)) | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
# Utility Commands | ||
|
||
(tail)= | ||
## ctk tail | ||
|
||
`ctk tail` displays the most recent records of a database table. | ||
It also provides special decoding options for the `sys.jobs_log` table. | ||
|
||
:::{rubric} Synopsis | ||
::: | ||
```shell | ||
ctk tail -n 3 sys.summits | ||
``` | ||
|
||
:::{rubric} Options | ||
::: | ||
You can combine `ctk tail`'s JSON and YAML output with programs like `jq` and `yq`. | ||
```shell | ||
ctk tail -n 3 sys.summits --format=json | jq | ||
ctk tail -n 3 sys.summits --format=yaml | yq | ||
``` | ||
Optionally poll the table for new records by using the `--follow` option. | ||
```shell | ||
ctk tail -n 3 doc.mytable --follow | ||
``` | ||
|
||
:::{rubric} Decoder for `sys.jobs_log` | ||
::: | ||
`ctk tail` provides a special decoder when processing records of the `sys.jobs_log` | ||
table. The default output format `--format=log` prints records in a concise | ||
single-line formatting. | ||
```shell | ||
ctk tail -n 3 sys.jobs_log | ||
``` | ||
The `--format=log-pretty` option will format the SQL statements for optimal | ||
copy/paste procedures. Together with the `--follow` option, this provides | ||
optimal support for ad hoc tracing of SQL statements processed by CrateDB. | ||
```shell | ||
ctk tail -n 3 sys.jobs_log --follow --format=log-pretty | ||
``` | ||
|
||
:::{warning} | ||
Because `ctk tail` works by submitting SQL commands to CrateDB, using its `--follow` | ||
option will spam the `sys.jobs_log` with additional entries. The default interval | ||
is 0.1 seconds, and can be changed using the `--interval` option. | ||
::: |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,7 @@ | |
|
||
install | ||
datasets | ||
cmd/index | ||
adapter/index | ||
io/index | ||
query/index | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
import pytest | ||
|
||
from cratedb_toolkit.cmd.tail.main import TableTailer | ||
from cratedb_toolkit.model import TableAddress | ||
|
||
|
||
def test_tail_sys_summits_default(cratedb): | ||
tt = TableTailer(db=cratedb.database, resource=TableAddress.from_string("sys.summits")) | ||
results = tt.start(lines=42) | ||
assert len(results) == 42 | ||
|
||
|
||
def test_tail_sys_summits_format_log(cratedb): | ||
tt = TableTailer(db=cratedb.database, resource=TableAddress.from_string("sys.summits"), format="log") | ||
with pytest.raises(NotImplementedError) as ex: | ||
tt.start(lines=2) | ||
assert ex.match("Log output only implemented for `sys.jobs_log`.*") | ||
|
||
|
||
def test_tail_sys_summits_format_yaml(cratedb): | ||
tt = TableTailer(db=cratedb.database, resource=TableAddress.from_string("sys.summits"), format="yaml") | ||
results = tt.start(lines=2) | ||
assert len(results) == 2 | ||
|
||
|
||
def test_tail_sys_jobs_log_default(cratedb): | ||
tt = TableTailer(db=cratedb.database, resource=TableAddress.from_string("sys.jobs_log")) | ||
results = tt.start(lines=2) | ||
assert len(results) == 2 | ||
|
||
|
||
def test_tail_sys_jobs_log_format_json(cratedb): | ||
tt = TableTailer(db=cratedb.database, resource=TableAddress.from_string("sys.jobs_log"), format="json") | ||
results = tt.start(lines=2) | ||
assert len(results) == 2 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like the
--interval
option is not defined and passed through?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice spot. Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed per 9a60d62. Also added 476d4f2.