Skip to content

Commit

Permalink
Support for v3 protocol (#89)
Browse files Browse the repository at this point in the history
* implementation for simultaneous tcp connections
  • Loading branch information
RogerZhongAWS authored Sep 20, 2022
1 parent 3ab22c7 commit 6716623
Show file tree
Hide file tree
Showing 11 changed files with 961 additions and 299 deletions.
327 changes: 327 additions & 0 deletions V3WebSocketProtocolGuide.md

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions resources/Message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ message Message {
bytes payload = 4;
string serviceId = 5;
repeated string availableServiceIds = 6;
uint32 connectionId = 7;

enum Type {
UNKNOWN = 0;
Expand All @@ -22,5 +23,7 @@ message Message {
STREAM_RESET = 3;
SESSION_RESET = 4;
SERVICE_IDS = 5;
CONNECTION_START = 6;
CONNECTION_RESET = 7;
}
}
6 changes: 5 additions & 1 deletion src/LocalproxyConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ namespace aws {
/**
* The web proxy endpoint port. This will be set only if a web proxy is necessary. defaults to 3128.
*/
std::uint16_t web_proxy_port {0 };
std::uint16_t web_proxy_port { 0 };
/**
* The web proxy authN. This will be set only if an web proxy is necessary and it requires authN.
*/
Expand Down Expand Up @@ -105,6 +105,10 @@ namespace aws {
* If this is set to true, it means that v2 local proxy won't validate service id field.
*/
bool is_v1_message_format {false};
/**
* A flag to judge if v3 local proxy needs to fallback to communicate using v2 local proxy message format.
*/
bool is_v2_message_format {false};
};
}
}
Expand Down
8 changes: 6 additions & 2 deletions src/ProxySettings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace aws { namespace iot { namespace securedtunneling { namespace settings
std::size_t const DEFAULT_MAX_DATA_FRAME_SIZE = DEFAULT_MESSAGE_MAX_SIZE + DEFAULT_DATA_LENGTH_SIZE;

char const * const KEY_TCP_CONNECTION_RETRY_COUNT = "tunneling.proxy.tcp.connection_retry_count";
std::int32_t const DEFAULT_TCP_CONNECTION_RETRY_COUNT = 5;
std::int32_t const DEFAULT_TCP_CONNECTION_RETRY_COUNT = -1;

char const * const KEY_TCP_CONNECTION_RETRY_DELAY_MS = "tunneling.proxy.tcp.connection_retry_delay_ms";
std::uint32_t const DEFAULT_TCP_CONNECTION_RETRY_DELAY_MS = 1000;
Expand All @@ -34,6 +34,9 @@ namespace aws { namespace iot { namespace securedtunneling { namespace settings

char const * const KEY_MESSAGE_MAX_SIZE = "tunneling.proxy.message.max_size";
std::size_t const DEFAULT_MESSAGE_MAX_SIZE = 64 * 1024;

char const * const KEY_MAX_ACTIVE_CONNECTIONS = "tunneling.proxy.tcp.max_active_connections";
std::uint32_t const DEFAULT_MAX_ACTIVE_CONNECTIONS = 128;

char const * const KEY_WEB_SOCKET_PING_PERIOD_MS = "tunneling.proxy.websocket.ping_period_ms";
std::uint32_t const DEFAULT_WEB_SOCKET_PING_PERIOD_MS = 5000;
Expand All @@ -48,7 +51,7 @@ namespace aws { namespace iot { namespace securedtunneling { namespace settings
bool const DEFAULT_WEB_SOCKET_DATA_ERROR_RETRY = true;

char const * const KEY_WEB_SOCKET_SUBPROTOCOL = "tunneling.proxy.websocket.subprotocol";
std::string const DEFAULT_WEB_SOCKET_SUBPROTOCOL = "aws.iot.securetunneling-2.0";
std::string const DEFAULT_WEB_SOCKET_SUBPROTOCOL = "aws.iot.securetunneling-3.0";

char const * const KEY_WEB_SOCKET_MAX_FRAME_SIZE = "tunneling.proxy.websocket.max_frame_size";
std::size_t const DEFAULT_WEB_SOCKET_MAX_FRAME_SIZE = DEFAULT_MAX_DATA_FRAME_SIZE * 2;
Expand Down Expand Up @@ -83,6 +86,7 @@ namespace aws { namespace iot { namespace securedtunneling { namespace settings
ADD_SETTING_DEFAULT(settings, TCP_READ_BUFFER_SIZE);
ADD_SETTING_DEFAULT(settings, MESSAGE_MAX_PAYLOAD_SIZE);
ADD_SETTING_DEFAULT(settings, MESSAGE_MAX_SIZE);
ADD_SETTING_DEFAULT(settings, MAX_ACTIVE_CONNECTIONS);
ADD_SETTING_DEFAULT(settings, WEB_SOCKET_PING_PERIOD_MS);
ADD_SETTING_DEFAULT(settings, WEB_SOCKET_CONNECT_RETRY_DELAY_MS);
ADD_SETTING_DEFAULT(settings, WEB_SOCKET_CONNECT_RETRY_COUNT);
Expand Down
3 changes: 3 additions & 0 deletions src/ProxySettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ namespace aws { namespace iot { namespace securedtunneling { namespace settings
extern char const * const KEY_MESSAGE_MAX_SIZE;
extern std::size_t const DEFAULT_MESSAGE_MAX_SIZE;

extern char const * const KEY_MAX_ACTIVE_CONNECTIONS;
extern std::uint32_t const DEFAULT_MAX_ACTIVE_CONNECTIONS;

extern char const * const KEY_WEB_SOCKET_PING_PERIOD_MS;
extern std::uint32_t const DEFAULT_WEB_SOCKET_PING_PERIOD_MS;

Expand Down
826 changes: 563 additions & 263 deletions src/TcpAdapterProxy.cpp

Large diffs are not rendered by default.

48 changes: 29 additions & 19 deletions src/TcpAdapterProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,14 @@ namespace aws { namespace iot { namespace securedtunneling {
wss{ nullptr },
wss_resolver{ io_ctx },
wss_response{ },
num_active_connections{ 0 },
stream_id{ -1 },
service_id{ "" },
serviceId_to_streamId_map{},
serviceId_to_tcp_server_map{},
serviceId_to_tcp_client_map{},
serviceId_to_control_message_handler_map{},
serviceId_to_data_message_handler_map{},
bind_address_actual{ },
is_web_socket_reading{ false },
is_service_ids_received{ false },
Expand All @@ -105,6 +108,8 @@ namespace aws { namespace iot { namespace securedtunneling {
//debuggability.
boost::beast::websocket::response_type wss_response;

std::atomic_uint16_t num_active_connections;

//represents the current stream ID to expect data from
//care should be taken how(if) this is updated directly
// To be deleted
Expand All @@ -113,6 +118,8 @@ namespace aws { namespace iot { namespace securedtunneling {
std::unordered_map<std::string, std::int32_t> serviceId_to_streamId_map;
std::unordered_map<std::string, tcp_server::pointer> serviceId_to_tcp_server_map;
std::unordered_map<std::string, tcp_client::pointer> serviceId_to_tcp_client_map;
std::unordered_map<std::string, std::function<bool(message const &)>> serviceId_to_control_message_handler_map;
std::unordered_map<std::string, std::function<bool(message const &)>> serviceId_to_data_message_handler_map;
std::string bind_address_actual;
//flag set to true while web socket data is being drained
//necessary for better TCP socket recovery rather than destroying
Expand Down Expand Up @@ -152,15 +159,15 @@ namespace aws { namespace iot { namespace securedtunneling {

int run_proxy();
private:
void update_message_handlers(tcp_adapter_context &tac, std::function<bool(message const &)> handler);
void setup_tcp_socket(tcp_adapter_context &tac, std::string const & service_id);
void setup_tcp_sockets(tcp_adapter_context &tac);
//setup async io flow to connect tcp socket to the adapter config's data host/port
void async_setup_dest_tcp_socket(tcp_adapter_context &tac, std::string const & service_id);
void async_setup_dest_tcp_socket_retry(tcp_adapter_context &tac, std::shared_ptr<basic_retry_config> retry_config, std::string const & service_id);
void async_setup_dest_tcp_socket(tcp_adapter_context &tac, std::string const & service_id, uint32_t const & connection_id, bool is_first_connection);
void async_setup_dest_tcp_socket_retry(tcp_adapter_context &tac, std::shared_ptr<basic_retry_config> retry_config, std::string const & service_id, uint32_t const & connection_id, bool is_first_connection);
void async_setup_source_tcp_sockets(tcp_adapter_context &tac);
void async_setup_source_tcp_socket_retry(tcp_adapter_context &tac, std::shared_ptr<basic_retry_config> retry_config, std::string service_id);
void initialize_tcp_clients(tcp_adapter_context &tac);
void initialize_tcp_servers(tcp_adapter_context &tac);
void do_accept_tcp_connection(tcp_adapter_context &tac, std::shared_ptr<basic_retry_config> retry_config, std::string service_id, std::uint16_t local_port, bool is_first_connection);
void setup_web_socket(tcp_adapter_context &tac);
//setup async web socket, and as soon as connection is up, setup async ping schedule
void async_setup_web_socket(tcp_adapter_context &tac);
Expand All @@ -169,10 +176,13 @@ namespace aws { namespace iot { namespace securedtunneling {
//then the reset is intentionally reset via web socket, and retries
//occur definitely (regardless of retry configuration)
void tcp_socket_reset_all(tcp_adapter_context &tac, std::function<void()> post_reset_operation);
void tcp_socket_reset(tcp_adapter_context &tac, std::string service_id, std::function<void()> post_reset_operation);
tcp_connection::pointer get_tcp_connection(tcp_adapter_context &tac, std::string service_id);
void tcp_socket_reset_init(tcp_adapter_context &tac, std::string service_id, std::function<void()> post_reset_operation);
void tcp_socket_reset(tcp_adapter_context &tac, std::string service_id, uint32_t connection_id, std::function<void()> post_reset_operation);
void tcp_socket_close(tcp_adapter_context &tac, std::string service_id, uint32_t connection_id);
tcp_connection::pointer get_tcp_connection(tcp_adapter_context &tac, std::string service_id, uint32_t connection_id);

void tcp_socket_error(tcp_adapter_context &tac, boost::system::error_code const &_ec, std::string const & service_id);
void delete_tcp_socket(tcp_adapter_context &tac, std::string const & service_id, uint32_t const & connection_id);
void tcp_socket_error(tcp_adapter_context &tac, boost::system::error_code const &_ec, std::string const & service_id, uint32_t const & connection_id);

//sets up a web socket read loop that will read, and ignore most messages until a stream start
//is read and then do something with it (likely, connect to configured endpoint)
Expand All @@ -197,22 +207,21 @@ namespace aws { namespace iot { namespace securedtunneling {
//invokes after_setup_web_socket_read_until_stream_start() after stream start is encountered
bool async_wait_for_stream_start(tcp_adapter_context &tac, message const &message);
bool async_wait_for_service_ids(tcp_adapter_context &tac);
void async_tcp_socket_read_loop(tcp_adapter_context &tac, std::string const & service_id);
void async_tcp_socket_read_loop(tcp_adapter_context &tac, std::string const & service_id, uint32_t const & connection_id);

//below loop does continuous writes to TCP socket from the TCP adapter
//context's tcp_write_buffer. After consuming chunks out of the buffer
//the behavior will be to check
void async_tcp_write_buffer_drain(tcp_adapter_context &tac, std::string service_id);
//the behavior will be to check
void async_tcp_write_buffer_drain(tcp_adapter_context &tac, std::string service_id, uint32_t connection_id);

void async_setup_bidirectional_data_transfers(tcp_adapter_context &tac, std::string const & service_id);
void async_setup_web_socket_write_buffer_drain(tcp_adapter_context &tac, std::string const & service_id);
void async_setup_bidirectional_data_transfers(tcp_adapter_context &tac, std::string const & service_id, uint32_t const & connection_id);
void async_setup_web_socket_write_buffer_drain(tcp_adapter_context &tac, std::string const & service_id, uint32_t const & connection_id);

//returns a boolean that indicates if another web socket data read message can be put
//onto the tcp write buffer. We have no way of knowing what the next message is and if
//it will be too big to process, thus we don't do the read applying back pressure on
//the socket. Implicitly, this means that an async_read is not happening on the web socket
bool tcp_has_enough_write_buffer_space(tcp_connection::pointer connection);
bool tcp_has_enough_write_buffer_space(tcp_adapter_context const &tac);

//returns a boolean that indicates if another tcp socket read's data can be put on the
//web socket write buffer. It's a bit different from tcp write buffer space requirements
Expand All @@ -226,8 +235,11 @@ namespace aws { namespace iot { namespace securedtunneling {
bool is_valid_stream_id(tcp_adapter_context const& tac, message const &message);

void async_send_message(tcp_adapter_context &tac, message const &message);
void async_send_stream_start(tcp_adapter_context &tac, std::string const & service_id);
void async_send_stream_reset(tcp_adapter_context &tac, std::string const & service_id);
void async_send_message(tcp_adapter_context &tac, message const &message, std::string const & service_id, uint32_t const & connection_id);
void async_send_stream_start(tcp_adapter_context &tac, std::string const & service_id, uint32_t const & connection_id);
void async_send_stream_reset(tcp_adapter_context &tac, std::string const & service_id, uint32_t const & connection_id);
void async_send_connection_start(tcp_adapter_context &tac, std::string const & service_id, uint32_t const & connection_id);
void async_send_connection_reset(tcp_adapter_context &tac, std::string const & service_id, uint32_t const & connection_id);

//handler for successfully sent ping will delay the next one
void async_ping_handler_loop(tcp_adapter_context &tac,
Expand All @@ -239,16 +251,14 @@ namespace aws { namespace iot { namespace securedtunneling {
void clear_ws_buffers(tcp_adapter_context &tac);
void clear_tcp_connection_buffers(tcp_connection::pointer connection);

void tcp_socket_ensure_closed(boost::asio::ip::tcp::socket & tcp_socket);

//closes the websocket connection
//1 - shutdown the receive side of TCP
//2 - drain the web socket write buffer
//3 - send a web socket close frame
//4 - perform teardown procedure on websocket
void web_socket_close_and_stop(tcp_adapter_context &tac);

void async_resolve_destination_for_connect(tcp_adapter_context &tac, std::shared_ptr<basic_retry_config> retry_config, std::string const & service_id, boost::system::error_code const &ec, tcp::resolver::results_type results);
void async_resolve_destination_for_connect(tcp_adapter_context &tac, std::shared_ptr<basic_retry_config> retry_config, std::string const & service_id, uint32_t const & connection_id, boost::system::error_code const &ec, tcp::resolver::results_type results);

bool process_incoming_websocket_buffer(tcp_adapter_context &tac, boost::beast::multi_buffer &message_buffer);

Expand All @@ -264,7 +274,7 @@ namespace aws { namespace iot { namespace securedtunneling {

bool fall_back_to_v1_message_format(std::unordered_map<std::string, std::string> const & serviceId_to_endpoint_map);

void async_send_message_to_web_socket(tcp_adapter_context &tac, std::shared_ptr<boost::beast::flat_buffer> const& ss, std::string const & service_id);
void async_send_message_to_web_socket(tcp_adapter_context &tac, std::shared_ptr<boost::beast::flat_buffer> const& ss, std::string const & service_id, uint32_t const & connection_id);

void async_setup_destination_tcp_sockets(tcp_adapter_context &tac);

Expand Down
8 changes: 5 additions & 3 deletions src/TcpClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,20 @@ namespace aws { namespace iot { namespace securedtunneling { namespace connectio
tcp_client(boost::asio::io_context & io_context, std::size_t write_buf_size, std::size_t read_buf_size, std::size_t ws_write_buf_size)
: resolver_(io_context)
{
connection_ =
tcp_connection::create(io_context, write_buf_size, read_buf_size, ws_write_buf_size);

}
static pointer create(boost::asio::io_context& io_context, std::size_t const & write_buf_size, std::size_t const & read_buf_size, std::size_t const & ws_write_buf_size)
{
return pointer(new tcp_client(io_context, write_buf_size, read_buf_size, ws_write_buf_size));
}

tcp_connection::pointer connection_;
tcp::resolver resolver_;

std::unordered_map<uint32_t, tcp_connection::pointer> connectionId_to_tcp_connection_map;

// function object defines what to do after set up a tcp socket
std::function<void()> after_setup_tcp_socket = nullptr;

// function object defines what to do receiving control message: stream start
std::function<void()> on_receive_stream_start = nullptr;
};
Expand Down
11 changes: 8 additions & 3 deletions src/TcpConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,23 @@ namespace aws { namespace iot { namespace securedtunneling { namespace connectio
public:
typedef boost::shared_ptr<tcp_connection> pointer;

static pointer create(boost::asio::io_context& io_context, std::size_t const & write_buf_size, std::size_t const & read_buf_size, std::size_t ws_write_buf_size)
static pointer create(boost::asio::io_context& io_context, std::size_t const & write_buf_size, std::size_t const & read_buf_size, std::size_t ws_write_buf_size, uint32_t connection_id)
{
return pointer(new tcp_connection(io_context, write_buf_size, read_buf_size, ws_write_buf_size));
return pointer(new tcp_connection(io_context, write_buf_size, read_buf_size, ws_write_buf_size, connection_id));
}

tcp::socket& socket()
{
return socket_;
}

tcp_connection(boost::asio::io_context & io_context, std::size_t write_buf_size, std::size_t read_buf_size, std::size_t ws_write_buf_size)
tcp_connection(boost::asio::io_context & io_context, std::size_t write_buf_size, std::size_t read_buf_size, std::size_t ws_write_buf_size, uint32_t connection_id)
: socket_(io_context)
, tcp_write_buffer_(write_buf_size)
, tcp_read_buffer_(read_buf_size)
, web_socket_data_write_buffer_(ws_write_buf_size)
, connection_id_(connection_id)

{
}

Expand All @@ -51,6 +53,9 @@ namespace aws { namespace iot { namespace securedtunneling { namespace connectio
//condense smaller TCP read chunks to bigger web socket writes. It also makes
//it impossible to "inject" a non-data message in data sequence order
boost::beast::multi_buffer web_socket_data_write_buffer_;

uint32_t connection_id_; // assigned connection_id for tcp connection

// Is this tcp socket currently writing
bool is_tcp_socket_writing_{ false };
// Is this tcp socket currently reading
Expand Down
10 changes: 7 additions & 3 deletions src/TcpServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <boost/beast/core/flat_buffer.hpp>
#include <boost/asio.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <unordered_map>
#include "TcpConnection.h"

namespace aws { namespace iot { namespace securedtunneling { namespace connection {
Expand All @@ -16,8 +17,7 @@ namespace aws { namespace iot { namespace securedtunneling { namespace connectio
: acceptor_(io_context)
, resolver_(io_context)
{
connection_ =
tcp_connection::create(io_context, write_buf_size, read_buf_size, ws_write_buf_size);
highest_connection_id = 0;
}

static pointer create(boost::asio::io_context& io_context, std::size_t const & write_buf_size, std::size_t const & read_buf_size, std::size_t const & ws_write_buf_size)
Expand All @@ -32,7 +32,11 @@ namespace aws { namespace iot { namespace securedtunneling { namespace connectio

tcp::acceptor acceptor_;
tcp::resolver resolver_;
tcp_connection::pointer connection_;

std::unordered_map<uint32_t, tcp_connection::pointer> connectionId_to_tcp_connection_map;

std::atomic_uint32_t highest_connection_id;

// function object defines what to do after set up a tcp socket
std::function<void()> after_setup_tcp_socket = nullptr;
};
Expand Down
Loading

0 comments on commit 6716623

Please sign in to comment.