Skip to content
This repository has been archived by the owner on Nov 6, 2023. It is now read-only.

Commit

Permalink
feat: postgres refactoring (#48)
Browse files Browse the repository at this point in the history
* feat(postgres): adding repository
* feat(postgres): adding dataclass to models
* feat(postgres): adding MappingException
* feat(postgres): add primary keys
* chore: black formatting 

Co-authored-by: akolesnik <[email protected]>
Co-authored-by: Pavel Mackarichev <[email protected]>
  • Loading branch information
3 people authored Jul 21, 2022
1 parent 4321341 commit 7feb6c5
Show file tree
Hide file tree
Showing 15 changed files with 461 additions and 315 deletions.
1 change: 1 addition & 0 deletions odd_collector/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@


try:

loop = asyncio.get_event_loop()

cur_dirname = os.path.dirname(os.path.realpath(__file__))
Expand Down
Empty file.
1 change: 0 additions & 1 deletion odd_collector/adapters/postgresql/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +0,0 @@
__version__ = "0.1.0"
88 changes: 16 additions & 72 deletions odd_collector/adapters/postgresql/adapter.py
Original file line number Diff line number Diff line change
@@ -1,97 +1,41 @@
import logging

import psycopg2
from odd_models.models import DataEntity, DataEntityType, DataEntityList
from oddrn_generator import PostgresqlGenerator
from psycopg2 import sql

from typing import List

from odd_collector_sdk.domain.adapter import AbstractAdapter
from odd_models.models import DataEntity, DataEntityList
from oddrn_generator import PostgresqlGenerator


from .logger import logger
from .mappers.tables import map_table
from .mappers import _table_select, _column_select

from typing import List
from .repository import PostgreSQLRepository


class Adapter(AbstractAdapter):
    """ODD adapter exposing PostgreSQL tables and columns as data entities.

    All database access is delegated to PostgreSQLRepository; this class only
    orchestrates fetching the raw metadata and mapping it to ODD models.

    NOTE(review): this span was a self-conflicting diff mixing the pre- and
    post-refactor versions (duplicate ``host_settings=`` lines, two competing
    except-paths, dead ``__execute``/``__connect``/``__disconnect`` methods and
    a ``DBException`` superseded by ``exceptions.DbPostgreSQLException``); it is
    reconstructed here as the coherent post-refactor class.
    """

    def __init__(self, config) -> None:
        # Only the database name is kept locally; connection details live in
        # the repository, which owns all psycopg2 interaction.
        self.__database = config.database
        self.__repository = PostgreSQLRepository(config)
        self.__oddrn_generator = PostgresqlGenerator(
            host_settings=f"{config.host}", databases=self.__database
        )

    def get_data_source_oddrn(self) -> str:
        """Return the ODDRN identifying this PostgreSQL data source."""
        return self.__oddrn_generator.get_data_source_oddrn()

    def get_data_entities(self) -> List[DataEntity]:
        """Fetch tables, columns and primary keys and map them to entities.

        Returns an empty list (after logging with traceback) when metadata
        retrieval or mapping fails — a broken source must not break the whole
        collector run.
        """
        try:
            tables = self.__repository.get_tables()
            columns = self.__repository.get_columns()
            primary_keys = self.__repository.get_primary_keys()

            return map_table(
                self.__oddrn_generator, tables, columns, primary_keys, self.__database
            )
        except Exception:
            logger.error("Failed to load metadata for tables", exc_info=True)
            return []

    def get_data_entity_list(self) -> DataEntityList:
        """Wrap the mapped entities together with the data-source ODDRN."""
        return DataEntityList(
            data_source_oddrn=self.get_data_source_oddrn(),
            items=(self.get_data_entities()),
        )
32 changes: 32 additions & 0 deletions odd_collector/adapters/postgresql/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Base URL of the OpenDataDiscovery PostgreSQL metadata-extension JSON schema;
# a definition name is appended to form a full schema reference.
_METADATA_SCHEMA_URL_PREFIX: str = (
    "https://raw.githubusercontent.com/opendatadiscovery/opendatadiscovery-specification/main/specification/"
    "extensions/postgresql.json#/definitions/Postgresql"
)

# Schema URL for dataset-level (table) metadata.
_data_set_metadata_schema_url: str = _METADATA_SCHEMA_URL_PREFIX + "DataSetExtension"
# Schema URL for dataset-field-level (column) metadata.
_data_set_field_metadata_schema_url: str = (
    _METADATA_SCHEMA_URL_PREFIX + "DataSetFieldExtension"
)

# Table attributes excluded from the generic metadata payload — presumably
# because they are mapped to dedicated entity fields; TODO confirm against the mappers.
_data_set_metadata_excluded_keys: set = {
    "table_catalog",
    "table_schema",
    "table_name",
    "table_type",
    "view_definition",
    "table_owner",
    "table_rows",
    "description",
}


# Column attributes excluded from the generic metadata payload (same rationale
# as the table-level set above).
_data_set_field_metadata_excluded_keys: set = {
    "table_catalog",
    "table_schema",
    "table_name",
    "column_name",
    "column_default",
    "is_nullable",
    "data_type",
    "description",
}
58 changes: 58 additions & 0 deletions odd_collector/adapters/postgresql/connectors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import contextlib
import logging
from abc import ABC, abstractmethod

import psycopg2
from odd_collector.adapters.postgresql.exceptions import DbPostgreSQLException


# TODO: Create one abstract connector for all adapters
class AbstractConnector(ABC):
    """Minimal interface every adapter database connector must provide."""

    @abstractmethod
    def connection(self):
        """Produce a context that yields a live database cursor."""


class PostgreSQLConnector(AbstractConnector):
    """Owns the psycopg2 connection/cursor pair for the PostgreSQL adapter.

    Usage: ``with connector.connection() as cursor: cursor.execute(...)``.
    """

    __connection = None
    __cursor = None

    def __init__(self, config) -> None:
        # Plain credential/endpoint fields read from the adapter config.
        self.__host = config.host
        self.__port = config.port
        self.__database = config.database
        self.__user = config.user
        self.__password = config.password

    @contextlib.contextmanager
    def connection(self):
        """Open a connection and yield its cursor; always disconnect after.

        Fix: the yield is wrapped in try/finally — previously an exception
        raised inside the caller's ``with`` body skipped ``__disconnect`` and
        leaked the open connection and cursor.
        """
        self.__connect()
        try:
            yield self.__cursor
        finally:
            self.__disconnect()

    def __connect(self):
        """Open connection and cursor; wrap driver errors in DbPostgreSQLException."""
        try:
            self.__connection = psycopg2.connect(
                dbname=self.__database,
                user=self.__user,
                password=self.__password,
                host=self.__host,
                port=self.__port,
            )
            self.__cursor = self.__connection.cursor()
        except psycopg2.Error as e:
            logging.error("Error in __connect method", exc_info=True)
            raise DbPostgreSQLException(
                "Database error. Troubles with connecting"
            ) from e

    def __disconnect(self) -> None:
        """Close cursor then connection; wrap driver errors in DbPostgreSQLException."""
        try:
            if self.__cursor:
                self.__cursor.close()
            if self.__connection:
                self.__connection.close()
        except (psycopg2.OperationalError, psycopg2.InternalError) as e:
            logging.error("Error in disconnecting from database", exc_info=True)
            raise DbPostgreSQLException(
                "Database error. Troubles with disconnecting"
            ) from e
10 changes: 10 additions & 0 deletions odd_collector/adapters/postgresql/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
class DbPostgreSQLException(Exception):
    """Raised when connecting to or disconnecting from PostgreSQL fails."""


class TablePostgreSQLException(Exception):
    """Raised for table-level metadata retrieval problems."""


class MappingException(Exception):
    """Raised when query results cannot be mapped to data entities."""
3 changes: 3 additions & 0 deletions odd_collector/adapters/postgresql/logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import logging

# Shared module-level logger for the PostgreSQL adapter; the "postgres" channel
# name lets deployments configure this adapter's verbosity independently.
logger = logging.getLogger("postgres")
153 changes: 0 additions & 153 deletions odd_collector/adapters/postgresql/mappers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,153 +0,0 @@
from collections import namedtuple

# Base URL of the OpenDataDiscovery PostgreSQL metadata-extension JSON schema;
# a definition name is appended to form a full schema reference.
_METADATA_SCHEMA_URL_PREFIX: str = (
    "https://raw.githubusercontent.com/opendatadiscovery/opendatadiscovery-specification/main/specification/"
    "extensions/postgresql.json#/definitions/Postgresql"
)

# Schema URL for dataset-level (table) metadata.
_data_set_metadata_schema_url: str = _METADATA_SCHEMA_URL_PREFIX + "DataSetExtension"
# Schema URL for dataset-field-level (column) metadata.
_data_set_field_metadata_schema_url: str = (
    _METADATA_SCHEMA_URL_PREFIX + "DataSetFieldExtension"
)

# Table attributes excluded from the generic metadata payload — presumably
# mapped to dedicated entity fields instead; TODO confirm against the mappers.
_data_set_metadata_excluded_keys: set = {
    "table_catalog",
    "table_schema",
    "table_name",
    "table_type",
    "view_definition",
    "table_owner",
    "table_rows",
    "description",
}

# Space/comma-separated field list matching the column order of _table_select;
# consumed verbatim as the field names of MetadataNamedtuple.
_table_metadata: str = (
    "table_catalog, table_schema, table_name, table_type, self_referencing_column_name, "
    "reference_generation, user_defined_type_catalog, user_defined_type_schema, user_defined_type_name, "
    "is_insertable_into, is_typed, commit_action, "
    "view_definition, view_check_option, view_is_updatable, view_is_insertable_into, "
    "view_is_trigger_updatable, view_is_trigger_deletable, view_is_trigger_insertable_into, "
    "table_owner, table_rows, description"
)

_table_select = """
select it.table_catalog
, it.table_schema
, it.table_name
, it.table_type
, it.self_referencing_column_name
, it.reference_generation
, it.user_defined_type_catalog
, it.user_defined_type_schema
, it.user_defined_type_name
, it.is_insertable_into
, it.is_typed
, it.commit_action
, iw.view_definition
, iw.check_option as view_check_option
, iw.is_updatable as view_is_updatable
, iw.is_insertable_into as view_is_insertable_into
, iw.is_trigger_updatable as view_is_trigger_updatable
, iw.is_trigger_deletable as view_is_trigger_deletable
, iw.is_trigger_insertable_into as view_is_trigger_insertable_into
, pg_catalog.pg_get_userbyid(c.relowner) as table_owner
, c.reltuples as table_rows
, pg_catalog.obj_description(c.oid) as description
from pg_catalog.pg_class c
join pg_catalog.pg_namespace n on n.oid = c.relnamespace
join information_schema.tables it on it.table_schema = n.nspname and it.table_name = c.relname
left join information_schema.views iw on iw.table_schema = n.nspname and iw.table_name = c.relname
where c.relkind in ('r', 'v')
and n.nspname not like 'pg_temp_%'
and n.nspname not in ('pg_toast', 'pg_internal', 'catalog_history', 'pg_catalog', 'information_schema')
order by n.nspname, c.relname
"""

# Column attributes excluded from the generic metadata payload (same rationale
# as the table-level exclusion set above).
_data_set_field_metadata_excluded_keys: set = {
    "table_catalog",
    "table_schema",
    "table_name",
    "column_name",
    "column_default",
    "is_nullable",
    "data_type",
    "description",
}

# Space/comma-separated field list matching the column order of _column_select;
# consumed verbatim as the field names of ColumnMetadataNamedtuple.
_column_metadata: str = (
    "table_catalog, table_schema, table_name, column_name, ordinal_position, column_default, is_nullable, "
    "data_type, character_maximum_length, character_octet_length, "
    "numeric_precision, numeric_precision_radix, numeric_scale, "
    "datetime_precision, interval_type, interval_precision, "
    "character_set_catalog, character_set_schema, character_set_name, "
    "collation_catalog, collation_schema, collation_name, "
    "domain_catalog, domain_schema, domain_name, "
    "udt_catalog, udt_schema, udt_name, "
    "scope_catalog, scope_schema, scope_name, "
    "maximum_cardinality, dtd_identifier, is_self_referencing, "
    "is_identity, "
    "identity_generation, identity_start, identity_increment, identity_maximum, identity_minimum, identity_cycle, "
    "is_generated, generation_expression, is_updatable, "
    "description"
)

_column_select: str = """
select ic.table_catalog
, ic.table_schema
, ic.table_name
, ic.column_name
, ic.ordinal_position
, ic.column_default
, ic.is_nullable
, ic.data_type
, ic.character_maximum_length
, ic.character_octet_length
, ic.numeric_precision
, ic.numeric_precision_radix
, ic.numeric_scale
, ic.datetime_precision
, ic.interval_type
, ic.interval_precision
, ic.character_set_catalog
, ic.character_set_schema
, ic.character_set_name
, ic.collation_catalog
, ic.collation_schema
, ic.collation_name
, ic.domain_catalog
, ic.domain_schema
, ic.domain_name
, ic.udt_catalog
, ic.udt_schema
, ic.udt_name
, ic.scope_catalog
, ic.scope_schema
, ic.scope_name
, ic.maximum_cardinality
, ic.dtd_identifier
, ic.is_self_referencing
, ic.is_identity
, ic.identity_generation
, ic.identity_start
, ic.identity_increment
, ic.identity_maximum
, ic.identity_minimum
, ic.identity_cycle
, ic.is_generated
, ic.generation_expression
, ic.is_updatable
, pg_catalog.col_description(c.oid, a.attnum) as description
from pg_catalog.pg_attribute a
join pg_catalog.pg_class c on c.oid = a.attrelid
join pg_catalog.pg_namespace n on n.oid = c.relnamespace
join information_schema.columns ic on ic.table_schema = n.nspname and ic.table_name = c.relname and
ic.ordinal_position = a.attnum
where c.relkind in ('r', 'v')
and a.attnum > 0
and n.nspname not like 'pg_temp_%'
and n.nspname not in ('pg_toast', 'pg_internal', 'catalog_history', 'pg_catalog', 'information_schema')
order by n.nspname, c.relname, a.attnum
"""

# Row containers whose fields mirror the select-lists above, so raw query rows
# can be unpacked positionally, e.g. MetadataNamedtuple(*row).
MetadataNamedtuple = namedtuple("MetadataNamedtuple", _table_metadata)
ColumnMetadataNamedtuple = namedtuple("ColumnMetadataNamedtuple", _column_metadata)
Loading

0 comments on commit 7feb6c5

Please sign in to comment.