Allow sending commands after sending an unsubscribe (#1036)

* Add test of async commands after unsubscribe

Verify that commands are handled after unsubscribing from a channel.
A command is sent before the `unsubscribe` response is received,
which currently triggers an assert in async.c:567:

`redisProcessCallbacks: Assertion `(c->flags & REDIS_SUBSCRIBED || c->flags & REDIS_MONITORING)' failed.`

* Handle async commands after an unsubscribe

When unsubscribing from the last channel we move from the `subscribe`
state to a normal state. These states uses different holders for the
command callback information.
By moving the callback info during the state change the callback order
can be maintained.
master
Bjorn Svensson 2022-01-18 19:09:38 +01:00 committed by GitHub
parent ff860e55db
commit f2ce5980e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 32 additions and 18 deletions

View File

@ -460,8 +460,15 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply,
/* Unset subscribed flag only when no pipelined pending subscribe. */
if (reply->element[2]->integer == 0
&& dictSize(ac->sub.channels) == 0
&& dictSize(ac->sub.patterns) == 0)
&& dictSize(ac->sub.patterns) == 0) {
c->flags &= ~REDIS_SUBSCRIBED;
/* Move ongoing regular command callbacks. */
redisCallback cb;
while (__redisShiftCallback(&ac->sub.replies,&cb) == REDIS_OK) {
__redisPushCallback(&ac->replies,&cb);
}
}
}
}
sdsfree(sname);

41
test.c
View File

@ -1454,8 +1454,15 @@ typedef struct TestState {
redisOptions *options;
int checkpoint;
int resp3;
int disconnect;
} TestState;
/* Helper to disconnect and stop event loop */
void async_disconnect(redisAsyncContext *ac) {
redisAsyncDisconnect(ac);
event_base_loopbreak(base);
}
/* Testcase timeout, will trigger a failure */
void timeout_cb(int fd, short event, void *arg) {
(void) fd; (void) event; (void) arg;
@ -1480,9 +1487,18 @@ void publish_msg(redisOptions *options, const char* channel, const char* msg) {
disconnect(c, 0);
}
/* Expect a reply of type INTEGER */
void integer_cb(redisAsyncContext *ac, void *r, void *privdata) {
redisReply *reply = r;
TestState *state = privdata;
assert(reply != NULL && reply->type == REDIS_REPLY_INTEGER);
state->checkpoint++;
if (state->disconnect) async_disconnect(ac);
}
/* Subscribe callback for test_pubsub_handling and test_pubsub_handling_resp3:
* - a published message triggers an unsubscribe
* - an unsubscribe response triggers a disconnect */
* - a command is sent before the unsubscribe response is received. */
void subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) {
redisReply *reply = r;
TestState *state = privdata;
@ -1505,13 +1521,13 @@ void subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) {
redisAsyncCommand(ac,unexpected_cb,
(void*)"unsubscribe should call subscribe_cb()",
"unsubscribe");
/* Send a regular command after unsubscribing, then disconnect */
state->disconnect = 1;
redisAsyncCommand(ac,integer_cb,state,"LPUSH mylist foo");
} else if (strcmp(reply->element[0]->str,"unsubscribe") == 0) {
assert(strcmp(reply->element[1]->str,"mychannel") == 0 &&
reply->element[2]->str == NULL);
/* Disconnect after unsubscribe */
redisAsyncDisconnect(ac);
event_base_loopbreak(base);
} else {
printf("Unexpected pubsub command: %s\n", reply->element[0]->str);
exit(1);
@ -1520,11 +1536,11 @@ void subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) {
/* Expect a reply of type ARRAY */
void array_cb(redisAsyncContext *ac, void *r, void *privdata) {
(void) ac;
redisReply *reply = r;
TestState *state = privdata;
assert(reply != NULL && reply->type == REDIS_REPLY_ARRAY);
state->checkpoint++;
if (state->disconnect) async_disconnect(ac);
}
/* Expect a NULL reply */
@ -1565,7 +1581,7 @@ static void test_pubsub_handling(struct config config) {
event_base_free(base);
/* Verify test checkpoints */
assert(state.checkpoint == 2);
assert(state.checkpoint == 3);
}
/* Unexpected push message, will trigger a failure */
@ -1575,15 +1591,6 @@ void unexpected_push_cb(redisAsyncContext *ac, void *r) {
exit(1);
}
/* Expect a reply of type INTEGER */
void integer_cb(redisAsyncContext *ac, void *r, void *privdata) {
(void) ac;
redisReply *reply = r;
TestState *state = privdata;
assert(reply != NULL && reply->type == REDIS_REPLY_INTEGER);
state->checkpoint++;
}
static void test_pubsub_handling_resp3(struct config config) {
test("Subscribe, handle published message and unsubscribe using RESP3: ");
/* Setup event dispatcher with a testcase timeout */
@ -1624,7 +1631,7 @@ static void test_pubsub_handling_resp3(struct config config) {
event_base_free(base);
/* Verify test checkpoints */
assert(state.checkpoint == 5);
assert(state.checkpoint == 6);
}
/* Subscribe callback for test_command_timeout_during_pubsub: