From 8839d9e59729705a263adf3115c4800eac03eb5e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Mon, 13 Jan 2025 22:09:34 +0100 Subject: [PATCH 01/12] Handle empty addresses in CLUSTER NODES responses MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Treat an empty ip string as it means the same endpoint the current command was sent to, as described in docs. Signed-off-by: Björn Svensson --- hircluster.c | 78 +++++++++++++++++++++++++++++---------------- hircluster.h | 2 -- hiredis_cluster.def | 1 - 3 files changed, 50 insertions(+), 31 deletions(-) diff --git a/hircluster.c b/hircluster.c index aec5fa3..b29185c 100644 --- a/hircluster.c +++ b/hircluster.c @@ -516,8 +516,8 @@ static redisClusterNode *node_get_with_slots(redisClusterContext *cc, * Return a new node with the "cluster nodes" command reply. */ static redisClusterNode *node_get_with_nodes(redisClusterContext *cc, - sds *node_infos, int info_count, - uint8_t role) { + redisContext *c, sds *node_infos, + int info_count, uint8_t role) { char *p = NULL; redisClusterNode *node = NULL; @@ -530,12 +530,12 @@ static redisClusterNode *node_get_with_nodes(redisClusterContext *cc, goto oom; } + node->role = role; if (role == REDIS_ROLE_MASTER) { node->slots = listCreate(); if (node->slots == NULL) { goto oom; } - node->slots->free = listClusterSlotDestructor; } @@ -548,27 +548,50 @@ static redisClusterNode *node_get_with_nodes(redisClusterContext *cc, if ((p = strchr(node_infos[1], PORT_CPORT_SEPARATOR)) != NULL) { sdsrange(node_infos[1], 0, p - node_infos[1] - 1 /* skip @ */); } - node->addr = node_infos[1]; - node_infos[1] = NULL; /* Ownership moved */ - node->role = role; - - /* Get the ip part */ - if ((p = strrchr(node->addr, IP_PORT_SEPARATOR)) == NULL) { + /* Find the port separator. */ + if ((p = strrchr(node_infos[1], IP_PORT_SEPARATOR)) == NULL) { __redisClusterSetError( cc, REDIS_ERR_OTHER, "server address is incorrect, port separator missing."); goto error; } - node->host = sdsnewlen(node->addr, p - node->addr); - if (node->host == NULL) { - goto oom; + + /* Get the port (skip the found port separator). */ + int port = hi_atoi(p + 1, strlen(p + 1)); + if (port < 1 || port > UINT16_MAX) { + __redisClusterSetError(cc, REDIS_ERR_OTHER, "Invalid port"); + goto error; } - p++; // remove found separator character + node->port = port; - /* Get the port part */ - node->port = hi_atoi(p, strlen(p)); + /* Check that we received an ip/host address, i.e. the field does not + * start with the found port separator. */ + if (node_infos[1] != p) { + node->addr = node_infos[1]; + node_infos[1] = NULL; /* Ownership moved */ + node->host = sdsnewlen(node->addr, p - node->addr); + if (node->host == NULL) { + goto oom; + } + } else { + /* We received an ip/host that is an empty string. According to the docs + * we can treat this as it means the same address we sent this command to. */ + node->host = sdsnew(c->tcp.host); + if (node->host == NULL) { + goto oom; + } + /* Create a new addr field using correct host:port */ + node->addr = sdsnew(node->host); + if (node->addr == NULL) { + goto oom; + } + node->addr = sdscatfmt(node->addr, ":%i", node->port); + if (node->addr == NULL) { + goto oom; + } + } return node; oom: @@ -918,8 +941,8 @@ dict *parse_cluster_slots(redisClusterContext *cc, redisReply *reply, /** * Parse the "cluster nodes" command reply to nodes dict. */ -dict *parse_cluster_nodes(redisClusterContext *cc, char *str, int str_len, - int flags) { +static dict *parse_cluster_nodes(redisClusterContext *cc, redisContext *c, + redisReply *reply) { int ret; dict *nodes = NULL; dict *nodes_name = NULL; @@ -939,8 +962,8 @@ dict *parse_cluster_nodes(redisClusterContext *cc, char *str, int str_len, goto oom; } - start = str; - end = start + str_len; + start = reply->str; + end = start + reply->len; line_start = start; @@ -983,7 +1006,7 @@ dict *parse_cluster_nodes(redisClusterContext *cc, char *str, int str_len, // add master node if (role_len >= 6 && memcmp(role, "master", 6) == 0) { - master = node_get_with_nodes(cc, part, count_part, + master = node_get_with_nodes(cc, c, part, count_part, REDIS_ROLE_MASTER); if (master == NULL) { goto error; @@ -1006,7 +1029,7 @@ dict *parse_cluster_nodes(redisClusterContext *cc, char *str, int str_len, goto error; } - if (flags & HIRCLUSTER_FLAG_ADD_SLAVE) { + if (cc->flags & HIRCLUSTER_FLAG_ADD_SLAVE) { ret = cluster_master_slave_mapping_with_name( cc, &nodes_name, master, master->name); if (ret != REDIS_OK) { @@ -1035,7 +1058,7 @@ dict *parse_cluster_nodes(redisClusterContext *cc, char *str, int str_len, ; } else { // add open slot for master - if (flags & HIRCLUSTER_FLAG_ADD_OPENSLOT && + if (cc->flags & HIRCLUSTER_FLAG_ADD_OPENSLOT && count_slot_start_end == 3 && sdslen(slot_start_end[0]) > 1 && sdslen(slot_start_end[1]) == 1 && @@ -1134,10 +1157,10 @@ dict *parse_cluster_nodes(redisClusterContext *cc, char *str, int str_len, } // add slave node - else if ((flags & HIRCLUSTER_FLAG_ADD_SLAVE) && + else if ((cc->flags & HIRCLUSTER_FLAG_ADD_SLAVE) && (role_len >= 5 && memcmp(role, "slave", 5) == 0)) { - slave = - node_get_with_nodes(cc, part, count_part, REDIS_ROLE_SLAVE); + slave = node_get_with_nodes(cc, c, part, count_part, + REDIS_ROLE_SLAVE); if (slave == NULL) { goto error; } @@ -1266,7 +1289,7 @@ static int handleClusterNodesReply(redisClusterContext *cc, redisContext *c) { return REDIS_ERR; } - dict *nodes = parse_cluster_nodes(cc, reply->str, reply->len, cc->flags); + dict *nodes = parse_cluster_nodes(cc, c, reply); freeReplyObject(reply); return updateNodesAndSlotmap(cc, nodes); } @@ -3857,7 +3880,6 @@ void clusterSlotsReplyCallback(redisAsyncContext *ac, void *r, void *privdata) { /* Reply callback function for CLUSTER NODES */ void clusterNodesReplyCallback(redisAsyncContext *ac, void *r, void *privdata) { - UNUSED(ac); redisReply *reply = (redisReply *)r; redisClusterAsyncContext *acc = (redisClusterAsyncContext *)privdata; acc->lastSlotmapUpdateAttempt = hi_usec_now(); @@ -3869,7 +3891,7 @@ void clusterNodesReplyCallback(redisAsyncContext *ac, void *r, void *privdata) { } redisClusterContext *cc = acc->cc; - dict *nodes = parse_cluster_nodes(cc, reply->str, reply->len, cc->flags); + dict *nodes = parse_cluster_nodes(cc, &ac->c, reply); if (updateNodesAndSlotmap(cc, nodes) != REDIS_OK) { /* Ignore failures for now */ } diff --git a/hircluster.h b/hircluster.h index 88ea42e..b92b149 100644 --- a/hircluster.h +++ b/hircluster.h @@ -291,8 +291,6 @@ int redisClusterUpdateSlotmap(redisClusterContext *cc); /* Internal functions */ redisContext *ctx_get_by_node(redisClusterContext *cc, redisClusterNode *node); -struct dict *parse_cluster_nodes(redisClusterContext *cc, char *str, - int str_len, int flags); struct dict *parse_cluster_slots(redisClusterContext *cc, redisReply *reply, int flags); diff --git a/hiredis_cluster.def b/hiredis_cluster.def index a50b3b1..ea4c5c5 100644 --- a/hiredis_cluster.def +++ b/hiredis_cluster.def @@ -6,7 +6,6 @@ dictInitIterator dictNext hiarray_get - parse_cluster_nodes parse_cluster_slots redisClusterAppendCommand redisClusterAppendCommandArgv From 90ab704c03d655102ec495f3308f9123e5615717 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Tue, 14 Jan 2025 00:16:46 +0100 Subject: [PATCH 02/12] Handle empty addresses in CLUSTER SLOTS responses MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Treat an empty ip string as it means the same endpoint the current command was sent to, as described in docs. Signed-off-by: Björn Svensson --- hircluster.c | 78 ++++++++++++++++----------------------------- hircluster.h | 2 -- hiredis_cluster.def | 1 - 3 files changed, 28 insertions(+), 53 deletions(-) diff --git a/hircluster.c b/hircluster.c index b29185c..d48d4c6 100644 --- a/hircluster.c +++ b/hircluster.c @@ -437,37 +437,9 @@ static int authenticate(redisClusterContext *cc, redisContext *c) { * Return a new node with the "cluster slots" command reply. */ static redisClusterNode *node_get_with_slots(redisClusterContext *cc, - redisReply *host_elem, - redisReply *port_elem, + char *host, int port, uint8_t role) { - redisClusterNode *node = NULL; - - if (host_elem == NULL || port_elem == NULL) { - return NULL; - } - - if (host_elem->type != REDIS_REPLY_STRING || host_elem->len <= 0) { - __redisClusterSetError(cc, REDIS_ERR_OTHER, - "Command(cluster slots) reply error: " - "node ip is not string."); - goto error; - } - - if (port_elem->type != REDIS_REPLY_INTEGER || port_elem->integer <= 0) { - __redisClusterSetError(cc, REDIS_ERR_OTHER, - "Command(cluster slots) reply error: " - "node port is not integer."); - goto error; - } - - if (!hi_valid_port((int)port_elem->integer)) { - __redisClusterSetError(cc, REDIS_ERR_OTHER, - "Command(cluster slots) reply error: " - "node port is not valid."); - goto error; - } - - node = createRedisClusterNode(); + redisClusterNode *node = createRedisClusterNode(); if (node == NULL) { goto oom; } @@ -481,29 +453,26 @@ static redisClusterNode *node_get_with_slots(redisClusterContext *cc, node->slots->free = listClusterSlotDestructor; } - node->addr = sdsnewlen(host_elem->str, host_elem->len); + node->addr = sdsnew(host); if (node->addr == NULL) { goto oom; } - node->addr = sdscatfmt(node->addr, ":%i", port_elem->integer); + node->addr = sdscatfmt(node->addr, ":%i", port); if (node->addr == NULL) { goto oom; } - node->host = sdsnewlen(host_elem->str, host_elem->len); + node->host = sdsnew(host); if (node->host == NULL) { goto oom; } node->name = NULL; - node->port = (int)port_elem->integer; + node->port = port; node->role = role; return node; oom: __redisClusterSetError(cc, REDIS_ERR_OOM, "Out of memory"); - // passthrough - -error: if (node != NULL) { sdsfree(node->addr); sdsfree(node->host); @@ -747,8 +716,8 @@ static int cluster_master_slave_mapping_with_name(redisClusterContext *cc, /** * Parse the "cluster slots" command reply to nodes dict. */ -dict *parse_cluster_slots(redisClusterContext *cc, redisReply *reply, - int flags) { +static dict *parse_cluster_slots(redisClusterContext *cc, redisContext *c, + redisReply *reply) { int ret; cluster_slot *slot = NULL; dict *nodes = NULL; @@ -837,21 +806,30 @@ dict *parse_cluster_slots(redisClusterContext *cc, redisReply *reply, if (elem_ip == NULL || elem_port == NULL || elem_ip->type != REDIS_REPLY_STRING || - elem_port->type != REDIS_REPLY_INTEGER) { + elem_port->type != REDIS_REPLY_INTEGER || + !hi_valid_port((int)elem_port->integer)) { __redisClusterSetError( cc, REDIS_ERR_OTHER, "Command(cluster slots) reply error: " - "master ip or port is not correct."); + "node ip or port is not correct."); goto error; } + /* Get the received ip/host. According to the docs an empty string can + * be treated as it means the same address we sent this command to. */ + char *host = (elem_ip->len > 0) ? elem_ip->str : c->tcp.host; + if (host == NULL) { + goto oom; + } + int port = elem_port->integer; + // this is master. if (idx == 2) { - sds address = sdsnewlen(elem_ip->str, elem_ip->len); + sds address = sdsnew(host); if (address == NULL) { goto oom; } - address = sdscatfmt(address, ":%i", elem_port->integer); + address = sdscatfmt(address, ":%i", port); if (address == NULL) { goto oom; } @@ -871,8 +849,8 @@ dict *parse_cluster_slots(redisClusterContext *cc, redisReply *reply, break; } - master = node_get_with_slots(cc, elem_ip, elem_port, - REDIS_ROLE_MASTER); + master = + node_get_with_slots(cc, host, port, REDIS_ROLE_MASTER); if (master == NULL) { goto error; } @@ -896,9 +874,9 @@ dict *parse_cluster_slots(redisClusterContext *cc, redisReply *reply, } slot = NULL; - } else if (flags & HIRCLUSTER_FLAG_ADD_SLAVE) { - slave = node_get_with_slots(cc, elem_ip, elem_port, - REDIS_ROLE_SLAVE); + } else if (cc->flags & HIRCLUSTER_FLAG_ADD_SLAVE) { + slave = + node_get_with_slots(cc, host, port, REDIS_ROLE_SLAVE); if (slave == NULL) { goto error; } @@ -1257,7 +1235,7 @@ static int handleClusterSlotsReply(redisClusterContext *cc, redisContext *c) { return REDIS_ERR; } - dict *nodes = parse_cluster_slots(cc, reply, cc->flags); + dict *nodes = parse_cluster_slots(cc, c, reply); freeReplyObject(reply); return updateNodesAndSlotmap(cc, nodes); } @@ -3872,7 +3850,7 @@ void clusterSlotsReplyCallback(redisAsyncContext *ac, void *r, void *privdata) { } redisClusterContext *cc = acc->cc; - dict *nodes = parse_cluster_slots(cc, reply, cc->flags); + dict *nodes = parse_cluster_slots(cc, &ac->c, reply); if (updateNodesAndSlotmap(cc, nodes) != REDIS_OK) { /* Ignore failures for now */ } diff --git a/hircluster.h b/hircluster.h index b92b149..80a52ec 100644 --- a/hircluster.h +++ b/hircluster.h @@ -291,8 +291,6 @@ int redisClusterUpdateSlotmap(redisClusterContext *cc); /* Internal functions */ redisContext *ctx_get_by_node(redisClusterContext *cc, redisClusterNode *node); -struct dict *parse_cluster_slots(redisClusterContext *cc, redisReply *reply, - int flags); /* * Asynchronous API diff --git a/hiredis_cluster.def b/hiredis_cluster.def index ea4c5c5..e58ffc4 100644 --- a/hiredis_cluster.def +++ b/hiredis_cluster.def @@ -6,7 +6,6 @@ dictInitIterator dictNext hiarray_get - parse_cluster_slots redisClusterAppendCommand redisClusterAppendCommandArgv redisClusterAppendCommandToNode From f3d1beb5d3b01dd5fe9e1d778c68971d0e8effa0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Fri, 10 Jan 2025 12:34:42 +0100 Subject: [PATCH 03/12] Add option to clusterclient_async for async initial slotmap update MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Björn Svensson --- tests/clusterclient_async.c | 39 +++++++++++++++++++++++++------------ 1 file changed, 27 insertions(+), 12 deletions(-) diff --git a/tests/clusterclient_async.c b/tests/clusterclient_async.c index 7f40f75..3946b95 100644 --- a/tests/clusterclient_async.c +++ b/tests/clusterclient_async.c @@ -51,6 +51,8 @@ char cmd_history[HISTORY_DEPTH][CMD_SIZE]; int num_running = 0; int resend_failed_cmd = 0; int send_to_all = 0; +int show_events = 0; +int async_initial_update = 0; void sendNextCommand(evutil_socket_t, short, void *); @@ -184,7 +186,17 @@ void sendNextCommand(evutil_socket_t fd, short kind, void *arg) { void eventCallback(const redisClusterContext *cc, int event, void *privdata) { (void)cc; - (void)privdata; + + if (event == HIRCLUSTER_EVENT_READY) { + /* Schedule a read from stdin and send next command. */ + redisClusterAsyncContext *acc = (redisClusterAsyncContext *)privdata; + event_base_once(acc->adapter, -1, EV_TIMEOUT, sendNextCommand, acc, + NULL); + } + + if (!show_events) + return; + char *e = NULL; switch (event) { case HIRCLUSTER_EVENT_SLOTMAP_UPDATED: @@ -219,7 +231,6 @@ void disconnectCallback(const redisAsyncContext *ac, int status) { int main(int argc, char **argv) { int use_cluster_slots = 1; // Get topology via CLUSTER SLOTS - int show_events = 0; int show_connection_events = 0; int optind; @@ -230,6 +241,8 @@ int main(int argc, char **argv) { show_events = 1; } else if (strcmp(argv[optind], "--connection-events") == 0) { show_connection_events = 1; + } else if (strcmp(argv[optind], "--async-initial-update") == 0) { + async_initial_update = 1; } else { fprintf(stderr, "Unknown argument: '%s'\n", argv[optind]); } @@ -249,28 +262,30 @@ int main(int argc, char **argv) { redisClusterSetOptionTimeout(acc->cc, timeout); redisClusterSetOptionConnectTimeout(acc->cc, timeout); redisClusterSetOptionMaxRetry(acc->cc, 1); + redisClusterSetEventCallback(acc->cc, eventCallback, acc); if (use_cluster_slots) { redisClusterSetOptionRouteUseSlots(acc->cc); } - if (show_events) { - redisClusterSetEventCallback(acc->cc, eventCallback, NULL); - } if (show_connection_events) { redisClusterAsyncSetConnectCallback(acc, connectCallback); redisClusterAsyncSetDisconnectCallback(acc, disconnectCallback); } - if (redisClusterConnect2(acc->cc) != REDIS_OK) { - printf("Connect error: %s\n", acc->cc->errstr); - exit(2); - } - struct event_base *base = event_base_new(); int status = redisClusterLibeventAttach(acc, base); assert(status == REDIS_OK); - /* Schedule a read from stdin and send next command */ - event_base_once(acc->adapter, -1, EV_TIMEOUT, sendNextCommand, acc, NULL); + if (async_initial_update) { + if (redisClusterAsyncConnect2(acc) != REDIS_OK) { + printf("Connect error: %s\n", acc->errstr); + exit(2); + } + } else { + if (redisClusterConnect2(acc->cc) != REDIS_OK) { + printf("Connect error: %s\n", acc->cc->errstr); + exit(2); + } + } event_base_dispatch(base); From e4dc746f33a46253efb6838349d2b7385dc442d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Mon, 13 Jan 2025 11:41:48 +0100 Subject: [PATCH 04/12] Add connect-during-cluster-startup tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Björn Svensson --- tests/CMakeLists.txt | 8 +++ .../connect-during-cluster-startup-test.sh | 68 +++++++++++++++++++ ...luster-startup-using-cluster-nodes-test.sh | 68 +++++++++++++++++++ 3 files changed, 144 insertions(+) create mode 100755 tests/scripts/connect-during-cluster-startup-test.sh create mode 100755 tests/scripts/connect-during-cluster-startup-using-cluster-nodes-test.sh diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 923aed1..78866ee 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -282,3 +282,11 @@ add_test(NAME client-disconnect-without-slotmap-update-test-async COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/client-disconnect-without-slotmap-update-test.sh" "$" WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/") +add_test(NAME connect-during-cluster-startup-test-async + COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/connect-during-cluster-startup-test.sh" + "$" + WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/") +add_test(NAME connect-during-cluster-startup-using-cluster-nodes-test-async + COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/connect-during-cluster-startup-using-cluster-nodes-test.sh" + "$" + WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/") diff --git a/tests/scripts/connect-during-cluster-startup-test.sh b/tests/scripts/connect-during-cluster-startup-test.sh new file mode 100755 index 0000000..a9ef3fa --- /dev/null +++ b/tests/scripts/connect-during-cluster-startup-test.sh @@ -0,0 +1,68 @@ +#!/bin/sh +# +# Connect to a cluster which is in the processes of starting up. +# +# The first attempt to get the slotmap will receive a reply without any +# slot information and this should result in a retry. +# +# The client is configured to use the CLUSTER SLOTS command. +# +# Usage: $0 /path/to/clusterclient-binary + +clientprog=${1:-./clusterclient_async} +testname=connect-during-cluster-startup-test + +# Sync process just waiting for server to be ready to accept connection. +perl -we 'use sigtrap "handler", sub{exit}, "CONT"; sleep 1; die "timeout"' & +syncpid=$! + +# Start simulated server. +timeout 5s ./simulated-redis.pl -p 7400 -d --sigcont $syncpid <<'EOF' & +# The initial slotmap is not covering any slots, expect a retry. +EXPECT CONNECT +EXPECT ["CLUSTER", "SLOTS"] +SEND [] + +# The node has now been delegated slots. +EXPECT ["CLUSTER", "SLOTS"] +SEND [[0, 16383, ["", 7400, "f5378fa2ad1fbd569f01ba2fe29fa8feb36cdfb8"]]] + +EXPECT ["SET", "foo", "bar"] +SEND +OK +EXPECT CLOSE +EOF +server=$! + +# Wait until server is ready to accept client connection. +wait $syncpid; + +# Run client which will fetch the initial slotmap asynchronously. +timeout 3s "$clientprog" --events --async-initial-update 127.0.0.1:7400 > "$testname.out" <<'EOF' +SET foo bar +EOF +clientexit=$? + +# Wait for server to exit. +wait $server; serverexit=$? + +# Check exit status on server. +if [ $serverexit -ne 0 ]; then + echo "Simulated server exited with status $serverexit" + exit $serverexit +fi +# Check exit status on client. +if [ $clientexit -ne 0 ]; then + echo "$clientprog exited with status $clientexit" + exit $clientexit +fi + +# Check the output from the client. +expected="Event: slotmap-updated +Event: ready +OK +Event: free-context" + +echo "$expected" | diff -u - "$testname.out" || exit 99 + +# Clean up. +rm "$testname.out" diff --git a/tests/scripts/connect-during-cluster-startup-using-cluster-nodes-test.sh b/tests/scripts/connect-during-cluster-startup-using-cluster-nodes-test.sh new file mode 100755 index 0000000..d16d57f --- /dev/null +++ b/tests/scripts/connect-during-cluster-startup-using-cluster-nodes-test.sh @@ -0,0 +1,68 @@ +#!/bin/sh +# +# Connect to a cluster which is in the processes of starting up. +# +# The first attempt to get the slotmap will receive a reply without any +# slot information and this should result in a retry. +# +# The client is configured to use the CLUSTER NODES command. +# +# Usage: $0 /path/to/clusterclient-binary + +clientprog=${1:-./clusterclient_async} +testname=connect-during-cluster-startup-using-cluster-nodes-test + +# Sync process just waiting for server to be ready to accept connection. +perl -we 'use sigtrap "handler", sub{exit}, "CONT"; sleep 1; die "timeout"' & +syncpid=$! + +# Start simulated server. +timeout 5s ./simulated-redis.pl -p 7400 -d --sigcont $syncpid <<'EOF' & +# The initial slotmap is not covering any slots, expect a retry. +EXPECT CONNECT +EXPECT ["CLUSTER", "NODES"] +SEND "8adca41945787ad1c9e725a40a43cf72bd4c6ad4 :7400@17400 myself,master - 0 0 0 connected\n" + +# The node has now been delegated slots. +EXPECT ["CLUSTER", "NODES"] +SEND "8adca41945787ad1c9e725a40a43cf72bd4c6ad4 :7400@17400 myself,master - 0 0 1 connected 0-16383\n" + +EXPECT ["SET", "foo", "bar"] +SEND +OK +EXPECT CLOSE +EOF +server=$! + +# Wait until server is ready to accept client connection. +wait $syncpid; + +# Run client which will fetch the initial slotmap asynchronously using CLUSTER NODES. +timeout 3s "$clientprog" --events --use-cluster-nodes --async-initial-update 127.0.0.1:7400 > "$testname.out" <<'EOF' +SET foo bar +EOF +clientexit=$? + +# Wait for server to exit. +wait $server; serverexit=$? + +# Check exit status on server. +if [ $serverexit -ne 0 ]; then + echo "Simulated server exited with status $serverexit" + exit $serverexit +fi +# Check exit status on client. +if [ $clientexit -ne 0 ]; then + echo "$clientprog exited with status $clientexit" + exit $clientexit +fi + +# Check the output from the client. +expected="Event: slotmap-updated +Event: ready +OK +Event: free-context" + +echo "$expected" | diff -u - "$testname.out" || exit 99 + +# Clean up. +rm "$testname.out" From 4c483a5a618d59ab8dc7b46904562785b16f71e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Mon, 13 Jan 2025 11:44:48 +0100 Subject: [PATCH 05/12] Retry the async slotmap update when the update fails MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Björn Svensson --- hircluster.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/hircluster.c b/hircluster.c index d48d4c6..bfcf32a 100644 --- a/hircluster.c +++ b/hircluster.c @@ -3852,7 +3852,8 @@ void clusterSlotsReplyCallback(redisAsyncContext *ac, void *r, void *privdata) { redisClusterContext *cc = acc->cc; dict *nodes = parse_cluster_slots(cc, &ac->c, reply); if (updateNodesAndSlotmap(cc, nodes) != REDIS_OK) { - /* Ignore failures for now */ + /* Retry using available nodes */ + updateSlotMapAsync(acc, NULL); } } @@ -3871,7 +3872,8 @@ void clusterNodesReplyCallback(redisAsyncContext *ac, void *r, void *privdata) { redisClusterContext *cc = acc->cc; dict *nodes = parse_cluster_nodes(cc, &ac->c, reply); if (updateNodesAndSlotmap(cc, nodes) != REDIS_OK) { - /* Ignore failures for now */ + /* Retry using available nodes */ + updateSlotMapAsync(acc, NULL); } } From 2aa93d79c5cb7c3270b4516df6ba06451a9cd8aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Mon, 13 Jan 2025 22:09:34 +0100 Subject: [PATCH 06/12] Handle empty addresses in CLUSTER NODES responses MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Treat an empty ip string as it means the same endpoint the current command was sent to, as described in docs. Use the host address that is kept in the hiredis context. Signed-off-by: Björn Svensson --- hircluster.c | 79 +++++++++++++++++++++++++++++---------------- hircluster.h | 2 -- hiredis_cluster.def | 1 - 3 files changed, 51 insertions(+), 31 deletions(-) diff --git a/hircluster.c b/hircluster.c index aec5fa3..2b0a3f4 100644 --- a/hircluster.c +++ b/hircluster.c @@ -516,8 +516,8 @@ static redisClusterNode *node_get_with_slots(redisClusterContext *cc, * Return a new node with the "cluster nodes" command reply. */ static redisClusterNode *node_get_with_nodes(redisClusterContext *cc, - sds *node_infos, int info_count, - uint8_t role) { + redisContext *c, sds *node_infos, + int info_count, uint8_t role) { char *p = NULL; redisClusterNode *node = NULL; @@ -530,12 +530,12 @@ static redisClusterNode *node_get_with_nodes(redisClusterContext *cc, goto oom; } + node->role = role; if (role == REDIS_ROLE_MASTER) { node->slots = listCreate(); if (node->slots == NULL) { goto oom; } - node->slots->free = listClusterSlotDestructor; } @@ -548,27 +548,51 @@ static redisClusterNode *node_get_with_nodes(redisClusterContext *cc, if ((p = strchr(node_infos[1], PORT_CPORT_SEPARATOR)) != NULL) { sdsrange(node_infos[1], 0, p - node_infos[1] - 1 /* skip @ */); } - node->addr = node_infos[1]; - node_infos[1] = NULL; /* Ownership moved */ - node->role = role; - - /* Get the ip part */ - if ((p = strrchr(node->addr, IP_PORT_SEPARATOR)) == NULL) { + /* Find the port separator. */ + if ((p = strrchr(node_infos[1], IP_PORT_SEPARATOR)) == NULL) { __redisClusterSetError( cc, REDIS_ERR_OTHER, "server address is incorrect, port separator missing."); goto error; } - node->host = sdsnewlen(node->addr, p - node->addr); - if (node->host == NULL) { - goto oom; + + /* Get the port (skip the found port separator). */ + int port = hi_atoi(p + 1, strlen(p + 1)); + if (port < 1 || port > UINT16_MAX) { + __redisClusterSetError(cc, REDIS_ERR_OTHER, "Invalid port"); + goto error; } - p++; // remove found separator character + node->port = port; - /* Get the port part */ - node->port = hi_atoi(p, strlen(p)); + /* Check that we received an ip/host address, i.e. the field does not + * start with the found port separator. */ + if (node_infos[1] != p) { + node->addr = node_infos[1]; + node_infos[1] = NULL; /* Ownership moved */ + node->host = sdsnewlen(node->addr, p - node->addr); + if (node->host == NULL) { + goto oom; + } + } else { + /* We received an ip/host that is an empty string. According to the docs + * we can treat this as it means the same address we sent this command to. + * Get the host address from the used hiredis context. */ + node->host = sdsnew(c->tcp.host); + if (node->host == NULL) { + goto oom; + } + /* Create a new addr field using correct host:port */ + node->addr = sdsnew(node->host); + if (node->addr == NULL) { + goto oom; + } + node->addr = sdscatfmt(node->addr, ":%i", node->port); + if (node->addr == NULL) { + goto oom; + } + } return node; oom: @@ -918,8 +942,8 @@ dict *parse_cluster_slots(redisClusterContext *cc, redisReply *reply, /** * Parse the "cluster nodes" command reply to nodes dict. */ -dict *parse_cluster_nodes(redisClusterContext *cc, char *str, int str_len, - int flags) { +static dict *parse_cluster_nodes(redisClusterContext *cc, redisContext *c, + redisReply *reply) { int ret; dict *nodes = NULL; dict *nodes_name = NULL; @@ -939,8 +963,8 @@ dict *parse_cluster_nodes(redisClusterContext *cc, char *str, int str_len, goto oom; } - start = str; - end = start + str_len; + start = reply->str; + end = start + reply->len; line_start = start; @@ -983,7 +1007,7 @@ dict *parse_cluster_nodes(redisClusterContext *cc, char *str, int str_len, // add master node if (role_len >= 6 && memcmp(role, "master", 6) == 0) { - master = node_get_with_nodes(cc, part, count_part, + master = node_get_with_nodes(cc, c, part, count_part, REDIS_ROLE_MASTER); if (master == NULL) { goto error; @@ -1006,7 +1030,7 @@ dict *parse_cluster_nodes(redisClusterContext *cc, char *str, int str_len, goto error; } - if (flags & HIRCLUSTER_FLAG_ADD_SLAVE) { + if (cc->flags & HIRCLUSTER_FLAG_ADD_SLAVE) { ret = cluster_master_slave_mapping_with_name( cc, &nodes_name, master, master->name); if (ret != REDIS_OK) { @@ -1035,7 +1059,7 @@ dict *parse_cluster_nodes(redisClusterContext *cc, char *str, int str_len, ; } else { // add open slot for master - if (flags & HIRCLUSTER_FLAG_ADD_OPENSLOT && + if (cc->flags & HIRCLUSTER_FLAG_ADD_OPENSLOT && count_slot_start_end == 3 && sdslen(slot_start_end[0]) > 1 && sdslen(slot_start_end[1]) == 1 && @@ -1134,10 +1158,10 @@ dict *parse_cluster_nodes(redisClusterContext *cc, char *str, int str_len, } // add slave node - else if ((flags & HIRCLUSTER_FLAG_ADD_SLAVE) && + else if ((cc->flags & HIRCLUSTER_FLAG_ADD_SLAVE) && (role_len >= 5 && memcmp(role, "slave", 5) == 0)) { - slave = - node_get_with_nodes(cc, part, count_part, REDIS_ROLE_SLAVE); + slave = node_get_with_nodes(cc, c, part, count_part, + REDIS_ROLE_SLAVE); if (slave == NULL) { goto error; } @@ -1266,7 +1290,7 @@ static int handleClusterNodesReply(redisClusterContext *cc, redisContext *c) { return REDIS_ERR; } - dict *nodes = parse_cluster_nodes(cc, reply->str, reply->len, cc->flags); + dict *nodes = parse_cluster_nodes(cc, c, reply); freeReplyObject(reply); return updateNodesAndSlotmap(cc, nodes); } @@ -3857,7 +3881,6 @@ void clusterSlotsReplyCallback(redisAsyncContext *ac, void *r, void *privdata) { /* Reply callback function for CLUSTER NODES */ void clusterNodesReplyCallback(redisAsyncContext *ac, void *r, void *privdata) { - UNUSED(ac); redisReply *reply = (redisReply *)r; redisClusterAsyncContext *acc = (redisClusterAsyncContext *)privdata; acc->lastSlotmapUpdateAttempt = hi_usec_now(); @@ -3869,7 +3892,7 @@ void clusterNodesReplyCallback(redisAsyncContext *ac, void *r, void *privdata) { } redisClusterContext *cc = acc->cc; - dict *nodes = parse_cluster_nodes(cc, reply->str, reply->len, cc->flags); + dict *nodes = parse_cluster_nodes(cc, &ac->c, reply); if (updateNodesAndSlotmap(cc, nodes) != REDIS_OK) { /* Ignore failures for now */ } diff --git a/hircluster.h b/hircluster.h index 88ea42e..b92b149 100644 --- a/hircluster.h +++ b/hircluster.h @@ -291,8 +291,6 @@ int redisClusterUpdateSlotmap(redisClusterContext *cc); /* Internal functions */ redisContext *ctx_get_by_node(redisClusterContext *cc, redisClusterNode *node); -struct dict *parse_cluster_nodes(redisClusterContext *cc, char *str, - int str_len, int flags); struct dict *parse_cluster_slots(redisClusterContext *cc, redisReply *reply, int flags); diff --git a/hiredis_cluster.def b/hiredis_cluster.def index a50b3b1..ea4c5c5 100644 --- a/hiredis_cluster.def +++ b/hiredis_cluster.def @@ -6,7 +6,6 @@ dictInitIterator dictNext hiarray_get - parse_cluster_nodes parse_cluster_slots redisClusterAppendCommand redisClusterAppendCommandArgv From fa9ec31d6de9b4043d07ffcc8eed37a7320efbf5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Tue, 14 Jan 2025 00:16:46 +0100 Subject: [PATCH 07/12] Handle empty addresses in CLUSTER SLOTS responses MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Treat an empty ip string as it means the same endpoint the current command was sent to, as described in docs. Use the host address from the hiredis context. Remove duplicate validation of host and port elements. Signed-off-by: Björn Svensson --- hircluster.c | 78 ++++++++++++++++----------------------------- hircluster.h | 2 -- hiredis_cluster.def | 1 - 3 files changed, 28 insertions(+), 53 deletions(-) diff --git a/hircluster.c b/hircluster.c index 2b0a3f4..4bb4ca8 100644 --- a/hircluster.c +++ b/hircluster.c @@ -437,37 +437,9 @@ static int authenticate(redisClusterContext *cc, redisContext *c) { * Return a new node with the "cluster slots" command reply. */ static redisClusterNode *node_get_with_slots(redisClusterContext *cc, - redisReply *host_elem, - redisReply *port_elem, + char *host, int port, uint8_t role) { - redisClusterNode *node = NULL; - - if (host_elem == NULL || port_elem == NULL) { - return NULL; - } - - if (host_elem->type != REDIS_REPLY_STRING || host_elem->len <= 0) { - __redisClusterSetError(cc, REDIS_ERR_OTHER, - "Command(cluster slots) reply error: " - "node ip is not string."); - goto error; - } - - if (port_elem->type != REDIS_REPLY_INTEGER || port_elem->integer <= 0) { - __redisClusterSetError(cc, REDIS_ERR_OTHER, - "Command(cluster slots) reply error: " - "node port is not integer."); - goto error; - } - - if (!hi_valid_port((int)port_elem->integer)) { - __redisClusterSetError(cc, REDIS_ERR_OTHER, - "Command(cluster slots) reply error: " - "node port is not valid."); - goto error; - } - - node = createRedisClusterNode(); + redisClusterNode *node = createRedisClusterNode(); if (node == NULL) { goto oom; } @@ -481,29 +453,26 @@ static redisClusterNode *node_get_with_slots(redisClusterContext *cc, node->slots->free = listClusterSlotDestructor; } - node->addr = sdsnewlen(host_elem->str, host_elem->len); + node->addr = sdsnew(host); if (node->addr == NULL) { goto oom; } - node->addr = sdscatfmt(node->addr, ":%i", port_elem->integer); + node->addr = sdscatfmt(node->addr, ":%i", port); if (node->addr == NULL) { goto oom; } - node->host = sdsnewlen(host_elem->str, host_elem->len); + node->host = sdsnew(host); if (node->host == NULL) { goto oom; } node->name = NULL; - node->port = (int)port_elem->integer; + node->port = port; node->role = role; return node; oom: __redisClusterSetError(cc, REDIS_ERR_OOM, "Out of memory"); - // passthrough - -error: if (node != NULL) { sdsfree(node->addr); sdsfree(node->host); @@ -748,8 +717,8 @@ static int cluster_master_slave_mapping_with_name(redisClusterContext *cc, /** * Parse the "cluster slots" command reply to nodes dict. */ -dict *parse_cluster_slots(redisClusterContext *cc, redisReply *reply, - int flags) { +static dict *parse_cluster_slots(redisClusterContext *cc, redisContext *c, + redisReply *reply) { int ret; cluster_slot *slot = NULL; dict *nodes = NULL; @@ -838,21 +807,30 @@ dict *parse_cluster_slots(redisClusterContext *cc, redisReply *reply, if (elem_ip == NULL || elem_port == NULL || elem_ip->type != REDIS_REPLY_STRING || - elem_port->type != REDIS_REPLY_INTEGER) { + elem_port->type != REDIS_REPLY_INTEGER || + !hi_valid_port((int)elem_port->integer)) { __redisClusterSetError( cc, REDIS_ERR_OTHER, "Command(cluster slots) reply error: " - "master ip or port is not correct."); + "node ip or port is not correct."); goto error; } + /* Get the received ip/host. According to the docs an empty string can + * be treated as it means the same address we sent this command to. */ + char *host = (elem_ip->len > 0) ? elem_ip->str : c->tcp.host; + if (host == NULL) { + goto oom; + } + int port = elem_port->integer; + // this is master. if (idx == 2) { - sds address = sdsnewlen(elem_ip->str, elem_ip->len); + sds address = sdsnew(host); if (address == NULL) { goto oom; } - address = sdscatfmt(address, ":%i", elem_port->integer); + address = sdscatfmt(address, ":%i", port); if (address == NULL) { goto oom; } @@ -872,8 +850,8 @@ dict *parse_cluster_slots(redisClusterContext *cc, redisReply *reply, break; } - master = node_get_with_slots(cc, elem_ip, elem_port, - REDIS_ROLE_MASTER); + master = + node_get_with_slots(cc, host, port, REDIS_ROLE_MASTER); if (master == NULL) { goto error; } @@ -897,9 +875,9 @@ dict *parse_cluster_slots(redisClusterContext *cc, redisReply *reply, } slot = NULL; - } else if (flags & HIRCLUSTER_FLAG_ADD_SLAVE) { - slave = node_get_with_slots(cc, elem_ip, elem_port, - REDIS_ROLE_SLAVE); + } else if (cc->flags & HIRCLUSTER_FLAG_ADD_SLAVE) { + slave = + node_get_with_slots(cc, host, port, REDIS_ROLE_SLAVE); if (slave == NULL) { goto error; } @@ -1258,7 +1236,7 @@ static int handleClusterSlotsReply(redisClusterContext *cc, redisContext *c) { return REDIS_ERR; } - dict *nodes = parse_cluster_slots(cc, reply, cc->flags); + dict *nodes = parse_cluster_slots(cc, c, reply); freeReplyObject(reply); return updateNodesAndSlotmap(cc, nodes); } @@ -3873,7 +3851,7 @@ void clusterSlotsReplyCallback(redisAsyncContext *ac, void *r, void *privdata) { } redisClusterContext *cc = acc->cc; - dict *nodes = parse_cluster_slots(cc, reply, cc->flags); + dict *nodes = parse_cluster_slots(cc, &ac->c, reply); if (updateNodesAndSlotmap(cc, nodes) != REDIS_OK) { /* Ignore failures for now */ } diff --git a/hircluster.h b/hircluster.h index b92b149..80a52ec 100644 --- a/hircluster.h +++ b/hircluster.h @@ -291,8 +291,6 @@ int redisClusterUpdateSlotmap(redisClusterContext *cc); /* Internal functions */ redisContext *ctx_get_by_node(redisClusterContext *cc, redisClusterNode *node); -struct dict *parse_cluster_slots(redisClusterContext *cc, redisReply *reply, - int flags); /* * Asynchronous API diff --git a/hiredis_cluster.def b/hiredis_cluster.def index ea4c5c5..e58ffc4 100644 --- a/hiredis_cluster.def +++ b/hiredis_cluster.def @@ -6,7 +6,6 @@ dictInitIterator dictNext hiarray_get - parse_cluster_slots redisClusterAppendCommand redisClusterAppendCommandArgv redisClusterAppendCommandToNode From 4b14c1cc221e99de5b5c1ca9ce96e6866003c902 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Fri, 10 Jan 2025 12:34:42 +0100 Subject: [PATCH 08/12] Add option to clusterclient_async for async initial slotmap update MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The added flag --async-initial-update allows testcases to connect to the cluster using `redisClusterAsyncConnect2`, which will not block while performing the initial slotmap update. Signed-off-by: Björn Svensson --- tests/clusterclient_async.c | 39 +++++++++++++++++++++++++------------ 1 file changed, 27 insertions(+), 12 deletions(-) diff --git a/tests/clusterclient_async.c b/tests/clusterclient_async.c index 7f40f75..3946b95 100644 --- a/tests/clusterclient_async.c +++ b/tests/clusterclient_async.c @@ -51,6 +51,8 @@ char cmd_history[HISTORY_DEPTH][CMD_SIZE]; int num_running = 0; int resend_failed_cmd = 0; int send_to_all = 0; +int show_events = 0; +int async_initial_update = 0; void sendNextCommand(evutil_socket_t, short, void *); @@ -184,7 +186,17 @@ void sendNextCommand(evutil_socket_t fd, short kind, void *arg) { void eventCallback(const redisClusterContext *cc, int event, void *privdata) { (void)cc; - (void)privdata; + + if (event == HIRCLUSTER_EVENT_READY) { + /* Schedule a read from stdin and send next command. */ + redisClusterAsyncContext *acc = (redisClusterAsyncContext *)privdata; + event_base_once(acc->adapter, -1, EV_TIMEOUT, sendNextCommand, acc, + NULL); + } + + if (!show_events) + return; + char *e = NULL; switch (event) { case HIRCLUSTER_EVENT_SLOTMAP_UPDATED: @@ -219,7 +231,6 @@ void disconnectCallback(const redisAsyncContext *ac, int status) { int main(int argc, char **argv) { int use_cluster_slots = 1; // Get topology via CLUSTER SLOTS - int show_events = 0; int show_connection_events = 0; int optind; @@ -230,6 +241,8 @@ int main(int argc, char **argv) { show_events = 1; } else if (strcmp(argv[optind], "--connection-events") == 0) { show_connection_events = 1; + } else if (strcmp(argv[optind], "--async-initial-update") == 0) { + async_initial_update = 1; } else { fprintf(stderr, "Unknown argument: '%s'\n", argv[optind]); } @@ -249,28 +262,30 @@ int main(int argc, char **argv) { redisClusterSetOptionTimeout(acc->cc, timeout); redisClusterSetOptionConnectTimeout(acc->cc, timeout); redisClusterSetOptionMaxRetry(acc->cc, 1); + redisClusterSetEventCallback(acc->cc, eventCallback, acc); if (use_cluster_slots) { redisClusterSetOptionRouteUseSlots(acc->cc); } - if (show_events) { - redisClusterSetEventCallback(acc->cc, eventCallback, NULL); - } if (show_connection_events) { redisClusterAsyncSetConnectCallback(acc, connectCallback); redisClusterAsyncSetDisconnectCallback(acc, disconnectCallback); } - if (redisClusterConnect2(acc->cc) != REDIS_OK) { - printf("Connect error: %s\n", acc->cc->errstr); - exit(2); - } - struct event_base *base = event_base_new(); int status = redisClusterLibeventAttach(acc, base); assert(status == REDIS_OK); - /* Schedule a read from stdin and send next command */ - event_base_once(acc->adapter, -1, EV_TIMEOUT, sendNextCommand, acc, NULL); + if (async_initial_update) { + if (redisClusterAsyncConnect2(acc) != REDIS_OK) { + printf("Connect error: %s\n", acc->errstr); + exit(2); + } + } else { + if (redisClusterConnect2(acc->cc) != REDIS_OK) { + printf("Connect error: %s\n", acc->cc->errstr); + exit(2); + } + } event_base_dispatch(base); From 62cfac0249d005af10f651c3560addd3b54996ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Mon, 13 Jan 2025 11:41:48 +0100 Subject: [PATCH 09/12] Add connect-during-cluster-startup tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Björn Svensson --- tests/CMakeLists.txt | 8 +++ .../connect-during-cluster-startup-test.sh | 68 +++++++++++++++++++ ...luster-startup-using-cluster-nodes-test.sh | 68 +++++++++++++++++++ 3 files changed, 144 insertions(+) create mode 100755 tests/scripts/connect-during-cluster-startup-test.sh create mode 100755 tests/scripts/connect-during-cluster-startup-using-cluster-nodes-test.sh diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 923aed1..78866ee 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -282,3 +282,11 @@ add_test(NAME client-disconnect-without-slotmap-update-test-async COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/client-disconnect-without-slotmap-update-test.sh" "$" WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/") +add_test(NAME connect-during-cluster-startup-test-async + COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/connect-during-cluster-startup-test.sh" + "$" + WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/") +add_test(NAME connect-during-cluster-startup-using-cluster-nodes-test-async + COMMAND "${CMAKE_SOURCE_DIR}/tests/scripts/connect-during-cluster-startup-using-cluster-nodes-test.sh" + "$" + WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}/tests/scripts/") diff --git a/tests/scripts/connect-during-cluster-startup-test.sh b/tests/scripts/connect-during-cluster-startup-test.sh new file mode 100755 index 0000000..a9ef3fa --- /dev/null +++ b/tests/scripts/connect-during-cluster-startup-test.sh @@ -0,0 +1,68 @@ +#!/bin/sh +# +# Connect to a cluster which is in the processes of starting up. +# +# The first attempt to get the slotmap will receive a reply without any +# slot information and this should result in a retry. +# +# The client is configured to use the CLUSTER SLOTS command. +# +# Usage: $0 /path/to/clusterclient-binary + +clientprog=${1:-./clusterclient_async} +testname=connect-during-cluster-startup-test + +# Sync process just waiting for server to be ready to accept connection. +perl -we 'use sigtrap "handler", sub{exit}, "CONT"; sleep 1; die "timeout"' & +syncpid=$! + +# Start simulated server. +timeout 5s ./simulated-redis.pl -p 7400 -d --sigcont $syncpid <<'EOF' & +# The initial slotmap is not covering any slots, expect a retry. +EXPECT CONNECT +EXPECT ["CLUSTER", "SLOTS"] +SEND [] + +# The node has now been delegated slots. +EXPECT ["CLUSTER", "SLOTS"] +SEND [[0, 16383, ["", 7400, "f5378fa2ad1fbd569f01ba2fe29fa8feb36cdfb8"]]] + +EXPECT ["SET", "foo", "bar"] +SEND +OK +EXPECT CLOSE +EOF +server=$! + +# Wait until server is ready to accept client connection. +wait $syncpid; + +# Run client which will fetch the initial slotmap asynchronously. +timeout 3s "$clientprog" --events --async-initial-update 127.0.0.1:7400 > "$testname.out" <<'EOF' +SET foo bar +EOF +clientexit=$? + +# Wait for server to exit. +wait $server; serverexit=$? + +# Check exit status on server. +if [ $serverexit -ne 0 ]; then + echo "Simulated server exited with status $serverexit" + exit $serverexit +fi +# Check exit status on client. +if [ $clientexit -ne 0 ]; then + echo "$clientprog exited with status $clientexit" + exit $clientexit +fi + +# Check the output from the client. +expected="Event: slotmap-updated +Event: ready +OK +Event: free-context" + +echo "$expected" | diff -u - "$testname.out" || exit 99 + +# Clean up. +rm "$testname.out" diff --git a/tests/scripts/connect-during-cluster-startup-using-cluster-nodes-test.sh b/tests/scripts/connect-during-cluster-startup-using-cluster-nodes-test.sh new file mode 100755 index 0000000..d16d57f --- /dev/null +++ b/tests/scripts/connect-during-cluster-startup-using-cluster-nodes-test.sh @@ -0,0 +1,68 @@ +#!/bin/sh +# +# Connect to a cluster which is in the processes of starting up. +# +# The first attempt to get the slotmap will receive a reply without any +# slot information and this should result in a retry. +# +# The client is configured to use the CLUSTER NODES command. +# +# Usage: $0 /path/to/clusterclient-binary + +clientprog=${1:-./clusterclient_async} +testname=connect-during-cluster-startup-using-cluster-nodes-test + +# Sync process just waiting for server to be ready to accept connection. +perl -we 'use sigtrap "handler", sub{exit}, "CONT"; sleep 1; die "timeout"' & +syncpid=$! + +# Start simulated server. +timeout 5s ./simulated-redis.pl -p 7400 -d --sigcont $syncpid <<'EOF' & +# The initial slotmap is not covering any slots, expect a retry. +EXPECT CONNECT +EXPECT ["CLUSTER", "NODES"] +SEND "8adca41945787ad1c9e725a40a43cf72bd4c6ad4 :7400@17400 myself,master - 0 0 0 connected\n" + +# The node has now been delegated slots. +EXPECT ["CLUSTER", "NODES"] +SEND "8adca41945787ad1c9e725a40a43cf72bd4c6ad4 :7400@17400 myself,master - 0 0 1 connected 0-16383\n" + +EXPECT ["SET", "foo", "bar"] +SEND +OK +EXPECT CLOSE +EOF +server=$! + +# Wait until server is ready to accept client connection. +wait $syncpid; + +# Run client which will fetch the initial slotmap asynchronously using CLUSTER NODES. +timeout 3s "$clientprog" --events --use-cluster-nodes --async-initial-update 127.0.0.1:7400 > "$testname.out" <<'EOF' +SET foo bar +EOF +clientexit=$? + +# Wait for server to exit. +wait $server; serverexit=$? + +# Check exit status on server. +if [ $serverexit -ne 0 ]; then + echo "Simulated server exited with status $serverexit" + exit $serverexit +fi +# Check exit status on client. +if [ $clientexit -ne 0 ]; then + echo "$clientprog exited with status $clientexit" + exit $clientexit +fi + +# Check the output from the client. +expected="Event: slotmap-updated +Event: ready +OK +Event: free-context" + +echo "$expected" | diff -u - "$testname.out" || exit 99 + +# Clean up. +rm "$testname.out" From dd7222d7420a8141fe74a31ff8a0d13fbf88abc4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Mon, 13 Jan 2025 11:44:48 +0100 Subject: [PATCH 10/12] Retry the async slotmap update when the update fails MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Björn Svensson --- hircluster.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/hircluster.c b/hircluster.c index 4bb4ca8..9416f07 100644 --- a/hircluster.c +++ b/hircluster.c @@ -3853,7 +3853,8 @@ void clusterSlotsReplyCallback(redisAsyncContext *ac, void *r, void *privdata) { redisClusterContext *cc = acc->cc; dict *nodes = parse_cluster_slots(cc, &ac->c, reply); if (updateNodesAndSlotmap(cc, nodes) != REDIS_OK) { - /* Ignore failures for now */ + /* Retry using available nodes */ + updateSlotMapAsync(acc, NULL); } } @@ -3872,7 +3873,8 @@ void clusterNodesReplyCallback(redisAsyncContext *ac, void *r, void *privdata) { redisClusterContext *cc = acc->cc; dict *nodes = parse_cluster_nodes(cc, &ac->c, reply); if (updateNodesAndSlotmap(cc, nodes) != REDIS_OK) { - /* Ignore failures for now */ + /* Retry using available nodes */ + updateSlotMapAsync(acc, NULL); } } From cc48c7401f94a10d2e729e55ebb44d4614d208e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Tue, 14 Jan 2025 12:23:24 +0100 Subject: [PATCH 11/12] Use short timeout when scheduling processing of next command MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Since timers are handled before filedescription events in libevent we will not give time to socket reads when sending multiple commands. Using a short time allows hiredis to read from sockets and avoids some unpredictability in tests. Signed-off-by: Björn Svensson --- tests/clusterclient_async.c | 9 ++++++--- ...o-all-nodes-during-scaledown-test-async.sh | 20 ++++--------------- 2 files changed, 10 insertions(+), 19 deletions(-) diff --git a/tests/clusterclient_async.c b/tests/clusterclient_async.c index 3946b95..e4f6494 100644 --- a/tests/clusterclient_async.c +++ b/tests/clusterclient_async.c @@ -97,8 +97,9 @@ void replyCallback(redisClusterAsyncContext *acc, void *r, void *privdata) { if (--num_running == 0) { /* Schedule a read from stdin and send next command */ + struct timeval timeout = {0, 10}; event_base_once(acc->adapter, -1, EV_TIMEOUT, sendNextCommand, acc, - NULL); + &timeout); } } @@ -169,8 +170,9 @@ void sendNextCommand(evutil_socket_t fd, short kind, void *arg) { printf("error: %s\n", acc->errstr); /* Schedule a read from stdin and handle next command. */ + struct timeval timeout = {0, 10}; event_base_once(acc->adapter, -1, EV_TIMEOUT, sendNextCommand, - acc, NULL); + acc, &timeout); } } @@ -190,8 +192,9 @@ void eventCallback(const redisClusterContext *cc, int event, void *privdata) { if (event == HIRCLUSTER_EVENT_READY) { /* Schedule a read from stdin and send next command. */ redisClusterAsyncContext *acc = (redisClusterAsyncContext *)privdata; + struct timeval timeout = {0, 10}; event_base_once(acc->adapter, -1, EV_TIMEOUT, sendNextCommand, acc, - NULL); + &timeout); } if (!show_events) diff --git a/tests/scripts/dbsize-to-all-nodes-during-scaledown-test-async.sh b/tests/scripts/dbsize-to-all-nodes-during-scaledown-test-async.sh index 25d0928..17c5112 100755 --- a/tests/scripts/dbsize-to-all-nodes-during-scaledown-test-async.sh +++ b/tests/scripts/dbsize-to-all-nodes-during-scaledown-test-async.sh @@ -81,26 +81,14 @@ if [ $clientexit -ne 0 ]; then exit $clientexit fi -# Check the output from clusterclient, which depends on timing. -# Client sends the second 'DBSIZE' to node #2 just after node #2 closes its socket. -expected1="10 +# Check the output from clusterclient. +expected="10 20 -error: Server closed the connection +error: Connection refused 11 12" -# Client sends the second 'DBSIZE' to node #2 just before node #2 closes its socket. -expected2="10 -20 -error: Connection reset by peer -11 -12" - -# The reply "11" from node #1 can come before or after the socket error from node #2. -# Therefore, we sort before comparing. -diff -u <(echo "$expected1" | sort) <(sort "$testname.out") || \ - diff -u <(echo "$expected2" | sort) <(sort "$testname.out") || \ - exit 99 +echo "$expected" | diff -u - "$testname.out" || exit 99 # Clean up rm "$testname.out" From 73ff248f41020612e9031e4bd7ce529ef8c5263a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Tue, 14 Jan 2025 13:17:59 +0100 Subject: [PATCH 12/12] Handle NIL addresses in CLUSTER SLOTS responses MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit "A NULL value for the endpoint indicates the node has an unknown endpoint and the client should connect to the same endpoint it used to send the CLUSTER SLOTS command but with the port returned from the command." Signed-off-by: Björn Svensson --- hircluster.c | 12 ++++++-- .../connect-during-cluster-startup-test.sh | 30 ++++++++++++++++--- 2 files changed, 35 insertions(+), 7 deletions(-) diff --git a/hircluster.c b/hircluster.c index bfcf32a..95c11c0 100644 --- a/hircluster.c +++ b/hircluster.c @@ -804,8 +804,11 @@ static dict *parse_cluster_slots(redisClusterContext *cc, redisContext *c, elem_ip = elem_nodes->element[0]; elem_port = elem_nodes->element[1]; + /* Validate ip and port elements. Accept a NULL value ip (NIL type) + * since we will handle the unknown endpoint special. */ if (elem_ip == NULL || elem_port == NULL || - elem_ip->type != REDIS_REPLY_STRING || + (elem_ip->type != REDIS_REPLY_STRING && + elem_ip->type != REDIS_REPLY_NIL) || elem_port->type != REDIS_REPLY_INTEGER || !hi_valid_port((int)elem_port->integer)) { __redisClusterSetError( @@ -815,8 +818,11 @@ static dict *parse_cluster_slots(redisClusterContext *cc, redisContext *c, goto error; } - /* Get the received ip/host. According to the docs an empty string can - * be treated as it means the same address we sent this command to. */ + /* Get the received ip/host. According to the docs an unknown + * endpoint or an empty string can be treated as it means + * the same address as we sent this command to. + * An unknown endpoint has the type REDIS_REPLY_NIL and its + * length is initiated to zero. */ char *host = (elem_ip->len > 0) ? elem_ip->str : c->tcp.host; if (host == NULL) { goto oom; diff --git a/tests/scripts/connect-during-cluster-startup-test.sh b/tests/scripts/connect-during-cluster-startup-test.sh index a9ef3fa..31bd2bc 100755 --- a/tests/scripts/connect-during-cluster-startup-test.sh +++ b/tests/scripts/connect-during-cluster-startup-test.sh @@ -4,6 +4,7 @@ # # The first attempt to get the slotmap will receive a reply without any # slot information and this should result in a retry. +# The following slotmap updates tests the handling of an nil/empty IP address. # # The client is configured to use the CLUSTER SLOTS command. # @@ -18,16 +19,22 @@ syncpid=$! # Start simulated server. timeout 5s ./simulated-redis.pl -p 7400 -d --sigcont $syncpid <<'EOF' & -# The initial slotmap is not covering any slots, expect a retry. +# The initial slotmap is not covering any slots, expect a retry since it's not accepted. EXPECT CONNECT EXPECT ["CLUSTER", "SLOTS"] SEND [] -# The node has now been delegated slots. +# The node has now been delegated a few slots and should be accepted. +# Respond with an unknown endpoint (nil) to test that current connection IP is used instead. +EXPECT ["CLUSTER", "SLOTS"] +SEND *1\r\n*3\r\n:0\r\n:10\r\n*3\r\n$-1\r\n:7400\r\n$40\r\nf5378fa2ad1fbd569f01ba2fe29fa8feb36cdfb8\r\n + +# The node has now been delegated all slots. +# Use empty address to test that current connection IP is used instead. EXPECT ["CLUSTER", "SLOTS"] SEND [[0, 16383, ["", 7400, "f5378fa2ad1fbd569f01ba2fe29fa8feb36cdfb8"]]] -EXPECT ["SET", "foo", "bar"] +EXPECT ["SET", "foo", "bar3"] SEND +OK EXPECT CLOSE EOF @@ -38,7 +45,19 @@ wait $syncpid; # Run client which will fetch the initial slotmap asynchronously. timeout 3s "$clientprog" --events --async-initial-update 127.0.0.1:7400 > "$testname.out" <<'EOF' -SET foo bar +# Slot not yet handled, will trigger a slotmap update which will be throttled. +SET foo bar1 + +# Wait to avoid slotmap update throttling. +!sleep + +# A command will fail direcly, but a slotmap update is scheduled. +SET foo bar2 + +# Allow slotmap update to finish. +!sleep + +SET foo bar3 EOF clientexit=$? @@ -59,6 +78,9 @@ fi # Check the output from the client. expected="Event: slotmap-updated Event: ready +error: slot not served by any node +error: slot not served by any node +Event: slotmap-updated OK Event: free-context"