diff --git a/CMakeLists.txt b/CMakeLists.txt index 43351cd..465a6ba 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -162,4 +162,5 @@ target_link_libraries(${PROJECT_NAME} nvdsgst_infer nvds_meta nvds_inferutils prometheus-cpp-core prometheus-cpp-pull # prometheus-cpp-exposer nvdsgst_metnvdsa microhttpd nvdsgst_nvmultiurisrcbin - nvds_batch_jpegenc) \ No newline at end of file + nvds_batch_jpegenc + nvds_msgbroker nvds_msgconv nvds_meta) \ No newline at end of file diff --git a/src/nv_message_broker.cpp b/src/nv_message_broker.cpp index 31d2e5f..1e26efb 100644 --- a/src/nv_message_broker.cpp +++ b/src/nv_message_broker.cpp @@ -1,7 +1,7 @@ #include "nv_message_broker.hpp" NvMessageBroker::NvMessageBroker() { - const auto& config = ConfigManager::get_instance().get_config(); + const auto &config = ConfigManager::get_instance().get_config(); msgbroker_config_file = config["msgbroker"]["msgbroker_config_file"]; protocol_adaptor_library = config["msgbroker"]["protocol_adaptor_library"]; @@ -33,4 +33,97 @@ bool NvMessageBroker::create_message_broker() { return false; } return true; +} + +void NvMessageBroker::attach_probe_to_sink_msgbroker() { + GstPad *sink_pad = gst_element_get_static_pad(msgbroker, "sink"); + if (!sink_pad) { + std::cerr << "Unable to get sink_pad sink pad\n"; + return; + } + + gst_pad_add_probe(sink_pad, GST_PAD_PROBE_TYPE_BUFFER, + broker_sink_pad_probe, NULL, NULL); + gst_object_unref(sink_pad); +} + +GstPadProbeReturn NvMessageBroker::broker_sink_pad_probe(GstPad *pad, + GstPadProbeInfo *info, + gpointer user_data) { + (void)pad; + (void)user_data; + GstBuffer *buf = GST_PAD_PROBE_INFO_BUFFER(info); + if (!buf) return GST_PAD_PROBE_OK; + + // Iterate metadata in the buffer + NvDsMetaList *l_frame = NULL; + NvDsMetaList *l_user = NULL; + NvDsBatchMeta *batch_meta = gst_buffer_get_nvds_batch_meta(buf); + if (!batch_meta) return GST_PAD_PROBE_OK; + + for (l_frame = batch_meta->frame_meta_list; l_frame != NULL; + l_frame = l_frame->next) { + NvDsFrameMeta *frame_meta = (NvDsFrameMeta *)(l_frame->data); + + for (l_user = frame_meta->frame_user_meta_list; l_user != NULL; + l_user = l_user->next) { + NvDsUserMeta *user_meta = (NvDsUserMeta *)(l_user->data); + if (user_meta && + user_meta->base_meta.meta_type == NVDS_EVENT_MSG_META) { + NvDsEventMsgMeta *msg_meta = + (NvDsEventMsgMeta *)user_meta->user_meta_data; + + if (msg_meta && msg_meta->extMsg != NULL) { + // You can inspect or pretty-print the JSON payload + g_print("Broker Probe 1: JSON payload: %s\n", + (char *)msg_meta->extMsg); + } + } + } + } + + // Also inspect per-frame metas (some code attaches to frame_user_meta_list) + for (NvDsMetaList *lf = batch_meta->frame_meta_list; lf; lf = lf->next) { + NvDsFrameMeta *fmeta = (NvDsFrameMeta *)lf->data; + if (!fmeta) continue; + + for (NvDsMetaList *l = fmeta->frame_user_meta_list; l; l = l->next) { + NvDsUserMeta *um = (NvDsUserMeta *)l->data; + if (!um) continue; + + // g_print("[nvmsgconv probe] frame %d user meta type=%s ptr=%p\n", + // fmeta->frame_num, + // metaTypeToString(um->base_meta.meta_type), + // (void*)um->user_meta_data); + + if (um->base_meta.meta_type == NVDS_EVENT_MSG_META) { + NvDsEventMsgMeta *m = (NvDsEventMsgMeta *)um->user_meta_data; + if (!m) continue; + if (m && m->extMsg != NULL) { + // You can inspect or pretty-print the JSON payload + g_print("Broker Probe 2: JSON payload: %s\n", + (char *)m->extMsg); + } + // g_print("frame-level event msg objClassId=%d objectId=%s + // componentId=%d trackingId=%ld confidence=%f ptr=%p frameId=%" + // G_GINT64_FORMAT + // "\n", + // m->objClassId, m->objectId, m->componentId, + // m->trackingId, m->confidence, (void *)m, + // (gint64)m->frameId); + + // g_print("ts_ptr=%p\n", (void *)m->ts); + // g_print("ts_ptr=%p\n", (void *)m->ts); + // if (m->ts && safe_string_print(m->ts, 256)) { + // g_print("ts: %s\n", m->ts); + // } else if (m->ts) { + // g_print("ts suspicious - not printing\n"); + // } else { + // g_print("ts=NULL\n"); + // } + } + } + } + + return GST_PAD_PROBE_OK; } \ No newline at end of file diff --git a/src/nv_message_broker.hpp b/src/nv_message_broker.hpp index 9041949..bc375af 100644 --- a/src/nv_message_broker.hpp +++ b/src/nv_message_broker.hpp @@ -4,6 +4,8 @@ #include #include "config_manager.hpp" +#include "gstnvdsmeta.h" +#include "nvdsmeta_schema.h" class NvMessageBroker { private: @@ -16,4 +18,7 @@ class NvMessageBroker { NvMessageBroker(); bool create_message_broker(); ~NvMessageBroker(); + void attach_probe_to_sink_msgbroker(); + static GstPadProbeReturn broker_sink_pad_probe(GstPad *, GstPadProbeInfo *, + gpointer); }; \ No newline at end of file diff --git a/src/nv_message_converter.cpp b/src/nv_message_converter.cpp index 96255fb..708eeb5 100644 --- a/src/nv_message_converter.cpp +++ b/src/nv_message_converter.cpp @@ -127,14 +127,13 @@ void NvMessageConverter::attach_probe_to_sink_msgconv() { return; } - gst_pad_add_probe(sink_pad, GST_PAD_PROBE_TYPE_BUFFER, nvmsgconv_probe_cb, - NULL, NULL); + gst_pad_add_probe(sink_pad, GST_PAD_PROBE_TYPE_BUFFER, + nvmsgconv_probe_cb_sink, NULL, NULL); gst_object_unref(sink_pad); } -GstPadProbeReturn NvMessageConverter::nvmsgconv_probe_cb(GstPad *pad, - GstPadProbeInfo *info, - gpointer user_data) { +GstPadProbeReturn NvMessageConverter::nvmsgconv_probe_cb_sink( + GstPad *pad, GstPadProbeInfo *info, gpointer user_data) { (void)pad; (void)user_data; GstBuffer *buf = GST_PAD_PROBE_INFO_BUFFER(info); @@ -236,9 +235,13 @@ GstPadProbeReturn NvMessageConverter::nvmsgconv_probe_cb(GstPad *pad, if (um->base_meta.meta_type == NVDS_EVENT_MSG_META) { NvDsEventMsgMeta *m = (NvDsEventMsgMeta *)um->user_meta_data; if (!m) continue; - g_print("frame-level event msg ptr=%p frameId=%" G_GINT64_FORMAT - "\n", - (void *)m, (gint64)m->frameId); + g_print( + "frame-level event msg objClassId=%d objectId=%s " + "componentId=%d trackingId=%ld confidence=%f ptr=%p " + "frameId=%" G_GINT64_FORMAT "\n", + m->objClassId, m->objectId, m->componentId, m->trackingId, + m->confidence, (void *)m, (gint64)m->frameId); + g_print("ts_ptr=%p\n", (void *)m->ts); if (m->ts && safe_string_print(m->ts, 256)) { g_print("ts: %s\n", m->ts); @@ -252,3 +255,36 @@ GstPadProbeReturn NvMessageConverter::nvmsgconv_probe_cb(GstPad *pad, } return GST_PAD_PROBE_OK; } + +void NvMessageConverter::attach_probe_to_src_msgconv() { + GstPad *src_pad = gst_element_get_static_pad(msgconv, "src"); + if (!src_pad) { + std::cerr << "Unable to get src_pad sink pad\n"; + return; + } + + gst_pad_add_probe(src_pad, GST_PAD_PROBE_TYPE_BUFFER, + nvmsgconv_probe_cb_src, NULL, NULL); + gst_object_unref(src_pad); +} + +// Probe callback to inspect JSON messages coming out of nvmsgconv +GstPadProbeReturn NvMessageConverter::nvmsgconv_probe_cb_src( + GstPad *pad, GstPadProbeInfo *info, gpointer user_data) { + (void)pad; + (void)user_data; + if (!(info->type & GST_PAD_PROBE_TYPE_BUFFER)) return GST_PAD_PROBE_OK; + + GstBuffer *buf = GST_PAD_PROBE_INFO_BUFFER(info); + if (!buf) return GST_PAD_PROBE_OK; + + // Map buffer to system memory + GstMapInfo map; + if (gst_buffer_map(buf, &map, GST_MAP_READ)) { + // nvmsgconv outputs application/json + std::string json_str(reinterpret_cast(map.data), map.size); + g_print("nvmsgconv JSON:\n%s\n", json_str.c_str()); + gst_buffer_unmap(buf, &map); + } + return GST_PAD_PROBE_OK; +} diff --git a/src/nv_message_converter.hpp b/src/nv_message_converter.hpp index a70b6c4..3e5367a 100644 --- a/src/nv_message_converter.hpp +++ b/src/nv_message_converter.hpp @@ -18,6 +18,10 @@ class NvMessageConverter { bool create_message_converter(); ~NvMessageConverter(); void attach_probe_to_sink_msgconv(); - static GstPadProbeReturn nvmsgconv_probe_cb(GstPad *, GstPadProbeInfo *, - gpointer); + static GstPadProbeReturn nvmsgconv_probe_cb_sink(GstPad *, + GstPadProbeInfo *, + gpointer); + void attach_probe_to_src_msgconv(); + static GstPadProbeReturn nvmsgconv_probe_cb_src(GstPad *, GstPadProbeInfo *, + gpointer); }; \ No newline at end of file diff --git a/src/pipeline_manager.cpp b/src/pipeline_manager.cpp index 35e1d1a..e0823ea 100644 --- a/src/pipeline_manager.cpp +++ b/src/pipeline_manager.cpp @@ -282,7 +282,7 @@ bool PipelineManager::setup_pipeline() { nv_video_convert_manager->nvvidconv, nv_osd_manager->nvosd, tee_manager->tee, tee_manager->queue1, tee_manager->queue2, nv_message_converter_manager->msgconv, - nv_messgae_broker_manager->msgbroker, sink_manager->sink, NULL); + nv_message_broker_manager->msgbroker, sink_manager->sink, NULL); /* we link the elements together * nvstreammux -> nvinfer -> nvtiler -> nvvidconv -> nvosd -> @@ -313,7 +313,7 @@ bool PipelineManager::setup_pipeline() { } if (!gst_element_link_many( tee_manager->queue1, nv_message_converter_manager->msgconv, - nv_messgae_broker_manager->msgbroker, NULL)) { + nv_message_broker_manager->msgbroker, NULL)) { g_printerr( "Could not link tee with message converter! Exiting.\n"); return false; @@ -334,7 +334,7 @@ bool PipelineManager::setup_pipeline() { nv_video_convert_manager->nvvidconv, nv_osd_manager->nvosd, tee_manager->tee, tee_manager->queue1, tee_manager->queue2, nv_message_converter_manager->msgconv, - nv_messgae_broker_manager->msgbroker, + nv_message_broker_manager->msgbroker, sink_manager->nvvidconv_postosd, sink_manager->caps, sink_manager->encoder, sink_manager->rtppay, sink_manager->sink, NULL); @@ -371,7 +371,7 @@ bool PipelineManager::setup_pipeline() { } if (!gst_element_link_many( tee_manager->queue1, nv_message_converter_manager->msgconv, - nv_messgae_broker_manager->msgbroker, NULL)) { + nv_message_broker_manager->msgbroker, NULL)) { g_printerr( "Could not link tee with message converter! Exiting.\n"); return false; @@ -396,7 +396,7 @@ bool PipelineManager::setup_pipeline() { nv_video_convert_manager->nvvidconv, nv_osd_manager->nvosd, tee_manager->tee, tee_manager->queue1, tee_manager->queue2, nv_message_converter_manager->msgconv, - nv_messgae_broker_manager->msgbroker, sink_manager->sink, NULL); + nv_message_broker_manager->msgbroker, sink_manager->sink, NULL); /* we link the elements together * nvstreammux -> nvinfer -> nvtiler -> nvvidconv -> nvosd -> @@ -428,7 +428,7 @@ bool PipelineManager::setup_pipeline() { if (!gst_element_link_many( tee_manager->queue1, nv_message_converter_manager->msgconv, - nv_messgae_broker_manager->msgbroker, NULL)) { + nv_message_broker_manager->msgbroker, NULL)) { g_printerr( "Could not link tee with message converter! Exiting.\n"); return false; @@ -449,7 +449,7 @@ bool PipelineManager::setup_pipeline() { nv_video_convert_manager->nvvidconv, nv_osd_manager->nvosd, tee_manager->tee, tee_manager->queue1, tee_manager->queue2, nv_message_converter_manager->msgconv, - nv_messgae_broker_manager->msgbroker, + nv_message_broker_manager->msgbroker, sink_manager->nvvidconv_postosd, sink_manager->caps, sink_manager->encoder, sink_manager->rtppay, sink_manager->sink, NULL); @@ -486,7 +486,7 @@ bool PipelineManager::setup_pipeline() { } if (!gst_element_link_many( tee_manager->queue1, nv_message_converter_manager->msgconv, - nv_messgae_broker_manager->msgbroker, NULL)) { + nv_message_broker_manager->msgbroker, NULL)) { g_printerr( "Could not link tee with message converter! Exiting.\n"); return false; @@ -582,7 +582,7 @@ bool PipelineManager::create_pipeline_elements(int num_sources, nv_osd_manager->create_nv_osd(); nv_message_converter_manager->create_message_converter(); - nv_messgae_broker_manager->create_message_broker(); + nv_message_broker_manager->create_message_broker(); tee_manager->create_tee(); /* Add queue elements between every two elements */ @@ -627,6 +627,8 @@ bool PipelineManager::create_pipeline_elements(int num_sources, obj_ctx_handle); // nvinfer Or use "nvtracker" if after nv_osd_manager->attach_probe_to_src_nvosd(obj_ctx_handle); nv_message_converter_manager->attach_probe_to_sink_msgconv(); + nv_message_converter_manager->attach_probe_to_src_msgconv(); + nv_message_broker_manager->attach_probe_to_sink_msgbroker(); message_handling->create_message_handler(pipeline, g_run_forever, loop); setup_pipeline(); diff --git a/src/pipeline_manager.hpp b/src/pipeline_manager.hpp index edcf837..aac8106 100644 --- a/src/pipeline_manager.hpp +++ b/src/pipeline_manager.hpp @@ -45,7 +45,7 @@ class PipelineManager { FaceNvInferServerManager *face_nv_infer_server_manager = new FaceNvInferServerManager(); NvMessageConverter *nv_message_converter_manager = new NvMessageConverter(); - NvMessageBroker *nv_messgae_broker_manager = new NvMessageBroker(); + NvMessageBroker *nv_message_broker_manager = new NvMessageBroker(); TeeManager *tee_manager = new TeeManager(); static double fps_buffer_probe; static double fps_probe;