Skip to content

Commit

Permalink
Refactor jobs engine (#27)
Browse files Browse the repository at this point in the history
* Refactor jobs engine
  • Loading branch information
herrcristi authored Dec 28, 2024
1 parent 60fa286 commit 9d0b439
Show file tree
Hide file tree
Showing 9 changed files with 545 additions and 1,334 deletions.
3 changes: 3 additions & 0 deletions .clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ AlignConsecutiveDeclarations: true
AlignConsecutiveAssignments:
Enabled: true
AlignCompound: true
AlignFunctionDeclarations: true
AlignConsecutiveBitFields: true
AlignConsecutiveMacros: true
BraceWrapping:
AfterEnum: true
AfterStruct: true
Expand Down
156 changes: 46 additions & 110 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ This can be used in following ways:
- lock_queue (thread safe queue with waiting mechanism to be used in concurrent environment)
- time_queue (thread safe queue for delay requests)
- prio_queue (thread safe queue for requests with priority like high, normal, low, etc)
- group_queue (thread safe queue to group items that have a type, priority and info, more types will be grouped to use same queue)

- worker_thread (creates workers on separate threads that do task when requested, based on lock_queue and time_queue)

Expand Down Expand Up @@ -285,78 +284,6 @@ q.signal_exit_force(); // q.signal_exit_when_done();

#

### group_queue

A queue for grouping items that have a type, priority and info (where more types will be grouped to use same queue)

Works with any user priorities and by default small::EnumPriorities are used (kHighest, kHigh, kNormal, kLow, kLowest)

The following functions are available

For container

`size, empty, clear, reset`

Add type elements with priority into queue (the queue must be setup initially to associate the group for a type)

`add_type_group`

`push_back, emplace_back`

For events or locking

`lock, unlock, try_lock`

Wait for items in the group queue

`wait_pop_front, wait_pop_front_for, wait_pop_front_until`

Wait for queue to become empty

`wait`, `wait_for`, `wait_until`

Signal exit when we no longer want to use the queue

`signal_exit_force, is_exit_force` // exit immediatly ignoring what is left in the queue

`signal_exit_when_done, is_exit_when_done` // exit when queue is empty, after this flag is set no more items can be pushed in the queue

Use it like this

```
enum Type {
kType1
};
enum GroupType {
kGroup1
};
small::group_queue<Type, int, GroupType> q;
q.add_type_group( Type::kType1, GroupType::kGroup1 ); // set the group for the type
...
q.push_back( small::EnumPriorities::kNormal, Type::kType1, 1 );
...
// on some thread
std::pair<Type, int> e{};
auto ret = q.wait_pop_front( GroupType::kGroup1, &e );
// or wait_pop_front_for( std::chrono::minutes( 1 ), GroupType::kGroup1, &e );
// ret can be small::EnumLock::kExit, small::EnumLock::kTimeout or ret == small::EnumLock::kElement
if ( ret == small::EnumLock::kElement )
{
// do something with e
...
}
...
// on main thread no more processing (aborting work)
q.signal_exit_force(); // q.signal_exit_when_done()
...
// make sure that all calls to wait_* are finished before calling destructor (like it is done in worker_thread)
```

#

### worker_thread

A class that creates several threads for producer/consumer
Expand Down Expand Up @@ -448,19 +375,19 @@ workers.signal_exit_force(); // workers.signal_exit_when_done();
A class that process different jobs type using the same thread pool

Every job is defined by type, group, priority and request.
Multiple job type can be grouped together under one group, and if 1 thread will serve that group all that job type requests will actually behave like serialized.
Multiple job type can be grouped together under one group (if 1 thread will serve that group all that job type requests will actually behave like serialized.)

Every type has a config associated specifying how many threads to use from the pool, how many for bulk processing, etc
Every group has a config associated specifying how many threads to use from the pool, how many for bulk processing, etc

The following functions are available

For data

`size, empty, clear`

`push_back, emplace_back`
`push_back`

`push_back_delay_for`, `push_back_delay_until`, `emplace_back_delay_for`, `emplace_back_delay_until`
`push_back_delay_for, push_back_delay_until`

To use it as a locker

Expand All @@ -475,52 +402,61 @@ Signal exit when we no longer want to use it,
Use it like this

```
using qc = std::pair<int, std::string>;
...
enum JobType
enum class JobsType
{
job1,
job2
kJobsType1,
kJobsType2
};
enum class JobsGroupType
{
kJobsGroup1
};
...
small::jobs_engine<JobType, qc> jobs(
{.threads_count = 0 /*dont start any thread yet*/}, // global config
{.threads_count = 1, .bulk_count = 1}, // default job group config
{.group = JobType::job1}, // default job type config
[](auto &j /*this*/, const auto job_type, const auto &items) { // default processing function
...
for (auto &[i, s] : items) {
...
using JobsRequest = std::pair<int, std::string>;
using JobsResponse = int;
using JobsEng = small::jobs_engine<JobsType, JobsRequest, JobsResponse, JobsGroupType>;
...
JobsEng jobs(
{.threads_count = 0 /*dont start any thread yet*/}, // overall config with default priorities
{.threads_count = 1, .bulk_count = 1}, // default jobs group config
{.group = JobsGroupType::kJobsGroup1}, // default jobs type config
[](auto &j /*this*/, const auto &items) {
for (auto &item : items) {
... // item->
}
...
});
...
jobs.add_job_group(JobType::job1); // will use default group config
jobs.add_jobs_group(JobsGroupType::kJobsGroup1, {.threads_count = 1});
...
// add specific function for job1
jobs.add_job_type(JobType::job1, {.group = JobType::job1}, [](auto &j /*this*/, const auto job_type, const auto &items, auto b /*extra param b*/) {
...
for(auto &[i, s]:items){
...
jobs.add_jobs_type(JobsType::kJobsType1, {.group = JobsGroupType::kJobsGroup1}, [](auto &j /*this*/, const auto &items, auto b /*extra param b*/) {
for (auto &item : items) {
... // item->
}
...
}, 5 /*param b*/);
...
// use default config and default processing function for job2
// add job2 with default config and default processing function
jobs.add_job_type(JobType::job2);
// manually start thread pool with 3 threads
jobs.start_threads(3);
}, 5 /*extra param b*/);
// use default config and default function for job2
jobs.add_jobs_type(JobsType::kJobsType2);
...
JobsEng::JobsID jobs_id{};
std::vector<JobsEng::JobsID> jobs_ids;
// push jobs with different priorities and types
jobs.push_back(small::EnumPriorities::kNormal, JobType::job1, {1, "a"});
jobs.push_back(small::EnumPriorities::kHigh, JobType::job2, {2, "b"});
jobs.push_back(small::EnumPriorities::kNormal, JobsType::kJobsType1, {1, "normal"}, &jobs_id);
jobs.push_back(small::EnumPriorities::kHigh, JobsType::kJobsType2, {2, "high"}, &jobs_id);
// wait for all jobs to complete with no timeout
auto ret = jobs.wait_for(std::chrono::milliseconds(0));
std::vector<JobsEng::JobsItem> jobs_items = {{.type = JobsType::kJobsType1, .request = {7, "highest"}}};
jobs.push_back(small::EnumPriorities::kHighest, jobs_items, &jobs_ids);
jobs.wait(); // wait here for jobs to finish, it sets the flag exit_when_done and no more items can be pushed
jobs.push_back_delay_for(std::chrono::milliseconds(300), small::EnumPriorities::kNormal, JobsType::kJobsType1, {100, "delay normal"}, &jobs_id);
...
jobs.start_threads(3); // manual start threads
...
// jobs.signal_exit_force();
auto ret = jobs.wait_for(std::chrono::milliseconds(100)); // wait to finished
...
jobs.wait(); // wait here for jobs to finish due to exit flag
...
```

Expand Down
51 changes: 0 additions & 51 deletions examples/examples_group_queue.h

This file was deleted.

83 changes: 54 additions & 29 deletions examples/examples_jobs_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,60 +16,85 @@ namespace examples::jobs_engine {
{
std::cout << "Jobs Engine example 1\n";

using qc = std::pair<int, std::string>;

enum JobType
enum class JobsType
{
kJobsType1,
kJobsType2
};
enum class JobsGroupType
{
job1,
job2
kJobsGroup1
};

small::jobs_engine<JobType, qc> jobs(
{.threads_count = 0 /*dont start any thread yet*/},
{.threads_count = 1, .bulk_count = 1},
{.group = JobType::job1},
[](auto &j /*this*/, const auto job_type, const auto &items) {
for (auto &[i, s] : items) {
using Request = std::pair<int, std::string>;
using JobsEng = small::jobs_engine<JobsType, Request, int, JobsGroupType>;

JobsEng jobs(
{.threads_count = 0 /*dont start any thread yet*/}, // overall config with default priorities
{.threads_count = 1, .bulk_count = 1}, // default jobs group config
{.group = JobsGroupType::kJobsGroup1}, // default jobs type config
[](auto &j /*this*/, const auto &items) {
for (auto &item : items) {
std::cout << "thread " << std::this_thread::get_id()
<< " default processing type " << job_type << " {" << i << ", \"" << s << "\"}"
<< " DEFAULT processing "
<< "{"
<< " type=" << (int)item->type
<< " req.int=" << item->request.first << ","
<< " req.str=\"" << item->request.second << "\""
<< "}"
<< " time " << small::toISOString(small::timeNow())
<< "\n";
}
small::sleep(30);
});

jobs.add_job_group(JobType::job1, {.threads_count = 1});
jobs.add_jobs_group(JobsGroupType::kJobsGroup1, {.threads_count = 1});

// add specific function for job1
jobs.add_job_type(JobType::job1, {.group = JobType::job1}, [](auto &j /*this*/, const auto job_type, const auto &items, auto b /*extra param b*/) {
jobs.add_jobs_type(JobsType::kJobsType1, {.group = JobsGroupType::kJobsGroup1}, [](auto &j /*this*/, const auto &items, auto b /*extra param b*/) {
// process item using the jobs lock (not recommended)
{
std::unique_lock mlock( j );
for(auto &[i, s]:items){
std::cout << "thread " << std::this_thread::get_id()
<< " specific processing type " << job_type << " {" << i << ", \"" << s << "\"} and b=" << b
for (auto &item : items) {
std::cout << "thread " << std::this_thread::get_id()
<< " JOB1 processing "
<< "{"
<< " type=" << (int)item->type
<< " req.int=" << item->request.first << ","
<< " req.str=\"" << item->request.second << "\""
<< "}"
<< " time " << small::toISOString(small::timeNow())
<< "\n";
}
}
small::sleep(30); }, 5 /*param b*/);

// use default config and default function for job2
jobs.add_job_type(JobType::job2);
jobs.add_jobs_type(JobsType::kJobsType2);

jobs.start_threads(3); // manual start threads
JobsEng::JobsID jobs_id{};
std::vector<JobsEng::JobsID> jobs_ids;

// push
jobs.push_back(small::EnumPriorities::kNormal, JobType::job1, {1, "a"});
jobs.push_back(small::EnumPriorities::kHigh, JobType::job2, {2, "b"});

jobs.push_back(small::EnumPriorities::kNormal, JobType::job1, std::make_pair(3, "c"));
jobs.emplace_back(small::EnumPriorities::kHigh, JobType::job1, 4, "d");
jobs.emplace_back(small::EnumPriorities::kLow, JobType::job1, 5, "e");
jobs.emplace_back(small::EnumPriorities::kNormal, JobType::job1, 6, "f");
jobs.emplace_back_delay_for(std::chrono::milliseconds(300), small::EnumPriorities::kNormal, JobType::job1, 7, "g");
jobs.emplace_back_delay_until(small::timeNow() + std::chrono::milliseconds(350), small::EnumPriorities::kNormal, JobType::job1, 8, "h");
jobs.push_back_delay_for(std::chrono::milliseconds(400), small::EnumPriorities::kNormal, JobType::job1, {9, "i"});
jobs.push_back(small::EnumPriorities::kNormal, JobsType::kJobsType1, {1, "normal"}, &jobs_id);
jobs.push_back(small::EnumPriorities::kHigh, JobsType::kJobsType2, {2, "high"}, &jobs_id);

jobs.push_back(small::EnumPriorities::kNormal, JobsType::kJobsType1, std::make_pair(3, "normal"), &jobs_id);
jobs.push_back(small::EnumPriorities::kHigh, {.type = JobsType::kJobsType1, .request = {4, "high"}}, &jobs_id);
jobs.push_back(small::EnumPriorities::kLow, JobsType::kJobsType1, {5, "low"}, &jobs_id);

Request req = {6, "normal"};
jobs.push_back(small::EnumPriorities::kNormal, {.type = JobsType::kJobsType1, .request = req}, nullptr);

std::vector<JobsEng::JobsItem> jobs_items = {{.type = JobsType::kJobsType1, .request = {7, "highest"}}};
jobs.push_back(small::EnumPriorities::kHighest, jobs_items, &jobs_ids);
jobs.push_back(small::EnumPriorities::kHighest, {{.type = JobsType::kJobsType1, .request = {8, "highest"}}}, &jobs_ids);

jobs.push_back_delay_for(std::chrono::milliseconds(300), small::EnumPriorities::kNormal, JobsType::kJobsType1, {100, "delay normal"}, &jobs_id);
jobs.push_back_delay_until(small::timeNow() + std::chrono::milliseconds(350), small::EnumPriorities::kNormal, JobsType::kJobsType1, {101, "delay normal"}, &jobs_id);
jobs.push_back_delay_for(std::chrono::milliseconds(400), small::EnumPriorities::kNormal, JobsType::kJobsType1, {102, "delay normal"}, &jobs_id);

jobs.start_threads(3); // manual start threads

small::sleep(50);
// jobs.signal_exit_force();
Expand Down
Loading

0 comments on commit 9d0b439

Please sign in to comment.