Skip to content

Commit

Permalink
Adding ability to apply binlogs thru mysqlbinlog in multi-threaded mode
Browse files Browse the repository at this point in the history
Summary:
Added a new flag in `mysqlbinlog` `--mta-workers=x` that tells
the server to spawn `x` dependency applier workers to apply
transactions. When `--mta-workers` is specified all events are printed
in their base64 representation so we can create log events out of them.

Differential Revision: D49466823

---------------------------------------------------------------------------

Cast enum into target type in a ternary operator (facebook#1411)

Summary:
This fixes a GCC build error:

sql/rpl_replica.cc: In function ‘int slave_start_single_worker(Relay_log_info*, ulong)’: sql/rpl_replica.cc:7207:28: error: enumerated and non-enumerated type in conditional expression [-Werror=extra]
 7207 |             rli->is_fake() ? INFO_REPOSITORY_DUMMY : opt_rli_repository_id, i,
      |             ~~~~~~~~~~~~~~~^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Pull Request resolved: facebook#1411

Differential Revision: D52207263

fbshipit-source-id: 47d7f5c
  • Loading branch information
abhinav04sharma authored and inikep committed May 15, 2024
1 parent 36f8385 commit 7a4c1c4
Show file tree
Hide file tree
Showing 21 changed files with 398 additions and 52 deletions.
79 changes: 65 additions & 14 deletions client/mysqlbinlog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,7 @@ static char *database = nullptr;
static char *output_file = nullptr;
static char *rewrite = nullptr;
bool force_opt = false, short_form = false, idempotent_mode = false;
int mta_workers = 0;
static bool debug_info_flag, debug_check_flag;
static bool force_if_open_opt = true, raw_mode = false;
static bool to_last_remote_log = false, stop_never = false;
Expand Down Expand Up @@ -1279,6 +1280,26 @@ static bool shall_skip_gtids(const Log_event *ev, Gtid *cached_gtid) {
return filtered;
}

static void print_event(Log_event *ev, FILE *result_file,
PRINT_EVENT_INFO *info) {
if (info->base64_output_mode != BASE64_OUTPUT_FULL) {
return ev->print(result_file, info);
}

auto cache = &info->head_cache;

if (!info->inside_group) my_b_printf(cache, "BINLOG '\n");
if (ev->starts_group() || is_gtid_event(ev)) info->inside_group = true;
if (ev->ends_group()) info->inside_group = false;
ev->print_base64(cache, info, info->inside_group);

if (ev->get_type_code() == binary_log::FORMAT_DESCRIPTION_EVENT) {
info->printed_fd_event = true;
my_b_printf(cache, "/*!50616 SET @@SESSION.GTID_NEXT='AUTOMATIC'*/%s\n",
info->delimiter);
}
}

/**
Helper function that prints the cached begin query event to the output
Expand All @@ -1288,7 +1309,7 @@ static bool shall_skip_gtids(const Log_event *ev, Gtid *cached_gtid) {
@retval False ERROR
*/
static bool print_cached_begin_query(PRINT_EVENT_INFO *print_event_info) {
begin_query_ev_cache->print(result_file, print_event_info);
print_event(begin_query_ev_cache, result_file, print_event_info);
auto head = &print_event_info->head_cache;
if (head->error == -1) {
return false;
Expand Down Expand Up @@ -1520,7 +1541,7 @@ void handle_last_rows_query_event(bool print,
my_off_t temp_log_pos = last_rows_query_event.event_pos;
auto old_hexdump_from = print_event_info->hexdump_from;
print_event_info->hexdump_from = (opt_hexdump ? temp_log_pos : 0);
last_rows_query_event.event->print(result_file, print_event_info);
print_event(last_rows_query_event.event, result_file, print_event_info);
print_event_info->hexdump_from = old_hexdump_from;
}
last_rows_query_event.event->register_temp_buf(old_temp_buf);
Expand Down Expand Up @@ -1637,7 +1658,7 @@ static Exit_status process_event(PRINT_EVENT_INFO *print_event_info,

switch (ev_type) {
case binary_log::TRANSACTION_PAYLOAD_EVENT:
ev->print(result_file, print_event_info);
print_event(ev, result_file, print_event_info);
if (head->error == -1) goto err;
break;
case binary_log::QUERY_EVENT: {
Expand All @@ -1660,7 +1681,7 @@ static Exit_status process_event(PRINT_EVENT_INFO *print_event_info,
my_off_t temp_log_pos = pop_event_array.event_pos;
print_event_info->hexdump_from = (opt_hexdump ? temp_log_pos : 0);
if (!parent_query_skips)
temp_event->print(result_file, print_event_info);
print_event(temp_event, result_file, print_event_info);
delete temp_event;
}

Expand Down Expand Up @@ -1761,7 +1782,7 @@ static Exit_status process_event(PRINT_EVENT_INFO *print_event_info,
if (!in_transaction) seen_gtid = false;
}

ev->print(result_file, print_event_info);
print_event(ev, result_file, print_event_info);
if (head->error == -1) goto err;
break;
}
Expand Down Expand Up @@ -1795,7 +1816,7 @@ static Exit_status process_event(PRINT_EVENT_INFO *print_event_info,
the subsequent call load_processor.process fails, because the
output of Append_block_log_event::print is only a comment.
*/
ev->print(result_file, print_event_info);
print_event(ev, result_file, print_event_info);

if (opt_print_gtids && encounter_gtid(cached_gtid)) goto err;

Expand Down Expand Up @@ -1833,7 +1854,7 @@ static Exit_status process_event(PRINT_EVENT_INFO *print_event_info,

print_event_info->common_header_len =
dynamic_cast<Format_description_event *>(ev)->common_header_len;
ev->print(result_file, print_event_info);
print_event(ev, result_file, print_event_info);

if (head->error == -1) goto err;
if (!force_if_open_opt &&
Expand All @@ -1848,7 +1869,7 @@ static Exit_status process_event(PRINT_EVENT_INFO *print_event_info,
break;
}
case binary_log::BEGIN_LOAD_QUERY_EVENT:
ev->print(result_file, print_event_info);
print_event(ev, result_file, print_event_info);
if (head->error == -1) goto err;
if ((retval = load_processor.process(
(Begin_load_query_log_event *)ev)) != OK_CONTINUE)
Expand All @@ -1864,6 +1885,10 @@ static Exit_status process_event(PRINT_EVENT_INFO *print_event_info,
if (shall_skip_database(exlq->db))
print_event_info->skipped_event_in_transaction = true;
else {
if (print_event_info->base64_output_mode == BASE64_OUTPUT_FULL) {
error("Cannot handle Execute_load_query");
goto err;
}
if (fname) {
convert_path_to_forward_slashes(fname);
exlq->print(result_file, print_event_info, fname);
Expand Down Expand Up @@ -2038,7 +2063,7 @@ static Exit_status process_event(PRINT_EVENT_INFO *print_event_info,
goto err;
}

ev->print(result_file, print_event_info);
print_event(ev, result_file, print_event_info);

print_event_info->have_unflushed_events = true;

Expand Down Expand Up @@ -2069,7 +2094,7 @@ static Exit_status process_event(PRINT_EVENT_INFO *print_event_info,
print_event_info->delimiter);
print_event_info->skipped_event_in_transaction = false;

ev->print(result_file, print_event_info);
print_event(ev, result_file, print_event_info);
if (head->error == -1) goto err;
break;
}
Expand All @@ -2091,12 +2116,12 @@ static Exit_status process_event(PRINT_EVENT_INFO *print_event_info,
begin_query_ev_cache = nullptr;
if (skip) break;
}
ev->print(result_file, print_event_info);
print_event(ev, result_file, print_event_info);
if (head->error == -1) goto err;
break;
}
case binary_log::METADATA_EVENT: {
ev->print(result_file, print_event_info);
print_event(ev, result_file, print_event_info);
if (head->error == -1) goto err;

/* Copy and flush head cache and body cache */
Expand All @@ -2118,7 +2143,7 @@ static Exit_status process_event(PRINT_EVENT_INFO *print_event_info,
"--include-gtids, respectively, instead.");
[[fallthrough]];
default:
ev->print(result_file, print_event_info);
print_event(ev, result_file, print_event_info);
if (head->error == -1) goto err;
}
/* Flush head cache to result_file for every event */
Expand Down Expand Up @@ -2246,6 +2271,11 @@ static struct my_option my_long_options[] = {
"applying Row Events",
&idempotent_mode, &idempotent_mode, nullptr, GET_BOOL, NO_ARG, 0, 0, 0,
nullptr, 0, nullptr},
{"mta-workers", 'w',
"Number of multi-threaded workers to spawn on the "
"server to apply binlogs",
&mta_workers, &mta_workers, nullptr, GET_INT, REQUIRED_ARG, 0, 0, 0,
nullptr, 0, nullptr},
{"local-load", 'l',
"Prepare local temporary files for LOAD DATA INFILE in the specified "
"directory.",
Expand Down Expand Up @@ -3808,6 +3838,11 @@ static int args_post_process(void) {
global_sid_lock->unlock();
}

if (mta_workers && opt_skip_gtids == 0) {
error("--mta-workers requires --skip-gtids option");
return ERROR_STOP;
}

return OK_CONTINUE;
}

Expand Down Expand Up @@ -4182,6 +4217,17 @@ int main(int argc, char **argv) {
fprintf(result_file, "/*!80019 SET @@SESSION.REQUIRE_ROW_FORMAT=1*/;\n\n");
}

auto orig_base64_output_mode = opt_base64_output_mode;
auto orig_short_form = short_form;
if (mta_workers) {
// we need to work in full base64 and short form mode for MTA
opt_base64_output_mode = BASE64_OUTPUT_FULL;
short_form = true;
fprintf(result_file,
"/*!50700 SET @@SESSION.MTA_BINLOG_STATEMENT_WORKERS=%d*/;\n",
mta_workers);
}

if (opt_start_gtid_str != nullptr || opt_find_gtid_str != nullptr) {
if (opt_start_gtid_str != nullptr && opt_remote_proto == BINLOG_DUMP_GTID) {
char *args = const_cast<char *>("");
Expand Down Expand Up @@ -4210,7 +4256,12 @@ int main(int argc, char **argv) {

if (!raw_mode && opt_find_gtid_str == nullptr) {
fprintf(result_file, "# End of log file\n");

if (mta_workers) {
opt_base64_output_mode = orig_base64_output_mode;
short_form = orig_short_form;
fprintf(result_file,
"/*!50700 SET @@SESSION.MTA_BINLOG_STATEMENT_WORKERS=0*/;\n");
}
fprintf(result_file,
"/*!50003 SET COMPLETION_TYPE=@OLD_COMPLETION_TYPE*/;\n");
if (disable_log_bin)
Expand Down
4 changes: 4 additions & 0 deletions mysql-test/r/mysqld--help-notwin.result
Original file line number Diff line number Diff line change
Expand Up @@ -1090,6 +1090,9 @@ The following options may be given as the first argument:
minimum_hlc_ns is successfully updated is guranteed to be
greater than this value. The maximum allowed drift
(forward) is controlled by maximum_hlc_drift_ns
--mta-binlog-statement-workers[=#]
Internal variable to specify the Number of workers to
spawn to apply binlogs thru mysqlbinlog piping
--mts-dependency-cond-wait-timeout[=#]
Timeout for all conditional waits in dependency repl in
milliseconds
Expand Down Expand Up @@ -3442,6 +3445,7 @@ memlock FALSE
min-examined-row-limit 0
min-examined-row-limit-sql-stats 0
minimum-hlc-ns 0
mta-binlog-statement-workers 0
mts-dependency-cond-wait-timeout 5000
mts-dependency-max-keys 100000
mts-dependency-order-commits DB
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ SET PERSIST_ONLY max_binlog_cache_size = @@GLOBAL.max_binlog_cache_size;
SET PERSIST_ONLY max_binlog_size = @@GLOBAL.max_binlog_size;
SET PERSIST_ONLY max_binlog_stmt_cache_size = @@GLOBAL.max_binlog_stmt_cache_size;
SET PERSIST_ONLY max_relay_log_size = @@GLOBAL.max_relay_log_size;
SET PERSIST_ONLY mta_binlog_statement_workers = @@GLOBAL.mta_binlog_statement_workers;
SET PERSIST_ONLY mts_dependency_replication = @@GLOBAL.mts_dependency_replication;
SET PERSIST_ONLY prev_gtid_and_opid = @@GLOBAL.prev_gtid_and_opid;
ERROR HY000: Variable 'prev_gtid_and_opid' is a non persistent read only variable
Expand Down Expand Up @@ -403,6 +404,7 @@ RESET PERSIST max_binlog_cache_size;
RESET PERSIST max_binlog_size;
RESET PERSIST max_binlog_stmt_cache_size;
RESET PERSIST max_relay_log_size;
RESET PERSIST mta_binlog_statement_workers;
RESET PERSIST mts_dependency_replication;
RESET PERSIST prev_gtid_and_opid;
ERROR HY000: Variable prev_gtid_and_opid does not exist in persisted config file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ SET PERSIST max_binlog_cache_size = @@GLOBAL.max_binlog_cache_size;
SET PERSIST max_binlog_size = @@GLOBAL.max_binlog_size;
SET PERSIST max_binlog_stmt_cache_size = @@GLOBAL.max_binlog_stmt_cache_size;
SET PERSIST max_relay_log_size = @@GLOBAL.max_relay_log_size;
SET PERSIST mta_binlog_statement_workers = @@GLOBAL.mta_binlog_statement_workers;
SET PERSIST mts_dependency_replication = @@GLOBAL.mts_dependency_replication;
SET PERSIST prev_gtid_and_opid = @@GLOBAL.prev_gtid_and_opid;
ERROR HY000: Variable 'prev_gtid_and_opid' is a read only variable
Expand Down Expand Up @@ -405,6 +406,7 @@ RESET PERSIST IF EXISTS max_binlog_cache_size;
RESET PERSIST IF EXISTS max_binlog_size;
RESET PERSIST IF EXISTS max_binlog_stmt_cache_size;
RESET PERSIST IF EXISTS max_relay_log_size;
RESET PERSIST IF EXISTS mta_binlog_statement_workers;
RESET PERSIST IF EXISTS mts_dependency_replication;
RESET PERSIST IF EXISTS prev_gtid_and_opid;
Warnings:
Expand Down
37 changes: 37 additions & 0 deletions mysql-test/suite/rpl/r/rpl_mysqlbinlog_mta_workers.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
include/master-slave.inc
Warnings:
Note #### Sending passwords in plain text without SSL/TLS is extremely insecure.
Note #### Storing MySQL user name or password information in the connection metadata repository is not secure and is therefore not recommended. Please consider using the USER and PASSWORD connection options for START REPLICA; see the 'START REPLICA Syntax' in the MySQL Manual for more information.
[connection master]
call mtr.add_suppression("HA_ERR_FOUND_DUPP_KEY");
create table t1 (a int primary key) engine = innodb;
flush binary logs;
purge binary logs to 'master-bin.000001';
insert into t1 values(1);
insert into t1 values(2);
insert into t1 values(3);
insert into t1 values(4);
include/sync_slave_sql_with_master.inc
flush binary logs;
delete from t1;
"Case 1: No errors"
include/sync_slave_sql_with_master.inc
select * from t1;
a
1
2
3
4
select * from t1;
a
1
2
3
4
"Case 2: Duplicate key error on worker"
delete from t1;
insert into t1 values(3);
include/sync_slave_sql_with_master.inc
drop table t1;
include/sync_slave_sql_with_master.inc
include/rpl_end.inc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
--enable-binlog-hlc
49 changes: 49 additions & 0 deletions mysql-test/suite/rpl/t/rpl_mysqlbinlog_mta_workers.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
source include/master-slave.inc;
source include/have_binlog_format_row.inc;

call mtr.add_suppression("HA_ERR_FOUND_DUPP_KEY");

connection master;
let $MYSQLD_DATADIR = `select @@global.datadir`;

let $binlog_file = query_get_value(SHOW MASTER STATUS, File, 1);
create table t1 (a int primary key) engine = innodb;

flush binary logs;
eval purge binary logs to '$binlog_file';

insert into t1 values(1);
insert into t1 values(2);
insert into t1 values(3);
insert into t1 values(4);
source include/sync_slave_sql_with_master.inc;

connection master;
let $binlog_file = query_get_value(SHOW MASTER STATUS, File, 1);

flush binary logs;
delete from t1;

echo "Case 1: No errors";
exec $MYSQL_BINLOG --skip-gtids --mta-workers=2 $MYSQLD_DATADIR/$binlog_file | $MYSQL --host=127.0.0.1 -P $MASTER_MYPORT -uroot;
source include/sync_slave_sql_with_master.inc;

connection master;
select * from t1;

connection slave;
select * from t1;

echo "Case 2: Duplicate key error on worker";
connection master;
delete from t1;
insert into t1 values(3); # this will cause dup key error

exec $MYSQL_BINLOG --skip-gtids --mta-workers=2 $MYSQLD_DATADIR/$binlog_file | $MYSQL --host=127.0.0.1 -P $MASTER_MYPORT -uroot || true;
source include/sync_slave_sql_with_master.inc;

connection master;
drop table t1;
source include/sync_slave_sql_with_master.inc;

source include/rpl_end.inc;
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
SET @start_value = @@global.mta_binlog_statement_workers;
SELECT @start_value;
@start_value
0
SET @@GLOBAL.mta_binlog_statement_workers = 4;
SET @@SESSION.mta_binlog_statement_workers = 4;
SET @@SESSION.mta_binlog_statement_workers = 0;
Warnings:
Warning 1231 mta_binlog_statement_workers can only be set from mysqlbinlog
SET @@GLOBAL.mta_binlog_statement_workers = @start_value;
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
source include/load_sysvars.inc;

SET @start_value = @@global.mta_binlog_statement_workers;
SELECT @start_value;

SET @@GLOBAL.mta_binlog_statement_workers = 4;

SET @@SESSION.mta_binlog_statement_workers = 4;

SET @@SESSION.mta_binlog_statement_workers = 0;

SET @@GLOBAL.mta_binlog_statement_workers = @start_value;
5 changes: 4 additions & 1 deletion sql/log_event.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2563,7 +2563,10 @@ void Log_event::print_base64(IO_CACHE *file, PRINT_EVENT_INFO *print_event_info,
}

if (print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS) {
if (my_b_tell(file) == 0) {
// In BASE64_OUTPUT_FULL mode mysqlbinlog.cc will determine when to print
// "BINLOG". See @print_event()
if (print_event_info->base64_output_mode != BASE64_OUTPUT_FULL &&
my_b_tell(file) == 0) {
my_b_printf(file, "\nBINLOG '\n");
print_event_info->inside_binlog = true;
}
Expand Down
Loading

0 comments on commit 7a4c1c4

Please sign in to comment.