Skip to content

Commit

Permalink
Merge branch 'master' into Nasf-Fan/DAOS-16170_2
Browse files Browse the repository at this point in the history
  • Loading branch information
Nasf-Fan committed Nov 15, 2024
2 parents 8931cd0 + 083aa4c commit 11017c0
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 11 deletions.
20 changes: 17 additions & 3 deletions src/container/srv_target.c
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ cont_aggregate_runnable(struct ds_cont_child *cont, struct sched_request *req,
}

if (pool->sp_rebuilding && !vos_agg) {
cont->sc_ec_agg_active = 0;
D_DEBUG(DB_EPC, DF_CONT": skip EC aggregation during rebuild %d.\n",
DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid),
pool->sp_rebuilding);
Expand All @@ -192,12 +191,10 @@ cont_aggregate_runnable(struct ds_cont_child *cont, struct sched_request *req,
if (!cont->sc_vos_agg_active)
D_DEBUG(DB_EPC, DF_CONT": resume VOS aggregation after reintegration.\n",
DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid));
cont->sc_vos_agg_active = 1;
} else {
if (!cont->sc_ec_agg_active)
D_DEBUG(DB_EPC, DF_CONT": resume EC aggregation after reintegration.\n",
DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid));
cont->sc_ec_agg_active = 1;
}

if (!cont->sc_props_fetched)
Expand Down Expand Up @@ -471,6 +468,11 @@ cont_aggregate_interval(struct ds_cont_child *cont, cont_aggregate_cb_t cb,
if (!cont_aggregate_runnable(cont, req, param->ap_vos_agg))
goto next;

if (param->ap_vos_agg)
cont->sc_vos_agg_active = 1;
else
cont->sc_ec_agg_active = 1;

rc = cont_child_aggregate(cont, cb, param);
if (rc == -DER_SHUTDOWN) {
break; /* pool destroyed */
Expand All @@ -483,10 +485,22 @@ cont_aggregate_interval(struct ds_cont_child *cont, cont_aggregate_cb_t cb,
/* Don't sleep too long when there is space pressure */
msecs = 2ULL * 100;
}

if (param->ap_vos_agg)
cont->sc_vos_agg_active = 0;
else
cont->sc_ec_agg_active = 0;

next:
if (dss_ult_exiting(req))
break;

/* Sleep 18 seconds in the EC aggregation ULT while the pool is rebuilding,
* unless there is space pressure.
*/
if (cont->sc_pool->spc_pool->sp_rebuilding && !param->ap_vos_agg && msecs != 200)
msecs = 18000;

sched_req_sleep(req, msecs);
}
out:
Expand Down
22 changes: 21 additions & 1 deletion src/object/srv_ec_aggregate.c
Original file line number Diff line number Diff line change
Expand Up @@ -2277,6 +2277,13 @@ ec_aggregate_yield(struct ec_agg_param *agg_param)
{
int rc;

if (agg_param->ap_pool_info.api_pool->sp_rebuilding > 0) {
D_INFO(DF_UUID": abort ec aggregation, sp_rebuilding %d\n",
DP_UUID(agg_param->ap_pool_info.api_pool->sp_uuid),
agg_param->ap_pool_info.api_pool->sp_rebuilding);
return true;
}

D_ASSERT(agg_param->ap_yield_func != NULL);
rc = agg_param->ap_yield_func(agg_param->ap_yield_arg);
if (rc < 0) /* Abort */
Expand Down Expand Up @@ -2474,6 +2481,17 @@ agg_iterate_pre_cb(daos_handle_t ih, vos_iter_entry_t *entry,

D_ASSERT(agg_param->ap_initialized);

/* If rebuild has started, abort EC aggregation to shrink the conflict
* window with rebuild (see obj_inflight_io_check()).
*/
if (agg_param->ap_pool_info.api_pool->sp_rebuilding > 0) {
D_INFO(DF_CONT" abort as rebuild started, sp_rebuilding %d\n",
DP_CONT(agg_param->ap_pool_info.api_pool_uuid,
agg_param->ap_pool_info.api_cont_uuid),
agg_param->ap_pool_info.api_pool->sp_rebuilding);
return -1;
}

switch (type) {
case VOS_ITER_OBJ:
agg_param->ap_epr = param->ip_epr;
Expand All @@ -2495,7 +2513,9 @@ agg_iterate_pre_cb(daos_handle_t ih, vos_iter_entry_t *entry,
}

if (rc < 0) {
D_ERROR("EC aggregation failed: "DF_RC"\n", DP_RC(rc));
D_ERROR(DF_UUID" EC aggregation (rebuilding %d) failed: "DF_RC"\n",
DP_UUID(agg_param->ap_pool_info.api_pool->sp_uuid),
agg_param->ap_pool_info.api_pool->sp_rebuilding, DP_RC(rc));
return rc;
}

Expand Down
5 changes: 3 additions & 2 deletions src/object/srv_obj.c
Original file line number Diff line number Diff line change
Expand Up @@ -2362,8 +2362,9 @@ obj_inflight_io_check(struct ds_cont_child *child, uint32_t opc,
{
if (opc == DAOS_OBJ_RPC_ENUMERATE && flags & ORF_FOR_MIGRATION) {
if (child->sc_ec_agg_active) {
D_ERROR(DF_UUID" ec aggregate still active\n",
DP_UUID(child->sc_pool->spc_uuid));
D_ERROR(DF_CONT" ec aggregate still active, rebuilding %d\n",
DP_CONT(child->sc_pool->spc_uuid, child->sc_uuid),
child->sc_pool->spc_pool->sp_rebuilding);
return -DER_UPDATE_AGAIN;
}
}
Expand Down
18 changes: 13 additions & 5 deletions src/rebuild/scan.c
Original file line number Diff line number Diff line change
Expand Up @@ -876,9 +876,9 @@ rebuild_container_scan_cb(daos_handle_t ih, vos_iter_entry_t *entry,
rpt->rt_rebuild_op != RB_OP_FAIL_RECLAIM) {
D_ASSERTF(rpt->rt_pool->sp_rebuilding >= 0, DF_UUID" rebuilding %d\n",
DP_UUID(rpt->rt_pool_uuid), rpt->rt_pool->sp_rebuilding);
/* Wait for EC aggregation to abort before discard the object */
D_INFO(DF_RB " " DF_UUID " wait for ec agg abort.\n", DP_RB_RPT(rpt),
DP_UUID(entry->ie_couuid));
/* Wait for EC aggregation to abort before discarding the object */
D_INFO(DF_RB " " DF_UUID " wait for ec agg abort, rebuilding %d.\n", DP_RB_RPT(rpt),
DP_UUID(entry->ie_couuid), rpt->rt_pool->sp_rebuilding);
dss_sleep(1000);
if (rpt->rt_abort || rpt->rt_finishing) {
D_DEBUG(DB_REBUILD, DF_RB " " DF_UUID " rebuild abort %u/%u.\n",
Expand Down Expand Up @@ -1051,6 +1051,7 @@ rebuild_scan_leader(void *data)
struct rebuild_tgt_pool_tracker *rpt = data;
struct rebuild_pool_tls *tls;
int rc;
bool wait = false;

D_DEBUG(DB_REBUILD, DF_RB " check resync %u/%u < %u\n", DP_RB_RPT(rpt),
rpt->rt_pool->sp_dtx_resync_version, rpt->rt_global_dtx_resync_version,
Expand All @@ -1063,6 +1064,7 @@ rebuild_scan_leader(void *data)
if (rpt->rt_global_dtx_resync_version < rpt->rt_rebuild_ver) {
D_INFO(DF_RB " wait for global dtx %u\n", DP_RB_RPT(rpt),
rpt->rt_global_dtx_resync_version);
wait = true;
ABT_cond_wait(rpt->rt_global_dtx_wait_cond, rpt->rt_lock);
}
ABT_mutex_unlock(rpt->rt_lock);
Expand All @@ -1073,15 +1075,21 @@ rebuild_scan_leader(void *data)
}
}

D_DEBUG(DB_REBUILD, DF_RB " scan collective begin\n", DP_RB_RPT(rpt));
if (wait)
D_INFO(DF_RB " scan collective begin\n", DP_RB_RPT(rpt));
else
D_DEBUG(DB_REBUILD, DF_RB " scan collective begin\n", DP_RB_RPT(rpt));

rc = ds_pool_thread_collective(rpt->rt_pool_uuid, PO_COMP_ST_NEW | PO_COMP_ST_DOWN |
PO_COMP_ST_DOWNOUT, rebuild_scanner, rpt,
DSS_ULT_DEEP_STACK);
if (rc)
D_GOTO(out, rc);

D_DEBUG(DB_REBUILD, DF_RB "rebuild scan collective done\n", DP_RB_RPT(rpt));
if (wait)
D_INFO(DF_RB "rebuild scan collective done\n", DP_RB_RPT(rpt));
else
D_DEBUG(DB_REBUILD, DF_RB "rebuild scan collective done\n", DP_RB_RPT(rpt));

ABT_mutex_lock(rpt->rt_lock);
rc = ds_pool_task_collective(rpt->rt_pool_uuid, PO_COMP_ST_NEW | PO_COMP_ST_DOWN |
Expand Down

0 comments on commit 11017c0

Please sign in to comment.