-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathurcu-defer-impl.h
471 lines (418 loc) · 13 KB
/
urcu-defer-impl.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
#ifndef _URCU_DEFER_IMPL_H
#define _URCU_DEFER_IMPL_H
/*
* urcu-defer-impl.h
*
* Userspace RCU header - memory reclamation.
*
* TO BE INCLUDED ONLY FROM URCU LIBRARY CODE. See urcu-defer.h for linking
* dynamically with the userspace rcu reclamation library.
*
* Copyright (c) 2009 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
* Copyright (c) 2009 Paul E. McKenney, IBM Corporation.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*
* IBM's contributions to this file may be relicensed under LGPLv2 or later.
*/
#include <stdlib.h>
#include <pthread.h>
#include <stdio.h>
#include <signal.h>
#include <assert.h>
#include <string.h>
#include <errno.h>
#include <poll.h>
#include <sys/time.h>
#include <unistd.h>
#include <stdint.h>
#include "urcu/futex.h"
#include <urcu/compiler.h>
#include <urcu/arch.h>
#include <urcu/uatomic.h>
#include <urcu/list.h>
#include <urcu/system.h>
#include <urcu/tls-compat.h>
#include "urcu-die.h"
/*
* Number of entries in the per-thread defer queue. Must be power of 2.
*/
#define DEFER_QUEUE_SIZE (1 << 12)
#define DEFER_QUEUE_MASK (DEFER_QUEUE_SIZE - 1)
/*
* Typically, data is aligned at least on the architecture size.
* Use lowest bit to indicate that the current callback is changing.
* Assumes that (void *)-2L is not used often. Used to encode non-aligned
* functions and non-aligned data using extra space.
* We encode the (void *)-2L fct as: -2L, fct, data.
* We encode the (void *)-2L data as either:
* fct | DQ_FCT_BIT, data (if fct is aligned), or
* -2L, fct, data (if fct is not aligned).
* Here, DQ_FCT_MARK == ~DQ_FCT_BIT. Required for the test order.
*/
#define DQ_FCT_BIT (1 << 0)
#define DQ_IS_FCT_BIT(x) ((unsigned long)(x) & DQ_FCT_BIT)
#define DQ_SET_FCT_BIT(x) \
(x = (void *)((unsigned long)(x) | DQ_FCT_BIT))
#define DQ_CLEAR_FCT_BIT(x) \
(x = (void *)((unsigned long)(x) & ~DQ_FCT_BIT))
#define DQ_FCT_MARK ((void *)(~DQ_FCT_BIT))
/*
* This code section can only be included in LGPL 2.1 compatible source code.
* See below for the function call wrappers which can be used in code meant to
* be only linked with the Userspace RCU library. This comes with a small
* performance degradation on the read-side due to the added function calls.
* This is required to permit relinking with newer versions of the library.
*/
/*
* defer queue.
* Contains pointers. Encoded to save space when same callback is often used.
* When looking up the next item:
* - if DQ_FCT_BIT is set, set the current callback to DQ_CLEAR_FCT_BIT(ptr)
* - next element contains pointer to data.
* - else if item == DQ_FCT_MARK
* - set the current callback to next element ptr
* - following next element contains pointer to data.
* - else current element contains data
*/
struct defer_queue {
unsigned long head; /* add element at head */
void *last_fct_in; /* last fct pointer encoded */
unsigned long tail; /* next element to remove at tail */
void *last_fct_out; /* last fct pointer encoded */
void **q;
/* registry information */
unsigned long last_head;
struct cds_list_head list; /* list of thread queues */
};
/* Do not #define _LGPL_SOURCE to ensure we can emit the wrapper symbols */
#include "urcu-defer.h"
void __attribute__((destructor)) rcu_defer_exit(void);
extern void synchronize_rcu(void);
/*
* rcu_defer_mutex nests inside defer_thread_mutex.
*/
static pthread_mutex_t rcu_defer_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t defer_thread_mutex = PTHREAD_MUTEX_INITIALIZER;
static int32_t defer_thread_futex;
static int32_t defer_thread_stop;
/*
* Written to only by each individual deferer. Read by both the deferer and
* the reclamation tread.
*/
static DEFINE_URCU_TLS(struct defer_queue, defer_queue);
static CDS_LIST_HEAD(registry_defer);
static pthread_t tid_defer;
static void mutex_lock_defer(pthread_mutex_t *mutex)
{
int ret;
#ifndef DISTRUST_SIGNALS_EXTREME
ret = pthread_mutex_lock(mutex);
if (ret)
urcu_die(ret);
#else /* #ifndef DISTRUST_SIGNALS_EXTREME */
while ((ret = pthread_mutex_trylock(mutex)) != 0) {
if (ret != EBUSY && ret != EINTR)
urcu_die(ret);
(void) poll(NULL,0,10);
}
#endif /* #else #ifndef DISTRUST_SIGNALS_EXTREME */
}
/*
* Wake-up any waiting defer thread. Called from many concurrent threads.
*/
static void wake_up_defer(void)
{
if (caa_unlikely(uatomic_read(&defer_thread_futex) == -1)) {
uatomic_set(&defer_thread_futex, 0);
if (futex_noasync(&defer_thread_futex, FUTEX_WAKE, 1,
NULL, NULL, 0) < 0)
urcu_die(errno);
}
}
static unsigned long rcu_defer_num_callbacks(void)
{
unsigned long num_items = 0, head;
struct defer_queue *index;
mutex_lock_defer(&rcu_defer_mutex);
cds_list_for_each_entry(index, ®istry_defer, list) {
head = CMM_LOAD_SHARED(index->head);
num_items += head - index->tail;
}
mutex_unlock(&rcu_defer_mutex);
return num_items;
}
/*
* Defer thread waiting. Single thread.
*/
static void wait_defer(void)
{
uatomic_dec(&defer_thread_futex);
/* Write futex before read queue */
/* Write futex before read defer_thread_stop */
cmm_smp_mb();
if (_CMM_LOAD_SHARED(defer_thread_stop)) {
uatomic_set(&defer_thread_futex, 0);
pthread_exit(0);
}
if (rcu_defer_num_callbacks()) {
cmm_smp_mb(); /* Read queue before write futex */
/* Callbacks are queued, don't wait. */
uatomic_set(&defer_thread_futex, 0);
} else {
cmm_smp_rmb(); /* Read queue before read futex */
if (uatomic_read(&defer_thread_futex) != -1)
return;
while (futex_noasync(&defer_thread_futex, FUTEX_WAIT, -1,
NULL, NULL, 0)) {
switch (errno) {
case EWOULDBLOCK:
/* Value already changed. */
return;
case EINTR:
/* Retry if interrupted by signal. */
break; /* Get out of switch. */
default:
/* Unexpected error. */
urcu_die(errno);
}
}
}
}
/*
* Must be called after Q.S. is reached.
*/
static void rcu_defer_barrier_queue(struct defer_queue *queue,
unsigned long head)
{
unsigned long i;
void (*fct)(void *p);
void *p;
/*
* Tail is only modified when lock is held.
* Head is only modified by owner thread.
*/
for (i = queue->tail; i != head;) {
cmm_smp_rmb(); /* read head before q[]. */
p = CMM_LOAD_SHARED(queue->q[i++ & DEFER_QUEUE_MASK]);
if (caa_unlikely(DQ_IS_FCT_BIT(p))) {
DQ_CLEAR_FCT_BIT(p);
queue->last_fct_out = p;
p = CMM_LOAD_SHARED(queue->q[i++ & DEFER_QUEUE_MASK]);
} else if (caa_unlikely(p == DQ_FCT_MARK)) {
p = CMM_LOAD_SHARED(queue->q[i++ & DEFER_QUEUE_MASK]);
queue->last_fct_out = p;
p = CMM_LOAD_SHARED(queue->q[i++ & DEFER_QUEUE_MASK]);
}
fct = queue->last_fct_out;
fct(p);
}
cmm_smp_mb(); /* push tail after having used q[] */
CMM_STORE_SHARED(queue->tail, i);
}
static void _rcu_defer_barrier_thread(void)
{
unsigned long head, num_items;
head = URCU_TLS(defer_queue).head;
num_items = head - URCU_TLS(defer_queue).tail;
if (caa_unlikely(!num_items))
return;
synchronize_rcu();
rcu_defer_barrier_queue(&URCU_TLS(defer_queue), head);
}
void rcu_defer_barrier_thread(void)
{
mutex_lock_defer(&rcu_defer_mutex);
_rcu_defer_barrier_thread();
mutex_unlock(&rcu_defer_mutex);
}
/*
* rcu_defer_barrier - Execute all queued rcu callbacks.
*
* Execute all RCU callbacks queued before rcu_defer_barrier() execution.
* All callbacks queued on the local thread prior to a rcu_defer_barrier() call
* are guaranteed to be executed.
* Callbacks queued by other threads concurrently with rcu_defer_barrier()
* execution are not guaranteed to be executed in the current batch (could
* be left for the next batch). These callbacks queued by other threads are only
* guaranteed to be executed if there is explicit synchronization between
* the thread adding to the queue and the thread issuing the defer_barrier call.
*/
void rcu_defer_barrier(void)
{
struct defer_queue *index;
unsigned long num_items = 0;
if (cds_list_empty(®istry_defer))
return;
mutex_lock_defer(&rcu_defer_mutex);
cds_list_for_each_entry(index, ®istry_defer, list) {
index->last_head = CMM_LOAD_SHARED(index->head);
num_items += index->last_head - index->tail;
}
if (caa_likely(!num_items)) {
/*
* We skip the grace period because there are no queued
* callbacks to execute.
*/
goto end;
}
synchronize_rcu();
cds_list_for_each_entry(index, ®istry_defer, list)
rcu_defer_barrier_queue(index, index->last_head);
end:
mutex_unlock(&rcu_defer_mutex);
}
/*
* _defer_rcu - Queue a RCU callback.
*/
static void _defer_rcu(void (*fct)(void *p), void *p)
{
unsigned long head, tail;
/*
* Head is only modified by ourself. Tail can be modified by reclamation
* thread.
*/
head = URCU_TLS(defer_queue).head;
tail = CMM_LOAD_SHARED(URCU_TLS(defer_queue).tail);
/*
* If queue is full, or reached threshold. Empty queue ourself.
* Worse-case: must allow 2 supplementary entries for fct pointer.
*/
if (caa_unlikely(head - tail >= DEFER_QUEUE_SIZE - 2)) {
assert(head - tail <= DEFER_QUEUE_SIZE);
rcu_defer_barrier_thread();
assert(head - CMM_LOAD_SHARED(URCU_TLS(defer_queue).tail) == 0);
}
/*
* Encode:
* if the function is not changed and the data is aligned and it is
* not the marker:
* store the data
* otherwise if the function is aligned and its not the marker:
* store the function with DQ_FCT_BIT
* store the data
* otherwise:
* store the marker (DQ_FCT_MARK)
* store the function
* store the data
*
* Decode: see the comments before 'struct defer_queue'
* or the code in rcu_defer_barrier_queue().
*/
if (caa_unlikely(URCU_TLS(defer_queue).last_fct_in != fct
|| DQ_IS_FCT_BIT(p)
|| p == DQ_FCT_MARK)) {
URCU_TLS(defer_queue).last_fct_in = fct;
if (caa_unlikely(DQ_IS_FCT_BIT(fct) || fct == DQ_FCT_MARK)) {
_CMM_STORE_SHARED(URCU_TLS(defer_queue).q[head++ & DEFER_QUEUE_MASK],
DQ_FCT_MARK);
_CMM_STORE_SHARED(URCU_TLS(defer_queue).q[head++ & DEFER_QUEUE_MASK],
fct);
} else {
DQ_SET_FCT_BIT(fct);
_CMM_STORE_SHARED(URCU_TLS(defer_queue).q[head++ & DEFER_QUEUE_MASK],
fct);
}
}
_CMM_STORE_SHARED(URCU_TLS(defer_queue).q[head++ & DEFER_QUEUE_MASK], p);
cmm_smp_wmb(); /* Publish new pointer before head */
/* Write q[] before head. */
CMM_STORE_SHARED(URCU_TLS(defer_queue).head, head);
cmm_smp_mb(); /* Write queue head before read futex */
/*
* Wake-up any waiting defer thread.
*/
wake_up_defer();
}
static void *thr_defer(void *args)
{
for (;;) {
/*
* "Be green". Don't wake up the CPU if there is no RCU work
* to perform whatsoever. Aims at saving laptop battery life by
* leaving the processor in sleep state when idle.
*/
wait_defer();
/* Sleeping after wait_defer to let many callbacks enqueue */
(void) poll(NULL,0,100); /* wait for 100ms */
rcu_defer_barrier();
}
return NULL;
}
/*
* library wrappers to be used by non-LGPL compatible source code.
*/
void defer_rcu(void (*fct)(void *p), void *p)
{
_defer_rcu(fct, p);
}
static void start_defer_thread(void)
{
int ret;
ret = pthread_create(&tid_defer, NULL, thr_defer, NULL);
assert(!ret);
}
static void stop_defer_thread(void)
{
int ret;
void *tret;
_CMM_STORE_SHARED(defer_thread_stop, 1);
/* Store defer_thread_stop before testing futex */
cmm_smp_mb();
wake_up_defer();
ret = pthread_join(tid_defer, &tret);
assert(!ret);
CMM_STORE_SHARED(defer_thread_stop, 0);
/* defer thread should always exit when futex value is 0 */
assert(uatomic_read(&defer_thread_futex) == 0);
}
int rcu_defer_register_thread(void)
{
int was_empty;
assert(URCU_TLS(defer_queue).last_head == 0);
assert(URCU_TLS(defer_queue).q == NULL);
URCU_TLS(defer_queue).q = malloc(sizeof(void *) * DEFER_QUEUE_SIZE);
if (!URCU_TLS(defer_queue).q)
return -ENOMEM;
mutex_lock_defer(&defer_thread_mutex);
mutex_lock_defer(&rcu_defer_mutex);
was_empty = cds_list_empty(®istry_defer);
cds_list_add(&URCU_TLS(defer_queue).list, ®istry_defer);
mutex_unlock(&rcu_defer_mutex);
if (was_empty)
start_defer_thread();
mutex_unlock(&defer_thread_mutex);
return 0;
}
void rcu_defer_unregister_thread(void)
{
int is_empty;
mutex_lock_defer(&defer_thread_mutex);
mutex_lock_defer(&rcu_defer_mutex);
cds_list_del(&URCU_TLS(defer_queue).list);
_rcu_defer_barrier_thread();
free(URCU_TLS(defer_queue).q);
URCU_TLS(defer_queue).q = NULL;
is_empty = cds_list_empty(®istry_defer);
mutex_unlock(&rcu_defer_mutex);
if (is_empty)
stop_defer_thread();
mutex_unlock(&defer_thread_mutex);
}
void rcu_defer_exit(void)
{
assert(cds_list_empty(®istry_defer));
}
#endif /* _URCU_DEFER_IMPL_H */