diff --git a/temporalio/bridge/Cargo.lock b/temporalio/bridge/Cargo.lock index 25454cda..916f561d 100644 --- a/temporalio/bridge/Cargo.lock +++ b/temporalio/bridge/Cargo.lock @@ -375,7 +375,7 @@ dependencies = [ "autocfg", "cfg-if", "crossbeam-utils", - "memoffset 0.9.0", + "memoffset", "scopeguard", ] @@ -1155,15 +1155,6 @@ version = "2.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167" -[[package]] -name = "memoffset" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d61c719bcfbcf5d62b3a09efa6088de8c54bc0bfcd3ea7ae39fcc186108b8de1" -dependencies = [ - "autocfg", -] - [[package]] name = "memoffset" version = "0.9.0" @@ -1712,14 +1703,14 @@ checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" [[package]] name = "pyo3" -version = "0.18.3" +version = "0.19.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3b1ac5b3731ba34fdaa9785f8d74d17448cd18f30cf19e0c7e7b1fdb5272109" +checksum = "e681a6cfdc4adcc93b4d3cf993749a4552018ee0a9b65fc0ccfad74352c72a38" dependencies = [ "cfg-if", "indoc", "libc", - "memoffset 0.8.0", + "memoffset", "parking_lot", "pyo3-build-config", "pyo3-ffi", @@ -1729,9 +1720,9 @@ dependencies = [ [[package]] name = "pyo3-asyncio" -version = "0.18.0" +version = "0.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3564762e37035cfc486228e10b0528460fa026d681b5763873c693aa0d5c260" +checksum = "a2cc34c1f907ca090d7add03dc523acdd91f3a4dab12286604951e2f5152edad" dependencies = [ "futures", "once_cell", @@ -1742,9 +1733,9 @@ dependencies = [ [[package]] name = "pyo3-build-config" -version = "0.18.3" +version = "0.19.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cb946f5ac61bb61a5014924910d936ebd2b23b705f7a4a3c40b05c720b079a3" +checksum = "076c73d0bc438f7a4ef6fdd0c3bb4732149136abd952b110ac93e4edb13a6ba5" dependencies = [ "once_cell", "target-lexicon", @@ -1752,9 +1743,9 @@ dependencies = [ [[package]] name = "pyo3-ffi" -version = "0.18.3" +version = "0.19.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd4d7c5337821916ea2a1d21d1092e8443cf34879e53a0ac653fbb98f44ff65c" +checksum = "e53cee42e77ebe256066ba8aa77eff722b3bb91f3419177cf4cd0f304d3284d9" dependencies = [ "libc", "pyo3-build-config", @@ -1762,9 +1753,9 @@ dependencies = [ [[package]] name = "pyo3-macros" -version = "0.18.3" +version = "0.19.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9d39c55dab3fc5a4b25bbd1ac10a2da452c4aca13bb450f22818a002e29648d" +checksum = "dfeb4c99597e136528c6dd7d5e3de5434d1ceaf487436a3f03b2d56b6fc9efd1" dependencies = [ "proc-macro2", "pyo3-macros-backend", @@ -1774,15 +1765,25 @@ dependencies = [ [[package]] name = "pyo3-macros-backend" -version = "0.18.3" +version = "0.19.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97daff08a4c48320587b5224cc98d609e3c27b6d437315bd40b605c98eeb5918" +checksum = "947dc12175c254889edc0c02e399476c2f652b4b9ebd123aa655c224de259536" dependencies = [ "proc-macro2", "quote", "syn 1.0.109", ] +[[package]] +name = "pythonize" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e35b716d430ace57e2d1b4afb51c9e5b7c46d2bce72926e07f9be6a98ced03e" +dependencies = [ + "pyo3", + "serde", +] + [[package]] name = "quanta" version = "0.11.1" @@ -2395,6 +2396,7 @@ dependencies = [ name = "temporal-sdk-bridge" version = "0.1.0" dependencies = [ + "futures", "log", "once_cell", "parking_lot", @@ -2402,6 +2404,7 @@ dependencies = [ "prost-types", "pyo3", "pyo3-asyncio", + "pythonize", "temporal-client", "temporal-sdk-core", "temporal-sdk-core-api", diff --git a/temporalio/bridge/Cargo.toml b/temporalio/bridge/Cargo.toml index a2fa315b..fe0d5fcf 100644 --- a/temporalio/bridge/Cargo.toml +++ b/temporalio/bridge/Cargo.toml @@ -8,13 +8,15 @@ name = "temporal_sdk_bridge" crate-type = ["cdylib"] [dependencies] +futures = "0.3" log = "0.4" once_cell = "1.16.0" parking_lot = "0.12" prost = "0.11" prost-types = "0.11" -pyo3 = { version = "0.18", features = ["extension-module", "abi3-py37"] } -pyo3-asyncio = { version = "0.18", features = ["tokio-runtime"] } +pyo3 = { version = "0.19", features = ["extension-module", "abi3-py37"] } +pyo3-asyncio = { version = "0.19", features = ["tokio-runtime"] } +pythonize = "0.19" temporal-client = { version = "0.1.0", path = "./sdk-core/client" } temporal-sdk-core = { version = "0.1.0", path = "./sdk-core/core", features = ["ephemeral-server"] } temporal-sdk-core-api = { version = "0.1.0", path = "./sdk-core/core-api" } diff --git a/temporalio/bridge/runtime.py b/temporalio/bridge/runtime.py index f86c4a87..c61c76e0 100644 --- a/temporalio/bridge/runtime.py +++ b/temporalio/bridge/runtime.py @@ -6,7 +6,9 @@ from __future__ import annotations from dataclasses import dataclass -from typing import Any, Mapping, Optional, Sequence, Type +from typing import Any, Callable, Dict, Mapping, Optional, Sequence, Type + +from typing_extensions import Protocol import temporalio.bridge.temporal_sdk_bridge @@ -29,13 +31,21 @@ def retrieve_buffered_metrics(self) -> Sequence[Any]: """Get buffered metrics.""" return self._ref.retrieve_buffered_metrics() + def write_test_info_log(self, message: str, extra_data: str) -> None: + """Write a test core log at INFO level.""" + self._ref.write_test_info_log(message, extra_data) + + def write_test_debug_log(self, message: str, extra_data: str) -> None: + """Write a test core log at DEBUG level.""" + self._ref.write_test_debug_log(message, extra_data) + @dataclass(frozen=True) class LoggingConfig: """Python representation of the Rust struct for logging config.""" filter: str - forward: bool + forward_to: Optional[Callable[[Sequence[BufferedLogEntry]], None]] @dataclass(frozen=True) @@ -75,3 +85,42 @@ class TelemetryConfig: logging: Optional[LoggingConfig] metrics: Optional[MetricsConfig] + + +# WARNING: This must match Rust runtime::BufferedLogEntry +class BufferedLogEntry(Protocol): + """A buffered log entry.""" + + @property + def target(self) -> str: + """Target category for the log entry.""" + ... + + @property + def message(self) -> str: + """Log message.""" + ... + + @property + def time(self) -> float: + """Time as from ``time.time`` since Unix epoch.""" + ... + + @property + def level(self) -> int: + """Python log level, with trace as 9.""" + ... + + @property + def fields(self) -> Dict[str, Any]: + """Additional log entry fields. + Requesting this property performs a conversion from the internal + representation to the Python representation on every request. Therefore + callers should store the result instead of repeatedly calling. + + Raises: + Exception: If the internal representation cannot be converted. This + should not happen and if it does it is considered a bug in the + SDK and should be reported. + """ + ... diff --git a/temporalio/bridge/sdk-core b/temporalio/bridge/sdk-core index ee7f6caa..7b0b1708 160000 --- a/temporalio/bridge/sdk-core +++ b/temporalio/bridge/sdk-core @@ -1 +1 @@ -Subproject commit ee7f6caaa9ad00bca2e2db8093d444ce563d6016 +Subproject commit 7b0b170830a034cc47820d9b8d5bbcec459001c7 diff --git a/temporalio/bridge/src/lib.rs b/temporalio/bridge/src/lib.rs index 660ce94a..6d10dea4 100644 --- a/temporalio/bridge/src/lib.rs +++ b/temporalio/bridge/src/lib.rs @@ -26,6 +26,7 @@ fn temporal_sdk_bridge(py: Python, m: &PyModule) -> PyResult<()> { // Runtime stuff m.add_class::()?; + m.add_class::()?; m.add_function(wrap_pyfunction!(init_runtime, m)?)?; m.add_function(wrap_pyfunction!(raise_in_thread, m)?)?; diff --git a/temporalio/bridge/src/runtime.rs b/temporalio/bridge/src/runtime.rs index dff05921..65b218d0 100644 --- a/temporalio/bridge/src/runtime.rs +++ b/temporalio/bridge/src/runtime.rs @@ -1,22 +1,28 @@ +use futures::channel::mpsc::Receiver; use pyo3::exceptions::{PyRuntimeError, PyValueError}; use pyo3::prelude::*; use pyo3::AsPyPointer; +use pythonize::pythonize; use std::collections::HashMap; use std::future::Future; use std::net::SocketAddr; use std::pin::Pin; use std::str::FromStr; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, UNIX_EPOCH}; use temporal_sdk_core::telemetry::{ - build_otlp_metric_exporter, start_prometheus_metric_exporter, MetricsCallBuffer, + build_otlp_metric_exporter, start_prometheus_metric_exporter, CoreLogStreamConsumer, + MetricsCallBuffer, }; use temporal_sdk_core::CoreRuntime; use temporal_sdk_core_api::telemetry::metrics::{CoreMeter, MetricCallBufferer}; use temporal_sdk_core_api::telemetry::{ - Logger, MetricTemporality, OtelCollectorOptionsBuilder, PrometheusExporterOptionsBuilder, - TelemetryOptions, TelemetryOptionsBuilder, + CoreLog, Logger, MetricTemporality, OtelCollectorOptionsBuilder, + PrometheusExporterOptionsBuilder, TelemetryOptionsBuilder, }; +use tokio::task::JoinHandle; +use tokio_stream::StreamExt; +use tracing::Level; use url::Url; use crate::metric::{convert_metric_events, BufferedMetricRef, BufferedMetricUpdate}; @@ -30,6 +36,7 @@ pub struct RuntimeRef { pub(crate) struct Runtime { pub(crate) core: Arc, metrics_call_buffer: Option>>, + log_forwarder_handle: Option>>, } #[derive(FromPyObject)] @@ -41,7 +48,12 @@ pub struct TelemetryConfig { #[derive(FromPyObject)] pub struct LoggingConfig { filter: String, - forward: bool, + forward_to: Option, +} + +#[pyclass] +pub struct BufferedLogEntry { + core_log: CoreLog, } #[derive(FromPyObject)] @@ -71,18 +83,52 @@ pub struct PrometheusConfig { unit_suffix: bool, } +const FORWARD_LOG_BUFFER_SIZE: usize = 2048; +const FORWARD_LOG_MAX_FREQ_MS: u64 = 10; + pub fn init_runtime(telemetry_config: TelemetryConfig) -> PyResult { + // Have to build/start telemetry config pieces + let mut telemetry_build = TelemetryOptionsBuilder::default(); + + // Build logging config, capturing forwarding info to start later + let mut log_forwarding: Option<(Receiver, PyObject)> = None; + if let Some(logging_conf) = telemetry_config.logging { + telemetry_build.logging(if let Some(forward_to) = logging_conf.forward_to { + // Note, actual log forwarding is started later + let (consumer, stream) = CoreLogStreamConsumer::new(FORWARD_LOG_BUFFER_SIZE); + log_forwarding = Some((stream, forward_to)); + Logger::Push { + filter: logging_conf.filter.to_string(), + consumer: Arc::new(consumer), + } + } else { + Logger::Console { + filter: logging_conf.filter.to_string(), + } + }); + } + + // Build metric config, but actual metrics instance is late-bound after + // CoreRuntime is created since it needs Tokio runtime + if let Some(metrics_conf) = telemetry_config.metrics.as_ref() { + telemetry_build.attach_service_name(metrics_conf.attach_service_name); + if let Some(prefix) = &metrics_conf.metric_prefix { + telemetry_build.metric_prefix(prefix.to_string()); + } + } + + // Create core runtime which starts tokio multi-thread runtime let mut core = CoreRuntime::new( - // We don't move telemetry config here because we need it for - // late-binding metrics - (&telemetry_config).try_into()?, + telemetry_build + .build() + .map_err(|err| PyValueError::new_err(format!("Invalid telemetry config: {}", err)))?, tokio::runtime::Builder::new_multi_thread(), ) .map_err(|err| PyRuntimeError::new_err(format!("Failed initializing telemetry: {}", err)))?; // We late-bind the metrics after core runtime is created since it needs // the Tokio handle - let mut maybe_metrics_call_buffer: Option>> = None; + let mut metrics_call_buffer: Option>> = None; if let Some(metrics_conf) = telemetry_config.metrics { let _guard = core.tokio_handle().enter(); // If they want buffered, cannot have Prom/OTel and we make buffered @@ -92,21 +138,41 @@ pub fn init_runtime(telemetry_config: TelemetryConfig) -> PyResult { "Cannot have buffer size with OpenTelemetry or Prometheus metric config", )); } - let metrics_call_buffer = - Arc::new(MetricsCallBuffer::new(metrics_conf.buffered_with_size)); + let buffer = Arc::new(MetricsCallBuffer::new(metrics_conf.buffered_with_size)); core.telemetry_mut() - .attach_late_init_metrics(metrics_call_buffer.clone()); - maybe_metrics_call_buffer = Some(metrics_call_buffer); + .attach_late_init_metrics(buffer.clone()); + metrics_call_buffer = Some(buffer); } else { core.telemetry_mut() .attach_late_init_metrics(metrics_conf.try_into()?); } } + // Start log forwarding if needed + let log_forwarder_handle = log_forwarding.map(|(stream, callback)| { + Arc::new(core.tokio_handle().spawn(async move { + let mut stream = std::pin::pin!(stream.chunks_timeout( + FORWARD_LOG_BUFFER_SIZE, + Duration::from_millis(FORWARD_LOG_MAX_FREQ_MS) + )); + while let Some(core_logs) = stream.next().await { + // Create vec of buffered logs + let entries = core_logs + .into_iter() + .map(|core_log| BufferedLogEntry { core_log }) + .collect::>(); + // We silently swallow errors here because logging them could + // cause a bad loop and we don't want to assume console presence + let _ = Python::with_gil(|py| callback.call1(py, (entries,))); + } + })) + }); + Ok(RuntimeRef { runtime: Runtime { core: Arc::new(core), - metrics_call_buffer: maybe_metrics_call_buffer, + metrics_call_buffer, + log_forwarder_handle, }, }) } @@ -126,6 +192,15 @@ impl Runtime { } } +impl Drop for Runtime { + fn drop(&mut self) { + // Stop the log forwarder + if let Some(handle) = self.log_forwarder_handle.as_ref() { + handle.abort(); + } + } +} + #[pymethods] impl RuntimeRef { fn retrieve_buffered_metrics<'p>(&self, py: Python<'p>) -> Vec { @@ -138,34 +213,76 @@ impl RuntimeRef { .retrieve(), ) } + + fn write_test_info_log(&self, message: &str, extra_data: &str) { + let _g = tracing::subscriber::set_default( + self.runtime + .core + .telemetry() + .trace_subscriber() + .unwrap() + .clone(), + ); + tracing::info!(message, extra_data = extra_data); + } + + fn write_test_debug_log(&self, message: &str, extra_data: &str) { + let _g = tracing::subscriber::set_default( + self.runtime + .core + .telemetry() + .trace_subscriber() + .unwrap() + .clone(), + ); + tracing::debug!(message, extra_data = extra_data); + } } -impl TryFrom<&TelemetryConfig> for TelemetryOptions { - type Error = PyErr; +// WARNING: This must match temporalio.bridge.runtime.BufferedLogEntry protocol +#[pymethods] +impl BufferedLogEntry { + #[getter] + fn target(&self) -> &str { + &self.core_log.target + } - fn try_from(conf: &TelemetryConfig) -> PyResult { - let mut build = TelemetryOptionsBuilder::default(); - if let Some(logging_conf) = &conf.logging { - build.logging(if logging_conf.forward { - Logger::Forward { - filter: logging_conf.filter.to_string(), - } - } else { - Logger::Console { - filter: logging_conf.filter.to_string(), - } - }); - } - if let Some(metrics_conf) = &conf.metrics { - // Note, actual metrics instance is late-bound in init_runtime - build.attach_service_name(metrics_conf.attach_service_name); - if let Some(prefix) = &metrics_conf.metric_prefix { - build.metric_prefix(prefix.to_string()); - } + #[getter] + fn message(&self) -> &str { + &self.core_log.message + } + + #[getter] + fn time(&self) -> f64 { + self.core_log + .timestamp + .duration_since(UNIX_EPOCH) + .unwrap_or(Duration::ZERO) + .as_secs_f64() + } + + #[getter] + fn level(&self) -> u8 { + // Convert to Python log levels, with trace as 9 + match self.core_log.level { + Level::TRACE => 9, + Level::DEBUG => 10, + Level::INFO => 20, + Level::WARN => 30, + Level::ERROR => 40, } - build - .build() - .map_err(|err| PyValueError::new_err(format!("Invalid telemetry config: {}", err))) + } + + #[getter] + fn fields(&self, py: Python<'_>) -> PyResult> { + self.core_log + .fields + .iter() + .map(|(key, value)| match pythonize(py, value) { + Ok(value) => Ok((key.as_str(), value)), + Err(err) => Err(err.into()), + }) + .collect() } } diff --git a/temporalio/runtime.py b/temporalio/runtime.py index b2650e9d..494d9e09 100644 --- a/temporalio/runtime.py +++ b/temporalio/runtime.py @@ -5,12 +5,14 @@ from __future__ import annotations +import logging +import time from dataclasses import dataclass, field from datetime import timedelta from enum import Enum from typing import ClassVar, Mapping, NewType, Optional, Sequence, Union -from typing_extensions import Literal, Protocol +from typing_extensions import Protocol import temporalio.bridge.metric import temporalio.bridge.runtime @@ -114,6 +116,10 @@ class LoggingConfig: filter: Union[TelemetryFilter, str] """Filter for logging. Can use :py:class:`TelemetryFilter` or raw string.""" + forwarding: Optional[LogForwardingConfig] = None + """If present, Core logger messages will be forwarded to a Python logger. + See the :py:class:`LogForwardingConfig` docs for more info.""" + default: ClassVar[LoggingConfig] """Default logging configuration of Core WARN level and other ERROR level. @@ -124,8 +130,7 @@ def _to_bridge_config(self) -> temporalio.bridge.runtime.LoggingConfig: filter=self.filter if isinstance(self.filter, str) else self.filter.formatted(), - # Log forwarding not currently supported in Python - forward=False, + forward_to=None if not self.forwarding else self.forwarding._on_logs, ) @@ -133,6 +138,92 @@ def _to_bridge_config(self) -> temporalio.bridge.runtime.LoggingConfig: filter=TelemetryFilter(core_level="WARN", other_level="ERROR") ) +_module_start_time = time.time() + + +@dataclass +class LogForwardingConfig: + """Configuration for log forwarding from Core. + + Configuring this will send logs from Core to the given Python logger. By + default, log timestamps are overwritten and internally throttled/buffered + for a few milliseconds to prevent overloading Python. This means those log + records may have a time in the past and technically may appear out of order + with Python-originated log messages by a few milliseconds. + + If for some reason lots of logs occur within the buffered time (i.e. + thousands), they may be sent earlier. Users are discouraged from using this + with ``TRACE`` Core logging. + + All log records produced have a ``temporal_log`` attribute that contains a + representation of the Core log. This representation has a ``fields`` + attribute which has arbitrary extra data from Core. By default a string + representation of this extra ``fields`` attribute is appended to the + message. + """ + + logger: logging.Logger + """Core logger messages will be sent to this logger.""" + + append_target_to_name: bool = True + """If true, the default, the target is appended to the name.""" + + prepend_target_on_message: bool = True + """If true, the default, the target is appended to the name.""" + + overwrite_log_record_time: bool = True + """If true, the default, the log record time is overwritten with the core + log time.""" + + append_log_fields_to_message: bool = True + """If true, the default, the extra fields dict is appended to the + message.""" + + def _on_logs( + self, logs: Sequence[temporalio.bridge.runtime.BufferedLogEntry] + ) -> None: + for log in logs: + # Don't go further if not enabled + level = log.level + if not self.logger.isEnabledFor(level): + continue + + # Create the record + name = self.logger.name + if self.append_target_to_name: + name += f"-sdk_core::{log.target}" + message = log.message + if self.prepend_target_on_message: + message = f"[sdk_core::{log.target}] {message}" + if self.append_log_fields_to_message: + # Swallow error converting fields (should never happen, but + # just in case) + try: + message += f" {log.fields}" + except: + pass + record = self.logger.makeRecord( + name, + level, + "(sdk-core)", + 0, + message, + (), + None, + "(sdk-core)", + {"temporal_log": log}, + None, + ) + if self.overwrite_log_record_time: + record.created = log.time + record.msecs = (record.created - int(record.created)) * 1000 + # We can't access logging module's start time and it's not worth + # doing difference math to get relative time right here, so + # we'll make time relative to _our_ module's start time + self.relativeCreated = (record.created - _module_start_time) * 1000 + # Log the record + self.logger.handle(record) + class OpenTelemetryMetricTemporality(Enum): """Temporality for OpenTelemetry metrics.""" diff --git a/tests/test_runtime.py b/tests/test_runtime.py index b90ddace..31b713f8 100644 --- a/tests/test_runtime.py +++ b/tests/test_runtime.py @@ -1,11 +1,22 @@ +import logging +import logging.handlers +import queue import uuid +from typing import List, cast from urllib.request import urlopen from temporalio import workflow from temporalio.client import Client -from temporalio.runtime import PrometheusConfig, Runtime, TelemetryConfig +from temporalio.runtime import ( + LogForwardingConfig, + LoggingConfig, + PrometheusConfig, + Runtime, + TelemetryConfig, + TelemetryFilter, +) from temporalio.worker import Worker -from tests.helpers import find_free_port +from tests.helpers import assert_eq_eventually, find_free_port @workflow.defn @@ -54,3 +65,119 @@ async def run_workflow(client: Client): assert "long_request" in f.read().decode("utf-8") with urlopen(url=f"http://{prom_addr2}/metrics") as f: assert "long_request" in f.read().decode("utf-8") + + +async def test_runtime_log_forwarding(): + # Create logger with record capture + log_queue: queue.Queue[logging.LogRecord] = queue.Queue() + log_queue_list = cast(List[logging.LogRecord], log_queue.queue) + logger = logging.getLogger(f"log-{uuid.uuid4()}") + logger.addHandler(logging.handlers.QueueHandler(log_queue)) + + async def log_queue_len() -> int: + return len(log_queue_list) + + # Create runtime + runtime = Runtime( + telemetry=TelemetryConfig( + logging=LoggingConfig( + filter=TelemetryFilter(core_level="DEBUG", other_level="ERROR"), + forwarding=LogForwardingConfig(logger=logger), + ) + ) + ) + + # Set capture only info logs + logger.setLevel(logging.INFO) + # Write some logs + runtime._core_runtime.write_test_info_log("info1", "extra1") + runtime._core_runtime.write_test_debug_log("debug2", "extra2") + runtime._core_runtime.write_test_info_log("info3", "extra3") + + # Check the expected records + await assert_eq_eventually(2, log_queue_len) + assert log_queue_list[0].levelno == logging.INFO + assert log_queue_list[0].message.startswith( + "[sdk_core::temporal_sdk_bridge::runtime] info1" + ) + assert ( + log_queue_list[0].name + == f"{logger.name}-sdk_core::temporal_sdk_bridge::runtime" + ) + assert log_queue_list[0].created == log_queue_list[0].temporal_log.time # type: ignore + assert log_queue_list[0].temporal_log.fields == {"extra_data": "extra1"} # type: ignore + assert log_queue_list[1].levelno == logging.INFO + assert log_queue_list[1].message.startswith( + "[sdk_core::temporal_sdk_bridge::runtime] info3" + ) + + # Clear logs and enable debug and try again + log_queue_list.clear() + logger.setLevel(logging.DEBUG) + runtime._core_runtime.write_test_info_log("info4", "extra4") + runtime._core_runtime.write_test_debug_log("debug5", "extra5") + runtime._core_runtime.write_test_info_log("info6", "extra6") + await assert_eq_eventually(3, log_queue_len) + assert log_queue_list[0].levelno == logging.INFO + assert log_queue_list[0].message.startswith( + "[sdk_core::temporal_sdk_bridge::runtime] info4" + ) + assert log_queue_list[1].levelno == logging.DEBUG + assert log_queue_list[1].message.startswith( + "[sdk_core::temporal_sdk_bridge::runtime] debug5" + ) + assert log_queue_list[2].levelno == logging.INFO + assert log_queue_list[2].message.startswith( + "[sdk_core::temporal_sdk_bridge::runtime] info6" + ) + + +@workflow.defn +class TaskFailWorkflow: + @workflow.run + async def run(self) -> None: + raise RuntimeError("Intentional error") + + +async def test_runtime_task_fail_log_forwarding(client: Client): + # Client with lo capturing runtime + log_queue: queue.Queue[logging.LogRecord] = queue.Queue() + log_queue_list = cast(List[logging.LogRecord], log_queue.queue) + logger = logging.getLogger(f"log-{uuid.uuid4()}") + logger.addHandler(logging.handlers.QueueHandler(log_queue)) + logger.setLevel(logging.WARN) + client = await Client.connect( + client.service_client.config.target_host, + namespace=client.namespace, + runtime=Runtime( + telemetry=TelemetryConfig( + logging=LoggingConfig( + filter=TelemetryFilter(core_level="WARN", other_level="ERROR"), + forwarding=LogForwardingConfig(logger=logger), + ) + ) + ), + ) + + # Start workflow + task_queue = f"task-queue-{uuid.uuid4()}" + async with Worker(client, task_queue=task_queue, workflows=[TaskFailWorkflow]): + handle = await client.start_workflow( + TaskFailWorkflow.run, + id=f"workflow-{uuid.uuid4()}", + task_queue=task_queue, + ) + + # Wait for log to appear + async def has_log() -> bool: + return any( + l for l in log_queue_list if "Failing workflow task" in l.message + ) + + await assert_eq_eventually(True, has_log) + + # Check record + record = next((l for l in log_queue_list if "Failing workflow task" in l.message)) + assert record.levelno == logging.WARNING + assert record.name == f"{logger.name}-sdk_core::temporal_sdk_core::worker::workflow" + assert record.temporal_log.fields["run_id"] == handle.result_run_id # type: ignore