Adding more specific logging #497

Merged
merged 1 commit on Sep 6, 2023
tr_sys/tr_ars/api.py (3 additions, 0 deletions)
@@ -436,9 +436,12 @@ def message(req, key):
     mesg.data = data
     mesg.save()
     if agent_name.startswith('ara-'):
+        logging.debug("Starting merge for "+mesg.pk)
         new_merged = utils.merge_received(parent_pk,message_to_merge['message'],agent_name)
+        logging.debug("Merge complete for "+new_merged.pk)
         #the merged versions is what gets consumed. So, it's all we do post processing on?
         utils.post_process(new_merged.data,new_merged.id, agent_name)
+        logging.debug("Post processing complete for "+new_merged.pk)
 
 except Exception as e:
     logger.debug("Problem with merger or post processeing for %s " % key)
tr_sys/tr_ars/tasks.py (10 additions, 3 deletions)
@@ -126,8 +126,11 @@ def send_message(actor_dict, mesg_dict, timeout=300):
     mesg.save()
     logger.debug('+++ message saved: %s' % (mesg.pk))
     if agent_name.startswith('ara-'):
+        logging.debug("Merge starting for "+mesg.pk)
         new_merged = utils.merge_received(parent_pk,message_to_merge['message'], agent_name)
+        logging.debug("Merge complete for "+new_merged.pk)
         utils.post_process(new_merged.data,new_merged.pk, agent_name)
+        logging.debug("Post processing done for "+new_merged.pk)
 
 except Exception as e:
     logger.debug('Problem with post processing or merger of %s for pk: %s' % (inforesid, mesg.pk))
@@ -173,16 +176,20 @@ def send_message(actor_dict, mesg_dict, timeout=300):
 @shared_task(name="catch_timeout")
 def catch_timeout_async():
     now =timezone.now()
+    logging.info(f'Checking timeout at {now}')
     time_threshold = now - timezone.timedelta(minutes=10)
     max_time = time_threshold+timezone.timedelta(minutes=5)
 
-    messages = Message.objects.filter(timestamp__gt=time_threshold,timestamp__lt=max_time, status__in='R').values_list('actor','id')
+    messages = Message.objects.filter(timestamp__gt=time_threshold,timestamp__lt=max_time, status__in='R').values_list('actor','id','timestamp','updated_at')
     for mesg in messages:
-        actor = Agent.objects.get(pk=mesg[0])
+        mpk=mesg[0]
+        actor = Agent.objects.get(pk=mpk)
+        logging.info(f'actor: {actor} id: {mesg[1]} timestamp: {mesg[2]} updated_at {mesg[3]}')
+
         if actor.name == 'ars-default-agent':
             continue
         else:
-            logger.info(f'for actor: {actor.name}, the status is still "Running" after 5 min, setting code to 598')
+            logger.info(f'for actor: {actor.name}, and pk {str(mpk)} the status is still "Running" after 5 min, setting code to 598')
             message = Message.objects.get(pk=mesg[1])
             message.code = 598
             message.status = 'E'
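The window arithmetic in catch_timeout_async selects messages whose timestamp falls between 10 and 5 minutes before the current run, so only results that have sat in status 'R' ("Running") for at least 5 minutes get flagged with code 598. A minimal sketch of just that calculation, using plain datetime arithmetic in place of Django's timezone helpers; the variable names mirror the diff and the final assertion is illustrative only:

import datetime

# Stand-in for timezone.now() in the task above, minus time-zone handling.
now = datetime.datetime.utcnow()
time_threshold = now - datetime.timedelta(minutes=10)       # lower bound: 10 minutes ago
max_time = time_threshold + datetime.timedelta(minutes=5)   # upper bound: 5 minutes ago

# A message whose timestamp satisfies time_threshold < timestamp < max_time is
# therefore between 5 and 10 minutes old; if its status is still 'R', the task
# above sets code 598 and status 'E'.
assert max_time == now - datetime.timedelta(minutes=5)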
tr_sys/tr_ars/utils.py (7 additions, 5 deletions)
@@ -491,28 +491,29 @@ def sharedResultsJson(sharedResultsMap):

 def pre_merge_process(data,key, agent_name,inforesid):
     mesg=Message.objects.get(pk = key)
-    logging.info("Pre node norm")
+    logging.info("Pre node norm for"+str(key))
     try:
         scrub_null_attributes(data)
     except Exception as e:
         logging.exception("Error in the scrubbing of null attributes")
         raise e
     try:
         normalize_nodes(data,agent_name,key)
-        logging.info("node norm success")
+        logging.info("node norm success for "+str(key))
     except Exception as e:
         post_processing_error(mesg,data,"Error in ARS node normalization")
         logging.exception("Error in ARS node normaliztion")
         raise e
 
-    logging.info("Pre decoration")
+    logging.info("Pre decoration for "+str(key))
     try:
         decorate_edges_with_infores(data,inforesid)
         logging.info("decoration success")
     except Exception as e:
         post_processing_error(mesg,data,"Error in ARS edge sources decoration\n"+e)
         logging.exception("Error in ARS edge source decoration")
         raise e
+    logging.info("Normalizing scores for "+str(key))
     try:
         normalize_scores(mesg,data,key,agent_name)
     except Exception as e:
@@ -524,7 +525,7 @@ def pre_merge_process(data,key, agent_name,inforesid):
 def post_process(data,key, agent_name):
     code =200
     mesg=Message.objects.get(pk = key)
-    logging.info("Pre node annotation")
+    logging.info("Pre node annotation for "+str(key))
     try:
         annotate_nodes(mesg,data,agent_name)
         logging.info("node annotation successful")
@@ -533,7 +534,7 @@ def post_process(data,key, agent_name):
         logging.error("Error with node annotations for "+str(key))
         logging.exception("problem with node annotation post process function")
         raise e
-    logging.info("pre appraiser")
+    logging.info("pre appraiser for "+str(key))
     try:
         scrub_null_attributes(data)
     except Exception as e:
@@ -1111,6 +1112,7 @@ def merge(pk,merged_pk):
 def merge_received(parent_pk,message_to_merge, agent_name, counter=0):
     parent = Message.objects.get(pk=parent_pk)
     current_merged_pk=parent.merged_version_id
+    logging.info("Beginning merge for "+str(current_merged_pk))
     #to_merge_message= Message.objects.get(pk=pk_to_merge)
     #to_merge_message_dict=get_safe(to_merge_message.to_dict(),"fields","data","message")
     t_to_merge_message=TranslatorMessage(message_to_merge)
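A note on the new log calls: they build their messages by concatenating a primary key onto a string literal (e.g. "Merge complete for "+new_merged.pk) and go through the root logging module, while the surrounding code logs through the module-level logger with %s formatting. If the pk is a UUID or integer rather than a str, that concatenation raises TypeError inside the enclosing try block. A minimal sketch of the same three debug lines written with lazy %s arguments; log_merge_steps and the UUID values are hypothetical and not part of this PR:

import logging
import uuid

logger = logging.getLogger(__name__)

def log_merge_steps(mesg_pk, merged_pk):
    # Passing the pk as a logging argument lets the logging module handle the
    # str() conversion, so a UUID or int pk cannot raise TypeError the way
    # "..." + pk can.
    logger.debug("Starting merge for %s", mesg_pk)
    logger.debug("Merge complete for %s", merged_pk)
    logger.debug("Post processing complete for %s", merged_pk)

logging.basicConfig(level=logging.DEBUG)
log_merge_steps(uuid.uuid4(), uuid.uuid4())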