-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathbgpoint.cpp
120 lines (97 loc) · 3.19 KB
/
bgpoint.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#include <algorithm>
#include <cassert>
#include <cstring>
#include <mutex>
#include <thread>
#include <vector>
#include "bgpoint.h"
/// Takes one buffer out of this point's free-buffer queue.
///
/// @return The buffer handed back by `_free_buffers->PopBlocking()`, or
///         nullptr if that call leaves the out-parameter untouched.
Buffer *BufferGraph::Point::AcquireBuffer() {
    // Start from nullptr so the caller sees a well-defined value even if
    // PopBlocking() does not assign its out-parameter.
    Buffer *acquired = nullptr;
    _free_buffers->PopBlocking(acquired);
    return acquired;
}
/// Records that one more sink has finished with `buffer`.
///
/// Every call bumps the per-buffer reference count (indexed by
/// `buffer->GetId()`). Once the count equals the number of registered sinks,
/// the buffer is popped from the used queue, pushed onto the free queue, and
/// its count is reset to zero.
///
/// @param buffer Buffer being released; it is dereferenced, so it must not be
///               nullptr.
void BufferGraph::Point::ReleaseBuffer(Buffer *buffer) {
    // RAII guard: the lock is released on every exit path, including an
    // exception escaping from the queue operations below.
    std::lock_guard<decltype(_lock)> guard(_lock);

    // Look the counter up once instead of three times.
    auto &ref_count = _buffer_refs[buffer->GetId()];
    ref_count++;
    if (ref_count == _sinks.size()) {
        // All sinks are done: the buffer being retired is expected to be the
        // head of the used queue.
        Buffer *xbuffer = nullptr;
        _used_buffers->PopBlocking(xbuffer);
        assert(xbuffer == buffer);
        (void)xbuffer;  // keeps NDEBUG builds free of unused-variable warnings
        _free_buffers->PushBlocking(buffer);
        ref_count = 0;
    }
}
/// Appends `buffer` to this point's used-buffer queue.
///
/// @param buffer Buffer to enqueue via `_used_buffers->PushBlocking()`.
void BufferGraph::Point::PushBuffer(Buffer *buffer) {
    // Note: `_lock` is deliberately not taken here; only the queue's own
    // PushBlocking() call is made.
    this->_used_buffers->PushBlocking(buffer);
}
/// Looks at the head of the used-buffer queue without removing it.
///
/// Several threads may peek at the same time, and another thread may be in
/// ReleaseBuffer() (which pops from the same queue) concurrently, so the peek
/// is performed while holding `_lock`.
///
/// @return The buffer reported by `_used_buffers->PeakBlocking()`, or nullptr
///         if that call leaves the out-parameter untouched.
Buffer *BufferGraph::Point::PeakBuffer() {
    Buffer *head = nullptr;
    {
        // RAII guard: the lock is dropped at the end of this scope even if
        // PeakBlocking() throws.
        std::lock_guard<decltype(_lock)> guard(_lock);
        _used_buffers->PeakBlocking(head);
    }
    return head;
}
/// Peeks every registered source until each one has offered a buffer that
/// differs from the one recorded in `_buffers_on_peak`.
///
/// The sources are scanned in passes, at most `_max_retries` times:
///  - a source returning nullptr aborts the whole operation;
///  - a buffer whose stream id is older than `_cur_stream_id` is logged,
///    released back to its source and not counted;
///  - a buffer different from the recorded one is counted as fresh;
///  - an unchanged buffer makes this thread yield before moving on.
///
/// @return `_buffers_on_peak`, updated with the newly peeked buffers, on
///         success; nullptr if a source returned nullptr or the retry budget
///         ran out. On failure `_buffers_on_peak` is left unmodified.
Buffer **BufferGraph::Point::PeakAllSources() {
    const size_t source_count = _sources.size();

    // Work on a private copy so a failed attempt leaves _buffers_on_peak
    // untouched. A std::vector replaces the former variable-length array,
    // which is not standard C++ and is ill-formed for zero sources.
    std::vector<Buffer *> candidates(_buffers_on_peak, _buffers_on_peak + source_count);

    size_t fresh_count = 0;
    bool source_returned_null = false;

    for (int attempt = 0;
         attempt < _max_retries && fresh_count < source_count && !source_returned_null;
         ++attempt) {
        for (size_t i = 0; i < source_count; i++) {
            Buffer *b = _sources[i]->PeakBuffer();
            if (b == nullptr) {
                source_returned_null = true;
                break;
            }
            if (b->GetStreamId() < _cur_stream_id) {
                // Stale buffer from an earlier stream: hand it straight back.
                WARN(0, "drop" << b->GetStreamId() << " " << _cur_stream_id);
                _sources[i]->ReleaseBuffer(b);
            } else if (candidates[i] != b) {
                fresh_count++;
                candidates[i] = b;
            } else {
                // Same buffer as before: give the other readers a turn to
                // consume it and check again on a later pass.
                std::this_thread::yield();
            }
        }
    }

    // Success is decided from what was actually observed, not from the retry
    // counter, so a pass that completes on the last permitted attempt is no
    // longer misreported as a failure.
    if (source_returned_null || fresh_count < source_count) {
        return nullptr;
    }

    std::copy(candidates.begin(), candidates.end(), _buffers_on_peak);
    return _buffers_on_peak;
}
/// Releases one buffer back to each registered source.
///
/// @param buffers_on_peak Array indexed like `_sources`; entry `k` is passed
///                        to `ReleaseBuffer()` of source `k`.
void BufferGraph::Point::ReleaseAllSources(Buffer **buffers_on_peak) {
    size_t idx = 0;
    while (idx < _sources.size()) {
        _sources[idx]->ReleaseBuffer(buffers_on_peak[idx]);
        ++idx;
    }
}
/// Adds `p` to the list of sinks of this point.
///
/// The size of that list is what ReleaseBuffer() compares the per-buffer
/// reference count against.
///
/// @param p Point to append to `_sinks`.
void BufferGraph::Point::RegisterSink(Point *p) {
    this->_sinks.push_back(p);
}
/// Adds `p` to the list of sources of this point.
///
/// @param p Point to append to `_sources`.
void BufferGraph::Point::RegisterSource(Point *p) {
    this->_sources.push_back(p);
}
/// Allocates `_buffers_on_peak` with one slot per currently registered source
/// and sets every slot to nullptr.
///
/// The array is sized from `_sources.size()` at the time of the call.
void BufferGraph::Point::Preallocate() {
    // The trailing () value-initialises the array, so every pointer slot
    // starts out as nullptr.
    _buffers_on_peak = new Buffer *[_sources.size()]();
}
/// Advances `_cur_stream_id` by one.
///
/// PeakAllSources() drops any buffer whose stream id is lower than
/// `_cur_stream_id`.
void BufferGraph::Point::Flush() {
    ++_cur_stream_id;
}