diff --git a/kong/clustering/services/sync/rpc.lua b/kong/clustering/services/sync/rpc.lua index c84a10ecd76..cd1db7fe75c 100644 --- a/kong/clustering/services/sync/rpc.lua +++ b/kong/clustering/services/sync/rpc.lua @@ -14,6 +14,7 @@ local insert_entity_for_txn = declarative.insert_entity_for_txn local delete_entity_for_txn = declarative.delete_entity_for_txn local DECLARATIVE_HASH_KEY = constants.DECLARATIVE_HASH_KEY local CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY = constants.CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY +local DECLARATIVE_EMPTY_CONFIG_HASH = constants.DECLARATIVE_EMPTY_CONFIG_HASH local DECLARATIVE_DEFAULT_WORKSPACE_KEY = constants.DECLARATIVE_DEFAULT_WORKSPACE_KEY local CLUSTERING_SYNC_STATUS = constants.CLUSTERING_SYNC_STATUS local SYNC_MUTEX_OPTS = { name = "get_delta", timeout = 0, } @@ -22,7 +23,7 @@ local MAX_RETRY = 5 local assert = assert local ipairs = ipairs -local fmt = string.format +local sub = string.sub local ngx_null = ngx.null local ngx_log = ngx.log local ngx_ERR = ngx.ERR @@ -54,13 +55,29 @@ local function full_sync_result() end +local function get_current_version() + return declarative.get_current_hash() or DECLARATIVE_EMPTY_CONFIG_HASH +end + + +local is_valid_version +do + local VER_PREFIX = "v02_" + + -- version string must start with 'v02_' + is_valid_version = function(v) + return sub(v, 1, 4) == VER_PREFIX + end +end + + function _M:init_cp(manager) local purge_delay = manager.conf.cluster_data_plane_purge_delay -- CP -- Method: kong.sync.v2.get_delta -- Params: versions: list of current versions of the database - -- example: { default = { version = 1000, }, } + -- example: { default = { version = "1000", }, } manager.callbacks:register("kong.sync.v2.get_delta", function(node_id, current_versions) ngx_log(ngx_DEBUG, "[kong.sync.v2] config push (connected client)") @@ -75,7 +92,7 @@ function _M:init_cp(manager) return nil, "default namespace does not exist inside params" end - -- { default = { version = 1000, }, } + -- { default = { version = "1000", }, } local default_namespace_version = default_namespace.version local node_info = assert(kong.rpc:get_peer_info(node_id)) @@ -88,7 +105,7 @@ function _M:init_cp(manager) labels = node_info.labels, -- get from rpc call cert_details = node_info.cert_details, -- get from rpc call sync_status = CLUSTERING_SYNC_STATUS.NORMAL, - config_hash = fmt("%032d", default_namespace_version), + config_hash = default_namespace_version, rpc_capabilities = rpc_peers and rpc_peers[node_id] or {}, }, { ttl = purge_delay, no_broadcast_crud_event = true, }) if not ok then @@ -100,7 +117,8 @@ function _M:init_cp(manager) return nil, err end - if default_namespace_version == 0 or + -- string comparison effectively does the same as number comparison + if not is_valid_version(default_namespace_version) or default_namespace_version < latest_version then return full_sync_result() end @@ -115,7 +133,7 @@ function _M:init_dp(manager) -- DP -- Method: kong.sync.v2.notify_new_version -- Params: new_versions: list of namespaces and their new versions, like: - -- { default = { new_version = 1000, }, } + -- { default = { new_version = "1000", }, } manager.callbacks:register("kong.sync.v2.notify_new_version", function(node_id, new_versions) -- TODO: currently only default is supported, and anything else is ignored local default_new_version = new_versions.default @@ -128,7 +146,7 @@ function _M:init_dp(manager) return nil, "'new_version' key does not exist" end - local lmdb_ver = tonumber(declarative.get_current_hash()) or 0 + local lmdb_ver = get_current_version() if lmdb_ver < version then -- set lastest version to shm kong_shm:set(CLUSTERING_DATA_PLANES_LATEST_VERSION_KEY, version) @@ -172,11 +190,7 @@ local function do_sync() return nil, "rpc is not ready" end - local msg = { default = - { version = - tonumber(declarative.get_current_hash()) or 0, - }, - } + local msg = { default = { version = get_current_version(), }, } local ns_deltas, err = kong.rpc:call("control_plane", "kong.sync.v2.get_delta", msg) if not ns_deltas then @@ -226,13 +240,13 @@ local function do_sync() local db = kong.db - local version = 0 + local version = "" local opts = {} local crud_events = {} local crud_events_n = 0 -- delta should look like: - -- { type = ..., entity = { ... }, version = 1, ws_id = ..., } + -- { type = ..., entity = { ... }, version = "1", ws_id = ..., } for _, delta in ipairs(deltas) do local delta_version = delta.version local delta_type = delta.type @@ -308,7 +322,7 @@ local function do_sync() end -- delta.version should not be nil or ngx.null - assert(type(delta_version) == "number") + assert(type(delta_version) == "string") if delta_version ~= version then version = delta_version @@ -316,7 +330,7 @@ local function do_sync() end -- for _, delta -- store current sync version - t:set(DECLARATIVE_HASH_KEY, fmt("%032d", version)) + t:set(DECLARATIVE_HASH_KEY, version) -- store the correct default workspace uuid if default_ws_changed then @@ -391,7 +405,7 @@ function sync_once_impl(premature, retry_count) return end - local current_version = tonumber(declarative.get_current_hash()) or 0 + local current_version = get_current_version() if current_version >= latest_notified_version then ngx_log(ngx_DEBUG, "version already updated") return diff --git a/kong/clustering/services/sync/strategies/postgres.lua b/kong/clustering/services/sync/strategies/postgres.lua index 7bc0784c6e6..20e594d92f3 100644 --- a/kong/clustering/services/sync/strategies/postgres.lua +++ b/kong/clustering/services/sync/strategies/postgres.lua @@ -2,9 +2,14 @@ local _M = {} local _MT = { __index = _M } +local fmt = string.format local ngx_null = ngx.null +-- version string should look like: "v02_0000" +local VERSION_FMT = "v02_%028x" + + function _M.new(db) local self = { connector = db.connector, @@ -44,10 +49,10 @@ function _M:get_latest_version() local ver = res[1] and res[1].max if ver == ngx_null then - return 0 + return fmt(VERSION_FMT, 0) end - return ver + return fmt(VERSION_FMT, ver) end diff --git a/spec/02-integration/18-hybrid_rpc/05-sync-rpc_spec.lua b/spec/02-integration/18-hybrid_rpc/05-sync-rpc_spec.lua index 26aa9b4b5b1..c9559c6601d 100644 --- a/spec/02-integration/18-hybrid_rpc/05-sync-rpc_spec.lua +++ b/spec/02-integration/18-hybrid_rpc/05-sync-rpc_spec.lua @@ -6,6 +6,13 @@ local setup = require("spec.helpers.rpc_mock.setup") local get_node_id = misc.get_node_id local DP_PREFIX = "servroot_dp" + +-- now version must be a string +local function fmt(v) + return string.format("v02_%028x", v) +end + + local function change_config() -- the initial sync is flaky. let's trigger a sync by creating a service local admin_client = helpers.admin_client() @@ -74,12 +81,12 @@ describe("kong.sync.v2", function() local called = false mocked_cp:mock("kong.sync.v2.get_delta", function(node_id, payload) called = true - return { default = { version = 100, deltas = {} } } + return { default = { version = fmt(100), deltas = {} } } end) -- make a call from the mocked cp -- CP->DP: notify_new_version - assert(mocked_cp:call(node_id, "kong.sync.v2.notify_new_version", { default = { new_version = 100, } })) + assert(mocked_cp:call(node_id, "kong.sync.v2.notify_new_version", { default = { new_version = fmt(100), } })) -- DP->CP: get_delta -- the dp after receiving the notification will make a call to the cp @@ -120,7 +127,7 @@ describe("kong.sync.v2", function() -- this is a workaround to registers the data plane node -- CP does not register the DP node until it receives a call from the DP function register_dp() - local res, err = mocked_dp:call("control_plane", "kong.sync.v2.get_delta", { default = { version = 0,},}) + local res, err = mocked_dp:call("control_plane", "kong.sync.v2.get_delta", { default = { version = fmt(0),},}) assert.is_nil(err) assert.is_table(res and res.default and res.default.deltas) end @@ -132,7 +139,7 @@ describe("kong.sync.v2", function() end) it("rpc call", function() - local res, err = mocked_dp:call("control_plane", "kong.sync.v2.get_delta", { default = { version = 0,},}) + local res, err = mocked_dp:call("control_plane", "kong.sync.v2.get_delta", { default = { version = fmt(0),},}) assert.is_nil(err) assert.is_table(res and res.default and res.default.deltas)