From 4656646739c5aa680ad1df7bae857b81f8248a8c Mon Sep 17 00:00:00 2001 From: harshpatel4_crest Date: Tue, 21 Dec 2021 14:42:01 +0530 Subject: [PATCH 1/9] added request timeout --- tap_mysql/__init__.py | 58 ++++++++-- tap_mysql/connection.py | 17 ++- tap_mysql/sync_strategies/binlog.py | 12 +- tap_mysql/sync_strategies/common.py | 5 +- tap_mysql/sync_strategies/full_table.py | 21 ++-- tap_mysql/sync_strategies/incremental.py | 3 +- tests/nosetests/test_timeout.py | 133 +++++++++++++++++++++++ 7 files changed, 217 insertions(+), 32 deletions(-) create mode 100644 tests/nosetests/test_timeout.py diff --git a/tap_mysql/__init__.py b/tap_mysql/__init__.py index 93b5f53..2d103bf 100644 --- a/tap_mysql/__init__.py +++ b/tap_mysql/__init__.py @@ -3,12 +3,13 @@ import datetime import collections +import functools import itertools from itertools import dropwhile import copy import os +import backoff import pendulum - import pymysql import singer @@ -74,6 +75,41 @@ DATETIME_TYPES = set(['datetime', 'timestamp', 'date', 'time']) +# boolean function to check if the error is 'timeout' error or not +def is_timeout_error(): + """ + This function checks whether the URLError contains 'timed out' substring and return boolean + values accordingly, to decide whether to backoff or not. + """ + def gen_fn(exc): + if str(exc).__contains__('timed out'): + # retry if the error string contains 'timed out' + return False + return True + + return gen_fn + +def reconnect(details): + # get connection and reconnect + connection = details.get("args")[3] + connection.ping(reconnect=True) + +def backoff_timeout_error(fnc): + @backoff.on_exception(backoff.expo, + (pymysql.err.OperationalError), + giveup=is_timeout_error(), + on_backoff=reconnect, + max_tries=5, + factor=2) + @functools.wraps(fnc) + def wrapper(*args, **kwargs): + return fnc(*args, **kwargs) + return wrapper + +@backoff_timeout_error +def execute_query(cursor, query, params, conn): + cursor.execute(query, params) + def schema_for_column(c): '''Returns the Schema object for the given Column.''' data_type = c.data_type.lower() @@ -164,14 +200,14 @@ def discover_catalog(mysql_conn, config): with connect_with_backoff(mysql_conn) as open_conn: with open_conn.cursor() as cur: - cur.execute(""" + execute_query(cur, """ SELECT table_schema, table_name, table_type, table_rows FROM information_schema.tables {} - """.format(table_schema_clause)) + """.format(table_schema_clause), None, mysql_conn) table_info = {} @@ -184,7 +220,7 @@ def discover_catalog(mysql_conn, config): 'is_view': table_type == 'VIEW' } - cur.execute(""" + execute_query(cur, """ SELECT table_schema, table_name, column_name, @@ -197,7 +233,7 @@ def discover_catalog(mysql_conn, config): FROM information_schema.columns {} ORDER BY table_schema, table_name - """.format(table_schema_clause)) + """.format(table_schema_clause), None, mysql_conn) columns = [] rec = cur.fetchone() @@ -319,12 +355,12 @@ def log_engine(mysql_conn, catalog_entry): else: with connect_with_backoff(mysql_conn) as open_conn: with open_conn.cursor() as cur: - cur.execute(""" + execute_query(cur, """ SELECT engine FROM information_schema.tables WHERE table_schema = %s AND table_name = %s - """, (database_name, catalog_entry.table)) + """, (database_name, catalog_entry.table), mysql_conn) row = cur.fetchone() @@ -683,12 +719,12 @@ def log_server_params(mysql_conn): with connect_with_backoff(mysql_conn) as open_conn: try: with open_conn.cursor() as cur: - cur.execute(''' + execute_query(cur, ''' SELECT VERSION() as version, @@session.wait_timeout as wait_timeout, @@session.innodb_lock_wait_timeout as innodb_lock_wait_timeout, @@session.max_allowed_packet as max_allowed_packet, - @@session.interactive_timeout as interactive_timeout''') + @@session.interactive_timeout as interactive_timeout''', None, mysql_conn) row = cur.fetchone() LOGGER.info('Server Parameters: ' + 'version: %s, ' + @@ -698,8 +734,8 @@ def log_server_params(mysql_conn): 'interactive_timeout: %s', *row) with open_conn.cursor() as cur: - cur.execute(''' - show session status where Variable_name IN ('Ssl_version', 'Ssl_cipher')''') + execute_query(cur, ''' + show session status where Variable_name IN ('Ssl_version', 'Ssl_cipher')''', None, mysql_conn) rows = cur.fetchall() mapped_row = dict(rows) LOGGER.info('Server SSL Parameters (blank means SSL is not active): ' + diff --git a/tap_mysql/connection.py b/tap_mysql/connection.py index 2c890b1..ca4cb61 100644 --- a/tap_mysql/connection.py +++ b/tap_mysql/connection.py @@ -16,6 +16,19 @@ # We need to hold onto this for self-signed SSL match_hostname = ssl.match_hostname +def get_request_timeout(): + args = singer.utils.parse_args([]) + # get the value of request timeout from config + config_request_timeout = args.config.get("request_timeout") + + # only return the timeout value if it is passed in the config and the value is not 0, "0" or "" + if config_request_timeout and float(config_request_timeout): + # return the timeout from config + return float(config_request_timeout) + + # return default timeout + return READ_TIMEOUT_SECONDS + @backoff.on_exception(backoff.expo, (pymysql.err.OperationalError), max_tries=5, @@ -36,7 +49,7 @@ def connect_with_backoff(connection): warnings.append('Could not set session.wait_timeout. Error: ({}) {}'.format(*e.args)) try: - cur.execute("SET @@session.net_read_timeout={}".format(READ_TIMEOUT_SECONDS)) + cur.execute("SET @@session.net_read_timeout={}".format(get_request_timeout())) except pymysql.err.InternalError as e: warnings.append('Could not set session.net_read_timeout. Error: ({}) {}'.format(*e.args)) @@ -91,7 +104,7 @@ def __init__(self, config): "port": int(config["port"]), "cursorclass": config.get("cursorclass") or pymysql.cursors.SSCursor, "connect_timeout": CONNECT_TIMEOUT_SECONDS, - "read_timeout": READ_TIMEOUT_SECONDS, + "read_timeout": get_request_timeout(), "charset": "utf8", } diff --git a/tap_mysql/sync_strategies/binlog.py b/tap_mysql/sync_strategies/binlog.py index 06c23c4..4111cdc 100644 --- a/tap_mysql/sync_strategies/binlog.py +++ b/tap_mysql/sync_strategies/binlog.py @@ -62,7 +62,7 @@ def add_automatic_properties(catalog_entry, columns): def verify_binlog_config(mysql_conn): with connect_with_backoff(mysql_conn) as open_conn: with open_conn.cursor() as cur: - cur.execute("SELECT @@binlog_format") + common.execute_query(cur, "SELECT @@binlog_format", None, mysql_conn) binlog_format = cur.fetchone()[0] if binlog_format != 'ROW': @@ -70,7 +70,7 @@ def verify_binlog_config(mysql_conn): .format(binlog_format)) try: - cur.execute("SELECT @@binlog_row_image") + common.execute_query(cur, "SELECT @@binlog_row_image", None, mysql_conn) binlog_row_image = cur.fetchone()[0] except pymysql.err.InternalError as ex: if ex.args[0] == 1193: @@ -86,7 +86,7 @@ def verify_binlog_config(mysql_conn): def verify_log_file_exists(mysql_conn, log_file, log_pos): with connect_with_backoff(mysql_conn) as open_conn: with open_conn.cursor() as cur: - cur.execute("SHOW BINARY LOGS") + common.execute_query(cur, "SHOW BINARY LOGS", None, mysql_conn) result = cur.fetchall() existing_log_file = list(filter(lambda log: log[0] == log_file, result)) @@ -105,7 +105,7 @@ def verify_log_file_exists(mysql_conn, log_file, log_pos): def fetch_current_log_file_and_pos(mysql_conn): with connect_with_backoff(mysql_conn) as open_conn: with open_conn.cursor() as cur: - cur.execute("SHOW MASTER STATUS") + common.execute_query(cur, "SHOW MASTER STATUS", None, mysql_conn) result = cur.fetchone() @@ -120,7 +120,7 @@ def fetch_current_log_file_and_pos(mysql_conn): def fetch_server_id(mysql_conn): with connect_with_backoff(mysql_conn) as open_conn: with open_conn.cursor() as cur: - cur.execute("SELECT @@server_id") + common.execute_query(cur, "SELECT @@server_id", None, mysql_conn) server_id = cur.fetchone()[0] return server_id @@ -199,7 +199,7 @@ def calculate_bookmark(mysql_conn, binlog_streams_map, state): with connect_with_backoff(mysql_conn) as open_conn: with open_conn.cursor() as cur: - cur.execute("SHOW BINARY LOGS") + common.execute_query(cur, "SHOW BINARY LOGS", None, mysql_conn) binary_logs = cur.fetchall() diff --git a/tap_mysql/sync_strategies/common.py b/tap_mysql/sync_strategies/common.py index 3f81a7e..2fcc60f 100644 --- a/tap_mysql/sync_strategies/common.py +++ b/tap_mysql/sync_strategies/common.py @@ -3,6 +3,7 @@ import copy import datetime +from tap_mysql.__init__ import execute_query as execute_query import singer import time import tzlocal @@ -187,7 +188,7 @@ def whitelist_bookmark_keys(bookmark_key_set, tap_stream_id, state): singer.clear_bookmark(state, tap_stream_id, bk) -def sync_query(cursor, catalog_entry, state, select_sql, columns, stream_version, params): +def sync_query(cursor, catalog_entry, state, select_sql, columns, stream_version, params, mysql_conn): replication_key = singer.get_bookmark(state, catalog_entry.tap_stream_id, 'replication_key') @@ -197,7 +198,7 @@ def sync_query(cursor, catalog_entry, state, select_sql, columns, stream_version time_extracted = utils.now() LOGGER.info('Running %s', query_string) - cursor.execute(select_sql, params) + execute_query(cursor, select_sql, params, mysql_conn) row = cursor.fetchone() rows_saved = 0 diff --git a/tap_mysql/sync_strategies/full_table.py b/tap_mysql/sync_strategies/full_table.py index c5f2bff..5d31600 100644 --- a/tap_mysql/sync_strategies/full_table.py +++ b/tap_mysql/sync_strategies/full_table.py @@ -64,9 +64,9 @@ def sync_is_resumable(mysql_conn, catalog_entry): with connect_with_backoff(mysql_conn) as open_conn: with open_conn.cursor() as cur: for pk in key_properties: - cur.execute(sql.format(database_name, - catalog_entry.table, - pk)) + common.execute_query(cur, sql.format(database_name, + catalog_entry.table, + pk), None, mysql_conn) result = cur.fetchone() @@ -82,7 +82,7 @@ def sync_is_resumable(mysql_conn, catalog_entry): return True -def get_max_pk_values(cursor, catalog_entry): +def get_max_pk_values(cursor, catalog_entry, mysql_conn): database_name = common.get_database_name(catalog_entry) escaped_db = common.escape(database_name) escaped_table = common.escape(catalog_entry.table) @@ -96,9 +96,9 @@ def get_max_pk_values(cursor, catalog_entry): select_column_clause = ", ".join(["max(" + pk + ")" for pk in escaped_columns]) - cursor.execute(sql.format(select_column_clause, + common.execute_query(cursor, sql.format(select_column_clause, escaped_db, - escaped_table)) + escaped_table), None, mysql_conn) result = cursor.fetchone() processed_results = [] for bm in result: @@ -200,10 +200,10 @@ def generate_pk_clause(catalog_entry, state): return sql -def update_incremental_full_table_state(catalog_entry, state, cursor): +def update_incremental_full_table_state(catalog_entry, state, cursor, mysql_conn): max_pk_values = singer.get_bookmark(state, catalog_entry.tap_stream_id, - 'max_pk_values') or get_max_pk_values(cursor, catalog_entry) + 'max_pk_values') or get_max_pk_values(cursor, catalog_entry, mysql_conn) if not max_pk_values: @@ -251,7 +251,7 @@ def sync_table(mysql_conn, catalog_entry, state, columns, stream_version): if perform_resumable_sync: LOGGER.info("Full table sync is resumable based on primary key definition, will replicate incrementally") - state = update_incremental_full_table_state(catalog_entry, state, cur) + state = update_incremental_full_table_state(catalog_entry, state, cur, mysql_conn) pk_clause = generate_pk_clause(catalog_entry, state) select_sql += pk_clause @@ -263,7 +263,8 @@ def sync_table(mysql_conn, catalog_entry, state, columns, stream_version): select_sql, columns, stream_version, - params) + params, + mysql_conn) # clear max pk value and last pk fetched upon successful sync singer.clear_bookmark(state, catalog_entry.tap_stream_id, 'max_pk_values') diff --git a/tap_mysql/sync_strategies/incremental.py b/tap_mysql/sync_strategies/incremental.py index b365d42..e953b21 100644 --- a/tap_mysql/sync_strategies/incremental.py +++ b/tap_mysql/sync_strategies/incremental.py @@ -78,6 +78,7 @@ def sync_table(mysql_conn, catalog_entry, state, columns, limit=None): select_sql, columns, stream_version, - params) + params, + mysql_conn) if limit is None or num_rows < limit: iterate_limit = False diff --git a/tests/nosetests/test_timeout.py b/tests/nosetests/test_timeout.py new file mode 100644 index 0000000..f556949 --- /dev/null +++ b/tests/nosetests/test_timeout.py @@ -0,0 +1,133 @@ +import unittest +import pymysql +import tap_mysql +import tap_mysql.connection as connection +from unittest import mock + +class MockParseArgs: + config = {} + def __init__(self, config): + self.config = config + +def get_args(config): + return MockParseArgs(config) + +class MockedConnection: + def __enter__(self): + return self + + def __exit__(self, *args, **kwargs): + pass + + def ping(*args, **kwargs): + pass + +@mock.patch("singer.utils.parse_args") +class TestTimeoutValue(unittest.TestCase): + + def test_timeout_value_in_config(self, mocked_parse_args): + + mock_config = {"request_timeout": 100} + # mock parse args + mocked_parse_args.return_value = get_args(mock_config) + + # get the timeout value for assertion + timeout = connection.get_request_timeout() + + # verify that we got expected timeout value + self.assertEquals(100.0, timeout) + + def test_timeout_value_not_in_config(self, mocked_parse_args): + + mock_config = {} + # mock parse args + mocked_parse_args.return_value = get_args(mock_config) + + # get the timeout value for assertion + timeout = connection.get_request_timeout() + + # verify that we got expected timeout value + self.assertEquals(3600, timeout) + + def test_timeout_string_value_in_config(self, mocked_parse_args): + + mock_config = {"request_timeout": "100"} + # mock parse args + mocked_parse_args.return_value = get_args(mock_config) + + # get the timeout value for assertion + timeout = connection.get_request_timeout() + + # verify that we got expected timeout value + self.assertEquals(100.0, timeout) + + def test_timeout_empty_value_in_config(self, mocked_parse_args): + + mock_config = {"request_timeout": ""} + # mock parse args + mocked_parse_args.return_value = get_args(mock_config) + + # get the timeout value for assertion + timeout = connection.get_request_timeout() + + # verify that we got expected timeout value + self.assertEquals(3600, timeout) + + def test_timeout_0_value_in_config(self, mocked_parse_args): + + mock_config = {"request_timeout": 0.0} + # mock parse args + mocked_parse_args.return_value = get_args(mock_config) + + # get the timeout value for assertion + timeout = connection.get_request_timeout() + + # verify that we got expected timeout value + self.assertEquals(3600, timeout) + + def test_timeout_string_0_value_in_config(self, mocked_parse_args): + + mock_config = {"request_timeout": "0.0"} + # mock parse args + mocked_parse_args.return_value = get_args(mock_config) + + # get the timeout value for assertion + timeout = connection.get_request_timeout() + + # verify that we got expected timeout value + self.assertEquals(3600, timeout) + +@mock.patch("time.sleep") +class TestTimeoutBackoff(unittest.TestCase): + + def test_timeout_backoff(self, mocked_sleep): + + # create mock class of cursor + cursor = mock.MagicMock() + # raise timeout error for "cursor.execute" + cursor.execute.side_effect = pymysql.err.OperationalError(2013, 'Lost connection to MySQL server during query (timed out)') + + try: + # function call + tap_mysql.execute_query(cursor, "SELECT * from Test", None, MockedConnection) + except pymysql.err.OperationalError: + pass + + # verify that we backoff for 5 times + self.assertEquals(cursor.execute.call_count, 5) + + def test_timeout_error_not_occurred(self, mocked_sleep): + + # create mock class of cursor + cursor = mock.MagicMock() + # raise any error other than timeout error for "cursor.execute" + cursor.execute.side_effect = pymysql.err.OperationalError(2003, 'Can\'t connect to MySQL server on \'localhost\' (111)') + + try: + # function call + tap_mysql.execute_query(cursor, "SELECT * from Test", None, MockedConnection) + except pymysql.err.OperationalError: + pass + + # verify that we did not backoff as timeout error has not occurred + self.assertEquals(cursor.execute.call_count, 1) From fda173d7c1db22421bf7ec38bcc9df6d7e4c6239 Mon Sep 17 00:00:00 2001 From: harshpatel4_crest Date: Tue, 21 Dec 2021 18:02:07 +0530 Subject: [PATCH 2/9] resolve pylint error --- tap_mysql/__init__.py | 47 +++-------------------------- tap_mysql/sync_strategies/common.py | 38 ++++++++++++++++++++++- tests/nosetests/test_timeout.py | 6 ++-- 3 files changed, 45 insertions(+), 46 deletions(-) diff --git a/tap_mysql/__init__.py b/tap_mysql/__init__.py index 2d103bf..1cecee4 100644 --- a/tap_mysql/__init__.py +++ b/tap_mysql/__init__.py @@ -3,12 +3,10 @@ import datetime import collections -import functools import itertools from itertools import dropwhile import copy import os -import backoff import pendulum import pymysql @@ -75,41 +73,6 @@ DATETIME_TYPES = set(['datetime', 'timestamp', 'date', 'time']) -# boolean function to check if the error is 'timeout' error or not -def is_timeout_error(): - """ - This function checks whether the URLError contains 'timed out' substring and return boolean - values accordingly, to decide whether to backoff or not. - """ - def gen_fn(exc): - if str(exc).__contains__('timed out'): - # retry if the error string contains 'timed out' - return False - return True - - return gen_fn - -def reconnect(details): - # get connection and reconnect - connection = details.get("args")[3] - connection.ping(reconnect=True) - -def backoff_timeout_error(fnc): - @backoff.on_exception(backoff.expo, - (pymysql.err.OperationalError), - giveup=is_timeout_error(), - on_backoff=reconnect, - max_tries=5, - factor=2) - @functools.wraps(fnc) - def wrapper(*args, **kwargs): - return fnc(*args, **kwargs) - return wrapper - -@backoff_timeout_error -def execute_query(cursor, query, params, conn): - cursor.execute(query, params) - def schema_for_column(c): '''Returns the Schema object for the given Column.''' data_type = c.data_type.lower() @@ -200,7 +163,7 @@ def discover_catalog(mysql_conn, config): with connect_with_backoff(mysql_conn) as open_conn: with open_conn.cursor() as cur: - execute_query(cur, """ + common.execute_query(cur, """ SELECT table_schema, table_name, table_type, @@ -220,7 +183,7 @@ def discover_catalog(mysql_conn, config): 'is_view': table_type == 'VIEW' } - execute_query(cur, """ + common.execute_query(cur, """ SELECT table_schema, table_name, column_name, @@ -355,7 +318,7 @@ def log_engine(mysql_conn, catalog_entry): else: with connect_with_backoff(mysql_conn) as open_conn: with open_conn.cursor() as cur: - execute_query(cur, """ + common.execute_query(cur, """ SELECT engine FROM information_schema.tables WHERE table_schema = %s @@ -719,7 +682,7 @@ def log_server_params(mysql_conn): with connect_with_backoff(mysql_conn) as open_conn: try: with open_conn.cursor() as cur: - execute_query(cur, ''' + common.execute_query(cur, ''' SELECT VERSION() as version, @@session.wait_timeout as wait_timeout, @@session.innodb_lock_wait_timeout as innodb_lock_wait_timeout, @@ -734,7 +697,7 @@ def log_server_params(mysql_conn): 'interactive_timeout: %s', *row) with open_conn.cursor() as cur: - execute_query(cur, ''' + common.execute_query(cur, ''' show session status where Variable_name IN ('Ssl_version', 'Ssl_cipher')''', None, mysql_conn) rows = cur.fetchall() mapped_row = dict(rows) diff --git a/tap_mysql/sync_strategies/common.py b/tap_mysql/sync_strategies/common.py index 2fcc60f..30319c6 100644 --- a/tap_mysql/sync_strategies/common.py +++ b/tap_mysql/sync_strategies/common.py @@ -3,7 +3,8 @@ import copy import datetime -from tap_mysql.__init__ import execute_query as execute_query +import functools +import backoff import singer import time import tzlocal @@ -48,6 +49,41 @@ def monkey_patch_date(date_str): #-------------------------------------------------------------------------------------------- #-------------------------------------------------------------------------------------------- +# boolean function to check if the error is 'timeout' error or not +def is_timeout_error(): + """ + This function checks whether the URLError contains 'timed out' substring and return boolean + values accordingly, to decide whether to backoff or not. + """ + def gen_fn(exc): + if str(exc).__contains__('timed out'): + # retry if the error string contains 'timed out' + return False + return True + + return gen_fn + +def reconnect(details): + # get connection and reconnect + connection = details.get("args")[3] + connection.ping(reconnect=True) + +def backoff_timeout_error(fnc): + @backoff.on_exception(backoff.expo, + (pymysql.err.OperationalError), + giveup=is_timeout_error(), + on_backoff=reconnect, + max_tries=5, + factor=2) + @functools.wraps(fnc) + def wrapper(*args, **kwargs): + return fnc(*args, **kwargs) + return wrapper + +@backoff_timeout_error +def execute_query(cursor, query, params, conn): + cursor.execute(query, params) + def escape(string): if '`' in string: raise Exception("Can't escape identifier {} because it contains a backtick" diff --git a/tests/nosetests/test_timeout.py b/tests/nosetests/test_timeout.py index f556949..1237154 100644 --- a/tests/nosetests/test_timeout.py +++ b/tests/nosetests/test_timeout.py @@ -1,6 +1,6 @@ import unittest import pymysql -import tap_mysql +from tap_mysql.sync_strategies.common import execute_query import tap_mysql.connection as connection from unittest import mock @@ -109,7 +109,7 @@ def test_timeout_backoff(self, mocked_sleep): try: # function call - tap_mysql.execute_query(cursor, "SELECT * from Test", None, MockedConnection) + execute_query(cursor, "SELECT * from Test", None, MockedConnection) except pymysql.err.OperationalError: pass @@ -125,7 +125,7 @@ def test_timeout_error_not_occurred(self, mocked_sleep): try: # function call - tap_mysql.execute_query(cursor, "SELECT * from Test", None, MockedConnection) + execute_query(cursor, "SELECT * from Test", None, MockedConnection) except pymysql.err.OperationalError: pass From 1dd1f663c9a78c96ad838fdeea735eeb404400ec Mon Sep 17 00:00:00 2001 From: harshpatel4_crest Date: Wed, 22 Dec 2021 14:21:41 +0530 Subject: [PATCH 3/9] resolve unittest failure --- tests/nosetests/test_date_types.py | 9 +- .../nosetests/test_full_table_interruption.py | 21 +++- tests/nosetests/test_tap_mysql.py | 118 +++++++++++++----- tests/nosetests/utils.py | 8 ++ 4 files changed, 119 insertions(+), 37 deletions(-) diff --git a/tests/nosetests/test_date_types.py b/tests/nosetests/test_date_types.py index 6fe9749..a603fe0 100644 --- a/tests/nosetests/test_date_types.py +++ b/tests/nosetests/test_date_types.py @@ -1,4 +1,5 @@ import unittest +from unittest import mock import pymysql import tap_mysql import copy @@ -36,7 +37,9 @@ def accumulate_singer_messages(message): class TestDateTypes(unittest.TestCase): - def setUp(self): + @mock.patch("singer.utils.parse_args") + def setUp(self, mocked_parse_args): + mocked_parse_args.return_value = test_utils.get_args({}) self.conn = test_utils.get_test_connection() self.state = {} @@ -85,7 +88,9 @@ def setUp(self): 'version', singer.utils.now()) - def test_initial_full_table(self): + @mock.patch("singer.utils.parse_args") + def test_initial_full_table(self, mocked_parse_args): + mocked_parse_args.return_value = test_utils.get_args({}) state = {} expected_log_file, expected_log_pos = binlog.fetch_current_log_file_and_pos(self.conn) diff --git a/tests/nosetests/test_full_table_interruption.py b/tests/nosetests/test_full_table_interruption.py index 5f5aa84..9f55715 100644 --- a/tests/nosetests/test_full_table_interruption.py +++ b/tests/nosetests/test_full_table_interruption.py @@ -1,5 +1,6 @@ import copy import os +from unittest import mock import pymysql import unittest import singer @@ -133,7 +134,9 @@ def init_tables(conn): class BinlogInterruption(unittest.TestCase): - def setUp(self): + @mock.patch("singer.utils.parse_args") + def setUp(self, mocked_parse_args): + mocked_parse_args.return_value = test_utils.get_args({}) self.conn = test_utils.get_test_connection() self.catalog = init_tables(self.conn) @@ -164,7 +167,9 @@ def setUp(self): global SINGER_MESSAGES SINGER_MESSAGES.clear() - def test_table_2_interrupted(self): + @mock.patch("singer.utils.parse_args") + def test_table_2_interrupted(self, mocked_parse_args): + mocked_parse_args.return_value = test_utils.get_args({}) singer.write_message = singer_write_message_no_table_2 state = {} @@ -291,7 +296,9 @@ def test_table_2_interrupted(self): self.assertIsNotNone(table_2_bookmark.get('log_file')) self.assertIsNotNone(table_2_bookmark.get('log_pos')) - def test_table_3_interrupted(self): + @mock.patch("singer.utils.parse_args") + def test_table_3_interrupted(self, mocked_parse_args): + mocked_parse_args.return_value = test_utils.get_args({}) singer.write_message = singer_write_message_no_table_3 state = {} @@ -424,7 +431,9 @@ def test_table_3_interrupted(self): self.assertIsNotNone(table_3_bookmark.get('log_pos')) class FullTableInterruption(unittest.TestCase): - def setUp(self): + @mock.patch("singer.utils.parse_args") + def setUp(self, mocked_parse_args): + mocked_parse_args.return_value = test_utils.get_args({}) self.conn = test_utils.get_test_connection() self.catalog = init_tables(self.conn) @@ -448,7 +457,9 @@ def setUp(self): global SINGER_MESSAGES SINGER_MESSAGES.clear() - def test_table_2_interrupted(self): + @mock.patch("singer.utils.parse_args") + def test_table_2_interrupted(self, mocked_parse_args): + mocked_parse_args.return_value = test_utils.get_args({}) singer.write_message = singer_write_message_no_table_2 state = {} diff --git a/tests/nosetests/test_tap_mysql.py b/tests/nosetests/test_tap_mysql.py index 11e55fe..80cef4d 100644 --- a/tests/nosetests/test_tap_mysql.py +++ b/tests/nosetests/test_tap_mysql.py @@ -1,4 +1,5 @@ import unittest +from unittest import mock import pymysql import tap_mysql import copy @@ -37,7 +38,9 @@ def accumulate_singer_messages(message): class TestTypeMapping(unittest.TestCase): @classmethod - def setUpClass(cls): + @mock.patch("singer.utils.parse_args") + def setUpClass(cls, mocked_parse_args): + mocked_parse_args.return_value = test_utils.get_args({}) conn = test_utils.get_test_connection() with connect_with_backoff(conn) as open_conn: @@ -248,7 +251,9 @@ def runTest(self): class TestSchemaMessages(unittest.TestCase): - def runTest(self): + @mock.patch("singer.utils.parse_args") + def runTest(self, mocked_parse_args): + mocked_parse_args.return_value = test_utils.get_args({}) conn = test_utils.get_test_connection() with connect_with_backoff(conn) as open_conn: @@ -290,7 +295,9 @@ def currently_syncing_seq(messages): class TestCurrentStream(unittest.TestCase): - def setUp(self): + @mock.patch("singer.utils.parse_args") + def setUp(self, mocked_parse_args): + mocked_parse_args.return_value = test_utils.get_args({}) self.conn = test_utils.get_test_connection() with connect_with_backoff(self.conn) as open_conn: @@ -315,7 +322,9 @@ def setUp(self): stream.stream = stream.table test_utils.set_replication_method_and_key(stream, 'FULL_TABLE', None) - def test_emit_currently_syncing(self): + @mock.patch("singer.utils.parse_args") + def test_emit_currently_syncing(self, mocked_parse_args): + mocked_parse_args.return_value = test_utils.get_args({}) state = {} global SINGER_MESSAGES @@ -324,7 +333,9 @@ def test_emit_currently_syncing(self): tap_mysql.do_sync(self.conn, {}, self.catalog, state) self.assertRegexpMatches(currently_syncing_seq(SINGER_MESSAGES), '^a+b+c+_+') - def test_start_at_currently_syncing(self): + @mock.patch("singer.utils.parse_args") + def test_start_at_currently_syncing(self, mocked_parse_args): + mocked_parse_args.return_value = test_utils.get_args({}) state = { 'currently_syncing': 'tap_mysql_test-b', 'bookmarks': { @@ -356,7 +367,9 @@ def message_types_and_versions(messages): class TestStreamVersionFullTable(unittest.TestCase): - def setUp(self): + @mock.patch("singer.utils.parse_args") + def setUp(self, mocked_parse_args): + mocked_parse_args.return_value = test_utils.get_args({}) self.conn = test_utils.get_test_connection() with connect_with_backoff(self.conn) as open_conn: @@ -376,7 +389,9 @@ def setUp(self): stream.stream = stream.table test_utils.set_replication_method_and_key(stream, 'FULL_TABLE', None) - def test_with_no_state(self): + @mock.patch("singer.utils.parse_args") + def test_with_no_state(self, mocked_parse_args): + mocked_parse_args.return_value = test_utils.get_args({}) state = {} global SINGER_MESSAGES @@ -389,7 +404,9 @@ def test_with_no_state(self): self.assertTrue(isinstance(versions[0], int)) self.assertEqual(versions[0], versions[1]) - def test_with_no_initial_full_table_complete_in_state(self): + @mock.patch("singer.utils.parse_args") + def test_with_no_initial_full_table_complete_in_state(self, mocked_parse_args): + mocked_parse_args.return_value = test_utils.get_args({}) common.get_stream_version = lambda a, b: 12345 state = { @@ -413,7 +430,9 @@ def test_with_no_initial_full_table_complete_in_state(self): self.assertFalse('version' in state['bookmarks']['tap_mysql_test-full_table'].keys()) self.assertTrue(state['bookmarks']['tap_mysql_test-full_table']['initial_full_table_complete']) - def test_with_initial_full_table_complete_in_state(self): + @mock.patch("singer.utils.parse_args") + def test_with_initial_full_table_complete_in_state(self, mocked_parse_args): + mocked_parse_args.return_value = test_utils.get_args({}) common.get_stream_version = lambda a, b: 12345 state = { @@ -433,7 +452,9 @@ def test_with_initial_full_table_complete_in_state(self): self.assertEqual(['RecordMessage', 'ActivateVersionMessage'], message_types) self.assertEqual(versions, [12345, 12345]) - def test_version_cleared_from_state_after_full_table_success(self): + @mock.patch("singer.utils.parse_args") + def test_version_cleared_from_state_after_full_table_success(self, mocked_parse_args): + mocked_parse_args.return_value = test_utils.get_args({}) common.get_stream_version = lambda a, b: 12345 state = { @@ -460,7 +481,9 @@ def test_version_cleared_from_state_after_full_table_success(self): class TestIncrementalReplication(unittest.TestCase): - def setUp(self): + @mock.patch("singer.utils.parse_args") + def setUp(self, mocked_parse_args): + mocked_parse_args.return_value = test_utils.get_args({}) self.conn = test_utils.get_test_connection() with connect_with_backoff(self.conn) as open_conn: @@ -490,7 +513,9 @@ def setUp(self): stream.stream = stream.table test_utils.set_replication_method_and_key(stream, 'INCREMENTAL', 'updated') - def test_with_no_state(self): + @mock.patch("singer.utils.parse_args") + def test_with_no_state(self, mocked_parse_args): + mocked_parse_args.return_value = test_utils.get_args({}) state = {} global SINGER_MESSAGES @@ -513,8 +538,9 @@ def test_with_no_state(self): self.assertTrue(isinstance(versions[0], int)) self.assertEqual(versions[0], versions[1]) - - def test_with_state(self): + @mock.patch("singer.utils.parse_args") + def test_with_state(self, mocked_parse_args): + mocked_parse_args.return_value = test_utils.get_args({}) state = { 'bookmarks': { 'tap_mysql_test-incremental': { @@ -547,7 +573,9 @@ def test_with_state(self): self.assertEqual(versions[0], versions[1]) self.assertEqual(versions[1], 1) - def test_change_replication_key(self): + @mock.patch("singer.utils.parse_args") + def test_change_replication_key(self, mocked_parse_args): + mocked_parse_args.return_value = test_utils.get_args({}) state = { 'bookmarks': { 'tap_mysql_test-incremental': { @@ -574,7 +602,9 @@ def test_change_replication_key(self): self.assertEqual(state['bookmarks']['tap_mysql_test-incremental']['replication_key_value'], 3) self.assertEqual(state['bookmarks']['tap_mysql_test-incremental']['version'], 1) - def test_version_not_cleared_from_state_after_incremental_success(self): + @mock.patch("singer.utils.parse_args") + def test_version_not_cleared_from_state_after_incremental_success(self, mocked_parse_args): + mocked_parse_args.return_value = test_utils.get_args({}) state = { 'bookmarks': { 'tap_mysql_test-incremental': { @@ -591,7 +621,9 @@ def test_version_not_cleared_from_state_after_incremental_success(self): class TestBinlogReplication(unittest.TestCase): - def setUp(self): + @mock.patch("singer.utils.parse_args") + def setUp(self, mocked_parse_args): + mocked_parse_args.return_value = test_utils.get_args({}) self.maxDiff = None self.state = {} self.conn = test_utils.get_test_connection() @@ -648,7 +680,9 @@ def setUp(self): 'version', singer.utils.now()) - def test_initial_full_table(self): + @mock.patch("singer.utils.parse_args") + def test_initial_full_table(self, mocked_parse_args): + mocked_parse_args.return_value = test_utils.get_args({}) state = {} expected_log_file, expected_log_pos = binlog.fetch_current_log_file_and_pos(self.conn) @@ -697,7 +731,9 @@ def test_initial_full_table(self): self.assertEqual(singer.get_bookmark(state, 'tap_mysql_test-binlog_2', 'version'), activate_version_message_2.version) - def test_fail_on_view(self): + @mock.patch("singer.utils.parse_args") + def test_fail_on_view(self, mocked_parse_args): + mocked_parse_args.return_value = test_utils.get_args({}) for stream in self.catalog.streams: md = singer.metadata.to_map(stream.metadata) singer.metadata.write(md, (), 'is-view', True) @@ -719,7 +755,9 @@ def test_fail_on_view(self): self.assertEqual(expected_exception_message, exception_message) - def test_fail_if_log_file_does_not_exist(self): + @mock.patch("singer.utils.parse_args") + def test_fail_if_log_file_does_not_exist(self, mocked_parse_args): + mocked_parse_args.return_value = test_utils.get_args({}) log_file = 'chicken' stream = self.catalog.streams[0] state = { @@ -747,7 +785,9 @@ def test_fail_if_log_file_does_not_exist(self): LOGGER.error(exception_message) - def test_binlog_stream(self): + @mock.patch("singer.utils.parse_args") + def test_binlog_stream(self, mocked_parse_args): + mocked_parse_args.return_value = test_utils.get_args({}) global SINGER_MESSAGES SINGER_MESSAGES.clear() @@ -798,7 +838,9 @@ def test_binlog_stream(self): class TestViews(unittest.TestCase): - def setUp(self): + @mock.patch("singer.utils.parse_args") + def setUp(self, mocked_parse_args): + mocked_parse_args.return_value = test_utils.get_args({}) self.conn = test_utils.get_test_connection() with connect_with_backoff(self.conn) as open_conn: @@ -816,7 +858,9 @@ def setUp(self): CREATE VIEW a_view AS SELECT id, a FROM a_table ''') - def test_discovery_sets_is_view(self): + @mock.patch("singer.utils.parse_args") + def test_discovery_sets_is_view(self, mocked_parse_args): + mocked_parse_args.return_value = test_utils.get_args({}) catalog = test_utils.discover_catalog(self.conn, {}) is_view = {} @@ -829,7 +873,9 @@ def test_discovery_sets_is_view(self): {'a_table': False, 'a_view': True}) - def test_do_not_discover_key_properties_for_view(self): + @mock.patch("singer.utils.parse_args") + def test_do_not_discover_key_properties_for_view(self, mocked_parse_args): + mocked_parse_args.return_value = test_utils.get_args({}) catalog = test_utils.discover_catalog(self.conn, {}) primary_keys = {} for c in catalog.streams: @@ -842,7 +888,9 @@ def test_do_not_discover_key_properties_for_view(self): class TestEscaping(unittest.TestCase): - def setUp(self): + @mock.patch("singer.utils.parse_args") + def setUp(self, mocked_parse_args): + mocked_parse_args.return_value = test_utils.get_args({}) self.conn = test_utils.get_test_connection() with connect_with_backoff(self.conn) as open_conn: @@ -866,7 +914,9 @@ def setUp(self): test_utils.set_replication_method_and_key(self.catalog.streams[0], 'FULL_TABLE', None) - def runTest(self): + @mock.patch("singer.utils.parse_args") + def runTest(self, mocked_parse_args): + mocked_parse_args.return_value = test_utils.get_args({}) global SINGER_MESSAGES SINGER_MESSAGES.clear() tap_mysql.do_sync(self.conn, {}, self.catalog, {}) @@ -878,7 +928,9 @@ def runTest(self): class TestJsonTables(unittest.TestCase): - def setUp(self): + @mock.patch("singer.utils.parse_args") + def setUp(self, mocked_parse_args): + mocked_parse_args.return_value = test_utils.get_args({}) self.conn = test_utils.get_test_connection() with connect_with_backoff(self.conn) as open_conn: @@ -898,7 +950,9 @@ def setUp(self): stream.stream = stream.table test_utils.set_replication_method_and_key(stream, 'FULL_TABLE', None) - def runTest(self): + @mock.patch("singer.utils.parse_args") + def runTest(self, mocked_parse_args): + mocked_parse_args.return_value = test_utils.get_args({}) global SINGER_MESSAGES SINGER_MESSAGES.clear() tap_mysql.do_sync(self.conn, {}, self.catalog, {}) @@ -909,7 +963,9 @@ def runTest(self): class TestUnsupportedPK(unittest.TestCase): - def setUp(self): + @mock.patch("singer.utils.parse_args") + def setUp(self, mocked_parse_args): + mocked_parse_args.return_value = test_utils.get_args({}) self.conn = test_utils.get_test_connection() with connect_with_backoff(self.conn) as open_conn: @@ -919,7 +975,9 @@ def setUp(self): cursor.execute("INSERT INTO bad_pk_tab (bad_pk, age) VALUES ('a', 100)") cursor.execute("INSERT INTO good_pk_tab (good_pk, age) VALUES (1, 100)") - def runTest(self): + @mock.patch("singer.utils.parse_args") + def runTest(self, mocked_parse_args): + mocked_parse_args.return_value = test_utils.get_args({}) catalog = test_utils.discover_catalog(self.conn, {}) primary_keys = {} diff --git a/tests/nosetests/utils.py b/tests/nosetests/utils.py index d13a1c2..82a0dab 100644 --- a/tests/nosetests/utils.py +++ b/tests/nosetests/utils.py @@ -7,6 +7,14 @@ DB_NAME='tap_mysql_test' +class MockParseArgs: + config = {} + def __init__(self, config): + self.config = config + +def get_args(config): + return MockParseArgs(config) + def get_db_config(): config = {} config['host'] = os.environ.get('TAP_MYSQL_HOST') From cca232884934c8ddfd0ab21419b159c86f94fe34 Mon Sep 17 00:00:00 2001 From: harshpatel4_crest Date: Mon, 27 Dec 2021 17:58:09 +0530 Subject: [PATCH 4/9] use int timeout value rather than float --- tap_mysql/connection.py | 20 ++++++++++++----- tests/nosetests/test_timeout.py | 40 +++++++++++++++++++++++++++++++-- 2 files changed, 52 insertions(+), 8 deletions(-) diff --git a/tap_mysql/connection.py b/tap_mysql/connection.py index ca4cb61..3c50c35 100644 --- a/tap_mysql/connection.py +++ b/tap_mysql/connection.py @@ -21,13 +21,21 @@ def get_request_timeout(): # get the value of request timeout from config config_request_timeout = args.config.get("request_timeout") - # only return the timeout value if it is passed in the config and the value is not 0, "0" or "" - if config_request_timeout and float(config_request_timeout): - # return the timeout from config - return float(config_request_timeout) + # return default value if timeout from config is none or empty + if not config_request_timeout: + return READ_TIMEOUT_SECONDS + + if isinstance(config_request_timeout, int): + # return value from config + return config_request_timeout + elif isinstance(config_request_timeout, str) and config_request_timeout.isdigit(): + # return default value if timeout from config is "0" and integer casted value of valid value + return int(config_request_timeout) if int(config_request_timeout) else READ_TIMEOUT_SECONDS + + # raise Exception as MySql dose not support float values + # Document: https://dev.mysql.com/doc/refman/8.0/en/server-system-variables.html#sysvar_net_read_timeout + raise Exception("Unsupported value of timeout, please use string or integer type values.") - # return default timeout - return READ_TIMEOUT_SECONDS @backoff.on_exception(backoff.expo, (pymysql.err.OperationalError), diff --git a/tests/nosetests/test_timeout.py b/tests/nosetests/test_timeout.py index 1237154..3b6ad3d 100644 --- a/tests/nosetests/test_timeout.py +++ b/tests/nosetests/test_timeout.py @@ -25,7 +25,7 @@ def ping(*args, **kwargs): @mock.patch("singer.utils.parse_args") class TestTimeoutValue(unittest.TestCase): - def test_timeout_value_in_config(self, mocked_parse_args): + def test_timeout_int_value_in_config(self, mocked_parse_args): mock_config = {"request_timeout": 100} # mock parse args @@ -36,6 +36,7 @@ def test_timeout_value_in_config(self, mocked_parse_args): # verify that we got expected timeout value self.assertEquals(100.0, timeout) + self.assertTrue(isinstance(timeout, int)) def test_timeout_value_not_in_config(self, mocked_parse_args): @@ -48,6 +49,17 @@ def test_timeout_value_not_in_config(self, mocked_parse_args): # verify that we got expected timeout value self.assertEquals(3600, timeout) + self.assertTrue(isinstance(timeout, int)) + + def test_timeout_decimal_value_in_config(self, mocked_parse_args): + + mock_config = {"request_timeout": 100.1} + # mock parse args + mocked_parse_args.return_value = get_args(mock_config) + + with self.assertRaises(Exception): + # get the timeout value for assertion + timeout = connection.get_request_timeout() def test_timeout_string_value_in_config(self, mocked_parse_args): @@ -60,6 +72,17 @@ def test_timeout_string_value_in_config(self, mocked_parse_args): # verify that we got expected timeout value self.assertEquals(100.0, timeout) + self.assertTrue(isinstance(timeout, int)) + + def test_timeout_string_decimal_value_in_config(self, mocked_parse_args): + + mock_config = {"request_timeout": "100.1"} + # mock parse args + mocked_parse_args.return_value = get_args(mock_config) + + with self.assertRaises(Exception): + # get the timeout value for assertion + timeout = connection.get_request_timeout() def test_timeout_empty_value_in_config(self, mocked_parse_args): @@ -72,6 +95,7 @@ def test_timeout_empty_value_in_config(self, mocked_parse_args): # verify that we got expected timeout value self.assertEquals(3600, timeout) + self.assertTrue(isinstance(timeout, int)) def test_timeout_0_value_in_config(self, mocked_parse_args): @@ -84,10 +108,11 @@ def test_timeout_0_value_in_config(self, mocked_parse_args): # verify that we got expected timeout value self.assertEquals(3600, timeout) + self.assertTrue(isinstance(timeout, int)) def test_timeout_string_0_value_in_config(self, mocked_parse_args): - mock_config = {"request_timeout": "0.0"} + mock_config = {"request_timeout": "0"} # mock parse args mocked_parse_args.return_value = get_args(mock_config) @@ -96,6 +121,17 @@ def test_timeout_string_0_value_in_config(self, mocked_parse_args): # verify that we got expected timeout value self.assertEquals(3600, timeout) + self.assertTrue(isinstance(timeout, int)) + + def test_timeout_string_0_decimal_value_in_config(self, mocked_parse_args): + + mock_config = {"request_timeout": "0.0"} + # mock parse args + mocked_parse_args.return_value = get_args(mock_config) + + with self.assertRaises(Exception): + # get the timeout value for assertion + timeout = connection.get_request_timeout() @mock.patch("time.sleep") class TestTimeoutBackoff(unittest.TestCase): From 02ed1b7370fb8ef29654e973079483771c7166ca Mon Sep 17 00:00:00 2001 From: harshpatel4_crest Date: Mon, 27 Dec 2021 18:09:11 +0530 Subject: [PATCH 5/9] resolve pylint error --- tap_mysql/connection.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tap_mysql/connection.py b/tap_mysql/connection.py index 3c50c35..3ee8e20 100644 --- a/tap_mysql/connection.py +++ b/tap_mysql/connection.py @@ -25,7 +25,7 @@ def get_request_timeout(): if not config_request_timeout: return READ_TIMEOUT_SECONDS - if isinstance(config_request_timeout, int): + if isinstance(config_request_timeout, int): # pylint: disable=no-else-return # return value from config return config_request_timeout elif isinstance(config_request_timeout, str) and config_request_timeout.isdigit(): From 727729a0aeb4aa748d0f71872a0c1367b133d273 Mon Sep 17 00:00:00 2001 From: harshpatel4_crest Date: Thu, 30 Dec 2021 11:57:58 +0530 Subject: [PATCH 6/9] added decorator over cursor.execute --- tap_mysql/__init__.py | 32 +++++++++++----- tap_mysql/sync_strategies/binlog.py | 12 +++--- tap_mysql/sync_strategies/common.py | 12 ++---- tap_mysql/sync_strategies/full_table.py | 21 +++++----- tap_mysql/sync_strategies/incremental.py | 3 +- tests/nosetests/test_timeout.py | 49 ++++++++++++++---------- 6 files changed, 72 insertions(+), 57 deletions(-) diff --git a/tap_mysql/__init__.py b/tap_mysql/__init__.py index 1cecee4..72a2ee9 100644 --- a/tap_mysql/__init__.py +++ b/tap_mysql/__init__.py @@ -8,6 +8,7 @@ import copy import os import pendulum + import pymysql import singer @@ -163,14 +164,14 @@ def discover_catalog(mysql_conn, config): with connect_with_backoff(mysql_conn) as open_conn: with open_conn.cursor() as cur: - common.execute_query(cur, """ + cur.execute(""" SELECT table_schema, table_name, table_type, table_rows FROM information_schema.tables {} - """.format(table_schema_clause), None, mysql_conn) + """.format(table_schema_clause)) table_info = {} @@ -183,7 +184,7 @@ def discover_catalog(mysql_conn, config): 'is_view': table_type == 'VIEW' } - common.execute_query(cur, """ + cur.execute(""" SELECT table_schema, table_name, column_name, @@ -196,7 +197,7 @@ def discover_catalog(mysql_conn, config): FROM information_schema.columns {} ORDER BY table_schema, table_name - """.format(table_schema_clause), None, mysql_conn) + """.format(table_schema_clause)) columns = [] rec = cur.fetchone() @@ -318,12 +319,12 @@ def log_engine(mysql_conn, catalog_entry): else: with connect_with_backoff(mysql_conn) as open_conn: with open_conn.cursor() as cur: - common.execute_query(cur, """ + cur.execute(""" SELECT engine FROM information_schema.tables WHERE table_schema = %s AND table_name = %s - """, (database_name, catalog_entry.table), mysql_conn) + """, (database_name, catalog_entry.table)) row = cur.fetchone() @@ -682,12 +683,12 @@ def log_server_params(mysql_conn): with connect_with_backoff(mysql_conn) as open_conn: try: with open_conn.cursor() as cur: - common.execute_query(cur, ''' + cur.execute(''' SELECT VERSION() as version, @@session.wait_timeout as wait_timeout, @@session.innodb_lock_wait_timeout as innodb_lock_wait_timeout, @@session.max_allowed_packet as max_allowed_packet, - @@session.interactive_timeout as interactive_timeout''', None, mysql_conn) + @@session.interactive_timeout as interactive_timeout''') row = cur.fetchone() LOGGER.info('Server Parameters: ' + 'version: %s, ' + @@ -697,8 +698,8 @@ def log_server_params(mysql_conn): 'interactive_timeout: %s', *row) with open_conn.cursor() as cur: - common.execute_query(cur, ''' - show session status where Variable_name IN ('Ssl_version', 'Ssl_cipher')''', None, mysql_conn) + cur.execute(''' + show session status where Variable_name IN ('Ssl_version', 'Ssl_cipher')''') rows = cur.fetchall() mapped_row = dict(rows) LOGGER.info('Server SSL Parameters (blank means SSL is not active): ' + @@ -718,6 +719,17 @@ def main(): os.environ['TZ'] = 'UTC' mysql_conn = MySQLConnection(args.config) + + # add timeout error decorator on 'cursor.execute' + # In connection.py's 'make_connection_wrapper', the 'cursorclass' in config is set to the value from kwargs and in binlog.py's, + # 'sync_binlog_stream' when initializing 'BinLogStreamReader' we are passing connection_settings={}, as per the code at: + # https://github.com/noplay/python-mysql-replication/blob/main/pymysqlreplication/binlogstream.py#L282 + # the 'self.__connection_settings' will be {} and hence default cursor 'pymysql.cursors.SSCursor.execute' will be set + pymysql.cursors.SSCursor.execute = common.backoff_timeout_error(pymysql.cursors.SSCursor.execute) + # add decorator for 'cursorclass' from config + if args.config.get("cursorclass"): + args.config.get("cursorclass").execute = common.backoff_timeout_error(args.config.get("cursorclass").execute) + log_server_params(mysql_conn) if args.discover: diff --git a/tap_mysql/sync_strategies/binlog.py b/tap_mysql/sync_strategies/binlog.py index 4111cdc..06c23c4 100644 --- a/tap_mysql/sync_strategies/binlog.py +++ b/tap_mysql/sync_strategies/binlog.py @@ -62,7 +62,7 @@ def add_automatic_properties(catalog_entry, columns): def verify_binlog_config(mysql_conn): with connect_with_backoff(mysql_conn) as open_conn: with open_conn.cursor() as cur: - common.execute_query(cur, "SELECT @@binlog_format", None, mysql_conn) + cur.execute("SELECT @@binlog_format") binlog_format = cur.fetchone()[0] if binlog_format != 'ROW': @@ -70,7 +70,7 @@ def verify_binlog_config(mysql_conn): .format(binlog_format)) try: - common.execute_query(cur, "SELECT @@binlog_row_image", None, mysql_conn) + cur.execute("SELECT @@binlog_row_image") binlog_row_image = cur.fetchone()[0] except pymysql.err.InternalError as ex: if ex.args[0] == 1193: @@ -86,7 +86,7 @@ def verify_binlog_config(mysql_conn): def verify_log_file_exists(mysql_conn, log_file, log_pos): with connect_with_backoff(mysql_conn) as open_conn: with open_conn.cursor() as cur: - common.execute_query(cur, "SHOW BINARY LOGS", None, mysql_conn) + cur.execute("SHOW BINARY LOGS") result = cur.fetchall() existing_log_file = list(filter(lambda log: log[0] == log_file, result)) @@ -105,7 +105,7 @@ def verify_log_file_exists(mysql_conn, log_file, log_pos): def fetch_current_log_file_and_pos(mysql_conn): with connect_with_backoff(mysql_conn) as open_conn: with open_conn.cursor() as cur: - common.execute_query(cur, "SHOW MASTER STATUS", None, mysql_conn) + cur.execute("SHOW MASTER STATUS") result = cur.fetchone() @@ -120,7 +120,7 @@ def fetch_current_log_file_and_pos(mysql_conn): def fetch_server_id(mysql_conn): with connect_with_backoff(mysql_conn) as open_conn: with open_conn.cursor() as cur: - common.execute_query(cur, "SELECT @@server_id", None, mysql_conn) + cur.execute("SELECT @@server_id") server_id = cur.fetchone()[0] return server_id @@ -199,7 +199,7 @@ def calculate_bookmark(mysql_conn, binlog_streams_map, state): with connect_with_backoff(mysql_conn) as open_conn: with open_conn.cursor() as cur: - common.execute_query(cur, "SHOW BINARY LOGS", None, mysql_conn) + cur.execute("SHOW BINARY LOGS") binary_logs = cur.fetchall() diff --git a/tap_mysql/sync_strategies/common.py b/tap_mysql/sync_strategies/common.py index 30319c6..be86e69 100644 --- a/tap_mysql/sync_strategies/common.py +++ b/tap_mysql/sync_strategies/common.py @@ -64,8 +64,8 @@ def gen_fn(exc): return gen_fn def reconnect(details): - # get connection and reconnect - connection = details.get("args")[3] + # get connection as 1st param will be 'self' and reconnect + connection = details.get("args")[0].connection connection.ping(reconnect=True) def backoff_timeout_error(fnc): @@ -80,10 +80,6 @@ def wrapper(*args, **kwargs): return fnc(*args, **kwargs) return wrapper -@backoff_timeout_error -def execute_query(cursor, query, params, conn): - cursor.execute(query, params) - def escape(string): if '`' in string: raise Exception("Can't escape identifier {} because it contains a backtick" @@ -224,7 +220,7 @@ def whitelist_bookmark_keys(bookmark_key_set, tap_stream_id, state): singer.clear_bookmark(state, tap_stream_id, bk) -def sync_query(cursor, catalog_entry, state, select_sql, columns, stream_version, params, mysql_conn): +def sync_query(cursor, catalog_entry, state, select_sql, columns, stream_version, params): replication_key = singer.get_bookmark(state, catalog_entry.tap_stream_id, 'replication_key') @@ -234,7 +230,7 @@ def sync_query(cursor, catalog_entry, state, select_sql, columns, stream_version time_extracted = utils.now() LOGGER.info('Running %s', query_string) - execute_query(cursor, select_sql, params, mysql_conn) + cursor.execute(select_sql, params) row = cursor.fetchone() rows_saved = 0 diff --git a/tap_mysql/sync_strategies/full_table.py b/tap_mysql/sync_strategies/full_table.py index 5d31600..c5f2bff 100644 --- a/tap_mysql/sync_strategies/full_table.py +++ b/tap_mysql/sync_strategies/full_table.py @@ -64,9 +64,9 @@ def sync_is_resumable(mysql_conn, catalog_entry): with connect_with_backoff(mysql_conn) as open_conn: with open_conn.cursor() as cur: for pk in key_properties: - common.execute_query(cur, sql.format(database_name, - catalog_entry.table, - pk), None, mysql_conn) + cur.execute(sql.format(database_name, + catalog_entry.table, + pk)) result = cur.fetchone() @@ -82,7 +82,7 @@ def sync_is_resumable(mysql_conn, catalog_entry): return True -def get_max_pk_values(cursor, catalog_entry, mysql_conn): +def get_max_pk_values(cursor, catalog_entry): database_name = common.get_database_name(catalog_entry) escaped_db = common.escape(database_name) escaped_table = common.escape(catalog_entry.table) @@ -96,9 +96,9 @@ def get_max_pk_values(cursor, catalog_entry, mysql_conn): select_column_clause = ", ".join(["max(" + pk + ")" for pk in escaped_columns]) - common.execute_query(cursor, sql.format(select_column_clause, + cursor.execute(sql.format(select_column_clause, escaped_db, - escaped_table), None, mysql_conn) + escaped_table)) result = cursor.fetchone() processed_results = [] for bm in result: @@ -200,10 +200,10 @@ def generate_pk_clause(catalog_entry, state): return sql -def update_incremental_full_table_state(catalog_entry, state, cursor, mysql_conn): +def update_incremental_full_table_state(catalog_entry, state, cursor): max_pk_values = singer.get_bookmark(state, catalog_entry.tap_stream_id, - 'max_pk_values') or get_max_pk_values(cursor, catalog_entry, mysql_conn) + 'max_pk_values') or get_max_pk_values(cursor, catalog_entry) if not max_pk_values: @@ -251,7 +251,7 @@ def sync_table(mysql_conn, catalog_entry, state, columns, stream_version): if perform_resumable_sync: LOGGER.info("Full table sync is resumable based on primary key definition, will replicate incrementally") - state = update_incremental_full_table_state(catalog_entry, state, cur, mysql_conn) + state = update_incremental_full_table_state(catalog_entry, state, cur) pk_clause = generate_pk_clause(catalog_entry, state) select_sql += pk_clause @@ -263,8 +263,7 @@ def sync_table(mysql_conn, catalog_entry, state, columns, stream_version): select_sql, columns, stream_version, - params, - mysql_conn) + params) # clear max pk value and last pk fetched upon successful sync singer.clear_bookmark(state, catalog_entry.tap_stream_id, 'max_pk_values') diff --git a/tap_mysql/sync_strategies/incremental.py b/tap_mysql/sync_strategies/incremental.py index e953b21..b365d42 100644 --- a/tap_mysql/sync_strategies/incremental.py +++ b/tap_mysql/sync_strategies/incremental.py @@ -78,7 +78,6 @@ def sync_table(mysql_conn, catalog_entry, state, columns, limit=None): select_sql, columns, stream_version, - params, - mysql_conn) + params) if limit is None or num_rows < limit: iterate_limit = False diff --git a/tests/nosetests/test_timeout.py b/tests/nosetests/test_timeout.py index 3b6ad3d..5508414 100644 --- a/tests/nosetests/test_timeout.py +++ b/tests/nosetests/test_timeout.py @@ -1,7 +1,7 @@ import unittest import pymysql -from tap_mysql.sync_strategies.common import execute_query import tap_mysql.connection as connection +from tap_mysql.sync_strategies.common import backoff_timeout_error from unittest import mock class MockParseArgs: @@ -13,6 +13,9 @@ def get_args(config): return MockParseArgs(config) class MockedConnection: + def __init__(self, *args, **kwargs): + pass + def __enter__(self): return self @@ -22,6 +25,9 @@ def __exit__(self, *args, **kwargs): def ping(*args, **kwargs): pass + def show_warnings(*args, **kwargs): + pass + @mock.patch("singer.utils.parse_args") class TestTimeoutValue(unittest.TestCase): @@ -134,36 +140,39 @@ def test_timeout_string_0_decimal_value_in_config(self, mocked_parse_args): timeout = connection.get_request_timeout() @mock.patch("time.sleep") +@mock.patch("singer.utils.parse_args") class TestTimeoutBackoff(unittest.TestCase): - def test_timeout_backoff(self, mocked_sleep): - - # create mock class of cursor - cursor = mock.MagicMock() - # raise timeout error for "cursor.execute" - cursor.execute.side_effect = pymysql.err.OperationalError(2013, 'Lost connection to MySQL server during query (timed out)') - + @mock.patch("pymysql.cursors.SSCursor.execute") + def test_timeout_backoff(self, mocked_cursor_execute, mocked_parse_args, mocked_sleep): + # mock 'cursor.execute' and raise error + mocked_cursor_execute.side_effect = pymysql.err.OperationalError(2013, 'Lost connection to MySQL server during query (timed out)') + # add decorator on 'cursor.execute' + pymysql.cursors.SSCursor.execute = backoff_timeout_error(pymysql.cursors.SSCursor.execute) + # initialize cursor + cursor = pymysql.cursors.SSCursor(MockedConnection) try: # function call - execute_query(cursor, "SELECT * from Test", None, MockedConnection) + cursor.execute("SELECT * FROM test") except pymysql.err.OperationalError: pass # verify that we backoff for 5 times - self.assertEquals(cursor.execute.call_count, 5) - - def test_timeout_error_not_occurred(self, mocked_sleep): - - # create mock class of cursor - cursor = mock.MagicMock() - # raise any error other than timeout error for "cursor.execute" - cursor.execute.side_effect = pymysql.err.OperationalError(2003, 'Can\'t connect to MySQL server on \'localhost\' (111)') - + self.assertEquals(mocked_cursor_execute.call_count, 5) + + @mock.patch("pymysql.cursors.SSCursor.execute") + def test_timeout_error_not_occurred(self, mocked_cursor_execute, mocked_parse_args, mocked_sleep): + # mock 'cursor.execute' and raise error + mocked_cursor_execute.side_effect = pymysql.err.OperationalError(2003, 'Can\'t connect to MySQL server on \'localhost\' (111)') + # add decorator on 'cursor.execute' + pymysql.cursors.SSCursor.execute = backoff_timeout_error(pymysql.cursors.SSCursor.execute) + # initialize cursor + cursor = pymysql.cursors.SSCursor(MockedConnection) try: # function call - execute_query(cursor, "SELECT * from Test", None, MockedConnection) + cursor.execute("SELECT * FROM test") except pymysql.err.OperationalError: pass # verify that we did not backoff as timeout error has not occurred - self.assertEquals(cursor.execute.call_count, 1) + self.assertEquals(mocked_cursor_execute.call_count, 1) From a3a63b72a1b8debf853aed0e5d77090b1a853153 Mon Sep 17 00:00:00 2001 From: harshpatel4_crest Date: Mon, 3 Jan 2022 18:31:36 +0530 Subject: [PATCH 7/9] updated readme file --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 49154ff..860fb0f 100644 --- a/README.md +++ b/README.md @@ -77,7 +77,8 @@ Create a config file containing the database connection credentials, e.g.: "host": "localhost", "port": "3306", "user": "root", - "password": "password" + "password": "password", + "request_timeout": 300 } ``` From 028ac3458020aab8128d79d3ea756965f51ccd7f Mon Sep 17 00:00:00 2001 From: harshpatel4_crest Date: Mon, 10 Jan 2022 18:55:32 +0530 Subject: [PATCH 8/9] used 1 function in giveup code during backoff --- tap_mysql/sync_strategies/common.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/tap_mysql/sync_strategies/common.py b/tap_mysql/sync_strategies/common.py index be86e69..791138c 100644 --- a/tap_mysql/sync_strategies/common.py +++ b/tap_mysql/sync_strategies/common.py @@ -50,19 +50,18 @@ def monkey_patch_date(date_str): #-------------------------------------------------------------------------------------------- # boolean function to check if the error is 'timeout' error or not -def is_timeout_error(): +def is_timeout_error(error_raise): """ This function checks whether the URLError contains 'timed out' substring and return boolean values accordingly, to decide whether to backoff or not. """ - def gen_fn(exc): - if str(exc).__contains__('timed out'): - # retry if the error string contains 'timed out' - return False - return True - - return gen_fn + # retry if the error string contains 'timed out' + if str(error_raise).__contains__('timed out'): + return False + return True +# as pymysql is closing the connection on encountering the error hence, getting +# the mysql connection from args and calling 'ping' with 'reconnect=True' to reconnect def reconnect(details): # get connection as 1st param will be 'self' and reconnect connection = details.get("args")[0].connection @@ -71,7 +70,7 @@ def reconnect(details): def backoff_timeout_error(fnc): @backoff.on_exception(backoff.expo, (pymysql.err.OperationalError), - giveup=is_timeout_error(), + giveup=is_timeout_error, on_backoff=reconnect, max_tries=5, factor=2) From 480f6b40cd4abe59af2af43b6ef3efbba07671db Mon Sep 17 00:00:00 2001 From: harshpatel4_crest Date: Mon, 10 Jan 2022 18:59:21 +0530 Subject: [PATCH 9/9] resolved typo --- tap_mysql/connection.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tap_mysql/connection.py b/tap_mysql/connection.py index 3ee8e20..f916894 100644 --- a/tap_mysql/connection.py +++ b/tap_mysql/connection.py @@ -32,7 +32,7 @@ def get_request_timeout(): # return default value if timeout from config is "0" and integer casted value of valid value return int(config_request_timeout) if int(config_request_timeout) else READ_TIMEOUT_SECONDS - # raise Exception as MySql dose not support float values + # raise Exception as MySql does not support float values # Document: https://dev.mysql.com/doc/refman/8.0/en/server-system-variables.html#sysvar_net_read_timeout raise Exception("Unsupported value of timeout, please use string or integer type values.")