Skip to content

Commit

Permalink
Remove client notify io (#698)
Browse files Browse the repository at this point in the history
* sources/client: remove notify_io

The only purpose of it is to send kill command
to the clients. But using of eventfd for that is overkill and creates
an 2x usage of fd for odyssey.

This patch replaces the eventfd to atomic var.

Signed-off-by: rkhapov <[email protected]>

* client.h: remove useless signal

Signed-off-by: rkhapov <[email protected]>

---------

Signed-off-by: rkhapov <[email protected]>
Co-authored-by: rkhapov <[email protected]>
  • Loading branch information
rkhapov and rkhapov authored Oct 4, 2024
1 parent f191bb7 commit 4fde8db
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 95 deletions.
10 changes: 10 additions & 0 deletions sources/atomic.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,14 @@ static inline uint64_t od_atomic_u64_cas(od_atomic_u64_t *atomic,
return __sync_val_compare_and_swap(atomic, compValue, exchValue);
}

static inline void od_atomic_u64_set(od_atomic_u64_t *atomic, uint64_t newValue)
{
for (;;) {
uint64_t oldValue = od_atomic_u64_of(atomic);

if (__sync_bool_compare_and_swap(atomic, oldValue, newValue))
break;
}
}

#endif /* ODYSSEY_ATOMIC_H */
47 changes: 6 additions & 41 deletions sources/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
* Scalable PostgreSQL connection pooler.
*/

typedef struct od_client_ctl od_client_ctl_t;
typedef struct od_client od_client_t;

typedef enum {
Expand All @@ -17,25 +16,17 @@ typedef enum {
OD_CLIENT_QUEUE
} od_client_state_t;

typedef enum { OD_CLIENT_OP_NONE = 0, OD_CLIENT_OP_KILL = 1 } od_clientop_t;

struct od_client_ctl {
od_atomic_u32_t op;
};

#define OD_CLIENT_MAX_PEERLEN 128

struct od_client {
od_client_state_t state;
od_pool_client_type_t type;
od_id_t id;
od_client_ctl_t ctl;
uint64_t coroutine_id;
machine_tls_t *tls;
od_io_t io;
machine_cond_t *cond;
od_relay_t relay;
machine_io_t *notify_io;
od_rule_t *rule;
od_config_listen_t *config_listen;

Expand Down Expand Up @@ -66,6 +57,9 @@ struct od_client {
od_list_t link_pool;
od_list_t link;

/* Used to kill client in kill_client or odyssey reload */
od_atomic_u64_t killed;

/* storage_user & storage_password provided by ldapsearch result */
#ifdef LDAP_FOUND
char *ldap_storage_username;
Expand Down Expand Up @@ -104,8 +98,6 @@ static inline void od_client_init(od_client_t *client)
client->global = NULL;
client->time_accept = 0;
client->time_setup = 0;
client->notify_io = NULL;
client->ctl.op = OD_CLIENT_OP_NONE;
#ifdef LDAP_FOUND
client->ldap_storage_username = NULL;
client->ldap_storage_username_len = 0;
Expand All @@ -129,6 +121,8 @@ static inline void od_client_init(od_client_t *client)
od_list_init(&client->link);

client->prep_stmt_ids = NULL;

od_atomic_u64_set(&client->killed, 0);
}

static inline od_client_t *od_client_allocate(void)
Expand Down Expand Up @@ -158,38 +152,9 @@ static inline void od_client_free(od_client_t *client)
free(client);
}

static inline od_retcode_t od_client_notify_read(od_client_t *client)
{
uint64_t value;
return machine_read_raw(client->notify_io, &value, sizeof(value));
}

static inline void od_client_notify(od_client_t *client)
{
uint64_t value = 1;
size_t processed = 0;
machine_write_raw(client->notify_io, &value, sizeof(value), &processed);
}

static inline uint32_t od_client_ctl_of(od_client_t *client)
{
return od_atomic_u32_of(&client->ctl.op);
}

static inline void od_client_ctl_set(od_client_t *client, uint32_t op)
{
od_atomic_u32_or(&client->ctl.op, op);
}

static inline void od_client_ctl_unset(od_client_t *client, uint32_t op)
{
od_atomic_u32_xor(&client->ctl.op, op);
}

static inline void od_client_kill(od_client_t *client)
{
od_client_ctl_set(client, OD_CLIENT_OP_KILL);
od_client_notify(client);
od_atomic_u64_set(&client->killed, 1UL);
}

#endif /* ODYSSEY_CLIENT_H */
30 changes: 3 additions & 27 deletions sources/frontend.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,6 @@ static inline void od_frontend_close(od_client_t *client)
od_atomic_u32_dec(&router->clients);

od_io_close(&client->io);
if (client->notify_io) {
machine_close(client->notify_io);
machine_io_free(client->notify_io);
client->notify_io = NULL;
}
od_client_free(client);
}

Expand Down Expand Up @@ -625,12 +620,10 @@ static inline bool od_should_drop_connection(od_client_t *client,
}
static od_frontend_status_t od_frontend_ctl(od_client_t *client)
{
uint32_t op = od_client_ctl_of(client);
if (op & OD_CLIENT_OP_KILL) {
od_client_ctl_unset(client, OD_CLIENT_OP_KILL);
od_client_notify_read(client);
if (od_atomic_u64_of(&client->killed) == 1) {
return OD_STOP;
}

return OD_OK;
}

Expand Down Expand Up @@ -1581,12 +1574,6 @@ static od_frontend_status_t od_frontend_remote(od_client_t *client)

od_frontend_status_t status;

/* enable client notification mechanism */
int rc;
rc = machine_read_start(client->notify_io, client->cond);
if (rc == -1) {
return OD_ECLIENT_READ;
}
bool reserve_session_server_connection =
route->rule->reserve_session_server_connection;

Expand Down Expand Up @@ -1747,6 +1734,7 @@ static od_frontend_status_t od_frontend_remote(od_client_t *client)
status = OD_ESERVER_WRITE;
break;
}
int rc;
rc = od_write(&server->io, msg);
if (rc == -1) {
status = OD_ESERVER_WRITE;
Expand Down Expand Up @@ -2019,18 +2007,6 @@ void od_frontend(void *arg)
od_error(&instance->logger, "startup", client, NULL,
"failed to transfer client io");
od_io_close(&client->io);
machine_close(client->notify_io);
od_client_free(client);
od_atomic_u32_dec(&router->clients_routing);
return;
}

rc = machine_io_attach(client->notify_io);
if (rc == -1) {
od_error(&instance->logger, "startup", client, NULL,
"failed to transfer client notify io");
od_io_close(&client->io);
machine_close(client->notify_io);
od_client_free(client);
od_atomic_u32_dec(&router->clients_routing);
return;
Expand Down
27 changes: 0 additions & 27 deletions sources/system.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,35 +60,11 @@ static inline void od_system_server(void *arg)
instance->config.keepalive_probes,
instance->config.keepalive_usr_timeout);

machine_io_t *notify_io;
notify_io = machine_io_create();
if (notify_io == NULL) {
od_error(&instance->logger, "server", NULL, NULL,
"failed to allocate client io notify object");
machine_close(client_io);
machine_io_free(client_io);
continue;
}

rc = machine_eventfd(notify_io);
if (rc == -1) {
od_error(&instance->logger, "server", NULL, NULL,
"failed to get eventfd for client: %s",
machine_error(client_io));
machine_close(notify_io);
machine_io_free(notify_io);
machine_close(client_io);
machine_io_free(client_io);
continue;
}

/* allocate new client */
od_client_t *client = od_client_allocate();
if (client == NULL) {
od_error(&instance->logger, "server", NULL, NULL,
"failed to allocate client object");
machine_close(notify_io);
machine_io_free(notify_io);
machine_close(client_io);
machine_io_free(client_io);
continue;
Expand All @@ -104,8 +80,6 @@ static inline void od_system_server(void *arg)
if (rc == -1) {
od_error(&instance->logger, "server", NULL, NULL,
"failed to allocate client io object");
machine_close(notify_io);
machine_io_free(notify_io);
machine_close(client_io);
machine_io_free(client_io);
od_client_free(client);
Expand All @@ -115,7 +89,6 @@ static inline void od_system_server(void *arg)
client->config_listen = server->config;
client->tls = server->tls;
client->time_accept = 0;
client->notify_io = notify_io;
client->time_accept = machine_time_us();

/* create new client event and pass it to worker pool */
Expand Down

0 comments on commit 4fde8db

Please sign in to comment.