From a20867799c425346a2616d00b3ab5e328c6b9d82 Mon Sep 17 00:00:00 2001 From: Fan Yong Date: Mon, 28 Oct 2024 14:23:21 +0800 Subject: [PATCH] DAOS-16721 dtx: handle potential DTX ID reusing trouble The patch contains the following improvements: 1. When VOS level logic returns -DER_TX_RESATRT, the object level RPC handler should set 'RESEND' flag then restart the transaction with newer epoch. Because dtx_abort() logic cannot guarantee all former prepared DTX entries (on all related participants) can be aborted, especially if the former one failed for some network trouble, that may cause restarted transaction hit -DER_TX_ID_REUSED unexpectedly. 2. Compare the epoch for DTX entries with the same transaction ID for distinguishing potential reused TX ID more accurately. 3. Add DTX entry into DTX CoS cache if cannot commit it synchronously. Then subsequent batched commit logic can handle it. 4. If server complains suspected TX ID reusing, then reports -EIO to related application instead of assertion on client. 5. Control DTX related warning message frequency to avoid log flood. 6. Collect more information when generate some error/warning message. Allow-unstable-test: true Signed-off-by: Fan Yong --- src/dtx/dtx_coll.c | 8 ++- src/dtx/dtx_common.c | 83 ++++++++++++---------- src/dtx/dtx_rpc.c | 15 ++-- src/dtx/dtx_srv.c | 5 +- src/include/daos_srv/container.h | 3 + src/object/cli_coll.c | 2 +- src/object/cli_obj.c | 23 ++++-- src/object/cli_shard.c | 13 ++-- src/object/obj_internal.h | 2 +- src/object/obj_utils.c | 17 +++-- src/object/srv_ec_aggregate.c | 33 ++++++++- src/object/srv_obj.c | 116 ++++++++++++++++++------------- src/object/srv_obj_remote.c | 12 ++-- src/vos/vos_dtx.c | 63 ++++++++++++----- src/vos/vos_internal.h | 2 + 15 files changed, 254 insertions(+), 143 deletions(-) diff --git a/src/dtx/dtx_coll.c b/src/dtx/dtx_coll.c index 7e7c8199155..ba45aaa2616 100644 --- a/src/dtx/dtx_coll.c +++ b/src/dtx/dtx_coll.c @@ -81,9 +81,11 @@ dtx_coll_prep_ult(void *arg) } if (dcpa->dcpa_result != 0) { - if (dcpa->dcpa_result != -DER_INPROGRESS && dcpa->dcpa_result != -DER_NONEXIST) - D_ERROR("Failed to load mbs for "DF_DTI", opc %u: "DF_RC"\n", - DP_DTI(&dci->dci_xid), opc, DP_RC(rc)); + if (dcpa->dcpa_result < 0 && + dcpa->dcpa_result != -DER_INPROGRESS && dcpa->dcpa_result != -DER_NONEXIST) + D_ERROR("Failed to load mbs for "DF_DTI" in "DF_UUID"/"DF_UUID", opc %u: " + DF_RC"\n", DP_DTI(&dci->dci_xid), DP_UUID(dci->dci_po_uuid), + DP_UUID(dci->dci_co_uuid), opc, DP_RC(dcpa->dcpa_result)); goto out; } diff --git a/src/dtx/dtx_common.c b/src/dtx/dtx_common.c index ecb156729ed..1ee74ae11a4 100644 --- a/src/dtx/dtx_common.c +++ b/src/dtx/dtx_common.c @@ -1271,7 +1271,6 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_hdl *coh, int resul int status = -1; int rc = 0; bool aborted = false; - bool unpin = false; D_ASSERT(cont != NULL); @@ -1339,7 +1338,7 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_hdl *coh, int resul * it persistently. Otherwise, the subsequent DTX resync may not find it as * to regard it as failed transaction and abort it. */ - if (result == 0 && !dth->dth_active && !dth->dth_prepared && !dth->dth_solo && + if (!dth->dth_active && !dth->dth_prepared && (dth->dth_dist || dth->dth_modification_cnt > 0)) { result = vos_dtx_attach(dth, true, dth->dth_ent != NULL ? true : false); if (unlikely(result < 0)) { @@ -1349,7 +1348,7 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_hdl *coh, int resul } } - if (dth->dth_prepared || dtx_batched_ult_max == 0) { + if ((dth->dth_prepared && !dlh->dlh_coll) || dtx_batched_ult_max == 0) { dth->dth_sync = 1; goto sync; } @@ -1363,14 +1362,12 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_hdl *coh, int resul if (DAOS_FAIL_CHECK(DAOS_DTX_MISS_COMMIT)) dth->dth_sync = 1; - /* For synchronous DTX, do not add it into CoS cache, otherwise, - * we may have no way to remove it from the cache. - */ if (dth->dth_sync) goto sync; D_ASSERT(dth->dth_mbs != NULL); +cache: if (dlh->dlh_coll) { rc = dtx_cos_add(cont, dlh->dlh_coll_entry, &dth->dth_leader_oid, dth->dth_dkey_hash, dth->dth_epoch, DCF_EXP_CMT | DCF_COLL); @@ -1378,38 +1375,47 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_hdl *coh, int resul size = sizeof(*dte) + sizeof(*mbs) + dth->dth_mbs->dm_data_size; D_ALLOC(dte, size); if (dte == NULL) { - dth->dth_sync = 1; - goto sync; - } - - mbs = (struct dtx_memberships *)(dte + 1); - memcpy(mbs, dth->dth_mbs, size - sizeof(*dte)); - - dte->dte_xid = dth->dth_xid; - dte->dte_ver = dth->dth_ver; - dte->dte_refs = 1; - dte->dte_mbs = mbs; + rc = -DER_NOMEM; + } else { + mbs = (struct dtx_memberships *)(dte + 1); + memcpy(mbs, dth->dth_mbs, size - sizeof(*dte)); + + dte->dte_xid = dth->dth_xid; + dte->dte_ver = dth->dth_ver; + dte->dte_refs = 1; + dte->dte_mbs = mbs; + + if (!(mbs->dm_flags & DMF_SRDG_REP)) + flags = DCF_EXP_CMT; + else if (dth->dth_modify_shared) + flags = DCF_SHARED; + else + flags = 0; - if (!(mbs->dm_flags & DMF_SRDG_REP)) - flags = DCF_EXP_CMT; - else if (dth->dth_modify_shared) - flags = DCF_SHARED; - else - flags = 0; + rc = dtx_cos_add(cont, dte, &dth->dth_leader_oid, dth->dth_dkey_hash, + dth->dth_epoch, flags); + dtx_entry_put(dte); + } + } - rc = dtx_cos_add(cont, dte, &dth->dth_leader_oid, dth->dth_dkey_hash, - dth->dth_epoch, flags); - dtx_entry_put(dte); + /* + * NOTE: If we failed to add the committable DTX into CoS cache, then we also have no way + * to commit (or abort) the DTX because of out of memory. Such DTX will be finally + * committed via next DTX resync (after recovered from OOM). + * + * Here, we only warning to notify the trouble, but not failed the transaction. + */ + if (rc != 0) { + D_WARN(DF_UUID": Fail to cache %s DTX "DF_DTI": "DF_RC"\n", + DP_UUID(cont->sc_uuid), dlh->dlh_coll ? "collective" : "regular", + DP_DTI(&dth->dth_xid), DP_RC(rc)); + D_GOTO(out, result = 0); } - if (rc == 0) { - if (!DAOS_FAIL_CHECK(DAOS_DTX_NO_COMMITTABLE)) { - vos_dtx_mark_committable(dth); - if (cont->sc_dtx_committable_count > DTX_THRESHOLD_COUNT || dlh->dlh_coll) - sched_req_wakeup(dss_get_module_info()->dmi_dtx_cmt_req); - } - } else { - dth->dth_sync = 1; + if (!DAOS_FAIL_CHECK(DAOS_DTX_NO_COMMITTABLE)) { + vos_dtx_mark_committable(dth); + if (cont->sc_dtx_committable_count > DTX_THRESHOLD_COUNT || dlh->dlh_coll) + sched_req_wakeup(dss_get_module_info()->dmi_dtx_cmt_req); } sync: @@ -1428,10 +1434,15 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_hdl *coh, int resul rc = dtx_commit(cont, &dte, NULL, 1, false); } - if (rc != 0) + if (rc != 0) { D_WARN(DF_UUID": Fail to sync %s commit DTX "DF_DTI": "DF_RC"\n", DP_UUID(cont->sc_uuid), dlh->dlh_coll ? "collective" : "regular", DP_DTI(&dth->dth_xid), DP_RC(rc)); + if (likely(dtx_batched_ult_max != 0)) { + dth->dth_sync = 0; + goto cache; + } + } /* * NOTE: The semantics of 'sync' commit does not guarantee that all @@ -1451,7 +1462,7 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_hdl *coh, int resul * to locally retry for avoiding related forwarded RPC timeout, instead, * The leader will trigger retry globally without abort 'prepared' ones. */ - if (unpin || (result < 0 && result != -DER_AGAIN && !dth->dth_solo)) { + if (result < 0 && result != -DER_AGAIN && !dth->dth_solo) { /* 1. Drop partial modification for distributed transaction. * 2. Remove the pinned DTX entry. */ diff --git a/src/dtx/dtx_rpc.c b/src/dtx/dtx_rpc.c index 2ccbfec2734..6d34e871269 100644 --- a/src/dtx/dtx_rpc.c +++ b/src/dtx/dtx_rpc.c @@ -1657,8 +1657,9 @@ dtx_coll_commit(struct ds_cont_child *cont, struct dtx_coll_entry *dce, struct d } D_CDEBUG(rc != 0 || rc1 != 0 || rc2 != 0, DLOG_ERR, DB_TRACE, - "Collectively commit DTX "DF_DTI": %d/%d/%d\n", - DP_DTI(&dce->dce_xid), rc, rc1, rc2); + "Collectively commit DTX "DF_DTI" in "DF_UUID"/"DF_UUID": %d/%d/%d\n", + DP_DTI(&dce->dce_xid), DP_UUID(cont->sc_pool_uuid), DP_UUID(cont->sc_uuid), + rc, rc1, rc2); return rc != 0 ? rc : rc1 != 0 ? rc1 : rc2; } @@ -1717,8 +1718,9 @@ dtx_coll_abort(struct ds_cont_child *cont, struct dtx_coll_entry *dce, daos_epoc rc2 = 0; D_CDEBUG(rc != 0 || rc1 != 0 || rc2 != 0, DLOG_ERR, DB_TRACE, - "Collectively abort DTX "DF_DTI": %d/%d/%d\n", - DP_DTI(&dce->dce_xid), rc, rc1, rc2); + "Collectively abort DTX "DF_DTI" with epoch "DF_X64" in " + DF_UUID"/"DF_UUID": %d/%d/%d\n", DP_DTI(&dce->dce_xid), epoch, + DP_UUID(cont->sc_pool_uuid), DP_UUID(cont->sc_uuid), rc, rc1, rc2); return rc != 0 ? rc : rc1 != 0 ? rc1 : rc2; } @@ -1766,8 +1768,9 @@ dtx_coll_check(struct ds_cont_child *cont, struct dtx_coll_entry *dce, daos_epoc } D_CDEBUG((rc < 0 && rc != -DER_NONEXIST) || (rc1 < 0 && rc1 != -DER_NONEXIST), DLOG_ERR, - DB_TRACE, "Collectively check DTX "DF_DTI": %d/%d/\n", - DP_DTI(&dce->dce_xid), rc, rc1); + DB_TRACE, "Collectively check DTX "DF_DTI" in "DF_UUID"/"DF_UUID": %d/%d/\n", + DP_DTI(&dce->dce_xid), DP_UUID(cont->sc_pool_uuid), DP_UUID(cont->sc_uuid), + rc, rc1); return dce->dce_ranks != NULL ? rc : rc1; } diff --git a/src/dtx/dtx_srv.c b/src/dtx/dtx_srv.c index 41480b6e8b0..0885b7f6cdc 100644 --- a/src/dtx/dtx_srv.c +++ b/src/dtx/dtx_srv.c @@ -474,8 +474,9 @@ dtx_coll_handler(crt_rpc_t *rpc) out: D_CDEBUG(rc < 0, DLOG_ERR, DB_TRACE, - "Handled collective DTX PRC %u on rank %u for "DF_DTI": "DF_RC"\n", - opc, myrank, DP_DTI(&dci->dci_xid), DP_RC(rc)); + "Handled collective DTX PRC %u on rank %u for "DF_DTI" in " + DF_UUID"/"DF_UUID": "DF_RC"\n", opc, myrank, DP_DTI(&dci->dci_xid), + DP_UUID(dci->dci_po_uuid), DP_UUID(dci->dci_co_uuid), DP_RC(rc)); dco->dco_status = rc; rc = crt_reply_send(rpc); diff --git a/src/include/daos_srv/container.h b/src/include/daos_srv/container.h index f99f4df14e3..9fc615c2a8b 100644 --- a/src/include/daos_srv/container.h +++ b/src/include/daos_srv/container.h @@ -108,6 +108,9 @@ struct ds_cont_child { uint32_t sc_dtx_committable_count; uint32_t sc_dtx_committable_coll_count; + /* Last timestamp when EC aggregation reports -DER_INPROGRESS. */ + uint64_t sc_ec_agg_busy_ts; + /* The global minimum EC aggregation epoch, which will be upper * limit for VOS aggregation, i.e. EC object VOS aggregation can * not cross this limit. For simplification purpose, all objects diff --git a/src/object/cli_coll.c b/src/object/cli_coll.c index 3285bec58b3..d517e3269d6 100644 --- a/src/object/cli_coll.c +++ b/src/object/cli_coll.c @@ -873,7 +873,7 @@ queue_coll_query_task(tse_task_t *api_task, struct obj_auxi_args *obj_auxi, stru 0, 0, ocdc); for (i = 0; i < ocdc->grp_nr; i++) { - obj_coll_disp_dest(ocdc, coa->coa_dcts, &tgt_ep); + obj_coll_disp_dest(ocdc, coa->coa_dcts, &tgt_ep, obj->cob_md.omd_id); tmp = coa->coa_dcts[ocdc->cur_pos].dct_shards[tgt_ep.ep_tag].dcs_idx; rc = queue_shard_query_key_task(api_task, obj_auxi, epoch, tmp, map_ver, diff --git a/src/object/cli_obj.c b/src/object/cli_obj.c index fcc4f7601f4..75d661d0665 100644 --- a/src/object/cli_obj.c +++ b/src/object/cli_obj.c @@ -1718,15 +1718,20 @@ dc_obj_retry_delay(tse_task_t *task, int err, uint16_t *retry_cnt, uint16_t *inp uint32_t timeout_sec) { uint32_t delay = 0; + uint32_t limit = 4; /* - * Randomly delay 5 - 68 us if it is not the first retry for + * Randomly delay 5 ~ 1028 us if it is not the first retry for * -DER_INPROGRESS || -DER_UPDATE_AGAIN cases. */ ++(*retry_cnt); if (err == -DER_INPROGRESS || err == -DER_UPDATE_AGAIN) { if (++(*inprogress_cnt) > 1) { - delay = (d_rand() & ((1 << 6) - 1)) + 5; + limit += *inprogress_cnt; + if (limit > 10) + limit = 10; + + delay = (d_rand() & ((1 << limit) - 1)) + 5; /* Rebuild is being established on the server side, wait a bit longer */ if (err == -DER_UPDATE_AGAIN) delay <<= 10; @@ -4856,11 +4861,14 @@ obj_comp_cb(tse_task_t *task, void *data) D_ASSERT(daos_handle_is_inval(obj_auxi->th)); D_ASSERT(obj_is_modification_opc(obj_auxi->opc)); - if (task->dt_result == -DER_TX_ID_REUSED && obj_auxi->retry_cnt != 0) - /* XXX: it is must because miss to set "RESEND" flag, that is bug. */ - D_ASSERTF(0, - "Miss 'RESEND' flag (%x) when resend the RPC for task %p: %u\n", - obj_auxi->flags, task, obj_auxi->retry_cnt); + if (task->dt_result == -DER_TX_ID_REUSED && obj_auxi->retry_cnt != 0) { + D_ERROR("TX ID maybe reused for unknown reason, " + "task %p, opc %u, flags %x, retry_cnt %u\n", + task, obj_auxi->opc, obj_auxi->flags, obj_auxi->retry_cnt); + task->dt_result = -DER_IO; + obj_auxi->io_retry = 0; + goto args_fini; + } if (obj_auxi->opc == DAOS_OBJ_RPC_UPDATE) { daos_obj_rw_t *api_args = dc_task_get_args(obj_auxi->obj_task); @@ -4886,6 +4894,7 @@ obj_comp_cb(tse_task_t *task, void *data) } } +args_fini: if (obj_auxi->opc == DAOS_OBJ_RPC_COLL_PUNCH) obj_coll_oper_args_fini(&obj_auxi->p_args.pa_coa); diff --git a/src/object/cli_shard.c b/src/object/cli_shard.c index 0c9dfc1418e..9f084140f80 100644 --- a/src/object/cli_shard.c +++ b/src/object/cli_shard.c @@ -1451,11 +1451,14 @@ obj_shard_coll_punch_cb(tse_task_t *task, void *data) shard_args->pa_auxi.obj_auxi->max_delay = timeout; } - DL_CDEBUG(task->dt_result < 0, DLOG_ERR, DB_IO, task->dt_result, - "DAOS_OBJ_RPC_COLL_PUNCH RPC %p for "DF_UOID" with DTX " - DF_DTI" for task %p, map_ver %u/%u, flags %lx/%x, %s layout", - rpc, DP_UOID(ocpi->ocpi_oid), DP_DTI(&ocpi->ocpi_xid), task, ocpi->ocpi_map_ver, - *cb_args->cpca_ver, (unsigned long)ocpi->ocpi_api_flags, ocpi->ocpi_flags, + DL_CDEBUG(task->dt_result < 0 && task->dt_result != -DER_INPROGRESS, + DLOG_ERR, DB_IO, task->dt_result, + "DAOS_OBJ_RPC_COLL_PUNCH RPC %p for "DF_UOID" in "DF_UUID"/"DF_UUID"/" + DF_UUID" with DTX "DF_DTI" for task %p, map_ver %u/%u, flags %lx/%x, %s layout", + rpc, DP_UOID(ocpi->ocpi_oid), DP_UUID(ocpi->ocpi_po_uuid), + DP_UUID(ocpi->ocpi_co_hdl), DP_UUID(ocpi->ocpi_co_uuid), DP_DTI(&ocpi->ocpi_xid), + task, ocpi->ocpi_map_ver, *cb_args->cpca_ver, + (unsigned long)ocpi->ocpi_api_flags, ocpi->ocpi_flags, cb_args->cpca_shard_args->pa_coa.coa_raw_sparse ? "sparse" : "continuous"); crt_req_decref(rpc); diff --git a/src/object/obj_internal.h b/src/object/obj_internal.h index 06cfdb5b195..c0df21dd009 100644 --- a/src/object/obj_internal.h +++ b/src/object/obj_internal.h @@ -1100,7 +1100,7 @@ int daos_obj_query_merge(struct obj_query_merge_args *oqma); void obj_coll_disp_init(uint32_t tgt_nr, uint32_t max_tgt_size, uint32_t inline_size, uint32_t start, uint32_t max_width, struct obj_coll_disp_cursor *ocdc); void obj_coll_disp_dest(struct obj_coll_disp_cursor *ocdc, struct daos_coll_target *tgts, - crt_endpoint_t *tgt_ep); + crt_endpoint_t *tgt_ep, daos_obj_id_t oid); void obj_coll_disp_move(struct obj_coll_disp_cursor *ocdc); int obj_utils_init(void); void obj_utils_fini(void); diff --git a/src/object/obj_utils.c b/src/object/obj_utils.c index 82d91c966ac..c01947a05a1 100644 --- a/src/object/obj_utils.c +++ b/src/object/obj_utils.c @@ -616,23 +616,22 @@ obj_coll_disp_init(uint32_t tgt_nr, uint32_t max_tgt_size, uint32_t inline_size, void obj_coll_disp_dest(struct obj_coll_disp_cursor *ocdc, struct daos_coll_target *tgts, - crt_endpoint_t *tgt_ep) + crt_endpoint_t *tgt_ep, daos_obj_id_t oid) { struct daos_coll_target *dct = &tgts[ocdc->cur_pos]; struct daos_coll_target tmp; - unsigned long rand = 0; uint32_t size; int pos; int i; if (ocdc->cur_step > 2) { - rand = d_rand(); /* - * Randomly choose an engine as the relay one for load balance. - * If the one corresponding to "pos" is former moved one, then - * use the "cur_pos" as the relay engine. + * Choose an engine (according to the given oid) as the relay one for load balance. + * If the one corresponding to "pos" is former moved one, then use the "cur_pos" as + * the relay engine. Then even if related RPC was resent without changing pool map, + * then the relay one will be the same as the original case. */ - pos = rand % (ocdc->tgt_nr - ocdc->cur_pos) + ocdc->cur_pos; + pos = oid.lo % (ocdc->tgt_nr - ocdc->cur_pos) + ocdc->cur_pos; if (pos > ocdc->cur_pos && tgts[pos].dct_rank > dct->dct_rank) { memcpy(&tmp, &tgts[pos], sizeof(tmp)); memcpy(&tgts[pos], dct, sizeof(tmp)); @@ -642,8 +641,8 @@ obj_coll_disp_dest(struct obj_coll_disp_cursor *ocdc, struct daos_coll_target *t size = dct->dct_bitmap_sz << 3; - /* Randomly choose a XS as the local leader on target engine for load balance. */ - for (i = 0, pos = (rand != 0 ? rand : d_rand()) % dct->dct_tgt_nr; i < size; i++) { + /* Choose a target as the local agent on the engine for load balance. */ + for (i = 0, pos = oid.lo % dct->dct_tgt_nr; i < size; i++) { if (isset(dct->dct_bitmap, i)) { pos -= dct->dct_shards[i].dcs_nr; if (pos < 0) diff --git a/src/object/srv_ec_aggregate.c b/src/object/srv_ec_aggregate.c index 71c630fa947..50b513d1612 100644 --- a/src/object/srv_ec_aggregate.c +++ b/src/object/srv_ec_aggregate.c @@ -2667,8 +2667,13 @@ cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr, struct ec_agg_param *ec_agg_param = agg_param->ap_data; vos_iter_param_t iter_param = { 0 }; struct vos_iter_anchors anchors = { 0 }; + struct dtx_handle *dth = NULL; + struct dtx_share_peer *dsp; + struct dtx_id dti = { 0 }; + struct dtx_epoch epoch = { 0 }; + daos_unit_oid_t oid = { 0 }; + int blocks = 0; int rc = 0; - int blocks = 0; /* * Avoid calling into vos_aggregate() when aborting aggregation @@ -2715,8 +2720,32 @@ cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr, agg_reset_entry(&ec_agg_param->ap_agg_entry, NULL, NULL); retry: + epoch.oe_value = epr->epr_hi; + rc = dtx_begin(cont->sc_hdl, &dti, &epoch, 0, cont->sc_pool->spc_map_version, &oid, + NULL, 0, 0, NULL, &dth); + if (rc != 0) + goto update_hae; + rc = vos_iterate(&iter_param, VOS_ITER_OBJ, true, &anchors, agg_iterate_pre_cb, - agg_iterate_post_cb, ec_agg_param, NULL); + agg_iterate_post_cb, ec_agg_param, dth); + if (rc == -DER_INPROGRESS && !d_list_empty(&dth->dth_share_tbd_list)) { + uint64_t now = daos_gettime_coarse(); + + /* Report warning per each 10 seconds to avoid log flood. */ + if (now - cont->sc_ec_agg_busy_ts > 10) { + while ((dsp = d_list_pop_entry(&dth->dth_share_tbd_list, + struct dtx_share_peer, dsp_link)) != NULL) { + D_WARN(DF_CONT ": EC aggregate hit non-committed DTX " DF_DTI "\n", + DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid), + DP_DTI(&dsp->dsp_xid)); + dtx_dsp_free(dsp); + } + + cont->sc_ec_agg_busy_ts = now; + } + } + + dtx_end(dth, cont, rc); /* Post_cb may not being executed in some cases */ agg_clear_extents(&ec_agg_param->ap_agg_entry); diff --git a/src/object/srv_obj.c b/src/object/srv_obj.c index 041ea903c4f..4762e04b898 100644 --- a/src/object/srv_obj.c +++ b/src/object/srv_obj.c @@ -2953,8 +2953,11 @@ ds_obj_rw_handler(crt_rpc_t *rpc) d_tm_inc_counter(opm->opm_update_resent, 1); -again1: - e = 0; +again: + if (flags & ORF_RESEND) + e = orw->orw_epoch; + else + e = 0; rc = dtx_handle_resend(ioc.ioc_vos_coh, &orw->orw_dti, &e, &version); switch (rc) { @@ -2965,8 +2968,13 @@ ds_obj_rw_handler(crt_rpc_t *rpc) orw->orw_epoch = e; /* TODO: Also recover the epoch uncertainty. */ break; + case -DER_MISMATCH: + rc = vos_dtx_abort(ioc.ioc_vos_coh, &orw->orw_dti, e); + if (rc < 0 && rc != -DER_NONEXIST) + D_GOTO(out, rc); + /* Fall through */ case -DER_NONEXIST: - rc = 0; + flags = 0; break; default: D_GOTO(out, rc); @@ -2976,7 +2984,6 @@ ds_obj_rw_handler(crt_rpc_t *rpc) D_GOTO(out, rc); } -again2: /* For leader case, we need to find out the potential conflict * (or share the same non-committed object/dkey) DTX(s) in the * CoS (committable) cache, piggyback them via the dispdatched @@ -3021,7 +3028,7 @@ ds_obj_rw_handler(crt_rpc_t *rpc) exec_arg.rpc = rpc; exec_arg.ioc = &ioc; - exec_arg.flags = flags; + exec_arg.flags |= flags; exec_arg.start = orw->orw_start_shard; /* Execute the operation on all targets */ @@ -3036,28 +3043,25 @@ ds_obj_rw_handler(crt_rpc_t *rpc) case -DER_TX_RESTART: /* * If this is a standalone operation, we can restart the - * internal transaction right here. Otherwise, we have to defer - * the restart to the RPC client. + * internal transaction right here. Otherwise we have to + * defer the restart to the RPC sponsor. */ - if (opc == DAOS_OBJ_RPC_UPDATE) { - /* - * Only standalone updates use this RPC. Retry with - * newer epoch. - */ - orw->orw_epoch = d_hlc_get(); - orw->orw_flags &= ~ORF_RESEND; - flags = 0; - d_tm_inc_counter(opm->opm_update_restart, 1); - goto again2; - } + if (opc != DAOS_OBJ_RPC_UPDATE) + break; - break; + /* Only standalone updates use this RPC. Retry with newer epoch. */ + orw->orw_epoch = d_hlc_get(); + exec_arg.flags |= ORF_RESEND; + flags = ORF_RESEND; + d_tm_inc_counter(opm->opm_update_restart, 1); + goto again; case -DER_AGAIN: - orw->orw_flags |= ORF_RESEND; need_abort = true; + exec_arg.flags |= ORF_RESEND; + flags = ORF_RESEND; d_tm_inc_counter(opm->opm_update_retry, 1); ABT_thread_yield(); - goto again1; + goto again; default: break; } @@ -3875,8 +3879,11 @@ ds_obj_punch_handler(crt_rpc_t *rpc) if (opi->opi_flags & ORF_RESEND) { daos_epoch_t e; -again1: - e = 0; +again: + if (flags & ORF_RESEND) + e = opi->opi_epoch; + else + e = 0; rc = dtx_handle_resend(ioc.ioc_vos_coh, &opi->opi_dti, &e, &version); switch (rc) { @@ -3887,8 +3894,13 @@ ds_obj_punch_handler(crt_rpc_t *rpc) flags |= ORF_RESEND; /* TODO: Also recovery the epoch uncertainty. */ break; + case -DER_MISMATCH: + rc = vos_dtx_abort(ioc.ioc_vos_coh, &opi->opi_dti, e); + if (rc < 0 && rc != -DER_NONEXIST) + D_GOTO(out, rc); + /* Fall through */ case -DER_NONEXIST: - rc = 0; + flags = 0; break; default: D_GOTO(out, rc); @@ -3898,7 +3910,6 @@ ds_obj_punch_handler(crt_rpc_t *rpc) goto cleanup; } -again2: /* For leader case, we need to find out the potential conflict * (or share the same non-committed object/dkey) DTX(s) in the * CoS (committable) cache, piggyback them via the dispdatched @@ -3943,7 +3954,7 @@ ds_obj_punch_handler(crt_rpc_t *rpc) exec_arg.rpc = rpc; exec_arg.ioc = &ioc; - exec_arg.flags = flags; + exec_arg.flags |= flags; /* Execute the operation on all shards */ if (opi->opi_api_flags & DAOS_COND_PUNCH) @@ -3959,19 +3970,17 @@ ds_obj_punch_handler(crt_rpc_t *rpc) rc = dtx_leader_end(dlh, ioc.ioc_coh, rc); switch (rc) { case -DER_TX_RESTART: - /* - * Only standalone punches use this RPC. Retry with newer - * epoch. - */ + /* Only standalone punches use this RPC. Retry with newer epoch. */ opi->opi_epoch = d_hlc_get(); - opi->opi_flags &= ~ORF_RESEND; - flags = 0; - goto again2; + exec_arg.flags |= ORF_RESEND; + flags = ORF_RESEND; + goto again; case -DER_AGAIN: - opi->opi_flags |= ORF_RESEND; need_abort = true; + exec_arg.flags |= ORF_RESEND; + flags = ORF_RESEND; ABT_thread_yield(); - goto again1; + goto again; default: break; } @@ -5663,8 +5672,11 @@ ds_obj_coll_punch_handler(crt_rpc_t *rpc) if (ocpi->ocpi_flags & ORF_RESEND) { -again1: - tmp = 0; +again: + if (!(ocpi->ocpi_flags & ORF_LEADER) || (flags & ORF_RESEND)) + tmp = ocpi->ocpi_epoch; + else + tmp = 0; rc = dtx_handle_resend(ioc.ioc_vos_coh, &ocpi->ocpi_xid, &tmp, &version); switch (rc) { case -DER_ALREADY: @@ -5674,7 +5686,13 @@ ds_obj_coll_punch_handler(crt_rpc_t *rpc) flags |= ORF_RESEND; /* TODO: Also recovery the epoch uncertainty. */ break; + case -DER_MISMATCH: + rc = vos_dtx_abort(ioc.ioc_vos_coh, &ocpi->ocpi_xid, tmp); + if (rc < 0 && rc != -DER_NONEXIST) + D_GOTO(out, rc); + /* Fall through */ case -DER_NONEXIST: + flags = 0; break; default: D_GOTO(out, rc); @@ -5683,7 +5701,6 @@ ds_obj_coll_punch_handler(crt_rpc_t *rpc) dce->dce_ver = version; } -again2: epoch.oe_value = ocpi->ocpi_epoch; epoch.oe_first = epoch.oe_value; epoch.oe_flags = orf_to_dtx_epoch_flags(ocpi->ocpi_flags); @@ -5695,7 +5712,7 @@ ds_obj_coll_punch_handler(crt_rpc_t *rpc) exec_arg.rpc = rpc; exec_arg.ioc = &ioc; - exec_arg.flags = flags; + exec_arg.flags |= flags; exec_arg.coll_shards = dcts[0].dct_shards; exec_arg.coll_tgts = dcts; obj_coll_disp_init(dct_nr, ocpi->ocpi_max_tgt_sz, @@ -5728,14 +5745,15 @@ ds_obj_coll_punch_handler(crt_rpc_t *rpc) switch (rc) { case -DER_TX_RESTART: ocpi->ocpi_epoch = d_hlc_get(); - ocpi->ocpi_flags &= ~ORF_RESEND; - flags = 0; - goto again2; + exec_arg.flags |= ORF_RESEND; + flags = ORF_RESEND; + goto again; case -DER_AGAIN: - ocpi->ocpi_flags |= ORF_RESEND; need_abort = true; + exec_arg.flags |= ORF_RESEND; + flags = ORF_RESEND; ABT_thread_yield(); - goto again1; + goto again; default: break; } @@ -5755,12 +5773,14 @@ ds_obj_coll_punch_handler(crt_rpc_t *rpc) max_ver = version; DL_CDEBUG(rc != 0 && rc != -DER_INPROGRESS && rc != -DER_TX_RESTART, DLOG_ERR, DB_IO, rc, - "(%s) handled collective punch RPC %p for obj "DF_UOID" on XS %u/%u epc " - DF_X64" pmv %u/%u, with dti "DF_DTI", bulk_tgt_sz %u, bulk_tgt_nr %u, " - "tgt_nr %u, forward width %u, forward depth %u, flags %x", + "(%s) handled collective punch RPC %p for obj "DF_UOID" on XS %u/%u in "DF_UUID"/" + DF_UUID"/"DF_UUID" with epc "DF_X64", pmv %u/%u, dti "DF_DTI", bulk_tgt_sz %u, " + "bulk_tgt_nr %u, tgt_nr %u, forward width %u, forward depth %u, flags %x", (ocpi->ocpi_flags & ORF_LEADER) ? "leader" : (ocpi->ocpi_tgts.ca_count == 1 ? "non-leader" : "relay-engine"), rpc, - DP_UOID(ocpi->ocpi_oid), dmi->dmi_xs_id, dmi->dmi_tgt_id, ocpi->ocpi_epoch, + DP_UOID(ocpi->ocpi_oid), dmi->dmi_xs_id, dmi->dmi_tgt_id, + DP_UUID(ocpi->ocpi_po_uuid), DP_UUID(ocpi->ocpi_co_hdl), + DP_UUID(ocpi->ocpi_co_uuid), ocpi->ocpi_epoch, ocpi->ocpi_map_ver, max_ver, DP_DTI(&ocpi->ocpi_xid), ocpi->ocpi_bulk_tgt_sz, ocpi->ocpi_bulk_tgt_nr, (unsigned int)ocpi->ocpi_tgts.ca_count, ocpi->ocpi_disp_width, ocpi->ocpi_disp_depth, ocpi->ocpi_flags); diff --git a/src/object/srv_obj_remote.c b/src/object/srv_obj_remote.c index ce06723621b..f64d851e5b4 100644 --- a/src/object/srv_obj_remote.c +++ b/src/object/srv_obj_remote.c @@ -136,7 +136,7 @@ ds_obj_remote_update(struct dtx_leader_handle *dlh, void *data, int idx, *orw = *orw_parent; orw->orw_oid.id_shard = shard_tgt->st_shard_id; - orw->orw_flags |= ORF_BULK_BIND | obj_exec_arg->flags; + orw->orw_flags |= (ORF_BULK_BIND | obj_exec_arg->flags) & ~ORF_LEADER; if (shard_tgt->st_flags & DTF_DELAY_FORWARD && dlh->dlh_drop_cond) orw->orw_api_flags &= ~DAOS_COND_MASK; orw->orw_dti_cos.ca_count = dth->dth_dti_cos_count; @@ -247,7 +247,7 @@ ds_obj_remote_punch(struct dtx_leader_handle *dlh, void *data, int idx, *opi = *opi_parent; opi->opi_oid.id_shard = shard_tgt->st_shard_id; - opi->opi_flags |= obj_exec_arg->flags; + opi->opi_flags |= obj_exec_arg->flags & ~ORF_LEADER; if (shard_tgt->st_flags & DTF_DELAY_FORWARD && dlh->dlh_drop_cond) opi->opi_api_flags &= ~DAOS_COND_PUNCH; opi->opi_dti_cos.ca_count = dth->dth_dti_cos_count; @@ -495,7 +495,7 @@ ds_obj_coll_punch_remote(struct dtx_leader_handle *dlh, void *data, int idx, crt_endpoint_t tgt_ep = { 0 }; crt_rpc_t *parent_req = exec_arg->rpc; crt_rpc_t *req; - struct obj_coll_punch_in *ocpi_parent; + struct obj_coll_punch_in *ocpi_parent = crt_req_get(parent_req); struct obj_coll_punch_in *ocpi; int tag; int rc = 0; @@ -509,7 +509,7 @@ ds_obj_coll_punch_remote(struct dtx_leader_handle *dlh, void *data, int idx, if (remote_arg == NULL) D_GOTO(out, rc = -DER_NOMEM); - obj_coll_disp_dest(cursor, exec_arg->coll_tgts, &tgt_ep); + obj_coll_disp_dest(cursor, exec_arg->coll_tgts, &tgt_ep, ocpi_parent->ocpi_oid.id_pub); tag = tgt_ep.ep_tag; crt_req_addref(parent_req); @@ -524,9 +524,7 @@ ds_obj_coll_punch_remote(struct dtx_leader_handle *dlh, void *data, int idx, D_GOTO(out, rc); } - ocpi_parent = crt_req_get(parent_req); ocpi = crt_req_get(req); - ocpi->ocpi_odm = ocpi_parent->ocpi_odm; uuid_copy(ocpi->ocpi_po_uuid, ocpi_parent->ocpi_po_uuid); uuid_copy(ocpi->ocpi_co_hdl, ocpi_parent->ocpi_co_hdl); @@ -634,7 +632,7 @@ ds_obj_coll_query_remote(struct dtx_leader_handle *dlh, void *data, int idx, if (remote_arg == NULL) D_GOTO(out, rc = -DER_NOMEM); - obj_coll_disp_dest(cursor, exec_arg->coll_tgts, &tgt_ep); + obj_coll_disp_dest(cursor, exec_arg->coll_tgts, &tgt_ep, ocqi_parent->ocqi_oid.id_pub); tag = tgt_ep.ep_tag; remote_arg->dlh = dlh; diff --git a/src/vos/vos_dtx.c b/src/vos/vos_dtx.c index 0a325088a77..3000e1fa573 100644 --- a/src/vos/vos_dtx.c +++ b/src/vos/vos_dtx.c @@ -311,16 +311,38 @@ dtx_act_ent_update(struct btr_instance *tins, struct btr_record *rec, if (unlikely(!dae_old->dae_aborted)) { /* - * XXX: There are two possible reasons for that: - * - * 1. Client resent the RPC but without set 'RESEND' flag. - * 2. Client reused the DTX ID for different modifications. - * - * Currently, the 1st case is more suspected. + * If the new entry and the old entry are for the same transaction, then the RPC + * for the new one will take 'RESEND' flag, that will cause the old one has been + * aborted before arriving at here. So it is quite possible that the new one and + * the old one are for different transactions. */ - D_ERROR("The TX ID "DF_DTI" may be reused for epoch "DF_X64" vs "DF_X64"\n", - DP_DTI(&DAE_XID(dae_old)), DAE_EPOCH(dae_old), DAE_EPOCH(dae_new)); - return -DER_TX_ID_REUSED; + if (DAE_EPOCH(dae_old) < DAE_EPOCH(dae_new)) { + D_ERROR("The TX ID "DF_DTI" may be reused for epoch "DF_X64" vs "DF_X64"\n", + DP_DTI(&DAE_XID(dae_old)), DAE_EPOCH(dae_old), DAE_EPOCH(dae_new)); + return -DER_TX_ID_REUSED; + } + + /* + * If the old entry has higher epoch, it is quite possible that the resent RPC + * was handled before the original RPC (corresponding to 'dae_new'). Returning + * -DER_INPROGRESS to make the RPC sponsor to retry the RPC with 'RESEND' flag, + * then related RPC handler logic will handle such case. + */ + if (DAE_EPOCH(dae_old) > DAE_EPOCH(dae_new)) { + D_ERROR("Resent RPC may be handled before original one for DTX "DF_DTI + " with epoch "DF_X64" vs "DF_X64"\n", + DP_DTI(&DAE_XID(dae_old)), DAE_EPOCH(dae_old), DAE_EPOCH(dae_new)); + return -DER_INPROGRESS; + } + + /* + * The two entries uses the same epoch, then it may be caused by repeated RPCs + * from different sources, such as multiple relay engines forward the same RPC + * to current target. We need to notify related caller for such buggy case. + */ + D_ERROR("Receive repeated DTX "DF_DTI" with epoch "DF_X64"\n", + DP_DTI(&DAE_XID(dae_old)), DAE_EPOCH(dae_old)); + return -DER_MISC; } rec->rec_off = umem_ptr2off(&tins->ti_umm, dae_new); @@ -1171,16 +1193,20 @@ vos_dtx_check_availability(daos_handle_t coh, uint32_t entry, } if (intent == DAOS_INTENT_PURGE) { - uint32_t age = d_hlc_age2sec(DAE_XID(dae).dti_hlc); + uint64_t now = daos_gettime_coarse(); /* * The DTX entry still references related data record, * then we cannot (vos) aggregate related data record. + * Report warning per each 10 seconds to avoid log flood. */ - if (age >= DAOS_AGG_THRESHOLD) - D_WARN("DTX "DF_DTI" (state:%u, age:%u) still references the data, " - "cannot be (VOS) aggregated\n", - DP_DTI(&DAE_XID(dae)), vos_dtx_status(dae), age); + if (now - cont->vc_agg_busy_ts > 10) { + D_WARN("DTX "DF_DTI" (state:%u, flags:%x, age:%u) still references " + "the modification, cannot be (VOS) aggregated\n", + DP_DTI(&DAE_XID(dae)), vos_dtx_status(dae), DAE_FLAGS(dae), + (unsigned int)d_hlc_age2sec(DAE_XID(dae).dti_hlc)); + cont->vc_agg_busy_ts = now; + } return ALB_AVAILABLE_DIRTY; } @@ -1908,8 +1934,13 @@ vos_dtx_check(daos_handle_t coh, struct dtx_id *dti, daos_epoch_t *epoch, daos_epoch_t e = *epoch; *epoch = DAE_EPOCH(dae); - if (e != 0 && e != DAE_EPOCH(dae)) - return -DER_MISMATCH; + if (e != 0) { + if (e > DAE_EPOCH(dae)) + return -DER_MISMATCH; + + if (e < DAE_EPOCH(dae)) + return -DER_TX_RESTART; + } } return vos_dae_is_prepare(dae) ? DTX_ST_PREPARED : DTX_ST_INITED; diff --git a/src/vos/vos_internal.h b/src/vos/vos_internal.h index 9441ba45265..7a8428a3d79 100644 --- a/src/vos/vos_internal.h +++ b/src/vos/vos_internal.h @@ -353,6 +353,8 @@ struct vos_container { daos_epoch_range_t vc_epr_aggregation; /* Current ongoing discard EPR */ daos_epoch_range_t vc_epr_discard; + /* Last timestamp when VOS aggregation reports -DER_TX_BUSY */ + uint64_t vc_agg_busy_ts; /* Last timestamp when VOS aggregation reporting ENOSPACE */ uint64_t vc_agg_nospc_ts; /* Last timestamp when IO reporting ENOSPACE */