diff --git a/README.md b/README.md index b69c4b2..0e2a308 100644 --- a/README.md +++ b/README.md @@ -399,7 +399,7 @@ Signal exit when we no longer want to use it, `signal_exit_when_done` -Use it like this +Use it like this (for a more complete example see the [example](examples/examples_jobs_engine.h) ) ``` enum class JobsType @@ -412,44 +412,48 @@ enum class JobsGroupType kJobsGroup1 }; ... -using JobsRequest = std::pair; -using JobsResponse = int; -using JobsEng = small::jobs_engine; -... -JobsEng jobs( - {.threads_count = 0 /*dont start any thread yet*/}, // overall config with default priorities - {.threads_count = 1, .bulk_count = 1}, // default jobs group config - {.group = JobsGroupType::kJobsGroup1}, // default jobs type config - [](auto &j /*this*/, const auto &items) { - for (auto &item : items) { - ... // item-> - } +using Request = std::pair; +using JobsEng = small::jobs_engine; +... +JobsEng::JobsConfig config{ + .m_engine = {.m_threads_count = 0 /*dont start any thread yet*/ }, // overall config with default priorities + .m_groups = { + {JobsGroupType::kJobsGroup1, {.m_threads_count = 1}}}, // config by jobs group + .m_types = { + {JobsType::kJobsType1, {.m_group = JobsGroupType::kJobsGroup1}}, + {JobsType::kJobsType2, {.m_group = JobsGroupType::kJobsGroup1}}, + }}; +... +// create jobs engine +JobsEng jobs(config); +... +jobs.add_default_processing_function([](auto &j /*this jobs engine*/, const auto &jobs_items) { + for (auto &item : jobs_items) { ... - }); - -jobs.add_jobs_group(JobsGroupType::kJobsGroup1, {.threads_count = 1}); + } + ... +}); ... -// add specific function for job1 -jobs.add_jobs_type(JobsType::kJobsType1, {.group = JobsGroupType::kJobsGroup1}, [](auto &j /*this*/, const auto &items, auto b /*extra param b*/) { - for (auto &item : items) { - ... 
// item-> +// add specific function for job1 (calling the function from jobs instead of config allows to pass the engine and extra param) +jobs.add_job_processing_function(JobsType::kJobsType1, [](auto &j /*this jobs engine*/, const auto &jobs_items, auto b /*extra param b*/) { + for (auto &item : jobs_items) { + ... } - -}, 5 /*extra param b*/); - -// use default config and default function for job2 -jobs.add_jobs_type(JobsType::kJobsType2); +}, 5 /*param b*/); ... JobsEng::JobsID jobs_id{}; std::vector jobs_ids; - -jobs.push_back(small::EnumPriorities::kNormal, JobsType::kJobsType1, {1, "normal"}, &jobs_id); -jobs.push_back(small::EnumPriorities::kHigh, JobsType::kJobsType2, {2, "high"}, &jobs_id); - -std::vector jobs_items = {{.type = JobsType::kJobsType1, .request = {7, "highest"}}}; -jobs.push_back(small::EnumPriorities::kHighest, jobs_items, &jobs_ids); - -jobs.push_back_delay_for(std::chrono::milliseconds(300), small::EnumPriorities::kNormal, JobsType::kJobsType1, {100, "delay normal"}, &jobs_id); +... +// push +jobs.queue().push_back(small::EnumPriorities::kNormal, JobsType::kJobsType1, {1, "normal"}, &jobs_id); +... +std::vector> jobs_items = { + std::make_shared(JobsType::kJobsType1, Request{7, "highest"}), + std::make_shared(JobsType::kJobsType1, Request{8, "highest"}), +}; +jobs.queue().push_back(small::EnumPriorities::kHighest, jobs_items, &jobs_ids); +... +jobs.queue().push_back_delay_for(std::chrono::milliseconds(300), small::EnumPriorities::kNormal, JobsType::kJobsType1, {100, "delay normal"}, &jobs_id); ... jobs.start_threads(3); // manual start threads ... 
diff --git a/examples/examples_jobs_engine.h b/examples/examples_jobs_engine.h index 0926d90..faa97cb 100644 --- a/examples/examples_jobs_engine.h +++ b/examples/examples_jobs_engine.h @@ -27,72 +27,86 @@ namespace examples::jobs_engine { }; using Request = std::pair; - using JobsEng = small::jobs_engine; - - JobsEng jobs( - {.threads_count = 0 /*dont start any thread yet*/}, // overall config with default priorities - {.threads_count = 1, .bulk_count = 1}, // default jobs group config - {.group = JobsGroupType::kJobsGroup1}, // default jobs type config - [](auto &j /*this*/, const auto &items) { - for (auto &item : items) { - std::cout << "thread " << std::this_thread::get_id() - << " DEFAULT processing " - << "{" - << " type=" << (int)item->type - << " req.int=" << item->request.first << "," - << " req.str=\"" << item->request.second << "\"" - << "}" - << " time " << small::toISOString(small::timeNow()) - << "\n"; - } - small::sleep(30); - }); - - jobs.add_jobs_group(JobsGroupType::kJobsGroup1, {.threads_count = 1}); - - // add specific function for job1 - jobs.add_jobs_type(JobsType::kJobsType1, {.group = JobsGroupType::kJobsGroup1}, [](auto &j /*this*/, const auto &items, auto b /*extra param b*/) { - // process item using the jobs lock (not recommended) - { - std::unique_lock mlock( j ); - for (auto &item : items) { - std::cout << "thread " << std::this_thread::get_id() - << " JOB1 processing " - << "{" - << " type=" << (int)item->type - << " req.int=" << item->request.first << "," - << " req.str=\"" << item->request.second << "\"" - << "}" - << " time " << small::toISOString(small::timeNow()) - << "\n"; - } - } - small::sleep(30); }, 5 /*param b*/); + using JobsEng = small::jobs_engine; + + auto jobs_processing_function = [](const std::vector> &items) { + // this function is defined without the engine params (it is here just for the example) + std::cout << "this function is defined without the engine params, called for " << (int)items[0]->type << "\n"; + }; 
- // use default config and default function for job2 - jobs.add_jobs_type(JobsType::kJobsType2); + JobsEng::JobsConfig config{ + .m_engine = {.m_threads_count = 0 /*dont start any thread yet*/, + .m_config_prio = {.priorities = {{small::EnumPriorities::kHighest, 2}, + {small::EnumPriorities::kHigh, 2}, + {small::EnumPriorities::kNormal, 2}, + {small::EnumPriorities::kLow, 1}}}}, // overall config with default priorities + .m_default_processing_function = jobs_processing_function, // default processing function, better use jobs.add_default_processing_function to set it + .m_groups = { + {JobsGroupType::kJobsGroup1, {.m_threads_count = 1}}}, // config by jobs group + .m_types = { + {JobsType::kJobsType1, {.m_group = JobsGroupType::kJobsGroup1}}, + {JobsType::kJobsType2, {.m_group = JobsGroupType::kJobsGroup1}}, + }}; + + // create jobs engine + JobsEng jobs(config); + + jobs.add_default_processing_function([](auto &j /*this jobs engine*/, const auto &jobs_items) { + for (auto &item : jobs_items) { + std::cout << "thread " << std::this_thread::get_id() + << " DEFAULT processing " + << "{" + << " type=" << (int)item->type + << " req.int=" << item->request.first << "," + << " req.str=\"" << item->request.second << "\"" + << "}" + << " ref count " << item.use_count() + << " time " << small::toISOString(small::timeNow()) + << "\n"; + } + small::sleep(30); + }); + + // add specific function for job1 (calling the function from jobs instead of config allows to pass the engine and extra param) + jobs.add_job_processing_function(JobsType::kJobsType1, [](auto &j /*this jobs engine*/, const auto &jobs_items, auto b /*extra param b*/) { + for (auto &item : jobs_items) { + std::cout << "thread " << std::this_thread::get_id() + << " JOB1 processing " + << "{" + << " type=" << (int)item->type + << " req.int=" << item->request.first << "," + << " req.str=\"" << item->request.second << "\"" + << "}" + << " ref count " << item.use_count() + << " time " << 
small::toISOString(small::timeNow()) + << "\n"; + } + small::sleep(30); }, 5 /*param b*/); JobsEng::JobsID jobs_id{}; std::vector jobs_ids; // push - jobs.push_back(small::EnumPriorities::kNormal, JobsType::kJobsType1, {1, "normal"}, &jobs_id); - jobs.push_back(small::EnumPriorities::kHigh, JobsType::kJobsType2, {2, "high"}, &jobs_id); + jobs.queue().push_back(small::EnumPriorities::kNormal, JobsType::kJobsType1, {1, "normal"}, &jobs_id); + jobs.queue().push_back(small::EnumPriorities::kHigh, JobsType::kJobsType2, {2, "high"}, &jobs_id); - jobs.push_back(small::EnumPriorities::kNormal, JobsType::kJobsType1, std::make_pair(3, "normal"), &jobs_id); - jobs.push_back(small::EnumPriorities::kHigh, {.type = JobsType::kJobsType1, .request = {4, "high"}}, &jobs_id); - jobs.push_back(small::EnumPriorities::kLow, JobsType::kJobsType1, {5, "low"}, &jobs_id); + jobs.queue().push_back(small::EnumPriorities::kNormal, JobsType::kJobsType1, std::make_pair(3, "normal"), &jobs_id); + jobs.queue().push_back(small::EnumPriorities::kHigh, JobsType::kJobsType1, {4, "high"}, &jobs_id); + jobs.queue().push_back(small::EnumPriorities::kLow, JobsType::kJobsType1, {5, "low"}, &jobs_id); Request req = {6, "normal"}; - jobs.push_back(small::EnumPriorities::kNormal, {.type = JobsType::kJobsType1, .request = req}, nullptr); + jobs.queue().push_back(small::EnumPriorities::kNormal, JobsType::kJobsType1, req, nullptr); - std::vector jobs_items = {{.type = JobsType::kJobsType1, .request = {7, "highest"}}}; - jobs.push_back(small::EnumPriorities::kHighest, jobs_items, &jobs_ids); - jobs.push_back(small::EnumPriorities::kHighest, {{.type = JobsType::kJobsType1, .request = {8, "highest"}}}, &jobs_ids); + std::vector> jobs_items = { + std::make_shared(JobsType::kJobsType1, Request{7, "highest"}), + std::make_shared(JobsType::kJobsType1, Request{8, "highest"}), + }; + jobs.queue().push_back(small::EnumPriorities::kHighest, jobs_items, &jobs_ids); + jobs.queue().push_back(small::EnumPriorities::kHighest, 
{std::make_shared(JobsType::kJobsType1, Request{9, "highest"})}, &jobs_ids); - jobs.push_back_delay_for(std::chrono::milliseconds(300), small::EnumPriorities::kNormal, JobsType::kJobsType1, {100, "delay normal"}, &jobs_id); - jobs.push_back_delay_until(small::timeNow() + std::chrono::milliseconds(350), small::EnumPriorities::kNormal, JobsType::kJobsType1, {101, "delay normal"}, &jobs_id); - jobs.push_back_delay_for(std::chrono::milliseconds(400), small::EnumPriorities::kNormal, JobsType::kJobsType1, {102, "delay normal"}, &jobs_id); + jobs.queue().push_back_delay_for(std::chrono::milliseconds(300), small::EnumPriorities::kNormal, JobsType::kJobsType1, {100, "delay normal"}, &jobs_id); + jobs.queue().push_back_delay_until(small::timeNow() + std::chrono::milliseconds(350), small::EnumPriorities::kNormal, JobsType::kJobsType1, {101, "delay normal"}, &jobs_id); + jobs.queue().push_back_delay_for(std::chrono::milliseconds(400), small::EnumPriorities::kNormal, JobsType::kJobsType1, {102, "delay normal"}, &jobs_id); jobs.start_threads(3); // manual start threads @@ -102,6 +116,8 @@ namespace examples::jobs_engine { std::cout << "wait for with timeout, ret = " << static_cast(ret) << " as timeout\n"; jobs.wait(); // wait here for jobs to finish due to exit flag + std::cout << "size = " << jobs.size() << "\n"; + std::cout << "Jobs Engine example 1 finish\n\n"; return 0; diff --git a/include/buffer.h b/include/buffer.h index 1062c3a..e87a08b 100644 --- a/include/buffer.h +++ b/include/buffer.h @@ -53,7 +53,7 @@ namespace small { // // class for representing a buffer // - class buffer : public base_buffer + class buffer : public small::bufferimpl::base_buffer { public: // buffer (allocates in chunks) diff --git a/include/impl/base_buffer_impl.h b/include/impl/base_buffer_impl.h index 50f2e80..ddd47d3 100644 --- a/include/impl/base_buffer_impl.h +++ b/include/impl/base_buffer_impl.h @@ -9,7 +9,7 @@ #include #include -namespace small { +namespace small::bufferimpl { // class for 
representing a base_buffer that implements // all the needed functions and operators // it must be supplied with derived class with proper functions @@ -457,4 +457,4 @@ namespace small { std::size_t m_buffer_length{0}; }; -} // namespace small +} // namespace small::bufferimpl diff --git a/include/jobs_engine_thread_pool.h b/include/impl/jobs_engine_thread_pool_impl.h similarity index 97% rename from include/jobs_engine_thread_pool.h rename to include/impl/jobs_engine_thread_pool_impl.h index 6b4a2a8..28d941e 100644 --- a/include/jobs_engine_thread_pool.h +++ b/include/impl/jobs_engine_thread_pool_impl.h @@ -2,9 +2,9 @@ #include -#include "worker_thread.h" +#include "../worker_thread.h" -namespace small { +namespace small::jobsimpl { // // helper class for jobs_engine to execute group of jobs (parent caller must implement 'do_action') @@ -188,7 +188,7 @@ namespace small { // struct JobWorkerThreadFunction { - void operator()(small::worker_thread &, const std::vector &items, small::jobs_engine_thread_pool *pThis) const + void operator()(small::worker_thread &, const std::vector &items, jobs_engine_thread_pool *pThis) const { pThis->thread_function(items); } @@ -198,4 +198,4 @@ namespace small { small::worker_thread m_workers{{.threads_count = 0}, JobWorkerThreadFunction(), this}; ParentCallerT &m_parent_caller; // parent jobs engine }; -} // namespace small +} // namespace small::jobsimpl diff --git a/include/impl/jobs_item_impl.h b/include/impl/jobs_item_impl.h new file mode 100644 index 0000000..684d5e1 --- /dev/null +++ b/include/impl/jobs_item_impl.h @@ -0,0 +1,103 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "../base_lock.h" + +namespace small::jobsimpl { + // a job can be in the following states + enum class EnumJobsState : unsigned int + { + kNone = 0, + kInProgress, + kFinished, + kFailed, + kCancelled, + kTimeout + }; + + // a job item + template + struct jobs_item + { + using JobsID = unsigned long long; + + 
JobsID id{}; // job unique id + JobsTypeT type{}; // job type + std::atomic state{EnumJobsState::kNone}; // job state + std::atomic progress{}; // progress 0-100 for state kInProgress + JobsRequestT request{}; // request needed for processing function + JobsResponseT response{}; // where the results are saved (for the finished callback if exists) + + explicit jobs_item() = default; + explicit jobs_item(const JobsID &jobs_id, const JobsTypeT &jobs_type, const JobsRequestT &jobs_request) + : id(jobs_id), type(jobs_type), request(jobs_request) {} + explicit jobs_item(const JobsTypeT &jobs_type, const JobsRequestT &jobs_request) + : type(jobs_type), request(jobs_request) {} + explicit jobs_item(const JobsID &jobs_id, const JobsTypeT &jobs_type, JobsRequestT &&jobs_request) + : id(jobs_id), type(jobs_type), request(std::forward(jobs_request)) {} + explicit jobs_item(const JobsTypeT &jobs_type, JobsRequestT &&jobs_request) + : type(jobs_type), request(std::forward(jobs_request)) {} + + jobs_item(const jobs_item &other) { operator=(other); }; + jobs_item(jobs_item &&other) noexcept { operator=(other); }; + jobs_item &operator=(const jobs_item &other) + { + id = other.id; + type = other.type; + state = other.state.load(); + progress = other.progress.load(); + request = other.request; + response = other.response; + return *this; + } + jobs_item &operator=(jobs_item &&other) noexcept + { + id = std::move(other.id); + type = std::move(other.type); + state = other.state.load(); + progress = other.progress.load(); + request = std::move(other.request); + response = std::move(other.response); + return *this; + } + + // + // set job state (can only go from lower to upper state) + // + inline void set_state(const EnumJobsState &new_state) + { + for (;;) { + EnumJobsState current_state = state.load(); + if (current_state >= new_state) { + return; + } + if (state.compare_exchange_weak(current_state, new_state)) { + return; + } + } + } + + // + // set job progress (can only increase) 
+ // + inline void set_progress(const int &new_progress) + { + for (;;) { + int current_progress = progress.load(); + if (current_progress >= new_progress) { + return; + } + if (progress.compare_exchange_weak(current_progress, new_progress)) { + return; + } + } + } + }; + +} // namespace small::jobsimpl diff --git a/include/impl/jobs_queue_impl.h b/include/impl/jobs_queue_impl.h new file mode 100644 index 0000000..b3007ac --- /dev/null +++ b/include/impl/jobs_queue_impl.h @@ -0,0 +1,395 @@ +#pragma once + +#include + +#include "../prio_queue.h" +#include "../time_queue_thread.h" + +#include "jobs_item_impl.h" + +namespace small::jobsimpl { + // + // small queue helper class for jobs (parent caller must implement 'jobs_activate') + // + template + class jobs_queue + { + public: + using JobsItem = typename small::jobsimpl::jobs_item; + using JobsID = typename JobsItem::JobsID; + using JobsQueue = small::prio_queue; + + using ThisJobsQueue = jobs_queue; + + using JobDelayedItems = std::tuple; + using JobQueueDelayedT = small::time_queue_thread; + + using TimeClock = typename small::time_queue::TimeClock; + using TimeDuration = typename small::time_queue::TimeDuration; + + public: + // + // jobs_queue + // + explicit jobs_queue(ParentCallerT &parent_caller) + : m_parent_caller(parent_caller) {} + + // size of active items + inline size_t size() + { + std::unique_lock l(m_lock); + return m_jobs.size(); + } + // empty + inline bool empty() { return size() == 0; } + // clear + inline void clear() + { + std::unique_lock l(m_lock); + m_jobs.clear(); + + m_delayed_items.clear(); + + for (auto &[group, q] : m_groups_queues) { + q.clear(); + } + } + + // clang-format off + // size of delayed items + inline size_t size_delayed() { return m_delayed_items.queue().size(); } + // empty + inline bool empty_delayed() { return size_delayed() == 0; } + // clear + inline void clear_delayed() { m_delayed_items.queue().clear(); } + // clang-format on + + // clang-format off + // use it as 
locker (std::unique_lock m...) + inline void lock () { m_lock.lock(); } + inline void unlock () { m_lock.unlock(); } + inline bool try_lock () { return m_lock.try_lock(); } + // clang-format on + + // + // create threads + // + inline void start_threads(const int threads_count /* = 1 */) + { + m_delayed_items.start_threads(); + } + + // + // config groups + // m_groups_queues will be initialized in the initial setup phase and will be accessed without locking afterwards + // + inline void add_jobs_group(const JobsGroupT &job_group, const small::config_prio_queue &config_prio) + { + m_groups_queues[job_group] = JobsQueue{config_prio}; + } + + // + // config job types + // m_types_queues will be initialized in the initial setup phase and will be accessed without locking afterwards + // + inline bool add_jobs_type(const JobsTypeT &jobs_type, const JobsGroupT &jobs_group) + { + auto it_g = m_groups_queues.find(jobs_group); + if (it_g == m_groups_queues.end()) { + return false; + } + + m_types_queues[jobs_type] = &it_g->second; + return true; + } + + // + // add items to be processed + // push_back + // + inline std::size_t push_back(const JobsPrioT &priority, const JobsTypeT &jobs_type, const JobsRequestT &job_req, JobsID *jobs_id = nullptr) + { + return push_back(priority, std::make_shared(jobs_type, job_req), jobs_id); + } + + inline std::size_t push_back(const JobsPrioT &priority, std::shared_ptr jobs_item, JobsID *jobs_id = nullptr) + { + if (is_exit()) { + return 0; + } + + auto id = jobs_add(jobs_item); + if (jobs_id) { + *jobs_id = id; + } + + return jobs_activate(priority, jobs_item->type, id); + } + + inline std::size_t push_back(const JobsPrioT &priority, const std::vector> &jobs_items, std::vector *jobs_ids) + { + if (is_exit()) { + return 0; + } + + std::unique_lock l(m_lock); + + std::size_t count = 0; + if (jobs_ids) { + jobs_ids->reserve(jobs_items.size()); + jobs_ids->clear(); + } + JobsID jobs_id{}; + for (auto &jobs_item : jobs_items) { + auto ret = 
push_back(priority, jobs_item, &jobs_id); + if (ret) { + if (jobs_ids) { + jobs_ids->push_back(jobs_id); + } + } + count += ret; + } + return count; + } + + // push_back move semantics + inline std::size_t push_back(const JobsPrioT &priority, const JobsTypeT &jobs_type, JobsRequestT &&jobs_req, JobsID *jobs_id = nullptr) + { + return push_back(priority, std::make_shared(jobs_type, std::forward(jobs_req)), jobs_id); + } + + // no emplace_back due to returning the jobs_id + + // + // push_back with specific timings + // + template + inline std::size_t push_back_delay_for(const std::chrono::duration<_Rep, _Period> &__rtime, const JobsPrioT &priority, const JobsTypeT &jobs_type, const JobsRequestT &jobs_req, JobsID *jobs_id = nullptr) + { + return push_back_delay_for(__rtime, priority, std::make_shared(jobs_type, jobs_req), jobs_id); + } + + template + inline std::size_t push_back_delay_for(const std::chrono::duration<_Rep, _Period> &__rtime, const JobsPrioT &priority, std::shared_ptr jobs_item, JobsID *jobs_id = nullptr) + { + auto id = jobs_add(jobs_item); + if (jobs_id) { + *jobs_id = id; + } + return m_delayed_items.queue().push_delay_for(__rtime, {priority, jobs_item->type, id}); + } + + template + inline std::size_t push_back_delay_for(const std::chrono::duration<_Rep, _Period> &__rtime, const JobsPrioT &priority, const JobsTypeT &jobs_type, JobsRequestT &&jobs_req, JobsID *jobs_id = nullptr) + { + return push_back_delay_for(__rtime, priority, std::make_shared(jobs_type, std::forward(jobs_req)), jobs_id); + } + + // avoid time_casting from one clock to another // template // + inline std::size_t push_back_delay_until(const std::chrono::time_point &__atime, const JobsPrioT &priority, const JobsTypeT &jobs_type, const JobsRequestT &jobs_req, JobsID *jobs_id = nullptr) + { + return push_back_delay_until(__atime, priority, std::make_shared(jobs_type, jobs_req), jobs_id); + } + + inline std::size_t push_back_delay_until(const std::chrono::time_point &__atime, const 
JobsPrioT &priority, std::shared_ptr jobs_item, JobsID *jobs_id = nullptr) + { + auto id = jobs_add(jobs_item); + if (jobs_id) { + *jobs_id = id; + } + return m_delayed_items.queue().push_delay_until(__atime, {priority, jobs_item->type, id}); + } + + inline std::size_t push_back_delay_until(const std::chrono::time_point &__atime, const JobsPrioT &priority, const JobsTypeT &jobs_type, JobsRequestT &&jobs_req, JobsID *jobs_id = nullptr) + { + return push_back_delay_until(__atime, priority, std::make_shared(jobs_type, std::forward(jobs_req)), jobs_id); + } + + // clang-format off + // + // signal exit + // + inline void signal_exit_force () { + m_delayed_items.queue().signal_exit_force(); + + m_lock.signal_exit_force(); + for (auto &[group, q] : m_groups_queues) { + q.signal_exit_force(); + } + } + inline void signal_exit_when_done () { m_delayed_items.queue().signal_exit_when_done(); /*when the delayed will be finished will signal the active queue items to exit when done*/ } + + // to be used in processing function + inline bool is_exit () { return m_delayed_items.queue().is_exit_force(); } + // clang-format on + + // + // wait for threads to finish processing + // + inline EnumLock wait() + { + signal_exit_when_done(); + + // first wait for delayed items to finish + m_delayed_items.wait(); + + // only now can signal exit when done for all queues (when no more delayed items can be pushed) + for (auto &[group, q] : m_groups_queues) { + q.signal_exit_when_done(); + } + for (auto &[group, q] : m_groups_queues) { + q.wait(); + } + + return small::EnumLock::kExit; + } + + // wait some time then signal exit + template + inline EnumLock wait_for(const std::chrono::duration<_Rep, _Period> &__rtime) + { + using __dur = typename std::chrono::system_clock::duration; + auto __reltime = std::chrono::duration_cast<__dur>(__rtime); + if (__reltime < __rtime) { + ++__reltime; + } + return wait_until(std::chrono::system_clock::now() + __reltime); + } + + // wait until then signal 
exit + template + inline EnumLock wait_until(const std::chrono::time_point<_Clock, _Duration> &__atime) + { + signal_exit_when_done(); + + // first wait for delayed items to finish + auto delayed_status = m_delayed_items.wait_until(__atime); + if (delayed_status == small::EnumLock::kTimeout) { + return small::EnumLock::kTimeout; + } + + // only now can signal exit when done for all queues (when no more delayed items can be pushed) + for (auto &[group, q] : m_groups_queues) { + q.signal_exit_when_done(); + } + + for (auto &[group, q] : m_groups_queues) { + auto status = q.wait_until(__atime); + if (status == small::EnumLock::kTimeout) { + return small::EnumLock::kTimeout; + } + } + + return small::EnumLock::kExit; + } + + private: + friend ParentCallerT; + + // + // get group queue + // + inline JobsQueue *get_group_queue(const JobsGroupT &jobs_group) + { + auto it = m_groups_queues.find(jobs_group); + return it != m_groups_queues.end() ? &it->second : nullptr; + } + + inline std::vector> jobs_get(const std::vector &jobs_ids) + { + std::vector> jobs_items; + jobs_items.reserve(jobs_ids.size()); + + std::unique_lock l(m_lock); + + for (auto &jobs_id : jobs_ids) { + auto it_j = m_jobs.find(jobs_id); + if (it_j == m_jobs.end()) { + continue; + } + jobs_items.push_back(it_j->second); + } + + return jobs_items; // will be moved + } + + inline void jobs_del(const JobsID &jobs_id) + { + std::unique_lock l(m_lock); + m_jobs.erase(jobs_id); + } + + private: + // some prevention + jobs_queue(const jobs_queue &) = delete; + jobs_queue(jobs_queue &&) = delete; + jobs_queue &operator=(const jobs_queue &) = delete; + jobs_queue &operator=(jobs_queue &&__t) = delete; + + private: + // + // add job items + // + inline JobsID jobs_add(std::shared_ptr jobs_item) + { + std::unique_lock l(m_lock); + + ++m_jobs_seq_id; + + JobsID id = m_jobs_seq_id; + jobs_item->id = id; + m_jobs.emplace(id, jobs_item); + + return id; + } + + // activate the jobs + inline std::size_t jobs_activate(const 
JobsPrioT &priority, const JobsTypeT &jobs_type, const JobsID &jobs_id) + { + std::size_t ret = 0; + + // optimization to get the queue from the type + // (instead of getting the group from type from m_config.m_types and then getting the queue from the m_groups_queues) + auto it_q = m_types_queues.find(jobs_type); + if (it_q != m_types_queues.end()) { + auto *q = it_q->second; + ret = q->push_back(priority, jobs_id); + } + + if (ret) { + m_parent_caller.jobs_activate(jobs_type, jobs_id); + } else { + jobs_del(jobs_id); + } + return ret; + } + + // + // inner thread function for delayed items + // + friend JobQueueDelayedT; + + inline std::size_t push_back(std::vector &&items) + { + std::size_t count = 0; + for (auto &[priority, jobs_type, jobs_id] : items) { + count += jobs_activate(priority, jobs_type, jobs_id); + } + return count; + } + + private: + // + // members + // + mutable small::base_lock m_lock; // global locker + std::atomic m_jobs_seq_id{}; // to get the next jobs id + std::unordered_map> m_jobs; // current jobs + std::unordered_map m_groups_queues; // map of queues by group + std::unordered_map m_types_queues; // optimize to have queues by type (which reference queues by group) + + JobQueueDelayedT m_delayed_items{*this}; // queue of delayed items + + ParentCallerT &m_parent_caller; // jobs engine + }; +} // namespace small::jobsimpl diff --git a/include/jobs_config.h b/include/jobs_config.h new file mode 100644 index 0000000..81d95dc --- /dev/null +++ b/include/jobs_config.h @@ -0,0 +1,75 @@ +#pragma once + +#include + +#include "prio_queue.h" + +#include "impl/jobs_item_impl.h" + +namespace small { + // + // small class for jobs config + // - setup how many threads to use overall, and the priorities + // - for each job group how many threads to use + // - for each job type how to process and to which group it belongs + // + template + struct jobs_config + { + using JobsItem = typename small::jobsimpl::jobs_item; + using ProcessingFunction = 
std::function> &)>; + + // config for the entire jobs engine + struct ConfigJobsEngine + { + int m_threads_count{8}; // how many total threads for processing + small::config_prio_queue m_config_prio{}; + }; + + // config for an individual job type + struct ConfigJobsType + { + JobsGroupT m_group{}; // job type group (multiple job types can be configured to same group) + bool m_has_processing_function{false}; // use default processing function + ProcessingFunction m_processing_function{}; // processing Function + }; + + // config for the job group (where job types can be grouped) + struct ConfigJobsGroup + { + int m_threads_count{1}; // how many threads for processing (out of the global threads) + int m_bulk_count{1}; // how many objects are processed at once + }; + + ConfigJobsEngine m_engine{}; // config for entire engine (threads, priorities, etc) + ProcessingFunction m_default_processing_function{}; // default processing function + std::unordered_map m_groups; // config by jobs group + std::unordered_map m_types; // config by jobs type + + // default processing function + inline void add_default_processing_function(ProcessingFunction processing_function) + { + m_default_processing_function = processing_function; + apply_default_processing_function(); + } + + inline void add_job_processing_function(const JobsTypeT &jobs_type, ProcessingFunction processing_function) + { + auto it_f = m_types.find(jobs_type); + if (it_f == m_types.end()) { + return; + } + it_f->second.m_has_processing_function = true; + it_f->second.m_processing_function = processing_function; + } + + inline void apply_default_processing_function() + { + for (auto &[type, jobs_type_config] : m_types) { + if (jobs_type_config.m_has_processing_function == false) { + jobs_type_config.m_processing_function = m_default_processing_function; + } + } + } + }; +} // namespace small diff --git a/include/jobs_engine.h b/include/jobs_engine.h index fe74bb2..9fb17b0 100644 --- a/include/jobs_engine.h +++ 
b/include/jobs_engine.h @@ -2,8 +2,10 @@ #include -#include "jobs_engine_thread_pool.h" -#include "prio_queue.h" +#include "impl/jobs_engine_thread_pool_impl.h" +#include "impl/jobs_item_impl.h" +#include "impl/jobs_queue_impl.h" +#include "jobs_config.h" // enum class JobsType // { @@ -15,87 +17,51 @@ // kJobsGroup1 // }; // -// using JobsRequest = std::pair; -// using JobsResponse = int; -// using JobsEng = small::jobs_engine; +// using Request = std::pair; +// using JobsEng = small::jobs_engine; // -// JobsEng jobs( -// {.threads_count = 0 /*dont start any thread yet*/}, // overall config with default priorities -// {.threads_count = 1, .bulk_count = 1}, // default jobs group config -// {.group = JobsGroupType::kJobsGroup1}, // default jobs type config -// [](auto &j /*this*/, const auto &items) { -// for (auto &item : items) { -// ... -// } -// }); // -// jobs.add_jobs_group(JobsGroupType::kJobsGroup1, {.threads_count = 1}); +// JobsEng::JobsConfig config{ +// .m_engine = {.m_threads_count = 0 /*dont start any thread yet*/}, // overall config with default priorities +// .m_groups = { +// {JobsGroupType::kJobsGroup1, {.m_threads_count = 1}}}, // config by jobs group +// .m_types = { +// {JobsType::kJobsType1, {.m_group = JobsGroupType::kJobsGroup1}}, +// {JobsType::kJobsType2, {.m_group = JobsGroupType::kJobsGroup1}}, +// }}; +// +// // create jobs engine +// JobsEng jobs(config); +// +// jobs.add_default_processing_function([](auto &j /*this jobs engine*/, const auto &jobs_items) { +// for (auto &item : jobs_items) { +// ... +// } +// }); // // // add specific function for job1 -// jobs.add_jobs_type(JobsType::kJobsType1, {.group = JobsGroupType::kJobsGroup1}, [](auto &j /*this*/, const auto &items, auto b /*extra param b*/) { -// for (auto &item : items) { -// ... -// } +// jobs.add_job_processing_function(JobsType::kJobsType1, [](auto &j /*this jobs engine*/, const auto &jobs_items, auto b /*extra param b*/) { +// for (auto &item : jobs_items) { +// ... 
+// } // }, 5 /*param b*/); // -// // use default config and default function for job2 -// jobs.add_jobs_type(JobsType::kJobsType2); -// // JobsEng::JobsID jobs_id{}; // std::vector jobs_ids; // // // push -// jobs.push_back(small::EnumPriorities::kNormal, JobsType::kJobsType1, {1, "normal"}, &jobs_id); -// jobs.push_back(small::EnumPriorities::kHigh, {.type = JobsType::kJobsType1, .request = {4, "high"}}, &jobs_id); -// -// std::vector jobs_items = {{.type = JobsType::kJobsType1, .request = {7, "highest"}}}; -// jobs.push_back(small::EnumPriorities::kHighest, jobs_items, &jobs_ids); -// -// jobs.push_back_delay_for(std::chrono::milliseconds(300), small::EnumPriorities::kNormal, JobsType::kJobsType1, {100, "delay normal"}, &jobs_id); +// jobs.queue().push_back(small::EnumPriorities::kNormal, JobsType::kJobsType1, {1, "normal"}, &jobs_id); +// jobs.queue().push_back_delay_for(std::chrono::milliseconds(300), small::EnumPriorities::kNormal, JobsType::kJobsType1, {100, "delay normal"}, &jobs_id); // // jobs.start_threads(3); // manual start threads // // // jobs.signal_exit_force(); -// auto ret = jobs.wait_for(std::chrono::milliseconds(100)); // wait to finished -// ... 
// jobs.wait(); // wait here for jobs to finish due to exit flag // +// for a more complex example see examples/examples_jobs_engine.h namespace small { - // config the entire jobs engine - template - struct config_jobs_engine - { - int threads_count{8}; // how many total threads for processing - small::config_prio_queue config_prio{}; - }; - - // config an individual job type - template - struct config_jobs_type - { - JobsGroupT group{}; // job type group (multiple job types can be configured to same group) - }; - - // config the job group (where job types can be grouped) - struct config_jobs_group - { - int threads_count{1}; // how many threads for processing (out of the global threads) - int bulk_count{1}; // how many objects are processed at once - }; - - // a job can be in the following states - enum class EnumJobsState : unsigned int - { - kNone = 0, - kInProgress, - kFinished, - kFailed, - kCancelled, - kTimeout - }; - // // small class for jobs where job items have a type (which are further grouped by group), priority, request and response // @@ -103,58 +69,29 @@ namespace small { class jobs_engine { public: - using JobsID = unsigned long long; - using JobsQueue = small::prio_queue; - using JobDelayedItems = std::tuple; - using ThisJobsEngine = small::jobs_engine; - using JobQueueDelayedT = small::time_queue_thread; - using TimeClock = typename small::time_queue::TimeClock; - using TimeDuration = typename small::time_queue::TimeDuration; - - // a job item - struct JobsItem - { - JobsID id{}; // job unique id - JobsTypeT type{}; // job type - EnumJobsState state{}; // job state - int progress{}; // progress 0-100 for state kInProgress - JobsRequestT request{}; // request needed for processing function - JobsResponseT response{}; // where the results are saved (for the finished callback if exists) - }; - using ProcessingFunction = std::function &)>; - - struct JobsTypeConfig - { - config_jobs_type m_config{}; // config for this job type (to which group it 
belongs) - ProcessingFunction m_processing_function{}; // processing Function - }; + using ThisJobsEngine = small::jobs_engine; + using JobsConfig = small::jobs_config; + using JobsItem = small::jobsimpl::jobs_item; + using JobsQueue = small::jobsimpl::jobs_queue; + using JobsID = typename JobsItem::JobsID; + using TimeClock = typename JobsQueue::TimeClock; + using TimeDuration = typename JobsQueue::TimeDuration; + using ProcessingFunction = typename JobsConfig::ProcessingFunction; public: // // jobs_engine // - explicit jobs_engine(const config_jobs_engine &config_engine = {}) - : m_config{ - .m_engine{config_engine}} + explicit jobs_engine(const JobsConfig &config = {}) + : m_config{config} { - // auto start threads if count > 0 otherwise threads should be manually started - if (m_config.threads_count) { - start_threads(m_config.threads_count); - } + apply_config(); } - template - jobs_engine(const config_jobs_engine config_engine, const config_jobs_group config_default_group, const config_jobs_type &config_default_jobs_type, _Callable processing_function, Args... 
extra_parameters) - : m_config{ - .m_engine{config_engine}, - .m_default_group{config_default_group}, - .m_default_jobs_type{config_default_jobs_type}, - .m_default_processing_function{std::bind(std::forward<_Callable>(processing_function), std::ref(*this), std::placeholders::_1 /*jobs_item*/, std::forward(extra_parameters)...)}} - + explicit jobs_engine(JobsConfig &&config) + : m_config{config} { - if (m_config.m_engine.threads_count) { - start_threads(m_config.m_engine.threads_count); - } + apply_config(); } ~jobs_engine() @@ -163,25 +100,15 @@ namespace small { } // size of active items - inline size_t size() - { - std::unique_lock l(m_lock); - return m_jobs.size(); - } + inline size_t size() { return m_queue.size(); } // empty inline bool empty() { return size() == 0; } // clear inline void clear() { - std::unique_lock l(m_lock); - m_jobs.clear(); - - m_delayed_items.clear(); + std::unique_lock l(m_queue); + m_queue.clear(); m_thread_pool.clear(); - - for (auto &[group, q] : m_groups_queues) { - q.clear(); - } } // clang-format off @@ -193,18 +120,18 @@ namespace small { inline void clear_processing() { m_thread_pool.clear(); } // size of delayed items - inline size_t size_delayed() { return m_delayed_items.queue().size(); } + inline size_t size_delayed() { return queue().size_delayed(); } // empty inline bool empty_delayed() { return size_delayed() == 0; } // clear - inline void clear_delayed() { m_delayed_items.queue().clear(); } + inline void clear_delayed() { queue().clear_delayed(); } // clang-format on // clang-format off // use it as locker (std::unique_lock> m...) 
- inline void lock () { m_lock.lock(); } - inline void unlock () { m_lock.unlock(); } - inline bool try_lock () { return m_lock.try_lock(); } + inline void lock () { queue().lock(); } + inline void unlock () { queue().unlock(); } + inline bool try_lock () { return queue().try_lock(); } // clang-format on // @@ -212,289 +139,54 @@ namespace small { // inline void start_threads(const int threads_count /* = 1 */) { - m_config.m_engine.threads_count = threads_count; - m_delayed_items.start_threads(); + m_config.m_engine.m_threads_count = threads_count; + m_queue.start_threads(threads_count); m_thread_pool.start_threads(threads_count); } // - // config processing by job type + // config // THIS SHOULD BE DONE IN THE INITIAL SETUP PHASE ONCE // - // - // set default job group, job type config and processing function - // - template - inline void add_default_config(const config_jobs_group &config_default_group, const config_jobs_type &config_default_jobs_type, _Callable processing_function, Args... 
extra_parameters) + inline void set_config(const JobsConfig &config) { - m_config.m_default_group = config_default_group; - m_config.m_default_jobs_type = config_default_jobs_type; - m_config.m_default_processing_function = std::bind(std::forward<_Callable>(processing_function), std::ref(*this), std::placeholders::_1 /*jobs_item*/, std::forward(extra_parameters)...); + m_config = config; + apply_config(); } - // - // config groups - // - inline void add_jobs_group(const JobsGroupT &job_group) + inline void set_config(JobsConfig &&config) { - add_jobs_group(job_group, m_config.m_default_group); - } - - inline void add_jobs_group(const JobsGroupT &job_group, const config_jobs_group &config_group) - { - m_config.m_groups[job_group] = config_group; - m_groups_queues[job_group] = JobsQueue{m_config.m_engine.config_prio}; - m_thread_pool.add_job_group(job_group, config_group.threads_count); - } - - inline void add_jobs_groups(const std::vector> &job_group_configs) - { - for (auto &[job_group, config_group] : job_group_configs) { - add_jobs_group(job_group, config_group); - } - } - - // - // config job typpes - // m_job_queues will be initialized in the initial setup phase and will be accessed without locking afterwards - // - bool add_jobs_type(const JobsTypeT &jobs_type) - { - return add_jobs_type(jobs_type, {.m_config = m_config.m_default_jobs_type, - .m_processing_function = m_config.m_default_processing_function}); - } - - inline bool add_jobs_type(const JobsTypeT &jobs_type, const config_jobs_type &config) - { - return add_jobs_type(jobs_type, {.m_config = config, - .m_processing_function = m_config.m_default_processing_function}); - } - inline bool add_jobs_type(const JobsTypeT &jobs_type, config_jobs_type &&config) - { - return add_jobs_type(jobs_type, {.m_config = std::forward>(config), - .m_processing_function = m_config.m_default_processing_function}); + m_config = std::move(config); + apply_config(); } + // processing function template - inline bool 
add_jobs_type(const JobsTypeT jobs_type, const config_jobs_type &config, _Callable processing_function, Args... extra_parameters) + inline void add_default_processing_function(_Callable processing_function, Args... extra_parameters) { - return add_jobs_type(jobs_type, {.m_config = config, - .m_processing_function = std::bind(std::forward<_Callable>(processing_function), std::ref(*this), std::placeholders::_1 /*jobs_items*/, std::forward(extra_parameters)...)}); + m_config.add_default_processing_function(std::bind(std::forward<_Callable>(processing_function), std::ref(*this), std::placeholders::_1 /*jobs_items*/, std::forward(extra_parameters)...)); } template - inline bool add_jobs_type(const JobsTypeT jobs_type, config_jobs_type &&config, _Callable processing_function, Args... extra_parameters) - { - return add_jobs_type(jobs_type, {.m_config = std::forward>(config), - .m_processing_function = std::bind(std::forward<_Callable>(processing_function), std::ref(*this), std::placeholders::_1 /*jobs_items*/, std::forward(extra_parameters)...)}); - } - - inline bool add_jobs_type(const JobsTypeT &jobs_type, JobsTypeConfig &&jobs_config) - { - auto job_group = jobs_config.m_config.group; - auto it_g = m_config.m_groups.find(job_group); - if (it_g == m_config.m_groups.end()) { - return false; - } - - m_config.m_types[jobs_type] = std::forward(jobs_config); - m_types_queues[jobs_type] = &m_groups_queues[job_group]; - return true; - } - - // - // get - // - inline JobsItem *jobs_get(const JobsID &jobs_id) - { - std::unique_lock l(m_lock); - - auto it_j = m_jobs.find(jobs_id); - return it_j != m_jobs.end() ? 
&it_j->second : nullptr; - } - - // TODO add function for progress and state - - // - // add items to be processed - // push_back - // - inline std::size_t push_back(const JobsPrioT &priority, const JobsTypeT &jobs_type, const JobsRequestT &job_req, JobsID *jobs_id = nullptr) - { - return push_back(priority, {.type = jobs_type, .request = job_req}, jobs_id); - } - - inline std::size_t push_back(const JobsPrioT &priority, const JobsItem &jobs_item, JobsID *jobs_id = nullptr) - { - if (is_exit()) { - return 0; - } - - auto id = jobs_add(jobs_item); - if (jobs_id) { - *jobs_id = id; - } - - return jobs_activate(priority, jobs_item.type, id); - } - - inline std::size_t push_back(const JobsPrioT &priority, const std::vector &jobs_items, std::vector *jobs_ids) - { - if (is_exit()) { - return 0; - } - - std::unique_lock l(m_lock); - - std::size_t count = 0; - if (jobs_ids) { - jobs_ids->reserve(jobs_items.size()); - jobs_ids->clear(); - } - JobsID jobs_id{}; - for (auto &jobs_item : jobs_items) { - auto ret = push_back(priority, jobs_item, &jobs_id); - if (ret) { - if (jobs_ids) { - jobs_ids->push_back(jobs_id); - } - } - count += ret; - } - return count; - } - - // push_back move semantics - inline std::size_t push_back(const JobsPrioT &priority, const JobsTypeT &jobs_type, JobsRequestT &&jobs_req, JobsID *jobs_id = nullptr) - { - return push_back(priority, {.type = jobs_type, .request = std::forward(jobs_req)}, jobs_id); - } - - inline std::size_t push_back(const JobsPrioT &priority, JobsItem &&jobs_item, JobsID *jobs_id = nullptr) - { - if (is_exit()) { - return 0; - } - auto jobs_type = jobs_item.type; // save the type because the object will be moved - auto id = jobs_add(std::forward(jobs_item)); - if (jobs_id) { - *jobs_id = id; - } - - return jobs_activate(priority, jobs_type, id); - } - - inline std::size_t push_back(const JobsPrioT &priority, std::vector &&jobs_items, std::vector *jobs_ids) + inline void add_job_processing_function(const JobsTypeT &jobs_type, 
_Callable processing_function, Args... extra_parameters) { - if (is_exit()) { - return 0; - } - - std::unique_lock l(m_lock); - - std::size_t count = 0; - if (jobs_ids) { - jobs_ids->reserve(jobs_items.size()); - jobs_ids->clear(); - } - JobsID jobs_id{}; - for (auto &&jobs_item : jobs_items) { - auto ret = push_back(priority, std::forward(jobs_item), &jobs_id); - if (ret) { - if (jobs_ids) { - jobs_ids->push_back(jobs_id); - } - } - count += ret; - } - return count; + m_config.add_job_processing_function(jobs_type, std::bind(std::forward<_Callable>(processing_function), std::ref(*this), std::placeholders::_1 /*jobs_items*/, std::forward(extra_parameters)...)); } - // no emplace_back do to returning the jobs_id - // - // push_back with specific timeings + // queue access // - template - inline std::size_t push_back_delay_for(const std::chrono::duration<_Rep, _Period> &__rtime, const JobsPrioT &priority, const JobsTypeT &jobs_type, const JobsRequestT &jobs_req, JobsID *jobs_id = nullptr) - { - return push_back_delay_for(__rtime, priority, {.type = jobs_type, .request = jobs_req}, jobs_id); - } - - template - inline std::size_t push_back_delay_for(const std::chrono::duration<_Rep, _Period> &__rtime, const JobsPrioT &priority, const JobsItem &jobs_item, JobsID *jobs_id = nullptr) - { - auto id = jobs_add(jobs_item); - if (jobs_id) { - *jobs_id = id; - } - return m_delayed_items.queue().push_delay_for(__rtime, {priority, jobs_item.type, id}); - } - - template - inline std::size_t push_back_delay_for(const std::chrono::duration<_Rep, _Period> &__rtime, const JobsPrioT &priority, const JobsTypeT &jobs_type, JobsRequestT &&jobs_req, JobsID *jobs_id = nullptr) - { - return push_back_delay_for(__rtime, priority, {.type = jobs_type, .request = std::forward(jobs_req)}, jobs_id); - } - - template - inline std::size_t push_back_delay_for(const std::chrono::duration<_Rep, _Period> &__rtime, const JobsPrioT &priority, JobsItem &&jobs_item, JobsID *jobs_id = nullptr) - { - auto 
jobs_type = jobs_item.type; // save the type because the object will be moved - auto id = jobs_add(std::forward(jobs_item)); - if (jobs_id) { - *jobs_id = id; - } - return m_delayed_items.queue().push_delay_for(__rtime, {priority, jobs_type, id}); - } - - // avoid time_casting from one clock to another // template // - inline std::size_t push_back_delay_until(const std::chrono::time_point &__atime, const JobsPrioT &priority, const JobsTypeT &jobs_type, const JobsRequestT &jobs_req, JobsID *jobs_id = nullptr) - { - return push_back_delay_until(__atime, priority, {.type = jobs_type, .request = jobs_req}, jobs_id); - } - - inline std::size_t push_back_delay_until(const std::chrono::time_point &__atime, const JobsPrioT &priority, const JobsItem &jobs_item, JobsID *jobs_id = nullptr) - { - auto id = jobs_add(jobs_item); - if (jobs_id) { - *jobs_id = id; - } - return m_delayed_items.queue().push_delay_until(__atime, {priority, jobs_item.type, id}); - } - - inline std::size_t push_back_delay_until(const std::chrono::time_point &__atime, const JobsPrioT &priority, const JobsTypeT &jobs_type, JobsRequestT &&jobs_req, JobsID *jobs_id = nullptr) - { - return push_back_delay_until(__atime, priority, {.type = jobs_type, .request = std::forward(jobs_req)}, jobs_id); - } - - inline std::size_t push_back_delay_until(const std::chrono::time_point &__atime, const JobsPrioT &priority, JobsItem &&jobs_item, JobsID *jobs_id = nullptr) - { - auto jobs_type = jobs_item.type; // save the type because the object will be moved - auto id = jobs_add(std::forward(jobs_item)); - if (jobs_id) { - *jobs_id = id; - } - return m_delayed_items.queue().push_delay_until(__atime, {priority, jobs_type, id}); - } + inline JobsQueue &queue() { return m_queue; } // clang-format off // // signal exit // - inline void signal_exit_force () { - m_thread_pool.signal_exit_force(); - m_delayed_items.queue().signal_exit_force(); - - m_lock.signal_exit_force(); - for (auto &[group, q] : m_groups_queues) { - 
q.signal_exit_force(); - } - } - inline void signal_exit_when_done () { m_delayed_items.queue().signal_exit_when_done(); /*when the delayed will be finished will signal the active queue items to exit when done, then the processing pool */ } + inline void signal_exit_force () { m_thread_pool.signal_exit_force(); m_queue.signal_exit_force(); } + inline void signal_exit_when_done () { m_queue.signal_exit_when_done(); /*when the delayed will be finished will signal the active queue items to exit when done, then the processing pool */ } // to be used in processing function - inline bool is_exit () { return m_delayed_items.queue().is_exit_force(); } + inline bool is_exit () { return m_queue.is_exit_force(); } // clang-format on // @@ -504,16 +196,8 @@ namespace small { { signal_exit_when_done(); - // first wait for delayed items to finish - m_delayed_items.wait(); - - // only now can signal exit when done for all queues (when no more delayed items can be pushed) - for (auto &[group, q] : m_groups_queues) { - q.signal_exit_when_done(); - } - for (auto &[group, q] : m_groups_queues) { - q.wait(); - } + // first wait for queue items to finish + m_queue.wait(); // only now can signal exit when done for workers (when no more items exists) return m_thread_pool.wait(); @@ -538,23 +222,11 @@ namespace small { signal_exit_when_done(); // first wait for delayed items to finish - auto delayed_status = m_delayed_items.wait_until(__atime); + auto delayed_status = m_queue.wait_until(__atime); if (delayed_status == small::EnumLock::kTimeout) { return small::EnumLock::kTimeout; } - // only now can signal exit when done for all queues (when no more delayed items can be pushed) - for (auto &[group, q] : m_groups_queues) { - q.signal_exit_when_done(); - } - - for (auto &[group, q] : m_groups_queues) { - auto status = q.wait_until(__atime); - if (status == small::EnumLock::kTimeout) { - return small::EnumLock::kTimeout; - } - } - // only now can signal exit when done for workers (when no 
more items exists) return m_thread_pool.wait_until(__atime); } @@ -568,10 +240,32 @@ namespace small { private: // - // inner thread function for executing items (should return if there are more items) + // apply config // + inline void apply_config() + { + // setup jobs groups + for (auto &[jobs_group, jobs_group_config] : m_config.m_groups) { + m_queue.add_jobs_group(jobs_group, m_config.m_engine.m_config_prio); + m_thread_pool.add_job_group(jobs_group, jobs_group_config.m_threads_count); + } - friend small::jobs_engine_thread_pool; + // setup jobs types + m_config.apply_default_processing_function(); + for (auto &[jobs_type, jobs_type_config] : m_config.m_types) { + m_queue.add_jobs_type(jobs_type, jobs_type_config.m_group); + } + + // auto start threads if count > 0 otherwise threads should be manually started + if (m_config.m_engine.m_threads_count) { + start_threads(m_config.m_engine.m_threads_count); + } + } + + // + // inner thread function for executing items (should return if there are more items) + // + friend small::jobsimpl::jobs_engine_thread_pool; inline EnumLock do_action(const JobsGroupT &jobs_group, bool *has_items) { @@ -583,144 +277,67 @@ namespace small { return small::EnumLock::kExit; } - int bulk_count = std::max(it_cfg_grp->second.bulk_count, 1); + int bulk_count = std::max(it_cfg_grp->second.m_bulk_count, 1); // get items to process - auto it_q = m_groups_queues.find(jobs_group); - if (it_q == m_groups_queues.end()) { + auto *q = m_queue.get_group_queue(jobs_group); + if (!q) { return small::EnumLock::kExit; } - auto &q = it_q->second; std::vector vec_ids; - auto ret = q.wait_pop_front_for(std::chrono::nanoseconds(0), vec_ids, bulk_count); - if (ret == small::EnumLock::kElement) { - *has_items = true; - - // split by type - std::unordered_map> elems_by_type; - for (auto &jobs_id : vec_ids) { - auto *jobs_item = jobs_get(jobs_id); - if (!jobs_item) { - continue; - } - elems_by_type[jobs_item->type].reserve(vec_ids.size()); - 
elems_by_type[jobs_item->type].push_back(jobs_item); - } + auto ret = q->wait_pop_front_for(std::chrono::nanoseconds(0), vec_ids, bulk_count); + if (ret != small::EnumLock::kElement) { + return ret; + } - // process specific job by type - for (auto &[jobs_type, jobs_items] : elems_by_type) { - auto it_cfg_type = m_config.m_types.find(jobs_type); - if (it_cfg_type == m_config.m_types.end()) { - continue; - } + *has_items = true; - // process specific jobs by type - it_cfg_type->second.m_processing_function(std::move(jobs_items)); - } - - for (auto &jobs_id : vec_ids) { - jobs_del(jobs_id); + // split by type + std::unordered_map>> elems_by_type; + { + // get jobs + std::vector> jobs_items = m_queue.jobs_get(vec_ids); + for (auto &jobs_item : jobs_items) { + elems_by_type[jobs_item->type].reserve(jobs_items.size()); + elems_by_type[jobs_item->type].push_back(jobs_item); } } - return ret; - } - - // - // add job items - // - inline JobsID - jobs_add(const JobsItem &jobs_item) - { - std::unique_lock l(m_lock); - - JobsID id = ++m_jobs_seq_id; - m_jobs[id] = jobs_item; - m_jobs[id].id = id; - - return id; - } - - inline JobsID jobs_add(JobsItem &&jobs_item) - { - std::unique_lock l(m_lock); - - JobsID id = ++m_jobs_seq_id; - jobs_item.id = id; - m_jobs.emplace(id, std::forward(jobs_item)); - - return id; - } - - inline void jobs_del(const JobsID &jobs_id) - { - std::unique_lock l(m_lock); - m_jobs.erase(jobs_id); - } + // process specific job by type + for (auto &[jobs_type, jobs] : elems_by_type) { + auto it_cfg_type = m_config.m_types.find(jobs_type); + if (it_cfg_type == m_config.m_types.end()) { + continue; + } - // activate the jobs - inline std::size_t jobs_activate(const JobsPrioT &priority, const JobsTypeT &jobs_type, const JobsID &jobs_id) - { - std::size_t ret = 0; - - // optimization to get the queue from the type - // (instead of getting the group from type from m_config.m_types and then getting the queue from the m_groups_queues) - auto it_q = 
m_types_queues.find(jobs_type); - if (it_q != m_types_queues.end()) { - auto *q = it_q->second; - ret = q->push_back(priority, jobs_id); + // process specific jobs by type + it_cfg_type->second.m_processing_function(jobs); } - if (ret) { - m_thread_pool.job_start(m_config.m_types[jobs_type].m_config.group); - } else { - jobs_del(jobs_id); + for (auto &jobs_id : vec_ids) { + m_queue.jobs_del(jobs_id); } + return ret; } // - // inner thread function for delayed items + // inner function for activate the jobs from queue // - friend JobQueueDelayedT; + friend JobsQueue; - inline std::size_t push_back(std::vector &&items) + inline void jobs_activate(const JobsTypeT &jobs_type, const JobsID & /* jobs_id */) { - std::size_t count = 0; - for (auto &[priority, jobs_type, jobs_id] : items) { - count += jobs_activate(priority, jobs_type, jobs_id); - } - return count; + m_thread_pool.job_start(m_config.m_types[jobs_type].m_group); } private: // // members // - - // configs - struct JobEngineConfig - { - config_jobs_engine m_engine; // config for entire engine (threads, priorities, etc) - config_jobs_group m_default_group{}; // default config for group - config_jobs_type m_default_jobs_type{}; // default config for jobs type - ProcessingFunction m_default_processing_function{}; // default processing function - std::unordered_map m_groups; // config by jobs group - std::unordered_map m_types; // config by jobs type - // TODO add default processing children and default finish function - }; - - JobEngineConfig m_config; // configs for all: engine, groups, job types - - mutable small::base_lock m_lock; // global locker - std::atomic m_jobs_seq_id{}; // to get the next jobs id - std::unordered_map m_jobs; // current jobs - std::unordered_map m_groups_queues; // map of queues by group - std::unordered_map m_types_queues; // optimize to have queues by type (which reference queues by group) - - JobQueueDelayedT m_delayed_items{*this}; // queue of delayed items - - 
small::jobs_engine_thread_pool m_thread_pool{*this}; // scheduler for processing items (by group) using a pool of threads + JobsConfig m_config; + JobsQueue m_queue{*this}; + small::jobsimpl::jobs_engine_thread_pool m_thread_pool{*this}; // for processing items (by group) using a pool of threads }; } // namespace small