-
Notifications
You must be signed in to change notification settings - Fork 0
/
CircAcqBuffer.h
213 lines (184 loc) · 5.28 KB
/
CircAcqBuffer.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
#pragma once
#include <cstdint>
#include <atomic>
#include <mutex>
#include <deque>
#include <chrono>
/*
Push-only ring buffer inspired by ring buffer interface of National Instruments IMAQ software.
Elements pushed to the ring are given a count corresponding to the number of times push() has been called
since the buffer was initialized. A push() constitutes a copy into buffer-managed memory.
Any n-th element can be locked out of the ring for processing, copy or display and then subsequently released.
If the n-th element isn't available yet, is already locked out, or is being accessed by another thread,
lock_out() returns -1 after timing out.
If the n-th element has been overwritten, the buffer where the n-th element would have been
is returned instead along with the count of the element you have actually locked out.
github.com/sstucker
2021
*/
typedef std::chrono::high_resolution_clock clk;
typedef std::chrono::microseconds us;
inline int mod2(int a, int b)
{
int r = a % b;
return r < 0 ? r + b : r;
}
template <typename T>
struct CircAcqElement
{
T* arr; // the buffer
int index; // position of data in ring
std::atomic_int count; // the count of the data currently in the buffer. Needs to be atomic as it is polled from outside the lock
};
template <class T>
class CircAcqBuffer
{
protected:
CircAcqElement<T>** ring;
CircAcqElement<T>* locked_out_buffer;
int ring_size;
uint64_t element_size;
std::atomic_long count; // cumulative count
std::atomic_int locked; // index of currently locked out buffer
std::atomic_int head; // Head of buffer (receives push)
std::deque<std::mutex> locks;
inline void _swap(int n)
{
locked.store(n); // Update locked out value
// Pointer swap
CircAcqElement<T>* tmp = locked_out_buffer;
locked_out_buffer = ring[n];
ring[n] = tmp;
// Update index to buffer's new position in ring
ring[n]->index = n;
}
inline long _lock_out(int n, T** buffer, int timeout_ms)
{
auto start = clk::now(); // Start timeout timer
int timeout_us = timeout_ms * 1000; // Compare using integer microseconds
while (locked.load() != -1)
{
if (std::chrono::duration_cast<us>(clk::now() - start).count() > timeout_us)
{
printf("CircAcqBuffer: Timed out waiting for locked out buffer to be released.\n");
return -1;
}
}
int requested = mod2(n, ring_size); // Get index of buffer where requested element is/was
while (n > ring[requested]->count.load())
{
if (std::chrono::duration_cast<us>(clk::now() - start).count() > timeout_us)
{
printf("CircAcqBuffer: Timed out trying to acquire %i for %i ms.\n", n, timeout_ms);
return -1;
}
}
while (!locks[requested].try_lock())
{
if (std::chrono::duration_cast<us>(clk::now() - start).count() > timeout_us)
{
printf("CircAcqBuffer: Timed out trying to unlock buffer %i for %i ms.\n", requested, timeout_ms);
return -1;
}
}
_swap(requested);
*buffer = locked_out_buffer->arr; // Return pointer to locked out buffer's array by reference
auto locked_out = locked_out_buffer->count.load(); // Return true count of the locked out buffer
locks[requested].unlock();
return locked_out;
}
public:
CircAcqBuffer()
{
ring_size = 0;
element_size = 0;
head = ATOMIC_VAR_INIT(0);
}
CircAcqBuffer(int number_of_buffers, uint64_t frame_size)
{
ring_size = number_of_buffers;
element_size = frame_size;
head = ATOMIC_VAR_INIT(0);
locked = ATOMIC_VAR_INIT(-1);
ring = new CircAcqElement<T>*[ring_size];
locks.resize(ring_size);
for (int i = 0; i < ring_size; i++)
{
ring[i] = new(CircAcqElement<T>);
ring[i]->arr = new T[element_size];
ring[i]->index = i;
ring[i]->count = -1;
}
// locked_out_buffer maps to memory swapped in to replace a buffer when it is locked out
locked_out_buffer = new(CircAcqElement<T>);
locked_out_buffer->arr = new T[element_size];
locked_out_buffer->index = -1;
locked_out_buffer->count = -1;
count = ATOMIC_VAR_INIT(-1);
}
long lock_out(int n, T** buffer, int timeout_ms)
{
return _lock_out(n, buffer, timeout_ms);
}
long lock_out(int n, T** buffer)
{
return _lock_out(n, buffer, 0);
}
void release()
{
locked.store(-1);
}
int push(T* src)
{
int oldhead = head;
locks[head].lock();
memcpy(ring[head]->arr, src, sizeof(T) * element_size);
ring[head]->count.store(count);
head = mod2(head + 1, ring_size);
count += 1;
locks[oldhead].unlock();
return oldhead;
}
T* lock_out_head()
{
locks[head].lock();
return ring[head]->arr;
}
int release_head()
{
count += 1;
ring[head]->count = count;
int oldhead = head;
head = mod2(head + 1, ring_size);
locks[oldhead].unlock();
return oldhead;
}
int get_count()
{
return count.load();
}
void clear()
{
for (int i = 0; i < ring_size; i++)
{
locks[i].lock();
ring[i]->index = i;
ring[i]->count = -1;
locks[i].unlock();
}
count.store(-1);
head.store(0);
locked.store(-1);
locked_out_buffer->index = -1;
locked_out_buffer->count = -1;
}
~CircAcqBuffer()
{
for (int i = 0; i < ring_size; i++)
{
delete[] ring[i]->arr;
}
delete[] ring;
delete locked_out_buffer;
}
};