diff --git a/src/object/srv_coll.c b/src/object/srv_coll.c index 9e421810861..a63a11d574b 100644 --- a/src/object/srv_coll.c +++ b/src/object/srv_coll.c @@ -183,7 +183,7 @@ obj_coll_punch_bulk(crt_rpc_t *rpc, d_iov_t *iov, crt_proc_t *p_proc, sgl.sg_iovs = iov; rc = obj_bulk_transfer(rpc, CRT_BULK_GET, false, &ocpi->ocpi_tgt_bulk, NULL, NULL, - DAOS_HDL_INVAL, &sgls, 1, NULL, NULL); + DAOS_HDL_INVAL, &sgls, 1, 1, NULL, NULL); if (rc != 0) { D_ERROR("Failed to prepare bulk transfer for coll_punch, size %u: "DF_RC"\n", ocpi->ocpi_bulk_tgt_sz, DP_RC(rc)); diff --git a/src/object/srv_internal.h b/src/object/srv_internal.h index 6f13e3f36dc..a24986247a5 100644 --- a/src/object/srv_internal.h +++ b/src/object/srv_internal.h @@ -280,7 +280,7 @@ typedef int (*ds_iofw_cb_t)(crt_rpc_t *req, void *arg); int obj_bulk_transfer(crt_rpc_t *rpc, crt_bulk_op_t bulk_op, bool bulk_bind, crt_bulk_t *remote_bulks, uint64_t *remote_offs, uint8_t *skips, - daos_handle_t ioh, d_sg_list_t **sgls, int sgl_nr, + daos_handle_t ioh, d_sg_list_t **sgls, int sgl_nr, int bulk_nr, struct obj_bulk_args *p_arg, struct ds_cont_hdl *coh); int obj_tgt_punch(struct obj_tgt_punch_args *otpa, uint32_t *shards, uint32_t count); int obj_tgt_query(struct obj_tgt_query_args *otqa, uuid_t po_uuid, uuid_t co_hdl, uuid_t co_uuid, diff --git a/src/object/srv_obj.c b/src/object/srv_obj.c index 0a246bebdca..febd3d36ead 100644 --- a/src/object/srv_obj.c +++ b/src/object/srv_obj.c @@ -488,22 +488,24 @@ bulk_transfer_sgl(daos_handle_t ioh, crt_rpc_t *rpc, crt_bulk_t remote_bulk, int obj_bulk_transfer(crt_rpc_t *rpc, crt_bulk_op_t bulk_op, bool bulk_bind, crt_bulk_t *remote_bulks, uint64_t *remote_offs, uint8_t *skips, daos_handle_t ioh, d_sg_list_t **sgls, - int sgl_nr, struct obj_bulk_args *p_arg, struct ds_cont_hdl *coh) + int sgl_nr, int bulk_nr, struct obj_bulk_args *p_arg, struct ds_cont_hdl *coh) { - struct obj_rw_in *orw = crt_req_get(rpc); struct obj_bulk_args arg = { 0 }; int i, rc, *status, ret; int skip_nr = 0; - int bulk_nr; bool async = true; uint64_t time = daos_get_ntime(); + if (unlikely(sgl_nr > bulk_nr)) { + D_ERROR("Invalid sgl_nr vs bulk_nr: %d/%d\n", sgl_nr, bulk_nr); + return -DER_INVAL; + } + if (remote_bulks == NULL) { D_ERROR("No remote bulks provided\n"); return -DER_INVAL; } - bulk_nr = orw->orw_bulks.ca_count; if (p_arg == NULL) { p_arg = &arg; async = false; @@ -514,7 +516,7 @@ obj_bulk_transfer(crt_rpc_t *rpc, crt_bulk_op_t bulk_op, bool bulk_bind, crt_bul return dss_abterr2der(rc); p_arg->inited = true; - D_DEBUG(DB_IO, "bulk_op %d sgl_nr %d\n", bulk_op, sgl_nr); + D_DEBUG(DB_IO, "bulk_op %d, sgl_nr %d, bulk_nr %d\n", bulk_op, sgl_nr, bulk_nr); p_arg->bulks_inflight++; @@ -542,9 +544,9 @@ obj_bulk_transfer(crt_rpc_t *rpc, crt_bulk_op_t bulk_op, bool bulk_bind, crt_bul while (skips != NULL && isset(skips, i + skip_nr)) skip_nr++; - if (bulk_nr > 0) - D_ASSERTF(i + skip_nr < bulk_nr, "i %d, skip_nr %d, bulk_nr %d\n", - i, skip_nr, bulk_nr); + D_ASSERTF(i + skip_nr < bulk_nr, "i %d, skip_nr %d, sgl_nr %d, bulk_nr %d\n", + i, skip_nr, sgl_nr, bulk_nr); + if (remote_bulks[i + skip_nr] == NULL) continue; @@ -574,6 +576,12 @@ obj_bulk_transfer(crt_rpc_t *rpc, crt_bulk_op_t bulk_op, bool bulk_bind, crt_bul break; } } + + if (skips != NULL) + D_ASSERTF(skip_nr + sgl_nr <= bulk_nr, + "Unmatched skip_nr %d, sgl_nr %d, bulk_nr %d\n", + skip_nr, sgl_nr, bulk_nr); + done: if (--(p_arg->bulks_inflight) == 0) ABT_eventual_set(p_arg->eventual, &rc, sizeof(rc)); @@ -836,7 +844,7 @@ obj_echo_rw(crt_rpc_t *rpc, daos_iod_t *iod, uint64_t *off) /* Only support 1 iod now */ bulk_bind = orw->orw_flags & ORF_BULK_BIND; rc = obj_bulk_transfer(rpc, bulk_op, bulk_bind, orw->orw_bulks.ca_arrays, off, - NULL, DAOS_HDL_INVAL, &p_sgl, 1, NULL, NULL); + NULL, DAOS_HDL_INVAL, &p_sgl, 1, 1, NULL, NULL); out: orwo->orw_ret = rc; orwo->orw_map_version = orw->orw_map_ver; @@ -1636,7 +1644,8 @@ obj_local_rw_internal(crt_rpc_t *rpc, struct obj_io_context *ioc, daos_iod_t *io if (rma) { bulk_bind = orw->orw_flags & ORF_BULK_BIND; rc = obj_bulk_transfer(rpc, bulk_op, bulk_bind, orw->orw_bulks.ca_arrays, offs, - skips, ioh, NULL, iods_nr, NULL, ioc->ioc_coh); + skips, ioh, NULL, iods_nr, orw->orw_bulks.ca_count, NULL, + ioc->ioc_coh); if (rc == 0) { bio_iod_flush(biod); @@ -1809,7 +1818,7 @@ obj_get_iods_offs_by_oid(daos_unit_oid_t uoid, struct obj_iod_array *iod_array, } } if (oiod_nr > LOCAL_SKIP_BITS_NUM || *skips == NULL) { - D_ALLOC(*skips, roundup(oiod_nr / NBBY, 4)); + D_ALLOC(*skips, (oiod_nr + NBBY - 1) / NBBY); if (*skips == NULL) D_GOTO(out, rc = -DER_NOMEM); } @@ -2448,7 +2457,7 @@ ds_obj_ec_rep_handler(crt_rpc_t *rpc) goto end; } rc = obj_bulk_transfer(rpc, CRT_BULK_GET, false, &oer->er_bulk, NULL, NULL, - ioh, NULL, 1, NULL, ioc.ioc_coh); + ioh, NULL, 1, 1, NULL, ioc.ioc_coh); if (rc) D_ERROR(DF_UOID " bulk transfer failed: " DF_RC "\n", DP_UOID(oer->er_oid), DP_RC(rc)); @@ -2526,7 +2535,7 @@ ds_obj_ec_agg_handler(crt_rpc_t *rpc) goto end; } rc = obj_bulk_transfer(rpc, CRT_BULK_GET, false, &oea->ea_bulk, - NULL, NULL, ioh, NULL, 1, NULL, ioc.ioc_coh); + NULL, NULL, ioh, NULL, 1, 1, NULL, ioc.ioc_coh); if (rc) D_ERROR(DF_UOID " bulk transfer failed: " DF_RC "\n", DP_UOID(oea->ea_oid), DP_RC(rc)); @@ -3275,7 +3284,7 @@ obj_enum_reply_bulk(crt_rpc_t *rpc) return 0; rc = obj_bulk_transfer(rpc, CRT_BULK_PUT, false, bulks, NULL, NULL, - DAOS_HDL_INVAL, sgls, idx, NULL, NULL); + DAOS_HDL_INVAL, sgls, idx, idx, NULL, NULL); if (oei->oei_kds_bulk) { D_FREE(oeo->oeo_kds.ca_arrays); oeo->oeo_kds.ca_count = 0; @@ -4560,7 +4569,7 @@ ds_cpd_handle_one(crt_rpc_t *rpc, struct daos_cpd_sub_head *dcsh, struct daos_cp rc = obj_bulk_transfer(rpc, CRT_BULK_GET, dcu->dcu_flags & ORF_BULK_BIND, dcu->dcu_bulks, poffs[i], pskips[i], iohs[i], NULL, - piod_nrs[i], &bulks[i], ioc->ioc_coh); + piod_nrs[i], dcsr->dcsr_nr, &bulks[i], ioc->ioc_coh); if (rc != 0) { D_ERROR("Bulk transfer failed for obj " DF_UOID", DTX "DF_DTI": "DF_RC"\n", @@ -5276,7 +5285,7 @@ ds_obj_cpd_body_bulk(crt_rpc_t *rpc, struct obj_io_context *ioc, bool leader, } rc = obj_bulk_transfer(rpc, CRT_BULK_GET, ORF_BULK_BIND, bulks, NULL, NULL, - DAOS_HDL_INVAL, sgls, count, NULL, ioc->ioc_coh); + DAOS_HDL_INVAL, sgls, count, count, NULL, ioc->ioc_coh); if (rc != 0) goto out;