Skip to content

Commit

Permalink
Merge pull request #42 from yamingk/master
Browse files Browse the repository at this point in the history
SDSTOR-11552 fix docker build issue
  • Loading branch information
szmyd authored Aug 16, 2023
2 parents 79ba797 + 2d38203 commit 169654a
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 112 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

class IOMgrConan(ConanFile):
name = "iomgr"
version = "9.2.2"
version = "9.2.3"
homepage = "https://github.com/eBay/IOManager"
description = "Asynchronous event manager"
topics = ("ebay", "nublox", "aio")
Expand Down
174 changes: 87 additions & 87 deletions src/lib/interfaces/uring_drive_interface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,6 @@ namespace iomgr {
thread_local uring_drive_channel* UringDriveInterface::t_uring_ch{nullptr};

uring_drive_channel::uring_drive_channel(UringDriveInterface* iface) {
// TODO: For now setup as interrupt mode instead of pollmode
#if LINUX_VERSION_CODE < KERNEL_VERSION(5, 4, 0)
RELEASE_ASSERT(0, "Not expected to run io_uring below kernel 5.4!");
#endif

int ret = io_uring_queue_init(UringDriveInterface::per_thread_qdepth, &m_ring, 0);
if (ret) { folly::throwSystemError(fmt::format("Unable to create uring queue created ret={}", ret)); }

Expand Down Expand Up @@ -152,7 +147,8 @@ void uring_drive_channel::drain_waitq() {
}

///////////////////////////// UringDriveInterface /////////////////////////////////////////
UringDriveInterface::UringDriveInterface(const io_interface_comp_cb_t& cb) : KernelDriveInterface(cb) {}
UringDriveInterface::UringDriveInterface(const bool new_interface_supported, const io_interface_comp_cb_t& cb) :
KernelDriveInterface(cb), m_new_intfc(new_interface_supported) {}

void UringDriveInterface::init_iface_reactor_context(IOReactor*) {
if (t_uring_ch == nullptr) { t_uring_ch = new uring_drive_channel(this); }
Expand Down Expand Up @@ -202,35 +198,38 @@ void UringDriveInterface::close_dev(const io_device_ptr& iodev) {

folly::Future< bool > UringDriveInterface::async_write(IODevice* iodev, const char* data, uint32_t size,
uint64_t offset, bool part_of_batch) {
#if LINUX_VERSION_CODE < KERNEL_VERSION(5, 6, 0)
std::array< iovec, 1 > iov;
iov[0].iov_base = (void*)data;
iov[0].iov_len = size;

return async_writev(iodev, iov.data(), 1, size, offset, part_of_batch);
#else
auto iocb = new drive_iocb(this, iodev, DriveOpType::WRITE, size, offset);
iocb->set_data((char*)data);
iocb->completion = std::move(folly::Promise< bool >{});
auto ret = iocb->folly_comp_promise().getFuture();
if (!m_new_intfc) {
std::array< iovec, 1 > iov;
iov[0].iov_base = (void*)data;
iov[0].iov_len = size;

auto submit_in_this_thread = [this](drive_iocb* iocb, bool part_of_batch) {
DriveInterface::increment_outstanding_counter(iocb);
auto sqe = t_uring_ch->get_sqe_or_enqueue(iocb);
if (sqe == nullptr) { return; }
return async_writev(iodev, iov.data(), 1, size, offset, part_of_batch);
} else {
// io_uring_prep_write available starts from kernel 5.6
auto iocb = new drive_iocb(this, iodev, DriveOpType::WRITE, size, offset);
iocb->set_data((char*)data);
iocb->completion = std::move(folly::Promise< bool >{});
auto ret = iocb->folly_comp_promise().getFuture();

io_uring_prep_write(sqe, iocb->iodev->fd(), (const void*)iocb->get_data(), iocb->size, iocb->offset);
t_uring_ch->submit_if_needed(iocb, sqe, part_of_batch);
};
auto submit_in_this_thread = [this](drive_iocb* iocb, bool part_of_batch) {
DriveInterface::increment_outstanding_counter(iocb);
auto sqe = t_uring_ch->get_sqe_or_enqueue(iocb);
if (sqe == nullptr) { return; }

if (iomanager.this_reactor() != nullptr) {
submit_in_this_thread(iocb, part_of_batch);
} else {
iomanager.run_on_forget(reactor_regex::random_worker, [=]() { submit_in_this_thread(iocb, part_of_batch); });
}
io_uring_prep_write(sqe, iocb->iodev->fd(), (const void*)iocb->get_data(), iocb->size, iocb->offset);
t_uring_ch->submit_if_needed(iocb, sqe, part_of_batch);
};

return ret;
#endif
if (iomanager.this_reactor() != nullptr) {
submit_in_this_thread(iocb, part_of_batch);
} else {
iomanager.run_on_forget(reactor_regex::random_worker,
[=]() { submit_in_this_thread(iocb, part_of_batch); });
}

return ret;
}
}

folly::Future< bool > UringDriveInterface::async_writev(IODevice* iodev, const iovec* iov, int iovcnt, uint32_t size,
Expand Down Expand Up @@ -259,34 +258,35 @@ folly::Future< bool > UringDriveInterface::async_writev(IODevice* iodev, const i

folly::Future< bool > UringDriveInterface::async_read(IODevice* iodev, char* data, uint32_t size, uint64_t offset,
bool part_of_batch) {
#if LINUX_VERSION_CODE < KERNEL_VERSION(5, 6, 0)
std::array< iovec, 1 > iov;
iov[0].iov_base = data;
iov[0].iov_len = size;
if (!m_new_intfc) {
std::array< iovec, 1 > iov;
iov[0].iov_base = data;
iov[0].iov_len = size;

return async_readv(iodev, iov.data(), 1, size, offset, part_of_batch);
#else
auto iocb = new drive_iocb(this, iodev, DriveOpType::READ, size, offset);
iocb->set_data(data);
iocb->completion = std::move(folly::Promise< bool >{});
auto ret = iocb->folly_comp_promise().getFuture();
return async_readv(iodev, iov.data(), 1, size, offset, part_of_batch);
} else {
auto iocb = new drive_iocb(this, iodev, DriveOpType::READ, size, offset);
iocb->set_data(data);
iocb->completion = std::move(folly::Promise< bool >{});
auto ret = iocb->folly_comp_promise().getFuture();

auto submit_in_this_thread = [this](drive_iocb* iocb, bool part_of_batch) {
DriveInterface::increment_outstanding_counter(iocb);
auto sqe = t_uring_ch->get_sqe_or_enqueue(iocb);
if (sqe == nullptr) { return; }
auto submit_in_this_thread = [this](drive_iocb* iocb, bool part_of_batch) {
DriveInterface::increment_outstanding_counter(iocb);
auto sqe = t_uring_ch->get_sqe_or_enqueue(iocb);
if (sqe == nullptr) { return; }

io_uring_prep_read(sqe, iocb->iodev->fd(), (void*)iocb->get_data(), iocb->size, iocb->offset);
t_uring_ch->submit_if_needed(iocb, sqe, part_of_batch);
};
io_uring_prep_read(sqe, iocb->iodev->fd(), (void*)iocb->get_data(), iocb->size, iocb->offset);
t_uring_ch->submit_if_needed(iocb, sqe, part_of_batch);
};

if (iomanager.this_reactor() != nullptr) {
submit_in_this_thread(iocb, part_of_batch);
} else {
iomanager.run_on_forget(reactor_regex::random_worker, [=]() { submit_in_this_thread(iocb, part_of_batch); });
if (iomanager.this_reactor() != nullptr) {
submit_in_this_thread(iocb, part_of_batch);
} else {
iomanager.run_on_forget(reactor_regex::random_worker,
[=]() { submit_in_this_thread(iocb, part_of_batch); });
}
return ret;
}
return ret;
#endif
}

folly::Future< bool > UringDriveInterface::async_readv(IODevice* iodev, const iovec* iov, int iovcnt, uint32_t size,
Expand Down Expand Up @@ -352,26 +352,26 @@ void UringDriveInterface::sync_write(IODevice* iodev, const char* data, uint32_t
return KernelDriveInterface::sync_write(iodev, data, size, offset);
}

#if LINUX_VERSION_CODE < KERNEL_VERSION(5, 6, 0)
std::array< iovec, 1 > iov;
iov[0].iov_base = data;
iov[0].iov_len = size;
if (!m_new_intfc) {
std::array< iovec, 1 > iov;
iov[0].iov_base = (void*)data;
iov[0].iov_len = size;

return sync_writev(iodev, iov.data(), 1, size, offset);
#else
auto iocb = new drive_iocb(this, iodev, DriveOpType::WRITE, size, offset);
iocb->set_data((char*)data);
iocb->completion = std::move(FiberManagerLib::Promise< bool >{});
auto f = iocb->fiber_comp_promise().getFuture();
return sync_writev(iodev, iov.data(), 1, size, offset);
} else {
auto iocb = new drive_iocb(this, iodev, DriveOpType::WRITE, size, offset);
iocb->set_data((char*)data);
iocb->completion = std::move(FiberManagerLib::Promise< bool >{});
auto f = iocb->fiber_comp_promise().getFuture();

DriveInterface::increment_outstanding_counter(iocb);
auto sqe = t_uring_ch->get_sqe_or_enqueue(iocb);
assert(sqe);
DriveInterface::increment_outstanding_counter(iocb);
auto sqe = t_uring_ch->get_sqe_or_enqueue(iocb);
assert(sqe);

io_uring_prep_write(sqe, iodev->fd(), (const void*)iocb->get_data(), iocb->size, offset);
t_uring_ch->submit_if_needed(iocb, sqe, false);
f.get();
#endif
io_uring_prep_write(sqe, iodev->fd(), (const void*)iocb->get_data(), iocb->size, offset);
t_uring_ch->submit_if_needed(iocb, sqe, false);
f.get();
}
}

void UringDriveInterface::sync_writev(IODevice* iodev, const iovec* iov, int iovcnt, uint32_t size, uint64_t offset) {
Expand All @@ -398,26 +398,26 @@ void UringDriveInterface::sync_read(IODevice* iodev, char* data, uint32_t size,
return KernelDriveInterface::sync_read(iodev, data, size, offset);
}

#if LINUX_VERSION_CODE < KERNEL_VERSION(5, 6, 0)
std::array< iovec, 1 > iov;
iov[0].iov_base = data;
iov[0].iov_len = size;
if (!m_new_intfc) {
std::array< iovec, 1 > iov;
iov[0].iov_base = data;
iov[0].iov_len = size;

return sync_readv(iodev, iov.data(), 1, size, offset);
#else
auto iocb = new drive_iocb(this, iodev, DriveOpType::READ, size, offset);
iocb->set_data(data);
iocb->completion = std::move(FiberManagerLib::Promise< bool >{});
auto f = iocb->fiber_comp_promise().getFuture();
return sync_readv(iodev, iov.data(), 1, size, offset);
} else {
auto iocb = new drive_iocb(this, iodev, DriveOpType::READ, size, offset);
iocb->set_data(data);
iocb->completion = std::move(FiberManagerLib::Promise< bool >{});
auto f = iocb->fiber_comp_promise().getFuture();

DriveInterface::increment_outstanding_counter(iocb);
auto sqe = t_uring_ch->get_sqe_or_enqueue(iocb);
assert(sqe);
DriveInterface::increment_outstanding_counter(iocb);
auto sqe = t_uring_ch->get_sqe_or_enqueue(iocb);
assert(sqe);

io_uring_prep_read(sqe, iodev->fd(), (void*)iocb->get_data(), iocb->size, offset);
t_uring_ch->submit_if_needed(iocb, sqe, false);
f.get();
#endif
io_uring_prep_read(sqe, iodev->fd(), (void*)iocb->get_data(), iocb->size, offset);
t_uring_ch->submit_if_needed(iocb, sqe, false);
f.get();
}
}

void UringDriveInterface::sync_readv(IODevice* iodev, const iovec* iov, int iovcnt, uint32_t size, uint64_t offset) {
Expand Down
3 changes: 2 additions & 1 deletion src/lib/interfaces/uring_drive_interface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class UringDriveInterface : public KernelDriveInterface {
public:
static constexpr uint32_t per_thread_qdepth = 256;

UringDriveInterface(const io_interface_comp_cb_t& cb = nullptr);
UringDriveInterface(const bool new_interface_supported, const io_interface_comp_cb_t& cb = nullptr);
virtual ~UringDriveInterface() = default;
drive_interface_type interface_type() const override { return drive_interface_type::uring; }
std::string name() const override { return "uring_drive_interface"; }
Expand Down Expand Up @@ -119,5 +119,6 @@ class UringDriveInterface : public KernelDriveInterface {
private:
static thread_local uring_drive_channel* t_uring_ch;
UringDriveInterfaceMetrics m_metrics;
bool m_new_intfc;
};
} // namespace iomgr
12 changes: 6 additions & 6 deletions src/lib/iomgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ void IOManager::start(const iomgr_params& params, const thread_state_notifier_t&

// Do Poller specific pre interface initialization
m_impl->pre_interface_init();
m_is_uring_capable = check_uring_capability();
bool new_interface_supported = false;
m_is_uring_capable = check_uring_capability(new_interface_supported);
LOGINFOMOD(iomgr, "System has uring_capability={}", m_is_uring_capable);

// Create all in-built interfaces here
Expand All @@ -138,7 +139,8 @@ void IOManager::start(const iomgr_params& params, const thread_state_notifier_t&
iface_adder();
} else {
if (m_is_uring_capable && !m_is_spdk) {
add_drive_interface(std::dynamic_pointer_cast< DriveInterface >(std::make_shared< UringDriveInterface >()));
add_drive_interface(std::dynamic_pointer_cast< DriveInterface >(
std::make_shared< UringDriveInterface >(new_interface_supported)));
} else {
add_drive_interface(std::dynamic_pointer_cast< DriveInterface >(std::make_shared< AioDriveInterface >()));
}
Expand All @@ -151,7 +153,6 @@ void IOManager::start(const iomgr_params& params, const thread_state_notifier_t&
// Start all reactor threads
set_state(iomgr_state::reactor_init);


// Caller can override the number of fibers per thread; o.w., it is taken from dynamic config
create_worker_reactors((0 < params.num_fibers) ? params.num_fibers : IM_DYNAMIC_CONFIG(thread.num_fibers));
wait_for_state(iomgr_state::sys_init);
Expand Down Expand Up @@ -236,9 +237,8 @@ void IOManager::create_worker_reactors(uint32_t num_fibers) {
}

for (uint32_t i{0}; i < m_num_workers; ++i) {
m_worker_threads.emplace_back(
m_impl->create_reactor_impl(fmt::format("iomgr_thread_{}", i), m_is_spdk ? TIGHT_LOOP : INTERRUPT_LOOP,
num_fibers, (int)i, nullptr));
m_worker_threads.emplace_back(m_impl->create_reactor_impl(
fmt::format("iomgr_thread_{}", i), m_is_spdk ? TIGHT_LOOP : INTERRUPT_LOOP, num_fibers, (int)i, nullptr));
LOGDEBUGMOD(iomgr, "Created iomanager worker reactor thread {}...", i);
}
}
Expand Down
44 changes: 27 additions & 17 deletions src/lib/iomgr_helper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,43 @@ static constexpr uint64_t Gi{Ki * Mi};
static constexpr uint64_t Ti{Ki * Gi};
static const std::string hugepage_env{"HUGEPAGE"};

static bool check_uring_capability() {
static bool check_uring_capability(bool& new_interface_supported) {
#ifdef __linux__
#if LINUX_VERSION_CODE < KERNEL_VERSION(5, 6, 0)
bool uring_supported{true};
new_interface_supported = true;
if (syscall(__NR_io_uring_register, 0, IORING_UNREGISTER_BUFFERS, NULL, 0) && errno == ENOSYS) {
// No io_uring
return false;
new_interface_supported = false;
uring_supported = false;
} else {
// io_uring
return true;
uring_supported = true;
}
#else
std::vector< int > ops = {IORING_OP_NOP, IORING_OP_READV, IORING_OP_WRITEV,
IORING_OP_FSYNC, IORING_OP_READ, IORING_OP_WRITE};

bool supported{true};
struct io_uring_probe* probe = io_uring_get_probe();
if (probe == nullptr) { return false; }
if (uring_supported) {
// do futher check if new interfaces are supported (starting available with kernel 5.6);
std::vector< int > ops = {IORING_OP_NOP, IORING_OP_READV, IORING_OP_WRITEV,
IORING_OP_FSYNC, IORING_OP_READ, IORING_OP_WRITE};

struct io_uring_probe* probe = io_uring_get_probe();
if (probe == nullptr) {
new_interface_supported = false;
goto exit;
}

for (auto& op : ops) {
if (!io_uring_opcode_supported(probe, op)) {
supported = false;
break;
for (auto& op : ops) {
if (!io_uring_opcode_supported(probe, op)) {
new_interface_supported = false;
break;
}
}

free(probe);
}
free(probe);
return supported;
#endif

exit:
return uring_supported;

#else
return false;
#endif
Expand Down

0 comments on commit 169654a

Please sign in to comment.