328 changes: 156 additions & 172 deletions test/rialtoPOC/rialto-pipeline2.cpp
@@ -5,210 +5,194 @@
* Copyright 2026 RDK Management
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "rialto-pipeline2.h"
*/
#include "rialto-pipeline.h"

#include <assert.h>
Copilot AI Jan 16, 2026

Using C-style <assert.h> instead of C++ <cassert>. Use <cassert> for consistency with modern C++ practices.
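A minimal standalone illustration of the suggested header swap (not part of this diff; the values used are hypothetical):

```cpp
// Standalone illustration only: <cassert> exposes the same assert() macro as <assert.h>.
#include <cassert>

int main()
{
    int trackCount = 2;        // hypothetical value, just to exercise the macro
    assert(trackCount == 2);   // still compiled out when NDEBUG is defined
    return 0;
}
```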
#include <inttypes.h>
#include <gst/gst.h>
#include <gst/app/gstappsrc.h>

#include <atomic>
#include <cstdio>

using namespace firebolt::rialto;

// -----------------------------------------------------------------------------
// Track IDs (fixed mapping for test compatibility)
// -----------------------------------------------------------------------------
static const int32_t TRACK_VIDEO = 0;
static const int32_t TRACK_AUDIO = 1;

static void found_video_source_cb( GObject * object, GObject * orig, GParamSpec * pspec, class GstMediaPipeline *gstMediaPipeline )
// -----------------------------------------------------------------------------
// Per‑source state (pipeline3‑compatible)
// -----------------------------------------------------------------------------
struct SourceState
{
bool outstanding{false};
uint32_t requestId{0};
};

static std::atomic<uint32_t> g_nextRequestId{1};

// -----------------------------------------------------------------------------
// Forward declarations
// -----------------------------------------------------------------------------
static void found_video_source_cb(GObject *, GObject *, GParamSpec *, class GstMediaPipeline *);
static void found_audio_source_cb(GObject *, GObject *, GParamSpec *, class GstMediaPipeline *);
static void on_need_data_cb(GstAppSrc *, guint, gpointer);

// -----------------------------------------------------------------------------
// GStreamer callbacks
// -----------------------------------------------------------------------------
static void found_video_source_cb(GObject *, GObject *orig, GParamSpec *pspec,
class GstMediaPipeline *pipeline)
{
printf( "found_video_source_cb\n" );
gstMediaPipeline->found_source(orig,pspec,TRACK_VIDEO);
pipeline->found_source(orig, pspec, TRACK_VIDEO);
}

static void found_audio_source_cb( GObject * object, GObject * orig, GParamSpec * pspec, class GstMediaPipeline *gstMediaPipeline )
static void found_audio_source_cb(GObject *, GObject *orig, GParamSpec *pspec,
class GstMediaPipeline *pipeline)
{
printf( "found_audio_source_cb\n" );
gstMediaPipeline->found_source(orig,pspec,TRACK_AUDIO);
pipeline->found_source(orig, pspec, TRACK_AUDIO);
}

void GstMediaPipeline::found_source( GObject *orig, GParamSpec *pspec, int sourceId )
static void on_need_data_cb(GstAppSrc *src, guint /*length*/, gpointer user_data)
{
g_object_get( orig, pspec->name, &track[sourceId].appsrc, NULL );
auto *pipeline = static_cast<GstMediaPipeline *>(user_data);

int sourceId =
(src == GST_APP_SRC(pipeline->track[TRACK_VIDEO].appsrc))
? TRACK_VIDEO
: TRACK_AUDIO;

auto &state = pipeline->m_sourceState[sourceId];
if (state.outstanding)
return;

uint32_t reqId = g_nextRequestId.fetch_add(1);
state.outstanding = true;
state.requestId = reqId;

auto client = pipeline->getClient().lock();
if (client)
{
client->notifyNeedMediaData(
sourceId,
0 /* frameCount */,
reqId,
nullptr /* shmInfo */);
}
}

bool GstMediaPipeline::attachSource(const std::unique_ptr<MediaSource> &source, int32_t &sourceId ){
MediaSourceType sourceType = source->getType();
GstCaps * caps = gst_caps_new_empty_simple( source->getMimeType().c_str() );

if( sourceType == MediaSourceType::VIDEO || sourceType == MediaSourceType::AUDIO )
// -----------------------------------------------------------------------------
// GstMediaPipeline implementation
// -----------------------------------------------------------------------------
GstMediaPipeline::GstMediaPipeline()
{
printf("constructing GstMediaPipeline (appsrc-backed)\n");

pipeline = gst_pipeline_new("rialtoTest");
assert(pipeline);
Copilot AI Jan 16, 2026

Replace assert with proper error handling. Use logging and throw an exception or handle the error gracefully.
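A sketch of what the suggested change could look like (illustrative only, not code from this PR; the `createPipelineOrThrow` helper, the GST_ERROR logging call, and the exception type are assumptions, and the same pattern would apply to the other element-creation asserts below):

```cpp
// Sketch: replace assert(pipeline) with logging plus an exception (hypothetical helper).
#include <stdexcept>
#include <gst/gst.h>

static GstElement *createPipelineOrThrow(const char *name)
{
    GstElement *pipeline = gst_pipeline_new(name);
    if (!pipeline)
    {
        GST_ERROR("gst_pipeline_new(\"%s\") failed", name);
        throw std::runtime_error("failed to create GStreamer pipeline");
    }
    return pipeline;
}
```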

for (int i = 0; i < 2; ++i)
{
const IMediaPipeline::MediaSourceAV *mediaSourceAV = dynamic_cast<IMediaPipeline::MediaSourceAV *>(source.get());
if( mediaSourceAV )
m_sourceState[i] = {};

GstElement *playbin = gst_element_factory_make("playbin", nullptr);
assert(playbin);
Copilot AI Jan 16, 2026

Replace assert with proper error handling. Use logging and handle the error gracefully.

track[i].playbin = playbin;
track[i].appsrc = nullptr;

gst_bin_add(GST_BIN(pipeline), playbin);
g_object_set(playbin, "uri", "appsrc://", nullptr);

if (i == TRACK_VIDEO)
{
const char *streamFormat = nullptr;
switch( mediaSourceAV->getStreamFormat() )
{
case StreamFormat::HVC1:
streamFormat = "hvc1";
break;
case StreamFormat::AVC:
streamFormat = "avc";
break;
case StreamFormat::RAW:
streamFormat = "raw";
break;
default:
break;
}
if( streamFormat )
{
gst_caps_set_simple( caps, "stream-format", G_TYPE_STRING, streamFormat, nullptr );
}
const std::shared_ptr<CodecData> codecData = mediaSourceAV->getCodecData();
GstBuffer *buf = nullptr;
if( codecData )
{
buf = gst_buffer_new_and_alloc(codecData->data.size());
gst_buffer_fill(buf, 0, codecData->data.data(), codecData->data.size() );
gst_caps_set_simple( caps, "codec_data", GST_TYPE_BUFFER, buf, nullptr );
}

if( sourceType == MediaSourceType::VIDEO )
{
sourceId = TRACK_VIDEO;
const IMediaPipeline::MediaSourceVideo *mediaSourceVideo = dynamic_cast<IMediaPipeline::MediaSourceVideo *>(source.get());
if( mediaSourceVideo )
{
gst_caps_set_simple( caps,
"alignment", G_TYPE_STRING, "au",
"width", G_TYPE_INT, mediaSourceVideo->getWidth(),
"height", G_TYPE_INT, mediaSourceVideo->getHeight(),
"pixel-aspect-ratio", GST_TYPE_FRACTION, 1, 1,
nullptr);
}
}
else if( sourceType == MediaSourceType::AUDIO )
{
sourceId = TRACK_AUDIO;
const IMediaPipeline::MediaSourceAudio *mediaSourceAudio = dynamic_cast<IMediaPipeline::MediaSourceAudio *>(source.get());
if( mediaSourceAudio )
{
const auto &audioConfig = mediaSourceAudio->getAudioConfig();
gst_caps_set_simple( caps,
"framed", G_TYPE_BOOLEAN, TRUE,
"rate", G_TYPE_INT, audioConfig.sampleRate,
"channels", G_TYPE_INT, audioConfig.numberOfChannels,
nullptr );
}
}
else
{
printf( "unknown sourceType\n" );
exit(1);
}

gchar *caps_string = gst_caps_to_string(caps);
g_print("Negotiated caps: %s\n", caps_string);
g_free(caps_string);

gst_app_src_set_caps(GST_APP_SRC(track[sourceId].appsrc), caps);
gst_caps_unref(caps);
if( buf )
{
gst_buffer_unref(buf);
}
GstElement *videoSink =
gst_element_factory_make("rialtomsevideosink", "video-sink");
assert(videoSink);
Copilot AI Jan 16, 2026

Replace assert with proper error handling. Use logging and handle the error gracefully.

g_object_set(playbin, "video-sink", videoSink, nullptr);
g_signal_connect(playbin, "deep-notify::source",
G_CALLBACK(found_video_source_cb), this);
}
else
{
GstElement *audioSink =
gst_element_factory_make("rialtomseaudiosink", "audio-sink");
assert(audioSink);
Copilot AI Jan 16, 2026

Replace assert with proper error handling. Use logging and handle the error gracefully.

g_object_set(playbin, "audio-sink", audioSink, nullptr);
g_signal_connect(playbin, "deep-notify::source",
G_CALLBACK(found_audio_source_cb), this);
}
printf( "attachSource() -> sourceId=%" PRId32 "\n", sourceId );
}
return true;
};
}

bool GstMediaPipeline::removeSource(int32_t id){
printf( "removeSource(sourceId=%" PRId32 ")\n", id );
return false;
};
GstMediaPipeline::~GstMediaPipeline()
{
printf("destructing GstMediaPipeline\n");
}

bool GstMediaPipeline::allSourcesAttached(){
return true;
};
void GstMediaPipeline::found_source(GObject *orig, GParamSpec *pspec, int sourceId)
{
g_object_get(orig, pspec->name, &track[sourceId].appsrc, nullptr);

bool GstMediaPipeline::load(MediaType type, const std::string &mimeType, const std::string &url){ return true; }
GstAppSrc *appsrc = GST_APP_SRC(track[sourceId].appsrc);
gst_app_src_set_stream_type(appsrc, GST_APP_STREAM_TYPE_STREAM);

bool GstMediaPipeline::play(){
printf( "play\n" );
gst_element_set_state( pipeline, GST_STATE_PLAYING );
return true;
};
g_signal_connect(appsrc, "need-data",
G_CALLBACK(on_need_data_cb), this);
}

AddSegmentStatus GstMediaPipeline::addSegment(uint32_t needDataRequestId, const std::unique_ptr<MediaSegment> &mediaSegment)
bool GstMediaPipeline::play()
{
GstBuffer *gstBuffer = gst_buffer_new_wrapped(
(gpointer)mediaSegment->getData(),
(gsize)mediaSegment->getDataLength() );
GST_BUFFER_PTS(gstBuffer) = (GstClockTime)(mediaSegment->getTimeStamp());
GST_BUFFER_DURATION(gstBuffer) = (GstClockTime)(mediaSegment->getDuration());
gst_app_src_push_buffer(GST_APP_SRC(track[mediaSegment->getId()].appsrc), gstBuffer );
return AddSegmentStatus::OK;
gst_element_set_state(pipeline, GST_STATE_PLAYING);
return true;
}

GstMediaPipeline::GstMediaPipeline()
bool GstMediaPipeline::haveData(MediaSourceStatus, uint32_t requestId)
{
printf("constructing GstMediaPipeline\n");
pipeline = gst_pipeline_new("rialtoTest");

for (int i = 0; i < 2; i++)
{
printf("creating playbin for track#%d\n", i);

GstElement* playbin = gst_element_factory_make("playbin", nullptr);
if( !playbin )
for (int i = 0; i < 2; ++i)
{
auto &state = m_sourceState[i];
if (state.outstanding && state.requestId == requestId)
{
printf( "gst_element_factory_make failure\n" );
exit(1);
state.outstanding = false;
state.requestId = 0;
return true;
}
track[i].playbin = playbin;
track[i].appsrc = nullptr;
}
return false;
}

gboolean rc = gst_bin_add(GST_BIN(pipeline), playbin);
if( !rc )
{
printf( "gst_bin_add failure\n" );
exit(1);
}
AddSegmentStatus
GstMediaPipeline::addSegment(uint32_t requestId,
const std::unique_ptr<MediaSegment> &segment)
{
int32_t sourceId = segment->getId();
auto &state = m_sourceState[sourceId];

g_object_set(playbin, "uri", "appsrc://", nullptr);

if (i == TRACK_VIDEO) {
GstElement* videoSink = gst_element_factory_make("rialtomsevideosink", "video-sink");
if( !videoSink )
{
printf( "videoSink gst_element_factory_make failure\n" );
exit(1);
}
g_object_set(playbin, "video-sink", videoSink, nullptr);
g_signal_connect(playbin, "deep-notify::source",
G_CALLBACK(found_video_source_cb), this);
} else {
GstElement* audioSink = gst_element_factory_make("rialtomseaudiosink", "audio-sink");
if( !audioSink )
{
printf( "audioSink gst_element_factory_make failure\n" );
exit(1);
}
g_object_set(playbin, "audio-sink", audioSink, nullptr);
g_signal_connect(playbin, "deep-notify::source",
G_CALLBACK(found_audio_source_cb), this);
}
}
}
if (!state.outstanding || state.requestId != requestId)
{
printf("addSegment rejected (src=%" PRId32 ", req=%" PRIu32 ")\n",
sourceId, requestId);
return AddSegmentStatus::ERROR;
}

GstBuffer *buf = gst_buffer_new_wrapped(
const_cast<uint8_t *>(segment->getData()),
segment->getDataLength());

GstMediaPipeline::~GstMediaPipeline()
{
printf( "destructing GstMediaPipeline\n" );
GST_BUFFER_PTS(buf) =
static_cast<GstClockTime>(segment->getTimeStamp());
GST_BUFFER_DURATION(buf) =
static_cast<GstClockTime>(segment->getDuration());

gst_app_src_push_buffer(
GST_APP_SRC(track[sourceId].appsrc), buf);

return AddSegmentStatus::OK;
}