Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ThejasNU/logs deletion based on date time mentioned #70

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 128 additions & 61 deletions libs/agentc_cli/agentc_cli/cmds/clean.py
Original file line number Diff line number Diff line change
@@ -1,43 +1,83 @@
import click
import couchbase.cluster
import dateparser
import importlib.util
import json
import logging
import os
import pathlib
import shutil
import typing

from ..models.context import Context
from agentc_core.defaults import DEFAULT_ACTIVITY_FOLDER
from .util import remove_directory
from agentc_core.defaults import DEFAULT_AUDIT_COLLECTION
from agentc_core.defaults import DEFAULT_AUDIT_SCOPE
from agentc_core.defaults import DEFAULT_CATALOG_COLLECTION_NAME
from agentc_core.defaults import DEFAULT_CATALOG_FOLDER
from agentc_core.defaults import DEFAULT_CATALOG_SCOPE
from agentc_core.defaults import DEFAULT_LLM_ACTIVITY_NAME
from agentc_core.defaults import DEFAULT_META_COLLECTION_NAME
from agentc_core.util.query import execute_query
from json import JSONDecodeError
from typing_extensions import Literal
from tzlocal import get_localzone

logger = logging.getLogger(__name__)


def clean_local(ctx: Context | None, type_metadata: str, date: str = None):
    """Remove local catalog data and/or activity logs.

    :param ctx: context holding the local ``catalog`` and ``activity`` paths.
    :param type_metadata: which artifacts to clean: ``"catalog"``, ``"activity"``, or ``"all"``.
    :param date: optional datetime string (parsed by ``dateparser``). When given, only
        activity-log entries older than this instant are deleted in place instead of
        removing the whole activity folder.
    :raises ValueError: if *date* cannot be parsed, or no local log file exists.
    """
    clean_catalog = type_metadata == "catalog" or type_metadata == "all"
    clean_activity = type_metadata == "activity" or type_metadata == "all"

    if clean_catalog:
        remove_directory(ctx.catalog)

    if clean_activity:
        if date is None:
            remove_directory(ctx.activity)
            return

        req_date = dateparser.parse(date)
        if req_date is None:
            raise ValueError(f"Invalid datetime provided: {date}")
        # A naive datetime is interpreted in the machine's local timezone so it can be
        # compared against the (presumably tz-aware) timestamps in the log entries.
        if req_date.tzinfo is None:
            req_date = req_date.replace(tzinfo=get_localzone())

        log_path = pathlib.Path(ctx.activity) / DEFAULT_LLM_ACTIVITY_NAME
        try:
            _trim_activity_log(log_path, req_date)
        except FileNotFoundError:
            raise ValueError("No log file found! Please run auditor!") from None


def _trim_activity_log(log_path: pathlib.Path, req_date) -> None:
    """Rewrite *log_path* in place, keeping only entries at or after *req_date*.

    Assumes the log is a JSON-lines file ordered oldest-first whose entries carry a
    ``timestamp`` field (NOTE(review): a naive timestamp in the log would make the
    comparison below raise ``TypeError`` — confirm the auditor always writes tz-aware
    timestamps).

    :raises FileNotFoundError: if the log file does not exist.
    :raises NotImplementedError: if every entry is already at/after *req_date*
        (older entries may only exist in rotated, compressed log files).
    """
    with log_path.open("r+") as fp:
        # Move the file pointer to the beginning of the file.
        fp.seek(0)
        pos = 0  # byte offset of the first entry to keep
        while True:
            line = fp.readline()
            if not line:
                break
            try:
                cur_log_timestamp = dateparser.parse(json.loads(line.strip())["timestamp"])
                if cur_log_timestamp >= req_date:
                    break
            except (JSONDecodeError, KeyError) as e:
                # Malformed entries are reported but still skipped over, so they are
                # deleted along with the valid entries preceding the cut point.
                logger.error(f"Invalid log entry: {e}")
            pos = fp.tell()

        # No log found before the date: older entries might be present in old,
        # compressed log files, which this command cannot trim yet.
        if pos == 0:
            raise NotImplementedError("No log entries found before the given date in the current log!")

        # Seek to the first entry to keep, copy the tail to the front of the file,
        # then truncate the now-unused remainder.
        fp.seek(pos)
        remaining_lines = fp.readlines()
        fp.seek(0)
        fp.writelines(remaining_lines)
        fp.truncate()


def clean_db(
Expand All @@ -47,60 +87,86 @@ def clean_db(
catalog_ids: list[str],
kind_list: list[typing.Literal["tool", "prompt"]],
type_metadata: Literal["catalog", "activity", "all"],
date: str = None,
) -> int:
all_errs = []
clean_catalog = "catalog" in type_metadata or "all" in type_metadata
clean_activity = "activity" in type_metadata or "all" in type_metadata
clean_catalog = type_metadata == "catalog" or type_metadata == "all"
clean_activity = type_metadata == "activity" or type_metadata == "all"

for kind in kind_list:
if clean_catalog:
if len(catalog_ids) > 0:
click.secho(f"Removing catalog(s): {[catalog for catalog in catalog_ids]}", fg="yellow")
for kind in kind_list:
click.secho(f"Removing catalog(s): {[catalog for catalog in catalog_ids]}", fg="yellow")

catalog_condition = " AND ".join([f"catalog_identifier = '{catalog}'" for catalog in catalog_ids])
meta_catalog_condition = " AND ".join([f"version.identifier = '{catalog}'" for catalog in catalog_ids])
remove_catalogs_query = f"""
DELETE FROM
`{bucket}`.`{DEFAULT_CATALOG_SCOPE}`.{kind}{DEFAULT_CATALOG_COLLECTION_NAME}
WHERE
{catalog_condition};
"""
remove_metadata_query = f"""
DELETE FROM
`{bucket}`.`{DEFAULT_CATALOG_SCOPE}`.{kind}{DEFAULT_META_COLLECTION_NAME}
WHERE
{meta_catalog_condition};
"""

res, err = execute_query(cluster, remove_catalogs_query)
for r in res.rows():
logger.debug(r)
if err is not None:
all_errs.append(err)

res, err = execute_query(cluster, remove_metadata_query)
for r in res.rows():
logger.debug(r)
if err is not None:
all_errs.append(err)

else:
drop_scope_query = f"DROP SCOPE `{bucket}`.`{DEFAULT_CATALOG_SCOPE}` IF EXISTS;"
res, err = execute_query(cluster, drop_scope_query)
for r in res.rows():
logger.debug(r)
if err is not None:
all_errs.append(err)

if clean_activity:
if date is not None:
req_date = dateparser.parse(date)
if req_date is None:
raise ValueError(f"Invalid datetime provided: {date}")

if req_date.tzinfo is None:
local_tz = get_localzone()
req_date = req_date.replace(tzinfo=local_tz)

catalog_condition = " AND ".join([f"catalog_identifier = '{catalog}'" for catalog in catalog_ids])
meta_catalog_condition = " AND ".join([f"version.identifier = '{catalog}'" for catalog in catalog_ids])
remove_catalogs_query = f"""
DELETE FROM
`{bucket}`.`{DEFAULT_CATALOG_SCOPE}`.{kind}{DEFAULT_CATALOG_COLLECTION_NAME}
WHERE
{catalog_condition};
"""
remove_metadata_query = f"""
DELETE FROM
`{bucket}`.`{DEFAULT_CATALOG_SCOPE}`.{kind}{DEFAULT_META_COLLECTION_NAME}
WHERE
{meta_catalog_condition};
"""
DELETE FROM
`{bucket}`.`{DEFAULT_AUDIT_SCOPE}`.`{DEFAULT_AUDIT_COLLECTION}`
WHERE
STR_TO_MILLIS(timestamp) < STR_TO_MILLIS('{req_date.isoformat()}');
"""

res, err = execute_query(cluster, remove_catalogs_query)

for r in res.rows():
logger.debug(r)
if err is not None:
all_errs.append(err)

res, err = execute_query(cluster, remove_metadata_query)
else:
drop_scope_query = f"DROP SCOPE `{bucket}`.`{DEFAULT_AUDIT_SCOPE}` IF EXISTS;"
res, err = execute_query(cluster, drop_scope_query)
for r in res.rows():
logger.debug(r)
if err is not None:
all_errs.append(err)
else:
if clean_catalog:
drop_scope_query = f"DROP SCOPE `{bucket}`.`{DEFAULT_CATALOG_SCOPE}` IF EXISTS;"
res, err = execute_query(cluster, drop_scope_query)
for r in res.rows():
logger.debug(r)
if err is not None:
all_errs.append(err)

if clean_activity:
drop_scope_query = f"DROP SCOPE `{bucket}`.`{DEFAULT_AUDIT_SCOPE}` IF EXISTS;"
res, err = execute_query(cluster, drop_scope_query)
for r in res.rows():
logger.debug(r)
if err is not None:
all_errs.append(err)

if len(all_errs) > 0:
logger.error(all_errs)
if len(all_errs) > 0:
logger.error(all_errs)

return len(all_errs)

Expand All @@ -114,17 +180,18 @@ def cmd_clean(
kind: list[typing.Literal["tool", "prompt"]],
ctx: Context = None,
type_metadata: Literal["catalog", "activity", "all"] = "all",
date: str = None,
):
if is_local:
clean_local(ctx, type_metadata)
click.secho("Local catalog/metadata has been deleted!", fg="green")
clean_local(ctx, type_metadata, date)
click.secho("Local catalog/metadata/logs has been deleted!", fg="green")

if is_db:
num_errs = clean_db(ctx, bucket, cluster, catalog_ids, kind, type_metadata)
num_errs = clean_db(ctx, bucket, cluster, catalog_ids, kind, type_metadata, date)
if num_errs > 0:
raise ValueError("Failed to cleanup db catalog/metadata!")
else:
click.secho("Database catalog/metadata has been deleted!", fg="green")
click.secho("Database catalog/metadata/logs has been deleted!", fg="green")


# Note: flask is an optional dependency.
Expand Down
13 changes: 13 additions & 0 deletions libs/agentc_cli/agentc_cli/cmds/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import pathlib
import pydantic
import re
import shutil
import typing

from ..models.context import Context
Expand Down Expand Up @@ -167,6 +168,18 @@ def logging_printer(content: str, *args, **kwargs):
return catalog


def remove_directory(folder: str):
    """Delete the file or directory at *folder*, ignoring missing paths.

    A falsy value or a path that does not exist on disk is silently ignored,
    so callers may pass an optional/unset location without pre-checking.
    """
    if not folder:
        return
    if not os.path.exists(folder):
        # Nothing on disk at this location — treat it as already clean.
        return

    target = pathlib.Path(folder)
    if target.is_dir():
        shutil.rmtree(target.absolute())
    elif target.is_file():
        os.remove(target.absolute())


# TODO: One use case is a user's repo (like agent-catalog-example) might
# have multiple, independent subdirectories in it which should each
# have its own, separate local catalog. We might consider using
Expand Down
17 changes: 16 additions & 1 deletion libs/agentc_cli/agentc_cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,16 @@ def add(ctx, output: pathlib.Path, record_kind: RecordKind):
help="Kind of catalog to remove versions from.",
show_default=True,
)
@click.option(
"-d",
"--date",
default=None,
type=str,
help="Datetime of the oldest log entry to keep (older log entries will be deleted).\nEx: 2021-09-01T00:00:00, 20th Jan 2024 8:00PM, 2 days ago",
show_default=False,
)
@click.pass_context
def clean(ctx, catalog, type_metadata, bucket, catalog_id, skip_confirm, kind):
def clean(ctx, catalog, type_metadata, bucket, catalog_id, skip_confirm, kind, date):
"""Delete all or specific (catalog and/or activity) agent related files / collections."""
ctx_obj: Context = ctx.obj
clean_db = "db" in catalog
Expand All @@ -205,6 +213,11 @@ def clean(ctx, catalog, type_metadata, bucket, catalog_id, skip_confirm, kind):
)
return

if date is not None and (type_metadata == "all" or type_metadata == "catalog"):
raise ValueError(
"Datetime can only be specified for deletion of activity logs, not catalog entries.\nExecute 'agentc clean [[local|db]] activity --date DATETIME' separately to delete logs based on time."
)

# Similar to the rm command, we will prompt the user for each catalog to delete.
if clean_local:
if not skip_confirm:
Expand All @@ -220,6 +233,7 @@ def clean(ctx, catalog, type_metadata, bucket, catalog_id, skip_confirm, kind):
catalog_ids=None,
kind=None,
type_metadata=type_metadata,
date=date,
)

if clean_db:
Expand Down Expand Up @@ -267,6 +281,7 @@ def clean(ctx, catalog, type_metadata, bucket, catalog_id, skip_confirm, kind):
cluster=cluster,
catalog_ids=catalog_id,
kind=kind_list,
date=date,
)
cluster.close()

Expand Down
1 change: 1 addition & 0 deletions libs/agentc_cli/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ agentc = "agentc_cli.main:main"
python = "^3.12"
click = "^8.1.7"
tqdm = "^4.66.5"
dateparser = "^1.2.0"

[tool.poetry.dependencies.agentc-core]
path = "../agentc_core"
Expand Down