Skip to content

Commit

Permalink
Add multiple features for jobs engine like parent child dependencies,…
Browse files Browse the repository at this point in the history
… throttling with sleep between requests, timeout for processing
  • Loading branch information
herrcristi committed Jan 7, 2025
1 parent b74225b commit b4a8fee
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 32 deletions.
78 changes: 49 additions & 29 deletions include/impl/jobs_item_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@
#include "../base_lock.h"

namespace small::jobsimpl {
// a job can be in the following states
// a job can be in the following states (order is important because it may progress only to higher states)
enum class EnumJobsState : unsigned int
{
kNone = 0,
kInProgress,
kWaitChildren,
kFinished,
kTimeout,
kFailed,
kCancelled,
kTimeout
};

// a job item
Expand All @@ -27,44 +28,49 @@ namespace small::jobsimpl {
{
using JobsID = unsigned long long;

JobsID id{}; // job unique id
JobsTypeT type{}; // job type
std::atomic<EnumJobsState> state{EnumJobsState::kNone}; // job state
std::atomic<int> progress{}; // progress 0-100 for state kInProgress
JobsRequestT request{}; // request needed for processing function
JobsResponseT response{}; // where the results are saved (for the finished callback if exists)
// TODO add parents and children ids
JobsID m_id{}; // job unique id
JobsTypeT m_type{}; // job type
std::atomic<EnumJobsState> m_state{EnumJobsState::kNone}; // job state
std::atomic<int> m_progress{}; // progress 0-100 for state kInProgress
std::vector<JobsID> m_parentIDs{}; // for dependencies relationships parent-child
std::vector<JobsID> m_childrenIDs{}; // for dependencies relationships parent-child
JobsRequestT m_request{}; // request needed for processing function
JobsResponseT m_response{}; // where the results are saved (for the finished callback if exists)

explicit jobs_item() = default;
explicit jobs_item(const JobsID &jobs_id, const JobsTypeT &jobs_type, const JobsRequestT &jobs_request)
: id(jobs_id), type(jobs_type), request(jobs_request) {}
: m_id(jobs_id), m_type(jobs_type), m_request(jobs_request) {}
explicit jobs_item(const JobsTypeT &jobs_type, const JobsRequestT &jobs_request)
: type(jobs_type), request(jobs_request) {}
: m_type(jobs_type), m_request(jobs_request) {}
explicit jobs_item(const JobsID &jobs_id, const JobsTypeT &jobs_type, JobsRequestT &&jobs_request)
: id(jobs_id), type(jobs_type), request(std::forward<JobsRequestT>(jobs_request)) {}
: m_id(jobs_id), m_type(jobs_type), m_request(std::forward<JobsRequestT>(jobs_request)) {}
explicit jobs_item(const JobsTypeT &jobs_type, JobsRequestT &&jobs_request)
: type(jobs_type), request(std::forward<JobsRequestT>(jobs_request)) {}
: m_type(jobs_type), m_request(std::forward<JobsRequestT>(jobs_request)) {}

jobs_item(const jobs_item &other) { operator=(other); };
jobs_item(jobs_item &&other) noexcept { operator=(other); };
jobs_item &operator=(const jobs_item &other)
{
id = other.id;
type = other.type;
state = other.state.load();
progress = other.progress.load();
request = other.request;
response = other.response;
m_id = other.m_id;
m_type = other.m_type;
m_state = other.m_state.load();
m_progress = other.m_progress.load();
m_parentIDs = other.m_parentIDs;
m_childrenIDs = other.m_childrenIDs;
m_request = other.m_request;
m_response = other.m_response;
return *this;
}
jobs_item &operator=(jobs_item &&other) noexcept
{
id = std::move(other.id);
type = std::move(other.type);
state = other.state.load();
progress = other.progress.load();
request = std::move(other.request);
response = std::move(other.response);
m_id = std::move(other.m_id);
m_type = std::move(other.m_type);
m_state = other.m_state.load();
m_progress = other.m_progress.load();
m_parentIDs = std::move(other.m_parentIDs);
m_childrenIDs = std::move(other.m_childrenIDs);
m_request = std::move(other.m_request);
m_response = std::move(other.m_response);
return *this;
}

Expand All @@ -74,27 +80,41 @@ namespace small::jobsimpl {
inline void set_state(const EnumJobsState &new_state)
{
for (;;) {
EnumJobsState current_state = state.load();
EnumJobsState current_state = m_state.load();
if (current_state >= new_state) {
return;
}
if (state.compare_exchange_weak(current_state, new_state)) {
if (m_state.compare_exchange_weak(current_state, new_state)) {
return;
}
}
}

// clang-format off
inline void set_state_inprogress () { set_state(EnumJobsState::kInProgress); }
inline void set_state_waitchildren () { set_state(EnumJobsState::kWaitChildren); }
inline void set_state_finished () { set_state(EnumJobsState::kFinished); }
inline void set_state_timeout () { set_state(EnumJobsState::kTimeout); }
inline void set_state_failed () { set_state(EnumJobsState::kFailed); }
inline void set_state_cancelled () { set_state(EnumJobsState::kCancelled); }

inline bool is_state_inprogress () { return m_state.load() == EnumJobsState::kInProgress; }
inline bool is_state_finished () { return m_state.load() == EnumJobsState::kFinished; }
inline bool is_state_timeout () { return m_state.load() == EnumJobsState::kTimeout; }
// clang-format on

//
// set job progress (can only increase)
//

inline void set_progress(const int &new_progress)
{
for (;;) {
int current_progress = progress.load();
int current_progress = m_progress.load();
if (current_progress >= new_progress) {
return;
}
if (progress.compare_exchange_weak(current_progress, new_progress)) {
if (m_progress.compare_exchange_weak(current_progress, new_progress)) {
return;
}
}
Expand Down
6 changes: 3 additions & 3 deletions include/worker_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ namespace small {
template <typename _Callable, typename... Args>
worker_thread(const config_worker_thread config, _Callable function, Args... extra_parameters)
: m_config(config),
m_processing_function(std::bind(std::forward<_Callable>(function), std::ref(*this), std::placeholders::_1 /*item*/, std::forward<Args>(extra_parameters)...))
m_function_processing(std::bind(std::forward<_Callable>(function), std::ref(*this), std::placeholders::_1 /*item*/, std::forward<Args>(extra_parameters)...))
{
// auto start threads if count > 0 otherwise threads should be manually started
if (config.threads_count) {
Expand Down Expand Up @@ -310,7 +310,7 @@ namespace small {
// callback for queue_items
inline void process_items(std::vector<T> &&items)
{
m_processing_function(std::forward<std::vector<T>>(items)); // bind the std::placeholders::_1
m_function_processing(std::forward<std::vector<T>>(items)); // bind the std::placeholders::_1
}

private:
Expand All @@ -320,6 +320,6 @@ namespace small {
config_worker_thread m_config; // config
small::lock_queue_thread<T, small::worker_thread<T>> m_queue_items{*this}; // queue of items
small::time_queue_thread<T, small::worker_thread<T>> m_delayed_items{*this}; // queue of delayed items
std::function<void(const std::vector<T> &)> m_processing_function{}; // processing Function
std::function<void(const std::vector<T> &)> m_function_processing{}; // processing Function
};
} // namespace small

0 comments on commit b4a8fee

Please sign in to comment.