forked from sccn/liblsl
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsend_buffer.cpp
More file actions
68 lines (54 loc) · 2.23 KB
/
send_buffer.cpp
File metadata and controls
68 lines (54 loc) · 2.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
#include "consumer_queue.h"
#include "send_buffer.h"
#include <boost/bind.hpp>
// === implementation of the send_buffer class ===
using namespace lsl;
/**
* Create a new send buffer.
* @param max_capacity Hard upper bound on queue capacity beyond which the oldest samples will be dropped.
*/
send_buffer::send_buffer(int max_capacity): max_capacity_(max_capacity) {}
/**
* Add a new consumer to the send buffer.
* Each consumer will get all samples (although the oldest samples will be dropped when the buffer capacity is overrun).
* @param max_buffered If non-zero, the queue size for this consumer will be constrained to be no larger than this value.
* Note that the actual queue size will also never exceed the max_capacity of the send_buffer (so this is
* a global limit).
* @return Shared pointer to the newly created queue.
*/
consumer_queue_p send_buffer::new_consumer(int max_buffered) {
max_buffered = max_buffered ? std::min(max_buffered,max_capacity_) : max_capacity_;
return consumer_queue_p(new consumer_queue(max_buffered, shared_from_this()));
}
/**
* Push a sample onto the send buffer.
* Will subsequently be seen by all consumers.
*/
void send_buffer::push_sample(const sample_p &s) {
lslboost::lock_guard<lslboost::mutex> lock(consumers_mut_);
for (consumer_set::iterator i=consumers_.begin(); i != consumers_.end(); i++)
(*i)->push_sample(s);
}
/// Registered a new consumer.
void send_buffer::register_consumer(consumer_queue *q) {
{
lslboost::lock_guard<lslboost::mutex> lock(consumers_mut_);
consumers_.insert(q);
}
some_registered_.notify_all();
}
/// Unregister a previously registered consumer.
void send_buffer::unregister_consumer(consumer_queue *q) {
lslboost::lock_guard<lslboost::mutex> lock(consumers_mut_);
consumers_.erase(q);
}
/// Check whether there currently are consumers.
bool send_buffer::have_consumers() {
lslboost::lock_guard<lslboost::mutex> lock(consumers_mut_);
return some_registered();
}
/// Wait until some consumers are present.
bool send_buffer::wait_for_consumers(double timeout) {
lslboost::unique_lock<lslboost::mutex> lock(consumers_mut_);
return some_registered_.wait_for(lock, lslboost::chrono::duration<double>(timeout), lslboost::bind(&send_buffer::some_registered,this));
}