Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support all X* (streams) commands #675

Open
wants to merge 4 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 33 additions & 2 deletions notes/redis.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
+-------------------+------------+---------------------------------------------------------------------------------------------------------------------+
| EXPIREAT | Yes | EXPIREAT key timestamp |
+-------------------+------------+---------------------------------------------------------------------------------------------------------------------+
| KEYS | Yes* | KEYS pattern |
| KEYS | Yes* | KEYS pattern |
+-------------------+------------+---------------------------------------------------------------------------------------------------------------------+
| MIGRATE | No | MIGRATE host port key destination-db timeout |
+-------------------+------------+---------------------------------------------------------------------------------------------------------------------+
Expand Down Expand Up @@ -267,6 +267,7 @@
** SSCAN scans only sorted sets in the local node.

## HyperLogLog

+-------------------+------------+---------------------------------------------------------------------------------------------------------------------+
| Command | Supported? | Format |
+-------------------+------------+---------------------------------------------------------------------------------------------------------------------+
Expand All @@ -277,6 +278,37 @@
| PFMERGE | No | PFMERGE destkey sourcekey [sourcekey ...] |
+-------------------+------------+---------------------------------------------------------------------------------------------------------------------+

### Streams

+-------------------+------------+---------------------------------------------------------------------------------------------------------------------+
| Command | Supported? | Format |
+-------------------+------------+---------------------------------------------------------------------------------------------------------------------+
| XACK | Yes | XACK key group ID [ID ...] |
+-------------------+------------+---------------------------------------------------------------------------------------------------------------------+
| XADD | Yes | XADD key ID field string [field string ...] |
+-------------------+------------+---------------------------------------------------------------------------------------------------------------------+
| XCLAIM | Yes | XCLAIM key group consumer min-idle-time ID [ID ...] [IDLE ms] [TIME ms-unix-time] [RETRYCOUNT count] [force] [id] |
+-------------------+------------+---------------------------------------------------------------------------------------------------------------------+
| XDEL | Yes | XDEL key ID [ID ...] |
+-------------------+------------+---------------------------------------------------------------------------------------------------------------------+
| XGROUP | Yes | XGROUP [CREATE key groupname id-or-$] [SETID key id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname cname] |
+-------------------+------------+---------------------------------------------------------------------------------------------------------------------+
| XINFO | Yes | XINFO [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP] |
+-------------------+------------+---------------------------------------------------------------------------------------------------------------------+
| XLEN | Yes | XLEN key |
+-------------------+------------+---------------------------------------------------------------------------------------------------------------------+
| XPENDING | Yes | XPENDING key group [start end count] [consumer] |
+-------------------+------------+---------------------------------------------------------------------------------------------------------------------+
| XRANGE | Yes | XRANGE key start end [COUNT count] |
+-------------------+------------+---------------------------------------------------------------------------------------------------------------------+
| XREAD | Yes | XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...] |
+-------------------+------------+---------------------------------------------------------------------------------------------------------------------+
| XREADGROUP | No | XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...] |
+-------------------+------------+---------------------------------------------------------------------------------------------------------------------+
| XREVRANGE | Yes | XREVRANGE key end start [COUNT count] |
+-------------------+------------+---------------------------------------------------------------------------------------------------------------------+
| XTRIM | Yes | XTRIM key MAXLEN [~] count |
+-------------------+------------+---------------------------------------------------------------------------------------------------------------------+

### Pub/Sub

Expand Down Expand Up @@ -402,7 +434,6 @@

* INFO reads only the local node.


## Note

- redis commands are not case sensitive
Expand Down
3 changes: 3 additions & 0 deletions src/dyn_message.c
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,8 @@ static struct msg *_msg_get(struct conn *conn, bool request,
msg->ntokens = 0;
msg->rntokens = 0;
msg->nkeys = 0;
memset(msg->stack, 0, sizeof(msg->stack));
msg->nested_depth = 0;
msg->rlen = 0;
msg->integer = 0;

Expand All @@ -390,6 +392,7 @@ static struct msg *_msg_get(struct conn *conn, bool request,
msg->swallow = 0;
msg->dnode_header_prepended = 0;
msg->rsp_sent = 0;
msg->require_subcommand = 0;

// dynomite
msg->is_read = 1;
Expand Down
45 changes: 38 additions & 7 deletions src/dyn_message.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@

#define MAX_ALLOWABLE_PROCESSED_MSGS 500

#define MAX_BULK_REPLY_DEPTH 6

typedef void (*func_msg_parse_t)(struct msg *, const struct string *hash_tag);
typedef rstatus_t (*func_msg_fragment_t)(struct msg *, struct server_pool *,
struct rack *, struct msg_tqh *);
Expand Down Expand Up @@ -152,6 +154,7 @@ typedef enum msg_parse_result {
ACTION(REQ_REDIS_HSTRLEN) \
ACTION(REQ_REDIS_KEYS) \
ACTION(REQ_REDIS_INFO) \
ACTION(REQ_REDIS_DBSIZE) \
ACTION(REQ_REDIS_LINDEX) /* redis requests - lists */ \
ACTION(REQ_REDIS_LINSERT) \
ACTION(REQ_REDIS_LLEN) \
Expand Down Expand Up @@ -224,6 +227,30 @@ typedef enum msg_parse_result {
ACTION(REQ_REDIS_JSONARRLEN) \
ACTION(REQ_REDIS_JSONOBJKEYS) \
ACTION(REQ_REDIS_JSONOBJLEN) \
ACTION(REQ_REDIS_XACK) /* redis requests - streams */ \
ACTION(REQ_REDIS_XADD) \
ACTION(REQ_REDIS_XCLAIM) \
ACTION(REQ_REDIS_XDEL) \
ACTION(REQ_REDIS_XGROUP) \
ACTION(REQ_REDIS_XGROUP_CREATE) \
ACTION(REQ_REDIS_XGROUP_DELCONSUMER) \
ACTION(REQ_REDIS_XGROUP_DESTROY) \
ACTION(REQ_REDIS_XGROUP_HELP) \
ACTION(REQ_REDIS_XGROUP_SETID) \
ACTION(REQ_REDIS_XINFO) \
ACTION(REQ_REDIS_XINFO_CONSUMERS) \
ACTION(REQ_REDIS_XINFO_GROUPS) \
ACTION(REQ_REDIS_XINFO_HELP) \
ACTION(REQ_REDIS_XINFO_STREAM) \
ACTION(REQ_REDIS_XLEN) \
ACTION(REQ_REDIS_XPENDING) \
ACTION(REQ_REDIS_XRANGE) \
ACTION(REQ_REDIS_XREAD) \
ACTION(REQ_REDIS_XREAD_STREAMS) \
ACTION(REQ_REDIS_XREADGROUP) \
ACTION(REQ_REDIS_XREADGROUP_GROUP) \
ACTION(REQ_REDIS_XREVRANGE) \
ACTION(REQ_REDIS_XTRIM) \
/* ACTION(REQ_REDIS_AUTH) */ \
/* ACTION(REQ_REDIS_SELECT)*/ /* only during init */ \
ACTION(REQ_REDIS_PFADD) /* redis requests - hyperloglog */ \
Expand Down Expand Up @@ -253,6 +280,7 @@ typedef enum msg_parse_result {
ACTION(REQ_REDIS_SCRIPT_EXISTS) \
ACTION(REQ_REDIS_SCRIPT_FLUSH) \
ACTION(REQ_REDIS_SCRIPT_KILL) \
ACTION(REQ_REDIS_SCRIPT_HELP) \
/* ACTION( REQ_REDIS_AUTH) */ \
/* ACTION( REQ_REDIS_SELECT)*/ /* only during init */

Expand Down Expand Up @@ -423,13 +451,15 @@ struct msg {
uint32_t vlen; /* value length (memcache) */
uint8_t *end; /* end marker (memcache) */

uint8_t *ntoken_start; /* ntoken start (redis) */
uint8_t *ntoken_end; /* ntoken end (redis) */
uint32_t ntokens; /* # tokens (redis) */
uint32_t nkeys; /* # keys in script (redis EVAL/EVALSHA) */
uint32_t rntokens; /* running # tokens used by parsing fsa (redis) */
uint32_t rlen; /* running length in parsing fsa (redis) */
uint32_t integer; /* integer reply value (redis) */
uint8_t *ntoken_start; /* ntoken start (redis) */
uint8_t *ntoken_end; /* ntoken end (redis) */
uint32_t ntokens; /* # tokens (redis) */
uint32_t nkeys; /* # keys in script (redis EVAL/EVALSHA) or parameters for XREAD */
uint32_t rntokens; /* running # tokens used by parsing fsa (redis) */
uint32_t rlen; /* running length in parsing fsa (redis) */
uint32_t stack[MAX_BULK_REPLY_DEPTH]; /* stack to save rntokens of nesting multibulks */
uint8_t nested_depth; /* the depth of the current nested multibulk */
uint32_t integer; /* integer reply value (redis) */

struct msg *frag_owner; /* owner of fragment message */
uint32_t nfrag; /* # fragment */
Expand All @@ -438,6 +468,7 @@ struct msg {
struct msg *
*frag_seq; /* sequence of fragment message, map from keys to fragments*/

msg_type_t require_subcommand; /* command with sub-cmd like SCRIPT LOAD */
err_t error_code; /* errno on error? */
unsigned is_error : 1; /* error? */
unsigned is_ferror : 1; /* one or more fragments are in error? */
Expand Down
Loading