From 2e7d7cbabd32912342218078282fce92f6cc0ab6 Mon Sep 17 00:00:00 2001 From: Michael Grunder Date: Sun, 19 Jul 2020 18:54:42 -0700 Subject: [PATCH] Resp3 oob push support (#841) Proper support for RESP3 PUSH messages. By default, PUSH messages are now intercepted and the reply memory freed. This means existing code should work unchanged when connecting to Redis >= 6.0.0 even if `CLIENT TRACKING` were then enabled. Additionally, we define two callbacks users can configure if they wish to handle these messages in a custom way: void redisPushFn(void *privdata, void *reply); void redisAsyncPushFn(redisAsyncContext *ac, void *reply); See #825 --- .gitignore | 1 + .travis.yml | 3 + Makefile | 6 +- README.md | 78 +++++++++++++++-- async.c | 64 +++++++++++++- async.h | 4 + examples/CMakeLists.txt | 3 + examples/example-push.c | 118 +++++++++++++++++++++++++ hiredis.c | 46 +++++++++- hiredis.h | 23 ++++- test.c | 189 +++++++++++++++++++++++++++++++++++++--- 11 files changed, 505 insertions(+), 30 deletions(-) create mode 100644 examples/example-push.c diff --git a/.gitignore b/.gitignore index 8e50b54..056959f 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,4 @@ /*.a /*.pc *.dSYM +tags diff --git a/.travis.yml b/.travis.yml index 792d175..d0a551a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -24,6 +24,8 @@ before_script: addons: apt: + sources: + - sourceline: 'ppa:chris-lea/redis-server' packages: - libc6-dbg - libc6-dev @@ -35,6 +37,7 @@ addons: - libssl-dev - libssl-dev:i386 - valgrind + - redis env: - BITS="32" diff --git a/Makefile b/Makefile index f19e760..a8d37a2 100644 --- a/Makefile +++ b/Makefile @@ -5,7 +5,7 @@ OBJ=alloc.o net.o hiredis.o sds.o async.o read.o sockcompat.o SSL_OBJ=ssl.o -EXAMPLES=hiredis-example hiredis-example-libevent hiredis-example-libev hiredis-example-glib +EXAMPLES=hiredis-example hiredis-example-libevent hiredis-example-libev hiredis-example-glib hiredis-example-push ifeq ($(USE_SSL),1) EXAMPLES+=hiredis-example-ssl hiredis-example-libevent-ssl endif @@ -161,6 +161,7 @@ hiredis-example-macosx: examples/example-macosx.c adapters/macosx.h $(STLIBNAME) hiredis-example-ssl: examples/example-ssl.c $(STLIBNAME) $(SSL_STLIBNAME) $(CC) -o examples/$@ $(REAL_CFLAGS) -I. $< $(STLIBNAME) $(SSL_STLIBNAME) $(REAL_LDFLAGS) $(SSL_LDFLAGS) + ifndef AE_DIR hiredis-example-ae: @echo "Please specify AE_DIR (e.g. /src)" @@ -195,6 +196,9 @@ endif hiredis-example: examples/example.c $(STLIBNAME) $(CC) -o examples/$@ $(REAL_CFLAGS) -I. $< $(STLIBNAME) $(REAL_LDFLAGS) +hiredis-example-push: examples/example-push.c $(STLIBNAME) + $(CC) -o examples/$@ $(REAL_CFLAGS) -I. $< $(STLIBNAME) $(REAL_LDFLAGS) + examples: $(EXAMPLES) TEST_LIBS = $(STLIBNAME) diff --git a/README.md b/README.md index 3fc3418..c012f1a 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ [![Build Status](https://travis-ci.org/redis/hiredis.png)](https://travis-ci.org/redis/hiredis) -**This Readme reflects the latest changed in the master branch. See [v0.13.3](https://github.com/redis/hiredis/tree/v0.13.3) for the Readme and documentation for the latest release ([API/ABI history](https://abi-laboratory.pro/?view=timeline&l=hiredis)).** +**This Readme reflects the latest changed in the master branch. See [v0.14.1](https://github.com/redis/hiredis/tree/v0.14.1) for the Readme and documentation for the latest release ([API/ABI history](https://abi-laboratory.pro/?view=timeline&l=hiredis)).** # HIREDIS @@ -500,12 +500,75 @@ if (redisInitiateSSLWithContext(c, ssl) != REDIS_OK) { } ``` +## RESP3 PUSH replies +Redis 6.0 introduced PUSH replies with the reply-type `>`. These messages are generated spontaneously and can arrive at any time, so must be handled using callbacks. + +### Default behavior +Hiredis installs handlers on `redisContext` and `redisAsyncContext` by default, which will intercept and free any PUSH replies detected. This means existing code will work as-is after upgrading to Redis 6 and switching to `RESP3`. + +### Custom PUSH handler prototypes +The callback prototypes differ between `redisContext` and `redisAsyncContext`. + +#### redisContext +```c +void my_push_handler(void *privdata, void *reply) { + /* Handle the reply */ + + /* Note: We need to free the reply in our custom handler for + blocking contexts. This lets us keep the reply if + we want. */ + freeReplyObject(reply); +} +``` + +#### redisAsyncContext +```c +void my_async_push_handler(redisAsyncContext *ac, void *reply) { + /* Handle the reply */ + + /* Note: Because async hiredis always frees replies, you should + not call freeReplyObject in an async push callback. */ +} +``` + +### Installing a custom handler +There are two ways to set your own PUSH handlers. + +1. Set `push_cb` or `async_push_cb` in the `redisOptions` struct and connect with `redisConnectWithOptions` or `redisAsyncConnectWithOptions`. + ```c + redisOptions = {0}; + REDIS_OPTIONS_SET_TCP(&options, "127.0.0.1", 6379); + options->push_cb = my_push_handler; + redisContext *context = redisConnectWithOptions(&options); + ``` +2. Call `redisSetPushCallback` or `redisAsyncSetPushCallback` on a connected context. + ```c + redisContext *context = redisConnect("127.0.0.1", 6379); + redisSetPushCallback(context, my_push_handler); + ``` + + _Note `redisSetPushCallback` and `redisAsyncSetPushCallback` both return any currently configured handler, making it easy to override and then return to the old value._ + +### Specifying no handler +If you have a unique use-case where you don't want hiredis to automatically intercept and free PUSH replies, you will want to configure no handler at all. This can be done in two ways. +1. Set the `REDIS_OPT_NO_PUSH_AUTOFREE` flag in `redisOptions` and leave the callback function pointer `NULL`. + ```c + redisOptions = {0}; + REDIS_OPTIONS_SET_TCP(&options, "127.0.0.1", 6379); + options->options |= REDIS_OPT_NO_PUSH_AUTOFREE; + redisContext *context = redisConnectWithOptions(&options); + ``` +3. Call `redisSetPushCallback` with `NULL` once connected. + ```c + redisContext *context = redisConnect("127.0.0.1", 6379); + redisSetPushCallback(context, NULL); + ``` + + _Note: With no handler configured, calls to `redisCommand` may generate more than one reply, so this strategy is only applicable when there's some kind of blocking`redisGetReply()` loop (e.g. `MONITOR` or `SUBSCRIBE` workloads)._ + ## Allocator injection -Hiredis uses a pass-thru structure of function pointers defined in -[alloc.h](https://github.com/redis/hiredis/blob/f5d25850/alloc.h#L41) that conttain -the currently configured allocation and deallocation functions. By default they -just point to libc (`malloc`, `calloc`, `realloc`, etc). +Hiredis uses a pass-thru structure of function pointers defined in [alloc.h](https://github.com/redis/hiredis/blob/f5d25850/alloc.h#L41) that contain the currently configured allocation and deallocation functions. By default they just point to libc (`malloc`, `calloc`, `realloc`, etc). ### Overriding @@ -532,5 +595,6 @@ hiredisResetAllocators(); ## AUTHORS -Hiredis was written by Salvatore Sanfilippo (antirez at gmail) and -Pieter Noordhuis (pcnoordhuis at gmail) and is released under the BSD license. +Hiredis was written by Salvatore Sanfilippo (antirez at gmail), +Pieter Noordhuis (pcnoordhuis at gmail), and Michael Grunder +(michael dot grunder at gmail) and is released under the BSD license. diff --git a/async.c b/async.c index 2cff0b1..1073d8d 100644 --- a/async.c +++ b/async.c @@ -167,16 +167,26 @@ redisAsyncContext *redisAsyncConnectWithOptions(const redisOptions *options) { redisContext *c; redisAsyncContext *ac; + /* Clear any erroneously set sync callback and flag that we don't want to + * use freeReplyObject by default. */ + myOptions.push_cb = NULL; + myOptions.options |= REDIS_OPT_NO_PUSH_AUTOFREE; + myOptions.options |= REDIS_OPT_NONBLOCK; c = redisConnectWithOptions(&myOptions); if (c == NULL) { return NULL; } + ac = redisAsyncInitialize(c); if (ac == NULL) { redisFree(c); return NULL; } + + /* Set any configured async push handler */ + redisAsyncSetPushCallback(ac, myOptions.async_push_cb); + __redisAsyncCopyError(ac); return ac; } @@ -279,6 +289,14 @@ static void __redisRunCallback(redisAsyncContext *ac, redisCallback *cb, redisRe } } +static void __redisRunPushCallback(redisAsyncContext *ac, redisReply *reply) { + if (ac->push_cb != NULL) { + ac->c.flags |= REDIS_IN_CALLBACK; + ac->push_cb(ac, reply); + ac->c.flags &= ~REDIS_IN_CALLBACK; + } +} + /* Helper function to free the context. */ static void __redisAsyncFree(redisAsyncContext *ac) { redisContext *c = &(ac->c); @@ -294,7 +312,7 @@ static void __redisAsyncFree(redisAsyncContext *ac) { while (__redisShiftCallback(&ac->sub.invalid,&cb) == REDIS_OK) __redisRunCallback(ac,&cb,NULL); - /* Run subscription callbacks callbacks with NULL reply */ + /* Run subscription callbacks with NULL reply */ if (ac->sub.channels) { it = dictGetIterator(ac->sub.channels); if (it != NULL) { @@ -459,6 +477,30 @@ oom: return REDIS_ERR; } +#define redisIsSpontaneousPushReply(r) \ + (redisIsPushReply(r) && !redisIsSubscribeReply(r)) + +static int redisIsSubscribeReply(redisReply *reply) { + char *str; + size_t len, off; + + /* We will always have at least one string with the subscribe/message type */ + if (reply->elements < 1 || reply->element[0]->type != REDIS_REPLY_STRING || + reply->element[0]->len < sizeof("message") - 1) + { + return 0; + } + + /* Get the string/len moving past 'p' if needed */ + off = tolower(reply->element[0]->str[0]) == 'p'; + str = reply->element[0]->str + off; + len = reply->element[0]->len - off; + + return !strncasecmp(str, "subscribe", len) || + !strncasecmp(str, "message", len); + +} + void redisProcessCallbacks(redisAsyncContext *ac) { redisContext *c = &(ac->c); redisCallback cb = {NULL, NULL, 0, NULL}; @@ -485,8 +527,18 @@ void redisProcessCallbacks(redisAsyncContext *ac) { break; } - /* Even if the context is subscribed, pending regular callbacks will - * get a reply before pub/sub messages arrive. */ + /* Send any non-subscribe related PUSH messages to our PUSH handler + * while allowing subscribe related PUSH messages to pass through. + * This allows existing code to be backward compatible and work in + * either RESP2 or RESP3 mode. */ + if (redisIsSpontaneousPushReply(reply)) { + __redisRunPushCallback(ac, reply); + c->reader->fn->freeObject(reply); + continue; + } + + /* Even if the context is subscribed, pending regular + * callbacks will get a reply before pub/sub messages arrive. */ if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) { /* * A spontaneous reply in a not-subscribed context can be the error @@ -809,6 +861,12 @@ int redisAsyncFormattedCommand(redisAsyncContext *ac, redisCallbackFn *fn, void return status; } +redisAsyncPushFn *redisAsyncSetPushCallback(redisAsyncContext *ac, redisAsyncPushFn *fn) { + redisAsyncPushFn *old = ac->push_cb; + ac->push_cb = fn; + return old; +} + int redisAsyncSetTimeout(redisAsyncContext *ac, struct timeval tv) { if (!ac->c.timeout) { ac->c.timeout = hi_calloc(1, sizeof(tv)); diff --git a/async.h b/async.h index ba301da..b1d2cb2 100644 --- a/async.h +++ b/async.h @@ -106,6 +106,9 @@ typedef struct redisAsyncContext { struct dict *channels; struct dict *patterns; } sub; + + /* Any configured RESP3 PUSH handler */ + redisAsyncPushFn *push_cb; } redisAsyncContext; /* Functions that proxy to hiredis */ @@ -118,6 +121,7 @@ redisAsyncContext *redisAsyncConnectUnix(const char *path); int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn); int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn); +redisAsyncPushFn *redisAsyncSetPushCallback(redisAsyncContext *ac, redisAsyncPushFn *fn); int redisAsyncSetTimeout(redisAsyncContext *ac, struct timeval tv); void redisAsyncDisconnect(redisAsyncContext *ac); void redisAsyncFree(redisAsyncContext *ac); diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index dd3a313..1d5bc56 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -44,3 +44,6 @@ ENDIF() ADD_EXECUTABLE(example example.c) TARGET_LINK_LIBRARIES(example hiredis) + +ADD_EXECUTABLE(example-push example-push.c) +TARGET_LINK_LIBRARIES(example-push hiredis) diff --git a/examples/example-push.c b/examples/example-push.c new file mode 100644 index 0000000..d8b41f9 --- /dev/null +++ b/examples/example-push.c @@ -0,0 +1,118 @@ +#include +#include +#include +#include +#include + +#define KEY_COUNT 5 + +#define panicAbort(fmt, ...) \ + do { \ + fprintf(stderr, "%s:%d:%s(): " fmt, __FILE__, __LINE__, __func__, __VA_ARGS__); \ + exit(-1); \ + } while (0) + +static void assertReplyAndFree(redisContext *context, redisReply *reply, int type) { + if (reply == NULL) + panicAbort("NULL reply from server (error: %s)", context->errstr); + + if (reply->type != type) { + if (reply->type == REDIS_REPLY_ERROR) + fprintf(stderr, "Redis Error: %s\n", reply->str); + + panicAbort("Expected reply type %d but got type %d", type, reply->type); + } + + freeReplyObject(reply); +} + +/* Switch to the RESP3 protocol and enable client tracking */ +static void enableClientTracking(redisContext *c) { + redisReply *reply = redisCommand(c, "HELLO 3"); + if (reply == NULL || c->err) { + panicAbort("NULL reply or server error (error: %s)", c->errstr); + } + + if (reply->type != REDIS_REPLY_MAP) { + fprintf(stderr, "Error: Can't send HELLO 3 command. Are you sure you're "); + fprintf(stderr, "connected to redis-server >= 6.0.0?\nRedis error: %s\n", + reply->type == REDIS_REPLY_ERROR ? reply->str : "(unknown)"); + exit(-1); + } + + freeReplyObject(reply); + + /* Enable client tracking */ + reply = redisCommand(c, "CLIENT TRACKING ON"); + assertReplyAndFree(c, reply, REDIS_REPLY_STATUS); +} + +void pushReplyHandler(void *privdata, void *r) { + redisReply *reply = r; + int *invalidations = privdata; + + /* Sanity check on the invalidation reply */ + if (reply->type != REDIS_REPLY_PUSH || reply->elements != 2 || + reply->element[1]->type != REDIS_REPLY_ARRAY || + reply->element[1]->element[0]->type != REDIS_REPLY_STRING) + { + panicAbort("%s", "Can't parse PUSH message!"); + } + + /* Increment our invalidation count */ + *invalidations += 1; + + printf("pushReplyHandler(): INVALIDATE '%s' (invalidation count: %d)\n", + reply->element[1]->element[0]->str, *invalidations); + + freeReplyObject(reply); +} + +int main(int argc, char **argv) { + unsigned int j, invalidations = 0; + redisContext *c; + redisReply *reply; + + const char *hostname = (argc > 1) ? argv[1] : "127.0.0.1"; + int port = (argc > 2) ? atoi(argv[2]) : 6379; + + redisOptions o = {0}; + REDIS_OPTIONS_SET_TCP(&o, hostname, port); + + /* Set our custom PUSH message handler */ + o.push_cb = pushReplyHandler; + + c = redisConnectWithOptions(&o); + if (c == NULL || c->err) + panicAbort("Connection error: %s", c ? c->errstr : "OOM"); + + /* Enable RESP3 and turn on client tracking */ + enableClientTracking(c); + + /* Set our context privdata to the address of our invalidation counter. Each + * time our PUSH handler is called, hiredis will pass the privdata for context */ + c->privdata = &invalidations; + + /* Set some keys and then read them back. Once we do that, Redis will deliver + * invalidation push messages whenever the key is modified */ + for (j = 0; j < KEY_COUNT; j++) { + reply = redisCommand(c, "SET key:%d initial:%d", j, j); + assertReplyAndFree(c, reply, REDIS_REPLY_STATUS); + + reply = redisCommand(c, "GET key:%d", j); + assertReplyAndFree(c, reply, REDIS_REPLY_STRING); + } + + /* Trigger invalidation messages by updating keys we just read */ + for (j = 0; j < KEY_COUNT; j++) { + printf(" main(): SET key:%d update:%d\n", j, j); + reply = redisCommand(c, "SET key:%d update:%d", j, j); + assertReplyAndFree(c, reply, REDIS_REPLY_STATUS); + printf(" main(): SET REPLY OK\n"); + } + + printf("\nTotal detected invalidations: %d, expected: %d\n", invalidations, KEY_COUNT); + + /* PING server */ + redisFree(c); +} diff --git a/hiredis.c b/hiredis.c index 7eaa991..e9761ec 100644 --- a/hiredis.c +++ b/hiredis.c @@ -184,7 +184,8 @@ static void *createArrayObject(const redisReadTask *task, size_t elements) { parent = task->parent->obj; assert(parent->type == REDIS_REPLY_ARRAY || parent->type == REDIS_REPLY_MAP || - parent->type == REDIS_REPLY_SET); + parent->type == REDIS_REPLY_SET || + parent->type == REDIS_REPLY_PUSH); parent->element[task->idx] = r; } return r; @@ -679,6 +680,11 @@ redisReader *redisReaderCreate(void) { return redisReaderCreateWithFunctions(&defaultFunctions); } +static void redisPushAutoFree(void *privdata, void *reply) { + (void)privdata; + freeReplyObject(reply); +} + static redisContext *redisContextInit(const redisOptions *options) { redisContext *c; @@ -687,6 +693,14 @@ static redisContext *redisContextInit(const redisOptions *options) { return NULL; c->funcs = &redisContextDefaultFuncs; + + /* Set any user supplied RESP3 PUSH handler or use freeReplyObject + * as a default unless specifically flagged that we don't want one. */ + if (options->push_cb != NULL) + redisSetPushCallback(c, options->push_cb); + else if (!(options->options & REDIS_OPT_NO_PUSH_AUTOFREE)) + redisSetPushCallback(c, redisPushAutoFree); + c->obuf = sdsempty(); c->reader = redisReaderCreate(); c->fd = REDIS_INVALID_FD; @@ -773,7 +787,7 @@ redisContext *redisConnectWithOptions(const redisOptions *options) { c->flags |= REDIS_REUSEADDR; } if (options->options & REDIS_OPT_NOAUTOFREE) { - c->flags |= REDIS_NO_AUTO_FREE; + c->flags |= REDIS_NO_AUTO_FREE; } if (options->type == REDIS_CONN_TCP) { @@ -876,6 +890,13 @@ int redisEnableKeepAlive(redisContext *c) { return REDIS_OK; } +/* Set a user provided RESP3 PUSH handler and return any old one set. */ +redisPushFn *redisSetPushCallback(redisContext *c, redisPushFn *fn) { + redisPushFn *old = c->push_cb; + c->push_cb = fn; + return old; +} + /* Use this function to handle a read event on the descriptor. It will try * and read some bytes from the socket and feed them to the reply parser. * @@ -947,9 +968,21 @@ int redisGetReplyFromReader(redisContext *c, void **reply) { __redisSetError(c,c->reader->err,c->reader->errstr); return REDIS_ERR; } + return REDIS_OK; } +/* Internal helper that returns 1 if the reply was a RESP3 PUSH + * message and we handled it with a user-provided callback. */ +static int redisHandledPushReply(redisContext *c, void *reply) { + if (reply && c->push_cb && redisIsPushReply(reply)) { + c->push_cb(c->privdata, reply); + return 1; + } + + return 0; +} + int redisGetReply(redisContext *c, void **reply) { int wdone = 0; void *aux = NULL; @@ -970,8 +1003,13 @@ int redisGetReply(redisContext *c, void **reply) { do { if (redisBufferRead(c) == REDIS_ERR) return REDIS_ERR; - if (redisGetReplyFromReader(c,&aux) == REDIS_ERR) - return REDIS_ERR; + + /* We loop here in case the user has specified a RESP3 + * PUSH handler (e.g. for client tracking). */ + do { + if (redisGetReplyFromReader(c,&aux) == REDIS_ERR) + return REDIS_ERR; + } while (redisHandledPushReply(c, aux)); } while (aux == NULL); } diff --git a/hiredis.h b/hiredis.h index bf10509..20e9bfa 100644 --- a/hiredis.h +++ b/hiredis.h @@ -92,6 +92,15 @@ typedef long long ssize_t; * SO_REUSEADDR is being used. */ #define REDIS_CONNECT_RETRIES 10 +/* Forward declarations for structs defined elsewhere */ +struct redisAsyncContext; +struct redisContext; + +/* RESP3 push helpers and callback prototypes */ +#define redisIsPushReply(r) (((redisReply*)(r))->type == REDIS_REPLY_PUSH) +typedef void (redisPushFn)(void *, void *); +typedef void (redisAsyncPushFn)(struct redisAsyncContext *, void *); + #ifdef __cplusplus extern "C" { #endif @@ -140,6 +149,9 @@ struct redisSsl; */ #define REDIS_OPT_NOAUTOFREE 0x04 +/* Don't automatically intercept and free RESP3 PUSH replies. */ +#define REDIS_OPT_NO_PUSH_AUTOFREE 0x08 + /* In Unix systems a file descriptor is a regular signed int, with -1 * representing an invalid descriptor. In Windows it is a SOCKET * (32- or 64-bit unsigned integer depending on the architecture), where @@ -180,6 +192,10 @@ typedef struct { * file descriptor */ redisFD fd; } endpoint; + + /* A user defined PUSH message callback */ + redisPushFn *push_cb; + redisAsyncPushFn *async_push_cb; } redisOptions; /** @@ -194,9 +210,6 @@ typedef struct { (opts)->type = REDIS_CONN_UNIX; \ (opts)->endpoint.unix_socket = path; -struct redisAsyncContext; -struct redisContext; - typedef struct redisContextFuncs { void (*free_privdata)(void *); void (*async_read)(struct redisAsyncContext *); @@ -235,6 +248,9 @@ typedef struct redisContext { /* Additional private data for hiredis addons such as SSL */ void *privdata; + + /* An optional RESP3 PUSH handler */ + redisPushFn *push_cb; } redisContext; redisContext *redisConnectWithOptions(const redisOptions *options); @@ -261,6 +277,7 @@ redisContext *redisConnectFd(redisFD fd); */ int redisReconnect(redisContext *c); +redisPushFn *redisSetPushCallback(redisContext *c, redisPushFn *fn); int redisSetTimeout(redisContext *c, const struct timeval tv); int redisEnableKeepAlive(redisContext *c); void redisFree(redisContext *c); diff --git a/test.c b/test.c index 2aef0d5..2216ea3 100644 --- a/test.c +++ b/test.c @@ -13,6 +13,7 @@ #include #include "hiredis.h" +#include "async.h" #ifdef HIREDIS_TEST_SSL #include "hiredis_ssl.h" #endif @@ -77,6 +78,43 @@ static long long usec(void) { #define assert(e) (void)(e) #endif +/* Helper to extract Redis version information. Aborts on any failure. */ +#define REDIS_VERSION_FIELD "redis_version:" +void get_redis_version(redisContext *c, int *majorptr, int *minorptr) { + redisReply *reply; + char *eptr, *s, *e; + int major, minor; + + reply = redisCommand(c, "INFO"); + if (reply == NULL || c->err || reply->type != REDIS_REPLY_STRING) + goto abort; + if ((s = strstr(reply->str, REDIS_VERSION_FIELD)) == NULL) + goto abort; + + s += strlen(REDIS_VERSION_FIELD); + + /* We need a field terminator and at least 'x.y.z' (5) bytes of data */ + if ((e = strstr(s, "\r\n")) == NULL || (e - s) < 5) + goto abort; + + /* Extract version info */ + major = strtol(s, &eptr, 10); + if (*eptr != '.') goto abort; + minor = strtol(eptr+1, NULL, 10); + + /* Push info the caller wants */ + if (majorptr) *majorptr = major; + if (minorptr) *minorptr = minor; + + freeReplyObject(reply); + return; + +abort: + freeReplyObject(reply); + fprintf(stderr, "Error: Cannot determine Redis version, aborting\n"); + exit(1); +} + static redisContext *select_database(redisContext *c) { redisReply *reply; @@ -99,6 +137,26 @@ static redisContext *select_database(redisContext *c) { return c; } +/* Switch protocol */ +static void send_hello(redisContext *c, int version) { + redisReply *reply; + int expected; + + reply = redisCommand(c, "HELLO %d", version); + expected = version == 3 ? REDIS_REPLY_MAP : REDIS_REPLY_ARRAY; + assert(reply != NULL && reply->type == expected); + freeReplyObject(reply); +} + +/* Togggle client tracking */ +static void send_client_tracking(redisContext *c, const char *str) { + redisReply *reply; + + reply = redisCommand(c, "CLIENT TRACKING %s", str); + assert(reply != NULL && reply->type == REDIS_REPLY_STATUS); + freeReplyObject(reply); +} + static int disconnect(redisContext *c, int keep_fd) { redisReply *reply; @@ -615,9 +673,123 @@ static void test_blocking_connection_errors(void) { #endif } +/* Dummy push handler */ +void push_handler(void *privdata, void *reply) { + int *counter = privdata; + freeReplyObject(reply); + *counter += 1; +} + +/* Dummy function just to test setting a callback with redisOptions */ +void push_handler_async(redisAsyncContext *ac, void *reply) { + (void)ac; + (void)reply; +} + +static void test_resp3_push_handler(redisContext *c) { + redisPushFn *old = NULL; + redisReply *reply; + void *privdata; + int n = 0; + + /* Switch to RESP3 and turn on client tracking */ + send_hello(c, 3); + send_client_tracking(c, "ON"); + privdata = c->privdata; + c->privdata = &n; + + reply = redisCommand(c, "GET key:0"); + assert(reply != NULL); + freeReplyObject(reply); + + test("RESP3 PUSH messages are handled out of band by default: "); + reply = redisCommand(c, "SET key:0 val:0"); + test_cond(reply != NULL && reply->type == REDIS_REPLY_STATUS); + freeReplyObject(reply); + + assert((reply = redisCommand(c, "GET key:0")) != NULL); + freeReplyObject(reply); + + old = redisSetPushCallback(c, push_handler); + test("We can set a custom RESP3 PUSH handler: "); + reply = redisCommand(c, "SET key:0 val:0"); + test_cond(reply != NULL && reply->type == REDIS_REPLY_STATUS && n == 1); + freeReplyObject(reply); + + /* Unset the push callback and generate an invalidate message making + * sure it is not handled out of band. */ + test("With no handler, PUSH replies come in-band: "); + redisSetPushCallback(c, NULL); + assert((reply = redisCommand(c, "GET key:0")) != NULL); + freeReplyObject(reply); + assert((reply = redisCommand(c, "SET key:0 invalid")) != NULL); + test_cond(reply->type == REDIS_REPLY_PUSH); + freeReplyObject(reply); + + test("With no PUSH handler, no replies are lost: "); + assert(redisGetReply(c, (void**)&reply) == REDIS_OK); + test_cond(reply != NULL && reply->type == REDIS_REPLY_STATUS); + freeReplyObject(reply); + + /* Return to the originally set PUSH handler */ + assert(old != NULL); + redisSetPushCallback(c, old); + + /* Switch back to RESP2 and disable tracking */ + c->privdata = privdata; + send_client_tracking(c, "OFF"); + send_hello(c, 2); +} + +redisOptions get_redis_tcp_options(struct config config) { + redisOptions options = {0}; + REDIS_OPTIONS_SET_TCP(&options, config.tcp.host, config.tcp.port); + return options; +} + +static void test_resp3_push_options(struct config config) { + redisAsyncContext *ac; + redisContext *c; + redisOptions options; + + test("We set a default RESP3 handler for redisContext: "); + options = get_redis_tcp_options(config); + assert((c = redisConnectWithOptions(&options)) != NULL); + test_cond(c->push_cb != NULL); + redisFree(c); + + test("We don't set a default RESP3 push handler for redisAsyncContext: "); + options = get_redis_tcp_options(config); + assert((ac = redisAsyncConnectWithOptions(&options)) != NULL); + test_cond(ac->c.push_cb == NULL); + redisAsyncFree(ac); + + test("Our REDIS_OPT_NO_PUSH_AUTOFREE flag works: "); + options = get_redis_tcp_options(config); + options.options |= REDIS_OPT_NO_PUSH_AUTOFREE; + assert((c = redisConnectWithOptions(&options)) != NULL); + test_cond(c->push_cb == NULL); + redisFree(c); + + test("We can use redisOptions to set a custom PUSH handler for redisContext: "); + options = get_redis_tcp_options(config); + options.push_cb = push_handler; + assert((c = redisConnectWithOptions(&options)) != NULL); + test_cond(c->push_cb == push_handler); + redisFree(c); + + test("We can use redisOptions to set a custom PUSH handler for redisAsyncContext: "); + options = get_redis_tcp_options(config); + options.async_push_cb = push_handler_async; + assert((ac = redisAsyncConnectWithOptions(&options)) != NULL); + test_cond(ac->push_cb == push_handler_async); + redisAsyncFree(ac); +} + static void test_blocking_connection(struct config config) { redisContext *c; redisReply *reply; + int major; c = do_connect(config); @@ -695,6 +867,10 @@ static void test_blocking_connection(struct config config) { assert(redisAppendCommand(c, "PING") == REDIS_OK); test_cond(redisGetReply(c, NULL) == REDIS_OK); + get_redis_version(c, &major, NULL); + if (major >= 6) test_resp3_push_handler(c); + test_resp3_push_options(config); + disconnect(c, 0); } @@ -780,18 +956,7 @@ static void test_blocking_io_errors(struct config config) { /* Connect to target given by config. */ c = do_connect(config); - { - /* Find out Redis version to determine the path for the next test */ - const char *field = "redis_version:"; - char *p, *eptr; - - reply = redisCommand(c,"INFO"); - p = strstr(reply->str,field); - major = strtol(p+strlen(field),&eptr,10); - p = eptr+1; /* char next to the first "." */ - minor = strtol(p,&eptr,10); - freeReplyObject(reply); - } + get_redis_version(c, &major, &minor); test("Returns I/O error when the connection is lost: "); reply = redisCommand(c,"QUIT");