Skip to content

Commit

Permalink
added some logging information
Browse files Browse the repository at this point in the history
  • Loading branch information
abdollahis2 authored and abdollahis2 committed Nov 8, 2023
1 parent be8e14b commit 7661f7b
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 17 deletions.
5 changes: 3 additions & 2 deletions tr_sys/tr_ars/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,17 +432,18 @@ def message(req, key):
#message_to_merge =utils.get_safe(data,"message")
message_to_merge = data
agent_name = str(mesg.actor.agent.name)
logger.info("Running pre_merge_process for agent %s with %s" % (agent_name, len(res)))
utils.pre_merge_process(message_to_merge,key, agent_name, inforesid)
if mesg.data and 'results' in mesg.data and mesg.data['results'] != None and len(mesg.data['results']) > 0:
mesg = Message.create(name=mesg.name, status=status, actor=mesg.actor, ref=mesg)
mesg.status = status
mesg.code = code
mesg.data = data
mesg.save()
logging.info("pre async call")
logger.info("pre async call for agent %s" % agent_name)
if agent_name.startswith('ara-'):
utils.merge_and_post_process.apply_async((parent_pk,message_to_merge['message'],agent_name))
logging.info("post async call")
logger.info("post async call for agent %s" % agent_name)


# create child message if this one already has results
Expand Down
9 changes: 5 additions & 4 deletions tr_sys/tr_ars/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def send_message(actor_dict, mesg_dict, timeout=300):
status_code = 422

else:
logger.debug("Not async? "+query_endpoint)
logger.debug("Not async for agent: %s and endpoint: %s? " % (inforesid,query_endpoint))
status = 'D'
status_code = 200
results = utils.get_safe(rdata,"message","results")
Expand All @@ -116,6 +116,7 @@ def send_message(actor_dict, mesg_dict, timeout=300):
message_to_merge=rdata
agent_name = str(mesg.actor.agent.name)
child_pk=str(mesg.pk)
logger.info("Running pre_merge_process for agent %s with %s" % (agent_name, len(results)))
utils.pre_merge_process(message_to_merge,child_pk, agent_name, inforesid)
#Whether we did any additional processing or not, we need to save what we have
mesg.code = status_code
Expand Down Expand Up @@ -160,16 +161,16 @@ def send_message(actor_dict, mesg_dict, timeout=300):
logger.debug('+++ message saved: %s' % (mesg.pk))

agent_name = str(mesg.actor.agent.name)
if mesg.code == 200:
logger.info("pre async call")
if mesg.code == 200 and results is not None and len(results)>0:
logger.info("pre async call for agent %s" % agent_name)
if agent_name.startswith('ara-'):
# logging.debug("Merge starting for "+str(mesg.pk))
# new_merged = utils.merge_received(parent_pk,message_to_merge['message'], agent_name)
# logging.debug("Merge complete for "+str(new_merged.pk))
# utils.post_process(new_merged.data,new_merged.pk, agent_name)
# logging.debug("Post processing done for "+str(new_merged.pk))
utils.merge_and_post_process.apply_async((parent_pk,message_to_merge['message'],agent_name))
logger.info("post async call")
logger.info("post async call for agent %s" % agent_name)
else:
logging.debug("Skipping merge and post for "+str(mesg.pk)+
" because the contributing message is in state: "+str(mesg.code))
Expand Down
27 changes: 16 additions & 11 deletions tr_sys/tr_ars/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,12 +288,15 @@ def mergeMessagesRecursive(mergedMessage,messageList,pk):
results = mergedMessage.getResults()
if results is not None:
logging.info(f'Averaing normalized scores for {pk}')
results = results.getRaw()
for result in results:
if "normalized_score" in result.keys():
ns = result["normalized_score"]
if isinstance(ns,list) and len(ns)>0:
result["normalized_score"]= sum(ns) / len(ns)
try:
results = results.getRaw()
for result in results:
if "normalized_score" in result.keys():
ns = result["normalized_score"]
if isinstance(ns,list) and len(ns)>0:
result["normalized_score"]= sum(ns) / len(ns)
except Exception as e:
logging.debug(e.__traceback__)
except Exception as e:
logging.debug(e.__traceback__)

Expand Down Expand Up @@ -499,7 +502,7 @@ def sharedResultsJson(sharedResultsMap):

def pre_merge_process(data,key, agent_name,inforesid):
mesg=Message.objects.get(pk = key)
logging.info("Pre node norm for"+str(key))
logging.info("Pre node norm for "+str(key))
try:
scrub_null_attributes(data)
except Exception as e:
Expand Down Expand Up @@ -533,10 +536,10 @@ def pre_merge_process(data,key, agent_name,inforesid):
def post_process(data,key, agent_name):
code =200
mesg=Message.objects.get(pk = key)
logging.info("Pre node annotation for "+str(key))
logging.info("Pre node annotation for agent %s pk: %s" % (agent_name, str(key)))
try:
annotate_nodes(mesg,data,agent_name)
logging.info("node annotation successful")
logging.info("node annotation successful for agent %s" % agent_name)
except Exception as e:
post_processing_error(mesg,data,"Error in annotation of nodes")
logging.error("Error with node annotations for "+str(key))
Expand Down Expand Up @@ -596,7 +599,7 @@ def merge_and_post_process(parent_pk,message_to_merge, agent_name):
merged = merge_received(parent_pk,message_to_merge, agent_name)
post_process(merged.data,merged.id, agent_name)
except Exception as e:
logging.debug("Problem with merger or post processeing for %s " % str(parent_pk))
logging.debug("Problem with merger or post processing for agent %s pk: %s " % (agent_name, (parent_pk)))
logging.exception("error in merger or post processing")
merged.status='E'
merged.code = 422
Expand Down Expand Up @@ -681,6 +684,7 @@ def annotate_nodes(mesg,data,agent_name):

json_data = json.dumps(nodes_message)
try:
logging.info('posting data to the annotator URL %s' % ANNOTATOR_URL)
r = requests.post(ANNOTATOR_URL,data=json_data,headers=headers)
rj=r.json()
logging.info('the response status for agent %s node annotator is: %s' % (agent_name,r.status_code))
Expand Down Expand Up @@ -1136,7 +1140,7 @@ def merge(pk,merged_pk):
def merge_received(parent_pk,message_to_merge, agent_name, counter=0):
parent = Message.objects.get(pk=parent_pk)
current_merged_pk=parent.merged_version_id
logging.info("Beginning merge for "+str(current_merged_pk))
logging.info("Beginning merge for agent %s with pk: %s" %(agent_name,str(current_merged_pk)))
#to_merge_message= Message.objects.get(pk=pk_to_merge)
#to_merge_message_dict=get_safe(to_merge_message.to_dict(),"fields","data","message")
t_to_merge_message=TranslatorMessage(message_to_merge)
Expand Down Expand Up @@ -1165,6 +1169,7 @@ def merge_received(parent_pk,message_to_merge, agent_name, counter=0):
print()
#If not, we make the newcomer the current "merged" Message
else:
logging.info("first merge done on agent: %s" % agent_name)
merged = t_to_merge_message


Expand Down

0 comments on commit 7661f7b

Please sign in to comment.