-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy paththreadpool.cpp
82 lines (70 loc) · 2.3 KB
/
threadpool.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
#include "threadpool.h"
#include "common.h"
#include "debug.h"
#include "util/mutil.h"
// interval in usec
#define TIMEOUT 10000
vrok::ThreadPool::ThreadPool(size_t thread_count) {
_runnables.resize(thread_count);
_threads.resize(thread_count);
_thread_data.resize(thread_count);
}
bool vrok::ThreadPool::RegisterWork(size_t thread_id, Runnable *runnable) {
if (thread_id < _runnables.size()) {
_runnables[thread_id].push_back(runnable);
return true;
} else {
return false;
}
}
void vrok::ThreadPool::CreateThreads() {
for (size_t i = 0; i < _threads.size(); i++) {
ThreadData *th = new ThreadData;
th->runnables = &_runnables;
th->thread_id = i;
th->work = true;
_thread_data[i] = th;
_threads[i] = new thread(ThreadPool::Work, th);
}
}
void vrok::ThreadPool::StopThreads() {
for (size_t i = 0; i < _threads.size(); i++) {
_thread_data[i]->work = false;
}
}
void vrok::ThreadPool::JoinThreads() {
for (size_t i = 0; i < _threads.size(); i++) {
_threads[i]->join();
}
}
vrok::ThreadPool::~ThreadPool() {
for (size_t i = 0; i < _threads.size(); i++) {
delete _threads[i];
delete _thread_data[i];
}
}
void vrok::ThreadPool::Work(ThreadData *th) {
std::stringstream sstr;
sstr << "vrok:" << th->thread_id;
__set_thread_name(sstr.str());
for (size_t i = 0; i < (*th->runnables)[th->thread_id].size(); i++) {
// DBG(th->thread_id<<" "<<i);
(*th->runnables)[th->thread_id][i]->ThreadStart();
}
while (th->work) {
for (size_t i = 0; i < (*th->runnables)[th->thread_id].size(); i++) {
auto start = std::chrono::steady_clock::now();
(*th->runnables)[th->thread_id][i]->Run();
auto end = std::chrono::steady_clock::now();
long time = std::chrono::duration_cast<std::chrono::microseconds>(end - start).count();
if (time < TIMEOUT) {
vrok::Sleep((int)(TIMEOUT - time));
}
}
}
for (size_t i = 0; i < (*th->runnables)[th->thread_id].size(); i++) {
// DBG(th->thread_id<<" "<<i);
(*th->runnables)[th->thread_id][i]->ThreadEnd();
}
INFO("in use on thread:" << th->thread_id << " " << mutil_get_in_use() << " bytes");
}