From 856ef5074c60965c4dbbea11e61139397a1223d7 Mon Sep 17 00:00:00 2001 From: ThejasNU Date: Tue, 4 Feb 2025 14:19:19 +0530 Subject: [PATCH] add logs deletion based on time --- libs/agentc_cli/agentc_cli/cmds/clean.py | 168 +++++++++++++++-------- libs/agentc_cli/agentc_cli/cmds/util.py | 13 ++ libs/agentc_cli/agentc_cli/main.py | 12 +- 3 files changed, 131 insertions(+), 62 deletions(-) diff --git a/libs/agentc_cli/agentc_cli/cmds/clean.py b/libs/agentc_cli/agentc_cli/cmds/clean.py index 29b6383..bd85098 100644 --- a/libs/agentc_cli/agentc_cli/cmds/clean.py +++ b/libs/agentc_cli/agentc_cli/cmds/clean.py @@ -1,43 +1,66 @@ import click import couchbase.cluster +import dateparser import importlib.util import logging -import os import pathlib -import shutil import typing from ..models.context import Context -from agentc_core.defaults import DEFAULT_ACTIVITY_FOLDER +from .util import remove_directory +from agentc_core.analytics import Log +from agentc_core.defaults import DEFAULT_AUDIT_COLLECTION from agentc_core.defaults import DEFAULT_AUDIT_SCOPE from agentc_core.defaults import DEFAULT_CATALOG_COLLECTION_NAME -from agentc_core.defaults import DEFAULT_CATALOG_FOLDER from agentc_core.defaults import DEFAULT_CATALOG_SCOPE +from agentc_core.defaults import DEFAULT_LLM_ACTIVITY_NAME from agentc_core.defaults import DEFAULT_META_COLLECTION_NAME from agentc_core.util.query import execute_query +from pydantic import ValidationError from typing_extensions import Literal +from tzlocal import get_localzone logger = logging.getLogger(__name__) -def clean_local(ctx: Context | None, type_metadata: str): - if type_metadata == "catalog": - default_folder = [DEFAULT_CATALOG_FOLDER] - elif type_metadata == "activity": - default_folder = [DEFAULT_ACTIVITY_FOLDER] - else: - default_folder = [DEFAULT_ACTIVITY_FOLDER, DEFAULT_CATALOG_FOLDER] +def clean_local(ctx: Context | None, type_metadata: str, date: str = None): + clean_catalog = type_metadata == "catalog" or type_metadata == "all" + clean_activity = type_metadata == "activity" or type_metadata == "all" + + if clean_catalog: + remove_directory(ctx.catalog) + + if clean_activity: + if date is not None: + req_date = dateparser.parse(date) + local_tz = get_localzone() + req_date = req_date.replace(tzinfo=local_tz) + + if req_date is None: + raise ValueError(f"Invalid date provided: {date}") + + log_path = pathlib.Path(ctx.activity) / DEFAULT_LLM_ACTIVITY_NAME + try: + with log_path.open("r+") as fp: + # read all lines before editing the file + lines = fp.readlines() + # move file pointer to the beginning of a file + fp.seek(0) + for line in lines: + try: + cur_log = Log.model_validate_json(line.strip()) + cur_log_timestamp = dateparser.parse(cur_log.timestamp.isoformat()) + if cur_log_timestamp >= req_date: + fp.write(line) + except ValidationError as e: + logger.error(f"Invalid log entry: {e}") + # truncate the file + fp.truncate() + except FileNotFoundError: + raise ValueError("No log file found! Please run auditor!") from None - for folder in default_folder: - if not folder or not os.path.exists(folder): - continue - - folder_path = pathlib.Path(folder) - - if folder_path.is_file(): - os.remove(folder_path.absolute()) - elif folder_path.is_dir(): - shutil.rmtree(folder_path.absolute()) + else: + remove_directory(ctx.activity) def clean_db( @@ -47,60 +70,82 @@ def clean_db( catalog_ids: list[str], kind_list: list[typing.Literal["tool", "prompt"]], type_metadata: Literal["catalog", "activity", "all"], + date: str = None, ) -> int: all_errs = [] - clean_catalog = "catalog" in type_metadata or "all" in type_metadata - clean_activity = "activity" in type_metadata or "all" in type_metadata + clean_catalog = type_metadata == "catalog" or type_metadata == "all" + clean_activity = type_metadata == "activity" or type_metadata == "all" - for kind in kind_list: + if clean_catalog: if len(catalog_ids) > 0: - click.secho(f"Removing catalog(s): {[catalog for catalog in catalog_ids]}", fg="yellow") + for kind in kind_list: + click.secho(f"Removing catalog(s): {[catalog for catalog in catalog_ids]}", fg="yellow") + + catalog_condition = " AND ".join([f"catalog_identifier = '{catalog}'" for catalog in catalog_ids]) + meta_catalog_condition = " AND ".join([f"version.identifier = '{catalog}'" for catalog in catalog_ids]) + remove_catalogs_query = f""" + DELETE FROM + `{bucket}`.`{DEFAULT_CATALOG_SCOPE}`.{kind}{DEFAULT_CATALOG_COLLECTION_NAME} + WHERE + {catalog_condition}; + """ + remove_metadata_query = f""" + DELETE FROM + `{bucket}`.`{DEFAULT_CATALOG_SCOPE}`.{kind}{DEFAULT_META_COLLECTION_NAME} + WHERE + {meta_catalog_condition}; + """ + + res, err = execute_query(cluster, remove_catalogs_query) + for r in res.rows(): + logger.debug(r) + if err is not None: + all_errs.append(err) + + res, err = execute_query(cluster, remove_metadata_query) + for r in res.rows(): + logger.debug(r) + if err is not None: + all_errs.append(err) + + else: + drop_scope_query = f"DROP SCOPE `{bucket}`.`{DEFAULT_CATALOG_SCOPE}` IF EXISTS;" + res, err = execute_query(cluster, drop_scope_query) + for r in res.rows(): + logger.debug(r) + if err is not None: + all_errs.append(err) + + if clean_activity: + if date is not None: + req_date = dateparser.parse(date) + local_tz = get_localzone() + req_date = req_date.replace(tzinfo=local_tz) - catalog_condition = " AND ".join([f"catalog_identifier = '{catalog}'" for catalog in catalog_ids]) - meta_catalog_condition = " AND ".join([f"version.identifier = '{catalog}'" for catalog in catalog_ids]) remove_catalogs_query = f""" - DELETE FROM - `{bucket}`.`{DEFAULT_CATALOG_SCOPE}`.{kind}{DEFAULT_CATALOG_COLLECTION_NAME} - WHERE - {catalog_condition}; - """ - remove_metadata_query = f""" - DELETE FROM - `{bucket}`.`{DEFAULT_CATALOG_SCOPE}`.{kind}{DEFAULT_META_COLLECTION_NAME} - WHERE - {meta_catalog_condition}; - """ + DELETE FROM + `{bucket}`.`{DEFAULT_AUDIT_SCOPE}`.`{DEFAULT_AUDIT_COLLECTION}` + WHERE + STR_TO_MILLIS(timestamp) < STR_TO_MILLIS('{req_date.isoformat()}'); + """ res, err = execute_query(cluster, remove_catalogs_query) + for r in res.rows(): logger.debug(r) if err is not None: all_errs.append(err) - res, err = execute_query(cluster, remove_metadata_query) + else: + drop_scope_query = f"DROP SCOPE `{bucket}`.`{DEFAULT_AUDIT_SCOPE}` IF EXISTS;" + res, err = execute_query(cluster, drop_scope_query) for r in res.rows(): logger.debug(r) if err is not None: all_errs.append(err) - else: - if clean_catalog: - drop_scope_query = f"DROP SCOPE `{bucket}`.`{DEFAULT_CATALOG_SCOPE}` IF EXISTS;" - res, err = execute_query(cluster, drop_scope_query) - for r in res.rows(): - logger.debug(r) - if err is not None: - all_errs.append(err) - - if clean_activity: - drop_scope_query = f"DROP SCOPE `{bucket}`.`{DEFAULT_AUDIT_SCOPE}` IF EXISTS;" - res, err = execute_query(cluster, drop_scope_query) - for r in res.rows(): - logger.debug(r) - if err is not None: - all_errs.append(err) - if len(all_errs) > 0: - logger.error(all_errs) + if len(all_errs) > 0: + logger.error(all_errs) return len(all_errs) @@ -114,17 +159,18 @@ def cmd_clean( kind: list[typing.Literal["tool", "prompt"]], ctx: Context = None, type_metadata: Literal["catalog", "activity", "all"] = "all", + date: str = None, ): if is_local: - clean_local(ctx, type_metadata) - click.secho("Local catalog/metadata has been deleted!", fg="green") + clean_local(ctx, type_metadata, date) + click.secho("Local catalog/metadata/logs has been deleted!", fg="green") if is_db: - num_errs = clean_db(ctx, bucket, cluster, catalog_ids, kind, type_metadata) + num_errs = clean_db(ctx, bucket, cluster, catalog_ids, kind, type_metadata, date) if num_errs > 0: raise ValueError("Failed to cleanup db catalog/metadata!") else: - click.secho("Database catalog/metadata has been deleted!", fg="green") + click.secho("Database catalog/metadata/logs has been deleted!", fg="green") # Note: flask is an optional dependency. diff --git a/libs/agentc_cli/agentc_cli/cmds/util.py b/libs/agentc_cli/agentc_cli/cmds/util.py index 9405869..15477c6 100644 --- a/libs/agentc_cli/agentc_cli/cmds/util.py +++ b/libs/agentc_cli/agentc_cli/cmds/util.py @@ -7,6 +7,7 @@ import pathlib import pydantic import re +import shutil import typing from ..models.context import Context @@ -167,6 +168,18 @@ def logging_printer(content: str, *args, **kwargs): return catalog +def remove_directory(folder: str): + if not folder or not os.path.exists(folder): + return + + folder_path = pathlib.Path(folder) + + if folder_path.is_file(): + os.remove(folder_path.absolute()) + elif folder_path.is_dir(): + shutil.rmtree(folder_path.absolute()) + + # TODO: One use case is a user's repo (like agent-catalog-example) might # have multiple, independent subdirectories in it which should each # have its own, separate local catalog. We might consider using diff --git a/libs/agentc_cli/agentc_cli/main.py b/libs/agentc_cli/agentc_cli/main.py index 54b8d08..7b28a56 100644 --- a/libs/agentc_cli/agentc_cli/main.py +++ b/libs/agentc_cli/agentc_cli/main.py @@ -189,8 +189,16 @@ def add(ctx, output: pathlib.Path, record_kind: RecordKind): help="Kind of catalog to remove versions from.", show_default=True, ) +@click.option( + "-d", + "--date", + default=None, + type=str, + help="Date and Time before which the logs must be removed. Ex: 2021-09-01T00:00:00, 20th Jan 2024 8:00PM, 2 days ago", + show_default=False, +) @click.pass_context -def clean(ctx, catalog, type_metadata, bucket, catalog_id, skip_confirm, kind): +def clean(ctx, catalog, type_metadata, bucket, catalog_id, skip_confirm, kind, date): """Delete all or specific (catalog and/or activity) agent related files / collections.""" ctx_obj: Context = ctx.obj clean_db = "db" in catalog @@ -220,6 +228,7 @@ def clean(ctx, catalog, type_metadata, bucket, catalog_id, skip_confirm, kind): catalog_ids=None, kind=None, type_metadata=type_metadata, + date=date, ) if clean_db: @@ -267,6 +276,7 @@ def clean(ctx, catalog, type_metadata, bucket, catalog_id, skip_confirm, kind): cluster=cluster, catalog_ids=catalog_id, kind=kind_list, + date=date, ) cluster.close()