Merge pull request #931 from kristjanvalur/pr2
Stability: Support calling redisAsyncCommand and redisAsyncDisconnect from the onConnected callback
This commit is contained in:
commit
17c8fe0798
64
async.c
64
async.c
@ -303,6 +303,34 @@ static void __redisRunPushCallback(redisAsyncContext *ac, redisReply *reply) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void __redisRunConnectCallback(redisAsyncContext *ac, int status)
|
||||||
|
{
|
||||||
|
if (ac->onConnect) {
|
||||||
|
if (!(ac->c.flags & REDIS_IN_CALLBACK)) {
|
||||||
|
ac->c.flags |= REDIS_IN_CALLBACK;
|
||||||
|
ac->onConnect(ac, status);
|
||||||
|
ac->c.flags &= ~REDIS_IN_CALLBACK;
|
||||||
|
} else {
|
||||||
|
/* already in callback */
|
||||||
|
ac->onConnect(ac, status);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void __redisRunDisconnectCallback(redisAsyncContext *ac, int status)
|
||||||
|
{
|
||||||
|
if (ac->onDisconnect) {
|
||||||
|
if (!(ac->c.flags & REDIS_IN_CALLBACK)) {
|
||||||
|
ac->c.flags |= REDIS_IN_CALLBACK;
|
||||||
|
ac->onDisconnect(ac, status);
|
||||||
|
ac->c.flags &= ~REDIS_IN_CALLBACK;
|
||||||
|
} else {
|
||||||
|
/* already in callback */
|
||||||
|
ac->onDisconnect(ac, status);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* Helper function to free the context. */
|
/* Helper function to free the context. */
|
||||||
static void __redisAsyncFree(redisAsyncContext *ac) {
|
static void __redisAsyncFree(redisAsyncContext *ac) {
|
||||||
redisContext *c = &(ac->c);
|
redisContext *c = &(ac->c);
|
||||||
@ -338,12 +366,11 @@ static void __redisAsyncFree(redisAsyncContext *ac) {
|
|||||||
|
|
||||||
/* Execute disconnect callback. When redisAsyncFree() initiated destroying
|
/* Execute disconnect callback. When redisAsyncFree() initiated destroying
|
||||||
* this context, the status will always be REDIS_OK. */
|
* this context, the status will always be REDIS_OK. */
|
||||||
if (ac->onDisconnect && (c->flags & REDIS_CONNECTED)) {
|
if (c->flags & REDIS_CONNECTED) {
|
||||||
if (c->flags & REDIS_FREEING) {
|
int status = ac->err == 0 ? REDIS_OK : REDIS_ERR;
|
||||||
ac->onDisconnect(ac,REDIS_OK);
|
if (c->flags & REDIS_FREEING)
|
||||||
} else {
|
status = REDIS_OK;
|
||||||
ac->onDisconnect(ac,(ac->err == 0) ? REDIS_OK : REDIS_ERR);
|
__redisRunDisconnectCallback(ac, status);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ac->dataCleanup) {
|
if (ac->dataCleanup) {
|
||||||
@ -603,7 +630,7 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
static void __redisAsyncHandleConnectFailure(redisAsyncContext *ac) {
|
static void __redisAsyncHandleConnectFailure(redisAsyncContext *ac) {
|
||||||
if (ac->onConnect) ac->onConnect(ac, REDIS_ERR);
|
__redisRunConnectCallback(ac, REDIS_ERR);
|
||||||
__redisAsyncDisconnect(ac);
|
__redisAsyncDisconnect(ac);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -628,8 +655,19 @@ static int __redisAsyncHandleConnect(redisAsyncContext *ac) {
|
|||||||
return REDIS_ERR;
|
return REDIS_ERR;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ac->onConnect) ac->onConnect(ac, REDIS_OK);
|
/* flag us as fully connect, but allow the callback
|
||||||
|
* to disconnect. For that reason, permit the function
|
||||||
|
* to delete the context here after callback return.
|
||||||
|
*/
|
||||||
c->flags |= REDIS_CONNECTED;
|
c->flags |= REDIS_CONNECTED;
|
||||||
|
__redisRunConnectCallback(ac, REDIS_OK);
|
||||||
|
if ((ac->c.flags & REDIS_DISCONNECTING)) {
|
||||||
|
redisAsyncDisconnect(ac);
|
||||||
|
return REDIS_ERR;
|
||||||
|
} else if ((ac->c.flags & REDIS_FREEING)) {
|
||||||
|
redisAsyncFree(ac);
|
||||||
|
return REDIS_ERR;
|
||||||
|
}
|
||||||
return REDIS_OK;
|
return REDIS_OK;
|
||||||
} else {
|
} else {
|
||||||
return REDIS_OK;
|
return REDIS_OK;
|
||||||
@ -653,6 +691,8 @@ void redisAsyncRead(redisAsyncContext *ac) {
|
|||||||
*/
|
*/
|
||||||
void redisAsyncHandleRead(redisAsyncContext *ac) {
|
void redisAsyncHandleRead(redisAsyncContext *ac) {
|
||||||
redisContext *c = &(ac->c);
|
redisContext *c = &(ac->c);
|
||||||
|
/* must not be called from a callback */
|
||||||
|
assert(!(c->flags & REDIS_IN_CALLBACK));
|
||||||
|
|
||||||
if (!(c->flags & REDIS_CONNECTED)) {
|
if (!(c->flags & REDIS_CONNECTED)) {
|
||||||
/* Abort connect was not successful. */
|
/* Abort connect was not successful. */
|
||||||
@ -686,6 +726,8 @@ void redisAsyncWrite(redisAsyncContext *ac) {
|
|||||||
|
|
||||||
void redisAsyncHandleWrite(redisAsyncContext *ac) {
|
void redisAsyncHandleWrite(redisAsyncContext *ac) {
|
||||||
redisContext *c = &(ac->c);
|
redisContext *c = &(ac->c);
|
||||||
|
/* must not be called from a callback */
|
||||||
|
assert(!(c->flags & REDIS_IN_CALLBACK));
|
||||||
|
|
||||||
if (!(c->flags & REDIS_CONNECTED)) {
|
if (!(c->flags & REDIS_CONNECTED)) {
|
||||||
/* Abort connect was not successful. */
|
/* Abort connect was not successful. */
|
||||||
@ -702,6 +744,8 @@ void redisAsyncHandleWrite(redisAsyncContext *ac) {
|
|||||||
void redisAsyncHandleTimeout(redisAsyncContext *ac) {
|
void redisAsyncHandleTimeout(redisAsyncContext *ac) {
|
||||||
redisContext *c = &(ac->c);
|
redisContext *c = &(ac->c);
|
||||||
redisCallback cb;
|
redisCallback cb;
|
||||||
|
/* must not be called from a callback */
|
||||||
|
assert(!(c->flags & REDIS_IN_CALLBACK));
|
||||||
|
|
||||||
if ((c->flags & REDIS_CONNECTED)) {
|
if ((c->flags & REDIS_CONNECTED)) {
|
||||||
if (ac->replies.head == NULL && ac->sub.replies.head == NULL) {
|
if (ac->replies.head == NULL && ac->sub.replies.head == NULL) {
|
||||||
@ -721,8 +765,8 @@ void redisAsyncHandleTimeout(redisAsyncContext *ac) {
|
|||||||
__redisAsyncCopyError(ac);
|
__redisAsyncCopyError(ac);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!(c->flags & REDIS_CONNECTED) && ac->onConnect) {
|
if (!(c->flags & REDIS_CONNECTED)) {
|
||||||
ac->onConnect(ac, REDIS_ERR);
|
__redisRunConnectCallback(ac, REDIS_ERR);
|
||||||
}
|
}
|
||||||
|
|
||||||
while (__redisShiftCallback(&ac->replies, &cb) == REDIS_OK) {
|
while (__redisShiftCallback(&ac->replies, &cb) == REDIS_OK) {
|
||||||
|
2
async.h
2
async.h
@ -57,7 +57,7 @@ typedef struct redisCallbackList {
|
|||||||
|
|
||||||
/* Connection callback prototypes */
|
/* Connection callback prototypes */
|
||||||
typedef void (redisDisconnectCallback)(const struct redisAsyncContext*, int status);
|
typedef void (redisDisconnectCallback)(const struct redisAsyncContext*, int status);
|
||||||
typedef void (redisConnectCallback)(const struct redisAsyncContext*, int status);
|
typedef void (redisConnectCallback)(struct redisAsyncContext*, int status);
|
||||||
typedef void(redisTimerCallback)(void *timer, void *privdata);
|
typedef void(redisTimerCallback)(void *timer, void *privdata);
|
||||||
|
|
||||||
/* Context for an async connection to Redis */
|
/* Context for an async connection to Redis */
|
||||||
|
54
test.c
54
test.c
@ -1907,7 +1907,9 @@ typedef enum astest_no
|
|||||||
ASTEST_CONNECT=0,
|
ASTEST_CONNECT=0,
|
||||||
ASTEST_CONN_TIMEOUT,
|
ASTEST_CONN_TIMEOUT,
|
||||||
ASTEST_PINGPONG,
|
ASTEST_PINGPONG,
|
||||||
ASTEST_PINGPONG_TIMEOUT
|
ASTEST_PINGPONG_TIMEOUT,
|
||||||
|
ASTEST_ISSUE_931,
|
||||||
|
ASTEST_ISSUE_931_PING
|
||||||
}astest_no;
|
}astest_no;
|
||||||
|
|
||||||
/* a static context for the async tests */
|
/* a static context for the async tests */
|
||||||
@ -1918,6 +1920,7 @@ struct _astest {
|
|||||||
int connects;
|
int connects;
|
||||||
int connect_status;
|
int connect_status;
|
||||||
int disconnects;
|
int disconnects;
|
||||||
|
int pongs;
|
||||||
int disconnect_status;
|
int disconnect_status;
|
||||||
int connected;
|
int connected;
|
||||||
int err;
|
int err;
|
||||||
@ -1941,7 +1944,9 @@ static void asCleanup(void* data)
|
|||||||
t->ac = NULL;
|
t->ac = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void connectCallback(const redisAsyncContext *c, int status) {
|
static void commandCallback(struct redisAsyncContext *ac, void* _reply, void* _privdata);
|
||||||
|
|
||||||
|
static void connectCallback(redisAsyncContext *c, int status) {
|
||||||
struct _astest *t = (struct _astest *)c->data;
|
struct _astest *t = (struct _astest *)c->data;
|
||||||
assert(t == &astest);
|
assert(t == &astest);
|
||||||
assert(t->connects == 0);
|
assert(t->connects == 0);
|
||||||
@ -1950,6 +1955,15 @@ static void connectCallback(const redisAsyncContext *c, int status) {
|
|||||||
t->connects++;
|
t->connects++;
|
||||||
t->connect_status = status;
|
t->connect_status = status;
|
||||||
t->connected = status == REDIS_OK ? 1 : -1;
|
t->connected = status == REDIS_OK ? 1 : -1;
|
||||||
|
|
||||||
|
if (t->testno == ASTEST_ISSUE_931) {
|
||||||
|
/* disconnect again */
|
||||||
|
redisAsyncDisconnect(c);
|
||||||
|
}
|
||||||
|
else if (t->testno == ASTEST_ISSUE_931_PING)
|
||||||
|
{
|
||||||
|
status = redisAsyncCommand(c, commandCallback, NULL, "PING");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
static void disconnectCallback(const redisAsyncContext *c, int status) {
|
static void disconnectCallback(const redisAsyncContext *c, int status) {
|
||||||
assert(c->data == (void*)&astest);
|
assert(c->data == (void*)&astest);
|
||||||
@ -1969,20 +1983,22 @@ static void commandCallback(struct redisAsyncContext *ac, void* _reply, void* _p
|
|||||||
(void)_privdata;
|
(void)_privdata;
|
||||||
t->err = ac->err;
|
t->err = ac->err;
|
||||||
strcpy(t->errstr, ac->errstr);
|
strcpy(t->errstr, ac->errstr);
|
||||||
if (t->testno == ASTEST_PINGPONG)
|
t->counter++;
|
||||||
|
if (t->testno == ASTEST_PINGPONG ||t->testno == ASTEST_ISSUE_931_PING)
|
||||||
{
|
{
|
||||||
test_cond(reply != NULL && reply->type == REDIS_REPLY_STATUS && strcmp(reply->str, "PONG") == 0);
|
assert(reply != NULL && reply->type == REDIS_REPLY_STATUS && strcmp(reply->str, "PONG") == 0);
|
||||||
|
t->pongs++;
|
||||||
redisAsyncFree(ac);
|
redisAsyncFree(ac);
|
||||||
}
|
}
|
||||||
if (t->testno == ASTEST_PINGPONG_TIMEOUT)
|
if (t->testno == ASTEST_PINGPONG_TIMEOUT)
|
||||||
{
|
{
|
||||||
/* two ping pongs */
|
/* two ping pongs */
|
||||||
assert(reply != NULL && reply->type == REDIS_REPLY_STATUS && strcmp(reply->str, "PONG") == 0);
|
assert(reply != NULL && reply->type == REDIS_REPLY_STATUS && strcmp(reply->str, "PONG") == 0);
|
||||||
if (++t->counter == 1) {
|
t->pongs++;
|
||||||
|
if (t->counter == 1) {
|
||||||
int status = redisAsyncCommand(ac, commandCallback, NULL, "PING");
|
int status = redisAsyncCommand(ac, commandCallback, NULL, "PING");
|
||||||
assert(status == REDIS_OK);
|
assert(status == REDIS_OK);
|
||||||
} else {
|
} else {
|
||||||
test_cond(reply != NULL && reply->type == REDIS_REPLY_STATUS && strcmp(reply->str, "PONG") == 0);
|
|
||||||
redisAsyncFree(ac);
|
redisAsyncFree(ac);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -2089,6 +2105,7 @@ static void test_async_polling(struct config config) {
|
|||||||
assert(status == REDIS_OK);
|
assert(status == REDIS_OK);
|
||||||
while(astest.ac)
|
while(astest.ac)
|
||||||
redisPollTick(c, 0.1);
|
redisPollTick(c, 0.1);
|
||||||
|
test_cond(astest.pongs == 1);
|
||||||
|
|
||||||
/* Test a ping/pong after connection that didn't time out.
|
/* Test a ping/pong after connection that didn't time out.
|
||||||
* see https://github.com/redis/hiredis/issues/945
|
* see https://github.com/redis/hiredis/issues/945
|
||||||
@ -2105,8 +2122,33 @@ static void test_async_polling(struct config config) {
|
|||||||
assert(status == REDIS_OK);
|
assert(status == REDIS_OK);
|
||||||
while(astest.ac)
|
while(astest.ac)
|
||||||
redisPollTick(c, 0.1);
|
redisPollTick(c, 0.1);
|
||||||
|
test_cond(astest.pongs == 2);
|
||||||
config = defaultconfig;
|
config = defaultconfig;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Test disconnect from an on_connect callback
|
||||||
|
* see https://github.com/redis/hiredis/issues/931
|
||||||
|
*/
|
||||||
|
test("Disconnect from onConnected callback (Issue #931): ");
|
||||||
|
c = do_aconnect(config, ASTEST_ISSUE_931);
|
||||||
|
while(astest.disconnects == 0)
|
||||||
|
redisPollTick(c, 0.1);
|
||||||
|
assert(astest.connected == 0);
|
||||||
|
assert(astest.connects == 1);
|
||||||
|
test_cond(astest.disconnects == 1);
|
||||||
|
|
||||||
|
/* Test ping/pong from an on_connect callback
|
||||||
|
* see https://github.com/redis/hiredis/issues/931
|
||||||
|
*/
|
||||||
|
test("Ping/Pong from onConnected callback (Issue #931): ");
|
||||||
|
c = do_aconnect(config, ASTEST_ISSUE_931_PING);
|
||||||
|
/* connect callback issues ping, reponse callback destroys context */
|
||||||
|
while(astest.ac)
|
||||||
|
redisPollTick(c, 0.1);
|
||||||
|
assert(astest.connected == 0);
|
||||||
|
assert(astest.connects == 1);
|
||||||
|
assert(astest.disconnects == 1);
|
||||||
|
test_cond(astest.pongs == 1);
|
||||||
}
|
}
|
||||||
/* End of Async polling_adapter driven tests */
|
/* End of Async polling_adapter driven tests */
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user