diff --git a/examples/examples_jobs_engine.h b/examples/examples_jobs_engine.h index 5620a21..334d513 100644 --- a/examples/examples_jobs_engine.h +++ b/examples/examples_jobs_engine.h @@ -51,66 +51,84 @@ namespace examples::jobs_engine { using Request = std::pair; using JobsEng = small::jobs_engine; - auto jobs_processing_function = [](const std::vector> &items) { + auto jobs_function_processing = [](const std::vector> &items, JobsEng::JobsConfig::ConfigProcessing & /* config */) { // this functions is defined without the engine params (it is here just for the example) - std::cout << "this function is defined without the engine params, called for " << (int)items[0]->type << "\n"; + std::cout << "this function is defined without the engine params, called for " << (int)items[0]->m_type << "\n"; }; // // config object for priorities, groups, types, threads, timeouts, sleeps, etc // JobsEng::JobsConfig config{ - .m_engine = {.m_threads_count = 0 /*dont start any thread yet*/, // TODO add thread count for wait_for_children processing and finished, default 1/2 here use 1 due to coalesce - .m_config_prio = {.priorities = {{small::EnumPriorities::kHighest, 2}, - {small::EnumPriorities::kHigh, 2}, - {small::EnumPriorities::kNormal, 2}, - {small::EnumPriorities::kLow, 1}}}}, // overall config with default priorities - - .m_default_processing_function = jobs_processing_function, // default processing function, better use jobs.add_default_processing_function to set it - - .m_groups = {{JobsGroupType::kJobsGroup12, {.m_threads_count = 1}}, // config by jobs group // TODO add sleep_between_requests - {JobsGroupType::kJobsGroup3, {.m_threads_count = 1}}, - {JobsGroupType::kJobsGroupDatabase, {.m_threads_count = 1}}, - {JobsGroupType::kJobsGroupCache, {.m_threads_count = 1}}}, - - .m_types = {{JobsType::kJobsType1, {.m_group = JobsGroupType::kJobsGroup12}}, // - {JobsType::kJobsType2, {.m_group = JobsGroupType::kJobsGroup12}}, // - {JobsType::kJobsType3, {.m_group = JobsGroupType::kJobsGroup3}}, // TODO add timeout for job - {JobsType::kJobsDatabase, {.m_group = JobsGroupType::kJobsGroupDatabase}}, // TODO add timeout for job - {JobsType::kJobsCache, {.m_group = JobsGroupType::kJobsGroupCache}}}}; // TODO add timeout for job + .m_engine = {.m_threads_count = 0, // dont start any thread yet + .m_threads_count_finished = 1, // override how many threads to use for internal processing of finished states + .m_config_prio = {.priorities = {{small::EnumPriorities::kHighest, 2}, + {small::EnumPriorities::kHigh, 2}, + {small::EnumPriorities::kNormal, 2}, + {small::EnumPriorities::kLow, 1}}}}, // overall config with default priorities + + .m_default_function_processing = jobs_function_processing, // default processing function, better use jobs.add_default_function_processing to set it + + .m_groups = {{JobsGroupType::kJobsGroup12, {.m_threads_count = 1}}, // config by jobs group + {JobsGroupType::kJobsGroup3, {.m_threads_count = 1, .m_delay_next_request = std::chrono::milliseconds(30)}}, + {JobsGroupType::kJobsGroupDatabase, {.m_threads_count = 1}}, // these requests will coalesce results for demo purposes + {JobsGroupType::kJobsGroupCache, {.m_threads_count = 0}}}, // no threads !!, these requests are executed outside of jobs engine for demo purposes + + .m_types = {{JobsType::kJobsType1, {.m_group = JobsGroupType::kJobsGroup12}}, + {JobsType::kJobsType2, {.m_group = JobsGroupType::kJobsGroup12}}, + {JobsType::kJobsType3, {.m_group = JobsGroupType::kJobsGroup3, .m_timeout = std::chrono::milliseconds(500)}}, + {JobsType::kJobsDatabase, {.m_group = JobsGroupType::kJobsGroupDatabase}}, + {JobsType::kJobsCache, {.m_group = JobsGroupType::kJobsGroupCache}}}}; // // create jobs engine with the above config // JobsEng jobs(config); - // TODO add config as param so the sleep after can be overridden - jobs.add_default_processing_function([](auto &j /*this jobs engine*/, const auto &jobs_items) { + // create a cache server (with workers to simulate access to it) + // (as an external engine outside the jobs engine for demo purposes) + small::worker_thread cache_server({.threads_count = 1}, [](auto &w /*this*/, const auto &items) { + // process item using the workers lock (not recommended) + + for (auto &i : items) { + std::cout << "thread " << std::this_thread::get_id() + << " processing cache {" << i << "}" << "\n"; + + // TODO mark the jobs id associated as succeeded (for demo purposes to avoid creating other structures) + } + // sleep long enough + small::sleep(500); + }); + + // default processing used for job type 3 with custom delay in between requests + // one request will succeed and one request will timeout for demo purposes + jobs.add_default_function_processing([](auto &j /*this jobs engine*/, const auto &jobs_items, auto &jobs_config) { for (auto &item : jobs_items) { std::cout << "thread " << std::this_thread::get_id() << " DEFAULT processing " << "{" - << " type=" << (int)item->type - << " req.int=" << item->request.first << "," - << " req.str=\"" << item->request.second << "\"" + << " type=" << (int)item->m_type + << " req.int=" << item->m_request.first << "," + << " req.str=\"" << item->m_request.second << "\"" << "}" << " ref count " << item.use_count() << " time " << small::toISOString(small::timeNow()) << "\n"; } - small::sleep(30); + + // set a custom delay (timeout for job3 is 500 ms) + jobs_config.m_delay_next_request = std::chrono::milliseconds(1000); }); - // TODO add config as param so the sleep after can be overridden // add specific function for job1 (calling the function from jobs intead of config allows to pass the engine and extra param) - jobs.add_job_processing_function(JobsType::kJobsType1, [](auto &j /*this jobs engine*/, const auto &jobs_items, auto b /*extra param b*/) { + jobs.add_job_function_processing(JobsType::kJobsType1, [](auto &j /*this jobs engine*/, const auto &jobs_items, auto & /* config */, auto b /*extra param b*/) { for (auto &item : jobs_items) { std::cout << "thread " << std::this_thread::get_id() << " JOB1 processing " << "{" - << " type=" << (int)item->type - << " req.int=" << item->request.first << "," - << " req.str=\"" << item->request.second << "\"" + << " type=" << (int)item->m_type + << " req.int=" << item->m_request.first << "," + << " req.str=\"" << item->m_request.second << "\"" << "}" << " ref count " << item.use_count() << " time " << small::toISOString(small::timeNow()) @@ -123,18 +141,18 @@ namespace examples::jobs_engine { small::sleep(30); }, 5 /*param b*/); // TODO daca as vrea sa folosesc un alt job_server cum modelez asa incat jobul dintr-o parte sa ramana intr-o stare ca si cand ar avea copii si - // TODO sa se face un request in alta parte si ala cand se termina pe finish (sau daca e worker thread in functia de procesare) sa faca set state + // TODO sa se faca un request in alta parte si ala cand se termina pe finish (sau daca e worker thread in functia de procesare) sa faca set state // TODO set state merge daca e doar o dependinta, daca sunt mai multe atunci ar tb o functie custom - childProcessing (desi are sau nu are children - sau cum fac un dummy children - poate cu thread_count 0?) // add specific function for job2 - jobs.add_job_processing_function(JobsType::kJobsType2, [](auto &j /*this jobs engine*/, const auto &jobs_items) { + jobs.add_job_function_processing(JobsType::kJobsType2, [](auto &j /*this jobs engine*/, const auto &jobs_items, auto & /* config */) { for (auto &item : jobs_items) { std::cout << "thread " << std::this_thread::get_id() << " JOB2 processing " << "{" - << " type=" << (int)item->type - << " req.int=" << item->request.first << "," - << " req.str=\"" << item->request.second << "\"" + << " type=" << (int)item->m_type + << " req.int=" << item->m_request.first << "," + << " req.str=\"" << item->m_request.second << "\"" << "}" << " ref count " << item.use_count() << " time " << small::toISOString(small::timeNow()) @@ -158,6 +176,7 @@ namespace examples::jobs_engine { // show coalesce for children database requests std::unordered_map web_requests; + // TODO type3 one request will succeed and one request will timeout for demo purposes // push jobs.queue().push_back(small::EnumPriorities::kNormal, JobsType::kJobsType1, {1, "normal"}, &jobs_id); jobs.queue().push_back(small::EnumPriorities::kHigh, JobsType::kJobsType2, {2, "high"}, &jobs_id); diff --git a/include/jobs_engine.h b/include/jobs_engine.h index 93bd940..53581a6 100644 --- a/include/jobs_engine.h +++ b/include/jobs_engine.h @@ -33,14 +33,14 @@ // // create jobs engine // JobsEng jobs(config); // -// jobs.add_default_processing_function([](auto &j /*this jobs engine*/, const auto &jobs_items) { +// jobs.add_default_function_processing([](auto &j /*this jobs engine*/, const auto &jobs_items) { // for (auto &item : jobs_items) { // ... // } // }); // // // add specific function for job1 -// jobs.add_job_processing_function(JobsType::kJobsType1, [](auto &j /*this jobs engine*/, const auto &jobs_items, auto b /*extra param b*/) { +// jobs.add_job_function_processing(JobsType::kJobsType1, [](auto &j /*this jobs engine*/, const auto &jobs_items, auto b /*extra param b*/) { // for (auto &item : jobs_items) { // ... // } @@ -69,14 +69,16 @@ namespace small { class jobs_engine { public: - using ThisJobsEngine = small::jobs_engine; - using JobsConfig = small::jobs_config; - using JobsItem = small::jobsimpl::jobs_item; - using JobsQueue = small::jobsimpl::jobs_queue; - using JobsID = typename JobsItem::JobsID; - using TimeClock = typename JobsQueue::TimeClock; - using TimeDuration = typename JobsQueue::TimeDuration; - using ProcessingFunction = typename JobsConfig::ProcessingFunction; + using ThisJobsEngine = typename small::jobs_engine; + using JobsConfig = typename small::jobs_config; + using JobsItem = typename small::jobsimpl::jobs_item; + using JobsQueue = typename small::jobsimpl::jobs_queue; + using JobsID = typename JobsItem::JobsID; + using TimeClock = typename JobsQueue::TimeClock; + using TimeDuration = typename JobsQueue::TimeDuration; + using FunctionProcessing = typename JobsConfig::FunctionProcessing; + using FunctionOnChildrenFinished = typename JobsConfig::FunctionOnChildrenFinished; + using FunctionFinished = typename JobsConfig::FunctionFinished; public: // @@ -160,17 +162,42 @@ namespace small { apply_config(); } - // processing function + // override default jobs function template - inline void add_default_processing_function(_Callable processing_function, Args... extra_parameters) + inline void add_default_function_processing(_Callable function_processing, Args... extra_parameters) { - m_config.add_default_processing_function(std::bind(std::forward<_Callable>(processing_function), std::ref(*this), std::placeholders::_1 /*jobs_items*/, std::forward(extra_parameters)...)); + m_config.add_default_function_processing(std::bind(std::forward<_Callable>(function_processing), std::ref(*this), std::placeholders::_1 /*jobs_items*/, std::placeholders::_2 /*config*/, std::forward(extra_parameters)...)); } template - inline void add_job_processing_function(const JobsTypeT &jobs_type, _Callable processing_function, Args... extra_parameters) + inline void add_default_function_on_children_finished(_Callable function_on_children_finished, Args... extra_parameters) { - m_config.add_job_processing_function(jobs_type, std::bind(std::forward<_Callable>(processing_function), std::ref(*this), std::placeholders::_1 /*jobs_items*/, std::forward(extra_parameters)...)); + m_config.add_default_function_on_children_finished(std::bind(std::forward<_Callable>(function_on_children_finished), std::ref(*this), std::placeholders::_1 /*jobs_items*/, std::placeholders::_2 /*config*/, std::forward(extra_parameters)...)); + } + + template + inline void add_default_function_finished(_Callable function_finished, Args... extra_parameters) + { + m_config.add_default_function_finished(std::bind(std::forward<_Callable>(function_finished), std::ref(*this), std::placeholders::_1 /*jobs_items*/, std::placeholders::_2 /*config*/, std::forward(extra_parameters)...)); + } + + // specific jobs functions + template + inline void add_job_function_processing(const JobsTypeT &jobs_type, _Callable function_processing, Args... extra_parameters) + { + m_config.add_job_function_processing(jobs_type, std::bind(std::forward<_Callable>(function_processing), std::ref(*this), std::placeholders::_1 /*jobs_items*/, std::placeholders::_2 /*config*/, std::forward(extra_parameters)...)); + } + + template + inline void add_job_function_on_children_finished(const JobsTypeT &jobs_type, _Callable function_on_children_finished, Args... extra_parameters) + { + m_config.add_job_function_on_children_finished(jobs_type, std::bind(std::forward<_Callable>(function_on_children_finished), std::ref(*this), std::placeholders::_1 /*jobs_items*/, std::forward(extra_parameters)...)); + } + + template + inline void add_job_function_finished(const JobsTypeT &jobs_type, _Callable function_finished, Args... extra_parameters) + { + m_config.add_job_function_finished(jobs_type, std::bind(std::forward<_Callable>(function_finished), std::ref(*this), std::placeholders::_1 /*jobs_items*/, std::forward(extra_parameters)...)); } // @@ -251,11 +278,19 @@ namespace small { } // setup jobs types - m_config.apply_default_processing_function(); - // TODO apply default WaitChildrenFunction - pass the function as parameter using std::bind(this) - // TODO apply default FinishedFunction - pass the function as parameter using std::bind(this) + if (!m_config.m_default_function_on_children_finished) { + m_config.m_default_function_on_children_finished = std::bind(&jobs_engine::jobs_on_children_finished, this, std::placeholders::_1 /*jobs_items*/); + } + if (!m_config.m_default_function_finished) { + m_config.m_default_function_finished = std::bind(&jobs_engine::jobs_finished, this, std::placeholders::_1 /*jobs_items*/); + } + + m_config.apply_default_function_processing(); + m_config.apply_default_function_on_children_finished(); + m_config.apply_default_function_finished(); + for (auto &[jobs_type, jobs_type_config] : m_config.m_types) { - m_queue.add_jobs_type(jobs_type, jobs_type_config.m_group); + m_queue.add_jobs_type(jobs_type, jobs_type_config.m_group, jobs_type_config.m_timeout); } // auto start threads if count > 0 otherwise threads should be manually started @@ -281,6 +316,10 @@ namespace small { int bulk_count = std::max(it_cfg_grp->second.m_bulk_count, 1); + // delay request + typename JobsConfig::ConfigProcessing group_config{}; + group_config.m_delay_next_request = it_cfg_grp->second.m_delay_next_request; + // get items to process auto *q = m_queue.get_group_queue(jobs_group); if (!q) { @@ -301,9 +340,14 @@ namespace small { // get jobs std::vector> jobs_items = m_queue.jobs_get(vec_ids); for (auto &jobs_item : jobs_items) { - elems_by_type[jobs_item->type].reserve(jobs_items.size()); - elems_by_type[jobs_item->type].push_back(jobs_item); - // TODO mark the items as in progress + elems_by_type[jobs_item->m_type].reserve(jobs_items.size()); + + // mark the item as in progress + jobs_item->set_state_inprogress(); + // execute if it is still in progress (may be moved to higher states due to external factors like cancel, timeout, finish early due to other job, etc) + if (jobs_item->is_state_inprogress()) { + elems_by_type[jobs_item->m_type].push_back(jobs_item); + } } } @@ -314,11 +358,26 @@ namespace small { continue; } - // TODO pass the config parameter + // TODO if an item has timeout add it to a time queue with callback that marks that item as timeout + // TODO timeout should be set only if it is not finished/failed/cancelled + // process specific jobs by type - it_cfg_type->second.m_processing_function(jobs); + typename JobsConfig::ConfigProcessing type_config; + it_cfg_type->second.m_function_processing(jobs, type_config); + + // get the max for config + if (!group_config.m_delay_next_request) { + group_config.m_delay_next_request = type_config.m_delay_next_request; + } else { + if (type_config.m_delay_next_request) { + group_config.m_delay_next_request = std::max(group_config.m_delay_next_request, type_config.m_delay_next_request); + } + } // TODO marks the items as either wait for children (if it has children) or finished + // mark the item as in wait for children of finished + // if in callback the state is set to failed, cancelled or timeout setting to finish wont succeed because if less value than those + // jobs_item->set_state(small::EnumJobsState::kInProgress); // TODO put in proper thread for processing children and finished work (1/2 thread(s) for each - better to have a config for it?) // TODO the worker thread is configured for jobgroup, children and finished are not part of those - a solution is to add a pair or internal_group } @@ -328,8 +387,9 @@ namespace small { m_queue.jobs_del(jobs_id); } - // TODO for sleep after requests use worker_thread delay item -> check if has_items should be set properly - // TODO if sleep is set set has_items to true to force the sleep, but also a last time sleep so if there too much time and are no items dont continue + // TODO group_config.m_delay_next_request + // TODO for delay after requests use worker_thread delay item -> check if has_items should be set properly + // TODO if delay is set set has_items to true to force the sleep, but also a last time sleep so if there too much time and are no items dont continue return ret; } @@ -346,6 +406,22 @@ namespace small { m_thread_pool.job_start(m_config.m_types[jobs_type].m_group); } + inline void jobs_finished(const std::vector> &jobs_items) + { + // TODO call the custom function from config if exists + + for (auto &jobs_item : jobs_items) { + m_queue.jobs_del(jobs_item->id); + } + } + + inline void jobs_on_children_finished(const std::vector> &jobs_children) + { + // TODO update parent state and progress + // for (auto &jobs_child : jobs_children) { + // } + } + private: // // members