Deprecate the error object for I/O and protocol errors

Rather, use the return value REDIS_OK and REDIS_ERR to designate if
calling a function has a successful outcome. When there was an error in
performing the call the error field will be set inside the reader or the
context.
master
Pieter Noordhuis 2010-09-25 15:09:13 +02:00
parent e6fb160b44
commit a7d7618141
3 changed files with 139 additions and 105 deletions

211
hiredis.c
View File

@ -41,6 +41,7 @@
typedef struct redisReader { typedef struct redisReader {
struct redisReplyObjectFunctions *fn; struct redisReplyObjectFunctions *fn;
sds error; /* holds optional error */
void *reply; /* holds temporary reply */ void *reply; /* holds temporary reply */
sds buf; /* read buffer */ sds buf; /* read buffer */
@ -52,16 +53,14 @@ typedef struct redisReader {
} redisReader; } redisReader;
static redisReply *createReplyObject(int type, sds reply); static redisReply *createReplyObject(int type, sds reply);
static void *createErrorObject(const char *str, size_t len);
static void *createStringObject(redisReadTask *task, char *str, size_t len); static void *createStringObject(redisReadTask *task, char *str, size_t len);
static void *createArrayObject(redisReadTask *task, int elements); static void *createArrayObject(redisReadTask *task, int elements);
static void *createIntegerObject(redisReadTask *task, long long value); static void *createIntegerObject(redisReadTask *task, long long value);
static void *createNilObject(redisReadTask *task); static void *createNilObject(redisReadTask *task);
static void redisSetReplyReaderError(redisReader *r, void *obj); static void redisSetReplyReaderError(redisReader *r, sds err);
/* Default set of functions to build the reply. */ /* Default set of functions to build the reply. */
static redisReplyFunctions defaultFunctions = { static redisReplyFunctions defaultFunctions = {
createErrorObject,
createStringObject, createStringObject,
createArrayObject, createArrayObject,
createIntegerObject, createIntegerObject,
@ -106,24 +105,6 @@ void freeReplyObject(void *reply) {
free(r); free(r);
} }
/* Helper function that allows printf-like creation of error objects. */
static void *formatError(redisReplyFunctions *fn, const char *fmt, ...) {
va_list ap;
sds err;
void *obj;
va_start(ap,fmt);
err = sdscatvprintf(sdsempty(),fmt,ap);
va_end(ap);
obj = fn->createError(err,sdslen(err));
sdsfree(err);
return obj;
}
static void *createErrorObject(const char *str, size_t len) {
redisReply *r = createReplyObject(REDIS_ERROR,sdsnewlen(str,len));
return r;
}
static void *createStringObject(redisReadTask *task, char *str, size_t len) { static void *createStringObject(redisReadTask *task, char *str, size_t len) {
redisReply *r = createReplyObject(task->type,sdsnewlen(str,len)); redisReply *r = createReplyObject(task->type,sdsnewlen(str,len));
assert(task->type == REDIS_REPLY_ERROR || assert(task->type == REDIS_REPLY_ERROR ||
@ -308,7 +289,6 @@ static int processItem(redisReader *r) {
redisReadTask *cur = &(r->rlist[r->rpos]); redisReadTask *cur = &(r->rlist[r->rpos]);
char *p; char *p;
sds byte; sds byte;
void *err;
/* check if we need to read type */ /* check if we need to read type */
if (cur->type < 0) { if (cur->type < 0) {
@ -331,9 +311,8 @@ static int processItem(redisReader *r) {
break; break;
default: default:
byte = sdscatrepr(sdsempty(),p,1); byte = sdscatrepr(sdsempty(),p,1);
err = formatError(r->fn, redisSetReplyReaderError(r,sdscatprintf(sdsempty(),
"protocol error, got %s as reply type byte", byte); "protocol error, got %s as reply type byte", byte));
redisSetReplyReaderError(r,err);
sdsfree(byte); sdsfree(byte);
return -1; return -1;
} }
@ -354,14 +333,15 @@ static int processItem(redisReader *r) {
case REDIS_REPLY_ARRAY: case REDIS_REPLY_ARRAY:
return processMultiBulkItem(r); return processMultiBulkItem(r);
default: default:
err = formatError(r->fn,"unknown item type '%d'", cur->type); redisSetReplyReaderError(r,sdscatprintf(sdsempty(),
redisSetReplyReaderError(r,err); "unknown item type '%d'", cur->type));
return -1; return -1;
} }
} }
void *redisReplyReaderCreate(redisReplyFunctions *fn) { void *redisReplyReaderCreate(redisReplyFunctions *fn) {
redisReader *r = calloc(sizeof(redisReader),1); redisReader *r = calloc(sizeof(redisReader),1);
r->error = NULL;
r->fn = fn == NULL ? &defaultFunctions : fn; r->fn = fn == NULL ? &defaultFunctions : fn;
r->buf = sdsempty(); r->buf = sdsempty();
r->rlist = malloc(sizeof(redisReadTask)*1); r->rlist = malloc(sizeof(redisReadTask)*1);
@ -379,6 +359,8 @@ void *redisReplyReaderGetObject(void *reader) {
void redisReplyReaderFree(void *reader) { void redisReplyReaderFree(void *reader) {
redisReader *r = reader; redisReader *r = reader;
if (r->error != NULL)
sdsfree(r->error);
if (r->reply != NULL) if (r->reply != NULL)
r->fn->freeObject(r->reply); r->fn->freeObject(r->reply);
if (r->buf != NULL) if (r->buf != NULL)
@ -388,7 +370,7 @@ void redisReplyReaderFree(void *reader) {
free(r); free(r);
} }
static void redisSetReplyReaderError(redisReader *r, void *obj) { static void redisSetReplyReaderError(redisReader *r, sds err) {
if (r->reply != NULL) if (r->reply != NULL)
r->fn->freeObject(r->reply); r->fn->freeObject(r->reply);
@ -399,7 +381,12 @@ static void redisSetReplyReaderError(redisReader *r, void *obj) {
r->pos = 0; r->pos = 0;
} }
r->rlen = r->rpos = 0; r->rlen = r->rpos = 0;
r->reply = obj; r->error = err;
}
char *redisReplyReaderGetError(void *reader) {
redisReader *r = reader;
return r->error;
} }
void redisReplyReaderFeed(void *reader, char *buf, int len) { void redisReplyReaderFeed(void *reader, char *buf, int len) {
@ -410,12 +397,13 @@ void redisReplyReaderFeed(void *reader, char *buf, int len) {
r->buf = sdscatlen(r->buf,buf,len); r->buf = sdscatlen(r->buf,buf,len);
} }
void *redisReplyReaderGetReply(void *reader) { int redisReplyReaderGetReply(void *reader, void **reply) {
redisReader *r = reader; redisReader *r = reader;
if (reply != NULL) *reply = NULL;
/* When the buffer is empty, there will never be a reply. */ /* When the buffer is empty, there will never be a reply. */
if (sdslen(r->buf) == 0) if (sdslen(r->buf) == 0)
return NULL; return REDIS_OK;
/* Create first item to process when the item list is empty. */ /* Create first item to process when the item list is empty. */
if (r->rlen == 0) { if (r->rlen == 0) {
@ -446,7 +434,7 @@ void *redisReplyReaderGetReply(void *reader) {
/* Emit a reply when there is one. */ /* Emit a reply when there is one. */
if (r->rpos == r->rlen) { if (r->rpos == r->rlen) {
void *reply = r->reply; void *aux = r->reply;
r->reply = NULL; r->reply = NULL;
/* Destroy the buffer when it is empty and is quite large. */ /* Destroy the buffer when it is empty and is quite large. */
@ -458,10 +446,15 @@ void *redisReplyReaderGetReply(void *reader) {
/* Set list of items to read to be empty. */ /* Set list of items to read to be empty. */
r->rlen = r->rpos = 0; r->rlen = r->rpos = 0;
return reply;
} else { /* Check if there actually *is* a reply. */
return NULL; if (r->error != NULL) {
return REDIS_ERR;
} else {
if (reply != NULL) *reply = aux;
}
} }
return REDIS_OK;
} }
/* Helper function for redisCommand(). It's used to append the next argument /* Helper function for redisCommand(). It's used to append the next argument
@ -571,11 +564,11 @@ static int redisContextConnect(redisContext *c, const char *ip, int port) {
} }
if (c->fd == ANET_ERR) { if (c->fd == ANET_ERR) {
c->error = c->fn->createError(err,strlen(err)); c->error = sdsnew(err);
return REDIS_ERR; return REDIS_ERR;
} }
if (anetTcpNoDelay(err,c->fd) == ANET_ERR) { if (anetTcpNoDelay(err,c->fd) == ANET_ERR) {
c->error = c->fn->createError(err,strlen(err)); c->error = sdsnew(err);
return REDIS_ERR; return REDIS_ERR;
} }
return REDIS_OK; return REDIS_OK;
@ -623,12 +616,12 @@ int redisBufferRead(redisContext *c) {
/* Try again later */ /* Try again later */
} else { } else {
/* Set error in context */ /* Set error in context */
c->error = formatError(c->fn, c->error = sdscatprintf(sdsempty(),
"Error reading from socket: %s", strerror(errno)); "Error reading from socket: %s", strerror(errno));
return REDIS_ERR; return REDIS_ERR;
} }
} else if (nread == 0) { } else if (nread == 0) {
c->error = formatError(c->fn, c->error = sdscatprintf(sdsempty(),
"Server closed the connection"); "Server closed the connection");
return REDIS_ERR; return REDIS_ERR;
} else { } else {
@ -644,21 +637,34 @@ static void redisPopCallback(redisContext *c) {
c->cpos--; c->cpos--;
} }
void *redisGetReply(redisContext *c) { int redisGetReply(redisContext *c, void **reply) {
redisPopCallback(c); redisPopCallback(c);
return redisReplyReaderGetReply(c->reader); if (redisReplyReaderGetReply(c->reader,reply) == REDIS_ERR) {
/* Copy the (protocol) error from the reader to the context. */
c->error = sdsnew(((redisReader*)c->reader)->error);
return REDIS_ERR;
}
return REDIS_OK;
} }
int redisProcessCallbacks(redisContext *c) { int redisProcessCallbacks(redisContext *c) {
void *reply; void *reply = NULL;
redisCallback cb; redisCallback cb;
while ((reply = redisReplyReaderGetReply(c->reader)) != NULL) { do {
cb = c->callbacks[0]; cb = c->callbacks[0];
if (cb.fn != NULL) if (redisGetReply(c,&reply) == REDIS_ERR)
cb.fn(c,reply,cb.privdata); return REDIS_ERR;
redisPopCallback(c);
} /* Fire callback when there is a reply. */
if (reply != NULL) {
if (cb.fn != NULL) {
cb.fn(c,reply,cb.privdata);
} else {
c->fn->freeObject(reply);
}
}
} while (reply != NULL);
return REDIS_OK; return REDIS_OK;
} }
@ -671,7 +677,7 @@ int redisBufferWrite(redisContext *c, int *done) {
/* Try again later */ /* Try again later */
} else { } else {
/* Set error in context */ /* Set error in context */
c->error = formatError(c->fn, c->error = sdscatprintf(sdsempty(),
"Error writing to socket: %s", strerror(errno)); "Error writing to socket: %s", strerror(errno));
return REDIS_ERR; return REDIS_ERR;
} }
@ -687,73 +693,102 @@ int redisBufferWrite(redisContext *c, int *done) {
return REDIS_OK; return REDIS_OK;
} }
static void* redisCommandWrite(redisContext *c, redisCallback *cb, char *str, size_t len) { static int redisCommandWriteBlock(redisContext *c, void **reply, char *str, size_t len) {
void *reply = NULL;
int wdone = 0; int wdone = 0;
void *aux = NULL;
assert(c->flags & HIREDIS_BLOCK);
c->obuf = sdscatlen(c->obuf,str,len); c->obuf = sdscatlen(c->obuf,str,len);
/* Read reply immediately when the context is blocking. */ /* Write until done. */
if (c->flags & HIREDIS_BLOCK) { do {
do { /* Write until done. */ if (redisBufferWrite(c,&wdone) == REDIS_ERR)
if (redisBufferWrite(c,&wdone) == REDIS_ERR) return REDIS_ERR;
return c->error; } while (!wdone);
} while (!wdone);
do { /* Read until there is a reply. */ /* Read until there is a reply. */
if (redisBufferRead(c) == REDIS_ERR) do {
return c->error; if (redisBufferRead(c) == REDIS_ERR)
reply = redisGetReply(c); return REDIS_ERR;
} while (reply == NULL); if (redisGetReply(c,&aux) == REDIS_ERR)
} else { return REDIS_ERR;
/* Make room for the callback. */ } while (aux == NULL);
assert(c->cpos <= c->clen);
if (c->cpos == c->clen) {
c->clen++;
c->callbacks = realloc(c->callbacks,c->clen*sizeof(redisCallback));
}
if (cb != NULL) { /* Set reply object. */
c->callbacks[c->cpos] = *cb; if (reply != NULL)
} else { *reply = aux;
memset(&c->callbacks[c->cpos],0,sizeof(redisCallback));
} return REDIS_OK;
c->cpos++;
}
return reply;
} }
/* Write a formatted command to the output buffer, and, if the context is a static int redisCommandWriteNonBlock(redisContext *c, redisCallback *cb, char *str, size_t len) {
* blocking connection, read the reply and return it. When this function assert(!(c->flags & HIREDIS_BLOCK));
* is called from a non-blocking context, it will always return NULL. */ c->obuf = sdscatlen(c->obuf,str,len);
/* Make sure there is space for the callback. */
assert(c->cpos <= c->clen);
if (c->cpos == c->clen) {
c->clen++;
c->callbacks = realloc(c->callbacks,c->clen*sizeof(redisCallback));
}
if (cb != NULL) {
c->callbacks[c->cpos] = *cb;
} else {
memset(&c->callbacks[c->cpos],0,sizeof(redisCallback));
}
c->cpos++;
return REDIS_OK;
}
/* Write a formatted command to the output buffer. If the given context is
* blocking, immediately read the reply into the "reply" pointer. When the
* context is non-blocking, the "reply" pointer will not be used and a
* NULL callback will be appended to the list of callbacks.
*
* Returns the reply when a reply was succesfully retrieved. Returns NULL
* otherwise. When NULL is returned in a blocking context, provided that
* the reply build functions did not return NULL when building the reply,
* the error field in the context will be set. */
void *redisCommand(redisContext *c, const char *format, ...) { void *redisCommand(redisContext *c, const char *format, ...) {
va_list ap; va_list ap;
sds cmd; sds cmd;
void *reply; void *reply = NULL;
va_start(ap,format); va_start(ap,format);
cmd = redisFormatCommand(format,ap); cmd = redisFormatCommand(format,ap);
va_end(ap); va_end(ap);
reply = redisCommandWrite(c,NULL,cmd,sdslen(cmd)); if (c->flags & HIREDIS_BLOCK) {
if (redisCommandWriteBlock(c,&reply,cmd,sdslen(cmd)) == REDIS_OK) {
return reply;
}
} else {
redisCommandWriteNonBlock(c,NULL,cmd,sdslen(cmd));
}
sdsfree(cmd); sdsfree(cmd);
return reply; return NULL;
} }
/* Write a formatted command to the output buffer and register the provided /* Write a formatted command to the output buffer. Registers the provided
* callback function and argument. When this function is called in a * callback function and argument in the callback list.
* non-blocking context, it is a no-op. Always returns NULL. */ *
* Always returns NULL. In a non-blocking context this will never fail because
* this function does not do any I/O. In a blocking context this function will
* have no effect (a callback in a blocking context makes no sense). */
void *redisCommandWithCallback(redisContext *c, redisCallbackFn *fn, void *privdata, const char *format, ...) { void *redisCommandWithCallback(redisContext *c, redisCallbackFn *fn, void *privdata, const char *format, ...) {
va_list ap; va_list ap;
redisCallback cb = { fn, privdata };
sds cmd; sds cmd;
int status;
redisCallback cb = { fn, privdata };
/* Skip if the context is blocking. */ /* This function may only be used in a non-blocking context. */
if (c->flags & HIREDIS_BLOCK) return NULL; if (c->flags & HIREDIS_BLOCK) return NULL;
va_start(ap,format); va_start(ap,format);
cmd = redisFormatCommand(format,ap); cmd = redisFormatCommand(format,ap);
va_end(ap); va_end(ap);
redisCommandWrite(c,&cb,cmd,sdslen(cmd)); status = redisCommandWriteNonBlock(c,&cb,cmd,sdslen(cmd));
sdsfree(cmd); sdsfree(cmd);
return NULL; return NULL;
} }

View File

@ -63,7 +63,6 @@ typedef struct redisReadTask {
} redisReadTask; } redisReadTask;
typedef struct redisReplyObjectFunctions { typedef struct redisReplyObjectFunctions {
void *(*createError)(const char*, size_t);
void *(*createString)(redisReadTask*, char*, size_t); void *(*createString)(redisReadTask*, char*, size_t);
void *(*createArray)(redisReadTask*, int); void *(*createArray)(redisReadTask*, int);
void *(*createInteger)(redisReadTask*, long long); void *(*createInteger)(redisReadTask*, long long);
@ -85,7 +84,7 @@ typedef struct redisCallback {
typedef struct redisContext { typedef struct redisContext {
int fd; int fd;
int flags; int flags;
void *error; /* Error object is set when in erronous state */ sds error; /* Error object is set when in erronous state */
sds obuf; /* Write buffer */ sds obuf; /* Write buffer */
redisReplyFunctions *fn; redisReplyFunctions *fn;
void *reader; void *reader;
@ -97,15 +96,16 @@ typedef struct redisContext {
void freeReplyObject(void *reply); void freeReplyObject(void *reply);
void *redisReplyReaderCreate(redisReplyFunctions *fn); void *redisReplyReaderCreate(redisReplyFunctions *fn);
void *redisReplyReaderGetObject(void *reader); void *redisReplyReaderGetObject(void *reader);
char *redisReplyReaderGetError(void *reader);
void redisReplyReaderFree(void *ptr); void redisReplyReaderFree(void *ptr);
void redisReplyReaderFeed(void *reader, char *buf, int len); void redisReplyReaderFeed(void *reader, char *buf, int len);
void *redisReplyReaderGetReply(void *reader); int redisReplyReaderGetReply(void *reader, void **reply);
redisContext *redisConnect(const char *ip, int port, redisReplyFunctions *fn); redisContext *redisConnect(const char *ip, int port, redisReplyFunctions *fn);
redisContext *redisConnectNonBlock(const char *ip, int port, redisReplyFunctions *fn); redisContext *redisConnectNonBlock(const char *ip, int port, redisReplyFunctions *fn);
int redisBufferRead(redisContext *c); int redisBufferRead(redisContext *c);
int redisBufferWrite(redisContext *c, int *done); int redisBufferWrite(redisContext *c, int *done);
void *redisGetReply(redisContext *c); int redisGetReply(redisContext *c, void **reply);
int redisProcessCallbacks(redisContext *c); int redisProcessCallbacks(redisContext *c);
void *redisCommand(redisContext *c, const char *format, ...); void *redisCommand(redisContext *c, const char *format, ...);

25
test.c
View File

@ -25,18 +25,17 @@ static void __connect(redisContext **c) {
} }
int main(void) { int main(void) {
int i, tests = 0, fails = 0; int i, ret, tests = 0, fails = 0;
long long t1, t2; long long t1, t2;
redisContext *c; redisContext *c;
redisReply *reply; redisReply *reply;
void *reader; void *reader;
char *err;
__connect(&c); __connect(&c);
test("Returns I/O error when the connection is lost: "); test("Returns I/O error when the connection is lost: ");
reply = redisCommand(c,"QUIT"); test_cond(redisCommand(c,"QUIT") == NULL &&
test_cond(reply->type == REDIS_ERROR && strcmp(c->error,"Server closed the connection") == 0);
strcmp(reply->reply,"Server closed the connection") == 0);
freeReplyObject(reply);
__connect(&c); /* reconnect */ __connect(&c); /* reconnect */
test("Is able to deliver commands: "); test("Is able to deliver commands: ");
@ -126,10 +125,10 @@ int main(void) {
test("Error handling in reply parser: "); test("Error handling in reply parser: ");
reader = redisReplyReaderCreate(NULL); reader = redisReplyReaderCreate(NULL);
redisReplyReaderFeed(reader,(char*)"@foo\r\n",6); redisReplyReaderFeed(reader,(char*)"@foo\r\n",6);
reply = redisReplyReaderGetReply(reader); ret = redisReplyReaderGetReply(reader,(void*)&reply);
test_cond(reply->type == REDIS_ERROR && err = redisReplyReaderGetError(reader);
strcasecmp(reply->reply,"protocol error, got \"@\" as reply type byte") == 0); test_cond(ret == REDIS_ERR &&
freeReplyObject(reply); strcasecmp(err,"protocol error, got \"@\" as reply type byte") == 0);
redisReplyReaderFree(reader); redisReplyReaderFree(reader);
/* when the reply already contains multiple items, they must be free'd /* when the reply already contains multiple items, they must be free'd
@ -139,10 +138,10 @@ int main(void) {
redisReplyReaderFeed(reader,(char*)"*2\r\n",4); redisReplyReaderFeed(reader,(char*)"*2\r\n",4);
redisReplyReaderFeed(reader,(char*)"$5\r\nhello\r\n",11); redisReplyReaderFeed(reader,(char*)"$5\r\nhello\r\n",11);
redisReplyReaderFeed(reader,(char*)"@foo\r\n",6); redisReplyReaderFeed(reader,(char*)"@foo\r\n",6);
reply = redisReplyReaderGetReply(reader); ret = redisReplyReaderGetReply(reader,(void*)&reply);
test_cond(reply->type == REDIS_ERROR && err = redisReplyReaderGetError(reader);
strcasecmp(reply->reply,"protocol error, got \"@\" as reply type byte") == 0); test_cond(ret == REDIS_ERR &&
freeReplyObject(reply); strcasecmp(err,"protocol error, got \"@\" as reply type byte") == 0);
redisReplyReaderFree(reader); redisReplyReaderFree(reader);
test("Throughput:\n"); test("Throughput:\n");