diff --git a/zyte_api/aio/client.py b/zyte_api/aio/client.py index e82c832..1463c59 100644 --- a/zyte_api/aio/client.py +++ b/zyte_api/aio/client.py @@ -4,8 +4,10 @@ import asyncio import time +from base64 import b64decode +from collections.abc import Mapping from functools import partial -from typing import Optional, Iterator, List +from typing import Awaitable, Iterator, List, Optional, Union import aiohttp from aiohttp import TCPConnector @@ -16,9 +18,15 @@ from ..apikey import get_apikey from ..constants import API_URL, API_TIMEOUT from ..stats import AggStats, ResponseStats -from ..utils import user_agent +from ..utils import _to_lower_camel_case, user_agent +class _NotLoaded: + pass + + +_NOT_LOADED = _NotLoaded() + # 120 seconds is probably too long, but we are concerned about the case with # many concurrent requests and some processing logic running in the same reactor, # thus, saturating the CPU. This will make timeouts more likely. @@ -43,6 +51,38 @@ def _post_func(session): return session.post +class ExtractResult(Mapping): + """Result of a call to AsyncClient.extract. + + It can be used as a dictionary to access the raw API response. + + It also provides some helper properties for easier access to some of its + underlying data. + """ + + def __init__(self, api_response: dict): + self._api_response = api_response + self._http_response_body: Union[bytes|_NotLoaded] = _NOT_LOADED + + def __getitem__(self, key): + return self._api_response[key] + + def __iter__(self): + yield from self._api_response + + def __len__(self): + return len(self._api_response) + + @property + def http_response_body(self) -> Union[bytes|_NotLoaded]: + if self._http_response_body is _NOT_LOADED: + base64_body = self._api_response.get("httpResponseBody", None) + if base64_body is None: + raise ValueError("API response has no httpResponseBody key.") + self._http_response_body = b64decode(base64_body) + return self._http_response_body + + class AsyncClient: def __init__(self, *, api_key=None, @@ -148,3 +188,44 @@ async def _request(query): session=session) return asyncio.as_completed([_request(query) for query in queries]) + + @staticmethod + def _build_extract_query(raw_query): + return { + _to_lower_camel_case(k): v + for k, v in raw_query.items() + } + + async def extract( + self, + url: str, + *, + session: Optional[aiohttp.ClientSession] = None, + handle_retries: bool = True, + retrying: Optional[AsyncRetrying] = None, + **kwargs, + ) -> ExtractResult: + """…""" + query = self._build_extract_query({**kwargs, 'url': url}) + response = await self.request_raw( + query=query, + endpoint='extract', + session=session, + handle_retries=handle_retries, + retrying=retrying, + ) + return ExtractResult(response) + + def extract_in_parallel( + self, + queries: List[dict], + *, + session: Optional[aiohttp.ClientSession] = None, + ) -> Iterator[asyncio.Future]: + """…""" + queries = [self._build_extract_query(query) for query in queries] + return self.request_parallel_as_completed( + queries, + endpoint='extract', + session=session, + ) diff --git a/zyte_api/utils.py b/zyte_api/utils.py index 2be98de..eae095c 100644 --- a/zyte_api/utils.py +++ b/zyte_api/utils.py @@ -18,6 +18,13 @@ def _guess_intype(file_name, lines): return "txt" +def _to_lower_camel_case(snake_case_string): + """Convert from snake case (foo_bar) to lower-case-initial camel case + (fooBar).""" + prefix, *rest = snake_case_string.split('_') + return prefix + ''.join(part.title() for part in rest) + + def user_agent(library): return 'python-zyte-api/{} {}/{}'.format( __version__,