From 31caf133d992162420817915a7935ab4cfdd1536 Mon Sep 17 00:00:00 2001 From: Barzan Hayati Date: Sat, 13 Sep 2025 10:56:42 +0000 Subject: [PATCH] Attach message converter sink probe --- src/nv_message_converter.cpp | 217 ++++++++++++++++++++++++++++++++++- src/nv_message_converter.hpp | 5 + src/pipeline_manager.cpp | 20 ++-- src/pipeline_manager.hpp | 2 +- 4 files changed, 232 insertions(+), 12 deletions(-) diff --git a/src/nv_message_converter.cpp b/src/nv_message_converter.cpp index 53af8f4..96255fb 100644 --- a/src/nv_message_converter.cpp +++ b/src/nv_message_converter.cpp @@ -1,7 +1,7 @@ #include "nv_message_converter.hpp" NvMessageConverter::NvMessageConverter() { - const auto& config = ConfigManager::get_instance().get_config(); + const auto &config = ConfigManager::get_instance().get_config(); msgconv_config_file = config["msgconv"]["msgconv_config_file"]; frame_interval = config["msgconv"]["msgconv_frame_interval"]; @@ -38,4 +38,217 @@ bool NvMessageConverter::create_message_converter() { return false; } return true; -} \ No newline at end of file +} + +const char *metaTypeToString(NvDsMetaType type) { + switch (type) { + case NVDS_INVALID_META: + return "NVDS_INVALID_META"; + case NVDS_BATCH_META: + return "NVDS_BATCH_META"; + case NVDS_FRAME_META: + return "NVDS_FRAME_META"; + case NVDS_OBJ_META: + return "NVDS_OBJ_META"; + case NVDS_DISPLAY_META: + return "NVDS_DISPLAY_META"; + case NVDS_CLASSIFIER_META: + return "NVDS_CLASSIFIER_META"; + case NVDS_LABEL_INFO_META: + return "NVDS_LABEL_INFO_META"; + case NVDS_USER_META: + return "NVDS_USER_META"; + case NVDS_PAYLOAD_META: + return "NVDS_PAYLOAD_META"; + case NVDS_EVENT_MSG_META: + return "NVDS_EVENT_MSG_META"; + case NVDS_OPTICAL_FLOW_META: + return "NVDS_OPTICAL_FLOW_META"; + case NVDS_LATENCY_MEASUREMENT_META: + return "NVDS_LATENCY_MEASUREMENT_META"; + case NVDSINFER_TENSOR_OUTPUT_META: + return "NVDSINFER_TENSOR_OUTPUT_META"; + case NVDSINFER_SEGMENTATION_META: + return "NVDSINFER_SEGMENTATION_META"; + case NVDS_CROP_IMAGE_META: + return "NVDS_CROP_IMAGE_META"; + case NVDS_TRACKER_PAST_FRAME_META: + return "NVDS_TRACKER_PAST_FRAME_META"; + case NVDS_TRACKER_BATCH_REID_META: + return "NVDS_TRACKER_BATCH_REID_META"; + case NVDS_TRACKER_OBJ_REID_META: + return "NVDS_TRACKER_OBJ_REID_META"; + case NVDS_TRACKER_TERMINATED_LIST_META: + return "NVDS_TRACKER_TERMINATED_LIST_META"; + case NVDS_TRACKER_SHADOW_LIST_META: + return "NVDS_TRACKER_SHADOW_LIST_META"; + case NVDS_OBJ_VISIBILITY: + return "NVDS_OBJ_VISIBILITY"; + case NVDS_OBJ_IMAGE_FOOT_LOCATION: + return "NVDS_OBJ_IMAGE_FOOT_LOCATION"; + case NVDS_OBJ_WORLD_FOOT_LOCATION: + return "NVDS_OBJ_WORLD_FOOT_LOCATION"; + case NVDS_OBJ_IMAGE_CONVEX_HULL: + return "NVDS_OBJ_IMAGE_CONVEX_HULL"; + case NVDS_AUDIO_BATCH_META: + return "NVDS_AUDIO_BATCH_META"; + case NVDS_AUDIO_FRAME_META: + return "NVDS_AUDIO_FRAME_META"; + case NVDS_PREPROCESS_FRAME_META: + return "NVDS_PREPROCESS_FRAME_META"; + case NVDS_PREPROCESS_BATCH_META: + return "NVDS_PREPROCESS_BATCH_META"; + case NVDS_CUSTOM_MSG_BLOB: + return "NVDS_CUSTOM_MSG_BLOB"; + case NVDS_ROI_META: + return "NVDS_ROI_META"; + case NVDS_RESERVED_META: + return "NVDS_RESERVED_META"; + default: + return "UNKNOWN_META_TYPE"; + } +} + +static bool safe_string_print(const char *s, size_t maxlen = 512) { + if (!s) return false; + // Try to be conservative: check bytes up to maxlen for a terminating NUL + for (size_t i = 0; i < maxlen; ++i) { + // read each byte carefully; this still risks UB if pointer invalid, + // but we only call this if pointer seems reasonable (non-NULL). + if (s[i] == '\0') return true; + } + return false; // no NUL found in first maxlen bytes -> suspicious +} + +void NvMessageConverter::attach_probe_to_sink_msgconv() { + GstPad *sink_pad = gst_element_get_static_pad(msgconv, "sink"); + if (!sink_pad) { + std::cerr << "Unable to get sink_pad sink pad\n"; + return; + } + + gst_pad_add_probe(sink_pad, GST_PAD_PROBE_TYPE_BUFFER, nvmsgconv_probe_cb, + NULL, NULL); + gst_object_unref(sink_pad); +} + +GstPadProbeReturn NvMessageConverter::nvmsgconv_probe_cb(GstPad *pad, + GstPadProbeInfo *info, + gpointer user_data) { + (void)pad; + (void)user_data; + GstBuffer *buf = GST_PAD_PROBE_INFO_BUFFER(info); + if (!buf) return GST_PAD_PROBE_OK; + + // make a writable copy (or just use the buffer if not modifying) + // buf = gst_buffer_make_writable(buf); + + // get batch meta + NvDsBatchMeta *batch_meta = gst_buffer_get_nvds_batch_meta(buf); + if (!batch_meta) { + // g_print("[nvmsgconv probe] no batch meta\n"); + return GST_PAD_PROBE_OK; + } + + // loop over user meta to find event msg meta + for (NvDsMetaList *l = batch_meta->batch_user_meta_list; l != NULL; + l = l->next) { + NvDsUserMeta *user_meta = (NvDsUserMeta *)l->data; + + // g_print("[nvmsgconv probe] batch user meta type=%s, ptr=%p\n", + // metaTypeToString(user_meta->base_meta.meta_type), + // (void*)user_meta->user_meta_data); + if (user_meta && + user_meta->base_meta.meta_type == NVDS_EVENT_MSG_META) { + NvDsEventMsgMeta *msg_meta = + (NvDsEventMsgMeta *)user_meta->user_meta_data; + if (!msg_meta) { + g_print(" NVDS_EVENT_MSG_META but user_meta_data==NULL\n"); + continue; + } + + if (msg_meta) { + g_print("=== nvmsgconv probe: received event message ===\n"); + if (msg_meta->ts) g_print("timestamp: %s\n", msg_meta->ts); + if (msg_meta->objType == NVDS_OBJECT_TYPE_PERSON) + g_print("object type: person\n"); + if (msg_meta->extMsg) { + // extMsg is type-specific, e.g., NvDsVehicleObject / + // NvDsPersonObject + g_print("extMsg present\n"); + } + std::quick_exit(0); + } + + if (msg_meta) { + g_print("nvmsgconv probe: got event msg meta\n"); + if (msg_meta->ts) g_print(" ts: %s\n", msg_meta->ts); + g_print(" objType: %d\n", msg_meta->objType); + std::quick_exit(0); + } + + // Print numeric fields and pointer addresses only (safe) + g_print( + " event msg ptr=%p frameId=%" G_GINT64_FORMAT " objType=%d\n", + (void *)msg_meta, (gint64)msg_meta->frameId, msg_meta->objType); + g_print(" bbox: top=%f left=%f w=%f h=%f\n", msg_meta->bbox.top, + msg_meta->bbox.left, msg_meta->bbox.width, + msg_meta->bbox.height); + + // Print timestamp pointer (safe) and length check before deref + g_print(" ts_ptr=%p\n", (void *)msg_meta->ts); + if (msg_meta->ts && safe_string_print(msg_meta->ts, 256)) { + g_print(" ts: %s\n", msg_meta->ts); + } else if (msg_meta->ts) { + g_print( + " ts appears suspicious (no NUL within 256 bytes) - not " + "printing\n"); + } else { + g_print(" ts=NULL\n"); + } + // If images present, show pointer/size + // if (msg_meta->image_meta.data && msg_meta->image_meta.size > 0) { + // g_print(" image_meta: data_ptr=%p size=%u w=%d h=%d + // type=%d\n", + // (void*)msg_meta->image_meta.data, + // msg_meta->image_meta.size, + // msg_meta->image_meta.width, + // msg_meta->image_meta.height, + // msg_meta->image_meta.image_type); + // } + } + } + + // Also inspect per-frame metas (some code attaches to frame_user_meta_list) + for (NvDsMetaList *lf = batch_meta->frame_meta_list; lf; lf = lf->next) { + NvDsFrameMeta *fmeta = (NvDsFrameMeta *)lf->data; + if (!fmeta) continue; + + for (NvDsMetaList *l = fmeta->frame_user_meta_list; l; l = l->next) { + NvDsUserMeta *um = (NvDsUserMeta *)l->data; + if (!um) continue; + + // g_print("[nvmsgconv probe] frame %d user meta type=%s ptr=%p\n", + // fmeta->frame_num, + // metaTypeToString(um->base_meta.meta_type), + // (void*)um->user_meta_data); + + if (um->base_meta.meta_type == NVDS_EVENT_MSG_META) { + NvDsEventMsgMeta *m = (NvDsEventMsgMeta *)um->user_meta_data; + if (!m) continue; + g_print("frame-level event msg ptr=%p frameId=%" G_GINT64_FORMAT + "\n", + (void *)m, (gint64)m->frameId); + g_print("ts_ptr=%p\n", (void *)m->ts); + if (m->ts && safe_string_print(m->ts, 256)) { + g_print("ts: %s\n", m->ts); + } else if (m->ts) { + g_print("ts suspicious - not printing\n"); + } else { + g_print("ts=NULL\n"); + } + } + } + } + return GST_PAD_PROBE_OK; +} diff --git a/src/nv_message_converter.hpp b/src/nv_message_converter.hpp index 15edd30..a70b6c4 100644 --- a/src/nv_message_converter.hpp +++ b/src/nv_message_converter.hpp @@ -4,6 +4,8 @@ #include #include "config_manager.hpp" +#include "gstnvdsmeta.h" +#include "nvdsmeta_schema.h" class NvMessageConverter { private: @@ -15,4 +17,7 @@ class NvMessageConverter { NvMessageConverter(); bool create_message_converter(); ~NvMessageConverter(); + void attach_probe_to_sink_msgconv(); + static GstPadProbeReturn nvmsgconv_probe_cb(GstPad *, GstPadProbeInfo *, + gpointer); }; \ No newline at end of file diff --git a/src/pipeline_manager.cpp b/src/pipeline_manager.cpp index c6c5e4f..67e14bd 100644 --- a/src/pipeline_manager.cpp +++ b/src/pipeline_manager.cpp @@ -281,7 +281,7 @@ bool PipelineManager::setup_pipeline() { tiler_manager->tiler, queue_array[2].queue, nv_video_convert_manager->nvvidconv, nv_osd_manager->nvosd, tee_manager->tee, tee_manager->queue1, tee_manager->queue2, - nv_messgae_converter_manager->msgconv, + nv_message_converter_manager->msgconv, nv_messgae_broker_manager->msgbroker, sink_manager->sink, NULL); /* we link the elements together @@ -312,7 +312,7 @@ bool PipelineManager::setup_pipeline() { return false; } if (!gst_element_link_many( - tee_manager->queue1, nv_messgae_converter_manager->msgconv, + tee_manager->queue1, nv_message_converter_manager->msgconv, nv_messgae_broker_manager->msgbroker, NULL)) { g_printerr( "Could not link tee with message converter! Exiting.\n"); @@ -333,7 +333,7 @@ bool PipelineManager::setup_pipeline() { tiler_manager->tiler, queue_array[2].queue, nv_video_convert_manager->nvvidconv, nv_osd_manager->nvosd, tee_manager->tee, tee_manager->queue1, tee_manager->queue2, - nv_messgae_converter_manager->msgconv, + nv_message_converter_manager->msgconv, nv_messgae_broker_manager->msgbroker, sink_manager->nvvidconv_postosd, sink_manager->caps, sink_manager->encoder, sink_manager->rtppay, sink_manager->sink, @@ -370,7 +370,7 @@ bool PipelineManager::setup_pipeline() { return false; } if (!gst_element_link_many( - tee_manager->queue1, nv_messgae_converter_manager->msgconv, + tee_manager->queue1, nv_message_converter_manager->msgconv, nv_messgae_broker_manager->msgbroker, NULL)) { g_printerr( "Could not link tee with message converter! Exiting.\n"); @@ -395,7 +395,7 @@ bool PipelineManager::setup_pipeline() { tiler_manager->tiler, queue_array[2].queue, nv_video_convert_manager->nvvidconv, nv_osd_manager->nvosd, tee_manager->tee, tee_manager->queue1, tee_manager->queue2, - nv_messgae_converter_manager->msgconv, + nv_message_converter_manager->msgconv, nv_messgae_broker_manager->msgbroker, sink_manager->sink, NULL); /* we link the elements together @@ -427,7 +427,7 @@ bool PipelineManager::setup_pipeline() { } if (!gst_element_link_many( - tee_manager->queue1, nv_messgae_converter_manager->msgconv, + tee_manager->queue1, nv_message_converter_manager->msgconv, nv_messgae_broker_manager->msgbroker, NULL)) { g_printerr( "Could not link tee with message converter! Exiting.\n"); @@ -448,7 +448,7 @@ bool PipelineManager::setup_pipeline() { tiler_manager->tiler, queue_array[2].queue, nv_video_convert_manager->nvvidconv, nv_osd_manager->nvosd, tee_manager->tee, tee_manager->queue1, tee_manager->queue2, - nv_messgae_converter_manager->msgconv, + nv_message_converter_manager->msgconv, nv_messgae_broker_manager->msgbroker, sink_manager->nvvidconv_postosd, sink_manager->caps, sink_manager->encoder, sink_manager->rtppay, sink_manager->sink, @@ -485,7 +485,7 @@ bool PipelineManager::setup_pipeline() { return false; } if (!gst_element_link_many( - tee_manager->queue1, nv_messgae_converter_manager->msgconv, + tee_manager->queue1, nv_message_converter_manager->msgconv, nv_messgae_broker_manager->msgbroker, NULL)) { g_printerr( "Could not link tee with message converter! Exiting.\n"); @@ -581,7 +581,7 @@ bool PipelineManager::create_pipeline_elements(int num_sources, nv_video_convert_manager->create_nv_video_convert(); nv_osd_manager->create_nv_osd(); - nv_messgae_converter_manager->create_message_converter(); + nv_message_converter_manager->create_message_converter(); nv_messgae_broker_manager->create_message_broker(); tee_manager->create_tee(); @@ -626,6 +626,8 @@ bool PipelineManager::create_pipeline_elements(int num_sources, nv_osd_manager->attach_probe_to_src_nvosd( obj_ctx_handle); // nvinfer Or use "nvtracker" if after + nv_message_converter_manager->attach_probe_to_sink_msgconv(); + message_handling->create_message_handler(pipeline, g_run_forever, loop); setup_pipeline(); diff --git a/src/pipeline_manager.hpp b/src/pipeline_manager.hpp index 9689739..edcf837 100644 --- a/src/pipeline_manager.hpp +++ b/src/pipeline_manager.hpp @@ -44,7 +44,7 @@ class PipelineManager { NvTrackerManager *nv_tracker_manager = new NvTrackerManager(); FaceNvInferServerManager *face_nv_infer_server_manager = new FaceNvInferServerManager(); - NvMessageConverter *nv_messgae_converter_manager = new NvMessageConverter(); + NvMessageConverter *nv_message_converter_manager = new NvMessageConverter(); NvMessageBroker *nv_messgae_broker_manager = new NvMessageBroker(); TeeManager *tee_manager = new TeeManager(); static double fps_buffer_probe;