Skip to content

Commit

Permalink
Add multiple features for jobs engine like parent child dependencies,…
Browse files Browse the repository at this point in the history
… throttling with sleep between requests, timeout for processing
  • Loading branch information
herrcristi committed Jan 11, 2025
1 parent d24a8bd commit e6a342d
Show file tree
Hide file tree
Showing 4 changed files with 283 additions and 109 deletions.
2 changes: 1 addition & 1 deletion include/impl/jobs_engine_thread_pool_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ namespace small::jobsimpl {
// when items are added to be processed in parent class the start scheduler should be called
// to trigger action (if needed for the new job group)
//
inline void jobs_start(const JobGroupT &job_group)
inline void jobs_schedule(const JobGroupT &job_group)
{
auto it = m_scheduler.find(job_group); // map is not changed, so can be access without locking
if (it == m_scheduler.end()) {
Expand Down
11 changes: 8 additions & 3 deletions include/impl/jobs_item_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,14 @@ namespace small::jobsimpl {
inline void set_state_failed () { set_state(EnumJobsState::kFailed); }
inline void set_state_cancelled () { set_state(EnumJobsState::kCancelled); }

inline bool is_state_inprogress () { return m_state.load() == EnumJobsState::kInProgress; }
inline bool is_state_finished () { return m_state.load() == EnumJobsState::kFinished; }
inline bool is_state_timeout () { return m_state.load() == EnumJobsState::kTimeout; }
inline bool is_state (const EnumJobsState &state) { return m_state.load() == state; }

inline bool is_state_inprogress () { return is_state(EnumJobsState::kInProgress); }
inline void is_state_waitchildren () { return is_state(EnumJobsState::kWaitChildren); }
inline bool is_state_finished () { return is_state(EnumJobsState::kFinished); }
inline bool is_state_timeout () { return is_state(EnumJobsState::kTimeout); }
inline void is_state_failed () { return is_state(EnumJobsState::kFailed); }
inline void is_state_cancelled () { return is_state(EnumJobsState::kCancelled); }
// clang-format on

//
Expand Down
179 changes: 107 additions & 72 deletions include/impl/jobs_queue_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

namespace small::jobsimpl {
//
// small queue helper class for jobs (parent caller must implement 'jobs_start', 'jobs_finished')
// small queue helper class for jobs (parent caller must implement 'jobs_schedule', 'jobs_finished')
//
template <typename JobsTypeT, typename JobsRequestT, typename JobsResponseT, typename JobsGroupT, typename JobsPrioT, typename ParentCallerT>
class jobs_queue
Expand All @@ -27,7 +27,9 @@ namespace small::jobsimpl {
using TimeClock = typename small::time_queue<JobDelayedItems>::TimeClock;
using TimeDuration = typename small::time_queue<JobDelayedItems>::TimeDuration;

public:
private:
friend ParentCallerT;

//
// jobs_queue
//
Expand Down Expand Up @@ -106,6 +108,10 @@ namespace small::jobsimpl {
return true;
}

//
// only this part is public
//
public:
//
// add items to be processed
// push_back only add the jobs item but does not start it
Expand Down Expand Up @@ -354,63 +360,6 @@ namespace small::jobsimpl {
return push_back_and_start_child(parent_jobs_id, child_priority, std::make_shared<JobsItem>(child_jobs_type, std::forward<JobsRequestT>(child_jobs_req)), child_jobs_id);
}

//
// set relationship parent-child
//
inline std::size_t jobs_parent_child(const JobsID &parent_jobs_id, const JobsID &child_jobs_id)
{
std::unique_lock l(m_lock);

auto *parent_jobs_item = jobs_get(parent_jobs_id);
if (!parent_jobs_item) {
return 0;
}
auto *child_jobs_item = jobs_get(child_jobs_id);
if (!child_jobs_item) {
return 0;
}

jobs_parent_child(*parent_jobs_item, *child_jobs_item);
return 1;
}

//
// start the jobs
//
inline std::size_t jobs_start(const JobsPrioT &priority, const std::vector<JobsID> &jobs_ids)
{
std::size_t count = 0;
auto jobs_items = jobs_get(jobs_ids);
for (auto &jobs_item : jobs_items) {
auto ret = jobs_start(priority, jobs_item->m_type, jobs_item->m_id);
if (ret) {
++count;
}
}
return count;
}

inline std::size_t jobs_start(const JobsPrioT &priority, const JobsTypeT &jobs_type, const JobsID &jobs_id)
{
std::size_t ret = 0;

// optimization to get the queue from the type
// (instead of getting the group from type from m_config.m_types and then getting the queue from the m_groups_queues)
auto it_q = m_types_queues.find(jobs_type);
if (it_q != m_types_queues.end()) {
auto *q = it_q->second;
ret = q->push_back(priority, jobs_id);
}

if (ret) {
m_parent_caller.jobs_start(jobs_type, jobs_id);
} else {
// TODO maybe call m_parent.jobs_cancel(jobs_id)?
jobs_del(jobs_id);
}
return ret;
}

// no emplace_back do to returning the jobs_id

//
Expand All @@ -425,6 +374,10 @@ namespace small::jobsimpl {
template <typename _Rep, typename _Period>
inline std::size_t push_back_and_start_delay_for(const std::chrono::duration<_Rep, _Period> &__rtime, const JobsPrioT &priority, std::shared_ptr<JobsItem> jobs_item, JobsID *jobs_id = nullptr)
{
if (is_exit()) {
return 0;
}

auto id = jobs_add(jobs_item);
if (jobs_id) {
*jobs_id = id;
Expand All @@ -446,6 +399,10 @@ namespace small::jobsimpl {

inline std::size_t push_back_and_start_delay_until(const std::chrono::time_point<TimeClock, TimeDuration> &__atime, const JobsPrioT &priority, std::shared_ptr<JobsItem> jobs_item, JobsID *jobs_id = nullptr)
{
if (is_exit()) {
return 0;
}

auto id = jobs_add(jobs_item);
if (jobs_id) {
*jobs_id = id;
Expand All @@ -460,6 +417,7 @@ namespace small::jobsimpl {

// TODO add push_back_child_....()

private:
// clang-format off
//
// signal exit
Expand Down Expand Up @@ -539,21 +497,28 @@ namespace small::jobsimpl {
}

private:
friend ParentCallerT;

//
// get group queue
// called from parent jobs engine
// get jobs group queue
//
inline JobsQueue *get_group_queue(const JobsGroupT &jobs_group)
inline JobsQueue *get_jobs_group_queue(const JobsGroupT &jobs_group)
{
auto it = m_groups_queues.find(jobs_group);
return it != m_groups_queues.end() ? &it->second : nullptr;
}

//
// get jobs type queue
//
inline JobsQueue *get_jobs_type_queue(const JobsTypeT &jobs_type)
{
// optimization to get the queue from the type
// (instead of getting the group from type from m_config.m_types and then getting the queue from the m_groups_queues)
auto it = m_types_queues.find(jobs_type);
return it != m_types_queues.end() ? it->second : nullptr;
}

//
// get job items
// called from parent jobs engine
//
inline std::vector<std::shared_ptr<JobsItem>> jobs_get(const std::vector<JobsID> &jobs_ids)
{
Expand All @@ -573,7 +538,6 @@ namespace small::jobsimpl {
return jobs_items; // will be moved
}

// internal jobs_get
inline std::shared_ptr<JobsItem> *jobs_get(const JobsID &jobs_id)
{
std::unique_lock l(m_lock);
Expand All @@ -599,18 +563,72 @@ namespace small::jobsimpl {
m_jobs.emplace(id, jobs_item);

// add it to the timeout queue
auto it_timeout = m_types_timeouts.find(jobs_item->m_type);
if (it_timeout != m_types_timeouts.end()) {
m_timeout_queue.queue().push_delay_for(it_timeout->second, id);
jobs_start_timeout(jobs_item);
return id;
}

//
// start the jobs
//
inline std::size_t jobs_start(const JobsPrioT &priority, const std::vector<JobsID> &jobs_ids)
{
std::size_t count = 0;
auto jobs_items = jobs_get(jobs_ids);
for (auto &jobs_item : jobs_items) {
auto ret = jobs_start(priority, jobs_item->m_type, jobs_item->m_id);
if (ret) {
++count;
}
}
return count;
}

return id;
inline std::size_t jobs_start(const JobsPrioT &priority, const JobsID &jobs_id)
{
auto *jobs_item = jobs_get(jobs_id);
if (!jobs_item) {
return 0;
}
return jobs_start(priority, (*jobs_item)->m_type, (*jobs_item)->m_id);
}

inline std::size_t jobs_start(const JobsPrioT &priority, const JobsTypeT &jobs_type, const JobsID &jobs_id)
{
std::size_t ret = 0;

auto *q = get_jobs_type_queue(jobs_type);
if (q) {
ret = q->push_back(priority, jobs_id);
}

if (ret) {
m_parent_caller.jobs_schedule(jobs_type, jobs_id);
} else {
// TODO maybe call m_parent.jobs_failed(jobs_id)?
jobs_erase(jobs_id);
}
return ret;
}

//
// add it to the timeout queue
//
inline std::size_t jobs_start_timeout(std::shared_ptr<JobsItem> jobs_item)
{
std::unique_lock l(m_lock);

// only if job type has config a timeout
auto it_timeout = m_types_timeouts.find(jobs_item->m_type);
if (it_timeout == m_types_timeouts.end()) {
return 0;
}
return m_timeout_queue.queue().push_delay_for(it_timeout->second, jobs_item->m_id);
}

//
// delete jobs item
// erase jobs item
//
inline void jobs_del(const JobsID &jobs_id)
inline void jobs_erase(const JobsID &jobs_id)
{
std::unique_lock l(m_lock);
m_jobs.erase(jobs_id);
Expand All @@ -619,6 +637,23 @@ namespace small::jobsimpl {
//
// set relationship parent-child
//
inline std::size_t jobs_parent_child(const JobsID &parent_jobs_id, const JobsID &child_jobs_id)
{
std::unique_lock l(m_lock);

auto *parent_jobs_item = jobs_get(parent_jobs_id);
if (!parent_jobs_item) {
return 0;
}
auto *child_jobs_item = jobs_get(child_jobs_id);
if (!child_jobs_item) {
return 0;
}

jobs_parent_child(*parent_jobs_item, *child_jobs_item);
return 1;
}

inline void jobs_parent_child(std::shared_ptr<JobsItem> parent_jobs_item, std::shared_ptr<JobsItem> child_jobs_item)
{
std::unique_lock l(m_lock);
Expand Down
Loading

0 comments on commit e6a342d

Please sign in to comment.