Skip to content

Commit

Permalink
Makes topic export async (#9599)
Browse files Browse the repository at this point in the history
* Asynchronosity

* Implements tests for async behaviours

* Fixes test issues

* Update async.dm
  • Loading branch information
covertcorvid committed Dec 11, 2023
1 parent 51eadee commit 4d1141c
Show file tree
Hide file tree
Showing 10 changed files with 118 additions and 12 deletions.
31 changes: 31 additions & 0 deletions code/__DEFINES/async.dm
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@

/// Declars that a function is async, creates the task return type and will cause it to return the task
/// upon sleeping.
/// Place this as the first line in the body of the function, but after any other set X = val settings.
#define DECLARE_ASYNC set waitfor = FALSE; \
RETURN_TYPE(/datum/task); \
var/datum/task/created_task = new(); \
. = created_task;

/// Marks an async function as finished without returning any value.
/// Async version of return;
#define ASYNC_FINISH created_task.mark_completed(); \
return;

/// Marks an async function as completed and returns a result.
/// Async version of return value;
#define ASYNC_RETURN(value) created_task.mark_completed(value);\
return;

/// Waits for the provided task to be completed, or the timeout to expire.
/// Returns null if the timeout expires, or the task's result otherwise.
/// Note that if a task's result is null, then null will be returned.
#define AWAIT(TASK, TIMEOUT) get_result(TASK, TIMEOUT)

/proc/get_result(datum/task/task, timeout)
if (!istype(task))
return task
if (task.await(timeout))
// Return the task result
return task.result
return null
4 changes: 3 additions & 1 deletion code/controllers/subsystem/ticker.dm
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,9 @@ SUBSYSTEM_DEF(ticker)
news_message = "The Sol sector has fallen into anarchistic piracy, as the Tortuga raiders used the chaos of a surprise attack by Syndicate forces to seize a large amount of territory unanswered." //NSV13 end

if(news_message)
SStopic.crosscomms_send("news_report", news_message, news_source)
if(!AWAIT(SStopic.crosscomms_send_async("news_report", news_message, news_source), 10 SECONDS))
message_admins("Failed to send news report through crosscomms. The sending task expired.")
log_game("Failed to send news report through crosscomms. The sending task expired.")

/datum/controller/subsystem/ticker/proc/GetTimeLeft()
if(isnull(SSticker.timeLeft))
Expand Down
15 changes: 10 additions & 5 deletions code/controllers/subsystem/topic.dm
Original file line number Diff line number Diff line change
Expand Up @@ -90,19 +90,20 @@ SUBSYSTEM_DEF(topic)
* Note that request will fail if a token cannot be found for the target server and anonymous is not set.
* * nocheck: TRUE or FALSE whether to check if the recieving server is authorized to get the topic call *(default: FALSE)*
*/
/datum/controller/subsystem/topic/proc/export(addr, query, list/params, anonymous = FALSE, nocheck = FALSE)
/datum/controller/subsystem/topic/proc/export_async(addr, query, list/params, anonymous = FALSE, nocheck = FALSE)
DECLARE_ASYNC
var/list/request = list()
request["query"] = query

if(anonymous)
var/datum/world_topic/topic = GLOB.topic_commands[query]
if((!istype(topic) || !topic.anonymous) && !nocheck)
return
ASYNC_RETURN(TRUE)
request["auth"] = "anonymous"
else
var/list/servers = CONFIG_GET(keyed_list/cross_server)
if(!servers[addr] || (!LAZYACCESS(GLOB.topic_servers[addr], query) && !nocheck))
return // Couldn't find an authorized key, or trying to send secure data to unsecure server
ASYNC_RETURN(TRUE) // Couldn't find an authorized key, or trying to send secure data to unsecure server
request["auth"] = servers[addr]

request.Add(params)
Expand All @@ -111,6 +112,7 @@ SUBSYSTEM_DEF(topic)
if(CONFIG_GET(flag/log_world_topic))
request["auth"] = "***[copytext(request["auth"], -4)]"
log_topic("outgoing: \"[json_encode(request)]\", response: \"[result]\", auth: [request["auth"]], to: [addr], anonymous: [anonymous]")
ASYNC_RETURN(TRUE)

/**
* Broadcast topic to all known authorized servers for things like comms consoles or ahelps.
Expand All @@ -122,7 +124,10 @@ SUBSYSTEM_DEF(topic)
* * msg: message text to send
* * sender: name of the sending entity (station name, ckey etc)
*/
/datum/controller/subsystem/topic/proc/crosscomms_send(query, msg, sender)
/datum/controller/subsystem/topic/proc/crosscomms_send_async(query, msg, sender)
RETURN_TYPE(/datum/task)
var/list/servers = CONFIG_GET(keyed_list/cross_server)
var/datum/task/parent_task = new()
for(var/I in servers)
export(I, query, list("message" = msg, "message_sender" = sender))
parent_task.add_subtask(export_async(I, query, list("message" = msg, "message_sender" = sender)))
return parent_task
8 changes: 4 additions & 4 deletions code/datums/callback.dm
Original file line number Diff line number Diff line change
Expand Up @@ -84,17 +84,17 @@
* * ... optional list of arguments to pass as arguments to the proc being called
*/
/world/proc/ImmediateInvokeAsync(thingtocall, proctocall, ...)
set waitfor = FALSE
DECLARE_ASYNC

if (!thingtocall)
return
ASYNC_FINISH

var/list/calling_arguments = length(args) > 2 ? args.Copy(3) : null

if (thingtocall == GLOBAL_PROC)
call(proctocall)(arglist(calling_arguments))
ASYNC_RETURN(call(proctocall)(arglist(calling_arguments)))
else
call(thingtocall, proctocall)(arglist(calling_arguments))
ASYNC_RETURN(call(thingtocall, proctocall)(arglist(calling_arguments)))

/**
* Invoke this callback
Expand Down
43 changes: 43 additions & 0 deletions code/datums/task.dm
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/datum/task
var/result = null
var/completed = FALSE
var/list/subtasks

/// Add a subtask to this subtask. When awaiting a parent task, it will wait for all subtasks to complete
/// and then will return a list containing all the results.
/datum/task/proc/add_subtask(datum/task/subtask)
LAZYADD(subtasks, subtask)

/// Mark the task as being completed
/datum/task/proc/mark_completed(result = null)
if (length(subtasks))
CRASH("Attempting to mark a subtask holder as completed. This is not allowed")
completed = TRUE
src.result = result

/// Wait for the task to be completed, or the timeout to expire
/// Returns true if the task was completed
/datum/task/proc/await(timeout = 30 SECONDS)
var/start_time = world.time
var/sleep_time = 1
while(world.time < start_time + timeout && !is_completed())
sleep(sleep_time)
sleep_time = min(sleep_time * 2, 1 SECONDS)
// Check for success
var/success = length(subtasks) ? TRUE : completed
if (length(subtasks) && !result)
result = list()
for (var/datum/task/subtask in subtasks)
if (!subtask.completed)
success = FALSE
if (subtask.result)
result += subtask.result
return success

/datum/task/proc/is_completed()
if (length(subtasks))
for (var/datum/task/subtask in subtasks)
if (!subtask.completed)
return FALSE
return TRUE
return completed
2 changes: 1 addition & 1 deletion code/game/machinery/computer/communications.dm
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@

playsound(src, 'sound/machines/terminal_prompt_confirm.ogg', 50, FALSE)

SStopic.crosscomms_send("comms_console", message, station_name())
SStopic.crosscomms_send_async("comms_console", message, station_name())
minor_announce(message, title = "Outgoing message to allied station", html_encode = FALSE)
usr.log_talk(message, LOG_SAY, tag="message to the other server")
message_admins("[ADMIN_LOOKUPFLW(usr)] has sent a message to the other server.")
Expand Down
2 changes: 1 addition & 1 deletion code/modules/admin/verbs/adminhelp.dm
Original file line number Diff line number Diff line change
Expand Up @@ -846,7 +846,7 @@ GLOBAL_DATUM_INIT(ahelp_tickets, /datum/admin_help_tickets, new)
else
final = "[msg] - All admins stealthed\[[english_list(stealthmins)]\], AFK\[[english_list(afkmins)]\], or lacks +BAN\[[english_list(powerlessmins)]\]! Total: [allmins.len] "
send2tgs(source,final)
SStopic.crosscomms_send("ahelp", final, source)
SStopic.crosscomms_send_async("ahelp", final, source)


/proc/send2tgs(msg,msg2)
Expand Down
1 change: 1 addition & 0 deletions code/modules/unit_tests/_unit_tests.dm
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@

#include "achievement_validation.dm"
#include "anchored_mobs.dm"
#include "async.dm"
#include "check_adjustable_clothing.dm"
#include "component_tests.dm"
#include "connect_loc.dm"
Expand Down
22 changes: 22 additions & 0 deletions code/modules/unit_tests/async.dm
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/datum/unit_test/test_async/Run()
var/datum/task/task_1 = INVOKE_ASYNC(src, PROC_REF(synchronously_sleep), 1)
var/datum/task/task_2 = INVOKE_ASYNC(src, PROC_REF(synchronously_sleep), 2)
var/datum/task/task_3 = INVOKE_ASYNC(src, PROC_REF(synchronously_sleep), 3)
var/datum/task/task_4 = INVOKE_ASYNC(src, PROC_REF(long_sleep), 4)
// Long enough for the tasks to complete async, but not long enough for them to complete synchronous
sleep(4)
TEST_ASSERT_EQUAL(1, task_1.result, "Task 1 should have completed with a result of 1")
TEST_ASSERT_EQUAL(2, task_2.result, "Task 2 should have completed with a result of 2")
TEST_ASSERT_EQUAL(3, task_3.result, "Task 3 should have completed with a result of 3")
TEST_ASSERT_EQUAL(FALSE, task_4.completed, "Task 4 should not have completed.")
// Test this task
TEST_ASSERT_EQUAL(5, AWAIT(INVOKE_ASYNC(src, PROC_REF(synchronously_sleep), 5), 3), "Awaiting a 2 ds task with a 4 ds timeout should yield the correct result.")
// Test passed, behaviour is as expected

/datum/unit_test/test_async/proc/synchronously_sleep(value)
sleep(2)
return value

/datum/unit_test/test_async/proc/long_sleep(value)
sleep(5)
return value
2 changes: 2 additions & 0 deletions nsv13.dme
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "code\__DEFINES\aquarium.dm"
#include "code\__DEFINES\areas.dm"
#include "code\__DEFINES\art.dm"
#include "code\__DEFINES\async.dm"
#include "code\__DEFINES\atmospherics.dm"
#include "code\__DEFINES\atom_hud.dm"
#include "code\__DEFINES\balloon_alert.dm"
Expand Down Expand Up @@ -466,6 +467,7 @@
#include "code\datums\soullink.dm"
#include "code\datums\soundtrack.dm"
#include "code\datums\spawners_menu.dm"
#include "code\datums\task.dm"
#include "code\datums\tgs_event_handler.dm"
#include "code\datums\view.dm"
#include "code\datums\weakrefs.dm"
Expand Down

0 comments on commit 4d1141c

Please sign in to comment.