Skip to content

Commit

Permalink
Adding more specific logging
Browse files Browse the repository at this point in the history
  • Loading branch information
MarkDWilliams committed Sep 6, 2023
1 parent 0e761ec commit 1cfba0e
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 8 deletions.
3 changes: 3 additions & 0 deletions tr_sys/tr_ars/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,9 +436,12 @@ def message(req, key):
mesg.data = data
mesg.save()
if agent_name.startswith('ara-'):
logging.debug("Starting merge for "+mesg.pk)
new_merged = utils.merge_received(parent_pk,message_to_merge['message'],agent_name)
logging.debug("Merge complete for "+new_merged.pk)
#the merged versions is what gets consumed. So, it's all we do post processing on?
utils.post_process(new_merged.data,new_merged.id, agent_name)
logging.debug("Post processing complete for "+new_merged.pk)

except Exception as e:
logger.debug("Problem with merger or post processeing for %s " % key)
Expand Down
13 changes: 10 additions & 3 deletions tr_sys/tr_ars/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,11 @@ def send_message(actor_dict, mesg_dict, timeout=300):
mesg.save()
logger.debug('+++ message saved: %s' % (mesg.pk))
if agent_name.startswith('ara-'):
logging.debug("Merge starting for "+mesg.pk)
new_merged = utils.merge_received(parent_pk,message_to_merge['message'], agent_name)
logging.debug("Merge complete for "+new_merged.pk)
utils.post_process(new_merged.data,new_merged.pk, agent_name)
logging.debug("Post processing done for "+new_merged.pk)

except Exception as e:
logger.debug('Problem with post processing or merger of %s for pk: %s' % (inforesid, mesg.pk))
Expand Down Expand Up @@ -173,16 +176,20 @@ def send_message(actor_dict, mesg_dict, timeout=300):
@shared_task(name="catch_timeout")
def catch_timeout_async():
now =timezone.now()
logging.info(f'Checking timeout at {now}')
time_threshold = now - timezone.timedelta(minutes=10)
max_time = time_threshold+timezone.timedelta(minutes=5)

messages = Message.objects.filter(timestamp__gt=time_threshold,timestamp__lt=max_time, status__in='R').values_list('actor','id')
messages = Message.objects.filter(timestamp__gt=time_threshold,timestamp__lt=max_time, status__in='R').values_list('actor','id','timestamp','updated_at')
for mesg in messages:
actor = Agent.objects.get(pk=mesg[0])
mpk=mesg[0]
actor = Agent.objects.get(pk=mpk)
logging.info(f'actor: {actor} id: {mesg[1]} timestamp: {mesg[2]} updated_at {mesg[3]}')

if actor.name == 'ars-default-agent':
continue
else:
logger.info(f'for actor: {actor.name}, the status is still "Running" after 5 min, setting code to 598')
logger.info(f'for actor: {actor.name}, and pk {str(mpk)} the status is still "Running" after 5 min, setting code to 598')
message = Message.objects.get(pk=mesg[1])
message.code = 598
message.status = 'E'
Expand Down
12 changes: 7 additions & 5 deletions tr_sys/tr_ars/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,28 +491,29 @@ def sharedResultsJson(sharedResultsMap):

def pre_merge_process(data,key, agent_name,inforesid):
mesg=Message.objects.get(pk = key)
logging.info("Pre node norm")
logging.info("Pre node norm for"+str(key))
try:
scrub_null_attributes(data)
except Exception as e:
logging.exception("Error in the scrubbing of null attributes")
raise e
try:
normalize_nodes(data,agent_name,key)
logging.info("node norm success")
logging.info("node norm success for "+str(key))
except Exception as e:
post_processing_error(mesg,data,"Error in ARS node normalization")
logging.exception("Error in ARS node normaliztion")
raise e

logging.info("Pre decoration")
logging.info("Pre decoration for "+str(key))
try:
decorate_edges_with_infores(data,inforesid)
logging.info("decoration success")
except Exception as e:
post_processing_error(mesg,data,"Error in ARS edge sources decoration\n"+e)
logging.exception("Error in ARS edge source decoration")
raise e
logging.info("Normalizing scores for "+str(key))
try:
normalize_scores(mesg,data,key,agent_name)
except Exception as e:
Expand All @@ -524,7 +525,7 @@ def pre_merge_process(data,key, agent_name,inforesid):
def post_process(data,key, agent_name):
code =200
mesg=Message.objects.get(pk = key)
logging.info("Pre node annotation")
logging.info("Pre node annotation for "+str(key))
try:
annotate_nodes(mesg,data,agent_name)
logging.info("node annotation successful")
Expand All @@ -533,7 +534,7 @@ def post_process(data,key, agent_name):
logging.error("Error with node annotations for "+str(key))
logging.exception("problem with node annotation post process function")
raise e
logging.info("pre appraiser")
logging.info("pre appraiser for "+str(key))
try:
scrub_null_attributes(data)
except Exception as e:
Expand Down Expand Up @@ -1111,6 +1112,7 @@ def merge(pk,merged_pk):
def merge_received(parent_pk,message_to_merge, agent_name, counter=0):
parent = Message.objects.get(pk=parent_pk)
current_merged_pk=parent.merged_version_id
logging.info("Beginning merge for "+str(current_merged_pk))
#to_merge_message= Message.objects.get(pk=pk_to_merge)
#to_merge_message_dict=get_safe(to_merge_message.to_dict(),"fields","data","message")
t_to_merge_message=TranslatorMessage(message_to_merge)
Expand Down

0 comments on commit 1cfba0e

Please sign in to comment.