Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add FetchAvailability result #519

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions cmd/examples/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -770,10 +770,10 @@ class MyServer : public quicr::Server
*
* @returns true if the range of groups and objects exist in the cache, otherwise returns false.
*/
bool FetchReceived([[maybe_unused]] quicr::ConnectionHandle connection_handle,
[[maybe_unused]] uint64_t subscribe_id,
const quicr::FullTrackName& track_full_name,
const quicr::FetchAttributes& attrs) override
std::optional<FetchAvailability> FetchReceived([[maybe_unused]] quicr::ConnectionHandle connection_handle,
[[maybe_unused]] uint64_t subscribe_id,
const quicr::FullTrackName& track_full_name,
const quicr::FetchAttributes& attrs) override
{
SPDLOG_INFO("Received Fetch for conn_id: {} subscribe_id: {} start_group: {} end_group: {}",
connection_handle,
Expand All @@ -786,7 +786,7 @@ class MyServer : public quicr::Server
auto cache_entry_it = qserver_vars::cache.find(th.track_fullname_hash);
if (cache_entry_it == qserver_vars::cache.end()) {
SPDLOG_WARN("No cache entry for the hash {}", th.track_fullname_hash);
return false;
return std::nullopt;
}

auto& [_, cache_entry] = *cache_entry_it;
Expand All @@ -795,13 +795,21 @@ class MyServer : public quicr::Server

if (groups.empty()) {
SPDLOG_WARN("No groups found for requested range");
return false;
return std::nullopt;
}

return std::any_of(groups.begin(), groups.end(), [&](const auto& group) {
const bool available = std::any_of(groups.begin(), groups.end(), [&](const auto& group) {
return !group->empty() && group->begin()->headers.object_id <= attrs.start_object &&
std::prev(group->end())->headers.object_id >= (attrs.end_object - 1);
});
if (!available) {
SPDLOG_WARN("No objects found for requested range");
return std::nullopt;
}
const auto last_cached = std::prev(groups.back()->end())->headers;
return FetchAvailability{ .end_of_track = false,
.largest_group = last_cached.group_id,
.largest_object = last_cached.object_id };
}

/**
Expand Down
18 changes: 13 additions & 5 deletions include/quicr/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ namespace quicr {
std::optional<Bytes> reason_phrase;
};

// Availability of the track.
struct FetchAvailability
{
bool end_of_track;
messages::GroupId largest_group;
messages::ObjectId largest_object;
};

/**
* @brief MoQ Server constructor to create the MOQ server mode instance
*
Expand Down Expand Up @@ -282,12 +290,12 @@ namespace quicr {
* @param track_full_name Track full name
* @param attributes Fetch attributes received.
*
* @returns true if user defined conditions of Fetch are satisfied, false otherwise.
* @returns Availability for FETCH_OK, if possible. If the fetch cannot be served, return std::nullopt.
*/
virtual bool FetchReceived(ConnectionHandle connection_handle,
uint64_t subscribe_id,
const FullTrackName& track_full_name,
const FetchAttributes& attributes);
virtual std::optional<FetchAvailability> FetchReceived(ConnectionHandle connection_handle,
uint64_t subscribe_id,
const FullTrackName& track_full_name,
const FetchAttributes& attributes);

/**
* @brief Event to run on sending FetchOk.
Expand Down
17 changes: 13 additions & 4 deletions src/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,12 @@ namespace quicr {
{
}

bool Server::FetchReceived(ConnectionHandle, uint64_t, const FullTrackName&, const FetchAttributes&)
std::optional<Server::FetchAvailability> Server::FetchReceived(ConnectionHandle,
uint64_t,
const FullTrackName&,
const FetchAttributes&)
{
return false;
return std::nullopt;
}

void Server::OnFetchOk(ConnectionHandle, uint64_t, const FullTrackName&, const FetchAttributes&) {}
Expand Down Expand Up @@ -548,7 +551,8 @@ namespace quicr {
.end_object = msg.end_object,
};

if (!FetchReceived(conn_ctx.connection_handle, msg.subscribe_id, tfn, attrs)) {
const auto available = FetchReceived(conn_ctx.connection_handle, msg.subscribe_id, tfn, attrs);
if (!available) {
SendFetchError(
conn_ctx, msg.subscribe_id, messages::FetchErrorCode::kTrackDoesNotExist, "Track does not exist");

Expand All @@ -562,7 +566,12 @@ namespace quicr {
conn_ctx.current_subscribe_id = msg.subscribe_id + 1;
}

SendFetchOk(conn_ctx, msg.subscribe_id, msg.group_order, false, 0, 0);
SendFetchOk(conn_ctx,
msg.subscribe_id,
msg.group_order,
available->end_of_track,
available->largest_group,
available->largest_object);
OnFetchOk(conn_ctx.connection_handle, msg.subscribe_id, tfn, attrs);

return true;
Expand Down