From 1cfba0ec46113e190a0e2ff728f44eac293bd638 Mon Sep 17 00:00:00 2001
From: Williams
Date: Wed, 6 Sep 2023 13:59:12 -0400
Subject: [PATCH] Adding more specific logging

---
 tr_sys/tr_ars/api.py   |  3 +++
 tr_sys/tr_ars/tasks.py | 13 ++++++++++---
 tr_sys/tr_ars/utils.py | 12 +++++++-----
 3 files changed, 20 insertions(+), 8 deletions(-)

diff --git a/tr_sys/tr_ars/api.py b/tr_sys/tr_ars/api.py
index fe795678..bffa5a8c 100644
--- a/tr_sys/tr_ars/api.py
+++ b/tr_sys/tr_ars/api.py
@@ -436,9 +436,12 @@ def message(req, key):
             mesg.data = data
             mesg.save()
             if agent_name.startswith('ara-'):
+                logging.debug("Starting merge for "+str(mesg.pk))
                 new_merged = utils.merge_received(parent_pk,message_to_merge['message'],agent_name)
+                logging.debug("Merge complete for "+str(new_merged.pk))
                 #the merged versions is what gets consumed. So, it's all we do post processing on?
                 utils.post_process(new_merged.data,new_merged.id, agent_name)
+                logging.debug("Post processing complete for "+str(new_merged.pk))
         except Exception as e:
             logger.debug("Problem with merger or post processeing for %s " % key)
diff --git a/tr_sys/tr_ars/tasks.py b/tr_sys/tr_ars/tasks.py
index 79c8e339..72137790 100644
--- a/tr_sys/tr_ars/tasks.py
+++ b/tr_sys/tr_ars/tasks.py
@@ -126,8 +126,11 @@ def send_message(actor_dict, mesg_dict, timeout=300):
             mesg.save()
             logger.debug('+++ message saved: %s' % (mesg.pk))
             if agent_name.startswith('ara-'):
+                logging.debug("Merge starting for "+str(mesg.pk))
                 new_merged = utils.merge_received(parent_pk,message_to_merge['message'], agent_name)
+                logging.debug("Merge complete for "+str(new_merged.pk))
                 utils.post_process(new_merged.data,new_merged.pk, agent_name)
+                logging.debug("Post processing done for "+str(new_merged.pk))
         except Exception as e:
             logger.debug('Problem with post processing or merger of %s for pk: %s' % (inforesid, mesg.pk))
@@ -173,16 +176,20 @@ def send_message(actor_dict, mesg_dict, timeout=300):
 @shared_task(name="catch_timeout")
 def catch_timeout_async():
     now =timezone.now()
+    logging.info(f'Checking timeout at {now}')
     time_threshold = now - timezone.timedelta(minutes=10)
     max_time = time_threshold+timezone.timedelta(minutes=5)
-    messages = Message.objects.filter(timestamp__gt=time_threshold,timestamp__lt=max_time, status__in='R').values_list('actor','id')
+    messages = Message.objects.filter(timestamp__gt=time_threshold,timestamp__lt=max_time, status__in='R').values_list('actor','id','timestamp','updated_at')
     for mesg in messages:
-        actor = Agent.objects.get(pk=mesg[0])
+        mpk=mesg[0]
+        actor = Agent.objects.get(pk=mpk)
+        logging.info(f'actor: {actor} id: {mesg[1]} timestamp: {mesg[2]} updated_at {mesg[3]}')
+
         if actor.name == 'ars-default-agent':
             continue
         else:
-            logger.info(f'for actor: {actor.name}, the status is still "Running" after 5 min, setting code to 598')
+            logger.info(f'for actor: {actor.name}, and pk {str(mpk)} the status is still "Running" after 5 min, setting code to 598')
             message = Message.objects.get(pk=mesg[1])
             message.code = 598
             message.status = 'E'
diff --git a/tr_sys/tr_ars/utils.py b/tr_sys/tr_ars/utils.py
index c43abe0f..b90891c7 100644
--- a/tr_sys/tr_ars/utils.py
+++ b/tr_sys/tr_ars/utils.py
@@ -491,7 +491,7 @@ def sharedResultsJson(sharedResultsMap):
 
 def pre_merge_process(data,key, agent_name,inforesid):
     mesg=Message.objects.get(pk = key)
-    logging.info("Pre node norm")
+    logging.info("Pre node norm for "+str(key))
     try:
         scrub_null_attributes(data)
     except Exception as e:
logging.info("node norm success") + logging.info("node norm success for "+str(key)) except Exception as e: post_processing_error(mesg,data,"Error in ARS node normalization") logging.exception("Error in ARS node normaliztion") raise e - logging.info("Pre decoration") + logging.info("Pre decoration for "+str(key)) try: decorate_edges_with_infores(data,inforesid) logging.info("decoration success") @@ -513,6 +513,7 @@ def pre_merge_process(data,key, agent_name,inforesid): post_processing_error(mesg,data,"Error in ARS edge sources decoration\n"+e) logging.exception("Error in ARS edge source decoration") raise e + logging.info("Normalizing scores for "+str(key)) try: normalize_scores(mesg,data,key,agent_name) except Exception as e: @@ -524,7 +525,7 @@ def pre_merge_process(data,key, agent_name,inforesid): def post_process(data,key, agent_name): code =200 mesg=Message.objects.get(pk = key) - logging.info("Pre node annotation") + logging.info("Pre node annotation for "+str(key)) try: annotate_nodes(mesg,data,agent_name) logging.info("node annotation successful") @@ -533,7 +534,7 @@ def post_process(data,key, agent_name): logging.error("Error with node annotations for "+str(key)) logging.exception("problem with node annotation post process function") raise e - logging.info("pre appraiser") + logging.info("pre appraiser for "+str(key)) try: scrub_null_attributes(data) except Exception as e: @@ -1111,6 +1112,7 @@ def merge(pk,merged_pk): def merge_received(parent_pk,message_to_merge, agent_name, counter=0): parent = Message.objects.get(pk=parent_pk) current_merged_pk=parent.merged_version_id + logging.info("Beginning merge for "+str(current_merged_pk)) #to_merge_message= Message.objects.get(pk=pk_to_merge) #to_merge_message_dict=get_safe(to_merge_message.to_dict(),"fields","data","message") t_to_merge_message=TranslatorMessage(message_to_merge)