diff --git a/CMakeLists.txt b/CMakeLists.txt index 37b728f66..d6418ba74 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -97,7 +97,8 @@ if(thallium_FOUND) endif() # Boost -find_package(Boost REQUIRED COMPONENTS regex system filesystem fiber REQUIRED) +# find_package(Boost REQUIRED COMPONENTS regex system filesystem fiber REQUIRED) +find_package(Boost REQUIRED COMPONENTS fiber REQUIRED) if (Boost_FOUND) message(STATUS "found boost at ${Boost_INCLUDE_DIRS}") endif() diff --git a/include/hermes/bucket.h b/include/hermes/bucket.h index f607d6ef6..690c29826 100644 --- a/include/hermes/bucket.h +++ b/include/hermes/bucket.h @@ -393,7 +393,7 @@ class Bucket { * */ void ReorganizeBlob(const BlobId &blob_id, float score) { - blob_mdm_->AsyncReorganizeBlobRoot(id_, blob_id, score, 0); + blob_mdm_->AsyncReorganizeBlobRoot(id_, blob_id, score, 0, true); } /** @@ -402,7 +402,7 @@ class Bucket { void ReorganizeBlob(const BlobId &blob_id, float score, Context &ctx) { - blob_mdm_->AsyncReorganizeBlobRoot(id_, blob_id, score, 0); + blob_mdm_->AsyncReorganizeBlobRoot(id_, blob_id, score, 0, true); } /** @@ -412,7 +412,7 @@ class Bucket { float score, u32 node_id, Context &ctx) { - blob_mdm_->AsyncReorganizeBlobRoot(id_, blob_id, score, node_id); + blob_mdm_->AsyncReorganizeBlobRoot(id_, blob_id, score, node_id, true); } /** diff --git a/include/hermes/hermes_types.h b/include/hermes/hermes_types.h index 5ba7228dd..822540252 100644 --- a/include/hermes/hermes_types.h +++ b/include/hermes/hermes_types.h @@ -278,10 +278,12 @@ struct BlobInfo { size_t blob_size_; /**< The overall size of the blob */ size_t max_blob_size_; /**< The amount of space current buffers support */ float score_; /**< The priority of this blob */ + float user_score_; /**< The user-defined priority of this blob */ std::atomic access_freq_; /**< Number of times blob accessed in epoch */ hshm::Timepoint last_access_; /**< Last time blob accessed */ std::atomic mod_count_; /**< The number of times blob modified */ std::atomic last_flush_; /**< The last mod that was flushed */ + bitfield32_t flags_; /**< Flags */ /** Serialization */ template diff --git a/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm.h b/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm.h index 140282af1..f8a705304 100644 --- a/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm.h +++ b/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm.h @@ -186,11 +186,12 @@ class Client : public TaskLibClient { const TagId &tag_id, const BlobId &blob_id, float score, - u32 node_id) { + u32 node_id, + bool user_score) { // HILOG(kDebug, "Beginning REORGANIZE (task_node={})", task_node); HRUN_CLIENT->ConstructTask( task, task_node, DomainId::GetNode(blob_id.node_id_), id_, - tag_id, blob_id, score, node_id); + tag_id, blob_id, score, node_id, user_score); } HRUN_TASK_NODE_PUSH_ROOT(ReorganizeBlob); diff --git a/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm_tasks.h b/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm_tasks.h index 3852fb921..806340fc1 100644 --- a/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm_tasks.h +++ b/tasks/hermes_blob_mdm/include/hermes_blob_mdm/hermes_blob_mdm_tasks.h @@ -226,6 +226,7 @@ class PutBlobPhase { #define HERMES_BLOB_DID_CREATE BIT_OPT(u32, 4) #define HERMES_GET_BLOB_ID BIT_OPT(u32, 5) #define HERMES_HAS_DERIVED BIT_OPT(u32, 6) +#define HERMES_USER_SCORE_STATIONARY BIT_OPT(u32, 7) /** A task to put data in a blob */ struct PutBlobTask : public Task, TaskFlags { @@ -1075,6 +1076,7 @@ struct ReorganizeBlobTask : public Task, TaskFlags { IN BlobId blob_id_; IN float score_; IN u32 node_id_; + IN bool is_user_score_; TEMP int phase_ = ReorganizeBlobPhase::kGet; TEMP hipc::Pointer data_; TEMP size_t data_size_; @@ -1095,7 +1097,8 @@ struct ReorganizeBlobTask : public Task, TaskFlags { const TagId &tag_id, const BlobId &blob_id, float score, - u32 node_id) : Task(alloc) { + u32 node_id, + bool is_user_score) : Task(alloc) { // Initialize task task_node_ = task_node; lane_hash_ = blob_id.hash_; @@ -1110,6 +1113,7 @@ struct ReorganizeBlobTask : public Task, TaskFlags { blob_id_ = blob_id; score_ = score; node_id_ = node_id; + is_user_score_ = is_user_score; } /** (De)serialize message call */ diff --git a/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc b/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc index 0f5a3d151..4d82a6c72 100644 --- a/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc +++ b/tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc @@ -134,7 +134,36 @@ class Server : public TaskLib { if (access_score > 1) { access_score = 1; } - return std::max(freq_score, access_score); + float data_score = std::max(freq_score, access_score); + float user_score = blob_info.user_score_; + if (!blob_info.flags_.Any(HERMES_USER_SCORE_STATIONARY)) { + user_score *= data_score; + } + return std::max(data_score, user_score); + } + + /** Check if blob should be reorganized */ + template + bool ShouldReorganize(BlobInfo &blob_info, + float score, + TaskNode &task_node) { + for (BufferInfo &buf : blob_info.buffers_) { + TargetInfo &target = *target_map_[buf.tid_]; + Histogram &hist = target.monitor_task_->score_hist_; + if constexpr(UPDATE_SCORE) { + target.AsyncUpdateScore(task_node + 1, + blob_info.score_, score); + } + u32 percentile = hist.GetPercentile(score); + size_t rem_cap = target.monitor_task_->rem_cap_; + size_t max_cap = target.max_cap_; + if (rem_cap < max_cap / 10) { + if (percentile < 10 || percentile > 90) { + return true; + } + } + } + return false; } /** @@ -150,22 +179,11 @@ class Server : public TaskLib { BlobInfo &blob_info = it.second; // Update blob scores float new_score = MakeScore(blob_info, now); - bool reorganize = false; - for (BufferInfo &buf : blob_info.buffers_) { - TargetInfo &target = *target_map_[buf.tid_]; - Histogram &hist = target.monitor_task_->score_hist_; - target.AsyncUpdateScore(task->task_node_ + 1, - blob_info.score_, new_score); - u32 percentile = hist.GetPercentile(blob_info.score_); - if (percentile < 10 || percentile > 90) { - reorganize = true; - } - } - if (reorganize) { + if (ShouldReorganize(blob_info, new_score, task->task_node_)) { blob_mdm_.AsyncReorganizeBlob(task->task_node_ + 1, blob_info.tag_id_, blob_info.blob_id_, - new_score, 0); + new_score, 0, false); } blob_info.access_freq_ = 0; blob_info.score_ = new_score; @@ -663,6 +681,17 @@ class Server : public TaskLib { return; } BlobInfo &blob_info = it->second; + if (task->is_user_score_) { + blob_info.user_score_ = task->score_; + blob_info.score_ = std::max(blob_info.user_score_, + blob_info.score_); + } else { + blob_info.score_ = task->score_; + } + if (!ShouldReorganize(blob_info, task->score_, task->task_node_)) { + task->SetModuleComplete(); + return; + } task->data_ = HRUN_CLIENT->AllocateBuffer(blob_info.blob_size_).shm_; task->data_size_ = blob_info.blob_size_; task->get_task_ = blob_mdm_.AsyncGetBlob(task->task_node_ + 1,