From fba34d905d2efe2df902ffd5e08788da17ca8b3d Mon Sep 17 00:00:00 2001 From: Johann Lombardi Date: Wed, 25 Sep 2024 14:38:02 +0200 Subject: [PATCH] DAOS-16563 client: mark pool/cont handle as g2l after fork (#15125) This patch marks all pool and container handles as if they were created with g2l in the child processes after fork. This prevents unwanted interactions if one of the child processes closes the handle. The marking is done by iterating through all the pool and container handles, which the hhash code did not previously support. This patch also: - adds support for fork to pydaos. - introduces daos_reinit() to be called after fork. - fixes IL to set the atfork callback even when no extra eqs are used. - removes support for creating an event queue for each pydaos put/get operation. This makes the global event queue the only option. This should probably be moved to a per-thread eq in the future. Signed-off-by: Johann Lombardi --- src/client/api/init.c | 29 ++++++++++ src/client/dfuse/il/int_posix.c | 28 +++++---- src/client/dfuse/pil4dfs/int_dfs.c | 5 +- src/client/pydaos/pydaos_shim.c | 93 ++++++++++++------------------ src/common/misc.c | 6 ++ src/container/cli.c | 17 ++++++ src/gurt/hash.c | 36 ++++++++++++ src/include/daos.h | 11 ++++ src/include/daos/common.h | 4 ++ src/include/daos/container.h | 2 + src/include/daos/pool.h | 3 + src/include/gurt/hash.h | 3 + src/pool/cli.c | 17 ++++++ 13 files changed, 183 insertions(+), 71 deletions(-) diff --git a/src/client/api/init.c b/src/client/api/init.c index 831b6c0c482..26fb68ea2d0 100644 --- a/src/client/api/init.c +++ b/src/client/api/init.c @@ -361,3 +361,32 @@ daos_fini(void) D_MUTEX_UNLOCK(&module_lock); return rc; } + +/** + * Re-initialize the DAOS library in the child process after fork.
+ */ +int +daos_reinit(void) +{ + int rc; + + rc = daos_eq_lib_reset_after_fork(); + if (rc) + return rc; + + daos_dti_reset(); + + /** + * Mark all pool and container handles owned by the parent process as if they were created + * in the child processes with g2l to avoid confusing the DAOS engines. + */ + rc = dc_pool_mark_all_slave(); + if (rc) + return rc; + + rc = dc_cont_mark_all_slave(); + if (rc) + return rc; + + return 0; +} diff --git a/src/client/dfuse/il/int_posix.c b/src/client/dfuse/il/int_posix.c index 3609adcb5fc..0ddc4ffee25 100644 --- a/src/client/dfuse/il/int_posix.c +++ b/src/client/dfuse/il/int_posix.c @@ -818,16 +818,20 @@ child_hdlr(void) { int rc; - rc = daos_eq_lib_reset_after_fork(); + rc = daos_reinit(); if (rc) - DL_WARN(rc, "daos_eq_lib_init() failed in child process"); - daos_dti_reset(); + DL_WARN(rc, "daos_reinit() failed in child process"); + + /** Reset event queue */ ioil_eqh = ioil_iog.iog_main_eqh = DAOS_HDL_INVAL; - rc = daos_eq_create(&ioil_eqh); - if (rc) - DFUSE_LOG_WARNING("daos_eq_create() failed: "DF_RC, DP_RC(rc)); - else - ioil_iog.iog_main_eqh = ioil_eqh; + + if (ioil_iog.iog_eq_count_max) { + rc = daos_eq_create(&ioil_eqh); + if (rc) + DFUSE_LOG_WARNING("daos_eq_create() failed: " DF_RC, DP_RC(rc)); + else + ioil_iog.iog_main_eqh = ioil_eqh; + } } /* Returns true on success */ @@ -876,10 +880,12 @@ check_ioctl_on_open(int fd, struct fd_entry *entry, int flags) D_GOTO(err, rc = daos_der2errno(rc)); } ioil_iog.iog_main_eqh = ioil_eqh; - - rc = pthread_atfork(NULL, NULL, &child_hdlr); - D_ASSERT(rc == 0); } + + rc = pthread_atfork(NULL, NULL, &child_hdlr); + if (rc) + DFUSE_LOG_WARNING("Failed to install atfork handler: " DF_RC, DP_RC(rc)); + rc = 0; } d_list_for_each_entry(pool, &ioil_iog.iog_pools_head, iop_pools) { diff --git a/src/client/dfuse/pil4dfs/int_dfs.c b/src/client/dfuse/pil4dfs/int_dfs.c index 0ff8d0d8f19..c23c2ef25ca 100644 --- a/src/client/dfuse/pil4dfs/int_dfs.c +++ b/src/client/dfuse/pil4dfs/int_dfs.c @@ 
-939,10 +939,9 @@ child_hdlr(void) if (atomic_load_relaxed(&d_daos_inited) == false) return; - rc = daos_eq_lib_reset_after_fork(); + rc = daos_reinit(); if (rc) - DL_WARN(rc, "daos_eq_lib_init() failed in child process"); - daos_dti_reset(); + DL_WARN(rc, "daos_reinit() failed in child process"); td_eqh = main_eqh = DAOS_HDL_INVAL; context_reset = true; } diff --git a/src/client/pydaos/pydaos_shim.c b/src/client/pydaos/pydaos_shim.c index 518cb10218e..ea85e900d4e 100644 --- a/src/client/pydaos/pydaos_shim.c +++ b/src/client/pydaos/pydaos_shim.c @@ -63,30 +63,48 @@ struct open_handle { } \ } while (0) -static daos_handle_t glob_eq; -static bool use_glob_eq; +/** Global event queue */ +static daos_handle_t eq; /** * Implementations of baseline shim functions */ +static void +child_handler(void) +{ + int rc; + + rc = daos_reinit(); + if (rc) + D_WARN("daos_reinit() failed in child process %d", rc); + + eq = DAOS_HDL_INVAL; + rc = daos_eq_create(&eq); + if (rc) + DL_ERROR(rc, "Failed to re-create global eq"); +} + static PyObject * __shim_handle__daos_init(PyObject *self, PyObject *args) { int rc; rc = daos_init(); - if ((rc == 0) && (use_glob_eq == 0)) { - d_getenv_bool("PYDAOS_GLOB_EQ", &use_glob_eq); - if (use_glob_eq) { - int ret; - - ret = daos_eq_create(&glob_eq); - if (ret) { - DL_ERROR(ret, "Failed to create global eq"); - use_glob_eq = false; - } - } + if (rc) + return PyLong_FromLong(rc); + + rc = daos_eq_create(&eq); + if (rc) { + DL_ERROR(rc, "Failed to create global eq"); + daos_fini(); + return PyLong_FromLong(rc); + } + + rc = pthread_atfork(NULL, NULL, &child_handler); + if (rc) { + DL_ERROR(rc, "Failed to set atfork handler"); + return PyLong_FromLong(rc); } return PyLong_FromLong(rc); @@ -97,12 +115,9 @@ __shim_handle__daos_fini(PyObject *self, PyObject *args) { int rc; - if (use_glob_eq) { - rc = daos_eq_destroy(glob_eq, DAOS_EQ_DESTROY_FORCE); - if (rc) - D_ERROR("Failed to destroy global eq, "DF_RC"\n", DP_RC(rc)); - use_glob_eq = false; - } + rc 
= daos_eq_destroy(eq, DAOS_EQ_DESTROY_FORCE); + if (rc) + D_ERROR("Failed to destroy global eq, " DF_RC "\n", DP_RC(rc)); rc = daos_fini(); @@ -914,8 +929,7 @@ __shim_handle__kv_get(PyObject *self, PyObject *args) PyObject *daos_dict; daos_handle_t oh; PyObject *key; - Py_ssize_t pos = 0; - daos_handle_t eq; + Py_ssize_t pos = 0; struct kv_op *kv_array = NULL; struct kv_op *op; daos_event_t *evp; @@ -928,14 +942,6 @@ __shim_handle__kv_get(PyObject *self, PyObject *args) RETURN_NULL_IF_FAILED_TO_PARSE(args, "LO!l", &oh.cookie, &PyDict_Type, &daos_dict, &v_size); - if (!use_glob_eq) { - rc = daos_eq_create(&eq); - if (rc) - return PyLong_FromLong(rc); - } else { - eq = glob_eq; - } - D_ALLOC_ARRAY(kv_array, MAX_INFLIGHT); if (kv_array == NULL) { rc = -DER_NOMEM; @@ -1079,19 +1085,10 @@ __shim_handle__kv_get(PyObject *self, PyObject *args) out: D_FREE(kv_array); - /** destroy event queue */ - if (!use_glob_eq) { - ret = daos_eq_destroy(eq, DAOS_EQ_DESTROY_FORCE); - if (rc == DER_SUCCESS && ret < 0) - rc = ret; - } - /* Populate return list */ return PyLong_FromLong(rc); err: - if (!use_glob_eq) - daos_eq_destroy(eq, DAOS_EQ_DESTROY_FORCE); D_FREE(kv_array); return NULL; @@ -1104,8 +1101,7 @@ __shim_handle__kv_put(PyObject *self, PyObject *args) daos_handle_t oh; PyObject *key; PyObject *value; - Py_ssize_t pos = 0; - daos_handle_t eq; + Py_ssize_t pos = 0; daos_event_t ev_array[MAX_INFLIGHT]; daos_event_t *evp; int i = 0; @@ -1116,14 +1112,6 @@ __shim_handle__kv_put(PyObject *self, PyObject *args) RETURN_NULL_IF_FAILED_TO_PARSE(args, "LO!", &oh.cookie, &PyDict_Type, &daos_dict); - if (!use_glob_eq) { - rc = daos_eq_create(&eq); - if (rc) - return PyLong_FromLong(rc); - } else { - eq = glob_eq; - } - while (PyDict_Next(daos_dict, &pos, &key, &value)) { char *buf; daos_size_t size; @@ -1203,17 +1191,8 @@ __shim_handle__kv_put(PyObject *self, PyObject *args) if (rc == DER_SUCCESS && ret < 0) rc = ret; - /** destroy event queue */ - if (!use_glob_eq) { - ret = 
daos_eq_destroy(eq, 0); - if (rc == DER_SUCCESS && ret < 0) - rc = ret; - } - return PyLong_FromLong(rc); err: - if (!use_glob_eq) - daos_eq_destroy(eq, 0); return NULL; } diff --git a/src/common/misc.c b/src/common/misc.c index d171d9ea5f4..816b441a59d 100644 --- a/src/common/misc.c +++ b/src/common/misc.c @@ -630,6 +630,12 @@ daos_hhash_link_delete(struct d_hlink *hlink) return d_hhash_link_delete(daos_ht.dht_hhash, hlink); } +int +daos_hhash_traverse(int type, daos_hhash_traverse_cb_t cb, void *arg) +{ + return d_hhash_traverse(daos_ht.dht_hhash, type, cb, arg); +} + /** * a helper to get the needed crt_init_opt. * diff --git a/src/container/cli.c b/src/container/cli.c index cd43667a2a4..1b6ff5f2236 100644 --- a/src/container/cli.c +++ b/src/container/cli.c @@ -3524,3 +3524,20 @@ dc_cont_hdl2props(daos_handle_t coh) return result; } + +static int +cont_mark_slave(struct d_hlink *link, void *arg) +{ + struct dc_cont *cont; + + cont = container_of(link, struct dc_cont, dc_hlink); + cont->dc_slave = 1; + + return 0; +} + +int +dc_cont_mark_all_slave(void) +{ + return daos_hhash_traverse(DAOS_HTYPE_CO, cont_mark_slave, NULL); +} diff --git a/src/gurt/hash.c b/src/gurt/hash.c index f6b5128c151..29a68622034 100644 --- a/src/gurt/hash.c +++ b/src/gurt/hash.c @@ -1344,6 +1344,42 @@ d_hhash_key_type(uint64_t key) return d_hhash_key_isptr(key) ? 
D_HTYPE_PTR : key & D_HTYPE_MASK; } +struct traverse_args { + int type; + d_hhash_traverse_cb_t cb; + void *arg; +}; + +static int +d_hhash_cb(d_list_t *link, void *args) +{ + struct traverse_args *targs = args; + struct d_hlink *hlink = link2hlink(link); + uint64_t key; + + if (hlink == NULL) + return 0; + + d_hhash_link_key(hlink, &key); + + if (targs->type != d_hhash_key_type(key)) + return 0; + + return targs->cb(hlink, targs->arg); +} + +int +d_hhash_traverse(struct d_hhash *hhash, int type, d_hhash_traverse_cb_t cb, void *arg) +{ + struct traverse_args args; + + args.type = type; + args.cb = cb; + args.arg = arg; + + return d_hash_table_traverse(&hhash->ch_htable, d_hhash_cb, &args); +} + /****************************************************************************** * UUID Hash Table Wrapper * Key: UUID diff --git a/src/include/daos.h b/src/include/daos.h index 9bce025c7ab..0ac3eb86ada 100644 --- a/src/include/daos.h +++ b/src/include/daos.h @@ -46,6 +46,17 @@ daos_init(void); int daos_fini(void); +/** + * Reinitialize DAOS library after a fork call. + * For applications that initialize DAOS and then call fork without exec, some + * internal data structures must be reinitialized in the child process. + * It is recommended to call this function from a fork handler registered via + * pthread_atfork(). If any event queues were created prior to the fork call, + * those must be re-created in the child process. 
+ */ +int +daos_reinit(void); + #if defined(__cplusplus) } #endif diff --git a/src/include/daos/common.h b/src/include/daos/common.h index 6bad86f91b8..64ab66c04e4 100644 --- a/src/include/daos/common.h +++ b/src/include/daos/common.h @@ -959,6 +959,10 @@ bool daos_hhash_link_delete(struct d_hlink *hlink); #define daos_hhash_link_empty(hlink) d_hhash_link_empty(hlink) #define daos_hhash_link_key(hlink, key) d_hhash_link_key(hlink, key) +typedef int (*daos_hhash_traverse_cb_t)(struct d_hlink *link, void *arg); +int +daos_hhash_traverse(int type, daos_hhash_traverse_cb_t cb, void *arg); + /* daos_recx_t overlap detector */ #define DAOS_RECX_OVERLAP(recx_1, recx_2) \ (((recx_1).rx_idx < (recx_2).rx_idx + (recx_2).rx_nr) && \ diff --git a/src/include/daos/container.h b/src/include/daos/container.h index 57ef06f63a2..780b2008e8a 100644 --- a/src/include/daos/container.h +++ b/src/include/daos/container.h @@ -149,4 +149,6 @@ dc_cont_open_flags_valid(uint64_t flags) return true; } +int +dc_cont_mark_all_slave(void); #endif /* __DD_CONT_H__ */ diff --git a/src/include/daos/pool.h b/src/include/daos/pool.h index 9cc482d6c85..e51758e16b1 100644 --- a/src/include/daos/pool.h +++ b/src/include/daos/pool.h @@ -198,4 +198,7 @@ int dc_pool_create_map_refresh_task(daos_handle_t pool_hdl, uint32_t map_version tse_sched_t *sched, tse_task_t **task); void dc_pool_abandon_map_refresh_task(tse_task_t *task); +int +dc_pool_mark_all_slave(void); + #endif /* __DD_POOL_H__ */ diff --git a/src/include/gurt/hash.h b/src/include/gurt/hash.h index fafdd5757e9..c578d3a0a34 100644 --- a/src/include/gurt/hash.h +++ b/src/include/gurt/hash.h @@ -580,6 +580,9 @@ int d_hhash_key_type(uint64_t key); bool d_hhash_key_isptr(uint64_t key); int d_hhash_set_ptrtype(struct d_hhash *hhash); bool d_hhash_is_ptrtype(struct d_hhash *hhash); +typedef int (*d_hhash_traverse_cb_t)(struct d_hlink *link, void *arg); +int +d_hhash_traverse(struct d_hhash *hhash, int type, d_hhash_traverse_cb_t cb, void *arg); 
/****************************************************************************** * UUID Hash Table Wrapper diff --git a/src/pool/cli.c b/src/pool/cli.c index 85fa718aa1c..6825f57568f 100644 --- a/src/pool/cli.c +++ b/src/pool/cli.c @@ -3662,3 +3662,20 @@ dc_pool_tgt_idx2ptr(struct dc_pool *pool, uint32_t tgt_idx, } return 0; } + +static int +pool_mark_slave(struct d_hlink *link, void *arg) +{ + struct dc_pool *pool; + + pool = container_of(link, struct dc_pool, dp_hlink); + pool->dp_slave = 1; + + return 0; +} + +int +dc_pool_mark_all_slave(void) +{ + return daos_hhash_traverse(DAOS_HTYPE_POOL, pool_mark_slave, NULL); +}