Add asynchronous test for pubsub using RESP3 (#1012)

* Include `unsubscribe` as a subscribe reply in RESP3

By providing the (p)unsubscribe message via the subscribe callback,
instead of via the push callback, we get the same behavior in RESP3
as in RESP2.

* Add asynchronous test for pubsub using RESP3

The testcase will subscribe to a channel, and via a second client
a message is published to the channel. After receiving the message
the testcase will unsubscribe and disconnect.

This RESP3 testcase reuses the subscribe callback from the RESP2
testcase to make sure we have a common behavior.
master
Bjorn Svensson 2021-12-01 20:43:23 +01:00 committed by GitHub
parent b5716ee829
commit da5a4ff362
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 56 additions and 4 deletions

View File

@ -496,8 +496,8 @@ static int redisIsSubscribeReply(redisReply *reply) {
len = reply->element[0]->len - off;
return !strncasecmp(str, "subscribe", len) ||
!strncasecmp(str, "message", len);
!strncasecmp(str, "message", len) ||
!strncasecmp(str, "unsubscribe", len);
}
void redisProcessCallbacks(redisAsyncContext *ac) {

56
test.c
View File

@ -1453,6 +1453,7 @@ struct event_base *base;
typedef struct TestState {
redisOptions *options;
int checkpoint;
int resp3;
} TestState;
/* Testcase timeout, will trigger a failure */
@ -1479,7 +1480,7 @@ void publish_msg(redisOptions *options, const char* channel, const char* msg) {
disconnect(c, 0);
}
/* Subscribe callback for test_pubsub_handling:
/* Subscribe callback for test_pubsub_handling and test_pubsub_handling_resp3:
* - a published message triggers an unsubscribe
* - an unsubscribe response triggers a disconnect */
void subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) {
@ -1487,7 +1488,7 @@ void subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) {
TestState *state = privdata;
assert(reply != NULL &&
reply->type == REDIS_REPLY_ARRAY &&
reply->type == (state->resp3 ? REDIS_REPLY_PUSH : REDIS_REPLY_ARRAY) &&
reply->elements == 3);
if (strcmp(reply->element[0]->str,"subscribe") == 0) {
@ -1546,6 +1547,49 @@ static void test_pubsub_handling(struct config config) {
/* Verify test checkpoints */
assert(state.checkpoint == 1);
}
/* Unexpected push message, will trigger a failure */
void unexpected_push_cb(redisAsyncContext *ac, void *r) {
(void) ac; (void) r;
printf("Unexpected call to the PUSH callback!\n");
exit(1);
}
static void test_pubsub_handling_resp3(struct config config) {
test("Subscribe, handle published message and unsubscribe using RESP3: ");
/* Setup event dispatcher with a testcase timeout */
base = event_base_new();
struct event *timeout = evtimer_new(base, timeout_cb, NULL);
assert(timeout != NULL);
evtimer_assign(timeout,base,timeout_cb,NULL);
struct timeval timeout_tv = {.tv_sec = 10};
evtimer_add(timeout, &timeout_tv);
/* Connect */
redisOptions options = get_redis_tcp_options(config);
redisAsyncContext *ac = redisAsyncConnectWithOptions(&options);
assert(ac != NULL && ac->err == 0);
redisLibeventAttach(ac,base);
/* Not expecting any push messages in this test */
redisAsyncSetPushCallback(ac, unexpected_push_cb);
/* Switch protocol */
redisAsyncCommand(ac,NULL,NULL,"HELLO 3");
/* Start subscribe */
TestState state = {.options = &options, .resp3 = 1};
redisAsyncCommand(ac,subscribe_cb,&state,"subscribe mychannel");
/* Start event dispatching loop */
test_cond(event_base_dispatch(base) == 0);
event_free(timeout);
event_base_free(base);
/* Verify test checkpoints */
assert(state.checkpoint == 1);
}
#endif
int main(int argc, char **argv) {
@ -1668,7 +1712,15 @@ int main(int argc, char **argv) {
#ifdef HIREDIS_TEST_ASYNC
printf("\nTesting asynchronous API against TCP connection (%s:%d):\n", cfg.tcp.host, cfg.tcp.port);
cfg.type = CONN_TCP;
int major;
redisContext *c = do_connect(cfg);
get_redis_version(c, &major, NULL);
disconnect(c, 0);
test_pubsub_handling(cfg);
if (major >= 6) test_pubsub_handling_resp3(cfg);
#endif
if (test_inherit_fd) {