Skip to content

Commit

Permalink
enabled custom executors in io.
Browse files Browse the repository at this point in the history
  • Loading branch information
klemens-morgenstern committed Aug 4, 2023
1 parent ade95f3 commit 4619c05
Show file tree
Hide file tree
Showing 25 changed files with 237 additions and 152 deletions.
4 changes: 3 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ add_custom_command(OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/doc/index.html

add_custom_target(boost_async_doc DEPENDS ${CMAKE_CURRENT_BINARY_DIR}/doc/index.html)

add_compile_definitions(BOOST_ASIO_HAS_IO_URING=1)

if(BOOST_ASYNC_IS_ROOT)
#include(CTest)
Expand Down Expand Up @@ -120,7 +121,8 @@ add_library(boost_async
src/io/write_at.cpp
src/io/detail/random_access_device.cpp
src/io/copy.cpp
src/io/copy_n.cpp)
src/io/copy_n.cpp
src/io/sleep.cpp)

target_include_directories(boost_async PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include")
target_link_libraries(boost_async PUBLIC
Expand Down
4 changes: 2 additions & 2 deletions include/boost/async/io/acceptor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ namespace boost::async::io

struct acceptor
{
BOOST_ASYNC_DECL acceptor();
BOOST_ASYNC_DECL acceptor(endpoint ep);
BOOST_ASYNC_DECL acceptor(const async::executor & executor = this_thread::get_executor());
BOOST_ASYNC_DECL acceptor(endpoint ep, const async::executor & executor = this_thread::get_executor());
BOOST_ASYNC_DECL system::result<void> bind(endpoint ep);
BOOST_ASYNC_DECL system::result<void> listen(int backlog = asio::socket_base::max_listen_connections); // int backlog = asio::max_backlog()
BOOST_ASYNC_DECL endpoint local_endpoint();
Expand Down
10 changes: 6 additions & 4 deletions include/boost/async/io/datagram_socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ namespace boost::async::io
struct [[nodiscard]] datagram_socket final : socket
{
// duplicate onto another thread
system::result<datagram_socket> duplicate();
system::result<datagram_socket> duplicate(const async::executor & executor = this_thread::get_executor());

datagram_socket();
datagram_socket(const async::executor & executor = this_thread::get_executor());
datagram_socket(datagram_socket && lhs);
datagram_socket(native_handle_type h, protocol_type protocol = protocol_type());
datagram_socket(endpoint ep);
datagram_socket(native_handle_type h, protocol_type protocol = protocol_type(),
const async::executor & executor = this_thread::get_executor());
datagram_socket(endpoint ep,
const async::executor & executor = this_thread::get_executor());
private:
struct receive_op_;
struct receive_op_seq_;
Expand Down
23 changes: 12 additions & 11 deletions include/boost/async/io/pipe.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
namespace boost::async::io
{

system::result<std::pair<struct readable_pipe, struct writable_pipe>> make_pipe();
system::result<std::pair<struct readable_pipe, struct writable_pipe>> make_pipe(
const async::executor & executor = this_thread::get_executor()
);

struct readable_pipe final : stream
{
Expand All @@ -27,13 +29,13 @@ struct readable_pipe final : stream
using native_handle_type = typename asio::basic_readable_pipe<executor>::native_handle_type;
native_handle_type native_handle() {return pipe_.native_handle();}

BOOST_ASYNC_DECL readable_pipe();
BOOST_ASYNC_DECL readable_pipe(native_handle_type native_handle);
BOOST_ASYNC_DECL readable_pipe(const async::executor & executor = this_thread::get_executor());
BOOST_ASYNC_DECL readable_pipe(native_handle_type native_handle, const async::executor & executor = this_thread::get_executor());

BOOST_ASYNC_DECL system::result<void> assign(native_handle_type native_handle);
BOOST_ASYNC_DECL system::result<native_handle_type> release();

BOOST_ASYNC_DECL system::result<readable_pipe> duplicate();
BOOST_ASYNC_DECL system::result<readable_pipe> duplicate(const async::executor & executor = this_thread::get_executor());

void write_some(buffers::mutable_buffer_subspan buffers) = delete;
void write_some(buffers::mutable_buffer buffer) = delete;
Expand All @@ -42,7 +44,7 @@ struct readable_pipe final : stream
BOOST_ASYNC_DECL void async_read_some_impl_(buffers::mutable_buffer_subspan buffer, async::completion_handler<system::error_code, std::size_t> h) override;
BOOST_ASYNC_DECL void async_write_some_impl_(buffers::const_buffer_subspan buffer, async::completion_handler<system::error_code, std::size_t> h) override;

friend system::result<std::pair<readable_pipe, writable_pipe>> make_pipe();
friend system::result<std::pair<readable_pipe, writable_pipe>> make_pipe(const async::executor & exec);
asio::basic_readable_pipe<executor> pipe_;
};

Expand All @@ -55,24 +57,23 @@ struct writable_pipe final : stream
using native_handle_type = typename asio::basic_readable_pipe<executor>::native_handle_type;
native_handle_type native_handle() {return pipe_.native_handle();}

BOOST_ASYNC_DECL writable_pipe();
BOOST_ASYNC_DECL writable_pipe(native_handle_type native_handle);
BOOST_ASYNC_DECL writable_pipe(const async::executor & executor = this_thread::get_executor());
BOOST_ASYNC_DECL writable_pipe(native_handle_type native_handle,
const async::executor & executor = this_thread::get_executor());

BOOST_ASYNC_DECL system::result<void> assign(native_handle_type native_handle);
BOOST_ASYNC_DECL system::result<native_handle_type> release();
BOOST_ASYNC_DECL system::result<writable_pipe> duplicate();
BOOST_ASYNC_DECL system::result<writable_pipe> duplicate(const async::executor & executor = this_thread::get_executor());

void read_some(buffers::mutable_buffer_subspan buffers) = delete;
void read_some(buffers::mutable_buffer buffer) = delete;
private:
BOOST_ASYNC_DECL void async_read_some_impl_(buffers::mutable_buffer_subspan buffer, async::completion_handler<system::error_code, std::size_t> h) override;
BOOST_ASYNC_DECL void async_write_some_impl_(buffers::const_buffer_subspan buffer, async::completion_handler<system::error_code, std::size_t> h) override;
friend system::result<std::pair<readable_pipe, writable_pipe>> make_pipe();
friend system::result<std::pair<readable_pipe, writable_pipe>> make_pipe(const async::executor & exec);
asio::basic_writable_pipe<executor> pipe_;
};

system::result<std::pair<readable_pipe, writable_pipe>> make_pipe();


}

Expand Down
10 changes: 6 additions & 4 deletions include/boost/async/io/popen.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ struct popen : stream
using native_handle_type = typename boost::process::v2::basic_process<executor>::native_handle_type;

BOOST_ASYNC_DECL popen(boost::process::v2::filesystem::path executable,
std::initializer_list<core::string_view> args,
process_initializer initializer = {});
std::initializer_list<core::string_view> args,
process_initializer initializer = {},
const async::executor & executor = this_thread::get_executor());


BOOST_ASYNC_DECL popen(boost::process::v2::filesystem::path executable,
std::span<core::string_view> args,
process_initializer initializer = {});
std::span<core::string_view> args,
process_initializer initializer = {},
const async::executor & executor = this_thread::get_executor());

[[nodiscard]] BOOST_ASYNC_DECL system::result<void> interrupt();
[[nodiscard]] BOOST_ASYNC_DECL system::result<void> request_exit();
Expand Down
11 changes: 7 additions & 4 deletions include/boost/async/io/process.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,18 @@ struct process

BOOST_ASYNC_DECL process(boost::process::v2::filesystem::path executable,
std::initializer_list<core::string_view> args,
process_initializer initializer = {});
process_initializer initializer = {},
const async::executor & executor = this_thread::get_executor());


BOOST_ASYNC_DECL process(boost::process::v2::filesystem::path executable,
std::span<core::string_view> args,
process_initializer initializer = {});
process_initializer initializer = {},
const async::executor & executor = this_thread::get_executor());

BOOST_ASYNC_DECL process(pid_type pid);
BOOST_ASYNC_DECL process(pid_type pid, native_handle_type native_handle);
BOOST_ASYNC_DECL process(pid_type pid, const async::executor & executor = this_thread::get_executor());
BOOST_ASYNC_DECL process(pid_type pid, native_handle_type native_handle,
const async::executor & executor = this_thread::get_executor());

[[nodiscard]] BOOST_ASYNC_DECL system::result<void> interrupt();
[[nodiscard]] BOOST_ASYNC_DECL system::result<void> request_exit();
Expand Down
19 changes: 8 additions & 11 deletions include/boost/async/io/resolver.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ struct resolver
{
using resolve_result = system::result<container::pmr::vector<endpoint>>;

BOOST_ASYNC_DECL resolver();
BOOST_ASYNC_DECL resolver(const async::executor & executor = this_thread::get_executor());
BOOST_ASYNC_DECL resolver(resolver && ) = delete;

BOOST_ASYNC_DECL void cancel();
Expand All @@ -42,32 +42,29 @@ struct resolver
asio::ip::basic_resolver<protocol_type, executor> & resolver_;
core::string_view host_;
core::string_view service_;


};

public:

[[nodiscard]] resolve_op_ resolve(core::string_view host, core::string_view service)
{
return resolve_op_{resolver_, host, service};
}


private:
asio::ip::basic_resolver<protocol_type, executor> resolver_;
};

struct lookup
struct lookup final : result_op<resolver::resolve_result::value_type>
{
lookup(core::string_view host, core::string_view service)
: host_(host), service_(service) {}
auto operator co_await() {return resolver_.resolve(host_, service_);}
private:

BOOST_ASYNC_DECL
void initiate(completion_handler<system::error_code, resolver::resolve_result::value_type>);

private:
core::string_view host_;
core::string_view service_;
resolver resolver_;

std::optional<asio::ip::basic_resolver<protocol_type, executor>> resolver_;
};

}
Expand Down
48 changes: 30 additions & 18 deletions include/boost/async/io/sleep.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,37 +8,49 @@
#ifndef BOOST_ASYNC_IO_SLEEP_HPP
#define BOOST_ASYNC_IO_SLEEP_HPP

#include <boost/async/io/steady_timer.hpp>

#include <boost/async/io/system_timer.hpp>
#include <boost/async/io/result.hpp>

#include <boost/asio/basic_waitable_timer.hpp>

#include <boost/async/promise.hpp>
#include <chrono>

namespace boost::async::detail::io
{

struct steady_sleep

struct steady_sleep final : async::io::result_op<>
{
steady_sleep(const std::chrono::steady_clock::time_point & tp) : tim{tp} {}
steady_sleep(const std::chrono::steady_clock::duration & du) : tim{du} {}
steady_sleep(const std::chrono::steady_clock::time_point & tp) : time_{tp} {}
steady_sleep(const std::chrono::steady_clock::duration & du)
: time_{std::chrono::steady_clock::now() + du} {}

BOOST_ASYNC_DECL void ready(async::handler<system::error_code> h);
BOOST_ASYNC_DECL void initiate(async::completion_handler<system::error_code> complete);

async::io::steady_timer::wait_op_ operator co_await() { return tim.wait(); }
async::io::steady_timer::wait_op_::vawaitable value() { return std::move(op_.emplace(tim.wait())).value(); }
private:
async::io::steady_timer tim;
std::optional<async::io::steady_timer::wait_op_> op_;
std::chrono::steady_clock::time_point time_;
std::optional<boost::asio::basic_waitable_timer<std::chrono::steady_clock,
asio::wait_traits<std::chrono::steady_clock>,
executor>> timer_;
};


struct system_sleep
struct system_sleep final : async::io::result_op<>
{
system_sleep(const std::chrono::system_clock::time_point & tp) : tim{tp} {}
system_sleep(const std::chrono::system_clock::duration & du) : tim{du} {}

async::io::system_timer::wait_op_ operator co_await() { return tim.wait(); }
async::io::system_timer::wait_op_::vawaitable value() { return std::move(op_.emplace(tim.wait())).value(); }
private:
async::io::system_timer tim;
std::optional<async::io::system_timer::wait_op_> op_;
system_sleep(const std::chrono::system_clock::time_point & tp) : time_{tp} {}
system_sleep(const std::chrono::system_clock::duration & du)
: time_{std::chrono::system_clock::now() + du} {}

BOOST_ASYNC_DECL void ready(async::handler<system::error_code> h);
BOOST_ASYNC_DECL void initiate(async::completion_handler<system::error_code> complete);

private:
std::chrono::system_clock::time_point time_;
std::optional<boost::asio::basic_waitable_timer<std::chrono::system_clock,
asio::wait_traits<std::chrono::system_clock>,
executor>> timer_;
};

}
Expand Down
4 changes: 2 additions & 2 deletions include/boost/async/io/ssl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ struct ssl_stream : private detail::ssl_stream_base, stream, socket
[[nodiscard]] BOOST_ASYNC_DECL system::result<void> cancel() override;
[[nodiscard]] BOOST_ASYNC_DECL bool is_open() const override;

BOOST_ASYNC_DECL ssl_stream();
BOOST_ASYNC_DECL ssl_stream(const async::executor & executor = this_thread::get_executor());
BOOST_ASYNC_DECL ssl_stream(ssl_stream && steam);
BOOST_ASYNC_DECL ssl_stream(stream_socket && socket);

BOOST_ASYNC_DECL ssl_stream(asio::ssl::context & ctx);
BOOST_ASYNC_DECL ssl_stream(asio::ssl::context & ctx, const async::executor & executor = this_thread::get_executor());
BOOST_ASYNC_DECL ssl_stream(asio::ssl::context & ctx, stream_socket && socket);

private:
Expand Down
6 changes: 3 additions & 3 deletions include/boost/async/io/steady_timer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ struct steady_timer
/// The time point type of the clock.
typedef typename clock_type::time_point time_point;

BOOST_ASYNC_DECL steady_timer();
BOOST_ASYNC_DECL steady_timer(const time_point& expiry_time);
BOOST_ASYNC_DECL steady_timer(const duration& expiry_time);
BOOST_ASYNC_DECL steady_timer(const async::executor & executor = this_thread::get_executor());
BOOST_ASYNC_DECL steady_timer(const time_point& expiry_time, const async::executor & executor = this_thread::get_executor());
BOOST_ASYNC_DECL steady_timer(const duration& expiry_time, const async::executor & executor = this_thread::get_executor());

BOOST_ASYNC_DECL void cancel();

Expand Down
10 changes: 6 additions & 4 deletions include/boost/async/io/stream_file.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,18 @@ namespace boost::async::io

struct stream_file : file, stream
{
BOOST_ASYNC_DECL system::result<stream_file> duplicate();
BOOST_ASYNC_DECL
system::result<stream_file> duplicate(const async::executor & executor = this_thread::get_executor());

[[nodiscard]] BOOST_ASYNC_DECL system::result<void> close() override;
[[nodiscard]] BOOST_ASYNC_DECL system::result<void> cancel() override;
[[nodiscard]] BOOST_ASYNC_DECL bool is_open() const override;

BOOST_ASYNC_DECL stream_file();
BOOST_ASYNC_DECL stream_file(const async::executor & executor = this_thread::get_executor());
BOOST_ASYNC_DECL stream_file(stream_file && lhs);
BOOST_ASYNC_DECL stream_file(native_handle_type h);
BOOST_ASYNC_DECL stream_file(core::string_view file, flags open_flags = flags::read_write);
BOOST_ASYNC_DECL stream_file(native_handle_type h, const async::executor & executor = this_thread::get_executor());
BOOST_ASYNC_DECL stream_file(core::string_view file, flags open_flags = flags::read_write,
const async::executor & executor = this_thread::get_executor());
private:
BOOST_ASYNC_DECL void async_read_some_impl_(buffers::mutable_buffer_subspan buffer, async::completion_handler<system::error_code, std::size_t> h) override;
BOOST_ASYNC_DECL void async_write_some_impl_(buffers::const_buffer_subspan buffer, async::completion_handler<system::error_code, std::size_t> h) override;
Expand Down
9 changes: 5 additions & 4 deletions include/boost/async/io/stream_socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,17 @@ namespace boost::async::io
struct [[nodiscard]] stream_socket final : stream, socket
{
// duplicate onto another thread
BOOST_ASYNC_DECL system::result<stream_socket> duplicate();
BOOST_ASYNC_DECL system::result<stream_socket> duplicate(const async::executor & executor = this_thread::get_executor());

[[nodiscard]] BOOST_ASYNC_DECL system::result<void> close() override;
[[nodiscard]] BOOST_ASYNC_DECL system::result<void> cancel() override;
[[nodiscard]] BOOST_ASYNC_DECL bool is_open() const override;

BOOST_ASYNC_DECL stream_socket();
BOOST_ASYNC_DECL stream_socket(const async::executor & executor = this_thread::get_executor());
BOOST_ASYNC_DECL stream_socket(stream_socket && lhs);
BOOST_ASYNC_DECL stream_socket(native_handle_type h, protocol_type protocol = protocol_type());
BOOST_ASYNC_DECL stream_socket(endpoint ep);
BOOST_ASYNC_DECL stream_socket(native_handle_type h, protocol_type protocol = protocol_type(),
const async::executor & executor = this_thread::get_executor());
BOOST_ASYNC_DECL stream_socket(endpoint ep, const async::executor & executor = this_thread::get_executor());
private:
BOOST_ASYNC_DECL void async_read_some_impl_(buffers::mutable_buffer_subspan buffer, async::completion_handler<system::error_code, std::size_t> h) override;
BOOST_ASYNC_DECL void async_write_some_impl_(buffers::const_buffer_subspan buffer, async::completion_handler<system::error_code, std::size_t> h) override;
Expand Down
6 changes: 3 additions & 3 deletions include/boost/async/io/system_timer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ struct system_timer final
/// The time point type of the clock.
typedef typename clock_type::time_point time_point;

BOOST_ASYNC_DECL system_timer();
BOOST_ASYNC_DECL system_timer(const time_point& expiry_time);
BOOST_ASYNC_DECL system_timer(const duration& expiry_time);
BOOST_ASYNC_DECL system_timer(const async::executor & executor = this_thread::get_executor());
BOOST_ASYNC_DECL system_timer(const time_point& expiry_time, const async::executor & executor = this_thread::get_executor());
BOOST_ASYNC_DECL system_timer(const duration& expiry_time, const async::executor & executor = this_thread::get_executor());

BOOST_ASYNC_DECL void cancel();

Expand Down
6 changes: 4 additions & 2 deletions src/io/acceptor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@

namespace boost::async::io
{
acceptor::acceptor() : acceptor_{this_thread::get_executor()} {}
acceptor::acceptor(endpoint ep) : acceptor_{this_thread::get_executor(), ep} {}
acceptor::acceptor(const async::executor & exec)
: acceptor_{exec} {}
acceptor::acceptor(endpoint ep, const async::executor & exec)
: acceptor_{exec, ep} {}


system::result<void> acceptor::bind(endpoint ep)
Expand Down
Loading

0 comments on commit 4619c05

Please sign in to comment.