forked from sccn/liblsl
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstream_outlet_impl.cpp
More file actions
145 lines (131 loc) · 6.22 KB
/
stream_outlet_impl.cpp
File metadata and controls
145 lines (131 loc) · 6.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#include "stream_outlet_impl.h"
#include <boost/bind.hpp>
#include <boost/thread/thread_only.hpp>
#include "tcp_server.h"
#include "udp_server.h"
// === implementation of the stream_outlet_impl class ===
using namespace lsl;
using namespace lslboost::asio;
// Call begin_serving() on each element of the given server container, front to back.
template <class ServerList> static void begin_serving_all(ServerList &servers) {
	for (typename ServerList::iterator it = servers.begin(); it != servers.end(); ++it)
		(*it)->begin_serving();
}

/**
* Create a new stream outlet; once constructed, the stream can be discovered.
* @param info Meta-data describing the stream. A private copy is stored and remains unchanged for the outlet's lifetime.
* @param chunk_size Preferred number of samples per transmitted chunk (an inlet may override this selectively).
*                   A value of 0 (=default) lets the pushthrough flag of push_sample / push_chunk decide.
* @param max_capacity Upper bound on the number of samples buffered for unresponsive receivers; when exceeded, the oldest samples are dropped.
*                     The default holds a bit more than 15 minutes of data at 512Hz while using no more than ca. 512MB of RAM.
* @throws std::runtime_error if no TCP server or no UDP server could be set up on any permitted protocol.
*/
stream_outlet_impl::stream_outlet_impl(const stream_info_impl &info, int chunk_size, int max_capacity)
	: chunk_size_(chunk_size),
	  info_(new stream_info_impl(info)),
	  sample_factory_(new factory(
		  info.channel_format(),
		  info.channel_count(),
		  // regular-rate streams reserve by duration, irregular-rate streams by sample count
		  info.nominal_srate()
			  ? info.nominal_srate() * api_config::get_instance()->outlet_buffer_reserve_ms() / 1000
			  : api_config::get_instance()->outlet_buffer_reserve_samples())),
	  send_buffer_(new send_buffer(max_capacity))
{
	ensure_lsl_initialized();
	const api_config *config = api_config::get_instance();

	// bring up the IPv4 stack if the configuration permits it; a failure here is only reported
	if (config->allow_ipv4()) {
		try {
			instantiate_stack(tcp::v4(), udp::v4());
		} catch (const std::exception &err) {
			std::cerr << "Could not instantiate IPv4 stack: " << err.what() << std::endl;
		}
	}

	// same for the IPv6 stack
	if (config->allow_ipv6()) {
		try {
			instantiate_stack(tcp::v6(), udp::v6());
		} catch (const std::exception &err) {
			std::cerr << "Could not instantiate IPv6 stack: " << err.what() << std::endl;
		}
	}

	// without at least one TCP server and one UDP server the outlet is unusable
	const bool have_tcp = !tcp_servers_.empty();
	const bool have_udp = !udp_servers_.empty();
	if (!(have_tcp && have_udp))
		throw std::runtime_error("Neither the IPv4 nor the IPv6 stack could be instantiated.");

	// kick off the asynchronous request chains of all servers
	begin_serving_all(tcp_servers_);
	begin_serving_all(udp_servers_);
	begin_serving_all(responders_);

	// finally launch one IO thread per io_context to process those chains
	for (std::size_t idx = 0; idx != ios_.size(); ++idx) {
		thread_p worker(new lslboost::thread(lslboost::bind(&stream_outlet_impl::run_io, this, ios_[idx])));
		io_threads_.push_back(worker);
	}
}
/**
* Set up one server stack for the given TCP/UDP protocol pair.
* Adds a TCP data server and a UDP time server (each with its own io_context), plus one
* UDP multicast responder for every configured multicast address of the matching IP family.
* Failures to create an individual multicast responder are logged and otherwise ignored.
*/
void stream_outlet_impl::instantiate_stack(tcp tcp_protocol, udp udp_protocol) {
	// read the network settings from the API configuration
	const api_config *config = api_config::get_instance();
	std::string listen_addr = config->listen_address();
	std::vector<std::string> mcast_addresses = config->multicast_addresses();
	int mcast_ttl = config->multicast_ttl();
	uint16_t mcast_port = config->multicast_port();

	// TCP data server on a fresh io_context
	io_context_p tcp_io(new io_context());
	ios_.push_back(tcp_io);
	tcp_server_p data_server(new tcp_server(info_, ios_.back(), send_buffer_, sample_factory_, tcp_protocol, chunk_size_));
	tcp_servers_.push_back(data_server);

	// UDP time server on another fresh io_context (shared with the multicast responders below)
	io_context_p udp_io_p(new io_context());
	ios_.push_back(udp_io_p);
	io_context &udp_io = *ios_.back();
	udp_server_p time_server(new udp_server(info_, udp_io, udp_protocol));
	udp_servers_.push_back(time_server);

	// UDP multicast responders, restricted to addresses of the protocol family handled here
	const bool want_v4 = (udp_protocol == udp::v4());
	for (std::vector<std::string>::iterator it = mcast_addresses.begin(); it != mcast_addresses.end(); ++it) {
		try {
			ip::address parsed(ip::make_address(*it));
			const bool family_matches = want_v4 ? parsed.is_v4() : parsed.is_v6();
			if (!family_matches)
				continue;
			responders_.push_back(udp_server_p(new udp_server(info_, udp_io, *it, mcast_port, mcast_ttl, listen_addr)));
		} catch (const std::exception &err) {
			std::clog << "Note (minor): could not create multicast responder for address " << *it << " (failed with: " << err.what() << ")" << std::endl;
		}
	}
}
// Call end_serving() on each element of the given server container, front to back.
template <class ServerList> static void end_serving_all(ServerList &servers) {
	for (typename ServerList::iterator it = servers.begin(); it != servers.end(); ++it)
		(*it)->end_serving();
}

/**
* Destructor.
* After destruction the stream can no longer be discovered and all paired inlets stop delivering data.
* Errors raised during teardown are reported on stderr and never propagate out of the destructor.
*/
stream_outlet_impl::~stream_outlet_impl() {
	try {
		// cancel the request chains of all servers
		end_serving_all(tcp_servers_);
		end_serving_all(udp_servers_);
		end_serving_all(responders_);

		// wait for each IO thread to finish
		for (std::size_t idx = 0; idx != io_threads_.size(); ++idx) {
			lslboost::thread &worker = *io_threads_[idx];
			if (worker.try_join_for(lslboost::chrono::milliseconds(1000)))
				continue;

			// the thread did not exit in time (should only ever happen if the CPU is maxed out): resort to force
			std::cerr << "Tearing down stream_outlet of thread " << worker.get_id() << " (in id: " << lslboost::this_thread::get_id() << "): " << std::endl;
			ios_[idx]->stop();

			// keep interrupting the thread until it can finally be joined
			int attempt = 1;
			while (!worker.try_join_for(lslboost::chrono::milliseconds(1000))) {
				std::cerr << "Trying to kill stream_outlet (attempt #" << attempt << ")..." << std::endl;
				worker.interrupt();
				++attempt;
			}
		}
	} catch (const std::exception &err) {
		std::cerr << "Unexpected error during destruction of a stream outlet (id: " << lslboost::this_thread::get_id() << "): " << err.what() << std::endl;
	} catch (...) {
		std::cerr << "Severe error during stream outlet shutdown." << std::endl;
	}
}
// Thread function: drive the given io_context until its run() call returns normally.
// If run() throws a std::exception, the error is reported on stderr and run() is invoked again.
void stream_outlet_impl::run_io(io_context_p &ios) {
	bool finished = false;
	while (!finished) {
		try {
			ios->run();
			finished = true;
		} catch (const std::exception &err) {
			std::cerr << "Error during io_context processing (id: " << lslboost::this_thread::get_id() << "): " << err.what() << std::endl;
		}
	}
}
/**
* Access the stream info belonging to this outlet.
* @return The constant meta-data header that the stream was created with.
*/
const stream_info_impl &stream_outlet_impl::info() const {
	const stream_info_impl &header = *info_;
	return header;
}
/**
* Query whether any consumers are registered at the moment.
* @return The answer reported by the outlet's send buffer.
*/
bool stream_outlet_impl::have_consumers() {
	const bool any_registered = send_buffer_->have_consumers();
	return any_registered;
}
/**
* Block until a consumer shows up.
* @param timeout Timeout value, passed through unchanged to the send buffer.
* @return Whether the wait was successful.
*/
bool stream_outlet_impl::wait_for_consumers(double timeout) {
	const bool succeeded = send_buffer_->wait_for_consumers(timeout);
	return succeeded;
}