diff --git a/bbinc/comdb2_machine_info.h b/bbinc/comdb2_machine_info.h
index 04de517f37..19a86ec34f 100644
--- a/bbinc/comdb2_machine_info.h
+++ b/bbinc/comdb2_machine_info.h
@@ -18,7 +18,7 @@
 #define __INCLUDED_MACHINE_INFO_H
 
 struct comdb2_machine_info {
-    int (*machine_is_up)(const char *host);
+    int (*machine_is_up)(const char *host, int *drtest);
     int (*machine_status_init)(void);
     int (*machine_class)(const char *host);
     int (*machine_my_class)(void);
diff --git a/bbinc/rtcpu.h b/bbinc/rtcpu.h
index 6c2d541e73..cba86deea5 100644
--- a/bbinc/rtcpu.h
+++ b/bbinc/rtcpu.h
@@ -17,11 +17,11 @@
 #ifndef INCLUDED_RTCPU_H
 #define INCLUDED_RTCPU_H
 
-void register_rtcpu_callbacks(int (*a)(const char *), int (*b)(void), int (*c)(const char *), int (*d)(void),
+void register_rtcpu_callbacks(int (*a)(const char *, int *), int (*b)(void), int (*c)(const char *), int (*d)(void),
                               int (*e)(const char *), int (*f)(const char *), int (*g)(const char *, const char **),
                               int (*h)(const char **), int (*i)(const char *, int *, const char ***),
                               int (*j)(const char *, const char *));
-int machine_is_up(const char *host);
+int machine_is_up(const char *host, int *isdrtest);
 int machine_class(const char *host);
 int machine_my_class(void);
 int machine_dc(const char *host);
diff --git a/bdb/bdb_api.h b/bdb/bdb_api.h
index 9af7df4e87..63a5a67a74 100644
--- a/bdb/bdb_api.h
+++ b/bdb/bdb_api.h
@@ -102,6 +102,7 @@ enum {
     BDB_CALLBACK_SERIALCHECK,
     BDB_CALLBACK_ADMIN_APPSOCK,
     BDB_CALLBACK_SYNCMODE,
+    BDB_CALLBACK_NODEUP_DRTEST
 };
 
 enum { BDB_REPFAIL_NET, BDB_REPFAIL_TIMEOUT, BDB_REPFAIL_RMTBDB };
@@ -401,6 +402,9 @@ typedef void (*UNDOSHADOWFP)(struct bdb_osql_log *);
 /* Callback to return sync type */
 typedef int (*SYNCMODE)(bdb_state_type *);
 
+/* Callback: is host up per rtcpu? Also reports through *isdrtest whether host is dr-testing */
+typedef int (*NODEUP_DRTEST)(bdb_state_type *, const char *host, int *isdrtest);
+
 typedef int (*BDB_CALLBACK_FP)();
 bdb_callback_type *bdb_callback_create(void);
 void bdb_callback_set(bdb_callback_type *bdb_callback, int callback_type,
diff --git a/bdb/bdb_int.h b/bdb/bdb_int.h
index b35e1c30fd..f2f0c03bea 100644
--- a/bdb/bdb_int.h
+++ b/bdb/bdb_int.h
@@ -697,6 +697,7 @@ struct bdb_callback_tag {
     NODEDOWNFP nodedown_rtn;
     SERIALCHECK serialcheck_rtn;
     SYNCMODE syncmode_rtn;
+    NODEUP_DRTEST nodeup_drtest_rtn;
 };
 
 struct waiting_for_lsn {
diff --git a/bdb/callback.c b/bdb/callback.c
index 7526329341..5c5fdc08b1 100644
--- a/bdb/callback.c
+++ b/bdb/callback.c
@@ -107,6 +107,10 @@ void bdb_callback_set(bdb_callback_type *bdb_callback, int callback_type,
         bdb_callback->syncmode_rtn = (SYNCMODE)callback_rtn;
         break;
 
+    case BDB_CALLBACK_NODEUP_DRTEST:
+        bdb_callback->nodeup_drtest_rtn = (NODEUP_DRTEST)callback_rtn;
+        break;
+
     default:
         break;
     }
diff --git a/bdb/rep.c b/bdb/rep.c
index 949bb4cbc1..ec87043d2f 100644
--- a/bdb/rep.c
+++ b/bdb/rep.c
@@ -2563,24 +2563,22 @@ static void got_new_seqnum_from_node(bdb_state_type *bdb_state,
     /* new LSN from node: we may need to make the node coherent */
     Pthread_mutex_lock(&(bdb_state->coherent_state_lock));
     struct hostinfo *m = retrieve_hostinfo(bdb_state->repinfo->master_host_interned);
+    int nodeup = 0, is_drtest = 0;
 
     if (change_coherency) {
         if (h->coherent_state == STATE_INCOHERENT ||
             h->coherent_state == STATE_INCOHERENT_WAIT) {
-            if (bdb_state->callback->nodeup_rtn) {
-                if ((bdb_state->callback->nodeup_rtn(bdb_state, host))) {
+            if (bdb_state->callback->nodeup_drtest_rtn) {
+                if ((nodeup = bdb_state->callback->nodeup_drtest_rtn(bdb_state, host, &is_drtest)) || is_drtest != 0) {
                     rc = bdb_wait_for_seqnum_from_node_nowait_int(
                         bdb_state, &m->seqnum, hostinterned);
                     if (rc == 0) {
                         /* prevent a node from becoming coherent for at least
                          * downgrade_penalty seconds after an event that would
                          * delay commits (the last downgrade) */
-                        if (downgrade_penalty &&
-                            (gettimeofday_ms() - h->last_downgrade_time) <=
-                                downgrade_penalty) {
-                            set_coherent_state(bdb_state, hostinterned,
-                                               STATE_INCOHERENT_WAIT, __func__,
-                                               __LINE__);
+                        if ((!nodeup && is_drtest) ||
+                            (downgrade_penalty && (gettimeofday_ms() - h->last_downgrade_time) <= downgrade_penalty)) {
+                            set_coherent_state(bdb_state, hostinterned, STATE_INCOHERENT_WAIT, __func__, __LINE__);
                         } else {
                             /* dont send here under lock */
                             set_coherent_state(bdb_state, hostinterned, STATE_COHERENT,
@@ -2870,8 +2868,9 @@ static int bdb_wait_for_seqnum_from_node_int(bdb_state_type *bdb_state,
     if (fakeincoherent) {
         node_is_rtcpu = 1;
     }
-    if (bdb_state->callback->nodeup_rtn)
-        if (!(bdb_state->callback->nodeup_rtn(bdb_state, host->str)))
+    int is_drtest = 0;
+    if (bdb_state->callback->nodeup_drtest_rtn)
+        if (!(bdb_state->callback->nodeup_drtest_rtn(bdb_state, host->str, &is_drtest)))
             node_is_rtcpu = 1;
 
     /* dont wait if it's in a skipped state */
@@ -2892,21 +2891,26 @@ static int bdb_wait_for_seqnum_from_node_int(bdb_state_type *bdb_state,
         Pthread_mutex_lock(&(bdb_state->coherent_state_lock));
         if (h->coherent_state == STATE_COHERENT ||
             h->coherent_state == STATE_INCOHERENT_WAIT) {
-            if (h->coherent_state == STATE_COHERENT)
+
+            int newstate = is_drtest ? STATE_INCOHERENT_WAIT : STATE_INCOHERENT;
+            if (h->coherent_state == STATE_COHERENT) {
                 defer_commits(bdb_state, host->str, __func__);
-            h->last_downgrade_time = gettimeofday_ms();
-            set_coherent_state(bdb_state, host, STATE_INCOHERENT, __func__,
-                               __LINE__);
-            bdb_state->repinfo->skipsinceepoch = comdb2_time_epoch();
+            }
+            if (h->coherent_state != newstate) {
+                h->last_downgrade_time = gettimeofday_ms();
+                set_coherent_state(bdb_state, host, newstate, __func__, __LINE__);
+                bdb_state->repinfo->skipsinceepoch = comdb2_time_epoch();
+            }
         }
 
         Pthread_mutex_unlock(&(bdb_state->coherent_state_lock));
 
-        if (bdb_state->attr->wait_for_seqnum_trace) {
-            logmsg(LOGMSG_USER, PR_LSN " %s became incoherent, not waiting\n",
-                   PARM_LSN(seqnum->lsn), host->str);
+        if (!is_drtest) {
+            if (bdb_state->attr->wait_for_seqnum_trace) {
+                logmsg(LOGMSG_USER, PR_LSN " %s became incoherent, not waiting\n", PARM_LSN(seqnum->lsn), host->str);
+            }
+            return -2;
         }
-        return -2;
     }
 
     Pthread_mutex_lock(&(bdb_state->seqnum_info->lock));
diff --git a/db/fdb_boots.c b/db/fdb_boots.c
index c5882247ac..097a9964f1 100644
--- a/db/fdb_boots.c
+++ b/db/fdb_boots.c
@@ -178,7 +178,7 @@ static char *_get_node_initial(int nnodes, char **nodes, int *lcl,
     *lcl_nnodes = 0;
     *rescpu_nnodes = 0;
     for (i = 0; i < nnodes; i++) {
-        if (!machine_is_up(nodes[i])) {
+        if (!machine_is_up(nodes[i], NULL)) {
             continue;
         }
 
@@ -255,7 +255,7 @@ static char *_get_node_next(int nnodes, char **nodes, int *lcl, char *arg,
             break;
 
         /* ignore rtcpu */
-        if (!machine_is_up(nodes[i])) {
+        if (!machine_is_up(nodes[i], NULL)) {
             continue;
         }
 
@@ -548,7 +548,7 @@ int fdb_get_rescpu_nodes(fdb_location_t *loc, int *locals)
     rescpued = 0;
 
     for (i = 0; i < loc->nnodes; i++) {
-        if (machine_is_up(loc->nodes[i])) {
+        if (machine_is_up(loc->nodes[i], NULL)) {
             rescpued++;
 
             if (loc->lcl[i] && locals)
diff --git a/db/glue.c b/db/glue.c
index 349fbf8d6b..3add8fd75d 100644
--- a/db/glue.c
+++ b/db/glue.c
@@ -3123,6 +3123,16 @@ int serial_check_callback(char *tbname, int idxnum, void *key, int keylen,
 
 int getroom_callback(void *dummy, const char *host) { return machine_dc(host); }
 
+/* rtcpu callback that also reports dr-test state; *is_drtest is left untouched for the tcmtest down node */
+static int nodeup_drtest_callback(void *bdb_handle, const char *host, int *is_drtest)
+{
+    extern char *tcmtest_routecpu_down_node;
+    if (host == tcmtest_routecpu_down_node) {
+        return 0;
+    }
+    return machine_is_up(host, is_drtest);
+}
+
 /* callback to report whether node is up or down through rtcpu */
 static int nodeup_callback(void *bdb_handle, const char *host)
 {
@@ -3135,7 +3145,7 @@ int is_node_up(const char *host)
     if (host == tcmtest_routecpu_down_node) {
         return 0;
     }
-    return machine_is_up(host);
+    return machine_is_up(host, NULL);
 }
 
 /* callback to set dynamically configurable election settings */
@@ -3701,8 +3711,8 @@ int open_bdb_env(struct dbenv *dbenv)
     bdb_callback_set(dbenv->bdb_callback, BDB_CALLBACK_WHOISMASTER,
                      new_master_callback);
     bdb_callback_set(dbenv->bdb_callback, BDB_CALLBACK_NODEUP, nodeup_callback);
-    bdb_callback_set(dbenv->bdb_callback, BDB_CALLBACK_GETROOM,
-                     getroom_callback);
+    bdb_callback_set(dbenv->bdb_callback, BDB_CALLBACK_NODEUP_DRTEST, nodeup_drtest_callback);
+    bdb_callback_set(dbenv->bdb_callback, BDB_CALLBACK_GETROOM, getroom_callback);
     bdb_callback_set(dbenv->bdb_callback, BDB_CALLBACK_APPSOCK,
                      appsock_callback);
     bdb_callback_set(dbenv->bdb_callback, BDB_CALLBACK_ADMIN_APPSOCK,
diff --git a/db/process_message.c b/db/process_message.c
index 6f2f11e07b..93ee90b933 100644
--- a/db/process_message.c
+++ b/db/process_message.c
@@ -2973,6 +2973,60 @@ int process_command(struct dbenv *dbenv, char *line, int lline, int st)
         }
         logmsg(LOGMSG_WARN, "machine_cache requires find, add or dump\n");
         return -1;
+
+    } else if (tokcmp(tok, ltok, "fakedr") == 0) {
+        /* Message-traps to verify behavior for drtesting */
+        /*
+         * fakedr add <host>
+         * fakedr del <host>
+         * fakedr dump
+         */
+        tok = segtok(line, lline, &st, &ltok);
+
+        /* Add */
+        if (tokcmp(tok, ltok, "add") == 0) {
+
+            /* Host */
+            tok = segtok(line, lline, &st, &ltok);
+            if (!ltok) {
+                logmsg(LOGMSG_WARN, "fakedr add requires host\n");
+                return -1;
+            }
+            char *host = alloca(ltok + 1);
+            tokcpy(tok, ltok, host);
+
+            void add_fake_drtest(const char *host);
+            add_fake_drtest(host);
+            return 0;
+        }
+
+        /* Del */
+        if (tokcmp(tok, ltok, "del") == 0) {
+
+            /* Host */
+            tok = segtok(line, lline, &st, &ltok);
+            if (!ltok) {
+                logmsg(LOGMSG_WARN, "fakedr del requires host\n");
+                return -1;
+            }
+            char *host = alloca(ltok + 1);
+            tokcpy(tok, ltok, host);
+
+            void del_fake_drtest(const char *host);
+            del_fake_drtest(host);
+            return 0;
+        }
+
+        /* Dump */
+        if (tokcmp(tok, ltok, "dump") == 0) {
+            void dump_fake_drtest(void);
+            dump_fake_drtest();
+            return 0;
+        }
+
+        logmsg(LOGMSG_WARN, "fakedr requires add, del or dump\n");
+        return -1;
+
     } else if (tokcmp(tok, ltok, "machine_cluster") == 0) {
         /*
            machine_cluster add
diff --git a/net/net.c b/net/net.c
index c0e0bbb51e..e409962f5f 100644
--- a/net/net.c
+++ b/net/net.c
@@ -2977,7 +2977,7 @@ char *net_get_osql_node(netinfo_type *netinfo_ptr)
             continue;
 
         /* is rtcpu-ed? */
-        if (machine_is_up(ptr->host) != 1)
+        if (machine_is_up(ptr->host, NULL) != 1)
             continue;
 
         if (nnodes >= REPMAX)
diff --git a/tests/rtcpu_drtest.test/Makefile b/tests/rtcpu_drtest.test/Makefile
new file mode 100644
index 0000000000..b40978c33d
--- /dev/null
+++ b/tests/rtcpu_drtest.test/Makefile
@@ -0,0 +1,10 @@
+ifeq ($(TESTSROOTDIR),)
+  include ../testcase.mk
+else
+  include $(TESTSROOTDIR)/testcase.mk
+endif
+export CHECK_DB_AT_FINISH=0
+ifeq ($(TEST_TIMEOUT),)
+  export TEST_TIMEOUT=4m
+endif
+
diff --git a/tests/rtcpu_drtest.test/README b/tests/rtcpu_drtest.test/README
new file mode 100644
index 0000000000..9e7327f1ea
--- /dev/null
+++ b/tests/rtcpu_drtest.test/README
@@ -0,0 +1,2 @@
+Verify that we wait-for-seqnum nodes which are dr-testing. This verifies correct
+behavior for replicant_retry_on_not_durable 1.
diff --git a/tests/rtcpu_drtest.test/lrl.options b/tests/rtcpu_drtest.test/lrl.options
new file mode 100644
index 0000000000..8b86c0990a
--- /dev/null
+++ b/tests/rtcpu_drtest.test/lrl.options
@@ -0,0 +1,2 @@
+replicant_retry_on_not_durable 1
+forbid_remote_admin 0
diff --git a/tests/rtcpu_drtest.test/runit b/tests/rtcpu_drtest.test/runit
new file mode 100755
index 0000000000..81181f2cba
--- /dev/null
+++ b/tests/rtcpu_drtest.test/runit
@@ -0,0 +1,47 @@
+#!/usr/bin/env bash
+bash -n "$0" || exit 1
+
+source ${TESTSROOTDIR}/tools/runit_common.sh
+source ${TESTSROOTDIR}/tools/cluster_utils.sh
+
+[[ -z "$CLUSTER" ]] && failexit "This test requires a CLUSTER"
+
+export MASTER=$(get_master)
+
+# Create a table
+$CDB2SQL_EXE $DBNAME $CDB2_OPTIONS default "create table t1(a int)"
+
+# Mark all nodes as offline (pretend we are dr-testing them)
+for node in $CLUSTER ; do
+    if [[ "$node" != "$MASTER" ]]; then
+        # Tell every node (itself included) that this node is dr-testing
+        for n in $CLUSTER ; do
+            $CDB2SQL_EXE --admin $DBNAME $CDB2_OPTIONS --host $n "exec procedure sys.cmd.send(\"fakedr add $node\")"
+        done
+    fi
+done
+
+# Make sure we can insert records
+j=0
+while [[ $j -lt 10 ]]; do
+    $CDB2SQL_EXE $DBNAME $CDB2_OPTIONS --host $MASTER "insert into t1 select * from generate_series(1, 1000)" || failexit "insert failed"
+    let j=j+1
+done
+
+# Verify that the master has no non-master node listed as 'coherent' (they should be 'incoherent_wait')
+$CDB2SQL_EXE $DBNAME $CDB2_OPTIONS --host $MASTER "select * from comdb2_cluster"
+cnt=$($CDB2SQL_EXE --tabs $DBNAME $CDB2_OPTIONS --host $MASTER "select count(*) from comdb2_cluster where is_master='N' and coherent_state='coherent'")
+
+[[ "$cnt" == 0 ]] || failexit "Expected all non-master nodes to be INCOHERENT_WAIT"
+
+# Mark all nodes as online again for test cleanup
+for node in $CLUSTER ; do
+    if [[ "$node" != "$MASTER" ]]; then
+        # Tell every node (itself included) that this node is online again
+        for n in $CLUSTER ; do
+            $CDB2SQL_EXE --admin $DBNAME $CDB2_OPTIONS --host $n "exec procedure sys.cmd.send(\"fakedr del $node\")"
+        done
+    fi
+done
+
+echo "Success!"
diff --git a/util/rtcpu.c b/util/rtcpu.c
index 67f88f411f..acb33d3564 100644
--- a/util/rtcpu.c
+++ b/util/rtcpu.c
@@ -34,8 +34,9 @@
 #include "machclass.h"
 #include "logmsg.h"
 #include "sys_wrap.h"
+#include <plhash_glue.h>
 
-static int machine_is_up_default(const char *host);
+static int machine_is_up_default(const char *host, int *isdrtest);
 static int machine_status_init(void);
 static int machine_class_default(const char *host);
 static int machine_my_class_default(void);
@@ -46,7 +47,7 @@ static int machine_my_cluster_default(const char **cluster);
 static int machine_cluster_machs_default(const char *cluster, int *count, const char ***machs);
 static int machine_add_cluster_default(const char *host, const char *cluster);
 
-static int (*machine_is_up_cb)(const char *host) = machine_is_up_default;
+static int (*machine_is_up_cb)(const char *host, int *drtest) = machine_is_up_default;
 static int (*machine_status_init_cb)(void) = machine_status_init;
 static int (*machine_class_cb)(const char *host) = machine_class_default;
 static int (*machine_my_class_cb)(void) = machine_my_class_default;
@@ -71,7 +72,7 @@ static void init_once(void)
     pthread_once(&once, do_once);
 }
 
-void register_rtcpu_callbacks(int (*a)(const char *), int (*b)(void), int (*c)(const char *), int (*d)(void),
+void register_rtcpu_callbacks(int (*a)(const char *, int *), int (*b)(void), int (*c)(const char *), int (*d)(void),
                               int (*e)(const char *), int (*f)(const char *), int (*g)(const char *, const char **),
                               int (*h)(const char **), int (*i)(const char *, int *, const char ***),
                               int (*j)(const char *, const char *))
@@ -94,14 +95,14 @@ void register_rtcpu_callbacks(int (*a)(const char *), int (*b)(void), int (*c)(c
     machine_add_cluster_cb = j;
 }
 
-int machine_is_up(const char *host)
+int machine_is_up(const char *host, int *drtest)
 {
     init_once();
 
     if (!isinterned(host))
         abort();
 
-    return machine_is_up_cb(host);
+    return machine_is_up_cb(host, drtest);
 }
 
 int machine_class(const char *host)
@@ -144,9 +145,89 @@ int machine_num(const char *host)
     return machine_num_cb(host);
 }
 
-static int machine_is_up_default(const char *host)
+/* Hosts marked dr-testing via "fakedr add"; keys are strdup'd copies owned by the hash, guarded by fake_drtest_lk */
+static hash_t *fake_drtest_hash = NULL;
+static pthread_mutex_t fake_drtest_lk = PTHREAD_MUTEX_INITIALIZER;
+
+/* Mark host as dr-testing; adding a host that is already marked is a no-op */
+void add_fake_drtest(const char *host)
+{
+    Pthread_mutex_lock(&fake_drtest_lk);
+    if (!fake_drtest_hash) {
+        fake_drtest_hash = hash_init_str(0);
+    }
+    /* Skip duplicates: a second copy would outlive the first "fakedr del" and leak */
+    if (hash_find(fake_drtest_hash, host) == NULL) {
+        char *key = strdup(host);
+        if (key) {
+            hash_add(fake_drtest_hash, key);
+        } else {
+            logmsg(LOGMSG_ERROR, "%s failed to allocate entry for host: %s\n", __func__, host);
+        }
+    }
+    Pthread_mutex_unlock(&fake_drtest_lk);
+}
+
+/* Clear host's dr-testing mark and release its key; unknown hosts are ignored */
+void del_fake_drtest(const char *host)
+{
+    Pthread_mutex_lock(&fake_drtest_lk);
+    char *h;
+    if (fake_drtest_hash && (h = hash_find(fake_drtest_hash, host))) {
+        hash_del(fake_drtest_hash, h);
+        free(h); /* key was strdup'd by add_fake_drtest */
+    }
+    Pthread_mutex_unlock(&fake_drtest_lk);
+}
+
+/* hash_for callback: log a single marked host */
+static int print_fake_drtest_hash(void *obj, void *arg)
+{
+    logmsg(LOGMSG_USER, "%s\n", (char *)obj);
+    return 0;
+}
+
+/* Log every host currently marked as dr-testing */
+void dump_fake_drtest(void)
+{
+    Pthread_mutex_lock(&fake_drtest_lk);
+    if (fake_drtest_hash) {
+        hash_for(fake_drtest_hash, print_fake_drtest_hash, NULL);
+    } else {
+        logmsg(LOGMSG_USER, "No fake drtest hosts\n");
+    }
+    Pthread_mutex_unlock(&fake_drtest_lk);
+}
+
+/* Return 1 if host is marked as dr-testing, 0 otherwise */
+static int is_fake_drtest(const char *host)
+{
+    int rc = 0;
+    /* Unlocked peek skips the mutex until the first "fakedr add"; the lookup itself is done under the lock */
+    if (!fake_drtest_hash) {
+        return 0;
+    }
+    Pthread_mutex_lock(&fake_drtest_lk);
+    if (fake_drtest_hash && hash_find(fake_drtest_hash, host) != NULL) {
+        rc = 1;
+    }
+    Pthread_mutex_unlock(&fake_drtest_lk);
+    return rc;
+}
+
+/* Default rtcpu check: a host is up unless it is marked dr-testing; *drtest (if non-NULL) receives that mark */
+static int machine_is_up_default(const char *host, int *drtest)
 {
-    return 1;
+    int is_fake = is_fake_drtest(host);
+
+    if (is_fake) {
+        logmsg(LOGMSG_WARN, "%s fakedr returning machine-down for host: %s\n", __func__, host);
+    }
+
+    if (drtest) {
+        *drtest = is_fake;
+    }
+    return is_fake ? 0 : 1;
 }
 
 static int machine_status_init(void)