Skip to content

Commit

Permalink
DAOS-16697 cart: crt_reply_send_input_free() (#15314)
Browse files Browse the repository at this point in the history
- Implement crt_reply_send_input_free() that frees input buffer
during the reply, as opposed to freeing it when rpc is destroyed

- Use new function for some of OBJ rpcs.

Signed-off-by: Alexander A Oganezov <[email protected]>
  • Loading branch information
frostedcmos authored Oct 25, 2024
1 parent df3dd24 commit c30d619
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 30 deletions.
10 changes: 10 additions & 0 deletions src/cart/crt_hg.c
Original file line number Diff line number Diff line change
Expand Up @@ -1489,6 +1489,16 @@ crt_hg_reply_send(struct crt_rpc_priv *rpc_priv)
rc = crt_hgret_2_der(hg_ret);
}

/* Release input buffer */
if (rpc_priv->crp_release_input_early && !rpc_priv->crp_forward) {
hg_ret = HG_Release_input_buf(rpc_priv->crp_hg_hdl);
if (hg_ret != HG_SUCCESS) {
RPC_ERROR(rpc_priv, "HG_Release_input_buf failed, hg_ret: " DF_HG_RC "\n",
DP_HG_RC(hg_ret));
/* Fall through */
}
}

return rc;
}

Expand Down
20 changes: 20 additions & 0 deletions src/cart/crt_rpc.c
Original file line number Diff line number Diff line change
Expand Up @@ -1545,6 +1545,26 @@ crt_req_send(crt_rpc_t *req, crt_cb_t complete_cb, void *arg)
return rc;
}

int
crt_reply_send_input_free(crt_rpc_t *req)
{
struct crt_rpc_priv *rpc_priv = NULL;
int rc = 0;

if (req == NULL) {
D_ERROR("invalid parameter (NULL req).\n");
D_GOTO(out, rc = -DER_INVAL);
}

rpc_priv = container_of(req, struct crt_rpc_priv, crp_pub);
rpc_priv->crp_release_input_early = 1;

return crt_reply_send(req);

out:
return rc;
}

int
crt_reply_send(crt_rpc_t *req)
{
Expand Down
47 changes: 24 additions & 23 deletions src/cart/crt_rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,29 +166,30 @@ struct crt_rpc_priv {
* match with crp_req_hdr.cch_flags.
*/
uint32_t crp_flags;
uint32_t crp_srv:1, /* flag of server received request */
crp_output_got:1,
crp_input_got:1,
/* flag of collective RPC request */
crp_coll:1,
/* flag of crp_tgt_uri need to be freed */
crp_uri_free:1,
/* flag of forwarded rpc for corpc */
crp_forward:1,
/* flag of in timeout binheap */
crp_in_binheap:1,
/* set if a call to crt_req_reply pending */
crp_reply_pending:1,
/* set to 1 if target ep is set */
crp_have_ep:1,
/* RPC is tracked by the context */
crp_ctx_tracked:1,
/* 1 if RPC fails HLC epsilon check */
crp_fail_hlc:1,
/* RPC completed flag */
crp_completed:1,
/* RPC originated from a primary provider */
crp_src_is_primary:1;
uint32_t crp_srv : 1, /* flag of server received request */
crp_output_got : 1, crp_input_got : 1,
/* flag of collective RPC request */
crp_coll : 1,
/* flag of crp_tgt_uri need to be freed */
crp_uri_free : 1,
/* flag of forwarded rpc for corpc */
crp_forward : 1,
/* flag of in timeout binheap */
crp_in_binheap : 1,
/* set if a call to crt_req_reply pending */
crp_reply_pending : 1,
/* set to 1 if target ep is set */
crp_have_ep : 1,
/* RPC is tracked by the context */
crp_ctx_tracked : 1,
/* 1 if RPC fails HLC epsilon check */
crp_fail_hlc : 1,
/* RPC completed flag */
crp_completed : 1,
/* RPC originated from a primary provider */
crp_src_is_primary : 1,
/* release input buffer early */
crp_release_input_early : 1;

struct crt_opc_info *crp_opc_info;
/* corpc info, only valid when (crp_coll == 1) */
Expand Down
2 changes: 1 addition & 1 deletion src/dtx/dtx_srv.c
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ dtx_handler(crt_rpc_t *rpc)
dout->do_status = rc;
/* For DTX_COMMIT, it is the count of real committed DTX entries. */
dout->do_misc = committed;
rc = crt_reply_send(rpc);
rc = crt_reply_send_input_free(rpc);
if (rc != 0)
D_ERROR("send reply failed for DTX rpc %u: rc = "DF_RC"\n", opc,
DP_RC(rc));
Expand Down
15 changes: 15 additions & 0 deletions src/include/cart/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,21 @@ crt_req_send(crt_rpc_t *req, crt_cb_t complete_cb, void *arg);
int
crt_reply_send(crt_rpc_t *req);

/**
* Send an RPC reply and free the input buffer immediately.
* Only to be called on the server side.
*
* \param[in] req pointer to RPC request
*
* \return DER_SUCCESS on success, negative value if error
*
* \note the crt_rpc_t is exported to user, caller should fill the
* crt_rpc_t::cr_output before sending the RPC reply.
* See \ref crt_req_create.
*/
int
crt_reply_send_input_free(crt_rpc_t *req);

/**
* Return request buffer
*
Expand Down
22 changes: 16 additions & 6 deletions src/object/srv_obj.c
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ obj_rw_complete(crt_rpc_t *rpc, struct obj_io_context *ioc,
}

static void
obj_rw_reply(crt_rpc_t *rpc, int status, uint64_t epoch,
obj_rw_reply(crt_rpc_t *rpc, int status, uint64_t epoch, bool release_input,
struct obj_io_context *ioc)
{
struct obj_rw_out *orwo = crt_reply_get(rpc);
Expand All @@ -187,7 +187,11 @@ obj_rw_reply(crt_rpc_t *rpc, int status, uint64_t epoch,
ioc->ioc_map_ver, orwo->orw_epoch, status);

if (!ioc->ioc_lost_reply) {
rc = crt_reply_send(rpc);
if (release_input)
rc = crt_reply_send_input_free(rpc);
else
rc = crt_reply_send(rpc);

if (rc != 0)
D_ERROR("send reply failed: "DF_RC"\n", DP_RC(rc));
} else {
Expand Down Expand Up @@ -2079,6 +2083,8 @@ obj_ioc_init(uuid_t pool_uuid, uuid_t coh_uuid, uuid_t cont_uuid, crt_rpc_t *rpc

D_ASSERT(ioc != NULL);
memset(ioc, 0, sizeof(*ioc));

crt_req_addref(rpc);
ioc->ioc_rpc = rpc;
ioc->ioc_opc = opc_get(rpc->cr_opc);
rc = ds_cont_find_hdl(pool_uuid, coh_uuid, &coh);
Expand Down Expand Up @@ -2154,6 +2160,10 @@ obj_ioc_fini(struct obj_io_context *ioc, int err)
ds_cont_child_put(ioc->ioc_coc);
ioc->ioc_coc = NULL;
}
if (ioc->ioc_rpc) {
crt_req_decref(ioc->ioc_rpc);
ioc->ioc_rpc = NULL;
}
}

/* Setup lite IO context, it is only for compound RPC so far:
Expand Down Expand Up @@ -2508,7 +2518,7 @@ ds_obj_ec_rep_handler(crt_rpc_t *rpc)
rc = vos_obj_array_remove(ioc.ioc_coc->sc_hdl, oer->er_oid, &oer->er_epoch_range, dkey,
&iod->iod_name, &recx);
out:
obj_rw_reply(rpc, rc, 0, &ioc);
obj_rw_reply(rpc, rc, 0, false, &ioc);
obj_ioc_end(&ioc, rc);
}

Expand Down Expand Up @@ -2604,7 +2614,7 @@ ds_obj_ec_agg_handler(crt_rpc_t *rpc)
D_ERROR(DF_UOID ": array_remove failed: " DF_RC "\n", DP_UOID(oea->ea_oid),
DP_RC(rc1));
out:
obj_rw_reply(rpc, rc, 0, &ioc);
obj_rw_reply(rpc, rc, 0, false, &ioc);
obj_ioc_end(&ioc, rc);
}

Expand Down Expand Up @@ -2733,7 +2743,7 @@ ds_obj_tgt_update_handler(crt_rpc_t *rpc)
out:
if (dth != NULL)
rc = dtx_end(dth, ioc.ioc_coc, rc);
obj_rw_reply(rpc, rc, 0, &ioc);
obj_rw_reply(rpc, rc, 0, true, &ioc);
D_FREE(mbs);
obj_ioc_end(&ioc, rc);
}
Expand Down Expand Up @@ -3073,7 +3083,7 @@ ds_obj_rw_handler(crt_rpc_t *rpc)
if (ioc.ioc_map_ver < max_ver)
ioc.ioc_map_ver = max_ver;

obj_rw_reply(rpc, rc, epoch.oe_value, &ioc);
obj_rw_reply(rpc, rc, epoch.oe_value, false, &ioc);
D_FREE(mbs);
D_FREE(dti_cos);
obj_ioc_end(&ioc, rc);
Expand Down

0 comments on commit c30d619

Please sign in to comment.