Skip to content

Commit

Permalink
Add multiple features for jobs engine like parent child dependencies,…
Browse files Browse the repository at this point in the history
… throttling with sleep between requests, timeout for processing
  • Loading branch information
herrcristi committed Jan 7, 2025
1 parent 164c3d1 commit 4c438dc
Show file tree
Hide file tree
Showing 2 changed files with 157 additions and 62 deletions.
91 changes: 55 additions & 36 deletions examples/examples_jobs_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,66 +51,84 @@ namespace examples::jobs_engine {
using Request = std::pair<int, std::string>;
using JobsEng = small::jobs_engine<JobsType, Request, int /*response*/, JobsGroupType>;

auto jobs_processing_function = [](const std::vector<std::shared_ptr<JobsEng::JobsItem>> &items) {
auto jobs_function_processing = [](const std::vector<std::shared_ptr<JobsEng::JobsItem>> &items, JobsEng::JobsConfig::ConfigProcessing & /* config */) {
// this functions is defined without the engine params (it is here just for the example)
std::cout << "this function is defined without the engine params, called for " << (int)items[0]->type << "\n";
std::cout << "this function is defined without the engine params, called for " << (int)items[0]->m_type << "\n";
};

//
// config object for priorities, groups, types, threads, timeouts, sleeps, etc
//
JobsEng::JobsConfig config{
.m_engine = {.m_threads_count = 0 /*dont start any thread yet*/, // TODO add thread count for wait_for_children processing and finished, default 1/2 here use 1 due to coalesce
.m_config_prio = {.priorities = {{small::EnumPriorities::kHighest, 2},
{small::EnumPriorities::kHigh, 2},
{small::EnumPriorities::kNormal, 2},
{small::EnumPriorities::kLow, 1}}}}, // overall config with default priorities

.m_default_processing_function = jobs_processing_function, // default processing function, better use jobs.add_default_processing_function to set it

.m_groups = {{JobsGroupType::kJobsGroup12, {.m_threads_count = 1}}, // config by jobs group // TODO add sleep_between_requests
{JobsGroupType::kJobsGroup3, {.m_threads_count = 1}},
{JobsGroupType::kJobsGroupDatabase, {.m_threads_count = 1}},
{JobsGroupType::kJobsGroupCache, {.m_threads_count = 1}}},

.m_types = {{JobsType::kJobsType1, {.m_group = JobsGroupType::kJobsGroup12}}, //
{JobsType::kJobsType2, {.m_group = JobsGroupType::kJobsGroup12}}, //
{JobsType::kJobsType3, {.m_group = JobsGroupType::kJobsGroup3}}, // TODO add timeout for job
{JobsType::kJobsDatabase, {.m_group = JobsGroupType::kJobsGroupDatabase}}, // TODO add timeout for job
{JobsType::kJobsCache, {.m_group = JobsGroupType::kJobsGroupCache}}}}; // TODO add timeout for job
.m_engine = {.m_threads_count = 0, // dont start any thread yet
.m_threads_count_finished = 1, // override how many threads to use for internal processing of finished states
.m_config_prio = {.priorities = {{small::EnumPriorities::kHighest, 2},
{small::EnumPriorities::kHigh, 2},
{small::EnumPriorities::kNormal, 2},
{small::EnumPriorities::kLow, 1}}}}, // overall config with default priorities

.m_default_function_processing = jobs_function_processing, // default processing function, better use jobs.add_default_function_processing to set it

.m_groups = {{JobsGroupType::kJobsGroup12, {.m_threads_count = 1}}, // config by jobs group
{JobsGroupType::kJobsGroup3, {.m_threads_count = 1, .m_delay_next_request = std::chrono::milliseconds(30)}},
{JobsGroupType::kJobsGroupDatabase, {.m_threads_count = 1}}, // these requests will coalesce results for demo purposes
{JobsGroupType::kJobsGroupCache, {.m_threads_count = 0}}}, // no threads !!, these requests are executed outside of jobs engine for demo purposes

.m_types = {{JobsType::kJobsType1, {.m_group = JobsGroupType::kJobsGroup12}},
{JobsType::kJobsType2, {.m_group = JobsGroupType::kJobsGroup12}},
{JobsType::kJobsType3, {.m_group = JobsGroupType::kJobsGroup3, .m_timeout = std::chrono::milliseconds(500)}},
{JobsType::kJobsDatabase, {.m_group = JobsGroupType::kJobsGroupDatabase}},
{JobsType::kJobsCache, {.m_group = JobsGroupType::kJobsGroupCache}}}};

//
// create jobs engine with the above config
//
JobsEng jobs(config);

// TODO add config as param so the sleep after can be overridden
jobs.add_default_processing_function([](auto &j /*this jobs engine*/, const auto &jobs_items) {
// create a cache server (with workers to simulate access to it)
// (as an external engine outside the jobs engine for demo purposes)
small::worker_thread<JobsEng::JobsID> cache_server({.threads_count = 1}, [](auto &w /*this*/, const auto &items) {
// process item using the workers lock (not recommended)

for (auto &i : items) {
std::cout << "thread " << std::this_thread::get_id()
<< " processing cache {" << i << "}" << "\n";

// TODO mark the jobs id associated as succeeded (for demo purposes to avoid creating other structures)
}
// sleep long enough
small::sleep(500);
});

// default processing used for job type 3 with custom delay in between requests
// one request will succeed and one request will timeout for demo purposes
jobs.add_default_function_processing([](auto &j /*this jobs engine*/, const auto &jobs_items, auto &jobs_config) {
for (auto &item : jobs_items) {
std::cout << "thread " << std::this_thread::get_id()
<< " DEFAULT processing "
<< "{"
<< " type=" << (int)item->type
<< " req.int=" << item->request.first << ","
<< " req.str=\"" << item->request.second << "\""
<< " type=" << (int)item->m_type
<< " req.int=" << item->m_request.first << ","
<< " req.str=\"" << item->m_request.second << "\""
<< "}"
<< " ref count " << item.use_count()
<< " time " << small::toISOString(small::timeNow())
<< "\n";
}
small::sleep(30);

// set a custom delay (timeout for job3 is 500 ms)
jobs_config.m_delay_next_request = std::chrono::milliseconds(1000);
});

// TODO add config as param so the sleep after can be overridden
// add specific function for job1 (calling the function from jobs intead of config allows to pass the engine and extra param)
jobs.add_job_processing_function(JobsType::kJobsType1, [](auto &j /*this jobs engine*/, const auto &jobs_items, auto b /*extra param b*/) {
jobs.add_job_function_processing(JobsType::kJobsType1, [](auto &j /*this jobs engine*/, const auto &jobs_items, auto & /* config */, auto b /*extra param b*/) {
for (auto &item : jobs_items) {
std::cout << "thread " << std::this_thread::get_id()
<< " JOB1 processing "
<< "{"
<< " type=" << (int)item->type
<< " req.int=" << item->request.first << ","
<< " req.str=\"" << item->request.second << "\""
<< " type=" << (int)item->m_type
<< " req.int=" << item->m_request.first << ","
<< " req.str=\"" << item->m_request.second << "\""
<< "}"
<< " ref count " << item.use_count()
<< " time " << small::toISOString(small::timeNow())
Expand All @@ -123,18 +141,18 @@ namespace examples::jobs_engine {
small::sleep(30); }, 5 /*param b*/);

// TODO daca as vrea sa folosesc un alt job_server cum modelez asa incat jobul dintr-o parte sa ramana intr-o stare ca si cand ar avea copii si
// TODO sa se face un request in alta parte si ala cand se termina pe finish (sau daca e worker thread in functia de procesare) sa faca set state
// TODO sa se faca un request in alta parte si ala cand se termina pe finish (sau daca e worker thread in functia de procesare) sa faca set state
// TODO set state merge daca e doar o dependinta, daca sunt mai multe atunci ar tb o functie custom - childProcessing (desi are sau nu are children - sau cum fac un dummy children - poate cu thread_count 0?)

// add specific function for job2
jobs.add_job_processing_function(JobsType::kJobsType2, [](auto &j /*this jobs engine*/, const auto &jobs_items) {
jobs.add_job_function_processing(JobsType::kJobsType2, [](auto &j /*this jobs engine*/, const auto &jobs_items, auto & /* config */) {
for (auto &item : jobs_items) {
std::cout << "thread " << std::this_thread::get_id()
<< " JOB2 processing "
<< "{"
<< " type=" << (int)item->type
<< " req.int=" << item->request.first << ","
<< " req.str=\"" << item->request.second << "\""
<< " type=" << (int)item->m_type
<< " req.int=" << item->m_request.first << ","
<< " req.str=\"" << item->m_request.second << "\""
<< "}"
<< " ref count " << item.use_count()
<< " time " << small::toISOString(small::timeNow())
Expand All @@ -158,6 +176,7 @@ namespace examples::jobs_engine {
// show coalesce for children database requests
std::unordered_map<unsigned int, JobsEng::JobsID> web_requests;

// TODO type3 one request will succeed and one request will timeout for demo purposes
// push
jobs.queue().push_back(small::EnumPriorities::kNormal, JobsType::kJobsType1, {1, "normal"}, &jobs_id);
jobs.queue().push_back(small::EnumPriorities::kHigh, JobsType::kJobsType2, {2, "high"}, &jobs_id);
Expand Down
Loading

0 comments on commit 4c438dc

Please sign in to comment.