From 9ec16922d80606a9d93030360125b49ce3e6a9d4 Mon Sep 17 00:00:00 2001 From: Fan Yong Date: Thu, 17 Oct 2024 11:54:06 +0800 Subject: [PATCH] DAOS-16469 dtx: properly handle DTX partial commit When a DTX leader globally commits a DTX, it is possible that some DTX participant(s) cannot commit such DTX entry because of various issues, such as network or space trouble. In such a case, the DTX leader needs to keep the active DTX entry persistently for further commit/resync. But that does not mean the related modifications attached to such DTX entry on the leader target cannot be committed; instead, we can commit the related modifications while only keeping the DTX header. That is enough for the DTX leader to do further DTX commit/resync to handle the formerly failed DTX participant(s). The benefit is that VOS aggregation on the leader target will not be affected by remote DTX commit failure. The patch also reduces DTX related warnings to avoid log flooding. Allow-unstable-test: true Signed-off-by: Fan Yong --- src/dtx/dtx_coll.c | 2 +- src/dtx/dtx_common.c | 2 +- src/dtx/dtx_cos.c | 45 +++++-- src/dtx/dtx_internal.h | 2 +- src/dtx/dtx_rpc.c | 105 ++++++++++------ src/dtx/dtx_srv.c | 2 +- src/include/daos_srv/container.h | 3 + src/include/daos_srv/vos.h | 3 +- src/object/srv_ec_aggregate.c | 33 ++++- src/object/srv_obj.c | 22 ++-- src/utils/ddb/ddb_vos.c | 2 +- src/utils/ddb/tests/ddb_test_driver.c | 2 +- src/vos/tests/vts_dtx.c | 18 +-- src/vos/tests/vts_io.c | 2 +- src/vos/tests/vts_mvcc.c | 8 +- src/vos/tests/vts_pm.c | 39 +++--- src/vos/vos_common.c | 2 +- src/vos/vos_dtx.c | 169 ++++++++++++++++---------- src/vos/vos_internal.h | 6 +- src/vos/vos_io.c | 4 +- src/vos/vos_obj.c | 4 +- 21 files changed, 303 insertions(+), 172 deletions(-) diff --git a/src/dtx/dtx_coll.c b/src/dtx/dtx_coll.c index 7e7c81991551..b54abc11f6af 100644 --- a/src/dtx/dtx_coll.c +++ b/src/dtx/dtx_coll.c @@ -304,7 +304,7 @@ dtx_coll_local_one(void *args) switch (opc) { case DTX_COLL_COMMIT: - rc = 
vos_dtx_commit(cont->sc_hdl, &dcla->dcla_xid, 1, NULL); + rc = vos_dtx_commit(cont->sc_hdl, &dcla->dcla_xid, 1, false, NULL); break; case DTX_COLL_ABORT: rc = vos_dtx_abort(cont->sc_hdl, &dcla->dcla_xid, dcla->dcla_epoch); diff --git a/src/dtx/dtx_common.c b/src/dtx/dtx_common.c index ecb156729ed6..fdc35acf9be4 100644 --- a/src/dtx/dtx_common.c +++ b/src/dtx/dtx_common.c @@ -1594,7 +1594,7 @@ dtx_end(struct dtx_handle *dth, struct ds_cont_child *cont, int result) * and can be committed next time. */ rc = vos_dtx_commit(cont->sc_hdl, dth->dth_dti_cos, - dth->dth_dti_cos_count, NULL); + dth->dth_dti_cos_count, false, NULL); if (rc < 0) D_ERROR(DF_UUID": Fail to DTX CoS commit: %d\n", DP_UUID(cont->sc_uuid), rc); diff --git a/src/dtx/dtx_cos.c b/src/dtx/dtx_cos.c index 4c165f94d0c7..99caa4e4d630 100644 --- a/src/dtx/dtx_cos.c +++ b/src/dtx/dtx_cos.c @@ -360,6 +360,16 @@ dtx_cos_del_one(struct ds_cont_child *cont, struct dtx_cos_rec_child *dcrc) return rc; } +static void +dtx_cos_demote_one(struct ds_cont_child *cont, struct dtx_cos_rec_child *dcrc) +{ + d_list_del(&dcrc->dcrc_gl_committable); + if (dcrc->dcrc_coll) + d_list_add_tail(&dcrc->dcrc_gl_committable, &cont->sc_dtx_coll_list); + else + d_list_add_tail(&dcrc->dcrc_gl_committable, &cont->sc_dtx_cos_list); +} + int dtx_fetch_committable(struct ds_cont_child *cont, uint32_t max_cnt, daos_unit_oid_t *oid, daos_epoch_t epoch, bool force, @@ -622,7 +632,7 @@ dtx_cos_add(struct ds_cont_child *cont, void *entry, daos_unit_oid_t *oid, int dtx_cos_del(struct ds_cont_child *cont, struct dtx_id *xid, - daos_unit_oid_t *oid, uint64_t dkey_hash) + daos_unit_oid_t *oid, uint64_t dkey_hash, bool demote) { struct dtx_cos_key key; d_iov_t kiov; @@ -645,36 +655,41 @@ dtx_cos_del(struct ds_cont_child *cont, struct dtx_id *xid, d_list_for_each_entry(dcrc, &dcr->dcr_prio_list, dcrc_lo_link) { if (memcmp(&dcrc->dcrc_dte->dte_xid, xid, sizeof(*xid)) == 0) { - rc = dtx_cos_del_one(cont, dcrc); + if (demote) + dtx_cos_demote_one(cont, 
dcrc); + else + rc = dtx_cos_del_one(cont, dcrc); D_GOTO(out, found = 1); } } d_list_for_each_entry(dcrc, &dcr->dcr_reg_list, dcrc_lo_link) { if (memcmp(&dcrc->dcrc_dte->dte_xid, xid, sizeof(*xid)) == 0) { - rc = dtx_cos_del_one(cont, dcrc); + if (demote) + dtx_cos_demote_one(cont, dcrc); + else + rc = dtx_cos_del_one(cont, dcrc); D_GOTO(out, found = 2); } } d_list_for_each_entry(dcrc, &dcr->dcr_expcmt_list, dcrc_lo_link) { if (memcmp(&dcrc->dcrc_dte->dte_xid, xid, sizeof(*xid)) == 0) { - rc = dtx_cos_del_one(cont, dcrc); + if (demote) + dtx_cos_demote_one(cont, dcrc); + else + rc = dtx_cos_del_one(cont, dcrc); D_GOTO(out, found = 3); } } out: - if (found > 0) + if (found > 0 && !demote) d_tm_dec_gauge(dtx_tls_get()->dt_committable, 1); if (rc == 0 && found == 0) rc = -DER_NONEXIST; - DL_CDEBUG(rc != 0 && rc != -DER_NONEXIST, DLOG_ERR, DB_TRACE, rc, - "Remove DTX from CoS cache "DF_UOID", key %lu", - DP_UOID(*oid), (unsigned long)dkey_hash); - return rc == -DER_NONEXIST ? 0 : rc; } @@ -778,10 +793,14 @@ dtx_cos_batched_del(struct ds_cont_child *cont, struct dtx_id xid[], bool rm[], if (memcmp(&dcrc->dcrc_dte->dte_xid, &xid[i], sizeof(struct dtx_id)) == 0) { found = true; - if (rm[i]) { - rc = dtx_cos_del_one(cont, dcrc); - if (rc == 0) - del++; + if (rm != NULL) { + if (rm[i]) { + rc = dtx_cos_del_one(cont, dcrc); + if (rc == 0) + del++; + } + } else { + dtx_cos_demote_one(cont, dcrc); } } } diff --git a/src/dtx/dtx_internal.h b/src/dtx/dtx_internal.h index 06d0333dd77e..3fbd06f6b925 100644 --- a/src/dtx/dtx_internal.h +++ b/src/dtx/dtx_internal.h @@ -259,7 +259,7 @@ int dtx_fetch_committable(struct ds_cont_child *cont, uint32_t max_cnt, int dtx_cos_add(struct ds_cont_child *cont, void *entry, daos_unit_oid_t *oid, uint64_t dkey_hash, daos_epoch_t epoch, uint32_t flags); int dtx_cos_del(struct ds_cont_child *cont, struct dtx_id *xid, - daos_unit_oid_t *oid, uint64_t dkey_hash); + daos_unit_oid_t *oid, uint64_t dkey_hash, bool demote); uint64_t dtx_cos_oldest(struct 
ds_cont_child *cont); void dtx_cos_prio(struct ds_cont_child *cont, struct dtx_id *xid, daos_unit_oid_t *oid, uint64_t dkey_hash); diff --git a/src/dtx/dtx_rpc.c b/src/dtx/dtx_rpc.c index 2ccbfec2734d..acce995faaf4 100644 --- a/src/dtx/dtx_rpc.c +++ b/src/dtx/dtx_rpc.c @@ -59,6 +59,7 @@ struct dtx_req_args { int dra_length; /* The collective RPC result. */ int dra_result; + uint32_t dra_local_fail:1; /* Pointer to the committed DTX list, used for DTX_REFRESH case. */ d_list_t *dra_cmt_list; /* Pointer to the aborted DTX list, used for DTX_REFRESH case. */ @@ -81,6 +82,7 @@ struct dtx_req_rec { int drr_count; /* DTX count */ int drr_result; /* The RPC result */ uint32_t drr_comp:1, + drr_local_fail:1, drr_single_dti:1; uint32_t drr_inline_flags; struct dtx_id *drr_dti; /* The DTX array */ @@ -290,10 +292,13 @@ dtx_req_send(struct dtx_req_rec *drr, daos_epoch_t epoch) "DTX req for opc %x to %d/%d (req %p future %p) sent epoch "DF_X64, dra->dra_opc, drr->drr_rank, drr->drr_tag, req, dra->dra_future, epoch); - if (rc != 0 && drr->drr_comp == 0) { - drr->drr_comp = 1; - drr->drr_result = rc; - ABT_future_set(dra->dra_future, drr); + if (rc != 0) { + drr->drr_local_fail = 1; + if (drr->drr_comp == 0) { + drr->drr_comp = 1; + drr->drr_result = rc; + ABT_future_set(dra->dra_future, drr); + } } return rc; @@ -309,6 +314,8 @@ dtx_req_list_cb(void **args) if (dra->dra_opc == DTX_CHECK) { for (i = 0; i < dra->dra_length; i++) { drr = args[i]; + if (drr->drr_local_fail) + dra->dra_local_fail = 1; dtx_merge_check_result(&dra->dra_result, drr->drr_result); D_DEBUG(DB_TRACE, "The DTX "DF_DTI" RPC req result %d, status is %d.\n", DP_DTI(&drr->drr_dti[0]), drr->drr_result, dra->dra_result); @@ -316,6 +323,8 @@ dtx_req_list_cb(void **args) } else { for (i = 0; i < dra->dra_length; i++) { drr = args[i]; + if (drr->drr_local_fail) + dra->dra_local_fail = 1; if (dra->dra_result == 0 || dra->dra_result == -DER_NONEXIST) dra->dra_result = drr->drr_result; } @@ -382,7 +391,12 @@ 
dtx_req_list_send(struct dtx_common_args *dca, bool is_reentrance) if (rc != ABT_SUCCESS) { D_ERROR("ABT_future_create failed for opc %x, len %d: rc %d.\n", dra->dra_opc, dca->dca_steps, rc); - return dss_abterr2der(rc); + dca->dca_dra.dra_local_fail = 1; + if (dra->dra_opc == DTX_CHECK) + dtx_merge_check_result(&dra->dra_result, dss_abterr2der(rc)); + else if (dra->dra_result == 0 || dra->dra_result == -DER_NONEXIST) + dra->dra_result = dss_abterr2der(rc); + return DSS_CHORE_DONE; } D_DEBUG(DB_TRACE, "%p: DTX req for opc %x, future %p (%d) start.\n", @@ -750,7 +764,12 @@ dtx_rpc(struct ds_cont_child *cont,d_list_t *dti_list, struct dtx_entry **dtes, switch (opc) { case DTX_COMMIT: case DTX_ABORT: - if (rc != -DER_EXCLUDED && rc != -DER_OOG) + /* + * Continue to send out more RPCs as long as there is no local failure, + * then other healthy participants can commit/abort related DTX entries + * without being affected by the bad one(s). + */ + if (dca->dca_dra.dra_local_fail) goto out; break; case DTX_CHECK: @@ -826,17 +845,8 @@ dtx_commit(struct ds_cont_child *cont, struct dtx_entry **dtes, if (rc > 0 || rc == -DER_NONEXIST || rc == -DER_EXCLUDED || rc == -DER_OOG) rc = 0; - if (rc != 0) { - /* - * Some DTX entries may have been committed on some participants. Then mark all - * the DTX entries (in the dtis) as "PARTIAL_COMMITTED" and re-commit them later. - * It is harmless to re-commit the DTX that has ever been committed. - */ - if (dra->dra_committed > 0) - rc1 = vos_dtx_set_flags(cont->sc_hdl, dca.dca_dtis, count, - DTE_PARTIAL_COMMITTED); - } else { - if (has_cos) { + if (rc == 0 || dra->dra_committed > 0) { + if (rc == 0 && has_cos) { if (count > 1) { D_ALLOC_ARRAY(rm_cos, count); if (rm_cos == NULL) @@ -846,7 +856,12 @@ dtx_commit(struct ds_cont_child *cont, struct dtx_entry **dtes, } } - rc1 = vos_dtx_commit(cont->sc_hdl, dca.dca_dtis, count, rm_cos); + /* + * Some DTX entries may have been committed on some participants. 
Then mark all + * the DTX entries (in the dtis) as "PARTIAL_COMMITTED" and re-commit them later. + * It is harmless to re-commit the DTX that has ever been committed. + */ + rc1 = vos_dtx_commit(cont->sc_hdl, dca.dca_dtis, count, rc != 0, rm_cos); if (rc1 > 0) { dra->dra_committed += rc1; rc1 = 0; @@ -855,13 +870,28 @@ dtx_commit(struct ds_cont_child *cont, struct dtx_entry **dtes, rc1 = 0; } - if (rc1 == 0 && rm_cos != NULL) { + /* + * For partial commit case, move related DTX entries to the tail of the + * committable list, then the next batched commit can commit others and + * retry those partial committed sometime later instead of blocking the + * others committable with continuously retry the failed ones. + * + * The side-effect of such behavior is that the DTX which is committable + * earlier maybe delay committed than the later ones. + */ + if (rc1 == 0 && has_cos) { if (dcks != NULL) { - for (i = 0; i < count; i++) { - if (rm_cos[i]) { - D_ASSERT(!daos_oid_is_null(dcks[i].oid.id_pub)); + if (rm_cos != NULL) { + for (i = 0; i < count; i++) { + if (!rm_cos[i]) + continue; dtx_cos_del(cont, &dca.dca_dtis[i], &dcks[i].oid, - dcks[i].dkey_hash); + dcks[i].dkey_hash, false); + } + } else { + for (i = 0; i < count; i++) { + dtx_cos_del(cont, &dca.dca_dtis[i], &dcks[i].oid, + dcks[i].dkey_hash, true); } } } else { @@ -1141,7 +1171,7 @@ dtx_refresh_internal(struct ds_cont_child *cont, int *check_count, d_list_t *che * It has been committed/committable on leader, we may miss * related DTX commit request, so let's commit it locally. 
*/ - rc1 = vos_dtx_commit(cont->sc_hdl, &dsp->dsp_xid, 1, NULL); + rc1 = vos_dtx_commit(cont->sc_hdl, &dsp->dsp_xid, 1, false, NULL); if (rc1 == 0 || rc1 == -DER_NONEXIST || !for_io /* cleanup case */) { d_list_del(&dsp->dsp_link); dtx_dsp_free(dsp); @@ -1636,24 +1666,29 @@ dtx_coll_commit(struct ds_cont_child *cont, struct dtx_coll_entry *dce, struct d committed += dcra.dcra_committed; } - if (rc == 0 && rc1 == 0) - rc2 = vos_dtx_commit(cont->sc_hdl, &dce->dce_xid, 1, NULL); - else if (committed > 0) + if ((rc == 0 && rc1 == 0) || committed > 0) { /* Mark the DTX as "PARTIAL_COMMITTED" and re-commit it later via cleanup logic. */ - rc2 = vos_dtx_set_flags(cont->sc_hdl, &dce->dce_xid, 1, DTE_PARTIAL_COMMITTED); - if (rc2 > 0 || rc2 == -DER_NONEXIST) - rc2 = 0; + rc2 = vos_dtx_commit(cont->sc_hdl, &dce->dce_xid, 1, rc != 0 || rc1 != 0, NULL); + if (rc2 > 0 || rc2 == -DER_NONEXIST) + rc2 = 0; + } /* - * NOTE: Currently, we commit collective DTX one by one with high priority. So here we have - * to remove the collective DTX entry from the CoS even if the commit failed remotely. - * Otherwise, the batched commit ULT may be blocked by such "bad" entry. + * For partial commit case, move related DTX entries to the tail of the + * committable list, then the next batched commit can commit others and + * retry those partial committed sometime later instead of blocking the + * others committable with continuously retry the failed ones. + * + * The side-effect of such behavior is that the DTX which is committable + * earlier maybe delay committed than the later ones. */ if (rc2 == 0 && has_cos) { if (dck != NULL) - dtx_cos_del(cont, &dce->dce_xid, &dck->oid, dck->dkey_hash); + dtx_cos_del(cont, &dce->dce_xid, &dck->oid, dck->dkey_hash, + rc != 0 || rc1 != 0); else - dtx_cos_batched_del(cont, &dce->dce_xid, &cos, 1); + dtx_cos_batched_del(cont, &dce->dce_xid, + rc != 0 || rc1 != 0 ? 
NULL : &cos, 1); } D_CDEBUG(rc != 0 || rc1 != 0 || rc2 != 0, DLOG_ERR, DB_TRACE, diff --git a/src/dtx/dtx_srv.c b/src/dtx/dtx_srv.c index 41480b6e8b06..f736c59dece8 100644 --- a/src/dtx/dtx_srv.c +++ b/src/dtx/dtx_srv.c @@ -196,7 +196,7 @@ dtx_handler(crt_rpc_t *rpc) count = din->di_dtx_array.ca_count - i; dtis = (struct dtx_id *)din->di_dtx_array.ca_arrays + i; - rc1 = vos_dtx_commit(cont->sc_hdl, dtis, count, NULL); + rc1 = vos_dtx_commit(cont->sc_hdl, dtis, count, false, NULL); if (rc1 > 0) committed += rc1; else if (rc == 0 && rc1 < 0) diff --git a/src/include/daos_srv/container.h b/src/include/daos_srv/container.h index f99f4df14e39..9fc615c2a8b7 100644 --- a/src/include/daos_srv/container.h +++ b/src/include/daos_srv/container.h @@ -108,6 +108,9 @@ struct ds_cont_child { uint32_t sc_dtx_committable_count; uint32_t sc_dtx_committable_coll_count; + /* Last timestamp when EC aggregation reports -DER_INPROGRESS. */ + uint64_t sc_ec_agg_busy_ts; + /* The global minimum EC aggregation epoch, which will be upper * limit for VOS aggregation, i.e. EC object VOS aggregation can * not cross this limit. For simplification purpose, all objects diff --git a/src/include/daos_srv/vos.h b/src/include/daos_srv/vos.h index 3b838c2b4a60..47848cea167f 100644 --- a/src/include/daos_srv/vos.h +++ b/src/include/daos_srv/vos.h @@ -118,13 +118,14 @@ vos_dtx_load_mbs(daos_handle_t coh, struct dtx_id *dti, daos_unit_oid_t *oid, * \param coh [IN] Container open handle. * \param dtis [IN] The array for DTX identifiers to be committed. * \param count [IN] The count of DTXs to be committed. + * \param keep_act [IN] Keep DTX entry or not. * \param rm_cos [OUT] The array for whether remove entry from CoS cache. * * \return Negative value if error. * \return Others are for the count of committed DTXs. 
*/ int -vos_dtx_commit(daos_handle_t coh, struct dtx_id dtis[], int count, bool rm_cos[]); +vos_dtx_commit(daos_handle_t coh, struct dtx_id dtis[], int count, bool keep_act, bool rm_cos[]); /** * Abort the specified DTXs. diff --git a/src/object/srv_ec_aggregate.c b/src/object/srv_ec_aggregate.c index 71c630fa947a..50b513d16120 100644 --- a/src/object/srv_ec_aggregate.c +++ b/src/object/srv_ec_aggregate.c @@ -2667,8 +2667,13 @@ cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr, struct ec_agg_param *ec_agg_param = agg_param->ap_data; vos_iter_param_t iter_param = { 0 }; struct vos_iter_anchors anchors = { 0 }; + struct dtx_handle *dth = NULL; + struct dtx_share_peer *dsp; + struct dtx_id dti = { 0 }; + struct dtx_epoch epoch = { 0 }; + daos_unit_oid_t oid = { 0 }; + int blocks = 0; int rc = 0; - int blocks = 0; /* * Avoid calling into vos_aggregate() when aborting aggregation @@ -2715,8 +2720,32 @@ cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr, agg_reset_entry(&ec_agg_param->ap_agg_entry, NULL, NULL); retry: + epoch.oe_value = epr->epr_hi; + rc = dtx_begin(cont->sc_hdl, &dti, &epoch, 0, cont->sc_pool->spc_map_version, &oid, + NULL, 0, 0, NULL, &dth); + if (rc != 0) + goto update_hae; + rc = vos_iterate(&iter_param, VOS_ITER_OBJ, true, &anchors, agg_iterate_pre_cb, - agg_iterate_post_cb, ec_agg_param, NULL); + agg_iterate_post_cb, ec_agg_param, dth); + if (rc == -DER_INPROGRESS && !d_list_empty(&dth->dth_share_tbd_list)) { + uint64_t now = daos_gettime_coarse(); + + /* Report warning per each 10 seconds to avoid log flood. 
*/ + if (now - cont->sc_ec_agg_busy_ts > 10) { + while ((dsp = d_list_pop_entry(&dth->dth_share_tbd_list, + struct dtx_share_peer, dsp_link)) != NULL) { + D_WARN(DF_CONT ": EC aggregate hit non-committed DTX " DF_DTI "\n", + DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid), + DP_DTI(&dsp->dsp_xid)); + dtx_dsp_free(dsp); + } + + cont->sc_ec_agg_busy_ts = now; + } + } + + dtx_end(dth, cont, rc); /* Post_cb may not being executed in some cases */ agg_clear_extents(&ec_agg_param->ap_agg_entry); diff --git a/src/object/srv_obj.c b/src/object/srv_obj.c index 041ea903c4fe..ac95190224f7 100644 --- a/src/object/srv_obj.c +++ b/src/object/srv_obj.c @@ -2019,17 +2019,19 @@ obj_local_rw(crt_rpc_t *rpc, struct obj_io_context *ioc, struct dtx_handle *dth) again: rc = obj_local_rw_internal_wrap(rpc, ioc, dth); if (dth != NULL && obj_dtx_need_refresh(dth, rc)) { - if (unlikely(++retry % 10 == 3)) { + if (unlikely(++retry % 10 == 9)) { dsp = d_list_entry(dth->dth_share_tbd_list.next, struct dtx_share_peer, dsp_link); D_WARN("DTX refresh for "DF_DTI" because of "DF_DTI" (%d) for %d times, " - "maybe dead loop\n", DP_DTI(&dth->dth_xid), DP_DTI(&dsp->dsp_xid), + "maybe starve\n", DP_DTI(&dth->dth_xid), DP_DTI(&dsp->dsp_xid), dth->dth_share_tbd_count, retry); } - rc = dtx_refresh(dth, ioc->ioc_coc); - if (rc == -DER_AGAIN) - goto again; + if (!obj_rpc_is_fetch(rpc) || retry < 30) { + rc = dtx_refresh(dth, ioc->ioc_coc); + if (rc == -DER_AGAIN) + goto again; + } } return rc; @@ -2687,7 +2689,7 @@ ds_obj_tgt_update_handler(crt_rpc_t *rpc) if (orw->orw_dti_cos.ca_count > 0) { rc = vos_dtx_commit(ioc.ioc_vos_coh, orw->orw_dti_cos.ca_arrays, - orw->orw_dti_cos.ca_count, NULL); + orw->orw_dti_cos.ca_count, false, NULL); if (rc < 0) { D_WARN(DF_UOID ": Failed to DTX CoS commit " DF_RC "\n", DP_UOID(orw->orw_oid), DP_RC(rc)); @@ -3537,11 +3539,11 @@ obj_local_punch(struct obj_punch_in *opi, crt_opcode_t opc, uint32_t shard_nr, u } if (obj_dtx_need_refresh(dth, rc)) { - if (unlikely(++retry % 10 
== 3)) { + if (unlikely(++retry % 10 == 9)) { dsp = d_list_entry(dth->dth_share_tbd_list.next, struct dtx_share_peer, dsp_link); D_WARN("DTX refresh for "DF_DTI" because of "DF_DTI" (%d) for %d " - "times, maybe dead loop\n", DP_DTI(&dth->dth_xid), + "times, maybe starve\n", DP_DTI(&dth->dth_xid), DP_DTI(&dsp->dsp_xid), dth->dth_share_tbd_count, retry); } @@ -4884,11 +4886,11 @@ ds_cpd_handle_one_wrap(crt_rpc_t *rpc, struct daos_cpd_sub_head *dcsh, again: rc = ds_cpd_handle_one(rpc, dcsh, dcde, dcsrs, ioc, dth); if (obj_dtx_need_refresh(dth, rc)) { - if (unlikely(++retry % 10 == 3)) { + if (unlikely(++retry % 10 == 9)) { dsp = d_list_entry(dth->dth_share_tbd_list.next, struct dtx_share_peer, dsp_link); D_WARN("DTX refresh for "DF_DTI" because of "DF_DTI" (%d) for %d " - "times, maybe dead loop\n", DP_DTI(&dth->dth_xid), + "times, maybe starve\n", DP_DTI(&dth->dth_xid), DP_DTI(&dsp->dsp_xid), dth->dth_share_tbd_count, retry); } diff --git a/src/utils/ddb/ddb_vos.c b/src/utils/ddb/ddb_vos.c index b331e830fdd8..c766b76fc21a 100644 --- a/src/utils/ddb/ddb_vos.c +++ b/src/utils/ddb/ddb_vos.c @@ -1386,7 +1386,7 @@ dv_dtx_get_act_table(daos_handle_t coh, dv_dtx_act_handler handler_cb, void *han int dv_dtx_commit_active_entry(daos_handle_t coh, struct dtx_id *dti) { - return vos_dtx_commit(coh, dti, 1, NULL); + return vos_dtx_commit(coh, dti, 1, false, NULL); } int diff --git a/src/utils/ddb/tests/ddb_test_driver.c b/src/utils/ddb/tests/ddb_test_driver.c index 2f9c24f931fb..3f7769813703 100644 --- a/src/utils/ddb/tests/ddb_test_driver.c +++ b/src/utils/ddb/tests/ddb_test_driver.c @@ -511,7 +511,7 @@ dvt_vos_insert_dtx_records(daos_handle_t coh, uint32_t nr, uint32_t committed_nr /* commit */ for (i = 0; i < committed_nr; i++) - assert_int_equal(1, vos_dtx_commit(coh, &dth[i]->dth_xid, 1, NULL)); + assert_int_equal(1, vos_dtx_commit(coh, &dth[i]->dth_xid, 1, false, NULL)); /* end each dtx */ for (i = 0; i < nr; i++) diff --git a/src/vos/tests/vts_dtx.c 
b/src/vos/tests/vts_dtx.c index bd54dd52838c..d83d2356d150 100644 --- a/src/vos/tests/vts_dtx.c +++ b/src/vos/tests/vts_dtx.c @@ -227,7 +227,7 @@ vts_dtx_commit_visibility(struct io_test_args *args, bool ext, bool punch_obj) assert_memory_not_equal(update_buf, fetch_buf, UPDATE_BUF_SIZE); /* Commit the update DTX. */ - rc = vos_dtx_commit(args->ctx.tc_co_hdl, &xid, 1, NULL); + rc = vos_dtx_commit(args->ctx.tc_co_hdl, &xid, 1, false, NULL); assert_rc_equal(rc, 1); memset(fetch_buf, 0, UPDATE_BUF_SIZE); @@ -269,7 +269,7 @@ vts_dtx_commit_visibility(struct io_test_args *args, bool ext, bool punch_obj) assert_memory_equal(update_buf, fetch_buf, UPDATE_BUF_SIZE); /* Commit the punch DTX. */ - rc = vos_dtx_commit(args->ctx.tc_co_hdl, &xid, 1, NULL); + rc = vos_dtx_commit(args->ctx.tc_co_hdl, &xid, 1, false, NULL); assert_rc_equal(rc, 1); memset(fetch_buf, 0, UPDATE_BUF_SIZE); @@ -471,11 +471,11 @@ dtx_14(void **state) vts_dtx_end(dth); /* Commit the DTX. */ - rc = vos_dtx_commit(args->ctx.tc_co_hdl, &xid, 1, NULL); + rc = vos_dtx_commit(args->ctx.tc_co_hdl, &xid, 1, false, NULL); assert_rc_equal(rc, 1); /* Double commit the DTX is harmless. */ - rc = vos_dtx_commit(args->ctx.tc_co_hdl, &xid, 1, NULL); + rc = vos_dtx_commit(args->ctx.tc_co_hdl, &xid, 1, false, NULL); assert(rc >= 0); memset(fetch_buf, 0, UPDATE_BUF_SIZE); @@ -570,7 +570,7 @@ dtx_15(void **state) assert_memory_equal(update_buf1, fetch_buf, UPDATE_BUF_SIZE); /* Aborted DTX cannot be committed. */ - rc = vos_dtx_commit(args->ctx.tc_co_hdl, &xid, 1, NULL); + rc = vos_dtx_commit(args->ctx.tc_co_hdl, &xid, 1, false, NULL); assert(rc >= 0); memset(fetch_buf, 0, UPDATE_BUF_SIZE); @@ -649,7 +649,7 @@ dtx_16(void **state) assert_memory_equal(update_buf, fetch_buf, UPDATE_BUF_SIZE); /* Commit the DTX. 
*/ - rc = vos_dtx_commit(args->ctx.tc_co_hdl, &dth->dth_xid, 1, NULL); + rc = vos_dtx_commit(args->ctx.tc_co_hdl, &dth->dth_xid, 1, false, NULL); assert_rc_equal(rc, 1); vts_dtx_end(dth); @@ -740,7 +740,7 @@ dtx_17(void **state) } /* Commit the first 4 DTXs. */ - rc = vos_dtx_commit(args->ctx.tc_co_hdl, xid, 4, NULL); + rc = vos_dtx_commit(args->ctx.tc_co_hdl, xid, 4, false, NULL); assert_rc_equal(rc, 4); param.ip_hdl = args->ctx.tc_co_hdl; @@ -767,7 +767,7 @@ dtx_17(void **state) } /* Commit the others. */ - rc = vos_dtx_commit(args->ctx.tc_co_hdl, &xid[4], 6, NULL); + rc = vos_dtx_commit(args->ctx.tc_co_hdl, &xid[4], 6, false, NULL); assert_rc_equal(rc, 6); memset(&anchors, 0, sizeof(anchors)); @@ -827,7 +827,7 @@ dtx_18(void **state) } /* Commit all DTXs. */ - rc = vos_dtx_commit(args->ctx.tc_co_hdl, xid, 10, NULL); + rc = vos_dtx_commit(args->ctx.tc_co_hdl, xid, 10, false, NULL); assert_rc_equal(rc, 10); for (i = 0; i < 10; i++) { diff --git a/src/vos/tests/vts_io.c b/src/vos/tests/vts_io.c index 2f084a2d99d8..affcf6242bea 100644 --- a/src/vos/tests/vts_io.c +++ b/src/vos/tests/vts_io.c @@ -2820,7 +2820,7 @@ io_query_key(void **state) xid = dth->dth_xid; vts_dtx_end(dth); - rc = vos_dtx_commit(arg->ctx.tc_co_hdl, &xid, 1, NULL); + rc = vos_dtx_commit(arg->ctx.tc_co_hdl, &xid, 1, false, NULL); assert_rc_equal(rc, 1); rc = vos_obj_query_key(arg->ctx.tc_co_hdl, oid, DAOS_GET_DKEY | diff --git a/src/vos/tests/vts_mvcc.c b/src/vos/tests/vts_mvcc.c index 6c625d2051a9..907b6957cf13 100644 --- a/src/vos/tests/vts_mvcc.c +++ b/src/vos/tests/vts_mvcc.c @@ -249,7 +249,7 @@ stop_tx(daos_handle_t coh, struct tx_helper *txh, bool success, bool write) vts_dtx_end(dth); if (txh->th_nr_mods != 0) { if (success && !txh->th_skip_commit) { - err = vos_dtx_commit(coh, &xid, 1, NULL); + err = vos_dtx_commit(coh, &xid, 1, false, NULL); assert(err >= 0); } else { if (!success) @@ -1297,7 +1297,7 @@ conflicting_rw_exec_one(struct io_test_args *arg, int i, int j, bool empty, if 
(!daos_is_zero_dti(&txh1.th_saved_xid)) { if (txh1.th_skip_commit) { rc = vos_dtx_commit(arg->ctx.tc_co_hdl, - &txh1.th_saved_xid, 1, NULL); + &txh1.th_saved_xid, 1, false, NULL); assert(rc >= 0); } if (expect_inprogress) { @@ -1563,7 +1563,7 @@ uncertainty_check_exec_one(struct io_test_args *arg, int i, int j, bool empty, if (!daos_is_zero_dti(&wtx->th_saved_xid)) { if (wtx->th_skip_commit) { rc = vos_dtx_commit(arg->ctx.tc_co_hdl, - &wtx->th_saved_xid, 1, NULL); + &wtx->th_saved_xid, 1, false, NULL); assert(rc >= 0); } } @@ -1571,7 +1571,7 @@ uncertainty_check_exec_one(struct io_test_args *arg, int i, int j, bool empty, if (!daos_is_zero_dti(&atx->th_saved_xid)) { if (atx->th_skip_commit) { rc = vos_dtx_commit(arg->ctx.tc_co_hdl, - &atx->th_saved_xid, 1, NULL); + &atx->th_saved_xid, 1, false, NULL); assert(rc >= 0); } } diff --git a/src/vos/tests/vts_pm.c b/src/vos/tests/vts_pm.c index 99e53fc71b68..7df7b39da270 100644 --- a/src/vos/tests/vts_pm.c +++ b/src/vos/tests/vts_pm.c @@ -1068,7 +1068,7 @@ obj_punch_op(void **state, daos_handle_t coh, daos_unit_oid_t oid, assert_rc_equal(rc, 0); - rc = vos_dtx_commit(coh, &xid, 1, NULL); + rc = vos_dtx_commit(coh, &xid, 1, false, NULL); assert_rc_equal(rc, 1); } @@ -1095,7 +1095,7 @@ cond_dkey_punch_op(void **state, daos_handle_t coh, daos_unit_oid_t oid, assert_rc_equal(rc, expected_rc); if (expected_rc == 0) { - rc = vos_dtx_commit(coh, &xid, 1, NULL); + rc = vos_dtx_commit(coh, &xid, 1, false, NULL); assert_rc_equal(rc, 1); } } @@ -1128,7 +1128,7 @@ cond_akey_punch_op(void **state, daos_handle_t coh, daos_unit_oid_t oid, assert_rc_equal(rc, expected_rc); if (expected_rc == 0) { - rc = vos_dtx_commit(coh, &xid, 1, NULL); + rc = vos_dtx_commit(coh, &xid, 1, false, NULL); assert_rc_equal(rc, 1); } } @@ -1240,7 +1240,7 @@ cond_updaten_op_(void **state, daos_handle_t coh, daos_unit_oid_t oid, vts_dtx_end(dth); if (expected_rc == 0) { - rc = vos_dtx_commit(coh, &xid, 1, NULL); + rc = vos_dtx_commit(coh, &xid, 1, false, 
NULL); assert_rc_equal(rc, 1); } @@ -1726,7 +1726,7 @@ minor_epoch_punch_sv(void **state) vts_dtx_end(dth); assert_rc_equal(rc, 0); - rc = vos_dtx_commit(arg->ctx.tc_co_hdl, &xid, 1, NULL); + rc = vos_dtx_commit(arg->ctx.tc_co_hdl, &xid, 1, false, NULL); assert_rc_equal(rc, 1); /* Now read back original # of bytes */ @@ -1822,7 +1822,7 @@ minor_epoch_punch_array(void **state) vts_dtx_end(dth); assert_rc_equal(rc, 0); - rc = vos_dtx_commit(arg->ctx.tc_co_hdl, &xid, 1, NULL); + rc = vos_dtx_commit(arg->ctx.tc_co_hdl, &xid, 1, false, NULL); assert_rc_equal(rc, 1); /* Now read back original # of bytes */ @@ -2303,7 +2303,7 @@ test_inprogress_parent_punch(void **state) assert_rc_equal(rc, 0); xid2 = dth2->dth_xid; vts_dtx_end(dth2); - rc = vos_dtx_commit(arg->ctx.tc_co_hdl, &xid2, 1, NULL); + rc = vos_dtx_commit(arg->ctx.tc_co_hdl, &xid2, 1, false, NULL); assert_rc_equal(rc, 1); /** Now try to punch akey 2, should fail */ @@ -2314,7 +2314,7 @@ test_inprogress_parent_punch(void **state) assert_rc_equal(rc, -DER_INPROGRESS); /** Now commit the in progress punch and try again */ - rc = vos_dtx_commit(arg->ctx.tc_co_hdl, &xid1, 1, NULL); + rc = vos_dtx_commit(arg->ctx.tc_co_hdl, &xid1, 1, false, NULL); assert_rc_equal(rc, 1); rc = vos_obj_punch(arg->ctx.tc_co_hdl, oid, epoch, 0, 0, &dkey, 1, @@ -2322,7 +2322,7 @@ test_inprogress_parent_punch(void **state) assert_rc_equal(rc, 0); xid2 = dth2->dth_xid; vts_dtx_end(dth2); - rc = vos_dtx_commit(arg->ctx.tc_co_hdl, &xid2, 1, NULL); + rc = vos_dtx_commit(arg->ctx.tc_co_hdl, &xid2, 1, false, NULL); assert_rc_equal(rc, 1); memset(buf, 'x', sizeof(buf)); @@ -2583,9 +2583,8 @@ many_tx(void **state) writes++; vts_dtx_end(req[cur_tx].dth); if (req[old_tx].commit) { - rc = vos_dtx_commit(coh, - &req[old_tx].xid, 1, - NULL); + rc = vos_dtx_commit(coh, &req[old_tx].xid, 1, + false, NULL); assert_rc_equal(rc, 1); } memset(&req[old_tx], 0, sizeof(req[0])); @@ -2604,7 +2603,7 @@ many_tx(void **state) memset(&req[old_tx], 0, sizeof(req[0])); 
continue; } - rc = vos_dtx_commit(coh, &req[old_tx].xid, 1, NULL); + rc = vos_dtx_commit(coh, &req[old_tx].xid, 1, false, NULL); assert_rc_equal(rc, 1); memset(&req[old_tx], 0, sizeof(req[0])); } @@ -2668,7 +2667,7 @@ execute_op(daos_handle_t coh, daos_unit_oid_t oid, daos_epoch_t epoch, do_commit: vts_dtx_end(req.dth); if (commit && req.commit) { - rc = vos_dtx_commit(coh, &req.xid, 1, NULL); + rc = vos_dtx_commit(coh, &req.xid, 1, false, NULL); assert_rc_equal(rc, 1); } @@ -2720,7 +2719,7 @@ uncommitted_parent(void **state) execute_op(coh, oid, epoch, &dkey, &akey[1], &sgl, first, 5, true, TX_OP_UPDATE1); /** Commit the punch */ - rc = vos_dtx_commit(coh, &xid, 1, NULL); + rc = vos_dtx_commit(coh, &xid, 1, false, NULL); assert_rc_equal(rc, 1); memset(buf, 'x', sizeof(buf)); @@ -2789,7 +2788,7 @@ test_uncommitted_key(void **state) assert_rc_equal(rc, 0); /** Commit the update */ - rc = vos_dtx_commit(coh, &xid, 1, NULL); + rc = vos_dtx_commit(coh, &xid, 1, false, NULL); assert_rc_equal(rc, 1); memset(buf, 'x', sizeof(buf)); @@ -2888,7 +2887,7 @@ test_multiple_key_conditionals_common(void **state, bool with_dtx) assert_rc_equal(rc, 0); if (with_dtx) { vts_dtx_end(dth); - rc = vos_dtx_commit(arg->ctx.tc_co_hdl, &xid, 1, NULL); + rc = vos_dtx_commit(arg->ctx.tc_co_hdl, &xid, 1, false, NULL); assert_rc_equal(rc, 1); } @@ -2916,7 +2915,7 @@ test_multiple_key_conditionals_common(void **state, bool with_dtx) assert_rc_equal(rc, 0); if (with_dtx) { vts_dtx_end(dth); - rc = vos_dtx_commit(arg->ctx.tc_co_hdl, &xid, 1, NULL); + rc = vos_dtx_commit(arg->ctx.tc_co_hdl, &xid, 1, false, NULL); assert_rc_equal(rc, 1); } @@ -2933,7 +2932,7 @@ test_multiple_key_conditionals_common(void **state, bool with_dtx) assert_rc_equal(rc, 0); if (with_dtx) { vts_dtx_end(dth); - rc = vos_dtx_commit(arg->ctx.tc_co_hdl, &xid, 1, NULL); + rc = vos_dtx_commit(arg->ctx.tc_co_hdl, &xid, 1, false, NULL); assert_rc_equal(rc, 1); } @@ -2976,7 +2975,7 @@ test_multiple_key_conditionals_common(void 
**state, bool with_dtx) assert_rc_equal(rc, 0); if (with_dtx) { vts_dtx_end(dth); - rc = vos_dtx_commit(arg->ctx.tc_co_hdl, &xid, 1, NULL); + rc = vos_dtx_commit(arg->ctx.tc_co_hdl, &xid, 1, false, NULL); assert_rc_equal(rc, 1); } diff --git a/src/vos/vos_common.c b/src/vos/vos_common.c index cd2f2a5a6930..385df8d06c3c 100644 --- a/src/vos/vos_common.c +++ b/src/vos/vos_common.c @@ -408,7 +408,7 @@ vos_tx_end(struct vos_container *cont, struct dtx_handle *dth_in, cont->vc_solo_dtx_epoch < dth->dth_epoch) cont->vc_solo_dtx_epoch = dth->dth_epoch; - vos_dtx_post_handle(cont, &dae, &dce, 1, false, err != 0); + vos_dtx_post_handle(cont, &dae, &dce, 1, false, err != 0, false); } else { D_ASSERT(dce == NULL); if (err == 0 && dth->dth_active) { diff --git a/src/vos/vos_dtx.c b/src/vos/vos_dtx.c index 0a325088a778..872d5fdf1e98 100644 --- a/src/vos/vos_dtx.c +++ b/src/vos/vos_dtx.c @@ -175,7 +175,7 @@ dtx_inprogress(struct vos_dtx_act_ent *dae, struct dtx_handle *dth, static void dtx_act_ent_cleanup(struct vos_container *cont, struct vos_dtx_act_ent *dae, - struct dtx_handle *dth, bool evict) + struct dtx_handle *dth, bool evict, bool keep_df) { if (evict) { daos_unit_oid_t *oids; @@ -211,8 +211,10 @@ dtx_act_ent_cleanup(struct vos_container *cont, struct vos_dtx_act_ent *dae, dae->dae_rec_cap = 0; DAE_REC_CNT(dae) = 0; - dae->dae_df_off = UMOFF_NULL; - dae->dae_dbd = NULL; + if (!keep_df) { + dae->dae_df_off = UMOFF_NULL; + dae->dae_dbd = NULL; + } } static int @@ -271,7 +273,7 @@ dtx_act_ent_free(struct btr_instance *tins, struct btr_record *rec, D_ASSERT(dae != NULL); *(struct vos_dtx_act_ent **)args = dae; } else if (dae != NULL) { - dtx_act_ent_cleanup(tins->ti_priv, dae, NULL, true); + dtx_act_ent_cleanup(tins->ti_priv, dae, NULL, true, false); } return 0; @@ -623,8 +625,7 @@ do_dtx_rec_release(struct umem_instance *umm, struct vos_container *cont, } static int -dtx_rec_release(struct vos_container *cont, struct vos_dtx_act_ent *dae, - bool abort) 
+dtx_rec_release(struct vos_container *cont, struct vos_dtx_act_ent *dae, bool abort, bool keep_act) { struct umem_instance *umm = vos_cont2umm(cont); struct vos_dtx_act_ent_df *dae_df; @@ -657,13 +658,6 @@ dtx_rec_release(struct vos_container *cont, struct vos_dtx_act_ent *dae, abort ? "abort" : "commit", DP_DTI(&DAE_XID(dae)), dbd, DP_UUID(cont->vc_pool->vp_id), DP_UUID(cont->vc_id)); - if (!UMOFF_IS_NULL(dae_df->dae_mbs_off)) { - /* dae_mbs_off will be invalid via flag DTE_INVALID. */ - rc = umem_free(umm, dae_df->dae_mbs_off); - if (rc != 0) - return rc; - } - if (dae->dae_records != NULL) { D_ASSERT(DAE_REC_CNT(dae) > DTX_INLINE_REC_CNT); @@ -693,6 +687,35 @@ dtx_rec_release(struct vos_container *cont, struct vos_dtx_act_ent *dae, return rc; } + if (keep_act) { + /* + * If it is required to keep the active DTX entry, then it must be for partial + * commit. Let's mark it as DTE_PARTIAL_COMMITTED. + */ + if ((DAE_FLAGS(dae) & DTE_PARTIAL_COMMITTED)) + return 0; + + rc = umem_tx_add_ptr(umm, &dae_df->dae_rec_off, sizeof(dae_df->dae_rec_off)); + if (rc != 0) + return rc; + + rc = umem_tx_add_ptr(umm, &dae_df->dae_flags, sizeof(dae_df->dae_flags)); + if (rc != 0) + return rc; + + dae_df->dae_rec_off = UMOFF_NULL; + dae_df->dae_flags |= DTE_PARTIAL_COMMITTED; + + return 0; + } + + if (!UMOFF_IS_NULL(dae_df->dae_mbs_off)) { + /* dae_mbs_off will be invalid via flag DTE_INVALID. 
*/ + rc = umem_free(umm, dae_df->dae_mbs_off); + if (rc != 0) + return rc; + } + if (dbd->dbd_count > 1 || dbd->dbd_index < dbd->dbd_cap) { rc = umem_tx_add_ptr(umm, &dae_df->dae_flags, sizeof(dae_df->dae_flags)); if (rc != 0) @@ -764,7 +787,7 @@ dtx_rec_release(struct vos_container *cont, struct vos_dtx_act_ent *dae, static int vos_dtx_commit_one(struct vos_container *cont, struct dtx_id *dti, daos_epoch_t epoch, - daos_epoch_t cmt_time, struct vos_dtx_cmt_ent **dce_p, + daos_epoch_t cmt_time, bool keep_act, struct vos_dtx_cmt_ent **dce_p, struct vos_dtx_act_ent **dae_p, bool *rm_cos) { struct vos_dtx_act_ent *dae = NULL; @@ -813,7 +836,7 @@ vos_dtx_commit_one(struct vos_container *cont, struct dtx_id *dti, daos_epoch_t rc = dbtree_delete(cont->vc_dtx_active_hdl, BTR_PROBE_BYPASS, &kiov, &dae); if (rc == 0) { - dtx_act_ent_cleanup(cont, dae, NULL, false); + dtx_act_ent_cleanup(cont, dae, NULL, false, false); dtx_evict_lid(cont, dae); } @@ -827,43 +850,48 @@ vos_dtx_commit_one(struct vos_container *cont, struct dtx_id *dti, daos_epoch_t D_GOTO(out, rc = -DER_ALREADY); } - D_ALLOC_PTR(dce); - if (dce == NULL) - D_GOTO(out, rc = -DER_NOMEM); + /* Generate committed DTX entry when it is not required to keep the active DTX entry. 
*/ + if (!keep_act) { + D_ALLOC_PTR(dce); + if (dce == NULL) + D_GOTO(out, rc = -DER_NOMEM); - DCE_CMT_TIME(dce) = cmt_time; - if (dae != NULL) { - DCE_XID(dce) = DAE_XID(dae); - DCE_EPOCH(dce) = DAE_EPOCH(dae); - } else { - struct dtx_handle *dth = vos_dth_get(false); + DCE_CMT_TIME(dce) = cmt_time; + if (dae != NULL) { + DCE_XID(dce) = DAE_XID(dae); + DCE_EPOCH(dce) = DAE_EPOCH(dae); + } else { + struct dtx_handle *dth = vos_dth_get(false); - D_ASSERT(!cont->vc_pool->vp_sysdb); - D_ASSERT(dtx_is_valid_handle(dth)); - D_ASSERT(dth->dth_solo); + D_ASSERT(!cont->vc_pool->vp_sysdb); + D_ASSERT(dtx_is_valid_handle(dth)); + D_ASSERT(dth->dth_solo); - dae = dth->dth_ent; - D_ASSERT(dae != NULL); + dae = dth->dth_ent; + D_ASSERT(dae != NULL); - DCE_XID(dce) = *dti; - DCE_EPOCH(dce) = dth->dth_epoch; - } + DCE_XID(dce) = *dti; + DCE_EPOCH(dce) = dth->dth_epoch; + } - d_iov_set(&riov, dce, sizeof(*dce)); - rc = dbtree_upsert(cont->vc_dtx_committed_hdl, BTR_PROBE_EQ, - DAOS_INTENT_UPDATE, &kiov, &riov, NULL); - if (rc != 0) - goto out; + d_iov_set(&riov, dce, sizeof(*dce)); + rc = dbtree_upsert(cont->vc_dtx_committed_hdl, BTR_PROBE_EQ, + DAOS_INTENT_UPDATE, &kiov, &riov, NULL); + if (rc != 0) + goto out; - *dce_p = dce; - dce = NULL; + *dce_p = dce; + dce = NULL; + } else { + D_ASSERT(rm_cos == NULL); + } dae->dae_committing = 1; if (epoch != 0) goto out; - rc = dtx_rec_release(cont, dae, false); + rc = dtx_rec_release(cont, dae, false, keep_act); if (rc != 0) goto out; @@ -1171,16 +1199,19 @@ vos_dtx_check_availability(daos_handle_t coh, uint32_t entry, } if (intent == DAOS_INTENT_PURGE) { - uint32_t age = d_hlc_age2sec(DAE_XID(dae).dti_hlc); + uint64_t now = daos_gettime_coarse(); /* * The DTX entry still references related data record, * then we cannot (vos) aggregate related data record. + * Report the warning at most once every 10 seconds to avoid flooding the log.
*/ - if (age >= DAOS_AGG_THRESHOLD) - D_WARN("DTX "DF_DTI" (state:%u, age:%u) still references the data, " - "cannot be (VOS) aggregated\n", - DP_DTI(&DAE_XID(dae)), vos_dtx_status(dae), age); + D_CDEBUG(now - cont->vc_agg_busy_ts > 10, DLOG_WARN, DB_TRACE, + "DTX "DF_DTI" (state:%u, flags:%x, age:%u) still references the data, " + "cannot be (VOS) aggregated\n", DP_DTI(&DAE_XID(dae)), vos_dtx_status(dae), + DAE_FLAGS(dae), (unsigned int)d_hlc_age2sec(DAE_XID(dae).dti_hlc)); + if (now - cont->vc_agg_busy_ts > 10) + cont->vc_agg_busy_ts = now; return ALB_AVAILABLE_DIRTY; } @@ -1666,7 +1697,7 @@ vos_dtx_prepared(struct dtx_handle *dth, struct vos_dtx_cmt_ent **dce_p) dae->dae_committing = 1; else rc = vos_dtx_commit_internal(cont, &dth->dth_xid, 1, - dth->dth_epoch, NULL, NULL, dce_p); + dth->dth_epoch, false, NULL, NULL, dce_p); dth->dth_active = 0; dth->dth_pinned = 0; if (rc >= 0) { @@ -1977,7 +2008,7 @@ vos_dtx_load_mbs(daos_handle_t coh, struct dtx_id *dti, daos_unit_oid_t *oid, int vos_dtx_commit_internal(struct vos_container *cont, struct dtx_id dtis[], - int count, daos_epoch_t epoch, bool rm_cos[], + int count, daos_epoch_t epoch, bool keep_act, bool rm_cos[], struct vos_dtx_act_ent **daes, struct vos_dtx_cmt_ent **dces) { struct vos_cont_df *cont_df = cont->vc_cont_df; @@ -2010,7 +2041,7 @@ vos_dtx_commit_internal(struct vos_container *cont, struct dtx_id dtis[], again: for (j = dbd->dbd_count; j < dbd->dbd_cap && i < count; i++) { - rc = vos_dtx_commit_one(cont, &dtis[i], epoch, cmt_time, &dces[i], + rc = vos_dtx_commit_one(cont, &dtis[i], epoch, cmt_time, keep_act, &dces[i], daes != NULL ? &daes[i] : NULL, rm_cos != NULL ? 
&rm_cos[i] : NULL); if (rc == 0 && (daes == NULL || daes[i] != NULL)) @@ -2111,7 +2142,7 @@ void vos_dtx_post_handle(struct vos_container *cont, struct vos_dtx_act_ent **daes, struct vos_dtx_cmt_ent **dces, - int count, bool abort, bool rollback) + int count, bool abort, bool rollback, bool keep_act) { d_iov_t kiov; int rc; @@ -2170,6 +2201,18 @@ vos_dtx_post_handle(struct vos_container *cont, if (daes[i] == NULL) continue; + /* + * If it is required to keep the active DTX entry, then it must be for partial + * commit. Let's mark it as DTE_PARTIAL_COMMITTED. + */ + if (!abort && keep_act) { + DAE_FLAGS(daes[i]) |= DTE_PARTIAL_COMMITTED; + + daes[i]->dae_committing = 0; + dtx_act_ent_cleanup(cont, daes[i], NULL, false, true); + continue; + } + d_iov_set(&kiov, &DAE_XID(daes[i]), sizeof(DAE_XID(daes[i]))); rc = dbtree_delete(cont->vc_dtx_active_hdl, BTR_PROBE_EQ, &kiov, NULL); @@ -2192,13 +2235,13 @@ vos_dtx_post_handle(struct vos_container *cont, daes[i]->dae_aborted = 1; daes[i]->dae_aborting = 0; - dtx_act_ent_cleanup(cont, daes[i], NULL, true); + dtx_act_ent_cleanup(cont, daes[i], NULL, true, false); } else { D_ASSERT(daes[i]->dae_aborting == 0); daes[i]->dae_committed = 1; daes[i]->dae_committing = 0; - dtx_act_ent_cleanup(cont, daes[i], NULL, false); + dtx_act_ent_cleanup(cont, daes[i], NULL, false, false); } DAE_FLAGS(daes[i]) &= ~(DTE_CORRUPTED | DTE_ORPHAN | DTE_PARTIAL_COMMITTED); } @@ -2206,7 +2249,7 @@ vos_dtx_post_handle(struct vos_container *cont, } int -vos_dtx_commit(daos_handle_t coh, struct dtx_id dtis[], int count, bool rm_cos[]) +vos_dtx_commit(daos_handle_t coh, struct dtx_id dtis[], int count, bool keep_act, bool rm_cos[]) { struct vos_dtx_act_ent **daes = NULL; struct vos_dtx_cmt_ent **dces = NULL; @@ -2230,14 +2273,15 @@ vos_dtx_commit(daos_handle_t coh, struct dtx_id dtis[], int count, bool rm_cos[] /* Commit multiple DTXs via single local transaction. 
*/ rc = umem_tx_begin(vos_cont2umm(cont), NULL); if (rc == 0) { - committed = vos_dtx_commit_internal(cont, dtis, count, 0, rm_cos, daes, dces); + committed = vos_dtx_commit_internal(cont, dtis, count, 0, + keep_act, rm_cos, daes, dces); if (committed >= 0) { rc = umem_tx_commit(vos_cont2umm(cont)); D_ASSERT(rc == 0); } else { rc = umem_tx_abort(vos_cont2umm(cont), committed); } - vos_dtx_post_handle(cont, daes, dces, count, false, rc != 0); + vos_dtx_post_handle(cont, daes, dces, count, false, rc != 0, keep_act); } out: @@ -2267,7 +2311,7 @@ vos_dtx_abort_internal(struct vos_container *cont, struct vos_dtx_act_ent *dae, dth->dth_need_validation = 1; } - rc = dtx_rec_release(cont, dae, true); + rc = dtx_rec_release(cont, dae, true, false); dae->dae_preparing = 0; if (rc == 0) { dae->dae_aborting = 1; @@ -2295,7 +2339,7 @@ vos_dtx_abort_internal(struct vos_container *cont, struct vos_dtx_act_ent *dae, out: if (rc == 0 || force) - vos_dtx_post_handle(cont, &dae, NULL, 1, true, false); + vos_dtx_post_handle(cont, &dae, NULL, 1, true, false, false); else if (rc != 0) dae->dae_aborting = 0; @@ -2341,7 +2385,7 @@ vos_dtx_abort(daos_handle_t coh, struct dtx_id *dti, daos_epoch_t epoch) * table at that time, then need to be removed again via vos_dtx_post_handle. */ if (dae->dae_aborted) - vos_dtx_post_handle(cont, &dae, NULL, 1, true, false); + vos_dtx_post_handle(cont, &dae, NULL, 1, true, false, false); D_GOTO(out, rc = -DER_ALREADY); } @@ -2422,9 +2466,6 @@ vos_dtx_set_flags_one(struct vos_container *cont, struct dtx_id *dti, uint32_t f DL_CDEBUG(rc != 0, DLOG_ERR, DLOG_WARN, rc, "Mark the DTX entry " DF_DTI " as %s", DP_DTI(dti), vos_dtx_flags2name(flags)); - if ((rc == -DER_NO_PERM || rc == -DER_NONEXIST) && flags == DTE_PARTIAL_COMMITTED) - rc = 0; - return rc; } @@ -2443,7 +2484,7 @@ vos_dtx_set_flags(daos_handle_t coh, struct dtx_id dtis[], int count, uint32_t f D_ASSERT(cont != NULL); /* Only allow set single flags. 
*/ - if (flags != DTE_CORRUPTED && flags != DTE_ORPHAN && flags != DTE_PARTIAL_COMMITTED) { + if (flags != DTE_CORRUPTED && flags != DTE_ORPHAN) { D_ERROR("Try to set unrecognized flags %x on DTX "DF_DTI", count %u\n", flags, DP_DTI(&dtis[0]), count); D_GOTO(out, rc = -DER_INVAL); @@ -2935,7 +2976,7 @@ vos_dtx_cleanup_internal(struct dtx_handle *dth) */ if (dae != NULL) { D_ASSERT(!vos_dae_is_prepare(dae)); - dtx_act_ent_cleanup(cont, dae, dth, true); + dtx_act_ent_cleanup(cont, dae, dth, true, false); } } else { d_iov_set(&kiov, &dth->dth_xid, sizeof(dth->dth_xid)); @@ -2958,7 +2999,7 @@ vos_dtx_cleanup_internal(struct dtx_handle *dth) if (DAE_EPOCH(dae) != dth->dth_epoch) goto out; - dtx_act_ent_cleanup(cont, dae, dth, true); + dtx_act_ent_cleanup(cont, dae, dth, true, false); rc = dbtree_delete(cont->vc_dtx_active_hdl, riov.iov_buf != NULL ? BTR_PROBE_BYPASS : BTR_PROBE_EQ, @@ -3091,7 +3132,7 @@ vos_dtx_attach(struct dtx_handle *dth, bool persistent, bool exist) dae->dae_preparing = 0; if (dth->dth_solo) - vos_dtx_post_handle(cont, &dae, &dce, 1, false, rc != 0); + vos_dtx_post_handle(cont, &dae, &dce, 1, false, rc != 0, false); else if (rc == 0) dae->dae_prepared = 1; } diff --git a/src/vos/vos_internal.h b/src/vos/vos_internal.h index 9441ba452657..9b1ae3061895 100644 --- a/src/vos/vos_internal.h +++ b/src/vos/vos_internal.h @@ -353,6 +353,8 @@ struct vos_container { daos_epoch_range_t vc_epr_aggregation; /* Current ongoing discard EPR */ daos_epoch_range_t vc_epr_discard; + /* Last timestamp when VOS aggregation reports -DER_TX_BUSY */ + uint64_t vc_agg_busy_ts; /* Last timestamp when VOS aggregation reporting ENOSPACE */ uint64_t vc_agg_nospc_ts; /* Last timestamp when IO reporting ENOSPACE */ @@ -757,7 +759,7 @@ vos_dtx_prepared(struct dtx_handle *dth, struct vos_dtx_cmt_ent **dce_p); int vos_dtx_commit_internal(struct vos_container *cont, struct dtx_id dtis[], - int count, daos_epoch_t epoch, bool rm_cos[], + int count, daos_epoch_t epoch, bool keep_act, 
bool rm_cos[], struct vos_dtx_act_ent **daes, struct vos_dtx_cmt_ent **dces); int @@ -767,7 +769,7 @@ void vos_dtx_post_handle(struct vos_container *cont, struct vos_dtx_act_ent **daes, struct vos_dtx_cmt_ent **dces, - int count, bool abort, bool rollback); + int count, bool abort, bool rollback, bool keep_act); /** * Establish indexed active DTX table in DRAM. diff --git a/src/vos/vos_io.c b/src/vos/vos_io.c index 7aa3c897755f..83595e050f27 100644 --- a/src/vos/vos_io.c +++ b/src/vos/vos_io.c @@ -2590,7 +2590,7 @@ vos_update_end(daos_handle_t ioh, uint32_t pm_ver, daos_key_t *dkey, int err, D_GOTO(abort, err = -DER_NOMEM); err = vos_dtx_commit_internal(ioc->ic_cont, dth->dth_dti_cos, - dth->dth_dti_cos_count, 0, NULL, daes, dces); + dth->dth_dti_cos_count, 0, false, NULL, daes, dces); if (err < 0) goto abort; if (err == 0) @@ -2669,7 +2669,7 @@ vos_update_end(daos_handle_t ioh, uint32_t pm_ver, daos_key_t *dkey, int err, if (daes != NULL) vos_dtx_post_handle(ioc->ic_cont, daes, dces, dth->dth_dti_cos_count, - false, err != 0); + false, err != 0, false); } if (err != 0) diff --git a/src/vos/vos_obj.c b/src/vos/vos_obj.c index cc72575f6082..2711e916322c 100644 --- a/src/vos/vos_obj.c +++ b/src/vos/vos_obj.c @@ -508,7 +508,7 @@ vos_obj_punch(daos_handle_t coh, daos_unit_oid_t oid, daos_epoch_t epoch, D_GOTO(reset, rc = -DER_NOMEM); rc = vos_dtx_commit_internal(cont, dth->dth_dti_cos, - dth->dth_dti_cos_count, 0, NULL, daes, dces); + dth->dth_dti_cos_count, 0, false, NULL, daes, dces); if (rc < 0) goto reset; if (rc == 0) @@ -581,7 +581,7 @@ vos_obj_punch(daos_handle_t coh, daos_unit_oid_t oid, daos_epoch_t epoch, if (daes != NULL) vos_dtx_post_handle(cont, daes, dces, dth->dth_dti_cos_count, - false, rc != 0); + false, rc != 0, false); } if (obj != NULL)