Skip to content
21 changes: 16 additions & 5 deletions cmd/examples/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,9 @@ class MySubscribeTrackHandler : public quicr::SubscribeTrackHandler
public:
MySubscribeTrackHandler(const quicr::FullTrackName& full_track_name,
quicr::messages::FilterType filter_type,
const std::optional<JoiningFetch>& joining_fetch,
const std::filesystem::path& dir = qclient_consts::kMoqDataDir)
: SubscribeTrackHandler(full_track_name, 3, quicr::messages::GroupOrder::kAscending, filter_type)
: SubscribeTrackHandler(full_track_name, 3, quicr::messages::GroupOrder::kAscending, filter_type, joining_fetch)
{
if (qclient_vars::record) {
std::filesystem::create_directory(dir);
Expand Down Expand Up @@ -534,9 +535,16 @@ void
DoSubscriber(const quicr::FullTrackName& full_track_name,
const std::shared_ptr<quicr::Client>& client,
quicr::messages::FilterType filter_type,
const bool& stop)
const bool& stop,
bool join_fetch)
{
auto track_handler = std::make_shared<MySubscribeTrackHandler>(full_track_name, filter_type);
typedef quicr::SubscribeTrackHandler::JoiningFetch Fetch;
const auto joining_fetch = join_fetch ? Fetch{ .group_order = quicr::messages::GroupOrder::kAscending,
.preceding_group_offset = 0,
.priority = 4,
.parameters = {} }
: std::optional<Fetch>(std::nullopt);
const auto track_handler = std::make_shared<MySubscribeTrackHandler>(full_track_name, filter_type, joining_fetch);

SPDLOG_INFO("Started subscriber");

Expand Down Expand Up @@ -726,7 +734,8 @@ main(int argc, char* argv[])
("track_alias", "Track alias to use", cxxopts::value<uint64_t>())
("new_group", "Requests a new group on subscribe")
("sub_announces", "Prefix namespace to subscribe announces to", cxxopts::value<std::string>())
("record", "Record incoming data to moq and dat files", cxxopts::value<bool>());
("record", "Record incoming data to moq and dat files", cxxopts::value<bool>())
("joining_fetch", "Subscribe with a joining fetch", cxxopts::value<bool>());

options.add_options("Fetcher")
("fetch_namespace", "Track namespace", cxxopts::value<std::string>())
Expand Down Expand Up @@ -805,12 +814,14 @@ main(int argc, char* argv[])
SPDLOG_INFO("Setting subscription filter to Latest Group");
}
}
bool joining_fetch = result.count("joining_fetch") && result["joining_fetch"].as<bool>();

const auto& sub_track_name = quicr::example::MakeFullTrackName(result["sub_namespace"].as<std::string>(),
result["sub_name"].as<std::string>(),
qclient_vars::track_alias);

sub_thread = std::thread(DoSubscriber, sub_track_name, client, filter_type, std::ref(stop_threads));
sub_thread =
std::thread(DoSubscriber, sub_track_name, client, filter_type, std::ref(stop_threads), joining_fetch);
}
if (enable_fetch) {
const auto& fetch_track_name =
Expand Down
55 changes: 16 additions & 39 deletions cmd/examples/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -760,48 +760,25 @@ class MyServer : public quicr::Server
}
}

/**
* @brief Checks the cache for the requested objects.
*
* @param connection_handle Source connection ID.
* @param subscribe_id Subscribe ID received.
* @param track_full_name Track full name
* @param attrs Fetch attributes received.
*
* @returns true if the range of groups and objects exist in the cache, otherwise returns false.
*/
bool FetchReceived([[maybe_unused]] quicr::ConnectionHandle connection_handle,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The thought here is that a fetch could be denied. We still need that for the future, but I suppose we can add it back then.

[[maybe_unused]] uint64_t subscribe_id,
const quicr::FullTrackName& track_full_name,
const quicr::FetchAttributes& attrs) override
LargestAvailable GetLargestAvailable(const quicr::FullTrackName& track_name) override
{
SPDLOG_INFO("Received Fetch for conn_id: {} subscribe_id: {} start_group: {} end_group: {}",
connection_handle,
subscribe_id,
attrs.start_group,
attrs.end_group);

const auto th = quicr::TrackHash(track_full_name);

auto cache_entry_it = qserver_vars::cache.find(th.track_fullname_hash);
if (cache_entry_it == qserver_vars::cache.end()) {
SPDLOG_WARN("No cache entry for the hash {}", th.track_fullname_hash);
return false;
// Get the largest object from the cache.
std::optional<uint64_t> largest_group_id = std::nullopt;
std::optional<uint64_t> largest_object_id = std::nullopt;
const auto& th = quicr::TrackHash(track_name);
const auto cache_entry_it = qserver_vars::cache.find(th.track_fullname_hash);
if (cache_entry_it != qserver_vars::cache.end()) {
auto& [_, cache] = *cache_entry_it;
if (const auto& latest_group = cache.Last(); latest_group && !latest_group->empty()) {
const auto& latest_object = std::prev(latest_group->end());
largest_group_id = latest_object->headers.group_id;
largest_object_id = latest_object->headers.object_id;
}
}

auto& [_, cache_entry] = *cache_entry_it;

const auto groups = cache_entry.Get(attrs.start_group, attrs.end_group + 1);

if (groups.empty()) {
SPDLOG_WARN("No groups found for requested range");
return false;
if (!largest_group_id.has_value() || !largest_object_id.has_value()) {
return std::nullopt;
}

return std::any_of(groups.begin(), groups.end(), [&](const auto& group) {
return !group->empty() && group->begin()->headers.object_id <= attrs.start_object &&
(!attrs.end_object.has_value() || std::prev(group->end())->headers.object_id >= *attrs.end_object);
});
return std::make_pair(*largest_group_id, *largest_object_id);
}

/**
Expand Down
33 changes: 33 additions & 0 deletions include/quicr/detail/joining_fetch_handler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// SPDX-FileCopyrightText: Copyright (c) 2025 Cisco Systems
// SPDX-License-Identifier: BSD-2-Clause

#pragma once

#include <quicr/detail/messages.h>
#include <quicr/subscribe_track_handler.h>

namespace quicr {
/**
* JoiningFetchHandler is used internally in order to forward JOINING FETCH
* streams to their corresponding SUBSCRIBE track handler, for convenience.
*/
class JoiningFetchHandler : public SubscribeTrackHandler
{
public:
explicit JoiningFetchHandler(std::shared_ptr<SubscribeTrackHandler> joining_subscribe)
: SubscribeTrackHandler(joining_subscribe->GetFullTrackName(),
joining_subscribe->GetPriority(),
joining_subscribe->GetGroupOrder(),
joining_subscribe->GetFilterType())
, joining_subscribe_(std::move(joining_subscribe))
{
}
void StreamDataRecv(bool is_start,
uint64_t stream_id,
std::shared_ptr<const std::vector<uint8_t>> data) override;

private:
std::shared_ptr<SubscribeTrackHandler> joining_subscribe_;
};

} // namespace moq
9 changes: 8 additions & 1 deletion include/quicr/detail/messages.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,12 @@ namespace quicr::messages {

enum class FetchErrorCode : uint8_t
{
kTrackDoesNotExist = 0xF0 // Missing in draft
kInternalError = 0x0,
kUnauthorized = 0x1,
kTimeout = 0x2,
kNotSupported = 0x3,
kTrackDoesNotExist = 0x4,
kInvalidRange = 0x5,
};

// TODO (Suhas): rename it to StreamMapping
Expand Down Expand Up @@ -444,6 +449,8 @@ namespace quicr::messages {
kJoiningFetch,
};

// TODO(RichLogan): Consider a separate JoiningFetch structure.

struct Fetch
{
uint64_t subscribe_id;
Expand Down
17 changes: 15 additions & 2 deletions include/quicr/detail/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,15 @@ namespace quicr {

uint64_t current_subscribe_id{ 0 }; ///< Connection specific ID for subscribe messages

/// Track namespace/name by received subscribe IDs
/// Subscribe Context by received subscribe IDs
/// Used to map published tracks to subscribes in client mode
std::map<messages::SubscribeId, std::pair<TrackNamespaceHash, TrackNameHash>> recv_sub_id;
struct SubscribeContext
{
FullTrackName track_full_name;
std::optional<messages::GroupId> largest_group;
std::optional<messages::ObjectId> largest_object;
};
std::map<messages::SubscribeId, SubscribeContext> recv_sub_id;

/// Tracks by subscribe ID (Subscribe and Fetch)
std::map<messages::SubscribeId, std::shared_ptr<SubscribeTrackHandler>> tracks_by_sub_id;
Expand Down Expand Up @@ -348,6 +354,13 @@ namespace quicr {
messages::GroupId start_object,
messages::GroupId end_group,
messages::GroupId end_object);
void SendJoiningFetch(ConnectionContext& conn_ctx,
uint64_t subscribe_id,
messages::ObjectPriority priority,
messages::GroupOrder group_order,
uint64_t joining_subscribe_id,
messages::GroupId preceding_group_offset,
const std::vector<messages::Parameter>& parameters);
void SendFetchCancel(ConnectionContext& conn_ctx, uint64_t subscribe_id);
void SendFetchOk(ConnectionContext& conn_ctx,
uint64_t subscribe_id,
Expand Down
28 changes: 16 additions & 12 deletions include/quicr/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,18 @@ namespace quicr {
void UnbindPublisherTrack(ConnectionHandle connection_handle,
const std::shared_ptr<PublishTrackHandler>& track_handler);

/**
* @brief Bind a server fetch publisher track handler.
* @param conn_id Connection Id of the client/fetcher.
* @param track_handler The fetch publisher.
*/
void BindFetchTrack(TransportConnId conn_id, std::shared_ptr<PublishFetchHandler> track_handler);

/**
* @brief Unbind a server fetch publisher track handler.
* @param conn_id Connection ID of the client/fetcher.
* @param track_handler The fetch publisher.
*/
void UnbindFetchTrack(ConnectionHandle conn_id, const std::shared_ptr<PublishFetchHandler>& track_handler);

/**
Expand Down Expand Up @@ -279,20 +289,14 @@ namespace quicr {
*/
virtual void UnsubscribeReceived(ConnectionHandle connection_handle, uint64_t subscribe_id) = 0;

// TODO: Their is probably a distinction between track not found, and no objects.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// TODO: Their is probably a distinction between track not found, and no objects.
// TODO: There is probably a distinction between track not found, and no objects.

typedef std::optional<std::pair<messages::GroupId, messages::ObjectId>> LargestAvailable;
/**
* @brief Callback notification on Fetch message received.
*
* @param connection_handle Source connection ID.
* @param subscribe_id Subscribe ID received.
* @param track_full_name Track full name
* @param attributes Fetch attributes received.
*
* @returns true if user defined conditions of Fetch are satisfied, false otherwise.
* @brief Get the largest available object for the given track, if any.
* @param track_name The track to lookup on.
* @return The largest available object, if any.
*/
virtual bool FetchReceived(ConnectionHandle connection_handle,
uint64_t subscribe_id,
const FullTrackName& track_full_name,
const FetchAttributes& attributes);
virtual LargestAvailable GetLargestAvailable(const FullTrackName& track_name);

/**
* @brief Event to run on sending FetchOk.
Expand Down
22 changes: 21 additions & 1 deletion include/quicr/subscribe_track_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,34 @@ namespace quicr {
kSendingUnsubscribe ///< In this state, callbacks will not be called
};

/**
* @brief Attributes to use when subscribing with a Joining Fetch.
*/
struct JoiningFetch
{
const messages::ObjectPriority priority;
const messages::GroupOrder group_order;
const std::vector<messages::Parameter> parameters;
const messages::GroupId preceding_group_offset;
};

protected:
/**
* @brief Subscribe track handler constructor
*
* @param full_track_name Full track name struct
* @param joining_fetch If set, subscribe with a joining fetch using these attributes.
*/
SubscribeTrackHandler(const FullTrackName& full_track_name,
messages::ObjectPriority priority,
messages::GroupOrder group_order,
messages::FilterType filter_type)
messages::FilterType filter_type,
const std::optional<JoiningFetch>& joining_fetch = std::nullopt)
: BaseTrackHandler(full_track_name)
, priority_(priority)
, group_order_(group_order)
, filter_type_(filter_type)
, joining_fetch_(joining_fetch)
{
}

Expand Down Expand Up @@ -125,6 +139,11 @@ namespace quicr {
constexpr void SetLatestGroupID(messages::GroupId new_id) noexcept { latest_group_id_ = new_id; }
constexpr void SetLatestObjectID(messages::ObjectId new_id) noexcept { latest_object_id_ = new_id; }

/**
* @brief Get joining fetch info, if any.
*/
std::optional<JoiningFetch> GetJoiningFetch() const noexcept { return joining_fetch_; }

// --------------------------------------------------------------------------
// Public Virtual API callback event methods
// --------------------------------------------------------------------------
Expand Down Expand Up @@ -237,6 +256,7 @@ namespace quicr {
uint64_t current_stream_id_{ 0 };
std::optional<messages::GroupId> latest_group_id_;
std::optional<messages::ObjectId> latest_object_id_;
std::optional<JoiningFetch> joining_fetch_;

friend class Transport;
friend class Client;
Expand Down
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ target_sources (quicr PRIVATE
quic_transport.cpp
transport.cpp
transport_picoquic.cpp
joining_fetch_handler.cpp
)

target_include_directories(quicr PUBLIC ${CMAKE_BINARY_DIR}/include )
Expand Down
12 changes: 7 additions & 5 deletions src/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ namespace quicr {
conn_ctx.current_subscribe_id = msg.subscribe_id + 1;
}

conn_ctx.recv_sub_id[msg.subscribe_id] = { th.track_namespace_hash, th.track_name_hash };
conn_ctx.recv_sub_id[msg.subscribe_id] = { .track_full_name = tfn };
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Supported >= c++20


// For client/publisher, notify track that there is a subscriber
auto ptd = GetPubTrackHandler(conn_ctx, th);
Expand Down Expand Up @@ -120,7 +120,7 @@ namespace quicr {
ptd->SetTrackAlias(msg.track_alias);
ptd->SetStatus(PublishTrackHandler::Status::kOk);

conn_ctx.recv_sub_id[msg.subscribe_id] = { th.track_namespace_hash, th.track_name_hash };
conn_ctx.recv_sub_id[msg.subscribe_id] = { .track_full_name = tfn };
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Supported >= c++20

return true;
}
case messages::ControlMessageType::kSubscribeUpdate: {
Expand All @@ -142,8 +142,8 @@ namespace quicr {
return true;
}

auto [ns_hash, n_hash] = conn_ctx.recv_sub_id[msg.subscribe_id];
auto th = TrackHash(ns_hash, n_hash);
auto tfn = conn_ctx.recv_sub_id[msg.subscribe_id].track_full_name;
auto th = TrackHash(tfn);

// For client/publisher, notify track that there is a subscriber
auto ptd = GetPubTrackHandler(conn_ctx, th);
Expand Down Expand Up @@ -333,7 +333,9 @@ namespace quicr {
return true;
}

const auto& [ns_hash, name_hash] = th_it->second;
const auto& th = TrackHash(th_it->second.track_full_name);
const auto& ns_hash = th.track_namespace_hash;
const auto& name_hash = th.track_name_hash;
SPDLOG_LOGGER_DEBUG(logger_,
"Received unsubscribe conn_id: {0} subscribe_id: {1}",
conn_ctx.connection_handle,
Expand Down
Loading