diff --git a/CMakeLists.txt b/CMakeLists.txt index c80e634..2427c8a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -81,7 +81,8 @@ include_directories(${PROJECT_SOURCE_DIR}/nv_tracker_manager.hpp) include_directories(${PROJECT_SOURCE_DIR}/face_candid_trace.hpp) include_directories(${PROJECT_SOURCE_DIR}/face_nv_infer_server_manager.hpp) include_directories(${PROJECT_SOURCE_DIR}/face_nv_infer_server_manager.hpp) -include_directories(${PROJECT_SOURCE_DIR}/nv_messgae_converter.hpp) +include_directories(${PROJECT_SOURCE_DIR}/nv_message_converter.hpp) +include_directories(${PROJECT_SOURCE_DIR}/nv_message_broker.hpp) set(SOURCES src/main.cpp src/camera_manager.cpp src/pipeline_manager.cpp src/streammux_manager.cpp @@ -93,7 +94,7 @@ set(SOURCES src/main.cpp src/camera_manager.cpp src/pipeline_manager.cpp src/st src/config_manager.cpp src/nv_infer_server_manager.cpp src/nv_tracker_manager.cpp src/face_candid_trace.cpp src/face_nv_infer_server_manager.cpp src/face_nv_infer_server_manager.cpp - src/nv_messgae_converter.cpp) + src/nv_message_converter.cpp src/nv_message_broker.cpp) # missing initializer for member 'NvDsInferDims::d' [-Werror=missing-field-initializers] NvDsInferDims dims = {0}; diff --git a/data/configuration.json b/data/configuration.json index 18bf68c..a1e1a9c 100644 --- a/data/configuration.json +++ b/data/configuration.json @@ -54,6 +54,15 @@ "num-extra-surfaces": 1, "num-surfaces-per-frame": 0 }, - "msgconv_config_file": "../data/nvmsgconv_configs/msgconv_config.txt", - "msgconv_frame_interval": 30 + "msgconv": { + "msgconv_config_file": "../data/nvmsgconv_configs/msgconv_config.txt", + "msgconv_frame_interval": 30 + }, + "msgbroker": { + "msgbroker_config_file": "../data/nvmsgboker_configs/msgbroker_config.txt", + "protocol_adaptor_library": "../data/nvmsgboker_configs/libnvds_redis_proto.so", + "redis_broker_host": "ABC", + "redis_broker_port": 1234, + "topic_redis": "redis_stream" + } } \ No newline at end of file diff --git a/data/nvmsgboker_configs/libnvds_redis_proto.so b/data/nvmsgboker_configs/libnvds_redis_proto.so new file mode 100755 index 0000000..2e4c633 Binary files /dev/null and b/data/nvmsgboker_configs/libnvds_redis_proto.so differ diff --git a/data/nvmsgboker_configs/msgbroker_config.txt b/data/nvmsgboker_configs/msgbroker_config.txt new file mode 100644 index 0000000..45bede2 --- /dev/null +++ b/data/nvmsgboker_configs/msgbroker_config.txt @@ -0,0 +1,9 @@ +[message-broker] +# Redis-specific options +hostname=192.168.130.13 +port=6379 +streamsize=10000 +payloadkey=metadata +consumergroup=mygroup +consumername=myname +share-connection=1 \ No newline at end of file diff --git a/src/nv_message_broker.cpp b/src/nv_message_broker.cpp new file mode 100644 index 0000000..bfba071 --- /dev/null +++ b/src/nv_message_broker.cpp @@ -0,0 +1,32 @@ +#include "nv_message_broker.hpp" + +NvMessageBroker::NvMessageBroker() { + const auto& config = ConfigManager::get_instance().get_config(); + + msgbroker_config_file = config["msgbroker"]["msgbroker_config_file"]; + protocol_adaptor_library = config["msgbroker"]["protocol_adaptor_library"]; + topic_redis = config["msgbroker"]["topic_redis"]; + redis_host = config["msgbroker"]["redis_broker_host"]; + redis_port = config["msgbroker"]["redis_broker_port"]; + conn_str = redis_host + ";" + std::to_string(redis_port); +} + +bool NvMessageBroker::create_message_broker() { + msgbroker = gst_element_factory_make("nvmsgbroker", "nvmsg-broker"); + g_object_set(G_OBJECT(msgbroker), "config", msgbroker_config_file, NULL); + + // nvmsgbroker looks first at the --conn-str (or conn-str property). + // If it’s not provided, and you gave a --cfg-file (or config property), it + // will read hostname and port from the config file. If you set both, the + // conn-str overrides the file values. g_object_set (G_OBJECT (msgbroker), + // "conn-str", conn_str, NULL); + g_object_set(G_OBJECT(msgbroker), "proto-lib", protocol_adaptor_library, + "sync", FALSE, NULL); + g_object_set(G_OBJECT(msgbroker), "topic", topic_redis, NULL); + + if (!msgbroker) { + g_printerr("Unable to create msgbroker.Exiting."); + return false; + } + return true; +} \ No newline at end of file diff --git a/src/nv_message_broker.hpp b/src/nv_message_broker.hpp new file mode 100644 index 0000000..9041949 --- /dev/null +++ b/src/nv_message_broker.hpp @@ -0,0 +1,19 @@ +#include + +#include +#include + +#include "config_manager.hpp" + +class NvMessageBroker { + private: + public: + gint frame_interval; + GstElement *msgbroker = NULL; + std::string msgbroker_config_file, protocol_adaptor_library, topic_redis, + redis_host, conn_str; + int redis_port; + NvMessageBroker(); + bool create_message_broker(); + ~NvMessageBroker(); +}; \ No newline at end of file diff --git a/src/nv_messgae_converter.cpp b/src/nv_message_converter.cpp similarity index 87% rename from src/nv_messgae_converter.cpp rename to src/nv_message_converter.cpp index 6b7a22d..55fedd6 100644 --- a/src/nv_messgae_converter.cpp +++ b/src/nv_message_converter.cpp @@ -1,14 +1,10 @@ -#include "nv_messgae_converter.hpp" - -#define SET_GPU_ID(object, gpu_id) \ - g_object_set(G_OBJECT(object), "gpu-id", gpu_id, NULL); -#define GPU_ID 0 +#include "nv_message_converter.hpp" NvMessageConverter::NvMessageConverter() { const auto& config = ConfigManager::get_instance().get_config(); - msgconv_config_file = config["msgconv_config_file"]; - frame_interval = config["msgconv_frame_interval"]; + msgconv_config_file = config["msgconv"]["msgconv_config_file"]; + frame_interval = config["msgconv"]["msgconv_frame_interval"]; } bool NvMessageConverter::create_message_converter() { @@ -21,7 +17,6 @@ bool NvMessageConverter::create_message_converter() { // the "payload-type: 1" and "msg2p-newapi: 1" // msg2p-newapi: TRUE for DeepStream 6.x+ (recommended). g_object_set(G_OBJECT(msgconv), "frame-interval", frame_interval, NULL); - SET_GPU_ID(msgconv, GPU_ID); // g_object_set(G_OBJECT(msgconv), // "config", "dstest5_msgconv.cfg", // message schema config diff --git a/src/nv_messgae_converter.hpp b/src/nv_message_converter.hpp similarity index 100% rename from src/nv_messgae_converter.hpp rename to src/nv_message_converter.hpp diff --git a/src/pipeline_manager.cpp b/src/pipeline_manager.cpp index ec32129..bcef512 100644 --- a/src/pipeline_manager.cpp +++ b/src/pipeline_manager.cpp @@ -435,6 +435,7 @@ bool PipelineManager::create_pipeline_elements(int num_sources, nv_osd_manager->create_nv_osd(); nv_messgae_converter_manager->create_message_converter(); + nv_messgae_broker_manager->create_message_broker(); /* Add queue elements between every two elements */ const char* base = "queue"; diff --git a/src/pipeline_manager.hpp b/src/pipeline_manager.hpp index b089d38..070514d 100644 --- a/src/pipeline_manager.hpp +++ b/src/pipeline_manager.hpp @@ -11,7 +11,8 @@ #include "message_handling.hpp" #include "nv_ds_logger_manager.hpp" #include "nv_infer_server_manager.hpp" -#include "nv_messgae_converter.hpp" +#include "nv_message_broker.hpp" +#include "nv_message_converter.hpp" #include "nv_osd_manager.hpp" #include "nv_tracker_manager.hpp" #include "nv_video_convert_manager.hpp" @@ -43,6 +44,7 @@ class PipelineManager { FaceNvInferServerManager *face_nv_infer_server_manager = new FaceNvInferServerManager(); NvMessageConverter *nv_messgae_converter_manager = new NvMessageConverter(); + NvMessageBroker *nv_messgae_broker_manager = new NvMessageBroker(); static double fps_buffer_probe; static double fps_probe; static double fps_osd;