Skip to content

Commit 0f007f2

Browse files
committed
add mutithread sub
1 parent 4437ddd commit 0f007f2

2 files changed

Lines changed: 44 additions & 34 deletions

File tree

infinite_sense_core/include/messenger.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@ class Messenger {
2222
[[nodiscard]] std::string GetPubEndpoint() const;
2323
void Sub(const std::string& topic, const std::function<void(const std::string&)>& callback);
2424
void SubStruct(const std::string& topic, const std::function<void(const void*, size_t)>& callback);
25+
2526
private:
2627
Messenger();
2728
~Messenger();
2829
void CleanUp();
2930
zmq::context_t context_{};
30-
zmq::socket_t publisher_{},subscriber_{};
31+
zmq::socket_t publisher_{}, subscriber_{};
3132
std::string endpoint_{};
33+
std::vector<std::thread> sub_threads_;
3234
};
3335
} // namespace infinite_sense

infinite_sense_core/src/messenger.cpp

Lines changed: 41 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -53,46 +53,54 @@ void Messenger::PubStruct(const std::string& topic, const void* data, size_t siz
5353
std::string Messenger::GetPubEndpoint() const { return endpoint_; }
5454

5555
void Messenger::Sub(const std::string& topic, const std::function<void(const std::string&)>& callback) {
56-
try {
57-
subscriber_.set(zmq::sockopt::subscribe, topic);
58-
59-
while (true) {
60-
zmq::message_t topic_msg, data_msg;
61-
if (!subscriber_.recv(topic_msg) || !subscriber_.recv(data_msg)) {
62-
LOG(WARNING) << "Subscription receive failed.";
63-
continue;
56+
sub_threads_.emplace_back([=, this]() {
57+
try {
58+
zmq::socket_t subscriber(context_, zmq::socket_type::sub);
59+
subscriber.connect(endpoint_);
60+
subscriber.set(zmq::sockopt::subscribe, topic);
61+
62+
while (true) {
63+
zmq::message_t topic_msg, data_msg;
64+
if (!subscriber.recv(topic_msg) || !subscriber.recv(data_msg)) {
65+
LOG(WARNING) << "Subscription receive failed for topic: " << topic;
66+
continue;
67+
}
68+
69+
std::string received_topic(static_cast<char*>(topic_msg.data()), topic_msg.size());
70+
if (received_topic != topic) continue;
71+
72+
std::string data(static_cast<char*>(data_msg.data()), data_msg.size());
73+
callback(data);
6474
}
65-
66-
std::string received_topic(static_cast<char*>(topic_msg.data()), topic_msg.size());
67-
if (received_topic != topic) continue;
68-
69-
std::string data(static_cast<char*>(data_msg.data()), data_msg.size());
70-
callback(data);
75+
} catch (const zmq::error_t& e) {
76+
LOG(ERROR) << "Exception in Sub thread for topic [" << topic << "]: " << e.what();
7177
}
72-
} catch (const zmq::error_t& e) {
73-
LOG(ERROR) << "Exception in Sub: " << e.what();
74-
}
78+
});
7579
}
7680

7781
void Messenger::SubStruct(const std::string& topic, const std::function<void(const void*, size_t)>& callback) {
78-
try {
79-
subscriber_.set(zmq::sockopt::subscribe, topic);
80-
81-
while (true) {
82-
zmq::message_t topic_msg, data_msg;
83-
if (!subscriber_.recv(topic_msg) || !subscriber_.recv(data_msg)) {
84-
LOG(WARNING) << "Subscription struct receive failed.";
85-
continue;
82+
sub_threads_.emplace_back([=, this]() {
83+
try {
84+
zmq::socket_t subscriber(context_, zmq::socket_type::sub);
85+
subscriber.connect(endpoint_);
86+
subscriber.set(zmq::sockopt::subscribe, topic);
87+
88+
while (true) {
89+
zmq::message_t topic_msg, data_msg;
90+
if (!subscriber.recv(topic_msg) || !subscriber.recv(data_msg)) {
91+
LOG(WARNING) << "Subscription to topic [" << topic << "] failed.";
92+
continue;
93+
}
94+
95+
std::string received_topic(static_cast<char*>(topic_msg.data()), topic_msg.size());
96+
if (received_topic != topic) continue;
97+
98+
callback(data_msg.data(), data_msg.size());
8699
}
87-
88-
std::string received_topic(static_cast<char*>(topic_msg.data()), topic_msg.size());
89-
if (received_topic != topic) continue;
90-
91-
callback(data_msg.data(), data_msg.size());
100+
} catch (const zmq::error_t& e) {
101+
LOG(ERROR) << "Exception in SubStruct for topic [" << topic << "]: " << e.what();
92102
}
93-
} catch (const zmq::error_t& e) {
94-
LOG(ERROR) << "Exception in SubStruct: " << e.what();
95-
}
103+
});
96104
}
97105

98106
} // namespace infinite_sense

0 commit comments

Comments
 (0)