diff --git a/.clang-format b/.clang-format index dbe3254..dccc649 100644 --- a/.clang-format +++ b/.clang-format @@ -12,6 +12,7 @@ AlignConsecutiveAssignments: AlignFunctionDeclarations: true AlignConsecutiveBitFields: true AlignConsecutiveMacros: true +# PointerAlignment: Left BraceWrapping: AfterEnum: true AfterStruct: true diff --git a/README.md b/README.md index 0e2a308..0eeadfe 100644 --- a/README.md +++ b/README.md @@ -427,7 +427,7 @@ JobsEng::JobsConfig config{ // create jobs engine JobsEng jobs(config); ... -jobs.add_default_processing_function([](auto &j /*this jobs engine*/, const auto &jobs_items) { +jobs.config_default_function_processing([](auto &j /*this jobs engine*/, const auto &jobs_items) { for (auto &item : jobs_items) { ... } @@ -435,7 +435,7 @@ jobs.add_default_processing_function([](auto &j /*this jobs engine*/, const auto }); ... // add specific function for job1 (calling the function from jobs intead of config allows to pass the engine and extra param) -jobs.add_job_processing_function(JobsType::kJobsType1, [](auto &j /*this jobs engine*/, const auto &jobs_items, auto b /*extra param b*/) { +jobs.config_jobs_function_processing(JobsType::kJobsType1, [](auto &j /*this jobs engine*/, const auto &jobs_items, auto b /*extra param b*/) { for (auto &item : jobs_items) { ... } diff --git a/examples/examples_jobs_engine.h b/examples/examples_jobs_engine.h index faa97cb..282902d 100644 --- a/examples/examples_jobs_engine.h +++ b/examples/examples_jobs_engine.h @@ -16,105 +16,238 @@ namespace examples::jobs_engine { { std::cout << "Jobs Engine example 1\n"; + // + // create a complex example for each type 1,2,3 (like web requests) + // - priorities + // - dependencies calls (parent-child relationship) + // (with children for calls that take long time like database access or less time like cache server) + // - coalesce calls (multiple (later) calls resolved when first one is called) + // - timeout + // - throttle (call with some wait-sleep interval in between) + // + // type1 will create 2 children and will be finished when at least 1 is finished + // (this will demonstrate OR and custom children function) + // type2 will create 2 children and will be finished whenn ALL children are finished + // (this will demonstrate the default case AND and default children function) + // type3 will have no children and one call will timeout + // enum class JobsType { + kJobsNone = 0, kJobsType1, - kJobsType2 + kJobsType2, + kJobsType3, + kJobsDatabase, + kJobsCache, }; + enum class JobsGroupType { - kJobsGroup1 + kJobsGroup12, + kJobsGroup3, + kJobsGroupDatabase, + kJobsGroupCache, }; - using Request = std::pair; - using JobsEng = small::jobs_engine; + using Request = std::pair; + using Response = int; + using JobsEng = small::jobs_engine; - auto jobs_processing_function = [](const std::vector> &items) { + auto jobs_function_processing = [](const std::vector> &items, JobsEng::JobsConfig::ConfigProcessing & /* config */) { // this functions is defined without the engine params (it is here just for the example) - std::cout << "this function is defined without the engine params, called for " << (int)items[0]->type << "\n"; + std::cout << "this function is defined without the engine params, called for " << (int)items[0]->m_type << "\n"; }; + // + // config object for priorities, groups, types, threads, timeouts, sleeps, etc + // JobsEng::JobsConfig config{ - .m_engine = {.m_threads_count = 0 /*dont start any thread yet*/, - .m_config_prio = {.priorities = {{small::EnumPriorities::kHighest, 2}, - {small::EnumPriorities::kHigh, 2}, - {small::EnumPriorities::kNormal, 2}, - {small::EnumPriorities::kLow, 1}}}}, // overall config with default priorities - .m_default_processing_function = jobs_processing_function, // default processing function, better use jobs.add_default_processing_function to set it - .m_groups = { - {JobsGroupType::kJobsGroup1, {.m_threads_count = 1}}}, // config by jobs group - .m_types = { - {JobsType::kJobsType1, {.m_group = JobsGroupType::kJobsGroup1}}, - {JobsType::kJobsType2, {.m_group = JobsGroupType::kJobsGroup1}}, - }}; - - // create jobs engine + .m_engine = {.m_threads_count = 0, // dont start any thread yet + .m_config_prio = {.priorities = {{small::EnumPriorities::kHighest, 2}, + {small::EnumPriorities::kHigh, 2}, + {small::EnumPriorities::kNormal, 2}, + {small::EnumPriorities::kLow, 1}}}}, // overall config with default priorities + + .m_default_function_processing = jobs_function_processing, // default processing function, better use jobs.config_default_function_processing to set it + + .m_groups = {{JobsGroupType::kJobsGroup12, {.m_threads_count = 1}}, // config by jobs group + {JobsGroupType::kJobsGroup3, {.m_threads_count = 1, .m_delay_next_request = std::chrono::milliseconds(30)}}, + {JobsGroupType::kJobsGroupDatabase, {.m_threads_count = 1}}, // these requests will coalesce results for demo purposes + {JobsGroupType::kJobsGroupCache, {.m_threads_count = 0}}}, // no threads !!, these requests are executed outside of jobs engine for demo purposes + + .m_types = {{JobsType::kJobsType1, {.m_group = JobsGroupType::kJobsGroup12}}, + {JobsType::kJobsType2, {.m_group = JobsGroupType::kJobsGroup12}}, + {JobsType::kJobsType3, {.m_group = JobsGroupType::kJobsGroup3, .m_timeout = std::chrono::milliseconds(700)}}, + {JobsType::kJobsDatabase, {.m_group = JobsGroupType::kJobsGroupDatabase}}, + {JobsType::kJobsCache, {.m_group = JobsGroupType::kJobsGroupCache}}}}; + + // + // create jobs engine with the above config + // JobsEng jobs(config); - jobs.add_default_processing_function([](auto &j /*this jobs engine*/, const auto &jobs_items) { - for (auto &item : jobs_items) { - std::cout << "thread " << std::this_thread::get_id() - << " DEFAULT processing " - << "{" - << " type=" << (int)item->type - << " req.int=" << item->request.first << "," - << " req.str=\"" << item->request.second << "\"" - << "}" - << " ref count " << item.use_count() + // create a cache server (with workers to simulate access to it) + // (as an external engine outside the jobs engine for demo purposes) + small::worker_thread> cache_server({.threads_count = 1}, [&](auto &w /*this*/, const auto &items) { + for (auto &[job_id, req] : items) { + std::cout << "worker thread " << std::this_thread::get_id() + << " CACHE processing" + << " {" << req.first << ", " << req.second << ", jobid=" << job_id << "}" << " time " << small::toISOString(small::timeNow()) << "\n"; + + // mark the jobs id associated as succeeded (for demo purposes to avoid creating other structures) + jobs.jobs_finished(job_id, (Response)job_id); } - small::sleep(30); + // sleep long enough + // no coalesce for demo purposes (sleep 500) so 3rd parent items is finished due to database and not cache server + small::sleep(500); + }); + + auto fn_print_item = [](auto item, std::string fn_type) { + std::cout << "thread " << std::this_thread::get_id() + << std::setw(10) << fn_type + << " processing " + << "{" + << " jobid=" << std::setw(2) << item->m_id + << " type=" << std::setw(1) << (int)item->m_type + << " req.int=" << std::setw(2) << item->m_request.first << "," + << " req.str=\"" << item->m_request.second << "\"" + << "}" + << " time " << small::toISOString(small::timeNow()) + << "\n"; + }; + + // default processing used for job type 3 with custom delay in between requests + // one request will succeed and one request will timeout for demo purposes + jobs.config_default_function_processing([&](auto &j /*this jobs engine*/, const auto &jobs_items, auto &jobs_config) { + for (auto &item : jobs_items) { + fn_print_item(item, "DEFAULT"); + } + + // set a custom delay (timeout for job3 is 500 ms) + jobs_config.m_delay_next_request = std::chrono::milliseconds(500); + small::sleep(500); // TODO remove this after delay works }); // add specific function for job1 (calling the function from jobs intead of config allows to pass the engine and extra param) - jobs.add_job_processing_function(JobsType::kJobsType1, [](auto &j /*this jobs engine*/, const auto &jobs_items, auto b /*extra param b*/) { + jobs.config_jobs_function_processing(JobsType::kJobsType1, [&](auto &j /*this jobs engine*/, const auto &jobs_items, auto & /* config */, auto b /*extra param b*/) { for (auto &item : jobs_items) { - std::cout << "thread " << std::this_thread::get_id() - << " JOB1 processing " - << "{" - << " type=" << (int)item->type - << " req.int=" << item->request.first << "," - << " req.str=\"" << item->request.second << "\"" - << "}" - << " ref count " << item.use_count() - << " time " << small::toISOString(small::timeNow()) - << "\n"; + fn_print_item(item, "JOB1"); + + // add 2 more children jobs for current one for database and server cache + JobsEng::JobsID jobs_child_db_id{}; + JobsEng::JobsID jobs_child_cache_id{}; + + auto ret = j.queue().push_back_child(item->m_id /*parent*/, JobsType::kJobsDatabase, item->m_request, &jobs_child_db_id); + if (!ret) { + j.jobs_failed(item->m_id); + } + ret = j.queue().push_back_child(item->m_id /*parent*/, JobsType::kJobsCache, item->m_request, &jobs_child_cache_id); + if (!ret) { + j.jobs_failed(jobs_child_db_id); + j.jobs_failed(item->m_id); + } + + j.jobs_start(small::EnumPriorities::kNormal, jobs_child_db_id); + // jobs_child_cache_id has no threads to execute, it has external executors + cache_server.push_back({jobs_child_cache_id, item->m_request}); } small::sleep(30); }, 5 /*param b*/); + // TODO save type1 requests into a promises unordered_map and complete on finishing the job + // TODO add custom finish function for jobtype1 to complete the promises + + // TODO save somewhere in an unordered_map the database requests (passes as request params for job type1) + // TODO daca as vrea sa folosesc un alt job_server cum modelez asa incat jobul dintr-o parte sa ramana intr-o stare ca si cand ar avea copii si + // TODO sa se faca un request in alta parte si ala cand se termina pe finish (sau daca e worker thread in functia de procesare) sa faca set state + // TODO set state merge daca e doar o dependinta, daca sunt mai multe atunci ar tb o functie custom - childProcessing (desi are sau nu are children - sau cum fac un dummy children - poate cu thread_count 0?) + + // add specific function for job2 + jobs.config_jobs_function_processing(JobsType::kJobsType2, [&](auto &j /*this jobs engine*/, const auto &jobs_items, auto &jobs_config) { + static bool first_job = true; + for (auto &item : jobs_items) { + fn_print_item(item, "JOB2"); + + if (first_job) { + // for type 2 only database children (for demo purposes no result will be used from database) + auto ret = j.queue().push_back_and_start_child(item->m_id /*parent*/, + small::EnumPriorities::kNormal, + JobsType::kJobsDatabase, + item->m_request); + if (!ret) { + j.jobs_failed(item->m_id); + } + } else { + j.jobs_failed(item->m_id); + } + + first_job = false; + } + // TODO config to wait after request (even if it is not specified in the global config - so custom throttle) + small::sleep(30); + jobs_config.m_delay_next_request = std::chrono::milliseconds(30); + }); + + // add specific function for job2 + jobs.config_jobs_function_processing(JobsType::kJobsDatabase, [&](auto &j /*this jobs engine*/, const auto &jobs_items, auto & /* config */) { + for (auto &item : jobs_items) { + // simulate long db call + small::sleep(200); + + fn_print_item(item, "DATABASE"); + + // this job will be auto-finished + } + }); + + // TODO add function for database where demonstrate coalesce of 3 items (sleep 1000) + // TODO add function for cache server - no coalesce for demo purposes (sleep 500) so 3rd parent items is finished due to database and not cache server + + // TODO add function for custom wait children and demonstrate set progress to another item + // TODO add function for custom finished (for type1 to set the promises completed) + JobsEng::JobsID jobs_id{}; std::vector jobs_ids; - // push - jobs.queue().push_back(small::EnumPriorities::kNormal, JobsType::kJobsType1, {1, "normal"}, &jobs_id); - jobs.queue().push_back(small::EnumPriorities::kHigh, JobsType::kJobsType2, {2, "high"}, &jobs_id); + // type3 one request will succeed and one request will timeout for demo purposes + jobs.queue().push_back_and_start_delay_for(std::chrono::milliseconds(100), small::EnumPriorities::kNormal, JobsType::kJobsType3, {3, "normal3"}, &jobs_id); + jobs.queue().push_back_and_start_delay_for(std::chrono::milliseconds(100), small::EnumPriorities::kHigh, JobsType::kJobsType3, {3, "high3"}, &jobs_id); + + // type2 only the first request succeeds and waits for child the other fails from the start + jobs.queue().push_back_and_start(small::EnumPriorities::kNormal, JobsType::kJobsType2, {2, "normal2"}, &jobs_id); + jobs.queue().push_back_and_start(small::EnumPriorities::kHigh, JobsType::kJobsType2, {2, "high2"}, &jobs_id); - jobs.queue().push_back(small::EnumPriorities::kNormal, JobsType::kJobsType1, std::make_pair(3, "normal"), &jobs_id); - jobs.queue().push_back(small::EnumPriorities::kHigh, JobsType::kJobsType1, {4, "high"}, &jobs_id); - jobs.queue().push_back(small::EnumPriorities::kLow, JobsType::kJobsType1, {5, "low"}, &jobs_id); + // show coalesce for children database requests + std::unordered_map web_requests; - Request req = {6, "normal"}; - jobs.queue().push_back(small::EnumPriorities::kNormal, JobsType::kJobsType1, req, nullptr); + // TODO create a promises/futures unordered_map for type1 requests and wait later + // push with multiple variants + jobs.queue().push_back_and_start(small::EnumPriorities::kNormal, JobsType::kJobsType1, {11, "normal11"}, &jobs_id); std::vector> jobs_items = { - std::make_shared(JobsType::kJobsType1, Request{7, "highest"}), - std::make_shared(JobsType::kJobsType1, Request{8, "highest"}), + std::make_shared(JobsType::kJobsType1, Request{12, "highest12"}), }; - jobs.queue().push_back(small::EnumPriorities::kHighest, jobs_items, &jobs_ids); - jobs.queue().push_back(small::EnumPriorities::kHighest, {std::make_shared(JobsType::kJobsType1, Request{9, "highest"})}, &jobs_ids); + jobs.queue().push_back_and_start(small::EnumPriorities::kHighest, jobs_items, &jobs_ids); + + jobs.queue().push_back_and_start(small::EnumPriorities::kLow, JobsType::kJobsType1, {13, "low13"}, nullptr); + + Request req = {14, "normal14"}; + jobs.queue().push_back(JobsType::kJobsType1, req, &jobs_id); + jobs.jobs_start(small::EnumPriorities::kNormal, jobs_id); - jobs.queue().push_back_delay_for(std::chrono::milliseconds(300), small::EnumPriorities::kNormal, JobsType::kJobsType1, {100, "delay normal"}, &jobs_id); - jobs.queue().push_back_delay_until(small::timeNow() + std::chrono::milliseconds(350), small::EnumPriorities::kNormal, JobsType::kJobsType1, {101, "delay normal"}, &jobs_id); - jobs.queue().push_back_delay_for(std::chrono::milliseconds(400), small::EnumPriorities::kNormal, JobsType::kJobsType1, {102, "delay normal"}, &jobs_id); + jobs.queue().push_back_and_start_delay_for(std::chrono::milliseconds(300), small::EnumPriorities::kNormal, JobsType::kJobsType1, {115, "delay normal115"}, &jobs_id); - jobs.start_threads(3); // manual start threads + // manual start threads + jobs.start_threads(3); small::sleep(50); // jobs.signal_exit_force(); auto ret = jobs.wait_for(std::chrono::milliseconds(100)); // wait to finished std::cout << "wait for with timeout, ret = " << static_cast(ret) << " as timeout\n"; jobs.wait(); // wait here for jobs to finish due to exit flag + cache_server.signal_exit_force(); + cache_server.wait(); std::cout << "size = " << jobs.size() << "\n"; diff --git a/include/impl/jobs_engine_thread_pool_impl.h b/include/impl/jobs_engine_thread_pool_impl.h index 28d941e..cd46a5f 100644 --- a/include/impl/jobs_engine_thread_pool_impl.h +++ b/include/impl/jobs_engine_thread_pool_impl.h @@ -54,7 +54,7 @@ namespace small::jobsimpl { // config processing by job group type // this should be done in the initial setup phase once // - inline void add_job_group(const JobGroupT &job_group, const int &threads_count) + inline void config_jobs_group(const JobGroupT &job_group, const int &threads_count) { m_scheduler[job_group].m_threads_count = threads_count; } @@ -63,7 +63,7 @@ namespace small::jobsimpl { // when items are added to be processed in parent class the start scheduler should be called // to trigger action (if needed for the new job group) // - inline void job_start(const JobGroupT &job_group) + inline void jobs_schedule(const JobGroupT &job_group) { auto it = m_scheduler.find(job_group); // map is not changed, so can be access without locking if (it == m_scheduler.end()) { @@ -73,7 +73,7 @@ namespace small::jobsimpl { // even if here it is considered that there are items and something will be scheduled, // the actual check if work will still exists will be done in do_action of parent auto &stats = it->second; - job_action_start(job_group, true, stats); + jobs_action_start(job_group, true, stats); } // clang-format off @@ -129,7 +129,7 @@ namespace small::jobsimpl { // // to trigger action (if needed for the new job group) // - inline void job_action_start(const JobGroupT &job_group, const bool has_items, JobGroupStats &stats) + inline void jobs_action_start(const JobGroupT &job_group, const bool has_items, JobGroupStats &stats) { if (!has_items) { return; @@ -148,7 +148,7 @@ namespace small::jobsimpl { // // job action ended // - inline void job_action_end(const JobGroupT &job_group, const bool has_items) + inline void jobs_action_end(const JobGroupT &job_group, const bool has_items) { auto it = m_scheduler.find(job_group); // map is not changed, so can be access without locking if (it == m_scheduler.end()) { @@ -160,7 +160,7 @@ namespace small::jobsimpl { auto &stats = it->second; --stats.m_running; - job_action_start(job_group, has_items, stats); + jobs_action_start(job_group, has_items, stats); } // @@ -174,7 +174,7 @@ namespace small::jobsimpl { m_parent_caller.do_action(job_group, &has_items); // start another action - job_action_end(job_group, has_items); + jobs_action_end(job_group, has_items); } } diff --git a/include/impl/jobs_item_impl.h b/include/impl/jobs_item_impl.h index 684d5e1..007f37c 100644 --- a/include/impl/jobs_item_impl.h +++ b/include/impl/jobs_item_impl.h @@ -10,15 +10,16 @@ #include "../base_lock.h" namespace small::jobsimpl { - // a job can be in the following states + // a job can be in the following states (order is important because it may progress only to higher states) enum class EnumJobsState : unsigned int { kNone = 0, kInProgress, + kWaitChildren, kFinished, + kTimeout, kFailed, kCancelled, - kTimeout }; // a job item @@ -27,43 +28,57 @@ namespace small::jobsimpl { { using JobsID = unsigned long long; - JobsID id{}; // job unique id - JobsTypeT type{}; // job type - std::atomic state{EnumJobsState::kNone}; // job state - std::atomic progress{}; // progress 0-100 for state kInProgress - JobsRequestT request{}; // request needed for processing function - JobsResponseT response{}; // where the results are saved (for the finished callback if exists) + JobsID m_id{}; // job unique id + JobsTypeT m_type{}; // job type + std::atomic m_state{EnumJobsState::kNone}; // job state + std::atomic m_progress{}; // progress 0-100 for state kInProgress + std::atomic_bool m_has_parents{}; // for dependencies relationships parent-child + std::atomic_bool m_has_children{}; // for dependencies relationships parent-child + std::vector m_parentIDs{}; // for dependencies relationships parent-child + std::vector m_childrenIDs{}; // for dependencies relationships parent-child + JobsRequestT m_request{}; // request needed for processing function + JobsResponseT m_response{}; // where the results are saved (for the finished callback if exists) explicit jobs_item() = default; + explicit jobs_item(const JobsID &jobs_id, const JobsTypeT &jobs_type, const JobsRequestT &jobs_request) - : id(jobs_id), type(jobs_type), request(jobs_request) {} - explicit jobs_item(const JobsTypeT &jobs_type, const JobsRequestT &jobs_request) - : type(jobs_type), request(jobs_request) {} + : m_id(jobs_id), m_type(jobs_type), m_request(jobs_request) {} explicit jobs_item(const JobsID &jobs_id, const JobsTypeT &jobs_type, JobsRequestT &&jobs_request) - : id(jobs_id), type(jobs_type), request(std::forward(jobs_request)) {} + : m_id(jobs_id), m_type(jobs_type), m_request(std::forward(jobs_request)) {} + + explicit jobs_item(const JobsTypeT &jobs_type, const JobsRequestT &jobs_request) + : m_type(jobs_type), m_request(jobs_request) {} explicit jobs_item(const JobsTypeT &jobs_type, JobsRequestT &&jobs_request) - : type(jobs_type), request(std::forward(jobs_request)) {} + : m_type(jobs_type), m_request(std::forward(jobs_request)) {} jobs_item(const jobs_item &other) { operator=(other); }; jobs_item(jobs_item &&other) noexcept { operator=(other); }; jobs_item &operator=(const jobs_item &other) { - id = other.id; - type = other.type; - state = other.state.load(); - progress = other.progress.load(); - request = other.request; - response = other.response; + m_id = other.m_id; + m_type = other.m_type; + m_state = other.m_state.load(); + m_progress = other.m_progress.load(); + m_has_parents = other.m_has_parents.load(); + m_has_children = other.m_has_children.load(); + m_parentIDs = other.m_parentIDs; + m_childrenIDs = other.m_childrenIDs; + m_request = other.m_request; + m_response = other.m_response; return *this; } jobs_item &operator=(jobs_item &&other) noexcept { - id = std::move(other.id); - type = std::move(other.type); - state = other.state.load(); - progress = other.progress.load(); - request = std::move(other.request); - response = std::move(other.response); + m_id = std::move(other.m_id); + m_type = std::move(other.m_type); + m_state = other.m_state.load(); + m_progress = other.m_progress.load(); + m_has_parents = other.m_has_parents.load(); + m_has_children = other.m_has_children.load(); + m_parentIDs = std::move(other.m_parentIDs); + m_childrenIDs = std::move(other.m_childrenIDs); + m_request = std::move(other.m_request); + m_response = std::move(other.m_response); return *this; } @@ -73,31 +88,81 @@ namespace small::jobsimpl { inline void set_state(const EnumJobsState &new_state) { for (;;) { - EnumJobsState current_state = state.load(); + EnumJobsState current_state = m_state.load(); if (current_state >= new_state) { return; } - if (state.compare_exchange_weak(current_state, new_state)) { + if (m_state.compare_exchange_weak(current_state, new_state)) { return; } } } + // clang-format off + inline void set_state_inprogress () { set_state(EnumJobsState::kInProgress); } + inline void set_state_waitchildren () { set_state(EnumJobsState::kWaitChildren); } + inline void set_state_finished () { set_state(EnumJobsState::kFinished); } + inline void set_state_timeout () { set_state(EnumJobsState::kTimeout); } + inline void set_state_failed () { set_state(EnumJobsState::kFailed); } + inline void set_state_cancelled () { set_state(EnumJobsState::kCancelled); } + + static bool is_state_complete (const EnumJobsState &state) { return state >= EnumJobsState::kFinished; } + + inline EnumJobsState get_state () const { return m_state.load(); } + inline bool is_state (const EnumJobsState &state) const { return get_state() == state; } + inline bool is_complete () const { return is_state_complete(get_state()); } + + inline bool is_state_inprogress () const { return is_state(EnumJobsState::kInProgress); } + inline void is_state_waitchildren () const { return is_state(EnumJobsState::kWaitChildren); } + inline bool is_state_finished () const { return is_state(EnumJobsState::kFinished); } + inline bool is_state_timeout () const { return is_state(EnumJobsState::kTimeout); } + inline void is_state_failed () const { return is_state(EnumJobsState::kFailed); } + inline void is_state_cancelled () const { return is_state(EnumJobsState::kCancelled); } + // clang-format on + // // set job progress (can only increase) // inline void set_progress(const int &new_progress) { for (;;) { - int current_progress = progress.load(); + int current_progress = m_progress.load(); if (current_progress >= new_progress) { return; } - if (progress.compare_exchange_weak(current_progress, new_progress)) { + if (m_progress.compare_exchange_weak(current_progress, new_progress)) { return; } } } + + // + // add child + // + inline void add_child(const JobsID &child_jobs_id) + { + m_childrenIDs.push_back(child_jobs_id); // this should be set under locked area + m_has_children = true; + } + + inline bool has_children() const + { + return m_has_children.load(); + } + + // + // add parent + // + inline void add_parent(const JobsID &parent_jobs_id) + { + m_parentIDs.push_back(parent_jobs_id); // this should be set under locked area + m_has_parents = true; + } + + inline bool has_parents() const + { + return m_has_parents.load(); + } }; } // namespace small::jobsimpl diff --git a/include/impl/jobs_queue_impl.h b/include/impl/jobs_queue_impl.h index b3007ac..f6069f6 100644 --- a/include/impl/jobs_queue_impl.h +++ b/include/impl/jobs_queue_impl.h @@ -9,7 +9,7 @@ namespace small::jobsimpl { // - // small queue helper class for jobs (parent caller must implement 'jobs_activate') + // small queue helper class for jobs (parent caller must implement 'jobs_add', 'jobs_schedule', 'jobs_finished') // template class jobs_queue @@ -27,7 +27,9 @@ namespace small::jobsimpl { using TimeClock = typename small::time_queue::TimeClock; using TimeDuration = typename small::time_queue::TimeDuration; - public: + private: + friend ParentCallerT; + // // jobs_queue // @@ -83,7 +85,7 @@ namespace small::jobsimpl { // config groups // m_groups_queues will be initialized in the initial setup phase and will be accessed without locking afterwards // - inline void add_jobs_group(const JobsGroupT &job_group, const small::config_prio_queue &config_prio) + inline void config_jobs_group(const JobsGroupT &job_group, const small::config_prio_queue &config_prio) { m_groups_queues[job_group] = JobsQueue{config_prio}; } @@ -92,7 +94,7 @@ namespace small::jobsimpl { // config job types // m_types_queues will be initialized in the initial setup phase and will be accessed without locking afterwards // - inline bool add_jobs_type(const JobsTypeT &jobs_type, const JobsGroupT &jobs_group) + inline bool config_jobs_type(const JobsTypeT &jobs_type, const JobsGroupT &jobs_group) { auto it_g = m_groups_queues.find(jobs_group); if (it_g == m_groups_queues.end()) { @@ -103,35 +105,42 @@ namespace small::jobsimpl { return true; } + // + // only this part is public + // + public: // // add items to be processed - // push_back + // push_back only add the jobs item but does not start it // - inline std::size_t push_back(const JobsPrioT &priority, const JobsTypeT &jobs_type, const JobsRequestT &job_req, JobsID *jobs_id = nullptr) + inline std::size_t push_back(const JobsTypeT &jobs_type, const JobsRequestT &job_req, JobsID *jobs_id) { - return push_back(priority, std::make_shared(jobs_type, job_req), jobs_id); + // this job should be manually started by calling jobs_start + return push_back(std::make_shared(jobs_type, job_req), jobs_id); } - inline std::size_t push_back(const JobsPrioT &priority, std::shared_ptr jobs_item, JobsID *jobs_id = nullptr) + inline std::size_t push_back(std::shared_ptr jobs_item, JobsID *jobs_id) { if (is_exit()) { return 0; } + // this job should be manually started by calling jobs_start auto id = jobs_add(jobs_item); if (jobs_id) { *jobs_id = id; } - return jobs_activate(priority, jobs_item->type, id); + return 1; } - inline std::size_t push_back(const JobsPrioT &priority, const std::vector> &jobs_items, std::vector *jobs_ids) + inline std::size_t push_back(const std::vector> &jobs_items, std::vector *jobs_ids) { if (is_exit()) { return 0; } + // this jobs should be manually started by calling jobs_start std::unique_lock l(m_lock); std::size_t count = 0; @@ -141,7 +150,7 @@ namespace small::jobsimpl { } JobsID jobs_id{}; for (auto &jobs_item : jobs_items) { - auto ret = push_back(priority, jobs_item, &jobs_id); + auto ret = push_back(jobs_item, &jobs_id); if (ret) { if (jobs_ids) { jobs_ids->push_back(jobs_id); @@ -153,9 +162,199 @@ namespace small::jobsimpl { } // push_back move semantics - inline std::size_t push_back(const JobsPrioT &priority, const JobsTypeT &jobs_type, JobsRequestT &&jobs_req, JobsID *jobs_id = nullptr) + inline std::size_t push_back(const JobsTypeT &jobs_type, JobsRequestT &&jobs_req, JobsID *jobs_id) + { + // this job should be manually started by calling jobs_start + return push_back(std::make_shared(jobs_type, std::forward(jobs_req)), jobs_id); + } + + // + // push back and start the job + // + inline std::size_t push_back_and_start(const JobsPrioT &priority, const JobsTypeT &jobs_type, const JobsRequestT &job_req, JobsID *jobs_id = nullptr) + { + return push_back_and_start(priority, std::make_shared(jobs_type, job_req), jobs_id); + } + + inline std::size_t push_back_and_start(const JobsPrioT &priority, std::shared_ptr jobs_item, JobsID *jobs_id = nullptr) + { + std::unique_lock l(m_lock); + + JobsID id{}; + auto ret = push_back(jobs_item, &id); + if (!ret) { + return ret; + } + + if (jobs_id) { + *jobs_id = id; + } + + // start the job + return jobs_start(priority, jobs_item->m_type, id); + } + + inline std::size_t push_back_and_start(const JobsPrioT &priority, const std::vector> &jobs_items, std::vector *jobs_ids = nullptr) + { + if (is_exit()) { + return 0; + } + + std::unique_lock l(m_lock); + + std::vector ids; + + auto ret = push_back(jobs_items, &ids); + if (!ret) { + return ret; + } + + // start the jobs + jobs_start(priority, ids); + + if (jobs_ids) { + *jobs_ids = std::move(ids); + } + + return ret; + } + + // push_back move semantics + inline std::size_t push_back_and_start(const JobsPrioT &priority, const JobsTypeT &jobs_type, JobsRequestT &&jobs_req, JobsID *jobs_id = nullptr) + { + return push_back_and_start(priority, std::make_shared(jobs_type, std::forward(jobs_req)), jobs_id); + } + + // + // helper push_back a new job child and link with the parent + // + inline std::size_t push_back_child(const JobsID &parent_jobs_id, const JobsTypeT &child_jobs_type, const JobsRequestT &child_job_req, JobsID *child_jobs_id) + { + // this job should be manually started by calling jobs_start + return push_back_child(parent_jobs_id, std::make_shared(child_jobs_type, child_job_req), child_jobs_id); + } + + inline std::size_t push_back_child(const JobsID &parent_jobs_id, std::shared_ptr child_jobs_item, JobsID *child_jobs_id) + { + if (is_exit()) { + return 0; + } + + std::unique_lock l(m_lock); + + auto *parent_jobs_item = jobs_get(parent_jobs_id); + if (!parent_jobs_item) { + return 0; + } + + // this job should be manually started by calling jobs_start + JobsID id{}; + auto ret = push_back(child_jobs_item, &id); + if (!ret) { + return ret; + } + + if (child_jobs_id) { + *child_jobs_id = id; + } + + jobs_parent_child(*parent_jobs_item, child_jobs_item); + return 1; + } + + inline std::size_t push_back_child(const JobsID &parent_jobs_id, const std::vector> &children_jobs_items, std::vector *children_jobs_ids) + { + if (is_exit()) { + return 0; + } + + // this job should be manually started by calling jobs_start + std::unique_lock l(m_lock); + + std::size_t count = 0; + if (children_jobs_ids) { + children_jobs_ids->reserve(children_jobs_items.size()); + children_jobs_ids->clear(); + } + JobsID child_jobs_id{}; + for (auto &child_jobs_item : children_jobs_items) { + auto ret = push_back_child(parent_jobs_id, child_jobs_item, &child_jobs_id); + if (ret) { + if (children_jobs_ids) { + children_jobs_ids->push_back(child_jobs_id); + } + } + count += ret; + } + return count; + } + + // push_back_child move semantics + inline std::size_t push_back_child(const JobsID &parent_jobs_id, const JobsTypeT &child_jobs_type, JobsRequestT &&child_jobs_req, JobsID *child_jobs_id) + { + // this job should be manually started by calling jobs_start + return push_back_child(parent_jobs_id, std::make_shared(child_jobs_type, std::forward(child_jobs_req)), child_jobs_id); + } + + // + // helper push_back a new job child and link with the parent and start the child job + // + inline std::size_t push_back_and_start_child(const JobsID &parent_jobs_id, const JobsPrioT &child_priority, const JobsTypeT &child_jobs_type, const JobsRequestT &child_job_req, JobsID *child_jobs_id = nullptr) + { + return push_back_and_start_child(parent_jobs_id, child_priority, std::make_shared(child_jobs_type, child_job_req), child_jobs_id); + } + + inline std::size_t push_back_and_start_child(const JobsID &parent_jobs_id, const JobsPrioT &child_priority, std::shared_ptr child_jobs_item, JobsID *child_jobs_id = nullptr) + { + if (is_exit()) { + return 0; + } + + std::unique_lock l(m_lock); + + JobsID id{}; + auto ret = push_back_child(parent_jobs_id, child_jobs_item, &id); + if (!ret) { + return ret; + } + + if (child_jobs_id) { + *child_jobs_id = id; + } + + // start the job + return jobs_start(child_priority, child_jobs_item->m_type, id); + } + + inline std::size_t push_back_and_start_child(const JobsID &parent_jobs_id, const JobsPrioT &children_priority, const std::vector> &children_jobs_items, std::vector *children_jobs_ids = nullptr) + { + if (is_exit()) { + return 0; + } + + std::unique_lock l(m_lock); + + std::vector ids; + + auto ret = push_back_child(parent_jobs_id, children_jobs_items, &ids); + if (!ret) { + return ret; + } + + // start the jobs + jobs_start(children_priority, ids); + + if (children_jobs_ids) { + *children_jobs_ids = std::move(ids); + } + + return ret; + } + + // push_back move semantics + inline std::size_t push_back_and_start_child(const JobsID &parent_jobs_id, const JobsPrioT &child_priority, const JobsTypeT &child_jobs_type, JobsRequestT &&child_jobs_req, JobsID *child_jobs_id = nullptr) { - return push_back(priority, std::make_shared(jobs_type, std::forward(jobs_req)), jobs_id); + return push_back_and_start_child(parent_jobs_id, child_priority, std::make_shared(child_jobs_type, std::forward(child_jobs_req)), child_jobs_id); } // no emplace_back do to returning the jobs_id @@ -164,47 +363,58 @@ namespace small::jobsimpl { // push_back with specific timeings // template - inline std::size_t push_back_delay_for(const std::chrono::duration<_Rep, _Period> &__rtime, const JobsPrioT &priority, const JobsTypeT &jobs_type, const JobsRequestT &jobs_req, JobsID *jobs_id = nullptr) + inline std::size_t push_back_and_start_delay_for(const std::chrono::duration<_Rep, _Period> &__rtime, const JobsPrioT &priority, const JobsTypeT &jobs_type, const JobsRequestT &jobs_req, JobsID *jobs_id = nullptr) { - return push_back_delay_for(__rtime, priority, std::make_shared(jobs_type, jobs_req), jobs_id); + return push_back_and_start_delay_for(__rtime, priority, std::make_shared(jobs_type, jobs_req), jobs_id); } template - inline std::size_t push_back_delay_for(const std::chrono::duration<_Rep, _Period> &__rtime, const JobsPrioT &priority, std::shared_ptr jobs_item, JobsID *jobs_id = nullptr) + inline std::size_t push_back_and_start_delay_for(const std::chrono::duration<_Rep, _Period> &__rtime, const JobsPrioT &priority, std::shared_ptr jobs_item, JobsID *jobs_id = nullptr) { + if (is_exit()) { + return 0; + } + auto id = jobs_add(jobs_item); if (jobs_id) { *jobs_id = id; } - return m_delayed_items.queue().push_delay_for(__rtime, {priority, jobs_item->type, id}); + return m_delayed_items.queue().push_delay_for(__rtime, {priority, jobs_item->m_type, id}); } template - inline std::size_t push_back_delay_for(const std::chrono::duration<_Rep, _Period> &__rtime, const JobsPrioT &priority, const JobsTypeT &jobs_type, JobsRequestT &&jobs_req, JobsID *jobs_id = nullptr) + inline std::size_t push_back_and_start_delay_for(const std::chrono::duration<_Rep, _Period> &__rtime, const JobsPrioT &priority, const JobsTypeT &jobs_type, JobsRequestT &&jobs_req, JobsID *jobs_id = nullptr) { - return push_back_delay_for(__rtime, priority, std::make_shared(jobs_type, std::forward(jobs_req)), jobs_id); + return push_back_and_start_delay_for(__rtime, priority, std::make_shared(jobs_type, std::forward(jobs_req)), jobs_id); } // avoid time_casting from one clock to another // template // - inline std::size_t push_back_delay_until(const std::chrono::time_point &__atime, const JobsPrioT &priority, const JobsTypeT &jobs_type, const JobsRequestT &jobs_req, JobsID *jobs_id = nullptr) + inline std::size_t push_back_and_start_delay_until(const std::chrono::time_point &__atime, const JobsPrioT &priority, const JobsTypeT &jobs_type, const JobsRequestT &jobs_req, JobsID *jobs_id = nullptr) { - return push_back_delay_until(__atime, priority, std::make_shared(jobs_type, jobs_req), jobs_id); + return push_back_and_start_delay_until(__atime, priority, std::make_shared(jobs_type, jobs_req), jobs_id); } - inline std::size_t push_back_delay_until(const std::chrono::time_point &__atime, const JobsPrioT &priority, std::shared_ptr jobs_item, JobsID *jobs_id = nullptr) + inline std::size_t push_back_and_start_delay_until(const std::chrono::time_point &__atime, const JobsPrioT &priority, std::shared_ptr jobs_item, JobsID *jobs_id = nullptr) { + if (is_exit()) { + return 0; + } + auto id = jobs_add(jobs_item); if (jobs_id) { *jobs_id = id; } - return m_delayed_items.queue().push_delay_until(__atime, {priority, jobs_item->type, id}); + return m_delayed_items.queue().push_delay_until(__atime, {priority, jobs_item->m_type, id}); } - inline std::size_t push_back_delay_until(const std::chrono::time_point &__atime, const JobsPrioT &priority, const JobsTypeT &jobs_type, JobsRequestT &&jobs_req, JobsID *jobs_id = nullptr) + inline std::size_t push_back_and_start_delay_until(const std::chrono::time_point &__atime, const JobsPrioT &priority, const JobsTypeT &jobs_type, JobsRequestT &&jobs_req, JobsID *jobs_id = nullptr) { - return push_back_delay_until(__atime, priority, std::make_shared(jobs_type, std::forward(jobs_req)), jobs_id); + return push_back_and_start_delay_until(__atime, priority, std::make_shared(jobs_type, std::forward(jobs_req)), jobs_id); } + // TODO add push_back_child_....() + + private: // clang-format off // // signal exit @@ -284,17 +494,29 @@ namespace small::jobsimpl { } private: - friend ParentCallerT; - // - // get group queue + // get jobs group queue // - inline JobsQueue *get_group_queue(const JobsGroupT &jobs_group) + inline JobsQueue *get_jobs_group_queue(const JobsGroupT &jobs_group) { auto it = m_groups_queues.find(jobs_group); return it != m_groups_queues.end() ? &it->second : nullptr; } + // + // get jobs type queue + // + inline JobsQueue *get_jobs_type_queue(const JobsTypeT &jobs_type) + { + // optimization to get the queue from the type + // (instead of getting the group from type from m_config.m_types and then getting the queue from the m_groups_queues) + auto it = m_types_queues.find(jobs_type); + return it != m_types_queues.end() ? it->second : nullptr; + } + + // + // get job items + // inline std::vector> jobs_get(const std::vector &jobs_ids) { std::vector> jobs_items; @@ -313,22 +535,19 @@ namespace small::jobsimpl { return jobs_items; // will be moved } - inline void jobs_del(const JobsID &jobs_id) + inline std::shared_ptr *jobs_get(const JobsID &jobs_id) { std::unique_lock l(m_lock); - m_jobs.erase(jobs_id); - } - private: - // some prevention - jobs_queue(const jobs_queue &) = delete; - jobs_queue(jobs_queue &&) = delete; - jobs_queue &operator=(const jobs_queue &) = delete; - jobs_queue &operator=(jobs_queue &&__t) = delete; + auto it_j = m_jobs.find(jobs_id); + if (it_j == m_jobs.end()) { + return nullptr; + } + return &it_j->second; + } - private: // - // add job items + // add jobs item // inline JobsID jobs_add(std::shared_ptr jobs_item) { @@ -336,36 +555,122 @@ namespace small::jobsimpl { ++m_jobs_seq_id; - JobsID id = m_jobs_seq_id; - jobs_item->id = id; + JobsID id = m_jobs_seq_id; + jobs_item->m_id = id; m_jobs.emplace(id, jobs_item); + // call parent for extra processing + m_parent_caller.jobs_add(jobs_item); + return id; } - // activate the jobs - inline std::size_t jobs_activate(const JobsPrioT &priority, const JobsTypeT &jobs_type, const JobsID &jobs_id) + // + // start the jobs + // + inline std::size_t jobs_start(const JobsPrioT &priority, const std::vector &jobs_ids) + { + std::size_t count = 0; + auto jobs_items = jobs_get(jobs_ids); + for (auto &jobs_item : jobs_items) { + auto ret = jobs_start(priority, jobs_item->m_type, jobs_item->m_id); + if (ret) { + ++count; + } + } + return count; + } + + inline std::size_t jobs_start(const JobsPrioT &priority, const JobsID &jobs_id) + { + auto *jobs_item = jobs_get(jobs_id); + if (!jobs_item) { + return 0; + } + return jobs_start(priority, (*jobs_item)->m_type, (*jobs_item)->m_id); + } + + inline std::size_t jobs_start(const JobsPrioT &priority, const JobsTypeT &jobs_type, const JobsID &jobs_id) { std::size_t ret = 0; - // optimization to get the queue from the type - // (instead of getting the group from type from m_config.m_types and then getting the queue from the m_groups_queues) - auto it_q = m_types_queues.find(jobs_type); - if (it_q != m_types_queues.end()) { - auto *q = it_q->second; - ret = q->push_back(priority, jobs_id); + auto *q = get_jobs_type_queue(jobs_type); + if (q) { + ret = q->push_back(priority, jobs_id); } if (ret) { - m_parent_caller.jobs_activate(jobs_type, jobs_id); + m_parent_caller.jobs_schedule(jobs_type, jobs_id); } else { - jobs_del(jobs_id); + // TODO call m_parent.jobs_failed(jobs_id)? // jobs_start should not be under lock then + jobs_erase(jobs_id); } return ret; } + // + // erase jobs item + // + inline void jobs_erase(const JobsID &jobs_id) + { + std::unique_lock l(m_lock); + + auto jobs_item = jobs_get(jobs_id); + if (!jobs_item) { + // already deleted + return; + } + // if not a final state, set it to cancelled (in case it is executing at this point) + if (!JobsItem::is_state_complete((*jobs_item)->get_state())) { + (*jobs_item)->set_state_cancelled(); + } + + m_jobs.erase(jobs_id); + + // delete all children + for (auto &child_jobs_id : (*jobs_item)->m_childrenIDs) { + jobs_erase(child_jobs_id); + } + } + + // + // set relationship parent-child + // + inline std::size_t jobs_parent_child(const JobsID &parent_jobs_id, const JobsID &child_jobs_id) + { + std::unique_lock l(m_lock); + + auto *parent_jobs_item = jobs_get(parent_jobs_id); + if (!parent_jobs_item) { + return 0; + } + auto *child_jobs_item = jobs_get(child_jobs_id); + if (!child_jobs_item) { + return 0; + } + + jobs_parent_child(*parent_jobs_item, *child_jobs_item); + return 1; + } + + inline void jobs_parent_child(std::shared_ptr parent_jobs_item, std::shared_ptr child_jobs_item) + { + std::unique_lock l(m_lock); + parent_jobs_item->add_child(child_jobs_item->m_id); + child_jobs_item->add_parent(parent_jobs_item->m_id); + } + + private: + // some prevention + jobs_queue(const jobs_queue &) = delete; + jobs_queue(jobs_queue &&) = delete; + jobs_queue &operator=(const jobs_queue &) = delete; + jobs_queue &operator=(jobs_queue &&__t) = delete; + + private: // // inner thread function for delayed items + // called from m_delayed_items // friend JobQueueDelayedT; @@ -373,7 +678,7 @@ namespace small::jobsimpl { { std::size_t count = 0; for (auto &[priority, jobs_type, jobs_id] : items) { - count += jobs_activate(priority, jobs_type, jobs_id); + count += jobs_start(priority, jobs_type, jobs_id); } return count; } diff --git a/include/jobs_config.h b/include/jobs_config.h index 81d95dc..1b3dffb 100644 --- a/include/jobs_config.h +++ b/include/jobs_config.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include "prio_queue.h" @@ -16,8 +17,7 @@ namespace small { template struct jobs_config { - using JobsItem = typename small::jobsimpl::jobs_item; - using ProcessingFunction = std::function> &)>; + using JobsItem = typename small::jobsimpl::jobs_item; // config for the entire jobs engine struct ConfigJobsEngine @@ -26,48 +26,125 @@ namespace small { small::config_prio_queue m_config_prio{}; }; + // config for the job group (where job types can be grouped) + struct ConfigJobsGroup + { + int m_threads_count{1}; // how many threads for processing (out of the global threads) + int m_bulk_count{1}; // how many objects are processed at once + std::optional m_delay_next_request{}; // if need to delay the next request processing to have some throtelling + }; + + // to be passed to processing function + struct ConfigProcessing + { + std::optional m_delay_next_request{}; // if need to delay the next request processing to have some throtelling + }; + + // functions + using FunctionProcessing = std::function> &, ConfigProcessing &config)>; + using FunctionOnChildrenFinished = std::function> &)>; + using FunctionFinished = std::function> &)>; + // config for an individual job type struct ConfigJobsType { - JobsGroupT m_group{}; // job type group (multiple job types can be configured to same group) - bool m_has_processing_function{false}; // use default processing function - ProcessingFunction m_processing_function{}; // processing Function + JobsGroupT m_group{}; // job type group (multiple job types can be configured to same group) + std::optional m_timeout{}; // if need to delay the next request processing to have some throtelling + bool m_has_function_processing{false}; // use default processing function + bool m_has_function_children_finished{false}; // use default function for children finished + bool m_has_function_finished{false}; // use default finished function + FunctionProcessing m_function_processing{}; // processing Function for jobs items + FunctionOnChildrenFinished m_function_children_finished{}; // function called for a parent when a child is finished + FunctionFinished m_function_finished{}; // function called when jobs items are finished }; - // config for the job group (where job types can be grouped) - struct ConfigJobsGroup + ConfigJobsEngine m_engine{}; // config for entire engine (threads, priorities, etc) + FunctionProcessing m_default_function_processing{}; // default processing function + FunctionOnChildrenFinished m_default_function_children_finished{}; // default function to call for a parent when children are finished + FunctionFinished m_default_function_finished{}; // default function to call when jobs items are finished + std::unordered_map m_groups; // config by jobs group + std::unordered_map m_types; // config by jobs type + + // + // add default processing function + // + inline void config_default_function_processing(FunctionProcessing function_processing) { - int m_threads_count{1}; // how many threads for processing (out of the global threads) - int m_bulk_count{1}; // how many objects are processed at once - }; + m_default_function_processing = function_processing; + apply_default_function_processing(); + } + + inline void config_default_function_children_finished(FunctionOnChildrenFinished function_children_finished) + { + m_default_function_children_finished = function_children_finished; + apply_default_function_children_finished(); + } - ConfigJobsEngine m_engine{}; // config for entire engine (threads, priorities, etc) - ProcessingFunction m_default_processing_function{}; // default processing function - std::unordered_map m_groups; // config by jobs group - std::unordered_map m_types; // config by jobs type + inline void config_default_function_finished(FunctionFinished function_finished) + { + m_default_function_finished = function_finished; + apply_default_function_finished(); + } + + // + // add job functions + // + inline void config_jobs_function_processing(const JobsTypeT &jobs_type, FunctionProcessing function_processing) + { + auto it_f = m_types.find(jobs_type); + if (it_f == m_types.end()) { + return; + } + it_f->second.m_has_function_processing = true; + it_f->second.m_function_processing = function_processing; + } - // default processing function - inline void add_default_processing_function(ProcessingFunction processing_function) + inline void config_jobs_function_children_finished(const JobsTypeT &jobs_type, FunctionOnChildrenFinished function_children_finished) { - m_default_processing_function = processing_function; - apply_default_processing_function(); + auto it_f = m_types.find(jobs_type); + if (it_f == m_types.end()) { + return; + } + it_f->second.m_has_function_children_finished = true; + it_f->second.m_function_children_finished = function_children_finished; } - inline void add_job_processing_function(const JobsTypeT &jobs_type, ProcessingFunction processing_function) + inline void config_jobs_function_finished(const JobsTypeT &jobs_type, FunctionFinished function_finished) { auto it_f = m_types.find(jobs_type); if (it_f == m_types.end()) { return; } - it_f->second.m_has_processing_function = true; - it_f->second.m_processing_function = processing_function; + it_f->second.m_has_function_finished = true; + it_f->second.m_function_finished = function_finished; + } + + // + // apply default function where it is not set a specific one + // + inline void apply_default_function_processing() + { + for (auto &[type, jobs_type_config] : m_types) { + if (jobs_type_config.m_has_function_processing == false) { + jobs_type_config.m_function_processing = m_default_function_processing; + } + } + } + + inline void apply_default_function_children_finished() + { + for (auto &[type, jobs_type_config] : m_types) { + if (jobs_type_config.m_has_function_children_finished == false) { + jobs_type_config.m_function_children_finished = m_default_function_children_finished; + } + } } - inline void apply_default_processing_function() + inline void apply_default_function_finished() { for (auto &[type, jobs_type_config] : m_types) { - if (jobs_type_config.m_has_processing_function == false) { - jobs_type_config.m_processing_function = m_default_processing_function; + if (jobs_type_config.m_has_function_finished == false) { + jobs_type_config.m_function_finished = m_default_function_finished; } } } diff --git a/include/jobs_engine.h b/include/jobs_engine.h index 9fb17b0..fc711c5 100644 --- a/include/jobs_engine.h +++ b/include/jobs_engine.h @@ -33,14 +33,14 @@ // // create jobs engine // JobsEng jobs(config); // -// jobs.add_default_processing_function([](auto &j /*this jobs engine*/, const auto &jobs_items) { +// jobs.config_default_function_processing([](auto &j /*this jobs engine*/, const auto &jobs_items) { // for (auto &item : jobs_items) { // ... // } // }); // // // add specific function for job1 -// jobs.add_job_processing_function(JobsType::kJobsType1, [](auto &j /*this jobs engine*/, const auto &jobs_items, auto b /*extra param b*/) { +// jobs.config_jobs_function_processing(JobsType::kJobsType1, [](auto &j /*this jobs engine*/, const auto &jobs_items, auto b /*extra param b*/) { // for (auto &item : jobs_items) { // ... // } @@ -69,14 +69,16 @@ namespace small { class jobs_engine { public: - using ThisJobsEngine = small::jobs_engine; - using JobsConfig = small::jobs_config; - using JobsItem = small::jobsimpl::jobs_item; - using JobsQueue = small::jobsimpl::jobs_queue; - using JobsID = typename JobsItem::JobsID; - using TimeClock = typename JobsQueue::TimeClock; - using TimeDuration = typename JobsQueue::TimeDuration; - using ProcessingFunction = typename JobsConfig::ProcessingFunction; + using ThisJobsEngine = typename small::jobs_engine; + using JobsConfig = typename small::jobs_config; + using JobsItem = typename small::jobsimpl::jobs_item; + using JobsQueue = typename small::jobsimpl::jobs_queue; + using JobsID = typename JobsItem::JobsID; + using TimeClock = typename JobsQueue::TimeClock; + using TimeDuration = typename JobsQueue::TimeDuration; + using FunctionProcessing = typename JobsConfig::FunctionProcessing; + using FunctionOnChildrenFinished = typename JobsConfig::FunctionOnChildrenFinished; + using FunctionFinished = typename JobsConfig::FunctionFinished; public: // @@ -106,7 +108,7 @@ namespace small { // clear inline void clear() { - std::unique_lock l(m_queue); + std::unique_lock l(*this); m_queue.clear(); m_thread_pool.clear(); } @@ -141,6 +143,7 @@ namespace small { { m_config.m_engine.m_threads_count = threads_count; m_queue.start_threads(threads_count); + m_timeout_queue.start_threads(); m_thread_pool.start_threads(threads_count); } @@ -160,29 +163,165 @@ namespace small { apply_config(); } - // processing function + // override default jobs function template - inline void add_default_processing_function(_Callable processing_function, Args... extra_parameters) + inline void config_default_function_processing(_Callable function_processing, Args... extra_parameters) { - m_config.add_default_processing_function(std::bind(std::forward<_Callable>(processing_function), std::ref(*this), std::placeholders::_1 /*jobs_items*/, std::forward(extra_parameters)...)); + m_config.config_default_function_processing(std::bind(std::forward<_Callable>(function_processing), std::ref(*this), std::placeholders::_1 /*jobs_items*/, std::placeholders::_2 /*config*/, std::forward(extra_parameters)...)); } template - inline void add_job_processing_function(const JobsTypeT &jobs_type, _Callable processing_function, Args... extra_parameters) + inline void config_default_function_children_finished(_Callable function_children_finished, Args... extra_parameters) { - m_config.add_job_processing_function(jobs_type, std::bind(std::forward<_Callable>(processing_function), std::ref(*this), std::placeholders::_1 /*jobs_items*/, std::forward(extra_parameters)...)); + m_config.config_default_function_children_finished(std::bind(std::forward<_Callable>(function_children_finished), std::ref(*this), std::placeholders::_1 /*jobs_items*/, std::placeholders::_2 /*config*/, std::forward(extra_parameters)...)); } + template + inline void config_default_function_finished(_Callable function_finished, Args... extra_parameters) + { + m_config.config_default_function_finished(std::bind(std::forward<_Callable>(function_finished), std::ref(*this), std::placeholders::_1 /*jobs_items*/, std::placeholders::_2 /*config*/, std::forward(extra_parameters)...)); + } + + // specific jobs functions + template + inline void config_jobs_function_processing(const JobsTypeT &jobs_type, _Callable function_processing, Args... extra_parameters) + { + m_config.config_jobs_function_processing(jobs_type, std::bind(std::forward<_Callable>(function_processing), std::ref(*this), std::placeholders::_1 /*jobs_items*/, std::placeholders::_2 /*config*/, std::forward(extra_parameters)...)); + } + + template + inline void config_jobs_function_children_finished(const JobsTypeT &jobs_type, _Callable function_children_finished, Args... extra_parameters) + { + m_config.config_jobs_function_children_finished(jobs_type, std::bind(std::forward<_Callable>(function_children_finished), std::ref(*this), std::placeholders::_1 /*jobs_items*/, std::forward(extra_parameters)...)); + } + + template + inline void config_jobs_function_finished(const JobsTypeT &jobs_type, _Callable function_finished, Args... extra_parameters) + { + m_config.config_jobs_function_finished(jobs_type, std::bind(std::forward<_Callable>(function_finished), std::ref(*this), std::placeholders::_1 /*jobs_items*/, std::forward(extra_parameters)...)); + } + + // + // jobs functions + // + // - // queue access + // add items to jobs queue // inline JobsQueue &queue() { return m_queue; } + // + // start schedule jobs items + // + inline std::size_t jobs_start(const JobsPrioT &priority, const JobsID &jobs_id) + { + return queue().jobs_start(priority, jobs_id); + } + + inline std::size_t jobs_start(const JobsPrioT &priority, const std::vector &jobs_ids) + { + return queue().jobs_start(priority, jobs_ids); + } + + // + // get job items + // + inline std::shared_ptr *jobs_get(const JobsID &jobs_id) + { + return queue().jobs_get(jobs_id); + } + + inline std::vector> jobs_get(const std::vector &jobs_ids) + { + return queue().jobs_get(jobs_ids); + } + + // + // set relationship parent-child + // + inline std::size_t jobs_parent_child(const JobsID &parent_jobs_id, const JobsID &child_jobs_id) + { + return queue().jobs_parent_child(parent_jobs_id, child_jobs_id); + } + + inline void jobs_parent_child(std::shared_ptr parent_jobs_item, std::shared_ptr child_jobs_item) + { + return queue().jobs_parent_child(parent_jobs_item, child_jobs_item); + } + + // + // set jobs state + // + inline bool jobs_progress(const JobsID &jobs_id, const int &progress) + { + return jobs_set_progress(jobs_id, progress); + } + + inline bool jobs_response(const JobsID &jobs_id, const JobsResponseT &jobs_response) + { + return jobs_set_response(jobs_id, jobs_response); + } + inline bool jobs_response(const JobsID &jobs_id, JobsResponseT &&jobs_response) + { + return jobs_set_response(jobs_id, jobs_response); + } + + inline bool jobs_finished(const JobsID &jobs_id) + { + return jobs_set_state(jobs_id, small::jobsimpl::EnumJobsState::kFinished); + } + inline bool jobs_finished(const std::vector &jobs_ids) + { + return jobs_set_state(jobs_ids, small::jobsimpl::EnumJobsState::kFinished); + } + inline bool jobs_finished(const JobsID &jobs_id, const JobsResponseT &jobs_response) + { + return jobs_set_state(jobs_id, small::jobsimpl::EnumJobsState::kFinished, jobs_response); + } + inline bool jobs_finished(const JobsID &jobs_id, JobsResponseT &&jobs_response) + { + return jobs_set_state(jobs_id, small::jobsimpl::EnumJobsState::kFinished, std::forward(jobs_response)); + } + + inline bool jobs_failed(const JobsID &jobs_id) + { + return jobs_set_state(jobs_id, small::jobsimpl::EnumJobsState::kFailed); + } + inline bool jobs_failed(const std::vector &jobs_ids) + { + return jobs_set_state(jobs_ids, small::jobsimpl::EnumJobsState::kFailed); + } + inline bool jobs_failed(const JobsID &jobs_id, const JobsResponseT &jobs_response) + { + return jobs_set_state(jobs_id, small::jobsimpl::EnumJobsState::kFailed, jobs_response); + } + inline bool jobs_failed(const JobsID &jobs_id, JobsResponseT &&jobs_response) + { + return jobs_set_state(jobs_id, small::jobsimpl::EnumJobsState::kFailed, std::forward(jobs_response)); + } + + inline bool jobs_cancelled(const JobsID &jobs_id) + { + return jobs_set_state(jobs_id, small::jobsimpl::EnumJobsState::kCancelled); + } + inline bool jobs_cancelled(const std::vector &jobs_ids) + { + return jobs_set_state(jobs_ids, small::jobsimpl::EnumJobsState::kCancelled); + } + inline bool jobs_cancelled(const JobsID &jobs_id, const JobsResponseT &jobs_response) + { + return jobs_set_state(jobs_id, small::jobsimpl::EnumJobsState::kCancelled, jobs_response); + } + inline bool jobs_cancelled(const JobsID &jobs_id, JobsResponseT &&jobs_response) + { + return jobs_set_state(jobs_id, small::jobsimpl::EnumJobsState::kCancelled, std::forward(jobs_response)); + } + // clang-format off // // signal exit // - inline void signal_exit_force () { m_thread_pool.signal_exit_force(); m_queue.signal_exit_force(); } + inline void signal_exit_force () { m_thread_pool.signal_exit_force(); m_timeout_queue.queue().signal_exit_force(); m_queue.signal_exit_force(); } inline void signal_exit_when_done () { m_queue.signal_exit_when_done(); /*when the delayed will be finished will signal the active queue items to exit when done, then the processing pool */ } // to be used in processing function @@ -199,8 +338,16 @@ namespace small { // first wait for queue items to finish m_queue.wait(); + // TODO wait for not finished items to be finished (some are finished by external) + // only now can signal exit when done for workers (when no more items exists) - return m_thread_pool.wait(); + m_thread_pool.wait(); + + // the timeouts are no longer necessay, force exit + m_timeout_queue.queue().signal_exit_force(); + m_timeout_queue.wait(); + + return small::EnumLock::kExit; } // wait some time then signal exit @@ -227,8 +374,19 @@ namespace small { return small::EnumLock::kTimeout; } + // TODO wait for not finished items to be finished (some are finished by external) + // only now can signal exit when done for workers (when no more items exists) - return m_thread_pool.wait_until(__atime); + delayed_status = m_thread_pool.wait_until(__atime); + if (delayed_status == small::EnumLock::kTimeout) { + return small::EnumLock::kTimeout; + } + + // the timeouts are no longer necessay, force exit + m_timeout_queue.queue().signal_exit_force(); + m_timeout_queue.wait(); + + return small::EnumLock::kExit; } private: @@ -246,14 +404,24 @@ namespace small { { // setup jobs groups for (auto &[jobs_group, jobs_group_config] : m_config.m_groups) { - m_queue.add_jobs_group(jobs_group, m_config.m_engine.m_config_prio); - m_thread_pool.add_job_group(jobs_group, jobs_group_config.m_threads_count); + m_queue.config_jobs_group(jobs_group, m_config.m_engine.m_config_prio); + m_thread_pool.config_jobs_group(jobs_group, jobs_group_config.m_threads_count); } // setup jobs types - m_config.apply_default_processing_function(); + if (!m_config.m_default_function_finished) { + m_config.m_default_function_finished = std::bind(&jobs_engine::jobs_on_finished, this, std::placeholders::_1 /*jobs_items*/); + } + if (!m_config.m_default_function_children_finished) { + m_config.m_default_function_children_finished = std::bind(&jobs_engine::jobs_on_children_finished, this, std::placeholders::_1 /*jobs_items*/); + } + + m_config.apply_default_function_processing(); + m_config.apply_default_function_children_finished(); + m_config.apply_default_function_finished(); + for (auto &[jobs_type, jobs_type_config] : m_config.m_types) { - m_queue.add_jobs_type(jobs_type, jobs_type_config.m_group); + m_queue.config_jobs_type(jobs_type, jobs_type_config.m_group); } // auto start threads if count > 0 otherwise threads should be manually started @@ -263,7 +431,7 @@ namespace small { } // - // inner thread function for executing items (should return if there are more items) + // inner thread function for executing items (should return if there are more items) (called from thread_pool) // friend small::jobsimpl::jobs_engine_thread_pool; @@ -279,8 +447,12 @@ namespace small { int bulk_count = std::max(it_cfg_grp->second.m_bulk_count, 1); + // delay request + typename JobsConfig::ConfigProcessing group_config{}; + group_config.m_delay_next_request = it_cfg_grp->second.m_delay_next_request; + // get items to process - auto *q = m_queue.get_group_queue(jobs_group); + auto *q = m_queue.get_jobs_group_queue(jobs_group); if (!q) { return small::EnumLock::kExit; } @@ -299,8 +471,14 @@ namespace small { // get jobs std::vector> jobs_items = m_queue.jobs_get(vec_ids); for (auto &jobs_item : jobs_items) { - elems_by_type[jobs_item->type].reserve(jobs_items.size()); - elems_by_type[jobs_item->type].push_back(jobs_item); + elems_by_type[jobs_item->m_type].reserve(jobs_items.size()); + + // mark the item as in progress + jobs_item->set_state_inprogress(); + // execute if it is still in progress (may be moved to higher states due to external factors like cancel, timeout, finish early due to other job, etc) + if (jobs_item->is_state_inprogress()) { + elems_by_type[jobs_item->m_type].push_back(jobs_item); + } } } @@ -312,24 +490,312 @@ namespace small { } // process specific jobs by type - it_cfg_type->second.m_processing_function(jobs); - } + typename JobsConfig::ConfigProcessing type_config; + it_cfg_type->second.m_function_processing(jobs, type_config); + + // get the max for config + if (!group_config.m_delay_next_request) { + group_config.m_delay_next_request = type_config.m_delay_next_request; + } else { + if (type_config.m_delay_next_request) { + group_config.m_delay_next_request = std::max(group_config.m_delay_next_request, type_config.m_delay_next_request); + } + } - for (auto &jobs_id : vec_ids) { - m_queue.jobs_del(jobs_id); + // mark the item as in wait for children of finished + // if in callback the state is set to failed, cancelled or timeout setting to finish wont succeed because if less value than those + jobs_waitforchildren(jobs); } + // TODO group_config.m_delay_next_request + // TODO for delay after requests use worker_thread delay item -> check if has_items should be set properly + // TODO if delay is set set has_items to true to force the sleep, but also a last time sleep so if there too much time and are no items dont continue + return ret; } // - // inner function for activate the jobs from queue + // inner function for extra processing after addding the jobs into queue (called from queue) // friend JobsQueue; - inline void jobs_activate(const JobsTypeT &jobs_type, const JobsID & /* jobs_id */) + inline void jobs_add(std::shared_ptr jobs_item) + { + // add it to the timeout queue + // only if job type has config a timeout + auto timeout = m_config.m_types[jobs_item->m_type].m_timeout; + if (timeout) { + m_timeout_queue.queue().push_delay_for(*timeout, jobs_item->m_id); + } + } + + // + // inner function for activate the jobs from queue (called from queue) + // + inline void jobs_schedule(const JobsTypeT &jobs_type, const JobsID & /* jobs_id */) + { + m_thread_pool.jobs_schedule(m_config.m_types[jobs_type].m_group); + } + + // + // inner thread function for timeout items (called from m_timeout_queue) + // + using JobsQueueTimeout = small::time_queue_thread; + friend JobsQueueTimeout; + + inline std::size_t push_back(std::vector &&jobs_ids) + { + jobs_timeout(jobs_ids); + return jobs_ids.size(); + } + + // + // jobs area transform from id to jobs_item + // + inline bool jobs_set_progress(const JobsID &jobs_id, const int &progress) + { + auto *jobs_item = jobs_get(jobs_id); + if (!jobs_item) { + return false; + } + return jobs_set_progress(*jobs_item, progress); + } + + inline bool jobs_set_response(const JobsID &jobs_id, const JobsResponseT &jobs_response) + { + std::unique_lock l(*this); + auto *jobs_item = jobs_get(jobs_id); + if (!jobs_item) { + return false; + } + return jobs_set_response(*jobs_item, jobs_response); + } + + inline bool jobs_set_response(const JobsID &jobs_id, JobsResponseT &&jobs_response) + { + std::unique_lock l(*this); + auto *jobs_item = jobs_get(jobs_id); + if (!jobs_item) { + return false; + } + return jobs_set_response(*jobs_item, std::forward(jobs_response)); + } + + inline bool jobs_set_state(const JobsID &jobs_id, const small::jobsimpl::EnumJobsState &jobs_state) + { + auto *jobs_item = jobs_get(jobs_id); + if (!jobs_item) { + return false; + } + return jobs_set_state(*jobs_item, jobs_state); + } + + inline bool jobs_set_state(const JobsID &jobs_id, const small::jobsimpl::EnumJobsState &jobs_state, const JobsResponseT &jobs_response) + { + auto *jobs_item = jobs_get(jobs_id); + if (!jobs_item) { + return false; + } + jobs_set_response(*jobs_item, jobs_response); + return jobs_set_state(*jobs_item, jobs_state); + } + + inline bool jobs_set_state(const JobsID &jobs_id, const small::jobsimpl::EnumJobsState &jobs_state, JobsResponseT &&jobs_response) + { + auto *jobs_item = jobs_get(jobs_id); + if (!jobs_item) { + return false; + } + jobs_set_response(*jobs_item, std::forward(jobs_response)); + return jobs_set_state(*jobs_item, jobs_state); + } + + inline std::size_t jobs_timeout(const std::vector &jobs_ids) + { + // set the jobs as timeout if it is not finished until now (called from queue) + return jobs_set_state(jobs_get(jobs_ids), small::jobsimpl::EnumJobsState::kTimeout); + } + + // + // jobs set progress + // + inline bool jobs_set_progress(const std::shared_ptr &jobs_item, const int &progress) + { + jobs_item->set_progress(progress); + if (progress == 100) { + jobs_set_state(jobs_item, small::jobsimpl::EnumJobsState::kFinished); + } + return true; + } + + // + // jobs set response + // + inline void jobs_set_response(std::shared_ptr &jobs_item, const JobsResponseT &jobs_response) + { + std::unique_lock l(*this); + jobs_item->m_response = jobs_response; + } + + inline void jobs_set_response(std::shared_ptr &jobs_item, JobsResponseT &&jobs_response) + { + std::unique_lock l(*this); + jobs_item->m_response = std::move(jobs_response); + } + + // + // jobs set states + // + inline std::size_t jobs_waitforchildren(const std::vector> &jobs_items) + { + // set the jobs as waitforchildren only if there are children otherwise advance to finish + return jobs_set_state(jobs_items, small::jobsimpl::EnumJobsState::kWaitChildren); + } + + // + // apply state + // + inline bool jobs_set_state(const std::shared_ptr &jobs_item, const small::jobsimpl::EnumJobsState &jobs_state) + { + small::jobsimpl::EnumJobsState set_state = jobs_state; + + auto ret = jobs_apply_state(jobs_item, jobs_state, &set_state); + if (ret && JobsItem::is_state_complete(set_state)) { + jobs_completed({jobs_item}); + } + return ret; + } + + inline std::size_t jobs_set_state(const std::vector> &jobs_items, const small::jobsimpl::EnumJobsState &jobs_state) + { + small::jobsimpl::EnumJobsState set_state = jobs_state; + std::vector> completed_items; + completed_items.reserve(jobs_items.size()); + + std::size_t changed_count{}; + for (auto &jobs_item : jobs_items) { + auto ret = jobs_apply_state(jobs_item, jobs_state, &set_state); + if (ret) { + ++changed_count; + if (JobsItem::is_state_complete(set_state)) { + completed_items.push_back(jobs_item); + } + } + } + + jobs_completed(completed_items); + return changed_count; + } + + inline bool jobs_apply_state(std::shared_ptr jobs_item, const small::jobsimpl::EnumJobsState &jobs_state, small::jobsimpl::EnumJobsState *jobs_set_state) + { + *jobs_set_state = jobs_state; + // state is already the same + if (jobs_item->is_state(*jobs_set_state)) { + return false; + } + + // set the jobs as timeout if it is not finished until now + if (*jobs_set_state == small::jobsimpl::EnumJobsState::kTimeout && jobs_item->is_state_finished()) { + return false; + } + + // set the jobs as waitforchildren only if there are children otherwise advance to finish + if (*jobs_set_state == small::jobsimpl::EnumJobsState::kWaitChildren) { + if (!jobs_item->has_children()) { + *jobs_set_state = small::jobsimpl::EnumJobsState::kFinished; + } + } + + jobs_item->set_state(*jobs_set_state); + return jobs_item->is_state(*jobs_set_state); + } + + // + // when a job is completed (finished/timeout/canceled/failed) + // + inline void jobs_completed(const std::vector> &jobs_items) { - m_thread_pool.job_start(m_config.m_types[jobs_type].m_group); + // (this may be called from multiple places - queue timeout, do_action finished, above set state cancel, finish, ) + + // call the custom function from config (if exists, otherwise the default will be called) + for (auto &jobs_item : jobs_items) { + m_config.m_types[jobs_item->m_type].m_function_finished({jobs_item}); + + // if it has parents call jobs_on_children_finished (or custom function) + if (jobs_item->has_parents()) { + jobs_set_progress(jobs_item, 100); // TODO update parents too + m_config.m_types[jobs_item->m_type].m_function_children_finished({jobs_item}); + } else { + // delete only if there are no parents (+delete all children) + m_queue.jobs_erase(jobs_item->m_id); + } + } + } + + // + // when is finished + // + inline void jobs_on_finished(const std::vector> &/* jobs_items */) + { + // by default nothing to here, but it can be setup for each jobs type + } + + // + // after child is finished + // + inline void jobs_on_children_finished(const std::vector> &jobs_children) + { + for (auto &jobs_item : jobs_children) { + // + // compute parent state and progress based on children + // if a children has failed/timeout/cancelled then parent is set to failed + // if all children are finished then the parent is finished + // + std::unordered_map, std::vector>>> unfinished_parents; + { + std::unique_lock l(*this); + + auto parent_jobs_items = jobs_get(jobs_item->m_parentIDs); + for (auto &parent_jobs_item : parent_jobs_items) { + if (parent_jobs_item->is_complete()) { + continue; + } + // add to the unfinished parents map (with all children) + unfinished_parents[parent_jobs_item->m_id] = {parent_jobs_item, jobs_get(parent_jobs_item->m_childrenIDs)}; + } + } // lock finished + + for (auto &[parent_jobs_id, parent_info] : unfinished_parents) { + // compute progress from all finished children + std::size_t count_failed_children = 0; + std::size_t count_completed_children = 0; + std::size_t count_total_children = parent_info.second.size(); + for (auto &child_jobs_item : parent_info.second) { + if (!child_jobs_item->is_complete()) { + continue; + } + + ++count_completed_children; + if (!child_jobs_item->is_state_finished()) { + ++count_failed_children; + } + } + + if (count_failed_children) { + jobs_failed(parent_jobs_id); + } else { + + std::size_t progress = count_total_children ? (count_completed_children * 100 / count_total_children) : 100; + jobs_progress(parent_jobs_id, progress); // TODO this should be recursive child->parent->parent (taking into account state) + + // set finished state + if (count_total_children == count_completed_children) { + jobs_finished(parent_jobs_id); + } + } + } + } } private: @@ -338,6 +804,7 @@ namespace small { // JobsConfig m_config; JobsQueue m_queue{*this}; - small::jobsimpl::jobs_engine_thread_pool m_thread_pool{*this}; // for processing items (by group) using a pool of threads + JobsQueueTimeout m_timeout_queue{*this}; // for timeout elements + small::jobsimpl::jobs_engine_thread_pool m_thread_pool{*this}; // for processing items (by group) using a pool of threads }; } // namespace small diff --git a/include/worker_thread.h b/include/worker_thread.h index d110e44..057f9b1 100644 --- a/include/worker_thread.h +++ b/include/worker_thread.h @@ -77,7 +77,7 @@ namespace small { template worker_thread(const config_worker_thread config, _Callable function, Args... extra_parameters) : m_config(config), - m_processing_function(std::bind(std::forward<_Callable>(function), std::ref(*this), std::placeholders::_1 /*item*/, std::forward(extra_parameters)...)) + m_function_processing(std::bind(std::forward<_Callable>(function), std::ref(*this), std::placeholders::_1 /*item*/, std::forward(extra_parameters)...)) { // auto start threads if count > 0 otherwise threads should be manually started if (config.threads_count) { @@ -310,7 +310,7 @@ namespace small { // callback for queue_items inline void process_items(std::vector &&items) { - m_processing_function(std::forward>(items)); // bind the std::placeholders::_1 + m_function_processing(std::forward>(items)); // bind the std::placeholders::_1 } private: @@ -320,6 +320,6 @@ namespace small { config_worker_thread m_config; // config small::lock_queue_thread> m_queue_items{*this}; // queue of items small::time_queue_thread> m_delayed_items{*this}; // queue of delayed items - std::function &)> m_processing_function{}; // processing Function + std::function &)> m_function_processing{}; // processing Function }; } // namespace small