From e6a342de9e3e661ea74f99c3ba9909a6b5aab711 Mon Sep 17 00:00:00 2001 From: Cristian Herghelegiu Date: Sun, 12 Jan 2025 01:13:45 +0200 Subject: [PATCH] Add multiple features for jobs engine like parent child dependencies, throttling with sleep between requests, timeout for processing --- include/impl/jobs_engine_thread_pool_impl.h | 2 +- include/impl/jobs_item_impl.h | 11 +- include/impl/jobs_queue_impl.h | 179 +++++++++++------- include/jobs_engine.h | 200 ++++++++++++++++---- 4 files changed, 283 insertions(+), 109 deletions(-) diff --git a/include/impl/jobs_engine_thread_pool_impl.h b/include/impl/jobs_engine_thread_pool_impl.h index 60972d1..cd46a5f 100644 --- a/include/impl/jobs_engine_thread_pool_impl.h +++ b/include/impl/jobs_engine_thread_pool_impl.h @@ -63,7 +63,7 @@ namespace small::jobsimpl { // when items are added to be processed in parent class the start scheduler should be called // to trigger action (if needed for the new job group) // - inline void jobs_start(const JobGroupT &job_group) + inline void jobs_schedule(const JobGroupT &job_group) { auto it = m_scheduler.find(job_group); // map is not changed, so can be access without locking if (it == m_scheduler.end()) { diff --git a/include/impl/jobs_item_impl.h b/include/impl/jobs_item_impl.h index 98eaa89..7d83312 100644 --- a/include/impl/jobs_item_impl.h +++ b/include/impl/jobs_item_impl.h @@ -100,9 +100,14 @@ namespace small::jobsimpl { inline void set_state_failed () { set_state(EnumJobsState::kFailed); } inline void set_state_cancelled () { set_state(EnumJobsState::kCancelled); } - inline bool is_state_inprogress () { return m_state.load() == EnumJobsState::kInProgress; } - inline bool is_state_finished () { return m_state.load() == EnumJobsState::kFinished; } - inline bool is_state_timeout () { return m_state.load() == EnumJobsState::kTimeout; } + inline bool is_state (const EnumJobsState &state) { return m_state.load() == state; } + + inline bool is_state_inprogress () { return is_state(EnumJobsState::kInProgress); } + inline void is_state_waitchildren () { return is_state(EnumJobsState::kWaitChildren); } + inline bool is_state_finished () { return is_state(EnumJobsState::kFinished); } + inline bool is_state_timeout () { return is_state(EnumJobsState::kTimeout); } + inline void is_state_failed () { return is_state(EnumJobsState::kFailed); } + inline void is_state_cancelled () { return is_state(EnumJobsState::kCancelled); } // clang-format on // diff --git a/include/impl/jobs_queue_impl.h b/include/impl/jobs_queue_impl.h index b59889f..ec5ca74 100644 --- a/include/impl/jobs_queue_impl.h +++ b/include/impl/jobs_queue_impl.h @@ -9,7 +9,7 @@ namespace small::jobsimpl { // - // small queue helper class for jobs (parent caller must implement 'jobs_start', 'jobs_finished') + // small queue helper class for jobs (parent caller must implement 'jobs_schedule', 'jobs_finished') // template class jobs_queue @@ -27,7 +27,9 @@ namespace small::jobsimpl { using TimeClock = typename small::time_queue::TimeClock; using TimeDuration = typename small::time_queue::TimeDuration; - public: + private: + friend ParentCallerT; + // // jobs_queue // @@ -106,6 +108,10 @@ namespace small::jobsimpl { return true; } + // + // only this part is public + // + public: // // add items to be processed // push_back only add the jobs item but does not start it @@ -354,63 +360,6 @@ namespace small::jobsimpl { return push_back_and_start_child(parent_jobs_id, child_priority, std::make_shared(child_jobs_type, std::forward(child_jobs_req)), child_jobs_id); } - // - // set relationship parent-child - // - inline std::size_t jobs_parent_child(const JobsID &parent_jobs_id, const JobsID &child_jobs_id) - { - std::unique_lock l(m_lock); - - auto *parent_jobs_item = jobs_get(parent_jobs_id); - if (!parent_jobs_item) { - return 0; - } - auto *child_jobs_item = jobs_get(child_jobs_id); - if (!child_jobs_item) { - return 0; - } - - jobs_parent_child(*parent_jobs_item, *child_jobs_item); - return 1; - } - - // - // start the jobs - // - inline std::size_t jobs_start(const JobsPrioT &priority, const std::vector &jobs_ids) - { - std::size_t count = 0; - auto jobs_items = jobs_get(jobs_ids); - for (auto &jobs_item : jobs_items) { - auto ret = jobs_start(priority, jobs_item->m_type, jobs_item->m_id); - if (ret) { - ++count; - } - } - return count; - } - - inline std::size_t jobs_start(const JobsPrioT &priority, const JobsTypeT &jobs_type, const JobsID &jobs_id) - { - std::size_t ret = 0; - - // optimization to get the queue from the type - // (instead of getting the group from type from m_config.m_types and then getting the queue from the m_groups_queues) - auto it_q = m_types_queues.find(jobs_type); - if (it_q != m_types_queues.end()) { - auto *q = it_q->second; - ret = q->push_back(priority, jobs_id); - } - - if (ret) { - m_parent_caller.jobs_start(jobs_type, jobs_id); - } else { - // TODO maybe call m_parent.jobs_cancel(jobs_id)? - jobs_del(jobs_id); - } - return ret; - } - // no emplace_back do to returning the jobs_id // @@ -425,6 +374,10 @@ namespace small::jobsimpl { template inline std::size_t push_back_and_start_delay_for(const std::chrono::duration<_Rep, _Period> &__rtime, const JobsPrioT &priority, std::shared_ptr jobs_item, JobsID *jobs_id = nullptr) { + if (is_exit()) { + return 0; + } + auto id = jobs_add(jobs_item); if (jobs_id) { *jobs_id = id; @@ -446,6 +399,10 @@ namespace small::jobsimpl { inline std::size_t push_back_and_start_delay_until(const std::chrono::time_point &__atime, const JobsPrioT &priority, std::shared_ptr jobs_item, JobsID *jobs_id = nullptr) { + if (is_exit()) { + return 0; + } + auto id = jobs_add(jobs_item); if (jobs_id) { *jobs_id = id; @@ -460,6 +417,7 @@ namespace small::jobsimpl { // TODO add push_back_child_....() + private: // clang-format off // // signal exit @@ -539,21 +497,28 @@ namespace small::jobsimpl { } private: - friend ParentCallerT; - // - // get group queue - // called from parent jobs engine + // get jobs group queue // - inline JobsQueue *get_group_queue(const JobsGroupT &jobs_group) + inline JobsQueue *get_jobs_group_queue(const JobsGroupT &jobs_group) { auto it = m_groups_queues.find(jobs_group); return it != m_groups_queues.end() ? &it->second : nullptr; } + // + // get jobs type queue + // + inline JobsQueue *get_jobs_type_queue(const JobsTypeT &jobs_type) + { + // optimization to get the queue from the type + // (instead of getting the group from type from m_config.m_types and then getting the queue from the m_groups_queues) + auto it = m_types_queues.find(jobs_type); + return it != m_types_queues.end() ? it->second : nullptr; + } + // // get job items - // called from parent jobs engine // inline std::vector> jobs_get(const std::vector &jobs_ids) { @@ -573,7 +538,6 @@ namespace small::jobsimpl { return jobs_items; // will be moved } - // internal jobs_get inline std::shared_ptr *jobs_get(const JobsID &jobs_id) { std::unique_lock l(m_lock); @@ -599,18 +563,72 @@ namespace small::jobsimpl { m_jobs.emplace(id, jobs_item); // add it to the timeout queue - auto it_timeout = m_types_timeouts.find(jobs_item->m_type); - if (it_timeout != m_types_timeouts.end()) { - m_timeout_queue.queue().push_delay_for(it_timeout->second, id); + jobs_start_timeout(jobs_item); + return id; + } + + // + // start the jobs + // + inline std::size_t jobs_start(const JobsPrioT &priority, const std::vector &jobs_ids) + { + std::size_t count = 0; + auto jobs_items = jobs_get(jobs_ids); + for (auto &jobs_item : jobs_items) { + auto ret = jobs_start(priority, jobs_item->m_type, jobs_item->m_id); + if (ret) { + ++count; + } } + return count; + } - return id; + inline std::size_t jobs_start(const JobsPrioT &priority, const JobsID &jobs_id) + { + auto *jobs_item = jobs_get(jobs_id); + if (!jobs_item) { + return 0; + } + return jobs_start(priority, (*jobs_item)->m_type, (*jobs_item)->m_id); + } + + inline std::size_t jobs_start(const JobsPrioT &priority, const JobsTypeT &jobs_type, const JobsID &jobs_id) + { + std::size_t ret = 0; + + auto *q = get_jobs_type_queue(jobs_type); + if (q) { + ret = q->push_back(priority, jobs_id); + } + + if (ret) { + m_parent_caller.jobs_schedule(jobs_type, jobs_id); + } else { + // TODO maybe call m_parent.jobs_failed(jobs_id)? + jobs_erase(jobs_id); + } + return ret; + } + + // + // add it to the timeout queue + // + inline std::size_t jobs_start_timeout(std::shared_ptr jobs_item) + { + std::unique_lock l(m_lock); + + // only if job type has config a timeout + auto it_timeout = m_types_timeouts.find(jobs_item->m_type); + if (it_timeout == m_types_timeouts.end()) { + return 0; + } + return m_timeout_queue.queue().push_delay_for(it_timeout->second, jobs_item->m_id); } // - // delete jobs item + // erase jobs item // - inline void jobs_del(const JobsID &jobs_id) + inline void jobs_erase(const JobsID &jobs_id) { std::unique_lock l(m_lock); m_jobs.erase(jobs_id); @@ -619,6 +637,23 @@ namespace small::jobsimpl { // // set relationship parent-child // + inline std::size_t jobs_parent_child(const JobsID &parent_jobs_id, const JobsID &child_jobs_id) + { + std::unique_lock l(m_lock); + + auto *parent_jobs_item = jobs_get(parent_jobs_id); + if (!parent_jobs_item) { + return 0; + } + auto *child_jobs_item = jobs_get(child_jobs_id); + if (!child_jobs_item) { + return 0; + } + + jobs_parent_child(*parent_jobs_item, *child_jobs_item); + return 1; + } + inline void jobs_parent_child(std::shared_ptr parent_jobs_item, std::shared_ptr child_jobs_item) { std::unique_lock l(m_lock); diff --git a/include/jobs_engine.h b/include/jobs_engine.h index d66e2a1..7517c87 100644 --- a/include/jobs_engine.h +++ b/include/jobs_engine.h @@ -201,10 +201,88 @@ namespace small { } // - // queue access + // jobs functions + // + + // + // add items to jobs queue // inline JobsQueue &queue() { return m_queue; } + // + // start schedule jobs items + // + inline std::size_t jobs_start(const JobsPrioT &priority, const JobsID &jobs_id) + { + return queue().jobs_start(priority, jobs_id); + } + + inline std::size_t jobs_start(const JobsPrioT &priority, const std::vector &jobs_ids) + { + return queue().jobs_start(priority, jobs_ids); + } + + // + // get job items + // + inline std::shared_ptr *jobs_get(const JobsID &jobs_id) + { + return queue().jobs_get(jobs_id); + } + + inline std::vector> jobs_get(const std::vector &jobs_ids) + { + return queue().jobs_get(jobs_ids); + } + + // + // set relationship parent-child + // + inline std::size_t jobs_parent_child(const JobsID &parent_jobs_id, const JobsID &child_jobs_id) + { + return queue().jobs_parent_child(parent_jobs_id, child_jobs_id); + } + + inline void jobs_parent_child(std::shared_ptr parent_jobs_item, std::shared_ptr child_jobs_item) + { + return queue().jobs_parent_child(parent_jobs_item, child_jobs_item); + } + + // + // set jobs state + // + inline void jobs_progress(const JobsID &jobs_id, const int &progress) + { + jobs_set_progress(jobs_id, progress); + } + + inline void jobs_finished(const JobsID &jobs_id) + { + jobs_set_state(jobs_id, small::jobsimpl::EnumJobsState::kFinished); + } + inline void jobs_finished(const std::vector &jobs_ids) + { + jobs_set_state(jobs_ids, small::jobsimpl::EnumJobsState::kFinished); + } + + inline void jobs_failed(const JobsID &jobs_id) + { + jobs_set_state(jobs_id, small::jobsimpl::EnumJobsState::kFailed); + } + inline void jobs_failed(const std::vector &jobs_ids) + { + jobs_set_state(jobs_ids, small::jobsimpl::EnumJobsState::kFailed); + } + + inline void jobs_cancelled(const JobsID &jobs_id) + { + jobs_set_state(jobs_id, small::jobsimpl::EnumJobsState::kCancelled); + } + inline void jobs_cancelled(const std::vector &jobs_ids) + { + jobs_set_state(jobs_ids, small::jobsimpl::EnumJobsState::kCancelled); + } + // clang-format off // // signal exit @@ -297,7 +375,7 @@ namespace small { } // - // inner thread function for executing items (should return if there are more items) + // inner thread function for executing items (should return if there are more items) (called from thread_pool) // friend small::jobsimpl::jobs_engine_thread_pool; @@ -318,7 +396,7 @@ namespace small { group_config.m_delay_next_request = it_cfg_grp->second.m_delay_next_request; // get items to process - auto *q = m_queue.get_group_queue(jobs_group); + auto *q = m_queue.get_jobs_group_queue(jobs_group); if (!q) { return small::EnumLock::kExit; } @@ -368,15 +446,9 @@ namespace small { } } - // TODO marks the items as either wait for children (if it has children) or finished // mark the item as in wait for children of finished // if in callback the state is set to failed, cancelled or timeout setting to finish wont succeed because if less value than those - // jobs_item->set_state(small::EnumJobsState::kInProgress); - } - - // TODO move this delete on the finished thread - for (auto &jobs_id : vec_ids) { - m_queue.jobs_del(jobs_id); + jobs_waitforchildren(jobs); } // TODO group_config.m_delay_next_request @@ -386,55 +458,117 @@ namespace small { return ret; } - // TODO external set state for a job moves it to proper wait for children or finished - // TODO add functions jobs_cancel, jobs_finish(response), jobs_failed(response) - // - // inner function for activate the jobs from queue - // called from queue + // inner function for activate the jobs from queue (called from queue) // friend JobsQueue; - inline void jobs_start(const JobsTypeT &jobs_type, const JobsID & /* jobs_id */) + inline void jobs_schedule(const JobsTypeT &jobs_type, const JobsID & /* jobs_id */) + { + m_thread_pool.jobs_schedule(m_config.m_types[jobs_type].m_group); + } + + // + // jobs states + // + inline void jobs_set_progress(const JobsID &jobs_id, const int &progress) + { + auto *jobs_item = jobs_get(jobs_id); + if (!jobs_item) { + return; + } + + (*jobs_item)->set_progress(progress); + + if (progress == 100) { + jobs_finished(jobs_id); + } + } + + inline void jobs_waitforchildren(const std::vector> &jobs_items) + { + // set the jobs as waitforchildren only if there are children otherwise advance to finish + jobs_set_state(jobs_items, small::jobsimpl::EnumJobsState::kTimeout); + } + + inline void jobs_timeout(const std::vector &jobs_ids) { - m_thread_pool.jobs_start(m_config.m_types[jobs_type].m_group); + // set the jobs as timeout if it is not finished until now (called from queue) + jobs_set_state(jobs_ids, small::jobsimpl::EnumJobsState::kTimeout); } // - // set the jobs as timeout if it is not finished until now - // called from queue + // apply state // - inline std::vector> jobs_timeout(const std::vector &jobs_ids) + inline void jobs_set_state(const JobsID &jobs_id, const small::jobsimpl::EnumJobsState &jobs_state) + { + auto *jobs_item = jobs_get(jobs_id); + if (!jobs_item) { + return; + } + + auto ret = jobs_set_state(*jobs_item, jobs_state); + if (ret) { + jobs_completed({*jobs_item}); + } + } + + inline void jobs_set_state(const std::vector &jobs_ids, const small::jobsimpl::EnumJobsState &jobs_state) + { + auto jobs_items = jobs_get(jobs_ids); + jobs_set_state(jobs_items, jobs_state); + } + + inline void jobs_set_state(const std::vector> &jobs_items, const small::jobsimpl::EnumJobsState &jobs_state) { - std::vector> jobs_items = m_queue.jobs_get(jobs_ids); - std::vector> timeout_items; - timeout_items.reserve(jobs_items.size()); + std::vector> changed_items; + changed_items.reserve(jobs_items.size()); for (auto &jobs_item : jobs_items) { - // set the jobs as timeout if it is not finished until now - if (jobs_item->state.is_state_finished()) { - continue; + auto ret = jobs_set_state(jobs_item, jobs_state); + if (ret) { + changed_items.push_back(jobs_item); } + } + + jobs_completed(changed_items); + } + + inline std::size_t jobs_set_state(std::shared_ptr jobs_item, small::jobsimpl::EnumJobsState jobs_state) + { + // state is already the same + if (jobs_item->is_state(jobs_state)) { + return 0; + } - jobs_item->set_state_timeout(); - if (jobs_item->is_state_timeout()) { - timeout_items.push_back(jobs_item); + // set the jobs as timeout if it is not finished until now + if (jobs_state == small::jobsimpl::EnumJobsState::kTimeout && jobs_item->is_state_finished()) { + return 0; + } + + // set the jobs as waitforchildren only if there are children otherwise advance to finish + if (jobs_state == small::jobsimpl::EnumJobsState::kWaitChildren) { + std::unique_lock l(*this); + if (jobs_item->m_childrenIDs.size() == 0) { + jobs_state = small::jobsimpl::EnumJobsState::kFinished; } } - jobs_finished(timeout_items); + + jobs_item->set_state(jobs_state); + return jobs_item->is_state(jobs_state) ? 1 : 0; } // - // finish a job + // when a job is completed (finished/timeout/canceled/failed) // - inline void jobs_finished(const std::vector> &jobs_items) + inline void jobs_completed(const std::vector> &jobs_items) { // TODO call the custom function from config if exists // (this may be called from multiple places - queue timeout, do_action finished, above set state cancel, finish, ) // TODO delete only if there are no parents (delete all the finished children now) for (auto &jobs_item : jobs_items) { - m_queue.jobs_del(jobs_item->m_id); + m_queue.jobs_erase(jobs_item->m_id); } // TODO if it has parents call jobs_on_children_finished