Skip to content

Commit

Permalink
Rename to group_queue to have a more generic purpose (#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
herrcristi authored Dec 25, 2024
1 parent 551b9a1 commit 5a30fed
Show file tree
Hide file tree
Showing 10 changed files with 349 additions and 210 deletions.
73 changes: 73 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ This can be used in following ways:
- lock_queue (thread safe queue with waiting mechanism to be used in concurrent environment)
- time_queue (thread safe queue for delay requests)
- prio_queue (thread safe queue for requests with priority like high, normal, low, etc)
- group_queue (thread safe queue to group items that have a type, priority and info, more types will be grouped to use same queue)

- worker_thread (creates workers on separate threads that do task when requested, based on lock_queue and time_queue)

Expand Down Expand Up @@ -272,6 +273,78 @@ q.signal_exit_force(); // q.signal_exit_when_done();

#

### group_queue

A queue for grouping items that have a type, priority and info (where more types will be grouped to use same queue)

Works with any user priorities and by default small::EnumPriorities are used (kHighest, kHigh, kNormal, kLow, kLowest)

The following functions are available

For container

`size, empty, clear, reset`

Add type elements with priority into queue (the queue must be setup initially to associate the group for a type)

`add_type_group`

`push_back, emplace_back`

For events or locking

`lock, unlock, try_lock`

Wait for items in the group queue

`wait_pop_front, wait_pop_front_for, wait_pop_front_until`

Wait for queue to become empty

`wait`, `wait_for`, `wait_until`

Signal exit when we no longer want to use the queue

`signal_exit_force, is_exit_force` // exit immediatly ignoring what is left in the queue

`signal_exit_when_done, is_exit_when_done` // exit when queue is empty, after this flag is set no more items can be pushed in the queue

Use it like this

```
enum Type {
kType1
};
enum GroupType {
kGroup1
};
small::group_queue<Type, int, GroupType> q;
q.add_type_group( Type::kType1, GroupType::kGroup1 ); // set the group for the type
...
q.push_back( small::EnumPriorities::kNormal, Type::kType1, 1 );
...
// on some thread
std::pair<Type, int> e{};
auto ret = q.wait_pop_front( GroupType::kGroup1, &e );
// or wait_pop_front_for( std::chrono::minutes( 1 ), GroupType::kGroup1, &e );
// ret can be small::EnumLock::kExit, small::EnumLock::kTimeout or ret == small::EnumLock::kElement
if ( ret == small::EnumLock::kElement )
{
// do something with e
...
}
...
// on main thread no more processing (aborting work)
q.signal_exit_force(); // q.signal_exit_when_done()
...
// make sure that all calls to wait_* are finished before calling destructor (like it is done in worker_thread)
```

#

### worker_thread

A class that creates several threads for producer/consumer
Expand Down
51 changes: 51 additions & 0 deletions examples/examples_group_queue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <iostream>
#include <map>
#include <thread>

#include "../include/group_queue.h"

namespace examples::group_queue {
//
// example 1
//
int Example1()
{
std::cout << "Group Queue example 1\n";

enum class Type
{
kType1
};
enum class GroupType
{
kGroup1
};

small::group_queue<Type, int, GroupType> q;
q.add_type_group(Type::kType1, GroupType::kGroup1);

q.push_back(small::EnumPriorities::kNormal, Type::kType1, 1);

// on some thread
std::pair<Type, int> e{};
auto ret = q.wait_pop_front(GroupType::kGroup1, &e);
// or wait_pop_front_for( std::chrono::minutes( 1 ), GroupType:kGroup1, &e );
// ret can be small::EnumLock::kExit, small::EnumLock::kTimeout or ret == small::EnumLock::kElement
if (ret == small::EnumLock::kElement) {
// do something with e
std::cout << "elem from group q has type " << (int)e.first << ", " << e.second << "\n";
}

// on main thread no more processing (aborting work)
q.signal_exit_force(); // q.signal_exit_when_done()

std::cout << "Groups Queue example 1 finish\n\n";

return 0;
}

} // namespace examples::group_queue
43 changes: 2 additions & 41 deletions examples/examples_jobs_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,46 +14,7 @@ namespace examples::jobs_engine {
//
int Example1()
{
std::cout << "Jobs Queue example 1\n";

enum JobType
{
kJob1
};
enum JobGroupType
{
kJobGroup1
};

small::jobs_queue<JobType, int, JobGroupType> q;
q.set_job_type_group(JobType::kJob1, JobGroupType::kJobGroup1);

q.push_back(small::EnumPriorities::kNormal, JobType::kJob1, 1);

// on some thread
std::pair<JobType, int> e{};
auto ret = q.wait_pop_front(JobGroupType ::kJobGroup1, &e);
// or wait_pop_front_for( std::chrono::minutes( 1 ), JobGroupType:kJobGroup1, &e );
// ret can be small::EnumLock::kExit, small::EnumLock::kTimeout or ret == small::EnumLock::kElement
if (ret == small::EnumLock::kElement) {
// do something with e
std::cout << "elem from jobs q " << e.first << ", " << e.second << "\n";
}

// on main thread no more processing (aborting work)
q.signal_exit_force(); // q.signal_exit_when_done()

std::cout << "Jobs Queue example 1 finish\n\n";

return 0;
}

//
// example 2
//
int Example2()
{
std::cout << "Jobs Engine example 2\n";
std::cout << "Jobs Engine example 1\n";

using qc = std::pair<int, std::string>;

Expand Down Expand Up @@ -116,7 +77,7 @@ namespace examples::jobs_engine {
std::cout << "wait for with timeout, ret = " << static_cast<int>(ret) << " as timeout\n";
jobs.wait(); // wait here for jobs to finish due to exit flag

std::cout << "Jobs Engine example 2 finish\n\n";
std::cout << "Jobs Engine example 1 finish\n\n";

return 0;
}
Expand Down
Loading

0 comments on commit 5a30fed

Please sign in to comment.