From 2d38203f955c9c0a85ed97a449425bd79b6cc166 Mon Sep 17 00:00:00 2001 From: Yaming Kuang Date: Tue, 15 Aug 2023 15:15:10 -0700 Subject: [PATCH] SDSTOR-xxxx fix docker build issue --- conanfile.py | 2 +- src/lib/interfaces/uring_drive_interface.cpp | 174 +++++++++---------- src/lib/interfaces/uring_drive_interface.hpp | 3 +- src/lib/iomgr.cpp | 12 +- src/lib/iomgr_helper.hpp | 44 +++-- 5 files changed, 123 insertions(+), 112 deletions(-) diff --git a/conanfile.py b/conanfile.py index b044c912..4d077611 100644 --- a/conanfile.py +++ b/conanfile.py @@ -5,7 +5,7 @@ class IOMgrConan(ConanFile): name = "iomgr" - version = "9.2.2" + version = "9.2.3" homepage = "https://github.com/eBay/IOManager" description = "Asynchronous event manager" topics = ("ebay", "nublox", "aio") diff --git a/src/lib/interfaces/uring_drive_interface.cpp b/src/lib/interfaces/uring_drive_interface.cpp index 2e3458a6..40465327 100644 --- a/src/lib/interfaces/uring_drive_interface.cpp +++ b/src/lib/interfaces/uring_drive_interface.cpp @@ -39,11 +39,6 @@ namespace iomgr { thread_local uring_drive_channel* UringDriveInterface::t_uring_ch{nullptr}; uring_drive_channel::uring_drive_channel(UringDriveInterface* iface) { - // TODO: For now setup as interrupt mode instead of pollmode -#if LINUX_VERSION_CODE < KERNEL_VERSION(5, 4, 0) - RELEASE_ASSERT(0, "Not expected to run io_uring below kernel 5.4!"); -#endif - int ret = io_uring_queue_init(UringDriveInterface::per_thread_qdepth, &m_ring, 0); if (ret) { folly::throwSystemError(fmt::format("Unable to create uring queue created ret={}", ret)); } @@ -152,7 +147,8 @@ void uring_drive_channel::drain_waitq() { } ///////////////////////////// UringDriveInterface ///////////////////////////////////////// -UringDriveInterface::UringDriveInterface(const io_interface_comp_cb_t& cb) : KernelDriveInterface(cb) {} +UringDriveInterface::UringDriveInterface(const bool new_interface_supported, const io_interface_comp_cb_t& cb) : + KernelDriveInterface(cb), m_new_intfc(new_interface_supported) {} void UringDriveInterface::init_iface_reactor_context(IOReactor*) { if (t_uring_ch == nullptr) { t_uring_ch = new uring_drive_channel(this); } @@ -202,35 +198,38 @@ void UringDriveInterface::close_dev(const io_device_ptr& iodev) { folly::Future< bool > UringDriveInterface::async_write(IODevice* iodev, const char* data, uint32_t size, uint64_t offset, bool part_of_batch) { -#if LINUX_VERSION_CODE < KERNEL_VERSION(5, 6, 0) - std::array< iovec, 1 > iov; - iov[0].iov_base = (void*)data; - iov[0].iov_len = size; - return async_writev(iodev, iov.data(), 1, size, offset, part_of_batch); -#else - auto iocb = new drive_iocb(this, iodev, DriveOpType::WRITE, size, offset); - iocb->set_data((char*)data); - iocb->completion = std::move(folly::Promise< bool >{}); - auto ret = iocb->folly_comp_promise().getFuture(); + if (!m_new_intfc) { + std::array< iovec, 1 > iov; + iov[0].iov_base = (void*)data; + iov[0].iov_len = size; - auto submit_in_this_thread = [this](drive_iocb* iocb, bool part_of_batch) { - DriveInterface::increment_outstanding_counter(iocb); - auto sqe = t_uring_ch->get_sqe_or_enqueue(iocb); - if (sqe == nullptr) { return; } + return async_writev(iodev, iov.data(), 1, size, offset, part_of_batch); + } else { + // io_uring_prep_write available starts from kernel 5.6 + auto iocb = new drive_iocb(this, iodev, DriveOpType::WRITE, size, offset); + iocb->set_data((char*)data); + iocb->completion = std::move(folly::Promise< bool >{}); + auto ret = iocb->folly_comp_promise().getFuture(); - io_uring_prep_write(sqe, iocb->iodev->fd(), (const void*)iocb->get_data(), iocb->size, iocb->offset); - t_uring_ch->submit_if_needed(iocb, sqe, part_of_batch); - }; + auto submit_in_this_thread = [this](drive_iocb* iocb, bool part_of_batch) { + DriveInterface::increment_outstanding_counter(iocb); + auto sqe = t_uring_ch->get_sqe_or_enqueue(iocb); + if (sqe == nullptr) { return; } - if (iomanager.this_reactor() != nullptr) { - submit_in_this_thread(iocb, part_of_batch); - } else { - iomanager.run_on_forget(reactor_regex::random_worker, [=]() { submit_in_this_thread(iocb, part_of_batch); }); - } + io_uring_prep_write(sqe, iocb->iodev->fd(), (const void*)iocb->get_data(), iocb->size, iocb->offset); + t_uring_ch->submit_if_needed(iocb, sqe, part_of_batch); + }; - return ret; -#endif + if (iomanager.this_reactor() != nullptr) { + submit_in_this_thread(iocb, part_of_batch); + } else { + iomanager.run_on_forget(reactor_regex::random_worker, + [=]() { submit_in_this_thread(iocb, part_of_batch); }); + } + + return ret; + } } folly::Future< bool > UringDriveInterface::async_writev(IODevice* iodev, const iovec* iov, int iovcnt, uint32_t size, @@ -259,34 +258,35 @@ folly::Future< bool > UringDriveInterface::async_writev(IODevice* iodev, const i folly::Future< bool > UringDriveInterface::async_read(IODevice* iodev, char* data, uint32_t size, uint64_t offset, bool part_of_batch) { -#if LINUX_VERSION_CODE < KERNEL_VERSION(5, 6, 0) - std::array< iovec, 1 > iov; - iov[0].iov_base = data; - iov[0].iov_len = size; + if (!m_new_intfc) { + std::array< iovec, 1 > iov; + iov[0].iov_base = data; + iov[0].iov_len = size; - return async_readv(iodev, iov.data(), 1, size, offset, part_of_batch); -#else - auto iocb = new drive_iocb(this, iodev, DriveOpType::READ, size, offset); - iocb->set_data(data); - iocb->completion = std::move(folly::Promise< bool >{}); - auto ret = iocb->folly_comp_promise().getFuture(); + return async_readv(iodev, iov.data(), 1, size, offset, part_of_batch); + } else { + auto iocb = new drive_iocb(this, iodev, DriveOpType::READ, size, offset); + iocb->set_data(data); + iocb->completion = std::move(folly::Promise< bool >{}); + auto ret = iocb->folly_comp_promise().getFuture(); - auto submit_in_this_thread = [this](drive_iocb* iocb, bool part_of_batch) { - DriveInterface::increment_outstanding_counter(iocb); - auto sqe = t_uring_ch->get_sqe_or_enqueue(iocb); - if (sqe == nullptr) { return; } + auto submit_in_this_thread = [this](drive_iocb* iocb, bool part_of_batch) { + DriveInterface::increment_outstanding_counter(iocb); + auto sqe = t_uring_ch->get_sqe_or_enqueue(iocb); + if (sqe == nullptr) { return; } - io_uring_prep_read(sqe, iocb->iodev->fd(), (void*)iocb->get_data(), iocb->size, iocb->offset); - t_uring_ch->submit_if_needed(iocb, sqe, part_of_batch); - }; + io_uring_prep_read(sqe, iocb->iodev->fd(), (void*)iocb->get_data(), iocb->size, iocb->offset); + t_uring_ch->submit_if_needed(iocb, sqe, part_of_batch); + }; - if (iomanager.this_reactor() != nullptr) { - submit_in_this_thread(iocb, part_of_batch); - } else { - iomanager.run_on_forget(reactor_regex::random_worker, [=]() { submit_in_this_thread(iocb, part_of_batch); }); + if (iomanager.this_reactor() != nullptr) { + submit_in_this_thread(iocb, part_of_batch); + } else { + iomanager.run_on_forget(reactor_regex::random_worker, + [=]() { submit_in_this_thread(iocb, part_of_batch); }); + } + return ret; } - return ret; -#endif } folly::Future< bool > UringDriveInterface::async_readv(IODevice* iodev, const iovec* iov, int iovcnt, uint32_t size, @@ -352,26 +352,26 @@ void UringDriveInterface::sync_write(IODevice* iodev, const char* data, uint32_t return KernelDriveInterface::sync_write(iodev, data, size, offset); } -#if LINUX_VERSION_CODE < KERNEL_VERSION(5, 6, 0) - std::array< iovec, 1 > iov; - iov[0].iov_base = data; - iov[0].iov_len = size; + if (!m_new_intfc) { + std::array< iovec, 1 > iov; + iov[0].iov_base = (void*)data; + iov[0].iov_len = size; - return sync_writev(iodev, iov.data(), 1, size, offset); -#else - auto iocb = new drive_iocb(this, iodev, DriveOpType::WRITE, size, offset); - iocb->set_data((char*)data); - iocb->completion = std::move(FiberManagerLib::Promise< bool >{}); - auto f = iocb->fiber_comp_promise().getFuture(); + return sync_writev(iodev, iov.data(), 1, size, offset); + } else { + auto iocb = new drive_iocb(this, iodev, DriveOpType::WRITE, size, offset); + iocb->set_data((char*)data); + iocb->completion = std::move(FiberManagerLib::Promise< bool >{}); + auto f = iocb->fiber_comp_promise().getFuture(); - DriveInterface::increment_outstanding_counter(iocb); - auto sqe = t_uring_ch->get_sqe_or_enqueue(iocb); - assert(sqe); + DriveInterface::increment_outstanding_counter(iocb); + auto sqe = t_uring_ch->get_sqe_or_enqueue(iocb); + assert(sqe); - io_uring_prep_write(sqe, iodev->fd(), (const void*)iocb->get_data(), iocb->size, offset); - t_uring_ch->submit_if_needed(iocb, sqe, false); - f.get(); -#endif + io_uring_prep_write(sqe, iodev->fd(), (const void*)iocb->get_data(), iocb->size, offset); + t_uring_ch->submit_if_needed(iocb, sqe, false); + f.get(); + } } void UringDriveInterface::sync_writev(IODevice* iodev, const iovec* iov, int iovcnt, uint32_t size, uint64_t offset) { @@ -398,26 +398,26 @@ void UringDriveInterface::sync_read(IODevice* iodev, char* data, uint32_t size, return KernelDriveInterface::sync_read(iodev, data, size, offset); } -#if LINUX_VERSION_CODE < KERNEL_VERSION(5, 6, 0) - std::array< iovec, 1 > iov; - iov[0].iov_base = data; - iov[0].iov_len = size; + if (!m_new_intfc) { + std::array< iovec, 1 > iov; + iov[0].iov_base = data; + iov[0].iov_len = size; - return sync_readv(iodev, iov.data(), 1, size, offset); -#else - auto iocb = new drive_iocb(this, iodev, DriveOpType::READ, size, offset); - iocb->set_data(data); - iocb->completion = std::move(FiberManagerLib::Promise< bool >{}); - auto f = iocb->fiber_comp_promise().getFuture(); + return sync_readv(iodev, iov.data(), 1, size, offset); + } else { + auto iocb = new drive_iocb(this, iodev, DriveOpType::READ, size, offset); + iocb->set_data(data); + iocb->completion = std::move(FiberManagerLib::Promise< bool >{}); + auto f = iocb->fiber_comp_promise().getFuture(); - DriveInterface::increment_outstanding_counter(iocb); - auto sqe = t_uring_ch->get_sqe_or_enqueue(iocb); - assert(sqe); + DriveInterface::increment_outstanding_counter(iocb); + auto sqe = t_uring_ch->get_sqe_or_enqueue(iocb); + assert(sqe); - io_uring_prep_read(sqe, iodev->fd(), (void*)iocb->get_data(), iocb->size, offset); - t_uring_ch->submit_if_needed(iocb, sqe, false); - f.get(); -#endif + io_uring_prep_read(sqe, iodev->fd(), (void*)iocb->get_data(), iocb->size, offset); + t_uring_ch->submit_if_needed(iocb, sqe, false); + f.get(); + } } void UringDriveInterface::sync_readv(IODevice* iodev, const iovec* iov, int iovcnt, uint32_t size, uint64_t offset) { diff --git a/src/lib/interfaces/uring_drive_interface.hpp b/src/lib/interfaces/uring_drive_interface.hpp index e3c407e1..422e0998 100644 --- a/src/lib/interfaces/uring_drive_interface.hpp +++ b/src/lib/interfaces/uring_drive_interface.hpp @@ -80,7 +80,7 @@ class UringDriveInterface : public KernelDriveInterface { public: static constexpr uint32_t per_thread_qdepth = 256; - UringDriveInterface(const io_interface_comp_cb_t& cb = nullptr); + UringDriveInterface(const bool new_interface_supported, const io_interface_comp_cb_t& cb = nullptr); virtual ~UringDriveInterface() = default; drive_interface_type interface_type() const override { return drive_interface_type::uring; } std::string name() const override { return "uring_drive_interface"; } @@ -119,5 +119,6 @@ class UringDriveInterface : public KernelDriveInterface { private: static thread_local uring_drive_channel* t_uring_ch; UringDriveInterfaceMetrics m_metrics; + bool m_new_intfc; }; } // namespace iomgr diff --git a/src/lib/iomgr.cpp b/src/lib/iomgr.cpp index 901d93fa..25b41956 100644 --- a/src/lib/iomgr.cpp +++ b/src/lib/iomgr.cpp @@ -125,7 +125,8 @@ void IOManager::start(const iomgr_params& params, const thread_state_notifier_t& // Do Poller specific pre interface initialization m_impl->pre_interface_init(); - m_is_uring_capable = check_uring_capability(); + bool new_interface_supported = false; + m_is_uring_capable = check_uring_capability(new_interface_supported); LOGINFOMOD(iomgr, "System has uring_capability={}", m_is_uring_capable); // Create all in-built interfaces here @@ -138,7 +139,8 @@ void IOManager::start(const iomgr_params& params, const thread_state_notifier_t& iface_adder(); } else { if (m_is_uring_capable && !m_is_spdk) { - add_drive_interface(std::dynamic_pointer_cast< DriveInterface >(std::make_shared< UringDriveInterface >())); + add_drive_interface(std::dynamic_pointer_cast< DriveInterface >( + std::make_shared< UringDriveInterface >(new_interface_supported))); } else { add_drive_interface(std::dynamic_pointer_cast< DriveInterface >(std::make_shared< AioDriveInterface >())); } @@ -151,7 +153,6 @@ void IOManager::start(const iomgr_params& params, const thread_state_notifier_t& // Start all reactor threads set_state(iomgr_state::reactor_init); - // Caller can override the number of fibers per thread; o.w., it is taken from dynamic config create_worker_reactors((0 < params.num_fibers) ? params.num_fibers : IM_DYNAMIC_CONFIG(thread.num_fibers)); wait_for_state(iomgr_state::sys_init); @@ -236,9 +237,8 @@ void IOManager::create_worker_reactors(uint32_t num_fibers) { } for (uint32_t i{0}; i < m_num_workers; ++i) { - m_worker_threads.emplace_back( - m_impl->create_reactor_impl(fmt::format("iomgr_thread_{}", i), m_is_spdk ? TIGHT_LOOP : INTERRUPT_LOOP, - num_fibers, (int)i, nullptr)); + m_worker_threads.emplace_back(m_impl->create_reactor_impl( + fmt::format("iomgr_thread_{}", i), m_is_spdk ? TIGHT_LOOP : INTERRUPT_LOOP, num_fibers, (int)i, nullptr)); LOGDEBUGMOD(iomgr, "Created iomanager worker reactor thread {}...", i); } } diff --git a/src/lib/iomgr_helper.hpp b/src/lib/iomgr_helper.hpp index 5f6ef4d2..49b729e4 100644 --- a/src/lib/iomgr_helper.hpp +++ b/src/lib/iomgr_helper.hpp @@ -18,33 +18,43 @@ static constexpr uint64_t Gi{Ki * Mi}; static constexpr uint64_t Ti{Ki * Gi}; static const std::string hugepage_env{"HUGEPAGE"}; -static bool check_uring_capability() { +static bool check_uring_capability(bool& new_interface_supported) { #ifdef __linux__ -#if LINUX_VERSION_CODE < KERNEL_VERSION(5, 6, 0) + bool uring_supported{true}; + new_interface_supported = true; if (syscall(__NR_io_uring_register, 0, IORING_UNREGISTER_BUFFERS, NULL, 0) && errno == ENOSYS) { // No io_uring - return false; + new_interface_supported = false; + uring_supported = false; } else { // io_uring - return true; + uring_supported = true; } -#else - std::vector< int > ops = {IORING_OP_NOP, IORING_OP_READV, IORING_OP_WRITEV, - IORING_OP_FSYNC, IORING_OP_READ, IORING_OP_WRITE}; - bool supported{true}; - struct io_uring_probe* probe = io_uring_get_probe(); - if (probe == nullptr) { return false; } + if (uring_supported) { + // do futher check if new interfaces are supported (starting available with kernel 5.6); + std::vector< int > ops = {IORING_OP_NOP, IORING_OP_READV, IORING_OP_WRITEV, + IORING_OP_FSYNC, IORING_OP_READ, IORING_OP_WRITE}; + + struct io_uring_probe* probe = io_uring_get_probe(); + if (probe == nullptr) { + new_interface_supported = false; + goto exit; + } - for (auto& op : ops) { - if (!io_uring_opcode_supported(probe, op)) { - supported = false; - break; + for (auto& op : ops) { + if (!io_uring_opcode_supported(probe, op)) { + new_interface_supported = false; + break; + } } + + free(probe); } - free(probe); - return supported; -#endif + +exit: + return uring_supported; + #else return false; #endif