diff --git a/core/io/resource.cpp b/core/io/resource.cpp index 83d9770c88..36d627d20b 100644 --- a/core/io/resource.cpp +++ b/core/io/resource.cpp @@ -42,12 +42,12 @@ #include void Resource::emit_changed() { - if (ResourceLoader::is_within_load() && MessageQueue::get_main_singleton() != MessageQueue::get_singleton() && !MessageQueue::get_singleton()->is_flushing()) { - // Let the connection happen on the call queue, later, since signals are not thread-safe. - call_deferred("emit_signal", CoreStringName(changed)); - } else { - emit_signal(CoreStringName(changed)); + if (ResourceLoader::is_within_load() && !Thread::is_main_thread()) { + ResourceLoader::resource_changed_emit(this); + return; } + + emit_signal(CoreStringName(changed)); } void Resource::_resource_path_changed() { @@ -168,22 +168,22 @@ bool Resource::editor_can_reload_from_file() { } void Resource::connect_changed(const Callable &p_callable, uint32_t p_flags) { - if (ResourceLoader::is_within_load() && MessageQueue::get_main_singleton() != MessageQueue::get_singleton() && !MessageQueue::get_singleton()->is_flushing()) { - // Let the check and connection happen on the call queue, later, since signals are not thread-safe. - callable_mp(this, &Resource::connect_changed).call_deferred(p_callable, p_flags); + if (ResourceLoader::is_within_load() && !Thread::is_main_thread()) { + ResourceLoader::resource_changed_connect(this, p_callable, p_flags); return; } + if (!is_connected(CoreStringName(changed), p_callable) || p_flags & CONNECT_REFERENCE_COUNTED) { connect(CoreStringName(changed), p_callable, p_flags); } } void Resource::disconnect_changed(const Callable &p_callable) { - if (ResourceLoader::is_within_load() && MessageQueue::get_main_singleton() != MessageQueue::get_singleton() && !MessageQueue::get_singleton()->is_flushing()) { - // Let the check and disconnection happen on the call queue, later, since signals are not thread-safe. - callable_mp(this, &Resource::disconnect_changed).call_deferred(p_callable); + if (ResourceLoader::is_within_load() && !Thread::is_main_thread()) { + ResourceLoader::resource_changed_disconnect(this, p_callable); return; } + if (is_connected(CoreStringName(changed), p_callable)) { disconnect(CoreStringName(changed), p_callable); } diff --git a/core/io/resource_loader.cpp b/core/io/resource_loader.cpp index 502ffc96c9..e8b4ad179c 100644 --- a/core/io/resource_loader.cpp +++ b/core/io/resource_loader.cpp @@ -33,6 +33,7 @@ #include "resource_loader.h" #include "core/config/project_settings.h" +#include "core/core_bind.h" #include "core/io/file_access.h" #include "core/io/resource_importer.h" #include "core/object/script_language.h" @@ -209,34 +210,58 @@ void ResourceFormatLoader::_bind_methods() { /////////////////////////////////// +// These are used before and after a wait for a WorkerThreadPool task +// because that can lead to another load started in the same thread, +// something we must treat as a different stack for the purposes +// of tracking nesting. + +#define PREPARE_FOR_WTP_WAIT \ + int load_nesting_backup = ResourceLoader::load_nesting; \ + Vector load_paths_stack_backup = ResourceLoader::load_paths_stack; \ + ResourceLoader::load_nesting = 0; \ + ResourceLoader::load_paths_stack.clear(); + +#define RESTORE_AFTER_WTP_WAIT \ + DEV_ASSERT(ResourceLoader::load_nesting == 0); \ + DEV_ASSERT(ResourceLoader::load_paths_stack.is_empty()); \ + ResourceLoader::load_nesting = load_nesting_backup; \ + ResourceLoader::load_paths_stack = load_paths_stack_backup; \ + load_paths_stack_backup.clear(); + // This should be robust enough to be called redundantly without issues. void ResourceLoader::LoadToken::clear() { thread_load_mutex.lock(); WorkerThreadPool::TaskID task_to_await = 0; - if (!local_path.is_empty()) { // Empty is used for the special case where the load task is not registered. - DEV_ASSERT(thread_load_tasks.has(local_path)); - ThreadLoadTask &load_task = thread_load_tasks[local_path]; - if (!load_task.awaited) { - task_to_await = load_task.task_id; - load_task.awaited = true; - } - thread_load_tasks.erase(local_path); - local_path.clear(); - } + // User-facing tokens shouldn't be deleted until completely claimed. + DEV_ASSERT(user_rc == 0 && user_path.is_empty()); - if (!user_path.is_empty()) { - DEV_ASSERT(user_load_tokens.has(user_path)); - user_load_tokens.erase(user_path); - user_path.clear(); + if (!local_path.is_empty()) { + if (task_if_unregistered) { + memdelete(task_if_unregistered); + task_if_unregistered = nullptr; + } else { + DEV_ASSERT(thread_load_tasks.has(local_path)); + ThreadLoadTask &load_task = thread_load_tasks[local_path]; + if (load_task.task_id && !load_task.awaited) { + task_to_await = load_task.task_id; + } + // Removing a task which is still in progress would be catastrophic. + // Tokens must be alive until the task thread function is done. + DEV_ASSERT(load_task.status == THREAD_LOAD_FAILED || load_task.status == THREAD_LOAD_LOADED); + thread_load_tasks.erase(local_path); + } + local_path.clear(); // Mark as already cleared. } thread_load_mutex.unlock(); // If task is unused, await it here, locally, now the token data is consistent. if (task_to_await) { + PREPARE_FOR_WTP_WAIT WorkerThreadPool::get_singleton()->wait_for_task_completion(task_to_await); + RESTORE_AFTER_WTP_WAIT } } @@ -247,9 +272,9 @@ ResourceLoader::LoadToken::~LoadToken() { Ref ResourceLoader::_load(const String &p_path, const String &p_original_path, const String &p_type_hint, ResourceFormatLoader::CacheMode p_cache_mode, Error *r_error, bool p_use_sub_threads, float *r_progress) { const String &original_path = p_original_path.is_empty() ? p_path : p_original_path; load_nesting++; - if (load_paths_stack->size()) { + if (load_paths_stack.size()) { thread_load_mutex.lock(); - const String &parent_task_path = load_paths_stack->get(load_paths_stack->size() - 1); + const String &parent_task_path = load_paths_stack.get(load_paths_stack.size() - 1); HashMap::Iterator E = thread_load_tasks.find(parent_task_path); // Avoid double-tracking, for progress reporting, resources that boil down to a remapped path containing the real payload (e.g., imported resources). bool is_remapped_load = original_path == parent_task_path; @@ -258,23 +283,23 @@ Ref ResourceLoader::_load(const String &p_path, const String &p_origin } thread_load_mutex.unlock(); } - load_paths_stack->push_back(original_path); + load_paths_stack.push_back(original_path); // Try all loaders and pick the first match for the type hint - bool found = false; + bool loader_found = false; Ref res; for (int i = 0; i < loader_count; i++) { if (!loader[i]->recognize_path(p_path, p_type_hint)) { continue; } - found = true; + loader_found = true; res = loader[i]->load(p_path, original_path, r_error, p_use_sub_threads, r_progress, p_cache_mode); if (!res.is_null()) { break; } } - load_paths_stack->resize(load_paths_stack->size() - 1); + load_paths_stack.resize(load_paths_stack.size() - 1); res_ref_overrides.erase(load_nesting); load_nesting--; @@ -282,22 +307,32 @@ Ref ResourceLoader::_load(const String &p_path, const String &p_origin return res; } - ERR_FAIL_COND_V_MSG(found, Ref(), - vformat("Failed loading resource: %s. Make sure resources have been imported by opening the project in the editor at least once.", p_path)); + if (!loader_found) { + if (r_error) { + *r_error = ERR_FILE_UNRECOGNIZED; + } + ERR_FAIL_V_MSG(Ref(), vformat("No loader found for resource: %s (expected type: %s)", p_path, p_type_hint)); + } #ifdef TOOLS_ENABLED Ref file_check = FileAccess::create(FileAccess::ACCESS_RESOURCES); - ERR_FAIL_COND_V_MSG(!file_check->file_exists(p_path), Ref(), vformat("Resource file not found: %s (expected type: %s)", p_path, p_type_hint)); + if (!file_check->file_exists(p_path)) { + if (r_error) { + *r_error = ERR_FILE_NOT_FOUND; + } + ERR_FAIL_V_MSG(Ref(), vformat("Resource file not found: %s (expected type: %s)", p_path, p_type_hint)); + } #endif - ERR_FAIL_V_MSG(Ref(), vformat("No loader found for resource: %s (expected type: %s)", p_path, p_type_hint)); + ERR_FAIL_V_MSG(Ref(), vformat("Failed loading resource: %s. Make sure resources have been imported by opening the project in the editor at least once.", p_path)); } -void ResourceLoader::_thread_load_function(void *p_userdata) { +// This implementation must allow re-entrancy for a task that started awaiting in a deeper stack frame. +// The load task token must be manually re-referenced before this is called, which includes threaded runs. +void ResourceLoader::_run_load_task(void *p_userdata) { ThreadLoadTask &load_task = *(ThreadLoadTask *)p_userdata; thread_load_mutex.lock(); - caller_task_id = load_task.task_id; if (cleaning_tasks) { load_task.status = THREAD_LOAD_FAILED; thread_load_mutex.unlock(); @@ -305,11 +340,13 @@ void ResourceLoader::_thread_load_function(void *p_userdata) { } thread_load_mutex.unlock(); + ThreadLoadTask *curr_load_task_backup = curr_load_task; + curr_load_task = &load_task; + // Thread-safe either if it's the current thread or a brand new one. CallQueue *own_mq_override = nullptr; if (load_nesting == 0) { - load_paths_stack = memnew(Vector); - + DEV_ASSERT(load_paths_stack.is_empty()); if (!Thread::is_main_thread()) { // Let the caller thread use its own, for added flexibility. Provide one otherwise. if (MessageQueue::get_singleton() == MessageQueue::get_main_singleton()) { @@ -321,12 +358,21 @@ void ResourceLoader::_thread_load_function(void *p_userdata) { } // -- + bool xl_remapped = false; + const String &remapped_path = _path_remap(load_task.local_path, &xl_remapped); + + print_verbose("Loading resource: " + remapped_path); + Error load_err = OK; - Ref res = _load(load_task.remapped_path, load_task.remapped_path != load_task.local_path ? load_task.local_path : String(), load_task.type_hint, load_task.cache_mode, &load_err, load_task.use_sub_threads, &load_task.progress); + Ref res = _load(remapped_path, remapped_path != load_task.local_path ? load_task.local_path : String(), load_task.type_hint, load_task.cache_mode, &load_err, load_task.use_sub_threads, &load_task.progress); if (MessageQueue::get_singleton() != MessageQueue::get_main_singleton()) { MessageQueue::get_singleton()->flush(); } + if (res.is_null()) { + print_verbose("Failed loading resource: " + remapped_path); + } + thread_load_mutex.lock(); load_task.resource = res; @@ -339,11 +385,10 @@ void ResourceLoader::_thread_load_function(void *p_userdata) { load_task.status = THREAD_LOAD_LOADED; } - if (load_task.cond_var) { + if (load_task.cond_var && load_task.need_wait) { load_task.cond_var->notify_all(); - memdelete(load_task.cond_var); - load_task.cond_var = nullptr; } + load_task.need_wait = false; bool ignoring = load_task.cache_mode == ResourceFormatLoader::CACHE_MODE_IGNORE || load_task.cache_mode == ResourceFormatLoader::CACHE_MODE_IGNORE_DEEP; bool replacing = load_task.cache_mode == ResourceFormatLoader::CACHE_MODE_REPLACE || load_task.cache_mode == ResourceFormatLoader::CACHE_MODE_REPLACE_DEEP; @@ -356,27 +401,40 @@ void ResourceLoader::_thread_load_function(void *p_userdata) { unlock_pending = false; if (!ignoring) { - if (replacing) { - Ref old_res = ResourceCache::get_ref(load_task.local_path); - if (old_res.is_valid() && old_res != load_task.resource) { - // If resource is already loaded, only replace its data, to avoid existing invalidating instances. - old_res->copy_from(load_task.resource); + ResourceCache::lock.lock(); // Check and operations must happen atomically. + bool pending_unlock = true; + Ref old_res = ResourceCache::get_ref(load_task.local_path); + if (old_res.is_valid()) { + if (old_res != load_task.resource) { + // Resource can already exists at this point for two reasons: + // a) The load uses replace mode. + // b) There were more than one load in flight for the same path because of deadlock prevention. + // Either case, we want to keep the resource that was already there. + ResourceCache::lock.unlock(); + pending_unlock = false; + if (replacing) { + old_res->copy_from(load_task.resource); + } load_task.resource = old_res; } + } else { + load_task.resource->set_path(load_task.local_path); + } + if (pending_unlock) { + ResourceCache::lock.unlock(); } - load_task.resource->set_path(load_task.local_path, replacing); } else { load_task.resource->set_path_cache(load_task.local_path); } - if (load_task.xl_remapped) { + if (xl_remapped) { load_task.resource->set_as_translation_remapped(true); } #ifdef TOOLS_ENABLED load_task.resource->set_edited(false); if (timestamp_on_load) { - uint64_t mt = FileAccess::get_modified_time(load_task.remapped_path); + uint64_t mt = FileAccess::get_modified_time(remapped_path); //printf("mt %s: %lli\n",remapped_path.utf8().get_data(),mt); load_task.resource->set_last_modified_time(mt); } @@ -401,6 +459,9 @@ void ResourceLoader::_thread_load_function(void *p_userdata) { } } + // It's safe now to let the task go in case no one else was grabbing the token. + load_task.load_token->unreference(); + if (unlock_pending) { thread_load_mutex.unlock(); } @@ -410,11 +471,10 @@ void ResourceLoader::_thread_load_function(void *p_userdata) { MessageQueue::set_thread_singleton_override(nullptr); memdelete(own_mq_override); } - if (load_paths_stack) { - memdelete(load_paths_stack); - load_paths_stack = nullptr; - } + DEV_ASSERT(load_paths_stack.is_empty()); } + + curr_load_task = curr_load_task_backup; } static String _validate_local_path(const String &p_path) { @@ -429,36 +489,44 @@ static String _validate_local_path(const String &p_path) { } Error ResourceLoader::load_threaded_request(const String &p_path, const String &p_type_hint, bool p_use_sub_threads, ResourceFormatLoader::CacheMode p_cache_mode) { - thread_load_mutex.lock(); - if (user_load_tokens.has(p_path)) { - print_verbose("load_threaded_request(): Another threaded load for resource path '" + p_path + "' has been initiated. Not an error."); - user_load_tokens[p_path]->reference(); // Additional request. - thread_load_mutex.unlock(); - return OK; - } - user_load_tokens[p_path] = nullptr; - thread_load_mutex.unlock(); + Ref token = _load_start(p_path, p_type_hint, p_use_sub_threads ? LOAD_THREAD_DISTRIBUTE : LOAD_THREAD_SPAWN_SINGLE, p_cache_mode, true); + return token.is_valid() ? OK : FAILED; +} - Ref token = _load_start(p_path, p_type_hint, p_use_sub_threads ? LOAD_THREAD_DISTRIBUTE : LOAD_THREAD_SPAWN_SINGLE, p_cache_mode); - if (token.is_valid()) { - thread_load_mutex.lock(); - token->user_path = p_path; - token->reference(); // First request. - user_load_tokens[p_path] = token.ptr(); - print_lt("REQUEST: user load tokens: " + itos(user_load_tokens.size())); - thread_load_mutex.unlock(); - return OK; +ResourceLoader::LoadToken *ResourceLoader::_load_threaded_request_reuse_user_token(const String &p_path) { + HashMap::Iterator E = user_load_tokens.find(p_path); + if (E) { + print_verbose("load_threaded_request(): Another threaded load for resource path '" + p_path + "' has been initiated. Not an error."); + LoadToken *token = E->value; + token->user_rc++; + return token; } else { - return FAILED; + return nullptr; } } +void ResourceLoader::_load_threaded_request_setup_user_token(LoadToken *p_token, const String &p_path) { + p_token->user_path = p_path; + p_token->reference(); // Extra RC until all user requests have been gotten. + p_token->user_rc = 1; + user_load_tokens[p_path] = p_token; + print_lt("REQUEST: user load tokens: " + itos(user_load_tokens.size())); +} + Ref ResourceLoader::load(const String &p_path, const String &p_type_hint, ResourceFormatLoader::CacheMode p_cache_mode, Error *r_error) { if (r_error) { *r_error = OK; } - Ref load_token = _load_start(p_path, p_type_hint, LOAD_THREAD_FROM_CURRENT, p_cache_mode); + LoadThreadMode thread_mode = LOAD_THREAD_FROM_CURRENT; + if (WorkerThreadPool::get_singleton()->get_caller_task_id() != WorkerThreadPool::INVALID_TASK_ID) { + // If user is initiating a single-threaded load from a WorkerThreadPool task, + // we instead spawn a new task so there's a precondition that a load in a pool task + // is always initiated by the engine. That makes certain aspects simpler, such as + // cyclic load detection and awaiting. + thread_mode = LOAD_THREAD_SPAWN_SINGLE; + } + Ref load_token = _load_start(p_path, p_type_hint, thread_mode, p_cache_mode); if (!load_token.is_valid()) { if (r_error) { *r_error = FAILED; @@ -470,22 +538,32 @@ Ref ResourceLoader::load(const String &p_path, const String &p_type_hi return res; } -Ref ResourceLoader::_load_start(const String &p_path, const String &p_type_hint, LoadThreadMode p_thread_mode, ResourceFormatLoader::CacheMode p_cache_mode) { +Ref ResourceLoader::_load_start(const String &p_path, const String &p_type_hint, LoadThreadMode p_thread_mode, ResourceFormatLoader::CacheMode p_cache_mode, bool p_for_user) { String local_path = _validate_local_path(p_path); bool ignoring_cache = p_cache_mode == ResourceFormatLoader::CACHE_MODE_IGNORE || p_cache_mode == ResourceFormatLoader::CACHE_MODE_IGNORE_DEEP; Ref load_token; bool must_not_register = false; - ThreadLoadTask unregistered_load_task; // Once set, must be valid up to the call to do the load. ThreadLoadTask *load_task_ptr = nullptr; - bool run_on_current_thread = false; { MutexLock thread_load_lock(thread_load_mutex); + if (p_for_user) { + LoadToken *existing_token = _load_threaded_request_reuse_user_token(p_path); + if (existing_token) { + return Ref(existing_token); + } + } + if (!ignoring_cache && thread_load_tasks.has(local_path)) { load_token = Ref(thread_load_tasks[local_path].load_token); if (load_token.is_valid()) { + if (p_for_user) { + // Load task exists, with no user tokens at the moment. + // Let's "attach" to it. + _load_threaded_request_setup_user_token(load_token.ptr(), p_path); + } return load_token; } else { // The token is dying (reached 0 on another thread). @@ -496,12 +574,14 @@ Ref ResourceLoader::_load_start(const String &p_path, load_token.instantiate(); load_token->local_path = local_path; + if (p_for_user) { + _load_threaded_request_setup_user_token(load_token.ptr(), p_path); + } //create load task { ThreadLoadTask load_task; - load_task.remapped_path = _path_remap(local_path, &load_task.xl_remapped); load_task.load_token = load_token.ptr(); load_task.local_path = local_path; load_task.type_hint = p_type_hint; @@ -514,17 +594,17 @@ Ref ResourceLoader::_load_start(const String &p_path, load_task.resource = existing; load_task.status = THREAD_LOAD_LOADED; load_task.progress = 1.0; + DEV_ASSERT(!thread_load_tasks.has(local_path)); thread_load_tasks[local_path] = load_task; return load_token; } } - // If we want to ignore cache, but there's another task loading it, we can't add this one to the map and we also have to finish within scope. + // If we want to ignore cache, but there's another task loading it, we can't add this one to the map. must_not_register = ignoring_cache && thread_load_tasks.has(local_path); if (must_not_register) { - load_token->local_path.clear(); - unregistered_load_task = load_task; - load_task_ptr = &unregistered_load_task; + load_token->task_if_unregistered = memnew(ThreadLoadTask(load_task)); + load_task_ptr = load_token->task_if_unregistered; } else { DEV_ASSERT(!thread_load_tasks.has(local_path)); HashMap::Iterator E = thread_load_tasks.insert(local_path, load_task); @@ -532,20 +612,26 @@ Ref ResourceLoader::_load_start(const String &p_path, } } - run_on_current_thread = must_not_register || p_thread_mode == LOAD_THREAD_FROM_CURRENT; + // It's important to keep the token alive because until the load completes, + // which includes before the thread start, it may happen that no one is grabbing + // the token anymore so it's released. + load_task_ptr->load_token->reference(); - if (run_on_current_thread) { - load_task_ptr->thread_id = Thread::get_caller_id(); + if (p_thread_mode == LOAD_THREAD_FROM_CURRENT) { + // The current thread may happen to be a thread from the pool. + WorkerThreadPool::TaskID tid = WorkerThreadPool::get_singleton()->get_caller_task_id(); + if (tid != WorkerThreadPool::INVALID_TASK_ID) { + load_task_ptr->task_id = tid; + } else { + load_task_ptr->thread_id = Thread::get_caller_id(); + } } else { - load_task_ptr->task_id = WorkerThreadPool::get_singleton()->add_native_task(&ResourceLoader::_thread_load_function, load_task_ptr); + load_task_ptr->task_id = WorkerThreadPool::get_singleton()->add_native_task(&ResourceLoader::_run_load_task, load_task_ptr); } - } + } // MutexLock(thread_load_mutex). - if (run_on_current_thread) { - _thread_load_function(load_task_ptr); - if (must_not_register) { - load_token->res_if_unregistered = load_task_ptr->resource; - } + if (p_thread_mode == LOAD_THREAD_FROM_CURRENT) { + _run_load_task(load_task_ptr); } return load_token; @@ -585,13 +671,7 @@ ResourceLoader::ThreadLoadStatus ResourceLoader::load_threaded_get_status(const } String local_path = _validate_local_path(p_path); - if (!thread_load_tasks.has(local_path)) { -#ifdef DEV_ENABLED - CRASH_NOW(); -#endif - // On non-dev, be defensive and at least avoid crashing (at this point at least). - return THREAD_LOAD_INVALID_RESOURCE; - } + ERR_FAIL_COND_V_MSG(!thread_load_tasks.has(local_path), THREAD_LOAD_INVALID_RESOURCE, "Bug in ResourceLoader logic, please report."); ThreadLoadTask &load_task = thread_load_tasks[local_path]; status = load_task.status; @@ -635,13 +715,7 @@ Ref ResourceLoader::load_threaded_get(const String &p_path, Error *r_e } LoadToken *load_token = user_load_tokens[p_path]; - if (!load_token) { - // This happens if requested from one thread and rapidly querying from another. - if (r_error) { - *r_error = ERR_BUSY; - } - return Ref(); - } + DEV_ASSERT(load_token->user_rc >= 1); // Support userland requesting on the main thread before the load is reported to be complete. if (Thread::is_main_thread() && !load_token->local_path.is_empty()) { @@ -658,8 +732,15 @@ Ref ResourceLoader::load_threaded_get(const String &p_path, Error *r_e } res = _load_complete_inner(*load_token, r_error, thread_load_lock); - if (load_token->unreference()) { - memdelete(load_token); + + load_token->user_rc--; + if (load_token->user_rc == 0) { + load_token->user_path.clear(); + user_load_tokens.erase(p_path); + if (load_token->unreference()) { + memdelete(load_token); + load_token = nullptr; + } } } @@ -678,16 +759,15 @@ Ref ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro *r_error = OK; } - if (!p_load_token.local_path.is_empty()) { + ThreadLoadTask *load_task_ptr = nullptr; + if (p_load_token.task_if_unregistered) { + load_task_ptr = p_load_token.task_if_unregistered; + } else { if (!thread_load_tasks.has(p_load_token.local_path)) { -#ifdef DEV_ENABLED - CRASH_NOW(); -#endif - // On non-dev, be defensive and at least avoid crashing (at this point at least). if (r_error) { *r_error = ERR_BUG; } - return Ref(); + ERR_FAIL_V_MSG(Ref(), "Bug in ResourceLoader logic, please report."); } ThreadLoadTask &load_task = thread_load_tasks[p_load_token.local_path]; @@ -695,7 +775,7 @@ Ref ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro if (load_task.status == THREAD_LOAD_IN_PROGRESS) { DEV_ASSERT((load_task.task_id == 0) != (load_task.thread_id == 0)); - if ((load_task.task_id != 0 && load_task.task_id == caller_task_id) || + if ((load_task.task_id != 0 && load_task.task_id == WorkerThreadPool::get_singleton()->get_caller_task_id()) || (load_task.thread_id != 0 && load_task.thread_id == Thread::get_caller_id())) { // Load is in progress, but it's precisely this thread the one in charge. // That means this is a cyclic load. @@ -706,49 +786,46 @@ Ref ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro } bool loader_is_wtp = load_task.task_id != 0; - Error wtp_task_err = FAILED; if (loader_is_wtp) { // Loading thread is in the worker pool. - load_task.awaited = true; thread_load_mutex.unlock(); - wtp_task_err = WorkerThreadPool::get_singleton()->wait_for_task_completion(load_task.task_id); - } - if (load_task.status == THREAD_LOAD_IN_PROGRESS) { // If early errored, awaiting would deadlock. - if (loader_is_wtp) { - if (wtp_task_err == ERR_BUSY) { - // The WorkerThreadPool has reported that the current task wants to await on an older one. - // That't not allowed for safety, to avoid deadlocks. Fortunately, though, in the context of - // resource loading that means that the task to wait for can be restarted here to break the - // cycle, with as much recursion into this process as needed. - // When the stack is eventually unrolled, the original load will have been notified to go on. - // CACHE_MODE_IGNORE is needed because, otherwise, the new request would just see there's - // an ongoing load for that resource and wait for it again. This value forces a new load. - Ref token = _load_start(load_task.local_path, load_task.type_hint, LOAD_THREAD_DISTRIBUTE, ResourceFormatLoader::CACHE_MODE_IGNORE); - Ref resource = _load_complete(*token.ptr(), &wtp_task_err); - if (r_error) { - *r_error = wtp_task_err; - } - thread_load_mutex.lock(); - return resource; - } else { - DEV_ASSERT(wtp_task_err == OK); - thread_load_mutex.lock(); - } - } else { - // Loading thread is main or user thread. - if (!load_task.cond_var) { - load_task.cond_var = memnew(ConditionVariable); - } - do { - load_task.cond_var->wait(p_thread_load_lock); - DEV_ASSERT(thread_load_tasks.has(p_load_token.local_path) && p_load_token.get_reference_count()); - } while (load_task.cond_var); + PREPARE_FOR_WTP_WAIT + Error wait_err = WorkerThreadPool::get_singleton()->wait_for_task_completion(load_task.task_id); + RESTORE_AFTER_WTP_WAIT + + DEV_ASSERT(!wait_err || wait_err == ERR_BUSY); + if (wait_err == ERR_BUSY) { + // The WorkerThreadPool has reported that the current task wants to await on an older one. + // That't not allowed for safety, to avoid deadlocks. Fortunately, though, in the context of + // resource loading that means that the task to wait for can be restarted here to break the + // cycle, with as much recursion into this process as needed. + // When the stack is eventually unrolled, the original load will have been notified to go on. + load_task.load_token->reference(); + _run_load_task(&load_task); } - } else { - if (loader_is_wtp) { - thread_load_mutex.lock(); + + thread_load_mutex.lock(); + load_task.awaited = true; + + DEV_ASSERT(load_task.status == THREAD_LOAD_FAILED || load_task.status == THREAD_LOAD_LOADED); + } else if (load_task.need_wait) { + // Loading thread is main or user thread. + if (!load_task.cond_var) { + load_task.cond_var = memnew(ConditionVariable); } + load_task.awaiters_count++; + do { + load_task.cond_var->wait(p_thread_load_lock); + DEV_ASSERT(thread_load_tasks.has(p_load_token.local_path) && p_load_token.get_reference_count()); + } while (load_task.need_wait); + load_task.awaiters_count--; + if (load_task.awaiters_count == 0) { + memdelete(load_task.cond_var); + load_task.cond_var = nullptr; + } + + DEV_ASSERT(load_task.status == THREAD_LOAD_FAILED || load_task.status == THREAD_LOAD_LOADED); } } @@ -757,22 +834,51 @@ Ref ResourceLoader::_load_complete_inner(LoadToken &p_load_token, Erro load_task.error = FAILED; } - Ref resource = load_task.resource; - if (r_error) { - *r_error = load_task.error; - } - return resource; - } else { - // Special case of an unregistered task. - // The resource should have been loaded by now. - Ref resource = p_load_token.res_if_unregistered; - if (!resource.is_valid()) { - if (r_error) { - *r_error = FAILED; + load_task_ptr = &load_task; + } + + thread_load_mutex.unlock(); + + Ref resource = load_task_ptr->resource; + if (r_error) { + *r_error = load_task_ptr->error; + } + + if (resource.is_valid()) { + if (curr_load_task) { + // A task awaiting another => Let the awaiter accumulate the resource changed connections. + DEV_ASSERT(curr_load_task != load_task_ptr); + for (const ThreadLoadTask::ResourceChangedConnection &rcc : load_task_ptr->resource_changed_connections) { + curr_load_task->resource_changed_connections.push_back(rcc); + } + } else { + // A leaf task being awaited => Propagate the resource changed connections. + if (Thread::is_main_thread()) { + // On the main thread it's safe to migrate the connections to the standard signal mechanism. + for (const ThreadLoadTask::ResourceChangedConnection &rcc : load_task_ptr->resource_changed_connections) { + if (rcc.callable.is_valid()) { + rcc.source->connect_changed(rcc.callable, rcc.flags); + } + } + } else { + // On non-main threads, we have to queue and call it done when processed. + if (!load_task_ptr->resource_changed_connections.is_empty()) { + for (const ThreadLoadTask::ResourceChangedConnection &rcc : load_task_ptr->resource_changed_connections) { + if (rcc.callable.is_valid()) { + MessageQueue::get_main_singleton()->push_callable(callable_mp(rcc.source, &Resource::connect_changed).bind(rcc.callable, rcc.flags)); + } + } + core_bind::Semaphore done; + MessageQueue::get_main_singleton()->push_callable(callable_mp(&done, &core_bind::Semaphore::post)); + done.wait(); + } } } - return resource; } + + thread_load_mutex.lock(); + + return resource; } bool ResourceLoader::_ensure_load_progress() { @@ -786,6 +892,50 @@ bool ResourceLoader::_ensure_load_progress() { return true; } +void ResourceLoader::resource_changed_connect(Resource *p_source, const Callable &p_callable, uint32_t p_flags) { + print_lt(vformat("%d\t%ud:%s\t" FUNCTION_STR "\t%d", Thread::get_caller_id(), p_source->get_instance_id(), p_source->get_class(), p_callable.get_object_id())); + + MutexLock lock(thread_load_mutex); + + for (const ThreadLoadTask::ResourceChangedConnection &rcc : curr_load_task->resource_changed_connections) { + if (unlikely(rcc.source == p_source && rcc.callable == p_callable)) { + return; + } + } + + ThreadLoadTask::ResourceChangedConnection rcc; + rcc.source = p_source; + rcc.callable = p_callable; + rcc.flags = p_flags; + curr_load_task->resource_changed_connections.push_back(rcc); +} + +void ResourceLoader::resource_changed_disconnect(Resource *p_source, const Callable &p_callable) { + print_lt(vformat("%d\t%ud:%s\t" FUNCTION_STR "t%d", Thread::get_caller_id(), p_source->get_instance_id(), p_source->get_class(), p_callable.get_object_id())); + + MutexLock lock(thread_load_mutex); + + for (uint32_t i = 0; i < curr_load_task->resource_changed_connections.size(); ++i) { + const ThreadLoadTask::ResourceChangedConnection &rcc = curr_load_task->resource_changed_connections[i]; + if (unlikely(rcc.source == p_source && rcc.callable == p_callable)) { + curr_load_task->resource_changed_connections.remove_at_unordered(i); + return; + } + } +} + +void ResourceLoader::resource_changed_emit(Resource *p_source) { + print_lt(vformat("%d\t%ud:%s\t" FUNCTION_STR, Thread::get_caller_id(), p_source->get_instance_id(), p_source->get_class())); + + MutexLock lock(thread_load_mutex); + + for (const ThreadLoadTask::ResourceChangedConnection &rcc : curr_load_task->resource_changed_connections) { + if (unlikely(rcc.source == p_source)) { + rcc.callable.call(); + } + } +} + Ref ResourceLoader::ensure_resource_ref_override_for_outer_load(const String &p_path, const String &p_res_type) { ERR_FAIL_COND_V(load_nesting == 0, Ref()); // It makes no sense to use this from nesting level 0. const String &local_path = _validate_local_path(p_path); @@ -1171,11 +1321,10 @@ void ResourceLoader::clear_thread_load_tasks() { if (thread_load_tasks.size()) { for (KeyValue &E : thread_load_tasks) { if (E.value.status == THREAD_LOAD_IN_PROGRESS) { - if (E.value.cond_var) { + if (E.value.cond_var && E.value.need_wait) { E.value.cond_var->notify_all(); - memdelete(E.value.cond_var); - E.value.cond_var = nullptr; } + E.value.need_wait = false; none_running = false; } } @@ -1189,10 +1338,13 @@ void ResourceLoader::clear_thread_load_tasks() { } while (user_load_tokens.begin()) { - // User load tokens remove themselves from the map on destruction. - memdelete(user_load_tokens.begin()->value); + LoadToken *user_token = user_load_tokens.begin()->value; + user_load_tokens.remove(user_load_tokens.begin()); + DEV_ASSERT(user_token->user_rc > 0 && !user_token->user_path.is_empty()); + user_token->user_path.clear(); + user_token->user_rc = 0; + user_token->unreference(); } - user_load_tokens.clear(); thread_load_tasks.clear(); @@ -1310,12 +1462,16 @@ bool ResourceLoader::abort_on_missing_resource = true; bool ResourceLoader::timestamp_on_load = false; thread_local int ResourceLoader::load_nesting = 0; -thread_local WorkerThreadPool::TaskID ResourceLoader::caller_task_id = 0; -thread_local Vector *ResourceLoader::load_paths_stack = nullptr; +thread_local Vector ResourceLoader::load_paths_stack; thread_local HashMap>> ResourceLoader::res_ref_overrides; +thread_local ResourceLoader::ThreadLoadTask *ResourceLoader::curr_load_task = nullptr; + +SafeBinaryMutex &_get_res_loader_mutex() { + return ResourceLoader::thread_load_mutex; +} template <> -thread_local uint32_t SafeBinaryMutex::count = 0; +thread_local SafeBinaryMutex::TLSData SafeBinaryMutex::tls_data(_get_res_loader_mutex()); SafeBinaryMutex ResourceLoader::thread_load_mutex; HashMap ResourceLoader::thread_load_tasks; bool ResourceLoader::cleaning_tasks = false; diff --git a/core/io/resource_loader.h b/core/io/resource_loader.h index 9fa558299c..35aff65856 100644 --- a/core/io/resource_loader.h +++ b/core/io/resource_loader.h @@ -102,10 +102,14 @@ typedef Error (*ResourceLoaderImport)(const String &p_path); typedef void (*ResourceLoadedCallback)(Ref p_resource, const String &p_path); class ResourceLoader { + friend class LoadToken; + enum { MAX_LOADERS = 64 }; + struct ThreadLoadTask; + public: enum ThreadLoadStatus { THREAD_LOAD_INVALID_RESOURCE, @@ -123,7 +127,8 @@ class ResourceLoader { struct LoadToken : public RefCounted { String local_path; String user_path; - Ref res_if_unregistered; + uint32_t user_rc = 0; // Having user RC implies regular RC incremented in one, until the user RC reaches zero. + ThreadLoadTask *task_if_unregistered = nullptr; void clear(); @@ -132,10 +137,13 @@ class ResourceLoader { static const int BINARY_MUTEX_TAG = 1; - static Ref _load_start(const String &p_path, const String &p_type_hint, LoadThreadMode p_thread_mode, ResourceFormatLoader::CacheMode p_cache_mode); + static Ref _load_start(const String &p_path, const String &p_type_hint, LoadThreadMode p_thread_mode, ResourceFormatLoader::CacheMode p_cache_mode, bool p_for_user = false); static Ref _load_complete(LoadToken &p_load_token, Error *r_error); private: + static LoadToken *_load_threaded_request_reuse_user_token(const String &p_path); + static void _load_threaded_request_setup_user_token(LoadToken *p_token, const String &p_path); + static Ref _load_complete_inner(LoadToken &p_load_token, Error *r_error, MutexLock> &p_thread_load_lock); static Ref loader[MAX_LOADERS]; @@ -169,9 +177,10 @@ class ResourceLoader { Thread::ID thread_id = 0; // Used if running on an user thread (e.g., simple non-threaded load). bool awaited = false; // If it's in the pool, this helps not awaiting from more than one dependent thread. ConditionVariable *cond_var = nullptr; // In not in the worker pool or already awaiting, this is used as a secondary awaiting mechanism. + uint32_t awaiters_count = 0; + bool need_wait = true; LoadToken *load_token = nullptr; String local_path; - String remapped_path; String type_hint; float progress = 0.0f; float max_reported_progress = 0.0f; @@ -180,18 +189,27 @@ class ResourceLoader { ResourceFormatLoader::CacheMode cache_mode = ResourceFormatLoader::CACHE_MODE_REUSE; Error error = OK; Ref resource; - bool xl_remapped = false; bool use_sub_threads = false; HashSet sub_tasks; + + struct ResourceChangedConnection { + Resource *source = nullptr; + Callable callable; + uint32_t flags = 0; + }; + LocalVector resource_changed_connections; }; - static void _thread_load_function(void *p_userdata); + static void _run_load_task(void *p_userdata); static thread_local int load_nesting; - static thread_local WorkerThreadPool::TaskID caller_task_id; static thread_local HashMap>> res_ref_overrides; // Outermost key is nesting level. - static thread_local Vector *load_paths_stack; // A pointer to avoid broken TLS implementations from double-running the destructor. + static thread_local Vector load_paths_stack; + static thread_local ThreadLoadTask *curr_load_task; + static SafeBinaryMutex thread_load_mutex; + friend SafeBinaryMutex &_get_res_loader_mutex(); + static HashMap thread_load_tasks; static bool cleaning_tasks; @@ -208,6 +226,10 @@ class ResourceLoader { static bool is_within_load() { return load_nesting > 0; }; + static void resource_changed_connect(Resource *p_source, const Callable &p_callable, uint32_t p_flags); + static void resource_changed_disconnect(Resource *p_source, const Callable &p_callable); + static void resource_changed_emit(Resource *p_source); + static Ref load(const String &p_path, const String &p_type_hint = "", ResourceFormatLoader::CacheMode p_cache_mode = ResourceFormatLoader::CACHE_MODE_REUSE, Error *r_error = nullptr); static bool exists(const String &p_path, const String &p_type_hint = ""); diff --git a/core/object/class_db.cpp b/core/object/class_db.cpp index dcd41c9fb0..3a07d7dad6 100644 --- a/core/object/class_db.cpp +++ b/core/object/class_db.cpp @@ -666,58 +666,76 @@ void ClassDB::set_object_extension_instance(Object *p_object, const StringName & } bool ClassDB::can_instantiate(const StringName &p_class) { - OBJTYPE_RLOCK; + String script_path; + { + OBJTYPE_RLOCK; - ClassInfo *ti = classes.getptr(p_class); - if (!ti) { - if (!ScriptServer::is_global_class(p_class)) { - ERR_FAIL_V_MSG(false, "Cannot get class '" + String(p_class) + "'."); + ClassInfo *ti = classes.getptr(p_class); + if (!ti) { + if (!ScriptServer::is_global_class(p_class)) { + ERR_FAIL_V_MSG(false, "Cannot get class '" + String(p_class) + "'."); + } + script_path = ScriptServer::get_global_class_path(p_class); + goto use_script; // Open the lock for resource loading. } - String path = ScriptServer::get_global_class_path(p_class); - Ref