Support calling redisAsyncDisconnect from the onConnected callback, by deferring context deletion
This commit is contained in:
parent
6ed060920f
commit
005d7edebe
64
async.c
64
async.c
@ -303,6 +303,34 @@ static void __redisRunPushCallback(redisAsyncContext *ac, redisReply *reply) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void __redisRunConnectCallback(redisAsyncContext *ac, int status)
|
||||||
|
{
|
||||||
|
if (ac->onConnect) {
|
||||||
|
if (!(ac->c.flags & REDIS_IN_CALLBACK)) {
|
||||||
|
ac->c.flags |= REDIS_IN_CALLBACK;
|
||||||
|
ac->onConnect(ac, status);
|
||||||
|
ac->c.flags &= ~REDIS_IN_CALLBACK;
|
||||||
|
} else {
|
||||||
|
/* already in callback */
|
||||||
|
ac->onConnect(ac, status);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void __redisRunDisconnectCallback(redisAsyncContext *ac, int status)
|
||||||
|
{
|
||||||
|
if (ac->onDisconnect) {
|
||||||
|
if (!(ac->c.flags & REDIS_IN_CALLBACK)) {
|
||||||
|
ac->c.flags |= REDIS_IN_CALLBACK;
|
||||||
|
ac->onDisconnect(ac, status);
|
||||||
|
ac->c.flags &= ~REDIS_IN_CALLBACK;
|
||||||
|
} else {
|
||||||
|
/* already in callback */
|
||||||
|
ac->onDisconnect(ac, status);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* Helper function to free the context. */
|
/* Helper function to free the context. */
|
||||||
static void __redisAsyncFree(redisAsyncContext *ac) {
|
static void __redisAsyncFree(redisAsyncContext *ac) {
|
||||||
redisContext *c = &(ac->c);
|
redisContext *c = &(ac->c);
|
||||||
@ -338,12 +366,11 @@ static void __redisAsyncFree(redisAsyncContext *ac) {
|
|||||||
|
|
||||||
/* Execute disconnect callback. When redisAsyncFree() initiated destroying
|
/* Execute disconnect callback. When redisAsyncFree() initiated destroying
|
||||||
* this context, the status will always be REDIS_OK. */
|
* this context, the status will always be REDIS_OK. */
|
||||||
if (ac->onDisconnect && (c->flags & REDIS_CONNECTED)) {
|
if (c->flags & REDIS_CONNECTED) {
|
||||||
if (c->flags & REDIS_FREEING) {
|
int status = ac->err == 0 ? REDIS_OK : REDIS_ERR;
|
||||||
ac->onDisconnect(ac,REDIS_OK);
|
if (c->flags & REDIS_FREEING)
|
||||||
} else {
|
status = REDIS_OK;
|
||||||
ac->onDisconnect(ac,(ac->err == 0) ? REDIS_OK : REDIS_ERR);
|
__redisRunDisconnectCallback(ac, status);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ac->dataCleanup) {
|
if (ac->dataCleanup) {
|
||||||
@ -603,7 +630,7 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
static void __redisAsyncHandleConnectFailure(redisAsyncContext *ac) {
|
static void __redisAsyncHandleConnectFailure(redisAsyncContext *ac) {
|
||||||
if (ac->onConnect) ac->onConnect(ac, REDIS_ERR);
|
__redisRunConnectCallback(ac, REDIS_ERR);
|
||||||
__redisAsyncDisconnect(ac);
|
__redisAsyncDisconnect(ac);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -628,8 +655,19 @@ static int __redisAsyncHandleConnect(redisAsyncContext *ac) {
|
|||||||
return REDIS_ERR;
|
return REDIS_ERR;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ac->onConnect) ac->onConnect(ac, REDIS_OK);
|
/* flag us as fully connect, but allow the callback
|
||||||
|
* to disconnect. For that reason, permit the function
|
||||||
|
* to delete the context here after callback return.
|
||||||
|
*/
|
||||||
c->flags |= REDIS_CONNECTED;
|
c->flags |= REDIS_CONNECTED;
|
||||||
|
__redisRunConnectCallback(ac, REDIS_OK);
|
||||||
|
if ((ac->c.flags & REDIS_DISCONNECTING)) {
|
||||||
|
redisAsyncDisconnect(ac);
|
||||||
|
return REDIS_ERR;
|
||||||
|
} else if ((ac->c.flags & REDIS_FREEING)) {
|
||||||
|
redisAsyncFree(ac);
|
||||||
|
return REDIS_ERR;
|
||||||
|
}
|
||||||
return REDIS_OK;
|
return REDIS_OK;
|
||||||
} else {
|
} else {
|
||||||
return REDIS_OK;
|
return REDIS_OK;
|
||||||
@ -653,6 +691,8 @@ void redisAsyncRead(redisAsyncContext *ac) {
|
|||||||
*/
|
*/
|
||||||
void redisAsyncHandleRead(redisAsyncContext *ac) {
|
void redisAsyncHandleRead(redisAsyncContext *ac) {
|
||||||
redisContext *c = &(ac->c);
|
redisContext *c = &(ac->c);
|
||||||
|
/* must not be called from a callback */
|
||||||
|
assert(!(c->flags & REDIS_IN_CALLBACK));
|
||||||
|
|
||||||
if (!(c->flags & REDIS_CONNECTED)) {
|
if (!(c->flags & REDIS_CONNECTED)) {
|
||||||
/* Abort connect was not successful. */
|
/* Abort connect was not successful. */
|
||||||
@ -686,6 +726,8 @@ void redisAsyncWrite(redisAsyncContext *ac) {
|
|||||||
|
|
||||||
void redisAsyncHandleWrite(redisAsyncContext *ac) {
|
void redisAsyncHandleWrite(redisAsyncContext *ac) {
|
||||||
redisContext *c = &(ac->c);
|
redisContext *c = &(ac->c);
|
||||||
|
/* must not be called from a callback */
|
||||||
|
assert(!(c->flags & REDIS_IN_CALLBACK));
|
||||||
|
|
||||||
if (!(c->flags & REDIS_CONNECTED)) {
|
if (!(c->flags & REDIS_CONNECTED)) {
|
||||||
/* Abort connect was not successful. */
|
/* Abort connect was not successful. */
|
||||||
@ -702,6 +744,8 @@ void redisAsyncHandleWrite(redisAsyncContext *ac) {
|
|||||||
void redisAsyncHandleTimeout(redisAsyncContext *ac) {
|
void redisAsyncHandleTimeout(redisAsyncContext *ac) {
|
||||||
redisContext *c = &(ac->c);
|
redisContext *c = &(ac->c);
|
||||||
redisCallback cb;
|
redisCallback cb;
|
||||||
|
/* must not be called from a callback */
|
||||||
|
assert(!(c->flags & REDIS_IN_CALLBACK));
|
||||||
|
|
||||||
if ((c->flags & REDIS_CONNECTED)) {
|
if ((c->flags & REDIS_CONNECTED)) {
|
||||||
if (ac->replies.head == NULL && ac->sub.replies.head == NULL) {
|
if (ac->replies.head == NULL && ac->sub.replies.head == NULL) {
|
||||||
@ -721,8 +765,8 @@ void redisAsyncHandleTimeout(redisAsyncContext *ac) {
|
|||||||
__redisAsyncCopyError(ac);
|
__redisAsyncCopyError(ac);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!(c->flags & REDIS_CONNECTED) && ac->onConnect) {
|
if (!(c->flags & REDIS_CONNECTED)) {
|
||||||
ac->onConnect(ac, REDIS_ERR);
|
__redisRunConnectCallback(ac, REDIS_ERR);
|
||||||
}
|
}
|
||||||
|
|
||||||
while (__redisShiftCallback(&ac->replies, &cb) == REDIS_OK) {
|
while (__redisShiftCallback(&ac->replies, &cb) == REDIS_OK) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user