Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(clustering/rpc): rpc batching on concentrator #14055

Merged
merged 12 commits into from
Jan 20, 2025
171 changes: 130 additions & 41 deletions kong/clustering/rpc/concentrator.lua
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@ local queue = require("kong.clustering.rpc.queue")
local cjson = require("cjson")
local jsonrpc = require("kong.clustering.rpc.json_rpc_v2")
local rpc_utils = require("kong.clustering.rpc.utils")
local isarray = require("table.isarray")
local isempty = require("table.isempty")
local tb_insert = table.insert


local type = type
local setmetatable = setmetatable
local tostring = tostring
local pcall = pcall
Expand All @@ -22,6 +26,7 @@ local ngx_log = ngx.log
local ngx_ERR = ngx.ERR
local ngx_WARN = ngx.WARN
local ngx_DEBUG = ngx.DEBUG
local new_error = jsonrpc.new_error


local RESP_CHANNEL_PREFIX = "rpc:resp:" -- format: rpc:resp:<worker_uuid>
Expand Down Expand Up @@ -90,6 +95,89 @@ local function enqueue_notifications(notifications, notifications_queue)
end


-- Dispatch one decoded JSON-RPC response object to the callback waiting on it.
--
-- @param payload  a single decoded response (either the whole message or one
--                 element of a batch array)
--
-- Behaviour:
--   * an invalid payload (not a table, or wrong `jsonrpc` version) is logged
--     and dropped;
--   * a payload without an `id` cannot be matched to a pending call: if it
--     carries an `error` member that error is logged, then it is ignored;
--   * otherwise the callback stored in `self.interest[id]` is removed and
--     invoked once with the payload; a falsy result from it is logged.
function _M:process_one_response(payload)
  -- The payload is produced by the peer, so a malformed one may be the
  -- peer's mistake: log and drop it rather than raising an error here.
  if type(payload) ~= "table" or payload.jsonrpc ~= jsonrpc.VERSION then
    ngx_log(ngx_WARN, "[rpc] invalid concentrator response payload, dropping it")
    return
  end

  local payload_id = payload.id

  -- may be some error message for peer
  if not payload_id then
    if payload.error then
      ngx_log(ngx_ERR, "[rpc] RPC failed, code: ",
                       payload.error.code, ", err: ",
                       payload.error.message)
    end
    return
  end

  -- response
  local cb = self.interest[payload_id]
  self.interest[payload_id] = nil -- edge trigger only once

  if not cb then
    ngx_log(ngx_WARN, "[rpc] no interest for concentrator response id: ", payload_id, ", dropping it")
    return
  end

  local res, err = cb(payload)
  if not res then
    ngx_log(ngx_WARN, "[rpc] concentrator response interest handler failed: id: ",
                      payload_id, ", err: ", err)
  end
end


-- Hand `response` to `_enqueue_rpc_response` and log a warning on failure.
--
-- When `collection` is given (batch call) `_enqueue_rpc_response` only appends
-- the response to that table and returns nothing, so a missing result is not
-- a failure in that mode and must not be reported.
local function enqueue_or_warn(self, reply_to, response, collection, failure_msg)
  local ok, err = self:_enqueue_rpc_response(reply_to, response, collection)
  if not ok and not collection then
    ngx_log(ngx_WARN, failure_msg, err)
  end
end


-- Execute one decoded JSON-RPC request locally and queue its response.
--
-- @param target_id   passed through to `self.manager:_local_call`
-- @param reply_to    worker to which the response is addressed
-- @param payload     a single decoded request (either the whole message or
--                    one element of a batch array)
-- @param collection  optional table; when present (batch call) responses are
--                    accumulated in it instead of being sent one by one
--
-- A payload that is not a table is answered with an INVALID_REQUEST error.
-- A request without an `id` is a notification: it is executed but produces
-- no response.
function _M:process_one_request(target_id, reply_to, payload, collection)
  if type(payload) ~= "table" then
    enqueue_or_warn(self, reply_to,
                    new_error(nil, jsonrpc.INVALID_REQUEST, "not an valid object"),
                    collection,
                    "[rpc] unable to enqueue RPC error: ")
    return
  end

  local payload_id = payload.id

  -- the last argument is true when the request carries no id (notification)
  local res, err = self.manager:_local_call(target_id, payload.method,
                                            payload.params, not payload_id)

  -- notification has no callback or id
  if not payload_id then
    ngx_log(ngx_DEBUG, "[rpc] notification has no response")
    return
  end

  if res then
    -- call success
    enqueue_or_warn(self, reply_to, {
      jsonrpc = jsonrpc.VERSION,
      id = payload_id,
      result = res,
    }, collection, "[rpc] unable to enqueue RPC call result: ")
    return
  end

  -- call failure
  enqueue_or_warn(self, reply_to, {
    jsonrpc = jsonrpc.VERSION,
    id = payload_id,
    error = {
      code = jsonrpc.SERVER_ERROR,
      message = tostring(err),
    }
  }, collection, "[rpc] unable to enqueue RPC error: ")
end


function _M:_event_loop(lconn)
local notifications_queue = queue.new(4096)
local rpc_resp_channel_name = RESP_CHANNEL_PREFIX .. self.worker_id
Expand All @@ -116,21 +204,16 @@ function _M:_event_loop(lconn)
if n.channel == rpc_resp_channel_name then
-- an response for a previous RPC call we asked for
local payload = cjson_decode(n.payload)
assert(payload.jsonrpc == jsonrpc.VERSION)

-- response
local cb = self.interest[payload.id]
self.interest[payload.id] = nil -- edge trigger only once

if cb then
local res, err = cb(payload)
if not res then
ngx_log(ngx_WARN, "[rpc] concentrator response interest handler failed: id: ",
payload.id, ", err: ", err)
end
if not isarray(payload) then
StarlightIbuki marked this conversation as resolved.
Show resolved Hide resolved
-- one rpc response
self:process_one_response(payload)

else
ngx_log(ngx_WARN, "[rpc] no interest for concentrator response id: ", payload.id, ", dropping it")
-- batch rpc response
for _, v in ipairs(payload) do
self:process_one_response(v)
end
end

else
Expand All @@ -153,45 +236,44 @@ function _M:_event_loop(lconn)
local reply_to = assert(call.reply_to,
"unknown requester for RPC")

local res, err = self.manager:_local_call(target_id, payload.method,
payload.params, not payload.id)
if not isarray(payload) then
-- one rpc call
self:process_one_request(target_id, reply_to, payload)

-- notification has no callback or id
if not payload.id then
ngx_log(ngx_DEBUG, "[rpc] notification has no response")
goto continue
end

if res then
-- call success
res, err = self:_enqueue_rpc_response(reply_to, {
jsonrpc = jsonrpc.VERSION,
id = payload.id,
result = res,
})
-- rpc call with an empty Array
if isempty(payload) then
local res, err = self:_enqueue_rpc_response(
reply_to,
new_error(nil, jsonrpc.INVALID_REQUEST, "empty batch array"))
if not res then
ngx_log(ngx_WARN, "[rpc] unable to enqueue RPC call result: ", err)
ngx_log(ngx_WARN, "[rpc] unable to enqueue RPC error: ", err)
end

else
-- call failure
res, err = self:_enqueue_rpc_response(reply_to, {
jsonrpc = jsonrpc.VERSION,
id = payload.id,
error = {
code = jsonrpc.SERVER_ERROR,
message = tostring(err),
}
})
goto continue
end

-- batching rpc call

local collection = {}

for _, v in ipairs(payload) do
self:process_one_request(target_id, reply_to, v, collection)
end

if not isempty(collection) then
local res, err = self:_enqueue_rpc_response(reply_to, collection)
if not res then
ngx_log(ngx_WARN, "[rpc] unable to enqueue RPC error: ", err)
ngx_log(ngx_WARN, "[rpc] unable to enqueue RPC call result: ", err)
end
end

::continue::
end
end
end
end -- for _, call
end -- if n.channel == rpc_resp_channel_name
end -- while true

local res, err = lconn:wait_for_notification()
if not res then
Expand All @@ -217,7 +299,7 @@ function _M:_event_loop(lconn)
else
notifications_queue:push(res)
end
end
end -- while not exiting()
end


Expand Down Expand Up @@ -270,7 +352,14 @@ end


-- enqueue a RPC response from CP worker with ID worker_id
function _M:_enqueue_rpc_response(worker_id, payload)
-- collection is only for rpc batch call.
-- if collection is nil, it means the rpc is a single call.
function _M:_enqueue_rpc_response(worker_id, payload, collection)
ADD-SP marked this conversation as resolved.
Show resolved Hide resolved
if collection then
tb_insert(collection, payload)
return
end

local sql = string_format("SELECT pg_notify(%s, %s);",
self.db.connector:escape_literal(RESP_CHANNEL_PREFIX .. worker_id),
self.db.connector:escape_literal(cjson_encode(payload)))
Expand Down
Loading