diff --git a/kong/clustering/services/sync/rpc.lua b/kong/clustering/services/sync/rpc.lua index 1ec1aa475f5a..a4d5e8c51171 100644 --- a/kong/clustering/services/sync/rpc.lua +++ b/kong/clustering/services/sync/rpc.lua @@ -17,7 +17,7 @@ local CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY = constants.CLUSTERING_DATA_PLAN local DECLARATIVE_DEFAULT_WORKSPACE_KEY = constants.DECLARATIVE_DEFAULT_WORKSPACE_KEY local CLUSTERING_SYNC_STATUS = constants.CLUSTERING_SYNC_STATUS local SYNC_MUTEX_OPTS = { name = "get_delta", timeout = 0, } -local MAX_RETRY = 5 +local SYNC_MAX_RETRY = 5 local assert = assert @@ -388,7 +388,8 @@ local function do_sync() end -local function sync_handler(premature) +local sync_handler +sync_handler = function(premature, try_counter) if premature then return end @@ -397,52 +398,37 @@ local function sync_handler(premature) if not res and err ~= "timeout" then ngx_log(ngx_ERR, "unable to create worker mutex and sync: ", err) end -end - - -local sync_once_impl - -local function start_sync_once_timer(retry_count) - local ok, err = ngx.timer.at(0, sync_once_impl, retry_count or 0) - if not ok then - return nil, err + -- try_counter is not set, only run once + if not try_counter then + return end - return true -end - - -function sync_once_impl(premature, retry_count) - if premature then + if try_counter <= 0 then + ngx_log(ngx_ERR, "sync_once try count exceeded.") return end - sync_handler() - - local latest_notified_version = ngx.shared.kong:get(CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY) - local current_version = tonumber(declarative.get_current_hash()) or 0 + assert(try_counter >= 1) + local latest_notified_version = ngx.shared.kong:get(CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY) if not latest_notified_version then ngx_log(ngx_DEBUG, "no version notified yet") return end - -- retry if the version is not updated - if current_version < latest_notified_version then - retry_count = retry_count or 0 - if retry_count > MAX_RETRY then - ngx_log(ngx_ERR, "sync_once retry count exceeded. retry_count: ", retry_count) - return - end - - return start_sync_once_timer(retry_count + 1) + local current_version = tonumber(declarative.get_current_hash()) or 0 + if current_version >= latest_notified_version then + return end + + -- retry if the version is not updated + return ngx.timer.at(0, sync_handler, try_counter - 1) end function _M:sync_once(delay) - return ngx.timer.at(delay or 0, sync_once_impl, 0) + return ngx.timer.at(delay or 0, sync_handler, SYNC_MAX_RETRY) end