diff --git a/src/dtx/dtx_coll.c b/src/dtx/dtx_coll.c index 7e7c8199155..ba45aaa2616 100644 --- a/src/dtx/dtx_coll.c +++ b/src/dtx/dtx_coll.c @@ -81,9 +81,11 @@ dtx_coll_prep_ult(void *arg) } if (dcpa->dcpa_result != 0) { - if (dcpa->dcpa_result != -DER_INPROGRESS && dcpa->dcpa_result != -DER_NONEXIST) - D_ERROR("Failed to load mbs for "DF_DTI", opc %u: "DF_RC"\n", - DP_DTI(&dci->dci_xid), opc, DP_RC(rc)); + if (dcpa->dcpa_result < 0 && + dcpa->dcpa_result != -DER_INPROGRESS && dcpa->dcpa_result != -DER_NONEXIST) + D_ERROR("Failed to load mbs for "DF_DTI" in "DF_UUID"/"DF_UUID", opc %u: " + DF_RC"\n", DP_DTI(&dci->dci_xid), DP_UUID(dci->dci_po_uuid), + DP_UUID(dci->dci_co_uuid), opc, DP_RC(dcpa->dcpa_result)); goto out; } diff --git a/src/dtx/dtx_common.c b/src/dtx/dtx_common.c index ecb156729ed..1ee74ae11a4 100644 --- a/src/dtx/dtx_common.c +++ b/src/dtx/dtx_common.c @@ -1271,7 +1271,6 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_hdl *coh, int resul int status = -1; int rc = 0; bool aborted = false; - bool unpin = false; D_ASSERT(cont != NULL); @@ -1339,7 +1338,7 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_hdl *coh, int resul * it persistently. Otherwise, the subsequent DTX resync may not find it as * to regard it as failed transaction and abort it. */ - if (result == 0 && !dth->dth_active && !dth->dth_prepared && !dth->dth_solo && + if (!dth->dth_active && !dth->dth_prepared && (dth->dth_dist || dth->dth_modification_cnt > 0)) { result = vos_dtx_attach(dth, true, dth->dth_ent != NULL ? true : false); if (unlikely(result < 0)) { @@ -1349,7 +1348,7 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_hdl *coh, int resul } } - if (dth->dth_prepared || dtx_batched_ult_max == 0) { + if ((dth->dth_prepared && !dlh->dlh_coll) || dtx_batched_ult_max == 0) { dth->dth_sync = 1; goto sync; } @@ -1363,14 +1362,12 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_hdl *coh, int resul if (DAOS_FAIL_CHECK(DAOS_DTX_MISS_COMMIT)) dth->dth_sync = 1; - /* For synchronous DTX, do not add it into CoS cache, otherwise, - * we may have no way to remove it from the cache. - */ if (dth->dth_sync) goto sync; D_ASSERT(dth->dth_mbs != NULL); +cache: if (dlh->dlh_coll) { rc = dtx_cos_add(cont, dlh->dlh_coll_entry, &dth->dth_leader_oid, dth->dth_dkey_hash, dth->dth_epoch, DCF_EXP_CMT | DCF_COLL); @@ -1378,38 +1375,47 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_hdl *coh, int resul size = sizeof(*dte) + sizeof(*mbs) + dth->dth_mbs->dm_data_size; D_ALLOC(dte, size); if (dte == NULL) { - dth->dth_sync = 1; - goto sync; - } - - mbs = (struct dtx_memberships *)(dte + 1); - memcpy(mbs, dth->dth_mbs, size - sizeof(*dte)); - - dte->dte_xid = dth->dth_xid; - dte->dte_ver = dth->dth_ver; - dte->dte_refs = 1; - dte->dte_mbs = mbs; + rc = -DER_NOMEM; + } else { + mbs = (struct dtx_memberships *)(dte + 1); + memcpy(mbs, dth->dth_mbs, size - sizeof(*dte)); + + dte->dte_xid = dth->dth_xid; + dte->dte_ver = dth->dth_ver; + dte->dte_refs = 1; + dte->dte_mbs = mbs; + + if (!(mbs->dm_flags & DMF_SRDG_REP)) + flags = DCF_EXP_CMT; + else if (dth->dth_modify_shared) + flags = DCF_SHARED; + else + flags = 0; - if (!(mbs->dm_flags & DMF_SRDG_REP)) - flags = DCF_EXP_CMT; - else if (dth->dth_modify_shared) - flags = DCF_SHARED; - else - flags = 0; + rc = dtx_cos_add(cont, dte, &dth->dth_leader_oid, dth->dth_dkey_hash, + dth->dth_epoch, flags); + dtx_entry_put(dte); + } + } - rc = dtx_cos_add(cont, dte, &dth->dth_leader_oid, dth->dth_dkey_hash, - dth->dth_epoch, flags); - dtx_entry_put(dte); + /* + * NOTE: If we failed to add the committable DTX into CoS cache, then we also have no way + * to commit (or abort) the DTX because of out of memory. Such DTX will be finally + * committed via next DTX resync (after recovered from OOM). + * + * Here, we only warning to notify the trouble, but not failed the transaction. + */ + if (rc != 0) { + D_WARN(DF_UUID": Fail to cache %s DTX "DF_DTI": "DF_RC"\n", + DP_UUID(cont->sc_uuid), dlh->dlh_coll ? "collective" : "regular", + DP_DTI(&dth->dth_xid), DP_RC(rc)); + D_GOTO(out, result = 0); } - if (rc == 0) { - if (!DAOS_FAIL_CHECK(DAOS_DTX_NO_COMMITTABLE)) { - vos_dtx_mark_committable(dth); - if (cont->sc_dtx_committable_count > DTX_THRESHOLD_COUNT || dlh->dlh_coll) - sched_req_wakeup(dss_get_module_info()->dmi_dtx_cmt_req); - } - } else { - dth->dth_sync = 1; + if (!DAOS_FAIL_CHECK(DAOS_DTX_NO_COMMITTABLE)) { + vos_dtx_mark_committable(dth); + if (cont->sc_dtx_committable_count > DTX_THRESHOLD_COUNT || dlh->dlh_coll) + sched_req_wakeup(dss_get_module_info()->dmi_dtx_cmt_req); } sync: @@ -1428,10 +1434,15 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_hdl *coh, int resul rc = dtx_commit(cont, &dte, NULL, 1, false); } - if (rc != 0) + if (rc != 0) { D_WARN(DF_UUID": Fail to sync %s commit DTX "DF_DTI": "DF_RC"\n", DP_UUID(cont->sc_uuid), dlh->dlh_coll ? "collective" : "regular", DP_DTI(&dth->dth_xid), DP_RC(rc)); + if (likely(dtx_batched_ult_max != 0)) { + dth->dth_sync = 0; + goto cache; + } + } /* * NOTE: The semantics of 'sync' commit does not guarantee that all @@ -1451,7 +1462,7 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_hdl *coh, int resul * to locally retry for avoiding related forwarded RPC timeout, instead, * The leader will trigger retry globally without abort 'prepared' ones. */ - if (unpin || (result < 0 && result != -DER_AGAIN && !dth->dth_solo)) { + if (result < 0 && result != -DER_AGAIN && !dth->dth_solo) { /* 1. Drop partial modification for distributed transaction. * 2. Remove the pinned DTX entry. */ diff --git a/src/dtx/dtx_rpc.c b/src/dtx/dtx_rpc.c index 2ccbfec2734..6d34e871269 100644 --- a/src/dtx/dtx_rpc.c +++ b/src/dtx/dtx_rpc.c @@ -1657,8 +1657,9 @@ dtx_coll_commit(struct ds_cont_child *cont, struct dtx_coll_entry *dce, struct d } D_CDEBUG(rc != 0 || rc1 != 0 || rc2 != 0, DLOG_ERR, DB_TRACE, - "Collectively commit DTX "DF_DTI": %d/%d/%d\n", - DP_DTI(&dce->dce_xid), rc, rc1, rc2); + "Collectively commit DTX "DF_DTI" in "DF_UUID"/"DF_UUID": %d/%d/%d\n", + DP_DTI(&dce->dce_xid), DP_UUID(cont->sc_pool_uuid), DP_UUID(cont->sc_uuid), + rc, rc1, rc2); return rc != 0 ? rc : rc1 != 0 ? rc1 : rc2; } @@ -1717,8 +1718,9 @@ dtx_coll_abort(struct ds_cont_child *cont, struct dtx_coll_entry *dce, daos_epoc rc2 = 0; D_CDEBUG(rc != 0 || rc1 != 0 || rc2 != 0, DLOG_ERR, DB_TRACE, - "Collectively abort DTX "DF_DTI": %d/%d/%d\n", - DP_DTI(&dce->dce_xid), rc, rc1, rc2); + "Collectively abort DTX "DF_DTI" with epoch "DF_X64" in " + DF_UUID"/"DF_UUID": %d/%d/%d\n", DP_DTI(&dce->dce_xid), epoch, + DP_UUID(cont->sc_pool_uuid), DP_UUID(cont->sc_uuid), rc, rc1, rc2); return rc != 0 ? rc : rc1 != 0 ? rc1 : rc2; } @@ -1766,8 +1768,9 @@ dtx_coll_check(struct ds_cont_child *cont, struct dtx_coll_entry *dce, daos_epoc } D_CDEBUG((rc < 0 && rc != -DER_NONEXIST) || (rc1 < 0 && rc1 != -DER_NONEXIST), DLOG_ERR, - DB_TRACE, "Collectively check DTX "DF_DTI": %d/%d/\n", - DP_DTI(&dce->dce_xid), rc, rc1); + DB_TRACE, "Collectively check DTX "DF_DTI" in "DF_UUID"/"DF_UUID": %d/%d/\n", + DP_DTI(&dce->dce_xid), DP_UUID(cont->sc_pool_uuid), DP_UUID(cont->sc_uuid), + rc, rc1); return dce->dce_ranks != NULL ? rc : rc1; } diff --git a/src/dtx/dtx_srv.c b/src/dtx/dtx_srv.c index 0fd4e7c513f..084f804ec81 100644 --- a/src/dtx/dtx_srv.c +++ b/src/dtx/dtx_srv.c @@ -474,8 +474,9 @@ dtx_coll_handler(crt_rpc_t *rpc) out: D_CDEBUG(rc < 0, DLOG_ERR, DB_TRACE, - "Handled collective DTX PRC %u on rank %u for "DF_DTI": "DF_RC"\n", - opc, myrank, DP_DTI(&dci->dci_xid), DP_RC(rc)); + "Handled collective DTX PRC %u on rank %u for "DF_DTI" in " + DF_UUID"/"DF_UUID": "DF_RC"\n", opc, myrank, DP_DTI(&dci->dci_xid), + DP_UUID(dci->dci_po_uuid), DP_UUID(dci->dci_co_uuid), DP_RC(rc)); dco->dco_status = rc; rc = crt_reply_send(rpc); diff --git a/src/include/daos_srv/container.h b/src/include/daos_srv/container.h index f99f4df14e3..9fc615c2a8b 100644 --- a/src/include/daos_srv/container.h +++ b/src/include/daos_srv/container.h @@ -108,6 +108,9 @@ struct ds_cont_child { uint32_t sc_dtx_committable_count; uint32_t sc_dtx_committable_coll_count; + /* Last timestamp when EC aggregation reports -DER_INPROGRESS. */ + uint64_t sc_ec_agg_busy_ts; + /* The global minimum EC aggregation epoch, which will be upper * limit for VOS aggregation, i.e. EC object VOS aggregation can * not cross this limit. For simplification purpose, all objects diff --git a/src/object/cli_coll.c b/src/object/cli_coll.c index 3285bec58b3..d517e3269d6 100644 --- a/src/object/cli_coll.c +++ b/src/object/cli_coll.c @@ -873,7 +873,7 @@ queue_coll_query_task(tse_task_t *api_task, struct obj_auxi_args *obj_auxi, stru 0, 0, ocdc); for (i = 0; i < ocdc->grp_nr; i++) { - obj_coll_disp_dest(ocdc, coa->coa_dcts, &tgt_ep); + obj_coll_disp_dest(ocdc, coa->coa_dcts, &tgt_ep, obj->cob_md.omd_id); tmp = coa->coa_dcts[ocdc->cur_pos].dct_shards[tgt_ep.ep_tag].dcs_idx; rc = queue_shard_query_key_task(api_task, obj_auxi, epoch, tmp, map_ver, diff --git a/src/object/cli_obj.c b/src/object/cli_obj.c index 318ba0cb51e..46161bf7a2d 100644 --- a/src/object/cli_obj.c +++ b/src/object/cli_obj.c @@ -1718,15 +1718,20 @@ dc_obj_retry_delay(tse_task_t *task, int err, uint16_t *retry_cnt, uint16_t *inp uint32_t timeout_sec) { uint32_t delay = 0; + uint32_t limit = 4; /* - * Randomly delay 5 - 68 us if it is not the first retry for + * Randomly delay 5 ~ 1028 us if it is not the first retry for * -DER_INPROGRESS || -DER_UPDATE_AGAIN cases. */ ++(*retry_cnt); if (err == -DER_INPROGRESS || err == -DER_UPDATE_AGAIN) { if (++(*inprogress_cnt) > 1) { - delay = (d_rand() & ((1 << 6) - 1)) + 5; + limit += *inprogress_cnt; + if (limit > 10) + limit = 10; + + delay = (d_rand() & ((1 << limit) - 1)) + 5; /* Rebuild is being established on the server side, wait a bit longer */ if (err == -DER_UPDATE_AGAIN) delay <<= 10; @@ -4856,11 +4861,14 @@ obj_comp_cb(tse_task_t *task, void *data) D_ASSERT(daos_handle_is_inval(obj_auxi->th)); D_ASSERT(obj_is_modification_opc(obj_auxi->opc)); - if (task->dt_result == -DER_TX_ID_REUSED && obj_auxi->retry_cnt != 0) - /* XXX: it is must because miss to set "RESEND" flag, that is bug. */ - D_ASSERTF(0, - "Miss 'RESEND' flag (%x) when resend the RPC for task %p: %u\n", - obj_auxi->flags, task, obj_auxi->retry_cnt); + if (task->dt_result == -DER_TX_ID_REUSED && obj_auxi->retry_cnt != 0) { + D_ERROR("TX ID maybe reused for unknown reason, " + "task %p, opc %u, flags %x, retry_cnt %u\n", + task, obj_auxi->opc, obj_auxi->flags, obj_auxi->retry_cnt); + task->dt_result = -DER_IO; + obj_auxi->io_retry = 0; + goto args_fini; + } if (obj_auxi->opc == DAOS_OBJ_RPC_UPDATE) { daos_obj_rw_t *api_args = dc_task_get_args(obj_auxi->obj_task); @@ -4886,6 +4894,7 @@ obj_comp_cb(tse_task_t *task, void *data) } } +args_fini: if (obj_auxi->opc == DAOS_OBJ_RPC_COLL_PUNCH) obj_coll_oper_args_fini(&obj_auxi->p_args.pa_coa); diff --git a/src/object/cli_shard.c b/src/object/cli_shard.c index 0c9dfc1418e..9f084140f80 100644 --- a/src/object/cli_shard.c +++ b/src/object/cli_shard.c @@ -1451,11 +1451,14 @@ obj_shard_coll_punch_cb(tse_task_t *task, void *data) shard_args->pa_auxi.obj_auxi->max_delay = timeout; } - DL_CDEBUG(task->dt_result < 0, DLOG_ERR, DB_IO, task->dt_result, - "DAOS_OBJ_RPC_COLL_PUNCH RPC %p for "DF_UOID" with DTX " - DF_DTI" for task %p, map_ver %u/%u, flags %lx/%x, %s layout", - rpc, DP_UOID(ocpi->ocpi_oid), DP_DTI(&ocpi->ocpi_xid), task, ocpi->ocpi_map_ver, - *cb_args->cpca_ver, (unsigned long)ocpi->ocpi_api_flags, ocpi->ocpi_flags, + DL_CDEBUG(task->dt_result < 0 && task->dt_result != -DER_INPROGRESS, + DLOG_ERR, DB_IO, task->dt_result, + "DAOS_OBJ_RPC_COLL_PUNCH RPC %p for "DF_UOID" in "DF_UUID"/"DF_UUID"/" + DF_UUID" with DTX "DF_DTI" for task %p, map_ver %u/%u, flags %lx/%x, %s layout", + rpc, DP_UOID(ocpi->ocpi_oid), DP_UUID(ocpi->ocpi_po_uuid), + DP_UUID(ocpi->ocpi_co_hdl), DP_UUID(ocpi->ocpi_co_uuid), DP_DTI(&ocpi->ocpi_xid), + task, ocpi->ocpi_map_ver, *cb_args->cpca_ver, + (unsigned long)ocpi->ocpi_api_flags, ocpi->ocpi_flags, cb_args->cpca_shard_args->pa_coa.coa_raw_sparse ? "sparse" : "continuous"); crt_req_decref(rpc); diff --git a/src/object/obj_internal.h b/src/object/obj_internal.h index 06cfdb5b195..c0df21dd009 100644 --- a/src/object/obj_internal.h +++ b/src/object/obj_internal.h @@ -1100,7 +1100,7 @@ int daos_obj_query_merge(struct obj_query_merge_args *oqma); void obj_coll_disp_init(uint32_t tgt_nr, uint32_t max_tgt_size, uint32_t inline_size, uint32_t start, uint32_t max_width, struct obj_coll_disp_cursor *ocdc); void obj_coll_disp_dest(struct obj_coll_disp_cursor *ocdc, struct daos_coll_target *tgts, - crt_endpoint_t *tgt_ep); + crt_endpoint_t *tgt_ep, daos_obj_id_t oid); void obj_coll_disp_move(struct obj_coll_disp_cursor *ocdc); int obj_utils_init(void); void obj_utils_fini(void); diff --git a/src/object/obj_utils.c b/src/object/obj_utils.c index 82d91c966ac..c01947a05a1 100644 --- a/src/object/obj_utils.c +++ b/src/object/obj_utils.c @@ -616,23 +616,22 @@ obj_coll_disp_init(uint32_t tgt_nr, uint32_t max_tgt_size, uint32_t inline_size, void obj_coll_disp_dest(struct obj_coll_disp_cursor *ocdc, struct daos_coll_target *tgts, - crt_endpoint_t *tgt_ep) + crt_endpoint_t *tgt_ep, daos_obj_id_t oid) { struct daos_coll_target *dct = &tgts[ocdc->cur_pos]; struct daos_coll_target tmp; - unsigned long rand = 0; uint32_t size; int pos; int i; if (ocdc->cur_step > 2) { - rand = d_rand(); /* - * Randomly choose an engine as the relay one for load balance. - * If the one corresponding to "pos" is former moved one, then - * use the "cur_pos" as the relay engine. + * Choose an engine (according to the given oid) as the relay one for load balance. + * If the one corresponding to "pos" is former moved one, then use the "cur_pos" as + * the relay engine. Then even if related RPC was resent without changing pool map, + * then the relay one will be the same as the original case. */ - pos = rand % (ocdc->tgt_nr - ocdc->cur_pos) + ocdc->cur_pos; + pos = oid.lo % (ocdc->tgt_nr - ocdc->cur_pos) + ocdc->cur_pos; if (pos > ocdc->cur_pos && tgts[pos].dct_rank > dct->dct_rank) { memcpy(&tmp, &tgts[pos], sizeof(tmp)); memcpy(&tgts[pos], dct, sizeof(tmp)); @@ -642,8 +641,8 @@ obj_coll_disp_dest(struct obj_coll_disp_cursor *ocdc, struct daos_coll_target *t size = dct->dct_bitmap_sz << 3; - /* Randomly choose a XS as the local leader on target engine for load balance. */ - for (i = 0, pos = (rand != 0 ? rand : d_rand()) % dct->dct_tgt_nr; i < size; i++) { + /* Choose a target as the local agent on the engine for load balance. */ + for (i = 0, pos = oid.lo % dct->dct_tgt_nr; i < size; i++) { if (isset(dct->dct_bitmap, i)) { pos -= dct->dct_shards[i].dcs_nr; if (pos < 0) diff --git a/src/object/srv_ec_aggregate.c b/src/object/srv_ec_aggregate.c index 71c630fa947..50b513d1612 100644 --- a/src/object/srv_ec_aggregate.c +++ b/src/object/srv_ec_aggregate.c @@ -2667,8 +2667,13 @@ cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr, struct ec_agg_param *ec_agg_param = agg_param->ap_data; vos_iter_param_t iter_param = { 0 }; struct vos_iter_anchors anchors = { 0 }; + struct dtx_handle *dth = NULL; + struct dtx_share_peer *dsp; + struct dtx_id dti = { 0 }; + struct dtx_epoch epoch = { 0 }; + daos_unit_oid_t oid = { 0 }; + int blocks = 0; int rc = 0; - int blocks = 0; /* * Avoid calling into vos_aggregate() when aborting aggregation @@ -2715,8 +2720,32 @@ cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr, agg_reset_entry(&ec_agg_param->ap_agg_entry, NULL, NULL); retry: + epoch.oe_value = epr->epr_hi; + rc = dtx_begin(cont->sc_hdl, &dti, &epoch, 0, cont->sc_pool->spc_map_version, &oid, + NULL, 0, 0, NULL, &dth); + if (rc != 0) + goto update_hae; + rc = vos_iterate(&iter_param, VOS_ITER_OBJ, true, &anchors, agg_iterate_pre_cb, - agg_iterate_post_cb, ec_agg_param, NULL); + agg_iterate_post_cb, ec_agg_param, dth); + if (rc == -DER_INPROGRESS && !d_list_empty(&dth->dth_share_tbd_list)) { + uint64_t now = daos_gettime_coarse(); + + /* Report warning per each 10 seconds to avoid log flood. */ + if (now - cont->sc_ec_agg_busy_ts > 10) { + while ((dsp = d_list_pop_entry(&dth->dth_share_tbd_list, + struct dtx_share_peer, dsp_link)) != NULL) { + D_WARN(DF_CONT ": EC aggregate hit non-committed DTX " DF_DTI "\n", + DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid), + DP_DTI(&dsp->dsp_xid)); + dtx_dsp_free(dsp); + } + + cont->sc_ec_agg_busy_ts = now; + } + } + + dtx_end(dth, cont, rc); /* Post_cb may not being executed in some cases */ agg_clear_extents(&ec_agg_param->ap_agg_entry); diff --git a/src/object/srv_obj.c b/src/object/srv_obj.c index 041ea903c4f..4762e04b898 100644 --- a/src/object/srv_obj.c +++ b/src/object/srv_obj.c @@ -2953,8 +2953,11 @@ ds_obj_rw_handler(crt_rpc_t *rpc) d_tm_inc_counter(opm->opm_update_resent, 1); -again1: - e = 0; +again: + if (flags & ORF_RESEND) + e = orw->orw_epoch; + else + e = 0; rc = dtx_handle_resend(ioc.ioc_vos_coh, &orw->orw_dti, &e, &version); switch (rc) { @@ -2965,8 +2968,13 @@ ds_obj_rw_handler(crt_rpc_t *rpc) orw->orw_epoch = e; /* TODO: Also recover the epoch uncertainty. */ break; + case -DER_MISMATCH: + rc = vos_dtx_abort(ioc.ioc_vos_coh, &orw->orw_dti, e); + if (rc < 0 && rc != -DER_NONEXIST) + D_GOTO(out, rc); + /* Fall through */ case -DER_NONEXIST: - rc = 0; + flags = 0; break; default: D_GOTO(out, rc); @@ -2976,7 +2984,6 @@ ds_obj_rw_handler(crt_rpc_t *rpc) D_GOTO(out, rc); } -again2: /* For leader case, we need to find out the potential conflict * (or share the same non-committed object/dkey) DTX(s) in the * CoS (committable) cache, piggyback them via the dispdatched @@ -3021,7 +3028,7 @@ ds_obj_rw_handler(crt_rpc_t *rpc) exec_arg.rpc = rpc; exec_arg.ioc = &ioc; - exec_arg.flags = flags; + exec_arg.flags |= flags; exec_arg.start = orw->orw_start_shard; /* Execute the operation on all targets */ @@ -3036,28 +3043,25 @@ ds_obj_rw_handler(crt_rpc_t *rpc) case -DER_TX_RESTART: /* * If this is a standalone operation, we can restart the - * internal transaction right here. Otherwise, we have to defer - * the restart to the RPC client. + * internal transaction right here. Otherwise we have to + * defer the restart to the RPC sponsor. */ - if (opc == DAOS_OBJ_RPC_UPDATE) { - /* - * Only standalone updates use this RPC. Retry with - * newer epoch. - */ - orw->orw_epoch = d_hlc_get(); - orw->orw_flags &= ~ORF_RESEND; - flags = 0; - d_tm_inc_counter(opm->opm_update_restart, 1); - goto again2; - } + if (opc != DAOS_OBJ_RPC_UPDATE) + break; - break; + /* Only standalone updates use this RPC. Retry with newer epoch. */ + orw->orw_epoch = d_hlc_get(); + exec_arg.flags |= ORF_RESEND; + flags = ORF_RESEND; + d_tm_inc_counter(opm->opm_update_restart, 1); + goto again; case -DER_AGAIN: - orw->orw_flags |= ORF_RESEND; need_abort = true; + exec_arg.flags |= ORF_RESEND; + flags = ORF_RESEND; d_tm_inc_counter(opm->opm_update_retry, 1); ABT_thread_yield(); - goto again1; + goto again; default: break; } @@ -3875,8 +3879,11 @@ ds_obj_punch_handler(crt_rpc_t *rpc) if (opi->opi_flags & ORF_RESEND) { daos_epoch_t e; -again1: - e = 0; +again: + if (flags & ORF_RESEND) + e = opi->opi_epoch; + else + e = 0; rc = dtx_handle_resend(ioc.ioc_vos_coh, &opi->opi_dti, &e, &version); switch (rc) { @@ -3887,8 +3894,13 @@ ds_obj_punch_handler(crt_rpc_t *rpc) flags |= ORF_RESEND; /* TODO: Also recovery the epoch uncertainty. */ break; + case -DER_MISMATCH: + rc = vos_dtx_abort(ioc.ioc_vos_coh, &opi->opi_dti, e); + if (rc < 0 && rc != -DER_NONEXIST) + D_GOTO(out, rc); + /* Fall through */ case -DER_NONEXIST: - rc = 0; + flags = 0; break; default: D_GOTO(out, rc); @@ -3898,7 +3910,6 @@ ds_obj_punch_handler(crt_rpc_t *rpc) goto cleanup; } -again2: /* For leader case, we need to find out the potential conflict * (or share the same non-committed object/dkey) DTX(s) in the * CoS (committable) cache, piggyback them via the dispdatched @@ -3943,7 +3954,7 @@ ds_obj_punch_handler(crt_rpc_t *rpc) exec_arg.rpc = rpc; exec_arg.ioc = &ioc; - exec_arg.flags = flags; + exec_arg.flags |= flags; /* Execute the operation on all shards */ if (opi->opi_api_flags & DAOS_COND_PUNCH) @@ -3959,19 +3970,17 @@ ds_obj_punch_handler(crt_rpc_t *rpc) rc = dtx_leader_end(dlh, ioc.ioc_coh, rc); switch (rc) { case -DER_TX_RESTART: - /* - * Only standalone punches use this RPC. Retry with newer - * epoch. - */ + /* Only standalone punches use this RPC. Retry with newer epoch. */ opi->opi_epoch = d_hlc_get(); - opi->opi_flags &= ~ORF_RESEND; - flags = 0; - goto again2; + exec_arg.flags |= ORF_RESEND; + flags = ORF_RESEND; + goto again; case -DER_AGAIN: - opi->opi_flags |= ORF_RESEND; need_abort = true; + exec_arg.flags |= ORF_RESEND; + flags = ORF_RESEND; ABT_thread_yield(); - goto again1; + goto again; default: break; } @@ -5663,8 +5672,11 @@ ds_obj_coll_punch_handler(crt_rpc_t *rpc) if (ocpi->ocpi_flags & ORF_RESEND) { -again1: - tmp = 0; +again: + if (!(ocpi->ocpi_flags & ORF_LEADER) || (flags & ORF_RESEND)) + tmp = ocpi->ocpi_epoch; + else + tmp = 0; rc = dtx_handle_resend(ioc.ioc_vos_coh, &ocpi->ocpi_xid, &tmp, &version); switch (rc) { case -DER_ALREADY: @@ -5674,7 +5686,13 @@ ds_obj_coll_punch_handler(crt_rpc_t *rpc) flags |= ORF_RESEND; /* TODO: Also recovery the epoch uncertainty. */ break; + case -DER_MISMATCH: + rc = vos_dtx_abort(ioc.ioc_vos_coh, &ocpi->ocpi_xid, tmp); + if (rc < 0 && rc != -DER_NONEXIST) + D_GOTO(out, rc); + /* Fall through */ case -DER_NONEXIST: + flags = 0; break; default: D_GOTO(out, rc); @@ -5683,7 +5701,6 @@ ds_obj_coll_punch_handler(crt_rpc_t *rpc) dce->dce_ver = version; } -again2: epoch.oe_value = ocpi->ocpi_epoch; epoch.oe_first = epoch.oe_value; epoch.oe_flags = orf_to_dtx_epoch_flags(ocpi->ocpi_flags); @@ -5695,7 +5712,7 @@ ds_obj_coll_punch_handler(crt_rpc_t *rpc) exec_arg.rpc = rpc; exec_arg.ioc = &ioc; - exec_arg.flags = flags; + exec_arg.flags |= flags; exec_arg.coll_shards = dcts[0].dct_shards; exec_arg.coll_tgts = dcts; obj_coll_disp_init(dct_nr, ocpi->ocpi_max_tgt_sz, @@ -5728,14 +5745,15 @@ ds_obj_coll_punch_handler(crt_rpc_t *rpc) switch (rc) { case -DER_TX_RESTART: ocpi->ocpi_epoch = d_hlc_get(); - ocpi->ocpi_flags &= ~ORF_RESEND; - flags = 0; - goto again2; + exec_arg.flags |= ORF_RESEND; + flags = ORF_RESEND; + goto again; case -DER_AGAIN: - ocpi->ocpi_flags |= ORF_RESEND; need_abort = true; + exec_arg.flags |= ORF_RESEND; + flags = ORF_RESEND; ABT_thread_yield(); - goto again1; + goto again; default: break; } @@ -5755,12 +5773,14 @@ ds_obj_coll_punch_handler(crt_rpc_t *rpc) max_ver = version; DL_CDEBUG(rc != 0 && rc != -DER_INPROGRESS && rc != -DER_TX_RESTART, DLOG_ERR, DB_IO, rc, - "(%s) handled collective punch RPC %p for obj "DF_UOID" on XS %u/%u epc " - DF_X64" pmv %u/%u, with dti "DF_DTI", bulk_tgt_sz %u, bulk_tgt_nr %u, " - "tgt_nr %u, forward width %u, forward depth %u, flags %x", + "(%s) handled collective punch RPC %p for obj "DF_UOID" on XS %u/%u in "DF_UUID"/" + DF_UUID"/"DF_UUID" with epc "DF_X64", pmv %u/%u, dti "DF_DTI", bulk_tgt_sz %u, " + "bulk_tgt_nr %u, tgt_nr %u, forward width %u, forward depth %u, flags %x", (ocpi->ocpi_flags & ORF_LEADER) ? "leader" : (ocpi->ocpi_tgts.ca_count == 1 ? "non-leader" : "relay-engine"), rpc, - DP_UOID(ocpi->ocpi_oid), dmi->dmi_xs_id, dmi->dmi_tgt_id, ocpi->ocpi_epoch, + DP_UOID(ocpi->ocpi_oid), dmi->dmi_xs_id, dmi->dmi_tgt_id, + DP_UUID(ocpi->ocpi_po_uuid), DP_UUID(ocpi->ocpi_co_hdl), + DP_UUID(ocpi->ocpi_co_uuid), ocpi->ocpi_epoch, ocpi->ocpi_map_ver, max_ver, DP_DTI(&ocpi->ocpi_xid), ocpi->ocpi_bulk_tgt_sz, ocpi->ocpi_bulk_tgt_nr, (unsigned int)ocpi->ocpi_tgts.ca_count, ocpi->ocpi_disp_width, ocpi->ocpi_disp_depth, ocpi->ocpi_flags); diff --git a/src/object/srv_obj_remote.c b/src/object/srv_obj_remote.c index ce06723621b..f64d851e5b4 100644 --- a/src/object/srv_obj_remote.c +++ b/src/object/srv_obj_remote.c @@ -136,7 +136,7 @@ ds_obj_remote_update(struct dtx_leader_handle *dlh, void *data, int idx, *orw = *orw_parent; orw->orw_oid.id_shard = shard_tgt->st_shard_id; - orw->orw_flags |= ORF_BULK_BIND | obj_exec_arg->flags; + orw->orw_flags |= (ORF_BULK_BIND | obj_exec_arg->flags) & ~ORF_LEADER; if (shard_tgt->st_flags & DTF_DELAY_FORWARD && dlh->dlh_drop_cond) orw->orw_api_flags &= ~DAOS_COND_MASK; orw->orw_dti_cos.ca_count = dth->dth_dti_cos_count; @@ -247,7 +247,7 @@ ds_obj_remote_punch(struct dtx_leader_handle *dlh, void *data, int idx, *opi = *opi_parent; opi->opi_oid.id_shard = shard_tgt->st_shard_id; - opi->opi_flags |= obj_exec_arg->flags; + opi->opi_flags |= obj_exec_arg->flags & ~ORF_LEADER; if (shard_tgt->st_flags & DTF_DELAY_FORWARD && dlh->dlh_drop_cond) opi->opi_api_flags &= ~DAOS_COND_PUNCH; opi->opi_dti_cos.ca_count = dth->dth_dti_cos_count; @@ -495,7 +495,7 @@ ds_obj_coll_punch_remote(struct dtx_leader_handle *dlh, void *data, int idx, crt_endpoint_t tgt_ep = { 0 }; crt_rpc_t *parent_req = exec_arg->rpc; crt_rpc_t *req; - struct obj_coll_punch_in *ocpi_parent; + struct obj_coll_punch_in *ocpi_parent = crt_req_get(parent_req); struct obj_coll_punch_in *ocpi; int tag; int rc = 0; @@ -509,7 +509,7 @@ ds_obj_coll_punch_remote(struct dtx_leader_handle *dlh, void *data, int idx, if (remote_arg == NULL) D_GOTO(out, rc = -DER_NOMEM); - obj_coll_disp_dest(cursor, exec_arg->coll_tgts, &tgt_ep); + obj_coll_disp_dest(cursor, exec_arg->coll_tgts, &tgt_ep, ocpi_parent->ocpi_oid.id_pub); tag = tgt_ep.ep_tag; crt_req_addref(parent_req); @@ -524,9 +524,7 @@ ds_obj_coll_punch_remote(struct dtx_leader_handle *dlh, void *data, int idx, D_GOTO(out, rc); } - ocpi_parent = crt_req_get(parent_req); ocpi = crt_req_get(req); - ocpi->ocpi_odm = ocpi_parent->ocpi_odm; uuid_copy(ocpi->ocpi_po_uuid, ocpi_parent->ocpi_po_uuid); uuid_copy(ocpi->ocpi_co_hdl, ocpi_parent->ocpi_co_hdl); @@ -634,7 +632,7 @@ ds_obj_coll_query_remote(struct dtx_leader_handle *dlh, void *data, int idx, if (remote_arg == NULL) D_GOTO(out, rc = -DER_NOMEM); - obj_coll_disp_dest(cursor, exec_arg->coll_tgts, &tgt_ep); + obj_coll_disp_dest(cursor, exec_arg->coll_tgts, &tgt_ep, ocqi_parent->ocqi_oid.id_pub); tag = tgt_ep.ep_tag; remote_arg->dlh = dlh; diff --git a/src/vos/vos_dtx.c b/src/vos/vos_dtx.c index 0f87bfd4d0e..47e410fc1e7 100644 --- a/src/vos/vos_dtx.c +++ b/src/vos/vos_dtx.c @@ -312,16 +312,38 @@ dtx_act_ent_update(struct btr_instance *tins, struct btr_record *rec, if (unlikely(!dae_old->dae_aborted)) { /* - * XXX: There are two possible reasons for that: - * - * 1. Client resent the RPC but without set 'RESEND' flag. - * 2. Client reused the DTX ID for different modifications. - * - * Currently, the 1st case is more suspected. + * If the new entry and the old entry are for the same transaction, then the RPC + * for the new one will take 'RESEND' flag, that will cause the old one has been + * aborted before arriving at here. So it is quite possible that the new one and + * the old one are for different transactions. */ - D_ERROR("The TX ID "DF_DTI" may be reused for epoch "DF_X64" vs "DF_X64"\n", - DP_DTI(&DAE_XID(dae_old)), DAE_EPOCH(dae_old), DAE_EPOCH(dae_new)); - return -DER_TX_ID_REUSED; + if (DAE_EPOCH(dae_old) < DAE_EPOCH(dae_new)) { + D_ERROR("The TX ID "DF_DTI" may be reused for epoch "DF_X64" vs "DF_X64"\n", + DP_DTI(&DAE_XID(dae_old)), DAE_EPOCH(dae_old), DAE_EPOCH(dae_new)); + return -DER_TX_ID_REUSED; + } + + /* + * If the old entry has higher epoch, it is quite possible that the resent RPC + * was handled before the original RPC (corresponding to 'dae_new'). Returning + * -DER_INPROGRESS to make the RPC sponsor to retry the RPC with 'RESEND' flag, + * then related RPC handler logic will handle such case. + */ + if (DAE_EPOCH(dae_old) > DAE_EPOCH(dae_new)) { + D_ERROR("Resent RPC may be handled before original one for DTX "DF_DTI + " with epoch "DF_X64" vs "DF_X64"\n", + DP_DTI(&DAE_XID(dae_old)), DAE_EPOCH(dae_old), DAE_EPOCH(dae_new)); + return -DER_INPROGRESS; + } + + /* + * The two entries uses the same epoch, then it may be caused by repeated RPCs + * from different sources, such as multiple relay engines forward the same RPC + * to current target. We need to notify related caller for such buggy case. + */ + D_ERROR("Receive repeated DTX "DF_DTI" with epoch "DF_X64"\n", + DP_DTI(&DAE_XID(dae_old)), DAE_EPOCH(dae_old)); + return -DER_MISC; } rec->rec_off = umem_ptr2off(&tins->ti_umm, dae_new); @@ -1172,16 +1194,20 @@ vos_dtx_check_availability(daos_handle_t coh, uint32_t entry, } if (intent == DAOS_INTENT_PURGE) { - uint32_t age = d_hlc_age2sec(DAE_XID(dae).dti_hlc); + uint64_t now = daos_gettime_coarse(); /* * The DTX entry still references related data record, * then we cannot (vos) aggregate related data record. + * Report warning per each 10 seconds to avoid log flood. */ - if (age >= DAOS_AGG_THRESHOLD) - D_WARN("DTX "DF_DTI" (state:%u, age:%u) still references the data, " - "cannot be (VOS) aggregated\n", - DP_DTI(&DAE_XID(dae)), vos_dtx_status(dae), age); + if (now - cont->vc_agg_busy_ts > 10) { + D_WARN("DTX "DF_DTI" (state:%u, flags:%x, age:%u) still references " + "the modification, cannot be (VOS) aggregated\n", + DP_DTI(&DAE_XID(dae)), vos_dtx_status(dae), DAE_FLAGS(dae), + (unsigned int)d_hlc_age2sec(DAE_XID(dae).dti_hlc)); + cont->vc_agg_busy_ts = now; + } return ALB_AVAILABLE_DIRTY; } @@ -1909,8 +1935,13 @@ vos_dtx_check(daos_handle_t coh, struct dtx_id *dti, daos_epoch_t *epoch, daos_epoch_t e = *epoch; *epoch = DAE_EPOCH(dae); - if (e != 0 && e != DAE_EPOCH(dae)) - return -DER_MISMATCH; + if (e != 0) { + if (e > DAE_EPOCH(dae)) + return -DER_MISMATCH; + + if (e < DAE_EPOCH(dae)) + return -DER_TX_RESTART; + } } return vos_dae_is_prepare(dae) ? DTX_ST_PREPARED : DTX_ST_INITED; diff --git a/src/vos/vos_internal.h b/src/vos/vos_internal.h index 7d4dd3ac166..5aa0a1fadbb 100644 --- a/src/vos/vos_internal.h +++ b/src/vos/vos_internal.h @@ -353,6 +353,8 @@ struct vos_container { daos_epoch_range_t vc_epr_aggregation; /* Current ongoing discard EPR */ daos_epoch_range_t vc_epr_discard; + /* Last timestamp when VOS aggregation reports -DER_TX_BUSY */ + uint64_t vc_agg_busy_ts; /* Last timestamp when VOS aggregation reporting ENOSPACE */ uint64_t vc_agg_nospc_ts; /* Last timestamp when IO reporting ENOSPACE */