forked from sccn/liblsl
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathinfo_receiver.cpp
More file actions
96 lines (87 loc) · 3.1 KB
/
info_receiver.cpp
File metadata and controls
96 lines (87 loc) · 3.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
#include "info_receiver.h"
#include "inlet_connection.h"
#include "cancellable_streambuf.h"
#include <iostream>
#include <boost/bind.hpp>
// === implementation of the info_receiver class ===
using namespace lsl;
/**
 * Create an info receiver that operates on the given inlet connection.
 *
 * The receiver registers itself, together with its "full info updated"
 * condition variable, at the connection's on-lost registry (presumably so
 * that threads blocked in info() are woken when the connection is lost).
 *
 * @param conn The connection used to query the stream's full info; it is
 *             held by reference for the lifetime of this object.
 */
info_receiver::info_receiver(inlet_connection &conn)
    : conn_(conn)
{
    conn.register_onlost(this, &fullinfo_upd_);
}
/**
 * Tear down the info receiver.
 *
 * Removes the on-lost registration made in the constructor and then joins
 * the background info thread if one was ever started. Nothing is allowed
 * to propagate out of the destructor: any failure is reported on stderr.
 */
info_receiver::~info_receiver() {
    try {
        conn_.unregister_onlost(this);
        // Only a started (joinable) thread may be joined.
        if (info_thread_.joinable()) {
            info_thread_.join();
        }
    } catch (const std::exception &ex) {
        std::cerr << "Unexpected error during destruction of an info_receiver: " << ex.what() << std::endl;
    } catch (...) {
        std::cerr << "Severe error during info receiver shutdown." << std::endl;
    }
}
/**
 * Retrieve the complete stream information, including the extended
 * description.
 *
 * On the first call that finds no result available, the background info
 * thread is launched; the caller then blocks until info_ready() holds or the
 * timeout elapses.
 *
 * @param timeout Maximum time to wait, in seconds. Values >= FOREVER wait
 *                without a time limit.
 * @return Reference to the stored full stream info.
 * @throws timeout_error if no result became available within the timeout.
 * @throws lost_error if the connection reports that the stream was lost.
 */
const stream_info_impl &info_receiver::info(double timeout) {
    lslboost::unique_lock<lslboost::mutex> guard(fullinfo_mut_);

    if (!info_ready()) {
        // Lazily launch the worker; a joinable thread has already been started.
        if (!info_thread_.joinable()) {
            info_thread_ = lslboost::thread(&info_receiver::info_thread, this);
        }

        // Block until a result (or a lost connection) is signalled, or time runs out.
        const bool wait_indefinitely = (timeout >= FOREVER);
        if (wait_indefinitely) {
            fullinfo_upd_.wait(guard, lslboost::bind(&info_receiver::info_ready, this));
        } else {
            const bool became_ready = fullinfo_upd_.wait_for(
                guard,
                lslboost::chrono::duration<double>(timeout),
                lslboost::bind(&info_receiver::info_ready, this));
            if (!became_ready) {
                throw timeout_error("The info() operation timed out.");
            }
        }
    }

    // A lost connection takes precedence over returning a result.
    if (conn_.lost()) {
        throw lost_error("The stream read by this inlet has been lost. To recover, you need to re-resolve the source and re-create the inlet.");
    }
    return *fullinfo_;
}
/// The info reader thread.
void info_receiver::info_thread() {
conn_.acquire_watchdog();
try {
while (!conn_.lost() && !conn_.shutdown()) {
try {
// make a new stream buffer & stream
cancellable_streambuf buffer;
buffer.register_at(&conn_);
std::iostream server_stream(&buffer);
// connect...
buffer.connect(conn_.get_tcp_endpoint());
// send the query
server_stream << "LSL:fullinfo\r\n" << std::flush;
// receive and parse the response
std::ostringstream os; os << server_stream.rdbuf();
stream_info_impl info;
std::string msg = os.str();
info.from_fullinfo_message(msg);
// if this is not a valid streaminfo we retry
if (!info.created_at())
continue;
// store the result for pickup & return
{
lslboost::lock_guard<lslboost::mutex> lock(fullinfo_mut_);
fullinfo_ = stream_info_impl_p(new stream_info_impl(info));
}
fullinfo_upd_.notify_all();
break;
}
catch(error_code &) {
// connection-level error: closed, reset, refused, etc.
conn_.try_recover_from_error();
}
catch(std::exception &e) {
// parsing-level error: intermittent disconnect or invalid protocol
std::cerr << "Error while receiving the stream info (" << e.what() << "); retrying..." << std::endl;
conn_.try_recover_from_error();
}
}
} catch(lost_error &) { }
conn_.release_watchdog();
}
/**
 * Wait predicate used by info().
 *
 * @return true once a full stream info has been stored, or when the
 *         connection reports that the stream was lost; false otherwise.
 */
bool info_receiver::info_ready() {
    if (fullinfo_) {
        return true;
    }
    return conn_.lost();
}