From 253b16e8049ba659021e331741ede9e651abf4ae Mon Sep 17 00:00:00 2001 From: Cristian Herghelegiu Date: Tue, 14 Jan 2025 21:18:04 +0200 Subject: [PATCH] Add multiple features for jobs engine like parent child dependencies, throttling with sleep between requests, timeout for processing --- .clang-format | 1 + examples/examples_jobs_engine.h | 100 +++++++++++++++++--------------- include/impl/jobs_item_impl.h | 1 + include/impl/jobs_queue_impl.h | 53 ++++------------- include/jobs_engine.h | 94 +++++++++++++++++++++++------- 5 files changed, 137 insertions(+), 112 deletions(-) diff --git a/.clang-format b/.clang-format index dbe3254..dccc649 100644 --- a/.clang-format +++ b/.clang-format @@ -12,6 +12,7 @@ AlignConsecutiveAssignments: AlignFunctionDeclarations: true AlignConsecutiveBitFields: true AlignConsecutiveMacros: true +# PointerAlignment: Left BraceWrapping: AfterEnum: true AfterStruct: true diff --git a/examples/examples_jobs_engine.h b/examples/examples_jobs_engine.h index 0f648e2..282902d 100644 --- a/examples/examples_jobs_engine.h +++ b/examples/examples_jobs_engine.h @@ -77,7 +77,7 @@ namespace examples::jobs_engine { .m_types = {{JobsType::kJobsType1, {.m_group = JobsGroupType::kJobsGroup12}}, {JobsType::kJobsType2, {.m_group = JobsGroupType::kJobsGroup12}}, - {JobsType::kJobsType3, {.m_group = JobsGroupType::kJobsGroup3, .m_timeout = std::chrono::milliseconds(500)}}, + {JobsType::kJobsType3, {.m_group = JobsGroupType::kJobsGroup3, .m_timeout = std::chrono::milliseconds(700)}}, {JobsType::kJobsDatabase, {.m_group = JobsGroupType::kJobsGroupDatabase}}, {JobsType::kJobsCache, {.m_group = JobsGroupType::kJobsGroupCache}}}}; @@ -88,53 +88,52 @@ namespace examples::jobs_engine { // create a cache server (with workers to simulate access to it) // (as an external engine outside the jobs engine for demo purposes) - small::worker_thread cache_server({.threads_count = 1}, [&](auto &w /*this*/, const auto &items) { - for (auto &i : items) { - std::cout << "thread " << std::this_thread::get_id() - << " CACHE processing" - << " {" << i << "}" << "\n"; + small::worker_thread> cache_server({.threads_count = 1}, [&](auto &w /*this*/, const auto &items) { + for (auto &[job_id, req] : items) { + std::cout << "worker thread " << std::this_thread::get_id() + << " CACHE processing" + << " {" << req.first << ", " << req.second << ", jobid=" << job_id << "}" + << " time " << small::toISOString(small::timeNow()) + << "\n"; // mark the jobs id associated as succeeded (for demo purposes to avoid creating other structures) - jobs.jobs_finished(i, (Response)i); + jobs.jobs_finished(job_id, (Response)job_id); } // sleep long enough // no coalesce for demo purposes (sleep 500) so 3rd parent items is finished due to database and not cache server small::sleep(500); }); + auto fn_print_item = [](auto item, std::string fn_type) { + std::cout << "thread " << std::this_thread::get_id() + << std::setw(10) << fn_type + << " processing " + << "{" + << " jobid=" << std::setw(2) << item->m_id + << " type=" << std::setw(1) << (int)item->m_type + << " req.int=" << std::setw(2) << item->m_request.first << "," + << " req.str=\"" << item->m_request.second << "\"" + << "}" + << " time " << small::toISOString(small::timeNow()) + << "\n"; + }; + // default processing used for job type 3 with custom delay in between requests // one request will succeed and one request will timeout for demo purposes - jobs.config_default_function_processing([](auto &j /*this jobs engine*/, const auto &jobs_items, auto &jobs_config) { + jobs.config_default_function_processing([&](auto &j /*this jobs engine*/, const auto &jobs_items, auto &jobs_config) { for (auto &item : jobs_items) { - std::cout << "thread " << std::this_thread::get_id() - << " DEFAULT processing " - << "{" - << " type=" << (int)item->m_type - << " req.int=" << item->m_request.first << "," - << " req.str=\"" << item->m_request.second << "\"" - << "}" - << " ref count " << item.use_count() - << " time " << small::toISOString(small::timeNow()) - << "\n"; + fn_print_item(item, "DEFAULT"); } // set a custom delay (timeout for job3 is 500 ms) - jobs_config.m_delay_next_request = std::chrono::milliseconds(1000); + jobs_config.m_delay_next_request = std::chrono::milliseconds(500); + small::sleep(500); // TODO remove this after delay works }); // add specific function for job1 (calling the function from jobs intead of config allows to pass the engine and extra param) jobs.config_jobs_function_processing(JobsType::kJobsType1, [&](auto &j /*this jobs engine*/, const auto &jobs_items, auto & /* config */, auto b /*extra param b*/) { for (auto &item : jobs_items) { - std::cout << "thread " << std::this_thread::get_id() - << " JOB1 processing " - << "{" - << " type=" << (int)item->m_type - << " req.int=" << item->m_request.first << "," - << " req.str=\"" << item->m_request.second << "\"" - << "}" - << " ref count " << item.use_count() - << " time " << small::toISOString(small::timeNow()) - << "\n"; + fn_print_item(item, "JOB1"); // add 2 more children jobs for current one for database and server cache JobsEng::JobsID jobs_child_db_id{}; @@ -152,7 +151,7 @@ namespace examples::jobs_engine { j.jobs_start(small::EnumPriorities::kNormal, jobs_child_db_id); // jobs_child_cache_id has no threads to execute, it has external executors - cache_server.push_back(jobs_child_cache_id); + cache_server.push_back({jobs_child_cache_id, item->m_request}); } small::sleep(30); }, 5 /*param b*/); @@ -165,25 +164,16 @@ namespace examples::jobs_engine { // TODO set state merge daca e doar o dependinta, daca sunt mai multe atunci ar tb o functie custom - childProcessing (desi are sau nu are children - sau cum fac un dummy children - poate cu thread_count 0?) // add specific function for job2 - jobs.config_jobs_function_processing(JobsType::kJobsType2, [](auto &j /*this jobs engine*/, const auto &jobs_items, auto & /* config */) { - bool first_job = true; + jobs.config_jobs_function_processing(JobsType::kJobsType2, [&](auto &j /*this jobs engine*/, const auto &jobs_items, auto &jobs_config) { + static bool first_job = true; for (auto &item : jobs_items) { - std::cout << "thread " << std::this_thread::get_id() - << " JOB2 processing " - << "{" - << " type=" << (int)item->m_type - << " req.int=" << item->m_request.first << "," - << " req.str=\"" << item->m_request.second << "\"" - << "}" - << " ref count " << item.use_count() - << " time " << small::toISOString(small::timeNow()) - << "\n"; + fn_print_item(item, "JOB2"); if (first_job) { // for type 2 only database children (for demo purposes no result will be used from database) auto ret = j.queue().push_back_and_start_child(item->m_id /*parent*/, - small::EnumPriorities::kNormal, - JobsType::kJobsDatabase, + small::EnumPriorities::kNormal, + JobsType::kJobsDatabase, item->m_request); if (!ret) { j.jobs_failed(item->m_id); @@ -195,7 +185,21 @@ namespace examples::jobs_engine { first_job = false; } // TODO config to wait after request (even if it is not specified in the global config - so custom throttle) - small::sleep(30); }); + small::sleep(30); + jobs_config.m_delay_next_request = std::chrono::milliseconds(30); + }); + + // add specific function for job2 + jobs.config_jobs_function_processing(JobsType::kJobsDatabase, [&](auto &j /*this jobs engine*/, const auto &jobs_items, auto & /* config */) { + for (auto &item : jobs_items) { + // simulate long db call + small::sleep(200); + + fn_print_item(item, "DATABASE"); + + // this job will be auto-finished + } + }); // TODO add function for database where demonstrate coalesce of 3 items (sleep 1000) // TODO add function for cache server - no coalesce for demo purposes (sleep 500) so 3rd parent items is finished due to database and not cache server @@ -207,8 +211,8 @@ namespace examples::jobs_engine { std::vector jobs_ids; // type3 one request will succeed and one request will timeout for demo purposes - jobs.queue().push_back_and_start(small::EnumPriorities::kNormal, JobsType::kJobsType3, {3, "normal3"}, &jobs_id); - jobs.queue().push_back_and_start(small::EnumPriorities::kHigh, JobsType::kJobsType3, {3, "high3"}, &jobs_id); + jobs.queue().push_back_and_start_delay_for(std::chrono::milliseconds(100), small::EnumPriorities::kNormal, JobsType::kJobsType3, {3, "normal3"}, &jobs_id); + jobs.queue().push_back_and_start_delay_for(std::chrono::milliseconds(100), small::EnumPriorities::kHigh, JobsType::kJobsType3, {3, "high3"}, &jobs_id); // type2 only the first request succeeds and waits for child the other fails from the start jobs.queue().push_back_and_start(small::EnumPriorities::kNormal, JobsType::kJobsType2, {2, "normal2"}, &jobs_id); @@ -242,6 +246,8 @@ namespace examples::jobs_engine { auto ret = jobs.wait_for(std::chrono::milliseconds(100)); // wait to finished std::cout << "wait for with timeout, ret = " << static_cast(ret) << " as timeout\n"; jobs.wait(); // wait here for jobs to finish due to exit flag + cache_server.signal_exit_force(); + cache_server.wait(); std::cout << "size = " << jobs.size() << "\n"; diff --git a/include/impl/jobs_item_impl.h b/include/impl/jobs_item_impl.h index 7d83312..6500fb6 100644 --- a/include/impl/jobs_item_impl.h +++ b/include/impl/jobs_item_impl.h @@ -101,6 +101,7 @@ namespace small::jobsimpl { inline void set_state_cancelled () { set_state(EnumJobsState::kCancelled); } inline bool is_state (const EnumJobsState &state) { return m_state.load() == state; } + static bool is_state_complete (const EnumJobsState &state) { return state >= EnumJobsState::kFinished; } inline bool is_state_inprogress () { return is_state(EnumJobsState::kInProgress); } inline void is_state_waitchildren () { return is_state(EnumJobsState::kWaitChildren); } diff --git a/include/impl/jobs_queue_impl.h b/include/impl/jobs_queue_impl.h index ec5ca74..ca7b70f 100644 --- a/include/impl/jobs_queue_impl.h +++ b/include/impl/jobs_queue_impl.h @@ -9,7 +9,7 @@ namespace small::jobsimpl { // - // small queue helper class for jobs (parent caller must implement 'jobs_schedule', 'jobs_finished') + // small queue helper class for jobs (parent caller must implement 'jobs_add', 'jobs_schedule', 'jobs_finished') // template class jobs_queue @@ -94,7 +94,7 @@ namespace small::jobsimpl { // config job types // m_types_queues will be initialized in the initial setup phase and will be accessed without locking afterwards // - inline bool config_jobs_type(const JobsTypeT &jobs_type, const JobsGroupT &jobs_group, const std::optional &jobs_timeout) + inline bool config_jobs_type(const JobsTypeT &jobs_type, const JobsGroupT &jobs_group) { auto it_g = m_groups_queues.find(jobs_group); if (it_g == m_groups_queues.end()) { @@ -102,9 +102,6 @@ namespace small::jobsimpl { } m_types_queues[jobs_type] = &it_g->second; - if (jobs_timeout) { - m_types_timeouts[jobs_type] = *jobs_timeout; - } return true; } @@ -562,8 +559,9 @@ namespace small::jobsimpl { jobs_item->m_id = id; m_jobs.emplace(id, jobs_item); - // add it to the timeout queue - jobs_start_timeout(jobs_item); + // call parent for extra processing + m_parent_caller.jobs_add(jobs_item); + return id; } @@ -610,21 +608,6 @@ namespace small::jobsimpl { return ret; } - // - // add it to the timeout queue - // - inline std::size_t jobs_start_timeout(std::shared_ptr jobs_item) - { - std::unique_lock l(m_lock); - - // only if job type has config a timeout - auto it_timeout = m_types_timeouts.find(jobs_item->m_type); - if (it_timeout == m_types_timeouts.end()) { - return 0; - } - return m_timeout_queue.queue().push_delay_for(it_timeout->second, jobs_item->m_id); - } - // // erase jobs item // @@ -685,34 +668,18 @@ namespace small::jobsimpl { return count; } - // - // inner thread function for timeout items - // called from m_timeout_queue - // - using JobsQueueTimeout = small::time_queue_thread; - friend JobsQueueTimeout; - - inline std::size_t push_back(std::vector &&jobs_ids) - { - m_parent_caller.jobs_timeout(jobs_ids); - return jobs_ids.size(); - } - private: // // members // - mutable small::base_lock m_lock; // global locker - std::atomic m_jobs_seq_id{}; // to get the next jobs id - std::unordered_map> m_jobs; // current jobs - std::unordered_map m_groups_queues; // map of queues by group - std::unordered_map m_types_queues; // optimize to have queues by type (which reference queues by group) - std::unordered_map m_types_timeouts; // timeouts for types + mutable small::base_lock m_lock; // global locker + std::atomic m_jobs_seq_id{}; // to get the next jobs id + std::unordered_map> m_jobs; // current jobs + std::unordered_map m_groups_queues; // map of queues by group + std::unordered_map m_types_queues; // optimize to have queues by type (which reference queues by group) JobQueueDelayedT m_delayed_items{*this}; // queue of delayed items - JobsQueueTimeout m_timeout_queue{*this}; // for timeout elements - ParentCallerT &m_parent_caller; // jobs engine }; } // namespace small::jobsimpl diff --git a/include/jobs_engine.h b/include/jobs_engine.h index f24fae4..42a4e0d 100644 --- a/include/jobs_engine.h +++ b/include/jobs_engine.h @@ -143,6 +143,7 @@ namespace small { { m_config.m_engine.m_threads_count = threads_count; m_queue.start_threads(threads_count); + m_timeout_queue.start_threads(); m_thread_pool.start_threads(threads_count); } @@ -320,7 +321,7 @@ namespace small { // // signal exit // - inline void signal_exit_force () { m_thread_pool.signal_exit_force(); m_queue.signal_exit_force(); } + inline void signal_exit_force () { m_thread_pool.signal_exit_force(); m_timeout_queue.queue().signal_exit_force(); m_queue.signal_exit_force(); } inline void signal_exit_when_done () { m_queue.signal_exit_when_done(); /*when the delayed will be finished will signal the active queue items to exit when done, then the processing pool */ } // to be used in processing function @@ -338,7 +339,13 @@ namespace small { m_queue.wait(); // only now can signal exit when done for workers (when no more items exists) - return m_thread_pool.wait(); + m_thread_pool.wait(); + + // the timeouts are no longer necessay, force exit + m_timeout_queue.queue().signal_exit_force(); + m_timeout_queue.wait(); + + return small::EnumLock::kExit; } // wait some time then signal exit @@ -366,7 +373,16 @@ namespace small { } // only now can signal exit when done for workers (when no more items exists) - return m_thread_pool.wait_until(__atime); + delayed_status = m_thread_pool.wait_until(__atime); + if (delayed_status == small::EnumLock::kTimeout) { + return small::EnumLock::kTimeout; + } + + // the timeouts are no longer necessay, force exit + m_timeout_queue.queue().signal_exit_force(); + m_timeout_queue.wait(); + + return small::EnumLock::kExit; } private: @@ -398,7 +414,7 @@ namespace small { m_config.apply_default_function_finished(); for (auto &[jobs_type, jobs_type_config] : m_config.m_types) { - m_queue.config_jobs_type(jobs_type, jobs_type_config.m_group, jobs_type_config.m_timeout); + m_queue.config_jobs_type(jobs_type, jobs_type_config.m_group); } // auto start threads if count > 0 otherwise threads should be manually started @@ -492,15 +508,40 @@ namespace small { } // - // inner function for activate the jobs from queue (called from queue) + // inner function for extra processing after addding the jobs into queue (called from queue) // friend JobsQueue; + inline void jobs_add(std::shared_ptr jobs_item) + { + // add it to the timeout queue + // only if job type has config a timeout + auto timeout = m_config.m_types[jobs_item->m_type].m_timeout; + if (timeout) { + m_timeout_queue.queue().push_delay_for(*timeout, jobs_item->m_id); + } + } + + // + // inner function for activate the jobs from queue (called from queue) + // inline void jobs_schedule(const JobsTypeT &jobs_type, const JobsID & /* jobs_id */) { m_thread_pool.jobs_schedule(m_config.m_types[jobs_type].m_group); } + // + // inner thread function for timeout items (called from m_timeout_queue) + // + using JobsQueueTimeout = small::time_queue_thread; + friend JobsQueueTimeout; + + inline std::size_t push_back(std::vector &&jobs_ids) + { + jobs_timeout(jobs_ids); + return jobs_ids.size(); + } + // // jobs area transform from id to jobs_item // @@ -601,7 +642,7 @@ namespace small { inline std::size_t jobs_waitforchildren(const std::vector> &jobs_items) { // set the jobs as waitforchildren only if there are children otherwise advance to finish - return jobs_set_state(jobs_items, small::jobsimpl::EnumJobsState::kTimeout); + return jobs_set_state(jobs_items, small::jobsimpl::EnumJobsState::kWaitChildren); } // @@ -609,8 +650,10 @@ namespace small { // inline bool jobs_set_state(const std::shared_ptr &jobs_item, const small::jobsimpl::EnumJobsState &jobs_state) { - auto ret = jobs_apply_state(jobs_item, jobs_state); - if (ret) { + small::jobsimpl::EnumJobsState set_state = jobs_state; + + auto ret = jobs_apply_state(jobs_item, jobs_state, &set_state); + if (ret && JobsItem::is_state_complete(set_state)) { jobs_completed({jobs_item}); } return ret; @@ -618,42 +661,48 @@ namespace small { inline std::size_t jobs_set_state(const std::vector> &jobs_items, const small::jobsimpl::EnumJobsState &jobs_state) { - std::vector> changed_items; - changed_items.reserve(jobs_items.size()); + small::jobsimpl::EnumJobsState set_state = jobs_state; + std::vector> completed_items; + completed_items.reserve(jobs_items.size()); + std::size_t changed_count{}; for (auto &jobs_item : jobs_items) { - auto ret = jobs_apply_state(jobs_item, jobs_state); + auto ret = jobs_apply_state(jobs_item, jobs_state, &set_state); if (ret) { - changed_items.push_back(jobs_item); + ++changed_count; + if (JobsItem::is_state_complete(set_state)) { + completed_items.push_back(jobs_item); + } } } - jobs_completed(changed_items); - return changed_items.size(); + jobs_completed(completed_items); + return changed_count; } - inline bool jobs_apply_state(std::shared_ptr jobs_item, small::jobsimpl::EnumJobsState jobs_state) + inline bool jobs_apply_state(std::shared_ptr jobs_item, const small::jobsimpl::EnumJobsState &jobs_state, small::jobsimpl::EnumJobsState *jobs_set_state) { + *jobs_set_state = jobs_state; // state is already the same - if (jobs_item->is_state(jobs_state)) { + if (jobs_item->is_state(*jobs_set_state)) { return false; } // set the jobs as timeout if it is not finished until now - if (jobs_state == small::jobsimpl::EnumJobsState::kTimeout && jobs_item->is_state_finished()) { + if (*jobs_set_state == small::jobsimpl::EnumJobsState::kTimeout && jobs_item->is_state_finished()) { return false; } // set the jobs as waitforchildren only if there are children otherwise advance to finish - if (jobs_state == small::jobsimpl::EnumJobsState::kWaitChildren) { + if (*jobs_set_state == small::jobsimpl::EnumJobsState::kWaitChildren) { std::unique_lock l(*this); if (jobs_item->m_childrenIDs.size() == 0) { - jobs_state = small::jobsimpl::EnumJobsState::kFinished; + *jobs_set_state = small::jobsimpl::EnumJobsState::kFinished; } } - jobs_item->set_state(jobs_state); - return jobs_item->is_state(jobs_state); + jobs_item->set_state(*jobs_set_state); + return jobs_item->is_state(*jobs_set_state); } // @@ -688,6 +737,7 @@ namespace small { // JobsConfig m_config; JobsQueue m_queue{*this}; - small::jobsimpl::jobs_engine_thread_pool m_thread_pool{*this}; // for processing items (by group) using a pool of threads + JobsQueueTimeout m_timeout_queue{*this}; // for timeout elements + small::jobsimpl::jobs_engine_thread_pool m_thread_pool{*this}; // for processing items (by group) using a pool of threads }; } // namespace small