Skip to content
38 changes: 31 additions & 7 deletions cmd/examples/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,9 @@ class MySubscribeTrackHandler : public quicr::SubscribeTrackHandler
public:
MySubscribeTrackHandler(const quicr::FullTrackName& full_track_name,
quicr::messages::FilterType filter_type,
const std::optional<JoiningFetch>& joining_fetch,
const std::filesystem::path& dir = qclient_consts::kMoqDataDir)
: SubscribeTrackHandler(full_track_name, 3, quicr::messages::GroupOrder::kAscending, filter_type)
: SubscribeTrackHandler(full_track_name, 3, quicr::messages::GroupOrder::kAscending, filter_type, joining_fetch)
{
if (qclient_vars::record) {
std::filesystem::create_directory(dir);
Expand Down Expand Up @@ -157,6 +158,7 @@ class MySubscribeTrackHandler : public quicr::SubscribeTrackHandler
SPDLOG_INFO("Track alias: {0} is ready to read", track_alias.value());
}
} break;

default:
break;
}
Expand Down Expand Up @@ -286,10 +288,11 @@ class MyFetchTrackHandler : public quicr::FetchTrackHandler
new MyFetchTrackHandler(full_track_name, start_group, end_group, start_object, end_object));
}

void ObjectReceived(const quicr::ObjectHeaders&, quicr::BytesSpan data) override
void ObjectReceived(const quicr::ObjectHeaders& headers, quicr::BytesSpan data) override
{
std::string msg(data.begin(), data.end());
SPDLOG_INFO("Received fetched object: {0}", msg);
SPDLOG_INFO(
"Received fetched object group_id: {} object_id: {} value: {}", headers.group_id, headers.object_id, msg);
}

void StatusChanged(Status status) override
Expand All @@ -300,6 +303,11 @@ class MyFetchTrackHandler : public quicr::FetchTrackHandler
SPDLOG_INFO("Track alias: {0} is ready to read", track_alias.value());
}
} break;

case Status::kError: {
SPDLOG_INFO("Fetch failed");
break;
}
default:
break;
}
Expand Down Expand Up @@ -534,9 +542,16 @@ void
DoSubscriber(const quicr::FullTrackName& full_track_name,
const std::shared_ptr<quicr::Client>& client,
quicr::messages::FilterType filter_type,
const bool& stop)
const bool& stop,
bool join_fetch)
{
auto track_handler = std::make_shared<MySubscribeTrackHandler>(full_track_name, filter_type);
typedef quicr::SubscribeTrackHandler::JoiningFetch Fetch;
const auto joining_fetch = join_fetch ? Fetch{ .group_order = quicr::messages::GroupOrder::kAscending,
.preceding_group_offset = 0,
.priority = 4,
.parameters = {} }
: std::optional<Fetch>(std::nullopt);
const auto track_handler = std::make_shared<MySubscribeTrackHandler>(full_track_name, filter_type, joining_fetch);

SPDLOG_INFO("Started subscriber");

Expand Down Expand Up @@ -598,6 +613,12 @@ DoFetch(const quicr::FullTrackName& full_track_name,
fetch_track = true;
}

if (track_handler->GetStatus() != quicr::FetchTrackHandler::Status::kOk) {
moq_example::terminate = true;
moq_example::cv.notify_all();
break;
}

std::this_thread::sleep_for(std::chrono::milliseconds(500));
}

Expand Down Expand Up @@ -726,7 +747,8 @@ main(int argc, char* argv[])
("track_alias", "Track alias to use", cxxopts::value<uint64_t>())
("new_group", "Requests a new group on subscribe")
("sub_announces", "Prefix namespace to subscribe announces to", cxxopts::value<std::string>())
("record", "Record incoming data to moq and dat files", cxxopts::value<bool>());
("record", "Record incoming data to moq and dat files", cxxopts::value<bool>())
("joining_fetch", "Subscribe with a joining fetch", cxxopts::value<bool>());

options.add_options("Fetcher")
("fetch_namespace", "Track namespace", cxxopts::value<std::string>())
Expand Down Expand Up @@ -805,12 +827,14 @@ main(int argc, char* argv[])
SPDLOG_INFO("Setting subscription filter to Latest Group");
}
}
bool joining_fetch = result.count("joining_fetch") && result["joining_fetch"].as<bool>();

const auto& sub_track_name = quicr::example::MakeFullTrackName(result["sub_namespace"].as<std::string>(),
result["sub_name"].as<std::string>(),
qclient_vars::track_alias);

sub_thread = std::thread(DoSubscriber, sub_track_name, client, filter_type, std::ref(stop_threads));
sub_thread =
std::thread(DoSubscriber, sub_track_name, client, filter_type, std::ref(stop_threads), joining_fetch);
}
if (enable_fetch) {
const auto& fetch_track_name =
Expand Down
140 changes: 59 additions & 81 deletions cmd/examples/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,9 @@ namespace qserver_vars {
std::map<quicr::TrackNamespace, std::set<quicr::ConnectionHandle>> subscribes_announces;

/**
* Cache of MoQ objects by track namespace hash
* Cache of MoQ objects by track alias
*/
std::map<quicr::TrackNamespaceHash, quicr::Cache<quicr::messages::GroupId, std::set<CacheObject>>> cache;
std::map<quicr::messages::TrackAlias, quicr::Cache<quicr::messages::GroupId, std::set<CacheObject>>> cache;

/**
* Tick Service used by the cache
Expand All @@ -128,7 +128,7 @@ namespace qserver_vars {
/**
* @brief Map of atomic bools to mark if a fetch thread should be interrupted.
*/
std::unordered_map<uint64_t, std::atomic_bool> stop_fetch;
std::map<std::pair<quicr::ConnectionHandle, quicr::messages::SubscribeId>, std::atomic_bool> stop_fetch;
}

/**
Expand Down Expand Up @@ -174,9 +174,9 @@ class MySubscribeTrackHandler : public quicr::SubscribeTrackHandler

// Cache Object
if (qserver_vars::cache.count(*track_alias) == 0) {
qserver_vars::cache.insert(std::make_pair(
*track_alias,
quicr::Cache<quicr::messages::GroupId, std::set<CacheObject>>{ 50000, 1, qserver_vars::tick_service }));
qserver_vars::cache.insert(std::make_pair(*track_alias,
quicr::Cache<quicr::messages::GroupId, std::set<CacheObject>>{
50000, 1000, qserver_vars::tick_service }));
}

auto& cache_entry = qserver_vars::cache.at(*track_alias);
Expand Down Expand Up @@ -760,62 +760,28 @@ class MyServer : public quicr::Server
}
}

/**
* @brief Checks the cache for the requested objects.
*
* @param connection_handle Source connection ID.
* @param subscribe_id Subscribe ID received.
* @param track_full_name Track full name
* @param attrs Fetch attributes received.
*
* @returns true if the range of groups and objects exist in the cache, otherwise returns false.
*/
bool FetchReceived([[maybe_unused]] quicr::ConnectionHandle connection_handle,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The thought here is that a fetch could be denied. We still need that for the future, but I suppose we can add it back then.

[[maybe_unused]] uint64_t subscribe_id,
const quicr::FullTrackName& track_full_name,
const quicr::FetchAttributes& attrs) override
LargestAvailable GetLargestAvailable(const quicr::FullTrackName& track_name) override
{
SPDLOG_INFO("Received Fetch for conn_id: {} subscribe_id: {} start_group: {} end_group: {}",
connection_handle,
subscribe_id,
attrs.start_group,
attrs.end_group);

const auto th = quicr::TrackHash(track_full_name);

auto cache_entry_it = qserver_vars::cache.find(th.track_fullname_hash);
if (cache_entry_it == qserver_vars::cache.end()) {
SPDLOG_WARN("No cache entry for the hash {}", th.track_fullname_hash);
return false;
// Get the largest object from the cache.
std::optional<uint64_t> largest_group_id = std::nullopt;
std::optional<uint64_t> largest_object_id = std::nullopt;
const auto& th = quicr::TrackHash(track_name);
const auto cache_entry_it = qserver_vars::cache.find(th.track_fullname_hash);
if (cache_entry_it != qserver_vars::cache.end()) {
auto& [_, cache] = *cache_entry_it;
if (const auto& latest_group = cache.Last(); latest_group && !latest_group->empty()) {
const auto& latest_object = std::prev(latest_group->end());
largest_group_id = latest_object->headers.group_id;
largest_object_id = latest_object->headers.object_id;
}
}

auto& [_, cache_entry] = *cache_entry_it;

const auto groups = cache_entry.Get(attrs.start_group, attrs.end_group + 1);

if (groups.empty()) {
SPDLOG_WARN("No groups found for requested range");
return false;
if (!largest_group_id.has_value() || !largest_object_id.has_value()) {
return std::nullopt;
}

return std::any_of(groups.begin(), groups.end(), [&](const auto& group) {
return !group->empty() && group->begin()->headers.object_id <= attrs.start_object &&
(!attrs.end_object.has_value() || std::prev(group->end())->headers.object_id >= *attrs.end_object);
});
return std::make_pair(*largest_group_id, *largest_object_id);
}

/**
* @brief Event run on sending FetchOk.
*
* @details Event run upon sending a FetchOk to a fetching client. Retrieves the requested objects from the cache
* and send them to the requesting client's fetch handler.
*
* @param connection_handle Source connection ID.
* @param subscribe_id Subscribe ID received.
* @param track_full_name Track full name
* @param attributes Fetch attributes received.
*/
void OnFetchOk(quicr::ConnectionHandle connection_handle,
bool OnFetchOk(quicr::ConnectionHandle connection_handle,
uint64_t subscribe_id,
const quicr::FullTrackName& track_full_name,
const quicr::FetchAttributes& attrs) override
Expand All @@ -826,40 +792,52 @@ class MyServer : public quicr::Server

const auto th = quicr::TrackHash(track_full_name);

qserver_vars::stop_fetch.emplace(std::make_pair(subscribe_id, false));
qserver_vars::stop_fetch.try_emplace({ connection_handle, subscribe_id }, false);

std::thread retrieve_cache_thread(
[=,
cache_entries = qserver_vars::cache.at(th.track_fullname_hash).Get(attrs.start_group, attrs.end_group + 1)] {
defer(UnbindFetchTrack(connection_handle, pub_fetch_h));
std::unique_lock lock(moq_example::main_mutex);
const auto cache_entries =
qserver_vars::cache.at(th.track_fullname_hash).Get(attrs.start_group, attrs.end_group + 1);
lock.unlock();

for (const auto& cache_entry : cache_entries) {
for (const auto& object : *cache_entry) {
if (qserver_vars::stop_fetch[subscribe_id]) {
qserver_vars::stop_fetch.erase(subscribe_id);
return;
}
if (cache_entries.empty())
return false;

std::thread retrieve_cache_thread([=, cache_entries = cache_entries] {
defer(UnbindFetchTrack(connection_handle, pub_fetch_h));

for (const auto& cache_entry : cache_entries) {
for (const auto& object : *cache_entry) {
if (qserver_vars::stop_fetch[{ connection_handle, subscribe_id }]) {
qserver_vars::stop_fetch.erase({ connection_handle, subscribe_id });
return;
}

if ((object.headers.group_id < attrs.start_group || object.headers.group_id > attrs.end_group) ||
(object.headers.object_id < attrs.start_object ||
(attrs.end_object.has_value() && object.headers.object_id > *attrs.end_object)))
continue;
/*
* Stop when reached end group and end object, unless end object is zero. End object of
* zero indicates all objects within end group
*/
if (attrs.end_object.has_value() && *attrs.end_object != 0 &&
object.headers.group_id == attrs.end_group && object.headers.object_id > *attrs.end_object)
break; // Done, reached end object within end group

SPDLOG_INFO("Fetching group: {} object: {}", object.headers.group_id, object.headers.object_id);
SPDLOG_DEBUG("Fetching group: {} object: {}", object.headers.group_id, object.headers.object_id);

pub_fetch_h->PublishObject(object.headers, object.data);
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
});
pub_fetch_h->PublishObject(object.headers, object.data);
// std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
});

retrieve_cache_thread.detach();
return true;
}

void FetchCancelReceived(quicr::ConnectionHandle, uint64_t subscribe_id) override
void FetchCancelReceived(quicr::ConnectionHandle connection_handle, uint64_t subscribe_id) override
{
SPDLOG_INFO("Canceling fetch for subscribe_id: {0}", subscribe_id);
qserver_vars::stop_fetch[subscribe_id] = true;
SPDLOG_INFO("Canceling fetch for connection handle: {} subscribe_id: {}", connection_handle, subscribe_id);

if (qserver_vars::stop_fetch.count({ connection_handle, subscribe_id }) == 0)
qserver_vars::stop_fetch[{ connection_handle, subscribe_id }] = true;
}

void NewGroupRequested(quicr::ConnectionHandle conn_id, uint64_t subscribe_id, uint64_t track_alias) override
Expand Down
7 changes: 3 additions & 4 deletions include/quicr/cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ namespace quicr {

std::vector<ValueType> Get(const K& start_key, const K& end_key)
{

if (!Contains(start_key, end_key)) {
return {};
}
Expand Down Expand Up @@ -141,7 +142,7 @@ namespace quicr {
inline void Advance()
{
const TickType new_ticks = tick_service_->Milliseconds();
const TickType delta = current_ticks_ ? new_ticks - current_ticks_ : 0;
const TickType delta = current_ticks_ ? (new_ticks - current_ticks_) / interval_ : 0;
current_ticks_ = new_ticks;

if (delta == 0) {
Expand All @@ -162,8 +163,6 @@ namespace quicr {
}

bucket_index_ = (bucket_index_ + delta) % total_buckets_;

return;
}

template<typename Value>
Expand All @@ -175,7 +174,7 @@ namespace quicr {
ttl = duration_;
}

ttl = ttl / interval_;
ttl /= interval_;

Advance();
const IndexType future_index = (bucket_index_ + ttl - 1) % total_buckets_;
Expand Down
33 changes: 33 additions & 0 deletions include/quicr/detail/joining_fetch_handler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// SPDX-FileCopyrightText: Copyright (c) 2025 Cisco Systems
// SPDX-License-Identifier: BSD-2-Clause

#pragma once

#include <quicr/detail/messages.h>
#include <quicr/subscribe_track_handler.h>

namespace quicr {
/**
* JoiningFetchHandler is used internally in order to forward JOINING FETCH
* streams to their corresponding SUBSCRIBE track handler, for convenience.
*/
class JoiningFetchHandler : public SubscribeTrackHandler
{
public:
explicit JoiningFetchHandler(std::shared_ptr<SubscribeTrackHandler> joining_subscribe)
: SubscribeTrackHandler(joining_subscribe->GetFullTrackName(),
joining_subscribe->GetPriority(),
joining_subscribe->GetGroupOrder(),
joining_subscribe->GetFilterType())
, joining_subscribe_(std::move(joining_subscribe))
{
}
void StreamDataRecv(bool is_start,
uint64_t stream_id,
std::shared_ptr<const std::vector<uint8_t>> data) override;

private:
std::shared_ptr<SubscribeTrackHandler> joining_subscribe_;
};

} // namespace moq
9 changes: 8 additions & 1 deletion include/quicr/detail/messages.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,12 @@ namespace quicr::messages {

enum class FetchErrorCode : uint8_t
{
kTrackDoesNotExist = 0xF0 // Missing in draft
kInternalError = 0x0,
kUnauthorized = 0x1,
kTimeout = 0x2,
kNotSupported = 0x3,
kTrackDoesNotExist = 0x4,
kInvalidRange = 0x5,
};

// TODO (Suhas): rename it to StreamMapping
Expand Down Expand Up @@ -444,6 +449,8 @@ namespace quicr::messages {
kJoiningFetch,
};

// TODO(RichLogan): Consider a separate JoiningFetch structure.

struct Fetch
{
uint64_t subscribe_id;
Expand Down
Loading