From 340aca9f8208fcdbf16d490512ef4174435992fb Mon Sep 17 00:00:00 2001 From: root Date: Wed, 9 Oct 2024 13:53:56 +0000 Subject: [PATCH 1/2] feat: add file locking to Xrootd delete and write operations This might be useful to prevent data loss/corruption when we have simultaneous transfers of the same file. We do the locking in the following way: locks are added on a separate object with the name ., where is different from file's object numbers (it is basically a text string). On lock release, this object is removed. We use this awkward tecnique instead of locking the first object for the following reason: radosstriper does not like when he tries to create a new file, and its first object already exists. --- src/XrdCeph.cmake | 3 +- src/XrdCeph/XrdCephFileLock.cc | 44 +++++++++++++++++ src/XrdCeph/XrdCephFileLock.hh | 30 ++++++++++++ src/XrdCeph/XrdCephPosix.cc | 88 +++++++++++++++++++++++++++++++++- 4 files changed, 163 insertions(+), 2 deletions(-) create mode 100644 src/XrdCeph/XrdCephFileLock.cc create mode 100644 src/XrdCeph/XrdCephFileLock.hh diff --git a/src/XrdCeph.cmake b/src/XrdCeph.cmake index 9f666637..05fb04a2 100644 --- a/src/XrdCeph.cmake +++ b/src/XrdCeph.cmake @@ -46,7 +46,8 @@ add_library( XrdCeph/XrdCephOss.cc XrdCeph/XrdCephOss.hh XrdCeph/XrdCephOssFile.cc XrdCeph/XrdCephOssFile.hh XrdCeph/XrdCephOssDir.cc XrdCeph/XrdCephOssDir.hh - XrdCeph/XrdCephBulkAioRead.cc XrdCeph/XrdCephBulkAioRead.hh) + XrdCeph/XrdCephBulkAioRead.cc XrdCeph/XrdCephBulkAioRead.hh + XrdCeph/XrdCephFileLock.cc XrdCeph/XrdCephFileLock.hh) target_link_libraries( ${LIB_XRD_CEPH} diff --git a/src/XrdCeph/XrdCephFileLock.cc b/src/XrdCeph/XrdCephFileLock.cc new file mode 100644 index 00000000..6eff80b7 --- /dev/null +++ b/src/XrdCeph/XrdCephFileLock.cc @@ -0,0 +1,44 @@ +#include "XrdCeph/XrdCephFileLock.hh" + +XrdCephFileLock::XrdCephFileLock(const CephFile file, librados::IoCtx* ctx) { + /** + * Constructor. + * + * @param file ceph file description + * + */ + + fr = file; + obj_name = fr.name + lock_ext; + + char host[128]; + gethostname(host, MAX_HOST_NAME); + cookie = std::string(host); + ioctx = ctx; +} + +int XrdCephFileLock::acquire() { + /** + * Acquire the lock. + * + */ + + struct timeval lock_lifetime; + lock_lifetime.tv_sec = 3600*5; + lock_lifetime.tv_usec = 0; + + int rc = ioctx->lock_exclusive(obj_name, XrdCeph_object_lock, cookie, "", &lock_lifetime, 0); + return rc; +} + +int XrdCephFileLock::release() { + /** + * Release the lock. + * + */ + int rc = ioctx->unlock(obj_name, XrdCeph_object_lock, cookie); + if (0 == rc) { + rc = ioctx->remove(obj_name); + } + return rc; +} diff --git a/src/XrdCeph/XrdCephFileLock.hh b/src/XrdCeph/XrdCephFileLock.hh new file mode 100644 index 00000000..000249df --- /dev/null +++ b/src/XrdCeph/XrdCephFileLock.hh @@ -0,0 +1,30 @@ +#include +#include + +#include "XrdCeph/XrdCephPosix.hh" + +#define MAX_HOST_NAME 128 + +/** + * Lock or unlock a file using rados. + * Host name is used as lock cookie. + * A separate object named .XrdCeph_Exclusive_lock is for locking (to prevent issues with striper) + * Can be useful to prevent simultaneous writes/deletes of the same file. + * + */ + + + +struct XrdCephFileLock { + const std::string lock_ext = std::string(".XrdCeph_Exlcusive_lock"); + //const std::string lock_ext = std::string(".0000000000000000"); + const std::string XrdCeph_object_lock = "xrdceph.lock"; + CephFile fr; + std::string obj_name; + std::string cookie; + librados::IoCtx* ioctx; + + XrdCephFileLock(const CephFile file, librados::IoCtx*); + int acquire(); + int release(); +}; diff --git a/src/XrdCeph/XrdCephPosix.cc b/src/XrdCeph/XrdCephPosix.cc index 975193a8..962f0d1d 100644 --- a/src/XrdCeph/XrdCephPosix.cc +++ b/src/XrdCeph/XrdCephPosix.cc @@ -54,6 +54,7 @@ #include "XrdCeph/XrdCephPosix.hh" #include "XrdCeph/XrdCephBulkAioRead.hh" +#include "XrdCeph/XrdCephFileLock.hh" /// small struct for directory listing struct DirIterator { @@ -616,6 +617,52 @@ void ceph_posix_set_logfunc(void (*logfunc) (char *, va_list argp)) { static int ceph_posix_internal_truncate(const CephFile &file, unsigned long long size); +/* + +struct XrdCephFileLock { + const std::string lock_ext = std::string(".XrdCeph_Exlcusive_lock"); + CephFile fr; + std::string obj_name; + std::string cookie; + + XrdCephFileLock(const CephFile file) { + + fr = file; + obj_name = fr.name + lock_ext; + + char host[128]; + gethostname(host, MAX_HOST_NAME); + cookie = std::string(host); + } + + int acquire() { + + librados::IoCtx *ioctx = getIoCtx(fr); + if (0 == ioctx) { + return -EINVAL; + } + + struct timeval lock_lifetime; + lock_lifetime.tv_sec = 3600*5; + lock_lifetime.tv_usec = 0; + + int rc = ioctx->lock_exclusive(obj_name, XrdCeph_object_lock, cookie, "", &lock_lifetime, 0); + return rc; + } + + int release() { + librados::IoCtx *ioctx = getIoCtx(fr); + if (0 == ioctx) { + return -EINVAL; + } + + int rc = ioctx->remove(obj_name); + + return rc; + } +}; +*/ + /** * * brief ceph_posix_open function opens a file for read or write * * details This function either: @@ -661,6 +708,7 @@ int ceph_posix_open(XrdOucEnv* env, const char *pathname, int flags, mode_t mode } else { // Access mode is WRITE if (fileExists) { + if (flags & O_TRUNC) { int rc = ceph_posix_unlink(env, pathname); if (rc < 0 && rc != -ENOENT) { @@ -674,6 +722,16 @@ int ceph_posix_open(XrdOucEnv* env, const char *pathname, int flags, mode_t mode } } } + //Lock file, so that no deletes/writes happen simultaneously + librados::IoCtx *ioctx = getIoCtx(fr); + if (0 == ioctx) { + return -EINVAL; + } + XrdCephFileLock file_lock = XrdCephFileLock(fr, ioctx); + int rc = file_lock.acquire(); + if (rc < 0) { + return rc; + } // At this point, we know either the target file didn't exist, or the ceph_posix_unlink above removed it int fd = insertFileRef(fr); logwrapper((char*)"File descriptor %d associated to file %s opened in write mode", fd, pathname); @@ -686,6 +744,18 @@ int ceph_posix_open(XrdOucEnv* env, const char *pathname, int flags, mode_t mode int ceph_posix_close(int fd) { CephFileRef* fr = getFileRef(fd); if (fr) { + //Unlock the file + librados::IoCtx *ioctx = getIoCtx(*fr); + if (0 == ioctx) { + return -EINVAL; + } + XrdCephFileLock file_lock = XrdCephFileLock(*fr, ioctx); + int rc = file_lock.release(); + //Ignore failures? + if (rc < 0) { + return rc; + } + ::timeval now; ::gettimeofday(&now, nullptr); XrdSysMutexHelper lock(fr->statsMutex); @@ -1449,12 +1519,27 @@ int ceph_posix_unlink(XrdOucEnv* env, const char *pathname) { logwrapper((char*)"ceph_posix_unlink : %s", pathname); // minimal stat : only size and times are filled CephFile file = getCephFile(pathname, env); + + //Lock file, so that no deletes/writes happen simultaneously + librados::IoCtx *ioctx = getIoCtx(file); + if (0 == ioctx) { + return -EINVAL; + } + + XrdCephFileLock file_lock = XrdCephFileLock(file, ioctx); + int rc = file_lock.acquire(); + if (rc < 0) { + return rc; + } + libradosstriper::RadosStriper *striper = getRadosStriper(file); if (0 == striper) { + file_lock.release(); return -EINVAL; } - int rc = striper->remove(file.name); + rc = striper->remove(file.name); if (rc != -EBUSY) { + file_lock.release(); return rc; } // if EBUSY returned, assume the file is locked; so try to remove the lock @@ -1464,6 +1549,7 @@ int ceph_posix_unlink(XrdOucEnv* env, const char *pathname) { rc = ceph_posix_internal_removexattr(file, "lock.striper.lock"); if (rc !=0 ) { logwrapper((char*)"ceph_posix_unlink : unlink rmxattr failed %s, %d", pathname, rc); + file_lock.release(); return rc; } From 1ad5eee56d13a3465fc29a4b8ac6fb58342d7505 Mon Sep 17 00:00:00 2001 From: root Date: Wed, 9 Oct 2024 14:02:26 +0000 Subject: [PATCH 2/2] fix: cleanup --- src/XrdCeph/XrdCephPosix.cc | 46 ------------------------------------- 1 file changed, 46 deletions(-) diff --git a/src/XrdCeph/XrdCephPosix.cc b/src/XrdCeph/XrdCephPosix.cc index 962f0d1d..543a6516 100644 --- a/src/XrdCeph/XrdCephPosix.cc +++ b/src/XrdCeph/XrdCephPosix.cc @@ -617,52 +617,6 @@ void ceph_posix_set_logfunc(void (*logfunc) (char *, va_list argp)) { static int ceph_posix_internal_truncate(const CephFile &file, unsigned long long size); -/* - -struct XrdCephFileLock { - const std::string lock_ext = std::string(".XrdCeph_Exlcusive_lock"); - CephFile fr; - std::string obj_name; - std::string cookie; - - XrdCephFileLock(const CephFile file) { - - fr = file; - obj_name = fr.name + lock_ext; - - char host[128]; - gethostname(host, MAX_HOST_NAME); - cookie = std::string(host); - } - - int acquire() { - - librados::IoCtx *ioctx = getIoCtx(fr); - if (0 == ioctx) { - return -EINVAL; - } - - struct timeval lock_lifetime; - lock_lifetime.tv_sec = 3600*5; - lock_lifetime.tv_usec = 0; - - int rc = ioctx->lock_exclusive(obj_name, XrdCeph_object_lock, cookie, "", &lock_lifetime, 0); - return rc; - } - - int release() { - librados::IoCtx *ioctx = getIoCtx(fr); - if (0 == ioctx) { - return -EINVAL; - } - - int rc = ioctx->remove(obj_name); - - return rc; - } -}; -*/ - /** * * brief ceph_posix_open function opens a file for read or write * * details This function either: