From 50a1af0af73407dae7884f87a771d60e528b8502 Mon Sep 17 00:00:00 2001 From: Barzan Hayati Date: Tue, 9 Sep 2025 13:26:22 +0000 Subject: [PATCH] Integrate msgconverter video renderer via tee --- CMakeLists.txt | 3 +- data/nvmsgboker_configs/msgbroker_config.txt | 2 +- src/nv_message_converter.cpp | 17 +- src/pipeline_manager.cpp | 222 ++++++++++++++++--- src/pipeline_manager.hpp | 3 + src/queue_manager.cpp | 1 - src/queue_manager.hpp | 1 - src/sink_manager.cpp | 7 + src/sink_manager.hpp | 3 +- src/tee_manager.cpp | 52 +++++ src/tee_manager.hpp | 25 +++ 11 files changed, 287 insertions(+), 49 deletions(-) create mode 100644 src/tee_manager.cpp create mode 100644 src/tee_manager.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 2427c8a..608a6ce 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -83,6 +83,7 @@ include_directories(${PROJECT_SOURCE_DIR}/face_nv_infer_server_manager.hpp) include_directories(${PROJECT_SOURCE_DIR}/face_nv_infer_server_manager.hpp) include_directories(${PROJECT_SOURCE_DIR}/nv_message_converter.hpp) include_directories(${PROJECT_SOURCE_DIR}/nv_message_broker.hpp) +include_directories(${PROJECT_SOURCE_DIR}/tee_manager.hpp) set(SOURCES src/main.cpp src/camera_manager.cpp src/pipeline_manager.cpp src/streammux_manager.cpp @@ -94,7 +95,7 @@ set(SOURCES src/main.cpp src/camera_manager.cpp src/pipeline_manager.cpp src/st src/config_manager.cpp src/nv_infer_server_manager.cpp src/nv_tracker_manager.cpp src/face_candid_trace.cpp src/face_nv_infer_server_manager.cpp src/face_nv_infer_server_manager.cpp - src/nv_message_converter.cpp src/nv_message_broker.cpp) + src/nv_message_converter.cpp src/nv_message_broker.cpp src/tee_manager.cpp) # missing initializer for member 'NvDsInferDims::d' [-Werror=missing-field-initializers] NvDsInferDims dims = {0}; diff --git a/data/nvmsgboker_configs/msgbroker_config.txt b/data/nvmsgboker_configs/msgbroker_config.txt index 45bede2..585d492 100644 --- a/data/nvmsgboker_configs/msgbroker_config.txt +++ b/data/nvmsgboker_configs/msgbroker_config.txt @@ -1,6 +1,6 @@ [message-broker] # Redis-specific options -hostname=192.168.130.13 +hostname=localhost port=6379 streamsize=10000 payloadkey=metadata diff --git a/src/nv_message_converter.cpp b/src/nv_message_converter.cpp index 1b0313a..53af8f4 100644 --- a/src/nv_message_converter.cpp +++ b/src/nv_message_converter.cpp @@ -11,9 +11,10 @@ NvMessageConverter::NvMessageConverter() { bool NvMessageConverter::create_message_converter() { msgconv = gst_element_factory_make("nvmsgconv", "nvmsg-converter"); - g_object_set(G_OBJECT(msgconv), "msg2p-lib", payload_generation_library, + g_object_set(G_OBJECT(msgconv), "msg2p-lib", + payload_generation_library.c_str(), NULL); + g_object_set(G_OBJECT(msgconv), "config", msgconv_config_file.c_str(), NULL); - g_object_set(G_OBJECT(msgconv), "config", msgconv_config_file, NULL); g_object_set(G_OBJECT(msgconv), "payload-type", 0, NULL); // 0 = DeepStream schema, 1 = minimal schema g_object_set(G_OBJECT(msgconv), "msg2p-newapi", 0, @@ -32,18 +33,6 @@ bool NvMessageConverter::create_message_converter() { // "msgconv")); msg2p_meta = ds_test4_parse_meta_type(argv[1], "msgconv"); // g_print("msg2p_meta = %d\n", msg2p_meta); - // Set up the pipeline we add all elements into the pipeline - // gst_bin_add_many (GST_BIN (pipeline), - // source, h264parser, decoder, nvstreammux, pgie, - // nvvidconv, nvosd, tee, queue1, queue2, msgconv, msgbroker, sink, - // NULL); - - // /* we link the elements together */ - // /* file-source -> h264-parser -> nvh264-decoder -> nvstreammux -> - // * pgie -> nvvidconv -> nvosd -> tee -> video-renderer - // * | - // * |-> msgconv -> msgbroker */ - if (!msgconv) { g_printerr("Unable to create msgconv.Exiting."); return false; diff --git a/src/pipeline_manager.cpp b/src/pipeline_manager.cpp index bcef512..c5359b3 100644 --- a/src/pipeline_manager.cpp +++ b/src/pipeline_manager.cpp @@ -236,6 +236,34 @@ bool PipelineManager::check_playing_pipeline() { } } +bool PipelineManager::connect_tee_to_queue() { + tee_manager->create_queue_pads(); + tee_manager->create_tee_pads(); + + // GstCaps *src_caps = gst_pad_query_caps(tee_manager->tee_msg_pad, NULL); + // GstCaps *sink_caps = gst_pad_query_caps(tee_manager->sink_pad1, NULL); + // g_print("tee src caps: %s\n", gst_caps_to_string(src_caps)); + // g_print("queue1 sink caps: %s\n", gst_caps_to_string(sink_caps)); + + if (gst_pad_link(tee_manager->tee_msg_pad, tee_manager->sink_pad1) != + GST_PAD_LINK_OK) { + g_printerr("Unable to link tee and message converter\n"); + gst_object_unref(tee_manager->sink_pad1); + return false; + } + gst_object_unref(tee_manager->sink_pad1); + + if (gst_pad_link(tee_manager->tee_render_pad, tee_manager->sink_pad2) != + GST_PAD_LINK_OK) { + g_printerr("Unable to link tee and render\n"); + gst_object_unref(tee_manager->sink_pad2); + return false; + } + + gst_object_unref(tee_manager->sink_pad2); + return true; +} + bool PipelineManager::setup_pipeline() { /* Set up the pipeline */ /* add all elements into the pipeline */ @@ -244,18 +272,33 @@ bool PipelineManager::setup_pipeline() { // is dsexample pluging if (dynamic_add_remove == false) { if (sink_manager->display_output < 3) { - gst_bin_add_many(GST_BIN(pipeline), - nv_infer_server_manager->primary_detector, - nv_tracker_manager->tracker, - face_nv_infer_server_manager->face_detector, - // gstds_example_manager->custom_plugin, - tiler_manager->tiler, queue_array[2].queue, - nv_video_convert_manager->nvvidconv, - nv_osd_manager->nvosd, sink_manager->sink, NULL); + gst_bin_add_many( + GST_BIN(pipeline), nv_infer_server_manager->primary_detector, + nv_tracker_manager->tracker, + face_nv_infer_server_manager->face_detector, + // gstds_example_manager->custom_plugin, + tiler_manager->tiler, queue_array[2].queue, + nv_video_convert_manager->nvvidconv, nv_osd_manager->nvosd, + tee_manager->tee, tee_manager->queue1, tee_manager->queue2, + nv_messgae_converter_manager->msgconv, + // nv_messgae_broker_manager->msgbroker, + sink_manager->fake_sink, sink_manager->sink, NULL); /* we link the elements together * nvstreammux -> nvinfer -> nvtiler -> nvvidconv -> nvosd -> * video-renderer */ + // if (!gst_element_link_many( + // streammux_manager->streammux, + // nv_video_convert_manager->nvvidconv, + // nv_infer_server_manager->primary_detector, + // nv_tracker_manager->tracker, + // face_nv_infer_server_manager->face_detector, + // // gstds_example_manager->custom_plugin, + // tiler_manager->tiler, nv_osd_manager->nvosd, + // sink_manager->sink, NULL)) { + // g_printerr("Elements could not be linked.\n"); + // return false; + // } if (!gst_element_link_many( streammux_manager->streammux, nv_video_convert_manager->nvvidconv, @@ -264,8 +307,22 @@ bool PipelineManager::setup_pipeline() { face_nv_infer_server_manager->face_detector, // gstds_example_manager->custom_plugin, tiler_manager->tiler, nv_osd_manager->nvosd, - sink_manager->sink, NULL)) { - g_printerr("Elements could not be linked.\n"); + tee_manager->tee, NULL)) { + g_printerr("Could not link source into tee!.\n"); + return false; + } + if (!gst_element_link_many(tee_manager->queue1, + nv_messgae_converter_manager->msgconv, + // nv_messgae_broker_manager->msgbroker, + sink_manager->fake_sink, NULL)) { + g_printerr( + "Could not link tee with message converter! Exiting.\n"); + return false; + } + if (!gst_element_link_many(tee_manager->queue2, sink_manager->sink, + NULL)) { + g_printerr( + "Could not link tee with video renderer! Exiting.\n"); return false; } } else { @@ -276,14 +333,31 @@ bool PipelineManager::setup_pipeline() { // gstds_example_manager->custom_plugin, tiler_manager->tiler, queue_array[2].queue, nv_video_convert_manager->nvvidconv, nv_osd_manager->nvosd, + tee_manager->tee, tee_manager->queue1, tee_manager->queue2, + nv_messgae_converter_manager->msgconv, + // nv_messgae_broker_manager->msgbroker, sink_manager->nvvidconv_postosd, sink_manager->caps, sink_manager->encoder, sink_manager->rtppay, sink_manager->sink, - NULL); + sink_manager->fake_sink, NULL); // Link the elements together: // file-source -> h264-parser -> nvh264-decoder -> // nvinfer -> nvvidconv -> nvosd -> nvvidconv_postosd -> // caps -> encoder -> rtppay -> udpsink + // if (!gst_element_link_many( + // streammux_manager->streammux, + // nv_video_convert_manager->nvvidconv, + // nv_infer_server_manager->primary_detector, + // nv_tracker_manager->tracker, + // face_nv_infer_server_manager->face_detector, + // // gstds_example_manager->custom_plugin, + // tiler_manager->tiler, nv_osd_manager->nvosd, + // sink_manager->nvvidconv_postosd, sink_manager->caps, + // sink_manager->encoder, sink_manager->rtppay, + // sink_manager->sink, NULL)) { + // g_printerr("Elements could not be linked.\n"); + // return false; + // } if (!gst_element_link_many( streammux_manager->streammux, nv_video_convert_manager->nvvidconv, @@ -292,27 +366,56 @@ bool PipelineManager::setup_pipeline() { face_nv_infer_server_manager->face_detector, // gstds_example_manager->custom_plugin, tiler_manager->tiler, nv_osd_manager->nvosd, - sink_manager->nvvidconv_postosd, sink_manager->caps, - sink_manager->encoder, sink_manager->rtppay, - sink_manager->sink, NULL)) { - g_printerr("Elements could not be linked.\n"); + tee_manager->tee, NULL)) { + g_printerr("Could not link source into tee!.\n"); + return false; + } + if (!gst_element_link_many(tee_manager->queue1, + nv_messgae_converter_manager->msgconv, + // nv_messgae_broker_manager->msgbroker, + sink_manager->fake_sink, NULL)) { + g_printerr( + "Could not link tee with message converter! Exiting.\n"); + return false; + } + if (!gst_element_link_many( + tee_manager->queue2, sink_manager->nvvidconv_postosd, + sink_manager->caps, sink_manager->encoder, + sink_manager->rtppay, sink_manager->sink, NULL)) { + g_printerr( + "Could not link tee with video renderer! Exiting.\n"); return false; } } } else { if (sink_manager->display_output < 3) { - gst_bin_add_many(GST_BIN(pipeline), - nv_infer_server_manager->primary_detector, - nv_tracker_manager->tracker, - face_nv_infer_server_manager->face_detector, - // gstds_example_manager->custom_plugin, - tiler_manager->tiler, queue_array[2].queue, - nv_video_convert_manager->nvvidconv, - nv_osd_manager->nvosd, sink_manager->sink, NULL); + gst_bin_add_many( + GST_BIN(pipeline), nv_infer_server_manager->primary_detector, + nv_tracker_manager->tracker, + face_nv_infer_server_manager->face_detector, + // gstds_example_manager->custom_plugin, + tiler_manager->tiler, queue_array[2].queue, + nv_video_convert_manager->nvvidconv, nv_osd_manager->nvosd, + tee_manager->tee, tee_manager->queue1, tee_manager->queue2, + nv_messgae_converter_manager->msgconv, + // nv_messgae_broker_manager->msgbroker, + sink_manager->fake_sink, sink_manager->sink, NULL); /* we link the elements together * nvstreammux -> nvinfer -> nvtiler -> nvvidconv -> nvosd -> * video-renderer */ + // if (!gst_element_link_many( // streammux_manager->streammux, + // SourceBin::nvmultiurisrcbin, + // // nv_video_convert_manager->nvvidconv, + // nv_infer_server_manager->primary_detector, + // nv_tracker_manager->tracker, + // face_nv_infer_server_manager->face_detector, + // // gstds_example_manager->custom_plugin, + // tiler_manager->tiler, nv_osd_manager->nvosd, + // sink_manager->sink, NULL)) { + // g_printerr("Elements could not be linked.\n"); + // return false; + // } if (!gst_element_link_many( // streammux_manager->streammux, SourceBin::nvmultiurisrcbin, // nv_video_convert_manager->nvvidconv, @@ -321,8 +424,23 @@ bool PipelineManager::setup_pipeline() { face_nv_infer_server_manager->face_detector, // gstds_example_manager->custom_plugin, tiler_manager->tiler, nv_osd_manager->nvosd, - sink_manager->sink, NULL)) { - g_printerr("Elements could not be linked.\n"); + tee_manager->tee, NULL)) { + g_printerr("Could not link source into tee!.\n"); + return false; + } + + if (!gst_element_link_many(tee_manager->queue1, + nv_messgae_converter_manager->msgconv, + // nv_messgae_broker_manager->msgbroker, + sink_manager->fake_sink, NULL)) { + g_printerr( + "Could not link tee with message converter! Exiting.\n"); + return false; + } + if (!gst_element_link_many(tee_manager->queue2, sink_manager->sink, + NULL)) { + g_printerr( + "Could not link tee with video renderer! Exiting.\n"); return false; } } else { @@ -333,14 +451,31 @@ bool PipelineManager::setup_pipeline() { // gstds_example_manager->custom_plugin, tiler_manager->tiler, queue_array[2].queue, nv_video_convert_manager->nvvidconv, nv_osd_manager->nvosd, + tee_manager->tee, tee_manager->queue1, tee_manager->queue2, + nv_messgae_converter_manager->msgconv, + // nv_messgae_broker_manager->msgbroker, sink_manager->nvvidconv_postosd, sink_manager->caps, sink_manager->encoder, sink_manager->rtppay, sink_manager->sink, - NULL); + sink_manager->fake_sink, NULL); // Link the elements together: // file-source -> h264-parser -> nvh264-decoder -> // nvinfer -> nvvidconv -> nvosd -> nvvidconv_postosd -> // caps -> encoder -> rtppay -> udpsink + // if (!gst_element_link_many( // streammux_manager->streammux, + // SourceBin::nvmultiurisrcbin, + // // nv_video_convert_manager->nvvidconv, + // nv_infer_server_manager->primary_detector, + // nv_tracker_manager->tracker, + // face_nv_infer_server_manager->face_detector, + // // gstds_example_manager->custom_plugin, + // tiler_manager->tiler, nv_osd_manager->nvosd, + // sink_manager->nvvidconv_postosd, sink_manager->caps, + // sink_manager->encoder, sink_manager->rtppay, + // sink_manager->sink, NULL)) { + // g_printerr("Elements could not be linked.\n"); + // return false; + // } if (!gst_element_link_many( // streammux_manager->streammux, SourceBin::nvmultiurisrcbin, // nv_video_convert_manager->nvvidconv, @@ -349,14 +484,31 @@ bool PipelineManager::setup_pipeline() { face_nv_infer_server_manager->face_detector, // gstds_example_manager->custom_plugin, tiler_manager->tiler, nv_osd_manager->nvosd, - sink_manager->nvvidconv_postosd, sink_manager->caps, - sink_manager->encoder, sink_manager->rtppay, - sink_manager->sink, NULL)) { - g_printerr("Elements could not be linked.\n"); + tee_manager->tee, NULL)) { + g_printerr("Could not link source into tee!.\n"); + return false; + } + if (!gst_element_link_many(tee_manager->queue1, + nv_messgae_converter_manager->msgconv, + // nv_messgae_broker_manager->msgbroker, + sink_manager->fake_sink, NULL)) { + g_printerr( + "Could not link tee with message converter! Exiting.\n"); + return false; + } + if (!gst_element_link_many( + tee_manager->queue2, sink_manager->nvvidconv_postosd, + sink_manager->caps, sink_manager->encoder, + sink_manager->rtppay, sink_manager->sink, NULL)) { + g_printerr( + "Could not link tee with video renderer! Exiting.\n"); return false; } } } + if (!connect_tee_to_queue()) { + return false; + } return true; } @@ -436,6 +588,7 @@ bool PipelineManager::create_pipeline_elements(int num_sources, nv_messgae_converter_manager->create_message_converter(); nv_messgae_broker_manager->create_message_broker(); + tee_manager->create_tee(); /* Add queue elements between every two elements */ const char* base = "queue"; @@ -447,6 +600,7 @@ bool PipelineManager::create_pipeline_elements(int num_sources, nv_ds_logger_manager->create_nv_ds_logger(); sink_manager->create_sink(prop, rtsp_streaming_manager->host, rtsp_streaming_manager->updsink_port_num); + sink_manager->create_fake_sink(); nv_infer_server_manager->create_nv_infer_server(num_sources); // GstElement *nvinfer = gst_bin_get_by_name(GST_BIN(pipeline), @@ -527,6 +681,14 @@ bool PipelineManager::create_pipeline_elements(int num_sources, << "us" << std::endl; /* Out of the main loop, clean up nicely */ g_print("Returned, stopping playback \n"); + + /* Release the request pads from the tee, and unref them */ + gst_element_release_request_pad(tee_manager->tee, tee_manager->tee_msg_pad); + gst_element_release_request_pad(tee_manager->tee, + tee_manager->tee_render_pad); + gst_object_unref(tee_manager->tee_msg_pad); + gst_object_unref(tee_manager->tee_render_pad); + gst_element_set_state(pipeline, GST_STATE_NULL); g_print("Deleting pipeline \n"); gst_object_unref(GST_OBJECT(pipeline)); diff --git a/src/pipeline_manager.hpp b/src/pipeline_manager.hpp index 070514d..9689739 100644 --- a/src/pipeline_manager.hpp +++ b/src/pipeline_manager.hpp @@ -21,6 +21,7 @@ #include "sink_manager.hpp" #include "source_bin.hpp" #include "streammux_manager.hpp" +#include "tee_manager.hpp" #include "tiler_manager.hpp" class PipelineManager { @@ -45,6 +46,7 @@ class PipelineManager { new FaceNvInferServerManager(); NvMessageConverter *nv_messgae_converter_manager = new NvMessageConverter(); NvMessageBroker *nv_messgae_broker_manager = new NvMessageBroker(); + TeeManager *tee_manager = new TeeManager(); static double fps_buffer_probe; static double fps_probe; static double fps_osd; @@ -66,6 +68,7 @@ class PipelineManager { PipelineManager(int, char **); int create_pipeline(); bool create_pipeline_elements(int, char **); + bool connect_tee_to_queue(); bool setup_pipeline(); bool playing_pipeline(int, char **); bool status_playing; diff --git a/src/queue_manager.cpp b/src/queue_manager.cpp index daf1127..c1ea5fe 100644 --- a/src/queue_manager.cpp +++ b/src/queue_manager.cpp @@ -3,7 +3,6 @@ QueueManager::QueueManager() {} QueueManager::QueueManager(char* queue_name) { - name = queue_name; queue = gst_element_factory_make("queue", queue_name); } diff --git a/src/queue_manager.hpp b/src/queue_manager.hpp index 8ceead0..9956228 100644 --- a/src/queue_manager.hpp +++ b/src/queue_manager.hpp @@ -4,7 +4,6 @@ class QueueManager { private: public: GstElement* queue = NULL; - char* name; QueueManager(); QueueManager(char*); ~QueueManager(); diff --git a/src/sink_manager.cpp b/src/sink_manager.cpp index ad62e61..1385dae 100644 --- a/src/sink_manager.cpp +++ b/src/sink_manager.cpp @@ -11,6 +11,13 @@ SinkManager::SinkManager() { config.at("codec_rtsp_out").get_to(codec_rtsp_out); } +void SinkManager::create_fake_sink() { + fake_sink = gst_element_factory_make("fakesink", + "fakesink-converter-broker-branch"); + g_object_set(G_OBJECT(fake_sink), "qos", 0, "sync", FALSE, + NULL); //"name", "TEST", +} + bool SinkManager::create_sink(cudaDeviceProp prop, std::string host, guint updsink_port_num) { if (display_output == 0) { diff --git a/src/sink_manager.hpp b/src/sink_manager.hpp index aa1cb35..2d203b7 100644 --- a/src/sink_manager.hpp +++ b/src/sink_manager.hpp @@ -12,10 +12,11 @@ class SinkManager { public: GstElement *sink = NULL, *nvvidconv_postosd = NULL, *caps = NULL, - *encoder = NULL, *rtppay = NULL; + *encoder = NULL, *rtppay = NULL, *fake_sink = NULL; std::string output_sink, output_video_path; int display_output = 1, bitrate; SinkManager(); bool create_sink(cudaDeviceProp prop, std::string, guint); + void create_fake_sink(); ~SinkManager(); }; \ No newline at end of file diff --git a/src/tee_manager.cpp b/src/tee_manager.cpp new file mode 100644 index 0000000..a71facd --- /dev/null +++ b/src/tee_manager.cpp @@ -0,0 +1,52 @@ +#include "tee_manager.hpp" + +TeeManager::TeeManager() {} + +// Definition of static function +bool TeeManager::create_tee() { + /* Create tee to render buffer and send message simultaneously */ + tee = gst_element_factory_make("tee", "nvsink-tee"); + if (!tee) { + g_printerr("tee could not be created. Exiting.\n"); + return false; + } + + /* Create queues */ + queue1 = gst_element_factory_make("queue", "msg-queue"); + queue2 = gst_element_factory_make("queue", "video-render-queue"); + if (!queue1) { + g_printerr("queue1 could not be created. Exiting.\n"); + return false; + } + if (!queue2) { + g_printerr("queue2 could not be created. Exiting.\n"); + return false; + } + + return true; +} + +bool TeeManager::create_queue_pads() { + sink_pad1 = gst_element_get_static_pad(queue1, "sink"); + sink_pad2 = gst_element_get_static_pad(queue2, "sink"); + if (!sink_pad1 || !sink_pad2) { + g_printerr("Unable to get request pads\n"); + return false; + } + return true; +} + +bool TeeManager::create_tee_pads() { + tee_msg_pad = gst_element_request_pad_simple(tee, "src_%u"); + tee_render_pad = gst_element_request_pad_simple(tee, "src_%u"); + // Request pads: pads that do not exist until you ask for them. + // Some elements (like tee, nvstreammux dynamic sources, nvmsgconv) allow + // multiple outputs, but don’t create all the pads in advance. You ask the + // element: “Give me a new pad to connect to something. Return: a new + // GstPad* you can link to your downstream element. + if (!tee_msg_pad || !tee_render_pad) { + g_printerr("Unable to get request pads\n"); + return false; + } + return true; +} \ No newline at end of file diff --git a/src/tee_manager.hpp b/src/tee_manager.hpp new file mode 100644 index 0000000..5677e10 --- /dev/null +++ b/src/tee_manager.hpp @@ -0,0 +1,25 @@ +// #ifndef MYCLASS_H +// #define MYCLASS_H +#include +#include + +#include +#include + +// #include "queue_manager.hpp" + +class TeeManager { + public: + GstElement *tee = NULL, *queue1 = NULL, *queue2 = NULL; + GstPad *tee_render_pad = NULL, *tee_msg_pad = NULL, *sink_pad1 = NULL, + *src_pad = NULL, *sink_pad2 = NULL; + TeeManager(); + bool create_tee(); + ~TeeManager(); + bool create_queue_pads(); + bool create_tee_pads(); + + private: +}; + +// #endif // MYCLASS_H \ No newline at end of file