Merge pull request #932 from kristjanvalur/pr3
Polling adapter and example
This commit is contained in:
commit
eaa2a7ee77
5
Makefile
5
Makefile
@ -4,7 +4,7 @@
|
||||
# This file is released under the BSD license, see the COPYING file
|
||||
|
||||
OBJ=alloc.o net.o hiredis.o sds.o async.o read.o sockcompat.o
|
||||
EXAMPLES=hiredis-example hiredis-example-libevent hiredis-example-libev hiredis-example-glib hiredis-example-push
|
||||
EXAMPLES=hiredis-example hiredis-example-libevent hiredis-example-libev hiredis-example-glib hiredis-example-push hiredis-example-poll
|
||||
TESTS=hiredis-test
|
||||
LIBNAME=libhiredis
|
||||
PKGCONFNAME=hiredis.pc
|
||||
@ -192,6 +192,9 @@ hiredis-example-macosx: examples/example-macosx.c adapters/macosx.h $(STLIBNAME)
|
||||
hiredis-example-ssl: examples/example-ssl.c $(STLIBNAME) $(SSL_STLIBNAME)
|
||||
$(CC) -o examples/$@ $(REAL_CFLAGS) -I. $< $(STLIBNAME) $(SSL_STLIBNAME) $(REAL_LDFLAGS) $(SSL_LDFLAGS)
|
||||
|
||||
hiredis-example-poll: examples/example-poll.c adapters/poll.h $(STLIBNAME)
|
||||
$(CC) -o examples/$@ $(REAL_CFLAGS) -I. $< $(STLIBNAME) $(REAL_LDFLAGS)
|
||||
|
||||
ifndef AE_DIR
|
||||
hiredis-example-ae:
|
||||
@echo "Please specify AE_DIR (e.g. <redis repository>/src)"
|
||||
|
197
adapters/poll.h
Normal file
197
adapters/poll.h
Normal file
@ -0,0 +1,197 @@
|
||||
|
||||
#ifndef HIREDIS_POLL_H
|
||||
#define HIREDIS_POLL_H
|
||||
|
||||
#include "../async.h"
|
||||
#include "../sockcompat.h"
|
||||
#include <string.h> // for memset
|
||||
#include <errno.h>
|
||||
|
||||
/* Values to return from redisPollTick */
|
||||
#define REDIS_POLL_HANDLED_READ 1
|
||||
#define REDIS_POLL_HANDLED_WRITE 2
|
||||
#define REDIS_POLL_HANDLED_TIMEOUT 4
|
||||
|
||||
/* An adapter to allow manual polling of the async context by checking the state
|
||||
* of the underlying file descriptor. Useful in cases where there is no formal
|
||||
* IO event loop but regular ticking can be used, such as in game engines. */
|
||||
|
||||
typedef struct redisPollEvents {
|
||||
redisAsyncContext *context;
|
||||
redisFD fd;
|
||||
char reading, writing;
|
||||
char in_tick;
|
||||
char deleted;
|
||||
double deadline;
|
||||
} redisPollEvents;
|
||||
|
||||
static double redisPollTimevalToDouble(struct timeval *tv) {
|
||||
if (tv == NULL)
|
||||
return 0.0;
|
||||
return tv->tv_sec + tv->tv_usec / 1000000.00;
|
||||
}
|
||||
|
||||
static double redisPollGetNow(void) {
|
||||
#ifndef _MSC_VER
|
||||
struct timeval tv;
|
||||
gettimeofday(&tv,NULL);
|
||||
return redisPollTimevalToDouble(&tv);
|
||||
#else
|
||||
FILETIME ft;
|
||||
ULARGE_INTEGER li;
|
||||
GetSystemTimeAsFileTime(&ft);
|
||||
li.HighPart = ft.dwHighDateTime;
|
||||
li.LowPart = ft.dwLowDateTime;
|
||||
return (double)li.QuadPart * 1e-7;
|
||||
#endif
|
||||
}
|
||||
|
||||
/* Poll for io, handling any pending callbacks. The timeout argument can be
|
||||
* positive to wait for a maximum given time for IO, zero to poll, or negative
|
||||
* to wait forever */
|
||||
static int redisPollTick(redisAsyncContext *ac, double timeout) {
|
||||
int reading, writing;
|
||||
struct pollfd pfd;
|
||||
int handled;
|
||||
int ns;
|
||||
int itimeout;
|
||||
|
||||
redisPollEvents *e = (redisPollEvents*)ac->ev.data;
|
||||
if (!e)
|
||||
return 0;
|
||||
|
||||
/* local flags, won't get changed during callbacks */
|
||||
reading = e->reading;
|
||||
writing = e->writing;
|
||||
if (!reading && !writing)
|
||||
return 0;
|
||||
|
||||
pfd.fd = e->fd;
|
||||
pfd.events = 0;
|
||||
if (reading)
|
||||
pfd.events = POLLIN;
|
||||
if (writing)
|
||||
pfd.events |= POLLOUT;
|
||||
|
||||
if (timeout >= 0.0) {
|
||||
itimeout = (int)(timeout * 1000.0);
|
||||
} else {
|
||||
itimeout = -1;
|
||||
}
|
||||
|
||||
ns = poll(&pfd, 1, itimeout);
|
||||
if (ns < 0) {
|
||||
/* ignore the EINTR error */
|
||||
if (errno != EINTR)
|
||||
return ns;
|
||||
ns = 0;
|
||||
}
|
||||
|
||||
handled = 0;
|
||||
e->in_tick = 1;
|
||||
if (ns) {
|
||||
if (reading && (pfd.revents & POLLIN)) {
|
||||
redisAsyncHandleRead(ac);
|
||||
handled |= REDIS_POLL_HANDLED_READ;
|
||||
}
|
||||
/* on Windows, connection failure is indicated with the Exception fdset.
|
||||
* handle it the same as writable. */
|
||||
if (writing && (pfd.revents & (POLLOUT | POLLERR))) {
|
||||
/* context Read callback may have caused context to be deleted, e.g.
|
||||
by doing an redisAsyncDisconnect() */
|
||||
if (!e->deleted) {
|
||||
redisAsyncHandleWrite(ac);
|
||||
handled |= REDIS_POLL_HANDLED_WRITE;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* perform timeouts */
|
||||
if (!e->deleted && e->deadline != 0.0) {
|
||||
double now = redisPollGetNow();
|
||||
if (now >= e->deadline) {
|
||||
/* deadline has passed. disable timeout and perform callback */
|
||||
e->deadline = 0.0;
|
||||
redisAsyncHandleTimeout(ac);
|
||||
handled |= REDIS_POLL_HANDLED_TIMEOUT;
|
||||
}
|
||||
}
|
||||
|
||||
/* do a delayed cleanup if required */
|
||||
if (e->deleted)
|
||||
hi_free(e);
|
||||
else
|
||||
e->in_tick = 0;
|
||||
|
||||
return handled;
|
||||
}
|
||||
|
||||
static void redisPollAddRead(void *data) {
|
||||
redisPollEvents *e = (redisPollEvents*)data;
|
||||
e->reading = 1;
|
||||
}
|
||||
|
||||
static void redisPollDelRead(void *data) {
|
||||
redisPollEvents *e = (redisPollEvents*)data;
|
||||
e->reading = 0;
|
||||
}
|
||||
|
||||
static void redisPollAddWrite(void *data) {
|
||||
redisPollEvents *e = (redisPollEvents*)data;
|
||||
e->writing = 1;
|
||||
}
|
||||
|
||||
static void redisPollDelWrite(void *data) {
|
||||
redisPollEvents *e = (redisPollEvents*)data;
|
||||
e->writing = 0;
|
||||
}
|
||||
|
||||
static void redisPollCleanup(void *data) {
|
||||
redisPollEvents *e = (redisPollEvents*)data;
|
||||
|
||||
/* if we are currently processing a tick, postpone deletion */
|
||||
if (e->in_tick)
|
||||
e->deleted = 1;
|
||||
else
|
||||
hi_free(e);
|
||||
}
|
||||
|
||||
static void redisPollScheduleTimer(void *data, struct timeval tv)
|
||||
{
|
||||
redisPollEvents *e = (redisPollEvents*)data;
|
||||
double now = redisPollGetNow();
|
||||
e->deadline = now + redisPollTimevalToDouble(&tv);
|
||||
}
|
||||
|
||||
static int redisPollAttach(redisAsyncContext *ac) {
|
||||
redisContext *c = &(ac->c);
|
||||
redisPollEvents *e;
|
||||
|
||||
/* Nothing should be attached when something is already attached */
|
||||
if (ac->ev.data != NULL)
|
||||
return REDIS_ERR;
|
||||
|
||||
/* Create container for context and r/w events */
|
||||
e = (redisPollEvents*)hi_malloc(sizeof(*e));
|
||||
if (e == NULL)
|
||||
return REDIS_ERR;
|
||||
memset(e, 0, sizeof(*e));
|
||||
|
||||
e->context = ac;
|
||||
e->fd = c->fd;
|
||||
e->reading = e->writing = 0;
|
||||
e->in_tick = e->deleted = 0;
|
||||
e->deadline = 0.0;
|
||||
|
||||
/* Register functions to start/stop listening for events */
|
||||
ac->ev.addRead = redisPollAddRead;
|
||||
ac->ev.delRead = redisPollDelRead;
|
||||
ac->ev.addWrite = redisPollAddWrite;
|
||||
ac->ev.delWrite = redisPollDelWrite;
|
||||
ac->ev.scheduleTimer = redisPollScheduleTimer;
|
||||
ac->ev.cleanup = redisPollCleanup;
|
||||
ac->ev.data = e;
|
||||
|
||||
return REDIS_OK;
|
||||
}
|
||||
#endif /* HIREDIS_POLL_H */
|
62
examples/example-poll.c
Normal file
62
examples/example-poll.c
Normal file
@ -0,0 +1,62 @@
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <signal.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#include <async.h>
|
||||
#include <adapters/poll.h>
|
||||
|
||||
/* Put in the global scope, so that loop can be explicitly stopped */
|
||||
static int exit_loop = 0;
|
||||
|
||||
void getCallback(redisAsyncContext *c, void *r, void *privdata) {
|
||||
redisReply *reply = r;
|
||||
if (reply == NULL) return;
|
||||
printf("argv[%s]: %s\n", (char*)privdata, reply->str);
|
||||
|
||||
/* Disconnect after receiving the reply to GET */
|
||||
redisAsyncDisconnect(c);
|
||||
}
|
||||
|
||||
void connectCallback(const redisAsyncContext *c, int status) {
|
||||
if (status != REDIS_OK) {
|
||||
printf("Error: %s\n", c->errstr);
|
||||
exit_loop = 1;
|
||||
return;
|
||||
}
|
||||
|
||||
printf("Connected...\n");
|
||||
}
|
||||
|
||||
void disconnectCallback(const redisAsyncContext *c, int status) {
|
||||
exit_loop = 1;
|
||||
if (status != REDIS_OK) {
|
||||
printf("Error: %s\n", c->errstr);
|
||||
return;
|
||||
}
|
||||
|
||||
printf("Disconnected...\n");
|
||||
}
|
||||
|
||||
int main (int argc, char **argv) {
|
||||
signal(SIGPIPE, SIG_IGN);
|
||||
|
||||
redisAsyncContext *c = redisAsyncConnect("127.0.0.1", 6379);
|
||||
if (c->err) {
|
||||
/* Let *c leak for now... */
|
||||
printf("Error: %s\n", c->errstr);
|
||||
return 1;
|
||||
}
|
||||
|
||||
redisPollAttach(c);
|
||||
redisAsyncSetConnectCallback(c,connectCallback);
|
||||
redisAsyncSetDisconnectCallback(c,disconnectCallback);
|
||||
redisAsyncCommand(c, NULL, NULL, "SET key %b", argv[argc-1], strlen(argv[argc-1]));
|
||||
redisAsyncCommand(c, getCallback, (char*)"end-1", "GET key");
|
||||
while (!exit_loop)
|
||||
{
|
||||
redisPollTick(c, 0.1);
|
||||
}
|
||||
return 0;
|
||||
}
|
222
test.c
222
test.c
@ -15,6 +15,7 @@
|
||||
|
||||
#include "hiredis.h"
|
||||
#include "async.h"
|
||||
#include "adapters/poll.h"
|
||||
#ifdef HIREDIS_TEST_SSL
|
||||
#include "hiredis_ssl.h"
|
||||
#endif
|
||||
@ -1898,6 +1899,217 @@ static void test_monitor(struct config config) {
|
||||
}
|
||||
#endif /* HIREDIS_TEST_ASYNC */
|
||||
|
||||
/* tests for async api using polling adapter, requires no extra libraries*/
|
||||
|
||||
/* enum for the test cases, the callbacks have different logic based on them */
|
||||
typedef enum astest_no
|
||||
{
|
||||
ASTEST_CONNECT=0,
|
||||
ASTEST_CONN_TIMEOUT,
|
||||
ASTEST_PINGPONG,
|
||||
ASTEST_PINGPONG_TIMEOUT
|
||||
}astest_no;
|
||||
|
||||
/* a static context for the async tests */
|
||||
struct _astest {
|
||||
redisAsyncContext *ac;
|
||||
astest_no testno;
|
||||
int counter;
|
||||
int connects;
|
||||
int connect_status;
|
||||
int disconnects;
|
||||
int disconnect_status;
|
||||
int connected;
|
||||
int err;
|
||||
char errstr[256];
|
||||
};
|
||||
static struct _astest astest;
|
||||
|
||||
static void asSleep(int ms)
|
||||
{
|
||||
#if _MSC_VER
|
||||
Sleep(ms);
|
||||
#else
|
||||
usleep(ms*1000);
|
||||
#endif
|
||||
}
|
||||
|
||||
/* async callbacks */
|
||||
static void asCleanup(void* data)
|
||||
{
|
||||
struct _astest *t = (struct _astest *)data;
|
||||
t->ac = NULL;
|
||||
}
|
||||
|
||||
static void connectCallback(const redisAsyncContext *c, int status) {
|
||||
struct _astest *t = (struct _astest *)c->data;
|
||||
assert(t == &astest);
|
||||
assert(t->connects == 0);
|
||||
t->err = c->err;
|
||||
strcpy(t->errstr, c->errstr);
|
||||
t->connects++;
|
||||
t->connect_status = status;
|
||||
t->connected = status == REDIS_OK ? 1 : -1;
|
||||
}
|
||||
static void disconnectCallback(const redisAsyncContext *c, int status) {
|
||||
assert(c->data == (void*)&astest);
|
||||
assert(astest.disconnects == 0);
|
||||
astest.err = c->err;
|
||||
strcpy(astest.errstr, c->errstr);
|
||||
astest.disconnects++;
|
||||
astest.disconnect_status = status;
|
||||
astest.connected = 0;
|
||||
}
|
||||
|
||||
static void commandCallback(struct redisAsyncContext *ac, void* _reply, void* _privdata)
|
||||
{
|
||||
redisReply *reply = (redisReply*)_reply;
|
||||
struct _astest *t = (struct _astest *)ac->data;
|
||||
assert(t == &astest);
|
||||
(void)_privdata;
|
||||
t->err = ac->err;
|
||||
strcpy(t->errstr, ac->errstr);
|
||||
if (t->testno == ASTEST_PINGPONG)
|
||||
{
|
||||
test_cond(reply != NULL && reply->type == REDIS_REPLY_STATUS && strcmp(reply->str, "PONG") == 0);
|
||||
redisAsyncFree(ac);
|
||||
}
|
||||
if (t->testno == ASTEST_PINGPONG_TIMEOUT)
|
||||
{
|
||||
/* two ping pongs */
|
||||
assert(reply != NULL && reply->type == REDIS_REPLY_STATUS && strcmp(reply->str, "PONG") == 0);
|
||||
if (++t->counter == 1) {
|
||||
int status = redisAsyncCommand(ac, commandCallback, NULL, "PING");
|
||||
assert(status == REDIS_OK);
|
||||
} else {
|
||||
test_cond(reply != NULL && reply->type == REDIS_REPLY_STATUS && strcmp(reply->str, "PONG") == 0);
|
||||
redisAsyncFree(ac);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static redisAsyncContext *do_aconnect(struct config config, astest_no testno)
|
||||
{
|
||||
redisOptions options = {0};
|
||||
memset(&astest, 0, sizeof(astest));
|
||||
|
||||
astest.testno = testno;
|
||||
astest.connect_status = astest.disconnect_status = -2;
|
||||
|
||||
if (config.type == CONN_TCP) {
|
||||
options.type = REDIS_CONN_TCP;
|
||||
options.connect_timeout = &config.tcp.timeout;
|
||||
REDIS_OPTIONS_SET_TCP(&options, config.tcp.host, config.tcp.port);
|
||||
} else if (config.type == CONN_SSL) {
|
||||
options.type = REDIS_CONN_TCP;
|
||||
options.connect_timeout = &config.tcp.timeout;
|
||||
REDIS_OPTIONS_SET_TCP(&options, config.ssl.host, config.ssl.port);
|
||||
} else if (config.type == CONN_UNIX) {
|
||||
options.type = REDIS_CONN_UNIX;
|
||||
options.endpoint.unix_socket = config.unix_sock.path;
|
||||
} else if (config.type == CONN_FD) {
|
||||
options.type = REDIS_CONN_USERFD;
|
||||
/* Create a dummy connection just to get an fd to inherit */
|
||||
redisContext *dummy_ctx = redisConnectUnix(config.unix_sock.path);
|
||||
if (dummy_ctx) {
|
||||
redisFD fd = disconnect(dummy_ctx, 1);
|
||||
printf("Connecting to inherited fd %d\n", (int)fd);
|
||||
options.endpoint.fd = fd;
|
||||
}
|
||||
}
|
||||
redisAsyncContext *c = redisAsyncConnectWithOptions(&options);
|
||||
assert(c);
|
||||
astest.ac = c;
|
||||
c->data = &astest;
|
||||
c->dataCleanup = asCleanup;
|
||||
redisPollAttach(c);
|
||||
redisAsyncSetConnectCallback(c, connectCallback);
|
||||
redisAsyncSetDisconnectCallback(c, disconnectCallback);
|
||||
return c;
|
||||
}
|
||||
|
||||
static void as_printerr(void) {
|
||||
printf("Async err %d : %s\n", astest.err, astest.errstr);
|
||||
}
|
||||
|
||||
#define ASASSERT(e) do { \
|
||||
if (!(e)) \
|
||||
as_printerr(); \
|
||||
assert(e); \
|
||||
} while (0);
|
||||
|
||||
static void test_async_polling(struct config config) {
|
||||
int status;
|
||||
redisAsyncContext *c;
|
||||
struct config defaultconfig = config;
|
||||
|
||||
test("Async connect: ");
|
||||
c = do_aconnect(config, ASTEST_CONNECT);
|
||||
assert(c);
|
||||
while(astest.connected == 0)
|
||||
redisPollTick(c, 0.1);
|
||||
assert(astest.connects == 1);
|
||||
ASASSERT(astest.connect_status == REDIS_OK);
|
||||
assert(astest.disconnects == 0);
|
||||
test_cond(astest.connected == 1);
|
||||
|
||||
test("Async free after connect: ");
|
||||
assert(astest.ac != NULL);
|
||||
redisAsyncFree(c);
|
||||
assert(astest.disconnects == 1);
|
||||
assert(astest.ac == NULL);
|
||||
test_cond(astest.disconnect_status == REDIS_OK);
|
||||
|
||||
if (config.type == CONN_TCP || config.type == CONN_SSL) {
|
||||
/* timeout can only be simulated with network */
|
||||
test("Async connect timeout: ");
|
||||
config.tcp.host = "192.168.254.254"; /* blackhole ip */
|
||||
config.tcp.timeout.tv_usec = 100000;
|
||||
c = do_aconnect(config, ASTEST_CONN_TIMEOUT);
|
||||
assert(c);
|
||||
assert(c->err == 0);
|
||||
while(astest.connected == 0)
|
||||
redisPollTick(c, 0.1);
|
||||
assert(astest.connected == -1);
|
||||
/*
|
||||
* freeing should not be done, clearing should have happened.
|
||||
*redisAsyncFree(c);
|
||||
*/
|
||||
assert(astest.ac == NULL);
|
||||
test_cond(astest.connect_status == REDIS_ERR);
|
||||
config = defaultconfig;
|
||||
}
|
||||
|
||||
/* Test a ping/pong after connection */
|
||||
test("Async PING/PONG: ");
|
||||
c = do_aconnect(config, ASTEST_PINGPONG);
|
||||
while(astest.connected == 0)
|
||||
redisPollTick(c, 0.1);
|
||||
status = redisAsyncCommand(c, commandCallback, NULL, "PING");
|
||||
assert(status == REDIS_OK);
|
||||
while(astest.ac)
|
||||
redisPollTick(c, 0.1);
|
||||
|
||||
/* Test a ping/pong after connection that didn't time out.
|
||||
* see https://github.com/redis/hiredis/issues/945
|
||||
*/
|
||||
if (config.type == CONN_TCP || config.type == CONN_SSL) {
|
||||
test("Async PING/PONG after connect timeout: ");
|
||||
config.tcp.timeout.tv_usec = 10000; /* 10ms */
|
||||
c = do_aconnect(config, ASTEST_PINGPONG_TIMEOUT);
|
||||
while(astest.connected == 0)
|
||||
redisPollTick(c, 0.1);
|
||||
/* sleep 0.1 s, allowing old timeout to arrive */
|
||||
asSleep(10);
|
||||
status = redisAsyncCommand(c, commandCallback, NULL, "PING");
|
||||
assert(status == REDIS_OK);
|
||||
while(astest.ac)
|
||||
redisPollTick(c, 0.1);
|
||||
config = defaultconfig;
|
||||
}
|
||||
}
|
||||
/* End of Async polling_adapter driven tests */
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
struct config cfg = {
|
||||
.tcp = {
|
||||
@ -2017,6 +2229,7 @@ int main(int argc, char **argv) {
|
||||
#endif
|
||||
|
||||
#ifdef HIREDIS_TEST_ASYNC
|
||||
cfg.type = CONN_TCP;
|
||||
printf("\nTesting asynchronous API against TCP connection (%s:%d):\n", cfg.tcp.host, cfg.tcp.port);
|
||||
cfg.type = CONN_TCP;
|
||||
|
||||
@ -2034,6 +2247,15 @@ int main(int argc, char **argv) {
|
||||
}
|
||||
#endif /* HIREDIS_TEST_ASYNC */
|
||||
|
||||
cfg.type = CONN_TCP;
|
||||
printf("\nTesting asynchronous API using polling_adapter TCP (%s:%d):\n", cfg.tcp.host, cfg.tcp.port);
|
||||
test_async_polling(cfg);
|
||||
if (test_unix_socket) {
|
||||
cfg.type = CONN_UNIX;
|
||||
printf("\nTesting asynchronous API using polling_adapter UNIX (%s):\n", cfg.unix_sock.path);
|
||||
test_async_polling(cfg);
|
||||
}
|
||||
|
||||
if (test_inherit_fd) {
|
||||
printf("\nTesting against inherited fd (%s): ", cfg.unix_sock.path);
|
||||
if (test_unix_socket) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user