Skip to content

Commit cd4fedd

Browse files
popoaichuiniuRovicnowYoyi
authored andcommitted
[BugFix][android][ios][common] use WorkThreadExecutor instead of raw thread
Using WorkThreadExecutor can conveniently stop all threads of usb_client when it ends, avoiding unexpected resource access. issue: lynx-family#46
1 parent d5d2687 commit cd4fedd

File tree

3 files changed

+62
-86
lines changed

3 files changed

+62
-86
lines changed

debug_router/native/base/socket_guard.h

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,20 @@ constexpr SocketType kInvalidSocket = INVALID_SOCKET;
2020
typedef int SocketType;
2121
constexpr SocketType kInvalidSocket = -1;
2222
#endif
23+
#include <mutex>
2324

2425
namespace debugrouter {
2526
namespace base {
2627

2728
class SocketGuard {
2829
public:
29-
SocketType Get() const { return sock_; }
30+
SocketType Get() {
31+
std::lock_guard<std::mutex> lock(mutex_);
32+
return sock_;
33+
}
3034

3135
void Reset() {
36+
std::lock_guard<std::mutex> lock(mutex_);
3237
if (sock_ != kInvalidSocket) {
3338
CLOSESOCKET(sock_);
3439
}
@@ -37,16 +42,14 @@ class SocketGuard {
3742

3843
explicit SocketGuard(SocketType sock) : sock_(sock) {}
3944

40-
~SocketGuard() {
41-
if (sock_ != kInvalidSocket) {
42-
CLOSESOCKET(sock_);
43-
}
44-
}
45+
~SocketGuard() { Reset(); }
46+
4547
SocketGuard(const SocketGuard&) = delete;
4648
SocketGuard& operator=(const SocketGuard&) = delete;
4749

4850
private:
4951
SocketType sock_;
52+
std::mutex mutex_;
5053
};
5154

5255
} // namespace base

debug_router/native/socket/usb_client.cc

Lines changed: 43 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ void UsbClient::CloseClientSocket(SocketType socket_fd_) {
4545
socket_fd_ = kInvalidSocket;
4646
}
4747

48-
UsbClient::UsbClient(SocketType socket_fd) : socket_fd_(socket_fd) {
48+
UsbClient::UsbClient(SocketType socket_fd) : socket_guard_(socket_fd) {
4949
LOGI("UsbClient: Constructor.");
5050
}
5151

@@ -55,7 +55,12 @@ void UsbClient::SetConnectStatus(USBConnectStatus status) {
5555
});
5656
}
5757

58-
void UsbClient::Init() { work_thread_.init(); }
58+
void UsbClient::Init() {
59+
work_thread_.init();
60+
read_thread_.init();
61+
write_thread_.init();
62+
dispatch_thread_.init();
63+
}
5964

6065
void UsbClient::StartUp(const std::shared_ptr<UsbClientListener> &listener) {
6166
LOGI("UsbClient: StartUp.");
@@ -69,9 +74,8 @@ void UsbClient::StartInternal(
6974
LOGI("UsbClient: StartInternal.");
7075
connect_status_ = USBConnectStatus::CONNECTING;
7176
listener_ = listener;
72-
latch_ = std::make_unique<CountDownLatch>(kThreadCount);
73-
StartReader(socket_fd_);
74-
StartWriter(socket_fd_);
77+
StartReader();
78+
StartWriter();
7579
}
7680

7781
bool UsbClient::ReadAndCheckMessageHeader(char *header, SocketType socket_fd_) {
@@ -132,14 +136,14 @@ bool UsbClient::Read(SocketType socket_fd_, char *buffer, uint32_t read_size) {
132136
return true;
133137
}
134138

135-
void UsbClient::ReadMessage(SocketType socket_fd_) {
136-
LOGI("UsbClient: ReadMessage:" << socket_fd_);
139+
void UsbClient::ReadMessage() {
140+
LOGI("UsbClient: ReadMessage:" << socket_guard_.Get());
137141
bool isFirst = true;
138142
while (true) {
139143
char header[kFrameHeaderLen];
140144
memset(header, 0, kFrameHeaderLen);
141145
LOGI("UsbClient: start check message header.");
142-
if (!ReadAndCheckMessageHeader(header, socket_fd_)) {
146+
if (!ReadAndCheckMessageHeader(header, socket_guard_.Get())) {
143147
LOGW("UsbClient: don't match DebugRouter protocol:");
144148
// need DebugRouterReport to report invailed client.
145149
for (int i = 0; i < kFrameHeaderLen; i++) {
@@ -160,7 +164,7 @@ void UsbClient::ReadMessage(SocketType socket_fd_) {
160164
isFirst = false;
161165
}
162166
char payload_size[kPayloadSizeLen];
163-
if (!Read(socket_fd_, payload_size, kPayloadSizeLen)) {
167+
if (!Read(socket_guard_.Get(), payload_size, kPayloadSizeLen)) {
164168
LOGE("read payload data error: " << GetErrorMessage());
165169
if (listener_) {
166170
listener_->OnError(shared_from_this(), GetErrorMessage(),
@@ -181,7 +185,7 @@ void UsbClient::ReadMessage(SocketType socket_fd_) {
181185
continue;
182186
}
183187
char payload[payload_size_int];
184-
if (!Read(socket_fd_, payload, payload_size_int)) {
188+
if (!Read(socket_guard_.Get(), payload, payload_size_int)) {
185189
LOGE("read payload data error: " << GetErrorMessage());
186190
if (listener_) {
187191
listener_->OnError(shared_from_this(), GetErrorMessage(),
@@ -204,21 +208,13 @@ void UsbClient::ReadMessage(SocketType socket_fd_) {
204208
LOGI("UsbClient: ReadMessage thread exit.");
205209
incoming_message_queue_.put(std::move(kMessageQuit));
206210
outgoing_message_queue_.put(std::move(kMessageQuit));
207-
if (latch_) {
208-
latch_->CountDown();
209-
}
210211
}
211212

212-
void UsbClient::ReadThreadFunc(std::shared_ptr<UsbClient> client,
213-
SocketType socket_fd_) {
214-
client->ReadMessage(socket_fd_);
215-
}
216-
217-
void UsbClient::StartReader(SocketType socket_fd_) {
213+
void UsbClient::StartReader() {
218214
LOGI("UsbClient: start reader thread.");
219-
StartMessageDispatcher(socket_fd_);
220-
std::thread read_thread(ReadThreadFunc, shared_from_this(), socket_fd_);
221-
read_thread.detach();
215+
StartMessageDispatcher();
216+
read_thread_.submit(
217+
[client_ptr = shared_from_this()]() { client_ptr->ReadMessage(); });
222218
}
223219

224220
void UsbClient::MessageDispatcher() {
@@ -240,20 +236,13 @@ void UsbClient::MessageDispatcher() {
240236
}
241237
}
242238
LOGI("UsbClient: message dispatcher finished.");
243-
if (latch_) {
244-
latch_->CountDown();
245-
}
246239
}
247240

248-
void UsbClient::MessageDispatcherFunc(std::shared_ptr<UsbClient> client) {
249-
client->MessageDispatcher();
250-
}
251-
252-
void UsbClient::StartMessageDispatcher(SocketType socket_fd_) {
241+
void UsbClient::StartMessageDispatcher() {
253242
LOGI("UsbClient: startMessageDispatcher.");
254-
std::thread dispatch_message_thread(MessageDispatcherFunc,
255-
shared_from_this());
256-
dispatch_message_thread.detach();
243+
244+
dispatch_thread_.submit(
245+
[client_ptr = shared_from_this()]() { client_ptr->MessageDispatcher(); });
257246
}
258247

259248
void UsbClient::WrapHeader(const std::string &message, std::string &result) {
@@ -288,8 +277,8 @@ void UsbClient::WrapHeader(const std::string &message, std::string &result) {
288277
memcpy(buffer + 20, message.c_str(), message.size());
289278
}
290279

291-
void UsbClient::WriteMessage(SocketType socket_fd_) {
292-
LOGI("UsbClient: WriteMessage:" << socket_fd_);
280+
void UsbClient::WriteMessage() {
281+
LOGI("UsbClient: WriteMessage:" << socket_guard_.Get());
293282
while (true) {
294283
std::string message;
295284
message = outgoing_message_queue_.take();
@@ -303,8 +292,8 @@ void UsbClient::WriteMessage(SocketType socket_fd_) {
303292
LOGI(message);
304293
std::string result_message;
305294
WrapHeader(message, result_message);
306-
if (send(socket_fd_, result_message.c_str(), result_message.size(), 0) ==
307-
-1) {
295+
if (send(socket_guard_.Get(), result_message.c_str(),
296+
result_message.size(), 0) == -1) {
308297
LOGE("send error: " << GetErrorMessage() << " message:" << message);
309298
if (listener_) {
310299
listener_->OnError(shared_from_this(), GetErrorMessage(),
@@ -322,38 +311,19 @@ void UsbClient::WriteMessage(SocketType socket_fd_) {
322311
"writer thread finished");
323312
}
324313
LOGI("UsbClient: WriteMessage thread exit.");
325-
if (latch_) {
326-
latch_->CountDown();
327-
}
328314
}
329315

330-
void UsbClient::WriteThreadFunc(std::shared_ptr<UsbClient> client,
331-
SocketType socket_fd_) {
332-
client->WriteMessage(socket_fd_);
333-
}
334-
335-
void UsbClient::StartWriter(SocketType socket_fd_) {
316+
void UsbClient::StartWriter() {
336317
LOGI("UsbClient: start writer thread.");
337-
std::thread write_thread(WriteThreadFunc, shared_from_this(), socket_fd_);
338-
write_thread.detach();
318+
write_thread_.submit(
319+
[client_ptr = shared_from_this()]() { client_ptr->WriteMessage(); });
339320
}
340321

341322
void UsbClient::DisconnectInternal() {
342323
LOGI("UsbClient: DisconnectInternal.");
343-
if (latch_) {
344-
incoming_message_queue_.put(std::move(kMessageQuit));
345-
outgoing_message_queue_.put(std::move(kMessageQuit));
346-
347-
LOGI("UsbClient: DisconnectInternal waiting for threads exit.");
348-
latch_->Await();
349-
connect_status_ = USBConnectStatus::DISCONNECTED;
350-
351-
incoming_message_queue_.clear();
352-
outgoing_message_queue_.clear();
353-
latch_ = nullptr;
354-
LOGI("UsbClient: DisconnectInternal successfully.");
355-
}
356-
CloseClientSocket(socket_fd_);
324+
incoming_message_queue_.put(std::move(kMessageQuit));
325+
outgoing_message_queue_.put(std::move(kMessageQuit));
326+
socket_guard_.Reset();
357327
}
358328

359329
bool UsbClient::Send(const std::string &message) {
@@ -369,7 +339,16 @@ bool UsbClient::Send(const std::string &message) {
369339
return true;
370340
}
371341

372-
void UsbClient::Stop() { work_thread_.shutdown(); }
342+
void UsbClient::Stop() {
343+
DisconnectInternal();
344+
dispatch_thread_.shutdown();
345+
write_thread_.shutdown();
346+
read_thread_.shutdown();
347+
work_thread_.shutdown();
348+
incoming_message_queue_.clear();
349+
outgoing_message_queue_.clear();
350+
connect_status_ = USBConnectStatus::DISCONNECTED;
351+
}
373352

374353
void UsbClient::SendInternal(const std::string &message) {
375354
LOGI("UsbClient: SendInternal.");
@@ -383,8 +362,7 @@ void UsbClient::SendInternal(const std::string &message) {
383362

384363
UsbClient::~UsbClient() {
385364
LOGI("UsbClient: ~UsbClient.");
386-
// TODO(popoaichuiniu) optimize thread policy of UsbClient's fields
387-
DisconnectInternal();
365+
Stop();
388366
}
389367

390368
} // namespace socket_server

debug_router/native/socket/usb_client.h

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#ifndef DEBUGROUTER_NATIVE_SOCKET_USB_CLIENT_H_
66
#define DEBUGROUTER_NATIVE_SOCKET_USB_CLIENT_H_
77

8+
#include "debug_router/native/base/socket_guard.h"
89
#include "debug_router/native/socket/blocking_queue.h"
910
#include "debug_router/native/socket/count_down_latch.h"
1011
#include "debug_router/native/socket/socket_server_type.h"
@@ -41,13 +42,12 @@ class UsbClient : public std::enable_shared_from_this<UsbClient> {
4142
void DisconnectInternal();
4243
void SendInternal(const std::string &message);
4344

44-
void StartReader(SocketType socket_fd_);
45-
void StartWriter(SocketType socket_fd_);
46-
void StartMessageDispatcher(SocketType socket_fd_);
47-
void ReadMessage(SocketType socket_fd_);
45+
void StartReader();
46+
void StartWriter();
47+
void StartMessageDispatcher();
48+
void ReadMessage();
4849
void MessageDispatcher();
49-
void WriteMessage(SocketType socket_fd_);
50-
void HandleFirstFrame(SocketType socket_fd);
50+
void WriteMessage();
5151

5252
bool Read(SocketType socket_fd_, char *buffer, uint32_t read_size);
5353
bool ReadAndCheckMessageHeader(char *header, SocketType socket_fd_);
@@ -82,24 +82,19 @@ class UsbClient : public std::enable_shared_from_this<UsbClient> {
8282
*/
8383
static void WrapHeader(const std::string &message, std::string &result);
8484

85-
// work threads
86-
static void MessageDispatcherFunc(std::shared_ptr<UsbClient> client);
87-
static void ReadThreadFunc(std::shared_ptr<UsbClient> client,
88-
SocketType socket_fd_);
89-
static void WriteThreadFunc(std::shared_ptr<UsbClient> client,
90-
SocketType socket_fd_);
91-
9285
private:
9386
BlockingQueue<std::string> incoming_message_queue_;
9487
BlockingQueue<std::string> outgoing_message_queue_;
9588

9689
base::WorkThreadExecutor work_thread_;
90+
base::WorkThreadExecutor read_thread_;
91+
base::WorkThreadExecutor write_thread_;
92+
base::WorkThreadExecutor dispatch_thread_;
9793
std::shared_ptr<UsbClientListener> listener_;
9894
USBConnectStatus connect_status_ = USBConnectStatus::DISCONNECTED;
9995
std::unique_ptr<CountDownLatch> latch_;
10096

101-
// TODO(popoaichuiniu) use unique_fd to manage socket
102-
volatile SocketType socket_fd_ = kInvalidSocket;
97+
base::SocketGuard socket_guard_;
10398
// mutex for close socket_fd_
10499
std::mutex mutex_;
105100
};

0 commit comments

Comments
 (0)