From b95ef018191e1b940657910124a45d67422a6fa5 Mon Sep 17 00:00:00 2001 From: Niu Yawei Date: Mon, 9 Sep 2024 11:07:49 +0800 Subject: [PATCH] DAOS-10877 vos: gang allocation for huge SV (#14790) To avoid allocation failure on a fragmented system, huge SV allocation will be split into multiple smaller allocations, each allocation size is capped to 8MB (the DMA chunk size, that could avoid huge DMA buffer allocation). The address of such scattered SV payload is represented by 'gang address'. Removed io_allocbuf_failure() vos unit test, it's not applicable in gang SV mode now. Signed-off-by: Niu Yawei --- src/bio/bio_buffer.c | 6 +- src/bio/bio_bulk.c | 3 +- src/bio/bio_xstream.c | 3 +- src/include/daos/common.h | 2 +- src/include/daos_srv/bio.h | 84 ++++++++- src/include/daos_srv/vos_types.h | 3 + src/object/srv_enum.c | 5 +- src/vos/tests/vts_io.c | 118 ++++++------ src/vos/vos_common.c | 1 + src/vos/vos_internal.h | 37 ++-- src/vos/vos_io.c | 315 ++++++++++++++++++++++--------- src/vos/vos_layout.h | 5 +- src/vos/vos_obj.c | 2 + src/vos/vos_pool.c | 4 + src/vos/vos_space.c | 19 +- src/vos/vos_tree.c | 68 ++++++- 16 files changed, 473 insertions(+), 202 deletions(-) diff --git a/src/bio/bio_buffer.c b/src/bio/bio_buffer.c index 551c13db40e..1f6baae521b 100644 --- a/src/bio/bio_buffer.c +++ b/src/bio/bio_buffer.c @@ -34,11 +34,6 @@ dma_alloc_chunk(unsigned int cnt) D_ASSERT(bytes > 0); - if (DAOS_FAIL_CHECK(DAOS_NVME_ALLOCBUF_ERR)) { - D_ERROR("Injected DMA buffer allocation error.\n"); - return NULL; - } - D_ALLOC_PTR(chunk); if (chunk == NULL) { return NULL; @@ -848,6 +843,7 @@ dma_map_one(struct bio_desc *biod, struct bio_iov *biov, void *arg) bio_iov_set_raw_buf(biov, NULL); return 0; } + D_ASSERT(!BIO_ADDR_IS_GANG(&biov->bi_addr)); if (direct_scm_access(biod, biov)) { struct umem_instance *umem = biod->bd_umem; diff --git a/src/bio/bio_bulk.c b/src/bio/bio_bulk.c index 05d3c5624c8..059401460ee 100644 --- a/src/bio/bio_bulk.c +++ b/src/bio/bio_bulk.c @@ -1,5 +1,5 
@@ /** - * (C) Copyright 2021-2022 Intel Corporation. + * (C) Copyright 2021-2024 Intel Corporation. * * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -640,6 +640,7 @@ bulk_map_one(struct bio_desc *biod, struct bio_iov *biov, void *data) goto done; } D_ASSERT(!BIO_ADDR_IS_DEDUP(&biov->bi_addr)); + D_ASSERT(!BIO_ADDR_IS_GANG(&biov->bi_addr)); hdl = bulk_get_hdl(biod, biov, roundup_pgs(pg_cnt), pg_off, arg); if (hdl == NULL) { diff --git a/src/bio/bio_xstream.c b/src/bio/bio_xstream.c index 4bba7359a7e..a5a1868e811 100644 --- a/src/bio/bio_xstream.c +++ b/src/bio/bio_xstream.c @@ -30,7 +30,6 @@ /* SPDK blob parameters */ #define DAOS_BS_CLUSTER_SZ (1ULL << 25) /* 32MB */ /* DMA buffer parameters */ -#define DAOS_DMA_CHUNK_MB 8 /* 8MB DMA chunks */ #define DAOS_DMA_CHUNK_CNT_INIT 24 /* Per-xstream init chunks, 192MB */ #define DAOS_DMA_CHUNK_CNT_MAX 128 /* Per-xstream max chunks, 1GB */ #define DAOS_DMA_CHUNK_CNT_MIN 32 /* Per-xstream min chunks, 256MB */ @@ -207,7 +206,7 @@ bio_nvme_init(const char *nvme_conf, int numa_node, unsigned int mem_size, { char *env; int rc, fd; - unsigned int size_mb = DAOS_DMA_CHUNK_MB; + unsigned int size_mb = BIO_DMA_CHUNK_MB; if (tgt_nr <= 0) { D_ERROR("tgt_nr: %u should be > 0\n", tgt_nr); diff --git a/src/include/daos/common.h b/src/include/daos/common.h index f3e7c172f6a..6bad86f91b8 100644 --- a/src/include/daos/common.h +++ b/src/include/daos/common.h @@ -851,7 +851,7 @@ enum { #define DAOS_NVME_FAULTY (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0x50) #define DAOS_NVME_WRITE_ERR (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0x51) #define DAOS_NVME_READ_ERR (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0x52) -#define DAOS_NVME_ALLOCBUF_ERR (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0x53) +#define DAOS_NVME_ALLOCBUF_ERR (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0x53) /* deprecated */ #define DAOS_NVME_WAL_TX_LOST (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0x54) #define DAOS_POOL_CREATE_FAIL_CORPC (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0x60) diff --git a/src/include/daos_srv/bio.h 
b/src/include/daos_srv/bio.h index a69f456e232..c32202a1b19 100644 --- a/src/include/daos_srv/bio.h +++ b/src/include/daos_srv/bio.h @@ -29,30 +29,47 @@ ((addr)->ba_flags &= ~(BIO_FLAG_DEDUP_BUF)) #define BIO_ADDR_IS_CORRUPTED(addr) ((addr)->ba_flags & BIO_FLAG_CORRUPTED) #define BIO_ADDR_SET_CORRUPTED(addr) ((addr)->ba_flags |= BIO_FLAG_CORRUPTED) +#define BIO_ADDR_IS_GANG(addr) ((addr)->ba_flags & BIO_FLAG_GANG) +#define BIO_ADDR_SET_GANG(addr) ((addr)->ba_flags |= BIO_FLAG_GANG) /* Can support up to 16 flags for a BIO address */ enum BIO_FLAG { /* The address is a hole */ BIO_FLAG_HOLE = (1 << 0), - /* The address is a deduped extent */ + /* The address is a deduped extent, transient only flag */ BIO_FLAG_DEDUP = (1 << 1), - /* The address is a buffer for dedup verify */ + /* The address is a buffer for dedup verify, transient only flag */ BIO_FLAG_DEDUP_BUF = (1 << 2), + /* The data located on the address is marked as corrupted */ BIO_FLAG_CORRUPTED = (1 << 3), + /* The address is a gang address */ + BIO_FLAG_GANG = (1 << 4), }; +#define BIO_DMA_CHUNK_MB 8 /* 8MB DMA chunks */ + +/** + * It's used to represent an address on SCM, or an address on NVMe, or a gang address. + * + * The gang address consists of N addresses from scattered allocations, the scattered + * allocations could have different size and media type, they are compactly stored on + * the SCM pointing by 'ba_off' as following: + * + * N 64bits offsets, N 32bits sizes, N 8bits media types + */ typedef struct { /* - * Byte offset within PMDK pmemobj pool for SCM; + * Byte offset within PMDK pmemobj pool for SCM or gang address; * Byte offset within SPDK blob for NVMe. 
*/ uint64_t ba_off; /* DAOS_MEDIA_SCM or DAOS_MEDIA_NVME */ uint8_t ba_type; - uint8_t ba_pad1; + /* Number of addresses when BIO_FLAG_GANG is set */ + uint8_t ba_gang_nr; /* See BIO_FLAG enum */ uint16_t ba_flags; - uint32_t ba_pad2; + uint32_t ba_pad; } bio_addr_t; struct sys_db; @@ -127,8 +144,63 @@ enum bio_bs_state { BIO_BS_STATE_SETUP, }; +/* Size for storing N offset + size + metia_type */ +static inline unsigned int +bio_gaddr_size(uint8_t gang_nr) +{ + unsigned int size; + + if (gang_nr == 0) + return 0; + + size = sizeof(uint64_t) + sizeof(uint32_t) + sizeof(uint8_t); + return roundup(size * gang_nr, sizeof(uint64_t)); +} + +static inline void +bio_gaddr_set(struct umem_instance *umm, bio_addr_t *gaddr, int i, + uint8_t type, uint32_t len, uint64_t off) +{ + uint8_t *ptr; + unsigned int ptr_off; + + D_ASSERT(BIO_ADDR_IS_GANG(gaddr)); + D_ASSERT(i < gaddr->ba_gang_nr); + ptr = umem_off2ptr(umm, gaddr->ba_off); + + ptr_off = sizeof(uint64_t) * i; + *((uint64_t *)(ptr + ptr_off)) = off; + + ptr_off = sizeof(uint64_t) * gaddr->ba_gang_nr + sizeof(uint32_t) * i; + *((uint32_t *)(ptr + ptr_off)) = len; + + ptr_off = (sizeof(uint64_t) + sizeof(uint32_t)) * gaddr->ba_gang_nr + i; + *(ptr + ptr_off) = type; +} + +static inline void +bio_gaddr_get(struct umem_instance *umm, bio_addr_t *gaddr, int i, + uint8_t *type, uint32_t *len, uint64_t *off) +{ + uint8_t *ptr; + unsigned int ptr_off; + + D_ASSERT(BIO_ADDR_IS_GANG(gaddr)); + D_ASSERT(i < gaddr->ba_gang_nr); + ptr = umem_off2ptr(umm, gaddr->ba_off); + + ptr_off = sizeof(uint64_t) * i; + *off = *((uint64_t *)(ptr + ptr_off)); + + ptr_off = sizeof(uint64_t) * gaddr->ba_gang_nr + sizeof(uint32_t) * i; + *len = *((uint32_t *)(ptr + ptr_off)); + + ptr_off = (sizeof(uint64_t) + sizeof(uint32_t)) * gaddr->ba_gang_nr + i; + *type = *(ptr + ptr_off); +} + static inline void -bio_addr_set(bio_addr_t *addr, uint16_t type, uint64_t off) +bio_addr_set(bio_addr_t *addr, uint8_t type, uint64_t off) { addr->ba_type = type; 
addr->ba_off = umem_off2offset(off); diff --git a/src/include/daos_srv/vos_types.h b/src/include/daos_srv/vos_types.h index 194b2434c28..b57220f9a7c 100644 --- a/src/include/daos_srv/vos_types.h +++ b/src/include/daos_srv/vos_types.h @@ -21,6 +21,7 @@ #define VOS_POOL_DF_2_2 24 #define VOS_POOL_DF_2_4 25 #define VOS_POOL_DF_2_6 26 +#define VOS_POOL_DF_2_8 28 struct dtx_rsrvd_uint { void *dru_scm; @@ -299,6 +300,8 @@ enum { VOS_POOL_FEAT_EMBED_FIRST = (1ULL << 3), /** Flat DKEY support enabled */ VOS_POOL_FEAT_FLAT_DKEY = (1ULL << 4), + /** Gang address for SV support */ + VOS_POOL_FEAT_GANG_SV = (1ULL << 5), }; /** Mask for any conditionals passed to to the fetch */ diff --git a/src/object/srv_enum.c b/src/object/srv_enum.c index bb9c49d0566..e1513f02f7f 100644 --- a/src/object/srv_enum.c +++ b/src/object/srv_enum.c @@ -1,5 +1,5 @@ /* - * (C) Copyright 2018-2022 Intel Corporation. + * (C) Copyright 2018-2024 Intel Corporation. * * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -617,7 +617,8 @@ fill_rec(daos_handle_t ih, vos_iter_entry_t *key_ent, struct ds_obj_enum_arg *ar * enum pack implementation doesn't support yield & re-probe. 
*/ if (arg->inline_thres > 0 && data_size <= arg->inline_thres && - data_size > 0 && bio_iov2media(&key_ent->ie_biov) != DAOS_MEDIA_NVME) { + data_size > 0 && bio_iov2media(&key_ent->ie_biov) != DAOS_MEDIA_NVME && + !BIO_ADDR_IS_GANG(&key_ent->ie_biov.bi_addr)) { inline_data = true; size += data_size; } diff --git a/src/vos/tests/vts_io.c b/src/vos/tests/vts_io.c index 93bb20d4906..2f084a2d99d 100644 --- a/src/vos/tests/vts_io.c +++ b/src/vos/tests/vts_io.c @@ -2955,78 +2955,90 @@ io_query_key_negative(void **state) assert_rc_equal(rc, -DER_INVAL); } -static inline int -dummy_bulk_create(void *ctxt, d_sg_list_t *sgl, unsigned int perm, void **bulk_hdl) +static int +gang_sv_io(struct io_test_args *arg, daos_epoch_t epoch, char *dkey_buf, char *akey_buf, + char *update_buf, char *fetch_buf, daos_size_t rsize) { - return 0; -} + daos_iod_t iod = { 0 }; + daos_key_t dkey, akey; + d_iov_t val_iov; + d_sg_list_t sgl = { 0 }; + int rc; + + set_iov(&dkey, dkey_buf, is_daos_obj_type_set(arg->otype, DAOS_OT_DKEY_UINT64)); + set_iov(&akey, akey_buf, is_daos_obj_type_set(arg->otype, DAOS_OT_AKEY_UINT64)); + + iod.iod_name = akey; + iod.iod_type = DAOS_IOD_SINGLE; + iod.iod_size = rsize; + iod.iod_nr = 1; + + dts_buf_render(update_buf, rsize); + d_iov_set(&val_iov, update_buf, rsize); + sgl.sg_nr = 1; + sgl.sg_iovs = &val_iov; + + rc = io_test_obj_update(arg, epoch, 0, &dkey, &iod, &sgl, NULL, true); + if (rc) + return rc; + + memset(fetch_buf, 0, rsize); + d_iov_set(&val_iov, fetch_buf, rsize); + iod.iod_size = DAOS_REC_ANY; + + rc = io_test_obj_fetch(arg, epoch, 0, &dkey, &iod, &sgl, true); + if (rc) + return rc; + + /* Verify */ + assert_int_equal(iod.iod_size, rsize); + assert_memory_equal(update_buf, fetch_buf, rsize); -static inline int -dummy_bulk_free(void *bulk_hdl) -{ return 0; } -/* Verify the fix of DAOS-10748 */ static void -io_allocbuf_failure(void **state) +gang_sv_test(void **state) { struct io_test_args *arg = *state; - char dkey_buf[UPDATE_DKEY_SIZE] = { 0 }; 
- char akey_buf[UPDATE_AKEY_SIZE] = { 0 }; - daos_iod_t iod = { 0 }; - d_sg_list_t sgl = { 0 }; - daos_key_t dkey_iov, akey_iov; - daos_epoch_t epoch = 1; - char *buf; - daos_handle_t ioh; - int fake_ctxt; - daos_size_t buf_len = (40UL << 20); /* 40MB, larger than DMA chunk size */ - int rc; + char dkey_buf[UPDATE_DKEY_SIZE], akey_buf[UPDATE_AKEY_SIZE]; + char *update_buf, *fetch_buf; + daos_size_t rsize = (27UL << 20); /* 27MB */ + daos_epoch_t epoch = 1; + int rc; + + D_ALLOC(update_buf, rsize); + assert_non_null(update_buf); - FAULT_INJECTION_REQUIRED(); + D_ALLOC(fetch_buf, rsize); + assert_non_null(fetch_buf); vts_key_gen(&dkey_buf[0], arg->dkey_size, true, arg); vts_key_gen(&akey_buf[0], arg->akey_size, false, arg); - set_iov(&dkey_iov, &dkey_buf[0], is_daos_obj_type_set(arg->otype, DAOS_OT_DKEY_UINT64)); - set_iov(&akey_iov, &akey_buf[0], is_daos_obj_type_set(arg->otype, DAOS_OT_AKEY_UINT64)); - rc = d_sgl_init(&sgl, 1); + print_message("Gang SV update/fetch.\n"); + rc = gang_sv_io(arg, epoch, dkey_buf, akey_buf, update_buf, fetch_buf, rsize); assert_rc_equal(rc, 0); - D_ALLOC(buf, buf_len); - assert_non_null(buf); - - sgl.sg_iovs[0].iov_buf = buf; - sgl.sg_iovs[0].iov_buf_len = buf_len; - sgl.sg_iovs[0].iov_len = buf_len; - - iod.iod_name = akey_iov; - iod.iod_nr = 1; - iod.iod_type = DAOS_IOD_SINGLE; - iod.iod_size = buf_len; - iod.iod_recxs = NULL; - + print_message("Gang SV ZC update/fetch.\n"); + epoch++; arg->ta_flags |= TF_ZERO_COPY; - - bio_register_bulk_ops(dummy_bulk_create, dummy_bulk_free); - daos_fail_loc_set(DAOS_NVME_ALLOCBUF_ERR | DAOS_FAIL_ONCE); - - rc = vos_update_begin(arg->ctx.tc_co_hdl, arg->oid, epoch, 0, &dkey_iov, - 1, &iod, NULL, 0, &ioh, NULL); + rc = gang_sv_io(arg, epoch, dkey_buf, akey_buf, update_buf, fetch_buf, rsize); assert_rc_equal(rc, 0); - rc = bio_iod_prep(vos_ioh2desc(ioh), BIO_CHK_TYPE_IO, (void *)&fake_ctxt, 0); - assert_rc_equal(rc, -DER_NOMEM); - daos_fail_loc_set(0); - bio_register_bulk_ops(NULL, NULL); + 
print_message("Gang SV update/fetch with CSUM.\n"); + epoch++; + arg->ta_flags &= ~TF_ZERO_COPY; + arg->ta_flags |= TF_USE_CSUMS; + rc = gang_sv_io(arg, epoch, dkey_buf, akey_buf, update_buf, fetch_buf, rsize); + assert_rc_equal(rc, 0); - rc = vos_update_end(ioh, 0, &dkey_iov, rc, NULL, NULL); - assert_rc_equal(rc, -DER_NOMEM); + print_message("Gang SV overwrite with CSUM.\n"); + rc = gang_sv_io(arg, epoch, dkey_buf, akey_buf, update_buf, fetch_buf, rsize); + assert_rc_equal(rc, 0); - d_sgl_fini(&sgl, false); - D_FREE(buf); - arg->ta_flags &= ~TF_ZERO_COPY; + D_FREE(update_buf); + D_FREE(fetch_buf); } static const struct CMUnitTest iterator_tests[] = { @@ -3074,7 +3086,7 @@ static const struct CMUnitTest int_tests[] = { NULL}, {"VOS300.2: Key query test", io_query_key, NULL, NULL}, {"VOS300.3: Key query negative test", io_query_key_negative, NULL, NULL}, - {"VOS300.4: Return error on DMA buffer allocation failure", io_allocbuf_failure, NULL, NULL}, + {"VOS300.4: Gang SV update/fetch test", gang_sv_test, NULL, NULL}, }; static int diff --git a/src/vos/vos_common.c b/src/vos/vos_common.c index dbb8d28fd04..cd2f2a5a693 100644 --- a/src/vos/vos_common.c +++ b/src/vos/vos_common.c @@ -123,6 +123,7 @@ vos_bio_addr_free(struct vos_pool *pool, bio_addr_t *addr, daos_size_t nob) if (bio_addr_is_hole(addr)) return 0; + D_ASSERT(!BIO_ADDR_IS_GANG(addr)); if (addr->ba_type == DAOS_MEDIA_SCM) { rc = umem_free(&pool->vp_umm, addr->ba_off); } else { diff --git a/src/vos/vos_internal.h b/src/vos/vos_internal.h index f6a74fce7e6..9441ba45265 100644 --- a/src/vos/vos_internal.h +++ b/src/vos/vos_internal.h @@ -918,33 +918,31 @@ static inline void vos_irec_init_csum(struct vos_irec_df *irec, } } -/** Size of metadata without user payload */ -static inline uint64_t -vos_irec_msize(struct vos_rec_bundle *rbund) +#define VOS_GANG_SIZE_THRESH (BIO_DMA_CHUNK_MB << 20) /* 8MB */ + +static inline unsigned int +vos_irec_gang_nr(struct vos_pool *pool, daos_size_t rsize) { - uint64_t size = 0; 
+ if (pool->vp_feats & VOS_POOL_FEAT_GANG_SV) { + if (rsize > VOS_GANG_SIZE_THRESH) + return (rsize + VOS_GANG_SIZE_THRESH - 1) / VOS_GANG_SIZE_THRESH; + } - if (rbund->rb_csum != NULL) - size = vos_size_round(rbund->rb_csum->cs_len); - return size + sizeof(struct vos_irec_df); + return 0; } +/** Size of metadata without user payload */ static inline uint64_t -vos_irec_size(struct vos_rec_bundle *rbund) +vos_irec_msize(struct vos_pool *pool, struct vos_rec_bundle *rbund) { - return vos_irec_msize(rbund) + rbund->rb_rsize; -} + uint64_t size = sizeof(struct vos_irec_df); -static inline bool -vos_irec_size_equal(struct vos_irec_df *irec, struct vos_rec_bundle *rbund) -{ - if (irec->ir_size != rbund->rb_rsize) - return false; + if (rbund->rb_csum != NULL) + size += vos_size_round(rbund->rb_csum->cs_len); - if (vos_irec2csum_size(irec) != rbund->rb_csum->cs_len) - return false; + size += bio_gaddr_size(vos_irec_gang_nr(pool, rbund->rb_rsize)); - return true; + return size; } static inline char * @@ -1300,9 +1298,6 @@ int key_tree_delete(struct vos_object *obj, daos_handle_t toh, d_iov_t *key_iov); /* vos_io.c */ -daos_size_t -vos_recx2irec_size(daos_size_t rsize, struct dcs_csum_info *csum); - int vos_dedup_init(struct vos_pool *pool); void diff --git a/src/vos/vos_io.c b/src/vos/vos_io.c index 4d452f50d6a..7aa3c897755 100644 --- a/src/vos/vos_io.c +++ b/src/vos/vos_io.c @@ -20,6 +20,11 @@ #include "evt_priv.h" #include +struct vos_sv_addr { + umem_off_t sa_umoff; /* SV record address */ + bio_addr_t sa_addr; /* SV payload address */ +}; + /** I/O context */ struct vos_io_context { EVT_ENT_ARRAY_LG_PTR(ic_ent_array); @@ -49,7 +54,10 @@ struct vos_io_context { /** reserved offsets for SCM update */ umem_off_t *ic_umoffs; unsigned int ic_umoffs_cnt; - unsigned int ic_umoffs_at; + /** reserved SV addresses */ + struct vos_sv_addr *ic_sv_addrs; + unsigned int ic_sv_addr_cnt; + unsigned int ic_sv_addr_at; /** reserved NVMe extents */ d_list_t ic_blk_exts; daos_size_t 
ic_space_held[DAOS_MEDIA_MAX]; @@ -518,6 +526,7 @@ vos_ioc_reserve_fini(struct vos_io_context *ioc) D_ASSERT(d_list_empty(&ioc->ic_blk_exts)); D_ASSERT(d_list_empty(&ioc->ic_dedup_entries)); D_FREE(ioc->ic_umoffs); + D_FREE(ioc->ic_sv_addrs); } static int @@ -525,6 +534,7 @@ vos_ioc_reserve_init(struct vos_io_context *ioc, struct dtx_handle *dth) { struct umem_rsrvd_act *scm; int total_acts = 0; + unsigned int gang_nr, sv_nr = 0; int i; if (!ioc->ic_update) @@ -533,9 +543,26 @@ vos_ioc_reserve_init(struct vos_io_context *ioc, struct dtx_handle *dth) for (i = 0; i < ioc->ic_iod_nr; i++) { daos_iod_t *iod = &ioc->ic_iods[i]; + if (iod->iod_type == DAOS_IOD_SINGLE) { + gang_nr = vos_irec_gang_nr(ioc->ic_cont->vc_pool, iod->iod_size); + if (gang_nr > UINT8_MAX) { + D_ERROR("Too large SV:"DF_U64", gang_nr:%u\n", + iod->iod_size, gang_nr); + return -DER_REC2BIG; + } + total_acts += gang_nr; + sv_nr++; + } total_acts += iod->iod_nr; } + if (sv_nr > 0) { + D_ALLOC_ARRAY(ioc->ic_sv_addrs, sv_nr); + if (ioc->ic_sv_addrs == NULL) + return -DER_NOMEM; + ioc->ic_sv_addr_cnt = sv_nr; + } + D_ALLOC_ARRAY(ioc->ic_umoffs, total_acts); if (ioc->ic_umoffs == NULL) return -DER_NOMEM; @@ -684,7 +711,7 @@ vos_ioc_create(daos_handle_t coh, daos_unit_oid_t oid, bool read_only, ioc->ic_remove = ((vos_flags & VOS_OF_REMOVE) != 0); ioc->ic_ec = ((vos_flags & VOS_OF_EC) != 0); ioc->ic_rebuild = ((vos_flags & VOS_OF_REBUILD) != 0); - ioc->ic_umoffs_cnt = ioc->ic_umoffs_at = 0; + ioc->ic_umoffs_cnt = 0; ioc->ic_iod_csums = iod_csums; vos_ilog_fetch_init(&ioc->ic_dkey_info); vos_ilog_fetch_init(&ioc->ic_akey_info); @@ -740,13 +767,26 @@ vos_ioc_create(daos_handle_t coh, daos_unit_oid_t oid, bool read_only, for (i = 0; i < iod_nr; i++) { int iov_nr = iods[i].iod_nr; + unsigned int gang_nr; struct bio_sglist *bsgl; - if ((iods[i].iod_type == DAOS_IOD_SINGLE && iov_nr != 1)) { - D_ERROR("Invalid iod_nr=%d, iod_type %d.\n", - iov_nr, iods[i].iod_type); - rc = -DER_IO_INVAL; - goto error; + if 
(iods[i].iod_type == DAOS_IOD_SINGLE) { + if (iov_nr != 1) { + D_ERROR("Invalid iod_nr=%d, iod_type %d.\n", + iov_nr, iods[i].iod_type); + rc = -DER_IO_INVAL; + goto error; + } + + gang_nr = vos_irec_gang_nr(cont->vc_pool, iods[i].iod_size); + if (gang_nr > UINT8_MAX) { + D_ERROR("Too large SV:"DF_U64", gang_nr:%u\n", + iods[i].iod_size, gang_nr); + rc = -DER_REC2BIG; + goto error; + } + if (gang_nr > 1) + iov_nr = gang_nr; } /* Don't bother to initialize SGLs for size fetch */ @@ -819,6 +859,55 @@ save_csum(struct vos_io_context *ioc, struct dcs_csum_info *csum_info, return dcs_csum_info_save(&ioc->ic_csum_list, &ci_duplicate); } +static int +iod_gang_fetch(struct vos_io_context *ioc, struct bio_iov *biov) +{ + struct bio_iov sub_iov = { 0 }; + uint64_t tot_len; + uint32_t data_len; + int i, rc = 0; + + if (ioc->ic_size_fetch) + return 0; + + if (biov->bi_addr.ba_gang_nr < 2) { + D_ERROR("Invalid gang address nr:%u\n", biov->bi_addr.ba_gang_nr); + return -DER_INVAL; + } + + tot_len = bio_iov2len(biov); + if (tot_len == 0) { + D_ERROR("Invalid gang addr, nr:%u, rsize:"DF_U64"\n", + biov->bi_addr.ba_gang_nr, bio_iov2len(biov)); + return -DER_INVAL; + } + + for (i = 0; i < biov->bi_addr.ba_gang_nr; i++) { + bio_gaddr_get(vos_ioc2umm(ioc), &biov->bi_addr, i, &sub_iov.bi_addr.ba_type, + &data_len, &sub_iov.bi_addr.ba_off); + + bio_iov_set_len(&sub_iov, data_len); + if (tot_len < data_len) { + D_ERROR("Invalid gang addr[%d], nr:%u, rsize:"DF_U64", len:"DF_U64"/%u\n", + i, biov->bi_addr.ba_gang_nr, bio_iov2len(biov), tot_len, data_len); + return -DER_INVAL; + } + tot_len -= data_len; + + rc = iod_fetch(ioc, &sub_iov); + if (rc) + return rc; + } + + if (tot_len != 0) { + D_ERROR("Invalid gang addr, nr:%u, rsize:"DF_U64", left:"DF_U64"\n", + biov->bi_addr.ba_gang_nr, bio_iov2len(biov), tot_len); + return -DER_INVAL; + } + + return 0; +} + /** Fetch the single value within the specified epoch range of an key */ static int akey_fetch_single(daos_handle_t toh, const 
daos_epoch_range_t *epr, @@ -873,7 +962,11 @@ akey_fetch_single(daos_handle_t toh, const daos_epoch_range_t *epr, return -DER_CSUM; } - rc = iod_fetch(ioc, &biov); + if (BIO_ADDR_IS_HOLE(&biov.bi_addr) || !BIO_ADDR_IS_GANG(&biov.bi_addr)) + rc = iod_fetch(ioc, &biov); + else + rc = iod_gang_fetch(ioc, &biov); + if (rc != 0) goto out; @@ -1612,21 +1705,6 @@ vos_fetch_begin(daos_handle_t coh, daos_unit_oid_t oid, daos_epoch_t epoch, return rc; } -static umem_off_t -iod_update_umoff(struct vos_io_context *ioc) -{ - umem_off_t umoff; - - D_ASSERTF(ioc->ic_umoffs_at < ioc->ic_umoffs_cnt, - "Invalid ioc_reserve at/cnt: %u/%u\n", - ioc->ic_umoffs_at, ioc->ic_umoffs_cnt); - - umoff = ioc->ic_umoffs[ioc->ic_umoffs_at]; - ioc->ic_umoffs_at++; - - return umoff; -} - static struct bio_iov * iod_update_biov(struct vos_io_context *ioc) { @@ -1643,6 +1721,20 @@ iod_update_biov(struct vos_io_context *ioc) return biov; } +static inline struct vos_sv_addr * +iod_get_sv_addr(struct vos_io_context *ioc) +{ + struct vos_sv_addr *sv_addr; + + D_ASSERTF(ioc->ic_sv_addr_at < ioc->ic_sv_addr_cnt, "sv_at:%u >= sv_cnt:%u\n", + ioc->ic_sv_addr_at, ioc->ic_sv_addr_cnt); + + sv_addr = &ioc->ic_sv_addrs[ioc->ic_sv_addr_at]; + ioc->ic_sv_addr_at++; + + return sv_addr; +} + static int akey_update_single(daos_handle_t toh, uint32_t pm_ver, daos_size_t rsize, daos_size_t gsize, struct vos_io_context *ioc, @@ -1652,22 +1744,22 @@ akey_update_single(daos_handle_t toh, uint32_t pm_ver, daos_size_t rsize, struct vos_rec_bundle rbund; struct dcs_csum_info csum; d_iov_t kiov, riov; - struct bio_iov *biov; + struct bio_iov biov; struct dcs_csum_info *value_csum; - umem_off_t umoff; + struct vos_sv_addr *sv_addr; daos_epoch_t epoch = ioc->ic_epr.epr_hi; int rc; + D_ASSERT(ioc->ic_iov_at == 0); + ci_set_null(&csum); d_iov_set(&kiov, &key, sizeof(key)); key.sk_epoch = epoch; key.sk_minor_epc = minor_epc; - umoff = iod_update_umoff(ioc); - D_ASSERT(!UMOFF_IS_NULL(umoff)); - - D_ASSERT(ioc->ic_iov_at == 0); - 
biov = iod_update_biov(ioc); + sv_addr = iod_get_sv_addr(ioc); + D_ASSERT(!UMOFF_IS_NULL(sv_addr->sa_umoff)); + bio_iov_set(&biov, sv_addr->sa_addr, rsize); tree_rec_bundle2iov(&rbund, &riov); @@ -1678,10 +1770,10 @@ akey_update_single(daos_handle_t toh, uint32_t pm_ver, daos_size_t rsize, else rbund.rb_csum = &csum; - rbund.rb_biov = biov; + rbund.rb_biov = &biov; rbund.rb_rsize = rsize; rbund.rb_gsize = gsize; - rbund.rb_off = umoff; + rbund.rb_off = sv_addr->sa_umoff; rbund.rb_ver = pm_ver; rc = dbtree_update(toh, &kiov, &riov); @@ -1830,10 +1922,7 @@ update_value(struct vos_io_context *ioc, daos_iod_t *iod, struct dcs_csum_info * } for (i = 0; i < iod->iod_nr; i++) { - umem_off_t umoff = iod_update_umoff(ioc); - if (iod->iod_recxs[i].rx_nr == 0) { - D_ASSERT(UMOFF_IS_NULL(umoff)); D_DEBUG(DB_IO, "Skip empty write IOD at %d: idx %lu, nr %lu\n", i, (unsigned long)iod->iod_recxs[i].rx_idx, (unsigned long)iod->iod_recxs[i].rx_nr); @@ -1997,6 +2086,7 @@ dkey_update(struct vos_io_context *ioc, uint32_t pm_ver, daos_key_t *dkey, goto out; } + ioc->ic_sv_addr_at = 0; if (krec->kr_bmap & KREC_BF_NO_AKEY) { struct dcs_csum_info *iod_csums = vos_csum_at(ioc->ic_iod_csums, 0); iod_set_cursor(ioc, 0); @@ -2027,17 +2117,6 @@ dkey_update(struct vos_io_context *ioc, uint32_t pm_ver, daos_key_t *dkey, return rc; } -daos_size_t -vos_recx2irec_size(daos_size_t rsize, struct dcs_csum_info *csum) -{ - struct vos_rec_bundle rbund; - - rbund.rb_csum = csum; - rbund.rb_rsize = rsize; - - return vos_irec_size(&rbund); -} - umem_off_t vos_reserve_scm(struct vos_container *cont, struct umem_rsrvd_act *rsrvd_scm, daos_size_t size) @@ -2127,7 +2206,7 @@ reserve_space(struct vos_io_context *ioc, uint16_t media, daos_size_t size, return rc; } -static int +static void iod_reserve(struct vos_io_context *ioc, struct bio_iov *biov) { struct bio_sglist *bsgl; @@ -2144,37 +2223,102 @@ iod_reserve(struct vos_io_context *ioc, struct bio_iov *biov) D_DEBUG(DB_TRACE, "media %d offset "DF_X64" size 
%zd\n", biov->bi_addr.ba_type, biov->bi_addr.ba_off, bio_iov2len(biov)); +} + +static inline void +iod_set_sv_addr(struct vos_io_context *ioc, umem_off_t umoff, bio_addr_t *addr) +{ + struct vos_sv_addr *sv_addr; + + D_ASSERTF(ioc->ic_sv_addr_at < ioc->ic_sv_addr_cnt, "sv_at:%u >= sv_cnt:%u\n", + ioc->ic_sv_addr_at, ioc->ic_sv_addr_cnt); + + sv_addr = &ioc->ic_sv_addrs[ioc->ic_sv_addr_at]; + sv_addr->sa_umoff = umoff; + sv_addr->sa_addr = *addr; + ioc->ic_sv_addr_at++; +} + +static int +gang_reserve_sv(struct vos_io_context *ioc, uint16_t media, daos_size_t size, + umem_off_t umoff, unsigned int gang_nr) +{ + struct vos_irec_df *irec; + struct bio_iov biov = { 0 }; + bio_addr_t gaddr = { 0 }; + daos_size_t alloc_sz; + uint64_t off; + char *gaddr_ptr; + int i, rc; + + D_ASSERT(gang_nr > 1); + D_ASSERT(size > VOS_GANG_SIZE_THRESH); + + irec = (struct vos_irec_df *)umem_off2ptr(vos_ioc2umm(ioc), umoff); + gaddr_ptr = vos_irec2data(irec); + + bio_addr_set(&gaddr, DAOS_MEDIA_SCM, umem_ptr2off(vos_ioc2umm(ioc), gaddr_ptr)); + gaddr.ba_gang_nr = gang_nr; + BIO_ADDR_SET_GANG(&gaddr); + + iod_set_sv_addr(ioc, umoff, &gaddr); + + for (i = 0; i < gang_nr; i++) { + D_ASSERT(size > 0); + alloc_sz = min(size, VOS_GANG_SIZE_THRESH); + + rc = reserve_space(ioc, media, alloc_sz, &off); + if (rc) { + DL_ERROR(rc, "Reserve SV on %s failed.", + media == DAOS_MEDIA_SCM ? "SCM" : "NVMe"); + return rc; + } + + bio_addr_set(&biov.bi_addr, media, off); + bio_iov_set_len(&biov, alloc_sz); + iod_reserve(ioc, &biov); + + /* + * Update the SV record metadata on SCM, tx_add_range() will be called by + * svt_rec_alloc_common() later. 
+ */ + bio_gaddr_set(vos_ioc2umm(ioc), &gaddr, i, media, alloc_sz, off); + + size -= alloc_sz; + } + D_ASSERT(size == 0); + return 0; } /* Reserve single value record on specified media */ static int -vos_reserve_single(struct vos_io_context *ioc, uint16_t media, - daos_size_t size) +vos_reserve_single(struct vos_io_context *ioc, uint16_t media, daos_size_t size) { struct vos_irec_df *irec; daos_size_t scm_size; umem_off_t umoff; struct bio_iov biov; uint64_t off = 0; - int rc; + struct vos_rec_bundle rbund = { 0 }; + int rc, gang_nr; struct dcs_csum_info *value_csum = vos_csum_at(ioc->ic_iod_csums, ioc->ic_sgl_at); - /* - * TODO: - * To eliminate internal fragmentaion, misaligned record (record size - * isn't aligned with 4K) on NVMe could be split into two parts, large - * aligned part will be stored on NVMe and being referenced by - * vos_irec_df->ir_ex_addr, small unaligned part will be stored on SCM - * along with vos_irec_df, being referenced by vos_irec_df->ir_body. - */ - scm_size = (media == DAOS_MEDIA_SCM) ? - vos_recx2irec_size(size, value_csum) : - vos_recx2irec_size(0, value_csum); + gang_nr = vos_irec_gang_nr(ioc->ic_cont->vc_pool, size); + D_ASSERT(gang_nr <= UINT8_MAX); + + rbund.rb_csum = value_csum; + rbund.rb_rsize = size; + scm_size = vos_irec_msize(ioc->ic_cont->vc_pool, &rbund); + /* Payload is allocated along with the SV meta record */ + if (media == DAOS_MEDIA_SCM && gang_nr == 0) + scm_size += size; + + /* Reserve SCM for SV meta record */ rc = reserve_space(ioc, DAOS_MEDIA_SCM, scm_size, &off); if (rc) { - D_ERROR("Reserve SCM for SV failed. 
"DF_RC"\n", DP_RC(rc)); + DL_ERROR(rc, "Reserve SCM for SV meta failed."); return rc; } @@ -2183,13 +2327,14 @@ vos_reserve_single(struct vos_io_context *ioc, uint16_t media, irec = (struct vos_irec_df *)umem_off2ptr(vos_ioc2umm(ioc), umoff); vos_irec_init_csum(irec, value_csum); + /* The SV is huge, turn to gang allocation */ + if (gang_nr > 0) + return gang_reserve_sv(ioc, media, size, umoff, gang_nr); + memset(&biov, 0, sizeof(biov)); if (size == 0) { /* punch */ bio_addr_set_hole(&biov.bi_addr, 1); - goto done; - } - - if (media == DAOS_MEDIA_SCM) { + } else if (media == DAOS_MEDIA_SCM) { char *payload_addr; /* Get the record payload offset */ @@ -2199,15 +2344,16 @@ vos_reserve_single(struct vos_io_context *ioc, uint16_t media, } else { rc = reserve_space(ioc, DAOS_MEDIA_NVME, size, &off); if (rc) { - D_ERROR("Reserve NVMe for SV failed. "DF_RC"\n", - DP_RC(rc)); + DL_ERROR(rc, "Reserve SV on NVMe failed."); return rc; } } -done: + bio_addr_set(&biov.bi_addr, media, off); bio_iov_set_len(&biov, size); - rc = iod_reserve(ioc, &biov); + iod_reserve(ioc, &biov); + + iod_set_sv_addr(ioc, umoff, &biov.bi_addr); return rc; } @@ -2218,38 +2364,25 @@ vos_reserve_recx(struct vos_io_context *ioc, uint16_t media, daos_size_t size, { struct bio_iov biov; uint64_t off = 0; - int rc; + int rc = 0; memset(&biov, 0, sizeof(biov)); /* recx punch */ - if (size == 0 || media != DAOS_MEDIA_SCM) { - ioc->ic_umoffs[ioc->ic_umoffs_cnt] = UMOFF_NULL; - ioc->ic_umoffs_cnt++; - if (size == 0) { - bio_addr_set_hole(&biov.bi_addr, 1); - goto done; - } + if (size == 0) { + bio_addr_set_hole(&biov.bi_addr, 1); + goto done; } if (ioc->ic_dedup && size >= ioc->ic_dedup_th && - vos_dedup_lookup(vos_cont2pool(ioc->ic_cont), csum, csum_len, - &biov)) { + vos_dedup_lookup(vos_cont2pool(ioc->ic_cont), csum, csum_len, &biov)) { if (biov.bi_data_len == size) { D_ASSERT(biov.bi_addr.ba_off != 0); - ioc->ic_umoffs[ioc->ic_umoffs_cnt] = - biov.bi_addr.ba_off; - ioc->ic_umoffs_cnt++; - return 
iod_reserve(ioc, &biov); + iod_reserve(ioc, &biov); + return 0; } memset(&biov, 0, sizeof(biov)); } - /* - * TODO: - * To eliminate internal fragmentaion, misaligned recx (total recx size - * isn't aligned with 4K) on NVMe could be split into two evtree rects, - * larger rect will be stored on NVMe and small reminder on SCM. - */ rc = reserve_space(ioc, media, size, &off); if (rc) { D_ERROR("Reserve recx failed. "DF_RC"\n", DP_RC(rc)); @@ -2258,7 +2391,7 @@ vos_reserve_recx(struct vos_io_context *ioc, uint16_t media, daos_size_t size, done: bio_addr_set(&biov.bi_addr, media, off); bio_iov_set_len(&biov, size); - rc = iod_reserve(ioc, &biov); + iod_reserve(ioc, &biov); return rc; } diff --git a/src/vos/vos_layout.h b/src/vos/vos_layout.h index 72459544c27..902cb064e26 100644 --- a/src/vos/vos_layout.h +++ b/src/vos/vos_layout.h @@ -91,7 +91,7 @@ enum vos_gc_type { */ /** Current durable format version */ -#define POOL_DF_VERSION VOS_POOL_DF_2_6 +#define POOL_DF_VERSION VOS_POOL_DF_2_8 /** 2.2 features. Until we have an upgrade path for RDB, we need to support more than one old * version. 
@@ -104,6 +104,9 @@ enum vos_gc_type { /** 2.6 features */ #define VOS_POOL_FEAT_2_6 (VOS_POOL_FEAT_FLAT_DKEY | VOS_POOL_FEAT_EMBED_FIRST) +/** 2.8 features */ +#define VOS_POOL_FEAT_2_8 (VOS_POOL_FEAT_GANG_SV) + /** * Durable format for VOS pool */ diff --git a/src/vos/vos_obj.c b/src/vos/vos_obj.c index 77cb041711f..cc72575f608 100644 --- a/src/vos/vos_obj.c +++ b/src/vos/vos_obj.c @@ -1652,6 +1652,8 @@ recx_iter_copy(struct vos_obj_iter *oiter, vos_iter_entry_t *it_entry, /* Skip copy and return success for a punched record */ if (bio_addr_is_hole(&biov->bi_addr)) return 0; + else if (BIO_ADDR_IS_GANG(&biov->bi_addr)) + return -DER_NOTSUPPORTED; else if (iov_out->iov_buf_len < bio_iov2len(biov)) return -DER_OVERFLOW; diff --git a/src/vos/vos_pool.c b/src/vos/vos_pool.c index af958eafd5d..6c2e0120842 100644 --- a/src/vos/vos_pool.c +++ b/src/vos/vos_pool.c @@ -1416,6 +1416,8 @@ pool_open(void *ph, struct vos_pool_df *pool_df, unsigned int flags, void *metri pool->vp_feats |= VOS_POOL_FEAT_2_4; if (pool_df->pd_version >= VOS_POOL_DF_2_6) pool->vp_feats |= VOS_POOL_FEAT_2_6; + if (pool_df->pd_version >= VOS_POOL_DF_2_8) + pool->vp_feats |= VOS_POOL_FEAT_2_8; if (pool->vp_vea_info == NULL) /** always store on SCM if no bdev */ @@ -1587,6 +1589,8 @@ vos_pool_upgrade(daos_handle_t poh, uint32_t version) pool->vp_feats |= VOS_POOL_FEAT_2_4; if (version >= VOS_POOL_DF_2_6) pool->vp_feats |= VOS_POOL_FEAT_2_6; + if (version >= VOS_POOL_DF_2_8) + pool->vp_feats |= VOS_POOL_FEAT_2_8; return 0; } diff --git a/src/vos/vos_space.c b/src/vos/vos_space.c index a677d061cb6..5763e3f8bac 100644 --- a/src/vos/vos_space.c +++ b/src/vos/vos_space.c @@ -1,5 +1,5 @@ /** - * (C) Copyright 2020-2023 Intel Corporation. + * (C) Copyright 2020-2024 Intel Corporation. 
* * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -212,6 +212,7 @@ estimate_space(struct vos_pool *pool, daos_key_t *dkey, unsigned int iod_nr, struct dcs_csum_info *csums, *recx_csum; daos_iod_t *iod; daos_recx_t *recx; + struct vos_rec_bundle rbund = { 0 }; daos_size_t size, scm, nvme = 0 /* in blk */; int i, j; @@ -233,16 +234,16 @@ estimate_space(struct vos_pool *pool, daos_key_t *dkey, unsigned int iod_nr, /* Single value */ if (iod->iod_type == DAOS_IOD_SINGLE) { size = iod->iod_size; + rbund.rb_csum = csums; + rbund.rb_rsize = size; /* Single value record */ - if (vos_io_scm(pool, iod->iod_type, size, VOS_IOS_GENERIC)) { - /** store data on DAOS_MEDIA_SCM */ - scm += vos_recx2irec_size(size, csums); - } else { - scm += vos_recx2irec_size(0, csums); - if (iod->iod_size != 0) - nvme += vos_byte2blkcnt(iod->iod_size); - } + scm += vos_irec_msize(pool, &rbund); + if (vos_io_scm(pool, iod->iod_type, size, VOS_IOS_GENERIC)) + scm += size; + else + nvme += vos_byte2blkcnt(size); + /* Assume one more SV tree node created */ scm += 256; continue; diff --git a/src/vos/vos_tree.c b/src/vos/vos_tree.c index c36fcaa88c5..e9dd4e94436 100644 --- a/src/vos/vos_tree.c +++ b/src/vos/vos_tree.c @@ -523,10 +523,11 @@ svt_rec_alloc_common(struct btr_instance *tins, struct btr_record *rec, struct vos_svt_key *skey, struct vos_rec_bundle *rbund) { struct vos_irec_df *irec; + struct vos_pool *pool = (struct vos_pool *)tins->ti_priv; int rc; D_ASSERT(!UMOFF_IS_NULL(rbund->rb_off)); - rc = umem_tx_xadd(&tins->ti_umm, rbund->rb_off, vos_irec_msize(rbund), + rc = umem_tx_xadd(&tins->ti_umm, rbund->rb_off, vos_irec_msize(pool, rbund), UMEM_XADD_NO_SNAPSHOT); if (rc != 0) return rc; @@ -591,6 +592,57 @@ cancel_nvme_exts(bio_addr_t *addr, struct dtx_handle *dth) D_ASSERT(0); } +static int +svt_free_payload(struct vos_pool *pool, bio_addr_t *addr, uint64_t rsize) +{ + uint64_t tot_len = rsize; + uint32_t data_len; + bio_addr_t sub_addr = { 0 }; + int i, rc = 0; + + if 
(bio_addr_is_hole(addr))
+ return 0;
+
+ if (tot_len == 0) {
+ D_ERROR("Invalid 0 SV record size\n");
+ return -DER_INVAL;
+ }
+
+ if (BIO_ADDR_IS_GANG(addr)) {
+ for (i = 0; i < addr->ba_gang_nr; i++) {
+ bio_gaddr_get(vos_pool2umm(pool), addr, i, &sub_addr.ba_type, &data_len,
+ &sub_addr.ba_off);
+ if (tot_len < data_len) {
+ D_ERROR("Invalid gang addr[%d], nr:%u, rsize:"DF_U64", "
+ "len:"DF_U64"/%u\n", i, addr->ba_gang_nr, rsize,
+ tot_len, data_len);
+ return -DER_INVAL;
+ }
+ tot_len -= data_len;
+
+ rc = vos_bio_addr_free(pool, &sub_addr, data_len);
+ if (rc) {
+ DL_ERROR(rc, "SV gang free %d on %s failed.",
+ i, addr->ba_type == DAOS_MEDIA_SCM ? "SCM" : "NVMe");
+ return rc;
+ }
+ }
+
+ if (tot_len != 0) {
+ D_ERROR("Invalid gang addr, nr:%u, rsize:"DF_U64", left:"DF_U64"\n",
+ addr->ba_gang_nr, rsize, tot_len);
+ return -DER_INVAL;
+ }
+ } else if (addr->ba_type == DAOS_MEDIA_NVME) {
+ rc = vos_bio_addr_free(pool, addr, rsize);
+ if (rc)
+ DL_ERROR(rc, "Free SV payload on NVMe failed.");
+ }
+ /* Payload is allocated along with vos_irec_df when SV is stored on SCM */
+
+ return rc;
+}
+
 static int
 svt_rec_free_internal(struct btr_instance *tins, struct btr_record *rec,
 bool overwrite)
@@ -608,7 +660,7 @@ svt_rec_free_internal(struct btr_instance *tins, struct btr_record *rec,
 if (overwrite) {
 dth = vos_dth_get(cont->vc_pool->vp_sysdb);
- if (dth == NULL)
+ if (dth == NULL || BIO_ADDR_IS_GANG(addr))
 return -DER_NO_PERM; /* Not allowed */
 }
@@ -618,15 +670,11 @@ svt_rec_free_internal(struct btr_instance *tins, struct btr_record *rec,
 return rc;
 if (!overwrite) {
- /* SCM value is stored together with vos_irec_df */
- if (addr->ba_type == DAOS_MEDIA_NVME) {
- struct vos_pool *pool = tins->ti_priv;
+ struct vos_pool *pool = tins->ti_priv;
- D_ASSERT(pool != NULL);
- rc = vos_bio_addr_free(pool, addr, irec->ir_size);
- if (rc)
- return rc;
- }
+ rc = svt_free_payload(pool, addr, irec->ir_size);
+ if (rc)
+ return rc;
 return umem_free(&tins->ti_umm, 
rec->rec_off); }