Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions src/apps/tcp/server.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
-- Use of this source code is governed by the Apache 2.0 license; see COPYING.

-- Simple TCP echo service.

module(..., package.seeall)

local lib = require("core.lib")
local packet = require("core.packet")
local link = require("core.link")
local tcp = require("lib.tcp.tcp")
local proto = require("lib.tcp.proto")
local scheduler = require("lib.fiber.scheduler")
local fiber = require("lib.fiber.fiber")

Server = {}
local config_params = {
-- Address or list of addresses to which to bind, as strings of the
-- format ADDR:PORT, where ADDR is either an IPv4 or IPv6 address.
bind = { required=true },
}

local function parse_ipv4_address(str)
local head, tail = str:match("^([%d.]*):([1-9][0-9]*)$")
if not head then return end
local parsed = ipv4:pton(head)
if not parsed then return end
return { type='ipv4', addr=parsed, port=tonumber(tail) }
end

local function parse_ipv6_address(str)
local head, tail = str:match("^([%x:]*):([1-9][0-9]*)$")
if not head then return end
local parsed = ipv6:pton(head)
if not parsed then return end
return { type='ipv6', addr=parsed, port=tonumber(tail) }
end

function Server:new(conf)
conf = lib.parse(conf, config_params)

local o = setmetatable({}, {__index = Server})
o.tcp = tcp.new()

if type(conf.bind) == 'string' then o:bind(conf.bind)
else for _,str in ipairs(conf.bind) do o:bind(str) end end

return o
end

local function echo(fam, sock)
while fibers.wait_readable(sock) do
fibers.write(sock, sock:peek())
end
end

-- Override me!
Server.accept_fn = echo

function Server:bind(addr_and_port)
local addr, port = parse_ipv4_address(addr_and_port)
local function accept_ipv4(sock) fiber.spawn(self.accept_fn, 'ipv4', sock) end
if addr then self.tcp:listen_ipv4(addr, port, accept); return end
local addr, port = parse_ipv6_address(addr_and_port)
local function accept_ipv6(sock) fiber.spawn(self.accept_fn, 'ipv6', sock) end
if addr then self.tcp:listen_ipv6(addr, port, accept); return end
error('Invalid bind address for server, expected ADDR:PORT: '..tostring(str))
end

function Server:push()
local now = engine.now()
self.tcp:advance_clock(now)

for _ = 1, link.nreadable(self.input.input) do
local p = link.receive(self.input.input)
if proto.is_ipv4(p) then
local ip, tcp, payload_length = proto.parse_ipv4_tcp(p)
if ip then self.tcp:handle_ipv4(ip, tcp, payload_length) end
elseif proto.is_ipv6(p) then
local ip, tcp, payload_length = proto.parse_ipv6_tcp(p)
if ip then self.tcp:handle_ipv6(ip, tcp, payload_length) end
end
packet.free(pkt)
end

self.scheduler:advance_clock(now)
self.scheduler:run_tasks()
end
15 changes: 9 additions & 6 deletions src/lib/README.ctable.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,15 @@ Add an entry to the ctable, returning the index of the added entry.
*updates_allowed* is an optional parameter. If not present or false,
then the `:insert` method will raise an error if the *key* is already
present in the table. If *updates_allowed* is the string `"required"`,
then an error will be raised if *key* is *not* already in the table.
Any other true value allows updates but does not require them. An
update will replace the existing entry in the table.

Returns a pointer to the inserted entry. Any subsequent modification
to the table may invalidate this pointer.
then an error will be raised if *key* is *not* already in the table. If
*updates_allowed* is the string `"preserve"`, then no error will be
raised if a key is already present in the table, but in that case the
corresponding value already in the table won't be updated either. Any
other true value allows updates but does not require them. An update
will replace the existing entry in the table.

Returns a pointer to the inserted or existing entry. Any subsequent
modification to the table may invalidate this pointer.

— Method **:update** *key*, *value*

Expand Down
6 changes: 4 additions & 2 deletions src/lib/ctable.lua
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,10 @@ function CTable:add(key, value, updates_allowed)
local entry = entries + index
if self.equal_fn(key, entry.key) then
assert(updates_allowed, "key is already present in ctable")
entry.key = key
entry.value = value
if updates_allowed ~= 'preserve' then
entry.key = key
entry.value = value
end
return entry
end
index = index + 1
Expand Down
157 changes: 157 additions & 0 deletions src/lib/tcp/buffer.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
-- Use of this source code is governed by the Apache 2.0 license; see COPYING.

-- Ring buffer for bytes

module(...,package.seeall)

local lib = require("core.lib")
local ffi = require("ffi")
local bit = require("bit")

local band = bit.band

local buffer_t = ffi.typeof[[
struct {
uint32_t read_idx, write_idx;
uint32_t size;
uint8_t buf[?];
} __attribute__((packed))
]]

local function to_uint32(n)
return ffi.new('uint32_t[1]', n)[0]
end

function new(size)
local ret = buffer_t(size)
ret:init(size)
return ret
end

local buffer = {}
buffer.__index = buffer

function buffer:init(size)
assert(size ~= 0 and band(size, size - 1) == 0, "size not power of two")
self.size = size
return self
end

function buffer:is_empty()
return self.write_idx == self.read_idx
end
function buffer:read_avail()
return to_uint32(self.write_idx - self.read_idx)
end
function buffer:is_full()
return self:read_avail() == self.size
end
function buffer:write_avail()
return self.size - self:read_avail()
end

function buffer:write_pos()
return band(self.write_idx, self.size - 1)
end
function buffer:rewrite_pos(offset)
return band(self.read_idx + offset, self.size - 1)
end
function buffer:read_pos()
return band(self.read_idx, self.size - 1)
end

function buffer:advance_write(count)
self.write_idx = self.write_idx + count
end
function buffer:advance_read(count)
self.read_idx = self.read_idx + count
end

function buffer:write(bytes, count)
if count > self:write_avail() then error('write xrun') end
local pos = self:write_pos()
local count1 = math.min(self.size - pos, count)
ffi.copy(self.buf + pos, bytes, count1)
ffi.copy(self.buf, bytes + count1, count - count1)
self:advance_write(count)
end

function buffer:rewrite(offset, bytes, count)
if offset + count > self:read_avail() then error('rewrite xrun') end
local pos = self:rewrite_pos(offset)
local count1 = math.min(self.size - pos, count)
ffi.copy(self.buf + pos, bytes, count1)
ffi.copy(self.buf, bytes + count1, count - count1)
end

function buffer:read(bytes, count)
if count > self:read_avail() then error('read xrun') end
local pos = self:read_pos()
local count1 = math.min(self.size - pos, count)
ffi.copy(bytes, self.buf + pos, count1)
ffi.copy(bytes + count1, self.buf, count - count1)
self:advance_read(count)
end

function buffer:drop()
if count > self:read_avail() then error('read xrun') end
self:advance_read(count)
end

function buffer:peek()
local pos = self:read_pos()
return self.buf + pos, math.min(self:read_avail(), self.size - pos)
end

buffer_t = ffi.metatype(buffer_t, buffer)

function selftest()
print('selftest: lib.buffer')
local function assert_throws(f, ...)
local success, ret = pcall(f, ...)
assert(not success, "expected failure but got "..tostring(ret))
end
local function assert_avail(b, readable, writable)
assert(b:read_avail() == readable)
assert(b:write_avail() == writable)
end
local function write_str(b, str)
local scratch = ffi.new('uint8_t[?]', #str)
ffi.copy(scratch, str, #str)
b:write(scratch, #str)
end
local function read_str(b, count)
local scratch = ffi.new('uint8_t[?]', count)
b:read(scratch, count)
return ffi.string(scratch, count)
end

assert_throws(new, 10)
local b = new(16)
assert_avail(b, 0, 16)
for i = 1,10 do
local s = '0123456789'
write_str(b, s)
assert_avail(b, #s, 16-#s)
assert(read_str(b, #s) == s)
assert_avail(b, 0, 16)
end

local ptr, avail = b:peek()
assert(avail == 0)
write_str(b, "foo")
local ptr, avail = b:peek()
assert(avail > 0)

-- Test wrap of indices.
local s = "overflow"
b.read_idx = to_uint32(3 - #s)
b.write_idx = b.read_idx
assert_avail(b, 0, 16)
write_str(b, s)
assert_avail(b, #s, 16-#s)
assert(read_str(b, #s) == s)
assert_avail(b, 0, 16)

print('selftest: ok')
end
Loading