Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add multiple features for jobs engine like parent child dependencies, throttling with sleep between requests, timeout for processing #29

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ AlignConsecutiveAssignments:
AlignFunctionDeclarations: true
AlignConsecutiveBitFields: true
AlignConsecutiveMacros: true
# PointerAlignment: Left
BraceWrapping:
AfterEnum: true
AfterStruct: true
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -427,15 +427,15 @@ JobsEng::JobsConfig config{
// create jobs engine
JobsEng jobs(config);
...
jobs.add_default_processing_function([](auto &j /*this jobs engine*/, const auto &jobs_items) {
jobs.config_default_function_processing([](auto &j /*this jobs engine*/, const auto &jobs_items) {
for (auto &item : jobs_items) {
...
}
...
});
...
// add specific function for job1 (calling the function from jobs intead of config allows to pass the engine and extra param)
jobs.add_job_processing_function(JobsType::kJobsType1, [](auto &j /*this jobs engine*/, const auto &jobs_items, auto b /*extra param b*/) {
jobs.config_jobs_function_processing(JobsType::kJobsType1, [](auto &j /*this jobs engine*/, const auto &jobs_items, auto b /*extra param b*/) {
for (auto &item : jobs_items) {
...
}
Expand Down
249 changes: 191 additions & 58 deletions examples/examples_jobs_engine.h

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions include/impl/jobs_engine_thread_pool_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ namespace small::jobsimpl {
// config processing by job group type
// this should be done in the initial setup phase once
//
inline void add_job_group(const JobGroupT &job_group, const int &threads_count)
inline void config_jobs_group(const JobGroupT &job_group, const int &threads_count)
{
m_scheduler[job_group].m_threads_count = threads_count;
}
Expand All @@ -63,7 +63,7 @@ namespace small::jobsimpl {
// when items are added to be processed in parent class the start scheduler should be called
// to trigger action (if needed for the new job group)
//
inline void job_start(const JobGroupT &job_group)
inline void jobs_schedule(const JobGroupT &job_group)
{
auto it = m_scheduler.find(job_group); // map is not changed, so can be access without locking
if (it == m_scheduler.end()) {
Expand All @@ -73,7 +73,7 @@ namespace small::jobsimpl {
// even if here it is considered that there are items and something will be scheduled,
// the actual check if work will still exists will be done in do_action of parent
auto &stats = it->second;
job_action_start(job_group, true, stats);
jobs_action_start(job_group, true, stats);
}

// clang-format off
Expand Down Expand Up @@ -129,7 +129,7 @@ namespace small::jobsimpl {
//
// to trigger action (if needed for the new job group)
//
inline void job_action_start(const JobGroupT &job_group, const bool has_items, JobGroupStats &stats)
inline void jobs_action_start(const JobGroupT &job_group, const bool has_items, JobGroupStats &stats)
{
if (!has_items) {
return;
Expand All @@ -148,7 +148,7 @@ namespace small::jobsimpl {
//
// job action ended
//
inline void job_action_end(const JobGroupT &job_group, const bool has_items)
inline void jobs_action_end(const JobGroupT &job_group, const bool has_items)
{
auto it = m_scheduler.find(job_group); // map is not changed, so can be access without locking
if (it == m_scheduler.end()) {
Expand All @@ -160,7 +160,7 @@ namespace small::jobsimpl {
auto &stats = it->second;
--stats.m_running;

job_action_start(job_group, has_items, stats);
jobs_action_start(job_group, has_items, stats);
}

//
Expand All @@ -174,7 +174,7 @@ namespace small::jobsimpl {
m_parent_caller.do_action(job_group, &has_items);

// start another action
job_action_end(job_group, has_items);
jobs_action_end(job_group, has_items);
}
}

Expand Down
123 changes: 94 additions & 29 deletions include/impl/jobs_item_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@
#include "../base_lock.h"

namespace small::jobsimpl {
// a job can be in the following states
// a job can be in the following states (order is important because it may progress only to higher states)
enum class EnumJobsState : unsigned int
{
kNone = 0,
kInProgress,
kWaitChildren,
kFinished,
kTimeout,
kFailed,
kCancelled,
kTimeout
};

// a job item
Expand All @@ -27,43 +28,57 @@ namespace small::jobsimpl {
{
using JobsID = unsigned long long;

JobsID id{}; // job unique id
JobsTypeT type{}; // job type
std::atomic<EnumJobsState> state{EnumJobsState::kNone}; // job state
std::atomic<int> progress{}; // progress 0-100 for state kInProgress
JobsRequestT request{}; // request needed for processing function
JobsResponseT response{}; // where the results are saved (for the finished callback if exists)
JobsID m_id{}; // job unique id
JobsTypeT m_type{}; // job type
std::atomic<EnumJobsState> m_state{EnumJobsState::kNone}; // job state
std::atomic<int> m_progress{}; // progress 0-100 for state kInProgress
std::atomic_bool m_has_parents{}; // for dependencies relationships parent-child
std::atomic_bool m_has_children{}; // for dependencies relationships parent-child
std::vector<JobsID> m_parentIDs{}; // for dependencies relationships parent-child
std::vector<JobsID> m_childrenIDs{}; // for dependencies relationships parent-child
JobsRequestT m_request{}; // request needed for processing function
JobsResponseT m_response{}; // where the results are saved (for the finished callback if exists)

explicit jobs_item() = default;

explicit jobs_item(const JobsID &jobs_id, const JobsTypeT &jobs_type, const JobsRequestT &jobs_request)
: id(jobs_id), type(jobs_type), request(jobs_request) {}
explicit jobs_item(const JobsTypeT &jobs_type, const JobsRequestT &jobs_request)
: type(jobs_type), request(jobs_request) {}
: m_id(jobs_id), m_type(jobs_type), m_request(jobs_request) {}
explicit jobs_item(const JobsID &jobs_id, const JobsTypeT &jobs_type, JobsRequestT &&jobs_request)
: id(jobs_id), type(jobs_type), request(std::forward<JobsRequestT>(jobs_request)) {}
: m_id(jobs_id), m_type(jobs_type), m_request(std::forward<JobsRequestT>(jobs_request)) {}

explicit jobs_item(const JobsTypeT &jobs_type, const JobsRequestT &jobs_request)
: m_type(jobs_type), m_request(jobs_request) {}
explicit jobs_item(const JobsTypeT &jobs_type, JobsRequestT &&jobs_request)
: type(jobs_type), request(std::forward<JobsRequestT>(jobs_request)) {}
: m_type(jobs_type), m_request(std::forward<JobsRequestT>(jobs_request)) {}

jobs_item(const jobs_item &other) { operator=(other); };
jobs_item(jobs_item &&other) noexcept { operator=(other); };
jobs_item &operator=(const jobs_item &other)
{
id = other.id;
type = other.type;
state = other.state.load();
progress = other.progress.load();
request = other.request;
response = other.response;
m_id = other.m_id;
m_type = other.m_type;
m_state = other.m_state.load();
m_progress = other.m_progress.load();
m_has_parents = other.m_has_parents.load();
m_has_children = other.m_has_children.load();
m_parentIDs = other.m_parentIDs;
m_childrenIDs = other.m_childrenIDs;
m_request = other.m_request;
m_response = other.m_response;
return *this;
}
jobs_item &operator=(jobs_item &&other) noexcept
{
id = std::move(other.id);
type = std::move(other.type);
state = other.state.load();
progress = other.progress.load();
request = std::move(other.request);
response = std::move(other.response);
m_id = std::move(other.m_id);
m_type = std::move(other.m_type);
m_state = other.m_state.load();
m_progress = other.m_progress.load();
m_has_parents = other.m_has_parents.load();
m_has_children = other.m_has_children.load();
m_parentIDs = std::move(other.m_parentIDs);
m_childrenIDs = std::move(other.m_childrenIDs);
m_request = std::move(other.m_request);
m_response = std::move(other.m_response);
return *this;
}

Expand All @@ -73,31 +88,81 @@ namespace small::jobsimpl {
inline void set_state(const EnumJobsState &new_state)
{
for (;;) {
EnumJobsState current_state = state.load();
EnumJobsState current_state = m_state.load();
if (current_state >= new_state) {
return;
}
if (state.compare_exchange_weak(current_state, new_state)) {
if (m_state.compare_exchange_weak(current_state, new_state)) {
return;
}
}
}

// clang-format off
inline void set_state_inprogress () { set_state(EnumJobsState::kInProgress); }
inline void set_state_waitchildren () { set_state(EnumJobsState::kWaitChildren); }
inline void set_state_finished () { set_state(EnumJobsState::kFinished); }
inline void set_state_timeout () { set_state(EnumJobsState::kTimeout); }
inline void set_state_failed () { set_state(EnumJobsState::kFailed); }
inline void set_state_cancelled () { set_state(EnumJobsState::kCancelled); }

static bool is_state_complete (const EnumJobsState &state) { return state >= EnumJobsState::kFinished; }

inline EnumJobsState get_state () const { return m_state.load(); }
inline bool is_state (const EnumJobsState &state) const { return get_state() == state; }
inline bool is_complete () const { return is_state_complete(get_state()); }

inline bool is_state_inprogress () const { return is_state(EnumJobsState::kInProgress); }
inline void is_state_waitchildren () const { return is_state(EnumJobsState::kWaitChildren); }
inline bool is_state_finished () const { return is_state(EnumJobsState::kFinished); }
inline bool is_state_timeout () const { return is_state(EnumJobsState::kTimeout); }
inline void is_state_failed () const { return is_state(EnumJobsState::kFailed); }
inline void is_state_cancelled () const { return is_state(EnumJobsState::kCancelled); }
// clang-format on

//
// set job progress (can only increase)
//
inline void set_progress(const int &new_progress)
{
for (;;) {
int current_progress = progress.load();
int current_progress = m_progress.load();
if (current_progress >= new_progress) {
return;
}
if (progress.compare_exchange_weak(current_progress, new_progress)) {
if (m_progress.compare_exchange_weak(current_progress, new_progress)) {
return;
}
}
}

//
// add child
//
inline void add_child(const JobsID &child_jobs_id)
{
m_childrenIDs.push_back(child_jobs_id); // this should be set under locked area
m_has_children = true;
}

inline bool has_children() const
{
return m_has_children.load();
}

//
// add parent
//
inline void add_parent(const JobsID &parent_jobs_id)
{
m_parentIDs.push_back(parent_jobs_id); // this should be set under locked area
m_has_parents = true;
}

inline bool has_parents() const
{
return m_has_parents.load();
}
};

} // namespace small::jobsimpl
Loading
Loading