forked from sccn/liblsl
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathudp_server.cpp
More file actions
173 lines (148 loc) · 6.8 KB
/
udp_server.cpp
File metadata and controls
173 lines (148 loc) · 6.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
#include <iostream>
#include <boost/asio/ip/udp.hpp>
#include <boost/asio/ip/multicast.hpp>
#include <boost/asio/placeholders.hpp>
#include <boost/bind.hpp>
#include <boost/algorithm/string/trim.hpp>
#include <boost/thread/thread_only.hpp>
#include "udp_server.h"
#include "socket_utils.h"
#include "stream_info_impl.h"
// === implementation of the udp_server class ===
using namespace lsl;
using namespace lslboost::asio;
/*
* Create a UDP responder in unicast mode that listens next to a TCP server.
* This server will listen on a free local port for timedata and shortinfo requests -- mainly for timing information (unless shortinfo is needed by clients).
* @param info The stream_info of the stream to serve (shared). After success, the appropriate service port will be assigned.
* @param protocol The protocol stack to use (tcp::v4() or tcp::v6()).
*/
udp_server::udp_server(const stream_info_impl_p &info, io_context &io, udp protocol): info_(info), io_(io), socket_(new udp::socket(io)), time_services_enabled_(true) {
// open the socket for the specified protocol
socket_->open(protocol);
// bind to a free port
uint16_t port = bind_port_in_range(*socket_,protocol);
// assign the service port field
if (protocol == udp::v4())
info_->v4service_port(port);
else
info_->v6service_port(port);
}
/*
* Create a new UDP server in multicast mode.
* This server will listen on a multicast address and responds only to LSL:shortinfo requests. This is for multicast/broadcast local service discovery.
*/
udp_server::udp_server(const stream_info_impl_p &info, io_context &io, const std::string &address, uint16_t port, int ttl, const std::string &listen_address): info_(info), io_(io), socket_(new udp::socket(io)), time_services_enabled_(false) {
ip::address addr = ip::make_address(address);
bool is_broadcast = addr == ip::address_v4::broadcast();
// set up the endpoint where we listen (note: this is not yet the multicast address)
udp::endpoint listen_endpoint;
if (listen_address.empty()) {
// pick the default endpoint
if (addr.is_v4())
listen_endpoint = udp::endpoint(udp::v4(), port);
else
listen_endpoint = udp::endpoint(udp::v6(), port);
}
else {
// choose an endpoint explicitly
ip::address listen_addr = ip::make_address(listen_address);
listen_endpoint = udp::endpoint(listen_addr, (uint16_t)port);
}
// open the socket and make sure that we can reuse the address, and bind it
socket_->open(listen_endpoint.protocol());
socket_->set_option(udp::socket::reuse_address(true));
// set the multicast TTL
if (addr.is_multicast() && !is_broadcast)
socket_->set_option(ip::multicast::hops(ttl));
// bind to the listen endpoint
socket_->bind(listen_endpoint);
// join the multicast group, if any
if (addr.is_multicast() && !is_broadcast) {
if (addr.is_v4())
socket_->set_option(ip::multicast::join_group(addr.to_v4(),listen_endpoint.address().to_v4()));
else
socket_->set_option(ip::multicast::join_group(addr));
}
}
// === externally issued asynchronous commands ===
/// Start serving UDP traffic.
/// Call this only after the (shared) info object has been initialized by all other parties, too.
void udp_server::begin_serving() {
// pre-calculate the shortinfo message (now that everyone should have initialized their part).
shortinfo_msg_ = info_->to_shortinfo_message();
// start asking for a packet
request_next_packet();
}
/// Gracefully close a socket.
void close_if_open(udp_socket_p sock) {
try {
if (sock->is_open())
sock->close();
} catch(std::exception &e) {
std::cerr << "Error during close_if_open (thread id: " << lslboost::this_thread::get_id() << "): " << e.what() << std::endl;
}
}
/// Initiate teardown of UDP traffic.
void udp_server::end_serving() {
// gracefully close the socket; this will eventually lead to the cancellation of the IO operation(s) tied to its socket
post(io_, lslboost::bind(&close_if_open, socket_));
}
// === receive / reply loop ===
/// Initiate next packet request.
/// The result of the operation will eventually trigger the handle_receive_outcome() handler.
void udp_server::request_next_packet() {
socket_->async_receive_from(lslboost::asio::buffer(buffer_), remote_endpoint_,
lslboost::bind(&udp_server::handle_receive_outcome, shared_from_this(), placeholders::error, placeholders::bytes_transferred));
}
/// Handler that gets called when the next packet was received (or the op was cancelled).
void udp_server::handle_receive_outcome(error_code err, std::size_t len) {
if (err != error::operation_aborted && err != error::shut_down) {
try {
if (!err) {
// remember the time of packet reception for possible later use
double t1 = time_services_enabled_ ? lsl_clock() : 0.0;
// wrap received packet into a request stream and parse the method from it
std::istringstream request_stream(std::string(buffer_,buffer_+len));
std::string method; getline(request_stream,method); lslboost::trim(method);
if (method == "LSL:shortinfo") {
// shortinfo request: parse content query string
std::string query; getline(request_stream,query); lslboost::trim(query);
// parse return address, port, and query ID
uint16_t return_port; request_stream >> return_port;
std::string query_id; request_stream >> query_id;
// check query
if (info_->matches_query(query)) {
// query matches: send back reply
udp::endpoint return_endpoint(remote_endpoint_.address(), return_port);
string_p replymsg(new std::string((query_id += "\r\n") += shortinfo_msg_));
socket_->async_send_to(lslboost::asio::buffer(*replymsg), return_endpoint,
lslboost::bind(&udp_server::handle_send_outcome,shared_from_this(),replymsg,placeholders::error));
return;
}
} else {
if (time_services_enabled_ && method == "LSL:timedata") {
// timedata request: parse time of original transmission
int wave_id; request_stream >> wave_id;
double t0; request_stream >> t0;
// send it off (including the time of packet submission and a shared ptr to the message content owned by the handler)
std::ostringstream reply; reply.precision(16); reply << " " << wave_id << " " << t0 << " " << t1 << " " << lsl_clock();
string_p replymsg(new std::string(reply.str()));
socket_->async_send_to(lslboost::asio::buffer(*replymsg), remote_endpoint_,
lslboost::bind(&udp_server::handle_send_outcome,shared_from_this(),replymsg,placeholders::error));
return;
}
}
}
} catch(std::exception &e) {
std::cerr << "udp_server: hiccup during request processing: " << e.what() << std::endl;
}
request_next_packet();
}
}
/// Handler that's called after a response packet has been sent off.
void udp_server::handle_send_outcome(string_p /*replymsg*/, error_code err) {
if (err != error::operation_aborted && err != error::shut_down)
// done sending: ask for next packet
request_next_packet();
}