forked from sccn/liblsl
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathresolver_impl.cpp
More file actions
249 lines (222 loc) · 9.26 KB
/
resolver_impl.cpp
File metadata and controls
249 lines (222 loc) · 9.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
#include <iostream>
#include <boost/asio/ip/udp.hpp>
#include <boost/asio/placeholders.hpp>
#include <boost/bind.hpp>
#include <boost/thread/thread_only.hpp>
#include "api_config.h"
#include "cast.h"
#include "resolve_attempt_udp.h"
#include "resolver_impl.h"
#include "socket_utils.h"
// === implementation of the resolver_impl class ===
using namespace lsl;
using namespace lslboost::asio;
/**
* Instantiate a new resolver and configure timing parameters.
* A note on resolution logic. If KnownPeers in the api_config is empty, a new multicast wave will be scheduled every mcast_min_rtt (until a timeout expires or the desired number of streams has been resolved).
* If KnownPeers is non-empty, a multicast wave an a unicast wave will be schedule in alternation. The spacing between waves will be no shorter than the respective minimum RTTs.
* TCP resolves are currently not implemented (but may be at a later time); these are only necessary when UDP traffic is disabled on a particular router.
*/
resolver_impl::resolver_impl(): cfg_(api_config::get_instance()), cancelled_(false), expired_(false), forget_after_(FOREVER), fast_mode_(true),
io_(io_context_p(new io_context())), resolve_timeout_expired_(*io_), wave_timer_(*io_), unicast_timer_(*io_)
{
// parse the multicast addresses into endpoints and store them
std::vector<std::string> mcast_addrs = cfg_->multicast_addresses();
uint16_t mcast_port = cfg_->multicast_port();
for (std::size_t k=0;k<mcast_addrs.size();k++) {
try {
mcast_endpoints_.push_back(udp::endpoint(ip::make_address(mcast_addrs[k]),(uint16_t)mcast_port));
}
catch(std::exception &) { }
}
// parse the per-host addresses into endpoints, and store them, too
std::vector<std::string> peers = cfg_->known_peers();
udp::resolver udp_resolver(*io_);
// for each known peer...
for (std::size_t k=0;k<peers.size();k++) {
try {
// resolve the name
udp::resolver::results_type res = udp_resolver.resolve(peers[k], to_string(cfg_->base_port()));
// for each endpoint...
for (udp::resolver::results_type::iterator i=res.begin(); i != res.end(); i++) {
// for each port in the range...
for (int p=cfg_->base_port(); p<cfg_->base_port()+cfg_->port_range(); p++)
// add a record
ucast_endpoints_.push_back(udp::endpoint(i->endpoint().address(),p));
}
} catch(std::exception &) { }
}
// generate the list of protocols to use
if (cfg_->allow_ipv6()) {
udp_protocols_.push_back(udp::v6());
tcp_protocols_.push_back(tcp::v6());
}
if (cfg_->allow_ipv4()) {
udp_protocols_.push_back(udp::v4());
tcp_protocols_.push_back(tcp::v4());
}
}
// === resolve functions ===
/**
* Resolve a query string into a list of matching stream_info's on the network.
* Blocks until at least the minimum number of streams has been resolved, or the timeout fires, or the resolve has been cancelled.
*/
std::vector<stream_info_impl> resolver_impl::resolve_oneshot(const std::string &query, int minimum, double timeout, double minimum_time) {
// reset the IO service & set up the query parameters
io_->restart();
query_ = query;
minimum_ = minimum;
wait_until_ = lsl_clock() + minimum_time;
results_.clear();
forget_after_ = FOREVER;
fast_mode_ = true;
expired_ = false;
// start a timer that cancels all outstanding IO operations and wave schedules after the timeout has expired
if (timeout != FOREVER) {
resolve_timeout_expired_.expires_after(timeout_sec(timeout));
resolve_timeout_expired_.async_wait(lslboost::bind(&resolver_impl::resolve_timeout_expired,this,placeholders::error));
}
// start the first wave of resolve packets
next_resolve_wave();
// run the IO operations until finished
if (!cancelled_) {
io_->run();
// collect output
std::vector<stream_info_impl> output;
for(result_container::iterator i=results_.begin(); i!= results_.end();i++)
output.push_back(i->second.first);
return output;
} else
return std::vector<stream_info_impl>();
}
void resolver_impl::resolve_continuous(const std::string &query, double forget_after) {
// reset the IO service & set up the query parameters
io_->restart();
query_ = query;
minimum_ = 0;
wait_until_ = 0;
results_.clear();
forget_after_ = forget_after;
fast_mode_ = false;
expired_ = false;
// start a wave of resolve packets
next_resolve_wave();
// spawn a thread that runs the IO operations
background_io_.reset(new lslboost::thread(lslboost::bind(&io_context::run,io_)));
}
/// Get the current set of results (e.g., during continuous operation).
std::vector<stream_info_impl> resolver_impl::results() {
std::vector<stream_info_impl> output;
lslboost::lock_guard<lslboost::mutex> lock(results_mut_);
double expired_before = lsl_clock() - forget_after_;
for(result_container::iterator i=results_.begin(); i!=results_.end();) {
if (i->second.second < expired_before) {
result_container::iterator tmp = i++;
results_.erase(tmp);
} else {
output.push_back(i->second.first);
i++;
}
}
return output;
}
// === timer-driven async handlers ===
/// This function starts a new wave of resolves.
void resolver_impl::next_resolve_wave() {
std::size_t num_results = 0;
{
lslboost::lock_guard<lslboost::mutex> lock(results_mut_);
num_results = results_.size();
}
if (cancelled_ || expired_ || (minimum_ && (num_results >= (std::size_t)minimum_) && lsl_clock() >= wait_until_)) {
// stopping criteria satisfied: cancel the ongoing operations
cancel_ongoing_resolve();
} else {
// start a new multicast wave
udp_multicast_burst();
if (!ucast_endpoints_.empty()) {
// we have known peer addresses: we spawn a unicast wave and shortly thereafter the next wave
unicast_timer_.expires_after(timeout_sec(cfg_->multicast_min_rtt()));
unicast_timer_.async_wait(lslboost::bind(&resolver_impl::udp_unicast_burst,this,placeholders::error));
wave_timer_.expires_after(timeout_sec((fast_mode_?0:cfg_->continuous_resolve_interval())+(cfg_->multicast_min_rtt()+cfg_->unicast_min_rtt())));
wave_timer_.async_wait(lslboost::bind(&resolver_impl::wave_timeout_expired,this,placeholders::error));
} else {
// there are no known peer addresses; just set up the next wave
wave_timer_.expires_after(timeout_sec((fast_mode_?0:cfg_->continuous_resolve_interval())+cfg_->multicast_min_rtt()));
wave_timer_.async_wait(lslboost::bind(&resolver_impl::wave_timeout_expired,this,placeholders::error));
}
}
}
/// Start a new resolver attepmpt on the multicast hosts.
void resolver_impl::udp_multicast_burst() {
// start one per IP stack under consideration
for (std::size_t k=0,failures=0;k<udp_protocols_.size();k++) {
try {
resolve_attempt_udp_p attempt(new resolve_attempt_udp(*io_,udp_protocols_[k],mcast_endpoints_,query_,results_,results_mut_,cfg_->multicast_max_rtt(),this));
attempt->begin();
} catch(std::exception &e) {
if (++failures == udp_protocols_.size())
std::cerr << "Could not start a multicast resolve attempt for any of the allowed protocol stacks: " << e.what() << std::endl;
}
}
}
/// Start a new resolver attempt on the known peers.
void resolver_impl::udp_unicast_burst(error_code err) {
if (err != error::operation_aborted) {
// start one per IP stack under consideration
for (std::size_t k=0,failures=0;k<udp_protocols_.size();k++) {
try {
resolve_attempt_udp_p attempt(new resolve_attempt_udp(*io_,udp_protocols_[k],ucast_endpoints_,query_,results_,results_mut_,cfg_->unicast_max_rtt(),this));
attempt->begin();
} catch(std::exception &e) {
if (++failures == udp_protocols_.size())
std::cerr << "Could not start a unicast resolve attempt for any of the allowed protocol stacks: " << e.what() << std::endl;
}
}
}
}
/// This handler is called when the overall resolve timeout (if any) expires.
void resolver_impl::resolve_timeout_expired(error_code err) {
if (err != error::operation_aborted)
cancel_ongoing_resolve();
}
/// This handler is called when the wave timeout expires.
void resolver_impl::wave_timeout_expired(error_code err) {
if (err != error::operation_aborted)
next_resolve_wave();
}
// === cancellation and teardown ===
/// Cancel any ongoing operations and render the resolver unusable.
/// This can be used to cancel a blocking resolve_oneshot() from another thread (e.g., to initiate teardown of the object).
void resolver_impl::cancel() {
cancelled_ = true;
cancel_ongoing_resolve();
}
/// Cancel an ongoing resolve, if any (otherwise without effect).
void resolver_impl::cancel_ongoing_resolve() {
// make sure that ongoing handler loops terminate
expired_ = true;
// timer fires: cancel the next wave schedule
post(*io_, lslboost::bind(&steady_timer::cancel, &wave_timer_));
post(*io_, lslboost::bind(&steady_timer::cancel, &unicast_timer_));
// and cancel the timeout, too
post(*io_, lslboost::bind(&steady_timer::cancel, &resolve_timeout_expired_));
// cancel all currently active resolve attempts
cancel_all_registered();
}
/// Destructor.
/// Cancels any ongoing processes and waits until they finish.
resolver_impl::~resolver_impl() {
try {
if (background_io_) {
cancel();
background_io_->join();
}
}
catch(std::exception &e) {
std::cerr << "Error during destruction of a resolver_impl: " << e.what() << std::endl;
}
catch(...) {
std::cerr << "Severe error during destruction of a resolver_impl." << std::endl;
}
}