diff --git a/.github/workflows/python-app.yml b/.github/workflows/python-app.yml index 56f14ff87..77814efc2 100644 --- a/.github/workflows/python-app.yml +++ b/.github/workflows/python-app.yml @@ -22,7 +22,7 @@ jobs: strategy: matrix: - python-version: [ 3.8, 3.9, "3.10", "3.11", "3.12" ] + python-version: [ 3.9, "3.10", "3.11", "3.12" ] os: [ ubuntu-latest, macos-latest, windows-latest ] steps: diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 8637a4775..69df12af1 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -5,7 +5,7 @@ repos: hooks: - id: isort args: [ '-m', 'HANGING_INDENT', '-l', '120','--check-only' ] - files: \.py$ + files: . - repo: https://github.com/pycqa/flake8 rev: "7.0.0" diff --git a/misc/plist_sniffer.py b/misc/plist_sniffer.py index 9573f9f39..9d855dd8c 100755 --- a/misc/plist_sniffer.py +++ b/misc/plist_sniffer.py @@ -1,7 +1,7 @@ import plistlib import pprint import xml -from typing import IO, Mapping, Optional, TextIO +from typing import IO, Optional, TextIO import click from scapy.packet import Packet, Raw @@ -31,7 +31,7 @@ def process_packet(self, packet: Packet) -> None: except xml.parsers.expat.ExpatError: pass - def report(self, plist: Mapping) -> None: + def report(self, plist: dict) -> None: try: print(plist) if self.file is not None: diff --git a/misc/remotexpc_sniffer.py b/misc/remotexpc_sniffer.py index be6a78144..d222bd2bc 100644 --- a/misc/remotexpc_sniffer.py +++ b/misc/remotexpc_sniffer.py @@ -1,6 +1,6 @@ import logging from pprint import pformat -from typing import List, MutableMapping, Optional +from typing import Optional import click import coloredlogs @@ -71,7 +71,7 @@ def add(self, tcp_pkt: TCP) -> bool: class H2Stream(TCPStream): - def pop_frames(self) -> List[Frame]: + def pop_frames(self) -> list[Frame]: """ Pop all available H2Frames """ # If self.data starts with the http/2 magic bytes, pop them off @@ -96,8 +96,8 @@ def pop_frames(self) -> List[Frame]: class RemoteXPCSniffer: def __init__(self): - self._h2_streams: MutableMapping[str, H2Stream] = {} - self._previous_frame_data: MutableMapping[str, bytes] = {} + self._h2_streams: dict[str, H2Stream] = {} + self._previous_frame_data: dict[str, bytes] = {} def process_packet(self, packet: Packet) -> None: if packet.haslayer(TCP) and packet[TCP].payload: diff --git a/pymobiledevice3/bonjour.py b/pymobiledevice3/bonjour.py index fdd24de0d..3a3229e70 100644 --- a/pymobiledevice3/bonjour.py +++ b/pymobiledevice3/bonjour.py @@ -2,7 +2,7 @@ import dataclasses import sys from socket import AF_INET, AF_INET6, inet_ntop -from typing import List, Mapping, Optional +from typing import Optional from ifaddr import get_adapters from zeroconf import IPVersion, ServiceListener, ServiceStateChange, Zeroconf @@ -22,8 +22,8 @@ @dataclasses.dataclass class BonjourAnswer: name: str - properties: Mapping[bytes, bytes] - ips: List[str] + properties: dict[bytes, bytes] + ips: list[str] port: int @@ -31,10 +31,10 @@ class BonjourListener(ServiceListener): def __init__(self, ip: str): super().__init__() self.name: Optional[str] = None - self.properties: Mapping[bytes, bytes] = {} + self.properties: dict[bytes, bytes] = {} self.ip = ip self.port: Optional[int] = None - self.addresses: List[str] = [] + self.addresses: list[str] = [] self.queue: asyncio.Queue = asyncio.Queue() self.querying_task: Optional[asyncio.Task] = asyncio.create_task(self.query_addresses()) @@ -72,7 +72,7 @@ class BonjourQuery: listener: BonjourListener -def query_bonjour(service_names: List[str], ip: str) -> BonjourQuery: +def query_bonjour(service_names: list[str], ip: str) -> BonjourQuery: aiozc = AsyncZeroconf(interfaces=[ip]) listener = BonjourListener(ip) service_browser = AsyncServiceBrowser(aiozc.zeroconf, service_names, @@ -80,8 +80,8 @@ def query_bonjour(service_names: List[str], ip: str) -> BonjourQuery: return BonjourQuery(aiozc, service_browser, listener) -async def browse(service_names: List[str], ips: List[str], timeout: float = DEFAULT_BONJOUR_TIMEOUT) \ - -> List[BonjourAnswer]: +async def browse(service_names: list[str], ips: list[str], timeout: float = DEFAULT_BONJOUR_TIMEOUT) \ + -> list[BonjourAnswer]: try: bonjour_queries = [query_bonjour(service_names, adapter) for adapter in ips] except ValueError as e: @@ -104,11 +104,11 @@ async def browse(service_names: List[str], ips: List[str], timeout: float = DEFA return answers -async def browse_ipv6(service_names: List[str], timeout: float = DEFAULT_BONJOUR_TIMEOUT) -> List[BonjourAnswer]: +async def browse_ipv6(service_names: list[str], timeout: float = DEFAULT_BONJOUR_TIMEOUT) -> list[BonjourAnswer]: return await browse(service_names, OSUTILS.get_ipv6_ips(), timeout=timeout) -def get_ipv4_addresses() -> List[str]: +def get_ipv4_addresses() -> list[str]: ips = [] for adapter in get_adapters(): if adapter.nice_name.startswith('tun'): @@ -123,21 +123,21 @@ def get_ipv4_addresses() -> List[str]: return ips -async def browse_ipv4(service_names: List[str], timeout: float = DEFAULT_BONJOUR_TIMEOUT) -> List[BonjourAnswer]: +async def browse_ipv4(service_names: list[str], timeout: float = DEFAULT_BONJOUR_TIMEOUT) -> list[BonjourAnswer]: return await browse(service_names, get_ipv4_addresses(), timeout=timeout) -async def browse_remoted(timeout: float = DEFAULT_BONJOUR_TIMEOUT) -> List[BonjourAnswer]: +async def browse_remoted(timeout: float = DEFAULT_BONJOUR_TIMEOUT) -> list[BonjourAnswer]: return await browse_ipv6(REMOTED_SERVICE_NAMES, timeout=timeout) -async def browse_mobdev2(timeout: float = DEFAULT_BONJOUR_TIMEOUT) -> List[BonjourAnswer]: +async def browse_mobdev2(timeout: float = DEFAULT_BONJOUR_TIMEOUT) -> list[BonjourAnswer]: return await browse(MOBDEV2_SERVICE_NAMES, get_ipv4_addresses() + OSUTILS.get_ipv6_ips(), timeout=timeout) -async def browse_remotepairing(timeout: float = DEFAULT_BONJOUR_TIMEOUT) -> List[BonjourAnswer]: +async def browse_remotepairing(timeout: float = DEFAULT_BONJOUR_TIMEOUT) -> list[BonjourAnswer]: return await browse_ipv4(REMOTEPAIRING_SERVICE_NAMES, timeout=timeout) -async def browse_remotepairing_manual_pairing(timeout: float = DEFAULT_BONJOUR_TIMEOUT) -> List[BonjourAnswer]: +async def browse_remotepairing_manual_pairing(timeout: float = DEFAULT_BONJOUR_TIMEOUT) -> list[BonjourAnswer]: return await browse_ipv4(REMOTEPAIRING_MANUAL_PAIRING_SERVICE_NAMES, timeout=timeout) diff --git a/pymobiledevice3/cli/apps.py b/pymobiledevice3/cli/apps.py index f8535c4d7..8b25d1a61 100644 --- a/pymobiledevice3/cli/apps.py +++ b/pymobiledevice3/cli/apps.py @@ -1,5 +1,3 @@ -from typing import List - import click from pymobiledevice3.cli.cli_common import Command, print_json @@ -33,7 +31,7 @@ def apps_list(service_provider: LockdownServiceProvider, app_type: str, calculat @apps.command('query', cls=Command) @click.argument('bundle_identifiers', nargs=-1) @click.option('--calculate-sizes/--no-calculate-size', default=False) -def apps_query(service_provider: LockdownServiceProvider, bundle_identifiers: List[str], calculate_sizes: bool) -> None: +def apps_query(service_provider: LockdownServiceProvider, bundle_identifiers: list[str], calculate_sizes: bool) -> None: """ query installed apps """ print_json(InstallationProxyService(lockdown=service_provider) .get_apps(calculate_sizes=calculate_sizes, bundle_identifiers=bundle_identifiers)) diff --git a/pymobiledevice3/cli/cli_common.py b/pymobiledevice3/cli/cli_common.py index f3566b46f..b88c737d6 100644 --- a/pymobiledevice3/cli/cli_common.py +++ b/pymobiledevice3/cli/cli_common.py @@ -6,7 +6,7 @@ import sys import uuid from functools import wraps -from typing import Any, Callable, List, Mapping, Optional, Tuple +from typing import Any, Callable, Optional import click import coloredlogs @@ -126,7 +126,7 @@ def wrapper(*args, **kwargs): return wrapper -def prompt_selection(choices: List[Any], message: str, idx: bool = False) -> Any: +def prompt_selection(choices: list[Any], message: str, idx: bool = False) -> Any: question = [inquirer3.List('selection', message=message, choices=choices, carousel=True)] try: result = inquirer3.prompt(question, theme=GreenPassion(), raise_keyboard_interrupt=True) @@ -135,12 +135,12 @@ def prompt_selection(choices: List[Any], message: str, idx: bool = False) -> Any return result['selection'] if not idx else choices.index(result['selection']) -def prompt_device_list(device_list: List): +def prompt_device_list(device_list: list): return prompt_selection(device_list, 'Choose device') def choose_service_provider(callback: Callable): - def wrap_callback_calling(**kwargs: Mapping): + def wrap_callback_calling(**kwargs: dict) -> None: service_provider = None lockdown_service_provider = kwargs.pop('lockdown_service_provider', None) rsd_service_provider_manually = kwargs.pop('rsd_service_provider_manually', None) @@ -229,7 +229,7 @@ def __init__(self, *args, **kwargs): f'This option may also be transferred as an environment variable: {TUNNEL_ENV_VAR}') ] - def rsd(self, ctx, param: str, value: Optional[Tuple[str, int]]) -> Optional[RemoteServiceDiscoveryService]: + def rsd(self, ctx, param: str, value: Optional[tuple[str, int]]) -> Optional[RemoteServiceDiscoveryService]: if value is not None: rsd = RemoteServiceDiscoveryService(value) asyncio.run(rsd.connect(), debug=True) diff --git a/pymobiledevice3/cli/developer.py b/pymobiledevice3/cli/developer.py index 1cbb87458..85ccad2bc 100644 --- a/pymobiledevice3/cli/developer.py +++ b/pymobiledevice3/cli/developer.py @@ -11,7 +11,7 @@ from dataclasses import asdict from datetime import datetime from pathlib import Path -from typing import IO, List, Optional, Tuple +from typing import IO, Optional import click from click.exceptions import MissingParameter, UsageError @@ -165,7 +165,7 @@ def process_id_for_bundle_id(service_provider: LockdownServiceProvider, app_bund def get_matching_processes(service_provider: LockdownServiceProvider, name: Optional[str] = None, bundle_identifier: Optional[str] = None) \ - -> List[MatchedProcessByPid]: + -> list[MatchedProcessByPid]: result = [] with DvtSecureSocketProxyService(lockdown=service_provider) as dvt: device_info = DeviceInfo(dvt) @@ -331,7 +331,7 @@ def sysmon_process_monitor(service_provider: LockdownClient, threshold): @sysmon_process.command('single', cls=Command) @click.option('-a', '--attributes', multiple=True, help='filter processes by given attribute value given as key=value') -def sysmon_process_single(service_provider: LockdownClient, attributes: List[str]): +def sysmon_process_single(service_provider: LockdownClient, attributes: list[str]): """ show a single snapshot of currently running processes. """ count = 0 @@ -403,7 +403,7 @@ def core_profile_session(): help='Events subclass filter. Omit for all.') -def parse_filters(subclasses: List[int], classes: List[int]): +def parse_filters(subclasses: list[int], classes: list[int]): if not subclasses and not classes: return None parsed = set() @@ -1096,8 +1096,8 @@ def core_device_read_file( async def core_device_list_launch_application_task( - service_provider: RemoteServiceDiscoveryService, bundle_identifier: str, argument: List[str], - kill_existing: bool, suspended: bool, env: List[Tuple[str, str]]) -> None: + service_provider: RemoteServiceDiscoveryService, bundle_identifier: str, argument: list[str], + kill_existing: bool, suspended: bool, env: list[tuple[str, str]]) -> None: async with AppServiceService(service_provider) as app_service: print_json(await app_service.launch_application(bundle_identifier, argument, kill_existing, suspended, dict(env))) @@ -1112,8 +1112,8 @@ async def core_device_list_launch_application_task( @click.option('--env', multiple=True, type=click.Tuple((str, str)), help='Environment variables to pass to process given as a list of key value') def core_device_launch_application( - service_provider: RemoteServiceDiscoveryService, bundle_identifier: str, argument: Tuple[str], - kill_existing: bool, suspended: bool, env: List[Tuple[str, str]]) -> None: + service_provider: RemoteServiceDiscoveryService, bundle_identifier: str, argument: tuple[str], + kill_existing: bool, suspended: bool, env: list[tuple[str, str]]) -> None: """ Launch application """ asyncio.run( core_device_list_launch_application_task( @@ -1180,7 +1180,7 @@ def core_device_get_display_info(service_provider: RemoteServiceDiscoveryService asyncio.run(core_device_get_display_info_task(service_provider)) -async def core_device_query_mobilegestalt_task(service_provider: RemoteServiceDiscoveryService, key: List[str]) -> None: +async def core_device_query_mobilegestalt_task(service_provider: RemoteServiceDiscoveryService, key: list[str]) -> None: """ Query MobileGestalt """ async with DeviceInfoService(service_provider) as app_service: print_json(await app_service.query_mobilegestalt(key)) @@ -1188,7 +1188,7 @@ async def core_device_query_mobilegestalt_task(service_provider: RemoteServiceDi @core_device.command('query-mobilegestalt', cls=RSDCommand) @click.argument('key', nargs=-1, type=click.STRING) -def core_device_query_mobilegestalt(service_provider: RemoteServiceDiscoveryService, key: Tuple[str]) -> None: +def core_device_query_mobilegestalt(service_provider: RemoteServiceDiscoveryService, key: tuple[str]) -> None: """ Query MobileGestalt """ asyncio.run(core_device_query_mobilegestalt_task(service_provider, list(key))) diff --git a/pymobiledevice3/cli/profile.py b/pymobiledevice3/cli/profile.py index b03871597..00b92fbc8 100644 --- a/pymobiledevice3/cli/profile.py +++ b/pymobiledevice3/cli/profile.py @@ -2,7 +2,7 @@ import plistlib import tempfile from pathlib import Path -from typing import IO, List, Optional +from typing import IO, Optional import click @@ -36,7 +36,7 @@ def profile_list(service_provider: LockdownClient): @profile_group.command('install', cls=Command) @click.option('--keybag', type=click.Path(file_okay=True, dir_okay=False, exists=True)) @click.argument('profiles', nargs=-1, type=click.File('rb')) -def profile_install(service_provider: LockdownServiceProvider, keybag: Optional[str], profiles: List[IO]) -> None: +def profile_install(service_provider: LockdownServiceProvider, keybag: Optional[str], profiles: list[IO]) -> None: """ Install given profiles @@ -66,7 +66,7 @@ def profile_cloud_configuration(service_provider: LockdownServiceProvider, confi @profile_group.command('store', cls=Command) @click.argument('profiles', nargs=-1, type=click.File('rb')) -def profile_store(service_provider: LockdownServiceProvider, profiles: List[IO]) -> None: +def profile_store(service_provider: LockdownServiceProvider, profiles: list[IO]) -> None: """ Store a profile """ service = MobileConfigService(lockdown=service_provider) for profile in profiles: diff --git a/pymobiledevice3/cli/remote.py b/pymobiledevice3/cli/remote.py index a649990d6..bc683cad0 100644 --- a/pymobiledevice3/cli/remote.py +++ b/pymobiledevice3/cli/remote.py @@ -4,7 +4,7 @@ import sys import tempfile from functools import partial -from typing import List, Mapping, Optional, TextIO +from typing import Optional, TextIO import click @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) -async def browse_rsd(timeout: float = DEFAULT_BONJOUR_TIMEOUT) -> List[Mapping]: +async def browse_rsd(timeout: float = DEFAULT_BONJOUR_TIMEOUT) -> list[dict]: devices = [] for rsd in await get_rsds(timeout): devices.append({'address': rsd.service.address[0], @@ -36,7 +36,7 @@ async def browse_rsd(timeout: float = DEFAULT_BONJOUR_TIMEOUT) -> List[Mapping]: return devices -async def browse_remotepairing(timeout: float = DEFAULT_BONJOUR_TIMEOUT) -> List[Mapping]: +async def browse_remotepairing(timeout: float = DEFAULT_BONJOUR_TIMEOUT) -> list[dict]: devices = [] for remotepairing in await get_remote_pairing_tunnel_services(timeout): devices.append({'address': remotepairing.hostname, @@ -211,7 +211,7 @@ async def start_remote_pair_task(device_name: str) -> None: if start_tunnel is None: raise NotImplementedError('failed to start the tunnel on your platform') - devices: List[RemotePairingManualPairingDevice] = [] + devices: list[RemotePairingManualPairingDevice] = [] for answer in await browse_remotepairing_manual_pairing(): current_device_name = answer.properties[b'name'].decode() diff --git a/pymobiledevice3/cli/restore.py b/pymobiledevice3/cli/restore.py index db251de89..41150d838 100644 --- a/pymobiledevice3/cli/restore.py +++ b/pymobiledevice3/cli/restore.py @@ -5,8 +5,9 @@ import plistlib import tempfile import traceback +from collections.abc import Generator from pathlib import Path -from typing import IO, Generator, Optional, Union +from typing import IO, Optional, Union from zipfile import ZipFile import click diff --git a/pymobiledevice3/cli/syslog.py b/pymobiledevice3/cli/syslog.py index 6403eba96..2c73f2676 100644 --- a/pymobiledevice3/cli/syslog.py +++ b/pymobiledevice3/cli/syslog.py @@ -2,7 +2,7 @@ import os import posixpath import re -from typing import List, Optional, TextIO +from typing import Optional, TextIO import click @@ -82,8 +82,8 @@ def format_line(color, pid, syslog_entry, include_label): def syslog_live( service_provider: LockdownServiceProvider, out: Optional[TextIO], pid: Optional[int], - process_name: Optional[str], match: List[str], match_insensitive: List[str], include_label: bool, - regex: List[str], insensitive_regex: List[str]) -> None: + process_name: Optional[str], match: list[str], match_insensitive: list[str], include_label: bool, + regex: list[str], insensitive_regex: list[str]) -> None: match_regex = [re.compile(f'.*({r}).*', re.DOTALL) for r in regex] match_regex += [re.compile(f'.*({r}).*', re.IGNORECASE | re.DOTALL) for r in insensitive_regex] @@ -158,8 +158,8 @@ def replace(m): @click.option('-ei', '--insensitive-regex', multiple=True, help='filter only lines matching given regex (insensitive)') def cli_syslog_live( service_provider: LockdownServiceProvider, out: Optional[TextIO], pid: Optional[int], - process_name: Optional[str], match: List[str], match_insensitive: List[str], include_label: bool, - regex: List[str], insensitive_regex: List[str]) -> None: + process_name: Optional[str], match: list[str], match_insensitive: list[str], include_label: bool, + regex: list[str], insensitive_regex: list[str]) -> None: """ view live syslog lines """ syslog_live(service_provider, out, pid, process_name, match, match_insensitive, include_label, regex, diff --git a/pymobiledevice3/cli/webinspector.py b/pymobiledevice3/cli/webinspector.py index 6641f2cb7..37a094fdf 100644 --- a/pymobiledevice3/cli/webinspector.py +++ b/pymobiledevice3/cli/webinspector.py @@ -2,9 +2,10 @@ import logging import re from abc import ABC, abstractmethod +from collections.abc import Iterable from contextlib import asynccontextmanager from functools import update_wrapper -from typing import Iterable, List, Optional, Type +from typing import Optional import click import inquirer3 @@ -257,7 +258,7 @@ def cdp(service_provider: LockdownClient, host, port): ws_ping_timeout=None, ws='wsproto', loop='asyncio') -def get_js_completions(jsshell: 'JsShell', obj: str, prefix: str) -> List[Completion]: +def get_js_completions(jsshell: 'JsShell', obj: str, prefix: str) -> list[Completion]: if obj in JS_RESERVED_WORDS: return [] @@ -415,7 +416,7 @@ def query_page(inspector: WebinspectorService) -> Optional[Page]: return page -async def run_js_shell(js_shell_class: Type[JsShell], lockdown: LockdownClient, +async def run_js_shell(js_shell_class: type[JsShell], lockdown: LockdownClient, timeout: float, url: str): async with js_shell_class.create(lockdown, timeout, SAFARI) as js_shell_instance: await js_shell_instance.start(url) diff --git a/pymobiledevice3/exceptions.py b/pymobiledevice3/exceptions.py index 30b39590b..99e9c2642 100644 --- a/pymobiledevice3/exceptions.py +++ b/pymobiledevice3/exceptions.py @@ -17,7 +17,7 @@ 'CloudConfigurationAlreadyPresentError' ] -from typing import List, Optional +from typing import Optional class PyMobileDevice3Exception(Exception): @@ -305,7 +305,7 @@ class InvalidServiceError(LockdownError): class InspectorEvaluateError(PyMobileDevice3Exception): def __init__(self, class_name: str, message: str, line: Optional[int] = None, column: Optional[int] = None, - stack: Optional[List[str]] = None): + stack: Optional[list[str]] = None): super().__init__() self.class_name = class_name self.message = message diff --git a/pymobiledevice3/lockdown.py b/pymobiledevice3/lockdown.py index 10e64bf1b..d2de32f53 100755 --- a/pymobiledevice3/lockdown.py +++ b/pymobiledevice3/lockdown.py @@ -7,12 +7,13 @@ import tempfile import time from abc import ABC, abstractmethod +from collections.abc import Generator from contextlib import contextmanager, suppress from enum import Enum from functools import wraps from pathlib import Path from ssl import SSLZeroReturnError -from typing import Dict, Generator, Mapping, Optional, Tuple +from typing import Optional from cryptography import x509 from cryptography.hazmat.primitives import hashes, serialization @@ -79,7 +80,7 @@ def _inner_reconnect_on_remote_close(*args, **kwargs): class LockdownClient(ABC, LockdownServiceProvider): def __init__(self, service: ServiceConnection, host_id: str, identifier: str = None, - label: str = DEFAULT_LABEL, system_buid: str = SYSTEM_BUID, pair_record: Mapping = None, + label: str = DEFAULT_LABEL, system_buid: str = SYSTEM_BUID, pair_record: Optional[dict] = None, pairing_records_cache_folder: Path = None, port: int = SERVICE_PORT): """ Create a LockdownClient instance @@ -119,7 +120,7 @@ def __init__(self, service: ServiceConnection, host_id: str, identifier: str = N def create(cls, service: ServiceConnection, identifier: str = None, system_buid: str = SYSTEM_BUID, label: str = DEFAULT_LABEL, autopair: bool = True, pair_timeout: float = None, local_hostname: str = None, - pair_record: Mapping = None, pairing_records_cache_folder: Path = None, port: int = SERVICE_PORT, + pair_record: Optional[dict] = None, pairing_records_cache_folder: Path = None, port: int = SERVICE_PORT, private_key: Optional[RSAPrivateKey] = None, **cls_specific_args): """ Create a LockdownClient instance @@ -180,7 +181,7 @@ def wifi_mac_address(self) -> str: return self.all_values.get('WiFiAddress') @property - def short_info(self) -> Dict: + def short_info(self) -> dict: keys_to_copy = ['DeviceClass', 'DeviceName', 'BuildVersion', 'ProductVersion', 'ProductType', 'UniqueDeviceID'] result = { 'Identifier': self.identifier, @@ -244,7 +245,7 @@ def locale(self) -> str: return self.get_value(key='Locale', domain='com.apple.international') @property - def preflight_info(self) -> Mapping: + def preflight_info(self) -> dict: return self.get_value(key='FirmwarePreflightInfo') @property @@ -294,7 +295,7 @@ def set_uses24hClock(self, value: bool) -> None: def enter_recovery(self): return self._request('EnterRecovery') - def stop_session(self) -> Mapping: + def stop_session(self) -> dict: if self.session_id and self.service: response = self._request('StopSession', {'SessionID': self.session_id}) self.session_id = None @@ -458,7 +459,7 @@ def get_value(self, domain: str = None, key: str = None): return r @_reconnect_on_remote_close - def remove_value(self, domain: str = None, key: str = None) -> Mapping: + def remove_value(self, domain: str = None, key: str = None) -> dict: options = {} if domain: @@ -469,7 +470,7 @@ def remove_value(self, domain: str = None, key: str = None) -> Mapping: return self._request('RemoveValue', options) @_reconnect_on_remote_close - def set_value(self, value, domain: str = None, key: str = None) -> Mapping: + def set_value(self, value, domain: str = None, key: str = None) -> dict: options = {} if domain: @@ -480,7 +481,7 @@ def set_value(self, value, domain: str = None, key: str = None) -> Mapping: options['Value'] = value return self._request('SetValue', options) - def get_service_connection_attributes(self, name: str, include_escrow_bag: bool = False) -> Mapping: + def get_service_connection_attributes(self, name: str, include_escrow_bag: bool = False) -> dict: if not self.paired: raise NotPairedError() @@ -553,7 +554,7 @@ def _create_service_connection(self, port: int) -> ServiceConnection: """ Used to establish a new ServiceConnection to a given port """ pass - def _request(self, request: str, options: Mapping = None, verify_request: bool = True) -> Mapping: + def _request(self, request: str, options: Optional[dict] = None, verify_request: bool = True) -> dict: message = {'Label': self.label, 'Request': request} if options: message.update(options) @@ -584,7 +585,7 @@ def _request(self, request: str, options: Mapping = None, verify_request: bool = return response - def _request_pair(self, pair_options: Mapping, timeout: Optional[float] = None) -> Mapping: + def _request_pair(self, pair_options: dict, timeout: Optional[float] = None) -> dict: try: return self._request('Pair', pair_options) except PairingDialogResponsePendingError: @@ -614,7 +615,7 @@ def _reestablish_connection(self) -> None: class UsbmuxLockdownClient(LockdownClient): def __init__(self, service: ServiceConnection, host_id: str, identifier: str = None, - label: str = DEFAULT_LABEL, system_buid: str = SYSTEM_BUID, pair_record: Mapping = None, + label: str = DEFAULT_LABEL, system_buid: str = SYSTEM_BUID, pair_record: Optional[dict] = None, pairing_records_cache_folder: Path = None, port: int = SERVICE_PORT, usbmux_address: Optional[str] = None): super().__init__(service, host_id, identifier, label, system_buid, pair_record, pairing_records_cache_folder, @@ -622,7 +623,7 @@ def __init__(self, service: ServiceConnection, host_id: str, identifier: str = N self.usbmux_address = usbmux_address @property - def short_info(self) -> Dict: + def short_info(self) -> dict: short_info = super().short_info short_info['ConnectionType'] = self.service.mux_device.connection_type return short_info @@ -648,7 +649,7 @@ def save_pair_record(self) -> None: class TcpLockdownClient(LockdownClient): def __init__(self, service: ServiceConnection, host_id: str, hostname: str, identifier: str = None, - label: str = DEFAULT_LABEL, system_buid: str = SYSTEM_BUID, pair_record: Mapping = None, + label: str = DEFAULT_LABEL, system_buid: str = SYSTEM_BUID, pair_record: Optional[dict] = None, pairing_records_cache_folder: Path = None, port: int = SERVICE_PORT, keep_alive: bool = True): """ Create a LockdownClient instance @@ -689,7 +690,7 @@ def unpair(self, timeout: float = None) -> None: raise NotImplementedError('RemoteXPC lockdown version does not support pairing operations') def __init__(self, service: ServiceConnection, host_id: str, identifier: str = None, - label: str = DEFAULT_LABEL, system_buid: str = SYSTEM_BUID, pair_record: Mapping = None, + label: str = DEFAULT_LABEL, system_buid: str = SYSTEM_BUID, pair_record: Optional[dict] = None, pairing_records_cache_folder: Path = None, port: int = SERVICE_PORT): """ Create a LockdownClient instance @@ -709,7 +710,7 @@ def __init__(self, service: ServiceConnection, host_id: str, identifier: str = N def create_using_usbmux(serial: str = None, identifier: str = None, label: str = DEFAULT_LABEL, autopair: bool = True, connection_type: str = None, pair_timeout: float = None, local_hostname: str = None, - pair_record: Mapping = None, pairing_records_cache_folder: Path = None, + pair_record: Optional[dict] = None, pairing_records_cache_folder: Path = None, port: int = SERVICE_PORT, usbmux_address: Optional[str] = None) -> UsbmuxLockdownClient: """ Create a UsbmuxLockdownClient instance @@ -747,7 +748,7 @@ def create_using_usbmux(serial: str = None, identifier: str = None, label: str = def create_using_tcp(hostname: str, identifier: str = None, label: str = DEFAULT_LABEL, autopair: bool = True, - pair_timeout: float = None, local_hostname: str = None, pair_record: Mapping = None, + pair_timeout: float = None, local_hostname: str = None, pair_record: Optional[dict] = None, pairing_records_cache_folder: Path = None, port: int = SERVICE_PORT, keep_alive: bool = False) -> TcpLockdownClient: """ @@ -775,7 +776,7 @@ def create_using_tcp(hostname: str, identifier: str = None, label: str = DEFAULT def create_using_remote(service: ServiceConnection, identifier: str = None, label: str = DEFAULT_LABEL, autopair: bool = True, pair_timeout: float = None, local_hostname: str = None, - pair_record: Mapping = None, pairing_records_cache_folder: Path = None, + pair_record: Optional[dict] = None, pairing_records_cache_folder: Path = None, port: int = SERVICE_PORT) -> RemoteLockdownClient: """ Create a TcpLockdownClient instance over RSD @@ -801,7 +802,7 @@ def create_using_remote(service: ServiceConnection, identifier: str = None, labe async def get_mobdev2_lockdowns( udid: Optional[str] = None, pair_records: Optional[Path] = None, only_paired: bool = False, timeout: float = DEFAULT_BONJOUR_TIMEOUT) \ - -> Generator[Tuple[str, TcpLockdownClient], None, None]: + -> Generator[tuple[str, TcpLockdownClient], None, None]: records = {} if pair_records is None: pair_records = get_home_folder() diff --git a/pymobiledevice3/osu/os_utils.py b/pymobiledevice3/osu/os_utils.py index 71ff4d5de..2fc56afe0 100644 --- a/pymobiledevice3/osu/os_utils.py +++ b/pymobiledevice3/osu/os_utils.py @@ -3,7 +3,6 @@ import sys from datetime import datetime from pathlib import Path -from typing import List, Tuple from pymobiledevice3.exceptions import FeatureNotSupportedError, OSNotSupportedError @@ -41,7 +40,7 @@ def is_admin(self) -> bool: raise FeatureNotSupportedError(self._os_name, inspect.currentframe().f_code.co_name) @property - def usbmux_address(self) -> Tuple[str, int]: + def usbmux_address(self) -> tuple[str, int]: raise FeatureNotSupportedError(self._os_name, inspect.currentframe().f_code.co_name) @property @@ -60,7 +59,7 @@ def access_denied_error(self) -> str: def pair_record_path(self) -> Path: raise FeatureNotSupportedError(self._os_name, inspect.currentframe().f_code.co_name) - def get_ipv6_ips(self) -> List[str]: + def get_ipv6_ips(self) -> list[str]: raise FeatureNotSupportedError(self._os_name, inspect.currentframe().f_code.co_name) def set_keepalive(self, sock: socket.socket, after_idle_sec: int = DEFAULT_AFTER_IDLE_SEC, diff --git a/pymobiledevice3/osu/posix_util.py b/pymobiledevice3/osu/posix_util.py index 13f6a467c..7af9f86b4 100644 --- a/pymobiledevice3/osu/posix_util.py +++ b/pymobiledevice3/osu/posix_util.py @@ -4,7 +4,6 @@ import socket import struct from pathlib import Path -from typing import List, Tuple from ifaddr import get_adapters @@ -22,7 +21,7 @@ def is_admin(self) -> bool: return os.geteuid() == 0 @property - def usbmux_address(self) -> Tuple[str, int]: + def usbmux_address(self) -> tuple[str, int]: return MuxConnection.USBMUXD_PIPE, socket.AF_UNIX @property @@ -33,7 +32,7 @@ def bonjour_timeout(self) -> int: def access_denied_error(self) -> str: return 'This command requires root privileges. Consider retrying with "sudo".' - def get_ipv6_ips(self) -> List[str]: + def get_ipv6_ips(self) -> list[str]: return [f'{adapter.ips[0].ip[0]}%{adapter.nice_name}' for adapter in get_adapters() if adapter.ips[0].is_IPv6 and not adapter.nice_name.startswith('tun')] @@ -89,5 +88,5 @@ def get_homedir(self) -> Path: class Cygwin(Posix): @property - def usbmux_address(self) -> Tuple[str, int]: + def usbmux_address(self) -> tuple[str, int]: return MuxConnection.ITUNES_HOST, socket.AF_INET diff --git a/pymobiledevice3/osu/win_util.py b/pymobiledevice3/osu/win_util.py index 024acae99..687d22a7e 100644 --- a/pymobiledevice3/osu/win_util.py +++ b/pymobiledevice3/osu/win_util.py @@ -2,7 +2,6 @@ import os import socket from pathlib import Path -from typing import List, Tuple import win32security from ifaddr import get_adapters @@ -25,7 +24,7 @@ def is_admin(self) -> bool: return False @property - def usbmux_address(self) -> Tuple[str, int]: + def usbmux_address(self) -> tuple[str, int]: return MuxConnection.ITUNES_HOST, socket.AF_INET @property @@ -44,7 +43,7 @@ def access_denied_error(self) -> str: def pair_record_path(self) -> Path: return Path(os.environ.get('ALLUSERSPROFILE', ''), 'Apple', 'Lockdown') - def get_ipv6_ips(self) -> List[str]: + def get_ipv6_ips(self) -> list[str]: return [f'{adapter.ips[0].ip[0]}%{adapter.ips[0].ip[2]}' for adapter in get_adapters() if adapter.ips[0].is_IPv6] diff --git a/pymobiledevice3/pair_records.py b/pymobiledevice3/pair_records.py index 7cb248df6..f35702293 100644 --- a/pymobiledevice3/pair_records.py +++ b/pymobiledevice3/pair_records.py @@ -2,9 +2,10 @@ import platform import plistlib import uuid +from collections.abc import Generator from contextlib import suppress from pathlib import Path -from typing import Generator, Mapping, Optional +from typing import Optional from pymobiledevice3 import usbmux from pymobiledevice3.common import get_home_folder @@ -23,7 +24,7 @@ def generate_host_id(hostname: str = None) -> str: return str(host_id).upper() -def get_itunes_pairing_record(identifier: str) -> Optional[Mapping]: +def get_itunes_pairing_record(identifier: str) -> Optional[dict]: filename = OSUTILS.pair_record_path / f'{identifier}.plist' try: with open(filename, 'rb') as f: @@ -33,7 +34,7 @@ def get_itunes_pairing_record(identifier: str) -> Optional[Mapping]: return pair_record -def get_local_pairing_record(identifier: str, pairing_records_cache_folder: Path) -> Optional[Mapping]: +def get_local_pairing_record(identifier: str, pairing_records_cache_folder: Path) -> Optional[dict]: logger.debug('Looking for pymobiledevice3 pairing record') path = pairing_records_cache_folder / f'{identifier}.{PAIRING_RECORD_EXT}' if not path.exists(): @@ -43,7 +44,7 @@ def get_local_pairing_record(identifier: str, pairing_records_cache_folder: Path def get_preferred_pair_record(identifier: str, pairing_records_cache_folder: Path, - usbmux_address: Optional[str] = None) -> Mapping: + usbmux_address: Optional[str] = None) -> dict: """ look for an existing pair record to connected device by following order: - usbmuxd diff --git a/pymobiledevice3/remote/core_device/app_service.py b/pymobiledevice3/remote/core_device/app_service.py index 37f155dde..e4f2d77df 100644 --- a/pymobiledevice3/remote/core_device/app_service.py +++ b/pymobiledevice3/remote/core_device/app_service.py @@ -1,5 +1,5 @@ import plistlib -from typing import List, Mapping, Optional +from typing import Optional from pymobiledevice3.remote.core_device.core_device_service import CoreDeviceService from pymobiledevice3.remote.remote_service_discovery import RemoteServiceDiscoveryService @@ -18,7 +18,7 @@ def __init__(self, rsd: RemoteServiceDiscoveryService): async def list_apps(self, include_app_clips: bool = True, include_removable_apps: bool = True, include_hidden_apps: bool = True, include_internal_apps: bool = True, - include_default_apps: bool = True) -> List[Mapping]: + include_default_apps: bool = True) -> list[dict]: """ List applications """ return await self.invoke('com.apple.coredevice.feature.listapps', { 'includeAppClips': include_app_clips, 'includeRemovableApps': include_removable_apps, @@ -26,9 +26,9 @@ async def list_apps(self, include_app_clips: bool = True, include_removable_apps 'includeDefaultApps': include_default_apps}) async def launch_application( - self, bundle_id: str, arguments: Optional[List[str]] = None, kill_existing: bool = True, - start_suspended: bool = False, environment: Optional[Mapping] = None, extra_options: Mapping = None) \ - -> List[Mapping]: + self, bundle_id: str, arguments: Optional[list[str]] = None, kill_existing: bool = True, + start_suspended: bool = False, environment: Optional[dict] = None, extra_options: Optional[dict] = None) \ + -> list[dict]: """ launch application """ return await self.invoke('com.apple.coredevice.feature.launchapplication', { 'applicationSpecifier': { @@ -47,11 +47,11 @@ async def launch_application( }, }) - async def list_processes(self) -> List[Mapping]: + async def list_processes(self) -> list[dict]: """ List processes """ return (await self.invoke('com.apple.coredevice.feature.listprocesses'))['processTokens'] - async def list_roots(self) -> Mapping: + async def list_roots(self) -> dict: """ List roots. @@ -62,7 +62,7 @@ async def list_roots(self) -> Mapping: 'relative': '/' }}) - async def spawn_executable(self, executable: str, arguments: List[str]) -> Mapping: + async def spawn_executable(self, executable: str, arguments: list[str]) -> dict: """ Spawn given executable. @@ -89,7 +89,7 @@ async def spawn_executable(self, executable: str, arguments: List[str]) -> Mappi }, }) - async def monitor_process_termination(self, pid: int) -> Mapping: + async def monitor_process_termination(self, pid: int) -> dict: """ Monitor process termination. @@ -104,7 +104,7 @@ async def uninstall_app(self, bundle_identifier: str) -> None: """ await self.invoke('com.apple.coredevice.feature.uninstallapp', {'bundleIdentifier': bundle_identifier}) - async def send_signal_to_process(self, pid: int, signal: int) -> Mapping: + async def send_signal_to_process(self, pid: int, signal: int) -> dict: """ Send signal to given process by its pid """ @@ -114,7 +114,7 @@ async def send_signal_to_process(self, pid: int, signal: int) -> Mapping: }) async def fetch_icons(self, bundle_identifier: str, width: float, height: float, scale: float, - allow_placeholder: bool) -> Mapping: + allow_placeholder: bool) -> dict: """ Fetch given application's icons """ diff --git a/pymobiledevice3/remote/core_device/core_device_service.py b/pymobiledevice3/remote/core_device/core_device_service.py index b8f34a028..667562a61 100644 --- a/pymobiledevice3/remote/core_device/core_device_service.py +++ b/pymobiledevice3/remote/core_device/core_device_service.py @@ -1,12 +1,12 @@ import uuid -from typing import Any, Mapping +from typing import Any, Optional from pymobiledevice3.exceptions import CoreDeviceError from pymobiledevice3.remote.remote_service import RemoteService from pymobiledevice3.remote.xpc_message import XpcInt64Type, XpcUInt64Type -def _generate_core_device_version_dict(version: str) -> Mapping: +def _generate_core_device_version_dict(version: str) -> dict: version_components = version.split('.') return {'components': [XpcUInt64Type(component) for component in version_components], 'originalComponentsCount': XpcInt64Type(len(version_components)), @@ -17,7 +17,7 @@ def _generate_core_device_version_dict(version: str) -> Mapping: class CoreDeviceService(RemoteService): - async def invoke(self, feature_identifier: str, input_: Mapping = None) -> Any: + async def invoke(self, feature_identifier: str, input_: Optional[dict] = None) -> Any: if input_ is None: input_ = {} response = await self.service.send_receive_request({ diff --git a/pymobiledevice3/remote/core_device/device_info.py b/pymobiledevice3/remote/core_device/device_info.py index ecbba7932..ac4d90001 100644 --- a/pymobiledevice3/remote/core_device/device_info.py +++ b/pymobiledevice3/remote/core_device/device_info.py @@ -1,5 +1,3 @@ -from typing import List, Mapping - from pymobiledevice3.remote.core_device.core_device_service import CoreDeviceService from pymobiledevice3.remote.remote_service_discovery import RemoteServiceDiscoveryService @@ -14,19 +12,19 @@ class DeviceInfoService(CoreDeviceService): def __init__(self, rsd: RemoteServiceDiscoveryService): super().__init__(rsd, self.SERVICE_NAME) - async def get_device_info(self) -> Mapping: + async def get_device_info(self) -> dict: """ Get device information """ return await self.invoke('com.apple.coredevice.feature.getdeviceinfo', {}) - async def get_display_info(self) -> Mapping: + async def get_display_info(self) -> dict: """ Get display information """ return await self.invoke('com.apple.coredevice.feature.getdisplayinfo', {}) - async def query_mobilegestalt(self, keys: List[str]) -> Mapping: + async def query_mobilegestalt(self, keys: list[str]) -> dict: """ Query MobileGestalt. @@ -34,7 +32,7 @@ async def query_mobilegestalt(self, keys: List[str]) -> Mapping: """ return await self.invoke('com.apple.coredevice.feature.querymobilegestalt', {'keys': keys}) - async def get_lockstate(self) -> Mapping: + async def get_lockstate(self) -> dict: """ Get lockstate """ diff --git a/pymobiledevice3/remote/core_device/diagnostics_service.py b/pymobiledevice3/remote/core_device/diagnostics_service.py index 225fcd2a3..aa6c328ec 100644 --- a/pymobiledevice3/remote/core_device/diagnostics_service.py +++ b/pymobiledevice3/remote/core_device/diagnostics_service.py @@ -1,4 +1,4 @@ -from typing import AsyncGenerator +from collections.abc import AsyncGenerator from pymobiledevice3.remote.core_device.core_device_service import CoreDeviceService from pymobiledevice3.remote.remote_service_discovery import RemoteServiceDiscoveryService diff --git a/pymobiledevice3/remote/core_device/file_service.py b/pymobiledevice3/remote/core_device/file_service.py index 555301185..2ea0e2cd9 100644 --- a/pymobiledevice3/remote/core_device/file_service.py +++ b/pymobiledevice3/remote/core_device/file_service.py @@ -1,8 +1,9 @@ import asyncio import struct import uuid +from collections.abc import AsyncGenerator from enum import IntEnum -from typing import AsyncGenerator, List, Mapping, Optional +from typing import Optional from pymobiledevice3.exceptions import CoreDeviceError from pymobiledevice3.remote.core_device.core_device_service import CoreDeviceService @@ -44,7 +45,7 @@ async def connect(self) -> None: 'User': 'mobile'}) self.session = response['NewSessionID'] - async def retrieve_directory_list(self, path: str = '.') -> AsyncGenerator[List[str], None]: + async def retrieve_directory_list(self, path: str = '.') -> AsyncGenerator[list[str], None]: return (await self.send_receive_request({ 'Cmd': 'RetrieveDirectoryList', 'MessageUUID': str(uuid.uuid4()), 'Path': path, 'SessionID': self.session} ))['FileList'] @@ -60,7 +61,7 @@ async def retrieve_file(self, path: str = '.') -> bytes: await reader.readexactly(0x24) return await reader.readexactly(struct.unpack('>I', await reader.readexactly(4))[0]) - async def send_receive_request(self, request: Mapping) -> Mapping: + async def send_receive_request(self, request: dict) -> dict: response = await self.service.send_receive_request(request) encoded_error = response.get('EncodedError') if encoded_error is not None: diff --git a/pymobiledevice3/remote/remote_service_discovery.py b/pymobiledevice3/remote/remote_service_discovery.py index d8796d029..7efc5e8b0 100644 --- a/pymobiledevice3/remote/remote_service_discovery.py +++ b/pymobiledevice3/remote/remote_service_discovery.py @@ -2,7 +2,7 @@ import logging from dataclasses import dataclass from datetime import datetime -from typing import List, Mapping, Optional, Tuple, Union +from typing import Optional, Union from pymobiledevice3.bonjour import DEFAULT_BONJOUR_TIMEOUT, browse_remoted from pymobiledevice3.common import get_home_folder @@ -28,13 +28,13 @@ class RSDDevice: class RemoteServiceDiscoveryService(LockdownServiceProvider): - def __init__(self, address: Tuple[str, int], name: Optional[str] = None) -> None: + def __init__(self, address: tuple[str, int], name: Optional[str] = None) -> None: super().__init__() self.name = name self.service = RemoteXPCConnection(address) - self.peer_info: Optional[Mapping] = None + self.peer_info: Optional[dict] = None self.lockdown: Optional[LockdownClient] = None - self.all_values: Optional[Mapping] = None + self.all_values: Optional[dict] = None @property def product_version(self) -> str: @@ -140,7 +140,7 @@ def __repr__(self) -> str: f'UDID:{self.udid}{name_str}>') -async def get_remoted_devices(timeout: float = DEFAULT_BONJOUR_TIMEOUT) -> List[RSDDevice]: +async def get_remoted_devices(timeout: float = DEFAULT_BONJOUR_TIMEOUT) -> list[RSDDevice]: result = [] for hostname in await browse_remoted(timeout): with RemoteServiceDiscoveryService((hostname, RSD_PORT)) as rsd: diff --git a/pymobiledevice3/remote/remotexpc.py b/pymobiledevice3/remote/remotexpc.py index e11d90085..3cd739efa 100644 --- a/pymobiledevice3/remote/remotexpc.py +++ b/pymobiledevice3/remote/remotexpc.py @@ -1,7 +1,8 @@ import asyncio import sys from asyncio import IncompleteReadError -from typing import Generator, Mapping, Optional, Tuple +from collections.abc import Generator +from typing import Optional import IPython import nest_asyncio @@ -35,10 +36,10 @@ class RemoteXPCConnection: - def __init__(self, address: Tuple[str, int]): + def __init__(self, address: tuple[str, int]): self._previous_frame_data = b'' self.address = address - self.next_message_id: Mapping[int: int] = {ROOT_CHANNEL: 0, REPLY_CHANNEL: 0} + self.next_message_id: dict[int, int] = {ROOT_CHANNEL: 0, REPLY_CHANNEL: 0} self.peer_info = None self._reader: Optional[asyncio.StreamReader] = None self._writer: Optional[asyncio.StreamWriter] = None @@ -65,7 +66,7 @@ async def close(self) -> None: self._writer = None self._reader = None - async def send_request(self, data: Mapping, wanting_reply: bool = False) -> None: + async def send_request(self, data: dict, wanting_reply: bool = False) -> None: xpc_wrapper = create_xpc_wrapper( data, message_id=self.next_message_id[ROOT_CHANNEL], wanting_reply=wanting_reply) self._writer.write(DataFrame(stream_id=ROOT_CHANNEL, data=xpc_wrapper).serialize()) @@ -96,7 +97,7 @@ async def receive_file(self, total_size: int) -> bytes: buf += chunk return buf - async def receive_response(self) -> Mapping: + async def receive_response(self) -> dict: while True: frame = await self._receive_next_data_frame() try: @@ -112,7 +113,7 @@ async def receive_response(self) -> Mapping: self.next_message_id[frame.stream_id] = xpc_message.message_id + 1 return decode_xpc_object(xpc_message.payload.obj) - async def send_receive_request(self, data: Mapping) -> Mapping: + async def send_receive_request(self, data: dict) -> dict: await self.send_request(data, wanting_reply=True) return await self.receive_response() diff --git a/pymobiledevice3/remote/tunnel_service.py b/pymobiledevice3/remote/tunnel_service.py index 0d5141c4f..f00a58f57 100644 --- a/pymobiledevice3/remote/tunnel_service.py +++ b/pymobiledevice3/remote/tunnel_service.py @@ -14,11 +14,12 @@ from abc import ABC, abstractmethod from asyncio import CancelledError, StreamReader, StreamWriter from collections import namedtuple +from collections.abc import AsyncGenerator from contextlib import asynccontextmanager, suppress from pathlib import Path from socket import create_connection from ssl import VerifyMode -from typing import AsyncGenerator, List, Mapping, Optional, TextIO, cast +from typing import Optional, TextIO, cast import aiofiles from construct import Const, Container @@ -146,7 +147,7 @@ async def send_packet_to_device(self, packet: bytes) -> None: pass @abstractmethod - async def request_tunnel_establish(self) -> Mapping: + async def request_tunnel_establish(self) -> dict: pass @abstractmethod @@ -191,7 +192,7 @@ async def stop_tunnel(self) -> None: self.tun = None @staticmethod - def _encode_cdtunnel_packet(data: Mapping) -> bytes: + def _encode_cdtunnel_packet(data: dict) -> bytes: return CDTunnelPacket.build({'body': json.dumps(data).encode()}) @@ -215,7 +216,7 @@ async def send_packet_to_device(self, packet: bytes) -> None: self._quic.send_datagram_frame(packet) self.transmit() - async def request_tunnel_establish(self) -> Mapping: + async def request_tunnel_establish(self) -> dict: stream_id = self._quic.get_next_available_stream_id() # pad the data with random data to force the MTU size correctly self._quic.send_datagram_frame(b'x' * 1024) @@ -249,7 +250,7 @@ def quic_event_received(self, event: QuicEvent) -> None: self.tun.write(LOOPBACK_HEADER + event.data) @staticmethod - def _encode_cdtunnel_packet(data: Mapping) -> bytes: + def _encode_cdtunnel_packet(data: dict) -> bytes: return CDTunnelPacket.build({'body': json.dumps(data).encode()}) @@ -287,7 +288,7 @@ async def wait_closed(self) -> None: except OSError: pass - async def request_tunnel_establish(self) -> Mapping: + async def request_tunnel_establish(self) -> dict: self._writer.write(self._encode_cdtunnel_packet( {'type': 'clientHandshakeRequest', 'mtu': self.REQUESTED_MTU})) await self._writer.drain() @@ -354,14 +355,14 @@ async def close(self) -> None: pass @abstractmethod - async def receive_response(self) -> Mapping: + async def receive_response(self) -> dict: pass @abstractmethod - async def send_request(self, data: Mapping) -> None: + async def send_request(self, data: dict) -> None: pass - async def send_receive_request(self, data: Mapping) -> Mapping: + async def send_receive_request(self, data: dict) -> dict: await self.send_request(data) return await self.receive_response() @@ -373,7 +374,7 @@ async def connect(self, autopair: bool = True) -> None: await self._pair() self._init_client_server_main_encryption_keys() - async def create_quic_listener(self, private_key: RSAPrivateKey) -> Mapping: + async def create_quic_listener(self, private_key: RSAPrivateKey) -> dict: request = {'request': {'_0': {'createListener': { 'key': base64.b64encode( private_key.public_key().public_bytes(Encoding.DER, PublicFormat.SubjectPublicKeyInfo) @@ -384,7 +385,7 @@ async def create_quic_listener(self, private_key: RSAPrivateKey) -> Mapping: response = await self._send_receive_encrypted_request(request) return response['createListener'] - async def create_tcp_listener(self) -> Mapping: + async def create_tcp_listener(self) -> dict: request = {'request': {'_0': {'createListener': { 'key': base64.b64encode(self.encryption_key).decode(), 'peerConnectionsInfo': [{'owningPID': os.getpid(), 'owningProcessName': 'CoreDeviceService'}], @@ -469,7 +470,7 @@ def save_pair_record(self) -> None: OSUTIL.chown_to_non_sudo_if_needed(self.pair_record_path) @property - def pair_record(self) -> Optional[Mapping]: + def pair_record(self) -> Optional[dict]: if self.pair_record_path.exists(): return plistlib.loads(self.pair_record_path.read_bytes()) return None @@ -561,7 +562,7 @@ async def _verify_proof(self) -> None: data = self.decode_tlv(PairingDataComponentTLVBuf.parse(response)) assert self.srp_context.verify_proof(data[PairingDataComponentType.PROOF].hex().encode()) - async def _save_pair_record_on_peer(self) -> Mapping: + async def _save_pair_record_on_peer(self) -> dict: # HKDF with above computed key (SRP_compute_key) + Pair-Setup-Encrypt-Salt + Pair-Setup-Encrypt-Info # result used as key for chacha20-poly1305 setup_encryption_key = HKDF( @@ -728,7 +729,7 @@ async def _validate_pairing(self) -> bool: async def _send_pair_verify_failed(self) -> None: await self._send_plain_request({'event': {'_0': {'pairVerifyFailed': {}}}}) - async def _send_receive_encrypted_request(self, request: Mapping) -> Mapping: + async def _send_receive_encrypted_request(self, request: dict) -> dict: nonce = Int64ul.build(self._encrypted_sequence_number) + b'\x00' * 4 encrypted_data = self.client_cip.encrypt( nonce, @@ -750,15 +751,15 @@ async def _send_receive_encrypted_request(self, request: Mapping) -> Mapping: return response - async def _send_receive_handshake(self, handshake_data: Mapping) -> Mapping: + async def _send_receive_handshake(self, handshake_data: dict) -> dict: response = await self._send_receive_plain_request({'request': {'_0': {'handshake': {'_0': handshake_data}}}}) return response['response']['_1']['handshake']['_0'] - async def _send_receive_pairing_data(self, pairing_data: Mapping) -> bytes: + async def _send_receive_pairing_data(self, pairing_data: dict) -> bytes: await self._send_pairing_data(pairing_data) return await self._receive_pairing_data() - async def _send_pairing_data(self, pairing_data: Mapping) -> None: + async def _send_pairing_data(self, pairing_data: dict) -> None: await self._send_plain_request({'event': {'_0': {'pairingData': {'_0': pairing_data}}}}) async def _receive_pairing_data(self) -> bytes: @@ -773,22 +774,22 @@ async def _receive_pairing_data(self) -> bytes: .get('NSLocalizedDescription')) raise PyMobileDevice3Exception(f'Got an unknown state message: {response}') - async def _send_receive_plain_request(self, plain_request: Mapping): + async def _send_receive_plain_request(self, plain_request: dict): await self._send_plain_request(plain_request) return await self._receive_plain_response() - async def _send_plain_request(self, plain_request: Mapping) -> None: + async def _send_plain_request(self, plain_request: dict) -> None: await self.send_request({'message': {'plain': {'_0': plain_request}}, 'originatedBy': 'host', 'sequenceNumber': XpcUInt64Type(self._sequence_number)}) self._sequence_number += 1 - async def _receive_plain_response(self) -> Mapping: + async def _receive_plain_response(self) -> dict: response = await self.receive_response() return response['message']['plain']['_0'] @staticmethod - def decode_tlv(tlv_list: List[Container]) -> Mapping: + def decode_tlv(tlv_list: list[Container]) -> dict: result = {} for tlv in tlv_list: if tlv.type in result: @@ -828,11 +829,11 @@ async def close(self) -> None: await self.rsd.close() await self.service.close() - async def receive_response(self) -> Mapping: + async def receive_response(self) -> dict: response = await self.service.receive_response() return response['value'] - async def send_request(self, data: Mapping) -> None: + async def send_request(self, data: dict) -> None: return await self.service.send_request({ 'mangledTypeName': 'RemotePairing.ControlChannelMessageEnvelope', 'value': data}) @@ -874,12 +875,12 @@ async def close(self) -> None: self._writer = None self._reader = None - async def receive_response(self) -> Mapping: + async def receive_response(self) -> dict: await self._reader.readexactly(len(REPAIRING_PACKET_MAGIC)) size = struct.unpack('>H', await self._reader.readexactly(2))[0] return json.loads(await self._reader.readexactly(size)) - async def send_request(self, data: Mapping) -> None: + async def send_request(self, data: dict) -> None: self._writer.write( RPPairingPacket.build({'body': json.dumps(data, default=self._default_json_encoder).encode()})) await self._writer.drain() @@ -1016,7 +1017,7 @@ async def start_tunnel( async def get_core_device_tunnel_services( bonjour_timeout: float = DEFAULT_BONJOUR_TIMEOUT, - udid: Optional[str] = None) -> List[CoreDeviceTunnelService]: + udid: Optional[str] = None) -> list[CoreDeviceTunnelService]: result = [] for rsd in await get_rsds(bonjour_timeout=bonjour_timeout, udid=udid): if udid is None and ((Version(rsd.product_version) < Version('17.0')) @@ -1035,7 +1036,7 @@ async def get_core_device_tunnel_services( async def get_remote_pairing_tunnel_services( bonjour_timeout: float = DEFAULT_BONJOUR_TIMEOUT, - udid: Optional[str] = None) -> List[RemotePairingTunnelService]: + udid: Optional[str] = None) -> list[RemotePairingTunnelService]: result = [] for answer in await browse_remotepairing(timeout=bonjour_timeout): for ip in answer.ips: diff --git a/pymobiledevice3/remote/utils.py b/pymobiledevice3/remote/utils.py index 240e72e5f..38e687cfa 100644 --- a/pymobiledevice3/remote/utils.py +++ b/pymobiledevice3/remote/utils.py @@ -1,6 +1,7 @@ import contextlib import platform -from typing import Generator, List, Optional +from collections.abc import Generator +from typing import Optional import psutil @@ -12,7 +13,7 @@ async def get_rsds(bonjour_timeout: float = DEFAULT_BONJOUR_TIMEOUT, udid: Optional[str] = None) -> \ - List[RemoteServiceDiscoveryService]: + list[RemoteServiceDiscoveryService]: result = [] with stop_remoted(): for answer in await browse_remoted(timeout=bonjour_timeout): diff --git a/pymobiledevice3/remote/xpc_message.py b/pymobiledevice3/remote/xpc_message.py index a2b9b05a7..73df9ffe1 100644 --- a/pymobiledevice3/remote/xpc_message.py +++ b/pymobiledevice3/remote/xpc_message.py @@ -1,7 +1,7 @@ import dataclasses import uuid from datetime import datetime -from typing import Any, List, Mapping +from typing import Any from construct import Aligned, Array, Bytes, Const, CString, Default, Double, Enum, ExprAdapter, FlagsEnum, \ GreedyBytes, Hex, If, Int32ul, Int64sl, Int64ul, LazyBound @@ -124,7 +124,7 @@ class FileTransferType: transfer_size: int -def _decode_xpc_dictionary(xpc_object) -> Mapping: +def _decode_xpc_dictionary(xpc_object) -> dict: if xpc_object.data.count == 0: return {} result = {} @@ -133,7 +133,7 @@ def _decode_xpc_dictionary(xpc_object) -> Mapping: return result -def _decode_xpc_array(xpc_object) -> List: +def _decode_xpc_array(xpc_object) -> list: result = [] for entry in xpc_object.data.entries: result.append(decode_xpc_object(entry)) @@ -202,7 +202,7 @@ def decode_xpc_object(xpc_object) -> Any: return decoder(xpc_object) -def _build_xpc_array(payload: List) -> Mapping: +def _build_xpc_array(payload: list) -> dict: entries = [] for entry in payload: entry = _build_xpc_object(entry) @@ -216,7 +216,7 @@ def _build_xpc_array(payload: List) -> Mapping: } -def _build_xpc_dictionary(payload: Mapping) -> Mapping: +def _build_xpc_dictionary(payload: dict) -> dict: entries = [] for key, value in payload.items(): entry = {'key': key, 'value': _build_xpc_object(value)} @@ -230,63 +230,63 @@ def _build_xpc_dictionary(payload: Mapping) -> Mapping: } -def _build_xpc_bool(payload: bool) -> Mapping: +def _build_xpc_bool(payload: bool) -> dict: return { 'type': XpcMessageType.BOOL, 'data': payload, } -def _build_xpc_string(payload: str) -> Mapping: +def _build_xpc_string(payload: str) -> dict: return { 'type': XpcMessageType.STRING, 'data': payload, } -def _build_xpc_data(payload: bool) -> Mapping: +def _build_xpc_data(payload: bool) -> dict: return { 'type': XpcMessageType.DATA, 'data': payload, } -def _build_xpc_double(payload: float) -> Mapping: +def _build_xpc_double(payload: float) -> dict: return { 'type': XpcMessageType.DOUBLE, 'data': payload, } -def _build_xpc_uuid(payload: uuid.UUID) -> Mapping: +def _build_xpc_uuid(payload: uuid.UUID) -> dict: return { 'type': XpcMessageType.UUID, 'data': payload.bytes, } -def _build_xpc_null(payload: None) -> Mapping: +def _build_xpc_null(payload: None) -> dict: return { 'type': XpcMessageType.NULL, 'data': None, } -def _build_xpc_uint64(payload: XpcUInt64Type) -> Mapping: +def _build_xpc_uint64(payload: XpcUInt64Type) -> dict: return { 'type': XpcMessageType.UINT64, 'data': payload, } -def _build_xpc_int64(payload: XpcInt64Type) -> Mapping: +def _build_xpc_int64(payload: XpcInt64Type) -> dict: return { 'type': XpcMessageType.INT64, 'data': payload, } -def _build_xpc_object(payload: Any) -> Mapping: +def _build_xpc_object(payload: Any) -> dict: if payload is None: return _build_xpc_null(payload) payload_builders = { @@ -307,7 +307,7 @@ def _build_xpc_object(payload: Any) -> Mapping: return builder(payload) -def create_xpc_wrapper(d: Mapping, message_id: int = 0, wanting_reply: bool = False) -> bytes: +def create_xpc_wrapper(d: dict, message_id: int = 0, wanting_reply: bool = False) -> bytes: flags = XpcFlags.ALWAYS_SET if len(d.keys()) > 0: flags |= XpcFlags.DATA_PRESENT diff --git a/pymobiledevice3/resources/firmware_notifications.py b/pymobiledevice3/resources/firmware_notifications.py index ce0cbeade..4a4206029 100644 --- a/pymobiledevice3/resources/firmware_notifications.py +++ b/pymobiledevice3/resources/firmware_notifications.py @@ -1,7 +1,6 @@ import logging import os import plistlib -from typing import List import click import coloredlogs @@ -14,7 +13,7 @@ def get_notifications(): return f.read().decode().split('\n') -def save_notifications(notifications: List[str]): +def save_notifications(notifications: list[str]): with open(NOTIFICATIONS_FILENAME, 'wb') as f: notifications.sort() f.write('\n'.join(notifications).encode()) diff --git a/pymobiledevice3/restore/asr.py b/pymobiledevice3/restore/asr.py index 05bed60a1..4f1aa1872 100644 --- a/pymobiledevice3/restore/asr.py +++ b/pymobiledevice3/restore/asr.py @@ -48,20 +48,20 @@ async def connect(self, port: int = DEFAULT_ASR_SYNC_PORT) -> None: self.checksum_chunks = data.get('Checksum Chunks', False) self.logger.debug(f'Checksum Chunks: {self.checksum_chunks}') - async def recv_plist(self) -> typing.Mapping: + async def recv_plist(self) -> dict: buf = b'' while not buf.endswith(b'\n'): buf += await self.service.aio_recvall(1) return plistlib.loads(buf) - async def send_plist(self, plist: typing.Mapping) -> None: + async def send_plist(self, plist: dict) -> None: self.logger.debug(plistlib.dumps(plist).decode()) await self.send_buffer(plistlib.dumps(plist)) async def send_buffer(self, buf: bytes) -> None: await self.service.aio_sendall(buf) - async def handle_oob_data_request(self, packet: typing.Mapping, filesystem: typing.IO): + async def handle_oob_data_request(self, packet: dict, filesystem: typing.IO): oob_length = packet['OOB Length'] oob_offset = packet['OOB Offset'] filesystem.seek(oob_offset, os.SEEK_SET) diff --git a/pymobiledevice3/restore/base_restore.py b/pymobiledevice3/restore/base_restore.py index 58a41dd6c..5398332be 100644 --- a/pymobiledevice3/restore/base_restore.py +++ b/pymobiledevice3/restore/base_restore.py @@ -1,7 +1,7 @@ import asyncio import logging -import typing from enum import Enum +from typing import Optional from zipfile import ZipFile from ipsw_parser.exceptions import NoSuchBuildIdentityError @@ -22,7 +22,7 @@ class Behavior(Enum): class BaseRestore: - def __init__(self, ipsw: ZipFile, device: Device, tss: typing.Mapping = None, + def __init__(self, ipsw: ZipFile, device: Device, tss: Optional[dict] = None, behavior: Behavior = Behavior.Update) -> None: self.ipsw = IPSW(ipsw) self.device = device diff --git a/pymobiledevice3/restore/recovery.py b/pymobiledevice3/restore/recovery.py index 56cb13939..ec8a2d7e0 100644 --- a/pymobiledevice3/restore/recovery.py +++ b/pymobiledevice3/restore/recovery.py @@ -1,7 +1,7 @@ import hashlib import logging import time -import typing +from typing import Optional from zipfile import ZipFile from usb import USBError @@ -20,7 +20,7 @@ class Recovery(BaseRestore): - def __init__(self, ipsw: ZipFile, device: Device, tss: typing.Mapping = None, behavior: Behavior = Behavior.Update): + def __init__(self, ipsw: ZipFile, device: Device, tss: Optional[dict] = None, behavior: Behavior = Behavior.Update): super().__init__(ipsw, device, tss, behavior) self.tss_localpolicy = None self.tss_recoveryos_root_ticket = None diff --git a/pymobiledevice3/restore/restore.py b/pymobiledevice3/restore/restore.py index 1549937cd..124970633 100644 --- a/pymobiledevice3/restore/restore.py +++ b/pymobiledevice3/restore/restore.py @@ -9,7 +9,7 @@ import typing import zipfile from concurrent.futures import ThreadPoolExecutor -from typing import Mapping, MutableMapping, Optional +from typing import Optional import requests from tqdm import tqdm, trange @@ -140,12 +140,12 @@ def __init__(self, ipsw: zipfile.ZipFile, device: Device, tss=None, behavior: Be 'DeviceTree': self.send_component, } - def handle_async_data_request_msg(self, message: Mapping) -> typing.Coroutine: + def handle_async_data_request_msg(self, message: dict) -> typing.Coroutine: self._tasks.append(asyncio.create_task(self.handle_data_request_msg(message), name=f'AsyncDataRequestMsg-{message["DataType"]}')) return asyncio.sleep(0) - async def send_filesystem(self, message: Mapping) -> None: + async def send_filesystem(self, message: dict) -> None: self.logger.info('about to send filesystem...') asr_port = message.get('DataPort', DEFAULT_ASR_SYNC_PORT) @@ -178,7 +178,7 @@ async def send_filesystem(self, message: Mapping) -> None: def get_build_identity_from_request(self, msg): return self.get_build_identity(msg['Arguments'].get('IsRecoveryOS', False)) - async def send_buildidentity(self, message: Mapping) -> None: + async def send_buildidentity(self, message: dict) -> None: self.logger.info('About to send BuildIdentity Dict...') service = await self._get_service_for_data_request(message) req = {'BuildIdentityDict': dict(self.get_build_identity_from_request(message))} @@ -188,7 +188,7 @@ async def send_buildidentity(self, message: Mapping) -> None: self.logger.info('Sending BuildIdentityDict now...') await service.aio_send_plist(req) - async def extract_global_manifest(self) -> Mapping: + async def extract_global_manifest(self) -> dict: build_info = self.build_identity.get('Info') if build_info is None: raise PyMobileDevice3Exception('build identity does not contain an "Info" element') @@ -204,7 +204,7 @@ async def extract_global_manifest(self) -> Mapping: # The path of the global manifest is hardcoded. There's no pointer to in the build manifest. return self.ipsw.get_global_manifest(macos_variant, device_class) - async def send_personalized_boot_object_v3(self, message: Mapping) -> None: + async def send_personalized_boot_object_v3(self, message: dict) -> None: self.logger.debug('send_personalized_boot_object_v3') service = await self._get_service_for_data_request(message) image_name = message['Arguments']['ImageName'] @@ -230,7 +230,7 @@ async def send_personalized_boot_object_v3(self, message: Mapping) -> None: self.logger.info(f'Done sending {component_name}') - async def send_source_boot_object_v4(self, message: Mapping) -> None: + async def send_source_boot_object_v4(self, message: dict) -> None: self.logger.debug('send_source_boot_object_v4') service = await self._get_service_for_data_request(message) image_name = message['Arguments']['ImageName'] @@ -314,7 +314,7 @@ def get_build_identity(self, is_recovery_os: bool): return self.ipsw.build_manifest.get_build_identity(self.device.hardware_model, variant=variant) - async def send_restore_local_policy(self, message: Mapping) -> None: + async def send_restore_local_policy(self, message: dict) -> None: component = 'Ap,LocalPolicy' service = await self._get_service_for_data_request(message) @@ -326,7 +326,7 @@ async def send_restore_local_policy(self, message: Mapping) -> None: await service.aio_send_plist({'Ap,LocalPolicy': build_identity.get_component(component, tss=tss_localpolicy, data=lpol_file).personalized_data}) - async def send_recovery_os_root_ticket(self, message: Mapping) -> None: + async def send_recovery_os_root_ticket(self, message: dict) -> None: self.logger.info('About to send RecoveryOSRootTicket...') service = await self._get_service_for_data_request(message) @@ -347,7 +347,7 @@ async def send_recovery_os_root_ticket(self, message: Mapping) -> None: self.logger.info('Sending RecoveryOSRootTicket now...') await service.aio_send_plist(req) - async def send_root_ticket(self, message: Mapping) -> None: + async def send_root_ticket(self, message: dict) -> None: self.logger.info('About to send RootTicket...') service = await self._get_service_for_data_request(message) @@ -357,7 +357,7 @@ async def send_root_ticket(self, message: Mapping) -> None: self.logger.info('Sending RootTicket now...') await service.aio_send_plist({'RootTicketData': self.recovery.tss.ap_img4_ticket}) - async def send_nor(self, message: Mapping): + async def send_nor(self, message: dict): self.logger.info('About to send NORData...') service = await self._get_service_for_data_request(message) @@ -556,7 +556,7 @@ def sign_bbfw(self, bbfw_orig, bbtss, bb_nonce): if tmp_zip_read_name: os.remove(tmp_zip_read_name) - async def send_baseband_data(self, message: Mapping): + async def send_baseband_data(self, message: dict): self.logger.info(f'About to send BasebandData: {message}') service = await self._get_service_for_data_request(message) @@ -610,7 +610,7 @@ async def send_baseband_data(self, message: Mapping): self.logger.info('Sending BasebandData now...') await service.aio_send_plist({'BasebandData': buffer}) - async def send_fdr_trust_data(self, message: Mapping) -> None: + async def send_fdr_trust_data(self, message: dict) -> None: self.logger.info('About to send FDR Trust data...') service = await self._get_service_for_data_request(message) @@ -621,7 +621,7 @@ async def send_fdr_trust_data(self, message: Mapping) -> None: await service.aio_send_plist({}) async def send_image_data( - self, message: Mapping, image_list_k: Optional[str], image_type_k: Optional[str], + self, message: dict, image_list_k: Optional[str], image_type_k: Optional[str], image_data_k: Optional[str]) -> None: self.logger.debug(f'send_image_data: {message}') arguments = message['Arguments'] @@ -682,7 +682,7 @@ async def send_image_data( await self._restored.send(req) - async def send_bootability_bundle_data(self, message: Mapping) -> None: + async def send_bootability_bundle_data(self, message: dict) -> None: self.logger.debug(f'send_bootability_bundle_data: {message}') service = await self._get_service_for_data_request(message) await service.aio_sendall(self.ipsw.bootability) @@ -692,7 +692,7 @@ async def send_manifest(self) -> None: self.logger.debug('send_manifest') await self._restored.send({'ReceiptManifest': self.build_identity.manifest}) - async def get_se_firmware_data(self, updater_name: str, info: Mapping, arguments: Mapping) -> Mapping: + async def get_se_firmware_data(self, updater_name: str, info: dict, arguments: dict) -> dict: chip_id = info.get('SE,ChipID') if chip_id is None: chip_id = self.build_identity['Manifest']['SE,ChipID'] @@ -741,7 +741,7 @@ async def get_se_firmware_data(self, updater_name: str, info: Mapping, arguments return response - async def get_yonkers_firmware_data(self, info: Mapping): + async def get_yonkers_firmware_data(self, info: dict): # create Yonkers request request = TSSRequest() parameters = dict() @@ -779,7 +779,7 @@ async def get_yonkers_firmware_data(self, info: Mapping): return response - async def get_savage_firmware_data(self, info: Mapping): + async def get_savage_firmware_data(self, info: dict): # create Savage request request = TSSRequest() parameters = dict() @@ -814,7 +814,7 @@ async def get_savage_firmware_data(self, info: Mapping): return response - async def get_rose_firmware_data(self, updater_name: str, info: Mapping, arguments: Mapping): + async def get_rose_firmware_data(self, updater_name: str, info: dict, arguments: dict): self.logger.info(f'get_rose_firmware_data: {info}') if 'DeviceGeneratedTags' in arguments: @@ -868,7 +868,7 @@ async def get_rose_firmware_data(self, updater_name: str, info: Mapping, argumen return response - async def get_veridian_firmware_data(self, updater_name: str, info: Mapping, arguments: Mapping): + async def get_veridian_firmware_data(self, updater_name: str, info: dict, arguments: dict): self.logger.info(f'get_veridian_firmware_data: {info}') comp_name = 'BMU,FirmwareMap' @@ -904,7 +904,7 @@ async def get_veridian_firmware_data(self, updater_name: str, info: Mapping, arg return response - async def get_tcon_firmware_data(self, info: Mapping): + async def get_tcon_firmware_data(self, info: dict): self.logger.info(f'restore_get_tcon_firmware_data: {info}') comp_name = 'Baobab,TCON' @@ -932,7 +932,7 @@ async def get_tcon_firmware_data(self, info: Mapping): return response - async def get_device_generated_firmware_data(self, updater_name: str, info: Mapping, arguments: Mapping) -> Mapping: + async def get_device_generated_firmware_data(self, updater_name: str, info: dict, arguments: dict) -> dict: self.logger.info(f'get_device_generated_firmware_data ({updater_name}): {arguments}') request = TSSRequest() parameters = dict() @@ -971,7 +971,7 @@ async def get_device_generated_firmware_data(self, updater_name: str, info: Mapp return response - async def get_timer_firmware_data(self, info: Mapping): + async def get_timer_firmware_data(self, info: dict): self.logger.info(f'get_timer_firmware_data: {info}') ftab = None @@ -1045,7 +1045,7 @@ async def get_timer_firmware_data(self, info: Mapping): return response - async def send_firmware_updater_data(self, message: Mapping): + async def send_firmware_updater_data(self, message: dict): self.logger.debug(f'got FirmwareUpdaterData request: {message}') service = await self._get_service_for_data_request(message) arguments = message['Arguments'] @@ -1105,12 +1105,12 @@ async def send_firmware_updater_data(self, message: Mapping): self.logger.info('Sending FirmwareResponse data now...') await service.aio_send_plist({'FirmwareResponseData': fwdict}) - async def send_firmware_updater_preflight(self, message: Mapping) -> None: + async def send_firmware_updater_preflight(self, message: dict) -> None: self.logger.warning(f'send_firmware_updater_preflight: {message}') service = await self._get_service_for_data_request(message) await service.aio_send_plist({}) - async def send_url_asset(self, message: Mapping) -> None: + async def send_url_asset(self, message: dict) -> None: self.logger.info(f'send_url_asset: {message}') service = await self._get_service_for_data_request(message) arguments = message['Arguments'] @@ -1133,7 +1133,7 @@ async def send_url_asset(self, message: Mapping) -> None: }, fmt=plistlib.FMT_BINARY) await service.aio_close() - async def send_streamed_image_decryption_key(self, message: Mapping) -> None: + async def send_streamed_image_decryption_key(self, message: dict) -> None: self.logger.info(f'send_streamed_image_decryption_key: {message}') service = await self._get_service_for_data_request(message) arguments = message['Arguments'] @@ -1160,7 +1160,7 @@ async def send_component(self, component: str, component_name: Optional[str] = N {f'{component_name}File': self.build_identity.get_component(component, tss=self.recovery.tss).personalized_data}) - async def handle_data_request_msg(self, message: Mapping): + async def handle_data_request_msg(self, message: dict): self.logger.debug(f'handle_data_request_msg: {message}') # checks and see what kind of data restored is requests and pass the request to its own handler @@ -1192,11 +1192,11 @@ async def handle_data_request_msg(self, message: Mapping): else: self.logger.error(f'unknown data request: {message}') - async def handle_previous_restore_log_msg(self, message: Mapping): + async def handle_previous_restore_log_msg(self, message: dict): restorelog = message['PreviousRestoreLog'] self.logger.debug(f'PreviousRestoreLog: {restorelog}') - async def handle_progress_msg(self, message: MutableMapping) -> None: + async def handle_progress_msg(self, message: dict) -> None: operation = message['Operation'] if operation in PROGRESS_BAR_OPERATIONS: message['Operation'] = PROGRESS_BAR_OPERATIONS[operation] @@ -1219,7 +1219,7 @@ async def handle_progress_msg(self, message: MutableMapping) -> None: self.logger.debug(f'progress-bar: {message}') - async def handle_status_msg(self, message: Mapping): + async def handle_status_msg(self, message: dict): self.logger.debug(f'status message: {message}') status = message['Status'] log = message.get('Log') @@ -1237,15 +1237,15 @@ async def handle_status_msg(self, message: Mapping): else: self.logger.error('unknown error') - async def handle_checkpoint_msg(self, message: Mapping): + async def handle_checkpoint_msg(self, message: dict): self.logger.debug(f'checkpoint: {message}') - async def handle_bb_update_status_msg(self, message: Mapping): + async def handle_bb_update_status_msg(self, message: dict): self.logger.debug(f'bb_update_status_msg: {message}') if not message['Accepted']: raise PyMobileDevice3Exception(str(message)) - async def handle_baseband_updater_output_data(self, message: Mapping) -> None: + async def handle_baseband_updater_output_data(self, message: dict) -> None: self.logger.debug(f'restore_handle_baseband_updater_output_data: {message}') data_port = message['DataPort'] @@ -1277,14 +1277,14 @@ async def handle_baseband_updater_output_data(self, message: Mapping) -> None: self.logger.debug('Closing connection of BasebandUpdaterOutputData data port') client.close() - async def handle_restored_crash(self, message: Mapping) -> None: + async def handle_restored_crash(self, message: dict) -> None: backtrace = '\n'.join(message['RestoredBacktrace']) self.logger.info(f'restored crashed. backtrace:\n{backtrace}') - async def handle_async_wait(self, message: Mapping) -> None: + async def handle_async_wait(self, message: dict) -> None: self.logger.debug(message) - async def handle_restore_attestation(self, message: Mapping) -> None: + async def handle_restore_attestation(self, message: dict) -> None: self.logger.debug(message) await self._restored.send({'RestoreShouldAttest': False}) @@ -1354,7 +1354,7 @@ async def update(self): # device is finally in restore mode, let's do this await self.restore_device() - async def _get_service_for_data_request(self, message: Mapping) -> ServiceConnection: + async def _get_service_for_data_request(self, message: dict) -> ServiceConnection: data_port = message.get('DataPort') if data_port is None: return self._restored.service diff --git a/pymobiledevice3/restore/restored_client.py b/pymobiledevice3/restore/restored_client.py index 80200bd51..69984c87d 100644 --- a/pymobiledevice3/restore/restored_client.py +++ b/pymobiledevice3/restore/restored_client.py @@ -1,6 +1,6 @@ import logging from functools import cached_property -from typing import Any, Mapping, Optional +from typing import Any, Optional from pymobiledevice3 import usbmux from pymobiledevice3.exceptions import ConnectionFailedError, NoDeviceConnectedError @@ -58,19 +58,19 @@ async def start_restore(self, opts: Optional[RestoreOptions] = None) -> None: return await self.service.aio_send_plist(req) - async def reboot(self) -> Mapping: + async def reboot(self) -> dict: return await self.service.aio_send_recv_plist({'Request': 'Reboot', 'Label': self.label}) - async def send(self, message: Mapping) -> None: + async def send(self, message: dict) -> None: await self.service.aio_send_plist(message) - async def recv(self) -> Mapping: + async def recv(self) -> dict: return await self.service.aio_recv_plist() @cached_property - async def hardware_info(self) -> Mapping[str, Any]: + async def hardware_info(self) -> dict[str, Any]: return (await self.query_value('HardwareInfo'))['HardwareInfo'] @property - async def saved_debug_info(self) -> Mapping[str, Any]: + async def saved_debug_info(self) -> dict[str, Any]: return (await self.query_value('SavedDebugInfo'))['SavedDebugInfo'] diff --git a/pymobiledevice3/restore/tss.py b/pymobiledevice3/restore/tss.py index 12403859b..3a8930b44 100644 --- a/pymobiledevice3/restore/tss.py +++ b/pymobiledevice3/restore/tss.py @@ -19,14 +19,14 @@ logger = logging.getLogger(__name__) -def get_with_or_without_comma(obj: typing.Mapping, k: str, default=None): +def get_with_or_without_comma(obj: dict, k: str, default=None): val = obj.get(k, obj.get(k.replace(',', ''))) if val is None and default is not None: val = default return val -def is_fw_payload(info: typing.Mapping[str, typing.Any]) -> bool: +def is_fw_payload(info: dict[str, typing.Any]) -> bool: return (info.get('IsFirmwarePayload') or info.get('IsSecondaryFirmwarePayload') or info.get('IsFUDFirmware') or info.get('IsLoadedByiBoot') or info.get('IsEarlyAccessFirmware') or info.get('IsiBootEANFirmware') or info.get('IsiBootNonEssentialFirmware')) @@ -63,8 +63,7 @@ def __init__(self): } @staticmethod - def apply_restore_request_rules(tss_entry: typing.MutableMapping, parameters: typing.MutableMapping, - rules: typing.List): + def apply_restore_request_rules(tss_entry: dict, parameters: dict, rules: list) -> dict: for rule in rules: conditions_fulfilled = True conditions = rule['Conditions'] @@ -106,13 +105,13 @@ def apply_restore_request_rules(tss_entry: typing.MutableMapping, parameters: ty tss_entry[key] = value return tss_entry - def add_tags(self, parameters: typing.Mapping): + def add_tags(self, parameters: dict): for key, value in parameters.items(): if isinstance(value, str) and value.startswith('0x'): value = int(value, 16) self._request[key] = value - def add_common_tags(self, parameters: typing.Mapping, overrides=None): + def add_common_tags(self, parameters: dict, overrides=None): keys = ('ApECID', 'UniqueBuildID', 'ApChipID', 'ApBoardID', 'ApSecurityDomain') for k in keys: if k in parameters: @@ -120,7 +119,7 @@ def add_common_tags(self, parameters: typing.Mapping, overrides=None): if overrides is not None: self._request.update(overrides) - def add_ap_recovery_tags(self, parameters: typing.Mapping, overrides=None): + def add_ap_recovery_tags(self, parameters: dict, overrides=None): skip_keys = ('BasebandFirmware', 'SE,UpdatePayload', 'BaseSystem', 'ANS', 'Ap,AudioBootChime', 'Ap,CIO', 'Ap,RestoreCIO', 'Ap,RestoreTMU', 'Ap,TMU', 'Ap,rOSLogo1', 'Ap,rOSLogo2', 'AppleLogo', 'DCP', 'LLB', 'RecoveryMode', 'RestoreANS', 'RestoreDCP', 'RestoreDeviceTree', 'RestoreKernelCache', @@ -168,7 +167,7 @@ def add_ap_recovery_tags(self, parameters: typing.Mapping, overrides=None): if overrides: self._request.update(overrides) - def add_timer_tags(self, parameters: typing.Mapping, overrides=None): + def add_timer_tags(self, parameters: dict, overrides=None): manifest = parameters['Manifest'] # add tags indicating we want to get the Timer ticket @@ -229,7 +228,7 @@ def add_timer_tags(self, parameters: typing.Mapping, overrides=None): if overrides is not None: self._request.update(overrides) - def add_local_policy_tags(self, parameters: typing.Mapping): + def add_local_policy_tags(self, parameters: dict): self._request['@ApImg4Ticket'] = True keys_to_copy = ( @@ -243,7 +242,7 @@ def add_local_policy_tags(self, parameters: typing.Mapping): v = int(v, 16) self._request[k] = v - def add_vinyl_tags(self, parameters: typing.Mapping, overrides=None): + def add_vinyl_tags(self, parameters: dict, overrides=None): self._request['@BBTicket'] = True self._request['eUICC,ApProductionMode'] = parameters.get('eUICC,ApProductionMode', @@ -281,7 +280,7 @@ def add_vinyl_tags(self, parameters: typing.Mapping, overrides=None): if overrides is not None: self._request.update(overrides) - def add_ap_tags(self, parameters: typing.Mapping, overrides=None): + def add_ap_tags(self, parameters: dict, overrides=None): """ loop over components from build manifest """ manifest_node = parameters['Manifest'] @@ -340,7 +339,7 @@ def add_ap_tags(self, parameters: typing.Mapping, overrides=None): if overrides is not None: self._request.update(overrides) - def add_ap_img3_tags(self, parameters: typing.Mapping): + def add_ap_img3_tags(self, parameters: dict): if 'ApNonce' in parameters: self._request['ApNonce'] = parameters['ApNonce'] self._request['@APTicket'] = True @@ -369,7 +368,7 @@ def add_ap_img4_tags(self, parameters): if parameters.get('RequiresUIDMode'): self._request['Ap,SikaFuse'] = 0 - def add_se_tags(self, parameters: typing.Mapping, overrides=None): + def add_se_tags(self, parameters: dict, overrides=None): manifest = parameters['Manifest'] # add tags indicating we want to get the SE,Ticket @@ -424,7 +423,7 @@ def add_se_tags(self, parameters: typing.Mapping, overrides=None): if overrides is not None: self._request.update(overrides) - def add_savage_tags(self, parameters: typing.Mapping, overrides=None, component_name=None): + def add_savage_tags(self, parameters: dict, overrides=None, component_name=None): manifest = parameters['Manifest'] # add tags indicating we want to get the Savage,Ticket @@ -470,7 +469,7 @@ def add_savage_tags(self, parameters: typing.Mapping, overrides=None, component_ return comp_name - def add_yonkers_tags(self, parameters: typing.Mapping, overrides=None): + def add_yonkers_tags(self, parameters: dict, overrides=None): manifest = parameters['Manifest'] # add tags indicating we want to get the Yonkers,Ticket @@ -522,7 +521,7 @@ def add_yonkers_tags(self, parameters: typing.Mapping, overrides=None): return result_comp_name - def add_baseband_tags(self, parameters: typing.Mapping, overrides=None): + def add_baseband_tags(self, parameters: dict, overrides=None): self._request['@BBTicket'] = True keys_to_copy = ( @@ -554,7 +553,7 @@ def add_baseband_tags(self, parameters: typing.Mapping, overrides=None): if overrides: self._request.update(overrides) - def add_rose_tags(self, parameters: typing.Mapping, overrides: typing.Mapping = None): + def add_rose_tags(self, parameters: dict, overrides: typing.Optional[dict] = None): manifest = parameters['Manifest'] # add tags indicating we want to get the Rap,Ticket @@ -613,7 +612,7 @@ def add_rose_tags(self, parameters: typing.Mapping, overrides: typing.Mapping = if overrides is not None: self._request.update(overrides) - def add_veridian_tags(self, parameters: typing.Mapping, overrides: typing.Mapping = None): + def add_veridian_tags(self, parameters: dict, overrides: typing.Optional[dict] = None): manifest = parameters['Manifest'] # add tags indicating we want to get the Rap,Ticket @@ -656,7 +655,7 @@ def add_veridian_tags(self, parameters: typing.Mapping, overrides: typing.Mappin if overrides is not None: self._request.update(overrides) - def add_tcon_tags(self, parameters: typing.Mapping, overrides: typing.Mapping = None): + def add_tcon_tags(self, parameters: dict, overrides: typing.Optional[dict] = None): manifest = parameters['Manifest'] # add tags indicating we want to get the Baobab,Ticket diff --git a/pymobiledevice3/service_connection.py b/pymobiledevice3/service_connection.py index 0ff03feda..55a3c78e8 100755 --- a/pymobiledevice3/service_connection.py +++ b/pymobiledevice3/service_connection.py @@ -6,7 +6,7 @@ import struct import time from enum import Enum -from typing import Any, Mapping, Optional +from typing import Any, Optional import IPython from pygments import formatters, highlight, lexers @@ -38,7 +38,7 @@ """ -def build_plist(d: Mapping, endianity: str = '>', fmt: Enum = plistlib.FMT_XML) -> bytes: +def build_plist(d: dict, endianity: str = '>', fmt: Enum = plistlib.FMT_XML) -> bytes: payload = plistlib.dumps(d, fmt=fmt) message = struct.pack(endianity + 'L', len(payload)) return message + payload @@ -125,11 +125,11 @@ def sendall(self, data: bytes) -> None: except ssl.SSLEOFError as e: raise ConnectionTerminatedError from e - def send_recv_plist(self, data: Mapping, endianity='>', fmt=plistlib.FMT_XML) -> Any: + def send_recv_plist(self, data: dict, endianity='>', fmt=plistlib.FMT_XML) -> Any: self.send_plist(data, endianity=endianity, fmt=fmt) return self.recv_plist(endianity=endianity) - async def aio_send_recv_plist(self, data: Mapping, endianity='>', fmt=plistlib.FMT_XML) -> Any: + async def aio_send_recv_plist(self, data: dict, endianity='>', fmt=plistlib.FMT_XML) -> Any: await self.aio_send_plist(data, endianity=endianity, fmt=fmt) return await self.aio_recv_plist(endianity=endianity) @@ -173,10 +173,10 @@ def send_prefixed(self, data: bytes) -> None: msg = b''.join([hdr, data]) return self.sendall(msg) - def recv_plist(self, endianity='>') -> Mapping: + def recv_plist(self, endianity='>') -> dict: return parse_plist(self.recv_prefixed(endianity=endianity)) - async def aio_recv_plist(self, endianity='>') -> Mapping: + async def aio_recv_plist(self, endianity='>') -> dict: return parse_plist(await self.aio_recv_prefixed(endianity)) def send_plist(self, d, endianity='>', fmt=plistlib.FMT_XML) -> None: @@ -186,7 +186,7 @@ async def aio_sendall(self, payload: bytes) -> None: self.writer.write(payload) await self.writer.drain() - async def aio_send_plist(self, d: Mapping, endianity: str = '>', fmt: Enum = plistlib.FMT_XML) -> None: + async def aio_send_plist(self, d: dict, endianity: str = '>', fmt: Enum = plistlib.FMT_XML) -> None: await self.aio_sendall(build_plist(d, endianity, fmt)) def ssl_start(self, certfile, keyfile=None) -> None: diff --git a/pymobiledevice3/services/accessibilityaudit.py b/pymobiledevice3/services/accessibilityaudit.py index c0f28aaac..2cc9a01ef 100644 --- a/pymobiledevice3/services/accessibilityaudit.py +++ b/pymobiledevice3/services/accessibilityaudit.py @@ -11,7 +11,7 @@ class SerializedObject: - def __init__(self, fields: typing.MutableMapping): + def __init__(self, fields: dict): self._fields = fields @@ -141,7 +141,7 @@ def foreground_color(self) -> typing.Any: def background_color(self) -> typing.Any: return self._fields['BackgroundColorValue_v1'] - def json(self) -> typing.Mapping: + def json(self) -> dict: resp = { 'element_rect_value': self.rect, 'issue_classification': self.issue_type, @@ -218,11 +218,11 @@ def __init__(self, lockdown: LockdownServiceProvider): self.recv_plist() @property - def capabilities(self) -> typing.List[str]: + def capabilities(self) -> list[str]: self.broadcast.deviceCapabilities() return self.recv_plist()[0] - def run_audit(self, value: typing.List) -> typing.List[AXAuditIssue_v1]: + def run_audit(self, value: list) -> list[AXAuditIssue_v1]: if self.product_version >= Version('15.0'): self.broadcast.deviceBeginAuditTypes_(MessageAux().append_obj(value)) else: @@ -242,7 +242,7 @@ def supported_audits_types(self) -> None: return deserialize_object(self.recv_plist()[0]) @property - def settings(self) -> typing.List[AXAuditDeviceSetting_v1]: + def settings(self) -> list[AXAuditDeviceSetting_v1]: self.broadcast.deviceAccessibilitySettings() return deserialize_object(self.recv_plist()[0]) diff --git a/pymobiledevice3/services/afc.py b/pymobiledevice3/services/afc.py index ffd504d29..e18fcb263 100755 --- a/pymobiledevice3/services/afc.py +++ b/pymobiledevice3/services/afc.py @@ -12,7 +12,7 @@ from collections import namedtuple from datetime import datetime from re import Pattern -from typing import Callable, List, Optional, Union +from typing import Callable, Optional, Union import hexdump from click.exceptions import Exit @@ -341,7 +341,7 @@ def rm_single(self, filename: str, force: bool = False) -> bool: raise @path_to_str() - def rm(self, filename: str, match: Optional[Pattern] = None, force: bool = False) -> List[str]: + def rm(self, filename: str, match: Optional[Pattern] = None, force: bool = False) -> list[str]: """ recursive removal of a directory or a file if did not succeed, return list of undeleted filenames or raise exception depending on force parameter. @@ -857,7 +857,7 @@ def _do_walk(self, directory: Annotated[str, Arg(completer=dir_completer)]): def _do_cat(self, filename: str): print(try_decode(self.afc.get_file_contents(self.relative_path(filename)))) - def _do_rm(self, file: Annotated[List[str], Arg(nargs='+', completer=path_completer)]): + def _do_rm(self, file: Annotated[list[str], Arg(nargs='+', completer=path_completer)]): for filename in file: self.afc.rm(self.relative_path(filename)) diff --git a/pymobiledevice3/services/companion.py b/pymobiledevice3/services/companion.py index 54d4a08b6..9603bf927 100644 --- a/pymobiledevice3/services/companion.py +++ b/pymobiledevice3/services/companion.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -import typing +from typing import Optional from pymobiledevice3.exceptions import PyMobileDevice3Exception from pymobiledevice3.lockdown import LockdownClient @@ -40,7 +40,8 @@ def get_value(self, udid: str, key: str): error = response.get('Error') raise PyMobileDevice3Exception(error) - def start_forwarding_service_port(self, remote_port: int, service_name: str = None, options: typing.Mapping = None): + def start_forwarding_service_port(self, remote_port: int, service_name: Optional[str] = None, + options: Optional[dict] = None): service = self.lockdown.start_lockdown_service(self.service_name) request = {'Command': 'StartForwardingServicePort', diff --git a/pymobiledevice3/services/crash_reports.py b/pymobiledevice3/services/crash_reports.py index 3c0d77b7b..dded72a96 100644 --- a/pymobiledevice3/services/crash_reports.py +++ b/pymobiledevice3/services/crash_reports.py @@ -2,7 +2,8 @@ import posixpath import re import time -from typing import Callable, Generator, List, Optional +from collections.abc import Generator +from typing import Callable, Optional from pycrashreport.crash_report import get_crash_report_from_buf from xonsh.built_ins import XSH @@ -69,7 +70,7 @@ def clear(self) -> None: if item != self.APPSTORED_PATH: raise AfcException(f'failed to clear crash reports directory, undeleted items: {undeleted_items}', None) - def ls(self, path: str = '/', depth: int = 1) -> List[str]: + def ls(self, path: str = '/', depth: int = 1) -> list[str]: """ List file and folder in the crash report's directory. :param path: Path to list, relative to the crash report's directory. diff --git a/pymobiledevice3/services/debugserver_applist.py b/pymobiledevice3/services/debugserver_applist.py index 0e2f6313f..e081d620d 100755 --- a/pymobiledevice3/services/debugserver_applist.py +++ b/pymobiledevice3/services/debugserver_applist.py @@ -1,5 +1,4 @@ import plistlib -import typing from pymobiledevice3.lockdown import LockdownClient from pymobiledevice3.services.lockdown_service import LockdownService @@ -13,7 +12,7 @@ class DebugServerAppList(LockdownService): def __init__(self, lockdown: LockdownClient): super().__init__(lockdown, self.SERVICE_NAME) - def get(self) -> typing.Mapping: + def get(self) -> dict: buf = b'' while b'' not in buf: buf += self.service.recv(CHUNK_SIZE) diff --git a/pymobiledevice3/services/device_arbitration.py b/pymobiledevice3/services/device_arbitration.py index cf643bdbf..a718b95f1 100755 --- a/pymobiledevice3/services/device_arbitration.py +++ b/pymobiledevice3/services/device_arbitration.py @@ -1,5 +1,3 @@ -from typing import Mapping - from pymobiledevice3.exceptions import ArbitrationError, DeviceAlreadyInUseError from pymobiledevice3.lockdown import LockdownClient from pymobiledevice3.services.lockdown_service import LockdownService @@ -12,7 +10,7 @@ def __init__(self, lockdown: LockdownClient): super().__init__(lockdown, self.SERVICE_NAME, is_developer_service=True) @property - def version(self) -> Mapping: + def version(self) -> dict: return self.service.send_recv_plist({'command': 'version'}) def check_in(self, hostname: str, force: bool = False): diff --git a/pymobiledevice3/services/diagnostics.py b/pymobiledevice3/services/diagnostics.py index 9dab51072..24f17132a 100755 --- a/pymobiledevice3/services/diagnostics.py +++ b/pymobiledevice3/services/diagnostics.py @@ -1,4 +1,4 @@ -from typing import List, Mapping, Optional +from typing import Optional from pymobiledevice3.exceptions import ConnectionFailedError, DeprecationError, PyMobileDevice3Exception from pymobiledevice3.lockdown import LockdownClient @@ -968,7 +968,7 @@ def __init__(self, lockdown: LockdownServiceProvider): super().__init__(lockdown, service_name, service=service) - def mobilegestalt(self, keys: List[str] = None) -> Mapping: + def mobilegestalt(self, keys: list[str] = None) -> dict: if keys is None or len(keys) == 0: keys = MobileGestaltKeys response = self.service.send_recv_plist({'Request': 'MobileGestalt', 'MobileGestaltKeys': keys}) @@ -984,7 +984,7 @@ def mobilegestalt(self, keys: List[str] = None) -> Mapping: return response['Diagnostics']['MobileGestalt'] - def action(self, action: str) -> Optional[Mapping]: + def action(self, action: str) -> Optional[dict]: response = self.service.send_recv_plist({'Request': action}) if response['Status'] != 'Success': raise PyMobileDevice3Exception(f'failed to perform action: {action}') @@ -999,7 +999,7 @@ def shutdown(self): def sleep(self): self.action('Sleep') - def info(self, diag_type: str = 'All') -> Mapping: + def info(self, diag_type: str = 'All') -> dict: return self.action(diag_type) def ioregistry(self, plane: str = None, name: str = None, ioclass: str = None): @@ -1025,8 +1025,8 @@ def ioregistry(self, plane: str = None, name: str = None, ioclass: str = None): return dd.get('IORegistry') return None - def get_battery(self) -> Mapping: + def get_battery(self) -> dict: return self.ioregistry(ioclass='IOPMPowerSource') - def get_wifi(self) -> Mapping: + def get_wifi(self) -> dict: return self.ioregistry(name='AppleBCMWLANSkywalkInterface') diff --git a/pymobiledevice3/services/dtfetchsymbols.py b/pymobiledevice3/services/dtfetchsymbols.py index 86764394a..12f43d0f6 100755 --- a/pymobiledevice3/services/dtfetchsymbols.py +++ b/pymobiledevice3/services/dtfetchsymbols.py @@ -16,7 +16,7 @@ def __init__(self, lockdown: LockdownClient): self.logger = logging.getLogger(__name__) self.lockdown = lockdown - def list_files(self) -> typing.List[str]: + def list_files(self) -> list[str]: service = self._start_command(self.CMD_LIST_FILES_PLIST) files = service.recv_plist().get('files') service.close() diff --git a/pymobiledevice3/services/dvt/instruments/core_profile_session_tap.py b/pymobiledevice3/services/dvt/instruments/core_profile_session_tap.py index 09f0c65f5..d7dd75446 100644 --- a/pymobiledevice3/services/dvt/instruments/core_profile_session_tap.py +++ b/pymobiledevice3/services/dvt/instruments/core_profile_session_tap.py @@ -586,7 +586,7 @@ class CoreProfileSessionTap(Tap): """ IDENTIFIER = 'com.apple.instruments.server.services.coreprofilesessiontap' - def __init__(self, dvt: DvtSecureSocketProxyService, time_config: typing.Mapping, filters: typing.Set = None): + def __init__(self, dvt: DvtSecureSocketProxyService, time_config: dict, filters: set = None): """ :param dvt: Instruments service proxy. :param time_config: Timing information - numer, denom, mach_absolute_time and matching usecs_since_epoch, @@ -612,7 +612,7 @@ def __init__(self, dvt: DvtSecureSocketProxyService, time_config: typing.Mapping } super().__init__(dvt, self.IDENTIFIER, config) - def get_stackshot(self) -> typing.Mapping: + def get_stackshot(self) -> dict: """ Get a stackshot from the tap. """ diff --git a/pymobiledevice3/services/dvt/instruments/device_info.py b/pymobiledevice3/services/dvt/instruments/device_info.py index 819ca37bd..2acebd517 100644 --- a/pymobiledevice3/services/dvt/instruments/device_info.py +++ b/pymobiledevice3/services/dvt/instruments/device_info.py @@ -32,7 +32,7 @@ def execname_for_pid(self, pid: int) -> str: self._channel.execnameForPid_(MessageAux().append_obj(pid)) return self._channel.receive_plist() - def proclist(self) -> typing.List[typing.Mapping]: + def proclist(self) -> list[dict]: """ Get the process list from the device. :return: List of process and their attributes. @@ -60,7 +60,7 @@ def mach_time_info(self): def mach_kernel_name(self) -> str: return self.request_information('machKernelName') - def kpep_database(self) -> typing.Optional[typing.Mapping]: + def kpep_database(self) -> typing.Optional[dict]: kpep_database = self.request_information('kpepDatabase') if kpep_database is not None: return plistlib.loads(kpep_database) diff --git a/pymobiledevice3/services/dvt/instruments/process_control.py b/pymobiledevice3/services/dvt/instruments/process_control.py index 01b3b1958..822b4e724 100644 --- a/pymobiledevice3/services/dvt/instruments/process_control.py +++ b/pymobiledevice3/services/dvt/instruments/process_control.py @@ -1,5 +1,6 @@ import dataclasses import typing +from typing import Optional from pymobiledevice3.osu.os_utils import get_os_utils from pymobiledevice3.services.dvt.dvt_secure_socket_proxy import DvtSecureSocketProxyService @@ -51,7 +52,7 @@ def process_identifier_for_bundle_identifier(self, app_bundle_identifier: str) - return self._channel.receive_plist() def launch(self, bundle_id: str, arguments=None, kill_existing: bool = True, start_suspended: bool = False, - environment: typing.Mapping = None, extra_options: typing.Mapping = None) -> int: + environment: Optional[dict] = None, extra_options: Optional[dict] = None) -> int: """ Launch a process. :param bundle_id: Bundle id of the process. diff --git a/pymobiledevice3/services/dvt/testmanaged/xcuitest.py b/pymobiledevice3/services/dvt/testmanaged/xcuitest.py index f68c91260..b92b0792a 100644 --- a/pymobiledevice3/services/dvt/testmanaged/xcuitest.py +++ b/pymobiledevice3/services/dvt/testmanaged/xcuitest.py @@ -1,6 +1,6 @@ import logging import time -from typing import Any, Mapping, Optional +from typing import Any, Optional from bpylist2 import archiver from packaging.version import Version @@ -265,7 +265,7 @@ def launch_test_app( return pid -def get_app_info(service_provider: LockdownClient, bundle_id: str) -> Mapping[str, Any]: +def get_app_info(service_provider: LockdownClient, bundle_id: str) -> dict[str, Any]: with InstallationProxyService(lockdown=service_provider) as install_service: apps = install_service.get_apps(bundle_identifiers=[bundle_id]) if not apps: diff --git a/pymobiledevice3/services/installation_proxy.py b/pymobiledevice3/services/installation_proxy.py index 36d6790d7..1babb973a 100644 --- a/pymobiledevice3/services/installation_proxy.py +++ b/pymobiledevice3/services/installation_proxy.py @@ -1,7 +1,7 @@ import os from pathlib import Path from tempfile import TemporaryDirectory -from typing import Callable, List, Mapping, Optional +from typing import Callable, Optional from zipfile import ZIP_DEFLATED, ZipFile from parameter_decorators import str_to_path @@ -59,8 +59,8 @@ def _watch_completion(self, handler: Callable = None, *args) -> None: return raise AppInstallError() - def send_cmd_for_bundle_identifier(self, bundle_identifier: str, cmd: str = 'Archive', options: Mapping = None, - handler: Mapping = None, *args) -> None: + def send_cmd_for_bundle_identifier(self, bundle_identifier: str, cmd: str = 'Archive', options: Optional[dict] = None, + handler: Optional[dict] = None, *args) -> None: """ send a low-level command to installation relay """ cmd = {'Command': cmd, 'ApplicationIdentifier': bundle_identifier} @@ -72,24 +72,24 @@ def send_cmd_for_bundle_identifier(self, bundle_identifier: str, cmd: str = 'Arc self.service.send_plist(cmd) self._watch_completion(handler, *args) - def install(self, ipa_path: str, options: Mapping = None, handler: Callable = None, *args) -> None: + def install(self, ipa_path: str, options: Optional[dict] = None, handler: Callable = None, *args) -> None: """ install given ipa from device path """ self.install_from_local(ipa_path, 'Install', options, handler, args) - def upgrade(self, ipa_path: str, options: Mapping = None, handler: Callable = None, *args) -> None: + def upgrade(self, ipa_path: str, options: Optional[dict] = None, handler: Callable = None, *args) -> None: """ upgrade given ipa from device path """ self.install_from_local(ipa_path, 'Upgrade', options, handler, args) - def restore(self, bundle_identifier: str, options: Mapping = None, handler: Callable = None, *args) -> None: + def restore(self, bundle_identifier: str, options: Optional[dict] = None, handler: Callable = None, *args) -> None: """ no longer supported on newer iOS versions """ self.send_cmd_for_bundle_identifier(bundle_identifier, 'Restore', options, handler, args) - def uninstall(self, bundle_identifier: str, options: Mapping = None, handler: Callable = None, *args) -> None: + def uninstall(self, bundle_identifier: str, options: Optional[dict] = None, handler: Callable = None, *args) -> None: """ uninstall given bundle_identifier """ self.send_cmd_for_bundle_identifier(bundle_identifier, 'Uninstall', options, handler, args) @str_to_path('ipa_or_app_path') - def install_from_local(self, ipa_or_app_path: Path, cmd: str = 'Install', options: Optional[Mapping] = None, + def install_from_local(self, ipa_or_app_path: Path, cmd: str = 'Install', options: Optional[dict] = None, handler: Callable = None, *args) -> None: """ upload given ipa onto device and install it """ if options is None: @@ -108,7 +108,7 @@ def install_from_local(self, ipa_or_app_path: Path, cmd: str = 'Install', option 'PackagePath': TEMP_REMOTE_IPA_FILE}) self._watch_completion(handler, args) - def check_capabilities_match(self, capabilities: Mapping = None, options: Mapping = None) -> Mapping: + def check_capabilities_match(self, capabilities: Optional[dict] = None, options: Optional[dict] = None) -> dict: if options is None: options = {} cmd = {'Command': 'CheckCapabilitiesMatch', @@ -119,7 +119,7 @@ def check_capabilities_match(self, capabilities: Mapping = None, options: Mappin return self.service.send_recv_plist(cmd).get('LookupResult') - def browse(self, options: Mapping = None, attributes: List[str] = None) -> List[Mapping]: + def browse(self, options: Optional[dict] = None, attributes: list[str] = None) -> list[dict]: if options is None: options = {} if attributes: @@ -145,7 +145,7 @@ def browse(self, options: Mapping = None, attributes: List[str] = None) -> List[ return result - def lookup(self, options: Optional[Mapping] = None) -> Mapping: + def lookup(self, options: Optional[dict] = None) -> dict: """ search installation database """ if options is None: options = {} @@ -153,7 +153,7 @@ def lookup(self, options: Optional[Mapping] = None) -> Mapping: return self.service.send_recv_plist(cmd).get('LookupResult') def get_apps(self, application_type: str = 'Any', calculate_sizes: bool = False, - bundle_identifiers: Optional[List[str]] = None) -> Mapping[str, Mapping]: + bundle_identifiers: Optional[list[str]] = None) -> dict[str, dict]: """ get applications according to given criteria """ options = {} if bundle_identifiers is not None: diff --git a/pymobiledevice3/services/misagent.py b/pymobiledevice3/services/misagent.py index a0cb0f287..d99589bf1 100755 --- a/pymobiledevice3/services/misagent.py +++ b/pymobiledevice3/services/misagent.py @@ -1,6 +1,5 @@ import plistlib from io import BytesIO -from typing import List, Mapping from pymobiledevice3.exceptions import PyMobileDevice3Exception from pymobiledevice3.lockdown import LockdownClient @@ -29,7 +28,7 @@ def __init__(self, lockdown: LockdownClient): else: super().__init__(lockdown, self.RSD_SERVICE_NAME) - def install(self, plist: BytesIO) -> Mapping: + def install(self, plist: BytesIO) -> dict: response = self.service.send_recv_plist({'MessageType': 'Install', 'Profile': plist.read(), 'ProfileType': 'Provisioning'}) @@ -38,7 +37,7 @@ def install(self, plist: BytesIO) -> Mapping: return response - def remove(self, profile_id: str) -> Mapping: + def remove(self, profile_id: str) -> dict: response = self.service.send_recv_plist({'MessageType': 'Remove', 'ProfileID': profile_id, 'ProfileType': 'Provisioning'}) @@ -47,7 +46,7 @@ def remove(self, profile_id: str) -> Mapping: return response - def copy_all(self) -> List[ProvisioningProfile]: + def copy_all(self) -> list[ProvisioningProfile]: response = self.service.send_recv_plist({'MessageType': 'CopyAll', 'ProfileType': 'Provisioning'}) if response['Status']: diff --git a/pymobiledevice3/services/mobile_activation.py b/pymobiledevice3/services/mobile_activation.py index 1b54d289e..8b86a0dd3 100755 --- a/pymobiledevice3/services/mobile_activation.py +++ b/pymobiledevice3/services/mobile_activation.py @@ -4,7 +4,6 @@ import xml.etree.ElementTree as ET from contextlib import closing from pathlib import Path -from typing import List, Mapping import click import inquirer3 @@ -39,8 +38,8 @@ class Field: class ActivationForm: title: str description: str - fields: List[Field] - server_info: Mapping[str, str] + fields: list[Field] + server_info: dict[str, str] class MobileActivationService: diff --git a/pymobiledevice3/services/mobile_config.py b/pymobiledevice3/services/mobile_config.py index 26185f60f..8664e030b 100755 --- a/pymobiledevice3/services/mobile_config.py +++ b/pymobiledevice3/services/mobile_config.py @@ -1,7 +1,7 @@ import plistlib from enum import Enum from pathlib import Path -from typing import Any, Mapping, Optional +from typing import Any, Optional from uuid import uuid4 from cryptography import x509 @@ -60,16 +60,16 @@ def escalate(self, keybag_file: Path) -> None: self._send_recv({'RequestType': 'EscalateResponse', 'SignedRequest': signed_challenge}) self._send_recv({'RequestType': 'ProceedWithKeybagMigration'}) - def get_stored_profile(self, purpose: Purpose = Purpose.PostSetupInstallation) -> Mapping: + def get_stored_profile(self, purpose: Purpose = Purpose.PostSetupInstallation) -> dict: return self._send_recv({'RequestType': 'GetStoredProfile', 'Purpose': purpose.value}) def store_profile(self, profile_data: bytes, purpose: Purpose = Purpose.PostSetupInstallation) -> None: self._send_recv({'RequestType': 'StoreProfile', 'ProfileData': profile_data, 'Purpose': purpose.value}) - def get_cloud_configuration(self) -> Mapping: + def get_cloud_configuration(self) -> dict: return self._send_recv({'RequestType': 'GetCloudConfiguration'}).get('CloudConfiguration') - def set_cloud_configuration(self, cloud_configuration: Mapping) -> None: + def set_cloud_configuration(self, cloud_configuration: dict) -> None: self._send_recv({'RequestType': 'SetCloudConfiguration', 'CloudConfiguration': cloud_configuration}) def establish_provisional_enrollment(self, nonce: bytes) -> None: @@ -85,7 +85,7 @@ def erase_device(self, preserve_data_plan: bool, disallow_proximity_setup: bool) except ConnectionAbortedError: pass - def get_profile_list(self) -> Mapping: + def get_profile_list(self) -> dict: return self._send_recv({'RequestType': 'GetProfileList'}) def install_profile(self, payload: bytes) -> None: @@ -110,7 +110,7 @@ def remove_profile(self, identifier: str) -> None: }) self._send_recv({'RequestType': 'RemoveProfile', 'ProfileIdentifier': data}) - def _send_recv(self, request: Mapping) -> Mapping: + def _send_recv(self, request: dict) -> dict: response = self.service.send_recv_plist(request) if response.get('Status', None) != 'Acknowledged': error_chain = response.get('ErrorChain') @@ -258,7 +258,7 @@ def supervise(self, organization: str, keybag_file: Path) -> None: }) def install_managed_profile( - self, display_name: str, payload_content: Mapping[str, Any], payload_uuid: str = str(uuid4()), + self, display_name: str, payload_content: dict[str, Any], payload_uuid: str = str(uuid4()), keybag_file: Optional[Path] = None) -> None: profile_data = plistlib.dumps({ 'PayloadContent': [ diff --git a/pymobiledevice3/services/mobile_image_mounter.py b/pymobiledevice3/services/mobile_image_mounter.py index 96e379805..cb99e026e 100755 --- a/pymobiledevice3/services/mobile_image_mounter.py +++ b/pymobiledevice3/services/mobile_image_mounter.py @@ -1,7 +1,7 @@ import hashlib import plistlib from pathlib import Path -from typing import List, Mapping +from typing import Optional from developer_disk_image.repo import DeveloperDiskImageRepository from packaging.version import Version @@ -22,7 +22,7 @@ class MobileImageMounterService(LockdownService): # implemented in /usr/libexec/mobile_storage_proxy SERVICE_NAME = 'com.apple.mobile.mobile_image_mounter' RSD_SERVICE_NAME = 'com.apple.mobile.mobile_image_mounter.shim.remote' - IMAGE_TYPE: str = None + IMAGE_TYPE: Optional[str] = None def __init__(self, lockdown: LockdownServiceProvider): if isinstance(lockdown, LockdownClient): @@ -36,7 +36,7 @@ def raise_if_cannot_mount(self) -> None: if Version(self.lockdown.product_version).major >= 16 and not self.lockdown.developer_mode_status: raise DeveloperModeIsNotEnabledError() - def copy_devices(self) -> List[Mapping]: + def copy_devices(self) -> list[dict]: """ Copy mounted devices list. """ try: return self.service.send_recv_plist({'Command': 'CopyDevices'})['EntryList'] @@ -81,7 +81,7 @@ def unmount_image(self, mount_path: str) -> None: else: raise PyMobileDevice3Exception(response) - def mount_image(self, image_type: str, signature: bytes, extras: Mapping = None) -> None: + def mount_image(self, image_type: str, signature: bytes, extras: Optional[dict] = None) -> None: """ Upload image into device. """ if self.is_image_mounted(image_type): @@ -132,7 +132,7 @@ def query_developer_mode_status(self) -> bool: except KeyError as e: raise MessageNotSupportedError from e - def query_nonce(self, personalized_image_type: str = None) -> bytes: + def query_nonce(self, personalized_image_type: Optional[str] = None) -> bytes: request = {'Command': 'QueryNonce'} if personalized_image_type is not None: request['PersonalizedImageType'] = personalized_image_type @@ -142,7 +142,7 @@ def query_nonce(self, personalized_image_type: str = None) -> bytes: except KeyError as e: raise MessageNotSupportedError from e - def query_personalization_identifiers(self, image_type: str = None) -> Mapping: + def query_personalization_identifiers(self, image_type: Optional[str] = None) -> dict: request = {'Command': 'QueryPersonalizationIdentifiers'} if image_type is not None: @@ -197,7 +197,7 @@ class PersonalizedImageMounter(MobileImageMounterService): IMAGE_TYPE = 'Personalized' async def mount(self, image: Path, build_manifest: Path, trust_cache: Path, - info_plist: Mapping = None) -> None: + info_plist: Optional[dict]) -> None: self.raise_if_cannot_mount() image = image.read_bytes() @@ -223,7 +223,7 @@ async def mount(self, image: Path, build_manifest: Path, trust_cache: Path, def umount(self) -> None: self.unmount_image('/System/Developer') - async def get_manifest_from_tss(self, build_manifest: Mapping) -> bytes: + async def get_manifest_from_tss(self, build_manifest: dict) -> bytes: request = TSSRequest() personalization_identifiers = self.query_personalization_identifiers() @@ -297,7 +297,8 @@ async def get_manifest_from_tss(self, build_manifest: Mapping) -> bytes: return response['ApImg4Ticket'] -def auto_mount_developer(lockdown: LockdownServiceProvider, xcode: str = None, version: str = None) -> None: +def auto_mount_developer( + lockdown: LockdownServiceProvider, xcode: Optional[str] = None, version: Optional[str] = None) -> None: """ auto-detect correct DeveloperDiskImage and mount it """ if xcode is None: # avoid "default"-ing this option, because Windows and Linux won't have this path @@ -358,7 +359,7 @@ async def auto_mount_personalized(lockdown: LockdownServiceProvider) -> None: await PersonalizedImageMounter(lockdown=lockdown).mount(image, build_manifest, trustcache) -async def auto_mount(lockdown: LockdownServiceProvider, xcode: str = None, version: str = None) -> None: +async def auto_mount(lockdown: LockdownServiceProvider, xcode: Optional[str] = None, version: Optional[str] = None) -> None: if Version(lockdown.product_version) < Version('17.0'): auto_mount_developer(lockdown, xcode=xcode, version=version) else: diff --git a/pymobiledevice3/services/notification_proxy.py b/pymobiledevice3/services/notification_proxy.py index da8da4f42..a35d509af 100755 --- a/pymobiledevice3/services/notification_proxy.py +++ b/pymobiledevice3/services/notification_proxy.py @@ -1,5 +1,6 @@ import socket -from typing import Generator, Mapping, Union +from collections.abc import Generator +from typing import Union from pymobiledevice3.exceptions import NotificationTimeoutError from pymobiledevice3.lockdown_service_provider import LockdownServiceProvider @@ -39,7 +40,7 @@ def notify_register_dispatch(self, name: str) -> None: self.logger.info(f'Observing {name}') self.service.send_plist({'Command': 'ObserveNotification', 'Name': name}) - def receive_notification(self) -> Generator[Mapping, None, None]: + def receive_notification(self) -> Generator[dict, None, None]: while True: try: yield self.service.recv_plist() diff --git a/pymobiledevice3/services/pcapd.py b/pymobiledevice3/services/pcapd.py index 5d4e85812..b4bb1da29 100755 --- a/pymobiledevice3/services/pcapd.py +++ b/pymobiledevice3/services/pcapd.py @@ -1,7 +1,8 @@ #!/usr/bin/env python3 import enum -from typing import Generator, Optional +from collections.abc import Generator +from typing import Optional import pcapng.blocks as blocks from construct import Byte, Bytes, Container, CString, Int16ub, Int32ub, Int32ul, Padded, Seek, Struct, this diff --git a/pymobiledevice3/services/remote_fetch_symbols.py b/pymobiledevice3/services/remote_fetch_symbols.py index 8848df7fc..f30a59e21 100755 --- a/pymobiledevice3/services/remote_fetch_symbols.py +++ b/pymobiledevice3/services/remote_fetch_symbols.py @@ -1,7 +1,6 @@ import dataclasses import uuid from pathlib import Path -from typing import List from tqdm import tqdm @@ -21,8 +20,8 @@ class RemoteFetchSymbolsService(RemoteService): def __init__(self, rsd: RemoteServiceDiscoveryService): super().__init__(rsd, self.SERVICE_NAME) - async def get_dsc_file_list(self) -> List[DSCFile]: - files: List[DSCFile] = [] + async def get_dsc_file_list(self) -> list[DSCFile]: + files: list[DSCFile] = [] response = await self.service.send_receive_request({'XPCDictionary_sideChannel': uuid.uuid4(), 'DSCFilePaths': []}) file_count = response['DSCFilePaths'] for i in range(file_count): diff --git a/pymobiledevice3/services/remote_server.py b/pymobiledevice3/services/remote_server.py index b41fdb9ba..f49fe1760 100644 --- a/pymobiledevice3/services/remote_server.py +++ b/pymobiledevice3/services/remote_server.py @@ -2,7 +2,6 @@ import io import os import plistlib -import typing import uuid from functools import partial from pprint import pprint @@ -507,7 +506,7 @@ def close(self): class Tap: - def __init__(self, dvt, channel_name: str, config: typing.Mapping): + def __init__(self, dvt, channel_name: str, config: dict): self._dvt = dvt self._channel_name = channel_name self._config = config diff --git a/pymobiledevice3/services/restore_service.py b/pymobiledevice3/services/restore_service.py index 3be806a00..242e64495 100755 --- a/pymobiledevice3/services/restore_service.py +++ b/pymobiledevice3/services/restore_service.py @@ -1,5 +1,3 @@ -from typing import Mapping - from pymobiledevice3.exceptions import PyMobileDevice3Exception from pymobiledevice3.remote.remote_service import RemoteService from pymobiledevice3.remote.remote_service_discovery import RemoteServiceDiscoveryService @@ -25,21 +23,21 @@ async def reboot(self) -> None: """ Reboot device """ await self.validate_command('reboot') - async def get_preflightinfo(self) -> Mapping: + async def get_preflightinfo(self) -> dict: """ Get preflight info """ return await self.service.send_receive_request({'command': 'getpreflightinfo'}) - async def get_nonces(self) -> Mapping: + async def get_nonces(self) -> dict: """ Get ApNonce and SEPNonce """ return await self.service.send_receive_request({'command': 'getnonces'}) - async def get_app_parameters(self) -> Mapping: + async def get_app_parameters(self) -> dict: return await self.validate_command('getappparameters') - async def restore_lang(self, language: str) -> Mapping: + async def restore_lang(self, language: str) -> dict: return await self.service.send_receive_request({'command': 'restorelang', 'argument': language}) - async def validate_command(self, command: str) -> Mapping: + async def validate_command(self, command: str) -> dict: """ Execute command and validate result is `success` """ response = await self.service.send_receive_request({'command': command}) if response.get('result') != 'success': diff --git a/pymobiledevice3/services/springboard.py b/pymobiledevice3/services/springboard.py index db38c332f..6865bf2ab 100644 --- a/pymobiledevice3/services/springboard.py +++ b/pymobiledevice3/services/springboard.py @@ -1,5 +1,5 @@ from enum import IntEnum -from typing import List, Mapping, Optional +from typing import Optional from pymobiledevice3.lockdown import LockdownClient from pymobiledevice3.lockdown_service_provider import LockdownServiceProvider @@ -23,13 +23,13 @@ def __init__(self, lockdown: LockdownServiceProvider) -> None: else: super().__init__(lockdown, self.RSD_SERVICE_NAME) - def get_icon_state(self, format_version: str = '2') -> List: + def get_icon_state(self, format_version: str = '2') -> list: cmd = {'command': 'getIconState'} if format_version: cmd['formatVersion'] = format_version return self.service.send_recv_plist(cmd) - def set_icon_state(self, newstate: Optional[List] = None) -> None: + def set_icon_state(self, newstate: Optional[list] = None) -> None: if newstate is None: newstate = {} self.service.send_plist({'command': 'setIconState', 'iconState': newstate}) @@ -46,10 +46,10 @@ def get_interface_orientation(self) -> InterfaceOrientation: def get_wallpaper_pngdata(self) -> bytes: return self.service.send_recv_plist({'command': 'getHomeScreenWallpaperPNGData'}).get('pngData') - def get_homescreen_icon_metrics(self) -> Mapping[str, float]: + def get_homescreen_icon_metrics(self) -> dict[str, float]: return self.service.send_recv_plist({'command': 'getHomeScreenIconMetrics'}) - def get_wallpaper_info(self, wallpaper_name: str) -> Mapping: + def get_wallpaper_info(self, wallpaper_name: str) -> dict: return self.service.send_recv_plist({'command': 'getWallpaperInfo', 'wallpaperName': wallpaper_name}) def reload_icon_state(self) -> None: diff --git a/pymobiledevice3/services/web_protocol/driver.py b/pymobiledevice3/services/web_protocol/driver.py index 031d05b9f..97ee967e0 100644 --- a/pymobiledevice3/services/web_protocol/driver.py +++ b/pymobiledevice3/services/web_protocol/driver.py @@ -1,5 +1,4 @@ from dataclasses import asdict, dataclass -from typing import List from pymobiledevice3.services.web_protocol.automation_session import RESOURCES, Point, Rect, Size from pymobiledevice3.services.web_protocol.element import WebElement @@ -91,7 +90,7 @@ def find_element(self, by=By.ID, value=None) -> WebElement: elem = self.session.find_elements(by, value) return None if elem is None else WebElement(self.session, elem) - def find_elements(self, by=By.ID, value=None) -> List[WebElement]: + def find_elements(self, by=By.ID, value=None) -> list[WebElement]: """ Find elements given a By strategy and locator. """ elements = self.session.find_elements(by, value, single=False) return list(map(lambda elem: WebElement(self.session, elem), elements)) @@ -118,7 +117,7 @@ def get_cookie(self, name: str) -> Cookie: if cookie.name == name: return cookie - def get_cookies(self) -> List[Cookie]: + def get_cookies(self) -> list[Cookie]: """ Returns cookies visible in the current session. """ return list(map(Cookie.from_automation, self.session.get_all_cookies())) @@ -191,6 +190,6 @@ def title(self) -> str: return self.session.evaluate_js_function('function() { return document.title; }') @property - def window_handles(self) -> List[str]: + def window_handles(self) -> list[str]: """ Returns the handles of all windows within the current session. """ return self.session.get_window_handles() diff --git a/pymobiledevice3/services/web_protocol/inspector_session.py b/pymobiledevice3/services/web_protocol/inspector_session.py index 3b1049598..a418f11ca 100644 --- a/pymobiledevice3/services/web_protocol/inspector_session.py +++ b/pymobiledevice3/services/web_protocol/inspector_session.py @@ -2,7 +2,7 @@ import json import logging from collections import UserDict -from typing import List, Mapping, Optional +from typing import Optional from pymobiledevice3.exceptions import InspectorEvaluateError from pymobiledevice3.services.web_protocol.session_protocol import SessionProtocol @@ -21,7 +21,7 @@ class JSObjectPreview(UserDict): - def __init__(self, properties: List[Mapping]): + def __init__(self, properties: list[dict]): super().__init__() for p in properties: name = p['name'] @@ -30,7 +30,7 @@ def __init__(self, properties: List[Mapping]): class JSObjectProperties(UserDict): - def __init__(self, properties: List[Mapping]): + def __init__(self, properties: list[dict]): super().__init__() for p in properties: name = p['name'] @@ -152,7 +152,7 @@ async def runtime_evaluate(self, exp: str, return_by_value: bool = False): async def navigate_to_url(self, url: str): return await self.runtime_evaluate(exp=f'window.location = "{url}"') - async def send_and_receive(self, message: Mapping) -> Mapping: + async def send_and_receive(self, message: dict) -> dict: if self.target_id is None: message_id = await self.protocol.send_command(message['method'], **message.get('params', {})) return await self.protocol.wait_for_message(message_id) @@ -160,7 +160,7 @@ async def send_and_receive(self, message: Mapping) -> Mapping: message_id = await self.send_message_to_target(message) return await self.receive_response_by_id(message_id) - async def send_message_to_target(self, message: Mapping) -> int: + async def send_message_to_target(self, message: dict) -> int: message['id'] = self.message_id self.message_id += 1 await self.protocol.send_command('Target.sendMessageToTarget', targetId=self.target_id, @@ -179,7 +179,7 @@ async def _receive_loop(self): else: logger.error(f'Unknown response: {response}') - async def receive_response_by_id(self, message_id: int) -> Mapping: + async def receive_response_by_id(self, message_id: int) -> dict: while True: if message_id in self._dispatch_message_responses: return self._dispatch_message_responses.pop(message_id) @@ -192,7 +192,7 @@ async def get_properties(self, object_id: str) -> JSObjectProperties: message = json.loads(message['params']['message'])['result'] return JSObjectProperties(message['properties']) - async def _parse_runtime_evaluate(self, response: Mapping): + async def _parse_runtime_evaluate(self, response: dict): if self.target_id is None: message = response else: @@ -227,7 +227,7 @@ async def _parse_runtime_evaluate(self, response: Mapping): return result['value'] # response methods - def _target_dispatch_message_from_target(self, response: Mapping): + def _target_dispatch_message_from_target(self, response: dict): target_message = json.loads(response['params']['message']) receive_message_id = target_message.get('id') if receive_message_id is None: @@ -235,30 +235,30 @@ def _target_dispatch_message_from_target(self, response: Mapping): return self._dispatch_message_responses[receive_message_id] = response - def _missing_id_in_message(self, message: Mapping): + def _missing_id_in_message(self, message: dict): handler = self.response_methods.get(message['method']) if handler is not None: handler(message) else: logger.critical(f'unhandled message: {message}') - def _console_message_added(self, message: Mapping): + def _console_message_added(self, message: dict): log_level = message['params']['message']['level'] text = message['params']['message']['text'] self._last_console_message = message webinspector_logger_handlers[log_level](text) - def _console_message_repeated_count_updated(self, message: Mapping): + def _console_message_repeated_count_updated(self, message: dict): self._console_message_added(self._last_console_message) - def _heap_garbage_collected(self, message: Mapping): + def _heap_garbage_collected(self, message: dict): heap_logger.debug(message['params']) - def _target_created(self, response: Mapping): + def _target_created(self, response: dict): pass - def _target_destroyed(self, response: Mapping): + def _target_destroyed(self, response: dict): pass - def _target_did_commit_provisional_target(self, response: Mapping): + def _target_did_commit_provisional_target(self, response: dict): self.set_target_id(response['params']['newTargetId']) diff --git a/pymobiledevice3/services/webinspector.py b/pymobiledevice3/services/webinspector.py index 8e052a716..d1070c742 100644 --- a/pymobiledevice3/services/webinspector.py +++ b/pymobiledevice3/services/webinspector.py @@ -4,7 +4,7 @@ import uuid from dataclasses import dataclass, fields from enum import Enum -from typing import Mapping, Optional, Tuple, Union +from typing import Optional, Union import nest_asyncio @@ -54,7 +54,7 @@ class Page: automation_connection_id: str = '' @classmethod - def from_page_dictionary(cls, page_dict: Mapping) -> 'Page': + def from_page_dictionary(cls, page_dict: dict) -> 'Page': p = cls(page_dict['WIRPageIdentifierKey'], WirTypes(page_dict['WIRTypeKey'])) if p.type_ in (WirTypes.WEB, WirTypes.WEB_PAGE): p.web_title = page_dict['WIRTitleKey'] @@ -68,7 +68,7 @@ def from_page_dictionary(cls, page_dict: Mapping) -> 'Page': p.automation_connection_id = page_dict['WIRConnectionIdentifierKey'] return p - def update(self, page_dict: Mapping): + def update(self, page_dict: dict): new_p = self.from_page_dictionary(page_dict) for field in fields(self): setattr(self, field.name, getattr(new_p, field.name)) @@ -190,7 +190,7 @@ async def inspector_session(self, app: Application, page: Page, wait_target: boo return await InspectorSession.create(SessionProtocol(self, session_id, app, page, method_prefix=''), wait_target=wait_target) - def get_open_pages(self) -> Mapping: + def get_open_pages(self) -> dict: apps = {} self.await_(asyncio.gather(*[self._forward_get_listing(app) for app in self.connected_application])) for app in self.connected_application: @@ -206,13 +206,13 @@ def open_app(self, bundle: str, timeout: Union[float, int] = 3) -> Application: except TimeoutError: raise LaunchingApplicationError() - async def send_socket_data(self, session_id: str, app_id: str, page_id: int, data: Mapping): + async def send_socket_data(self, session_id: str, app_id: str, page_id: int, data: dict): await self._forward_socket_data(session_id, app_id, page_id, data) async def setup_inspector_socket(self, session_id: str, app_id: str, page_id: int): await self._forward_socket_setup(session_id, app_id, page_id, pause=False) - def find_page_id(self, page_id: str) -> Tuple[Application, Page]: + def find_page_id(self, page_id: str) -> tuple[Application, Page]: for app_id in self.application_pages: for page in self.application_pages[app_id]: if page == page_id: @@ -301,7 +301,7 @@ async def _forward_socket_setup(self, session_id: str, app_id: str, page_id: int message['WIRAutomaticallyPause'] = False await self._send_message('_rpc_forwardSocketSetup:', message) - async def _forward_socket_data(self, session_id: str, app_id: str, page_id: int, data: Mapping): + async def _forward_socket_data(self, session_id: str, app_id: str, page_id: int, data: dict): await self._send_message('_rpc_forwardSocketData:', { 'WIRApplicationIdentifierKey': app_id, 'WIRPageIdentifierKey': page_id, diff --git a/pymobiledevice3/tunneld.py b/pymobiledevice3/tunneld.py index d7c374004..ca961ea6a 100644 --- a/pymobiledevice3/tunneld.py +++ b/pymobiledevice3/tunneld.py @@ -6,7 +6,7 @@ import signal import traceback from contextlib import asynccontextmanager, suppress -from typing import Dict, List, Mapping, Optional, Tuple, Union +from typing import Optional, Union import construct import fastapi @@ -56,8 +56,8 @@ class TunneldCore: def __init__(self, protocol: TunnelProtocol = TunnelProtocol.QUIC, wifi_monitor: bool = True, usb_monitor: bool = True, usbmux_monitor: bool = True, mobdev2_monitor: bool = True) -> None: self.protocol = protocol - self.tasks: List[asyncio.Task] = [] - self.tunnel_tasks: Dict[str, TunnelTask] = {} + self.tasks: list[asyncio.Task] = [] + self.tunnel_tasks: dict[str, TunnelTask] = {} self.usb_monitor = usb_monitor self.wifi_monitor = wifi_monitor self.usbmux_monitor = usbmux_monitor @@ -298,7 +298,7 @@ async def close(self) -> None: with suppress(asyncio.CancelledError): await task - def get_tunnels_ips(self) -> Dict: + def get_tunnels_ips(self) -> dict: """ Retrieve the available tunnel tasks and format them as {UDID: [IP]} """ tunnels_ips = {} for ip, active_tunnel in self.tunnel_tasks.items(): @@ -351,7 +351,7 @@ async def lifespan(app: FastAPI): usbmux_monitor=usbmux_monitor, mobdev2_monitor=mobdev2_monitor) @self._app.get('/') - async def list_tunnels() -> Mapping[str, List[Mapping]]: + async def list_tunnels() -> dict[str, list[dict]]: """ Retrieve the available tunnels and format them as {UUID: TUNNEL_ADDRESS} """ tunnels = {} for ip, active_tunnel in self._tunneld_core.tunnel_tasks.items(): @@ -359,10 +359,10 @@ async def list_tunnels() -> Mapping[str, List[Mapping]]: continue if active_tunnel.udid not in tunnels: tunnels[active_tunnel.udid] = [] - tunnels[active_tunnel.udid].append(({ + tunnels[active_tunnel.udid].append({ 'tunnel-address': active_tunnel.tunnel.address, 'tunnel-port': active_tunnel.tunnel.port, - 'interface': ip})) + 'interface': ip}) return tunnels @self._app.get('/shutdown') @@ -475,18 +475,18 @@ def _run_app(self) -> None: uvicorn.run(self._app, host=self.host, port=self.port, loop='asyncio') -async def async_get_tunneld_devices(tunneld_address: Tuple[str, int] = TUNNELD_DEFAULT_ADDRESS) \ - -> List[RemoteServiceDiscoveryService]: +async def async_get_tunneld_devices(tunneld_address: tuple[str, int] = TUNNELD_DEFAULT_ADDRESS) \ + -> list[RemoteServiceDiscoveryService]: tunnels = _list_tunnels(tunneld_address) return await _create_rsds_from_tunnels(tunnels) -def get_tunneld_devices(tunneld_address: Tuple[str, int] = TUNNELD_DEFAULT_ADDRESS) \ - -> List[RemoteServiceDiscoveryService]: +def get_tunneld_devices(tunneld_address: tuple[str, int] = TUNNELD_DEFAULT_ADDRESS) \ + -> list[RemoteServiceDiscoveryService]: return get_asyncio_loop().run_until_complete(async_get_tunneld_devices(tunneld_address)) -async def async_get_tunneld_device_by_udid(udid: str, tunneld_address: Tuple[str, int] = TUNNELD_DEFAULT_ADDRESS) \ +async def async_get_tunneld_device_by_udid(udid: str, tunneld_address: tuple[str, int] = TUNNELD_DEFAULT_ADDRESS) \ -> Optional[RemoteServiceDiscoveryService]: tunnels = _list_tunnels(tunneld_address) if udid not in tunnels: @@ -495,12 +495,12 @@ async def async_get_tunneld_device_by_udid(udid: str, tunneld_address: Tuple[str return rsds[0] -def get_tunneld_device_by_udid(udid: str, tunneld_address: Tuple[str, int] = TUNNELD_DEFAULT_ADDRESS) \ +def get_tunneld_device_by_udid(udid: str, tunneld_address: tuple[str, int] = TUNNELD_DEFAULT_ADDRESS) \ -> Optional[RemoteServiceDiscoveryService]: return get_asyncio_loop().run_until_complete(async_get_tunneld_device_by_udid(udid, tunneld_address)) -def _list_tunnels(tunneld_address: Tuple[str, int] = TUNNELD_DEFAULT_ADDRESS) -> Mapping[str, List[Mapping]]: +def _list_tunnels(tunneld_address: tuple[str, int] = TUNNELD_DEFAULT_ADDRESS) -> dict[str, list[dict]]: try: # Get the list of tunnels from the specified address resp = requests.get(f'http://{tunneld_address[0]}:{tunneld_address[1]}') @@ -510,7 +510,7 @@ def _list_tunnels(tunneld_address: Tuple[str, int] = TUNNELD_DEFAULT_ADDRESS) -> return tunnels -async def _create_rsds_from_tunnels(tunnels: Mapping[str, List[Mapping]]) -> List[RemoteServiceDiscoveryService]: +async def _create_rsds_from_tunnels(tunnels: dict[str, list[dict]]) -> list[RemoteServiceDiscoveryService]: rsds = [] for udid, details in tunnels.items(): for tunnel_details in details: diff --git a/pymobiledevice3/usbmux.py b/pymobiledevice3/usbmux.py index 615812088..f5c8b8b49 100644 --- a/pymobiledevice3/usbmux.py +++ b/pymobiledevice3/usbmux.py @@ -3,7 +3,7 @@ import socket import time from dataclasses import dataclass -from typing import List, Mapping, Optional +from typing import Optional from construct import Const, CString, Enum, FixedSized, GreedyBytes, Int16ul, Int32ul, Padding, Prefixed, StreamError, \ Struct, Switch, this @@ -293,7 +293,7 @@ def _connect(self, device_id: int, port: int): f'failed to connect to device: {device_id} at port: {port}. reason: ' f'{response.data.result}') - def _send(self, data: Mapping): + def _send(self, data: dict) -> None: self._assert_not_connected() self._sock.send(usbmuxd_request.build(data)) self._tag += 1 @@ -341,7 +341,7 @@ def __init__(self, sock: SafeStreamSocket): def listen(self) -> None: self._send_receive({'MessageType': 'Listen'}) - def get_pair_record(self, serial: str) -> Mapping: + def get_pair_record(self, serial: str) -> dict: # serials are saved inside usbmuxd without '-' self._send({'MessageType': 'ReadPairRecord', 'PairRecordID': serial}) response = self._receive(self._tag - 1) @@ -382,7 +382,7 @@ def save_pair_record(self, serial: str, device_id: int, record_data: bytes): def _connect(self, device_id: int, port: int): self._send_receive({'MessageType': 'Connect', 'DeviceID': device_id, 'PortNumber': port}) - def _send(self, data: Mapping): + def _send(self, data: dict): request = {'ClientVersionString': 'qt4i-usbmuxd', 'ProgName': 'pymobiledevice3', 'kLibUSBMuxVersion': 3} request.update(data) super()._send({'header': {'version': self._version, @@ -391,13 +391,13 @@ def _send(self, data: Mapping): 'data': plistlib.dumps(request), }) - def _receive(self, expected_tag: int = None) -> Mapping: + def _receive(self, expected_tag: int = None) -> dict: response = super()._receive(expected_tag=expected_tag) if response.header.message != usbmuxd_msgtype.PLIST: raise MuxException(f'Received non-plist type {response}') return plistlib.loads(response.data) - def _send_receive(self, data: Mapping): + def _send_receive(self, data: dict): self._send(data) response = self._receive(self._tag - 1) if response['MessageType'] != 'Result': @@ -410,7 +410,7 @@ def create_mux(usbmux_address: Optional[str] = None) -> MuxConnection: return MuxConnection.create(usbmux_address=usbmux_address) -def list_devices(usbmux_address: Optional[str] = None) -> List[MuxDevice]: +def list_devices(usbmux_address: Optional[str] = None) -> list[MuxDevice]: mux = create_mux(usbmux_address=usbmux_address) mux.get_device_list(0.1) devices = mux.devices @@ -444,7 +444,7 @@ def select_device(udid: str = None, connection_type: str = None, usbmux_address: return tmp -def select_devices_by_connection_type(connection_type: str, usbmux_address: Optional[str] = None) -> List[MuxDevice]: +def select_devices_by_connection_type(connection_type: str, usbmux_address: Optional[str] = None) -> list[MuxDevice]: """ select all UsbMux devices by connection type """ diff --git a/tests/services/test_apps.py b/tests/services/test_apps.py index be9b76c19..1528c71fb 100644 --- a/tests/services/test_apps.py +++ b/tests/services/test_apps.py @@ -9,7 +9,7 @@ def test_get_apps(lockdown): def test_get_system_apps(lockdown): with InstallationProxyService(lockdown=lockdown) as installation_proxy: - app_types = set( - [app['ApplicationType'] for app in installation_proxy.get_apps(application_type='System').values()]) + app_types = { + app['ApplicationType'] for app in installation_proxy.get_apps(application_type='System').values()} assert len(app_types) == 1 assert app_types.pop() == 'System'