Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: add psycopg3 pool #298

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ on:
jobs:
build:

runs-on: ubuntu-latest
runs-on: ubuntu-22.04
strategy:
max-parallel: 4
matrix:
Expand Down
2 changes: 2 additions & 0 deletions CONTRIBUTORS.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Contributors

* @rudyryk | Alexey Kinev <[email protected]>
* @kalombos | Nikolay Gorshkov <[email protected]>
* @akerlay | Kirill Mineev
* @mrbox | Jakub Paczkowski
* @CyberROFL | Ilnaz Nizametdinov
* @insolite | Oleg
Expand Down
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,18 @@ http://peewee-async-lib.readthedocs.io
Install
-------

Install with `pip` for PostgreSQL:
Install with `pip` for PostgreSQL aiopg backend:

```bash
pip install peewee-async[postgresql]
```

or for PostgreSQL psycopg3 backend:

```bash
pip install peewee-async[psycopg]
```

or for MySQL:

```bash
Expand Down
3 changes: 3 additions & 0 deletions docs/peewee_async/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ Databases

.. automethod:: peewee_async.databases.AioDatabase.aio_atomic

.. autoclass:: peewee_async.PsycopgDatabase
:members: init

.. autoclass:: peewee_async.PooledPostgresqlDatabase
:members: init

Expand Down
3 changes: 3 additions & 0 deletions peewee_async/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
PooledPostgresqlDatabase,
PooledPostgresqlExtDatabase,
PooledMySQLDatabase,
PsycopgDatabase,
)
from .pool import PostgresqlPoolBackend, MysqlPoolBackend
from .transactions import Transaction
Expand All @@ -43,4 +44,6 @@

register_database(PooledPostgresqlDatabase, 'postgres+pool+async', 'postgresql+pool+async')
register_database(PooledPostgresqlExtDatabase, 'postgresext+pool+async', 'postgresqlext+pool+async')
register_database(PsycopgDatabase, 'postgres+psycopg+pool+async', 'postgres+psycopg+pool+async')
register_database(PooledMySQLDatabase, 'mysql+pool+async')

2 changes: 1 addition & 1 deletion peewee_async/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,5 @@ async def __aexit__(
) -> None:
if self.resuing_connection is False:
if self.connection_context is not None:
self.pool_backend.release(self.connection_context.connection)
await self.pool_backend.release(self.connection_context.connection)
connection_context.set(None)
42 changes: 32 additions & 10 deletions peewee_async/databases.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import contextlib
import logging
import warnings
from typing import Type, Optional, Any, AsyncIterator, Iterator, Dict, List

import peewee
from playhouse import postgres_ext as ext

from .connection import connection_context, ConnectionContextManager
from .pool import PoolBackend, PostgresqlPoolBackend, MysqlPoolBackend
from .pool import PoolBackend, PostgresqlPoolBackend, MysqlPoolBackend, PsycopgPoolBackend
from .transactions import Transaction
from .utils import aiopg, aiomysql, __log__, FetchResults
from .utils import aiopg, aiomysql, psycopg, __log__, FetchResults


class AioDatabase(peewee.Database):
Expand Down Expand Up @@ -46,12 +47,17 @@ def init_pool_params_defaults(self) -> None:

def init_pool_params(self) -> None:
self.init_pool_params_defaults()
self.pool_params.update(
{
"minsize": self.connect_params.pop("min_connections", 1),
"maxsize": self.connect_params.pop("max_connections", 20),
}
)
if "min_connections" in self.connect_params or "max_connections" in self.connect_params:
warnings.warn(
"`min_connections` and `max_connections` are deprecated, use `pool_params` instead.",
DeprecationWarning
)
self.pool_params.update(
{
"minsize": self.connect_params.pop("min_connections", 1),
"maxsize": self.connect_params.pop("max_connections", 20),
}
)
pool_params = self.connect_params.pop('pool_params', {})
self.pool_params.update(pool_params)
self.pool_params.update(self.connect_params)
Expand Down Expand Up @@ -178,9 +184,25 @@ async def aio_execute(self, query: Any, fetch_results: Optional[FetchResults] =
return await self.aio_execute_sql(sql, params, fetch_results=fetch_results)


class PsycopgDatabase(AioDatabase, peewee.PostgresqlDatabase):
"""Extension for `peewee.PostgresqlDatabase` providing extra methods
for managing async connection based on psycopg3 pool backend.

See also:
https://peewee.readthedocs.io/en/latest/peewee/api.html#PostgresqlDatabase
"""

pool_backend_cls = PsycopgPoolBackend

def init(self, database: Optional[str], **kwargs: Any) -> None:
if not psycopg:
raise Exception("Error, psycopg is not installed!")
super().init(database, **kwargs)


class PooledPostgresqlDatabase(AioDatabase, peewee.PostgresqlDatabase):
"""Extension for `peewee.PostgresqlDatabase` providing extra methods
for managing async connection.
for managing async connection based on aiopg pool backend.

See also:
https://peewee.readthedocs.io/en/latest/peewee/api.html#PostgresqlDatabase
Expand All @@ -202,7 +224,7 @@ class PooledPostgresqlExtDatabase(
ext.PostgresqlExtDatabase
):
"""PosgtreSQL database extended driver providing **single drop-in sync**
connection and **async connections pool** interface.
connection and **async connections pool** interface based on aiopg pool backend.

JSON fields support is enabled by default, HStore supports is disabled by
default, but can be enabled through pool_params or with ``register_hstore=False`` argument.
Expand Down
97 changes: 82 additions & 15 deletions peewee_async/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@
import asyncio
from typing import Any, Optional, cast

from .utils import aiopg, aiomysql, PoolProtocol, ConnectionProtocol
from .utils import aiopg, aiomysql, ConnectionProtocol, format_dsn, psycopg, psycopg_pool


class PoolBackend(metaclass=abc.ABCMeta):
"""Asynchronous database connection pool.
"""

def __init__(self, *, database: str, **kwargs: Any) -> None:
self.pool: Optional[PoolProtocol] = None
self.pool: Optional[Any] = None
self.database = database
self.connect_params = kwargs
self._connection_lock = asyncio.Lock()
Expand All @@ -21,6 +21,16 @@ def is_connected(self) -> bool:
return self.pool.closed is False
return False

@property
def min_size(self) -> int:
assert self.pool is not None, "Pool is not connected"
return cast(int, self.pool.minsize)

@property
def max_size(self) -> int:
assert self.pool is not None, "Pool is not connected"
return cast(int, self.pool.maxsize)

def has_acquired_connections(self) -> bool:
if self.pool is not None:
return len(self.pool._used) > 0
Expand All @@ -37,9 +47,9 @@ async def acquire(self) -> ConnectionProtocol:
if self.pool is None:
await self.connect()
assert self.pool is not None, "Pool is not connected"
return await self.pool.acquire()
return cast(ConnectionProtocol, await self.pool.acquire())

def release(self, conn: ConnectionProtocol) -> None:
async def release(self, conn: ConnectionProtocol) -> None:
"""Release connection to pool.
"""
assert self.pool is not None, "Pool is not connected"
Expand Down Expand Up @@ -68,14 +78,74 @@ async def create(self) -> None:
"""
if "connect_timeout" in self.connect_params:
self.connect_params['timeout'] = self.connect_params.pop("connect_timeout")
self.pool = cast(
PoolProtocol,
await aiopg.create_pool(
database=self.database,
**self.connect_params
)
self.pool = await aiopg.create_pool(
database=self.database,
**self.connect_params
)


class PsycopgPoolBackend(PoolBackend):
"""Asynchronous database connection pool based on psycopg + psycopg_pool libraries.
"""

async def create(self) -> None:
"""Create connection pool asynchronously.
"""
params = self.connect_params.copy()
pool = psycopg_pool.AsyncConnectionPool(
format_dsn(
'postgresql',
host=params.pop('host'),
port=params.pop('port'),
user=params.pop('user'),
password=params.pop('password'),
path=self.database,
),
kwargs={
'cursor_factory': psycopg.AsyncClientCursor,
'autocommit': True,
},
**params,
)
kalombos marked this conversation as resolved.
Show resolved Hide resolved

await pool.open()
self.pool = pool

def has_acquired_connections(self) -> bool:
kalombos marked this conversation as resolved.
Show resolved Hide resolved
if self.pool is not None:
return bool(self.pool._nconns - len(self.pool._pool) > 0)
return False

async def acquire(self) -> ConnectionProtocol:
"""Acquire connection from pool.
"""
if self.pool is None:
await self.connect()
assert self.pool is not None, "Pool is not connected"
return cast(ConnectionProtocol, await self.pool.getconn())

async def release(self, conn: ConnectionProtocol) -> None:
"""Release connection to pool.
"""
assert self.pool is not None, "Pool is not connected"
await self.pool.putconn(conn)

async def terminate(self) -> None:
"""Terminate all pool connections.
"""
if self.pool is not None:
await self.pool.close()

@property
def min_size(self) -> int:
assert self.pool is not None, "Pool is not connected"
return cast(int, self.pool.min_size)

@property
def max_size(self) -> int:
assert self.pool is not None, "Pool is not connected"
return cast(int, self.pool.max_size)


class MysqlPoolBackend(PoolBackend):
"""Asynchronous database connection pool.
Expand All @@ -84,9 +154,6 @@ class MysqlPoolBackend(PoolBackend):
async def create(self) -> None:
"""Create connection pool asynchronously.
"""
self.pool = cast(
PoolProtocol,
await aiomysql.create_pool(
db=self.database, **self.connect_params
),
self.pool = await aiomysql.create_pool(
db=self.database, **self.connect_params
)
32 changes: 11 additions & 21 deletions peewee_async/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from typing import Any, Protocol, Optional, Sequence, Set, AsyncContextManager, List, Callable, Awaitable
from typing import Any, Protocol, Optional, Sequence, Set, AsyncContextManager, List, Callable, Awaitable, Union

try:
import aiopg
Expand All @@ -8,6 +8,13 @@
aiopg = None # type: ignore
psycopg2 = None

try:
import psycopg
import psycopg_pool
except ImportError:
psycopg = None # type: ignore
psycopg_pool = None # type: ignore

try:
import aiomysql
import pymysql
Expand Down Expand Up @@ -50,25 +57,8 @@ def cursor(
...


class PoolProtocol(Protocol):

_used: Set[ConnectionProtocol]

@property
def closed(self) -> bool:
...

async def acquire(self) -> ConnectionProtocol:
...

def release(self, conn: ConnectionProtocol) -> None:
...

def terminate(self) -> None:
...

async def wait_closed(self) -> None:
...
FetchResults = Callable[[CursorProtocol], Awaitable[Any]]


FetchResults = Callable[[CursorProtocol], Awaitable[Any]]
def format_dsn(protocol: str, host: str, port: Union[str, int], user: str, password: str, path: str = '') -> str:
return f'{protocol}://{user}:{password}@{host}:{port}/{path}'
5 changes: 4 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@ sphinx = { version = "^7.1.2", optional = true }
sphinx-rtd-theme = { version = "^1.3.0rc1", optional = true }
mypy = { version = "^1.10.1", optional = true }
types-PyMySQL = { version = "^1.1.0.20240524", optional = true }
psycopg = { version = "^3.2.0", optional = true }
psycopg-pool = { version = "^3.2.0", optional = true }

[tool.poetry.extras]
postgresql = ["aiopg"]
mysql = ["aiomysql", "cryptography"]
develop = ["aiopg", "aiomysql", "cryptography", "pytest", "pytest-asyncio", "pytest-mock", "mypy", "types-PyMySQL"]
develop = ["aiopg", "aiomysql", "cryptography", "pytest", "pytest-asyncio", "pytest-mock", "mypy", "types-PyMySQL", "psycopg", "psycopg-pool"]
docs = ["aiopg", "aiomysql", "cryptography", "sphinx", "sphinx-rtd-theme"]
psycopg = ["psycopg", "psycopg-pool"]

[build-system]
requires = ["poetry-core"]
Expand Down
7 changes: 5 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from peewee import sort_models

from peewee_async.databases import AioDatabase
from peewee_async.utils import aiopg, aiomysql
from peewee_async.utils import aiopg, aiomysql, psycopg
from tests.db_config import DB_CLASSES, DB_DEFAULTS
from tests.models import ALL_MODELS

Expand Down Expand Up @@ -38,6 +38,8 @@ async def db(request: pytest.FixtureRequest) -> AsyncGenerator[AioDatabase, None
pytest.skip("aiopg is not installed")
if db.startswith('mysql') and aiomysql is None:
pytest.skip("aiomysql is not installed")
if db.startswith('psycopg') and psycopg is None:
pytest.skip("psycopg is not installed")

params = DB_DEFAULTS[db]
database = DB_CLASSES[db](**params)
Expand All @@ -59,7 +61,8 @@ async def db(request: pytest.FixtureRequest) -> AsyncGenerator[AioDatabase, None

PG_DBS = [
"postgres-pool",
"postgres-pool-ext"
"postgres-pool-ext",
"psycopg-pool",
]

MYSQL_DBS = ["mysql-pool"]
Expand Down
Loading
Loading