Add source probe msgconv sink probe msgbroker

This commit is contained in:
Barzan Hayati 2025-09-14 14:43:39 +00:00
parent 5dde223640
commit b7e93c949e
7 changed files with 163 additions and 22 deletions

View File

@ -162,4 +162,5 @@ target_link_libraries(${PROJECT_NAME} nvdsgst_infer nvds_meta nvds_inferutils
prometheus-cpp-core prometheus-cpp-pull # prometheus-cpp-exposer nvdsgst_metnvdsa
microhttpd
nvdsgst_nvmultiurisrcbin
nvds_batch_jpegenc)
nvds_batch_jpegenc
nvds_msgbroker nvds_msgconv nvds_meta)

View File

@ -1,7 +1,7 @@
#include "nv_message_broker.hpp"
NvMessageBroker::NvMessageBroker() {
const auto& config = ConfigManager::get_instance().get_config();
const auto &config = ConfigManager::get_instance().get_config();
msgbroker_config_file = config["msgbroker"]["msgbroker_config_file"];
protocol_adaptor_library = config["msgbroker"]["protocol_adaptor_library"];
@ -33,4 +33,97 @@ bool NvMessageBroker::create_message_broker() {
return false;
}
return true;
}
void NvMessageBroker::attach_probe_to_sink_msgbroker() {
GstPad *sink_pad = gst_element_get_static_pad(msgbroker, "sink");
if (!sink_pad) {
std::cerr << "Unable to get sink_pad sink pad\n";
return;
}
gst_pad_add_probe(sink_pad, GST_PAD_PROBE_TYPE_BUFFER,
broker_sink_pad_probe, NULL, NULL);
gst_object_unref(sink_pad);
}
GstPadProbeReturn NvMessageBroker::broker_sink_pad_probe(GstPad *pad,
GstPadProbeInfo *info,
gpointer user_data) {
(void)pad;
(void)user_data;
GstBuffer *buf = GST_PAD_PROBE_INFO_BUFFER(info);
if (!buf) return GST_PAD_PROBE_OK;
// Iterate metadata in the buffer
NvDsMetaList *l_frame = NULL;
NvDsMetaList *l_user = NULL;
NvDsBatchMeta *batch_meta = gst_buffer_get_nvds_batch_meta(buf);
if (!batch_meta) return GST_PAD_PROBE_OK;
for (l_frame = batch_meta->frame_meta_list; l_frame != NULL;
l_frame = l_frame->next) {
NvDsFrameMeta *frame_meta = (NvDsFrameMeta *)(l_frame->data);
for (l_user = frame_meta->frame_user_meta_list; l_user != NULL;
l_user = l_user->next) {
NvDsUserMeta *user_meta = (NvDsUserMeta *)(l_user->data);
if (user_meta &&
user_meta->base_meta.meta_type == NVDS_EVENT_MSG_META) {
NvDsEventMsgMeta *msg_meta =
(NvDsEventMsgMeta *)user_meta->user_meta_data;
if (msg_meta && msg_meta->extMsg != NULL) {
// You can inspect or pretty-print the JSON payload
g_print("Broker Probe 1: JSON payload: %s\n",
(char *)msg_meta->extMsg);
}
}
}
}
// Also inspect per-frame metas (some code attaches to frame_user_meta_list)
for (NvDsMetaList *lf = batch_meta->frame_meta_list; lf; lf = lf->next) {
NvDsFrameMeta *fmeta = (NvDsFrameMeta *)lf->data;
if (!fmeta) continue;
for (NvDsMetaList *l = fmeta->frame_user_meta_list; l; l = l->next) {
NvDsUserMeta *um = (NvDsUserMeta *)l->data;
if (!um) continue;
// g_print("[nvmsgconv probe] frame %d user meta type=%s ptr=%p\n",
// fmeta->frame_num,
// metaTypeToString(um->base_meta.meta_type),
// (void*)um->user_meta_data);
if (um->base_meta.meta_type == NVDS_EVENT_MSG_META) {
NvDsEventMsgMeta *m = (NvDsEventMsgMeta *)um->user_meta_data;
if (!m) continue;
if (m && m->extMsg != NULL) {
// You can inspect or pretty-print the JSON payload
g_print("Broker Probe 2: JSON payload: %s\n",
(char *)m->extMsg);
}
// g_print("frame-level event msg objClassId=%d objectId=%s
// componentId=%d trackingId=%ld confidence=%f ptr=%p frameId=%"
// G_GINT64_FORMAT
// "\n",
// m->objClassId, m->objectId, m->componentId,
// m->trackingId, m->confidence, (void *)m,
// (gint64)m->frameId);
// g_print("ts_ptr=%p\n", (void *)m->ts);
// g_print("ts_ptr=%p\n", (void *)m->ts);
// if (m->ts && safe_string_print(m->ts, 256)) {
// g_print("ts: %s\n", m->ts);
// } else if (m->ts) {
// g_print("ts suspicious - not printing\n");
// } else {
// g_print("ts=NULL\n");
// }
}
}
}
return GST_PAD_PROBE_OK;
}

View File

@ -4,6 +4,8 @@
#include <iostream>
#include "config_manager.hpp"
#include "gstnvdsmeta.h"
#include "nvdsmeta_schema.h"
class NvMessageBroker {
private:
@ -16,4 +18,7 @@ class NvMessageBroker {
NvMessageBroker();
bool create_message_broker();
~NvMessageBroker();
void attach_probe_to_sink_msgbroker();
static GstPadProbeReturn broker_sink_pad_probe(GstPad *, GstPadProbeInfo *,
gpointer);
};

View File

@ -127,14 +127,13 @@ void NvMessageConverter::attach_probe_to_sink_msgconv() {
return;
}
gst_pad_add_probe(sink_pad, GST_PAD_PROBE_TYPE_BUFFER, nvmsgconv_probe_cb,
NULL, NULL);
gst_pad_add_probe(sink_pad, GST_PAD_PROBE_TYPE_BUFFER,
nvmsgconv_probe_cb_sink, NULL, NULL);
gst_object_unref(sink_pad);
}
GstPadProbeReturn NvMessageConverter::nvmsgconv_probe_cb(GstPad *pad,
GstPadProbeInfo *info,
gpointer user_data) {
GstPadProbeReturn NvMessageConverter::nvmsgconv_probe_cb_sink(
GstPad *pad, GstPadProbeInfo *info, gpointer user_data) {
(void)pad;
(void)user_data;
GstBuffer *buf = GST_PAD_PROBE_INFO_BUFFER(info);
@ -236,9 +235,13 @@ GstPadProbeReturn NvMessageConverter::nvmsgconv_probe_cb(GstPad *pad,
if (um->base_meta.meta_type == NVDS_EVENT_MSG_META) {
NvDsEventMsgMeta *m = (NvDsEventMsgMeta *)um->user_meta_data;
if (!m) continue;
g_print("frame-level event msg ptr=%p frameId=%" G_GINT64_FORMAT
"\n",
(void *)m, (gint64)m->frameId);
g_print(
"frame-level event msg objClassId=%d objectId=%s "
"componentId=%d trackingId=%ld confidence=%f ptr=%p "
"frameId=%" G_GINT64_FORMAT "\n",
m->objClassId, m->objectId, m->componentId, m->trackingId,
m->confidence, (void *)m, (gint64)m->frameId);
g_print("ts_ptr=%p\n", (void *)m->ts);
if (m->ts && safe_string_print(m->ts, 256)) {
g_print("ts: %s\n", m->ts);
@ -252,3 +255,36 @@ GstPadProbeReturn NvMessageConverter::nvmsgconv_probe_cb(GstPad *pad,
}
return GST_PAD_PROBE_OK;
}
void NvMessageConverter::attach_probe_to_src_msgconv() {
GstPad *src_pad = gst_element_get_static_pad(msgconv, "src");
if (!src_pad) {
std::cerr << "Unable to get src_pad sink pad\n";
return;
}
gst_pad_add_probe(src_pad, GST_PAD_PROBE_TYPE_BUFFER,
nvmsgconv_probe_cb_src, NULL, NULL);
gst_object_unref(src_pad);
}
// Probe callback to inspect JSON messages coming out of nvmsgconv
GstPadProbeReturn NvMessageConverter::nvmsgconv_probe_cb_src(
GstPad *pad, GstPadProbeInfo *info, gpointer user_data) {
(void)pad;
(void)user_data;
if (!(info->type & GST_PAD_PROBE_TYPE_BUFFER)) return GST_PAD_PROBE_OK;
GstBuffer *buf = GST_PAD_PROBE_INFO_BUFFER(info);
if (!buf) return GST_PAD_PROBE_OK;
// Map buffer to system memory
GstMapInfo map;
if (gst_buffer_map(buf, &map, GST_MAP_READ)) {
// nvmsgconv outputs application/json
std::string json_str(reinterpret_cast<char *>(map.data), map.size);
g_print("nvmsgconv JSON:\n%s\n", json_str.c_str());
gst_buffer_unmap(buf, &map);
}
return GST_PAD_PROBE_OK;
}

View File

@ -18,6 +18,10 @@ class NvMessageConverter {
bool create_message_converter();
~NvMessageConverter();
void attach_probe_to_sink_msgconv();
static GstPadProbeReturn nvmsgconv_probe_cb(GstPad *, GstPadProbeInfo *,
gpointer);
static GstPadProbeReturn nvmsgconv_probe_cb_sink(GstPad *,
GstPadProbeInfo *,
gpointer);
void attach_probe_to_src_msgconv();
static GstPadProbeReturn nvmsgconv_probe_cb_src(GstPad *, GstPadProbeInfo *,
gpointer);
};

View File

@ -282,7 +282,7 @@ bool PipelineManager::setup_pipeline() {
nv_video_convert_manager->nvvidconv, nv_osd_manager->nvosd,
tee_manager->tee, tee_manager->queue1, tee_manager->queue2,
nv_message_converter_manager->msgconv,
nv_messgae_broker_manager->msgbroker, sink_manager->sink, NULL);
nv_message_broker_manager->msgbroker, sink_manager->sink, NULL);
/* we link the elements together
* nvstreammux -> nvinfer -> nvtiler -> nvvidconv -> nvosd ->
@ -313,7 +313,7 @@ bool PipelineManager::setup_pipeline() {
}
if (!gst_element_link_many(
tee_manager->queue1, nv_message_converter_manager->msgconv,
nv_messgae_broker_manager->msgbroker, NULL)) {
nv_message_broker_manager->msgbroker, NULL)) {
g_printerr(
"Could not link tee with message converter! Exiting.\n");
return false;
@ -334,7 +334,7 @@ bool PipelineManager::setup_pipeline() {
nv_video_convert_manager->nvvidconv, nv_osd_manager->nvosd,
tee_manager->tee, tee_manager->queue1, tee_manager->queue2,
nv_message_converter_manager->msgconv,
nv_messgae_broker_manager->msgbroker,
nv_message_broker_manager->msgbroker,
sink_manager->nvvidconv_postosd, sink_manager->caps,
sink_manager->encoder, sink_manager->rtppay, sink_manager->sink,
NULL);
@ -371,7 +371,7 @@ bool PipelineManager::setup_pipeline() {
}
if (!gst_element_link_many(
tee_manager->queue1, nv_message_converter_manager->msgconv,
nv_messgae_broker_manager->msgbroker, NULL)) {
nv_message_broker_manager->msgbroker, NULL)) {
g_printerr(
"Could not link tee with message converter! Exiting.\n");
return false;
@ -396,7 +396,7 @@ bool PipelineManager::setup_pipeline() {
nv_video_convert_manager->nvvidconv, nv_osd_manager->nvosd,
tee_manager->tee, tee_manager->queue1, tee_manager->queue2,
nv_message_converter_manager->msgconv,
nv_messgae_broker_manager->msgbroker, sink_manager->sink, NULL);
nv_message_broker_manager->msgbroker, sink_manager->sink, NULL);
/* we link the elements together
* nvstreammux -> nvinfer -> nvtiler -> nvvidconv -> nvosd ->
@ -428,7 +428,7 @@ bool PipelineManager::setup_pipeline() {
if (!gst_element_link_many(
tee_manager->queue1, nv_message_converter_manager->msgconv,
nv_messgae_broker_manager->msgbroker, NULL)) {
nv_message_broker_manager->msgbroker, NULL)) {
g_printerr(
"Could not link tee with message converter! Exiting.\n");
return false;
@ -449,7 +449,7 @@ bool PipelineManager::setup_pipeline() {
nv_video_convert_manager->nvvidconv, nv_osd_manager->nvosd,
tee_manager->tee, tee_manager->queue1, tee_manager->queue2,
nv_message_converter_manager->msgconv,
nv_messgae_broker_manager->msgbroker,
nv_message_broker_manager->msgbroker,
sink_manager->nvvidconv_postosd, sink_manager->caps,
sink_manager->encoder, sink_manager->rtppay, sink_manager->sink,
NULL);
@ -486,7 +486,7 @@ bool PipelineManager::setup_pipeline() {
}
if (!gst_element_link_many(
tee_manager->queue1, nv_message_converter_manager->msgconv,
nv_messgae_broker_manager->msgbroker, NULL)) {
nv_message_broker_manager->msgbroker, NULL)) {
g_printerr(
"Could not link tee with message converter! Exiting.\n");
return false;
@ -582,7 +582,7 @@ bool PipelineManager::create_pipeline_elements(int num_sources,
nv_osd_manager->create_nv_osd();
nv_message_converter_manager->create_message_converter();
nv_messgae_broker_manager->create_message_broker();
nv_message_broker_manager->create_message_broker();
tee_manager->create_tee();
/* Add queue elements between every two elements */
@ -627,6 +627,8 @@ bool PipelineManager::create_pipeline_elements(int num_sources,
obj_ctx_handle); // nvinfer Or use "nvtracker" if after
nv_osd_manager->attach_probe_to_src_nvosd(obj_ctx_handle);
nv_message_converter_manager->attach_probe_to_sink_msgconv();
nv_message_converter_manager->attach_probe_to_src_msgconv();
nv_message_broker_manager->attach_probe_to_sink_msgbroker();
message_handling->create_message_handler(pipeline, g_run_forever, loop);
setup_pipeline();

View File

@ -45,7 +45,7 @@ class PipelineManager {
FaceNvInferServerManager *face_nv_infer_server_manager =
new FaceNvInferServerManager();
NvMessageConverter *nv_message_converter_manager = new NvMessageConverter();
NvMessageBroker *nv_messgae_broker_manager = new NvMessageBroker();
NvMessageBroker *nv_message_broker_manager = new NvMessageBroker();
TeeManager *tee_manager = new TeeManager();
static double fps_buffer_probe;
static double fps_probe;