Skip to content

Commit

Permalink
feat(clustering/rpc): change sync version to string (#14078)
Browse files Browse the repository at this point in the history
This allows more flexibility for CP when implementing it's version generation method.

KAG-5894
  • Loading branch information
chronolaw authored Jan 12, 2025
1 parent 1903d97 commit 4ee00b3
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 23 deletions.
48 changes: 31 additions & 17 deletions kong/clustering/services/sync/rpc.lua
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ local insert_entity_for_txn = declarative.insert_entity_for_txn
local delete_entity_for_txn = declarative.delete_entity_for_txn
local DECLARATIVE_HASH_KEY = constants.DECLARATIVE_HASH_KEY
local CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY = constants.CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY
local DECLARATIVE_EMPTY_CONFIG_HASH = constants.DECLARATIVE_EMPTY_CONFIG_HASH
local DECLARATIVE_DEFAULT_WORKSPACE_KEY = constants.DECLARATIVE_DEFAULT_WORKSPACE_KEY
local CLUSTERING_SYNC_STATUS = constants.CLUSTERING_SYNC_STATUS
local SYNC_MUTEX_OPTS = { name = "get_delta", timeout = 0, }
Expand All @@ -22,7 +23,7 @@ local MAX_RETRY = 5

local assert = assert
local ipairs = ipairs
local fmt = string.format
local sub = string.sub
local ngx_null = ngx.null
local ngx_log = ngx.log
local ngx_ERR = ngx.ERR
Expand Down Expand Up @@ -54,13 +55,29 @@ local function full_sync_result()
end


local function get_current_version()
return declarative.get_current_hash() or DECLARATIVE_EMPTY_CONFIG_HASH
end


local is_valid_version
do
local VER_PREFIX = "v02_"

-- version string must start with 'v02_'
is_valid_version = function(v)
return sub(v, 1, 4) == VER_PREFIX
end
end


function _M:init_cp(manager)
local purge_delay = manager.conf.cluster_data_plane_purge_delay

-- CP
-- Method: kong.sync.v2.get_delta
-- Params: versions: list of current versions of the database
-- example: { default = { version = 1000, }, }
-- example: { default = { version = "1000", }, }
manager.callbacks:register("kong.sync.v2.get_delta", function(node_id, current_versions)
ngx_log(ngx_DEBUG, "[kong.sync.v2] config push (connected client)")

Expand All @@ -75,7 +92,7 @@ function _M:init_cp(manager)
return nil, "default namespace does not exist inside params"
end

-- { default = { version = 1000, }, }
-- { default = { version = "1000", }, }
local default_namespace_version = default_namespace.version
local node_info = assert(kong.rpc:get_peer_info(node_id))

Expand All @@ -88,7 +105,7 @@ function _M:init_cp(manager)
labels = node_info.labels, -- get from rpc call
cert_details = node_info.cert_details, -- get from rpc call
sync_status = CLUSTERING_SYNC_STATUS.NORMAL,
config_hash = fmt("%032d", default_namespace_version),
config_hash = default_namespace_version,
rpc_capabilities = rpc_peers and rpc_peers[node_id] or {},
}, { ttl = purge_delay, no_broadcast_crud_event = true, })
if not ok then
Expand All @@ -100,7 +117,8 @@ function _M:init_cp(manager)
return nil, err
end

if default_namespace_version == 0 or
-- string comparison effectively does the same as number comparison
if not is_valid_version(default_namespace_version) or
default_namespace_version < latest_version then
return full_sync_result()
end
Expand All @@ -115,7 +133,7 @@ function _M:init_dp(manager)
-- DP
-- Method: kong.sync.v2.notify_new_version
-- Params: new_versions: list of namespaces and their new versions, like:
-- { default = { new_version = 1000, }, }
-- { default = { new_version = "1000", }, }
manager.callbacks:register("kong.sync.v2.notify_new_version", function(node_id, new_versions)
-- TODO: currently only default is supported, and anything else is ignored
local default_new_version = new_versions.default
Expand All @@ -128,7 +146,7 @@ function _M:init_dp(manager)
return nil, "'new_version' key does not exist"
end

local lmdb_ver = tonumber(declarative.get_current_hash()) or 0
local lmdb_ver = get_current_version()
if lmdb_ver < version then
-- set lastest version to shm
kong_shm:set(CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY, version)
Expand Down Expand Up @@ -172,11 +190,7 @@ local function do_sync()
return nil, "rpc is not ready"
end

local msg = { default =
{ version =
tonumber(declarative.get_current_hash()) or 0,
},
}
local msg = { default = { version = get_current_version(), }, }

local ns_deltas, err = kong.rpc:call("control_plane", "kong.sync.v2.get_delta", msg)
if not ns_deltas then
Expand Down Expand Up @@ -226,13 +240,13 @@ local function do_sync()

local db = kong.db

local version = 0
local version = ""
local opts = {}
local crud_events = {}
local crud_events_n = 0

-- delta should look like:
-- { type = ..., entity = { ... }, version = 1, ws_id = ..., }
-- { type = ..., entity = { ... }, version = "1", ws_id = ..., }
for _, delta in ipairs(deltas) do
local delta_version = delta.version
local delta_type = delta.type
Expand Down Expand Up @@ -308,15 +322,15 @@ local function do_sync()
end

-- delta.version should not be nil or ngx.null
assert(type(delta_version) == "number")
assert(type(delta_version) == "string")

if delta_version ~= version then
version = delta_version
end
end -- for _, delta

-- store current sync version
t:set(DECLARATIVE_HASH_KEY, fmt("%032d", version))
t:set(DECLARATIVE_HASH_KEY, version)

-- store the correct default workspace uuid
if default_ws_changed then
Expand Down Expand Up @@ -393,7 +407,7 @@ function sync_once_impl(premature, retry_count)
return
end

local current_version = tonumber(declarative.get_current_hash()) or 0
local current_version = get_current_version()
if current_version >= latest_notified_version then
ngx_log(ngx_DEBUG, "version already updated")
return
Expand Down
9 changes: 7 additions & 2 deletions kong/clustering/services/sync/strategies/postgres.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,14 @@ local _M = {}
local _MT = { __index = _M }


local fmt = string.format
local ngx_null = ngx.null


-- version string should look like: "v02_0000"
local VERSION_FMT = "v02_%028x"


function _M.new(db)
local self = {
connector = db.connector,
Expand Down Expand Up @@ -44,10 +49,10 @@ function _M:get_latest_version()

local ver = res[1] and res[1].max
if ver == ngx_null then
return 0
return fmt(VERSION_FMT, 0)
end

return ver
return fmt(VERSION_FMT, ver)
end


Expand Down
15 changes: 11 additions & 4 deletions spec/02-integration/18-hybrid_rpc/05-sync-rpc_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@ local setup = require("spec.helpers.rpc_mock.setup")
local get_node_id = misc.get_node_id
local DP_PREFIX = "servroot_dp"


-- now version must be a string
local function fmt(v)
return string.format("v02_%028x", v)
end


local function change_config()
-- the initial sync is flaky. let's trigger a sync by creating a service
local admin_client = helpers.admin_client()
Expand Down Expand Up @@ -74,12 +81,12 @@ describe("kong.sync.v2", function()
local called = false
mocked_cp:mock("kong.sync.v2.get_delta", function(node_id, payload)
called = true
return { default = { version = 100, deltas = {} } }
return { default = { version = fmt(100), deltas = {} } }
end)

-- make a call from the mocked cp
-- CP->DP: notify_new_version
assert(mocked_cp:call(node_id, "kong.sync.v2.notify_new_version", { default = { new_version = 100, } }))
assert(mocked_cp:call(node_id, "kong.sync.v2.notify_new_version", { default = { new_version = fmt(100), } }))

-- DP->CP: get_delta
-- the dp after receiving the notification will make a call to the cp
Expand Down Expand Up @@ -120,7 +127,7 @@ describe("kong.sync.v2", function()
-- this is a workaround to registers the data plane node
-- CP does not register the DP node until it receives a call from the DP
function register_dp()
local res, err = mocked_dp:call("control_plane", "kong.sync.v2.get_delta", { default = { version = 0,},})
local res, err = mocked_dp:call("control_plane", "kong.sync.v2.get_delta", { default = { version = fmt(0),},})
assert.is_nil(err)
assert.is_table(res and res.default and res.default.deltas)
end
Expand All @@ -132,7 +139,7 @@ describe("kong.sync.v2", function()
end)

it("rpc call", function()
local res, err = mocked_dp:call("control_plane", "kong.sync.v2.get_delta", { default = { version = 0,},})
local res, err = mocked_dp:call("control_plane", "kong.sync.v2.get_delta", { default = { version = fmt(0),},})
assert.is_nil(err)
assert.is_table(res and res.default and res.default.deltas)

Expand Down

1 comment on commit 4ee00b3

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bazel Build

Docker image available kong/kong:4ee00b3c5eae00c6fd500c2bc6ce8ad2554f39da
Artifacts available https://github.com/Kong/kong/actions/runs/12730198491

Please sign in to comment.