From 5a30fedc0b55ad48b6eb55efb3be37b543e29768 Mon Sep 17 00:00:00 2001 From: Cristian Herghelegiu <56673580+herrcristi@users.noreply.github.com> Date: Wed, 25 Dec 2024 15:39:11 +0200 Subject: [PATCH] Rename to group_queue to have a more generic purpose (#23) --- README.md | 73 ++++++ examples/examples_group_queue.h | 51 ++++ examples/examples_jobs_engine.h | 43 +--- include/{jobs_queue.h => group_queue.h} | 228 +++++++++--------- include/jobs_engine.h | 16 +- include/prio_queue.h | 28 ++- include/util_time.h | 2 +- main.cpp | 3 +- ...st_jobs_queue.cpp => test_group_queue.cpp} | 76 +++--- tests/test_prio_queue.cpp | 39 +++ 10 files changed, 349 insertions(+), 210 deletions(-) create mode 100644 examples/examples_group_queue.h rename include/{jobs_queue.h => group_queue.h} (61%) rename tests/{test_jobs_queue.cpp => test_group_queue.cpp} (80%) diff --git a/README.md b/README.md index cfbedfe..6263fe5 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,7 @@ This can be used in following ways: - lock_queue (thread safe queue with waiting mechanism to be used in concurrent environment) - time_queue (thread safe queue for delay requests) - prio_queue (thread safe queue for requests with priority like high, normal, low, etc) +- group_queue (thread safe queue to group items that have a type, priority and info, more types will be grouped to use same queue) - worker_thread (creates workers on separate threads that do task when requested, based on lock_queue and time_queue) @@ -272,6 +273,78 @@ q.signal_exit_force(); // q.signal_exit_when_done(); # +### group_queue + +A queue for grouping items that have a type, priority and info (where more types will be grouped to use same queue) + +Works with any user priorities and by default small::EnumPriorities are used (kHighest, kHigh, kNormal, kLow, kLowest) + +The following functions are available + +For container + +`size, empty, clear, reset` + +Add type elements with priority into queue (the queue must be setup initially to associate the group for a type) + +`add_type_group` + +`push_back, emplace_back` + +For events or locking + +`lock, unlock, try_lock` + +Wait for items in the group queue + +`wait_pop_front, wait_pop_front_for, wait_pop_front_until` + +Wait for queue to become empty + +`wait`, `wait_for`, `wait_until` + +Signal exit when we no longer want to use the queue + +`signal_exit_force, is_exit_force` // exit immediatly ignoring what is left in the queue + +`signal_exit_when_done, is_exit_when_done` // exit when queue is empty, after this flag is set no more items can be pushed in the queue + +Use it like this + +``` +enum Type { + kType1 +}; +enum GroupType { + kGroup1 +}; + +small::group_queue q; +q.add_type_group( Type::kType1, GroupType::kGroup1 ); // set the group for the type +... +q.push_back( small::EnumPriorities::kNormal, Type::kType1, 1 ); +... + +// on some thread +std::pair e{}; +auto ret = q.wait_pop_front( GroupType::kGroup1, &e ); +// or wait_pop_front_for( std::chrono::minutes( 1 ), GroupType::kGroup1, &e ); +// ret can be small::EnumLock::kExit, small::EnumLock::kTimeout or ret == small::EnumLock::kElement +if ( ret == small::EnumLock::kElement ) +{ + // do something with e + ... +} + +... +// on main thread no more processing (aborting work) +q.signal_exit_force(); // q.signal_exit_when_done() +... +// make sure that all calls to wait_* are finished before calling destructor (like it is done in worker_thread) +``` + +# + ### worker_thread A class that creates several threads for producer/consumer diff --git a/examples/examples_group_queue.h b/examples/examples_group_queue.h new file mode 100644 index 0000000..c3e88d2 --- /dev/null +++ b/examples/examples_group_queue.h @@ -0,0 +1,51 @@ +#include +#include +#include +#include +#include +#include +#include + +#include "../include/group_queue.h" + +namespace examples::group_queue { + // + // example 1 + // + int Example1() + { + std::cout << "Group Queue example 1\n"; + + enum class Type + { + kType1 + }; + enum class GroupType + { + kGroup1 + }; + + small::group_queue q; + q.add_type_group(Type::kType1, GroupType::kGroup1); + + q.push_back(small::EnumPriorities::kNormal, Type::kType1, 1); + + // on some thread + std::pair e{}; + auto ret = q.wait_pop_front(GroupType::kGroup1, &e); + // or wait_pop_front_for( std::chrono::minutes( 1 ), GroupType:kGroup1, &e ); + // ret can be small::EnumLock::kExit, small::EnumLock::kTimeout or ret == small::EnumLock::kElement + if (ret == small::EnumLock::kElement) { + // do something with e + std::cout << "elem from group q has type " << (int)e.first << ", " << e.second << "\n"; + } + + // on main thread no more processing (aborting work) + q.signal_exit_force(); // q.signal_exit_when_done() + + std::cout << "Groups Queue example 1 finish\n\n"; + + return 0; + } + +} // namespace examples::group_queue \ No newline at end of file diff --git a/examples/examples_jobs_engine.h b/examples/examples_jobs_engine.h index 0872cd6..c4e2126 100644 --- a/examples/examples_jobs_engine.h +++ b/examples/examples_jobs_engine.h @@ -14,46 +14,7 @@ namespace examples::jobs_engine { // int Example1() { - std::cout << "Jobs Queue example 1\n"; - - enum JobType - { - kJob1 - }; - enum JobGroupType - { - kJobGroup1 - }; - - small::jobs_queue q; - q.set_job_type_group(JobType::kJob1, JobGroupType::kJobGroup1); - - q.push_back(small::EnumPriorities::kNormal, JobType::kJob1, 1); - - // on some thread - std::pair e{}; - auto ret = q.wait_pop_front(JobGroupType ::kJobGroup1, &e); - // or wait_pop_front_for( std::chrono::minutes( 1 ), JobGroupType:kJobGroup1, &e ); - // ret can be small::EnumLock::kExit, small::EnumLock::kTimeout or ret == small::EnumLock::kElement - if (ret == small::EnumLock::kElement) { - // do something with e - std::cout << "elem from jobs q " << e.first << ", " << e.second << "\n"; - } - - // on main thread no more processing (aborting work) - q.signal_exit_force(); // q.signal_exit_when_done() - - std::cout << "Jobs Queue example 1 finish\n\n"; - - return 0; - } - - // - // example 2 - // - int Example2() - { - std::cout << "Jobs Engine example 2\n"; + std::cout << "Jobs Engine example 1\n"; using qc = std::pair; @@ -116,7 +77,7 @@ namespace examples::jobs_engine { std::cout << "wait for with timeout, ret = " << static_cast(ret) << " as timeout\n"; jobs.wait(); // wait here for jobs to finish due to exit flag - std::cout << "Jobs Engine example 2 finish\n\n"; + std::cout << "Jobs Engine example 1 finish\n\n"; return 0; } diff --git a/include/jobs_queue.h b/include/group_queue.h similarity index 61% rename from include/jobs_queue.h rename to include/group_queue.h index 813cb90..07e1886 100644 --- a/include/jobs_queue.h +++ b/include/group_queue.h @@ -7,25 +7,25 @@ #include "base_lock.h" #include "prio_queue.h" -// a queue for jobs which have a job type and a job info +// a queue for grouping items which have a type, info and a priority // -// enum JobType { -// kJob1 +// enum Type { +// kType1 // }; -// enum JobGroupType { -// kJobGroup1 +// enum GroupType { +// kGroup1 // }; // -// small::jobs_queue q; -// q.set_job_type_group( JobType::kJob1, JobGroupType::kJobGroup1 ); +// small::group_queue q; +// q.add_type_group( Type::kType1, GroupType::kGroup1 ); // ... -// q.push_back( small::EnumPriorities::kNormal, JobType::kJob1, 1 ); +// q.push_back( small::EnumPriorities::kNormal, Type::kType1, 1 ); // ... // // // on some thread -// std::pair e{}; -// auto ret = q.wait_pop_front( JobGroupType::kJobGroup1, &e ); -// // or wait_pop_front_for( std::chrono::minutes( 1 ), JobGroupType::kJobGroup1, &e ); +// std::pair e{}; +// auto ret = q.wait_pop_front( GroupType::kGroup1, &e ); +// // or wait_pop_front_for( std::chrono::minutes( 1 ), GroupType::kGroup1, &e ); // // ret can be small::EnumLock::kExit, small::EnumLock::kTimeout or ret == small::EnumLock::kElement // if ( ret == small::EnumLock::kElement ) // { @@ -40,61 +40,61 @@ namespace small { // - // queue for jobs who have a job type, priority and a job info elem - // the job types can be grouped by group + // queue for grouping items that have a type, priority and info elem + // the types are grouped by group // - template - class jobs_queue + template + class group_queue { - using JobTypeQueue = small::prio_queue, PrioT>; + using TypeQueue = small::prio_queue, PrioT>; public: // - // jobs_queue + // group_queue // - explicit jobs_queue(const small::config_prio_queue &config = {}) + explicit group_queue(const small::config_prio_queue &config = {}) : m_prio_config{config} { } - jobs_queue(const jobs_queue &o) : jobs_queue() { operator=(o); }; - jobs_queue(jobs_queue &&o) noexcept : jobs_queue() { operator=(std::move(o)); }; + group_queue(const group_queue &o) : group_queue() { operator=(o); }; + group_queue(group_queue &&o) noexcept : group_queue() { operator=(std::move(o)); }; - inline jobs_queue &operator=(const jobs_queue &o) + inline group_queue &operator=(const group_queue &o) { std::scoped_lock l(m_lock, o.m_lock); - m_total_count = o.m_total_count; - m_jobs_types_groups = o.m_jobs_types_groups; - m_jobs_group_queues = o.m_jobs_group_queues; - // rebuild m_jobs_types_queues - m_jobs_types_queues.clear(); - for (auto &[job_type, job_group] : m_jobs_types_groups) { - m_jobs_types_queues[job_type] = &m_jobs_group_queues[job_group]; + m_total_count = o.m_total_count; + m_types_groups = o.m_types_groups; + m_group_queues = o.m__group_queues; + // rebuild m_types_queues + m_types_queues.clear(); + for (auto &[type, group] : m_types_groups) { + m_types_queues[type] = &m_group_queues[group]; } return *this; } - inline jobs_queue &operator=(jobs_queue &&o) noexcept + inline group_queue &operator=(group_queue &&o) noexcept { std::scoped_lock l(m_lock, o.m_lock); - m_total_count = o.m_total_count; - m_jobs_types_groups = std::move(o.m_jobs_types_groups); - m_jobs_group_queues = std::move(o.m_jobs_group_queues); - m_jobs_types_queues = std::move(o.m_jobs_types_queues); + m_total_count = o.m_total_count; + m_types_groups = std::move(o.m_types_groups); + m_group_queues = std::move(o.m_group_queues); + m_types_queues = std::move(o.m_types_queues); return *this; } // - // config, map the job_type to a group + // config, map the type to a group // - inline void set_job_type_group(const JobTypeT job_type, const JobGroupT job_group) + inline void add_type_group(const TypeT type, const GroupT group) { - // m_jobs_group_queues will be initialized only here so later it can be accessed even without locking - m_jobs_types_groups[job_type] = job_group; + // m_group_queues will be initialized only here so later it can be accessed even without locking + m_types_groups[type] = group; - auto it_group = m_jobs_group_queues.find(job_group); - if (it_group == m_jobs_group_queues.end()) { - m_jobs_group_queues[job_group] = JobTypeQueue{m_prio_config}; + auto it_group = m_group_queues.find(group); + if (it_group == m_group_queues.end()) { + m_group_queues[group] = TypeQueue{m_prio_config}; } - m_jobs_types_queues[job_type] = &m_jobs_group_queues[job_group]; + m_types_queues[type] = &m_group_queues[group]; } // @@ -107,30 +107,30 @@ namespace small { inline bool empty() { return size() == 0; } - inline size_t size(const JobGroupT job_group) + inline size_t size(const GroupT group) { - auto it = m_jobs_group_queues.find(job_group); - return it != m_jobs_group_queues.end() ? it->second.size() : 0; + auto it = m_group_queues.find(group); + return it != m_group_queues.end() ? it->second.size() : 0; } - inline bool empty(const JobGroupT job_group) { return size(job_group) == 0; } + inline bool empty(const GroupT group) { return size(group) == 0; } // // removes elements // inline void clear() { - for (auto &[job_group, q] : m_jobs_group_queues) { + for (auto &[group, q] : m_group_queues) { std::unique_lock l(q); m_total_count.fetch_sub(q.size()); q.clear(); } } - inline void clear(const JobGroupT job_group) + inline void clear(const GroupT group) { - auto it = m_jobs_group_queues.find(job_group); - if (it != m_jobs_group_queues.end()) { + auto it = m_group_queues.find(group); + if (it != m_group_queues.end()) { std::unique_lock l(it->second); m_total_count.fetch_sub(it->second.size()); it->second.clear(); @@ -147,36 +147,36 @@ namespace small { // // push_back // - inline std::size_t push_back(const PrioT priority, const JobTypeT job_type, const JobElemT &elem) + inline std::size_t push_back(const PrioT priority, const TypeT type, const ElemT &elem) { if (is_exit()) { return 0; } // get queue - auto q = get_job_type_group_queue(job_type); + auto q = get_type_group_queue(type); if (!q) { return 0; } ++m_total_count; // increase before adding - auto ret = q->push_back(priority, {job_type, elem}); + auto ret = q->push_back(priority, {type, elem}); if (!ret) { --m_total_count; } return ret; } - inline std::size_t push_back(const PrioT priority, const std::pair &pair_elem) + inline std::size_t push_back(const PrioT priority, const std::pair &pair_elem) { if (is_exit()) { return 0; } - auto job_type = pair_elem.first; + auto type = pair_elem.first; // get queue - auto q = get_job_type_group_queue(job_type); + auto q = get_type_group_queue(type); if (!q) { return 0; } @@ -189,14 +189,14 @@ namespace small { return ret; } - inline std::size_t push_back(const PrioT priority, const JobTypeT job_type, const std::vector &elems) + inline std::size_t push_back(const PrioT priority, const TypeT type, const std::vector &elems) { if (is_exit()) { return 0; } // get queue - auto q = get_job_type_group_queue(job_type); + auto q = get_type_group_queue(type); if (!q) { return 0; } @@ -204,7 +204,7 @@ namespace small { std::size_t count = 0; for (auto &elem : elems) { ++m_total_count; // increase before adding - auto ret = q->push_back(priority, {job_type, elem}); + auto ret = q->push_back(priority, {type, elem}); if (!ret) { --m_total_count; } @@ -214,56 +214,56 @@ namespace small { } // push_back move semantics - inline std::size_t push_back(const PrioT priority, const JobTypeT job_type, JobElemT &&elem) + inline std::size_t push_back(const PrioT priority, const TypeT type, ElemT &&elem) { if (is_exit()) { return 0; } // get queue - auto q = get_job_type_group_queue(job_type); + auto q = get_type_group_queue(type); if (!q) { return 0; } ++m_total_count; // increase before adding - auto ret = q->push_back(priority, {job_type, std::forward(elem)}); + auto ret = q->push_back(priority, {type, std::forward(elem)}); if (!ret) { --m_total_count; } return ret; } - inline std::size_t push_back(const PrioT priority, std::pair &&pair_elem) + inline std::size_t push_back(const PrioT priority, std::pair &&pair_elem) { if (is_exit()) { return 0; } - auto job_type = pair_elem.first; + auto type = pair_elem.first; // get queue - auto q = get_job_type_group_queue(job_type); + auto q = get_type_group_queue(type); if (!q) { return 0; } ++m_total_count; // increase before adding - auto ret = q->push_back(priority, std::forward>(pair_elem)); + auto ret = q->push_back(priority, std::forward>(pair_elem)); if (!ret) { --m_total_count; } return ret; } - inline std::size_t push_back(const PrioT priority, const JobTypeT job_type, std::vector &&elems) + inline std::size_t push_back(const PrioT priority, const TypeT type, std::vector &&elems) { if (is_exit()) { return 0; } // get queue - auto q = get_job_type_group_queue(job_type); + auto q = get_type_group_queue(type); if (!q) { return 0; } @@ -271,7 +271,7 @@ namespace small { std::size_t count = 0; for (auto &elem : elems) { ++m_total_count; // increase before adding - auto ret = q->push_back(priority, {job_type, std::forward(elem)}); + auto ret = q->push_back(priority, {type, std::forward(elem)}); if (!ret) { --m_total_count; } @@ -282,20 +282,20 @@ namespace small { // emplace_back template - inline std::size_t emplace_back(const PrioT priority, const JobTypeT job_type, _Args &&...__args) + inline std::size_t emplace_back(const PrioT priority, const TypeT type, _Args &&...__args) { if (is_exit()) { return 0; } // get queue - auto q = get_job_type_group_queue(job_type); + auto q = get_type_group_queue(type); if (!q) { return 0; } ++m_total_count; // increase before adding - auto ret = q->emplace_back(priority, job_type, JobElemT{std::forward<_Args>(__args)...}); + auto ret = q->emplace_back(priority, type, ElemT{std::forward<_Args>(__args)...}); if (!ret) { --m_total_count; } @@ -309,7 +309,7 @@ namespace small { { std::unique_lock l(m_lock); m_lock.signal_exit_force(); - for (auto &[job_group, q] : m_jobs_group_queues) { + for (auto &[group, q] : m_group_queues) { q.signal_exit_force(); } } @@ -322,7 +322,7 @@ namespace small { { std::unique_lock l(m_lock); m_lock.signal_exit_when_done(); - for (auto &[job_group, q] : m_jobs_group_queues) { + for (auto &[group, q] : m_group_queues) { q.signal_exit_when_done(); } } @@ -339,12 +339,12 @@ namespace small { // // wait pop_front and return that element // - inline EnumLock wait_pop_front(const JobGroupT job_group, - std::pair *elem) + inline EnumLock wait_pop_front(const GroupT group, + std::pair *elem) { // get queue - auto it_q = m_jobs_group_queues.find(job_group); - if (it_q == m_jobs_group_queues.end()) { + auto it_q = m_group_queues.find(group); + if (it_q == m_group_queues.end()) { return small::EnumLock::kTimeout; } @@ -360,13 +360,13 @@ namespace small { return ret; } - inline EnumLock wait_pop_front(const JobGroupT job_group, - std::vector> &vec_elems, - int max_count = 1) + inline EnumLock wait_pop_front(const GroupT group, + std::vector> &vec_elems, + int max_count = 1) { // get queue - auto it_q = m_jobs_group_queues.find(job_group); - if (it_q == m_jobs_group_queues.end()) { + auto it_q = m_group_queues.find(group); + if (it_q == m_group_queues.end()) { return small::EnumLock::kTimeout; } @@ -389,12 +389,12 @@ namespace small { // template inline EnumLock wait_pop_front_for(const std::chrono::duration<_Rep, _Period> &__rtime, - const JobGroupT job_group, - std::pair *elem) + const GroupT group, + std::pair *elem) { // get queue - auto it_q = m_jobs_group_queues.find(job_group); - if (it_q == m_jobs_group_queues.end()) { + auto it_q = m_group_queues.find(group); + if (it_q == m_group_queues.end()) { return small::EnumLock::kTimeout; } @@ -412,13 +412,13 @@ namespace small { template inline EnumLock wait_pop_front_for(const std::chrono::duration<_Rep, _Period> &__rtime, - const JobGroupT job_group, - std::vector> &vec_elems, + const GroupT group, + std::vector> &vec_elems, int max_count = 1) { // get queue - auto it_q = m_jobs_group_queues.find(job_group); - if (it_q == m_jobs_group_queues.end()) { + auto it_q = m_group_queues.find(group); + if (it_q == m_group_queues.end()) { return small::EnumLock::kTimeout; } @@ -441,12 +441,12 @@ namespace small { // template inline EnumLock wait_pop_front_until(const std::chrono::time_point<_Clock, _Duration> &__atime, - const JobGroupT job_group, - std::pair *elem) + const GroupT group, + std::pair *elem) { // get queue - auto it_q = m_jobs_group_queues.find(job_group); - if (it_q == m_jobs_group_queues.end()) { + auto it_q = m_group_queues.find(group); + if (it_q == m_group_queues.end()) { return small::EnumLock::kTimeout; } @@ -464,13 +464,13 @@ namespace small { template inline EnumLock wait_pop_front_until(const std::chrono::time_point<_Clock, _Duration> &__atime, - const JobGroupT job_group, - std::vector> &vec_elems, + const GroupT group, + std::vector> &vec_elems, int max_count = 1) { // get queue - auto it_q = m_jobs_group_queues.find(job_group); - if (it_q == m_jobs_group_queues.end()) { + auto it_q = m_group_queues.find(group); + if (it_q == m_group_queues.end()) { return small::EnumLock::kTimeout; } @@ -495,7 +495,7 @@ namespace small { { signal_exit_when_done(); - for (auto &[job_group, q] : m_jobs_group_queues) { + for (auto &[group, q] : m_group_queues) { std::unique_lock l(q); m_queues_exit_condition.wait(l, [_q = &q]() -> bool { return _q->empty() || _q->is_exit_force(); @@ -521,7 +521,7 @@ namespace small { { signal_exit_when_done(); - for (auto &[job_group, q] : m_jobs_group_queues) { + for (auto &[group, q] : m_group_queues) { std::unique_lock l(q); auto status = m_queues_exit_condition.wait_until(l, __atime, [_q = &q]() -> bool { @@ -537,15 +537,15 @@ namespace small { private: // - // get job type queue from the group queues + // get type queue from the group queues // - inline JobTypeQueue *get_job_type_group_queue(const JobTypeT job_type) + inline TypeQueue *get_type_group_queue(const TypeT type) { // optimization to get the queue from the type - // instead of getting the group from type from m_jobs_types_groups - // and then getting the queue from the m_jobs_group_queues - auto it_q = m_jobs_types_queues.find(job_type); - if (it_q == m_jobs_types_queues.end()) { + // instead of getting the group from type from m_types_groups + // and then getting the queue from the m_group_queues + auto it_q = m_types_queues.find(type); + if (it_q == m_types_queues.end()) { return nullptr; } @@ -555,7 +555,7 @@ namespace small { // // notify condition if q is empty // - inline void notify_if_empty(JobTypeQueue &q) + inline void notify_if_empty(TypeQueue &q) { std::unique_lock l(q); if (q.empty() || q.is_exit_force()) { @@ -567,12 +567,12 @@ namespace small { // // members // - mutable small::base_lock m_lock; // global locker - std::atomic m_total_count{}; // count of all jobs items - std::unordered_map m_jobs_types_groups; // map to get the group for a job type - small::config_prio_queue m_prio_config; // config for the priority queue - std::unordered_map m_jobs_group_queues; // map of queues grouped by group - std::unordered_map m_jobs_types_queues; // optimize from group to type map of queues - std::condition_variable_any m_queues_exit_condition; // condition to wait for queues to be empty when signal_exit_when_done + mutable small::base_lock m_lock; // global locker + std::atomic m_total_count{}; // count of all items + std::unordered_map m_types_groups; // map to get the group for a type + small::config_prio_queue m_prio_config; // config for the priority queue + std::unordered_map m_group_queues; // map of queues grouped by group + std::unordered_map m_types_queues; // optimize from group to type map of queues + std::condition_variable_any m_queues_exit_condition; // condition to wait for queues to be empty when signal_exit_when_done }; } // namespace small diff --git a/include/jobs_engine.h b/include/jobs_engine.h index b0ee2c8..122745e 100644 --- a/include/jobs_engine.h +++ b/include/jobs_engine.h @@ -2,8 +2,8 @@ #include +#include "group_queue.h" #include "jobs_engine_scheduler.h" -#include "jobs_queue.h" // // example // using qc = std::pair; @@ -198,7 +198,7 @@ namespace small { m_config.m_jobs_types[job_type] = { .m_config = m_config.m_default_job_type, .m_processing_function = m_config.m_default_processing_function}; - m_jobs_queues.set_job_type_group(job_type, m_config.m_default_job_type.group); + m_jobs_queues.add_type_group(job_type, m_config.m_default_job_type.group); } inline void add_job_type(const JobTypeT job_type, const config_job_type &config) @@ -207,7 +207,7 @@ namespace small { m_config.m_jobs_types[job_type] = { .m_config = config, .m_processing_function = m_config.m_processing_function}; - m_jobs_queues.set_job_type_group(job_type, config.group); + m_jobs_queues.add_type_group(job_type, config.group); } template @@ -217,7 +217,7 @@ namespace small { m_config.m_jobs_types[job_type] = { .m_config = config, .m_processing_function = std::bind(std::forward<_Callable>(function), std::ref(*this), std::placeholders::_1 /*job_type*/, std::placeholders::_2 /*items*/, std::forward(extra_parameters)...)}; - m_jobs_queues.set_job_type_group(job_type, config.group); + m_jobs_queues.add_type_group(job_type, config.group); } // @@ -454,9 +454,9 @@ namespace small { std::unordered_map m_jobs_types; // config by job type }; - JobEngineConfig m_config; // configs for all: engine, groups, job types - small::jobs_queue m_jobs_queues; // curent jobs queues (with grouping and priority) for job types - JobQueueDelayedT m_delayed_items{*this}; // queue of delayed items - small::jobs_engine_scheduler m_scheduler{*this}; // scheduler for processing items (by group) using a pool of threads + JobEngineConfig m_config; // configs for all: engine, groups, job types + small::group_queue m_jobs_queues; // curent jobs queues (with grouping and priority) for job types + JobQueueDelayedT m_delayed_items{*this}; // queue of delayed items + small::jobs_engine_scheduler m_scheduler{*this}; // scheduler for processing items (by group) using a pool of threads }; } // namespace small diff --git a/include/prio_queue.h b/include/prio_queue.h index 8828974..10d0fff 100644 --- a/include/prio_queue.h +++ b/include/prio_queue.h @@ -44,6 +44,12 @@ namespace small { kLowest }; + // for the case when priorities are not used (this will reduce this class to a simple lock queue) + enum class EnumIgnorePriorities : unsigned int + { + kNoPriority = 0, + }; + template struct config_prio_queue { @@ -61,7 +67,15 @@ namespace small { {small::EnumPriorities::kHigh, 3}, {small::EnumPriorities::kNormal, 3}, {small::EnumPriorities::kLow, 3}, - {small::EnumPriorities::kLowest, 0}}; + {small::EnumPriorities::kLowest, 1}}; + }; + + // setup default for EnumPriorities + template <> + struct config_prio_queue + { + std::vector> priorities{ + {small::EnumIgnorePriorities::kNoPriority, 1}}; }; // @@ -402,11 +416,11 @@ namespace small { // iterate priorities from high to low for (auto &[prio, ratio] : m_config.priorities) { - auto &queue = m_prio_queues[prio]; + auto *queue = &m_prio_queues[prio]; auto &stats = m_prio_stats[prio]; // save the first priority for which the queue is not empty - if (!queue.empty() && !prio_with_non_empty_queue) { + if (!queue->empty() && !prio_with_non_empty_queue) { prio_with_non_empty_queue = prio; } @@ -421,22 +435,22 @@ namespace small { ++stats.m_count_executed; reset_higher_stats(prio); - if (queue.empty()) { + if (queue->empty()) { // choose one from previous if (prio_with_non_empty_queue) { - queue = m_prio_queues[prio_with_non_empty_queue.value()]; + queue = &m_prio_queues[prio_with_non_empty_queue.value()]; auto &prev_stats = m_prio_stats[prio_with_non_empty_queue.value()]; ++prev_stats.m_count_executed; } // all queues are empty so far so go to a lower prio - if (queue.empty()) { + if (queue->empty()) { continue; } } // get elem - return pop_front(queue, elem); + return pop_front(*queue, elem); } // reset all stats diff --git a/include/util_time.h b/include/util_time.h index ef66f00..9833224 100644 --- a/include/util_time.h +++ b/include/util_time.h @@ -54,7 +54,7 @@ namespace small { { auto tt = std::chrono::system_clock::to_time_t(time); std::tm tt_tm; // = *std::gmtime(&tt) is not thread safe -#ifdef _WIN32 || _WIN64 +#if defined(_WIN32) || defined(_WIN64) gmtime_s(&tt_tm, &tt); #else gmtime_r(&tt, &tt_tm); diff --git a/main.cpp b/main.cpp index fc6c7cc..d7634d5 100644 --- a/main.cpp +++ b/main.cpp @@ -7,6 +7,7 @@ #include "examples/examples_base64.h" #include "examples/examples_buffer.h" #include "examples/examples_event.h" +#include "examples/examples_group_queue.h" #include "examples/examples_hash.h" #include "examples/examples_jobs_engine.h" #include "examples/examples_lock_queue.h" @@ -29,13 +30,13 @@ int main() examples::lock_queue::Example1(); examples::time_queue::Example1(); examples::prio_queue::Example1(); + examples::group_queue::Example1(); examples::worker_thread::Example1(); examples::worker_thread::Example2(); examples::worker_thread::Example3_Perf(); examples::jobs_engine::Example1(); - examples::jobs_engine::Example2(); return 0; } \ No newline at end of file diff --git a/tests/test_jobs_queue.cpp b/tests/test_group_queue.cpp similarity index 80% rename from tests/test_jobs_queue.cpp rename to tests/test_group_queue.cpp index 8e36d53..6560185 100644 --- a/tests/test_jobs_queue.cpp +++ b/tests/test_group_queue.cpp @@ -3,7 +3,7 @@ #include #include -#include "../include/jobs_queue.h" +#include "../include/group_queue.h" #include "../include/util.h" namespace { @@ -35,13 +35,13 @@ namespace { // TEST_F(JobsQueueTest, Lock) { - small::jobs_queue q; + small::group_queue q; std::latch sync_thread{1}; std::latch sync_main{1}; // create thread - auto thread = std::jthread([](small::jobs_queue &_q, std::latch &sync_thread, std::latch &sync_main) { + auto thread = std::jthread([](small::group_queue &_q, std::latch &sync_thread, std::latch &sync_main) { std::unique_lock lock(_q); sync_thread.count_down(); // signal that thread is started (and also locked is acquired) sync_main.wait(); // wait that the main finished executing test to proceed further @@ -81,15 +81,15 @@ namespace { // TEST_F(JobsQueueTest, Queue_Operations) { - small::jobs_queue q( + small::group_queue q( {.priorities{{ {small::EnumPriorities::kHigh, 3}, {small::EnumPriorities::kNormal, 3}, {small::EnumPriorities::kLow, 3}, }}}); - q.set_job_type_group(JobType::kJob1, JobType::kJob1 /*as group*/); - q.set_job_type_group(JobType::kJob2, JobType::kJob1 /*as group*/); // same group - q.set_job_type_group(JobType::kJob3, JobType::kJob3 /*as group*/); + q.add_type_group(JobType::kJob1, JobType::kJob1 /*as group*/); + q.add_type_group(JobType::kJob2, JobType::kJob1 /*as group*/); // same group + q.add_type_group(JobType::kJob3, JobType::kJob3 /*as group*/); ASSERT_EQ(q.size(), 0); // push @@ -133,10 +133,10 @@ namespace { TEST_F(JobsQueueTest, Queue_Operations_Vec) { - small::jobs_queue q; - q.set_job_type_group(JobType::kJob1, JobType::kJob1 /*as group*/); - q.set_job_type_group(JobType::kJob2, JobType::kJob1 /*as group*/); // same group - q.set_job_type_group(JobType::kJob3, JobType::kJob3 /*as group*/); + small::group_queue q; + q.add_type_group(JobType::kJob1, JobType::kJob1 /*as group*/); + q.add_type_group(JobType::kJob2, JobType::kJob1 /*as group*/); // same group + q.add_type_group(JobType::kJob3, JobType::kJob3 /*as group*/); ASSERT_EQ(q.size(), 0); @@ -173,10 +173,10 @@ namespace { TEST_F(JobsQueueTest, Queue_Operations_Clear) { - small::jobs_queue q; - q.set_job_type_group(JobType::kJob1, JobType::kJob1 /*as group*/); - q.set_job_type_group(JobType::kJob2, JobType::kJob1 /*as group*/); // same group - q.set_job_type_group(JobType::kJob3, JobType::kJob3 /*as group*/); + small::group_queue q; + q.add_type_group(JobType::kJob1, JobType::kJob1 /*as group*/); + q.add_type_group(JobType::kJob2, JobType::kJob1 /*as group*/); // same group + q.add_type_group(JobType::kJob3, JobType::kJob3 /*as group*/); ASSERT_EQ(q.size(), 0); @@ -204,10 +204,10 @@ namespace { TEST_F(JobsQueueTest, Queue_Operations_Timeout) { - small::jobs_queue q; - q.set_job_type_group(JobType::kJob1, JobType::kJob1 /*as group*/); - q.set_job_type_group(JobType::kJob2, JobType::kJob1 /*as group*/); // same group - q.set_job_type_group(JobType::kJob3, JobType::kJob3 /*as group*/); + small::group_queue q; + q.add_type_group(JobType::kJob1, JobType::kJob1 /*as group*/); + q.add_type_group(JobType::kJob2, JobType::kJob1 /*as group*/); // same group + q.add_type_group(JobType::kJob3, JobType::kJob3 /*as group*/); ASSERT_EQ(q.size(), 0); @@ -246,10 +246,10 @@ namespace { TEST_F(JobsQueueTest, Queue_Operations_Timeout_Vec) { - small::jobs_queue q; - q.set_job_type_group(JobType::kJob1, JobType::kJob1 /*as group*/); - q.set_job_type_group(JobType::kJob2, JobType::kJob1 /*as group*/); // same group - q.set_job_type_group(JobType::kJob3, JobType::kJob3 /*as group*/); + small::group_queue q; + q.add_type_group(JobType::kJob1, JobType::kJob1 /*as group*/); + q.add_type_group(JobType::kJob2, JobType::kJob1 /*as group*/); // same group + q.add_type_group(JobType::kJob3, JobType::kJob3 /*as group*/); ASSERT_EQ(q.size(), 0); @@ -289,17 +289,17 @@ namespace { TEST_F(JobsQueueTest, Queue_Operations_Thread) { - small::jobs_queue q; - q.set_job_type_group(JobType::kJob1, JobType::kJob1 /*as group*/); - q.set_job_type_group(JobType::kJob2, JobType::kJob1 /*as group*/); // same group - q.set_job_type_group(JobType::kJob3, JobType::kJob3 /*as group*/); + small::group_queue q; + q.add_type_group(JobType::kJob1, JobType::kJob1 /*as group*/); + q.add_type_group(JobType::kJob2, JobType::kJob1 /*as group*/); // same group + q.add_type_group(JobType::kJob3, JobType::kJob3 /*as group*/); ASSERT_EQ(q.size(), 0); // push inside thread auto timeStart = small::timeNow(); - auto thread = std::jthread([](small::jobs_queue &_q) { + auto thread = std::jthread([](small::group_queue &_q) { small::sleep(300); int value{5}; @@ -322,10 +322,10 @@ namespace { TEST_F(JobsQueueTest, Queue_Operations_Signal_Exit_Force) { - small::jobs_queue q; - q.set_job_type_group(JobType::kJob1, JobType::kJob1 /*as group*/); - q.set_job_type_group(JobType::kJob2, JobType::kJob1 /*as group*/); // same group - q.set_job_type_group(JobType::kJob3, JobType::kJob3 /*as group*/); + small::group_queue q; + q.add_type_group(JobType::kJob1, JobType::kJob1 /*as group*/); + q.add_type_group(JobType::kJob2, JobType::kJob1 /*as group*/); // same group + q.add_type_group(JobType::kJob3, JobType::kJob3 /*as group*/); ASSERT_EQ(q.size(), 0); @@ -343,7 +343,7 @@ namespace { // create thread auto timeStart = small::timeNow(); - auto thread = std::jthread([](small::jobs_queue &_q) { + auto thread = std::jthread([](small::group_queue &_q) { // signal after some time small::sleep(300); _q.signal_exit_force(); @@ -366,10 +366,10 @@ namespace { TEST_F(JobsQueueTest, Queue_Operations_Signal_Exit_When_Done) { - small::jobs_queue q; - q.set_job_type_group(JobType::kJob1, JobType::kJob1 /*as group*/); - q.set_job_type_group(JobType::kJob2, JobType::kJob1 /*as group*/); // same group - q.set_job_type_group(JobType::kJob3, JobType::kJob3 /*as group*/); + small::group_queue q; + q.add_type_group(JobType::kJob1, JobType::kJob1 /*as group*/); + q.add_type_group(JobType::kJob2, JobType::kJob1 /*as group*/); // same group + q.add_type_group(JobType::kJob3, JobType::kJob3 /*as group*/); ASSERT_EQ(q.size(), 0); @@ -387,7 +387,7 @@ namespace { // create thread auto timeStart = small::timeNow(); - auto thread = std::jthread([](small::jobs_queue &_q) { + auto thread = std::jthread([](small::group_queue &_q) { // signal after some time small::sleep(300); _q.signal_exit_when_done(); diff --git a/tests/test_prio_queue.cpp b/tests/test_prio_queue.cpp index 229c219..d55e0fe 100644 --- a/tests/test_prio_queue.cpp +++ b/tests/test_prio_queue.cpp @@ -120,6 +120,45 @@ namespace { ASSERT_EQ(q1.size(), 0); } + // + // queue ignored priorities + // + TEST_F(PrioQueueTest, Queue_Operations_Ignore_Priorities) + { + small::prio_queue q; + ASSERT_EQ(q.size(), 0); + + // push + auto r_push = q.push_back(small::EnumIgnorePriorities::kNoPriority, 5); + ASSERT_EQ(r_push, 1); + + r_push = q.push_back(small::EnumIgnorePriorities::kNoPriority, 6); + ASSERT_EQ(r_push, 1); + + r_push = q.push_back({small::EnumIgnorePriorities::kNoPriority, 7}); // as a pair + ASSERT_EQ(r_push, 1); + ASSERT_EQ(q.size(), 3); + + // pop + int value{}; + auto ret = q.wait_pop_front(&value); + ASSERT_EQ(ret, small::EnumLock::kElement); + ASSERT_EQ(value, 5); + + value = {}; + ret = q.wait_pop_front(&value); + ASSERT_EQ(ret, small::EnumLock::kElement); + ASSERT_EQ(value, 6); + + value = {}; + ret = q.wait_pop_front(&value); + ASSERT_EQ(ret, small::EnumLock::kElement); + ASSERT_EQ(value, 7); + + // check size + ASSERT_EQ(q.size(), 0); + } + TEST_F(PrioQueueTest, Queue_Operations_Vec) { small::prio_queue q;