Skip to content

Commit

Permalink
Fix[MQB]: send one QueueUpdateAdvisory for all Apps upon domain recon…
Browse files Browse the repository at this point in the history
…figure

Signed-off-by: Emelia Lei <wlei29@bloomberg.net>
  • Loading branch information
emelialei88 committed Jan 23, 2025
1 parent 005a33d commit 8599a93
Show file tree
Hide file tree
Showing 10 changed files with 567 additions and 18 deletions.
30 changes: 12 additions & 18 deletions src/groups/mqb/mqbblp/mqbblp_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2854,24 +2854,18 @@ void Cluster::onDomainReconfigured(const mqbi::Domain& domain,
oldCfgAppIds,
newCfgAppIds);

// TODO: This should be one call - one QueueUpdateAdvisory for all Apps
bsl::unordered_set<bsl::string>::const_iterator it = addedIds.cbegin();
for (; it != addedIds.cend(); ++it) {
dispatcher()->execute(
bdlf::BindUtil::bind(&ClusterOrchestrator::registerAppId,
&d_clusterOrchestrator,
*it,
bsl::ref(domain)),
this);
}
for (it = removedIds.cbegin(); it != removedIds.cend(); ++it) {
dispatcher()->execute(
bdlf::BindUtil::bind(&ClusterOrchestrator::unregisterAppId,
&d_clusterOrchestrator,
*it,
bsl::ref(domain)),
this);
}
dispatcher()->execute(
bdlf::BindUtil::bind(&ClusterOrchestrator::registerAppIds,
&d_clusterOrchestrator,
addedIds,
bsl::ref(domain)),
this);
dispatcher()->execute(
bdlf::BindUtil::bind(&ClusterOrchestrator::unregisterAppIds,
&d_clusterOrchestrator,
removedIds,
bsl::ref(domain)),
this);
}

int Cluster::processCommand(mqbcmd::ClusterResult* result,
Expand Down
24 changes: 24 additions & 0 deletions src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1960,6 +1960,30 @@ void ClusterOrchestrator::unregisterAppId(bsl::string appId,
d_stateManager_mp->unregisterAppId(appId, &domain);
}

void ClusterOrchestrator::registerAppIds(
bsl::unordered_set<bsl::string> appIds,
const mqbi::Domain& domain)
{
// executed by the cluster* DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(d_cluster_p));

d_stateManager_mp->registerAppIds(appIds, &domain);
}

void ClusterOrchestrator::unregisterAppIds(
bsl::unordered_set<bsl::string> appIds,
const mqbi::Domain& domain)
{
// executed by the cluster* DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(d_cluster_p));

d_stateManager_mp->unregisterAppIds(appIds, &domain);
}

void ClusterOrchestrator::onPartitionPrimaryStatus(int partitionId,
int status,
unsigned int primaryLeaseId)
Expand Down
18 changes: 18 additions & 0 deletions src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,24 @@ class ClusterOrchestrator {
/// function call.
void unregisterAppId(bsl::string appId, const mqbi::Domain& domain);

/// Invoked by @bbref{mqbblp::Cluster} to register new `appIds` for
/// `domain`.
///
/// Note: As this function is dispatched from a separate thread, `appIds`
/// is taken by value to ensure it survives the lifetime of this
/// function call.
void registerAppIds(bsl::unordered_set<bsl::string> appIds,
const mqbi::Domain& domain);

/// Invoked by @bbref{mqbblp::Cluster} to unregister `appIds` for
/// `domain`.
///
/// Note: As this function is dispatched from a separate thread, `appIds`
/// is taken by value to ensure it survives the lifetime of this
/// function call.
void unregisterAppIds(bsl::unordered_set<bsl::string> appIds,
const mqbi::Domain& domain);

/// Register a queue info for the queue with the specified `uri`,
/// `partitionId`, `queueKey` and `appIdInfos`. If the specified
/// `forceUpdate` flag is true, update queue info even if it is valid
Expand Down
38 changes: 38 additions & 0 deletions src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1291,6 +1291,44 @@ void ClusterStateManager::unregisterAppId(const bsl::string& appId,
d_allocator_p);
}

void ClusterStateManager::registerAppIds(
const bsl::unordered_set<bsl::string>& appIds,
const mqbi::Domain* domain)
{
// executed by the cluster *DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(d_cluster_p));
BSLS_ASSERT_SAFE(!d_cluster_p->isRemote());
BSLS_ASSERT_SAFE(domain);

mqbc::ClusterUtil::registerAppIds(d_clusterData_p,
d_clusterStateLedger_mp.get(),
*d_state_p,
appIds,
domain,
d_allocator_p);
}

void ClusterStateManager::unregisterAppIds(
const bsl::unordered_set<bsl::string>& appIds,
const mqbi::Domain* domain)
{
// executed by the cluster *DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(d_cluster_p));
BSLS_ASSERT_SAFE(!d_cluster_p->isRemote());
BSLS_ASSERT_SAFE(domain);

mqbc::ClusterUtil::unregisterAppIds(d_clusterData_p,
d_clusterStateLedger_mp.get(),
*d_state_p,
appIds,
domain,
d_allocator_p);
}

void ClusterStateManager::initiateLeaderSync(bool wait)
{
// executed by the cluster *DISPATCHER* thread
Expand Down
16 changes: 16 additions & 0 deletions src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,22 @@ class ClusterStateManager BSLS_KEYWORD_FINAL
void unregisterAppId(const bsl::string& appId,
const mqbi::Domain* domain) BSLS_KEYWORD_OVERRIDE;

/// Register the specified `appIds` for all queues in the specified
/// `domain`.
///
/// THREAD: This method is invoked in the associated cluster's
/// dispatcher thread.
void registerAppIds(const bsl::unordered_set<bsl::string>& appIds,
const mqbi::Domain* domain) BSLS_KEYWORD_OVERRIDE;

/// Unregister the specified `appIds` for all queues in the specified
/// `domain`.
///
/// THREAD: This method is invoked in the associated cluster's
/// dispatcher thread.
void unregisterAppIds(const bsl::unordered_set<bsl::string>& appIds,
const mqbi::Domain* domain) BSLS_KEYWORD_OVERRIDE;

/// Invoked when a newly elected (i.e. passive) leader node initiates a
/// sync with followers before transitioning to active leader.
///
Expand Down
36 changes: 36 additions & 0 deletions src/groups/mqb/mqbc/mqbc_clusterstatemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1603,6 +1603,42 @@ void ClusterStateManager::unregisterAppId(const bsl::string& appId,
d_allocator_p);
}

void ClusterStateManager::registerAppIds(
const bsl::unordered_set<bsl::string>& appIds,
const mqbi::Domain* domain)
{
// executed by the cluster *DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(d_cluster_p));
BSLS_ASSERT_SAFE(domain);

mqbc::ClusterUtil::registerAppIds(d_clusterData_p,
d_clusterStateLedger_mp.get(),
*d_state_p,
appIds,
domain,
d_allocator_p);
}

void ClusterStateManager::unregisterAppIds(
const bsl::unordered_set<bsl::string>& appIds,
const mqbi::Domain* domain)
{
// executed by the cluster *DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(d_cluster_p));
BSLS_ASSERT_SAFE(domain);

mqbc::ClusterUtil::unregisterAppIds(d_clusterData_p,
d_clusterStateLedger_mp.get(),
*d_state_p,
appIds,
domain,
d_allocator_p);
}

void ClusterStateManager::initiateLeaderSync(BSLS_ANNOTATION_UNUSED bool wait)
{
// While this method could be invoked by ClusterOrchestrator as part of
Expand Down
16 changes: 16 additions & 0 deletions src/groups/mqb/mqbc/mqbc_clusterstatemanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,22 @@ class ClusterStateManager BSLS_KEYWORD_FINAL
void unregisterAppId(const bsl::string& appId,
const mqbi::Domain* domain) BSLS_KEYWORD_OVERRIDE;

/// Register the specified `appIds` for all queues in the specified
/// `domain`.
///
/// THREAD: This method is invoked in the associated cluster's
/// dispatcher thread.
void registerAppIds(const bsl::unordered_set<bsl::string>& appIds,
const mqbi::Domain* domain) BSLS_KEYWORD_OVERRIDE;

/// Unregister the specified `appIds` for all queues in the specified
/// `domain`.
///
/// THREAD: This method is invoked in the associated cluster's
/// dispatcher thread.
void unregisterAppIds(const bsl::unordered_set<bsl::string>& appIds,
const mqbi::Domain* domain) BSLS_KEYWORD_OVERRIDE;

/// Invoked when a newly elected (i.e. passive) leader node initiates a
/// sync with followers before transitioning to active leader.
///
Expand Down
Loading

0 comments on commit 8599a93

Please sign in to comment.