33
44#include " simple_socket/TCPSocket.hpp"
55#include " simple_socket/mqtt/mqtt_common.hpp"
6+
7+ #ifdef SIMPLE_SOCKET_WITH_WEBSOCKETS
68#include " simple_socket/ws/WebSocket.hpp"
9+ #endif
710
811#include < iostream>
912#include < mutex>
1013#include < thread>
1114#include < unordered_map>
1215#include < unordered_set>
1316#include < vector>
14- #include < queue >
17+ #include < deque >
1518
1619
1720using namespace simple_socket ;
@@ -24,15 +27,19 @@ struct MQTTBroker::Impl {
2427
2528 void start () {
2629 listener_ = std::thread ([this ] { acceptLoop (); });
30+ #ifdef SIMPLE_SOCKET_WITH_WEBSOCKETS
2731 wsListener_ = std::thread ([this ] { wsAcceptLoop (); });
32+ #endif
2833 }
2934
3035 void stop () {
3136 stop_ = true ;
3237 server_.close ();
33- ws_.stop ();
3438 if (listener_.joinable ()) listener_.join ();
39+ #ifdef SIMPLE_SOCKET_WITH_WEBSOCKETS
40+ ws_.stop ();
3541 if (wsListener_.joinable ()) wsListener_.join ();
42+ #endif
3643 }
3744
3845private:
@@ -42,8 +49,12 @@ struct MQTTBroker::Impl {
4249 std::unordered_set<std::string> topics;
4350 };
4451
45- TCPServer server_;
52+ # ifdef SIMPLE_SOCKET_WITH_WEBSOCKETS
4653 WebSocket ws_;
54+ #endif
55+
56+ TCPServer server_;
57+
4758 std::atomic<bool > stop_;
4859 std::thread listener_;
4960 std::thread wsListener_;
@@ -52,19 +63,18 @@ struct MQTTBroker::Impl {
5263 std::unordered_map<std::string, std::vector<Client*>> subscribers_;
5364 std::vector<Client*> clients_;
5465
55-
66+ # ifdef SIMPLE_SOCKET_WITH_WEBSOCKETS
5667 void wsAcceptLoop () {
5768
5869 struct WsWrapper : SimpleConnection {
5970
60- WebSocketConnection* connection;
6171
6272 explicit WsWrapper (WebSocketConnection* c): connection(c) {}
6373
6474 int read (uint8_t * buffer, size_t size) override {
6575 std::unique_lock lock (m_);
66- cv_.wait (lock, [&]{ return closed_ || !queue_.empty (); });
67- if (queue_.empty ()) return -1 ; // closed and no data
76+ cv_.wait (lock, [&] { return closed_ || !queue_.empty (); });
77+ if (queue_.empty ()) return -1 ;// closed and no data
6878
6979 std::string msg = std::move (queue_.front ());
7080 queue_.pop_front ();
@@ -110,6 +120,7 @@ struct MQTTBroker::Impl {
110120 std::deque<std::string> queue_;
111121 std::mutex m_;
112122 std::condition_variable cv_;
123+ WebSocketConnection* connection;
113124 };
114125
115126 std::unordered_map<WebSocketConnection*, WsWrapper*> connections;
@@ -131,7 +142,6 @@ struct MQTTBroker::Impl {
131142 ws_.onClose = [&connections](WebSocketConnection* conn) {
132143 std::cout << " MQTTBroker: WebSocket connection closed" << std::endl;
133144 connections[conn]->close ();
134-
135145 };
136146 ws_.start ();
137147
@@ -149,7 +159,7 @@ struct MQTTBroker::Impl {
149159 }
150160 ws_.stop ();
151161 }
152-
162+ # endif
153163
154164 void acceptLoop () {
155165
@@ -197,7 +207,7 @@ struct MQTTBroker::Impl {
197207
198208 // CONNACK
199209 const std::vector<uint8_t > connack = {CONNACK, 0x02 , 0x00 , 0x00 };
200- c->conn ->write (connack);
210+ if (! c->conn ->write (connack)) return ;
201211
202212 // Main loop
203213 bool running = true ;
@@ -208,8 +218,8 @@ struct MQTTBroker::Impl {
208218 std::vector<uint8_t > buf (rem);
209219 if (rem > 0 && !c->conn ->readExact (buf.data (), rem)) break ;
210220
211- const uint8_t typeNibble = static_cast <uint8_t >(hdr & 0xF0 );
212- const uint8_t flagsNibble = static_cast <uint8_t >(hdr & 0x0F );
221+ const auto typeNibble = static_cast <uint8_t >(hdr & 0xF0 );
222+ const auto flagsNibble = static_cast <uint8_t >(hdr & 0x0F );
213223
214224 switch (typeNibble) {
215225 case (PUBLISH & 0xF0 ): {
@@ -256,11 +266,20 @@ struct MQTTBroker::Impl {
256266 packet.insert (packet.end (), pl.begin (), pl.end ());
257267
258268 std::lock_guard lock (subsMutex_);
259- for (auto [t, subs] : subscribers_) {
260- if (t == topic) {
269+ for (auto it = subscribers_.begin (); it != subscribers_.end (); ) {
270+ if (it->first == topic) {
271+ auto & subs = it->second ;
272+ bool erased = false ;
261273 for (auto * sub : subs) {
262- sub->conn ->write (packet);
274+ if (!sub->conn ->write (packet)) {
275+ it = subscribers_.erase (it);
276+ erased = true ;
277+ break ; // exit inner loop
278+ }
263279 }
280+ if (!erased) ++it;
281+ } else {
282+ ++it;
264283 }
265284 }
266285 } break ;
@@ -291,7 +310,9 @@ struct MQTTBroker::Impl {
291310 SUBACK, 0x03 ,
292311 static_cast <uint8_t >(pid >> 8 ), static_cast <uint8_t >(pid & 0xFF ),
293312 0x00 };
294- c->conn ->write (suback);
313+ if (!c->conn ->write (suback)) {
314+ running = false ;
315+ }
295316 } break ;
296317
297318 case (UNSUBSCRIBE & 0xF0 ): {
@@ -317,13 +338,17 @@ struct MQTTBroker::Impl {
317338 const std::vector<uint8_t > unsuback = {
318339 UNSUBACK, 0x02 ,
319340 static_cast <uint8_t >(pid >> 8 ), static_cast <uint8_t >(pid & 0xFF )};
320- c->conn ->write (unsuback);
341+ if (!c->conn ->write (unsuback)) {
342+ running = false ;
343+ }
321344 } break ;
322345
323346 case PINGREQ: {
324347 if (flagsNibble != 0x00 ) break ;
325348 const std::vector<uint8_t > pingresp = {PINGRESP, 0x00 };
326- c->conn ->write (pingresp);
349+ if (!c->conn ->write (pingresp)) {
350+ running = false ;
351+ }
327352 } break ;
328353
329354 case DISCONNECT:
0 commit comments