Skip to content

Commit

Permalink
add logs deletion based on time
Browse files Browse the repository at this point in the history
  • Loading branch information
ThejasNU committed Feb 4, 2025
1 parent 1b61def commit 856ef50
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 62 deletions.
168 changes: 107 additions & 61 deletions libs/agentc_cli/agentc_cli/cmds/clean.py
Original file line number Diff line number Diff line change
@@ -1,43 +1,66 @@
import click
import couchbase.cluster
import dateparser
import importlib.util
import logging
import os
import pathlib
import shutil
import typing

from ..models.context import Context
from agentc_core.defaults import DEFAULT_ACTIVITY_FOLDER
from .util import remove_directory
from agentc_core.analytics import Log
from agentc_core.defaults import DEFAULT_AUDIT_COLLECTION
from agentc_core.defaults import DEFAULT_AUDIT_SCOPE
from agentc_core.defaults import DEFAULT_CATALOG_COLLECTION_NAME
from agentc_core.defaults import DEFAULT_CATALOG_FOLDER
from agentc_core.defaults import DEFAULT_CATALOG_SCOPE
from agentc_core.defaults import DEFAULT_LLM_ACTIVITY_NAME
from agentc_core.defaults import DEFAULT_META_COLLECTION_NAME
from agentc_core.util.query import execute_query
from pydantic import ValidationError
from typing_extensions import Literal
from tzlocal import get_localzone

logger = logging.getLogger(__name__)


def clean_local(ctx: Context | None, type_metadata: str, date: str = None):
    """Remove local catalog and/or activity data.

    Args:
        ctx: Context whose ``catalog`` and ``activity`` attributes give the local paths to clean.
        type_metadata: ``"catalog"``, ``"activity"`` or ``"all"``; any other value cleans nothing.
        date: Optional free-form date/time string, parsed with ``dateparser``. When given, the
            activity folder is kept and only log entries older than this instant are dropped
            from the activity log file. When omitted, the whole activity folder is removed.

    Raises:
        ValueError: If ``date`` cannot be parsed, or if the activity log file does not exist.
    """
    clean_catalog = type_metadata in ("catalog", "all")
    clean_activity = type_metadata in ("activity", "all")

    # Resolve the cutoff before deleting anything, so a bad date aborts without side effects.
    req_date = None
    if clean_activity and date is not None:
        req_date = dateparser.parse(date)
        # The parse result must be checked *before* it is used; calling .replace() on a
        # failed parse would raise AttributeError instead of the intended ValueError.
        if req_date is None:
            raise ValueError(f"Invalid date provided: {date}")
        # Naive inputs are interpreted in the local timezone; an offset given explicitly by
        # the user is kept instead of being overwritten.
        if req_date.tzinfo is None:
            req_date = req_date.replace(tzinfo=get_localzone())

    if clean_catalog:
        remove_directory(ctx.catalog)

    if not clean_activity:
        return

    if req_date is None:
        # No cutoff requested: drop the whole activity folder.
        remove_directory(ctx.activity)
        return

    log_path = pathlib.Path(ctx.activity) / DEFAULT_LLM_ACTIVITY_NAME
    try:
        with log_path.open("r+") as fp:
            # Read every line up front, then rewrite the file in place from the start.
            lines = fp.readlines()
            fp.seek(0)
            for line in lines:
                try:
                    cur_log = Log.model_validate_json(line.strip())
                    cur_log_timestamp = dateparser.parse(cur_log.timestamp.isoformat())
                    # Keep only entries at or after the cutoff.
                    if cur_log_timestamp >= req_date:
                        fp.write(line)
                except ValidationError as e:
                    # Lines that fail validation are logged and not written back.
                    logger.error(f"Invalid log entry: {e}")
            # Cut off whatever remains of the old, longer content.
            fp.truncate()
    except FileNotFoundError:
        raise ValueError("No log file found! Please run auditor!") from None


def clean_db(
Expand All @@ -47,60 +70,82 @@ def clean_db(
catalog_ids: list[str],
kind_list: list[typing.Literal["tool", "prompt"]],
type_metadata: Literal["catalog", "activity", "all"],
date: str = None,
) -> int:
all_errs = []
clean_catalog = "catalog" in type_metadata or "all" in type_metadata
clean_activity = "activity" in type_metadata or "all" in type_metadata
clean_catalog = type_metadata == "catalog" or type_metadata == "all"
clean_activity = type_metadata == "activity" or type_metadata == "all"

for kind in kind_list:
if clean_catalog:
if len(catalog_ids) > 0:
click.secho(f"Removing catalog(s): {[catalog for catalog in catalog_ids]}", fg="yellow")
for kind in kind_list:
click.secho(f"Removing catalog(s): {[catalog for catalog in catalog_ids]}", fg="yellow")

catalog_condition = " AND ".join([f"catalog_identifier = '{catalog}'" for catalog in catalog_ids])
meta_catalog_condition = " AND ".join([f"version.identifier = '{catalog}'" for catalog in catalog_ids])
remove_catalogs_query = f"""
DELETE FROM
`{bucket}`.`{DEFAULT_CATALOG_SCOPE}`.{kind}{DEFAULT_CATALOG_COLLECTION_NAME}
WHERE
{catalog_condition};
"""
remove_metadata_query = f"""
DELETE FROM
`{bucket}`.`{DEFAULT_CATALOG_SCOPE}`.{kind}{DEFAULT_META_COLLECTION_NAME}
WHERE
{meta_catalog_condition};
"""

res, err = execute_query(cluster, remove_catalogs_query)
for r in res.rows():
logger.debug(r)
if err is not None:
all_errs.append(err)

res, err = execute_query(cluster, remove_metadata_query)
for r in res.rows():
logger.debug(r)
if err is not None:
all_errs.append(err)

else:
drop_scope_query = f"DROP SCOPE `{bucket}`.`{DEFAULT_CATALOG_SCOPE}` IF EXISTS;"
res, err = execute_query(cluster, drop_scope_query)
for r in res.rows():
logger.debug(r)
if err is not None:
all_errs.append(err)

if clean_activity:
if date is not None:
req_date = dateparser.parse(date)
local_tz = get_localzone()
req_date = req_date.replace(tzinfo=local_tz)

catalog_condition = " AND ".join([f"catalog_identifier = '{catalog}'" for catalog in catalog_ids])
meta_catalog_condition = " AND ".join([f"version.identifier = '{catalog}'" for catalog in catalog_ids])
remove_catalogs_query = f"""
DELETE FROM
`{bucket}`.`{DEFAULT_CATALOG_SCOPE}`.{kind}{DEFAULT_CATALOG_COLLECTION_NAME}
WHERE
{catalog_condition};
"""
remove_metadata_query = f"""
DELETE FROM
`{bucket}`.`{DEFAULT_CATALOG_SCOPE}`.{kind}{DEFAULT_META_COLLECTION_NAME}
WHERE
{meta_catalog_condition};
"""
DELETE FROM
`{bucket}`.`{DEFAULT_AUDIT_SCOPE}`.`{DEFAULT_AUDIT_COLLECTION}`
WHERE
STR_TO_MILLIS(timestamp) < STR_TO_MILLIS('{req_date.isoformat()}');
"""

res, err = execute_query(cluster, remove_catalogs_query)

for r in res.rows():
logger.debug(r)
if err is not None:
all_errs.append(err)

res, err = execute_query(cluster, remove_metadata_query)
else:
drop_scope_query = f"DROP SCOPE `{bucket}`.`{DEFAULT_AUDIT_SCOPE}` IF EXISTS;"
res, err = execute_query(cluster, drop_scope_query)
for r in res.rows():
logger.debug(r)
if err is not None:
all_errs.append(err)
else:
if clean_catalog:
drop_scope_query = f"DROP SCOPE `{bucket}`.`{DEFAULT_CATALOG_SCOPE}` IF EXISTS;"
res, err = execute_query(cluster, drop_scope_query)
for r in res.rows():
logger.debug(r)
if err is not None:
all_errs.append(err)

if clean_activity:
drop_scope_query = f"DROP SCOPE `{bucket}`.`{DEFAULT_AUDIT_SCOPE}` IF EXISTS;"
res, err = execute_query(cluster, drop_scope_query)
for r in res.rows():
logger.debug(r)
if err is not None:
all_errs.append(err)

if len(all_errs) > 0:
logger.error(all_errs)
if len(all_errs) > 0:
logger.error(all_errs)

return len(all_errs)

Expand All @@ -114,17 +159,18 @@ def cmd_clean(
kind: list[typing.Literal["tool", "prompt"]],
ctx: Context = None,
type_metadata: Literal["catalog", "activity", "all"] = "all",
date: str = None,
):
if is_local:
clean_local(ctx, type_metadata)
click.secho("Local catalog/metadata has been deleted!", fg="green")
clean_local(ctx, type_metadata, date)
click.secho("Local catalog/metadata/logs has been deleted!", fg="green")

if is_db:
num_errs = clean_db(ctx, bucket, cluster, catalog_ids, kind, type_metadata)
num_errs = clean_db(ctx, bucket, cluster, catalog_ids, kind, type_metadata, date)
if num_errs > 0:
raise ValueError("Failed to cleanup db catalog/metadata!")
else:
click.secho("Database catalog/metadata has been deleted!", fg="green")
click.secho("Database catalog/metadata/logs has been deleted!", fg="green")


# Note: flask is an optional dependency.
Expand Down
13 changes: 13 additions & 0 deletions libs/agentc_cli/agentc_cli/cmds/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import pathlib
import pydantic
import re
import shutil
import typing

from ..models.context import Context
Expand Down Expand Up @@ -167,6 +168,18 @@ def logging_printer(content: str, *args, **kwargs):
return catalog


def remove_directory(folder: str):
    """Delete ``folder`` from disk if it exists.

    Despite the name, the target may be a directory (removed recursively), a regular file,
    or a symbolic link (only the link itself is removed, never what it points to).
    A falsy or non-existent ``folder`` is a no-op.

    Args:
        folder: Path of the file or directory to remove (``str`` or path-like).
    """
    if not folder:
        return

    folder_path = pathlib.Path(folder)

    # Symlinks are tested first: is_file()/is_dir() follow links, so a link to a directory
    # would otherwise reach shutil.rmtree (which refuses to operate on symlinks), and a
    # dangling link would be skipped entirely.
    if folder_path.is_symlink() or folder_path.is_file():
        folder_path.unlink()
    elif folder_path.is_dir():
        shutil.rmtree(folder_path.absolute())


# TODO: One use case is a user's repo (like agent-catalog-example) might
# have multiple, independent subdirectories in it which should each
# have its own, separate local catalog. We might consider using
Expand Down
12 changes: 11 additions & 1 deletion libs/agentc_cli/agentc_cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,16 @@ def add(ctx, output: pathlib.Path, record_kind: RecordKind):
help="Kind of catalog to remove versions from.",
show_default=True,
)
@click.option(
"-d",
"--date",
default=None,
type=str,
help="Date and Time before which the logs must be removed. Ex: 2021-09-01T00:00:00, 20th Jan 2024 8:00PM, 2 days ago",
show_default=False,
)
@click.pass_context
def clean(ctx, catalog, type_metadata, bucket, catalog_id, skip_confirm, kind):
def clean(ctx, catalog, type_metadata, bucket, catalog_id, skip_confirm, kind, date):
"""Delete all or specific (catalog and/or activity) agent related files / collections."""
ctx_obj: Context = ctx.obj
clean_db = "db" in catalog
Expand Down Expand Up @@ -220,6 +228,7 @@ def clean(ctx, catalog, type_metadata, bucket, catalog_id, skip_confirm, kind):
catalog_ids=None,
kind=None,
type_metadata=type_metadata,
date=date,
)

if clean_db:
Expand Down Expand Up @@ -267,6 +276,7 @@ def clean(ctx, catalog, type_metadata, bucket, catalog_id, skip_confirm, kind):
cluster=cluster,
catalog_ids=catalog_id,
kind=kind_list,
date=date,
)
cluster.close()

Expand Down

0 comments on commit 856ef50

Please sign in to comment.