From 1d4b054d1ee5760d9537bdc675fb7cd8be7be70c Mon Sep 17 00:00:00 2001 From: Koen Vervloesem Date: Tue, 4 Jul 2023 15:23:33 +0200 Subject: [PATCH] Add read time command for PVVX firmware (#19) --- README.rst | 4 +- pyproject.toml | 3 +- src/bluetooth_clocks/devices/pvvx.py | 75 ++++++++++++++++++++++++++-- tests/test_pvvx.py | 11 ++-- 4 files changed, 82 insertions(+), 11 deletions(-) diff --git a/README.rst b/README.rst index e1d0a47..c4ecafd 100644 --- a/README.rst +++ b/README.rst @@ -47,8 +47,8 @@ Bluetooth Clocks supports the following devices: | (e.g. PineTime with | | | | | InfiniTime firmware) | | | | +--------------------------+------------+-------------------+-----------+ -| `PVVX firmware`_ | Yes | No | No | -| (LYWSD03MMC, MHO-C401, | | | (not yet) | +| `PVVX firmware`_ | Yes | No | Yes | +| (LYWSD03MMC, MHO-C401, | | | | | CGG1, CGDK2, MJWSD05MMC, | | | | | MHO-C122) | | | | +--------------------------+------------+-------------------+-----------+ diff --git a/pyproject.toml b/pyproject.toml index 3465277..80f2e18 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,7 +13,8 @@ select = ["ALL"] ignore = [ "ANN101", # flake8-annotations "ANN102", - "ARG002", # flake8-unused-arguments + "ARG001", # flake8-unused-arguments + "ARG002", "ARG003", "DTZ001", # flake8-datetimez "DTZ006", diff --git a/src/bluetooth_clocks/devices/pvvx.py b/src/bluetooth_clocks/devices/pvvx.py index ef5c99d..58880b3 100644 --- a/src/bluetooth_clocks/devices/pvvx.py +++ b/src/bluetooth_clocks/devices/pvvx.py @@ -1,6 +1,8 @@ """Bluetooth clock support for devices running the PVVX firmware.""" from __future__ import annotations +import asyncio +import logging import struct from time import localtime from typing import TYPE_CHECKING @@ -10,7 +12,12 @@ from bleak.backends.device import BLEDevice from bleak.backends.scanner import AdvertisementData +from bleak import BleakClient, BleakGATTCharacteristic + from bluetooth_clocks import BluetoothClock +from bluetooth_clocks.exceptions import InvalidTimeBytesError + +_logger = logging.getLogger(__name__) class PVVX(BluetoothClock): @@ -20,8 +27,8 @@ class PVVX(BluetoothClock): SERVICE_UUID = UUID("00001f10-0000-1000-8000-00805f9b34fb") CHAR_UUID = UUID("00001f1f-0000-1000-8000-00805f9b34fb") - TIME_GET_FORMAT = None - """The PVVX firmware doesn't support reading the time.""" + TIME_GET_FORMAT = " None: + """Create a PVVX object. + + Args: + device (BLEDevice): The Bluetooth device. + """ + super().__init__(device) + self.notified_time = 0.0 + @classmethod def recognize( cls, @@ -53,6 +72,54 @@ def recognize( """ return str(cls.SERVICE_DATA_UUID) in advertisement_data.service_data + async def get_time(self) -> float: + """Get the time of the PVVX device. + + Returns: + float: The time of the Bluetooth clock. + """ + _logger.info("Connecting to device...") + async with BleakClient(self.address) as client: + service = client.services.get_service(self.SERVICE_UUID) + characteristic = service.get_characteristic(self.CHAR_UUID) + + # Define callback function for notifications and subscribe + def time_callback(sender: BleakGATTCharacteristic, data: bytearray) -> None: + """Convert bytes read from a notification to a timestamp.""" + self.notified_time = self.get_time_from_bytes(data) + + await client.start_notify(characteristic, time_callback) + + # Write Get/Set Time command + await client.write_gatt_char( + characteristic, + [self.PVVX_GET_SET_TIME_COMMAND], + response=self.WRITE_WITH_RESPONSE, + ) + + # Wait for a few seconds so a notification is received. + await asyncio.sleep(5) + + return self.notified_time + + def get_time_from_bytes(self, time_bytes: bytes) -> float: + """Convert bytes read from the PVVX device to a timestamp. + + Args: + time_bytes (bytes): The raw bytes read from the device. + + Raises: + InvalidTimeBytesError: If `time_bytes` don't have the right format. + + Returns: + float: The time encoded as a Unix timestamp. + """ + try: + _, time_time, _ = struct.unpack(self.TIME_GET_FORMAT, time_bytes) + except struct.error as exception: + raise InvalidTimeBytesError(time_bytes) from exception + return float(time_time) + def get_bytes_from_time( self, timestamp: float, @@ -70,9 +137,9 @@ def get_bytes_from_time( bytes: The bytes needed to set the time of the device to `timestamp`. """ # Convert timestamp to little-endian unsigned long integer - # And add header byte + # And add header byte for Get/Set Time command. return struct.pack( self.TIME_SET_FORMAT, - 0x23, + self.PVVX_GET_SET_TIME_COMMAND, int(timestamp + localtime(timestamp).tm_gmtoff), ) diff --git a/tests/test_pvvx.py b/tests/test_pvvx.py index 5813b00..02c0bf6 100644 --- a/tests/test_pvvx.py +++ b/tests/test_pvvx.py @@ -10,7 +10,6 @@ from bluetooth_clocks import supported_devices from bluetooth_clocks.devices.pvvx import PVVX -from bluetooth_clocks.exceptions import TimeNotReadableError if sys.version_info >= (3, 9): from zoneinfo import ZoneInfo @@ -67,15 +66,19 @@ def test_recognize() -> None: def test_readable() -> None: """Test whether the time is readable on the device.""" - assert not PVVX.is_readable() + assert PVVX.is_readable() +@pytest.mark.skipif(sys.platform.startswith("win"), reason="timezone problem") +@time_machine.travel(datetime(2023, 7, 4, 17, 4, 5, tzinfo=CET_TZ), tick=False) def test_get_time_from_bytes() -> None: """Test the conversion from bytes to a timestamp.""" - with pytest.raises(TimeNotReadableError): + assert ( PVVX(BLEDevice("A4:C1:38:D9:01:10", "", {}, -67)).get_time_from_bytes( - bytes([0x23, 0xDD, 0xBC, 0xB9, 0x63]), + bytes([0x23, 0xE5, 0x34, 0xA4, 0x64, 0x33, 0x3B, 0xA3, 0x64]), ) + == time() + ) @pytest.mark.skipif(sys.platform.startswith("win"), reason="timezone problem")