Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DAOS-16469 dtx: properly handle DTX partial commit #15335

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/dtx/dtx_coll.c
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ dtx_coll_local_one(void *args)

switch (opc) {
case DTX_COLL_COMMIT:
rc = vos_dtx_commit(cont->sc_hdl, &dcla->dcla_xid, 1, NULL);
rc = vos_dtx_commit(cont->sc_hdl, &dcla->dcla_xid, 1, false, NULL);
break;
case DTX_COLL_ABORT:
rc = vos_dtx_abort(cont->sc_hdl, &dcla->dcla_xid, dcla->dcla_epoch);
Expand Down
2 changes: 1 addition & 1 deletion src/dtx/dtx_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -1605,7 +1605,7 @@ dtx_end(struct dtx_handle *dth, struct ds_cont_child *cont, int result)
* and can be committed next time.
*/
rc = vos_dtx_commit(cont->sc_hdl, dth->dth_dti_cos,
dth->dth_dti_cos_count, NULL);
dth->dth_dti_cos_count, false, NULL);
if (rc < 0)
D_ERROR(DF_UUID": Fail to DTX CoS commit: %d\n",
DP_UUID(cont->sc_uuid), rc);
Expand Down
45 changes: 32 additions & 13 deletions src/dtx/dtx_cos.c
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,16 @@ dtx_cos_del_one(struct ds_cont_child *cont, struct dtx_cos_rec_child *dcrc)
return rc;
}

static void
dtx_cos_demote_one(struct ds_cont_child *cont, struct dtx_cos_rec_child *dcrc)
{
d_list_del(&dcrc->dcrc_gl_committable);
if (dcrc->dcrc_coll)
d_list_add_tail(&dcrc->dcrc_gl_committable, &cont->sc_dtx_coll_list);
else
d_list_add_tail(&dcrc->dcrc_gl_committable, &cont->sc_dtx_cos_list);
}

int
dtx_fetch_committable(struct ds_cont_child *cont, uint32_t max_cnt,
daos_unit_oid_t *oid, daos_epoch_t epoch, bool force,
Expand Down Expand Up @@ -622,7 +632,7 @@ dtx_cos_add(struct ds_cont_child *cont, void *entry, daos_unit_oid_t *oid,

int
dtx_cos_del(struct ds_cont_child *cont, struct dtx_id *xid,
daos_unit_oid_t *oid, uint64_t dkey_hash)
daos_unit_oid_t *oid, uint64_t dkey_hash, bool demote)
{
struct dtx_cos_key key;
d_iov_t kiov;
Expand All @@ -645,36 +655,41 @@ dtx_cos_del(struct ds_cont_child *cont, struct dtx_id *xid,

d_list_for_each_entry(dcrc, &dcr->dcr_prio_list, dcrc_lo_link) {
if (memcmp(&dcrc->dcrc_dte->dte_xid, xid, sizeof(*xid)) == 0) {
rc = dtx_cos_del_one(cont, dcrc);
if (demote)
dtx_cos_demote_one(cont, dcrc);
else
rc = dtx_cos_del_one(cont, dcrc);
D_GOTO(out, found = 1);
}
}

d_list_for_each_entry(dcrc, &dcr->dcr_reg_list, dcrc_lo_link) {
if (memcmp(&dcrc->dcrc_dte->dte_xid, xid, sizeof(*xid)) == 0) {
rc = dtx_cos_del_one(cont, dcrc);
if (demote)
dtx_cos_demote_one(cont, dcrc);
else
rc = dtx_cos_del_one(cont, dcrc);
D_GOTO(out, found = 2);
}
}

d_list_for_each_entry(dcrc, &dcr->dcr_expcmt_list, dcrc_lo_link) {
if (memcmp(&dcrc->dcrc_dte->dte_xid, xid, sizeof(*xid)) == 0) {
rc = dtx_cos_del_one(cont, dcrc);
if (demote)
dtx_cos_demote_one(cont, dcrc);
else
rc = dtx_cos_del_one(cont, dcrc);
D_GOTO(out, found = 3);
}
}

out:
if (found > 0)
if (found > 0 && !demote)
d_tm_dec_gauge(dtx_tls_get()->dt_committable, 1);

if (rc == 0 && found == 0)
rc = -DER_NONEXIST;

DL_CDEBUG(rc != 0 && rc != -DER_NONEXIST, DLOG_ERR, DB_TRACE, rc,
"Remove DTX from CoS cache "DF_UOID", key %lu",
DP_UOID(*oid), (unsigned long)dkey_hash);

return rc == -DER_NONEXIST ? 0 : rc;
}

Expand Down Expand Up @@ -778,10 +793,14 @@ dtx_cos_batched_del(struct ds_cont_child *cont, struct dtx_id xid[], bool rm[],
if (memcmp(&dcrc->dcrc_dte->dte_xid, &xid[i], sizeof(struct dtx_id)) == 0) {
found = true;

if (rm[i]) {
rc = dtx_cos_del_one(cont, dcrc);
if (rc == 0)
del++;
if (rm != NULL) {
if (rm[i]) {
rc = dtx_cos_del_one(cont, dcrc);
if (rc == 0)
del++;
}
} else {
dtx_cos_demote_one(cont, dcrc);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/dtx/dtx_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ int dtx_fetch_committable(struct ds_cont_child *cont, uint32_t max_cnt,
int dtx_cos_add(struct ds_cont_child *cont, void *entry, daos_unit_oid_t *oid,
uint64_t dkey_hash, daos_epoch_t epoch, uint32_t flags);
int dtx_cos_del(struct ds_cont_child *cont, struct dtx_id *xid,
daos_unit_oid_t *oid, uint64_t dkey_hash);
daos_unit_oid_t *oid, uint64_t dkey_hash, bool demote);
uint64_t dtx_cos_oldest(struct ds_cont_child *cont);
void dtx_cos_prio(struct ds_cont_child *cont, struct dtx_id *xid,
daos_unit_oid_t *oid, uint64_t dkey_hash);
Expand Down
105 changes: 70 additions & 35 deletions src/dtx/dtx_rpc.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ struct dtx_req_args {
int dra_length;
/* The collective RPC result. */
int dra_result;
uint32_t dra_local_fail:1;
/* Pointer to the committed DTX list, used for DTX_REFRESH case. */
d_list_t *dra_cmt_list;
/* Pointer to the aborted DTX list, used for DTX_REFRESH case. */
Expand All @@ -81,6 +82,7 @@ struct dtx_req_rec {
int drr_count; /* DTX count */
int drr_result; /* The RPC result */
uint32_t drr_comp:1,
drr_local_fail:1,
drr_single_dti:1;
uint32_t drr_inline_flags;
struct dtx_id *drr_dti; /* The DTX array */
Expand Down Expand Up @@ -290,10 +292,13 @@ dtx_req_send(struct dtx_req_rec *drr, daos_epoch_t epoch)
"DTX req for opc %x to %d/%d (req %p future %p) sent epoch "DF_X64,
dra->dra_opc, drr->drr_rank, drr->drr_tag, req, dra->dra_future, epoch);

if (rc != 0 && drr->drr_comp == 0) {
drr->drr_comp = 1;
drr->drr_result = rc;
ABT_future_set(dra->dra_future, drr);
if (rc != 0) {
drr->drr_local_fail = 1;
if (drr->drr_comp == 0) {
drr->drr_comp = 1;
drr->drr_result = rc;
ABT_future_set(dra->dra_future, drr);
}
}

return rc;
Expand All @@ -309,13 +314,17 @@ dtx_req_list_cb(void **args)
if (dra->dra_opc == DTX_CHECK) {
for (i = 0; i < dra->dra_length; i++) {
drr = args[i];
if (drr->drr_local_fail)
dra->dra_local_fail = 1;
dtx_merge_check_result(&dra->dra_result, drr->drr_result);
D_DEBUG(DB_TRACE, "The DTX "DF_DTI" RPC req result %d, status is %d.\n",
DP_DTI(&drr->drr_dti[0]), drr->drr_result, dra->dra_result);
}
} else {
for (i = 0; i < dra->dra_length; i++) {
drr = args[i];
if (drr->drr_local_fail)
dra->dra_local_fail = 1;
if (dra->dra_result == 0 || dra->dra_result == -DER_NONEXIST)
dra->dra_result = drr->drr_result;
}
Expand Down Expand Up @@ -382,7 +391,12 @@ dtx_req_list_send(struct dtx_common_args *dca, bool is_reentrance)
if (rc != ABT_SUCCESS) {
D_ERROR("ABT_future_create failed for opc %x, len %d: rc %d.\n",
dra->dra_opc, dca->dca_steps, rc);
return dss_abterr2der(rc);
dra->dra_local_fail = 1;
if (dra->dra_opc == DTX_CHECK)
dtx_merge_check_result(&dra->dra_result, dss_abterr2der(rc));
else if (dra->dra_result == 0 || dra->dra_result == -DER_NONEXIST)
dra->dra_result = dss_abterr2der(rc);
return DSS_CHORE_DONE;
}

D_DEBUG(DB_TRACE, "%p: DTX req for opc %x, future %p (%d) start.\n",
Expand Down Expand Up @@ -750,7 +764,12 @@ dtx_rpc(struct ds_cont_child *cont,d_list_t *dti_list, struct dtx_entry **dtes,
switch (opc) {
case DTX_COMMIT:
case DTX_ABORT:
if (rc != -DER_EXCLUDED && rc != -DER_OOG)
/*
* Continue to send out more RPCs as long as there is no local failure,
* then other healthy participants can commit/abort related DTX entries
* without being affected by the bad one(s).
*/
if (dca->dca_dra.dra_local_fail)
goto out;
break;
case DTX_CHECK:
Expand Down Expand Up @@ -826,17 +845,8 @@ dtx_commit(struct ds_cont_child *cont, struct dtx_entry **dtes,
if (rc > 0 || rc == -DER_NONEXIST || rc == -DER_EXCLUDED || rc == -DER_OOG)
rc = 0;

if (rc != 0) {
/*
* Some DTX entries may have been committed on some participants. Then mark all
* the DTX entries (in the dtis) as "PARTIAL_COMMITTED" and re-commit them later.
* It is harmless to re-commit the DTX that has ever been committed.
*/
if (dra->dra_committed > 0)
rc1 = vos_dtx_set_flags(cont->sc_hdl, dca.dca_dtis, count,
DTE_PARTIAL_COMMITTED);
} else {
if (has_cos) {
if (rc == 0 || dra->dra_committed > 0) {
if (rc == 0 && has_cos) {
if (count > 1) {
D_ALLOC_ARRAY(rm_cos, count);
if (rm_cos == NULL)
Expand All @@ -846,7 +856,12 @@ dtx_commit(struct ds_cont_child *cont, struct dtx_entry **dtes,
}
}

rc1 = vos_dtx_commit(cont->sc_hdl, dca.dca_dtis, count, rm_cos);
/*
* Some DTX entries may have been committed on some participants. Then mark all
* the DTX entries (in the dtis) as "PARTIAL_COMMITTED" and re-commit them later.
* It is harmless to re-commit the DTX that has ever been committed.
*/
rc1 = vos_dtx_commit(cont->sc_hdl, dca.dca_dtis, count, rc != 0, rm_cos);
if (rc1 > 0) {
dra->dra_committed += rc1;
rc1 = 0;
Expand All @@ -855,13 +870,28 @@ dtx_commit(struct ds_cont_child *cont, struct dtx_entry **dtes,
rc1 = 0;
}

if (rc1 == 0 && rm_cos != NULL) {
/*
* For partial commit case, move related DTX entries to the tail of the
* committable list, then the next batched commit can commit others and
* retry those partial committed sometime later instead of blocking the
* others committable with continuously retry the failed ones.
*
* The side-effect of such behavior is that the DTX which is committable
* earlier maybe delay committed than the later ones.
*/
if (rc1 == 0 && has_cos) {
if (dcks != NULL) {
for (i = 0; i < count; i++) {
if (rm_cos[i]) {
D_ASSERT(!daos_oid_is_null(dcks[i].oid.id_pub));
if (rm_cos != NULL) {
for (i = 0; i < count; i++) {
if (!rm_cos[i])
continue;
dtx_cos_del(cont, &dca.dca_dtis[i], &dcks[i].oid,
dcks[i].dkey_hash);
dcks[i].dkey_hash, false);
}
} else {
for (i = 0; i < count; i++) {
dtx_cos_del(cont, &dca.dca_dtis[i], &dcks[i].oid,
dcks[i].dkey_hash, true);
}
}
} else {
Expand Down Expand Up @@ -1141,7 +1171,7 @@ dtx_refresh_internal(struct ds_cont_child *cont, int *check_count, d_list_t *che
* It has been committed/committable on leader, we may miss
* related DTX commit request, so let's commit it locally.
*/
rc1 = vos_dtx_commit(cont->sc_hdl, &dsp->dsp_xid, 1, NULL);
rc1 = vos_dtx_commit(cont->sc_hdl, &dsp->dsp_xid, 1, false, NULL);
if (rc1 == 0 || rc1 == -DER_NONEXIST || !for_io /* cleanup case */) {
d_list_del(&dsp->dsp_link);
dtx_dsp_free(dsp);
Expand Down Expand Up @@ -1636,24 +1666,29 @@ dtx_coll_commit(struct ds_cont_child *cont, struct dtx_coll_entry *dce, struct d
committed += dcra.dcra_committed;
}

if (rc == 0 && rc1 == 0)
rc2 = vos_dtx_commit(cont->sc_hdl, &dce->dce_xid, 1, NULL);
else if (committed > 0)
if ((rc == 0 && rc1 == 0) || committed > 0) {
/* Mark the DTX as "PARTIAL_COMMITTED" and re-commit it later via cleanup logic. */
rc2 = vos_dtx_set_flags(cont->sc_hdl, &dce->dce_xid, 1, DTE_PARTIAL_COMMITTED);
if (rc2 > 0 || rc2 == -DER_NONEXIST)
rc2 = 0;
rc2 = vos_dtx_commit(cont->sc_hdl, &dce->dce_xid, 1, rc != 0 || rc1 != 0, NULL);
if (rc2 > 0 || rc2 == -DER_NONEXIST)
rc2 = 0;
}

/*
* NOTE: Currently, we commit collective DTX one by one with high priority. So here we have
* to remove the collective DTX entry from the CoS even if the commit failed remotely.
* Otherwise, the batched commit ULT may be blocked by such "bad" entry.
* For partial commit case, move related DTX entries to the tail of the
* committable list, then the next batched commit can commit others and
* retry those partial committed sometime later instead of blocking the
* others committable with continuously retry the failed ones.
*
* The side-effect of such behavior is that the DTX which is committable
* earlier maybe delay committed than the later ones.
*/
if (rc2 == 0 && has_cos) {
if (dck != NULL)
dtx_cos_del(cont, &dce->dce_xid, &dck->oid, dck->dkey_hash);
dtx_cos_del(cont, &dce->dce_xid, &dck->oid, dck->dkey_hash,
rc != 0 || rc1 != 0);
else
dtx_cos_batched_del(cont, &dce->dce_xid, &cos, 1);
dtx_cos_batched_del(cont, &dce->dce_xid,
rc != 0 || rc1 != 0 ? NULL : &cos, 1);
}

D_CDEBUG(rc != 0 || rc1 != 0 || rc2 != 0, DLOG_ERR, DB_TRACE,
Expand Down
2 changes: 1 addition & 1 deletion src/dtx/dtx_srv.c
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ dtx_handler(crt_rpc_t *rpc)
count = din->di_dtx_array.ca_count - i;

dtis = (struct dtx_id *)din->di_dtx_array.ca_arrays + i;
rc1 = vos_dtx_commit(cont->sc_hdl, dtis, count, NULL);
rc1 = vos_dtx_commit(cont->sc_hdl, dtis, count, false, NULL);
if (rc1 > 0)
committed += rc1;
else if (rc == 0 && rc1 < 0)
Expand Down
3 changes: 2 additions & 1 deletion src/include/daos_srv/vos.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,14 @@ vos_dtx_load_mbs(daos_handle_t coh, struct dtx_id *dti, daos_unit_oid_t *oid,
* \param coh [IN] Container open handle.
* \param dtis [IN] The array for DTX identifiers to be committed.
* \param count [IN] The count of DTXs to be committed.
* \param keep_act [IN] Keep DTX entry or not.
* \param rm_cos [OUT] The array for whether remove entry from CoS cache.
*
* \return Negative value if error.
* \return Others are for the count of committed DTXs.
*/
int
vos_dtx_commit(daos_handle_t coh, struct dtx_id dtis[], int count, bool rm_cos[]);
vos_dtx_commit(daos_handle_t coh, struct dtx_id dtis[], int count, bool keep_act, bool rm_cos[]);

/**
* Abort the specified DTXs.
Expand Down
Loading
Loading