diff --git a/async.c b/async.c index e37afbde7..9aac48f9b 100644 --- a/async.c +++ b/async.c @@ -291,6 +291,21 @@ static void __redisRunCallback(redisAsyncContext *ac, redisCallback *cb, redisRe c->flags |= REDIS_IN_CALLBACK; cb->fn(ac,reply,cb->privdata); c->flags &= ~REDIS_IN_CALLBACK; + /* Always run finalizer when calling with NULL reply. */ + if (reply == NULL && cb->finalizer != NULL) { + c->flags |= REDIS_IN_FINALIZER; + cb->finalizer(ac,cb->privdata); + c->flags &= ~REDIS_IN_FINALIZER; + } + } +} + +static void __redisRunFinalizer(redisAsyncContext *ac, redisCallback *cb) { + redisContext *c = &(ac->c); + if (cb->fn != NULL && cb->finalizer != NULL) { + c->flags |= REDIS_IN_FINALIZER; + cb->finalizer(ac,cb->privdata); + c->flags &= ~REDIS_IN_FINALIZER; } } @@ -362,7 +377,7 @@ static void __redisAsyncFree(redisAsyncContext *ac) { void redisAsyncFree(redisAsyncContext *ac) { redisContext *c = &(ac->c); c->flags |= REDIS_FREEING; - if (!(c->flags & REDIS_IN_CALLBACK)) + if (!(c->flags & (REDIS_IN_CALLBACK | REDIS_IN_FINALIZER))) __redisAsyncFree(ac); } @@ -406,11 +421,11 @@ void redisAsyncDisconnect(redisAsyncContext *ac) { /** unset the auto-free flag here, because disconnect undoes this */ c->flags &= ~REDIS_NO_AUTO_FREE; - if (!(c->flags & REDIS_IN_CALLBACK) && ac->replies.head == NULL) + if (!(c->flags & (REDIS_IN_CALLBACK | REDIS_IN_FINALIZER)) && ac->replies.head == NULL) __redisAsyncDisconnect(ac); } -static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, redisCallback *dstcb) { +static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, redisCallback *dstcb, int* call_finalizer) { redisContext *c = &(ac->c); dict *callbacks; redisCallback *cb; @@ -421,6 +436,7 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, /* Custom reply functions are not supported for pub/sub. This will fail * very hard when they are used... */ + *call_finalizer = 0; if (reply->type == REDIS_REPLY_ARRAY || reply->type == REDIS_REPLY_PUSH) { assert(reply->elements >= 2); assert(reply->element[0]->type == REDIS_REPLY_STRING); @@ -451,8 +467,10 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, /* If this is an unsubscribe message, remove it. */ if (strcasecmp(stype+pvariant,"unsubscribe") == 0) { - if (cb->pending_subs == 0) + if (cb->pending_subs == 0) { dictDelete(callbacks,sname); + *call_finalizer = 1; + } /* If this was the last unsubscribe message, revert to * non-subscribe mode. */ @@ -469,6 +487,7 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, } else { /* Shift callback for invalid commands. */ __redisShiftCallback(&ac->sub.invalid,dstcb); + *call_finalizer = 1; } return REDIS_OK; oom: @@ -502,11 +521,12 @@ static int redisIsSubscribeReply(redisReply *reply) { void redisProcessCallbacks(redisAsyncContext *ac) { redisContext *c = &(ac->c); - redisCallback cb = {NULL, NULL, 0, NULL}; + redisCallback cb = {NULL, NULL, NULL, 0, NULL}; void *reply = NULL; int status; while((status = redisGetReply(c,&reply)) == REDIS_OK) { + int call_finalizer = 0; if (reply == NULL) { /* When the connection is being disconnected and there are * no more replies, this is the cue to really disconnect. */ @@ -564,11 +584,16 @@ void redisProcessCallbacks(redisAsyncContext *ac) { /* No more regular callbacks and no errors, the context *must* be subscribed or monitoring. */ assert((c->flags & REDIS_SUBSCRIBED || c->flags & REDIS_MONITORING)); if(c->flags & REDIS_SUBSCRIBED) - __redisGetSubscribeCallback(ac,reply,&cb); + __redisGetSubscribeCallback(ac,reply,&cb,&call_finalizer); + } else { + call_finalizer = !(c->flags & REDIS_MONITORING); } if (cb.fn != NULL) { __redisRunCallback(ac,&cb,reply); + if (call_finalizer) { + __redisRunFinalizer(ac,&cb); + } if (!(c->flags & REDIS_NO_AUTO_FREE_REPLIES)){ c->reader->fn->freeObject(reply); } @@ -745,7 +770,7 @@ static const char *nextArgument(const char *start, const char **str, size_t *len /* Helper function for the redisAsyncCommand* family of functions. Writes a * formatted command to the output buffer and registers the provided callback * function with the context. */ -static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) { +static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, redisFinalizerCallback *finalizer, void *privdata, const char *cmd, size_t len) { redisContext *c = &(ac->c); redisCallback cb; struct dict *cbdict; @@ -758,11 +783,13 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void sds sname; int ret; - /* Don't accept new commands when the connection is about to be closed. */ - if (c->flags & (REDIS_DISCONNECTING | REDIS_FREEING)) return REDIS_ERR; + /* Don't accept new commands when the connection is about to be closed. Also, don't accept new + * commands when running a finalizer. */ + if (c->flags & (REDIS_DISCONNECTING | REDIS_FREEING | REDIS_IN_FINALIZER)) return REDIS_ERR; /* Setup callback */ cb.fn = fn; + cb.finalizer = finalizer; cb.privdata = privdata; cb.pending_subs = 1; @@ -793,6 +820,8 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void if (de != NULL) { existcb = dictGetEntryVal(de); cb.pending_subs = existcb->pending_subs + 1; + if (existcb->privdata != cb.privdata) + __redisRunFinalizer(ac,existcb); } ret = dictReplace(cbdict,sname,&cb); @@ -837,6 +866,10 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void } int redisvAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, va_list ap) { + return redisvAsyncCommandWithFinalizer(ac, fn, NULL, privdata, format, ap); +} + +int redisvAsyncCommandWithFinalizer(redisAsyncContext *ac, redisCallbackFn *fn, redisFinalizerCallback *finalizer, void *privdata, const char *format, va_list ap) { char *cmd; int len; int status; @@ -846,7 +879,7 @@ int redisvAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdat if (len < 0) return REDIS_ERR; - status = __redisAsyncCommand(ac,fn,privdata,cmd,len); + status = __redisAsyncCommand(ac,fn,finalizer,privdata,cmd,len); hi_free(cmd); return status; } @@ -860,20 +893,38 @@ int redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata return status; } +int redisAsyncCommandWithFinalizer(redisAsyncContext *ac, redisCallbackFn *fn, redisFinalizerCallback *finalizer, void *privdata, const char *format, ...) { + va_list ap; + int status; + va_start(ap,format); + status = redisvAsyncCommandWithFinalizer(ac,fn,finalizer,privdata,format,ap); + va_end(ap); + return status; +} + int redisAsyncCommandArgv(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen) { + return redisAsyncCommandArgvWithFinalizer(ac, fn, NULL, privdata, argc, argv, argvlen); +} + +int redisAsyncCommandArgvWithFinalizer(redisAsyncContext *ac, redisCallbackFn *fn, redisFinalizerCallback *finalizer, void *privdata, int argc, const char **argv, const size_t *argvlen) { sds cmd; int len; int status; len = redisFormatSdsCommandArgv(&cmd,argc,argv,argvlen); if (len < 0) return REDIS_ERR; - status = __redisAsyncCommand(ac,fn,privdata,cmd,len); + status = __redisAsyncCommand(ac,fn,finalizer,privdata,cmd,len); sdsfree(cmd); return status; } int redisAsyncFormattedCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) { - int status = __redisAsyncCommand(ac,fn,privdata,cmd,len); + int status = __redisAsyncCommand(ac,fn,NULL,privdata,cmd,len); + return status; +} + +int redisAsyncFormattedCommandWithFinalizer(redisAsyncContext *ac, redisCallbackFn *fn, redisFinalizerCallback *finalizer, void *privdata, const char *cmd, size_t len) { + int status = __redisAsyncCommand(ac,fn,finalizer,privdata,cmd,len); return status; } diff --git a/async.h b/async.h index b1d2cb263..062e94437 100644 --- a/async.h +++ b/async.h @@ -42,9 +42,11 @@ struct dict; /* dictionary header is included in async.c */ /* Reply callback prototype and container */ typedef void (redisCallbackFn)(struct redisAsyncContext*, void*, void*); +typedef void (redisFinalizerCallback)(struct redisAsyncContext*, void*); typedef struct redisCallback { struct redisCallback *next; /* simple singly linked list */ redisCallbackFn *fn; + redisFinalizerCallback *finalizer; int pending_subs; void *privdata; } redisCallback; @@ -140,6 +142,11 @@ int redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata int redisAsyncCommandArgv(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen); int redisAsyncFormattedCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len); +int redisvAsyncCommandWithFinalizer(redisAsyncContext *ac, redisCallbackFn *fn, redisFinalizerCallback *finalizer, void *privdata, const char *format, va_list ap); +int redisAsyncCommandWithFinalizer(redisAsyncContext *ac, redisCallbackFn *fn, redisFinalizerCallback *finalizer, void *privdata, const char *format, ...); +int redisAsyncCommandArgvWithFinalizer(redisAsyncContext *ac, redisCallbackFn *fn, redisFinalizerCallback *finalizer, void *privdata, int argc, const char **argv, const size_t *argvlen); +int redisAsyncFormattedCommandWithFinalizer(redisAsyncContext *ac, redisCallbackFn *fn, redisFinalizerCallback *finalizer, void *privdata, const char *cmd, size_t len); + #ifdef __cplusplus } #endif diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 1d5bc56e0..d2837ad9a 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -21,8 +21,10 @@ ENDIF() FIND_PATH(LIBEVENT event.h) if (LIBEVENT) - ADD_EXECUTABLE(example-libevent example-libevent) + ADD_EXECUTABLE(example-libevent example-libevent.c) TARGET_LINK_LIBRARIES(example-libevent hiredis event) + ADD_EXECUTABLE(example-libevent-pubsub example-libevent-pubsub.c) + TARGET_LINK_LIBRARIES(example-libevent-pubsub hiredis event) ENDIF() FIND_PATH(LIBUV uv.h) diff --git a/examples/example-libevent-pubsub.c b/examples/example-libevent-pubsub.c new file mode 100644 index 000000000..78f5ee383 --- /dev/null +++ b/examples/example-libevent-pubsub.c @@ -0,0 +1,169 @@ +#include +#include +#include +#include + +#include +#include +#include +#include + +void printReplyInternal(const redisReply* r) { + switch (r->type) { + case REDIS_REPLY_INTEGER: + printf("(integer %lld)", r->integer); + break; + case REDIS_REPLY_DOUBLE: + printf("(double %f)", r->dval); + break; + case REDIS_REPLY_ERROR: + printf("(error %s)", r->str); + break; + case REDIS_REPLY_STATUS: + printf("(status %s)", r->str); + break; + case REDIS_REPLY_STRING: + printf("(string %s)", r->str); + break; + case REDIS_REPLY_VERB: + printf("(verb %s)", r->str); + break; + case REDIS_REPLY_ARRAY: + printf("(array %zu", r->elements); + for (size_t i = 0; i < r->elements; ++i) { + putchar(' '); + printReplyInternal(r->element[i]); + } + putchar(')'); + break; + default: + printf("(?%d)", r->type); + break; + } +} + +void printReply(const redisReply* r) { + printReplyInternal(r); + putchar('\n'); +} + +void getCallback(redisAsyncContext *c, void *r, void *privdata) { + redisReply *reply = r; + if (reply == NULL) { + if (c->errstr) { + printf("errstr: %s\n", c->errstr); + } + return; + } + printf("argv[%s]: %s\n", (char*)privdata, reply->str); +} + +void getFinalizer(redisAsyncContext *c, void *privdata) { + printf("Get finalizer called\n"); + redisAsyncDisconnect(c); +} + +void connectCallback(const redisAsyncContext *c, int status) { + if (status != REDIS_OK) { + printf("Error: %s\n", c->errstr); + return; + } + printf("Connected...\n"); +} + +void disconnectCallback(const redisAsyncContext *c, int status) { + if (status != REDIS_OK) { + printf("Error: %s\n", c->errstr); + return; + } + printf("Disconnected...\n"); +} + +typedef struct _SubscribeData { + int break_loop; + int remaining_message_count; +} SubscribeData; + +void subscribeCallback(redisAsyncContext *c, void *r, void *privdata) { + redisReply *reply = r; + SubscribeData *sd = privdata; + if (reply == NULL) { + if (c->errstr) { + printf("errstr: %s\n", c->errstr); + } + return; + } + + printf("Subscribe reply: "); + printReply(reply); + + assert(reply->type == REDIS_REPLY_ARRAY); + assert(reply->elements == 3); + assert(reply->element[0]->type == REDIS_REPLY_STRING); + + if (!strcasecmp(reply->element[0]->str, "message")) { + if (--sd->remaining_message_count == 0) { + redisAsyncCommand(c, NULL, NULL, "UNSUBSCRIBE foo"); + } + } + + if (sd->break_loop) { + sd->break_loop = 0; + redisLibeventEvents* e = c->ev.data; + event_base_loopbreak(e->base); + } +} + +void subscribeFinalizer(redisAsyncContext *c, void *privdata) { + printf("Subscribe finalizer called\n"); + redisAsyncDisconnect(c); +} + +int main (int argc, char **argv) { +#ifndef _WIN32 + signal(SIGPIPE, SIG_IGN); +#endif + + struct event_base *base = event_base_new(); + redisOptions options = {0}; + REDIS_OPTIONS_SET_TCP(&options, "127.0.0.1", 6379); + struct timeval tv = {0}; + tv.tv_sec = 1; + options.connect_timeout = &tv; + + + redisAsyncContext *sub = redisAsyncConnectWithOptions(&options); + if (sub->err) { + /* Let *c leak for now... */ + printf("Error: %s\n", sub->errstr); + return 1; + } + + redisLibeventAttach(sub,base); + + redisAsyncContext *c = redisAsyncConnectWithOptions(&options); + if (c->err) { + /* Let *c leak for now... */ + printf("Error: %s\n", c->errstr); + return 1; + } + + redisLibeventAttach(c,base); + redisAsyncSetConnectCallback(c,connectCallback); + redisAsyncSetDisconnectCallback(c,disconnectCallback); + + SubscribeData sd; + memset(&sd, 0, sizeof(sd)); + sd.break_loop = 1; + sd.remaining_message_count = 3; + redisAsyncCommandWithFinalizer(sub, subscribeCallback, subscribeFinalizer, &sd, "SUBSCRIBE foo"); + event_base_dispatch(base); + + redisAsyncCommand(c, NULL, NULL, "SET key %b", argv[argc-1], strlen(argv[argc-1])); + for (int i = 0; i < 3; ++i) { + redisAsyncCommand(c, NULL, NULL, "PUBLISH foo msg%d", i); + } + redisAsyncCommandWithFinalizer(c, getCallback, getFinalizer, (char*)"end-1", "GET key"); + event_base_dispatch(base); + return 0; +} diff --git a/hiredis.h b/hiredis.h index be8525fb1..71f526739 100644 --- a/hiredis.h +++ b/hiredis.h @@ -89,6 +89,9 @@ typedef long long ssize_t; /* Flag that indicates the user does not want replies to be automatically freed */ #define REDIS_NO_AUTO_FREE_REPLIES 0x400 +/* Flag that is set when an async finalizer is executed. */ +#define REDIS_IN_FINALIZER 0x800 + #define REDIS_KEEPALIVE_INTERVAL 15 /* seconds */ /* number of times we retry to connect in the case of EADDRNOTAVAIL and