diff --git a/python/whylogs/api/logger/experimental/logger/actor/pipe_signaler.py b/python/whylogs/api/logger/experimental/logger/actor/pipe_signaler.py index d7a03c2356..17980cc08a 100644 --- a/python/whylogs/api/logger/experimental/logger/actor/pipe_signaler.py +++ b/python/whylogs/api/logger/experimental/logger/actor/pipe_signaler.py @@ -97,11 +97,6 @@ def _start_poll_conn(self) -> None: def run(self) -> None: self._start_poll_conn() - def close_child(self) -> None: - """ - This method is no longer needed as queues do not require manual closing of file descriptors. - """ - def close(self) -> None: """ Closes the thread and all resources. This should be diff --git a/python/whylogs/api/logger/experimental/logger/actor/process_actor.py b/python/whylogs/api/logger/experimental/logger/actor/process_actor.py index 661afadcb4..1e1225c722 100644 --- a/python/whylogs/api/logger/experimental/logger/actor/process_actor.py +++ b/python/whylogs/api/logger/experimental/logger/actor/process_actor.py @@ -100,6 +100,8 @@ def close(self) -> None: super().close() self._wrapper.close() + if self._pipe_signaler is not None: + self._pipe_signaler.close() def _signal(self, messages: Sequence[SyncMessage] = [], error: Optional[Exception] = None) -> None: if self._pipe_signaler is None: @@ -145,6 +147,9 @@ def start(self) -> None: it's created, unlike the thread version which can just be automatically started from within its init. There must be some post-init setup that needs to be done. """ + if self._pipe_signaler is not None: + self._pipe_signaler.start() + self.daemon = True super().start() self.join(0.1) # This does apparently need to happen after several manual tests. diff --git a/python/whylogs/api/logger/experimental/logger/actor/process_rolling_logger.py b/python/whylogs/api/logger/experimental/logger/actor/process_rolling_logger.py index 65cd31aac8..64a3aec8e8 100644 --- a/python/whylogs/api/logger/experimental/logger/actor/process_rolling_logger.py +++ b/python/whylogs/api/logger/experimental/logger/actor/process_rolling_logger.py @@ -151,8 +151,8 @@ def create_logger(self, dataset_id: str, options: LoggerOptions) -> ThreadRollin class BaseProcessRollingLogger( - ProcessActor[Union[AdditionalMessages, BuiltinMessageTypes], Union[ProcessLoggerStatus, LoggerStatus, None]], - DataLogger[Union[ProcessLoggerStatus, LoggerStatus, None]], + ProcessActor[Union[AdditionalMessages, BuiltinMessageTypes], ProcessLoggerStatus], + DataLogger[ProcessLoggerStatus], Generic[AdditionalMessages], ): """ @@ -268,9 +268,6 @@ def process_close_message(self, messages: List[CloseMessage]) -> None: self._logger.info(f"Closing whylogs logger for {datasetId}") logger.close() - if self._pipe_signaler is not None: - self._pipe_signaler.close_child() - def process_pubsub(self, messages: List[RawPubSubMessage]) -> None: self._logger.info("Processing pubsub message") msgs = [msg["log_request"] for msg in [it.to_pubsub_message() for it in messages] if msg is not None] @@ -427,8 +424,8 @@ def log( sync=sync, ) - result: Optional["Future[Union[ProcessLoggerStatus, LoggerStatus, None]]"] = ( - cast("Future[Union[ProcessLoggerStatus, LoggerStatus, None]]", Future()) if sync else None + result: Optional["Future[ProcessLoggerStatus]"] = ( + cast("Future[ProcessLoggerStatus]", Future()) if sync else None ) if result is not None: self._logger.debug(f"Registering result id {message.id} for synchronous logging") @@ -458,18 +455,8 @@ def run(self) -> None: self._logger.debug(f"Started process logger with pid {os.getpid()}") super().run() - def start(self) -> None: - self._logger.debug(f"Starting process logger from pid {os.getpid()}") - # This is started in the parent process, not in the child process. It must be started - # before the process itself start right below. - if self._pipe_signaler is not None: - self._pipe_signaler.start() - super().start() - def close(self) -> None: super().close() - if self._pipe_signaler is not None: - self._pipe_signaler.close() class ProcessRollingLogger(BaseProcessRollingLogger[NoReturn]): diff --git a/python/whylogs/api/whylabs/session/config.py b/python/whylogs/api/whylabs/session/config.py index 3416cac5fa..2e2b20d57d 100644 --- a/python/whylogs/api/whylabs/session/config.py +++ b/python/whylogs/api/whylabs/session/config.py @@ -76,6 +76,7 @@ class SessionConfig: def __init__(self, init_config: Optional[InitConfig] = None) -> None: self._init_config = init_config or InitConfig() self.logger = logging.getLogger("config") + self._ensure_config_exists = False if self._init_config.force_local is True else True self.auth_path = ( Path(self._init_config.config_path) if self._init_config.config_path else self.get_config_file_path() ) @@ -114,7 +115,11 @@ def _load_value(self, env_name: EnvVariableName, config_name: ConfigVariableName return ConfigFile.get_variable_from_config_file(self._config_parser, config_name) def _load_or_prompt( - self, env_name: EnvVariableName, config_name: ConfigVariableName, persist: bool = False, password: bool = True + self, + env_name: EnvVariableName, + config_name: ConfigVariableName, + persist: bool = False, + password: bool = True, ) -> Optional[str]: """ Loads a configuration value like _load_value does, but it will also prompt the user for the value if @@ -194,7 +199,8 @@ def set_whylabs_refernce_profile_name(self, name: str) -> None: def get_whylabs_endpoint(self) -> str: return ( self._load_value( - env_name=EnvVariableName.WHYLABS_API_ENDPOINT, config_name=ConfigVariableName.WHYLABS_API_ENDPOINT + env_name=EnvVariableName.WHYLABS_API_ENDPOINT, + config_name=ConfigVariableName.WHYLABS_API_ENDPOINT, ) or _DEFAULT_WHYLABS_HOST ) @@ -207,16 +213,18 @@ def get_config_file_path(self) -> Path: if config_dir_path is not None: Path(config_dir_path).mkdir(parents=True, exist_ok=True) else: - config_dir_path = user_config_dir(_CONFIG_APP_NAME, ensure_exists=True) + config_dir_path = user_config_dir(_CONFIG_APP_NAME, ensure_exists=self._ensure_config_exists) config_file_path = os.path.join(config_dir_path, "config.ini") path = Path(config_file_path) - path.touch(exist_ok=True) + if self._ensure_config_exists: + path.touch(exist_ok=True) return path def get_default_dataset_id(self) -> Optional[str]: return self.tmp_default_dataset_id or self._load_value( - env_name=EnvVariableName.WHYLABS_DEFAULT_DATASET_ID, config_name=ConfigVariableName.DEFAULT_DATASET_ID + env_name=EnvVariableName.WHYLABS_DEFAULT_DATASET_ID, + config_name=ConfigVariableName.DEFAULT_DATASET_ID, ) def require_default_dataset_id(self) -> str: @@ -234,7 +242,10 @@ def get_org_id(self) -> Optional[str]: except Exception: pass - org_id = self._load_value(env_name=EnvVariableName.WHYLABS_ORG_ID, config_name=ConfigVariableName.ORG_ID) + org_id = self._load_value( + env_name=EnvVariableName.WHYLABS_ORG_ID, + config_name=ConfigVariableName.ORG_ID, + ) if org_id is not None: return org_id @@ -259,7 +270,8 @@ def set_org_id(self, org_id: str) -> None: def get_api_key(self) -> Optional[str]: return self.tmp_api_key or self._load_value( - env_name=EnvVariableName.WHYLABS_API_KEY, config_name=ConfigVariableName.API_KEY + env_name=EnvVariableName.WHYLABS_API_KEY, + config_name=ConfigVariableName.API_KEY, ) def get_env_api_key(self) -> Optional[str]: @@ -278,7 +290,10 @@ def set_user_guid(self, user_guid: str) -> None: self._set_value(ConfigVariableName.USER_GUID, user_guid) def get_session_id(self) -> Optional[str]: - return self._load_value(env_name=EnvVariableName.WHYLABS_SESSION_ID, config_name=ConfigVariableName.SESSION_ID) + return self._load_value( + env_name=EnvVariableName.WHYLABS_SESSION_ID, + config_name=ConfigVariableName.SESSION_ID, + ) def set_session_id(self, sessionId: str) -> None: self._set_value(ConfigVariableName.SESSION_ID, sessionId) @@ -324,7 +339,10 @@ def _notify_type_whylabs(self, api_key: str) -> None: def _notify_type_anon(self) -> None: anonymous_session_id = self.get_session_id() - il.success(f"Using session type: {SessionType.WHYLABS_ANONYMOUS.name}", ignore_suppress=True) + il.success( + f"Using session type: {SessionType.WHYLABS_ANONYMOUS.name}", + ignore_suppress=True, + ) id_text = "" if not anonymous_session_id else anonymous_session_id il.option(f"session id: {id_text}", ignore_suppress=True)