Skip to content

Commit

Permalink
Merge pull request #516 from NCATSTranslator/makePostProcessingAsyncMDW
Browse files Browse the repository at this point in the history
initial work
  • Loading branch information
MarkDWilliams authored Oct 16, 2023
2 parents 4c6431e + ebe3f6a commit 374c68a
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 49 deletions.
44 changes: 17 additions & 27 deletions tr_sys/tr_ars/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,33 +428,23 @@ def message(req, key):
scorestat = utils.ScoreStatCalc(res)
mesg.result_stat = scorestat
#before we do basically anything else, we normalize
try:
parent_pk = mesg.ref_id
#message_to_merge =utils.get_safe(data,"message")
message_to_merge = data
agent_name = str(mesg.actor.agent.name)
utils.pre_merge_process(message_to_merge,key, agent_name, inforesid)
if mesg.data and 'results' in mesg.data and mesg.data['results'] != None and len(mesg.data['results']) > 0:
mesg = Message.create(name=mesg.name, status=status, actor=mesg.actor, ref=mesg)
mesg.status = status
mesg.code = code
mesg.data = data
mesg.save()
if agent_name.startswith('ara-'):
logging.debug("Starting merge for "+str(mesg.pk))
new_merged = utils.merge_received(parent_pk,message_to_merge['message'],agent_name)
logging.debug("Merge complete for "+str(new_merged.pk))
#the merged versions is what gets consumed. So, it's all we do post processing on?
utils.post_process(new_merged.data,new_merged.id, agent_name)
logging.debug("Post processing complete for "+str(new_merged.pk))

except Exception as e:
logger.debug("Problem with merger or post processeing for %s " % key)
logger.exception("error in merger or post processin")
new_merged.status='E'
new_merged.code = 422
new_merged.save()
else:
parent_pk = mesg.ref_id
#message_to_merge =utils.get_safe(data,"message")
message_to_merge = data
agent_name = str(mesg.actor.agent.name)
utils.pre_merge_process(message_to_merge,key, agent_name, inforesid)
if mesg.data and 'results' in mesg.data and mesg.data['results'] != None and len(mesg.data['results']) > 0:
mesg = Message.create(name=mesg.name, status=status, actor=mesg.actor, ref=mesg)
mesg.status = status
mesg.code = code
mesg.data = data
mesg.save()
logging.info("pre async call")
if agent_name.startswith('ara-'):
utils.merge_and_post_process.apply_async((parent_pk,message_to_merge['message'],agent_name))
logging.info("post async call")


# create child message if this one already has results
if mesg.data and 'results' in mesg.data and mesg.data['results'] != None and len(mesg.data['results']) > 0:
mesg = Message.create(name=mesg.name, status=status, actor=mesg.actor, ref=mesg)
Expand Down
34 changes: 16 additions & 18 deletions tr_sys/tr_ars/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,24 +158,22 @@ def send_message(actor_dict, mesg_dict, timeout=300):
mesg.url = url
mesg.save()
logger.debug('+++ message saved: %s' % (mesg.pk))
try:
agent_name = str(mesg.actor.agent.name)
if mesg.code == 200:
if agent_name.startswith('ara-'):
logging.debug("Merge starting for "+str(mesg.pk))
new_merged = utils.merge_received(parent_pk,message_to_merge['message'], agent_name)
logging.debug("Merge complete for "+str(new_merged.pk))
utils.post_process(new_merged.data,new_merged.pk, agent_name)
logging.debug("Post processing done for "+str(new_merged.pk))
else:
logging.debug("Skipping merge and post for "+str(mesg.pk)+
" because the contributing message is in state: "+str(mesg.code))
#This exception relates to the merged version, not the original. Never the two may interfere
except Exception as e:
logger.debug('Problem with post processing or merger of %s for pk: %s' % (inforesid, mesg.pk))
new_merged.status='E'
new_merged.code = 422
new_merged.save()

agent_name = str(mesg.actor.agent.name)
if mesg.code == 200:
logger.info("pre async call")
if agent_name.startswith('ara-'):
# logging.debug("Merge starting for "+str(mesg.pk))
# new_merged = utils.merge_received(parent_pk,message_to_merge['message'], agent_name)
# logging.debug("Merge complete for "+str(new_merged.pk))
# utils.post_process(new_merged.data,new_merged.pk, agent_name)
# logging.debug("Post processing done for "+str(new_merged.pk))
utils.merge_and_post_process.apply_async((parent_pk,message_to_merge['message'],agent_name))
logger.info("post async call")
else:
logging.debug("Skipping merge and post for "+str(mesg.pk)+
" because the contributing message is in state: "+str(mesg.code))


@shared_task(name="catch_timeout")
def catch_timeout_async():
Expand Down
19 changes: 15 additions & 4 deletions tr_sys/tr_ars/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,20 @@ def post_process(data,key, agent_name):
mesg.data = data
mesg.save()

@shared_task(name="merge_and_post_process")
def merge_and_post_process(parent_pk,message_to_merge, agent_name):
#logging.debug("Starting merge for "+str(mesg.pk))
try:
merged = merge_received(parent_pk,message_to_merge, agent_name)
post_process(merged.data,merged.id, agent_name)
except Exception as e:
logging.debug("Problem with merger or post processeing for %s " % str(parent_pk))
logging.exception("error in merger or post processing")
merged.status='E'
merged.code = 422
merged.save()


def scrub_null_attributes(data):
nodes = get_safe(data,"message","knowledge_graph","nodes")
edges = get_safe(data,"message","knowledge_graph","edges")
Expand Down Expand Up @@ -682,10 +696,7 @@ def annotate_nodes(mesg,data,agent_name):
else:
post_processing_error(mesg,data,"Error in annotation of nodes")
except Exception as e:
logging.exception("error in node annotation internal function for agent: %s" % agent_name)
logging.error("Unexpected error 3: {}".format(traceback.format_exception(type(e), e, e.__traceback__)))
logging.error(type(e).__name__)
logging.error(e.args)
logging.exception("error in node annotation internal function")
raise e
#else:
# with open(str(mesg.actor)+".json", "w") as outfile:
Expand Down

0 comments on commit 374c68a

Please sign in to comment.