From 86a8600c1059ed953a6fc91f29709ad9e3b5c9ce Mon Sep 17 00:00:00 2001 From: Cristian Herghelegiu Date: Tue, 7 Jan 2025 21:20:15 +0200 Subject: [PATCH] Add multiple features for jobs engine like parent child dependencies, throttling with sleep between requests, timeout for processing --- examples/examples_jobs_engine.h | 11 +++++------ include/jobs_config.h | 3 +-- include/jobs_engine.h | 10 +++------- 3 files changed, 9 insertions(+), 15 deletions(-) diff --git a/examples/examples_jobs_engine.h b/examples/examples_jobs_engine.h index 334d513..10a7e19 100644 --- a/examples/examples_jobs_engine.h +++ b/examples/examples_jobs_engine.h @@ -60,12 +60,11 @@ namespace examples::jobs_engine { // config object for priorities, groups, types, threads, timeouts, sleeps, etc // JobsEng::JobsConfig config{ - .m_engine = {.m_threads_count = 0, // dont start any thread yet - .m_threads_count_finished = 1, // override how many threads to use for internal processing of finished states - .m_config_prio = {.priorities = {{small::EnumPriorities::kHighest, 2}, - {small::EnumPriorities::kHigh, 2}, - {small::EnumPriorities::kNormal, 2}, - {small::EnumPriorities::kLow, 1}}}}, // overall config with default priorities + .m_engine = {.m_threads_count = 0, // dont start any thread yet + .m_config_prio = {.priorities = {{small::EnumPriorities::kHighest, 2}, + {small::EnumPriorities::kHigh, 2}, + {small::EnumPriorities::kNormal, 2}, + {small::EnumPriorities::kLow, 1}}}}, // overall config with default priorities .m_default_function_processing = jobs_function_processing, // default processing function, better use jobs.add_default_function_processing to set it diff --git a/include/jobs_config.h b/include/jobs_config.h index a0f1770..ebef9bd 100644 --- a/include/jobs_config.h +++ b/include/jobs_config.h @@ -22,8 +22,7 @@ namespace small { // config for the entire jobs engine struct ConfigJobsEngine { - int m_threads_count{8}; // how many total threads for processing - int m_threads_count_finished{2}; // how many threads (out of total m_threads_count) to use for processing finished states + int m_threads_count{8}; // how many total threads for processing small::config_prio_queue m_config_prio{}; }; diff --git a/include/jobs_engine.h b/include/jobs_engine.h index 53581a6..a9f6917 100644 --- a/include/jobs_engine.h +++ b/include/jobs_engine.h @@ -281,9 +281,6 @@ namespace small { if (!m_config.m_default_function_on_children_finished) { m_config.m_default_function_on_children_finished = std::bind(&jobs_engine::jobs_on_children_finished, this, std::placeholders::_1 /*jobs_items*/); } - if (!m_config.m_default_function_finished) { - m_config.m_default_function_finished = std::bind(&jobs_engine::jobs_finished, this, std::placeholders::_1 /*jobs_items*/); - } m_config.apply_default_function_processing(); m_config.apply_default_function_on_children_finished(); @@ -358,9 +355,6 @@ namespace small { continue; } - // TODO if an item has timeout add it to a time queue with callback that marks that item as timeout - // TODO timeout should be set only if it is not finished/failed/cancelled - // process specific jobs by type typename JobsConfig::ConfigProcessing type_config; it_cfg_type->second.m_function_processing(jobs, type_config); @@ -395,6 +389,7 @@ namespace small { } // TODO external set state for a job moves it to proper wait for children or finished + // TODO add functions jobs_cancel, jobs_finish(response), jobs_failed(response) // // inner function for activate the jobs from queue @@ -409,9 +404,10 @@ namespace small { inline void jobs_finished(const std::vector> &jobs_items) { // TODO call the custom function from config if exists + // (this may be called from multiple places - queue timeout, do_action finished, above set state cancel, finish, ) for (auto &jobs_item : jobs_items) { - m_queue.jobs_del(jobs_item->id); + m_queue.jobs_del(jobs_item->m_id); } }