Support PING while subscribing (RESP2) (#1027)

* Handle PING during pubsub in RESP2

* Rename invalid callback list

Some commands are valid to send during a subscribe in RESP2, and
most in RESP3. Renaming the callback list from `invalid` to `replies`
to detail this fact.

* Fix review comment
master
Bjorn Svensson 2021-12-16 21:38:15 +01:00 committed by GitHub
parent e3a479e409
commit d3384260e7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 25 additions and 16 deletions

25
async.c
View File

@ -144,8 +144,8 @@ static redisAsyncContext *redisAsyncInitialize(redisContext *c) {
ac->replies.head = NULL;
ac->replies.tail = NULL;
ac->sub.invalid.head = NULL;
ac->sub.invalid.tail = NULL;
ac->sub.replies.head = NULL;
ac->sub.replies.tail = NULL;
ac->sub.channels = channels;
ac->sub.patterns = patterns;
@ -312,9 +312,7 @@ static void __redisAsyncFree(redisAsyncContext *ac) {
/* Execute pending callbacks with NULL reply. */
while (__redisShiftCallback(&ac->replies,&cb) == REDIS_OK)
__redisRunCallback(ac,&cb,NULL);
/* Execute callbacks for invalid commands */
while (__redisShiftCallback(&ac->sub.invalid,&cb) == REDIS_OK)
while (__redisShiftCallback(&ac->sub.replies,&cb) == REDIS_OK)
__redisRunCallback(ac,&cb,NULL);
/* Run subscription callbacks with NULL reply */
@ -419,10 +417,11 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply,
char *stype;
sds sname;
/* Custom reply functions are not supported for pub/sub. This will fail
* very hard when they are used... */
if (reply->type == REDIS_REPLY_ARRAY || reply->type == REDIS_REPLY_PUSH) {
assert(reply->elements >= 2);
/* Match reply with the expected format of a pushed message.
* The type and number of elements (3 to 4) are specified at:
* https://redis.io/topics/pubsub#format-of-pushed-messages */
if ((reply->type == REDIS_REPLY_ARRAY && reply->elements >= 3) ||
reply->type == REDIS_REPLY_PUSH) {
assert(reply->element[0]->type == REDIS_REPLY_STRING);
stype = reply->element[0]->str;
pvariant = (tolower(stype[0]) == 'p') ? 1 : 0;
@ -467,8 +466,8 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply,
}
sdsfree(sname);
} else {
/* Shift callback for invalid commands. */
__redisShiftCallback(&ac->sub.invalid,dstcb);
/* Shift callback for pending command in subscribed context. */
__redisShiftCallback(&ac->sub.replies,dstcb);
}
return REDIS_OK;
oom:
@ -814,9 +813,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
goto oom;
} else {
if (c->flags & REDIS_SUBSCRIBED) {
/* This will likely result in an error reply, but it needs to be
* received and passed to the callback. */
if (__redisPushCallback(&ac->sub.invalid,&cb) != REDIS_OK)
if (__redisPushCallback(&ac->sub.replies,&cb) != REDIS_OK)
goto oom;
} else {
if (__redisPushCallback(&ac->replies,&cb) != REDIS_OK)

View File

@ -102,7 +102,7 @@ typedef struct redisAsyncContext {
/* Subscription callbacks */
struct {
redisCallbackList invalid;
redisCallbackList replies;
struct dict *channels;
struct dict *patterns;
} sub;

14
test.c
View File

@ -1518,6 +1518,15 @@ void subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) {
}
}
/* Expect a reply of type ARRAY */
void array_cb(redisAsyncContext *ac, void *r, void *privdata) {
(void) ac;
redisReply *reply = r;
TestState *state = privdata;
assert(reply != NULL && reply->type == REDIS_REPLY_ARRAY);
state->checkpoint++;
}
static void test_pubsub_handling(struct config config) {
test("Subscribe, handle published message and unsubscribe: ");
/* Setup event dispatcher with a testcase timeout */
@ -1539,13 +1548,16 @@ static void test_pubsub_handling(struct config config) {
TestState state = {.options = &options};
redisAsyncCommand(ac,subscribe_cb,&state,"subscribe mychannel");
/* Make sure non-subscribe commands are handled */
redisAsyncCommand(ac,array_cb,&state,"PING");
/* Start event dispatching loop */
test_cond(event_base_dispatch(base) == 0);
event_free(timeout);
event_base_free(base);
/* Verify test checkpoints */
assert(state.checkpoint == 1);
assert(state.checkpoint == 2);
}
/* Unexpected push message, will trigger a failure */