Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add finalizer support to async Redis client #926

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 63 additions & 12 deletions async.c
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,21 @@ static void __redisRunCallback(redisAsyncContext *ac, redisCallback *cb, redisRe
c->flags |= REDIS_IN_CALLBACK;
cb->fn(ac,reply,cb->privdata);
c->flags &= ~REDIS_IN_CALLBACK;
/* Always run finalizer when calling with NULL reply. */
if (reply == NULL && cb->finalizer != NULL) {
c->flags |= REDIS_IN_FINALIZER;
cb->finalizer(ac,cb->privdata);
c->flags &= ~REDIS_IN_FINALIZER;
}
}
}

static void __redisRunFinalizer(redisAsyncContext *ac, redisCallback *cb) {
redisContext *c = &(ac->c);
if (cb->fn != NULL && cb->finalizer != NULL) {
c->flags |= REDIS_IN_FINALIZER;
cb->finalizer(ac,cb->privdata);
c->flags &= ~REDIS_IN_FINALIZER;
}
}

Expand Down Expand Up @@ -362,7 +377,7 @@ static void __redisAsyncFree(redisAsyncContext *ac) {
void redisAsyncFree(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
c->flags |= REDIS_FREEING;
if (!(c->flags & REDIS_IN_CALLBACK))
if (!(c->flags & (REDIS_IN_CALLBACK | REDIS_IN_FINALIZER)))
__redisAsyncFree(ac);
}

Expand Down Expand Up @@ -406,11 +421,11 @@ void redisAsyncDisconnect(redisAsyncContext *ac) {

/** unset the auto-free flag here, because disconnect undoes this */
c->flags &= ~REDIS_NO_AUTO_FREE;
if (!(c->flags & REDIS_IN_CALLBACK) && ac->replies.head == NULL)
if (!(c->flags & (REDIS_IN_CALLBACK | REDIS_IN_FINALIZER)) && ac->replies.head == NULL)
__redisAsyncDisconnect(ac);
}

static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, redisCallback *dstcb) {
static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, redisCallback *dstcb, int* call_finalizer) {
redisContext *c = &(ac->c);
dict *callbacks;
redisCallback *cb;
Expand All @@ -421,6 +436,7 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply,

/* Custom reply functions are not supported for pub/sub. This will fail
* very hard when they are used... */
*call_finalizer = 0;
if (reply->type == REDIS_REPLY_ARRAY || reply->type == REDIS_REPLY_PUSH) {
assert(reply->elements >= 2);
assert(reply->element[0]->type == REDIS_REPLY_STRING);
Expand Down Expand Up @@ -451,8 +467,10 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply,

/* If this is an unsubscribe message, remove it. */
if (strcasecmp(stype+pvariant,"unsubscribe") == 0) {
if (cb->pending_subs == 0)
if (cb->pending_subs == 0) {
dictDelete(callbacks,sname);
*call_finalizer = 1;
}

/* If this was the last unsubscribe message, revert to
* non-subscribe mode. */
Expand All @@ -469,6 +487,7 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply,
} else {
/* Shift callback for invalid commands. */
__redisShiftCallback(&ac->sub.invalid,dstcb);
*call_finalizer = 1;
}
return REDIS_OK;
oom:
Expand Down Expand Up @@ -502,11 +521,12 @@ static int redisIsSubscribeReply(redisReply *reply) {

void redisProcessCallbacks(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
redisCallback cb = {NULL, NULL, 0, NULL};
redisCallback cb = {NULL, NULL, NULL, 0, NULL};
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we're placing the new callback at the end.

Suggested change
redisCallback cb = {NULL, NULL, NULL, 0, NULL};
redisCallback cb = {NULL, NULL, 0, NULL, NULL};

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

redisCallback cb = {0}; would be fine too.

void *reply = NULL;
int status;

while((status = redisGetReply(c,&reply)) == REDIS_OK) {
int call_finalizer = 0;
if (reply == NULL) {
/* When the connection is being disconnected and there are
* no more replies, this is the cue to really disconnect. */
Expand Down Expand Up @@ -564,11 +584,16 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
/* No more regular callbacks and no errors, the context *must* be subscribed or monitoring. */
assert((c->flags & REDIS_SUBSCRIBED || c->flags & REDIS_MONITORING));
if(c->flags & REDIS_SUBSCRIBED)
__redisGetSubscribeCallback(ac,reply,&cb);
__redisGetSubscribeCallback(ac,reply,&cb,&call_finalizer);
} else {
call_finalizer = !(c->flags & REDIS_MONITORING);
}

if (cb.fn != NULL) {
__redisRunCallback(ac,&cb,reply);
if (call_finalizer) {
__redisRunFinalizer(ac,&cb);
}
if (!(c->flags & REDIS_NO_AUTO_FREE_REPLIES)){
c->reader->fn->freeObject(reply);
}
Expand Down Expand Up @@ -745,7 +770,7 @@ static const char *nextArgument(const char *start, const char **str, size_t *len
/* Helper function for the redisAsyncCommand* family of functions. Writes a
* formatted command to the output buffer and registers the provided callback
* function with the context. */
static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) {
static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, redisFinalizerCallback *finalizer, void *privdata, const char *cmd, size_t len) {
redisContext *c = &(ac->c);
redisCallback cb;
struct dict *cbdict;
Expand All @@ -758,11 +783,13 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
sds sname;
int ret;

/* Don't accept new commands when the connection is about to be closed. */
if (c->flags & (REDIS_DISCONNECTING | REDIS_FREEING)) return REDIS_ERR;
/* Don't accept new commands when the connection is about to be closed. Also, don't accept new
* commands when running a finalizer. */
if (c->flags & (REDIS_DISCONNECTING | REDIS_FREEING | REDIS_IN_FINALIZER)) return REDIS_ERR;

/* Setup callback */
cb.fn = fn;
cb.finalizer = finalizer;
cb.privdata = privdata;
cb.pending_subs = 1;

Expand Down Expand Up @@ -793,6 +820,8 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
if (de != NULL) {
existcb = dictGetEntryVal(de);
cb.pending_subs = existcb->pending_subs + 1;
if (existcb->privdata != cb.privdata)
__redisRunFinalizer(ac,existcb);
}

ret = dictReplace(cbdict,sname,&cb);
Expand Down Expand Up @@ -837,6 +866,10 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
}

int redisvAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, va_list ap) {
return redisvAsyncCommandWithFinalizer(ac, fn, NULL, privdata, format, ap);
}

int redisvAsyncCommandWithFinalizer(redisAsyncContext *ac, redisCallbackFn *fn, redisFinalizerCallback *finalizer, void *privdata, const char *format, va_list ap) {
char *cmd;
int len;
int status;
Expand All @@ -846,7 +879,7 @@ int redisvAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdat
if (len < 0)
return REDIS_ERR;

status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
status = __redisAsyncCommand(ac,fn,finalizer,privdata,cmd,len);
hi_free(cmd);
return status;
}
Expand All @@ -860,20 +893,38 @@ int redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata
return status;
}

int redisAsyncCommandWithFinalizer(redisAsyncContext *ac, redisCallbackFn *fn, redisFinalizerCallback *finalizer, void *privdata, const char *format, ...) {
va_list ap;
int status;
va_start(ap,format);
status = redisvAsyncCommandWithFinalizer(ac,fn,finalizer,privdata,format,ap);
va_end(ap);
return status;
}

int redisAsyncCommandArgv(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen) {
return redisAsyncCommandArgvWithFinalizer(ac, fn, NULL, privdata, argc, argv, argvlen);
}

int redisAsyncCommandArgvWithFinalizer(redisAsyncContext *ac, redisCallbackFn *fn, redisFinalizerCallback *finalizer, void *privdata, int argc, const char **argv, const size_t *argvlen) {
sds cmd;
int len;
int status;
len = redisFormatSdsCommandArgv(&cmd,argc,argv,argvlen);
if (len < 0)
return REDIS_ERR;
status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
status = __redisAsyncCommand(ac,fn,finalizer,privdata,cmd,len);
sdsfree(cmd);
return status;
}

int redisAsyncFormattedCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) {
int status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
int status = __redisAsyncCommand(ac,fn,NULL,privdata,cmd,len);
return status;
}

int redisAsyncFormattedCommandWithFinalizer(redisAsyncContext *ac, redisCallbackFn *fn, redisFinalizerCallback *finalizer, void *privdata, const char *cmd, size_t len) {
int status = __redisAsyncCommand(ac,fn,finalizer,privdata,cmd,len);
return status;
}

Expand Down
7 changes: 7 additions & 0 deletions async.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@ struct dict; /* dictionary header is included in async.c */

/* Reply callback prototype and container */
typedef void (redisCallbackFn)(struct redisAsyncContext*, void*, void*);
typedef void (redisFinalizerCallback)(struct redisAsyncContext*, void*);
typedef struct redisCallback {
struct redisCallback *next; /* simple singly linked list */
redisCallbackFn *fn;
redisFinalizerCallback *finalizer;
int pending_subs;
void *privdata;
Comment on lines +49 to 51
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we have to put the new member at the end of the structure, as the current changes breaks ABI compatibility by inserting a member in the middle.

Suggested change
redisFinalizerCallback *finalizer;
int pending_subs;
void *privdata;
int pending_subs;
void *privdata;
redisFinalizerCallback *finalizer;

Screen Shot 2021-08-08 at 7 09 28 PM

} redisCallback;
Expand Down Expand Up @@ -140,6 +142,11 @@ int redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata
int redisAsyncCommandArgv(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen);
int redisAsyncFormattedCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len);

int redisvAsyncCommandWithFinalizer(redisAsyncContext *ac, redisCallbackFn *fn, redisFinalizerCallback *finalizer, void *privdata, const char *format, va_list ap);
int redisAsyncCommandWithFinalizer(redisAsyncContext *ac, redisCallbackFn *fn, redisFinalizerCallback *finalizer, void *privdata, const char *format, ...);
int redisAsyncCommandArgvWithFinalizer(redisAsyncContext *ac, redisCallbackFn *fn, redisFinalizerCallback *finalizer, void *privdata, int argc, const char **argv, const size_t *argvlen);
int redisAsyncFormattedCommandWithFinalizer(redisAsyncContext *ac, redisCallbackFn *fn, redisFinalizerCallback *finalizer, void *privdata, const char *cmd, size_t len);

#ifdef __cplusplus
}
#endif
Expand Down
4 changes: 3 additions & 1 deletion examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ ENDIF()

FIND_PATH(LIBEVENT event.h)
if (LIBEVENT)
ADD_EXECUTABLE(example-libevent example-libevent)
ADD_EXECUTABLE(example-libevent example-libevent.c)
TARGET_LINK_LIBRARIES(example-libevent hiredis event)
ADD_EXECUTABLE(example-libevent-pubsub example-libevent-pubsub.c)
TARGET_LINK_LIBRARIES(example-libevent-pubsub hiredis event)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also update the Makefile with this new example, although I'm happy to do that after we merge the PR.

ENDIF()

FIND_PATH(LIBUV uv.h)
Expand Down
Loading