Skip to content

Commit

Permalink
Add multiple features for jobs engine like parent child dependencies,…
Browse files Browse the repository at this point in the history
… throttling with sleep between requests, timeout for processing
  • Loading branch information
herrcristi committed Jan 7, 2025
1 parent 4c438dc commit 86a8600
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 15 deletions.
11 changes: 5 additions & 6 deletions examples/examples_jobs_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,11 @@ namespace examples::jobs_engine {
// config object for priorities, groups, types, threads, timeouts, sleeps, etc
//
JobsEng::JobsConfig config{
.m_engine = {.m_threads_count = 0, // dont start any thread yet
.m_threads_count_finished = 1, // override how many threads to use for internal processing of finished states
.m_config_prio = {.priorities = {{small::EnumPriorities::kHighest, 2},
{small::EnumPriorities::kHigh, 2},
{small::EnumPriorities::kNormal, 2},
{small::EnumPriorities::kLow, 1}}}}, // overall config with default priorities
.m_engine = {.m_threads_count = 0, // dont start any thread yet
.m_config_prio = {.priorities = {{small::EnumPriorities::kHighest, 2},
{small::EnumPriorities::kHigh, 2},
{small::EnumPriorities::kNormal, 2},
{small::EnumPriorities::kLow, 1}}}}, // overall config with default priorities

.m_default_function_processing = jobs_function_processing, // default processing function, better use jobs.add_default_function_processing to set it

Expand Down
3 changes: 1 addition & 2 deletions include/jobs_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ namespace small {
// config for the entire jobs engine
struct ConfigJobsEngine
{
int m_threads_count{8}; // how many total threads for processing
int m_threads_count_finished{2}; // how many threads (out of total m_threads_count) to use for processing finished states
int m_threads_count{8}; // how many total threads for processing
small::config_prio_queue<JobsPrioT> m_config_prio{};
};

Expand Down
10 changes: 3 additions & 7 deletions include/jobs_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,6 @@ namespace small {
if (!m_config.m_default_function_on_children_finished) {
m_config.m_default_function_on_children_finished = std::bind(&jobs_engine::jobs_on_children_finished, this, std::placeholders::_1 /*jobs_items*/);
}
if (!m_config.m_default_function_finished) {
m_config.m_default_function_finished = std::bind(&jobs_engine::jobs_finished, this, std::placeholders::_1 /*jobs_items*/);
}

m_config.apply_default_function_processing();
m_config.apply_default_function_on_children_finished();
Expand Down Expand Up @@ -358,9 +355,6 @@ namespace small {
continue;
}

// TODO if an item has timeout add it to a time queue with callback that marks that item as timeout
// TODO timeout should be set only if it is not finished/failed/cancelled

// process specific jobs by type
typename JobsConfig::ConfigProcessing type_config;
it_cfg_type->second.m_function_processing(jobs, type_config);
Expand Down Expand Up @@ -395,6 +389,7 @@ namespace small {
}

// TODO external set state for a job moves it to proper wait for children or finished
// TODO add functions jobs_cancel, jobs_finish(response), jobs_failed(response)

//
// inner function for activate the jobs from queue
Expand All @@ -409,9 +404,10 @@ namespace small {
inline void jobs_finished(const std::vector<std::shared_ptr<JobsItem>> &jobs_items)
{
// TODO call the custom function from config if exists
// (this may be called from multiple places - queue timeout, do_action finished, above set state cancel, finish, )

for (auto &jobs_item : jobs_items) {
m_queue.jobs_del(jobs_item->id);
m_queue.jobs_del(jobs_item->m_id);
}
}

Expand Down

0 comments on commit 86a8600

Please sign in to comment.