Add asynchronous test for pubsub using RESP2
The testcase will subscribe to a channel, and via a second client a message is published to the channel. After receiving the message the testcase will unsubscribe and disconnect.master
parent
648763c36e
commit
4021726a69
107
test.c
107
test.c
|
@ -18,6 +18,10 @@
|
|||
#ifdef HIREDIS_TEST_SSL
|
||||
#include "hiredis_ssl.h"
|
||||
#endif
|
||||
#ifdef HIREDIS_TEST_ASYNC
|
||||
#include "adapters/libevent.h"
|
||||
#include <event.h>
|
||||
#endif
|
||||
#include "net.h"
|
||||
#include "win32.h"
|
||||
|
||||
|
@ -1443,6 +1447,104 @@ static void test_throughput(struct config config) {
|
|||
// redisFree(c);
|
||||
// }
|
||||
|
||||
#ifdef HIREDIS_TEST_ASYNC
|
||||
struct event_base *base;
|
||||
|
||||
typedef struct TestState {
|
||||
redisOptions *options;
|
||||
int checkpoint;
|
||||
} TestState;
|
||||
|
||||
/* Testcase timeout, will trigger a failure */
|
||||
void timeout_cb(int fd, short event, void *arg) {
|
||||
(void) fd; (void) event; (void) arg;
|
||||
printf("Timeout in async testing!\n");
|
||||
exit(1);
|
||||
}
|
||||
|
||||
/* Unexpected call, will trigger a failure */
|
||||
void unexpected_cb(redisAsyncContext *ac, void *r, void *privdata) {
|
||||
(void) ac; (void) r;
|
||||
printf("Unexpected call: %s\n",(char*)privdata);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
/* Helper function to publish a message via own client. */
|
||||
void publish_msg(redisOptions *options, const char* channel, const char* msg) {
|
||||
redisContext *c = redisConnectWithOptions(options);
|
||||
assert(c != NULL);
|
||||
redisReply *reply = redisCommand(c,"PUBLISH %s %s",channel,msg);
|
||||
assert(reply->type == REDIS_REPLY_INTEGER && reply->integer == 1);
|
||||
freeReplyObject(reply);
|
||||
disconnect(c, 0);
|
||||
}
|
||||
|
||||
/* Subscribe callback for test_pubsub_handling:
|
||||
* - a published message triggers an unsubscribe
|
||||
* - an unsubscribe response triggers a disconnect */
|
||||
void subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) {
|
||||
redisReply *reply = r;
|
||||
TestState *state = privdata;
|
||||
|
||||
assert(reply != NULL &&
|
||||
reply->type == REDIS_REPLY_ARRAY &&
|
||||
reply->elements == 3);
|
||||
|
||||
if (strcmp(reply->element[0]->str,"subscribe") == 0) {
|
||||
assert(strcmp(reply->element[1]->str,"mychannel") == 0 &&
|
||||
reply->element[2]->str == NULL);
|
||||
publish_msg(state->options,"mychannel","Hello!");
|
||||
} else if (strcmp(reply->element[0]->str,"message") == 0) {
|
||||
assert(strcmp(reply->element[1]->str,"mychannel") == 0 &&
|
||||
strcmp(reply->element[2]->str,"Hello!") == 0);
|
||||
state->checkpoint++;
|
||||
|
||||
/* Unsubscribe after receiving the published message. Send unsubscribe
|
||||
* which should call the callback registered during subscribe */
|
||||
redisAsyncCommand(ac,unexpected_cb,
|
||||
(void*)"unsubscribe should call subscribe_cb()",
|
||||
"unsubscribe");
|
||||
} else if (strcmp(reply->element[0]->str,"unsubscribe") == 0) {
|
||||
assert(strcmp(reply->element[1]->str,"mychannel") == 0 &&
|
||||
reply->element[2]->str == NULL);
|
||||
|
||||
/* Disconnect after unsubscribe */
|
||||
redisAsyncDisconnect(ac);
|
||||
event_base_loopbreak(base);
|
||||
} else {
|
||||
printf("Unexpected pubsub command: %s\n", reply->element[0]->str);
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
static void test_pubsub_handling(struct config config) {
|
||||
test("Subscribe, handle published message and unsubscribe: ");
|
||||
/* Setup event dispatcher with a testcase timeout */
|
||||
base = event_base_new();
|
||||
struct event timeout;
|
||||
evtimer_assign(&timeout,base,timeout_cb,NULL);
|
||||
struct timeval timeout_tv = {.tv_sec = 10};
|
||||
evtimer_add(&timeout, &timeout_tv);
|
||||
|
||||
/* Connect */
|
||||
redisOptions options = get_redis_tcp_options(config);
|
||||
redisAsyncContext *ac = redisAsyncConnectWithOptions(&options);
|
||||
assert(ac != NULL && ac->err == 0);
|
||||
redisLibeventAttach(ac,base);
|
||||
|
||||
/* Start subscribe */
|
||||
TestState state = {.options = &options};
|
||||
redisAsyncCommand(ac,subscribe_cb,&state,"subscribe mychannel");
|
||||
|
||||
/* Start event dispatching loop */
|
||||
test_cond(event_base_dispatch(base) == 0);
|
||||
event_base_free(base);
|
||||
|
||||
/* Verify test checkpoints */
|
||||
assert(state.checkpoint == 1);
|
||||
}
|
||||
#endif
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
struct config cfg = {
|
||||
.tcp = {
|
||||
|
@ -1561,6 +1663,11 @@ int main(int argc, char **argv) {
|
|||
}
|
||||
#endif
|
||||
|
||||
#ifdef HIREDIS_TEST_ASYNC
|
||||
printf("\nTesting asynchronous API against TCP connection (%s:%d):\n", cfg.tcp.host, cfg.tcp.port);
|
||||
test_pubsub_handling(cfg);
|
||||
#endif
|
||||
|
||||
if (test_inherit_fd) {
|
||||
printf("\nTesting against inherited fd (%s): ", cfg.unix_sock.path);
|
||||
if (test_unix_socket) {
|
||||
|
|
Loading…
Reference in New Issue