-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathThreadpool.hpp
89 lines (76 loc) · 2.17 KB
/
Threadpool.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
#ifndef THREADPOOL_HPP_
# define THREADPOOL_HPP_
# include <future>
# include <type_traits>
# include <memory>
# include <functional>
# include <mutex>
# include <vector>
# include <iostream>
# include <queue>
# include "SafeQueue.hpp"
namespace tea {
namespace concurrency {
class Threadpool
{
protected:
using task_t = std::function<void ()>;
public:
Threadpool(size_t nb)
{
_workers.reserve(nb);
for (size_t i = 0; i < nb; ++i) {
_workers.emplace_back([this]() {
while (true) {
try { _tasks.pop()(); }
catch (UserAbort const&) { break ; }
}
});
}
}
~Threadpool()
{
_tasks.abort();
for (auto& worker: _workers) {
worker.join();
}
}
public:
Threadpool(Threadpool const& other) = delete;
Threadpool& operator=(Threadpool const& other) = delete;
public:
template <typename Callable, typename ...Types>
std::future<typename std::result_of<Callable(Types...)>::type>
push(Callable&& f, Types&&... args)
{
using return_type_t =
typename std::result_of<Callable(Types...)>::type;
using rt_promise_t = std::promise<return_type_t>;
std::shared_ptr<rt_promise_t> promise(new rt_promise_t());
task_t task = [promise, f, args...]() {
promise->set_value(f(args...));
};
{
std::lock_guard<std::mutex> lock(_mutex);
_tasks.push(task);
}
return promise->get_future();
}
size_t
unsafe_pending_tasks() const
{
return _tasks.unsafe_size();
}
size_t
pending_tasks()
{
return _tasks.size();
}
protected:
std::mutex _mutex;
std::vector<std::thread> _workers;
SafeQueue<task_t> _tasks;
};
}
}
#endif /* end of include guard: THREADPOOL_HPP_ */