Skip to content

Commit

Permalink
Refactor jobs engine into multiple files
Browse files Browse the repository at this point in the history
  • Loading branch information
herrcristi committed Jan 4, 2025
1 parent dd53135 commit 1565624
Show file tree
Hide file tree
Showing 9 changed files with 158 additions and 136 deletions.
5 changes: 3 additions & 2 deletions examples/examples_jobs_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ namespace examples::jobs_engine {
{small::EnumPriorities::kNormal, 2},
{small::EnumPriorities::kLow, 1}}}}, // overall config with default priorities
.m_default_processing_function = jobs_processing_function, // default processing function, better use jobs.add_default_processing_function to set it
.m_groups = {{JobsGroupType::kJobsGroup1, {.m_threads_count = 1}}}, // config by jobs group
.m_types = {
.m_groups = {
{JobsGroupType::kJobsGroup1, {.m_threads_count = 1}}}, // config by jobs group
.m_types = {
{JobsType::kJobsType1, {.m_group = JobsGroupType::kJobsGroup1}},
{JobsType::kJobsType2, {.m_group = JobsGroupType::kJobsGroup1}},
}};
Expand Down
2 changes: 1 addition & 1 deletion include/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ namespace small {
//
// class for representing a buffer
//
class buffer : public base_buffer
class buffer : public small::bufferimpl::base_buffer
{
public:
// buffer (allocates in chunks)
Expand Down
4 changes: 2 additions & 2 deletions include/impl/base_buffer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
#include <type_traits>
#include <vector>

namespace small {
namespace small::bufferimpl {
// class for representing a base_buffer that implements
// all the needed functions and operators
// it must be supplied with derived class with proper functions
Expand Down Expand Up @@ -457,4 +457,4 @@ namespace small {
std::size_t m_buffer_length{0};
};

} // namespace small
} // namespace small::bufferimpl
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

#include <unordered_map>

#include "worker_thread.h"
#include "../worker_thread.h"

namespace small {
namespace small::jobsimpl {

//
// helper class for jobs_engine to execute group of jobs (parent caller must implement 'do_action')
Expand Down Expand Up @@ -188,7 +188,7 @@ namespace small {
//
struct JobWorkerThreadFunction
{
void operator()(small::worker_thread<JobGroupT> &, const std::vector<JobGroupT> &items, small::jobs_engine_thread_pool<JobGroupT, ParentCallerT> *pThis) const
void operator()(small::worker_thread<JobGroupT> &, const std::vector<JobGroupT> &items, jobs_engine_thread_pool<JobGroupT, ParentCallerT> *pThis) const
{
pThis->thread_function(items);
}
Expand All @@ -198,4 +198,4 @@ namespace small {
small::worker_thread<JobGroupT> m_workers{{.threads_count = 0}, JobWorkerThreadFunction(), this};
ParentCallerT &m_parent_caller; // parent jobs engine
};
} // namespace small
} // namespace small::jobsimpl
97 changes: 97 additions & 0 deletions include/impl/jobs_item_impl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
#pragma once

#include <atomic>
#include <deque>
#include <functional>
#include <queue>
#include <unordered_map>
#include <vector>

#include "../base_lock.h"

namespace small::jobsimpl {
// a job can be in the following states
enum class EnumJobsState : unsigned int
{
kNone = 0,
kInProgress,
kFinished,
kFailed,
kCancelled,
kTimeout
};

// a job item
template <typename JobsTypeT, typename JobsRequestT, typename JobsResponseT>
struct jobs_item
{
using JobsID = unsigned long long;

JobsID id{}; // job unique id
JobsTypeT type{}; // job type
// std::atomic<EnumJobsState> state{}; // job state
// std::atomic<int> progress{}; // progress 0-100 for state kInProgress
EnumJobsState state{}; // job state
int progress{}; // progress 0-100 for state kInProgress
JobsRequestT request{}; // request needed for processing function
JobsResponseT response{}; // where the results are saved (for the finished callback if exists)

// explicit jobs_item() = default;
// explicit jobs_item(std::initializer_list<jobs_item>) {};
// jobs_item(const jobs_item &other) { operator=(other); };
// jobs_item(jobs_item &&other) { operator=(other); };
// jobs_item &operator=(const jobs_item &other)
// {
// id = other.id;
// type = other.type;
// state = other.state.load();
// progress = other.progress.load();
// request = other.request;
// response = other.response;
// return *this;
// }
// jobs_item &operator=(jobs_item &&other)
// {
// id = std::move(other.id);
// type = std::move(other.type);
// state = other.state.load();
// progress = other.progress.load();
// request = std::move(other.request);
// response = std::move(other.response);
// return *this;
// }

//
// set job state (can only go from lower to upper state)
//
inline void set_state(const EnumJobsState &new_state)
{
// for (;;) {
// EnumJobsState current_state = state.load();
// if (current_state >= new_state) {
// return;
// }
// if (state.compare_exchange_weak(current_state, new_state)) {
// return;
// }
// }
}

//
// set job progress (can only increase)
//
inline void set_progress(const int &new_progress)
{
// for (;;) {
// int current_progress = progress.load();
// if (current_progress >= new_progress) {
// return;
// }
// if (progress.compare_exchange_weak(current_progress, new_progress)) {
// return;
// }
// }
}
};

} // namespace small::jobsimpl
20 changes: 8 additions & 12 deletions include/jobs_queue.h → include/impl/jobs_queue_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,24 @@

#include <unordered_map>

#include "jobs_item.h"
#include "prio_queue.h"
#include "time_queue_thread.h"
#include "../prio_queue.h"
#include "../time_queue_thread.h"

namespace small {
#include "jobs_item_impl.h"

namespace small::jobsimpl {
//
// small queue helper class for jobs (parent caller must implement 'jobs_activate')
//
template <typename JobsTypeT, typename JobsRequestT, typename JobsResponseT, typename JobsGroupT, typename JobsPrioT, typename ParentCallerT>
class jobs_queue
{
public:
using JobsItem = typename small::jobs_item<JobsTypeT, JobsRequestT, JobsResponseT>;
using JobsItem = typename small::jobsimpl::jobs_item<JobsTypeT, JobsRequestT, JobsResponseT>;
using JobsID = typename JobsItem::JobsID;
using JobsQueue = small::prio_queue<JobsID, JobsPrioT>;

using ThisJobsQueue = small::jobs_queue<JobsTypeT, JobsRequestT, JobsResponseT, JobsGroupT, JobsPrioT, ParentCallerT>;
using ThisJobsQueue = jobs_queue<JobsTypeT, JobsRequestT, JobsResponseT, JobsGroupT, JobsPrioT, ParentCallerT>;

using JobDelayedItems = std::tuple<JobsPrioT, JobsTypeT, JobsID>;
using JobQueueDelayedT = small::time_queue_thread<JobDelayedItems, ThisJobsQueue>;
Expand All @@ -33,11 +34,6 @@ namespace small {
explicit jobs_queue(ParentCallerT &parent_caller)
: m_parent_caller(parent_caller) {}

~jobs_queue()
{
wait();
}

// size of active items
inline size_t size()
{
Expand Down Expand Up @@ -467,4 +463,4 @@ namespace small {

ParentCallerT &m_parent_caller; // jobs engine
};
} // namespace small
} // namespace small::jobsimpl
15 changes: 8 additions & 7 deletions include/jobs_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,39 @@

#include <unordered_map>

#include "jobs_item.h"
#include "prio_queue.h"

#include "impl/jobs_item_impl.h"

namespace small {
//
// small class for jobs config
// - setup how many threads to use overall, and the priorities
// - for each job group how many threads to use
// - for each job type how to process and to which group it belongs
//
template <typename JobsTypeT, typename JobsRequestT, typename JobsResponseT, typename JobsGroupT = JobsTypeT, typename JobsPrioT = EnumPriorities>
template <typename JobsTypeT, typename JobsRequestT, typename JobsResponseT, typename JobsGroupT, typename JobsPrioT>
struct jobs_config
{
using JobsItem = typename small::jobs_item<JobsTypeT, JobsRequestT, JobsResponseT>;
using JobsItem = typename small::jobsimpl::jobs_item<JobsTypeT, JobsRequestT, JobsResponseT>;
using ProcessingFunction = std::function<void(const std::vector<JobsItem *> &)>;

// config the entire jobs engine
// config for the entire jobs engine
struct ConfigJobsEngine
{
int m_threads_count{8}; // how many total threads for processing
small::config_prio_queue<JobsPrioT> m_config_prio{};
};

// config an individual job type
// config for an individual job type
struct ConfigJobsType
{
JobsGroupT m_group{}; // job type group (multiple job types can be configured to same group)
bool m_has_processing_function{false}; // use default processing function
ProcessingFunction m_processing_function{}; // processing Function
};

// config the job group (where job types can be grouped)
// config for the job group (where job types can be grouped)
struct ConfigJobsGroup
{
int m_threads_count{1}; // how many threads for processing (out of the global threads)
Expand All @@ -45,7 +46,7 @@ namespace small {
std::unordered_map<JobsGroupT, ConfigJobsGroup> m_groups; // config by jobs group
std::unordered_map<JobsTypeT, ConfigJobsType> m_types; // config by jobs type

// processing function
// default processing function
inline void add_default_processing_function(ProcessingFunction processing_function)
{
m_default_processing_function = processing_function;
Expand Down
73 changes: 35 additions & 38 deletions include/jobs_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

#include <unordered_map>

#include "impl/jobs_engine_thread_pool_impl.h"
#include "impl/jobs_item_impl.h"
#include "impl/jobs_queue_impl.h"
#include "jobs_config.h"
#include "jobs_engine_thread_pool.h"
#include "jobs_item.h"
#include "jobs_queue.h"

// enum class JobsType
// {
Expand All @@ -17,51 +17,48 @@
// kJobsGroup1
// };
//
// using JobsRequest = std::pair<int, std::string>;
// using JobsResponse = int;
// using JobsEng = small::jobs_engine<JobsType, JobsRequest, JobsResponse, JobsGroupType>;
// using Request = std::pair<int, std::string>;
// using JobsEng = small::jobs_engine<JobsType, Request, int /*response*/, JobsGroupType>;
//
// JobsEng jobs(
// {.threads_count = 0 /*dont start any thread yet*/}, // overall config with default priorities
// {.threads_count = 1, .bulk_count = 1}, // default jobs group config
// {.group = JobsGroupType::kJobsGroup1}, // default jobs type config
// [](auto &j /*this*/, const auto &items) {
// for (auto &item : items) {
// ...
// }
// });
//
// jobs.add_jobs_group(JobsGroupType::kJobsGroup1, {.threads_count = 1});
// JobsEng::JobsConfig config{
// .m_engine = {.m_threads_count = 0 /*dont start any thread yet*/}, // overall config with default priorities
// .m_groups = {
// {JobsGroupType::kJobsGroup1, {.m_threads_count = 1}}}, // config by jobs group
// .m_types = {
// {JobsType::kJobsType1, {.m_group = JobsGroupType::kJobsGroup1}},
// {JobsType::kJobsType2, {.m_group = JobsGroupType::kJobsGroup1}},
// }};
//
// // create jobs engine
// JobsEng jobs(config);
//
// jobs.add_default_processing_function([](auto &j /*this jobs engine*/, const auto &jobs_items) {
// for (auto &item : jobs_items) {
// ...
// }
// });
//
// // add specific function for job1
// jobs.add_jobs_type(JobsType::kJobsType1, {.group = JobsGroupType::kJobsGroup1}, [](auto &j /*this*/, const auto &items, auto b /*extra param b*/) {
// for (auto &item : items) {
// ...
// }
// jobs.add_job_processing_function(JobsType::kJobsType1, [](auto &j /*this jobs engine*/, const auto &jobs_items, auto b /*extra param b*/) {
// for (auto &item : jobs_items) {
// ...
// }
// }, 5 /*param b*/);
//
// // use default config and default function for job2
// jobs.add_jobs_type(JobsType::kJobsType2);
//
// JobsEng::JobsID jobs_id{};
// std::vector<JobsEng::JobsID> jobs_ids;
//
// // push
// jobs.push_back(small::EnumPriorities::kNormal, JobsType::kJobsType1, {1, "normal"}, &jobs_id);
// jobs.push_back(small::EnumPriorities::kHigh, {.type = JobsType::kJobsType1, .request = {4, "high"}}, &jobs_id);
//
// std::vector<JobsEng::JobsItem> jobs_items = {{.type = JobsType::kJobsType1, .request = {7, "highest"}}};
// jobs.push_back(small::EnumPriorities::kHighest, jobs_items, &jobs_ids);
//
// jobs.push_back_delay_for(std::chrono::milliseconds(300), small::EnumPriorities::kNormal, JobsType::kJobsType1, {100, "delay normal"}, &jobs_id);
// jobs.queue().push_back(small::EnumPriorities::kNormal, JobsType::kJobsType1, {1, "normal"}, &jobs_id);
// jobs.queue().push_back_delay_for(std::chrono::milliseconds(300), small::EnumPriorities::kNormal, JobsType::kJobsType1, {100, "delay normal"}, &jobs_id);
//
// jobs.start_threads(3); // manual start threads
//
// // jobs.signal_exit_force();
// auto ret = jobs.wait_for(std::chrono::milliseconds(100)); // wait to finished
// ...
// jobs.wait(); // wait here for jobs to finish due to exit flag
//
// for a more complex example see examples/examples_jobs_engine.h

namespace small {

Expand All @@ -74,8 +71,8 @@ namespace small {
public:
using ThisJobsEngine = small::jobs_engine<JobsTypeT, JobsRequestT, JobsResponseT, JobsGroupT, JobsPrioT>;
using JobsConfig = small::jobs_config<JobsTypeT, JobsRequestT, JobsResponseT, JobsGroupT, JobsPrioT>;
using JobsItem = small::jobs_item<JobsTypeT, JobsRequestT, JobsResponseT>;
using JobsQueue = small::jobs_queue<JobsTypeT, JobsRequestT, JobsResponseT, JobsGroupT, JobsPrioT, ThisJobsEngine>;
using JobsItem = small::jobsimpl::jobs_item<JobsTypeT, JobsRequestT, JobsResponseT>;
using JobsQueue = small::jobsimpl::jobs_queue<JobsTypeT, JobsRequestT, JobsResponseT, JobsGroupT, JobsPrioT, ThisJobsEngine>;
using JobsID = typename JobsItem::JobsID;
using TimeClock = typename JobsQueue::TimeClock;
using TimeDuration = typename JobsQueue::TimeDuration;
Expand Down Expand Up @@ -268,7 +265,7 @@ namespace small {
//
// inner thread function for executing items (should return if there are more items)
//
friend small::jobs_engine_thread_pool<JobsGroupT, ThisJobsEngine>;
friend small::jobsimpl::jobs_engine_thread_pool<JobsGroupT, ThisJobsEngine>;

inline EnumLock do_action(const JobsGroupT &jobs_group, bool *has_items)
{
Expand Down Expand Up @@ -340,8 +337,8 @@ namespace small {
//
// members
//
JobsConfig m_config;
JobsQueue m_queue{*this};
small::jobs_engine_thread_pool<JobsGroupT, ThisJobsEngine> m_thread_pool{*this}; // for processing items (by group) using a pool of threads
JobsConfig m_config;
JobsQueue m_queue{*this};
small::jobsimpl::jobs_engine_thread_pool<JobsGroupT, ThisJobsEngine> m_thread_pool{*this}; // for processing items (by group) using a pool of threads
};
} // namespace small
Loading

0 comments on commit 1565624

Please sign in to comment.