Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(clustering/sync): clean the logic of sync_once #13956

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 17 additions & 31 deletions kong/clustering/services/sync/rpc.lua
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ local CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY = constants.CLUSTERING_DATA_PLAN
local DECLARATIVE_DEFAULT_WORKSPACE_KEY = constants.DECLARATIVE_DEFAULT_WORKSPACE_KEY
local CLUSTERING_SYNC_STATUS = constants.CLUSTERING_SYNC_STATUS
local SYNC_MUTEX_OPTS = { name = "get_delta", timeout = 0, }
local MAX_RETRY = 5
local SYNC_MAX_RETRY = 5


local assert = assert
Expand Down Expand Up @@ -388,7 +388,8 @@ local function do_sync()
end


local function sync_handler(premature)
local sync_handler
sync_handler = function(premature, try_counter)
chobits marked this conversation as resolved.
Show resolved Hide resolved
if premature then
return
end
Expand All @@ -397,52 +398,37 @@ local function sync_handler(premature)
if not res and err ~= "timeout" then
ngx_log(ngx_ERR, "unable to create worker mutex and sync: ", err)
end
end


local sync_once_impl


local function start_sync_once_timer(retry_count)
local ok, err = ngx.timer.at(0, sync_once_impl, retry_count or 0)
if not ok then
return nil, err
-- try_counter is not set, only run once
if not try_counter then
return
end

return true
end


function sync_once_impl(premature, retry_count)
if premature then
if try_counter <= 0 then
ngx_log(ngx_ERR, "sync_once try count exceeded.")
return
end

sync_handler()

local latest_notified_version = ngx.shared.kong:get(CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY)
local current_version = tonumber(declarative.get_current_hash()) or 0
assert(try_counter >= 1)

local latest_notified_version = ngx.shared.kong:get(CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY)
if not latest_notified_version then
ngx_log(ngx_DEBUG, "no version notified yet")
return
end

-- retry if the version is not updated
if current_version < latest_notified_version then
retry_count = retry_count or 0
if retry_count > MAX_RETRY then
ngx_log(ngx_ERR, "sync_once retry count exceeded. retry_count: ", retry_count)
return
end

return start_sync_once_timer(retry_count + 1)
local current_version = tonumber(declarative.get_current_hash()) or 0
if current_version >= latest_notified_version then
return
end

-- retry if the version is not updated
return ngx.timer.at(0, sync_handler, try_counter - 1)
end


function _M:sync_once(delay)
return ngx.timer.at(delay or 0, sync_once_impl, 0)
return ngx.timer.at(delay or 0, sync_handler, SYNC_MAX_RETRY)
end


Expand Down
Loading