Skip to content

Commit

Permalink
Refactor jobs engine into multiple files
Browse files Browse the repository at this point in the history
  • Loading branch information
herrcristi committed Jan 4, 2025
1 parent ab4ad31 commit a13f9ec
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 157 deletions.
15 changes: 10 additions & 5 deletions examples/examples_jobs_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ namespace examples::jobs_engine {
using Request = std::pair<int, std::string>;
using JobsEng = small::jobs_engine<JobsType, Request, int /*response*/, JobsGroupType>;

auto jobs_processing_function = [](const std::vector<JobsEng::JobsItem *> &items) {
auto jobs_processing_function = [](const std::vector<std::shared_ptr<JobsEng::JobsItem>> &items) {
// this functions is defined without the engine params (it is here just for the example)
std::cout << "this function is defined without the engine params, called for " << (int)items[0]->type << "\n";
};
Expand Down Expand Up @@ -60,6 +60,7 @@ namespace examples::jobs_engine {
<< " req.int=" << item->request.first << ","
<< " req.str=\"" << item->request.second << "\""
<< "}"
<< " ref count " << item.use_count()
<< " time " << small::toISOString(small::timeNow())
<< "\n";
}
Expand All @@ -76,6 +77,7 @@ namespace examples::jobs_engine {
<< " req.int=" << item->request.first << ","
<< " req.str=\"" << item->request.second << "\""
<< "}"
<< " ref count " << item.use_count()
<< " time " << small::toISOString(small::timeNow())
<< "\n";
}
Expand All @@ -89,15 +91,18 @@ namespace examples::jobs_engine {
jobs.queue().push_back(small::EnumPriorities::kHigh, JobsType::kJobsType2, {2, "high"}, &jobs_id);

jobs.queue().push_back(small::EnumPriorities::kNormal, JobsType::kJobsType1, std::make_pair(3, "normal"), &jobs_id);
jobs.queue().push_back(small::EnumPriorities::kHigh, {.type = JobsType::kJobsType1, .request = {4, "high"}}, &jobs_id);
jobs.queue().push_back(small::EnumPriorities::kHigh, JobsType::kJobsType1, {4, "high"}, &jobs_id);
jobs.queue().push_back(small::EnumPriorities::kLow, JobsType::kJobsType1, {5, "low"}, &jobs_id);

Request req = {6, "normal"};
jobs.queue().push_back(small::EnumPriorities::kNormal, {.type = JobsType::kJobsType1, .request = req}, nullptr);
jobs.queue().push_back(small::EnumPriorities::kNormal, JobsType::kJobsType1, req, nullptr);

std::vector<JobsEng::JobsItem> jobs_items = {{.type = JobsType::kJobsType1, .request = {7, "highest"}}};
std::vector<std::shared_ptr<JobsEng::JobsItem>> jobs_items = {
std::make_shared<JobsEng::JobsItem>(JobsType::kJobsType1, Request{7, "highest"}),
std::make_shared<JobsEng::JobsItem>(JobsType::kJobsType1, Request{8, "highest"}),
};
jobs.queue().push_back(small::EnumPriorities::kHighest, jobs_items, &jobs_ids);
jobs.queue().push_back(small::EnumPriorities::kHighest, {{.type = JobsType::kJobsType1, .request = {8, "highest"}}}, &jobs_ids);
jobs.queue().push_back(small::EnumPriorities::kHighest, {std::make_shared<JobsEng::JobsItem>(JobsType::kJobsType1, Request{9, "highest"})}, &jobs_ids);

jobs.queue().push_back_delay_for(std::chrono::milliseconds(300), small::EnumPriorities::kNormal, JobsType::kJobsType1, {100, "delay normal"}, &jobs_id);
jobs.queue().push_back_delay_until(small::timeNow() + std::chrono::milliseconds(350), small::EnumPriorities::kNormal, JobsType::kJobsType1, {101, "delay normal"}, &jobs_id);
Expand Down
106 changes: 56 additions & 50 deletions include/impl/jobs_item_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,70 +27,76 @@ namespace small::jobsimpl {
{
using JobsID = unsigned long long;

JobsID id{}; // job unique id
JobsTypeT type{}; // job type
// std::atomic<EnumJobsState> state{}; // job state
// std::atomic<int> progress{}; // progress 0-100 for state kInProgress
EnumJobsState state{}; // job state
int progress{}; // progress 0-100 for state kInProgress
JobsRequestT request{}; // request needed for processing function
JobsResponseT response{}; // where the results are saved (for the finished callback if exists)
JobsID id{}; // job unique id
JobsTypeT type{}; // job type
std::atomic<EnumJobsState> state{EnumJobsState::kNone}; // job state
std::atomic<int> progress{}; // progress 0-100 for state kInProgress
JobsRequestT request{}; // request needed for processing function
JobsResponseT response{}; // where the results are saved (for the finished callback if exists)

// explicit jobs_item() = default;
// explicit jobs_item(std::initializer_list<jobs_item>) {};
// jobs_item(const jobs_item &other) { operator=(other); };
// jobs_item(jobs_item &&other) { operator=(other); };
// jobs_item &operator=(const jobs_item &other)
// {
// id = other.id;
// type = other.type;
// state = other.state.load();
// progress = other.progress.load();
// request = other.request;
// response = other.response;
// return *this;
// }
// jobs_item &operator=(jobs_item &&other)
// {
// id = std::move(other.id);
// type = std::move(other.type);
// state = other.state.load();
// progress = other.progress.load();
// request = std::move(other.request);
// response = std::move(other.response);
// return *this;
// }
explicit jobs_item() = default;
explicit jobs_item(const JobsID &jobs_id, const JobsTypeT &jobs_type, const JobsRequestT &jobs_request)
: id(jobs_id), type(jobs_type), request(jobs_request) {}
explicit jobs_item(const JobsTypeT &jobs_type, const JobsRequestT &jobs_request)
: type(jobs_type), request(jobs_request) {}
explicit jobs_item(const JobsID &jobs_id, const JobsTypeT &jobs_type, JobsRequestT &&jobs_request)
: id(jobs_id), type(jobs_type), request(std::forward<JobsRequestT>(jobs_request)) {}
explicit jobs_item(const JobsTypeT &jobs_type, JobsRequestT &&jobs_request)
: type(jobs_type), request(std::forward<JobsRequestT>(jobs_request)) {}

jobs_item(const jobs_item &other) { operator=(other); };
jobs_item(jobs_item &&other) noexcept { operator=(other); };
jobs_item &operator=(const jobs_item &other)
{
id = other.id;
type = other.type;
state = other.state.load();
progress = other.progress.load();
request = other.request;
response = other.response;
return *this;
}
jobs_item &operator=(jobs_item &&other) noexcept
{
id = std::move(other.id);
type = std::move(other.type);
state = other.state.load();
progress = other.progress.load();
request = std::move(other.request);
response = std::move(other.response);
return *this;
}

//
// set job state (can only go from lower to upper state)
//
inline void set_state(const EnumJobsState &new_state)
{
// for (;;) {
// EnumJobsState current_state = state.load();
// if (current_state >= new_state) {
// return;
// }
// if (state.compare_exchange_weak(current_state, new_state)) {
// return;
// }
// }
for (;;) {
EnumJobsState current_state = state.load();
if (current_state >= new_state) {
return;
}
if (state.compare_exchange_weak(current_state, new_state)) {
return;
}
}
}

//
// set job progress (can only increase)
//
inline void set_progress(const int &new_progress)
{
// for (;;) {
// int current_progress = progress.load();
// if (current_progress >= new_progress) {
// return;
// }
// if (progress.compare_exchange_weak(current_progress, new_progress)) {
// return;
// }
// }
for (;;) {
int current_progress = progress.load();
if (current_progress >= new_progress) {
return;
}
if (progress.compare_exchange_weak(current_progress, new_progress)) {
return;
}
}
}
};

Expand Down
124 changes: 25 additions & 99 deletions include/impl/jobs_queue_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,10 @@ namespace small::jobsimpl {
//
inline std::size_t push_back(const JobsPrioT &priority, const JobsTypeT &jobs_type, const JobsRequestT &job_req, JobsID *jobs_id = nullptr)
{
return push_back(priority, {.type = jobs_type, .request = job_req}, jobs_id);
return push_back(priority, std::make_shared<JobsItem>(jobs_type, job_req), jobs_id);
}

inline std::size_t push_back(const JobsPrioT &priority, const JobsItem &jobs_item, JobsID *jobs_id = nullptr)
inline std::size_t push_back(const JobsPrioT &priority, std::shared_ptr<JobsItem> jobs_item, JobsID *jobs_id = nullptr)
{
if (is_exit()) {
return 0;
Expand All @@ -123,10 +123,10 @@ namespace small::jobsimpl {
*jobs_id = id;
}

return jobs_activate(priority, jobs_item.type, id);
return jobs_activate(priority, jobs_item->type, id);
}

inline std::size_t push_back(const JobsPrioT &priority, const std::vector<JobsItem> &jobs_items, std::vector<JobsID> *jobs_ids)
inline std::size_t push_back(const JobsPrioT &priority, const std::vector<std::shared_ptr<JobsItem>> &jobs_items, std::vector<JobsID> *jobs_ids)
{
if (is_exit()) {
return 0;
Expand Down Expand Up @@ -155,47 +155,7 @@ namespace small::jobsimpl {
// push_back move semantics
inline std::size_t push_back(const JobsPrioT &priority, const JobsTypeT &jobs_type, JobsRequestT &&jobs_req, JobsID *jobs_id = nullptr)
{
return push_back(priority, {.type = jobs_type, .request = std::forward<JobsRequestT>(jobs_req)}, jobs_id);
}

inline std::size_t push_back(const JobsPrioT &priority, JobsItem &&jobs_item, JobsID *jobs_id = nullptr)
{
if (is_exit()) {
return 0;
}
auto jobs_type = jobs_item.type; // save the type because the object will be moved
auto id = jobs_add(std::forward<JobsItem>(jobs_item));
if (jobs_id) {
*jobs_id = id;
}

return jobs_activate(priority, jobs_type, id);
}

inline std::size_t push_back(const JobsPrioT &priority, std::vector<JobsItem> &&jobs_items, std::vector<JobsID> *jobs_ids)
{
if (is_exit()) {
return 0;
}

std::unique_lock l(m_lock);

std::size_t count = 0;
if (jobs_ids) {
jobs_ids->reserve(jobs_items.size());
jobs_ids->clear();
}
JobsID jobs_id{};
for (auto &&jobs_item : jobs_items) {
auto ret = push_back(priority, std::forward<JobsItem>(jobs_item), &jobs_id);
if (ret) {
if (jobs_ids) {
jobs_ids->push_back(jobs_id);
}
}
count += ret;
}
return count;
return push_back(priority, std::make_shared<JobsItem>(jobs_type, std::forward<JobsRequestT>(jobs_req)), jobs_id);
}

// no emplace_back do to returning the jobs_id
Expand All @@ -206,64 +166,43 @@ namespace small::jobsimpl {
template <typename _Rep, typename _Period>
inline std::size_t push_back_delay_for(const std::chrono::duration<_Rep, _Period> &__rtime, const JobsPrioT &priority, const JobsTypeT &jobs_type, const JobsRequestT &jobs_req, JobsID *jobs_id = nullptr)
{
return push_back_delay_for(__rtime, priority, {.type = jobs_type, .request = jobs_req}, jobs_id);
return push_back_delay_for(__rtime, priority, std::make_shared<JobsItem>(jobs_type, jobs_req), jobs_id);
}

template <typename _Rep, typename _Period>
inline std::size_t push_back_delay_for(const std::chrono::duration<_Rep, _Period> &__rtime, const JobsPrioT &priority, const JobsItem &jobs_item, JobsID *jobs_id = nullptr)
inline std::size_t push_back_delay_for(const std::chrono::duration<_Rep, _Period> &__rtime, const JobsPrioT &priority, std::shared_ptr<JobsItem> jobs_item, JobsID *jobs_id = nullptr)
{
auto id = jobs_add(jobs_item);
if (jobs_id) {
*jobs_id = id;
}
return m_delayed_items.queue().push_delay_for(__rtime, {priority, jobs_item.type, id});
return m_delayed_items.queue().push_delay_for(__rtime, {priority, jobs_item->type, id});
}

template <typename _Rep, typename _Period>
inline std::size_t push_back_delay_for(const std::chrono::duration<_Rep, _Period> &__rtime, const JobsPrioT &priority, const JobsTypeT &jobs_type, JobsRequestT &&jobs_req, JobsID *jobs_id = nullptr)
{
return push_back_delay_for(__rtime, priority, {.type = jobs_type, .request = std::forward<JobsRequestT>(jobs_req)}, jobs_id);
}

template <typename _Rep, typename _Period>
inline std::size_t push_back_delay_for(const std::chrono::duration<_Rep, _Period> &__rtime, const JobsPrioT &priority, JobsItem &&jobs_item, JobsID *jobs_id = nullptr)
{
auto jobs_type = jobs_item.type; // save the type because the object will be moved
auto id = jobs_add(std::forward<JobsItem>(jobs_item));
if (jobs_id) {
*jobs_id = id;
}
return m_delayed_items.queue().push_delay_for(__rtime, {priority, jobs_type, id});
return push_back_delay_for(__rtime, priority, std::make_shared<JobsItem>(jobs_type, std::forward<JobsRequestT>(jobs_req)), jobs_id);
}

// avoid time_casting from one clock to another // template <typename _Clock, typename _Duration> //
inline std::size_t push_back_delay_until(const std::chrono::time_point<TimeClock, TimeDuration> &__atime, const JobsPrioT &priority, const JobsTypeT &jobs_type, const JobsRequestT &jobs_req, JobsID *jobs_id = nullptr)
{
return push_back_delay_until(__atime, priority, {.type = jobs_type, .request = jobs_req}, jobs_id);
return push_back_delay_until(__atime, priority, std::make_shared<JobsItem>(jobs_type, jobs_req), jobs_id);
}

inline std::size_t push_back_delay_until(const std::chrono::time_point<TimeClock, TimeDuration> &__atime, const JobsPrioT &priority, const JobsItem &jobs_item, JobsID *jobs_id = nullptr)
inline std::size_t push_back_delay_until(const std::chrono::time_point<TimeClock, TimeDuration> &__atime, const JobsPrioT &priority, std::shared_ptr<JobsItem> jobs_item, JobsID *jobs_id = nullptr)
{
auto id = jobs_add(jobs_item);
if (jobs_id) {
*jobs_id = id;
}
return m_delayed_items.queue().push_delay_until(__atime, {priority, jobs_item.type, id});
return m_delayed_items.queue().push_delay_until(__atime, {priority, jobs_item->type, id});
}

inline std::size_t push_back_delay_until(const std::chrono::time_point<TimeClock, TimeDuration> &__atime, const JobsPrioT &priority, const JobsTypeT &jobs_type, JobsRequestT &&jobs_req, JobsID *jobs_id = nullptr)
{
return push_back_delay_until(__atime, priority, {.type = jobs_type, .request = std::forward<JobsRequestT>(jobs_req)}, jobs_id);
}

inline std::size_t push_back_delay_until(const std::chrono::time_point<TimeClock, TimeDuration> &__atime, const JobsPrioT &priority, JobsItem &&jobs_item, JobsID *jobs_id = nullptr)
{
auto jobs_type = jobs_item.type; // save the type because the object will be moved
auto id = jobs_add(std::forward<JobsItem>(jobs_item));
if (jobs_id) {
*jobs_id = id;
}
return m_delayed_items.queue().push_delay_until(__atime, {priority, jobs_type, id});
return push_back_delay_until(__atime, priority, std::make_shared<JobsItem>(jobs_type, std::forward<JobsRequestT>(jobs_req)), jobs_id);
}

// clang-format off
Expand Down Expand Up @@ -356,9 +295,9 @@ namespace small::jobsimpl {
return it != m_groups_queues.end() ? &it->second : nullptr;
}

inline std::vector<JobsItem *> jobs_get(const std::vector<JobsID> &jobs_ids)
inline std::vector<std::shared_ptr<JobsItem>> jobs_get(const std::vector<JobsID> &jobs_ids)
{
std::vector<JobsItem *> jobs_items;
std::vector<std::shared_ptr<JobsItem>> jobs_items;
jobs_items.reserve(jobs_ids.size());

std::unique_lock l(m_lock);
Expand All @@ -368,10 +307,10 @@ namespace small::jobsimpl {
if (it_j == m_jobs.end()) {
continue;
}
jobs_items.push_back(&it_j->second);
jobs_items.push_back(it_j->second);
}

return jobs_items;
return jobs_items; // will be moved
}

inline void jobs_del(const JobsID &jobs_id)
Expand All @@ -391,28 +330,15 @@ namespace small::jobsimpl {
//
// add job items
//
inline JobsID jobs_add(const JobsItem &jobs_item)
inline JobsID jobs_add(std::shared_ptr<JobsItem> jobs_item)
{
std::unique_lock l(m_lock);

++m_jobs_seq_id;

JobsID id = m_jobs_seq_id;
m_jobs[id] = jobs_item;
m_jobs[id].id = id;

return id;
}

inline JobsID jobs_add(JobsItem &&jobs_item)
{
std::unique_lock l(m_lock);

++m_jobs_seq_id;

JobsID id = m_jobs_seq_id;
jobs_item.id = id;
m_jobs.emplace(id, std::forward<JobsItem>(jobs_item));
jobs_item->id = id;
m_jobs.emplace(id, jobs_item);

return id;
}
Expand Down Expand Up @@ -456,11 +382,11 @@ namespace small::jobsimpl {
//
// members
//
mutable small::base_lock m_lock; // global locker
std::atomic<JobsID> m_jobs_seq_id{}; // to get the next jobs id
std::unordered_map<JobsID, JobsItem> m_jobs; // current jobs
std::unordered_map<JobsGroupT, JobsQueue> m_groups_queues; // map of queues by group
std::unordered_map<JobsTypeT, JobsQueue *> m_types_queues; // optimize to have queues by type (which reference queues by group)
mutable small::base_lock m_lock; // global locker
std::atomic<JobsID> m_jobs_seq_id{}; // to get the next jobs id
std::unordered_map<JobsID, std::shared_ptr<JobsItem>> m_jobs; // current jobs
std::unordered_map<JobsGroupT, JobsQueue> m_groups_queues; // map of queues by group
std::unordered_map<JobsTypeT, JobsQueue *> m_types_queues; // optimize to have queues by type (which reference queues by group)

JobQueueDelayedT m_delayed_items{*this}; // queue of delayed items

Expand Down
Loading

0 comments on commit a13f9ec

Please sign in to comment.