From d7f3df92aa0af30e01edb8c4312217a758b072f1 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Sat, 30 Nov 2024 09:35:13 +0800 Subject: [PATCH 1/6] refactor sync_once --- kong/clustering/services/sync/rpc.lua | 50 +++++++++++---------------- 1 file changed, 20 insertions(+), 30 deletions(-) diff --git a/kong/clustering/services/sync/rpc.lua b/kong/clustering/services/sync/rpc.lua index 1ec1aa475f5..576c5d65548 100644 --- a/kong/clustering/services/sync/rpc.lua +++ b/kong/clustering/services/sync/rpc.lua @@ -388,7 +388,9 @@ local function do_sync() end -local function sync_handler(premature) +local sync_handler + +sync_handler = function(premature, try_counter) if premature then return end @@ -397,52 +399,40 @@ local function sync_handler(premature) if not res and err ~= "timeout" then ngx_log(ngx_ERR, "unable to create worker mutex and sync: ", err) end -end - - -local sync_once_impl - - -local function start_sync_once_timer(retry_count) - local ok, err = ngx.timer.at(0, sync_once_impl, retry_count or 0) - if not ok then - return nil, err - end - return true -end - - -function sync_once_impl(premature, retry_count) - if premature then + if not try_counter then return end - sync_handler() - local latest_notified_version = ngx.shared.kong:get(CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY) - local current_version = tonumber(declarative.get_current_hash()) or 0 - if not latest_notified_version then ngx_log(ngx_DEBUG, "no version notified yet") return end + local current_version = tonumber(declarative.get_current_hash()) or 0 + if current_version >= latest_notified_version then + return + end + -- retry if the version is not updated - if current_version < latest_notified_version then - retry_count = retry_count or 0 - if retry_count > MAX_RETRY then - ngx_log(ngx_ERR, "sync_once retry count exceeded. retry_count: ", retry_count) - return - end - return start_sync_once_timer(retry_count + 1) + if try_counter > MAX_RETRY then + ngx_log(ngx_ERR, "sync_once try count exceeded. try_counter: ", try_counter) + return end + + local ok, err = ngx.timer.at(0, sync_handler, try_counter + 1) + if not ok then + return nil, err + end + + return true end function _M:sync_once(delay) - return ngx.timer.at(delay or 0, sync_once_impl, 0) + return ngx.timer.at(delay or 0, sync_handler, 0) end From cf9da6144f555bb140dac9073a4d4d86ddf3e078 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Sat, 30 Nov 2024 09:44:51 +0800 Subject: [PATCH 2/6] clean --- kong/clustering/services/sync/rpc.lua | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/kong/clustering/services/sync/rpc.lua b/kong/clustering/services/sync/rpc.lua index 576c5d65548..059f32a0737 100644 --- a/kong/clustering/services/sync/rpc.lua +++ b/kong/clustering/services/sync/rpc.lua @@ -389,7 +389,6 @@ end local sync_handler - sync_handler = function(premature, try_counter) if premature then return @@ -400,10 +399,13 @@ sync_handler = function(premature, try_counter) ngx_log(ngx_ERR, "unable to create worker mutex and sync: ", err) end + -- try_counter is not set, only run once if not try_counter then return end + assert(try_counter >= 1) + local latest_notified_version = ngx.shared.kong:get(CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY) if not latest_notified_version then ngx_log(ngx_DEBUG, "no version notified yet") @@ -415,24 +417,18 @@ sync_handler = function(premature, try_counter) return end - -- retry if the version is not updated - if try_counter > MAX_RETRY then ngx_log(ngx_ERR, "sync_once try count exceeded. try_counter: ", try_counter) return end - local ok, err = ngx.timer.at(0, sync_handler, try_counter + 1) - if not ok then - return nil, err - end - - return true + -- retry if the version is not updated + return ngx.timer.at(0, sync_handler, try_counter + 1) end function _M:sync_once(delay) - return ngx.timer.at(delay or 0, sync_handler, 0) + return ngx.timer.at(delay or 0, sync_handler, 1) end From e15e5458934dd474cd7ab58ee8fadadcabfd4861 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Sat, 30 Nov 2024 09:51:09 +0800 Subject: [PATCH 3/6] MAX_RETRY --- kong/clustering/services/sync/rpc.lua | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/kong/clustering/services/sync/rpc.lua b/kong/clustering/services/sync/rpc.lua index 059f32a0737..19713602e21 100644 --- a/kong/clustering/services/sync/rpc.lua +++ b/kong/clustering/services/sync/rpc.lua @@ -404,6 +404,11 @@ sync_handler = function(premature, try_counter) return end + if try_counter <= 0 then + ngx_log(ngx_ERR, "sync_once try count exceeded.") + return + end + assert(try_counter >= 1) local latest_notified_version = ngx.shared.kong:get(CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY) @@ -417,18 +422,13 @@ sync_handler = function(premature, try_counter) return end - if try_counter > MAX_RETRY then - ngx_log(ngx_ERR, "sync_once try count exceeded. try_counter: ", try_counter) - return - end - -- retry if the version is not updated - return ngx.timer.at(0, sync_handler, try_counter + 1) + return ngx.timer.at(0, sync_handler, try_counter - 1) end function _M:sync_once(delay) - return ngx.timer.at(delay or 0, sync_handler, 1) + return ngx.timer.at(delay or 0, sync_handler, MAX_RETRY) end From 0b49f5d02cd9f7211138cca6d76a7385f4a959c4 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Sat, 30 Nov 2024 09:53:21 +0800 Subject: [PATCH 4/6] SYNC_MAX_RETRY --- kong/clustering/services/sync/rpc.lua | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kong/clustering/services/sync/rpc.lua b/kong/clustering/services/sync/rpc.lua index 19713602e21..a4d5e8c5117 100644 --- a/kong/clustering/services/sync/rpc.lua +++ b/kong/clustering/services/sync/rpc.lua @@ -17,7 +17,7 @@ local CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY = constants.CLUSTERING_DATA_PLAN local DECLARATIVE_DEFAULT_WORKSPACE_KEY = constants.DECLARATIVE_DEFAULT_WORKSPACE_KEY local CLUSTERING_SYNC_STATUS = constants.CLUSTERING_SYNC_STATUS local SYNC_MUTEX_OPTS = { name = "get_delta", timeout = 0, } -local MAX_RETRY = 5 +local SYNC_MAX_RETRY = 5 local assert = assert @@ -428,7 +428,7 @@ end function _M:sync_once(delay) - return ngx.timer.at(delay or 0, sync_handler, MAX_RETRY) + return ngx.timer.at(delay or 0, sync_handler, SYNC_MAX_RETRY) end From b5415b285e5670263f70c4113bc4533549db0ba8 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Mon, 2 Dec 2024 10:53:32 +0800 Subject: [PATCH 5/6] clean --- kong/clustering/services/sync/rpc.lua | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/kong/clustering/services/sync/rpc.lua b/kong/clustering/services/sync/rpc.lua index a4d5e8c5117..2e25a0a0872 100644 --- a/kong/clustering/services/sync/rpc.lua +++ b/kong/clustering/services/sync/rpc.lua @@ -388,8 +388,7 @@ local function do_sync() end -local sync_handler -sync_handler = function(premature, try_counter) +local function sync_handler(premature, try_counter) if premature then return end From 2359635974a058061dda7c9357d5652ecf5e8775 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Mon, 2 Dec 2024 11:13:52 +0800 Subject: [PATCH 6/6] Revert "clean" This reverts commit b5415b285e5670263f70c4113bc4533549db0ba8. --- kong/clustering/services/sync/rpc.lua | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/kong/clustering/services/sync/rpc.lua b/kong/clustering/services/sync/rpc.lua index 2e25a0a0872..a4d5e8c5117 100644 --- a/kong/clustering/services/sync/rpc.lua +++ b/kong/clustering/services/sync/rpc.lua @@ -388,7 +388,8 @@ local function do_sync() end -local function sync_handler(premature, try_counter) +local sync_handler +sync_handler = function(premature, try_counter) if premature then return end