Attach message converter sink probe

This commit is contained in:
Barzan Hayati 2025-09-13 10:56:42 +00:00
parent c0aeaec9c2
commit 31caf133d9
4 changed files with 232 additions and 12 deletions

View File

@ -1,7 +1,7 @@
#include "nv_message_converter.hpp"
NvMessageConverter::NvMessageConverter() {
const auto& config = ConfigManager::get_instance().get_config();
const auto &config = ConfigManager::get_instance().get_config();
msgconv_config_file = config["msgconv"]["msgconv_config_file"];
frame_interval = config["msgconv"]["msgconv_frame_interval"];
@ -38,4 +38,217 @@ bool NvMessageConverter::create_message_converter() {
return false;
}
return true;
}
}
const char *metaTypeToString(NvDsMetaType type) {
switch (type) {
case NVDS_INVALID_META:
return "NVDS_INVALID_META";
case NVDS_BATCH_META:
return "NVDS_BATCH_META";
case NVDS_FRAME_META:
return "NVDS_FRAME_META";
case NVDS_OBJ_META:
return "NVDS_OBJ_META";
case NVDS_DISPLAY_META:
return "NVDS_DISPLAY_META";
case NVDS_CLASSIFIER_META:
return "NVDS_CLASSIFIER_META";
case NVDS_LABEL_INFO_META:
return "NVDS_LABEL_INFO_META";
case NVDS_USER_META:
return "NVDS_USER_META";
case NVDS_PAYLOAD_META:
return "NVDS_PAYLOAD_META";
case NVDS_EVENT_MSG_META:
return "NVDS_EVENT_MSG_META";
case NVDS_OPTICAL_FLOW_META:
return "NVDS_OPTICAL_FLOW_META";
case NVDS_LATENCY_MEASUREMENT_META:
return "NVDS_LATENCY_MEASUREMENT_META";
case NVDSINFER_TENSOR_OUTPUT_META:
return "NVDSINFER_TENSOR_OUTPUT_META";
case NVDSINFER_SEGMENTATION_META:
return "NVDSINFER_SEGMENTATION_META";
case NVDS_CROP_IMAGE_META:
return "NVDS_CROP_IMAGE_META";
case NVDS_TRACKER_PAST_FRAME_META:
return "NVDS_TRACKER_PAST_FRAME_META";
case NVDS_TRACKER_BATCH_REID_META:
return "NVDS_TRACKER_BATCH_REID_META";
case NVDS_TRACKER_OBJ_REID_META:
return "NVDS_TRACKER_OBJ_REID_META";
case NVDS_TRACKER_TERMINATED_LIST_META:
return "NVDS_TRACKER_TERMINATED_LIST_META";
case NVDS_TRACKER_SHADOW_LIST_META:
return "NVDS_TRACKER_SHADOW_LIST_META";
case NVDS_OBJ_VISIBILITY:
return "NVDS_OBJ_VISIBILITY";
case NVDS_OBJ_IMAGE_FOOT_LOCATION:
return "NVDS_OBJ_IMAGE_FOOT_LOCATION";
case NVDS_OBJ_WORLD_FOOT_LOCATION:
return "NVDS_OBJ_WORLD_FOOT_LOCATION";
case NVDS_OBJ_IMAGE_CONVEX_HULL:
return "NVDS_OBJ_IMAGE_CONVEX_HULL";
case NVDS_AUDIO_BATCH_META:
return "NVDS_AUDIO_BATCH_META";
case NVDS_AUDIO_FRAME_META:
return "NVDS_AUDIO_FRAME_META";
case NVDS_PREPROCESS_FRAME_META:
return "NVDS_PREPROCESS_FRAME_META";
case NVDS_PREPROCESS_BATCH_META:
return "NVDS_PREPROCESS_BATCH_META";
case NVDS_CUSTOM_MSG_BLOB:
return "NVDS_CUSTOM_MSG_BLOB";
case NVDS_ROI_META:
return "NVDS_ROI_META";
case NVDS_RESERVED_META:
return "NVDS_RESERVED_META";
default:
return "UNKNOWN_META_TYPE";
}
}
static bool safe_string_print(const char *s, size_t maxlen = 512) {
if (!s) return false;
// Try to be conservative: check bytes up to maxlen for a terminating NUL
for (size_t i = 0; i < maxlen; ++i) {
// read each byte carefully; this still risks UB if pointer invalid,
// but we only call this if pointer seems reasonable (non-NULL).
if (s[i] == '\0') return true;
}
return false; // no NUL found in first maxlen bytes -> suspicious
}
void NvMessageConverter::attach_probe_to_sink_msgconv() {
GstPad *sink_pad = gst_element_get_static_pad(msgconv, "sink");
if (!sink_pad) {
std::cerr << "Unable to get sink_pad sink pad\n";
return;
}
gst_pad_add_probe(sink_pad, GST_PAD_PROBE_TYPE_BUFFER, nvmsgconv_probe_cb,
NULL, NULL);
gst_object_unref(sink_pad);
}
GstPadProbeReturn NvMessageConverter::nvmsgconv_probe_cb(GstPad *pad,
GstPadProbeInfo *info,
gpointer user_data) {
(void)pad;
(void)user_data;
GstBuffer *buf = GST_PAD_PROBE_INFO_BUFFER(info);
if (!buf) return GST_PAD_PROBE_OK;
// make a writable copy (or just use the buffer if not modifying)
// buf = gst_buffer_make_writable(buf);
// get batch meta
NvDsBatchMeta *batch_meta = gst_buffer_get_nvds_batch_meta(buf);
if (!batch_meta) {
// g_print("[nvmsgconv probe] no batch meta\n");
return GST_PAD_PROBE_OK;
}
// loop over user meta to find event msg meta
for (NvDsMetaList *l = batch_meta->batch_user_meta_list; l != NULL;
l = l->next) {
NvDsUserMeta *user_meta = (NvDsUserMeta *)l->data;
// g_print("[nvmsgconv probe] batch user meta type=%s, ptr=%p\n",
// metaTypeToString(user_meta->base_meta.meta_type),
// (void*)user_meta->user_meta_data);
if (user_meta &&
user_meta->base_meta.meta_type == NVDS_EVENT_MSG_META) {
NvDsEventMsgMeta *msg_meta =
(NvDsEventMsgMeta *)user_meta->user_meta_data;
if (!msg_meta) {
g_print(" NVDS_EVENT_MSG_META but user_meta_data==NULL\n");
continue;
}
if (msg_meta) {
g_print("=== nvmsgconv probe: received event message ===\n");
if (msg_meta->ts) g_print("timestamp: %s\n", msg_meta->ts);
if (msg_meta->objType == NVDS_OBJECT_TYPE_PERSON)
g_print("object type: person\n");
if (msg_meta->extMsg) {
// extMsg is type-specific, e.g., NvDsVehicleObject /
// NvDsPersonObject
g_print("extMsg present\n");
}
std::quick_exit(0);
}
if (msg_meta) {
g_print("nvmsgconv probe: got event msg meta\n");
if (msg_meta->ts) g_print(" ts: %s\n", msg_meta->ts);
g_print(" objType: %d\n", msg_meta->objType);
std::quick_exit(0);
}
// Print numeric fields and pointer addresses only (safe)
g_print(
" event msg ptr=%p frameId=%" G_GINT64_FORMAT " objType=%d\n",
(void *)msg_meta, (gint64)msg_meta->frameId, msg_meta->objType);
g_print(" bbox: top=%f left=%f w=%f h=%f\n", msg_meta->bbox.top,
msg_meta->bbox.left, msg_meta->bbox.width,
msg_meta->bbox.height);
// Print timestamp pointer (safe) and length check before deref
g_print(" ts_ptr=%p\n", (void *)msg_meta->ts);
if (msg_meta->ts && safe_string_print(msg_meta->ts, 256)) {
g_print(" ts: %s\n", msg_meta->ts);
} else if (msg_meta->ts) {
g_print(
" ts appears suspicious (no NUL within 256 bytes) - not "
"printing\n");
} else {
g_print(" ts=NULL\n");
}
// If images present, show pointer/size
// if (msg_meta->image_meta.data && msg_meta->image_meta.size > 0) {
// g_print(" image_meta: data_ptr=%p size=%u w=%d h=%d
// type=%d\n",
// (void*)msg_meta->image_meta.data,
// msg_meta->image_meta.size,
// msg_meta->image_meta.width,
// msg_meta->image_meta.height,
// msg_meta->image_meta.image_type);
// }
}
}
// Also inspect per-frame metas (some code attaches to frame_user_meta_list)
for (NvDsMetaList *lf = batch_meta->frame_meta_list; lf; lf = lf->next) {
NvDsFrameMeta *fmeta = (NvDsFrameMeta *)lf->data;
if (!fmeta) continue;
for (NvDsMetaList *l = fmeta->frame_user_meta_list; l; l = l->next) {
NvDsUserMeta *um = (NvDsUserMeta *)l->data;
if (!um) continue;
// g_print("[nvmsgconv probe] frame %d user meta type=%s ptr=%p\n",
// fmeta->frame_num,
// metaTypeToString(um->base_meta.meta_type),
// (void*)um->user_meta_data);
if (um->base_meta.meta_type == NVDS_EVENT_MSG_META) {
NvDsEventMsgMeta *m = (NvDsEventMsgMeta *)um->user_meta_data;
if (!m) continue;
g_print("frame-level event msg ptr=%p frameId=%" G_GINT64_FORMAT
"\n",
(void *)m, (gint64)m->frameId);
g_print("ts_ptr=%p\n", (void *)m->ts);
if (m->ts && safe_string_print(m->ts, 256)) {
g_print("ts: %s\n", m->ts);
} else if (m->ts) {
g_print("ts suspicious - not printing\n");
} else {
g_print("ts=NULL\n");
}
}
}
}
return GST_PAD_PROBE_OK;
}

View File

@ -4,6 +4,8 @@
#include <iostream>
#include "config_manager.hpp"
#include "gstnvdsmeta.h"
#include "nvdsmeta_schema.h"
class NvMessageConverter {
private:
@ -15,4 +17,7 @@ class NvMessageConverter {
NvMessageConverter();
bool create_message_converter();
~NvMessageConverter();
void attach_probe_to_sink_msgconv();
static GstPadProbeReturn nvmsgconv_probe_cb(GstPad *, GstPadProbeInfo *,
gpointer);
};

View File

@ -281,7 +281,7 @@ bool PipelineManager::setup_pipeline() {
tiler_manager->tiler, queue_array[2].queue,
nv_video_convert_manager->nvvidconv, nv_osd_manager->nvosd,
tee_manager->tee, tee_manager->queue1, tee_manager->queue2,
nv_messgae_converter_manager->msgconv,
nv_message_converter_manager->msgconv,
nv_messgae_broker_manager->msgbroker, sink_manager->sink, NULL);
/* we link the elements together
@ -312,7 +312,7 @@ bool PipelineManager::setup_pipeline() {
return false;
}
if (!gst_element_link_many(
tee_manager->queue1, nv_messgae_converter_manager->msgconv,
tee_manager->queue1, nv_message_converter_manager->msgconv,
nv_messgae_broker_manager->msgbroker, NULL)) {
g_printerr(
"Could not link tee with message converter! Exiting.\n");
@ -333,7 +333,7 @@ bool PipelineManager::setup_pipeline() {
tiler_manager->tiler, queue_array[2].queue,
nv_video_convert_manager->nvvidconv, nv_osd_manager->nvosd,
tee_manager->tee, tee_manager->queue1, tee_manager->queue2,
nv_messgae_converter_manager->msgconv,
nv_message_converter_manager->msgconv,
nv_messgae_broker_manager->msgbroker,
sink_manager->nvvidconv_postosd, sink_manager->caps,
sink_manager->encoder, sink_manager->rtppay, sink_manager->sink,
@ -370,7 +370,7 @@ bool PipelineManager::setup_pipeline() {
return false;
}
if (!gst_element_link_many(
tee_manager->queue1, nv_messgae_converter_manager->msgconv,
tee_manager->queue1, nv_message_converter_manager->msgconv,
nv_messgae_broker_manager->msgbroker, NULL)) {
g_printerr(
"Could not link tee with message converter! Exiting.\n");
@ -395,7 +395,7 @@ bool PipelineManager::setup_pipeline() {
tiler_manager->tiler, queue_array[2].queue,
nv_video_convert_manager->nvvidconv, nv_osd_manager->nvosd,
tee_manager->tee, tee_manager->queue1, tee_manager->queue2,
nv_messgae_converter_manager->msgconv,
nv_message_converter_manager->msgconv,
nv_messgae_broker_manager->msgbroker, sink_manager->sink, NULL);
/* we link the elements together
@ -427,7 +427,7 @@ bool PipelineManager::setup_pipeline() {
}
if (!gst_element_link_many(
tee_manager->queue1, nv_messgae_converter_manager->msgconv,
tee_manager->queue1, nv_message_converter_manager->msgconv,
nv_messgae_broker_manager->msgbroker, NULL)) {
g_printerr(
"Could not link tee with message converter! Exiting.\n");
@ -448,7 +448,7 @@ bool PipelineManager::setup_pipeline() {
tiler_manager->tiler, queue_array[2].queue,
nv_video_convert_manager->nvvidconv, nv_osd_manager->nvosd,
tee_manager->tee, tee_manager->queue1, tee_manager->queue2,
nv_messgae_converter_manager->msgconv,
nv_message_converter_manager->msgconv,
nv_messgae_broker_manager->msgbroker,
sink_manager->nvvidconv_postosd, sink_manager->caps,
sink_manager->encoder, sink_manager->rtppay, sink_manager->sink,
@ -485,7 +485,7 @@ bool PipelineManager::setup_pipeline() {
return false;
}
if (!gst_element_link_many(
tee_manager->queue1, nv_messgae_converter_manager->msgconv,
tee_manager->queue1, nv_message_converter_manager->msgconv,
nv_messgae_broker_manager->msgbroker, NULL)) {
g_printerr(
"Could not link tee with message converter! Exiting.\n");
@ -581,7 +581,7 @@ bool PipelineManager::create_pipeline_elements(int num_sources,
nv_video_convert_manager->create_nv_video_convert();
nv_osd_manager->create_nv_osd();
nv_messgae_converter_manager->create_message_converter();
nv_message_converter_manager->create_message_converter();
nv_messgae_broker_manager->create_message_broker();
tee_manager->create_tee();
@ -626,6 +626,8 @@ bool PipelineManager::create_pipeline_elements(int num_sources,
nv_osd_manager->attach_probe_to_src_nvosd(
obj_ctx_handle); // nvinfer Or use "nvtracker" if after
nv_message_converter_manager->attach_probe_to_sink_msgconv();
message_handling->create_message_handler(pipeline, g_run_forever, loop);
setup_pipeline();

View File

@ -44,7 +44,7 @@ class PipelineManager {
NvTrackerManager *nv_tracker_manager = new NvTrackerManager();
FaceNvInferServerManager *face_nv_infer_server_manager =
new FaceNvInferServerManager();
NvMessageConverter *nv_messgae_converter_manager = new NvMessageConverter();
NvMessageConverter *nv_message_converter_manager = new NvMessageConverter();
NvMessageBroker *nv_messgae_broker_manager = new NvMessageBroker();
TeeManager *tee_manager = new TeeManager();
static double fps_buffer_probe;