Skip to content

Commit

Permalink
Reply offload
Browse files Browse the repository at this point in the history
  • Loading branch information
alexander-shabanov committed Dec 17, 2024
1 parent 980a801 commit c2d1a60
Show file tree
Hide file tree
Showing 19 changed files with 794 additions and 121 deletions.
45 changes: 22 additions & 23 deletions src/cluster_slot_stats.c
Original file line number Diff line number Diff line change
Expand Up @@ -132,22 +132,27 @@ static void addReplySortedSlotStats(client *c, slotStatForSort slot_stats[], lon
}
}

static int canAddNetworkBytesOut(client *c) {
return server.cluster_slot_stats_enabled && server.cluster_enabled && c->slot != -1;
static int canAddNetworkBytesOut(int slot) {
return clusterSlotStatsEnabled() && slot != -1;
}

/* Accumulates egress bytes for the slot. */
void clusterSlotStatsAddNetworkBytesOutForSlot(int slot, unsigned long long net_bytes_out) {
if (!canAddNetworkBytesOut(slot)) return;

serverAssert(slot >= 0 && slot < CLUSTER_SLOTS);
server.cluster->slot_stats[slot].network_bytes_out += net_bytes_out;
}

/* Accumulates egress bytes upon sending RESP responses back to user clients. */
void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c) {
if (!canAddNetworkBytesOut(c)) return;

serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
server.cluster->slot_stats[c->slot].network_bytes_out += c->net_output_bytes_curr_cmd;
clusterSlotStatsAddNetworkBytesOutForSlot(c->slot, c->net_output_bytes_curr_cmd);
}

/* Accumulates egress bytes upon sending replication stream. This only applies for primary nodes. */
static void clusterSlotStatsUpdateNetworkBytesOutForReplication(long long len) {
client *c = server.current_client;
if (c == NULL || !canAddNetworkBytesOut(c)) return;
if (c == NULL || !canAddNetworkBytesOut(c->slot)) return;

serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
serverAssert(nodeIsPrimary(server.cluster->myself));
Expand All @@ -174,24 +179,14 @@ void clusterSlotStatsDecrNetworkBytesOutForReplication(long long len) {
* This type is not aggregated, to stay consistent with server.stat_net_output_bytes aggregation.
* This function covers the internal propagation component. */
void clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(client *c, int slot) {
/* For a blocked client, c->slot could be pre-filled.
* Thus c->slot is backed-up for restoration after aggregation is completed. */
int _slot = c->slot;
c->slot = slot;
if (!canAddNetworkBytesOut(c)) {
/* c->slot should not change as a side effect of this function,
* regardless of the function's early return condition. */
c->slot = _slot;
return;
}
if (!canAddNetworkBytesOut(slot)) return;

serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
server.cluster->slot_stats[c->slot].network_bytes_out += c->net_output_bytes_curr_cmd;
serverAssert(slot >= 0 && slot < CLUSTER_SLOTS);
server.cluster->slot_stats[slot].network_bytes_out += c->net_output_bytes_curr_cmd;

/* For sharded pubsub, the client's network bytes metrics must be reset here,
* as resetClient() is not called until subscription ends. */
c->net_output_bytes_curr_cmd = 0;
c->slot = _slot;
}

/* Adds reply for the ORDERBY variant.
Expand Down Expand Up @@ -219,8 +214,7 @@ void clusterSlotStatResetAll(void) {
* would equate to repeating the same calculation twice.
*/
static int canAddCpuDuration(client *c) {
return server.cluster_slot_stats_enabled && /* Config should be enabled. */
server.cluster_enabled && /* Cluster mode should be enabled. */
return clusterSlotStatsEnabled() &&
c->slot != -1 && /* Command should be slot specific. */
(!server.execution_nesting || /* Either; */
(server.execution_nesting && /* 1) Command should not be nested, or */
Expand Down Expand Up @@ -248,7 +242,7 @@ static int canAddNetworkBytesIn(client *c) {
* Third, blocked client is not aggregated, to avoid duplicate aggregation upon unblocking.
* Fourth, the server is not under a MULTI/EXEC transaction, to avoid duplicate aggregation of
* EXEC's 14 bytes RESP upon nested call()'s afterCommand(). */
return server.cluster_enabled && server.cluster_slot_stats_enabled && c->slot != -1 && !(c->flag.blocked) &&
return clusterSlotStatsEnabled() && c->slot != -1 && !(c->flag.blocked) &&
!server.in_exec;
}

Expand Down Expand Up @@ -343,3 +337,8 @@ void clusterSlotStatsCommand(client *c) {
addReplySubcommandSyntaxError(c);
}
}

int clusterSlotStatsEnabled(void) {
return server.cluster_slot_stats_enabled && /* Config should be enabled. */
server.cluster_enabled; /* Cluster mode should be enabled. */
}
2 changes: 2 additions & 0 deletions src/cluster_slot_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
/* General use-cases. */
void clusterSlotStatReset(int slot);
void clusterSlotStatResetAll(void);
int clusterSlotStatsEnabled(void);

/* cpu-usec metric. */
void clusterSlotStatsAddCpuDuration(client *c, ustime_t duration);
Expand All @@ -17,6 +18,7 @@ void clusterSlotStatsSetClusterMsgLength(uint32_t len);
void clusterSlotStatsResetClusterMsgLength(void);

/* network-bytes-out metric. */
void clusterSlotStatsAddNetworkBytesOutForSlot(int slot, unsigned long long net_bytes_out);
void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c);
void clusterSlotStatsIncrNetworkBytesOutForReplication(long long len);
void clusterSlotStatsDecrNetworkBytesOutForReplication(long long len);
Expand Down
1 change: 1 addition & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3206,6 +3206,7 @@ standardConfig static_configs[] = {
createBoolConfig("cluster-slot-stats-enabled", NULL, MODIFIABLE_CONFIG, server.cluster_slot_stats_enabled, 0, NULL, NULL),
createBoolConfig("hide-user-data-from-log", NULL, MODIFIABLE_CONFIG, server.hide_user_data_from_log, 1, NULL, NULL),
createBoolConfig("import-mode", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.import_mode, 0, NULL, NULL),
createBoolConfig("reply-offload", NULL, MODIFIABLE_CONFIG, server.reply_offload_enabled, 0, NULL, NULL),

/* String Configs */
createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL),
Expand Down
6 changes: 5 additions & 1 deletion src/io_threads.c
Original file line number Diff line number Diff line change
Expand Up @@ -397,9 +397,13 @@ int trySendWriteToIOThreads(client *c) {
* threads from reading data that might be invalid in their local CPU cache. */
c->io_last_reply_block = listLast(c->reply);
if (c->io_last_reply_block) {
c->io_last_bufpos = ((clientReplyBlock *)listNodeValue(c->io_last_reply_block))->used;
clientReplyBlock *block = (clientReplyBlock *)listNodeValue(c->io_last_reply_block);
c->io_last_bufpos = block->used;
/* If reply offload enabled force new header */
block->last_header = NULL;
} else {
c->io_last_bufpos = (size_t)c->bufpos;
c->last_header = NULL;
}
serverAssert(c->bufpos > 0 || c->io_last_bufpos > 0);

Expand Down
3 changes: 3 additions & 0 deletions src/memory_prefetch.c
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ static void prefetchEntry(KeyPrefetchInfo *info) {
if (hashtableIncrementalFindStep(&info->hashtab_state) == 1) {
/* Not done yet */
moveToNextKey();
/* If reply offload enabled no need to prefetch value because main thread will not access it */
} else if (server.reply_offload_enabled) {
markKeyAsdone(info);
} else {
info->state = PREFETCH_VALUE;
}
Expand Down
Loading

0 comments on commit c2d1a60

Please sign in to comment.