diff --git a/src/client/api/init.c b/src/client/api/init.c
index 831b6c0c482..26fb68ea2d0 100644
--- a/src/client/api/init.c
+++ b/src/client/api/init.c
@@ -361,3 +361,36 @@ daos_fini(void)
 	D_MUTEX_UNLOCK(&module_lock);
 	return rc;
 }
+
+/**
+ * Re-initialize the DAOS library in the child process after fork.
+ */
+int
+daos_reinit(void)
+{
+	int rc;
+
+	/**
+	 * Reset the DTI state unconditionally so that a failure of a later step does not
+	 * leave it untouched in the child process.
+	 */
+	daos_dti_reset();
+
+	rc = daos_eq_lib_reset_after_fork();
+	if (rc)
+		return rc;
+
+	/**
+	 * Mark all pool and container handles owned by the parent process as if they were created
+	 * in the child process with g2l to avoid confusing the DAOS engines.
+	 */
+	rc = dc_pool_mark_all_slave();
+	if (rc)
+		return rc;
+
+	rc = dc_cont_mark_all_slave();
+	if (rc)
+		return rc;
+
+	return 0;
+}
diff --git a/src/client/dfuse/il/int_posix.c b/src/client/dfuse/il/int_posix.c
index 3609adcb5fc..0ddc4ffee25 100644
--- a/src/client/dfuse/il/int_posix.c
+++ b/src/client/dfuse/il/int_posix.c
@@ -818,16 +818,20 @@ child_hdlr(void)
 {
 	int rc;
 
-	rc = daos_eq_lib_reset_after_fork();
+	rc = daos_reinit();
 	if (rc)
-		DL_WARN(rc, "daos_eq_lib_init() failed in child process");
-	daos_dti_reset();
+		DL_WARN(rc, "daos_reinit() failed in child process");
+
+	/** Reset event queue */
 	ioil_eqh = ioil_iog.iog_main_eqh = DAOS_HDL_INVAL;
-	rc = daos_eq_create(&ioil_eqh);
-	if (rc)
-		DFUSE_LOG_WARNING("daos_eq_create() failed: "DF_RC, DP_RC(rc));
-	else
-		ioil_iog.iog_main_eqh = ioil_eqh;
+
+	if (ioil_iog.iog_eq_count_max) {
+		rc = daos_eq_create(&ioil_eqh);
+		if (rc)
+			DFUSE_LOG_WARNING("daos_eq_create() failed: " DF_RC, DP_RC(rc));
+		else
+			ioil_iog.iog_main_eqh = ioil_eqh;
+	}
 }
 
 /* Returns true on success */
@@ -876,10 +880,14 @@ check_ioctl_on_open(int fd, struct fd_entry *entry, int flags)
 				D_GOTO(err, rc = daos_der2errno(rc));
 			}
 			ioil_iog.iog_main_eqh = ioil_eqh;
-
-			rc = pthread_atfork(NULL, NULL, &child_hdlr);
-			D_ASSERT(rc == 0);
 		}
+
+		rc = pthread_atfork(NULL, NULL, &child_hdlr);
+		/* pthread_atfork() returns an errno value, convert it before printing with DF_RC */
+		if (rc)
+			DFUSE_LOG_WARNING("Failed to install atfork handler: " DF_RC,
+					  DP_RC(daos_errno2der(rc)));
+		rc = 0;
 	}
 
 	d_list_for_each_entry(pool, &ioil_iog.iog_pools_head, iop_pools) {
diff --git a/src/client/dfuse/pil4dfs/int_dfs.c b/src/client/dfuse/pil4dfs/int_dfs.c
index 0ff8d0d8f19..c23c2ef25ca 100644
--- a/src/client/dfuse/pil4dfs/int_dfs.c
+++ b/src/client/dfuse/pil4dfs/int_dfs.c
@@ -939,10 +939,9 @@ child_hdlr(void)
 	if (atomic_load_relaxed(&d_daos_inited) == false)
 		return;
 
-	rc = daos_eq_lib_reset_after_fork();
+	rc = daos_reinit();
 	if (rc)
-		DL_WARN(rc, "daos_eq_lib_init() failed in child process");
-	daos_dti_reset();
+		DL_WARN(rc, "daos_reinit() failed in child process");
 	td_eqh = main_eqh = DAOS_HDL_INVAL;
 	context_reset = true;
 }
diff --git a/src/client/pydaos/pydaos_shim.c b/src/client/pydaos/pydaos_shim.c
index 518cb10218e..ea85e900d4e 100644
--- a/src/client/pydaos/pydaos_shim.c
+++ b/src/client/pydaos/pydaos_shim.c
@@ -63,30 +63,53 @@ struct open_handle {
 		} \
 	} while (0)
 
-static daos_handle_t glob_eq;
-static bool use_glob_eq;
+/** Global event queue */
+static daos_handle_t eq;
 
 /**
  * Implementations of baseline shim functions
  */
 
+/** Fork handler run in the child: re-initialize DAOS and re-create the global event queue */
+static void
+child_handler(void)
+{
+	int rc;
+
+	rc = daos_reinit();
+	if (rc)
+		DL_WARN(rc, "daos_reinit() failed in child process");
+
+	eq = DAOS_HDL_INVAL;
+	rc = daos_eq_create(&eq);
+	if (rc)
+		DL_ERROR(rc, "Failed to re-create global eq");
+}
+
 static PyObject *
 __shim_handle__daos_init(PyObject *self, PyObject *args)
 {
 	int rc;
 
 	rc = daos_init();
-	if ((rc == 0) && (use_glob_eq == 0)) {
-		d_getenv_bool("PYDAOS_GLOB_EQ", &use_glob_eq);
-		if (use_glob_eq) {
-			int ret;
-
-			ret = daos_eq_create(&glob_eq);
-			if (ret) {
-				DL_ERROR(ret, "Failed to create global eq");
-				use_glob_eq = false;
-			}
-		}
+	if (rc)
+		return PyLong_FromLong(rc);
+
+	rc = daos_eq_create(&eq);
+	if (rc) {
+		DL_ERROR(rc, "Failed to create global eq");
+		daos_fini();
+		return PyLong_FromLong(rc);
+	}
+
+	rc = pthread_atfork(NULL, NULL, &child_handler);
+	if (rc) {
+		/** pthread_atfork() reports an errno value; POSIX only defines ENOMEM */
+		rc = -DER_NOMEM;
+		DL_ERROR(rc, "Failed to set atfork handler");
+		daos_eq_destroy(eq, DAOS_EQ_DESTROY_FORCE);
+		daos_fini();
+		return PyLong_FromLong(rc);
 	}
 
 	return PyLong_FromLong(rc);
@@ -97,12 +120,9 @@ __shim_handle__daos_fini(PyObject *self, PyObject *args)
 {
 	int rc;
 
-	if (use_glob_eq) {
-		rc = daos_eq_destroy(glob_eq, DAOS_EQ_DESTROY_FORCE);
-		if (rc)
-			D_ERROR("Failed to destroy global eq, "DF_RC"\n", DP_RC(rc));
-		use_glob_eq = false;
-	}
+	rc = daos_eq_destroy(eq, DAOS_EQ_DESTROY_FORCE);
+	if (rc)
+		D_ERROR("Failed to destroy global eq, " DF_RC "\n", DP_RC(rc));
 
 	rc = daos_fini();
 
@@ -914,8 +934,7 @@ __shim_handle__kv_get(PyObject *self, PyObject *args)
 	PyObject	*daos_dict;
 	daos_handle_t	 oh;
 	PyObject	*key;
-	Py_ssize_t	 pos = 0;
-	daos_handle_t	 eq;
+	Py_ssize_t       pos = 0;
 	struct kv_op	*kv_array = NULL;
 	struct kv_op	*op;
 	daos_event_t	*evp;
@@ -928,14 +947,6 @@ __shim_handle__kv_get(PyObject *self, PyObject *args)
 	RETURN_NULL_IF_FAILED_TO_PARSE(args, "LO!l", &oh.cookie, &PyDict_Type,
 				       &daos_dict, &v_size);
 
-	if (!use_glob_eq) {
-		rc = daos_eq_create(&eq);
-		if (rc)
-			return PyLong_FromLong(rc);
-	} else {
-		eq = glob_eq;
-	}
-
 	D_ALLOC_ARRAY(kv_array, MAX_INFLIGHT);
 	if (kv_array == NULL) {
 		rc = -DER_NOMEM;
@@ -1079,19 +1090,10 @@ __shim_handle__kv_get(PyObject *self, PyObject *args)
 out:
 	D_FREE(kv_array);
 
-	/** destroy event queue */
-	if (!use_glob_eq) {
-		ret = daos_eq_destroy(eq, DAOS_EQ_DESTROY_FORCE);
-		if (rc == DER_SUCCESS && ret < 0)
-			rc = ret;
-	}
-
 	/* Populate return list */
 	return PyLong_FromLong(rc);
 
 err:
-	if (!use_glob_eq)
-		daos_eq_destroy(eq, DAOS_EQ_DESTROY_FORCE);
 	D_FREE(kv_array);
 
 	return NULL;
@@ -1104,8 +1106,7 @@ __shim_handle__kv_put(PyObject *self, PyObject *args)
 	daos_handle_t	 oh;
 	PyObject	*key;
 	PyObject	*value;
-	Py_ssize_t	 pos = 0;
-	daos_handle_t	 eq;
+	Py_ssize_t       pos = 0;
 	daos_event_t	 ev_array[MAX_INFLIGHT];
 	daos_event_t	*evp;
 	int		 i = 0;
@@ -1116,14 +1117,6 @@ __shim_handle__kv_put(PyObject *self, PyObject *args)
 	RETURN_NULL_IF_FAILED_TO_PARSE(args, "LO!", &oh.cookie, &PyDict_Type,
 				       &daos_dict);
 
-	if (!use_glob_eq) {
-		rc = daos_eq_create(&eq);
-		if (rc)
-			return PyLong_FromLong(rc);
-	} else {
-		eq = glob_eq;
-	}
-
 	while (PyDict_Next(daos_dict, &pos, &key, &value)) {
 		char		*buf;
 		daos_size_t	 size;
@@ -1203,17 +1196,8 @@ __shim_handle__kv_put(PyObject *self, PyObject *args)
 	if (rc == DER_SUCCESS && ret < 0)
 		rc = ret;
 
-	/** destroy event queue */
-	if (!use_glob_eq) {
-		ret = daos_eq_destroy(eq, 0);
-		if (rc == DER_SUCCESS && ret < 0)
-			rc = ret;
-	}
-
 	return PyLong_FromLong(rc);
 err:
-	if (!use_glob_eq)
-		daos_eq_destroy(eq, 0);
 	return NULL;
 }
 
diff --git a/src/common/misc.c b/src/common/misc.c
index d171d9ea5f4..816b441a59d 100644
--- a/src/common/misc.c
+++ b/src/common/misc.c
@@ -630,6 +630,13 @@ daos_hhash_link_delete(struct d_hlink *hlink)
 	return d_hhash_link_delete(daos_ht.dht_hhash, hlink);
 }
 
+/** Traverse the daos_ht handle hash table via d_hhash_traverse() */
+int
+daos_hhash_traverse(int type, daos_hhash_traverse_cb_t cb, void *arg)
+{
+	return d_hhash_traverse(daos_ht.dht_hhash, type, cb, arg);
+}
+
 /**
  * a helper to get the needed crt_init_opt.
  *
diff --git a/src/container/cli.c b/src/container/cli.c
index cd43667a2a4..1b6ff5f2236 100644
--- a/src/container/cli.c
+++ b/src/container/cli.c
@@ -3524,3 +3524,22 @@ dc_cont_hdl2props(daos_handle_t coh)
 
 	return result;
 }
+
+/** Handle hash traversal callback: set dc_slave on one container handle */
+static int
+cont_mark_slave(struct d_hlink *link, void *arg)
+{
+	struct dc_cont *cont;
+
+	cont = container_of(link, struct dc_cont, dc_hlink);
+	cont->dc_slave = 1;
+
+	return 0;
+}
+
+/** Set dc_slave on the DAOS_HTYPE_CO handles visited in the handle hash table */
+int
+dc_cont_mark_all_slave(void)
+{
+	return daos_hhash_traverse(DAOS_HTYPE_CO, cont_mark_slave, NULL);
+}
diff --git a/src/gurt/hash.c b/src/gurt/hash.c
index f6b5128c151..29a68622034 100644
--- a/src/gurt/hash.c
+++ b/src/gurt/hash.c
@@ -1344,6 +1344,45 @@ d_hhash_key_type(uint64_t key)
 	return d_hhash_key_isptr(key) ? D_HTYPE_PTR : key & D_HTYPE_MASK;
 }
 
+/** Arguments handed to d_hhash_cb() by d_hhash_traverse() */
+struct traverse_args {
+	int			type;
+	d_hhash_traverse_cb_t	cb;
+	void			*arg;
+};
+
+/** Invoke the user callback on \a link if its handle key has the requested type */
+static int
+d_hhash_cb(d_list_t *link, void *args)
+{
+	struct traverse_args	*targs = args;
+	struct d_hlink		*hlink = link2hlink(link);
+	uint64_t		key;
+
+	if (hlink == NULL)
+		return 0;
+
+	d_hhash_link_key(hlink, &key);
+
+	if (targs->type != d_hhash_key_type(key))
+		return 0;
+
+	return targs->cb(hlink, targs->arg);
+}
+
+/** Traverse \a hhash and call \a cb on each visited link whose key type equals \a type */
+int
+d_hhash_traverse(struct d_hhash *hhash, int type, d_hhash_traverse_cb_t cb, void *arg)
+{
+	struct traverse_args args;
+
+	args.type = type;
+	args.cb   = cb;
+	args.arg  = arg;
+
+	return d_hash_table_traverse(&hhash->ch_htable, d_hhash_cb, &args);
+}
+
 /******************************************************************************
  * UUID Hash Table Wrapper
  * Key: UUID
diff --git a/src/include/daos.h b/src/include/daos.h
index 9bce025c7ab..0ac3eb86ada 100644
--- a/src/include/daos.h
+++ b/src/include/daos.h
@@ -46,6 +46,19 @@ daos_init(void);
 int
 daos_fini(void);
 
+/**
+ * Reinitialize DAOS library after a fork call.
+ * For applications that initialize DAOS and then call fork without exec, some
+ * internal data structures must be reinitialized in the child process.
+ * It is recommended to call this function from a fork handler registered via
+ * pthread_atfork(). If any event queues were created prior to the fork call,
+ * those must be re-created in the child process.
+ *
+ * \return		0 on success, non-zero error code on failure.
+ */
+int
+daos_reinit(void);
+
 #if defined(__cplusplus)
 }
 #endif
diff --git a/src/include/daos/common.h b/src/include/daos/common.h
index 6bad86f91b8..64ab66c04e4 100644
--- a/src/include/daos/common.h
+++ b/src/include/daos/common.h
@@ -959,6 +959,10 @@ bool daos_hhash_link_delete(struct d_hlink *hlink);
 #define daos_hhash_link_empty(hlink)	d_hhash_link_empty(hlink)
 #define daos_hhash_link_key(hlink, key)	d_hhash_link_key(hlink, key)
 
+typedef int (*daos_hhash_traverse_cb_t)(struct d_hlink *link, void *arg);
+int
+daos_hhash_traverse(int type, daos_hhash_traverse_cb_t cb, void *arg);
+
 /* daos_recx_t overlap detector */
 #define DAOS_RECX_OVERLAP(recx_1, recx_2)					\
 	(((recx_1).rx_idx < (recx_2).rx_idx + (recx_2).rx_nr) &&		\
diff --git a/src/include/daos/container.h b/src/include/daos/container.h
index 57ef06f63a2..780b2008e8a 100644
--- a/src/include/daos/container.h
+++ b/src/include/daos/container.h
@@ -149,4 +149,6 @@ dc_cont_open_flags_valid(uint64_t flags)
 	return true;
 }
 
+int
+dc_cont_mark_all_slave(void);
 #endif /* __DD_CONT_H__ */
diff --git a/src/include/daos/pool.h b/src/include/daos/pool.h
index 9cc482d6c85..e51758e16b1 100644
--- a/src/include/daos/pool.h
+++ b/src/include/daos/pool.h
@@ -198,4 +198,7 @@ int dc_pool_create_map_refresh_task(daos_handle_t pool_hdl, uint32_t map_version
 				    tse_sched_t *sched, tse_task_t **task);
 void dc_pool_abandon_map_refresh_task(tse_task_t *task);
 
+int
+dc_pool_mark_all_slave(void);
+
 #endif /* __DD_POOL_H__ */
diff --git a/src/include/gurt/hash.h b/src/include/gurt/hash.h
index fafdd5757e9..c578d3a0a34 100644
--- a/src/include/gurt/hash.h
+++ b/src/include/gurt/hash.h
@@ -580,6 +580,9 @@ int d_hhash_key_type(uint64_t key);
 bool d_hhash_key_isptr(uint64_t key);
 int d_hhash_set_ptrtype(struct d_hhash *hhash);
 bool d_hhash_is_ptrtype(struct d_hhash *hhash);
+typedef int (*d_hhash_traverse_cb_t)(struct d_hlink *link, void *arg);
+int
+d_hhash_traverse(struct d_hhash *hhash, int type, d_hhash_traverse_cb_t cb, void *arg);
 
 /******************************************************************************
  * UUID Hash Table Wrapper
diff --git a/src/pool/cli.c b/src/pool/cli.c
index 85fa718aa1c..6825f57568f 100644
--- a/src/pool/cli.c
+++ b/src/pool/cli.c
@@ -3662,3 +3662,22 @@ dc_pool_tgt_idx2ptr(struct dc_pool *pool, uint32_t tgt_idx,
 	}
 	return 0;
 }
+
+/** Handle hash traversal callback: set dp_slave on one pool handle */
+static int
+pool_mark_slave(struct d_hlink *link, void *arg)
+{
+	struct dc_pool *pool;
+
+	pool = container_of(link, struct dc_pool, dp_hlink);
+	pool->dp_slave = 1;
+
+	return 0;
+}
+
+/** Set dp_slave on the DAOS_HTYPE_POOL handles visited in the handle hash table */
+int
+dc_pool_mark_all_slave(void)
+{
+	return daos_hhash_traverse(DAOS_HTYPE_POOL, pool_mark_slave, NULL);
+}