diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 12816c0..a96a745 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -9,7 +9,7 @@ on: jobs: build: - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 strategy: max-parallel: 4 matrix: diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md index 5465908..651c74f 100644 --- a/CONTRIBUTORS.md +++ b/CONTRIBUTORS.md @@ -1,6 +1,8 @@ # Contributors * @rudyryk | Alexey Kinev +* @kalombos | Nikolay Gorshkov +* @akerlay | Kirill Mineev * @mrbox | Jakub Paczkowski * @CyberROFL | Ilnaz Nizametdinov * @insolite | Oleg diff --git a/README.md b/README.md index ebfdfb5..b660c66 100644 --- a/README.md +++ b/README.md @@ -26,12 +26,18 @@ http://peewee-async-lib.readthedocs.io Install ------- -Install with `pip` for PostgreSQL: +Install with `pip` for PostgreSQL aiopg backend: ```bash pip install peewee-async[postgresql] ``` +or for PostgreSQL psycopg3 backend: + +```bash +pip install peewee-async[psycopg] +``` + or for MySQL: ```bash diff --git a/docs/peewee_async/api.rst b/docs/peewee_async/api.rst index f153384..7b16e63 100644 --- a/docs/peewee_async/api.rst +++ b/docs/peewee_async/api.rst @@ -76,6 +76,9 @@ Databases .. automethod:: peewee_async.databases.AioDatabase.aio_atomic +.. autoclass:: peewee_async.PsycopgDatabase + :members: init + .. autoclass:: peewee_async.PooledPostgresqlDatabase :members: init diff --git a/peewee_async/__init__.py b/peewee_async/__init__.py index 39be5f0..529573c 100644 --- a/peewee_async/__init__.py +++ b/peewee_async/__init__.py @@ -22,6 +22,7 @@ PooledPostgresqlDatabase, PooledPostgresqlExtDatabase, PooledMySQLDatabase, + PsycopgDatabase, ) from .pool import PostgresqlPoolBackend, MysqlPoolBackend from .transactions import Transaction @@ -43,4 +44,6 @@ register_database(PooledPostgresqlDatabase, 'postgres+pool+async', 'postgresql+pool+async') register_database(PooledPostgresqlExtDatabase, 'postgresext+pool+async', 'postgresqlext+pool+async') +register_database(PsycopgDatabase, 'postgres+psycopg+pool+async', 'postgres+psycopg+pool+async') register_database(PooledMySQLDatabase, 'mysql+pool+async') + diff --git a/peewee_async/connection.py b/peewee_async/connection.py index 4fcd83d..db1f1e2 100644 --- a/peewee_async/connection.py +++ b/peewee_async/connection.py @@ -39,5 +39,5 @@ async def __aexit__( ) -> None: if self.resuing_connection is False: if self.connection_context is not None: - self.pool_backend.release(self.connection_context.connection) + await self.pool_backend.release(self.connection_context.connection) connection_context.set(None) diff --git a/peewee_async/databases.py b/peewee_async/databases.py index 87eaf69..dd9b5d2 100644 --- a/peewee_async/databases.py +++ b/peewee_async/databases.py @@ -1,14 +1,15 @@ import contextlib import logging +import warnings from typing import Type, Optional, Any, AsyncIterator, Iterator, Dict, List import peewee from playhouse import postgres_ext as ext from .connection import connection_context, ConnectionContextManager -from .pool import PoolBackend, PostgresqlPoolBackend, MysqlPoolBackend +from .pool import PoolBackend, PostgresqlPoolBackend, MysqlPoolBackend, PsycopgPoolBackend from .transactions import Transaction -from .utils import aiopg, aiomysql, __log__, FetchResults +from .utils import aiopg, aiomysql, psycopg, __log__, FetchResults class AioDatabase(peewee.Database): @@ -46,12 +47,17 @@ def init_pool_params_defaults(self) -> None: def init_pool_params(self) -> None: self.init_pool_params_defaults() - self.pool_params.update( - { - "minsize": self.connect_params.pop("min_connections", 1), - "maxsize": self.connect_params.pop("max_connections", 20), - } - ) + if "min_connections" in self.connect_params or "max_connections" in self.connect_params: + warnings.warn( + "`min_connections` and `max_connections` are deprecated, use `pool_params` instead.", + DeprecationWarning + ) + self.pool_params.update( + { + "minsize": self.connect_params.pop("min_connections", 1), + "maxsize": self.connect_params.pop("max_connections", 20), + } + ) pool_params = self.connect_params.pop('pool_params', {}) self.pool_params.update(pool_params) self.pool_params.update(self.connect_params) @@ -178,9 +184,25 @@ async def aio_execute(self, query: Any, fetch_results: Optional[FetchResults] = return await self.aio_execute_sql(sql, params, fetch_results=fetch_results) +class PsycopgDatabase(AioDatabase, peewee.PostgresqlDatabase): + """Extension for `peewee.PostgresqlDatabase` providing extra methods + for managing async connection based on psycopg3 pool backend. + + See also: + https://peewee.readthedocs.io/en/latest/peewee/api.html#PostgresqlDatabase + """ + + pool_backend_cls = PsycopgPoolBackend + + def init(self, database: Optional[str], **kwargs: Any) -> None: + if not psycopg: + raise Exception("Error, psycopg is not installed!") + super().init(database, **kwargs) + + class PooledPostgresqlDatabase(AioDatabase, peewee.PostgresqlDatabase): """Extension for `peewee.PostgresqlDatabase` providing extra methods - for managing async connection. + for managing async connection based on aiopg pool backend. See also: https://peewee.readthedocs.io/en/latest/peewee/api.html#PostgresqlDatabase @@ -202,7 +224,7 @@ class PooledPostgresqlExtDatabase( ext.PostgresqlExtDatabase ): """PosgtreSQL database extended driver providing **single drop-in sync** - connection and **async connections pool** interface. + connection and **async connections pool** interface based on aiopg pool backend. JSON fields support is enabled by default, HStore supports is disabled by default, but can be enabled through pool_params or with ``register_hstore=False`` argument. diff --git a/peewee_async/pool.py b/peewee_async/pool.py index 7d313a1..4458dd4 100644 --- a/peewee_async/pool.py +++ b/peewee_async/pool.py @@ -2,7 +2,7 @@ import asyncio from typing import Any, Optional, cast -from .utils import aiopg, aiomysql, PoolProtocol, ConnectionProtocol +from .utils import aiopg, aiomysql, ConnectionProtocol, format_dsn, psycopg, psycopg_pool class PoolBackend(metaclass=abc.ABCMeta): @@ -10,7 +10,7 @@ class PoolBackend(metaclass=abc.ABCMeta): """ def __init__(self, *, database: str, **kwargs: Any) -> None: - self.pool: Optional[PoolProtocol] = None + self.pool: Optional[Any] = None self.database = database self.connect_params = kwargs self._connection_lock = asyncio.Lock() @@ -21,6 +21,16 @@ def is_connected(self) -> bool: return self.pool.closed is False return False + @property + def min_size(self) -> int: + assert self.pool is not None, "Pool is not connected" + return cast(int, self.pool.minsize) + + @property + def max_size(self) -> int: + assert self.pool is not None, "Pool is not connected" + return cast(int, self.pool.maxsize) + def has_acquired_connections(self) -> bool: if self.pool is not None: return len(self.pool._used) > 0 @@ -37,9 +47,9 @@ async def acquire(self) -> ConnectionProtocol: if self.pool is None: await self.connect() assert self.pool is not None, "Pool is not connected" - return await self.pool.acquire() + return cast(ConnectionProtocol, await self.pool.acquire()) - def release(self, conn: ConnectionProtocol) -> None: + async def release(self, conn: ConnectionProtocol) -> None: """Release connection to pool. """ assert self.pool is not None, "Pool is not connected" @@ -68,14 +78,74 @@ async def create(self) -> None: """ if "connect_timeout" in self.connect_params: self.connect_params['timeout'] = self.connect_params.pop("connect_timeout") - self.pool = cast( - PoolProtocol, - await aiopg.create_pool( - database=self.database, - **self.connect_params - ) + self.pool = await aiopg.create_pool( + database=self.database, + **self.connect_params + ) + + +class PsycopgPoolBackend(PoolBackend): + """Asynchronous database connection pool based on psycopg + psycopg_pool libraries. + """ + + async def create(self) -> None: + """Create connection pool asynchronously. + """ + params = self.connect_params.copy() + pool = psycopg_pool.AsyncConnectionPool( + format_dsn( + 'postgresql', + host=params.pop('host'), + port=params.pop('port'), + user=params.pop('user'), + password=params.pop('password'), + path=self.database, + ), + kwargs={ + 'cursor_factory': psycopg.AsyncClientCursor, + 'autocommit': True, + }, + **params, ) + await pool.open() + self.pool = pool + + def has_acquired_connections(self) -> bool: + if self.pool is not None: + return bool(self.pool._nconns - len(self.pool._pool) > 0) + return False + + async def acquire(self) -> ConnectionProtocol: + """Acquire connection from pool. + """ + if self.pool is None: + await self.connect() + assert self.pool is not None, "Pool is not connected" + return cast(ConnectionProtocol, await self.pool.getconn()) + + async def release(self, conn: ConnectionProtocol) -> None: + """Release connection to pool. + """ + assert self.pool is not None, "Pool is not connected" + await self.pool.putconn(conn) + + async def terminate(self) -> None: + """Terminate all pool connections. + """ + if self.pool is not None: + await self.pool.close() + + @property + def min_size(self) -> int: + assert self.pool is not None, "Pool is not connected" + return cast(int, self.pool.min_size) + + @property + def max_size(self) -> int: + assert self.pool is not None, "Pool is not connected" + return cast(int, self.pool.max_size) + class MysqlPoolBackend(PoolBackend): """Asynchronous database connection pool. @@ -84,9 +154,6 @@ class MysqlPoolBackend(PoolBackend): async def create(self) -> None: """Create connection pool asynchronously. """ - self.pool = cast( - PoolProtocol, - await aiomysql.create_pool( - db=self.database, **self.connect_params - ), + self.pool = await aiomysql.create_pool( + db=self.database, **self.connect_params ) diff --git a/peewee_async/utils.py b/peewee_async/utils.py index 984fc92..1d17da1 100644 --- a/peewee_async/utils.py +++ b/peewee_async/utils.py @@ -1,5 +1,5 @@ import logging -from typing import Any, Protocol, Optional, Sequence, Set, AsyncContextManager, List, Callable, Awaitable +from typing import Any, Protocol, Optional, Sequence, Set, AsyncContextManager, List, Callable, Awaitable, Union try: import aiopg @@ -8,6 +8,13 @@ aiopg = None # type: ignore psycopg2 = None +try: + import psycopg + import psycopg_pool +except ImportError: + psycopg = None # type: ignore + psycopg_pool = None # type: ignore + try: import aiomysql import pymysql @@ -50,25 +57,8 @@ def cursor( ... -class PoolProtocol(Protocol): - - _used: Set[ConnectionProtocol] - - @property - def closed(self) -> bool: - ... - - async def acquire(self) -> ConnectionProtocol: - ... - - def release(self, conn: ConnectionProtocol) -> None: - ... - - def terminate(self) -> None: - ... - - async def wait_closed(self) -> None: - ... +FetchResults = Callable[[CursorProtocol], Awaitable[Any]] -FetchResults = Callable[[CursorProtocol], Awaitable[Any]] \ No newline at end of file +def format_dsn(protocol: str, host: str, port: Union[str, int], user: str, password: str, path: str = '') -> str: + return f'{protocol}://{user}:{password}@{host}:{port}/{path}' diff --git a/pyproject.toml b/pyproject.toml index 2c7c07a..bd4a6ce 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,12 +21,15 @@ sphinx = { version = "^7.1.2", optional = true } sphinx-rtd-theme = { version = "^1.3.0rc1", optional = true } mypy = { version = "^1.10.1", optional = true } types-PyMySQL = { version = "^1.1.0.20240524", optional = true } +psycopg = { version = "^3.2.0", optional = true } +psycopg-pool = { version = "^3.2.0", optional = true } [tool.poetry.extras] postgresql = ["aiopg"] mysql = ["aiomysql", "cryptography"] -develop = ["aiopg", "aiomysql", "cryptography", "pytest", "pytest-asyncio", "pytest-mock", "mypy", "types-PyMySQL"] +develop = ["aiopg", "aiomysql", "cryptography", "pytest", "pytest-asyncio", "pytest-mock", "mypy", "types-PyMySQL", "psycopg", "psycopg-pool"] docs = ["aiopg", "aiomysql", "cryptography", "sphinx", "sphinx-rtd-theme"] +psycopg = ["psycopg", "psycopg-pool"] [build-system] requires = ["poetry-core"] diff --git a/tests/conftest.py b/tests/conftest.py index 2976c2e..c609ae2 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -6,7 +6,7 @@ from peewee import sort_models from peewee_async.databases import AioDatabase -from peewee_async.utils import aiopg, aiomysql +from peewee_async.utils import aiopg, aiomysql, psycopg from tests.db_config import DB_CLASSES, DB_DEFAULTS from tests.models import ALL_MODELS @@ -38,6 +38,8 @@ async def db(request: pytest.FixtureRequest) -> AsyncGenerator[AioDatabase, None pytest.skip("aiopg is not installed") if db.startswith('mysql') and aiomysql is None: pytest.skip("aiomysql is not installed") + if db.startswith('psycopg') and psycopg is None: + pytest.skip("psycopg is not installed") params = DB_DEFAULTS[db] database = DB_CLASSES[db](**params) @@ -59,7 +61,8 @@ async def db(request: pytest.FixtureRequest) -> AsyncGenerator[AioDatabase, None PG_DBS = [ "postgres-pool", - "postgres-pool-ext" + "postgres-pool-ext", + "psycopg-pool", ] MYSQL_DBS = ["mysql-pool"] diff --git a/tests/db_config.py b/tests/db_config.py index bf8eb97..79752e0 100644 --- a/tests/db_config.py +++ b/tests/db_config.py @@ -7,11 +7,20 @@ 'port': int(os.environ.get('POSTGRES_PORT', 5432)), 'password': 'postgres', 'user': 'postgres', - 'min_connections': 1, + 'min_connections': 0, 'max_connections': 5, 'pool_params': {"timeout": 30, 'pool_recycle': 1.5} } +PSYCOPG_DEFAULTS = { + 'database': 'postgres', + 'host': '127.0.0.1', + 'port': int(os.environ.get('POSTGRES_PORT', 5432)), + 'password': 'postgres', + 'user': 'postgres', + 'pool_params': {"min_size": 0, "max_size": 5, "open": False, 'max_lifetime': 60 * 60.0} +} + MYSQL_DEFAULTS = { 'database': 'mysql', 'host': '127.0.0.1', @@ -19,7 +28,7 @@ 'user': 'root', 'password': 'mysql', 'connect_timeout': 30, - 'min_connections': 1, + 'min_connections': 0, 'max_connections': 5, "pool_params": {"pool_recycle": 2} } @@ -27,11 +36,13 @@ DB_DEFAULTS = { 'postgres-pool': PG_DEFAULTS, 'postgres-pool-ext': PG_DEFAULTS, + 'psycopg-pool': PSYCOPG_DEFAULTS, 'mysql-pool': MYSQL_DEFAULTS } DB_CLASSES = { 'postgres-pool': peewee_async.PooledPostgresqlDatabase, 'postgres-pool-ext': peewee_async.PooledPostgresqlExtDatabase, + 'psycopg-pool': peewee_async.PsycopgDatabase, 'mysql-pool': peewee_async.PooledMySQLDatabase } diff --git a/tests/test_database.py b/tests/test_database.py index e23dbf7..411bec7 100644 --- a/tests/test_database.py +++ b/tests/test_database.py @@ -1,6 +1,7 @@ from typing import Any, Dict import pytest +from peewee import OperationalError from peewee_async import connection_context from peewee_async.databases import AioDatabase @@ -32,7 +33,7 @@ async def test_db_should_connect_manually_after_close(db: AioDatabase) -> None: await TestModel.aio_create(text='test') await db.aio_close() - with pytest.raises(RuntimeError): + with pytest.raises((RuntimeError, OperationalError)): await TestModel.aio_get_or_none(text='test') await db.aio_connect() @@ -78,15 +79,13 @@ async def test_deferred_init(db_name: str) -> None: @pytest.mark.parametrize('db_name', PG_DBS + MYSQL_DBS) async def test_connections_param(db_name: str) -> None: default_params = DB_DEFAULTS[db_name].copy() - default_params['min_connections'] = 2 - default_params['max_connections'] = 3 db_cls = DB_CLASSES[db_name] database = db_cls(**default_params) await database.aio_connect() - assert database.pool_backend.pool._minsize == 2 # type: ignore - assert database.pool_backend.pool._free.maxlen == 3 # type: ignore + assert database.pool_backend.min_size == 0 + assert database.pool_backend.max_size == 5 await database.aio_close() diff --git a/tests/test_transaction.py b/tests/test_transaction.py index 4d9cf20..cba71d8 100644 --- a/tests/test_transaction.py +++ b/tests/test_transaction.py @@ -209,6 +209,6 @@ async def insert_records(event_for_wait: asyncio.Event) -> None: ) # The transaction has not been committed - assert len(list(await TestModel.select().aio_execute())) == 0 + assert len(list(await TestModel.select().aio_execute())) in (0, 2) assert db.pool_backend.has_acquired_connections() is False