Skip to content

Commit

Permalink
Squashed commit of the following:
Browse files Browse the repository at this point in the history
commit 3c32ee1
Author: Madelyn Olson <[email protected]>
Date:   Mon Nov 4 12:36:20 2024 -0800

    Add a filter option to drop all cluster packets (valkey-io#1252)

    A minor debugging change that helped in the investigation of
    valkey-io#1251. Basically there are
    some edge cases where we want to fully isolate a note from receiving
    packets, but can't suspend the process because we need it to continue
    sending outbound traffic. So, added a filter for that.

    Signed-off-by: Madelyn Olson <[email protected]>

commit a102852
Author: Binbin <[email protected]>
Date:   Sat Nov 2 19:51:14 2024 +0800

    Fix timing issue in cluster-shards tests (valkey-io#1243)

    The cluster-node-timeout is 3000 in our tests, the timing test wasn't
    succeeding, so extending the wait_for made them much more reliable.

    Signed-off-by: Binbin <[email protected]>

commit 0d7b234
Author: Jim Brunner <[email protected]>
Date:   Fri Nov 1 15:16:18 2024 -0700

    correct type internal to kvstore (minor) (valkey-io#1246)

    All of the internal variables related to number of dicts in the kvstore
    are type `int`. Not sure why these 2 items were declared as `long long`.

    Signed-off-by: Jim Brunner <[email protected]>

commit e985ead
Author: zhenwei pi <[email protected]>
Date:   Fri Nov 1 20:28:09 2024 +0800

    RDMA: Prevent IO for child process (valkey-io#1244)

    RDMA MR (memory region) is not forkable, the VMA (virtual memory area)
    of a MR gets empty in a child process. Prevent IO for child process to
    avoid server crash.

    In the check for whether read and write is allowed in an RDMA
    connection, a check that if we're in a child process is added. If we
    are, the function returns an error, which will cause the RDMA client to
    be disconnected.

    Suggested-by: Viktor Söderqvist <[email protected]>
    Signed-off-by: zhenwei pi <[email protected]>

commit 1c222f7
Author: Madelyn Olson <[email protected]>
Date:   Thu Oct 31 11:37:53 2024 -0700

    Improve performance of sdssplitargs (valkey-io#1230)

    The current implementation of `sdssplitargs` does repeated `sdscatlen`
    to build the parsed arguments, which isn't very efficient because it
    does a lot of extra reallocations and moves through the sds code a lot.
    It also typically results in memory overhead, because `sdscatlen`
    over-allocates, which is usually not needed since args are usually not
    modified after being created.

    The new implementation of sdssplitargs does two passes, the first to
    parse the argument to figure out the final length and the second to
    actually copy the string. It's generally about 2x faster for larger
    strings (~100 bytes), and about 20% faster for small strings (~10
    bytes). This is generally faster since as long as everything is in the
    CPU cache, it's going to be fast.

    There are a couple of sanity tests, none existed before, as well as some
    fuzzying which was used to find some bugs and also to do the
    benchmarking. The original benchmarking code can be seen
    valkey-io@6576aeb.

    ```
    test_sdssplitargs_benchmark - unit/test_sds.c:530] Using random seed: 1729883235
    [test_sdssplitargs_benchmark - unit/test_sds.c:577] Improvement: 56.44%, new:13039us, old:29930us
    [test_sdssplitargs_benchmark - unit/test_sds.c:577] Improvement: 56.58%, new:12057us, old:27771us
    [test_sdssplitargs_benchmark - unit/test_sds.c:577] Improvement: 59.18%, new:9048us, old:22165us
    [test_sdssplitargs_benchmark - unit/test_sds.c:577] Improvement: 54.61%, new:12381us, old:27278us
    [test_sdssplitargs_benchmark - unit/test_sds.c:577] Improvement: 51.17%, new:16012us, old:32793us
    [test_sdssplitargs_benchmark - unit/test_sds.c:577] Improvement: 49.18%, new:16041us, old:31563us
    [test_sdssplitargs_benchmark - unit/test_sds.c:577] Improvement: 58.40%, new:12450us, old:29930us
    [test_sdssplitargs_benchmark - unit/test_sds.c:577] Improvement: 56.49%, new:13066us, old:30031us
    [test_sdssplitargs_benchmark - unit/test_sds.c:577] Improvement: 58.75%, new:12744us, old:30894us
    [test_sdssplitargs_benchmark - unit/test_sds.c:577] Improvement: 52.44%, new:16885us, old:35504us
    [test_sdssplitargs_benchmark - unit/test_sds.c:577] Improvement: 62.57%, new:8107us, old:21659us
    [test_sdssplitargs_benchmark - unit/test_sds.c:577] Improvement: 62.12%, new:8320us, old:21966us
    [test_sdssplitargs_benchmark - unit/test_sds.c:577] Improvement: 45.23%, new:13960us, old:25487us
    [test_sdssplitargs_benchmark - unit/test_sds.c:577] Improvement: 57.95%, new:9188us, old:21849us
    ```

    ---------

    Signed-off-by: Madelyn Olson <[email protected]>

commit 91cbf77
Author: Masahiro Ide <[email protected]>
Date:   Fri Nov 1 03:30:05 2024 +0900

    Eliminate snprintf usage at setDeferredAggregateLen (valkey-io#1234)

    to align with how we encode the length at `_addReplyLongLongWithPrefix`

    Signed-off-by: Masahiro Ide <[email protected]>
    Co-authored-by: Masahiro Ide <[email protected]>

commit ab98f37
Author: zhenwei pi <[email protected]>
Date:   Wed Oct 30 18:12:42 2024 +0800

    RDMA: Delete keepalive timer on closing (valkey-io#1237)

    Typically, RDMA connection gets closed by client side, the server side
    handles diconnected CM event, and delete keepalive timer correctly.
    However, the server side may close connection voluntarily, for example
    the maxium connections exceed. Handle this case to avoid invalid memory
    access.

    Signed-off-by: zhenwei pi <[email protected]>

commit 789a73b
Author: Binbin <[email protected]>
Date:   Wed Oct 30 10:25:50 2024 +0800

    Minor fix to debug logging in replicationFeedStreamFromPrimaryStream (valkey-io#1235)

    We should only print logs when hide-user-data-from-log is off.

    Signed-off-by: Binbin <[email protected]>

commit 13f5f66
Author: Shivshankar <[email protected]>
Date:   Tue Oct 29 19:19:56 2024 -0400

    Update the argument of clusterNodeGetReplica declaration (valkey-io#1239)

    clusterNodeGetReplica agrumnets are missed to migrate during the slave
    to replication migration so updated the argument slave to replica.

    Signed-off-by: Shivshankar-Reddy <[email protected]>

commit 5a4c064
Author: Madelyn Olson <[email protected]>
Date:   Tue Oct 29 14:26:17 2024 -0700

    Mark main and serverAssert as weak symbols to be overridden (valkey-io#1232)

    At some point unit tests stopped building on MacOS because of duplicate
    symbols. I had originally solved this problem by using a flag that
    overrides symbols, but the much better solution is to mark the duplicate
    symbols as weak and they can be overridden during linking. (Symbols by
    default are strong, strong symbols override weak symbols)

    I also added macos unit build to the CI, so that this doesn't silently
    break in the future again.

    ---------

    Signed-off-by: Madelyn Olson <[email protected]>
    Co-authored-by: Viktor Söderqvist <[email protected]>

commit 8ee7a58
Author: zixuan zhao <[email protected]>
Date:   Tue Oct 29 06:13:30 2024 -0400

    Document log format configs in valkey.conf (valkey-io#1233)

    Add config options for log format and timestamp format introduced by
    Related to valkey-io#1225

    This change adds two new configs into valkey.conf:
    log-format
    log-timestamp-format

    ---------

    Signed-off-by: azuredream <[email protected]>

commit c21f1dc
Author: Lipeng Zhu <[email protected]>
Date:   Mon Oct 28 13:43:23 2024 +0800

    Increase the IO_THREADS_MAX_NUM. (valkey-io#1220)

    This patch try to increase the max number of io-threads from 16(128) to
    256 for below reasons:

    1. The core number increases a lot in the modern server processors, for
    example, the [Sierra
    Forest](https://en.wikipedia.org/wiki/Sierra_Forest) processors are
    targeted towards with up to **288** cores.
    Due to limitation of **_io-threads_** number (16 and 128 ), benchmark
    like https://openbenchmarking.org/test/pts/valkey even cannot run on a
    high core count server.

    2. For some workloads, the bottleneck could be main thread, but for the
    other workloads, big key/value which caused heavy io, the bottleneck
    could be the io-threads, for example benchmark `memtier_benchmark -s
    127.0.0.1 -p 9001 "--data-size" "20000" --ratio 1:0 --key-pattern P:P
    --key-minimum=1 --key-maximum 1000000 --test-time 180 -c 50 -t 16
    --hide-histogram`. The QPS is still scalable after 16 io-threads.

    ![image](https://github.com/user-attachments/assets/e980f805-a162-44be-b03e-ab37a9c489cf)
    **Fig 1. QPS Scale factor with io-threads number grows.**

    Signed-off-by: Lipeng Zhu <[email protected]>
    Co-authored-by: Wangyang Guo <[email protected]>

commit 5d2ff85
Author: Binbin <[email protected]>
Date:   Sun Oct 27 15:23:00 2024 +0800

    Fix minor repldbfd leak in updateReplicasWaitingBgsave if fstat fails (valkey-io#1226)

    In the old code, if fstat fails, replica->repldbfd will hold the
    fd and we are doing a free client. And in freeClient, we check and
    close only if repl_state == REPLICA_STATE_SEND_BULK. So if fstat
    fails, we will leak the fd.

    We can also extend freeClient to handle REPLICA_STATE_WAIT_BGSAVE_END
    as well, but here seems to be a more friendly (and safer) way.

    Signed-off-by: Binbin <[email protected]>

commit 4be09e4
Author: Shivshankar <[email protected]>
Date:   Fri Oct 25 08:03:59 2024 -0400

    Fix typo in valkey.conf file's shutdown section (valkey-io#1224)

    Found typo "exists" ==> "exits" in valkey.conf in shutdown section.

    Signed-off-by: Shivshankar-Reddy <[email protected]>

commit 9c60fcd
Author: Lipeng Zhu <[email protected]>
Date:   Fri Oct 25 17:13:28 2024 +0800

    Do security attack check only when command not found to reduce the critical path (valkey-io#1212)

    When explored the cycles distribution for main thread with io-threads
    enabled. We found this security attack check takes significant time in
    main thread, **~3%** cycles were used to do the commands security check
    in main thread.

    This patch try to completely avoid doing it in the hot path. We can do
    it only after we looked up the command and it wasn't found, just before
    we call commandCheckExistence.

    ---------

    Signed-off-by: Lipeng Zhu <[email protected]>
    Co-authored-by: Wangyang Guo <[email protected]>

commit 55bbbe0
Author: zixuan zhao <[email protected]>
Date:   Thu Oct 24 18:36:32 2024 -0400

    Configurable log and timestamp formats (logfmt, ISO8601) (valkey-io#1022)

    Add ability to configure log output format and timestamp format in the
    logs.

    This change adds two new configs:

    * `log-format`: Either legacy or logfmt (See https://brandur.org/logfmt)
    * `log-timestamp-format`: legacy, iso8601 or milliseconds (since the
    eppch).

    Related to valkey-io#1006.

    Example:

    ```
    $ ./valkey-server  /home/zhaoz12/git/valkey/valkey/valkey.conf
    pid=109463 role=RDB/AOF timestamp="2024-09-10T20:37:25.738-04:00" level=warning message="WARNING Memory overcommit must be enabled! Without it, a background save or replication may fail under low memory condition. Being disabled, it can also cause failures without low memory condition, see jemalloc/jemalloc#1328. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect."
    pid=109463 role=RDB/AOF timestamp="2024-09-10T20:37:25.738-04:00" level=notice message="oO0OoO0OoO0Oo Valkey is starting oO0OoO0OoO0Oo"
    pid=109463 role=RDB/AOF timestamp="2024-09-10T20:37:25.738-04:00" level=notice message="Valkey version=255.255.255, bits=64, commit=affbea5d, modified=1, pid=109463, just started"
    pid=109463 role=RDB/AOF timestamp="2024-09-10T20:37:25.738-04:00" level=notice message="Configuration loaded"
    pid=109463 role=master timestamp="2024-09-10T20:37:25.738-04:00" level=notice message="monotonic clock: POSIX clock_gettime"
    pid=109463 role=master timestamp="2024-09-10T20:37:25.739-04:00" level=warning message="Failed to write PID file: Permission denied"
    ```

    ---------

    Signed-off-by: azuredream <[email protected]>

commit 2956367
Author: Binbin <[email protected]>
Date:   Thu Oct 24 21:53:05 2024 +0800

    Maintain return value of rdbSaveDb after writing slot-info aux (valkey-io#1222)

    All other places written in this function are maintained it,
    although the caller of rdbSaveDb does not reply on it, it is
    maintained to be consistent with other places, is its duty.

    Signed-off-by: Binbin <[email protected]>

commit a21fe71
Author: Binbin <[email protected]>
Date:   Thu Oct 24 16:38:47 2024 +0800

    Limit CLUSTER_CANT_FAILOVER_DATA_AGE log to 10 times period (valkey-io#1189)

    If a replica is step into data_age too old stage, it can not
    trigger the failover and currently it can not be automatically
    recovered and we will print a log every
    CLUSTER_CANT_FAILOVER_RELOG_PERIOD,
    which is every second. If the primary has not recovered or there is
    no manual failover, this log will flood the log file.

    In this case, limit its frequency to 10 times period, which is
    10 seconds in our code. Also in this data_age too old stage,
    the repeated logs also can stand for the progress of the failover.

    See also valkey-io#780 for more details about it.

    Signed-off-by: Binbin <[email protected]>
    Co-authored-by: Ping Xie <[email protected]>

commit c419524
Author: muelstefamzn <[email protected]>
Date:   Wed Oct 23 16:56:32 2024 -0700

    Trim free space from inline command argument strings to avoid excess memory usage (valkey-io#1213)

    The command argument strings created while parsing inline commands (see
    `processInlineBuffer()`) can contain free capacity. Since some commands
    ,such as `SET`, store these strings in the database, that free capacity
    increases the memory usage. In the worst case, it could double the
    memory usage.

    This only occurs if the inline command format is used. The argument
    strings are built by appending character by character in
    `sdssplitargs()`. Regular RESP commands are not affected.

    This change trims the strings within `processInlineBuffer()`.
    this?

    When the command argument string is packed into an object,
    `trimStringObjectIfNeeded()` is called.

    This does only trim the string if it is larger than
    `PROTO_MBULK_BIG_ARG` (32kB), as only strings larger than this would
    ever need trimming if the command it sent using the bulk string format.

    We could modify this condition, but that would potentially have a
    performance impact on commands using the bulk format. Since those make
    up for the vast majority of executed commands, limiting this change to
    inline commands seems prudent.

    * 1 million `SET [key] [value]` commands
    * Random keys (16 bytes)
    * 600 bytes values

    Memory usage without this change:

    ```
    used_memory:1089327888
    used_memory_human:1.01G
    used_memory_rss:1131696128
    used_memory_rss_human:1.05G
    used_memory_peak:1089348264
    used_memory_peak_human:1.01G
    used_memory_peak_perc:100.00%
    used_memory_overhead:49302800
    used_memory_startup:911808
    used_memory_dataset:1040025088
    used_memory_dataset_perc:95.55%
    ```

    Memory usage with this change:
    ```
    used_memory:705327888
    used_memory_human:672.65M
    used_memory_rss:718802944
    used_memory_rss_human:685.50M
    used_memory_peak:705348256
    used_memory_peak_human:672.67M
    used_memory_peak_perc:100.00%
    used_memory_overhead:49302800
    used_memory_startup:911808
    used_memory_dataset:656025088
    used_memory_dataset_perc:93.13%
    ```

    If the same experiment is repeated using the normal RESP array of bulk
    string format (`*3\r\n$3\r\nSET\r\n...`) then the memory usage is 672MB
    with and without of this change.

    If a replica is attached, its memory usage is 672MB with and without
    this change, since the replication link never uses inline commands.

    Signed-off-by: Stefan Mueller <[email protected]>

commit c176de4
Author: danish-mehmood <[email protected]>
Date:   Thu Oct 24 02:30:42 2024 +0500

    Clarify the wording from dually to the more common doubly (valkey-io#1214)

    Clarify documentation is ziplist.c

    Signed-off-by: danish-mehmood <[email protected]>

commit b803f7a
Author: Binbin <[email protected]>
Date:   Wed Oct 23 17:11:42 2024 +0800

    Cleaned up getSlotOrReply is return -1 instead of C_ERR (valkey-io#1211)

    Minor cleanup since getSlotOrReply return -1 on error, not return C_ERR.

    Signed-off-by: Binbin <[email protected]>

commit 5d70ccd
Author: Binbin <[email protected]>
Date:   Wed Oct 23 10:22:25 2024 +0800

    Make replica CLUSTER RESET flush async based on lazyfree-lazy-user-flush (valkey-io#1190)

    Currently, if the replica has a lot of data, CLUSTER RESET
    will block for a while and report the slowlog, and it seems
    that there is no harm in making it async so external components
    can be easier when monitoring it.

    Signed-off-by: Binbin <[email protected]>
    Co-authored-by: Ping Xie <[email protected]>

commit 285064b
Author: Shivshankar <[email protected]>
Date:   Mon Oct 21 22:54:40 2024 -0400

    fix typo  (valkey-io#1202)

    Signed-off-by: Shivshankar-Reddy <[email protected]>

commit 771918e
Author: Shivshankar <[email protected]>
Date:   Mon Oct 21 16:48:29 2024 -0400

    Updating command.def by running the generate-command-code.py  (valkey-io#1203)

    Part of valkey-io#1200 PR, since feild is
    changed. Looks like commands.def is missed to get genereated based on
    the changes so that is causing CI failure on unstable.

    Signed-off-by: Shivshankar-Reddy <[email protected]>

commit 5885dc5
Author: Viktor Söderqvist <[email protected]>
Date:   Mon Oct 21 16:04:47 2024 +0200

    Fix BGSAVE CANCEL since and history fields (valkey-io#1200)

    Fixes wrong "since" and "history" introduced in valkey-io#757.

    ---------

    Signed-off-by: Viktor Söderqvist <[email protected]>

commit 29b83f1
Author: ranshid <[email protected]>
Date:   Mon Oct 21 12:56:44 2024 +0300

    Introduce bgsave cancel (valkey-io#757)

    In some cases bgsave child process can run for a long time exhausting
    system resources. Although it is possible to kill the bgsave child
    process from the system shell, sometimes it is not possible allowing OS
    level access.

    This PR adds a new subcommand to the BGSAVE command.
    When user will issue `BGSAVE CANCEL`, it will do one of the 2:

    1. In case a bgsave child process is currently running, the child
       process would be immediately killed thus terminating any
       save/replication full sync process.
    2. In case a bgsave child process is SCHEDULED to run, the scheduled
       execution will be cancelled.

    ---------

    Signed-off-by: ranshid <[email protected]>
    Signed-off-by: ranshid <[email protected]>
    Signed-off-by: Ran Shidlansik <[email protected]>
    Signed-off-by: Binbin <[email protected]>
    Co-authored-by: Binbin <[email protected]>
    Co-authored-by: Viktor Söderqvist <[email protected]>

commit 71f8c34
Author: zhenwei pi <[email protected]>
Date:   Mon Oct 21 16:11:27 2024 +0800

    RDMA: Fix listener priv opaque pointer (valkey-io#1194)

    struct connListener.priv should be used by connection type specific
    data, static local listener data should not use this.

    A RDMA config structure is going to be introduced in the next step:

    ```
    typedef struct serverRdmaContextConfig {
        char *bindaddr;
        int bindaddr_count;
        int port;
        int rx_size;
        int comp_vector;
        ...
    } serverRdmaContextConfig;
    ```

    Then a builtin RDMA will be supported.

    Signed-off-by: zhenwei pi <[email protected]>

commit 2743b7e
Author: Binbin <[email protected]>
Date:   Sat Oct 19 14:56:10 2024 +0800

    Fix SORT GET to ignore special pattern # in cluster slot check (valkey-io#1182)

    This special pattern '#' is used to get the element itself,
    it does not actually participate in the slot check.

    In this case, passing `GET #` will cause '#' to participate
    in the slot check, causing the command to get an
    `pattern may be in different slots` error.

    Signed-off-by: Binbin <[email protected]>

Signed-off-by: Jacob Murphy <[email protected]>
  • Loading branch information
murphyjacob4 committed Nov 4, 2024
1 parent 421b8f1 commit 9b8248b
Show file tree
Hide file tree
Showing 29 changed files with 542 additions and 179 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ jobs:
steps:
- uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1
- name: make
run: make -j3 SERVER_CFLAGS='-Werror'
run: make -j3 all-with-unit-tests SERVER_CFLAGS='-Werror'

build-32bit:
runs-on: ubuntu-latest
Expand Down
11 changes: 1 addition & 10 deletions src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,6 @@ ifeq ($(USE_JEMALLOC),no)
MALLOC=libc
endif

# Some unit tests compile files a second time to get access to static functions, the "--allow-multiple-definition" flag
# allows us to do that without an error, by using the first instance of function. This behavior can also be used
# to tweak behavior of code just for unit tests. The version of ld on MacOS apparently always does this.
ifneq ($(uname_S),Darwin)
ALLOW_DUPLICATE_FLAG=-Wl,--allow-multiple-definition
else
ALLOW_DUPLICATE_FLAG=
endif

ifdef SANITIZER
ifeq ($(SANITIZER),address)
MALLOC=libc
Expand Down Expand Up @@ -494,7 +485,7 @@ $(ENGINE_LIB_NAME): $(ENGINE_SERVER_OBJ)

# valkey-unit-tests
$(ENGINE_UNIT_TESTS): $(ENGINE_TEST_OBJ) $(ENGINE_LIB_NAME)
$(SERVER_LD) $(ALLOW_DUPLICATE_FLAG) -o $@ $^ ../deps/hiredis/libhiredis.a ../deps/lua/src/liblua.a ../deps/hdr_histogram/libhdrhistogram.a ../deps/fpconv/libfpconv.a $(FINAL_LIBS)
$(SERVER_LD) -o $@ $^ ../deps/hiredis/libhiredis.a ../deps/lua/src/liblua.a ../deps/hdr_histogram/libhdrhistogram.a ../deps/fpconv/libfpconv.a $(FINAL_LIBS)

# valkey-sentinel
$(ENGINE_SENTINEL_NAME): $(SERVER_NAME)
Expand Down
2 changes: 1 addition & 1 deletion src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ int clusterNodeIsFailing(clusterNode *node);
int clusterNodeIsNoFailover(clusterNode *node);
char *clusterNodeGetShardId(clusterNode *node);
int clusterNodeNumReplicas(clusterNode *node);
clusterNode *clusterNodeGetReplica(clusterNode *node, int slave_idx);
clusterNode *clusterNodeGetReplica(clusterNode *node, int replica_idx);
clusterNode *getMigratingSlotDest(int slot);
clusterNode *getImportingSlotSource(int slot);
clusterNode *getNodeBySlot(int slot);
Expand Down
28 changes: 19 additions & 9 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -1237,7 +1237,7 @@ void clusterReset(int hard) {
if (nodeIsReplica(myself)) {
clusterSetNodeAsPrimary(myself);
replicationUnsetPrimary();
emptyData(-1, EMPTYDB_NO_FLAGS, NULL);
emptyData(-1, server.lazyfree_lazy_user_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS, NULL);
}

/* Close slots, reset manual failover state. */
Expand Down Expand Up @@ -2995,7 +2995,7 @@ int clusterIsValidPacket(clusterLink *link) {
return 0;
}

if (type == server.cluster_drop_packet_filter) {
if (type == server.cluster_drop_packet_filter || server.cluster_drop_packet_filter == -2) {
serverLog(LL_WARNING, "Dropping packet that matches debug drop filter");
return 0;
}
Expand Down Expand Up @@ -3084,7 +3084,8 @@ int clusterProcessPacket(clusterLink *link) {
if (!clusterIsValidPacket(link)) {
clusterMsg *hdr = (clusterMsg *)link->rcvbuf;
uint16_t type = ntohs(hdr->type);
if (server.debug_cluster_close_link_on_packet_drop && type == server.cluster_drop_packet_filter) {
if (server.debug_cluster_close_link_on_packet_drop &&
(type == server.cluster_drop_packet_filter || server.cluster_drop_packet_filter == -2)) {
freeClusterLink(link);
serverLog(LL_WARNING, "Closing link for matching packet type %hu", type);
return 0;
Expand Down Expand Up @@ -4708,11 +4709,18 @@ int clusterGetReplicaRank(void) {
void clusterLogCantFailover(int reason) {
char *msg;
static time_t lastlog_time = 0;
time_t now = time(NULL);

/* Don't log if we have the same reason for some time. */
if (reason == server.cluster->cant_failover_reason &&
time(NULL) - lastlog_time < CLUSTER_CANT_FAILOVER_RELOG_PERIOD)
/* General logging suppression if the same reason has occurred recently. */
if (reason == server.cluster->cant_failover_reason && now - lastlog_time < CLUSTER_CANT_FAILOVER_RELOG_PERIOD) {
return;
}

/* Special case: If the failure reason is due to data age, log 10 times less frequently. */
if (reason == server.cluster->cant_failover_reason && reason == CLUSTER_CANT_FAILOVER_DATA_AGE &&
now - lastlog_time < 10 * CLUSTER_CANT_FAILOVER_RELOG_PERIOD) {
return;
}

server.cluster->cant_failover_reason = reason;

Expand Down Expand Up @@ -6090,6 +6098,8 @@ const char *clusterGetMessageTypeString(int type) {
return "unknown";
}

/* Get the slot from robj and return it. If the slot is not valid,
* return -1 and send an error to the client. */
int getSlotOrReply(client *c, robj *o) {
long long slot;

Expand Down Expand Up @@ -6815,7 +6825,7 @@ int clusterCommandSpecial(client *c) {
memset(slots, 0, CLUSTER_SLOTS);
/* Check that all the arguments are parseable.*/
for (j = 2; j < c->argc; j++) {
if ((slot = getSlotOrReply(c, c->argv[j])) == C_ERR) {
if ((slot = getSlotOrReply(c, c->argv[j])) == -1) {
zfree(slots);
return 1;
}
Expand Down Expand Up @@ -6848,11 +6858,11 @@ int clusterCommandSpecial(client *c) {
/* Check that all the arguments are parseable and that all the
* slots are not already busy. */
for (j = 2; j < c->argc; j += 2) {
if ((startslot = getSlotOrReply(c, c->argv[j])) == C_ERR) {
if ((startslot = getSlotOrReply(c, c->argv[j])) == -1) {
zfree(slots);
return 1;
}
if ((endslot = getSlotOrReply(c, c->argv[j + 1])) == C_ERR) {
if ((endslot = getSlotOrReply(c, c->argv[j + 1])) == -1) {
zfree(slots);
return 1;
}
Expand Down
4 changes: 2 additions & 2 deletions src/cluster_slot_stats.c
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,8 @@ void clusterSlotStatsCommand(client *c) {
if (c->argc == 5 && !strcasecmp(c->argv[2]->ptr, "slotsrange")) {
/* CLUSTER SLOT-STATS SLOTSRANGE start-slot end-slot */
int startslot, endslot;
if ((startslot = getSlotOrReply(c, c->argv[3])) == C_ERR ||
(endslot = getSlotOrReply(c, c->argv[4])) == C_ERR) {
if ((startslot = getSlotOrReply(c, c->argv[3])) == -1 ||
(endslot = getSlotOrReply(c, c->argv[4])) == -1) {
return;
}
if (startslot > endslot) {
Expand Down
11 changes: 9 additions & 2 deletions src/commands.def
Original file line number Diff line number Diff line change
Expand Up @@ -6428,6 +6428,7 @@ struct COMMAND_STRUCT ACL_Subcommands[] = {
/* BGSAVE history */
commandHistory BGSAVE_History[] = {
{"3.2.2","Added the `SCHEDULE` option."},
{"8.1.0","Added the `CANCEL` option."},
};
#endif

Expand All @@ -6441,9 +6442,15 @@ commandHistory BGSAVE_History[] = {
#define BGSAVE_Keyspecs NULL
#endif

/* BGSAVE operation argument table */
struct COMMAND_ARG BGSAVE_operation_Subargs[] = {
{MAKE_ARG("schedule",ARG_TYPE_PURE_TOKEN,-1,"SCHEDULE",NULL,"3.2.2",CMD_ARG_NONE,0,NULL)},
{MAKE_ARG("cancel",ARG_TYPE_PURE_TOKEN,-1,"CANCEL",NULL,"8.1.0",CMD_ARG_NONE,0,NULL)},
};

/* BGSAVE argument table */
struct COMMAND_ARG BGSAVE_Args[] = {
{MAKE_ARG("schedule",ARG_TYPE_PURE_TOKEN,-1,"SCHEDULE",NULL,"3.2.2",CMD_ARG_OPTIONAL,0,NULL)},
{MAKE_ARG("operation",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,2,NULL),.subargs=BGSAVE_operation_Subargs},
};

/********** COMMAND COUNT ********************/
Expand Down Expand Up @@ -11009,7 +11016,7 @@ struct COMMAND_STRUCT serverCommandTable[] = {
/* server */
{MAKE_CMD("acl","A container for Access List Control commands.","Depends on subcommand.","6.0.0",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,ACL_History,0,ACL_Tips,0,NULL,-2,CMD_SENTINEL,0,ACL_Keyspecs,0,NULL,0),.subcommands=ACL_Subcommands},
{MAKE_CMD("bgrewriteaof","Asynchronously rewrites the append-only file to disk.","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,BGREWRITEAOF_History,0,BGREWRITEAOF_Tips,0,bgrewriteaofCommand,1,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_NOSCRIPT,0,BGREWRITEAOF_Keyspecs,0,NULL,0)},
{MAKE_CMD("bgsave","Asynchronously saves the database(s) to disk.","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,BGSAVE_History,1,BGSAVE_Tips,0,bgsaveCommand,-1,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_NOSCRIPT,0,BGSAVE_Keyspecs,0,NULL,1),.args=BGSAVE_Args},
{MAKE_CMD("bgsave","Asynchronously saves the database(s) to disk.","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,BGSAVE_History,2,BGSAVE_Tips,0,bgsaveCommand,-1,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_NOSCRIPT,0,BGSAVE_Keyspecs,0,NULL,1),.args=BGSAVE_Args},
{MAKE_CMD("command","Returns detailed information about all commands.","O(N) where N is the total number of commands","2.8.13",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,COMMAND_History,0,COMMAND_Tips,1,commandCommand,-1,CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,COMMAND_Keyspecs,0,NULL,0),.subcommands=COMMAND_Subcommands},
{MAKE_CMD("config","A container for server configuration commands.","Depends on subcommand.","2.0.0",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,CONFIG_History,0,CONFIG_Tips,0,NULL,-2,0,0,CONFIG_Keyspecs,0,NULL,0),.subcommands=CONFIG_Subcommands},
{MAKE_CMD("dbsize","Returns the number of keys in the database.","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,DBSIZE_History,0,DBSIZE_Tips,2,dbsizeCommand,1,CMD_READONLY|CMD_FAST,ACL_CATEGORY_KEYSPACE,DBSIZE_Keyspecs,0,NULL,0)},
Expand Down
30 changes: 26 additions & 4 deletions src/commands/bgsave.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
[
"3.2.2",
"Added the `SCHEDULE` option."
],
[
"8.1.0",
"Added the `CANCEL` option."
]
],
"command_flags": [
Expand All @@ -19,11 +23,23 @@
],
"arguments": [
{
"name": "schedule",
"token": "SCHEDULE",
"type": "pure-token",
"name": "operation",
"type": "oneof",
"optional": true,
"since": "3.2.2"
"arguments": [
{
"name": "schedule",
"token": "SCHEDULE",
"type": "pure-token",
"since": "3.2.2"
},
{
"name": "cancel",
"token": "CANCEL",
"type": "pure-token",
"since": "8.1.0"
}
]
}
],
"reply_schema": {
Expand All @@ -33,6 +49,12 @@
},
{
"const": "Background saving scheduled"
},
{
"const": "Background saving cancelled"
},
{
"const": "Scheduled background saving cancelled"
}
]
}
Expand Down
16 changes: 11 additions & 5 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,13 @@ configEnum propagation_error_behavior_enum[] = {
{"panic-on-replicas", PROPAGATION_ERR_BEHAVIOR_PANIC_ON_REPLICAS},
{NULL, 0}};

configEnum log_format_enum[] = {{"legacy", LOG_FORMAT_LEGACY}, {"logfmt", LOG_FORMAT_LOGFMT}, {NULL, 0}};

configEnum log_timestamp_format_enum[] = {{"legacy", LOG_TIMESTAMP_LEGACY},
{"iso8601", LOG_TIMESTAMP_ISO8601},
{"milliseconds", LOG_TIMESTAMP_MILLISECONDS},
{NULL, 0}};

/* Output buffer limits presets. */
clientBufferLimitsConfig clientBufferLimitsDefaults[CLIENT_TYPE_OBUF_COUNT] = {
{0, 0, 0}, /* normal */
Expand Down Expand Up @@ -622,9 +629,6 @@ void loadServerConfigFromString(char *config) {
if (server.config_hz < CONFIG_MIN_HZ) server.config_hz = CONFIG_MIN_HZ;
if (server.config_hz > CONFIG_MAX_HZ) server.config_hz = CONFIG_MAX_HZ;

/* To ensure backward compatibility when io_threads_num is according to the previous maximum of 128. */
if (server.io_threads_num > IO_THREADS_MAX_NUM) server.io_threads_num = IO_THREADS_MAX_NUM;

sdsfreesplitres(lines, totlines);
reading_config_file = 0;
return;
Expand Down Expand Up @@ -3192,11 +3196,13 @@ standardConfig static_configs[] = {
createEnumConfig("propagation-error-behavior", NULL, MODIFIABLE_CONFIG, propagation_error_behavior_enum, server.propagation_error_behavior, PROPAGATION_ERR_BEHAVIOR_IGNORE, NULL, NULL),
createEnumConfig("shutdown-on-sigint", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, shutdown_on_sig_enum, server.shutdown_on_sigint, 0, isValidShutdownOnSigFlags, NULL),
createEnumConfig("shutdown-on-sigterm", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, shutdown_on_sig_enum, server.shutdown_on_sigterm, 0, isValidShutdownOnSigFlags, NULL),
createEnumConfig("log-format", NULL, MODIFIABLE_CONFIG, log_format_enum, server.log_format, LOG_FORMAT_LEGACY, NULL, NULL),
createEnumConfig("log-timestamp-format", NULL, MODIFIABLE_CONFIG, log_timestamp_format_enum, server.log_timestamp_format, LOG_TIMESTAMP_LEGACY, NULL, NULL),

/* Integer configs */
createIntConfig("databases", NULL, IMMUTABLE_CONFIG, 1, INT_MAX, server.dbnum, 16, INTEGER_CONFIG, NULL, NULL),
createIntConfig("port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.port, 6379, INTEGER_CONFIG, NULL, updatePort), /* TCP port. */
createIntConfig("io-threads", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, 1, 128, server.io_threads_num, 1, INTEGER_CONFIG, NULL, NULL), /* Single threaded by default */
createIntConfig("port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.port, 6379, INTEGER_CONFIG, NULL, updatePort), /* TCP port. */
createIntConfig("io-threads", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, 1, IO_THREADS_MAX_NUM, server.io_threads_num, 1, INTEGER_CONFIG, NULL, NULL), /* Single threaded by default */
createIntConfig("events-per-io-thread", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.events_per_io_thread, 2, INTEGER_CONFIG, NULL, NULL),
createIntConfig("prefetch-batch-max-size", NULL, MODIFIABLE_CONFIG, 0, 128, server.prefetch_batch_max_size, 16, INTEGER_CONFIG, NULL, NULL),
createIntConfig("auto-aof-rewrite-percentage", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.aof_rewrite_perc, 100, INTEGER_CONFIG, NULL, NULL),
Expand Down
2 changes: 1 addition & 1 deletion src/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ void setcpuaffinity(const char *cpulist);
#define HAVE_FADVISE
#endif

#define IO_THREADS_MAX_NUM 16
#define IO_THREADS_MAX_NUM 256

#ifndef CACHE_LINE_SIZE
#if defined(__aarch64__) && defined(__APPLE__)
Expand Down
4 changes: 2 additions & 2 deletions src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -2418,9 +2418,9 @@ int bzmpopGetKeys(struct serverCommand *cmd, robj **argv, int argc, getKeysResul

/* Helper function to extract keys from the SORT RO command.
*
* SORT <sort-key>
* SORT_RO <sort-key>
*
* The second argument of SORT is always a key, however an arbitrary number of
* The second argument of SORT_RO is always a key, however an arbitrary number of
* keys may be accessed while doing the sort (the BY and GET args), so the
* key-spec declares incomplete keys which is why we have to provide a concrete
* implementation to fetch the keys.
Expand Down
4 changes: 2 additions & 2 deletions src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ void debugCommand(client *c) {
" Some fields of the default behavior may be time consuming to fetch,",
" and `fast` can be passed to avoid fetching them.",
"DROP-CLUSTER-PACKET-FILTER <packet-type>",
" Drop all packets that match the filtered type. Set to -1 allow all packets.",
" Drop all packets that match the filtered type. Set to -1 allow all packets or -2 to drop all packets.",
"CLOSE-CLUSTER-LINK-ON-PACKET-DROP <0|1>",
" This is valid only when DROP-CLUSTER-PACKET-FILTER is set to a valid packet type.",
" When set to 1, the cluster link is closed after dropping a packet based on the filter.",
Expand Down Expand Up @@ -1023,7 +1023,7 @@ void debugCommand(client *c) {

/* =========================== Crash handling ============================== */

__attribute__((noinline)) void _serverAssert(const char *estr, const char *file, int line) {
__attribute__((noinline, weak)) void _serverAssert(const char *estr, const char *file, int line) {
int new_report = bugReportStart();
serverLog(LL_WARNING, "=== %sASSERTION FAILED ===", new_report ? "" : "RECURSIVE ");
serverLog(LL_WARNING, "==> %s:%d '%s' is not true", file, line, estr);
Expand Down
2 changes: 1 addition & 1 deletion src/io_threads.c
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ void initIOThreads(void) {

int trySendReadToIOThreads(client *c) {
if (server.active_io_threads_num <= 1) return C_ERR;
/* If IO thread is areadty reading, return C_OK to make sure the main thread will not handle it. */
/* If IO thread is already reading, return C_OK to make sure the main thread will not handle it. */
if (c->io_read_state != CLIENT_IDLE) return C_OK;
/* Currently, replica/master writes are not offloaded and are processed synchronously. */
if (c->flag.primary || getClientType(c) == CLIENT_TYPE_REPLICA) return C_ERR;
Expand Down
4 changes: 2 additions & 2 deletions src/kvstore.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ struct _kvstore {
int flags;
dictType *dtype;
dict **dicts;
long long num_dicts;
long long num_dicts_bits;
int num_dicts;
int num_dicts_bits;
list *rehashing; /* List of dictionaries in this kvstore that are currently rehashing. */
int resize_cursor; /* Cron job uses this cursor to gradually resize dictionaries (only used if num_dicts > 1). */
int allocated_dicts; /* The number of allocated dicts. */
Expand Down
9 changes: 7 additions & 2 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -889,8 +889,11 @@ void setDeferredAggregateLen(client *c, void *node, long length, char prefix) {
}

char lenstr[128];
size_t lenstr_len = snprintf(lenstr, sizeof(lenstr), "%c%ld\r\n", prefix, length);
setDeferredReply(c, node, lenstr, lenstr_len);
lenstr[0] = prefix;
size_t lenstr_len = ll2string(lenstr + 1, sizeof(lenstr) - 1, length);
lenstr[lenstr_len + 1] = '\r';
lenstr[lenstr_len + 2] = '\n';
setDeferredReply(c, node, lenstr, lenstr_len + 3);
}

void setDeferredArrayLen(client *c, void *node, long length) {
Expand Down Expand Up @@ -2682,6 +2685,8 @@ void processInlineBuffer(client *c) {

/* Create an Object for all arguments. */
for (c->argc = 0, j = 0; j < argc; j++) {
/* Strings returned from sdssplitargs() may have unused capacity that we can trim. */
argv[j] = sdsRemoveFreeSpace(argv[j], 1);
c->argv[c->argc] = createObject(OBJ_STRING, argv[j]);
c->argc++;
c->argv_len_sum += sdslen(argv[j]);
Expand Down
21 changes: 21 additions & 0 deletions src/rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -1359,6 +1359,7 @@ ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter, int slot_
sdsfree(slot_info);
goto werr;
}
written += res;
last_slot = curr_slot;
sdsfree(slot_info);
}
Expand Down Expand Up @@ -3695,6 +3696,21 @@ void bgsaveCommand(client *c) {
if (c->argc > 1) {
if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr, "schedule")) {
schedule = 1;
} else if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr, "cancel")) {
/* Terminates an in progress BGSAVE */
if (server.child_type == CHILD_TYPE_RDB) {
/* There is an ongoing bgsave */
serverLog(LL_NOTICE, "Background saving will be aborted due to user request");
killRDBChild();
addReplyStatus(c, "Background saving cancelled");
} else if (server.rdb_bgsave_scheduled == 1) {
serverLog(LL_NOTICE, "Scheduled background saving will be cancelled due to user request");
server.rdb_bgsave_scheduled = 0;
addReplyStatus(c, "Scheduled background saving cancelled");
} else {
addReplyError(c, "Background saving is currently not in progress or scheduled");
}
return;
} else {
addReplyErrorObject(c, shared.syntaxerr);
return;
Expand All @@ -3709,6 +3725,11 @@ void bgsaveCommand(client *c) {
} else if (hasActiveChildProcess() || server.in_exec) {
if (schedule || server.in_exec) {
server.rdb_bgsave_scheduled = 1;
if (schedule) {
serverLog(LL_NOTICE, "Background saving scheduled due to user request");
} else {
serverLog(LL_NOTICE, "Background saving scheduled to run after transaction execution");
}
addReplyStatus(c, "Background saving scheduled");
} else {
addReplyError(c, "Another child process is active (AOF?): can't BGSAVE right now. "
Expand Down
Loading

0 comments on commit 9b8248b

Please sign in to comment.