Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backported explicit session-SQL support from pipelinewise-tap-mysql #153

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ mysql> select * from example_db.animals;

### Create the configuration file

Create a config file containing the database connection credentials, e.g.:
Create a config file containing the database connection credentials. The required parameters are the same basic configuration properties used by the MySQL command-line client (`mysql`):

```json
{
Expand All @@ -81,8 +81,17 @@ Create a config file containing the database connection credentials, e.g.:
}
```

These are the same basic configuration properties used by the MySQL command-line
client (`mysql`).


List of config parameters:

| Parameter | type | required | default | description |
|--------------|-------------------------------|----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------|
| host | string | Yes | - | mysql/mariadb host |
| port | int | Yes | - | mysql/mariadb port |
| user | string | Yes | - | db username |
| password | string | Yes | - | db password |
| session_sqls | List of strings | No | ```['SET @@session.time_zone="+0:00"', 'SET @@session.wait_timeout=28800', 'SET @@session.net_read_timeout=3600', 'SET @@session.innodb_lock_wait_timeout=3600']``` | Set session variables dynamically. |

### Discovery mode

Expand Down
59 changes: 25 additions & 34 deletions tap_mysql/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@
LOGGER = singer.get_logger()

CONNECT_TIMEOUT_SECONDS = 30
READ_TIMEOUT_SECONDS = 3600
DEFAULT_SESSION_SQLS = ['SET @@session.time_zone="+0:00"',
'SET @@session.wait_timeout=28800',
'SET @@session.net_read_timeout=3600',
'SET @@session.innodb_lock_wait_timeout=3600']

# We need to hold onto this for self-signed SSL
match_hostname = ssl.match_hostname
Expand All @@ -22,40 +25,28 @@
factor=2)
def connect_with_backoff(connection):
connection.connect()
run_session_sqls(connection)
return connection

def run_session_sqls(connection):
session_sqls = connection.session_sqls

warnings = []
if session_sqls and isinstance(session_sqls, list):
for sql in session_sqls:
try:
run_sql(connection, sql)
except pymysql.err.InternalError as exc:
warnings.append(f'Could not set session variable `{sql}`: {exc}')

if warnings:
LOGGER.warning('Encountered non-fatal errors when configuring session that could impact performance:')
for warning in warnings:
LOGGER.warning(warning)

def run_sql(connection, sql):
with connection.cursor() as cur:
try:
cur.execute('SET @@session.time_zone="+0:00"')
except pymysql.err.InternalError as e:
warnings.append('Could not set session.time_zone. Error: ({}) {}'.format(*e.args))

try:
cur.execute('SET @@session.wait_timeout=2700')
except pymysql.err.InternalError as e:
warnings.append('Could not set session.wait_timeout. Error: ({}) {}'.format(*e.args))

try:
cur.execute("SET @@session.net_read_timeout={}".format(READ_TIMEOUT_SECONDS))
except pymysql.err.InternalError as e:
warnings.append('Could not set session.net_read_timeout. Error: ({}) {}'.format(*e.args))


try:
cur.execute('SET @@session.innodb_lock_wait_timeout=2700')
except pymysql.err.InternalError as e:
warnings.append(
'Could not set session.innodb_lock_wait_timeout. Error: ({}) {}'.format(*e.args)
)

if warnings:
LOGGER.info(("Encountered non-fatal errors when configuring MySQL session that could "
"impact performance:"))
for w in warnings:
LOGGER.warning(w)

return connection

cur.execute(sql)

def parse_internal_hostname(hostname):
# special handling for google cloud
Expand Down Expand Up @@ -91,7 +82,6 @@ def __init__(self, config):
"port": int(config["port"]),
"cursorclass": config.get("cursorclass") or pymysql.cursors.SSCursor,
"connect_timeout": CONNECT_TIMEOUT_SECONDS,
"read_timeout": READ_TIMEOUT_SECONDS,
"charset": "utf8",
}

Expand Down Expand Up @@ -123,7 +113,7 @@ def __init__(self, config):
if config.get("internal_hostname"):
parsed_hostname = parse_internal_hostname(config["internal_hostname"])
ssl.match_hostname = lambda cert, hostname: match_hostname(cert, parsed_hostname)

super().__init__(defer_connect=True, ssl=ssl_arg, **args)

# Configure SSL without custom CA
Expand All @@ -142,6 +132,7 @@ def __init__(self, config):
self.ctx.verify_mode = ssl.CERT_REQUIRED if verify_mode else ssl.CERT_NONE
self.client_flag |= CLIENT.SSL

self.session_sqls = config.get("session_sqls", DEFAULT_SESSION_SQLS)

def __enter__(self):
return self
Expand Down
75 changes: 74 additions & 1 deletion tests/nosetests/test_tap_mysql.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import unittest
from unittest.mock import patch
import pymysql
import tap_mysql
import copy
import singer
import os
import singer.metadata
from tap_mysql.connection import connect_with_backoff
from tap_mysql.connection import connect_with_backoff, MySQLConnection

try:
import tests.utils as test_utils
Expand Down Expand Up @@ -928,6 +929,78 @@ def runTest(self):

self.assertEqual(primary_keys, {'good_pk_tab': ['good_pk'], 'bad_pk_tab': []})

class MySQLConnectionMock(MySQLConnection):
"""
Mocked MySQLConnection class
"""
def __init__(self, config):
super().__init__(config)

self.executed_queries = []

def run_sql(self, sql):
self.executed_queries.append(sql)

class TestSessionSqls(unittest.TestCase):

def setUp(self) -> None:
self.executed_queries = []

def run_sql_mock(self, connection, sql):
if sql.startswith('INVALID-SQL'):
raise pymysql.err.InternalError

self.executed_queries.append(sql)

def test_open_connections_with_default_session_sqls(self):
"""Default session parameters should be applied if no custom session SQLs"""
with patch('tap_mysql.connection.MySQLConnection.connect'):
with patch('tap_mysql.connection.run_sql') as run_sql_mock:
run_sql_mock.side_effect = self.run_sql_mock
conn = MySQLConnectionMock(config=test_utils.get_db_config())
connect_with_backoff(conn)

# Test if session variables applied on connection
self.assertEqual(self.executed_queries, tap_mysql.connection.DEFAULT_SESSION_SQLS)

def test_open_connections_with_session_sqls(self):
"""Custom session parameters should be applied if defined"""
session_sqls = [
'SET SESSION max_statement_time=0',
'SET SESSION wait_timeout=28800'
]

with patch('tap_mysql.connection.MySQLConnection.connect'):
with patch('tap_mysql.connection.run_sql') as run_sql_mock:
run_sql_mock.side_effect = self.run_sql_mock
conn = MySQLConnectionMock(config={**test_utils.get_db_config(),
**{'session_sqls': session_sqls}})
connect_with_backoff(conn)

# Test if session variables applied on connection
self.assertEqual(self.executed_queries, session_sqls)

def test_open_connections_with_invalid_session_sqls(self):
"""Invalid SQLs in session_sqls should be ignored"""
session_sqls = [
'SET SESSION max_statement_time=0',
'INVALID-SQL-SHOULD-BE-SILENTLY-IGNORED',
'SET SESSION wait_timeout=28800'
]

with patch('tap_mysql.connection.MySQLConnection.connect'):
with patch('tap_mysql.connection.run_sql') as run_sql_mock:
run_sql_mock.side_effect = self.run_sql_mock
conn = MySQLConnectionMock(config={**test_utils.get_db_config(),
**{'session_sqls': session_sqls}})
connect_with_backoff(conn)

# Test if session variables applied on connection
self.assertEqual(self.executed_queries, ['SET SESSION max_statement_time=0',
'SET SESSION wait_timeout=28800'])




if __name__== "__main__":
test1 = TestBinlogReplication()
Expand Down
6 changes: 4 additions & 2 deletions tests/nosetests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def get_db_config():
return config


def get_test_connection():
def get_test_connection(extra_config=None):
db_config = get_db_config()

con = pymysql.connect(**db_config)
Expand All @@ -38,7 +38,9 @@ def get_test_connection():
db_config['database'] = DB_NAME
db_config['autocommit'] = True

mysql_conn = MySQLConnection(db_config)
if not extra_config:
extra_config = {}
mysql_conn = MySQLConnection({**db_config, **extra_config})
mysql_conn.autocommit_mode = True

return mysql_conn
Expand Down