Skip to content

Commit

Permalink
Add multiple features for jobs engine like parent child dependencies,…
Browse files Browse the repository at this point in the history
… throttling with sleep between requests, timeout for processing
  • Loading branch information
herrcristi committed Jan 6, 2025
1 parent e024856 commit 4fa7473
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 16 deletions.
104 changes: 88 additions & 16 deletions examples/examples_jobs_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,36 @@ namespace examples::jobs_engine {
{
std::cout << "Jobs Engine example 1\n";

//
// create a complex example for each type 1,2,3 (like web requests)
// - priorities
// - dependencies calls (parent-child relationship)
// (with children for calls that take long time like database access or less time like cache server)
// - coalesce calls (multiple (later) calls resolved when first one is called)
// - timeout
// - throttle (call with some wait-sleep interval in between)
//
// type1 will create 2 children and will be finished when at least 1 is finished
// (this will demonstrate OR and custom children function)
// type2 will create 2 children and will be finished whenn ALL children are finished
// (this will demonstrate the default case AND and default children function)
// type3 will have no children and one call will timeout
//
enum class JobsType
{
kJobsType1,
kJobsType2
kJobsType2,
kJobsType3,
kJobsDatabase,
kJobsCache,
};

enum class JobsGroupType
{
kJobsGroup1
kJobsGroup12,
kJobsGroup3,
kJobsGroupDatabase,
kJobsGroupCache,
};

using Request = std::pair<int, std::string>;
Expand All @@ -34,23 +56,35 @@ namespace examples::jobs_engine {
std::cout << "this function is defined without the engine params, called for " << (int)items[0]->type << "\n";
};

//
// config object for priorities, groups, types, threads, timeouts, sleeps, etc
//
JobsEng::JobsConfig config{
.m_engine = {.m_threads_count = 0 /*dont start any thread yet*/,
.m_config_prio = {.priorities = {{small::EnumPriorities::kHighest, 2},
{small::EnumPriorities::kHigh, 2},
{small::EnumPriorities::kNormal, 2},
{small::EnumPriorities::kLow, 1}}}}, // overall config with default priorities
.m_default_processing_function = jobs_processing_function, // default processing function, better use jobs.add_default_processing_function to set it
.m_groups = {
{JobsGroupType::kJobsGroup1, {.m_threads_count = 1}}}, // config by jobs group
.m_types = {
{JobsType::kJobsType1, {.m_group = JobsGroupType::kJobsGroup1}},
{JobsType::kJobsType2, {.m_group = JobsGroupType::kJobsGroup1}},
}};

// create jobs engine
.m_engine = {.m_threads_count = 0 /*dont start any thread yet*/, // TODO add thread count for wait_for_children processing and finished, default 1/2 here use 1 due to coalesce
.m_config_prio = {.priorities = {{small::EnumPriorities::kHighest, 2},
{small::EnumPriorities::kHigh, 2},
{small::EnumPriorities::kNormal, 2},
{small::EnumPriorities::kLow, 1}}}}, // overall config with default priorities

.m_default_processing_function = jobs_processing_function, // default processing function, better use jobs.add_default_processing_function to set it

.m_groups = {{JobsGroupType::kJobsGroup12, {.m_threads_count = 1}}, // config by jobs group // TODO add sleep_between_requests
{JobsGroupType::kJobsGroup3, {.m_threads_count = 1}},
{JobsGroupType::kJobsGroupDatabase, {.m_threads_count = 1}},
{JobsGroupType::kJobsGroupCache, {.m_threads_count = 1}}},

.m_types = {{JobsType::kJobsType1, {.m_group = JobsGroupType::kJobsGroup12}}, //
{JobsType::kJobsType2, {.m_group = JobsGroupType::kJobsGroup12}}, //
{JobsType::kJobsType3, {.m_group = JobsGroupType::kJobsGroup3}}, // TODO add timeout for job
{JobsType::kJobsDatabase, {.m_group = JobsGroupType::kJobsGroupDatabase}}, // TODO add timeout for job
{JobsType::kJobsCache, {.m_group = JobsGroupType::kJobsGroupCache}}}}; // TODO add timeout for job

//
// create jobs engine with the above config
//
JobsEng jobs(config);

// TODO add config as param so the sleep after can be overridden
jobs.add_default_processing_function([](auto &j /*this jobs engine*/, const auto &jobs_items) {
for (auto &item : jobs_items) {
std::cout << "thread " << std::this_thread::get_id()
Expand All @@ -67,6 +101,7 @@ namespace examples::jobs_engine {
small::sleep(30);
});

// TODO add config as param so the sleep after can be overridden
// add specific function for job1 (calling the function from jobs intead of config allows to pass the engine and extra param)
jobs.add_job_processing_function(JobsType::kJobsType1, [](auto &j /*this jobs engine*/, const auto &jobs_items, auto b /*extra param b*/) {
for (auto &item : jobs_items) {
Expand All @@ -80,12 +115,49 @@ namespace examples::jobs_engine {
<< " ref count " << item.use_count()
<< " time " << small::toISOString(small::timeNow())
<< "\n";
// TODO add 2 more children jobs for current one for database and server cache
// TODO save somewhere in an unordered_map the database requests - the problem is that jobid is received after push_jobs
// TODO save type1 requests into a promises unordered_map
// TODO for type 2 only database (add another processing function)
}
small::sleep(30); }, 5 /*param b*/);

// TODO daca as vrea sa folosesc un alt job_server cum modelez asa incat jobul dintr-o parte sa ramana intr-o stare ca si cand ar avea copii si
// TODO sa se face un request in alta parte si ala cand se termina pe finish (sau daca e worker thread in functia de procesare) sa faca set state
// TODO set state merge daca e doar o dependinta, daca sunt mai multe atunci ar tb o functie custom - childProcessing (desi are sau nu are children - sau cum fac un dummy children - poate cu thread_count 0?)

// add specific function for job2
jobs.add_job_processing_function(JobsType::kJobsType2, [](auto &j /*this jobs engine*/, const auto &jobs_items) {
for (auto &item : jobs_items) {
std::cout << "thread " << std::this_thread::get_id()
<< " JOB2 processing "
<< "{"
<< " type=" << (int)item->type
<< " req.int=" << item->request.first << ","
<< " req.str=\"" << item->request.second << "\""
<< "}"
<< " ref count " << item.use_count()
<< " time " << small::toISOString(small::timeNow())
<< "\n";
// TODO for type 2 only database children (add another processing function)
}
// TODO config to wait after request (even if it is not specified in the global config - so custom throttle)
small::sleep(30); });

// TODO add function for database where demonstrate coalesce of 3 items (sleep 1000)
// TODO add function for cache server - no coalesce for demo purposes (sleep 500) so 3rd parent items is finished due to database and not cache server

// TODO add function for custom wait children and demonstrate set progress to another item
// TODO add function for custom finished (for type1 to set the promises completed)

JobsEng::JobsID jobs_id{};
std::vector<JobsEng::JobsID> jobs_ids;

// TODO create a promises/futures unordered_map for type1 requests and wait later

// show coalesce for children database requests
std::unordered_map<unsigned int, JobsEng::JobsID> web_requests;

// push
jobs.queue().push_back(small::EnumPriorities::kNormal, JobsType::kJobsType1, {1, "normal"}, &jobs_id);
jobs.queue().push_back(small::EnumPriorities::kHigh, JobsType::kJobsType2, {2, "high"}, &jobs_id);
Expand Down
1 change: 1 addition & 0 deletions include/impl/jobs_item_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ namespace small::jobsimpl {
std::atomic<int> progress{}; // progress 0-100 for state kInProgress
JobsRequestT request{}; // request needed for processing function
JobsResponseT response{}; // where the results are saved (for the finished callback if exists)
// TODO add parents and children ids

explicit jobs_item() = default;
explicit jobs_item(const JobsID &jobs_id, const JobsTypeT &jobs_type, const JobsRequestT &jobs_request)
Expand Down
9 changes: 9 additions & 0 deletions include/jobs_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ namespace small {
{
using JobsItem = typename small::jobsimpl::jobs_item<JobsTypeT, JobsRequestT, JobsResponseT>;
using ProcessingFunction = std::function<void(const std::vector<std::shared_ptr<JobsItem>> &)>;
// TODO add WaitChildrenFunction
// TODO add FinishedFunction

// config for the entire jobs engine
struct ConfigJobsEngine
Expand All @@ -32,6 +34,8 @@ namespace small {
JobsGroupT m_group{}; // job type group (multiple job types can be configured to same group)
bool m_has_processing_function{false}; // use default processing function
ProcessingFunction m_processing_function{}; // processing Function
// TODO add WaitChildrenFunction
// TODO add FinishedFunction
};

// config for the job group (where job types can be grouped)
Expand Down Expand Up @@ -62,6 +66,8 @@ namespace small {
it_f->second.m_has_processing_function = true;
it_f->second.m_processing_function = processing_function;
}
// TODO add job WaitChildrenFunction
// TODO add job FinishedFunction

inline void apply_default_processing_function()
{
Expand All @@ -71,5 +77,8 @@ namespace small {
}
}
}

// TODO apply WaitChildrenFunction - the function must be passed as parameter from engine
// TODO apply FinishedFunction - the function must be passed as parameter from engine
};
} // namespace small
14 changes: 14 additions & 0 deletions include/jobs_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,8 @@ namespace small {

// setup jobs types
m_config.apply_default_processing_function();
// TODO apply default WaitChildrenFunction - pass the function as parameter using std::bind(this)
// TODO apply default FinishedFunction - pass the function as parameter using std::bind(this)
for (auto &[jobs_type, jobs_type_config] : m_config.m_types) {
m_queue.add_jobs_type(jobs_type, jobs_type_config.m_group);
}
Expand Down Expand Up @@ -301,6 +303,7 @@ namespace small {
for (auto &jobs_item : jobs_items) {
elems_by_type[jobs_item->type].reserve(jobs_items.size());
elems_by_type[jobs_item->type].push_back(jobs_item);
// TODO mark the items as in progress
}
}

Expand All @@ -311,17 +314,28 @@ namespace small {
continue;
}

// TODO pass the config parameter
// process specific jobs by type
it_cfg_type->second.m_processing_function(jobs);

// TODO marks the items as either wait for children (if it has children) or finished
// TODO put in proper thread for processing children and finished work (1/2 thread(s) for each - better to have a config for it?)
// TODO the worker thread is configured for jobgroup, children and finished are not part of those - a solution is to add a pair or internal_group
}

// TODO move this delete on the finished thread
for (auto &jobs_id : vec_ids) {
m_queue.jobs_del(jobs_id);
}

// TODO for sleep after requests use worker_thread delay item -> check if has_items should be set properly
// TODO if sleep is set set has_items to true to force the sleep, but also a last time sleep so if there too much time and are no items dont continue

return ret;
}

// TODO external set state for a job moves it to proper wait for children or finished

//
// inner function for activate the jobs from queue
//
Expand Down

0 comments on commit 4fa7473

Please sign in to comment.