From f45d912017c070100978c45fe11471cae6050d31 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Mon, 16 Dec 2024 23:02:57 +0100 Subject: [PATCH 1/3] ctk tail: Add basic utility command for tailing a database table ... and optionally following the tail. --- CHANGES.md | 2 + cratedb_toolkit/cli.py | 2 + cratedb_toolkit/cmd/__init__.py | 0 cratedb_toolkit/cmd/tail/__init__.py | 0 cratedb_toolkit/cmd/tail/cli.py | 52 +++++++++++ cratedb_toolkit/cmd/tail/main.py | 134 +++++++++++++++++++++++++++ cratedb_toolkit/util/database.py | 13 ++- doc/cmd/index.md | 46 +++++++++ doc/index.md | 1 + doc/wtf/index.md | 12 +++ tests/cmd/__init__.py | 0 tests/cmd/test_tail.py | 35 +++++++ 12 files changed, 294 insertions(+), 3 deletions(-) create mode 100644 cratedb_toolkit/cmd/__init__.py create mode 100644 cratedb_toolkit/cmd/tail/__init__.py create mode 100644 cratedb_toolkit/cmd/tail/cli.py create mode 100644 cratedb_toolkit/cmd/tail/main.py create mode 100644 doc/cmd/index.md create mode 100644 tests/cmd/__init__.py create mode 100644 tests/cmd/test_tail.py diff --git a/CHANGES.md b/CHANGES.md index d8eeafd8..eadd0cfb 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -5,6 +5,8 @@ defer `polars` to `cratedb-toolkit[io]`. 
- Fixed `cratedb-wtf record` about too large values of `ulimit_hard` - Improved `ctk shell` to also talk to CrateDB standalone databases +- Added basic utility command `ctk tail`, for tailing a database + table, and optionally following the tail ## 2024/10/13 v0.0.29 - MongoDB: Added Zyp transformations to the CDC subsystem, diff --git a/cratedb_toolkit/cli.py b/cratedb_toolkit/cli.py index a45e7f15..d87a9bc5 100644 --- a/cratedb_toolkit/cli.py +++ b/cratedb_toolkit/cli.py @@ -6,6 +6,7 @@ from .adapter.rockset.cli import cli as rockset_cli from .cfr.cli import cli as cfr_cli from .cluster.cli import cli as cloud_cli +from .cmd.tail.cli import cli as tail_cli from .io.cli import cli as io_cli from .job.cli import cli_list_jobs from .query.cli import cli as query_cli @@ -28,5 +29,6 @@ def cli(ctx: click.Context, verbose: bool, debug: bool): cli.add_command(query_cli, name="query") cli.add_command(rockset_cli, name="rockset") cli.add_command(shell_cli, name="shell") +cli.add_command(tail_cli, name="tail") cli.add_command(wtf_cli, name="wtf") cli.add_command(cli_list_jobs) diff --git a/cratedb_toolkit/cmd/__init__.py b/cratedb_toolkit/cmd/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/cratedb_toolkit/cmd/tail/__init__.py b/cratedb_toolkit/cmd/tail/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/cratedb_toolkit/cmd/tail/cli.py b/cratedb_toolkit/cmd/tail/cli.py new file mode 100644 index 00000000..6b10e1d3 --- /dev/null +++ b/cratedb_toolkit/cmd/tail/cli.py @@ -0,0 +1,52 @@ +import logging +import sys + +import click + +from cratedb_toolkit.cmd.tail.main import TableTailer +from cratedb_toolkit.model import TableAddress +from cratedb_toolkit.util import DatabaseAdapter +from cratedb_toolkit.util.cli import boot_click + +logger = logging.getLogger(__name__) + +cratedb_sqlalchemy_option = click.option( + "--cratedb-sqlalchemy-url", envvar="CRATEDB_SQLALCHEMY_URL", type=str, required=False, help="CrateDB SQLAlchemy URL" +) + + 
+@click.command() +@cratedb_sqlalchemy_option +@click.option( + "--lines", "-n", type=int, required=False, default=10, help="Displays n last lines of the input. Default: 10" +) +@click.option("--format", "format_", type=str, required=False, help="Select output format. Default: log / jsonl") +@click.option("--follow", "-f", is_flag=True, required=False, help="Follow new records added, by polling the table") +@click.option("--verbose", is_flag=True, required=False, help="Turn on logging") +@click.option("--debug", is_flag=True, required=False, help="Turn on logging with debug level") +@click.argument("resource", nargs=-1, type=click.UNPROCESSED) +@click.version_option() +@click.pass_context +def cli( + ctx: click.Context, + cratedb_sqlalchemy_url: str, + resource: str, + lines: int, + format_: str, + follow: bool, + verbose: bool, + debug: bool, +): + """ + A polling tail implementation for database tables. + """ + if not cratedb_sqlalchemy_url: + logger.error("Unable to operate without database address") + sys.exit(1) + boot_click(ctx, verbose, debug) + adapter = DatabaseAdapter(dburi=cratedb_sqlalchemy_url) + # TODO: Tail multiple tables. 
+ if len(resource) > 1: + raise NotImplementedError("`ctk tail` currently implements tailing a single table only") + tt = TableTailer(db=adapter, resource=TableAddress.from_string(resource[0]), format=format_) + tt.start(lines=lines, follow=follow) diff --git a/cratedb_toolkit/cmd/tail/main.py b/cratedb_toolkit/cmd/tail/main.py new file mode 100644 index 00000000..081620c3 --- /dev/null +++ b/cratedb_toolkit/cmd/tail/main.py @@ -0,0 +1,134 @@ +import datetime as dt +import logging +import sys +import time +import typing as t + +import attr +import colorlog +import orjson +import sqlparse +import yaml + +from cratedb_toolkit.model import TableAddress +from cratedb_toolkit.util import DatabaseAdapter + +logger = logging.getLogger(__name__) + + +@attr.define +class SysJobsLog: + """ + Represent a single record in CrateDB's `sys.jobs_log` table. + """ + + id: str + started: int + ended: int + classification: t.Dict[str, t.Any] + stmt: str + error: str + node: t.Dict[str, t.Any] + username: str + + @property + def template(self) -> str: + return "{timestamp} [{duration}] {label:17s}: {message} SQL: {sql:50s}" + + @property + def label(self): + red = colorlog.escape_codes.escape_codes["red"] + green = colorlog.escape_codes.escape_codes["green"] + reset = colorlog.escape_codes.escape_codes["reset"] + if self.error: + return f"{red}ERROR{reset}" + else: + return f"{green}INFO{reset}" + + @property + def duration(self) -> int: + return self.ended - self.started + + @property + def started_iso(self) -> str: + return str(dt.datetime.fromtimestamp(self.started / 1000))[:-3] + + @property + def duration_iso(self) -> str: + d = dt.timedelta(seconds=self.duration) + return str(d) + + @property + def classification_str(self) -> str: + type_ = self.classification.get("type") + labels = ",".join(self.classification.get("labels", [])) + return f"{type_}: {labels}" + + def to_log(self, format: str): # noqa: A002 + sql = self.stmt + if "pretty" in format: + sql = "\n" + 
sqlparse.format(sql, reindent=True, keyword_case="upper") + item = { + "timestamp": self.started_iso, + "duration": self.duration_iso, + "label": self.label, + "sql": sql, + "message": self.error or "Success", + } + return self.template.format(**item) + + +@attr.define +class TableTailer: + """ + Tail a table, optionally following its tail for new records. + """ + + db: DatabaseAdapter + resource: TableAddress + interval: float = 0.1 + format: t.Optional[str] = None + + def __attrs_post_init__(self): + self.db.internal = True + if not self.format: + if self.resource.fullname == "sys.jobs_log": + self.format = "log" + else: + self.format = "json" + + def start(self, lines: int = 10, follow: bool = False): + name = self.resource.fullname + constraint = "1 = 1" + if self.resource.fullname == "sys.jobs_log": + constraint = f"stmt NOT LIKE '%{self.db.internal_tag}'" + total = self.db.count_records(name, where=constraint) + offset = total - lines + if offset < 0: + offset = 0 + while True: + sql = f"SELECT * FROM {name} WHERE {constraint} OFFSET {offset}" # noqa: S608 + result = self.db.run_sql(sql, records=True) + for item in result: + if self.format and self.format.startswith("log"): + if self.resource.fullname == "sys.jobs_log": + record = SysJobsLog(**item) + sys.stdout.write(record.to_log(format=self.format)) + sys.stdout.write("\n") + else: + raise NotImplementedError( + "Log output only implemented for `sys.jobs_log`, use `--format={json,yaml}" + ) + elif self.format == "json": + sys.stdout.write(orjson.dumps(item).decode("utf-8")) + sys.stdout.write("\n") + elif self.format == "yaml": + sys.stdout.write("---\n") + sys.stdout.write(yaml.dump(item)) + sys.stdout.write("\n") + else: + raise NotImplementedError(f"Output format not implemented: {self.format}") + if not follow: + return result + offset += len(result) + time.sleep(self.interval) diff --git a/cratedb_toolkit/util/database.py b/cratedb_toolkit/util/database.py index ae410daa..dc495c05 100644 --- 
a/cratedb_toolkit/util/database.py +++ b/cratedb_toolkit/util/database.py @@ -38,8 +38,11 @@ class DatabaseAdapter: Wrap SQLAlchemy connection to database. """ - def __init__(self, dburi: str, echo: bool = False): + internal_tag = " -- ctk" + + def __init__(self, dburi: str, echo: bool = False, internal: bool = False): self.dburi = dburi + self.internal = internal self.engine = sa.create_engine(self.dburi, echo=echo) # TODO: Make that go away. logger.debug(f"Connecting to CrateDB: {dburi}") @@ -119,6 +122,8 @@ def run_sql_real(self, sql: str, parameters: t.Mapping[str, str] = None, records results = [] with self.engine.connect() as connection: for statement in sqlparse.split(sql): + if self.internal: + statement += self.internal_tag result = connection.execute(sa.text(statement), parameters) data: t.Any if records: @@ -134,11 +139,13 @@ def run_sql_real(self, sql: str, parameters: t.Mapping[str, str] = None, records else: return results - def count_records(self, name: str, errors: Literal["raise", "ignore"] = "raise"): + def count_records(self, name: str, errors: Literal["raise", "ignore"] = "raise", where: str = ""): """ Return number of records in table. """ - sql = f"SELECT COUNT(*) AS count FROM {self.quote_relation_name(name)};" # noqa: S608 + sql = f"SELECT COUNT(*) AS count FROM {self.quote_relation_name(name)}" # noqa: S608 + if where: + sql += f" WHERE {where}" try: results = self.run_sql(sql=sql) except ProgrammingError as ex: diff --git a/doc/cmd/index.md b/doc/cmd/index.md new file mode 100644 index 00000000..a51dec84 --- /dev/null +++ b/doc/cmd/index.md @@ -0,0 +1,46 @@ +# Utility Commands + +(tail)= +## ctk tail + +`ctk tail` displays the most recent records of a database table. +It also provides special decoding options for the `sys.jobs_log` table. + +:::{rubric} Synopsis +::: +```shell +ctk tail -n 3 sys.summits +``` + +:::{rubric} Options +::: +You can combine `ctk tail`'s JSON and YAML output with programs like `jq` and `yq`. 
+```shell +ctk tail -n 3 sys.summits --format=json | jq +ctk tail -n 3 sys.summits --format=yaml | yq +``` +Optionally poll the table for new records by using the `--follow` option. +```shell +ctk tail -n 3 doc.mytable --follow +``` + +:::{rubric} Decoder for `sys.jobs_log` +::: +`ctk tail` provides a special decoder when processing records of the `sys.jobs_log` +table. The default output format `--format=log` prints records in a concise +single-line format. +```shell +ctk tail -n 3 sys.jobs_log +``` +The `--format=log-pretty` option will format the SQL statements for optimal +copy/paste procedures. Together with the `--follow` option, this provides +optimal support for ad hoc tracing of SQL statements processed by CrateDB. +```shell +ctk tail -n 3 sys.jobs_log --follow --format=log-pretty +``` + +:::{warning} +Because `ctk tail` works by submitting SQL commands to CrateDB, using its `--follow` +option will spam the `sys.jobs_log` with additional entries. The default interval +is 0.5 seconds, and can be changed using the `--interval` option. +::: diff --git a/doc/index.md b/doc/index.md index eea44f6c..e6e2f699 100644 --- a/doc/index.md +++ b/doc/index.md @@ -23,6 +23,7 @@ install datasets +cmd/index adapter/index io/index query/index diff --git a/doc/wtf/index.md b/doc/wtf/index.md index 74a46afc..d94f93dc 100644 --- a/doc/wtf/index.md +++ b/doc/wtf/index.md @@ -17,6 +17,8 @@ Define CrateDB database cluster address. export CRATEDB_SQLALCHEMY_URL=crate://localhost/ ``` + +### One shot commands Display system and database cluster information. ```shell cratedb-wtf info @@ -32,6 +34,16 @@ Display database cluster log messages. cratedb-wtf logs ``` +Display the most recent entries of the `sys.jobs_log` table, +optionally polling it for updates by adding `--follow`. +For more information, see [](#tail). +```shell +ctk tail -n 3 sys.jobs_log +``` + + +### Data collectors + Collect and display job statistics. 
```shell cratedb-wtf job-statistics collect diff --git a/tests/cmd/__init__.py b/tests/cmd/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/cmd/test_tail.py b/tests/cmd/test_tail.py new file mode 100644 index 00000000..b53aed6c --- /dev/null +++ b/tests/cmd/test_tail.py @@ -0,0 +1,35 @@ +import pytest + +from cratedb_toolkit.cmd.tail.main import TableTailer +from cratedb_toolkit.model import TableAddress + + +def test_tail_sys_summits_default(cratedb): + tt = TableTailer(db=cratedb.database, resource=TableAddress.from_string("sys.summits")) + results = tt.start(lines=42) + assert len(results) == 42 + + +def test_tail_sys_summits_format_log(cratedb): + tt = TableTailer(db=cratedb.database, resource=TableAddress.from_string("sys.summits"), format="log") + with pytest.raises(NotImplementedError) as ex: + tt.start(lines=2) + assert ex.match("Log output only implemented for `sys.jobs_log`.*") + + +def test_tail_sys_summits_format_yaml(cratedb): + tt = TableTailer(db=cratedb.database, resource=TableAddress.from_string("sys.summits"), format="yaml") + results = tt.start(lines=2) + assert len(results) == 2 + + +def test_tail_sys_jobs_log_default(cratedb): + tt = TableTailer(db=cratedb.database, resource=TableAddress.from_string("sys.jobs_log")) + results = tt.start(lines=2) + assert len(results) == 2 + + +def test_tail_sys_jobs_log_format_json(cratedb): + tt = TableTailer(db=cratedb.database, resource=TableAddress.from_string("sys.jobs_log"), format="json") + results = tt.start(lines=2) + assert len(results) == 2 From 9a60d6248c64d40987e014d6a9cb905ea42abd18 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Tue, 17 Dec 2024 23:29:25 +0100 Subject: [PATCH 2/3] ctk tail: Add `--interval` option --- cratedb_toolkit/cmd/tail/cli.py | 4 +++- cratedb_toolkit/cmd/tail/main.py | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/cratedb_toolkit/cmd/tail/cli.py b/cratedb_toolkit/cmd/tail/cli.py index 6b10e1d3..16d9d1ad 100644 --- 
a/cratedb_toolkit/cmd/tail/cli.py +++ b/cratedb_toolkit/cmd/tail/cli.py @@ -22,6 +22,7 @@ ) @click.option("--format", "format_", type=str, required=False, help="Select output format. Default: log / jsonl") @click.option("--follow", "-f", is_flag=True, required=False, help="Follow new records added, by polling the table") +@click.option("--interval", "-i", type=float, required=False, help="When following the tail, poll each N seconds. Default: 0.1") @click.option("--verbose", is_flag=True, required=False, help="Turn on logging") @click.option("--debug", is_flag=True, required=False, help="Turn on logging with debug level") @click.argument("resource", nargs=-1, type=click.UNPROCESSED) @@ -34,6 +35,7 @@ def cli( lines: int, format_: str, follow: bool, + interval: float, verbose: bool, debug: bool, ): @@ -48,5 +50,5 @@ def cli( # TODO: Tail multiple tables. if len(resource) > 1: raise NotImplementedError("`ctk tail` currently implements tailing a single table only") - tt = TableTailer(db=adapter, resource=TableAddress.from_string(resource[0]), format=format_) + tt = TableTailer(db=adapter, resource=TableAddress.from_string(resource[0]), interval=interval, format=format_) tt.start(lines=lines, follow=follow) diff --git a/cratedb_toolkit/cmd/tail/main.py b/cratedb_toolkit/cmd/tail/main.py index 081620c3..f7fd9e79 100644 --- a/cratedb_toolkit/cmd/tail/main.py +++ b/cratedb_toolkit/cmd/tail/main.py @@ -91,6 +91,8 @@ class TableTailer: def __attrs_post_init__(self): self.db.internal = True + if self.interval is None: + self.interval = 0.1 if not self.format: if self.resource.fullname == "sys.jobs_log": self.format = "log" @@ -131,4 +133,4 @@ def start(self, lines: int = 10, follow: bool = False): if not follow: return result offset += len(result) - time.sleep(self.interval) + time.sleep(t.cast(float, self.interval)) From 476d4f20245ae449e538e8c1d3a8a59852a5fc52 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Tue, 17 Dec 2024 23:40:54 +0100 Subject: [PATCH 3/3] ctk tail: 
Use 0.5 seconds polling interval instead of 0.1 It is mostly sufficient and still provides enough interactivity. By reducing the frequency, it will incur less spam on sys.jobs_log itself. --- cratedb_toolkit/cmd/tail/cli.py | 4 +++- cratedb_toolkit/cmd/tail/main.py | 4 ++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/cratedb_toolkit/cmd/tail/cli.py b/cratedb_toolkit/cmd/tail/cli.py index 16d9d1ad..1a9bb8d8 100644 --- a/cratedb_toolkit/cmd/tail/cli.py +++ b/cratedb_toolkit/cmd/tail/cli.py @@ -22,7 +22,9 @@ ) @click.option("--format", "format_", type=str, required=False, help="Select output format. Default: log / jsonl") @click.option("--follow", "-f", is_flag=True, required=False, help="Follow new records added, by polling the table") -@click.option("--interval", "-i", type=float, required=False, help="When following the tail, poll each N seconds. Default: 0.1") +@click.option( + "--interval", "-i", type=float, required=False, help="When following the tail, poll each N seconds. Default: 0.5" +) @click.option("--verbose", is_flag=True, required=False, help="Turn on logging") @click.option("--debug", is_flag=True, required=False, help="Turn on logging with debug level") @click.argument("resource", nargs=-1, type=click.UNPROCESSED) diff --git a/cratedb_toolkit/cmd/tail/main.py b/cratedb_toolkit/cmd/tail/main.py index f7fd9e79..4f3024c3 100644 --- a/cratedb_toolkit/cmd/tail/main.py +++ b/cratedb_toolkit/cmd/tail/main.py @@ -86,13 +86,13 @@ class TableTailer: db: DatabaseAdapter resource: TableAddress - interval: float = 0.1 + interval: t.Optional[float] = None format: t.Optional[str] = None def __attrs_post_init__(self): self.db.internal = True if self.interval is None: - self.interval = 0.1 + self.interval = 0.5 if not self.format: if self.resource.fullname == "sys.jobs_log": self.format = "log"