Skip to content

Commit

Permalink
Refactor jobs engine into multiple files (#28)
Browse files Browse the repository at this point in the history
* Refactor jobs engine into multiple files
  • Loading branch information
herrcristi authored Jan 4, 2025
1 parent 9d0b439 commit e024856
Show file tree
Hide file tree
Showing 9 changed files with 820 additions and 610 deletions.
70 changes: 37 additions & 33 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ Signal exit when we no longer want to use it,

`signal_exit_when_done`

Use it like this
Use it like this (for a more complete example see the [example](examples/examples_jobs_engine.h) )

```
enum class JobsType
Expand All @@ -412,44 +412,48 @@ enum class JobsGroupType
kJobsGroup1
};
...
using JobsRequest = std::pair<int, std::string>;
using JobsResponse = int;
using JobsEng = small::jobs_engine<JobsType, JobsRequest, JobsResponse, JobsGroupType>;
...
JobsEng jobs(
{.threads_count = 0 /*dont start any thread yet*/}, // overall config with default priorities
{.threads_count = 1, .bulk_count = 1}, // default jobs group config
{.group = JobsGroupType::kJobsGroup1}, // default jobs type config
[](auto &j /*this*/, const auto &items) {
for (auto &item : items) {
... // item->
}
using Request = std::pair<int, std::string>;
using JobsEng = small::jobs_engine<JobsType, Request, int /*response*/, JobsGroupType>;
...
JobsEng::JobsConfig config{
.m_engine = {.m_threads_count = 0 /*dont start any thread yet*/ }, // overall config with default priorities
.m_groups = {
{JobsGroupType::kJobsGroup1, {.m_threads_count = 1}}}, // config by jobs group
.m_types = {
{JobsType::kJobsType1, {.m_group = JobsGroupType::kJobsGroup1}},
{JobsType::kJobsType2, {.m_group = JobsGroupType::kJobsGroup1}},
}};
...
// create jobs engine
JobsEng jobs(config);
...
jobs.add_default_processing_function([](auto &j /*this jobs engine*/, const auto &jobs_items) {
for (auto &item : jobs_items) {
...
});
jobs.add_jobs_group(JobsGroupType::kJobsGroup1, {.threads_count = 1});
}
...
});
...
// add specific function for job1
jobs.add_jobs_type(JobsType::kJobsType1, {.group = JobsGroupType::kJobsGroup1}, [](auto &j /*this*/, const auto &items, auto b /*extra param b*/) {
for (auto &item : items) {
... // item->
// add specific function for job1 (calling the function from jobs intead of config allows to pass the engine and extra param)
jobs.add_job_processing_function(JobsType::kJobsType1, [](auto &j /*this jobs engine*/, const auto &jobs_items, auto b /*extra param b*/) {
for (auto &item : jobs_items) {
...
}
}, 5 /*extra param b*/);
// use default config and default function for job2
jobs.add_jobs_type(JobsType::kJobsType2);
}, 5 /*param b*/);
...
JobsEng::JobsID jobs_id{};
std::vector<JobsEng::JobsID> jobs_ids;
jobs.push_back(small::EnumPriorities::kNormal, JobsType::kJobsType1, {1, "normal"}, &jobs_id);
jobs.push_back(small::EnumPriorities::kHigh, JobsType::kJobsType2, {2, "high"}, &jobs_id);
std::vector<JobsEng::JobsItem> jobs_items = {{.type = JobsType::kJobsType1, .request = {7, "highest"}}};
jobs.push_back(small::EnumPriorities::kHighest, jobs_items, &jobs_ids);
jobs.push_back_delay_for(std::chrono::milliseconds(300), small::EnumPriorities::kNormal, JobsType::kJobsType1, {100, "delay normal"}, &jobs_id);
...
// push
jobs.queue().push_back(small::EnumPriorities::kNormal, JobsType::kJobsType1, {1, "normal"}, &jobs_id);
...
std::vector<std::shared_ptr<JobsEng::JobsItem>> jobs_items = {
std::make_shared<JobsEng::JobsItem>(JobsType::kJobsType1, Request{7, "highest"}),
std::make_shared<JobsEng::JobsItem>(JobsType::kJobsType1, Request{8, "highest"}),
};
jobs.queue().push_back(small::EnumPriorities::kHighest, jobs_items, &jobs_ids);
...
jobs.queue().push_back_delay_for(std::chrono::milliseconds(300), small::EnumPriorities::kNormal, JobsType::kJobsType1, {100, "delay normal"}, &jobs_id);
...
jobs.start_threads(3); // manual start threads
...
Expand Down
126 changes: 71 additions & 55 deletions examples/examples_jobs_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,72 +27,86 @@ namespace examples::jobs_engine {
};

using Request = std::pair<int, std::string>;
using JobsEng = small::jobs_engine<JobsType, Request, int, JobsGroupType>;

JobsEng jobs(
{.threads_count = 0 /*dont start any thread yet*/}, // overall config with default priorities
{.threads_count = 1, .bulk_count = 1}, // default jobs group config
{.group = JobsGroupType::kJobsGroup1}, // default jobs type config
[](auto &j /*this*/, const auto &items) {
for (auto &item : items) {
std::cout << "thread " << std::this_thread::get_id()
<< " DEFAULT processing "
<< "{"
<< " type=" << (int)item->type
<< " req.int=" << item->request.first << ","
<< " req.str=\"" << item->request.second << "\""
<< "}"
<< " time " << small::toISOString(small::timeNow())
<< "\n";
}
small::sleep(30);
});

jobs.add_jobs_group(JobsGroupType::kJobsGroup1, {.threads_count = 1});

// add specific function for job1
jobs.add_jobs_type(JobsType::kJobsType1, {.group = JobsGroupType::kJobsGroup1}, [](auto &j /*this*/, const auto &items, auto b /*extra param b*/) {
// process item using the jobs lock (not recommended)
{
std::unique_lock mlock( j );
for (auto &item : items) {
std::cout << "thread " << std::this_thread::get_id()
<< " JOB1 processing "
<< "{"
<< " type=" << (int)item->type
<< " req.int=" << item->request.first << ","
<< " req.str=\"" << item->request.second << "\""
<< "}"
<< " time " << small::toISOString(small::timeNow())
<< "\n";
}
}
small::sleep(30); }, 5 /*param b*/);
using JobsEng = small::jobs_engine<JobsType, Request, int /*response*/, JobsGroupType>;

auto jobs_processing_function = [](const std::vector<std::shared_ptr<JobsEng::JobsItem>> &items) {
// this functions is defined without the engine params (it is here just for the example)
std::cout << "this function is defined without the engine params, called for " << (int)items[0]->type << "\n";
};

// use default config and default function for job2
jobs.add_jobs_type(JobsType::kJobsType2);
JobsEng::JobsConfig config{
.m_engine = {.m_threads_count = 0 /*dont start any thread yet*/,
.m_config_prio = {.priorities = {{small::EnumPriorities::kHighest, 2},
{small::EnumPriorities::kHigh, 2},
{small::EnumPriorities::kNormal, 2},
{small::EnumPriorities::kLow, 1}}}}, // overall config with default priorities
.m_default_processing_function = jobs_processing_function, // default processing function, better use jobs.add_default_processing_function to set it
.m_groups = {
{JobsGroupType::kJobsGroup1, {.m_threads_count = 1}}}, // config by jobs group
.m_types = {
{JobsType::kJobsType1, {.m_group = JobsGroupType::kJobsGroup1}},
{JobsType::kJobsType2, {.m_group = JobsGroupType::kJobsGroup1}},
}};

// create jobs engine
JobsEng jobs(config);

jobs.add_default_processing_function([](auto &j /*this jobs engine*/, const auto &jobs_items) {
for (auto &item : jobs_items) {
std::cout << "thread " << std::this_thread::get_id()
<< " DEFAULT processing "
<< "{"
<< " type=" << (int)item->type
<< " req.int=" << item->request.first << ","
<< " req.str=\"" << item->request.second << "\""
<< "}"
<< " ref count " << item.use_count()
<< " time " << small::toISOString(small::timeNow())
<< "\n";
}
small::sleep(30);
});

// add specific function for job1 (calling the function from jobs intead of config allows to pass the engine and extra param)
jobs.add_job_processing_function(JobsType::kJobsType1, [](auto &j /*this jobs engine*/, const auto &jobs_items, auto b /*extra param b*/) {
for (auto &item : jobs_items) {
std::cout << "thread " << std::this_thread::get_id()
<< " JOB1 processing "
<< "{"
<< " type=" << (int)item->type
<< " req.int=" << item->request.first << ","
<< " req.str=\"" << item->request.second << "\""
<< "}"
<< " ref count " << item.use_count()
<< " time " << small::toISOString(small::timeNow())
<< "\n";
}
small::sleep(30); }, 5 /*param b*/);

JobsEng::JobsID jobs_id{};
std::vector<JobsEng::JobsID> jobs_ids;

// push
jobs.push_back(small::EnumPriorities::kNormal, JobsType::kJobsType1, {1, "normal"}, &jobs_id);
jobs.push_back(small::EnumPriorities::kHigh, JobsType::kJobsType2, {2, "high"}, &jobs_id);
jobs.queue().push_back(small::EnumPriorities::kNormal, JobsType::kJobsType1, {1, "normal"}, &jobs_id);
jobs.queue().push_back(small::EnumPriorities::kHigh, JobsType::kJobsType2, {2, "high"}, &jobs_id);

jobs.push_back(small::EnumPriorities::kNormal, JobsType::kJobsType1, std::make_pair(3, "normal"), &jobs_id);
jobs.push_back(small::EnumPriorities::kHigh, {.type = JobsType::kJobsType1, .request = {4, "high"}}, &jobs_id);
jobs.push_back(small::EnumPriorities::kLow, JobsType::kJobsType1, {5, "low"}, &jobs_id);
jobs.queue().push_back(small::EnumPriorities::kNormal, JobsType::kJobsType1, std::make_pair(3, "normal"), &jobs_id);
jobs.queue().push_back(small::EnumPriorities::kHigh, JobsType::kJobsType1, {4, "high"}, &jobs_id);
jobs.queue().push_back(small::EnumPriorities::kLow, JobsType::kJobsType1, {5, "low"}, &jobs_id);

Request req = {6, "normal"};
jobs.push_back(small::EnumPriorities::kNormal, {.type = JobsType::kJobsType1, .request = req}, nullptr);
jobs.queue().push_back(small::EnumPriorities::kNormal, JobsType::kJobsType1, req, nullptr);

std::vector<JobsEng::JobsItem> jobs_items = {{.type = JobsType::kJobsType1, .request = {7, "highest"}}};
jobs.push_back(small::EnumPriorities::kHighest, jobs_items, &jobs_ids);
jobs.push_back(small::EnumPriorities::kHighest, {{.type = JobsType::kJobsType1, .request = {8, "highest"}}}, &jobs_ids);
std::vector<std::shared_ptr<JobsEng::JobsItem>> jobs_items = {
std::make_shared<JobsEng::JobsItem>(JobsType::kJobsType1, Request{7, "highest"}),
std::make_shared<JobsEng::JobsItem>(JobsType::kJobsType1, Request{8, "highest"}),
};
jobs.queue().push_back(small::EnumPriorities::kHighest, jobs_items, &jobs_ids);
jobs.queue().push_back(small::EnumPriorities::kHighest, {std::make_shared<JobsEng::JobsItem>(JobsType::kJobsType1, Request{9, "highest"})}, &jobs_ids);

jobs.push_back_delay_for(std::chrono::milliseconds(300), small::EnumPriorities::kNormal, JobsType::kJobsType1, {100, "delay normal"}, &jobs_id);
jobs.push_back_delay_until(small::timeNow() + std::chrono::milliseconds(350), small::EnumPriorities::kNormal, JobsType::kJobsType1, {101, "delay normal"}, &jobs_id);
jobs.push_back_delay_for(std::chrono::milliseconds(400), small::EnumPriorities::kNormal, JobsType::kJobsType1, {102, "delay normal"}, &jobs_id);
jobs.queue().push_back_delay_for(std::chrono::milliseconds(300), small::EnumPriorities::kNormal, JobsType::kJobsType1, {100, "delay normal"}, &jobs_id);
jobs.queue().push_back_delay_until(small::timeNow() + std::chrono::milliseconds(350), small::EnumPriorities::kNormal, JobsType::kJobsType1, {101, "delay normal"}, &jobs_id);
jobs.queue().push_back_delay_for(std::chrono::milliseconds(400), small::EnumPriorities::kNormal, JobsType::kJobsType1, {102, "delay normal"}, &jobs_id);

jobs.start_threads(3); // manual start threads

Expand All @@ -102,6 +116,8 @@ namespace examples::jobs_engine {
std::cout << "wait for with timeout, ret = " << static_cast<int>(ret) << " as timeout\n";
jobs.wait(); // wait here for jobs to finish due to exit flag

std::cout << "size = " << jobs.size() << "\n";

std::cout << "Jobs Engine example 1 finish\n\n";

return 0;
Expand Down
2 changes: 1 addition & 1 deletion include/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ namespace small {
//
// class for representing a buffer
//
class buffer : public base_buffer
class buffer : public small::bufferimpl::base_buffer
{
public:
// buffer (allocates in chunks)
Expand Down
4 changes: 2 additions & 2 deletions include/impl/base_buffer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
#include <type_traits>
#include <vector>

namespace small {
namespace small::bufferimpl {
// class for representing a base_buffer that implements
// all the needed functions and operators
// it must be supplied with derived class with proper functions
Expand Down Expand Up @@ -457,4 +457,4 @@ namespace small {
std::size_t m_buffer_length{0};
};

} // namespace small
} // namespace small::bufferimpl
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

#include <unordered_map>

#include "worker_thread.h"
#include "../worker_thread.h"

namespace small {
namespace small::jobsimpl {

//
// helper class for jobs_engine to execute group of jobs (parent caller must implement 'do_action')
Expand Down Expand Up @@ -188,7 +188,7 @@ namespace small {
//
struct JobWorkerThreadFunction
{
void operator()(small::worker_thread<JobGroupT> &, const std::vector<JobGroupT> &items, small::jobs_engine_thread_pool<JobGroupT, ParentCallerT> *pThis) const
void operator()(small::worker_thread<JobGroupT> &, const std::vector<JobGroupT> &items, jobs_engine_thread_pool<JobGroupT, ParentCallerT> *pThis) const
{
pThis->thread_function(items);
}
Expand All @@ -198,4 +198,4 @@ namespace small {
small::worker_thread<JobGroupT> m_workers{{.threads_count = 0}, JobWorkerThreadFunction(), this};
ParentCallerT &m_parent_caller; // parent jobs engine
};
} // namespace small
} // namespace small::jobsimpl
103 changes: 103 additions & 0 deletions include/impl/jobs_item_impl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
#pragma once

#include <atomic>
#include <deque>
#include <functional>
#include <queue>
#include <unordered_map>
#include <vector>

#include "../base_lock.h"

namespace small::jobsimpl {
// a job can be in the following states
enum class EnumJobsState : unsigned int
{
kNone = 0,
kInProgress,
kFinished,
kFailed,
kCancelled,
kTimeout
};

// a job item
template <typename JobsTypeT, typename JobsRequestT, typename JobsResponseT>
struct jobs_item
{
using JobsID = unsigned long long;

JobsID id{}; // job unique id
JobsTypeT type{}; // job type
std::atomic<EnumJobsState> state{EnumJobsState::kNone}; // job state
std::atomic<int> progress{}; // progress 0-100 for state kInProgress
JobsRequestT request{}; // request needed for processing function
JobsResponseT response{}; // where the results are saved (for the finished callback if exists)

explicit jobs_item() = default;
explicit jobs_item(const JobsID &jobs_id, const JobsTypeT &jobs_type, const JobsRequestT &jobs_request)
: id(jobs_id), type(jobs_type), request(jobs_request) {}
explicit jobs_item(const JobsTypeT &jobs_type, const JobsRequestT &jobs_request)
: type(jobs_type), request(jobs_request) {}
explicit jobs_item(const JobsID &jobs_id, const JobsTypeT &jobs_type, JobsRequestT &&jobs_request)
: id(jobs_id), type(jobs_type), request(std::forward<JobsRequestT>(jobs_request)) {}
explicit jobs_item(const JobsTypeT &jobs_type, JobsRequestT &&jobs_request)
: type(jobs_type), request(std::forward<JobsRequestT>(jobs_request)) {}

jobs_item(const jobs_item &other) { operator=(other); };
jobs_item(jobs_item &&other) noexcept { operator=(other); };
jobs_item &operator=(const jobs_item &other)
{
id = other.id;
type = other.type;
state = other.state.load();
progress = other.progress.load();
request = other.request;
response = other.response;
return *this;
}
jobs_item &operator=(jobs_item &&other) noexcept
{
id = std::move(other.id);
type = std::move(other.type);
state = other.state.load();
progress = other.progress.load();
request = std::move(other.request);
response = std::move(other.response);
return *this;
}

//
// set job state (can only go from lower to upper state)
//
inline void set_state(const EnumJobsState &new_state)
{
for (;;) {
EnumJobsState current_state = state.load();
if (current_state >= new_state) {
return;
}
if (state.compare_exchange_weak(current_state, new_state)) {
return;
}
}
}

//
// set job progress (can only increase)
//
inline void set_progress(const int &new_progress)
{
for (;;) {
int current_progress = progress.load();
if (current_progress >= new_progress) {
return;
}
if (progress.compare_exchange_weak(current_progress, new_progress)) {
return;
}
}
}
};

} // namespace small::jobsimpl
Loading

0 comments on commit e024856

Please sign in to comment.