Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(clustering/rpc): change sync version to string #14078

Merged
merged 19 commits into from
Jan 12, 2025
48 changes: 31 additions & 17 deletions kong/clustering/services/sync/rpc.lua
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ local insert_entity_for_txn = declarative.insert_entity_for_txn
local delete_entity_for_txn = declarative.delete_entity_for_txn
local DECLARATIVE_HASH_KEY = constants.DECLARATIVE_HASH_KEY
local CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY = constants.CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY
local DECLARATIVE_EMPTY_CONFIG_HASH = constants.DECLARATIVE_EMPTY_CONFIG_HASH
local DECLARATIVE_DEFAULT_WORKSPACE_KEY = constants.DECLARATIVE_DEFAULT_WORKSPACE_KEY
local CLUSTERING_SYNC_STATUS = constants.CLUSTERING_SYNC_STATUS
local SYNC_MUTEX_OPTS = { name = "get_delta", timeout = 0, }
Expand All @@ -22,7 +23,7 @@ local MAX_RETRY = 5

local assert = assert
local ipairs = ipairs
local fmt = string.format
local sub = string.sub
local ngx_null = ngx.null
local ngx_log = ngx.log
local ngx_ERR = ngx.ERR
Expand Down Expand Up @@ -54,13 +55,29 @@ local function full_sync_result()
end


local function get_current_version()
return declarative.get_current_hash() or DECLARATIVE_EMPTY_CONFIG_HASH
end


local is_valid_version
do
local VER_PREFIX = "v02_"

-- version string must start with 'v02_'
is_valid_version = function(v)
return sub(v, 1, 4) == VER_PREFIX
end
end


function _M:init_cp(manager)
local purge_delay = manager.conf.cluster_data_plane_purge_delay

-- CP
-- Method: kong.sync.v2.get_delta
-- Params: versions: list of current versions of the database
-- example: { default = { version = 1000, }, }
-- example: { default = { version = "1000", }, }
chronolaw marked this conversation as resolved.
Show resolved Hide resolved
manager.callbacks:register("kong.sync.v2.get_delta", function(node_id, current_versions)
ngx_log(ngx_DEBUG, "[kong.sync.v2] config push (connected client)")

Expand All @@ -75,7 +92,7 @@ function _M:init_cp(manager)
return nil, "default namespace does not exist inside params"
end

-- { default = { version = 1000, }, }
-- { default = { version = "1000", }, }
local default_namespace_version = default_namespace.version
local node_info = assert(kong.rpc:get_peer_info(node_id))

Expand All @@ -88,7 +105,7 @@ function _M:init_cp(manager)
labels = node_info.labels, -- get from rpc call
cert_details = node_info.cert_details, -- get from rpc call
sync_status = CLUSTERING_SYNC_STATUS.NORMAL,
config_hash = fmt("%032d", default_namespace_version),
config_hash = default_namespace_version,
rpc_capabilities = rpc_peers and rpc_peers[node_id] or {},
}, { ttl = purge_delay, no_broadcast_crud_event = true, })
if not ok then
Expand All @@ -100,7 +117,8 @@ function _M:init_cp(manager)
return nil, err
end

if default_namespace_version == 0 or
-- string comparison effectively does the same as number comparison
if not is_valid_version(default_namespace_version) or
default_namespace_version < latest_version then
chronolaw marked this conversation as resolved.
Show resolved Hide resolved
return full_sync_result()
end
Expand All @@ -115,7 +133,7 @@ function _M:init_dp(manager)
-- DP
-- Method: kong.sync.v2.notify_new_version
-- Params: new_versions: list of namespaces and their new versions, like:
-- { default = { new_version = 1000, }, }
-- { default = { new_version = "1000", }, }
manager.callbacks:register("kong.sync.v2.notify_new_version", function(node_id, new_versions)
-- TODO: currently only default is supported, and anything else is ignored
local default_new_version = new_versions.default
Expand All @@ -128,7 +146,7 @@ function _M:init_dp(manager)
return nil, "'new_version' key does not exist"
end

local lmdb_ver = tonumber(declarative.get_current_hash()) or 0
local lmdb_ver = get_current_version()
if lmdb_ver < version then
-- set lastest version to shm
kong_shm:set(CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY, version)
Expand Down Expand Up @@ -172,11 +190,7 @@ local function do_sync()
return nil, "rpc is not ready"
end

local msg = { default =
{ version =
tonumber(declarative.get_current_hash()) or 0,
},
}
local msg = { default = { version = get_current_version(), }, }

local ns_deltas, err = kong.rpc:call("control_plane", "kong.sync.v2.get_delta", msg)
if not ns_deltas then
Expand Down Expand Up @@ -226,13 +240,13 @@ local function do_sync()

local db = kong.db

local version = 0
local version = ""
local opts = {}
local crud_events = {}
local crud_events_n = 0

-- delta should look like:
-- { type = ..., entity = { ... }, version = 1, ws_id = ..., }
-- { type = ..., entity = { ... }, version = "1", ws_id = ..., }
for _, delta in ipairs(deltas) do
local delta_version = delta.version
local delta_type = delta.type
Expand Down Expand Up @@ -308,15 +322,15 @@ local function do_sync()
end

-- delta.version should not be nil or ngx.null
assert(type(delta_version) == "number")
assert(type(delta_version) == "string")

if delta_version ~= version then
version = delta_version
end
end -- for _, delta

-- store current sync version
t:set(DECLARATIVE_HASH_KEY, fmt("%032d", version))
t:set(DECLARATIVE_HASH_KEY, version)

-- store the correct default workspace uuid
if default_ws_changed then
Expand Down Expand Up @@ -391,7 +405,7 @@ function sync_once_impl(premature, retry_count)
return
end

local current_version = tonumber(declarative.get_current_hash()) or 0
local current_version = get_current_version()
if current_version >= latest_notified_version then
ngx_log(ngx_DEBUG, "version already updated")
return
Expand Down
9 changes: 7 additions & 2 deletions kong/clustering/services/sync/strategies/postgres.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,14 @@ local _M = {}
local _MT = { __index = _M }


local fmt = string.format
local ngx_null = ngx.null


-- version string should look like: "v02_0000"
local VERSION_FMT = "v02_%028x"


function _M.new(db)
local self = {
connector = db.connector,
Expand Down Expand Up @@ -44,10 +49,10 @@ function _M:get_latest_version()

local ver = res[1] and res[1].max
if ver == ngx_null then
return 0
return fmt(VERSION_FMT, 0)
end

return ver
return fmt(VERSION_FMT, ver)
end


Expand Down
15 changes: 11 additions & 4 deletions spec/02-integration/18-hybrid_rpc/05-sync-rpc_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@ local setup = require("spec.helpers.rpc_mock.setup")
local get_node_id = misc.get_node_id
local DP_PREFIX = "servroot_dp"


-- now version must be a string
local function fmt(v)
return string.format("v02_%028x", v)
end


local function change_config()
-- the initial sync is flaky. let's trigger a sync by creating a service
local admin_client = helpers.admin_client()
Expand Down Expand Up @@ -74,12 +81,12 @@ describe("kong.sync.v2", function()
local called = false
mocked_cp:mock("kong.sync.v2.get_delta", function(node_id, payload)
called = true
return { default = { version = 100, deltas = {} } }
return { default = { version = fmt(100), deltas = {} } }
end)

-- make a call from the mocked cp
-- CP->DP: notify_new_version
assert(mocked_cp:call(node_id, "kong.sync.v2.notify_new_version", { default = { new_version = 100, } }))
assert(mocked_cp:call(node_id, "kong.sync.v2.notify_new_version", { default = { new_version = fmt(100), } }))

-- DP->CP: get_delta
-- the dp after receiving the notification will make a call to the cp
Expand Down Expand Up @@ -120,7 +127,7 @@ describe("kong.sync.v2", function()
-- this is a workaround to registers the data plane node
-- CP does not register the DP node until it receives a call from the DP
function register_dp()
local res, err = mocked_dp:call("control_plane", "kong.sync.v2.get_delta", { default = { version = 0,},})
local res, err = mocked_dp:call("control_plane", "kong.sync.v2.get_delta", { default = { version = fmt(0),},})
assert.is_nil(err)
assert.is_table(res and res.default and res.default.deltas)
end
Expand All @@ -132,7 +139,7 @@ describe("kong.sync.v2", function()
end)

it("rpc call", function()
local res, err = mocked_dp:call("control_plane", "kong.sync.v2.get_delta", { default = { version = 0,},})
local res, err = mocked_dp:call("control_plane", "kong.sync.v2.get_delta", { default = { version = fmt(0),},})
assert.is_nil(err)
assert.is_table(res and res.default and res.default.deltas)

Expand Down
Loading