Integrate msgconverter video renderer via tee

This commit is contained in:
Barzan Hayati 2025-09-09 13:26:22 +00:00
parent 0a4f85859d
commit 50a1af0af7
11 changed files with 287 additions and 49 deletions

View File

@ -83,6 +83,7 @@ include_directories(${PROJECT_SOURCE_DIR}/face_nv_infer_server_manager.hpp)
include_directories(${PROJECT_SOURCE_DIR}/face_nv_infer_server_manager.hpp)
include_directories(${PROJECT_SOURCE_DIR}/nv_message_converter.hpp)
include_directories(${PROJECT_SOURCE_DIR}/nv_message_broker.hpp)
include_directories(${PROJECT_SOURCE_DIR}/tee_manager.hpp)
set(SOURCES src/main.cpp src/camera_manager.cpp src/pipeline_manager.cpp src/streammux_manager.cpp
@ -94,7 +95,7 @@ set(SOURCES src/main.cpp src/camera_manager.cpp src/pipeline_manager.cpp src/st
src/config_manager.cpp
src/nv_infer_server_manager.cpp src/nv_tracker_manager.cpp src/face_candid_trace.cpp
src/face_nv_infer_server_manager.cpp src/face_nv_infer_server_manager.cpp
src/nv_message_converter.cpp src/nv_message_broker.cpp)
src/nv_message_converter.cpp src/nv_message_broker.cpp src/tee_manager.cpp)
# missing initializer for member 'NvDsInferDims::d' [-Werror=missing-field-initializers] NvDsInferDims dims = {0};

View File

@ -1,6 +1,6 @@
[message-broker]
# Redis-specific options
hostname=192.168.130.13
hostname=localhost
port=6379
streamsize=10000
payloadkey=metadata

View File

@ -11,9 +11,10 @@ NvMessageConverter::NvMessageConverter() {
bool NvMessageConverter::create_message_converter() {
msgconv = gst_element_factory_make("nvmsgconv", "nvmsg-converter");
g_object_set(G_OBJECT(msgconv), "msg2p-lib", payload_generation_library,
g_object_set(G_OBJECT(msgconv), "msg2p-lib",
payload_generation_library.c_str(), NULL);
g_object_set(G_OBJECT(msgconv), "config", msgconv_config_file.c_str(),
NULL);
g_object_set(G_OBJECT(msgconv), "config", msgconv_config_file, NULL);
g_object_set(G_OBJECT(msgconv), "payload-type", 0,
NULL); // 0 = DeepStream schema, 1 = minimal schema
g_object_set(G_OBJECT(msgconv), "msg2p-newapi", 0,
@ -32,18 +33,6 @@ bool NvMessageConverter::create_message_converter() {
// "msgconv")); msg2p_meta = ds_test4_parse_meta_type(argv[1], "msgconv");
// g_print("msg2p_meta = %d\n", msg2p_meta);
// Set up the pipeline we add all elements into the pipeline
// gst_bin_add_many (GST_BIN (pipeline),
// source, h264parser, decoder, nvstreammux, pgie,
// nvvidconv, nvosd, tee, queue1, queue2, msgconv, msgbroker, sink,
// NULL);
// /* we link the elements together */
// /* file-source -> h264-parser -> nvh264-decoder -> nvstreammux ->
// * pgie -> nvvidconv -> nvosd -> tee -> video-renderer
// * |
// * |-> msgconv -> msgbroker */
if (!msgconv) {
g_printerr("Unable to create msgconv.Exiting.");
return false;

View File

@ -236,6 +236,34 @@ bool PipelineManager::check_playing_pipeline() {
}
}
bool PipelineManager::connect_tee_to_queue() {
tee_manager->create_queue_pads();
tee_manager->create_tee_pads();
// GstCaps *src_caps = gst_pad_query_caps(tee_manager->tee_msg_pad, NULL);
// GstCaps *sink_caps = gst_pad_query_caps(tee_manager->sink_pad1, NULL);
// g_print("tee src caps: %s\n", gst_caps_to_string(src_caps));
// g_print("queue1 sink caps: %s\n", gst_caps_to_string(sink_caps));
if (gst_pad_link(tee_manager->tee_msg_pad, tee_manager->sink_pad1) !=
GST_PAD_LINK_OK) {
g_printerr("Unable to link tee and message converter\n");
gst_object_unref(tee_manager->sink_pad1);
return false;
}
gst_object_unref(tee_manager->sink_pad1);
if (gst_pad_link(tee_manager->tee_render_pad, tee_manager->sink_pad2) !=
GST_PAD_LINK_OK) {
g_printerr("Unable to link tee and render\n");
gst_object_unref(tee_manager->sink_pad2);
return false;
}
gst_object_unref(tee_manager->sink_pad2);
return true;
}
bool PipelineManager::setup_pipeline() {
/* Set up the pipeline */
/* add all elements into the pipeline */
@ -244,18 +272,33 @@ bool PipelineManager::setup_pipeline() {
// is dsexample pluging
if (dynamic_add_remove == false) {
if (sink_manager->display_output < 3) {
gst_bin_add_many(GST_BIN(pipeline),
nv_infer_server_manager->primary_detector,
nv_tracker_manager->tracker,
face_nv_infer_server_manager->face_detector,
// gstds_example_manager->custom_plugin,
tiler_manager->tiler, queue_array[2].queue,
nv_video_convert_manager->nvvidconv,
nv_osd_manager->nvosd, sink_manager->sink, NULL);
gst_bin_add_many(
GST_BIN(pipeline), nv_infer_server_manager->primary_detector,
nv_tracker_manager->tracker,
face_nv_infer_server_manager->face_detector,
// gstds_example_manager->custom_plugin,
tiler_manager->tiler, queue_array[2].queue,
nv_video_convert_manager->nvvidconv, nv_osd_manager->nvosd,
tee_manager->tee, tee_manager->queue1, tee_manager->queue2,
nv_messgae_converter_manager->msgconv,
// nv_messgae_broker_manager->msgbroker,
sink_manager->fake_sink, sink_manager->sink, NULL);
/* we link the elements together
* nvstreammux -> nvinfer -> nvtiler -> nvvidconv -> nvosd ->
* video-renderer */
// if (!gst_element_link_many(
// streammux_manager->streammux,
// nv_video_convert_manager->nvvidconv,
// nv_infer_server_manager->primary_detector,
// nv_tracker_manager->tracker,
// face_nv_infer_server_manager->face_detector,
// // gstds_example_manager->custom_plugin,
// tiler_manager->tiler, nv_osd_manager->nvosd,
// sink_manager->sink, NULL)) {
// g_printerr("Elements could not be linked.\n");
// return false;
// }
if (!gst_element_link_many(
streammux_manager->streammux,
nv_video_convert_manager->nvvidconv,
@ -264,8 +307,22 @@ bool PipelineManager::setup_pipeline() {
face_nv_infer_server_manager->face_detector,
// gstds_example_manager->custom_plugin,
tiler_manager->tiler, nv_osd_manager->nvosd,
sink_manager->sink, NULL)) {
g_printerr("Elements could not be linked.\n");
tee_manager->tee, NULL)) {
g_printerr("Could not link source into tee!.\n");
return false;
}
if (!gst_element_link_many(tee_manager->queue1,
nv_messgae_converter_manager->msgconv,
// nv_messgae_broker_manager->msgbroker,
sink_manager->fake_sink, NULL)) {
g_printerr(
"Could not link tee with message converter! Exiting.\n");
return false;
}
if (!gst_element_link_many(tee_manager->queue2, sink_manager->sink,
NULL)) {
g_printerr(
"Could not link tee with video renderer! Exiting.\n");
return false;
}
} else {
@ -276,14 +333,31 @@ bool PipelineManager::setup_pipeline() {
// gstds_example_manager->custom_plugin,
tiler_manager->tiler, queue_array[2].queue,
nv_video_convert_manager->nvvidconv, nv_osd_manager->nvosd,
tee_manager->tee, tee_manager->queue1, tee_manager->queue2,
nv_messgae_converter_manager->msgconv,
// nv_messgae_broker_manager->msgbroker,
sink_manager->nvvidconv_postosd, sink_manager->caps,
sink_manager->encoder, sink_manager->rtppay, sink_manager->sink,
NULL);
sink_manager->fake_sink, NULL);
// Link the elements together:
// file-source -> h264-parser -> nvh264-decoder ->
// nvinfer -> nvvidconv -> nvosd -> nvvidconv_postosd ->
// caps -> encoder -> rtppay -> udpsink
// if (!gst_element_link_many(
// streammux_manager->streammux,
// nv_video_convert_manager->nvvidconv,
// nv_infer_server_manager->primary_detector,
// nv_tracker_manager->tracker,
// face_nv_infer_server_manager->face_detector,
// // gstds_example_manager->custom_plugin,
// tiler_manager->tiler, nv_osd_manager->nvosd,
// sink_manager->nvvidconv_postosd, sink_manager->caps,
// sink_manager->encoder, sink_manager->rtppay,
// sink_manager->sink, NULL)) {
// g_printerr("Elements could not be linked.\n");
// return false;
// }
if (!gst_element_link_many(
streammux_manager->streammux,
nv_video_convert_manager->nvvidconv,
@ -292,27 +366,56 @@ bool PipelineManager::setup_pipeline() {
face_nv_infer_server_manager->face_detector,
// gstds_example_manager->custom_plugin,
tiler_manager->tiler, nv_osd_manager->nvosd,
sink_manager->nvvidconv_postosd, sink_manager->caps,
sink_manager->encoder, sink_manager->rtppay,
sink_manager->sink, NULL)) {
g_printerr("Elements could not be linked.\n");
tee_manager->tee, NULL)) {
g_printerr("Could not link source into tee!.\n");
return false;
}
if (!gst_element_link_many(tee_manager->queue1,
nv_messgae_converter_manager->msgconv,
// nv_messgae_broker_manager->msgbroker,
sink_manager->fake_sink, NULL)) {
g_printerr(
"Could not link tee with message converter! Exiting.\n");
return false;
}
if (!gst_element_link_many(
tee_manager->queue2, sink_manager->nvvidconv_postosd,
sink_manager->caps, sink_manager->encoder,
sink_manager->rtppay, sink_manager->sink, NULL)) {
g_printerr(
"Could not link tee with video renderer! Exiting.\n");
return false;
}
}
} else {
if (sink_manager->display_output < 3) {
gst_bin_add_many(GST_BIN(pipeline),
nv_infer_server_manager->primary_detector,
nv_tracker_manager->tracker,
face_nv_infer_server_manager->face_detector,
// gstds_example_manager->custom_plugin,
tiler_manager->tiler, queue_array[2].queue,
nv_video_convert_manager->nvvidconv,
nv_osd_manager->nvosd, sink_manager->sink, NULL);
gst_bin_add_many(
GST_BIN(pipeline), nv_infer_server_manager->primary_detector,
nv_tracker_manager->tracker,
face_nv_infer_server_manager->face_detector,
// gstds_example_manager->custom_plugin,
tiler_manager->tiler, queue_array[2].queue,
nv_video_convert_manager->nvvidconv, nv_osd_manager->nvosd,
tee_manager->tee, tee_manager->queue1, tee_manager->queue2,
nv_messgae_converter_manager->msgconv,
// nv_messgae_broker_manager->msgbroker,
sink_manager->fake_sink, sink_manager->sink, NULL);
/* we link the elements together
* nvstreammux -> nvinfer -> nvtiler -> nvvidconv -> nvosd ->
* video-renderer */
// if (!gst_element_link_many( // streammux_manager->streammux,
// SourceBin::nvmultiurisrcbin,
// // nv_video_convert_manager->nvvidconv,
// nv_infer_server_manager->primary_detector,
// nv_tracker_manager->tracker,
// face_nv_infer_server_manager->face_detector,
// // gstds_example_manager->custom_plugin,
// tiler_manager->tiler, nv_osd_manager->nvosd,
// sink_manager->sink, NULL)) {
// g_printerr("Elements could not be linked.\n");
// return false;
// }
if (!gst_element_link_many( // streammux_manager->streammux,
SourceBin::nvmultiurisrcbin,
// nv_video_convert_manager->nvvidconv,
@ -321,8 +424,23 @@ bool PipelineManager::setup_pipeline() {
face_nv_infer_server_manager->face_detector,
// gstds_example_manager->custom_plugin,
tiler_manager->tiler, nv_osd_manager->nvosd,
sink_manager->sink, NULL)) {
g_printerr("Elements could not be linked.\n");
tee_manager->tee, NULL)) {
g_printerr("Could not link source into tee!.\n");
return false;
}
if (!gst_element_link_many(tee_manager->queue1,
nv_messgae_converter_manager->msgconv,
// nv_messgae_broker_manager->msgbroker,
sink_manager->fake_sink, NULL)) {
g_printerr(
"Could not link tee with message converter! Exiting.\n");
return false;
}
if (!gst_element_link_many(tee_manager->queue2, sink_manager->sink,
NULL)) {
g_printerr(
"Could not link tee with video renderer! Exiting.\n");
return false;
}
} else {
@ -333,14 +451,31 @@ bool PipelineManager::setup_pipeline() {
// gstds_example_manager->custom_plugin,
tiler_manager->tiler, queue_array[2].queue,
nv_video_convert_manager->nvvidconv, nv_osd_manager->nvosd,
tee_manager->tee, tee_manager->queue1, tee_manager->queue2,
nv_messgae_converter_manager->msgconv,
// nv_messgae_broker_manager->msgbroker,
sink_manager->nvvidconv_postosd, sink_manager->caps,
sink_manager->encoder, sink_manager->rtppay, sink_manager->sink,
NULL);
sink_manager->fake_sink, NULL);
// Link the elements together:
// file-source -> h264-parser -> nvh264-decoder ->
// nvinfer -> nvvidconv -> nvosd -> nvvidconv_postosd ->
// caps -> encoder -> rtppay -> udpsink
// if (!gst_element_link_many( // streammux_manager->streammux,
// SourceBin::nvmultiurisrcbin,
// // nv_video_convert_manager->nvvidconv,
// nv_infer_server_manager->primary_detector,
// nv_tracker_manager->tracker,
// face_nv_infer_server_manager->face_detector,
// // gstds_example_manager->custom_plugin,
// tiler_manager->tiler, nv_osd_manager->nvosd,
// sink_manager->nvvidconv_postosd, sink_manager->caps,
// sink_manager->encoder, sink_manager->rtppay,
// sink_manager->sink, NULL)) {
// g_printerr("Elements could not be linked.\n");
// return false;
// }
if (!gst_element_link_many( // streammux_manager->streammux,
SourceBin::nvmultiurisrcbin,
// nv_video_convert_manager->nvvidconv,
@ -349,14 +484,31 @@ bool PipelineManager::setup_pipeline() {
face_nv_infer_server_manager->face_detector,
// gstds_example_manager->custom_plugin,
tiler_manager->tiler, nv_osd_manager->nvosd,
sink_manager->nvvidconv_postosd, sink_manager->caps,
sink_manager->encoder, sink_manager->rtppay,
sink_manager->sink, NULL)) {
g_printerr("Elements could not be linked.\n");
tee_manager->tee, NULL)) {
g_printerr("Could not link source into tee!.\n");
return false;
}
if (!gst_element_link_many(tee_manager->queue1,
nv_messgae_converter_manager->msgconv,
// nv_messgae_broker_manager->msgbroker,
sink_manager->fake_sink, NULL)) {
g_printerr(
"Could not link tee with message converter! Exiting.\n");
return false;
}
if (!gst_element_link_many(
tee_manager->queue2, sink_manager->nvvidconv_postosd,
sink_manager->caps, sink_manager->encoder,
sink_manager->rtppay, sink_manager->sink, NULL)) {
g_printerr(
"Could not link tee with video renderer! Exiting.\n");
return false;
}
}
}
if (!connect_tee_to_queue()) {
return false;
}
return true;
}
@ -436,6 +588,7 @@ bool PipelineManager::create_pipeline_elements(int num_sources,
nv_messgae_converter_manager->create_message_converter();
nv_messgae_broker_manager->create_message_broker();
tee_manager->create_tee();
/* Add queue elements between every two elements */
const char* base = "queue";
@ -447,6 +600,7 @@ bool PipelineManager::create_pipeline_elements(int num_sources,
nv_ds_logger_manager->create_nv_ds_logger();
sink_manager->create_sink(prop, rtsp_streaming_manager->host,
rtsp_streaming_manager->updsink_port_num);
sink_manager->create_fake_sink();
nv_infer_server_manager->create_nv_infer_server(num_sources);
// GstElement *nvinfer = gst_bin_get_by_name(GST_BIN(pipeline),
@ -527,6 +681,14 @@ bool PipelineManager::create_pipeline_elements(int num_sources,
<< "us" << std::endl;
/* Out of the main loop, clean up nicely */
g_print("Returned, stopping playback \n");
/* Release the request pads from the tee, and unref them */
gst_element_release_request_pad(tee_manager->tee, tee_manager->tee_msg_pad);
gst_element_release_request_pad(tee_manager->tee,
tee_manager->tee_render_pad);
gst_object_unref(tee_manager->tee_msg_pad);
gst_object_unref(tee_manager->tee_render_pad);
gst_element_set_state(pipeline, GST_STATE_NULL);
g_print("Deleting pipeline \n");
gst_object_unref(GST_OBJECT(pipeline));

View File

@ -21,6 +21,7 @@
#include "sink_manager.hpp"
#include "source_bin.hpp"
#include "streammux_manager.hpp"
#include "tee_manager.hpp"
#include "tiler_manager.hpp"
class PipelineManager {
@ -45,6 +46,7 @@ class PipelineManager {
new FaceNvInferServerManager();
NvMessageConverter *nv_messgae_converter_manager = new NvMessageConverter();
NvMessageBroker *nv_messgae_broker_manager = new NvMessageBroker();
TeeManager *tee_manager = new TeeManager();
static double fps_buffer_probe;
static double fps_probe;
static double fps_osd;
@ -66,6 +68,7 @@ class PipelineManager {
PipelineManager(int, char **);
int create_pipeline();
bool create_pipeline_elements(int, char **);
bool connect_tee_to_queue();
bool setup_pipeline();
bool playing_pipeline(int, char **);
bool status_playing;

View File

@ -3,7 +3,6 @@
QueueManager::QueueManager() {}
QueueManager::QueueManager(char* queue_name) {
name = queue_name;
queue = gst_element_factory_make("queue", queue_name);
}

View File

@ -4,7 +4,6 @@ class QueueManager {
private:
public:
GstElement* queue = NULL;
char* name;
QueueManager();
QueueManager(char*);
~QueueManager();

View File

@ -11,6 +11,13 @@ SinkManager::SinkManager() {
config.at("codec_rtsp_out").get_to(codec_rtsp_out);
}
void SinkManager::create_fake_sink() {
fake_sink = gst_element_factory_make("fakesink",
"fakesink-converter-broker-branch");
g_object_set(G_OBJECT(fake_sink), "qos", 0, "sync", FALSE,
NULL); //"name", "TEST",
}
bool SinkManager::create_sink(cudaDeviceProp prop, std::string host,
guint updsink_port_num) {
if (display_output == 0) {

View File

@ -12,10 +12,11 @@ class SinkManager {
public:
GstElement *sink = NULL, *nvvidconv_postosd = NULL, *caps = NULL,
*encoder = NULL, *rtppay = NULL;
*encoder = NULL, *rtppay = NULL, *fake_sink = NULL;
std::string output_sink, output_video_path;
int display_output = 1, bitrate;
SinkManager();
bool create_sink(cudaDeviceProp prop, std::string, guint);
void create_fake_sink();
~SinkManager();
};

52
src/tee_manager.cpp Normal file
View File

@ -0,0 +1,52 @@
#include "tee_manager.hpp"
TeeManager::TeeManager() {}
// Definition of static function
bool TeeManager::create_tee() {
/* Create tee to render buffer and send message simultaneously */
tee = gst_element_factory_make("tee", "nvsink-tee");
if (!tee) {
g_printerr("tee could not be created. Exiting.\n");
return false;
}
/* Create queues */
queue1 = gst_element_factory_make("queue", "msg-queue");
queue2 = gst_element_factory_make("queue", "video-render-queue");
if (!queue1) {
g_printerr("queue1 could not be created. Exiting.\n");
return false;
}
if (!queue2) {
g_printerr("queue2 could not be created. Exiting.\n");
return false;
}
return true;
}
bool TeeManager::create_queue_pads() {
sink_pad1 = gst_element_get_static_pad(queue1, "sink");
sink_pad2 = gst_element_get_static_pad(queue2, "sink");
if (!sink_pad1 || !sink_pad2) {
g_printerr("Unable to get request pads\n");
return false;
}
return true;
}
bool TeeManager::create_tee_pads() {
tee_msg_pad = gst_element_request_pad_simple(tee, "src_%u");
tee_render_pad = gst_element_request_pad_simple(tee, "src_%u");
// Request pads: pads that do not exist until you ask for them.
// Some elements (like tee, nvstreammux dynamic sources, nvmsgconv) allow
// multiple outputs, but dont create all the pads in advance. You ask the
// element: “Give me a new pad to connect to something. Return: a new
// GstPad* you can link to your downstream element.
if (!tee_msg_pad || !tee_render_pad) {
g_printerr("Unable to get request pads\n");
return false;
}
return true;
}

25
src/tee_manager.hpp Normal file
View File

@ -0,0 +1,25 @@
// #ifndef MYCLASS_H
// #define MYCLASS_H
#include <glib.h>
#include <gst/gst.h>
#include <fstream>
#include <iostream>
// #include "queue_manager.hpp"
class TeeManager {
public:
GstElement *tee = NULL, *queue1 = NULL, *queue2 = NULL;
GstPad *tee_render_pad = NULL, *tee_msg_pad = NULL, *sink_pad1 = NULL,
*src_pad = NULL, *sink_pad2 = NULL;
TeeManager();
bool create_tee();
~TeeManager();
bool create_queue_pads();
bool create_tee_pads();
private:
};
// #endif // MYCLASS_H