From a13f9ec78e82bc4b136795a4ad1460ca3448dbe5 Mon Sep 17 00:00:00 2001 From: Cristian Herghelegiu Date: Sat, 4 Jan 2025 20:08:24 +0200 Subject: [PATCH] Refactor jobs engine into multiple files --- examples/examples_jobs_engine.h | 15 ++-- include/impl/jobs_item_impl.h | 106 ++++++++++++++------------- include/impl/jobs_queue_impl.h | 124 +++++++------------------------- include/jobs_config.h | 2 +- include/jobs_engine.h | 4 +- 5 files changed, 94 insertions(+), 157 deletions(-) diff --git a/examples/examples_jobs_engine.h b/examples/examples_jobs_engine.h index 3b0d6f3..faa97cb 100644 --- a/examples/examples_jobs_engine.h +++ b/examples/examples_jobs_engine.h @@ -29,7 +29,7 @@ namespace examples::jobs_engine { using Request = std::pair; using JobsEng = small::jobs_engine; - auto jobs_processing_function = [](const std::vector &items) { + auto jobs_processing_function = [](const std::vector> &items) { // this functions is defined without the engine params (it is here just for the example) std::cout << "this function is defined without the engine params, called for " << (int)items[0]->type << "\n"; }; @@ -60,6 +60,7 @@ namespace examples::jobs_engine { << " req.int=" << item->request.first << "," << " req.str=\"" << item->request.second << "\"" << "}" + << " ref count " << item.use_count() << " time " << small::toISOString(small::timeNow()) << "\n"; } @@ -76,6 +77,7 @@ namespace examples::jobs_engine { << " req.int=" << item->request.first << "," << " req.str=\"" << item->request.second << "\"" << "}" + << " ref count " << item.use_count() << " time " << small::toISOString(small::timeNow()) << "\n"; } @@ -89,15 +91,18 @@ namespace examples::jobs_engine { jobs.queue().push_back(small::EnumPriorities::kHigh, JobsType::kJobsType2, {2, "high"}, &jobs_id); jobs.queue().push_back(small::EnumPriorities::kNormal, JobsType::kJobsType1, std::make_pair(3, "normal"), &jobs_id); - jobs.queue().push_back(small::EnumPriorities::kHigh, {.type = JobsType::kJobsType1, .request = {4, "high"}}, &jobs_id); + jobs.queue().push_back(small::EnumPriorities::kHigh, JobsType::kJobsType1, {4, "high"}, &jobs_id); jobs.queue().push_back(small::EnumPriorities::kLow, JobsType::kJobsType1, {5, "low"}, &jobs_id); Request req = {6, "normal"}; - jobs.queue().push_back(small::EnumPriorities::kNormal, {.type = JobsType::kJobsType1, .request = req}, nullptr); + jobs.queue().push_back(small::EnumPriorities::kNormal, JobsType::kJobsType1, req, nullptr); - std::vector jobs_items = {{.type = JobsType::kJobsType1, .request = {7, "highest"}}}; + std::vector> jobs_items = { + std::make_shared(JobsType::kJobsType1, Request{7, "highest"}), + std::make_shared(JobsType::kJobsType1, Request{8, "highest"}), + }; jobs.queue().push_back(small::EnumPriorities::kHighest, jobs_items, &jobs_ids); - jobs.queue().push_back(small::EnumPriorities::kHighest, {{.type = JobsType::kJobsType1, .request = {8, "highest"}}}, &jobs_ids); + jobs.queue().push_back(small::EnumPriorities::kHighest, {std::make_shared(JobsType::kJobsType1, Request{9, "highest"})}, &jobs_ids); jobs.queue().push_back_delay_for(std::chrono::milliseconds(300), small::EnumPriorities::kNormal, JobsType::kJobsType1, {100, "delay normal"}, &jobs_id); jobs.queue().push_back_delay_until(small::timeNow() + std::chrono::milliseconds(350), small::EnumPriorities::kNormal, JobsType::kJobsType1, {101, "delay normal"}, &jobs_id); diff --git a/include/impl/jobs_item_impl.h b/include/impl/jobs_item_impl.h index a755b28..684d5e1 100644 --- a/include/impl/jobs_item_impl.h +++ b/include/impl/jobs_item_impl.h @@ -27,54 +27,60 @@ namespace small::jobsimpl { { using JobsID = unsigned long long; - JobsID id{}; // job unique id - JobsTypeT type{}; // job type - // std::atomic state{}; // job state - // std::atomic progress{}; // progress 0-100 for state kInProgress - EnumJobsState state{}; // job state - int progress{}; // progress 0-100 for state kInProgress - JobsRequestT request{}; // request needed for processing function - JobsResponseT response{}; // where the results are saved (for the finished callback if exists) + JobsID id{}; // job unique id + JobsTypeT type{}; // job type + std::atomic state{EnumJobsState::kNone}; // job state + std::atomic progress{}; // progress 0-100 for state kInProgress + JobsRequestT request{}; // request needed for processing function + JobsResponseT response{}; // where the results are saved (for the finished callback if exists) - // explicit jobs_item() = default; - // explicit jobs_item(std::initializer_list) {}; - // jobs_item(const jobs_item &other) { operator=(other); }; - // jobs_item(jobs_item &&other) { operator=(other); }; - // jobs_item &operator=(const jobs_item &other) - // { - // id = other.id; - // type = other.type; - // state = other.state.load(); - // progress = other.progress.load(); - // request = other.request; - // response = other.response; - // return *this; - // } - // jobs_item &operator=(jobs_item &&other) - // { - // id = std::move(other.id); - // type = std::move(other.type); - // state = other.state.load(); - // progress = other.progress.load(); - // request = std::move(other.request); - // response = std::move(other.response); - // return *this; - // } + explicit jobs_item() = default; + explicit jobs_item(const JobsID &jobs_id, const JobsTypeT &jobs_type, const JobsRequestT &jobs_request) + : id(jobs_id), type(jobs_type), request(jobs_request) {} + explicit jobs_item(const JobsTypeT &jobs_type, const JobsRequestT &jobs_request) + : type(jobs_type), request(jobs_request) {} + explicit jobs_item(const JobsID &jobs_id, const JobsTypeT &jobs_type, JobsRequestT &&jobs_request) + : id(jobs_id), type(jobs_type), request(std::forward(jobs_request)) {} + explicit jobs_item(const JobsTypeT &jobs_type, JobsRequestT &&jobs_request) + : type(jobs_type), request(std::forward(jobs_request)) {} + + jobs_item(const jobs_item &other) { operator=(other); }; + jobs_item(jobs_item &&other) noexcept { operator=(other); }; + jobs_item &operator=(const jobs_item &other) + { + id = other.id; + type = other.type; + state = other.state.load(); + progress = other.progress.load(); + request = other.request; + response = other.response; + return *this; + } + jobs_item &operator=(jobs_item &&other) noexcept + { + id = std::move(other.id); + type = std::move(other.type); + state = other.state.load(); + progress = other.progress.load(); + request = std::move(other.request); + response = std::move(other.response); + return *this; + } // // set job state (can only go from lower to upper state) // inline void set_state(const EnumJobsState &new_state) { - // for (;;) { - // EnumJobsState current_state = state.load(); - // if (current_state >= new_state) { - // return; - // } - // if (state.compare_exchange_weak(current_state, new_state)) { - // return; - // } - // } + for (;;) { + EnumJobsState current_state = state.load(); + if (current_state >= new_state) { + return; + } + if (state.compare_exchange_weak(current_state, new_state)) { + return; + } + } } // @@ -82,15 +88,15 @@ namespace small::jobsimpl { // inline void set_progress(const int &new_progress) { - // for (;;) { - // int current_progress = progress.load(); - // if (current_progress >= new_progress) { - // return; - // } - // if (progress.compare_exchange_weak(current_progress, new_progress)) { - // return; - // } - // } + for (;;) { + int current_progress = progress.load(); + if (current_progress >= new_progress) { + return; + } + if (progress.compare_exchange_weak(current_progress, new_progress)) { + return; + } + } } }; diff --git a/include/impl/jobs_queue_impl.h b/include/impl/jobs_queue_impl.h index e2b009d..b3007ac 100644 --- a/include/impl/jobs_queue_impl.h +++ b/include/impl/jobs_queue_impl.h @@ -109,10 +109,10 @@ namespace small::jobsimpl { // inline std::size_t push_back(const JobsPrioT &priority, const JobsTypeT &jobs_type, const JobsRequestT &job_req, JobsID *jobs_id = nullptr) { - return push_back(priority, {.type = jobs_type, .request = job_req}, jobs_id); + return push_back(priority, std::make_shared(jobs_type, job_req), jobs_id); } - inline std::size_t push_back(const JobsPrioT &priority, const JobsItem &jobs_item, JobsID *jobs_id = nullptr) + inline std::size_t push_back(const JobsPrioT &priority, std::shared_ptr jobs_item, JobsID *jobs_id = nullptr) { if (is_exit()) { return 0; @@ -123,10 +123,10 @@ namespace small::jobsimpl { *jobs_id = id; } - return jobs_activate(priority, jobs_item.type, id); + return jobs_activate(priority, jobs_item->type, id); } - inline std::size_t push_back(const JobsPrioT &priority, const std::vector &jobs_items, std::vector *jobs_ids) + inline std::size_t push_back(const JobsPrioT &priority, const std::vector> &jobs_items, std::vector *jobs_ids) { if (is_exit()) { return 0; @@ -155,47 +155,7 @@ namespace small::jobsimpl { // push_back move semantics inline std::size_t push_back(const JobsPrioT &priority, const JobsTypeT &jobs_type, JobsRequestT &&jobs_req, JobsID *jobs_id = nullptr) { - return push_back(priority, {.type = jobs_type, .request = std::forward(jobs_req)}, jobs_id); - } - - inline std::size_t push_back(const JobsPrioT &priority, JobsItem &&jobs_item, JobsID *jobs_id = nullptr) - { - if (is_exit()) { - return 0; - } - auto jobs_type = jobs_item.type; // save the type because the object will be moved - auto id = jobs_add(std::forward(jobs_item)); - if (jobs_id) { - *jobs_id = id; - } - - return jobs_activate(priority, jobs_type, id); - } - - inline std::size_t push_back(const JobsPrioT &priority, std::vector &&jobs_items, std::vector *jobs_ids) - { - if (is_exit()) { - return 0; - } - - std::unique_lock l(m_lock); - - std::size_t count = 0; - if (jobs_ids) { - jobs_ids->reserve(jobs_items.size()); - jobs_ids->clear(); - } - JobsID jobs_id{}; - for (auto &&jobs_item : jobs_items) { - auto ret = push_back(priority, std::forward(jobs_item), &jobs_id); - if (ret) { - if (jobs_ids) { - jobs_ids->push_back(jobs_id); - } - } - count += ret; - } - return count; + return push_back(priority, std::make_shared(jobs_type, std::forward(jobs_req)), jobs_id); } // no emplace_back do to returning the jobs_id @@ -206,64 +166,43 @@ namespace small::jobsimpl { template inline std::size_t push_back_delay_for(const std::chrono::duration<_Rep, _Period> &__rtime, const JobsPrioT &priority, const JobsTypeT &jobs_type, const JobsRequestT &jobs_req, JobsID *jobs_id = nullptr) { - return push_back_delay_for(__rtime, priority, {.type = jobs_type, .request = jobs_req}, jobs_id); + return push_back_delay_for(__rtime, priority, std::make_shared(jobs_type, jobs_req), jobs_id); } template - inline std::size_t push_back_delay_for(const std::chrono::duration<_Rep, _Period> &__rtime, const JobsPrioT &priority, const JobsItem &jobs_item, JobsID *jobs_id = nullptr) + inline std::size_t push_back_delay_for(const std::chrono::duration<_Rep, _Period> &__rtime, const JobsPrioT &priority, std::shared_ptr jobs_item, JobsID *jobs_id = nullptr) { auto id = jobs_add(jobs_item); if (jobs_id) { *jobs_id = id; } - return m_delayed_items.queue().push_delay_for(__rtime, {priority, jobs_item.type, id}); + return m_delayed_items.queue().push_delay_for(__rtime, {priority, jobs_item->type, id}); } template inline std::size_t push_back_delay_for(const std::chrono::duration<_Rep, _Period> &__rtime, const JobsPrioT &priority, const JobsTypeT &jobs_type, JobsRequestT &&jobs_req, JobsID *jobs_id = nullptr) { - return push_back_delay_for(__rtime, priority, {.type = jobs_type, .request = std::forward(jobs_req)}, jobs_id); - } - - template - inline std::size_t push_back_delay_for(const std::chrono::duration<_Rep, _Period> &__rtime, const JobsPrioT &priority, JobsItem &&jobs_item, JobsID *jobs_id = nullptr) - { - auto jobs_type = jobs_item.type; // save the type because the object will be moved - auto id = jobs_add(std::forward(jobs_item)); - if (jobs_id) { - *jobs_id = id; - } - return m_delayed_items.queue().push_delay_for(__rtime, {priority, jobs_type, id}); + return push_back_delay_for(__rtime, priority, std::make_shared(jobs_type, std::forward(jobs_req)), jobs_id); } // avoid time_casting from one clock to another // template // inline std::size_t push_back_delay_until(const std::chrono::time_point &__atime, const JobsPrioT &priority, const JobsTypeT &jobs_type, const JobsRequestT &jobs_req, JobsID *jobs_id = nullptr) { - return push_back_delay_until(__atime, priority, {.type = jobs_type, .request = jobs_req}, jobs_id); + return push_back_delay_until(__atime, priority, std::make_shared(jobs_type, jobs_req), jobs_id); } - inline std::size_t push_back_delay_until(const std::chrono::time_point &__atime, const JobsPrioT &priority, const JobsItem &jobs_item, JobsID *jobs_id = nullptr) + inline std::size_t push_back_delay_until(const std::chrono::time_point &__atime, const JobsPrioT &priority, std::shared_ptr jobs_item, JobsID *jobs_id = nullptr) { auto id = jobs_add(jobs_item); if (jobs_id) { *jobs_id = id; } - return m_delayed_items.queue().push_delay_until(__atime, {priority, jobs_item.type, id}); + return m_delayed_items.queue().push_delay_until(__atime, {priority, jobs_item->type, id}); } inline std::size_t push_back_delay_until(const std::chrono::time_point &__atime, const JobsPrioT &priority, const JobsTypeT &jobs_type, JobsRequestT &&jobs_req, JobsID *jobs_id = nullptr) { - return push_back_delay_until(__atime, priority, {.type = jobs_type, .request = std::forward(jobs_req)}, jobs_id); - } - - inline std::size_t push_back_delay_until(const std::chrono::time_point &__atime, const JobsPrioT &priority, JobsItem &&jobs_item, JobsID *jobs_id = nullptr) - { - auto jobs_type = jobs_item.type; // save the type because the object will be moved - auto id = jobs_add(std::forward(jobs_item)); - if (jobs_id) { - *jobs_id = id; - } - return m_delayed_items.queue().push_delay_until(__atime, {priority, jobs_type, id}); + return push_back_delay_until(__atime, priority, std::make_shared(jobs_type, std::forward(jobs_req)), jobs_id); } // clang-format off @@ -356,9 +295,9 @@ namespace small::jobsimpl { return it != m_groups_queues.end() ? &it->second : nullptr; } - inline std::vector jobs_get(const std::vector &jobs_ids) + inline std::vector> jobs_get(const std::vector &jobs_ids) { - std::vector jobs_items; + std::vector> jobs_items; jobs_items.reserve(jobs_ids.size()); std::unique_lock l(m_lock); @@ -368,10 +307,10 @@ namespace small::jobsimpl { if (it_j == m_jobs.end()) { continue; } - jobs_items.push_back(&it_j->second); + jobs_items.push_back(it_j->second); } - return jobs_items; + return jobs_items; // will be moved } inline void jobs_del(const JobsID &jobs_id) @@ -391,28 +330,15 @@ namespace small::jobsimpl { // // add job items // - inline JobsID jobs_add(const JobsItem &jobs_item) + inline JobsID jobs_add(std::shared_ptr jobs_item) { std::unique_lock l(m_lock); ++m_jobs_seq_id; JobsID id = m_jobs_seq_id; - m_jobs[id] = jobs_item; - m_jobs[id].id = id; - - return id; - } - - inline JobsID jobs_add(JobsItem &&jobs_item) - { - std::unique_lock l(m_lock); - - ++m_jobs_seq_id; - - JobsID id = m_jobs_seq_id; - jobs_item.id = id; - m_jobs.emplace(id, std::forward(jobs_item)); + jobs_item->id = id; + m_jobs.emplace(id, jobs_item); return id; } @@ -456,11 +382,11 @@ namespace small::jobsimpl { // // members // - mutable small::base_lock m_lock; // global locker - std::atomic m_jobs_seq_id{}; // to get the next jobs id - std::unordered_map m_jobs; // current jobs - std::unordered_map m_groups_queues; // map of queues by group - std::unordered_map m_types_queues; // optimize to have queues by type (which reference queues by group) + mutable small::base_lock m_lock; // global locker + std::atomic m_jobs_seq_id{}; // to get the next jobs id + std::unordered_map> m_jobs; // current jobs + std::unordered_map m_groups_queues; // map of queues by group + std::unordered_map m_types_queues; // optimize to have queues by type (which reference queues by group) JobQueueDelayedT m_delayed_items{*this}; // queue of delayed items diff --git a/include/jobs_config.h b/include/jobs_config.h index 666378e..81d95dc 100644 --- a/include/jobs_config.h +++ b/include/jobs_config.h @@ -17,7 +17,7 @@ namespace small { struct jobs_config { using JobsItem = typename small::jobsimpl::jobs_item; - using ProcessingFunction = std::function &)>; + using ProcessingFunction = std::function> &)>; // config for the entire jobs engine struct ConfigJobsEngine diff --git a/include/jobs_engine.h b/include/jobs_engine.h index f67676e..06084fa 100644 --- a/include/jobs_engine.h +++ b/include/jobs_engine.h @@ -294,10 +294,10 @@ namespace small { *has_items = true; // get jobs - std::vector jobs_items = m_queue.jobs_get(vec_ids); + std::vector> jobs_items = m_queue.jobs_get(vec_ids); // split by type - std::unordered_map> elems_by_type; + std::unordered_map>> elems_by_type; for (auto &jobs_item : jobs_items) { elems_by_type[jobs_item->type].reserve(jobs_items.size()); elems_by_type[jobs_item->type].push_back(jobs_item);