diff --git a/test/rialtoPOC/rialto-pipeline2.cpp b/test/rialtoPOC/rialto-pipeline2.cpp
index db0a1671c..fe26c19c6 100644
--- a/test/rialtoPOC/rialto-pipeline2.cpp
+++ b/test/rialtoPOC/rialto-pipeline2.cpp
@@ -5,210 +5,194 @@
  * Copyright 2026 RDK Management
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-#include "rialto-pipeline2.h"
+ */
+#include "rialto-pipeline.h"
+
+#include <atomic>
 #include <gst/app/gstappsrc.h>
+#include <cassert>
 #include <gst/gst.h>
+#include <cinttypes>
+#include <cstdio>
+
+using namespace firebolt::rialto;
+
+// -----------------------------------------------------------------------------
 // Track IDs (fixed mapping for test compatibility)
 // -----------------------------------------------------------------------------
 static const int32_t TRACK_VIDEO = 0;
 static const int32_t TRACK_AUDIO = 1;
 
-static void found_video_source_cb( GObject * object, GObject * orig, GParamSpec * pspec, class GstMediaPipeline *gstMediaPipeline )
+// -----------------------------------------------------------------------------
+// Per-source state (pipeline3-compatible)
+// -----------------------------------------------------------------------------
+struct SourceState
+{
+    bool outstanding{false};
+    uint32_t requestId{0};
+};
+
+static std::atomic<uint32_t> g_nextRequestId{1};
+
+// -----------------------------------------------------------------------------
+// Forward declarations
+// -----------------------------------------------------------------------------
+static void found_video_source_cb(GObject *, GObject *, GParamSpec *, class GstMediaPipeline *);
+static void found_audio_source_cb(GObject *, GObject *, GParamSpec *, class GstMediaPipeline *);
+static void on_need_data_cb(GstAppSrc *, guint, gpointer);
+
+// -----------------------------------------------------------------------------
+// GStreamer callbacks
+// -----------------------------------------------------------------------------
+static void found_video_source_cb(GObject *, GObject *orig, GParamSpec *pspec,
+                                  class GstMediaPipeline *pipeline)
 {
-    printf( "found_video_source_cb\n" );
-    gstMediaPipeline->found_source(orig,pspec,TRACK_VIDEO);
+    pipeline->found_source(orig, pspec, TRACK_VIDEO);
 }
 
-static void found_audio_source_cb( GObject * object, GObject * orig, GParamSpec * pspec, class GstMediaPipeline *gstMediaPipeline )
+static void found_audio_source_cb(GObject *, GObject *orig, GParamSpec *pspec,
+                                  class GstMediaPipeline *pipeline)
 {
-    printf( "found_audio_source_cb\n" );
-    gstMediaPipeline->found_source(orig,pspec,TRACK_AUDIO);
+    pipeline->found_source(orig, pspec, TRACK_AUDIO);
 }
 
-void GstMediaPipeline::found_source( GObject *orig, GParamSpec *pspec, int sourceId )
+static void on_need_data_cb(GstAppSrc *src, guint /*length*/, gpointer user_data)
 {
-    g_object_get( orig, pspec->name, &track[sourceId].appsrc, NULL );
+    auto *pipeline = static_cast<GstMediaPipeline *>(user_data);
+
+    int sourceId =
+        (src == GST_APP_SRC(pipeline->track[TRACK_VIDEO].appsrc))
+            ? TRACK_VIDEO
+            : TRACK_AUDIO;
+
+    auto &state = pipeline->m_sourceState[sourceId];
+    if (state.outstanding)
+        return;
+
+    uint32_t reqId = g_nextRequestId.fetch_add(1);
+    state.outstanding = true;
+    state.requestId = reqId;
+
+    auto client = pipeline->getClient().lock();
+    if (client)
+    {
+        client->notifyNeedMediaData(
+            sourceId,
+            0 /* frameCount */,
+            reqId,
+            nullptr /* shmInfo */);
+    }
 }
 
-bool GstMediaPipeline::attachSource(const std::unique_ptr<MediaSource> &source, int32_t &sourceId ){
-    MediaSourceType sourceType = source->getType();
-    GstCaps * caps = gst_caps_new_empty_simple( source->getMimeType().c_str() );
-
-    if( sourceType == MediaSourceType::VIDEO || sourceType == MediaSourceType::AUDIO )
+// -----------------------------------------------------------------------------
+// GstMediaPipeline implementation
+// -----------------------------------------------------------------------------
+GstMediaPipeline::GstMediaPipeline()
+{
+    printf("constructing GstMediaPipeline (appsrc-backed)\n");
+
+    pipeline = gst_pipeline_new("rialtoTest");
+    assert(pipeline);
+
+    for (int i = 0; i < 2; ++i)
     {
-        const IMediaPipeline::MediaSourceAV *mediaSourceAV = dynamic_cast<const IMediaPipeline::MediaSourceAV *>(source.get());
-        if( mediaSourceAV )
+        m_sourceState[i] = {};
+
+        GstElement *playbin = gst_element_factory_make("playbin", nullptr);
+        assert(playbin);
+
+        track[i].playbin = playbin;
+        track[i].appsrc = nullptr;
+
+        gst_bin_add(GST_BIN(pipeline), playbin);
+        g_object_set(playbin, "uri", "appsrc://", nullptr);
+
+        if (i == TRACK_VIDEO)
        {
-            const char *streamFormat = nullptr;
-            switch( mediaSourceAV->getStreamFormat() )
-            {
-            case StreamFormat::HVC1:
-                streamFormat = "hvc1";
-                break;
-            case StreamFormat::AVC:
-                streamFormat = "avc";
-                break;
-            case StreamFormat::RAW:
-                streamFormat = "raw";
-                break;
-            default:
-                break;
-            }
-            if( streamFormat )
-            {
-                gst_caps_set_simple( caps, "stream-format", G_TYPE_STRING, streamFormat, nullptr );
-            }
-            const std::shared_ptr<CodecData> codecData = mediaSourceAV->getCodecData();
-            GstBuffer *buf = nullptr;
-            if( codecData )
-            {
-                buf = gst_buffer_new_and_alloc(codecData->data.size());
-                gst_buffer_fill(buf, 0, codecData->data.data(), codecData->data.size() );
-                gst_caps_set_simple( caps, "codec_data", GST_TYPE_BUFFER, buf, nullptr );
-            }
-
-            if( sourceType == MediaSourceType::VIDEO )
-            {
-                sourceId = TRACK_VIDEO;
-                const IMediaPipeline::MediaSourceVideo *mediaSourceVideo = dynamic_cast<const IMediaPipeline::MediaSourceVideo *>(source.get());
-                if( mediaSourceVideo )
-                {
-                    gst_caps_set_simple( caps,
-                        "alignment", G_TYPE_STRING, "au",
-                        "width", G_TYPE_INT, mediaSourceVideo->getWidth(),
-                        "height", G_TYPE_INT, mediaSourceVideo->getHeight(),
-                        "pixel-aspect-ratio", GST_TYPE_FRACTION, 1, 1,
-                        nullptr);
-                }
-            }
-            else if( sourceType == MediaSourceType::AUDIO )
-            {
-                sourceId = TRACK_AUDIO;
-                const IMediaPipeline::MediaSourceAudio *mediaSourceAudio = dynamic_cast<const IMediaPipeline::MediaSourceAudio *>(source.get());
-                if( mediaSourceAudio )
-                {
-                    const auto &audioConfig = mediaSourceAudio->getAudioConfig();
-                    gst_caps_set_simple( caps,
-                        "framed", G_TYPE_BOOLEAN, TRUE,
-                        "rate", G_TYPE_INT, audioConfig.sampleRate,
-                        "channels", G_TYPE_INT, audioConfig.numberOfChannels,
-                        nullptr );
-                }
-            }
-            else
-            {
-                printf( "unknown sourceType\n" );
-                exit(1);
-            }
-
-            gchar *caps_string = gst_caps_to_string(caps);
-            g_print("Negotiated caps: %s\n", caps_string);
-            g_free(caps_string);
-
-            gst_app_src_set_caps(GST_APP_SRC(track[sourceId].appsrc), caps);
-            gst_caps_unref(caps);
-            if( buf )
-            {
-                gst_buffer_unref(buf);
-            }
+            GstElement *videoSink =
+                gst_element_factory_make("rialtomsevideosink", "video-sink");
+            assert(videoSink);
+
+            g_object_set(playbin, "video-sink", videoSink, nullptr);
+            g_signal_connect(playbin, "deep-notify::source",
+                             G_CALLBACK(found_video_source_cb), this);
+        }
+        else
+        {
+            GstElement *audioSink =
+                gst_element_factory_make("rialtomseaudiosink", "audio-sink");
+            assert(audioSink);
+
+            g_object_set(playbin, "audio-sink", audioSink, nullptr);
+            g_signal_connect(playbin, "deep-notify::source",
+                             G_CALLBACK(found_audio_source_cb), this);
         }
-        printf( "attachSource() -> sourceId=%" PRId32 "\n", sourceId );
     }
-    return true;
-};
+}
 
-bool GstMediaPipeline::removeSource(int32_t id){
-    printf( "removeSource(sourceId=%" PRId32 ")\n", id );
-    return false;
-};
+GstMediaPipeline::~GstMediaPipeline()
+{
+    printf("destructing GstMediaPipeline\n");
+}
 
-bool GstMediaPipeline::allSourcesAttached(){
-    return true;
-};
+void GstMediaPipeline::found_source(GObject *orig, GParamSpec *pspec, int sourceId)
+{
+    g_object_get(orig, pspec->name, &track[sourceId].appsrc, nullptr);
 
-bool GstMediaPipeline::load(MediaType type, const std::string &mimeType, const std::string &url){ return true; }
+    GstAppSrc *appsrc = GST_APP_SRC(track[sourceId].appsrc);
+    gst_app_src_set_stream_type(appsrc, GST_APP_STREAM_TYPE_STREAM);
 
-bool GstMediaPipeline::play(){
-    printf( "play\n" );
-    gst_element_set_state( pipeline, GST_STATE_PLAYING );
-    return true;
-};
+    g_signal_connect(appsrc, "need-data",
+                     G_CALLBACK(on_need_data_cb), this);
+}
 
-AddSegmentStatus GstMediaPipeline::addSegment(uint32_t needDataRequestId, const std::unique_ptr<MediaSegment> &mediaSegment)
+bool GstMediaPipeline::play()
 {
-    GstBuffer *gstBuffer = gst_buffer_new_wrapped(
-        (gpointer)mediaSegment->getData(),
-        (gsize)mediaSegment->getDataLength() );
-    GST_BUFFER_PTS(gstBuffer) = (GstClockTime)(mediaSegment->getTimeStamp());
-    GST_BUFFER_DURATION(gstBuffer) = (GstClockTime)(mediaSegment->getDuration());
-    gst_app_src_push_buffer(GST_APP_SRC(track[mediaSegment->getId()].appsrc), gstBuffer );
-    return AddSegmentStatus::OK;
+    gst_element_set_state(pipeline, GST_STATE_PLAYING);
+    return true;
 }
 
-GstMediaPipeline::GstMediaPipeline()
+bool GstMediaPipeline::haveData(MediaSourceStatus, uint32_t requestId)
 {
-    printf("constructing GstMediaPipeline\n");
-    pipeline = gst_pipeline_new("rialtoTest");
-
-    for (int i = 0; i < 2; i++)
-    {
-        printf("creating playbin for track#%d\n", i);
-
-        GstElement* playbin = gst_element_factory_make("playbin", nullptr);
-        if( !playbin )
+    for (int i = 0; i < 2; ++i)
+    {
+        auto &state = m_sourceState[i];
+        if (state.outstanding && state.requestId == requestId)
        {
-            printf( "gst_element_factory_make failure\n" );
-            exit(1);
+            state.outstanding = false;
+            state.requestId = 0;
+            return true;
        }
-        track[i].playbin = playbin;
-        track[i].appsrc = nullptr;
+    }
+    return false;
+}
 
-        gboolean rc = gst_bin_add(GST_BIN(pipeline), playbin);
-        if( !rc )
-        {
-            printf( "gst_bin_add failure\n" );
-            exit(1);
-        }
+AddSegmentStatus
+GstMediaPipeline::addSegment(uint32_t requestId,
+                             const std::unique_ptr<MediaSegment> &segment)
+{
+    int32_t sourceId = segment->getId();
+    auto &state = m_sourceState[sourceId];
 
-        g_object_set(playbin, "uri", "appsrc://", nullptr);
-
-        if (i == TRACK_VIDEO) {
-            GstElement* videoSink = gst_element_factory_make("rialtomsevideosink", "video-sink");
-            if( !videoSink )
-            {
-                printf( "videoSink gst_element_factory_make failure\n" );
-                exit(1);
-            }
-            g_object_set(playbin, "video-sink", videoSink, nullptr);
-            g_signal_connect(playbin, "deep-notify::source",
-                G_CALLBACK(found_video_source_cb), this);
-        } else {
-            GstElement* audioSink = gst_element_factory_make("rialtomseaudiosink", "audio-sink");
-            if( !audioSink )
-            {
-                printf( "audioSink gst_element_factory_make failure\n" );
-                exit(1);
-            }
-            g_object_set(playbin, "audio-sink", audioSink, nullptr);
-            g_signal_connect(playbin, "deep-notify::source",
-                G_CALLBACK(found_audio_source_cb), this);
-        }
-    }
-}
+    if (!state.outstanding || state.requestId != requestId)
+    {
+        printf("addSegment rejected (src=%" PRId32 ", req=%" PRIu32 ")\n",
+               sourceId, requestId);
+        return AddSegmentStatus::ERROR;
+    }
+    GstBuffer *buf = gst_buffer_new_wrapped(
+        const_cast<uint8_t *>(segment->getData()),
+        segment->getDataLength());
 
-GstMediaPipeline::~GstMediaPipeline()
-{
-    printf( "destructing GstMediaPipeline\n" );
+    GST_BUFFER_PTS(buf) =
+        static_cast<GstClockTime>(segment->getTimeStamp());
+    GST_BUFFER_DURATION(buf) =
+        static_cast<GstClockTime>(segment->getDuration());
+
+    gst_app_src_push_buffer(
+        GST_APP_SRC(track[sourceId].appsrc), buf);
+
+    return AddSegmentStatus::OK;
 }