Correction for command timeout during pubsub (#1038)
* Add test of command timeout during pubsub A timeout of a non-subscribe command will be ignored during pubsub. It will be handled as an idle timeout and a response is awaited for. * Correction for command timeout during pubsub Disconnect when a sent non-subscribe command triggers a timeout.
This commit is contained in:
parent
24d5344934
commit
ff860e55db
2
async.c
2
async.c
@ -696,7 +696,7 @@ void redisAsyncHandleTimeout(redisAsyncContext *ac) {
|
||||
redisCallback cb;
|
||||
|
||||
if ((c->flags & REDIS_CONNECTED)) {
|
||||
if ( ac->replies.head == NULL) {
|
||||
if (ac->replies.head == NULL && ac->sub.replies.head == NULL) {
|
||||
/* Nothing to do - just an idle timeout */
|
||||
return;
|
||||
}
|
||||
|
95
test.c
95
test.c
@ -1527,6 +1527,14 @@ void array_cb(redisAsyncContext *ac, void *r, void *privdata) {
|
||||
state->checkpoint++;
|
||||
}
|
||||
|
||||
/* Expect a NULL reply */
|
||||
void null_cb(redisAsyncContext *ac, void *r, void *privdata) {
|
||||
(void) ac;
|
||||
assert(r == NULL);
|
||||
TestState *state = privdata;
|
||||
state->checkpoint++;
|
||||
}
|
||||
|
||||
static void test_pubsub_handling(struct config config) {
|
||||
test("Subscribe, handle published message and unsubscribe: ");
|
||||
/* Setup event dispatcher with a testcase timeout */
|
||||
@ -1618,7 +1626,85 @@ static void test_pubsub_handling_resp3(struct config config) {
|
||||
/* Verify test checkpoints */
|
||||
assert(state.checkpoint == 5);
|
||||
}
|
||||
#endif
|
||||
|
||||
/* Subscribe callback for test_command_timeout_during_pubsub:
|
||||
* - a subscribe response triggers a published message
|
||||
* - the published message triggers a command that times out
|
||||
* - the command timeout triggers a disconnect */
|
||||
void subscribe_with_timeout_cb(redisAsyncContext *ac, void *r, void *privdata) {
|
||||
redisReply *reply = r;
|
||||
TestState *state = privdata;
|
||||
|
||||
/* The non-clean disconnect should trigger the
|
||||
* subscription callback with a NULL reply. */
|
||||
if (reply == NULL) {
|
||||
state->checkpoint++;
|
||||
event_base_loopbreak(base);
|
||||
return;
|
||||
}
|
||||
|
||||
assert(reply->type == (state->resp3 ? REDIS_REPLY_PUSH : REDIS_REPLY_ARRAY) &&
|
||||
reply->elements == 3);
|
||||
|
||||
if (strcmp(reply->element[0]->str,"subscribe") == 0) {
|
||||
assert(strcmp(reply->element[1]->str,"mychannel") == 0 &&
|
||||
reply->element[2]->str == NULL);
|
||||
publish_msg(state->options,"mychannel","Hello!");
|
||||
state->checkpoint++;
|
||||
} else if (strcmp(reply->element[0]->str,"message") == 0) {
|
||||
assert(strcmp(reply->element[1]->str,"mychannel") == 0 &&
|
||||
strcmp(reply->element[2]->str,"Hello!") == 0);
|
||||
state->checkpoint++;
|
||||
|
||||
/* Send a command that will trigger a timeout */
|
||||
redisAsyncCommand(ac,null_cb,state,"DEBUG SLEEP 3");
|
||||
redisAsyncCommand(ac,null_cb,state,"LPUSH mylist foo");
|
||||
} else {
|
||||
printf("Unexpected pubsub command: %s\n", reply->element[0]->str);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
static void test_command_timeout_during_pubsub(struct config config) {
|
||||
test("Command timeout during Pub/Sub: ");
|
||||
/* Setup event dispatcher with a testcase timeout */
|
||||
base = event_base_new();
|
||||
struct event *timeout = evtimer_new(base,timeout_cb,NULL);
|
||||
assert(timeout != NULL);
|
||||
|
||||
evtimer_assign(timeout,base,timeout_cb,NULL);
|
||||
struct timeval timeout_tv = {.tv_sec = 10};
|
||||
evtimer_add(timeout,&timeout_tv);
|
||||
|
||||
/* Connect */
|
||||
redisOptions options = get_redis_tcp_options(config);
|
||||
redisAsyncContext *ac = redisAsyncConnectWithOptions(&options);
|
||||
assert(ac != NULL && ac->err == 0);
|
||||
redisLibeventAttach(ac,base);
|
||||
|
||||
/* Configure a command timout */
|
||||
struct timeval command_timeout = {.tv_sec = 2};
|
||||
redisAsyncSetTimeout(ac,command_timeout);
|
||||
|
||||
/* Not expecting any push messages in this test */
|
||||
redisAsyncSetPushCallback(ac,unexpected_push_cb);
|
||||
|
||||
/* Switch protocol */
|
||||
redisAsyncCommand(ac,NULL,NULL,"HELLO 3");
|
||||
|
||||
/* Start subscribe */
|
||||
TestState state = {.options = &options, .resp3 = 1};
|
||||
redisAsyncCommand(ac,subscribe_with_timeout_cb,&state,"subscribe mychannel");
|
||||
|
||||
/* Start event dispatching loop */
|
||||
assert(event_base_dispatch(base) == 0);
|
||||
event_free(timeout);
|
||||
event_base_free(base);
|
||||
|
||||
/* Verify test checkpoints */
|
||||
test_cond(state.checkpoint == 5);
|
||||
}
|
||||
#endif /* HIREDIS_TEST_ASYNC */
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
struct config cfg = {
|
||||
@ -1748,8 +1834,11 @@ int main(int argc, char **argv) {
|
||||
disconnect(c, 0);
|
||||
|
||||
test_pubsub_handling(cfg);
|
||||
if (major >= 6) test_pubsub_handling_resp3(cfg);
|
||||
#endif
|
||||
if (major >= 6) {
|
||||
test_pubsub_handling_resp3(cfg);
|
||||
test_command_timeout_during_pubsub(cfg);
|
||||
}
|
||||
#endif /* HIREDIS_TEST_ASYNC */
|
||||
|
||||
if (test_inherit_fd) {
|
||||
printf("\nTesting against inherited fd (%s): ", cfg.unix_sock.path);
|
||||
|
Loading…
x
Reference in New Issue
Block a user