diff --git a/poetry.lock b/poetry.lock
index ad80bbf..a118812 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -1380,6 +1380,20 @@ files = [
     {file = "progress-1.6.tar.gz", hash = "sha256:c9c86e98b5c03fa1fe11e3b67c1feda4788b8d0fe7336c2ff7d5644ccfba34cd"},
 ]
 
+[[package]]
+name = "prometheus-client"
+version = "0.19.0"
+description = "Python client for the Prometheus monitoring system."
+optional = false
+python-versions = ">=3.8"
+files = [
+    {file = "prometheus_client-0.19.0-py3-none-any.whl", hash = "sha256:c88b1e6ecf6b41cd8fb5731c7ae919bf66df6ec6fafa555cd6c0e16ca169ae92"},
+    {file = "prometheus_client-0.19.0.tar.gz", hash = "sha256:4585b0d1223148c27a225b10dbec5ae9bc4c81a99a3fa80774fa6209935324e1"},
+]
+
+[package.extras]
+twisted = ["twisted"]
+
 [[package]]
 name = "proto-plus"
 version = "1.22.3"
@@ -1419,6 +1433,34 @@ files = [
     {file = "protobuf-4.24.3.tar.gz", hash = "sha256:12e9ad2ec079b833176d2921be2cb24281fa591f0b119b208b788adc48c2561d"},
 ]
 
+[[package]]
+name = "psutil"
+version = "5.9.6"
+description = "Cross-platform lib for process and system monitoring in Python."
+optional = false
+python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, !=3.5.*"
+files = [
+    {file = "psutil-5.9.6-cp27-cp27m-macosx_10_9_x86_64.whl", hash = "sha256:fb8a697f11b0f5994550555fcfe3e69799e5b060c8ecf9e2f75c69302cc35c0d"},
+    {file = "psutil-5.9.6-cp27-cp27m-manylinux2010_i686.whl", hash = "sha256:91ecd2d9c00db9817a4b4192107cf6954addb5d9d67a969a4f436dbc9200f88c"},
+    {file = "psutil-5.9.6-cp27-cp27m-manylinux2010_x86_64.whl", hash = "sha256:10e8c17b4f898d64b121149afb136c53ea8b68c7531155147867b7b1ac9e7e28"},
+    {file = "psutil-5.9.6-cp27-cp27mu-manylinux2010_i686.whl", hash = "sha256:18cd22c5db486f33998f37e2bb054cc62fd06646995285e02a51b1e08da97017"},
+    {file = "psutil-5.9.6-cp27-cp27mu-manylinux2010_x86_64.whl", hash = "sha256:ca2780f5e038379e520281e4c032dddd086906ddff9ef0d1b9dcf00710e5071c"},
+    {file = "psutil-5.9.6-cp27-none-win32.whl", hash = "sha256:70cb3beb98bc3fd5ac9ac617a327af7e7f826373ee64c80efd4eb2856e5051e9"},
+    {file = "psutil-5.9.6-cp27-none-win_amd64.whl", hash = "sha256:51dc3d54607c73148f63732c727856f5febec1c7c336f8f41fcbd6315cce76ac"},
+    {file = "psutil-5.9.6-cp36-abi3-macosx_10_9_x86_64.whl", hash = "sha256:c69596f9fc2f8acd574a12d5f8b7b1ba3765a641ea5d60fb4736bf3c08a8214a"},
+    {file = "psutil-5.9.6-cp36-abi3-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:92e0cc43c524834af53e9d3369245e6cc3b130e78e26100d1f63cdb0abeb3d3c"},
+    {file = "psutil-5.9.6-cp36-abi3-manylinux_2_12_x86_64.manylinux2010_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:748c9dd2583ed86347ed65d0035f45fa8c851e8d90354c122ab72319b5f366f4"},
+    {file = "psutil-5.9.6-cp36-cp36m-win32.whl", hash = "sha256:3ebf2158c16cc69db777e3c7decb3c0f43a7af94a60d72e87b2823aebac3d602"},
+    {file = "psutil-5.9.6-cp36-cp36m-win_amd64.whl", hash = "sha256:ff18b8d1a784b810df0b0fff3bcb50ab941c3b8e2c8de5726f9c71c601c611aa"},
+    {file = "psutil-5.9.6-cp37-abi3-win32.whl", hash = "sha256:a6f01f03bf1843280f4ad16f4bde26b817847b4c1a0db59bf6419807bc5ce05c"},
+    {file = "psutil-5.9.6-cp37-abi3-win_amd64.whl", hash = "sha256:6e5fb8dc711a514da83098bc5234264e551ad980cec5f85dabf4d38ed6f15e9a"},
+    {file = "psutil-5.9.6-cp38-abi3-macosx_11_0_arm64.whl", hash = "sha256:daecbcbd29b289aac14ece28eca6a3e60aa361754cf6da3dfb20d4d32b6c7f57"},
+    {file = "psutil-5.9.6.tar.gz", hash = "sha256:e4b92ddcd7dd4cdd3f900180ea1e104932c7bce234fb88976e2a3b296441225a"},
+]
+
+[package.extras]
+test = ["enum34", "ipaddress", "mock", "pywin32", "wmi"]
["enum34", "ipaddress", "mock", "pywin32", "wmi"] + [[package]] name = "py-gfm" version = "2.0.0" @@ -2183,4 +2225,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p [metadata] lock-version = "2.0" python-versions = "^3.10" -content-hash = "22e595f7b751cc2e473578678c2afc8313ee0d4bc70b81f51dc32ad7f7259b5d" +content-hash = "1d49e189dc48f93cbaa830732cce87cf07c37b854cd25ca010c5da45ea4b9cc8" diff --git a/pyproject.toml b/pyproject.toml index 162fa7f..1e47bc3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,6 +30,8 @@ py-gfm = "^2.0.0" backoff = "^2.2.1" progress = "^1.6" opentelemetry-exporter-gcp-trace = "^1.6.0" +prometheus-client = "^0.19.0" +psutil = "^5.9.6" [tool.poetry.group.dev.dependencies] diff --git a/src/pudl_output_differ/cli.py b/src/pudl_output_differ/cli.py index 71da899..6cd97d4 100755 --- a/src/pudl_output_differ/cli.py +++ b/src/pudl_output_differ/cli.py @@ -11,12 +11,14 @@ import argparse import atexit import logging +import os import shutil import sys import tempfile import fsspec import markdown +import psutil from mdx_gfm import GithubFlavoredMarkdownExtension from opentelemetry import trace from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter @@ -24,9 +26,10 @@ from opentelemetry.sdk.resources import SERVICE_NAME, Resource from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor +from prometheus_client import Gauge, start_http_server from pudl_output_differ.files import DirectoryAnalyzer, is_remote -from pudl_output_differ.task_queue import TaskQueue +from pudl_output_differ.task_queue import TaskQueue, TaskQueueSettings from pudl_output_differ.types import ObjectPath logger = logging.getLogger(__name__) @@ -96,19 +99,6 @@ def parse_command_line(argv) -> argparse.Namespace: default="", help="""If set, write html markdown report into this file.""", ) - - parser.add_argument( - "--github-repo", - type=str, - default="", - help="Name of the github repository where comments should be posted.", - ) - parser.add_argument( - "--github-pr", - type=int, - default=0, - help="If supplied, diff will be published as a comment to the github PR.", - ) parser.add_argument( "--gcp-cloud-trace", type=bool, @@ -128,6 +118,25 @@ def parse_command_line(argv) -> argparse.Namespace: # default="INFO", help="Controls the severity of logging.", ) + parser.add_argument( + "--prometheus-port", + type=int, + default=9101, + help="Port on which to start prometheus metrics server." 
+    # parser.add_argument(
+    #     "--github-repo",
+    #     type=str,
+    #     default="",
+    #     help="Name of the github repository where comments should be posted.",
+    # )
+    # parser.add_argument(
+    #     "--github-pr",
+    #     type=int,
+    #     default=0,
+    #     help="If supplied, diff will be published as a comment to the github PR.",
+    # )
+
     arguments = parser.parse_args(argv[1:])
     return arguments
@@ -143,14 +152,14 @@ def setup_tracing(args: argparse.Namespace) -> None:
     )
     if args.otel_trace_backend:
         logger.info(f"Publishing traces to OTEL backend {args.otel_trace_backend}")
-        processor = BatchSpanProcessor(OTLPSpanExporter(endpoint=args.trace_backend))
+        processor = BatchSpanProcessor(OTLPSpanExporter(endpoint=args.otel_trace_backend))
         provider.add_span_processor(processor)
     if args.gcp_cloud_trace:
         logger.info("Publishing traces to Google Cloud Trace service.")
         provider.add_span_processor(
             BatchSpanProcessor(CloudTraceSpanExporter())
-            )
+        )
 
     trace.set_tracer_provider(provider)
@@ -169,20 +178,44 @@ def main() -> int:
     lpath = args.left
     rpath = args.right
 
-    with tracer.start_as_current_span(name="main"):
-        task_queue = TaskQueue(max_workers=args.max_workers)
-
-        task_queue.put(
-            DirectoryAnalyzer(
-                object_path=ObjectPath(),
-                left_path=lpath,
-                right_path=rpath,
-                local_cache_root=args.cache_dir,
-                filename_filter=args.filename_filter,
-            )
-        )
-        task_queue.run()
-        task_queue.wait()
+    if args.prometheus_port:
+        start_http_server(args.prometheus_port)
+        Gauge("cpu_usage", "Usage of the CPU in percent.").set_function(
+            lambda: psutil.cpu_percent(interval=1)
+        )
+        Gauge("memory_usage", "Usage of the memory in percent.").set_function(
+            lambda: psutil.virtual_memory().percent
+        )
+        proc_self = psutil.Process(os.getpid())
+        Gauge("process_memory_rss", "RSS of the Python process").set_function(
+            lambda: proc_self.memory_info().rss
+        )
+        Gauge("process_memory_vms", "VMS of the Python process").set_function(
+            lambda: proc_self.memory_info().vms
+        )
+        # TODO(rousik): proc_self.cpu_times() can also be helpful to get total CPU burn.
+        # Note that the above approach may not be optimal, we might choose to use
+        # with proc_self.oneshot(): to avoid repeated calls, and perhaps run the
+        # updater in a background thread that can sleep a lot.
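The TODO above can be prototyped as a small daemon thread that refreshes plain gauges on a fixed cadence, instead of `set_function` callbacks that hit the system on every scrape (including the blocking `cpu_percent(interval=1)` lambda). A minimal sketch; `start_resource_updater` and the `differ_*` metric names are illustrative, not part of this change:

```python
import threading
import time

import psutil
from prometheus_client import Gauge


def start_resource_updater(interval_s: float = 15.0) -> threading.Thread:
    """Refreshes process gauges in the background instead of per scrape."""
    proc = psutil.Process()
    cpu = Gauge("differ_cpu_percent", "Process CPU usage in percent.")
    rss = Gauge("differ_memory_rss_bytes", "Resident set size of the process.")

    def loop() -> None:
        while True:
            with proc.oneshot():  # batches the underlying stat reads
                cpu.set(proc.cpu_percent(interval=None))  # non-blocking form
                rss.set(proc.memory_info().rss)
            time.sleep(interval_s)

    thread = threading.Thread(target=loop, daemon=True)
    thread.start()
    return thread
```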
+
+
+    task_queue = TaskQueue(
+        settings=TaskQueueSettings(
+            max_workers=args.max_workers,
+        )
+    )
+
+    task_queue.put(
+        DirectoryAnalyzer(
+            object_path=ObjectPath(),
+            left_path=lpath,
+            right_path=rpath,
+            local_cache_root=args.cache_dir,
+            filename_filter=args.filename_filter,
+        )
+    )
+    task_queue.run()
+    task_queue.wait()
 
     if args.html_report:
         md = task_queue.to_markdown()
diff --git a/src/pudl_output_differ/files.py b/src/pudl_output_differ/files.py
index 627a3b6..86d3e10 100644
--- a/src/pudl_output_differ/files.py
+++ b/src/pudl_output_differ/files.py
@@ -1,18 +1,16 @@
 """Generic utilities for diffing PUDL_OUTPUT directories."""
-from pathlib import Path
 import logging
 import re
+from pathlib import Path
 from typing import Counter, Iterator
 
-from opentelemetry import trace
 import fsspec
+from opentelemetry import trace
+
 from pudl_output_differ.parquet import ParquetAnalyzer, ParquetFile
 from pudl_output_differ.sqlite import Database, SQLiteAnalyzer
-
-from pudl_output_differ.types import (
-    Result, Analyzer, KeySetDiff, TaskQueueInterface
-)
+from pudl_output_differ.types import Analyzer, KeySetDiff, Result, TaskQueueInterface
 
 logger = logging.getLogger(__name__)
 tracer = trace.get_tracer(__name__)
@@ -38,7 +36,8 @@ class DirectoryAnalyzer(Analyzer):
     filename_filter: str = ""
 
     def get_title(self) -> str:
-        return "## Files"
+        return "Files"
+
     def get_files(self, root_path: str) -> dict[str, str]:
         """Returns list of files in the output directory.
diff --git a/src/pudl_output_differ/sqlite.py b/src/pudl_output_differ/sqlite.py
index ecf1d92..d54ebf4 100644
--- a/src/pudl_output_differ/sqlite.py
+++ b/src/pudl_output_differ/sqlite.py
@@ -7,6 +7,7 @@
 import backoff
 import pandas as pd
 from opentelemetry import trace
+from prometheus_client import Summary
 from pydantic_settings import BaseSettings, SettingsConfigDict
 from sqlalchemy import Connection, Engine, create_engine, inspect, text
 from sqlalchemy.exc import OperationalError
@@ -23,12 +24,13 @@
 logger = logging.getLogger(__name__)
 tracer = trace.get_tracer(__name__)
 
+SQLITE_CONNECT_SECONDS = Summary("sqlite_connect_seconds", "Time spent connecting sqlite databases.", ["db_name", "table_name"])
 
 REPORT_YEAR_PARTITION = "substr(report_date, 1, 4)"
 
-# TODO(rousik): for the sake of unit-testing, we should be passing 
-# these as `settings` to the individual analyzers. 
+# TODO(rousik): for the sake of unit-testing, we should be passing
+# these as `settings` to the individual analyzers.
 class SQLiteSettings(BaseSettings):
     """Default configuration for this module."""
@@ -46,6 +48,10 @@ class SQLiteSettings(BaseSettings):
     # rows for the matching PKs.
     single_pass_pk_comparison: bool = False
 
+    # If this is set to True, records and columns outside of PK columns will be
+    # compared. By default we only look for PK columns.
+    enable_full_row_comparison: bool = False
+
 
 class Database(TypeDef):
     """Represents a database."""
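`SQLITE_CONNECT_SECONDS` above is consumed in the `execute()` hunk further down via `.labels(...).time()`. A short sketch of those `Summary` semantics, with hypothetical `demo_*` names:

```python
from prometheus_client import Summary

CONNECT_SECONDS = Summary(
    "demo_connect_seconds", "Time spent connecting.", ["db_name", "table_name"]
)

def connect_with_timing(db_name: str, table_name: str) -> None:
    # .labels() returns a child for this label pair; .time() is a context
    # manager that observes elapsed wall-clock seconds on exit.
    with CONNECT_SECONDS.labels(db_name, table_name).time():
        pass  # open the connection here

# The exporter then publishes demo_connect_seconds_count and
# demo_connect_seconds_sum per (db_name, table_name) label pair.
```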
@@ -138,7 +144,7 @@ class TableAnalyzer(Analyzer):
 
     def get_title(self) -> str:
         """Returns the title of the analysis."""
-        title = f"## Table {self.db_name}/{self.table_name}"
+        title = f"Table {self.db_name}/{self.table_name}"
         if self.partition_key:
             title += f" (partition {self.get_partition_func()}=`{self.partition_key}`)"
         return title
@@ -247,7 +253,7 @@ def split_to_partitioned_tasks(
 
         if md.tell() > 0:
             yield Result(markdown=md.getvalue())
-    
+
     @backoff.on_exception(backoff.expo, OperationalError, max_tries=4)
     def retry_connect(self, engine: Engine) -> Connection:
         """Connects to the database, retrying on OperationalError."""
@@ -266,9 +272,6 @@ def execute(self, task_queue: TaskQueueInterface) -> Iterator[Result]:
         l_db_engine = create_engine(f"sqlite:///{self.left_db_path}")
         r_db_engine = create_engine(f"sqlite:///{self.right_db_path}")
 
-        lconn = self.retry_connect(l_db_engine)
-        rconn = self.retry_connect(r_db_engine)
-
         # TODO(rousik): test for schema discrepancies here.
         l_pk = self.get_pk_columns(l_db_engine)
         r_pk = self.get_pk_columns(r_db_engine)
@@ -278,11 +281,17 @@ def execute(self, task_queue: TaskQueueInterface) -> Iterator[Result]:
                 f"Primary key columns for {self.table_name} do not match."
             )
 
+        with SQLITE_CONNECT_SECONDS.labels(self.db_name, self.table_name).time():
+            lconn = self.retry_connect(l_db_engine)
+            rconn = self.retry_connect(r_db_engine)
+
         if not self.partition_key and self.is_partitioned_table():
             for res in self.split_to_partitioned_tasks(task_queue, lconn, rconn):
                 yield res
 
         if not l_pk:
+            if not self.settings.enable_full_row_comparison:
+                return
             for res in self.compare_raw_tables(lconn, rconn):
                 yield res
         else:
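The full-row comparison gated in the next hunk rests on `pandas.DataFrame.compare`, which keeps only the cells that differ, under a `(column, side)` MultiIndex; `result_names` requires pandas >= 1.5. A toy illustration with invented data:

```python
import pandas as pd

ldf = pd.DataFrame({"id": [1, 2], "mw": [10.0, 20.0]}).set_index("id")
rdf = pd.DataFrame({"id": [1, 2], "mw": [10.0, 25.0]}).set_index("id")

diff_rows = ldf.compare(rdf, result_names=("left", "right"))
# Only row id=2 survives, with ("mw", "left") and ("mw", "right") columns.
print(len(diff_rows))  # 1 -> feeds the "changed N rows" line in the report
```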
@@ -446,34 +455,35 @@ def compare_pk_tables(
             )
             yield Result(markdown=f" * removed {rows_removed} rows {pct_change}\n")
 
-        with tracer.start_as_current_span("compare_overlapping_rows") as sp:
-            sp.set_attribute("num_rows", len(overlap_index))
-            if self.settings.single_pass_pk_comparison:
-                ldf = ldf.loc[overlap_index]
-                rdf = rdf.loc[overlap_index]
-            else:
-                with tracer.start_as_current_span("load_overlapping_rows"):
-                    ldf = self.get_records(ldb, columns=cols_intact, index_columns=pk_cols)
-                    ldf = ldf.loc[overlap_index]
-
-                    rdf = self.get_records(rdb, columns=cols_intact, index_columns=pk_cols)
-                    rdf = rdf.loc[overlap_index]
-
-            diff_rows = ldf.compare(rdf, result_names=("left", "right"))
-            rows_changed = len(diff_rows)
-            if rows_changed:
-                pct_change = float(rows_changed) * 100 / orig_row_count
-                yield Result(markdown=f" * changed {rows_changed} rows ({pct_change:.2f}% change)\n")
-
-            # calculate number of rows that have changes in a particular column
-            changes_per_col = (~diff_rows.T.isna()).groupby(level=0).any().T.sum()
-            changes_per_col = changes_per_col.to_frame().reset_index()
-            changes_per_col.columns = ["column_name", "num_rows"]
-
-            # TODO(rousik): assign column names: column_name, rows_changed
-            # TODO(rousik): This could be severity DIAGNOSTIC.
-
-            cc = StringIO()
-            cc.write("\nNumber of changes found per column:\n\n")
-            cc.write(changes_per_col.to_markdown())
-            yield Result(severity=ReportSeverity.DIAGNOSTIC, markdown=cc.getvalue())
\ No newline at end of file
+        if self.settings.enable_full_row_comparison:
+            with tracer.start_as_current_span("compare_overlapping_rows") as sp:
+                sp.set_attribute("num_rows", len(overlap_index))
+                if self.settings.single_pass_pk_comparison:
+                    ldf = ldf.loc[overlap_index]
+                    rdf = rdf.loc[overlap_index]
+                else:
+                    with tracer.start_as_current_span("load_overlapping_rows"):
+                        ldf = self.get_records(ldb, columns=cols_intact, index_columns=pk_cols)
+                        ldf = ldf.loc[overlap_index]
+
+                        rdf = self.get_records(rdb, columns=cols_intact, index_columns=pk_cols)
+                        rdf = rdf.loc[overlap_index]
+
+                diff_rows = ldf.compare(rdf, result_names=("left", "right"))
+                rows_changed = len(diff_rows)
+                if rows_changed:
+                    pct_change = float(rows_changed) * 100 / orig_row_count
+                    yield Result(markdown=f" * changed {rows_changed} rows ({pct_change:.2f}% change)\n")
+
+                # calculate number of rows that have changes in a particular column
+                changes_per_col = (~diff_rows.T.isna()).groupby(level=0).any().T.sum()
+                changes_per_col = changes_per_col.to_frame().reset_index()
+                changes_per_col.columns = ["column_name", "num_rows"]
+
+                # TODO(rousik): assign column names: column_name, rows_changed
+                # TODO(rousik): This could be severity DIAGNOSTIC.
+
+                cc = StringIO()
+                cc.write("\nNumber of changes found per column:\n\n")
+                cc.write(changes_per_col.to_markdown())
+                yield Result(severity=ReportSeverity.DIAGNOSTIC, markdown=cc.getvalue())
\ No newline at end of file
diff --git a/src/pudl_output_differ/task_queue.py b/src/pudl_output_differ/task_queue.py
index 4b85145..25af110 100644
--- a/src/pudl_output_differ/task_queue.py
+++ b/src/pudl_output_differ/task_queue.py
@@ -8,14 +8,17 @@
 import threading
 import traceback
 from collections import Counter
+from datetime import datetime, timedelta
 from enum import IntEnum
 from io import StringIO
 from time import sleep
 from uuid import uuid1
 
-from opentelemetry import trace
+import prometheus_client as prom
+from opentelemetry import context, trace
 from progress.bar import Bar
 from pydantic import UUID1, BaseModel, Field
+from pydantic_settings import BaseSettings, SettingsConfigDict
 
 from pudl_output_differ.sqlite import Database
 from pudl_output_differ.types import (
@@ -29,14 +32,23 @@
 tracer = trace.get_tracer(__name__)
 
 
+class TaskQueueSettings(BaseSettings):
+    """Settings for the task queue."""
+    model_config = SettingsConfigDict(env_prefix="diff_")
+
+    max_workers: int = 1
+    max_db_concurrency: int = 4
+    task_duration_warning_threshold: timedelta = timedelta(minutes=5)
+
+
 class ExecutionState(IntEnum):
     """Encodes possible state of Analysis in the queue.
-    
+
     Each analysis will start as PENDING, once it is chosen to be
     executed, it moves to RUNNING and once it is completed, it can
     be either COMPLETED or FAILED.
     """
-    PENDING = 0 
+    PENDING = 0
     RUNNING = 1
     COMPLETED = 2
     FAILED = 3
@@ -49,6 +61,8 @@ class AnalysisMetadata(BaseModel):
     analyzer: Analyzer
     results: list[Result] = []
     state: ExecutionState = ExecutionState.PENDING
+    start_time: datetime | None = None
+    end_time: datetime | None = None
     # TODO(rousik): perhaps add field for the traceback of the exception
     # or the exception itself (?); perhaps embedding this in the report is
     # okay also.
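Because `TaskQueueSettings` sets `env_prefix="diff_"` (and pydantic-settings matches environment variables case-insensitively by default), every field can be overridden from the environment without touching the CLI. A sketch of that behavior; the overrides are hypothetical:

```python
import os

# Any TaskQueueSettings field works the same way.
os.environ["DIFF_MAX_WORKERS"] = "8"
os.environ["DIFF_TASK_DURATION_WARNING_THRESHOLD"] = "120"  # parsed as seconds

from pudl_output_differ.task_queue import TaskQueueSettings

settings = TaskQueueSettings()
print(settings.max_workers)                      # 8
print(settings.task_duration_warning_threshold)  # 0:02:00
```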
@@ -56,7 +70,7 @@ class TaskQueue:
     """Thread pool backed executor for diff evaluation."""
 
-    def __init__(self, max_workers: int = 1, no_threadpool: bool = False):
+    def __init__(self, settings: TaskQueueSettings = TaskQueueSettings()):
         # TODO(rousik): when dealing with sqlite tables, we could consider
         # estimating their payload size and assigning cost to each task
         # to ensure that we do not overload the worker with memory
@@ -66,7 +80,7 @@ def __init__(self, max_workers: int = 1, no_threadpool: bool = False):
         # We could also indicate which tables are possibly expensive
         # in the differ configuration, which will eliminate the need
         # for dynamically estimating the cost.
-        self.max_workers = max_workers
+        self.settings = settings
         self.executor = concurrent.futures.ThreadPoolExecutor()
         self._lock = threading.Lock()
         self._cond = threading.Condition(self._lock)
@@ -74,25 +88,35 @@
         self.inflight_tasks: dict[UUID1, AnalysisMetadata] = {}
         self.completed_tasks: dict[UUID1, AnalysisMetadata] = {}
         self.runners: list[concurrent.futures.Future] = []
-        self.max_db_concurrency = 2
         self.progress_bar = Bar(max=100)
-        
+        self.prom_tasks_pending = prom.Gauge("tasks_pending", "Number of tasks pending")
+        self.prom_tasks_inflight = prom.Gauge("tasks_inflight", "Number of tasks in-flight")
+        self.prom_tasks_done = prom.Gauge("tasks_done", "Number of tasks completed")
+        self.prom_tasks_processed = prom.Counter("tasks_processed", "Number of tasks processed by the differ", ["runner_id"])
+        self.prom_wait_for_work = prom.Summary("wait_for_work_seconds", "Time spent waiting for work", ["runner_id"])
+
     def has_unfinished_tasks(self):
         """Returns true if any tasks are pending or in-flight."""
         with self._lock:
             return len(self.pending_tasks) + len(self.inflight_tasks) > 0
-    
+
+    def update_task_stats(self):
+        """Updates prometheus metrics for task stats."""
+        self.prom_tasks_pending.set(len(self.pending_tasks))
+        self.prom_tasks_inflight.set(len(self.inflight_tasks))
+        self.prom_tasks_done.set(len(self.completed_tasks))
+
     def get_next_task(self) -> AnalysisMetadata | None:
         """Returns next pending analysis to run.
-        
+
         Returns None if there's no task to run. This could mean the queue
-        is empty, or maybe that all tasks have constraints that can't be 
+        is empty, or maybe that all tasks have constraints that can't be
         currently met.
         """
         with self._lock:
             if not self.pending_tasks:
                 return None
-            
+
             # TODO(rousik): pick next task such there's only single RUNNING
             # task that has specific Database() instance in its path.
@@ -105,20 +129,42 @@ def get_next_task(self) -> AnalysisMetadata | None:
 
             for am in self.pending_tasks.values():
                 db: Database | None = am.object_path.get_first(Database)  # type: ignore
-                if db is not None and db_open_count.get(db.name, 0) >= self.max_db_concurrency:
-                    # This task is currently not viable for scheduling.
-                    continue
-
+                if db is not None:
+                    if db_open_count.get(db.name, 0) >= self.settings.max_db_concurrency:
+                        # Task not schedulable due to db concurrency limits.
+                        continue
+                # This task can be scheduled; move it to in-flight, change status and return.
+                am.state = ExecutionState.RUNNING
+                am.start_time = datetime.now()
                 del self.pending_tasks[am.id]
                 self.inflight_tasks[am.id] = am
+                self.update_task_stats()
                 return am
-            
+
         return None
-    
+
+    def slow_task_tracker(self) -> None:
+        """Monitors in-flight tasks and emits a warning if any task takes too long."""
+        already_warned = set()
+        while self.has_unfinished_tasks():
+            now = datetime.now()
+            with self._lock:
+                for am in self.inflight_tasks.values():
+                    if am.start_time is None:
+                        continue
+                    runs_for = now - am.start_time
+                    if runs_for > self.settings.task_duration_warning_threshold:
+                        if am.id in already_warned:
+                            continue
+                        already_warned.add(am.id)
+                        logger.warning(
+                            f"{am.analyzer.get_title()}: slow! Running for {runs_for}."
+                        )
+            sleep(5)
+
     @tracer.start_as_current_span("TaskQueue.analysis_runner")
-    def analysis_runner(self, runner_id: str = "") -> None:
+    def analysis_runner(self, runner_id: str = "", context: None | context.Context = None) -> None:
         """Runs the analyses in the queue until there are no more pending tasks.
 
         This method is expected to be run multiple times in parallel and process
@@ -126,54 +172,55 @@
         possible runtime exceptions.
         """
         tasks_processed = 0
-        trace.get_current_span().set_attribute("runner_id", runner_id)
-        # TODO(rousik): wrap this in spans to make it work well.
-        while self.has_unfinished_tasks():
-            with tracer.start_as_current_span("select_next_task"):
-                cur_task = self.get_next_task()
-            if cur_task is None:
-                logger.debug("No tasks to run, waiting for more...")
-                # We should ideally wait for when any of the in-flight tasks
-                # complete and then recalculate. But waiting for 1s and eagerly
-                # trying again is a reasonable workaround.
-                sleep(1)
-                continue
+        with tracer.start_as_current_span("TaskQueue.analysis_runner", context=context) as sp:
+            sp.set_attribute("runner_id", runner_id)
+            # TODO(rousik): wrap this in spans to make it work well.
+            while self.has_unfinished_tasks():
+                with self.prom_wait_for_work.labels(runner_id).time():
+                    cur_task = self.get_next_task()
+                if cur_task is None:
+                    logger.debug("No tasks to run, waiting for more...")
+                    # We should ideally wait for when any of the in-flight tasks
+                    # complete and then recalculate. But waiting for 1s and eagerly
+                    # trying again is a reasonable workaround.
+                    sleep(1)
+                    continue
 
-            analyzer_name = cur_task.analyzer.__class__.__name__
-            try:
-                tasks_processed += 1
-                with tracer.start_as_current_span(f"{analyzer_name}.execute") as sp:
-                    sp.set_attribute("object_path", str(cur_task.object_path))
-                    cur_task.results = cur_task.analyzer.execute_sync(self)
-                    cur_task.state = ExecutionState.COMPLETED
-            except Exception:
-                cur_task.state = ExecutionState.FAILED
-                cur_task.results.append(
-                    Result(
-                        severity=ReportSeverity.EXCEPTION,
-                        markdown=f"\n```\n{traceback.format_exc()}\n```\n",
+                analyzer_name = cur_task.analyzer.__class__.__name__
+                try:
+                    tasks_processed += 1
+                    with tracer.start_as_current_span(f"{analyzer_name}.execute") as sp:
+                        sp.set_attribute("object_path", str(cur_task.object_path))
+                        cur_task.results = cur_task.analyzer.execute_sync(self)
+                        cur_task.state = ExecutionState.COMPLETED
+                        cur_task.end_time = datetime.now()
+                except Exception:
+                    cur_task.state = ExecutionState.FAILED
+                    cur_task.end_time = datetime.now()
+
+                    cur_task.results.append(
+                        Result(
+                            severity=ReportSeverity.EXCEPTION,
+                            markdown=f"\n```\n{traceback.format_exc()}\n```\n",
+                        )
                     )
-                )
-            # Move the analysis to completed state.
-            with self._lock:
-                self.progress_bar.max = (
-                    len(self.pending_tasks) +
-                    len(self.inflight_tasks) +
-                    len(self.completed_tasks)
-                )
-                self.progress_bar.next()
-                self.completed_tasks[cur_task.id] = cur_task
-                del self.inflight_tasks[cur_task.id]
-
-        # The following print-as-they-become available should be a debug feature.
-        # TODO(rousik): It should be possible to turn this functionality off.
-        # with self._lock:
-        #     if cur_task.results:
-        #         print(cur_task.analyzer.get_title())
-        #         for res in cur_task.results:
-        #             print(res.markdown)
-        logger.info(f"Runner {runner_id} terminating after {tasks_processed} tasks.")
+                # Move the analysis to completed state.
+                with self._lock:
+                    self.prom_tasks_processed.labels(runner_id).inc()
+                    self.progress_bar.max = (
+                        len(self.pending_tasks) +
+                        len(self.inflight_tasks) +
+                        len(self.completed_tasks)
+                    )
+                    self.progress_bar.next()
+                    self.completed_tasks[cur_task.id] = cur_task
+                    del self.inflight_tasks[cur_task.id]
+                    self.update_task_stats()
+
+        logger.info(f"Runner {runner_id} terminating after {tasks_processed} tasks.")
 
+    @tracer.start_as_current_span("TaskQueue.run")
     def run(self, wait: bool=True):
         """Kicks off analysis_runners in the thread pool.
@@ -181,8 +228,10 @@ def run(self, wait: bool=True):
 
         wait: if True, this method will block until all tasks are completed.
         """
-        for i in range(self.max_workers):
-            self.runners.append(self.executor.submit(self.analysis_runner, runner_id=str(i)))
+        ctx = context.get_current()
+        for i in range(self.settings.max_workers):
+            self.runners.append(self.executor.submit(self.analysis_runner, runner_id=str(i), context=ctx))
+        self.runners.append(self.executor.submit(self.slow_task_tracker))
 
         if wait:
             self.wait()
@@ -194,8 +243,9 @@ def put(self, analyzer: Analyzer):
             analyzer=analyzer,
         )
         self.pending_tasks[am.id] = am
+        self.update_task_stats()
         # Do we need to kick of analyzer threads/tasks?
-    
+
     def wait(self):
         """Awaits until all tasks are completed.
         """
@@ -225,15 +275,39 @@ def to_markdown(self) -> str:
         for sev, cnt in by_severity.most_common():
             md.write(f"| {sev} | {cnt} |\n")
 
-        # TODO(rousik): We might want to add a summary here, with the following
-        # information:
-        # 1. number of results by type
-        # 2. number of exceptions (if any)
-        # 3. generated TOC for quick navigation (we may use hash of object_path as href)
-        # 4. evaluation stats (e.g. number of workers, total time elapsed, ...)
+        # List slow tasks (if any)
+        slow_tasks = []
+        for t in sorted_tasks:
+            if t.start_time is None or t.end_time is None:
+                continue
+            if t.end_time - t.start_time > self.settings.task_duration_warning_threshold:
+                slow_tasks.append(t)
+
+        if slow_tasks:
+            slow_tasks = sorted(slow_tasks, key=lambda t: t.end_time - t.start_time, reverse=True)
+            md.write("\n")
+            md.write(f"{len(slow_tasks)} slow tasks:\n\n")
+            md.write("| Analysis | Duration |\n")
+            md.write("| -------- | -------- |\n")
+            for st in slow_tasks:
+                md.write(f"| {st.analyzer.get_title()} | {st.end_time - st.start_time} |\n")
+            md.write("\n")
+        # TODO(rousik): it might make sense to let analyzers construct their own fully qualified
+        # names with the relevant path components, e.g.
+        # TableAnalyzer(pudl.sqlite/ferc1_respondent_id) or something like that.
+
+        # TODO(rousik): Other things we may want to add to the summary:
+        # - left and right paths
+        # - total time for evaluation
+        # - runtime stats (e.g. number of workers, settings, ...)
+        # - slowest 5 eval tasks
+        # - generate TOC for quick navigation
+
+        # Exceptions may be moved to some other section rather than being
+        # embedded in the regular report as they are now.
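For the "generate TOC" TODO above: GitHub-flavored markdown derives anchors from headings, so a table of contents could be built from the same `get_title()` strings that the loop below renders as `## ...` headings. A rough sketch with approximated slug rules; `write_toc` is a hypothetical helper:

```python
import re

def anchor(title: str) -> str:
    # Approximates GitHub's heading-slug rules: lowercase, strip most
    # punctuation, and replace spaces with dashes.
    slug = re.sub(r"[^\w\- ]", "", title.lower()).strip().replace(" ", "-")
    return f"#{slug}"

def write_toc(titles: list[str]) -> str:
    return "\n".join(f"* [{t}]({anchor(t)})" for t in titles)

# write_toc(["Files", "Table pudl.sqlite/plants_ferc1"]) ->
# * [Files](#files)
# * [Table pudl.sqlite/plants_ferc1](#table-pudlsqliteplants_ferc1)
```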
         for task in sorted_tasks:
             if task.results:
-                md.write(f"\n{task.analyzer.get_title()}\n")
+                md.write(f"\n## {task.analyzer.get_title()}\n")
                 for res in task.results:
                     # TODO(rousik): We may add some indication of severity here.
                     md.write(res.markdown.rstrip() + "\n")
diff --git a/src/pudl_output_differ/types.py b/src/pudl_output_differ/types.py
index 3d0eebc..6af207b 100644
--- a/src/pudl_output_differ/types.py
+++ b/src/pudl_output_differ/types.py
@@ -1,7 +1,7 @@
 """Generic types used in output diffing."""
-from enum import IntEnum
 import logging
 from abc import ABC, abstractmethod
+from enum import IntEnum
 from functools import total_ordering
 from io import StringIO
 from typing import Iterator, Protocol
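Taken together, the new wiring in `cli.py` boils down to roughly the following; the paths are placeholders and the port is just the flag's default:

```python
from prometheus_client import start_http_server

from pudl_output_differ.files import DirectoryAnalyzer
from pudl_output_differ.task_queue import TaskQueue, TaskQueueSettings
from pudl_output_differ.types import ObjectPath

start_http_server(9101)  # /metrics now reports the gauges and counters above

queue = TaskQueue(settings=TaskQueueSettings(max_workers=4))
queue.put(
    DirectoryAnalyzer(
        object_path=ObjectPath(),
        left_path="/tmp/left",
        right_path="/tmp/right",
        local_cache_root="/tmp/cache",
        filename_filter="",
    )
)
queue.run()   # spawns the analysis runners plus the slow-task tracker
queue.wait()
print(queue.to_markdown())
```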