From 7a4c1c47a4f9ca92012d251fc014b303fe1aca68 Mon Sep 17 00:00:00 2001 From: Abhinav Sharma Date: Mon, 30 Oct 2023 13:14:24 -0700 Subject: [PATCH] Adding ability to apply binlogs thru mysqlbinlog in multi-threaded mode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Summary: Added a new flag in `mysqlbinlog` `--mta-workers=x` that tells the server to spawn `x` dependency applier workers to apply transactions. When `--mta-workers` is specified all events are printed in their base64 representation so we can create log events out of them. Differential Revision: D49466823 --------------------------------------------------------------------------- Cast enum into target type in a ternary operator (#1411) Summary: This fixes a GCC build error: sql/rpl_replica.cc: In function ‘int slave_start_single_worker(Relay_log_info*, ulong)’: sql/rpl_replica.cc:7207:28: error: enumerated and non-enumerated type in conditional expression [-Werror=extra] 7207 | rli->is_fake() ? INFO_REPOSITORY_DUMMY : opt_rli_repository_id, i, | ~~~~~~~~~~~~~~~^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Pull Request resolved: https://github.com/facebook/mysql-5.6/pull/1411 Differential Revision: D52207263 fbshipit-source-id: 47d7f5c3bbf6ec6eb25247b728e354e60d29552a --- client/mysqlbinlog.cc | 79 ++++++++-- mysql-test/r/mysqld--help-notwin.result | 4 + .../r/binlog_persist_only_variables.result | 2 + .../r/binlog_persist_variables.result | 2 + .../rpl/r/rpl_mysqlbinlog_mta_workers.result | 37 +++++ .../t/rpl_mysqlbinlog_mta_workers-master.opt | 1 + .../rpl/t/rpl_mysqlbinlog_mta_workers.test | 49 ++++++ .../mta_binlog_statement_workers_basic.result | 10 ++ .../t/mta_binlog_statement_workers_basic.test | 12 ++ sql/log_event.cc | 5 +- sql/log_event.h | 4 + sql/mysqld.h | 1 + sql/rpl_mta_submode.cc | 4 +- sql/rpl_replica.cc | 10 +- sql/rpl_replica.h | 3 + sql/rpl_rli.cc | 25 +-- sql/rpl_rli.h | 2 + sql/rpl_rli_pdb.cc | 8 +- sql/sql_binlog.cc | 148 +++++++++++++++--- sql/sys_vars.cc | 42 +++++ sql/system_variables.h | 2 + 21 files changed, 398 insertions(+), 52 deletions(-) create mode 100644 mysql-test/suite/rpl/r/rpl_mysqlbinlog_mta_workers.result create mode 100644 mysql-test/suite/rpl/t/rpl_mysqlbinlog_mta_workers-master.opt create mode 100644 mysql-test/suite/rpl/t/rpl_mysqlbinlog_mta_workers.test create mode 100644 mysql-test/suite/sys_vars/r/mta_binlog_statement_workers_basic.result create mode 100644 mysql-test/suite/sys_vars/t/mta_binlog_statement_workers_basic.test diff --git a/client/mysqlbinlog.cc b/client/mysqlbinlog.cc index ea1bca2bf34c..a1556f531fcd 100644 --- a/client/mysqlbinlog.cc +++ b/client/mysqlbinlog.cc @@ -743,6 +743,7 @@ static char *database = nullptr; static char *output_file = nullptr; static char *rewrite = nullptr; bool force_opt = false, short_form = false, idempotent_mode = false; +int mta_workers = 0; static bool debug_info_flag, debug_check_flag; static bool force_if_open_opt = true, raw_mode = false; static bool to_last_remote_log = false, stop_never = false; @@ -1279,6 +1280,26 @@ static bool shall_skip_gtids(const Log_event *ev, Gtid *cached_gtid) { return filtered; } +static void print_event(Log_event *ev, FILE *result_file, + PRINT_EVENT_INFO *info) { + if (info->base64_output_mode != BASE64_OUTPUT_FULL) { + return ev->print(result_file, info); + } + + auto cache = &info->head_cache; + + if (!info->inside_group) my_b_printf(cache, "BINLOG '\n"); + if (ev->starts_group() || is_gtid_event(ev)) info->inside_group = true; + if (ev->ends_group()) info->inside_group = false; + ev->print_base64(cache, info, info->inside_group); + + if (ev->get_type_code() == binary_log::FORMAT_DESCRIPTION_EVENT) { + info->printed_fd_event = true; + my_b_printf(cache, "/*!50616 SET @@SESSION.GTID_NEXT='AUTOMATIC'*/%s\n", + info->delimiter); + } +} + /** Helper function that prints the cached begin query event to the output @@ -1288,7 +1309,7 @@ static bool shall_skip_gtids(const Log_event *ev, Gtid *cached_gtid) { @retval False ERROR */ static bool print_cached_begin_query(PRINT_EVENT_INFO *print_event_info) { - begin_query_ev_cache->print(result_file, print_event_info); + print_event(begin_query_ev_cache, result_file, print_event_info); auto head = &print_event_info->head_cache; if (head->error == -1) { return false; @@ -1520,7 +1541,7 @@ void handle_last_rows_query_event(bool print, my_off_t temp_log_pos = last_rows_query_event.event_pos; auto old_hexdump_from = print_event_info->hexdump_from; print_event_info->hexdump_from = (opt_hexdump ? temp_log_pos : 0); - last_rows_query_event.event->print(result_file, print_event_info); + print_event(last_rows_query_event.event, result_file, print_event_info); print_event_info->hexdump_from = old_hexdump_from; } last_rows_query_event.event->register_temp_buf(old_temp_buf); @@ -1637,7 +1658,7 @@ static Exit_status process_event(PRINT_EVENT_INFO *print_event_info, switch (ev_type) { case binary_log::TRANSACTION_PAYLOAD_EVENT: - ev->print(result_file, print_event_info); + print_event(ev, result_file, print_event_info); if (head->error == -1) goto err; break; case binary_log::QUERY_EVENT: { @@ -1660,7 +1681,7 @@ static Exit_status process_event(PRINT_EVENT_INFO *print_event_info, my_off_t temp_log_pos = pop_event_array.event_pos; print_event_info->hexdump_from = (opt_hexdump ? temp_log_pos : 0); if (!parent_query_skips) - temp_event->print(result_file, print_event_info); + print_event(temp_event, result_file, print_event_info); delete temp_event; } @@ -1761,7 +1782,7 @@ static Exit_status process_event(PRINT_EVENT_INFO *print_event_info, if (!in_transaction) seen_gtid = false; } - ev->print(result_file, print_event_info); + print_event(ev, result_file, print_event_info); if (head->error == -1) goto err; break; } @@ -1795,7 +1816,7 @@ static Exit_status process_event(PRINT_EVENT_INFO *print_event_info, the subsequent call load_processor.process fails, because the output of Append_block_log_event::print is only a comment. */ - ev->print(result_file, print_event_info); + print_event(ev, result_file, print_event_info); if (opt_print_gtids && encounter_gtid(cached_gtid)) goto err; @@ -1833,7 +1854,7 @@ static Exit_status process_event(PRINT_EVENT_INFO *print_event_info, print_event_info->common_header_len = dynamic_cast(ev)->common_header_len; - ev->print(result_file, print_event_info); + print_event(ev, result_file, print_event_info); if (head->error == -1) goto err; if (!force_if_open_opt && @@ -1848,7 +1869,7 @@ static Exit_status process_event(PRINT_EVENT_INFO *print_event_info, break; } case binary_log::BEGIN_LOAD_QUERY_EVENT: - ev->print(result_file, print_event_info); + print_event(ev, result_file, print_event_info); if (head->error == -1) goto err; if ((retval = load_processor.process( (Begin_load_query_log_event *)ev)) != OK_CONTINUE) @@ -1864,6 +1885,10 @@ static Exit_status process_event(PRINT_EVENT_INFO *print_event_info, if (shall_skip_database(exlq->db)) print_event_info->skipped_event_in_transaction = true; else { + if (print_event_info->base64_output_mode == BASE64_OUTPUT_FULL) { + error("Cannot handle Execute_load_query"); + goto err; + } if (fname) { convert_path_to_forward_slashes(fname); exlq->print(result_file, print_event_info, fname); @@ -2038,7 +2063,7 @@ static Exit_status process_event(PRINT_EVENT_INFO *print_event_info, goto err; } - ev->print(result_file, print_event_info); + print_event(ev, result_file, print_event_info); print_event_info->have_unflushed_events = true; @@ -2069,7 +2094,7 @@ static Exit_status process_event(PRINT_EVENT_INFO *print_event_info, print_event_info->delimiter); print_event_info->skipped_event_in_transaction = false; - ev->print(result_file, print_event_info); + print_event(ev, result_file, print_event_info); if (head->error == -1) goto err; break; } @@ -2091,12 +2116,12 @@ static Exit_status process_event(PRINT_EVENT_INFO *print_event_info, begin_query_ev_cache = nullptr; if (skip) break; } - ev->print(result_file, print_event_info); + print_event(ev, result_file, print_event_info); if (head->error == -1) goto err; break; } case binary_log::METADATA_EVENT: { - ev->print(result_file, print_event_info); + print_event(ev, result_file, print_event_info); if (head->error == -1) goto err; /* Copy and flush head cache and body cache */ @@ -2118,7 +2143,7 @@ static Exit_status process_event(PRINT_EVENT_INFO *print_event_info, "--include-gtids, respectively, instead."); [[fallthrough]]; default: - ev->print(result_file, print_event_info); + print_event(ev, result_file, print_event_info); if (head->error == -1) goto err; } /* Flush head cache to result_file for every event */ @@ -2246,6 +2271,11 @@ static struct my_option my_long_options[] = { "applying Row Events", &idempotent_mode, &idempotent_mode, nullptr, GET_BOOL, NO_ARG, 0, 0, 0, nullptr, 0, nullptr}, + {"mta-workers", 'w', + "Number of multi-threaded workers to spawn on the " + "server to apply binlogs", + &mta_workers, &mta_workers, nullptr, GET_INT, REQUIRED_ARG, 0, 0, 0, + nullptr, 0, nullptr}, {"local-load", 'l', "Prepare local temporary files for LOAD DATA INFILE in the specified " "directory.", @@ -3808,6 +3838,11 @@ static int args_post_process(void) { global_sid_lock->unlock(); } + if (mta_workers && opt_skip_gtids == 0) { + error("--mta-workers requires --skip-gtids option"); + return ERROR_STOP; + } + return OK_CONTINUE; } @@ -4182,6 +4217,17 @@ int main(int argc, char **argv) { fprintf(result_file, "/*!80019 SET @@SESSION.REQUIRE_ROW_FORMAT=1*/;\n\n"); } + auto orig_base64_output_mode = opt_base64_output_mode; + auto orig_short_form = short_form; + if (mta_workers) { + // we need to work in full base64 and short form mode for MTA + opt_base64_output_mode = BASE64_OUTPUT_FULL; + short_form = true; + fprintf(result_file, + "/*!50700 SET @@SESSION.MTA_BINLOG_STATEMENT_WORKERS=%d*/;\n", + mta_workers); + } + if (opt_start_gtid_str != nullptr || opt_find_gtid_str != nullptr) { if (opt_start_gtid_str != nullptr && opt_remote_proto == BINLOG_DUMP_GTID) { char *args = const_cast(""); @@ -4210,7 +4256,12 @@ int main(int argc, char **argv) { if (!raw_mode && opt_find_gtid_str == nullptr) { fprintf(result_file, "# End of log file\n"); - + if (mta_workers) { + opt_base64_output_mode = orig_base64_output_mode; + short_form = orig_short_form; + fprintf(result_file, + "/*!50700 SET @@SESSION.MTA_BINLOG_STATEMENT_WORKERS=0*/;\n"); + } fprintf(result_file, "/*!50003 SET COMPLETION_TYPE=@OLD_COMPLETION_TYPE*/;\n"); if (disable_log_bin) diff --git a/mysql-test/r/mysqld--help-notwin.result b/mysql-test/r/mysqld--help-notwin.result index 72e3db885c55..54e70d2255e4 100644 --- a/mysql-test/r/mysqld--help-notwin.result +++ b/mysql-test/r/mysqld--help-notwin.result @@ -1090,6 +1090,9 @@ The following options may be given as the first argument: minimum_hlc_ns is successfully updated is guranteed to be greater than this value. The maximum allowed drift (forward) is controlled by maximum_hlc_drift_ns + --mta-binlog-statement-workers[=#] + Internal variable to specify the Number of workers to + spawn to apply binlogs thru mysqlbinlog piping --mts-dependency-cond-wait-timeout[=#] Timeout for all conditional waits in dependency repl in milliseconds @@ -3442,6 +3445,7 @@ memlock FALSE min-examined-row-limit 0 min-examined-row-limit-sql-stats 0 minimum-hlc-ns 0 +mta-binlog-statement-workers 0 mts-dependency-cond-wait-timeout 5000 mts-dependency-max-keys 100000 mts-dependency-order-commits DB diff --git a/mysql-test/suite/binlog_nogtid/r/binlog_persist_only_variables.result b/mysql-test/suite/binlog_nogtid/r/binlog_persist_only_variables.result index 03692ed85ab8..98d86a560af6 100644 --- a/mysql-test/suite/binlog_nogtid/r/binlog_persist_only_variables.result +++ b/mysql-test/suite/binlog_nogtid/r/binlog_persist_only_variables.result @@ -159,6 +159,7 @@ SET PERSIST_ONLY max_binlog_cache_size = @@GLOBAL.max_binlog_cache_size; SET PERSIST_ONLY max_binlog_size = @@GLOBAL.max_binlog_size; SET PERSIST_ONLY max_binlog_stmt_cache_size = @@GLOBAL.max_binlog_stmt_cache_size; SET PERSIST_ONLY max_relay_log_size = @@GLOBAL.max_relay_log_size; +SET PERSIST_ONLY mta_binlog_statement_workers = @@GLOBAL.mta_binlog_statement_workers; SET PERSIST_ONLY mts_dependency_replication = @@GLOBAL.mts_dependency_replication; SET PERSIST_ONLY prev_gtid_and_opid = @@GLOBAL.prev_gtid_and_opid; ERROR HY000: Variable 'prev_gtid_and_opid' is a non persistent read only variable @@ -403,6 +404,7 @@ RESET PERSIST max_binlog_cache_size; RESET PERSIST max_binlog_size; RESET PERSIST max_binlog_stmt_cache_size; RESET PERSIST max_relay_log_size; +RESET PERSIST mta_binlog_statement_workers; RESET PERSIST mts_dependency_replication; RESET PERSIST prev_gtid_and_opid; ERROR HY000: Variable prev_gtid_and_opid does not exist in persisted config file diff --git a/mysql-test/suite/binlog_nogtid/r/binlog_persist_variables.result b/mysql-test/suite/binlog_nogtid/r/binlog_persist_variables.result index 62fe8edebef6..1e0bdfe99fa0 100644 --- a/mysql-test/suite/binlog_nogtid/r/binlog_persist_variables.result +++ b/mysql-test/suite/binlog_nogtid/r/binlog_persist_variables.result @@ -134,6 +134,7 @@ SET PERSIST max_binlog_cache_size = @@GLOBAL.max_binlog_cache_size; SET PERSIST max_binlog_size = @@GLOBAL.max_binlog_size; SET PERSIST max_binlog_stmt_cache_size = @@GLOBAL.max_binlog_stmt_cache_size; SET PERSIST max_relay_log_size = @@GLOBAL.max_relay_log_size; +SET PERSIST mta_binlog_statement_workers = @@GLOBAL.mta_binlog_statement_workers; SET PERSIST mts_dependency_replication = @@GLOBAL.mts_dependency_replication; SET PERSIST prev_gtid_and_opid = @@GLOBAL.prev_gtid_and_opid; ERROR HY000: Variable 'prev_gtid_and_opid' is a read only variable @@ -405,6 +406,7 @@ RESET PERSIST IF EXISTS max_binlog_cache_size; RESET PERSIST IF EXISTS max_binlog_size; RESET PERSIST IF EXISTS max_binlog_stmt_cache_size; RESET PERSIST IF EXISTS max_relay_log_size; +RESET PERSIST IF EXISTS mta_binlog_statement_workers; RESET PERSIST IF EXISTS mts_dependency_replication; RESET PERSIST IF EXISTS prev_gtid_and_opid; Warnings: diff --git a/mysql-test/suite/rpl/r/rpl_mysqlbinlog_mta_workers.result b/mysql-test/suite/rpl/r/rpl_mysqlbinlog_mta_workers.result new file mode 100644 index 000000000000..52ff120c9fd2 --- /dev/null +++ b/mysql-test/suite/rpl/r/rpl_mysqlbinlog_mta_workers.result @@ -0,0 +1,37 @@ +include/master-slave.inc +Warnings: +Note #### Sending passwords in plain text without SSL/TLS is extremely insecure. +Note #### Storing MySQL user name or password information in the connection metadata repository is not secure and is therefore not recommended. Please consider using the USER and PASSWORD connection options for START REPLICA; see the 'START REPLICA Syntax' in the MySQL Manual for more information. +[connection master] +call mtr.add_suppression("HA_ERR_FOUND_DUPP_KEY"); +create table t1 (a int primary key) engine = innodb; +flush binary logs; +purge binary logs to 'master-bin.000001'; +insert into t1 values(1); +insert into t1 values(2); +insert into t1 values(3); +insert into t1 values(4); +include/sync_slave_sql_with_master.inc +flush binary logs; +delete from t1; +"Case 1: No errors" +include/sync_slave_sql_with_master.inc +select * from t1; +a +1 +2 +3 +4 +select * from t1; +a +1 +2 +3 +4 +"Case 2: Duplicate key error on worker" +delete from t1; +insert into t1 values(3); +include/sync_slave_sql_with_master.inc +drop table t1; +include/sync_slave_sql_with_master.inc +include/rpl_end.inc diff --git a/mysql-test/suite/rpl/t/rpl_mysqlbinlog_mta_workers-master.opt b/mysql-test/suite/rpl/t/rpl_mysqlbinlog_mta_workers-master.opt new file mode 100644 index 000000000000..fa98ba18994e --- /dev/null +++ b/mysql-test/suite/rpl/t/rpl_mysqlbinlog_mta_workers-master.opt @@ -0,0 +1 @@ +--enable-binlog-hlc diff --git a/mysql-test/suite/rpl/t/rpl_mysqlbinlog_mta_workers.test b/mysql-test/suite/rpl/t/rpl_mysqlbinlog_mta_workers.test new file mode 100644 index 000000000000..28709b0ea881 --- /dev/null +++ b/mysql-test/suite/rpl/t/rpl_mysqlbinlog_mta_workers.test @@ -0,0 +1,49 @@ +source include/master-slave.inc; +source include/have_binlog_format_row.inc; + +call mtr.add_suppression("HA_ERR_FOUND_DUPP_KEY"); + +connection master; +let $MYSQLD_DATADIR = `select @@global.datadir`; + +let $binlog_file = query_get_value(SHOW MASTER STATUS, File, 1); +create table t1 (a int primary key) engine = innodb; + +flush binary logs; +eval purge binary logs to '$binlog_file'; + +insert into t1 values(1); +insert into t1 values(2); +insert into t1 values(3); +insert into t1 values(4); +source include/sync_slave_sql_with_master.inc; + +connection master; +let $binlog_file = query_get_value(SHOW MASTER STATUS, File, 1); + +flush binary logs; +delete from t1; + +echo "Case 1: No errors"; +exec $MYSQL_BINLOG --skip-gtids --mta-workers=2 $MYSQLD_DATADIR/$binlog_file | $MYSQL --host=127.0.0.1 -P $MASTER_MYPORT -uroot; +source include/sync_slave_sql_with_master.inc; + +connection master; +select * from t1; + +connection slave; +select * from t1; + +echo "Case 2: Duplicate key error on worker"; +connection master; +delete from t1; +insert into t1 values(3); # this will cause dup key error + +exec $MYSQL_BINLOG --skip-gtids --mta-workers=2 $MYSQLD_DATADIR/$binlog_file | $MYSQL --host=127.0.0.1 -P $MASTER_MYPORT -uroot || true; +source include/sync_slave_sql_with_master.inc; + +connection master; +drop table t1; +source include/sync_slave_sql_with_master.inc; + +source include/rpl_end.inc; diff --git a/mysql-test/suite/sys_vars/r/mta_binlog_statement_workers_basic.result b/mysql-test/suite/sys_vars/r/mta_binlog_statement_workers_basic.result new file mode 100644 index 000000000000..63bd170c8eed --- /dev/null +++ b/mysql-test/suite/sys_vars/r/mta_binlog_statement_workers_basic.result @@ -0,0 +1,10 @@ +SET @start_value = @@global.mta_binlog_statement_workers; +SELECT @start_value; +@start_value +0 +SET @@GLOBAL.mta_binlog_statement_workers = 4; +SET @@SESSION.mta_binlog_statement_workers = 4; +SET @@SESSION.mta_binlog_statement_workers = 0; +Warnings: +Warning 1231 mta_binlog_statement_workers can only be set from mysqlbinlog +SET @@GLOBAL.mta_binlog_statement_workers = @start_value; diff --git a/mysql-test/suite/sys_vars/t/mta_binlog_statement_workers_basic.test b/mysql-test/suite/sys_vars/t/mta_binlog_statement_workers_basic.test new file mode 100644 index 000000000000..042bfd218378 --- /dev/null +++ b/mysql-test/suite/sys_vars/t/mta_binlog_statement_workers_basic.test @@ -0,0 +1,12 @@ +source include/load_sysvars.inc; + +SET @start_value = @@global.mta_binlog_statement_workers; +SELECT @start_value; + +SET @@GLOBAL.mta_binlog_statement_workers = 4; + +SET @@SESSION.mta_binlog_statement_workers = 4; + +SET @@SESSION.mta_binlog_statement_workers = 0; + +SET @@GLOBAL.mta_binlog_statement_workers = @start_value; diff --git a/sql/log_event.cc b/sql/log_event.cc index 37d075ac25ce..3ae8f89a8524 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -2563,7 +2563,10 @@ void Log_event::print_base64(IO_CACHE *file, PRINT_EVENT_INFO *print_event_info, } if (print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS) { - if (my_b_tell(file) == 0) { + // In BASE64_OUTPUT_FULL mode mysqlbinlog.cc will determine when to print + // "BINLOG". See @print_event() + if (print_event_info->base64_output_mode != BASE64_OUTPUT_FULL && + my_b_tell(file) == 0) { my_b_printf(file, "\nBINLOG '\n"); print_event_info->inside_binlog = true; } diff --git a/sql/log_event.h b/sql/log_event.h index 349003402f91..00ab83bf7e02 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -456,6 +456,7 @@ enum enum_base64_output_mode { BASE64_OUTPUT_AUTO = 1, BASE64_OUTPUT_UNSPEC = 2, BASE64_OUTPUT_DECODE_ROWS = 3, + BASE64_OUTPUT_FULL = 4, /* insert new output modes here */ BASE64_OUTPUT_MODE_COUNT }; @@ -573,6 +574,9 @@ struct PRINT_EVENT_INFO { The version of the last server that sent the transaction */ uint32_t immediate_server_version; + + // Are we inside a binlog group/transaction? Only used in MTA mode + bool inside_group = false; }; #endif diff --git a/sql/mysqld.h b/sql/mysqld.h index 16db180cd54f..e6fe32a7625b 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -394,6 +394,7 @@ extern ulonglong opt_mts_dependency_max_keys; extern ulonglong opt_mts_pending_jobs_size_max; extern ulong opt_mts_dependency_order_commits; extern ulonglong opt_mts_dependency_cond_wait_timeout; +extern ulong mts_parallel_option; extern ulong rpl_stop_replica_timeout; extern ulong opt_slave_commit_order_wait_timeout; extern bool rpl_skip_tx_api; diff --git a/sql/rpl_mta_submode.cc b/sql/rpl_mta_submode.cc index d93bca932383..e2eae1ace76d 100644 --- a/sql/rpl_mta_submode.cc +++ b/sql/rpl_mta_submode.cc @@ -1424,7 +1424,9 @@ void Mts_submode_dependency::handle_terminal_event( assert(opt_debug_sync_timeout > 0); assert(!debug_sync_set_action(rli->info_thd, STRING_WITH_LEN(act))); };); - thd->mem_root->ClearForReuse(); + if (!thd->variables.mta_binlog_statement_workers) { + thd->mem_root->ClearForReuse(); + } } } diff --git a/sql/rpl_replica.cc b/sql/rpl_replica.cc index 33fe7aade514..e3c8b45ad768 100644 --- a/sql/rpl_replica.cc +++ b/sql/rpl_replica.cc @@ -7315,8 +7315,10 @@ static int slave_start_single_worker(Relay_log_info *rli, ulong i) { mysql_mutex_assert_owner(&rli->run_lock); - if (!(w = Rpl_info_factory::create_worker(opt_rli_repository_id, i, rli, - false))) { + if (!(w = Rpl_info_factory::create_worker( + rli->is_fake() ? static_cast(INFO_REPOSITORY_DUMMY) + : opt_rli_repository_id, + i, rli, false))) { LogErr(ERROR_LEVEL, ER_RPL_REPLICA_WORKER_THREAD_CREATION_FAILED, rli->get_for_channel_str()); error = 1; @@ -7377,7 +7379,7 @@ static int slave_start_single_worker(Relay_log_info *rli, ulong i) { @return 0 success non-zero as failure */ -static int slave_start_workers(Relay_log_info *rli, ulong n, bool *mts_inited) { +int slave_start_workers(Relay_log_info *rli, ulong n, bool *mts_inited) { int error = 0; /** gtid_monitoring_info must be cleared when MTS is enabled or @@ -7479,7 +7481,7 @@ static int slave_start_workers(Relay_log_info *rli, ulong n, bool *mts_inited) { worker's running_status. Coordinator finalizes with its MTS running status to reset few objects. */ -static void slave_stop_workers(Relay_log_info *rli, bool *mts_inited) { +void slave_stop_workers(Relay_log_info *rli, bool *mts_inited) { THD *thd = rli->info_thd; if (!*mts_inited) diff --git a/sql/rpl_replica.h b/sql/rpl_replica.h index 6b053ab32148..d7a58ebaf234 100644 --- a/sql/rpl_replica.h +++ b/sql/rpl_replica.h @@ -591,6 +591,9 @@ bool start_slave_thread(PSI_thread_key thread_key, my_start_routine h_func, std::atomic *slave_running, std::atomic *slave_run_id, Master_info *mi); +int slave_start_workers(Relay_log_info *rli, ulong n, bool *mts_inited); +void slave_stop_workers(Relay_log_info *rli, bool *mts_inited); + std::string get_active_master_info(); bool show_slave_status(THD *thd, Master_info *mi); bool show_slave_status(THD *thd); diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index f640bd71e2aa..fd4d12a81329 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -230,13 +230,14 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery, inited_hash_workers = false; commit_timestamps_status = COMMIT_TS_UNKNOWN; + mysql_mutex_init(key_mutex_slave_parallel_pend_jobs, &pending_jobs_lock, + MY_MUTEX_INIT_FAST); + mysql_cond_init(key_cond_slave_parallel_pend_jobs, &pending_jobs_cond); + if (!rli_fake) { mysql_mutex_init(key_relay_log_info_log_space_lock, &log_space_lock, MY_MUTEX_INIT_FAST); mysql_cond_init(key_relay_log_info_log_space_cond, &log_space_cond); - mysql_mutex_init(key_mutex_slave_parallel_pend_jobs, &pending_jobs_lock, - MY_MUTEX_INIT_FAST); - mysql_cond_init(key_cond_slave_parallel_pend_jobs, &pending_jobs_cond); mysql_mutex_init(key_mutex_slave_parallel_worker_count, &exit_count_lock, MY_MUTEX_INIT_FAST); mysql_mutex_init(key_mta_temp_table_LOCK, &mts_temp_table_LOCK, @@ -298,9 +299,17 @@ void Relay_log_info::deinit_workers() { workers.clear(); } Relay_log_info::~Relay_log_info() { DBUG_TRACE; + delete current_mts_submode; + if (workers_copy_pfs.size()) { + for (int i = static_cast(workers_copy_pfs.size()) - 1; i >= 0; i--) + delete workers_copy_pfs[i]; + workers_copy_pfs.clear(); + } + mysql_mutex_destroy(&pending_jobs_lock); + mysql_cond_destroy(&pending_jobs_cond); + if (!rli_fake) { if (recovery_groups_inited) bitmap_free(&recovery_groups); - delete current_mts_submode; if (rpl_filter != nullptr) { /* Remove the channel's replication filter from rpl_channel_filters. */ @@ -308,16 +317,8 @@ Relay_log_info::~Relay_log_info() { rpl_filter = nullptr; } - if (workers_copy_pfs.size()) { - for (int i = static_cast(workers_copy_pfs.size()) - 1; i >= 0; i--) - delete workers_copy_pfs[i]; - workers_copy_pfs.clear(); - } - mysql_mutex_destroy(&log_space_lock); mysql_cond_destroy(&log_space_cond); - mysql_mutex_destroy(&pending_jobs_lock); - mysql_cond_destroy(&pending_jobs_cond); mysql_mutex_destroy(&exit_count_lock); mysql_mutex_destroy(&mts_temp_table_LOCK); mysql_mutex_destroy(&mts_gaq_LOCK); diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index 931fb76fb4eb..366b38301e11 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -672,6 +672,8 @@ class Relay_log_info : public Rpl_info { */ bool is_applier_source_position_info_invalid() const; + bool is_fake() const { return rli_fake; } + /* Let's call a group (of events) : - a transaction diff --git a/sql/rpl_rli_pdb.cc b/sql/rpl_rli_pdb.cc index ef4b77fd8a9f..d1e5ddd35682 100644 --- a/sql/rpl_rli_pdb.cc +++ b/sql/rpl_rli_pdb.cc @@ -270,6 +270,9 @@ Slave_worker::Slave_worker(Relay_log_info *rli, worker_checkpoint_seqno(0), running_status(NOT_RUNNING), exit_incremented(false) { + if (c_rli && c_rli->is_fake()) { + sql_print_information("Creating a fake worker thread"); + } /* In the future, it would be great if we use only one identifier. So when factoring out this code, please, consider this. @@ -285,6 +288,9 @@ Slave_worker::Slave_worker(Relay_log_info *rli, } Slave_worker::~Slave_worker() { + if (c_rli && c_rli->is_fake()) { + sql_print_information("Destroying a fake worker thread"); + } end_info(); if (jobs.inited_queue) { assert(jobs.m_Q.size() == jobs.capacity); @@ -493,7 +499,7 @@ int Slave_worker::flush_info(const bool force) { if (!inited) return 0; - if (c_rli->mi->is_gtid_only_mode()) return 0; + if (c_rli->mi && c_rli->mi->is_gtid_only_mode()) return 0; /* We update the sync_period at this point because only here we diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc index 93d469e32cf9..e90844a0261f 100644 --- a/sql/sql_binlog.cc +++ b/sql/sql_binlog.cc @@ -28,6 +28,7 @@ #include #include "base64.h" // base64_needed_decoded_length +#include "include/scope_guard.h" #include "lex_string.h" #include "libbinlogevents/include/binlog_event.h" #include "m_string.h" @@ -44,18 +45,27 @@ #include "sql/psi_memory_key.h" #include "sql/rpl_info_factory.h" // Rpl_info_factory #include "sql/rpl_info_handler.h" -#include "sql/rpl_rli.h" // Relay_log_info +#include "sql/rpl_replica.h" // slave_start_workers +#include "sql/rpl_rli.h" // Relay_log_info #include "sql/sql_class.h" #include "sql/sql_lex.h" #include "sql/system_variables.h" +enum check_event_status { + SUCCESS, + ERROR, + SKIP, +}; + /** Check if the event type is allowed in a BINLOG statement. - @retval 0 if the event type is ok. - @retval 1 if the event type is not ok. + @retval SUCCESS if the event type is ok. + @retval ERROR if the event type is not ok. + @retval SKIP if the event type should be skipped */ -static int check_event_type(int type, Relay_log_info *rli) { +static check_event_status check_event_type(int type, THD *thd) { + Relay_log_info *rli = thd->rli_fake; Format_description_log_event *fd_event = rli->get_rli_description_event(); switch (type) { @@ -68,12 +78,12 @@ static int check_event_type(int type, Relay_log_info *rli) { fd_event = new Format_description_log_event(); if (rli->set_rli_description_event(fd_event)) { delete fd_event; - return 1; + return ERROR; } } /* It is always allowed to execute FD events. */ - return 0; + return SUCCESS; case binary_log::ROWS_QUERY_LOG_EVENT: case binary_log::TABLE_MAP_EVENT: @@ -90,14 +100,22 @@ static int check_event_type(int type, Relay_log_info *rli) { already been seen. */ if (fd_event) - return 0; + return SUCCESS; else { my_error(ER_NO_FORMAT_DESCRIPTION_EVENT_BEFORE_BINLOG_STATEMENT, MYF(0), Log_event::get_type_str((Log_event_type)type)); - return 1; + return ERROR; } break; + case binary_log::ROTATE_EVENT: + case binary_log::STOP_EVENT: + // We need to skip applying these events. Read comment below + if (thd->variables.mta_binlog_statement_workers) { + return SKIP; + } + [[fallthrough]]; + default: /* It is not meaningful to execute other events than row-events and @@ -105,10 +123,16 @@ static int check_event_type(int type, Relay_log_info *rli) { and Rotate_log_event since they call flush_relay_log_info, which is not allowed to call by other threads than the slave SQL thread when the slave SQL thread is running. + + When mta_binlog_statement_workers is enabled all events are in base64 so + we skip selected events above and return success on all others */ + if (thd->variables.mta_binlog_statement_workers) { + return SUCCESS; + } my_error(ER_ONLY_FD_AND_RBR_EVENTS_ALLOWED_IN_BINLOG_STATEMENT, MYF(0), Log_event::get_type_str((Log_event_type)type)); - return 1; + return ERROR; } } @@ -161,7 +185,10 @@ void mysql_client_binlog_statement(THD *thd) { Allocation */ int err = 0; + const char *error = nullptr; Relay_log_info *rli = thd->rli_fake; + char *buf = nullptr; + Log_event *ev = nullptr; if (!rli) { /* We create a Relay_log_info object with a INFO_REPOSITORY_DUMMY because @@ -183,13 +210,55 @@ void mysql_client_binlog_statement(THD *thd) { } else rli->set_rbr_column_type_mismatch_whitelist( std::unordered_set()); + + if (thd->variables.mta_binlog_statement_workers) { + bool mts_inited = false; + + mysql_mutex_lock(&rli->run_lock); + auto save_mts_parallel_option = mts_parallel_option; + auto save_opt_mts_dependency_replication = opt_mts_dependency_replication; + auto save_opt_mts_replica_parallel_workers = + opt_mts_replica_parallel_workers; + + auto grd = create_scope_guard([&]() { + mts_parallel_option = save_mts_parallel_option; + opt_mts_dependency_replication = save_opt_mts_dependency_replication; + opt_mts_replica_parallel_workers = + save_opt_mts_replica_parallel_workers; + + mysql_mutex_unlock(&rli->run_lock); + }); + + mts_parallel_option = MTS_PARALLEL_TYPE_DEPENDENCY; + opt_mts_dependency_replication = DEP_RPL_STATEMENT; + opt_mts_replica_parallel_workers = rli->opt_replica_parallel_workers = + thd->variables.mta_binlog_statement_workers; + + rli->channel_mts_submode = (enum_mts_parallel_type)mts_parallel_option; + rli->current_mts_submode = new Mts_submode_dependency(); + rli->mts_dependency_size = opt_mts_dependency_size; + + if ((err = slave_start_workers(rli, opt_mts_replica_parallel_workers, + &mts_inited)) || + !mts_inited) { + my_error(ER_REPLICA_THREAD, MYF(0)); + goto end; + } + rli->slave_running = 1; + rli->reported_unsafe_warning = false; + rli->sql_thread_kill_accepted = false; + } } - const char *error = nullptr; - char *buf = (char *)my_malloc(key_memory_binlog_statement_buffer, decoded_len, - MYF(MY_WME)); - Log_event *ev = nullptr; + if (thd->variables.mta_binlog_statement_workers && + sql_slave_killed(thd, rli)) { + err = 1; + my_error(ER_REPLICA_THREAD, MYF(0)); + goto end; + } + buf = (char *)my_malloc(key_memory_binlog_statement_buffer, decoded_len, + MYF(MY_WME)); /* Out of memory check */ @@ -255,7 +324,18 @@ void mysql_client_binlog_statement(THD *thd) { DBUG_PRINT("info", ("event_len=%lu, bytes_decoded=%" PRId64, event_len, bytes_decoded)); - if (check_event_type(bufptr[EVENT_TYPE_OFFSET], rli)) goto end; + switch (check_event_type(bufptr[EVENT_TYPE_OFFSET], thd)) { + case check_event_status::ERROR: + goto end; + case check_event_status::SKIP: + bytes_decoded -= event_len; + bufptr += event_len; + continue; + case check_event_status::SUCCESS: + break; + default: + break; + } Binlog_read_error binlog_read_error = binlog_event_deserialize( reinterpret_cast(bufptr), event_len, @@ -276,6 +356,15 @@ void mysql_client_binlog_statement(THD *thd) { DBUG_PRINT("info", ("ev->common_header()=%d", ev->get_type_code())); ev->thd = thd; + + if (thd->variables.mta_binlog_statement_workers) { + const bool force = rli->rli_checkpoint_seqno >= rli->checkpoint_group; + if ((force || rli->is_time_for_mta_checkpoint()) && + mta_checkpoint_routine(rli, force)) { + err = 1; + } + } + /* We go directly to the application phase, since we don't need to check if the event shall be skipped or not. @@ -284,7 +373,20 @@ void mysql_client_binlog_statement(THD *thd) { not used at all: the rli_fake instance is used only for error reporting. */ - err = ev->apply_event(rli); + err = err || ev->apply_event(rli); + + if (!err && thd->variables.mta_binlog_statement_workers && + (ev->worker != rli) && is_mts_parallel_type_dependency(rli)) { + // we'll pass owership to workers, so we reset the pointer + auto grd = create_scope_guard([&]() { ev = nullptr; }); + assert(ev->worker == nullptr); + if (ev->m_mts_dep_allowed && + !static_cast(rli->current_mts_submode) + ->schedule_dep(rli, ev)) { + err = 1; + } + } + /* Format_description_log_event should not be deleted because it will be used to read info about the relay log's format; it @@ -293,7 +395,7 @@ void mysql_client_binlog_statement(THD *thd) { ROWS_QUERY_LOG_EVENT if present in rli is deleted at the end of the event but ones with trx meta data are deleted here. */ - if (ev->get_type_code() != binary_log::FORMAT_DESCRIPTION_EVENT && + if (ev && ev->get_type_code() != binary_log::FORMAT_DESCRIPTION_EVENT && ev->get_type_code() != binary_log::ROWS_QUERY_LOG_EVENT) { delete ev; ev = nullptr; @@ -314,9 +416,17 @@ void mysql_client_binlog_statement(THD *thd) { end: if (rli) { - if ((error || err) && rli->rows_query_ev) { - delete rli->rows_query_ev; - rli->rows_query_ev = nullptr; + if (error || err) { + if (rli->rows_query_ev) { + delete rli->rows_query_ev; + rli->rows_query_ev = nullptr; + } + if (thd->variables.mta_binlog_statement_workers) { + bool mts_inited = true; + slave_stop_workers(rli, &mts_inited); + delete rli->current_mts_submode; + rli->current_mts_submode = nullptr; + } } rli->slave_close_thread_tables(thd); } diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index 33ace3c64b10..235ff9925f1a 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -10235,3 +10235,45 @@ static Sys_var_bool Sys_use_mdl_mutex( "Use thread pool friendly MDL lock instead of regular mutex where " "instrumented. If disabled, the regular mutex is used.", READ_ONLY GLOBAL_VAR(use_mdl_mutex), CMD_LINE(OPT_ARG), DEFAULT(true)); + +static bool check_mta_binlog_statement_workers(sys_var * /*self*/, THD *thd, + set_var *var) { + uint prev_val = thd->variables.mta_binlog_statement_workers; + uint val = (uint)var->save_result.ulonglong_value; + + // case: we need to cleanup workers + if (prev_val > 0 && val == 0) { + assert(thd->variables.mta_binlog_statement_workers); + if (!thd->rli_fake) { + push_warning( + thd, Sql_condition::SL_WARNING, ER_WRONG_VALUE_FOR_VAR, + "mta_binlog_statement_workers can only be set from mysqlbinlog"); + return false; + } + bool mts_inited = true; + auto save_mts_parallel_option = mts_parallel_option; + auto save_opt_mts_dependency_replication = opt_mts_dependency_replication; + mts_parallel_option = MTS_PARALLEL_TYPE_DEPENDENCY; + opt_mts_dependency_replication = DEP_RPL_STATEMENT; + auto submode = static_cast( + thd->rli_fake->current_mts_submode); + if (!submode->wait_for_dep_workers_to_finish(thd->rli_fake, false)) { + push_warning(thd, Sql_condition::SL_WARNING, ER_WRONG_VALUE_FOR_VAR, + "Could not wait for all transactions to complete"); + } + slave_stop_workers(thd->rli_fake, &mts_inited); + mts_parallel_option = save_mts_parallel_option; + opt_mts_dependency_replication = save_opt_mts_dependency_replication; + } + + return false; +} + +static Sys_var_uint Sys_mta_binlog_statement_workers( + "mta_binlog_statement_workers", + "Internal variable to specify the Number of workers to spawn to apply " + "binlogs thru mysqlbinlog piping", + SESSION_VAR(mta_binlog_statement_workers), CMD_LINE(OPT_ARG), + VALID_RANGE(0, MTS_MAX_WORKERS), DEFAULT(0), BLOCK_SIZE(1), NO_MUTEX_GUARD, + NOT_IN_BINLOG, ON_CHECK(check_mta_binlog_statement_workers), + ON_UPDATE(nullptr)); diff --git a/sql/system_variables.h b/sql/system_variables.h index 436a28231890..3c21bb2b328c 100644 --- a/sql/system_variables.h +++ b/sql/system_variables.h @@ -639,6 +639,8 @@ struct System_variables { @sa Sys_explain_format */ Explain_format_type explain_format; + + uint mta_binlog_statement_workers; }; /**