-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathio_queue.h
195 lines (176 loc) · 5.92 KB
/
io_queue.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
/*
The MIT License (MIT)
Copyright (c) 2012-2014 Erik Soma
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#ifndef IO_QUEUE_H
#define IO_QUEUE_H
// standard library
// (<malloc.h> is a non-standard header kept for compatibility; <stdlib.h> is
// the standard home of malloc/free, <stdint.h> provides uintptr_t/SIZE_MAX)
#include <assert.h>
#include <malloc.h>
#include <stdatomic.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
// internal assertion helper
//
// in debug builds this is a regular assert(); when NDEBUG is defined the
// expression is still evaluated (so side effects and variable uses are kept)
// but its value is discarded
#ifdef NDEBUG
#define IO_QUEUE_ASSERT(x) ((void)(x))
#else
#define IO_QUEUE_ASSERT(x) assert(x)
#endif
// every io-queue function reports its outcome as an IoQueueResult
typedef int IoQueueResult;
// an allocation failed (see io_queue_push)
#define IO_QUEUE_RESULT_OUT_OF_MEMORY (-1)
// negative answer to a query (see io_queue_has_front)
#define IO_QUEUE_RESULT_FALSE (0)
// the operation completed
#define IO_QUEUE_RESULT_SUCCESS (1)
// positive answer to a query (see io_queue_has_front)
#define IO_QUEUE_RESULT_TRUE (2)
// internal linked-list node used to implement the queue
//
// only the link lives in the struct; the item's bytes are placed directly
// behind it in the same allocation (i.e. at `node + 1`)
typedef struct IoQueueNode IoQueueNode;
struct IoQueueNode
{
    // address of the following node stored as an integer, 0 while no
    // successor has been linked yet
    atomic_uintptr_t next;
};
// the queue object itself
//
// head and tail hold IoQueueNode addresses stored as integers; both are 0
// while the queue is empty
struct IoQueue
{
    // node the consumer reads and pops from
    atomic_uintptr_t head;
    // node most recently appended by a producer
    atomic_uintptr_t tail;
    // number of payload bytes stored with every node
    size_t item_size;
};
typedef struct IoQueue IoQueue;
// initializes an io queue structure to the empty state
// must be called before any other operation on the queue
//
// io_queue must not be null
// item_size is the size (in bytes, as given by sizeof()) of the items that
// will be stored in the queue
//
// always returns IO_QUEUE_RESULT_SUCCESS
IoQueueResult io_queue_init(IoQueue* io_queue, size_t item_size)
{
    IO_QUEUE_ASSERT(io_queue);
    // head and tail are integer-typed atomics (atomic_uintptr_t), so the
    // "no node" value is the integer 0 rather than the pointer constant NULL
    atomic_init(&io_queue->head, 0);
    atomic_init(&io_queue->tail, 0);
    io_queue->item_size = item_size;
    return IO_QUEUE_RESULT_SUCCESS;
}
// reports whether the queue currently holds at least one item
// this is a consumer operation
//
// returns IO_QUEUE_RESULT_TRUE when the head is set (there is a front) and
// IO_QUEUE_RESULT_FALSE when the head is 0 (the queue is empty)
IoQueueResult io_queue_has_front(IoQueue* io_queue)
{
    IO_QUEUE_ASSERT(io_queue);
    return atomic_load(&io_queue->head) != 0
        ? IO_QUEUE_RESULT_TRUE
        : IO_QUEUE_RESULT_FALSE;
}
// copies the item at the front of the queue without removing it
// this is a consumer operation
//
// data must point to at least item_size writable bytes; the stored value is
// copied into it
// the queue must not be empty (checked with an assertion only)
//
// returns IO_QUEUE_RESULT_SUCCESS
IoQueueResult io_queue_front(IoQueue* io_queue, void* data)
{
    IO_QUEUE_ASSERT(io_queue);
    IO_QUEUE_ASSERT(data);
    IoQueueNode* front_node = (IoQueueNode*)atomic_load(&io_queue->head);
    IO_QUEUE_ASSERT(front_node);
    // the item's bytes sit directly behind the node header
    const void* payload = front_node + 1;
    memcpy(data, payload, io_queue->item_size);
    return IO_QUEUE_RESULT_SUCCESS;
}
// removes the item at the front of the queue and frees its node
// this is a consumer operation
//
// the queue must not be empty (checked with an assertion only)
// NOTE(review): the head is read and replaced without a retry loop, so this
// appears to assume only one thread consumes at a time - confirm with callers
//
// returns IO_QUEUE_RESULT_SUCCESS
IoQueueResult io_queue_pop(IoQueue* io_queue)
{
    IO_QUEUE_ASSERT(io_queue);
    IO_QUEUE_ASSERT(io_queue_has_front(io_queue) == IO_QUEUE_RESULT_TRUE);
    // get the head; head/tail are atomic_uintptr_t, so keep the raw integer
    // for the atomic operations and a typed pointer for node access
    uintptr_t popped_bits = atomic_load(&io_queue->head);
    IoQueueNode* popped = (IoQueueNode*)popped_bits;
    uintptr_t expected = popped_bits;
    // if the tail is the node being popped it is the only node: empty the tail
    if (atomic_compare_exchange_strong(&io_queue->tail, &expected, (uintptr_t)0))
    {
        expected = popped_bits;
        // it's possible for another thread to have pushed right after the
        // tail was cleared; that push saw an empty tail and installed its
        // node as the head, so the head may already differ from what was
        // popped. only clear the head if it still refers to the popped node;
        // the result of the exchange is deliberately ignored
        atomic_compare_exchange_strong(&io_queue->head, &expected, (uintptr_t)0);
    }
    else
    {
        // tail is different from head, so a successor exists (or is about to
        // be linked): advance the head to the next node
        uintptr_t new_head = 0;
        while (!new_head)
        {
            // it's possible that the pushing thread has swapped the tail but
            // not yet stored the link, so spin until the value shows up
            new_head = atomic_load(&popped->next);
        }
        atomic_store(&io_queue->head, new_head);
    }
    // delete the popped node (its payload lives in the same allocation)
    free(popped);
    return IO_QUEUE_RESULT_SUCCESS;
}
// adds an item to the back of the queue
// this is a producer operation
//
// data must point to at least item_size readable bytes; the value is copied
// into a freshly allocated node, so the caller keeps ownership of data
//
// returns IO_QUEUE_RESULT_SUCCESS, or IO_QUEUE_RESULT_OUT_OF_MEMORY if the
// node cannot be allocated (heap exhausted, or item_size so large that the
// node size would not fit in a size_t); the queue is unchanged on failure
IoQueueResult io_queue_push(IoQueue* io_queue, void* data)
{
    IO_QUEUE_ASSERT(io_queue);
    // header + payload must not wrap around, otherwise malloc would hand
    // back a block that is too small for the memcpy below
    if (io_queue->item_size > SIZE_MAX - sizeof(IoQueueNode))
    {
        return IO_QUEUE_RESULT_OUT_OF_MEMORY;
    }
    // create the new tail: node header followed by the item's bytes
    IoQueueNode* new_tail = malloc(sizeof *new_tail + io_queue->item_size);
    if (!new_tail)
    {
        return IO_QUEUE_RESULT_OUT_OF_MEMORY;
    }
    atomic_init(&new_tail->next, 0);
    memcpy(new_tail + 1, data, io_queue->item_size);
    // swap the new tail with the old; the queue stores node addresses in
    // atomic_uintptr_t, so convert explicitly in both directions
    IoQueueNode* old_tail = (IoQueueNode*)atomic_exchange(
        &io_queue->tail,
        (uintptr_t)new_tail
    );
    if (old_tail)
    {
        // link the old tail to the new
        atomic_store(&old_tail->next, (uintptr_t)new_tail);
    }
    else
    {
        // the queue was empty, so the new node is also the front
        atomic_store(&io_queue->head, (uintptr_t)new_tail);
    }
    return IO_QUEUE_RESULT_SUCCESS;
}
// removes every item from the queue, freeing the nodes
// this is a consumer operation
//
// treat this like a destructor for the queue: always clear it before
// discarding the IoQueue structure itself. the queue is still usable after a
// clear
//
// returns IO_QUEUE_RESULT_SUCCESS
IoQueueResult io_queue_clear(IoQueue* io_queue)
{
    IO_QUEUE_ASSERT(io_queue);
    // keep popping until no front is left
    for (;;)
    {
        if (io_queue_has_front(io_queue) != IO_QUEUE_RESULT_TRUE)
        {
            break;
        }
        IoQueueResult pop_result = io_queue_pop(io_queue);
        IO_QUEUE_ASSERT(pop_result == IO_QUEUE_RESULT_SUCCESS);
    }
    return IO_QUEUE_RESULT_SUCCESS;
}
#endif