Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

skip CRC checksumming during diskless full sync with TLS enabled. #1479

Open
wants to merge 14 commits into
base: unstable
Choose a base branch
from
Open
7 changes: 7 additions & 0 deletions src/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ typedef struct ConnectionType {

/* TLS specified methods */
sds (*get_peer_cert)(struct connection *conn);

/* Miselenious */
int (*connIntegrityChecked)(void); // return 1 if connection type has built-in integrity checks
} ConnectionType;

struct connection {
Expand Down Expand Up @@ -483,4 +486,8 @@ static inline void connSetPostponeUpdateState(connection *conn, int on) {
}
}

static inline int connIsIntegrityChecked(connection *conn) {
return conn->type->connIntegrityChecked && conn->type->connIntegrityChecked();
}

#endif /* __REDIS_CONNECTION_H */
21 changes: 19 additions & 2 deletions src/rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -3023,6 +3023,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
int error;
long long empty_keys_skipped = 0;

if (rdb->flags & RIO_FLAG_BYPASS_CRC) server.stat_total_sync_bypass_crc++;
rdb->update_cksum = rdbLoadProgressCallback;
rdb->max_processing_chunk = server.loading_process_events_interval_bytes;
if (rioRead(rdb, buf, 9) == 0) goto eoferr;
Expand Down Expand Up @@ -3367,7 +3368,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
if (rioRead(rdb, &cksum, 8) == 0) goto eoferr;
if (server.rdb_checksum && !server.skip_checksum_validation) {
memrev64ifbe(&cksum);
if (cksum == 0) {
if (cksum == 0 || (rdb->flags & RIO_FLAG_BYPASS_CRC)) {
serverLog(LL_NOTICE, "RDB file was saved with checksum disabled: no check performed.");
} else if (cksum != expected) {
serverLog(LL_WARNING,
Expand Down Expand Up @@ -3558,8 +3559,13 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
safe_to_exit_pipe = pipefds[0]; /* read end */
server.rdb_child_exit_pipe = pipefds[1]; /* write end */
}
/*
* For replicas with repl_state == REPLICA_STATE_WAIT_BGSAVE_END and replica_req == req:
* Check replica capabilities, if every replica supports bypassing CRC, primary should also bypass CRC, otherwise, use CRC.
*/
int bypass_crc = 1;
/* Collect the connections of the replicas we want to transfer
* the RDB to, which are i WAIT_BGSAVE_START state. */
* the RDB to, which are in WAIT_BGSAVE_START state. */
int connsnum = 0;
connection **conns = zmalloc(sizeof(connection *) * listLength(server.replicas));
server.rdb_pipe_conns = NULL;
Expand Down Expand Up @@ -3591,6 +3597,10 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
}
replicationSetupReplicaForFullResync(replica, getPsyncInitialOffset());
}

// do not bypass CRC on the primary if connection doesn't have integrity check or if the replica doesn't support it
if (!connIsIntegrityChecked(replica->conn) || !(replica->replica_capa & REPLICA_CAPA_BYPASS_CRC))
bypass_crc = 0;
}

/* Create the child process. */
Expand All @@ -3614,6 +3624,12 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
}
serverSetCpuAffinity(server.bgsave_cpulist);

if (bypass_crc) {
serverLog(LL_NOTICE, "CRC checksum is disabled for this RDB transfer");
// mark rdb object to skip CRC checksum calculations
ranshid marked this conversation as resolved.
Show resolved Hide resolved
rdb.flags |= RIO_FLAG_BYPASS_CRC;
}

retval = rdbSaveRioWithEOFMark(req, &rdb, NULL, rsi);
if (retval == C_OK && rioFlush(&rdb) == 0) retval = C_ERR;

Expand Down Expand Up @@ -3679,6 +3695,7 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
}
}
if (!dual_channel) close(safe_to_exit_pipe);
if (bypass_crc) server.stat_total_sync_bypass_crc++;
return (childpid == -1) ? C_ERR : C_OK;
}
return C_OK; /* Unreached. */
Expand Down
3 changes: 3 additions & 0 deletions src/rdma.c
Original file line number Diff line number Diff line change
Expand Up @@ -1817,6 +1817,9 @@ static ConnectionType CT_RDMA = {
.process_pending_data = rdmaProcessPendingData,
.postpone_update_state = postPoneUpdateRdmaState,
.update_state = updateRdmaState,

/* Miselenious */
.connIntegrityChecked = NULL,
};

ConnectionType *connectionTypeRdma(void) {
Expand Down
34 changes: 29 additions & 5 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -1245,11 +1245,13 @@ void syncCommand(client *c) {
* the primary can accurately lists replicas and their listening ports in the
* INFO output.
*
* - capa <eof|psync2|dual-channel>
* - capa <eof|psync2|dual-channel|bypass-crc>
* What is the capabilities of this instance.
* eof: supports EOF-style RDB transfer for diskless replication.
* psync2: supports PSYNC v2, so understands +CONTINUE <new repl ID>.
* dual-channel: supports full sync using rdb channel.
* bypass-crc: supports skipping CRC calculations during diskless sync using
* a connection that has integrity checks (such as TLS).
*
* - ack <offset> [fack <aofofs>]
* Replica informs the primary the amount of replication stream that it
Expand Down Expand Up @@ -1315,7 +1317,8 @@ void replconfCommand(client *c) {
/* If dual-channel is disable on this primary, treat this command as unrecognized
* replconf option. */
c->replica_capa |= REPLICA_CAPA_DUAL_CHANNEL;
}
} else if (!strcasecmp(c->argv[j + 1]->ptr, REPLICA_CAPA_BYPASS_CRC_STR))
c->replica_capa |= REPLICA_CAPA_BYPASS_CRC;
} else if (!strcasecmp(c->argv[j]->ptr, "ack")) {
/* REPLCONF ACK is used by replica to inform the primary the amount
* of replication stream that it processed so far. It is an
Expand Down Expand Up @@ -1973,6 +1976,11 @@ static int useDisklessLoad(void) {
return enabled;
}

/* Returns 1 if the replica can skip CRC calculations during full sync */
int replicationSupportBypassCRC(connection *conn, int is_replica_diskless_load, int is_primary_diskless_sync) {
return is_replica_diskless_load && is_primary_diskless_sync && connIsIntegrityChecked(conn);
}

/* Helper function for readSyncBulkPayload() to initialize tempDb
* before socket-loading the new db from primary. The tempDb may be populated
* by swapMainDbWithTempDb or freed by disklessLoadDiscardTempDb later. */
Expand Down Expand Up @@ -2252,7 +2260,14 @@ void readSyncBulkPayload(connection *conn) {

serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory");
startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION, asyncLoading);

if (replicationSupportBypassCRC(conn, use_diskless_load, usemark)) {
/* We can bypass CRC checks when data is transmitted through a verified stream.
* The usemark flag indicates that the primary is streaming the data directly without
* writing it to storage.
* Similarly, the use_diskless_load flag indicates that the
* replica will load the payload directly into memory without first writing it to disk. */
ranshid marked this conversation as resolved.
Show resolved Hide resolved
rdb.flags |= RIO_FLAG_BYPASS_CRC;
}
int loadingFailed = 0;
rdbLoadingCtx loadingCtx = {.dbarray = dbarray, .functions_lib_ctx = functions_lib_ctx};
if (rdbLoadRioWithLoadingCtxScopedRdb(&rdb, RDBFLAGS_REPLICATION, &rsi, &loadingCtx) != C_OK) {
Expand Down Expand Up @@ -2494,6 +2509,7 @@ char *sendCommand(connection *conn, ...) {
while (1) {
arg = va_arg(ap, char *);
if (arg == NULL) break;
if (strcmp(arg, "") == 0) continue;
ranshid marked this conversation as resolved.
Show resolved Hide resolved
cmdargs = sdscatprintf(cmdargs, "$%zu\r\n%s\r\n", strlen(arg), arg);
argslen++;
}
Expand Down Expand Up @@ -3511,11 +3527,19 @@ void syncWithPrimary(connection *conn) {
*
* EOF: supports EOF-style RDB transfer for diskless replication.
* PSYNC2: supports PSYNC v2, so understands +CONTINUE <new repl ID>.
* BYPASS-CRC: supports skipping CRC calculations during full sync.
ranshid marked this conversation as resolved.
Show resolved Hide resolved
* Inform the primary of this capa only during diskless sync using a
* connection that has integrity checks (such as TLS).
* In disk-based sync, or non-integrity-checked connection, there is more
* concern for data corruprion so we keep this extra layer of detection.
*
* The primary will ignore capabilities it does not understand. */
int send_bypass_crc_capa = replicationSupportBypassCRC(conn, useDisklessLoad(), 1);
err = sendCommand(conn, "REPLCONF", "capa", "eof", "capa", "psync2",
server.dual_channel_replication ? "capa" : NULL,
server.dual_channel_replication ? "dual-channel" : NULL, NULL);
send_bypass_crc_capa ? "capa" : "",
send_bypass_crc_capa ? REPLICA_CAPA_BYPASS_CRC_STR : "",
server.dual_channel_replication ? "capa" : "",
server.dual_channel_replication ? "dual-channel" : "", NULL);
if (err) goto write_error;

/* Inform the primary of our (replica) version. */
Expand Down
1 change: 1 addition & 0 deletions src/rio.c
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ void rioFreeFd(rio *r) {
/* This function can be installed both in memory and file streams when checksum
* computation is needed. */
void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) {
if ((r->flags & RIO_FLAG_BYPASS_CRC) != 0) return; // skip CRC64 calculations
ranshid marked this conversation as resolved.
Show resolved Hide resolved
r->cksum = crc64(r->cksum, buf, len);
}

Expand Down
1 change: 1 addition & 0 deletions src/rio.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#define RIO_FLAG_READ_ERROR (1 << 0)
#define RIO_FLAG_WRITE_ERROR (1 << 1)
#define RIO_FLAG_CLOSE_ASAP (1 << 2) /* Rio was closed asynchronously during the current rio operation. */
#define RIO_FLAG_BYPASS_CRC (1 << 3)

#define RIO_TYPE_FILE (1 << 0)
#define RIO_TYPE_BUFFER (1 << 1)
Expand Down
2 changes: 2 additions & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -2639,6 +2639,7 @@ void resetServerStats(void) {
server.stat_fork_rate = 0;
server.stat_total_forks = 0;
server.stat_rejected_conn = 0;
server.stat_total_sync_bypass_crc = 0;
server.stat_sync_full = 0;
server.stat_sync_partial_ok = 0;
server.stat_sync_partial_err = 0;
Expand Down Expand Up @@ -5879,6 +5880,7 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
"instantaneous_input_repl_kbps:%.2f\r\n", (float)getInstantaneousMetric(STATS_METRIC_NET_INPUT_REPLICATION) / 1024,
"instantaneous_output_repl_kbps:%.2f\r\n", (float)getInstantaneousMetric(STATS_METRIC_NET_OUTPUT_REPLICATION) / 1024,
"rejected_connections:%lld\r\n", server.stat_rejected_conn,
"total_sync_bypass_crc:%ld\r\n", server.stat_total_sync_bypass_crc,
ranshid marked this conversation as resolved.
Show resolved Hide resolved
"sync_full:%lld\r\n", server.stat_sync_full,
"sync_partial_ok:%lld\r\n", server.stat_sync_partial_ok,
"sync_partial_err:%lld\r\n", server.stat_sync_partial_err,
Expand Down
8 changes: 7 additions & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -435,11 +435,15 @@ typedef enum {
* a replica that only wants RDB without replication buffer */
#define REPLICA_STATE_BG_RDB_LOAD 11 /* Main channel of a replica which uses dual channel replication. */

/* Replica capabilities. */
/* Replica capability flags */
#define REPLICA_CAPA_NONE 0
#define REPLICA_CAPA_EOF (1 << 0) /* Can parse the RDB EOF streaming format. */
#define REPLICA_CAPA_PSYNC2 (1 << 1) /* Supports PSYNC2 protocol. */
#define REPLICA_CAPA_DUAL_CHANNEL (1 << 2) /* Supports dual channel replication sync */
#define REPLICA_CAPA_BYPASS_CRC (1 << 3) /* Supports bypassing CRC checks for sync requests. */

/* Replica capability strings */
#define REPLICA_CAPA_BYPASS_CRC_STR "bypass-crc" /* Supports bypassing CRC checks for sync requests. */

/* Replica requirements */
#define REPLICA_REQ_NONE 0
Expand Down Expand Up @@ -1840,6 +1844,7 @@ struct valkeyServer {
double stat_fork_rate; /* Fork rate in GB/sec. */
long long stat_total_forks; /* Total count of fork. */
long long stat_rejected_conn; /* Clients rejected because of maxclients */
size_t stat_total_sync_bypass_crc; /* Total number of full syncs stated with CRC checksum bypassed */
long long stat_sync_full; /* Number of full resyncs with replicas. */
long long stat_sync_partial_ok; /* Number of accepted PSYNC requests. */
long long stat_sync_partial_err; /* Number of unaccepted PSYNC requests. */
Expand Down Expand Up @@ -2115,6 +2120,7 @@ struct valkeyServer {
* when it receives an error on the replication stream */
int repl_ignore_disk_write_error; /* Configures whether replicas panic when unable to
* persist writes to AOF. */

/* The following two fields is where we store primary PSYNC replid/offset
* while the PSYNC is in progress. At the end we'll copy the fields into
* the server->primary client structure. */
Expand Down
3 changes: 3 additions & 0 deletions src/socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,9 @@ static ConnectionType CT_Socket = {
.process_pending_data = NULL,
.postpone_update_state = NULL,
.update_state = NULL,

/* Miselenious */
ranshid marked this conversation as resolved.
Show resolved Hide resolved
.connIntegrityChecked = NULL,
};

int connBlock(connection *conn) {
Expand Down
7 changes: 7 additions & 0 deletions src/tls.c
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,10 @@ static int connTLSListen(connListener *listener) {
return listenToPort(listener);
}

static int connTLSIsIntegrityChecked(void) {
return 1;
}

static void connTLSCloseListener(connListener *listener) {
connectionTypeTcp()->closeListener(listener);
}
Expand Down Expand Up @@ -1186,6 +1190,9 @@ static ConnectionType CT_TLS = {

/* TLS specified methods */
.get_peer_cert = connTLSGetPeerCert,

/* Miselenious */
ranshid marked this conversation as resolved.
Show resolved Hide resolved
.connIntegrityChecked = connTLSIsIntegrityChecked,
};

int RedisRegisterConnectionTypeTLS(void) {
Expand Down
3 changes: 3 additions & 0 deletions src/unix.c
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,9 @@ static ConnectionType CT_Unix = {
.process_pending_data = NULL,
.postpone_update_state = NULL,
.update_state = NULL,

/* Miselenious */
ranshid marked this conversation as resolved.
Show resolved Hide resolved
.connIntegrityChecked = NULL,
};

int RedisRegisterConnectionTypeUnix(void) {
Expand Down
37 changes: 37 additions & 0 deletions tests/integration/bypass-crc.tcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
start_server {tags {"repl tls"} overrides {save {}}} {
ranshid marked this conversation as resolved.
Show resolved Hide resolved
set primary [srv 0 client]
set primary_host [srv 0 host]
set primary_port [srv 0 port]
set primary_bypassed_crc_counter 0
foreach mds {no yes} {
foreach sdl {disabled on-empty-db swapdb flush-before-load} {
ranshid marked this conversation as resolved.
Show resolved Hide resolved
test "Bypass CRC sync - tls:$::tls, repl_diskless_sync:$mds, repl_diskless_load:$sdl" {
$primary config set repl-diskless-sync $mds
start_server {overrides {save {}}} {
set replica [srv 0 client]
$replica config set repl-diskless-load $sdl
$replica replicaof $primary_host $primary_port

wait_for_condition 50 100 {
[string match {*master_link_status:up*} [$replica info replication]]
ranshid marked this conversation as resolved.
Show resolved Hide resolved
} else {
fail "Replication not started"
}

set replica_bypassing_crc_count [string match {*total_sync_bypass_crc:1*} [$replica info stats]]
set stats [regexp -inline {total_sync_bypass_crc:(\d+)} [$primary info stats]]
set primary_bypass_crc_count [lindex $stats 1]
ranshid marked this conversation as resolved.
Show resolved Hide resolved

if {$sdl eq "disabled" || $mds eq "no" || !$::tls} {
assert_equal $primary_bypassed_crc_counter $primary_bypass_crc_count "Primary should not bypass CRC in this scenario"
assert_equal 0 $replica_bypassing_crc_count "Replica should not bypass CRC in this scenario"
} else {
incr primary_bypassed_crc_counter
assert_equal $primary_bypassed_crc_counter $primary_bypass_crc_count "Primary should bypass CRC in this scenario"
assert_equal 1 $replica_bypassing_crc_count "Replica should bypass CRC in this scenario"
}
}
}
}
}
}