From 251d86f5ccf2cc0eb7b888a137385c76e7cf0c8f Mon Sep 17 00:00:00 2001 From: kukey Date: Wed, 22 Sep 2021 20:52:52 +0800 Subject: [PATCH 1/8] add proxy monitor --- src/Makefile.am | 4 +- src/nc_client.c | 6 ++ src/nc_connection.c | 1 + src/nc_connection.h | 1 + src/nc_message.c | 26 ++++++++ src/nc_message.h | 4 ++ src/nc_monitor.c | 148 +++++++++++++++++++++++++++++++++++++++++++ src/nc_monitor.h | 33 ++++++++++ src/nc_rbtree.c | 41 ++++++++++++ src/nc_rbtree.h | 4 ++ src/nc_request.c | 15 +++++ src/nc_string.c | 101 ++++++++++++++++++++++++++++- src/nc_string.h | 12 +++- src/proto/nc_redis.c | 8 +++ 14 files changed, 398 insertions(+), 6 deletions(-) create mode 100644 src/nc_monitor.c create mode 100644 src/nc_monitor.h diff --git a/src/Makefile.am b/src/Makefile.am index dd07a25a..2e8718f6 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -53,6 +53,7 @@ nutcracker_SOURCES = \ nc_array.c nc_array.h \ nc_util.c nc_util.h \ nc_queue.h \ + nc_monitor.c nc_monitor.h \ nc.c nutcracker_LDADD = $(top_builddir)/src/hashkit/libhashkit.a @@ -81,7 +82,8 @@ test_all_SOURCES = test_all.c \ nc_string.c nc_string.h \ nc_array.c nc_array.h \ nc_util.c nc_util.h \ - nc_queue.h + nc_queue.h \ + nc_monitor.c nc_monitor.h test_all_LDADD = $(top_builddir)/src/hashkit/libhashkit.a test_all_LDADD += $(top_builddir)/src/proto/libproto.a diff --git a/src/nc_client.c b/src/nc_client.c index a3cecaac..b2504d30 100644 --- a/src/nc_client.c +++ b/src/nc_client.c @@ -18,6 +18,7 @@ #include #include #include +#include void client_ref(struct conn *conn, void *owner) @@ -129,6 +130,11 @@ client_close(struct context *ctx, struct conn *conn) client_close_stats(ctx, conn->owner, conn->err, conn->eof); + /* when client close, if conn in monitor, delete it */ + if (conn->monitor_client) { + del_from_monitor(conn); + } + if (conn->sd < 0) { conn->unref(conn); conn_put(conn); diff --git a/src/nc_connection.c b/src/nc_connection.c index d37ac8bb..9c20a515 100644 --- a/src/nc_connection.c +++ b/src/nc_connection.c @@ -157,6 +157,7 @@ _conn_get(void) conn->done = 0; conn->redis = 0; conn->authenticated = 0; + conn->monitor_client = 0; ntotal_conn++; ncurr_conn++; diff --git a/src/nc_connection.h b/src/nc_connection.h index 2fe83d42..caf06257 100644 --- a/src/nc_connection.h +++ b/src/nc_connection.h @@ -89,6 +89,7 @@ struct conn { unsigned done:1; /* done? aka close? */ unsigned redis:1; /* redis? */ unsigned authenticated:1; /* authenticated? */ + unsigned monitor_client:1;/* monitor client? */ }; TAILQ_HEAD(conn_tqh, conn); diff --git a/src/nc_message.c b/src/nc_message.c index cf53ed46..b3fb5782 100644 --- a/src/nc_message.c +++ b/src/nc_message.c @@ -273,6 +273,7 @@ _msg_get(void) msg->fdone = 0; msg->swallow = 0; msg->redis = 0; + msg->monitor = 0; return msg; } @@ -910,3 +911,28 @@ bool msg_set_placeholder_key(struct msg *r) return true; } +rstatus_t +msg_append_full(struct msg *msg, uint8_t *pos, size_t n) +{ + struct mbuf *mbuf = NULL; + size_t cidx = 0; + size_t mbsize = 0; + size_t clen = 0; + + do { + mbuf = msg_ensure_mbuf(msg, n); + if (mbuf == NULL) { + return NC_ENOMEM; + } + + mbsize = mbuf_size(mbuf); + + clen = n > mbsize ? mbsize : n; + mbuf_copy(mbuf, pos+cidx, clen); + cidx += clen; + msg->mlen += (uint32_t)clen; + n -= clen; + } while(n); + + return NC_OK; +} diff --git a/src/nc_message.h b/src/nc_message.h index 26a063bc..d5dba6b2 100644 --- a/src/nc_message.h +++ b/src/nc_message.h @@ -202,6 +202,7 @@ typedef enum msg_parse_result { ACTION( REQ_REDIS_SELECT) /* only during init */ \ ACTION( REQ_REDIS_COMMAND) /* Sent to random server for redis-cli completions*/ \ ACTION( REQ_REDIS_LOLWUT) /* Vitally important */ \ + ACTION( REQ_REDIS_MONITOR) /* monitor */ \ ACTION( RSP_REDIS_STATUS ) /* redis response */ \ ACTION( RSP_REDIS_ERROR ) \ ACTION( RSP_REDIS_ERROR_ERR ) \ @@ -300,6 +301,7 @@ struct msg { unsigned fdone:1; /* all fragments are done? */ unsigned swallow:1; /* swallow response? */ unsigned redis:1; /* redis? */ + unsigned monitor:1; /* monitor comamnd? */ }; TAILQ_HEAD(msg_tqh, msg); @@ -350,4 +352,6 @@ void rsp_recv_done(struct context *ctx, struct conn *conn, struct msg *msg, stru struct msg *rsp_send_next(struct context *ctx, struct conn *conn); void rsp_send_done(struct context *ctx, struct conn *conn, struct msg *msg); +rstatus_t msg_append_full(struct msg *msg, uint8_t *pos, size_t n); + #endif diff --git a/src/nc_monitor.c b/src/nc_monitor.c new file mode 100644 index 00000000..b649fba3 --- /dev/null +++ b/src/nc_monitor.c @@ -0,0 +1,148 @@ +/* + * twemproxy - A fast and lightweight proxy for memcached protocol. + * + * Copyright (C) 2021, wei huang + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +struct rbtree monitor_tree; +struct rbnode monitor_sentinel_node; + +void monitor_init() +{ + rbtree_init(&monitor_tree, &monitor_sentinel_node); +} + +void monitor_deinit(struct context *ctx) +{ + struct rbnode *node = NULL; + + while ((node = rbtree_min(&monitor_tree) != NULL)) + { + struct conn *c = node->data; + if (event_del_conn(ctx->evb, c) != NC_OK) { + log_warn("event del conn c %d failed, ignored: %s", + c->sd, strerror(errno)); + } + c->close(ctx, c); + + rbtree_delete(&monitor_tree, node); + nc_free(node); + } +} + +int mointor_is_empty() +{ + return rbtree_is_empty(&monitor_tree); +} + +rstatus_t add_to_monitor(struct conn *c) +{ + ASSERT(c->client); + + struct rbnode *node = nc_alloc(sizeof(struct rbnode)); + if (node == NULL) + { + return NC_ENOMEM; + } + + c->monitor_client = 1; + node->key = c->sd; + node->data = c; + + rbtree_insert(&monitor_tree, node); +} + +void del_from_monitor(struct conn *c) +{ + ASSERT(c->client && c->monitor_client); + + struct rbnode *node = rbtree_find(&monitor_tree, c->sd); + ASSERT(node != NULL); + rbtree_delete(&monitor_tree, node); + nc_free(node); +} + +struct monitor_data +{ + struct msg *m; + struct conn *c; + struct context *ctx; + struct string *d; +}; + +static void monitor_callback(struct rbnode *node, void *data) +{ + struct monitor_data *mdata = data; + struct conn *req_c = node->data; + + struct msg *req = req_get(req_c); + if (req == NULL) { + return; + } + struct msg *rsp = msg_get(req_c, 0, mdata->c->redis); + if (rsp == NULL) { + msg_put(req); + return; + } + + req->peer = rsp; + rsp->peer = req; + + req->done = 1; + rsp->done = 1; + + if (msg_append_full(rsp, mdata->d->data, mdata->d->len) != NC_OK) { + msg_put(req); + msg_put(rsp); + return; + } + req_c->enqueue_outq(mdata->ctx, req_c, req); + if (event_add_out(mdata->ctx->evb, req_c) != NC_OK) { + req_c->err = errno; + msg_put(req); + msg_put(rsp); + } + + return; +} + +rstatus_t make_monitor(struct context *ctx, struct conn *c, struct msg *m) +{ + ASSERT(c->client); + + struct string monitor_message = null_string; + struct monitor_data mdata = {0}; + mdata.m = m; + mdata.c = c; + mdata.d = &monitor_message; + mdata.ctx = ctx; + struct keypos *kpos = array_get(m->keys, 0); + + string_printf(&monitor_message, "+%ld.%06ld [%s] command=%s key0=", + m->start_ts/1000000, m->start_ts%1000000, + nc_unresolve_peer_desc(c->sd), + (msg_type_string(m->type))->data); + string_cat_len(&monitor_message, kpos->start, kpos->end - kpos->start); + string_cat_len(&monitor_message, "\r\n", 2); + + rbtree_inorder_traversal(monitor_tree.root, monitor_tree.sentinel, monitor_callback, &mdata); + + string_deinit(&monitor_message); + return NC_OK; +} diff --git a/src/nc_monitor.h b/src/nc_monitor.h new file mode 100644 index 00000000..c5226ff4 --- /dev/null +++ b/src/nc_monitor.h @@ -0,0 +1,33 @@ +/* + * twemproxy - A fast and lightweight proxy for memcached protocol. + * + * Copyright (C) 2021, wei huang + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef NC_MONITOR_H +#define NC_MONITOR_H + +#include + +void monitor_init(); +void monitor_deinit(struct context *ctx); +int mointor_is_empty(); + +rstatus_t add_to_monitor(struct conn *c); +void del_from_monitor(struct conn *c); +rstatus_t make_monitor(struct context *ctx, struct conn *c, struct msg *m); + +#endif \ No newline at end of file diff --git a/src/nc_rbtree.c b/src/nc_rbtree.c index d76ad861..2d684ffd 100644 --- a/src/nc_rbtree.c +++ b/src/nc_rbtree.c @@ -342,3 +342,44 @@ rbtree_delete(struct rbtree *tree, struct rbnode *node) rbtree_black(temp); } + +struct rbnode *rbtree_find(struct rbtree *tree, int64_t key) { + struct rbnode **root = &tree->root; + struct rbnode *sentinel = tree->sentinel; + struct rbnode *temp, **p; + + /* empty tree */ + + if (*root == sentinel) { + return NULL; + } + + /* a binary tree find */ + temp = *root; + for (;;) { + + if (temp->key == key) { + break; + } + + p = (key < temp->key) ? &temp->left : &temp->right; + if (*p == sentinel) { + return NULL; + } + temp = *p; + } + + return temp; +} + +void rbtree_inorder_traversal(struct rbnode *root, struct rbnode *sentinel, + void (*func)(struct rbnode *, void *), void *data) { + + if (root == NULL || root == sentinel) { + return; + } + + func(root, data); + rbtree_inorder_traversal(root->left, sentinel, func, data); + rbtree_inorder_traversal(root->right, sentinel, func, data); +} \ No newline at end of file diff --git a/src/nc_rbtree.h b/src/nc_rbtree.h index 4b6137ac..fc0a7568 100644 --- a/src/nc_rbtree.h +++ b/src/nc_rbtree.h @@ -23,6 +23,7 @@ #define rbtree_is_red(_node) ((_node)->color) #define rbtree_is_black(_node) (!rbtree_is_red(_node)) #define rbtree_copy_color(_n1, _n2) ((_n1)->color = (_n2)->color) +#define rbtree_is_empty(_tree) ((_tree)->root == (_tree)->sentinel) struct rbnode { struct rbnode *left; /* left link */ @@ -43,5 +44,8 @@ void rbtree_init(struct rbtree *tree, struct rbnode *node); struct rbnode *rbtree_min(const struct rbtree *tree); void rbtree_insert(struct rbtree *tree, struct rbnode *node); void rbtree_delete(struct rbtree *tree, struct rbnode *node); +struct rbnode *rbtree_find(struct rbtree *tree, int64_t key); +void rbtree_inorder_traversal(struct rbnode *root, struct rbnode *sentinel, + void (*func)(struct rbnode *, void *), void *data); #endif diff --git a/src/nc_request.c b/src/nc_request.c index d69dd1ee..70385730 100644 --- a/src/nc_request.c +++ b/src/nc_request.c @@ -17,6 +17,7 @@ #include #include +#include struct msg * req_get(struct conn *conn) @@ -503,6 +504,15 @@ req_filter(struct conn *conn, struct msg *msg) return true; } + /* + * Handle monitor command. + */ + if (msg->monitor) { + add_to_monitor(conn); + req_put(msg); + return true; + } + /* * If this conn is not authenticated, we will mark it as noforward, * and handle it in the redis_reply handler. @@ -667,6 +677,11 @@ req_recv_done(struct context *ctx, struct conn *conn, struct msg *msg, return; } + /* if have monitor client, make monitor info. */ + if (!mointor_is_empty()) { + make_monitor(ctx, conn, msg); + } + /* do fragment */ pool = conn->owner; TAILQ_INIT(&frag_msgq); diff --git a/src/nc_string.c b/src/nc_string.c index dffaeafd..704bf6e8 100644 --- a/src/nc_string.c +++ b/src/nc_string.c @@ -175,6 +175,23 @@ _safe_itoa(int base, int64_t val, char *buf) return buf + 1; } +static const char * +_safe_check_placeholder(const char *fmt, int32_t *have_placeholder) { + *have_placeholder = false; + int32_t pos = 0; + if (*fmt == '0') { + fmt++; + + while (isdigit(*fmt)) { + *have_placeholder = *have_placeholder * pos + (*fmt - '0'); + fmt++; + pos++; + } + } + + return fmt; +} + static const char * _safe_check_longlong(const char *fmt, int32_t * have_longlong) { @@ -192,14 +209,19 @@ _safe_check_longlong(const char *fmt, int32_t * have_longlong) } int -_safe_vsnprintf(char *to, size_t size, const char *format, va_list ap) +_safe_vsnprintf(char *to, size_t size, int *parse_done, const char *format, va_list ap) { char *start = to; char *end = start + size - 1; + if (parse_done) *parse_done = 1; + for (; *format; ++format) { int32_t have_longlong = false; + int32_t have_placeholder = false; + int32_t placeholder_num = 0; if (*format != '%') { if (to == end) { /* end of buffer */ + if (parse_done) *parse_done = 0; break; } *to++ = *format; /* copy ordinary char */ @@ -207,6 +229,7 @@ _safe_vsnprintf(char *to, size_t size, const char *format, va_list ap) } ++format; /* skip '%' */ + format = _safe_check_placeholder(format, &have_placeholder); format = _safe_check_longlong(format, &have_longlong); switch (*format) { @@ -235,7 +258,7 @@ _safe_vsnprintf(char *to, size_t size, const char *format, va_list ap) } { - char buff[22]; + char buff[22] = {0}; const int base = (*format == 'x' || *format == 'p') ? 16 : 10; /* *INDENT-OFF* */ @@ -248,6 +271,14 @@ _safe_vsnprintf(char *to, size_t size, const char *format, va_list ap) if (*format == 'x' && !have_longlong && ival < 0) { val_as_str += 8; } + + if (have_placeholder) { + placeholder_num = nc_strlen(val_as_str); + while (have_placeholder > placeholder_num && to < end) { + *to++ = '0'; + placeholder_num++; + } + } while (*val_as_str && to < end) { *to++ = *val_as_str++; @@ -278,7 +309,71 @@ _safe_snprintf(char *to, size_t n, const char *fmt, ...) int result; va_list args; va_start(args, fmt); - result = _safe_vsnprintf(to, n, fmt, args); + result = _safe_vsnprintf(to, n, NULL, fmt, args); va_end(args); return result; } + +rstatus_t +string_printf(struct string *s, const char *fmt, ...) +{ + char *buf = NULL; + size_t buflen = nc_strlen(fmt)*2; + int bufstrlen, parse_done; + + buf = nc_alloc(buflen); + if (buf == NULL) { + return NC_ENOMEM; + } + + va_list args, cpy; + va_start(args, fmt); + while(1) { + va_copy(cpy, args); + bufstrlen = _safe_vsnprintf(buf, buflen, &parse_done, fmt, cpy); + va_end(cpy); + + if (!parse_done) { + nc_free(buf); + buflen += 256; + buf = nc_alloc(buflen); + if (buf == NULL) { + return NC_ENOMEM; + } + continue; + } + + break; + } + va_end(args); + + s->len = (uint32_t)bufstrlen; + s->data = (uint8_t*)buf; + + return NC_OK; +} + +rstatus_t string_cat_len(struct string *dst, uint8_t *data, uint32_t len) { + if (len == 0) { + return NC_OK; + } + + uint8_t *buf = dst->data; + uint32_t newlen = dst->len + len + 1; + + buf = nc_realloc(dst->data, newlen); + if (buf == NULL) { + return NC_ENOMEM; + } + + nc_memcpy(buf + dst->len, data, len); + buf[newlen-1] = '\0'; + dst->len = newlen-1; + dst->data = buf; + + return NC_OK; +} + +rstatus_t string_cat(struct string *dst, struct string *src) { + return string_cat_len(dst, src->data, src->len); +} diff --git a/src/nc_string.h b/src/nc_string.h index 755305ea..36c9d966 100644 --- a/src/nc_string.h +++ b/src/nc_string.h @@ -117,14 +117,14 @@ int string_compare(const struct string *s1, const struct string *s2); * Does not support any width/precision * Implemented with simplicity, and async-signal-safety in mind */ -int _safe_vsnprintf(char *to, size_t size, const char *format, va_list ap); +int _safe_vsnprintf(char *to, size_t size, int *parse_done, const char *format, va_list ap); int _safe_snprintf(char *to, size_t n, const char *fmt, ...); #define nc_safe_snprintf(_s, _n, ...) \ _safe_snprintf((char *)(_s), (size_t)(_n), __VA_ARGS__) #define nc_safe_vsnprintf(_s, _n, _f, _a) \ - _safe_vsnprintf((char *)(_s), (size_t)(_n), _f, _a) + _safe_vsnprintf((char *)(_s), (size_t)(_n), NULL, _f, _a) static inline uint8_t * _nc_strchr(uint8_t *p, uint8_t *last, uint8_t c) @@ -152,4 +152,12 @@ _nc_strrchr(uint8_t *p, uint8_t *start, uint8_t c) return NULL; } +rstatus_t string_printf(struct string *s, const char *fmt, ...); + +inline uint32_t string_len(const struct string *str) { + return str->len; +} +rstatus_t string_cat_len(struct string *dst, uint8_t *data, uint32_t len); +rstatus_t string_cat(struct string *dst, struct string *src); + #endif diff --git a/src/proto/nc_redis.c b/src/proto/nc_redis.c index a5ea210f..3a5b72f9 100644 --- a/src/proto/nc_redis.c +++ b/src/proto/nc_redis.c @@ -47,6 +47,7 @@ redis_argz(const struct msg *r) case MSG_REQ_REDIS_PING: case MSG_REQ_REDIS_QUIT: case MSG_REQ_REDIS_COMMAND: + case MSG_REQ_REDIS_MONITOR: return true; default: @@ -1105,6 +1106,13 @@ redis_parse_req(struct msg *r) break; } + if (str7icmp(m, 'm', 'o', 'n', 'i', 't', 'o', 'r')) { + r->type = MSG_REQ_REDIS_MONITOR; + r->noforward = 1; + r->monitor = 1; + break; + } + break; case 8: From 23c63848164c3b6d6c3de195361da5519c8115c0 Mon Sep 17 00:00:00 2001 From: kukey Date: Wed, 22 Sep 2021 21:38:44 +0800 Subject: [PATCH 2/8] fix core_start not call monitor_init() funcation due to delete rbnode segment fault --- src/nc_core.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/nc_core.c b/src/nc_core.c index 430bba7c..d0c4f654 100644 --- a/src/nc_core.c +++ b/src/nc_core.c @@ -21,6 +21,7 @@ #include #include #include +#include static uint32_t ctx_id; /* context generation */ @@ -163,6 +164,7 @@ core_start(struct instance *nci) mbuf_init(nci); msg_init(); conn_init(); + monitor_init(); ctx = core_ctx_create(nci); if (ctx != NULL) { @@ -170,6 +172,7 @@ core_start(struct instance *nci) return ctx; } + monitor_deinit(ctx); conn_deinit(); msg_deinit(); mbuf_deinit(); @@ -180,6 +183,7 @@ core_start(struct instance *nci) void core_stop(struct context *ctx) { + monitor_deinit(ctx); conn_deinit(); msg_deinit(); mbuf_deinit(); From ec8b9379524d3674be744577002d52727612b7f0 Mon Sep 17 00:00:00 2001 From: huangwei03 Date: Thu, 23 Sep 2021 20:23:12 +0800 Subject: [PATCH 3/8] event_add_out fail dequeue_outq fake req info --- src/nc_monitor.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/nc_monitor.c b/src/nc_monitor.c index b649fba3..03941e89 100644 --- a/src/nc_monitor.c +++ b/src/nc_monitor.c @@ -115,6 +115,7 @@ static void monitor_callback(struct rbnode *node, void *data) req_c->enqueue_outq(mdata->ctx, req_c, req); if (event_add_out(mdata->ctx->evb, req_c) != NC_OK) { req_c->err = errno; + req_c->dequeue_outq(mdata->ctx, req_c, req); msg_put(req); msg_put(rsp); } From 79e5d87582636fae2a5011011648dd30c93a0b6a Mon Sep 17 00:00:00 2001 From: kukey Date: Sun, 26 Sep 2021 15:41:50 +0800 Subject: [PATCH 4/8] Apply suggestions from code review Co-authored-by: Tyson Andre --- src/nc_monitor.c | 2 +- src/nc_monitor.h | 2 +- src/nc_string.c | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/nc_monitor.c b/src/nc_monitor.c index 03941e89..106a41af 100644 --- a/src/nc_monitor.c +++ b/src/nc_monitor.c @@ -32,7 +32,7 @@ void monitor_deinit(struct context *ctx) { struct rbnode *node = NULL; - while ((node = rbtree_min(&monitor_tree) != NULL)) + while ((node = rbtree_min(&monitor_tree)) != NULL) { struct conn *c = node->data; if (event_del_conn(ctx->evb, c) != NC_OK) { diff --git a/src/nc_monitor.h b/src/nc_monitor.h index c5226ff4..cf21b777 100644 --- a/src/nc_monitor.h +++ b/src/nc_monitor.h @@ -24,7 +24,7 @@ void monitor_init(); void monitor_deinit(struct context *ctx); -int mointor_is_empty(); +int monitor_is_empty(); rstatus_t add_to_monitor(struct conn *c); void del_from_monitor(struct conn *c); diff --git a/src/nc_string.c b/src/nc_string.c index 704bf6e8..ead1c0ad 100644 --- a/src/nc_string.c +++ b/src/nc_string.c @@ -176,8 +176,8 @@ _safe_itoa(int base, int64_t val, char *buf) } static const char * -_safe_check_placeholder(const char *fmt, int32_t *have_placeholder) { - *have_placeholder = false; +_safe_check_placeholder_len(const char *fmt, int32_t *placeholder_len) { + *placeholder_len = 0; int32_t pos = 0; if (*fmt == '0') { fmt++; From 4170eb94ed62904ad2851fdac19d0613e0409515 Mon Sep 17 00:00:00 2001 From: huangwei03 Date: Fri, 1 Oct 2021 19:15:11 +0800 Subject: [PATCH 5/8] use struct array replace struct rbtree to save conn point --- src/nc_array.c | 19 ++++++++ src/nc_array.h | 1 + src/nc_conf.c | 14 ++++++ src/nc_conf.h | 2 + src/nc_core.c | 3 -- src/nc_monitor.c | 116 ++++++++++++++++++++++++++--------------------- src/nc_monitor.h | 14 ++++-- src/nc_rbtree.c | 41 ----------------- src/nc_rbtree.h | 3 -- src/nc_request.c | 13 ++++-- src/nc_server.c | 2 + src/nc_server.h | 2 + src/nc_string.c | 16 +++---- 13 files changed, 130 insertions(+), 116 deletions(-) diff --git a/src/nc_array.c b/src/nc_array.c index efc8c08c..3753d1c8 100644 --- a/src/nc_array.c +++ b/src/nc_array.c @@ -202,3 +202,22 @@ array_each(const struct array *a, array_each_t func, void *data) return NC_OK; } + +rstatus_t +array_del(struct array *a, uint32_t idx) +{ + uint8_t *pos = NULL; + uint64_t len = 0; + + if (a->nelem == 0 || idx >= a->nelem) { + return NC_ERROR; + } + + pos = (uint8_t*)a->elem + (a->size * idx); + len = (a->nelem - idx - 1) * a->size; + + memmove(pos, pos + a->size, (size_t)len); + a->nelem--; + + return NC_OK; +} diff --git a/src/nc_array.h b/src/nc_array.h index 61669457..7f63dc57 100644 --- a/src/nc_array.h +++ b/src/nc_array.h @@ -69,5 +69,6 @@ void *array_top(const struct array *a); void array_swap(struct array *a, struct array *b); void array_sort(struct array *a, array_compare_t compare); rstatus_t array_each(const struct array *a, array_each_t func, void *data); +rstatus_t array_del(struct array *a, uint32_t idx); #endif diff --git a/src/nc_conf.c b/src/nc_conf.c index c4e48ddb..9f1ab3c4 100644 --- a/src/nc_conf.c +++ b/src/nc_conf.c @@ -19,6 +19,7 @@ #include #include #include +#include #define DEFINE_ACTION(_hash, _name) string(#_name), static const struct string hash_strings[] = { @@ -110,6 +111,10 @@ static const struct command conf_commands[] = { conf_add_server, offsetof(struct conf_pool, server) }, + { string("enable_monitor"), + conf_set_bool, + offsetof(struct conf_pool, enable_monitor) }, + null_command }; @@ -225,6 +230,8 @@ conf_pool_init(struct conf_pool *cp, const struct string *name) return status; } + cp->enable_monitor = CONF_UNSET_NUM; + log_debug(LOG_VVERB, "init conf pool %p, '%.*s'", cp, name->len, name->data); return NC_OK; @@ -311,6 +318,9 @@ conf_pool_each_transform(void *elem, void *data) return status; } + sp->enable_monitor = cp->enable_monitor ? 1 : 0; + monitor_init(sp); + log_debug(LOG_VERB, "transform to pool %"PRIu32" '%.*s'", sp->idx, sp->name.len, sp->name.data); @@ -1282,6 +1292,10 @@ conf_validate_pool(struct conf *cf, struct conf_pool *cp) return status; } + if (cp->enable_monitor == CONF_UNSET_NUM) { + cp->enable_monitor = CONF_DEFAULT_ENABLE_MONITOR; + } + cp->valid = 1; return NC_OK; diff --git a/src/nc_conf.h b/src/nc_conf.h index 6f861949..881d9e2a 100644 --- a/src/nc_conf.h +++ b/src/nc_conf.h @@ -55,6 +55,7 @@ #define CONF_DEFAULT_SERVER_CONNECTIONS 1 #define CONF_DEFAULT_KETAMA_PORT 11211 #define CONF_DEFAULT_TCPKEEPALIVE false +#define CONF_DEFAULT_ENABLE_MONITOR false struct conf_listen { struct string pname; /* listen: as "hostname:port" */ @@ -94,6 +95,7 @@ struct conf_pool { int server_retry_timeout; /* server_retry_timeout: in msec */ int server_failure_limit; /* server_failure_limit: */ struct array server; /* servers: conf_server[] */ + int enable_monitor; /* enable_monitor: */ unsigned valid:1; /* valid? */ }; diff --git a/src/nc_core.c b/src/nc_core.c index d0c4f654..b921c260 100644 --- a/src/nc_core.c +++ b/src/nc_core.c @@ -164,7 +164,6 @@ core_start(struct instance *nci) mbuf_init(nci); msg_init(); conn_init(); - monitor_init(); ctx = core_ctx_create(nci); if (ctx != NULL) { @@ -172,7 +171,6 @@ core_start(struct instance *nci) return ctx; } - monitor_deinit(ctx); conn_deinit(); msg_deinit(); mbuf_deinit(); @@ -183,7 +181,6 @@ core_start(struct instance *nci) void core_stop(struct context *ctx) { - monitor_deinit(ctx); conn_deinit(); msg_deinit(); mbuf_deinit(); diff --git a/src/nc_monitor.c b/src/nc_monitor.c index 106a41af..2e6d0418 100644 --- a/src/nc_monitor.c +++ b/src/nc_monitor.c @@ -18,64 +18,67 @@ */ #include -#include +#include -struct rbtree monitor_tree; -struct rbnode monitor_sentinel_node; - -void monitor_init() +void +monitor_init(struct server_pool *sp) { - rbtree_init(&monitor_tree, &monitor_sentinel_node); + if (sp->enable_monitor) { + array_init(&sp->monitor_conns, CONF_DEFAULT_ARRAY_MONITOR_NUM, sizeof(struct conn *)); + } } -void monitor_deinit(struct context *ctx) +void +monitor_deinit(struct server_pool *sp) { - struct rbnode *node = NULL; - - while ((node = rbtree_min(&monitor_tree)) != NULL) - { - struct conn *c = node->data; - if (event_del_conn(ctx->evb, c) != NC_OK) { - log_warn("event del conn c %d failed, ignored: %s", - c->sd, strerror(errno)); - } - c->close(ctx, c); + struct array *monitor_conns = &sp->monitor_conns; - rbtree_delete(&monitor_tree, node); - nc_free(node); + ASSERT(monitor_conns != NULL); + + if (sp->enable_monitor) { + while (array_n(monitor_conns) > 0) { + array_pop(monitor_conns); + } + array_deinit(monitor_conns); } } -int mointor_is_empty() +rstatus_t +add_to_monitor(struct conn *c) { - return rbtree_is_empty(&monitor_tree); -} + struct server_pool *sp = c->owner; -rstatus_t add_to_monitor(struct conn *c) -{ - ASSERT(c->client); + ASSERT(c->client && sp != NULL); - struct rbnode *node = nc_alloc(sizeof(struct rbnode)); - if (node == NULL) - { + struct conn **monitor = array_push(&sp->monitor_conns); + if (monitor == NULL) { return NC_ENOMEM; } - c->monitor_client = 1; - node->key = c->sd; - node->data = c; + *monitor = c; - rbtree_insert(&monitor_tree, node); + return NC_OK; } -void del_from_monitor(struct conn *c) +void +del_from_monitor(struct conn *c) { - ASSERT(c->client && c->monitor_client); + uint32_t i; + struct conn **tmp_conn = NULL; + struct server_pool *sp = c->owner; + struct array *a = NULL; - struct rbnode *node = rbtree_find(&monitor_tree, c->sd); - ASSERT(node != NULL); - rbtree_delete(&monitor_tree, node); - nc_free(node); + ASSERT(c->client && c->monitor_client); + ASSERT(sp != NULL); + + a = &sp->monitor_conns; + for (i = 0; i < array_n(a); i++) { + tmp_conn = array_get(a, i); + if (*tmp_conn == c) { + array_del(a, i); + break; + } + } } struct monitor_data @@ -86,19 +89,21 @@ struct monitor_data struct string *d; }; -static void monitor_callback(struct rbnode *node, void *data) +static int +monitor_callback(void *conn, void *data) { struct monitor_data *mdata = data; - struct conn *req_c = node->data; + struct conn **monitor_conn = conn; + struct conn *req_c = *monitor_conn; struct msg *req = req_get(req_c); if (req == NULL) { - return; + return NC_ENOMEM; } struct msg *rsp = msg_get(req_c, 0, mdata->c->redis); if (rsp == NULL) { msg_put(req); - return; + return NC_ENOMEM; } req->peer = rsp; @@ -110,7 +115,7 @@ static void monitor_callback(struct rbnode *node, void *data) if (msg_append_full(rsp, mdata->d->data, mdata->d->len) != NC_OK) { msg_put(req); msg_put(rsp); - return; + return NC_ENOMEM; } req_c->enqueue_outq(mdata->ctx, req_c, req); if (event_add_out(mdata->ctx->evb, req_c) != NC_OK) { @@ -118,15 +123,17 @@ static void monitor_callback(struct rbnode *node, void *data) req_c->dequeue_outq(mdata->ctx, req_c, req); msg_put(req); msg_put(rsp); + return NC_ERROR; } - return; + return NC_OK; } -rstatus_t make_monitor(struct context *ctx, struct conn *c, struct msg *m) +rstatus_t rsp_send_monitor_msg(struct context *ctx, struct conn *c, struct msg *m) { ASSERT(c->client); + struct server_pool *sp = c->owner; struct string monitor_message = null_string; struct monitor_data mdata = {0}; mdata.m = m; @@ -135,14 +142,19 @@ rstatus_t make_monitor(struct context *ctx, struct conn *c, struct msg *m) mdata.ctx = ctx; struct keypos *kpos = array_get(m->keys, 0); - string_printf(&monitor_message, "+%ld.%06ld [%s] command=%s key0=", - m->start_ts/1000000, m->start_ts%1000000, - nc_unresolve_peer_desc(c->sd), - (msg_type_string(m->type))->data); - string_cat_len(&monitor_message, kpos->start, kpos->end - kpos->start); - string_cat_len(&monitor_message, "\r\n", 2); + if (c->redis) { + string_printf(&monitor_message, "+%ld.%06ld [%s] command=%s key0=", + m->start_ts/1000000, m->start_ts%1000000, + nc_unresolve_peer_desc(c->sd), + (msg_type_string(m->type))->data); + string_cat_len(&monitor_message, kpos->start, (uint32_t)(kpos->end - kpos->start)); + string_cat_len(&monitor_message, (uint8_t*)"\r\n", 2); + } else { + /* FIX ME: add memcached protocol monitor msg */ + } + - rbtree_inorder_traversal(monitor_tree.root, monitor_tree.sentinel, monitor_callback, &mdata); + array_each(&sp->monitor_conns, monitor_callback, &mdata); string_deinit(&monitor_message); return NC_OK; diff --git a/src/nc_monitor.h b/src/nc_monitor.h index cf21b777..3793b684 100644 --- a/src/nc_monitor.h +++ b/src/nc_monitor.h @@ -20,14 +20,20 @@ #ifndef NC_MONITOR_H #define NC_MONITOR_H +#include #include -void monitor_init(); -void monitor_deinit(struct context *ctx); -int monitor_is_empty(); +#define CONF_DEFAULT_ARRAY_MONITOR_NUM 2 + +void monitor_init(struct server_pool *sp); +void monitor_deinit(struct server_pool *sp); + +inline bool monitor_is_empty(struct server_pool *sp) { + return array_n(&sp->monitor_conns) == 0; +} rstatus_t add_to_monitor(struct conn *c); void del_from_monitor(struct conn *c); -rstatus_t make_monitor(struct context *ctx, struct conn *c, struct msg *m); +rstatus_t rsp_send_monitor_msg(struct context *ctx, struct conn *c, struct msg *m); #endif \ No newline at end of file diff --git a/src/nc_rbtree.c b/src/nc_rbtree.c index 2d684ffd..d76ad861 100644 --- a/src/nc_rbtree.c +++ b/src/nc_rbtree.c @@ -342,44 +342,3 @@ rbtree_delete(struct rbtree *tree, struct rbnode *node) rbtree_black(temp); } - -struct rbnode *rbtree_find(struct rbtree *tree, int64_t key) { - struct rbnode **root = &tree->root; - struct rbnode *sentinel = tree->sentinel; - struct rbnode *temp, **p; - - /* empty tree */ - - if (*root == sentinel) { - return NULL; - } - - /* a binary tree find */ - temp = *root; - for (;;) { - - if (temp->key == key) { - break; - } - - p = (key < temp->key) ? &temp->left : &temp->right; - if (*p == sentinel) { - return NULL; - } - temp = *p; - } - - return temp; -} - -void rbtree_inorder_traversal(struct rbnode *root, struct rbnode *sentinel, - void (*func)(struct rbnode *, void *), void *data) { - - if (root == NULL || root == sentinel) { - return; - } - - func(root, data); - rbtree_inorder_traversal(root->left, sentinel, func, data); - rbtree_inorder_traversal(root->right, sentinel, func, data); -} \ No newline at end of file diff --git a/src/nc_rbtree.h b/src/nc_rbtree.h index fc0a7568..3d7bafe0 100644 --- a/src/nc_rbtree.h +++ b/src/nc_rbtree.h @@ -44,8 +44,5 @@ void rbtree_init(struct rbtree *tree, struct rbnode *node); struct rbnode *rbtree_min(const struct rbtree *tree); void rbtree_insert(struct rbtree *tree, struct rbnode *node); void rbtree_delete(struct rbtree *tree, struct rbnode *node); -struct rbnode *rbtree_find(struct rbtree *tree, int64_t key); -void rbtree_inorder_traversal(struct rbnode *root, struct rbnode *sentinel, - void (*func)(struct rbnode *, void *), void *data); #endif diff --git a/src/nc_request.c b/src/nc_request.c index 70385730..9b0c5ac0 100644 --- a/src/nc_request.c +++ b/src/nc_request.c @@ -475,8 +475,13 @@ req_make_reply(struct context *ctx, struct conn *conn, struct msg *req) static bool req_filter(struct conn *conn, struct msg *msg) { + struct server_pool *sp; + ASSERT(conn->client && !conn->proxy); + sp = conn->owner; + ASSERT(sp != NULL); + if (msg_empty(msg)) { ASSERT(conn->rmsg == NULL); log_debug(LOG_VERB, "filter empty req %"PRIu64" from c %d", msg->id, @@ -507,7 +512,7 @@ req_filter(struct conn *conn, struct msg *msg) /* * Handle monitor command. */ - if (msg->monitor) { + if (sp->enable_monitor && msg->monitor) { add_to_monitor(conn); req_put(msg); return true; @@ -677,13 +682,13 @@ req_recv_done(struct context *ctx, struct conn *conn, struct msg *msg, return; } + pool = conn->owner; /* if have monitor client, make monitor info. */ - if (!mointor_is_empty()) { - make_monitor(ctx, conn, msg); + if (pool->enable_monitor && !monitor_is_empty(pool)) { + rsp_send_monitor_msg(ctx, conn, msg); } /* do fragment */ - pool = conn->owner; TAILQ_INIT(&frag_msgq); status = msg->fragment(msg, array_n(&pool->server), &frag_msgq); if (status != NC_OK) { diff --git a/src/nc_server.c b/src/nc_server.c index dab6a79b..1556cd2e 100644 --- a/src/nc_server.c +++ b/src/nc_server.c @@ -21,6 +21,7 @@ #include #include #include +#include static void server_resolve(struct server *server, struct conn *conn) @@ -922,6 +923,7 @@ server_pool_deinit(struct array *server_pool) } server_deinit(&sp->server); + monitor_deinit(sp); log_debug(LOG_DEBUG, "deinit pool %"PRIu32" '%.*s'", sp->idx, sp->name.len, sp->name.data); diff --git a/src/nc_server.h b/src/nc_server.h index b9798db0..91ed7778 100644 --- a/src/nc_server.h +++ b/src/nc_server.h @@ -116,11 +116,13 @@ struct server_pool { int64_t server_retry_timeout; /* server retry timeout in usec */ uint32_t server_failure_limit; /* server failure limit */ struct string redis_auth; /* redis_auth password (matches requirepass on redis) */ + struct array monitor_conns; /* monitor connections */ unsigned require_auth; /* require_auth? */ unsigned auto_eject_hosts:1; /* auto_eject_hosts? */ unsigned preconnect:1; /* preconnect? */ unsigned redis:1; /* redis? */ unsigned tcpkeepalive:1; /* tcpkeepalive? */ + unsigned enable_monitor:1; /* enable_monitor? */ }; void server_ref(struct conn *conn, void *owner); diff --git a/src/nc_string.c b/src/nc_string.c index ead1c0ad..518f4e9a 100644 --- a/src/nc_string.c +++ b/src/nc_string.c @@ -176,16 +176,14 @@ _safe_itoa(int base, int64_t val, char *buf) } static const char * -_safe_check_placeholder_len(const char *fmt, int32_t *placeholder_len) { +_safe_check_placeholder(const char *fmt, int32_t *placeholder_len) { *placeholder_len = 0; - int32_t pos = 0; if (*fmt == '0') { fmt++; while (isdigit(*fmt)) { - *have_placeholder = *have_placeholder * pos + (*fmt - '0'); + *placeholder_len = *placeholder_len * 10 + (*fmt - '0'); fmt++; - pos++; } } @@ -217,7 +215,7 @@ _safe_vsnprintf(char *to, size_t size, int *parse_done, const char *format, va_l for (; *format; ++format) { int32_t have_longlong = false; - int32_t have_placeholder = false; + int32_t placeholder_len = false; int32_t placeholder_num = 0; if (*format != '%') { if (to == end) { /* end of buffer */ @@ -229,7 +227,7 @@ _safe_vsnprintf(char *to, size_t size, int *parse_done, const char *format, va_l } ++format; /* skip '%' */ - format = _safe_check_placeholder(format, &have_placeholder); + format = _safe_check_placeholder(format, &placeholder_len); format = _safe_check_longlong(format, &have_longlong); switch (*format) { @@ -272,9 +270,9 @@ _safe_vsnprintf(char *to, size_t size, int *parse_done, const char *format, va_l val_as_str += 8; } - if (have_placeholder) { - placeholder_num = nc_strlen(val_as_str); - while (have_placeholder > placeholder_num && to < end) { + if (placeholder_len) { + placeholder_num = (int32_t)(val_as_str - buff); + while (placeholder_len > placeholder_num && to < end) { *to++ = '0'; placeholder_num++; } From 97e3b14492fb44150a2dacc4accad2df73e5baa7 Mon Sep 17 00:00:00 2001 From: huangwei03 Date: Sat, 2 Oct 2021 16:10:38 +0800 Subject: [PATCH 6/8] string_printf() support '%.*s' modifier --- src/nc_core.h | 1 + src/nc_monitor.c | 22 +++++++++++++++------- src/nc_server.c | 1 - src/nc_string.c | 49 ++++++++++++++++++++++++++++++++++++++++-------- 4 files changed, 57 insertions(+), 16 deletions(-) diff --git a/src/nc_core.h b/src/nc_core.h index 3ac6dce6..0ea92d5b 100644 --- a/src/nc_core.h +++ b/src/nc_core.h @@ -117,6 +117,7 @@ struct event_base; #include #include #include +#include struct context { uint32_t id; /* unique context id */ diff --git a/src/nc_monitor.c b/src/nc_monitor.c index 2e6d0418..e2ba7be9 100644 --- a/src/nc_monitor.c +++ b/src/nc_monitor.c @@ -140,15 +140,23 @@ rstatus_t rsp_send_monitor_msg(struct context *ctx, struct conn *c, struct msg * mdata.c = c; mdata.d = &monitor_message; mdata.ctx = ctx; - struct keypos *kpos = array_get(m->keys, 0); + struct keypos kpos = {0}; + struct keypos *tmp_kpos = NULL; if (c->redis) { - string_printf(&monitor_message, "+%ld.%06ld [%s] command=%s key0=", - m->start_ts/1000000, m->start_ts%1000000, - nc_unresolve_peer_desc(c->sd), - (msg_type_string(m->type))->data); - string_cat_len(&monitor_message, kpos->start, (uint32_t)(kpos->end - kpos->start)); - string_cat_len(&monitor_message, (uint8_t*)"\r\n", 2); + /* Only Command command has a fake key. */ + if (m->type != MSG_REQ_REDIS_COMMAND) { + tmp_kpos = array_get(m->keys, 0); + + kpos.start = tmp_kpos->start; + kpos.end = tmp_kpos->end; + } + string_printf(&monitor_message, "+%ld.%06ld [%s] command=%s key0=%.*s\r\n", + m->start_ts/1000000, m->start_ts%1000000, + nc_unresolve_peer_desc(c->sd), + (msg_type_string(m->type))->data, + kpos.end - kpos.start, kpos.start); + } else { /* FIX ME: add memcached protocol monitor msg */ } diff --git a/src/nc_server.c b/src/nc_server.c index 1556cd2e..f4bd4d43 100644 --- a/src/nc_server.c +++ b/src/nc_server.c @@ -21,7 +21,6 @@ #include #include #include -#include static void server_resolve(struct server *server, struct conn *conn) diff --git a/src/nc_string.c b/src/nc_string.c index 518f4e9a..ee425ec0 100644 --- a/src/nc_string.c +++ b/src/nc_string.c @@ -176,8 +176,10 @@ _safe_itoa(int base, int64_t val, char *buf) } static const char * -_safe_check_placeholder(const char *fmt, int32_t *placeholder_len) { +_safe_check_placeholder(const char *fmt, int32_t *placeholder_len, bool *is_variable) { *placeholder_len = 0; + *is_variable = false; + if (*fmt == '0') { fmt++; @@ -185,6 +187,17 @@ _safe_check_placeholder(const char *fmt, int32_t *placeholder_len) { *placeholder_len = *placeholder_len * 10 + (*fmt - '0'); fmt++; } + } else if (*fmt == '.') { + fmt++; + if (*fmt == '*') { + *is_variable = true; + fmt++; + } else { + while (isdigit(*fmt)) { + *placeholder_len = *placeholder_len * 10 + (*fmt - '0'); + fmt++; + } + } } return fmt; @@ -212,6 +225,7 @@ _safe_vsnprintf(char *to, size_t size, int *parse_done, const char *format, va_l char *start = to; char *end = start + size - 1; if (parse_done) *parse_done = 1; + bool is_variable = false; for (; *format; ++format) { int32_t have_longlong = false; @@ -227,7 +241,7 @@ _safe_vsnprintf(char *to, size_t size, int *parse_done, const char *format, va_l } ++format; /* skip '%' */ - format = _safe_check_placeholder(format, &placeholder_len); + format = _safe_check_placeholder(format, &placeholder_len, &is_variable); format = _safe_check_longlong(format, &have_longlong); switch (*format) { @@ -271,7 +285,7 @@ _safe_vsnprintf(char *to, size_t size, int *parse_done, const char *format, va_l } if (placeholder_len) { - placeholder_num = (int32_t)(val_as_str - buff); + placeholder_num = (int32_t)(&buff[sizeof(buff) - 1] - val_as_str); while (placeholder_len > placeholder_num && to < end) { *to++ = '0'; placeholder_num++; @@ -286,11 +300,25 @@ _safe_vsnprintf(char *to, size_t size, int *parse_done, const char *format, va_l } case 's': { + if (is_variable) { + placeholder_len = (int32_t)va_arg(ap, int64_t); + } + const char *val = va_arg(ap, char *); if (!val) { val = "(null)"; + if (is_variable) { + /* placeholder_len = nc_strlen(val); */ + placeholder_len = 6; + } } while (*val && to < end) { + if (is_variable) { + if (placeholder_len == 0) { + break; + } + placeholder_len--; + } *to++ = *val++; } continue; @@ -315,14 +343,12 @@ _safe_snprintf(char *to, size_t n, const char *fmt, ...) rstatus_t string_printf(struct string *s, const char *fmt, ...) { + char static_buff[1024] = {0}; char *buf = NULL; - size_t buflen = nc_strlen(fmt)*2; + size_t buflen = sizeof(static_buff); int bufstrlen, parse_done; - buf = nc_alloc(buflen); - if (buf == NULL) { - return NC_ENOMEM; - } + buf = static_buff; va_list args, cpy; va_start(args, fmt); @@ -339,6 +365,13 @@ string_printf(struct string *s, const char *fmt, ...) return NC_ENOMEM; } continue; + } else { + buf = nc_zalloc(bufstrlen + 1); + if (buf == NULL) { + return NC_ENOMEM; + } + + memcpy(buf ,static_buff, bufstrlen); } break; From 5f715658291e0e50405950ee36148c78ba28adb4 Mon Sep 17 00:00:00 2001 From: kukey Date: Mon, 4 Oct 2021 08:39:45 +0800 Subject: [PATCH 7/8] fix compile warning --- src/nc_client.c | 1 - src/nc_core.c | 1 - src/nc_monitor.h | 6 ++++-- src/nc_request.c | 1 - src/nc_string.c | 2 +- src/nc_string.h | 4 ---- 6 files changed, 5 insertions(+), 10 deletions(-) diff --git a/src/nc_client.c b/src/nc_client.c index b2504d30..a1f24f98 100644 --- a/src/nc_client.c +++ b/src/nc_client.c @@ -18,7 +18,6 @@ #include #include #include -#include void client_ref(struct conn *conn, void *owner) diff --git a/src/nc_core.c b/src/nc_core.c index b921c260..430bba7c 100644 --- a/src/nc_core.c +++ b/src/nc_core.c @@ -21,7 +21,6 @@ #include #include #include -#include static uint32_t ctx_id; /* context generation */ diff --git a/src/nc_monitor.h b/src/nc_monitor.h index 3793b684..76e897c2 100644 --- a/src/nc_monitor.h +++ b/src/nc_monitor.h @@ -28,8 +28,10 @@ void monitor_init(struct server_pool *sp); void monitor_deinit(struct server_pool *sp); -inline bool monitor_is_empty(struct server_pool *sp) { - return array_n(&sp->monitor_conns) == 0; +static inline bool +monitor_is_empty(struct server_pool *sp) +{ + return sp->monitor_conns.nelem == 0; } rstatus_t add_to_monitor(struct conn *c); diff --git a/src/nc_request.c b/src/nc_request.c index 9b0c5ac0..34ccfbc1 100644 --- a/src/nc_request.c +++ b/src/nc_request.c @@ -17,7 +17,6 @@ #include #include -#include struct msg * req_get(struct conn *conn) diff --git a/src/nc_string.c b/src/nc_string.c index ee425ec0..92a99ab8 100644 --- a/src/nc_string.c +++ b/src/nc_string.c @@ -371,7 +371,7 @@ string_printf(struct string *s, const char *fmt, ...) return NC_ENOMEM; } - memcpy(buf ,static_buff, bufstrlen); + memcpy(buf ,static_buff, (size_t)bufstrlen); } break; diff --git a/src/nc_string.h b/src/nc_string.h index 36c9d966..a842aeb4 100644 --- a/src/nc_string.h +++ b/src/nc_string.h @@ -153,10 +153,6 @@ _nc_strrchr(uint8_t *p, uint8_t *start, uint8_t c) } rstatus_t string_printf(struct string *s, const char *fmt, ...); - -inline uint32_t string_len(const struct string *str) { - return str->len; -} rstatus_t string_cat_len(struct string *dst, uint8_t *data, uint32_t len); rstatus_t string_cat(struct string *dst, struct string *src); From 38a3028ae07cb64a532ab13d21e8703bdd72236d Mon Sep 17 00:00:00 2001 From: huangwei03 Date: Tue, 5 Oct 2021 18:05:26 +0800 Subject: [PATCH 8/8] implement memcached monitor protocol that is only for twemproxy --- src/nc_message.h | 1 + src/nc_monitor.c | 23 ++++++++++++++--------- src/proto/nc_memcache.c | 7 +++++++ 3 files changed, 22 insertions(+), 9 deletions(-) diff --git a/src/nc_message.h b/src/nc_message.h index d5dba6b2..b7276563 100644 --- a/src/nc_message.h +++ b/src/nc_message.h @@ -51,6 +51,7 @@ typedef enum msg_parse_result { ACTION( REQ_MC_TOUCH ) /* memcache touch request */ \ ACTION( REQ_MC_QUIT ) /* memcache quit request */ \ ACTION( REQ_MC_VERSION ) /* memcache version request */ \ + ACTION( REQ_MC_MONITOR ) /* memcache monitor request, only used for proxy */ \ ACTION( RSP_MC_NUM ) /* memcache arithmetic response */ \ ACTION( RSP_MC_STORED ) /* memcache cas and storage response */ \ ACTION( RSP_MC_NOT_STORED ) \ diff --git a/src/nc_monitor.c b/src/nc_monitor.c index e2ba7be9..ab28b2af 100644 --- a/src/nc_monitor.c +++ b/src/nc_monitor.c @@ -143,14 +143,15 @@ rstatus_t rsp_send_monitor_msg(struct context *ctx, struct conn *c, struct msg * struct keypos kpos = {0}; struct keypos *tmp_kpos = NULL; - if (c->redis) { - /* Only Command command has a fake key. */ - if (m->type != MSG_REQ_REDIS_COMMAND) { - tmp_kpos = array_get(m->keys, 0); + /* Only redis command command has a fake key. */ + if (m->type != MSG_REQ_REDIS_COMMAND) { + tmp_kpos = array_get(m->keys, 0); - kpos.start = tmp_kpos->start; - kpos.end = tmp_kpos->end; - } + kpos.start = tmp_kpos->start; + kpos.end = tmp_kpos->end; + } + + if (c->redis) { string_printf(&monitor_message, "+%ld.%06ld [%s] command=%s key0=%.*s\r\n", m->start_ts/1000000, m->start_ts%1000000, nc_unresolve_peer_desc(c->sd), @@ -158,10 +159,14 @@ rstatus_t rsp_send_monitor_msg(struct context *ctx, struct conn *c, struct msg * kpos.end - kpos.start, kpos.start); } else { - /* FIX ME: add memcached protocol monitor msg */ + /* This monitor protocol only for twemproxy. */ + string_printf(&monitor_message, "MONITOR\r\n%ld.%06ld [%s] command=%s key0=%.*s\r\nEND\r\n", + m->start_ts/1000000, m->start_ts%1000000, + nc_unresolve_peer_desc(c->sd), + (msg_type_string(m->type))->data, + kpos.end - kpos.start, kpos.start); } - array_each(&sp->monitor_conns, monitor_callback, &mdata); string_deinit(&monitor_message); diff --git a/src/proto/nc_memcache.c b/src/proto/nc_memcache.c index 79545976..4c5f32e7 100644 --- a/src/proto/nc_memcache.c +++ b/src/proto/nc_memcache.c @@ -328,6 +328,12 @@ memcache_parse_req(struct msg *r) break; } + if (str7cmp(m, 'm', 'o', 'n', 'i', 't', 'o', 'r')) { + r->type = MSG_REQ_MC_MONITOR; + r->monitor = 1; + break; + } + break; } @@ -352,6 +358,7 @@ memcache_parse_req(struct msg *r) case MSG_REQ_MC_VERSION: case MSG_REQ_MC_QUIT: + case MSG_REQ_MC_MONITOR: p = p - 1; /* go back by 1 byte */ state = SW_CRLF; break;