diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f1d23f40fa..48a94ef984 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -83,7 +83,7 @@ jobs: steps: - uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1 - name: make - run: make -j3 SERVER_CFLAGS='-Werror' + run: make -j3 all-with-unit-tests SERVER_CFLAGS='-Werror' build-32bit: runs-on: ubuntu-latest diff --git a/src/Makefile b/src/Makefile index 020b70d6d5..ae2de1c626 100644 --- a/src/Makefile +++ b/src/Makefile @@ -98,15 +98,6 @@ ifeq ($(USE_JEMALLOC),no) MALLOC=libc endif -# Some unit tests compile files a second time to get access to static functions, the "--allow-multiple-definition" flag -# allows us to do that without an error, by using the first instance of function. This behavior can also be used -# to tweak behavior of code just for unit tests. The version of ld on MacOS apparently always does this. -ifneq ($(uname_S),Darwin) - ALLOW_DUPLICATE_FLAG=-Wl,--allow-multiple-definition -else - ALLOW_DUPLICATE_FLAG= -endif - ifdef SANITIZER ifeq ($(SANITIZER),address) MALLOC=libc @@ -494,7 +485,7 @@ $(ENGINE_LIB_NAME): $(ENGINE_SERVER_OBJ) # valkey-unit-tests $(ENGINE_UNIT_TESTS): $(ENGINE_TEST_OBJ) $(ENGINE_LIB_NAME) - $(SERVER_LD) $(ALLOW_DUPLICATE_FLAG) -o $@ $^ ../deps/hiredis/libhiredis.a ../deps/lua/src/liblua.a ../deps/hdr_histogram/libhdrhistogram.a ../deps/fpconv/libfpconv.a $(FINAL_LIBS) + $(SERVER_LD) -o $@ $^ ../deps/hiredis/libhiredis.a ../deps/lua/src/liblua.a ../deps/hdr_histogram/libhdrhistogram.a ../deps/fpconv/libfpconv.a $(FINAL_LIBS) # valkey-sentinel $(ENGINE_SENTINEL_NAME): $(SERVER_NAME) diff --git a/src/cluster.h b/src/cluster.h index 81bf3a1091..15a553382a 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -96,7 +96,7 @@ int clusterNodeIsFailing(clusterNode *node); int clusterNodeIsNoFailover(clusterNode *node); char *clusterNodeGetShardId(clusterNode *node); int clusterNodeNumReplicas(clusterNode *node); -clusterNode *clusterNodeGetReplica(clusterNode *node, int slave_idx); +clusterNode *clusterNodeGetReplica(clusterNode *node, int replica_idx); clusterNode *getMigratingSlotDest(int slot); clusterNode *getImportingSlotSource(int slot); clusterNode *getNodeBySlot(int slot); diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 70ca2ab252..fd60df19af 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -1237,7 +1237,7 @@ void clusterReset(int hard) { if (nodeIsReplica(myself)) { clusterSetNodeAsPrimary(myself); replicationUnsetPrimary(); - emptyData(-1, EMPTYDB_NO_FLAGS, NULL); + emptyData(-1, server.lazyfree_lazy_user_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS, NULL); } /* Close slots, reset manual failover state. */ @@ -2995,7 +2995,7 @@ int clusterIsValidPacket(clusterLink *link) { return 0; } - if (type == server.cluster_drop_packet_filter) { + if (type == server.cluster_drop_packet_filter || server.cluster_drop_packet_filter == -2) { serverLog(LL_WARNING, "Dropping packet that matches debug drop filter"); return 0; } @@ -3084,7 +3084,8 @@ int clusterProcessPacket(clusterLink *link) { if (!clusterIsValidPacket(link)) { clusterMsg *hdr = (clusterMsg *)link->rcvbuf; uint16_t type = ntohs(hdr->type); - if (server.debug_cluster_close_link_on_packet_drop && type == server.cluster_drop_packet_filter) { + if (server.debug_cluster_close_link_on_packet_drop && + (type == server.cluster_drop_packet_filter || server.cluster_drop_packet_filter == -2)) { freeClusterLink(link); serverLog(LL_WARNING, "Closing link for matching packet type %hu", type); return 0; @@ -4708,11 +4709,18 @@ int clusterGetReplicaRank(void) { void clusterLogCantFailover(int reason) { char *msg; static time_t lastlog_time = 0; + time_t now = time(NULL); - /* Don't log if we have the same reason for some time. */ - if (reason == server.cluster->cant_failover_reason && - time(NULL) - lastlog_time < CLUSTER_CANT_FAILOVER_RELOG_PERIOD) + /* General logging suppression if the same reason has occurred recently. */ + if (reason == server.cluster->cant_failover_reason && now - lastlog_time < CLUSTER_CANT_FAILOVER_RELOG_PERIOD) { return; + } + + /* Special case: If the failure reason is due to data age, log 10 times less frequently. */ + if (reason == server.cluster->cant_failover_reason && reason == CLUSTER_CANT_FAILOVER_DATA_AGE && + now - lastlog_time < 10 * CLUSTER_CANT_FAILOVER_RELOG_PERIOD) { + return; + } server.cluster->cant_failover_reason = reason; @@ -6090,6 +6098,8 @@ const char *clusterGetMessageTypeString(int type) { return "unknown"; } +/* Get the slot from robj and return it. If the slot is not valid, + * return -1 and send an error to the client. */ int getSlotOrReply(client *c, robj *o) { long long slot; @@ -6815,7 +6825,7 @@ int clusterCommandSpecial(client *c) { memset(slots, 0, CLUSTER_SLOTS); /* Check that all the arguments are parseable.*/ for (j = 2; j < c->argc; j++) { - if ((slot = getSlotOrReply(c, c->argv[j])) == C_ERR) { + if ((slot = getSlotOrReply(c, c->argv[j])) == -1) { zfree(slots); return 1; } @@ -6848,11 +6858,11 @@ int clusterCommandSpecial(client *c) { /* Check that all the arguments are parseable and that all the * slots are not already busy. */ for (j = 2; j < c->argc; j += 2) { - if ((startslot = getSlotOrReply(c, c->argv[j])) == C_ERR) { + if ((startslot = getSlotOrReply(c, c->argv[j])) == -1) { zfree(slots); return 1; } - if ((endslot = getSlotOrReply(c, c->argv[j + 1])) == C_ERR) { + if ((endslot = getSlotOrReply(c, c->argv[j + 1])) == -1) { zfree(slots); return 1; } diff --git a/src/cluster_slot_stats.c b/src/cluster_slot_stats.c index 284208af54..b52692bd15 100644 --- a/src/cluster_slot_stats.c +++ b/src/cluster_slot_stats.c @@ -279,8 +279,8 @@ void clusterSlotStatsCommand(client *c) { if (c->argc == 5 && !strcasecmp(c->argv[2]->ptr, "slotsrange")) { /* CLUSTER SLOT-STATS SLOTSRANGE start-slot end-slot */ int startslot, endslot; - if ((startslot = getSlotOrReply(c, c->argv[3])) == C_ERR || - (endslot = getSlotOrReply(c, c->argv[4])) == C_ERR) { + if ((startslot = getSlotOrReply(c, c->argv[3])) == -1 || + (endslot = getSlotOrReply(c, c->argv[4])) == -1) { return; } if (startslot > endslot) { diff --git a/src/commands.def b/src/commands.def index b1b39b9632..168514871c 100644 --- a/src/commands.def +++ b/src/commands.def @@ -6428,6 +6428,7 @@ struct COMMAND_STRUCT ACL_Subcommands[] = { /* BGSAVE history */ commandHistory BGSAVE_History[] = { {"3.2.2","Added the `SCHEDULE` option."}, +{"8.1.0","Added the `CANCEL` option."}, }; #endif @@ -6441,9 +6442,15 @@ commandHistory BGSAVE_History[] = { #define BGSAVE_Keyspecs NULL #endif +/* BGSAVE operation argument table */ +struct COMMAND_ARG BGSAVE_operation_Subargs[] = { +{MAKE_ARG("schedule",ARG_TYPE_PURE_TOKEN,-1,"SCHEDULE",NULL,"3.2.2",CMD_ARG_NONE,0,NULL)}, +{MAKE_ARG("cancel",ARG_TYPE_PURE_TOKEN,-1,"CANCEL",NULL,"8.1.0",CMD_ARG_NONE,0,NULL)}, +}; + /* BGSAVE argument table */ struct COMMAND_ARG BGSAVE_Args[] = { -{MAKE_ARG("schedule",ARG_TYPE_PURE_TOKEN,-1,"SCHEDULE",NULL,"3.2.2",CMD_ARG_OPTIONAL,0,NULL)}, +{MAKE_ARG("operation",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,2,NULL),.subargs=BGSAVE_operation_Subargs}, }; /********** COMMAND COUNT ********************/ @@ -11009,7 +11016,7 @@ struct COMMAND_STRUCT serverCommandTable[] = { /* server */ {MAKE_CMD("acl","A container for Access List Control commands.","Depends on subcommand.","6.0.0",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,ACL_History,0,ACL_Tips,0,NULL,-2,CMD_SENTINEL,0,ACL_Keyspecs,0,NULL,0),.subcommands=ACL_Subcommands}, {MAKE_CMD("bgrewriteaof","Asynchronously rewrites the append-only file to disk.","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,BGREWRITEAOF_History,0,BGREWRITEAOF_Tips,0,bgrewriteaofCommand,1,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_NOSCRIPT,0,BGREWRITEAOF_Keyspecs,0,NULL,0)}, -{MAKE_CMD("bgsave","Asynchronously saves the database(s) to disk.","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,BGSAVE_History,1,BGSAVE_Tips,0,bgsaveCommand,-1,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_NOSCRIPT,0,BGSAVE_Keyspecs,0,NULL,1),.args=BGSAVE_Args}, +{MAKE_CMD("bgsave","Asynchronously saves the database(s) to disk.","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,BGSAVE_History,2,BGSAVE_Tips,0,bgsaveCommand,-1,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_NOSCRIPT,0,BGSAVE_Keyspecs,0,NULL,1),.args=BGSAVE_Args}, {MAKE_CMD("command","Returns detailed information about all commands.","O(N) where N is the total number of commands","2.8.13",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,COMMAND_History,0,COMMAND_Tips,1,commandCommand,-1,CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,COMMAND_Keyspecs,0,NULL,0),.subcommands=COMMAND_Subcommands}, {MAKE_CMD("config","A container for server configuration commands.","Depends on subcommand.","2.0.0",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,CONFIG_History,0,CONFIG_Tips,0,NULL,-2,0,0,CONFIG_Keyspecs,0,NULL,0),.subcommands=CONFIG_Subcommands}, {MAKE_CMD("dbsize","Returns the number of keys in the database.","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,DBSIZE_History,0,DBSIZE_Tips,2,dbsizeCommand,1,CMD_READONLY|CMD_FAST,ACL_CATEGORY_KEYSPACE,DBSIZE_Keyspecs,0,NULL,0)}, diff --git a/src/commands/bgsave.json b/src/commands/bgsave.json index f73d8a89b5..6b4688ba57 100644 --- a/src/commands/bgsave.json +++ b/src/commands/bgsave.json @@ -10,6 +10,10 @@ [ "3.2.2", "Added the `SCHEDULE` option." + ], + [ + "8.1.0", + "Added the `CANCEL` option." ] ], "command_flags": [ @@ -19,11 +23,23 @@ ], "arguments": [ { - "name": "schedule", - "token": "SCHEDULE", - "type": "pure-token", + "name": "operation", + "type": "oneof", "optional": true, - "since": "3.2.2" + "arguments": [ + { + "name": "schedule", + "token": "SCHEDULE", + "type": "pure-token", + "since": "3.2.2" + }, + { + "name": "cancel", + "token": "CANCEL", + "type": "pure-token", + "since": "8.1.0" + } + ] } ], "reply_schema": { @@ -33,6 +49,12 @@ }, { "const": "Background saving scheduled" + }, + { + "const": "Background saving cancelled" + }, + { + "const": "Scheduled background saving cancelled" } ] } diff --git a/src/config.c b/src/config.c index 560c7266bd..e72e45fb2e 100644 --- a/src/config.c +++ b/src/config.c @@ -152,6 +152,13 @@ configEnum propagation_error_behavior_enum[] = { {"panic-on-replicas", PROPAGATION_ERR_BEHAVIOR_PANIC_ON_REPLICAS}, {NULL, 0}}; +configEnum log_format_enum[] = {{"legacy", LOG_FORMAT_LEGACY}, {"logfmt", LOG_FORMAT_LOGFMT}, {NULL, 0}}; + +configEnum log_timestamp_format_enum[] = {{"legacy", LOG_TIMESTAMP_LEGACY}, + {"iso8601", LOG_TIMESTAMP_ISO8601}, + {"milliseconds", LOG_TIMESTAMP_MILLISECONDS}, + {NULL, 0}}; + /* Output buffer limits presets. */ clientBufferLimitsConfig clientBufferLimitsDefaults[CLIENT_TYPE_OBUF_COUNT] = { {0, 0, 0}, /* normal */ @@ -622,9 +629,6 @@ void loadServerConfigFromString(char *config) { if (server.config_hz < CONFIG_MIN_HZ) server.config_hz = CONFIG_MIN_HZ; if (server.config_hz > CONFIG_MAX_HZ) server.config_hz = CONFIG_MAX_HZ; - /* To ensure backward compatibility when io_threads_num is according to the previous maximum of 128. */ - if (server.io_threads_num > IO_THREADS_MAX_NUM) server.io_threads_num = IO_THREADS_MAX_NUM; - sdsfreesplitres(lines, totlines); reading_config_file = 0; return; @@ -3192,11 +3196,13 @@ standardConfig static_configs[] = { createEnumConfig("propagation-error-behavior", NULL, MODIFIABLE_CONFIG, propagation_error_behavior_enum, server.propagation_error_behavior, PROPAGATION_ERR_BEHAVIOR_IGNORE, NULL, NULL), createEnumConfig("shutdown-on-sigint", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, shutdown_on_sig_enum, server.shutdown_on_sigint, 0, isValidShutdownOnSigFlags, NULL), createEnumConfig("shutdown-on-sigterm", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, shutdown_on_sig_enum, server.shutdown_on_sigterm, 0, isValidShutdownOnSigFlags, NULL), + createEnumConfig("log-format", NULL, MODIFIABLE_CONFIG, log_format_enum, server.log_format, LOG_FORMAT_LEGACY, NULL, NULL), + createEnumConfig("log-timestamp-format", NULL, MODIFIABLE_CONFIG, log_timestamp_format_enum, server.log_timestamp_format, LOG_TIMESTAMP_LEGACY, NULL, NULL), /* Integer configs */ createIntConfig("databases", NULL, IMMUTABLE_CONFIG, 1, INT_MAX, server.dbnum, 16, INTEGER_CONFIG, NULL, NULL), - createIntConfig("port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.port, 6379, INTEGER_CONFIG, NULL, updatePort), /* TCP port. */ - createIntConfig("io-threads", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, 1, 128, server.io_threads_num, 1, INTEGER_CONFIG, NULL, NULL), /* Single threaded by default */ + createIntConfig("port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.port, 6379, INTEGER_CONFIG, NULL, updatePort), /* TCP port. */ + createIntConfig("io-threads", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, 1, IO_THREADS_MAX_NUM, server.io_threads_num, 1, INTEGER_CONFIG, NULL, NULL), /* Single threaded by default */ createIntConfig("events-per-io-thread", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.events_per_io_thread, 2, INTEGER_CONFIG, NULL, NULL), createIntConfig("prefetch-batch-max-size", NULL, MODIFIABLE_CONFIG, 0, 128, server.prefetch_batch_max_size, 16, INTEGER_CONFIG, NULL, NULL), createIntConfig("auto-aof-rewrite-percentage", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.aof_rewrite_perc, 100, INTEGER_CONFIG, NULL, NULL), diff --git a/src/config.h b/src/config.h index 558b974f7d..3b79c5c681 100644 --- a/src/config.h +++ b/src/config.h @@ -338,7 +338,7 @@ void setcpuaffinity(const char *cpulist); #define HAVE_FADVISE #endif -#define IO_THREADS_MAX_NUM 16 +#define IO_THREADS_MAX_NUM 256 #ifndef CACHE_LINE_SIZE #if defined(__aarch64__) && defined(__APPLE__) diff --git a/src/db.c b/src/db.c index 1db85439a2..f466dbc82b 100644 --- a/src/db.c +++ b/src/db.c @@ -2418,9 +2418,9 @@ int bzmpopGetKeys(struct serverCommand *cmd, robj **argv, int argc, getKeysResul /* Helper function to extract keys from the SORT RO command. * - * SORT + * SORT_RO * - * The second argument of SORT is always a key, however an arbitrary number of + * The second argument of SORT_RO is always a key, however an arbitrary number of * keys may be accessed while doing the sort (the BY and GET args), so the * key-spec declares incomplete keys which is why we have to provide a concrete * implementation to fetch the keys. diff --git a/src/debug.c b/src/debug.c index 317498dd6a..1b7cd90ae7 100644 --- a/src/debug.c +++ b/src/debug.c @@ -432,7 +432,7 @@ void debugCommand(client *c) { " Some fields of the default behavior may be time consuming to fetch,", " and `fast` can be passed to avoid fetching them.", "DROP-CLUSTER-PACKET-FILTER ", - " Drop all packets that match the filtered type. Set to -1 allow all packets.", + " Drop all packets that match the filtered type. Set to -1 allow all packets or -2 to drop all packets.", "CLOSE-CLUSTER-LINK-ON-PACKET-DROP <0|1>", " This is valid only when DROP-CLUSTER-PACKET-FILTER is set to a valid packet type.", " When set to 1, the cluster link is closed after dropping a packet based on the filter.", @@ -1023,7 +1023,7 @@ void debugCommand(client *c) { /* =========================== Crash handling ============================== */ -__attribute__((noinline)) void _serverAssert(const char *estr, const char *file, int line) { +__attribute__((noinline, weak)) void _serverAssert(const char *estr, const char *file, int line) { int new_report = bugReportStart(); serverLog(LL_WARNING, "=== %sASSERTION FAILED ===", new_report ? "" : "RECURSIVE "); serverLog(LL_WARNING, "==> %s:%d '%s' is not true", file, line, estr); diff --git a/src/io_threads.c b/src/io_threads.c index b0368cf07b..f4471b96d0 100644 --- a/src/io_threads.c +++ b/src/io_threads.c @@ -319,7 +319,7 @@ void initIOThreads(void) { int trySendReadToIOThreads(client *c) { if (server.active_io_threads_num <= 1) return C_ERR; - /* If IO thread is areadty reading, return C_OK to make sure the main thread will not handle it. */ + /* If IO thread is already reading, return C_OK to make sure the main thread will not handle it. */ if (c->io_read_state != CLIENT_IDLE) return C_OK; /* Currently, replica/master writes are not offloaded and are processed synchronously. */ if (c->flag.primary || getClientType(c) == CLIENT_TYPE_REPLICA) return C_ERR; diff --git a/src/kvstore.c b/src/kvstore.c index 10d6c73dfe..687ff879cd 100644 --- a/src/kvstore.c +++ b/src/kvstore.c @@ -54,8 +54,8 @@ struct _kvstore { int flags; dictType *dtype; dict **dicts; - long long num_dicts; - long long num_dicts_bits; + int num_dicts; + int num_dicts_bits; list *rehashing; /* List of dictionaries in this kvstore that are currently rehashing. */ int resize_cursor; /* Cron job uses this cursor to gradually resize dictionaries (only used if num_dicts > 1). */ int allocated_dicts; /* The number of allocated dicts. */ diff --git a/src/networking.c b/src/networking.c index c765c3cc4f..5c4dcdae00 100644 --- a/src/networking.c +++ b/src/networking.c @@ -889,8 +889,11 @@ void setDeferredAggregateLen(client *c, void *node, long length, char prefix) { } char lenstr[128]; - size_t lenstr_len = snprintf(lenstr, sizeof(lenstr), "%c%ld\r\n", prefix, length); - setDeferredReply(c, node, lenstr, lenstr_len); + lenstr[0] = prefix; + size_t lenstr_len = ll2string(lenstr + 1, sizeof(lenstr) - 1, length); + lenstr[lenstr_len + 1] = '\r'; + lenstr[lenstr_len + 2] = '\n'; + setDeferredReply(c, node, lenstr, lenstr_len + 3); } void setDeferredArrayLen(client *c, void *node, long length) { @@ -2682,6 +2685,8 @@ void processInlineBuffer(client *c) { /* Create an Object for all arguments. */ for (c->argc = 0, j = 0; j < argc; j++) { + /* Strings returned from sdssplitargs() may have unused capacity that we can trim. */ + argv[j] = sdsRemoveFreeSpace(argv[j], 1); c->argv[c->argc] = createObject(OBJ_STRING, argv[j]); c->argc++; c->argv_len_sum += sdslen(argv[j]); diff --git a/src/rdb.c b/src/rdb.c index 864b51dd1e..eb581aa181 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1359,6 +1359,7 @@ ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter, int slot_ sdsfree(slot_info); goto werr; } + written += res; last_slot = curr_slot; sdsfree(slot_info); } @@ -3695,6 +3696,21 @@ void bgsaveCommand(client *c) { if (c->argc > 1) { if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr, "schedule")) { schedule = 1; + } else if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr, "cancel")) { + /* Terminates an in progress BGSAVE */ + if (server.child_type == CHILD_TYPE_RDB) { + /* There is an ongoing bgsave */ + serverLog(LL_NOTICE, "Background saving will be aborted due to user request"); + killRDBChild(); + addReplyStatus(c, "Background saving cancelled"); + } else if (server.rdb_bgsave_scheduled == 1) { + serverLog(LL_NOTICE, "Scheduled background saving will be cancelled due to user request"); + server.rdb_bgsave_scheduled = 0; + addReplyStatus(c, "Scheduled background saving cancelled"); + } else { + addReplyError(c, "Background saving is currently not in progress or scheduled"); + } + return; } else { addReplyErrorObject(c, shared.syntaxerr); return; @@ -3709,6 +3725,11 @@ void bgsaveCommand(client *c) { } else if (hasActiveChildProcess() || server.in_exec) { if (schedule || server.in_exec) { server.rdb_bgsave_scheduled = 1; + if (schedule) { + serverLog(LL_NOTICE, "Background saving scheduled due to user request"); + } else { + serverLog(LL_NOTICE, "Background saving scheduled to run after transaction execution"); + } addReplyStatus(c, "Background saving scheduled"); } else { addReplyError(c, "Another child process is active (AOF?): can't BGSAVE right now. " diff --git a/src/rdma.c b/src/rdma.c index dd6de395d0..bb38baa0f1 100644 --- a/src/rdma.c +++ b/src/rdma.c @@ -127,6 +127,8 @@ typedef struct rdma_listener { * handler into pending list */ static list *pending_list; +static rdma_listener *rdma_listeners; + static ConnectionType CT_RDMA; static int valkey_rdma_rx_size = VALKEY_RDMA_DEFAULT_RX_SIZE; @@ -141,12 +143,34 @@ static void serverRdmaError(char *err, const char *fmt, ...) { va_end(ap); } +static inline int connRdmaAllowCommand(void) { + /* RDMA MR is not accessible in a child process, avoid segment fault due to + * invalid MR access, close it rather than server random crash */ + if (server.in_fork_child != CHILD_TYPE_NONE) { + return C_ERR; + } + + return C_OK; +} + +static inline int connRdmaAllowRW(connection *conn) { + if (conn->state == CONN_STATE_ERROR || conn->state == CONN_STATE_CLOSED) { + return C_ERR; + } + + return connRdmaAllowCommand(); +} + static int rdmaPostRecv(RdmaContext *ctx, struct rdma_cm_id *cm_id, ValkeyRdmaCmd *cmd) { struct ibv_sge sge; size_t length = sizeof(ValkeyRdmaCmd); struct ibv_recv_wr recv_wr, *bad_wr; int ret; + if (connRdmaAllowCommand()) { + return C_ERR; + } + sge.addr = (uint64_t)cmd; sge.length = length; sge.lkey = ctx->cmd_mr->lkey; @@ -449,13 +473,22 @@ static int rdmaHandleEstablished(struct rdma_cm_event *ev) { return C_OK; } +static inline void rdmaDelKeepalive(aeEventLoop *el, RdmaContext *ctx) { + if (ctx->keepalive_te == AE_ERR) { + return; + } + + aeDeleteTimeEvent(el, ctx->keepalive_te); + ctx->keepalive_te = AE_ERR; +} + static int rdmaHandleDisconnect(aeEventLoop *el, struct rdma_cm_event *ev) { struct rdma_cm_id *cm_id = ev->id; RdmaContext *ctx = cm_id->context; connection *conn = ctx->conn; rdma_connection *rdma_conn = (rdma_connection *)conn; - aeDeleteTimeEvent(el, ctx->keepalive_te); + rdmaDelKeepalive(el, ctx); conn->state = CONN_STATE_CLOSED; /* we can't close connection now, let's mark this connection as closed state */ @@ -748,7 +781,7 @@ static rdma_listener *rdmaFdToListener(connListener *listener, int fd) { for (int i = 0; i < listener->count; i++) { if (listener->fd[i] != fd) continue; - return (rdma_listener *)listener->priv + i; + return &rdma_listeners[i]; } return NULL; @@ -1171,6 +1204,7 @@ static void connRdmaClose(connection *conn) { } ctx = cm_id->context; + rdmaDelKeepalive(server.el, ctx); rdma_disconnect(cm_id); /* poll all CQ before close */ @@ -1202,6 +1236,10 @@ static size_t connRdmaSend(connection *conn, const void *data, size_t data_len) char *remote_addr = ctx->tx_addr + ctx->tx.offset; int ret; + if (connRdmaAllowCommand()) { + return C_ERR; + } + memcpy(addr, data, data_len); sge.addr = (uint64_t)addr; @@ -1235,7 +1273,7 @@ static int connRdmaWrite(connection *conn, const void *data, size_t data_len) { RdmaContext *ctx = cm_id->context; uint32_t towrite; - if (conn->state == CONN_STATE_ERROR || conn->state == CONN_STATE_CLOSED) { + if (connRdmaAllowRW(conn)) { return C_ERR; } @@ -1278,7 +1316,7 @@ static int connRdmaRead(connection *conn, void *buf, size_t buf_len) { struct rdma_cm_id *cm_id = rdma_conn->cm_id; RdmaContext *ctx = cm_id->context; - if (conn->state == CONN_STATE_ERROR || conn->state == CONN_STATE_CLOSED) { + if (connRdmaAllowRW(conn)) { return C_ERR; } @@ -1300,7 +1338,7 @@ static ssize_t connRdmaSyncWrite(connection *conn, char *ptr, ssize_t size, long long long start = mstime(); uint32_t towrite; - if (conn->state == CONN_STATE_ERROR || conn->state == CONN_STATE_CLOSED) { + if (connRdmaAllowRW(conn)) { return C_ERR; } @@ -1343,7 +1381,7 @@ static ssize_t connRdmaSyncRead(connection *conn, char *ptr, ssize_t size, long long long start = mstime(); uint32_t toread; - if (conn->state == CONN_STATE_ERROR || conn->state == CONN_STATE_CLOSED) { + if (connRdmaAllowRW(conn)) { return C_ERR; } @@ -1378,7 +1416,7 @@ static ssize_t connRdmaSyncReadLine(connection *conn, char *ptr, ssize_t size, l char *c; char nl = 0; - if (conn->state == CONN_STATE_ERROR || conn->state == CONN_STATE_CLOSED) { + if (connRdmaAllowRW(conn)) { return C_ERR; } @@ -1537,7 +1575,7 @@ int connRdmaListen(connListener *listener) { bindaddr = default_bindaddr; } - listener->priv = rdma_listener = zcalloc_num(bindaddr_count, sizeof(*rdma_listener)); + rdma_listeners = rdma_listener = zcalloc_num(bindaddr_count, sizeof(*rdma_listener)); for (j = 0; j < bindaddr_count; j++) { char *addr = bindaddr[j]; int optional = *addr == '-'; @@ -1757,13 +1795,14 @@ static int rdmaChangeListener(void) { aeDeleteFileEvent(server.el, listener->fd[i], AE_READABLE); listener->fd[i] = -1; - struct rdma_listener *rdma_listener = (struct rdma_listener *)listener->priv + i; + struct rdma_listener *rdma_listener = &rdma_listeners[i]; rdma_destroy_id(rdma_listener->cm_id); rdma_destroy_event_channel(rdma_listener->cm_channel); } listener->count = 0; - zfree(listener->priv); + zfree(rdma_listeners); + rdma_listeners = NULL; closeListener(listener); diff --git a/src/replication.c b/src/replication.c index 51b654ae32..2f840dfcd6 100644 --- a/src/replication.c +++ b/src/replication.c @@ -651,7 +651,7 @@ void replicationFeedStreamFromPrimaryStream(char *buf, size_t buflen) { /* Debugging: this is handy to see the stream sent from primary * to replicas. Disabled with if(0). */ if (0) { - if (server.hide_user_data_from_log) { + if (!server.hide_user_data_from_log) { printf("%zu:", buflen); for (size_t j = 0; j < buflen; j++) { printf("%c", isprint(buf[j]) ? buf[j] : '.'); @@ -1761,6 +1761,7 @@ void updateReplicasWaitingBgsave(int bgsaveerr, int type) { client *replica = ln->value; if (replica->repl_state == REPLICA_STATE_WAIT_BGSAVE_END) { + int repldbfd; struct valkey_stat buf; if (bgsaveerr != C_OK) { @@ -1810,17 +1811,26 @@ void updateReplicasWaitingBgsave(int bgsaveerr, int type) { } replica->repl_start_cmd_stream_on_ack = 1; } else { - if ((replica->repldbfd = open(server.rdb_filename, O_RDONLY)) == -1 || - valkey_fstat(replica->repldbfd, &buf) == -1) { + repldbfd = open(server.rdb_filename, O_RDONLY); + if (repldbfd == -1) { freeClientAsync(replica); - serverLog(LL_WARNING, "SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno)); + serverLog(LL_WARNING, "SYNC failed. Can't open DB after BGSAVE: %s", strerror(errno)); continue; } + if (valkey_fstat(repldbfd, &buf) == -1) { + freeClientAsync(replica); + serverLog(LL_WARNING, "SYNC failed. Can't stat DB after BGSAVE: %s", strerror(errno)); + close(repldbfd); + continue; + } + replica->repldbfd = repldbfd; replica->repldboff = 0; replica->repldbsize = buf.st_size; replica->repl_state = REPLICA_STATE_SEND_BULK; replica->replpreamble = sdscatprintf(sdsempty(), "$%lld\r\n", (unsigned long long)replica->repldbsize); + /* When repl_state changes to REPLICA_STATE_SEND_BULK, we will release + * the resources in freeClient. */ connSetWriteHandler(replica->conn, NULL); if (connSetWriteHandler(replica->conn, sendBulkToReplica) == C_ERR) { freeClientAsync(replica); diff --git a/src/sds.c b/src/sds.c index e14f4bd0bd..4dd7d709aa 100644 --- a/src/sds.c +++ b/src/sds.c @@ -1032,6 +1032,86 @@ int hex_digit_to_int(char c) { } } +/* Helper function for sdssplitargs that parses a single argument. It + * populates the number characters needed to store the parsed argument + * in len, if provided, or will copy the parsed string into dst, if provided. + * If the string is able to be parsed, this function returns the number of + * characters that were parsed. If the argument can't be parsed, it + * returns 0. */ +static int sdsparsearg(const char *arg, unsigned int *len, char *dst) { + const char *p = arg; + int inq = 0; /* set to 1 if we are in "quotes" */ + int insq = 0; /* set to 1 if we are in 'single quotes' */ + int done = 0; + + while (!done) { + int new_char = -1; + if (inq) { + if (*p == '\\' && *(p + 1) == 'x' && is_hex_digit(*(p + 2)) && is_hex_digit(*(p + 3))) { + new_char = (hex_digit_to_int(*(p + 2)) * 16) + hex_digit_to_int(*(p + 3)); + p += 3; + } else if (*p == '\\' && *(p + 1)) { + p++; + switch (*p) { + case 'n': new_char = '\n'; break; + case 'r': new_char = '\r'; break; + case 't': new_char = '\t'; break; + case 'b': new_char = '\b'; break; + case 'a': new_char = '\a'; break; + default: new_char = *p; break; + } + } else if (*p == '"') { + /* closing quote must be followed by a space or + * nothing at all. */ + if (*(p + 1) && !isspace(*(p + 1))) return 0; + done = 1; + } else if (!*p) { + /* unterminated quotes */ + return 0; + } else { + new_char = *p; + } + } else if (insq) { + if (*p == '\\' && *(p + 1) == '\'') { + p++; + new_char = *p; + } else if (*p == '\'') { + /* closing quote must be followed by a space or + * nothing at all. */ + if (*(p + 1) && !isspace(*(p + 1))) return 0; + done = 1; + } else if (!*p) { + /* unterminated quotes */ + return 0; + } else { + new_char = *p; + } + } else { + switch (*p) { + case ' ': + case '\n': + case '\r': + case '\t': + case '\0': done = 1; break; + case '"': inq = 1; break; + case '\'': insq = 1; break; + default: new_char = *p; break; + } + } + if (new_char != -1) { + if (len) (*len)++; + if (dst) { + *dst = (char)new_char; + dst++; + } + } + if (*p) { + p++; + } + } + return p - arg; +} + /* Split a line into arguments, where every argument can be in the * following programming-language REPL-alike form: * @@ -1049,103 +1129,42 @@ int hex_digit_to_int(char c) { * The function returns the allocated tokens on success, even when the * input string is empty, or NULL if the input contains unbalanced * quotes or closed quotes followed by non space characters - * as in: "foo"bar or "foo' + * as in: "foo"bar or "foo'. + * + * The sds strings returned by this function are not initialized with + * extra space. */ sds *sdssplitargs(const char *line, int *argc) { const char *p = line; - char *current = NULL; char **vector = NULL; *argc = 0; - while (1) { + while (*p) { /* skip blanks */ while (*p && isspace(*p)) p++; - if (*p) { - /* get a token */ - int inq = 0; /* set to 1 if we are in "quotes" */ - int insq = 0; /* set to 1 if we are in 'single quotes' */ - int done = 0; - - if (current == NULL) current = sdsempty(); - while (!done) { - if (inq) { - if (*p == '\\' && *(p + 1) == 'x' && is_hex_digit(*(p + 2)) && is_hex_digit(*(p + 3))) { - unsigned char byte; - - byte = (hex_digit_to_int(*(p + 2)) * 16) + hex_digit_to_int(*(p + 3)); - current = sdscatlen(current, (char *)&byte, 1); - p += 3; - } else if (*p == '\\' && *(p + 1)) { - char c; - - p++; - switch (*p) { - case 'n': c = '\n'; break; - case 'r': c = '\r'; break; - case 't': c = '\t'; break; - case 'b': c = '\b'; break; - case 'a': c = '\a'; break; - default: c = *p; break; - } - current = sdscatlen(current, &c, 1); - } else if (*p == '"') { - /* closing quote must be followed by a space or - * nothing at all. */ - if (*(p + 1) && !isspace(*(p + 1))) goto err; - done = 1; - } else if (!*p) { - /* unterminated quotes */ - goto err; - } else { - current = sdscatlen(current, p, 1); - } - } else if (insq) { - if (*p == '\\' && *(p + 1) == '\'') { - p++; - current = sdscatlen(current, "'", 1); - } else if (*p == '\'') { - /* closing quote must be followed by a space or - * nothing at all. */ - if (*(p + 1) && !isspace(*(p + 1))) goto err; - done = 1; - } else if (!*p) { - /* unterminated quotes */ - goto err; - } else { - current = sdscatlen(current, p, 1); - } - } else { - switch (*p) { - case ' ': - case '\n': - case '\r': - case '\t': - case '\0': done = 1; break; - case '"': inq = 1; break; - case '\'': insq = 1; break; - default: current = sdscatlen(current, p, 1); break; - } - } - if (*p) p++; - } + if (!(*p)) break; + unsigned int len = 0; + if (sdsparsearg(p, &len, NULL)) { + sds current = sdsnewlen(SDS_NOINIT, len); + int parsedlen = sdsparsearg(p, NULL, current); + assert(parsedlen > 0); + p += parsedlen; + /* add the token to the vector */ vector = s_realloc(vector, ((*argc) + 1) * sizeof(char *)); vector[*argc] = current; (*argc)++; current = NULL; } else { - /* Even on empty input string return something not NULL. */ - if (vector == NULL) vector = s_malloc(sizeof(void *)); - return vector; + while ((*argc)--) sdsfree(vector[*argc]); + s_free(vector); + *argc = 0; + return NULL; } } - -err: - while ((*argc)--) sdsfree(vector[*argc]); - s_free(vector); - if (current) sdsfree(current); - *argc = 0; - return NULL; + /* Even on empty input string return something not NULL. */ + if (vector == NULL) vector = s_malloc(sizeof(void *)); + return vector; } /* Modify the string substituting all the occurrences of the set of diff --git a/src/server.c b/src/server.c index ee401509fc..413d2b7170 100644 --- a/src/server.c +++ b/src/server.c @@ -109,11 +109,69 @@ const char *replstateToString(int replstate); * function of the server may be called from other threads. */ void nolocks_localtime(struct tm *tmp, time_t t, time_t tz, int dst); +/* Formats the timezone offset into a string. daylight_active indicates whether dst is active (1) + * or not (0). */ +void formatTimezone(char *buf, size_t buflen, int timezone, int daylight_active) { + serverAssert(buflen >= 7); + serverAssert(timezone >= -50400 && timezone <= 43200); + // Adjust the timezone for daylight saving, if active + int total_offset = (-1) * timezone + 3600 * daylight_active; + int hours = abs(total_offset / 3600); + int minutes = abs(total_offset % 3600) / 60; + buf[0] = total_offset >= 0 ? '+' : '-'; + buf[1] = '0' + hours / 10; + buf[2] = '0' + hours % 10; + buf[3] = ':'; + buf[4] = '0' + minutes / 10; + buf[5] = '0' + minutes % 10; + buf[6] = '\0'; +} + +bool hasInvalidLogfmtChar(const char *msg) { + if (msg == NULL) return false; + + for (int i = 0; msg[i] != '\0'; i++) { + if (msg[i] == '"' || msg[i] == '\n' || msg[i] == '\r') { + return true; + } + } + return false; +} + +/* Modifies the input string by: + * replacing \r and \n with whitespace + * replacing " with ' + * + * Parameters: + * safemsg - A char pointer where the modified message will be stored + * safemsglen - size of safemsg + * msg - The original message */ +void filterInvalidLogfmtChar(char *safemsg, size_t safemsglen, const char *msg) { + serverAssert(safemsglen == LOG_MAX_LEN); + if (msg == NULL) return; + + size_t index = 0; + while (index < safemsglen - 1 && msg[index] != '\0') { + if (msg[index] == '"') { + safemsg[index] = '\''; + } else if (msg[index] == '\n' || msg[index] == '\r') { + safemsg[index] = ' '; + } else { + safemsg[index] = msg[index]; + } + index++; + } + safemsg[index] = '\0'; +} + /* Low level logging. To use only for very big messages, otherwise * serverLog() is to prefer. */ void serverLogRaw(int level, const char *msg) { const int syslogLevelMap[] = {LOG_DEBUG, LOG_INFO, LOG_NOTICE, LOG_WARNING}; const char *c = ".-*#"; + const char *verbose_level[] = {"debug", "info", "notice", "warning"}; + const char *roles[] = {"sentinel", "RDB/AOF", "replica", "primary"}; + const char *role_chars = "XCSM"; FILE *fp; char buf[64]; int rawmode = (level & LL_RAW); @@ -133,23 +191,54 @@ void serverLogRaw(int level, const char *msg) { } else { int off; struct timeval tv; - int role_char; pid_t pid = getpid(); int daylight_active = atomic_load_explicit(&server.daylight_active, memory_order_relaxed); gettimeofday(&tv, NULL); struct tm tm; nolocks_localtime(&tm, tv.tv_sec, server.timezone, daylight_active); - off = strftime(buf, sizeof(buf), "%d %b %Y %H:%M:%S.", &tm); - snprintf(buf + off, sizeof(buf) - off, "%03d", (int)tv.tv_usec / 1000); + switch (server.log_timestamp_format) { + case LOG_TIMESTAMP_LEGACY: + off = strftime(buf, sizeof(buf), "%d %b %Y %H:%M:%S.", &tm); + snprintf(buf + off, sizeof(buf) - off, "%03d", (int)tv.tv_usec / 1000); + break; + + case LOG_TIMESTAMP_ISO8601: + off = strftime(buf, sizeof(buf), "%Y-%m-%dT%H:%M:%S.", &tm); + char tzbuf[7]; + formatTimezone(tzbuf, sizeof(tzbuf), server.timezone, server.daylight_active); + snprintf(buf + off, sizeof(buf) - off, "%03d%s", (int)tv.tv_usec / 1000, tzbuf); + break; + + case LOG_TIMESTAMP_MILLISECONDS: + snprintf(buf, sizeof(buf), "%lld", (long long)tv.tv_sec * 1000 + (long long)tv.tv_usec / 1000); + break; + } + int role_index; if (server.sentinel_mode) { - role_char = 'X'; /* Sentinel. */ + role_index = 0; /* Sentinel. */ } else if (pid != server.pid) { - role_char = 'C'; /* RDB / AOF writing child. */ + role_index = 1; /* RDB / AOF writing child. */ } else { - role_char = (server.primary_replication_link ? 'S' : 'M'); /* replica or Primary. */ + role_index = (server.primary_replication_link ? 2 : 3); /* replica or Primary. */ + } + switch (server.log_format) { + case LOG_FORMAT_LOGFMT: + if (hasInvalidLogfmtChar(msg)) { + char safemsg[LOG_MAX_LEN]; + filterInvalidLogfmtChar(safemsg, LOG_MAX_LEN, msg); + fprintf(fp, "pid=%d role=%s timestamp=\"%s\" level=%s message=\"%s\"\n", (int)getpid(), roles[role_index], + buf, verbose_level[level], safemsg); + } else { + fprintf(fp, "pid=%d role=%s timestamp=\"%s\" level=%s message=\"%s\"\n", (int)getpid(), roles[role_index], + buf, verbose_level[level], msg); + } + break; + + case LOG_FORMAT_LEGACY: + fprintf(fp, "%d:%c %s %c %s\n", (int)getpid(), role_chars[role_index], buf, c[level], msg); + break; } - fprintf(fp, "%d:%c %s %c %s\n", (int)getpid(), role_char, buf, c[level], msg); } fflush(fp); @@ -3818,12 +3907,6 @@ int processCommand(client *c) { reqresAppendRequest(c); } - /* Handle possible security attacks. */ - if (!strcasecmp(c->argv[0]->ptr, "host:") || !strcasecmp(c->argv[0]->ptr, "post")) { - securityWarningCommand(c); - return C_ERR; - } - /* If we're inside a module blocked context yielding that wants to avoid * processing clients, postpone the command. */ if (server.busy_module_yield_flags != BUSY_MODULE_YIELD_NONE && @@ -3838,6 +3921,13 @@ int processCommand(client *c) { * we do not have to repeat the same checks */ if (!client_reprocessing_command) { struct serverCommand *cmd = c->io_parsed_cmd ? c->io_parsed_cmd : lookupCommand(c->argv, c->argc); + if (!cmd) { + /* Handle possible security attacks. */ + if (!strcasecmp(c->argv[0]->ptr, "host:") || !strcasecmp(c->argv[0]->ptr, "post")) { + securityWarningCommand(c); + return C_ERR; + } + } c->cmd = c->lastcmd = c->realcmd = cmd; sds err; if (!commandCheckExistence(c, &err)) { @@ -6711,7 +6801,8 @@ serverTestProc *getTestProcByName(const char *name) { } #endif -int main(int argc, char **argv) { +/* Main is marked as weak so that unit tests can use their own main function. */ +__attribute__((weak)) int main(int argc, char **argv) { struct timeval tv; int j; char config_from_stdin = 0; diff --git a/src/server.h b/src/server.h index b0520093fd..bda62acdd7 100644 --- a/src/server.h +++ b/src/server.h @@ -568,6 +568,15 @@ typedef enum { #define PAUSE_ACTION_EVICT (1 << 3) #define PAUSE_ACTION_REPLICA (1 << 4) /* pause replica traffic */ +/* Sets log format */ +typedef enum { LOG_FORMAT_LEGACY = 0, + LOG_FORMAT_LOGFMT } log_format_type; + +/* Sets log timestamp format */ +typedef enum { LOG_TIMESTAMP_LEGACY = 0, + LOG_TIMESTAMP_ISO8601, + LOG_TIMESTAMP_MILLISECONDS } log_timestamp_type; + /* common sets of actions to pause/unpause */ #define PAUSE_ACTIONS_CLIENT_WRITE_SET \ (PAUSE_ACTION_CLIENT_WRITE | PAUSE_ACTION_EXPIRE | PAUSE_ACTION_EVICT | PAUSE_ACTION_REPLICA) @@ -2015,17 +2024,19 @@ struct valkeyServer { serverOpArray also_propagate; /* Additional command to propagate. */ int replication_allowed; /* Are we allowed to replicate? */ /* Logging */ - char *logfile; /* Path of log file */ - int syslog_enabled; /* Is syslog enabled? */ - char *syslog_ident; /* Syslog ident */ - int syslog_facility; /* Syslog facility */ - int crashlog_enabled; /* Enable signal handler for crashlog. - * disable for clean core dumps. */ - int crashed; /* True if the server has crashed, used in catClientInfoString - * to indicate that no wait for IO threads is needed. */ - int memcheck_enabled; /* Enable memory check on crash. */ - int use_exit_on_panic; /* Use exit() on panic and assert rather than - * abort(). useful for Valgrind. */ + char *logfile; /* Path of log file */ + int syslog_enabled; /* Is syslog enabled? */ + char *syslog_ident; /* Syslog ident */ + int syslog_facility; /* Syslog facility */ + int crashlog_enabled; /* Enable signal handler for crashlog. + * disable for clean core dumps. */ + int crashed; /* True if the server has crashed, used in catClientInfoString + * to indicate that no wait for IO threads is needed. */ + int memcheck_enabled; /* Enable memory check on crash. */ + int use_exit_on_panic; /* Use exit() on panic and assert rather than + * abort(). useful for Valgrind. */ + int log_format; /* Print log in specific format */ + int log_timestamp_format; /* Timestamp format in log */ /* Shutdown */ int shutdown_timeout; /* Graceful shutdown time limit in seconds. */ int shutdown_on_sigint; /* Shutdown flags configured for SIGINT. */ diff --git a/src/sort.c b/src/sort.c index f027b0c321..92777b068c 100644 --- a/src/sort.c +++ b/src/sort.c @@ -43,6 +43,11 @@ serverSortOperation *createSortOperation(int type, robj *pattern) { return so; } +/* Return 1 if pattern is the special pattern '#'. */ +static int isReturnSubstPattern(sds pattern) { + return pattern[0] == '#' && pattern[1] == '\0'; +} + /* Return the value associated to the key with a name obtained using * the following rules: * @@ -68,7 +73,7 @@ robj *lookupKeyByPattern(serverDb *db, robj *pattern, robj *subst) { /* If the pattern is "#" return the substitution object itself in order * to implement the "SORT ... GET #" feature. */ spat = pattern->ptr; - if (spat[0] == '#' && spat[1] == '\0') { + if (isReturnSubstPattern(spat)) { incrRefCount(subst); return subst; } @@ -258,6 +263,7 @@ void sortCommandGeneric(client *c, int readonly) { * unless we can make sure the keys formed by the pattern are in the same slot * as the key to sort. */ if (server.cluster_enabled && + !isReturnSubstPattern(c->argv[j + 1]->ptr) && patternHashSlot(c->argv[j + 1]->ptr, sdslen(c->argv[j + 1]->ptr)) != getKeySlot(c->argv[1]->ptr)) { addReplyError(c, "GET option of SORT denied in Cluster mode when " "keys formed by the pattern may be in different slots."); diff --git a/src/unit/test_files.h b/src/unit/test_files.h index cd2e0c5b92..c2b062039a 100644 --- a/src/unit/test_files.h +++ b/src/unit/test_files.h @@ -99,6 +99,7 @@ int test_raxFuzz(int argc, char **argv, int flags); int test_sds(int argc, char **argv, int flags); int test_typesAndAllocSize(int argc, char **argv, int flags); int test_sdsHeaderSizes(int argc, char **argv, int flags); +int test_sdssplitargs(int argc, char **argv, int flags); int test_sha1(int argc, char **argv, int flags); int test_string2ll(int argc, char **argv, int flags); int test_string2l(int argc, char **argv, int flags); @@ -157,7 +158,7 @@ unitTest __test_intset_c[] = {{"test_intsetValueEncodings", test_intsetValueEnco unitTest __test_kvstore_c[] = {{"test_kvstoreAdd16Keys", test_kvstoreAdd16Keys}, {"test_kvstoreIteratorRemoveAllKeysNoDeleteEmptyDict", test_kvstoreIteratorRemoveAllKeysNoDeleteEmptyDict}, {"test_kvstoreIteratorRemoveAllKeysDeleteEmptyDict", test_kvstoreIteratorRemoveAllKeysDeleteEmptyDict}, {"test_kvstoreDictIteratorRemoveAllKeysNoDeleteEmptyDict", test_kvstoreDictIteratorRemoveAllKeysNoDeleteEmptyDict}, {"test_kvstoreDictIteratorRemoveAllKeysDeleteEmptyDict", test_kvstoreDictIteratorRemoveAllKeysDeleteEmptyDict}, {NULL, NULL}}; unitTest __test_listpack_c[] = {{"test_listpackCreateIntList", test_listpackCreateIntList}, {"test_listpackCreateList", test_listpackCreateList}, {"test_listpackLpPrepend", test_listpackLpPrepend}, {"test_listpackLpPrependInteger", test_listpackLpPrependInteger}, {"test_listpackGetELementAtIndex", test_listpackGetELementAtIndex}, {"test_listpackPop", test_listpackPop}, {"test_listpackGetELementAtIndex2", test_listpackGetELementAtIndex2}, {"test_listpackIterate0toEnd", test_listpackIterate0toEnd}, {"test_listpackIterate1toEnd", test_listpackIterate1toEnd}, {"test_listpackIterate2toEnd", test_listpackIterate2toEnd}, {"test_listpackIterateBackToFront", test_listpackIterateBackToFront}, {"test_listpackIterateBackToFrontWithDelete", test_listpackIterateBackToFrontWithDelete}, {"test_listpackDeleteWhenNumIsMinusOne", test_listpackDeleteWhenNumIsMinusOne}, {"test_listpackDeleteWithNegativeIndex", test_listpackDeleteWithNegativeIndex}, {"test_listpackDeleteInclusiveRange0_0", test_listpackDeleteInclusiveRange0_0}, {"test_listpackDeleteInclusiveRange0_1", test_listpackDeleteInclusiveRange0_1}, {"test_listpackDeleteInclusiveRange1_2", test_listpackDeleteInclusiveRange1_2}, {"test_listpackDeleteWitStartIndexOutOfRange", test_listpackDeleteWitStartIndexOutOfRange}, {"test_listpackDeleteWitNumOverflow", test_listpackDeleteWitNumOverflow}, {"test_listpackBatchDelete", test_listpackBatchDelete}, {"test_listpackDeleteFooWhileIterating", test_listpackDeleteFooWhileIterating}, {"test_listpackReplaceWithSameSize", test_listpackReplaceWithSameSize}, {"test_listpackReplaceWithDifferentSize", test_listpackReplaceWithDifferentSize}, {"test_listpackRegressionGt255Bytes", test_listpackRegressionGt255Bytes}, {"test_listpackCreateLongListAndCheckIndices", test_listpackCreateLongListAndCheckIndices}, {"test_listpackCompareStrsWithLpEntries", test_listpackCompareStrsWithLpEntries}, {"test_listpackLpMergeEmptyLps", test_listpackLpMergeEmptyLps}, {"test_listpackLpMergeLp1Larger", test_listpackLpMergeLp1Larger}, {"test_listpackLpMergeLp2Larger", test_listpackLpMergeLp2Larger}, {"test_listpackLpNextRandom", test_listpackLpNextRandom}, {"test_listpackLpNextRandomCC", test_listpackLpNextRandomCC}, {"test_listpackRandomPairWithOneElement", test_listpackRandomPairWithOneElement}, {"test_listpackRandomPairWithManyElements", test_listpackRandomPairWithManyElements}, {"test_listpackRandomPairsWithOneElement", test_listpackRandomPairsWithOneElement}, {"test_listpackRandomPairsWithManyElements", test_listpackRandomPairsWithManyElements}, {"test_listpackRandomPairsUniqueWithOneElement", test_listpackRandomPairsUniqueWithOneElement}, {"test_listpackRandomPairsUniqueWithManyElements", test_listpackRandomPairsUniqueWithManyElements}, {"test_listpackPushVariousEncodings", test_listpackPushVariousEncodings}, {"test_listpackLpFind", test_listpackLpFind}, {"test_listpackLpValidateIntegrity", test_listpackLpValidateIntegrity}, {"test_listpackNumberOfElementsExceedsLP_HDR_NUMELE_UNKNOWN", test_listpackNumberOfElementsExceedsLP_HDR_NUMELE_UNKNOWN}, {"test_listpackStressWithRandom", test_listpackStressWithRandom}, {"test_listpackSTressWithVariableSize", test_listpackSTressWithVariableSize}, {"test_listpackBenchmarkInit", test_listpackBenchmarkInit}, {"test_listpackBenchmarkLpAppend", test_listpackBenchmarkLpAppend}, {"test_listpackBenchmarkLpFindString", test_listpackBenchmarkLpFindString}, {"test_listpackBenchmarkLpFindNumber", test_listpackBenchmarkLpFindNumber}, {"test_listpackBenchmarkLpSeek", test_listpackBenchmarkLpSeek}, {"test_listpackBenchmarkLpValidateIntegrity", test_listpackBenchmarkLpValidateIntegrity}, {"test_listpackBenchmarkLpCompareWithString", test_listpackBenchmarkLpCompareWithString}, {"test_listpackBenchmarkLpCompareWithNumber", test_listpackBenchmarkLpCompareWithNumber}, {"test_listpackBenchmarkFree", test_listpackBenchmarkFree}, {NULL, NULL}}; unitTest __test_rax_c[] = {{"test_raxRandomWalk", test_raxRandomWalk}, {"test_raxIteratorUnitTests", test_raxIteratorUnitTests}, {"test_raxTryInsertUnitTests", test_raxTryInsertUnitTests}, {"test_raxRegressionTest1", test_raxRegressionTest1}, {"test_raxRegressionTest2", test_raxRegressionTest2}, {"test_raxRegressionTest3", test_raxRegressionTest3}, {"test_raxRegressionTest4", test_raxRegressionTest4}, {"test_raxRegressionTest5", test_raxRegressionTest5}, {"test_raxRegressionTest6", test_raxRegressionTest6}, {"test_raxBenchmark", test_raxBenchmark}, {"test_raxHugeKey", test_raxHugeKey}, {"test_raxFuzz", test_raxFuzz}, {NULL, NULL}}; -unitTest __test_sds_c[] = {{"test_sds", test_sds}, {"test_typesAndAllocSize", test_typesAndAllocSize}, {"test_sdsHeaderSizes", test_sdsHeaderSizes}, {NULL, NULL}}; +unitTest __test_sds_c[] = {{"test_sds", test_sds}, {"test_typesAndAllocSize", test_typesAndAllocSize}, {"test_sdsHeaderSizes", test_sdsHeaderSizes}, {"test_sdssplitargs", test_sdssplitargs}, {NULL, NULL}}; unitTest __test_sha1_c[] = {{"test_sha1", test_sha1}, {NULL, NULL}}; unitTest __test_util_c[] = {{"test_string2ll", test_string2ll}, {"test_string2l", test_string2l}, {"test_ll2string", test_ll2string}, {"test_ld2string", test_ld2string}, {"test_fixedpoint_d2string", test_fixedpoint_d2string}, {"test_version2num", test_version2num}, {"test_reclaimFilePageCache", test_reclaimFilePageCache}, {NULL, NULL}}; unitTest __test_ziplist_c[] = {{"test_ziplistCreateIntList", test_ziplistCreateIntList}, {"test_ziplistPop", test_ziplistPop}, {"test_ziplistGetElementAtIndex3", test_ziplistGetElementAtIndex3}, {"test_ziplistGetElementOutOfRange", test_ziplistGetElementOutOfRange}, {"test_ziplistGetLastElement", test_ziplistGetLastElement}, {"test_ziplistGetFirstElement", test_ziplistGetFirstElement}, {"test_ziplistGetElementOutOfRangeReverse", test_ziplistGetElementOutOfRangeReverse}, {"test_ziplistIterateThroughFullList", test_ziplistIterateThroughFullList}, {"test_ziplistIterateThroughListFrom1ToEnd", test_ziplistIterateThroughListFrom1ToEnd}, {"test_ziplistIterateThroughListFrom2ToEnd", test_ziplistIterateThroughListFrom2ToEnd}, {"test_ziplistIterateThroughStartOutOfRange", test_ziplistIterateThroughStartOutOfRange}, {"test_ziplistIterateBackToFront", test_ziplistIterateBackToFront}, {"test_ziplistIterateBackToFrontDeletingAllItems", test_ziplistIterateBackToFrontDeletingAllItems}, {"test_ziplistDeleteInclusiveRange0To0", test_ziplistDeleteInclusiveRange0To0}, {"test_ziplistDeleteInclusiveRange0To1", test_ziplistDeleteInclusiveRange0To1}, {"test_ziplistDeleteInclusiveRange1To2", test_ziplistDeleteInclusiveRange1To2}, {"test_ziplistDeleteWithStartIndexOutOfRange", test_ziplistDeleteWithStartIndexOutOfRange}, {"test_ziplistDeleteWithNumOverflow", test_ziplistDeleteWithNumOverflow}, {"test_ziplistDeleteFooWhileIterating", test_ziplistDeleteFooWhileIterating}, {"test_ziplistReplaceWithSameSize", test_ziplistReplaceWithSameSize}, {"test_ziplistReplaceWithDifferentSize", test_ziplistReplaceWithDifferentSize}, {"test_ziplistRegressionTestForOver255ByteStrings", test_ziplistRegressionTestForOver255ByteStrings}, {"test_ziplistRegressionTestDeleteNextToLastEntries", test_ziplistRegressionTestDeleteNextToLastEntries}, {"test_ziplistCreateLongListAndCheckIndices", test_ziplistCreateLongListAndCheckIndices}, {"test_ziplistCompareStringWithZiplistEntries", test_ziplistCompareStringWithZiplistEntries}, {"test_ziplistMergeTest", test_ziplistMergeTest}, {"test_ziplistStressWithRandomPayloadsOfDifferentEncoding", test_ziplistStressWithRandomPayloadsOfDifferentEncoding}, {"test_ziplistCascadeUpdateEdgeCases", test_ziplistCascadeUpdateEdgeCases}, {"test_ziplistInsertEdgeCase", test_ziplistInsertEdgeCase}, {"test_ziplistStressWithVariableSize", test_ziplistStressWithVariableSize}, {"test_BenchmarkziplistFind", test_BenchmarkziplistFind}, {"test_BenchmarkziplistIndex", test_BenchmarkziplistIndex}, {"test_BenchmarkziplistValidateIntegrity", test_BenchmarkziplistValidateIntegrity}, {"test_BenchmarkziplistCompareWithString", test_BenchmarkziplistCompareWithString}, {"test_BenchmarkziplistCompareWithNumber", test_BenchmarkziplistCompareWithNumber}, {"test_ziplistStress__ziplistCascadeUpdate", test_ziplistStress__ziplistCascadeUpdate}, {NULL, NULL}}; diff --git a/src/unit/test_sds.c b/src/unit/test_sds.c index 19b5c7d73f..b97d0d9d32 100644 --- a/src/unit/test_sds.c +++ b/src/unit/test_sds.c @@ -328,3 +328,44 @@ int test_sdsHeaderSizes(int argc, char **argv, int flags) { return 0; } + +int test_sdssplitargs(int argc, char **argv, int flags) { + UNUSED(argc); + UNUSED(argv); + UNUSED(flags); + + int len; + sds *sargv; + + sargv = sdssplitargs("Testing one two three", &len); + TEST_ASSERT(4 == len); + TEST_ASSERT(!strcmp("Testing", sargv[0])); + TEST_ASSERT(!strcmp("one", sargv[1])); + TEST_ASSERT(!strcmp("two", sargv[2])); + TEST_ASSERT(!strcmp("three", sargv[3])); + sdsfreesplitres(sargv, len); + + sargv = sdssplitargs("", &len); + TEST_ASSERT(0 == len); + TEST_ASSERT(sargv != NULL); + sdsfreesplitres(sargv, len); + + sargv = sdssplitargs("\"Testing split strings\" \'Another split string\'", &len); + TEST_ASSERT(2 == len); + TEST_ASSERT(!strcmp("Testing split strings", sargv[0])); + TEST_ASSERT(!strcmp("Another split string", sargv[1])); + sdsfreesplitres(sargv, len); + + sargv = sdssplitargs("\"Hello\" ", &len); + TEST_ASSERT(1 == len); + TEST_ASSERT(!strcmp("Hello", sargv[0])); + sdsfreesplitres(sargv, len); + + char *binary_string = "\"\\x73\\x75\\x70\\x65\\x72\\x20\\x00\\x73\\x65\\x63\\x72\\x65\\x74\\x20\\x70\\x61\\x73\\x73\\x77\\x6f\\x72\\x64\""; + sargv = sdssplitargs(binary_string, &len); + TEST_ASSERT(1 == len); + TEST_ASSERT(22 == sdslen(sargv[0])); + sdsfreesplitres(sargv, len); + + return 0; +} diff --git a/src/ziplist.c b/src/ziplist.c index d4f8b71699..608487fa2b 100644 --- a/src/ziplist.c +++ b/src/ziplist.c @@ -1,4 +1,4 @@ -/* The ziplist is a specially encoded dually linked list that is designed +/* The ziplist is a specially encoded doubly linked list that is designed * to be very memory efficient. It stores both strings and integer values, * where integers are encoded as actual integers instead of a series of * characters. It allows push and pop operations on either side of the list diff --git a/tests/cluster/tests/28-cluster-shards.tcl b/tests/cluster/tests/28-cluster-shards.tcl index d6534c816b..5fb6743246 100644 --- a/tests/cluster/tests/28-cluster-shards.tcl +++ b/tests/cluster/tests/28-cluster-shards.tcl @@ -117,7 +117,7 @@ test "Kill a node and tell the replica to immediately takeover" { # Primary 0 node should report as fail, wait until the new primary acknowledges it. test "Verify health as fail for killed node" { - wait_for_condition 50 100 { + wait_for_condition 1000 50 { "fail" eq [dict get [get_node_info_from_shard $node_0_id 4 "node"] "health"] } else { fail "New primary never detected the node failed" diff --git a/tests/integration/rdb.tcl b/tests/integration/rdb.tcl index e3f92bf521..61cb0cea7e 100644 --- a/tests/integration/rdb.tcl +++ b/tests/integration/rdb.tcl @@ -170,6 +170,64 @@ start_server {} { } assert_equal [s rdb_changes_since_last_save] 0 } + + test {bgsave cancel aborts save} { + r config set save "" + # Generating RDB will take some 100 seconds + r config set rdb-key-save-delay 1000000 + populate 100 "" 16 + + r bgsave + wait_for_condition 50 100 { + [s rdb_bgsave_in_progress] == 1 + } else { + fail "bgsave did not start in time" + } + set fork_child_pid [get_child_pid 0] + + assert {[r bgsave cancel] eq {Background saving cancelled}} + set temp_rdb [file join [lindex [r config get dir] 1] temp-${fork_child_pid}.rdb] + # Temp rdb must be deleted + wait_for_condition 50 100 { + ![file exists $temp_rdb] + } else { + fail "bgsave temp file was not deleted after cancel" + } + + # Make sure no save is running and that bgsave return an error + wait_for_condition 50 100 { + [s rdb_bgsave_in_progress] == 0 + } else { + fail "bgsave is currently running" + } + assert_error "ERR Background saving is currently not in progress or scheduled" {r bgsave cancel} + } + + test {bgsave cancel schedulled request} { + r config set save "" + # Generating RDB will take some 100 seconds + r config set rdb-key-save-delay 1000000 + populate 100 "" 16 + + # start a long AOF child + r bgrewriteaof + wait_for_condition 50 100 { + [s aof_rewrite_in_progress] == 1 + } else { + fail "aof not started" + } + + # Make sure cancel return valid status + assert {[r bgsave schedule] eq {Background saving scheduled}} + + # Cancel the scheduled save + assert {[r bgsave cancel] eq {Scheduled background saving cancelled}} + + # Make sure a second call to bgsave cancel return an error + assert_error "ERR Background saving is currently not in progress or scheduled" {r bgsave cancel} + } + + } test {client freed during loading} { diff --git a/tests/unit/cluster/cluster-shards.tcl b/tests/unit/cluster/cluster-shards.tcl index 19acd186f5..170114d822 100644 --- a/tests/unit/cluster/cluster-shards.tcl +++ b/tests/unit/cluster/cluster-shards.tcl @@ -42,7 +42,7 @@ start_cluster 3 3 {tags {external:skip cluster}} { } test "Verify health as fail for killed node" { - wait_for_condition 50 100 { + wait_for_condition 1000 50 { "fail" eq [dict get [get_node_info_from_shard $node_0_id $validation_node "node"] "health"] } else { fail "New primary never detected the node failed" diff --git a/tests/unit/sort.tcl b/tests/unit/sort.tcl index 397e7e12ea..cd171ee51e 100644 --- a/tests/unit/sort.tcl +++ b/tests/unit/sort.tcl @@ -384,24 +384,28 @@ start_cluster 1 0 {tags {"external:skip cluster sort"}} { test "sort by in cluster mode" { catch {r sort "{a}mylist" by by*} e assert_match {ERR BY option of SORT denied in Cluster mode when *} $e - r sort "{a}mylist" by "{a}by*" - } {3 1 2} + assert_equal {3 1 2} [r sort "{a}mylist" by "{a}by*"] + assert_equal {3 1 2} [r sort "{a}mylist" by "{a}by*" get #] + } test "sort get in cluster mode" { catch {r sort "{a}mylist" by "{a}by*" get get*} e assert_match {ERR GET option of SORT denied in Cluster mode when *} $e - r sort "{a}mylist" by "{a}by*" get "{a}get*" - } {30 200 100} + assert_equal {30 200 100} [r sort "{a}mylist" by "{a}by*" get "{a}get*"] + assert_equal {30 3 200 1 100 2} [r sort "{a}mylist" by "{a}by*" get "{a}get*" get #] + } test "sort_ro by in cluster mode" { catch {r sort_ro "{a}mylist" by by*} e assert_match {ERR BY option of SORT denied in Cluster mode when *} $e - r sort_ro "{a}mylist" by "{a}by*" - } {3 1 2} + assert_equal {3 1 2} [r sort_ro "{a}mylist" by "{a}by*"] + assert_equal {3 1 2} [r sort_ro "{a}mylist" by "{a}by*" get #] + } test "sort_ro get in cluster mode" { catch {r sort_ro "{a}mylist" by "{a}by*" get get*} e assert_match {ERR GET option of SORT denied in Cluster mode when *} $e - r sort_ro "{a}mylist" by "{a}by*" get "{a}get*" - } {30 200 100} + assert_equal {30 200 100} [r sort_ro "{a}mylist" by "{a}by*" get "{a}get*"] + assert_equal {30 3 200 1 100 2} [r sort_ro "{a}mylist" by "{a}by*" get "{a}get*" get #] + } } diff --git a/valkey.conf b/valkey.conf index f485b42b1a..7c7b9da43e 100644 --- a/valkey.conf +++ b/valkey.conf @@ -348,6 +348,23 @@ pidfile /var/run/valkey_6379.pid # nothing (nothing is logged) loglevel notice +# Specify the logging format. +# This can be one of: +# +# - legacy: the default, traditional log format +# - logfmt: a structured log format; see https://www.brandur.org/logfmt +# +# log-format legacy + +# Specify the timestamp format used in logs using 'log-timestamp-format'. +# +# - legacy: default format +# - iso8601: ISO 8601 extended date and time with time zone, on the form +# yyyy-mm-ddThh:mm:ss.sss±hh:mm +# - milliseconds: milliseconds since the epoch +# +# log-timestamp-format legacy + # Specify the log file name. Also the empty string can be used to force # the server to log on the standard output. Note that if you use standard # output for logging but daemonize, logs will be sent to /dev/null @@ -1308,7 +1325,11 @@ lazyfree-lazy-user-del yes # deletion, which can be controlled by passing the [SYNC|ASYNC] flags into the # commands. When neither flag is passed, this directive will be used to determine # if the data should be deleted asynchronously. - +# +# When a replica performs a node reset via CLUSTER RESET, the entire +# database content is removed to allow the node to become an empty primary. +# This directive also determines whether the data should be deleted asynchronously. +# # There are many problems with running flush synchronously. Even in single CPU # environments, the thread managers should balance between the freeing and # serving incoming requests. The default value is yes. @@ -1576,7 +1597,7 @@ aof-timestamp-enabled no # Maximum time to wait for replicas when shutting down, in seconds. # # During shut down, a grace period allows any lagging replicas to catch up with -# the latest replication offset before the primary exists. This period can +# the latest replication offset before the primary exits. This period can # prevent data loss, especially for deployments without configured disk backups. # # The 'shutdown-timeout' value is the grace period's duration in seconds. It is