Skip to content

Commit

Permalink
Merge pull request #1475 from whylabs/container-updates
Browse files Browse the repository at this point in the history
Fix types and remove dead code in actor classes
  • Loading branch information
naddeoa authored Mar 4, 2024
2 parents 8ac59e2 + 42f7b7c commit 127db5b
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,6 @@ def _start_poll_conn(self) -> None:
def run(self) -> None:
self._start_poll_conn()

def close_child(self) -> None:
"""
This method is no longer needed as queues do not require manual closing of file descriptors.
"""

def close(self) -> None:
"""
Closes the thread and all resources. This should be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ def close(self) -> None:

super().close()
self._wrapper.close()
if self._pipe_signaler is not None:
self._pipe_signaler.close()

def _signal(self, messages: Sequence[SyncMessage] = [], error: Optional[Exception] = None) -> None:
if self._pipe_signaler is None:
Expand Down Expand Up @@ -145,6 +147,9 @@ def start(self) -> None:
it's created, unlike the thread version which can just be automatically started
from within its init. There must be some post-init setup that needs to be done.
"""
if self._pipe_signaler is not None:
self._pipe_signaler.start()

self.daemon = True
super().start()
self.join(0.1) # This does apparently need to happen after several manual tests.
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ def create_logger(self, dataset_id: str, options: LoggerOptions) -> ThreadRollin


class BaseProcessRollingLogger(
ProcessActor[Union[AdditionalMessages, BuiltinMessageTypes], Union[ProcessLoggerStatus, LoggerStatus, None]],
DataLogger[Union[ProcessLoggerStatus, LoggerStatus, None]],
ProcessActor[Union[AdditionalMessages, BuiltinMessageTypes], ProcessLoggerStatus],
DataLogger[ProcessLoggerStatus],
Generic[AdditionalMessages],
):
"""
Expand Down Expand Up @@ -268,9 +268,6 @@ def process_close_message(self, messages: List[CloseMessage]) -> None:
self._logger.info(f"Closing whylogs logger for {datasetId}")
logger.close()

if self._pipe_signaler is not None:
self._pipe_signaler.close_child()

def process_pubsub(self, messages: List[RawPubSubMessage]) -> None:
self._logger.info("Processing pubsub message")
msgs = [msg["log_request"] for msg in [it.to_pubsub_message() for it in messages] if msg is not None]
Expand Down Expand Up @@ -427,8 +424,8 @@ def log(
sync=sync,
)

result: Optional["Future[Union[ProcessLoggerStatus, LoggerStatus, None]]"] = (
cast("Future[Union[ProcessLoggerStatus, LoggerStatus, None]]", Future()) if sync else None
result: Optional["Future[ProcessLoggerStatus]"] = (
cast("Future[ProcessLoggerStatus]", Future()) if sync else None
)
if result is not None:
self._logger.debug(f"Registering result id {message.id} for synchronous logging")
Expand Down Expand Up @@ -458,18 +455,8 @@ def run(self) -> None:
self._logger.debug(f"Started process logger with pid {os.getpid()}")
super().run()

def start(self) -> None:
self._logger.debug(f"Starting process logger from pid {os.getpid()}")
# This is started in the parent process, not in the child process. It must be started
# before the process itself start right below.
if self._pipe_signaler is not None:
self._pipe_signaler.start()
super().start()

def close(self) -> None:
super().close()
if self._pipe_signaler is not None:
self._pipe_signaler.close()


class ProcessRollingLogger(BaseProcessRollingLogger[NoReturn]):
Expand Down
36 changes: 27 additions & 9 deletions python/whylogs/api/whylabs/session/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class SessionConfig:
def __init__(self, init_config: Optional[InitConfig] = None) -> None:
self._init_config = init_config or InitConfig()
self.logger = logging.getLogger("config")
self._ensure_config_exists = False if self._init_config.force_local is True else True
self.auth_path = (
Path(self._init_config.config_path) if self._init_config.config_path else self.get_config_file_path()
)
Expand Down Expand Up @@ -114,7 +115,11 @@ def _load_value(self, env_name: EnvVariableName, config_name: ConfigVariableName
return ConfigFile.get_variable_from_config_file(self._config_parser, config_name)

def _load_or_prompt(
self, env_name: EnvVariableName, config_name: ConfigVariableName, persist: bool = False, password: bool = True
self,
env_name: EnvVariableName,
config_name: ConfigVariableName,
persist: bool = False,
password: bool = True,
) -> Optional[str]:
"""
Loads a configuration value like _load_value does, but it will also prompt the user for the value if
Expand Down Expand Up @@ -194,7 +199,8 @@ def set_whylabs_refernce_profile_name(self, name: str) -> None:
def get_whylabs_endpoint(self) -> str:
return (
self._load_value(
env_name=EnvVariableName.WHYLABS_API_ENDPOINT, config_name=ConfigVariableName.WHYLABS_API_ENDPOINT
env_name=EnvVariableName.WHYLABS_API_ENDPOINT,
config_name=ConfigVariableName.WHYLABS_API_ENDPOINT,
)
or _DEFAULT_WHYLABS_HOST
)
Expand All @@ -207,16 +213,18 @@ def get_config_file_path(self) -> Path:
if config_dir_path is not None:
Path(config_dir_path).mkdir(parents=True, exist_ok=True)
else:
config_dir_path = user_config_dir(_CONFIG_APP_NAME, ensure_exists=True)
config_dir_path = user_config_dir(_CONFIG_APP_NAME, ensure_exists=self._ensure_config_exists)

config_file_path = os.path.join(config_dir_path, "config.ini")
path = Path(config_file_path)
path.touch(exist_ok=True)
if self._ensure_config_exists:
path.touch(exist_ok=True)
return path

def get_default_dataset_id(self) -> Optional[str]:
return self.tmp_default_dataset_id or self._load_value(
env_name=EnvVariableName.WHYLABS_DEFAULT_DATASET_ID, config_name=ConfigVariableName.DEFAULT_DATASET_ID
env_name=EnvVariableName.WHYLABS_DEFAULT_DATASET_ID,
config_name=ConfigVariableName.DEFAULT_DATASET_ID,
)

def require_default_dataset_id(self) -> str:
Expand All @@ -234,7 +242,10 @@ def get_org_id(self) -> Optional[str]:
except Exception:
pass

org_id = self._load_value(env_name=EnvVariableName.WHYLABS_ORG_ID, config_name=ConfigVariableName.ORG_ID)
org_id = self._load_value(
env_name=EnvVariableName.WHYLABS_ORG_ID,
config_name=ConfigVariableName.ORG_ID,
)

if org_id is not None:
return org_id
Expand All @@ -259,7 +270,8 @@ def set_org_id(self, org_id: str) -> None:

def get_api_key(self) -> Optional[str]:
return self.tmp_api_key or self._load_value(
env_name=EnvVariableName.WHYLABS_API_KEY, config_name=ConfigVariableName.API_KEY
env_name=EnvVariableName.WHYLABS_API_KEY,
config_name=ConfigVariableName.API_KEY,
)

def get_env_api_key(self) -> Optional[str]:
Expand All @@ -278,7 +290,10 @@ def set_user_guid(self, user_guid: str) -> None:
self._set_value(ConfigVariableName.USER_GUID, user_guid)

def get_session_id(self) -> Optional[str]:
return self._load_value(env_name=EnvVariableName.WHYLABS_SESSION_ID, config_name=ConfigVariableName.SESSION_ID)
return self._load_value(
env_name=EnvVariableName.WHYLABS_SESSION_ID,
config_name=ConfigVariableName.SESSION_ID,
)

def set_session_id(self, sessionId: str) -> None:
self._set_value(ConfigVariableName.SESSION_ID, sessionId)
Expand Down Expand Up @@ -324,7 +339,10 @@ def _notify_type_whylabs(self, api_key: str) -> None:

def _notify_type_anon(self) -> None:
anonymous_session_id = self.get_session_id()
il.success(f"Using session type: {SessionType.WHYLABS_ANONYMOUS.name}", ignore_suppress=True)
il.success(
f"Using session type: {SessionType.WHYLABS_ANONYMOUS.name}",
ignore_suppress=True,
)
id_text = "<will be generated before upload>" if not anonymous_session_id else anonymous_session_id
il.option(f"session id: {id_text}", ignore_suppress=True)

Expand Down

0 comments on commit 127db5b

Please sign in to comment.