diff --git a/examples/examples_jobs_engine.h b/examples/examples_jobs_engine.h index faa97cb..5620a21 100644 --- a/examples/examples_jobs_engine.h +++ b/examples/examples_jobs_engine.h @@ -16,14 +16,36 @@ namespace examples::jobs_engine { { std::cout << "Jobs Engine example 1\n"; + // + // create a complex example for each type 1,2,3 (like web requests) + // - priorities + // - dependencies calls (parent-child relationship) + // (with children for calls that take long time like database access or less time like cache server) + // - coalesce calls (multiple (later) calls resolved when first one is called) + // - timeout + // - throttle (call with some wait-sleep interval in between) + // + // type1 will create 2 children and will be finished when at least 1 is finished + // (this will demonstrate OR and custom children function) + // type2 will create 2 children and will be finished whenn ALL children are finished + // (this will demonstrate the default case AND and default children function) + // type3 will have no children and one call will timeout + // enum class JobsType { kJobsType1, - kJobsType2 + kJobsType2, + kJobsType3, + kJobsDatabase, + kJobsCache, }; + enum class JobsGroupType { - kJobsGroup1 + kJobsGroup12, + kJobsGroup3, + kJobsGroupDatabase, + kJobsGroupCache, }; using Request = std::pair; @@ -34,23 +56,35 @@ namespace examples::jobs_engine { std::cout << "this function is defined without the engine params, called for " << (int)items[0]->type << "\n"; }; + // + // config object for priorities, groups, types, threads, timeouts, sleeps, etc + // JobsEng::JobsConfig config{ - .m_engine = {.m_threads_count = 0 /*dont start any thread yet*/, - .m_config_prio = {.priorities = {{small::EnumPriorities::kHighest, 2}, - {small::EnumPriorities::kHigh, 2}, - {small::EnumPriorities::kNormal, 2}, - {small::EnumPriorities::kLow, 1}}}}, // overall config with default priorities - .m_default_processing_function = jobs_processing_function, // default processing function, better use jobs.add_default_processing_function to set it - .m_groups = { - {JobsGroupType::kJobsGroup1, {.m_threads_count = 1}}}, // config by jobs group - .m_types = { - {JobsType::kJobsType1, {.m_group = JobsGroupType::kJobsGroup1}}, - {JobsType::kJobsType2, {.m_group = JobsGroupType::kJobsGroup1}}, - }}; - - // create jobs engine + .m_engine = {.m_threads_count = 0 /*dont start any thread yet*/, // TODO add thread count for wait_for_children processing and finished, default 1/2 here use 1 due to coalesce + .m_config_prio = {.priorities = {{small::EnumPriorities::kHighest, 2}, + {small::EnumPriorities::kHigh, 2}, + {small::EnumPriorities::kNormal, 2}, + {small::EnumPriorities::kLow, 1}}}}, // overall config with default priorities + + .m_default_processing_function = jobs_processing_function, // default processing function, better use jobs.add_default_processing_function to set it + + .m_groups = {{JobsGroupType::kJobsGroup12, {.m_threads_count = 1}}, // config by jobs group // TODO add sleep_between_requests + {JobsGroupType::kJobsGroup3, {.m_threads_count = 1}}, + {JobsGroupType::kJobsGroupDatabase, {.m_threads_count = 1}}, + {JobsGroupType::kJobsGroupCache, {.m_threads_count = 1}}}, + + .m_types = {{JobsType::kJobsType1, {.m_group = JobsGroupType::kJobsGroup12}}, // + {JobsType::kJobsType2, {.m_group = JobsGroupType::kJobsGroup12}}, // + {JobsType::kJobsType3, {.m_group = JobsGroupType::kJobsGroup3}}, // TODO add timeout for job + {JobsType::kJobsDatabase, {.m_group = JobsGroupType::kJobsGroupDatabase}}, // TODO add timeout for job + {JobsType::kJobsCache, {.m_group = JobsGroupType::kJobsGroupCache}}}}; // TODO add timeout for job + + // + // create jobs engine with the above config + // JobsEng jobs(config); + // TODO add config as param so the sleep after can be overridden jobs.add_default_processing_function([](auto &j /*this jobs engine*/, const auto &jobs_items) { for (auto &item : jobs_items) { std::cout << "thread " << std::this_thread::get_id() @@ -67,6 +101,7 @@ namespace examples::jobs_engine { small::sleep(30); }); + // TODO add config as param so the sleep after can be overridden // add specific function for job1 (calling the function from jobs intead of config allows to pass the engine and extra param) jobs.add_job_processing_function(JobsType::kJobsType1, [](auto &j /*this jobs engine*/, const auto &jobs_items, auto b /*extra param b*/) { for (auto &item : jobs_items) { @@ -80,12 +115,49 @@ namespace examples::jobs_engine { << " ref count " << item.use_count() << " time " << small::toISOString(small::timeNow()) << "\n"; + // TODO add 2 more children jobs for current one for database and server cache + // TODO save somewhere in an unordered_map the database requests - the problem is that jobid is received after push_jobs + // TODO save type1 requests into a promises unordered_map + // TODO for type 2 only database (add another processing function) } small::sleep(30); }, 5 /*param b*/); + // TODO daca as vrea sa folosesc un alt job_server cum modelez asa incat jobul dintr-o parte sa ramana intr-o stare ca si cand ar avea copii si + // TODO sa se face un request in alta parte si ala cand se termina pe finish (sau daca e worker thread in functia de procesare) sa faca set state + // TODO set state merge daca e doar o dependinta, daca sunt mai multe atunci ar tb o functie custom - childProcessing (desi are sau nu are children - sau cum fac un dummy children - poate cu thread_count 0?) + + // add specific function for job2 + jobs.add_job_processing_function(JobsType::kJobsType2, [](auto &j /*this jobs engine*/, const auto &jobs_items) { + for (auto &item : jobs_items) { + std::cout << "thread " << std::this_thread::get_id() + << " JOB2 processing " + << "{" + << " type=" << (int)item->type + << " req.int=" << item->request.first << "," + << " req.str=\"" << item->request.second << "\"" + << "}" + << " ref count " << item.use_count() + << " time " << small::toISOString(small::timeNow()) + << "\n"; + // TODO for type 2 only database children (add another processing function) + } + // TODO config to wait after request (even if it is not specified in the global config - so custom throttle) + small::sleep(30); }); + + // TODO add function for database where demonstrate coalesce of 3 items (sleep 1000) + // TODO add function for cache server - no coalesce for demo purposes (sleep 500) so 3rd parent items is finished due to database and not cache server + + // TODO add function for custom wait children and demonstrate set progress to another item + // TODO add function for custom finished (for type1 to set the promises completed) + JobsEng::JobsID jobs_id{}; std::vector jobs_ids; + // TODO create a promises/futures unordered_map for type1 requests and wait later + + // show coalesce for children database requests + std::unordered_map web_requests; + // push jobs.queue().push_back(small::EnumPriorities::kNormal, JobsType::kJobsType1, {1, "normal"}, &jobs_id); jobs.queue().push_back(small::EnumPriorities::kHigh, JobsType::kJobsType2, {2, "high"}, &jobs_id); diff --git a/include/impl/jobs_item_impl.h b/include/impl/jobs_item_impl.h index 684d5e1..01e6bed 100644 --- a/include/impl/jobs_item_impl.h +++ b/include/impl/jobs_item_impl.h @@ -33,6 +33,7 @@ namespace small::jobsimpl { std::atomic progress{}; // progress 0-100 for state kInProgress JobsRequestT request{}; // request needed for processing function JobsResponseT response{}; // where the results are saved (for the finished callback if exists) + // TODO add parents and children ids explicit jobs_item() = default; explicit jobs_item(const JobsID &jobs_id, const JobsTypeT &jobs_type, const JobsRequestT &jobs_request) diff --git a/include/jobs_config.h b/include/jobs_config.h index 81d95dc..6c6a60f 100644 --- a/include/jobs_config.h +++ b/include/jobs_config.h @@ -18,6 +18,8 @@ namespace small { { using JobsItem = typename small::jobsimpl::jobs_item; using ProcessingFunction = std::function> &)>; + // TODO add WaitChildrenFunction + // TODO add FinishedFunction // config for the entire jobs engine struct ConfigJobsEngine @@ -32,6 +34,8 @@ namespace small { JobsGroupT m_group{}; // job type group (multiple job types can be configured to same group) bool m_has_processing_function{false}; // use default processing function ProcessingFunction m_processing_function{}; // processing Function + // TODO add WaitChildrenFunction + // TODO add FinishedFunction }; // config for the job group (where job types can be grouped) @@ -62,6 +66,8 @@ namespace small { it_f->second.m_has_processing_function = true; it_f->second.m_processing_function = processing_function; } + // TODO add job WaitChildrenFunction + // TODO add job FinishedFunction inline void apply_default_processing_function() { @@ -71,5 +77,8 @@ namespace small { } } } + + // TODO apply WaitChildrenFunction - the function must be passed as parameter from engine + // TODO apply FinishedFunction - the function must be passed as parameter from engine }; } // namespace small diff --git a/include/jobs_engine.h b/include/jobs_engine.h index 9fb17b0..93bd940 100644 --- a/include/jobs_engine.h +++ b/include/jobs_engine.h @@ -252,6 +252,8 @@ namespace small { // setup jobs types m_config.apply_default_processing_function(); + // TODO apply default WaitChildrenFunction - pass the function as parameter using std::bind(this) + // TODO apply default FinishedFunction - pass the function as parameter using std::bind(this) for (auto &[jobs_type, jobs_type_config] : m_config.m_types) { m_queue.add_jobs_type(jobs_type, jobs_type_config.m_group); } @@ -301,6 +303,7 @@ namespace small { for (auto &jobs_item : jobs_items) { elems_by_type[jobs_item->type].reserve(jobs_items.size()); elems_by_type[jobs_item->type].push_back(jobs_item); + // TODO mark the items as in progress } } @@ -311,17 +314,28 @@ namespace small { continue; } + // TODO pass the config parameter // process specific jobs by type it_cfg_type->second.m_processing_function(jobs); + + // TODO marks the items as either wait for children (if it has children) or finished + // TODO put in proper thread for processing children and finished work (1/2 thread(s) for each - better to have a config for it?) + // TODO the worker thread is configured for jobgroup, children and finished are not part of those - a solution is to add a pair or internal_group } + // TODO move this delete on the finished thread for (auto &jobs_id : vec_ids) { m_queue.jobs_del(jobs_id); } + // TODO for sleep after requests use worker_thread delay item -> check if has_items should be set properly + // TODO if sleep is set set has_items to true to force the sleep, but also a last time sleep so if there too much time and are no items dont continue + return ret; } + // TODO external set state for a job moves it to proper wait for children or finished + // // inner function for activate the jobs from queue //