Integrate msgconverter msgbroker via tee

This commit is contained in:
Barzan Hayati 2025-09-09 14:46:45 +00:00
parent 50a1af0af7
commit 9e9b645b95
3 changed files with 27 additions and 30 deletions

View File

@ -62,8 +62,8 @@
"msgbroker": {
"msgbroker_config_file": "../data/nvmsgboker_configs/msgbroker_config.txt",
"protocol_adaptor_library": "../data/nvmsgboker_configs/libnvds_redis_proto.so",
"redis_broker_host": "ABC",
"redis_broker_port": 1234,
"redis_broker_host": "localhost",
"redis_broker_port": 6379,
"topic_redis": "redis_stream"
}
}

View File

@ -13,16 +13,19 @@ NvMessageBroker::NvMessageBroker() {
bool NvMessageBroker::create_message_broker() {
msgbroker = gst_element_factory_make("nvmsgbroker", "nvmsg-broker");
g_object_set(G_OBJECT(msgbroker), "config", msgbroker_config_file, NULL);
g_object_set(G_OBJECT(msgbroker), "proto-lib",
protocol_adaptor_library.c_str(), "conn-str", conn_str.c_str(),
"sync", FALSE, NULL);
g_object_set(G_OBJECT(msgbroker), "config", msgbroker_config_file.c_str(),
NULL);
// nvmsgbroker looks first at the --conn-str (or conn-str property).
// If its not provided, and you gave a --cfg-file (or config property), it
// will read hostname and port from the config file. If you set both, the
// conn-str overrides the file values. g_object_set (G_OBJECT (msgbroker),
// "conn-str", conn_str, NULL);
g_object_set(G_OBJECT(msgbroker), "proto-lib", protocol_adaptor_library,
"sync", FALSE, NULL);
g_object_set(G_OBJECT(msgbroker), "topic", topic_redis, NULL);
g_object_set(G_OBJECT(msgbroker), "topic", topic_redis.c_str(), NULL);
if (!msgbroker) {
g_printerr("Unable to create msgbroker.Exiting.");

View File

@ -281,8 +281,7 @@ bool PipelineManager::setup_pipeline() {
nv_video_convert_manager->nvvidconv, nv_osd_manager->nvosd,
tee_manager->tee, tee_manager->queue1, tee_manager->queue2,
nv_messgae_converter_manager->msgconv,
// nv_messgae_broker_manager->msgbroker,
sink_manager->fake_sink, sink_manager->sink, NULL);
nv_messgae_broker_manager->msgbroker, sink_manager->sink, NULL);
/* we link the elements together
* nvstreammux -> nvinfer -> nvtiler -> nvvidconv -> nvosd ->
@ -311,10 +310,9 @@ bool PipelineManager::setup_pipeline() {
g_printerr("Could not link source into tee!.\n");
return false;
}
if (!gst_element_link_many(tee_manager->queue1,
nv_messgae_converter_manager->msgconv,
// nv_messgae_broker_manager->msgbroker,
sink_manager->fake_sink, NULL)) {
if (!gst_element_link_many(
tee_manager->queue1, nv_messgae_converter_manager->msgconv,
nv_messgae_broker_manager->msgbroker, NULL)) {
g_printerr(
"Could not link tee with message converter! Exiting.\n");
return false;
@ -335,10 +333,10 @@ bool PipelineManager::setup_pipeline() {
nv_video_convert_manager->nvvidconv, nv_osd_manager->nvosd,
tee_manager->tee, tee_manager->queue1, tee_manager->queue2,
nv_messgae_converter_manager->msgconv,
// nv_messgae_broker_manager->msgbroker,
nv_messgae_broker_manager->msgbroker,
sink_manager->nvvidconv_postosd, sink_manager->caps,
sink_manager->encoder, sink_manager->rtppay, sink_manager->sink,
sink_manager->fake_sink, NULL);
NULL);
// Link the elements together:
// file-source -> h264-parser -> nvh264-decoder ->
@ -370,10 +368,9 @@ bool PipelineManager::setup_pipeline() {
g_printerr("Could not link source into tee!.\n");
return false;
}
if (!gst_element_link_many(tee_manager->queue1,
nv_messgae_converter_manager->msgconv,
// nv_messgae_broker_manager->msgbroker,
sink_manager->fake_sink, NULL)) {
if (!gst_element_link_many(
tee_manager->queue1, nv_messgae_converter_manager->msgconv,
nv_messgae_broker_manager->msgbroker, NULL)) {
g_printerr(
"Could not link tee with message converter! Exiting.\n");
return false;
@ -398,8 +395,7 @@ bool PipelineManager::setup_pipeline() {
nv_video_convert_manager->nvvidconv, nv_osd_manager->nvosd,
tee_manager->tee, tee_manager->queue1, tee_manager->queue2,
nv_messgae_converter_manager->msgconv,
// nv_messgae_broker_manager->msgbroker,
sink_manager->fake_sink, sink_manager->sink, NULL);
nv_messgae_broker_manager->msgbroker, sink_manager->sink, NULL);
/* we link the elements together
* nvstreammux -> nvinfer -> nvtiler -> nvvidconv -> nvosd ->
@ -429,10 +425,9 @@ bool PipelineManager::setup_pipeline() {
return false;
}
if (!gst_element_link_many(tee_manager->queue1,
nv_messgae_converter_manager->msgconv,
// nv_messgae_broker_manager->msgbroker,
sink_manager->fake_sink, NULL)) {
if (!gst_element_link_many(
tee_manager->queue1, nv_messgae_converter_manager->msgconv,
nv_messgae_broker_manager->msgbroker, NULL)) {
g_printerr(
"Could not link tee with message converter! Exiting.\n");
return false;
@ -453,10 +448,10 @@ bool PipelineManager::setup_pipeline() {
nv_video_convert_manager->nvvidconv, nv_osd_manager->nvosd,
tee_manager->tee, tee_manager->queue1, tee_manager->queue2,
nv_messgae_converter_manager->msgconv,
// nv_messgae_broker_manager->msgbroker,
nv_messgae_broker_manager->msgbroker,
sink_manager->nvvidconv_postosd, sink_manager->caps,
sink_manager->encoder, sink_manager->rtppay, sink_manager->sink,
sink_manager->fake_sink, NULL);
NULL);
// Link the elements together:
// file-source -> h264-parser -> nvh264-decoder ->
@ -488,10 +483,9 @@ bool PipelineManager::setup_pipeline() {
g_printerr("Could not link source into tee!.\n");
return false;
}
if (!gst_element_link_many(tee_manager->queue1,
nv_messgae_converter_manager->msgconv,
// nv_messgae_broker_manager->msgbroker,
sink_manager->fake_sink, NULL)) {
if (!gst_element_link_many(
tee_manager->queue1, nv_messgae_converter_manager->msgconv,
nv_messgae_broker_manager->msgbroker, NULL)) {
g_printerr(
"Could not link tee with message converter! Exiting.\n");
return false;